stillbox/pkg/talkgroups/store.go

361 lines
7.5 KiB
Go
Raw Normal View History

2024-11-03 07:58:41 -05:00
package talkgroups
2024-10-31 16:14:38 -04:00
import (
"context"
2024-11-03 09:45:51 -05:00
"errors"
2024-11-20 07:26:59 -05:00
"strings"
2024-11-02 11:39:02 -04:00
"sync"
"time"
2024-10-31 16:14:38 -04:00
2024-11-20 07:26:59 -05:00
"dynatron.me/x/stillbox/internal/common"
"dynatron.me/x/stillbox/pkg/config"
2024-11-03 07:19:03 -05:00
"dynatron.me/x/stillbox/pkg/database"
2024-11-02 09:41:48 -04:00
2024-11-02 11:39:02 -04:00
"github.com/jackc/pgx/v5"
2024-11-02 09:41:48 -04:00
"github.com/rs/zerolog/log"
2024-10-31 16:14:38 -04:00
)
2024-11-04 11:15:24 -05:00
type tgMap map[ID]*Talkgroup
2024-10-31 16:14:38 -04:00
2024-11-10 10:13:38 -05:00
var (
ErrNotFound = errors.New("talkgroup not found")
ErrNoSuchSystem = errors.New("no such system")
)
2024-11-03 08:09:49 -05:00
type Store interface {
2024-11-10 10:13:38 -05:00
// UpdateTG updates a talkgroup record.
UpdateTG(ctx context.Context, input database.UpdateTalkgroupParams) (*Talkgroup, error)
2024-11-20 07:26:59 -05:00
// UpsertTGs upserts a slice of talkgroups.
UpsertTGs(ctx context.Context, system int, input []database.UpsertTalkgroupParams) ([]*Talkgroup, error)
2024-11-03 09:45:51 -05:00
// TG retrieves a Talkgroup from the Store.
2024-11-04 11:15:24 -05:00
TG(ctx context.Context, tg ID) (*Talkgroup, error)
// TGs retrieves many talkgroups from the Store.
TGs(ctx context.Context, tgs IDs) ([]*Talkgroup, error)
2024-11-02 09:41:48 -04:00
2024-11-04 23:41:52 -05:00
// SystemTGs retrieves all Talkgroups associated with a System.
SystemTGs(ctx context.Context, systemID int32) ([]*Talkgroup, error)
2024-11-03 08:09:49 -05:00
// SystemName retrieves a system name from the store. It returns the record and whether one was found.
SystemName(ctx context.Context, id int) (string, bool)
2024-10-31 16:14:38 -04:00
2024-11-03 08:09:49 -05:00
// Hint hints the Store that the provided talkgroups will be asked for.
Hint(ctx context.Context, tgs []ID) error
2024-11-02 11:39:02 -04:00
2024-11-15 10:37:58 -05:00
// Load loads the provided talkgroup ID tuples into the Store.
Load(ctx context.Context, tgs database.TGTuples) error
2024-11-03 08:09:49 -05:00
// Invalidate invalidates any caching in the Store.
2024-11-02 11:39:02 -04:00
Invalidate()
2024-11-04 11:48:31 -05:00
// Weight returns the final weight of this talkgroup, including its static and rules-derived weight.
Weight(ctx context.Context, id ID, t time.Time) float64
// Hupper
HUP(*config.Config)
2024-11-02 11:39:02 -04:00
}
2024-11-10 10:28:04 -05:00
type storeCtxKey string
2024-11-03 09:45:51 -05:00
2024-11-10 10:28:04 -05:00
const StoreCtxKey storeCtxKey = "store"
2024-11-03 09:45:51 -05:00
func CtxWithStore(ctx context.Context, s Store) context.Context {
2024-11-10 10:28:04 -05:00
return context.WithValue(ctx, StoreCtxKey, s)
2024-11-03 09:45:51 -05:00
}
func StoreFrom(ctx context.Context) Store {
2024-11-10 10:28:04 -05:00
s, ok := ctx.Value(StoreCtxKey).(Store)
2024-11-03 09:45:51 -05:00
if !ok {
return NewCache()
}
return s
}
func (t *cache) HUP(_ *config.Config) {
t.Invalidate()
}
2024-11-03 08:09:49 -05:00
func (t *cache) Invalidate() {
2024-11-02 11:39:02 -04:00
t.Lock()
defer t.Unlock()
clear(t.tgs)
clear(t.systems)
}
2024-11-03 08:09:49 -05:00
type cache struct {
2024-11-02 11:39:02 -04:00
sync.RWMutex
2024-10-31 16:14:38 -04:00
tgs tgMap
systems map[int32]string
}
2024-11-03 08:09:49 -05:00
// NewCache returns a new cache Store.
func NewCache() Store {
tgc := &cache{
2024-11-12 09:57:38 -05:00
tgs: make(tgMap),
systems: make(map[int32]string),
2024-10-31 16:14:38 -04:00
}
2024-11-02 11:39:02 -04:00
return tgc
}
2024-11-03 08:09:49 -05:00
func (t *cache) Hint(ctx context.Context, tgs []ID) error {
2024-11-02 11:39:02 -04:00
t.RLock()
2024-11-15 10:37:58 -05:00
var toLoad database.TGTuples
2024-11-02 11:39:02 -04:00
if len(t.tgs) > len(tgs)/2 { // TODO: instrument this
for _, tg := range tgs {
_, ok := t.tgs[tg]
if !ok {
2024-11-15 10:37:58 -05:00
toLoad.Append(tg.System, tg.Talkgroup)
2024-11-02 11:39:02 -04:00
}
}
} else {
2024-11-15 10:37:58 -05:00
toLoad[0] = make([]uint32, 0, len(tgs))
toLoad[1] = make([]uint32, 0, len(tgs))
2024-11-02 11:39:02 -04:00
for _, g := range tgs {
2024-11-15 10:37:58 -05:00
toLoad.Append(g.System, g.Talkgroup)
2024-11-02 11:39:02 -04:00
}
}
if len(toLoad) > 0 {
t.RUnlock()
return t.Load(ctx, toLoad)
}
t.RUnlock()
return nil
2024-10-31 16:14:38 -04:00
}
2024-11-20 11:07:58 -05:00
func (t *cache) add(rec *Talkgroup) {
2024-11-03 14:11:38 -05:00
t.Lock()
defer t.Unlock()
2024-11-15 11:34:54 -05:00
tg := TG(rec.System.ID, rec.Talkgroup.TGID)
2024-11-02 11:39:02 -04:00
t.tgs[tg] = rec
2024-11-03 08:44:34 -05:00
t.systems[int32(rec.System.ID)] = rec.System.Name
2024-11-02 11:39:02 -04:00
}
2024-11-04 23:41:52 -05:00
type row interface {
2024-11-15 08:46:29 -05:00
database.GetTalkgroupsRow | database.GetTalkgroupsWithLearnedRow |
2024-11-15 10:37:58 -05:00
database.GetTalkgroupsWithLearnedBySystemRow | database.GetTalkgroupWithLearnedRow
2024-11-04 23:41:52 -05:00
GetTalkgroup() database.Talkgroup
GetSystem() database.System
GetLearned() bool
2024-11-03 09:45:51 -05:00
}
2024-11-04 23:41:52 -05:00
func rowToTalkgroup[T row](r T) *Talkgroup {
return &Talkgroup{
Talkgroup: r.GetTalkgroup(),
System: r.GetSystem(),
Learned: r.GetLearned(),
2024-11-04 11:15:24 -05:00
}
2024-11-04 23:41:52 -05:00
}
2024-11-04 11:15:24 -05:00
2024-11-20 11:07:58 -05:00
func addToRowList[T row](t *cache, r []*Talkgroup, tgRecords []T) []*Talkgroup {
2024-11-04 11:15:24 -05:00
for _, rec := range tgRecords {
tg := rowToTalkgroup(rec)
2024-11-20 11:07:58 -05:00
t.add(tg)
2024-11-04 11:15:24 -05:00
r = append(r, tg)
}
2024-11-20 11:07:58 -05:00
return r
2024-11-04 11:15:24 -05:00
}
2024-11-04 23:41:52 -05:00
func (t *cache) TGs(ctx context.Context, tgs IDs) ([]*Talkgroup, error) {
r := make([]*Talkgroup, 0, len(tgs))
var err error
if tgs != nil {
toGet := make(IDs, 0, len(tgs))
t.RLock()
for _, id := range tgs {
rec, has := t.tgs[id]
if has {
r = append(r, rec)
} else {
toGet = append(toGet, id)
}
}
t.RUnlock()
2024-11-15 10:37:58 -05:00
tgRecords, err := database.FromCtx(ctx).GetTalkgroupsWithLearnedBySysTGID(ctx, toGet.Tuples())
2024-11-04 23:41:52 -05:00
if err != nil {
return nil, err
}
2024-11-20 11:07:58 -05:00
return addToRowList(t, r, tgRecords), nil
2024-11-04 23:41:52 -05:00
}
// get all talkgroups
tgRecords, err := database.FromCtx(ctx).GetTalkgroupsWithLearned(ctx)
if err != nil {
return nil, err
}
2024-11-20 11:07:58 -05:00
return addToRowList(t, r, tgRecords), nil
2024-11-04 23:41:52 -05:00
}
2024-11-15 10:37:58 -05:00
func (t *cache) Load(ctx context.Context, tgs database.TGTuples) error {
tgRecords, err := database.FromCtx(ctx).GetTalkgroupsWithLearnedBySysTGID(ctx, tgs)
2024-10-31 16:14:38 -04:00
if err != nil {
return err
}
for _, rec := range tgRecords {
2024-11-20 11:07:58 -05:00
t.add(rowToTalkgroup(rec))
2024-10-31 16:14:38 -04:00
}
return nil
}
func (t *cache) Weight(ctx context.Context, id ID, tm time.Time) float64 {
tg, err := t.TG(ctx, id)
if err != nil {
return 1.0
}
m := float64(tg.Weight)
2024-11-12 09:57:38 -05:00
m *= tg.AlertConfig.Apply(tm)
2024-11-03 19:22:38 -05:00
return float64(m)
}
2024-11-04 23:41:52 -05:00
func (t *cache) SystemTGs(ctx context.Context, systemID int32) ([]*Talkgroup, error) {
recs, err := database.FromCtx(ctx).GetTalkgroupsWithLearnedBySystem(ctx, systemID)
if err != nil {
return nil, err
}
r := make([]*Talkgroup, 0, len(recs))
2024-11-20 11:07:58 -05:00
return addToRowList(t, r, recs), nil
2024-11-04 23:41:52 -05:00
}
2024-11-04 11:15:24 -05:00
func (t *cache) TG(ctx context.Context, tg ID) (*Talkgroup, error) {
2024-11-02 11:39:02 -04:00
t.RLock()
2024-10-31 16:14:38 -04:00
rec, has := t.tgs[tg]
2024-11-02 11:39:02 -04:00
t.RUnlock()
2024-10-31 16:14:38 -04:00
2024-11-02 11:39:02 -04:00
if has {
2024-11-03 09:45:51 -05:00
return rec, nil
2024-11-02 11:39:02 -04:00
}
2024-11-15 10:37:58 -05:00
record, err := database.FromCtx(ctx).GetTalkgroupWithLearned(ctx, int32(tg.System), int32(tg.Talkgroup))
2024-11-02 11:39:02 -04:00
switch err {
case nil:
case pgx.ErrNoRows:
2024-11-04 11:15:24 -05:00
return nil, ErrNotFound
2024-11-02 11:39:02 -04:00
default:
2024-11-17 21:46:10 -05:00
log.Error().Err(err).Uint32("sys", tg.System).Uint32("tg", tg.Talkgroup).Msg("TG() cache add db get")
2024-11-04 11:15:24 -05:00
return nil, errors.Join(ErrNotFound, err)
2024-11-02 11:39:02 -04:00
}
2024-11-20 11:07:58 -05:00
t.add(rowToTalkgroup(record))
2024-11-02 11:39:02 -04:00
2024-11-15 10:37:58 -05:00
return rowToTalkgroup(record), nil
2024-10-31 16:14:38 -04:00
}
2024-11-03 08:09:49 -05:00
func (t *cache) SystemName(ctx context.Context, id int) (name string, has bool) {
2024-11-03 14:11:38 -05:00
t.RLock()
2024-10-31 16:14:38 -04:00
n, has := t.systems[int32(id)]
2024-11-03 14:11:38 -05:00
t.RUnlock()
2024-11-02 11:39:02 -04:00
if !has {
sys, err := database.FromCtx(ctx).GetSystemName(ctx, id)
if err != nil {
return "", false
}
2024-11-03 14:11:38 -05:00
t.Lock()
t.systems[int32(id)] = sys
t.Unlock()
2024-11-02 11:39:02 -04:00
return sys, true
}
2024-10-31 16:14:38 -04:00
return n, has
}
2024-11-10 10:13:38 -05:00
func (t *cache) UpdateTG(ctx context.Context, input database.UpdateTalkgroupParams) (*Talkgroup, error) {
2024-11-14 08:37:19 -05:00
sysName, has := t.SystemName(ctx, int(*input.SystemID))
2024-11-10 10:13:38 -05:00
if !has {
return nil, ErrNoSuchSystem
}
tg, err := database.FromCtx(ctx).UpdateTalkgroup(ctx, input)
if err != nil {
return nil, err
}
record := &Talkgroup{
Talkgroup: tg,
System: database.System{ID: int(tg.SystemID), Name: sysName},
}
t.add(record)
return record, nil
}
2024-11-20 07:26:59 -05:00
func (t *cache) UpsertTGs(ctx context.Context, system int, input []database.UpsertTalkgroupParams) ([]*Talkgroup, error) {
db := database.FromCtx(ctx)
sysName, hasSys := t.SystemName(ctx, system)
if !hasSys {
return nil, ErrNoSuchSystem
}
sys := database.System{
ID: system,
2024-11-20 07:26:59 -05:00
Name: sysName,
}
tgs := make([]*Talkgroup, 0, len(input))
err := db.InTx(ctx, func(db database.Store) error {
for i := range input {
2024-11-20 07:26:59 -05:00
// normalize tags
for j, tag := range input[i].Tags {
input[i].Tags[j] = strings.ToLower(tag)
2024-11-20 07:26:59 -05:00
}
input[i].SystemID = int32(system)
input[i].Learned = common.PtrTo(false)
}
var oerr error
batch := db.UpsertTalkgroup(ctx, input)
defer batch.Close()
batch.QueryRow(func(_ int, r database.Talkgroup, err error) {
2024-11-20 07:26:59 -05:00
if err != nil {
oerr = err
return
2024-11-20 07:26:59 -05:00
}
tgs = append(tgs, &Talkgroup{
Talkgroup: r,
System: sys,
Learned: r.Learned,
2024-11-20 07:26:59 -05:00
})
})
if oerr != nil {
return oerr
2024-11-20 07:26:59 -05:00
}
return nil
}, pgx.TxOptions{})
if err != nil {
return nil, err
}
// update the cache
for _, tg := range tgs {
2024-11-20 11:07:58 -05:00
t.add(tg)
2024-11-20 07:26:59 -05:00
}
return tgs, nil
}