Move backfill query into callstore #113

Merged
amigan merged 1 commit from alertingCallStore111 into trunk 2025-02-17 14:39:42 -05:00
2 changed files with 47 additions and 25 deletions

View file

@ -11,6 +11,7 @@ import (
"dynatron.me/x/stillbox/pkg/alerting/alert"
"dynatron.me/x/stillbox/pkg/calls"
"dynatron.me/x/stillbox/pkg/calls/callstore"
"dynatron.me/x/stillbox/pkg/config"
"dynatron.me/x/stillbox/pkg/database"
"dynatron.me/x/stillbox/pkg/notify"
@ -342,36 +343,16 @@ func (as *alerter) score(now time.Time) {
}
func (as *alerter) backfill(ctx context.Context, since time.Time, until time.Time) (count int, err error) {
db := database.FromCtx(ctx)
const backfillStatsQuery = `SELECT system, talkgroup, call_date FROM calls WHERE call_date > $1 AND call_date < $2 ORDER BY call_date ASC`
rows, err := db.DB().Query(ctx, backfillStatsQuery, since, until)
if err != nil {
return count, err
}
defer rows.Close()
as.Lock()
defer as.Unlock()
for rows.Next() {
var tg talkgroups.ID
var callDate time.Time
if err := rows.Scan(&tg.System, &tg.Talkgroup, &callDate); err != nil {
return count, err
}
as.scorer.AddEvent(tg, callDate)
if as.sim != nil { // step the simulator if it is active
as.sim.stepClock(callDate)
}
count++
cs := callstore.FromCtx(ctx)
var stepClock func(time.Time)
if as.sim != nil {
stepClock = as.sim.stepClock
}
if err := rows.Err(); err != nil {
return count, err
}
return count, nil
return cs.BackfillTrending(ctx, &as.scorer, stepClock, since, until)
}
func (as *alerter) SinkType() string {

View file

@ -7,12 +7,14 @@ import (
"dynatron.me/x/stillbox/internal/common"
"dynatron.me/x/stillbox/internal/jsontypes"
"dynatron.me/x/stillbox/internal/trending"
"dynatron.me/x/stillbox/pkg/calls"
"dynatron.me/x/stillbox/pkg/database"
"dynatron.me/x/stillbox/pkg/rbac"
"dynatron.me/x/stillbox/pkg/rbac/entities"
"dynatron.me/x/stillbox/pkg/services"
"dynatron.me/x/stillbox/pkg/talkgroups"
"dynatron.me/x/stillbox/pkg/talkgroups/tgstore"
"dynatron.me/x/stillbox/pkg/users"
@ -39,6 +41,9 @@ type Store interface {
// CallStats gets call stats by interval.
CallStats(ctx context.Context, interval calls.StatsInterval, start, end jsontypes.Time) (*calls.Stats, error)
// BackfillTrending backfills call statistics into a trending scorer.
BackfillTrending(ctx context.Context, scorer *trending.Scorer[talkgroups.ID], stepClock func(time.Time), since, until time.Time) (count int, err error)
}
type postgresStore struct {
@ -313,3 +318,39 @@ func (s *postgresStore) CallStats(ctx context.Context, interval calls.StatsInter
return cs, nil
}
func (s *postgresStore) BackfillTrending(ctx context.Context, scorer *trending.Scorer[talkgroups.ID], stepClock func(time.Time), since, until time.Time) (count int, err error) {
// We can do this through stats grants
_, err = rbac.Check(ctx, &calls.Stats{}, rbac.WithActions(entities.ActionRead))
if err != nil {
return 0, err
}
db := database.FromCtx(ctx)
const backfillStatsQuery = `SELECT system, talkgroup, call_date FROM calls WHERE call_date > $1 AND call_date < $2 ORDER BY call_date ASC`
rows, err := db.DB().Query(ctx, backfillStatsQuery, since, until)
if err != nil {
return count, err
}
defer rows.Close()
for rows.Next() {
var tg talkgroups.ID
var callDate time.Time
if err := rows.Scan(&tg.System, &tg.Talkgroup, &callDate); err != nil {
return count, err
}
scorer.AddEvent(tg, callDate)
if stepClock != nil { // step the simulator if it is active
stepClock(callDate)
}
count++
}
if err := rows.Err(); err != nil {
return count, err
}
return count, nil
}