Move tgstore to its own package

This commit is contained in:
Daniel 2024-11-20 19:59:24 -05:00
parent 5d9a08780f
commit 8207c59815
13 changed files with 113 additions and 81 deletions

View file

@ -8,6 +8,7 @@ import (
"dynatron.me/x/stillbox/internal/trending"
"dynatron.me/x/stillbox/pkg/database"
"dynatron.me/x/stillbox/pkg/talkgroups"
"dynatron.me/x/stillbox/pkg/talkgroups/tgstore"
"github.com/jackc/pgx/v5/pgtype"
)
@ -44,7 +45,8 @@ func (a *Alert) ToAddAlertParams() database.AddAlertParams {
}
// Make creates an alert for later rendering or storage.
func Make(ctx context.Context, store talkgroups.Store, score trending.Score[talkgroups.ID], origScore float64) (Alert, error) {
func Make(ctx context.Context, score trending.Score[talkgroups.ID], origScore float64) (Alert, error) {
store := tgstore.From(ctx)
d := Alert{
Score: score,
Timestamp: time.Now(),

View file

@ -15,7 +15,8 @@ import (
"dynatron.me/x/stillbox/pkg/database"
"dynatron.me/x/stillbox/pkg/notify"
"dynatron.me/x/stillbox/pkg/sinks"
talkgroups "dynatron.me/x/stillbox/pkg/talkgroups"
"dynatron.me/x/stillbox/pkg/talkgroups"
"dynatron.me/x/stillbox/pkg/talkgroups/tgstore"
"dynatron.me/x/stillbox/internal/timeseries"
"dynatron.me/x/stillbox/internal/trending"
@ -51,7 +52,7 @@ type alerter struct {
alertCache map[talkgroups.ID]alert.Alert
renotify time.Duration
notifier notify.Notifier
tgCache talkgroups.Store
tgCache tgstore.Store
}
type offsetClock time.Duration
@ -86,7 +87,7 @@ func WithNotifier(n notify.Notifier) AlertOption {
}
// New creates a new Alerter using the provided configuration.
func New(cfg config.Alerting, tgCache talkgroups.Store, opts ...AlertOption) Alerter {
func New(cfg config.Alerting, tgCache tgstore.Store, opts ...AlertOption) Alerter {
if !cfg.Enable {
return &noopAlerter{}
}
@ -169,7 +170,7 @@ func (as *alerter) eval(ctx context.Context, now time.Time, testMode bool) ([]al
if s.Score > as.cfg.AlertThreshold || testMode {
if old, inCache := as.alertCache[s.ID]; !inCache || now.Sub(old.Timestamp) > as.renotify {
s.Score *= as.tgCache.Weight(ctx, s.ID, now)
a, err := alert.Make(ctx, as.tgCache, s, origScore)
a, err := alert.Make(ctx, s, origScore)
if err != nil {
return nil, fmt.Errorf("makeAlert: %w", err)
}
@ -202,7 +203,7 @@ func (as *alerter) testNotifyHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
ridx := rand.Intn(len(as.scores))
a, err := alert.Make(ctx, talkgroups.StoreFrom(ctx), as.scores[ridx], 1.0)
a, err := alert.Make(ctx, as.scores[ridx], 1.0)
if err != nil {
log.Error().Err(err).Msg("test notify make alert fail")
http.Error(w, err.Error(), http.StatusInternalServerError)

View file

@ -13,6 +13,7 @@ import (
"dynatron.me/x/stillbox/internal/trending"
"dynatron.me/x/stillbox/pkg/config"
"dynatron.me/x/stillbox/pkg/talkgroups"
"dynatron.me/x/stillbox/pkg/talkgroups/tgstore"
"github.com/rs/zerolog/log"
)
@ -59,7 +60,7 @@ func (s *Simulation) stepClock(t time.Time) {
// Simulate begins the simulation using the DB handle from ctx. It returns final scores.
func (s *Simulation) Simulate(ctx context.Context) (trending.Scores[talkgroups.ID], error) {
now := time.Now()
tgc := talkgroups.NewCache()
tgc := tgstore.NewCache()
s.Enable = true
s.alerter = New(s.Alerting, tgc, WithClock(&s.clock)).(*alerter)

View file

@ -6,6 +6,7 @@ import (
"dynatron.me/x/stillbox/pkg/calls"
"dynatron.me/x/stillbox/pkg/pb"
"dynatron.me/x/stillbox/pkg/talkgroups"
"dynatron.me/x/stillbox/pkg/talkgroups/tgstore"
"github.com/rs/zerolog/log"
"google.golang.org/protobuf/types/known/structpb"
@ -59,9 +60,9 @@ func (c *client) SendError(cmd *pb.Command, err error) {
}
func (c *client) Talkgroup(ctx context.Context, tg *pb.Talkgroup) error {
tgi, err := talkgroups.StoreFrom(ctx).TG(ctx, talkgroups.TG(tg.System, tg.Talkgroup))
tgi, err := tgstore.From(ctx).TG(ctx, talkgroups.TG(tg.System, tg.Talkgroup))
if err != nil {
if err != talkgroups.ErrNotFound {
if err != tgstore.ErrNotFound {
log.Error().Err(err).Int32("sys", tg.System).Int32("tg", tg.Talkgroup).Msg("get talkgroup fail")
}
return err

View file

@ -4,7 +4,7 @@ import (
"errors"
"net/http"
"dynatron.me/x/stillbox/pkg/talkgroups"
"dynatron.me/x/stillbox/pkg/talkgroups/tgstore"
"github.com/go-chi/chi/v5"
"github.com/go-chi/render"
@ -87,8 +87,8 @@ func internalError(err error) render.Renderer {
type errResponder func(error) render.Renderer
var statusMapping = map[error]errResponder{
talkgroups.ErrNoSuchSystem: errTextNotFound,
talkgroups.ErrNotFound: errTextNotFound,
tgstore.ErrNoSuchSystem: errTextNotFound,
tgstore.ErrNotFound: errTextNotFound,
pgx.ErrNoRows: recordNotFound,
}

View file

@ -6,6 +6,7 @@ import (
"dynatron.me/x/stillbox/internal/forms"
"dynatron.me/x/stillbox/pkg/database"
"dynatron.me/x/stillbox/pkg/talkgroups"
"dynatron.me/x/stillbox/pkg/talkgroups/tgstore"
"dynatron.me/x/stillbox/pkg/talkgroups/importer"
"github.com/go-chi/chi/v5"
@ -53,7 +54,7 @@ func (t tgParams) ToID() talkgroups.ID {
func (tga *talkgroupAPI) get(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
tgs := talkgroups.StoreFrom(ctx)
tgs := tgstore.From(ctx)
var p tgParams
@ -91,7 +92,7 @@ func (tga *talkgroupAPI) put(w http.ResponseWriter, r *http.Request) {
}
ctx := r.Context()
tgs := talkgroups.StoreFrom(ctx)
tgs := tgstore.From(ctx)
input := database.UpdateTalkgroupParams{}
@ -137,12 +138,12 @@ func (tga *talkgroupAPI) putTalkgroups(w http.ResponseWriter, r *http.Request) {
}
if id.System == nil { // don't think this would ever happen
wErr(w, r, badRequest(talkgroups.ErrNoSuchSystem))
wErr(w, r, badRequest(tgstore.ErrNoSuchSystem))
return
}
ctx := r.Context()
tgs := talkgroups.StoreFrom(ctx)
tgs := tgstore.From(ctx)
var input []database.UpsertTalkgroupParams

View file

@ -9,7 +9,7 @@ import (
"dynatron.me/x/stillbox/internal/version"
"dynatron.me/x/stillbox/pkg/config"
"dynatron.me/x/stillbox/pkg/database"
"dynatron.me/x/stillbox/pkg/talkgroups"
"dynatron.me/x/stillbox/pkg/talkgroups/tgstore"
"github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"
"github.com/go-chi/httprate"
@ -28,7 +28,7 @@ func (s *Server) setupRoutes() {
r := s.r
r.Use(middleware.WithValue(database.DBCtxKey, s.db))
r.Use(middleware.WithValue(talkgroups.StoreCtxKey, s.tgs))
r.Use(middleware.WithValue(tgstore.StoreCtxKey, s.tgs))
s.installPprof()

View file

@ -15,7 +15,8 @@ import (
"dynatron.me/x/stillbox/pkg/rest"
"dynatron.me/x/stillbox/pkg/sinks"
"dynatron.me/x/stillbox/pkg/sources"
"dynatron.me/x/stillbox/pkg/talkgroups"
"dynatron.me/x/stillbox/pkg/talkgroups/tgstore"
"github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"
"github.com/go-chi/cors"
@ -37,7 +38,7 @@ type Server struct {
alerter alerting.Alerter
notifier notify.Notifier
hup chan os.Signal
tgs talkgroups.Store
tgs tgstore.Store
rest rest.API
}
@ -61,7 +62,7 @@ func New(ctx context.Context, cfg *config.Config) (*Server, error) {
return nil, err
}
tgCache := talkgroups.NewCache()
tgCache := tgstore.NewCache()
api := rest.New()
srv := &Server{
@ -117,7 +118,7 @@ func (s *Server) Go(ctx context.Context) error {
s.installHupHandler()
ctx = database.CtxWithDB(ctx, s.db)
ctx = talkgroups.CtxWithStore(ctx, s.tgs)
ctx = tgstore.CtxWithStore(ctx, s.tgs)
httpSrv := &http.Server{
Addr: s.conf.Listen,

View file

@ -13,6 +13,7 @@ import (
"dynatron.me/x/stillbox/internal/jsontypes"
"dynatron.me/x/stillbox/pkg/database"
"dynatron.me/x/stillbox/pkg/talkgroups"
"dynatron.me/x/stillbox/pkg/talkgroups/tgstore"
)
type ImportSource string
@ -66,9 +67,9 @@ var rrRE = regexp.MustCompile(`DEC\s+HEX\s+Mode\s+Alpha Tag\s+Description\s+Tag`
func (rr *radioReferenceImporter) importTalkgroups(ctx context.Context, sys int, r io.Reader) ([]talkgroups.Talkgroup, error) {
sc := bufio.NewScanner(r)
tgs := make([]talkgroups.Talkgroup, 0, 8)
sysn, has := talkgroups.StoreFrom(ctx).SystemName(ctx, sys)
sysn, has := tgstore.From(ctx).SystemName(ctx, sys)
if !has {
return nil, talkgroups.ErrNoSuchSystem
return nil, tgstore.ErrNoSuchSystem
}
var groupName string

View file

@ -1,4 +1,4 @@
package talkgroups
package tgstore
import (
"context"
@ -11,12 +11,13 @@ import (
"dynatron.me/x/stillbox/pkg/auth"
"dynatron.me/x/stillbox/pkg/config"
"dynatron.me/x/stillbox/pkg/database"
tgsp "dynatron.me/x/stillbox/pkg/talkgroups"
"github.com/jackc/pgx/v5"
"github.com/rs/zerolog/log"
)
type tgMap map[ID]*Talkgroup
type tgMap map[tgsp.ID]*tgsp.Talkgroup
var (
ErrNotFound = errors.New("talkgroup not found")
@ -25,25 +26,25 @@ var (
type Store interface {
// UpdateTG updates a talkgroup record.
UpdateTG(ctx context.Context, input database.UpdateTalkgroupParams) (*Talkgroup, error)
UpdateTG(ctx context.Context, input database.UpdateTalkgroupParams) (*tgsp.Talkgroup, error)
// UpsertTGs upserts a slice of talkgroups.
UpsertTGs(ctx context.Context, system int, input []database.UpsertTalkgroupParams) ([]*Talkgroup, error)
UpsertTGs(ctx context.Context, system int, input []database.UpsertTalkgroupParams) ([]*tgsp.Talkgroup, error)
// TG retrieves a Talkgroup from the Store.
TG(ctx context.Context, tg ID) (*Talkgroup, error)
TG(ctx context.Context, tg tgsp.ID) (*tgsp.Talkgroup, error)
// TGs retrieves many talkgroups from the Store.
TGs(ctx context.Context, tgs IDs) ([]*Talkgroup, error)
TGs(ctx context.Context, tgs tgsp.IDs) ([]*tgsp.Talkgroup, error)
// SystemTGs retrieves all Talkgroups associated with a System.
SystemTGs(ctx context.Context, systemID int32) ([]*Talkgroup, error)
SystemTGs(ctx context.Context, systemID int32) ([]*tgsp.Talkgroup, error)
// SystemName retrieves a system name from the store. It returns the record and whether one was found.
SystemName(ctx context.Context, id int) (string, bool)
// Hint hints the Store that the provided talkgroups will be asked for.
Hint(ctx context.Context, tgs []ID) error
Hint(ctx context.Context, tgs []tgsp.ID) error
// Load loads the provided talkgroup ID tuples into the Store.
Load(ctx context.Context, tgs database.TGTuples) error
@ -52,7 +53,7 @@ type Store interface {
Invalidate()
// Weight returns the final weight of this talkgroup, including its static and rules-derived weight.
Weight(ctx context.Context, id ID, t time.Time) float64
Weight(ctx context.Context, id tgsp.ID, t time.Time) float64
// Hupper
HUP(*config.Config)
@ -66,7 +67,7 @@ func CtxWithStore(ctx context.Context, s Store) context.Context {
return context.WithValue(ctx, StoreCtxKey, s)
}
func StoreFrom(ctx context.Context) Store {
func From(ctx context.Context) Store {
s, ok := ctx.Value(StoreCtxKey).(Store)
if !ok {
return NewCache()
@ -102,7 +103,7 @@ func NewCache() Store {
return tgc
}
func (t *cache) Hint(ctx context.Context, tgs []ID) error {
func (t *cache) Hint(ctx context.Context, tgs []tgsp.ID) error {
t.RLock()
var toLoad database.TGTuples
if len(t.tgs) > len(tgs)/2 { // TODO: instrument this
@ -130,11 +131,11 @@ func (t *cache) Hint(ctx context.Context, tgs []ID) error {
return nil
}
func (t *cache) add(rec *Talkgroup) {
func (t *cache) add(rec *tgsp.Talkgroup) {
t.Lock()
defer t.Unlock()
tg := TG(rec.System.ID, rec.Talkgroup.TGID)
tg := tgsp.TG(rec.System.ID, rec.Talkgroup.TGID)
t.tgs[tg] = rec
t.systems[int32(rec.System.ID)] = rec.System.Name
}
@ -147,15 +148,15 @@ type row interface {
GetLearned() bool
}
func rowToTalkgroup[T row](r T) *Talkgroup {
return &Talkgroup{
func rowToTalkgroup[T row](r T) *tgsp.Talkgroup {
return &tgsp.Talkgroup{
Talkgroup: r.GetTalkgroup(),
System: r.GetSystem(),
Learned: r.GetLearned(),
}
}
func addToRowList[T row](t *cache, r []*Talkgroup, tgRecords []T) []*Talkgroup {
func addToRowList[T row](t *cache, r []*tgsp.Talkgroup, tgRecords []T) []*tgsp.Talkgroup {
for _, rec := range tgRecords {
tg := rowToTalkgroup(rec)
t.add(tg)
@ -166,11 +167,11 @@ func addToRowList[T row](t *cache, r []*Talkgroup, tgRecords []T) []*Talkgroup {
return r
}
func (t *cache) TGs(ctx context.Context, tgs IDs) ([]*Talkgroup, error) {
r := make([]*Talkgroup, 0, len(tgs))
func (t *cache) TGs(ctx context.Context, tgs tgsp.IDs) ([]*tgsp.Talkgroup, error) {
r := make([]*tgsp.Talkgroup, 0, len(tgs))
var err error
if tgs != nil {
toGet := make(IDs, 0, len(tgs))
toGet := make(tgsp.IDs, 0, len(tgs))
t.RLock()
for _, id := range tgs {
rec, has := t.tgs[id]
@ -211,7 +212,7 @@ func (t *cache) Load(ctx context.Context, tgs database.TGTuples) error {
return nil
}
func (t *cache) Weight(ctx context.Context, id ID, tm time.Time) float64 {
func (t *cache) Weight(ctx context.Context, id tgsp.ID, tm time.Time) float64 {
tg, err := t.TG(ctx, id)
if err != nil {
return 1.0
@ -224,17 +225,17 @@ func (t *cache) Weight(ctx context.Context, id ID, tm time.Time) float64 {
return float64(m)
}
func (t *cache) SystemTGs(ctx context.Context, systemID int32) ([]*Talkgroup, error) {
func (t *cache) SystemTGs(ctx context.Context, systemID int32) ([]*tgsp.Talkgroup, error) {
recs, err := database.FromCtx(ctx).GetTalkgroupsWithLearnedBySystem(ctx, systemID)
if err != nil {
return nil, err
}
r := make([]*Talkgroup, 0, len(recs))
r := make([]*tgsp.Talkgroup, 0, len(recs))
return addToRowList(t, r, recs), nil
}
func (t *cache) TG(ctx context.Context, tg ID) (*Talkgroup, error) {
func (t *cache) TG(ctx context.Context, tg tgsp.ID) (*tgsp.Talkgroup, error) {
t.RLock()
rec, has := t.tgs[tg]
t.RUnlock()
@ -279,7 +280,7 @@ func (t *cache) SystemName(ctx context.Context, id int) (name string, has bool)
return n, has
}
func (t *cache) UpdateTG(ctx context.Context, input database.UpdateTalkgroupParams) (*Talkgroup, error) {
func (t *cache) UpdateTG(ctx context.Context, input database.UpdateTalkgroupParams) (*tgsp.Talkgroup, error) {
sysName, has := t.SystemName(ctx, int(*input.SystemID))
if !has {
return nil, ErrNoSuchSystem
@ -290,7 +291,7 @@ func (t *cache) UpdateTG(ctx context.Context, input database.UpdateTalkgroupPara
return nil, err
}
record := &Talkgroup{
record := &tgsp.Talkgroup{
Talkgroup: tg,
System: database.System{ID: int(tg.SystemID), Name: sysName},
}
@ -299,7 +300,7 @@ func (t *cache) UpdateTG(ctx context.Context, input database.UpdateTalkgroupPara
return record, nil
}
func (t *cache) UpsertTGs(ctx context.Context, system int, input []database.UpsertTalkgroupParams) ([]*Talkgroup, error) {
func (t *cache) UpsertTGs(ctx context.Context, system int, input []database.UpsertTalkgroupParams) ([]*tgsp.Talkgroup, error) {
db := database.FromCtx(ctx)
sysName, hasSys := t.SystemName(ctx, system)
if !hasSys {
@ -310,7 +311,7 @@ func (t *cache) UpsertTGs(ctx context.Context, system int, input []database.Upse
Name: sysName,
}
tgs := make([]*Talkgroup, 0, len(input))
tgs := make([]*tgsp.Talkgroup, 0, len(input))
err := db.InTx(ctx, func(db database.Store) error {
versionParams := make([]database.StoreTGVersionParams, 0, len(input))
@ -341,7 +342,7 @@ func (t *cache) UpsertTGs(ctx context.Context, system int, input []database.Upse
TGID: r.TGID,
Submitter: auth.UIDFrom(ctx),
})
tgs = append(tgs, &Talkgroup{
tgs = append(tgs, &tgsp.Talkgroup{
Talkgroup: r,
System: sys,
Learned: r.Learned,

View file

@ -0,0 +1 @@
DROP TABLE IF EXISTS talkgroup_versions;

View file

@ -0,0 +1,49 @@
CREATE TABLE IF NOT EXISTS talkgroup_versions(
-- version metadata
id INTEGER PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
time TIMESTAMPTZ NOT NULL,
created_by INTEGER REFERENCES users(id),
-- talkgroup snapshot
system_id INT4 REFERENCES systems(id),
tgid INT4,
name TEXT,
alpha_tag TEXT,
tg_group TEXT,
frequency INTEGER,
metadata JSONB,
tags TEXT[],
alert BOOLEAN,
alert_config JSONB,
weight REAL,
learned BOOLEAN
);
-- Store current version
INSERT INTO talkgroup_versions(time, created_by,
system_id,
tgid,
name,
alpha_tag,
tg_group,
frequency,
metadata,
tags,
alert,
alert_config,
weight,
learned
) SELECT NOW(), @submitter,
tg.system_id,
tg.tgid,
tg.name,
tg.alpha_tag,
tg.tg_group,
tg.frequency,
tg.metadata,
tg.tags,
tg.alert,
tg.alert_config,
tg.weight,
tg.learned
FROM talkgroups;

View file

@ -29,47 +29,20 @@ SELECT
sqlc.embed(tg), sqlc.embed(sys)
FROM talkgroups tg
JOIN systems sys ON tg.system_id = sys.id
WHERE (tg.system_id, tg.tgid) = (@system_id, @tgid) AND tg.learned IS NOT TRUE
UNION
SELECT
tgl.id, tgl.system_id::INT4, tgl.tgid::INT4, tgl.name,
tgl.alpha_tag, tgl.tg_group, NULL::INTEGER, NULL::JSONB,
CASE WHEN tgl.tg_group IS NULL THEN NULL ELSE ARRAY[tgl.tg_group] END,
NOT tgl.ignored, NULL::JSONB, 1.0, TRUE learned, sys.id, sys.name
FROM talkgroups_learned tgl
JOIN systems sys ON tgl.system_id = sys.id
WHERE tgl.system_id = @system_id AND tgl.tgid = @tgid AND ignored IS NOT TRUE;
WHERE (tg.system_id, tg.tgid) = (@system_id, @tgid);
-- name: GetTalkgroupsWithLearnedBySystem :many
SELECT
sqlc.embed(tg), sqlc.embed(sys)
FROM talkgroups tg
JOIN systems sys ON tg.system_id = sys.id
WHERE tg.system_id = @system AND tg.learned IS NOT TRUE
UNION
SELECT
tgl.id, tgl.system_id::INT4, tgl.tgid::INT4, tgl.name,
tgl.alpha_tag, tgl.tg_group, NULL::INTEGER, NULL::JSONB,
CASE WHEN tgl.tg_group IS NULL THEN NULL ELSE ARRAY[tgl.tg_group] END,
NOT tgl.ignored, NULL::JSONB, 1.0, TRUE learned, sys.id, sys.name
FROM talkgroups_learned tgl
JOIN systems sys ON tgl.system_id = sys.id
WHERE tgl.system_id = @system AND ignored IS NOT TRUE;
WHERE tg.system_id = @system;
-- name: GetTalkgroupsWithLearned :many
SELECT
sqlc.embed(tg), sqlc.embed(sys)
FROM talkgroups tg
JOIN systems sys ON tg.system_id = sys.id
WHERE tg.learned IS NOT TRUE
UNION
SELECT
tgl.id, tgl.system_id::INT4, tgl.tgid::INT4, tgl.name,
tgl.alpha_tag, tgl.tg_group, NULL::INTEGER, NULL::JSONB,
CASE WHEN tgl.tg_group IS NULL THEN NULL ELSE ARRAY[tgl.tg_group] END,
NOT tgl.ignored, NULL::JSONB, 1.0, TRUE learned, sys.id, sys.name
FROM talkgroups_learned tgl
JOIN systems sys ON tgl.system_id = sys.id
WHERE ignored IS NOT TRUE;
-- name: GetSystemName :one