This commit is contained in:
Daniel 2024-11-03 08:09:49 -05:00
parent 8d32757334
commit 1f8fe24dd2
12 changed files with 120 additions and 104 deletions

View file

@ -9,8 +9,8 @@ import (
"dynatron.me/x/stillbox/internal/common"
"dynatron.me/x/stillbox/internal/version"
"dynatron.me/x/stillbox/pkg/cmd/serve"
"dynatron.me/x/stillbox/pkg/cmd/admin"
"dynatron.me/x/stillbox/pkg/cmd/serve"
"dynatron.me/x/stillbox/pkg/config"
"github.com/spf13/cobra"

View file

@ -11,11 +11,12 @@ import (
"text/template"
"time"
cl "dynatron.me/x/stillbox/pkg/calls"
"dynatron.me/x/stillbox/pkg/calls"
"dynatron.me/x/stillbox/pkg/config"
"dynatron.me/x/stillbox/pkg/database"
"dynatron.me/x/stillbox/pkg/notify"
"dynatron.me/x/stillbox/pkg/sinks"
talkgroups "dynatron.me/x/stillbox/pkg/talkgroups"
"dynatron.me/x/stillbox/internal/timeseries"
"dynatron.me/x/stillbox/internal/trending"
@ -46,14 +47,14 @@ type alerter struct {
sync.RWMutex
clock timeseries.Clock
cfg config.Alerting
scorer trending.Scorer[cl.Talkgroup]
scores trending.Scores[cl.Talkgroup]
scorer trending.Scorer[talkgroups.ID]
scores trending.Scores[talkgroups.ID]
lastScore time.Time
sim *Simulation
alertCache map[cl.Talkgroup]Alert
alertCache map[talkgroups.ID]Alert
renotify time.Duration
notifier notify.Notifier
tgCache cl.TalkgroupCache
tgCache talkgroups.Store
}
type offsetClock time.Duration
@ -88,14 +89,14 @@ func WithNotifier(n notify.Notifier) AlertOption {
}
// New creates a new Alerter using the provided configuration.
func New(cfg config.Alerting, tgCache cl.TalkgroupCache, opts ...AlertOption) Alerter {
func New(cfg config.Alerting, tgCache talkgroups.Store, opts ...AlertOption) Alerter {
if !cfg.Enable {
return &noopAlerter{}
}
as := &alerter{
cfg: cfg,
alertCache: make(map[cl.Talkgroup]Alert),
alertCache: make(map[talkgroups.ID]Alert),
clock: timeseries.DefaultClock,
renotify: DefaultRenotify,
tgCache: tgCache,
@ -111,12 +112,12 @@ func New(cfg config.Alerting, tgCache cl.TalkgroupCache, opts ...AlertOption) Al
as.scorer = trending.NewScorer(
trending.WithTimeSeries(as.newTimeSeries),
trending.WithStorageDuration[cl.Talkgroup](time.Hour*24*time.Duration(cfg.LookbackDays)),
trending.WithRecentDuration[cl.Talkgroup](time.Duration(cfg.Recent)),
trending.WithHalfLife[cl.Talkgroup](time.Duration(cfg.HalfLife)),
trending.WithScoreThreshold[cl.Talkgroup](ScoreThreshold),
trending.WithCountThreshold[cl.Talkgroup](CountThreshold),
trending.WithClock[cl.Talkgroup](as.clock),
trending.WithStorageDuration[talkgroups.ID](time.Hour*24*time.Duration(cfg.LookbackDays)),
trending.WithRecentDuration[talkgroups.ID](time.Duration(cfg.Recent)),
trending.WithHalfLife[talkgroups.ID](time.Duration(cfg.HalfLife)),
trending.WithScoreThreshold[talkgroups.ID](ScoreThreshold),
trending.WithCountThreshold[talkgroups.ID](CountThreshold),
trending.WithClock[talkgroups.ID](as.clock),
)
return as
@ -231,8 +232,8 @@ func (as *alerter) testNotifyHandler(w http.ResponseWriter, r *http.Request) {
}
// scoredTGs gets a list of TGs.
func (as *alerter) scoredTGs() []cl.Talkgroup {
tgs := make([]cl.Talkgroup, 0, len(as.scores))
func (as *alerter) scoredTGs() []talkgroups.ID {
tgs := make([]talkgroups.ID, 0, len(as.scores))
for _, s := range as.scores {
tgs = append(tgs, s.ID)
}
@ -275,7 +276,7 @@ type Alert struct {
ID uuid.UUID
Timestamp time.Time
TGName string
Score trending.Score[cl.Talkgroup]
Score trending.Score[talkgroups.ID]
OrigScore float64
Weight float32
Suppressed bool
@ -317,7 +318,7 @@ func (as *alerter) sendNotification(ctx context.Context, n []Alert) error {
// makeAlert creates a notification for later rendering by the template.
// It takes a talkgroup Score as input.
func (as *alerter) makeAlert(ctx context.Context, score trending.Score[cl.Talkgroup], origScore float64) (Alert, error) {
func (as *alerter) makeAlert(ctx context.Context, score trending.Score[talkgroups.ID], origScore float64) (Alert, error) {
d := Alert{
ID: uuid.New(),
Score: score,
@ -369,7 +370,7 @@ func (as *alerter) cleanCache() {
}
}
func (as *alerter) newTimeSeries(id cl.Talkgroup) trending.TimeSeries {
func (as *alerter) newTimeSeries(id talkgroups.ID) trending.TimeSeries {
ts, _ := timeseries.NewTimeSeries(timeseries.WithGranularities(
[]timeseries.Granularity{
{Granularity: time.Second, Count: 60},
@ -417,7 +418,7 @@ func (as *alerter) backfill(ctx context.Context, since time.Time, until time.Tim
defer as.Unlock()
for rows.Next() {
var tg cl.Talkgroup
var tg talkgroups.ID
var callDate time.Time
if err := rows.Scan(&tg.System, &tg.Talkgroup, &callDate); err != nil {
return count, err
@ -440,7 +441,7 @@ func (as *alerter) SinkType() string {
return "alerting"
}
func (as *alerter) Call(ctx context.Context, call *cl.Call) error {
func (as *alerter) Call(ctx context.Context, call *calls.Call) error {
as.Lock()
defer as.Unlock()
as.scorer.AddEvent(call.TalkgroupTuple(), call.DateTime)
@ -454,6 +455,6 @@ func (*alerter) Enabled() bool { return true }
type noopAlerter struct{}
func (*noopAlerter) SinkType() string { return "noopAlerter" }
func (*noopAlerter) Call(_ context.Context, _ *cl.Call) error { return nil }
func (*noopAlerter) Call(_ context.Context, _ *calls.Call) error { return nil }
func (*noopAlerter) Go(_ context.Context) {}
func (*noopAlerter) Enabled() bool { return false }

View file

@ -12,8 +12,8 @@ import (
"dynatron.me/x/stillbox/internal/forms"
"dynatron.me/x/stillbox/internal/jsontime"
"dynatron.me/x/stillbox/internal/trending"
cl "dynatron.me/x/stillbox/pkg/calls"
"dynatron.me/x/stillbox/pkg/config"
"dynatron.me/x/stillbox/pkg/talkgroups"
"github.com/rs/zerolog/log"
)
@ -58,9 +58,9 @@ func (s *Simulation) stepClock(t time.Time) {
}
// Simulate begins the simulation using the DB handle from ctx. It returns final scores.
func (s *Simulation) Simulate(ctx context.Context) (trending.Scores[cl.Talkgroup], error) {
func (s *Simulation) Simulate(ctx context.Context) (trending.Scores[talkgroups.ID], error) {
now := time.Now()
tgc := cl.NewTalkgroupCache()
tgc := talkgroups.NewCache()
s.Enable = true
s.alerter = New(s.Alerting, tgc, WithClock(&s.clock)).(*alerter)

View file

@ -7,9 +7,9 @@ import (
"net/http"
"time"
"dynatron.me/x/stillbox/pkg/calls"
"dynatron.me/x/stillbox/pkg/config"
"dynatron.me/x/stillbox/pkg/database"
"dynatron.me/x/stillbox/pkg/talkgroups"
"dynatron.me/x/stillbox/internal/common"
"dynatron.me/x/stillbox/internal/jsontime"
@ -76,14 +76,14 @@ func (as *alerter) tgStatsHandler(w http.ResponseWriter, r *http.Request) {
return
}
tgMap := make(map[calls.Talkgroup]database.GetTalkgroupsByPackedIDsRow, len(tgs))
tgMap := make(map[talkgroups.ID]database.GetTalkgroupsByPackedIDsRow, len(tgs))
for _, t := range tgs {
tgMap[calls.Talkgroup{System: uint32(t.SystemID), Talkgroup: uint32(t.ID)}] = t
tgMap[talkgroups.ID{System: uint32(t.SystemID), Talkgroup: uint32(t.ID)}] = t
}
renderData := struct {
TGs map[calls.Talkgroup]database.GetTalkgroupsByPackedIDsRow
Scores trending.Scores[calls.Talkgroup]
TGs map[talkgroups.ID]database.GetTalkgroupsByPackedIDsRow
Scores trending.Scores[talkgroups.ID]
LastScore time.Time
Simulation *Simulation
Config config.Alerting

View file

@ -130,7 +130,6 @@ func (c *Call) computeLength() (err error) {
return nil
}
func (c *Call) TalkgroupTuple() talkgroups.Talkgroup {
func (c *Call) TalkgroupTuple() talkgroups.ID {
return talkgroups.TG(c.System, c.Talkgroup)
}

View file

@ -10,13 +10,13 @@ import (
)
type TalkgroupFilter struct {
Talkgroups []tgs.Talkgroup `json:"talkgroups,omitempty"`
TalkgroupsNot []tgs.Talkgroup `json:"talkgroupsNot,omitempty"`
Talkgroups []tgs.ID `json:"talkgroups,omitempty"`
TalkgroupsNot []tgs.ID `json:"talkgroupsNot,omitempty"`
TalkgroupTagsAll []string `json:"talkgroupTagsAll,omitempty"`
TalkgroupTagsAny []string `json:"talkgroupTagsAny,omitempty"`
TalkgroupTagsNot []string `json:"talkgroupTagsNot,omitempty"`
talkgroups map[tgs.Talkgroup]bool
talkgroups map[tgs.ID]bool
}
func TalkgroupFilterFromPB(ctx context.Context, p *pb.Filter) (*TalkgroupFilter, error) {
@ -27,9 +27,9 @@ func TalkgroupFilterFromPB(ctx context.Context, p *pb.Filter) (*TalkgroupFilter,
}
if l := len(p.Talkgroups); l > 0 {
tgf.Talkgroups = make([]tgs.Talkgroup, l)
tgf.Talkgroups = make([]tgs.ID, l)
for i, t := range p.Talkgroups {
tgf.Talkgroups[i] = tgs.Talkgroup{
tgf.Talkgroups[i] = tgs.ID{
System: uint32(t.System),
Talkgroup: uint32(t.Talkgroup),
}
@ -37,9 +37,9 @@ func TalkgroupFilterFromPB(ctx context.Context, p *pb.Filter) (*TalkgroupFilter,
}
if l := len(p.TalkgroupsNot); l > 0 {
tgf.TalkgroupsNot = make([]tgs.Talkgroup, l)
tgf.TalkgroupsNot = make([]tgs.ID, l)
for i, t := range p.TalkgroupsNot {
tgf.TalkgroupsNot[i] = tgs.Talkgroup{
tgf.TalkgroupsNot[i] = tgs.ID{
System: uint32(t.System),
Talkgroup: uint32(t.Talkgroup),
}
@ -53,12 +53,12 @@ func (f *TalkgroupFilter) hasTags() bool {
return len(f.TalkgroupTagsAny) > 0 || len(f.TalkgroupTagsAll) > 0 || len(f.TalkgroupTagsNot) > 0
}
func (f *TalkgroupFilter) GetFinalTalkgroups() map[tgs.Talkgroup]bool {
func (f *TalkgroupFilter) GetFinalTalkgroups() map[tgs.ID]bool {
return f.talkgroups
}
func (f *TalkgroupFilter) compile(ctx context.Context) error {
f.talkgroups = make(map[tgs.Talkgroup]bool)
f.talkgroups = make(map[tgs.ID]bool)
for _, tg := range f.Talkgroups {
f.talkgroups[tg] = true
}
@ -71,7 +71,7 @@ func (f *TalkgroupFilter) compile(ctx context.Context) error {
}
for _, tg := range tagTGs {
f.talkgroups[tgs.Talkgroup{System: uint32(tg.SystemID), Talkgroup: uint32(tg.Tgid)}] = true
f.talkgroups[tgs.ID{System: uint32(tg.SystemID), Talkgroup: uint32(tg.Tgid)}] = true
}
}

View file

@ -6,7 +6,6 @@ import (
"os"
"time"
"dynatron.me/x/stillbox/pkg/calls"
"dynatron.me/x/stillbox/pkg/alerting"
"dynatron.me/x/stillbox/pkg/auth"
"dynatron.me/x/stillbox/pkg/config"
@ -15,6 +14,7 @@ import (
"dynatron.me/x/stillbox/pkg/notify"
"dynatron.me/x/stillbox/pkg/sinks"
"dynatron.me/x/stillbox/pkg/sources"
"dynatron.me/x/stillbox/pkg/talkgroups"
"github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"
"github.com/go-chi/cors"
@ -35,7 +35,7 @@ type Server struct {
alerter alerting.Alerter
notifier notify.Notifier
hup chan os.Signal
tgCache calls.TalkgroupCache
tgs talkgroups.Store
}
func New(ctx context.Context, cfg *config.Config) (*Server, error) {
@ -58,7 +58,7 @@ func New(ctx context.Context, cfg *config.Config) (*Server, error) {
return nil, err
}
tgCache := calls.NewTalkgroupCache()
tgCache := talkgroups.NewCache()
srv := &Server{
auth: authenticator,
@ -69,7 +69,7 @@ func New(ctx context.Context, cfg *config.Config) (*Server, error) {
logger: logger,
alerter: alerting.New(cfg.Alerting, tgCache, alerting.WithNotifier(notifier)),
notifier: notifier,
tgCache: tgCache,
tgs: tgCache,
}
srv.sinks.Register("database", sinks.NewDatabaseSink(srv.db), true)

View file

@ -7,8 +7,8 @@ import (
"dynatron.me/x/stillbox/internal/common"
"dynatron.me/x/stillbox/internal/forms"
"dynatron.me/x/stillbox/pkg/calls"
"dynatron.me/x/stillbox/pkg/auth"
"dynatron.me/x/stillbox/pkg/calls"
"github.com/go-chi/chi/v5"
"github.com/rs/zerolog/log"
)

View file

@ -8,14 +8,14 @@ import (
"dynatron.me/x/stillbox/internal/trending"
)
type AlertConfig map[Talkgroup][]AlertRule
type AlertConfig map[ID][]AlertRule
type AlertRule struct {
Times []ruletime.RuleTime `json:"times"`
ScoreMultiplier float32 `json:"mult"`
}
func (ac AlertConfig) AddAlertConfig(tg Talkgroup, confBytes []byte) error {
func (ac AlertConfig) AddAlertConfig(tg ID, confBytes []byte) error {
if len(confBytes) == 0 {
return nil
}
@ -30,7 +30,7 @@ func (ac AlertConfig) AddAlertConfig(tg Talkgroup, confBytes []byte) error {
return nil
}
func (ac AlertConfig) ApplyAlertRules(score trending.Score[Talkgroup], t time.Time, coversOpts ...ruletime.CoversOption) float64 {
func (ac AlertConfig) ApplyAlertRules(score trending.Score[ID], t time.Time, coversOpts ...ruletime.CoversOption) float64 {
s, has := ac[score.ID]
if !has {
return score.Score

View file

@ -18,7 +18,7 @@ func TestAlertConfig(t *testing.T) {
ac := make(talkgroups.AlertConfig)
parseTests := []struct {
name string
tg talkgroups.Talkgroup
tg talkgroups.ID
conf string
compare []talkgroups.AlertRule
expectErr error
@ -78,7 +78,7 @@ func TestAlertConfig(t *testing.T) {
evalTests := []struct {
name string
tg talkgroups.Talkgroup
tg talkgroups.ID
t time.Time
origScore float64
expectScore float64
@ -122,7 +122,7 @@ func TestAlertConfig(t *testing.T) {
for _, tc := range evalTests {
t.Run(tc.name, func(t *testing.T) {
cs := trending.Score[talkgroups.Talkgroup]{
cs := trending.Score[talkgroups.ID]{
ID: tc.tg,
Score: tc.origScore,
}

View file

@ -2,7 +2,6 @@ package talkgroups
import (
"context"
"fmt"
"sync"
"time"
@ -15,49 +14,29 @@ import (
"github.com/rs/zerolog/log"
)
type Talkgroup struct {
System uint32
Talkgroup uint32
}
type tgMap map[ID]database.GetTalkgroupWithLearnedByPackedIDsRow
func TG[T int | uint | int64 | uint64 | int32 | uint32](sys, tgid T) Talkgroup {
return Talkgroup{
System: uint32(sys),
Talkgroup: uint32(tgid),
}
}
type Store interface {
// TG retrieves a Talkgroup from the Store. It returns the record and whether one was found.
TG(ctx context.Context, tg ID) (database.GetTalkgroupWithLearnedByPackedIDsRow, bool)
func (t Talkgroup) Pack() int64 {
// P25 system IDs are 12 bits, so we can fit them in a signed 8 byte int (int64, pg INT8)
return int64((int64(t.System) << 32) | int64(t.Talkgroup))
}
func (t Talkgroup) String() string {
return fmt.Sprintf("%d:%d", t.System, t.Talkgroup)
}
func PackedTGs(tg []Talkgroup) []int64 {
s := make([]int64, len(tg))
for i, v := range tg {
s[i] = v.Pack()
}
return s
}
type tgMap map[Talkgroup]database.GetTalkgroupWithLearnedByPackedIDsRow
type TalkgroupCache interface {
TG(ctx context.Context, tg Talkgroup) (database.GetTalkgroupWithLearnedByPackedIDsRow, bool)
// SystemName retrieves a system name from the store. It returns the record and whether one was found.
SystemName(ctx context.Context, id int) (string, bool)
ApplyAlertRules(score trending.Score[Talkgroup], t time.Time, coversOpts ...ruletime.CoversOption) float64
Hint(ctx context.Context, tgs []Talkgroup) error
// ApplyAlertRules applies the score's talkgroup alert rules to the call occurring at t and returns the weighted score.
ApplyAlertRules(score trending.Score[ID], t time.Time, coversOpts ...ruletime.CoversOption) float64
// Hint hints the Store that the provided talkgroups will be asked for.
Hint(ctx context.Context, tgs []ID) error
// Load loads the provided packed talkgroup IDs into the Store.
Load(ctx context.Context, tgs []int64) error
// Invalidate invalidates any caching in the Store.
Invalidate()
}
func (t *talkgroupCache) Invalidate() {
func (t *cache) Invalidate() {
t.Lock()
defer t.Unlock()
clear(t.tgs)
@ -65,15 +44,16 @@ func (t *talkgroupCache) Invalidate() {
clear(t.AlertConfig)
}
type talkgroupCache struct {
type cache struct {
sync.RWMutex
AlertConfig
tgs tgMap
systems map[int32]string
}
func NewTalkgroupCache() TalkgroupCache {
tgc := &talkgroupCache{
// NewCache returns a new cache Store.
func NewCache() Store {
tgc := &cache{
tgs: make(tgMap),
systems: make(map[int32]string),
AlertConfig: make(AlertConfig),
@ -82,7 +62,7 @@ func NewTalkgroupCache() TalkgroupCache {
return tgc
}
func (t *talkgroupCache) Hint(ctx context.Context, tgs []Talkgroup) error {
func (t *cache) Hint(ctx context.Context, tgs []ID) error {
t.RLock()
var toLoad []int64
if len(t.tgs) > len(tgs)/2 { // TODO: instrument this
@ -109,7 +89,7 @@ func (t *talkgroupCache) Hint(ctx context.Context, tgs []Talkgroup) error {
return nil
}
func (t *talkgroupCache) add(rec database.GetTalkgroupWithLearnedByPackedIDsRow) error {
func (t *cache) add(rec database.GetTalkgroupWithLearnedByPackedIDsRow) error {
tg := TG(rec.SystemID, rec.Tgid)
t.tgs[tg] = rec
t.systems[rec.SystemID] = rec.SystemName
@ -117,7 +97,7 @@ func (t *talkgroupCache) add(rec database.GetTalkgroupWithLearnedByPackedIDsRow)
return t.AlertConfig.AddAlertConfig(tg, rec.AlertConfig)
}
func (t *talkgroupCache) Load(ctx context.Context, tgs []int64) error {
func (t *cache) Load(ctx context.Context, tgs []int64) error {
tgRecords, err := database.FromCtx(ctx).GetTalkgroupWithLearnedByPackedIDs(ctx, tgs)
if err != nil {
return err
@ -137,7 +117,7 @@ func (t *talkgroupCache) Load(ctx context.Context, tgs []int64) error {
return nil
}
func (t *talkgroupCache) TG(ctx context.Context, tg Talkgroup) (database.GetTalkgroupWithLearnedByPackedIDsRow, bool) {
func (t *cache) TG(ctx context.Context, tg ID) (database.GetTalkgroupWithLearnedByPackedIDsRow, bool) {
t.RLock()
rec, has := t.tgs[tg]
t.RUnlock()
@ -171,7 +151,7 @@ func (t *talkgroupCache) TG(ctx context.Context, tg Talkgroup) (database.GetTalk
return recs[0], true
}
func (t *talkgroupCache) SystemName(ctx context.Context, id int) (name string, has bool) {
func (t *cache) SystemName(ctx context.Context, id int) (name string, has bool) {
n, has := t.systems[int32(id)]
if !has {

View file

@ -0,0 +1,36 @@
package talkgroups
import (
"fmt"
)
type ID struct {
System uint32
Talkgroup uint32
}
func TG[T int | uint | int64 | uint64 | int32 | uint32](sys, tgid T) ID {
return ID{
System: uint32(sys),
Talkgroup: uint32(tgid),
}
}
func (t ID) Pack() int64 {
// P25 system IDs are 12 bits, so we can fit them in a signed 8 byte int (int64, pg INT8)
return int64((int64(t.System) << 32) | int64(t.Talkgroup))
}
func (t ID) String() string {
return fmt.Sprintf("%d:%d", t.System, t.Talkgroup)
}
func PackedTGs(tg []ID) []int64 {
s := make([]int64, len(tg))
for i, v := range tg {
s[i] = v.Pack()
}
return s
}