diff --git a/config.sample.yaml b/config.sample.yaml index 9ee1eb2..3b15d97 100644 --- a/config.sample.yaml +++ b/config.sample.yaml @@ -24,3 +24,8 @@ rateLimit: enable: true requests: 200 over: 2m +alerting: + enable: true + lookbackDays: 7 + halfLife: 30m + recent: 2h diff --git a/internal/trending/item.go b/internal/trending/item.go index 818e8e3..058274e 100644 --- a/internal/trending/item.go +++ b/internal/trending/item.go @@ -22,7 +22,7 @@ func newItem[K comparable](id K, options *options[K]) *item[K] { defaultHourlyCount := float64(options.baseCount) * float64(options.storageDuration/time.Hour) defaultExpectation := float64(options.baseCount) / float64(time.Hour/options.recentDuration) return &item[K]{ - eventSeries: options.creator(id, options.clock), + eventSeries: options.creator(id), maxSeries: options.slidingWindowCreator(id), options: options, diff --git a/internal/trending/trending.go b/internal/trending/trending.go index f213e89..db76f2b 100644 --- a/internal/trending/trending.go +++ b/internal/trending/trending.go @@ -118,9 +118,9 @@ type TimeSeries interface { Range(start, end time.Time) (float64, error) } -type TimeSeriesCreator[K comparable] func(K, timeseries.Clock) TimeSeries +type TimeSeriesCreator[K comparable] func(K) TimeSeries -func NewMemoryTimeSeries[K comparable](id K, clock timeseries.Clock) TimeSeries { +func NewMemoryTimeSeries[K comparable](id K) TimeSeries { ts, _ := timeseries.NewTimeSeries(timeseries.WithGranularities( []timeseries.Granularity{ {Granularity: time.Second, Count: 60}, @@ -128,7 +128,7 @@ func NewMemoryTimeSeries[K comparable](id K, clock timeseries.Clock) TimeSeries {Granularity: time.Hour, Count: 24}, {Granularity: time.Hour * 24, Count: 7}, }, - ), timeseries.WithClock(clock)) + )) return ts } diff --git a/pkg/gordio/alerting/alerting.go b/pkg/gordio/alerting/alerting.go index 6dc7396..12c9bfa 100644 --- a/pkg/gordio/alerting/alerting.go +++ b/pkg/gordio/alerting/alerting.go @@ -7,7 +7,9 @@ import ( "time" cl "dynatron.me/x/stillbox/pkg/calls" + "dynatron.me/x/stillbox/pkg/gordio/config" "dynatron.me/x/stillbox/pkg/gordio/database" + "dynatron.me/x/stillbox/pkg/gordio/sinks" "dynatron.me/x/stillbox/internal/timeseries" "dynatron.me/x/stillbox/internal/trending" @@ -16,49 +18,85 @@ import ( ) const ( - StorageLookbackDays = 7 - HalfLife = 30 * time.Minute - RecentDuration = 2 * time.Hour ScoreThreshold = -1 CountThreshold = 1.0 - AlerterTickInterval = time.Minute + alerterTickInterval = time.Minute ) -type Alerter struct { +type Alerter interface { + sinks.Sink + + Enabled() bool + Go(context.Context) + + stats +} + +type alerter struct { sync.RWMutex + clock timeseries.Clock + cfg config.Alerting scorer trending.Scorer[cl.Talkgroup] scores trending.Scores[cl.Talkgroup] lastScore time.Time } -type myClock struct { - offset time.Duration +type noopAlerter struct{} + +type offsetClock time.Duration + +func (c *offsetClock) Now() time.Time { + return time.Now().Add(c.Duration()) } -func (c *myClock) Now() time.Time { - return time.Now().Add(c.offset) +func (c *offsetClock) Duration() time.Duration { + return time.Duration(*c) } -func New() *Alerter { - as := &Alerter{ - scorer: trending.NewScorer[cl.Talkgroup]( - trending.WithTimeSeries(newTimeSeries), - trending.WithStorageDuration[cl.Talkgroup](StorageLookbackDays*24*time.Hour), - trending.WithRecentDuration[cl.Talkgroup](RecentDuration), - trending.WithHalfLife[cl.Talkgroup](HalfLife), - trending.WithScoreThreshold[cl.Talkgroup](ScoreThreshold), - trending.WithCountThreshold[cl.Talkgroup](CountThreshold), - ), +func OffsetClock(d time.Duration) offsetClock { + return offsetClock(d) +} + +type AlertOption func(*alerter) + +func WithClock(clock timeseries.Clock) AlertOption { + return func(as *alerter) { + as.clock = clock } +} + +func New(cfg config.Alerting, opts ...AlertOption) Alerter { + if !cfg.Enable { + return &noopAlerter{} + } + + as := &alerter{ + cfg: cfg, + clock: timeseries.DefaultClock, + } + + for _, opt := range opts { + opt(as) + } + + as.scorer = trending.NewScorer[cl.Talkgroup]( + trending.WithTimeSeries(as.newTimeSeries), + trending.WithStorageDuration[cl.Talkgroup](time.Hour*24*time.Duration(cfg.LookbackDays)), + trending.WithRecentDuration[cl.Talkgroup](cfg.Recent), + trending.WithHalfLife[cl.Talkgroup](cfg.HalfLife), + trending.WithScoreThreshold[cl.Talkgroup](ScoreThreshold), + trending.WithCountThreshold[cl.Talkgroup](CountThreshold), + trending.WithClock[cl.Talkgroup](as.clock), + ) return as } -func (as *Alerter) Go(ctx context.Context) { +func (as *alerter) Go(ctx context.Context) { as.startBackfill(ctx) as.score(ctx, time.Now()) - ticker := time.NewTicker(AlerterTickInterval) + ticker := time.NewTicker(alerterTickInterval) for { select { @@ -72,21 +110,21 @@ func (as *Alerter) Go(ctx context.Context) { } -func newTimeSeries(id cl.Talkgroup, clock timeseries.Clock) trending.TimeSeries { +func (as *alerter) newTimeSeries(id cl.Talkgroup) trending.TimeSeries { ts, _ := timeseries.NewTimeSeries(timeseries.WithGranularities( []timeseries.Granularity{ {Granularity: time.Second, Count: 60}, {Granularity: time.Minute, Count: 10}, {Granularity: time.Hour, Count: 24}, - {Granularity: time.Hour * 24, Count: StorageLookbackDays}, + {Granularity: time.Hour * 24, Count: int(as.cfg.LookbackDays)}, }, - ), timeseries.WithClock(clock)) + ), timeseries.WithClock(as.clock)) return ts } -func (as *Alerter) startBackfill(ctx context.Context) { +func (as *alerter) startBackfill(ctx context.Context) { now := time.Now() - since := now.Add(StorageLookbackDays * -24 * time.Hour) + since := now.Add(-24 * time.Hour * time.Duration(as.cfg.LookbackDays)) log.Debug().Time("since", since).Msg("starting stats backfill") count, err := as.backfill(ctx, since) if err != nil { @@ -96,7 +134,7 @@ func (as *Alerter) startBackfill(ctx context.Context) { log.Debug().Int("count", count).Str("in", time.Now().Sub(now).String()).Int("len", as.scorer.Score().Len()).Msg("backfill finished") } -func (as *Alerter) score(ctx context.Context, now time.Time) { +func (as *alerter) score(ctx context.Context, now time.Time) { as.Lock() defer as.Unlock() @@ -105,7 +143,7 @@ func (as *Alerter) score(ctx context.Context, now time.Time) { sort.Sort(as.scores) } -func (as *Alerter) backfill(ctx context.Context, since time.Time) (count int, err error) { +func (as *alerter) backfill(ctx context.Context, since time.Time) (count int, err error) { db := database.FromCtx(ctx) const backfillStatsQuery = `SELECT system, talkgroup, call_date FROM calls WHERE call_date > $1 AND call_date < $2` @@ -135,14 +173,21 @@ func (as *Alerter) backfill(ctx context.Context, since time.Time) (count int, er return count, nil } -func (as *Alerter) SinkType() string { +func (as *alerter) SinkType() string { return "alerting" } -func (as *Alerter) Call(ctx context.Context, call *cl.Call) error { +func (as *alerter) Call(ctx context.Context, call *cl.Call) error { as.Lock() defer as.Unlock() as.scorer.AddEvent(call.TalkgroupTuple(), call.DateTime) return nil } + +func (*alerter) Enabled() bool { return true } + +func (*noopAlerter) SinkType() string { return "noopAlerter" } +func (*noopAlerter) Call(_ context.Context, _ *cl.Call) error { return nil } +func (*noopAlerter) Go(_ context.Context) {} +func (*noopAlerter) Enabled() bool { return false } diff --git a/pkg/gordio/alerting/stats.go b/pkg/gordio/alerting/stats.go index 9e1fafa..164b565 100644 --- a/pkg/gordio/alerting/stats.go +++ b/pkg/gordio/alerting/stats.go @@ -19,6 +19,10 @@ import ( //go:embed stats.html var statsTemplateFile string +type stats interface { + PrivateRoutes(chi.Router) +} + var ( funcMap = template.FuncMap{ "f": func(v float64) string { @@ -28,11 +32,13 @@ var ( statTmpl = template.Must(template.New("stats").Funcs(funcMap).Parse(statsTemplateFile)) ) -func (as *Alerter) PrivateRoutes(r chi.Router) { +func (as *alerter) PrivateRoutes(r chi.Router) { r.Get("/tgstats", as.tgStats) } -func (as *Alerter) tgStats(w http.ResponseWriter, r *http.Request) { +func (as *noopAlerter) PrivateRoutes(r chi.Router) {} + +func (as *alerter) tgStats(w http.ResponseWriter, r *http.Request) { ctx := r.Context() db := database.FromCtx(ctx) diff --git a/pkg/gordio/config/config.go b/pkg/gordio/config/config.go index 03a9988..31f9899 100644 --- a/pkg/gordio/config/config.go +++ b/pkg/gordio/config/config.go @@ -14,6 +14,7 @@ type Config struct { DB DB `yaml:"db"` CORS CORS `yaml:"cors"` Auth Auth `yaml:"auth"` + Alerting Alerting `yaml:"alerting"` Log []Logger `yaml:"log"` Listen string `yaml:"listen"` Public bool `yaml:"public"` @@ -49,6 +50,13 @@ type RateLimit struct { verifyError sync.Once } +type Alerting struct { + Enable bool `yaml:"enable"` + LookbackDays uint `yaml:"lookbackDays"` + HalfLife time.Duration `yaml:"halfLife"` + Recent time.Duration `yaml:"recent"` +} + func (rl *RateLimit) Verify() bool { if rl.Enable { if rl.Requests > 0 && rl.Over > 0 { diff --git a/pkg/gordio/server/server.go b/pkg/gordio/server/server.go index bd64082..27b725b 100644 --- a/pkg/gordio/server/server.go +++ b/pkg/gordio/server/server.go @@ -30,7 +30,7 @@ type Server struct { sinks sinks.Sinks nex *nexus.Nexus logger *Logger - alerter *alerting.Alerter + alerter alerting.Alerter hup chan os.Signal } @@ -56,12 +56,16 @@ func New(ctx context.Context, cfg *config.Config) (*Server, error) { r: r, nex: nexus.New(), logger: logger, - alerter: alerting.New(), + alerter: alerting.New(cfg.Alerting), } srv.sinks.Register("database", sinks.NewDatabaseSink(srv.db), true) srv.sinks.Register("nexus", sinks.NewNexusSink(srv.nex), false) - srv.sinks.Register("alerting", sinks.NewAlerterSink(srv.alerter), false) + + if srv.alerter.Enabled() { + srv.sinks.Register("alerting", srv.alerter, false) + } + srv.sources.Register("rdio-http", sources.NewRdioHTTP(authenticator, srv)) r.Use(middleware.RequestID) diff --git a/pkg/gordio/sinks/alerter.go b/pkg/gordio/sinks/alerter.go deleted file mode 100644 index 1b63bd8..0000000 --- a/pkg/gordio/sinks/alerter.go +++ /dev/null @@ -1,9 +0,0 @@ -package sinks - -import ( - "dynatron.me/x/stillbox/pkg/gordio/alerting" -) - -func NewAlerterSink(a *alerting.Alerter) *alerting.Alerter { - return a -}