From 90d61aed735b3e00ff2e74eb257741cd3c33853c Mon Sep 17 00:00:00 2001 From: Daniel Ponte Date: Sun, 3 Nov 2024 19:11:45 -0500 Subject: [PATCH] Scale scores during score operation. Closes #24. --- internal/trending/item.go | 5 +++-- internal/trending/trending.go | 25 +++++++++++++++++++++++-- pkg/alerting/alerting.go | 15 ++++++++------- pkg/alerting/simulate.go | 6 +++--- pkg/alerting/stats.html | 4 ++-- pkg/server/signals.go | 1 + pkg/talkgroups/alertconfig.go | 9 ++++----- pkg/talkgroups/alertconfig_test.go | 2 +- pkg/talkgroups/cache.go | 26 +++++++++++++++++++++++++- 9 files changed, 70 insertions(+), 23 deletions(-) diff --git a/internal/trending/item.go b/internal/trending/item.go index 058274e..6d2417e 100644 --- a/internal/trending/item.go +++ b/internal/trending/item.go @@ -1,6 +1,7 @@ package trending import ( + "context" "math" "time" ) @@ -31,7 +32,7 @@ func newItem[K comparable](id K, options *options[K]) *item[K] { } } -func (i *item[K]) score() Score[K] { +func (i *item[K]) score(ctx context.Context, id K) Score[K] { recentCount, count := i.computeCounts() if recentCount < i.options.countThreshold { return Score[K]{} @@ -58,7 +59,7 @@ func (i *item[K]) score() Score[K] { } i.decayMax() - mixedScore := 5 * (klScore + i.max) + mixedScore := 5 * (klScore + i.max) * i.options.weigher.Weight(ctx, id, i.options.clock.Now()) return Score[K]{ Score: mixedScore, diff --git a/internal/trending/trending.go b/internal/trending/trending.go index db76f2b..c7ae56c 100644 --- a/internal/trending/trending.go +++ b/internal/trending/trending.go @@ -1,6 +1,7 @@ package trending import ( + "context" "sort" "time" @@ -33,6 +34,7 @@ type options[K comparable] struct { creator TimeSeriesCreator[K] slidingWindowCreator SlidingWindowCreator[K] clock timeseries.Clock + weigher Weigher[K] halfLife time.Duration @@ -45,8 +47,24 @@ type options[K comparable] struct { countThreshold float64 } +type Weigher[K comparable] interface { + Weight(context.Context, K, time.Time) float64 +} + type Option[K comparable] func(*options[K]) +type unityWeigher[K comparable] float64 + +func (u unityWeigher[K]) Weight(_ context.Context, _ K, _ time.Time) float64 { + return float64(u) +} + +func WithWeigher[K comparable](w Weigher[K]) Option[K] { + return func(o *options[K]) { + o.weigher = w + } +} + func WithTimeSeries[K comparable](creator TimeSeriesCreator[K]) Option[K] { return func(o *options[K]) { o.creator = creator @@ -164,6 +182,9 @@ func NewScorer[K comparable](options ...Option[K]) Scorer[K] { if scorer.options.clock == nil { scorer.options.clock = timeseries.DefaultClock } + if scorer.options.weigher == nil { + scorer.options.weigher = unityWeigher[K](1.0) + } if scorer.options.slidingWindowCreator == nil { scorer.options.slidingWindowCreator = func(id K) SlidingWindow { return slidingwindow.NewSlidingWindow( @@ -189,10 +210,10 @@ func (s *Scorer[K]) addToItem(item *item[K], tm time.Time) { item.eventSeries.IncreaseAtTime(1, tm) } -func (s *Scorer[K]) Score() Scores[K] { +func (s *Scorer[K]) Score(ctx context.Context) Scores[K] { var scores Scores[K] for id, item := range s.items { - score := item.score() + score := item.score(ctx, id) score.ID = id scores = append(scores, score) } diff --git a/pkg/alerting/alerting.go b/pkg/alerting/alerting.go index 59020af..f41e6bf 100644 --- a/pkg/alerting/alerting.go +++ b/pkg/alerting/alerting.go @@ -118,6 +118,7 @@ func New(cfg config.Alerting, tgCache talkgroups.Store, opts ...AlertOption) Ale trending.WithScoreThreshold[talkgroups.ID](ScoreThreshold), trending.WithCountThreshold[talkgroups.ID](CountThreshold), trending.WithClock[talkgroups.ID](as.clock), + trending.WithWeigher[talkgroups.ID](as.tgCache), ) return as @@ -130,13 +131,13 @@ func (as *alerter) Go(ctx context.Context) { log.Error().Err(err).Msg("backfill") } - as.score(time.Now()) + as.score(ctx, time.Now()) ticker := time.NewTicker(alerterTickInterval) for { select { case now := <-ticker.C: - as.score(now) + as.score(ctx, now) err := as.notify(ctx) if err != nil { log.Error().Err(err).Msg("notify") @@ -178,7 +179,7 @@ func (as *alerter) eval(ctx context.Context, now time.Time, testMode bool) ([]Al if s.Score > as.cfg.AlertThreshold || testMode { if old, inCache := as.alertCache[s.ID]; !inCache || now.Sub(old.Timestamp) > as.renotify { - s.Score = as.tgCache.ApplyAlertRules(s, now) + s.Score *= as.tgCache.ApplyAlertRules(s.ID, now) a, err := as.makeAlert(ctx, s, origScore) if err != nil { return nil, fmt.Errorf("makeAlert: %w", err) @@ -385,16 +386,16 @@ func (as *alerter) startBackfill(ctx context.Context) error { if err != nil { return err } - log.Debug().Int("callsCount", count).Str("in", time.Since(now).String()).Int("tgCount", as.scorer.Score().Len()).Msg("backfill finished") + log.Debug().Int("callsCount", count).Str("in", time.Since(now).String()).Int("tgCount", as.scorer.Score(ctx).Len()).Msg("backfill finished") return nil } -func (as *alerter) score(now time.Time) { +func (as *alerter) score(ctx context.Context, now time.Time) { as.Lock() defer as.Unlock() - as.scores = as.scorer.Score() + as.scores = as.scorer.Score(ctx) as.lastScore = now sort.Sort(as.scores) } @@ -420,7 +421,7 @@ func (as *alerter) backfill(ctx context.Context, since time.Time, until time.Tim } as.scorer.AddEvent(tg, callDate) if as.sim != nil { // step the simulator if it is active - as.sim.stepClock(callDate) + as.sim.stepClock(ctx, callDate) } count++ } diff --git a/pkg/alerting/simulate.go b/pkg/alerting/simulate.go index 6d646fa..3d38ae4 100644 --- a/pkg/alerting/simulate.go +++ b/pkg/alerting/simulate.go @@ -46,12 +46,12 @@ func (s *Simulation) verify() error { } // stepClock is called by backfill during simulation operations. -func (s *Simulation) stepClock(t time.Time) { +func (s *Simulation) stepClock(ctx context.Context, t time.Time) { now := s.clock.Now() step := t.Sub(s.lastScore) if step > time.Duration(s.SimInterval) { s.clock += offsetClock(s.SimInterval) - s.scores = s.scorer.Score() + s.scores = s.scorer.Score(ctx) s.lastScore = now } @@ -85,7 +85,7 @@ func (s *Simulation) Simulate(ctx context.Context) (trending.Scores[talkgroups.I } // initial score - s.scores = s.scorer.Score() + s.scores = s.scorer.Score(ctx) s.lastScore = time.Time(s.ScoreStart) ssT := time.Time(s.ScoreStart) diff --git a/pkg/alerting/stats.html b/pkg/alerting/stats.html index 24e1af6..cbaa108 100644 --- a/pkg/alerting/stats.html +++ b/pkg/alerting/stats.html @@ -85,8 +85,8 @@ {{ range .Scores }} {{ $tg := (index $.TGs .ID) }} - {{ $tg.Name_2}} - {{ $tg.Name}} + {{ $tg.System.Name}} + {{ $tg.Talkgroup.Name}} {{ .ID.Talkgroup }} {{ f .Count 0 }} {{ f .RecentCount 0 }} diff --git a/pkg/server/signals.go b/pkg/server/signals.go index 35ad4df..267be3d 100644 --- a/pkg/server/signals.go +++ b/pkg/server/signals.go @@ -17,6 +17,7 @@ func (s *Server) huppers() []hupper { return []hupper{ s.logger, s.auth, + s.tgs, } } diff --git a/pkg/talkgroups/alertconfig.go b/pkg/talkgroups/alertconfig.go index 12d8305..1a7f7b6 100644 --- a/pkg/talkgroups/alertconfig.go +++ b/pkg/talkgroups/alertconfig.go @@ -6,7 +6,6 @@ import ( "time" "dynatron.me/x/stillbox/internal/ruletime" - "dynatron.me/x/stillbox/internal/trending" ) type AlertConfig struct { @@ -50,15 +49,15 @@ func (ac *AlertConfig) UnmarshalTGRules(tg ID, confBytes []byte) error { return nil } -func (ac *AlertConfig) ApplyAlertRules(score trending.Score[ID], t time.Time, coversOpts ...ruletime.CoversOption) float64 { +func (ac *AlertConfig) ApplyAlertRules(id ID, t time.Time, coversOpts ...ruletime.CoversOption) float64 { ac.RLock() - s, has := ac.m[score.ID] + s, has := ac.m[id] ac.RUnlock() if !has { - return score.Score + return 1.0 } - final := score.Score + final := 1.0 for _, ar := range s { if ar.MatchTime(t, coversOpts...) { diff --git a/pkg/talkgroups/alertconfig_test.go b/pkg/talkgroups/alertconfig_test.go index e966831..4819f13 100644 --- a/pkg/talkgroups/alertconfig_test.go +++ b/pkg/talkgroups/alertconfig_test.go @@ -126,7 +126,7 @@ func TestAlertConfig(t *testing.T) { ID: tc.tg, Score: tc.origScore, } - assert.Equal(t, tc.expectScore, toFixed(ac.ApplyAlertRules(cs, tc.t), 5)) + assert.Equal(t, tc.expectScore, toFixed(cs.Score*ac.ApplyAlertRules(cs.ID, tc.t), 5)) }) } } diff --git a/pkg/talkgroups/cache.go b/pkg/talkgroups/cache.go index 1ab4ed8..6c37f12 100644 --- a/pkg/talkgroups/cache.go +++ b/pkg/talkgroups/cache.go @@ -6,6 +6,7 @@ import ( "sync" "time" + "dynatron.me/x/stillbox/pkg/config" "dynatron.me/x/stillbox/pkg/database" "dynatron.me/x/stillbox/internal/ruletime" @@ -25,7 +26,7 @@ type Store interface { SystemName(ctx context.Context, id int) (string, bool) // ApplyAlertRules applies the score's talkgroup alert rules to the call occurring at t and returns the weighted score. - ApplyAlertRules(score trending.Score[ID], t time.Time, coversOpts ...ruletime.CoversOption) float64 + ApplyAlertRules(id ID, t time.Time, coversOpts ...ruletime.CoversOption) float64 // Hint hints the Store that the provided talkgroups will be asked for. Hint(ctx context.Context, tgs []ID) error @@ -35,6 +36,12 @@ type Store interface { // Invalidate invalidates any caching in the Store. Invalidate() + + // Include the trending Weigher interface + trending.Weigher[ID] + + // Hupper + HUP(*config.Config) } type CtxStoreKeyT string @@ -54,6 +61,10 @@ func StoreFrom(ctx context.Context) Store { return s } +func (t *cache) HUP(_ *config.Config) { + t.Invalidate() +} + func (t *cache) Invalidate() { t.Lock() defer t.Unlock() @@ -145,6 +156,19 @@ func (t *cache) Load(ctx context.Context, tgs []int64) error { var ErrNotFound = errors.New("talkgroup not found") +func (t *cache) Weight(ctx context.Context, id ID, tm time.Time) float64 { + tg, err := t.TG(ctx, id) + if err != nil { + return 1.0 + } + + m := float64(tg.Weight) + + m *= t.AlertConfig.ApplyAlertRules(id, tm) + + return float64(tg.Weight) +} + func (t *cache) TG(ctx context.Context, tg ID) (Talkgroup, error) { t.RLock() rec, has := t.tgs[tg]