Delete talkgroup and system (#69)

Reviewed-on: #69
Co-authored-by: Daniel Ponte <amigan@gmail.com>
Co-committed-by: Daniel Ponte <amigan@gmail.com>
This commit is contained in:
Daniel Ponte 2024-12-10 19:10:25 -05:00 committed by amigan
parent 0608166d46
commit 1664c26e14
13 changed files with 526 additions and 31 deletions

View file

@ -12,6 +12,7 @@ import (
"dynatron.me/x/stillbox/internal/jsontypes"
"dynatron.me/x/stillbox/pkg/alerting/rules"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
)
var (
@ -106,11 +107,11 @@ INSERT INTO talkgroups AS tg (
$5,
$6,
$7,
$8,
$9,
COALESCE($8, '{}'::text[])::text[],
COALESCE($9, TRUE),
$10,
$11,
$12
COALESCE($11, 1.0)::numeric,
COALESCE($12, FALSE)::boolean
)
ON CONFLICT (system_id, tgid) DO UPDATE
SET
@ -142,9 +143,9 @@ type UpsertTalkgroupParams struct {
Frequency *int32 `json:"frequency"`
Metadata jsontypes.Metadata `json:"metadata"`
Tags []string `json:"tags"`
Alert *bool `json:"alert"`
Alert interface{} `json:"alert"`
AlertConfig rules.AlertRules `json:"alert_config"`
Weight *float32 `json:"weight"`
Weight pgtype.Numeric `json:"weight"`
Learned *bool `json:"learned"`
}

View file

@ -396,6 +396,54 @@ func (_c *Store_CreatePartition_Call) RunAndReturn(run func(context.Context, str
return _c
}
// CreateSystem provides a mock function with given fields: ctx, iD, name
func (_m *Store) CreateSystem(ctx context.Context, iD int, name string) error {
ret := _m.Called(ctx, iD, name)
if len(ret) == 0 {
panic("no return value specified for CreateSystem")
}
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, int, string) error); ok {
r0 = rf(ctx, iD, name)
} else {
r0 = ret.Error(0)
}
return r0
}
// Store_CreateSystem_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CreateSystem'
type Store_CreateSystem_Call struct {
*mock.Call
}
// CreateSystem is a helper method to define mock.On call
// - ctx context.Context
// - iD int
// - name string
func (_e *Store_Expecter) CreateSystem(ctx interface{}, iD interface{}, name interface{}) *Store_CreateSystem_Call {
return &Store_CreateSystem_Call{Call: _e.mock.On("CreateSystem", ctx, iD, name)}
}
func (_c *Store_CreateSystem_Call) Run(run func(ctx context.Context, iD int, name string)) *Store_CreateSystem_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(int), args[2].(string))
})
return _c
}
func (_c *Store_CreateSystem_Call) Return(_a0 error) *Store_CreateSystem_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *Store_CreateSystem_Call) RunAndReturn(run func(context.Context, int, string) error) *Store_CreateSystem_Call {
_c.Call.Return(run)
return _c
}
// CreateUser provides a mock function with given fields: ctx, arg
func (_m *Store) CreateUser(ctx context.Context, arg database.CreateUserParams) (database.User, error) {
ret := _m.Called(ctx, arg)
@ -594,6 +642,101 @@ func (_c *Store_DeleteAPIKey_Call) RunAndReturn(run func(context.Context, string
return _c
}
// DeleteSystem provides a mock function with given fields: ctx, id
func (_m *Store) DeleteSystem(ctx context.Context, id int) error {
ret := _m.Called(ctx, id)
if len(ret) == 0 {
panic("no return value specified for DeleteSystem")
}
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, int) error); ok {
r0 = rf(ctx, id)
} else {
r0 = ret.Error(0)
}
return r0
}
// Store_DeleteSystem_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DeleteSystem'
type Store_DeleteSystem_Call struct {
*mock.Call
}
// DeleteSystem is a helper method to define mock.On call
// - ctx context.Context
// - id int
func (_e *Store_Expecter) DeleteSystem(ctx interface{}, id interface{}) *Store_DeleteSystem_Call {
return &Store_DeleteSystem_Call{Call: _e.mock.On("DeleteSystem", ctx, id)}
}
func (_c *Store_DeleteSystem_Call) Run(run func(ctx context.Context, id int)) *Store_DeleteSystem_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(int))
})
return _c
}
func (_c *Store_DeleteSystem_Call) Return(_a0 error) *Store_DeleteSystem_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *Store_DeleteSystem_Call) RunAndReturn(run func(context.Context, int) error) *Store_DeleteSystem_Call {
_c.Call.Return(run)
return _c
}
// DeleteTalkgroup provides a mock function with given fields: ctx, systemID, tGID
func (_m *Store) DeleteTalkgroup(ctx context.Context, systemID int32, tGID int32) error {
ret := _m.Called(ctx, systemID, tGID)
if len(ret) == 0 {
panic("no return value specified for DeleteTalkgroup")
}
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, int32, int32) error); ok {
r0 = rf(ctx, systemID, tGID)
} else {
r0 = ret.Error(0)
}
return r0
}
// Store_DeleteTalkgroup_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DeleteTalkgroup'
type Store_DeleteTalkgroup_Call struct {
*mock.Call
}
// DeleteTalkgroup is a helper method to define mock.On call
// - ctx context.Context
// - systemID int32
// - tGID int32
func (_e *Store_Expecter) DeleteTalkgroup(ctx interface{}, systemID interface{}, tGID interface{}) *Store_DeleteTalkgroup_Call {
return &Store_DeleteTalkgroup_Call{Call: _e.mock.On("DeleteTalkgroup", ctx, systemID, tGID)}
}
func (_c *Store_DeleteTalkgroup_Call) Run(run func(ctx context.Context, systemID int32, tGID int32)) *Store_DeleteTalkgroup_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(int32), args[2].(int32))
})
return _c
}
func (_c *Store_DeleteTalkgroup_Call) Return(_a0 error) *Store_DeleteTalkgroup_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *Store_DeleteTalkgroup_Call) RunAndReturn(run func(context.Context, int32, int32) error) *Store_DeleteTalkgroup_Call {
_c.Call.Return(run)
return _c
}
// DeleteUser provides a mock function with given fields: ctx, username
func (_m *Store) DeleteUser(ctx context.Context, username string) error {
ret := _m.Called(ctx, username)
@ -2278,6 +2421,55 @@ func (_c *Store_SetTalkgroupTags_Call) RunAndReturn(run func(context.Context, []
return _c
}
// StoreDeletedTGVersion provides a mock function with given fields: ctx, systemID, tGID, submitter
func (_m *Store) StoreDeletedTGVersion(ctx context.Context, systemID *int32, tGID *int32, submitter *int32) error {
ret := _m.Called(ctx, systemID, tGID, submitter)
if len(ret) == 0 {
panic("no return value specified for StoreDeletedTGVersion")
}
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, *int32, *int32, *int32) error); ok {
r0 = rf(ctx, systemID, tGID, submitter)
} else {
r0 = ret.Error(0)
}
return r0
}
// Store_StoreDeletedTGVersion_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'StoreDeletedTGVersion'
type Store_StoreDeletedTGVersion_Call struct {
*mock.Call
}
// StoreDeletedTGVersion is a helper method to define mock.On call
// - ctx context.Context
// - systemID *int32
// - tGID *int32
// - submitter *int32
func (_e *Store_Expecter) StoreDeletedTGVersion(ctx interface{}, systemID interface{}, tGID interface{}, submitter interface{}) *Store_StoreDeletedTGVersion_Call {
return &Store_StoreDeletedTGVersion_Call{Call: _e.mock.On("StoreDeletedTGVersion", ctx, systemID, tGID, submitter)}
}
func (_c *Store_StoreDeletedTGVersion_Call) Run(run func(ctx context.Context, systemID *int32, tGID *int32, submitter *int32)) *Store_StoreDeletedTGVersion_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*int32), args[2].(*int32), args[3].(*int32))
})
return _c
}
func (_c *Store_StoreDeletedTGVersion_Call) Return(_a0 error) *Store_StoreDeletedTGVersion_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *Store_StoreDeletedTGVersion_Call) RunAndReturn(run func(context.Context, *int32, *int32, *int32) error) *Store_StoreDeletedTGVersion_Call {
_c.Call.Return(run)
return _c
}
// StoreTGVersion provides a mock function with given fields: ctx, arg
func (_m *Store) StoreTGVersion(ctx context.Context, arg []database.StoreTGVersionParams) *database.StoreTGVersionBatchResults {
ret := _m.Called(ctx, arg)

View file

@ -127,6 +127,7 @@ type TalkgroupVersion struct {
ID int `json:"id,omitempty"`
Time pgtype.Timestamptz `json:"time,omitempty"`
CreatedBy *int32 `json:"created_by,omitempty"`
Deleted *bool `json:"deleted,omitempty"`
SystemID *int32 `json:"system_id,omitempty"`
TGID *int32 `json:"tgid,omitempty"`
Name *string `json:"name,omitempty"`

View file

@ -22,6 +22,8 @@ import (
const (
CallsTable = "calls"
CheckInterval = time.Hour // every 1h
preProvisionDefault = 1
)
@ -132,7 +134,7 @@ func New(db database.Store, cfg config.Partition) (*partman, error) {
var _ PartitionManager = (*partman)(nil)
func (pm *partman) Go(ctx context.Context) {
tick := time.NewTicker(60 * time.Minute)
tick := time.NewTicker(CheckInterval)
select {
case now := <-tick.C:

View file

@ -17,8 +17,11 @@ type Querier interface {
AddLearnedTalkgroup(ctx context.Context, arg AddLearnedTalkgroupParams) (Talkgroup, error)
CleanupSweptCalls(ctx context.Context, rangeStart pgtype.Timestamptz, rangeEnd pgtype.Timestamptz) (int64, error)
CreateAPIKey(ctx context.Context, owner int, expires pgtype.Timestamp, disabled *bool) (ApiKey, error)
CreateSystem(ctx context.Context, iD int, name string) error
CreateUser(ctx context.Context, arg CreateUserParams) (User, error)
DeleteAPIKey(ctx context.Context, apiKey string) error
DeleteSystem(ctx context.Context, id int) error
DeleteTalkgroup(ctx context.Context, systemID int32, tGID int32) error
DeleteUser(ctx context.Context, username string) error
GetAPIKey(ctx context.Context, apiKey string) (ApiKey, error)
GetCallAudioByID(ctx context.Context, id uuid.UUID) (GetCallAudioByIDRow, error)
@ -43,6 +46,7 @@ type Querier interface {
RestoreTalkgroupVersion(ctx context.Context, versionIds int) (Talkgroup, error)
SetCallTranscript(ctx context.Context, iD uuid.UUID, transcript *string) error
SetTalkgroupTags(ctx context.Context, tags []string, systemID int32, tGID int32) error
StoreDeletedTGVersion(ctx context.Context, systemID *int32, tGID *int32, submitter *int32) error
StoreTGVersion(ctx context.Context, arg []StoreTGVersionParams) *StoreTGVersionBatchResults
SweepCalls(ctx context.Context, rangeStart pgtype.Timestamptz, rangeEnd pgtype.Timestamptz) (int64, error)
UpdatePassword(ctx context.Context, username string, password string) error

View file

@ -15,11 +15,22 @@ type talkgroupQuerier interface {
type TGTuples [2][]uint32
const TGConstraintName = "calls_system_talkgroup_fkey"
const (
TGConstraintName = "calls_system_talkgroup_fkey"
SysConstraintName = "talkgroups_system_id_fkey"
)
func IsTGConstraintViolation(e error) bool {
return IsConstraintViolation(e, TGConstraintName)
}
func IsSystemConstraintViolation(e error) bool {
return IsConstraintViolation(e, SysConstraintName)
}
func IsConstraintViolation(e error, constraintName string) bool {
var err *pgconn.PgError
if errors.As(e, &err) && err.Code == "23503" && err.ConstraintName == TGConstraintName {
if errors.As(e, &err) && err.Code == "23503" && err.ConstraintName == constraintName {
return true
}

View file

@ -66,6 +66,33 @@ func (q *Queries) AddLearnedTalkgroup(ctx context.Context, arg AddLearnedTalkgro
return i, err
}
const createSystem = `-- name: CreateSystem :exec
INSERT INTO systems(id, name) VALUES($1, $2)
`
func (q *Queries) CreateSystem(ctx context.Context, iD int, name string) error {
_, err := q.db.Exec(ctx, createSystem, iD, name)
return err
}
const deleteSystem = `-- name: DeleteSystem :exec
DELETE FROM systems WHERE id = $1
`
func (q *Queries) DeleteSystem(ctx context.Context, id int) error {
_, err := q.db.Exec(ctx, deleteSystem, id)
return err
}
const deleteTalkgroup = `-- name: DeleteTalkgroup :exec
DELETE FROM talkgroups WHERE system_id = $1 AND tgid = $2
`
func (q *Queries) DeleteTalkgroup(ctx context.Context, systemID int32, tGID int32) error {
_, err := q.db.Exec(ctx, deleteTalkgroup, systemID, tGID)
return err
}
const getSystemName = `-- name: GetSystemName :one
SELECT name FROM systems WHERE id = $1
`
@ -584,6 +611,21 @@ func (q *Queries) SetTalkgroupTags(ctx context.Context, tags []string, systemID
return err
}
const storeDeletedTGVersion = `-- name: StoreDeletedTGVersion :exec
INSERT INTO talkgroup_versions(
system_id,
tgid,
time,
created_by,
deleted
) VALUES($1, $2, NOW(), $3, TRUE)
`
func (q *Queries) StoreDeletedTGVersion(ctx context.Context, systemID *int32, tGID *int32, submitter *int32) error {
_, err := q.db.Exec(ctx, storeDeletedTGVersion, systemID, tGID, submitter)
return err
}
const updateTalkgroup = `-- name: UpdateTalkgroup :one
UPDATE talkgroups
SET

View file

@ -68,6 +68,14 @@ func badRequestErrText(err error) render.Renderer {
}
}
func constraintErrText(err error) render.Renderer {
return &errResponse{
Err: err,
Code: http.StatusConflict,
Error: "Constraint violation: " + err.Error(),
}
}
func recordNotFound(err error) render.Renderer {
return &errResponse{
Err: err,
@ -99,6 +107,10 @@ var statusMapping = map[error]errResponder{
tgstore.ErrNotFound: notFoundErrText,
tgstore.ErrInvalidOrderBy: badRequestErrText,
pgx.ErrNoRows: recordNotFound,
ErrMissingTGSys: badRequestErrText,
ErrTGIDMismatch: badRequestErrText,
ErrSysMismatch: badRequestErrText,
tgstore.ErrReference: constraintErrText,
}
func autoError(err error) render.Renderer {

View file

@ -1,6 +1,7 @@
package rest
import (
"errors"
"fmt"
"net/http"
@ -13,6 +14,14 @@ import (
"github.com/go-chi/chi/v5"
)
var (
ErrMissingTGSys = errors.New("missing talkgroup ID and system ID")
ErrTGIDMismatch = errors.New("url talkgroup ID and document talkgroup ID mismatch")
ErrSysMismatch = errors.New("url system ID and document system ID mismatch")
ErrNoSuchSystem = tgstore.ErrNoSuchSystem
ErrBadSystem = errors.New("invalid system")
)
const DefaultPerPage = 20
type talkgroupAPI struct {
@ -26,11 +35,15 @@ func (tga *talkgroupAPI) Subrouter() http.Handler {
r.Get("/", tga.get)
r.Put(`/{system:\d+}/{id:\d+}`, tga.put)
r.Put(`/{system:\d+}`, tga.putTalkgroups)
r.Put(`/{system:\d+}/`, tga.putTalkgroups)
r.Put(`/{system:\d+}`, tga.putSystem)
r.Post(`/{system:\d+}/`, tga.postPaginated)
r.Post(`/`, tga.postPaginated)
r.Delete(`/{system:\d+}`, tga.deleteSystem)
r.Delete(`/{system:\d+}/{id:\d+}`, tga.deleteTalkgroup)
r.Post("/import", tga.tgImport)
r.Post("/export", tga.tgExport)
@ -79,7 +92,7 @@ func (tga *talkgroupAPI) get(w http.ResponseWriter, r *http.Request) {
case p.hasBoth():
res, err = tgs.TG(ctx, talkgroups.TG(*p.System, *p.ID))
case p.System != nil:
res, err = tgs.SystemTGs(ctx, int32(*p.System))
res, err = tgs.SystemTGs(ctx, *p.System)
default:
// get all talkgroups
res, err = tgs.TGs(ctx, nil)
@ -118,7 +131,7 @@ func (tga *talkgroupAPI) postPaginated(w http.ResponseWriter, r *http.Request) {
}{}
switch {
case p.System != nil:
res.Talkgroups, err = tgs.SystemTGs(ctx, int32(*p.System), tgstore.WithPagination(input, DefaultPerPage, &res.Count))
res.Talkgroups, err = tgs.SystemTGs(ctx, *p.System, tgstore.WithPagination(input, DefaultPerPage, &res.Count))
default:
// get all talkgroups
res.Talkgroups, err = tgs.TGs(ctx, nil, tgstore.WithPagination(input, DefaultPerPage, &res.Count))
@ -143,7 +156,7 @@ func (tga *talkgroupAPI) put(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
tgs := tgstore.FromCtx(ctx)
input := database.UpdateTalkgroupParams{}
input := database.UpsertTalkgroupParams{}
err = forms.Unmarshal(r, &input, forms.WithTag("json"), forms.WithAcceptBlank(), forms.WithOmitEmpty())
if err != nil {
@ -151,15 +164,83 @@ func (tga *talkgroupAPI) put(w http.ResponseWriter, r *http.Request) {
return
}
if !id.hasBoth() {
wErr(w, r, autoError(ErrMissingTGSys))
return
}
if input.TGID != 0 && input.TGID != int32(*id.ID) {
wErr(w, r, autoError(ErrTGIDMismatch))
return
}
if input.SystemID != 0 && input.SystemID != int32(*id.System) {
wErr(w, r, autoError(ErrSysMismatch))
return
}
input.SystemID = int32(*id.System)
input.TGID = int32(*id.ID)
input.Learned = nil // ignore for this call
record, err := tgs.UpdateTG(ctx, input)
record, err := tgs.UpsertTGs(ctx, *id.System, []database.UpsertTalkgroupParams{input})
if err != nil {
wErr(w, r, autoError(err))
return
}
respond(w, r, record)
respond(w, r, record[0])
}
func (tga *talkgroupAPI) deleteTalkgroup(w http.ResponseWriter, r *http.Request) {
var id tgParams
err := decodeParams(&id, r)
if err != nil {
wErr(w, r, badRequest(err))
return
}
if !id.hasBoth() {
wErr(w, r, badRequest(ErrMissingTGSys))
return
}
ctx := r.Context()
tgs := tgstore.FromCtx(ctx)
err = tgs.DeleteTG(ctx, id.ToID())
if err != nil {
wErr(w, r, autoError(err))
return
}
w.WriteHeader(http.StatusNoContent)
}
func (tga *talkgroupAPI) deleteSystem(w http.ResponseWriter, r *http.Request) {
var id tgParams
err := decodeParams(&id, r)
if err != nil {
wErr(w, r, badRequest(err))
return
}
if id.System == nil {
wErr(w, r, badRequest(ErrNoSuchSystem))
return
}
ctx := r.Context()
tgs := tgstore.FromCtx(ctx)
err = tgs.DeleteSystem(ctx, *id.System)
if err != nil {
wErr(w, r, autoError(err))
return
}
w.WriteHeader(http.StatusNoContent)
}
func (tga *talkgroupAPI) tgExport(w http.ResponseWriter, r *http.Request) {
@ -230,3 +311,36 @@ func (tga *talkgroupAPI) putTalkgroups(w http.ResponseWriter, r *http.Request) {
respond(w, r, record)
}
func (tga *talkgroupAPI) putSystem(w http.ResponseWriter, r *http.Request) {
var id tgParams
err := decodeParams(&id, r)
if err != nil {
wErr(w, r, badRequest(err))
return
}
if id.System == nil { // don't think this would ever happen
wErr(w, r, badRequest(ErrBadSystem))
return
}
ctx := r.Context()
tgs := tgstore.FromCtx(ctx)
var sysName string
err = forms.Unmarshal(r, &sysName, forms.WithTag("json"), forms.WithAcceptBlank())
if err != nil {
wErr(w, r, badRequest(err))
return
}
err = tgs.CreateSystem(ctx, *id.System, sysName)
if err != nil {
wErr(w, r, autoError(err))
return
}
w.WriteHeader(http.StatusNoContent)
}

View file

@ -24,6 +24,7 @@ var (
ErrNotFound = errors.New("talkgroup not found")
ErrNoSuchSystem = errors.New("no such system")
ErrInvalidOrderBy = errors.New("invalid pagination orderBy value")
ErrReference = errors.New("item is still referenced, cannot delete")
)
type Store interface {
@ -33,6 +34,9 @@ type Store interface {
// UpsertTGs upserts a slice of talkgroups.
UpsertTGs(ctx context.Context, system int, input []database.UpsertTalkgroupParams) ([]*tgsp.Talkgroup, error)
// CreateSystem creates a new system with the specified name and ID.
CreateSystem(ctx context.Context, id int, name string) error
// TG retrieves a Talkgroup from the Store.
TG(ctx context.Context, tg tgsp.ID) (*tgsp.Talkgroup, error)
@ -43,7 +47,13 @@ type Store interface {
LearnTG(ctx context.Context, call *calls.Call) (*tgsp.Talkgroup, error)
// SystemTGs retrieves all Talkgroups associated with a System.
SystemTGs(ctx context.Context, systemID int32, opts ...option) ([]*tgsp.Talkgroup, error)
SystemTGs(ctx context.Context, systemID int, opts ...option) ([]*tgsp.Talkgroup, error)
// DeleteTG deletes a talkgroup record.
DeleteTG(ctx context.Context, id tgsp.ID) error
// DeleteSystem deletes a system. The system must have no talkgroups or incidents.
DeleteSystem(ctx context.Context, id int) error
// SystemName retrieves a system name from the store. It returns the record and whether one was found.
SystemName(ctx context.Context, id int) (string, bool)
@ -139,6 +149,10 @@ func (t *cache) HUP(_ *config.Config) {
func (t *cache) Invalidate() {
t.Lock()
defer t.Unlock()
t.invalidate()
}
func (t *cache) invalidate() {
clear(t.tgs)
clear(t.systems)
}
@ -146,20 +160,24 @@ func (t *cache) Invalidate() {
type cache struct {
sync.RWMutex
tgs tgMap
systems map[int32]string
systems map[int]string
}
// NewCache returns a new cache Store.
func NewCache() *cache {
tgc := &cache{
tgs: make(tgMap),
systems: make(map[int32]string),
systems: make(map[int]string),
}
return tgc
}
func (t *cache) Hint(ctx context.Context, tgs []tgsp.ID) error {
if len(tgs) < 1 {
return nil
}
t.RLock()
var toLoad database.TGTuples
if len(t.tgs) > len(tgs)/2 { // TODO: instrument this
@ -206,7 +224,11 @@ func (t *cache) add(rec *tgsp.Talkgroup) {
func (t *cache) addNoLock(rec *tgsp.Talkgroup) {
tg := tgsp.TG(rec.System.ID, rec.Talkgroup.TGID)
t.tgs[tg] = rec
t.systems[int32(rec.System.ID)] = rec.System.Name
t.systems[rec.System.ID] = rec.System.Name
}
func (t *cache) addSysNoLock(id int, name string) {
t.systems[id] = name
}
type rowType interface {
@ -346,20 +368,20 @@ func (t *cache) Weight(ctx context.Context, id tgsp.ID, tm time.Time) float64 {
return float64(m)
}
func (t *cache) SystemTGs(ctx context.Context, systemID int32, opts ...option) ([]*tgsp.Talkgroup, error) {
func (t *cache) SystemTGs(ctx context.Context, systemID int, opts ...option) ([]*tgsp.Talkgroup, error) {
db := database.FromCtx(ctx)
opt := sOpt(opts)
var err error
if opt.pagination != nil {
offset, perPage := opt.pagination.OffsetPerPage(opt.perPageDefault)
recs, err := db.GetTalkgroupsWithLearnedBySystemP(ctx, systemID, offset, perPage)
recs, err := db.GetTalkgroupsWithLearnedBySystemP(ctx, int32(systemID), offset, perPage)
if err != nil {
return nil, err
}
return addToRowList(t, recs), nil
}
recs, err := db.GetTalkgroupsWithLearnedBySystem(ctx, systemID)
recs, err := db.GetTalkgroupsWithLearnedBySystem(ctx, int32(systemID))
if err != nil {
return nil, err
}
@ -391,7 +413,7 @@ func (t *cache) TG(ctx context.Context, tg tgsp.ID) (*tgsp.Talkgroup, error) {
func (t *cache) SystemName(ctx context.Context, id int) (name string, has bool) {
t.RLock()
n, has := t.systems[int32(id)]
n, has := t.systems[id]
t.RUnlock()
if !has {
@ -401,7 +423,7 @@ func (t *cache) SystemName(ctx context.Context, id int) (name string, has bool)
}
t.Lock()
t.systems[int32(id)] = sys
t.systems[id] = sys
t.Unlock()
return sys, true
@ -415,8 +437,30 @@ func (t *cache) UpdateTG(ctx context.Context, input database.UpdateTalkgroupPara
if !has {
return nil, ErrNoSuchSystem
}
db := database.FromCtx(ctx)
var tg database.Talkgroup
err := db.InTx(ctx, func(db database.Store) error {
var oerr error
tg, oerr = db.UpdateTalkgroup(ctx, input)
if oerr != nil {
return oerr
}
versionBatch := db.StoreTGVersion(ctx, []database.StoreTGVersionParams{{
Submitter: auth.UIDFrom(ctx),
TGID: *input.TGID,
}})
defer versionBatch.Close()
versionBatch.Exec(func(_ int, err error) {
if err != nil {
oerr = err
return
}
})
return oerr
}, pgx.TxOptions{})
tg, err := database.FromCtx(ctx).UpdateTalkgroup(ctx, input)
if err != nil {
return nil, err
}
@ -430,6 +474,49 @@ func (t *cache) UpdateTG(ctx context.Context, input database.UpdateTalkgroupPara
return record, nil
}
func (t *cache) DeleteSystem(ctx context.Context, id int) error {
t.Lock()
defer t.Unlock()
t.invalidate()
err := database.FromCtx(ctx).DeleteSystem(ctx, id)
switch {
case err == nil:
return nil
case database.IsSystemConstraintViolation(err):
return ErrReference
}
return err
}
func (t *cache) DeleteTG(ctx context.Context, id tgsp.ID) error {
t.Lock()
defer t.Unlock()
err := database.FromCtx(ctx).InTx(ctx, func(db database.Store) error {
err := db.StoreDeletedTGVersion(ctx, common.PtrTo(int32(id.System)), common.PtrTo(int32(id.Talkgroup)), auth.UIDFrom(ctx))
if err != nil {
return err
}
return db.DeleteTalkgroup(ctx, int32(id.System), int32(id.Talkgroup))
}, pgx.TxOptions{})
switch {
case err == nil:
case database.IsTGConstraintViolation(err):
return ErrReference
default:
return err
}
delete(t.tgs, id)
return nil
}
func (t *cache) LearnTG(ctx context.Context, c *calls.Call) (*tgsp.Talkgroup, error) {
db := database.FromCtx(ctx)
@ -538,3 +625,12 @@ func (t *cache) UpsertTGs(ctx context.Context, system int, input []database.Upse
return tgs, nil
}
func (t *cache) CreateSystem(ctx context.Context, id int, name string) error {
t.Lock()
defer t.Unlock()
t.addSysNoLock(id, name)
return database.FromCtx(ctx).CreateSystem(ctx, id, name)
}

View file

@ -29,7 +29,7 @@ func (ej *ExportJob) Export(ctx context.Context, w io.Writer) error {
var err error
tgst := tgstore.FromCtx(ctx)
if ej.TalkgroupFilter.IsEmpty() {
tgs, err = tgst.SystemTGs(ctx, int32(ej.SystemID))
tgs, err = tgst.SystemTGs(ctx, ej.SystemID)
if err != nil {
return err
}

View file

@ -23,6 +23,7 @@ CREATE TABLE IF NOT EXISTS systems(
name TEXT NOT NULL
);
-- NB: if the column defaults are updated here, they must also be updated in the UpsertTalkgroup query
CREATE TABLE IF NOT EXISTS talkgroups(
id INTEGER PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
system_id INT4 REFERENCES systems(id) NOT NULL,
@ -50,8 +51,9 @@ CREATE TABLE IF NOT EXISTS talkgroup_versions(
id INTEGER PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
time TIMESTAMPTZ NOT NULL,
created_by INTEGER REFERENCES users(id),
deleted BOOLEAN,
-- talkgroup snapshot
system_id INT4 REFERENCES systems(id),
system_id INT4,
tgid INT4,
name TEXT,
alpha_tag TEXT,

View file

@ -103,11 +103,11 @@ INSERT INTO talkgroups AS tg (
sqlc.narg('tg_group'),
sqlc.narg('frequency'),
sqlc.narg('metadata'),
sqlc.narg('tags'),
sqlc.narg('alert'),
COALESCE(sqlc.narg('tags'), '{}'::text[])::text[],
COALESCE(sqlc.narg('alert'), TRUE),
sqlc.narg('alert_config'),
sqlc.narg('weight'),
sqlc.narg('learned')
COALESCE(sqlc.narg('weight'), 1.0)::numeric,
COALESCE(sqlc.narg('learned'), FALSE)::boolean
)
ON CONFLICT (system_id, tgid) DO UPDATE
SET
@ -212,3 +212,21 @@ FROM talkgroup_versions tgv ON CONFLICT (system_id, tgid) DO UPDATE SET
ignored = excluded.ignored
WHERE tgv.id = ANY(@version_ids)
RETURNING *;
-- name: DeleteTalkgroup :exec
DELETE FROM talkgroups WHERE system_id = @system_id AND tgid = @tg_id;
-- name: StoreDeletedTGVersion :exec
INSERT INTO talkgroup_versions(
system_id,
tgid,
time,
created_by,
deleted
) VALUES(@system_id, @tg_id, NOW(), @submitter, TRUE);
-- name: CreateSystem :exec
INSERT INTO systems(id, name) VALUES(@id, @name);
-- name: DeleteSystem :exec
DELETE FROM systems WHERE id = @id;