diff --git a/Makefile b/Makefile index f5a6861..12e3fb5 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,6 @@ VPKG=dynatron.me/x/stillbox/internal/version VER!=git describe --tags --always --dirty -BUILDDATE!=date '+%Y-%m-%e' +BUILDDATE!=date '+%Y%m%d' LDFLAGS=-ldflags="-X '${VPKG}.Version=${VER}' -X '${VPKG}.Built=${BUILDDATE}'" all: checkcalls diff --git a/internal/ruletime/ruletime.go b/internal/ruletime/ruletime.go index a6c932b..fb0b98b 100644 --- a/internal/ruletime/ruletime.go +++ b/internal/ruletime/ruletime.go @@ -23,6 +23,15 @@ type coversOptions struct { type CoversOption func(*coversOptions) +// Must is for testing. +func Must(rt RuleTime, err error) RuleTime { + if err != nil { + panic(err) + } + + return rt +} + // WithLocation makes Covers use the provided *time.Location func WithLocation(loc *time.Location) CoversOption { return func(o *coversOptions) { @@ -53,11 +62,7 @@ func (rt *RuleTime) Covers(t time.Time, opts ...CoversOption) bool { } } - if t.After(start) && t.Before(end) { - return true - } - - return false + return t.After(start) && t.Before(end) } // CoversNow returns whether the RuleTime covers this instant. diff --git a/internal/ruletime/ruletime_test.go b/internal/ruletime/ruletime_test.go index 3f07866..bbaee07 100644 --- a/internal/ruletime/ruletime_test.go +++ b/internal/ruletime/ruletime_test.go @@ -185,6 +185,18 @@ func TestCovers(t *testing.T) { covers: true, opts: []ruletime.CoversOption{ruletime.WithLocation(tz("America/Chicago"))}, }, + { + name: "normal", + timespec: "1:00+5h", + t: tM("17:07:00"), + covers: false, + }, + { + name: "normal", + timespec: "1:00+5h", + t: tM("3:07:00"), + covers: true, + }, { name: "24h duration", timespec: "15:00+24h", diff --git a/pkg/calls/talkgroups.go b/pkg/calls/talkgroups.go index 28de88b..193280a 100644 --- a/pkg/calls/talkgroups.go +++ b/pkg/calls/talkgroups.go @@ -2,8 +2,11 @@ package calls import ( "context" + "fmt" "dynatron.me/x/stillbox/pkg/gordio/database" + + "github.com/rs/zerolog/log" ) type Talkgroup struct { @@ -27,6 +30,10 @@ func (t Talkgroup) Pack() int64 { return int64((int64(t.System) << 32) | int64(t.Talkgroup)) } +func (t Talkgroup) String() string { + return fmt.Sprintf("%d:%d", t.System, t.Talkgroup) +} + func PackedTGs(tg []Talkgroup) []int64 { s := make([]int64, len(tg)) @@ -39,14 +46,16 @@ func PackedTGs(tg []Talkgroup) []int64 { type tgMap map[Talkgroup]database.GetTalkgroupWithLearnedByPackedIDsRow type TalkgroupCache struct { + AlertConfig tgs tgMap systems map[int32]string } func NewTalkgroupCache(ctx context.Context, packedTgs []int64) (*TalkgroupCache, error) { tgc := &TalkgroupCache{ - tgs: make(tgMap), - systems: make(map[int32]string), + tgs: make(tgMap), + systems: make(map[int32]string), + AlertConfig: make(AlertConfig), } return tgc, tgc.LoadTGs(ctx, packedTgs) @@ -59,8 +68,14 @@ func (t *TalkgroupCache) LoadTGs(ctx context.Context, packedTgs []int64) error { } for _, rec := range tgRecords { - t.tgs[TG(rec.SystemID, rec.Tgid)] = rec + tg := TG(rec.SystemID, rec.Tgid) + t.tgs[tg] = rec t.systems[rec.SystemID] = rec.SystemName + + err := t.AlertConfig.AddAlertConfig(tg, rec.AlertConfig) + if err != nil { + log.Error().Err(err).Msg("add alert config fail") + } } return nil diff --git a/pkg/gordio/alerting/alerting.go b/pkg/gordio/alerting/alerting.go index 181e014..095d560 100644 --- a/pkg/gordio/alerting/alerting.go +++ b/pkg/gordio/alerting/alerting.go @@ -107,7 +107,7 @@ func New(cfg config.Alerting, opts ...AlertOption) Alerter { opt(as) } - as.scorer = trending.NewScorer[cl.Talkgroup]( + as.scorer = trending.NewScorer( trending.WithTimeSeries(as.newTimeSeries), trending.WithStorageDuration[cl.Talkgroup](time.Hour*24*time.Duration(cfg.LookbackDays)), trending.WithRecentDuration[cl.Talkgroup](time.Duration(cfg.Recent)), @@ -154,28 +154,64 @@ const notificationTemplStr = `{{ range . -}} var notificationTemplate = template.Must(template.New("notification").Funcs(funcMap).Parse(notificationTemplStr)) +func (as *alerter) eval(ctx context.Context, now time.Time, add bool) ([]Alert, error) { + tgc, err := cl.NewTalkgroupCache(ctx, as.packedScoredTGs()) + if err != nil { + return nil, fmt.Errorf("new TG cache: %w", err) + } + + db := database.FromCtx(ctx) + + var notifications []Alert + for _, s := range as.scores { + tgr, has := tgc.TG(s.ID) + if has { + if !tgr.Alert { + continue + } + s.Score *= float64(tgr.Weight) + } + origScore := s.Score + s.Score = tgc.ScaleScore(s, now) + + if s.Score > as.cfg.AlertThreshold { + if old, inCache := as.alertCache[s.ID]; !inCache || now.Sub(old.Timestamp) > as.renotify { + a, err := makeAlert(tgc, s, origScore) + if err != nil { + return nil, fmt.Errorf("makeAlert: %w", err) + } + + as.alertCache[s.ID] = a + + if add { + err = db.AddAlert(ctx, a.ToAddAlertParams()) + if err != nil { + return nil, fmt.Errorf("addAlert: %w", err) + } + } + + notifications = append(notifications, a) + } + } + } + + return notifications, nil + +} + func (as *alerter) testNotifyHandler(w http.ResponseWriter, r *http.Request) { as.RLock() defer as.RUnlock() alerts := make([]Alert, 0, len(as.scores)) ctx := r.Context() - tgc, err := cl.NewTalkgroupCache(ctx, as.packedScoredTGs()) + alerts, err := as.eval(ctx, time.Now(), false) if err != nil { - log.Error().Err(err).Msg("test notificaiton tg cache") + log.Error().Err(err).Msg("test notification send") http.Error(w, err.Error(), http.StatusInternalServerError) return } - for _, s := range as.scores { - a, err := makeAlert(tgc, s) - if err != nil { - log.Error().Err(err).Msg("test notificaiton") - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - alerts = append(alerts, a) - } err = as.sendNotification(ctx, alerts) if err != nil { @@ -203,46 +239,14 @@ func (as *alerter) notify(ctx context.Context) error { return nil } - now := time.Now() - as.Lock() defer as.Unlock() - tgc, err := cl.NewTalkgroupCache(ctx, as.packedScoredTGs()) + notifications, err := as.eval(ctx, time.Now(), true) if err != nil { return err } - db := database.FromCtx(ctx) - - var notifications []Alert - for _, s := range as.scores { - tgr, has := tgc.TG(s.ID) - if has { - if !tgr.Alert { - continue - } - s.Score *= float64(tgr.Weight) - } - if s.Score > as.cfg.AlertThreshold { - if old, inCache := as.alertCache[s.ID]; !inCache || now.Sub(old.Timestamp) > as.renotify { - a, err := makeAlert(tgc, s) - if err != nil { - return err - } - - as.alertCache[s.ID] = a - - err = db.AddAlert(ctx, a.ToAddAlertParams()) - if err != nil { - return err - } - - notifications = append(notifications, a) - } - } - } - if len(notifications) > 0 { return as.sendNotification(ctx, notifications) } @@ -255,17 +259,20 @@ type Alert struct { Timestamp time.Time TGName string Score trending.Score[cl.Talkgroup] + OrigScore float64 Weight float32 } func (a *Alert) ToAddAlertParams() database.AddAlertParams { f32score := float32(a.Score.Score) + f32origscore := float32(a.OrigScore) return database.AddAlertParams{ ID: a.ID, Time: pgtype.Timestamptz{Time: a.Timestamp, Valid: true}, PackedTg: a.Score.ID.Pack(), Weight: &a.Weight, Score: &f32score, + OrigScore: &f32origscore, } } @@ -285,12 +292,13 @@ func (as *alerter) sendNotification(ctx context.Context, n []Alert) error { // makeAlert creates a notification for later rendering by the template. // It takes a talkgroup Score as input. -func makeAlert(tgs *cl.TalkgroupCache, score trending.Score[cl.Talkgroup]) (Alert, error) { +func makeAlert(tgs *cl.TalkgroupCache, score trending.Score[cl.Talkgroup], origScore float64) (Alert, error) { d := Alert{ ID: uuid.New(), Score: score, Timestamp: time.Now(), Weight: 1.0, + OrigScore: origScore, } tgRecord, has := tgs.TG(score.ID) diff --git a/pkg/gordio/database/calls.sql.go b/pkg/gordio/database/calls.sql.go index 3a40a81..6ed1ba5 100644 --- a/pkg/gordio/database/calls.sql.go +++ b/pkg/gordio/database/calls.sql.go @@ -13,7 +13,7 @@ import ( ) const addAlert = `-- name: AddAlert :exec -INSERT INTO alerts (id, time, talkgroup, weight, score, metadata) +INSERT INTO alerts (id, time, talkgroup, weight, score, orig_score, metadata) VALUES ( $1, @@ -21,17 +21,19 @@ VALUES $3, $4, $5, - $6 + $6, + $7 ) ` type AddAlertParams struct { - ID uuid.UUID `json:"id"` - Time pgtype.Timestamptz `json:"time"` - PackedTg int64 `json:"packed_tg"` - Weight *float32 `json:"weight"` - Score *float32 `json:"score"` - Metadata []byte `json:"metadata"` + ID uuid.UUID `json:"id"` + Time pgtype.Timestamptz `json:"time"` + PackedTg int64 `json:"packed_tg"` + Weight *float32 `json:"weight"` + Score *float32 `json:"score"` + OrigScore *float32 `json:"orig_score"` + Metadata []byte `json:"metadata"` } func (q *Queries) AddAlert(ctx context.Context, arg AddAlertParams) error { @@ -41,6 +43,7 @@ func (q *Queries) AddAlert(ctx context.Context, arg AddAlertParams) error { arg.PackedTg, arg.Weight, arg.Score, + arg.OrigScore, arg.Metadata, ) return err diff --git a/pkg/gordio/database/models.go b/pkg/gordio/database/models.go index b035696..c9995b2 100644 --- a/pkg/gordio/database/models.go +++ b/pkg/gordio/database/models.go @@ -19,6 +19,7 @@ type Alert struct { Tgid int32 `json:"tgid"` Weight *float32 `json:"weight"` Score *float32 `json:"score"` + OrigScore *float32 `json:"orig_score"` Metadata []byte `json:"metadata"` } diff --git a/sql/postgres/migrations/001_initial.up.sql b/sql/postgres/migrations/001_initial.up.sql index 94c62f6..b9145b8 100644 --- a/sql/postgres/migrations/001_initial.up.sql +++ b/sql/postgres/migrations/001_initial.up.sql @@ -79,6 +79,7 @@ CREATE TABLE IF NOT EXISTS alerts( tgid INT4 NOT NULL GENERATED ALWAYS AS (talkgroup & x'ffffffff'::BIGINT) STORED, weight REAL, score REAL, + orig_score REAL, metadata JSONB ); diff --git a/sql/postgres/queries/calls.sql b/sql/postgres/queries/calls.sql index 75998d0..b4e7c19 100644 --- a/sql/postgres/queries/calls.sql +++ b/sql/postgres/queries/calls.sql @@ -24,7 +24,7 @@ RETURNING id; UPDATE calls SET transcript = $2 WHERE id = $1; -- name: AddAlert :exec -INSERT INTO alerts (id, time, talkgroup, weight, score, metadata) +INSERT INTO alerts (id, time, talkgroup, weight, score, orig_score, metadata) VALUES ( sqlc.arg(id), @@ -32,6 +32,7 @@ VALUES sqlc.arg(packed_tg), sqlc.arg(weight), sqlc.arg(score), + sqlc.arg(orig_score), sqlc.arg(metadata) );