package alerting import ( "context" "fmt" "sync" "time" cl "dynatron.me/x/stillbox/pkg/calls" "dynatron.me/x/stillbox/pkg/gordio/database" "dynatron.me/x/stillbox/internal/timeseries" "dynatron.me/x/stillbox/internal/trending" "github.com/rs/zerolog/log" ) const ( StorageLookbackDays = 7 HalfLife = 30 * time.Minute RecentDuration = 2*time.Hour ScoreThreshold = -1 CountThreshold = 1 ) type AlertSink struct { sync.RWMutex scorer trending.Scorer[cl.Talkgroup] } type myClock struct { offset time.Duration } func (c *myClock) Now() time.Time { return time.Now().Add(c.offset) } func NewSink(ctx context.Context) *AlertSink { as := &AlertSink{ scorer: trending.NewScorer[cl.Talkgroup]( trending.WithTimeSeries(newTimeSeries), trending.WithStorageDuration[cl.Talkgroup](StorageLookbackDays*24*time.Hour), trending.WithRecentDuration[cl.Talkgroup](RecentDuration), trending.WithHalfLife[cl.Talkgroup](HalfLife), trending.WithScoreThreshold[cl.Talkgroup](ScoreThreshold), trending.WithCountThreshold[cl.Talkgroup](CountThreshold), ), } go as.startBackfill(ctx) return as } func newTimeSeries(id cl.Talkgroup) trending.TimeSeries { ts, _ := timeseries.NewTimeSeries(timeseries.WithGranularities( []timeseries.Granularity{ {Granularity: time.Second, Count: 60}, {Granularity: time.Minute, Count: 10}, {Granularity: time.Hour, Count: 24}, {Granularity: time.Hour * 24, Count: StorageLookbackDays}, }, )) return ts } func (as *AlertSink) startBackfill(ctx context.Context) { now := time.Now() cl := &myClock{-24*StorageLookbackDays*time.Hour} since := now.Add(StorageLookbackDays * -24 * time.Hour) log.Debug().Time("since", since).Msg("starting stats backfill") count, err := as.backfill(ctx, since) if err != nil { log.Error().Err(err).Msg("backfill failed") return } log.Debug().Int("count", count).Str("in", time.Now().Sub(now).String()).Int("len", as.scorer.Score().Len()).Msg("backfill finished") timeseries.DefaultClock = cl for { fmt.Printf("offs: %s (%s)\n", cl.offset.String(), cl.Now().String()) as.printScores(ctx) cl.offset += time.Minute*5 if cl.offset == time.Minute*5 { break } } } func (as *AlertSink) printScores(ctx context.Context) { db := database.FromCtx(ctx) as.Lock() defer as.Unlock() scores := as.scorer.Score() //fmt.Printf("score len is %d\n", scores.Len()) //const scoreMult = 1000000000 const scoreMult = 1 for _, s := range scores { if s.ID.Talkgroup != 1185 { continue } tg, _ := db.GetTalkgroup(ctx, int(s.ID.System), int(s.ID.Talkgroup)) tgn := "" if tg.Name != nil { tgn = *tg.Name } fmt.Printf("%s\t\t\t%d:%d c %f\trc %f\tscore %f\tprob %f\texp %f\tmax %f\tkl %f\n", tgn, s.ID.System, s.ID.Talkgroup, s.Count, s.RecentCount, s.Score*scoreMult, s.Probability, s.Expectation, s.Maximum, s.KLScore) } } func (as *AlertSink) backfill(ctx context.Context, since time.Time) (count int, err error) { db := database.FromCtx(ctx) const backfillStatsQuery = `SELECT system, talkgroup, call_date FROM calls WHERE call_date > $1 AND call_date < $2 AND talkgroup = 1185 ` rows, err := db.Query(ctx, backfillStatsQuery, since, timeseries.DefaultClock.Now()) if err != nil { return count, err } defer rows.Close() as.Lock() defer as.Unlock() for rows.Next() { var tg cl.Talkgroup var callDate time.Time if err := rows.Scan(&tg.System, &tg.Talkgroup, &callDate); err != nil { return count, err } as.scorer.AddEvent(tg, callDate) count++ } if err := rows.Err(); err != nil { return count, err } return count, nil } func (as *AlertSink) SinkType() string { return "alerting" } func (as *AlertSink) Call(ctx context.Context, call *cl.Call) error { as.Lock() defer as.Unlock() as.scorer.AddEvent(call.TalkgroupTuple(), call.DateTime) return nil }