diff --git a/internal/trending/item.go b/internal/trending/item.go index 058274e..6d2417e 100644 --- a/internal/trending/item.go +++ b/internal/trending/item.go @@ -1,6 +1,7 @@ package trending import ( + "context" "math" "time" ) @@ -31,7 +32,7 @@ func newItem[K comparable](id K, options *options[K]) *item[K] { } } -func (i *item[K]) score() Score[K] { +func (i *item[K]) score(ctx context.Context, id K) Score[K] { recentCount, count := i.computeCounts() if recentCount < i.options.countThreshold { return Score[K]{} @@ -58,7 +59,7 @@ func (i *item[K]) score() Score[K] { } i.decayMax() - mixedScore := 5 * (klScore + i.max) + mixedScore := 5 * (klScore + i.max) * i.options.weigher.Weight(ctx, id, i.options.clock.Now()) return Score[K]{ Score: mixedScore, diff --git a/internal/trending/trending.go b/internal/trending/trending.go index db76f2b..c7ae56c 100644 --- a/internal/trending/trending.go +++ b/internal/trending/trending.go @@ -1,6 +1,7 @@ package trending import ( + "context" "sort" "time" @@ -33,6 +34,7 @@ type options[K comparable] struct { creator TimeSeriesCreator[K] slidingWindowCreator SlidingWindowCreator[K] clock timeseries.Clock + weigher Weigher[K] halfLife time.Duration @@ -45,8 +47,24 @@ type options[K comparable] struct { countThreshold float64 } +type Weigher[K comparable] interface { + Weight(context.Context, K, time.Time) float64 +} + type Option[K comparable] func(*options[K]) +type unityWeigher[K comparable] float64 + +func (u unityWeigher[K]) Weight(_ context.Context, _ K, _ time.Time) float64 { + return float64(u) +} + +func WithWeigher[K comparable](w Weigher[K]) Option[K] { + return func(o *options[K]) { + o.weigher = w + } +} + func WithTimeSeries[K comparable](creator TimeSeriesCreator[K]) Option[K] { return func(o *options[K]) { o.creator = creator @@ -164,6 +182,9 @@ func NewScorer[K comparable](options ...Option[K]) Scorer[K] { if scorer.options.clock == nil { scorer.options.clock = timeseries.DefaultClock } + if scorer.options.weigher == nil { + scorer.options.weigher = unityWeigher[K](1.0) + } if scorer.options.slidingWindowCreator == nil { scorer.options.slidingWindowCreator = func(id K) SlidingWindow { return slidingwindow.NewSlidingWindow( @@ -189,10 +210,10 @@ func (s *Scorer[K]) addToItem(item *item[K], tm time.Time) { item.eventSeries.IncreaseAtTime(1, tm) } -func (s *Scorer[K]) Score() Scores[K] { +func (s *Scorer[K]) Score(ctx context.Context) Scores[K] { var scores Scores[K] for id, item := range s.items { - score := item.score() + score := item.score(ctx, id) score.ID = id scores = append(scores, score) } diff --git a/pkg/alerting/alerting.go b/pkg/alerting/alerting.go index 59020af..f41e6bf 100644 --- a/pkg/alerting/alerting.go +++ b/pkg/alerting/alerting.go @@ -118,6 +118,7 @@ func New(cfg config.Alerting, tgCache talkgroups.Store, opts ...AlertOption) Ale trending.WithScoreThreshold[talkgroups.ID](ScoreThreshold), trending.WithCountThreshold[talkgroups.ID](CountThreshold), trending.WithClock[talkgroups.ID](as.clock), + trending.WithWeigher[talkgroups.ID](as.tgCache), ) return as @@ -130,13 +131,13 @@ func (as *alerter) Go(ctx context.Context) { log.Error().Err(err).Msg("backfill") } - as.score(time.Now()) + as.score(ctx, time.Now()) ticker := time.NewTicker(alerterTickInterval) for { select { case now := <-ticker.C: - as.score(now) + as.score(ctx, now) err := as.notify(ctx) if err != nil { log.Error().Err(err).Msg("notify") @@ -178,7 +179,7 @@ func (as *alerter) eval(ctx context.Context, now time.Time, testMode bool) ([]Al if s.Score > as.cfg.AlertThreshold || testMode { if old, inCache := as.alertCache[s.ID]; !inCache || now.Sub(old.Timestamp) > as.renotify { - s.Score = as.tgCache.ApplyAlertRules(s, now) + s.Score *= as.tgCache.ApplyAlertRules(s.ID, now) a, err := as.makeAlert(ctx, s, origScore) if err != nil { return nil, fmt.Errorf("makeAlert: %w", err) @@ -385,16 +386,16 @@ func (as *alerter) startBackfill(ctx context.Context) error { if err != nil { return err } - log.Debug().Int("callsCount", count).Str("in", time.Since(now).String()).Int("tgCount", as.scorer.Score().Len()).Msg("backfill finished") + log.Debug().Int("callsCount", count).Str("in", time.Since(now).String()).Int("tgCount", as.scorer.Score(ctx).Len()).Msg("backfill finished") return nil } -func (as *alerter) score(now time.Time) { +func (as *alerter) score(ctx context.Context, now time.Time) { as.Lock() defer as.Unlock() - as.scores = as.scorer.Score() + as.scores = as.scorer.Score(ctx) as.lastScore = now sort.Sort(as.scores) } @@ -420,7 +421,7 @@ func (as *alerter) backfill(ctx context.Context, since time.Time, until time.Tim } as.scorer.AddEvent(tg, callDate) if as.sim != nil { // step the simulator if it is active - as.sim.stepClock(callDate) + as.sim.stepClock(ctx, callDate) } count++ } diff --git a/pkg/alerting/simulate.go b/pkg/alerting/simulate.go index 6d646fa..3d38ae4 100644 --- a/pkg/alerting/simulate.go +++ b/pkg/alerting/simulate.go @@ -46,12 +46,12 @@ func (s *Simulation) verify() error { } // stepClock is called by backfill during simulation operations. -func (s *Simulation) stepClock(t time.Time) { +func (s *Simulation) stepClock(ctx context.Context, t time.Time) { now := s.clock.Now() step := t.Sub(s.lastScore) if step > time.Duration(s.SimInterval) { s.clock += offsetClock(s.SimInterval) - s.scores = s.scorer.Score() + s.scores = s.scorer.Score(ctx) s.lastScore = now } @@ -85,7 +85,7 @@ func (s *Simulation) Simulate(ctx context.Context) (trending.Scores[talkgroups.I } // initial score - s.scores = s.scorer.Score() + s.scores = s.scorer.Score(ctx) s.lastScore = time.Time(s.ScoreStart) ssT := time.Time(s.ScoreStart) diff --git a/pkg/alerting/stats.html b/pkg/alerting/stats.html index 24e1af6..cbaa108 100644 --- a/pkg/alerting/stats.html +++ b/pkg/alerting/stats.html @@ -85,8 +85,8 @@ {{ range .Scores }} {{ $tg := (index $.TGs .ID) }}