Big reorg #23

Merged
amigan merged 4 commits from rest into trunk 2024-11-03 13:46:12 -05:00
18 changed files with 529 additions and 453 deletions

View file

@ -9,8 +9,8 @@ import (
"dynatron.me/x/stillbox/internal/common" "dynatron.me/x/stillbox/internal/common"
"dynatron.me/x/stillbox/internal/version" "dynatron.me/x/stillbox/internal/version"
"dynatron.me/x/stillbox/pkg/cmd/serve"
"dynatron.me/x/stillbox/pkg/cmd/admin" "dynatron.me/x/stillbox/pkg/cmd/admin"
"dynatron.me/x/stillbox/pkg/cmd/serve"
"dynatron.me/x/stillbox/pkg/config" "dynatron.me/x/stillbox/pkg/config"
"github.com/spf13/cobra" "github.com/spf13/cobra"

View file

@ -11,11 +11,12 @@ import (
"text/template" "text/template"
"time" "time"
cl "dynatron.me/x/stillbox/pkg/calls" "dynatron.me/x/stillbox/pkg/calls"
"dynatron.me/x/stillbox/pkg/config" "dynatron.me/x/stillbox/pkg/config"
"dynatron.me/x/stillbox/pkg/database" "dynatron.me/x/stillbox/pkg/database"
"dynatron.me/x/stillbox/pkg/notify" "dynatron.me/x/stillbox/pkg/notify"
"dynatron.me/x/stillbox/pkg/sinks" "dynatron.me/x/stillbox/pkg/sinks"
talkgroups "dynatron.me/x/stillbox/pkg/talkgroups"
"dynatron.me/x/stillbox/internal/timeseries" "dynatron.me/x/stillbox/internal/timeseries"
"dynatron.me/x/stillbox/internal/trending" "dynatron.me/x/stillbox/internal/trending"
@ -46,14 +47,14 @@ type alerter struct {
sync.RWMutex sync.RWMutex
clock timeseries.Clock clock timeseries.Clock
cfg config.Alerting cfg config.Alerting
scorer trending.Scorer[cl.Talkgroup] scorer trending.Scorer[talkgroups.ID]
scores trending.Scores[cl.Talkgroup] scores trending.Scores[talkgroups.ID]
lastScore time.Time lastScore time.Time
sim *Simulation sim *Simulation
alertCache map[cl.Talkgroup]Alert alertCache map[talkgroups.ID]Alert
renotify time.Duration renotify time.Duration
notifier notify.Notifier notifier notify.Notifier
tgCache cl.TalkgroupCache tgCache talkgroups.Store
} }
type offsetClock time.Duration type offsetClock time.Duration
@ -88,14 +89,14 @@ func WithNotifier(n notify.Notifier) AlertOption {
} }
// New creates a new Alerter using the provided configuration. // New creates a new Alerter using the provided configuration.
func New(cfg config.Alerting, tgCache cl.TalkgroupCache, opts ...AlertOption) Alerter { func New(cfg config.Alerting, tgCache talkgroups.Store, opts ...AlertOption) Alerter {
if !cfg.Enable { if !cfg.Enable {
return &noopAlerter{} return &noopAlerter{}
} }
as := &alerter{ as := &alerter{
cfg: cfg, cfg: cfg,
alertCache: make(map[cl.Talkgroup]Alert), alertCache: make(map[talkgroups.ID]Alert),
clock: timeseries.DefaultClock, clock: timeseries.DefaultClock,
renotify: DefaultRenotify, renotify: DefaultRenotify,
tgCache: tgCache, tgCache: tgCache,
@ -111,12 +112,12 @@ func New(cfg config.Alerting, tgCache cl.TalkgroupCache, opts ...AlertOption) Al
as.scorer = trending.NewScorer( as.scorer = trending.NewScorer(
trending.WithTimeSeries(as.newTimeSeries), trending.WithTimeSeries(as.newTimeSeries),
trending.WithStorageDuration[cl.Talkgroup](time.Hour*24*time.Duration(cfg.LookbackDays)), trending.WithStorageDuration[talkgroups.ID](time.Hour*24*time.Duration(cfg.LookbackDays)),
trending.WithRecentDuration[cl.Talkgroup](time.Duration(cfg.Recent)), trending.WithRecentDuration[talkgroups.ID](time.Duration(cfg.Recent)),
trending.WithHalfLife[cl.Talkgroup](time.Duration(cfg.HalfLife)), trending.WithHalfLife[talkgroups.ID](time.Duration(cfg.HalfLife)),
trending.WithScoreThreshold[cl.Talkgroup](ScoreThreshold), trending.WithScoreThreshold[talkgroups.ID](ScoreThreshold),
trending.WithCountThreshold[cl.Talkgroup](CountThreshold), trending.WithCountThreshold[talkgroups.ID](CountThreshold),
trending.WithClock[cl.Talkgroup](as.clock), trending.WithClock[talkgroups.ID](as.clock),
) )
return as return as
@ -167,12 +168,12 @@ func (as *alerter) eval(ctx context.Context, now time.Time, testMode bool) ([]Al
var notifications []Alert var notifications []Alert
for _, s := range as.scores { for _, s := range as.scores {
origScore := s.Score origScore := s.Score
tgr, has := as.tgCache.TG(ctx, s.ID) tgr, err := as.tgCache.TG(ctx, s.ID)
if has { if err == nil {
if !tgr.Alert { if !tgr.Talkgroup.Alert {
continue continue
} }
s.Score *= float64(tgr.Weight) s.Score *= float64(tgr.Talkgroup.Weight)
} }
if s.Score > as.cfg.AlertThreshold || testMode { if s.Score > as.cfg.AlertThreshold || testMode {
@ -231,8 +232,8 @@ func (as *alerter) testNotifyHandler(w http.ResponseWriter, r *http.Request) {
} }
// scoredTGs gets a list of TGs. // scoredTGs gets a list of TGs.
func (as *alerter) scoredTGs() []cl.Talkgroup { func (as *alerter) scoredTGs() []talkgroups.ID {
tgs := make([]cl.Talkgroup, 0, len(as.scores)) tgs := make([]talkgroups.ID, 0, len(as.scores))
for _, s := range as.scores { for _, s := range as.scores {
tgs = append(tgs, s.ID) tgs = append(tgs, s.ID)
} }
@ -275,7 +276,7 @@ type Alert struct {
ID uuid.UUID ID uuid.UUID
Timestamp time.Time Timestamp time.Time
TGName string TGName string
Score trending.Score[cl.Talkgroup] Score trending.Score[talkgroups.ID]
OrigScore float64 OrigScore float64
Weight float32 Weight float32
Suppressed bool Suppressed bool
@ -317,7 +318,7 @@ func (as *alerter) sendNotification(ctx context.Context, n []Alert) error {
// makeAlert creates a notification for later rendering by the template. // makeAlert creates a notification for later rendering by the template.
// It takes a talkgroup Score as input. // It takes a talkgroup Score as input.
func (as *alerter) makeAlert(ctx context.Context, score trending.Score[cl.Talkgroup], origScore float64) (Alert, error) { func (as *alerter) makeAlert(ctx context.Context, score trending.Score[talkgroups.ID], origScore float64) (Alert, error) {
d := Alert{ d := Alert{
ID: uuid.New(), ID: uuid.New(),
Score: score, Score: score,
@ -326,20 +327,20 @@ func (as *alerter) makeAlert(ctx context.Context, score trending.Score[cl.Talkgr
OrigScore: origScore, OrigScore: origScore,
} }
tgRecord, has := as.tgCache.TG(ctx, score.ID) tgRecord, err := as.tgCache.TG(ctx, score.ID)
switch has { switch err {
case true: case nil:
d.Weight = tgRecord.Weight d.Weight = tgRecord.Talkgroup.Weight
if tgRecord.SystemName == "" { if tgRecord.System.Name == "" {
tgRecord.SystemName = strconv.Itoa(int(score.ID.System)) tgRecord.System.Name = strconv.Itoa(int(score.ID.System))
} }
if tgRecord.Name != nil { if tgRecord.Talkgroup.Name != nil {
d.TGName = fmt.Sprintf("%s %s (%d)", tgRecord.SystemName, *tgRecord.Name, score.ID.Talkgroup) d.TGName = fmt.Sprintf("%s %s (%d)", tgRecord.System.Name, *tgRecord.Talkgroup.Name, score.ID.Talkgroup)
} else { } else {
d.TGName = fmt.Sprintf("%s:%d", tgRecord.SystemName, int(score.ID.Talkgroup)) d.TGName = fmt.Sprintf("%s:%d", tgRecord.System.Name, int(score.ID.Talkgroup))
} }
case false: default:
system, has := as.tgCache.SystemName(ctx, int(score.ID.System)) system, has := as.tgCache.SystemName(ctx, int(score.ID.System))
if has { if has {
d.TGName = fmt.Sprintf("%s:%d", system, int(score.ID.Talkgroup)) d.TGName = fmt.Sprintf("%s:%d", system, int(score.ID.Talkgroup))
@ -369,7 +370,7 @@ func (as *alerter) cleanCache() {
} }
} }
func (as *alerter) newTimeSeries(id cl.Talkgroup) trending.TimeSeries { func (as *alerter) newTimeSeries(id talkgroups.ID) trending.TimeSeries {
ts, _ := timeseries.NewTimeSeries(timeseries.WithGranularities( ts, _ := timeseries.NewTimeSeries(timeseries.WithGranularities(
[]timeseries.Granularity{ []timeseries.Granularity{
{Granularity: time.Second, Count: 60}, {Granularity: time.Second, Count: 60},
@ -417,7 +418,7 @@ func (as *alerter) backfill(ctx context.Context, since time.Time, until time.Tim
defer as.Unlock() defer as.Unlock()
for rows.Next() { for rows.Next() {
var tg cl.Talkgroup var tg talkgroups.ID
var callDate time.Time var callDate time.Time
if err := rows.Scan(&tg.System, &tg.Talkgroup, &callDate); err != nil { if err := rows.Scan(&tg.System, &tg.Talkgroup, &callDate); err != nil {
return count, err return count, err
@ -440,7 +441,7 @@ func (as *alerter) SinkType() string {
return "alerting" return "alerting"
} }
func (as *alerter) Call(ctx context.Context, call *cl.Call) error { func (as *alerter) Call(ctx context.Context, call *calls.Call) error {
as.Lock() as.Lock()
defer as.Unlock() defer as.Unlock()
as.scorer.AddEvent(call.TalkgroupTuple(), call.DateTime) as.scorer.AddEvent(call.TalkgroupTuple(), call.DateTime)
@ -453,7 +454,7 @@ func (*alerter) Enabled() bool { return true }
// noopAlerter is used when alerting is disabled. // noopAlerter is used when alerting is disabled.
type noopAlerter struct{} type noopAlerter struct{}
func (*noopAlerter) SinkType() string { return "noopAlerter" } func (*noopAlerter) SinkType() string { return "noopAlerter" }
func (*noopAlerter) Call(_ context.Context, _ *cl.Call) error { return nil } func (*noopAlerter) Call(_ context.Context, _ *calls.Call) error { return nil }
func (*noopAlerter) Go(_ context.Context) {} func (*noopAlerter) Go(_ context.Context) {}
func (*noopAlerter) Enabled() bool { return false } func (*noopAlerter) Enabled() bool { return false }

View file

@ -12,8 +12,8 @@ import (
"dynatron.me/x/stillbox/internal/forms" "dynatron.me/x/stillbox/internal/forms"
"dynatron.me/x/stillbox/internal/jsontime" "dynatron.me/x/stillbox/internal/jsontime"
"dynatron.me/x/stillbox/internal/trending" "dynatron.me/x/stillbox/internal/trending"
cl "dynatron.me/x/stillbox/pkg/calls"
"dynatron.me/x/stillbox/pkg/config" "dynatron.me/x/stillbox/pkg/config"
"dynatron.me/x/stillbox/pkg/talkgroups"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
) )
@ -58,9 +58,9 @@ func (s *Simulation) stepClock(t time.Time) {
} }
// Simulate begins the simulation using the DB handle from ctx. It returns final scores. // Simulate begins the simulation using the DB handle from ctx. It returns final scores.
func (s *Simulation) Simulate(ctx context.Context) (trending.Scores[cl.Talkgroup], error) { func (s *Simulation) Simulate(ctx context.Context) (trending.Scores[talkgroups.ID], error) {
now := time.Now() now := time.Now()
tgc := cl.NewTalkgroupCache() tgc := talkgroups.NewCache()
s.Enable = true s.Enable = true
s.alerter = New(s.Alerting, tgc, WithClock(&s.clock)).(*alerter) s.alerter = New(s.Alerting, tgc, WithClock(&s.clock)).(*alerter)

View file

@ -7,9 +7,9 @@ import (
"net/http" "net/http"
"time" "time"
"dynatron.me/x/stillbox/pkg/calls"
"dynatron.me/x/stillbox/pkg/config" "dynatron.me/x/stillbox/pkg/config"
"dynatron.me/x/stillbox/pkg/database" "dynatron.me/x/stillbox/pkg/database"
"dynatron.me/x/stillbox/pkg/talkgroups"
"dynatron.me/x/stillbox/internal/common" "dynatron.me/x/stillbox/internal/common"
"dynatron.me/x/stillbox/internal/jsontime" "dynatron.me/x/stillbox/internal/jsontime"
@ -76,14 +76,14 @@ func (as *alerter) tgStatsHandler(w http.ResponseWriter, r *http.Request) {
return return
} }
tgMap := make(map[calls.Talkgroup]database.GetTalkgroupsByPackedIDsRow, len(tgs)) tgMap := make(map[talkgroups.ID]database.GetTalkgroupsByPackedIDsRow, len(tgs))
for _, t := range tgs { for _, t := range tgs {
tgMap[calls.Talkgroup{System: uint32(t.SystemID), Talkgroup: uint32(t.ID)}] = t tgMap[talkgroups.ID{System: uint32(t.System.ID), Talkgroup: uint32(t.Talkgroup.ID)}] = t
} }
renderData := struct { renderData := struct {
TGs map[calls.Talkgroup]database.GetTalkgroupsByPackedIDsRow TGs map[talkgroups.ID]database.GetTalkgroupsByPackedIDsRow
Scores trending.Scores[calls.Talkgroup] Scores trending.Scores[talkgroups.ID]
LastScore time.Time LastScore time.Time
Simulation *Simulation Simulation *Simulation
Config config.Alerting Config config.Alerting

View file

@ -7,6 +7,7 @@ import (
"dynatron.me/x/stillbox/internal/audio" "dynatron.me/x/stillbox/internal/audio"
"dynatron.me/x/stillbox/pkg/auth" "dynatron.me/x/stillbox/pkg/auth"
"dynatron.me/x/stillbox/pkg/pb" "dynatron.me/x/stillbox/pkg/pb"
"dynatron.me/x/stillbox/pkg/talkgroups"
"google.golang.org/protobuf/types/known/timestamppb" "google.golang.org/protobuf/types/known/timestamppb"
) )
@ -128,3 +129,7 @@ func (c *Call) computeLength() (err error) {
return nil return nil
} }
func (c *Call) TalkgroupTuple() talkgroups.ID {
return talkgroups.TG(c.System, c.Talkgroup)
}

View file

@ -5,16 +5,18 @@ import (
"dynatron.me/x/stillbox/pkg/database" "dynatron.me/x/stillbox/pkg/database"
"dynatron.me/x/stillbox/pkg/pb" "dynatron.me/x/stillbox/pkg/pb"
tgs "dynatron.me/x/stillbox/pkg/talkgroups"
) )
type TalkgroupFilter struct { type TalkgroupFilter struct {
Talkgroups []Talkgroup `json:"talkgroups,omitempty"` Talkgroups []tgs.ID `json:"talkgroups,omitempty"`
TalkgroupsNot []Talkgroup `json:"talkgroupsNot,omitempty"` TalkgroupsNot []tgs.ID `json:"talkgroupsNot,omitempty"`
TalkgroupTagsAll []string `json:"talkgroupTagsAll,omitempty"` TalkgroupTagsAll []string `json:"talkgroupTagsAll,omitempty"`
TalkgroupTagsAny []string `json:"talkgroupTagsAny,omitempty"` TalkgroupTagsAny []string `json:"talkgroupTagsAny,omitempty"`
TalkgroupTagsNot []string `json:"talkgroupTagsNot,omitempty"` TalkgroupTagsNot []string `json:"talkgroupTagsNot,omitempty"`
talkgroups map[Talkgroup]bool talkgroups map[tgs.ID]bool
} }
func TalkgroupFilterFromPB(ctx context.Context, p *pb.Filter) (*TalkgroupFilter, error) { func TalkgroupFilterFromPB(ctx context.Context, p *pb.Filter) (*TalkgroupFilter, error) {
@ -25,9 +27,9 @@ func TalkgroupFilterFromPB(ctx context.Context, p *pb.Filter) (*TalkgroupFilter,
} }
if l := len(p.Talkgroups); l > 0 { if l := len(p.Talkgroups); l > 0 {
tgf.Talkgroups = make([]Talkgroup, l) tgf.Talkgroups = make([]tgs.ID, l)
for i, t := range p.Talkgroups { for i, t := range p.Talkgroups {
tgf.Talkgroups[i] = Talkgroup{ tgf.Talkgroups[i] = tgs.ID{
System: uint32(t.System), System: uint32(t.System),
Talkgroup: uint32(t.Talkgroup), Talkgroup: uint32(t.Talkgroup),
} }
@ -35,9 +37,9 @@ func TalkgroupFilterFromPB(ctx context.Context, p *pb.Filter) (*TalkgroupFilter,
} }
if l := len(p.TalkgroupsNot); l > 0 { if l := len(p.TalkgroupsNot); l > 0 {
tgf.TalkgroupsNot = make([]Talkgroup, l) tgf.TalkgroupsNot = make([]tgs.ID, l)
for i, t := range p.TalkgroupsNot { for i, t := range p.TalkgroupsNot {
tgf.TalkgroupsNot[i] = Talkgroup{ tgf.TalkgroupsNot[i] = tgs.ID{
System: uint32(t.System), System: uint32(t.System),
Talkgroup: uint32(t.Talkgroup), Talkgroup: uint32(t.Talkgroup),
} }
@ -51,12 +53,12 @@ func (f *TalkgroupFilter) hasTags() bool {
return len(f.TalkgroupTagsAny) > 0 || len(f.TalkgroupTagsAll) > 0 || len(f.TalkgroupTagsNot) > 0 return len(f.TalkgroupTagsAny) > 0 || len(f.TalkgroupTagsAll) > 0 || len(f.TalkgroupTagsNot) > 0
} }
func (f *TalkgroupFilter) GetFinalTalkgroups() map[Talkgroup]bool { func (f *TalkgroupFilter) GetFinalTalkgroups() map[tgs.ID]bool {
return f.talkgroups return f.talkgroups
} }
func (f *TalkgroupFilter) compile(ctx context.Context) error { func (f *TalkgroupFilter) compile(ctx context.Context) error {
f.talkgroups = make(map[Talkgroup]bool) f.talkgroups = make(map[tgs.ID]bool)
for _, tg := range f.Talkgroups { for _, tg := range f.Talkgroups {
f.talkgroups[tg] = true f.talkgroups[tg] = true
} }
@ -69,7 +71,7 @@ func (f *TalkgroupFilter) compile(ctx context.Context) error {
} }
for _, tg := range tagTGs { for _, tg := range tagTGs {
f.talkgroups[Talkgroup{System: uint32(tg.SystemID), Talkgroup: uint32(tg.Tgid)}] = true f.talkgroups[tgs.ID{System: uint32(tg.SystemID), Talkgroup: uint32(tg.Tgid)}] = true
} }
} }

View file

@ -1,191 +0,0 @@
package calls
import (
"context"
"fmt"
"sync"
"time"
"dynatron.me/x/stillbox/pkg/database"
"dynatron.me/x/stillbox/internal/ruletime"
"dynatron.me/x/stillbox/internal/trending"
"github.com/jackc/pgx/v5"
"github.com/rs/zerolog/log"
)
type Talkgroup struct {
System uint32
Talkgroup uint32
}
func (c *Call) TalkgroupTuple() Talkgroup {
return Talkgroup{System: uint32(c.System), Talkgroup: uint32(c.Talkgroup)}
}
func TG[T int | uint | int64 | uint64 | int32 | uint32](sys, tgid T) Talkgroup {
return Talkgroup{
System: uint32(sys),
Talkgroup: uint32(tgid),
}
}
func (t Talkgroup) Pack() int64 {
// P25 system IDs are 12 bits, so we can fit them in a signed 8 byte int (int64, pg INT8)
return int64((int64(t.System) << 32) | int64(t.Talkgroup))
}
func (t Talkgroup) String() string {
return fmt.Sprintf("%d:%d", t.System, t.Talkgroup)
}
func PackedTGs(tg []Talkgroup) []int64 {
s := make([]int64, len(tg))
for i, v := range tg {
s[i] = v.Pack()
}
return s
}
type tgMap map[Talkgroup]database.GetTalkgroupWithLearnedByPackedIDsRow
type TalkgroupCache interface {
TG(ctx context.Context, tg Talkgroup) (database.GetTalkgroupWithLearnedByPackedIDsRow, bool)
SystemName(ctx context.Context, id int) (string, bool)
ApplyAlertRules(score trending.Score[Talkgroup], t time.Time, coversOpts ...ruletime.CoversOption) float64
Hint(ctx context.Context, tgs []Talkgroup) error
Load(ctx context.Context, tgs []int64) error
Invalidate()
}
func (t *talkgroupCache) Invalidate() {
t.Lock()
defer t.Unlock()
clear(t.tgs)
clear(t.systems)
clear(t.AlertConfig)
}
type talkgroupCache struct {
sync.RWMutex
AlertConfig
tgs tgMap
systems map[int32]string
}
func NewTalkgroupCache() TalkgroupCache {
tgc := &talkgroupCache{
tgs: make(tgMap),
systems: make(map[int32]string),
AlertConfig: make(AlertConfig),
}
return tgc
}
func (t *talkgroupCache) Hint(ctx context.Context, tgs []Talkgroup) error {
t.RLock()
var toLoad []int64
if len(t.tgs) > len(tgs)/2 { // TODO: instrument this
for _, tg := range tgs {
_, ok := t.tgs[tg]
if !ok {
toLoad = append(toLoad, tg.Pack())
}
}
} else {
toLoad = make([]int64, 0, len(tgs))
for _, g := range tgs {
toLoad = append(toLoad, g.Pack())
}
}
if len(toLoad) > 0 {
t.RUnlock()
return t.Load(ctx, toLoad)
}
t.RUnlock()
return nil
}
func (t *talkgroupCache) add(rec database.GetTalkgroupWithLearnedByPackedIDsRow) error {
tg := TG(rec.SystemID, rec.Tgid)
t.tgs[tg] = rec
t.systems[rec.SystemID] = rec.SystemName
return t.AlertConfig.AddAlertConfig(tg, rec.AlertConfig)
}
func (t *talkgroupCache) Load(ctx context.Context, tgs []int64) error {
tgRecords, err := database.FromCtx(ctx).GetTalkgroupWithLearnedByPackedIDs(ctx, tgs)
if err != nil {
return err
}
t.Lock()
defer t.Unlock()
for _, rec := range tgRecords {
err := t.add(rec)
if err != nil {
log.Error().Err(err).Msg("add alert config fail")
}
}
return nil
}
func (t *talkgroupCache) TG(ctx context.Context, tg Talkgroup) (database.GetTalkgroupWithLearnedByPackedIDsRow, bool) {
t.RLock()
rec, has := t.tgs[tg]
t.RUnlock()
if has {
return rec, has
}
recs, err := database.FromCtx(ctx).GetTalkgroupWithLearnedByPackedIDs(ctx, []int64{tg.Pack()})
switch err {
case nil:
case pgx.ErrNoRows:
return database.GetTalkgroupWithLearnedByPackedIDsRow{}, false
default:
log.Error().Err(err).Msg("TG() cache add db get")
return database.GetTalkgroupWithLearnedByPackedIDsRow{}, false
}
if len(recs) < 1 {
return database.GetTalkgroupWithLearnedByPackedIDsRow{}, false
}
t.Lock()
defer t.Unlock()
err = t.add(recs[0])
if err != nil {
log.Error().Err(err).Msg("TG() cache add")
return recs[0], false
}
return recs[0], true
}
func (t *talkgroupCache) SystemName(ctx context.Context, id int) (name string, has bool) {
n, has := t.systems[int32(id)]
if !has {
sys, err := database.FromCtx(ctx).GetSystemName(ctx, id)
if err != nil {
return "", false
}
return sys, true
}
return n, has
}

View file

@ -22,14 +22,14 @@ type Querier interface {
GetAPIKey(ctx context.Context, apiKey string) (ApiKey, error) GetAPIKey(ctx context.Context, apiKey string) (ApiKey, error)
GetDatabaseSize(ctx context.Context) (string, error) GetDatabaseSize(ctx context.Context) (string, error)
GetSystemName(ctx context.Context, systemID int) (string, error) GetSystemName(ctx context.Context, systemID int) (string, error)
GetTalkgroup(ctx context.Context, systemID int, tgid int) (Talkgroup, error) GetTalkgroup(ctx context.Context, systemID int, tgid int) (GetTalkgroupRow, error)
GetTalkgroupIDsByTags(ctx context.Context, anytags []string, alltags []string, nottags []string) ([]GetTalkgroupIDsByTagsRow, error) GetTalkgroupIDsByTags(ctx context.Context, anytags []string, alltags []string, nottags []string) ([]GetTalkgroupIDsByTagsRow, error)
GetTalkgroupTags(ctx context.Context, sys int, tg int) ([]string, error) GetTalkgroupTags(ctx context.Context, sys int, tg int) ([]string, error)
GetTalkgroupWithLearned(ctx context.Context, systemID int, tgid int) (GetTalkgroupWithLearnedRow, error) GetTalkgroupWithLearned(ctx context.Context, systemID int, tgid int) (GetTalkgroupWithLearnedRow, error)
GetTalkgroupWithLearnedByPackedIDs(ctx context.Context, dollar_1 []int64) ([]GetTalkgroupWithLearnedByPackedIDsRow, error) GetTalkgroupWithLearnedByPackedIDs(ctx context.Context, dollar_1 []int64) ([]GetTalkgroupWithLearnedByPackedIDsRow, error)
GetTalkgroupsByPackedIDs(ctx context.Context, dollar_1 []int64) ([]GetTalkgroupsByPackedIDsRow, error) GetTalkgroupsByPackedIDs(ctx context.Context, dollar_1 []int64) ([]GetTalkgroupsByPackedIDsRow, error)
GetTalkgroupsWithAllTags(ctx context.Context, tags []string) ([]Talkgroup, error) GetTalkgroupsWithAllTags(ctx context.Context, tags []string) ([]GetTalkgroupsWithAllTagsRow, error)
GetTalkgroupsWithAnyTags(ctx context.Context, tags []string) ([]Talkgroup, error) GetTalkgroupsWithAnyTags(ctx context.Context, tags []string) ([]GetTalkgroupsWithAnyTagsRow, error)
GetUserByID(ctx context.Context, id int32) (User, error) GetUserByID(ctx context.Context, id int32) (User, error)
GetUserByUID(ctx context.Context, id int32) (User, error) GetUserByUID(ctx context.Context, id int32) (User, error)
GetUserByUsername(ctx context.Context, username string) (User, error) GetUserByUsername(ctx context.Context, username string) (User, error)

View file

@ -31,26 +31,30 @@ func (q *Queries) GetSystemName(ctx context.Context, systemID int) (string, erro
} }
const getTalkgroup = `-- name: GetTalkgroup :one const getTalkgroup = `-- name: GetTalkgroup :one
SELECT id, system_id, tgid, name, alpha_tag, tg_group, frequency, metadata, tags, alert, alert_config, weight FROM talkgroups SELECT talkgroups.id, talkgroups.system_id, talkgroups.tgid, talkgroups.name, talkgroups.alpha_tag, talkgroups.tg_group, talkgroups.frequency, talkgroups.metadata, talkgroups.tags, talkgroups.alert, talkgroups.alert_config, talkgroups.weight FROM talkgroups
WHERE id = systg2id($1, $2) WHERE id = systg2id($1, $2)
` `
func (q *Queries) GetTalkgroup(ctx context.Context, systemID int, tgid int) (Talkgroup, error) { type GetTalkgroupRow struct {
Talkgroup Talkgroup `json:"talkgroup"`
}
func (q *Queries) GetTalkgroup(ctx context.Context, systemID int, tgid int) (GetTalkgroupRow, error) {
row := q.db.QueryRow(ctx, getTalkgroup, systemID, tgid) row := q.db.QueryRow(ctx, getTalkgroup, systemID, tgid)
var i Talkgroup var i GetTalkgroupRow
err := row.Scan( err := row.Scan(
&i.ID, &i.Talkgroup.ID,
&i.SystemID, &i.Talkgroup.SystemID,
&i.Tgid, &i.Talkgroup.Tgid,
&i.Name, &i.Talkgroup.Name,
&i.AlphaTag, &i.Talkgroup.AlphaTag,
&i.TgGroup, &i.Talkgroup.TgGroup,
&i.Frequency, &i.Talkgroup.Frequency,
&i.Metadata, &i.Talkgroup.Metadata,
&i.Tags, &i.Talkgroup.Tags,
&i.Alert, &i.Talkgroup.Alert,
&i.AlertConfig, &i.Talkgroup.AlertConfig,
&i.Weight, &i.Talkgroup.Weight,
) )
return i, err return i, err
} }
@ -101,19 +105,17 @@ func (q *Queries) GetTalkgroupTags(ctx context.Context, sys int, tg int) ([]stri
const getTalkgroupWithLearned = `-- name: GetTalkgroupWithLearned :one const getTalkgroupWithLearned = `-- name: GetTalkgroupWithLearned :one
SELECT SELECT
tg.id, tg.system_id, sys.name system_name, tg.tgid, tg.name, tg.id, tg.system_id, tg.tgid, tg.name, tg.alpha_tag, tg.tg_group, tg.frequency, tg.metadata, tg.tags, tg.alert, tg.alert_config, tg.weight, sys.id, sys.name,
tg.tg_group, tg.frequency, tg.metadata, tg.tags, tg.alpha_tag,
tg.alert, tg.weight, tg.alert_config,
FALSE learned FALSE learned
FROM talkgroups tg FROM talkgroups tg
JOIN systems sys ON tg.system_id = sys.id JOIN systems sys ON tg.system_id = sys.id
WHERE tg.id = systg2id($1, $2) WHERE tg.id = systg2id($1, $2)
UNION UNION
SELECT SELECT
tgl.id::INT8, tgl.system_id::INT4, sys.name system_name, tgl.tgid::INT4, tgl.name, tgl.id::INT8, tgl.system_id::INT4, tgl.tgid::INT4, tgl.name,
tgl.alpha_tag, NULL::INTEGER, NULL::JSONB, tgl.alpha_tag, tgl.alpha_tag, NULL::INTEGER, NULL::JSONB,
CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END, tgl.alpha_tag, CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END,
TRUE, 1.0, NULL::JSONB, TRUE, NULL::JSONB, 1.0, sys.id, sys.name,
TRUE learned TRUE learned
FROM talkgroups_learned tgl FROM talkgroups_learned tgl
JOIN systems sys ON tgl.system_id = sys.id JOIN systems sys ON tgl.system_id = sys.id
@ -121,39 +123,29 @@ WHERE tgl.system_id = $1 AND tgl.tgid = $2 AND ignored IS NOT TRUE
` `
type GetTalkgroupWithLearnedRow struct { type GetTalkgroupWithLearnedRow struct {
ID int64 `json:"id"` Talkgroup Talkgroup `json:"talkgroup"`
SystemID int32 `json:"system_id"` System System `json:"system"`
SystemName string `json:"system_name"` Learned bool `json:"learned"`
Tgid int32 `json:"tgid"`
Name *string `json:"name"`
TgGroup *string `json:"tg_group"`
Frequency *int32 `json:"frequency"`
Metadata []byte `json:"metadata"`
Tags []string `json:"tags"`
AlphaTag *string `json:"alpha_tag"`
Alert bool `json:"alert"`
Weight float32 `json:"weight"`
AlertConfig []byte `json:"alert_config"`
Learned bool `json:"learned"`
} }
func (q *Queries) GetTalkgroupWithLearned(ctx context.Context, systemID int, tgid int) (GetTalkgroupWithLearnedRow, error) { func (q *Queries) GetTalkgroupWithLearned(ctx context.Context, systemID int, tgid int) (GetTalkgroupWithLearnedRow, error) {
row := q.db.QueryRow(ctx, getTalkgroupWithLearned, systemID, tgid) row := q.db.QueryRow(ctx, getTalkgroupWithLearned, systemID, tgid)
var i GetTalkgroupWithLearnedRow var i GetTalkgroupWithLearnedRow
err := row.Scan( err := row.Scan(
&i.ID, &i.Talkgroup.ID,
&i.SystemID, &i.Talkgroup.SystemID,
&i.SystemName, &i.Talkgroup.Tgid,
&i.Tgid, &i.Talkgroup.Name,
&i.Name, &i.Talkgroup.AlphaTag,
&i.TgGroup, &i.Talkgroup.TgGroup,
&i.Frequency, &i.Talkgroup.Frequency,
&i.Metadata, &i.Talkgroup.Metadata,
&i.Tags, &i.Talkgroup.Tags,
&i.AlphaTag, &i.Talkgroup.Alert,
&i.Alert, &i.Talkgroup.AlertConfig,
&i.Weight, &i.Talkgroup.Weight,
&i.AlertConfig, &i.System.ID,
&i.System.Name,
&i.Learned, &i.Learned,
) )
return i, err return i, err
@ -161,19 +153,17 @@ func (q *Queries) GetTalkgroupWithLearned(ctx context.Context, systemID int, tgi
const getTalkgroupWithLearnedByPackedIDs = `-- name: GetTalkgroupWithLearnedByPackedIDs :many const getTalkgroupWithLearnedByPackedIDs = `-- name: GetTalkgroupWithLearnedByPackedIDs :many
SELECT SELECT
tg.id, tg.system_id, sys.name system_name, tg.tgid, tg.name, tg.id, tg.system_id, tg.tgid, tg.name, tg.alpha_tag, tg.tg_group, tg.frequency, tg.metadata, tg.tags, tg.alert, tg.alert_config, tg.weight, sys.id, sys.name,
tg.tg_group, tg.frequency, tg.metadata, tg.tags, tg.alpha_tag,
tg.alert, tg.weight, tg.alert_config,
FALSE learned FALSE learned
FROM talkgroups tg FROM talkgroups tg
JOIN systems sys ON tg.system_id = sys.id JOIN systems sys ON tg.system_id = sys.id
WHERE tg.id = ANY($1::INT8[]) WHERE tg.id = ANY($1::INT8[])
UNION UNION
SELECT SELECT
tgl.id::INT8, tgl.system_id::INT4, sys.name system_name, tgl.tgid::INT4, tgl.name, tgl.id::INT8, tgl.system_id::INT4, tgl.tgid::INT4, tgl.name,
tgl.alpha_tag, NULL::INTEGER, NULL::JSONB, tgl.alpha_tag, tgl.alpha_tag, NULL::INTEGER, NULL::JSONB,
CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END, tgl.alpha_tag, CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END,
TRUE, 1.0, NULL::JSONB, TRUE, NULL::JSONB, 1.0, sys.id, sys.name,
TRUE learned TRUE learned
FROM talkgroups_learned tgl FROM talkgroups_learned tgl
JOIN systems sys ON tgl.system_id = sys.id JOIN systems sys ON tgl.system_id = sys.id
@ -181,20 +171,9 @@ WHERE systg2id(tgl.system_id, tgl.tgid) = ANY($1::INT8[]) AND ignored IS NOT TRU
` `
type GetTalkgroupWithLearnedByPackedIDsRow struct { type GetTalkgroupWithLearnedByPackedIDsRow struct {
ID int64 `json:"id"` Talkgroup Talkgroup `json:"talkgroup"`
SystemID int32 `json:"system_id"` System System `json:"system"`
SystemName string `json:"system_name"` Learned bool `json:"learned"`
Tgid int32 `json:"tgid"`
Name *string `json:"name"`
TgGroup *string `json:"tg_group"`
Frequency *int32 `json:"frequency"`
Metadata []byte `json:"metadata"`
Tags []string `json:"tags"`
AlphaTag *string `json:"alpha_tag"`
Alert bool `json:"alert"`
Weight float32 `json:"weight"`
AlertConfig []byte `json:"alert_config"`
Learned bool `json:"learned"`
} }
func (q *Queries) GetTalkgroupWithLearnedByPackedIDs(ctx context.Context, dollar_1 []int64) ([]GetTalkgroupWithLearnedByPackedIDsRow, error) { func (q *Queries) GetTalkgroupWithLearnedByPackedIDs(ctx context.Context, dollar_1 []int64) ([]GetTalkgroupWithLearnedByPackedIDsRow, error) {
@ -207,19 +186,20 @@ func (q *Queries) GetTalkgroupWithLearnedByPackedIDs(ctx context.Context, dollar
for rows.Next() { for rows.Next() {
var i GetTalkgroupWithLearnedByPackedIDsRow var i GetTalkgroupWithLearnedByPackedIDsRow
if err := rows.Scan( if err := rows.Scan(
&i.ID, &i.Talkgroup.ID,
&i.SystemID, &i.Talkgroup.SystemID,
&i.SystemName, &i.Talkgroup.Tgid,
&i.Tgid, &i.Talkgroup.Name,
&i.Name, &i.Talkgroup.AlphaTag,
&i.TgGroup, &i.Talkgroup.TgGroup,
&i.Frequency, &i.Talkgroup.Frequency,
&i.Metadata, &i.Talkgroup.Metadata,
&i.Tags, &i.Talkgroup.Tags,
&i.AlphaTag, &i.Talkgroup.Alert,
&i.Alert, &i.Talkgroup.AlertConfig,
&i.Weight, &i.Talkgroup.Weight,
&i.AlertConfig, &i.System.ID,
&i.System.Name,
&i.Learned, &i.Learned,
); err != nil { ); err != nil {
return nil, err return nil, err
@ -233,26 +213,14 @@ func (q *Queries) GetTalkgroupWithLearnedByPackedIDs(ctx context.Context, dollar
} }
const getTalkgroupsByPackedIDs = `-- name: GetTalkgroupsByPackedIDs :many const getTalkgroupsByPackedIDs = `-- name: GetTalkgroupsByPackedIDs :many
SELECT tg.id, system_id, tgid, tg.name, alpha_tag, tg_group, frequency, metadata, tags, alert, alert_config, weight, sys.id, sys.name FROM talkgroups tg SELECT tg.id, tg.system_id, tg.tgid, tg.name, tg.alpha_tag, tg.tg_group, tg.frequency, tg.metadata, tg.tags, tg.alert, tg.alert_config, tg.weight, sys.id, sys.name FROM talkgroups tg
JOIN systems sys ON tg.system_id = sys.id JOIN systems sys ON tg.system_id = sys.id
WHERE tg.id = ANY($1::INT8[]) WHERE tg.id = ANY($1::INT8[])
` `
type GetTalkgroupsByPackedIDsRow struct { type GetTalkgroupsByPackedIDsRow struct {
ID int64 `json:"id"` Talkgroup Talkgroup `json:"talkgroup"`
SystemID int32 `json:"system_id"` System System `json:"system"`
Tgid int32 `json:"tgid"`
Name *string `json:"name"`
AlphaTag *string `json:"alpha_tag"`
TgGroup *string `json:"tg_group"`
Frequency *int32 `json:"frequency"`
Metadata []byte `json:"metadata"`
Tags []string `json:"tags"`
Alert bool `json:"alert"`
AlertConfig []byte `json:"alert_config"`
Weight float32 `json:"weight"`
ID_2 int `json:"id_2"`
Name_2 string `json:"name_2"`
} }
func (q *Queries) GetTalkgroupsByPackedIDs(ctx context.Context, dollar_1 []int64) ([]GetTalkgroupsByPackedIDsRow, error) { func (q *Queries) GetTalkgroupsByPackedIDs(ctx context.Context, dollar_1 []int64) ([]GetTalkgroupsByPackedIDsRow, error) {
@ -265,20 +233,20 @@ func (q *Queries) GetTalkgroupsByPackedIDs(ctx context.Context, dollar_1 []int64
for rows.Next() { for rows.Next() {
var i GetTalkgroupsByPackedIDsRow var i GetTalkgroupsByPackedIDsRow
if err := rows.Scan( if err := rows.Scan(
&i.ID, &i.Talkgroup.ID,
&i.SystemID, &i.Talkgroup.SystemID,
&i.Tgid, &i.Talkgroup.Tgid,
&i.Name, &i.Talkgroup.Name,
&i.AlphaTag, &i.Talkgroup.AlphaTag,
&i.TgGroup, &i.Talkgroup.TgGroup,
&i.Frequency, &i.Talkgroup.Frequency,
&i.Metadata, &i.Talkgroup.Metadata,
&i.Tags, &i.Talkgroup.Tags,
&i.Alert, &i.Talkgroup.Alert,
&i.AlertConfig, &i.Talkgroup.AlertConfig,
&i.Weight, &i.Talkgroup.Weight,
&i.ID_2, &i.System.ID,
&i.Name_2, &i.System.Name,
); err != nil { ); err != nil {
return nil, err return nil, err
} }
@ -291,32 +259,36 @@ func (q *Queries) GetTalkgroupsByPackedIDs(ctx context.Context, dollar_1 []int64
} }
const getTalkgroupsWithAllTags = `-- name: GetTalkgroupsWithAllTags :many const getTalkgroupsWithAllTags = `-- name: GetTalkgroupsWithAllTags :many
SELECT id, system_id, tgid, name, alpha_tag, tg_group, frequency, metadata, tags, alert, alert_config, weight FROM talkgroups SELECT talkgroups.id, talkgroups.system_id, talkgroups.tgid, talkgroups.name, talkgroups.alpha_tag, talkgroups.tg_group, talkgroups.frequency, talkgroups.metadata, talkgroups.tags, talkgroups.alert, talkgroups.alert_config, talkgroups.weight FROM talkgroups
WHERE tags && ARRAY[$1] WHERE tags && ARRAY[$1]
` `
func (q *Queries) GetTalkgroupsWithAllTags(ctx context.Context, tags []string) ([]Talkgroup, error) { type GetTalkgroupsWithAllTagsRow struct {
Talkgroup Talkgroup `json:"talkgroup"`
}
func (q *Queries) GetTalkgroupsWithAllTags(ctx context.Context, tags []string) ([]GetTalkgroupsWithAllTagsRow, error) {
rows, err := q.db.Query(ctx, getTalkgroupsWithAllTags, tags) rows, err := q.db.Query(ctx, getTalkgroupsWithAllTags, tags)
if err != nil { if err != nil {
return nil, err return nil, err
} }
defer rows.Close() defer rows.Close()
var items []Talkgroup var items []GetTalkgroupsWithAllTagsRow
for rows.Next() { for rows.Next() {
var i Talkgroup var i GetTalkgroupsWithAllTagsRow
if err := rows.Scan( if err := rows.Scan(
&i.ID, &i.Talkgroup.ID,
&i.SystemID, &i.Talkgroup.SystemID,
&i.Tgid, &i.Talkgroup.Tgid,
&i.Name, &i.Talkgroup.Name,
&i.AlphaTag, &i.Talkgroup.AlphaTag,
&i.TgGroup, &i.Talkgroup.TgGroup,
&i.Frequency, &i.Talkgroup.Frequency,
&i.Metadata, &i.Talkgroup.Metadata,
&i.Tags, &i.Talkgroup.Tags,
&i.Alert, &i.Talkgroup.Alert,
&i.AlertConfig, &i.Talkgroup.AlertConfig,
&i.Weight, &i.Talkgroup.Weight,
); err != nil { ); err != nil {
return nil, err return nil, err
} }
@ -329,32 +301,36 @@ func (q *Queries) GetTalkgroupsWithAllTags(ctx context.Context, tags []string) (
} }
const getTalkgroupsWithAnyTags = `-- name: GetTalkgroupsWithAnyTags :many const getTalkgroupsWithAnyTags = `-- name: GetTalkgroupsWithAnyTags :many
SELECT id, system_id, tgid, name, alpha_tag, tg_group, frequency, metadata, tags, alert, alert_config, weight FROM talkgroups SELECT talkgroups.id, talkgroups.system_id, talkgroups.tgid, talkgroups.name, talkgroups.alpha_tag, talkgroups.tg_group, talkgroups.frequency, talkgroups.metadata, talkgroups.tags, talkgroups.alert, talkgroups.alert_config, talkgroups.weight FROM talkgroups
WHERE tags @> ARRAY[$1] WHERE tags @> ARRAY[$1]
` `
func (q *Queries) GetTalkgroupsWithAnyTags(ctx context.Context, tags []string) ([]Talkgroup, error) { type GetTalkgroupsWithAnyTagsRow struct {
Talkgroup Talkgroup `json:"talkgroup"`
}
func (q *Queries) GetTalkgroupsWithAnyTags(ctx context.Context, tags []string) ([]GetTalkgroupsWithAnyTagsRow, error) {
rows, err := q.db.Query(ctx, getTalkgroupsWithAnyTags, tags) rows, err := q.db.Query(ctx, getTalkgroupsWithAnyTags, tags)
if err != nil { if err != nil {
return nil, err return nil, err
} }
defer rows.Close() defer rows.Close()
var items []Talkgroup var items []GetTalkgroupsWithAnyTagsRow
for rows.Next() { for rows.Next() {
var i Talkgroup var i GetTalkgroupsWithAnyTagsRow
if err := rows.Scan( if err := rows.Scan(
&i.ID, &i.Talkgroup.ID,
&i.SystemID, &i.Talkgroup.SystemID,
&i.Tgid, &i.Talkgroup.Tgid,
&i.Name, &i.Talkgroup.Name,
&i.AlphaTag, &i.Talkgroup.AlphaTag,
&i.TgGroup, &i.Talkgroup.TgGroup,
&i.Frequency, &i.Talkgroup.Frequency,
&i.Metadata, &i.Talkgroup.Metadata,
&i.Tags, &i.Talkgroup.Tags,
&i.Alert, &i.Talkgroup.Alert,
&i.AlertConfig, &i.Talkgroup.AlertConfig,
&i.Weight, &i.Talkgroup.Weight,
); err != nil { ); err != nil {
return nil, err return nil, err
} }

View file

@ -0,0 +1,49 @@
package database
import (
"testing"
"github.com/stretchr/testify/require"
)
const getTalkgroupWithLearnedByPackedIDsTest = `-- name: GetTalkgroupWithLearnedByPackedIDs :many
SELECT
tg.id, tg.system_id, tg.tgid, tg.name, tg.alpha_tag, tg.tg_group, tg.frequency, tg.metadata, tg.tags, tg.alert, tg.alert_config, tg.weight, sys.id, sys.name,
FALSE learned
FROM talkgroups tg
JOIN systems sys ON tg.system_id = sys.id
WHERE tg.id = ANY($1::INT8[])
UNION
SELECT
tgl.id::INT8, tgl.system_id::INT4, tgl.tgid::INT4, tgl.name,
tgl.alpha_tag, tgl.alpha_tag, NULL::INTEGER, NULL::JSONB,
CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END,
TRUE, NULL::JSONB, 1.0, sys.id, sys.name,
TRUE learned
FROM talkgroups_learned tgl
JOIN systems sys ON tgl.system_id = sys.id
WHERE systg2id(tgl.system_id, tgl.tgid) = ANY($1::INT8[]) AND ignored IS NOT TRUE
`
const getTalkgroupWithLearnedTest = `-- name: GetTalkgroupWithLearned :one
SELECT
tg.id, tg.system_id, tg.tgid, tg.name, tg.alpha_tag, tg.tg_group, tg.frequency, tg.metadata, tg.tags, tg.alert, tg.alert_config, tg.weight, sys.id, sys.name,
FALSE learned
FROM talkgroups tg
JOIN systems sys ON tg.system_id = sys.id
WHERE tg.id = systg2id($1, $2)
UNION
SELECT
tgl.id::INT8, tgl.system_id::INT4, tgl.tgid::INT4, tgl.name,
tgl.alpha_tag, tgl.alpha_tag, NULL::INTEGER, NULL::JSONB,
CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END,
TRUE, NULL::JSONB, 1.0, sys.id, sys.name,
TRUE learned
FROM talkgroups_learned tgl
JOIN systems sys ON tgl.system_id = sys.id
WHERE tgl.system_id = $1 AND tgl.tgid = $2 AND ignored IS NOT TRUE
`
func TestQueryColumnsMatch(t *testing.T) {
require.Equal(t, getTalkgroupWithLearnedByPackedIDsTest, getTalkgroupWithLearnedByPackedIDs)
require.Equal(t, getTalkgroupWithLearnedTest, getTalkgroupWithLearned)
}

View file

@ -5,10 +5,9 @@ import (
"encoding/json" "encoding/json"
"dynatron.me/x/stillbox/pkg/calls" "dynatron.me/x/stillbox/pkg/calls"
"dynatron.me/x/stillbox/pkg/database"
"dynatron.me/x/stillbox/pkg/pb" "dynatron.me/x/stillbox/pkg/pb"
"dynatron.me/x/stillbox/pkg/talkgroups"
"github.com/jackc/pgx/v5"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
"google.golang.org/protobuf/types/known/structpb" "google.golang.org/protobuf/types/known/structpb"
) )
@ -61,19 +60,18 @@ func (c *client) SendError(cmd *pb.Command, err error) {
} }
func (c *client) Talkgroup(ctx context.Context, tg *pb.Talkgroup) error { func (c *client) Talkgroup(ctx context.Context, tg *pb.Talkgroup) error {
db := database.FromCtx(ctx) tgi, err := talkgroups.StoreFrom(ctx).TG(ctx, talkgroups.TG(tg.System, tg.Talkgroup))
tgi, err := db.GetTalkgroupWithLearned(ctx, int(tg.System), int(tg.Talkgroup))
if err != nil { if err != nil {
if err != pgx.ErrNoRows { if err != talkgroups.ErrNoTG {
log.Error().Err(err).Int32("sys", tg.System).Int32("tg", tg.Talkgroup).Msg("get talkgroup fail") log.Error().Err(err).Int32("sys", tg.System).Int32("tg", tg.Talkgroup).Msg("get talkgroup fail")
} }
return err return err
} }
var md *structpb.Struct var md *structpb.Struct
if len(tgi.Metadata) > 0 { if len(tgi.Talkgroup.Metadata) > 0 {
m := make(map[string]interface{}) m := make(map[string]interface{})
err := json.Unmarshal(tgi.Metadata, &m) err := json.Unmarshal(tgi.Talkgroup.Metadata, &m)
if err != nil { if err != nil {
log.Error().Err(err).Int32("sys", tg.System).Int32("tg", tg.Talkgroup).Msg("unmarshal tg metadata") log.Error().Err(err).Int32("sys", tg.System).Int32("tg", tg.Talkgroup).Msg("unmarshal tg metadata")
} }
@ -85,14 +83,14 @@ func (c *client) Talkgroup(ctx context.Context, tg *pb.Talkgroup) error {
resp := &pb.TalkgroupInfo{ resp := &pb.TalkgroupInfo{
Tg: tg, Tg: tg,
Name: tgi.Name, Name: tgi.Talkgroup.Name,
Group: tgi.TgGroup, Group: tgi.Talkgroup.TgGroup,
Frequency: tgi.Frequency, Frequency: tgi.Talkgroup.Frequency,
Metadata: md, Metadata: md,
Tags: tgi.Tags, Tags: tgi.Talkgroup.Tags,
Learned: tgi.Learned, Learned: tgi.Learned,
AlphaTag: tgi.AlphaTag, AlphaTag: tgi.Talkgroup.AlphaTag,
SystemName: tgi.SystemName, SystemName: tgi.System.Name,
} }
_ = c.Send(&pb.Message{ _ = c.Send(&pb.Message{

View file

@ -6,7 +6,6 @@ import (
"os" "os"
"time" "time"
"dynatron.me/x/stillbox/pkg/calls"
"dynatron.me/x/stillbox/pkg/alerting" "dynatron.me/x/stillbox/pkg/alerting"
"dynatron.me/x/stillbox/pkg/auth" "dynatron.me/x/stillbox/pkg/auth"
"dynatron.me/x/stillbox/pkg/config" "dynatron.me/x/stillbox/pkg/config"
@ -15,6 +14,7 @@ import (
"dynatron.me/x/stillbox/pkg/notify" "dynatron.me/x/stillbox/pkg/notify"
"dynatron.me/x/stillbox/pkg/sinks" "dynatron.me/x/stillbox/pkg/sinks"
"dynatron.me/x/stillbox/pkg/sources" "dynatron.me/x/stillbox/pkg/sources"
"dynatron.me/x/stillbox/pkg/talkgroups"
"github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware" "github.com/go-chi/chi/v5/middleware"
"github.com/go-chi/cors" "github.com/go-chi/cors"
@ -35,7 +35,7 @@ type Server struct {
alerter alerting.Alerter alerter alerting.Alerter
notifier notify.Notifier notifier notify.Notifier
hup chan os.Signal hup chan os.Signal
tgCache calls.TalkgroupCache tgs talkgroups.Store
} }
func New(ctx context.Context, cfg *config.Config) (*Server, error) { func New(ctx context.Context, cfg *config.Config) (*Server, error) {
@ -58,7 +58,7 @@ func New(ctx context.Context, cfg *config.Config) (*Server, error) {
return nil, err return nil, err
} }
tgCache := calls.NewTalkgroupCache() tgCache := talkgroups.NewCache()
srv := &Server{ srv := &Server{
auth: authenticator, auth: authenticator,
@ -69,7 +69,7 @@ func New(ctx context.Context, cfg *config.Config) (*Server, error) {
logger: logger, logger: logger,
alerter: alerting.New(cfg.Alerting, tgCache, alerting.WithNotifier(notifier)), alerter: alerting.New(cfg.Alerting, tgCache, alerting.WithNotifier(notifier)),
notifier: notifier, notifier: notifier,
tgCache: tgCache, tgs: tgCache,
} }
srv.sinks.Register("database", sinks.NewDatabaseSink(srv.db), true) srv.sinks.Register("database", sinks.NewDatabaseSink(srv.db), true)
@ -104,6 +104,7 @@ func (s *Server) Go(ctx context.Context) error {
s.installHupHandler() s.installHupHandler()
ctx = database.CtxWithDB(ctx, s.db) ctx = database.CtxWithDB(ctx, s.db)
ctx = talkgroups.CtxWithStore(ctx, s.tgs)
httpSrv := &http.Server{ httpSrv := &http.Server{
Addr: s.conf.Listen, Addr: s.conf.Listen,

View file

@ -7,8 +7,8 @@ import (
"dynatron.me/x/stillbox/internal/common" "dynatron.me/x/stillbox/internal/common"
"dynatron.me/x/stillbox/internal/forms" "dynatron.me/x/stillbox/internal/forms"
"dynatron.me/x/stillbox/pkg/calls"
"dynatron.me/x/stillbox/pkg/auth" "dynatron.me/x/stillbox/pkg/auth"
"dynatron.me/x/stillbox/pkg/calls"
"github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
) )

View file

@ -1,4 +1,4 @@
package calls package talkgroups
import ( import (
"encoding/json" "encoding/json"
@ -8,14 +8,14 @@ import (
"dynatron.me/x/stillbox/internal/trending" "dynatron.me/x/stillbox/internal/trending"
) )
type AlertConfig map[Talkgroup][]AlertRule type AlertConfig map[ID][]AlertRule
type AlertRule struct { type AlertRule struct {
Times []ruletime.RuleTime `json:"times"` Times []ruletime.RuleTime `json:"times"`
ScoreMultiplier float32 `json:"mult"` ScoreMultiplier float32 `json:"mult"`
} }
func (ac AlertConfig) AddAlertConfig(tg Talkgroup, confBytes []byte) error { func (ac AlertConfig) AddAlertConfig(tg ID, confBytes []byte) error {
if len(confBytes) == 0 { if len(confBytes) == 0 {
return nil return nil
} }
@ -30,7 +30,7 @@ func (ac AlertConfig) AddAlertConfig(tg Talkgroup, confBytes []byte) error {
return nil return nil
} }
func (ac AlertConfig) ApplyAlertRules(score trending.Score[Talkgroup], t time.Time, coversOpts ...ruletime.CoversOption) float64 { func (ac AlertConfig) ApplyAlertRules(score trending.Score[ID], t time.Time, coversOpts ...ruletime.CoversOption) float64 {
s, has := ac[score.ID] s, has := ac[score.ID]
if !has { if !has {
return score.Score return score.Score

View file

@ -1,4 +1,4 @@
package calls_test package talkgroups_test
import ( import (
"errors" "errors"
@ -8,26 +8,26 @@ import (
"dynatron.me/x/stillbox/internal/ruletime" "dynatron.me/x/stillbox/internal/ruletime"
"dynatron.me/x/stillbox/internal/trending" "dynatron.me/x/stillbox/internal/trending"
"dynatron.me/x/stillbox/pkg/calls" "dynatron.me/x/stillbox/pkg/talkgroups"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
func TestAlertConfig(t *testing.T) { func TestAlertConfig(t *testing.T) {
ac := make(calls.AlertConfig) ac := make(talkgroups.AlertConfig)
parseTests := []struct { parseTests := []struct {
name string name string
tg calls.Talkgroup tg talkgroups.ID
conf string conf string
compare []calls.AlertRule compare []talkgroups.AlertRule
expectErr error expectErr error
}{ }{
{ {
name: "base case", name: "base case",
tg: calls.TG(197, 3), tg: talkgroups.TG(197, 3),
conf: `[{"times":["7:00+2h","01:00+1h","16:00+1h","19:00+4h"],"mult":0.2},{"times":["11:00+1h","15:00+30m","16:03+20m"],"mult":2.0}]`, conf: `[{"times":["7:00+2h","01:00+1h","16:00+1h","19:00+4h"],"mult":0.2},{"times":["11:00+1h","15:00+30m","16:03+20m"],"mult":2.0}]`,
compare: []calls.AlertRule{ compare: []talkgroups.AlertRule{
{ {
Times: []ruletime.RuleTime{ Times: []ruletime.RuleTime{
ruletime.Must(ruletime.New("7:00+2h")), ruletime.Must(ruletime.New("7:00+2h")),
@ -49,7 +49,7 @@ func TestAlertConfig(t *testing.T) {
}, },
{ {
name: "bad spec", name: "bad spec",
tg: calls.TG(197, 3), tg: talkgroups.TG(197, 3),
conf: `[{"times":["26:00+2h","01:00+1h","19:00+4h"],"mult":0.2},{"times":["11:00+1h","15:00+30m"],"mult":2.0}]`, conf: `[{"times":["26:00+2h","01:00+1h","19:00+4h"],"mult":0.2},{"times":["11:00+1h","15:00+30m"],"mult":2.0}]`,
expectErr: errors.New("'26:00+2h': invalid hours"), expectErr: errors.New("'26:00+2h': invalid hours"),
}, },
@ -78,42 +78,42 @@ func TestAlertConfig(t *testing.T) {
evalTests := []struct { evalTests := []struct {
name string name string
tg calls.Talkgroup tg talkgroups.ID
t time.Time t time.Time
origScore float64 origScore float64
expectScore float64 expectScore float64
}{ }{
{ {
name: "base eval", name: "base eval",
tg: calls.TG(197, 3), tg: talkgroups.TG(197, 3),
t: tMust("1:20"), t: tMust("1:20"),
origScore: 3, origScore: 3,
expectScore: 0.6, expectScore: 0.6,
}, },
{ {
name: "base eval", name: "base eval",
tg: calls.TG(197, 3), tg: talkgroups.TG(197, 3),
t: tMust("23:03"), t: tMust("23:03"),
origScore: 3, origScore: 3,
expectScore: 3, expectScore: 3,
}, },
{ {
name: "base eval", name: "base eval",
tg: calls.TG(197, 3), tg: talkgroups.TG(197, 3),
t: tMust("8:03"), t: tMust("8:03"),
origScore: 1.0, origScore: 1.0,
expectScore: 0.2, expectScore: 0.2,
}, },
{ {
name: "base eval", name: "base eval",
tg: calls.TG(197, 3), tg: talkgroups.TG(197, 3),
t: tMust("15:15"), t: tMust("15:15"),
origScore: 3.0, origScore: 3.0,
expectScore: 6.0, expectScore: 6.0,
}, },
{ {
name: "overlapping eval", name: "overlapping eval",
tg: calls.TG(197, 3), tg: talkgroups.TG(197, 3),
t: tMust("16:10"), t: tMust("16:10"),
origScore: 1.0, origScore: 1.0,
expectScore: 0.4, expectScore: 0.4,
@ -122,7 +122,7 @@ func TestAlertConfig(t *testing.T) {
for _, tc := range evalTests { for _, tc := range evalTests {
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
cs := trending.Score[calls.Talkgroup]{ cs := trending.Score[talkgroups.ID]{
ID: tc.tg, ID: tc.tg,
Score: tc.origScore, Score: tc.origScore,
} }

195
pkg/talkgroups/cache.go Normal file
View file

@ -0,0 +1,195 @@
package talkgroups
import (
"context"
"errors"
"sync"
"time"
"dynatron.me/x/stillbox/pkg/database"
"dynatron.me/x/stillbox/internal/ruletime"
"dynatron.me/x/stillbox/internal/trending"
"github.com/jackc/pgx/v5"
"github.com/rs/zerolog/log"
)
type tgMap map[ID]Talkgroup
type Store interface {
// TG retrieves a Talkgroup from the Store.
TG(ctx context.Context, tg ID) (Talkgroup, error)
// SystemName retrieves a system name from the store. It returns the record and whether one was found.
SystemName(ctx context.Context, id int) (string, bool)
// ApplyAlertRules applies the score's talkgroup alert rules to the call occurring at t and returns the weighted score.
ApplyAlertRules(score trending.Score[ID], t time.Time, coversOpts ...ruletime.CoversOption) float64
// Hint hints the Store that the provided talkgroups will be asked for.
Hint(ctx context.Context, tgs []ID) error
// Load loads the provided packed talkgroup IDs into the Store.
Load(ctx context.Context, tgs []int64) error
// Invalidate invalidates any caching in the Store.
Invalidate()
}
type CtxStoreKeyT string
const CtxStoreKey CtxStoreKeyT = "store"
func CtxWithStore(ctx context.Context, s Store) context.Context {
return context.WithValue(ctx, CtxStoreKey, s)
}
func StoreFrom(ctx context.Context) Store {
s, ok := ctx.Value(CtxStoreKey).(Store)
if !ok {
return NewCache()
}
return s
}
func (t *cache) Invalidate() {
t.Lock()
defer t.Unlock()
clear(t.tgs)
clear(t.systems)
clear(t.AlertConfig)
}
type cache struct {
sync.RWMutex
AlertConfig
tgs tgMap
systems map[int32]string
}
// NewCache returns a new cache Store.
func NewCache() Store {
tgc := &cache{
tgs: make(tgMap),
systems: make(map[int32]string),
AlertConfig: make(AlertConfig),
}
return tgc
}
func (t *cache) Hint(ctx context.Context, tgs []ID) error {
t.RLock()
var toLoad []int64
if len(t.tgs) > len(tgs)/2 { // TODO: instrument this
for _, tg := range tgs {
_, ok := t.tgs[tg]
if !ok {
toLoad = append(toLoad, tg.Pack())
}
}
} else {
toLoad = make([]int64, 0, len(tgs))
for _, g := range tgs {
toLoad = append(toLoad, g.Pack())
}
}
if len(toLoad) > 0 {
t.RUnlock()
return t.Load(ctx, toLoad)
}
t.RUnlock()
return nil
}
func (t *cache) add(rec Talkgroup) error {
tg := TG(rec.System.ID, int(rec.Talkgroup.Tgid))
t.tgs[tg] = rec
t.systems[int32(rec.System.ID)] = rec.System.Name
return t.AlertConfig.AddAlertConfig(tg, rec.Talkgroup.AlertConfig)
}
func rowToTalkgroup(r database.GetTalkgroupWithLearnedByPackedIDsRow) Talkgroup {
return Talkgroup{
Talkgroup: r.Talkgroup,
System: r.System,
Learned: r.Learned,
}
}
func (t *cache) Load(ctx context.Context, tgs []int64) error {
tgRecords, err := database.FromCtx(ctx).GetTalkgroupWithLearnedByPackedIDs(ctx, tgs)
if err != nil {
return err
}
t.Lock()
defer t.Unlock()
for _, rec := range tgRecords {
err := t.add(rowToTalkgroup(rec))
if err != nil {
log.Error().Err(err).Msg("add alert config fail")
}
}
return nil
}
var ErrNoTG = errors.New("talkgroup not found")
func (t *cache) TG(ctx context.Context, tg ID) (Talkgroup, error) {
t.RLock()
rec, has := t.tgs[tg]
t.RUnlock()
if has {
return rec, nil
}
recs, err := database.FromCtx(ctx).GetTalkgroupWithLearnedByPackedIDs(ctx, []int64{tg.Pack()})
switch err {
case nil:
case pgx.ErrNoRows:
return Talkgroup{}, ErrNoTG
default:
log.Error().Err(err).Msg("TG() cache add db get")
return Talkgroup{}, errors.Join(ErrNoTG, err)
}
if len(recs) < 1 {
return Talkgroup{}, ErrNoTG
}
t.Lock()
defer t.Unlock()
err = t.add(rowToTalkgroup(recs[0]))
if err != nil {
log.Error().Err(err).Msg("TG() cache add")
return rowToTalkgroup(recs[0]), errors.Join(ErrNoTG, err)
}
return rowToTalkgroup(recs[0]), nil
}
func (t *cache) SystemName(ctx context.Context, id int) (name string, has bool) {
n, has := t.systems[int32(id)]
if !has {
sys, err := database.FromCtx(ctx).GetSystemName(ctx, id)
if err != nil {
return "", false
}
return sys, true
}
return n, has
}

View file

@ -0,0 +1,44 @@
package talkgroups
import (
"fmt"
"dynatron.me/x/stillbox/pkg/database"
)
type Talkgroup struct {
database.Talkgroup
System database.System `json:"system"`
Learned bool `json:"learned"`
}
type ID struct {
System uint32
Talkgroup uint32
}
func TG[T int | uint | int64 | uint64 | int32 | uint32](sys, tgid T) ID {
return ID{
System: uint32(sys),
Talkgroup: uint32(tgid),
}
}
func (t ID) Pack() int64 {
// P25 system IDs are 12 bits, so we can fit them in a signed 8 byte int (int64, pg INT8)
return int64((int64(t.System) << 32) | int64(t.Talkgroup))
}
func (t ID) String() string {
return fmt.Sprintf("%d:%d", t.System, t.Talkgroup)
}
func PackedTGs(tg []ID) []int64 {
s := make([]int64, len(tg))
for i, v := range tg {
s[i] = v.Pack()
}
return s
}

View file

@ -1,9 +1,9 @@
-- name: GetTalkgroupsWithAnyTags :many -- name: GetTalkgroupsWithAnyTags :many
SELECT * FROM talkgroups SELECT sqlc.embed(talkgroups) FROM talkgroups
WHERE tags @> ARRAY[$1]; WHERE tags @> ARRAY[$1];
-- name: GetTalkgroupsWithAllTags :many -- name: GetTalkgroupsWithAllTags :many
SELECT * FROM talkgroups SELECT sqlc.embed(talkgroups) FROM talkgroups
WHERE tags && ARRAY[$1]; WHERE tags && ARRAY[$1];
-- name: GetTalkgroupIDsByTags :many -- name: GetTalkgroupIDsByTags :many
@ -25,29 +25,27 @@ UPDATE talkgroups SET tags = $2
WHERE id = ANY($1); WHERE id = ANY($1);
-- name: GetTalkgroup :one -- name: GetTalkgroup :one
SELECT * FROM talkgroups SELECT sqlc.embed(talkgroups) FROM talkgroups
WHERE id = systg2id(sqlc.arg(system_id), sqlc.arg(tgid)); WHERE id = systg2id(sqlc.arg(system_id), sqlc.arg(tgid));
-- name: GetTalkgroupsByPackedIDs :many -- name: GetTalkgroupsByPackedIDs :many
SELECT * FROM talkgroups tg SELECT sqlc.embed(tg), sqlc.embed(sys) FROM talkgroups tg
JOIN systems sys ON tg.system_id = sys.id JOIN systems sys ON tg.system_id = sys.id
WHERE tg.id = ANY($1::INT8[]); WHERE tg.id = ANY($1::INT8[]);
-- name: GetTalkgroupWithLearned :one -- name: GetTalkgroupWithLearned :one
SELECT SELECT
tg.id, tg.system_id, sys.name system_name, tg.tgid, tg.name, sqlc.embed(tg), sqlc.embed(sys),
tg.tg_group, tg.frequency, tg.metadata, tg.tags, tg.alpha_tag,
tg.alert, tg.weight, tg.alert_config,
FALSE learned FALSE learned
FROM talkgroups tg FROM talkgroups tg
JOIN systems sys ON tg.system_id = sys.id JOIN systems sys ON tg.system_id = sys.id
WHERE tg.id = systg2id(sqlc.arg(system_id), sqlc.arg(tgid)) WHERE tg.id = systg2id(sqlc.arg(system_id), sqlc.arg(tgid))
UNION UNION
SELECT SELECT
tgl.id::INT8, tgl.system_id::INT4, sys.name system_name, tgl.tgid::INT4, tgl.name, tgl.id::INT8, tgl.system_id::INT4, tgl.tgid::INT4, tgl.name,
tgl.alpha_tag, NULL::INTEGER, NULL::JSONB, tgl.alpha_tag, tgl.alpha_tag, NULL::INTEGER, NULL::JSONB,
CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END, tgl.alpha_tag, CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END,
TRUE, 1.0, NULL::JSONB, TRUE, NULL::JSONB, 1.0, sys.id, sys.name,
TRUE learned TRUE learned
FROM talkgroups_learned tgl FROM talkgroups_learned tgl
JOIN systems sys ON tgl.system_id = sys.id JOIN systems sys ON tgl.system_id = sys.id
@ -55,19 +53,17 @@ WHERE tgl.system_id = sqlc.arg(system_id) AND tgl.tgid = sqlc.arg(tgid) AND igno
-- name: GetTalkgroupWithLearnedByPackedIDs :many -- name: GetTalkgroupWithLearnedByPackedIDs :many
SELECT SELECT
tg.id, tg.system_id, sys.name system_name, tg.tgid, tg.name, sqlc.embed(tg), sqlc.embed(sys),
tg.tg_group, tg.frequency, tg.metadata, tg.tags, tg.alpha_tag,
tg.alert, tg.weight, tg.alert_config,
FALSE learned FALSE learned
FROM talkgroups tg FROM talkgroups tg
JOIN systems sys ON tg.system_id = sys.id JOIN systems sys ON tg.system_id = sys.id
WHERE tg.id = ANY($1::INT8[]) WHERE tg.id = ANY($1::INT8[])
UNION UNION
SELECT SELECT
tgl.id::INT8, tgl.system_id::INT4, sys.name system_name, tgl.tgid::INT4, tgl.name, tgl.id::INT8, tgl.system_id::INT4, tgl.tgid::INT4, tgl.name,
tgl.alpha_tag, NULL::INTEGER, NULL::JSONB, tgl.alpha_tag, tgl.alpha_tag, NULL::INTEGER, NULL::JSONB,
CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END, tgl.alpha_tag, CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END,
TRUE, 1.0, NULL::JSONB, TRUE, NULL::JSONB, 1.0, sys.id, sys.name,
TRUE learned TRUE learned
FROM talkgroups_learned tgl FROM talkgroups_learned tgl
JOIN systems sys ON tgl.system_id = sys.id JOIN systems sys ON tgl.system_id = sys.id