From f909723f7d57c21c643141536b395d69416a5206 Mon Sep 17 00:00:00 2001 From: Daniel Ponte Date: Wed, 20 Nov 2024 07:26:59 -0500 Subject: [PATCH] wip pre-batch --- pkg/database/mocks/Store.go | 57 +++++++++++++++++ pkg/database/querier.go | 1 + pkg/database/talkgroups.sql.go | 90 ++++++++++++++++++++++++++- pkg/rest/api.go | 3 +- pkg/rest/talkgroups.go | 43 ++++++++++++- pkg/talkgroups/importer/import.go | 1 + pkg/talkgroups/{cache.go => store.go} | 56 +++++++++++++++++ sql/postgres/queries/talkgroups.sql | 36 ++++++++++- 8 files changed, 277 insertions(+), 10 deletions(-) rename pkg/talkgroups/{cache.go => store.go} (84%) diff --git a/pkg/database/mocks/Store.go b/pkg/database/mocks/Store.go index 2c769a4..7f9f52e 100644 --- a/pkg/database/mocks/Store.go +++ b/pkg/database/mocks/Store.go @@ -1771,6 +1771,63 @@ func (_c *Store_UpdateTalkgroup_Call) RunAndReturn(run func(context.Context, dat return _c } +// UpsertTalkgroup provides a mock function with given fields: ctx, arg +func (_m *Store) UpsertTalkgroup(ctx context.Context, arg database.UpsertTalkgroupParams) (database.Talkgroup, error) { + ret := _m.Called(ctx, arg) + + if len(ret) == 0 { + panic("no return value specified for UpsertTalkgroup") + } + + var r0 database.Talkgroup + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, database.UpsertTalkgroupParams) (database.Talkgroup, error)); ok { + return rf(ctx, arg) + } + if rf, ok := ret.Get(0).(func(context.Context, database.UpsertTalkgroupParams) database.Talkgroup); ok { + r0 = rf(ctx, arg) + } else { + r0 = ret.Get(0).(database.Talkgroup) + } + + if rf, ok := ret.Get(1).(func(context.Context, database.UpsertTalkgroupParams) error); ok { + r1 = rf(ctx, arg) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Store_UpsertTalkgroup_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpsertTalkgroup' +type Store_UpsertTalkgroup_Call struct { + *mock.Call +} + +// UpsertTalkgroup is a helper method to define mock.On call +// - ctx context.Context +// - arg database.UpsertTalkgroupParams +func (_e *Store_Expecter) UpsertTalkgroup(ctx interface{}, arg interface{}) *Store_UpsertTalkgroup_Call { + return &Store_UpsertTalkgroup_Call{Call: _e.mock.On("UpsertTalkgroup", ctx, arg)} +} + +func (_c *Store_UpsertTalkgroup_Call) Run(run func(ctx context.Context, arg database.UpsertTalkgroupParams)) *Store_UpsertTalkgroup_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(database.UpsertTalkgroupParams)) + }) + return _c +} + +func (_c *Store_UpsertTalkgroup_Call) Return(_a0 database.Talkgroup, _a1 error) *Store_UpsertTalkgroup_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *Store_UpsertTalkgroup_Call) RunAndReturn(run func(context.Context, database.UpsertTalkgroupParams) (database.Talkgroup, error)) *Store_UpsertTalkgroup_Call { + _c.Call.Return(run) + return _c +} + // NewStore creates a new instance of Store. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func NewStore(t interface { diff --git a/pkg/database/querier.go b/pkg/database/querier.go index aa42757..d9a381a 100644 --- a/pkg/database/querier.go +++ b/pkg/database/querier.go @@ -39,6 +39,7 @@ type Querier interface { SetTalkgroupTags(ctx context.Context, tags []string, systemID int32, tGID int32) error UpdatePassword(ctx context.Context, username string, password string) error UpdateTalkgroup(ctx context.Context, arg UpdateTalkgroupParams) (Talkgroup, error) + UpsertTalkgroup(ctx context.Context, arg UpsertTalkgroupParams) (Talkgroup, error) } var _ Querier = (*Queries)(nil) diff --git a/pkg/database/talkgroups.sql.go b/pkg/database/talkgroups.sql.go index 05ba01d..acb4ffa 100644 --- a/pkg/database/talkgroups.sql.go +++ b/pkg/database/talkgroups.sql.go @@ -57,7 +57,7 @@ INSERT INTO talkgroups ( ) VALUES( $1, $2, - 't' + TRUE ) ` @@ -419,8 +419,9 @@ SET tags = COALESCE($6, tags), alert = COALESCE($7, alert), alert_config = COALESCE($8, alert_config), - weight = COALESCE($9, weight) -WHERE id = $10 OR (system_id = $11 AND tgid = $12) + weight = COALESCE($9, weight), + learned = COALESCE($10, learned) +WHERE id = $11 OR (system_id = $12 AND tgid = $13) RETURNING id, system_id, tgid, name, alpha_tag, tg_group, frequency, metadata, tags, alert, alert_config, weight, learned ` @@ -434,6 +435,7 @@ type UpdateTalkgroupParams struct { Alert *bool `json:"alert"` AlertConfig rules.AlertRules `json:"alert_config"` Weight *float32 `json:"weight"` + Learned *bool `json:"learned"` ID *int32 `json:"id"` SystemID *int32 `json:"system_id"` TGID *int32 `json:"tgid"` @@ -450,6 +452,7 @@ func (q *Queries) UpdateTalkgroup(ctx context.Context, arg UpdateTalkgroupParams arg.Alert, arg.AlertConfig, arg.Weight, + arg.Learned, arg.ID, arg.SystemID, arg.TGID, @@ -472,3 +475,84 @@ func (q *Queries) UpdateTalkgroup(ctx context.Context, arg UpdateTalkgroupParams ) return i, err } + +const upsertTalkgroup = `-- name: UpsertTalkgroup :one +INSERT INTO talkgroups AS tg ( + system_id, tgid, name, alpha_tag, tg_group, frequency, metadata, tags, alert, alert_config, weight, learned +) VALUES ( + $1, + $2, + $3, + $4, + $5, + $6, + $7, + $8, + $9, + $10, + $11, + $12 +) +ON CONFLICT (system_id, tgid) DO UPDATE +SET + name = COALESCE($3, tg.name), + alpha_tag = COALESCE($4, tg.alpha_tag), + tg_group = COALESCE($5, tg.tg_group), + frequency = COALESCE($6, tg.frequency), + metadata = COALESCE($7, tg.metadata), + tags = COALESCE($8, tg.tags), + alert = COALESCE($9, tg.alert), + alert_config = COALESCE($10, tg.alert_config), + weight = COALESCE($11, tg.weight), + learned = COALESCE($12, tg.learned) +RETURNING id, system_id, tgid, name, alpha_tag, tg_group, frequency, metadata, tags, alert, alert_config, weight, learned +` + +type UpsertTalkgroupParams struct { + SystemID int32 `json:"system_id"` + TGID int32 `json:"tg_id"` + Name *string `json:"name"` + AlphaTag *string `json:"alpha_tag"` + TGGroup *string `json:"tg_group"` + Frequency *int32 `json:"frequency"` + Metadata jsontypes.Metadata `json:"metadata"` + Tags []string `json:"tags"` + Alert *bool `json:"alert"` + AlertConfig rules.AlertRules `json:"alert_config"` + Weight *float32 `json:"weight"` + Learned *bool `json:"learned"` +} + +func (q *Queries) UpsertTalkgroup(ctx context.Context, arg UpsertTalkgroupParams) (Talkgroup, error) { + row := q.db.QueryRow(ctx, upsertTalkgroup, + arg.SystemID, + arg.TGID, + arg.Name, + arg.AlphaTag, + arg.TGGroup, + arg.Frequency, + arg.Metadata, + arg.Tags, + arg.Alert, + arg.AlertConfig, + arg.Weight, + arg.Learned, + ) + var i Talkgroup + err := row.Scan( + &i.ID, + &i.SystemID, + &i.TGID, + &i.Name, + &i.AlphaTag, + &i.TGGroup, + &i.Frequency, + &i.Metadata, + &i.Tags, + &i.Alert, + &i.AlertConfig, + &i.Weight, + &i.Learned, + ) + return i, err +} diff --git a/pkg/rest/api.go b/pkg/rest/api.go index cfbd24e..3ed9adc 100644 --- a/pkg/rest/api.go +++ b/pkg/rest/api.go @@ -42,7 +42,6 @@ type errResponse struct { func (e *errResponse) Render(w http.ResponseWriter, r *http.Request) error { switch e.Code { - case http.StatusNotFound: default: log.Error().Str("path", r.URL.Path).Err(e.Err).Int("code", e.Code).Str("msg", e.Error).Msg("request failed") } @@ -71,7 +70,7 @@ func recordNotFound(err error) render.Renderer { func internalError(err error) render.Renderer { return &errResponse{ Err: err, - Code: http.StatusNotFound, + Code: http.StatusInternalServerError, Error: "Internal server error", } } diff --git a/pkg/rest/talkgroups.go b/pkg/rest/talkgroups.go index a392ff1..8df86f9 100644 --- a/pkg/rest/talkgroups.go +++ b/pkg/rest/talkgroups.go @@ -17,9 +17,10 @@ type talkgroupAPI struct { func (tga *talkgroupAPI) Subrouter() http.Handler { r := chi.NewMux() - r.Get("/{system:\\d+}/{id:\\d+}", tga.get) - r.Put("/{system:\\d+}/{id:\\d+}", tga.put) - r.Get("/{system:\\d+}/", tga.get) + r.Get(`/{system:\d+}/{id:\d+}`, tga.get) + r.Put(`/{system:\d+}/{id:\d+}`, tga.put) + r.Put(`/{system:\d+}`, tga.putTalkgroups); + r.Get(`/{system:\d+}/`, tga.get) r.Get("/", tga.get) r.Post("/import", tga.tgImport) @@ -50,6 +51,7 @@ func (t tgParams) ToID() talkgroups.ID { } } + func (tga *talkgroupAPI) get(w http.ResponseWriter, r *http.Request) { ctx := r.Context() tgs := talkgroups.StoreFrom(ctx) @@ -99,6 +101,8 @@ func (tga *talkgroupAPI) put(w http.ResponseWriter, r *http.Request) { return } + input.Learned = nil // ignore for this call + record, err := tgs.UpdateTG(ctx, input) if err != nil { wErr(w, r, autoError(err)) @@ -123,3 +127,36 @@ func (tga *talkgroupAPI) tgImport(w http.ResponseWriter, r *http.Request) { respond(w, r, recs) } + +func (tga *talkgroupAPI) putTalkgroups(w http.ResponseWriter, r *http.Request) { + var id tgParams + err := decodeParams(&id, r) + if err != nil { + wErr(w, r, badRequest(err)) + return + } + + if id.System == nil { // don't think this would ever happen + wErr(w, r, badRequest(talkgroups.ErrNoSuchSystem)) + return + } + + ctx := r.Context() + tgs := talkgroups.StoreFrom(ctx) + + var input []database.UpsertTalkgroupParams + + err = forms.Unmarshal(r, &input, forms.WithTag("json"), forms.WithAcceptBlank(), forms.WithOmitEmpty()) + if err != nil { + wErr(w, r, badRequest(err)) + return + } + + record, err := tgs.UpsertTGs(ctx, *id.System, input) + if err != nil { + wErr(w, r, autoError(err)) + return + } + + respond(w, r, record) +} diff --git a/pkg/talkgroups/importer/import.go b/pkg/talkgroups/importer/import.go index b616a3b..86d5190 100644 --- a/pkg/talkgroups/importer/import.go +++ b/pkg/talkgroups/importer/import.go @@ -110,6 +110,7 @@ func (rr *radioReferenceImporter) importTalkgroups(ctx context.Context, sys int, gn := groupName // must take a copy tgs = append(tgs, talkgroups.Talkgroup{ Talkgroup: database.Talkgroup{ + ID: len(tgs), // need unique ID for the UI to track TGID: int32(tgt.Talkgroup), SystemID: int32(tgt.System), Name: &fields[4], diff --git a/pkg/talkgroups/cache.go b/pkg/talkgroups/store.go similarity index 84% rename from pkg/talkgroups/cache.go rename to pkg/talkgroups/store.go index 0d58a16..a2b3593 100644 --- a/pkg/talkgroups/cache.go +++ b/pkg/talkgroups/store.go @@ -3,9 +3,11 @@ package talkgroups import ( "context" "errors" + "strings" "sync" "time" + "dynatron.me/x/stillbox/internal/common" "dynatron.me/x/stillbox/pkg/config" "dynatron.me/x/stillbox/pkg/database" @@ -24,6 +26,9 @@ type Store interface { // UpdateTG updates a talkgroup record. UpdateTG(ctx context.Context, input database.UpdateTalkgroupParams) (*Talkgroup, error) + // UpsertTGs upserts a slice of talkgroups. + UpsertTGs(ctx context.Context, system int, input []database.UpsertTalkgroupParams) ([]*Talkgroup, error) + // TG retrieves a Talkgroup from the Store. TG(ctx context.Context, tg ID) (*Talkgroup, error) @@ -305,3 +310,54 @@ func (t *cache) UpdateTG(ctx context.Context, input database.UpdateTalkgroupPara return record, nil } + +func (t *cache) UpsertTGs(ctx context.Context, system int, input []database.UpsertTalkgroupParams) ([]*Talkgroup, error) { + db := database.FromCtx(ctx) + sysName, hasSys := t.SystemName(ctx, system) + if !hasSys { + return nil, ErrNoSuchSystem + } + sys := database.System{ + ID: system, + Name: sysName, + } + + tgs := make([]*Talkgroup, 0, len(input)) + + err := db.InTx(ctx, func(db database.Store) error { + for _, tgu := range input { + // normalize tags + for i, tag := range tgu.Tags { + tgu.Tags[i] = strings.ToLower(tag) + } + + tgu.SystemID = int32(system) + tgu.Learned = common.PtrTo(false) + tg, err := db.UpsertTalkgroup(ctx, tgu) + if err != nil { + return err + } + + tgs = append(tgs, &Talkgroup{ + Talkgroup: tg, + System: sys, + Learned: tg.Learned, + }) + } + + return nil + }, pgx.TxOptions{}) + + if err != nil { + return nil, err + } + + // update the cache + t.Lock() + defer t.Unlock() + for _, tg := range tgs { + t.tgs[TG(tg.SystemID, tg.TGID)] = tg + } + + return tgs, nil +} diff --git a/sql/postgres/queries/talkgroups.sql b/sql/postgres/queries/talkgroups.sql index 14b6ff2..7b9c7cc 100644 --- a/sql/postgres/queries/talkgroups.sql +++ b/sql/postgres/queries/talkgroups.sql @@ -86,10 +86,42 @@ SET tags = COALESCE(sqlc.narg('tags'), tags), alert = COALESCE(sqlc.narg('alert'), alert), alert_config = COALESCE(sqlc.narg('alert_config'), alert_config), - weight = COALESCE(sqlc.narg('weight'), weight) + weight = COALESCE(sqlc.narg('weight'), weight), + learned = COALESCE(sqlc.narg('learned'), learned) WHERE id = sqlc.narg('id') OR (system_id = sqlc.narg('system_id') AND tgid = sqlc.narg('tgid')) RETURNING *; +-- name: UpsertTalkgroup :one +INSERT INTO talkgroups AS tg ( + system_id, tgid, name, alpha_tag, tg_group, frequency, metadata, tags, alert, alert_config, weight, learned +) VALUES ( + @system_id, + @tg_id, + sqlc.narg('name'), + sqlc.narg('alpha_tag'), + sqlc.narg('tg_group'), + sqlc.narg('frequency'), + sqlc.narg('metadata'), + sqlc.narg('tags'), + sqlc.narg('alert'), + sqlc.narg('alert_config'), + sqlc.narg('weight'), + sqlc.narg('learned') +) +ON CONFLICT (system_id, tgid) DO UPDATE +SET + name = COALESCE(sqlc.narg('name'), tg.name), + alpha_tag = COALESCE(sqlc.narg('alpha_tag'), tg.alpha_tag), + tg_group = COALESCE(sqlc.narg('tg_group'), tg.tg_group), + frequency = COALESCE(sqlc.narg('frequency'), tg.frequency), + metadata = COALESCE(sqlc.narg('metadata'), tg.metadata), + tags = COALESCE(sqlc.narg('tags'), tg.tags), + alert = COALESCE(sqlc.narg('alert'), tg.alert), + alert_config = COALESCE(sqlc.narg('alert_config'), tg.alert_config), + weight = COALESCE(sqlc.narg('weight'), tg.weight), + learned = COALESCE(sqlc.narg('learned'), tg.learned) +RETURNING *; + -- name: AddTalkgroupWithLearnedFlag :exec INSERT INTO talkgroups ( system_id, @@ -98,7 +130,7 @@ INSERT INTO talkgroups ( ) VALUES( @system_id, @tgid, - 't' + TRUE ); -- name: AddLearnedTalkgroup :one