From c3233f1bc8bcbb5275feda984879bd5dbe478a55 Mon Sep 17 00:00:00 2001 From: Daniel Ponte Date: Sun, 3 Nov 2024 07:58:41 -0500 Subject: [PATCH 1/4] Split out talkgroups --- pkg/calls/call.go | 6 ++++ pkg/calls/filter.go | 22 +++++++------- pkg/{calls => talkgroups}/alertconfig.go | 2 +- pkg/{calls => talkgroups}/alertconfig_test.go | 30 +++++++++---------- pkg/{calls => talkgroups}/talkgroups.go | 6 +--- 5 files changed, 35 insertions(+), 31 deletions(-) rename pkg/{calls => talkgroups}/alertconfig.go (98%) rename pkg/{calls => talkgroups}/alertconfig_test.go (83%) rename pkg/{calls => talkgroups}/talkgroups.go (96%) diff --git a/pkg/calls/call.go b/pkg/calls/call.go index ea9c3a2..ea5402f 100644 --- a/pkg/calls/call.go +++ b/pkg/calls/call.go @@ -7,6 +7,7 @@ import ( "dynatron.me/x/stillbox/internal/audio" "dynatron.me/x/stillbox/pkg/auth" "dynatron.me/x/stillbox/pkg/pb" + "dynatron.me/x/stillbox/pkg/talkgroups" "google.golang.org/protobuf/types/known/timestamppb" ) @@ -128,3 +129,8 @@ func (c *Call) computeLength() (err error) { return nil } + +func (c *Call) TalkgroupTuple() talkgroups.Talkgroup { + return talkgroups.TG(c.System, c.Talkgroup) +} + diff --git a/pkg/calls/filter.go b/pkg/calls/filter.go index fe45cd4..a3cf253 100644 --- a/pkg/calls/filter.go +++ b/pkg/calls/filter.go @@ -5,16 +5,18 @@ import ( "dynatron.me/x/stillbox/pkg/database" "dynatron.me/x/stillbox/pkg/pb" + + tgs "dynatron.me/x/stillbox/pkg/talkgroups" ) type TalkgroupFilter struct { - Talkgroups []Talkgroup `json:"talkgroups,omitempty"` - TalkgroupsNot []Talkgroup `json:"talkgroupsNot,omitempty"` + Talkgroups []tgs.Talkgroup `json:"talkgroups,omitempty"` + TalkgroupsNot []tgs.Talkgroup `json:"talkgroupsNot,omitempty"` TalkgroupTagsAll []string `json:"talkgroupTagsAll,omitempty"` TalkgroupTagsAny []string `json:"talkgroupTagsAny,omitempty"` TalkgroupTagsNot []string `json:"talkgroupTagsNot,omitempty"` - talkgroups map[Talkgroup]bool + talkgroups map[tgs.Talkgroup]bool } func TalkgroupFilterFromPB(ctx context.Context, p *pb.Filter) (*TalkgroupFilter, error) { @@ -25,9 +27,9 @@ func TalkgroupFilterFromPB(ctx context.Context, p *pb.Filter) (*TalkgroupFilter, } if l := len(p.Talkgroups); l > 0 { - tgf.Talkgroups = make([]Talkgroup, l) + tgf.Talkgroups = make([]tgs.Talkgroup, l) for i, t := range p.Talkgroups { - tgf.Talkgroups[i] = Talkgroup{ + tgf.Talkgroups[i] = tgs.Talkgroup{ System: uint32(t.System), Talkgroup: uint32(t.Talkgroup), } @@ -35,9 +37,9 @@ func TalkgroupFilterFromPB(ctx context.Context, p *pb.Filter) (*TalkgroupFilter, } if l := len(p.TalkgroupsNot); l > 0 { - tgf.TalkgroupsNot = make([]Talkgroup, l) + tgf.TalkgroupsNot = make([]tgs.Talkgroup, l) for i, t := range p.TalkgroupsNot { - tgf.TalkgroupsNot[i] = Talkgroup{ + tgf.TalkgroupsNot[i] = tgs.Talkgroup{ System: uint32(t.System), Talkgroup: uint32(t.Talkgroup), } @@ -51,12 +53,12 @@ func (f *TalkgroupFilter) hasTags() bool { return len(f.TalkgroupTagsAny) > 0 || len(f.TalkgroupTagsAll) > 0 || len(f.TalkgroupTagsNot) > 0 } -func (f *TalkgroupFilter) GetFinalTalkgroups() map[Talkgroup]bool { +func (f *TalkgroupFilter) GetFinalTalkgroups() map[tgs.Talkgroup]bool { return f.talkgroups } func (f *TalkgroupFilter) compile(ctx context.Context) error { - f.talkgroups = make(map[Talkgroup]bool) + f.talkgroups = make(map[tgs.Talkgroup]bool) for _, tg := range f.Talkgroups { f.talkgroups[tg] = true } @@ -69,7 +71,7 @@ func (f *TalkgroupFilter) compile(ctx context.Context) error { } for _, tg := range tagTGs { - f.talkgroups[Talkgroup{System: uint32(tg.SystemID), Talkgroup: uint32(tg.Tgid)}] = true + f.talkgroups[tgs.Talkgroup{System: uint32(tg.SystemID), Talkgroup: uint32(tg.Tgid)}] = true } } diff --git a/pkg/calls/alertconfig.go b/pkg/talkgroups/alertconfig.go similarity index 98% rename from pkg/calls/alertconfig.go rename to pkg/talkgroups/alertconfig.go index 0a8b492..73365d6 100644 --- a/pkg/calls/alertconfig.go +++ b/pkg/talkgroups/alertconfig.go @@ -1,4 +1,4 @@ -package calls +package talkgroups import ( "encoding/json" diff --git a/pkg/calls/alertconfig_test.go b/pkg/talkgroups/alertconfig_test.go similarity index 83% rename from pkg/calls/alertconfig_test.go rename to pkg/talkgroups/alertconfig_test.go index fe141d9..785895b 100644 --- a/pkg/calls/alertconfig_test.go +++ b/pkg/talkgroups/alertconfig_test.go @@ -1,4 +1,4 @@ -package calls_test +package talkgroups_test import ( "errors" @@ -8,26 +8,26 @@ import ( "dynatron.me/x/stillbox/internal/ruletime" "dynatron.me/x/stillbox/internal/trending" - "dynatron.me/x/stillbox/pkg/calls" + "dynatron.me/x/stillbox/pkg/talkgroups" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) func TestAlertConfig(t *testing.T) { - ac := make(calls.AlertConfig) + ac := make(talkgroups.AlertConfig) parseTests := []struct { name string - tg calls.Talkgroup + tg talkgroups.Talkgroup conf string - compare []calls.AlertRule + compare []talkgroups.AlertRule expectErr error }{ { name: "base case", - tg: calls.TG(197, 3), + tg: talkgroups.TG(197, 3), conf: `[{"times":["7:00+2h","01:00+1h","16:00+1h","19:00+4h"],"mult":0.2},{"times":["11:00+1h","15:00+30m","16:03+20m"],"mult":2.0}]`, - compare: []calls.AlertRule{ + compare: []talkgroups.AlertRule{ { Times: []ruletime.RuleTime{ ruletime.Must(ruletime.New("7:00+2h")), @@ -49,7 +49,7 @@ func TestAlertConfig(t *testing.T) { }, { name: "bad spec", - tg: calls.TG(197, 3), + tg: talkgroups.TG(197, 3), conf: `[{"times":["26:00+2h","01:00+1h","19:00+4h"],"mult":0.2},{"times":["11:00+1h","15:00+30m"],"mult":2.0}]`, expectErr: errors.New("'26:00+2h': invalid hours"), }, @@ -78,42 +78,42 @@ func TestAlertConfig(t *testing.T) { evalTests := []struct { name string - tg calls.Talkgroup + tg talkgroups.Talkgroup t time.Time origScore float64 expectScore float64 }{ { name: "base eval", - tg: calls.TG(197, 3), + tg: talkgroups.TG(197, 3), t: tMust("1:20"), origScore: 3, expectScore: 0.6, }, { name: "base eval", - tg: calls.TG(197, 3), + tg: talkgroups.TG(197, 3), t: tMust("23:03"), origScore: 3, expectScore: 3, }, { name: "base eval", - tg: calls.TG(197, 3), + tg: talkgroups.TG(197, 3), t: tMust("8:03"), origScore: 1.0, expectScore: 0.2, }, { name: "base eval", - tg: calls.TG(197, 3), + tg: talkgroups.TG(197, 3), t: tMust("15:15"), origScore: 3.0, expectScore: 6.0, }, { name: "overlapping eval", - tg: calls.TG(197, 3), + tg: talkgroups.TG(197, 3), t: tMust("16:10"), origScore: 1.0, expectScore: 0.4, @@ -122,7 +122,7 @@ func TestAlertConfig(t *testing.T) { for _, tc := range evalTests { t.Run(tc.name, func(t *testing.T) { - cs := trending.Score[calls.Talkgroup]{ + cs := trending.Score[talkgroups.Talkgroup]{ ID: tc.tg, Score: tc.origScore, } diff --git a/pkg/calls/talkgroups.go b/pkg/talkgroups/talkgroups.go similarity index 96% rename from pkg/calls/talkgroups.go rename to pkg/talkgroups/talkgroups.go index 6d554b7..a537ce9 100644 --- a/pkg/calls/talkgroups.go +++ b/pkg/talkgroups/talkgroups.go @@ -1,4 +1,4 @@ -package calls +package talkgroups import ( "context" @@ -20,10 +20,6 @@ type Talkgroup struct { Talkgroup uint32 } -func (c *Call) TalkgroupTuple() Talkgroup { - return Talkgroup{System: uint32(c.System), Talkgroup: uint32(c.Talkgroup)} -} - func TG[T int | uint | int64 | uint64 | int32 | uint32](sys, tgid T) Talkgroup { return Talkgroup{ System: uint32(sys), From 882d9fbefac96be9d580a9625c0bf48b472af9c6 Mon Sep 17 00:00:00 2001 From: Daniel Ponte Date: Sun, 3 Nov 2024 08:09:49 -0500 Subject: [PATCH 2/4] Reorg --- cmd/stillbox/main.go | 2 +- pkg/alerting/alerting.go | 49 +++++++-------- pkg/alerting/simulate.go | 6 +- pkg/alerting/stats.go | 10 ++-- pkg/calls/call.go | 3 +- pkg/calls/filter.go | 26 ++++---- pkg/server/server.go | 8 +-- pkg/sources/http.go | 2 +- pkg/talkgroups/alertconfig.go | 6 +- pkg/talkgroups/alertconfig_test.go | 6 +- pkg/talkgroups/{talkgroups.go => cache.go} | 70 ++++++++-------------- pkg/talkgroups/talkgroup.go | 36 +++++++++++ 12 files changed, 120 insertions(+), 104 deletions(-) rename pkg/talkgroups/{talkgroups.go => cache.go} (60%) create mode 100644 pkg/talkgroups/talkgroup.go diff --git a/cmd/stillbox/main.go b/cmd/stillbox/main.go index 93235f1..b78b84e 100644 --- a/cmd/stillbox/main.go +++ b/cmd/stillbox/main.go @@ -9,8 +9,8 @@ import ( "dynatron.me/x/stillbox/internal/common" "dynatron.me/x/stillbox/internal/version" - "dynatron.me/x/stillbox/pkg/cmd/serve" "dynatron.me/x/stillbox/pkg/cmd/admin" + "dynatron.me/x/stillbox/pkg/cmd/serve" "dynatron.me/x/stillbox/pkg/config" "github.com/spf13/cobra" diff --git a/pkg/alerting/alerting.go b/pkg/alerting/alerting.go index b70fa3e..cf7e56e 100644 --- a/pkg/alerting/alerting.go +++ b/pkg/alerting/alerting.go @@ -11,11 +11,12 @@ import ( "text/template" "time" - cl "dynatron.me/x/stillbox/pkg/calls" + "dynatron.me/x/stillbox/pkg/calls" "dynatron.me/x/stillbox/pkg/config" "dynatron.me/x/stillbox/pkg/database" "dynatron.me/x/stillbox/pkg/notify" "dynatron.me/x/stillbox/pkg/sinks" + talkgroups "dynatron.me/x/stillbox/pkg/talkgroups" "dynatron.me/x/stillbox/internal/timeseries" "dynatron.me/x/stillbox/internal/trending" @@ -46,14 +47,14 @@ type alerter struct { sync.RWMutex clock timeseries.Clock cfg config.Alerting - scorer trending.Scorer[cl.Talkgroup] - scores trending.Scores[cl.Talkgroup] + scorer trending.Scorer[talkgroups.ID] + scores trending.Scores[talkgroups.ID] lastScore time.Time sim *Simulation - alertCache map[cl.Talkgroup]Alert + alertCache map[talkgroups.ID]Alert renotify time.Duration notifier notify.Notifier - tgCache cl.TalkgroupCache + tgCache talkgroups.Store } type offsetClock time.Duration @@ -88,14 +89,14 @@ func WithNotifier(n notify.Notifier) AlertOption { } // New creates a new Alerter using the provided configuration. -func New(cfg config.Alerting, tgCache cl.TalkgroupCache, opts ...AlertOption) Alerter { +func New(cfg config.Alerting, tgCache talkgroups.Store, opts ...AlertOption) Alerter { if !cfg.Enable { return &noopAlerter{} } as := &alerter{ cfg: cfg, - alertCache: make(map[cl.Talkgroup]Alert), + alertCache: make(map[talkgroups.ID]Alert), clock: timeseries.DefaultClock, renotify: DefaultRenotify, tgCache: tgCache, @@ -111,12 +112,12 @@ func New(cfg config.Alerting, tgCache cl.TalkgroupCache, opts ...AlertOption) Al as.scorer = trending.NewScorer( trending.WithTimeSeries(as.newTimeSeries), - trending.WithStorageDuration[cl.Talkgroup](time.Hour*24*time.Duration(cfg.LookbackDays)), - trending.WithRecentDuration[cl.Talkgroup](time.Duration(cfg.Recent)), - trending.WithHalfLife[cl.Talkgroup](time.Duration(cfg.HalfLife)), - trending.WithScoreThreshold[cl.Talkgroup](ScoreThreshold), - trending.WithCountThreshold[cl.Talkgroup](CountThreshold), - trending.WithClock[cl.Talkgroup](as.clock), + trending.WithStorageDuration[talkgroups.ID](time.Hour*24*time.Duration(cfg.LookbackDays)), + trending.WithRecentDuration[talkgroups.ID](time.Duration(cfg.Recent)), + trending.WithHalfLife[talkgroups.ID](time.Duration(cfg.HalfLife)), + trending.WithScoreThreshold[talkgroups.ID](ScoreThreshold), + trending.WithCountThreshold[talkgroups.ID](CountThreshold), + trending.WithClock[talkgroups.ID](as.clock), ) return as @@ -231,8 +232,8 @@ func (as *alerter) testNotifyHandler(w http.ResponseWriter, r *http.Request) { } // scoredTGs gets a list of TGs. -func (as *alerter) scoredTGs() []cl.Talkgroup { - tgs := make([]cl.Talkgroup, 0, len(as.scores)) +func (as *alerter) scoredTGs() []talkgroups.ID { + tgs := make([]talkgroups.ID, 0, len(as.scores)) for _, s := range as.scores { tgs = append(tgs, s.ID) } @@ -275,7 +276,7 @@ type Alert struct { ID uuid.UUID Timestamp time.Time TGName string - Score trending.Score[cl.Talkgroup] + Score trending.Score[talkgroups.ID] OrigScore float64 Weight float32 Suppressed bool @@ -317,7 +318,7 @@ func (as *alerter) sendNotification(ctx context.Context, n []Alert) error { // makeAlert creates a notification for later rendering by the template. // It takes a talkgroup Score as input. -func (as *alerter) makeAlert(ctx context.Context, score trending.Score[cl.Talkgroup], origScore float64) (Alert, error) { +func (as *alerter) makeAlert(ctx context.Context, score trending.Score[talkgroups.ID], origScore float64) (Alert, error) { d := Alert{ ID: uuid.New(), Score: score, @@ -369,7 +370,7 @@ func (as *alerter) cleanCache() { } } -func (as *alerter) newTimeSeries(id cl.Talkgroup) trending.TimeSeries { +func (as *alerter) newTimeSeries(id talkgroups.ID) trending.TimeSeries { ts, _ := timeseries.NewTimeSeries(timeseries.WithGranularities( []timeseries.Granularity{ {Granularity: time.Second, Count: 60}, @@ -417,7 +418,7 @@ func (as *alerter) backfill(ctx context.Context, since time.Time, until time.Tim defer as.Unlock() for rows.Next() { - var tg cl.Talkgroup + var tg talkgroups.ID var callDate time.Time if err := rows.Scan(&tg.System, &tg.Talkgroup, &callDate); err != nil { return count, err @@ -440,7 +441,7 @@ func (as *alerter) SinkType() string { return "alerting" } -func (as *alerter) Call(ctx context.Context, call *cl.Call) error { +func (as *alerter) Call(ctx context.Context, call *calls.Call) error { as.Lock() defer as.Unlock() as.scorer.AddEvent(call.TalkgroupTuple(), call.DateTime) @@ -453,7 +454,7 @@ func (*alerter) Enabled() bool { return true } // noopAlerter is used when alerting is disabled. type noopAlerter struct{} -func (*noopAlerter) SinkType() string { return "noopAlerter" } -func (*noopAlerter) Call(_ context.Context, _ *cl.Call) error { return nil } -func (*noopAlerter) Go(_ context.Context) {} -func (*noopAlerter) Enabled() bool { return false } +func (*noopAlerter) SinkType() string { return "noopAlerter" } +func (*noopAlerter) Call(_ context.Context, _ *calls.Call) error { return nil } +func (*noopAlerter) Go(_ context.Context) {} +func (*noopAlerter) Enabled() bool { return false } diff --git a/pkg/alerting/simulate.go b/pkg/alerting/simulate.go index 326b7b5..6d646fa 100644 --- a/pkg/alerting/simulate.go +++ b/pkg/alerting/simulate.go @@ -12,8 +12,8 @@ import ( "dynatron.me/x/stillbox/internal/forms" "dynatron.me/x/stillbox/internal/jsontime" "dynatron.me/x/stillbox/internal/trending" - cl "dynatron.me/x/stillbox/pkg/calls" "dynatron.me/x/stillbox/pkg/config" + "dynatron.me/x/stillbox/pkg/talkgroups" "github.com/rs/zerolog/log" ) @@ -58,9 +58,9 @@ func (s *Simulation) stepClock(t time.Time) { } // Simulate begins the simulation using the DB handle from ctx. It returns final scores. -func (s *Simulation) Simulate(ctx context.Context) (trending.Scores[cl.Talkgroup], error) { +func (s *Simulation) Simulate(ctx context.Context) (trending.Scores[talkgroups.ID], error) { now := time.Now() - tgc := cl.NewTalkgroupCache() + tgc := talkgroups.NewCache() s.Enable = true s.alerter = New(s.Alerting, tgc, WithClock(&s.clock)).(*alerter) diff --git a/pkg/alerting/stats.go b/pkg/alerting/stats.go index 80d7777..c8835d9 100644 --- a/pkg/alerting/stats.go +++ b/pkg/alerting/stats.go @@ -7,9 +7,9 @@ import ( "net/http" "time" - "dynatron.me/x/stillbox/pkg/calls" "dynatron.me/x/stillbox/pkg/config" "dynatron.me/x/stillbox/pkg/database" + "dynatron.me/x/stillbox/pkg/talkgroups" "dynatron.me/x/stillbox/internal/common" "dynatron.me/x/stillbox/internal/jsontime" @@ -76,14 +76,14 @@ func (as *alerter) tgStatsHandler(w http.ResponseWriter, r *http.Request) { return } - tgMap := make(map[calls.Talkgroup]database.GetTalkgroupsByPackedIDsRow, len(tgs)) + tgMap := make(map[talkgroups.ID]database.GetTalkgroupsByPackedIDsRow, len(tgs)) for _, t := range tgs { - tgMap[calls.Talkgroup{System: uint32(t.SystemID), Talkgroup: uint32(t.ID)}] = t + tgMap[talkgroups.ID{System: uint32(t.SystemID), Talkgroup: uint32(t.ID)}] = t } renderData := struct { - TGs map[calls.Talkgroup]database.GetTalkgroupsByPackedIDsRow - Scores trending.Scores[calls.Talkgroup] + TGs map[talkgroups.ID]database.GetTalkgroupsByPackedIDsRow + Scores trending.Scores[talkgroups.ID] LastScore time.Time Simulation *Simulation Config config.Alerting diff --git a/pkg/calls/call.go b/pkg/calls/call.go index ea5402f..9b8e78d 100644 --- a/pkg/calls/call.go +++ b/pkg/calls/call.go @@ -130,7 +130,6 @@ func (c *Call) computeLength() (err error) { return nil } -func (c *Call) TalkgroupTuple() talkgroups.Talkgroup { +func (c *Call) TalkgroupTuple() talkgroups.ID { return talkgroups.TG(c.System, c.Talkgroup) } - diff --git a/pkg/calls/filter.go b/pkg/calls/filter.go index a3cf253..ecc1fc6 100644 --- a/pkg/calls/filter.go +++ b/pkg/calls/filter.go @@ -10,13 +10,13 @@ import ( ) type TalkgroupFilter struct { - Talkgroups []tgs.Talkgroup `json:"talkgroups,omitempty"` - TalkgroupsNot []tgs.Talkgroup `json:"talkgroupsNot,omitempty"` - TalkgroupTagsAll []string `json:"talkgroupTagsAll,omitempty"` - TalkgroupTagsAny []string `json:"talkgroupTagsAny,omitempty"` - TalkgroupTagsNot []string `json:"talkgroupTagsNot,omitempty"` + Talkgroups []tgs.ID `json:"talkgroups,omitempty"` + TalkgroupsNot []tgs.ID `json:"talkgroupsNot,omitempty"` + TalkgroupTagsAll []string `json:"talkgroupTagsAll,omitempty"` + TalkgroupTagsAny []string `json:"talkgroupTagsAny,omitempty"` + TalkgroupTagsNot []string `json:"talkgroupTagsNot,omitempty"` - talkgroups map[tgs.Talkgroup]bool + talkgroups map[tgs.ID]bool } func TalkgroupFilterFromPB(ctx context.Context, p *pb.Filter) (*TalkgroupFilter, error) { @@ -27,9 +27,9 @@ func TalkgroupFilterFromPB(ctx context.Context, p *pb.Filter) (*TalkgroupFilter, } if l := len(p.Talkgroups); l > 0 { - tgf.Talkgroups = make([]tgs.Talkgroup, l) + tgf.Talkgroups = make([]tgs.ID, l) for i, t := range p.Talkgroups { - tgf.Talkgroups[i] = tgs.Talkgroup{ + tgf.Talkgroups[i] = tgs.ID{ System: uint32(t.System), Talkgroup: uint32(t.Talkgroup), } @@ -37,9 +37,9 @@ func TalkgroupFilterFromPB(ctx context.Context, p *pb.Filter) (*TalkgroupFilter, } if l := len(p.TalkgroupsNot); l > 0 { - tgf.TalkgroupsNot = make([]tgs.Talkgroup, l) + tgf.TalkgroupsNot = make([]tgs.ID, l) for i, t := range p.TalkgroupsNot { - tgf.TalkgroupsNot[i] = tgs.Talkgroup{ + tgf.TalkgroupsNot[i] = tgs.ID{ System: uint32(t.System), Talkgroup: uint32(t.Talkgroup), } @@ -53,12 +53,12 @@ func (f *TalkgroupFilter) hasTags() bool { return len(f.TalkgroupTagsAny) > 0 || len(f.TalkgroupTagsAll) > 0 || len(f.TalkgroupTagsNot) > 0 } -func (f *TalkgroupFilter) GetFinalTalkgroups() map[tgs.Talkgroup]bool { +func (f *TalkgroupFilter) GetFinalTalkgroups() map[tgs.ID]bool { return f.talkgroups } func (f *TalkgroupFilter) compile(ctx context.Context) error { - f.talkgroups = make(map[tgs.Talkgroup]bool) + f.talkgroups = make(map[tgs.ID]bool) for _, tg := range f.Talkgroups { f.talkgroups[tg] = true } @@ -71,7 +71,7 @@ func (f *TalkgroupFilter) compile(ctx context.Context) error { } for _, tg := range tagTGs { - f.talkgroups[tgs.Talkgroup{System: uint32(tg.SystemID), Talkgroup: uint32(tg.Tgid)}] = true + f.talkgroups[tgs.ID{System: uint32(tg.SystemID), Talkgroup: uint32(tg.Tgid)}] = true } } diff --git a/pkg/server/server.go b/pkg/server/server.go index be6bc56..beff8ca 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -6,7 +6,6 @@ import ( "os" "time" - "dynatron.me/x/stillbox/pkg/calls" "dynatron.me/x/stillbox/pkg/alerting" "dynatron.me/x/stillbox/pkg/auth" "dynatron.me/x/stillbox/pkg/config" @@ -15,6 +14,7 @@ import ( "dynatron.me/x/stillbox/pkg/notify" "dynatron.me/x/stillbox/pkg/sinks" "dynatron.me/x/stillbox/pkg/sources" + "dynatron.me/x/stillbox/pkg/talkgroups" "github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5/middleware" "github.com/go-chi/cors" @@ -35,7 +35,7 @@ type Server struct { alerter alerting.Alerter notifier notify.Notifier hup chan os.Signal - tgCache calls.TalkgroupCache + tgs talkgroups.Store } func New(ctx context.Context, cfg *config.Config) (*Server, error) { @@ -58,7 +58,7 @@ func New(ctx context.Context, cfg *config.Config) (*Server, error) { return nil, err } - tgCache := calls.NewTalkgroupCache() + tgCache := talkgroups.NewCache() srv := &Server{ auth: authenticator, @@ -69,7 +69,7 @@ func New(ctx context.Context, cfg *config.Config) (*Server, error) { logger: logger, alerter: alerting.New(cfg.Alerting, tgCache, alerting.WithNotifier(notifier)), notifier: notifier, - tgCache: tgCache, + tgs: tgCache, } srv.sinks.Register("database", sinks.NewDatabaseSink(srv.db), true) diff --git a/pkg/sources/http.go b/pkg/sources/http.go index 4ecefee..32d1a28 100644 --- a/pkg/sources/http.go +++ b/pkg/sources/http.go @@ -7,8 +7,8 @@ import ( "dynatron.me/x/stillbox/internal/common" "dynatron.me/x/stillbox/internal/forms" - "dynatron.me/x/stillbox/pkg/calls" "dynatron.me/x/stillbox/pkg/auth" + "dynatron.me/x/stillbox/pkg/calls" "github.com/go-chi/chi/v5" "github.com/rs/zerolog/log" ) diff --git a/pkg/talkgroups/alertconfig.go b/pkg/talkgroups/alertconfig.go index 73365d6..65837a0 100644 --- a/pkg/talkgroups/alertconfig.go +++ b/pkg/talkgroups/alertconfig.go @@ -8,14 +8,14 @@ import ( "dynatron.me/x/stillbox/internal/trending" ) -type AlertConfig map[Talkgroup][]AlertRule +type AlertConfig map[ID][]AlertRule type AlertRule struct { Times []ruletime.RuleTime `json:"times"` ScoreMultiplier float32 `json:"mult"` } -func (ac AlertConfig) AddAlertConfig(tg Talkgroup, confBytes []byte) error { +func (ac AlertConfig) AddAlertConfig(tg ID, confBytes []byte) error { if len(confBytes) == 0 { return nil } @@ -30,7 +30,7 @@ func (ac AlertConfig) AddAlertConfig(tg Talkgroup, confBytes []byte) error { return nil } -func (ac AlertConfig) ApplyAlertRules(score trending.Score[Talkgroup], t time.Time, coversOpts ...ruletime.CoversOption) float64 { +func (ac AlertConfig) ApplyAlertRules(score trending.Score[ID], t time.Time, coversOpts ...ruletime.CoversOption) float64 { s, has := ac[score.ID] if !has { return score.Score diff --git a/pkg/talkgroups/alertconfig_test.go b/pkg/talkgroups/alertconfig_test.go index 785895b..2d26765 100644 --- a/pkg/talkgroups/alertconfig_test.go +++ b/pkg/talkgroups/alertconfig_test.go @@ -18,7 +18,7 @@ func TestAlertConfig(t *testing.T) { ac := make(talkgroups.AlertConfig) parseTests := []struct { name string - tg talkgroups.Talkgroup + tg talkgroups.ID conf string compare []talkgroups.AlertRule expectErr error @@ -78,7 +78,7 @@ func TestAlertConfig(t *testing.T) { evalTests := []struct { name string - tg talkgroups.Talkgroup + tg talkgroups.ID t time.Time origScore float64 expectScore float64 @@ -122,7 +122,7 @@ func TestAlertConfig(t *testing.T) { for _, tc := range evalTests { t.Run(tc.name, func(t *testing.T) { - cs := trending.Score[talkgroups.Talkgroup]{ + cs := trending.Score[talkgroups.ID]{ ID: tc.tg, Score: tc.origScore, } diff --git a/pkg/talkgroups/talkgroups.go b/pkg/talkgroups/cache.go similarity index 60% rename from pkg/talkgroups/talkgroups.go rename to pkg/talkgroups/cache.go index a537ce9..1a3f89b 100644 --- a/pkg/talkgroups/talkgroups.go +++ b/pkg/talkgroups/cache.go @@ -2,7 +2,6 @@ package talkgroups import ( "context" - "fmt" "sync" "time" @@ -15,49 +14,29 @@ import ( "github.com/rs/zerolog/log" ) -type Talkgroup struct { - System uint32 - Talkgroup uint32 -} +type tgMap map[ID]database.GetTalkgroupWithLearnedByPackedIDsRow -func TG[T int | uint | int64 | uint64 | int32 | uint32](sys, tgid T) Talkgroup { - return Talkgroup{ - System: uint32(sys), - Talkgroup: uint32(tgid), - } -} +type Store interface { + // TG retrieves a Talkgroup from the Store. It returns the record and whether one was found. + TG(ctx context.Context, tg ID) (database.GetTalkgroupWithLearnedByPackedIDsRow, bool) -func (t Talkgroup) Pack() int64 { - // P25 system IDs are 12 bits, so we can fit them in a signed 8 byte int (int64, pg INT8) - return int64((int64(t.System) << 32) | int64(t.Talkgroup)) -} - -func (t Talkgroup) String() string { - return fmt.Sprintf("%d:%d", t.System, t.Talkgroup) -} - -func PackedTGs(tg []Talkgroup) []int64 { - s := make([]int64, len(tg)) - - for i, v := range tg { - s[i] = v.Pack() - } - - return s -} - -type tgMap map[Talkgroup]database.GetTalkgroupWithLearnedByPackedIDsRow - -type TalkgroupCache interface { - TG(ctx context.Context, tg Talkgroup) (database.GetTalkgroupWithLearnedByPackedIDsRow, bool) + // SystemName retrieves a system name from the store. It returns the record and whether one was found. SystemName(ctx context.Context, id int) (string, bool) - ApplyAlertRules(score trending.Score[Talkgroup], t time.Time, coversOpts ...ruletime.CoversOption) float64 - Hint(ctx context.Context, tgs []Talkgroup) error + + // ApplyAlertRules applies the score's talkgroup alert rules to the call occurring at t and returns the weighted score. + ApplyAlertRules(score trending.Score[ID], t time.Time, coversOpts ...ruletime.CoversOption) float64 + + // Hint hints the Store that the provided talkgroups will be asked for. + Hint(ctx context.Context, tgs []ID) error + + // Load loads the provided packed talkgroup IDs into the Store. Load(ctx context.Context, tgs []int64) error + + // Invalidate invalidates any caching in the Store. Invalidate() } -func (t *talkgroupCache) Invalidate() { +func (t *cache) Invalidate() { t.Lock() defer t.Unlock() clear(t.tgs) @@ -65,15 +44,16 @@ func (t *talkgroupCache) Invalidate() { clear(t.AlertConfig) } -type talkgroupCache struct { +type cache struct { sync.RWMutex AlertConfig tgs tgMap systems map[int32]string } -func NewTalkgroupCache() TalkgroupCache { - tgc := &talkgroupCache{ +// NewCache returns a new cache Store. +func NewCache() Store { + tgc := &cache{ tgs: make(tgMap), systems: make(map[int32]string), AlertConfig: make(AlertConfig), @@ -82,7 +62,7 @@ func NewTalkgroupCache() TalkgroupCache { return tgc } -func (t *talkgroupCache) Hint(ctx context.Context, tgs []Talkgroup) error { +func (t *cache) Hint(ctx context.Context, tgs []ID) error { t.RLock() var toLoad []int64 if len(t.tgs) > len(tgs)/2 { // TODO: instrument this @@ -109,7 +89,7 @@ func (t *talkgroupCache) Hint(ctx context.Context, tgs []Talkgroup) error { return nil } -func (t *talkgroupCache) add(rec database.GetTalkgroupWithLearnedByPackedIDsRow) error { +func (t *cache) add(rec database.GetTalkgroupWithLearnedByPackedIDsRow) error { tg := TG(rec.SystemID, rec.Tgid) t.tgs[tg] = rec t.systems[rec.SystemID] = rec.SystemName @@ -117,7 +97,7 @@ func (t *talkgroupCache) add(rec database.GetTalkgroupWithLearnedByPackedIDsRow) return t.AlertConfig.AddAlertConfig(tg, rec.AlertConfig) } -func (t *talkgroupCache) Load(ctx context.Context, tgs []int64) error { +func (t *cache) Load(ctx context.Context, tgs []int64) error { tgRecords, err := database.FromCtx(ctx).GetTalkgroupWithLearnedByPackedIDs(ctx, tgs) if err != nil { return err @@ -137,7 +117,7 @@ func (t *talkgroupCache) Load(ctx context.Context, tgs []int64) error { return nil } -func (t *talkgroupCache) TG(ctx context.Context, tg Talkgroup) (database.GetTalkgroupWithLearnedByPackedIDsRow, bool) { +func (t *cache) TG(ctx context.Context, tg ID) (database.GetTalkgroupWithLearnedByPackedIDsRow, bool) { t.RLock() rec, has := t.tgs[tg] t.RUnlock() @@ -171,7 +151,7 @@ func (t *talkgroupCache) TG(ctx context.Context, tg Talkgroup) (database.GetTalk return recs[0], true } -func (t *talkgroupCache) SystemName(ctx context.Context, id int) (name string, has bool) { +func (t *cache) SystemName(ctx context.Context, id int) (name string, has bool) { n, has := t.systems[int32(id)] if !has { diff --git a/pkg/talkgroups/talkgroup.go b/pkg/talkgroups/talkgroup.go new file mode 100644 index 0000000..61253ed --- /dev/null +++ b/pkg/talkgroups/talkgroup.go @@ -0,0 +1,36 @@ +package talkgroups + +import ( + "fmt" +) + +type ID struct { + System uint32 + Talkgroup uint32 +} + +func TG[T int | uint | int64 | uint64 | int32 | uint32](sys, tgid T) ID { + return ID{ + System: uint32(sys), + Talkgroup: uint32(tgid), + } +} + +func (t ID) Pack() int64 { + // P25 system IDs are 12 bits, so we can fit them in a signed 8 byte int (int64, pg INT8) + return int64((int64(t.System) << 32) | int64(t.Talkgroup)) +} + +func (t ID) String() string { + return fmt.Sprintf("%d:%d", t.System, t.Talkgroup) +} + +func PackedTGs(tg []ID) []int64 { + s := make([]int64, len(tg)) + + for i, v := range tg { + s[i] = v.Pack() + } + + return s +} From 5338bb0071679eae13637cee7c025f3ad631e932 Mon Sep 17 00:00:00 2001 From: Daniel Ponte Date: Sun, 3 Nov 2024 08:44:34 -0500 Subject: [PATCH 3/4] Use sqlc.embed --- pkg/alerting/alerting.go | 16 +- pkg/alerting/stats.go | 2 +- pkg/database/querier.go | 6 +- pkg/database/talkgroups.sql.go | 248 +++++++++++++--------------- pkg/nexus/commands.go | 16 +- pkg/talkgroups/cache.go | 6 +- sql/postgres/queries/talkgroups.sql | 16 +- 7 files changed, 141 insertions(+), 169 deletions(-) diff --git a/pkg/alerting/alerting.go b/pkg/alerting/alerting.go index cf7e56e..23f282f 100644 --- a/pkg/alerting/alerting.go +++ b/pkg/alerting/alerting.go @@ -170,10 +170,10 @@ func (as *alerter) eval(ctx context.Context, now time.Time, testMode bool) ([]Al origScore := s.Score tgr, has := as.tgCache.TG(ctx, s.ID) if has { - if !tgr.Alert { + if !tgr.Talkgroup.Alert { continue } - s.Score *= float64(tgr.Weight) + s.Score *= float64(tgr.Talkgroup.Weight) } if s.Score > as.cfg.AlertThreshold || testMode { @@ -330,15 +330,15 @@ func (as *alerter) makeAlert(ctx context.Context, score trending.Score[talkgroup tgRecord, has := as.tgCache.TG(ctx, score.ID) switch has { case true: - d.Weight = tgRecord.Weight - if tgRecord.SystemName == "" { - tgRecord.SystemName = strconv.Itoa(int(score.ID.System)) + d.Weight = tgRecord.Talkgroup.Weight + if tgRecord.System.Name == "" { + tgRecord.System.Name = strconv.Itoa(int(score.ID.System)) } - if tgRecord.Name != nil { - d.TGName = fmt.Sprintf("%s %s (%d)", tgRecord.SystemName, *tgRecord.Name, score.ID.Talkgroup) + if tgRecord.Talkgroup.Name != nil { + d.TGName = fmt.Sprintf("%s %s (%d)", tgRecord.System.Name, *tgRecord.Talkgroup.Name, score.ID.Talkgroup) } else { - d.TGName = fmt.Sprintf("%s:%d", tgRecord.SystemName, int(score.ID.Talkgroup)) + d.TGName = fmt.Sprintf("%s:%d", tgRecord.System.Name, int(score.ID.Talkgroup)) } case false: system, has := as.tgCache.SystemName(ctx, int(score.ID.System)) diff --git a/pkg/alerting/stats.go b/pkg/alerting/stats.go index c8835d9..e513edd 100644 --- a/pkg/alerting/stats.go +++ b/pkg/alerting/stats.go @@ -78,7 +78,7 @@ func (as *alerter) tgStatsHandler(w http.ResponseWriter, r *http.Request) { tgMap := make(map[talkgroups.ID]database.GetTalkgroupsByPackedIDsRow, len(tgs)) for _, t := range tgs { - tgMap[talkgroups.ID{System: uint32(t.SystemID), Talkgroup: uint32(t.ID)}] = t + tgMap[talkgroups.ID{System: uint32(t.System.ID), Talkgroup: uint32(t.Talkgroup.ID)}] = t } renderData := struct { diff --git a/pkg/database/querier.go b/pkg/database/querier.go index b911805..4bf2177 100644 --- a/pkg/database/querier.go +++ b/pkg/database/querier.go @@ -22,14 +22,14 @@ type Querier interface { GetAPIKey(ctx context.Context, apiKey string) (ApiKey, error) GetDatabaseSize(ctx context.Context) (string, error) GetSystemName(ctx context.Context, systemID int) (string, error) - GetTalkgroup(ctx context.Context, systemID int, tgid int) (Talkgroup, error) + GetTalkgroup(ctx context.Context, systemID int, tgid int) (GetTalkgroupRow, error) GetTalkgroupIDsByTags(ctx context.Context, anytags []string, alltags []string, nottags []string) ([]GetTalkgroupIDsByTagsRow, error) GetTalkgroupTags(ctx context.Context, sys int, tg int) ([]string, error) GetTalkgroupWithLearned(ctx context.Context, systemID int, tgid int) (GetTalkgroupWithLearnedRow, error) GetTalkgroupWithLearnedByPackedIDs(ctx context.Context, dollar_1 []int64) ([]GetTalkgroupWithLearnedByPackedIDsRow, error) GetTalkgroupsByPackedIDs(ctx context.Context, dollar_1 []int64) ([]GetTalkgroupsByPackedIDsRow, error) - GetTalkgroupsWithAllTags(ctx context.Context, tags []string) ([]Talkgroup, error) - GetTalkgroupsWithAnyTags(ctx context.Context, tags []string) ([]Talkgroup, error) + GetTalkgroupsWithAllTags(ctx context.Context, tags []string) ([]GetTalkgroupsWithAllTagsRow, error) + GetTalkgroupsWithAnyTags(ctx context.Context, tags []string) ([]GetTalkgroupsWithAnyTagsRow, error) GetUserByID(ctx context.Context, id int32) (User, error) GetUserByUID(ctx context.Context, id int32) (User, error) GetUserByUsername(ctx context.Context, username string) (User, error) diff --git a/pkg/database/talkgroups.sql.go b/pkg/database/talkgroups.sql.go index 0b58f8a..e8c4204 100644 --- a/pkg/database/talkgroups.sql.go +++ b/pkg/database/talkgroups.sql.go @@ -31,26 +31,30 @@ func (q *Queries) GetSystemName(ctx context.Context, systemID int) (string, erro } const getTalkgroup = `-- name: GetTalkgroup :one -SELECT id, system_id, tgid, name, alpha_tag, tg_group, frequency, metadata, tags, alert, alert_config, weight FROM talkgroups +SELECT talkgroups.id, talkgroups.system_id, talkgroups.tgid, talkgroups.name, talkgroups.alpha_tag, talkgroups.tg_group, talkgroups.frequency, talkgroups.metadata, talkgroups.tags, talkgroups.alert, talkgroups.alert_config, talkgroups.weight FROM talkgroups WHERE id = systg2id($1, $2) ` -func (q *Queries) GetTalkgroup(ctx context.Context, systemID int, tgid int) (Talkgroup, error) { +type GetTalkgroupRow struct { + Talkgroup Talkgroup `json:"talkgroup"` +} + +func (q *Queries) GetTalkgroup(ctx context.Context, systemID int, tgid int) (GetTalkgroupRow, error) { row := q.db.QueryRow(ctx, getTalkgroup, systemID, tgid) - var i Talkgroup + var i GetTalkgroupRow err := row.Scan( - &i.ID, - &i.SystemID, - &i.Tgid, - &i.Name, - &i.AlphaTag, - &i.TgGroup, - &i.Frequency, - &i.Metadata, - &i.Tags, - &i.Alert, - &i.AlertConfig, - &i.Weight, + &i.Talkgroup.ID, + &i.Talkgroup.SystemID, + &i.Talkgroup.Tgid, + &i.Talkgroup.Name, + &i.Talkgroup.AlphaTag, + &i.Talkgroup.TgGroup, + &i.Talkgroup.Frequency, + &i.Talkgroup.Metadata, + &i.Talkgroup.Tags, + &i.Talkgroup.Alert, + &i.Talkgroup.AlertConfig, + &i.Talkgroup.Weight, ) return i, err } @@ -101,9 +105,7 @@ func (q *Queries) GetTalkgroupTags(ctx context.Context, sys int, tg int) ([]stri const getTalkgroupWithLearned = `-- name: GetTalkgroupWithLearned :one SELECT -tg.id, tg.system_id, sys.name system_name, tg.tgid, tg.name, -tg.tg_group, tg.frequency, tg.metadata, tg.tags, tg.alpha_tag, -tg.alert, tg.weight, tg.alert_config, +tg.id, tg.system_id, tg.tgid, tg.name, tg.alpha_tag, tg.tg_group, tg.frequency, tg.metadata, tg.tags, tg.alert, tg.alert_config, tg.weight, sys.id, sys.name, FALSE learned FROM talkgroups tg JOIN systems sys ON tg.system_id = sys.id @@ -121,39 +123,29 @@ WHERE tgl.system_id = $1 AND tgl.tgid = $2 AND ignored IS NOT TRUE ` type GetTalkgroupWithLearnedRow struct { - ID int64 `json:"id"` - SystemID int32 `json:"system_id"` - SystemName string `json:"system_name"` - Tgid int32 `json:"tgid"` - Name *string `json:"name"` - TgGroup *string `json:"tg_group"` - Frequency *int32 `json:"frequency"` - Metadata []byte `json:"metadata"` - Tags []string `json:"tags"` - AlphaTag *string `json:"alpha_tag"` - Alert bool `json:"alert"` - Weight float32 `json:"weight"` - AlertConfig []byte `json:"alert_config"` - Learned bool `json:"learned"` + Talkgroup Talkgroup `json:"talkgroup"` + System System `json:"system"` + Learned bool `json:"learned"` } func (q *Queries) GetTalkgroupWithLearned(ctx context.Context, systemID int, tgid int) (GetTalkgroupWithLearnedRow, error) { row := q.db.QueryRow(ctx, getTalkgroupWithLearned, systemID, tgid) var i GetTalkgroupWithLearnedRow err := row.Scan( - &i.ID, - &i.SystemID, - &i.SystemName, - &i.Tgid, - &i.Name, - &i.TgGroup, - &i.Frequency, - &i.Metadata, - &i.Tags, - &i.AlphaTag, - &i.Alert, - &i.Weight, - &i.AlertConfig, + &i.Talkgroup.ID, + &i.Talkgroup.SystemID, + &i.Talkgroup.Tgid, + &i.Talkgroup.Name, + &i.Talkgroup.AlphaTag, + &i.Talkgroup.TgGroup, + &i.Talkgroup.Frequency, + &i.Talkgroup.Metadata, + &i.Talkgroup.Tags, + &i.Talkgroup.Alert, + &i.Talkgroup.AlertConfig, + &i.Talkgroup.Weight, + &i.System.ID, + &i.System.Name, &i.Learned, ) return i, err @@ -161,9 +153,7 @@ func (q *Queries) GetTalkgroupWithLearned(ctx context.Context, systemID int, tgi const getTalkgroupWithLearnedByPackedIDs = `-- name: GetTalkgroupWithLearnedByPackedIDs :many SELECT -tg.id, tg.system_id, sys.name system_name, tg.tgid, tg.name, -tg.tg_group, tg.frequency, tg.metadata, tg.tags, tg.alpha_tag, -tg.alert, tg.weight, tg.alert_config, +tg.id, tg.system_id, tg.tgid, tg.name, tg.alpha_tag, tg.tg_group, tg.frequency, tg.metadata, tg.tags, tg.alert, tg.alert_config, tg.weight, sys.id, sys.name, FALSE learned FROM talkgroups tg JOIN systems sys ON tg.system_id = sys.id @@ -181,20 +171,9 @@ WHERE systg2id(tgl.system_id, tgl.tgid) = ANY($1::INT8[]) AND ignored IS NOT TRU ` type GetTalkgroupWithLearnedByPackedIDsRow struct { - ID int64 `json:"id"` - SystemID int32 `json:"system_id"` - SystemName string `json:"system_name"` - Tgid int32 `json:"tgid"` - Name *string `json:"name"` - TgGroup *string `json:"tg_group"` - Frequency *int32 `json:"frequency"` - Metadata []byte `json:"metadata"` - Tags []string `json:"tags"` - AlphaTag *string `json:"alpha_tag"` - Alert bool `json:"alert"` - Weight float32 `json:"weight"` - AlertConfig []byte `json:"alert_config"` - Learned bool `json:"learned"` + Talkgroup Talkgroup `json:"talkgroup"` + System System `json:"system"` + Learned bool `json:"learned"` } func (q *Queries) GetTalkgroupWithLearnedByPackedIDs(ctx context.Context, dollar_1 []int64) ([]GetTalkgroupWithLearnedByPackedIDsRow, error) { @@ -207,19 +186,20 @@ func (q *Queries) GetTalkgroupWithLearnedByPackedIDs(ctx context.Context, dollar for rows.Next() { var i GetTalkgroupWithLearnedByPackedIDsRow if err := rows.Scan( - &i.ID, - &i.SystemID, - &i.SystemName, - &i.Tgid, - &i.Name, - &i.TgGroup, - &i.Frequency, - &i.Metadata, - &i.Tags, - &i.AlphaTag, - &i.Alert, - &i.Weight, - &i.AlertConfig, + &i.Talkgroup.ID, + &i.Talkgroup.SystemID, + &i.Talkgroup.Tgid, + &i.Talkgroup.Name, + &i.Talkgroup.AlphaTag, + &i.Talkgroup.TgGroup, + &i.Talkgroup.Frequency, + &i.Talkgroup.Metadata, + &i.Talkgroup.Tags, + &i.Talkgroup.Alert, + &i.Talkgroup.AlertConfig, + &i.Talkgroup.Weight, + &i.System.ID, + &i.System.Name, &i.Learned, ); err != nil { return nil, err @@ -233,26 +213,14 @@ func (q *Queries) GetTalkgroupWithLearnedByPackedIDs(ctx context.Context, dollar } const getTalkgroupsByPackedIDs = `-- name: GetTalkgroupsByPackedIDs :many -SELECT tg.id, system_id, tgid, tg.name, alpha_tag, tg_group, frequency, metadata, tags, alert, alert_config, weight, sys.id, sys.name FROM talkgroups tg +SELECT tg.id, tg.system_id, tg.tgid, tg.name, tg.alpha_tag, tg.tg_group, tg.frequency, tg.metadata, tg.tags, tg.alert, tg.alert_config, tg.weight, sys.id, sys.name FROM talkgroups tg JOIN systems sys ON tg.system_id = sys.id WHERE tg.id = ANY($1::INT8[]) ` type GetTalkgroupsByPackedIDsRow struct { - ID int64 `json:"id"` - SystemID int32 `json:"system_id"` - Tgid int32 `json:"tgid"` - Name *string `json:"name"` - AlphaTag *string `json:"alpha_tag"` - TgGroup *string `json:"tg_group"` - Frequency *int32 `json:"frequency"` - Metadata []byte `json:"metadata"` - Tags []string `json:"tags"` - Alert bool `json:"alert"` - AlertConfig []byte `json:"alert_config"` - Weight float32 `json:"weight"` - ID_2 int `json:"id_2"` - Name_2 string `json:"name_2"` + Talkgroup Talkgroup `json:"talkgroup"` + System System `json:"system"` } func (q *Queries) GetTalkgroupsByPackedIDs(ctx context.Context, dollar_1 []int64) ([]GetTalkgroupsByPackedIDsRow, error) { @@ -265,20 +233,20 @@ func (q *Queries) GetTalkgroupsByPackedIDs(ctx context.Context, dollar_1 []int64 for rows.Next() { var i GetTalkgroupsByPackedIDsRow if err := rows.Scan( - &i.ID, - &i.SystemID, - &i.Tgid, - &i.Name, - &i.AlphaTag, - &i.TgGroup, - &i.Frequency, - &i.Metadata, - &i.Tags, - &i.Alert, - &i.AlertConfig, - &i.Weight, - &i.ID_2, - &i.Name_2, + &i.Talkgroup.ID, + &i.Talkgroup.SystemID, + &i.Talkgroup.Tgid, + &i.Talkgroup.Name, + &i.Talkgroup.AlphaTag, + &i.Talkgroup.TgGroup, + &i.Talkgroup.Frequency, + &i.Talkgroup.Metadata, + &i.Talkgroup.Tags, + &i.Talkgroup.Alert, + &i.Talkgroup.AlertConfig, + &i.Talkgroup.Weight, + &i.System.ID, + &i.System.Name, ); err != nil { return nil, err } @@ -291,32 +259,36 @@ func (q *Queries) GetTalkgroupsByPackedIDs(ctx context.Context, dollar_1 []int64 } const getTalkgroupsWithAllTags = `-- name: GetTalkgroupsWithAllTags :many -SELECT id, system_id, tgid, name, alpha_tag, tg_group, frequency, metadata, tags, alert, alert_config, weight FROM talkgroups +SELECT talkgroups.id, talkgroups.system_id, talkgroups.tgid, talkgroups.name, talkgroups.alpha_tag, talkgroups.tg_group, talkgroups.frequency, talkgroups.metadata, talkgroups.tags, talkgroups.alert, talkgroups.alert_config, talkgroups.weight FROM talkgroups WHERE tags && ARRAY[$1] ` -func (q *Queries) GetTalkgroupsWithAllTags(ctx context.Context, tags []string) ([]Talkgroup, error) { +type GetTalkgroupsWithAllTagsRow struct { + Talkgroup Talkgroup `json:"talkgroup"` +} + +func (q *Queries) GetTalkgroupsWithAllTags(ctx context.Context, tags []string) ([]GetTalkgroupsWithAllTagsRow, error) { rows, err := q.db.Query(ctx, getTalkgroupsWithAllTags, tags) if err != nil { return nil, err } defer rows.Close() - var items []Talkgroup + var items []GetTalkgroupsWithAllTagsRow for rows.Next() { - var i Talkgroup + var i GetTalkgroupsWithAllTagsRow if err := rows.Scan( - &i.ID, - &i.SystemID, - &i.Tgid, - &i.Name, - &i.AlphaTag, - &i.TgGroup, - &i.Frequency, - &i.Metadata, - &i.Tags, - &i.Alert, - &i.AlertConfig, - &i.Weight, + &i.Talkgroup.ID, + &i.Talkgroup.SystemID, + &i.Talkgroup.Tgid, + &i.Talkgroup.Name, + &i.Talkgroup.AlphaTag, + &i.Talkgroup.TgGroup, + &i.Talkgroup.Frequency, + &i.Talkgroup.Metadata, + &i.Talkgroup.Tags, + &i.Talkgroup.Alert, + &i.Talkgroup.AlertConfig, + &i.Talkgroup.Weight, ); err != nil { return nil, err } @@ -329,32 +301,36 @@ func (q *Queries) GetTalkgroupsWithAllTags(ctx context.Context, tags []string) ( } const getTalkgroupsWithAnyTags = `-- name: GetTalkgroupsWithAnyTags :many -SELECT id, system_id, tgid, name, alpha_tag, tg_group, frequency, metadata, tags, alert, alert_config, weight FROM talkgroups +SELECT talkgroups.id, talkgroups.system_id, talkgroups.tgid, talkgroups.name, talkgroups.alpha_tag, talkgroups.tg_group, talkgroups.frequency, talkgroups.metadata, talkgroups.tags, talkgroups.alert, talkgroups.alert_config, talkgroups.weight FROM talkgroups WHERE tags @> ARRAY[$1] ` -func (q *Queries) GetTalkgroupsWithAnyTags(ctx context.Context, tags []string) ([]Talkgroup, error) { +type GetTalkgroupsWithAnyTagsRow struct { + Talkgroup Talkgroup `json:"talkgroup"` +} + +func (q *Queries) GetTalkgroupsWithAnyTags(ctx context.Context, tags []string) ([]GetTalkgroupsWithAnyTagsRow, error) { rows, err := q.db.Query(ctx, getTalkgroupsWithAnyTags, tags) if err != nil { return nil, err } defer rows.Close() - var items []Talkgroup + var items []GetTalkgroupsWithAnyTagsRow for rows.Next() { - var i Talkgroup + var i GetTalkgroupsWithAnyTagsRow if err := rows.Scan( - &i.ID, - &i.SystemID, - &i.Tgid, - &i.Name, - &i.AlphaTag, - &i.TgGroup, - &i.Frequency, - &i.Metadata, - &i.Tags, - &i.Alert, - &i.AlertConfig, - &i.Weight, + &i.Talkgroup.ID, + &i.Talkgroup.SystemID, + &i.Talkgroup.Tgid, + &i.Talkgroup.Name, + &i.Talkgroup.AlphaTag, + &i.Talkgroup.TgGroup, + &i.Talkgroup.Frequency, + &i.Talkgroup.Metadata, + &i.Talkgroup.Tags, + &i.Talkgroup.Alert, + &i.Talkgroup.AlertConfig, + &i.Talkgroup.Weight, ); err != nil { return nil, err } diff --git a/pkg/nexus/commands.go b/pkg/nexus/commands.go index 6d5a55f..d9b9180 100644 --- a/pkg/nexus/commands.go +++ b/pkg/nexus/commands.go @@ -71,9 +71,9 @@ func (c *client) Talkgroup(ctx context.Context, tg *pb.Talkgroup) error { } var md *structpb.Struct - if len(tgi.Metadata) > 0 { + if len(tgi.Talkgroup.Metadata) > 0 { m := make(map[string]interface{}) - err := json.Unmarshal(tgi.Metadata, &m) + err := json.Unmarshal(tgi.Talkgroup.Metadata, &m) if err != nil { log.Error().Err(err).Int32("sys", tg.System).Int32("tg", tg.Talkgroup).Msg("unmarshal tg metadata") } @@ -85,14 +85,14 @@ func (c *client) Talkgroup(ctx context.Context, tg *pb.Talkgroup) error { resp := &pb.TalkgroupInfo{ Tg: tg, - Name: tgi.Name, - Group: tgi.TgGroup, - Frequency: tgi.Frequency, + Name: tgi.Talkgroup.Name, + Group: tgi.Talkgroup.TgGroup, + Frequency: tgi.Talkgroup.Frequency, Metadata: md, - Tags: tgi.Tags, + Tags: tgi.Talkgroup.Tags, Learned: tgi.Learned, - AlphaTag: tgi.AlphaTag, - SystemName: tgi.SystemName, + AlphaTag: tgi.Talkgroup.AlphaTag, + SystemName: tgi.System.Name, } _ = c.Send(&pb.Message{ diff --git a/pkg/talkgroups/cache.go b/pkg/talkgroups/cache.go index 1a3f89b..8c59da3 100644 --- a/pkg/talkgroups/cache.go +++ b/pkg/talkgroups/cache.go @@ -90,11 +90,11 @@ func (t *cache) Hint(ctx context.Context, tgs []ID) error { } func (t *cache) add(rec database.GetTalkgroupWithLearnedByPackedIDsRow) error { - tg := TG(rec.SystemID, rec.Tgid) + tg := TG(rec.System.ID, int(rec.Talkgroup.Tgid)) t.tgs[tg] = rec - t.systems[rec.SystemID] = rec.SystemName + t.systems[int32(rec.System.ID)] = rec.System.Name - return t.AlertConfig.AddAlertConfig(tg, rec.AlertConfig) + return t.AlertConfig.AddAlertConfig(tg, rec.Talkgroup.AlertConfig) } func (t *cache) Load(ctx context.Context, tgs []int64) error { diff --git a/sql/postgres/queries/talkgroups.sql b/sql/postgres/queries/talkgroups.sql index 099e983..49783d1 100644 --- a/sql/postgres/queries/talkgroups.sql +++ b/sql/postgres/queries/talkgroups.sql @@ -1,9 +1,9 @@ -- name: GetTalkgroupsWithAnyTags :many -SELECT * FROM talkgroups +SELECT sqlc.embed(talkgroups) FROM talkgroups WHERE tags @> ARRAY[$1]; -- name: GetTalkgroupsWithAllTags :many -SELECT * FROM talkgroups +SELECT sqlc.embed(talkgroups) FROM talkgroups WHERE tags && ARRAY[$1]; -- name: GetTalkgroupIDsByTags :many @@ -25,19 +25,17 @@ UPDATE talkgroups SET tags = $2 WHERE id = ANY($1); -- name: GetTalkgroup :one -SELECT * FROM talkgroups +SELECT sqlc.embed(talkgroups) FROM talkgroups WHERE id = systg2id(sqlc.arg(system_id), sqlc.arg(tgid)); -- name: GetTalkgroupsByPackedIDs :many -SELECT * FROM talkgroups tg +SELECT sqlc.embed(tg), sqlc.embed(sys) FROM talkgroups tg JOIN systems sys ON tg.system_id = sys.id WHERE tg.id = ANY($1::INT8[]); -- name: GetTalkgroupWithLearned :one SELECT -tg.id, tg.system_id, sys.name system_name, tg.tgid, tg.name, -tg.tg_group, tg.frequency, tg.metadata, tg.tags, tg.alpha_tag, -tg.alert, tg.weight, tg.alert_config, +sqlc.embed(tg), sqlc.embed(sys), FALSE learned FROM talkgroups tg JOIN systems sys ON tg.system_id = sys.id @@ -55,9 +53,7 @@ WHERE tgl.system_id = sqlc.arg(system_id) AND tgl.tgid = sqlc.arg(tgid) AND igno -- name: GetTalkgroupWithLearnedByPackedIDs :many SELECT -tg.id, tg.system_id, sys.name system_name, tg.tgid, tg.name, -tg.tg_group, tg.frequency, tg.metadata, tg.tags, tg.alpha_tag, -tg.alert, tg.weight, tg.alert_config, +sqlc.embed(tg), sqlc.embed(sys), FALSE learned FROM talkgroups tg JOIN systems sys ON tg.system_id = sys.id From d917e13c4f79f7777406bca9d5f2e09526f98695 Mon Sep 17 00:00:00 2001 From: Daniel Ponte Date: Sun, 3 Nov 2024 09:45:51 -0500 Subject: [PATCH 4/4] Embed --- pkg/alerting/alerting.go | 12 +++---- pkg/database/talkgroups.sql.go | 16 ++++----- pkg/database/talkgroups.sql_test.go | 49 ++++++++++++++++++++++++++ pkg/nexus/commands.go | 8 ++--- pkg/server/server.go | 1 + pkg/talkgroups/cache.go | 54 ++++++++++++++++++++++------- pkg/talkgroups/talkgroup.go | 8 +++++ sql/postgres/queries/talkgroups.sql | 16 ++++----- 8 files changed, 124 insertions(+), 40 deletions(-) create mode 100644 pkg/database/talkgroups.sql_test.go diff --git a/pkg/alerting/alerting.go b/pkg/alerting/alerting.go index 23f282f..5d5d09d 100644 --- a/pkg/alerting/alerting.go +++ b/pkg/alerting/alerting.go @@ -168,8 +168,8 @@ func (as *alerter) eval(ctx context.Context, now time.Time, testMode bool) ([]Al var notifications []Alert for _, s := range as.scores { origScore := s.Score - tgr, has := as.tgCache.TG(ctx, s.ID) - if has { + tgr, err := as.tgCache.TG(ctx, s.ID) + if err == nil { if !tgr.Talkgroup.Alert { continue } @@ -327,9 +327,9 @@ func (as *alerter) makeAlert(ctx context.Context, score trending.Score[talkgroup OrigScore: origScore, } - tgRecord, has := as.tgCache.TG(ctx, score.ID) - switch has { - case true: + tgRecord, err := as.tgCache.TG(ctx, score.ID) + switch err { + case nil: d.Weight = tgRecord.Talkgroup.Weight if tgRecord.System.Name == "" { tgRecord.System.Name = strconv.Itoa(int(score.ID.System)) @@ -340,7 +340,7 @@ func (as *alerter) makeAlert(ctx context.Context, score trending.Score[talkgroup } else { d.TGName = fmt.Sprintf("%s:%d", tgRecord.System.Name, int(score.ID.Talkgroup)) } - case false: + default: system, has := as.tgCache.SystemName(ctx, int(score.ID.System)) if has { d.TGName = fmt.Sprintf("%s:%d", system, int(score.ID.Talkgroup)) diff --git a/pkg/database/talkgroups.sql.go b/pkg/database/talkgroups.sql.go index e8c4204..8700be2 100644 --- a/pkg/database/talkgroups.sql.go +++ b/pkg/database/talkgroups.sql.go @@ -112,10 +112,10 @@ JOIN systems sys ON tg.system_id = sys.id WHERE tg.id = systg2id($1, $2) UNION SELECT -tgl.id::INT8, tgl.system_id::INT4, sys.name system_name, tgl.tgid::INT4, tgl.name, -tgl.alpha_tag, NULL::INTEGER, NULL::JSONB, -CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END, tgl.alpha_tag, -TRUE, 1.0, NULL::JSONB, +tgl.id::INT8, tgl.system_id::INT4, tgl.tgid::INT4, tgl.name, +tgl.alpha_tag, tgl.alpha_tag, NULL::INTEGER, NULL::JSONB, +CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END, +TRUE, NULL::JSONB, 1.0, sys.id, sys.name, TRUE learned FROM talkgroups_learned tgl JOIN systems sys ON tgl.system_id = sys.id @@ -160,10 +160,10 @@ JOIN systems sys ON tg.system_id = sys.id WHERE tg.id = ANY($1::INT8[]) UNION SELECT -tgl.id::INT8, tgl.system_id::INT4, sys.name system_name, tgl.tgid::INT4, tgl.name, -tgl.alpha_tag, NULL::INTEGER, NULL::JSONB, -CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END, tgl.alpha_tag, -TRUE, 1.0, NULL::JSONB, +tgl.id::INT8, tgl.system_id::INT4, tgl.tgid::INT4, tgl.name, +tgl.alpha_tag, tgl.alpha_tag, NULL::INTEGER, NULL::JSONB, +CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END, +TRUE, NULL::JSONB, 1.0, sys.id, sys.name, TRUE learned FROM talkgroups_learned tgl JOIN systems sys ON tgl.system_id = sys.id diff --git a/pkg/database/talkgroups.sql_test.go b/pkg/database/talkgroups.sql_test.go new file mode 100644 index 0000000..cc39c55 --- /dev/null +++ b/pkg/database/talkgroups.sql_test.go @@ -0,0 +1,49 @@ +package database + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +const getTalkgroupWithLearnedByPackedIDsTest = `-- name: GetTalkgroupWithLearnedByPackedIDs :many +SELECT +tg.id, tg.system_id, tg.tgid, tg.name, tg.alpha_tag, tg.tg_group, tg.frequency, tg.metadata, tg.tags, tg.alert, tg.alert_config, tg.weight, sys.id, sys.name, +FALSE learned +FROM talkgroups tg +JOIN systems sys ON tg.system_id = sys.id +WHERE tg.id = ANY($1::INT8[]) +UNION +SELECT +tgl.id::INT8, tgl.system_id::INT4, tgl.tgid::INT4, tgl.name, +tgl.alpha_tag, tgl.alpha_tag, NULL::INTEGER, NULL::JSONB, +CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END, +TRUE, NULL::JSONB, 1.0, sys.id, sys.name, +TRUE learned +FROM talkgroups_learned tgl +JOIN systems sys ON tgl.system_id = sys.id +WHERE systg2id(tgl.system_id, tgl.tgid) = ANY($1::INT8[]) AND ignored IS NOT TRUE +` +const getTalkgroupWithLearnedTest = `-- name: GetTalkgroupWithLearned :one +SELECT +tg.id, tg.system_id, tg.tgid, tg.name, tg.alpha_tag, tg.tg_group, tg.frequency, tg.metadata, tg.tags, tg.alert, tg.alert_config, tg.weight, sys.id, sys.name, +FALSE learned +FROM talkgroups tg +JOIN systems sys ON tg.system_id = sys.id +WHERE tg.id = systg2id($1, $2) +UNION +SELECT +tgl.id::INT8, tgl.system_id::INT4, tgl.tgid::INT4, tgl.name, +tgl.alpha_tag, tgl.alpha_tag, NULL::INTEGER, NULL::JSONB, +CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END, +TRUE, NULL::JSONB, 1.0, sys.id, sys.name, +TRUE learned +FROM talkgroups_learned tgl +JOIN systems sys ON tgl.system_id = sys.id +WHERE tgl.system_id = $1 AND tgl.tgid = $2 AND ignored IS NOT TRUE +` + +func TestQueryColumnsMatch(t *testing.T) { + require.Equal(t, getTalkgroupWithLearnedByPackedIDsTest, getTalkgroupWithLearnedByPackedIDs) + require.Equal(t, getTalkgroupWithLearnedTest, getTalkgroupWithLearned) +} diff --git a/pkg/nexus/commands.go b/pkg/nexus/commands.go index d9b9180..6842d83 100644 --- a/pkg/nexus/commands.go +++ b/pkg/nexus/commands.go @@ -5,10 +5,9 @@ import ( "encoding/json" "dynatron.me/x/stillbox/pkg/calls" - "dynatron.me/x/stillbox/pkg/database" "dynatron.me/x/stillbox/pkg/pb" + "dynatron.me/x/stillbox/pkg/talkgroups" - "github.com/jackc/pgx/v5" "github.com/rs/zerolog/log" "google.golang.org/protobuf/types/known/structpb" ) @@ -61,10 +60,9 @@ func (c *client) SendError(cmd *pb.Command, err error) { } func (c *client) Talkgroup(ctx context.Context, tg *pb.Talkgroup) error { - db := database.FromCtx(ctx) - tgi, err := db.GetTalkgroupWithLearned(ctx, int(tg.System), int(tg.Talkgroup)) + tgi, err := talkgroups.StoreFrom(ctx).TG(ctx, talkgroups.TG(tg.System, tg.Talkgroup)) if err != nil { - if err != pgx.ErrNoRows { + if err != talkgroups.ErrNoTG { log.Error().Err(err).Int32("sys", tg.System).Int32("tg", tg.Talkgroup).Msg("get talkgroup fail") } return err diff --git a/pkg/server/server.go b/pkg/server/server.go index beff8ca..fb0c23b 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -104,6 +104,7 @@ func (s *Server) Go(ctx context.Context) error { s.installHupHandler() ctx = database.CtxWithDB(ctx, s.db) + ctx = talkgroups.CtxWithStore(ctx, s.tgs) httpSrv := &http.Server{ Addr: s.conf.Listen, diff --git a/pkg/talkgroups/cache.go b/pkg/talkgroups/cache.go index 8c59da3..bc60417 100644 --- a/pkg/talkgroups/cache.go +++ b/pkg/talkgroups/cache.go @@ -2,6 +2,7 @@ package talkgroups import ( "context" + "errors" "sync" "time" @@ -14,11 +15,11 @@ import ( "github.com/rs/zerolog/log" ) -type tgMap map[ID]database.GetTalkgroupWithLearnedByPackedIDsRow +type tgMap map[ID]Talkgroup type Store interface { - // TG retrieves a Talkgroup from the Store. It returns the record and whether one was found. - TG(ctx context.Context, tg ID) (database.GetTalkgroupWithLearnedByPackedIDsRow, bool) + // TG retrieves a Talkgroup from the Store. + TG(ctx context.Context, tg ID) (Talkgroup, error) // SystemName retrieves a system name from the store. It returns the record and whether one was found. SystemName(ctx context.Context, id int) (string, bool) @@ -36,6 +37,23 @@ type Store interface { Invalidate() } +type CtxStoreKeyT string + +const CtxStoreKey CtxStoreKeyT = "store" + +func CtxWithStore(ctx context.Context, s Store) context.Context { + return context.WithValue(ctx, CtxStoreKey, s) +} + +func StoreFrom(ctx context.Context) Store { + s, ok := ctx.Value(CtxStoreKey).(Store) + if !ok { + return NewCache() + } + + return s +} + func (t *cache) Invalidate() { t.Lock() defer t.Unlock() @@ -89,7 +107,7 @@ func (t *cache) Hint(ctx context.Context, tgs []ID) error { return nil } -func (t *cache) add(rec database.GetTalkgroupWithLearnedByPackedIDsRow) error { +func (t *cache) add(rec Talkgroup) error { tg := TG(rec.System.ID, int(rec.Talkgroup.Tgid)) t.tgs[tg] = rec t.systems[int32(rec.System.ID)] = rec.System.Name @@ -97,6 +115,14 @@ func (t *cache) add(rec database.GetTalkgroupWithLearnedByPackedIDsRow) error { return t.AlertConfig.AddAlertConfig(tg, rec.Talkgroup.AlertConfig) } +func rowToTalkgroup(r database.GetTalkgroupWithLearnedByPackedIDsRow) Talkgroup { + return Talkgroup{ + Talkgroup: r.Talkgroup, + System: r.System, + Learned: r.Learned, + } +} + func (t *cache) Load(ctx context.Context, tgs []int64) error { tgRecords, err := database.FromCtx(ctx).GetTalkgroupWithLearnedByPackedIDs(ctx, tgs) if err != nil { @@ -107,7 +133,7 @@ func (t *cache) Load(ctx context.Context, tgs []int64) error { defer t.Unlock() for _, rec := range tgRecords { - err := t.add(rec) + err := t.add(rowToTalkgroup(rec)) if err != nil { log.Error().Err(err).Msg("add alert config fail") @@ -117,38 +143,40 @@ func (t *cache) Load(ctx context.Context, tgs []int64) error { return nil } -func (t *cache) TG(ctx context.Context, tg ID) (database.GetTalkgroupWithLearnedByPackedIDsRow, bool) { +var ErrNoTG = errors.New("talkgroup not found") + +func (t *cache) TG(ctx context.Context, tg ID) (Talkgroup, error) { t.RLock() rec, has := t.tgs[tg] t.RUnlock() if has { - return rec, has + return rec, nil } recs, err := database.FromCtx(ctx).GetTalkgroupWithLearnedByPackedIDs(ctx, []int64{tg.Pack()}) switch err { case nil: case pgx.ErrNoRows: - return database.GetTalkgroupWithLearnedByPackedIDsRow{}, false + return Talkgroup{}, ErrNoTG default: log.Error().Err(err).Msg("TG() cache add db get") - return database.GetTalkgroupWithLearnedByPackedIDsRow{}, false + return Talkgroup{}, errors.Join(ErrNoTG, err) } if len(recs) < 1 { - return database.GetTalkgroupWithLearnedByPackedIDsRow{}, false + return Talkgroup{}, ErrNoTG } t.Lock() defer t.Unlock() - err = t.add(recs[0]) + err = t.add(rowToTalkgroup(recs[0])) if err != nil { log.Error().Err(err).Msg("TG() cache add") - return recs[0], false + return rowToTalkgroup(recs[0]), errors.Join(ErrNoTG, err) } - return recs[0], true + return rowToTalkgroup(recs[0]), nil } func (t *cache) SystemName(ctx context.Context, id int) (name string, has bool) { diff --git a/pkg/talkgroups/talkgroup.go b/pkg/talkgroups/talkgroup.go index 61253ed..bf68dfa 100644 --- a/pkg/talkgroups/talkgroup.go +++ b/pkg/talkgroups/talkgroup.go @@ -2,8 +2,16 @@ package talkgroups import ( "fmt" + + "dynatron.me/x/stillbox/pkg/database" ) +type Talkgroup struct { + database.Talkgroup + System database.System `json:"system"` + Learned bool `json:"learned"` +} + type ID struct { System uint32 Talkgroup uint32 diff --git a/sql/postgres/queries/talkgroups.sql b/sql/postgres/queries/talkgroups.sql index 49783d1..a2ab786 100644 --- a/sql/postgres/queries/talkgroups.sql +++ b/sql/postgres/queries/talkgroups.sql @@ -42,10 +42,10 @@ JOIN systems sys ON tg.system_id = sys.id WHERE tg.id = systg2id(sqlc.arg(system_id), sqlc.arg(tgid)) UNION SELECT -tgl.id::INT8, tgl.system_id::INT4, sys.name system_name, tgl.tgid::INT4, tgl.name, -tgl.alpha_tag, NULL::INTEGER, NULL::JSONB, -CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END, tgl.alpha_tag, -TRUE, 1.0, NULL::JSONB, +tgl.id::INT8, tgl.system_id::INT4, tgl.tgid::INT4, tgl.name, +tgl.alpha_tag, tgl.alpha_tag, NULL::INTEGER, NULL::JSONB, +CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END, +TRUE, NULL::JSONB, 1.0, sys.id, sys.name, TRUE learned FROM talkgroups_learned tgl JOIN systems sys ON tgl.system_id = sys.id @@ -60,10 +60,10 @@ JOIN systems sys ON tg.system_id = sys.id WHERE tg.id = ANY($1::INT8[]) UNION SELECT -tgl.id::INT8, tgl.system_id::INT4, sys.name system_name, tgl.tgid::INT4, tgl.name, -tgl.alpha_tag, NULL::INTEGER, NULL::JSONB, -CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END, tgl.alpha_tag, -TRUE, 1.0, NULL::JSONB, +tgl.id::INT8, tgl.system_id::INT4, tgl.tgid::INT4, tgl.name, +tgl.alpha_tag, tgl.alpha_tag, NULL::INTEGER, NULL::JSONB, +CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END, +TRUE, NULL::JSONB, 1.0, sys.id, sys.name, TRUE learned FROM talkgroups_learned tgl JOIN systems sys ON tgl.system_id = sys.id