This commit is contained in:
Daniel 2024-11-02 09:41:48 -04:00
parent 29acaf017a
commit 83b6b0e4f4
9 changed files with 110 additions and 64 deletions

View file

@ -1,6 +1,6 @@
VPKG=dynatron.me/x/stillbox/internal/version VPKG=dynatron.me/x/stillbox/internal/version
VER!=git describe --tags --always --dirty VER!=git describe --tags --always --dirty
BUILDDATE!=date '+%Y-%m-%e' BUILDDATE!=date '+%Y%m%d'
LDFLAGS=-ldflags="-X '${VPKG}.Version=${VER}' -X '${VPKG}.Built=${BUILDDATE}'" LDFLAGS=-ldflags="-X '${VPKG}.Version=${VER}' -X '${VPKG}.Built=${BUILDDATE}'"
all: checkcalls all: checkcalls

View file

@ -23,6 +23,15 @@ type coversOptions struct {
type CoversOption func(*coversOptions) type CoversOption func(*coversOptions)
// Must is for testing.
func Must(rt RuleTime, err error) RuleTime {
if err != nil {
panic(err)
}
return rt
}
// WithLocation makes Covers use the provided *time.Location // WithLocation makes Covers use the provided *time.Location
func WithLocation(loc *time.Location) CoversOption { func WithLocation(loc *time.Location) CoversOption {
return func(o *coversOptions) { return func(o *coversOptions) {
@ -53,11 +62,7 @@ func (rt *RuleTime) Covers(t time.Time, opts ...CoversOption) bool {
} }
} }
if t.After(start) && t.Before(end) { return t.After(start) && t.Before(end)
return true
}
return false
} }
// CoversNow returns whether the RuleTime covers this instant. // CoversNow returns whether the RuleTime covers this instant.

View file

@ -185,6 +185,18 @@ func TestCovers(t *testing.T) {
covers: true, covers: true,
opts: []ruletime.CoversOption{ruletime.WithLocation(tz("America/Chicago"))}, opts: []ruletime.CoversOption{ruletime.WithLocation(tz("America/Chicago"))},
}, },
{
name: "normal",
timespec: "1:00+5h",
t: tM("17:07:00"),
covers: false,
},
{
name: "normal",
timespec: "1:00+5h",
t: tM("3:07:00"),
covers: true,
},
{ {
name: "24h duration", name: "24h duration",
timespec: "15:00+24h", timespec: "15:00+24h",

View file

@ -2,8 +2,11 @@ package calls
import ( import (
"context" "context"
"fmt"
"dynatron.me/x/stillbox/pkg/gordio/database" "dynatron.me/x/stillbox/pkg/gordio/database"
"github.com/rs/zerolog/log"
) )
type Talkgroup struct { type Talkgroup struct {
@ -27,6 +30,10 @@ func (t Talkgroup) Pack() int64 {
return int64((int64(t.System) << 32) | int64(t.Talkgroup)) return int64((int64(t.System) << 32) | int64(t.Talkgroup))
} }
func (t Talkgroup) String() string {
return fmt.Sprintf("%d:%d", t.System, t.Talkgroup)
}
func PackedTGs(tg []Talkgroup) []int64 { func PackedTGs(tg []Talkgroup) []int64 {
s := make([]int64, len(tg)) s := make([]int64, len(tg))
@ -39,6 +46,7 @@ func PackedTGs(tg []Talkgroup) []int64 {
type tgMap map[Talkgroup]database.GetTalkgroupWithLearnedByPackedIDsRow type tgMap map[Talkgroup]database.GetTalkgroupWithLearnedByPackedIDsRow
type TalkgroupCache struct { type TalkgroupCache struct {
AlertConfig
tgs tgMap tgs tgMap
systems map[int32]string systems map[int32]string
} }
@ -47,6 +55,7 @@ func NewTalkgroupCache(ctx context.Context, packedTgs []int64) (*TalkgroupCache,
tgc := &TalkgroupCache{ tgc := &TalkgroupCache{
tgs: make(tgMap), tgs: make(tgMap),
systems: make(map[int32]string), systems: make(map[int32]string),
AlertConfig: make(AlertConfig),
} }
return tgc, tgc.LoadTGs(ctx, packedTgs) return tgc, tgc.LoadTGs(ctx, packedTgs)
@ -59,8 +68,14 @@ func (t *TalkgroupCache) LoadTGs(ctx context.Context, packedTgs []int64) error {
} }
for _, rec := range tgRecords { for _, rec := range tgRecords {
t.tgs[TG(rec.SystemID, rec.Tgid)] = rec tg := TG(rec.SystemID, rec.Tgid)
t.tgs[tg] = rec
t.systems[rec.SystemID] = rec.SystemName t.systems[rec.SystemID] = rec.SystemName
err := t.AlertConfig.AddAlertConfig(tg, rec.AlertConfig)
if err != nil {
log.Error().Err(err).Msg("add alert config fail")
}
} }
return nil return nil

View file

@ -107,7 +107,7 @@ func New(cfg config.Alerting, opts ...AlertOption) Alerter {
opt(as) opt(as)
} }
as.scorer = trending.NewScorer[cl.Talkgroup]( as.scorer = trending.NewScorer(
trending.WithTimeSeries(as.newTimeSeries), trending.WithTimeSeries(as.newTimeSeries),
trending.WithStorageDuration[cl.Talkgroup](time.Hour*24*time.Duration(cfg.LookbackDays)), trending.WithStorageDuration[cl.Talkgroup](time.Hour*24*time.Duration(cfg.LookbackDays)),
trending.WithRecentDuration[cl.Talkgroup](time.Duration(cfg.Recent)), trending.WithRecentDuration[cl.Talkgroup](time.Duration(cfg.Recent)),
@ -154,28 +154,64 @@ const notificationTemplStr = `{{ range . -}}
var notificationTemplate = template.Must(template.New("notification").Funcs(funcMap).Parse(notificationTemplStr)) var notificationTemplate = template.Must(template.New("notification").Funcs(funcMap).Parse(notificationTemplStr))
func (as *alerter) eval(ctx context.Context, now time.Time, add bool) ([]Alert, error) {
tgc, err := cl.NewTalkgroupCache(ctx, as.packedScoredTGs())
if err != nil {
return nil, fmt.Errorf("new TG cache: %w", err)
}
db := database.FromCtx(ctx)
var notifications []Alert
for _, s := range as.scores {
tgr, has := tgc.TG(s.ID)
if has {
if !tgr.Alert {
continue
}
s.Score *= float64(tgr.Weight)
}
origScore := s.Score
s.Score = tgc.ScaleScore(s, now)
if s.Score > as.cfg.AlertThreshold {
if old, inCache := as.alertCache[s.ID]; !inCache || now.Sub(old.Timestamp) > as.renotify {
a, err := makeAlert(tgc, s, origScore)
if err != nil {
return nil, fmt.Errorf("makeAlert: %w", err)
}
as.alertCache[s.ID] = a
if add {
err = db.AddAlert(ctx, a.ToAddAlertParams())
if err != nil {
return nil, fmt.Errorf("addAlert: %w", err)
}
}
notifications = append(notifications, a)
}
}
}
return notifications, nil
}
func (as *alerter) testNotifyHandler(w http.ResponseWriter, r *http.Request) { func (as *alerter) testNotifyHandler(w http.ResponseWriter, r *http.Request) {
as.RLock() as.RLock()
defer as.RUnlock() defer as.RUnlock()
alerts := make([]Alert, 0, len(as.scores)) alerts := make([]Alert, 0, len(as.scores))
ctx := r.Context() ctx := r.Context()
tgc, err := cl.NewTalkgroupCache(ctx, as.packedScoredTGs()) alerts, err := as.eval(ctx, time.Now(), false)
if err != nil { if err != nil {
log.Error().Err(err).Msg("test notificaiton tg cache") log.Error().Err(err).Msg("test notification send")
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return
} }
for _, s := range as.scores {
a, err := makeAlert(tgc, s)
if err != nil {
log.Error().Err(err).Msg("test notificaiton")
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
alerts = append(alerts, a)
}
err = as.sendNotification(ctx, alerts) err = as.sendNotification(ctx, alerts)
if err != nil { if err != nil {
@ -203,46 +239,14 @@ func (as *alerter) notify(ctx context.Context) error {
return nil return nil
} }
now := time.Now()
as.Lock() as.Lock()
defer as.Unlock() defer as.Unlock()
tgc, err := cl.NewTalkgroupCache(ctx, as.packedScoredTGs()) notifications, err := as.eval(ctx, time.Now(), true)
if err != nil { if err != nil {
return err return err
} }
db := database.FromCtx(ctx)
var notifications []Alert
for _, s := range as.scores {
tgr, has := tgc.TG(s.ID)
if has {
if !tgr.Alert {
continue
}
s.Score *= float64(tgr.Weight)
}
if s.Score > as.cfg.AlertThreshold {
if old, inCache := as.alertCache[s.ID]; !inCache || now.Sub(old.Timestamp) > as.renotify {
a, err := makeAlert(tgc, s)
if err != nil {
return err
}
as.alertCache[s.ID] = a
err = db.AddAlert(ctx, a.ToAddAlertParams())
if err != nil {
return err
}
notifications = append(notifications, a)
}
}
}
if len(notifications) > 0 { if len(notifications) > 0 {
return as.sendNotification(ctx, notifications) return as.sendNotification(ctx, notifications)
} }
@ -255,17 +259,20 @@ type Alert struct {
Timestamp time.Time Timestamp time.Time
TGName string TGName string
Score trending.Score[cl.Talkgroup] Score trending.Score[cl.Talkgroup]
OrigScore float64
Weight float32 Weight float32
} }
func (a *Alert) ToAddAlertParams() database.AddAlertParams { func (a *Alert) ToAddAlertParams() database.AddAlertParams {
f32score := float32(a.Score.Score) f32score := float32(a.Score.Score)
f32origscore := float32(a.OrigScore)
return database.AddAlertParams{ return database.AddAlertParams{
ID: a.ID, ID: a.ID,
Time: pgtype.Timestamptz{Time: a.Timestamp, Valid: true}, Time: pgtype.Timestamptz{Time: a.Timestamp, Valid: true},
PackedTg: a.Score.ID.Pack(), PackedTg: a.Score.ID.Pack(),
Weight: &a.Weight, Weight: &a.Weight,
Score: &f32score, Score: &f32score,
OrigScore: &f32origscore,
} }
} }
@ -285,12 +292,13 @@ func (as *alerter) sendNotification(ctx context.Context, n []Alert) error {
// makeAlert creates a notification for later rendering by the template. // makeAlert creates a notification for later rendering by the template.
// It takes a talkgroup Score as input. // It takes a talkgroup Score as input.
func makeAlert(tgs *cl.TalkgroupCache, score trending.Score[cl.Talkgroup]) (Alert, error) { func makeAlert(tgs *cl.TalkgroupCache, score trending.Score[cl.Talkgroup], origScore float64) (Alert, error) {
d := Alert{ d := Alert{
ID: uuid.New(), ID: uuid.New(),
Score: score, Score: score,
Timestamp: time.Now(), Timestamp: time.Now(),
Weight: 1.0, Weight: 1.0,
OrigScore: origScore,
} }
tgRecord, has := tgs.TG(score.ID) tgRecord, has := tgs.TG(score.ID)

View file

@ -13,7 +13,7 @@ import (
) )
const addAlert = `-- name: AddAlert :exec const addAlert = `-- name: AddAlert :exec
INSERT INTO alerts (id, time, talkgroup, weight, score, metadata) INSERT INTO alerts (id, time, talkgroup, weight, score, orig_score, metadata)
VALUES VALUES
( (
$1, $1,
@ -21,7 +21,8 @@ VALUES
$3, $3,
$4, $4,
$5, $5,
$6 $6,
$7
) )
` `
@ -31,6 +32,7 @@ type AddAlertParams struct {
PackedTg int64 `json:"packed_tg"` PackedTg int64 `json:"packed_tg"`
Weight *float32 `json:"weight"` Weight *float32 `json:"weight"`
Score *float32 `json:"score"` Score *float32 `json:"score"`
OrigScore *float32 `json:"orig_score"`
Metadata []byte `json:"metadata"` Metadata []byte `json:"metadata"`
} }
@ -41,6 +43,7 @@ func (q *Queries) AddAlert(ctx context.Context, arg AddAlertParams) error {
arg.PackedTg, arg.PackedTg,
arg.Weight, arg.Weight,
arg.Score, arg.Score,
arg.OrigScore,
arg.Metadata, arg.Metadata,
) )
return err return err

View file

@ -19,6 +19,7 @@ type Alert struct {
Tgid int32 `json:"tgid"` Tgid int32 `json:"tgid"`
Weight *float32 `json:"weight"` Weight *float32 `json:"weight"`
Score *float32 `json:"score"` Score *float32 `json:"score"`
OrigScore *float32 `json:"orig_score"`
Metadata []byte `json:"metadata"` Metadata []byte `json:"metadata"`
} }

View file

@ -79,6 +79,7 @@ CREATE TABLE IF NOT EXISTS alerts(
tgid INT4 NOT NULL GENERATED ALWAYS AS (talkgroup & x'ffffffff'::BIGINT) STORED, tgid INT4 NOT NULL GENERATED ALWAYS AS (talkgroup & x'ffffffff'::BIGINT) STORED,
weight REAL, weight REAL,
score REAL, score REAL,
orig_score REAL,
metadata JSONB metadata JSONB
); );

View file

@ -24,7 +24,7 @@ RETURNING id;
UPDATE calls SET transcript = $2 WHERE id = $1; UPDATE calls SET transcript = $2 WHERE id = $1;
-- name: AddAlert :exec -- name: AddAlert :exec
INSERT INTO alerts (id, time, talkgroup, weight, score, metadata) INSERT INTO alerts (id, time, talkgroup, weight, score, orig_score, metadata)
VALUES VALUES
( (
sqlc.arg(id), sqlc.arg(id),
@ -32,6 +32,7 @@ VALUES
sqlc.arg(packed_tg), sqlc.arg(packed_tg),
sqlc.arg(weight), sqlc.arg(weight),
sqlc.arg(score), sqlc.arg(score),
sqlc.arg(orig_score),
sqlc.arg(metadata) sqlc.arg(metadata)
); );