From ccb72deb8832ba78bb9f6a7a8446f24c9958171d Mon Sep 17 00:00:00 2001 From: Daniel Ponte Date: Mon, 17 Feb 2025 14:38:13 -0500 Subject: [PATCH] Move backfill query into callstore. Closes #111. --- pkg/alerting/alerting.go | 31 ++++++--------------------- pkg/calls/callstore/store.go | 41 ++++++++++++++++++++++++++++++++++++ 2 files changed, 47 insertions(+), 25 deletions(-) diff --git a/pkg/alerting/alerting.go b/pkg/alerting/alerting.go index a5ad0aa..72b39e8 100644 --- a/pkg/alerting/alerting.go +++ b/pkg/alerting/alerting.go @@ -11,6 +11,7 @@ import ( "dynatron.me/x/stillbox/pkg/alerting/alert" "dynatron.me/x/stillbox/pkg/calls" + "dynatron.me/x/stillbox/pkg/calls/callstore" "dynatron.me/x/stillbox/pkg/config" "dynatron.me/x/stillbox/pkg/database" "dynatron.me/x/stillbox/pkg/notify" @@ -342,36 +343,16 @@ func (as *alerter) score(now time.Time) { } func (as *alerter) backfill(ctx context.Context, since time.Time, until time.Time) (count int, err error) { - db := database.FromCtx(ctx) - const backfillStatsQuery = `SELECT system, talkgroup, call_date FROM calls WHERE call_date > $1 AND call_date < $2 ORDER BY call_date ASC` - - rows, err := db.DB().Query(ctx, backfillStatsQuery, since, until) - if err != nil { - return count, err - } - defer rows.Close() - as.Lock() defer as.Unlock() - for rows.Next() { - var tg talkgroups.ID - var callDate time.Time - if err := rows.Scan(&tg.System, &tg.Talkgroup, &callDate); err != nil { - return count, err - } - as.scorer.AddEvent(tg, callDate) - if as.sim != nil { // step the simulator if it is active - as.sim.stepClock(callDate) - } - count++ + cs := callstore.FromCtx(ctx) + var stepClock func(time.Time) + if as.sim != nil { + stepClock = as.sim.stepClock } - if err := rows.Err(); err != nil { - return count, err - } - - return count, nil + return cs.BackfillTrending(ctx, &as.scorer, stepClock, since, until) } func (as *alerter) SinkType() string { diff --git a/pkg/calls/callstore/store.go b/pkg/calls/callstore/store.go index d446b52..5c09dbd 100644 --- a/pkg/calls/callstore/store.go +++ b/pkg/calls/callstore/store.go @@ -7,12 +7,14 @@ import ( "dynatron.me/x/stillbox/internal/common" "dynatron.me/x/stillbox/internal/jsontypes" + "dynatron.me/x/stillbox/internal/trending" "dynatron.me/x/stillbox/pkg/calls" "dynatron.me/x/stillbox/pkg/database" "dynatron.me/x/stillbox/pkg/rbac" "dynatron.me/x/stillbox/pkg/rbac/entities" "dynatron.me/x/stillbox/pkg/services" + "dynatron.me/x/stillbox/pkg/talkgroups" "dynatron.me/x/stillbox/pkg/talkgroups/tgstore" "dynatron.me/x/stillbox/pkg/users" @@ -39,6 +41,9 @@ type Store interface { // CallStats gets call stats by interval. CallStats(ctx context.Context, interval calls.StatsInterval, start, end jsontypes.Time) (*calls.Stats, error) + + // BackfillTrending backfills call statistics into a trending scorer. + BackfillTrending(ctx context.Context, scorer *trending.Scorer[talkgroups.ID], stepClock func(time.Time), since, until time.Time) (count int, err error) } type postgresStore struct { @@ -313,3 +318,39 @@ func (s *postgresStore) CallStats(ctx context.Context, interval calls.StatsInter return cs, nil } + +func (s *postgresStore) BackfillTrending(ctx context.Context, scorer *trending.Scorer[talkgroups.ID], stepClock func(time.Time), since, until time.Time) (count int, err error) { + // We can do this through stats grants + _, err = rbac.Check(ctx, &calls.Stats{}, rbac.WithActions(entities.ActionRead)) + if err != nil { + return 0, err + } + + db := database.FromCtx(ctx) + const backfillStatsQuery = `SELECT system, talkgroup, call_date FROM calls WHERE call_date > $1 AND call_date < $2 ORDER BY call_date ASC` + + rows, err := db.DB().Query(ctx, backfillStatsQuery, since, until) + if err != nil { + return count, err + } + defer rows.Close() + + for rows.Next() { + var tg talkgroups.ID + var callDate time.Time + if err := rows.Scan(&tg.System, &tg.Talkgroup, &callDate); err != nil { + return count, err + } + scorer.AddEvent(tg, callDate) + if stepClock != nil { // step the simulator if it is active + stepClock(callDate) + } + count++ + } + + if err := rows.Err(); err != nil { + return count, err + } + + return count, nil +}