Compare commits

...

5 commits

Author SHA1 Message Date
e7d59ed78c Merge pull request 'putTalkgroups' (#41) from putTalkgroups into trunk
Reviewed-on: #41
2024-11-20 11:39:48 -05:00
2872f1d437 Lint 2024-11-20 11:07:58 -05:00
da73227c79 Talkgroup bulk upsert call, name improvements 2024-11-20 09:37:57 -05:00
89446b8a58 fix test 2024-11-20 07:48:03 -05:00
f909723f7d wip pre-batch 2024-11-20 07:26:59 -05:00
18 changed files with 379 additions and 52 deletions

View file

@ -35,5 +35,6 @@ func main() {
cmds := append([]*cobra.Command{serve.Command(cfg)}, admin.Command(cfg)...) cmds := append([]*cobra.Command{serve.Command(cfg)}, admin.Command(cfg)...)
rootCmd.AddCommand(cmds...) rootCmd.AddCommand(cmds...)
rootCmd.Execute() // cobra is already checking for errors and will print them
_ = rootCmd.Execute()
} }

View file

@ -3,7 +3,6 @@ package alert
import ( import (
"context" "context"
"fmt" "fmt"
"strconv"
"time" "time"
"dynatron.me/x/stillbox/internal/trending" "dynatron.me/x/stillbox/internal/trending"
@ -56,15 +55,7 @@ func Make(ctx context.Context, store talkgroups.Store, score trending.Score[talk
switch err { switch err {
case nil: case nil:
d.Weight = tgRecord.Talkgroup.Weight d.Weight = tgRecord.Talkgroup.Weight
if tgRecord.System.Name == "" { d.TGName = tgRecord.String()
tgRecord.System.Name = strconv.Itoa(int(score.ID.System))
}
if tgRecord.Talkgroup.Name != nil {
d.TGName = fmt.Sprintf("%s %s [%d]", tgRecord.System.Name, *tgRecord.Talkgroup.Name, score.ID.Talkgroup)
} else {
d.TGName = fmt.Sprintf("%s:%d", tgRecord.System.Name, int(score.ID.Talkgroup))
}
default: default:
system, has := store.SystemName(ctx, int(score.ID.System)) system, has := store.SystemName(ctx, int(score.ID.System))
if has { if has {

View file

@ -198,7 +198,6 @@ func (as *alerter) eval(ctx context.Context, now time.Time, testMode bool) ([]al
} }
func (as *alerter) testNotifyHandler(w http.ResponseWriter, r *http.Request) { func (as *alerter) testNotifyHandler(w http.ResponseWriter, r *http.Request) {
alerts := make([]alert.Alert, 0, len(as.scores))
ctx := r.Context() ctx := r.Context()
alerts, err := as.eval(ctx, time.Now(), true) alerts, err := as.eval(ctx, time.Now(), true)

View file

@ -86,7 +86,7 @@
{{ $tg := (index $.TGs .ID) }} {{ $tg := (index $.TGs .ID) }}
<tr> <tr>
<td>{{ $tg.System.Name}}</td> <td>{{ $tg.System.Name}}</td>
<td>{{ $tg.Talkgroup.Name}}</td> <td>{{ $tg.Talkgroup }}</td>
<td>{{ .ID.Talkgroup }}</td> <td>{{ .ID.Talkgroup }}</td>
<td>{{ f .Count 0 }}</td> <td>{{ f .Count 0 }}</td>
<td>{{ f .RecentCount 0 }}</td> <td>{{ f .RecentCount 0 }}</td>

132
pkg/database/batch.go Normal file
View file

@ -0,0 +1,132 @@
// Code generated by sqlc. DO NOT EDIT.
// versions:
// sqlc v1.27.0
// source: batch.go
package database
import (
"context"
"errors"
"dynatron.me/x/stillbox/internal/jsontypes"
"dynatron.me/x/stillbox/pkg/alerting/rules"
"github.com/jackc/pgx/v5"
)
var (
ErrBatchAlreadyClosed = errors.New("batch already closed")
)
const upsertTalkgroup = `-- name: UpsertTalkgroup :batchone
INSERT INTO talkgroups AS tg (
system_id, tgid, name, alpha_tag, tg_group, frequency, metadata, tags, alert, alert_config, weight, learned
) VALUES (
$1,
$2,
$3,
$4,
$5,
$6,
$7,
$8,
$9,
$10,
$11,
$12
)
ON CONFLICT (system_id, tgid) DO UPDATE
SET
name = COALESCE($3, tg.name),
alpha_tag = COALESCE($4, tg.alpha_tag),
tg_group = COALESCE($5, tg.tg_group),
frequency = COALESCE($6, tg.frequency),
metadata = COALESCE($7, tg.metadata),
tags = COALESCE($8, tg.tags),
alert = COALESCE($9, tg.alert),
alert_config = COALESCE($10, tg.alert_config),
weight = COALESCE($11, tg.weight),
learned = COALESCE($12, tg.learned)
RETURNING id, system_id, tgid, name, alpha_tag, tg_group, frequency, metadata, tags, alert, alert_config, weight, learned
`
type UpsertTalkgroupBatchResults struct {
br pgx.BatchResults
tot int
closed bool
}
type UpsertTalkgroupParams struct {
SystemID int32 `json:"system_id"`
TGID int32 `json:"tgid"`
Name *string `json:"name"`
AlphaTag *string `json:"alpha_tag"`
TGGroup *string `json:"tg_group"`
Frequency *int32 `json:"frequency"`
Metadata jsontypes.Metadata `json:"metadata"`
Tags []string `json:"tags"`
Alert *bool `json:"alert"`
AlertConfig rules.AlertRules `json:"alert_config"`
Weight *float32 `json:"weight"`
Learned *bool `json:"learned"`
}
func (q *Queries) UpsertTalkgroup(ctx context.Context, arg []UpsertTalkgroupParams) *UpsertTalkgroupBatchResults {
batch := &pgx.Batch{}
for _, a := range arg {
vals := []interface{}{
a.SystemID,
a.TGID,
a.Name,
a.AlphaTag,
a.TGGroup,
a.Frequency,
a.Metadata,
a.Tags,
a.Alert,
a.AlertConfig,
a.Weight,
a.Learned,
}
batch.Queue(upsertTalkgroup, vals...)
}
br := q.db.SendBatch(ctx, batch)
return &UpsertTalkgroupBatchResults{br, len(arg), false}
}
func (b *UpsertTalkgroupBatchResults) QueryRow(f func(int, Talkgroup, error)) {
defer b.br.Close()
for t := 0; t < b.tot; t++ {
var i Talkgroup
if b.closed {
if f != nil {
f(t, i, ErrBatchAlreadyClosed)
}
continue
}
row := b.br.QueryRow()
err := row.Scan(
&i.ID,
&i.SystemID,
&i.TGID,
&i.Name,
&i.AlphaTag,
&i.TGGroup,
&i.Frequency,
&i.Metadata,
&i.Tags,
&i.Alert,
&i.AlertConfig,
&i.Weight,
&i.Learned,
)
if f != nil {
f(t, i, err)
}
}
}
func (b *UpsertTalkgroupBatchResults) Close() error {
b.closed = true
return b.br.Close()
}

View file

@ -43,6 +43,7 @@ func (db *Database) InTx(ctx context.Context, f func(Store) error, opts pgx.TxOp
return fmt.Errorf("Tx begin: %w", err) return fmt.Errorf("Tx begin: %w", err)
} }
//nolint:errcheck
defer tx.Rollback(ctx) defer tx.Rollback(ctx)
dbtx := &Database{Pool: db.Pool, Queries: db.Queries.WithTx(tx)} dbtx := &Database{Pool: db.Pool, Queries: db.Queries.WithTx(tx)}

View file

@ -15,6 +15,7 @@ type DBTX interface {
Exec(context.Context, string, ...interface{}) (pgconn.CommandTag, error) Exec(context.Context, string, ...interface{}) (pgconn.CommandTag, error)
Query(context.Context, string, ...interface{}) (pgx.Rows, error) Query(context.Context, string, ...interface{}) (pgx.Rows, error)
QueryRow(context.Context, string, ...interface{}) pgx.Row QueryRow(context.Context, string, ...interface{}) pgx.Row
SendBatch(context.Context, *pgx.Batch) pgx.BatchResults
} }
func New(db DBTX) *Queries { func New(db DBTX) *Queries {

View file

@ -1,5 +1,9 @@
package database package database
import (
"strconv"
)
func (d GetTalkgroupsRow) GetTalkgroup() Talkgroup { return d.Talkgroup } func (d GetTalkgroupsRow) GetTalkgroup() Talkgroup { return d.Talkgroup }
func (d GetTalkgroupsRow) GetSystem() System { return d.System } func (d GetTalkgroupsRow) GetSystem() System { return d.System }
func (d GetTalkgroupsRow) GetLearned() bool { return d.Talkgroup.Learned } func (d GetTalkgroupsRow) GetLearned() bool { return d.Talkgroup.Learned }
@ -15,3 +19,18 @@ func (g GetTalkgroupsWithLearnedBySystemRow) GetLearned() bool { return g
func (g Talkgroup) GetTalkgroup() Talkgroup { return g } func (g Talkgroup) GetTalkgroup() Talkgroup { return g }
func (g Talkgroup) GetSystem() System { return System{ID: int(g.SystemID)} } func (g Talkgroup) GetSystem() System { return System{ID: int(g.SystemID)} }
func (g Talkgroup) GetLearned() bool { return false } func (g Talkgroup) GetLearned() bool { return false }
func (g Talkgroup) String() string {
switch {
case g.AlphaTag != nil:
return *g.AlphaTag
case g.Name != nil && g.TGGroup != nil:
return *g.TGGroup + " " + *g.Name
case g.Name != nil:
return *g.Name + " [" + strconv.Itoa(int(g.TGID)) + "]"
case g.TGGroup != nil:
return *g.TGGroup + " [" + strconv.Itoa(int(g.TGID)) + "]"
}
return strconv.Itoa(int(g.TGID))
}

View file

@ -1771,6 +1771,55 @@ func (_c *Store_UpdateTalkgroup_Call) RunAndReturn(run func(context.Context, dat
return _c return _c
} }
// UpsertTalkgroup provides a mock function with given fields: ctx, arg
func (_m *Store) UpsertTalkgroup(ctx context.Context, arg []database.UpsertTalkgroupParams) *database.UpsertTalkgroupBatchResults {
ret := _m.Called(ctx, arg)
if len(ret) == 0 {
panic("no return value specified for UpsertTalkgroup")
}
var r0 *database.UpsertTalkgroupBatchResults
if rf, ok := ret.Get(0).(func(context.Context, []database.UpsertTalkgroupParams) *database.UpsertTalkgroupBatchResults); ok {
r0 = rf(ctx, arg)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*database.UpsertTalkgroupBatchResults)
}
}
return r0
}
// Store_UpsertTalkgroup_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpsertTalkgroup'
type Store_UpsertTalkgroup_Call struct {
*mock.Call
}
// UpsertTalkgroup is a helper method to define mock.On call
// - ctx context.Context
// - arg []database.UpsertTalkgroupParams
func (_e *Store_Expecter) UpsertTalkgroup(ctx interface{}, arg interface{}) *Store_UpsertTalkgroup_Call {
return &Store_UpsertTalkgroup_Call{Call: _e.mock.On("UpsertTalkgroup", ctx, arg)}
}
func (_c *Store_UpsertTalkgroup_Call) Run(run func(ctx context.Context, arg []database.UpsertTalkgroupParams)) *Store_UpsertTalkgroup_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].([]database.UpsertTalkgroupParams))
})
return _c
}
func (_c *Store_UpsertTalkgroup_Call) Return(_a0 *database.UpsertTalkgroupBatchResults) *Store_UpsertTalkgroup_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *Store_UpsertTalkgroup_Call) RunAndReturn(run func(context.Context, []database.UpsertTalkgroupParams) *database.UpsertTalkgroupBatchResults) *Store_UpsertTalkgroup_Call {
_c.Call.Return(run)
return _c
}
// NewStore creates a new instance of Store. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // NewStore creates a new instance of Store. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value. // The first argument is typically a *testing.T value.
func NewStore(t interface { func NewStore(t interface {

View file

@ -39,6 +39,7 @@ type Querier interface {
SetTalkgroupTags(ctx context.Context, tags []string, systemID int32, tGID int32) error SetTalkgroupTags(ctx context.Context, tags []string, systemID int32, tGID int32) error
UpdatePassword(ctx context.Context, username string, password string) error UpdatePassword(ctx context.Context, username string, password string) error
UpdateTalkgroup(ctx context.Context, arg UpdateTalkgroupParams) (Talkgroup, error) UpdateTalkgroup(ctx context.Context, arg UpdateTalkgroupParams) (Talkgroup, error)
UpsertTalkgroup(ctx context.Context, arg []UpsertTalkgroupParams) *UpsertTalkgroupBatchResults
} }
var _ Querier = (*Queries)(nil) var _ Querier = (*Queries)(nil)

View file

@ -57,7 +57,7 @@ INSERT INTO talkgroups (
) VALUES( ) VALUES(
$1, $1,
$2, $2,
't' TRUE
) )
` `
@ -419,8 +419,9 @@ SET
tags = COALESCE($6, tags), tags = COALESCE($6, tags),
alert = COALESCE($7, alert), alert = COALESCE($7, alert),
alert_config = COALESCE($8, alert_config), alert_config = COALESCE($8, alert_config),
weight = COALESCE($9, weight) weight = COALESCE($9, weight),
WHERE id = $10 OR (system_id = $11 AND tgid = $12) learned = COALESCE($10, learned)
WHERE id = $11 OR (system_id = $12 AND tgid = $13)
RETURNING id, system_id, tgid, name, alpha_tag, tg_group, frequency, metadata, tags, alert, alert_config, weight, learned RETURNING id, system_id, tgid, name, alpha_tag, tg_group, frequency, metadata, tags, alert, alert_config, weight, learned
` `
@ -434,6 +435,7 @@ type UpdateTalkgroupParams struct {
Alert *bool `json:"alert"` Alert *bool `json:"alert"`
AlertConfig rules.AlertRules `json:"alert_config"` AlertConfig rules.AlertRules `json:"alert_config"`
Weight *float32 `json:"weight"` Weight *float32 `json:"weight"`
Learned *bool `json:"learned"`
ID *int32 `json:"id"` ID *int32 `json:"id"`
SystemID *int32 `json:"system_id"` SystemID *int32 `json:"system_id"`
TGID *int32 `json:"tgid"` TGID *int32 `json:"tgid"`
@ -450,6 +452,7 @@ func (q *Queries) UpdateTalkgroup(ctx context.Context, arg UpdateTalkgroupParams
arg.Alert, arg.Alert,
arg.AlertConfig, arg.AlertConfig,
arg.Weight, arg.Weight,
arg.Learned,
arg.ID, arg.ID,
arg.SystemID, arg.SystemID,
arg.TGID, arg.TGID,

View file

@ -42,7 +42,6 @@ type errResponse struct {
func (e *errResponse) Render(w http.ResponseWriter, r *http.Request) error { func (e *errResponse) Render(w http.ResponseWriter, r *http.Request) error {
switch e.Code { switch e.Code {
case http.StatusNotFound:
default: default:
log.Error().Str("path", r.URL.Path).Err(e.Err).Int("code", e.Code).Str("msg", e.Error).Msg("request failed") log.Error().Str("path", r.URL.Path).Err(e.Err).Int("code", e.Code).Str("msg", e.Error).Msg("request failed")
} }
@ -71,7 +70,7 @@ func recordNotFound(err error) render.Renderer {
func internalError(err error) render.Renderer { func internalError(err error) render.Renderer {
return &errResponse{ return &errResponse{
Err: err, Err: err,
Code: http.StatusNotFound, Code: http.StatusInternalServerError,
Error: "Internal server error", Error: "Internal server error",
} }
} }

View file

@ -17,9 +17,10 @@ type talkgroupAPI struct {
func (tga *talkgroupAPI) Subrouter() http.Handler { func (tga *talkgroupAPI) Subrouter() http.Handler {
r := chi.NewMux() r := chi.NewMux()
r.Get("/{system:\\d+}/{id:\\d+}", tga.get) r.Get(`/{system:\d+}/{id:\d+}`, tga.get)
r.Put("/{system:\\d+}/{id:\\d+}", tga.put) r.Put(`/{system:\d+}/{id:\d+}`, tga.put)
r.Get("/{system:\\d+}/", tga.get) r.Put(`/{system:\d+}`, tga.putTalkgroups)
r.Get(`/{system:\d+}/`, tga.get)
r.Get("/", tga.get) r.Get("/", tga.get)
r.Post("/import", tga.tgImport) r.Post("/import", tga.tgImport)
@ -31,7 +32,7 @@ type tgParams struct {
ID *int `param:"id"` ID *int `param:"id"`
} }
func (t tgParams) haveBoth() bool { func (t tgParams) hasBoth() bool {
return t.System != nil && t.ID != nil return t.System != nil && t.ID != nil
} }
@ -64,11 +65,12 @@ func (tga *talkgroupAPI) get(w http.ResponseWriter, r *http.Request) {
var res interface{} var res interface{}
switch { switch {
case p.System != nil && p.ID != nil: case p.hasBoth():
res, err = tgs.TG(ctx, talkgroups.TG(*p.System, *p.ID)) res, err = tgs.TG(ctx, talkgroups.TG(*p.System, *p.ID))
case p.System != nil: case p.System != nil:
res, err = tgs.SystemTGs(ctx, int32(*p.System)) res, err = tgs.SystemTGs(ctx, int32(*p.System))
default: default:
// get all talkgroups
res, err = tgs.TGs(ctx, nil) res, err = tgs.TGs(ctx, nil)
} }
@ -99,6 +101,8 @@ func (tga *talkgroupAPI) put(w http.ResponseWriter, r *http.Request) {
return return
} }
input.Learned = nil // ignore for this call
record, err := tgs.UpdateTG(ctx, input) record, err := tgs.UpdateTG(ctx, input)
if err != nil { if err != nil {
wErr(w, r, autoError(err)) wErr(w, r, autoError(err))
@ -123,3 +127,36 @@ func (tga *talkgroupAPI) tgImport(w http.ResponseWriter, r *http.Request) {
respond(w, r, recs) respond(w, r, recs)
} }
func (tga *talkgroupAPI) putTalkgroups(w http.ResponseWriter, r *http.Request) {
var id tgParams
err := decodeParams(&id, r)
if err != nil {
wErr(w, r, badRequest(err))
return
}
if id.System == nil { // don't think this would ever happen
wErr(w, r, badRequest(talkgroups.ErrNoSuchSystem))
return
}
ctx := r.Context()
tgs := talkgroups.StoreFrom(ctx)
var input []database.UpsertTalkgroupParams
err = forms.Unmarshal(r, &input, forms.WithTag("json"), forms.WithAcceptBlank(), forms.WithOmitEmpty())
if err != nil {
wErr(w, r, badRequest(err))
return
}
record, err := tgs.UpsertTGs(ctx, *id.System, input)
if err != nil {
wErr(w, r, autoError(err))
return
}
respond(w, r, record)
}

View file

@ -110,6 +110,7 @@ func (rr *radioReferenceImporter) importTalkgroups(ctx context.Context, sys int,
gn := groupName // must take a copy gn := groupName // must take a copy
tgs = append(tgs, talkgroups.Talkgroup{ tgs = append(tgs, talkgroups.Talkgroup{
Talkgroup: database.Talkgroup{ Talkgroup: database.Talkgroup{
ID: len(tgs), // need unique ID for the UI to track
TGID: int32(tgt.Talkgroup), TGID: int32(tgt.Talkgroup),
SystemID: int32(tgt.System), SystemID: int32(tgt.System),
Name: &fields[4], Name: &fields[4],

File diff suppressed because one or more lines are too long

View file

@ -3,9 +3,11 @@ package talkgroups
import ( import (
"context" "context"
"errors" "errors"
"strings"
"sync" "sync"
"time" "time"
"dynatron.me/x/stillbox/internal/common"
"dynatron.me/x/stillbox/pkg/config" "dynatron.me/x/stillbox/pkg/config"
"dynatron.me/x/stillbox/pkg/database" "dynatron.me/x/stillbox/pkg/database"
@ -24,6 +26,9 @@ type Store interface {
// UpdateTG updates a talkgroup record. // UpdateTG updates a talkgroup record.
UpdateTG(ctx context.Context, input database.UpdateTalkgroupParams) (*Talkgroup, error) UpdateTG(ctx context.Context, input database.UpdateTalkgroupParams) (*Talkgroup, error)
// UpsertTGs upserts a slice of talkgroups.
UpsertTGs(ctx context.Context, system int, input []database.UpsertTalkgroupParams) ([]*Talkgroup, error)
// TG retrieves a Talkgroup from the Store. // TG retrieves a Talkgroup from the Store.
TG(ctx context.Context, tg ID) (*Talkgroup, error) TG(ctx context.Context, tg ID) (*Talkgroup, error)
@ -124,15 +129,13 @@ func (t *cache) Hint(ctx context.Context, tgs []ID) error {
return nil return nil
} }
func (t *cache) add(rec *Talkgroup) error { func (t *cache) add(rec *Talkgroup) {
t.Lock() t.Lock()
defer t.Unlock() defer t.Unlock()
tg := TG(rec.System.ID, rec.Talkgroup.TGID) tg := TG(rec.System.ID, rec.Talkgroup.TGID)
t.tgs[tg] = rec t.tgs[tg] = rec
t.systems[int32(rec.System.ID)] = rec.System.Name t.systems[int32(rec.System.ID)] = rec.System.Name
return nil
} }
type row interface { type row interface {
@ -151,18 +154,15 @@ func rowToTalkgroup[T row](r T) *Talkgroup {
} }
} }
func addToRowList[T row](t *cache, r []*Talkgroup, tgRecords []T) ([]*Talkgroup, error) { func addToRowList[T row](t *cache, r []*Talkgroup, tgRecords []T) []*Talkgroup {
for _, rec := range tgRecords { for _, rec := range tgRecords {
tg := rowToTalkgroup(rec) tg := rowToTalkgroup(rec)
err := t.add(tg) t.add(tg)
if err != nil {
return nil, err
}
r = append(r, tg) r = append(r, tg)
} }
return r, nil return r
} }
func (t *cache) TGs(ctx context.Context, tgs IDs) ([]*Talkgroup, error) { func (t *cache) TGs(ctx context.Context, tgs IDs) ([]*Talkgroup, error) {
@ -185,7 +185,7 @@ func (t *cache) TGs(ctx context.Context, tgs IDs) ([]*Talkgroup, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
return addToRowList(t, r, tgRecords) return addToRowList(t, r, tgRecords), nil
} }
// get all talkgroups // get all talkgroups
@ -194,7 +194,7 @@ func (t *cache) TGs(ctx context.Context, tgs IDs) ([]*Talkgroup, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
return addToRowList(t, r, tgRecords) return addToRowList(t, r, tgRecords), nil
} }
func (t *cache) Load(ctx context.Context, tgs database.TGTuples) error { func (t *cache) Load(ctx context.Context, tgs database.TGTuples) error {
@ -204,11 +204,7 @@ func (t *cache) Load(ctx context.Context, tgs database.TGTuples) error {
} }
for _, rec := range tgRecords { for _, rec := range tgRecords {
err := t.add(rowToTalkgroup(rec)) t.add(rowToTalkgroup(rec))
if err != nil {
log.Error().Err(err).Msg("add alert config fail")
}
} }
return nil return nil
@ -234,7 +230,7 @@ func (t *cache) SystemTGs(ctx context.Context, systemID int32) ([]*Talkgroup, er
} }
r := make([]*Talkgroup, 0, len(recs)) r := make([]*Talkgroup, 0, len(recs))
return addToRowList(t, r, recs) return addToRowList(t, r, recs), nil
} }
func (t *cache) TG(ctx context.Context, tg ID) (*Talkgroup, error) { func (t *cache) TG(ctx context.Context, tg ID) (*Talkgroup, error) {
@ -256,11 +252,7 @@ func (t *cache) TG(ctx context.Context, tg ID) (*Talkgroup, error) {
return nil, errors.Join(ErrNotFound, err) return nil, errors.Join(ErrNotFound, err)
} }
err = t.add(rowToTalkgroup(record)) t.add(rowToTalkgroup(record))
if err != nil {
log.Error().Err(err).Msg("TG() cache add")
return rowToTalkgroup(record), errors.Join(ErrNotFound, err)
}
return rowToTalkgroup(record), nil return rowToTalkgroup(record), nil
} }
@ -305,3 +297,64 @@ func (t *cache) UpdateTG(ctx context.Context, input database.UpdateTalkgroupPara
return record, nil return record, nil
} }
func (t *cache) UpsertTGs(ctx context.Context, system int, input []database.UpsertTalkgroupParams) ([]*Talkgroup, error) {
db := database.FromCtx(ctx)
sysName, hasSys := t.SystemName(ctx, system)
if !hasSys {
return nil, ErrNoSuchSystem
}
sys := database.System{
ID: system,
Name: sysName,
}
tgs := make([]*Talkgroup, 0, len(input))
err := db.InTx(ctx, func(db database.Store) error {
for i := range input {
// normalize tags
for j, tag := range input[i].Tags {
input[i].Tags[j] = strings.ToLower(tag)
}
input[i].SystemID = int32(system)
input[i].Learned = common.PtrTo(false)
}
var oerr error
batch := db.UpsertTalkgroup(ctx, input)
defer batch.Close()
batch.QueryRow(func(_ int, r database.Talkgroup, err error) {
if err != nil {
oerr = err
return
}
tgs = append(tgs, &Talkgroup{
Talkgroup: r,
System: sys,
Learned: r.Learned,
})
})
if oerr != nil {
return oerr
}
return nil
}, pgx.TxOptions{})
if err != nil {
return nil, err
}
// update the cache
for _, tg := range tgs {
t.add(tg)
}
return tgs, nil
}

View file

@ -2,6 +2,7 @@ package talkgroups
import ( import (
"fmt" "fmt"
"strconv"
"dynatron.me/x/stillbox/pkg/database" "dynatron.me/x/stillbox/pkg/database"
) )
@ -12,13 +13,20 @@ type Talkgroup struct {
Learned bool `json:"learned"` Learned bool `json:"learned"`
} }
type Metadata map[string]interface{} func (t Talkgroup) String() string {
if t.System.Name == "" {
t.System.Name = strconv.Itoa(int(t.Talkgroup.TGID))
}
type Names struct { if t.Talkgroup.Name != nil || t.Talkgroup.TGGroup != nil || t.Talkgroup.AlphaTag != nil {
System string return t.System.Name + " " + t.Talkgroup.String()
Talkgroup string }
return fmt.Sprintf("%s:%d", t.System.Name, int(t.Talkgroup.TGID))
} }
type Metadata map[string]interface{}
type ID struct { type ID struct {
System uint32 `json:"sys"` System uint32 `json:"sys"`
Talkgroup uint32 `json:"tg"` Talkgroup uint32 `json:"tg"`

View file

@ -86,10 +86,42 @@ SET
tags = COALESCE(sqlc.narg('tags'), tags), tags = COALESCE(sqlc.narg('tags'), tags),
alert = COALESCE(sqlc.narg('alert'), alert), alert = COALESCE(sqlc.narg('alert'), alert),
alert_config = COALESCE(sqlc.narg('alert_config'), alert_config), alert_config = COALESCE(sqlc.narg('alert_config'), alert_config),
weight = COALESCE(sqlc.narg('weight'), weight) weight = COALESCE(sqlc.narg('weight'), weight),
learned = COALESCE(sqlc.narg('learned'), learned)
WHERE id = sqlc.narg('id') OR (system_id = sqlc.narg('system_id') AND tgid = sqlc.narg('tgid')) WHERE id = sqlc.narg('id') OR (system_id = sqlc.narg('system_id') AND tgid = sqlc.narg('tgid'))
RETURNING *; RETURNING *;
-- name: UpsertTalkgroup :batchone
INSERT INTO talkgroups AS tg (
system_id, tgid, name, alpha_tag, tg_group, frequency, metadata, tags, alert, alert_config, weight, learned
) VALUES (
@system_id,
@tgid,
sqlc.narg('name'),
sqlc.narg('alpha_tag'),
sqlc.narg('tg_group'),
sqlc.narg('frequency'),
sqlc.narg('metadata'),
sqlc.narg('tags'),
sqlc.narg('alert'),
sqlc.narg('alert_config'),
sqlc.narg('weight'),
sqlc.narg('learned')
)
ON CONFLICT (system_id, tgid) DO UPDATE
SET
name = COALESCE(sqlc.narg('name'), tg.name),
alpha_tag = COALESCE(sqlc.narg('alpha_tag'), tg.alpha_tag),
tg_group = COALESCE(sqlc.narg('tg_group'), tg.tg_group),
frequency = COALESCE(sqlc.narg('frequency'), tg.frequency),
metadata = COALESCE(sqlc.narg('metadata'), tg.metadata),
tags = COALESCE(sqlc.narg('tags'), tg.tags),
alert = COALESCE(sqlc.narg('alert'), tg.alert),
alert_config = COALESCE(sqlc.narg('alert_config'), tg.alert_config),
weight = COALESCE(sqlc.narg('weight'), tg.weight),
learned = COALESCE(sqlc.narg('learned'), tg.learned)
RETURNING *;
-- name: AddTalkgroupWithLearnedFlag :exec -- name: AddTalkgroupWithLearnedFlag :exec
INSERT INTO talkgroups ( INSERT INTO talkgroups (
system_id, system_id,
@ -98,7 +130,7 @@ INSERT INTO talkgroups (
) VALUES( ) VALUES(
@system_id, @system_id,
@tgid, @tgid,
't' TRUE
); );
-- name: AddLearnedTalkgroup :one -- name: AddLearnedTalkgroup :one