Add alerts to table

This commit is contained in:
Daniel Ponte 2024-11-01 09:15:39 -04:00
parent 489e72b17a
commit f54d0d250a
8 changed files with 195 additions and 105 deletions

View file

@ -20,6 +20,8 @@ import (
"dynatron.me/x/stillbox/internal/timeseries"
"dynatron.me/x/stillbox/internal/trending"
"github.com/google/uuid"
"github.com/jackc/pgx/v5/pgtype"
"github.com/rs/zerolog/log"
)
@ -48,7 +50,7 @@ type alerter struct {
scores trending.Scores[cl.Talkgroup]
lastScore time.Time
sim *Simulation
notifyCache map[cl.Talkgroup]time.Time
alertCache map[cl.Talkgroup]Alert
renotify time.Duration
notifier notify.Notifier
}
@ -92,7 +94,7 @@ func New(cfg config.Alerting, opts ...AlertOption) Alerter {
as := &alerter{
cfg: cfg,
notifyCache: make(map[cl.Talkgroup]time.Time),
alertCache: make(map[cl.Talkgroup]Alert),
clock: timeseries.DefaultClock,
renotify: DefaultRenotify,
}
@ -155,7 +157,7 @@ var notificationTemplate = template.Must(template.New("notification").Funcs(func
func (as *alerter) testNotifyHandler(w http.ResponseWriter, r *http.Request) {
as.RLock()
defer as.RUnlock()
ns := make([]notification, 0, len(as.scores))
alerts := make([]Alert, 0, len(as.scores))
ctx := r.Context()
tgc, err := cl.NewTalkgroupCache(ctx, as.packedScoredTGs())
@ -166,16 +168,16 @@ func (as *alerter) testNotifyHandler(w http.ResponseWriter, r *http.Request) {
}
for _, s := range as.scores {
n, err := makeNotification(tgc, s)
a, err := makeAlert(tgc, s)
if err != nil {
log.Error().Err(err).Msg("test notificaiton")
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
ns = append(ns, n)
alerts = append(alerts, a)
}
err = as.sendNotification(ctx, ns)
err = as.sendNotification(ctx, alerts)
if err != nil {
log.Error().Err(err).Msg("test notification send")
http.Error(w, err.Error(), http.StatusInternalServerError)
@ -211,24 +213,32 @@ func (as *alerter) notify(ctx context.Context) error {
return err
}
var notifications []notification
db := database.FromCtx(ctx)
var notifications []Alert
for _, s := range as.scores {
tgr, has := tgc.TG(s.ID)
if has {
if !tgr.Notify {
if !tgr.Alert {
continue
}
s.Score *= float64(tgr.Weight)
}
if s.Score > as.cfg.AlertThreshold {
if t, inCache := as.notifyCache[s.ID]; !inCache || now.Sub(t) > as.renotify {
as.notifyCache[s.ID] = time.Now()
n, err := makeNotification(tgc, s)
if old, inCache := as.alertCache[s.ID]; !inCache || now.Sub(old.Timestamp) > as.renotify {
a, err := makeAlert(tgc, s)
if err != nil {
return err
}
notifications = append(notifications, n)
as.alertCache[s.ID] = a
err = db.AddAlert(ctx, a.ToAddAlertParams())
if err != nil {
return err
}
notifications = append(notifications, a)
}
}
}
@ -240,13 +250,27 @@ func (as *alerter) notify(ctx context.Context) error {
return nil
}
type notification struct {
type Alert struct {
ID uuid.UUID
Timestamp time.Time
TGName string
Score trending.Score[cl.Talkgroup]
Weight float32
}
func (a *Alert) ToAddAlertParams() database.AddAlertParams {
f32score := float32(a.Score.Score)
return database.AddAlertParams{
ID: a.ID,
Time: pgtype.Timestamptz{Time: a.Timestamp, Valid: true},
PackedTg: a.Score.ID.Pack(),
Weight: &a.Weight,
Score: &f32score,
}
}
// sendNotification renders and sends the notification.
func (as *alerter) sendNotification(ctx context.Context, n []notification) error {
func (as *alerter) sendNotification(ctx context.Context, n []Alert) error {
msgBuffer := new(bytes.Buffer)
err := notificationTemplate.Execute(msgBuffer, n)
@ -259,16 +283,20 @@ func (as *alerter) sendNotification(ctx context.Context, n []notification) error
return as.notifier.Send(ctx, NotificationSubject, msgBuffer.String())
}
// makeNotification creates a notification for later rendering by the template.
// makeAlert creates a notification for later rendering by the template.
// It takes a talkgroup Score as input.
func makeNotification(tgs *cl.TalkgroupCache, score trending.Score[cl.Talkgroup]) (notification, error) {
d := notification{
func makeAlert(tgs *cl.TalkgroupCache, score trending.Score[cl.Talkgroup]) (Alert, error) {
d := Alert{
ID: uuid.New(),
Score: score,
Timestamp: time.Now(),
Weight: 1.0,
}
tgRecord, has := tgs.TG(score.ID)
switch has {
case true:
d.Weight = tgRecord.Weight
if tgRecord.SystemName == "" {
tgRecord.SystemName = strconv.Itoa(int(score.ID.System))
}
@ -301,9 +329,9 @@ func (as *alerter) cleanCache() {
as.Lock()
defer as.Unlock()
for k, t := range as.notifyCache {
if now.Sub(t) > as.renotify {
delete(as.notifyCache, k)
for k, a := range as.alertCache {
if now.Sub(a.Timestamp) > as.renotify {
delete(as.alertCache, k)
}
}
}

View file

@ -12,6 +12,40 @@ import (
"github.com/jackc/pgx/v5/pgtype"
)
const addAlert = `-- name: AddAlert :exec
INSERT INTO alerts (id, time, talkgroup, weight, score, metadata)
VALUES
(
$1,
$2,
$3,
$4,
$5,
$6
)
`
type AddAlertParams struct {
ID uuid.UUID `json:"id"`
Time pgtype.Timestamptz `json:"time"`
PackedTg int64 `json:"packed_tg"`
Weight *float32 `json:"weight"`
Score *float32 `json:"score"`
Metadata []byte `json:"metadata"`
}
func (q *Queries) AddAlert(ctx context.Context, arg AddAlertParams) error {
_, err := q.db.Exec(ctx, addAlert,
arg.ID,
arg.Time,
arg.PackedTg,
arg.Weight,
arg.Score,
arg.Metadata,
)
return err
}
const addCall = `-- name: AddCall :one
INSERT INTO calls (
id,

View file

@ -12,7 +12,7 @@ import (
)
type Alert struct {
ID int32 `json:"id"`
ID uuid.UUID `json:"id"`
Time pgtype.Timestamptz `json:"time"`
Talkgroup int64 `json:"talkgroup"`
SystemID int32 `json:"system_id"`
@ -89,7 +89,8 @@ type Talkgroup struct {
Frequency *int32 `json:"frequency"`
Metadata []byte `json:"metadata"`
Tags []string `json:"tags"`
Notify bool `json:"notify"`
Alert bool `json:"alert"`
AlertConfig []byte `json:"alert_config"`
Weight float32 `json:"weight"`
}

View file

@ -12,6 +12,7 @@ import (
)
type Querier interface {
AddAlert(ctx context.Context, arg AddAlertParams) error
AddCall(ctx context.Context, arg AddCallParams) (uuid.UUID, error)
BulkSetTalkgroupTags(ctx context.Context, iD int64, tags []string) error
CreateAPIKey(ctx context.Context, owner int, expires pgtype.Timestamp, disabled *bool) (ApiKey, error)

View file

@ -31,7 +31,7 @@ func (q *Queries) GetSystemName(ctx context.Context, systemID int) (string, erro
}
const getTalkgroup = `-- name: GetTalkgroup :one
SELECT id, system_id, tgid, name, alpha_tag, tg_group, frequency, metadata, tags, notify, weight FROM talkgroups
SELECT id, system_id, tgid, name, alpha_tag, tg_group, frequency, metadata, tags, alert, alert_config, weight FROM talkgroups
WHERE id = systg2id($1, $2)
`
@ -48,7 +48,8 @@ func (q *Queries) GetTalkgroup(ctx context.Context, systemID int, tgid int) (Tal
&i.Frequency,
&i.Metadata,
&i.Tags,
&i.Notify,
&i.Alert,
&i.AlertConfig,
&i.Weight,
)
return i, err
@ -102,7 +103,7 @@ const getTalkgroupWithLearned = `-- name: GetTalkgroupWithLearned :one
SELECT
tg.id, tg.system_id, sys.name system_name, tg.tgid, tg.name,
tg.tg_group, tg.frequency, tg.metadata, tg.tags, tg.alpha_tag,
tg.notify, tg.weight,
tg.alert, tg.weight, tg.alert_config,
FALSE learned
FROM talkgroups tg
JOIN systems sys ON tg.system_id = sys.id
@ -111,7 +112,8 @@ UNION
SELECT
tgl.id::INT8, tgl.system_id::INT4, sys.name system_name, tgl.tgid::INT4, tgl.name,
tgl.alpha_tag, NULL::INTEGER, NULL::JSONB,
CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END, tgl.alpha_tag, TRUE, 1.0,
CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END, tgl.alpha_tag,
TRUE, 1.0, NULL::JSONB,
TRUE learned
FROM talkgroups_learned tgl
JOIN systems sys ON tgl.system_id = sys.id
@ -129,8 +131,9 @@ type GetTalkgroupWithLearnedRow struct {
Metadata []byte `json:"metadata"`
Tags []string `json:"tags"`
AlphaTag *string `json:"alpha_tag"`
Notify bool `json:"notify"`
Alert bool `json:"alert"`
Weight float32 `json:"weight"`
AlertConfig []byte `json:"alert_config"`
Learned bool `json:"learned"`
}
@ -148,8 +151,9 @@ func (q *Queries) GetTalkgroupWithLearned(ctx context.Context, systemID int, tgi
&i.Metadata,
&i.Tags,
&i.AlphaTag,
&i.Notify,
&i.Alert,
&i.Weight,
&i.AlertConfig,
&i.Learned,
)
return i, err
@ -159,7 +163,7 @@ const getTalkgroupWithLearnedByPackedIDs = `-- name: GetTalkgroupWithLearnedByPa
SELECT
tg.id, tg.system_id, sys.name system_name, tg.tgid, tg.name,
tg.tg_group, tg.frequency, tg.metadata, tg.tags, tg.alpha_tag,
tg.notify, tg.weight,
tg.alert, tg.weight, tg.alert_config,
FALSE learned
FROM talkgroups tg
JOIN systems sys ON tg.system_id = sys.id
@ -168,7 +172,8 @@ UNION
SELECT
tgl.id::INT8, tgl.system_id::INT4, sys.name system_name, tgl.tgid::INT4, tgl.name,
tgl.alpha_tag, NULL::INTEGER, NULL::JSONB,
CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END, tgl.alpha_tag, TRUE, 1.0,
CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END, tgl.alpha_tag,
TRUE, 1.0, NULL::JSONB,
TRUE learned
FROM talkgroups_learned tgl
JOIN systems sys ON tgl.system_id = sys.id
@ -186,8 +191,9 @@ type GetTalkgroupWithLearnedByPackedIDsRow struct {
Metadata []byte `json:"metadata"`
Tags []string `json:"tags"`
AlphaTag *string `json:"alpha_tag"`
Notify bool `json:"notify"`
Alert bool `json:"alert"`
Weight float32 `json:"weight"`
AlertConfig []byte `json:"alert_config"`
Learned bool `json:"learned"`
}
@ -211,8 +217,9 @@ func (q *Queries) GetTalkgroupWithLearnedByPackedIDs(ctx context.Context, dollar
&i.Metadata,
&i.Tags,
&i.AlphaTag,
&i.Notify,
&i.Alert,
&i.Weight,
&i.AlertConfig,
&i.Learned,
); err != nil {
return nil, err
@ -226,7 +233,7 @@ func (q *Queries) GetTalkgroupWithLearnedByPackedIDs(ctx context.Context, dollar
}
const getTalkgroupsByPackedIDs = `-- name: GetTalkgroupsByPackedIDs :many
SELECT tg.id, system_id, tgid, tg.name, alpha_tag, tg_group, frequency, metadata, tags, notify, weight, sys.id, sys.name FROM talkgroups tg
SELECT tg.id, system_id, tgid, tg.name, alpha_tag, tg_group, frequency, metadata, tags, alert, alert_config, weight, sys.id, sys.name FROM talkgroups tg
JOIN systems sys ON tg.system_id = sys.id
WHERE tg.id = ANY($1::INT8[])
`
@ -241,7 +248,8 @@ type GetTalkgroupsByPackedIDsRow struct {
Frequency *int32 `json:"frequency"`
Metadata []byte `json:"metadata"`
Tags []string `json:"tags"`
Notify bool `json:"notify"`
Alert bool `json:"alert"`
AlertConfig []byte `json:"alert_config"`
Weight float32 `json:"weight"`
ID_2 int `json:"id_2"`
Name_2 string `json:"name_2"`
@ -266,7 +274,8 @@ func (q *Queries) GetTalkgroupsByPackedIDs(ctx context.Context, dollar_1 []int64
&i.Frequency,
&i.Metadata,
&i.Tags,
&i.Notify,
&i.Alert,
&i.AlertConfig,
&i.Weight,
&i.ID_2,
&i.Name_2,
@ -282,7 +291,7 @@ func (q *Queries) GetTalkgroupsByPackedIDs(ctx context.Context, dollar_1 []int64
}
const getTalkgroupsWithAllTags = `-- name: GetTalkgroupsWithAllTags :many
SELECT id, system_id, tgid, name, alpha_tag, tg_group, frequency, metadata, tags, notify, weight FROM talkgroups
SELECT id, system_id, tgid, name, alpha_tag, tg_group, frequency, metadata, tags, alert, alert_config, weight FROM talkgroups
WHERE tags && ARRAY[$1]
`
@ -305,7 +314,8 @@ func (q *Queries) GetTalkgroupsWithAllTags(ctx context.Context, tags []string) (
&i.Frequency,
&i.Metadata,
&i.Tags,
&i.Notify,
&i.Alert,
&i.AlertConfig,
&i.Weight,
); err != nil {
return nil, err
@ -319,7 +329,7 @@ func (q *Queries) GetTalkgroupsWithAllTags(ctx context.Context, tags []string) (
}
const getTalkgroupsWithAnyTags = `-- name: GetTalkgroupsWithAnyTags :many
SELECT id, system_id, tgid, name, alpha_tag, tg_group, frequency, metadata, tags, notify, weight FROM talkgroups
SELECT id, system_id, tgid, name, alpha_tag, tg_group, frequency, metadata, tags, alert, alert_config, weight FROM talkgroups
WHERE tags @> ARRAY[$1]
`
@ -342,7 +352,8 @@ func (q *Queries) GetTalkgroupsWithAnyTags(ctx context.Context, tags []string) (
&i.Frequency,
&i.Metadata,
&i.Tags,
&i.Notify,
&i.Alert,
&i.AlertConfig,
&i.Weight,
); err != nil {
return nil, err

View file

@ -54,7 +54,8 @@ CREATE TABLE IF NOT EXISTS talkgroups(
frequency INTEGER,
metadata JSONB,
tags TEXT[] NOT NULL DEFAULT '{}',
notify BOOLEAN NOT NULL DEFAULT 'true',
alert BOOLEAN NOT NULL DEFAULT 'true',
alert_config JSONB,
weight REAL NOT NULL DEFAULT 1.0
);
@ -71,7 +72,7 @@ CREATE TABLE IF NOT EXISTS talkgroups_learned(
);
CREATE TABLE IF NOT EXISTS alerts(
id SERIAL PRIMARY KEY,
id UUID PRIMARY KEY,
time TIMESTAMPTZ NOT NULL,
talkgroup INT8 REFERENCES talkgroups(id) NOT NULL,
system_id INT4 REFERENCES systems(id) NOT NULL GENERATED ALWAYS AS (talkgroup >> 32) STORED,

View file

@ -23,5 +23,17 @@ RETURNING id;
-- name: SetCallTranscript :exec
UPDATE calls SET transcript = $2 WHERE id = $1;
-- name: AddAlert :exec
INSERT INTO alerts (id, time, talkgroup, weight, score, metadata)
VALUES
(
sqlc.arg(id),
sqlc.arg(time),
sqlc.arg(packed_tg),
sqlc.arg(weight),
sqlc.arg(score),
sqlc.arg(metadata)
);
-- name: GetDatabaseSize :one
SELECT pg_size_pretty(pg_database_size(current_database()));

View file

@ -37,7 +37,7 @@ WHERE tg.id = ANY($1::INT8[]);
SELECT
tg.id, tg.system_id, sys.name system_name, tg.tgid, tg.name,
tg.tg_group, tg.frequency, tg.metadata, tg.tags, tg.alpha_tag,
tg.notify, tg.weight,
tg.alert, tg.weight, tg.alert_config,
FALSE learned
FROM talkgroups tg
JOIN systems sys ON tg.system_id = sys.id
@ -46,7 +46,8 @@ UNION
SELECT
tgl.id::INT8, tgl.system_id::INT4, sys.name system_name, tgl.tgid::INT4, tgl.name,
tgl.alpha_tag, NULL::INTEGER, NULL::JSONB,
CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END, tgl.alpha_tag, TRUE, 1.0,
CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END, tgl.alpha_tag,
TRUE, 1.0, NULL::JSONB,
TRUE learned
FROM talkgroups_learned tgl
JOIN systems sys ON tgl.system_id = sys.id
@ -56,7 +57,7 @@ WHERE tgl.system_id = sqlc.arg(system_id) AND tgl.tgid = sqlc.arg(tgid) AND igno
SELECT
tg.id, tg.system_id, sys.name system_name, tg.tgid, tg.name,
tg.tg_group, tg.frequency, tg.metadata, tg.tags, tg.alpha_tag,
tg.notify, tg.weight,
tg.alert, tg.weight, tg.alert_config,
FALSE learned
FROM talkgroups tg
JOIN systems sys ON tg.system_id = sys.id
@ -65,7 +66,8 @@ UNION
SELECT
tgl.id::INT8, tgl.system_id::INT4, sys.name system_name, tgl.tgid::INT4, tgl.name,
tgl.alpha_tag, NULL::INTEGER, NULL::JSONB,
CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END, tgl.alpha_tag, TRUE, 1.0,
CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END, tgl.alpha_tag,
TRUE, 1.0, NULL::JSONB,
TRUE learned
FROM talkgroups_learned tgl
JOIN systems sys ON tgl.system_id = sys.id