Reorg
This commit is contained in:
parent
c3233f1bc8
commit
882d9fbefa
12 changed files with 120 additions and 104 deletions
|
@ -9,8 +9,8 @@ import (
|
||||||
|
|
||||||
"dynatron.me/x/stillbox/internal/common"
|
"dynatron.me/x/stillbox/internal/common"
|
||||||
"dynatron.me/x/stillbox/internal/version"
|
"dynatron.me/x/stillbox/internal/version"
|
||||||
"dynatron.me/x/stillbox/pkg/cmd/serve"
|
|
||||||
"dynatron.me/x/stillbox/pkg/cmd/admin"
|
"dynatron.me/x/stillbox/pkg/cmd/admin"
|
||||||
|
"dynatron.me/x/stillbox/pkg/cmd/serve"
|
||||||
"dynatron.me/x/stillbox/pkg/config"
|
"dynatron.me/x/stillbox/pkg/config"
|
||||||
|
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
|
|
|
@ -11,11 +11,12 @@ import (
|
||||||
"text/template"
|
"text/template"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
cl "dynatron.me/x/stillbox/pkg/calls"
|
"dynatron.me/x/stillbox/pkg/calls"
|
||||||
"dynatron.me/x/stillbox/pkg/config"
|
"dynatron.me/x/stillbox/pkg/config"
|
||||||
"dynatron.me/x/stillbox/pkg/database"
|
"dynatron.me/x/stillbox/pkg/database"
|
||||||
"dynatron.me/x/stillbox/pkg/notify"
|
"dynatron.me/x/stillbox/pkg/notify"
|
||||||
"dynatron.me/x/stillbox/pkg/sinks"
|
"dynatron.me/x/stillbox/pkg/sinks"
|
||||||
|
talkgroups "dynatron.me/x/stillbox/pkg/talkgroups"
|
||||||
|
|
||||||
"dynatron.me/x/stillbox/internal/timeseries"
|
"dynatron.me/x/stillbox/internal/timeseries"
|
||||||
"dynatron.me/x/stillbox/internal/trending"
|
"dynatron.me/x/stillbox/internal/trending"
|
||||||
|
@ -46,14 +47,14 @@ type alerter struct {
|
||||||
sync.RWMutex
|
sync.RWMutex
|
||||||
clock timeseries.Clock
|
clock timeseries.Clock
|
||||||
cfg config.Alerting
|
cfg config.Alerting
|
||||||
scorer trending.Scorer[cl.Talkgroup]
|
scorer trending.Scorer[talkgroups.ID]
|
||||||
scores trending.Scores[cl.Talkgroup]
|
scores trending.Scores[talkgroups.ID]
|
||||||
lastScore time.Time
|
lastScore time.Time
|
||||||
sim *Simulation
|
sim *Simulation
|
||||||
alertCache map[cl.Talkgroup]Alert
|
alertCache map[talkgroups.ID]Alert
|
||||||
renotify time.Duration
|
renotify time.Duration
|
||||||
notifier notify.Notifier
|
notifier notify.Notifier
|
||||||
tgCache cl.TalkgroupCache
|
tgCache talkgroups.Store
|
||||||
}
|
}
|
||||||
|
|
||||||
type offsetClock time.Duration
|
type offsetClock time.Duration
|
||||||
|
@ -88,14 +89,14 @@ func WithNotifier(n notify.Notifier) AlertOption {
|
||||||
}
|
}
|
||||||
|
|
||||||
// New creates a new Alerter using the provided configuration.
|
// New creates a new Alerter using the provided configuration.
|
||||||
func New(cfg config.Alerting, tgCache cl.TalkgroupCache, opts ...AlertOption) Alerter {
|
func New(cfg config.Alerting, tgCache talkgroups.Store, opts ...AlertOption) Alerter {
|
||||||
if !cfg.Enable {
|
if !cfg.Enable {
|
||||||
return &noopAlerter{}
|
return &noopAlerter{}
|
||||||
}
|
}
|
||||||
|
|
||||||
as := &alerter{
|
as := &alerter{
|
||||||
cfg: cfg,
|
cfg: cfg,
|
||||||
alertCache: make(map[cl.Talkgroup]Alert),
|
alertCache: make(map[talkgroups.ID]Alert),
|
||||||
clock: timeseries.DefaultClock,
|
clock: timeseries.DefaultClock,
|
||||||
renotify: DefaultRenotify,
|
renotify: DefaultRenotify,
|
||||||
tgCache: tgCache,
|
tgCache: tgCache,
|
||||||
|
@ -111,12 +112,12 @@ func New(cfg config.Alerting, tgCache cl.TalkgroupCache, opts ...AlertOption) Al
|
||||||
|
|
||||||
as.scorer = trending.NewScorer(
|
as.scorer = trending.NewScorer(
|
||||||
trending.WithTimeSeries(as.newTimeSeries),
|
trending.WithTimeSeries(as.newTimeSeries),
|
||||||
trending.WithStorageDuration[cl.Talkgroup](time.Hour*24*time.Duration(cfg.LookbackDays)),
|
trending.WithStorageDuration[talkgroups.ID](time.Hour*24*time.Duration(cfg.LookbackDays)),
|
||||||
trending.WithRecentDuration[cl.Talkgroup](time.Duration(cfg.Recent)),
|
trending.WithRecentDuration[talkgroups.ID](time.Duration(cfg.Recent)),
|
||||||
trending.WithHalfLife[cl.Talkgroup](time.Duration(cfg.HalfLife)),
|
trending.WithHalfLife[talkgroups.ID](time.Duration(cfg.HalfLife)),
|
||||||
trending.WithScoreThreshold[cl.Talkgroup](ScoreThreshold),
|
trending.WithScoreThreshold[talkgroups.ID](ScoreThreshold),
|
||||||
trending.WithCountThreshold[cl.Talkgroup](CountThreshold),
|
trending.WithCountThreshold[talkgroups.ID](CountThreshold),
|
||||||
trending.WithClock[cl.Talkgroup](as.clock),
|
trending.WithClock[talkgroups.ID](as.clock),
|
||||||
)
|
)
|
||||||
|
|
||||||
return as
|
return as
|
||||||
|
@ -231,8 +232,8 @@ func (as *alerter) testNotifyHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// scoredTGs gets a list of TGs.
|
// scoredTGs gets a list of TGs.
|
||||||
func (as *alerter) scoredTGs() []cl.Talkgroup {
|
func (as *alerter) scoredTGs() []talkgroups.ID {
|
||||||
tgs := make([]cl.Talkgroup, 0, len(as.scores))
|
tgs := make([]talkgroups.ID, 0, len(as.scores))
|
||||||
for _, s := range as.scores {
|
for _, s := range as.scores {
|
||||||
tgs = append(tgs, s.ID)
|
tgs = append(tgs, s.ID)
|
||||||
}
|
}
|
||||||
|
@ -275,7 +276,7 @@ type Alert struct {
|
||||||
ID uuid.UUID
|
ID uuid.UUID
|
||||||
Timestamp time.Time
|
Timestamp time.Time
|
||||||
TGName string
|
TGName string
|
||||||
Score trending.Score[cl.Talkgroup]
|
Score trending.Score[talkgroups.ID]
|
||||||
OrigScore float64
|
OrigScore float64
|
||||||
Weight float32
|
Weight float32
|
||||||
Suppressed bool
|
Suppressed bool
|
||||||
|
@ -317,7 +318,7 @@ func (as *alerter) sendNotification(ctx context.Context, n []Alert) error {
|
||||||
|
|
||||||
// makeAlert creates a notification for later rendering by the template.
|
// makeAlert creates a notification for later rendering by the template.
|
||||||
// It takes a talkgroup Score as input.
|
// It takes a talkgroup Score as input.
|
||||||
func (as *alerter) makeAlert(ctx context.Context, score trending.Score[cl.Talkgroup], origScore float64) (Alert, error) {
|
func (as *alerter) makeAlert(ctx context.Context, score trending.Score[talkgroups.ID], origScore float64) (Alert, error) {
|
||||||
d := Alert{
|
d := Alert{
|
||||||
ID: uuid.New(),
|
ID: uuid.New(),
|
||||||
Score: score,
|
Score: score,
|
||||||
|
@ -369,7 +370,7 @@ func (as *alerter) cleanCache() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (as *alerter) newTimeSeries(id cl.Talkgroup) trending.TimeSeries {
|
func (as *alerter) newTimeSeries(id talkgroups.ID) trending.TimeSeries {
|
||||||
ts, _ := timeseries.NewTimeSeries(timeseries.WithGranularities(
|
ts, _ := timeseries.NewTimeSeries(timeseries.WithGranularities(
|
||||||
[]timeseries.Granularity{
|
[]timeseries.Granularity{
|
||||||
{Granularity: time.Second, Count: 60},
|
{Granularity: time.Second, Count: 60},
|
||||||
|
@ -417,7 +418,7 @@ func (as *alerter) backfill(ctx context.Context, since time.Time, until time.Tim
|
||||||
defer as.Unlock()
|
defer as.Unlock()
|
||||||
|
|
||||||
for rows.Next() {
|
for rows.Next() {
|
||||||
var tg cl.Talkgroup
|
var tg talkgroups.ID
|
||||||
var callDate time.Time
|
var callDate time.Time
|
||||||
if err := rows.Scan(&tg.System, &tg.Talkgroup, &callDate); err != nil {
|
if err := rows.Scan(&tg.System, &tg.Talkgroup, &callDate); err != nil {
|
||||||
return count, err
|
return count, err
|
||||||
|
@ -440,7 +441,7 @@ func (as *alerter) SinkType() string {
|
||||||
return "alerting"
|
return "alerting"
|
||||||
}
|
}
|
||||||
|
|
||||||
func (as *alerter) Call(ctx context.Context, call *cl.Call) error {
|
func (as *alerter) Call(ctx context.Context, call *calls.Call) error {
|
||||||
as.Lock()
|
as.Lock()
|
||||||
defer as.Unlock()
|
defer as.Unlock()
|
||||||
as.scorer.AddEvent(call.TalkgroupTuple(), call.DateTime)
|
as.scorer.AddEvent(call.TalkgroupTuple(), call.DateTime)
|
||||||
|
@ -453,7 +454,7 @@ func (*alerter) Enabled() bool { return true }
|
||||||
// noopAlerter is used when alerting is disabled.
|
// noopAlerter is used when alerting is disabled.
|
||||||
type noopAlerter struct{}
|
type noopAlerter struct{}
|
||||||
|
|
||||||
func (*noopAlerter) SinkType() string { return "noopAlerter" }
|
func (*noopAlerter) SinkType() string { return "noopAlerter" }
|
||||||
func (*noopAlerter) Call(_ context.Context, _ *cl.Call) error { return nil }
|
func (*noopAlerter) Call(_ context.Context, _ *calls.Call) error { return nil }
|
||||||
func (*noopAlerter) Go(_ context.Context) {}
|
func (*noopAlerter) Go(_ context.Context) {}
|
||||||
func (*noopAlerter) Enabled() bool { return false }
|
func (*noopAlerter) Enabled() bool { return false }
|
||||||
|
|
|
@ -12,8 +12,8 @@ import (
|
||||||
"dynatron.me/x/stillbox/internal/forms"
|
"dynatron.me/x/stillbox/internal/forms"
|
||||||
"dynatron.me/x/stillbox/internal/jsontime"
|
"dynatron.me/x/stillbox/internal/jsontime"
|
||||||
"dynatron.me/x/stillbox/internal/trending"
|
"dynatron.me/x/stillbox/internal/trending"
|
||||||
cl "dynatron.me/x/stillbox/pkg/calls"
|
|
||||||
"dynatron.me/x/stillbox/pkg/config"
|
"dynatron.me/x/stillbox/pkg/config"
|
||||||
|
"dynatron.me/x/stillbox/pkg/talkgroups"
|
||||||
|
|
||||||
"github.com/rs/zerolog/log"
|
"github.com/rs/zerolog/log"
|
||||||
)
|
)
|
||||||
|
@ -58,9 +58,9 @@ func (s *Simulation) stepClock(t time.Time) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Simulate begins the simulation using the DB handle from ctx. It returns final scores.
|
// Simulate begins the simulation using the DB handle from ctx. It returns final scores.
|
||||||
func (s *Simulation) Simulate(ctx context.Context) (trending.Scores[cl.Talkgroup], error) {
|
func (s *Simulation) Simulate(ctx context.Context) (trending.Scores[talkgroups.ID], error) {
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
tgc := cl.NewTalkgroupCache()
|
tgc := talkgroups.NewCache()
|
||||||
|
|
||||||
s.Enable = true
|
s.Enable = true
|
||||||
s.alerter = New(s.Alerting, tgc, WithClock(&s.clock)).(*alerter)
|
s.alerter = New(s.Alerting, tgc, WithClock(&s.clock)).(*alerter)
|
||||||
|
|
|
@ -7,9 +7,9 @@ import (
|
||||||
"net/http"
|
"net/http"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"dynatron.me/x/stillbox/pkg/calls"
|
|
||||||
"dynatron.me/x/stillbox/pkg/config"
|
"dynatron.me/x/stillbox/pkg/config"
|
||||||
"dynatron.me/x/stillbox/pkg/database"
|
"dynatron.me/x/stillbox/pkg/database"
|
||||||
|
"dynatron.me/x/stillbox/pkg/talkgroups"
|
||||||
|
|
||||||
"dynatron.me/x/stillbox/internal/common"
|
"dynatron.me/x/stillbox/internal/common"
|
||||||
"dynatron.me/x/stillbox/internal/jsontime"
|
"dynatron.me/x/stillbox/internal/jsontime"
|
||||||
|
@ -76,14 +76,14 @@ func (as *alerter) tgStatsHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
tgMap := make(map[calls.Talkgroup]database.GetTalkgroupsByPackedIDsRow, len(tgs))
|
tgMap := make(map[talkgroups.ID]database.GetTalkgroupsByPackedIDsRow, len(tgs))
|
||||||
for _, t := range tgs {
|
for _, t := range tgs {
|
||||||
tgMap[calls.Talkgroup{System: uint32(t.SystemID), Talkgroup: uint32(t.ID)}] = t
|
tgMap[talkgroups.ID{System: uint32(t.SystemID), Talkgroup: uint32(t.ID)}] = t
|
||||||
}
|
}
|
||||||
|
|
||||||
renderData := struct {
|
renderData := struct {
|
||||||
TGs map[calls.Talkgroup]database.GetTalkgroupsByPackedIDsRow
|
TGs map[talkgroups.ID]database.GetTalkgroupsByPackedIDsRow
|
||||||
Scores trending.Scores[calls.Talkgroup]
|
Scores trending.Scores[talkgroups.ID]
|
||||||
LastScore time.Time
|
LastScore time.Time
|
||||||
Simulation *Simulation
|
Simulation *Simulation
|
||||||
Config config.Alerting
|
Config config.Alerting
|
||||||
|
|
|
@ -130,7 +130,6 @@ func (c *Call) computeLength() (err error) {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Call) TalkgroupTuple() talkgroups.Talkgroup {
|
func (c *Call) TalkgroupTuple() talkgroups.ID {
|
||||||
return talkgroups.TG(c.System, c.Talkgroup)
|
return talkgroups.TG(c.System, c.Talkgroup)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -10,13 +10,13 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type TalkgroupFilter struct {
|
type TalkgroupFilter struct {
|
||||||
Talkgroups []tgs.Talkgroup `json:"talkgroups,omitempty"`
|
Talkgroups []tgs.ID `json:"talkgroups,omitempty"`
|
||||||
TalkgroupsNot []tgs.Talkgroup `json:"talkgroupsNot,omitempty"`
|
TalkgroupsNot []tgs.ID `json:"talkgroupsNot,omitempty"`
|
||||||
TalkgroupTagsAll []string `json:"talkgroupTagsAll,omitempty"`
|
TalkgroupTagsAll []string `json:"talkgroupTagsAll,omitempty"`
|
||||||
TalkgroupTagsAny []string `json:"talkgroupTagsAny,omitempty"`
|
TalkgroupTagsAny []string `json:"talkgroupTagsAny,omitempty"`
|
||||||
TalkgroupTagsNot []string `json:"talkgroupTagsNot,omitempty"`
|
TalkgroupTagsNot []string `json:"talkgroupTagsNot,omitempty"`
|
||||||
|
|
||||||
talkgroups map[tgs.Talkgroup]bool
|
talkgroups map[tgs.ID]bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func TalkgroupFilterFromPB(ctx context.Context, p *pb.Filter) (*TalkgroupFilter, error) {
|
func TalkgroupFilterFromPB(ctx context.Context, p *pb.Filter) (*TalkgroupFilter, error) {
|
||||||
|
@ -27,9 +27,9 @@ func TalkgroupFilterFromPB(ctx context.Context, p *pb.Filter) (*TalkgroupFilter,
|
||||||
}
|
}
|
||||||
|
|
||||||
if l := len(p.Talkgroups); l > 0 {
|
if l := len(p.Talkgroups); l > 0 {
|
||||||
tgf.Talkgroups = make([]tgs.Talkgroup, l)
|
tgf.Talkgroups = make([]tgs.ID, l)
|
||||||
for i, t := range p.Talkgroups {
|
for i, t := range p.Talkgroups {
|
||||||
tgf.Talkgroups[i] = tgs.Talkgroup{
|
tgf.Talkgroups[i] = tgs.ID{
|
||||||
System: uint32(t.System),
|
System: uint32(t.System),
|
||||||
Talkgroup: uint32(t.Talkgroup),
|
Talkgroup: uint32(t.Talkgroup),
|
||||||
}
|
}
|
||||||
|
@ -37,9 +37,9 @@ func TalkgroupFilterFromPB(ctx context.Context, p *pb.Filter) (*TalkgroupFilter,
|
||||||
}
|
}
|
||||||
|
|
||||||
if l := len(p.TalkgroupsNot); l > 0 {
|
if l := len(p.TalkgroupsNot); l > 0 {
|
||||||
tgf.TalkgroupsNot = make([]tgs.Talkgroup, l)
|
tgf.TalkgroupsNot = make([]tgs.ID, l)
|
||||||
for i, t := range p.TalkgroupsNot {
|
for i, t := range p.TalkgroupsNot {
|
||||||
tgf.TalkgroupsNot[i] = tgs.Talkgroup{
|
tgf.TalkgroupsNot[i] = tgs.ID{
|
||||||
System: uint32(t.System),
|
System: uint32(t.System),
|
||||||
Talkgroup: uint32(t.Talkgroup),
|
Talkgroup: uint32(t.Talkgroup),
|
||||||
}
|
}
|
||||||
|
@ -53,12 +53,12 @@ func (f *TalkgroupFilter) hasTags() bool {
|
||||||
return len(f.TalkgroupTagsAny) > 0 || len(f.TalkgroupTagsAll) > 0 || len(f.TalkgroupTagsNot) > 0
|
return len(f.TalkgroupTagsAny) > 0 || len(f.TalkgroupTagsAll) > 0 || len(f.TalkgroupTagsNot) > 0
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *TalkgroupFilter) GetFinalTalkgroups() map[tgs.Talkgroup]bool {
|
func (f *TalkgroupFilter) GetFinalTalkgroups() map[tgs.ID]bool {
|
||||||
return f.talkgroups
|
return f.talkgroups
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *TalkgroupFilter) compile(ctx context.Context) error {
|
func (f *TalkgroupFilter) compile(ctx context.Context) error {
|
||||||
f.talkgroups = make(map[tgs.Talkgroup]bool)
|
f.talkgroups = make(map[tgs.ID]bool)
|
||||||
for _, tg := range f.Talkgroups {
|
for _, tg := range f.Talkgroups {
|
||||||
f.talkgroups[tg] = true
|
f.talkgroups[tg] = true
|
||||||
}
|
}
|
||||||
|
@ -71,7 +71,7 @@ func (f *TalkgroupFilter) compile(ctx context.Context) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tg := range tagTGs {
|
for _, tg := range tagTGs {
|
||||||
f.talkgroups[tgs.Talkgroup{System: uint32(tg.SystemID), Talkgroup: uint32(tg.Tgid)}] = true
|
f.talkgroups[tgs.ID{System: uint32(tg.SystemID), Talkgroup: uint32(tg.Tgid)}] = true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -6,7 +6,6 @@ import (
|
||||||
"os"
|
"os"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"dynatron.me/x/stillbox/pkg/calls"
|
|
||||||
"dynatron.me/x/stillbox/pkg/alerting"
|
"dynatron.me/x/stillbox/pkg/alerting"
|
||||||
"dynatron.me/x/stillbox/pkg/auth"
|
"dynatron.me/x/stillbox/pkg/auth"
|
||||||
"dynatron.me/x/stillbox/pkg/config"
|
"dynatron.me/x/stillbox/pkg/config"
|
||||||
|
@ -15,6 +14,7 @@ import (
|
||||||
"dynatron.me/x/stillbox/pkg/notify"
|
"dynatron.me/x/stillbox/pkg/notify"
|
||||||
"dynatron.me/x/stillbox/pkg/sinks"
|
"dynatron.me/x/stillbox/pkg/sinks"
|
||||||
"dynatron.me/x/stillbox/pkg/sources"
|
"dynatron.me/x/stillbox/pkg/sources"
|
||||||
|
"dynatron.me/x/stillbox/pkg/talkgroups"
|
||||||
"github.com/go-chi/chi/v5"
|
"github.com/go-chi/chi/v5"
|
||||||
"github.com/go-chi/chi/v5/middleware"
|
"github.com/go-chi/chi/v5/middleware"
|
||||||
"github.com/go-chi/cors"
|
"github.com/go-chi/cors"
|
||||||
|
@ -35,7 +35,7 @@ type Server struct {
|
||||||
alerter alerting.Alerter
|
alerter alerting.Alerter
|
||||||
notifier notify.Notifier
|
notifier notify.Notifier
|
||||||
hup chan os.Signal
|
hup chan os.Signal
|
||||||
tgCache calls.TalkgroupCache
|
tgs talkgroups.Store
|
||||||
}
|
}
|
||||||
|
|
||||||
func New(ctx context.Context, cfg *config.Config) (*Server, error) {
|
func New(ctx context.Context, cfg *config.Config) (*Server, error) {
|
||||||
|
@ -58,7 +58,7 @@ func New(ctx context.Context, cfg *config.Config) (*Server, error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
tgCache := calls.NewTalkgroupCache()
|
tgCache := talkgroups.NewCache()
|
||||||
|
|
||||||
srv := &Server{
|
srv := &Server{
|
||||||
auth: authenticator,
|
auth: authenticator,
|
||||||
|
@ -69,7 +69,7 @@ func New(ctx context.Context, cfg *config.Config) (*Server, error) {
|
||||||
logger: logger,
|
logger: logger,
|
||||||
alerter: alerting.New(cfg.Alerting, tgCache, alerting.WithNotifier(notifier)),
|
alerter: alerting.New(cfg.Alerting, tgCache, alerting.WithNotifier(notifier)),
|
||||||
notifier: notifier,
|
notifier: notifier,
|
||||||
tgCache: tgCache,
|
tgs: tgCache,
|
||||||
}
|
}
|
||||||
|
|
||||||
srv.sinks.Register("database", sinks.NewDatabaseSink(srv.db), true)
|
srv.sinks.Register("database", sinks.NewDatabaseSink(srv.db), true)
|
||||||
|
|
|
@ -7,8 +7,8 @@ import (
|
||||||
|
|
||||||
"dynatron.me/x/stillbox/internal/common"
|
"dynatron.me/x/stillbox/internal/common"
|
||||||
"dynatron.me/x/stillbox/internal/forms"
|
"dynatron.me/x/stillbox/internal/forms"
|
||||||
"dynatron.me/x/stillbox/pkg/calls"
|
|
||||||
"dynatron.me/x/stillbox/pkg/auth"
|
"dynatron.me/x/stillbox/pkg/auth"
|
||||||
|
"dynatron.me/x/stillbox/pkg/calls"
|
||||||
"github.com/go-chi/chi/v5"
|
"github.com/go-chi/chi/v5"
|
||||||
"github.com/rs/zerolog/log"
|
"github.com/rs/zerolog/log"
|
||||||
)
|
)
|
||||||
|
|
|
@ -8,14 +8,14 @@ import (
|
||||||
"dynatron.me/x/stillbox/internal/trending"
|
"dynatron.me/x/stillbox/internal/trending"
|
||||||
)
|
)
|
||||||
|
|
||||||
type AlertConfig map[Talkgroup][]AlertRule
|
type AlertConfig map[ID][]AlertRule
|
||||||
|
|
||||||
type AlertRule struct {
|
type AlertRule struct {
|
||||||
Times []ruletime.RuleTime `json:"times"`
|
Times []ruletime.RuleTime `json:"times"`
|
||||||
ScoreMultiplier float32 `json:"mult"`
|
ScoreMultiplier float32 `json:"mult"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ac AlertConfig) AddAlertConfig(tg Talkgroup, confBytes []byte) error {
|
func (ac AlertConfig) AddAlertConfig(tg ID, confBytes []byte) error {
|
||||||
if len(confBytes) == 0 {
|
if len(confBytes) == 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -30,7 +30,7 @@ func (ac AlertConfig) AddAlertConfig(tg Talkgroup, confBytes []byte) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ac AlertConfig) ApplyAlertRules(score trending.Score[Talkgroup], t time.Time, coversOpts ...ruletime.CoversOption) float64 {
|
func (ac AlertConfig) ApplyAlertRules(score trending.Score[ID], t time.Time, coversOpts ...ruletime.CoversOption) float64 {
|
||||||
s, has := ac[score.ID]
|
s, has := ac[score.ID]
|
||||||
if !has {
|
if !has {
|
||||||
return score.Score
|
return score.Score
|
||||||
|
|
|
@ -18,7 +18,7 @@ func TestAlertConfig(t *testing.T) {
|
||||||
ac := make(talkgroups.AlertConfig)
|
ac := make(talkgroups.AlertConfig)
|
||||||
parseTests := []struct {
|
parseTests := []struct {
|
||||||
name string
|
name string
|
||||||
tg talkgroups.Talkgroup
|
tg talkgroups.ID
|
||||||
conf string
|
conf string
|
||||||
compare []talkgroups.AlertRule
|
compare []talkgroups.AlertRule
|
||||||
expectErr error
|
expectErr error
|
||||||
|
@ -78,7 +78,7 @@ func TestAlertConfig(t *testing.T) {
|
||||||
|
|
||||||
evalTests := []struct {
|
evalTests := []struct {
|
||||||
name string
|
name string
|
||||||
tg talkgroups.Talkgroup
|
tg talkgroups.ID
|
||||||
t time.Time
|
t time.Time
|
||||||
origScore float64
|
origScore float64
|
||||||
expectScore float64
|
expectScore float64
|
||||||
|
@ -122,7 +122,7 @@ func TestAlertConfig(t *testing.T) {
|
||||||
|
|
||||||
for _, tc := range evalTests {
|
for _, tc := range evalTests {
|
||||||
t.Run(tc.name, func(t *testing.T) {
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
cs := trending.Score[talkgroups.Talkgroup]{
|
cs := trending.Score[talkgroups.ID]{
|
||||||
ID: tc.tg,
|
ID: tc.tg,
|
||||||
Score: tc.origScore,
|
Score: tc.origScore,
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,7 +2,6 @@ package talkgroups
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -15,49 +14,29 @@ import (
|
||||||
"github.com/rs/zerolog/log"
|
"github.com/rs/zerolog/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Talkgroup struct {
|
type tgMap map[ID]database.GetTalkgroupWithLearnedByPackedIDsRow
|
||||||
System uint32
|
|
||||||
Talkgroup uint32
|
|
||||||
}
|
|
||||||
|
|
||||||
func TG[T int | uint | int64 | uint64 | int32 | uint32](sys, tgid T) Talkgroup {
|
type Store interface {
|
||||||
return Talkgroup{
|
// TG retrieves a Talkgroup from the Store. It returns the record and whether one was found.
|
||||||
System: uint32(sys),
|
TG(ctx context.Context, tg ID) (database.GetTalkgroupWithLearnedByPackedIDsRow, bool)
|
||||||
Talkgroup: uint32(tgid),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t Talkgroup) Pack() int64 {
|
// SystemName retrieves a system name from the store. It returns the record and whether one was found.
|
||||||
// P25 system IDs are 12 bits, so we can fit them in a signed 8 byte int (int64, pg INT8)
|
|
||||||
return int64((int64(t.System) << 32) | int64(t.Talkgroup))
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t Talkgroup) String() string {
|
|
||||||
return fmt.Sprintf("%d:%d", t.System, t.Talkgroup)
|
|
||||||
}
|
|
||||||
|
|
||||||
func PackedTGs(tg []Talkgroup) []int64 {
|
|
||||||
s := make([]int64, len(tg))
|
|
||||||
|
|
||||||
for i, v := range tg {
|
|
||||||
s[i] = v.Pack()
|
|
||||||
}
|
|
||||||
|
|
||||||
return s
|
|
||||||
}
|
|
||||||
|
|
||||||
type tgMap map[Talkgroup]database.GetTalkgroupWithLearnedByPackedIDsRow
|
|
||||||
|
|
||||||
type TalkgroupCache interface {
|
|
||||||
TG(ctx context.Context, tg Talkgroup) (database.GetTalkgroupWithLearnedByPackedIDsRow, bool)
|
|
||||||
SystemName(ctx context.Context, id int) (string, bool)
|
SystemName(ctx context.Context, id int) (string, bool)
|
||||||
ApplyAlertRules(score trending.Score[Talkgroup], t time.Time, coversOpts ...ruletime.CoversOption) float64
|
|
||||||
Hint(ctx context.Context, tgs []Talkgroup) error
|
// ApplyAlertRules applies the score's talkgroup alert rules to the call occurring at t and returns the weighted score.
|
||||||
|
ApplyAlertRules(score trending.Score[ID], t time.Time, coversOpts ...ruletime.CoversOption) float64
|
||||||
|
|
||||||
|
// Hint hints the Store that the provided talkgroups will be asked for.
|
||||||
|
Hint(ctx context.Context, tgs []ID) error
|
||||||
|
|
||||||
|
// Load loads the provided packed talkgroup IDs into the Store.
|
||||||
Load(ctx context.Context, tgs []int64) error
|
Load(ctx context.Context, tgs []int64) error
|
||||||
|
|
||||||
|
// Invalidate invalidates any caching in the Store.
|
||||||
Invalidate()
|
Invalidate()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *talkgroupCache) Invalidate() {
|
func (t *cache) Invalidate() {
|
||||||
t.Lock()
|
t.Lock()
|
||||||
defer t.Unlock()
|
defer t.Unlock()
|
||||||
clear(t.tgs)
|
clear(t.tgs)
|
||||||
|
@ -65,15 +44,16 @@ func (t *talkgroupCache) Invalidate() {
|
||||||
clear(t.AlertConfig)
|
clear(t.AlertConfig)
|
||||||
}
|
}
|
||||||
|
|
||||||
type talkgroupCache struct {
|
type cache struct {
|
||||||
sync.RWMutex
|
sync.RWMutex
|
||||||
AlertConfig
|
AlertConfig
|
||||||
tgs tgMap
|
tgs tgMap
|
||||||
systems map[int32]string
|
systems map[int32]string
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewTalkgroupCache() TalkgroupCache {
|
// NewCache returns a new cache Store.
|
||||||
tgc := &talkgroupCache{
|
func NewCache() Store {
|
||||||
|
tgc := &cache{
|
||||||
tgs: make(tgMap),
|
tgs: make(tgMap),
|
||||||
systems: make(map[int32]string),
|
systems: make(map[int32]string),
|
||||||
AlertConfig: make(AlertConfig),
|
AlertConfig: make(AlertConfig),
|
||||||
|
@ -82,7 +62,7 @@ func NewTalkgroupCache() TalkgroupCache {
|
||||||
return tgc
|
return tgc
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *talkgroupCache) Hint(ctx context.Context, tgs []Talkgroup) error {
|
func (t *cache) Hint(ctx context.Context, tgs []ID) error {
|
||||||
t.RLock()
|
t.RLock()
|
||||||
var toLoad []int64
|
var toLoad []int64
|
||||||
if len(t.tgs) > len(tgs)/2 { // TODO: instrument this
|
if len(t.tgs) > len(tgs)/2 { // TODO: instrument this
|
||||||
|
@ -109,7 +89,7 @@ func (t *talkgroupCache) Hint(ctx context.Context, tgs []Talkgroup) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *talkgroupCache) add(rec database.GetTalkgroupWithLearnedByPackedIDsRow) error {
|
func (t *cache) add(rec database.GetTalkgroupWithLearnedByPackedIDsRow) error {
|
||||||
tg := TG(rec.SystemID, rec.Tgid)
|
tg := TG(rec.SystemID, rec.Tgid)
|
||||||
t.tgs[tg] = rec
|
t.tgs[tg] = rec
|
||||||
t.systems[rec.SystemID] = rec.SystemName
|
t.systems[rec.SystemID] = rec.SystemName
|
||||||
|
@ -117,7 +97,7 @@ func (t *talkgroupCache) add(rec database.GetTalkgroupWithLearnedByPackedIDsRow)
|
||||||
return t.AlertConfig.AddAlertConfig(tg, rec.AlertConfig)
|
return t.AlertConfig.AddAlertConfig(tg, rec.AlertConfig)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *talkgroupCache) Load(ctx context.Context, tgs []int64) error {
|
func (t *cache) Load(ctx context.Context, tgs []int64) error {
|
||||||
tgRecords, err := database.FromCtx(ctx).GetTalkgroupWithLearnedByPackedIDs(ctx, tgs)
|
tgRecords, err := database.FromCtx(ctx).GetTalkgroupWithLearnedByPackedIDs(ctx, tgs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -137,7 +117,7 @@ func (t *talkgroupCache) Load(ctx context.Context, tgs []int64) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *talkgroupCache) TG(ctx context.Context, tg Talkgroup) (database.GetTalkgroupWithLearnedByPackedIDsRow, bool) {
|
func (t *cache) TG(ctx context.Context, tg ID) (database.GetTalkgroupWithLearnedByPackedIDsRow, bool) {
|
||||||
t.RLock()
|
t.RLock()
|
||||||
rec, has := t.tgs[tg]
|
rec, has := t.tgs[tg]
|
||||||
t.RUnlock()
|
t.RUnlock()
|
||||||
|
@ -171,7 +151,7 @@ func (t *talkgroupCache) TG(ctx context.Context, tg Talkgroup) (database.GetTalk
|
||||||
return recs[0], true
|
return recs[0], true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *talkgroupCache) SystemName(ctx context.Context, id int) (name string, has bool) {
|
func (t *cache) SystemName(ctx context.Context, id int) (name string, has bool) {
|
||||||
n, has := t.systems[int32(id)]
|
n, has := t.systems[int32(id)]
|
||||||
|
|
||||||
if !has {
|
if !has {
|
36
pkg/talkgroups/talkgroup.go
Normal file
36
pkg/talkgroups/talkgroup.go
Normal file
|
@ -0,0 +1,36 @@
|
||||||
|
package talkgroups
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
)
|
||||||
|
|
||||||
|
type ID struct {
|
||||||
|
System uint32
|
||||||
|
Talkgroup uint32
|
||||||
|
}
|
||||||
|
|
||||||
|
func TG[T int | uint | int64 | uint64 | int32 | uint32](sys, tgid T) ID {
|
||||||
|
return ID{
|
||||||
|
System: uint32(sys),
|
||||||
|
Talkgroup: uint32(tgid),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t ID) Pack() int64 {
|
||||||
|
// P25 system IDs are 12 bits, so we can fit them in a signed 8 byte int (int64, pg INT8)
|
||||||
|
return int64((int64(t.System) << 32) | int64(t.Talkgroup))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t ID) String() string {
|
||||||
|
return fmt.Sprintf("%d:%d", t.System, t.Talkgroup)
|
||||||
|
}
|
||||||
|
|
||||||
|
func PackedTGs(tg []ID) []int64 {
|
||||||
|
s := make([]int64, len(tg))
|
||||||
|
|
||||||
|
for i, v := range tg {
|
||||||
|
s[i] = v.Pack()
|
||||||
|
}
|
||||||
|
|
||||||
|
return s
|
||||||
|
}
|
Loading…
Reference in a new issue