Compare commits

...

4 commits

Author SHA1 Message Date
d917e13c4f Embed 2024-11-03 09:45:51 -05:00
5338bb0071 Use sqlc.embed 2024-11-03 08:44:34 -05:00
882d9fbefa Reorg 2024-11-03 08:17:28 -05:00
c3233f1bc8 Split out talkgroups 2024-11-03 07:58:41 -05:00
18 changed files with 529 additions and 453 deletions

View file

@ -9,8 +9,8 @@ import (
"dynatron.me/x/stillbox/internal/common"
"dynatron.me/x/stillbox/internal/version"
"dynatron.me/x/stillbox/pkg/cmd/serve"
"dynatron.me/x/stillbox/pkg/cmd/admin"
"dynatron.me/x/stillbox/pkg/cmd/serve"
"dynatron.me/x/stillbox/pkg/config"
"github.com/spf13/cobra"

View file

@ -11,11 +11,12 @@ import (
"text/template"
"time"
cl "dynatron.me/x/stillbox/pkg/calls"
"dynatron.me/x/stillbox/pkg/calls"
"dynatron.me/x/stillbox/pkg/config"
"dynatron.me/x/stillbox/pkg/database"
"dynatron.me/x/stillbox/pkg/notify"
"dynatron.me/x/stillbox/pkg/sinks"
talkgroups "dynatron.me/x/stillbox/pkg/talkgroups"
"dynatron.me/x/stillbox/internal/timeseries"
"dynatron.me/x/stillbox/internal/trending"
@ -46,14 +47,14 @@ type alerter struct {
sync.RWMutex
clock timeseries.Clock
cfg config.Alerting
scorer trending.Scorer[cl.Talkgroup]
scores trending.Scores[cl.Talkgroup]
scorer trending.Scorer[talkgroups.ID]
scores trending.Scores[talkgroups.ID]
lastScore time.Time
sim *Simulation
alertCache map[cl.Talkgroup]Alert
alertCache map[talkgroups.ID]Alert
renotify time.Duration
notifier notify.Notifier
tgCache cl.TalkgroupCache
tgCache talkgroups.Store
}
type offsetClock time.Duration
@ -88,14 +89,14 @@ func WithNotifier(n notify.Notifier) AlertOption {
}
// New creates a new Alerter using the provided configuration.
func New(cfg config.Alerting, tgCache cl.TalkgroupCache, opts ...AlertOption) Alerter {
func New(cfg config.Alerting, tgCache talkgroups.Store, opts ...AlertOption) Alerter {
if !cfg.Enable {
return &noopAlerter{}
}
as := &alerter{
cfg: cfg,
alertCache: make(map[cl.Talkgroup]Alert),
alertCache: make(map[talkgroups.ID]Alert),
clock: timeseries.DefaultClock,
renotify: DefaultRenotify,
tgCache: tgCache,
@ -111,12 +112,12 @@ func New(cfg config.Alerting, tgCache cl.TalkgroupCache, opts ...AlertOption) Al
as.scorer = trending.NewScorer(
trending.WithTimeSeries(as.newTimeSeries),
trending.WithStorageDuration[cl.Talkgroup](time.Hour*24*time.Duration(cfg.LookbackDays)),
trending.WithRecentDuration[cl.Talkgroup](time.Duration(cfg.Recent)),
trending.WithHalfLife[cl.Talkgroup](time.Duration(cfg.HalfLife)),
trending.WithScoreThreshold[cl.Talkgroup](ScoreThreshold),
trending.WithCountThreshold[cl.Talkgroup](CountThreshold),
trending.WithClock[cl.Talkgroup](as.clock),
trending.WithStorageDuration[talkgroups.ID](time.Hour*24*time.Duration(cfg.LookbackDays)),
trending.WithRecentDuration[talkgroups.ID](time.Duration(cfg.Recent)),
trending.WithHalfLife[talkgroups.ID](time.Duration(cfg.HalfLife)),
trending.WithScoreThreshold[talkgroups.ID](ScoreThreshold),
trending.WithCountThreshold[talkgroups.ID](CountThreshold),
trending.WithClock[talkgroups.ID](as.clock),
)
return as
@ -167,12 +168,12 @@ func (as *alerter) eval(ctx context.Context, now time.Time, testMode bool) ([]Al
var notifications []Alert
for _, s := range as.scores {
origScore := s.Score
tgr, has := as.tgCache.TG(ctx, s.ID)
if has {
if !tgr.Alert {
tgr, err := as.tgCache.TG(ctx, s.ID)
if err == nil {
if !tgr.Talkgroup.Alert {
continue
}
s.Score *= float64(tgr.Weight)
s.Score *= float64(tgr.Talkgroup.Weight)
}
if s.Score > as.cfg.AlertThreshold || testMode {
@ -231,8 +232,8 @@ func (as *alerter) testNotifyHandler(w http.ResponseWriter, r *http.Request) {
}
// scoredTGs gets a list of TGs.
func (as *alerter) scoredTGs() []cl.Talkgroup {
tgs := make([]cl.Talkgroup, 0, len(as.scores))
func (as *alerter) scoredTGs() []talkgroups.ID {
tgs := make([]talkgroups.ID, 0, len(as.scores))
for _, s := range as.scores {
tgs = append(tgs, s.ID)
}
@ -275,7 +276,7 @@ type Alert struct {
ID uuid.UUID
Timestamp time.Time
TGName string
Score trending.Score[cl.Talkgroup]
Score trending.Score[talkgroups.ID]
OrigScore float64
Weight float32
Suppressed bool
@ -317,7 +318,7 @@ func (as *alerter) sendNotification(ctx context.Context, n []Alert) error {
// makeAlert creates a notification for later rendering by the template.
// It takes a talkgroup Score as input.
func (as *alerter) makeAlert(ctx context.Context, score trending.Score[cl.Talkgroup], origScore float64) (Alert, error) {
func (as *alerter) makeAlert(ctx context.Context, score trending.Score[talkgroups.ID], origScore float64) (Alert, error) {
d := Alert{
ID: uuid.New(),
Score: score,
@ -326,20 +327,20 @@ func (as *alerter) makeAlert(ctx context.Context, score trending.Score[cl.Talkgr
OrigScore: origScore,
}
tgRecord, has := as.tgCache.TG(ctx, score.ID)
switch has {
case true:
d.Weight = tgRecord.Weight
if tgRecord.SystemName == "" {
tgRecord.SystemName = strconv.Itoa(int(score.ID.System))
tgRecord, err := as.tgCache.TG(ctx, score.ID)
switch err {
case nil:
d.Weight = tgRecord.Talkgroup.Weight
if tgRecord.System.Name == "" {
tgRecord.System.Name = strconv.Itoa(int(score.ID.System))
}
if tgRecord.Name != nil {
d.TGName = fmt.Sprintf("%s %s (%d)", tgRecord.SystemName, *tgRecord.Name, score.ID.Talkgroup)
if tgRecord.Talkgroup.Name != nil {
d.TGName = fmt.Sprintf("%s %s (%d)", tgRecord.System.Name, *tgRecord.Talkgroup.Name, score.ID.Talkgroup)
} else {
d.TGName = fmt.Sprintf("%s:%d", tgRecord.SystemName, int(score.ID.Talkgroup))
d.TGName = fmt.Sprintf("%s:%d", tgRecord.System.Name, int(score.ID.Talkgroup))
}
case false:
default:
system, has := as.tgCache.SystemName(ctx, int(score.ID.System))
if has {
d.TGName = fmt.Sprintf("%s:%d", system, int(score.ID.Talkgroup))
@ -369,7 +370,7 @@ func (as *alerter) cleanCache() {
}
}
func (as *alerter) newTimeSeries(id cl.Talkgroup) trending.TimeSeries {
func (as *alerter) newTimeSeries(id talkgroups.ID) trending.TimeSeries {
ts, _ := timeseries.NewTimeSeries(timeseries.WithGranularities(
[]timeseries.Granularity{
{Granularity: time.Second, Count: 60},
@ -417,7 +418,7 @@ func (as *alerter) backfill(ctx context.Context, since time.Time, until time.Tim
defer as.Unlock()
for rows.Next() {
var tg cl.Talkgroup
var tg talkgroups.ID
var callDate time.Time
if err := rows.Scan(&tg.System, &tg.Talkgroup, &callDate); err != nil {
return count, err
@ -440,7 +441,7 @@ func (as *alerter) SinkType() string {
return "alerting"
}
func (as *alerter) Call(ctx context.Context, call *cl.Call) error {
func (as *alerter) Call(ctx context.Context, call *calls.Call) error {
as.Lock()
defer as.Unlock()
as.scorer.AddEvent(call.TalkgroupTuple(), call.DateTime)
@ -453,7 +454,7 @@ func (*alerter) Enabled() bool { return true }
// noopAlerter is used when alerting is disabled.
type noopAlerter struct{}
func (*noopAlerter) SinkType() string { return "noopAlerter" }
func (*noopAlerter) Call(_ context.Context, _ *cl.Call) error { return nil }
func (*noopAlerter) Go(_ context.Context) {}
func (*noopAlerter) Enabled() bool { return false }
func (*noopAlerter) SinkType() string { return "noopAlerter" }
func (*noopAlerter) Call(_ context.Context, _ *calls.Call) error { return nil }
func (*noopAlerter) Go(_ context.Context) {}
func (*noopAlerter) Enabled() bool { return false }

View file

@ -12,8 +12,8 @@ import (
"dynatron.me/x/stillbox/internal/forms"
"dynatron.me/x/stillbox/internal/jsontime"
"dynatron.me/x/stillbox/internal/trending"
cl "dynatron.me/x/stillbox/pkg/calls"
"dynatron.me/x/stillbox/pkg/config"
"dynatron.me/x/stillbox/pkg/talkgroups"
"github.com/rs/zerolog/log"
)
@ -58,9 +58,9 @@ func (s *Simulation) stepClock(t time.Time) {
}
// Simulate begins the simulation using the DB handle from ctx. It returns final scores.
func (s *Simulation) Simulate(ctx context.Context) (trending.Scores[cl.Talkgroup], error) {
func (s *Simulation) Simulate(ctx context.Context) (trending.Scores[talkgroups.ID], error) {
now := time.Now()
tgc := cl.NewTalkgroupCache()
tgc := talkgroups.NewCache()
s.Enable = true
s.alerter = New(s.Alerting, tgc, WithClock(&s.clock)).(*alerter)

View file

@ -7,9 +7,9 @@ import (
"net/http"
"time"
"dynatron.me/x/stillbox/pkg/calls"
"dynatron.me/x/stillbox/pkg/config"
"dynatron.me/x/stillbox/pkg/database"
"dynatron.me/x/stillbox/pkg/talkgroups"
"dynatron.me/x/stillbox/internal/common"
"dynatron.me/x/stillbox/internal/jsontime"
@ -76,14 +76,14 @@ func (as *alerter) tgStatsHandler(w http.ResponseWriter, r *http.Request) {
return
}
tgMap := make(map[calls.Talkgroup]database.GetTalkgroupsByPackedIDsRow, len(tgs))
tgMap := make(map[talkgroups.ID]database.GetTalkgroupsByPackedIDsRow, len(tgs))
for _, t := range tgs {
tgMap[calls.Talkgroup{System: uint32(t.SystemID), Talkgroup: uint32(t.ID)}] = t
tgMap[talkgroups.ID{System: uint32(t.System.ID), Talkgroup: uint32(t.Talkgroup.ID)}] = t
}
renderData := struct {
TGs map[calls.Talkgroup]database.GetTalkgroupsByPackedIDsRow
Scores trending.Scores[calls.Talkgroup]
TGs map[talkgroups.ID]database.GetTalkgroupsByPackedIDsRow
Scores trending.Scores[talkgroups.ID]
LastScore time.Time
Simulation *Simulation
Config config.Alerting

View file

@ -7,6 +7,7 @@ import (
"dynatron.me/x/stillbox/internal/audio"
"dynatron.me/x/stillbox/pkg/auth"
"dynatron.me/x/stillbox/pkg/pb"
"dynatron.me/x/stillbox/pkg/talkgroups"
"google.golang.org/protobuf/types/known/timestamppb"
)
@ -128,3 +129,7 @@ func (c *Call) computeLength() (err error) {
return nil
}
func (c *Call) TalkgroupTuple() talkgroups.ID {
return talkgroups.TG(c.System, c.Talkgroup)
}

View file

@ -5,16 +5,18 @@ import (
"dynatron.me/x/stillbox/pkg/database"
"dynatron.me/x/stillbox/pkg/pb"
tgs "dynatron.me/x/stillbox/pkg/talkgroups"
)
type TalkgroupFilter struct {
Talkgroups []Talkgroup `json:"talkgroups,omitempty"`
TalkgroupsNot []Talkgroup `json:"talkgroupsNot,omitempty"`
TalkgroupTagsAll []string `json:"talkgroupTagsAll,omitempty"`
TalkgroupTagsAny []string `json:"talkgroupTagsAny,omitempty"`
TalkgroupTagsNot []string `json:"talkgroupTagsNot,omitempty"`
Talkgroups []tgs.ID `json:"talkgroups,omitempty"`
TalkgroupsNot []tgs.ID `json:"talkgroupsNot,omitempty"`
TalkgroupTagsAll []string `json:"talkgroupTagsAll,omitempty"`
TalkgroupTagsAny []string `json:"talkgroupTagsAny,omitempty"`
TalkgroupTagsNot []string `json:"talkgroupTagsNot,omitempty"`
talkgroups map[Talkgroup]bool
talkgroups map[tgs.ID]bool
}
func TalkgroupFilterFromPB(ctx context.Context, p *pb.Filter) (*TalkgroupFilter, error) {
@ -25,9 +27,9 @@ func TalkgroupFilterFromPB(ctx context.Context, p *pb.Filter) (*TalkgroupFilter,
}
if l := len(p.Talkgroups); l > 0 {
tgf.Talkgroups = make([]Talkgroup, l)
tgf.Talkgroups = make([]tgs.ID, l)
for i, t := range p.Talkgroups {
tgf.Talkgroups[i] = Talkgroup{
tgf.Talkgroups[i] = tgs.ID{
System: uint32(t.System),
Talkgroup: uint32(t.Talkgroup),
}
@ -35,9 +37,9 @@ func TalkgroupFilterFromPB(ctx context.Context, p *pb.Filter) (*TalkgroupFilter,
}
if l := len(p.TalkgroupsNot); l > 0 {
tgf.TalkgroupsNot = make([]Talkgroup, l)
tgf.TalkgroupsNot = make([]tgs.ID, l)
for i, t := range p.TalkgroupsNot {
tgf.TalkgroupsNot[i] = Talkgroup{
tgf.TalkgroupsNot[i] = tgs.ID{
System: uint32(t.System),
Talkgroup: uint32(t.Talkgroup),
}
@ -51,12 +53,12 @@ func (f *TalkgroupFilter) hasTags() bool {
return len(f.TalkgroupTagsAny) > 0 || len(f.TalkgroupTagsAll) > 0 || len(f.TalkgroupTagsNot) > 0
}
func (f *TalkgroupFilter) GetFinalTalkgroups() map[Talkgroup]bool {
func (f *TalkgroupFilter) GetFinalTalkgroups() map[tgs.ID]bool {
return f.talkgroups
}
func (f *TalkgroupFilter) compile(ctx context.Context) error {
f.talkgroups = make(map[Talkgroup]bool)
f.talkgroups = make(map[tgs.ID]bool)
for _, tg := range f.Talkgroups {
f.talkgroups[tg] = true
}
@ -69,7 +71,7 @@ func (f *TalkgroupFilter) compile(ctx context.Context) error {
}
for _, tg := range tagTGs {
f.talkgroups[Talkgroup{System: uint32(tg.SystemID), Talkgroup: uint32(tg.Tgid)}] = true
f.talkgroups[tgs.ID{System: uint32(tg.SystemID), Talkgroup: uint32(tg.Tgid)}] = true
}
}

View file

@ -1,191 +0,0 @@
package calls
import (
"context"
"fmt"
"sync"
"time"
"dynatron.me/x/stillbox/pkg/database"
"dynatron.me/x/stillbox/internal/ruletime"
"dynatron.me/x/stillbox/internal/trending"
"github.com/jackc/pgx/v5"
"github.com/rs/zerolog/log"
)
type Talkgroup struct {
System uint32
Talkgroup uint32
}
func (c *Call) TalkgroupTuple() Talkgroup {
return Talkgroup{System: uint32(c.System), Talkgroup: uint32(c.Talkgroup)}
}
func TG[T int | uint | int64 | uint64 | int32 | uint32](sys, tgid T) Talkgroup {
return Talkgroup{
System: uint32(sys),
Talkgroup: uint32(tgid),
}
}
func (t Talkgroup) Pack() int64 {
// P25 system IDs are 12 bits, so we can fit them in a signed 8 byte int (int64, pg INT8)
return int64((int64(t.System) << 32) | int64(t.Talkgroup))
}
func (t Talkgroup) String() string {
return fmt.Sprintf("%d:%d", t.System, t.Talkgroup)
}
func PackedTGs(tg []Talkgroup) []int64 {
s := make([]int64, len(tg))
for i, v := range tg {
s[i] = v.Pack()
}
return s
}
type tgMap map[Talkgroup]database.GetTalkgroupWithLearnedByPackedIDsRow
type TalkgroupCache interface {
TG(ctx context.Context, tg Talkgroup) (database.GetTalkgroupWithLearnedByPackedIDsRow, bool)
SystemName(ctx context.Context, id int) (string, bool)
ApplyAlertRules(score trending.Score[Talkgroup], t time.Time, coversOpts ...ruletime.CoversOption) float64
Hint(ctx context.Context, tgs []Talkgroup) error
Load(ctx context.Context, tgs []int64) error
Invalidate()
}
func (t *talkgroupCache) Invalidate() {
t.Lock()
defer t.Unlock()
clear(t.tgs)
clear(t.systems)
clear(t.AlertConfig)
}
type talkgroupCache struct {
sync.RWMutex
AlertConfig
tgs tgMap
systems map[int32]string
}
func NewTalkgroupCache() TalkgroupCache {
tgc := &talkgroupCache{
tgs: make(tgMap),
systems: make(map[int32]string),
AlertConfig: make(AlertConfig),
}
return tgc
}
func (t *talkgroupCache) Hint(ctx context.Context, tgs []Talkgroup) error {
t.RLock()
var toLoad []int64
if len(t.tgs) > len(tgs)/2 { // TODO: instrument this
for _, tg := range tgs {
_, ok := t.tgs[tg]
if !ok {
toLoad = append(toLoad, tg.Pack())
}
}
} else {
toLoad = make([]int64, 0, len(tgs))
for _, g := range tgs {
toLoad = append(toLoad, g.Pack())
}
}
if len(toLoad) > 0 {
t.RUnlock()
return t.Load(ctx, toLoad)
}
t.RUnlock()
return nil
}
func (t *talkgroupCache) add(rec database.GetTalkgroupWithLearnedByPackedIDsRow) error {
tg := TG(rec.SystemID, rec.Tgid)
t.tgs[tg] = rec
t.systems[rec.SystemID] = rec.SystemName
return t.AlertConfig.AddAlertConfig(tg, rec.AlertConfig)
}
func (t *talkgroupCache) Load(ctx context.Context, tgs []int64) error {
tgRecords, err := database.FromCtx(ctx).GetTalkgroupWithLearnedByPackedIDs(ctx, tgs)
if err != nil {
return err
}
t.Lock()
defer t.Unlock()
for _, rec := range tgRecords {
err := t.add(rec)
if err != nil {
log.Error().Err(err).Msg("add alert config fail")
}
}
return nil
}
func (t *talkgroupCache) TG(ctx context.Context, tg Talkgroup) (database.GetTalkgroupWithLearnedByPackedIDsRow, bool) {
t.RLock()
rec, has := t.tgs[tg]
t.RUnlock()
if has {
return rec, has
}
recs, err := database.FromCtx(ctx).GetTalkgroupWithLearnedByPackedIDs(ctx, []int64{tg.Pack()})
switch err {
case nil:
case pgx.ErrNoRows:
return database.GetTalkgroupWithLearnedByPackedIDsRow{}, false
default:
log.Error().Err(err).Msg("TG() cache add db get")
return database.GetTalkgroupWithLearnedByPackedIDsRow{}, false
}
if len(recs) < 1 {
return database.GetTalkgroupWithLearnedByPackedIDsRow{}, false
}
t.Lock()
defer t.Unlock()
err = t.add(recs[0])
if err != nil {
log.Error().Err(err).Msg("TG() cache add")
return recs[0], false
}
return recs[0], true
}
func (t *talkgroupCache) SystemName(ctx context.Context, id int) (name string, has bool) {
n, has := t.systems[int32(id)]
if !has {
sys, err := database.FromCtx(ctx).GetSystemName(ctx, id)
if err != nil {
return "", false
}
return sys, true
}
return n, has
}

View file

@ -22,14 +22,14 @@ type Querier interface {
GetAPIKey(ctx context.Context, apiKey string) (ApiKey, error)
GetDatabaseSize(ctx context.Context) (string, error)
GetSystemName(ctx context.Context, systemID int) (string, error)
GetTalkgroup(ctx context.Context, systemID int, tgid int) (Talkgroup, error)
GetTalkgroup(ctx context.Context, systemID int, tgid int) (GetTalkgroupRow, error)
GetTalkgroupIDsByTags(ctx context.Context, anytags []string, alltags []string, nottags []string) ([]GetTalkgroupIDsByTagsRow, error)
GetTalkgroupTags(ctx context.Context, sys int, tg int) ([]string, error)
GetTalkgroupWithLearned(ctx context.Context, systemID int, tgid int) (GetTalkgroupWithLearnedRow, error)
GetTalkgroupWithLearnedByPackedIDs(ctx context.Context, dollar_1 []int64) ([]GetTalkgroupWithLearnedByPackedIDsRow, error)
GetTalkgroupsByPackedIDs(ctx context.Context, dollar_1 []int64) ([]GetTalkgroupsByPackedIDsRow, error)
GetTalkgroupsWithAllTags(ctx context.Context, tags []string) ([]Talkgroup, error)
GetTalkgroupsWithAnyTags(ctx context.Context, tags []string) ([]Talkgroup, error)
GetTalkgroupsWithAllTags(ctx context.Context, tags []string) ([]GetTalkgroupsWithAllTagsRow, error)
GetTalkgroupsWithAnyTags(ctx context.Context, tags []string) ([]GetTalkgroupsWithAnyTagsRow, error)
GetUserByID(ctx context.Context, id int32) (User, error)
GetUserByUID(ctx context.Context, id int32) (User, error)
GetUserByUsername(ctx context.Context, username string) (User, error)

View file

@ -31,26 +31,30 @@ func (q *Queries) GetSystemName(ctx context.Context, systemID int) (string, erro
}
const getTalkgroup = `-- name: GetTalkgroup :one
SELECT id, system_id, tgid, name, alpha_tag, tg_group, frequency, metadata, tags, alert, alert_config, weight FROM talkgroups
SELECT talkgroups.id, talkgroups.system_id, talkgroups.tgid, talkgroups.name, talkgroups.alpha_tag, talkgroups.tg_group, talkgroups.frequency, talkgroups.metadata, talkgroups.tags, talkgroups.alert, talkgroups.alert_config, talkgroups.weight FROM talkgroups
WHERE id = systg2id($1, $2)
`
func (q *Queries) GetTalkgroup(ctx context.Context, systemID int, tgid int) (Talkgroup, error) {
type GetTalkgroupRow struct {
Talkgroup Talkgroup `json:"talkgroup"`
}
func (q *Queries) GetTalkgroup(ctx context.Context, systemID int, tgid int) (GetTalkgroupRow, error) {
row := q.db.QueryRow(ctx, getTalkgroup, systemID, tgid)
var i Talkgroup
var i GetTalkgroupRow
err := row.Scan(
&i.ID,
&i.SystemID,
&i.Tgid,
&i.Name,
&i.AlphaTag,
&i.TgGroup,
&i.Frequency,
&i.Metadata,
&i.Tags,
&i.Alert,
&i.AlertConfig,
&i.Weight,
&i.Talkgroup.ID,
&i.Talkgroup.SystemID,
&i.Talkgroup.Tgid,
&i.Talkgroup.Name,
&i.Talkgroup.AlphaTag,
&i.Talkgroup.TgGroup,
&i.Talkgroup.Frequency,
&i.Talkgroup.Metadata,
&i.Talkgroup.Tags,
&i.Talkgroup.Alert,
&i.Talkgroup.AlertConfig,
&i.Talkgroup.Weight,
)
return i, err
}
@ -101,19 +105,17 @@ func (q *Queries) GetTalkgroupTags(ctx context.Context, sys int, tg int) ([]stri
const getTalkgroupWithLearned = `-- name: GetTalkgroupWithLearned :one
SELECT
tg.id, tg.system_id, sys.name system_name, tg.tgid, tg.name,
tg.tg_group, tg.frequency, tg.metadata, tg.tags, tg.alpha_tag,
tg.alert, tg.weight, tg.alert_config,
tg.id, tg.system_id, tg.tgid, tg.name, tg.alpha_tag, tg.tg_group, tg.frequency, tg.metadata, tg.tags, tg.alert, tg.alert_config, tg.weight, sys.id, sys.name,
FALSE learned
FROM talkgroups tg
JOIN systems sys ON tg.system_id = sys.id
WHERE tg.id = systg2id($1, $2)
UNION
SELECT
tgl.id::INT8, tgl.system_id::INT4, sys.name system_name, tgl.tgid::INT4, tgl.name,
tgl.alpha_tag, NULL::INTEGER, NULL::JSONB,
CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END, tgl.alpha_tag,
TRUE, 1.0, NULL::JSONB,
tgl.id::INT8, tgl.system_id::INT4, tgl.tgid::INT4, tgl.name,
tgl.alpha_tag, tgl.alpha_tag, NULL::INTEGER, NULL::JSONB,
CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END,
TRUE, NULL::JSONB, 1.0, sys.id, sys.name,
TRUE learned
FROM talkgroups_learned tgl
JOIN systems sys ON tgl.system_id = sys.id
@ -121,39 +123,29 @@ WHERE tgl.system_id = $1 AND tgl.tgid = $2 AND ignored IS NOT TRUE
`
type GetTalkgroupWithLearnedRow struct {
ID int64 `json:"id"`
SystemID int32 `json:"system_id"`
SystemName string `json:"system_name"`
Tgid int32 `json:"tgid"`
Name *string `json:"name"`
TgGroup *string `json:"tg_group"`
Frequency *int32 `json:"frequency"`
Metadata []byte `json:"metadata"`
Tags []string `json:"tags"`
AlphaTag *string `json:"alpha_tag"`
Alert bool `json:"alert"`
Weight float32 `json:"weight"`
AlertConfig []byte `json:"alert_config"`
Learned bool `json:"learned"`
Talkgroup Talkgroup `json:"talkgroup"`
System System `json:"system"`
Learned bool `json:"learned"`
}
func (q *Queries) GetTalkgroupWithLearned(ctx context.Context, systemID int, tgid int) (GetTalkgroupWithLearnedRow, error) {
row := q.db.QueryRow(ctx, getTalkgroupWithLearned, systemID, tgid)
var i GetTalkgroupWithLearnedRow
err := row.Scan(
&i.ID,
&i.SystemID,
&i.SystemName,
&i.Tgid,
&i.Name,
&i.TgGroup,
&i.Frequency,
&i.Metadata,
&i.Tags,
&i.AlphaTag,
&i.Alert,
&i.Weight,
&i.AlertConfig,
&i.Talkgroup.ID,
&i.Talkgroup.SystemID,
&i.Talkgroup.Tgid,
&i.Talkgroup.Name,
&i.Talkgroup.AlphaTag,
&i.Talkgroup.TgGroup,
&i.Talkgroup.Frequency,
&i.Talkgroup.Metadata,
&i.Talkgroup.Tags,
&i.Talkgroup.Alert,
&i.Talkgroup.AlertConfig,
&i.Talkgroup.Weight,
&i.System.ID,
&i.System.Name,
&i.Learned,
)
return i, err
@ -161,19 +153,17 @@ func (q *Queries) GetTalkgroupWithLearned(ctx context.Context, systemID int, tgi
const getTalkgroupWithLearnedByPackedIDs = `-- name: GetTalkgroupWithLearnedByPackedIDs :many
SELECT
tg.id, tg.system_id, sys.name system_name, tg.tgid, tg.name,
tg.tg_group, tg.frequency, tg.metadata, tg.tags, tg.alpha_tag,
tg.alert, tg.weight, tg.alert_config,
tg.id, tg.system_id, tg.tgid, tg.name, tg.alpha_tag, tg.tg_group, tg.frequency, tg.metadata, tg.tags, tg.alert, tg.alert_config, tg.weight, sys.id, sys.name,
FALSE learned
FROM talkgroups tg
JOIN systems sys ON tg.system_id = sys.id
WHERE tg.id = ANY($1::INT8[])
UNION
SELECT
tgl.id::INT8, tgl.system_id::INT4, sys.name system_name, tgl.tgid::INT4, tgl.name,
tgl.alpha_tag, NULL::INTEGER, NULL::JSONB,
CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END, tgl.alpha_tag,
TRUE, 1.0, NULL::JSONB,
tgl.id::INT8, tgl.system_id::INT4, tgl.tgid::INT4, tgl.name,
tgl.alpha_tag, tgl.alpha_tag, NULL::INTEGER, NULL::JSONB,
CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END,
TRUE, NULL::JSONB, 1.0, sys.id, sys.name,
TRUE learned
FROM talkgroups_learned tgl
JOIN systems sys ON tgl.system_id = sys.id
@ -181,20 +171,9 @@ WHERE systg2id(tgl.system_id, tgl.tgid) = ANY($1::INT8[]) AND ignored IS NOT TRU
`
type GetTalkgroupWithLearnedByPackedIDsRow struct {
ID int64 `json:"id"`
SystemID int32 `json:"system_id"`
SystemName string `json:"system_name"`
Tgid int32 `json:"tgid"`
Name *string `json:"name"`
TgGroup *string `json:"tg_group"`
Frequency *int32 `json:"frequency"`
Metadata []byte `json:"metadata"`
Tags []string `json:"tags"`
AlphaTag *string `json:"alpha_tag"`
Alert bool `json:"alert"`
Weight float32 `json:"weight"`
AlertConfig []byte `json:"alert_config"`
Learned bool `json:"learned"`
Talkgroup Talkgroup `json:"talkgroup"`
System System `json:"system"`
Learned bool `json:"learned"`
}
func (q *Queries) GetTalkgroupWithLearnedByPackedIDs(ctx context.Context, dollar_1 []int64) ([]GetTalkgroupWithLearnedByPackedIDsRow, error) {
@ -207,19 +186,20 @@ func (q *Queries) GetTalkgroupWithLearnedByPackedIDs(ctx context.Context, dollar
for rows.Next() {
var i GetTalkgroupWithLearnedByPackedIDsRow
if err := rows.Scan(
&i.ID,
&i.SystemID,
&i.SystemName,
&i.Tgid,
&i.Name,
&i.TgGroup,
&i.Frequency,
&i.Metadata,
&i.Tags,
&i.AlphaTag,
&i.Alert,
&i.Weight,
&i.AlertConfig,
&i.Talkgroup.ID,
&i.Talkgroup.SystemID,
&i.Talkgroup.Tgid,
&i.Talkgroup.Name,
&i.Talkgroup.AlphaTag,
&i.Talkgroup.TgGroup,
&i.Talkgroup.Frequency,
&i.Talkgroup.Metadata,
&i.Talkgroup.Tags,
&i.Talkgroup.Alert,
&i.Talkgroup.AlertConfig,
&i.Talkgroup.Weight,
&i.System.ID,
&i.System.Name,
&i.Learned,
); err != nil {
return nil, err
@ -233,26 +213,14 @@ func (q *Queries) GetTalkgroupWithLearnedByPackedIDs(ctx context.Context, dollar
}
const getTalkgroupsByPackedIDs = `-- name: GetTalkgroupsByPackedIDs :many
SELECT tg.id, system_id, tgid, tg.name, alpha_tag, tg_group, frequency, metadata, tags, alert, alert_config, weight, sys.id, sys.name FROM talkgroups tg
SELECT tg.id, tg.system_id, tg.tgid, tg.name, tg.alpha_tag, tg.tg_group, tg.frequency, tg.metadata, tg.tags, tg.alert, tg.alert_config, tg.weight, sys.id, sys.name FROM talkgroups tg
JOIN systems sys ON tg.system_id = sys.id
WHERE tg.id = ANY($1::INT8[])
`
type GetTalkgroupsByPackedIDsRow struct {
ID int64 `json:"id"`
SystemID int32 `json:"system_id"`
Tgid int32 `json:"tgid"`
Name *string `json:"name"`
AlphaTag *string `json:"alpha_tag"`
TgGroup *string `json:"tg_group"`
Frequency *int32 `json:"frequency"`
Metadata []byte `json:"metadata"`
Tags []string `json:"tags"`
Alert bool `json:"alert"`
AlertConfig []byte `json:"alert_config"`
Weight float32 `json:"weight"`
ID_2 int `json:"id_2"`
Name_2 string `json:"name_2"`
Talkgroup Talkgroup `json:"talkgroup"`
System System `json:"system"`
}
func (q *Queries) GetTalkgroupsByPackedIDs(ctx context.Context, dollar_1 []int64) ([]GetTalkgroupsByPackedIDsRow, error) {
@ -265,20 +233,20 @@ func (q *Queries) GetTalkgroupsByPackedIDs(ctx context.Context, dollar_1 []int64
for rows.Next() {
var i GetTalkgroupsByPackedIDsRow
if err := rows.Scan(
&i.ID,
&i.SystemID,
&i.Tgid,
&i.Name,
&i.AlphaTag,
&i.TgGroup,
&i.Frequency,
&i.Metadata,
&i.Tags,
&i.Alert,
&i.AlertConfig,
&i.Weight,
&i.ID_2,
&i.Name_2,
&i.Talkgroup.ID,
&i.Talkgroup.SystemID,
&i.Talkgroup.Tgid,
&i.Talkgroup.Name,
&i.Talkgroup.AlphaTag,
&i.Talkgroup.TgGroup,
&i.Talkgroup.Frequency,
&i.Talkgroup.Metadata,
&i.Talkgroup.Tags,
&i.Talkgroup.Alert,
&i.Talkgroup.AlertConfig,
&i.Talkgroup.Weight,
&i.System.ID,
&i.System.Name,
); err != nil {
return nil, err
}
@ -291,32 +259,36 @@ func (q *Queries) GetTalkgroupsByPackedIDs(ctx context.Context, dollar_1 []int64
}
const getTalkgroupsWithAllTags = `-- name: GetTalkgroupsWithAllTags :many
SELECT id, system_id, tgid, name, alpha_tag, tg_group, frequency, metadata, tags, alert, alert_config, weight FROM talkgroups
SELECT talkgroups.id, talkgroups.system_id, talkgroups.tgid, talkgroups.name, talkgroups.alpha_tag, talkgroups.tg_group, talkgroups.frequency, talkgroups.metadata, talkgroups.tags, talkgroups.alert, talkgroups.alert_config, talkgroups.weight FROM talkgroups
WHERE tags && ARRAY[$1]
`
func (q *Queries) GetTalkgroupsWithAllTags(ctx context.Context, tags []string) ([]Talkgroup, error) {
type GetTalkgroupsWithAllTagsRow struct {
Talkgroup Talkgroup `json:"talkgroup"`
}
func (q *Queries) GetTalkgroupsWithAllTags(ctx context.Context, tags []string) ([]GetTalkgroupsWithAllTagsRow, error) {
rows, err := q.db.Query(ctx, getTalkgroupsWithAllTags, tags)
if err != nil {
return nil, err
}
defer rows.Close()
var items []Talkgroup
var items []GetTalkgroupsWithAllTagsRow
for rows.Next() {
var i Talkgroup
var i GetTalkgroupsWithAllTagsRow
if err := rows.Scan(
&i.ID,
&i.SystemID,
&i.Tgid,
&i.Name,
&i.AlphaTag,
&i.TgGroup,
&i.Frequency,
&i.Metadata,
&i.Tags,
&i.Alert,
&i.AlertConfig,
&i.Weight,
&i.Talkgroup.ID,
&i.Talkgroup.SystemID,
&i.Talkgroup.Tgid,
&i.Talkgroup.Name,
&i.Talkgroup.AlphaTag,
&i.Talkgroup.TgGroup,
&i.Talkgroup.Frequency,
&i.Talkgroup.Metadata,
&i.Talkgroup.Tags,
&i.Talkgroup.Alert,
&i.Talkgroup.AlertConfig,
&i.Talkgroup.Weight,
); err != nil {
return nil, err
}
@ -329,32 +301,36 @@ func (q *Queries) GetTalkgroupsWithAllTags(ctx context.Context, tags []string) (
}
const getTalkgroupsWithAnyTags = `-- name: GetTalkgroupsWithAnyTags :many
SELECT id, system_id, tgid, name, alpha_tag, tg_group, frequency, metadata, tags, alert, alert_config, weight FROM talkgroups
SELECT talkgroups.id, talkgroups.system_id, talkgroups.tgid, talkgroups.name, talkgroups.alpha_tag, talkgroups.tg_group, talkgroups.frequency, talkgroups.metadata, talkgroups.tags, talkgroups.alert, talkgroups.alert_config, talkgroups.weight FROM talkgroups
WHERE tags @> ARRAY[$1]
`
func (q *Queries) GetTalkgroupsWithAnyTags(ctx context.Context, tags []string) ([]Talkgroup, error) {
type GetTalkgroupsWithAnyTagsRow struct {
Talkgroup Talkgroup `json:"talkgroup"`
}
func (q *Queries) GetTalkgroupsWithAnyTags(ctx context.Context, tags []string) ([]GetTalkgroupsWithAnyTagsRow, error) {
rows, err := q.db.Query(ctx, getTalkgroupsWithAnyTags, tags)
if err != nil {
return nil, err
}
defer rows.Close()
var items []Talkgroup
var items []GetTalkgroupsWithAnyTagsRow
for rows.Next() {
var i Talkgroup
var i GetTalkgroupsWithAnyTagsRow
if err := rows.Scan(
&i.ID,
&i.SystemID,
&i.Tgid,
&i.Name,
&i.AlphaTag,
&i.TgGroup,
&i.Frequency,
&i.Metadata,
&i.Tags,
&i.Alert,
&i.AlertConfig,
&i.Weight,
&i.Talkgroup.ID,
&i.Talkgroup.SystemID,
&i.Talkgroup.Tgid,
&i.Talkgroup.Name,
&i.Talkgroup.AlphaTag,
&i.Talkgroup.TgGroup,
&i.Talkgroup.Frequency,
&i.Talkgroup.Metadata,
&i.Talkgroup.Tags,
&i.Talkgroup.Alert,
&i.Talkgroup.AlertConfig,
&i.Talkgroup.Weight,
); err != nil {
return nil, err
}

View file

@ -0,0 +1,49 @@
package database
import (
"testing"
"github.com/stretchr/testify/require"
)
const getTalkgroupWithLearnedByPackedIDsTest = `-- name: GetTalkgroupWithLearnedByPackedIDs :many
SELECT
tg.id, tg.system_id, tg.tgid, tg.name, tg.alpha_tag, tg.tg_group, tg.frequency, tg.metadata, tg.tags, tg.alert, tg.alert_config, tg.weight, sys.id, sys.name,
FALSE learned
FROM talkgroups tg
JOIN systems sys ON tg.system_id = sys.id
WHERE tg.id = ANY($1::INT8[])
UNION
SELECT
tgl.id::INT8, tgl.system_id::INT4, tgl.tgid::INT4, tgl.name,
tgl.alpha_tag, tgl.alpha_tag, NULL::INTEGER, NULL::JSONB,
CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END,
TRUE, NULL::JSONB, 1.0, sys.id, sys.name,
TRUE learned
FROM talkgroups_learned tgl
JOIN systems sys ON tgl.system_id = sys.id
WHERE systg2id(tgl.system_id, tgl.tgid) = ANY($1::INT8[]) AND ignored IS NOT TRUE
`
const getTalkgroupWithLearnedTest = `-- name: GetTalkgroupWithLearned :one
SELECT
tg.id, tg.system_id, tg.tgid, tg.name, tg.alpha_tag, tg.tg_group, tg.frequency, tg.metadata, tg.tags, tg.alert, tg.alert_config, tg.weight, sys.id, sys.name,
FALSE learned
FROM talkgroups tg
JOIN systems sys ON tg.system_id = sys.id
WHERE tg.id = systg2id($1, $2)
UNION
SELECT
tgl.id::INT8, tgl.system_id::INT4, tgl.tgid::INT4, tgl.name,
tgl.alpha_tag, tgl.alpha_tag, NULL::INTEGER, NULL::JSONB,
CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END,
TRUE, NULL::JSONB, 1.0, sys.id, sys.name,
TRUE learned
FROM talkgroups_learned tgl
JOIN systems sys ON tgl.system_id = sys.id
WHERE tgl.system_id = $1 AND tgl.tgid = $2 AND ignored IS NOT TRUE
`
func TestQueryColumnsMatch(t *testing.T) {
require.Equal(t, getTalkgroupWithLearnedByPackedIDsTest, getTalkgroupWithLearnedByPackedIDs)
require.Equal(t, getTalkgroupWithLearnedTest, getTalkgroupWithLearned)
}

View file

@ -5,10 +5,9 @@ import (
"encoding/json"
"dynatron.me/x/stillbox/pkg/calls"
"dynatron.me/x/stillbox/pkg/database"
"dynatron.me/x/stillbox/pkg/pb"
"dynatron.me/x/stillbox/pkg/talkgroups"
"github.com/jackc/pgx/v5"
"github.com/rs/zerolog/log"
"google.golang.org/protobuf/types/known/structpb"
)
@ -61,19 +60,18 @@ func (c *client) SendError(cmd *pb.Command, err error) {
}
func (c *client) Talkgroup(ctx context.Context, tg *pb.Talkgroup) error {
db := database.FromCtx(ctx)
tgi, err := db.GetTalkgroupWithLearned(ctx, int(tg.System), int(tg.Talkgroup))
tgi, err := talkgroups.StoreFrom(ctx).TG(ctx, talkgroups.TG(tg.System, tg.Talkgroup))
if err != nil {
if err != pgx.ErrNoRows {
if err != talkgroups.ErrNoTG {
log.Error().Err(err).Int32("sys", tg.System).Int32("tg", tg.Talkgroup).Msg("get talkgroup fail")
}
return err
}
var md *structpb.Struct
if len(tgi.Metadata) > 0 {
if len(tgi.Talkgroup.Metadata) > 0 {
m := make(map[string]interface{})
err := json.Unmarshal(tgi.Metadata, &m)
err := json.Unmarshal(tgi.Talkgroup.Metadata, &m)
if err != nil {
log.Error().Err(err).Int32("sys", tg.System).Int32("tg", tg.Talkgroup).Msg("unmarshal tg metadata")
}
@ -85,14 +83,14 @@ func (c *client) Talkgroup(ctx context.Context, tg *pb.Talkgroup) error {
resp := &pb.TalkgroupInfo{
Tg: tg,
Name: tgi.Name,
Group: tgi.TgGroup,
Frequency: tgi.Frequency,
Name: tgi.Talkgroup.Name,
Group: tgi.Talkgroup.TgGroup,
Frequency: tgi.Talkgroup.Frequency,
Metadata: md,
Tags: tgi.Tags,
Tags: tgi.Talkgroup.Tags,
Learned: tgi.Learned,
AlphaTag: tgi.AlphaTag,
SystemName: tgi.SystemName,
AlphaTag: tgi.Talkgroup.AlphaTag,
SystemName: tgi.System.Name,
}
_ = c.Send(&pb.Message{

View file

@ -6,7 +6,6 @@ import (
"os"
"time"
"dynatron.me/x/stillbox/pkg/calls"
"dynatron.me/x/stillbox/pkg/alerting"
"dynatron.me/x/stillbox/pkg/auth"
"dynatron.me/x/stillbox/pkg/config"
@ -15,6 +14,7 @@ import (
"dynatron.me/x/stillbox/pkg/notify"
"dynatron.me/x/stillbox/pkg/sinks"
"dynatron.me/x/stillbox/pkg/sources"
"dynatron.me/x/stillbox/pkg/talkgroups"
"github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"
"github.com/go-chi/cors"
@ -35,7 +35,7 @@ type Server struct {
alerter alerting.Alerter
notifier notify.Notifier
hup chan os.Signal
tgCache calls.TalkgroupCache
tgs talkgroups.Store
}
func New(ctx context.Context, cfg *config.Config) (*Server, error) {
@ -58,7 +58,7 @@ func New(ctx context.Context, cfg *config.Config) (*Server, error) {
return nil, err
}
tgCache := calls.NewTalkgroupCache()
tgCache := talkgroups.NewCache()
srv := &Server{
auth: authenticator,
@ -69,7 +69,7 @@ func New(ctx context.Context, cfg *config.Config) (*Server, error) {
logger: logger,
alerter: alerting.New(cfg.Alerting, tgCache, alerting.WithNotifier(notifier)),
notifier: notifier,
tgCache: tgCache,
tgs: tgCache,
}
srv.sinks.Register("database", sinks.NewDatabaseSink(srv.db), true)
@ -104,6 +104,7 @@ func (s *Server) Go(ctx context.Context) error {
s.installHupHandler()
ctx = database.CtxWithDB(ctx, s.db)
ctx = talkgroups.CtxWithStore(ctx, s.tgs)
httpSrv := &http.Server{
Addr: s.conf.Listen,

View file

@ -7,8 +7,8 @@ import (
"dynatron.me/x/stillbox/internal/common"
"dynatron.me/x/stillbox/internal/forms"
"dynatron.me/x/stillbox/pkg/calls"
"dynatron.me/x/stillbox/pkg/auth"
"dynatron.me/x/stillbox/pkg/calls"
"github.com/go-chi/chi/v5"
"github.com/rs/zerolog/log"
)

View file

@ -1,4 +1,4 @@
package calls
package talkgroups
import (
"encoding/json"
@ -8,14 +8,14 @@ import (
"dynatron.me/x/stillbox/internal/trending"
)
type AlertConfig map[Talkgroup][]AlertRule
type AlertConfig map[ID][]AlertRule
type AlertRule struct {
Times []ruletime.RuleTime `json:"times"`
ScoreMultiplier float32 `json:"mult"`
}
func (ac AlertConfig) AddAlertConfig(tg Talkgroup, confBytes []byte) error {
func (ac AlertConfig) AddAlertConfig(tg ID, confBytes []byte) error {
if len(confBytes) == 0 {
return nil
}
@ -30,7 +30,7 @@ func (ac AlertConfig) AddAlertConfig(tg Talkgroup, confBytes []byte) error {
return nil
}
func (ac AlertConfig) ApplyAlertRules(score trending.Score[Talkgroup], t time.Time, coversOpts ...ruletime.CoversOption) float64 {
func (ac AlertConfig) ApplyAlertRules(score trending.Score[ID], t time.Time, coversOpts ...ruletime.CoversOption) float64 {
s, has := ac[score.ID]
if !has {
return score.Score

View file

@ -1,4 +1,4 @@
package calls_test
package talkgroups_test
import (
"errors"
@ -8,26 +8,26 @@ import (
"dynatron.me/x/stillbox/internal/ruletime"
"dynatron.me/x/stillbox/internal/trending"
"dynatron.me/x/stillbox/pkg/calls"
"dynatron.me/x/stillbox/pkg/talkgroups"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestAlertConfig(t *testing.T) {
ac := make(calls.AlertConfig)
ac := make(talkgroups.AlertConfig)
parseTests := []struct {
name string
tg calls.Talkgroup
tg talkgroups.ID
conf string
compare []calls.AlertRule
compare []talkgroups.AlertRule
expectErr error
}{
{
name: "base case",
tg: calls.TG(197, 3),
tg: talkgroups.TG(197, 3),
conf: `[{"times":["7:00+2h","01:00+1h","16:00+1h","19:00+4h"],"mult":0.2},{"times":["11:00+1h","15:00+30m","16:03+20m"],"mult":2.0}]`,
compare: []calls.AlertRule{
compare: []talkgroups.AlertRule{
{
Times: []ruletime.RuleTime{
ruletime.Must(ruletime.New("7:00+2h")),
@ -49,7 +49,7 @@ func TestAlertConfig(t *testing.T) {
},
{
name: "bad spec",
tg: calls.TG(197, 3),
tg: talkgroups.TG(197, 3),
conf: `[{"times":["26:00+2h","01:00+1h","19:00+4h"],"mult":0.2},{"times":["11:00+1h","15:00+30m"],"mult":2.0}]`,
expectErr: errors.New("'26:00+2h': invalid hours"),
},
@ -78,42 +78,42 @@ func TestAlertConfig(t *testing.T) {
evalTests := []struct {
name string
tg calls.Talkgroup
tg talkgroups.ID
t time.Time
origScore float64
expectScore float64
}{
{
name: "base eval",
tg: calls.TG(197, 3),
tg: talkgroups.TG(197, 3),
t: tMust("1:20"),
origScore: 3,
expectScore: 0.6,
},
{
name: "base eval",
tg: calls.TG(197, 3),
tg: talkgroups.TG(197, 3),
t: tMust("23:03"),
origScore: 3,
expectScore: 3,
},
{
name: "base eval",
tg: calls.TG(197, 3),
tg: talkgroups.TG(197, 3),
t: tMust("8:03"),
origScore: 1.0,
expectScore: 0.2,
},
{
name: "base eval",
tg: calls.TG(197, 3),
tg: talkgroups.TG(197, 3),
t: tMust("15:15"),
origScore: 3.0,
expectScore: 6.0,
},
{
name: "overlapping eval",
tg: calls.TG(197, 3),
tg: talkgroups.TG(197, 3),
t: tMust("16:10"),
origScore: 1.0,
expectScore: 0.4,
@ -122,7 +122,7 @@ func TestAlertConfig(t *testing.T) {
for _, tc := range evalTests {
t.Run(tc.name, func(t *testing.T) {
cs := trending.Score[calls.Talkgroup]{
cs := trending.Score[talkgroups.ID]{
ID: tc.tg,
Score: tc.origScore,
}

195
pkg/talkgroups/cache.go Normal file
View file

@ -0,0 +1,195 @@
package talkgroups
import (
"context"
"errors"
"sync"
"time"
"dynatron.me/x/stillbox/pkg/database"
"dynatron.me/x/stillbox/internal/ruletime"
"dynatron.me/x/stillbox/internal/trending"
"github.com/jackc/pgx/v5"
"github.com/rs/zerolog/log"
)
type tgMap map[ID]Talkgroup
type Store interface {
// TG retrieves a Talkgroup from the Store.
TG(ctx context.Context, tg ID) (Talkgroup, error)
// SystemName retrieves a system name from the store. It returns the record and whether one was found.
SystemName(ctx context.Context, id int) (string, bool)
// ApplyAlertRules applies the score's talkgroup alert rules to the call occurring at t and returns the weighted score.
ApplyAlertRules(score trending.Score[ID], t time.Time, coversOpts ...ruletime.CoversOption) float64
// Hint hints the Store that the provided talkgroups will be asked for.
Hint(ctx context.Context, tgs []ID) error
// Load loads the provided packed talkgroup IDs into the Store.
Load(ctx context.Context, tgs []int64) error
// Invalidate invalidates any caching in the Store.
Invalidate()
}
type CtxStoreKeyT string
const CtxStoreKey CtxStoreKeyT = "store"
func CtxWithStore(ctx context.Context, s Store) context.Context {
return context.WithValue(ctx, CtxStoreKey, s)
}
func StoreFrom(ctx context.Context) Store {
s, ok := ctx.Value(CtxStoreKey).(Store)
if !ok {
return NewCache()
}
return s
}
func (t *cache) Invalidate() {
t.Lock()
defer t.Unlock()
clear(t.tgs)
clear(t.systems)
clear(t.AlertConfig)
}
type cache struct {
sync.RWMutex
AlertConfig
tgs tgMap
systems map[int32]string
}
// NewCache returns a new cache Store.
func NewCache() Store {
tgc := &cache{
tgs: make(tgMap),
systems: make(map[int32]string),
AlertConfig: make(AlertConfig),
}
return tgc
}
func (t *cache) Hint(ctx context.Context, tgs []ID) error {
t.RLock()
var toLoad []int64
if len(t.tgs) > len(tgs)/2 { // TODO: instrument this
for _, tg := range tgs {
_, ok := t.tgs[tg]
if !ok {
toLoad = append(toLoad, tg.Pack())
}
}
} else {
toLoad = make([]int64, 0, len(tgs))
for _, g := range tgs {
toLoad = append(toLoad, g.Pack())
}
}
if len(toLoad) > 0 {
t.RUnlock()
return t.Load(ctx, toLoad)
}
t.RUnlock()
return nil
}
func (t *cache) add(rec Talkgroup) error {
tg := TG(rec.System.ID, int(rec.Talkgroup.Tgid))
t.tgs[tg] = rec
t.systems[int32(rec.System.ID)] = rec.System.Name
return t.AlertConfig.AddAlertConfig(tg, rec.Talkgroup.AlertConfig)
}
func rowToTalkgroup(r database.GetTalkgroupWithLearnedByPackedIDsRow) Talkgroup {
return Talkgroup{
Talkgroup: r.Talkgroup,
System: r.System,
Learned: r.Learned,
}
}
func (t *cache) Load(ctx context.Context, tgs []int64) error {
tgRecords, err := database.FromCtx(ctx).GetTalkgroupWithLearnedByPackedIDs(ctx, tgs)
if err != nil {
return err
}
t.Lock()
defer t.Unlock()
for _, rec := range tgRecords {
err := t.add(rowToTalkgroup(rec))
if err != nil {
log.Error().Err(err).Msg("add alert config fail")
}
}
return nil
}
var ErrNoTG = errors.New("talkgroup not found")
func (t *cache) TG(ctx context.Context, tg ID) (Talkgroup, error) {
t.RLock()
rec, has := t.tgs[tg]
t.RUnlock()
if has {
return rec, nil
}
recs, err := database.FromCtx(ctx).GetTalkgroupWithLearnedByPackedIDs(ctx, []int64{tg.Pack()})
switch err {
case nil:
case pgx.ErrNoRows:
return Talkgroup{}, ErrNoTG
default:
log.Error().Err(err).Msg("TG() cache add db get")
return Talkgroup{}, errors.Join(ErrNoTG, err)
}
if len(recs) < 1 {
return Talkgroup{}, ErrNoTG
}
t.Lock()
defer t.Unlock()
err = t.add(rowToTalkgroup(recs[0]))
if err != nil {
log.Error().Err(err).Msg("TG() cache add")
return rowToTalkgroup(recs[0]), errors.Join(ErrNoTG, err)
}
return rowToTalkgroup(recs[0]), nil
}
func (t *cache) SystemName(ctx context.Context, id int) (name string, has bool) {
n, has := t.systems[int32(id)]
if !has {
sys, err := database.FromCtx(ctx).GetSystemName(ctx, id)
if err != nil {
return "", false
}
return sys, true
}
return n, has
}

View file

@ -0,0 +1,44 @@
package talkgroups
import (
"fmt"
"dynatron.me/x/stillbox/pkg/database"
)
type Talkgroup struct {
database.Talkgroup
System database.System `json:"system"`
Learned bool `json:"learned"`
}
type ID struct {
System uint32
Talkgroup uint32
}
func TG[T int | uint | int64 | uint64 | int32 | uint32](sys, tgid T) ID {
return ID{
System: uint32(sys),
Talkgroup: uint32(tgid),
}
}
func (t ID) Pack() int64 {
// P25 system IDs are 12 bits, so we can fit them in a signed 8 byte int (int64, pg INT8)
return int64((int64(t.System) << 32) | int64(t.Talkgroup))
}
func (t ID) String() string {
return fmt.Sprintf("%d:%d", t.System, t.Talkgroup)
}
func PackedTGs(tg []ID) []int64 {
s := make([]int64, len(tg))
for i, v := range tg {
s[i] = v.Pack()
}
return s
}

View file

@ -1,9 +1,9 @@
-- name: GetTalkgroupsWithAnyTags :many
SELECT * FROM talkgroups
SELECT sqlc.embed(talkgroups) FROM talkgroups
WHERE tags @> ARRAY[$1];
-- name: GetTalkgroupsWithAllTags :many
SELECT * FROM talkgroups
SELECT sqlc.embed(talkgroups) FROM talkgroups
WHERE tags && ARRAY[$1];
-- name: GetTalkgroupIDsByTags :many
@ -25,29 +25,27 @@ UPDATE talkgroups SET tags = $2
WHERE id = ANY($1);
-- name: GetTalkgroup :one
SELECT * FROM talkgroups
SELECT sqlc.embed(talkgroups) FROM talkgroups
WHERE id = systg2id(sqlc.arg(system_id), sqlc.arg(tgid));
-- name: GetTalkgroupsByPackedIDs :many
SELECT * FROM talkgroups tg
SELECT sqlc.embed(tg), sqlc.embed(sys) FROM talkgroups tg
JOIN systems sys ON tg.system_id = sys.id
WHERE tg.id = ANY($1::INT8[]);
-- name: GetTalkgroupWithLearned :one
SELECT
tg.id, tg.system_id, sys.name system_name, tg.tgid, tg.name,
tg.tg_group, tg.frequency, tg.metadata, tg.tags, tg.alpha_tag,
tg.alert, tg.weight, tg.alert_config,
sqlc.embed(tg), sqlc.embed(sys),
FALSE learned
FROM talkgroups tg
JOIN systems sys ON tg.system_id = sys.id
WHERE tg.id = systg2id(sqlc.arg(system_id), sqlc.arg(tgid))
UNION
SELECT
tgl.id::INT8, tgl.system_id::INT4, sys.name system_name, tgl.tgid::INT4, tgl.name,
tgl.alpha_tag, NULL::INTEGER, NULL::JSONB,
CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END, tgl.alpha_tag,
TRUE, 1.0, NULL::JSONB,
tgl.id::INT8, tgl.system_id::INT4, tgl.tgid::INT4, tgl.name,
tgl.alpha_tag, tgl.alpha_tag, NULL::INTEGER, NULL::JSONB,
CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END,
TRUE, NULL::JSONB, 1.0, sys.id, sys.name,
TRUE learned
FROM talkgroups_learned tgl
JOIN systems sys ON tgl.system_id = sys.id
@ -55,19 +53,17 @@ WHERE tgl.system_id = sqlc.arg(system_id) AND tgl.tgid = sqlc.arg(tgid) AND igno
-- name: GetTalkgroupWithLearnedByPackedIDs :many
SELECT
tg.id, tg.system_id, sys.name system_name, tg.tgid, tg.name,
tg.tg_group, tg.frequency, tg.metadata, tg.tags, tg.alpha_tag,
tg.alert, tg.weight, tg.alert_config,
sqlc.embed(tg), sqlc.embed(sys),
FALSE learned
FROM talkgroups tg
JOIN systems sys ON tg.system_id = sys.id
WHERE tg.id = ANY($1::INT8[])
UNION
SELECT
tgl.id::INT8, tgl.system_id::INT4, sys.name system_name, tgl.tgid::INT4, tgl.name,
tgl.alpha_tag, NULL::INTEGER, NULL::JSONB,
CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END, tgl.alpha_tag,
TRUE, 1.0, NULL::JSONB,
tgl.id::INT8, tgl.system_id::INT4, tgl.tgid::INT4, tgl.name,
tgl.alpha_tag, tgl.alpha_tag, NULL::INTEGER, NULL::JSONB,
CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END,
TRUE, NULL::JSONB, 1.0, sys.id, sys.name,
TRUE learned
FROM talkgroups_learned tgl
JOIN systems sys ON tgl.system_id = sys.id