diff --git a/.mockery.yaml b/.mockery.yaml index fa78e2c..a0b5082 100644 --- a/.mockery.yaml +++ b/.mockery.yaml @@ -8,3 +8,4 @@ packages: config: interfaces: Store: + DBTX: diff --git a/pkg/config/config.go b/pkg/config/config.go index ae53c66..5a8d110 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -39,50 +39,18 @@ type CORS struct { } type DB struct { - Connect string `yaml:"connect"` - LogQueries bool `yaml:"logQueries"` + Connect string `yaml:"connect"` + LogQueries bool `yaml:"logQueries"` Partition Partition `yaml:"partition"` } -type PartitionInterval string -const ( - PartDaily PartitionInterval = "daily" - PartWeekly PartitionInterval = "weekly" - PartMonthly PartitionInterval = "monthly" - PartQuarterly PartitionInterval = "quarterly" - PartYearly PartitionInterval = "yearly" -) - -func (p PartitionInterval) IsValid() bool { - switch p { - case PartDaily, PartWeekly, PartMonthly, PartQuarterly, PartYearly: - return true - } - - return false -} - -type PartitionPolicy string -const ( - PartPolDetach PartitionPolicy = "detach" - PartPolDrop PartitionPolicy = "drop" -) - -func (p PartitionPolicy) IsValid() bool { - switch p { - case PartPolDetach, PartPolDrop: - return true - } - - return false -} - type Partition struct { - Enabled bool `yaml:"enabled"` - Schema string `yaml:"schema"` - Interval PartitionInterval `yaml:"interval"` - Retain int `yaml:"retain"` - Detach bool `yaml:"detach"` + Enabled bool `yaml:"enabled"` + Schema string `yaml:"schema"` + Interval string `yaml:"interval"` + Retain int `yaml:"retain"` + PreProvision *int `yaml:"preProvision"` + Drop bool `yaml:"detach"` } type Logger struct { diff --git a/pkg/database/calls.sql.go b/pkg/database/calls.sql.go index 7e535d0..308f7f1 100644 --- a/pkg/database/calls.sql.go +++ b/pkg/database/calls.sql.go @@ -135,29 +135,21 @@ func (q *Queries) AddCall(ctx context.Context, arg AddCallParams) error { return err } -const getCallsPartitions = `-- name: GetCallsPartitions :many -SELECT tablename FROM pg_catalog.pg_tables -WHERE schemaname = $1 AND tablename LIKE 'calls\_p\_____\___' +const cleanupSweptCalls = `-- name: CleanupSweptCalls :exec +WITH to_sweep AS ( + SELECT id FROM calls + JOIN incidents_calls ic ON ic.call_id = calls.id + WHERE calls.call_date >= $1 AND calls.call_date < $2 +) UPDATE incidents_calls + SET + swept_call_id = call_id, + calls_tbl_id = NULL + WHERE call_id IN (SELECT id FROM to_sweep) ` -func (q *Queries) GetCallsPartitions(ctx context.Context, schemaName *string) ([]*string, error) { - rows, err := q.db.Query(ctx, getCallsPartitions, schemaName) - if err != nil { - return nil, err - } - defer rows.Close() - var items []*string - for rows.Next() { - var tablename *string - if err := rows.Scan(&tablename); err != nil { - return nil, err - } - items = append(items, tablename) - } - if err := rows.Err(); err != nil { - return nil, err - } - return items, nil +func (q *Queries) CleanupSweptCalls(ctx context.Context, rangeStart pgtype.Timestamptz, rangeEnd pgtype.Timestamptz) error { + _, err := q.db.Exec(ctx, cleanupSweptCalls, rangeStart, rangeEnd) + return err } const getDatabaseSize = `-- name: GetDatabaseSize :one @@ -179,3 +171,16 @@ func (q *Queries) SetCallTranscript(ctx context.Context, iD uuid.UUID, transcrip _, err := q.db.Exec(ctx, setCallTranscript, iD, transcript) return err } + +const sweepCalls = `-- name: SweepCalls :exec +WITH to_sweep AS ( + SELECT id, submitter, system, talkgroup, calls.call_date, audio_name, audio_blob, duration, audio_type, audio_url, frequency, 
frequencies, patches, tg_label, tg_alpha_tag, tg_group, source, transcript, incident_id, call_id, calls_tbl_id, swept_call_id, ic.call_date, notes FROM calls + JOIN incidents_calls ic ON ic.call_id = calls.id + WHERE calls.call_date >= $1 AND calls.call_date < $2 +) INSERT INTO swept_calls SELECT id, submitter, system, talkgroup, to_sweep.call_date, audio_name, audio_blob, duration, audio_type, audio_url, frequency, frequencies, patches, tg_label, tg_alpha_tag, tg_group, source, transcript, incident_id, call_id, calls_tbl_id, swept_call_id, to_sweep.call_date, notes FROM to_sweep +` + +func (q *Queries) SweepCalls(ctx context.Context, rangeStart pgtype.Timestamptz, rangeEnd pgtype.Timestamptz) error { + _, err := q.db.Exec(ctx, sweepCalls, rangeStart, rangeEnd) + return err +} diff --git a/pkg/database/database.go b/pkg/database/database.go index 8f84ad1..fa3a25b 100644 --- a/pkg/database/database.go +++ b/pkg/database/database.go @@ -23,9 +23,7 @@ import ( type Store interface { Querier talkgroupQuerier - - GetSearchPath(ctx context.Context) (string, error) - GetSchemaName(ctx context.Context) (string, error) + partitionsQuerier DB() *Postgres DBTX() DBTX @@ -69,37 +67,6 @@ func (db *Postgres) InTx(ctx context.Context, f func(Store) error, opts pgx.TxOp return nil } -func (db *Postgres) GetSearchPath(ctx context.Context) (string, error) { - var searchPath *string - err := db.QueryRow(ctx, `SHOW SEARCH_PATH;`).Scan(&searchPath) - if err != nil { - return "", err - } - - if searchPath == nil { - return "", errors.New("search path was null!") - } - - return *searchPath, err -} - -func (db *Postgres) GetSchemaName(ctx context.Context) (string, error) { - searchPath, err := db.GetSearchPath(ctx) - if err != nil { - return "", err - } - // we only support either the default SEARCH_PATH or one - // with only one element in it that is not a variable substitution. - - schemaName := "public" - if searchPath != "$user, public" { - sar := strings.Split(searchPath, ",") - schemaName = sar[0] - } - - return schemaName, nil -} - type dbLogger struct{} func (m dbLogger) Log(ctx context.Context, level tracelog.LogLevel, msg string, data map[string]any) { diff --git a/pkg/database/mocks/DBTX.go b/pkg/database/mocks/DBTX.go new file mode 100644 index 0000000..4de20fe --- /dev/null +++ b/pkg/database/mocks/DBTX.go @@ -0,0 +1,287 @@ +// Code generated by mockery v2.47.0. DO NOT EDIT. + +package mocks + +import ( + context "context" + + mock "github.com/stretchr/testify/mock" + + pgconn "github.com/jackc/pgx/v5/pgconn" + + pgx "github.com/jackc/pgx/v5" +) + +// DBTX is an autogenerated mock type for the DBTX type +type DBTX struct { + mock.Mock +} + +type DBTX_Expecter struct { + mock *mock.Mock +} + +func (_m *DBTX) EXPECT() *DBTX_Expecter { + return &DBTX_Expecter{mock: &_m.Mock} +} + +// Exec provides a mock function with given fields: _a0, _a1, _a2 +func (_m *DBTX) Exec(_a0 context.Context, _a1 string, _a2 ...interface{}) (pgconn.CommandTag, error) { + var _ca []interface{} + _ca = append(_ca, _a0, _a1) + _ca = append(_ca, _a2...) + ret := _m.Called(_ca...) + + if len(ret) == 0 { + panic("no return value specified for Exec") + } + + var r0 pgconn.CommandTag + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string, ...interface{}) (pgconn.CommandTag, error)); ok { + return rf(_a0, _a1, _a2...) + } + if rf, ok := ret.Get(0).(func(context.Context, string, ...interface{}) pgconn.CommandTag); ok { + r0 = rf(_a0, _a1, _a2...) 
+ } else { + r0 = ret.Get(0).(pgconn.CommandTag) + } + + if rf, ok := ret.Get(1).(func(context.Context, string, ...interface{}) error); ok { + r1 = rf(_a0, _a1, _a2...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// DBTX_Exec_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Exec' +type DBTX_Exec_Call struct { + *mock.Call +} + +// Exec is a helper method to define mock.On call +// - _a0 context.Context +// - _a1 string +// - _a2 ...interface{} +func (_e *DBTX_Expecter) Exec(_a0 interface{}, _a1 interface{}, _a2 ...interface{}) *DBTX_Exec_Call { + return &DBTX_Exec_Call{Call: _e.mock.On("Exec", + append([]interface{}{_a0, _a1}, _a2...)...)} +} + +func (_c *DBTX_Exec_Call) Run(run func(_a0 context.Context, _a1 string, _a2 ...interface{})) *DBTX_Exec_Call { + _c.Call.Run(func(args mock.Arguments) { + variadicArgs := make([]interface{}, len(args)-2) + for i, a := range args[2:] { + if a != nil { + variadicArgs[i] = a.(interface{}) + } + } + run(args[0].(context.Context), args[1].(string), variadicArgs...) + }) + return _c +} + +func (_c *DBTX_Exec_Call) Return(_a0 pgconn.CommandTag, _a1 error) *DBTX_Exec_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *DBTX_Exec_Call) RunAndReturn(run func(context.Context, string, ...interface{}) (pgconn.CommandTag, error)) *DBTX_Exec_Call { + _c.Call.Return(run) + return _c +} + +// Query provides a mock function with given fields: _a0, _a1, _a2 +func (_m *DBTX) Query(_a0 context.Context, _a1 string, _a2 ...interface{}) (pgx.Rows, error) { + var _ca []interface{} + _ca = append(_ca, _a0, _a1) + _ca = append(_ca, _a2...) + ret := _m.Called(_ca...) + + if len(ret) == 0 { + panic("no return value specified for Query") + } + + var r0 pgx.Rows + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string, ...interface{}) (pgx.Rows, error)); ok { + return rf(_a0, _a1, _a2...) + } + if rf, ok := ret.Get(0).(func(context.Context, string, ...interface{}) pgx.Rows); ok { + r0 = rf(_a0, _a1, _a2...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(pgx.Rows) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, string, ...interface{}) error); ok { + r1 = rf(_a0, _a1, _a2...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// DBTX_Query_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Query' +type DBTX_Query_Call struct { + *mock.Call +} + +// Query is a helper method to define mock.On call +// - _a0 context.Context +// - _a1 string +// - _a2 ...interface{} +func (_e *DBTX_Expecter) Query(_a0 interface{}, _a1 interface{}, _a2 ...interface{}) *DBTX_Query_Call { + return &DBTX_Query_Call{Call: _e.mock.On("Query", + append([]interface{}{_a0, _a1}, _a2...)...)} +} + +func (_c *DBTX_Query_Call) Run(run func(_a0 context.Context, _a1 string, _a2 ...interface{})) *DBTX_Query_Call { + _c.Call.Run(func(args mock.Arguments) { + variadicArgs := make([]interface{}, len(args)-2) + for i, a := range args[2:] { + if a != nil { + variadicArgs[i] = a.(interface{}) + } + } + run(args[0].(context.Context), args[1].(string), variadicArgs...) 
+ }) + return _c +} + +func (_c *DBTX_Query_Call) Return(_a0 pgx.Rows, _a1 error) *DBTX_Query_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *DBTX_Query_Call) RunAndReturn(run func(context.Context, string, ...interface{}) (pgx.Rows, error)) *DBTX_Query_Call { + _c.Call.Return(run) + return _c +} + +// QueryRow provides a mock function with given fields: _a0, _a1, _a2 +func (_m *DBTX) QueryRow(_a0 context.Context, _a1 string, _a2 ...interface{}) pgx.Row { + var _ca []interface{} + _ca = append(_ca, _a0, _a1) + _ca = append(_ca, _a2...) + ret := _m.Called(_ca...) + + if len(ret) == 0 { + panic("no return value specified for QueryRow") + } + + var r0 pgx.Row + if rf, ok := ret.Get(0).(func(context.Context, string, ...interface{}) pgx.Row); ok { + r0 = rf(_a0, _a1, _a2...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(pgx.Row) + } + } + + return r0 +} + +// DBTX_QueryRow_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'QueryRow' +type DBTX_QueryRow_Call struct { + *mock.Call +} + +// QueryRow is a helper method to define mock.On call +// - _a0 context.Context +// - _a1 string +// - _a2 ...interface{} +func (_e *DBTX_Expecter) QueryRow(_a0 interface{}, _a1 interface{}, _a2 ...interface{}) *DBTX_QueryRow_Call { + return &DBTX_QueryRow_Call{Call: _e.mock.On("QueryRow", + append([]interface{}{_a0, _a1}, _a2...)...)} +} + +func (_c *DBTX_QueryRow_Call) Run(run func(_a0 context.Context, _a1 string, _a2 ...interface{})) *DBTX_QueryRow_Call { + _c.Call.Run(func(args mock.Arguments) { + variadicArgs := make([]interface{}, len(args)-2) + for i, a := range args[2:] { + if a != nil { + variadicArgs[i] = a.(interface{}) + } + } + run(args[0].(context.Context), args[1].(string), variadicArgs...) + }) + return _c +} + +func (_c *DBTX_QueryRow_Call) Return(_a0 pgx.Row) *DBTX_QueryRow_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *DBTX_QueryRow_Call) RunAndReturn(run func(context.Context, string, ...interface{}) pgx.Row) *DBTX_QueryRow_Call { + _c.Call.Return(run) + return _c +} + +// SendBatch provides a mock function with given fields: _a0, _a1 +func (_m *DBTX) SendBatch(_a0 context.Context, _a1 *pgx.Batch) pgx.BatchResults { + ret := _m.Called(_a0, _a1) + + if len(ret) == 0 { + panic("no return value specified for SendBatch") + } + + var r0 pgx.BatchResults + if rf, ok := ret.Get(0).(func(context.Context, *pgx.Batch) pgx.BatchResults); ok { + r0 = rf(_a0, _a1) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(pgx.BatchResults) + } + } + + return r0 +} + +// DBTX_SendBatch_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SendBatch' +type DBTX_SendBatch_Call struct { + *mock.Call +} + +// SendBatch is a helper method to define mock.On call +// - _a0 context.Context +// - _a1 *pgx.Batch +func (_e *DBTX_Expecter) SendBatch(_a0 interface{}, _a1 interface{}) *DBTX_SendBatch_Call { + return &DBTX_SendBatch_Call{Call: _e.mock.On("SendBatch", _a0, _a1)} +} + +func (_c *DBTX_SendBatch_Call) Run(run func(_a0 context.Context, _a1 *pgx.Batch)) *DBTX_SendBatch_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*pgx.Batch)) + }) + return _c +} + +func (_c *DBTX_SendBatch_Call) Return(_a0 pgx.BatchResults) *DBTX_SendBatch_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *DBTX_SendBatch_Call) RunAndReturn(run func(context.Context, *pgx.Batch) pgx.BatchResults) *DBTX_SendBatch_Call { + _c.Call.Return(run) + return _c +} + +// NewDBTX creates a new instance 
of DBTX. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewDBTX(t interface { + mock.TestingT + Cleanup(func()) +}) *DBTX { + mock := &DBTX{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/pkg/database/mocks/Store.go b/pkg/database/mocks/Store.go index 8f01f33..29c5fbe 100644 --- a/pkg/database/mocks/Store.go +++ b/pkg/database/mocks/Store.go @@ -12,6 +12,8 @@ import ( pgx "github.com/jackc/pgx/v5" + time "time" + uuid "github.com/google/uuid" ) @@ -227,6 +229,54 @@ func (_c *Store_BulkSetTalkgroupTags_Call) RunAndReturn(run func(context.Context return _c } +// CleanupSweptCalls provides a mock function with given fields: ctx, rangeStart, rangeEnd +func (_m *Store) CleanupSweptCalls(ctx context.Context, rangeStart pgtype.Timestamptz, rangeEnd pgtype.Timestamptz) error { + ret := _m.Called(ctx, rangeStart, rangeEnd) + + if len(ret) == 0 { + panic("no return value specified for CleanupSweptCalls") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, pgtype.Timestamptz, pgtype.Timestamptz) error); ok { + r0 = rf(ctx, rangeStart, rangeEnd) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Store_CleanupSweptCalls_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CleanupSweptCalls' +type Store_CleanupSweptCalls_Call struct { + *mock.Call +} + +// CleanupSweptCalls is a helper method to define mock.On call +// - ctx context.Context +// - rangeStart pgtype.Timestamptz +// - rangeEnd pgtype.Timestamptz +func (_e *Store_Expecter) CleanupSweptCalls(ctx interface{}, rangeStart interface{}, rangeEnd interface{}) *Store_CleanupSweptCalls_Call { + return &Store_CleanupSweptCalls_Call{Call: _e.mock.On("CleanupSweptCalls", ctx, rangeStart, rangeEnd)} +} + +func (_c *Store_CleanupSweptCalls_Call) Run(run func(ctx context.Context, rangeStart pgtype.Timestamptz, rangeEnd pgtype.Timestamptz)) *Store_CleanupSweptCalls_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(pgtype.Timestamptz), args[2].(pgtype.Timestamptz)) + }) + return _c +} + +func (_c *Store_CleanupSweptCalls_Call) Return(_a0 error) *Store_CleanupSweptCalls_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *Store_CleanupSweptCalls_Call) RunAndReturn(run func(context.Context, pgtype.Timestamptz, pgtype.Timestamptz) error) *Store_CleanupSweptCalls_Call { + _c.Call.Return(run) + return _c +} + // CreateAPIKey provides a mock function with given fields: ctx, owner, expires, disabled func (_m *Store) CreateAPIKey(ctx context.Context, owner int, expires pgtype.Timestamp, disabled *bool) (database.ApiKey, error) { ret := _m.Called(ctx, owner, expires, disabled) @@ -286,6 +336,56 @@ func (_c *Store_CreateAPIKey_Call) RunAndReturn(run func(context.Context, int, p return _c } +// CreatePartition provides a mock function with given fields: ctx, parentTable, partitionName, start, end +func (_m *Store) CreatePartition(ctx context.Context, parentTable string, partitionName string, start time.Time, end time.Time) error { + ret := _m.Called(ctx, parentTable, partitionName, start, end) + + if len(ret) == 0 { + panic("no return value specified for CreatePartition") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string, string, time.Time, time.Time) error); ok { + r0 = rf(ctx, parentTable, partitionName, start, end) + } else { + r0 = ret.Error(0) + } + + 
return r0 +} + +// Store_CreatePartition_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CreatePartition' +type Store_CreatePartition_Call struct { + *mock.Call +} + +// CreatePartition is a helper method to define mock.On call +// - ctx context.Context +// - parentTable string +// - partitionName string +// - start time.Time +// - end time.Time +func (_e *Store_Expecter) CreatePartition(ctx interface{}, parentTable interface{}, partitionName interface{}, start interface{}, end interface{}) *Store_CreatePartition_Call { + return &Store_CreatePartition_Call{Call: _e.mock.On("CreatePartition", ctx, parentTable, partitionName, start, end)} +} + +func (_c *Store_CreatePartition_Call) Run(run func(ctx context.Context, parentTable string, partitionName string, start time.Time, end time.Time)) *Store_CreatePartition_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string), args[2].(string), args[3].(time.Time), args[4].(time.Time)) + }) + return _c +} + +func (_c *Store_CreatePartition_Call) Return(_a0 error) *Store_CreatePartition_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *Store_CreatePartition_Call) RunAndReturn(run func(context.Context, string, string, time.Time, time.Time) error) *Store_CreatePartition_Call { + _c.Call.Return(run) + return _c +} + // CreateUser provides a mock function with given fields: ctx, arg func (_m *Store) CreateUser(ctx context.Context, arg database.CreateUserParams) (database.User, error) { ret := _m.Called(ctx, arg) @@ -531,6 +631,100 @@ func (_c *Store_DeleteUser_Call) RunAndReturn(run func(context.Context, string) return _c } +// DetachPartition provides a mock function with given fields: ctx, partitionName +func (_m *Store) DetachPartition(ctx context.Context, partitionName string) error { + ret := _m.Called(ctx, partitionName) + + if len(ret) == 0 { + panic("no return value specified for DetachPartition") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string) error); ok { + r0 = rf(ctx, partitionName) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Store_DetachPartition_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DetachPartition' +type Store_DetachPartition_Call struct { + *mock.Call +} + +// DetachPartition is a helper method to define mock.On call +// - ctx context.Context +// - partitionName string +func (_e *Store_Expecter) DetachPartition(ctx interface{}, partitionName interface{}) *Store_DetachPartition_Call { + return &Store_DetachPartition_Call{Call: _e.mock.On("DetachPartition", ctx, partitionName)} +} + +func (_c *Store_DetachPartition_Call) Run(run func(ctx context.Context, partitionName string)) *Store_DetachPartition_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string)) + }) + return _c +} + +func (_c *Store_DetachPartition_Call) Return(_a0 error) *Store_DetachPartition_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *Store_DetachPartition_Call) RunAndReturn(run func(context.Context, string) error) *Store_DetachPartition_Call { + _c.Call.Return(run) + return _c +} + +// DropPartition provides a mock function with given fields: ctx, partitionName +func (_m *Store) DropPartition(ctx context.Context, partitionName string) error { + ret := _m.Called(ctx, partitionName) + + if len(ret) == 0 { + panic("no return value specified for DropPartition") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string) 
error); ok { + r0 = rf(ctx, partitionName) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Store_DropPartition_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DropPartition' +type Store_DropPartition_Call struct { + *mock.Call +} + +// DropPartition is a helper method to define mock.On call +// - ctx context.Context +// - partitionName string +func (_e *Store_Expecter) DropPartition(ctx interface{}, partitionName interface{}) *Store_DropPartition_Call { + return &Store_DropPartition_Call{Call: _e.mock.On("DropPartition", ctx, partitionName)} +} + +func (_c *Store_DropPartition_Call) Run(run func(ctx context.Context, partitionName string)) *Store_DropPartition_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string)) + }) + return _c +} + +func (_c *Store_DropPartition_Call) Return(_a0 error) *Store_DropPartition_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *Store_DropPartition_Call) RunAndReturn(run func(context.Context, string) error) *Store_DropPartition_Call { + _c.Call.Return(run) + return _c +} + // GetAPIKey provides a mock function with given fields: ctx, apiKey func (_m *Store) GetAPIKey(ctx context.Context, apiKey string) (database.ApiKey, error) { ret := _m.Called(ctx, apiKey) @@ -588,65 +782,6 @@ func (_c *Store_GetAPIKey_Call) RunAndReturn(run func(context.Context, string) ( return _c } -// GetCallsPartitions provides a mock function with given fields: ctx, schemaName -func (_m *Store) GetCallsPartitions(ctx context.Context, schemaName *string) ([]*string, error) { - ret := _m.Called(ctx, schemaName) - - if len(ret) == 0 { - panic("no return value specified for GetCallsPartitions") - } - - var r0 []*string - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, *string) ([]*string, error)); ok { - return rf(ctx, schemaName) - } - if rf, ok := ret.Get(0).(func(context.Context, *string) []*string); ok { - r0 = rf(ctx, schemaName) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([]*string) - } - } - - if rf, ok := ret.Get(1).(func(context.Context, *string) error); ok { - r1 = rf(ctx, schemaName) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// Store_GetCallsPartitions_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetCallsPartitions' -type Store_GetCallsPartitions_Call struct { - *mock.Call -} - -// GetCallsPartitions is a helper method to define mock.On call -// - ctx context.Context -// - schemaName *string -func (_e *Store_Expecter) GetCallsPartitions(ctx interface{}, schemaName interface{}) *Store_GetCallsPartitions_Call { - return &Store_GetCallsPartitions_Call{Call: _e.mock.On("GetCallsPartitions", ctx, schemaName)} -} - -func (_c *Store_GetCallsPartitions_Call) Run(run func(ctx context.Context, schemaName *string)) *Store_GetCallsPartitions_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(*string)) - }) - return _c -} - -func (_c *Store_GetCallsPartitions_Call) Return(_a0 []*string, _a1 error) *Store_GetCallsPartitions_Call { - _c.Call.Return(_a0, _a1) - return _c -} - -func (_c *Store_GetCallsPartitions_Call) RunAndReturn(run func(context.Context, *string) ([]*string, error)) *Store_GetCallsPartitions_Call { - _c.Call.Return(run) - return _c -} - // GetDatabaseSize provides a mock function with given fields: ctx func (_m *Store) GetDatabaseSize(ctx context.Context) (string, error) { ret := _m.Called(ctx) @@ -703,118 +838,6 @@ func (_c 
*Store_GetDatabaseSize_Call) RunAndReturn(run func(context.Context) (st return _c } -// GetSchemaName provides a mock function with given fields: ctx -func (_m *Store) GetSchemaName(ctx context.Context) (string, error) { - ret := _m.Called(ctx) - - if len(ret) == 0 { - panic("no return value specified for GetSchemaName") - } - - var r0 string - var r1 error - if rf, ok := ret.Get(0).(func(context.Context) (string, error)); ok { - return rf(ctx) - } - if rf, ok := ret.Get(0).(func(context.Context) string); ok { - r0 = rf(ctx) - } else { - r0 = ret.Get(0).(string) - } - - if rf, ok := ret.Get(1).(func(context.Context) error); ok { - r1 = rf(ctx) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// Store_GetSchemaName_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetSchemaName' -type Store_GetSchemaName_Call struct { - *mock.Call -} - -// GetSchemaName is a helper method to define mock.On call -// - ctx context.Context -func (_e *Store_Expecter) GetSchemaName(ctx interface{}) *Store_GetSchemaName_Call { - return &Store_GetSchemaName_Call{Call: _e.mock.On("GetSchemaName", ctx)} -} - -func (_c *Store_GetSchemaName_Call) Run(run func(ctx context.Context)) *Store_GetSchemaName_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context)) - }) - return _c -} - -func (_c *Store_GetSchemaName_Call) Return(_a0 string, _a1 error) *Store_GetSchemaName_Call { - _c.Call.Return(_a0, _a1) - return _c -} - -func (_c *Store_GetSchemaName_Call) RunAndReturn(run func(context.Context) (string, error)) *Store_GetSchemaName_Call { - _c.Call.Return(run) - return _c -} - -// GetSearchPath provides a mock function with given fields: ctx -func (_m *Store) GetSearchPath(ctx context.Context) (string, error) { - ret := _m.Called(ctx) - - if len(ret) == 0 { - panic("no return value specified for GetSearchPath") - } - - var r0 string - var r1 error - if rf, ok := ret.Get(0).(func(context.Context) (string, error)); ok { - return rf(ctx) - } - if rf, ok := ret.Get(0).(func(context.Context) string); ok { - r0 = rf(ctx) - } else { - r0 = ret.Get(0).(string) - } - - if rf, ok := ret.Get(1).(func(context.Context) error); ok { - r1 = rf(ctx) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// Store_GetSearchPath_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetSearchPath' -type Store_GetSearchPath_Call struct { - *mock.Call -} - -// GetSearchPath is a helper method to define mock.On call -// - ctx context.Context -func (_e *Store_Expecter) GetSearchPath(ctx interface{}) *Store_GetSearchPath_Call { - return &Store_GetSearchPath_Call{Call: _e.mock.On("GetSearchPath", ctx)} -} - -func (_c *Store_GetSearchPath_Call) Run(run func(ctx context.Context)) *Store_GetSearchPath_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context)) - }) - return _c -} - -func (_c *Store_GetSearchPath_Call) Return(_a0 string, _a1 error) *Store_GetSearchPath_Call { - _c.Call.Return(_a0, _a1) - return _c -} - -func (_c *Store_GetSearchPath_Call) RunAndReturn(run func(context.Context) (string, error)) *Store_GetSearchPath_Call { - _c.Call.Return(run) - return _c -} - // GetSystemName provides a mock function with given fields: ctx, systemID func (_m *Store) GetSystemName(ctx context.Context, systemID int) (string, error) { ret := _m.Called(ctx, systemID) @@ -872,6 +895,66 @@ func (_c *Store_GetSystemName_Call) RunAndReturn(run func(context.Context, int) return _c } +// GetTablePartitions provides a mock 
function with given fields: ctx, schemaName, tableName +func (_m *Store) GetTablePartitions(ctx context.Context, schemaName string, tableName string) ([]string, error) { + ret := _m.Called(ctx, schemaName, tableName) + + if len(ret) == 0 { + panic("no return value specified for GetTablePartitions") + } + + var r0 []string + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string, string) ([]string, error)); ok { + return rf(ctx, schemaName, tableName) + } + if rf, ok := ret.Get(0).(func(context.Context, string, string) []string); ok { + r0 = rf(ctx, schemaName, tableName) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]string) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, string, string) error); ok { + r1 = rf(ctx, schemaName, tableName) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Store_GetTablePartitions_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetTablePartitions' +type Store_GetTablePartitions_Call struct { + *mock.Call +} + +// GetTablePartitions is a helper method to define mock.On call +// - ctx context.Context +// - schemaName string +// - tableName string +func (_e *Store_Expecter) GetTablePartitions(ctx interface{}, schemaName interface{}, tableName interface{}) *Store_GetTablePartitions_Call { + return &Store_GetTablePartitions_Call{Call: _e.mock.On("GetTablePartitions", ctx, schemaName, tableName)} +} + +func (_c *Store_GetTablePartitions_Call) Run(run func(ctx context.Context, schemaName string, tableName string)) *Store_GetTablePartitions_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string), args[2].(string)) + }) + return _c +} + +func (_c *Store_GetTablePartitions_Call) Return(_a0 []string, _a1 error) *Store_GetTablePartitions_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *Store_GetTablePartitions_Call) RunAndReturn(run func(context.Context, string, string) ([]string, error)) *Store_GetTablePartitions_Call { + _c.Call.Return(run) + return _c +} + // GetTalkgroup provides a mock function with given fields: ctx, systemID, tGID func (_m *Store) GetTalkgroup(ctx context.Context, systemID int32, tGID int32) (database.GetTalkgroupRow, error) { ret := _m.Called(ctx, systemID, tGID) @@ -2176,6 +2259,54 @@ func (_c *Store_StoreTGVersion_Call) RunAndReturn(run func(context.Context, []da return _c } +// SweepCalls provides a mock function with given fields: ctx, rangeStart, rangeEnd +func (_m *Store) SweepCalls(ctx context.Context, rangeStart pgtype.Timestamptz, rangeEnd pgtype.Timestamptz) error { + ret := _m.Called(ctx, rangeStart, rangeEnd) + + if len(ret) == 0 { + panic("no return value specified for SweepCalls") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, pgtype.Timestamptz, pgtype.Timestamptz) error); ok { + r0 = rf(ctx, rangeStart, rangeEnd) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Store_SweepCalls_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SweepCalls' +type Store_SweepCalls_Call struct { + *mock.Call +} + +// SweepCalls is a helper method to define mock.On call +// - ctx context.Context +// - rangeStart pgtype.Timestamptz +// - rangeEnd pgtype.Timestamptz +func (_e *Store_Expecter) SweepCalls(ctx interface{}, rangeStart interface{}, rangeEnd interface{}) *Store_SweepCalls_Call { + return &Store_SweepCalls_Call{Call: _e.mock.On("SweepCalls", ctx, rangeStart, rangeEnd)} +} + +func (_c *Store_SweepCalls_Call) Run(run func(ctx 
context.Context, rangeStart pgtype.Timestamptz, rangeEnd pgtype.Timestamptz)) *Store_SweepCalls_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(pgtype.Timestamptz), args[2].(pgtype.Timestamptz)) + }) + return _c +} + +func (_c *Store_SweepCalls_Call) Return(_a0 error) *Store_SweepCalls_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *Store_SweepCalls_Call) RunAndReturn(run func(context.Context, pgtype.Timestamptz, pgtype.Timestamptz) error) *Store_SweepCalls_Call { + _c.Call.Return(run) + return _c +} + // UpdatePassword provides a mock function with given fields: ctx, username, password func (_m *Store) UpdatePassword(ctx context.Context, username string, password string) error { ret := _m.Called(ctx, username, password) diff --git a/pkg/database/models.go b/pkg/database/models.go index e4e0caf..b76cbb8 100644 --- a/pkg/database/models.go +++ b/pkg/database/models.go @@ -55,7 +55,91 @@ type Call struct { Transcript *string `json:"transcript,omitempty"` } -type CallsUnpart struct { +type CallsP202407 struct { + ID uuid.UUID `json:"id,omitempty"` + Submitter *int32 `json:"submitter,omitempty"` + System int `json:"system,omitempty"` + Talkgroup int `json:"talkgroup,omitempty"` + CallDate pgtype.Timestamptz `json:"call_date,omitempty"` + AudioName *string `json:"audio_name,omitempty"` + AudioBlob []byte `json:"audio_blob,omitempty"` + Duration *int32 `json:"duration,omitempty"` + AudioType *string `json:"audio_type,omitempty"` + AudioUrl *string `json:"audio_url,omitempty"` + Frequency int `json:"frequency,omitempty"` + Frequencies []int `json:"frequencies,omitempty"` + Patches []int `json:"patches,omitempty"` + TGLabel *string `json:"tg_label,omitempty"` + TGAlphaTag *string `json:"tg_alpha_tag,omitempty"` + TGGroup *string `json:"tg_group,omitempty"` + Source int `json:"source,omitempty"` + Transcript *string `json:"transcript,omitempty"` +} + +type CallsP202408 struct { + ID uuid.UUID `json:"id,omitempty"` + Submitter *int32 `json:"submitter,omitempty"` + System int `json:"system,omitempty"` + Talkgroup int `json:"talkgroup,omitempty"` + CallDate pgtype.Timestamptz `json:"call_date,omitempty"` + AudioName *string `json:"audio_name,omitempty"` + AudioBlob []byte `json:"audio_blob,omitempty"` + Duration *int32 `json:"duration,omitempty"` + AudioType *string `json:"audio_type,omitempty"` + AudioUrl *string `json:"audio_url,omitempty"` + Frequency int `json:"frequency,omitempty"` + Frequencies []int `json:"frequencies,omitempty"` + Patches []int `json:"patches,omitempty"` + TGLabel *string `json:"tg_label,omitempty"` + TGAlphaTag *string `json:"tg_alpha_tag,omitempty"` + TGGroup *string `json:"tg_group,omitempty"` + Source int `json:"source,omitempty"` + Transcript *string `json:"transcript,omitempty"` +} + +type CallsP202409 struct { + ID uuid.UUID `json:"id,omitempty"` + Submitter *int32 `json:"submitter,omitempty"` + System int `json:"system,omitempty"` + Talkgroup int `json:"talkgroup,omitempty"` + CallDate pgtype.Timestamptz `json:"call_date,omitempty"` + AudioName *string `json:"audio_name,omitempty"` + AudioBlob []byte `json:"audio_blob,omitempty"` + Duration *int32 `json:"duration,omitempty"` + AudioType *string `json:"audio_type,omitempty"` + AudioUrl *string `json:"audio_url,omitempty"` + Frequency int `json:"frequency,omitempty"` + Frequencies []int `json:"frequencies,omitempty"` + Patches []int `json:"patches,omitempty"` + TGLabel *string `json:"tg_label,omitempty"` + TGAlphaTag *string `json:"tg_alpha_tag,omitempty"` + TGGroup 
*string `json:"tg_group,omitempty"` + Source int `json:"source,omitempty"` + Transcript *string `json:"transcript,omitempty"` +} + +type CallsP202410 struct { + ID uuid.UUID `json:"id,omitempty"` + Submitter *int32 `json:"submitter,omitempty"` + System int `json:"system,omitempty"` + Talkgroup int `json:"talkgroup,omitempty"` + CallDate pgtype.Timestamptz `json:"call_date,omitempty"` + AudioName *string `json:"audio_name,omitempty"` + AudioBlob []byte `json:"audio_blob,omitempty"` + Duration *int32 `json:"duration,omitempty"` + AudioType *string `json:"audio_type,omitempty"` + AudioUrl *string `json:"audio_url,omitempty"` + Frequency int `json:"frequency,omitempty"` + Frequencies []int `json:"frequencies,omitempty"` + Patches []int `json:"patches,omitempty"` + TGLabel *string `json:"tg_label,omitempty"` + TGAlphaTag *string `json:"tg_alpha_tag,omitempty"` + TGGroup *string `json:"tg_group,omitempty"` + Source int `json:"source,omitempty"` + Transcript *string `json:"transcript,omitempty"` +} + +type CallsP202411 struct { ID uuid.UUID `json:"id,omitempty"` Submitter *int32 `json:"submitter,omitempty"` System int `json:"system,omitempty"` @@ -87,9 +171,12 @@ type Incident struct { } type IncidentsCall struct { - IncidentID uuid.UUID `json:"incident_id,omitempty"` - CallID uuid.UUID `json:"call_id,omitempty"` - Notes []byte `json:"notes,omitempty"` + IncidentID uuid.UUID `json:"incident_id,omitempty"` + CallID uuid.UUID `json:"call_id,omitempty"` + CallsTblID pgtype.UUID `json:"calls_tbl_id,omitempty"` + SweptCallID pgtype.UUID `json:"swept_call_id,omitempty"` + CallDate pgtype.Timestamptz `json:"call_date,omitempty"` + Notes []byte `json:"notes,omitempty"` } type Setting struct { @@ -98,6 +185,27 @@ type Setting struct { Value []byte `json:"value,omitempty"` } +type SweptCall struct { + ID uuid.UUID `json:"id,omitempty"` + Submitter *int32 `json:"submitter,omitempty"` + System int `json:"system,omitempty"` + Talkgroup int `json:"talkgroup,omitempty"` + CallDate pgtype.Timestamptz `json:"call_date,omitempty"` + AudioName *string `json:"audio_name,omitempty"` + AudioBlob []byte `json:"audio_blob,omitempty"` + Duration *int32 `json:"duration,omitempty"` + AudioType *string `json:"audio_type,omitempty"` + AudioUrl *string `json:"audio_url,omitempty"` + Frequency int `json:"frequency,omitempty"` + Frequencies []int `json:"frequencies,omitempty"` + Patches []int `json:"patches,omitempty"` + TGLabel *string `json:"tg_label,omitempty"` + TGAlphaTag *string `json:"tg_alpha_tag,omitempty"` + TGGroup *string `json:"tg_group,omitempty"` + Source int `json:"source,omitempty"` + Transcript *string `json:"transcript,omitempty"` +} + type System struct { ID int `json:"id,omitempty"` Name string `json:"name,omitempty"` diff --git a/pkg/database/partitions.go b/pkg/database/partitions.go new file mode 100644 index 0000000..52bf9c1 --- /dev/null +++ b/pkg/database/partitions.go @@ -0,0 +1,87 @@ +package database + +import ( + "context" + "time" +) + +type partitionsQuerier interface { + GetTablePartitions(ctx context.Context, schemaName, tableName string) ([]string, error) + CreatePartition(ctx context.Context, parentTable, partitionName string, start, end time.Time) error + DetachPartition(ctx context.Context, partitionName string) error + DropPartition(ctx context.Context, partitionName string) error +} + +func (q *Queries) GetTablePartitions(ctx context.Context, schemaName, tableName string) ([]string, error) { + const query = `SELECT child.relname + FROM pg_inherits + JOIN pg_class parent ON 
pg_inherits.inhparent = parent.oid + JOIN pg_class child ON pg_inherits.inhrelid = child.oid + JOIN pg_namespace nmsp_parent ON nmsp_parent.oid = parent.relnamespace + JOIN pg_namespace nmsp_child ON nmsp_child.oid = child.relnamespace + WHERE + nmsp_parent.nspname = nmsp_child.nspname -- parent and child schemas are the same + AND nmsp_child.nspname = $1 -- schemaName + AND parent.relname = $2;` // tableName + rows, err := q.db.Query(ctx, query, schemaName, tableName) + if err != nil { + return nil, err + } + + defer rows.Close() + + var partitions []string + for rows.Next() { + var partitionName string + if err := rows.Scan(&partitionName); err != nil { + return nil, err + } + + partitions = append(partitions, partitionName) + } + + if err := rows.Err(); err != nil { + return nil, err + } + + return partitions, nil +} + +func (q *Queries) CreatePartition(ctx context.Context, parentTable, partitionName string, start, end time.Time) error { + // DDL cannot take bind parameters, so the identifiers and bounds are interpolated directly; all values come from partman, never from user input. + // e.g. CREATE TABLE calls_p_2024_07 PARTITION OF calls FOR VALUES FROM ('2024-07-01T00:00:00Z') TO ('2024-08-01T00:00:00Z'); + stmt := `CREATE TABLE ` + partitionName + ` PARTITION OF ` + parentTable + ` FOR VALUES FROM ('` + start.Format(time.RFC3339) + `') TO ('` + end.Format(time.RFC3339) + `');` + _, err := q.db.Exec(ctx, stmt) + + return err +} + +func (q *Queries) DropPartition(ctx context.Context, partitionName string) error { + _, err := q.db.Exec(ctx, `DROP TABLE `+partitionName+`;`) + + return err +} + +func (q *Queries) DetachPartition(ctx context.Context, partitionName string) error { + // DETACH PARTITION must be run on the parent table; calls is the only partitioned parent in this schema. + _, err := q.db.Exec(ctx, `ALTER TABLE calls DETACH PARTITION `+partitionName+`;`) + return err +} + +/* +func (q *Queries) SweepCalls(ctx context.Context, start, end time.Time) { +const -- name: SweepCalls :exec +BEGIN; +WITH to_sweep AS ( + SELECT * FROM calls + JOIN incidents_calls ic ON ic.call_id = calls.id + WHERE calls.call_date >= @range_start AND calls.call_date < @range_end +) INSERT INTO swept_calls SELECT * FROM to_sweep; + +WITH to_sweep AS ( + SELECT id FROM calls + JOIN incidents_calls ic ON ic.call_id = calls.id + WHERE calls.call_date >= @range_start AND calls.call_date < @range_end +) UPDATE incidents_calls + SET + swept_call_id = call_id, + calls_tbl_id = NULL + WHERE call_id IN (SELECT id FROM to_sweep); +COMMIT;} +*/ diff --git a/pkg/database/partman/partman.go b/pkg/database/partman/partman.go index e1b092d..025a634 100644 --- a/pkg/database/partman/partman.go +++ b/pkg/database/partman/partman.go @@ -1,8 +1,8 @@ package partman import ( - "fmt" "context" + "fmt" "strconv" "strings" "time" @@ -11,9 +11,25 @@ import ( "dynatron.me/x/stillbox/pkg/database" "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgtype" "github.com/rs/zerolog/log" ) +const ( + callsTable = "calls" + + preProvisionDefault = 1 +) + +/* + * Partition scheme names: + * daily: calls_p_2024_11_28 + * weekly: calls_p_2024_w48 + * monthly: calls_p_2024_11 + * quarterly: calls_p_2024_q4 + * yearly: calls_p_2024 + */ + type PartitionError string func (pe PartitionError) Error() string { @@ -21,15 +37,19 @@ func (pe PartitionError) Error() string { } type ErrInvalidInterval string -func (e ErrInvalidInterval) Error() string { return fmt.Sprintf("invalid interval '%s'", string(e)) } + +func (in ErrInvalidInterval) Error() string { + return fmt.Sprintf("invalid interval '%s'", string(in)) +} type Interval string + const ( - Daily Interval = "daily" - Weekly Interval = "weekly" - Monthly Interval = "monthly" + Daily Interval = "daily" + Weekly Interval = "weekly" + Monthly Interval = "monthly" Quarterly Interval = "quarterly" - Yearly Interval = "yearly" + Yearly Interval = "yearly" ) func (p Interval) IsValid() bool { @@ -41,59 +61,67 @@
return false } - - type PartitionManager interface { Go(ctx context.Context) - Check(ctx context.Context) error + Check(ctx context.Context, now time.Time) error } type partman struct { - db *database.Postgres - cfg config.Partition + db database.Store + cfg config.Partition intv Interval } - type partition interface { PartitionName() string - Next() partition - Range() (string, string) + Next(i int) partition + Prev(i int) partition + Range() (time.Time, time.Time) } type monthlyPartition struct { - year int - month time.Month + t time.Time } func (d monthlyPartition) PartitionName() string { - return fmt.Sprintf("calls_p_%d_%02d", d.year, d.month) + return fmt.Sprintf("calls_p_%d_%02d", d.t.Year(), d.t.Month()) } -func (d monthlyPartition) Next() partition { - return d.next() +func (d monthlyPartition) Next(i int) partition { + return d.next(i) } -func (d monthlyPartition) next() monthlyPartition { - if d.month == time.December { - d.year++ - d.month = time.January - } else { - d.month++ +func (d monthlyPartition) Prev(i int) partition { + return d.prev(i) +} + +func (d monthlyPartition) Range() (start, end time.Time) { + start = time.Date(d.t.Year(), d.t.Month(), 1, 0, 0, 0, 0, time.UTC) + end = start.AddDate(0, 1, 0) + + return +} + +func (d monthlyPartition) next(i int) monthlyPartition { + year, month, _ := d.t.Date() + + return monthlyPartition{ + t: time.Date(year, month+time.Month(i), 1, 0, 0, 0, 0, time.UTC), } - - return d } -func (d monthlyPartition) Range() (string, string) { - next := d.next() - return fmt.Sprintf("%d-%02d-01", d.year, d.month), fmt.Sprintf("%d-%02d-01", next.year, next.month) +func (d monthlyPartition) prev(i int) monthlyPartition { + year, month, _ := d.t.Date() + + return monthlyPartition{ + t: time.Date(year, month-time.Month(i), 1, 0, 0, 0, 0, d.t.Location()), + } } -func NewPartman(db *database.Postgres, cfg config.Partition) (*partman, error) { +func New(db database.Store, cfg config.Partition) (*partman, error) { pm := &partman{ - cfg: cfg, - db: db, + cfg: cfg, + db: db, intv: Interval(cfg.Interval), } @@ -108,11 +136,11 @@ var _ PartitionManager = (*partman)(nil) func (pm *partman) Go(ctx context.Context) { go func(ctx context.Context) { - tick := time.NewTicker(60*time.Minute) + tick := time.NewTicker(60 * time.Minute) select { - case <-tick.C: - err := pm.Check(ctx) + case now := <-tick.C: + err := pm.Check(ctx, now) if err != nil { log.Error().Err(err).Msg("partman check failed") } @@ -123,84 +151,185 @@ func (pm *partman) Go(ctx context.Context) { } func (pm *partman) newPartition(t time.Time) partition { - return monthlyPartition{month: t.Month(), year: t.Year()} + switch pm.intv { + case Monthly: + return monthlyPartition{t} + } + + return nil } -func (pm *partman) Check(ctx context.Context) error { - err := pm.db.InTx(ctx, func(db database.Store) error { +func (pm *partman) retentionPartitions(cur partition) []partition { + partitions := make([]partition, 0, pm.cfg.Retain) + for i := 1; i <= pm.cfg.Retain; i++ { + prev := cur.Prev(i) + partitions = append(partitions, prev) + } + + return partitions +} + +func (pm *partman) futurePartitions(cur partition) []partition { + preProv := preProvisionDefault + if pm.cfg.PreProvision != nil { + preProv = *pm.cfg.PreProvision + } + + partitions := make([]partition, 0, pm.cfg.Retain) + for i := 1; i <= preProv; i++ { + next := cur.Next(i) + partitions = append(partitions, next) + } + + return partitions +} + +func (pm *partman) expectedPartitions(now time.Time) []partition { + curPart := 
pm.newPartition(now) + + retain := pm.retentionPartitions(curPart) + + future := pm.futurePartitions(curPart) + + shouldExist := append(retain, curPart) + shouldExist = append(shouldExist, future...) + + return shouldExist +} + +func (pm *partman) comparePartitions(existingTables, expectedTables []partition) (unexpectedTables, missingTables []partition) { + existing := make(map[string]partition) + expectedAndExists := make(map[string]bool) + + for _, t := range existingTables { + existing[t.PartitionName()] = t + } + + for _, t := range expectedTables { + if _, found := existing[t.PartitionName()]; found { + expectedAndExists[t.PartitionName()] = true + } else { + missingTables = append(missingTables, t) + } + } + + for _, t := range existingTables { + if _, found := expectedAndExists[t.PartitionName()]; !found { + // Only in existingTables and not in both + unexpectedTables = append(unexpectedTables, t) + } + } + + return unexpectedTables, missingTables +} + +func (pm *partman) existingPartitions(parts []string) ([]partition, error) { + existing := make([]partition, 0, len(parts)) + for _, v := range parts { + p, err := parsePartName(v) + if err != nil { + return nil, err + } + + existing = append(existing, p) + } + return existing, nil +} + +func (pm *partman) fullTableName(s string) string { + return fmt.Sprintf("%s.%s", pm.cfg.Schema, s) +} + +func (pm *partman) prunePartition(ctx context.Context, tx database.Store, p partition) error { + s, e := p.Range() + start := pgtype.Timestamptz{Time: s, Valid: true} + end := pgtype.Timestamptz{Time: e, Valid: true} + err := tx.SweepCalls(ctx, start, end) + if err != nil { + return err + } + + err = tx.CleanupSweptCalls(ctx, start, end) + if err != nil { + return err + } + + err = tx.DetachPartition(ctx, pm.fullTableName(p.PartitionName())) + if err != nil { + return err + } + + if pm.cfg.Drop { + return tx.DropPartition(ctx, pm.fullTableName(p.PartitionName())) + } + + return nil +} + +func (pm *partman) Check(ctx context.Context, now time.Time) error { + return pm.db.InTx(ctx, func(db database.Store) error { // by default, we want to make sure a partition exists for this and next month - // since we run this at startup, it's safe to do only that. - schemaName, err := db.GetSearchPath(ctx) + // since we run this at startup, it's safe to do only that. 
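+ // In addition to the current interval, Check reconciles the whole retention/pre-provision window: it creates any expected partitions that are missing and sweeps, detaches, and (when drop is enabled) drops partitions that fall outside the window.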
+ partitions, err := db.GetTablePartitions(ctx, pm.cfg.Schema, callsTable) if err != nil { return err } - partitions, err := db.GetCallsPartitions(ctx, &schemaName) + existing, err := pm.existingPartitions(partitions) if err != nil { return err } - // this could be done in SQL - partsMap, err := partitionsMap(partitions) - if err != nil { - return err + expected := pm.expectedPartitions(now) + + unexpected, missing := pm.comparePartitions(existing, expected) + + for _, p := range unexpected { + err := pm.prunePartition(ctx, db, p) + if err != nil { + return err + } } - now := time.Now() - - thisPart := pm.newPartition(now) - nextPart := thisPart.Next() - - mustExist := []partition{thisPart, nextPart} - - for _, ch := range mustExist { - if !partsMap.exists(ch) { - - err := pm.createPartition(ctx, db, ch) - if err != nil { - return err - } - - log.Info().Str("part", ch.PartitionName()).Msg("created partition") + for _, p := range missing { + err := pm.createPartition(ctx, db, p) + if err != nil { + return err } } return nil }, pgx.TxOptions{}) - - if err != nil { - return err - } - - return nil } -func parsePartName(p *string) (monthlyPartition, error) { - r := monthlyPartition{} - dateAr := strings.Split(*p, "calls_p_") +func (pm *partman) createPartition(ctx context.Context, tx database.Store, part partition) error { + start, end := part.Range() + return tx.CreatePartition(ctx, callsTable, part.PartitionName(), start, end) +} + +func parsePartName(p string) (partition, error) { + dateAr := strings.Split(p, "calls_p_") if len(dateAr) != 2 { - return r, PartitionError(*p) + return nil, PartitionError(p) } dateAr = strings.Split(dateAr[1], "_") if len(dateAr) != 2 { - return r, PartitionError(*p) + return nil, PartitionError(p) } year, err := strconv.Atoi(dateAr[0]) if err != nil { - return r, PartitionError(*p) + return nil, PartitionError(p) } - r.year = year - month, err := strconv.Atoi(dateAr[1]) if err != nil { - return r, PartitionError(*p) + return nil, PartitionError(p) } - r.month = time.Month(month) + r := monthlyPartition{time.Date(year, time.Month(month), 1, 0, 0, 0, 0, time.UTC)} return r, nil } @@ -208,19 +337,15 @@ func parsePartName(p *string) (monthlyPartition, error) { type partMap map[partition]struct{} func (pm partMap) exists(dt partition) bool { - _ , ex := pm[dt] + _, ex := pm[dt] return ex } -func partitionsMap(partitions []*string) (partMap, error) { +func partitionsMap(partitions []string, mustExist map[partition]struct{}) (partMap, error) { partsDate := make(partMap, len(partitions)) for _, p := range partitions { - if p == nil { - panic(PartitionError("")) - } - dt, err := parsePartName(p) if err != nil { return nil, err @@ -229,21 +354,16 @@ func partitionsMap(partitions []*string) (partMap, error) { partsDate[dt] = struct{}{} } - return partsDate, nil } -func (pm *partman) createPartition(ctx context.Context, db database.Store, d partition) error { - t, n := d.Range() - - _, err := db.DBTX().Exec(ctx, fmt.Sprintf(`CREATE TABLE %s PARTITION OF calls FOR VALUES FROM ($1) TO ($2);`, d.PartitionName()), t, n) - if err != nil { - return err - } - - return nil +func runRetention() { + // make sure to check if partition was attached first + // before dropping. don't want to accidentally drop pre-detached partitions. 
} -func monthPart(m time.Month, y int) monthlyPartition { - return monthlyPartition{year: y, month: m} +func dropPart() { + // intx + // SweepCalls + // DropPart } diff --git a/pkg/database/partman/partman_test.go b/pkg/database/partman/partman_test.go new file mode 100644 index 0000000..984bc04 --- /dev/null +++ b/pkg/database/partman/partman_test.go @@ -0,0 +1,148 @@ +package partman_test + +import ( + "context" + "testing" + "time" + + "dynatron.me/x/stillbox/internal/common" + "dynatron.me/x/stillbox/pkg/config" + "dynatron.me/x/stillbox/pkg/database" + "dynatron.me/x/stillbox/pkg/database/mocks" + "dynatron.me/x/stillbox/pkg/database/partman" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgtype" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +var mctx = mock.Anything + +func inTx(s *mocks.Store, dbtx *mocks.DBTX) { + s.EXPECT().InTx(mctx, mock.AnythingOfType("func(database.Store) error"), mock.AnythingOfType("pgx.TxOptions")).RunAndReturn(func(ctx context.Context, f func(db database.Store) error, po pgx.TxOptions) error { + return f(s) + }) + +} + +type timeRange struct { + start time.Time + end time.Time +} + +func TestPartman(t *testing.T) { + ctx := context.Background() + + timeInUTC := func(s string) time.Time { + t, err := time.ParseInLocation("2006-01-02 15:04:05", s, time.UTC) + if err != nil { + panic(err) + } + + return t + } + + dateInUTC := func(s string) time.Time { + t, err := time.ParseInLocation("2006-01-02", s, time.UTC) + if err != nil { + panic(err) + } + + return t + } + + tests := []struct { + name string + now time.Time + cfg config.Partition + extant []string + expectCreate []string + expectDrop []string + expectDetach []string + expectSweep []timeRange + expectCleanup []timeRange + }{ + { + name: "monthly base", + now: timeInUTC("2024-11-28 11:37:04"), + cfg: config.Partition{ + Enabled: true, + Schema: "public", + Interval: "monthly", + Retain: 2, + Drop: true, + PreProvision: common.PtrTo(2), + }, + extant: []string{ + "calls_p_2024_10", + "calls_p_2024_09", + "calls_p_2024_08", + "calls_p_2024_07", + }, + expectCreate: []string{ + "calls_p_2024_11", + "calls_p_2024_12", + "calls_p_2025_01", + }, + expectDrop: []string{ + "public.calls_p_2024_07", + "public.calls_p_2024_08", + }, + expectSweep: []timeRange{ + timeRange{start: dateInUTC("2024-07-01"), end: dateInUTC("2024-08-01")}, + timeRange{start: dateInUTC("2024-08-01"), end: dateInUTC("2024-09-01")}, + }, + expectCleanup: []timeRange{ + timeRange{start: dateInUTC("2024-07-01"), end: dateInUTC("2024-08-01")}, + timeRange{start: dateInUTC("2024-08-01"), end: dateInUTC("2024-09-01")}, + }, + expectDetach: []string{ + "public.calls_p_2024_07", + "public.calls_p_2024_08", + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + db := mocks.NewStore(t) + dbtx := mocks.NewDBTX(t) + createdPartitions := make([]string, 0, len(tc.expectCreate)) + sweptRanges := make([]timeRange, 0, len(tc.expectSweep)) + droppedPartitions := make([]string, 0, len(tc.expectDrop)) + cleanupRanges := make([]timeRange, 0, len(tc.expectCleanup)) + detachedPartitions := make([]string, 0, len(tc.expectDetach)) + + db.EXPECT().CreatePartition(mctx, mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("time.Time"), mock.AnythingOfType("time.Time")).Run(func(ctx context.Context, tableName, partitionName string, start, end time.Time) { + createdPartitions = append(createdPartitions, partitionName) + 
}).Return(nil) + db.EXPECT().SweepCalls(mctx, mock.AnythingOfType("pgtype.Timestamptz"), mock.AnythingOfType("pgtype.Timestamptz")).Run(func(ctx context.Context, start, end pgtype.Timestamptz) { + sweptRanges = append(sweptRanges, timeRange{start: start.Time, end: end.Time}) + }).Return(nil) + db.EXPECT().CleanupSweptCalls(mctx, mock.AnythingOfType("pgtype.Timestamptz"), mock.AnythingOfType("pgtype.Timestamptz")).Run(func(ctx context.Context, start, end pgtype.Timestamptz) { + cleanupRanges = append(cleanupRanges, timeRange{start: start.Time, end: end.Time}) + }).Return(nil) + db.EXPECT().DropPartition(mctx, mock.AnythingOfType("string")).Run(func(ctx context.Context, partName string) { + droppedPartitions = append(droppedPartitions, partName) + }).Return(nil) + db.EXPECT().DetachPartition(mctx, mock.AnythingOfType("string")).Run(func(ctx context.Context, partName string) { + detachedPartitions = append(detachedPartitions, partName) + }).Return(nil) + inTx(db, dbtx) + db.EXPECT().GetTablePartitions(mctx, "public", "calls").Return(tc.extant, nil) + pm, err := partman.New(db, tc.cfg) + require.NoError(t, err) + + err = pm.Check(ctx, tc.now) + require.NoError(t, err) + + assert.ElementsMatch(t, tc.expectCreate, createdPartitions) + assert.ElementsMatch(t, tc.expectSweep, sweptRanges) + assert.ElementsMatch(t, tc.expectDrop, droppedPartitions) + assert.ElementsMatch(t, tc.expectCleanup, cleanupRanges) + assert.ElementsMatch(t, tc.expectDetach, detachedPartitions) + }) + } +} diff --git a/pkg/database/querier.go b/pkg/database/querier.go index 911cc22..e9425ca 100644 --- a/pkg/database/querier.go +++ b/pkg/database/querier.go @@ -15,12 +15,12 @@ type Querier interface { AddAlert(ctx context.Context, arg AddAlertParams) error AddCall(ctx context.Context, arg AddCallParams) error AddLearnedTalkgroup(ctx context.Context, arg AddLearnedTalkgroupParams) (Talkgroup, error) + CleanupSweptCalls(ctx context.Context, rangeStart pgtype.Timestamptz, rangeEnd pgtype.Timestamptz) error CreateAPIKey(ctx context.Context, owner int, expires pgtype.Timestamp, disabled *bool) (ApiKey, error) CreateUser(ctx context.Context, arg CreateUserParams) (User, error) DeleteAPIKey(ctx context.Context, apiKey string) error DeleteUser(ctx context.Context, username string) error GetAPIKey(ctx context.Context, apiKey string) (ApiKey, error) - GetCallsPartitions(ctx context.Context, schemaName *string) ([]*string, error) GetDatabaseSize(ctx context.Context) (string, error) GetSystemName(ctx context.Context, systemID int) (string, error) GetTalkgroup(ctx context.Context, systemID int32, tGID int32) (GetTalkgroupRow, error) @@ -43,6 +43,7 @@ type Querier interface { SetCallTranscript(ctx context.Context, iD uuid.UUID, transcript *string) error SetTalkgroupTags(ctx context.Context, tags []string, systemID int32, tGID int32) error StoreTGVersion(ctx context.Context, arg []StoreTGVersionParams) *StoreTGVersionBatchResults + SweepCalls(ctx context.Context, rangeStart pgtype.Timestamptz, rangeEnd pgtype.Timestamptz) error UpdatePassword(ctx context.Context, username string, password string) error UpdateTalkgroup(ctx context.Context, arg UpdateTalkgroupParams) (Talkgroup, error) UpsertTalkgroup(ctx context.Context, arg []UpsertTalkgroupParams) *UpsertTalkgroupBatchResults diff --git a/sql/postgres/migrations/002_partition.up.sql b/sql/postgres/migrations/002_partition.up.sql index 7ae6d80..3ac1dc2 100644 --- a/sql/postgres/migrations/002_partition.up.sql +++ b/sql/postgres/migrations/002_partition.up.sql @@ -25,4 +25,56 @@ CREATE TABLE 
calls ( FOREIGN KEY (system, talkgroup) REFERENCES talkgroups(system_id, tgid) ) PARTITION BY RANGE (call_date); +-- for our prod +create table calls_p_2024_07 partition of calls for values from ('2024-07-01') to ('2024-08-01'); +create table calls_p_2024_08 partition of calls for values from ('2024-08-01') to ('2024-09-01'); +create table calls_p_2024_09 partition of calls for values from ('2024-09-01') to ('2024-10-01'); +create table calls_p_2024_10 partition of calls for values from ('2024-10-01') to ('2024-11-01'); +create table calls_p_2024_11 partition of calls for values from ('2024-11-01') to ('2024-12-01'); + + +insert into calls (id, submitter, system, talkgroup, call_date, audio_name, audio_url, frequency, frequencies, patches, tg_label, tg_alpha_tag, tg_group, source, transcript, duration) select id, submitter, system, talkgroup, call_date, audio_name, audio_url, frequency, frequencies, patches, tg_label, tg_alpha_tag, tg_group, source, transcript, duration from calls_unpart; + +drop table calls_unpart cascade; + +CREATE TABLE swept_calls ( + id UUID PRIMARY KEY, + submitter INTEGER REFERENCES api_keys(id) ON DELETE SET NULL, + system INTEGER NOT NULL, + talkgroup INTEGER NOT NULL, + call_date TIMESTAMPTZ NOT NULL, + audio_name TEXT, + audio_blob BYTEA, + duration INTEGER, + audio_type TEXT, + audio_url TEXT, + frequency INTEGER NOT NULL, + frequencies INTEGER[], + patches INTEGER[], + tg_label TEXT, + tg_alpha_tag TEXT, + tg_group TEXT, + source INTEGER NOT NULL, + transcript TEXT, + FOREIGN KEY (system, talkgroup) REFERENCES talkgroups(system_id, tgid) +); + +DROP TABLE IF EXISTS incidents_calls; -- DATA LOSS + +CREATE TABLE IF NOT EXISTS incidents_calls( + incident_id UUID NOT NULL REFERENCES incidents(id) ON UPDATE CASCADE ON DELETE CASCADE, + call_id UUID NOT NULL, + calls_tbl_id UUID NULL, + swept_call_id UUID NULL REFERENCES swept_calls(id), + call_date TIMESTAMPTZ NULL, + notes JSONB, + FOREIGN KEY (calls_tbl_id, call_date) REFERENCES calls(id, call_date), + PRIMARY KEY (incident_id, call_id) +); + + +-- ALTER TABLE incidents_calls ADD COLUMN call_date TIMESTAMPTZ NOT NULL; +-- ALTER TABLE incidents_calls DROP CONSTRAINT incidents_calls_call_id_fkey; +-- ALTER TABLE incidents_calls ADD CONSTRAINT incidents_calls_call_id_call_date_fkey FOREIGN KEY (call_id, call_date) REFERENCES calls(id, call_date) ON UPDATE CASCADE; + COMMIT; diff --git a/sql/postgres/queries/calls.sql b/sql/postgres/queries/calls.sql index a78ce12..3955614 100644 --- a/sql/postgres/queries/calls.sql +++ b/sql/postgres/queries/calls.sql @@ -57,6 +57,20 @@ VALUES -- name: GetDatabaseSize :one SELECT pg_size_pretty(pg_database_size(current_database())); --- name: GetCallsPartitions :many -SELECT tablename FROM pg_catalog.pg_tables -WHERE schemaname = @schema_name AND tablename LIKE 'calls\_p\_____\___'; +-- name: SweepCalls :exec +WITH to_sweep AS ( + SELECT calls.* FROM calls + JOIN incidents_calls ic ON ic.call_id = calls.id + WHERE calls.call_date >= @range_start AND calls.call_date < @range_end +) INSERT INTO swept_calls SELECT * FROM to_sweep; + +-- name: CleanupSweptCalls :exec +WITH to_sweep AS ( + SELECT id FROM calls + JOIN incidents_calls ic ON ic.call_id = calls.id + WHERE calls.call_date >= @range_start AND calls.call_date < @range_end +) UPDATE incidents_calls + SET + swept_call_id = call_id, + calls_tbl_id = NULL + WHERE call_id IN (SELECT id FROM to_sweep); diff --git a/sql/sqlc.yaml b/sql/sqlc.yaml index c233814..0b51630 100644 --- a/sql/sqlc.yaml +++ b/sql/sqlc.yaml @@ -37,3 +37,6 @@ sql:
import: "dynatron.me/x/stillbox/internal/jsontypes" type: "Metadata" nullable: true + - column: "pg_catalog.pg_tables.tablename" + go_type: string + nullable: false