partition #60

Merged
amigan merged 15 commits from partition into trunk 2024-12-01 03:01:11 -05:00
27 changed files with 2450 additions and 29 deletions

View file

@ -8,3 +8,4 @@ packages:
config: config:
interfaces: interfaces:
Store: Store:
DBTX:

View file

@ -1,5 +1,18 @@
db: db:
connect: 'postgres://postgres:password@localhost:5432/example' connect: 'postgres://postgres:password@localhost:5432/example'
partition:
# whether to enable the built-in partition manager
enabled: true
# the postgres schema containing our tables
schema: public
# daily|weekly|monthly|quarterly|yearly
interval: monthly
# number of partitions to retain, -1 to keep all
retain: 3
# whether to drop or simply detach
drop: true
# number of partitions to prepare ahead
preProvision: 3
cors: cors:
allowedOrigins: allowedOrigins:
- 'http://localhost:*' - 'http://localhost:*'

22
internal/isoweek/LICENSE Normal file
View file

@ -0,0 +1,22 @@
License for github.com/snabb/isoweek:
Copyright © 2016-2023 Janne Snabb snabb AT epipe.com
Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
"Software"), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so, subject to
the following conditions:
The above copyright notice and this permission notice shall be included
in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

View file

@ -0,0 +1,93 @@
// Package isoweek calculates a starting date and time of [ISO 8601] week.
//
// ISO 8601 standard defines the common [week number] system used in Europe
// and many other countries. Monday is the first day of a week.
//
// The Go standard library [time] package has [time.Time.ISOWeek] function
// for getting ISO 8601 week number of a given [time.Time], but there is no
// reverse functionality for getting a date from a week number. This package
// implements that.
//
// Invalid input is silently accepted. There is a separate [Validate]
// function if week number validation is needed.
//
// There are also functions for working with [Julian day numbers]. Using Julian
// day numbers is often the easiest and fastest way to do date calculations.
//
// This package does not work with the "traditional" week system used in
// US/Canada/Japan/etc. (weeks starting on Sundays). However the Julian day
// number functions may be still useful.
//
// [ISO 8601]: https://en.wikipedia.org/wiki/ISO_8601
// [week number]: https://en.wikipedia.org/wiki/ISO_week_date
// [Julian day numbers]: https://en.wikipedia.org/wiki/Julian_day
package isoweek
import "time"
// ISOWeekday returns the ISO 8601 weekday number of given day.
// (1 = Mon, 2 = Tue,.. 7 = Sun)
//
// This is different from Go's standard [time.Weekday].
func ISOWeekday(year int, month time.Month, day int) (weekday int) {
	// Richards, E. G. (2013) pp. 592, 618.
	// Julian day numbers of Mondays are divisible by seven (e.g.
	// 2453738 = 2006-01-02, a Monday), so remainder-plus-one maps any
	// date onto 1..7 with Monday = 1.
	jdn := DateToJulian(year, month, day)
	return jdn%7 + 1
}
// startOffset returns the offset (in days) from the start of a year to
// Monday of the given week. Offset may be negative.
func startOffset(y, week int) (offset int) {
	// Equivalent to the readable form:
	//
	//	week*7 - ISOWeekday(y, 1, 4) - 3
	//
	// but the weekday of 4 January is computed directly with Tomohiko
	// Sakamoto's leap-year-aware congruence instead of going through a
	// Julian day number.
	prev := y - 1
	dow := (prev + prev/4 - prev/100 + prev/400 + 3) % 7
	return week*7 - dow - 4
}
// StartTime returns the starting time (Monday 00:00) of the given
// ISO 8601 week.
func StartTime(wyear, week int, loc *time.Location) (start time.Time) {
	// Resolve the week to a calendar date first, then anchor it at
	// midnight in the caller's location.
	year, month, day := StartDate(wyear, week)
	return time.Date(year, month, day, 0, 0, 0, 0, loc)
}
// StartDate returns the starting date (Monday) of the given ISO 8601 week.
func StartDate(wyear, week int) (year int, month time.Month, day int) {
return JulianToDate(
DateToJulian(wyear, 1, 1) + startOffset(wyear, week))
}
// ordinalInYear returns the ordinal (within a year) day number.
func ordinalInYear(year int, month time.Month, day int) (dayNo int) {
	// 1 January is day 1, hence the +1 shift on the Julian day
	// difference.
	jan1 := DateToJulian(year, 1, 1)
	return DateToJulian(year, month, day) - jan1 + 1
}
// FromDate returns ISO 8601 week number of a date.
func FromDate(year int, month time.Month, day int) (wyear, week int) {
	// Standard ISO week formula: days elapsed in the year, shifted by
	// the date's own weekday plus 10, divided into sevens.
	week = (ordinalInYear(year, month, day) - ISOWeekday(year, month, day) + 10) / 7
	if week < 1 {
		return FromDate(year-1, 12, 31) // last week of preceding year
	}
	// A computed week 53 may really be week 1 of the next ISO year;
	// compare against the Julian day of that year's first week start.
	if week == 53 &&
		DateToJulian(StartDate(year+1, 1)) <= DateToJulian(year, month, day) {
		return year + 1, 1 // first week of following year
	}
	return year, week
}
// Validate checks if a week number is valid. Returns true if it is valid.
func Validate(wyear, week int) (ok bool) {
	// Weeks outside 1..53 can never be valid.
	if week < 1 || week > 53 {
		return false
	}
	// Week 53 exists only in "long" ISO years: round-trip the week's
	// start date through FromDate and require an exact match.
	wyear2, week2 := FromDate(StartDate(wyear, week))
	return wyear == wyear2 && week == week2
}

View file

@ -0,0 +1,96 @@
package isoweek_test
import (
"fmt"
"testing"
"time"
"dynatron.me/x/stillbox/internal/isoweek"
)
// TestISOWeekday tests all days from year 1 until year 4000.
// Ensures that behaviour matches the Go standard library Weekday.
func TestISOWeekday(test *testing.T) {
	t := time.Date(1, time.January, 1, 0, 0, 0, 0, time.UTC)
	var wd1, wd2 int
	for t.Year() < 4000 {
		wd1 = int(t.Weekday()) // Go convention: Sunday = 0
		wd2 = isoweek.ISOWeekday(t.Date())
		if wd2 == 7 {
			wd2 = 0 // fold ISO Sunday (7) onto Go's Sunday (0) for comparison
		}
		if wd1 != wd2 {
			test.Errorf("mismatch on %s", t.Format("2006-01-02"))
		}
		t = t.AddDate(0, 0, 1)
	}
}
func ExampleISOWeekday() {
fmt.Println(isoweek.ISOWeekday(1984, 1, 1))
// Output: 7
}
// TestStartTime tests all weeks from year 1 until year 4000.
// Ensures that behaviour matches the Go standard library ISOWeek.
func TestStartTime(test *testing.T) {
	// Advance to the first Monday on or after 1 January of year 1 so
	// the walk below always sits on an ISO week start.
	t := time.Date(1, time.January, 1, 0, 0, 0, 0, time.UTC)
	for t.Weekday() != time.Monday {
		t = t.AddDate(0, 0, 1)
	}
	for t.Year() < 4000 {
		// t is a Monday: its ISO (year, week) must map back to t itself.
		wy, ww := t.ISOWeek()
		wst := isoweek.StartTime(wy, ww, time.UTC)
		if !wst.Equal(t) {
			test.Errorf("mismatch: %v != %v (wy = %d, ww = %d)",
				t, wst, wy, ww)
		}
		t = t.AddDate(0, 0, 7) // step a whole week to the next Monday
	}
}
func ExampleStartTime() {
t := isoweek.StartTime(1985, 1, time.UTC)
fmt.Println(t)
// Output: 1984-12-31 00:00:00 +0000 UTC
}
func ExampleStartDate() {
y, m, d := isoweek.StartDate(2000, 1)
fmt.Println(d, m, y)
// Output: 3 January 2000
}
// TestFromDate tests all days from year 1 until year 4000.
// Ensures that behaviour matches the Go standard library ISOWeek.
func TestFromDate(test *testing.T) {
	t := time.Date(1, time.January, 1, 0, 0, 0, 0, time.UTC)
	for t.Year() < 4000 {
		// Compare against the stdlib's ISO week reckoning day by day.
		wy, ww := t.ISOWeek()
		wy2, ww2 := isoweek.FromDate(t.Date())
		if wy != wy2 || ww != ww2 {
			test.Errorf("mismatch on %s", t.Format("2006-01-02"))
		}
		t = t.AddDate(0, 0, 1)
	}
}
func ExampleFromDate() {
fmt.Println(isoweek.FromDate(1984, 1, 1))
// Output: 1983 52
}
func ExampleValidate() {
fmt.Println(
isoweek.Validate(2015, 52), isoweek.Validate(2015, 53),
isoweek.Validate(2015, 54), isoweek.Validate(2016, 0),
isoweek.Validate(2016, 1))
fmt.Println(
isoweek.Validate(2016, 52), isoweek.Validate(2016, 53),
isoweek.Validate(2016, 54), isoweek.Validate(2017, 0),
isoweek.Validate(2017, 1))
// Output:
// true true false false true
// true false false false true
}

View file

@ -0,0 +1,32 @@
package isoweek
import "time"
// DateToJulian converts a date to a Julian day number.
func DateToJulian(year int, month time.Month, day int) (jdn int) {
	// Claus Tøndering's Calendar FAQ
	// http://www.tondering.dk/claus/cal/julperiod.php#formula
	//
	// January and February are treated as months 13 and 14 of the
	// previous year so that the leap day, if any, falls at the end of
	// the shifted year.
	m := int(month)
	if m < 3 {
		m += 12
		year--
	}
	y := year + 4800
	return day + (153*(m-3)+2)/5 + 365*y + y/4 - y/100 + y/400 - 32045
}
// JulianToDate converts a Julian day number to a date.
func JulianToDate(jdn int) (year int, month time.Month, day int) {
	// Richards, E. G. (2013) pp. 585-624: integer-only inverse of the
	// Gregorian date-to-JDN conversion. The intermediate values below
	// must be computed exactly in this order.
	cycle := 4*(jdn+1401+(4*jdn+274277)/146097*3/4-38) + 3
	dayOfYear := cycle%1461/4*5 + 2
	day = dayOfYear%153/5 + 1
	month = time.Month((dayOfYear/153+2)%12 + 1)
	year = cycle/1461 - 4716 + (14-int(month))/12
	return year, month, day
}

View file

@ -0,0 +1,52 @@
package isoweek_test
import (
"fmt"
"testing"
"time"
"dynatron.me/x/stillbox/internal/isoweek"
)
// TestJulianToDate walks every Julian day number from 1 January of year 1
// until year 4000 and checks that DateToJulian inverts JulianToDate.
func TestJulianToDate(test *testing.T) {
	j := isoweek.DateToJulian(1, time.January, 1)
	for {
		y, m, d := isoweek.JulianToDate(j)
		if y >= 4000 {
			break
		}
		if j != isoweek.DateToJulian(y, m, d) {
			test.Errorf("mismatch on %04d-%02d-%02d", y, m, d)
		}
		j++
	}
}
// TestDateToJulian walks every calendar day from year 1 until year 4000,
// checking that JulianToDate inverts DateToJulian and that day numbers are
// consecutive across day boundaries.
func TestDateToJulian(test *testing.T) {
	t := time.Date(1, time.January, 1, 0, 0, 0, 0, time.UTC)
	for t.Year() < 4000 {
		j := isoweek.DateToJulian(t.Date())
		y, m, d := isoweek.JulianToDate(j)
		if y != t.Year() || m != t.Month() || d != t.Day() {
			test.Errorf("mismatch on %s", t.Format("2006-01-02"))
		}
		// d+1 may overflow the month; DateToJulian is linear in its day
		// term, so the result must still be exactly one day later.
		if j+1 != isoweek.DateToJulian(y, m, d+1) {
			test.Errorf("mismatch 2 on %s", t.Format("2006-01-02"))
		}
		t = t.AddDate(0, 0, 1)
	}
}
func ExampleDateToJulian() {
fmt.Println(isoweek.DateToJulian(2006, 1, 2))
// Output: 2453738
}
func ExampleJulianToDate() {
fmt.Println(isoweek.JulianToDate(2453738))
// Output: 2006 January 2
}

View file

@ -39,8 +39,18 @@ type CORS struct {
} }
type DB struct { type DB struct {
Connect string `yaml:"connect"` Connect string `yaml:"connect"`
LogQueries bool `yaml:"logQueries"` LogQueries bool `yaml:"logQueries"`
Partition Partition `yaml:"partition"`
}
type Partition struct {
Enabled bool `yaml:"enabled"`
Schema string `yaml:"schema"`
Interval string `yaml:"interval"`
Retain int `yaml:"retain"`
PreProvision *int `yaml:"preProvision"`
Drop bool `yaml:"drop"` // tag was "detach": mismatched the documented `drop:` config key, so `drop: true` was never unmarshalled
} }
type Logger struct { type Logger struct {

View file

@ -40,10 +40,13 @@ func (c *Configuration) read() error {
return err return err
} }
k.Load(env.Provider(common.EnvPrefix, ".", func(s string) string { err = k.Load(env.Provider(common.EnvPrefix, ".", func(s string) string {
return strings.Replace(strings.ToLower( return strings.Replace(strings.ToLower(
strings.TrimPrefix(s, common.EnvPrefix)), "_", ".", -1) strings.TrimPrefix(s, common.EnvPrefix)), "_", ".", -1)
}), nil) }), nil)
if err != nil {
return err
}
err = k.UnmarshalWithConf("", &c.Config, err = k.UnmarshalWithConf("", &c.Config,
koanf.UnmarshalConf{ koanf.UnmarshalConf{

View file

@ -135,6 +135,26 @@ func (q *Queries) AddCall(ctx context.Context, arg AddCallParams) error {
return err return err
} }
const cleanupSweptCalls = `-- name: CleanupSweptCalls :execrows
WITH to_sweep AS (
SELECT id FROM calls
JOIN incidents_calls ic ON ic.call_id = calls.id
WHERE calls.call_date >= $1 AND calls.call_date < $2
) UPDATE incidents_calls
SET
swept_call_id = call_id,
calls_tbl_id = NULL
WHERE call_id IN (SELECT id FROM to_sweep)
`
func (q *Queries) CleanupSweptCalls(ctx context.Context, rangeStart pgtype.Timestamptz, rangeEnd pgtype.Timestamptz) (int64, error) {
result, err := q.db.Exec(ctx, cleanupSweptCalls, rangeStart, rangeEnd)
if err != nil {
return 0, err
}
return result.RowsAffected(), nil
}
const getDatabaseSize = `-- name: GetDatabaseSize :one const getDatabaseSize = `-- name: GetDatabaseSize :one
SELECT pg_size_pretty(pg_database_size(current_database())) SELECT pg_size_pretty(pg_database_size(current_database()))
` `
@ -154,3 +174,20 @@ func (q *Queries) SetCallTranscript(ctx context.Context, iD uuid.UUID, transcrip
_, err := q.db.Exec(ctx, setCallTranscript, iD, transcript) _, err := q.db.Exec(ctx, setCallTranscript, iD, transcript)
return err return err
} }
const sweepCalls = `-- name: SweepCalls :execrows
WITH to_sweep AS (
SELECT id, submitter, system, talkgroup, calls.call_date, audio_name, audio_blob, duration, audio_type, audio_url, frequency, frequencies, patches, tg_label, tg_alpha_tag, tg_group, source, transcript
FROM calls
JOIN incidents_calls ic ON ic.call_id = calls.id
WHERE calls.call_date >= $1 AND calls.call_date < $2
) INSERT INTO swept_calls SELECT id, submitter, system, talkgroup, call_date, audio_name, audio_blob, duration, audio_type, audio_url, frequency, frequencies, patches, tg_label, tg_alpha_tag, tg_group, source, transcript FROM to_sweep
`
func (q *Queries) SweepCalls(ctx context.Context, rangeStart pgtype.Timestamptz, rangeEnd pgtype.Timestamptz) (int64, error) {
result, err := q.db.Exec(ctx, sweepCalls, rangeStart, rangeEnd)
if err != nil {
return 0, err
}
return result.RowsAffected(), nil
}

View file

@ -23,21 +23,27 @@ import (
type Store interface { type Store interface {
Querier Querier
talkgroupQuerier talkgroupQuerier
partitionsQuerier
DB() *Database DB() *Postgres
DBTX() DBTX
InTx(context.Context, func(Store) error, pgx.TxOptions) error InTx(context.Context, func(Store) error, pgx.TxOptions) error
} }
type Database struct { type Postgres struct {
*pgxpool.Pool *pgxpool.Pool
*Queries *Queries
} }
func (db *Database) DB() *Database { func (q *Queries) DBTX() DBTX {
return q.db
}
func (db *Postgres) DB() *Postgres {
return db return db
} }
func (db *Database) InTx(ctx context.Context, f func(Store) error, opts pgx.TxOptions) error { func (db *Postgres) InTx(ctx context.Context, f func(Store) error, opts pgx.TxOptions) error {
tx, err := db.DB().Pool.BeginTx(ctx, opts) tx, err := db.DB().Pool.BeginTx(ctx, opts)
if err != nil { if err != nil {
return fmt.Errorf("Tx begin: %w", err) return fmt.Errorf("Tx begin: %w", err)
@ -46,7 +52,7 @@ func (db *Database) InTx(ctx context.Context, f func(Store) error, opts pgx.TxOp
//nolint:errcheck //nolint:errcheck
defer tx.Rollback(ctx) defer tx.Rollback(ctx)
dbtx := &Database{Pool: db.Pool, Queries: db.Queries.WithTx(tx)} dbtx := &Postgres{Pool: db.Pool, Queries: db.Queries.WithTx(tx)}
err = f(dbtx) err = f(dbtx)
if err != nil { if err != nil {
@ -68,11 +74,11 @@ func (m dbLogger) Log(ctx context.Context, level tracelog.LogLevel, msg string,
} }
func Close(c Store) { func Close(c Store) {
c.(*Database).Pool.Close() c.(*Postgres).Pool.Close()
} }
// NewClient creates a new DB using the provided config. // NewClient creates a new DB using the provided config.
func NewClient(ctx context.Context, conf config.DB) (Store, error) { func NewClient(ctx context.Context, conf config.DB) (*Postgres, error) {
dir, err := iofs.New(sqlembed.Migrations, "postgres/migrations") dir, err := iofs.New(sqlembed.Migrations, "postgres/migrations")
if err != nil { if err != nil {
return nil, err return nil, err
@ -88,6 +94,8 @@ func NewClient(ctx context.Context, conf config.DB) (Store, error) {
return nil, err return nil, err
} }
log.Debug().Err(err).Msg("migrations done")
m.Close() m.Close()
pgConf, err := pgxpool.ParseConfig(conf.Connect) pgConf, err := pgxpool.ParseConfig(conf.Connect)
@ -107,7 +115,7 @@ func NewClient(ctx context.Context, conf config.DB) (Store, error) {
return nil, err return nil, err
} }
db := &Database{ db := &Postgres{
Pool: pool, Pool: pool,
Queries: New(pool), Queries: New(pool),
} }

287
pkg/database/mocks/DBTX.go Normal file
View file

@ -0,0 +1,287 @@
// Code generated by mockery v2.47.0. DO NOT EDIT.
package mocks
import (
context "context"
mock "github.com/stretchr/testify/mock"
pgconn "github.com/jackc/pgx/v5/pgconn"
pgx "github.com/jackc/pgx/v5"
)
// DBTX is an autogenerated mock type for the DBTX type
type DBTX struct {
mock.Mock
}
type DBTX_Expecter struct {
mock *mock.Mock
}
func (_m *DBTX) EXPECT() *DBTX_Expecter {
return &DBTX_Expecter{mock: &_m.Mock}
}
// Exec provides a mock function with given fields: _a0, _a1, _a2
func (_m *DBTX) Exec(_a0 context.Context, _a1 string, _a2 ...interface{}) (pgconn.CommandTag, error) {
var _ca []interface{}
_ca = append(_ca, _a0, _a1)
_ca = append(_ca, _a2...)
ret := _m.Called(_ca...)
if len(ret) == 0 {
panic("no return value specified for Exec")
}
var r0 pgconn.CommandTag
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, string, ...interface{}) (pgconn.CommandTag, error)); ok {
return rf(_a0, _a1, _a2...)
}
if rf, ok := ret.Get(0).(func(context.Context, string, ...interface{}) pgconn.CommandTag); ok {
r0 = rf(_a0, _a1, _a2...)
} else {
r0 = ret.Get(0).(pgconn.CommandTag)
}
if rf, ok := ret.Get(1).(func(context.Context, string, ...interface{}) error); ok {
r1 = rf(_a0, _a1, _a2...)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// DBTX_Exec_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Exec'
type DBTX_Exec_Call struct {
*mock.Call
}
// Exec is a helper method to define mock.On call
// - _a0 context.Context
// - _a1 string
// - _a2 ...interface{}
func (_e *DBTX_Expecter) Exec(_a0 interface{}, _a1 interface{}, _a2 ...interface{}) *DBTX_Exec_Call {
return &DBTX_Exec_Call{Call: _e.mock.On("Exec",
append([]interface{}{_a0, _a1}, _a2...)...)}
}
func (_c *DBTX_Exec_Call) Run(run func(_a0 context.Context, _a1 string, _a2 ...interface{})) *DBTX_Exec_Call {
_c.Call.Run(func(args mock.Arguments) {
variadicArgs := make([]interface{}, len(args)-2)
for i, a := range args[2:] {
if a != nil {
variadicArgs[i] = a.(interface{})
}
}
run(args[0].(context.Context), args[1].(string), variadicArgs...)
})
return _c
}
func (_c *DBTX_Exec_Call) Return(_a0 pgconn.CommandTag, _a1 error) *DBTX_Exec_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *DBTX_Exec_Call) RunAndReturn(run func(context.Context, string, ...interface{}) (pgconn.CommandTag, error)) *DBTX_Exec_Call {
_c.Call.Return(run)
return _c
}
// Query provides a mock function with given fields: _a0, _a1, _a2
func (_m *DBTX) Query(_a0 context.Context, _a1 string, _a2 ...interface{}) (pgx.Rows, error) {
var _ca []interface{}
_ca = append(_ca, _a0, _a1)
_ca = append(_ca, _a2...)
ret := _m.Called(_ca...)
if len(ret) == 0 {
panic("no return value specified for Query")
}
var r0 pgx.Rows
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, string, ...interface{}) (pgx.Rows, error)); ok {
return rf(_a0, _a1, _a2...)
}
if rf, ok := ret.Get(0).(func(context.Context, string, ...interface{}) pgx.Rows); ok {
r0 = rf(_a0, _a1, _a2...)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(pgx.Rows)
}
}
if rf, ok := ret.Get(1).(func(context.Context, string, ...interface{}) error); ok {
r1 = rf(_a0, _a1, _a2...)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// DBTX_Query_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Query'
type DBTX_Query_Call struct {
*mock.Call
}
// Query is a helper method to define mock.On call
// - _a0 context.Context
// - _a1 string
// - _a2 ...interface{}
func (_e *DBTX_Expecter) Query(_a0 interface{}, _a1 interface{}, _a2 ...interface{}) *DBTX_Query_Call {
return &DBTX_Query_Call{Call: _e.mock.On("Query",
append([]interface{}{_a0, _a1}, _a2...)...)}
}
func (_c *DBTX_Query_Call) Run(run func(_a0 context.Context, _a1 string, _a2 ...interface{})) *DBTX_Query_Call {
_c.Call.Run(func(args mock.Arguments) {
variadicArgs := make([]interface{}, len(args)-2)
for i, a := range args[2:] {
if a != nil {
variadicArgs[i] = a.(interface{})
}
}
run(args[0].(context.Context), args[1].(string), variadicArgs...)
})
return _c
}
func (_c *DBTX_Query_Call) Return(_a0 pgx.Rows, _a1 error) *DBTX_Query_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *DBTX_Query_Call) RunAndReturn(run func(context.Context, string, ...interface{}) (pgx.Rows, error)) *DBTX_Query_Call {
_c.Call.Return(run)
return _c
}
// QueryRow provides a mock function with given fields: _a0, _a1, _a2
func (_m *DBTX) QueryRow(_a0 context.Context, _a1 string, _a2 ...interface{}) pgx.Row {
var _ca []interface{}
_ca = append(_ca, _a0, _a1)
_ca = append(_ca, _a2...)
ret := _m.Called(_ca...)
if len(ret) == 0 {
panic("no return value specified for QueryRow")
}
var r0 pgx.Row
if rf, ok := ret.Get(0).(func(context.Context, string, ...interface{}) pgx.Row); ok {
r0 = rf(_a0, _a1, _a2...)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(pgx.Row)
}
}
return r0
}
// DBTX_QueryRow_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'QueryRow'
type DBTX_QueryRow_Call struct {
*mock.Call
}
// QueryRow is a helper method to define mock.On call
// - _a0 context.Context
// - _a1 string
// - _a2 ...interface{}
func (_e *DBTX_Expecter) QueryRow(_a0 interface{}, _a1 interface{}, _a2 ...interface{}) *DBTX_QueryRow_Call {
return &DBTX_QueryRow_Call{Call: _e.mock.On("QueryRow",
append([]interface{}{_a0, _a1}, _a2...)...)}
}
func (_c *DBTX_QueryRow_Call) Run(run func(_a0 context.Context, _a1 string, _a2 ...interface{})) *DBTX_QueryRow_Call {
_c.Call.Run(func(args mock.Arguments) {
variadicArgs := make([]interface{}, len(args)-2)
for i, a := range args[2:] {
if a != nil {
variadicArgs[i] = a.(interface{})
}
}
run(args[0].(context.Context), args[1].(string), variadicArgs...)
})
return _c
}
func (_c *DBTX_QueryRow_Call) Return(_a0 pgx.Row) *DBTX_QueryRow_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *DBTX_QueryRow_Call) RunAndReturn(run func(context.Context, string, ...interface{}) pgx.Row) *DBTX_QueryRow_Call {
_c.Call.Return(run)
return _c
}
// SendBatch provides a mock function with given fields: _a0, _a1
func (_m *DBTX) SendBatch(_a0 context.Context, _a1 *pgx.Batch) pgx.BatchResults {
ret := _m.Called(_a0, _a1)
if len(ret) == 0 {
panic("no return value specified for SendBatch")
}
var r0 pgx.BatchResults
if rf, ok := ret.Get(0).(func(context.Context, *pgx.Batch) pgx.BatchResults); ok {
r0 = rf(_a0, _a1)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(pgx.BatchResults)
}
}
return r0
}
// DBTX_SendBatch_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SendBatch'
type DBTX_SendBatch_Call struct {
*mock.Call
}
// SendBatch is a helper method to define mock.On call
// - _a0 context.Context
// - _a1 *pgx.Batch
func (_e *DBTX_Expecter) SendBatch(_a0 interface{}, _a1 interface{}) *DBTX_SendBatch_Call {
return &DBTX_SendBatch_Call{Call: _e.mock.On("SendBatch", _a0, _a1)}
}
func (_c *DBTX_SendBatch_Call) Run(run func(_a0 context.Context, _a1 *pgx.Batch)) *DBTX_SendBatch_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*pgx.Batch))
})
return _c
}
func (_c *DBTX_SendBatch_Call) Return(_a0 pgx.BatchResults) *DBTX_SendBatch_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *DBTX_SendBatch_Call) RunAndReturn(run func(context.Context, *pgx.Batch) pgx.BatchResults) *DBTX_SendBatch_Call {
_c.Call.Return(run)
return _c
}
// NewDBTX creates a new instance of DBTX. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewDBTX(t interface {
mock.TestingT
Cleanup(func())
}) *DBTX {
mock := &DBTX{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View file

@ -12,6 +12,8 @@ import (
pgx "github.com/jackc/pgx/v5" pgx "github.com/jackc/pgx/v5"
time "time"
uuid "github.com/google/uuid" uuid "github.com/google/uuid"
) )
@ -227,6 +229,64 @@ func (_c *Store_BulkSetTalkgroupTags_Call) RunAndReturn(run func(context.Context
return _c return _c
} }
// CleanupSweptCalls provides a mock function with given fields: ctx, rangeStart, rangeEnd
func (_m *Store) CleanupSweptCalls(ctx context.Context, rangeStart pgtype.Timestamptz, rangeEnd pgtype.Timestamptz) (int64, error) {
ret := _m.Called(ctx, rangeStart, rangeEnd)
if len(ret) == 0 {
panic("no return value specified for CleanupSweptCalls")
}
var r0 int64
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, pgtype.Timestamptz, pgtype.Timestamptz) (int64, error)); ok {
return rf(ctx, rangeStart, rangeEnd)
}
if rf, ok := ret.Get(0).(func(context.Context, pgtype.Timestamptz, pgtype.Timestamptz) int64); ok {
r0 = rf(ctx, rangeStart, rangeEnd)
} else {
r0 = ret.Get(0).(int64)
}
if rf, ok := ret.Get(1).(func(context.Context, pgtype.Timestamptz, pgtype.Timestamptz) error); ok {
r1 = rf(ctx, rangeStart, rangeEnd)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// Store_CleanupSweptCalls_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CleanupSweptCalls'
type Store_CleanupSweptCalls_Call struct {
*mock.Call
}
// CleanupSweptCalls is a helper method to define mock.On call
// - ctx context.Context
// - rangeStart pgtype.Timestamptz
// - rangeEnd pgtype.Timestamptz
func (_e *Store_Expecter) CleanupSweptCalls(ctx interface{}, rangeStart interface{}, rangeEnd interface{}) *Store_CleanupSweptCalls_Call {
return &Store_CleanupSweptCalls_Call{Call: _e.mock.On("CleanupSweptCalls", ctx, rangeStart, rangeEnd)}
}
func (_c *Store_CleanupSweptCalls_Call) Run(run func(ctx context.Context, rangeStart pgtype.Timestamptz, rangeEnd pgtype.Timestamptz)) *Store_CleanupSweptCalls_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(pgtype.Timestamptz), args[2].(pgtype.Timestamptz))
})
return _c
}
func (_c *Store_CleanupSweptCalls_Call) Return(_a0 int64, _a1 error) *Store_CleanupSweptCalls_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *Store_CleanupSweptCalls_Call) RunAndReturn(run func(context.Context, pgtype.Timestamptz, pgtype.Timestamptz) (int64, error)) *Store_CleanupSweptCalls_Call {
_c.Call.Return(run)
return _c
}
// CreateAPIKey provides a mock function with given fields: ctx, owner, expires, disabled // CreateAPIKey provides a mock function with given fields: ctx, owner, expires, disabled
func (_m *Store) CreateAPIKey(ctx context.Context, owner int, expires pgtype.Timestamp, disabled *bool) (database.ApiKey, error) { func (_m *Store) CreateAPIKey(ctx context.Context, owner int, expires pgtype.Timestamp, disabled *bool) (database.ApiKey, error) {
ret := _m.Called(ctx, owner, expires, disabled) ret := _m.Called(ctx, owner, expires, disabled)
@ -286,6 +346,56 @@ func (_c *Store_CreateAPIKey_Call) RunAndReturn(run func(context.Context, int, p
return _c return _c
} }
// CreatePartition provides a mock function with given fields: ctx, parentTable, partitionName, start, end
func (_m *Store) CreatePartition(ctx context.Context, parentTable string, partitionName string, start time.Time, end time.Time) error {
ret := _m.Called(ctx, parentTable, partitionName, start, end)
if len(ret) == 0 {
panic("no return value specified for CreatePartition")
}
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, string, string, time.Time, time.Time) error); ok {
r0 = rf(ctx, parentTable, partitionName, start, end)
} else {
r0 = ret.Error(0)
}
return r0
}
// Store_CreatePartition_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CreatePartition'
type Store_CreatePartition_Call struct {
*mock.Call
}
// CreatePartition is a helper method to define mock.On call
// - ctx context.Context
// - parentTable string
// - partitionName string
// - start time.Time
// - end time.Time
func (_e *Store_Expecter) CreatePartition(ctx interface{}, parentTable interface{}, partitionName interface{}, start interface{}, end interface{}) *Store_CreatePartition_Call {
return &Store_CreatePartition_Call{Call: _e.mock.On("CreatePartition", ctx, parentTable, partitionName, start, end)}
}
func (_c *Store_CreatePartition_Call) Run(run func(ctx context.Context, parentTable string, partitionName string, start time.Time, end time.Time)) *Store_CreatePartition_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(string), args[2].(string), args[3].(time.Time), args[4].(time.Time))
})
return _c
}
func (_c *Store_CreatePartition_Call) Return(_a0 error) *Store_CreatePartition_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *Store_CreatePartition_Call) RunAndReturn(run func(context.Context, string, string, time.Time, time.Time) error) *Store_CreatePartition_Call {
_c.Call.Return(run)
return _c
}
// CreateUser provides a mock function with given fields: ctx, arg // CreateUser provides a mock function with given fields: ctx, arg
func (_m *Store) CreateUser(ctx context.Context, arg database.CreateUserParams) (database.User, error) { func (_m *Store) CreateUser(ctx context.Context, arg database.CreateUserParams) (database.User, error) {
ret := _m.Called(ctx, arg) ret := _m.Called(ctx, arg)
@ -344,19 +454,19 @@ func (_c *Store_CreateUser_Call) RunAndReturn(run func(context.Context, database
} }
// DB provides a mock function with given fields: // DB provides a mock function with given fields:
func (_m *Store) DB() *database.Database { func (_m *Store) DB() *database.Postgres {
ret := _m.Called() ret := _m.Called()
if len(ret) == 0 { if len(ret) == 0 {
panic("no return value specified for DB") panic("no return value specified for DB")
} }
var r0 *database.Database var r0 *database.Postgres
if rf, ok := ret.Get(0).(func() *database.Database); ok { if rf, ok := ret.Get(0).(func() *database.Postgres); ok {
r0 = rf() r0 = rf()
} else { } else {
if ret.Get(0) != nil { if ret.Get(0) != nil {
r0 = ret.Get(0).(*database.Database) r0 = ret.Get(0).(*database.Postgres)
} }
} }
@ -380,12 +490,59 @@ func (_c *Store_DB_Call) Run(run func()) *Store_DB_Call {
return _c return _c
} }
func (_c *Store_DB_Call) Return(_a0 *database.Database) *Store_DB_Call { func (_c *Store_DB_Call) Return(_a0 *database.Postgres) *Store_DB_Call {
_c.Call.Return(_a0) _c.Call.Return(_a0)
return _c return _c
} }
func (_c *Store_DB_Call) RunAndReturn(run func() *database.Database) *Store_DB_Call { func (_c *Store_DB_Call) RunAndReturn(run func() *database.Postgres) *Store_DB_Call {
_c.Call.Return(run)
return _c
}
// DBTX provides a mock function with given fields:
func (_m *Store) DBTX() database.DBTX {
ret := _m.Called()
if len(ret) == 0 {
panic("no return value specified for DBTX")
}
var r0 database.DBTX
if rf, ok := ret.Get(0).(func() database.DBTX); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(database.DBTX)
}
}
return r0
}
// Store_DBTX_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DBTX'
type Store_DBTX_Call struct {
*mock.Call
}
// DBTX is a helper method to define mock.On call
func (_e *Store_Expecter) DBTX() *Store_DBTX_Call {
return &Store_DBTX_Call{Call: _e.mock.On("DBTX")}
}
func (_c *Store_DBTX_Call) Run(run func()) *Store_DBTX_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *Store_DBTX_Call) Return(_a0 database.DBTX) *Store_DBTX_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *Store_DBTX_Call) RunAndReturn(run func() database.DBTX) *Store_DBTX_Call {
_c.Call.Return(run) _c.Call.Return(run)
return _c return _c
} }
@ -484,6 +641,101 @@ func (_c *Store_DeleteUser_Call) RunAndReturn(run func(context.Context, string)
return _c return _c
} }
// DetachPartition provides a mock function with given fields: ctx, parentTable, partitionName
func (_m *Store) DetachPartition(ctx context.Context, parentTable string, partitionName string) error {
	ret := _m.Called(ctx, parentTable, partitionName)

	if len(ret) == 0 {
		panic("no return value specified for DetachPartition")
	}

	var r0 error
	if rf, ok := ret.Get(0).(func(context.Context, string, string) error); ok {
		r0 = rf(ctx, parentTable, partitionName)
	} else {
		r0 = ret.Error(0)
	}

	return r0
}

// Store_DetachPartition_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DetachPartition'
type Store_DetachPartition_Call struct {
	*mock.Call
}

// DetachPartition is a helper method to define mock.On call
//   - ctx context.Context
//   - parentTable string
//   - partitionName string
func (_e *Store_Expecter) DetachPartition(ctx interface{}, parentTable interface{}, partitionName interface{}) *Store_DetachPartition_Call {
	return &Store_DetachPartition_Call{Call: _e.mock.On("DetachPartition", ctx, parentTable, partitionName)}
}

// Run registers a callback receiving the typed call arguments.
func (_c *Store_DetachPartition_Call) Run(run func(ctx context.Context, parentTable string, partitionName string)) *Store_DetachPartition_Call {
	_c.Call.Run(func(args mock.Arguments) {
		run(args[0].(context.Context), args[1].(string), args[2].(string))
	})
	return _c
}

// Return sets the error the mocked DetachPartition yields.
func (_c *Store_DetachPartition_Call) Return(_a0 error) *Store_DetachPartition_Call {
	_c.Call.Return(_a0)
	return _c
}

// RunAndReturn computes the return value from the typed arguments.
func (_c *Store_DetachPartition_Call) RunAndReturn(run func(context.Context, string, string) error) *Store_DetachPartition_Call {
	_c.Call.Return(run)
	return _c
}
// DropPartition provides a mock function with given fields: ctx, partitionName
func (_m *Store) DropPartition(ctx context.Context, partitionName string) error {
	ret := _m.Called(ctx, partitionName)

	if len(ret) == 0 {
		panic("no return value specified for DropPartition")
	}

	var r0 error
	if rf, ok := ret.Get(0).(func(context.Context, string) error); ok {
		r0 = rf(ctx, partitionName)
	} else {
		r0 = ret.Error(0)
	}

	return r0
}

// Store_DropPartition_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DropPartition'
type Store_DropPartition_Call struct {
	*mock.Call
}

// DropPartition is a helper method to define mock.On call
//   - ctx context.Context
//   - partitionName string
func (_e *Store_Expecter) DropPartition(ctx interface{}, partitionName interface{}) *Store_DropPartition_Call {
	return &Store_DropPartition_Call{Call: _e.mock.On("DropPartition", ctx, partitionName)}
}

// Run registers a callback receiving the typed call arguments.
func (_c *Store_DropPartition_Call) Run(run func(ctx context.Context, partitionName string)) *Store_DropPartition_Call {
	_c.Call.Run(func(args mock.Arguments) {
		run(args[0].(context.Context), args[1].(string))
	})
	return _c
}

// Return sets the error the mocked DropPartition yields.
func (_c *Store_DropPartition_Call) Return(_a0 error) *Store_DropPartition_Call {
	_c.Call.Return(_a0)
	return _c
}

// RunAndReturn computes the return value from the typed arguments.
func (_c *Store_DropPartition_Call) RunAndReturn(run func(context.Context, string) error) *Store_DropPartition_Call {
	_c.Call.Return(run)
	return _c
}
// GetAPIKey provides a mock function with given fields: ctx, apiKey // GetAPIKey provides a mock function with given fields: ctx, apiKey
func (_m *Store) GetAPIKey(ctx context.Context, apiKey string) (database.ApiKey, error) { func (_m *Store) GetAPIKey(ctx context.Context, apiKey string) (database.ApiKey, error) {
ret := _m.Called(ctx, apiKey) ret := _m.Called(ctx, apiKey)
@ -654,6 +906,66 @@ func (_c *Store_GetSystemName_Call) RunAndReturn(run func(context.Context, int)
return _c return _c
} }
// GetTablePartitions provides a mock function with given fields: ctx, schemaName, tableName
func (_m *Store) GetTablePartitions(ctx context.Context, schemaName string, tableName string) ([]database.PartitionResult, error) {
	ret := _m.Called(ctx, schemaName, tableName)

	if len(ret) == 0 {
		panic("no return value specified for GetTablePartitions")
	}

	var r0 []database.PartitionResult
	var r1 error
	// A single two-value function takes precedence over separate per-value stubs.
	if rf, ok := ret.Get(0).(func(context.Context, string, string) ([]database.PartitionResult, error)); ok {
		return rf(ctx, schemaName, tableName)
	}
	if rf, ok := ret.Get(0).(func(context.Context, string, string) []database.PartitionResult); ok {
		r0 = rf(ctx, schemaName, tableName)
	} else {
		if ret.Get(0) != nil {
			r0 = ret.Get(0).([]database.PartitionResult)
		}
	}

	if rf, ok := ret.Get(1).(func(context.Context, string, string) error); ok {
		r1 = rf(ctx, schemaName, tableName)
	} else {
		r1 = ret.Error(1)
	}

	return r0, r1
}

// Store_GetTablePartitions_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetTablePartitions'
type Store_GetTablePartitions_Call struct {
	*mock.Call
}

// GetTablePartitions is a helper method to define mock.On call
//   - ctx context.Context
//   - schemaName string
//   - tableName string
func (_e *Store_Expecter) GetTablePartitions(ctx interface{}, schemaName interface{}, tableName interface{}) *Store_GetTablePartitions_Call {
	return &Store_GetTablePartitions_Call{Call: _e.mock.On("GetTablePartitions", ctx, schemaName, tableName)}
}

// Run registers a callback receiving the typed call arguments.
func (_c *Store_GetTablePartitions_Call) Run(run func(ctx context.Context, schemaName string, tableName string)) *Store_GetTablePartitions_Call {
	_c.Call.Run(func(args mock.Arguments) {
		run(args[0].(context.Context), args[1].(string), args[2].(string))
	})
	return _c
}

// Return sets the values the mocked GetTablePartitions yields.
func (_c *Store_GetTablePartitions_Call) Return(_a0 []database.PartitionResult, _a1 error) *Store_GetTablePartitions_Call {
	_c.Call.Return(_a0, _a1)
	return _c
}

// RunAndReturn computes the return values from the typed arguments.
func (_c *Store_GetTablePartitions_Call) RunAndReturn(run func(context.Context, string, string) ([]database.PartitionResult, error)) *Store_GetTablePartitions_Call {
	_c.Call.Return(run)
	return _c
}
// GetTalkgroup provides a mock function with given fields: ctx, systemID, tGID // GetTalkgroup provides a mock function with given fields: ctx, systemID, tGID
func (_m *Store) GetTalkgroup(ctx context.Context, systemID int32, tGID int32) (database.GetTalkgroupRow, error) { func (_m *Store) GetTalkgroup(ctx context.Context, systemID int32, tGID int32) (database.GetTalkgroupRow, error) {
ret := _m.Called(ctx, systemID, tGID) ret := _m.Called(ctx, systemID, tGID)
@ -1958,6 +2270,64 @@ func (_c *Store_StoreTGVersion_Call) RunAndReturn(run func(context.Context, []da
return _c return _c
} }
// SweepCalls provides a mock function with given fields: ctx, rangeStart, rangeEnd
func (_m *Store) SweepCalls(ctx context.Context, rangeStart pgtype.Timestamptz, rangeEnd pgtype.Timestamptz) (int64, error) {
	ret := _m.Called(ctx, rangeStart, rangeEnd)

	if len(ret) == 0 {
		panic("no return value specified for SweepCalls")
	}

	var r0 int64
	var r1 error
	// A single two-value function takes precedence over separate per-value stubs.
	if rf, ok := ret.Get(0).(func(context.Context, pgtype.Timestamptz, pgtype.Timestamptz) (int64, error)); ok {
		return rf(ctx, rangeStart, rangeEnd)
	}
	if rf, ok := ret.Get(0).(func(context.Context, pgtype.Timestamptz, pgtype.Timestamptz) int64); ok {
		r0 = rf(ctx, rangeStart, rangeEnd)
	} else {
		r0 = ret.Get(0).(int64)
	}

	if rf, ok := ret.Get(1).(func(context.Context, pgtype.Timestamptz, pgtype.Timestamptz) error); ok {
		r1 = rf(ctx, rangeStart, rangeEnd)
	} else {
		r1 = ret.Error(1)
	}

	return r0, r1
}

// Store_SweepCalls_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SweepCalls'
type Store_SweepCalls_Call struct {
	*mock.Call
}

// SweepCalls is a helper method to define mock.On call
//   - ctx context.Context
//   - rangeStart pgtype.Timestamptz
//   - rangeEnd pgtype.Timestamptz
func (_e *Store_Expecter) SweepCalls(ctx interface{}, rangeStart interface{}, rangeEnd interface{}) *Store_SweepCalls_Call {
	return &Store_SweepCalls_Call{Call: _e.mock.On("SweepCalls", ctx, rangeStart, rangeEnd)}
}

// Run registers a callback receiving the typed call arguments.
func (_c *Store_SweepCalls_Call) Run(run func(ctx context.Context, rangeStart pgtype.Timestamptz, rangeEnd pgtype.Timestamptz)) *Store_SweepCalls_Call {
	_c.Call.Run(func(args mock.Arguments) {
		run(args[0].(context.Context), args[1].(pgtype.Timestamptz), args[2].(pgtype.Timestamptz))
	})
	return _c
}

// Return sets the values the mocked SweepCalls yields.
func (_c *Store_SweepCalls_Call) Return(_a0 int64, _a1 error) *Store_SweepCalls_Call {
	_c.Call.Return(_a0, _a1)
	return _c
}

// RunAndReturn computes the return values from the typed arguments.
func (_c *Store_SweepCalls_Call) RunAndReturn(run func(context.Context, pgtype.Timestamptz, pgtype.Timestamptz) (int64, error)) *Store_SweepCalls_Call {
	_c.Call.Return(run)
	return _c
}
// UpdatePassword provides a mock function with given fields: ctx, username, password // UpdatePassword provides a mock function with given fields: ctx, username, password
func (_m *Store) UpdatePassword(ctx context.Context, username string, password string) error { func (_m *Store) UpdatePassword(ctx context.Context, username string, password string) error {
ret := _m.Called(ctx, username, password) ret := _m.Called(ctx, username, password)

View file

@ -55,6 +55,111 @@ type Call struct {
Transcript *string `json:"transcript,omitempty"` Transcript *string `json:"transcript,omitempty"`
} }
// CallsP202407 models the calls_p_2024_07 monthly partition of the calls
// table. NOTE(review): these per-partition structs look machine-generated
// (sqlc-style models) — regenerate rather than hand-edit.
type CallsP202407 struct {
	ID          uuid.UUID          `json:"id,omitempty"`
	Submitter   *int32             `json:"submitter,omitempty"`
	System      int                `json:"system,omitempty"`
	Talkgroup   int                `json:"talkgroup,omitempty"`
	CallDate    pgtype.Timestamptz `json:"call_date,omitempty"`
	AudioName   *string            `json:"audio_name,omitempty"`
	AudioBlob   []byte             `json:"audio_blob,omitempty"`
	Duration    *int32             `json:"duration,omitempty"`
	AudioType   *string            `json:"audio_type,omitempty"`
	AudioUrl    *string            `json:"audio_url,omitempty"`
	Frequency   int                `json:"frequency,omitempty"`
	Frequencies []int              `json:"frequencies,omitempty"`
	Patches     []int              `json:"patches,omitempty"`
	TGLabel     *string            `json:"tg_label,omitempty"`
	TGAlphaTag  *string            `json:"tg_alpha_tag,omitempty"`
	TGGroup     *string            `json:"tg_group,omitempty"`
	Source      int                `json:"source,omitempty"`
	Transcript  *string            `json:"transcript,omitempty"`
}

// CallsP202408 models the calls_p_2024_08 monthly partition.
type CallsP202408 struct {
	ID          uuid.UUID          `json:"id,omitempty"`
	Submitter   *int32             `json:"submitter,omitempty"`
	System      int                `json:"system,omitempty"`
	Talkgroup   int                `json:"talkgroup,omitempty"`
	CallDate    pgtype.Timestamptz `json:"call_date,omitempty"`
	AudioName   *string            `json:"audio_name,omitempty"`
	AudioBlob   []byte             `json:"audio_blob,omitempty"`
	Duration    *int32             `json:"duration,omitempty"`
	AudioType   *string            `json:"audio_type,omitempty"`
	AudioUrl    *string            `json:"audio_url,omitempty"`
	Frequency   int                `json:"frequency,omitempty"`
	Frequencies []int              `json:"frequencies,omitempty"`
	Patches     []int              `json:"patches,omitempty"`
	TGLabel     *string            `json:"tg_label,omitempty"`
	TGAlphaTag  *string            `json:"tg_alpha_tag,omitempty"`
	TGGroup     *string            `json:"tg_group,omitempty"`
	Source      int                `json:"source,omitempty"`
	Transcript  *string            `json:"transcript,omitempty"`
}

// CallsP202409 models the calls_p_2024_09 monthly partition.
type CallsP202409 struct {
	ID          uuid.UUID          `json:"id,omitempty"`
	Submitter   *int32             `json:"submitter,omitempty"`
	System      int                `json:"system,omitempty"`
	Talkgroup   int                `json:"talkgroup,omitempty"`
	CallDate    pgtype.Timestamptz `json:"call_date,omitempty"`
	AudioName   *string            `json:"audio_name,omitempty"`
	AudioBlob   []byte             `json:"audio_blob,omitempty"`
	Duration    *int32             `json:"duration,omitempty"`
	AudioType   *string            `json:"audio_type,omitempty"`
	AudioUrl    *string            `json:"audio_url,omitempty"`
	Frequency   int                `json:"frequency,omitempty"`
	Frequencies []int              `json:"frequencies,omitempty"`
	Patches     []int              `json:"patches,omitempty"`
	TGLabel     *string            `json:"tg_label,omitempty"`
	TGAlphaTag  *string            `json:"tg_alpha_tag,omitempty"`
	TGGroup     *string            `json:"tg_group,omitempty"`
	Source      int                `json:"source,omitempty"`
	Transcript  *string            `json:"transcript,omitempty"`
}

// CallsP202410 models the calls_p_2024_10 monthly partition.
type CallsP202410 struct {
	ID          uuid.UUID          `json:"id,omitempty"`
	Submitter   *int32             `json:"submitter,omitempty"`
	System      int                `json:"system,omitempty"`
	Talkgroup   int                `json:"talkgroup,omitempty"`
	CallDate    pgtype.Timestamptz `json:"call_date,omitempty"`
	AudioName   *string            `json:"audio_name,omitempty"`
	AudioBlob   []byte             `json:"audio_blob,omitempty"`
	Duration    *int32             `json:"duration,omitempty"`
	AudioType   *string            `json:"audio_type,omitempty"`
	AudioUrl    *string            `json:"audio_url,omitempty"`
	Frequency   int                `json:"frequency,omitempty"`
	Frequencies []int              `json:"frequencies,omitempty"`
	Patches     []int              `json:"patches,omitempty"`
	TGLabel     *string            `json:"tg_label,omitempty"`
	TGAlphaTag  *string            `json:"tg_alpha_tag,omitempty"`
	TGGroup     *string            `json:"tg_group,omitempty"`
	Source      int                `json:"source,omitempty"`
	Transcript  *string            `json:"transcript,omitempty"`
}

// CallsP202411 models the calls_p_2024_11 monthly partition.
type CallsP202411 struct {
	ID          uuid.UUID          `json:"id,omitempty"`
	Submitter   *int32             `json:"submitter,omitempty"`
	System      int                `json:"system,omitempty"`
	Talkgroup   int                `json:"talkgroup,omitempty"`
	CallDate    pgtype.Timestamptz `json:"call_date,omitempty"`
	AudioName   *string            `json:"audio_name,omitempty"`
	AudioBlob   []byte             `json:"audio_blob,omitempty"`
	Duration    *int32             `json:"duration,omitempty"`
	AudioType   *string            `json:"audio_type,omitempty"`
	AudioUrl    *string            `json:"audio_url,omitempty"`
	Frequency   int                `json:"frequency,omitempty"`
	Frequencies []int              `json:"frequencies,omitempty"`
	Patches     []int              `json:"patches,omitempty"`
	TGLabel     *string            `json:"tg_label,omitempty"`
	TGAlphaTag  *string            `json:"tg_alpha_tag,omitempty"`
	TGGroup     *string            `json:"tg_group,omitempty"`
	Source      int                `json:"source,omitempty"`
	Transcript  *string            `json:"transcript,omitempty"`
}
type Incident struct { type Incident struct {
ID uuid.UUID `json:"id,omitempty"` ID uuid.UUID `json:"id,omitempty"`
Name string `json:"name,omitempty"` Name string `json:"name,omitempty"`
@ -66,9 +171,12 @@ type Incident struct {
} }
type IncidentsCall struct { type IncidentsCall struct {
IncidentID uuid.UUID `json:"incident_id,omitempty"` IncidentID uuid.UUID `json:"incident_id,omitempty"`
CallID uuid.UUID `json:"call_id,omitempty"` CallID uuid.UUID `json:"call_id,omitempty"`
Notes []byte `json:"notes,omitempty"` CallsTblID pgtype.UUID `json:"calls_tbl_id,omitempty"`
SweptCallID pgtype.UUID `json:"swept_call_id,omitempty"`
CallDate pgtype.Timestamptz `json:"call_date,omitempty"`
Notes []byte `json:"notes,omitempty"`
} }
type Setting struct { type Setting struct {
@ -77,6 +185,27 @@ type Setting struct {
Value []byte `json:"value,omitempty"` Value []byte `json:"value,omitempty"`
} }
// SweptCall mirrors the calls columns for rows moved out of retired
// partitions (see SweepCalls). NOTE(review): field set matches the CallsP*
// partition models — presumably same table shape; confirm against schema.
type SweptCall struct {
	ID          uuid.UUID          `json:"id,omitempty"`
	Submitter   *int32             `json:"submitter,omitempty"`
	System      int                `json:"system,omitempty"`
	Talkgroup   int                `json:"talkgroup,omitempty"`
	CallDate    pgtype.Timestamptz `json:"call_date,omitempty"`
	AudioName   *string            `json:"audio_name,omitempty"`
	AudioBlob   []byte             `json:"audio_blob,omitempty"`
	Duration    *int32             `json:"duration,omitempty"`
	AudioType   *string            `json:"audio_type,omitempty"`
	AudioUrl    *string            `json:"audio_url,omitempty"`
	Frequency   int                `json:"frequency,omitempty"`
	Frequencies []int              `json:"frequencies,omitempty"`
	Patches     []int              `json:"patches,omitempty"`
	TGLabel     *string            `json:"tg_label,omitempty"`
	TGAlphaTag  *string            `json:"tg_alpha_tag,omitempty"`
	TGGroup     *string            `json:"tg_group,omitempty"`
	Source      int                `json:"source,omitempty"`
	Transcript  *string            `json:"transcript,omitempty"`
}
type System struct { type System struct {
ID int `json:"id,omitempty"` ID int `json:"id,omitempty"`
Name string `json:"name,omitempty"` Name string `json:"name,omitempty"`

160
pkg/database/partitions.go Normal file
View file

@ -0,0 +1,160 @@
package database
import (
"context"
"errors"
"fmt"
"time"
"github.com/jackc/pgx/v5"
)
var (
	// ErrLowerBoundAfterUpperBound reports a partition whose lower bound
	// sorts after its upper bound.
	ErrLowerBoundAfterUpperBound = errors.New("lower bound after upper bound")
	// ErrCantDecodePartitionBounds reports bounds text matching none of the
	// supported date/datetime formats.
	ErrCantDecodePartitionBounds = errors.New("cannot decode partition bounds")
)

// PartitionResult describes one child partition of a partitioned table.
type PartitionResult struct {
	ParentTable string
	Schema      string
	Name        string
	LowerBound  string // bound text as extracted from pg_get_expr
	UpperBound  string
}

// partitionsQuerier is the partition-maintenance surface implemented by Queries.
type partitionsQuerier interface {
	GetTablePartitions(ctx context.Context, schemaName, tableName string) ([]PartitionResult, error)
	CreatePartition(ctx context.Context, parentTable, partitionName string, start, end time.Time) error
	DetachPartition(ctx context.Context, parentTable, partitionName string) error
	DropPartition(ctx context.Context, partitionName string) error
}
// GetTablePartitions returns one PartitionResult per child partition of
// schemaName.tableName, with each partition's FOR VALUES bounds captured as
// raw text for later parsing.
func (q *Queries) GetTablePartitions(ctx context.Context, schemaName, tableName string) (partitions []PartitionResult, err error) {
	// The parent table is passed as a bind parameter cast to regclass rather
	// than interpolated into the SQL text, so unusual identifiers cannot
	// break (or inject into) the query.
	const query = `
	WITH parts as (
        SELECT
                relnamespace::regnamespace as schema,
                c.oid::pg_catalog.regclass AS part_name,
                regexp_match(pg_get_expr(c.relpartbound, c.oid),
                        'FOR VALUES FROM \(''(.*)''\) TO \(''(.*)''\)') AS bounds
        FROM
                pg_catalog.pg_class c JOIN pg_catalog.pg_inherits i ON (c.oid = i.inhrelid)
        WHERE i.inhparent = $1::regclass
        AND c.relkind='r'
	)
	SELECT
		schema,
		part_name as name,
		$2::text as parentTable,
		bounds[1]::text AS lowerBound,
		bounds[2]::text AS upperBound
	FROM parts
	ORDER BY part_name;`

	rows, err := q.db.Query(ctx, query, schemaName+"."+tableName, tableName)
	if err != nil {
		return nil, fmt.Errorf("failed to get partitions: %w", err)
	}

	// Column names (schema, name, parentTable, lowerBound, upperBound) map
	// case-insensitively onto PartitionResult's fields.
	partitions, err = pgx.CollectRows(rows, pgx.RowToStructByName[PartitionResult])
	if err != nil {
		return nil, fmt.Errorf("failed to cast list: %w", err)
	}

	return partitions, nil
}
// CreatePartition creates partitionName as a range partition of parentTable
// covering [start, end). Identifiers cannot be bind parameters in DDL, so
// they are interpolated; bounds are rendered explicitly in UTC.
func (q *Queries) CreatePartition(ctx context.Context, parentTable, partitionName string, start, end time.Time) error {
	// The previous layout "2006-01-02 00:00:00Z00" contained "Z00", which is
	// not a Go reference-layout token and was emitted literally into the
	// bound string. Use a real zone token ("Z07:00" renders as "Z" for UTC).
	const boundFmt = "2006-01-02 15:04:05Z07:00"
	_, err := q.db.Exec(ctx, fmt.Sprintf(
		`CREATE TABLE %s PARTITION OF %s FOR VALUES FROM ('%s') TO ('%s');`,
		partitionName, parentTable,
		start.UTC().Format(boundFmt), end.UTC().Format(boundFmt)))
	return err
}
// DropPartition drops the given partition table entirely, discarding its rows.
func (q *Queries) DropPartition(ctx context.Context, partitionName string) error {
	_, err := q.db.Exec(ctx, fmt.Sprintf(`DROP TABLE %s;`, partitionName))
	return err
}

// DetachPartition detaches partitionName from parentTable, leaving its rows
// in place as a standalone table.
func (q *Queries) DetachPartition(ctx context.Context, parentTable, partitionName string) error {
	_, err := q.db.Exec(ctx, fmt.Sprintf(`ALTER TABLE %s DETACH PARTITION %s;`, parentTable, partitionName))
	return err
}
// ParseBounds decodes the partition's textual bounds, trying date, datetime,
// and datetime-with-timezone formats in turn. It returns
// ErrLowerBoundAfterUpperBound when the decoded bounds are out of order and
// ErrCantDecodePartitionBounds when no format matches.
func (partition PartitionResult) ParseBounds() (lowerBound time.Time, upperBound time.Time, err error) {
	parsers := []func(PartitionResult) (time.Time, time.Time, error){
		parseBoundAsDate,
		parseBoundAsDateTime,
		parseBoundAsDateTimeWithTimezone,
	}

	for _, parse := range parsers {
		lowerBound, upperBound, err = parse(partition)
		if err != nil {
			continue
		}

		// The original compared lowerBound against itself (and only after
		// all parses had failed, when both bounds were already zeroed), so
		// the ordering check could never fire. Validate the successfully
		// parsed bounds instead.
		if lowerBound.After(upperBound) {
			return time.Time{}, time.Time{}, ErrLowerBoundAfterUpperBound
		}

		return lowerBound, upperBound, nil
	}

	return time.Time{}, time.Time{}, ErrCantDecodePartitionBounds
}
// parseBoundAsDate interprets both bounds as bare dates (YYYY-MM-DD) in UTC.
func parseBoundAsDate(partition PartitionResult) (lowerBound, upperBound time.Time, err error) {
	return parseBoundsWithLayout(partition, "2006-01-02", "date")
}

// parseBoundAsDateTime interprets both bounds as date-times without a zone,
// taken to be UTC.
func parseBoundAsDateTime(partition PartitionResult) (lowerBound, upperBound time.Time, err error) {
	return parseBoundsWithLayout(partition, "2006-01-02 15:04:05", "datetime")
}

// parseBoundAsDateTimeWithTimezone interprets both bounds as zoned
// date-times, then strips the zone so the result is a UTC wall-clock time.
func parseBoundAsDateTimeWithTimezone(partition PartitionResult) (lowerBound, upperBound time.Time, err error) {
	lowerBound, upperBound, err = parseBoundsWithLayout(partition, "2006-01-02 15:04:05Z07", "datetime with timezone")
	if err != nil {
		return time.Time{}, time.Time{}, err
	}

	return convertToDateTimeWithoutTimezone(lowerBound), convertToDateTimeWithoutTimezone(upperBound), nil
}

// parseBoundsWithLayout parses both bound strings with one reference layout;
// kind names the format in error messages.
func parseBoundsWithLayout(partition PartitionResult, layout, kind string) (lowerBound, upperBound time.Time, err error) {
	lowerBound, err = time.ParseInLocation(layout, partition.LowerBound, time.UTC)
	if err != nil {
		return time.Time{}, time.Time{}, fmt.Errorf("can't parse lowerbound as %s: %w", kind, err)
	}

	upperBound, err = time.ParseInLocation(layout, partition.UpperBound, time.UTC)
	if err != nil {
		return time.Time{}, time.Time{}, fmt.Errorf("can't parse upperbound as %s: %w", kind, err)
	}

	return lowerBound, upperBound, nil
}
func convertToDateTimeWithoutTimezone(bound time.Time) time.Time {
parsedTime, err := time.Parse("2006-01-02 15:04:05", bound.UTC().Format("2006-01-02 15:04:05"))
if err != nil {
return time.Time{}
}
return parsedTime
}

View file

@ -0,0 +1,152 @@
package partman
import (
"fmt"
"time"
)
const (
daysInWeek = 7
monthsInQuarter = 3
)
func getDailyBounds(date time.Time) (lowerBound, upperBound time.Time) {
lowerBound = time.Date(date.Year(), date.Month(), date.Day(), 0, 0, 0, 0, time.UTC)
upperBound = lowerBound.AddDate(0, 0, 1)
return
}
func getWeeklyBounds(date time.Time) (lowerBound, upperBound time.Time) {
lowerBound = time.Date(date.Year(), date.Month(), date.Day(), 0, 0, 0, 0, time.UTC).AddDate(0, 0, -int(date.Weekday()-time.Monday))
upperBound = lowerBound.AddDate(0, 0, daysInWeek)
return
}
func getMonthlyBounds(date time.Time) (lowerBound, upperBound time.Time) {
lowerBound = time.Date(date.Year(), date.Month(), 1, 0, 0, 0, 0, time.UTC)
upperBound = lowerBound.AddDate(0, 1, 0)
return
}
func getQuarterlyBounds(date time.Time) (lowerBound, upperBound time.Time) {
year, _, _ := date.Date()
quarter := (int(date.Month()) - 1) / monthsInQuarter
firstMonthOfTheQuarter := time.Month(quarter*monthsInQuarter + 1)
lowerBound = time.Date(year, firstMonthOfTheQuarter, 1, 0, 0, 0, 0, time.UTC)
upperBound = lowerBound.AddDate(0, monthsInQuarter, 0)
return
}
func getYearlyBounds(date time.Time) (lowerBound, upperBound time.Time) {
lowerBound = time.Date(date.Year(), 1, 1, 0, 0, 0, 0, time.UTC)
upperBound = lowerBound.AddDate(1, 0, 0)
return
}
// Next returns the partition i intervals after p, with its name recomputed
// via setName.
func (p partition) Next(i int) partition {
	var t time.Time
	switch p.Interval {
	case Daily:
		t = p.Time.AddDate(0, 0, i)
	case Weekly:
		t = p.Time.AddDate(0, 0, i*daysInWeek)
	case Monthly:
		// Construct the month start explicitly (day 1) rather than using
		// AddDate, which would renormalize day-of-month overflow.
		year, month, _ := p.Time.Date()
		t = time.Date(year, month+time.Month(i), 1, 0, 0, 0, 0, p.Time.Location())
	case Quarterly:
		t = p.Time.AddDate(0, i*monthsInQuarter, 0)
	case Yearly:
		year, _, _ := p.Time.Date()
		t = time.Date(year+i, 1, 1, 0, 0, 0, 0, p.Time.Location())
	}

	np := partition{
		ParentTable: p.ParentTable,
		Name:        p.Name,
		Schema:      p.Schema,
		Interval:    p.Interval,
		Time:        t,
	}
	np.setName()

	return np
}
// setName derives the canonical partition table name from ParentTable,
// Interval and Time, e.g. calls_p_2024_11 (monthly) or calls_p_2024_w48
// (weekly). It panics on an unrecognized interval; intervals are validated
// when the manager is constructed.
func (p *partition) setName() {
	t := p.Time
	var suffix string
	switch p.Interval {
	case Daily:
		suffix = t.Format("2006_01_02")
	case Weekly:
		year, week := t.ISOWeek()
		suffix = fmt.Sprintf("%d_w%02d", year, week)
	case Monthly:
		suffix = t.Format("2006_01")
	case Quarterly:
		year, month, _ := t.Date()
		var quarter int
		switch {
		case month >= 1 && month <= 3:
			quarter = 1
		case month >= 4 && month <= 6:
			quarter = 2
		case month >= 7 && month <= 9:
			quarter = 3
		case month >= 10 && month <= 12:
			quarter = 4
		}
		suffix = fmt.Sprintf("%d_q%d", year, quarter)
	case Yearly:
		suffix = t.Format("2006")
	default:
		panic(ErrInvalidInterval(p.Interval))
	}

	p.Name = fmt.Sprintf("%s_p_%s", p.ParentTable, suffix)
}
// Prev returns the partition i intervals before p, with its name recomputed.
// Stepping backwards is exactly stepping forwards by a negated count: every
// interval arm of Next is symmetric in i.
func (p partition) Prev(i int) partition {
	return p.Next(-i)
}

View file

@ -0,0 +1,449 @@
package partman
// portions lifted gratefully from github.com/qonto/postgresql-partition-manager, MIT license.
import (
"context"
"errors"
"fmt"
"strconv"
"strings"
"time"
"dynatron.me/x/stillbox/internal/isoweek"
"dynatron.me/x/stillbox/pkg/config"
"dynatron.me/x/stillbox/pkg/database"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
"github.com/rs/zerolog/log"
)
const (
callsTable = "calls"
preProvisionDefault = 1
)
var (
ErrWrongSchema = errors.New("wrong schema name")
ErrDifferentInterval = errors.New("stored partition interval differs from configured")
)
type PartitionErr struct {
p string
err error
}
func (pe PartitionErr) Error() string {
r := fmt.Sprintf("bad partition '%s'", pe.p)
if pe.err != nil {
r += ": " + pe.err.Error()
}
return r
}
func (pe PartitionErr) Unwrap() error {
return pe.err
}
type ParsedIntvlErr struct {
parsed, start time.Time
}
func (pie ParsedIntvlErr) Error() string {
return fmt.Sprintf("parsed interval (%s) does not match start (%s)", pie.parsed, pie.start)
}
func PartitionError(pname string, err ...error) PartitionErr {
if len(err) > 0 {
return PartitionErr{p: pname, err: err[0]}
}
return PartitionErr{p: pname}
}
type ErrInvalidInterval string
func (in ErrInvalidInterval) Error() string {
return fmt.Sprintf("invalid interval '%s'", string(in))
}
// Interval names a supported partitioning period.
type Interval string

// Recognized partition intervals; Unknown is the zero value.
const (
	Unknown   Interval = ""
	Daily     Interval = "daily"
	Weekly    Interval = "weekly"
	Monthly   Interval = "monthly"
	Quarterly Interval = "quarterly"
	Yearly    Interval = "yearly"
)

// IsValid reports whether p is one of the recognized intervals.
func (p Interval) IsValid() bool {
	switch p {
	case Daily, Weekly, Monthly, Quarterly, Yearly:
		return true
	default:
		return false
	}
}
// PartitionManager maintains the calls table's partitions: Check reconciles
// the live set once, Go runs Check periodically until the context ends.
type PartitionManager interface {
	Go(ctx context.Context)
	Check(ctx context.Context, now time.Time) error
}

// partman is the database-backed PartitionManager implementation.
type partman struct {
	db   database.Store
	cfg  config.Partition
	intv Interval // parsed from cfg.Interval, validated in New
}

// partition describes one calls partition table.
type partition struct {
	ParentTable string
	Schema      string
	Name        string
	Interval    Interval
	Time        time.Time // an instant within the partition's period (its lower bound when decoded from a name)
}
// New builds a partition manager from cfg. It returns ErrInvalidInterval
// when cfg.Interval is not a recognized interval name.
func New(db database.Store, cfg config.Partition) (*partman, error) {
	pm := &partman{
		cfg:  cfg,
		db:   db,
		intv: Interval(cfg.Interval),
	}

	if !pm.intv.IsValid() {
		return nil, ErrInvalidInterval(pm.intv)
	}

	return pm, nil
}
var _ PartitionManager = (*partman)(nil)
// Go runs the partition manager until ctx is canceled, invoking Check once
// per hourly tick. Check failures are logged and the loop continues.
func (pm *partman) Go(ctx context.Context) {
	tick := time.NewTicker(60 * time.Minute)
	defer tick.Stop() // release the ticker when the loop exits

	// The original body was a bare select, so the goroutine returned after
	// the first tick (or cancellation) and never checked again; loop until
	// the context is done.
	for {
		select {
		case now := <-tick.C:
			if err := pm.Check(ctx, now); err != nil {
				log.Error().Err(err).Msg("partman check failed")
			}
		case <-ctx.Done():
			return
		}
	}
}
// newPartition builds the partition covering t using the configured schema
// and interval, with its canonical name filled in.
func (pm *partman) newPartition(t time.Time) partition {
	p := partition{
		ParentTable: callsTable,
		Schema:      pm.cfg.Schema,
		Interval:    Interval(pm.cfg.Interval),
		Time:        t,
	}
	p.setName()

	return p
}
// retentionPartitions returns the cfg.Retain partitions immediately
// preceding cur (empty when Retain is zero or negative).
func (pm *partman) retentionPartitions(cur partition) []partition {
	partitions := make([]partition, 0, pm.cfg.Retain)
	for i := 1; i <= pm.cfg.Retain; i++ {
		prev := cur.Prev(i)
		partitions = append(partitions, prev)
	}

	return partitions
}

// futurePartitions returns the partitions to pre-provision after cur:
// cfg.PreProvision when set, otherwise preProvisionDefault.
func (pm *partman) futurePartitions(cur partition) []partition {
	preProv := preProvisionDefault
	if pm.cfg.PreProvision != nil {
		preProv = *pm.cfg.PreProvision
	}

	partitions := make([]partition, 0, preProv)
	for i := 1; i <= preProv; i++ {
		next := cur.Next(i)
		partitions = append(partitions, next)
	}

	return partitions
}
// expectedPartitions lists the partitions that should exist at time now: the
// current one, the retained trailing ones (only when retention is enabled,
// i.e. Retain > -1), and the pre-provisioned future ones.
func (pm *partman) expectedPartitions(now time.Time) []partition {
	curPart := pm.newPartition(now)
	shouldExist := []partition{curPart}
	if pm.cfg.Retain > -1 {
		retain := pm.retentionPartitions(curPart)
		shouldExist = append(shouldExist, retain...)
	}

	future := pm.futurePartitions(curPart)
	shouldExist = append(shouldExist, future...)

	return shouldExist
}
// comparePartitions diffs the existing partition set against the expected
// one by partition name. unexpectedTables preserves existingTables order;
// missingTables preserves expectedTables order.
func (pm *partman) comparePartitions(existingTables, expectedTables []partition) (unexpectedTables, missingTables []partition) {
	have := make(map[string]bool, len(existingTables))
	for _, p := range existingTables {
		have[p.PartitionName()] = true
	}

	want := make(map[string]bool, len(expectedTables))
	for _, p := range expectedTables {
		want[p.PartitionName()] = true
		if !have[p.PartitionName()] {
			missingTables = append(missingTables, p)
		}
	}

	for _, p := range existingTables {
		if !want[p.PartitionName()] {
			// Present in the database but not in the expected set.
			unexpectedTables = append(unexpectedTables, p)
		}
	}

	return unexpectedTables, missingTables
}
// existingPartitions decodes raw partition rows into partitions, rejecting
// any in a schema other than the configured one or whose name decodes to a
// different interval than configured.
func (pm *partman) existingPartitions(parts []database.PartitionResult) ([]partition, error) {
	existing := make([]partition, 0, len(parts))
	for _, v := range parts {
		if v.Schema != pm.cfg.Schema {
			return nil, PartitionError(v.Schema+"."+v.Name, ErrWrongSchema)
		}

		p, err := pm.verifyPartName(v)
		if err != nil {
			return nil, err
		}

		if p.Interval != Interval(pm.cfg.Interval) {
			return nil, PartitionError(v.Schema+"."+v.Name, ErrDifferentInterval)
		}

		existing = append(existing, p)
	}

	return existing, nil
}
// fullTableName qualifies s with the configured schema ("schema.s").
func (pm *partman) fullTableName(s string) string {
	return pm.cfg.Schema + "." + s
}
// prunePartition retires one partition: its rows are swept and cleaned up
// via the store, then the partition is detached and — when cfg.Drop is set —
// dropped. Order matters: sweeping happens while the partition is still
// attached, inside the caller's transaction.
func (pm *partman) prunePartition(ctx context.Context, tx database.Store, p partition) error {
	s, e := p.Range()
	start := pgtype.Timestamptz{Time: s, Valid: true}
	end := pgtype.Timestamptz{Time: e, Valid: true}
	fullPartName := pm.fullTableName(p.PartitionName())

	// NOTE(review): SweepCalls/CleanupSweptCalls appear to copy calls in
	// [start, end) aside and then remove the originals — confirm against
	// their SQL definitions.
	swept, err := tx.SweepCalls(ctx, start, end)
	if err != nil {
		return err
	}
	log.Info().Int64("rows", swept).Time("start", s).Time("end", e).Msg("swept calls")

	swept, err = tx.CleanupSweptCalls(ctx, start, end)
	if err != nil {
		return err
	}
	log.Debug().Int64("rows", swept).Time("start", s).Time("end", e).Msg("cleaned up swept calls")

	log.Info().Str("partition", fullPartName).Msg("detaching partition")
	err = tx.DetachPartition(ctx, callsTable, fullPartName)
	if err != nil {
		return err
	}

	if pm.cfg.Drop {
		log.Info().Str("partition", fullPartName).Msg("dropping partition")
		return tx.DropPartition(ctx, fullPartName)
	}

	return nil
}
// Check reconciles the live partition set against the configured policy in a
// single transaction: partitions outside the expected window are pruned
// (when retention is enabled) and missing current/future partitions are
// created.
func (pm *partman) Check(ctx context.Context, now time.Time) error {
	return pm.db.InTx(ctx, func(db database.Store) error {
		// by default, we want to make sure a partition exists for this and next month
		// since we run this at startup, it's safe to do only that.
		partitions, err := db.GetTablePartitions(ctx, pm.cfg.Schema, callsTable)
		if err != nil {
			return err
		}

		existing, err := pm.existingPartitions(partitions)
		if err != nil {
			return err
		}

		expected := pm.expectedPartitions(now)

		unexpected, missing := pm.comparePartitions(existing, expected)

		// Retain < 0 disables pruning entirely.
		if pm.cfg.Retain > -1 {
			for _, p := range unexpected {
				err := pm.prunePartition(ctx, db, p)
				if err != nil {
					return err
				}
			}
		}

		for _, p := range missing {
			err := pm.createPartition(ctx, db, p)
			if err != nil {
				return err
			}
		}

		return nil
	}, pgx.TxOptions{})
}
// Range returns the [lower, upper) time bounds of p per its interval. It
// panics on an unrecognized interval; intervals are validated in New and
// verifyPartName before partitions are built.
func (p partition) Range() (time.Time, time.Time) {
	switch p.Interval {
	case Daily:
		return getDailyBounds(p.Time)
	case Weekly:
		return getWeeklyBounds(p.Time)
	case Monthly:
		return getMonthlyBounds(p.Time)
	case Quarterly:
		return getQuarterlyBounds(p.Time)
	case Yearly:
		return getYearlyBounds(p.Time)
	}

	panic("unknown interval!")
}
// PartitionName returns the partition's table name (without schema).
func (p partition) PartitionName() string {
	return p.Name
}

// createPartition creates the calls partition covering part's range.
func (pm *partman) createPartition(ctx context.Context, tx database.Store, part partition) error {
	start, end := part.Range()
	name := part.PartitionName()
	log.Info().Str("partition", name).Time("start", start).Time("end", end).Msg("creating partition")
	return tx.CreatePartition(ctx, callsTable, name, start, end)
}
/*
* Partition scheme names:
* daily: calls_p_2024_11_28
* weekly: calls_p_2024_w48
* monthly: calls_p_2024_11
* quarterly: calls_p_2024_q4
* yearly: calls_p_2024
*/
// verifyPartName checks that pr's name is consistent with its declared lower
// bound and returns the decoded partition, with its interval inferred from
// the name's suffix shape (see the scheme comment above).
func (pm *partman) verifyPartName(pr database.PartitionResult) (p partition, err error) {
	pn := pr.Name

	low, _, err := pr.ParseBounds()
	if err != nil {
		return
	}

	p = partition{
		ParentTable: pr.ParentTable,
		Name:        pr.Name,
		Schema:      pr.Schema,
		Time:        low,
	}

	dateAr := strings.Split(pn, "calls_p_")
	if len(dateAr) != 2 {
		return p, PartitionError(pn)
	}

	dateAr = strings.Split(dateAr[1], "_")
	switch len(dateAr) {
	case 3: // daily: YYYY_MM_DD
		p.Interval = Daily
		ymd := [3]int{}
		for i := 0; i < 3; i++ {
			r, err := strconv.Atoi(dateAr[i])
			if err != nil {
				return p, PartitionError(pn, err)
			}

			ymd[i] = r
		}

		parsed := time.Date(ymd[0], time.Month(ymd[1]), ymd[2], 0, 0, 0, 0, time.UTC)
		if parsed != p.Time {
			return p, PartitionError(pn, ParsedIntvlErr{parsed: parsed, start: p.Time})
		}

		return p, nil
	case 2: // weekly (YYYY_wWW), quarterly (YYYY_qQ) or monthly (YYYY_MM)
		year, err := strconv.Atoi(dateAr[0])
		if err != nil {
			return p, PartitionError(pn, err)
		}

		if strings.HasPrefix(dateAr[1], "w") {
			p.Interval = Weekly
			weekNum, err := strconv.Atoi(dateAr[1][1:])
			if err != nil {
				return p, PartitionError(pn, err)
			}

			parsed := isoweek.StartTime(year, weekNum, time.UTC)
			if parsed != p.Time {
				return p, PartitionError(pn, ParsedIntvlErr{parsed: parsed, start: p.Time})
			}

			return p, nil
		} else if strings.HasPrefix(dateAr[1], "q") {
			p.Interval = Quarterly
			quarterNum, err := strconv.Atoi(dateAr[1][1:])
			if err != nil {
				return p, PartitionError(pn, err)
			}

			// Reject q0 as well as q5+; the original only checked the
			// upper end of the range.
			if quarterNum < 1 || quarterNum > 4 {
				return p, PartitionError(pn, errors.New("invalid quarter"))
			}

			firstMonthOfTheQuarter := time.Month((quarterNum-1)*monthsInQuarter + 1)
			parsed := time.Date(year, firstMonthOfTheQuarter, 1, 0, 0, 0, 0, time.UTC)
			if parsed != p.Time {
				return p, PartitionError(pn, ParsedIntvlErr{parsed: parsed, start: p.Time})
			}

			return p, nil
		}

		// monthly
		p.Interval = Monthly
		month, err := strconv.Atoi(dateAr[1])
		if err != nil {
			// The original dropped the Atoi error here; wrap it for
			// parity with every other branch.
			return p, PartitionError(pn, err)
		}

		parsed := time.Date(year, time.Month(month), 1, 0, 0, 0, 0, time.UTC)
		if parsed != p.Time {
			return p, PartitionError(pn, ParsedIntvlErr{parsed: parsed, start: p.Time})
		}

		return p, nil
	case 1: // yearly: YYYY
		p.Interval = Yearly
		year, err := strconv.Atoi(dateAr[0])
		if err != nil {
			return p, PartitionError(pn, err)
		}

		parsed := time.Date(year, 1, 1, 0, 0, 0, 0, time.UTC)
		if parsed != p.Time {
			return p, PartitionError(pn, ParsedIntvlErr{parsed: parsed, start: p.Time})
		}

		return p, nil
	}

	return p, PartitionError(pn)
}

View file

@ -0,0 +1,431 @@
package partman_test
import (
"context"
"testing"
"time"
"dynatron.me/x/stillbox/internal/common"
"dynatron.me/x/stillbox/pkg/config"
"dynatron.me/x/stillbox/pkg/database"
"dynatron.me/x/stillbox/pkg/database/mocks"
"dynatron.me/x/stillbox/pkg/database/partman"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)
// mctx matches any context.Context argument in mock expectations.
var mctx = mock.Anything

// inTx stubs Store.InTx so the transaction callback runs immediately
// against the same mock store, with no real transaction involved.
func inTx(s *mocks.Store) {
	run := func(ctx context.Context, f func(db database.Store) error, po pgx.TxOptions) error {
		// Hand the mock itself back to the callback as the "transactional" store.
		return f(s)
	}
	s.EXPECT().
		InTx(mctx, mock.AnythingOfType("func(database.Store) error"), mock.AnythingOfType("pgx.TxOptions")).
		RunAndReturn(run)
}
// timeRange is a half-open [start, end) interval describing partition
// bounds in test expectations.
// NOTE: field order matters — some test cases construct it positionally.
type timeRange struct {
	start time.Time
	end time.Time
}
// partSpec pairs an expected partition name with its time bounds.
type partSpec struct {
	name string
	timeRange
}
// TestPartman exercises the partition manager's Check pass against a mocked
// Store. Each case supplies a wall-clock "now", a partition config, and the
// partitions that already exist; the test then verifies exactly which
// partitions get created, swept, cleaned up, detached and dropped.
func TestPartman(t *testing.T) {
	ctx := context.Background()

	// timeInUTC parses a "YYYY-MM-DD HH:MM:SS" timestamp in UTC. Fixtures are
	// static, so a parse failure is a programming error and panics.
	timeInUTC := func(s string) time.Time {
		t, err := time.ParseInLocation("2006-01-02 15:04:05", s, time.UTC)
		if err != nil {
			panic(err)
		}
		return t
	}

	// dateInUTC parses a "YYYY-MM-DD" date in UTC; panics on bad fixtures.
	dateInUTC := func(s string) time.Time {
		t, err := time.ParseInLocation("2006-01-02", s, time.UTC)
		if err != nil {
			panic(err)
		}
		return t
	}

	// partResultWithSchema builds a PartitionResult for the calls table in
	// the given schema with the given bounds.
	partResultWithSchema := func(schema, name, low, up string) database.PartitionResult {
		return database.PartitionResult{
			ParentTable: "calls",
			Schema:      schema,
			Name:        name,
			LowerBound:  low,
			UpperBound:  up,
		}
	}

	// partResult builds a PartitionResult in the default public schema.
	partResult := func(name, low, up string) database.PartitionResult {
		return partResultWithSchema("public", name, low, up)
	}

	// dailyTR returns the one-day [start, end) range beginning at date tr.
	dailyTR := func(tr string) timeRange {
		dtr := dateInUTC(tr)
		etr := dtr.AddDate(0, 0, 1)
		return timeRange{start: dtr, end: etr}
	}

	tests := []struct {
		name          string
		now           time.Time
		cfg           config.Partition
		extant        []database.PartitionResult
		expectCreate  []partSpec
		expectDrop    []string
		expectDetach  []string
		expectSweep   []timeRange
		expectCleanup []timeRange
		expectErr     error
	}{
		{
			name: "monthly base",
			now:  timeInUTC("2024-11-28 11:37:04"),
			cfg: config.Partition{
				Enabled:      true,
				Schema:       "public",
				Interval:     "monthly",
				Retain:       2,
				Drop:         true,
				PreProvision: common.PtrTo(2),
			},
			extant: []database.PartitionResult{
				partResult("calls_p_2024_10", "2024-10-01", "2024-11-01"),
				partResult("calls_p_2024_09", "2024-09-01", "2024-10-01"),
				partResult("calls_p_2024_08", "2024-08-01", "2024-09-01"),
				partResult("calls_p_2024_07", "2024-07-01", "2024-08-01"),
			},
			expectCreate: []partSpec{
				{name: "calls_p_2024_11", timeRange: timeRange{start: dateInUTC("2024-11-01"), end: dateInUTC("2024-12-01")}},
				{name: "calls_p_2024_12", timeRange: timeRange{start: dateInUTC("2024-12-01"), end: dateInUTC("2025-01-01")}},
				{name: "calls_p_2025_01", timeRange: timeRange{start: dateInUTC("2025-01-01"), end: dateInUTC("2025-02-01")}},
			},
			expectDrop: []string{
				"public.calls_p_2024_07",
				"public.calls_p_2024_08",
			},
			expectSweep: []timeRange{
				{start: dateInUTC("2024-07-01"), end: dateInUTC("2024-08-01")},
				{start: dateInUTC("2024-08-01"), end: dateInUTC("2024-09-01")},
			},
			expectCleanup: []timeRange{
				{start: dateInUTC("2024-07-01"), end: dateInUTC("2024-08-01")},
				{start: dateInUTC("2024-08-01"), end: dateInUTC("2024-09-01")},
			},
			expectDetach: []string{
				"public.calls_p_2024_07",
				"public.calls_p_2024_08",
			},
		},
		{
			name: "monthly retain all",
			now:  timeInUTC("2024-11-28 11:37:04"),
			cfg: config.Partition{
				Enabled:      true,
				Schema:       "public",
				Interval:     "monthly",
				Retain:       -1, // retain everything; nothing swept or dropped
				Drop:         true,
				PreProvision: common.PtrTo(2),
			},
			extant: []database.PartitionResult{
				partResult("calls_p_2024_10", "2024-10-01", "2024-11-01"),
				partResult("calls_p_2024_09", "2024-09-01", "2024-10-01"),
				partResult("calls_p_2024_08", "2024-08-01", "2024-09-01"),
				partResult("calls_p_2024_07", "2024-07-01", "2024-08-01"),
			},
			expectCreate: []partSpec{
				{name: "calls_p_2024_11", timeRange: timeRange{start: dateInUTC("2024-11-01"), end: dateInUTC("2024-12-01")}},
				{name: "calls_p_2024_12", timeRange: timeRange{start: dateInUTC("2024-12-01"), end: dateInUTC("2025-01-01")}},
				{name: "calls_p_2025_01", timeRange: timeRange{start: dateInUTC("2025-01-01"), end: dateInUTC("2025-02-01")}},
			},
			expectDrop:    []string{},
			expectSweep:   []timeRange{},
			expectCleanup: []timeRange{},
			expectDetach:  []string{},
		},
		{
			name: "weekly base",
			now:  timeInUTC("2024-11-28 11:37:04"), // ISO week 48
			cfg: config.Partition{
				Enabled:      true,
				Schema:       "public",
				Interval:     "weekly",
				Retain:       2,
				Drop:         false, // detach only; expectDrop stays empty
				PreProvision: common.PtrTo(2),
			},
			extant: []database.PartitionResult{
				partResult("calls_p_2024_w44", "2024-10-28", "2024-11-04"),
				partResult("calls_p_2024_w45", "2024-11-04", "2024-11-11"),
				partResult("calls_p_2024_w46", "2024-11-11", "2024-11-18"),
				// missing week 47: within the retention window, so it must be
				// backfilled by Check.
			},
			expectCreate: []partSpec{
				{name: "calls_p_2024_w47", timeRange: timeRange{start: dateInUTC("2024-11-18"), end: dateInUTC("2024-11-25")}},
				{name: "calls_p_2024_w48", timeRange: timeRange{start: dateInUTC("2024-11-25"), end: dateInUTC("2024-12-02")}},
				{name: "calls_p_2024_w49", timeRange: timeRange{start: dateInUTC("2024-12-02"), end: dateInUTC("2024-12-09")}},
				{name: "calls_p_2024_w50", timeRange: timeRange{start: dateInUTC("2024-12-09"), end: dateInUTC("2024-12-16")}},
			},
			expectSweep: []timeRange{
				{start: dateInUTC("2024-10-28"), end: dateInUTC("2024-11-04")},
				{start: dateInUTC("2024-11-04"), end: dateInUTC("2024-11-11")},
			},
			expectCleanup: []timeRange{
				{start: dateInUTC("2024-10-28"), end: dateInUTC("2024-11-04")},
				{start: dateInUTC("2024-11-04"), end: dateInUTC("2024-11-11")},
			},
			expectDetach: []string{
				"public.calls_p_2024_w44",
				"public.calls_p_2024_w45",
			},
		},
		{
			name: "daily base",
			now:  timeInUTC("2024-12-31 11:37:04"),
			cfg: config.Partition{
				Enabled:      true,
				Schema:       "public",
				Interval:     "daily",
				Retain:       2,
				Drop:         true,
				PreProvision: common.PtrTo(2),
			},
			extant: []database.PartitionResult{
				partResult("calls_p_2024_12_26", "2024-12-26", "2024-12-27"),
				partResult("calls_p_2024_12_27", "2024-12-27", "2024-12-28"),
				partResult("calls_p_2024_12_30", "2024-12-30", "2024-12-31"),
				partResult("calls_p_2024_12_31", "2024-12-31", "2025-01-01"),
			},
			expectCreate: []partSpec{
				// 12-29 is inside the retention window and missing; 12-28 is
				// outside it and must not be backfilled.
				{name: "calls_p_2024_12_29", timeRange: dailyTR("2024-12-29")},
				{name: "calls_p_2025_01_01", timeRange: dailyTR("2025-01-01")},
				{name: "calls_p_2025_01_02", timeRange: dailyTR("2025-01-02")},
			},
			expectDrop: []string{
				"public.calls_p_2024_12_26",
				"public.calls_p_2024_12_27",
			},
			expectSweep: []timeRange{
				{start: dateInUTC("2024-12-26"), end: dateInUTC("2024-12-27")},
				{start: dateInUTC("2024-12-27"), end: dateInUTC("2024-12-28")},
			},
			expectCleanup: []timeRange{
				{start: dateInUTC("2024-12-26"), end: dateInUTC("2024-12-27")},
				{start: dateInUTC("2024-12-27"), end: dateInUTC("2024-12-28")},
			},
			expectDetach: []string{
				"public.calls_p_2024_12_26",
				"public.calls_p_2024_12_27",
			},
		},
		{
			name: "quarterly base",
			now:  timeInUTC("2025-07-28 11:37:04"), // q3
			cfg: config.Partition{
				Enabled:      true,
				Schema:       "public",
				Interval:     "quarterly",
				Retain:       2,
				Drop:         true,
				PreProvision: common.PtrTo(2),
			},
			extant: []database.PartitionResult{
				partResult("calls_p_2024_q3", "2024-07-01", "2024-10-01"),
				partResult("calls_p_2024_q4", "2024-10-01", "2025-01-01"),
				// upper bounds are one quarter after each lower bound
				// (previously mistyped as 2024 dates).
				partResult("calls_p_2025_q1", "2025-01-01", "2025-04-01"),
				partResult("calls_p_2025_q2", "2025-04-01", "2025-07-01"),
			},
			expectDrop: []string{
				"public.calls_p_2024_q3",
				"public.calls_p_2024_q4",
			},
			expectSweep: []timeRange{
				{start: dateInUTC("2024-07-01"), end: dateInUTC("2024-10-01")},
				{start: dateInUTC("2024-10-01"), end: dateInUTC("2025-01-01")},
			},
			expectCleanup: []timeRange{
				{start: dateInUTC("2024-07-01"), end: dateInUTC("2024-10-01")},
				{start: dateInUTC("2024-10-01"), end: dateInUTC("2025-01-01")},
			},
			expectCreate: []partSpec{
				{name: "calls_p_2025_q3", timeRange: timeRange{dateInUTC("2025-07-01"), dateInUTC("2025-10-01")}},
				{name: "calls_p_2025_q4", timeRange: timeRange{dateInUTC("2025-10-01"), dateInUTC("2026-01-01")}},
				{name: "calls_p_2026_q1", timeRange: timeRange{dateInUTC("2026-01-01"), dateInUTC("2026-04-01")}},
			},
			expectDetach: []string{
				"public.calls_p_2024_q3",
				"public.calls_p_2024_q4",
			},
		},
		{
			name: "yearly base",
			now:  timeInUTC("2023-04-28 11:37:04"),
			cfg: config.Partition{
				Enabled:      true,
				Schema:       "public",
				Interval:     "yearly",
				Retain:       3,
				Drop:         false,
				PreProvision: common.PtrTo(2),
			},
			extant: []database.PartitionResult{
				partResult("calls_p_2019", "2019-01-01", "2020-01-01"),
				partResult("calls_p_2020", "2020-01-01", "2021-01-01"),
				partResult("calls_p_2021", "2021-01-01", "2022-01-01"),
				partResult("calls_p_2022", "2022-01-01", "2023-01-01"),
				partResult("calls_p_2023", "2023-01-01", "2024-01-01"),
			},
			expectDrop: []string{},
			expectSweep: []timeRange{
				{start: dateInUTC("2019-01-01"), end: dateInUTC("2020-01-01")},
			},
			expectCleanup: []timeRange{
				{start: dateInUTC("2019-01-01"), end: dateInUTC("2020-01-01")},
			},
			expectCreate: []partSpec{
				{name: "calls_p_2024", timeRange: timeRange{dateInUTC("2024-01-01"), dateInUTC("2025-01-01")}},
				{name: "calls_p_2025", timeRange: timeRange{dateInUTC("2025-01-01"), dateInUTC("2026-01-01")}},
			},
			expectDetach: []string{
				"public.calls_p_2019",
			},
		},
		{
			name: "changed monthly to daily",
			now:  timeInUTC("2024-11-28 11:37:04"),
			cfg: config.Partition{
				Enabled:      true,
				Schema:       "public",
				Interval:     "daily",
				Retain:       2,
				Drop:         true,
				PreProvision: common.PtrTo(2),
			},
			extant: []database.PartitionResult{
				partResult("calls_p_2024_10", "2024-10-01", "2024-11-01"),
				partResult("calls_p_2024_09", "2024-09-01", "2024-10-01"),
				partResult("calls_p_2024_08", "2024-08-01", "2024-09-01"),
				partResult("calls_p_2024_07", "2024-07-01", "2024-08-01"),
			},
			expectErr: partman.ErrDifferentInterval,
		},
		{
			name: "monthly wrong schema",
			now:  timeInUTC("2024-11-28 11:37:04"),
			cfg: config.Partition{
				Enabled:      true,
				Schema:       "public",
				Interval:     "monthly",
				Retain:       2,
				Drop:         true,
				PreProvision: common.PtrTo(2),
			},
			extant: []database.PartitionResult{
				partResult("calls_p_2024_10", "2024-10-01", "2024-11-01"),
				partResultWithSchema("reid", "calls_p_2024_09", "2024-09-01", "2024-10-01"),
				partResult("calls_p_2024_08", "2024-08-01", "2024-09-01"),
				partResult("calls_p_2024_07", "2024-07-01", "2024-08-01"),
			},
			expectErr: partman.ErrWrongSchema,
		},
	}

	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			db := mocks.NewStore(t)

			// Side effects observed through the mock expectations below.
			createdPartitions := make([]partSpec, 0, len(tc.expectCreate))
			sweptRanges := make([]timeRange, 0, len(tc.expectSweep))
			droppedPartitions := make([]string, 0, len(tc.expectDrop))
			cleanupRanges := make([]timeRange, 0, len(tc.expectCleanup))
			detachedPartitions := make([]string, 0, len(tc.expectDetach))

			// sweepMap records swept ranges so cleanup can be required to
			// touch only ranges that were swept first.
			sweepMap := make(map[timeRange]struct{})

			if len(tc.expectCreate) > 0 {
				db.EXPECT().
					CreatePartition(
						mctx, mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("time.Time"),
						mock.AnythingOfType("time.Time"),
					).
					Run(func(ctx context.Context, tableName, partitionName string, start, end time.Time) {
						ps := partSpec{name: partitionName, timeRange: timeRange{start: start, end: end}}
						createdPartitions = append(createdPartitions, ps)
					}).Return(nil)
			}

			if len(tc.expectSweep) > 0 {
				db.EXPECT().
					SweepCalls(
						mctx, mock.AnythingOfType("pgtype.Timestamptz"), mock.AnythingOfType("pgtype.Timestamptz"),
					).
					Run(func(ctx context.Context, start, end pgtype.Timestamptz) {
						tr := timeRange{start: start.Time, end: end.Time}
						sweepMap[tr] = struct{}{}
						sweptRanges = append(sweptRanges, tr)
					}).Return(30, nil)
			}

			if len(tc.expectCleanup) > 0 {
				db.EXPECT().
					CleanupSweptCalls(
						mctx, mock.AnythingOfType("pgtype.Timestamptz"), mock.AnythingOfType("pgtype.Timestamptz"),
					).Run(func(ctx context.Context, start, end pgtype.Timestamptz) {
					tr := timeRange{start: start.Time, end: end.Time}
					// cleanup must never run on a range that was not swept
					require.Contains(t, sweepMap, tr)
					cleanupRanges = append(cleanupRanges, tr)
				}).Return(30, nil)
			}

			if tc.cfg.Drop && len(tc.expectDrop) > 0 {
				db.EXPECT().
					DropPartition(mctx, mock.AnythingOfType("string")).
					Run(func(ctx context.Context, partName string) {
						droppedPartitions = append(droppedPartitions, partName)
					}).Return(nil)
			}

			if len(tc.expectDetach) > 0 {
				db.EXPECT().
					DetachPartition(
						mctx, mock.AnythingOfType("string"), mock.AnythingOfType("string")).
					Run(func(ctx context.Context, parentTable, partName string) {
						detachedPartitions = append(detachedPartitions, partName)
					}).Return(nil)
			}

			inTx(db)
			db.EXPECT().GetTablePartitions(mctx, "public", "calls").Return(tc.extant, nil)

			pm, err := partman.New(db, tc.cfg)
			require.NoError(t, err)

			err = pm.Check(ctx, tc.now)
			if tc.expectErr != nil {
				assert.ErrorIs(t, err, tc.expectErr)
			} else {
				require.NoError(t, err)
				assert.ElementsMatch(t, tc.expectCreate, createdPartitions, "created partitions")
				assert.ElementsMatch(t, tc.expectSweep, sweptRanges, "swept ranges")
				assert.ElementsMatch(t, tc.expectDrop, droppedPartitions, "dropped partitions")
				assert.ElementsMatch(t, tc.expectCleanup, cleanupRanges, "cleaned up ranges")
				assert.ElementsMatch(t, tc.expectDetach, detachedPartitions, "detached partitions")
			}
		})
	}
}

View file

@ -15,6 +15,7 @@ type Querier interface {
AddAlert(ctx context.Context, arg AddAlertParams) error AddAlert(ctx context.Context, arg AddAlertParams) error
AddCall(ctx context.Context, arg AddCallParams) error AddCall(ctx context.Context, arg AddCallParams) error
AddLearnedTalkgroup(ctx context.Context, arg AddLearnedTalkgroupParams) (Talkgroup, error) AddLearnedTalkgroup(ctx context.Context, arg AddLearnedTalkgroupParams) (Talkgroup, error)
CleanupSweptCalls(ctx context.Context, rangeStart pgtype.Timestamptz, rangeEnd pgtype.Timestamptz) (int64, error)
CreateAPIKey(ctx context.Context, owner int, expires pgtype.Timestamp, disabled *bool) (ApiKey, error) CreateAPIKey(ctx context.Context, owner int, expires pgtype.Timestamp, disabled *bool) (ApiKey, error)
CreateUser(ctx context.Context, arg CreateUserParams) (User, error) CreateUser(ctx context.Context, arg CreateUserParams) (User, error)
DeleteAPIKey(ctx context.Context, apiKey string) error DeleteAPIKey(ctx context.Context, apiKey string) error
@ -42,6 +43,7 @@ type Querier interface {
SetCallTranscript(ctx context.Context, iD uuid.UUID, transcript *string) error SetCallTranscript(ctx context.Context, iD uuid.UUID, transcript *string) error
SetTalkgroupTags(ctx context.Context, tags []string, systemID int32, tGID int32) error SetTalkgroupTags(ctx context.Context, tags []string, systemID int32, tGID int32) error
StoreTGVersion(ctx context.Context, arg []StoreTGVersionParams) *StoreTGVersionBatchResults StoreTGVersion(ctx context.Context, arg []StoreTGVersionParams) *StoreTGVersionBatchResults
SweepCalls(ctx context.Context, rangeStart pgtype.Timestamptz, rangeEnd pgtype.Timestamptz) (int64, error)
UpdatePassword(ctx context.Context, username string, password string) error UpdatePassword(ctx context.Context, username string, password string) error
UpdateTalkgroup(ctx context.Context, arg UpdateTalkgroupParams) (Talkgroup, error) UpdateTalkgroup(ctx context.Context, arg UpdateTalkgroupParams) (Talkgroup, error)
UpsertTalkgroup(ctx context.Context, arg []UpsertTalkgroupParams) *UpsertTalkgroupBatchResults UpsertTalkgroup(ctx context.Context, arg []UpsertTalkgroupParams) *UpsertTalkgroupBatchResults

View file

@ -205,7 +205,7 @@ func (n *notifier) Send(ctx context.Context, alerts []alert.Alert) error {
return nil return nil
} }
func New(cfg config.Notify) (Notifier, error) { func New(cfg config.Notify) (*notifier, error) {
n := new(notifier) n := new(notifier)
for _, s := range cfg { for _, s := range cfg {

View file

@ -20,7 +20,7 @@ type API interface {
type api struct { type api struct {
} }
func New() API { func New() *api {
s := new(api) s := new(api)
return s return s

View file

@ -10,6 +10,7 @@ import (
"dynatron.me/x/stillbox/pkg/auth" "dynatron.me/x/stillbox/pkg/auth"
"dynatron.me/x/stillbox/pkg/config" "dynatron.me/x/stillbox/pkg/config"
"dynatron.me/x/stillbox/pkg/database" "dynatron.me/x/stillbox/pkg/database"
"dynatron.me/x/stillbox/pkg/database/partman"
"dynatron.me/x/stillbox/pkg/nexus" "dynatron.me/x/stillbox/pkg/nexus"
"dynatron.me/x/stillbox/pkg/notify" "dynatron.me/x/stillbox/pkg/notify"
"dynatron.me/x/stillbox/pkg/rest" "dynatron.me/x/stillbox/pkg/rest"
@ -40,6 +41,7 @@ type Server struct {
hup chan os.Signal hup chan os.Signal
tgs tgstore.Store tgs tgstore.Store
rest rest.API rest rest.API
partman partman.PartitionManager
} }
func New(ctx context.Context, cfg *config.Configuration) (*Server, error) { func New(ctx context.Context, cfg *config.Configuration) (*Server, error) {
@ -79,6 +81,18 @@ func New(ctx context.Context, cfg *config.Configuration) (*Server, error) {
rest: api, rest: api,
} }
if cfg.DB.Partition.Enabled {
srv.partman, err = partman.New(db, cfg.DB.Partition)
if err != nil {
return nil, err
}
err = srv.partman.Check(ctx, time.Now())
if err != nil {
return nil, err
}
}
srv.sinks.Register("database", sinks.NewDatabaseSink(srv.db, tgCache), true) srv.sinks.Register("database", sinks.NewDatabaseSink(srv.db, tgCache), true)
srv.sinks.Register("nexus", sinks.NewNexusSink(srv.nex), false) srv.sinks.Register("nexus", sinks.NewNexusSink(srv.nex), false)
@ -128,6 +142,10 @@ func (s *Server) Go(ctx context.Context) error {
go s.nex.Go(ctx) go s.nex.Go(ctx)
go s.alerter.Go(ctx) go s.alerter.Go(ctx)
if pm := s.partman; pm != nil {
go pm.Go(ctx)
}
var err error var err error
go func() { go func() {
err = httpSrv.ListenAndServe() err = httpSrv.ListenAndServe()

View file

@ -36,7 +36,7 @@ type sinks struct {
sinks map[string]sinkInstance sinks map[string]sinkInstance
} }
func NewSinkManager() Sinks { func NewSinkManager() *sinks {
return &sinks{ return &sinks{
sinks: make(map[string]sinkInstance), sinks: make(map[string]sinkInstance),
} }

View file

@ -150,7 +150,7 @@ type cache struct {
} }
// NewCache returns a new cache Store. // NewCache returns a new cache Store.
func NewCache() Store { func NewCache() *cache {
tgc := &cache{ tgc := &cache{
tgs: make(tgMap), tgs: make(tgMap),
systems: make(map[int32]string), systems: make(map[int32]string),

View file

@ -78,7 +78,33 @@ CREATE TABLE IF NOT EXISTS alerts(
metadata JSONB metadata JSONB
); );
CREATE TABLE IF NOT EXISTS calls( CREATE TABLE calls (
id UUID,
submitter INTEGER REFERENCES api_keys(id) ON DELETE SET NULL,
system INTEGER NOT NULL,
talkgroup INTEGER NOT NULL,
call_date TIMESTAMPTZ NOT NULL,
audio_name TEXT,
audio_blob BYTEA,
duration INTEGER,
audio_type TEXT,
audio_url TEXT,
frequency INTEGER NOT NULL,
frequencies INTEGER[],
patches INTEGER[],
tg_label TEXT,
tg_alpha_tag TEXT,
tg_group TEXT,
source INTEGER NOT NULL,
transcript TEXT,
PRIMARY KEY (id, call_date),
FOREIGN KEY (system, talkgroup) REFERENCES talkgroups(system_id, tgid)
) PARTITION BY RANGE (call_date);
CREATE INDEX IF NOT EXISTS calls_transcript_idx ON calls USING GIN (to_tsvector('english', transcript));
CREATE INDEX IF NOT EXISTS calls_call_date_tg_idx ON calls(system, talkgroup, call_date);
CREATE TABLE swept_calls (
id UUID PRIMARY KEY, id UUID PRIMARY KEY,
submitter INTEGER REFERENCES api_keys(id) ON DELETE SET NULL, submitter INTEGER REFERENCES api_keys(id) ON DELETE SET NULL,
system INTEGER NOT NULL, system INTEGER NOT NULL,
@ -100,8 +126,9 @@ CREATE TABLE IF NOT EXISTS calls(
FOREIGN KEY (system, talkgroup) REFERENCES talkgroups(system_id, tgid) FOREIGN KEY (system, talkgroup) REFERENCES talkgroups(system_id, tgid)
); );
CREATE INDEX IF NOT EXISTS calls_transcript_idx ON calls USING GIN (to_tsvector('english', transcript)); CREATE INDEX IF NOT EXISTS swept_calls_transcript_idx ON swept_calls USING GIN (to_tsvector('english', transcript));
CREATE INDEX IF NOT EXISTS calls_call_date_tg_idx ON calls(system, talkgroup, call_date); CREATE INDEX IF NOT EXISTS swept_calls_call_date_tg_idx ON swept_calls(system, talkgroup, call_date);
CREATE TABLE IF NOT EXISTS settings( CREATE TABLE IF NOT EXISTS settings(
name TEXT PRIMARY KEY, name TEXT PRIMARY KEY,
@ -125,8 +152,14 @@ CREATE INDEX IF NOT EXISTS incidents_name_description_idx ON incidents USING GIN
); );
CREATE TABLE IF NOT EXISTS incidents_calls( CREATE TABLE IF NOT EXISTS incidents_calls(
incident_id UUID REFERENCES incidents(id) ON UPDATE CASCADE ON DELETE CASCADE, incident_id UUID NOT NULL REFERENCES incidents(id) ON UPDATE CASCADE ON DELETE CASCADE,
call_id UUID REFERENCES calls(id) ON UPDATE CASCADE, call_id UUID NOT NULL,
calls_tbl_id UUID NULL,
swept_call_id UUID NULL REFERENCES swept_calls(id),
call_date TIMESTAMPTZ NULL,
notes JSONB, notes JSONB,
FOREIGN KEY (calls_tbl_id, call_date) REFERENCES calls(id, call_date),
PRIMARY KEY (incident_id, call_id) PRIMARY KEY (incident_id, call_id)
); );

View file

@ -56,3 +56,23 @@ VALUES
-- name: GetDatabaseSize :one -- name: GetDatabaseSize :one
SELECT pg_size_pretty(pg_database_size(current_database())); SELECT pg_size_pretty(pg_database_size(current_database()));
-- name: SweepCalls :execrows
WITH to_sweep AS (
SELECT id, submitter, system, talkgroup, calls.call_date, audio_name, audio_blob, duration, audio_type,
audio_url, frequency, frequencies, patches, tg_label, tg_alpha_tag, tg_group, source, transcript
FROM calls
JOIN incidents_calls ic ON ic.call_id = calls.id
WHERE calls.call_date >= @range_start AND calls.call_date < @range_end
) INSERT INTO swept_calls SELECT * FROM to_sweep;
-- name: CleanupSweptCalls :execrows
WITH to_sweep AS (
SELECT id FROM calls
JOIN incidents_calls ic ON ic.call_id = calls.id
WHERE calls.call_date >= @range_start AND calls.call_date < @range_end
) UPDATE incidents_calls
SET
swept_call_id = call_id,
calls_tbl_id = NULL
WHERE call_id IN (SELECT id FROM to_sweep);

View file

@ -37,3 +37,6 @@ sql:
import: "dynatron.me/x/stillbox/internal/jsontypes" import: "dynatron.me/x/stillbox/internal/jsontypes"
type: "Metadata" type: "Metadata"
nullable: true nullable: true
- column: "pg_catalog.pg_tables.tablename"
go_type: string
nullable: false