191 lines
4 KiB
Go
191 lines
4 KiB
Go
package calls
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
"dynatron.me/x/stillbox/pkg/database"
|
|
|
|
"dynatron.me/x/stillbox/internal/ruletime"
|
|
"dynatron.me/x/stillbox/internal/trending"
|
|
|
|
"github.com/jackc/pgx/v5"
|
|
"github.com/rs/zerolog/log"
|
|
)
|
|
|
|
type Talkgroup struct {
|
|
System uint32
|
|
Talkgroup uint32
|
|
}
|
|
|
|
func (c *Call) TalkgroupTuple() Talkgroup {
|
|
return Talkgroup{System: uint32(c.System), Talkgroup: uint32(c.Talkgroup)}
|
|
}
|
|
|
|
func TG[T int | uint | int64 | uint64 | int32 | uint32](sys, tgid T) Talkgroup {
|
|
return Talkgroup{
|
|
System: uint32(sys),
|
|
Talkgroup: uint32(tgid),
|
|
}
|
|
}
|
|
|
|
func (t Talkgroup) Pack() int64 {
|
|
// P25 system IDs are 12 bits, so we can fit them in a signed 8 byte int (int64, pg INT8)
|
|
return int64((int64(t.System) << 32) | int64(t.Talkgroup))
|
|
}
|
|
|
|
func (t Talkgroup) String() string {
|
|
return fmt.Sprintf("%d:%d", t.System, t.Talkgroup)
|
|
}
|
|
|
|
func PackedTGs(tg []Talkgroup) []int64 {
|
|
s := make([]int64, len(tg))
|
|
|
|
for i, v := range tg {
|
|
s[i] = v.Pack()
|
|
}
|
|
|
|
return s
|
|
}
|
|
|
|
type tgMap map[Talkgroup]database.GetTalkgroupWithLearnedByPackedIDsRow
|
|
|
|
type TalkgroupCache interface {
|
|
TG(ctx context.Context, tg Talkgroup) (database.GetTalkgroupWithLearnedByPackedIDsRow, bool)
|
|
SystemName(ctx context.Context, id int) (string, bool)
|
|
ApplyAlertRules(score trending.Score[Talkgroup], t time.Time, coversOpts ...ruletime.CoversOption) float64
|
|
Hint(ctx context.Context, tgs []Talkgroup) error
|
|
Load(ctx context.Context, tgs []int64) error
|
|
Invalidate()
|
|
}
|
|
|
|
func (t *talkgroupCache) Invalidate() {
|
|
t.Lock()
|
|
defer t.Unlock()
|
|
clear(t.tgs)
|
|
clear(t.systems)
|
|
clear(t.AlertConfig)
|
|
}
|
|
|
|
type talkgroupCache struct {
|
|
sync.RWMutex
|
|
AlertConfig
|
|
tgs tgMap
|
|
systems map[int32]string
|
|
}
|
|
|
|
func NewTalkgroupCache() TalkgroupCache {
|
|
tgc := &talkgroupCache{
|
|
tgs: make(tgMap),
|
|
systems: make(map[int32]string),
|
|
AlertConfig: make(AlertConfig),
|
|
}
|
|
|
|
return tgc
|
|
}
|
|
|
|
func (t *talkgroupCache) Hint(ctx context.Context, tgs []Talkgroup) error {
|
|
t.RLock()
|
|
var toLoad []int64
|
|
if len(t.tgs) > len(tgs)/2 { // TODO: instrument this
|
|
for _, tg := range tgs {
|
|
_, ok := t.tgs[tg]
|
|
if !ok {
|
|
toLoad = append(toLoad, tg.Pack())
|
|
}
|
|
}
|
|
|
|
} else {
|
|
toLoad = make([]int64, 0, len(tgs))
|
|
for _, g := range tgs {
|
|
toLoad = append(toLoad, g.Pack())
|
|
}
|
|
}
|
|
|
|
if len(toLoad) > 0 {
|
|
t.RUnlock()
|
|
return t.Load(ctx, toLoad)
|
|
}
|
|
|
|
t.RUnlock()
|
|
return nil
|
|
}
|
|
|
|
func (t *talkgroupCache) add(rec database.GetTalkgroupWithLearnedByPackedIDsRow) error {
|
|
tg := TG(rec.SystemID, rec.Tgid)
|
|
t.tgs[tg] = rec
|
|
t.systems[rec.SystemID] = rec.SystemName
|
|
|
|
return t.AlertConfig.AddAlertConfig(tg, rec.AlertConfig)
|
|
}
|
|
|
|
func (t *talkgroupCache) Load(ctx context.Context, tgs []int64) error {
|
|
tgRecords, err := database.FromCtx(ctx).GetTalkgroupWithLearnedByPackedIDs(ctx, tgs)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
t.Lock()
|
|
defer t.Unlock()
|
|
|
|
for _, rec := range tgRecords {
|
|
err := t.add(rec)
|
|
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("add alert config fail")
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (t *talkgroupCache) TG(ctx context.Context, tg Talkgroup) (database.GetTalkgroupWithLearnedByPackedIDsRow, bool) {
|
|
t.RLock()
|
|
rec, has := t.tgs[tg]
|
|
t.RUnlock()
|
|
|
|
if has {
|
|
return rec, has
|
|
}
|
|
|
|
recs, err := database.FromCtx(ctx).GetTalkgroupWithLearnedByPackedIDs(ctx, []int64{tg.Pack()})
|
|
switch err {
|
|
case nil:
|
|
case pgx.ErrNoRows:
|
|
return database.GetTalkgroupWithLearnedByPackedIDsRow{}, false
|
|
default:
|
|
log.Error().Err(err).Msg("TG() cache add db get")
|
|
return database.GetTalkgroupWithLearnedByPackedIDsRow{}, false
|
|
}
|
|
|
|
if len(recs) < 1 {
|
|
return database.GetTalkgroupWithLearnedByPackedIDsRow{}, false
|
|
}
|
|
|
|
t.Lock()
|
|
defer t.Unlock()
|
|
err = t.add(recs[0])
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("TG() cache add")
|
|
return recs[0], false
|
|
}
|
|
|
|
return recs[0], true
|
|
}
|
|
|
|
func (t *talkgroupCache) SystemName(ctx context.Context, id int) (name string, has bool) {
|
|
n, has := t.systems[int32(id)]
|
|
|
|
if !has {
|
|
sys, err := database.FromCtx(ctx).GetSystemName(ctx, id)
|
|
if err != nil {
|
|
return "", false
|
|
}
|
|
|
|
return sys, true
|
|
}
|
|
|
|
return n, has
|
|
}
|