Implement tgcache

This commit is contained in:
Daniel Ponte 2024-10-31 16:14:38 -04:00
parent 80badd0f00
commit 47c1beec4a
8 changed files with 229 additions and 62 deletions

View file

@ -294,7 +294,6 @@ func Unmarshal(r *http.Request, dest any, opt ...Option) error {
return ErrNotStruct
}
if strings.HasPrefix(r.Header.Get("Content-Type"), "application/x-www-form-urlencoded") {
err := r.ParseForm()
if err != nil {

View file

@ -7,20 +7,6 @@ import (
"dynatron.me/x/stillbox/pkg/pb"
)
type Talkgroup struct {
System uint32
Talkgroup uint32
}
func (c *Call) TalkgroupTuple() Talkgroup {
return Talkgroup{System: uint32(c.System), Talkgroup: uint32(c.Talkgroup)}
}
func (t Talkgroup) Pack() int64 {
// P25 system IDs are 12 bits, so we can fit them in a signed 8 byte int (int64, pg INT8)
return int64((int64(t.System) << 32) | int64(t.Talkgroup))
}
type TalkgroupFilter struct {
Talkgroups []Talkgroup `json:"talkgroups,omitempty"`
TalkgroupsNot []Talkgroup `json:"talkgroupsNot,omitempty"`
@ -61,16 +47,6 @@ func TalkgroupFilterFromPB(ctx context.Context, p *pb.Filter) (*TalkgroupFilter,
return tgf, tgf.compile(ctx)
}
func PackedTGs(tg []Talkgroup) []int64 {
s := make([]int64, len(tg))
for i, v := range tg {
s[i] = v.Pack()
}
return s
}
func (f *TalkgroupFilter) hasTags() bool {
return len(f.TalkgroupTagsAny) > 0 || len(f.TalkgroupTagsAll) > 0 || len(f.TalkgroupTagsNot) > 0
}

78
pkg/calls/talkgroups.go Normal file
View file

@ -0,0 +1,78 @@
package calls
import (
"context"
"dynatron.me/x/stillbox/pkg/gordio/database"
)
type Talkgroup struct {
System uint32
Talkgroup uint32
}
func (c *Call) TalkgroupTuple() Talkgroup {
return Talkgroup{System: uint32(c.System), Talkgroup: uint32(c.Talkgroup)}
}
func TG[T int | uint | int64 | uint64 | int32 | uint32](sys, tgid T) Talkgroup {
return Talkgroup{
System: uint32(sys),
Talkgroup: uint32(tgid),
}
}
func (t Talkgroup) Pack() int64 {
// P25 system IDs are 12 bits, so we can fit them in a signed 8 byte int (int64, pg INT8)
return int64((int64(t.System) << 32) | int64(t.Talkgroup))
}
func PackedTGs(tg []Talkgroup) []int64 {
s := make([]int64, len(tg))
for i, v := range tg {
s[i] = v.Pack()
}
return s
}
type tgMap map[Talkgroup]database.GetTalkgroupWithLearnedByPackedIDsRow
type TalkgroupCache struct {
tgs tgMap
systems map[int32]string
}
func NewTalkgroupCache(ctx context.Context, packedTgs []int64) (*TalkgroupCache, error) {
tgc := &TalkgroupCache{
tgs: make(tgMap),
systems: make(map[int32]string),
}
return tgc, tgc.LoadTGs(ctx, packedTgs)
}
func (t *TalkgroupCache) LoadTGs(ctx context.Context, packedTgs []int64) error {
tgRecords, err := database.FromCtx(ctx).GetTalkgroupWithLearnedByPackedIDs(ctx, packedTgs)
if err != nil {
return err
}
for _, rec := range tgRecords {
t.tgs[TG(rec.SystemID, rec.Tgid)] = rec
t.systems[rec.SystemID] = rec.SystemName
}
return nil
}
func (t *TalkgroupCache) TG(tg Talkgroup) (database.GetTalkgroupWithLearnedByPackedIDsRow, bool) {
rec, has := t.tgs[tg]
return rec, has
}
func (t *TalkgroupCache) SystemName(id int) (string, bool) {
n, has := t.systems[int32(id)]
return n, has
}

View file

@ -20,7 +20,6 @@ import (
"dynatron.me/x/stillbox/internal/timeseries"
"dynatron.me/x/stillbox/internal/trending"
"github.com/jackc/pgx/v5"
"github.com/rs/zerolog/log"
)
@ -123,13 +122,13 @@ func New(cfg config.Alerting, opts ...AlertOption) Alerter {
func (as *alerter) Go(ctx context.Context) {
as.startBackfill(ctx)
as.score(ctx, time.Now())
as.score(time.Now())
ticker := time.NewTicker(alerterTickInterval)
for {
select {
case now := <-ticker.C:
as.score(ctx, now)
as.score(now)
err := as.notify(ctx)
if err != nil {
log.Error().Err(err).Msg("notify")
@ -144,8 +143,9 @@ func (as *alerter) Go(ctx context.Context) {
}
const notificationTemplStr = `{{ range . -}}
{{ .TGName }} is active with a score of {{ f .Score.Score 4 }}! ({{ f .Score.RecentCount 0 }} recent calls out of {{ .Score.Count }})
{{ end }}`
{{ .TGName }} is active with a score of {{ f .Score.Score 4 }}! ({{ f .Score.RecentCount 0 }}/{{ .Score.Count }} recent calls)
{{ end -}}`
var notificationTemplate = template.Must(template.New("notification").Funcs(funcMap).Parse(notificationTemplStr))
@ -155,8 +155,15 @@ func (as *alerter) testNotifyHandler(w http.ResponseWriter, r *http.Request) {
ns := make([]notification, 0, len(as.scores))
ctx := r.Context()
tgc, err := cl.NewTalkgroupCache(ctx, as.packedScoredTGs())
if err != nil {
log.Error().Err(err).Msg("test notificaiton tg cache")
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
for _, s := range as.scores {
n, err := makeNotification(ctx, s)
n, err := makeNotification(tgc, s)
if err != nil {
log.Error().Err(err).Msg("test notificaiton")
http.Error(w, err.Error(), http.StatusInternalServerError)
@ -165,7 +172,7 @@ func (as *alerter) testNotifyHandler(w http.ResponseWriter, r *http.Request) {
ns = append(ns, n)
}
err := as.sendNotification(ctx, ns)
err = as.sendNotification(ctx, ns)
if err != nil {
log.Error().Err(err).Msg("test notification send")
http.Error(w, err.Error(), http.StatusInternalServerError)
@ -175,6 +182,16 @@ func (as *alerter) testNotifyHandler(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("Sent"))
}
// packedScoredTGs gets a packed list of TG IDs for DB use.
func (as *alerter) packedScoredTGs() []int64 {
packedTGs := make([]int64, 0, len(as.scores))
for _, s := range as.scores {
packedTGs = append(packedTGs, s.ID.Pack())
}
return packedTGs
}
// notify iterates the scores and sends out any necessary notifications
func (as *alerter) notify(ctx context.Context) error {
if as.notifier == nil {
@ -186,12 +203,24 @@ func (as *alerter) notify(ctx context.Context) error {
as.Lock()
defer as.Unlock()
tgc, err := cl.NewTalkgroupCache(ctx, as.packedScoredTGs())
if err != nil {
return err
}
var notifications []notification
for _, s := range as.scores {
tgr, has := tgc.TG(s.ID)
if has {
if !tgr.Notify {
continue
}
s.Score *= float64(tgr.Weight)
}
if s.Score > as.cfg.AlertThreshold {
if t, inCache := as.notifyCache[s.ID]; !inCache || now.Sub(t) > as.renotify {
as.notifyCache[s.ID] = time.Now()
n, err := makeNotification(ctx, s)
n, err := makeNotification(tgc, s)
if err != nil {
return err
}
@ -229,36 +258,30 @@ func (as *alerter) sendNotification(ctx context.Context, n []notification) error
// makeNotification creates a notification for later rendering by the template.
// It takes a talkgroup Score as input.
func makeNotification(ctx context.Context, tg trending.Score[cl.Talkgroup]) (notification, error) {
func makeNotification(tgs *cl.TalkgroupCache, score trending.Score[cl.Talkgroup]) (notification, error) {
d := notification{
Score: tg,
Score: score,
}
db := database.FromCtx(ctx)
tgRecord, err := db.GetTalkgroupWithLearned(ctx, int(tg.ID.System), int(tg.ID.Talkgroup))
switch err {
case nil:
tgRecord, has := tgs.TG(score.ID)
switch has {
case true:
if tgRecord.SystemName == "" {
tgRecord.SystemName = strconv.Itoa(int(tg.ID.System))
tgRecord.SystemName = strconv.Itoa(int(score.ID.System))
}
if tgRecord.Name != nil {
d.TGName = fmt.Sprintf("%s %s", tgRecord.SystemName, *tgRecord.Name)
d.TGName = fmt.Sprintf("%s %s (%d)", tgRecord.SystemName, *tgRecord.Name, score.ID.Talkgroup)
} else {
d.TGName = fmt.Sprintf("%s:%d", tgRecord.SystemName, int(tg.ID.Talkgroup))
d.TGName = fmt.Sprintf("%s:%d", tgRecord.SystemName, int(score.ID.Talkgroup))
}
case pgx.ErrNoRows:
system, err := db.GetSystemName(ctx, int(tg.ID.System))
switch err {
case nil:
d.TGName = fmt.Sprintf("%s:%d", system, int(tg.ID.Talkgroup))
case pgx.ErrNoRows:
d.TGName = fmt.Sprintf("%d:%d", int(tg.ID.System), int(tg.ID.Talkgroup))
default:
return d, fmt.Errorf("sendNotification get system: %w", err)
case false:
system, has := tgs.SystemName(int(score.ID.System))
if has {
d.TGName = fmt.Sprintf("%s:%d", system, int(score.ID.Talkgroup))
} else {
d.TGName = fmt.Sprintf("%d:%d", int(score.ID.System), int(score.ID.Talkgroup))
}
default:
return d, fmt.Errorf("sendNotification get talkgroup: %w", err)
}
return d, nil
@ -307,7 +330,7 @@ func (as *alerter) startBackfill(ctx context.Context) error {
return nil
}
func (as *alerter) score(ctx context.Context, now time.Time) {
func (as *alerter) score(now time.Time) {
as.Lock()
defer as.Unlock()

View file

@ -69,12 +69,7 @@ func (as *alerter) tgStatsHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
db := database.FromCtx(ctx)
packed := make([]int64, 0, len(as.scores))
for _, s := range as.scores {
packed = append(packed, s.ID.Pack())
}
tgs, err := db.GetTalkgroupsByPackedIDs(ctx, packed)
tgs, err := db.GetTalkgroupsByPackedIDs(ctx, as.packedScoredTGs())
if err != nil {
log.Error().Err(err).Msg("stats TG get failed")
http.Error(w, err.Error(), http.StatusInternalServerError)

View file

@ -25,6 +25,7 @@ type Querier interface {
GetTalkgroupIDsByTags(ctx context.Context, anytags []string, alltags []string, nottags []string) ([]GetTalkgroupIDsByTagsRow, error)
GetTalkgroupTags(ctx context.Context, sys int, tg int) ([]string, error)
GetTalkgroupWithLearned(ctx context.Context, systemID int, tgid int) (GetTalkgroupWithLearnedRow, error)
GetTalkgroupWithLearnedByPackedIDs(ctx context.Context, dollar_1 []int64) ([]GetTalkgroupWithLearnedByPackedIDsRow, error)
GetTalkgroupsByPackedIDs(ctx context.Context, dollar_1 []int64) ([]GetTalkgroupsByPackedIDsRow, error)
GetTalkgroupsWithAllTags(ctx context.Context, tags []string) ([]Talkgroup, error)
GetTalkgroupsWithAnyTags(ctx context.Context, tags []string) ([]Talkgroup, error)

View file

@ -102,6 +102,7 @@ const getTalkgroupWithLearned = `-- name: GetTalkgroupWithLearned :one
SELECT
tg.id, tg.system_id, sys.name system_name, tg.tgid, tg.name,
tg.tg_group, tg.frequency, tg.metadata, tg.tags, tg.alpha_tag,
tg.notify, tg.weight,
FALSE learned
FROM talkgroups tg
JOIN systems sys ON tg.system_id = sys.id
@ -110,7 +111,7 @@ UNION
SELECT
tgl.id::INT8, tgl.system_id::INT4, sys.name system_name, tgl.tgid::INT4, tgl.name,
tgl.alpha_tag, NULL::INTEGER, NULL::JSONB,
CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END, tgl.alpha_tag,
CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END, tgl.alpha_tag, TRUE, 1.0,
TRUE learned
FROM talkgroups_learned tgl
JOIN systems sys ON tgl.system_id = sys.id
@ -128,6 +129,8 @@ type GetTalkgroupWithLearnedRow struct {
Metadata []byte `json:"metadata"`
Tags []string `json:"tags"`
AlphaTag *string `json:"alpha_tag"`
Notify bool `json:"notify"`
Weight float32 `json:"weight"`
Learned bool `json:"learned"`
}
@ -145,11 +148,83 @@ func (q *Queries) GetTalkgroupWithLearned(ctx context.Context, systemID int, tgi
&i.Metadata,
&i.Tags,
&i.AlphaTag,
&i.Notify,
&i.Weight,
&i.Learned,
)
return i, err
}
const getTalkgroupWithLearnedByPackedIDs = `-- name: GetTalkgroupWithLearnedByPackedIDs :many
SELECT
tg.id, tg.system_id, sys.name system_name, tg.tgid, tg.name,
tg.tg_group, tg.frequency, tg.metadata, tg.tags, tg.alpha_tag,
tg.notify, tg.weight,
FALSE learned
FROM talkgroups tg
JOIN systems sys ON tg.system_id = sys.id
WHERE tg.id = ANY($1::INT8[])
UNION
SELECT
tgl.id::INT8, tgl.system_id::INT4, sys.name system_name, tgl.tgid::INT4, tgl.name,
tgl.alpha_tag, NULL::INTEGER, NULL::JSONB,
CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END, tgl.alpha_tag, TRUE, 1.0,
TRUE learned
FROM talkgroups_learned tgl
JOIN systems sys ON tgl.system_id = sys.id
WHERE systg2id(tgl.system_id, tgl.tgid) = ANY($1::INT8[]) AND ignored IS NOT TRUE
`
type GetTalkgroupWithLearnedByPackedIDsRow struct {
ID int64 `json:"id"`
SystemID int32 `json:"system_id"`
SystemName string `json:"system_name"`
Tgid int32 `json:"tgid"`
Name *string `json:"name"`
TgGroup *string `json:"tg_group"`
Frequency *int32 `json:"frequency"`
Metadata []byte `json:"metadata"`
Tags []string `json:"tags"`
AlphaTag *string `json:"alpha_tag"`
Notify bool `json:"notify"`
Weight float32 `json:"weight"`
Learned bool `json:"learned"`
}
func (q *Queries) GetTalkgroupWithLearnedByPackedIDs(ctx context.Context, dollar_1 []int64) ([]GetTalkgroupWithLearnedByPackedIDsRow, error) {
rows, err := q.db.Query(ctx, getTalkgroupWithLearnedByPackedIDs, dollar_1)
if err != nil {
return nil, err
}
defer rows.Close()
var items []GetTalkgroupWithLearnedByPackedIDsRow
for rows.Next() {
var i GetTalkgroupWithLearnedByPackedIDsRow
if err := rows.Scan(
&i.ID,
&i.SystemID,
&i.SystemName,
&i.Tgid,
&i.Name,
&i.TgGroup,
&i.Frequency,
&i.Metadata,
&i.Tags,
&i.AlphaTag,
&i.Notify,
&i.Weight,
&i.Learned,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const getTalkgroupsByPackedIDs = `-- name: GetTalkgroupsByPackedIDs :many
SELECT tg.id, system_id, tgid, tg.name, alpha_tag, tg_group, frequency, metadata, tags, notify, weight, sys.id, sys.name FROM talkgroups tg
JOIN systems sys ON tg.system_id = sys.id

View file

@ -37,6 +37,7 @@ WHERE tg.id = ANY($1::INT8[]);
SELECT
tg.id, tg.system_id, sys.name system_name, tg.tgid, tg.name,
tg.tg_group, tg.frequency, tg.metadata, tg.tags, tg.alpha_tag,
tg.notify, tg.weight,
FALSE learned
FROM talkgroups tg
JOIN systems sys ON tg.system_id = sys.id
@ -45,11 +46,30 @@ UNION
SELECT
tgl.id::INT8, tgl.system_id::INT4, sys.name system_name, tgl.tgid::INT4, tgl.name,
tgl.alpha_tag, NULL::INTEGER, NULL::JSONB,
CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END, tgl.alpha_tag,
CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END, tgl.alpha_tag, TRUE, 1.0,
TRUE learned
FROM talkgroups_learned tgl
JOIN systems sys ON tgl.system_id = sys.id
WHERE tgl.system_id = sqlc.arg(system_id) AND tgl.tgid = sqlc.arg(tgid) AND ignored IS NOT TRUE;
-- name: GetTalkgroupWithLearnedByPackedIDs :many
SELECT
tg.id, tg.system_id, sys.name system_name, tg.tgid, tg.name,
tg.tg_group, tg.frequency, tg.metadata, tg.tags, tg.alpha_tag,
tg.notify, tg.weight,
FALSE learned
FROM talkgroups tg
JOIN systems sys ON tg.system_id = sys.id
WHERE tg.id = ANY($1::INT8[])
UNION
SELECT
tgl.id::INT8, tgl.system_id::INT4, sys.name system_name, tgl.tgid::INT4, tgl.name,
tgl.alpha_tag, NULL::INTEGER, NULL::JSONB,
CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END, tgl.alpha_tag, TRUE, 1.0,
TRUE learned
FROM talkgroups_learned tgl
JOIN systems sys ON tgl.system_id = sys.id
WHERE systg2id(tgl.system_id, tgl.tgid) = ANY($1::INT8[]) AND ignored IS NOT TRUE;
-- name: GetSystemName :one
SELECT name FROM systems WHERE id = sqlc.arg(system_id);