Move backfill query into callstore #113
2 changed files with 47 additions and 25 deletions
|
@ -11,6 +11,7 @@ import (
|
|||
|
||||
"dynatron.me/x/stillbox/pkg/alerting/alert"
|
||||
"dynatron.me/x/stillbox/pkg/calls"
|
||||
"dynatron.me/x/stillbox/pkg/calls/callstore"
|
||||
"dynatron.me/x/stillbox/pkg/config"
|
||||
"dynatron.me/x/stillbox/pkg/database"
|
||||
"dynatron.me/x/stillbox/pkg/notify"
|
||||
|
@ -342,36 +343,16 @@ func (as *alerter) score(now time.Time) {
|
|||
}
|
||||
|
||||
func (as *alerter) backfill(ctx context.Context, since time.Time, until time.Time) (count int, err error) {
|
||||
db := database.FromCtx(ctx)
|
||||
const backfillStatsQuery = `SELECT system, talkgroup, call_date FROM calls WHERE call_date > $1 AND call_date < $2 ORDER BY call_date ASC`
|
||||
|
||||
rows, err := db.DB().Query(ctx, backfillStatsQuery, since, until)
|
||||
if err != nil {
|
||||
return count, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
as.Lock()
|
||||
defer as.Unlock()
|
||||
|
||||
for rows.Next() {
|
||||
var tg talkgroups.ID
|
||||
var callDate time.Time
|
||||
if err := rows.Scan(&tg.System, &tg.Talkgroup, &callDate); err != nil {
|
||||
return count, err
|
||||
}
|
||||
as.scorer.AddEvent(tg, callDate)
|
||||
if as.sim != nil { // step the simulator if it is active
|
||||
as.sim.stepClock(callDate)
|
||||
}
|
||||
count++
|
||||
cs := callstore.FromCtx(ctx)
|
||||
var stepClock func(time.Time)
|
||||
if as.sim != nil {
|
||||
stepClock = as.sim.stepClock
|
||||
}
|
||||
|
||||
if err := rows.Err(); err != nil {
|
||||
return count, err
|
||||
}
|
||||
|
||||
return count, nil
|
||||
return cs.BackfillTrending(ctx, &as.scorer, stepClock, since, until)
|
||||
}
|
||||
|
||||
func (as *alerter) SinkType() string {
|
||||
|
|
|
@ -7,12 +7,14 @@ import (
|
|||
|
||||
"dynatron.me/x/stillbox/internal/common"
|
||||
"dynatron.me/x/stillbox/internal/jsontypes"
|
||||
"dynatron.me/x/stillbox/internal/trending"
|
||||
|
||||
"dynatron.me/x/stillbox/pkg/calls"
|
||||
"dynatron.me/x/stillbox/pkg/database"
|
||||
"dynatron.me/x/stillbox/pkg/rbac"
|
||||
"dynatron.me/x/stillbox/pkg/rbac/entities"
|
||||
"dynatron.me/x/stillbox/pkg/services"
|
||||
"dynatron.me/x/stillbox/pkg/talkgroups"
|
||||
"dynatron.me/x/stillbox/pkg/talkgroups/tgstore"
|
||||
"dynatron.me/x/stillbox/pkg/users"
|
||||
|
||||
|
@ -39,6 +41,9 @@ type Store interface {
|
|||
|
||||
// CallStats gets call stats by interval.
|
||||
CallStats(ctx context.Context, interval calls.StatsInterval, start, end jsontypes.Time) (*calls.Stats, error)
|
||||
|
||||
// BackfillTrending backfills call statistics into a trending scorer.
|
||||
BackfillTrending(ctx context.Context, scorer *trending.Scorer[talkgroups.ID], stepClock func(time.Time), since, until time.Time) (count int, err error)
|
||||
}
|
||||
|
||||
type postgresStore struct {
|
||||
|
@ -313,3 +318,39 @@ func (s *postgresStore) CallStats(ctx context.Context, interval calls.StatsInter
|
|||
|
||||
return cs, nil
|
||||
}
|
||||
|
||||
func (s *postgresStore) BackfillTrending(ctx context.Context, scorer *trending.Scorer[talkgroups.ID], stepClock func(time.Time), since, until time.Time) (count int, err error) {
|
||||
// We can do this through stats grants
|
||||
_, err = rbac.Check(ctx, &calls.Stats{}, rbac.WithActions(entities.ActionRead))
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
db := database.FromCtx(ctx)
|
||||
const backfillStatsQuery = `SELECT system, talkgroup, call_date FROM calls WHERE call_date > $1 AND call_date < $2 ORDER BY call_date ASC`
|
||||
|
||||
rows, err := db.DB().Query(ctx, backfillStatsQuery, since, until)
|
||||
if err != nil {
|
||||
return count, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
for rows.Next() {
|
||||
var tg talkgroups.ID
|
||||
var callDate time.Time
|
||||
if err := rows.Scan(&tg.System, &tg.Talkgroup, &callDate); err != nil {
|
||||
return count, err
|
||||
}
|
||||
scorer.AddEvent(tg, callDate)
|
||||
if stepClock != nil { // step the simulator if it is active
|
||||
stepClock(callDate)
|
||||
}
|
||||
count++
|
||||
}
|
||||
|
||||
if err := rows.Err(); err != nil {
|
||||
return count, err
|
||||
}
|
||||
|
||||
return count, nil
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue