From d917e13c4f79f7777406bca9d5f2e09526f98695 Mon Sep 17 00:00:00 2001 From: Daniel Ponte Date: Sun, 3 Nov 2024 09:45:51 -0500 Subject: [PATCH] Embed --- pkg/alerting/alerting.go | 12 +++---- pkg/database/talkgroups.sql.go | 16 ++++----- pkg/database/talkgroups.sql_test.go | 49 ++++++++++++++++++++++++++ pkg/nexus/commands.go | 8 ++--- pkg/server/server.go | 1 + pkg/talkgroups/cache.go | 54 ++++++++++++++++++++++------- pkg/talkgroups/talkgroup.go | 8 +++++ sql/postgres/queries/talkgroups.sql | 16 ++++----- 8 files changed, 124 insertions(+), 40 deletions(-) create mode 100644 pkg/database/talkgroups.sql_test.go diff --git a/pkg/alerting/alerting.go b/pkg/alerting/alerting.go index 23f282f..5d5d09d 100644 --- a/pkg/alerting/alerting.go +++ b/pkg/alerting/alerting.go @@ -168,8 +168,8 @@ func (as *alerter) eval(ctx context.Context, now time.Time, testMode bool) ([]Al var notifications []Alert for _, s := range as.scores { origScore := s.Score - tgr, has := as.tgCache.TG(ctx, s.ID) - if has { + tgr, err := as.tgCache.TG(ctx, s.ID) + if err == nil { if !tgr.Talkgroup.Alert { continue } @@ -327,9 +327,9 @@ func (as *alerter) makeAlert(ctx context.Context, score trending.Score[talkgroup OrigScore: origScore, } - tgRecord, has := as.tgCache.TG(ctx, score.ID) - switch has { - case true: + tgRecord, err := as.tgCache.TG(ctx, score.ID) + switch err { + case nil: d.Weight = tgRecord.Talkgroup.Weight if tgRecord.System.Name == "" { tgRecord.System.Name = strconv.Itoa(int(score.ID.System)) @@ -340,7 +340,7 @@ func (as *alerter) makeAlert(ctx context.Context, score trending.Score[talkgroup } else { d.TGName = fmt.Sprintf("%s:%d", tgRecord.System.Name, int(score.ID.Talkgroup)) } - case false: + default: system, has := as.tgCache.SystemName(ctx, int(score.ID.System)) if has { d.TGName = fmt.Sprintf("%s:%d", system, int(score.ID.Talkgroup)) diff --git a/pkg/database/talkgroups.sql.go b/pkg/database/talkgroups.sql.go index e8c4204..8700be2 100644 --- a/pkg/database/talkgroups.sql.go +++ b/pkg/database/talkgroups.sql.go @@ -112,10 +112,10 @@ JOIN systems sys ON tg.system_id = sys.id WHERE tg.id = systg2id($1, $2) UNION SELECT -tgl.id::INT8, tgl.system_id::INT4, sys.name system_name, tgl.tgid::INT4, tgl.name, -tgl.alpha_tag, NULL::INTEGER, NULL::JSONB, -CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END, tgl.alpha_tag, -TRUE, 1.0, NULL::JSONB, +tgl.id::INT8, tgl.system_id::INT4, tgl.tgid::INT4, tgl.name, +tgl.alpha_tag, tgl.alpha_tag, NULL::INTEGER, NULL::JSONB, +CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END, +TRUE, NULL::JSONB, 1.0, sys.id, sys.name, TRUE learned FROM talkgroups_learned tgl JOIN systems sys ON tgl.system_id = sys.id @@ -160,10 +160,10 @@ JOIN systems sys ON tg.system_id = sys.id WHERE tg.id = ANY($1::INT8[]) UNION SELECT -tgl.id::INT8, tgl.system_id::INT4, sys.name system_name, tgl.tgid::INT4, tgl.name, -tgl.alpha_tag, NULL::INTEGER, NULL::JSONB, -CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END, tgl.alpha_tag, -TRUE, 1.0, NULL::JSONB, +tgl.id::INT8, tgl.system_id::INT4, tgl.tgid::INT4, tgl.name, +tgl.alpha_tag, tgl.alpha_tag, NULL::INTEGER, NULL::JSONB, +CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END, +TRUE, NULL::JSONB, 1.0, sys.id, sys.name, TRUE learned FROM talkgroups_learned tgl JOIN systems sys ON tgl.system_id = sys.id diff --git a/pkg/database/talkgroups.sql_test.go b/pkg/database/talkgroups.sql_test.go new file mode 100644 index 0000000..cc39c55 --- /dev/null +++ b/pkg/database/talkgroups.sql_test.go @@ -0,0 +1,49 @@ +package database + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +const getTalkgroupWithLearnedByPackedIDsTest = `-- name: GetTalkgroupWithLearnedByPackedIDs :many +SELECT +tg.id, tg.system_id, tg.tgid, tg.name, tg.alpha_tag, tg.tg_group, tg.frequency, tg.metadata, tg.tags, tg.alert, tg.alert_config, tg.weight, sys.id, sys.name, +FALSE learned +FROM talkgroups tg +JOIN systems sys ON tg.system_id = sys.id +WHERE tg.id = ANY($1::INT8[]) +UNION +SELECT +tgl.id::INT8, tgl.system_id::INT4, tgl.tgid::INT4, tgl.name, +tgl.alpha_tag, tgl.alpha_tag, NULL::INTEGER, NULL::JSONB, +CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END, +TRUE, NULL::JSONB, 1.0, sys.id, sys.name, +TRUE learned +FROM talkgroups_learned tgl +JOIN systems sys ON tgl.system_id = sys.id +WHERE systg2id(tgl.system_id, tgl.tgid) = ANY($1::INT8[]) AND ignored IS NOT TRUE +` +const getTalkgroupWithLearnedTest = `-- name: GetTalkgroupWithLearned :one +SELECT +tg.id, tg.system_id, tg.tgid, tg.name, tg.alpha_tag, tg.tg_group, tg.frequency, tg.metadata, tg.tags, tg.alert, tg.alert_config, tg.weight, sys.id, sys.name, +FALSE learned +FROM talkgroups tg +JOIN systems sys ON tg.system_id = sys.id +WHERE tg.id = systg2id($1, $2) +UNION +SELECT +tgl.id::INT8, tgl.system_id::INT4, tgl.tgid::INT4, tgl.name, +tgl.alpha_tag, tgl.alpha_tag, NULL::INTEGER, NULL::JSONB, +CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END, +TRUE, NULL::JSONB, 1.0, sys.id, sys.name, +TRUE learned +FROM talkgroups_learned tgl +JOIN systems sys ON tgl.system_id = sys.id +WHERE tgl.system_id = $1 AND tgl.tgid = $2 AND ignored IS NOT TRUE +` + +func TestQueryColumnsMatch(t *testing.T) { + require.Equal(t, getTalkgroupWithLearnedByPackedIDsTest, getTalkgroupWithLearnedByPackedIDs) + require.Equal(t, getTalkgroupWithLearnedTest, getTalkgroupWithLearned) +} diff --git a/pkg/nexus/commands.go b/pkg/nexus/commands.go index d9b9180..6842d83 100644 --- a/pkg/nexus/commands.go +++ b/pkg/nexus/commands.go @@ -5,10 +5,9 @@ import ( "encoding/json" "dynatron.me/x/stillbox/pkg/calls" - "dynatron.me/x/stillbox/pkg/database" "dynatron.me/x/stillbox/pkg/pb" + "dynatron.me/x/stillbox/pkg/talkgroups" - "github.com/jackc/pgx/v5" "github.com/rs/zerolog/log" "google.golang.org/protobuf/types/known/structpb" ) @@ -61,10 +60,9 @@ func (c *client) SendError(cmd *pb.Command, err error) { } func (c *client) Talkgroup(ctx context.Context, tg *pb.Talkgroup) error { - db := database.FromCtx(ctx) - tgi, err := db.GetTalkgroupWithLearned(ctx, int(tg.System), int(tg.Talkgroup)) + tgi, err := talkgroups.StoreFrom(ctx).TG(ctx, talkgroups.TG(tg.System, tg.Talkgroup)) if err != nil { - if err != pgx.ErrNoRows { + if err != talkgroups.ErrNoTG { log.Error().Err(err).Int32("sys", tg.System).Int32("tg", tg.Talkgroup).Msg("get talkgroup fail") } return err diff --git a/pkg/server/server.go b/pkg/server/server.go index beff8ca..fb0c23b 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -104,6 +104,7 @@ func (s *Server) Go(ctx context.Context) error { s.installHupHandler() ctx = database.CtxWithDB(ctx, s.db) + ctx = talkgroups.CtxWithStore(ctx, s.tgs) httpSrv := &http.Server{ Addr: s.conf.Listen, diff --git a/pkg/talkgroups/cache.go b/pkg/talkgroups/cache.go index 8c59da3..bc60417 100644 --- a/pkg/talkgroups/cache.go +++ b/pkg/talkgroups/cache.go @@ -2,6 +2,7 @@ package talkgroups import ( "context" + "errors" "sync" "time" @@ -14,11 +15,11 @@ import ( "github.com/rs/zerolog/log" ) -type tgMap map[ID]database.GetTalkgroupWithLearnedByPackedIDsRow +type tgMap map[ID]Talkgroup type Store interface { - // TG retrieves a Talkgroup from the Store. It returns the record and whether one was found. - TG(ctx context.Context, tg ID) (database.GetTalkgroupWithLearnedByPackedIDsRow, bool) + // TG retrieves a Talkgroup from the Store. + TG(ctx context.Context, tg ID) (Talkgroup, error) // SystemName retrieves a system name from the store. It returns the record and whether one was found. SystemName(ctx context.Context, id int) (string, bool) @@ -36,6 +37,23 @@ type Store interface { Invalidate() } +type CtxStoreKeyT string + +const CtxStoreKey CtxStoreKeyT = "store" + +func CtxWithStore(ctx context.Context, s Store) context.Context { + return context.WithValue(ctx, CtxStoreKey, s) +} + +func StoreFrom(ctx context.Context) Store { + s, ok := ctx.Value(CtxStoreKey).(Store) + if !ok { + return NewCache() + } + + return s +} + func (t *cache) Invalidate() { t.Lock() defer t.Unlock() @@ -89,7 +107,7 @@ func (t *cache) Hint(ctx context.Context, tgs []ID) error { return nil } -func (t *cache) add(rec database.GetTalkgroupWithLearnedByPackedIDsRow) error { +func (t *cache) add(rec Talkgroup) error { tg := TG(rec.System.ID, int(rec.Talkgroup.Tgid)) t.tgs[tg] = rec t.systems[int32(rec.System.ID)] = rec.System.Name @@ -97,6 +115,14 @@ func (t *cache) add(rec database.GetTalkgroupWithLearnedByPackedIDsRow) error { return t.AlertConfig.AddAlertConfig(tg, rec.Talkgroup.AlertConfig) } +func rowToTalkgroup(r database.GetTalkgroupWithLearnedByPackedIDsRow) Talkgroup { + return Talkgroup{ + Talkgroup: r.Talkgroup, + System: r.System, + Learned: r.Learned, + } +} + func (t *cache) Load(ctx context.Context, tgs []int64) error { tgRecords, err := database.FromCtx(ctx).GetTalkgroupWithLearnedByPackedIDs(ctx, tgs) if err != nil { @@ -107,7 +133,7 @@ func (t *cache) Load(ctx context.Context, tgs []int64) error { defer t.Unlock() for _, rec := range tgRecords { - err := t.add(rec) + err := t.add(rowToTalkgroup(rec)) if err != nil { log.Error().Err(err).Msg("add alert config fail") @@ -117,38 +143,40 @@ func (t *cache) Load(ctx context.Context, tgs []int64) error { return nil } -func (t *cache) TG(ctx context.Context, tg ID) (database.GetTalkgroupWithLearnedByPackedIDsRow, bool) { +var ErrNoTG = errors.New("talkgroup not found") + +func (t *cache) TG(ctx context.Context, tg ID) (Talkgroup, error) { t.RLock() rec, has := t.tgs[tg] t.RUnlock() if has { - return rec, has + return rec, nil } recs, err := database.FromCtx(ctx).GetTalkgroupWithLearnedByPackedIDs(ctx, []int64{tg.Pack()}) switch err { case nil: case pgx.ErrNoRows: - return database.GetTalkgroupWithLearnedByPackedIDsRow{}, false + return Talkgroup{}, ErrNoTG default: log.Error().Err(err).Msg("TG() cache add db get") - return database.GetTalkgroupWithLearnedByPackedIDsRow{}, false + return Talkgroup{}, errors.Join(ErrNoTG, err) } if len(recs) < 1 { - return database.GetTalkgroupWithLearnedByPackedIDsRow{}, false + return Talkgroup{}, ErrNoTG } t.Lock() defer t.Unlock() - err = t.add(recs[0]) + err = t.add(rowToTalkgroup(recs[0])) if err != nil { log.Error().Err(err).Msg("TG() cache add") - return recs[0], false + return rowToTalkgroup(recs[0]), errors.Join(ErrNoTG, err) } - return recs[0], true + return rowToTalkgroup(recs[0]), nil } func (t *cache) SystemName(ctx context.Context, id int) (name string, has bool) { diff --git a/pkg/talkgroups/talkgroup.go b/pkg/talkgroups/talkgroup.go index 61253ed..bf68dfa 100644 --- a/pkg/talkgroups/talkgroup.go +++ b/pkg/talkgroups/talkgroup.go @@ -2,8 +2,16 @@ package talkgroups import ( "fmt" + + "dynatron.me/x/stillbox/pkg/database" ) +type Talkgroup struct { + database.Talkgroup + System database.System `json:"system"` + Learned bool `json:"learned"` +} + type ID struct { System uint32 Talkgroup uint32 diff --git a/sql/postgres/queries/talkgroups.sql b/sql/postgres/queries/talkgroups.sql index 49783d1..a2ab786 100644 --- a/sql/postgres/queries/talkgroups.sql +++ b/sql/postgres/queries/talkgroups.sql @@ -42,10 +42,10 @@ JOIN systems sys ON tg.system_id = sys.id WHERE tg.id = systg2id(sqlc.arg(system_id), sqlc.arg(tgid)) UNION SELECT -tgl.id::INT8, tgl.system_id::INT4, sys.name system_name, tgl.tgid::INT4, tgl.name, -tgl.alpha_tag, NULL::INTEGER, NULL::JSONB, -CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END, tgl.alpha_tag, -TRUE, 1.0, NULL::JSONB, +tgl.id::INT8, tgl.system_id::INT4, tgl.tgid::INT4, tgl.name, +tgl.alpha_tag, tgl.alpha_tag, NULL::INTEGER, NULL::JSONB, +CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END, +TRUE, NULL::JSONB, 1.0, sys.id, sys.name, TRUE learned FROM talkgroups_learned tgl JOIN systems sys ON tgl.system_id = sys.id @@ -60,10 +60,10 @@ JOIN systems sys ON tg.system_id = sys.id WHERE tg.id = ANY($1::INT8[]) UNION SELECT -tgl.id::INT8, tgl.system_id::INT4, sys.name system_name, tgl.tgid::INT4, tgl.name, -tgl.alpha_tag, NULL::INTEGER, NULL::JSONB, -CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END, tgl.alpha_tag, -TRUE, 1.0, NULL::JSONB, +tgl.id::INT8, tgl.system_id::INT4, tgl.tgid::INT4, tgl.name, +tgl.alpha_tag, tgl.alpha_tag, NULL::INTEGER, NULL::JSONB, +CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END, +TRUE, NULL::JSONB, 1.0, sys.id, sys.name, TRUE learned FROM talkgroups_learned tgl JOIN systems sys ON tgl.system_id = sys.id