From 170970e92d0c0124792fc68cd75e0770855d38df Mon Sep 17 00:00:00 2001 From: Daniel Ponte Date: Mon, 4 Nov 2024 11:15:24 -0500 Subject: [PATCH 1/6] wip --- pkg/api/api.go | 122 ++++++++++++++++++++++++++++++++++++ pkg/server/routes.go | 1 + pkg/server/server.go | 4 ++ pkg/talkgroups/cache.go | 53 +++++++++++++--- pkg/talkgroups/talkgroup.go | 11 ++++ 5 files changed, 182 insertions(+), 9 deletions(-) create mode 100644 pkg/api/api.go diff --git a/pkg/api/api.go b/pkg/api/api.go new file mode 100644 index 0000000..eb7eb7e --- /dev/null +++ b/pkg/api/api.go @@ -0,0 +1,122 @@ +package api + +import ( + "encoding/json" + "errors" + "net/http" + + "dynatron.me/x/stillbox/pkg/talkgroups" + + "github.com/go-chi/chi/v5" + "github.com/go-viper/mapstructure/v2" + "github.com/jackc/pgx/v5" + "github.com/rs/zerolog/log" +) + +type API interface { + Subrouter() http.Handler +} + +type api struct { + tgs talkgroups.Store +} + +func New(tgs talkgroups.Store) API { + s := &api{ + tgs: tgs, + } + + return s +} + +func (a *api) Subrouter() http.Handler { + r := chi.NewMux() + + r.Get("/talkgroup/{system:\\d+}/{id:\\d+}", a.talkgroup) + r.Get("/talkgroup/{system:\\d+}/", a.talkgroup) + r.Get("/talkgroup/", a.talkgroup) + return r +} + +var statusMapping = map[error]int{ + talkgroups.ErrNotFound: http.StatusNotFound, + pgx.ErrNoRows: http.StatusNotFound, +} + +func httpCode(err error) int { + c, ok := statusMapping[err] + if ok { + return c + } + + for e, c := range statusMapping { // check if err wraps an error we know about + if errors.Is(err, e) { + return c + } + } + + return http.StatusInternalServerError +} + +func (a *api) writeJSON(w http.ResponseWriter, r *http.Request, data interface{}, err error) { + if err != nil { + log.Error().Str("path", r.URL.Path).Err(err).Msg("request failed") + http.Error(w, err.Error(), httpCode(err)) + return + } + + enc := json.NewEncoder(w) + err = enc.Encode(data) + if err != nil { + log.Error().Str("path", r.URL.Path).Err(err).Msg("response marshal failed") + http.Error(w, err.Error(), httpCode(err)) + return + } +} + +func decodeParams(d interface{}, r *http.Request) error { + params := chi.RouteContext(r.Context()).URLParams + m := make(map[string]string, len(params.Keys)) + + for i, k := range params.Keys { + m[k] = params.Values[i] + } + + dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{ + Metadata: nil, + Result: d, + TagName: "param", + WeaklyTypedInput: true, + }) + if err != nil { + return err + } + + return dec.Decode(m) +} + +func (a *api) badReq(w http.ResponseWriter, err error) { + http.Error(w, err.Error(), http.StatusBadRequest) +} + +func (a *api) talkgroup(w http.ResponseWriter, r *http.Request) { + p := struct { + System *int `param:"system"` + ID *int `param:"id"` + }{} + + err := decodeParams(&p, r) + if err != nil { + a.badReq(w, err) + return + } + + var res interface{} + switch { + case p.System != nil && p.ID != nil: + res, err = a.tgs.TG(r.Context(), talkgroups.TG(*p.System, *p.ID)) + case p.System != nil: + default: + } + a.writeJSON(w, r, res, err) +} diff --git a/pkg/server/routes.go b/pkg/server/routes.go index 1d6b87b..bcc7c3b 100644 --- a/pkg/server/routes.go +++ b/pkg/server/routes.go @@ -36,6 +36,7 @@ func (s *Server) setupRoutes() { s.nex.PrivateRoutes(r) s.auth.PrivateRoutes(r) s.alerter.PrivateRoutes(r) + r.Mount("/api", s.api.Subrouter()) }) r.Group(func(r chi.Router) { diff --git a/pkg/server/server.go b/pkg/server/server.go index fb0c23b..ed11996 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -7,6 +7,7 @@ import ( "time" "dynatron.me/x/stillbox/pkg/alerting" + "dynatron.me/x/stillbox/pkg/api" "dynatron.me/x/stillbox/pkg/auth" "dynatron.me/x/stillbox/pkg/config" "dynatron.me/x/stillbox/pkg/database" @@ -36,6 +37,7 @@ type Server struct { notifier notify.Notifier hup chan os.Signal tgs talkgroups.Store + api api.API } func New(ctx context.Context, cfg *config.Config) (*Server, error) { @@ -59,6 +61,7 @@ func New(ctx context.Context, cfg *config.Config) (*Server, error) { } tgCache := talkgroups.NewCache() + api := api.New(tgCache) srv := &Server{ auth: authenticator, @@ -70,6 +73,7 @@ func New(ctx context.Context, cfg *config.Config) (*Server, error) { alerter: alerting.New(cfg.Alerting, tgCache, alerting.WithNotifier(notifier)), notifier: notifier, tgs: tgCache, + api: api, } srv.sinks.Register("database", sinks.NewDatabaseSink(srv.db), true) diff --git a/pkg/talkgroups/cache.go b/pkg/talkgroups/cache.go index 22e9500..dd7a6cb 100644 --- a/pkg/talkgroups/cache.go +++ b/pkg/talkgroups/cache.go @@ -15,11 +15,14 @@ import ( "github.com/rs/zerolog/log" ) -type tgMap map[ID]Talkgroup +type tgMap map[ID]*Talkgroup type Store interface { // TG retrieves a Talkgroup from the Store. - TG(ctx context.Context, tg ID) (Talkgroup, error) + TG(ctx context.Context, tg ID) (*Talkgroup, error) + + // TGs retrieves many talkgroups from the Store. + TGs(ctx context.Context, tgs IDs) ([]*Talkgroup, error) // SystemName retrieves a system name from the store. It returns the record and whether one was found. SystemName(ctx context.Context, id int) (string, bool) @@ -117,7 +120,7 @@ func (t *cache) Hint(ctx context.Context, tgs []ID) error { return nil } -func (t *cache) add(rec Talkgroup) error { +func (t *cache) add(rec *Talkgroup) error { t.Lock() defer t.Unlock() @@ -128,14 +131,46 @@ func (t *cache) add(rec Talkgroup) error { return t.AlertConfig.UnmarshalTGRules(tg, rec.Talkgroup.AlertConfig) } -func rowToTalkgroup(r database.GetTalkgroupWithLearnedByPackedIDsRow) Talkgroup { - return Talkgroup{ +func rowToTalkgroup(r database.GetTalkgroupWithLearnedByPackedIDsRow) *Talkgroup { + return &Talkgroup{ Talkgroup: r.Talkgroup, System: r.System, Learned: r.Learned, } } +func (t *cache) TGs(ctx context.Context, tgs IDs) ([]*Talkgroup, error) { + r := make([]*Talkgroup, 0, len(tgs)) + toGet := make(IDs, 0, len(tgs)) + t.RLock() + for _, id := range tgs { + rec, has := t.tgs[id] + if has { + r = append(r, rec) + } else { + toGet = append(toGet, id) + } + } + t.RUnlock() + + tgRecords, err := database.FromCtx(ctx).GetTalkgroupWithLearnedByPackedIDs(ctx, toGet.Packed()) + if err != nil { + return nil, err + } + + for _, rec := range tgRecords { + tg := rowToTalkgroup(rec) + err := t.add(tg) + if err != nil { + return nil, err + } + + r = append(r, tg) + } + + return r, nil +} + func (t *cache) Load(ctx context.Context, tgs []int64) error { tgRecords, err := database.FromCtx(ctx).GetTalkgroupWithLearnedByPackedIDs(ctx, tgs) if err != nil { @@ -168,7 +203,7 @@ func (t *cache) Weight(ctx context.Context, id ID, tm time.Time) float64 { return float64(m) } -func (t *cache) TG(ctx context.Context, tg ID) (Talkgroup, error) { +func (t *cache) TG(ctx context.Context, tg ID) (*Talkgroup, error) { t.RLock() rec, has := t.tgs[tg] t.RUnlock() @@ -181,14 +216,14 @@ func (t *cache) TG(ctx context.Context, tg ID) (Talkgroup, error) { switch err { case nil: case pgx.ErrNoRows: - return Talkgroup{}, ErrNotFound + return nil, ErrNotFound default: log.Error().Err(err).Msg("TG() cache add db get") - return Talkgroup{}, errors.Join(ErrNotFound, err) + return nil, errors.Join(ErrNotFound, err) } if len(recs) < 1 { - return Talkgroup{}, ErrNotFound + return nil, ErrNotFound } err = t.add(rowToTalkgroup(recs[0])) diff --git a/pkg/talkgroups/talkgroup.go b/pkg/talkgroups/talkgroup.go index bf68dfa..a988c24 100644 --- a/pkg/talkgroups/talkgroup.go +++ b/pkg/talkgroups/talkgroup.go @@ -17,6 +17,17 @@ type ID struct { Talkgroup uint32 } +type IDs []ID + +func (ids *IDs) Packed() []int64 { + r := make([]int64, len(*ids)) + for i := range *ids { + r[i] = (*ids)[i].Pack() + } + + return r +} + func TG[T int | uint | int64 | uint64 | int32 | uint32](sys, tgid T) ID { return ID{ System: uint32(sys), From fb2387c2127316cc771788f51475ed5dfa25acd3 Mon Sep 17 00:00:00 2001 From: Daniel Ponte Date: Mon, 4 Nov 2024 23:41:52 -0500 Subject: [PATCH 2/6] API begin --- internal/trending/item.go | 2 +- internal/trending/trending.go | 2 +- pkg/alerting/stats.go | 8 +- pkg/api/api.go | 17 +- pkg/database/extend.go | 11 ++ pkg/database/querier.go | 4 +- pkg/database/talkgroups.sql.go | 243 +++++++++++++++++++++------- pkg/database/talkgroups.sql_test.go | 41 ++++- pkg/pb/stillbox.pb.go | 2 +- pkg/talkgroups/cache.go | 85 +++++++--- sql/postgres/queries/talkgroups.sql | 38 ++++- 11 files changed, 351 insertions(+), 102 deletions(-) create mode 100644 pkg/database/extend.go diff --git a/internal/trending/item.go b/internal/trending/item.go index aa5dc24..058274e 100644 --- a/internal/trending/item.go +++ b/internal/trending/item.go @@ -31,7 +31,7 @@ func newItem[K comparable](id K, options *options[K]) *item[K] { } } -func (i *item[K]) score(id K) Score[K] { +func (i *item[K]) score() Score[K] { recentCount, count := i.computeCounts() if recentCount < i.options.countThreshold { return Score[K]{} diff --git a/internal/trending/trending.go b/internal/trending/trending.go index 1814225..db76f2b 100644 --- a/internal/trending/trending.go +++ b/internal/trending/trending.go @@ -192,7 +192,7 @@ func (s *Scorer[K]) addToItem(item *item[K], tm time.Time) { func (s *Scorer[K]) Score() Scores[K] { var scores Scores[K] for id, item := range s.items { - score := item.score(id) + score := item.score() score.ID = id scores = append(scores, score) } diff --git a/pkg/alerting/stats.go b/pkg/alerting/stats.go index e513edd..0d5c3be 100644 --- a/pkg/alerting/stats.go +++ b/pkg/alerting/stats.go @@ -69,20 +69,20 @@ func (as *alerter) tgStatsHandler(w http.ResponseWriter, r *http.Request) { ctx := r.Context() db := database.FromCtx(ctx) - tgs, err := db.GetTalkgroupsByPackedIDs(ctx, as.packedScoredTGs()) + tgs, err := db.GetTalkgroupsWithLearnedByPackedIDs(ctx, as.packedScoredTGs()) if err != nil { log.Error().Err(err).Msg("stats TG get failed") http.Error(w, err.Error(), http.StatusInternalServerError) return } - tgMap := make(map[talkgroups.ID]database.GetTalkgroupsByPackedIDsRow, len(tgs)) + tgMap := make(map[talkgroups.ID]database.GetTalkgroupsWithLearnedByPackedIDsRow, len(tgs)) for _, t := range tgs { - tgMap[talkgroups.ID{System: uint32(t.System.ID), Talkgroup: uint32(t.Talkgroup.ID)}] = t + tgMap[talkgroups.ID{System: uint32(t.System.ID), Talkgroup: uint32(t.Talkgroup.Tgid)}] = t } renderData := struct { - TGs map[talkgroups.ID]database.GetTalkgroupsByPackedIDsRow + TGs map[talkgroups.ID]database.GetTalkgroupsWithLearnedByPackedIDsRow Scores trending.Scores[talkgroups.ID] LastScore time.Time Simulation *Simulation diff --git a/pkg/api/api.go b/pkg/api/api.go index eb7eb7e..19ab3ef 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -58,13 +58,14 @@ func httpCode(err error) int { return http.StatusInternalServerError } -func (a *api) writeJSON(w http.ResponseWriter, r *http.Request, data interface{}, err error) { +func (a *api) writeResponse(w http.ResponseWriter, r *http.Request, data interface{}, err error) { if err != nil { log.Error().Str("path", r.URL.Path).Err(err).Msg("request failed") http.Error(w, err.Error(), httpCode(err)) return } + w.Header().Set("Content-Type", "application/json") enc := json.NewEncoder(w) err = enc.Encode(data) if err != nil { @@ -83,9 +84,9 @@ func decodeParams(d interface{}, r *http.Request) error { } dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{ - Metadata: nil, - Result: d, - TagName: "param", + Metadata: nil, + Result: d, + TagName: "param", WeaklyTypedInput: true, }) if err != nil { @@ -100,6 +101,7 @@ func (a *api) badReq(w http.ResponseWriter, err error) { } func (a *api) talkgroup(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() p := struct { System *int `param:"system"` ID *int `param:"id"` @@ -114,9 +116,12 @@ func (a *api) talkgroup(w http.ResponseWriter, r *http.Request) { var res interface{} switch { case p.System != nil && p.ID != nil: - res, err = a.tgs.TG(r.Context(), talkgroups.TG(*p.System, *p.ID)) + res, err = a.tgs.TG(ctx, talkgroups.TG(*p.System, *p.ID)) case p.System != nil: + res, err = a.tgs.SystemTGs(ctx, int32(*p.System)) default: + res, err = a.tgs.TGs(ctx, nil) } - a.writeJSON(w, r, res, err) + + a.writeResponse(w, r, res, err) } diff --git a/pkg/database/extend.go b/pkg/database/extend.go new file mode 100644 index 0000000..3a96cd1 --- /dev/null +++ b/pkg/database/extend.go @@ -0,0 +1,11 @@ +package database + +func (d GetTalkgroupsWithLearnedByPackedIDsRow) GetTalkgroup() Talkgroup { return d.Talkgroup } +func (d GetTalkgroupsWithLearnedByPackedIDsRow) GetSystem() System { return d.System } +func (d GetTalkgroupsWithLearnedByPackedIDsRow) GetLearned() bool { return d.Learned } +func (g GetTalkgroupsWithLearnedRow) GetTalkgroup() Talkgroup { return g.Talkgroup } +func (g GetTalkgroupsWithLearnedRow) GetSystem() System { return g.System } +func (g GetTalkgroupsWithLearnedRow) GetLearned() bool { return g.Learned } +func (g GetTalkgroupsWithLearnedBySystemRow) GetTalkgroup() Talkgroup { return g.Talkgroup } +func (g GetTalkgroupsWithLearnedBySystemRow) GetSystem() System { return g.System } +func (g GetTalkgroupsWithLearnedBySystemRow) GetLearned() bool { return g.Learned } diff --git a/pkg/database/querier.go b/pkg/database/querier.go index 4bf2177..9d8d057 100644 --- a/pkg/database/querier.go +++ b/pkg/database/querier.go @@ -26,10 +26,12 @@ type Querier interface { GetTalkgroupIDsByTags(ctx context.Context, anytags []string, alltags []string, nottags []string) ([]GetTalkgroupIDsByTagsRow, error) GetTalkgroupTags(ctx context.Context, sys int, tg int) ([]string, error) GetTalkgroupWithLearned(ctx context.Context, systemID int, tgid int) (GetTalkgroupWithLearnedRow, error) - GetTalkgroupWithLearnedByPackedIDs(ctx context.Context, dollar_1 []int64) ([]GetTalkgroupWithLearnedByPackedIDsRow, error) GetTalkgroupsByPackedIDs(ctx context.Context, dollar_1 []int64) ([]GetTalkgroupsByPackedIDsRow, error) GetTalkgroupsWithAllTags(ctx context.Context, tags []string) ([]GetTalkgroupsWithAllTagsRow, error) GetTalkgroupsWithAnyTags(ctx context.Context, tags []string) ([]GetTalkgroupsWithAnyTagsRow, error) + GetTalkgroupsWithLearned(ctx context.Context) ([]GetTalkgroupsWithLearnedRow, error) + GetTalkgroupsWithLearnedByPackedIDs(ctx context.Context, dollar_1 []int64) ([]GetTalkgroupsWithLearnedByPackedIDsRow, error) + GetTalkgroupsWithLearnedBySystem(ctx context.Context, system int32) ([]GetTalkgroupsWithLearnedBySystemRow, error) GetUserByID(ctx context.Context, id int32) (User, error) GetUserByUID(ctx context.Context, id int32) (User, error) GetUserByUsername(ctx context.Context, username string) (User, error) diff --git a/pkg/database/talkgroups.sql.go b/pkg/database/talkgroups.sql.go index 8700be2..b32deb3 100644 --- a/pkg/database/talkgroups.sql.go +++ b/pkg/database/talkgroups.sql.go @@ -151,67 +151,6 @@ func (q *Queries) GetTalkgroupWithLearned(ctx context.Context, systemID int, tgi return i, err } -const getTalkgroupWithLearnedByPackedIDs = `-- name: GetTalkgroupWithLearnedByPackedIDs :many -SELECT -tg.id, tg.system_id, tg.tgid, tg.name, tg.alpha_tag, tg.tg_group, tg.frequency, tg.metadata, tg.tags, tg.alert, tg.alert_config, tg.weight, sys.id, sys.name, -FALSE learned -FROM talkgroups tg -JOIN systems sys ON tg.system_id = sys.id -WHERE tg.id = ANY($1::INT8[]) -UNION -SELECT -tgl.id::INT8, tgl.system_id::INT4, tgl.tgid::INT4, tgl.name, -tgl.alpha_tag, tgl.alpha_tag, NULL::INTEGER, NULL::JSONB, -CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END, -TRUE, NULL::JSONB, 1.0, sys.id, sys.name, -TRUE learned -FROM talkgroups_learned tgl -JOIN systems sys ON tgl.system_id = sys.id -WHERE systg2id(tgl.system_id, tgl.tgid) = ANY($1::INT8[]) AND ignored IS NOT TRUE -` - -type GetTalkgroupWithLearnedByPackedIDsRow struct { - Talkgroup Talkgroup `json:"talkgroup"` - System System `json:"system"` - Learned bool `json:"learned"` -} - -func (q *Queries) GetTalkgroupWithLearnedByPackedIDs(ctx context.Context, dollar_1 []int64) ([]GetTalkgroupWithLearnedByPackedIDsRow, error) { - rows, err := q.db.Query(ctx, getTalkgroupWithLearnedByPackedIDs, dollar_1) - if err != nil { - return nil, err - } - defer rows.Close() - var items []GetTalkgroupWithLearnedByPackedIDsRow - for rows.Next() { - var i GetTalkgroupWithLearnedByPackedIDsRow - if err := rows.Scan( - &i.Talkgroup.ID, - &i.Talkgroup.SystemID, - &i.Talkgroup.Tgid, - &i.Talkgroup.Name, - &i.Talkgroup.AlphaTag, - &i.Talkgroup.TgGroup, - &i.Talkgroup.Frequency, - &i.Talkgroup.Metadata, - &i.Talkgroup.Tags, - &i.Talkgroup.Alert, - &i.Talkgroup.AlertConfig, - &i.Talkgroup.Weight, - &i.System.ID, - &i.System.Name, - &i.Learned, - ); err != nil { - return nil, err - } - items = append(items, i) - } - if err := rows.Err(); err != nil { - return nil, err - } - return items, nil -} - const getTalkgroupsByPackedIDs = `-- name: GetTalkgroupsByPackedIDs :many SELECT tg.id, tg.system_id, tg.tgid, tg.name, tg.alpha_tag, tg.tg_group, tg.frequency, tg.metadata, tg.tags, tg.alert, tg.alert_config, tg.weight, sys.id, sys.name FROM talkgroups tg JOIN systems sys ON tg.system_id = sys.id @@ -342,6 +281,188 @@ func (q *Queries) GetTalkgroupsWithAnyTags(ctx context.Context, tags []string) ( return items, nil } +const getTalkgroupsWithLearned = `-- name: GetTalkgroupsWithLearned :many +SELECT +tg.id, tg.system_id, tg.tgid, tg.name, tg.alpha_tag, tg.tg_group, tg.frequency, tg.metadata, tg.tags, tg.alert, tg.alert_config, tg.weight, sys.id, sys.name, +FALSE learned +FROM talkgroups tg +JOIN systems sys ON tg.system_id = sys.id +UNION +SELECT +tgl.id::INT8, tgl.system_id::INT4, tgl.tgid::INT4, tgl.name, +tgl.alpha_tag, tgl.alpha_tag, NULL::INTEGER, NULL::JSONB, +CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END, +TRUE, NULL::JSONB, 1.0, sys.id, sys.name, +TRUE learned +FROM talkgroups_learned tgl +JOIN systems sys ON tgl.system_id = sys.id +WHERE ignored IS NOT TRUE +` + +type GetTalkgroupsWithLearnedRow struct { + Talkgroup Talkgroup `json:"talkgroup"` + System System `json:"system"` + Learned bool `json:"learned"` +} + +func (q *Queries) GetTalkgroupsWithLearned(ctx context.Context) ([]GetTalkgroupsWithLearnedRow, error) { + rows, err := q.db.Query(ctx, getTalkgroupsWithLearned) + if err != nil { + return nil, err + } + defer rows.Close() + var items []GetTalkgroupsWithLearnedRow + for rows.Next() { + var i GetTalkgroupsWithLearnedRow + if err := rows.Scan( + &i.Talkgroup.ID, + &i.Talkgroup.SystemID, + &i.Talkgroup.Tgid, + &i.Talkgroup.Name, + &i.Talkgroup.AlphaTag, + &i.Talkgroup.TgGroup, + &i.Talkgroup.Frequency, + &i.Talkgroup.Metadata, + &i.Talkgroup.Tags, + &i.Talkgroup.Alert, + &i.Talkgroup.AlertConfig, + &i.Talkgroup.Weight, + &i.System.ID, + &i.System.Name, + &i.Learned, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const getTalkgroupsWithLearnedByPackedIDs = `-- name: GetTalkgroupsWithLearnedByPackedIDs :many +SELECT +tg.id, tg.system_id, tg.tgid, tg.name, tg.alpha_tag, tg.tg_group, tg.frequency, tg.metadata, tg.tags, tg.alert, tg.alert_config, tg.weight, sys.id, sys.name, +FALSE learned +FROM talkgroups tg +JOIN systems sys ON tg.system_id = sys.id +WHERE tg.id = ANY($1::INT8[]) +UNION +SELECT +tgl.id::INT8, tgl.system_id::INT4, tgl.tgid::INT4, tgl.name, +tgl.alpha_tag, tgl.alpha_tag, NULL::INTEGER, NULL::JSONB, +CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END, +TRUE, NULL::JSONB, 1.0, sys.id, sys.name, +TRUE learned +FROM talkgroups_learned tgl +JOIN systems sys ON tgl.system_id = sys.id +WHERE systg2id(tgl.system_id, tgl.tgid) = ANY($1::INT8[]) AND ignored IS NOT TRUE +` + +type GetTalkgroupsWithLearnedByPackedIDsRow struct { + Talkgroup Talkgroup `json:"talkgroup"` + System System `json:"system"` + Learned bool `json:"learned"` +} + +func (q *Queries) GetTalkgroupsWithLearnedByPackedIDs(ctx context.Context, dollar_1 []int64) ([]GetTalkgroupsWithLearnedByPackedIDsRow, error) { + rows, err := q.db.Query(ctx, getTalkgroupsWithLearnedByPackedIDs, dollar_1) + if err != nil { + return nil, err + } + defer rows.Close() + var items []GetTalkgroupsWithLearnedByPackedIDsRow + for rows.Next() { + var i GetTalkgroupsWithLearnedByPackedIDsRow + if err := rows.Scan( + &i.Talkgroup.ID, + &i.Talkgroup.SystemID, + &i.Talkgroup.Tgid, + &i.Talkgroup.Name, + &i.Talkgroup.AlphaTag, + &i.Talkgroup.TgGroup, + &i.Talkgroup.Frequency, + &i.Talkgroup.Metadata, + &i.Talkgroup.Tags, + &i.Talkgroup.Alert, + &i.Talkgroup.AlertConfig, + &i.Talkgroup.Weight, + &i.System.ID, + &i.System.Name, + &i.Learned, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const getTalkgroupsWithLearnedBySystem = `-- name: GetTalkgroupsWithLearnedBySystem :many +SELECT +tg.id, tg.system_id, tg.tgid, tg.name, tg.alpha_tag, tg.tg_group, tg.frequency, tg.metadata, tg.tags, tg.alert, tg.alert_config, tg.weight, sys.id, sys.name, +FALSE learned +FROM talkgroups tg +JOIN systems sys ON tg.system_id = sys.id +WHERE tg.system_id = $1 +UNION +SELECT +tgl.id::INT8, tgl.system_id::INT4, tgl.tgid::INT4, tgl.name, +tgl.alpha_tag, tgl.alpha_tag, NULL::INTEGER, NULL::JSONB, +CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END, +TRUE, NULL::JSONB, 1.0, sys.id, sys.name, +TRUE learned +FROM talkgroups_learned tgl +JOIN systems sys ON tgl.system_id = sys.id +WHERE tgl.system_id = $1 AND ignored IS NOT TRUE +` + +type GetTalkgroupsWithLearnedBySystemRow struct { + Talkgroup Talkgroup `json:"talkgroup"` + System System `json:"system"` + Learned bool `json:"learned"` +} + +func (q *Queries) GetTalkgroupsWithLearnedBySystem(ctx context.Context, system int32) ([]GetTalkgroupsWithLearnedBySystemRow, error) { + rows, err := q.db.Query(ctx, getTalkgroupsWithLearnedBySystem, system) + if err != nil { + return nil, err + } + defer rows.Close() + var items []GetTalkgroupsWithLearnedBySystemRow + for rows.Next() { + var i GetTalkgroupsWithLearnedBySystemRow + if err := rows.Scan( + &i.Talkgroup.ID, + &i.Talkgroup.SystemID, + &i.Talkgroup.Tgid, + &i.Talkgroup.Name, + &i.Talkgroup.AlphaTag, + &i.Talkgroup.TgGroup, + &i.Talkgroup.Frequency, + &i.Talkgroup.Metadata, + &i.Talkgroup.Tags, + &i.Talkgroup.Alert, + &i.Talkgroup.AlertConfig, + &i.Talkgroup.Weight, + &i.System.ID, + &i.System.Name, + &i.Learned, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const setTalkgroupTags = `-- name: SetTalkgroupTags :exec UPDATE talkgroups SET tags = $3 WHERE id = systg2id($1, $2) diff --git a/pkg/database/talkgroups.sql_test.go b/pkg/database/talkgroups.sql_test.go index cc39c55..248b3dc 100644 --- a/pkg/database/talkgroups.sql_test.go +++ b/pkg/database/talkgroups.sql_test.go @@ -43,7 +43,46 @@ JOIN systems sys ON tgl.system_id = sys.id WHERE tgl.system_id = $1 AND tgl.tgid = $2 AND ignored IS NOT TRUE ` +const getTalkgroupsWithLearnedBySystemTest = `-- name: GetTalkgroupsWithLearnedBySystem :many +SELECT +tg.id, tg.system_id, tg.tgid, tg.name, tg.alpha_tag, tg.tg_group, tg.frequency, tg.metadata, tg.tags, tg.alert, tg.alert_config, tg.weight, sys.id, sys.name, +FALSE learned +FROM talkgroups tg +JOIN systems sys ON tg.system_id = sys.id +WHERE tg.system_id = $1 +UNION +SELECT +tgl.id::INT8, tgl.system_id::INT4, tgl.tgid::INT4, tgl.name, +tgl.alpha_tag, tgl.alpha_tag, NULL::INTEGER, NULL::JSONB, +CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END, +TRUE, NULL::JSONB, 1.0, sys.id, sys.name, +TRUE learned +FROM talkgroups_learned tgl +JOIN systems sys ON tgl.system_id = sys.id +WHERE tg.system_id = $1 AND ignored IS NOT TRUE +` + +const getTalkgroupsWithLearnedTest = `-- name: GetTalkgroupsWithLearned :many +SELECT +tg.id, tg.system_id, tg.tgid, tg.name, tg.alpha_tag, tg.tg_group, tg.frequency, tg.metadata, tg.tags, tg.alert, tg.alert_config, tg.weight, sys.id, sys.name, +FALSE learned +FROM talkgroups tg +JOIN systems sys ON tg.system_id = sys.id +UNION +SELECT +tgl.id::INT8, tgl.system_id::INT4, tgl.tgid::INT4, tgl.name, +tgl.alpha_tag, tgl.alpha_tag, NULL::INTEGER, NULL::JSONB, +CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END, +TRUE, NULL::JSONB, 1.0, sys.id, sys.name, +TRUE learned +FROM talkgroups_learned tgl +JOIN systems sys ON tgl.system_id = sys.id +WHERE ignored IS NOT TRUE +` + func TestQueryColumnsMatch(t *testing.T) { - require.Equal(t, getTalkgroupWithLearnedByPackedIDsTest, getTalkgroupWithLearnedByPackedIDs) + require.Equal(t, getTalkgroupsWithLearnedByPackedIDsTest, getTalkgroupWithLearnedByPackedIDs) require.Equal(t, getTalkgroupWithLearnedTest, getTalkgroupWithLearned) + require.Equal(t, getTalkgroupsWithLearnedBySystemTest, getTalkgroupsWithLearnedBySystem) + require.Equal(t, getTalkgroupsWithLearnedTest, getTalkgroupsWithLearned) } diff --git a/pkg/pb/stillbox.pb.go b/pkg/pb/stillbox.pb.go index c5b119c..64329b2 100644 --- a/pkg/pb/stillbox.pb.go +++ b/pkg/pb/stillbox.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.33.0 -// protoc v5.28.2 +// protoc v5.28.3 // source: stillbox.proto package pb diff --git a/pkg/talkgroups/cache.go b/pkg/talkgroups/cache.go index dd7a6cb..d6a866f 100644 --- a/pkg/talkgroups/cache.go +++ b/pkg/talkgroups/cache.go @@ -24,6 +24,9 @@ type Store interface { // TGs retrieves many talkgroups from the Store. TGs(ctx context.Context, tgs IDs) ([]*Talkgroup, error) + // SystemTGs retrieves all Talkgroups associated with a System. + SystemTGs(ctx context.Context, systemID int32) ([]*Talkgroup, error) + // SystemName retrieves a system name from the store. It returns the record and whether one was found. SystemName(ctx context.Context, id int) (string, bool) @@ -131,33 +134,23 @@ func (t *cache) add(rec *Talkgroup) error { return t.AlertConfig.UnmarshalTGRules(tg, rec.Talkgroup.AlertConfig) } -func rowToTalkgroup(r database.GetTalkgroupWithLearnedByPackedIDsRow) *Talkgroup { +type row interface { + database.GetTalkgroupsWithLearnedByPackedIDsRow | database.GetTalkgroupsWithLearnedRow | + database.GetTalkgroupsWithLearnedBySystemRow + GetTalkgroup() database.Talkgroup + GetSystem() database.System + GetLearned() bool +} + +func rowToTalkgroup[T row](r T) *Talkgroup { return &Talkgroup{ - Talkgroup: r.Talkgroup, - System: r.System, - Learned: r.Learned, + Talkgroup: r.GetTalkgroup(), + System: r.GetSystem(), + Learned: r.GetLearned(), } } -func (t *cache) TGs(ctx context.Context, tgs IDs) ([]*Talkgroup, error) { - r := make([]*Talkgroup, 0, len(tgs)) - toGet := make(IDs, 0, len(tgs)) - t.RLock() - for _, id := range tgs { - rec, has := t.tgs[id] - if has { - r = append(r, rec) - } else { - toGet = append(toGet, id) - } - } - t.RUnlock() - - tgRecords, err := database.FromCtx(ctx).GetTalkgroupWithLearnedByPackedIDs(ctx, toGet.Packed()) - if err != nil { - return nil, err - } - +func addToRowList[T row](t *cache, r []*Talkgroup, tgRecords []T) ([]*Talkgroup, error) { for _, rec := range tgRecords { tg := rowToTalkgroup(rec) err := t.add(tg) @@ -171,8 +164,40 @@ func (t *cache) TGs(ctx context.Context, tgs IDs) ([]*Talkgroup, error) { return r, nil } +func (t *cache) TGs(ctx context.Context, tgs IDs) ([]*Talkgroup, error) { + r := make([]*Talkgroup, 0, len(tgs)) + var err error + if tgs != nil { + toGet := make(IDs, 0, len(tgs)) + t.RLock() + for _, id := range tgs { + rec, has := t.tgs[id] + if has { + r = append(r, rec) + } else { + toGet = append(toGet, id) + } + } + t.RUnlock() + + tgRecords, err := database.FromCtx(ctx).GetTalkgroupsWithLearnedByPackedIDs(ctx, toGet.Packed()) + if err != nil { + return nil, err + } + return addToRowList(t, r, tgRecords) + } + + // get all talkgroups + + tgRecords, err := database.FromCtx(ctx).GetTalkgroupsWithLearned(ctx) + if err != nil { + return nil, err + } + return addToRowList(t, r, tgRecords) +} + func (t *cache) Load(ctx context.Context, tgs []int64) error { - tgRecords, err := database.FromCtx(ctx).GetTalkgroupWithLearnedByPackedIDs(ctx, tgs) + tgRecords, err := database.FromCtx(ctx).GetTalkgroupsWithLearnedByPackedIDs(ctx, tgs) if err != nil { return err } @@ -203,6 +228,16 @@ func (t *cache) Weight(ctx context.Context, id ID, tm time.Time) float64 { return float64(m) } +func (t *cache) SystemTGs(ctx context.Context, systemID int32) ([]*Talkgroup, error) { + recs, err := database.FromCtx(ctx).GetTalkgroupsWithLearnedBySystem(ctx, systemID) + if err != nil { + return nil, err + } + + r := make([]*Talkgroup, 0, len(recs)) + return addToRowList(t, r, recs) +} + func (t *cache) TG(ctx context.Context, tg ID) (*Talkgroup, error) { t.RLock() rec, has := t.tgs[tg] @@ -212,7 +247,7 @@ func (t *cache) TG(ctx context.Context, tg ID) (*Talkgroup, error) { return rec, nil } - recs, err := database.FromCtx(ctx).GetTalkgroupWithLearnedByPackedIDs(ctx, []int64{tg.Pack()}) + recs, err := database.FromCtx(ctx).GetTalkgroupsWithLearnedByPackedIDs(ctx, []int64{tg.Pack()}) switch err { case nil: case pgx.ErrNoRows: diff --git a/sql/postgres/queries/talkgroups.sql b/sql/postgres/queries/talkgroups.sql index a2ab786..d201d0e 100644 --- a/sql/postgres/queries/talkgroups.sql +++ b/sql/postgres/queries/talkgroups.sql @@ -51,7 +51,7 @@ FROM talkgroups_learned tgl JOIN systems sys ON tgl.system_id = sys.id WHERE tgl.system_id = sqlc.arg(system_id) AND tgl.tgid = sqlc.arg(tgid) AND ignored IS NOT TRUE; --- name: GetTalkgroupWithLearnedByPackedIDs :many +-- name: GetTalkgroupsWithLearnedByPackedIDs :many SELECT sqlc.embed(tg), sqlc.embed(sys), FALSE learned @@ -69,5 +69,41 @@ FROM talkgroups_learned tgl JOIN systems sys ON tgl.system_id = sys.id WHERE systg2id(tgl.system_id, tgl.tgid) = ANY($1::INT8[]) AND ignored IS NOT TRUE; +-- name: GetTalkgroupsWithLearnedBySystem :many +SELECT +sqlc.embed(tg), sqlc.embed(sys), +FALSE learned +FROM talkgroups tg +JOIN systems sys ON tg.system_id = sys.id +WHERE tg.system_id = @system +UNION +SELECT +tgl.id::INT8, tgl.system_id::INT4, tgl.tgid::INT4, tgl.name, +tgl.alpha_tag, tgl.alpha_tag, NULL::INTEGER, NULL::JSONB, +CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END, +TRUE, NULL::JSONB, 1.0, sys.id, sys.name, +TRUE learned +FROM talkgroups_learned tgl +JOIN systems sys ON tgl.system_id = sys.id +WHERE tgl.system_id = @system AND ignored IS NOT TRUE; + +-- name: GetTalkgroupsWithLearned :many +SELECT +sqlc.embed(tg), sqlc.embed(sys), +FALSE learned +FROM talkgroups tg +JOIN systems sys ON tg.system_id = sys.id +UNION +SELECT +tgl.id::INT8, tgl.system_id::INT4, tgl.tgid::INT4, tgl.name, +tgl.alpha_tag, tgl.alpha_tag, NULL::INTEGER, NULL::JSONB, +CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END, +TRUE, NULL::JSONB, 1.0, sys.id, sys.name, +TRUE learned +FROM talkgroups_learned tgl +JOIN systems sys ON tgl.system_id = sys.id +WHERE ignored IS NOT TRUE; + + -- name: GetSystemName :one SELECT name FROM systems WHERE id = sqlc.arg(system_id); From 65cddbcc0da3bda129e23c5f91768dabd143797f Mon Sep 17 00:00:00 2001 From: Daniel Ponte Date: Wed, 6 Nov 2024 14:20:15 -0500 Subject: [PATCH 3/6] Don't apply talkgroup weight twice Also tag ID fields --- pkg/alerting/alerting.go | 7 ++----- pkg/talkgroups/talkgroup.go | 4 ++-- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/pkg/alerting/alerting.go b/pkg/alerting/alerting.go index cf83f66..3222a3e 100644 --- a/pkg/alerting/alerting.go +++ b/pkg/alerting/alerting.go @@ -169,11 +169,8 @@ func (as *alerter) eval(ctx context.Context, now time.Time, testMode bool) ([]Al for _, s := range as.scores { origScore := s.Score tgr, err := as.tgCache.TG(ctx, s.ID) - if err == nil { - if !tgr.Talkgroup.Alert { - continue - } - s.Score *= float64(tgr.Talkgroup.Weight) + if err == nil && !tgr.Talkgroup.Alert { + continue } if s.Score > as.cfg.AlertThreshold || testMode { diff --git a/pkg/talkgroups/talkgroup.go b/pkg/talkgroups/talkgroup.go index a988c24..a491661 100644 --- a/pkg/talkgroups/talkgroup.go +++ b/pkg/talkgroups/talkgroup.go @@ -13,8 +13,8 @@ type Talkgroup struct { } type ID struct { - System uint32 - Talkgroup uint32 + System uint32 `json:"sys"` + Talkgroup uint32 `json:"tg"` } type IDs []ID From c1a258f63ce72585424bdab5dbcd1191d52dc6f9 Mon Sep 17 00:00:00 2001 From: Daniel Ponte Date: Wed, 6 Nov 2024 20:13:46 -0500 Subject: [PATCH 4/6] Change login route, add form --- cmd/calls/main.go | 2 +- pkg/auth/auth.go | 20 ++++++++++++++++++++ pkg/auth/jwt.go | 8 -------- pkg/auth/login.html | 17 +++++++++++++++++ 4 files changed, 38 insertions(+), 9 deletions(-) create mode 100644 pkg/auth/login.html diff --git a/cmd/calls/main.go b/cmd/calls/main.go index 03ee2b0..c81258f 100644 --- a/cmd/calls/main.go +++ b/cmd/calls/main.go @@ -57,7 +57,7 @@ func main() { loginForm.Add("username", *username) loginForm.Add("password", *password) - loginReq, err := http.NewRequest("POST", "http"+secureSuffix()+"://"+*addr+"/login", strings.NewReader(loginForm.Encode())) + loginReq, err := http.NewRequest("POST", "http"+secureSuffix()+"://"+*addr+"/api/login", strings.NewReader(loginForm.Encode())) if err != nil { log.Fatal(err) } diff --git a/pkg/auth/auth.go b/pkg/auth/auth.go index 5184aae..817d302 100644 --- a/pkg/auth/auth.go +++ b/pkg/auth/auth.go @@ -4,7 +4,10 @@ import ( "errors" "net/http" + _ "embed" + "dynatron.me/x/stillbox/pkg/config" + "github.com/go-chi/chi/v5" "github.com/go-chi/jwtauth/v5" ) @@ -66,3 +69,20 @@ func ErrorResponse(w http.ResponseWriter, err error) { http.Error(w, err.Error(), http.StatusInternalServerError) } } + +func (a *Auth) PublicRoutes(r chi.Router) { + r.Post("/api/login", a.routeAuth) + r.Get("/api/login", a.routeLogin) +} + +func (a *Auth) PrivateRoutes(r chi.Router) { + r.Get("/refresh", a.routeRefresh) +} + +//go:embed login.html +var loginPage []byte + +func (a *Auth) routeLogin(w http.ResponseWriter, r *http.Request) { + w.Header().Add("Content-Type", "text/html") + _, _ = w.Write(loginPage) +} diff --git a/pkg/auth/jwt.go b/pkg/auth/jwt.go index ea65a92..6241805 100644 --- a/pkg/auth/jwt.go +++ b/pkg/auth/jwt.go @@ -110,14 +110,6 @@ func (a *Auth) newToken(uid int32) string { return tokenString } -func (a *Auth) PublicRoutes(r chi.Router) { - r.Post("/login", a.routeAuth) -} - -func (a *Auth) PrivateRoutes(r chi.Router) { - r.Get("/refresh", a.routeRefresh) -} - func (a *Auth) allowInsecureCookie(r *http.Request) bool { host := strings.Split(r.Host, ":") v, has := a.cfg.AllowInsecure[host[0]] diff --git a/pkg/auth/login.html b/pkg/auth/login.html new file mode 100644 index 0000000..eeb9783 --- /dev/null +++ b/pkg/auth/login.html @@ -0,0 +1,17 @@ + + + + Login + + +
+
+ + + + + +
+
+ + From c4df60a89a6639332c22fdb682819fd7a247ad99 Mon Sep 17 00:00:00 2001 From: Daniel Ponte Date: Wed, 6 Nov 2024 20:17:13 -0500 Subject: [PATCH 5/6] Change ws route --- cmd/calls/main.go | 2 +- pkg/nexus/websocket.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/calls/main.go b/cmd/calls/main.go index c81258f..9be38b3 100644 --- a/cmd/calls/main.go +++ b/cmd/calls/main.go @@ -87,7 +87,7 @@ func main() { log.Fatal(err) } - u := url.URL{Scheme: "ws" + secureSuffix(), Host: *addr, Path: "/ws"} + u := url.URL{Scheme: "ws" + secureSuffix(), Host: *addr, Path: "/api/ws"} log.Printf("connecting to %s", u.String()) dialer := websocket.Dialer{ diff --git a/pkg/nexus/websocket.go b/pkg/nexus/websocket.go index e69c8a2..886d051 100644 --- a/pkg/nexus/websocket.go +++ b/pkg/nexus/websocket.go @@ -183,5 +183,5 @@ func (conn *wsConn) writeToClient(w io.WriteCloser, msg ToClient) { } func (n *wsManager) PrivateRoutes(r chi.Router) { - r.HandleFunc("/ws", n.serveWS) + r.HandleFunc("/api/ws", n.serveWS) } From 3f133e152a9a10f64a2a8990f094ffb4117e7dd6 Mon Sep 17 00:00:00 2001 From: Daniel Ponte Date: Wed, 6 Nov 2024 20:47:10 -0500 Subject: [PATCH 6/6] UUID is part of Call and is generated by us. Closes #8 --- pkg/calls/call.go | 4 ++ pkg/database/calls.sql.go | 67 +++++++++++++++++++++------------- pkg/database/querier.go | 2 +- pkg/pb/stillbox.pb.go | 59 +++++++++++++++++------------- pkg/pb/stillbox.proto | 25 +++++++------ pkg/sinks/database.go | 5 ++- sql/postgres/queries/calls.sql | 57 +++++++++++++++++++---------- 7 files changed, 134 insertions(+), 85 deletions(-) diff --git a/pkg/calls/call.go b/pkg/calls/call.go index 9b8e78d..d7d8c0e 100644 --- a/pkg/calls/call.go +++ b/pkg/calls/call.go @@ -9,6 +9,7 @@ import ( "dynatron.me/x/stillbox/pkg/pb" "dynatron.me/x/stillbox/pkg/talkgroups" + "github.com/google/uuid" "google.golang.org/protobuf/types/known/timestamppb" ) @@ -32,6 +33,7 @@ func (d CallDuration) Seconds() int32 { } type Call struct { + ID uuid.UUID Audio []byte AudioName string AudioType string @@ -68,6 +70,7 @@ func Make(call *Call, dontStore bool) (*Call, error) { } call.shouldStore = dontStore + call.ID = uuid.New() return call, nil } @@ -92,6 +95,7 @@ func toInt32Slice(s []int) []int32 { func (c *Call) ToPB() *pb.Call { return &pb.Call{ + Id: c.ID.String(), AudioName: c.AudioName, AudioType: c.AudioType, DateTime: timestamppb.New(c.DateTime), diff --git a/pkg/database/calls.sql.go b/pkg/database/calls.sql.go index 73f3f71..cfed6e7 100644 --- a/pkg/database/calls.sql.go +++ b/pkg/database/calls.sql.go @@ -52,30 +52,48 @@ func (q *Queries) AddAlert(ctx context.Context, arg AddAlertParams) error { return err } -const addCall = `-- name: AddCall :one +const addCall = `-- name: AddCall :exec INSERT INTO calls ( - id, - submitter, - system, - talkgroup, - call_date, - audio_name, - audio_blob, - audio_type, - audio_url, - duration, - frequency, - frequencies, - patches, - tg_label, - tg_alpha_tag, - tg_group, - source - ) VALUES (gen_random_uuid(), $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16) -RETURNING id +id, +submitter, +system, +talkgroup, +call_date, +audio_name, +audio_blob, +audio_type, +audio_url, +duration, +frequency, +frequencies, +patches, +tg_label, +tg_alpha_tag, +tg_group, +source +) VALUES ( +$1, +$2, +$3, +$4, +$5, +$6, +$7, +$8, +$9, +$10, +$11, +$12, +$13, +$14, +$15, +$16, +$17 +) ` type AddCallParams struct { + ID uuid.UUID `json:"id"` Submitter *int32 `json:"submitter"` System int `json:"system"` Talkgroup int `json:"talkgroup"` @@ -94,8 +112,9 @@ type AddCallParams struct { Source int `json:"source"` } -func (q *Queries) AddCall(ctx context.Context, arg AddCallParams) (uuid.UUID, error) { - row := q.db.QueryRow(ctx, addCall, +func (q *Queries) AddCall(ctx context.Context, arg AddCallParams) error { + _, err := q.db.Exec(ctx, addCall, + arg.ID, arg.Submitter, arg.System, arg.Talkgroup, @@ -113,9 +132,7 @@ func (q *Queries) AddCall(ctx context.Context, arg AddCallParams) (uuid.UUID, er arg.TgGroup, arg.Source, ) - var id uuid.UUID - err := row.Scan(&id) - return id, err + return err } const getDatabaseSize = `-- name: GetDatabaseSize :one diff --git a/pkg/database/querier.go b/pkg/database/querier.go index 9d8d057..ba1ace1 100644 --- a/pkg/database/querier.go +++ b/pkg/database/querier.go @@ -13,7 +13,7 @@ import ( type Querier interface { AddAlert(ctx context.Context, arg AddAlertParams) error - AddCall(ctx context.Context, arg AddCallParams) (uuid.UUID, error) + AddCall(ctx context.Context, arg AddCallParams) error BulkSetTalkgroupTags(ctx context.Context, iD int64, tags []string) error CreateAPIKey(ctx context.Context, owner int, expires pgtype.Timestamp, disabled *bool) (ApiKey, error) CreateUser(ctx context.Context, arg CreateUserParams) (User, error) diff --git a/pkg/pb/stillbox.pb.go b/pkg/pb/stillbox.pb.go index 64329b2..8d29825 100644 --- a/pkg/pb/stillbox.pb.go +++ b/pkg/pb/stillbox.pb.go @@ -288,18 +288,19 @@ type Call struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - AudioName string `protobuf:"bytes,1,opt,name=audioName,proto3" json:"audioName,omitempty"` - AudioType string `protobuf:"bytes,2,opt,name=audioType,proto3" json:"audioType,omitempty"` - DateTime *timestamppb.Timestamp `protobuf:"bytes,3,opt,name=date_time,json=dateTime,proto3" json:"date_time,omitempty"` - System int32 `protobuf:"varint,4,opt,name=system,proto3" json:"system,omitempty"` - Talkgroup int32 `protobuf:"varint,5,opt,name=talkgroup,proto3" json:"talkgroup,omitempty"` - Source int32 `protobuf:"varint,6,opt,name=source,proto3" json:"source,omitempty"` - Frequency int64 `protobuf:"varint,7,opt,name=frequency,proto3" json:"frequency,omitempty"` - Frequencies []int64 `protobuf:"varint,8,rep,packed,name=frequencies,proto3" json:"frequencies,omitempty"` - Patches []int32 `protobuf:"varint,9,rep,packed,name=patches,proto3" json:"patches,omitempty"` - Sources []int32 `protobuf:"varint,10,rep,packed,name=sources,proto3" json:"sources,omitempty"` - Duration *int32 `protobuf:"varint,11,opt,name=duration,proto3,oneof" json:"duration,omitempty"` - Audio []byte `protobuf:"bytes,12,opt,name=audio,proto3" json:"audio,omitempty"` + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` + AudioName string `protobuf:"bytes,2,opt,name=audioName,proto3" json:"audioName,omitempty"` + AudioType string `protobuf:"bytes,3,opt,name=audioType,proto3" json:"audioType,omitempty"` + DateTime *timestamppb.Timestamp `protobuf:"bytes,4,opt,name=date_time,json=dateTime,proto3" json:"date_time,omitempty"` + System int32 `protobuf:"varint,5,opt,name=system,proto3" json:"system,omitempty"` + Talkgroup int32 `protobuf:"varint,6,opt,name=talkgroup,proto3" json:"talkgroup,omitempty"` + Source int32 `protobuf:"varint,7,opt,name=source,proto3" json:"source,omitempty"` + Frequency int64 `protobuf:"varint,8,opt,name=frequency,proto3" json:"frequency,omitempty"` + Frequencies []int64 `protobuf:"varint,9,rep,packed,name=frequencies,proto3" json:"frequencies,omitempty"` + Patches []int32 `protobuf:"varint,10,rep,packed,name=patches,proto3" json:"patches,omitempty"` + Sources []int32 `protobuf:"varint,11,rep,packed,name=sources,proto3" json:"sources,omitempty"` + Duration *int32 `protobuf:"varint,12,opt,name=duration,proto3,oneof" json:"duration,omitempty"` + Audio []byte `protobuf:"bytes,13,opt,name=audio,proto3" json:"audio,omitempty"` } func (x *Call) Reset() { @@ -334,6 +335,13 @@ func (*Call) Descriptor() ([]byte, []int) { return file_stillbox_proto_rawDescGZIP(), []int{2} } +func (x *Call) GetId() string { + if x != nil { + return x.Id + } + return "" +} + func (x *Call) GetAudioName() string { if x != nil { return x.AudioName @@ -1187,29 +1195,30 @@ var file_stillbox_proto_rawDesc = []byte{ 0x6f, 0x75, 0x70, 0x49, 0x6e, 0x66, 0x6f, 0x48, 0x00, 0x52, 0x06, 0x74, 0x67, 0x49, 0x6e, 0x66, 0x6f, 0x42, 0x12, 0x0a, 0x10, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x5f, 0x72, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x0d, 0x0a, 0x0b, 0x5f, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, - 0x64, 0x5f, 0x69, 0x64, 0x22, 0x81, 0x03, 0x0a, 0x04, 0x43, 0x61, 0x6c, 0x6c, 0x12, 0x1c, 0x0a, - 0x09, 0x61, 0x75, 0x64, 0x69, 0x6f, 0x4e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, + 0x64, 0x5f, 0x69, 0x64, 0x22, 0x91, 0x03, 0x0a, 0x04, 0x43, 0x61, 0x6c, 0x6c, 0x12, 0x0e, 0x0a, + 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x1c, 0x0a, + 0x09, 0x61, 0x75, 0x64, 0x69, 0x6f, 0x4e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x61, 0x75, 0x64, 0x69, 0x6f, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x1c, 0x0a, 0x09, 0x61, - 0x75, 0x64, 0x69, 0x6f, 0x54, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, + 0x75, 0x64, 0x69, 0x6f, 0x54, 0x79, 0x70, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x61, 0x75, 0x64, 0x69, 0x6f, 0x54, 0x79, 0x70, 0x65, 0x12, 0x37, 0x0a, 0x09, 0x64, 0x61, 0x74, - 0x65, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, + 0x65, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x08, 0x64, 0x61, 0x74, 0x65, 0x54, 0x69, - 0x6d, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x18, 0x04, 0x20, 0x01, + 0x6d, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x18, 0x05, 0x20, 0x01, 0x28, 0x05, 0x52, 0x06, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x12, 0x1c, 0x0a, 0x09, 0x74, 0x61, - 0x6c, 0x6b, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x18, 0x05, 0x20, 0x01, 0x28, 0x05, 0x52, 0x09, 0x74, + 0x6c, 0x6b, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x18, 0x06, 0x20, 0x01, 0x28, 0x05, 0x52, 0x09, 0x74, 0x61, 0x6c, 0x6b, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x6f, 0x75, 0x72, - 0x63, 0x65, 0x18, 0x06, 0x20, 0x01, 0x28, 0x05, 0x52, 0x06, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, - 0x12, 0x1c, 0x0a, 0x09, 0x66, 0x72, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, 0x79, 0x18, 0x07, 0x20, + 0x63, 0x65, 0x18, 0x07, 0x20, 0x01, 0x28, 0x05, 0x52, 0x06, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, + 0x12, 0x1c, 0x0a, 0x09, 0x66, 0x72, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, 0x79, 0x18, 0x08, 0x20, 0x01, 0x28, 0x03, 0x52, 0x09, 0x66, 0x72, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, 0x79, 0x12, 0x20, - 0x0a, 0x0b, 0x66, 0x72, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, 0x69, 0x65, 0x73, 0x18, 0x08, 0x20, + 0x0a, 0x0b, 0x66, 0x72, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, 0x69, 0x65, 0x73, 0x18, 0x09, 0x20, 0x03, 0x28, 0x03, 0x52, 0x0b, 0x66, 0x72, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, 0x69, 0x65, 0x73, - 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61, 0x74, 0x63, 0x68, 0x65, 0x73, 0x18, 0x09, 0x20, 0x03, 0x28, + 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61, 0x74, 0x63, 0x68, 0x65, 0x73, 0x18, 0x0a, 0x20, 0x03, 0x28, 0x05, 0x52, 0x07, 0x70, 0x61, 0x74, 0x63, 0x68, 0x65, 0x73, 0x12, 0x18, 0x0a, 0x07, 0x73, 0x6f, - 0x75, 0x72, 0x63, 0x65, 0x73, 0x18, 0x0a, 0x20, 0x03, 0x28, 0x05, 0x52, 0x07, 0x73, 0x6f, 0x75, + 0x75, 0x72, 0x63, 0x65, 0x73, 0x18, 0x0b, 0x20, 0x03, 0x28, 0x05, 0x52, 0x07, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x73, 0x12, 0x1f, 0x0a, 0x08, 0x64, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, - 0x18, 0x0b, 0x20, 0x01, 0x28, 0x05, 0x48, 0x00, 0x52, 0x08, 0x64, 0x75, 0x72, 0x61, 0x74, 0x69, - 0x6f, 0x6e, 0x88, 0x01, 0x01, 0x12, 0x14, 0x0a, 0x05, 0x61, 0x75, 0x64, 0x69, 0x6f, 0x18, 0x0c, + 0x18, 0x0c, 0x20, 0x01, 0x28, 0x05, 0x48, 0x00, 0x52, 0x08, 0x64, 0x75, 0x72, 0x61, 0x74, 0x69, + 0x6f, 0x6e, 0x88, 0x01, 0x01, 0x12, 0x14, 0x0a, 0x05, 0x61, 0x75, 0x64, 0x69, 0x6f, 0x18, 0x0d, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x61, 0x75, 0x64, 0x69, 0x6f, 0x42, 0x0b, 0x0a, 0x09, 0x5f, 0x64, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x22, 0x3e, 0x0a, 0x05, 0x48, 0x65, 0x6c, 0x6c, 0x6f, 0x12, 0x35, 0x0a, 0x0b, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x5f, 0x69, 0x6e, 0x66, 0x6f, diff --git a/pkg/pb/stillbox.proto b/pkg/pb/stillbox.proto index 99b2bdb..815c9b3 100644 --- a/pkg/pb/stillbox.proto +++ b/pkg/pb/stillbox.proto @@ -24,18 +24,19 @@ message CommandResponse { } message Call { - string audioName = 1; - string audioType = 2; - google.protobuf.Timestamp date_time = 3; - int32 system = 4; - int32 talkgroup = 5; - int32 source = 6; - int64 frequency = 7; - repeated int64 frequencies = 8; - repeated int32 patches = 9; - repeated int32 sources = 10; - optional int32 duration = 11; - bytes audio = 12; + string id = 1; + string audioName = 2; + string audioType = 3; + google.protobuf.Timestamp date_time = 4; + int32 system = 5; + int32 talkgroup = 6; + int32 source = 7; + int64 frequency = 8; + repeated int64 frequencies = 9; + repeated int32 patches = 10; + repeated int32 sources = 11; + optional int32 duration = 12; + bytes audio = 13; } message Hello { diff --git a/pkg/sinks/database.go b/pkg/sinks/database.go index 2cf9d60..61ff7c0 100644 --- a/pkg/sinks/database.go +++ b/pkg/sinks/database.go @@ -26,12 +26,12 @@ func (s *DatabaseSink) Call(ctx context.Context, call *calls.Call) error { return nil } - dbCall, err := s.db.AddCall(ctx, s.toAddCallParams(call)) + err := s.db.AddCall(ctx, s.toAddCallParams(call)) if err != nil { return fmt.Errorf("add call: %w", err) } - log.Debug().Str("id", dbCall.String()).Int("system", call.System).Int("tgid", call.Talkgroup).Msg("stored") + log.Debug().Str("id", call.ID.String()).Int("system", call.System).Int("tgid", call.Talkgroup).Msg("stored") return nil } @@ -42,6 +42,7 @@ func (s *DatabaseSink) SinkType() string { func (s *DatabaseSink) toAddCallParams(call *calls.Call) database.AddCallParams { return database.AddCallParams{ + ID: call.ID, Submitter: call.Submitter.Int32Ptr(), System: call.System, Talkgroup: call.Talkgroup, diff --git a/sql/postgres/queries/calls.sql b/sql/postgres/queries/calls.sql index 9f4ff97..7781ee3 100644 --- a/sql/postgres/queries/calls.sql +++ b/sql/postgres/queries/calls.sql @@ -1,24 +1,41 @@ --- name: AddCall :one +-- name: AddCall :exec INSERT INTO calls ( - id, - submitter, - system, - talkgroup, - call_date, - audio_name, - audio_blob, - audio_type, - audio_url, - duration, - frequency, - frequencies, - patches, - tg_label, - tg_alpha_tag, - tg_group, - source - ) VALUES (gen_random_uuid(), $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16) -RETURNING id; +id, +submitter, +system, +talkgroup, +call_date, +audio_name, +audio_blob, +audio_type, +audio_url, +duration, +frequency, +frequencies, +patches, +tg_label, +tg_alpha_tag, +tg_group, +source +) VALUES ( +@id, +@submitter, +@system, +@talkgroup, +@call_date, +@audio_name, +@audio_blob, +@audio_type, +@audio_url, +@duration, +@frequency, +@frequencies, +@patches, +@tg_label, +@tg_alpha_tag, +@tg_group, +@source +); -- name: SetCallTranscript :exec UPDATE calls SET transcript = $2 WHERE id = $1;