Talkgroup bulk upsert call, name improvements

This commit is contained in:
Daniel 2024-11-20 09:37:57 -05:00
parent 89446b8a58
commit da73227c79
12 changed files with 207 additions and 134 deletions

View file

@ -3,7 +3,6 @@ package alert
import ( import (
"context" "context"
"fmt" "fmt"
"strconv"
"time" "time"
"dynatron.me/x/stillbox/internal/trending" "dynatron.me/x/stillbox/internal/trending"
@ -56,15 +55,7 @@ func Make(ctx context.Context, store talkgroups.Store, score trending.Score[talk
switch err { switch err {
case nil: case nil:
d.Weight = tgRecord.Talkgroup.Weight d.Weight = tgRecord.Talkgroup.Weight
if tgRecord.System.Name == "" { d.TGName = tgRecord.String()
tgRecord.System.Name = strconv.Itoa(int(score.ID.System))
}
if tgRecord.Talkgroup.Name != nil {
d.TGName = fmt.Sprintf("%s %s [%d]", tgRecord.System.Name, *tgRecord.Talkgroup.Name, score.ID.Talkgroup)
} else {
d.TGName = fmt.Sprintf("%s:%d", tgRecord.System.Name, int(score.ID.Talkgroup))
}
default: default:
system, has := store.SystemName(ctx, int(score.ID.System)) system, has := store.SystemName(ctx, int(score.ID.System))
if has { if has {

View file

@ -86,7 +86,7 @@
{{ $tg := (index $.TGs .ID) }} {{ $tg := (index $.TGs .ID) }}
<tr> <tr>
<td>{{ $tg.System.Name}}</td> <td>{{ $tg.System.Name}}</td>
<td>{{ $tg.Talkgroup.Name}}</td> <td>{{ $tg.Talkgroup }}</td>
<td>{{ .ID.Talkgroup }}</td> <td>{{ .ID.Talkgroup }}</td>
<td>{{ f .Count 0 }}</td> <td>{{ f .Count 0 }}</td>
<td>{{ f .RecentCount 0 }}</td> <td>{{ f .RecentCount 0 }}</td>

132
pkg/database/batch.go Normal file
View file

@ -0,0 +1,132 @@
// Code generated by sqlc. DO NOT EDIT.
// versions:
// sqlc v1.27.0
// source: batch.go
package database
import (
"context"
"errors"
"dynatron.me/x/stillbox/internal/jsontypes"
"dynatron.me/x/stillbox/pkg/alerting/rules"
"github.com/jackc/pgx/v5"
)
var (
ErrBatchAlreadyClosed = errors.New("batch already closed")
)
const upsertTalkgroup = `-- name: UpsertTalkgroup :batchone
INSERT INTO talkgroups AS tg (
system_id, tgid, name, alpha_tag, tg_group, frequency, metadata, tags, alert, alert_config, weight, learned
) VALUES (
$1,
$2,
$3,
$4,
$5,
$6,
$7,
$8,
$9,
$10,
$11,
$12
)
ON CONFLICT (system_id, tgid) DO UPDATE
SET
name = COALESCE($3, tg.name),
alpha_tag = COALESCE($4, tg.alpha_tag),
tg_group = COALESCE($5, tg.tg_group),
frequency = COALESCE($6, tg.frequency),
metadata = COALESCE($7, tg.metadata),
tags = COALESCE($8, tg.tags),
alert = COALESCE($9, tg.alert),
alert_config = COALESCE($10, tg.alert_config),
weight = COALESCE($11, tg.weight),
learned = COALESCE($12, tg.learned)
RETURNING id, system_id, tgid, name, alpha_tag, tg_group, frequency, metadata, tags, alert, alert_config, weight, learned
`
type UpsertTalkgroupBatchResults struct {
br pgx.BatchResults
tot int
closed bool
}
type UpsertTalkgroupParams struct {
SystemID int32 `json:"system_id"`
TGID int32 `json:"tgid"`
Name *string `json:"name"`
AlphaTag *string `json:"alpha_tag"`
TGGroup *string `json:"tg_group"`
Frequency *int32 `json:"frequency"`
Metadata jsontypes.Metadata `json:"metadata"`
Tags []string `json:"tags"`
Alert *bool `json:"alert"`
AlertConfig rules.AlertRules `json:"alert_config"`
Weight *float32 `json:"weight"`
Learned *bool `json:"learned"`
}
func (q *Queries) UpsertTalkgroup(ctx context.Context, arg []UpsertTalkgroupParams) *UpsertTalkgroupBatchResults {
batch := &pgx.Batch{}
for _, a := range arg {
vals := []interface{}{
a.SystemID,
a.TGID,
a.Name,
a.AlphaTag,
a.TGGroup,
a.Frequency,
a.Metadata,
a.Tags,
a.Alert,
a.AlertConfig,
a.Weight,
a.Learned,
}
batch.Queue(upsertTalkgroup, vals...)
}
br := q.db.SendBatch(ctx, batch)
return &UpsertTalkgroupBatchResults{br, len(arg), false}
}
func (b *UpsertTalkgroupBatchResults) QueryRow(f func(int, Talkgroup, error)) {
defer b.br.Close()
for t := 0; t < b.tot; t++ {
var i Talkgroup
if b.closed {
if f != nil {
f(t, i, ErrBatchAlreadyClosed)
}
continue
}
row := b.br.QueryRow()
err := row.Scan(
&i.ID,
&i.SystemID,
&i.TGID,
&i.Name,
&i.AlphaTag,
&i.TGGroup,
&i.Frequency,
&i.Metadata,
&i.Tags,
&i.Alert,
&i.AlertConfig,
&i.Weight,
&i.Learned,
)
if f != nil {
f(t, i, err)
}
}
}
func (b *UpsertTalkgroupBatchResults) Close() error {
b.closed = true
return b.br.Close()
}

View file

@ -15,6 +15,7 @@ type DBTX interface {
Exec(context.Context, string, ...interface{}) (pgconn.CommandTag, error) Exec(context.Context, string, ...interface{}) (pgconn.CommandTag, error)
Query(context.Context, string, ...interface{}) (pgx.Rows, error) Query(context.Context, string, ...interface{}) (pgx.Rows, error)
QueryRow(context.Context, string, ...interface{}) pgx.Row QueryRow(context.Context, string, ...interface{}) pgx.Row
SendBatch(context.Context, *pgx.Batch) pgx.BatchResults
} }
func New(db DBTX) *Queries { func New(db DBTX) *Queries {

View file

@ -1,5 +1,9 @@
package database package database
import (
"strconv"
)
func (d GetTalkgroupsRow) GetTalkgroup() Talkgroup { return d.Talkgroup } func (d GetTalkgroupsRow) GetTalkgroup() Talkgroup { return d.Talkgroup }
func (d GetTalkgroupsRow) GetSystem() System { return d.System } func (d GetTalkgroupsRow) GetSystem() System { return d.System }
func (d GetTalkgroupsRow) GetLearned() bool { return d.Talkgroup.Learned } func (d GetTalkgroupsRow) GetLearned() bool { return d.Talkgroup.Learned }
@ -15,3 +19,18 @@ func (g GetTalkgroupsWithLearnedBySystemRow) GetLearned() bool { return g
func (g Talkgroup) GetTalkgroup() Talkgroup { return g } func (g Talkgroup) GetTalkgroup() Talkgroup { return g }
func (g Talkgroup) GetSystem() System { return System{ID: int(g.SystemID)} } func (g Talkgroup) GetSystem() System { return System{ID: int(g.SystemID)} }
func (g Talkgroup) GetLearned() bool { return false } func (g Talkgroup) GetLearned() bool { return false }
func (g Talkgroup) String() string {
switch {
case g.AlphaTag != nil:
return *g.AlphaTag
case g.Name != nil && g.TGGroup != nil:
return *g.TGGroup + " " + *g.Name
case g.Name != nil:
return *g.Name + " [" + strconv.Itoa(int(g.TGID)) + "]"
case g.TGGroup != nil:
return *g.TGGroup + " [" + strconv.Itoa(int(g.TGID)) + "]"
}
return strconv.Itoa(int(g.TGID))
}

View file

@ -1772,31 +1772,23 @@ func (_c *Store_UpdateTalkgroup_Call) RunAndReturn(run func(context.Context, dat
} }
// UpsertTalkgroup provides a mock function with given fields: ctx, arg // UpsertTalkgroup provides a mock function with given fields: ctx, arg
func (_m *Store) UpsertTalkgroup(ctx context.Context, arg database.UpsertTalkgroupParams) (database.Talkgroup, error) { func (_m *Store) UpsertTalkgroup(ctx context.Context, arg []database.UpsertTalkgroupParams) *database.UpsertTalkgroupBatchResults {
ret := _m.Called(ctx, arg) ret := _m.Called(ctx, arg)
if len(ret) == 0 { if len(ret) == 0 {
panic("no return value specified for UpsertTalkgroup") panic("no return value specified for UpsertTalkgroup")
} }
var r0 database.Talkgroup var r0 *database.UpsertTalkgroupBatchResults
var r1 error if rf, ok := ret.Get(0).(func(context.Context, []database.UpsertTalkgroupParams) *database.UpsertTalkgroupBatchResults); ok {
if rf, ok := ret.Get(0).(func(context.Context, database.UpsertTalkgroupParams) (database.Talkgroup, error)); ok {
return rf(ctx, arg)
}
if rf, ok := ret.Get(0).(func(context.Context, database.UpsertTalkgroupParams) database.Talkgroup); ok {
r0 = rf(ctx, arg) r0 = rf(ctx, arg)
} else { } else {
r0 = ret.Get(0).(database.Talkgroup) if ret.Get(0) != nil {
r0 = ret.Get(0).(*database.UpsertTalkgroupBatchResults)
}
} }
if rf, ok := ret.Get(1).(func(context.Context, database.UpsertTalkgroupParams) error); ok { return r0
r1 = rf(ctx, arg)
} else {
r1 = ret.Error(1)
}
return r0, r1
} }
// Store_UpsertTalkgroup_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpsertTalkgroup' // Store_UpsertTalkgroup_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpsertTalkgroup'
@ -1806,24 +1798,24 @@ type Store_UpsertTalkgroup_Call struct {
// UpsertTalkgroup is a helper method to define mock.On call // UpsertTalkgroup is a helper method to define mock.On call
// - ctx context.Context // - ctx context.Context
// - arg database.UpsertTalkgroupParams // - arg []database.UpsertTalkgroupParams
func (_e *Store_Expecter) UpsertTalkgroup(ctx interface{}, arg interface{}) *Store_UpsertTalkgroup_Call { func (_e *Store_Expecter) UpsertTalkgroup(ctx interface{}, arg interface{}) *Store_UpsertTalkgroup_Call {
return &Store_UpsertTalkgroup_Call{Call: _e.mock.On("UpsertTalkgroup", ctx, arg)} return &Store_UpsertTalkgroup_Call{Call: _e.mock.On("UpsertTalkgroup", ctx, arg)}
} }
func (_c *Store_UpsertTalkgroup_Call) Run(run func(ctx context.Context, arg database.UpsertTalkgroupParams)) *Store_UpsertTalkgroup_Call { func (_c *Store_UpsertTalkgroup_Call) Run(run func(ctx context.Context, arg []database.UpsertTalkgroupParams)) *Store_UpsertTalkgroup_Call {
_c.Call.Run(func(args mock.Arguments) { _c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(database.UpsertTalkgroupParams)) run(args[0].(context.Context), args[1].([]database.UpsertTalkgroupParams))
}) })
return _c return _c
} }
func (_c *Store_UpsertTalkgroup_Call) Return(_a0 database.Talkgroup, _a1 error) *Store_UpsertTalkgroup_Call { func (_c *Store_UpsertTalkgroup_Call) Return(_a0 *database.UpsertTalkgroupBatchResults) *Store_UpsertTalkgroup_Call {
_c.Call.Return(_a0, _a1) _c.Call.Return(_a0)
return _c return _c
} }
func (_c *Store_UpsertTalkgroup_Call) RunAndReturn(run func(context.Context, database.UpsertTalkgroupParams) (database.Talkgroup, error)) *Store_UpsertTalkgroup_Call { func (_c *Store_UpsertTalkgroup_Call) RunAndReturn(run func(context.Context, []database.UpsertTalkgroupParams) *database.UpsertTalkgroupBatchResults) *Store_UpsertTalkgroup_Call {
_c.Call.Return(run) _c.Call.Return(run)
return _c return _c
} }

View file

@ -39,7 +39,7 @@ type Querier interface {
SetTalkgroupTags(ctx context.Context, tags []string, systemID int32, tGID int32) error SetTalkgroupTags(ctx context.Context, tags []string, systemID int32, tGID int32) error
UpdatePassword(ctx context.Context, username string, password string) error UpdatePassword(ctx context.Context, username string, password string) error
UpdateTalkgroup(ctx context.Context, arg UpdateTalkgroupParams) (Talkgroup, error) UpdateTalkgroup(ctx context.Context, arg UpdateTalkgroupParams) (Talkgroup, error)
UpsertTalkgroup(ctx context.Context, arg UpsertTalkgroupParams) (Talkgroup, error) UpsertTalkgroup(ctx context.Context, arg []UpsertTalkgroupParams) *UpsertTalkgroupBatchResults
} }
var _ Querier = (*Queries)(nil) var _ Querier = (*Queries)(nil)

View file

@ -475,84 +475,3 @@ func (q *Queries) UpdateTalkgroup(ctx context.Context, arg UpdateTalkgroupParams
) )
return i, err return i, err
} }
const upsertTalkgroup = `-- name: UpsertTalkgroup :one
INSERT INTO talkgroups AS tg (
system_id, tgid, name, alpha_tag, tg_group, frequency, metadata, tags, alert, alert_config, weight, learned
) VALUES (
$1,
$2,
$3,
$4,
$5,
$6,
$7,
$8,
$9,
$10,
$11,
$12
)
ON CONFLICT (system_id, tgid) DO UPDATE
SET
name = COALESCE($3, tg.name),
alpha_tag = COALESCE($4, tg.alpha_tag),
tg_group = COALESCE($5, tg.tg_group),
frequency = COALESCE($6, tg.frequency),
metadata = COALESCE($7, tg.metadata),
tags = COALESCE($8, tg.tags),
alert = COALESCE($9, tg.alert),
alert_config = COALESCE($10, tg.alert_config),
weight = COALESCE($11, tg.weight),
learned = COALESCE($12, tg.learned)
RETURNING id, system_id, tgid, name, alpha_tag, tg_group, frequency, metadata, tags, alert, alert_config, weight, learned
`
type UpsertTalkgroupParams struct {
SystemID int32 `json:"system_id"`
TGID int32 `json:"tg_id"`
Name *string `json:"name"`
AlphaTag *string `json:"alpha_tag"`
TGGroup *string `json:"tg_group"`
Frequency *int32 `json:"frequency"`
Metadata jsontypes.Metadata `json:"metadata"`
Tags []string `json:"tags"`
Alert *bool `json:"alert"`
AlertConfig rules.AlertRules `json:"alert_config"`
Weight *float32 `json:"weight"`
Learned *bool `json:"learned"`
}
func (q *Queries) UpsertTalkgroup(ctx context.Context, arg UpsertTalkgroupParams) (Talkgroup, error) {
row := q.db.QueryRow(ctx, upsertTalkgroup,
arg.SystemID,
arg.TGID,
arg.Name,
arg.AlphaTag,
arg.TGGroup,
arg.Frequency,
arg.Metadata,
arg.Tags,
arg.Alert,
arg.AlertConfig,
arg.Weight,
arg.Learned,
)
var i Talkgroup
err := row.Scan(
&i.ID,
&i.SystemID,
&i.TGID,
&i.Name,
&i.AlphaTag,
&i.TGGroup,
&i.Frequency,
&i.Metadata,
&i.Tags,
&i.Alert,
&i.AlertConfig,
&i.Weight,
&i.Learned,
)
return i, err
}

View file

@ -19,7 +19,7 @@ func (tga *talkgroupAPI) Subrouter() http.Handler {
r.Get(`/{system:\d+}/{id:\d+}`, tga.get) r.Get(`/{system:\d+}/{id:\d+}`, tga.get)
r.Put(`/{system:\d+}/{id:\d+}`, tga.put) r.Put(`/{system:\d+}/{id:\d+}`, tga.put)
r.Put(`/{system:\d+}`, tga.putTalkgroups); r.Put(`/{system:\d+}`, tga.putTalkgroups)
r.Get(`/{system:\d+}/`, tga.get) r.Get(`/{system:\d+}/`, tga.get)
r.Get("/", tga.get) r.Get("/", tga.get)
r.Post("/import", tga.tgImport) r.Post("/import", tga.tgImport)
@ -51,7 +51,6 @@ func (t tgParams) ToID() talkgroups.ID {
} }
} }
func (tga *talkgroupAPI) get(w http.ResponseWriter, r *http.Request) { func (tga *talkgroupAPI) get(w http.ResponseWriter, r *http.Request) {
ctx := r.Context() ctx := r.Context()
tgs := talkgroups.StoreFrom(ctx) tgs := talkgroups.StoreFrom(ctx)

View file

@ -318,31 +318,43 @@ func (t *cache) UpsertTGs(ctx context.Context, system int, input []database.Upse
return nil, ErrNoSuchSystem return nil, ErrNoSuchSystem
} }
sys := database.System{ sys := database.System{
ID: system, ID: system,
Name: sysName, Name: sysName,
} }
tgs := make([]*Talkgroup, 0, len(input)) tgs := make([]*Talkgroup, 0, len(input))
err := db.InTx(ctx, func(db database.Store) error { err := db.InTx(ctx, func(db database.Store) error {
for _, tgu := range input { for i := range input {
// normalize tags // normalize tags
for i, tag := range tgu.Tags { for j, tag := range input[i].Tags {
tgu.Tags[i] = strings.ToLower(tag) input[i].Tags[j] = strings.ToLower(tag)
} }
tgu.SystemID = int32(system) input[i].SystemID = int32(system)
tgu.Learned = common.PtrTo(false) input[i].Learned = common.PtrTo(false)
tg, err := db.UpsertTalkgroup(ctx, tgu)
}
var oerr error
batch := db.UpsertTalkgroup(ctx, input)
defer batch.Close()
batch.QueryRow(func(_ int, r database.Talkgroup, err error) {
if err != nil { if err != nil {
return err oerr = err
return
} }
tgs = append(tgs, &Talkgroup{ tgs = append(tgs, &Talkgroup{
Talkgroup: tg, Talkgroup: r,
System: sys, System: sys,
Learned: tg.Learned, Learned: r.Learned,
}) })
})
if oerr != nil {
return oerr
} }
return nil return nil

View file

@ -2,6 +2,7 @@ package talkgroups
import ( import (
"fmt" "fmt"
"strconv"
"dynatron.me/x/stillbox/pkg/database" "dynatron.me/x/stillbox/pkg/database"
) )
@ -12,13 +13,20 @@ type Talkgroup struct {
Learned bool `json:"learned"` Learned bool `json:"learned"`
} }
type Metadata map[string]interface{} func (t Talkgroup) String() string {
if t.System.Name == "" {
t.System.Name = strconv.Itoa(int(t.Talkgroup.TGID))
}
type Names struct { if t.Talkgroup.Name != nil || t.Talkgroup.TGGroup != nil || t.Talkgroup.AlphaTag != nil {
System string return t.System.Name + " " + t.Talkgroup.String()
Talkgroup string }
return fmt.Sprintf("%s:%d", t.System.Name, int(t.Talkgroup.TGID))
} }
type Metadata map[string]interface{}
type ID struct { type ID struct {
System uint32 `json:"sys"` System uint32 `json:"sys"`
Talkgroup uint32 `json:"tg"` Talkgroup uint32 `json:"tg"`

View file

@ -91,12 +91,12 @@ SET
WHERE id = sqlc.narg('id') OR (system_id = sqlc.narg('system_id') AND tgid = sqlc.narg('tgid')) WHERE id = sqlc.narg('id') OR (system_id = sqlc.narg('system_id') AND tgid = sqlc.narg('tgid'))
RETURNING *; RETURNING *;
-- name: UpsertTalkgroup :one -- name: UpsertTalkgroup :batchone
INSERT INTO talkgroups AS tg ( INSERT INTO talkgroups AS tg (
system_id, tgid, name, alpha_tag, tg_group, frequency, metadata, tags, alert, alert_config, weight, learned system_id, tgid, name, alpha_tag, tg_group, frequency, metadata, tags, alert, alert_config, weight, learned
) VALUES ( ) VALUES (
@system_id, @system_id,
@tg_id, @tgid,
sqlc.narg('name'), sqlc.narg('name'),
sqlc.narg('alpha_tag'), sqlc.narg('alpha_tag'),
sqlc.narg('tg_group'), sqlc.narg('tg_group'),