diff --git a/Makefile b/Makefile index f5a6861..12e3fb5 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,6 @@ VPKG=dynatron.me/x/stillbox/internal/version VER!=git describe --tags --always --dirty -BUILDDATE!=date '+%Y-%m-%e' +BUILDDATE!=date '+%Y%m%d' LDFLAGS=-ldflags="-X '${VPKG}.Version=${VER}' -X '${VPKG}.Built=${BUILDDATE}'" all: checkcalls diff --git a/internal/ruletime/ruletime.go b/internal/ruletime/ruletime.go new file mode 100644 index 0000000..fb0b98b --- /dev/null +++ b/internal/ruletime/ruletime.go @@ -0,0 +1,160 @@ +package ruletime + +import ( + "encoding/json" + "fmt" + "strconv" + "strings" + "time" +) + +// RuleTime specifies a start and end of a time period. +type RuleTime struct { + Hour uint8 + Minute uint8 + Second uint8 + + Duration time.Duration +} + +type coversOptions struct { + loc *time.Location +} + +type CoversOption func(*coversOptions) + +// Must is for testing. +func Must(rt RuleTime, err error) RuleTime { + if err != nil { + panic(err) + } + + return rt +} + +// WithLocation makes Covers use the provided *time.Location +func WithLocation(loc *time.Location) CoversOption { + return func(o *coversOptions) { + o.loc = loc + } +} + +// Covers returns whether the RuleTime covers the provided time.Time. +func (rt *RuleTime) Covers(t time.Time, opts ...CoversOption) bool { + o := coversOptions{} + for _, opt := range opts { + opt(&o) + } + + if o.loc == nil { + o.loc = time.Local + } + + start := time.Date(t.Year(), t.Month(), t.Day(), int(rt.Hour), int(rt.Minute), int(rt.Second), 0, o.loc) + end := start.Add(rt.Duration) + + // wrap things around the clock + if end.Day() > start.Day() || end.Month() > start.Month() || end.Year() > start.Year() { + start = start.Add(-24 * time.Hour) + end = start.Add(rt.Duration) + if t.Sub(start) > 24*time.Hour { + t = t.Add(-24 * time.Hour) + } + } + + return t.After(start) && t.Before(end) +} + +// CoversNow returns whether the RuleTime covers this instant. +func (rt *RuleTime) CoversNow(opts ...CoversOption) bool { + return rt.Covers(time.Now(), opts...) +} + +func (rt *RuleTime) MarshalJSON() ([]byte, error) { + return json.Marshal(rt.String()) +} + +func (rt *RuleTime) UnmarshalJSON(b []byte) error { + var s string + err := json.Unmarshal(b, &s) + if err != nil { + return err + } + + return rt.Parse(s) +} + +func (rt *RuleTime) String() string { + return fmt.Sprintf("%d:%d:%d+%s", rt.Hour, rt.Minute, rt.Second, rt.Duration.String()) +} + +// ParseRuleTime emits a RuleTime from a string in the format of `[h]h:[m]m[:[s]s]+duration` +// such as: `08:09:00+1h2m`. Time must be in 24 hour format. +func (rt *RuleTime) Parse(s string) error { + perr := func(e string) error { + return fmt.Errorf("parse rule time '%s': %s", s, e) + } + + werr := func(w error) error { + return fmt.Errorf("parse rule time '%s': %w", s, w) + } + + timeDurationParts := strings.Split(s, "+") + if len(timeDurationParts) != 2 { + return perr("needs time and duration part") + } + + timeSpec := strings.Split(timeDurationParts[0], ":") + if len(timeSpec) != 2 && len(timeSpec) != 3 { + return perr("invalid time part") + } + + for i, v := range timeSpec { + nv, err := strconv.Atoi(v) + if err != nil { + return werr(err) + } + + switch i { + case 0: + if nv > 23 || nv < 0 { + return perr("invalid hours") + } + rt.Hour = uint8(nv) + case 1: + if nv > 59 || nv < 0 { + return perr("invalid minutes") + } + rt.Minute = uint8(nv) + case 2: + if nv > 59 || nv < 0 { + return perr("invalid seconds") + } + rt.Second = uint8(nv) + } + } + + dur, err := time.ParseDuration(timeDurationParts[1]) + if err != nil { + return werr(err) + } + + if dur < 0 { + return perr("duration must be positive") + } + + if dur > 24*time.Hour { + return perr("duration too long") + } + + rt.Duration = dur + + return nil +} + +// New creates a new RuleTime with the provided value. +func New(s string) (RuleTime, error) { + rt := RuleTime{} + + return rt, rt.Parse(s) +} diff --git a/internal/ruletime/ruletime_test.go b/internal/ruletime/ruletime_test.go new file mode 100644 index 0000000..bbaee07 --- /dev/null +++ b/internal/ruletime/ruletime_test.go @@ -0,0 +1,227 @@ +package ruletime_test + +import ( + "errors" + "testing" + "time" + + "dynatron.me/x/stillbox/internal/ruletime" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestParse(t *testing.T) { + tests := []struct { + name string + ruleTime string + compare ruletime.RuleTime + expectErr error + }{ + { + name: "base case", + ruleTime: "23:45:01+2h1m", + compare: ruletime.RuleTime{ + 23, 45, 1, (2 * time.Hour) + time.Minute, + }, + }, + { + name: "no seconds", + ruleTime: "23:45+2h1m", + compare: ruletime.RuleTime{ + 23, 45, 0, (2 * time.Hour) + time.Minute, + }, + }, + { + name: "bad hour", + ruleTime: "29:45+2h1m", + expectErr: errors.New("invalid hours"), + }, + { + name: "bad minute", + ruleTime: "22:70+2h1m", + expectErr: errors.New("invalid minutes"), + }, + { + name: "bad minute 2", + ruleTime: "22:-70+2h1m", + expectErr: errors.New("invalid minutes"), + }, + { + name: "bad hour 2", + ruleTime: "-20:34+2h1m", + expectErr: errors.New("invalid hours"), + }, + { + name: "bad seconds", + ruleTime: "20:34:94+2h1m", + expectErr: errors.New("invalid seconds"), + }, + { + name: "negative duration", + ruleTime: "20:34:33+-2h1m", + expectErr: errors.New("duration must be positive"), + }, + { + name: "bad duration", + ruleTime: "20:34:04+2j", + expectErr: errors.New("in duration"), + }, + { + name: "duration too long", + ruleTime: "20:34:04+25h", + expectErr: errors.New("duration too long"), + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + rt, err := ruletime.New(tc.ruleTime) + if tc.expectErr != nil { + require.Error(t, err) + assert.Contains(t, err.Error(), tc.expectErr.Error()) + } else { + require.NoError(t, err) + assert.Equal(t, tc.compare, rt) + } + }) + } +} + +func TestCovers(t *testing.T) { + tM := func(s string) func(*time.Location) time.Time { + return func(loc *time.Location) time.Time { + tm, err := time.ParseInLocation("2006-01-02 15:04:05", "2024-11-01 "+s, loc) + if err != nil { + panic(err) + } + + return tm + } + } + + tz := func(s string) *time.Location { + l, err := time.LoadLocation(s) + if err != nil { + panic(err) + } + + return l + } + + tests := []struct { + name string + timespec string + t func(*time.Location) time.Time + opts []ruletime.CoversOption + loc *time.Location + covers bool + }{ + { + name: "base", + timespec: "8:45+1h", + t: tM("9:00:00"), + covers: true, + }, + { + name: "base false", + timespec: "8:45+1h", + t: tM("9:47:00"), + covers: false, + }, + { + name: "wrong tz", + timespec: "8:45+1h", + t: tM("9:37:00"), + loc: tz("America/New_York"), + covers: false, + opts: []ruletime.CoversOption{ruletime.WithLocation(time.UTC)}, + }, + { + name: "wrong tz 2", + timespec: "8:45+1h", + t: tM("9:07:00"), + loc: tz("America/Chicago"), + covers: false, + opts: []ruletime.CoversOption{ruletime.WithLocation(tz("America/New_York"))}, + }, + { + name: "right tz", + timespec: "8:45+1h", + t: tM("9:17:00"), + loc: tz("America/New_York"), + covers: true, + opts: []ruletime.CoversOption{ruletime.WithLocation(tz("America/New_York"))}, + }, + { + name: "past midnight", + timespec: "23:45+1h", + t: tM("0:07:00"), + loc: tz("America/Chicago"), + covers: true, + opts: []ruletime.CoversOption{ruletime.WithLocation(tz("America/Chicago"))}, + }, + { + name: "not past midnight", + timespec: "15:00+10h", + t: tM("3:07:00"), + loc: tz("America/Chicago"), + covers: false, + opts: []ruletime.CoversOption{ruletime.WithLocation(tz("America/Chicago"))}, + }, + { + name: "not past midnight 2", + timespec: "15:00+10h", + t: tM("14:07:00"), + loc: tz("America/Chicago"), + covers: false, + opts: []ruletime.CoversOption{ruletime.WithLocation(tz("America/Chicago"))}, + }, + { + name: "not past midnight 3", + timespec: "15:00+10h", + t: tM("15:07:00"), + loc: tz("America/Chicago"), + covers: true, + opts: []ruletime.CoversOption{ruletime.WithLocation(tz("America/Chicago"))}, + }, + { + name: "normal", + timespec: "1:00+5h", + t: tM("17:07:00"), + covers: false, + }, + { + name: "normal", + timespec: "1:00+5h", + t: tM("3:07:00"), + covers: true, + }, + { + name: "24h duration", + timespec: "15:00+24h", + t: tM("3:07:00"), + loc: tz("America/Chicago"), + covers: true, + opts: []ruletime.CoversOption{ruletime.WithLocation(tz("America/Chicago"))}, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + rt, err := ruletime.New(tc.timespec) + require.NoError(t, err) + loc := time.Local + if tc.loc != nil { + loc = tc.loc + } + + c := rt.Covers(tc.t(loc), tc.opts...) + if tc.covers { + assert.True(t, c) + } else { + assert.False(t, c) + } + }) + } +} diff --git a/pkg/calls/alertconfig.go b/pkg/calls/alertconfig.go new file mode 100644 index 0000000..71fde53 --- /dev/null +++ b/pkg/calls/alertconfig.go @@ -0,0 +1,58 @@ +package calls + +import ( + "encoding/json" + "time" + + "dynatron.me/x/stillbox/internal/ruletime" + "dynatron.me/x/stillbox/internal/trending" +) + +type AlertConfig map[Talkgroup][]AlertRule + +type AlertRule struct { + Times []ruletime.RuleTime `json:"times"` + ScoreMultiplier float32 `json:"mult"` +} + +func (ac AlertConfig) AddAlertConfig(tg Talkgroup, confBytes []byte) error { + if len(confBytes) == 0 { + return nil + } + + var rules []AlertRule + err := json.Unmarshal(confBytes, &rules) + if err != nil { + return err + } + + ac[tg] = rules + return nil +} + +func (ac AlertConfig) ScaleScore(score trending.Score[Talkgroup], t time.Time, coversOpts ...ruletime.CoversOption) float64 { + s, has := ac[score.ID] + if !has { + return score.Score + } + + final := score.Score + + for _, ar := range s { + if ar.MatchTime(t, coversOpts...) { + final *= float64(ar.ScoreMultiplier) + } + } + + return final +} + +func (ar *AlertRule) MatchTime(t time.Time, coversOpts ...ruletime.CoversOption) bool { + for _, at := range ar.Times { + if at.Covers(t, coversOpts...) { + return true + } + } + + return false +} diff --git a/pkg/calls/alertconfig_test.go b/pkg/calls/alertconfig_test.go new file mode 100644 index 0000000..c971fcb --- /dev/null +++ b/pkg/calls/alertconfig_test.go @@ -0,0 +1,141 @@ +package calls_test + +import ( + "errors" + "math" + "testing" + "time" + + "dynatron.me/x/stillbox/internal/ruletime" + "dynatron.me/x/stillbox/internal/trending" + "dynatron.me/x/stillbox/pkg/calls" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestAlertConfig(t *testing.T) { + ac := make(calls.AlertConfig) + parseTests := []struct { + name string + tg calls.Talkgroup + conf string + compare []calls.AlertRule + expectErr error + }{ + { + name: "base case", + tg: calls.TG(197, 3), + conf: `[{"times":["7:00+2h","01:00+1h","16:00+1h","19:00+4h"],"mult":0.2},{"times":["11:00+1h","15:00+30m","16:03+20m"],"mult":2.0}]`, + compare: []calls.AlertRule{ + { + Times: []ruletime.RuleTime{ + ruletime.Must(ruletime.New("7:00+2h")), + ruletime.Must(ruletime.New("1:00+1h")), + ruletime.Must(ruletime.New("16:00+1h")), + ruletime.Must(ruletime.New("19:00+4h")), + }, + ScoreMultiplier: 0.2, + }, + { + Times: []ruletime.RuleTime{ + ruletime.Must(ruletime.New("11:00+1h")), + ruletime.Must(ruletime.New("15:00+30m")), + ruletime.Must(ruletime.New("16:03+20m")), + }, + ScoreMultiplier: 2.0, + }, + }, + }, + { + name: "bad spec", + tg: calls.TG(197, 3), + conf: `[{"times":["26:00+2h","01:00+1h","19:00+4h"],"mult":0.2},{"times":["11:00+1h","15:00+30m"],"mult":2.0}]`, + expectErr: errors.New("'26:00+2h': invalid hours"), + }, + } + + for _, tc := range parseTests { + t.Run(tc.name, func(t *testing.T) { + err := ac.AddAlertConfig(tc.tg, []byte(tc.conf)) + if tc.expectErr != nil { + require.Error(t, err) + assert.Contains(t, err.Error(), tc.expectErr.Error()) + } else { + assert.Equal(t, tc.compare, ac[tc.tg]) + } + }) + } + + tMust := func(s string) time.Time { + t, err := time.ParseInLocation("2006-01-02 15:04", "2024-11-02 "+s, time.Local) + if err != nil { + panic(err) + } + + return t + } + + evalTests := []struct { + name string + tg calls.Talkgroup + t time.Time + origScore float64 + expectScore float64 + }{ + { + name: "base eval", + tg: calls.TG(197, 3), + t: tMust("1:20"), + origScore: 3, + expectScore: 0.6, + }, + { + name: "base eval", + tg: calls.TG(197, 3), + t: tMust("23:03"), + origScore: 3, + expectScore: 3, + }, + { + name: "base eval", + tg: calls.TG(197, 3), + t: tMust("8:03"), + origScore: 1.0, + expectScore: 0.2, + }, + { + name: "base eval", + tg: calls.TG(197, 3), + t: tMust("15:15"), + origScore: 3.0, + expectScore: 6.0, + }, + { + name: "overlapping eval", + tg: calls.TG(197, 3), + t: tMust("16:10"), + origScore: 1.0, + expectScore: 0.4, + }, + } + + for _, tc := range evalTests { + t.Run(tc.name, func(t *testing.T) { + cs := trending.Score[calls.Talkgroup]{ + ID: tc.tg, + Score: tc.origScore, + } + assert.Equal(t, tc.expectScore, toFixed(ac.ScaleScore(cs, tc.t), 5)) + }) + } +} + +func round(num float64) int { + return int(num + math.Copysign(0.5, num)) +} + +func toFixed(num float64, precision int) float64 { + output := math.Pow(10, float64(precision)) + return float64(round(num*output)) / output +} diff --git a/pkg/calls/talkgroups.go b/pkg/calls/talkgroups.go index 28de88b..19a1702 100644 --- a/pkg/calls/talkgroups.go +++ b/pkg/calls/talkgroups.go @@ -2,8 +2,17 @@ package calls import ( "context" + "fmt" + "sync" + "time" "dynatron.me/x/stillbox/pkg/gordio/database" + + "dynatron.me/x/stillbox/internal/ruletime" + "dynatron.me/x/stillbox/internal/trending" + + "github.com/jackc/pgx/v5" + "github.com/rs/zerolog/log" ) type Talkgroup struct { @@ -27,6 +36,10 @@ func (t Talkgroup) Pack() int64 { return int64((int64(t.System) << 32) | int64(t.Talkgroup)) } +func (t Talkgroup) String() string { + return fmt.Sprintf("%d:%d", t.System, t.Talkgroup) +} + func PackedTGs(tg []Talkgroup) []int64 { s := make([]int64, len(tg)) @@ -38,41 +51,141 @@ func PackedTGs(tg []Talkgroup) []int64 { } type tgMap map[Talkgroup]database.GetTalkgroupWithLearnedByPackedIDsRow -type TalkgroupCache struct { + +type TalkgroupCache interface { + TG(ctx context.Context, tg Talkgroup) (database.GetTalkgroupWithLearnedByPackedIDsRow, bool) + SystemName(ctx context.Context, id int) (string, bool) + ScaleScore(score trending.Score[Talkgroup], t time.Time, coversOpts ...ruletime.CoversOption) float64 + Hint(ctx context.Context, tgs []Talkgroup) error + Load(ctx context.Context, tgs []int64) error + Invalidate() +} + +func (t *talkgroupCache) Invalidate() { + t.Lock() + defer t.Unlock() + clear(t.tgs) + clear(t.systems) + clear(t.AlertConfig) +} + +type talkgroupCache struct { + sync.RWMutex + AlertConfig tgs tgMap systems map[int32]string } -func NewTalkgroupCache(ctx context.Context, packedTgs []int64) (*TalkgroupCache, error) { - tgc := &TalkgroupCache{ - tgs: make(tgMap), - systems: make(map[int32]string), +func NewTalkgroupCache() TalkgroupCache { + tgc := &talkgroupCache{ + tgs: make(tgMap), + systems: make(map[int32]string), + AlertConfig: make(AlertConfig), } - return tgc, tgc.LoadTGs(ctx, packedTgs) + return tgc } -func (t *TalkgroupCache) LoadTGs(ctx context.Context, packedTgs []int64) error { - tgRecords, err := database.FromCtx(ctx).GetTalkgroupWithLearnedByPackedIDs(ctx, packedTgs) +func (t *talkgroupCache) Hint(ctx context.Context, tgs []Talkgroup) error { + t.RLock() + var toLoad []int64 + if len(t.tgs) > len(tgs)/2 { // TODO: instrument this + for _, tg := range tgs { + _, ok := t.tgs[tg] + if !ok { + toLoad = append(toLoad, tg.Pack()) + } + } + + } else { + toLoad = make([]int64, 0, len(tgs)) + for _, g := range tgs { + toLoad = append(toLoad, g.Pack()) + } + } + + if len(toLoad) > 0 { + t.RUnlock() + return t.Load(ctx, toLoad) + } + + t.RUnlock() + return nil +} + +func (t *talkgroupCache) add(rec database.GetTalkgroupWithLearnedByPackedIDsRow) error { + tg := TG(rec.SystemID, rec.Tgid) + t.tgs[tg] = rec + t.systems[rec.SystemID] = rec.SystemName + + return t.AlertConfig.AddAlertConfig(tg, rec.AlertConfig) +} + +func (t *talkgroupCache) Load(ctx context.Context, tgs []int64) error { + tgRecords, err := database.FromCtx(ctx).GetTalkgroupWithLearnedByPackedIDs(ctx, tgs) if err != nil { return err } + t.Lock() + defer t.Unlock() + for _, rec := range tgRecords { - t.tgs[TG(rec.SystemID, rec.Tgid)] = rec - t.systems[rec.SystemID] = rec.SystemName + err := t.add(rec) + + if err != nil { + log.Error().Err(err).Msg("add alert config fail") + } } return nil } -func (t *TalkgroupCache) TG(tg Talkgroup) (database.GetTalkgroupWithLearnedByPackedIDsRow, bool) { +func (t *talkgroupCache) TG(ctx context.Context, tg Talkgroup) (database.GetTalkgroupWithLearnedByPackedIDsRow, bool) { + t.RLock() rec, has := t.tgs[tg] + t.RUnlock() - return rec, has + if has { + return rec, has + } + + recs, err := database.FromCtx(ctx).GetTalkgroupWithLearnedByPackedIDs(ctx, []int64{tg.Pack()}) + switch err { + case nil: + case pgx.ErrNoRows: + return database.GetTalkgroupWithLearnedByPackedIDsRow{}, false + default: + log.Error().Err(err).Msg("TG() cache add db get") + return database.GetTalkgroupWithLearnedByPackedIDsRow{}, false + } + + if len(recs) < 1 { + return database.GetTalkgroupWithLearnedByPackedIDsRow{}, false + } + + t.Lock() + defer t.Unlock() + err = t.add(recs[0]) + if err != nil { + log.Error().Err(err).Msg("TG() cache add") + return recs[0], false + } + + return recs[0], true } -func (t *TalkgroupCache) SystemName(id int) (string, bool) { +func (t *talkgroupCache) SystemName(ctx context.Context, id int) (name string, has bool) { n, has := t.systems[int32(id)] + + if !has { + sys, err := database.FromCtx(ctx).GetSystemName(ctx, id) + if err != nil { + return "", false + } + + return sys, true + } + return n, has } diff --git a/pkg/gordio/alerting/alerting.go b/pkg/gordio/alerting/alerting.go index 181e014..89f3bb9 100644 --- a/pkg/gordio/alerting/alerting.go +++ b/pkg/gordio/alerting/alerting.go @@ -53,6 +53,7 @@ type alerter struct { alertCache map[cl.Talkgroup]Alert renotify time.Duration notifier notify.Notifier + tgCache cl.TalkgroupCache } type offsetClock time.Duration @@ -87,7 +88,7 @@ func WithNotifier(n notify.Notifier) AlertOption { } // New creates a new Alerter using the provided configuration. -func New(cfg config.Alerting, opts ...AlertOption) Alerter { +func New(cfg config.Alerting, tgCache cl.TalkgroupCache, opts ...AlertOption) Alerter { if !cfg.Enable { return &noopAlerter{} } @@ -97,6 +98,7 @@ func New(cfg config.Alerting, opts ...AlertOption) Alerter { alertCache: make(map[cl.Talkgroup]Alert), clock: timeseries.DefaultClock, renotify: DefaultRenotify, + tgCache: tgCache, } if cfg.Renotify != nil { @@ -107,7 +109,7 @@ func New(cfg config.Alerting, opts ...AlertOption) Alerter { opt(as) } - as.scorer = trending.NewScorer[cl.Talkgroup]( + as.scorer = trending.NewScorer( trending.WithTimeSeries(as.newTimeSeries), trending.WithStorageDuration[cl.Talkgroup](time.Hour*24*time.Duration(cfg.LookbackDays)), trending.WithRecentDuration[cl.Talkgroup](time.Duration(cfg.Recent)), @@ -154,29 +156,70 @@ const notificationTemplStr = `{{ range . -}} var notificationTemplate = template.Must(template.New("notification").Funcs(funcMap).Parse(notificationTemplStr)) +func (as *alerter) eval(ctx context.Context, now time.Time, testMode bool) ([]Alert, error) { + err := as.tgCache.Hint(ctx, as.scoredTGs()) + if err != nil { + return nil, fmt.Errorf("prime TG cache: %w", err) + } + + db := database.FromCtx(ctx) + + var notifications []Alert + for _, s := range as.scores { + origScore := s.Score + tgr, has := as.tgCache.TG(ctx, s.ID) + if has { + if !tgr.Alert { + continue + } + s.Score *= float64(tgr.Weight) + } + + if s.Score > as.cfg.AlertThreshold || testMode { + if old, inCache := as.alertCache[s.ID]; !inCache || now.Sub(old.Timestamp) > as.renotify { + s.Score = as.tgCache.ScaleScore(s, now) + a, err := as.makeAlert(ctx, s, origScore) + if err != nil { + return nil, fmt.Errorf("makeAlert: %w", err) + } + + if s.Score < as.cfg.AlertThreshold { + a.Suppressed = true + } + + as.alertCache[s.ID] = a + + if !testMode { + err = db.AddAlert(ctx, a.ToAddAlertParams()) + if err != nil { + return nil, fmt.Errorf("addAlert: %w", err) + } + } + + if !a.Suppressed { + notifications = append(notifications, a) + } + } + } + } + + return notifications, nil + +} + func (as *alerter) testNotifyHandler(w http.ResponseWriter, r *http.Request) { as.RLock() defer as.RUnlock() alerts := make([]Alert, 0, len(as.scores)) ctx := r.Context() - tgc, err := cl.NewTalkgroupCache(ctx, as.packedScoredTGs()) + alerts, err := as.eval(ctx, time.Now(), true) if err != nil { - log.Error().Err(err).Msg("test notificaiton tg cache") + log.Error().Err(err).Msg("test notification eval") http.Error(w, err.Error(), http.StatusInternalServerError) return } - for _, s := range as.scores { - a, err := makeAlert(tgc, s) - if err != nil { - log.Error().Err(err).Msg("test notificaiton") - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - alerts = append(alerts, a) - } - err = as.sendNotification(ctx, alerts) if err != nil { log.Error().Err(err).Msg("test notification send") @@ -187,14 +230,24 @@ func (as *alerter) testNotifyHandler(w http.ResponseWriter, r *http.Request) { _, _ = w.Write([]byte("Sent")) } -// packedScoredTGs gets a packed list of TG IDs for DB use. -func (as *alerter) packedScoredTGs() []int64 { - packedTGs := make([]int64, 0, len(as.scores)) +// scoredTGs gets a list of TGs. +func (as *alerter) scoredTGs() []cl.Talkgroup { + tgs := make([]cl.Talkgroup, 0, len(as.scores)) for _, s := range as.scores { - packedTGs = append(packedTGs, s.ID.Pack()) + tgs = append(tgs, s.ID) } - return packedTGs + return tgs +} + +// packedScoredTGs gets a list of packed TGIDs. +func (as *alerter) packedScoredTGs() []int64 { + tgs := make([]int64, 0, len(as.scores)) + for _, s := range as.scores { + tgs = append(tgs, s.ID.Pack()) + } + + return tgs } // notify iterates the scores and sends out any necessary notifications @@ -203,46 +256,14 @@ func (as *alerter) notify(ctx context.Context) error { return nil } - now := time.Now() - as.Lock() defer as.Unlock() - tgc, err := cl.NewTalkgroupCache(ctx, as.packedScoredTGs()) + notifications, err := as.eval(ctx, time.Now(), false) if err != nil { return err } - db := database.FromCtx(ctx) - - var notifications []Alert - for _, s := range as.scores { - tgr, has := tgc.TG(s.ID) - if has { - if !tgr.Alert { - continue - } - s.Score *= float64(tgr.Weight) - } - if s.Score > as.cfg.AlertThreshold { - if old, inCache := as.alertCache[s.ID]; !inCache || now.Sub(old.Timestamp) > as.renotify { - a, err := makeAlert(tgc, s) - if err != nil { - return err - } - - as.alertCache[s.ID] = a - - err = db.AddAlert(ctx, a.ToAddAlertParams()) - if err != nil { - return err - } - - notifications = append(notifications, a) - } - } - } - if len(notifications) > 0 { return as.sendNotification(ctx, notifications) } @@ -251,21 +272,32 @@ func (as *alerter) notify(ctx context.Context) error { } type Alert struct { - ID uuid.UUID - Timestamp time.Time - TGName string - Score trending.Score[cl.Talkgroup] - Weight float32 + ID uuid.UUID + Timestamp time.Time + TGName string + Score trending.Score[cl.Talkgroup] + OrigScore float64 + Weight float32 + Suppressed bool } func (a *Alert) ToAddAlertParams() database.AddAlertParams { f32score := float32(a.Score.Score) + f32origscore := float32(a.OrigScore) + + var origScore *float32 + if a.Score.Score != a.OrigScore { + origScore = &f32origscore + } + return database.AddAlertParams{ - ID: a.ID, - Time: pgtype.Timestamptz{Time: a.Timestamp, Valid: true}, - PackedTg: a.Score.ID.Pack(), - Weight: &a.Weight, - Score: &f32score, + ID: a.ID, + Time: pgtype.Timestamptz{Time: a.Timestamp, Valid: true}, + PackedTg: a.Score.ID.Pack(), + Weight: &a.Weight, + Score: &f32score, + OrigScore: origScore, + Notified: !a.Suppressed, } } @@ -285,15 +317,16 @@ func (as *alerter) sendNotification(ctx context.Context, n []Alert) error { // makeAlert creates a notification for later rendering by the template. // It takes a talkgroup Score as input. -func makeAlert(tgs *cl.TalkgroupCache, score trending.Score[cl.Talkgroup]) (Alert, error) { +func (as *alerter) makeAlert(ctx context.Context, score trending.Score[cl.Talkgroup], origScore float64) (Alert, error) { d := Alert{ ID: uuid.New(), Score: score, Timestamp: time.Now(), Weight: 1.0, + OrigScore: origScore, } - tgRecord, has := tgs.TG(score.ID) + tgRecord, has := as.tgCache.TG(ctx, score.ID) switch has { case true: d.Weight = tgRecord.Weight @@ -307,7 +340,7 @@ func makeAlert(tgs *cl.TalkgroupCache, score trending.Score[cl.Talkgroup]) (Aler d.TGName = fmt.Sprintf("%s:%d", tgRecord.SystemName, int(score.ID.Talkgroup)) } case false: - system, has := tgs.SystemName(int(score.ID.System)) + system, has := as.tgCache.SystemName(ctx, int(score.ID.System)) if has { d.TGName = fmt.Sprintf("%s:%d", system, int(score.ID.Talkgroup)) } else { diff --git a/pkg/gordio/alerting/simulate.go b/pkg/gordio/alerting/simulate.go index 8f1f4a9..6863615 100644 --- a/pkg/gordio/alerting/simulate.go +++ b/pkg/gordio/alerting/simulate.go @@ -58,10 +58,12 @@ func (s *Simulation) stepClock(t time.Time) { } // Simulate begins the simulation using the DB handle from ctx. It returns final scores. -func (s *Simulation) Simulate(ctx context.Context) trending.Scores[cl.Talkgroup] { +func (s *Simulation) Simulate(ctx context.Context) (trending.Scores[cl.Talkgroup], error) { now := time.Now() + tgc := cl.NewTalkgroupCache() + s.Enable = true - s.alerter = New(s.Alerting, WithClock(&s.clock)).(*alerter) + s.alerter = New(s.Alerting, tgc, WithClock(&s.clock)).(*alerter) if time.Time(s.ScoreEnd).IsZero() { s.ScoreEnd = jsontime.Time(now) } @@ -79,7 +81,7 @@ func (s *Simulation) Simulate(ctx context.Context) trending.Scores[cl.Talkgroup] // backfill from lookback start until score start _, err := s.backfill(ctx, sinceLookback, time.Time(s.ScoreStart)) if err != nil { - log.Error().Err(err).Msg("simulate backfill") + return nil, fmt.Errorf("simulate backfill: %w", err) } // initial score @@ -99,13 +101,13 @@ func (s *Simulation) Simulate(ctx context.Context) trending.Scores[cl.Talkgroup] // backfill from scorestart until now. sim is enabled, so scoring will be done by stepClock() _, err = s.backfill(ctx, time.Time(s.ScoreStart), scoreEnd) if err != nil { - log.Error().Err(err).Msg("simulate backfill final") + return nil, fmt.Errorf("simulate backfill final: %w", err) } s.lastScore = scoreEnd sort.Sort(s.scores) - return s.scores + return s.scores, nil } // simulateHandler is the POST endpoint handler. @@ -136,6 +138,11 @@ func (as *alerter) simulateHandler(w http.ResponseWriter, r *http.Request) { return } - s.Simulate(ctx) + _, err = s.Simulate(ctx) + if err != nil { + err = fmt.Errorf("simulate: %w", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } s.tgStatsHandler(w, r) } diff --git a/pkg/gordio/database/calls.sql.go b/pkg/gordio/database/calls.sql.go index 3a40a81..73f3f71 100644 --- a/pkg/gordio/database/calls.sql.go +++ b/pkg/gordio/database/calls.sql.go @@ -13,7 +13,7 @@ import ( ) const addAlert = `-- name: AddAlert :exec -INSERT INTO alerts (id, time, talkgroup, weight, score, metadata) +INSERT INTO alerts (id, time, talkgroup, weight, score, orig_score, notified, metadata) VALUES ( $1, @@ -21,17 +21,21 @@ VALUES $3, $4, $5, - $6 + $6, + $7, + $8 ) ` type AddAlertParams struct { - ID uuid.UUID `json:"id"` - Time pgtype.Timestamptz `json:"time"` - PackedTg int64 `json:"packed_tg"` - Weight *float32 `json:"weight"` - Score *float32 `json:"score"` - Metadata []byte `json:"metadata"` + ID uuid.UUID `json:"id"` + Time pgtype.Timestamptz `json:"time"` + PackedTg int64 `json:"packed_tg"` + Weight *float32 `json:"weight"` + Score *float32 `json:"score"` + OrigScore *float32 `json:"orig_score"` + Notified bool `json:"notified"` + Metadata []byte `json:"metadata"` } func (q *Queries) AddAlert(ctx context.Context, arg AddAlertParams) error { @@ -41,6 +45,8 @@ func (q *Queries) AddAlert(ctx context.Context, arg AddAlertParams) error { arg.PackedTg, arg.Weight, arg.Score, + arg.OrigScore, + arg.Notified, arg.Metadata, ) return err diff --git a/pkg/gordio/database/models.go b/pkg/gordio/database/models.go index b035696..7a2b67d 100644 --- a/pkg/gordio/database/models.go +++ b/pkg/gordio/database/models.go @@ -19,6 +19,8 @@ type Alert struct { Tgid int32 `json:"tgid"` Weight *float32 `json:"weight"` Score *float32 `json:"score"` + OrigScore *float32 `json:"orig_score"` + Notified bool `json:"notified"` Metadata []byte `json:"metadata"` } diff --git a/pkg/gordio/server/server.go b/pkg/gordio/server/server.go index 4f54909..92efbcc 100644 --- a/pkg/gordio/server/server.go +++ b/pkg/gordio/server/server.go @@ -6,6 +6,7 @@ import ( "os" "time" + "dynatron.me/x/stillbox/pkg/calls" "dynatron.me/x/stillbox/pkg/gordio/alerting" "dynatron.me/x/stillbox/pkg/gordio/auth" "dynatron.me/x/stillbox/pkg/gordio/config" @@ -34,6 +35,7 @@ type Server struct { alerter alerting.Alerter notifier notify.Notifier hup chan os.Signal + tgCache calls.TalkgroupCache } func New(ctx context.Context, cfg *config.Config) (*Server, error) { @@ -56,6 +58,8 @@ func New(ctx context.Context, cfg *config.Config) (*Server, error) { return nil, err } + tgCache := calls.NewTalkgroupCache() + srv := &Server{ auth: authenticator, conf: cfg, @@ -63,8 +67,9 @@ func New(ctx context.Context, cfg *config.Config) (*Server, error) { r: r, nex: nexus.New(), logger: logger, - alerter: alerting.New(cfg.Alerting, alerting.WithNotifier(notifier)), + alerter: alerting.New(cfg.Alerting, tgCache, alerting.WithNotifier(notifier)), notifier: notifier, + tgCache: tgCache, } srv.sinks.Register("database", sinks.NewDatabaseSink(srv.db), true) diff --git a/sql/postgres/migrations/001_initial.up.sql b/sql/postgres/migrations/001_initial.up.sql index 94c62f6..b9159e6 100644 --- a/sql/postgres/migrations/001_initial.up.sql +++ b/sql/postgres/migrations/001_initial.up.sql @@ -79,6 +79,8 @@ CREATE TABLE IF NOT EXISTS alerts( tgid INT4 NOT NULL GENERATED ALWAYS AS (talkgroup & x'ffffffff'::BIGINT) STORED, weight REAL, score REAL, + orig_score REAL, + notified BOOLEAN NOT NULL DEFAULT 'false', metadata JSONB ); diff --git a/sql/postgres/queries/calls.sql b/sql/postgres/queries/calls.sql index 75998d0..9f4ff97 100644 --- a/sql/postgres/queries/calls.sql +++ b/sql/postgres/queries/calls.sql @@ -24,7 +24,7 @@ RETURNING id; UPDATE calls SET transcript = $2 WHERE id = $1; -- name: AddAlert :exec -INSERT INTO alerts (id, time, talkgroup, weight, score, metadata) +INSERT INTO alerts (id, time, talkgroup, weight, score, orig_score, notified, metadata) VALUES ( sqlc.arg(id), @@ -32,6 +32,8 @@ VALUES sqlc.arg(packed_tg), sqlc.arg(weight), sqlc.arg(score), + sqlc.arg(orig_score), + sqlc.arg(notified), sqlc.arg(metadata) );