diff --git a/internal/trending/item.go b/internal/trending/item.go index 6d2417e..aa5dc24 100644 --- a/internal/trending/item.go +++ b/internal/trending/item.go @@ -1,7 +1,6 @@ package trending import ( - "context" "math" "time" ) @@ -32,7 +31,7 @@ func newItem[K comparable](id K, options *options[K]) *item[K] { } } -func (i *item[K]) score(ctx context.Context, id K) Score[K] { +func (i *item[K]) score(id K) Score[K] { recentCount, count := i.computeCounts() if recentCount < i.options.countThreshold { return Score[K]{} @@ -59,7 +58,7 @@ func (i *item[K]) score(ctx context.Context, id K) Score[K] { } i.decayMax() - mixedScore := 5 * (klScore + i.max) * i.options.weigher.Weight(ctx, id, i.options.clock.Now()) + mixedScore := 5 * (klScore + i.max) return Score[K]{ Score: mixedScore, diff --git a/internal/trending/trending.go b/internal/trending/trending.go index c7ae56c..1814225 100644 --- a/internal/trending/trending.go +++ b/internal/trending/trending.go @@ -1,7 +1,6 @@ package trending import ( - "context" "sort" "time" @@ -34,7 +33,6 @@ type options[K comparable] struct { creator TimeSeriesCreator[K] slidingWindowCreator SlidingWindowCreator[K] clock timeseries.Clock - weigher Weigher[K] halfLife time.Duration @@ -47,24 +45,8 @@ type options[K comparable] struct { countThreshold float64 } -type Weigher[K comparable] interface { - Weight(context.Context, K, time.Time) float64 -} - type Option[K comparable] func(*options[K]) -type unityWeigher[K comparable] float64 - -func (u unityWeigher[K]) Weight(_ context.Context, _ K, _ time.Time) float64 { - return float64(u) -} - -func WithWeigher[K comparable](w Weigher[K]) Option[K] { - return func(o *options[K]) { - o.weigher = w - } -} - func WithTimeSeries[K comparable](creator TimeSeriesCreator[K]) Option[K] { return func(o *options[K]) { o.creator = creator @@ -182,9 +164,6 @@ func NewScorer[K comparable](options ...Option[K]) Scorer[K] { if scorer.options.clock == nil { scorer.options.clock = timeseries.DefaultClock } - if scorer.options.weigher == nil { - scorer.options.weigher = unityWeigher[K](1.0) - } if scorer.options.slidingWindowCreator == nil { scorer.options.slidingWindowCreator = func(id K) SlidingWindow { return slidingwindow.NewSlidingWindow( @@ -210,10 +189,10 @@ func (s *Scorer[K]) addToItem(item *item[K], tm time.Time) { item.eventSeries.IncreaseAtTime(1, tm) } -func (s *Scorer[K]) Score(ctx context.Context) Scores[K] { +func (s *Scorer[K]) Score() Scores[K] { var scores Scores[K] for id, item := range s.items { - score := item.score(ctx, id) + score := item.score(id) score.ID = id scores = append(scores, score) } diff --git a/pkg/alerting/alerting.go b/pkg/alerting/alerting.go index 1a0eb03..cf83f66 100644 --- a/pkg/alerting/alerting.go +++ b/pkg/alerting/alerting.go @@ -118,7 +118,6 @@ func New(cfg config.Alerting, tgCache talkgroups.Store, opts ...AlertOption) Ale trending.WithScoreThreshold[talkgroups.ID](ScoreThreshold), trending.WithCountThreshold[talkgroups.ID](CountThreshold), trending.WithClock[talkgroups.ID](as.clock), - trending.WithWeigher[talkgroups.ID](as.tgCache), ) return as @@ -131,13 +130,13 @@ func (as *alerter) Go(ctx context.Context) { log.Error().Err(err).Msg("backfill") } - as.score(ctx, time.Now()) + as.score(time.Now()) ticker := time.NewTicker(alerterTickInterval) for { select { case now := <-ticker.C: - as.score(ctx, now) + as.score(now) err := as.notify(ctx) if err != nil { log.Error().Err(err).Msg("notify") @@ -179,6 +178,7 @@ func (as *alerter) eval(ctx context.Context, now time.Time, testMode bool) ([]Al if s.Score > as.cfg.AlertThreshold || testMode { if old, inCache := as.alertCache[s.ID]; !inCache || now.Sub(old.Timestamp) > as.renotify { + s.Score *= as.tgCache.Weight(ctx, s.ID, now) a, err := as.makeAlert(ctx, s, origScore) if err != nil { return nil, fmt.Errorf("makeAlert: %w", err) @@ -385,16 +385,16 @@ func (as *alerter) startBackfill(ctx context.Context) error { if err != nil { return err } - log.Debug().Int("callsCount", count).Str("in", time.Since(now).String()).Int("tgCount", as.scorer.Score(ctx).Len()).Msg("backfill finished") + log.Debug().Int("callsCount", count).Str("in", time.Since(now).String()).Int("tgCount", as.scorer.Score().Len()).Msg("backfill finished") return nil } -func (as *alerter) score(ctx context.Context, now time.Time) { +func (as *alerter) score(now time.Time) { as.Lock() defer as.Unlock() - as.scores = as.scorer.Score(ctx) + as.scores = as.scorer.Score() as.lastScore = now sort.Sort(as.scores) } @@ -420,7 +420,7 @@ func (as *alerter) backfill(ctx context.Context, since time.Time, until time.Tim } as.scorer.AddEvent(tg, callDate) if as.sim != nil { // step the simulator if it is active - as.sim.stepClock(ctx, callDate) + as.sim.stepClock(callDate) } count++ } diff --git a/pkg/alerting/simulate.go b/pkg/alerting/simulate.go index 3d38ae4..6d646fa 100644 --- a/pkg/alerting/simulate.go +++ b/pkg/alerting/simulate.go @@ -46,12 +46,12 @@ func (s *Simulation) verify() error { } // stepClock is called by backfill during simulation operations. -func (s *Simulation) stepClock(ctx context.Context, t time.Time) { +func (s *Simulation) stepClock(t time.Time) { now := s.clock.Now() step := t.Sub(s.lastScore) if step > time.Duration(s.SimInterval) { s.clock += offsetClock(s.SimInterval) - s.scores = s.scorer.Score(ctx) + s.scores = s.scorer.Score() s.lastScore = now } @@ -85,7 +85,7 @@ func (s *Simulation) Simulate(ctx context.Context) (trending.Scores[talkgroups.I } // initial score - s.scores = s.scorer.Score(ctx) + s.scores = s.scorer.Score() s.lastScore = time.Time(s.ScoreStart) ssT := time.Time(s.ScoreStart) diff --git a/pkg/talkgroups/cache.go b/pkg/talkgroups/cache.go index 350a3a4..22e9500 100644 --- a/pkg/talkgroups/cache.go +++ b/pkg/talkgroups/cache.go @@ -6,12 +6,11 @@ import ( "sync" "time" + "dynatron.me/x/stillbox/internal/ruletime" + "dynatron.me/x/stillbox/pkg/config" "dynatron.me/x/stillbox/pkg/database" - "dynatron.me/x/stillbox/internal/ruletime" - "dynatron.me/x/stillbox/internal/trending" - "github.com/jackc/pgx/v5" "github.com/rs/zerolog/log" ) @@ -37,8 +36,8 @@ type Store interface { // Invalidate invalidates any caching in the Store. Invalidate() - // Include the trending Weigher interface - trending.Weigher[ID] + // Weight returns the final weight of this talkgroup, including its static and rules-derived weight. + Weight(ctx context.Context, id ID, t time.Time) float64 // Hupper HUP(*config.Config)