stillbox/pkg/talkgroups/cache.go

258 lines
5.2 KiB
Go
Raw Normal View History

2024-11-03 07:58:41 -05:00
package talkgroups
2024-10-31 16:14:38 -04:00
import (
"context"
2024-11-03 09:45:51 -05:00
"errors"
2024-11-02 11:39:02 -04:00
"sync"
"time"
2024-10-31 16:14:38 -04:00
2024-11-04 11:48:31 -05:00
"dynatron.me/x/stillbox/internal/ruletime"
"dynatron.me/x/stillbox/pkg/config"
2024-11-03 07:19:03 -05:00
"dynatron.me/x/stillbox/pkg/database"
2024-11-02 09:41:48 -04:00
2024-11-02 11:39:02 -04:00
"github.com/jackc/pgx/v5"
2024-11-02 09:41:48 -04:00
"github.com/rs/zerolog/log"
2024-10-31 16:14:38 -04:00
)
2024-11-04 11:15:24 -05:00
// tgMap maps a talkgroup ID to a pointer to its Talkgroup record.
type tgMap map[ID]*Talkgroup
2024-10-31 16:14:38 -04:00
2024-11-03 08:09:49 -05:00
type Store interface {
2024-11-03 09:45:51 -05:00
// TG retrieves a Talkgroup from the Store.
2024-11-04 11:15:24 -05:00
TG(ctx context.Context, tg ID) (*Talkgroup, error)
// TGs retrieves many talkgroups from the Store.
TGs(ctx context.Context, tgs IDs) ([]*Talkgroup, error)
2024-11-02 09:41:48 -04:00
2024-11-03 08:09:49 -05:00
// SystemName retrieves a system name from the store. It returns the record and whether one was found.
SystemName(ctx context.Context, id int) (string, bool)
2024-10-31 16:14:38 -04:00
2024-11-03 08:09:49 -05:00
// ApplyAlertRules applies the score's talkgroup alert rules to the call occurring at t and returns the weighted score.
ApplyAlertRules(id ID, t time.Time, coversOpts ...ruletime.CoversOption) float64
2024-10-31 16:14:38 -04:00
2024-11-03 08:09:49 -05:00
// Hint hints the Store that the provided talkgroups will be asked for.
Hint(ctx context.Context, tgs []ID) error
2024-11-02 11:39:02 -04:00
2024-11-03 08:09:49 -05:00
// Load loads the provided packed talkgroup IDs into the Store.
2024-11-02 11:39:02 -04:00
Load(ctx context.Context, tgs []int64) error
2024-11-03 08:09:49 -05:00
// Invalidate invalidates any caching in the Store.
2024-11-02 11:39:02 -04:00
Invalidate()
2024-11-04 11:48:31 -05:00
// Weight returns the final weight of this talkgroup, including its static and rules-derived weight.
Weight(ctx context.Context, id ID, t time.Time) float64
// Hupper
HUP(*config.Config)
2024-11-02 11:39:02 -04:00
}
2024-11-03 09:45:51 -05:00
// CtxStoreKeyT is the type of the context key under which a Store is kept.
type CtxStoreKeyT string
// CtxStoreKey is the context key used by CtxWithStore and StoreFrom.
const CtxStoreKey CtxStoreKeyT = "store"
// CtxWithStore returns a context derived from ctx that carries s under
// CtxStoreKey, so it can later be recovered with StoreFrom.
func CtxWithStore(ctx context.Context, s Store) context.Context {
	withStore := context.WithValue(ctx, CtxStoreKey, s)
	return withStore
}
// StoreFrom returns the Store carried by ctx under CtxStoreKey. When ctx holds
// no Store, a brand-new cache is created with NewCache and returned instead;
// that fallback is not saved anywhere, so each such call builds a fresh one.
func StoreFrom(ctx context.Context) Store {
	if store, found := ctx.Value(CtxStoreKey).(Store); found {
		return store
	}

	return NewCache()
}
// HUP invalidates the cache. The supplied configuration is ignored.
func (t *cache) HUP(_ *config.Config) {
t.Invalidate()
}
2024-11-03 08:09:49 -05:00
func (t *cache) Invalidate() {
2024-11-02 11:39:02 -04:00
t.Lock()
defer t.Unlock()
clear(t.tgs)
clear(t.systems)
2024-11-03 14:11:38 -05:00
t.AlertConfig.Invalidate()
2024-11-02 11:39:02 -04:00
}
2024-11-03 08:09:49 -05:00
type cache struct {
2024-11-02 11:39:02 -04:00
sync.RWMutex
2024-11-02 09:41:48 -04:00
AlertConfig
2024-10-31 16:14:38 -04:00
tgs tgMap
systems map[int32]string
}
2024-11-03 08:09:49 -05:00
// NewCache returns a new cache Store.
func NewCache() Store {
tgc := &cache{
2024-11-02 09:41:48 -04:00
tgs: make(tgMap),
systems: make(map[int32]string),
2024-11-03 14:11:38 -05:00
AlertConfig: NewAlertConfig(),
2024-10-31 16:14:38 -04:00
}
2024-11-02 11:39:02 -04:00
return tgc
}
2024-11-03 08:09:49 -05:00
func (t *cache) Hint(ctx context.Context, tgs []ID) error {
2024-11-02 11:39:02 -04:00
t.RLock()
var toLoad []int64
if len(t.tgs) > len(tgs)/2 { // TODO: instrument this
for _, tg := range tgs {
_, ok := t.tgs[tg]
if !ok {
toLoad = append(toLoad, tg.Pack())
}
}
} else {
toLoad = make([]int64, 0, len(tgs))
for _, g := range tgs {
toLoad = append(toLoad, g.Pack())
}
}
if len(toLoad) > 0 {
t.RUnlock()
return t.Load(ctx, toLoad)
}
t.RUnlock()
return nil
2024-10-31 16:14:38 -04:00
}
2024-11-04 11:15:24 -05:00
func (t *cache) add(rec *Talkgroup) error {
2024-11-03 14:11:38 -05:00
t.Lock()
defer t.Unlock()
2024-11-03 08:44:34 -05:00
tg := TG(rec.System.ID, int(rec.Talkgroup.Tgid))
2024-11-02 11:39:02 -04:00
t.tgs[tg] = rec
2024-11-03 08:44:34 -05:00
t.systems[int32(rec.System.ID)] = rec.System.Name
2024-11-02 11:39:02 -04:00
2024-11-03 14:11:38 -05:00
return t.AlertConfig.UnmarshalTGRules(tg, rec.Talkgroup.AlertConfig)
2024-11-02 11:39:02 -04:00
}
2024-11-04 11:15:24 -05:00
func rowToTalkgroup(r database.GetTalkgroupWithLearnedByPackedIDsRow) *Talkgroup {
return &Talkgroup{
2024-11-03 09:45:51 -05:00
Talkgroup: r.Talkgroup,
System: r.System,
Learned: r.Learned,
}
}
2024-11-04 11:15:24 -05:00
// TGs returns the talkgroups identified by tgs.
//
// IDs already cached are served from the cache; the remainder are fetched from
// the database in a single query and added to the cache. The database is not
// queried at all when every requested ID was cached.
//
// The result is not in request order: cached records come first, followed by
// the fetched records in the order the database returned them. IDs that are
// neither cached nor present in the database are omitted.
//
// It returns a nil slice and the error if the query fails or if a fetched
// record cannot be added to the cache.
func (t *cache) TGs(ctx context.Context, tgs IDs) ([]*Talkgroup, error) {
	r := make([]*Talkgroup, 0, len(tgs))
	toGet := make(IDs, 0, len(tgs))

	t.RLock()
	for _, id := range tgs {
		if rec, has := t.tgs[id]; has {
			r = append(r, rec)
		} else {
			toGet = append(toGet, id)
		}
	}
	t.RUnlock()

	// Everything was cached (or nothing was requested): skip the round-trip.
	if len(toGet) == 0 {
		return r, nil
	}

	tgRecords, err := database.FromCtx(ctx).GetTalkgroupWithLearnedByPackedIDs(ctx, toGet.Packed())
	if err != nil {
		return nil, err
	}

	for _, rec := range tgRecords {
		tg := rowToTalkgroup(rec)
		if err := t.add(tg); err != nil {
			return nil, err
		}
		r = append(r, tg)
	}

	return r, nil
}
2024-11-03 08:09:49 -05:00
func (t *cache) Load(ctx context.Context, tgs []int64) error {
2024-11-02 11:39:02 -04:00
tgRecords, err := database.FromCtx(ctx).GetTalkgroupWithLearnedByPackedIDs(ctx, tgs)
2024-10-31 16:14:38 -04:00
if err != nil {
return err
}
for _, rec := range tgRecords {
2024-11-03 09:45:51 -05:00
err := t.add(rowToTalkgroup(rec))
2024-11-02 09:41:48 -04:00
if err != nil {
log.Error().Err(err).Msg("add alert config fail")
}
2024-10-31 16:14:38 -04:00
}
return nil
}
2024-11-03 14:16:26 -05:00
// ErrNotFound is the sentinel error reported by TG. It is returned on its own
// when the database has no matching row, and joined with the underlying error
// on other failures.
var ErrNotFound = errors.New("talkgroup not found")
2024-11-03 09:45:51 -05:00
func (t *cache) Weight(ctx context.Context, id ID, tm time.Time) float64 {
tg, err := t.TG(ctx, id)
if err != nil {
return 1.0
}
m := float64(tg.Weight)
m *= t.AlertConfig.ApplyAlertRules(id, tm)
2024-11-03 19:22:38 -05:00
return float64(m)
}
2024-11-04 11:15:24 -05:00
func (t *cache) TG(ctx context.Context, tg ID) (*Talkgroup, error) {
2024-11-02 11:39:02 -04:00
t.RLock()
2024-10-31 16:14:38 -04:00
rec, has := t.tgs[tg]
2024-11-02 11:39:02 -04:00
t.RUnlock()
2024-10-31 16:14:38 -04:00
2024-11-02 11:39:02 -04:00
if has {
2024-11-03 09:45:51 -05:00
return rec, nil
2024-11-02 11:39:02 -04:00
}
recs, err := database.FromCtx(ctx).GetTalkgroupWithLearnedByPackedIDs(ctx, []int64{tg.Pack()})
switch err {
case nil:
case pgx.ErrNoRows:
2024-11-04 11:15:24 -05:00
return nil, ErrNotFound
2024-11-02 11:39:02 -04:00
default:
log.Error().Err(err).Msg("TG() cache add db get")
2024-11-04 11:15:24 -05:00
return nil, errors.Join(ErrNotFound, err)
2024-11-02 11:39:02 -04:00
}
if len(recs) < 1 {
2024-11-04 11:15:24 -05:00
return nil, ErrNotFound
2024-11-02 11:39:02 -04:00
}
2024-11-03 09:45:51 -05:00
err = t.add(rowToTalkgroup(recs[0]))
2024-11-02 11:39:02 -04:00
if err != nil {
log.Error().Err(err).Msg("TG() cache add")
2024-11-03 14:16:26 -05:00
return rowToTalkgroup(recs[0]), errors.Join(ErrNotFound, err)
2024-11-02 11:39:02 -04:00
}
2024-11-03 09:45:51 -05:00
return rowToTalkgroup(recs[0]), nil
2024-10-31 16:14:38 -04:00
}
2024-11-03 08:09:49 -05:00
func (t *cache) SystemName(ctx context.Context, id int) (name string, has bool) {
2024-11-03 14:11:38 -05:00
t.RLock()
2024-10-31 16:14:38 -04:00
n, has := t.systems[int32(id)]
2024-11-03 14:11:38 -05:00
t.RUnlock()
2024-11-02 11:39:02 -04:00
if !has {
sys, err := database.FromCtx(ctx).GetSystemName(ctx, id)
if err != nil {
return "", false
}
2024-11-03 14:11:38 -05:00
t.Lock()
t.systems[int32(id)] = sys
t.Unlock()
2024-11-02 11:39:02 -04:00
return sys, true
}
2024-10-31 16:14:38 -04:00
return n, has
}