diff --git a/internal/forms/forms.go b/internal/forms/forms.go index d4bd8a1..facb68c 100644 --- a/internal/forms/forms.go +++ b/internal/forms/forms.go @@ -294,7 +294,6 @@ func Unmarshal(r *http.Request, dest any, opt ...Option) error { return ErrNotStruct } - if strings.HasPrefix(r.Header.Get("Content-Type"), "application/x-www-form-urlencoded") { err := r.ParseForm() if err != nil { diff --git a/pkg/calls/filter.go b/pkg/calls/filter.go index 62a8c64..cdeb3da 100644 --- a/pkg/calls/filter.go +++ b/pkg/calls/filter.go @@ -7,20 +7,6 @@ import ( "dynatron.me/x/stillbox/pkg/pb" ) -type Talkgroup struct { - System uint32 - Talkgroup uint32 -} - -func (c *Call) TalkgroupTuple() Talkgroup { - return Talkgroup{System: uint32(c.System), Talkgroup: uint32(c.Talkgroup)} -} - -func (t Talkgroup) Pack() int64 { - // P25 system IDs are 12 bits, so we can fit them in a signed 8 byte int (int64, pg INT8) - return int64((int64(t.System) << 32) | int64(t.Talkgroup)) -} - type TalkgroupFilter struct { Talkgroups []Talkgroup `json:"talkgroups,omitempty"` TalkgroupsNot []Talkgroup `json:"talkgroupsNot,omitempty"` @@ -61,16 +47,6 @@ func TalkgroupFilterFromPB(ctx context.Context, p *pb.Filter) (*TalkgroupFilter, return tgf, tgf.compile(ctx) } -func PackedTGs(tg []Talkgroup) []int64 { - s := make([]int64, len(tg)) - - for i, v := range tg { - s[i] = v.Pack() - } - - return s -} - func (f *TalkgroupFilter) hasTags() bool { return len(f.TalkgroupTagsAny) > 0 || len(f.TalkgroupTagsAll) > 0 || len(f.TalkgroupTagsNot) > 0 } diff --git a/pkg/calls/talkgroups.go b/pkg/calls/talkgroups.go new file mode 100644 index 0000000..28de88b --- /dev/null +++ b/pkg/calls/talkgroups.go @@ -0,0 +1,78 @@ +package calls + +import ( + "context" + + "dynatron.me/x/stillbox/pkg/gordio/database" +) + +type Talkgroup struct { + System uint32 + Talkgroup uint32 +} + +func (c *Call) TalkgroupTuple() Talkgroup { + return Talkgroup{System: uint32(c.System), Talkgroup: uint32(c.Talkgroup)} +} + +func TG[T int | uint | int64 | uint64 | int32 | uint32](sys, tgid T) Talkgroup { + return Talkgroup{ + System: uint32(sys), + Talkgroup: uint32(tgid), + } +} + +func (t Talkgroup) Pack() int64 { + // P25 system IDs are 12 bits, so we can fit them in a signed 8 byte int (int64, pg INT8) + return int64((int64(t.System) << 32) | int64(t.Talkgroup)) +} + +func PackedTGs(tg []Talkgroup) []int64 { + s := make([]int64, len(tg)) + + for i, v := range tg { + s[i] = v.Pack() + } + + return s +} + +type tgMap map[Talkgroup]database.GetTalkgroupWithLearnedByPackedIDsRow +type TalkgroupCache struct { + tgs tgMap + systems map[int32]string +} + +func NewTalkgroupCache(ctx context.Context, packedTgs []int64) (*TalkgroupCache, error) { + tgc := &TalkgroupCache{ + tgs: make(tgMap), + systems: make(map[int32]string), + } + + return tgc, tgc.LoadTGs(ctx, packedTgs) +} + +func (t *TalkgroupCache) LoadTGs(ctx context.Context, packedTgs []int64) error { + tgRecords, err := database.FromCtx(ctx).GetTalkgroupWithLearnedByPackedIDs(ctx, packedTgs) + if err != nil { + return err + } + + for _, rec := range tgRecords { + t.tgs[TG(rec.SystemID, rec.Tgid)] = rec + t.systems[rec.SystemID] = rec.SystemName + } + + return nil +} + +func (t *TalkgroupCache) TG(tg Talkgroup) (database.GetTalkgroupWithLearnedByPackedIDsRow, bool) { + rec, has := t.tgs[tg] + + return rec, has +} + +func (t *TalkgroupCache) SystemName(id int) (string, bool) { + n, has := t.systems[int32(id)] + return n, has +} diff --git a/pkg/gordio/alerting/alerting.go b/pkg/gordio/alerting/alerting.go index 2136f90..13e88c9 100644 --- a/pkg/gordio/alerting/alerting.go +++ b/pkg/gordio/alerting/alerting.go @@ -20,7 +20,6 @@ import ( "dynatron.me/x/stillbox/internal/timeseries" "dynatron.me/x/stillbox/internal/trending" - "github.com/jackc/pgx/v5" "github.com/rs/zerolog/log" ) @@ -123,13 +122,13 @@ func New(cfg config.Alerting, opts ...AlertOption) Alerter { func (as *alerter) Go(ctx context.Context) { as.startBackfill(ctx) - as.score(ctx, time.Now()) + as.score(time.Now()) ticker := time.NewTicker(alerterTickInterval) for { select { case now := <-ticker.C: - as.score(ctx, now) + as.score(now) err := as.notify(ctx) if err != nil { log.Error().Err(err).Msg("notify") @@ -144,8 +143,9 @@ func (as *alerter) Go(ctx context.Context) { } const notificationTemplStr = `{{ range . -}} -{{ .TGName }} is active with a score of {{ f .Score.Score 4 }}! ({{ f .Score.RecentCount 0 }} recent calls out of {{ .Score.Count }}) -{{ end }}` +{{ .TGName }} is active with a score of {{ f .Score.Score 4 }}! ({{ f .Score.RecentCount 0 }}/{{ .Score.Count }} recent calls) + +{{ end -}}` var notificationTemplate = template.Must(template.New("notification").Funcs(funcMap).Parse(notificationTemplStr)) @@ -155,8 +155,15 @@ func (as *alerter) testNotifyHandler(w http.ResponseWriter, r *http.Request) { ns := make([]notification, 0, len(as.scores)) ctx := r.Context() + tgc, err := cl.NewTalkgroupCache(ctx, as.packedScoredTGs()) + if err != nil { + log.Error().Err(err).Msg("test notificaiton tg cache") + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + for _, s := range as.scores { - n, err := makeNotification(ctx, s) + n, err := makeNotification(tgc, s) if err != nil { log.Error().Err(err).Msg("test notificaiton") http.Error(w, err.Error(), http.StatusInternalServerError) @@ -165,7 +172,7 @@ func (as *alerter) testNotifyHandler(w http.ResponseWriter, r *http.Request) { ns = append(ns, n) } - err := as.sendNotification(ctx, ns) + err = as.sendNotification(ctx, ns) if err != nil { log.Error().Err(err).Msg("test notification send") http.Error(w, err.Error(), http.StatusInternalServerError) @@ -175,6 +182,16 @@ func (as *alerter) testNotifyHandler(w http.ResponseWriter, r *http.Request) { w.Write([]byte("Sent")) } +// packedScoredTGs gets a packed list of TG IDs for DB use. +func (as *alerter) packedScoredTGs() []int64 { + packedTGs := make([]int64, 0, len(as.scores)) + for _, s := range as.scores { + packedTGs = append(packedTGs, s.ID.Pack()) + } + + return packedTGs +} + // notify iterates the scores and sends out any necessary notifications func (as *alerter) notify(ctx context.Context) error { if as.notifier == nil { @@ -186,12 +203,24 @@ func (as *alerter) notify(ctx context.Context) error { as.Lock() defer as.Unlock() + tgc, err := cl.NewTalkgroupCache(ctx, as.packedScoredTGs()) + if err != nil { + return err + } + var notifications []notification for _, s := range as.scores { + tgr, has := tgc.TG(s.ID) + if has { + if !tgr.Notify { + continue + } + s.Score *= float64(tgr.Weight) + } if s.Score > as.cfg.AlertThreshold { if t, inCache := as.notifyCache[s.ID]; !inCache || now.Sub(t) > as.renotify { as.notifyCache[s.ID] = time.Now() - n, err := makeNotification(ctx, s) + n, err := makeNotification(tgc, s) if err != nil { return err } @@ -229,36 +258,30 @@ func (as *alerter) sendNotification(ctx context.Context, n []notification) error // makeNotification creates a notification for later rendering by the template. // It takes a talkgroup Score as input. -func makeNotification(ctx context.Context, tg trending.Score[cl.Talkgroup]) (notification, error) { +func makeNotification(tgs *cl.TalkgroupCache, score trending.Score[cl.Talkgroup]) (notification, error) { d := notification{ - Score: tg, + Score: score, } - db := database.FromCtx(ctx) - tgRecord, err := db.GetTalkgroupWithLearned(ctx, int(tg.ID.System), int(tg.ID.Talkgroup)) - switch err { - case nil: + tgRecord, has := tgs.TG(score.ID) + switch has { + case true: if tgRecord.SystemName == "" { - tgRecord.SystemName = strconv.Itoa(int(tg.ID.System)) + tgRecord.SystemName = strconv.Itoa(int(score.ID.System)) } if tgRecord.Name != nil { - d.TGName = fmt.Sprintf("%s %s", tgRecord.SystemName, *tgRecord.Name) + d.TGName = fmt.Sprintf("%s %s (%d)", tgRecord.SystemName, *tgRecord.Name, score.ID.Talkgroup) } else { - d.TGName = fmt.Sprintf("%s:%d", tgRecord.SystemName, int(tg.ID.Talkgroup)) + d.TGName = fmt.Sprintf("%s:%d", tgRecord.SystemName, int(score.ID.Talkgroup)) } - case pgx.ErrNoRows: - system, err := db.GetSystemName(ctx, int(tg.ID.System)) - switch err { - case nil: - d.TGName = fmt.Sprintf("%s:%d", system, int(tg.ID.Talkgroup)) - case pgx.ErrNoRows: - d.TGName = fmt.Sprintf("%d:%d", int(tg.ID.System), int(tg.ID.Talkgroup)) - default: - return d, fmt.Errorf("sendNotification get system: %w", err) + case false: + system, has := tgs.SystemName(int(score.ID.System)) + if has { + d.TGName = fmt.Sprintf("%s:%d", system, int(score.ID.Talkgroup)) + } else { + d.TGName = fmt.Sprintf("%d:%d", int(score.ID.System), int(score.ID.Talkgroup)) } - default: - return d, fmt.Errorf("sendNotification get talkgroup: %w", err) } return d, nil @@ -307,7 +330,7 @@ func (as *alerter) startBackfill(ctx context.Context) error { return nil } -func (as *alerter) score(ctx context.Context, now time.Time) { +func (as *alerter) score(now time.Time) { as.Lock() defer as.Unlock() diff --git a/pkg/gordio/alerting/stats.go b/pkg/gordio/alerting/stats.go index 0efdcd3..c6b1180 100644 --- a/pkg/gordio/alerting/stats.go +++ b/pkg/gordio/alerting/stats.go @@ -69,12 +69,7 @@ func (as *alerter) tgStatsHandler(w http.ResponseWriter, r *http.Request) { ctx := r.Context() db := database.FromCtx(ctx) - packed := make([]int64, 0, len(as.scores)) - for _, s := range as.scores { - packed = append(packed, s.ID.Pack()) - } - - tgs, err := db.GetTalkgroupsByPackedIDs(ctx, packed) + tgs, err := db.GetTalkgroupsByPackedIDs(ctx, as.packedScoredTGs()) if err != nil { log.Error().Err(err).Msg("stats TG get failed") http.Error(w, err.Error(), http.StatusInternalServerError) diff --git a/pkg/gordio/database/querier.go b/pkg/gordio/database/querier.go index 9fbbd17..cccc758 100644 --- a/pkg/gordio/database/querier.go +++ b/pkg/gordio/database/querier.go @@ -25,6 +25,7 @@ type Querier interface { GetTalkgroupIDsByTags(ctx context.Context, anytags []string, alltags []string, nottags []string) ([]GetTalkgroupIDsByTagsRow, error) GetTalkgroupTags(ctx context.Context, sys int, tg int) ([]string, error) GetTalkgroupWithLearned(ctx context.Context, systemID int, tgid int) (GetTalkgroupWithLearnedRow, error) + GetTalkgroupWithLearnedByPackedIDs(ctx context.Context, dollar_1 []int64) ([]GetTalkgroupWithLearnedByPackedIDsRow, error) GetTalkgroupsByPackedIDs(ctx context.Context, dollar_1 []int64) ([]GetTalkgroupsByPackedIDsRow, error) GetTalkgroupsWithAllTags(ctx context.Context, tags []string) ([]Talkgroup, error) GetTalkgroupsWithAnyTags(ctx context.Context, tags []string) ([]Talkgroup, error) diff --git a/pkg/gordio/database/talkgroups.sql.go b/pkg/gordio/database/talkgroups.sql.go index 2d2c5ff..59e6aa0 100644 --- a/pkg/gordio/database/talkgroups.sql.go +++ b/pkg/gordio/database/talkgroups.sql.go @@ -102,6 +102,7 @@ const getTalkgroupWithLearned = `-- name: GetTalkgroupWithLearned :one SELECT tg.id, tg.system_id, sys.name system_name, tg.tgid, tg.name, tg.tg_group, tg.frequency, tg.metadata, tg.tags, tg.alpha_tag, +tg.notify, tg.weight, FALSE learned FROM talkgroups tg JOIN systems sys ON tg.system_id = sys.id @@ -110,7 +111,7 @@ UNION SELECT tgl.id::INT8, tgl.system_id::INT4, sys.name system_name, tgl.tgid::INT4, tgl.name, tgl.alpha_tag, NULL::INTEGER, NULL::JSONB, -CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END, tgl.alpha_tag, +CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END, tgl.alpha_tag, TRUE, 1.0, TRUE learned FROM talkgroups_learned tgl JOIN systems sys ON tgl.system_id = sys.id @@ -128,6 +129,8 @@ type GetTalkgroupWithLearnedRow struct { Metadata []byte `json:"metadata"` Tags []string `json:"tags"` AlphaTag *string `json:"alpha_tag"` + Notify bool `json:"notify"` + Weight float32 `json:"weight"` Learned bool `json:"learned"` } @@ -145,11 +148,83 @@ func (q *Queries) GetTalkgroupWithLearned(ctx context.Context, systemID int, tgi &i.Metadata, &i.Tags, &i.AlphaTag, + &i.Notify, + &i.Weight, &i.Learned, ) return i, err } +const getTalkgroupWithLearnedByPackedIDs = `-- name: GetTalkgroupWithLearnedByPackedIDs :many +SELECT +tg.id, tg.system_id, sys.name system_name, tg.tgid, tg.name, +tg.tg_group, tg.frequency, tg.metadata, tg.tags, tg.alpha_tag, +tg.notify, tg.weight, +FALSE learned +FROM talkgroups tg +JOIN systems sys ON tg.system_id = sys.id +WHERE tg.id = ANY($1::INT8[]) +UNION +SELECT +tgl.id::INT8, tgl.system_id::INT4, sys.name system_name, tgl.tgid::INT4, tgl.name, +tgl.alpha_tag, NULL::INTEGER, NULL::JSONB, +CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END, tgl.alpha_tag, TRUE, 1.0, +TRUE learned +FROM talkgroups_learned tgl +JOIN systems sys ON tgl.system_id = sys.id +WHERE systg2id(tgl.system_id, tgl.tgid) = ANY($1::INT8[]) AND ignored IS NOT TRUE +` + +type GetTalkgroupWithLearnedByPackedIDsRow struct { + ID int64 `json:"id"` + SystemID int32 `json:"system_id"` + SystemName string `json:"system_name"` + Tgid int32 `json:"tgid"` + Name *string `json:"name"` + TgGroup *string `json:"tg_group"` + Frequency *int32 `json:"frequency"` + Metadata []byte `json:"metadata"` + Tags []string `json:"tags"` + AlphaTag *string `json:"alpha_tag"` + Notify bool `json:"notify"` + Weight float32 `json:"weight"` + Learned bool `json:"learned"` +} + +func (q *Queries) GetTalkgroupWithLearnedByPackedIDs(ctx context.Context, dollar_1 []int64) ([]GetTalkgroupWithLearnedByPackedIDsRow, error) { + rows, err := q.db.Query(ctx, getTalkgroupWithLearnedByPackedIDs, dollar_1) + if err != nil { + return nil, err + } + defer rows.Close() + var items []GetTalkgroupWithLearnedByPackedIDsRow + for rows.Next() { + var i GetTalkgroupWithLearnedByPackedIDsRow + if err := rows.Scan( + &i.ID, + &i.SystemID, + &i.SystemName, + &i.Tgid, + &i.Name, + &i.TgGroup, + &i.Frequency, + &i.Metadata, + &i.Tags, + &i.AlphaTag, + &i.Notify, + &i.Weight, + &i.Learned, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const getTalkgroupsByPackedIDs = `-- name: GetTalkgroupsByPackedIDs :many SELECT tg.id, system_id, tgid, tg.name, alpha_tag, tg_group, frequency, metadata, tags, notify, weight, sys.id, sys.name FROM talkgroups tg JOIN systems sys ON tg.system_id = sys.id diff --git a/sql/postgres/queries/talkgroups.sql b/sql/postgres/queries/talkgroups.sql index 18f6dfb..80a858b 100644 --- a/sql/postgres/queries/talkgroups.sql +++ b/sql/postgres/queries/talkgroups.sql @@ -37,6 +37,7 @@ WHERE tg.id = ANY($1::INT8[]); SELECT tg.id, tg.system_id, sys.name system_name, tg.tgid, tg.name, tg.tg_group, tg.frequency, tg.metadata, tg.tags, tg.alpha_tag, +tg.notify, tg.weight, FALSE learned FROM talkgroups tg JOIN systems sys ON tg.system_id = sys.id @@ -45,11 +46,30 @@ UNION SELECT tgl.id::INT8, tgl.system_id::INT4, sys.name system_name, tgl.tgid::INT4, tgl.name, tgl.alpha_tag, NULL::INTEGER, NULL::JSONB, -CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END, tgl.alpha_tag, +CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END, tgl.alpha_tag, TRUE, 1.0, TRUE learned FROM talkgroups_learned tgl JOIN systems sys ON tgl.system_id = sys.id WHERE tgl.system_id = sqlc.arg(system_id) AND tgl.tgid = sqlc.arg(tgid) AND ignored IS NOT TRUE; +-- name: GetTalkgroupWithLearnedByPackedIDs :many +SELECT +tg.id, tg.system_id, sys.name system_name, tg.tgid, tg.name, +tg.tg_group, tg.frequency, tg.metadata, tg.tags, tg.alpha_tag, +tg.notify, tg.weight, +FALSE learned +FROM talkgroups tg +JOIN systems sys ON tg.system_id = sys.id +WHERE tg.id = ANY($1::INT8[]) +UNION +SELECT +tgl.id::INT8, tgl.system_id::INT4, sys.name system_name, tgl.tgid::INT4, tgl.name, +tgl.alpha_tag, NULL::INTEGER, NULL::JSONB, +CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END, tgl.alpha_tag, TRUE, 1.0, +TRUE learned +FROM talkgroups_learned tgl +JOIN systems sys ON tgl.system_id = sys.id +WHERE systg2id(tgl.system_id, tgl.tgid) = ANY($1::INT8[]) AND ignored IS NOT TRUE; + -- name: GetSystemName :one SELECT name FROM systems WHERE id = sqlc.arg(system_id);