diff --git a/pkg/database/batch.go b/pkg/database/batch.go index d614580..ea9beb3 100644 --- a/pkg/database/batch.go +++ b/pkg/database/batch.go @@ -12,6 +12,7 @@ import ( "dynatron.me/x/stillbox/internal/jsontypes" "dynatron.me/x/stillbox/pkg/alerting/rules" "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgtype" ) var ( @@ -106,11 +107,11 @@ INSERT INTO talkgroups AS tg ( $5, $6, $7, - $8, - $9, + COALESCE($8, '{}'::text[])::text[], + COALESCE($9, TRUE), $10, - $11, - $12 + COALESCE($11, 1.0)::numeric, + COALESCE($12, FALSE)::boolean ) ON CONFLICT (system_id, tgid) DO UPDATE SET @@ -142,9 +143,9 @@ type UpsertTalkgroupParams struct { Frequency *int32 `json:"frequency"` Metadata jsontypes.Metadata `json:"metadata"` Tags []string `json:"tags"` - Alert *bool `json:"alert"` + Alert interface{} `json:"alert"` AlertConfig rules.AlertRules `json:"alert_config"` - Weight *float32 `json:"weight"` + Weight pgtype.Numeric `json:"weight"` Learned *bool `json:"learned"` } diff --git a/pkg/database/mocks/Store.go b/pkg/database/mocks/Store.go index 26f5517..fa2a9e1 100644 --- a/pkg/database/mocks/Store.go +++ b/pkg/database/mocks/Store.go @@ -396,6 +396,54 @@ func (_c *Store_CreatePartition_Call) RunAndReturn(run func(context.Context, str return _c } +// CreateSystem provides a mock function with given fields: ctx, iD, name +func (_m *Store) CreateSystem(ctx context.Context, iD int, name string) error { + ret := _m.Called(ctx, iD, name) + + if len(ret) == 0 { + panic("no return value specified for CreateSystem") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, int, string) error); ok { + r0 = rf(ctx, iD, name) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Store_CreateSystem_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CreateSystem' +type Store_CreateSystem_Call struct { + *mock.Call +} + +// CreateSystem is a helper method to define mock.On call +// - ctx context.Context +// - iD int +// - name string +func (_e *Store_Expecter) CreateSystem(ctx interface{}, iD interface{}, name interface{}) *Store_CreateSystem_Call { + return &Store_CreateSystem_Call{Call: _e.mock.On("CreateSystem", ctx, iD, name)} +} + +func (_c *Store_CreateSystem_Call) Run(run func(ctx context.Context, iD int, name string)) *Store_CreateSystem_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(int), args[2].(string)) + }) + return _c +} + +func (_c *Store_CreateSystem_Call) Return(_a0 error) *Store_CreateSystem_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *Store_CreateSystem_Call) RunAndReturn(run func(context.Context, int, string) error) *Store_CreateSystem_Call { + _c.Call.Return(run) + return _c +} + // CreateUser provides a mock function with given fields: ctx, arg func (_m *Store) CreateUser(ctx context.Context, arg database.CreateUserParams) (database.User, error) { ret := _m.Called(ctx, arg) @@ -594,6 +642,101 @@ func (_c *Store_DeleteAPIKey_Call) RunAndReturn(run func(context.Context, string return _c } +// DeleteSystem provides a mock function with given fields: ctx, id +func (_m *Store) DeleteSystem(ctx context.Context, id int) error { + ret := _m.Called(ctx, id) + + if len(ret) == 0 { + panic("no return value specified for DeleteSystem") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, int) error); ok { + r0 = rf(ctx, id) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Store_DeleteSystem_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DeleteSystem' +type Store_DeleteSystem_Call struct { + *mock.Call +} + +// DeleteSystem is a helper method to define mock.On call +// - ctx context.Context +// - id int +func (_e *Store_Expecter) DeleteSystem(ctx interface{}, id interface{}) *Store_DeleteSystem_Call { + return &Store_DeleteSystem_Call{Call: _e.mock.On("DeleteSystem", ctx, id)} +} + +func (_c *Store_DeleteSystem_Call) Run(run func(ctx context.Context, id int)) *Store_DeleteSystem_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(int)) + }) + return _c +} + +func (_c *Store_DeleteSystem_Call) Return(_a0 error) *Store_DeleteSystem_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *Store_DeleteSystem_Call) RunAndReturn(run func(context.Context, int) error) *Store_DeleteSystem_Call { + _c.Call.Return(run) + return _c +} + +// DeleteTalkgroup provides a mock function with given fields: ctx, systemID, tGID +func (_m *Store) DeleteTalkgroup(ctx context.Context, systemID int32, tGID int32) error { + ret := _m.Called(ctx, systemID, tGID) + + if len(ret) == 0 { + panic("no return value specified for DeleteTalkgroup") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, int32, int32) error); ok { + r0 = rf(ctx, systemID, tGID) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Store_DeleteTalkgroup_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DeleteTalkgroup' +type Store_DeleteTalkgroup_Call struct { + *mock.Call +} + +// DeleteTalkgroup is a helper method to define mock.On call +// - ctx context.Context +// - systemID int32 +// - tGID int32 +func (_e *Store_Expecter) DeleteTalkgroup(ctx interface{}, systemID interface{}, tGID interface{}) *Store_DeleteTalkgroup_Call { + return &Store_DeleteTalkgroup_Call{Call: _e.mock.On("DeleteTalkgroup", ctx, systemID, tGID)} +} + +func (_c *Store_DeleteTalkgroup_Call) Run(run func(ctx context.Context, systemID int32, tGID int32)) *Store_DeleteTalkgroup_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(int32), args[2].(int32)) + }) + return _c +} + +func (_c *Store_DeleteTalkgroup_Call) Return(_a0 error) *Store_DeleteTalkgroup_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *Store_DeleteTalkgroup_Call) RunAndReturn(run func(context.Context, int32, int32) error) *Store_DeleteTalkgroup_Call { + _c.Call.Return(run) + return _c +} + // DeleteUser provides a mock function with given fields: ctx, username func (_m *Store) DeleteUser(ctx context.Context, username string) error { ret := _m.Called(ctx, username) @@ -2278,6 +2421,55 @@ func (_c *Store_SetTalkgroupTags_Call) RunAndReturn(run func(context.Context, [] return _c } +// StoreDeletedTGVersion provides a mock function with given fields: ctx, systemID, tGID, submitter +func (_m *Store) StoreDeletedTGVersion(ctx context.Context, systemID *int32, tGID *int32, submitter *int32) error { + ret := _m.Called(ctx, systemID, tGID, submitter) + + if len(ret) == 0 { + panic("no return value specified for StoreDeletedTGVersion") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, *int32, *int32, *int32) error); ok { + r0 = rf(ctx, systemID, tGID, submitter) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Store_StoreDeletedTGVersion_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'StoreDeletedTGVersion' +type Store_StoreDeletedTGVersion_Call struct { + *mock.Call +} + +// StoreDeletedTGVersion is a helper method to define mock.On call +// - ctx context.Context +// - systemID *int32 +// - tGID *int32 +// - submitter *int32 +func (_e *Store_Expecter) StoreDeletedTGVersion(ctx interface{}, systemID interface{}, tGID interface{}, submitter interface{}) *Store_StoreDeletedTGVersion_Call { + return &Store_StoreDeletedTGVersion_Call{Call: _e.mock.On("StoreDeletedTGVersion", ctx, systemID, tGID, submitter)} +} + +func (_c *Store_StoreDeletedTGVersion_Call) Run(run func(ctx context.Context, systemID *int32, tGID *int32, submitter *int32)) *Store_StoreDeletedTGVersion_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*int32), args[2].(*int32), args[3].(*int32)) + }) + return _c +} + +func (_c *Store_StoreDeletedTGVersion_Call) Return(_a0 error) *Store_StoreDeletedTGVersion_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *Store_StoreDeletedTGVersion_Call) RunAndReturn(run func(context.Context, *int32, *int32, *int32) error) *Store_StoreDeletedTGVersion_Call { + _c.Call.Return(run) + return _c +} + // StoreTGVersion provides a mock function with given fields: ctx, arg func (_m *Store) StoreTGVersion(ctx context.Context, arg []database.StoreTGVersionParams) *database.StoreTGVersionBatchResults { ret := _m.Called(ctx, arg) diff --git a/pkg/database/models.go b/pkg/database/models.go index c61e4f9..6a6f56b 100644 --- a/pkg/database/models.go +++ b/pkg/database/models.go @@ -127,6 +127,7 @@ type TalkgroupVersion struct { ID int `json:"id,omitempty"` Time pgtype.Timestamptz `json:"time,omitempty"` CreatedBy *int32 `json:"created_by,omitempty"` + Deleted *bool `json:"deleted,omitempty"` SystemID *int32 `json:"system_id,omitempty"` TGID *int32 `json:"tgid,omitempty"` Name *string `json:"name,omitempty"` diff --git a/pkg/database/partman/partman.go b/pkg/database/partman/partman.go index a4b4cd1..ba63ea8 100644 --- a/pkg/database/partman/partman.go +++ b/pkg/database/partman/partman.go @@ -22,6 +22,8 @@ import ( const ( CallsTable = "calls" + CheckInterval = time.Hour // every 1h + preProvisionDefault = 1 ) @@ -132,7 +134,7 @@ func New(db database.Store, cfg config.Partition) (*partman, error) { var _ PartitionManager = (*partman)(nil) func (pm *partman) Go(ctx context.Context) { - tick := time.NewTicker(60 * time.Minute) + tick := time.NewTicker(CheckInterval) select { case now := <-tick.C: diff --git a/pkg/database/querier.go b/pkg/database/querier.go index 9b15132..243e87b 100644 --- a/pkg/database/querier.go +++ b/pkg/database/querier.go @@ -17,8 +17,11 @@ type Querier interface { AddLearnedTalkgroup(ctx context.Context, arg AddLearnedTalkgroupParams) (Talkgroup, error) CleanupSweptCalls(ctx context.Context, rangeStart pgtype.Timestamptz, rangeEnd pgtype.Timestamptz) (int64, error) CreateAPIKey(ctx context.Context, owner int, expires pgtype.Timestamp, disabled *bool) (ApiKey, error) + CreateSystem(ctx context.Context, iD int, name string) error CreateUser(ctx context.Context, arg CreateUserParams) (User, error) DeleteAPIKey(ctx context.Context, apiKey string) error + DeleteSystem(ctx context.Context, id int) error + DeleteTalkgroup(ctx context.Context, systemID int32, tGID int32) error DeleteUser(ctx context.Context, username string) error GetAPIKey(ctx context.Context, apiKey string) (ApiKey, error) GetCallAudioByID(ctx context.Context, id uuid.UUID) (GetCallAudioByIDRow, error) @@ -43,6 +46,7 @@ type Querier interface { RestoreTalkgroupVersion(ctx context.Context, versionIds int) (Talkgroup, error) SetCallTranscript(ctx context.Context, iD uuid.UUID, transcript *string) error SetTalkgroupTags(ctx context.Context, tags []string, systemID int32, tGID int32) error + StoreDeletedTGVersion(ctx context.Context, systemID *int32, tGID *int32, submitter *int32) error StoreTGVersion(ctx context.Context, arg []StoreTGVersionParams) *StoreTGVersionBatchResults SweepCalls(ctx context.Context, rangeStart pgtype.Timestamptz, rangeEnd pgtype.Timestamptz) (int64, error) UpdatePassword(ctx context.Context, username string, password string) error diff --git a/pkg/database/talkgroups.go b/pkg/database/talkgroups.go index b52791c..faa2bed 100644 --- a/pkg/database/talkgroups.go +++ b/pkg/database/talkgroups.go @@ -15,11 +15,22 @@ type talkgroupQuerier interface { type TGTuples [2][]uint32 -const TGConstraintName = "calls_system_talkgroup_fkey" +const ( + TGConstraintName = "calls_system_talkgroup_fkey" + SysConstraintName = "talkgroups_system_id_fkey" +) func IsTGConstraintViolation(e error) bool { + return IsConstraintViolation(e, TGConstraintName) +} + +func IsSystemConstraintViolation(e error) bool { + return IsConstraintViolation(e, SysConstraintName) +} + +func IsConstraintViolation(e error, constraintName string) bool { var err *pgconn.PgError - if errors.As(e, &err) && err.Code == "23503" && err.ConstraintName == TGConstraintName { + if errors.As(e, &err) && err.Code == "23503" && err.ConstraintName == constraintName { return true } diff --git a/pkg/database/talkgroups.sql.go b/pkg/database/talkgroups.sql.go index b5731d2..2b75a8b 100644 --- a/pkg/database/talkgroups.sql.go +++ b/pkg/database/talkgroups.sql.go @@ -66,6 +66,33 @@ func (q *Queries) AddLearnedTalkgroup(ctx context.Context, arg AddLearnedTalkgro return i, err } +const createSystem = `-- name: CreateSystem :exec +INSERT INTO systems(id, name) VALUES($1, $2) +` + +func (q *Queries) CreateSystem(ctx context.Context, iD int, name string) error { + _, err := q.db.Exec(ctx, createSystem, iD, name) + return err +} + +const deleteSystem = `-- name: DeleteSystem :exec +DELETE FROM systems WHERE id = $1 +` + +func (q *Queries) DeleteSystem(ctx context.Context, id int) error { + _, err := q.db.Exec(ctx, deleteSystem, id) + return err +} + +const deleteTalkgroup = `-- name: DeleteTalkgroup :exec +DELETE FROM talkgroups WHERE system_id = $1 AND tgid = $2 +` + +func (q *Queries) DeleteTalkgroup(ctx context.Context, systemID int32, tGID int32) error { + _, err := q.db.Exec(ctx, deleteTalkgroup, systemID, tGID) + return err +} + const getSystemName = `-- name: GetSystemName :one SELECT name FROM systems WHERE id = $1 ` @@ -584,6 +611,21 @@ func (q *Queries) SetTalkgroupTags(ctx context.Context, tags []string, systemID return err } +const storeDeletedTGVersion = `-- name: StoreDeletedTGVersion :exec +INSERT INTO talkgroup_versions( + system_id, + tgid, + time, + created_by, + deleted +) VALUES($1, $2, NOW(), $3, TRUE) +` + +func (q *Queries) StoreDeletedTGVersion(ctx context.Context, systemID *int32, tGID *int32, submitter *int32) error { + _, err := q.db.Exec(ctx, storeDeletedTGVersion, systemID, tGID, submitter) + return err +} + const updateTalkgroup = `-- name: UpdateTalkgroup :one UPDATE talkgroups SET diff --git a/pkg/rest/api.go b/pkg/rest/api.go index b9bcac1..931f511 100644 --- a/pkg/rest/api.go +++ b/pkg/rest/api.go @@ -68,6 +68,14 @@ func badRequestErrText(err error) render.Renderer { } } +func constraintErrText(err error) render.Renderer { + return &errResponse{ + Err: err, + Code: http.StatusConflict, + Error: "Constraint violation: " + err.Error(), + } +} + func recordNotFound(err error) render.Renderer { return &errResponse{ Err: err, @@ -99,6 +107,10 @@ var statusMapping = map[error]errResponder{ tgstore.ErrNotFound: notFoundErrText, tgstore.ErrInvalidOrderBy: badRequestErrText, pgx.ErrNoRows: recordNotFound, + ErrMissingTGSys: badRequestErrText, + ErrTGIDMismatch: badRequestErrText, + ErrSysMismatch: badRequestErrText, + tgstore.ErrReference: constraintErrText, } func autoError(err error) render.Renderer { diff --git a/pkg/rest/talkgroups.go b/pkg/rest/talkgroups.go index a10ae6d..52cdd8a 100644 --- a/pkg/rest/talkgroups.go +++ b/pkg/rest/talkgroups.go @@ -1,6 +1,7 @@ package rest import ( + "errors" "fmt" "net/http" @@ -13,6 +14,14 @@ import ( "github.com/go-chi/chi/v5" ) +var ( + ErrMissingTGSys = errors.New("missing talkgroup ID and system ID") + ErrTGIDMismatch = errors.New("url talkgroup ID and document talkgroup ID mismatch") + ErrSysMismatch = errors.New("url system ID and document system ID mismatch") + ErrNoSuchSystem = tgstore.ErrNoSuchSystem + ErrBadSystem = errors.New("invalid system") +) + const DefaultPerPage = 20 type talkgroupAPI struct { @@ -26,11 +35,15 @@ func (tga *talkgroupAPI) Subrouter() http.Handler { r.Get("/", tga.get) r.Put(`/{system:\d+}/{id:\d+}`, tga.put) - r.Put(`/{system:\d+}`, tga.putTalkgroups) + r.Put(`/{system:\d+}/`, tga.putTalkgroups) + r.Put(`/{system:\d+}`, tga.putSystem) r.Post(`/{system:\d+}/`, tga.postPaginated) r.Post(`/`, tga.postPaginated) + r.Delete(`/{system:\d+}`, tga.deleteSystem) + r.Delete(`/{system:\d+}/{id:\d+}`, tga.deleteTalkgroup) + r.Post("/import", tga.tgImport) r.Post("/export", tga.tgExport) @@ -79,7 +92,7 @@ func (tga *talkgroupAPI) get(w http.ResponseWriter, r *http.Request) { case p.hasBoth(): res, err = tgs.TG(ctx, talkgroups.TG(*p.System, *p.ID)) case p.System != nil: - res, err = tgs.SystemTGs(ctx, int32(*p.System)) + res, err = tgs.SystemTGs(ctx, *p.System) default: // get all talkgroups res, err = tgs.TGs(ctx, nil) @@ -118,7 +131,7 @@ func (tga *talkgroupAPI) postPaginated(w http.ResponseWriter, r *http.Request) { }{} switch { case p.System != nil: - res.Talkgroups, err = tgs.SystemTGs(ctx, int32(*p.System), tgstore.WithPagination(input, DefaultPerPage, &res.Count)) + res.Talkgroups, err = tgs.SystemTGs(ctx, *p.System, tgstore.WithPagination(input, DefaultPerPage, &res.Count)) default: // get all talkgroups res.Talkgroups, err = tgs.TGs(ctx, nil, tgstore.WithPagination(input, DefaultPerPage, &res.Count)) @@ -143,7 +156,7 @@ func (tga *talkgroupAPI) put(w http.ResponseWriter, r *http.Request) { ctx := r.Context() tgs := tgstore.FromCtx(ctx) - input := database.UpdateTalkgroupParams{} + input := database.UpsertTalkgroupParams{} err = forms.Unmarshal(r, &input, forms.WithTag("json"), forms.WithAcceptBlank(), forms.WithOmitEmpty()) if err != nil { @@ -151,15 +164,83 @@ func (tga *talkgroupAPI) put(w http.ResponseWriter, r *http.Request) { return } + if !id.hasBoth() { + wErr(w, r, autoError(ErrMissingTGSys)) + return + } + + if input.TGID != 0 && input.TGID != int32(*id.ID) { + wErr(w, r, autoError(ErrTGIDMismatch)) + return + } + + if input.SystemID != 0 && input.SystemID != int32(*id.System) { + wErr(w, r, autoError(ErrSysMismatch)) + return + } + + input.SystemID = int32(*id.System) + input.TGID = int32(*id.ID) + input.Learned = nil // ignore for this call - record, err := tgs.UpdateTG(ctx, input) + record, err := tgs.UpsertTGs(ctx, *id.System, []database.UpsertTalkgroupParams{input}) if err != nil { wErr(w, r, autoError(err)) return } - respond(w, r, record) + respond(w, r, record[0]) +} + +func (tga *talkgroupAPI) deleteTalkgroup(w http.ResponseWriter, r *http.Request) { + var id tgParams + err := decodeParams(&id, r) + if err != nil { + wErr(w, r, badRequest(err)) + return + } + + if !id.hasBoth() { + wErr(w, r, badRequest(ErrMissingTGSys)) + return + } + + ctx := r.Context() + tgs := tgstore.FromCtx(ctx) + + err = tgs.DeleteTG(ctx, id.ToID()) + if err != nil { + wErr(w, r, autoError(err)) + return + } + + w.WriteHeader(http.StatusNoContent) +} + +func (tga *talkgroupAPI) deleteSystem(w http.ResponseWriter, r *http.Request) { + var id tgParams + err := decodeParams(&id, r) + if err != nil { + wErr(w, r, badRequest(err)) + return + } + + if id.System == nil { + wErr(w, r, badRequest(ErrNoSuchSystem)) + return + } + + ctx := r.Context() + tgs := tgstore.FromCtx(ctx) + + err = tgs.DeleteSystem(ctx, *id.System) + if err != nil { + wErr(w, r, autoError(err)) + return + } + + w.WriteHeader(http.StatusNoContent) } func (tga *talkgroupAPI) tgExport(w http.ResponseWriter, r *http.Request) { @@ -230,3 +311,36 @@ func (tga *talkgroupAPI) putTalkgroups(w http.ResponseWriter, r *http.Request) { respond(w, r, record) } + +func (tga *talkgroupAPI) putSystem(w http.ResponseWriter, r *http.Request) { + var id tgParams + err := decodeParams(&id, r) + if err != nil { + wErr(w, r, badRequest(err)) + return + } + + if id.System == nil { // don't think this would ever happen + wErr(w, r, badRequest(ErrBadSystem)) + return + } + + ctx := r.Context() + tgs := tgstore.FromCtx(ctx) + + var sysName string + + err = forms.Unmarshal(r, &sysName, forms.WithTag("json"), forms.WithAcceptBlank()) + if err != nil { + wErr(w, r, badRequest(err)) + return + } + + err = tgs.CreateSystem(ctx, *id.System, sysName) + if err != nil { + wErr(w, r, autoError(err)) + return + } + + w.WriteHeader(http.StatusNoContent) +} diff --git a/pkg/talkgroups/tgstore/store.go b/pkg/talkgroups/tgstore/store.go index 0cbb1bb..07c6339 100644 --- a/pkg/talkgroups/tgstore/store.go +++ b/pkg/talkgroups/tgstore/store.go @@ -24,6 +24,7 @@ var ( ErrNotFound = errors.New("talkgroup not found") ErrNoSuchSystem = errors.New("no such system") ErrInvalidOrderBy = errors.New("invalid pagination orderBy value") + ErrReference = errors.New("item is still referenced, cannot delete") ) type Store interface { @@ -33,6 +34,9 @@ type Store interface { // UpsertTGs upserts a slice of talkgroups. UpsertTGs(ctx context.Context, system int, input []database.UpsertTalkgroupParams) ([]*tgsp.Talkgroup, error) + // CreateSystem creates a new system with the specified name and ID. + CreateSystem(ctx context.Context, id int, name string) error + // TG retrieves a Talkgroup from the Store. TG(ctx context.Context, tg tgsp.ID) (*tgsp.Talkgroup, error) @@ -43,7 +47,13 @@ type Store interface { LearnTG(ctx context.Context, call *calls.Call) (*tgsp.Talkgroup, error) // SystemTGs retrieves all Talkgroups associated with a System. - SystemTGs(ctx context.Context, systemID int32, opts ...option) ([]*tgsp.Talkgroup, error) + SystemTGs(ctx context.Context, systemID int, opts ...option) ([]*tgsp.Talkgroup, error) + + // DeleteTG deletes a talkgroup record. + DeleteTG(ctx context.Context, id tgsp.ID) error + + // DeleteSystem deletes a system. The system must have no talkgroups or incidents. + DeleteSystem(ctx context.Context, id int) error // SystemName retrieves a system name from the store. It returns the record and whether one was found. SystemName(ctx context.Context, id int) (string, bool) @@ -139,6 +149,10 @@ func (t *cache) HUP(_ *config.Config) { func (t *cache) Invalidate() { t.Lock() defer t.Unlock() + t.invalidate() +} + +func (t *cache) invalidate() { clear(t.tgs) clear(t.systems) } @@ -146,20 +160,24 @@ func (t *cache) Invalidate() { type cache struct { sync.RWMutex tgs tgMap - systems map[int32]string + systems map[int]string } // NewCache returns a new cache Store. func NewCache() *cache { tgc := &cache{ tgs: make(tgMap), - systems: make(map[int32]string), + systems: make(map[int]string), } return tgc } func (t *cache) Hint(ctx context.Context, tgs []tgsp.ID) error { + if len(tgs) < 1 { + return nil + } + t.RLock() var toLoad database.TGTuples if len(t.tgs) > len(tgs)/2 { // TODO: instrument this @@ -206,7 +224,11 @@ func (t *cache) add(rec *tgsp.Talkgroup) { func (t *cache) addNoLock(rec *tgsp.Talkgroup) { tg := tgsp.TG(rec.System.ID, rec.Talkgroup.TGID) t.tgs[tg] = rec - t.systems[int32(rec.System.ID)] = rec.System.Name + t.systems[rec.System.ID] = rec.System.Name +} + +func (t *cache) addSysNoLock(id int, name string) { + t.systems[id] = name } type rowType interface { @@ -346,20 +368,20 @@ func (t *cache) Weight(ctx context.Context, id tgsp.ID, tm time.Time) float64 { return float64(m) } -func (t *cache) SystemTGs(ctx context.Context, systemID int32, opts ...option) ([]*tgsp.Talkgroup, error) { +func (t *cache) SystemTGs(ctx context.Context, systemID int, opts ...option) ([]*tgsp.Talkgroup, error) { db := database.FromCtx(ctx) opt := sOpt(opts) var err error if opt.pagination != nil { offset, perPage := opt.pagination.OffsetPerPage(opt.perPageDefault) - recs, err := db.GetTalkgroupsWithLearnedBySystemP(ctx, systemID, offset, perPage) + recs, err := db.GetTalkgroupsWithLearnedBySystemP(ctx, int32(systemID), offset, perPage) if err != nil { return nil, err } return addToRowList(t, recs), nil } - recs, err := db.GetTalkgroupsWithLearnedBySystem(ctx, systemID) + recs, err := db.GetTalkgroupsWithLearnedBySystem(ctx, int32(systemID)) if err != nil { return nil, err } @@ -391,7 +413,7 @@ func (t *cache) TG(ctx context.Context, tg tgsp.ID) (*tgsp.Talkgroup, error) { func (t *cache) SystemName(ctx context.Context, id int) (name string, has bool) { t.RLock() - n, has := t.systems[int32(id)] + n, has := t.systems[id] t.RUnlock() if !has { @@ -401,7 +423,7 @@ func (t *cache) SystemName(ctx context.Context, id int) (name string, has bool) } t.Lock() - t.systems[int32(id)] = sys + t.systems[id] = sys t.Unlock() return sys, true @@ -415,8 +437,30 @@ func (t *cache) UpdateTG(ctx context.Context, input database.UpdateTalkgroupPara if !has { return nil, ErrNoSuchSystem } + db := database.FromCtx(ctx) + var tg database.Talkgroup + err := db.InTx(ctx, func(db database.Store) error { + var oerr error + tg, oerr = db.UpdateTalkgroup(ctx, input) + if oerr != nil { + return oerr + } + versionBatch := db.StoreTGVersion(ctx, []database.StoreTGVersionParams{{ + Submitter: auth.UIDFrom(ctx), + TGID: *input.TGID, + }}) + defer versionBatch.Close() + + versionBatch.Exec(func(_ int, err error) { + if err != nil { + oerr = err + return + } + }) + + return oerr + }, pgx.TxOptions{}) - tg, err := database.FromCtx(ctx).UpdateTalkgroup(ctx, input) if err != nil { return nil, err } @@ -430,6 +474,49 @@ func (t *cache) UpdateTG(ctx context.Context, input database.UpdateTalkgroupPara return record, nil } +func (t *cache) DeleteSystem(ctx context.Context, id int) error { + t.Lock() + defer t.Unlock() + + t.invalidate() + + err := database.FromCtx(ctx).DeleteSystem(ctx, id) + switch { + case err == nil: + return nil + case database.IsSystemConstraintViolation(err): + return ErrReference + } + + return err +} + +func (t *cache) DeleteTG(ctx context.Context, id tgsp.ID) error { + t.Lock() + defer t.Unlock() + + err := database.FromCtx(ctx).InTx(ctx, func(db database.Store) error { + err := db.StoreDeletedTGVersion(ctx, common.PtrTo(int32(id.System)), common.PtrTo(int32(id.Talkgroup)), auth.UIDFrom(ctx)) + if err != nil { + return err + } + + return db.DeleteTalkgroup(ctx, int32(id.System), int32(id.Talkgroup)) + }, pgx.TxOptions{}) + + switch { + case err == nil: + case database.IsTGConstraintViolation(err): + return ErrReference + default: + return err + } + + delete(t.tgs, id) + + return nil +} + func (t *cache) LearnTG(ctx context.Context, c *calls.Call) (*tgsp.Talkgroup, error) { db := database.FromCtx(ctx) @@ -538,3 +625,12 @@ func (t *cache) UpsertTGs(ctx context.Context, system int, input []database.Upse return tgs, nil } + +func (t *cache) CreateSystem(ctx context.Context, id int, name string) error { + t.Lock() + defer t.Unlock() + + t.addSysNoLock(id, name) + + return database.FromCtx(ctx).CreateSystem(ctx, id, name) +} diff --git a/pkg/talkgroups/xport/export.go b/pkg/talkgroups/xport/export.go index 330e256..c5c4863 100644 --- a/pkg/talkgroups/xport/export.go +++ b/pkg/talkgroups/xport/export.go @@ -29,7 +29,7 @@ func (ej *ExportJob) Export(ctx context.Context, w io.Writer) error { var err error tgst := tgstore.FromCtx(ctx) if ej.TalkgroupFilter.IsEmpty() { - tgs, err = tgst.SystemTGs(ctx, int32(ej.SystemID)) + tgs, err = tgst.SystemTGs(ctx, ej.SystemID) if err != nil { return err } diff --git a/sql/postgres/migrations/001_initial.up.sql b/sql/postgres/migrations/001_initial.up.sql index c63ab78..0b58df5 100644 --- a/sql/postgres/migrations/001_initial.up.sql +++ b/sql/postgres/migrations/001_initial.up.sql @@ -23,6 +23,7 @@ CREATE TABLE IF NOT EXISTS systems( name TEXT NOT NULL ); +-- NB: if the column defaults are updated here, they must also be updated in the UpsertTalkgroup query CREATE TABLE IF NOT EXISTS talkgroups( id INTEGER PRIMARY KEY GENERATED ALWAYS AS IDENTITY, system_id INT4 REFERENCES systems(id) NOT NULL, @@ -50,8 +51,9 @@ CREATE TABLE IF NOT EXISTS talkgroup_versions( id INTEGER PRIMARY KEY GENERATED ALWAYS AS IDENTITY, time TIMESTAMPTZ NOT NULL, created_by INTEGER REFERENCES users(id), + deleted BOOLEAN, -- talkgroup snapshot - system_id INT4 REFERENCES systems(id), + system_id INT4, tgid INT4, name TEXT, alpha_tag TEXT, diff --git a/sql/postgres/queries/talkgroups.sql b/sql/postgres/queries/talkgroups.sql index d3310e0..572bb7b 100644 --- a/sql/postgres/queries/talkgroups.sql +++ b/sql/postgres/queries/talkgroups.sql @@ -103,11 +103,11 @@ INSERT INTO talkgroups AS tg ( sqlc.narg('tg_group'), sqlc.narg('frequency'), sqlc.narg('metadata'), - sqlc.narg('tags'), - sqlc.narg('alert'), + COALESCE(sqlc.narg('tags'), '{}'::text[])::text[], + COALESCE(sqlc.narg('alert'), TRUE), sqlc.narg('alert_config'), - sqlc.narg('weight'), - sqlc.narg('learned') + COALESCE(sqlc.narg('weight'), 1.0)::numeric, + COALESCE(sqlc.narg('learned'), FALSE)::boolean ) ON CONFLICT (system_id, tgid) DO UPDATE SET @@ -212,3 +212,21 @@ FROM talkgroup_versions tgv ON CONFLICT (system_id, tgid) DO UPDATE SET ignored = excluded.ignored WHERE tgv.id = ANY(@version_ids) RETURNING *; + +-- name: DeleteTalkgroup :exec +DELETE FROM talkgroups WHERE system_id = @system_id AND tgid = @tg_id; + +-- name: StoreDeletedTGVersion :exec +INSERT INTO talkgroup_versions( + system_id, + tgid, + time, + created_by, + deleted +) VALUES(@system_id, @tg_id, NOW(), @submitter, TRUE); + +-- name: CreateSystem :exec +INSERT INTO systems(id, name) VALUES(@id, @name); + +-- name: DeleteSystem :exec +DELETE FROM systems WHERE id = @id;