From 922da11c8c6f8b33daab3e66d3ef750837a7ae78 Mon Sep 17 00:00:00 2001 From: Daniel Ponte Date: Wed, 23 Oct 2024 17:08:29 -0400 Subject: [PATCH] wip --- .gitignore | 2 + internal/timeseries/timeseries.go | 6 +- internal/timeseries/timeseries_test.go | 2 - internal/trending/item.go | 10 ++- internal/trending/score.go | 2 + internal/trending/trending.go | 2 +- pkg/gordio/sinks/alerting/alerting.go | 96 +++++++++++++++++--------- 7 files changed, 79 insertions(+), 41 deletions(-) diff --git a/.gitignore b/.gitignore index a37f180..0223701 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ config.yaml +config.test.yaml mydb.sql client/calls/ !client/calls/.gitkeep @@ -6,3 +7,4 @@ client/calls/ /calls Session.vim *.log +*.dlv diff --git a/internal/timeseries/timeseries.go b/internal/timeseries/timeseries.go index 889cec5..a0a8747 100644 --- a/internal/timeseries/timeseries.go +++ b/internal/timeseries/timeseries.go @@ -85,9 +85,11 @@ type Clock interface { Now() time.Time } -// defaultClock is used in case no clock is provided to the constructor. +// DefaultClock is used in case no clock is provided to the constructor. type defaultClock struct{} +var DefaultClock Clock = &defaultClock{} + func (c *defaultClock) Now() time.Time { return time.Now() } @@ -137,7 +139,7 @@ func NewTimeSeries(os ...Option) (*TimeSeries, error) { o(&opts) } if opts.clock == nil { - opts.clock = &defaultClock{} + opts.clock = DefaultClock } if opts.granularities == nil { opts.granularities = defaultGranularities diff --git a/internal/timeseries/timeseries_test.go b/internal/timeseries/timeseries_test.go index 2d0f745..8d87c30 100644 --- a/internal/timeseries/timeseries_test.go +++ b/internal/timeseries/timeseries_test.go @@ -3,8 +3,6 @@ package timeseries import ( "testing" "time" - - "github.com/benbjohnson/clock" ) // TODO: do table based testing diff --git a/internal/trending/item.go b/internal/trending/item.go index 02517e5..665bbf1 100644 --- a/internal/trending/item.go +++ b/internal/trending/item.go @@ -3,6 +3,8 @@ package trending import ( "math" "time" + + timeseries "dynatron.me/x/stillbox/internal/timeseries" ) type item[K comparable] struct { @@ -66,11 +68,13 @@ func (i *item[K]) score() score[K] { Expectation: expectation, Maximum: i.max, KLScore: klScore, + Count: count, + RecentCount: recentCount, } } func (i *item[K]) computeCounts() (float64, float64) { - now := time.Now() + now := timeseries.DefaultClock.Now() totalCount, _ := i.eventSeries.Range(now.Add(-i.options.storageDuration), now) count, _ := i.eventSeries.Range(now.Add(-i.options.recentDuration), now) return count, totalCount @@ -86,11 +90,11 @@ func (i *item[K]) decayMax() { func (i *item[K]) updateMax(score float64) { i.max = score - i.maxTime = time.Now() + i.maxTime = timeseries.DefaultClock.Now() } func (i *item[K]) computeExponentialDecayMultiplier() float64 { - return math.Pow(0.5, float64(time.Now().Unix()-i.maxTime.Unix())/i.options.halfLife.Seconds()) + return math.Pow(0.5, float64(timeseries.DefaultClock.Now().Unix()-i.maxTime.Unix())/i.options.halfLife.Seconds()) } func computeKullbackLeibler(probability float64, expectation float64) float64 { diff --git a/internal/trending/score.go b/internal/trending/score.go index 32b5976..dbbc4ca 100644 --- a/internal/trending/score.go +++ b/internal/trending/score.go @@ -7,6 +7,8 @@ type score[K comparable] struct { Expectation float64 Maximum float64 KLScore float64 + Count float64 + RecentCount float64 } type Scores[K comparable] []score[K] diff --git a/internal/trending/trending.go b/internal/trending/trending.go index 136dcba..a7278c6 100644 --- a/internal/trending/trending.go +++ b/internal/trending/trending.go @@ -25,7 +25,7 @@ var defaultHalfLife = 2 * time.Hour var defaultRecentDuration = 5 * time.Minute var defaultStorageDuration = 7 * 24 * time.Hour var defaultMaxResults = 100 -var defaultBaseCount = 3 +var defaultBaseCount = 1 var defaultScoreThreshold = 0.01 var defaultCountThreshold = 3.0 diff --git a/pkg/gordio/sinks/alerting/alerting.go b/pkg/gordio/sinks/alerting/alerting.go index eaa1e87..73a1527 100644 --- a/pkg/gordio/sinks/alerting/alerting.go +++ b/pkg/gordio/sinks/alerting/alerting.go @@ -16,9 +16,11 @@ import ( ) const ( - StorageLookbackDays = 2 - HalfLife = time.Hour - RecentDuration = time.Hour + StorageLookbackDays = 4 + HalfLife = 30 * time.Minute + RecentDuration = 12*time.Hour + ScoreThreshold = -1 + CountThreshold = 1 ) type AlertSink struct { @@ -26,6 +28,14 @@ type AlertSink struct { scorer trending.Scorer[cl.Talkgroup] } +type myClock struct { + offset time.Duration + +} +func (c *myClock) Now() time.Time { + return time.Now().Add(c.offset) +} + func NewSink(ctx context.Context) *AlertSink { as := &AlertSink{ scorer: trending.NewScorer[cl.Talkgroup]( @@ -33,6 +43,8 @@ func NewSink(ctx context.Context) *AlertSink { trending.WithStorageDuration[cl.Talkgroup](StorageLookbackDays*24*time.Hour), trending.WithRecentDuration[cl.Talkgroup](RecentDuration), trending.WithHalfLife[cl.Talkgroup](HalfLife), + trending.WithScoreThreshold[cl.Talkgroup](ScoreThreshold), + trending.WithCountThreshold[cl.Talkgroup](CountThreshold), ), } @@ -41,41 +53,67 @@ func NewSink(ctx context.Context) *AlertSink { return as } +func newTimeSeries(id cl.Talkgroup) trending.TimeSeries { + ts, _ := timeseries.NewTimeSeries(timeseries.WithGranularities( + []timeseries.Granularity{ + {Granularity: time.Second, Count: 60}, + {Granularity: time.Minute, Count: 60}, + {Granularity: time.Hour, Count: 24}, + {Granularity: time.Hour * 24, Count: StorageLookbackDays}, + }, + )) + return ts +} + func (as *AlertSink) startBackfill(ctx context.Context) { - since := time.Now().Add(StorageLookbackDays * -24 * time.Hour) + now := time.Now() + cl := &myClock{-18*time.Hour} + timeseries.DefaultClock = cl + since := now.Add(StorageLookbackDays * -24 * 3 * time.Hour) log.Debug().Time("since", since).Msg("starting stats backfill") count, err := as.backfill(ctx, since) if err != nil { log.Error().Err(err).Msg("backfill failed") return } - log.Debug().Int("count", count).Int("len", as.scorer.Score().Len()).Msg("backfill finished") - as.printScores() + log.Debug().Int("count", count).Str("in", time.Now().Sub(now).String()).Int("len", as.scorer.Score().Len()).Msg("backfill finished") + for { + fmt.Printf("offs: %s\n", cl.offset.String()) + as.printScores(ctx) + cl.offset += time.Minute*5 + if cl.offset == time.Minute*5 { + break + } + } } -type score[K comparable] struct { - ID K - Score float64 - Probability float64 - Expectation float64 - Maximum float64 - KLScore float64 -} - -func (as *AlertSink) printScores() { +func (as *AlertSink) printScores(ctx context.Context) { + db := database.FromCtx(ctx) + as.Lock() + defer as.Unlock() scores := as.scorer.Score() fmt.Printf("score len is %d\n", scores.Len()) + //const scoreMult = 1000000000 + const scoreMult = 1 for _, s := range scores { - fmt.Printf("%d:%d score %f prob %f exp %f max %f kl %f", s.ID.System, s.ID.Talkgroup, s.Score, - s.Probability, s.Expectation, s.Maximum, s.KLScore) + if s.ID.Talkgroup != 1616 && s.ID.Talkgroup != 1617 { + continue + } + tg, _ := db.GetTalkgroup(ctx, int(s.ID.System), int(s.ID.Talkgroup)) + tgn := "" + if tg.Name != nil { + tgn = *tg.Name + } + fmt.Printf("%s\t\t\t%d:%d c %f\trc %f\tscore %f\tprob %f\texp %f\tmax %f\tkl %f\n", tgn, s.ID.System, s.ID.Talkgroup, + s.Count, s.RecentCount, s.Score*scoreMult, s.Probability, s.Expectation, s.Maximum, s.KLScore) } } func (as *AlertSink) backfill(ctx context.Context, since time.Time) (count int, err error) { db := database.FromCtx(ctx) - const backfillStatsQuery = `SELECT system, talkgroup, call_date FROM calls WHERE call_date > $1` + const backfillStatsQuery = `SELECT system, talkgroup, call_date FROM calls WHERE call_date > $1 AND call_date < $2` - rows, err := db.Query(ctx, backfillStatsQuery, since) + rows, err := db.Query(ctx, backfillStatsQuery, since, timeseries.DefaultClock.Now()) if err != nil { return count, err } @@ -101,22 +139,14 @@ func (as *AlertSink) backfill(ctx context.Context, since time.Time) (count int, return count, nil } -func newTimeSeries(id cl.Talkgroup) trending.TimeSeries { - ts, _ := timeseries.NewTimeSeries(timeseries.WithGranularities( - []timeseries.Granularity{ - {Granularity: time.Second, Count: 60}, - {Granularity: time.Minute, Count: 10}, - {Granularity: time.Hour, Count: 24}, - {Granularity: time.Hour * 24, Count: StorageLookbackDays}, - }, - )) - return ts -} - func (as *AlertSink) SinkType() string { return "alerting" } -func (ns *AlertSink) Call(ctx context.Context, call *cl.Call) error { +func (as *AlertSink) Call(ctx context.Context, call *cl.Call) error { + as.Lock() + defer as.Unlock() + as.scorer.AddEvent(call.TalkgroupTuple(), call.DateTime) + return nil }