diff --git a/internal/trending/trending.go b/internal/trending/trending.go index a7278c6..e25c2c6 100644 --- a/internal/trending/trending.go +++ b/internal/trending/trending.go @@ -1,6 +1,7 @@ package trending import ( + "fmt" "sort" "time" @@ -175,6 +176,7 @@ func (s *Scorer[K]) AddEvent(id K, time time.Time) { } func (s *Scorer[K]) addToItem(item *item[K], time time.Time) { + fmt.Println("add", time.String()) item.eventSeries.IncreaseAtTime(1, time) } diff --git a/pkg/gordio/database/calls.sql.go b/pkg/gordio/database/calls.sql.go index 8c0301a..9089bd9 100644 --- a/pkg/gordio/database/calls.sql.go +++ b/pkg/gordio/database/calls.sql.go @@ -7,9 +7,9 @@ package database import ( "context" - "time" "github.com/google/uuid" + "github.com/jackc/pgx/v5/pgtype" ) const addCall = `-- name: AddCall :one @@ -36,22 +36,22 @@ RETURNING id ` type AddCallParams struct { - Submitter *int32 `json:"submitter"` - System int `json:"system"` - Talkgroup int `json:"talkgroup"` - CallDate time.Time `json:"call_date"` - AudioName *string `json:"audio_name"` - AudioBlob []byte `json:"audio_blob"` - AudioType *string `json:"audio_type"` - AudioUrl *string `json:"audio_url"` - Duration *int32 `json:"duration"` - Frequency int `json:"frequency"` - Frequencies []int `json:"frequencies"` - Patches []int `json:"patches"` - TgLabel *string `json:"tg_label"` - TgAlphaTag *string `json:"tg_alpha_tag"` - TgGroup *string `json:"tg_group"` - Source int `json:"source"` + Submitter *int32 `json:"submitter"` + System int `json:"system"` + Talkgroup int `json:"talkgroup"` + CallDate pgtype.Timestamptz `json:"call_date"` + AudioName *string `json:"audio_name"` + AudioBlob []byte `json:"audio_blob"` + AudioType *string `json:"audio_type"` + AudioUrl *string `json:"audio_url"` + Duration *int32 `json:"duration"` + Frequency int `json:"frequency"` + Frequencies []int `json:"frequencies"` + Patches []int `json:"patches"` + TgLabel *string `json:"tg_label"` + TgAlphaTag *string `json:"tg_alpha_tag"` + TgGroup *string `json:"tg_group"` + Source int `json:"source"` } func (q *Queries) AddCall(ctx context.Context, arg AddCallParams) (uuid.UUID, error) { diff --git a/pkg/gordio/database/database.go b/pkg/gordio/database/database.go index 8aee6fd..d2590c1 100644 --- a/pkg/gordio/database/database.go +++ b/pkg/gordio/database/database.go @@ -40,7 +40,12 @@ func NewClient(ctx context.Context, conf config.DB) (*DB, error) { m.Close() - pool, err := pgxpool.New(ctx, conf.Connect) + pgConf, err := pgxpool.ParseConfig(conf.Connect) + if err != nil { + return nil, err + } + + pool, err := pgxpool.NewWithConfig(ctx, pgConf) if err != nil { return nil, err } diff --git a/pkg/gordio/database/models.go b/pkg/gordio/database/models.go index 6683e3a..25df383 100644 --- a/pkg/gordio/database/models.go +++ b/pkg/gordio/database/models.go @@ -21,24 +21,24 @@ type ApiKey struct { } type Call struct { - ID uuid.UUID `json:"id"` - Submitter *int32 `json:"submitter"` - System int `json:"system"` - Talkgroup int `json:"talkgroup"` - CallDate time.Time `json:"call_date"` - AudioName *string `json:"audio_name"` - AudioBlob []byte `json:"audio_blob"` - Duration *int32 `json:"duration"` - AudioType *string `json:"audio_type"` - AudioUrl *string `json:"audio_url"` - Frequency int `json:"frequency"` - Frequencies []int `json:"frequencies"` - Patches []int `json:"patches"` - TgLabel *string `json:"tg_label"` - TgAlphaTag *string `json:"tg_alpha_tag"` - TgGroup *string `json:"tg_group"` - Source int `json:"source"` - Transcript *string `json:"transcript"` + ID uuid.UUID `json:"id"` + Submitter *int32 `json:"submitter"` + System int `json:"system"` + Talkgroup int `json:"talkgroup"` + CallDate pgtype.Timestamptz `json:"call_date"` + AudioName *string `json:"audio_name"` + AudioBlob []byte `json:"audio_blob"` + Duration *int32 `json:"duration"` + AudioType *string `json:"audio_type"` + AudioUrl *string `json:"audio_url"` + Frequency int `json:"frequency"` + Frequencies []int `json:"frequencies"` + Patches []int `json:"patches"` + TgLabel *string `json:"tg_label"` + TgAlphaTag *string `json:"tg_alpha_tag"` + TgGroup *string `json:"tg_group"` + Source int `json:"source"` + Transcript *string `json:"transcript"` } type Incident struct { diff --git a/pkg/gordio/sinks/alerting/alerting.go b/pkg/gordio/sinks/alerting/alerting.go index 73a1527..a29bf09 100644 --- a/pkg/gordio/sinks/alerting/alerting.go +++ b/pkg/gordio/sinks/alerting/alerting.go @@ -16,9 +16,9 @@ import ( ) const ( - StorageLookbackDays = 4 + StorageLookbackDays = 7 HalfLife = 30 * time.Minute - RecentDuration = 12*time.Hour + RecentDuration = 2*time.Hour ScoreThreshold = -1 CountThreshold = 1 ) @@ -57,7 +57,7 @@ func newTimeSeries(id cl.Talkgroup) trending.TimeSeries { ts, _ := timeseries.NewTimeSeries(timeseries.WithGranularities( []timeseries.Granularity{ {Granularity: time.Second, Count: 60}, - {Granularity: time.Minute, Count: 60}, + {Granularity: time.Minute, Count: 10}, {Granularity: time.Hour, Count: 24}, {Granularity: time.Hour * 24, Count: StorageLookbackDays}, }, @@ -67,9 +67,8 @@ func newTimeSeries(id cl.Talkgroup) trending.TimeSeries { func (as *AlertSink) startBackfill(ctx context.Context) { now := time.Now() - cl := &myClock{-18*time.Hour} - timeseries.DefaultClock = cl - since := now.Add(StorageLookbackDays * -24 * 3 * time.Hour) + cl := &myClock{-24*StorageLookbackDays*time.Hour} + since := now.Add(StorageLookbackDays * -24 * time.Hour) log.Debug().Time("since", since).Msg("starting stats backfill") count, err := as.backfill(ctx, since) if err != nil { @@ -77,8 +76,9 @@ func (as *AlertSink) startBackfill(ctx context.Context) { return } log.Debug().Int("count", count).Str("in", time.Now().Sub(now).String()).Int("len", as.scorer.Score().Len()).Msg("backfill finished") + timeseries.DefaultClock = cl for { - fmt.Printf("offs: %s\n", cl.offset.String()) + fmt.Printf("offs: %s (%s)\n", cl.offset.String(), cl.Now().String()) as.printScores(ctx) cl.offset += time.Minute*5 if cl.offset == time.Minute*5 { @@ -92,11 +92,11 @@ func (as *AlertSink) printScores(ctx context.Context) { as.Lock() defer as.Unlock() scores := as.scorer.Score() - fmt.Printf("score len is %d\n", scores.Len()) + //fmt.Printf("score len is %d\n", scores.Len()) //const scoreMult = 1000000000 const scoreMult = 1 for _, s := range scores { - if s.ID.Talkgroup != 1616 && s.ID.Talkgroup != 1617 { + if s.ID.Talkgroup != 1185 { continue } tg, _ := db.GetTalkgroup(ctx, int(s.ID.System), int(s.ID.Talkgroup)) @@ -111,7 +111,7 @@ func (as *AlertSink) printScores(ctx context.Context) { func (as *AlertSink) backfill(ctx context.Context, since time.Time) (count int, err error) { db := database.FromCtx(ctx) - const backfillStatsQuery = `SELECT system, talkgroup, call_date FROM calls WHERE call_date > $1 AND call_date < $2` + const backfillStatsQuery = `SELECT system, talkgroup, call_date FROM calls WHERE call_date > $1 AND call_date < $2 AND talkgroup = 1185 ` rows, err := db.Query(ctx, backfillStatsQuery, since, timeseries.DefaultClock.Now()) if err != nil { diff --git a/pkg/gordio/sinks/database.go b/pkg/gordio/sinks/database.go index ca29298..a2ff46d 100644 --- a/pkg/gordio/sinks/database.go +++ b/pkg/gordio/sinks/database.go @@ -8,6 +8,7 @@ import ( "dynatron.me/x/stillbox/pkg/calls" "dynatron.me/x/stillbox/pkg/gordio/database" + "github.com/jackc/pgx/v5/pgtype" "github.com/rs/zerolog/log" ) @@ -24,6 +25,7 @@ func (s *DatabaseSink) Call(ctx context.Context, call *calls.Call) error { log.Debug().Str("call", call.String()).Msg("received dontStore call") return nil } + dbCall, err := s.db.AddCall(ctx, s.toAddCallParams(call)) if err != nil { return fmt.Errorf("add call: %w", err) @@ -43,7 +45,7 @@ func (s *DatabaseSink) toAddCallParams(call *calls.Call) database.AddCallParams Submitter: call.Submitter.Int32Ptr(), System: call.System, Talkgroup: call.Talkgroup, - CallDate: call.DateTime, + CallDate: pgtype.Timestamptz{Time: call.DateTime, Valid: true}, AudioName: common.PtrOrNull(call.AudioName), AudioBlob: call.Audio, AudioType: common.PtrOrNull(call.AudioType), diff --git a/sql/postgres/migrations/001_initial.up.sql b/sql/postgres/migrations/001_initial.up.sql index 1fb4ef2..6fb7874 100644 --- a/sql/postgres/migrations/001_initial.up.sql +++ b/sql/postgres/migrations/001_initial.up.sql @@ -91,7 +91,7 @@ CREATE TABLE IF NOT EXISTS calls( submitter INTEGER REFERENCES api_keys(id) ON DELETE SET NULL, system INTEGER NOT NULL, talkgroup INTEGER NOT NULL, - call_date TIMESTAMP NOT NULL, + call_date TIMESTAMPTZ NOT NULL, audio_name TEXT, audio_blob BYTEA, duration INTEGER, @@ -107,7 +107,6 @@ CREATE TABLE IF NOT EXISTS calls( transcript TEXT ); - CREATE OR REPLACE TRIGGER learn_tg AFTER INSERT ON calls FOR EACH ROW EXECUTE FUNCTION learn_talkgroup();