Call store and list calls endpoint (#78)

Reviewed-on: #78
Co-authored-by: Daniel Ponte <amigan@gmail.com>
Co-committed-by: Daniel Ponte <amigan@gmail.com>
This commit is contained in:
Daniel Ponte 2024-12-19 16:14:41 -05:00 committed by amigan
parent b4cf5550d7
commit 77a08679d4
12 changed files with 486 additions and 28 deletions

View file

@ -13,3 +13,32 @@ func (p Pagination) OffsetPerPage(perPageDefault int) (offset int32, perPage int
return
}
type SortDirection string
const (
DirAsc SortDirection = "asc"
DirDesc SortDirection = "desc"
)
func (t *SortDirection) DirString(def SortDirection) string {
if t == nil {
return string(def)
}
return string(*t)
}
func (t *SortDirection) IsValid() bool {
if t == nil {
return true
}
switch *t {
case DirAsc, DirDesc:
return true
}
return false
}

View file

@ -6,6 +6,7 @@ import (
"time"
"github.com/araddon/dateparse"
"github.com/jackc/pgx/v5/pgtype"
"gopkg.in/yaml.v3"
)
@ -27,6 +28,17 @@ func (t *Time) UnmarshalYAML(n *yaml.Node) error {
return nil
}
func (t *Time) PGTypeTSTZ() pgtype.Timestamptz {
if t == nil {
return pgtype.Timestamptz{Valid: false}
}
return pgtype.Timestamptz{
Time: time.Time(*t),
Valid: true,
}
}
func (t *Time) UnmarshalJSON(b []byte) error {
s := strings.Trim(string(b), `"`)
tm, err := dateparse.ParseAny(s)

View file

@ -5,6 +5,7 @@ import (
"time"
"dynatron.me/x/stillbox/internal/audio"
"dynatron.me/x/stillbox/internal/jsontypes"
"dynatron.me/x/stillbox/pkg/auth"
"dynatron.me/x/stillbox/pkg/pb"
"dynatron.me/x/stillbox/pkg/talkgroups"
@ -32,6 +33,14 @@ func (d CallDuration) Seconds() int32 {
return int32(time.Duration(d).Seconds())
}
// CallAudio is a skinny Call used for audio API calls.
type CallAudio struct {
CallDate jsontypes.Time `json:"callDate"`
AudioName *string `json:"audioName"`
AudioType *string `json:"audioType"`
AudioBlob []byte `json:"audioBlob"`
}
type Call struct {
ID uuid.UUID `form:"-"`
Audio []byte `form:"audio" filenameField:"AudioName"`

View file

@ -0,0 +1,109 @@
package callstore
import (
"context"
"dynatron.me/x/stillbox/internal/common"
"dynatron.me/x/stillbox/internal/jsontypes"
"dynatron.me/x/stillbox/pkg/calls"
"dynatron.me/x/stillbox/pkg/database"
"github.com/google/uuid"
"github.com/jackc/pgx/v5"
)
type Store interface {
// CallAudio returns a CallAudio struct
CallAudio(ctx context.Context, id uuid.UUID) (*calls.CallAudio, error)
// Calls gets paginated Calls.
Calls(ctx context.Context, p CallsParams) (calls []database.ListCallsPRow, totalCount int, err error)
}
type store struct {
}
func New() *store {
return new(store)
}
type storeCtxKey string
const StoreCtxKey storeCtxKey = "store"
func CtxWithStore(ctx context.Context, s Store) context.Context {
return context.WithValue(ctx, StoreCtxKey, s)
}
func FromCtx(ctx context.Context) Store {
s, ok := ctx.Value(StoreCtxKey).(Store)
if !ok {
return New()
}
return s
}
func (s *store) CallAudio(ctx context.Context, id uuid.UUID) (*calls.CallAudio, error) {
db := database.FromCtx(ctx)
dbCall, err := db.GetCallAudioByID(ctx, id)
if err != nil {
return nil, err
}
return &calls.CallAudio{
CallDate: jsontypes.Time(dbCall.CallDate.Time),
AudioName: dbCall.AudioName,
AudioType: dbCall.AudioType,
AudioBlob: dbCall.AudioBlob,
}, nil
}
type CallsParams struct {
common.Pagination
Direction *common.SortDirection `json:"dir"`
Start *jsontypes.Time `json:"start"`
End *jsontypes.Time `json:"end"`
TagsAny []string `json:"tagsAny"`
TagsNot []string `json:"tagsNot"`
}
func (s *store) Calls(ctx context.Context, p CallsParams) (rows []database.ListCallsPRow, totalCount int, err error) {
db := database.FromCtx(ctx)
offset, perPage := p.Pagination.OffsetPerPage(100)
par := database.ListCallsPParams{
Start: p.Start.PGTypeTSTZ(),
End: p.End.PGTypeTSTZ(),
TagsAny: p.TagsAny,
TagsNot: p.TagsNot,
Offset: offset,
PerPage: perPage,
Direction: p.Direction.DirString(common.DirAsc),
}
var count int64
txErr := db.InTx(ctx, func(db database.Store) error {
var err error
count, err = db.ListCallsCount(ctx, database.ListCallsCountParams{
Start: par.Start,
End: par.End,
TagsAny: par.TagsAny,
TagsNot: par.TagsNot,
})
if err != nil {
return err
}
rows, err = db.ListCallsP(ctx, par)
return err
}, pgx.TxOptions{})
if txErr != nil {
return nil, 0, txErr
}
return rows, int(count), err
}

View file

@ -147,6 +147,7 @@ WITH to_sweep AS (
WHERE call_id IN (SELECT id FROM to_sweep)
`
// This is used to sweep calls that are part of an incident prior to pruning a partition.
func (q *Queries) CleanupSweptCalls(ctx context.Context, rangeStart pgtype.Timestamptz, rangeEnd pgtype.Timestamptz) (int64, error) {
result, err := q.db.Exec(ctx, cleanupSweptCalls, rangeStart, rangeEnd)
if err != nil {
@ -189,6 +190,125 @@ func (q *Queries) GetDatabaseSize(ctx context.Context) (string, error) {
return pg_size_pretty, err
}
const listCallsCount = `-- name: ListCallsCount :one
SELECT
COUNT(*)
FROM calls c
JOIN talkgroups tgs ON c.talkgroup = tgs.tgid AND c.system = tgs.system_id
WHERE
CASE WHEN $1::TIMESTAMPTZ IS NOT NULL THEN
c.call_date >= $1 ELSE TRUE END AND
CASE WHEN $2::TIMESTAMPTZ IS NOT NULL THEN
c.call_date <= $2 ELSE TRUE END AND
CASE WHEN $3::TEXT[] IS NOT NULL THEN
tgs.tags @> ARRAY[$3] ELSE TRUE END AND
CASE WHEN $4::TEXT[] IS NOT NULL THEN
(NOT (tgs.tags @> ARRAY[$4])) ELSE TRUE END
`
type ListCallsCountParams struct {
Start pgtype.Timestamptz `json:"start"`
End pgtype.Timestamptz `json:"end"`
TagsAny []string `json:"tags_any"`
TagsNot []string `json:"tags_not"`
}
func (q *Queries) ListCallsCount(ctx context.Context, arg ListCallsCountParams) (int64, error) {
row := q.db.QueryRow(ctx, listCallsCount,
arg.Start,
arg.End,
arg.TagsAny,
arg.TagsNot,
)
var count int64
err := row.Scan(&count)
return count, err
}
const listCallsP = `-- name: ListCallsP :many
SELECT
c.id,
c.call_date,
c.duration,
tgs.system_id,
tgs.tgid,
sys.name system_name,
tgs.name tg_name
FROM calls c
JOIN talkgroups tgs ON c.talkgroup = tgs.tgid AND c.system = tgs.system_id
JOIN systems sys ON sys.id = tgs.system_id
WHERE
CASE WHEN $1::TIMESTAMPTZ IS NOT NULL THEN
c.call_date >= $1 ELSE TRUE END AND
CASE WHEN $2::TIMESTAMPTZ IS NOT NULL THEN
c.call_date <= $2 ELSE TRUE END AND
CASE WHEN $3::TEXT[] IS NOT NULL THEN
tgs.tags @> ARRAY[$3] ELSE TRUE END AND
CASE WHEN $4::TEXT[] IS NOT NULL THEN
(NOT (tgs.tags @> ARRAY[$4])) ELSE TRUE END
ORDER BY
CASE WHEN $5::TEXT = 'asc' THEN c.call_date END ASC,
CASE WHEN $5 = 'desc' THEN c.call_date END DESC
OFFSET $6 ROWS
FETCH NEXT $7 ROWS ONLY
`
type ListCallsPParams struct {
Start pgtype.Timestamptz `json:"start"`
End pgtype.Timestamptz `json:"end"`
TagsAny []string `json:"tags_any"`
TagsNot []string `json:"tags_not"`
Direction string `json:"direction"`
Offset int32 `json:"offset"`
PerPage int32 `json:"per_page"`
}
type ListCallsPRow struct {
ID uuid.UUID `json:"id"`
CallDate pgtype.Timestamptz `json:"call_date"`
Duration *int32 `json:"duration"`
SystemID int32 `json:"system_id"`
TGID int32 `json:"tgid"`
SystemName string `json:"system_name"`
TGName *string `json:"tg_name"`
}
func (q *Queries) ListCallsP(ctx context.Context, arg ListCallsPParams) ([]ListCallsPRow, error) {
rows, err := q.db.Query(ctx, listCallsP,
arg.Start,
arg.End,
arg.TagsAny,
arg.TagsNot,
arg.Direction,
arg.Offset,
arg.PerPage,
)
if err != nil {
return nil, err
}
defer rows.Close()
var items []ListCallsPRow
for rows.Next() {
var i ListCallsPRow
if err := rows.Scan(
&i.ID,
&i.CallDate,
&i.Duration,
&i.SystemID,
&i.TGID,
&i.SystemName,
&i.TGName,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const setCallTranscript = `-- name: SetCallTranscript :exec
UPDATE calls SET transcript = $2 WHERE id = $1
`

View file

@ -2384,6 +2384,122 @@ func (_c *Store_InTx_Call) RunAndReturn(run func(context.Context, func(database.
return _c
}
// ListCallsCount provides a mock function with given fields: ctx, arg
func (_m *Store) ListCallsCount(ctx context.Context, arg database.ListCallsCountParams) (int64, error) {
ret := _m.Called(ctx, arg)
if len(ret) == 0 {
panic("no return value specified for ListCallsCount")
}
var r0 int64
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, database.ListCallsCountParams) (int64, error)); ok {
return rf(ctx, arg)
}
if rf, ok := ret.Get(0).(func(context.Context, database.ListCallsCountParams) int64); ok {
r0 = rf(ctx, arg)
} else {
r0 = ret.Get(0).(int64)
}
if rf, ok := ret.Get(1).(func(context.Context, database.ListCallsCountParams) error); ok {
r1 = rf(ctx, arg)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// Store_ListCallsCount_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ListCallsCount'
type Store_ListCallsCount_Call struct {
*mock.Call
}
// ListCallsCount is a helper method to define mock.On call
// - ctx context.Context
// - arg database.ListCallsCountParams
func (_e *Store_Expecter) ListCallsCount(ctx interface{}, arg interface{}) *Store_ListCallsCount_Call {
return &Store_ListCallsCount_Call{Call: _e.mock.On("ListCallsCount", ctx, arg)}
}
func (_c *Store_ListCallsCount_Call) Run(run func(ctx context.Context, arg database.ListCallsCountParams)) *Store_ListCallsCount_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(database.ListCallsCountParams))
})
return _c
}
func (_c *Store_ListCallsCount_Call) Return(_a0 int64, _a1 error) *Store_ListCallsCount_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *Store_ListCallsCount_Call) RunAndReturn(run func(context.Context, database.ListCallsCountParams) (int64, error)) *Store_ListCallsCount_Call {
_c.Call.Return(run)
return _c
}
// ListCallsP provides a mock function with given fields: ctx, arg
func (_m *Store) ListCallsP(ctx context.Context, arg database.ListCallsPParams) ([]database.ListCallsPRow, error) {
ret := _m.Called(ctx, arg)
if len(ret) == 0 {
panic("no return value specified for ListCallsP")
}
var r0 []database.ListCallsPRow
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, database.ListCallsPParams) ([]database.ListCallsPRow, error)); ok {
return rf(ctx, arg)
}
if rf, ok := ret.Get(0).(func(context.Context, database.ListCallsPParams) []database.ListCallsPRow); ok {
r0 = rf(ctx, arg)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]database.ListCallsPRow)
}
}
if rf, ok := ret.Get(1).(func(context.Context, database.ListCallsPParams) error); ok {
r1 = rf(ctx, arg)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// Store_ListCallsP_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ListCallsP'
type Store_ListCallsP_Call struct {
*mock.Call
}
// ListCallsP is a helper method to define mock.On call
// - ctx context.Context
// - arg database.ListCallsPParams
func (_e *Store_Expecter) ListCallsP(ctx interface{}, arg interface{}) *Store_ListCallsP_Call {
return &Store_ListCallsP_Call{Call: _e.mock.On("ListCallsP", ctx, arg)}
}
func (_c *Store_ListCallsP_Call) Run(run func(ctx context.Context, arg database.ListCallsPParams)) *Store_ListCallsP_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(database.ListCallsPParams))
})
return _c
}
func (_c *Store_ListCallsP_Call) Return(_a0 []database.ListCallsPRow, _a1 error) *Store_ListCallsP_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *Store_ListCallsP_Call) RunAndReturn(run func(context.Context, database.ListCallsPParams) ([]database.ListCallsPRow, error)) *Store_ListCallsP_Call {
_c.Call.Return(run)
return _c
}
// RestoreTalkgroupVersion provides a mock function with given fields: ctx, versionIds
func (_m *Store) RestoreTalkgroupVersion(ctx context.Context, versionIds int) (database.Talkgroup, error) {
ret := _m.Called(ctx, versionIds)

View file

@ -257,6 +257,7 @@ func (pm *partman) prunePartition(ctx context.Context, tx database.Store, p Part
end := pgtype.Timestamptz{Time: e, Valid: true}
fullPartName := pm.fullTableName(p.PartitionName())
// sweep calls that are referenced by an incident into swept_calls
swept, err := tx.SweepCalls(ctx, start, end)
if err != nil {
return err

View file

@ -15,6 +15,7 @@ type Querier interface {
AddAlert(ctx context.Context, arg AddAlertParams) error
AddCall(ctx context.Context, arg AddCallParams) error
AddLearnedTalkgroup(ctx context.Context, arg AddLearnedTalkgroupParams) (Talkgroup, error)
// This is used to sweep calls that are part of an incident prior to pruning a partition.
CleanupSweptCalls(ctx context.Context, rangeStart pgtype.Timestamptz, rangeEnd pgtype.Timestamptz) (int64, error)
CreateAPIKey(ctx context.Context, owner int, expires pgtype.Timestamp, disabled *bool) (ApiKey, error)
CreateSystem(ctx context.Context, iD int, name string) error
@ -45,6 +46,8 @@ type Querier interface {
GetUserByUID(ctx context.Context, id int) (User, error)
GetUserByUsername(ctx context.Context, username string) (User, error)
GetUsers(ctx context.Context) ([]User, error)
ListCallsCount(ctx context.Context, arg ListCallsCountParams) (int64, error)
ListCallsP(ctx context.Context, arg ListCallsPParams) ([]ListCallsPRow, error)
RestoreTalkgroupVersion(ctx context.Context, versionIds int) (Talkgroup, error)
SetAppPrefs(ctx context.Context, appName string, prefs []byte, uid int) error
SetCallTranscript(ctx context.Context, iD uuid.UUID, transcript *string) error

View file

@ -8,6 +8,8 @@ import (
"path/filepath"
"dynatron.me/x/stillbox/internal/common"
"dynatron.me/x/stillbox/internal/forms"
"dynatron.me/x/stillbox/pkg/calls/callstore"
"dynatron.me/x/stillbox/pkg/database"
"github.com/go-chi/chi/v5"
@ -28,13 +30,15 @@ type callsAPI struct {
func (ca *callsAPI) Subrouter() http.Handler {
r := chi.NewMux()
r.Get(`/{call:[a-f0-9-]+}`, ca.get)
r.Get(`/{call:[a-f0-9-]+}/{download:download}`, ca.get)
r.Get(`/{call:[a-f0-9-]+}`, ca.getAudio)
r.Get(`/{call:[a-f0-9-]+}/{download:download}`, ca.getAudio)
r.Post(`/list`, ca.listCalls)
return r
}
func (ca *callsAPI) get(w http.ResponseWriter, r *http.Request) {
func (ca *callsAPI) getAudio(w http.ResponseWriter, r *http.Request) {
p := struct {
CallID *uuid.UUID `param:"call"`
Download *string `param:"download"`
@ -52,9 +56,9 @@ func (ca *callsAPI) get(w http.ResponseWriter, r *http.Request) {
}
ctx := r.Context()
db := database.FromCtx(ctx)
calls := callstore.FromCtx(ctx)
call, err := db.GetCallAudioByID(ctx, *p.CallID)
call, err := calls.CallAudio(ctx, *p.CallID)
if err != nil {
wErr(w, r, autoError(err))
return
@ -77,7 +81,7 @@ func (ca *callsAPI) get(w http.ResponseWriter, r *http.Request) {
}
if call.AudioName == nil {
call.AudioName = common.PtrTo(call.CallDate.Time.Format(fileNameDateFmt))
call.AudioName = common.PtrTo(call.CallDate.Time().Format(fileNameDateFmt))
}
disposition := "inline"
@ -91,3 +95,31 @@ func (ca *callsAPI) get(w http.ResponseWriter, r *http.Request) {
_, _ = w.Write(call.AudioBlob)
}
func (ca *callsAPI) listCalls(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
cSt := callstore.FromCtx(ctx)
var par callstore.CallsParams
err := forms.Unmarshal(r, &par, forms.WithTag("json"), forms.WithAcceptBlank(), forms.WithOmitEmpty())
if err != nil {
wErr(w, r, badRequest(err))
return
}
calls, count, err := cSt.Calls(ctx, par)
if err != nil {
wErr(w, r, autoError(err))
return
}
res := struct {
Calls []database.ListCallsPRow `json:"calls"`
Count int `json:"count"`
}{
Calls: calls,
Count: count,
}
respond(w, r, res)
}

View file

@ -106,7 +106,7 @@ func WithPagination(p *Pagination, defPerPage int, totalDest *int) Option {
func (p *Pagination) SortDir() (string, error) {
order := TGOrderTGID
dir := TGDirAsc
dir := common.DirAsc
if p != nil {
if p.OrderBy != nil {
@ -136,7 +136,6 @@ func WithFilter(f *string) Option {
}
type TGOrder string
type TGDirection string
const (
TGOrderID TGOrder = "id"
@ -144,25 +143,8 @@ const (
TGOrderGroup TGOrder = "group"
TGOrderName TGOrder = "name"
TGOrderAlpha TGOrder = "alpha"
TGDirAsc TGDirection = "asc"
TGDirDesc TGDirection = "desc"
)
func (t *TGDirection) IsValid() bool {
if t == nil {
return true
}
switch *t {
case TGDirAsc, TGDirDesc:
return true
}
return false
}
func (t *TGOrder) IsValid() bool {
if t == nil {
return true
@ -179,8 +161,8 @@ func (t *TGOrder) IsValid() bool {
type Pagination struct {
common.Pagination
OrderBy *TGOrder `json:"orderBy"`
Direction *TGDirection `json:"dir"`
OrderBy *TGOrder `json:"orderBy"`
Direction *common.SortDirection `json:"dir"`
}
type storeCtxKey string

View file

@ -39,7 +39,7 @@ CREATE TABLE IF NOT EXISTS talkgroups(
weight REAL NOT NULL DEFAULT 1.0,
learned BOOLEAN NOT NULL DEFAULT FALSE,
ignored BOOLEAN NOT NULL DEFAULT FALSE,
UNIQUE (system_id, tgid)
UNIQUE (system_id, tgid, learned)
);
CREATE INDEX talkgroups_system_tgid_idx ON talkgroups (system_id, tgid);

View file

@ -70,6 +70,7 @@ WITH to_sweep AS (
) INSERT INTO swept_calls SELECT * FROM to_sweep;
-- name: CleanupSweptCalls :execrows
-- This is used to sweep calls that are part of an incident prior to pruning a partition.
WITH to_sweep AS (
SELECT id FROM calls
JOIN incidents_calls ic ON ic.call_id = calls.id
@ -79,3 +80,47 @@ WITH to_sweep AS (
swept_call_id = call_id,
calls_tbl_id = NULL
WHERE call_id IN (SELECT id FROM to_sweep);
-- name: ListCallsP :many
SELECT
c.id,
c.call_date,
c.duration,
tgs.system_id,
tgs.tgid,
sys.name system_name,
tgs.name tg_name
FROM calls c
JOIN talkgroups tgs ON c.talkgroup = tgs.tgid AND c.system = tgs.system_id
JOIN systems sys ON sys.id = tgs.system_id
WHERE
CASE WHEN sqlc.narg('start')::TIMESTAMPTZ IS NOT NULL THEN
c.call_date >= @start ELSE TRUE END AND
CASE WHEN sqlc.narg('end')::TIMESTAMPTZ IS NOT NULL THEN
c.call_date <= sqlc.narg('end') ELSE TRUE END AND
CASE WHEN sqlc.narg('tags_any')::TEXT[] IS NOT NULL THEN
tgs.tags @> ARRAY[@tags_any] ELSE TRUE END AND
CASE WHEN sqlc.narg('tags_not')::TEXT[] IS NOT NULL THEN
(NOT (tgs.tags @> ARRAY[@tags_not])) ELSE TRUE END
ORDER BY
CASE WHEN @direction::TEXT = 'asc' THEN c.call_date END ASC,
CASE WHEN @direction = 'desc' THEN c.call_date END DESC
OFFSET sqlc.arg('offset') ROWS
FETCH NEXT sqlc.arg('per_page') ROWS ONLY
;
-- name: ListCallsCount :one
SELECT
COUNT(*)
FROM calls c
JOIN talkgroups tgs ON c.talkgroup = tgs.tgid AND c.system = tgs.system_id
WHERE
CASE WHEN sqlc.narg('start')::TIMESTAMPTZ IS NOT NULL THEN
c.call_date >= @start ELSE TRUE END AND
CASE WHEN sqlc.narg('end')::TIMESTAMPTZ IS NOT NULL THEN
c.call_date <= sqlc.narg('end') ELSE TRUE END AND
CASE WHEN sqlc.narg('tags_any')::TEXT[] IS NOT NULL THEN
tgs.tags @> ARRAY[@tags_any] ELSE TRUE END AND
CASE WHEN sqlc.narg('tags_not')::TEXT[] IS NOT NULL THEN
(NOT (tgs.tags @> ARRAY[@tags_not])) ELSE TRUE END
;