diff --git a/pkg/calls/alertconfig.go b/pkg/calls/alertconfig.go new file mode 100644 index 0000000..71fde53 --- /dev/null +++ b/pkg/calls/alertconfig.go @@ -0,0 +1,58 @@ +package calls + +import ( + "encoding/json" + "time" + + "dynatron.me/x/stillbox/internal/ruletime" + "dynatron.me/x/stillbox/internal/trending" +) + +type AlertConfig map[Talkgroup][]AlertRule + +type AlertRule struct { + Times []ruletime.RuleTime `json:"times"` + ScoreMultiplier float32 `json:"mult"` +} + +func (ac AlertConfig) AddAlertConfig(tg Talkgroup, confBytes []byte) error { + if len(confBytes) == 0 { + return nil + } + + var rules []AlertRule + err := json.Unmarshal(confBytes, &rules) + if err != nil { + return err + } + + ac[tg] = rules + return nil +} + +func (ac AlertConfig) ScaleScore(score trending.Score[Talkgroup], t time.Time, coversOpts ...ruletime.CoversOption) float64 { + s, has := ac[score.ID] + if !has { + return score.Score + } + + final := score.Score + + for _, ar := range s { + if ar.MatchTime(t, coversOpts...) { + final *= float64(ar.ScoreMultiplier) + } + } + + return final +} + +func (ar *AlertRule) MatchTime(t time.Time, coversOpts ...ruletime.CoversOption) bool { + for _, at := range ar.Times { + if at.Covers(t, coversOpts...) { + return true + } + } + + return false +} diff --git a/pkg/calls/alertconfig_test.go b/pkg/calls/alertconfig_test.go new file mode 100644 index 0000000..c971fcb --- /dev/null +++ b/pkg/calls/alertconfig_test.go @@ -0,0 +1,141 @@ +package calls_test + +import ( + "errors" + "math" + "testing" + "time" + + "dynatron.me/x/stillbox/internal/ruletime" + "dynatron.me/x/stillbox/internal/trending" + "dynatron.me/x/stillbox/pkg/calls" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestAlertConfig(t *testing.T) { + ac := make(calls.AlertConfig) + parseTests := []struct { + name string + tg calls.Talkgroup + conf string + compare []calls.AlertRule + expectErr error + }{ + { + name: "base case", + tg: calls.TG(197, 3), + conf: `[{"times":["7:00+2h","01:00+1h","16:00+1h","19:00+4h"],"mult":0.2},{"times":["11:00+1h","15:00+30m","16:03+20m"],"mult":2.0}]`, + compare: []calls.AlertRule{ + { + Times: []ruletime.RuleTime{ + ruletime.Must(ruletime.New("7:00+2h")), + ruletime.Must(ruletime.New("1:00+1h")), + ruletime.Must(ruletime.New("16:00+1h")), + ruletime.Must(ruletime.New("19:00+4h")), + }, + ScoreMultiplier: 0.2, + }, + { + Times: []ruletime.RuleTime{ + ruletime.Must(ruletime.New("11:00+1h")), + ruletime.Must(ruletime.New("15:00+30m")), + ruletime.Must(ruletime.New("16:03+20m")), + }, + ScoreMultiplier: 2.0, + }, + }, + }, + { + name: "bad spec", + tg: calls.TG(197, 3), + conf: `[{"times":["26:00+2h","01:00+1h","19:00+4h"],"mult":0.2},{"times":["11:00+1h","15:00+30m"],"mult":2.0}]`, + expectErr: errors.New("'26:00+2h': invalid hours"), + }, + } + + for _, tc := range parseTests { + t.Run(tc.name, func(t *testing.T) { + err := ac.AddAlertConfig(tc.tg, []byte(tc.conf)) + if tc.expectErr != nil { + require.Error(t, err) + assert.Contains(t, err.Error(), tc.expectErr.Error()) + } else { + assert.Equal(t, tc.compare, ac[tc.tg]) + } + }) + } + + tMust := func(s string) time.Time { + t, err := time.ParseInLocation("2006-01-02 15:04", "2024-11-02 "+s, time.Local) + if err != nil { + panic(err) + } + + return t + } + + evalTests := []struct { + name string + tg calls.Talkgroup + t time.Time + origScore float64 + expectScore float64 + }{ + { + name: "base eval", + tg: calls.TG(197, 3), + t: tMust("1:20"), + origScore: 3, + expectScore: 0.6, + }, + { + name: "base eval", + tg: calls.TG(197, 3), + t: tMust("23:03"), + origScore: 3, + expectScore: 3, + }, + { + name: "base eval", + tg: calls.TG(197, 3), + t: tMust("8:03"), + origScore: 1.0, + expectScore: 0.2, + }, + { + name: "base eval", + tg: calls.TG(197, 3), + t: tMust("15:15"), + origScore: 3.0, + expectScore: 6.0, + }, + { + name: "overlapping eval", + tg: calls.TG(197, 3), + t: tMust("16:10"), + origScore: 1.0, + expectScore: 0.4, + }, + } + + for _, tc := range evalTests { + t.Run(tc.name, func(t *testing.T) { + cs := trending.Score[calls.Talkgroup]{ + ID: tc.tg, + Score: tc.origScore, + } + assert.Equal(t, tc.expectScore, toFixed(ac.ScaleScore(cs, tc.t), 5)) + }) + } +} + +func round(num float64) int { + return int(num + math.Copysign(0.5, num)) +} + +func toFixed(num float64, precision int) float64 { + output := math.Pow(10, float64(precision)) + return float64(round(num*output)) / output +} diff --git a/pkg/calls/talkgroups.go b/pkg/calls/talkgroups.go index 193280a..19a1702 100644 --- a/pkg/calls/talkgroups.go +++ b/pkg/calls/talkgroups.go @@ -3,9 +3,15 @@ package calls import ( "context" "fmt" + "sync" + "time" "dynatron.me/x/stillbox/pkg/gordio/database" + "dynatron.me/x/stillbox/internal/ruletime" + "dynatron.me/x/stillbox/internal/trending" + + "github.com/jackc/pgx/v5" "github.com/rs/zerolog/log" ) @@ -45,34 +51,88 @@ func PackedTGs(tg []Talkgroup) []int64 { } type tgMap map[Talkgroup]database.GetTalkgroupWithLearnedByPackedIDsRow -type TalkgroupCache struct { + +type TalkgroupCache interface { + TG(ctx context.Context, tg Talkgroup) (database.GetTalkgroupWithLearnedByPackedIDsRow, bool) + SystemName(ctx context.Context, id int) (string, bool) + ScaleScore(score trending.Score[Talkgroup], t time.Time, coversOpts ...ruletime.CoversOption) float64 + Hint(ctx context.Context, tgs []Talkgroup) error + Load(ctx context.Context, tgs []int64) error + Invalidate() +} + +func (t *talkgroupCache) Invalidate() { + t.Lock() + defer t.Unlock() + clear(t.tgs) + clear(t.systems) + clear(t.AlertConfig) +} + +type talkgroupCache struct { + sync.RWMutex AlertConfig tgs tgMap systems map[int32]string } -func NewTalkgroupCache(ctx context.Context, packedTgs []int64) (*TalkgroupCache, error) { - tgc := &TalkgroupCache{ +func NewTalkgroupCache() TalkgroupCache { + tgc := &talkgroupCache{ tgs: make(tgMap), systems: make(map[int32]string), AlertConfig: make(AlertConfig), } - return tgc, tgc.LoadTGs(ctx, packedTgs) + return tgc } -func (t *TalkgroupCache) LoadTGs(ctx context.Context, packedTgs []int64) error { - tgRecords, err := database.FromCtx(ctx).GetTalkgroupWithLearnedByPackedIDs(ctx, packedTgs) +func (t *talkgroupCache) Hint(ctx context.Context, tgs []Talkgroup) error { + t.RLock() + var toLoad []int64 + if len(t.tgs) > len(tgs)/2 { // TODO: instrument this + for _, tg := range tgs { + _, ok := t.tgs[tg] + if !ok { + toLoad = append(toLoad, tg.Pack()) + } + } + + } else { + toLoad = make([]int64, 0, len(tgs)) + for _, g := range tgs { + toLoad = append(toLoad, g.Pack()) + } + } + + if len(toLoad) > 0 { + t.RUnlock() + return t.Load(ctx, toLoad) + } + + t.RUnlock() + return nil +} + +func (t *talkgroupCache) add(rec database.GetTalkgroupWithLearnedByPackedIDsRow) error { + tg := TG(rec.SystemID, rec.Tgid) + t.tgs[tg] = rec + t.systems[rec.SystemID] = rec.SystemName + + return t.AlertConfig.AddAlertConfig(tg, rec.AlertConfig) +} + +func (t *talkgroupCache) Load(ctx context.Context, tgs []int64) error { + tgRecords, err := database.FromCtx(ctx).GetTalkgroupWithLearnedByPackedIDs(ctx, tgs) if err != nil { return err } - for _, rec := range tgRecords { - tg := TG(rec.SystemID, rec.Tgid) - t.tgs[tg] = rec - t.systems[rec.SystemID] = rec.SystemName + t.Lock() + defer t.Unlock() + + for _, rec := range tgRecords { + err := t.add(rec) - err := t.AlertConfig.AddAlertConfig(tg, rec.AlertConfig) if err != nil { log.Error().Err(err).Msg("add alert config fail") } @@ -81,13 +141,51 @@ func (t *TalkgroupCache) LoadTGs(ctx context.Context, packedTgs []int64) error { return nil } -func (t *TalkgroupCache) TG(tg Talkgroup) (database.GetTalkgroupWithLearnedByPackedIDsRow, bool) { +func (t *talkgroupCache) TG(ctx context.Context, tg Talkgroup) (database.GetTalkgroupWithLearnedByPackedIDsRow, bool) { + t.RLock() rec, has := t.tgs[tg] + t.RUnlock() - return rec, has + if has { + return rec, has + } + + recs, err := database.FromCtx(ctx).GetTalkgroupWithLearnedByPackedIDs(ctx, []int64{tg.Pack()}) + switch err { + case nil: + case pgx.ErrNoRows: + return database.GetTalkgroupWithLearnedByPackedIDsRow{}, false + default: + log.Error().Err(err).Msg("TG() cache add db get") + return database.GetTalkgroupWithLearnedByPackedIDsRow{}, false + } + + if len(recs) < 1 { + return database.GetTalkgroupWithLearnedByPackedIDsRow{}, false + } + + t.Lock() + defer t.Unlock() + err = t.add(recs[0]) + if err != nil { + log.Error().Err(err).Msg("TG() cache add") + return recs[0], false + } + + return recs[0], true } -func (t *TalkgroupCache) SystemName(id int) (string, bool) { +func (t *talkgroupCache) SystemName(ctx context.Context, id int) (name string, has bool) { n, has := t.systems[int32(id)] + + if !has { + sys, err := database.FromCtx(ctx).GetSystemName(ctx, id) + if err != nil { + return "", false + } + + return sys, true + } + return n, has } diff --git a/pkg/gordio/alerting/alerting.go b/pkg/gordio/alerting/alerting.go index 095d560..8e4ae56 100644 --- a/pkg/gordio/alerting/alerting.go +++ b/pkg/gordio/alerting/alerting.go @@ -53,6 +53,7 @@ type alerter struct { alertCache map[cl.Talkgroup]Alert renotify time.Duration notifier notify.Notifier + tgCache cl.TalkgroupCache } type offsetClock time.Duration @@ -87,7 +88,7 @@ func WithNotifier(n notify.Notifier) AlertOption { } // New creates a new Alerter using the provided configuration. -func New(cfg config.Alerting, opts ...AlertOption) Alerter { +func New(cfg config.Alerting, tgCache cl.TalkgroupCache, opts ...AlertOption) Alerter { if !cfg.Enable { return &noopAlerter{} } @@ -97,6 +98,7 @@ func New(cfg config.Alerting, opts ...AlertOption) Alerter { alertCache: make(map[cl.Talkgroup]Alert), clock: timeseries.DefaultClock, renotify: DefaultRenotify, + tgCache: tgCache, } if cfg.Renotify != nil { @@ -154,36 +156,36 @@ const notificationTemplStr = `{{ range . -}} var notificationTemplate = template.Must(template.New("notification").Funcs(funcMap).Parse(notificationTemplStr)) -func (as *alerter) eval(ctx context.Context, now time.Time, add bool) ([]Alert, error) { - tgc, err := cl.NewTalkgroupCache(ctx, as.packedScoredTGs()) +func (as *alerter) eval(ctx context.Context, now time.Time, testMode bool) ([]Alert, error) { + err := as.tgCache.Hint(ctx, as.scoredTGs()) if err != nil { - return nil, fmt.Errorf("new TG cache: %w", err) + return nil, fmt.Errorf("prime TG cache: %w", err) } db := database.FromCtx(ctx) var notifications []Alert for _, s := range as.scores { - tgr, has := tgc.TG(s.ID) + origScore := s.Score + tgr, has := as.tgCache.TG(ctx, s.ID) if has { if !tgr.Alert { continue } s.Score *= float64(tgr.Weight) + s.Score = as.tgCache.ScaleScore(s, now) } - origScore := s.Score - s.Score = tgc.ScaleScore(s, now) - if s.Score > as.cfg.AlertThreshold { + if s.Score > as.cfg.AlertThreshold || testMode { if old, inCache := as.alertCache[s.ID]; !inCache || now.Sub(old.Timestamp) > as.renotify { - a, err := makeAlert(tgc, s, origScore) + a, err := as.makeAlert(ctx, s, origScore) if err != nil { return nil, fmt.Errorf("makeAlert: %w", err) } as.alertCache[s.ID] = a - if add { + if !testMode { err = db.AddAlert(ctx, a.ToAddAlertParams()) if err != nil { return nil, fmt.Errorf("addAlert: %w", err) @@ -195,7 +197,7 @@ func (as *alerter) eval(ctx context.Context, now time.Time, add bool) ([]Alert, } } - return notifications, nil + return notifications, nil } @@ -205,14 +207,13 @@ func (as *alerter) testNotifyHandler(w http.ResponseWriter, r *http.Request) { alerts := make([]Alert, 0, len(as.scores)) ctx := r.Context() - alerts, err := as.eval(ctx, time.Now(), false) + alerts, err := as.eval(ctx, time.Now(), true) if err != nil { - log.Error().Err(err).Msg("test notification send") + log.Error().Err(err).Msg("test notification eval") http.Error(w, err.Error(), http.StatusInternalServerError) return } - err = as.sendNotification(ctx, alerts) if err != nil { log.Error().Err(err).Msg("test notification send") @@ -223,14 +224,24 @@ func (as *alerter) testNotifyHandler(w http.ResponseWriter, r *http.Request) { _, _ = w.Write([]byte("Sent")) } -// packedScoredTGs gets a packed list of TG IDs for DB use. -func (as *alerter) packedScoredTGs() []int64 { - packedTGs := make([]int64, 0, len(as.scores)) +// scoredTGs gets a list of TGs. +func (as *alerter) scoredTGs() []cl.Talkgroup { + tgs := make([]cl.Talkgroup, 0, len(as.scores)) for _, s := range as.scores { - packedTGs = append(packedTGs, s.ID.Pack()) + tgs = append(tgs, s.ID) } - return packedTGs + return tgs +} + +// packedScoredTGs gets a list of packed TGIDs. +func (as *alerter) packedScoredTGs() []int64 { + tgs := make([]int64, 0, len(as.scores)) + for _, s := range as.scores { + tgs = append(tgs, s.ID.Pack()) + } + + return tgs } // notify iterates the scores and sends out any necessary notifications @@ -242,7 +253,7 @@ func (as *alerter) notify(ctx context.Context) error { as.Lock() defer as.Unlock() - notifications, err := as.eval(ctx, time.Now(), true) + notifications, err := as.eval(ctx, time.Now(), false) if err != nil { return err } @@ -267,11 +278,11 @@ func (a *Alert) ToAddAlertParams() database.AddAlertParams { f32score := float32(a.Score.Score) f32origscore := float32(a.OrigScore) return database.AddAlertParams{ - ID: a.ID, - Time: pgtype.Timestamptz{Time: a.Timestamp, Valid: true}, - PackedTg: a.Score.ID.Pack(), - Weight: &a.Weight, - Score: &f32score, + ID: a.ID, + Time: pgtype.Timestamptz{Time: a.Timestamp, Valid: true}, + PackedTg: a.Score.ID.Pack(), + Weight: &a.Weight, + Score: &f32score, OrigScore: &f32origscore, } } @@ -292,7 +303,7 @@ func (as *alerter) sendNotification(ctx context.Context, n []Alert) error { // makeAlert creates a notification for later rendering by the template. // It takes a talkgroup Score as input. -func makeAlert(tgs *cl.TalkgroupCache, score trending.Score[cl.Talkgroup], origScore float64) (Alert, error) { +func (as *alerter) makeAlert(ctx context.Context, score trending.Score[cl.Talkgroup], origScore float64) (Alert, error) { d := Alert{ ID: uuid.New(), Score: score, @@ -301,7 +312,7 @@ func makeAlert(tgs *cl.TalkgroupCache, score trending.Score[cl.Talkgroup], origS OrigScore: origScore, } - tgRecord, has := tgs.TG(score.ID) + tgRecord, has := as.tgCache.TG(ctx, score.ID) switch has { case true: d.Weight = tgRecord.Weight @@ -315,7 +326,7 @@ func makeAlert(tgs *cl.TalkgroupCache, score trending.Score[cl.Talkgroup], origS d.TGName = fmt.Sprintf("%s:%d", tgRecord.SystemName, int(score.ID.Talkgroup)) } case false: - system, has := tgs.SystemName(int(score.ID.System)) + system, has := as.tgCache.SystemName(ctx, int(score.ID.System)) if has { d.TGName = fmt.Sprintf("%s:%d", system, int(score.ID.Talkgroup)) } else { diff --git a/pkg/gordio/alerting/simulate.go b/pkg/gordio/alerting/simulate.go index 8f1f4a9..6863615 100644 --- a/pkg/gordio/alerting/simulate.go +++ b/pkg/gordio/alerting/simulate.go @@ -58,10 +58,12 @@ func (s *Simulation) stepClock(t time.Time) { } // Simulate begins the simulation using the DB handle from ctx. It returns final scores. -func (s *Simulation) Simulate(ctx context.Context) trending.Scores[cl.Talkgroup] { +func (s *Simulation) Simulate(ctx context.Context) (trending.Scores[cl.Talkgroup], error) { now := time.Now() + tgc := cl.NewTalkgroupCache() + s.Enable = true - s.alerter = New(s.Alerting, WithClock(&s.clock)).(*alerter) + s.alerter = New(s.Alerting, tgc, WithClock(&s.clock)).(*alerter) if time.Time(s.ScoreEnd).IsZero() { s.ScoreEnd = jsontime.Time(now) } @@ -79,7 +81,7 @@ func (s *Simulation) Simulate(ctx context.Context) trending.Scores[cl.Talkgroup] // backfill from lookback start until score start _, err := s.backfill(ctx, sinceLookback, time.Time(s.ScoreStart)) if err != nil { - log.Error().Err(err).Msg("simulate backfill") + return nil, fmt.Errorf("simulate backfill: %w", err) } // initial score @@ -99,13 +101,13 @@ func (s *Simulation) Simulate(ctx context.Context) trending.Scores[cl.Talkgroup] // backfill from scorestart until now. sim is enabled, so scoring will be done by stepClock() _, err = s.backfill(ctx, time.Time(s.ScoreStart), scoreEnd) if err != nil { - log.Error().Err(err).Msg("simulate backfill final") + return nil, fmt.Errorf("simulate backfill final: %w", err) } s.lastScore = scoreEnd sort.Sort(s.scores) - return s.scores + return s.scores, nil } // simulateHandler is the POST endpoint handler. @@ -136,6 +138,11 @@ func (as *alerter) simulateHandler(w http.ResponseWriter, r *http.Request) { return } - s.Simulate(ctx) + _, err = s.Simulate(ctx) + if err != nil { + err = fmt.Errorf("simulate: %w", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } s.tgStatsHandler(w, r) } diff --git a/pkg/gordio/server/server.go b/pkg/gordio/server/server.go index 4f54909..92efbcc 100644 --- a/pkg/gordio/server/server.go +++ b/pkg/gordio/server/server.go @@ -6,6 +6,7 @@ import ( "os" "time" + "dynatron.me/x/stillbox/pkg/calls" "dynatron.me/x/stillbox/pkg/gordio/alerting" "dynatron.me/x/stillbox/pkg/gordio/auth" "dynatron.me/x/stillbox/pkg/gordio/config" @@ -34,6 +35,7 @@ type Server struct { alerter alerting.Alerter notifier notify.Notifier hup chan os.Signal + tgCache calls.TalkgroupCache } func New(ctx context.Context, cfg *config.Config) (*Server, error) { @@ -56,6 +58,8 @@ func New(ctx context.Context, cfg *config.Config) (*Server, error) { return nil, err } + tgCache := calls.NewTalkgroupCache() + srv := &Server{ auth: authenticator, conf: cfg, @@ -63,8 +67,9 @@ func New(ctx context.Context, cfg *config.Config) (*Server, error) { r: r, nex: nexus.New(), logger: logger, - alerter: alerting.New(cfg.Alerting, alerting.WithNotifier(notifier)), + alerter: alerting.New(cfg.Alerting, tgCache, alerting.WithNotifier(notifier)), notifier: notifier, + tgCache: tgCache, } srv.sinks.Register("database", sinks.NewDatabaseSink(srv.db), true)