Compare commits

...

3 commits

Author SHA1 Message Date
da73227c79 Talkgroup bulk upsert call, name improvements 2024-11-20 09:37:57 -05:00
89446b8a58 fix test 2024-11-20 07:48:03 -05:00
f909723f7d wip pre-batch 2024-11-20 07:26:59 -05:00
15 changed files with 366 additions and 26 deletions

View file

@ -3,7 +3,6 @@ package alert
import (
"context"
"fmt"
"strconv"
"time"
"dynatron.me/x/stillbox/internal/trending"
@ -56,15 +55,7 @@ func Make(ctx context.Context, store talkgroups.Store, score trending.Score[talk
switch err {
case nil:
d.Weight = tgRecord.Talkgroup.Weight
if tgRecord.System.Name == "" {
tgRecord.System.Name = strconv.Itoa(int(score.ID.System))
}
if tgRecord.Talkgroup.Name != nil {
d.TGName = fmt.Sprintf("%s %s [%d]", tgRecord.System.Name, *tgRecord.Talkgroup.Name, score.ID.Talkgroup)
} else {
d.TGName = fmt.Sprintf("%s:%d", tgRecord.System.Name, int(score.ID.Talkgroup))
}
d.TGName = tgRecord.String()
default:
system, has := store.SystemName(ctx, int(score.ID.System))
if has {

View file

@ -86,7 +86,7 @@
{{ $tg := (index $.TGs .ID) }}
<tr>
<td>{{ $tg.System.Name}}</td>
<td>{{ $tg.Talkgroup.Name}}</td>
<td>{{ $tg.Talkgroup }}</td>
<td>{{ .ID.Talkgroup }}</td>
<td>{{ f .Count 0 }}</td>
<td>{{ f .RecentCount 0 }}</td>

132
pkg/database/batch.go Normal file
View file

@ -0,0 +1,132 @@
// Code generated by sqlc. DO NOT EDIT.
// versions:
// sqlc v1.27.0
// source: batch.go
package database
import (
"context"
"errors"
"dynatron.me/x/stillbox/internal/jsontypes"
"dynatron.me/x/stillbox/pkg/alerting/rules"
"github.com/jackc/pgx/v5"
)
var (
ErrBatchAlreadyClosed = errors.New("batch already closed")
)
const upsertTalkgroup = `-- name: UpsertTalkgroup :batchone
INSERT INTO talkgroups AS tg (
system_id, tgid, name, alpha_tag, tg_group, frequency, metadata, tags, alert, alert_config, weight, learned
) VALUES (
$1,
$2,
$3,
$4,
$5,
$6,
$7,
$8,
$9,
$10,
$11,
$12
)
ON CONFLICT (system_id, tgid) DO UPDATE
SET
name = COALESCE($3, tg.name),
alpha_tag = COALESCE($4, tg.alpha_tag),
tg_group = COALESCE($5, tg.tg_group),
frequency = COALESCE($6, tg.frequency),
metadata = COALESCE($7, tg.metadata),
tags = COALESCE($8, tg.tags),
alert = COALESCE($9, tg.alert),
alert_config = COALESCE($10, tg.alert_config),
weight = COALESCE($11, tg.weight),
learned = COALESCE($12, tg.learned)
RETURNING id, system_id, tgid, name, alpha_tag, tg_group, frequency, metadata, tags, alert, alert_config, weight, learned
`
type UpsertTalkgroupBatchResults struct {
br pgx.BatchResults
tot int
closed bool
}
type UpsertTalkgroupParams struct {
SystemID int32 `json:"system_id"`
TGID int32 `json:"tgid"`
Name *string `json:"name"`
AlphaTag *string `json:"alpha_tag"`
TGGroup *string `json:"tg_group"`
Frequency *int32 `json:"frequency"`
Metadata jsontypes.Metadata `json:"metadata"`
Tags []string `json:"tags"`
Alert *bool `json:"alert"`
AlertConfig rules.AlertRules `json:"alert_config"`
Weight *float32 `json:"weight"`
Learned *bool `json:"learned"`
}
func (q *Queries) UpsertTalkgroup(ctx context.Context, arg []UpsertTalkgroupParams) *UpsertTalkgroupBatchResults {
batch := &pgx.Batch{}
for _, a := range arg {
vals := []interface{}{
a.SystemID,
a.TGID,
a.Name,
a.AlphaTag,
a.TGGroup,
a.Frequency,
a.Metadata,
a.Tags,
a.Alert,
a.AlertConfig,
a.Weight,
a.Learned,
}
batch.Queue(upsertTalkgroup, vals...)
}
br := q.db.SendBatch(ctx, batch)
return &UpsertTalkgroupBatchResults{br, len(arg), false}
}
func (b *UpsertTalkgroupBatchResults) QueryRow(f func(int, Talkgroup, error)) {
defer b.br.Close()
for t := 0; t < b.tot; t++ {
var i Talkgroup
if b.closed {
if f != nil {
f(t, i, ErrBatchAlreadyClosed)
}
continue
}
row := b.br.QueryRow()
err := row.Scan(
&i.ID,
&i.SystemID,
&i.TGID,
&i.Name,
&i.AlphaTag,
&i.TGGroup,
&i.Frequency,
&i.Metadata,
&i.Tags,
&i.Alert,
&i.AlertConfig,
&i.Weight,
&i.Learned,
)
if f != nil {
f(t, i, err)
}
}
}
func (b *UpsertTalkgroupBatchResults) Close() error {
b.closed = true
return b.br.Close()
}

View file

@ -15,6 +15,7 @@ type DBTX interface {
Exec(context.Context, string, ...interface{}) (pgconn.CommandTag, error)
Query(context.Context, string, ...interface{}) (pgx.Rows, error)
QueryRow(context.Context, string, ...interface{}) pgx.Row
SendBatch(context.Context, *pgx.Batch) pgx.BatchResults
}
func New(db DBTX) *Queries {

View file

@ -1,5 +1,9 @@
package database
import (
"strconv"
)
func (d GetTalkgroupsRow) GetTalkgroup() Talkgroup { return d.Talkgroup }
func (d GetTalkgroupsRow) GetSystem() System { return d.System }
func (d GetTalkgroupsRow) GetLearned() bool { return d.Talkgroup.Learned }
@ -15,3 +19,18 @@ func (g GetTalkgroupsWithLearnedBySystemRow) GetLearned() bool { return g
func (g Talkgroup) GetTalkgroup() Talkgroup { return g }
func (g Talkgroup) GetSystem() System { return System{ID: int(g.SystemID)} }
func (g Talkgroup) GetLearned() bool { return false }
func (g Talkgroup) String() string {
switch {
case g.AlphaTag != nil:
return *g.AlphaTag
case g.Name != nil && g.TGGroup != nil:
return *g.TGGroup + " " + *g.Name
case g.Name != nil:
return *g.Name + " [" + strconv.Itoa(int(g.TGID)) + "]"
case g.TGGroup != nil:
return *g.TGGroup + " [" + strconv.Itoa(int(g.TGID)) + "]"
}
return strconv.Itoa(int(g.TGID))
}

View file

@ -1771,6 +1771,55 @@ func (_c *Store_UpdateTalkgroup_Call) RunAndReturn(run func(context.Context, dat
return _c
}
// UpsertTalkgroup provides a mock function with given fields: ctx, arg
func (_m *Store) UpsertTalkgroup(ctx context.Context, arg []database.UpsertTalkgroupParams) *database.UpsertTalkgroupBatchResults {
ret := _m.Called(ctx, arg)
if len(ret) == 0 {
panic("no return value specified for UpsertTalkgroup")
}
var r0 *database.UpsertTalkgroupBatchResults
if rf, ok := ret.Get(0).(func(context.Context, []database.UpsertTalkgroupParams) *database.UpsertTalkgroupBatchResults); ok {
r0 = rf(ctx, arg)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*database.UpsertTalkgroupBatchResults)
}
}
return r0
}
// Store_UpsertTalkgroup_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpsertTalkgroup'
type Store_UpsertTalkgroup_Call struct {
*mock.Call
}
// UpsertTalkgroup is a helper method to define mock.On call
// - ctx context.Context
// - arg []database.UpsertTalkgroupParams
func (_e *Store_Expecter) UpsertTalkgroup(ctx interface{}, arg interface{}) *Store_UpsertTalkgroup_Call {
return &Store_UpsertTalkgroup_Call{Call: _e.mock.On("UpsertTalkgroup", ctx, arg)}
}
func (_c *Store_UpsertTalkgroup_Call) Run(run func(ctx context.Context, arg []database.UpsertTalkgroupParams)) *Store_UpsertTalkgroup_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].([]database.UpsertTalkgroupParams))
})
return _c
}
func (_c *Store_UpsertTalkgroup_Call) Return(_a0 *database.UpsertTalkgroupBatchResults) *Store_UpsertTalkgroup_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *Store_UpsertTalkgroup_Call) RunAndReturn(run func(context.Context, []database.UpsertTalkgroupParams) *database.UpsertTalkgroupBatchResults) *Store_UpsertTalkgroup_Call {
_c.Call.Return(run)
return _c
}
// NewStore creates a new instance of Store. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewStore(t interface {

View file

@ -39,6 +39,7 @@ type Querier interface {
SetTalkgroupTags(ctx context.Context, tags []string, systemID int32, tGID int32) error
UpdatePassword(ctx context.Context, username string, password string) error
UpdateTalkgroup(ctx context.Context, arg UpdateTalkgroupParams) (Talkgroup, error)
UpsertTalkgroup(ctx context.Context, arg []UpsertTalkgroupParams) *UpsertTalkgroupBatchResults
}
var _ Querier = (*Queries)(nil)

View file

@ -57,7 +57,7 @@ INSERT INTO talkgroups (
) VALUES(
$1,
$2,
't'
TRUE
)
`
@ -419,8 +419,9 @@ SET
tags = COALESCE($6, tags),
alert = COALESCE($7, alert),
alert_config = COALESCE($8, alert_config),
weight = COALESCE($9, weight)
WHERE id = $10 OR (system_id = $11 AND tgid = $12)
weight = COALESCE($9, weight),
learned = COALESCE($10, learned)
WHERE id = $11 OR (system_id = $12 AND tgid = $13)
RETURNING id, system_id, tgid, name, alpha_tag, tg_group, frequency, metadata, tags, alert, alert_config, weight, learned
`
@ -434,6 +435,7 @@ type UpdateTalkgroupParams struct {
Alert *bool `json:"alert"`
AlertConfig rules.AlertRules `json:"alert_config"`
Weight *float32 `json:"weight"`
Learned *bool `json:"learned"`
ID *int32 `json:"id"`
SystemID *int32 `json:"system_id"`
TGID *int32 `json:"tgid"`
@ -450,6 +452,7 @@ func (q *Queries) UpdateTalkgroup(ctx context.Context, arg UpdateTalkgroupParams
arg.Alert,
arg.AlertConfig,
arg.Weight,
arg.Learned,
arg.ID,
arg.SystemID,
arg.TGID,

View file

@ -42,7 +42,6 @@ type errResponse struct {
func (e *errResponse) Render(w http.ResponseWriter, r *http.Request) error {
switch e.Code {
case http.StatusNotFound:
default:
log.Error().Str("path", r.URL.Path).Err(e.Err).Int("code", e.Code).Str("msg", e.Error).Msg("request failed")
}
@ -71,7 +70,7 @@ func recordNotFound(err error) render.Renderer {
func internalError(err error) render.Renderer {
return &errResponse{
Err: err,
Code: http.StatusNotFound,
Code: http.StatusInternalServerError,
Error: "Internal server error",
}
}

View file

@ -17,9 +17,10 @@ type talkgroupAPI struct {
func (tga *talkgroupAPI) Subrouter() http.Handler {
r := chi.NewMux()
r.Get("/{system:\\d+}/{id:\\d+}", tga.get)
r.Put("/{system:\\d+}/{id:\\d+}", tga.put)
r.Get("/{system:\\d+}/", tga.get)
r.Get(`/{system:\d+}/{id:\d+}`, tga.get)
r.Put(`/{system:\d+}/{id:\d+}`, tga.put)
r.Put(`/{system:\d+}`, tga.putTalkgroups)
r.Get(`/{system:\d+}/`, tga.get)
r.Get("/", tga.get)
r.Post("/import", tga.tgImport)
@ -99,6 +100,8 @@ func (tga *talkgroupAPI) put(w http.ResponseWriter, r *http.Request) {
return
}
input.Learned = nil // ignore for this call
record, err := tgs.UpdateTG(ctx, input)
if err != nil {
wErr(w, r, autoError(err))
@ -123,3 +126,36 @@ func (tga *talkgroupAPI) tgImport(w http.ResponseWriter, r *http.Request) {
respond(w, r, recs)
}
func (tga *talkgroupAPI) putTalkgroups(w http.ResponseWriter, r *http.Request) {
var id tgParams
err := decodeParams(&id, r)
if err != nil {
wErr(w, r, badRequest(err))
return
}
if id.System == nil { // don't think this would ever happen
wErr(w, r, badRequest(talkgroups.ErrNoSuchSystem))
return
}
ctx := r.Context()
tgs := talkgroups.StoreFrom(ctx)
var input []database.UpsertTalkgroupParams
err = forms.Unmarshal(r, &input, forms.WithTag("json"), forms.WithAcceptBlank(), forms.WithOmitEmpty())
if err != nil {
wErr(w, r, badRequest(err))
return
}
record, err := tgs.UpsertTGs(ctx, *id.System, input)
if err != nil {
wErr(w, r, autoError(err))
return
}
respond(w, r, record)
}

View file

@ -110,6 +110,7 @@ func (rr *radioReferenceImporter) importTalkgroups(ctx context.Context, sys int,
gn := groupName // must take a copy
tgs = append(tgs, talkgroups.Talkgroup{
Talkgroup: database.Talkgroup{
ID: len(tgs), // need unique ID for the UI to track
TGID: int32(tgt.Talkgroup),
SystemID: int32(tgt.System),
Name: &fields[4],

File diff suppressed because one or more lines are too long

View file

@ -3,9 +3,11 @@ package talkgroups
import (
"context"
"errors"
"strings"
"sync"
"time"
"dynatron.me/x/stillbox/internal/common"
"dynatron.me/x/stillbox/pkg/config"
"dynatron.me/x/stillbox/pkg/database"
@ -24,6 +26,9 @@ type Store interface {
// UpdateTG updates a talkgroup record.
UpdateTG(ctx context.Context, input database.UpdateTalkgroupParams) (*Talkgroup, error)
// UpsertTGs upserts a slice of talkgroups.
UpsertTGs(ctx context.Context, system int, input []database.UpsertTalkgroupParams) ([]*Talkgroup, error)
// TG retrieves a Talkgroup from the Store.
TG(ctx context.Context, tg ID) (*Talkgroup, error)
@ -305,3 +310,66 @@ func (t *cache) UpdateTG(ctx context.Context, input database.UpdateTalkgroupPara
return record, nil
}
func (t *cache) UpsertTGs(ctx context.Context, system int, input []database.UpsertTalkgroupParams) ([]*Talkgroup, error) {
db := database.FromCtx(ctx)
sysName, hasSys := t.SystemName(ctx, system)
if !hasSys {
return nil, ErrNoSuchSystem
}
sys := database.System{
ID: system,
Name: sysName,
}
tgs := make([]*Talkgroup, 0, len(input))
err := db.InTx(ctx, func(db database.Store) error {
for i := range input {
// normalize tags
for j, tag := range input[i].Tags {
input[i].Tags[j] = strings.ToLower(tag)
}
input[i].SystemID = int32(system)
input[i].Learned = common.PtrTo(false)
}
var oerr error
batch := db.UpsertTalkgroup(ctx, input)
defer batch.Close()
batch.QueryRow(func(_ int, r database.Talkgroup, err error) {
if err != nil {
oerr = err
return
}
tgs = append(tgs, &Talkgroup{
Talkgroup: r,
System: sys,
Learned: r.Learned,
})
})
if oerr != nil {
return oerr
}
return nil
}, pgx.TxOptions{})
if err != nil {
return nil, err
}
// update the cache
t.Lock()
defer t.Unlock()
for _, tg := range tgs {
t.tgs[TG(tg.SystemID, tg.TGID)] = tg
}
return tgs, nil
}

View file

@ -2,6 +2,7 @@ package talkgroups
import (
"fmt"
"strconv"
"dynatron.me/x/stillbox/pkg/database"
)
@ -12,13 +13,20 @@ type Talkgroup struct {
Learned bool `json:"learned"`
}
type Metadata map[string]interface{}
func (t Talkgroup) String() string {
if t.System.Name == "" {
t.System.Name = strconv.Itoa(int(t.Talkgroup.TGID))
}
type Names struct {
System string
Talkgroup string
if t.Talkgroup.Name != nil || t.Talkgroup.TGGroup != nil || t.Talkgroup.AlphaTag != nil {
return t.System.Name + " " + t.Talkgroup.String()
}
return fmt.Sprintf("%s:%d", t.System.Name, int(t.Talkgroup.TGID))
}
type Metadata map[string]interface{}
type ID struct {
System uint32 `json:"sys"`
Talkgroup uint32 `json:"tg"`

View file

@ -86,10 +86,42 @@ SET
tags = COALESCE(sqlc.narg('tags'), tags),
alert = COALESCE(sqlc.narg('alert'), alert),
alert_config = COALESCE(sqlc.narg('alert_config'), alert_config),
weight = COALESCE(sqlc.narg('weight'), weight)
weight = COALESCE(sqlc.narg('weight'), weight),
learned = COALESCE(sqlc.narg('learned'), learned)
WHERE id = sqlc.narg('id') OR (system_id = sqlc.narg('system_id') AND tgid = sqlc.narg('tgid'))
RETURNING *;
-- name: UpsertTalkgroup :batchone
INSERT INTO talkgroups AS tg (
system_id, tgid, name, alpha_tag, tg_group, frequency, metadata, tags, alert, alert_config, weight, learned
) VALUES (
@system_id,
@tgid,
sqlc.narg('name'),
sqlc.narg('alpha_tag'),
sqlc.narg('tg_group'),
sqlc.narg('frequency'),
sqlc.narg('metadata'),
sqlc.narg('tags'),
sqlc.narg('alert'),
sqlc.narg('alert_config'),
sqlc.narg('weight'),
sqlc.narg('learned')
)
ON CONFLICT (system_id, tgid) DO UPDATE
SET
name = COALESCE(sqlc.narg('name'), tg.name),
alpha_tag = COALESCE(sqlc.narg('alpha_tag'), tg.alpha_tag),
tg_group = COALESCE(sqlc.narg('tg_group'), tg.tg_group),
frequency = COALESCE(sqlc.narg('frequency'), tg.frequency),
metadata = COALESCE(sqlc.narg('metadata'), tg.metadata),
tags = COALESCE(sqlc.narg('tags'), tg.tags),
alert = COALESCE(sqlc.narg('alert'), tg.alert),
alert_config = COALESCE(sqlc.narg('alert_config'), tg.alert_config),
weight = COALESCE(sqlc.narg('weight'), tg.weight),
learned = COALESCE(sqlc.narg('learned'), tg.learned)
RETURNING *;
-- name: AddTalkgroupWithLearnedFlag :exec
INSERT INTO talkgroups (
system_id,
@ -98,7 +130,7 @@ INSERT INTO talkgroups (
) VALUES(
@system_id,
@tgid,
't'
TRUE
);
-- name: AddLearnedTalkgroup :one