From da73227c7957927ef4d0b9f40283df35a2443ee6 Mon Sep 17 00:00:00 2001 From: Daniel Ponte Date: Wed, 20 Nov 2024 09:37:57 -0500 Subject: [PATCH] Talkgroup bulk upsert call, name improvements --- pkg/alerting/alert/alert.go | 11 +-- pkg/alerting/stats.html | 2 +- pkg/database/batch.go | 132 ++++++++++++++++++++++++++++ pkg/database/db.go | 1 + pkg/database/extend.go | 19 ++++ pkg/database/mocks/Store.go | 34 +++---- pkg/database/querier.go | 2 +- pkg/database/talkgroups.sql.go | 81 ----------------- pkg/rest/talkgroups.go | 3 +- pkg/talkgroups/store.go | 36 +++++--- pkg/talkgroups/talkgroup.go | 16 +++- sql/postgres/queries/talkgroups.sql | 4 +- 12 files changed, 207 insertions(+), 134 deletions(-) create mode 100644 pkg/database/batch.go diff --git a/pkg/alerting/alert/alert.go b/pkg/alerting/alert/alert.go index 3889a61..c3b0f60 100644 --- a/pkg/alerting/alert/alert.go +++ b/pkg/alerting/alert/alert.go @@ -3,7 +3,6 @@ package alert import ( "context" "fmt" - "strconv" "time" "dynatron.me/x/stillbox/internal/trending" @@ -56,15 +55,7 @@ func Make(ctx context.Context, store talkgroups.Store, score trending.Score[talk switch err { case nil: d.Weight = tgRecord.Talkgroup.Weight - if tgRecord.System.Name == "" { - tgRecord.System.Name = strconv.Itoa(int(score.ID.System)) - } - - if tgRecord.Talkgroup.Name != nil { - d.TGName = fmt.Sprintf("%s %s [%d]", tgRecord.System.Name, *tgRecord.Talkgroup.Name, score.ID.Talkgroup) - } else { - d.TGName = fmt.Sprintf("%s:%d", tgRecord.System.Name, int(score.ID.Talkgroup)) - } + d.TGName = tgRecord.String() default: system, has := store.SystemName(ctx, int(score.ID.System)) if has { diff --git a/pkg/alerting/stats.html b/pkg/alerting/stats.html index cbaa108..01c796e 100644 --- a/pkg/alerting/stats.html +++ b/pkg/alerting/stats.html @@ -86,7 +86,7 @@ {{ $tg := (index $.TGs .ID) }} {{ $tg.System.Name}} - {{ $tg.Talkgroup.Name}} + {{ $tg.Talkgroup }} {{ .ID.Talkgroup }} {{ f .Count 0 }} {{ f .RecentCount 0 }} diff --git a/pkg/database/batch.go b/pkg/database/batch.go new file mode 100644 index 0000000..31b3fa8 --- /dev/null +++ b/pkg/database/batch.go @@ -0,0 +1,132 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.27.0 +// source: batch.go + +package database + +import ( + "context" + "errors" + + "dynatron.me/x/stillbox/internal/jsontypes" + "dynatron.me/x/stillbox/pkg/alerting/rules" + "github.com/jackc/pgx/v5" +) + +var ( + ErrBatchAlreadyClosed = errors.New("batch already closed") +) + +const upsertTalkgroup = `-- name: UpsertTalkgroup :batchone +INSERT INTO talkgroups AS tg ( + system_id, tgid, name, alpha_tag, tg_group, frequency, metadata, tags, alert, alert_config, weight, learned +) VALUES ( + $1, + $2, + $3, + $4, + $5, + $6, + $7, + $8, + $9, + $10, + $11, + $12 +) +ON CONFLICT (system_id, tgid) DO UPDATE +SET + name = COALESCE($3, tg.name), + alpha_tag = COALESCE($4, tg.alpha_tag), + tg_group = COALESCE($5, tg.tg_group), + frequency = COALESCE($6, tg.frequency), + metadata = COALESCE($7, tg.metadata), + tags = COALESCE($8, tg.tags), + alert = COALESCE($9, tg.alert), + alert_config = COALESCE($10, tg.alert_config), + weight = COALESCE($11, tg.weight), + learned = COALESCE($12, tg.learned) +RETURNING id, system_id, tgid, name, alpha_tag, tg_group, frequency, metadata, tags, alert, alert_config, weight, learned +` + +type UpsertTalkgroupBatchResults struct { + br pgx.BatchResults + tot int + closed bool +} + +type UpsertTalkgroupParams struct { + SystemID int32 `json:"system_id"` + TGID int32 `json:"tgid"` + Name *string `json:"name"` + AlphaTag *string `json:"alpha_tag"` + TGGroup *string `json:"tg_group"` + Frequency *int32 `json:"frequency"` + Metadata jsontypes.Metadata `json:"metadata"` + Tags []string `json:"tags"` + Alert *bool `json:"alert"` + AlertConfig rules.AlertRules `json:"alert_config"` + Weight *float32 `json:"weight"` + Learned *bool `json:"learned"` +} + +func (q *Queries) UpsertTalkgroup(ctx context.Context, arg []UpsertTalkgroupParams) *UpsertTalkgroupBatchResults { + batch := &pgx.Batch{} + for _, a := range arg { + vals := []interface{}{ + a.SystemID, + a.TGID, + a.Name, + a.AlphaTag, + a.TGGroup, + a.Frequency, + a.Metadata, + a.Tags, + a.Alert, + a.AlertConfig, + a.Weight, + a.Learned, + } + batch.Queue(upsertTalkgroup, vals...) + } + br := q.db.SendBatch(ctx, batch) + return &UpsertTalkgroupBatchResults{br, len(arg), false} +} + +func (b *UpsertTalkgroupBatchResults) QueryRow(f func(int, Talkgroup, error)) { + defer b.br.Close() + for t := 0; t < b.tot; t++ { + var i Talkgroup + if b.closed { + if f != nil { + f(t, i, ErrBatchAlreadyClosed) + } + continue + } + row := b.br.QueryRow() + err := row.Scan( + &i.ID, + &i.SystemID, + &i.TGID, + &i.Name, + &i.AlphaTag, + &i.TGGroup, + &i.Frequency, + &i.Metadata, + &i.Tags, + &i.Alert, + &i.AlertConfig, + &i.Weight, + &i.Learned, + ) + if f != nil { + f(t, i, err) + } + } +} + +func (b *UpsertTalkgroupBatchResults) Close() error { + b.closed = true + return b.br.Close() +} diff --git a/pkg/database/db.go b/pkg/database/db.go index 8187a2b..b305de7 100644 --- a/pkg/database/db.go +++ b/pkg/database/db.go @@ -15,6 +15,7 @@ type DBTX interface { Exec(context.Context, string, ...interface{}) (pgconn.CommandTag, error) Query(context.Context, string, ...interface{}) (pgx.Rows, error) QueryRow(context.Context, string, ...interface{}) pgx.Row + SendBatch(context.Context, *pgx.Batch) pgx.BatchResults } func New(db DBTX) *Queries { diff --git a/pkg/database/extend.go b/pkg/database/extend.go index c9ae0c2..21328e3 100644 --- a/pkg/database/extend.go +++ b/pkg/database/extend.go @@ -1,5 +1,9 @@ package database +import ( + "strconv" +) + func (d GetTalkgroupsRow) GetTalkgroup() Talkgroup { return d.Talkgroup } func (d GetTalkgroupsRow) GetSystem() System { return d.System } func (d GetTalkgroupsRow) GetLearned() bool { return d.Talkgroup.Learned } @@ -15,3 +19,18 @@ func (g GetTalkgroupsWithLearnedBySystemRow) GetLearned() bool { return g func (g Talkgroup) GetTalkgroup() Talkgroup { return g } func (g Talkgroup) GetSystem() System { return System{ID: int(g.SystemID)} } func (g Talkgroup) GetLearned() bool { return false } + +func (g Talkgroup) String() string { + switch { + case g.AlphaTag != nil: + return *g.AlphaTag + case g.Name != nil && g.TGGroup != nil: + return *g.TGGroup + " " + *g.Name + case g.Name != nil: + return *g.Name + " [" + strconv.Itoa(int(g.TGID)) + "]" + case g.TGGroup != nil: + return *g.TGGroup + " [" + strconv.Itoa(int(g.TGID)) + "]" + } + + return strconv.Itoa(int(g.TGID)) +} diff --git a/pkg/database/mocks/Store.go b/pkg/database/mocks/Store.go index 7f9f52e..b7a94f9 100644 --- a/pkg/database/mocks/Store.go +++ b/pkg/database/mocks/Store.go @@ -1772,31 +1772,23 @@ func (_c *Store_UpdateTalkgroup_Call) RunAndReturn(run func(context.Context, dat } // UpsertTalkgroup provides a mock function with given fields: ctx, arg -func (_m *Store) UpsertTalkgroup(ctx context.Context, arg database.UpsertTalkgroupParams) (database.Talkgroup, error) { +func (_m *Store) UpsertTalkgroup(ctx context.Context, arg []database.UpsertTalkgroupParams) *database.UpsertTalkgroupBatchResults { ret := _m.Called(ctx, arg) if len(ret) == 0 { panic("no return value specified for UpsertTalkgroup") } - var r0 database.Talkgroup - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, database.UpsertTalkgroupParams) (database.Talkgroup, error)); ok { - return rf(ctx, arg) - } - if rf, ok := ret.Get(0).(func(context.Context, database.UpsertTalkgroupParams) database.Talkgroup); ok { + var r0 *database.UpsertTalkgroupBatchResults + if rf, ok := ret.Get(0).(func(context.Context, []database.UpsertTalkgroupParams) *database.UpsertTalkgroupBatchResults); ok { r0 = rf(ctx, arg) } else { - r0 = ret.Get(0).(database.Talkgroup) + if ret.Get(0) != nil { + r0 = ret.Get(0).(*database.UpsertTalkgroupBatchResults) + } } - if rf, ok := ret.Get(1).(func(context.Context, database.UpsertTalkgroupParams) error); ok { - r1 = rf(ctx, arg) - } else { - r1 = ret.Error(1) - } - - return r0, r1 + return r0 } // Store_UpsertTalkgroup_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpsertTalkgroup' @@ -1806,24 +1798,24 @@ type Store_UpsertTalkgroup_Call struct { // UpsertTalkgroup is a helper method to define mock.On call // - ctx context.Context -// - arg database.UpsertTalkgroupParams +// - arg []database.UpsertTalkgroupParams func (_e *Store_Expecter) UpsertTalkgroup(ctx interface{}, arg interface{}) *Store_UpsertTalkgroup_Call { return &Store_UpsertTalkgroup_Call{Call: _e.mock.On("UpsertTalkgroup", ctx, arg)} } -func (_c *Store_UpsertTalkgroup_Call) Run(run func(ctx context.Context, arg database.UpsertTalkgroupParams)) *Store_UpsertTalkgroup_Call { +func (_c *Store_UpsertTalkgroup_Call) Run(run func(ctx context.Context, arg []database.UpsertTalkgroupParams)) *Store_UpsertTalkgroup_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(database.UpsertTalkgroupParams)) + run(args[0].(context.Context), args[1].([]database.UpsertTalkgroupParams)) }) return _c } -func (_c *Store_UpsertTalkgroup_Call) Return(_a0 database.Talkgroup, _a1 error) *Store_UpsertTalkgroup_Call { - _c.Call.Return(_a0, _a1) +func (_c *Store_UpsertTalkgroup_Call) Return(_a0 *database.UpsertTalkgroupBatchResults) *Store_UpsertTalkgroup_Call { + _c.Call.Return(_a0) return _c } -func (_c *Store_UpsertTalkgroup_Call) RunAndReturn(run func(context.Context, database.UpsertTalkgroupParams) (database.Talkgroup, error)) *Store_UpsertTalkgroup_Call { +func (_c *Store_UpsertTalkgroup_Call) RunAndReturn(run func(context.Context, []database.UpsertTalkgroupParams) *database.UpsertTalkgroupBatchResults) *Store_UpsertTalkgroup_Call { _c.Call.Return(run) return _c } diff --git a/pkg/database/querier.go b/pkg/database/querier.go index d9a381a..43849db 100644 --- a/pkg/database/querier.go +++ b/pkg/database/querier.go @@ -39,7 +39,7 @@ type Querier interface { SetTalkgroupTags(ctx context.Context, tags []string, systemID int32, tGID int32) error UpdatePassword(ctx context.Context, username string, password string) error UpdateTalkgroup(ctx context.Context, arg UpdateTalkgroupParams) (Talkgroup, error) - UpsertTalkgroup(ctx context.Context, arg UpsertTalkgroupParams) (Talkgroup, error) + UpsertTalkgroup(ctx context.Context, arg []UpsertTalkgroupParams) *UpsertTalkgroupBatchResults } var _ Querier = (*Queries)(nil) diff --git a/pkg/database/talkgroups.sql.go b/pkg/database/talkgroups.sql.go index acb4ffa..c620d38 100644 --- a/pkg/database/talkgroups.sql.go +++ b/pkg/database/talkgroups.sql.go @@ -475,84 +475,3 @@ func (q *Queries) UpdateTalkgroup(ctx context.Context, arg UpdateTalkgroupParams ) return i, err } - -const upsertTalkgroup = `-- name: UpsertTalkgroup :one -INSERT INTO talkgroups AS tg ( - system_id, tgid, name, alpha_tag, tg_group, frequency, metadata, tags, alert, alert_config, weight, learned -) VALUES ( - $1, - $2, - $3, - $4, - $5, - $6, - $7, - $8, - $9, - $10, - $11, - $12 -) -ON CONFLICT (system_id, tgid) DO UPDATE -SET - name = COALESCE($3, tg.name), - alpha_tag = COALESCE($4, tg.alpha_tag), - tg_group = COALESCE($5, tg.tg_group), - frequency = COALESCE($6, tg.frequency), - metadata = COALESCE($7, tg.metadata), - tags = COALESCE($8, tg.tags), - alert = COALESCE($9, tg.alert), - alert_config = COALESCE($10, tg.alert_config), - weight = COALESCE($11, tg.weight), - learned = COALESCE($12, tg.learned) -RETURNING id, system_id, tgid, name, alpha_tag, tg_group, frequency, metadata, tags, alert, alert_config, weight, learned -` - -type UpsertTalkgroupParams struct { - SystemID int32 `json:"system_id"` - TGID int32 `json:"tg_id"` - Name *string `json:"name"` - AlphaTag *string `json:"alpha_tag"` - TGGroup *string `json:"tg_group"` - Frequency *int32 `json:"frequency"` - Metadata jsontypes.Metadata `json:"metadata"` - Tags []string `json:"tags"` - Alert *bool `json:"alert"` - AlertConfig rules.AlertRules `json:"alert_config"` - Weight *float32 `json:"weight"` - Learned *bool `json:"learned"` -} - -func (q *Queries) UpsertTalkgroup(ctx context.Context, arg UpsertTalkgroupParams) (Talkgroup, error) { - row := q.db.QueryRow(ctx, upsertTalkgroup, - arg.SystemID, - arg.TGID, - arg.Name, - arg.AlphaTag, - arg.TGGroup, - arg.Frequency, - arg.Metadata, - arg.Tags, - arg.Alert, - arg.AlertConfig, - arg.Weight, - arg.Learned, - ) - var i Talkgroup - err := row.Scan( - &i.ID, - &i.SystemID, - &i.TGID, - &i.Name, - &i.AlphaTag, - &i.TGGroup, - &i.Frequency, - &i.Metadata, - &i.Tags, - &i.Alert, - &i.AlertConfig, - &i.Weight, - &i.Learned, - ) - return i, err -} diff --git a/pkg/rest/talkgroups.go b/pkg/rest/talkgroups.go index 8df86f9..6dc34d6 100644 --- a/pkg/rest/talkgroups.go +++ b/pkg/rest/talkgroups.go @@ -19,7 +19,7 @@ func (tga *talkgroupAPI) Subrouter() http.Handler { r.Get(`/{system:\d+}/{id:\d+}`, tga.get) r.Put(`/{system:\d+}/{id:\d+}`, tga.put) - r.Put(`/{system:\d+}`, tga.putTalkgroups); + r.Put(`/{system:\d+}`, tga.putTalkgroups) r.Get(`/{system:\d+}/`, tga.get) r.Get("/", tga.get) r.Post("/import", tga.tgImport) @@ -51,7 +51,6 @@ func (t tgParams) ToID() talkgroups.ID { } } - func (tga *talkgroupAPI) get(w http.ResponseWriter, r *http.Request) { ctx := r.Context() tgs := talkgroups.StoreFrom(ctx) diff --git a/pkg/talkgroups/store.go b/pkg/talkgroups/store.go index a2b3593..ff812e7 100644 --- a/pkg/talkgroups/store.go +++ b/pkg/talkgroups/store.go @@ -318,31 +318,43 @@ func (t *cache) UpsertTGs(ctx context.Context, system int, input []database.Upse return nil, ErrNoSuchSystem } sys := database.System{ - ID: system, + ID: system, Name: sysName, } tgs := make([]*Talkgroup, 0, len(input)) err := db.InTx(ctx, func(db database.Store) error { - for _, tgu := range input { + for i := range input { // normalize tags - for i, tag := range tgu.Tags { - tgu.Tags[i] = strings.ToLower(tag) + for j, tag := range input[i].Tags { + input[i].Tags[j] = strings.ToLower(tag) } - tgu.SystemID = int32(system) - tgu.Learned = common.PtrTo(false) - tg, err := db.UpsertTalkgroup(ctx, tgu) + input[i].SystemID = int32(system) + input[i].Learned = common.PtrTo(false) + + } + + var oerr error + + batch := db.UpsertTalkgroup(ctx, input) + defer batch.Close() + + batch.QueryRow(func(_ int, r database.Talkgroup, err error) { if err != nil { - return err + oerr = err + return } - tgs = append(tgs, &Talkgroup{ - Talkgroup: tg, - System: sys, - Learned: tg.Learned, + Talkgroup: r, + System: sys, + Learned: r.Learned, }) + }) + + if oerr != nil { + return oerr } return nil diff --git a/pkg/talkgroups/talkgroup.go b/pkg/talkgroups/talkgroup.go index c609f42..4301a79 100644 --- a/pkg/talkgroups/talkgroup.go +++ b/pkg/talkgroups/talkgroup.go @@ -2,6 +2,7 @@ package talkgroups import ( "fmt" + "strconv" "dynatron.me/x/stillbox/pkg/database" ) @@ -12,13 +13,20 @@ type Talkgroup struct { Learned bool `json:"learned"` } -type Metadata map[string]interface{} +func (t Talkgroup) String() string { + if t.System.Name == "" { + t.System.Name = strconv.Itoa(int(t.Talkgroup.TGID)) + } -type Names struct { - System string - Talkgroup string + if t.Talkgroup.Name != nil || t.Talkgroup.TGGroup != nil || t.Talkgroup.AlphaTag != nil { + return t.System.Name + " " + t.Talkgroup.String() + } + + return fmt.Sprintf("%s:%d", t.System.Name, int(t.Talkgroup.TGID)) } +type Metadata map[string]interface{} + type ID struct { System uint32 `json:"sys"` Talkgroup uint32 `json:"tg"` diff --git a/sql/postgres/queries/talkgroups.sql b/sql/postgres/queries/talkgroups.sql index 7b9c7cc..fe106a4 100644 --- a/sql/postgres/queries/talkgroups.sql +++ b/sql/postgres/queries/talkgroups.sql @@ -91,12 +91,12 @@ SET WHERE id = sqlc.narg('id') OR (system_id = sqlc.narg('system_id') AND tgid = sqlc.narg('tgid')) RETURNING *; --- name: UpsertTalkgroup :one +-- name: UpsertTalkgroup :batchone INSERT INTO talkgroups AS tg ( system_id, tgid, name, alpha_tag, tg_group, frequency, metadata, tags, alert, alert_config, weight, learned ) VALUES ( @system_id, - @tg_id, + @tgid, sqlc.narg('name'), sqlc.narg('alpha_tag'), sqlc.narg('tg_group'),