stillbox/pkg/talkgroups/cache.go

223 lines
4.5 KiB
Go
Raw Normal View History

2024-11-03 07:58:41 -05:00
package talkgroups
2024-10-31 16:14:38 -04:00
import (
"context"
2024-11-03 09:45:51 -05:00
"errors"
2024-11-02 11:39:02 -04:00
"sync"
"time"
2024-10-31 16:14:38 -04:00
2024-11-04 11:48:31 -05:00
"dynatron.me/x/stillbox/internal/ruletime"
"dynatron.me/x/stillbox/pkg/config"
2024-11-03 07:19:03 -05:00
"dynatron.me/x/stillbox/pkg/database"
2024-11-02 09:41:48 -04:00
2024-11-02 11:39:02 -04:00
"github.com/jackc/pgx/v5"
2024-11-02 09:41:48 -04:00
"github.com/rs/zerolog/log"
2024-10-31 16:14:38 -04:00
)
2024-11-03 09:45:51 -05:00
// tgMap maps a talkgroup ID to its Talkgroup record.
type tgMap map[ID]Talkgroup
2024-10-31 16:14:38 -04:00
2024-11-03 08:09:49 -05:00
type Store interface {
2024-11-03 09:45:51 -05:00
// TG retrieves a Talkgroup from the Store.
TG(ctx context.Context, tg ID) (Talkgroup, error)
2024-11-02 09:41:48 -04:00
2024-11-03 08:09:49 -05:00
// SystemName retrieves a system name from the store. It returns the record and whether one was found.
SystemName(ctx context.Context, id int) (string, bool)
2024-10-31 16:14:38 -04:00
2024-11-03 08:09:49 -05:00
// ApplyAlertRules applies the score's talkgroup alert rules to the call occurring at t and returns the weighted score.
ApplyAlertRules(id ID, t time.Time, coversOpts ...ruletime.CoversOption) float64
2024-10-31 16:14:38 -04:00
2024-11-03 08:09:49 -05:00
// Hint hints the Store that the provided talkgroups will be asked for.
Hint(ctx context.Context, tgs []ID) error
2024-11-02 11:39:02 -04:00
2024-11-03 08:09:49 -05:00
// Load loads the provided packed talkgroup IDs into the Store.
2024-11-02 11:39:02 -04:00
Load(ctx context.Context, tgs []int64) error
2024-11-03 08:09:49 -05:00
// Invalidate invalidates any caching in the Store.
2024-11-02 11:39:02 -04:00
Invalidate()
2024-11-04 11:48:31 -05:00
// Weight returns the final weight of this talkgroup, including its static and rules-derived weight.
Weight(ctx context.Context, id ID, t time.Time) float64
// Hupper
HUP(*config.Config)
2024-11-02 11:39:02 -04:00
}
2024-11-03 09:45:51 -05:00
// CtxStoreKeyT is the type of the context key under which a Store is kept.
type CtxStoreKeyT string

// CtxStoreKey is the context key used by CtxWithStore and StoreFrom.
const CtxStoreKey = CtxStoreKeyT("store")
// CtxWithStore returns a context derived from ctx that carries s under
// CtxStoreKey.
func CtxWithStore(ctx context.Context, s Store) context.Context {
	derived := context.WithValue(ctx, CtxStoreKey, s)
	return derived
}
// StoreFrom returns the Store carried by ctx under CtxStoreKey. When ctx holds
// no Store under that key, a new Store from NewCache is returned instead.
func StoreFrom(ctx context.Context) Store {
	if st, found := ctx.Value(CtxStoreKey).(Store); found {
		return st
	}
	return NewCache()
}
// HUP invalidates the cache. The configuration argument is not used.
func (t *cache) HUP(*config.Config) {
	t.Invalidate()
}
2024-11-03 08:09:49 -05:00
func (t *cache) Invalidate() {
2024-11-02 11:39:02 -04:00
t.Lock()
defer t.Unlock()
clear(t.tgs)
clear(t.systems)
2024-11-03 14:11:38 -05:00
t.AlertConfig.Invalidate()
2024-11-02 11:39:02 -04:00
}
2024-11-03 08:09:49 -05:00
type cache struct {
2024-11-02 11:39:02 -04:00
sync.RWMutex
2024-11-02 09:41:48 -04:00
AlertConfig
2024-10-31 16:14:38 -04:00
tgs tgMap
systems map[int32]string
}
2024-11-03 08:09:49 -05:00
// NewCache returns a new cache Store.
func NewCache() Store {
tgc := &cache{
2024-11-02 09:41:48 -04:00
tgs: make(tgMap),
systems: make(map[int32]string),
2024-11-03 14:11:38 -05:00
AlertConfig: NewAlertConfig(),
2024-10-31 16:14:38 -04:00
}
2024-11-02 11:39:02 -04:00
return tgc
}
2024-11-03 08:09:49 -05:00
func (t *cache) Hint(ctx context.Context, tgs []ID) error {
2024-11-02 11:39:02 -04:00
t.RLock()
var toLoad []int64
if len(t.tgs) > len(tgs)/2 { // TODO: instrument this
for _, tg := range tgs {
_, ok := t.tgs[tg]
if !ok {
toLoad = append(toLoad, tg.Pack())
}
}
} else {
toLoad = make([]int64, 0, len(tgs))
for _, g := range tgs {
toLoad = append(toLoad, g.Pack())
}
}
if len(toLoad) > 0 {
t.RUnlock()
return t.Load(ctx, toLoad)
}
t.RUnlock()
return nil
2024-10-31 16:14:38 -04:00
}
2024-11-03 09:45:51 -05:00
func (t *cache) add(rec Talkgroup) error {
2024-11-03 14:11:38 -05:00
t.Lock()
defer t.Unlock()
2024-11-03 08:44:34 -05:00
tg := TG(rec.System.ID, int(rec.Talkgroup.Tgid))
2024-11-02 11:39:02 -04:00
t.tgs[tg] = rec
2024-11-03 08:44:34 -05:00
t.systems[int32(rec.System.ID)] = rec.System.Name
2024-11-02 11:39:02 -04:00
2024-11-03 14:11:38 -05:00
return t.AlertConfig.UnmarshalTGRules(tg, rec.Talkgroup.AlertConfig)
2024-11-02 11:39:02 -04:00
}
2024-11-03 09:45:51 -05:00
// rowToTalkgroup copies the Talkgroup, System and Learned fields of a
// database row into a Talkgroup value.
func rowToTalkgroup(r database.GetTalkgroupWithLearnedByPackedIDsRow) Talkgroup {
	var out Talkgroup
	out.Talkgroup = r.Talkgroup
	out.System = r.System
	out.Learned = r.Learned
	return out
}
2024-11-03 08:09:49 -05:00
func (t *cache) Load(ctx context.Context, tgs []int64) error {
2024-11-02 11:39:02 -04:00
tgRecords, err := database.FromCtx(ctx).GetTalkgroupWithLearnedByPackedIDs(ctx, tgs)
2024-10-31 16:14:38 -04:00
if err != nil {
return err
}
for _, rec := range tgRecords {
2024-11-03 09:45:51 -05:00
err := t.add(rowToTalkgroup(rec))
2024-11-02 09:41:48 -04:00
if err != nil {
log.Error().Err(err).Msg("add alert config fail")
}
2024-10-31 16:14:38 -04:00
}
return nil
}
2024-11-03 14:16:26 -05:00
// ErrNotFound is returned by the cache's TG when no talkgroup record is found.
// TG also joins it into the other errors it returns.
var ErrNotFound = errors.New("talkgroup not found")
2024-11-03 09:45:51 -05:00
func (t *cache) Weight(ctx context.Context, id ID, tm time.Time) float64 {
tg, err := t.TG(ctx, id)
if err != nil {
return 1.0
}
m := float64(tg.Weight)
m *= t.AlertConfig.ApplyAlertRules(id, tm)
2024-11-03 19:22:38 -05:00
return float64(m)
}
2024-11-03 09:45:51 -05:00
func (t *cache) TG(ctx context.Context, tg ID) (Talkgroup, error) {
2024-11-02 11:39:02 -04:00
t.RLock()
2024-10-31 16:14:38 -04:00
rec, has := t.tgs[tg]
2024-11-02 11:39:02 -04:00
t.RUnlock()
2024-10-31 16:14:38 -04:00
2024-11-02 11:39:02 -04:00
if has {
2024-11-03 09:45:51 -05:00
return rec, nil
2024-11-02 11:39:02 -04:00
}
recs, err := database.FromCtx(ctx).GetTalkgroupWithLearnedByPackedIDs(ctx, []int64{tg.Pack()})
switch err {
case nil:
case pgx.ErrNoRows:
2024-11-03 14:16:26 -05:00
return Talkgroup{}, ErrNotFound
2024-11-02 11:39:02 -04:00
default:
log.Error().Err(err).Msg("TG() cache add db get")
2024-11-03 14:16:26 -05:00
return Talkgroup{}, errors.Join(ErrNotFound, err)
2024-11-02 11:39:02 -04:00
}
if len(recs) < 1 {
2024-11-03 14:16:26 -05:00
return Talkgroup{}, ErrNotFound
2024-11-02 11:39:02 -04:00
}
2024-11-03 09:45:51 -05:00
err = t.add(rowToTalkgroup(recs[0]))
2024-11-02 11:39:02 -04:00
if err != nil {
log.Error().Err(err).Msg("TG() cache add")
2024-11-03 14:16:26 -05:00
return rowToTalkgroup(recs[0]), errors.Join(ErrNotFound, err)
2024-11-02 11:39:02 -04:00
}
2024-11-03 09:45:51 -05:00
return rowToTalkgroup(recs[0]), nil
2024-10-31 16:14:38 -04:00
}
2024-11-03 08:09:49 -05:00
func (t *cache) SystemName(ctx context.Context, id int) (name string, has bool) {
2024-11-03 14:11:38 -05:00
t.RLock()
2024-10-31 16:14:38 -04:00
n, has := t.systems[int32(id)]
2024-11-03 14:11:38 -05:00
t.RUnlock()
2024-11-02 11:39:02 -04:00
if !has {
sys, err := database.FromCtx(ctx).GetSystemName(ctx, id)
if err != nil {
return "", false
}
2024-11-03 14:11:38 -05:00
t.Lock()
t.systems[int32(id)] = sys
t.Unlock()
2024-11-02 11:39:02 -04:00
return sys, true
}
2024-10-31 16:14:38 -04:00
return n, has
}