Add talkgroup activity alerting #17

Merged
amigan merged 11 commits from alerting into trunk 2024-10-31 00:20:48 -04:00
3 changed files with 20 additions and 11 deletions
Showing only changes of commit 48e8078359 - Show all commits

View file

@ -3,8 +3,6 @@ package trending
import ( import (
"math" "math"
"time" "time"
timeseries "dynatron.me/x/stillbox/internal/timeseries"
) )
type item[K comparable] struct { type item[K comparable] struct {
@ -24,7 +22,7 @@ func newItem[K comparable](id K, options *options[K]) *item[K] {
defaultHourlyCount := float64(options.baseCount) * float64(options.storageDuration/time.Hour) defaultHourlyCount := float64(options.baseCount) * float64(options.storageDuration/time.Hour)
defaultExpectation := float64(options.baseCount) / float64(time.Hour/options.recentDuration) defaultExpectation := float64(options.baseCount) / float64(time.Hour/options.recentDuration)
return &item[K]{ return &item[K]{
eventSeries: options.creator(id), eventSeries: options.creator(id, options.clock),
maxSeries: options.slidingWindowCreator(id), maxSeries: options.slidingWindowCreator(id),
options: options, options: options,
@ -74,7 +72,7 @@ func (i *item[K]) score() Score[K] {
} }
func (i *item[K]) computeCounts() (float64, float64) { func (i *item[K]) computeCounts() (float64, float64) {
now := timeseries.DefaultClock.Now() now := i.options.clock.Now()
totalCount, _ := i.eventSeries.Range(now.Add(-i.options.storageDuration), now) totalCount, _ := i.eventSeries.Range(now.Add(-i.options.storageDuration), now)
count, _ := i.eventSeries.Range(now.Add(-i.options.recentDuration), now) count, _ := i.eventSeries.Range(now.Add(-i.options.recentDuration), now)
return count, totalCount return count, totalCount
@ -90,11 +88,11 @@ func (i *item[K]) decayMax() {
func (i *item[K]) updateMax(score float64) { func (i *item[K]) updateMax(score float64) {
i.max = score i.max = score
i.maxTime = timeseries.DefaultClock.Now() i.maxTime = i.options.clock.Now()
} }
func (i *item[K]) computeExponentialDecayMultiplier() float64 { func (i *item[K]) computeExponentialDecayMultiplier() float64 {
return math.Pow(0.5, float64(timeseries.DefaultClock.Now().Unix()-i.maxTime.Unix())/i.options.halfLife.Seconds()) return math.Pow(0.5, float64(i.options.clock.Now().Unix()-i.maxTime.Unix())/i.options.halfLife.Seconds())
} }
func computeKullbackLeibler(probability float64, expectation float64) float64 { func computeKullbackLeibler(probability float64, expectation float64) float64 {

View file

@ -32,6 +32,7 @@ var defaultCountThreshold = 3.0
type options[K comparable] struct { type options[K comparable] struct {
creator TimeSeriesCreator[K] creator TimeSeriesCreator[K]
slidingWindowCreator SlidingWindowCreator[K] slidingWindowCreator SlidingWindowCreator[K]
clock timeseries.Clock
halfLife time.Duration halfLife time.Duration
@ -94,6 +95,12 @@ func WithCountThreshold[K comparable](threshold float64) Option[K] {
} }
} }
func WithClock[K comparable](clock timeseries.Clock) Option[K] {
return func(o *options[K]) {
o.clock = clock
}
}
type Scorer[K comparable] struct { type Scorer[K comparable] struct {
options options[K] options options[K]
items map[K]*item[K] items map[K]*item[K]
@ -111,9 +118,9 @@ type TimeSeries interface {
Range(start, end time.Time) (float64, error) Range(start, end time.Time) (float64, error)
} }
type TimeSeriesCreator[K comparable] func(K) TimeSeries type TimeSeriesCreator[K comparable] func(K, timeseries.Clock) TimeSeries
func NewMemoryTimeSeries[K comparable](id K) TimeSeries { func NewMemoryTimeSeries[K comparable](id K, clock timeseries.Clock) TimeSeries {
ts, _ := timeseries.NewTimeSeries(timeseries.WithGranularities( ts, _ := timeseries.NewTimeSeries(timeseries.WithGranularities(
[]timeseries.Granularity{ []timeseries.Granularity{
{Granularity: time.Second, Count: 60}, {Granularity: time.Second, Count: 60},
@ -121,7 +128,7 @@ func NewMemoryTimeSeries[K comparable](id K) TimeSeries {
{Granularity: time.Hour, Count: 24}, {Granularity: time.Hour, Count: 24},
{Granularity: time.Hour * 24, Count: 7}, {Granularity: time.Hour * 24, Count: 7},
}, },
)) ), timeseries.WithClock(clock))
return ts return ts
} }
@ -154,11 +161,15 @@ func NewScorer[K comparable](options ...Option[K]) Scorer[K] {
if scorer.options.baseCount == 0.0 { if scorer.options.baseCount == 0.0 {
scorer.options.baseCount = defaultBaseCount scorer.options.baseCount = defaultBaseCount
} }
if scorer.options.clock == nil {
scorer.options.clock = timeseries.DefaultClock
}
if scorer.options.slidingWindowCreator == nil { if scorer.options.slidingWindowCreator == nil {
scorer.options.slidingWindowCreator = func(id K) SlidingWindow { scorer.options.slidingWindowCreator = func(id K) SlidingWindow {
return slidingwindow.NewSlidingWindow( return slidingwindow.NewSlidingWindow(
slidingwindow.WithStep(time.Hour*24), slidingwindow.WithStep(time.Hour*24),
slidingwindow.WithDuration(scorer.options.storageDuration), slidingwindow.WithDuration(scorer.options.storageDuration),
slidingwindow.WithClock(scorer.options.clock),
) )
} }
} }

View file

@ -72,7 +72,7 @@ func (as *Alerter) Go(ctx context.Context) {
} }
func newTimeSeries(id cl.Talkgroup) trending.TimeSeries { func newTimeSeries(id cl.Talkgroup, clock timeseries.Clock) trending.TimeSeries {
ts, _ := timeseries.NewTimeSeries(timeseries.WithGranularities( ts, _ := timeseries.NewTimeSeries(timeseries.WithGranularities(
[]timeseries.Granularity{ []timeseries.Granularity{
{Granularity: time.Second, Count: 60}, {Granularity: time.Second, Count: 60},
@ -80,7 +80,7 @@ func newTimeSeries(id cl.Talkgroup) trending.TimeSeries {
{Granularity: time.Hour, Count: 24}, {Granularity: time.Hour, Count: 24},
{Granularity: time.Hour * 24, Count: StorageLookbackDays}, {Granularity: time.Hour * 24, Count: StorageLookbackDays},
}, },
)) ), timeseries.WithClock(clock))
return ts return ts
} }