diff --git a/cmd/stillbox/main.go b/cmd/stillbox/main.go index 93235f1..b78b84e 100644 --- a/cmd/stillbox/main.go +++ b/cmd/stillbox/main.go @@ -9,8 +9,8 @@ import ( "dynatron.me/x/stillbox/internal/common" "dynatron.me/x/stillbox/internal/version" - "dynatron.me/x/stillbox/pkg/cmd/serve" "dynatron.me/x/stillbox/pkg/cmd/admin" + "dynatron.me/x/stillbox/pkg/cmd/serve" "dynatron.me/x/stillbox/pkg/config" "github.com/spf13/cobra" diff --git a/pkg/alerting/alerting.go b/pkg/alerting/alerting.go index b70fa3e..5d5d09d 100644 --- a/pkg/alerting/alerting.go +++ b/pkg/alerting/alerting.go @@ -11,11 +11,12 @@ import ( "text/template" "time" - cl "dynatron.me/x/stillbox/pkg/calls" + "dynatron.me/x/stillbox/pkg/calls" "dynatron.me/x/stillbox/pkg/config" "dynatron.me/x/stillbox/pkg/database" "dynatron.me/x/stillbox/pkg/notify" "dynatron.me/x/stillbox/pkg/sinks" + talkgroups "dynatron.me/x/stillbox/pkg/talkgroups" "dynatron.me/x/stillbox/internal/timeseries" "dynatron.me/x/stillbox/internal/trending" @@ -46,14 +47,14 @@ type alerter struct { sync.RWMutex clock timeseries.Clock cfg config.Alerting - scorer trending.Scorer[cl.Talkgroup] - scores trending.Scores[cl.Talkgroup] + scorer trending.Scorer[talkgroups.ID] + scores trending.Scores[talkgroups.ID] lastScore time.Time sim *Simulation - alertCache map[cl.Talkgroup]Alert + alertCache map[talkgroups.ID]Alert renotify time.Duration notifier notify.Notifier - tgCache cl.TalkgroupCache + tgCache talkgroups.Store } type offsetClock time.Duration @@ -88,14 +89,14 @@ func WithNotifier(n notify.Notifier) AlertOption { } // New creates a new Alerter using the provided configuration. -func New(cfg config.Alerting, tgCache cl.TalkgroupCache, opts ...AlertOption) Alerter { +func New(cfg config.Alerting, tgCache talkgroups.Store, opts ...AlertOption) Alerter { if !cfg.Enable { return &noopAlerter{} } as := &alerter{ cfg: cfg, - alertCache: make(map[cl.Talkgroup]Alert), + alertCache: make(map[talkgroups.ID]Alert), clock: timeseries.DefaultClock, renotify: DefaultRenotify, tgCache: tgCache, @@ -111,12 +112,12 @@ func New(cfg config.Alerting, tgCache cl.TalkgroupCache, opts ...AlertOption) Al as.scorer = trending.NewScorer( trending.WithTimeSeries(as.newTimeSeries), - trending.WithStorageDuration[cl.Talkgroup](time.Hour*24*time.Duration(cfg.LookbackDays)), - trending.WithRecentDuration[cl.Talkgroup](time.Duration(cfg.Recent)), - trending.WithHalfLife[cl.Talkgroup](time.Duration(cfg.HalfLife)), - trending.WithScoreThreshold[cl.Talkgroup](ScoreThreshold), - trending.WithCountThreshold[cl.Talkgroup](CountThreshold), - trending.WithClock[cl.Talkgroup](as.clock), + trending.WithStorageDuration[talkgroups.ID](time.Hour*24*time.Duration(cfg.LookbackDays)), + trending.WithRecentDuration[talkgroups.ID](time.Duration(cfg.Recent)), + trending.WithHalfLife[talkgroups.ID](time.Duration(cfg.HalfLife)), + trending.WithScoreThreshold[talkgroups.ID](ScoreThreshold), + trending.WithCountThreshold[talkgroups.ID](CountThreshold), + trending.WithClock[talkgroups.ID](as.clock), ) return as @@ -167,12 +168,12 @@ func (as *alerter) eval(ctx context.Context, now time.Time, testMode bool) ([]Al var notifications []Alert for _, s := range as.scores { origScore := s.Score - tgr, has := as.tgCache.TG(ctx, s.ID) - if has { - if !tgr.Alert { + tgr, err := as.tgCache.TG(ctx, s.ID) + if err == nil { + if !tgr.Talkgroup.Alert { continue } - s.Score *= float64(tgr.Weight) + s.Score *= float64(tgr.Talkgroup.Weight) } if s.Score > as.cfg.AlertThreshold || testMode { @@ -231,8 +232,8 @@ func (as *alerter) testNotifyHandler(w http.ResponseWriter, r *http.Request) { } // scoredTGs gets a list of TGs. -func (as *alerter) scoredTGs() []cl.Talkgroup { - tgs := make([]cl.Talkgroup, 0, len(as.scores)) +func (as *alerter) scoredTGs() []talkgroups.ID { + tgs := make([]talkgroups.ID, 0, len(as.scores)) for _, s := range as.scores { tgs = append(tgs, s.ID) } @@ -275,7 +276,7 @@ type Alert struct { ID uuid.UUID Timestamp time.Time TGName string - Score trending.Score[cl.Talkgroup] + Score trending.Score[talkgroups.ID] OrigScore float64 Weight float32 Suppressed bool @@ -317,7 +318,7 @@ func (as *alerter) sendNotification(ctx context.Context, n []Alert) error { // makeAlert creates a notification for later rendering by the template. // It takes a talkgroup Score as input. -func (as *alerter) makeAlert(ctx context.Context, score trending.Score[cl.Talkgroup], origScore float64) (Alert, error) { +func (as *alerter) makeAlert(ctx context.Context, score trending.Score[talkgroups.ID], origScore float64) (Alert, error) { d := Alert{ ID: uuid.New(), Score: score, @@ -326,20 +327,20 @@ func (as *alerter) makeAlert(ctx context.Context, score trending.Score[cl.Talkgr OrigScore: origScore, } - tgRecord, has := as.tgCache.TG(ctx, score.ID) - switch has { - case true: - d.Weight = tgRecord.Weight - if tgRecord.SystemName == "" { - tgRecord.SystemName = strconv.Itoa(int(score.ID.System)) + tgRecord, err := as.tgCache.TG(ctx, score.ID) + switch err { + case nil: + d.Weight = tgRecord.Talkgroup.Weight + if tgRecord.System.Name == "" { + tgRecord.System.Name = strconv.Itoa(int(score.ID.System)) } - if tgRecord.Name != nil { - d.TGName = fmt.Sprintf("%s %s (%d)", tgRecord.SystemName, *tgRecord.Name, score.ID.Talkgroup) + if tgRecord.Talkgroup.Name != nil { + d.TGName = fmt.Sprintf("%s %s (%d)", tgRecord.System.Name, *tgRecord.Talkgroup.Name, score.ID.Talkgroup) } else { - d.TGName = fmt.Sprintf("%s:%d", tgRecord.SystemName, int(score.ID.Talkgroup)) + d.TGName = fmt.Sprintf("%s:%d", tgRecord.System.Name, int(score.ID.Talkgroup)) } - case false: + default: system, has := as.tgCache.SystemName(ctx, int(score.ID.System)) if has { d.TGName = fmt.Sprintf("%s:%d", system, int(score.ID.Talkgroup)) @@ -369,7 +370,7 @@ func (as *alerter) cleanCache() { } } -func (as *alerter) newTimeSeries(id cl.Talkgroup) trending.TimeSeries { +func (as *alerter) newTimeSeries(id talkgroups.ID) trending.TimeSeries { ts, _ := timeseries.NewTimeSeries(timeseries.WithGranularities( []timeseries.Granularity{ {Granularity: time.Second, Count: 60}, @@ -417,7 +418,7 @@ func (as *alerter) backfill(ctx context.Context, since time.Time, until time.Tim defer as.Unlock() for rows.Next() { - var tg cl.Talkgroup + var tg talkgroups.ID var callDate time.Time if err := rows.Scan(&tg.System, &tg.Talkgroup, &callDate); err != nil { return count, err @@ -440,7 +441,7 @@ func (as *alerter) SinkType() string { return "alerting" } -func (as *alerter) Call(ctx context.Context, call *cl.Call) error { +func (as *alerter) Call(ctx context.Context, call *calls.Call) error { as.Lock() defer as.Unlock() as.scorer.AddEvent(call.TalkgroupTuple(), call.DateTime) @@ -453,7 +454,7 @@ func (*alerter) Enabled() bool { return true } // noopAlerter is used when alerting is disabled. type noopAlerter struct{} -func (*noopAlerter) SinkType() string { return "noopAlerter" } -func (*noopAlerter) Call(_ context.Context, _ *cl.Call) error { return nil } -func (*noopAlerter) Go(_ context.Context) {} -func (*noopAlerter) Enabled() bool { return false } +func (*noopAlerter) SinkType() string { return "noopAlerter" } +func (*noopAlerter) Call(_ context.Context, _ *calls.Call) error { return nil } +func (*noopAlerter) Go(_ context.Context) {} +func (*noopAlerter) Enabled() bool { return false } diff --git a/pkg/alerting/simulate.go b/pkg/alerting/simulate.go index 326b7b5..6d646fa 100644 --- a/pkg/alerting/simulate.go +++ b/pkg/alerting/simulate.go @@ -12,8 +12,8 @@ import ( "dynatron.me/x/stillbox/internal/forms" "dynatron.me/x/stillbox/internal/jsontime" "dynatron.me/x/stillbox/internal/trending" - cl "dynatron.me/x/stillbox/pkg/calls" "dynatron.me/x/stillbox/pkg/config" + "dynatron.me/x/stillbox/pkg/talkgroups" "github.com/rs/zerolog/log" ) @@ -58,9 +58,9 @@ func (s *Simulation) stepClock(t time.Time) { } // Simulate begins the simulation using the DB handle from ctx. It returns final scores. -func (s *Simulation) Simulate(ctx context.Context) (trending.Scores[cl.Talkgroup], error) { +func (s *Simulation) Simulate(ctx context.Context) (trending.Scores[talkgroups.ID], error) { now := time.Now() - tgc := cl.NewTalkgroupCache() + tgc := talkgroups.NewCache() s.Enable = true s.alerter = New(s.Alerting, tgc, WithClock(&s.clock)).(*alerter) diff --git a/pkg/alerting/stats.go b/pkg/alerting/stats.go index 80d7777..e513edd 100644 --- a/pkg/alerting/stats.go +++ b/pkg/alerting/stats.go @@ -7,9 +7,9 @@ import ( "net/http" "time" - "dynatron.me/x/stillbox/pkg/calls" "dynatron.me/x/stillbox/pkg/config" "dynatron.me/x/stillbox/pkg/database" + "dynatron.me/x/stillbox/pkg/talkgroups" "dynatron.me/x/stillbox/internal/common" "dynatron.me/x/stillbox/internal/jsontime" @@ -76,14 +76,14 @@ func (as *alerter) tgStatsHandler(w http.ResponseWriter, r *http.Request) { return } - tgMap := make(map[calls.Talkgroup]database.GetTalkgroupsByPackedIDsRow, len(tgs)) + tgMap := make(map[talkgroups.ID]database.GetTalkgroupsByPackedIDsRow, len(tgs)) for _, t := range tgs { - tgMap[calls.Talkgroup{System: uint32(t.SystemID), Talkgroup: uint32(t.ID)}] = t + tgMap[talkgroups.ID{System: uint32(t.System.ID), Talkgroup: uint32(t.Talkgroup.ID)}] = t } renderData := struct { - TGs map[calls.Talkgroup]database.GetTalkgroupsByPackedIDsRow - Scores trending.Scores[calls.Talkgroup] + TGs map[talkgroups.ID]database.GetTalkgroupsByPackedIDsRow + Scores trending.Scores[talkgroups.ID] LastScore time.Time Simulation *Simulation Config config.Alerting diff --git a/pkg/calls/call.go b/pkg/calls/call.go index ea9c3a2..9b8e78d 100644 --- a/pkg/calls/call.go +++ b/pkg/calls/call.go @@ -7,6 +7,7 @@ import ( "dynatron.me/x/stillbox/internal/audio" "dynatron.me/x/stillbox/pkg/auth" "dynatron.me/x/stillbox/pkg/pb" + "dynatron.me/x/stillbox/pkg/talkgroups" "google.golang.org/protobuf/types/known/timestamppb" ) @@ -128,3 +129,7 @@ func (c *Call) computeLength() (err error) { return nil } + +func (c *Call) TalkgroupTuple() talkgroups.ID { + return talkgroups.TG(c.System, c.Talkgroup) +} diff --git a/pkg/calls/filter.go b/pkg/calls/filter.go index fe45cd4..ecc1fc6 100644 --- a/pkg/calls/filter.go +++ b/pkg/calls/filter.go @@ -5,16 +5,18 @@ import ( "dynatron.me/x/stillbox/pkg/database" "dynatron.me/x/stillbox/pkg/pb" + + tgs "dynatron.me/x/stillbox/pkg/talkgroups" ) type TalkgroupFilter struct { - Talkgroups []Talkgroup `json:"talkgroups,omitempty"` - TalkgroupsNot []Talkgroup `json:"talkgroupsNot,omitempty"` - TalkgroupTagsAll []string `json:"talkgroupTagsAll,omitempty"` - TalkgroupTagsAny []string `json:"talkgroupTagsAny,omitempty"` - TalkgroupTagsNot []string `json:"talkgroupTagsNot,omitempty"` + Talkgroups []tgs.ID `json:"talkgroups,omitempty"` + TalkgroupsNot []tgs.ID `json:"talkgroupsNot,omitempty"` + TalkgroupTagsAll []string `json:"talkgroupTagsAll,omitempty"` + TalkgroupTagsAny []string `json:"talkgroupTagsAny,omitempty"` + TalkgroupTagsNot []string `json:"talkgroupTagsNot,omitempty"` - talkgroups map[Talkgroup]bool + talkgroups map[tgs.ID]bool } func TalkgroupFilterFromPB(ctx context.Context, p *pb.Filter) (*TalkgroupFilter, error) { @@ -25,9 +27,9 @@ func TalkgroupFilterFromPB(ctx context.Context, p *pb.Filter) (*TalkgroupFilter, } if l := len(p.Talkgroups); l > 0 { - tgf.Talkgroups = make([]Talkgroup, l) + tgf.Talkgroups = make([]tgs.ID, l) for i, t := range p.Talkgroups { - tgf.Talkgroups[i] = Talkgroup{ + tgf.Talkgroups[i] = tgs.ID{ System: uint32(t.System), Talkgroup: uint32(t.Talkgroup), } @@ -35,9 +37,9 @@ func TalkgroupFilterFromPB(ctx context.Context, p *pb.Filter) (*TalkgroupFilter, } if l := len(p.TalkgroupsNot); l > 0 { - tgf.TalkgroupsNot = make([]Talkgroup, l) + tgf.TalkgroupsNot = make([]tgs.ID, l) for i, t := range p.TalkgroupsNot { - tgf.TalkgroupsNot[i] = Talkgroup{ + tgf.TalkgroupsNot[i] = tgs.ID{ System: uint32(t.System), Talkgroup: uint32(t.Talkgroup), } @@ -51,12 +53,12 @@ func (f *TalkgroupFilter) hasTags() bool { return len(f.TalkgroupTagsAny) > 0 || len(f.TalkgroupTagsAll) > 0 || len(f.TalkgroupTagsNot) > 0 } -func (f *TalkgroupFilter) GetFinalTalkgroups() map[Talkgroup]bool { +func (f *TalkgroupFilter) GetFinalTalkgroups() map[tgs.ID]bool { return f.talkgroups } func (f *TalkgroupFilter) compile(ctx context.Context) error { - f.talkgroups = make(map[Talkgroup]bool) + f.talkgroups = make(map[tgs.ID]bool) for _, tg := range f.Talkgroups { f.talkgroups[tg] = true } @@ -69,7 +71,7 @@ func (f *TalkgroupFilter) compile(ctx context.Context) error { } for _, tg := range tagTGs { - f.talkgroups[Talkgroup{System: uint32(tg.SystemID), Talkgroup: uint32(tg.Tgid)}] = true + f.talkgroups[tgs.ID{System: uint32(tg.SystemID), Talkgroup: uint32(tg.Tgid)}] = true } } diff --git a/pkg/calls/talkgroups.go b/pkg/calls/talkgroups.go deleted file mode 100644 index 6d554b7..0000000 --- a/pkg/calls/talkgroups.go +++ /dev/null @@ -1,191 +0,0 @@ -package calls - -import ( - "context" - "fmt" - "sync" - "time" - - "dynatron.me/x/stillbox/pkg/database" - - "dynatron.me/x/stillbox/internal/ruletime" - "dynatron.me/x/stillbox/internal/trending" - - "github.com/jackc/pgx/v5" - "github.com/rs/zerolog/log" -) - -type Talkgroup struct { - System uint32 - Talkgroup uint32 -} - -func (c *Call) TalkgroupTuple() Talkgroup { - return Talkgroup{System: uint32(c.System), Talkgroup: uint32(c.Talkgroup)} -} - -func TG[T int | uint | int64 | uint64 | int32 | uint32](sys, tgid T) Talkgroup { - return Talkgroup{ - System: uint32(sys), - Talkgroup: uint32(tgid), - } -} - -func (t Talkgroup) Pack() int64 { - // P25 system IDs are 12 bits, so we can fit them in a signed 8 byte int (int64, pg INT8) - return int64((int64(t.System) << 32) | int64(t.Talkgroup)) -} - -func (t Talkgroup) String() string { - return fmt.Sprintf("%d:%d", t.System, t.Talkgroup) -} - -func PackedTGs(tg []Talkgroup) []int64 { - s := make([]int64, len(tg)) - - for i, v := range tg { - s[i] = v.Pack() - } - - return s -} - -type tgMap map[Talkgroup]database.GetTalkgroupWithLearnedByPackedIDsRow - -type TalkgroupCache interface { - TG(ctx context.Context, tg Talkgroup) (database.GetTalkgroupWithLearnedByPackedIDsRow, bool) - SystemName(ctx context.Context, id int) (string, bool) - ApplyAlertRules(score trending.Score[Talkgroup], t time.Time, coversOpts ...ruletime.CoversOption) float64 - Hint(ctx context.Context, tgs []Talkgroup) error - Load(ctx context.Context, tgs []int64) error - Invalidate() -} - -func (t *talkgroupCache) Invalidate() { - t.Lock() - defer t.Unlock() - clear(t.tgs) - clear(t.systems) - clear(t.AlertConfig) -} - -type talkgroupCache struct { - sync.RWMutex - AlertConfig - tgs tgMap - systems map[int32]string -} - -func NewTalkgroupCache() TalkgroupCache { - tgc := &talkgroupCache{ - tgs: make(tgMap), - systems: make(map[int32]string), - AlertConfig: make(AlertConfig), - } - - return tgc -} - -func (t *talkgroupCache) Hint(ctx context.Context, tgs []Talkgroup) error { - t.RLock() - var toLoad []int64 - if len(t.tgs) > len(tgs)/2 { // TODO: instrument this - for _, tg := range tgs { - _, ok := t.tgs[tg] - if !ok { - toLoad = append(toLoad, tg.Pack()) - } - } - - } else { - toLoad = make([]int64, 0, len(tgs)) - for _, g := range tgs { - toLoad = append(toLoad, g.Pack()) - } - } - - if len(toLoad) > 0 { - t.RUnlock() - return t.Load(ctx, toLoad) - } - - t.RUnlock() - return nil -} - -func (t *talkgroupCache) add(rec database.GetTalkgroupWithLearnedByPackedIDsRow) error { - tg := TG(rec.SystemID, rec.Tgid) - t.tgs[tg] = rec - t.systems[rec.SystemID] = rec.SystemName - - return t.AlertConfig.AddAlertConfig(tg, rec.AlertConfig) -} - -func (t *talkgroupCache) Load(ctx context.Context, tgs []int64) error { - tgRecords, err := database.FromCtx(ctx).GetTalkgroupWithLearnedByPackedIDs(ctx, tgs) - if err != nil { - return err - } - - t.Lock() - defer t.Unlock() - - for _, rec := range tgRecords { - err := t.add(rec) - - if err != nil { - log.Error().Err(err).Msg("add alert config fail") - } - } - - return nil -} - -func (t *talkgroupCache) TG(ctx context.Context, tg Talkgroup) (database.GetTalkgroupWithLearnedByPackedIDsRow, bool) { - t.RLock() - rec, has := t.tgs[tg] - t.RUnlock() - - if has { - return rec, has - } - - recs, err := database.FromCtx(ctx).GetTalkgroupWithLearnedByPackedIDs(ctx, []int64{tg.Pack()}) - switch err { - case nil: - case pgx.ErrNoRows: - return database.GetTalkgroupWithLearnedByPackedIDsRow{}, false - default: - log.Error().Err(err).Msg("TG() cache add db get") - return database.GetTalkgroupWithLearnedByPackedIDsRow{}, false - } - - if len(recs) < 1 { - return database.GetTalkgroupWithLearnedByPackedIDsRow{}, false - } - - t.Lock() - defer t.Unlock() - err = t.add(recs[0]) - if err != nil { - log.Error().Err(err).Msg("TG() cache add") - return recs[0], false - } - - return recs[0], true -} - -func (t *talkgroupCache) SystemName(ctx context.Context, id int) (name string, has bool) { - n, has := t.systems[int32(id)] - - if !has { - sys, err := database.FromCtx(ctx).GetSystemName(ctx, id) - if err != nil { - return "", false - } - - return sys, true - } - - return n, has -} diff --git a/pkg/database/querier.go b/pkg/database/querier.go index b911805..4bf2177 100644 --- a/pkg/database/querier.go +++ b/pkg/database/querier.go @@ -22,14 +22,14 @@ type Querier interface { GetAPIKey(ctx context.Context, apiKey string) (ApiKey, error) GetDatabaseSize(ctx context.Context) (string, error) GetSystemName(ctx context.Context, systemID int) (string, error) - GetTalkgroup(ctx context.Context, systemID int, tgid int) (Talkgroup, error) + GetTalkgroup(ctx context.Context, systemID int, tgid int) (GetTalkgroupRow, error) GetTalkgroupIDsByTags(ctx context.Context, anytags []string, alltags []string, nottags []string) ([]GetTalkgroupIDsByTagsRow, error) GetTalkgroupTags(ctx context.Context, sys int, tg int) ([]string, error) GetTalkgroupWithLearned(ctx context.Context, systemID int, tgid int) (GetTalkgroupWithLearnedRow, error) GetTalkgroupWithLearnedByPackedIDs(ctx context.Context, dollar_1 []int64) ([]GetTalkgroupWithLearnedByPackedIDsRow, error) GetTalkgroupsByPackedIDs(ctx context.Context, dollar_1 []int64) ([]GetTalkgroupsByPackedIDsRow, error) - GetTalkgroupsWithAllTags(ctx context.Context, tags []string) ([]Talkgroup, error) - GetTalkgroupsWithAnyTags(ctx context.Context, tags []string) ([]Talkgroup, error) + GetTalkgroupsWithAllTags(ctx context.Context, tags []string) ([]GetTalkgroupsWithAllTagsRow, error) + GetTalkgroupsWithAnyTags(ctx context.Context, tags []string) ([]GetTalkgroupsWithAnyTagsRow, error) GetUserByID(ctx context.Context, id int32) (User, error) GetUserByUID(ctx context.Context, id int32) (User, error) GetUserByUsername(ctx context.Context, username string) (User, error) diff --git a/pkg/database/talkgroups.sql.go b/pkg/database/talkgroups.sql.go index 0b58f8a..8700be2 100644 --- a/pkg/database/talkgroups.sql.go +++ b/pkg/database/talkgroups.sql.go @@ -31,26 +31,30 @@ func (q *Queries) GetSystemName(ctx context.Context, systemID int) (string, erro } const getTalkgroup = `-- name: GetTalkgroup :one -SELECT id, system_id, tgid, name, alpha_tag, tg_group, frequency, metadata, tags, alert, alert_config, weight FROM talkgroups +SELECT talkgroups.id, talkgroups.system_id, talkgroups.tgid, talkgroups.name, talkgroups.alpha_tag, talkgroups.tg_group, talkgroups.frequency, talkgroups.metadata, talkgroups.tags, talkgroups.alert, talkgroups.alert_config, talkgroups.weight FROM talkgroups WHERE id = systg2id($1, $2) ` -func (q *Queries) GetTalkgroup(ctx context.Context, systemID int, tgid int) (Talkgroup, error) { +type GetTalkgroupRow struct { + Talkgroup Talkgroup `json:"talkgroup"` +} + +func (q *Queries) GetTalkgroup(ctx context.Context, systemID int, tgid int) (GetTalkgroupRow, error) { row := q.db.QueryRow(ctx, getTalkgroup, systemID, tgid) - var i Talkgroup + var i GetTalkgroupRow err := row.Scan( - &i.ID, - &i.SystemID, - &i.Tgid, - &i.Name, - &i.AlphaTag, - &i.TgGroup, - &i.Frequency, - &i.Metadata, - &i.Tags, - &i.Alert, - &i.AlertConfig, - &i.Weight, + &i.Talkgroup.ID, + &i.Talkgroup.SystemID, + &i.Talkgroup.Tgid, + &i.Talkgroup.Name, + &i.Talkgroup.AlphaTag, + &i.Talkgroup.TgGroup, + &i.Talkgroup.Frequency, + &i.Talkgroup.Metadata, + &i.Talkgroup.Tags, + &i.Talkgroup.Alert, + &i.Talkgroup.AlertConfig, + &i.Talkgroup.Weight, ) return i, err } @@ -101,19 +105,17 @@ func (q *Queries) GetTalkgroupTags(ctx context.Context, sys int, tg int) ([]stri const getTalkgroupWithLearned = `-- name: GetTalkgroupWithLearned :one SELECT -tg.id, tg.system_id, sys.name system_name, tg.tgid, tg.name, -tg.tg_group, tg.frequency, tg.metadata, tg.tags, tg.alpha_tag, -tg.alert, tg.weight, tg.alert_config, +tg.id, tg.system_id, tg.tgid, tg.name, tg.alpha_tag, tg.tg_group, tg.frequency, tg.metadata, tg.tags, tg.alert, tg.alert_config, tg.weight, sys.id, sys.name, FALSE learned FROM talkgroups tg JOIN systems sys ON tg.system_id = sys.id WHERE tg.id = systg2id($1, $2) UNION SELECT -tgl.id::INT8, tgl.system_id::INT4, sys.name system_name, tgl.tgid::INT4, tgl.name, -tgl.alpha_tag, NULL::INTEGER, NULL::JSONB, -CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END, tgl.alpha_tag, -TRUE, 1.0, NULL::JSONB, +tgl.id::INT8, tgl.system_id::INT4, tgl.tgid::INT4, tgl.name, +tgl.alpha_tag, tgl.alpha_tag, NULL::INTEGER, NULL::JSONB, +CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END, +TRUE, NULL::JSONB, 1.0, sys.id, sys.name, TRUE learned FROM talkgroups_learned tgl JOIN systems sys ON tgl.system_id = sys.id @@ -121,39 +123,29 @@ WHERE tgl.system_id = $1 AND tgl.tgid = $2 AND ignored IS NOT TRUE ` type GetTalkgroupWithLearnedRow struct { - ID int64 `json:"id"` - SystemID int32 `json:"system_id"` - SystemName string `json:"system_name"` - Tgid int32 `json:"tgid"` - Name *string `json:"name"` - TgGroup *string `json:"tg_group"` - Frequency *int32 `json:"frequency"` - Metadata []byte `json:"metadata"` - Tags []string `json:"tags"` - AlphaTag *string `json:"alpha_tag"` - Alert bool `json:"alert"` - Weight float32 `json:"weight"` - AlertConfig []byte `json:"alert_config"` - Learned bool `json:"learned"` + Talkgroup Talkgroup `json:"talkgroup"` + System System `json:"system"` + Learned bool `json:"learned"` } func (q *Queries) GetTalkgroupWithLearned(ctx context.Context, systemID int, tgid int) (GetTalkgroupWithLearnedRow, error) { row := q.db.QueryRow(ctx, getTalkgroupWithLearned, systemID, tgid) var i GetTalkgroupWithLearnedRow err := row.Scan( - &i.ID, - &i.SystemID, - &i.SystemName, - &i.Tgid, - &i.Name, - &i.TgGroup, - &i.Frequency, - &i.Metadata, - &i.Tags, - &i.AlphaTag, - &i.Alert, - &i.Weight, - &i.AlertConfig, + &i.Talkgroup.ID, + &i.Talkgroup.SystemID, + &i.Talkgroup.Tgid, + &i.Talkgroup.Name, + &i.Talkgroup.AlphaTag, + &i.Talkgroup.TgGroup, + &i.Talkgroup.Frequency, + &i.Talkgroup.Metadata, + &i.Talkgroup.Tags, + &i.Talkgroup.Alert, + &i.Talkgroup.AlertConfig, + &i.Talkgroup.Weight, + &i.System.ID, + &i.System.Name, &i.Learned, ) return i, err @@ -161,19 +153,17 @@ func (q *Queries) GetTalkgroupWithLearned(ctx context.Context, systemID int, tgi const getTalkgroupWithLearnedByPackedIDs = `-- name: GetTalkgroupWithLearnedByPackedIDs :many SELECT -tg.id, tg.system_id, sys.name system_name, tg.tgid, tg.name, -tg.tg_group, tg.frequency, tg.metadata, tg.tags, tg.alpha_tag, -tg.alert, tg.weight, tg.alert_config, +tg.id, tg.system_id, tg.tgid, tg.name, tg.alpha_tag, tg.tg_group, tg.frequency, tg.metadata, tg.tags, tg.alert, tg.alert_config, tg.weight, sys.id, sys.name, FALSE learned FROM talkgroups tg JOIN systems sys ON tg.system_id = sys.id WHERE tg.id = ANY($1::INT8[]) UNION SELECT -tgl.id::INT8, tgl.system_id::INT4, sys.name system_name, tgl.tgid::INT4, tgl.name, -tgl.alpha_tag, NULL::INTEGER, NULL::JSONB, -CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END, tgl.alpha_tag, -TRUE, 1.0, NULL::JSONB, +tgl.id::INT8, tgl.system_id::INT4, tgl.tgid::INT4, tgl.name, +tgl.alpha_tag, tgl.alpha_tag, NULL::INTEGER, NULL::JSONB, +CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END, +TRUE, NULL::JSONB, 1.0, sys.id, sys.name, TRUE learned FROM talkgroups_learned tgl JOIN systems sys ON tgl.system_id = sys.id @@ -181,20 +171,9 @@ WHERE systg2id(tgl.system_id, tgl.tgid) = ANY($1::INT8[]) AND ignored IS NOT TRU ` type GetTalkgroupWithLearnedByPackedIDsRow struct { - ID int64 `json:"id"` - SystemID int32 `json:"system_id"` - SystemName string `json:"system_name"` - Tgid int32 `json:"tgid"` - Name *string `json:"name"` - TgGroup *string `json:"tg_group"` - Frequency *int32 `json:"frequency"` - Metadata []byte `json:"metadata"` - Tags []string `json:"tags"` - AlphaTag *string `json:"alpha_tag"` - Alert bool `json:"alert"` - Weight float32 `json:"weight"` - AlertConfig []byte `json:"alert_config"` - Learned bool `json:"learned"` + Talkgroup Talkgroup `json:"talkgroup"` + System System `json:"system"` + Learned bool `json:"learned"` } func (q *Queries) GetTalkgroupWithLearnedByPackedIDs(ctx context.Context, dollar_1 []int64) ([]GetTalkgroupWithLearnedByPackedIDsRow, error) { @@ -207,19 +186,20 @@ func (q *Queries) GetTalkgroupWithLearnedByPackedIDs(ctx context.Context, dollar for rows.Next() { var i GetTalkgroupWithLearnedByPackedIDsRow if err := rows.Scan( - &i.ID, - &i.SystemID, - &i.SystemName, - &i.Tgid, - &i.Name, - &i.TgGroup, - &i.Frequency, - &i.Metadata, - &i.Tags, - &i.AlphaTag, - &i.Alert, - &i.Weight, - &i.AlertConfig, + &i.Talkgroup.ID, + &i.Talkgroup.SystemID, + &i.Talkgroup.Tgid, + &i.Talkgroup.Name, + &i.Talkgroup.AlphaTag, + &i.Talkgroup.TgGroup, + &i.Talkgroup.Frequency, + &i.Talkgroup.Metadata, + &i.Talkgroup.Tags, + &i.Talkgroup.Alert, + &i.Talkgroup.AlertConfig, + &i.Talkgroup.Weight, + &i.System.ID, + &i.System.Name, &i.Learned, ); err != nil { return nil, err @@ -233,26 +213,14 @@ func (q *Queries) GetTalkgroupWithLearnedByPackedIDs(ctx context.Context, dollar } const getTalkgroupsByPackedIDs = `-- name: GetTalkgroupsByPackedIDs :many -SELECT tg.id, system_id, tgid, tg.name, alpha_tag, tg_group, frequency, metadata, tags, alert, alert_config, weight, sys.id, sys.name FROM talkgroups tg +SELECT tg.id, tg.system_id, tg.tgid, tg.name, tg.alpha_tag, tg.tg_group, tg.frequency, tg.metadata, tg.tags, tg.alert, tg.alert_config, tg.weight, sys.id, sys.name FROM talkgroups tg JOIN systems sys ON tg.system_id = sys.id WHERE tg.id = ANY($1::INT8[]) ` type GetTalkgroupsByPackedIDsRow struct { - ID int64 `json:"id"` - SystemID int32 `json:"system_id"` - Tgid int32 `json:"tgid"` - Name *string `json:"name"` - AlphaTag *string `json:"alpha_tag"` - TgGroup *string `json:"tg_group"` - Frequency *int32 `json:"frequency"` - Metadata []byte `json:"metadata"` - Tags []string `json:"tags"` - Alert bool `json:"alert"` - AlertConfig []byte `json:"alert_config"` - Weight float32 `json:"weight"` - ID_2 int `json:"id_2"` - Name_2 string `json:"name_2"` + Talkgroup Talkgroup `json:"talkgroup"` + System System `json:"system"` } func (q *Queries) GetTalkgroupsByPackedIDs(ctx context.Context, dollar_1 []int64) ([]GetTalkgroupsByPackedIDsRow, error) { @@ -265,20 +233,20 @@ func (q *Queries) GetTalkgroupsByPackedIDs(ctx context.Context, dollar_1 []int64 for rows.Next() { var i GetTalkgroupsByPackedIDsRow if err := rows.Scan( - &i.ID, - &i.SystemID, - &i.Tgid, - &i.Name, - &i.AlphaTag, - &i.TgGroup, - &i.Frequency, - &i.Metadata, - &i.Tags, - &i.Alert, - &i.AlertConfig, - &i.Weight, - &i.ID_2, - &i.Name_2, + &i.Talkgroup.ID, + &i.Talkgroup.SystemID, + &i.Talkgroup.Tgid, + &i.Talkgroup.Name, + &i.Talkgroup.AlphaTag, + &i.Talkgroup.TgGroup, + &i.Talkgroup.Frequency, + &i.Talkgroup.Metadata, + &i.Talkgroup.Tags, + &i.Talkgroup.Alert, + &i.Talkgroup.AlertConfig, + &i.Talkgroup.Weight, + &i.System.ID, + &i.System.Name, ); err != nil { return nil, err } @@ -291,32 +259,36 @@ func (q *Queries) GetTalkgroupsByPackedIDs(ctx context.Context, dollar_1 []int64 } const getTalkgroupsWithAllTags = `-- name: GetTalkgroupsWithAllTags :many -SELECT id, system_id, tgid, name, alpha_tag, tg_group, frequency, metadata, tags, alert, alert_config, weight FROM talkgroups +SELECT talkgroups.id, talkgroups.system_id, talkgroups.tgid, talkgroups.name, talkgroups.alpha_tag, talkgroups.tg_group, talkgroups.frequency, talkgroups.metadata, talkgroups.tags, talkgroups.alert, talkgroups.alert_config, talkgroups.weight FROM talkgroups WHERE tags && ARRAY[$1] ` -func (q *Queries) GetTalkgroupsWithAllTags(ctx context.Context, tags []string) ([]Talkgroup, error) { +type GetTalkgroupsWithAllTagsRow struct { + Talkgroup Talkgroup `json:"talkgroup"` +} + +func (q *Queries) GetTalkgroupsWithAllTags(ctx context.Context, tags []string) ([]GetTalkgroupsWithAllTagsRow, error) { rows, err := q.db.Query(ctx, getTalkgroupsWithAllTags, tags) if err != nil { return nil, err } defer rows.Close() - var items []Talkgroup + var items []GetTalkgroupsWithAllTagsRow for rows.Next() { - var i Talkgroup + var i GetTalkgroupsWithAllTagsRow if err := rows.Scan( - &i.ID, - &i.SystemID, - &i.Tgid, - &i.Name, - &i.AlphaTag, - &i.TgGroup, - &i.Frequency, - &i.Metadata, - &i.Tags, - &i.Alert, - &i.AlertConfig, - &i.Weight, + &i.Talkgroup.ID, + &i.Talkgroup.SystemID, + &i.Talkgroup.Tgid, + &i.Talkgroup.Name, + &i.Talkgroup.AlphaTag, + &i.Talkgroup.TgGroup, + &i.Talkgroup.Frequency, + &i.Talkgroup.Metadata, + &i.Talkgroup.Tags, + &i.Talkgroup.Alert, + &i.Talkgroup.AlertConfig, + &i.Talkgroup.Weight, ); err != nil { return nil, err } @@ -329,32 +301,36 @@ func (q *Queries) GetTalkgroupsWithAllTags(ctx context.Context, tags []string) ( } const getTalkgroupsWithAnyTags = `-- name: GetTalkgroupsWithAnyTags :many -SELECT id, system_id, tgid, name, alpha_tag, tg_group, frequency, metadata, tags, alert, alert_config, weight FROM talkgroups +SELECT talkgroups.id, talkgroups.system_id, talkgroups.tgid, talkgroups.name, talkgroups.alpha_tag, talkgroups.tg_group, talkgroups.frequency, talkgroups.metadata, talkgroups.tags, talkgroups.alert, talkgroups.alert_config, talkgroups.weight FROM talkgroups WHERE tags @> ARRAY[$1] ` -func (q *Queries) GetTalkgroupsWithAnyTags(ctx context.Context, tags []string) ([]Talkgroup, error) { +type GetTalkgroupsWithAnyTagsRow struct { + Talkgroup Talkgroup `json:"talkgroup"` +} + +func (q *Queries) GetTalkgroupsWithAnyTags(ctx context.Context, tags []string) ([]GetTalkgroupsWithAnyTagsRow, error) { rows, err := q.db.Query(ctx, getTalkgroupsWithAnyTags, tags) if err != nil { return nil, err } defer rows.Close() - var items []Talkgroup + var items []GetTalkgroupsWithAnyTagsRow for rows.Next() { - var i Talkgroup + var i GetTalkgroupsWithAnyTagsRow if err := rows.Scan( - &i.ID, - &i.SystemID, - &i.Tgid, - &i.Name, - &i.AlphaTag, - &i.TgGroup, - &i.Frequency, - &i.Metadata, - &i.Tags, - &i.Alert, - &i.AlertConfig, - &i.Weight, + &i.Talkgroup.ID, + &i.Talkgroup.SystemID, + &i.Talkgroup.Tgid, + &i.Talkgroup.Name, + &i.Talkgroup.AlphaTag, + &i.Talkgroup.TgGroup, + &i.Talkgroup.Frequency, + &i.Talkgroup.Metadata, + &i.Talkgroup.Tags, + &i.Talkgroup.Alert, + &i.Talkgroup.AlertConfig, + &i.Talkgroup.Weight, ); err != nil { return nil, err } diff --git a/pkg/database/talkgroups.sql_test.go b/pkg/database/talkgroups.sql_test.go new file mode 100644 index 0000000..cc39c55 --- /dev/null +++ b/pkg/database/talkgroups.sql_test.go @@ -0,0 +1,49 @@ +package database + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +const getTalkgroupWithLearnedByPackedIDsTest = `-- name: GetTalkgroupWithLearnedByPackedIDs :many +SELECT +tg.id, tg.system_id, tg.tgid, tg.name, tg.alpha_tag, tg.tg_group, tg.frequency, tg.metadata, tg.tags, tg.alert, tg.alert_config, tg.weight, sys.id, sys.name, +FALSE learned +FROM talkgroups tg +JOIN systems sys ON tg.system_id = sys.id +WHERE tg.id = ANY($1::INT8[]) +UNION +SELECT +tgl.id::INT8, tgl.system_id::INT4, tgl.tgid::INT4, tgl.name, +tgl.alpha_tag, tgl.alpha_tag, NULL::INTEGER, NULL::JSONB, +CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END, +TRUE, NULL::JSONB, 1.0, sys.id, sys.name, +TRUE learned +FROM talkgroups_learned tgl +JOIN systems sys ON tgl.system_id = sys.id +WHERE systg2id(tgl.system_id, tgl.tgid) = ANY($1::INT8[]) AND ignored IS NOT TRUE +` +const getTalkgroupWithLearnedTest = `-- name: GetTalkgroupWithLearned :one +SELECT +tg.id, tg.system_id, tg.tgid, tg.name, tg.alpha_tag, tg.tg_group, tg.frequency, tg.metadata, tg.tags, tg.alert, tg.alert_config, tg.weight, sys.id, sys.name, +FALSE learned +FROM talkgroups tg +JOIN systems sys ON tg.system_id = sys.id +WHERE tg.id = systg2id($1, $2) +UNION +SELECT +tgl.id::INT8, tgl.system_id::INT4, tgl.tgid::INT4, tgl.name, +tgl.alpha_tag, tgl.alpha_tag, NULL::INTEGER, NULL::JSONB, +CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END, +TRUE, NULL::JSONB, 1.0, sys.id, sys.name, +TRUE learned +FROM talkgroups_learned tgl +JOIN systems sys ON tgl.system_id = sys.id +WHERE tgl.system_id = $1 AND tgl.tgid = $2 AND ignored IS NOT TRUE +` + +func TestQueryColumnsMatch(t *testing.T) { + require.Equal(t, getTalkgroupWithLearnedByPackedIDsTest, getTalkgroupWithLearnedByPackedIDs) + require.Equal(t, getTalkgroupWithLearnedTest, getTalkgroupWithLearned) +} diff --git a/pkg/nexus/commands.go b/pkg/nexus/commands.go index 6d5a55f..6842d83 100644 --- a/pkg/nexus/commands.go +++ b/pkg/nexus/commands.go @@ -5,10 +5,9 @@ import ( "encoding/json" "dynatron.me/x/stillbox/pkg/calls" - "dynatron.me/x/stillbox/pkg/database" "dynatron.me/x/stillbox/pkg/pb" + "dynatron.me/x/stillbox/pkg/talkgroups" - "github.com/jackc/pgx/v5" "github.com/rs/zerolog/log" "google.golang.org/protobuf/types/known/structpb" ) @@ -61,19 +60,18 @@ func (c *client) SendError(cmd *pb.Command, err error) { } func (c *client) Talkgroup(ctx context.Context, tg *pb.Talkgroup) error { - db := database.FromCtx(ctx) - tgi, err := db.GetTalkgroupWithLearned(ctx, int(tg.System), int(tg.Talkgroup)) + tgi, err := talkgroups.StoreFrom(ctx).TG(ctx, talkgroups.TG(tg.System, tg.Talkgroup)) if err != nil { - if err != pgx.ErrNoRows { + if err != talkgroups.ErrNoTG { log.Error().Err(err).Int32("sys", tg.System).Int32("tg", tg.Talkgroup).Msg("get talkgroup fail") } return err } var md *structpb.Struct - if len(tgi.Metadata) > 0 { + if len(tgi.Talkgroup.Metadata) > 0 { m := make(map[string]interface{}) - err := json.Unmarshal(tgi.Metadata, &m) + err := json.Unmarshal(tgi.Talkgroup.Metadata, &m) if err != nil { log.Error().Err(err).Int32("sys", tg.System).Int32("tg", tg.Talkgroup).Msg("unmarshal tg metadata") } @@ -85,14 +83,14 @@ func (c *client) Talkgroup(ctx context.Context, tg *pb.Talkgroup) error { resp := &pb.TalkgroupInfo{ Tg: tg, - Name: tgi.Name, - Group: tgi.TgGroup, - Frequency: tgi.Frequency, + Name: tgi.Talkgroup.Name, + Group: tgi.Talkgroup.TgGroup, + Frequency: tgi.Talkgroup.Frequency, Metadata: md, - Tags: tgi.Tags, + Tags: tgi.Talkgroup.Tags, Learned: tgi.Learned, - AlphaTag: tgi.AlphaTag, - SystemName: tgi.SystemName, + AlphaTag: tgi.Talkgroup.AlphaTag, + SystemName: tgi.System.Name, } _ = c.Send(&pb.Message{ diff --git a/pkg/server/server.go b/pkg/server/server.go index be6bc56..fb0c23b 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -6,7 +6,6 @@ import ( "os" "time" - "dynatron.me/x/stillbox/pkg/calls" "dynatron.me/x/stillbox/pkg/alerting" "dynatron.me/x/stillbox/pkg/auth" "dynatron.me/x/stillbox/pkg/config" @@ -15,6 +14,7 @@ import ( "dynatron.me/x/stillbox/pkg/notify" "dynatron.me/x/stillbox/pkg/sinks" "dynatron.me/x/stillbox/pkg/sources" + "dynatron.me/x/stillbox/pkg/talkgroups" "github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5/middleware" "github.com/go-chi/cors" @@ -35,7 +35,7 @@ type Server struct { alerter alerting.Alerter notifier notify.Notifier hup chan os.Signal - tgCache calls.TalkgroupCache + tgs talkgroups.Store } func New(ctx context.Context, cfg *config.Config) (*Server, error) { @@ -58,7 +58,7 @@ func New(ctx context.Context, cfg *config.Config) (*Server, error) { return nil, err } - tgCache := calls.NewTalkgroupCache() + tgCache := talkgroups.NewCache() srv := &Server{ auth: authenticator, @@ -69,7 +69,7 @@ func New(ctx context.Context, cfg *config.Config) (*Server, error) { logger: logger, alerter: alerting.New(cfg.Alerting, tgCache, alerting.WithNotifier(notifier)), notifier: notifier, - tgCache: tgCache, + tgs: tgCache, } srv.sinks.Register("database", sinks.NewDatabaseSink(srv.db), true) @@ -104,6 +104,7 @@ func (s *Server) Go(ctx context.Context) error { s.installHupHandler() ctx = database.CtxWithDB(ctx, s.db) + ctx = talkgroups.CtxWithStore(ctx, s.tgs) httpSrv := &http.Server{ Addr: s.conf.Listen, diff --git a/pkg/sources/http.go b/pkg/sources/http.go index 4ecefee..32d1a28 100644 --- a/pkg/sources/http.go +++ b/pkg/sources/http.go @@ -7,8 +7,8 @@ import ( "dynatron.me/x/stillbox/internal/common" "dynatron.me/x/stillbox/internal/forms" - "dynatron.me/x/stillbox/pkg/calls" "dynatron.me/x/stillbox/pkg/auth" + "dynatron.me/x/stillbox/pkg/calls" "github.com/go-chi/chi/v5" "github.com/rs/zerolog/log" ) diff --git a/pkg/calls/alertconfig.go b/pkg/talkgroups/alertconfig.go similarity index 75% rename from pkg/calls/alertconfig.go rename to pkg/talkgroups/alertconfig.go index 0a8b492..65837a0 100644 --- a/pkg/calls/alertconfig.go +++ b/pkg/talkgroups/alertconfig.go @@ -1,4 +1,4 @@ -package calls +package talkgroups import ( "encoding/json" @@ -8,14 +8,14 @@ import ( "dynatron.me/x/stillbox/internal/trending" ) -type AlertConfig map[Talkgroup][]AlertRule +type AlertConfig map[ID][]AlertRule type AlertRule struct { Times []ruletime.RuleTime `json:"times"` ScoreMultiplier float32 `json:"mult"` } -func (ac AlertConfig) AddAlertConfig(tg Talkgroup, confBytes []byte) error { +func (ac AlertConfig) AddAlertConfig(tg ID, confBytes []byte) error { if len(confBytes) == 0 { return nil } @@ -30,7 +30,7 @@ func (ac AlertConfig) AddAlertConfig(tg Talkgroup, confBytes []byte) error { return nil } -func (ac AlertConfig) ApplyAlertRules(score trending.Score[Talkgroup], t time.Time, coversOpts ...ruletime.CoversOption) float64 { +func (ac AlertConfig) ApplyAlertRules(score trending.Score[ID], t time.Time, coversOpts ...ruletime.CoversOption) float64 { s, has := ac[score.ID] if !has { return score.Score diff --git a/pkg/calls/alertconfig_test.go b/pkg/talkgroups/alertconfig_test.go similarity index 84% rename from pkg/calls/alertconfig_test.go rename to pkg/talkgroups/alertconfig_test.go index fe141d9..2d26765 100644 --- a/pkg/calls/alertconfig_test.go +++ b/pkg/talkgroups/alertconfig_test.go @@ -1,4 +1,4 @@ -package calls_test +package talkgroups_test import ( "errors" @@ -8,26 +8,26 @@ import ( "dynatron.me/x/stillbox/internal/ruletime" "dynatron.me/x/stillbox/internal/trending" - "dynatron.me/x/stillbox/pkg/calls" + "dynatron.me/x/stillbox/pkg/talkgroups" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) func TestAlertConfig(t *testing.T) { - ac := make(calls.AlertConfig) + ac := make(talkgroups.AlertConfig) parseTests := []struct { name string - tg calls.Talkgroup + tg talkgroups.ID conf string - compare []calls.AlertRule + compare []talkgroups.AlertRule expectErr error }{ { name: "base case", - tg: calls.TG(197, 3), + tg: talkgroups.TG(197, 3), conf: `[{"times":["7:00+2h","01:00+1h","16:00+1h","19:00+4h"],"mult":0.2},{"times":["11:00+1h","15:00+30m","16:03+20m"],"mult":2.0}]`, - compare: []calls.AlertRule{ + compare: []talkgroups.AlertRule{ { Times: []ruletime.RuleTime{ ruletime.Must(ruletime.New("7:00+2h")), @@ -49,7 +49,7 @@ func TestAlertConfig(t *testing.T) { }, { name: "bad spec", - tg: calls.TG(197, 3), + tg: talkgroups.TG(197, 3), conf: `[{"times":["26:00+2h","01:00+1h","19:00+4h"],"mult":0.2},{"times":["11:00+1h","15:00+30m"],"mult":2.0}]`, expectErr: errors.New("'26:00+2h': invalid hours"), }, @@ -78,42 +78,42 @@ func TestAlertConfig(t *testing.T) { evalTests := []struct { name string - tg calls.Talkgroup + tg talkgroups.ID t time.Time origScore float64 expectScore float64 }{ { name: "base eval", - tg: calls.TG(197, 3), + tg: talkgroups.TG(197, 3), t: tMust("1:20"), origScore: 3, expectScore: 0.6, }, { name: "base eval", - tg: calls.TG(197, 3), + tg: talkgroups.TG(197, 3), t: tMust("23:03"), origScore: 3, expectScore: 3, }, { name: "base eval", - tg: calls.TG(197, 3), + tg: talkgroups.TG(197, 3), t: tMust("8:03"), origScore: 1.0, expectScore: 0.2, }, { name: "base eval", - tg: calls.TG(197, 3), + tg: talkgroups.TG(197, 3), t: tMust("15:15"), origScore: 3.0, expectScore: 6.0, }, { name: "overlapping eval", - tg: calls.TG(197, 3), + tg: talkgroups.TG(197, 3), t: tMust("16:10"), origScore: 1.0, expectScore: 0.4, @@ -122,7 +122,7 @@ func TestAlertConfig(t *testing.T) { for _, tc := range evalTests { t.Run(tc.name, func(t *testing.T) { - cs := trending.Score[calls.Talkgroup]{ + cs := trending.Score[talkgroups.ID]{ ID: tc.tg, Score: tc.origScore, } diff --git a/pkg/talkgroups/cache.go b/pkg/talkgroups/cache.go new file mode 100644 index 0000000..bc60417 --- /dev/null +++ b/pkg/talkgroups/cache.go @@ -0,0 +1,195 @@ +package talkgroups + +import ( + "context" + "errors" + "sync" + "time" + + "dynatron.me/x/stillbox/pkg/database" + + "dynatron.me/x/stillbox/internal/ruletime" + "dynatron.me/x/stillbox/internal/trending" + + "github.com/jackc/pgx/v5" + "github.com/rs/zerolog/log" +) + +type tgMap map[ID]Talkgroup + +type Store interface { + // TG retrieves a Talkgroup from the Store. + TG(ctx context.Context, tg ID) (Talkgroup, error) + + // SystemName retrieves a system name from the store. It returns the record and whether one was found. + SystemName(ctx context.Context, id int) (string, bool) + + // ApplyAlertRules applies the score's talkgroup alert rules to the call occurring at t and returns the weighted score. + ApplyAlertRules(score trending.Score[ID], t time.Time, coversOpts ...ruletime.CoversOption) float64 + + // Hint hints the Store that the provided talkgroups will be asked for. + Hint(ctx context.Context, tgs []ID) error + + // Load loads the provided packed talkgroup IDs into the Store. + Load(ctx context.Context, tgs []int64) error + + // Invalidate invalidates any caching in the Store. + Invalidate() +} + +type CtxStoreKeyT string + +const CtxStoreKey CtxStoreKeyT = "store" + +func CtxWithStore(ctx context.Context, s Store) context.Context { + return context.WithValue(ctx, CtxStoreKey, s) +} + +func StoreFrom(ctx context.Context) Store { + s, ok := ctx.Value(CtxStoreKey).(Store) + if !ok { + return NewCache() + } + + return s +} + +func (t *cache) Invalidate() { + t.Lock() + defer t.Unlock() + clear(t.tgs) + clear(t.systems) + clear(t.AlertConfig) +} + +type cache struct { + sync.RWMutex + AlertConfig + tgs tgMap + systems map[int32]string +} + +// NewCache returns a new cache Store. +func NewCache() Store { + tgc := &cache{ + tgs: make(tgMap), + systems: make(map[int32]string), + AlertConfig: make(AlertConfig), + } + + return tgc +} + +func (t *cache) Hint(ctx context.Context, tgs []ID) error { + t.RLock() + var toLoad []int64 + if len(t.tgs) > len(tgs)/2 { // TODO: instrument this + for _, tg := range tgs { + _, ok := t.tgs[tg] + if !ok { + toLoad = append(toLoad, tg.Pack()) + } + } + + } else { + toLoad = make([]int64, 0, len(tgs)) + for _, g := range tgs { + toLoad = append(toLoad, g.Pack()) + } + } + + if len(toLoad) > 0 { + t.RUnlock() + return t.Load(ctx, toLoad) + } + + t.RUnlock() + return nil +} + +func (t *cache) add(rec Talkgroup) error { + tg := TG(rec.System.ID, int(rec.Talkgroup.Tgid)) + t.tgs[tg] = rec + t.systems[int32(rec.System.ID)] = rec.System.Name + + return t.AlertConfig.AddAlertConfig(tg, rec.Talkgroup.AlertConfig) +} + +func rowToTalkgroup(r database.GetTalkgroupWithLearnedByPackedIDsRow) Talkgroup { + return Talkgroup{ + Talkgroup: r.Talkgroup, + System: r.System, + Learned: r.Learned, + } +} + +func (t *cache) Load(ctx context.Context, tgs []int64) error { + tgRecords, err := database.FromCtx(ctx).GetTalkgroupWithLearnedByPackedIDs(ctx, tgs) + if err != nil { + return err + } + + t.Lock() + defer t.Unlock() + + for _, rec := range tgRecords { + err := t.add(rowToTalkgroup(rec)) + + if err != nil { + log.Error().Err(err).Msg("add alert config fail") + } + } + + return nil +} + +var ErrNoTG = errors.New("talkgroup not found") + +func (t *cache) TG(ctx context.Context, tg ID) (Talkgroup, error) { + t.RLock() + rec, has := t.tgs[tg] + t.RUnlock() + + if has { + return rec, nil + } + + recs, err := database.FromCtx(ctx).GetTalkgroupWithLearnedByPackedIDs(ctx, []int64{tg.Pack()}) + switch err { + case nil: + case pgx.ErrNoRows: + return Talkgroup{}, ErrNoTG + default: + log.Error().Err(err).Msg("TG() cache add db get") + return Talkgroup{}, errors.Join(ErrNoTG, err) + } + + if len(recs) < 1 { + return Talkgroup{}, ErrNoTG + } + + t.Lock() + defer t.Unlock() + err = t.add(rowToTalkgroup(recs[0])) + if err != nil { + log.Error().Err(err).Msg("TG() cache add") + return rowToTalkgroup(recs[0]), errors.Join(ErrNoTG, err) + } + + return rowToTalkgroup(recs[0]), nil +} + +func (t *cache) SystemName(ctx context.Context, id int) (name string, has bool) { + n, has := t.systems[int32(id)] + + if !has { + sys, err := database.FromCtx(ctx).GetSystemName(ctx, id) + if err != nil { + return "", false + } + + return sys, true + } + + return n, has +} diff --git a/pkg/talkgroups/talkgroup.go b/pkg/talkgroups/talkgroup.go new file mode 100644 index 0000000..bf68dfa --- /dev/null +++ b/pkg/talkgroups/talkgroup.go @@ -0,0 +1,44 @@ +package talkgroups + +import ( + "fmt" + + "dynatron.me/x/stillbox/pkg/database" +) + +type Talkgroup struct { + database.Talkgroup + System database.System `json:"system"` + Learned bool `json:"learned"` +} + +type ID struct { + System uint32 + Talkgroup uint32 +} + +func TG[T int | uint | int64 | uint64 | int32 | uint32](sys, tgid T) ID { + return ID{ + System: uint32(sys), + Talkgroup: uint32(tgid), + } +} + +func (t ID) Pack() int64 { + // P25 system IDs are 12 bits, so we can fit them in a signed 8 byte int (int64, pg INT8) + return int64((int64(t.System) << 32) | int64(t.Talkgroup)) +} + +func (t ID) String() string { + return fmt.Sprintf("%d:%d", t.System, t.Talkgroup) +} + +func PackedTGs(tg []ID) []int64 { + s := make([]int64, len(tg)) + + for i, v := range tg { + s[i] = v.Pack() + } + + return s +} diff --git a/sql/postgres/queries/talkgroups.sql b/sql/postgres/queries/talkgroups.sql index 099e983..a2ab786 100644 --- a/sql/postgres/queries/talkgroups.sql +++ b/sql/postgres/queries/talkgroups.sql @@ -1,9 +1,9 @@ -- name: GetTalkgroupsWithAnyTags :many -SELECT * FROM talkgroups +SELECT sqlc.embed(talkgroups) FROM talkgroups WHERE tags @> ARRAY[$1]; -- name: GetTalkgroupsWithAllTags :many -SELECT * FROM talkgroups +SELECT sqlc.embed(talkgroups) FROM talkgroups WHERE tags && ARRAY[$1]; -- name: GetTalkgroupIDsByTags :many @@ -25,29 +25,27 @@ UPDATE talkgroups SET tags = $2 WHERE id = ANY($1); -- name: GetTalkgroup :one -SELECT * FROM talkgroups +SELECT sqlc.embed(talkgroups) FROM talkgroups WHERE id = systg2id(sqlc.arg(system_id), sqlc.arg(tgid)); -- name: GetTalkgroupsByPackedIDs :many -SELECT * FROM talkgroups tg +SELECT sqlc.embed(tg), sqlc.embed(sys) FROM talkgroups tg JOIN systems sys ON tg.system_id = sys.id WHERE tg.id = ANY($1::INT8[]); -- name: GetTalkgroupWithLearned :one SELECT -tg.id, tg.system_id, sys.name system_name, tg.tgid, tg.name, -tg.tg_group, tg.frequency, tg.metadata, tg.tags, tg.alpha_tag, -tg.alert, tg.weight, tg.alert_config, +sqlc.embed(tg), sqlc.embed(sys), FALSE learned FROM talkgroups tg JOIN systems sys ON tg.system_id = sys.id WHERE tg.id = systg2id(sqlc.arg(system_id), sqlc.arg(tgid)) UNION SELECT -tgl.id::INT8, tgl.system_id::INT4, sys.name system_name, tgl.tgid::INT4, tgl.name, -tgl.alpha_tag, NULL::INTEGER, NULL::JSONB, -CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END, tgl.alpha_tag, -TRUE, 1.0, NULL::JSONB, +tgl.id::INT8, tgl.system_id::INT4, tgl.tgid::INT4, tgl.name, +tgl.alpha_tag, tgl.alpha_tag, NULL::INTEGER, NULL::JSONB, +CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END, +TRUE, NULL::JSONB, 1.0, sys.id, sys.name, TRUE learned FROM talkgroups_learned tgl JOIN systems sys ON tgl.system_id = sys.id @@ -55,19 +53,17 @@ WHERE tgl.system_id = sqlc.arg(system_id) AND tgl.tgid = sqlc.arg(tgid) AND igno -- name: GetTalkgroupWithLearnedByPackedIDs :many SELECT -tg.id, tg.system_id, sys.name system_name, tg.tgid, tg.name, -tg.tg_group, tg.frequency, tg.metadata, tg.tags, tg.alpha_tag, -tg.alert, tg.weight, tg.alert_config, +sqlc.embed(tg), sqlc.embed(sys), FALSE learned FROM talkgroups tg JOIN systems sys ON tg.system_id = sys.id WHERE tg.id = ANY($1::INT8[]) UNION SELECT -tgl.id::INT8, tgl.system_id::INT4, sys.name system_name, tgl.tgid::INT4, tgl.name, -tgl.alpha_tag, NULL::INTEGER, NULL::JSONB, -CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END, tgl.alpha_tag, -TRUE, 1.0, NULL::JSONB, +tgl.id::INT8, tgl.system_id::INT4, tgl.tgid::INT4, tgl.name, +tgl.alpha_tag, tgl.alpha_tag, NULL::INTEGER, NULL::JSONB, +CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END, +TRUE, NULL::JSONB, 1.0, sys.id, sys.name, TRUE learned FROM talkgroups_learned tgl JOIN systems sys ON tgl.system_id = sys.id