From 03ebf74abe6778c9f36be442dae557d70c435eee Mon Sep 17 00:00:00 2001 From: Daniel Ponte Date: Sun, 1 Dec 2024 03:01:09 -0500 Subject: [PATCH] Partitioning (#60) Closes #13 Reviewed-on: https://git.dynatron.me/amigan/stillbox/pulls/60 Co-authored-by: Daniel Ponte Co-committed-by: Daniel Ponte --- .mockery.yaml | 1 + config.sample.yaml | 13 + internal/isoweek/LICENSE | 22 + internal/isoweek/isoweek.go | 93 +++++ internal/isoweek/isoweek_test.go | 96 +++++ internal/isoweek/julian.go | 32 ++ internal/isoweek/julian_test.go | 52 +++ pkg/config/config.go | 14 +- pkg/config/parse.go | 5 +- pkg/database/calls.sql.go | 37 ++ pkg/database/database.go | 24 +- pkg/database/mocks/DBTX.go | 287 +++++++++++++ pkg/database/mocks/Store.go | 382 +++++++++++++++++- pkg/database/models.go | 135 ++++++- pkg/database/partitions.go | 160 ++++++++ pkg/database/partman/intervals.go | 152 +++++++ pkg/database/partman/partman.go | 449 +++++++++++++++++++++ pkg/database/partman/partman_test.go | 431 ++++++++++++++++++++ pkg/database/querier.go | 2 + pkg/notify/notify.go | 2 +- pkg/rest/api.go | 2 +- pkg/server/server.go | 18 + pkg/sinks/sinks.go | 2 +- pkg/talkgroups/tgstore/store.go | 2 +- sql/postgres/migrations/001_initial.up.sql | 43 +- sql/postgres/queries/calls.sql | 20 + sql/sqlc.yaml | 3 + 27 files changed, 2450 insertions(+), 29 deletions(-) create mode 100644 internal/isoweek/LICENSE create mode 100644 internal/isoweek/isoweek.go create mode 100644 internal/isoweek/isoweek_test.go create mode 100644 internal/isoweek/julian.go create mode 100644 internal/isoweek/julian_test.go create mode 100644 pkg/database/mocks/DBTX.go create mode 100644 pkg/database/partitions.go create mode 100644 pkg/database/partman/intervals.go create mode 100644 pkg/database/partman/partman.go create mode 100644 pkg/database/partman/partman_test.go diff --git a/.mockery.yaml b/.mockery.yaml index fa78e2c..a0b5082 100644 --- a/.mockery.yaml +++ b/.mockery.yaml @@ -8,3 +8,4 @@ packages: config: interfaces: Store: + DBTX: diff --git a/config.sample.yaml b/config.sample.yaml index 4e66425..500cb1c 100644 --- a/config.sample.yaml +++ b/config.sample.yaml @@ -1,5 +1,18 @@ db: connect: 'postgres://postgres:password@localhost:5432/example' + partition: + # whether to enable the built-in partition manager + enabled: true + # the postgres schema containing our tables + schema: public + # daily|weekly|monthly|quarterly|yearly + interval: monthly + # number of partitions to retain, -1 to keep all + retain: 3 + # whether to drop or simply detach + drop: true + # number of partitions to prepare ahead + preProvision: 3 cors: allowedOrigins: - 'http://localhost:*' diff --git a/internal/isoweek/LICENSE b/internal/isoweek/LICENSE new file mode 100644 index 0000000..31d0b01 --- /dev/null +++ b/internal/isoweek/LICENSE @@ -0,0 +1,22 @@ +License for github.com/snabb/isoweek: + +Copyright © 2016-2023 Janne Snabb snabb AT epipe.com + +Permission is hereby granted, free of charge, to any person obtaining +a copy of this software and associated documentation files (the +"Software"), to deal in the Software without restriction, including +without limitation the rights to use, copy, modify, merge, publish, +distribute, sublicense, and/or sell copies of the Software, and to +permit persons to whom the Software is furnished to do so, subject to +the following conditions: + +The above copyright notice and this permission notice shall be included +in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. +IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, +TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE +SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/internal/isoweek/isoweek.go b/internal/isoweek/isoweek.go new file mode 100644 index 0000000..c65d196 --- /dev/null +++ b/internal/isoweek/isoweek.go @@ -0,0 +1,93 @@ +// Package isoweek calculates a starting date and time of [ISO 8601] week. +// +// ISO 8601 standard defines the common [week number] system used in Europe +// and many other countries. Monday is the first day of a week. +// +// The Go standard library [time] package has [time.Time.ISOWeek] function +// for getting ISO 8601 week number of a given [time.Time], but there is no +// reverse functionality for getting a date from a week number. This package +// implements that. +// +// Invalid input is silently accepted. There is a separate [Validate] +// function if week number validation is needed. +// +// There are also functions for working with [Julian day numbers]. Using Julian +// day numbers is often the easiest and fastest way to do date calculations. +// +// This package does not work with the "traditional" week system used in +// US/Canada/Japan/etc. (weeks starting on Sundays). However the Julian day +// number functions may be still useful. +// +// [ISO 8601]: https://en.wikipedia.org/wiki/ISO_8601 +// [week number]: https://en.wikipedia.org/wiki/ISO_week_date +// [Julian day numbers]: https://en.wikipedia.org/wiki/Julian_day +package isoweek + +import "time" + +// ISOWeekday returns the ISO 8601 weekday number of given day. +// (1 = Mon, 2 = Tue,.. 7 = Sun) +// +// This is different from Go's standard [time.Weekday]. +func ISOWeekday(year int, month time.Month, day int) (weekday int) { + // Richards, E. G. (2013) pp. 592, 618 + + return DateToJulian(year, month, day)%7 + 1 +} + +// startOffset returns the offset (in days) from the start of a year to +// Monday of the given week. Offset may be negative. +func startOffset(y, week int) (offset int) { + // This is optimized version of the following: + // + // return week*7 - ISOWeekday(y, 1, 4) - 3 + // + // Uses Tomohiko Sakamoto's algorithm for calculating the weekday. + + y = y - 1 + return week*7 - (y+y/4-y/100+y/400+3)%7 - 4 +} + +// StartTime returns the starting time (Monday 00:00) of the given +// ISO 8601 week. +func StartTime(wyear, week int, loc *time.Location) (start time.Time) { + y, m, d := StartDate(wyear, week) + return time.Date(y, m, d, 0, 0, 0, 0, loc) +} + +// StartDate returns the starting date (Monday) of the given ISO 8601 week. +func StartDate(wyear, week int) (year int, month time.Month, day int) { + return JulianToDate( + DateToJulian(wyear, 1, 1) + startOffset(wyear, week)) +} + +// ordinalInYear returns the ordinal (within a year) day number. +func ordinalInYear(year int, month time.Month, day int) (dayNo int) { + return DateToJulian(year, month, day) - DateToJulian(year, 1, 1) + 1 +} + +// FromDate returns ISO 8601 week number of a date. +func FromDate(year int, month time.Month, day int) (wyear, week int) { + week = (ordinalInYear(year, month, day) - ISOWeekday(year, month, day) + 10) / 7 + if week < 1 { + return FromDate(year-1, 12, 31) // last week of preceding year + } + if week == 53 && + DateToJulian(StartDate(year+1, 1)) <= DateToJulian(year, month, day) { + return year + 1, 1 // first week of following year + } + return year, week +} + +// Validate checks if a week number is valid. Returns true if it is valid. +func Validate(wyear, week int) (ok bool) { + if week < 1 || week > 53 { + return false + } + wyear2, week2 := FromDate(StartDate(wyear, week)) + + if wyear == wyear2 && week == week2 { + return true + } + return false +} diff --git a/internal/isoweek/isoweek_test.go b/internal/isoweek/isoweek_test.go new file mode 100644 index 0000000..907b237 --- /dev/null +++ b/internal/isoweek/isoweek_test.go @@ -0,0 +1,96 @@ +package isoweek_test + +import ( + "fmt" + "testing" + "time" + + "dynatron.me/x/stillbox/internal/isoweek" +) + +// TestISOWeekday tests all days from year 1 until year 4000. +// Ensures that behaviour matches the Go standard library Weekday. +func TestISOWeekday(test *testing.T) { + t := time.Date(1, time.January, 1, 0, 0, 0, 0, time.UTC) + var wd1, wd2 int + for t.Year() < 4000 { + wd1 = int(t.Weekday()) + wd2 = isoweek.ISOWeekday(t.Date()) + + if wd2 == 7 { + wd2 = 0 + } + if wd1 != wd2 { + test.Errorf("mismatch on %s", t.Format("2006-01-02")) + } + t = t.AddDate(0, 0, 1) + } +} + +func ExampleISOWeekday() { + fmt.Println(isoweek.ISOWeekday(1984, 1, 1)) + // Output: 7 +} + +// TestStartTime tests all weeks from year 1 until year 4000. +// Ensures that behaviour matches the Go standard library ISOWeek. +func TestStartTime(test *testing.T) { + t := time.Date(1, time.January, 1, 0, 0, 0, 0, time.UTC) + for t.Weekday() != time.Monday { + t = t.AddDate(0, 0, 1) + } + for t.Year() < 4000 { + wy, ww := t.ISOWeek() + wst := isoweek.StartTime(wy, ww, time.UTC) + if !wst.Equal(t) { + test.Errorf("mismatch: %v != %v (wy = %d, ww = %d)", + t, wst, wy, ww) + } + t = t.AddDate(0, 0, 7) + } +} + +func ExampleStartTime() { + t := isoweek.StartTime(1985, 1, time.UTC) + fmt.Println(t) + // Output: 1984-12-31 00:00:00 +0000 UTC +} + +func ExampleStartDate() { + y, m, d := isoweek.StartDate(2000, 1) + fmt.Println(d, m, y) + // Output: 3 January 2000 +} + +// TestFromDate tests all days from year 1 until year 4000. +// Ensures that behaviour matches the Go standard library ISOWeek. +func TestFromDate(test *testing.T) { + t := time.Date(1, time.January, 1, 0, 0, 0, 0, time.UTC) + for t.Year() < 4000 { + wy, ww := t.ISOWeek() + wy2, ww2 := isoweek.FromDate(t.Date()) + if wy != wy2 || ww != ww2 { + test.Errorf("mismatch on %s", t.Format("2006-01-02")) + } + t = t.AddDate(0, 0, 1) + } +} + +func ExampleFromDate() { + fmt.Println(isoweek.FromDate(1984, 1, 1)) + // Output: 1983 52 +} + +func ExampleValidate() { + fmt.Println( + isoweek.Validate(2015, 52), isoweek.Validate(2015, 53), + isoweek.Validate(2015, 54), isoweek.Validate(2016, 0), + isoweek.Validate(2016, 1)) + fmt.Println( + isoweek.Validate(2016, 52), isoweek.Validate(2016, 53), + isoweek.Validate(2016, 54), isoweek.Validate(2017, 0), + isoweek.Validate(2017, 1)) + // Output: + // true true false false true + // true false false false true +} diff --git a/internal/isoweek/julian.go b/internal/isoweek/julian.go new file mode 100644 index 0000000..128c555 --- /dev/null +++ b/internal/isoweek/julian.go @@ -0,0 +1,32 @@ +package isoweek + +import "time" + +// DateToJulian converts a date to a Julian day number. +func DateToJulian(year int, month time.Month, day int) (jdn int) { + // Claus Tøndering's Calendar FAQ + // http://www.tondering.dk/claus/cal/julperiod.php#formula + + if month < 3 { + year = year - 1 + month = month + 12 + } + year = year + 4800 + + return day + (153*(int(month)-3)+2)/5 + 365*year + + year/4 - year/100 + year/400 - 32045 +} + +// JulianToDate converts a Julian day number to a date. +func JulianToDate(jdn int) (year int, month time.Month, day int) { + // Richards, E. G. (2013) pp. 585–624 + + e := 4*(jdn+1401+(4*jdn+274277)/146097*3/4-38) + 3 + h := e%1461/4*5 + 2 + + day = h%153/5 + 1 + month = time.Month((h/153+2)%12 + 1) + year = e/1461 - 4716 + (14-int(month))/12 + + return year, month, day +} diff --git a/internal/isoweek/julian_test.go b/internal/isoweek/julian_test.go new file mode 100644 index 0000000..abc0add --- /dev/null +++ b/internal/isoweek/julian_test.go @@ -0,0 +1,52 @@ +package isoweek_test + +import ( + "fmt" + "testing" + "time" + + "dynatron.me/x/stillbox/internal/isoweek" +) + +func TestJulianToDate(test *testing.T) { + j := isoweek.DateToJulian(1, time.January, 1) + + for { + y, m, d := isoweek.JulianToDate(j) + if y >= 4000 { + break + } + if j != isoweek.DateToJulian(y, m, d) { + test.Errorf("mismatch on %04d-%02d-%02d", y, m, d) + } + j++ + } +} + +func TestDateToJulian(test *testing.T) { + t := time.Date(1, time.January, 1, 0, 0, 0, 0, time.UTC) + + for t.Year() < 4000 { + j := isoweek.DateToJulian(t.Date()) + + y, m, d := isoweek.JulianToDate(j) + + if y != t.Year() || m != t.Month() || d != t.Day() { + test.Errorf("mismatch on %s", t.Format("2006-01-02")) + } + if j+1 != isoweek.DateToJulian(y, m, d+1) { + test.Errorf("mismatch 2 on %s", t.Format("2006-01-02")) + } + t = t.AddDate(0, 0, 1) + } +} + +func ExampleDateToJulian() { + fmt.Println(isoweek.DateToJulian(2006, 1, 2)) + // Output: 2453738 +} + +func ExampleJulianToDate() { + fmt.Println(isoweek.JulianToDate(2453738)) + // Output: 2006 January 2 +} diff --git a/pkg/config/config.go b/pkg/config/config.go index 7d67a9b..5a8d110 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -39,8 +39,18 @@ type CORS struct { } type DB struct { - Connect string `yaml:"connect"` - LogQueries bool `yaml:"logQueries"` + Connect string `yaml:"connect"` + LogQueries bool `yaml:"logQueries"` + Partition Partition `yaml:"partition"` +} + +type Partition struct { + Enabled bool `yaml:"enabled"` + Schema string `yaml:"schema"` + Interval string `yaml:"interval"` + Retain int `yaml:"retain"` + PreProvision *int `yaml:"preProvision"` + Drop bool `yaml:"detach"` } type Logger struct { diff --git a/pkg/config/parse.go b/pkg/config/parse.go index f8e9ac0..e8ba75f 100644 --- a/pkg/config/parse.go +++ b/pkg/config/parse.go @@ -40,10 +40,13 @@ func (c *Configuration) read() error { return err } - k.Load(env.Provider(common.EnvPrefix, ".", func(s string) string { + err = k.Load(env.Provider(common.EnvPrefix, ".", func(s string) string { return strings.Replace(strings.ToLower( strings.TrimPrefix(s, common.EnvPrefix)), "_", ".", -1) }), nil) + if err != nil { + return err + } err = k.UnmarshalWithConf("", &c.Config, koanf.UnmarshalConf{ diff --git a/pkg/database/calls.sql.go b/pkg/database/calls.sql.go index 2c82ea8..af8be5c 100644 --- a/pkg/database/calls.sql.go +++ b/pkg/database/calls.sql.go @@ -135,6 +135,26 @@ func (q *Queries) AddCall(ctx context.Context, arg AddCallParams) error { return err } +const cleanupSweptCalls = `-- name: CleanupSweptCalls :execrows +WITH to_sweep AS ( + SELECT id FROM calls + JOIN incidents_calls ic ON ic.call_id = calls.id + WHERE calls.call_date >= $1 AND calls.call_date < $2 +) UPDATE incidents_calls + SET + swept_call_id = call_id, + calls_tbl_id = NULL + WHERE call_id IN (SELECT id FROM to_sweep) +` + +func (q *Queries) CleanupSweptCalls(ctx context.Context, rangeStart pgtype.Timestamptz, rangeEnd pgtype.Timestamptz) (int64, error) { + result, err := q.db.Exec(ctx, cleanupSweptCalls, rangeStart, rangeEnd) + if err != nil { + return 0, err + } + return result.RowsAffected(), nil +} + const getDatabaseSize = `-- name: GetDatabaseSize :one SELECT pg_size_pretty(pg_database_size(current_database())) ` @@ -154,3 +174,20 @@ func (q *Queries) SetCallTranscript(ctx context.Context, iD uuid.UUID, transcrip _, err := q.db.Exec(ctx, setCallTranscript, iD, transcript) return err } + +const sweepCalls = `-- name: SweepCalls :execrows +WITH to_sweep AS ( + SELECT id, submitter, system, talkgroup, calls.call_date, audio_name, audio_blob, duration, audio_type, audio_url, frequency, frequencies, patches, tg_label, tg_alpha_tag, tg_group, source, transcript + FROM calls + JOIN incidents_calls ic ON ic.call_id = calls.id + WHERE calls.call_date >= $1 AND calls.call_date < $2 +) INSERT INTO swept_calls SELECT id, submitter, system, talkgroup, call_date, audio_name, audio_blob, duration, audio_type, audio_url, frequency, frequencies, patches, tg_label, tg_alpha_tag, tg_group, source, transcript FROM to_sweep +` + +func (q *Queries) SweepCalls(ctx context.Context, rangeStart pgtype.Timestamptz, rangeEnd pgtype.Timestamptz) (int64, error) { + result, err := q.db.Exec(ctx, sweepCalls, rangeStart, rangeEnd) + if err != nil { + return 0, err + } + return result.RowsAffected(), nil +} diff --git a/pkg/database/database.go b/pkg/database/database.go index 97aa7d5..b833adb 100644 --- a/pkg/database/database.go +++ b/pkg/database/database.go @@ -23,21 +23,27 @@ import ( type Store interface { Querier talkgroupQuerier + partitionsQuerier - DB() *Database + DB() *Postgres + DBTX() DBTX InTx(context.Context, func(Store) error, pgx.TxOptions) error } -type Database struct { +type Postgres struct { *pgxpool.Pool *Queries } -func (db *Database) DB() *Database { +func (q *Queries) DBTX() DBTX { + return q.db +} + +func (db *Postgres) DB() *Postgres { return db } -func (db *Database) InTx(ctx context.Context, f func(Store) error, opts pgx.TxOptions) error { +func (db *Postgres) InTx(ctx context.Context, f func(Store) error, opts pgx.TxOptions) error { tx, err := db.DB().Pool.BeginTx(ctx, opts) if err != nil { return fmt.Errorf("Tx begin: %w", err) @@ -46,7 +52,7 @@ func (db *Database) InTx(ctx context.Context, f func(Store) error, opts pgx.TxOp //nolint:errcheck defer tx.Rollback(ctx) - dbtx := &Database{Pool: db.Pool, Queries: db.Queries.WithTx(tx)} + dbtx := &Postgres{Pool: db.Pool, Queries: db.Queries.WithTx(tx)} err = f(dbtx) if err != nil { @@ -68,11 +74,11 @@ func (m dbLogger) Log(ctx context.Context, level tracelog.LogLevel, msg string, } func Close(c Store) { - c.(*Database).Pool.Close() + c.(*Postgres).Pool.Close() } // NewClient creates a new DB using the provided config. -func NewClient(ctx context.Context, conf config.DB) (Store, error) { +func NewClient(ctx context.Context, conf config.DB) (*Postgres, error) { dir, err := iofs.New(sqlembed.Migrations, "postgres/migrations") if err != nil { return nil, err @@ -88,6 +94,8 @@ func NewClient(ctx context.Context, conf config.DB) (Store, error) { return nil, err } + log.Debug().Err(err).Msg("migrations done") + m.Close() pgConf, err := pgxpool.ParseConfig(conf.Connect) @@ -107,7 +115,7 @@ func NewClient(ctx context.Context, conf config.DB) (Store, error) { return nil, err } - db := &Database{ + db := &Postgres{ Pool: pool, Queries: New(pool), } diff --git a/pkg/database/mocks/DBTX.go b/pkg/database/mocks/DBTX.go new file mode 100644 index 0000000..4de20fe --- /dev/null +++ b/pkg/database/mocks/DBTX.go @@ -0,0 +1,287 @@ +// Code generated by mockery v2.47.0. DO NOT EDIT. + +package mocks + +import ( + context "context" + + mock "github.com/stretchr/testify/mock" + + pgconn "github.com/jackc/pgx/v5/pgconn" + + pgx "github.com/jackc/pgx/v5" +) + +// DBTX is an autogenerated mock type for the DBTX type +type DBTX struct { + mock.Mock +} + +type DBTX_Expecter struct { + mock *mock.Mock +} + +func (_m *DBTX) EXPECT() *DBTX_Expecter { + return &DBTX_Expecter{mock: &_m.Mock} +} + +// Exec provides a mock function with given fields: _a0, _a1, _a2 +func (_m *DBTX) Exec(_a0 context.Context, _a1 string, _a2 ...interface{}) (pgconn.CommandTag, error) { + var _ca []interface{} + _ca = append(_ca, _a0, _a1) + _ca = append(_ca, _a2...) + ret := _m.Called(_ca...) + + if len(ret) == 0 { + panic("no return value specified for Exec") + } + + var r0 pgconn.CommandTag + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string, ...interface{}) (pgconn.CommandTag, error)); ok { + return rf(_a0, _a1, _a2...) + } + if rf, ok := ret.Get(0).(func(context.Context, string, ...interface{}) pgconn.CommandTag); ok { + r0 = rf(_a0, _a1, _a2...) + } else { + r0 = ret.Get(0).(pgconn.CommandTag) + } + + if rf, ok := ret.Get(1).(func(context.Context, string, ...interface{}) error); ok { + r1 = rf(_a0, _a1, _a2...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// DBTX_Exec_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Exec' +type DBTX_Exec_Call struct { + *mock.Call +} + +// Exec is a helper method to define mock.On call +// - _a0 context.Context +// - _a1 string +// - _a2 ...interface{} +func (_e *DBTX_Expecter) Exec(_a0 interface{}, _a1 interface{}, _a2 ...interface{}) *DBTX_Exec_Call { + return &DBTX_Exec_Call{Call: _e.mock.On("Exec", + append([]interface{}{_a0, _a1}, _a2...)...)} +} + +func (_c *DBTX_Exec_Call) Run(run func(_a0 context.Context, _a1 string, _a2 ...interface{})) *DBTX_Exec_Call { + _c.Call.Run(func(args mock.Arguments) { + variadicArgs := make([]interface{}, len(args)-2) + for i, a := range args[2:] { + if a != nil { + variadicArgs[i] = a.(interface{}) + } + } + run(args[0].(context.Context), args[1].(string), variadicArgs...) + }) + return _c +} + +func (_c *DBTX_Exec_Call) Return(_a0 pgconn.CommandTag, _a1 error) *DBTX_Exec_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *DBTX_Exec_Call) RunAndReturn(run func(context.Context, string, ...interface{}) (pgconn.CommandTag, error)) *DBTX_Exec_Call { + _c.Call.Return(run) + return _c +} + +// Query provides a mock function with given fields: _a0, _a1, _a2 +func (_m *DBTX) Query(_a0 context.Context, _a1 string, _a2 ...interface{}) (pgx.Rows, error) { + var _ca []interface{} + _ca = append(_ca, _a0, _a1) + _ca = append(_ca, _a2...) + ret := _m.Called(_ca...) + + if len(ret) == 0 { + panic("no return value specified for Query") + } + + var r0 pgx.Rows + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string, ...interface{}) (pgx.Rows, error)); ok { + return rf(_a0, _a1, _a2...) + } + if rf, ok := ret.Get(0).(func(context.Context, string, ...interface{}) pgx.Rows); ok { + r0 = rf(_a0, _a1, _a2...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(pgx.Rows) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, string, ...interface{}) error); ok { + r1 = rf(_a0, _a1, _a2...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// DBTX_Query_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Query' +type DBTX_Query_Call struct { + *mock.Call +} + +// Query is a helper method to define mock.On call +// - _a0 context.Context +// - _a1 string +// - _a2 ...interface{} +func (_e *DBTX_Expecter) Query(_a0 interface{}, _a1 interface{}, _a2 ...interface{}) *DBTX_Query_Call { + return &DBTX_Query_Call{Call: _e.mock.On("Query", + append([]interface{}{_a0, _a1}, _a2...)...)} +} + +func (_c *DBTX_Query_Call) Run(run func(_a0 context.Context, _a1 string, _a2 ...interface{})) *DBTX_Query_Call { + _c.Call.Run(func(args mock.Arguments) { + variadicArgs := make([]interface{}, len(args)-2) + for i, a := range args[2:] { + if a != nil { + variadicArgs[i] = a.(interface{}) + } + } + run(args[0].(context.Context), args[1].(string), variadicArgs...) + }) + return _c +} + +func (_c *DBTX_Query_Call) Return(_a0 pgx.Rows, _a1 error) *DBTX_Query_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *DBTX_Query_Call) RunAndReturn(run func(context.Context, string, ...interface{}) (pgx.Rows, error)) *DBTX_Query_Call { + _c.Call.Return(run) + return _c +} + +// QueryRow provides a mock function with given fields: _a0, _a1, _a2 +func (_m *DBTX) QueryRow(_a0 context.Context, _a1 string, _a2 ...interface{}) pgx.Row { + var _ca []interface{} + _ca = append(_ca, _a0, _a1) + _ca = append(_ca, _a2...) + ret := _m.Called(_ca...) + + if len(ret) == 0 { + panic("no return value specified for QueryRow") + } + + var r0 pgx.Row + if rf, ok := ret.Get(0).(func(context.Context, string, ...interface{}) pgx.Row); ok { + r0 = rf(_a0, _a1, _a2...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(pgx.Row) + } + } + + return r0 +} + +// DBTX_QueryRow_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'QueryRow' +type DBTX_QueryRow_Call struct { + *mock.Call +} + +// QueryRow is a helper method to define mock.On call +// - _a0 context.Context +// - _a1 string +// - _a2 ...interface{} +func (_e *DBTX_Expecter) QueryRow(_a0 interface{}, _a1 interface{}, _a2 ...interface{}) *DBTX_QueryRow_Call { + return &DBTX_QueryRow_Call{Call: _e.mock.On("QueryRow", + append([]interface{}{_a0, _a1}, _a2...)...)} +} + +func (_c *DBTX_QueryRow_Call) Run(run func(_a0 context.Context, _a1 string, _a2 ...interface{})) *DBTX_QueryRow_Call { + _c.Call.Run(func(args mock.Arguments) { + variadicArgs := make([]interface{}, len(args)-2) + for i, a := range args[2:] { + if a != nil { + variadicArgs[i] = a.(interface{}) + } + } + run(args[0].(context.Context), args[1].(string), variadicArgs...) + }) + return _c +} + +func (_c *DBTX_QueryRow_Call) Return(_a0 pgx.Row) *DBTX_QueryRow_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *DBTX_QueryRow_Call) RunAndReturn(run func(context.Context, string, ...interface{}) pgx.Row) *DBTX_QueryRow_Call { + _c.Call.Return(run) + return _c +} + +// SendBatch provides a mock function with given fields: _a0, _a1 +func (_m *DBTX) SendBatch(_a0 context.Context, _a1 *pgx.Batch) pgx.BatchResults { + ret := _m.Called(_a0, _a1) + + if len(ret) == 0 { + panic("no return value specified for SendBatch") + } + + var r0 pgx.BatchResults + if rf, ok := ret.Get(0).(func(context.Context, *pgx.Batch) pgx.BatchResults); ok { + r0 = rf(_a0, _a1) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(pgx.BatchResults) + } + } + + return r0 +} + +// DBTX_SendBatch_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SendBatch' +type DBTX_SendBatch_Call struct { + *mock.Call +} + +// SendBatch is a helper method to define mock.On call +// - _a0 context.Context +// - _a1 *pgx.Batch +func (_e *DBTX_Expecter) SendBatch(_a0 interface{}, _a1 interface{}) *DBTX_SendBatch_Call { + return &DBTX_SendBatch_Call{Call: _e.mock.On("SendBatch", _a0, _a1)} +} + +func (_c *DBTX_SendBatch_Call) Run(run func(_a0 context.Context, _a1 *pgx.Batch)) *DBTX_SendBatch_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*pgx.Batch)) + }) + return _c +} + +func (_c *DBTX_SendBatch_Call) Return(_a0 pgx.BatchResults) *DBTX_SendBatch_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *DBTX_SendBatch_Call) RunAndReturn(run func(context.Context, *pgx.Batch) pgx.BatchResults) *DBTX_SendBatch_Call { + _c.Call.Return(run) + return _c +} + +// NewDBTX creates a new instance of DBTX. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewDBTX(t interface { + mock.TestingT + Cleanup(func()) +}) *DBTX { + mock := &DBTX{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/pkg/database/mocks/Store.go b/pkg/database/mocks/Store.go index 9630a74..564dcd1 100644 --- a/pkg/database/mocks/Store.go +++ b/pkg/database/mocks/Store.go @@ -12,6 +12,8 @@ import ( pgx "github.com/jackc/pgx/v5" + time "time" + uuid "github.com/google/uuid" ) @@ -227,6 +229,64 @@ func (_c *Store_BulkSetTalkgroupTags_Call) RunAndReturn(run func(context.Context return _c } +// CleanupSweptCalls provides a mock function with given fields: ctx, rangeStart, rangeEnd +func (_m *Store) CleanupSweptCalls(ctx context.Context, rangeStart pgtype.Timestamptz, rangeEnd pgtype.Timestamptz) (int64, error) { + ret := _m.Called(ctx, rangeStart, rangeEnd) + + if len(ret) == 0 { + panic("no return value specified for CleanupSweptCalls") + } + + var r0 int64 + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, pgtype.Timestamptz, pgtype.Timestamptz) (int64, error)); ok { + return rf(ctx, rangeStart, rangeEnd) + } + if rf, ok := ret.Get(0).(func(context.Context, pgtype.Timestamptz, pgtype.Timestamptz) int64); ok { + r0 = rf(ctx, rangeStart, rangeEnd) + } else { + r0 = ret.Get(0).(int64) + } + + if rf, ok := ret.Get(1).(func(context.Context, pgtype.Timestamptz, pgtype.Timestamptz) error); ok { + r1 = rf(ctx, rangeStart, rangeEnd) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Store_CleanupSweptCalls_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CleanupSweptCalls' +type Store_CleanupSweptCalls_Call struct { + *mock.Call +} + +// CleanupSweptCalls is a helper method to define mock.On call +// - ctx context.Context +// - rangeStart pgtype.Timestamptz +// - rangeEnd pgtype.Timestamptz +func (_e *Store_Expecter) CleanupSweptCalls(ctx interface{}, rangeStart interface{}, rangeEnd interface{}) *Store_CleanupSweptCalls_Call { + return &Store_CleanupSweptCalls_Call{Call: _e.mock.On("CleanupSweptCalls", ctx, rangeStart, rangeEnd)} +} + +func (_c *Store_CleanupSweptCalls_Call) Run(run func(ctx context.Context, rangeStart pgtype.Timestamptz, rangeEnd pgtype.Timestamptz)) *Store_CleanupSweptCalls_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(pgtype.Timestamptz), args[2].(pgtype.Timestamptz)) + }) + return _c +} + +func (_c *Store_CleanupSweptCalls_Call) Return(_a0 int64, _a1 error) *Store_CleanupSweptCalls_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *Store_CleanupSweptCalls_Call) RunAndReturn(run func(context.Context, pgtype.Timestamptz, pgtype.Timestamptz) (int64, error)) *Store_CleanupSweptCalls_Call { + _c.Call.Return(run) + return _c +} + // CreateAPIKey provides a mock function with given fields: ctx, owner, expires, disabled func (_m *Store) CreateAPIKey(ctx context.Context, owner int, expires pgtype.Timestamp, disabled *bool) (database.ApiKey, error) { ret := _m.Called(ctx, owner, expires, disabled) @@ -286,6 +346,56 @@ func (_c *Store_CreateAPIKey_Call) RunAndReturn(run func(context.Context, int, p return _c } +// CreatePartition provides a mock function with given fields: ctx, parentTable, partitionName, start, end +func (_m *Store) CreatePartition(ctx context.Context, parentTable string, partitionName string, start time.Time, end time.Time) error { + ret := _m.Called(ctx, parentTable, partitionName, start, end) + + if len(ret) == 0 { + panic("no return value specified for CreatePartition") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string, string, time.Time, time.Time) error); ok { + r0 = rf(ctx, parentTable, partitionName, start, end) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Store_CreatePartition_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CreatePartition' +type Store_CreatePartition_Call struct { + *mock.Call +} + +// CreatePartition is a helper method to define mock.On call +// - ctx context.Context +// - parentTable string +// - partitionName string +// - start time.Time +// - end time.Time +func (_e *Store_Expecter) CreatePartition(ctx interface{}, parentTable interface{}, partitionName interface{}, start interface{}, end interface{}) *Store_CreatePartition_Call { + return &Store_CreatePartition_Call{Call: _e.mock.On("CreatePartition", ctx, parentTable, partitionName, start, end)} +} + +func (_c *Store_CreatePartition_Call) Run(run func(ctx context.Context, parentTable string, partitionName string, start time.Time, end time.Time)) *Store_CreatePartition_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string), args[2].(string), args[3].(time.Time), args[4].(time.Time)) + }) + return _c +} + +func (_c *Store_CreatePartition_Call) Return(_a0 error) *Store_CreatePartition_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *Store_CreatePartition_Call) RunAndReturn(run func(context.Context, string, string, time.Time, time.Time) error) *Store_CreatePartition_Call { + _c.Call.Return(run) + return _c +} + // CreateUser provides a mock function with given fields: ctx, arg func (_m *Store) CreateUser(ctx context.Context, arg database.CreateUserParams) (database.User, error) { ret := _m.Called(ctx, arg) @@ -344,19 +454,19 @@ func (_c *Store_CreateUser_Call) RunAndReturn(run func(context.Context, database } // DB provides a mock function with given fields: -func (_m *Store) DB() *database.Database { +func (_m *Store) DB() *database.Postgres { ret := _m.Called() if len(ret) == 0 { panic("no return value specified for DB") } - var r0 *database.Database - if rf, ok := ret.Get(0).(func() *database.Database); ok { + var r0 *database.Postgres + if rf, ok := ret.Get(0).(func() *database.Postgres); ok { r0 = rf() } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(*database.Database) + r0 = ret.Get(0).(*database.Postgres) } } @@ -380,12 +490,59 @@ func (_c *Store_DB_Call) Run(run func()) *Store_DB_Call { return _c } -func (_c *Store_DB_Call) Return(_a0 *database.Database) *Store_DB_Call { +func (_c *Store_DB_Call) Return(_a0 *database.Postgres) *Store_DB_Call { _c.Call.Return(_a0) return _c } -func (_c *Store_DB_Call) RunAndReturn(run func() *database.Database) *Store_DB_Call { +func (_c *Store_DB_Call) RunAndReturn(run func() *database.Postgres) *Store_DB_Call { + _c.Call.Return(run) + return _c +} + +// DBTX provides a mock function with given fields: +func (_m *Store) DBTX() database.DBTX { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for DBTX") + } + + var r0 database.DBTX + if rf, ok := ret.Get(0).(func() database.DBTX); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(database.DBTX) + } + } + + return r0 +} + +// Store_DBTX_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DBTX' +type Store_DBTX_Call struct { + *mock.Call +} + +// DBTX is a helper method to define mock.On call +func (_e *Store_Expecter) DBTX() *Store_DBTX_Call { + return &Store_DBTX_Call{Call: _e.mock.On("DBTX")} +} + +func (_c *Store_DBTX_Call) Run(run func()) *Store_DBTX_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *Store_DBTX_Call) Return(_a0 database.DBTX) *Store_DBTX_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *Store_DBTX_Call) RunAndReturn(run func() database.DBTX) *Store_DBTX_Call { _c.Call.Return(run) return _c } @@ -484,6 +641,101 @@ func (_c *Store_DeleteUser_Call) RunAndReturn(run func(context.Context, string) return _c } +// DetachPartition provides a mock function with given fields: ctx, parentTable, partitionName +func (_m *Store) DetachPartition(ctx context.Context, parentTable string, partitionName string) error { + ret := _m.Called(ctx, parentTable, partitionName) + + if len(ret) == 0 { + panic("no return value specified for DetachPartition") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string, string) error); ok { + r0 = rf(ctx, parentTable, partitionName) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Store_DetachPartition_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DetachPartition' +type Store_DetachPartition_Call struct { + *mock.Call +} + +// DetachPartition is a helper method to define mock.On call +// - ctx context.Context +// - parentTable string +// - partitionName string +func (_e *Store_Expecter) DetachPartition(ctx interface{}, parentTable interface{}, partitionName interface{}) *Store_DetachPartition_Call { + return &Store_DetachPartition_Call{Call: _e.mock.On("DetachPartition", ctx, parentTable, partitionName)} +} + +func (_c *Store_DetachPartition_Call) Run(run func(ctx context.Context, parentTable string, partitionName string)) *Store_DetachPartition_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string), args[2].(string)) + }) + return _c +} + +func (_c *Store_DetachPartition_Call) Return(_a0 error) *Store_DetachPartition_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *Store_DetachPartition_Call) RunAndReturn(run func(context.Context, string, string) error) *Store_DetachPartition_Call { + _c.Call.Return(run) + return _c +} + +// DropPartition provides a mock function with given fields: ctx, partitionName +func (_m *Store) DropPartition(ctx context.Context, partitionName string) error { + ret := _m.Called(ctx, partitionName) + + if len(ret) == 0 { + panic("no return value specified for DropPartition") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string) error); ok { + r0 = rf(ctx, partitionName) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Store_DropPartition_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DropPartition' +type Store_DropPartition_Call struct { + *mock.Call +} + +// DropPartition is a helper method to define mock.On call +// - ctx context.Context +// - partitionName string +func (_e *Store_Expecter) DropPartition(ctx interface{}, partitionName interface{}) *Store_DropPartition_Call { + return &Store_DropPartition_Call{Call: _e.mock.On("DropPartition", ctx, partitionName)} +} + +func (_c *Store_DropPartition_Call) Run(run func(ctx context.Context, partitionName string)) *Store_DropPartition_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string)) + }) + return _c +} + +func (_c *Store_DropPartition_Call) Return(_a0 error) *Store_DropPartition_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *Store_DropPartition_Call) RunAndReturn(run func(context.Context, string) error) *Store_DropPartition_Call { + _c.Call.Return(run) + return _c +} + // GetAPIKey provides a mock function with given fields: ctx, apiKey func (_m *Store) GetAPIKey(ctx context.Context, apiKey string) (database.ApiKey, error) { ret := _m.Called(ctx, apiKey) @@ -654,6 +906,66 @@ func (_c *Store_GetSystemName_Call) RunAndReturn(run func(context.Context, int) return _c } +// GetTablePartitions provides a mock function with given fields: ctx, schemaName, tableName +func (_m *Store) GetTablePartitions(ctx context.Context, schemaName string, tableName string) ([]database.PartitionResult, error) { + ret := _m.Called(ctx, schemaName, tableName) + + if len(ret) == 0 { + panic("no return value specified for GetTablePartitions") + } + + var r0 []database.PartitionResult + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string, string) ([]database.PartitionResult, error)); ok { + return rf(ctx, schemaName, tableName) + } + if rf, ok := ret.Get(0).(func(context.Context, string, string) []database.PartitionResult); ok { + r0 = rf(ctx, schemaName, tableName) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]database.PartitionResult) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, string, string) error); ok { + r1 = rf(ctx, schemaName, tableName) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Store_GetTablePartitions_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetTablePartitions' +type Store_GetTablePartitions_Call struct { + *mock.Call +} + +// GetTablePartitions is a helper method to define mock.On call +// - ctx context.Context +// - schemaName string +// - tableName string +func (_e *Store_Expecter) GetTablePartitions(ctx interface{}, schemaName interface{}, tableName interface{}) *Store_GetTablePartitions_Call { + return &Store_GetTablePartitions_Call{Call: _e.mock.On("GetTablePartitions", ctx, schemaName, tableName)} +} + +func (_c *Store_GetTablePartitions_Call) Run(run func(ctx context.Context, schemaName string, tableName string)) *Store_GetTablePartitions_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string), args[2].(string)) + }) + return _c +} + +func (_c *Store_GetTablePartitions_Call) Return(_a0 []database.PartitionResult, _a1 error) *Store_GetTablePartitions_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *Store_GetTablePartitions_Call) RunAndReturn(run func(context.Context, string, string) ([]database.PartitionResult, error)) *Store_GetTablePartitions_Call { + _c.Call.Return(run) + return _c +} + // GetTalkgroup provides a mock function with given fields: ctx, systemID, tGID func (_m *Store) GetTalkgroup(ctx context.Context, systemID int32, tGID int32) (database.GetTalkgroupRow, error) { ret := _m.Called(ctx, systemID, tGID) @@ -1958,6 +2270,64 @@ func (_c *Store_StoreTGVersion_Call) RunAndReturn(run func(context.Context, []da return _c } +// SweepCalls provides a mock function with given fields: ctx, rangeStart, rangeEnd +func (_m *Store) SweepCalls(ctx context.Context, rangeStart pgtype.Timestamptz, rangeEnd pgtype.Timestamptz) (int64, error) { + ret := _m.Called(ctx, rangeStart, rangeEnd) + + if len(ret) == 0 { + panic("no return value specified for SweepCalls") + } + + var r0 int64 + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, pgtype.Timestamptz, pgtype.Timestamptz) (int64, error)); ok { + return rf(ctx, rangeStart, rangeEnd) + } + if rf, ok := ret.Get(0).(func(context.Context, pgtype.Timestamptz, pgtype.Timestamptz) int64); ok { + r0 = rf(ctx, rangeStart, rangeEnd) + } else { + r0 = ret.Get(0).(int64) + } + + if rf, ok := ret.Get(1).(func(context.Context, pgtype.Timestamptz, pgtype.Timestamptz) error); ok { + r1 = rf(ctx, rangeStart, rangeEnd) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Store_SweepCalls_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SweepCalls' +type Store_SweepCalls_Call struct { + *mock.Call +} + +// SweepCalls is a helper method to define mock.On call +// - ctx context.Context +// - rangeStart pgtype.Timestamptz +// - rangeEnd pgtype.Timestamptz +func (_e *Store_Expecter) SweepCalls(ctx interface{}, rangeStart interface{}, rangeEnd interface{}) *Store_SweepCalls_Call { + return &Store_SweepCalls_Call{Call: _e.mock.On("SweepCalls", ctx, rangeStart, rangeEnd)} +} + +func (_c *Store_SweepCalls_Call) Run(run func(ctx context.Context, rangeStart pgtype.Timestamptz, rangeEnd pgtype.Timestamptz)) *Store_SweepCalls_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(pgtype.Timestamptz), args[2].(pgtype.Timestamptz)) + }) + return _c +} + +func (_c *Store_SweepCalls_Call) Return(_a0 int64, _a1 error) *Store_SweepCalls_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *Store_SweepCalls_Call) RunAndReturn(run func(context.Context, pgtype.Timestamptz, pgtype.Timestamptz) (int64, error)) *Store_SweepCalls_Call { + _c.Call.Return(run) + return _c +} + // UpdatePassword provides a mock function with given fields: ctx, username, password func (_m *Store) UpdatePassword(ctx context.Context, username string, password string) error { ret := _m.Called(ctx, username, password) diff --git a/pkg/database/models.go b/pkg/database/models.go index 042b84b..b76cbb8 100644 --- a/pkg/database/models.go +++ b/pkg/database/models.go @@ -55,6 +55,111 @@ type Call struct { Transcript *string `json:"transcript,omitempty"` } +type CallsP202407 struct { + ID uuid.UUID `json:"id,omitempty"` + Submitter *int32 `json:"submitter,omitempty"` + System int `json:"system,omitempty"` + Talkgroup int `json:"talkgroup,omitempty"` + CallDate pgtype.Timestamptz `json:"call_date,omitempty"` + AudioName *string `json:"audio_name,omitempty"` + AudioBlob []byte `json:"audio_blob,omitempty"` + Duration *int32 `json:"duration,omitempty"` + AudioType *string `json:"audio_type,omitempty"` + AudioUrl *string `json:"audio_url,omitempty"` + Frequency int `json:"frequency,omitempty"` + Frequencies []int `json:"frequencies,omitempty"` + Patches []int `json:"patches,omitempty"` + TGLabel *string `json:"tg_label,omitempty"` + TGAlphaTag *string `json:"tg_alpha_tag,omitempty"` + TGGroup *string `json:"tg_group,omitempty"` + Source int `json:"source,omitempty"` + Transcript *string `json:"transcript,omitempty"` +} + +type CallsP202408 struct { + ID uuid.UUID `json:"id,omitempty"` + Submitter *int32 `json:"submitter,omitempty"` + System int `json:"system,omitempty"` + Talkgroup int `json:"talkgroup,omitempty"` + CallDate pgtype.Timestamptz `json:"call_date,omitempty"` + AudioName *string `json:"audio_name,omitempty"` + AudioBlob []byte `json:"audio_blob,omitempty"` + Duration *int32 `json:"duration,omitempty"` + AudioType *string `json:"audio_type,omitempty"` + AudioUrl *string `json:"audio_url,omitempty"` + Frequency int `json:"frequency,omitempty"` + Frequencies []int `json:"frequencies,omitempty"` + Patches []int `json:"patches,omitempty"` + TGLabel *string `json:"tg_label,omitempty"` + TGAlphaTag *string `json:"tg_alpha_tag,omitempty"` + TGGroup *string `json:"tg_group,omitempty"` + Source int `json:"source,omitempty"` + Transcript *string `json:"transcript,omitempty"` +} + +type CallsP202409 struct { + ID uuid.UUID `json:"id,omitempty"` + Submitter *int32 `json:"submitter,omitempty"` + System int `json:"system,omitempty"` + Talkgroup int `json:"talkgroup,omitempty"` + CallDate pgtype.Timestamptz `json:"call_date,omitempty"` + AudioName *string `json:"audio_name,omitempty"` + AudioBlob []byte `json:"audio_blob,omitempty"` + Duration *int32 `json:"duration,omitempty"` + AudioType *string `json:"audio_type,omitempty"` + AudioUrl *string `json:"audio_url,omitempty"` + Frequency int `json:"frequency,omitempty"` + Frequencies []int `json:"frequencies,omitempty"` + Patches []int `json:"patches,omitempty"` + TGLabel *string `json:"tg_label,omitempty"` + TGAlphaTag *string `json:"tg_alpha_tag,omitempty"` + TGGroup *string `json:"tg_group,omitempty"` + Source int `json:"source,omitempty"` + Transcript *string `json:"transcript,omitempty"` +} + +type CallsP202410 struct { + ID uuid.UUID `json:"id,omitempty"` + Submitter *int32 `json:"submitter,omitempty"` + System int `json:"system,omitempty"` + Talkgroup int `json:"talkgroup,omitempty"` + CallDate pgtype.Timestamptz `json:"call_date,omitempty"` + AudioName *string `json:"audio_name,omitempty"` + AudioBlob []byte `json:"audio_blob,omitempty"` + Duration *int32 `json:"duration,omitempty"` + AudioType *string `json:"audio_type,omitempty"` + AudioUrl *string `json:"audio_url,omitempty"` + Frequency int `json:"frequency,omitempty"` + Frequencies []int `json:"frequencies,omitempty"` + Patches []int `json:"patches,omitempty"` + TGLabel *string `json:"tg_label,omitempty"` + TGAlphaTag *string `json:"tg_alpha_tag,omitempty"` + TGGroup *string `json:"tg_group,omitempty"` + Source int `json:"source,omitempty"` + Transcript *string `json:"transcript,omitempty"` +} + +type CallsP202411 struct { + ID uuid.UUID `json:"id,omitempty"` + Submitter *int32 `json:"submitter,omitempty"` + System int `json:"system,omitempty"` + Talkgroup int `json:"talkgroup,omitempty"` + CallDate pgtype.Timestamptz `json:"call_date,omitempty"` + AudioName *string `json:"audio_name,omitempty"` + AudioBlob []byte `json:"audio_blob,omitempty"` + Duration *int32 `json:"duration,omitempty"` + AudioType *string `json:"audio_type,omitempty"` + AudioUrl *string `json:"audio_url,omitempty"` + Frequency int `json:"frequency,omitempty"` + Frequencies []int `json:"frequencies,omitempty"` + Patches []int `json:"patches,omitempty"` + TGLabel *string `json:"tg_label,omitempty"` + TGAlphaTag *string `json:"tg_alpha_tag,omitempty"` + TGGroup *string `json:"tg_group,omitempty"` + Source int `json:"source,omitempty"` + Transcript *string `json:"transcript,omitempty"` +} + type Incident struct { ID uuid.UUID `json:"id,omitempty"` Name string `json:"name,omitempty"` @@ -66,9 +171,12 @@ type Incident struct { } type IncidentsCall struct { - IncidentID uuid.UUID `json:"incident_id,omitempty"` - CallID uuid.UUID `json:"call_id,omitempty"` - Notes []byte `json:"notes,omitempty"` + IncidentID uuid.UUID `json:"incident_id,omitempty"` + CallID uuid.UUID `json:"call_id,omitempty"` + CallsTblID pgtype.UUID `json:"calls_tbl_id,omitempty"` + SweptCallID pgtype.UUID `json:"swept_call_id,omitempty"` + CallDate pgtype.Timestamptz `json:"call_date,omitempty"` + Notes []byte `json:"notes,omitempty"` } type Setting struct { @@ -77,6 +185,27 @@ type Setting struct { Value []byte `json:"value,omitempty"` } +type SweptCall struct { + ID uuid.UUID `json:"id,omitempty"` + Submitter *int32 `json:"submitter,omitempty"` + System int `json:"system,omitempty"` + Talkgroup int `json:"talkgroup,omitempty"` + CallDate pgtype.Timestamptz `json:"call_date,omitempty"` + AudioName *string `json:"audio_name,omitempty"` + AudioBlob []byte `json:"audio_blob,omitempty"` + Duration *int32 `json:"duration,omitempty"` + AudioType *string `json:"audio_type,omitempty"` + AudioUrl *string `json:"audio_url,omitempty"` + Frequency int `json:"frequency,omitempty"` + Frequencies []int `json:"frequencies,omitempty"` + Patches []int `json:"patches,omitempty"` + TGLabel *string `json:"tg_label,omitempty"` + TGAlphaTag *string `json:"tg_alpha_tag,omitempty"` + TGGroup *string `json:"tg_group,omitempty"` + Source int `json:"source,omitempty"` + Transcript *string `json:"transcript,omitempty"` +} + type System struct { ID int `json:"id,omitempty"` Name string `json:"name,omitempty"` diff --git a/pkg/database/partitions.go b/pkg/database/partitions.go new file mode 100644 index 0000000..636c549 --- /dev/null +++ b/pkg/database/partitions.go @@ -0,0 +1,160 @@ +package database + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/jackc/pgx/v5" +) + +var ( + ErrLowerBoundAfterUpperBound = errors.New("lower bound after upper bound") + ErrCantDecodePartitionBounds = errors.New("cannot decode partition bounds") +) + +type PartitionResult struct { + ParentTable string + Schema string + Name string + LowerBound string + UpperBound string +} + +type partitionsQuerier interface { + GetTablePartitions(ctx context.Context, schemaName, tableName string) ([]PartitionResult, error) + CreatePartition(ctx context.Context, parentTable, partitionName string, start, end time.Time) error + DetachPartition(ctx context.Context, parentTable, partitionName string) error + DropPartition(ctx context.Context, partitionName string) error +} + +func (q *Queries) GetTablePartitions(ctx context.Context, schemaName, tableName string) (partitions []PartitionResult, err error) { + query := fmt.Sprintf(` + WITH parts as ( + SELECT + relnamespace::regnamespace as schema, + c.oid::pg_catalog.regclass AS part_name, + regexp_match(pg_get_expr(c.relpartbound, c.oid), + 'FOR VALUES FROM \(''(.*)''\) TO \(''(.*)''\)') AS bounds + FROM + pg_catalog.pg_class c JOIN pg_catalog.pg_inherits i ON (c.oid = i.inhrelid) + WHERE i.inhparent = '%s.%s'::regclass + AND c.relkind='r' + ) + SELECT + schema, + part_name as name, + '%s' as parentTable, + bounds[1]::text AS lowerBound, + bounds[2]::text AS upperBound + FROM parts + ORDER BY part_name;`, schemaName, tableName, tableName) + + rows, err := q.db.Query(ctx, query) + if err != nil { + return nil, fmt.Errorf("failed to get partitions: %w", err) + } + + partitions, err = pgx.CollectRows(rows, pgx.RowToStructByName[PartitionResult]) + if err != nil { + return nil, fmt.Errorf("failed to cast list: %w", err) + } + + return partitions, nil +} + +func (q *Queries) CreatePartition(ctx context.Context, parentTable, partitionName string, start, end time.Time) error { + const boundFmt = "2006-01-02 00:00:00Z00" + _, err := q.db.Exec(ctx, fmt.Sprintf(`CREATE TABLE %s PARTITION OF %s FOR VALUES FROM ('%s') TO ('%s');`, partitionName, parentTable, start.Format(boundFmt), end.Format(boundFmt))) + + return err +} + +func (q *Queries) DropPartition(ctx context.Context, partitionName string) error { + _, err := q.db.Exec(ctx, fmt.Sprintf(`DROP TABLE %s;`, partitionName)) + + return err +} + +func (q *Queries) DetachPartition(ctx context.Context, parentTable, partitionName string) error { + _, err := q.db.Exec(ctx, fmt.Sprintf(`ALTER TABLE %s DETACH PARTITION %s;`, parentTable, partitionName)) + return err +} + +func (partition PartitionResult) ParseBounds() (lowerBound time.Time, upperBound time.Time, err error) { + lowerBound, upperBound, err = parseBoundAsDate(partition) + if err == nil { + return lowerBound, upperBound, nil + } + + lowerBound, upperBound, err = parseBoundAsDateTime(partition) + if err == nil { + return lowerBound, upperBound, nil + } + + lowerBound, upperBound, err = parseBoundAsDateTimeWithTimezone(partition) + if err == nil { + return lowerBound, upperBound, nil + } + + if lowerBound.After(lowerBound) { + return time.Time{}, time.Time{}, ErrLowerBoundAfterUpperBound + } + + return time.Time{}, time.Time{}, ErrCantDecodePartitionBounds +} + +func parseBoundAsDate(partition PartitionResult) (lowerBound, upperBound time.Time, err error) { + lowerBound, err = time.ParseInLocation("2006-01-02", partition.LowerBound, time.UTC) + if err != nil { + return time.Time{}, time.Time{}, fmt.Errorf("can't parse lowerbound as date: %w", err) + } + + upperBound, err = time.ParseInLocation("2006-01-02", partition.UpperBound, time.UTC) + if err != nil { + return time.Time{}, time.Time{}, fmt.Errorf("can't parse upperbound as date: %w", err) + } + + return lowerBound, upperBound, nil +} + +func parseBoundAsDateTime(partition PartitionResult) (lowerBound, upperBound time.Time, err error) { + lowerBound, err = time.ParseInLocation("2006-01-02 15:04:05", partition.LowerBound, time.UTC) + if err != nil { + return time.Time{}, time.Time{}, fmt.Errorf("can't parse lowerbound as datetime: %w", err) + } + + upperBound, err = time.ParseInLocation("2006-01-02 15:04:05", partition.UpperBound, time.UTC) + if err != nil { + return time.Time{}, time.Time{}, fmt.Errorf("can't parse upperbound as datetime: %w", err) + } + + return lowerBound, upperBound, nil +} + +func parseBoundAsDateTimeWithTimezone(partition PartitionResult) (lowerBound, upperBound time.Time, err error) { + lowerBound, err = time.ParseInLocation("2006-01-02 15:04:05Z07", partition.LowerBound, time.UTC) + if err != nil { + return time.Time{}, time.Time{}, fmt.Errorf("can't parse lowerbound as datetime with timezone: %w", err) + } + + upperBound, err = time.ParseInLocation("2006-01-02 15:04:05Z07", partition.UpperBound, time.UTC) + if err != nil { + return time.Time{}, time.Time{}, fmt.Errorf("can't parse upperbound as datetime with timezone: %w", err) + } + + lowerBound = convertToDateTimeWithoutTimezone(lowerBound) + upperBound = convertToDateTimeWithoutTimezone(upperBound) + + return lowerBound, upperBound, nil +} + +func convertToDateTimeWithoutTimezone(bound time.Time) time.Time { + parsedTime, err := time.Parse("2006-01-02 15:04:05", bound.UTC().Format("2006-01-02 15:04:05")) + if err != nil { + return time.Time{} + } + + return parsedTime +} diff --git a/pkg/database/partman/intervals.go b/pkg/database/partman/intervals.go new file mode 100644 index 0000000..b80d330 --- /dev/null +++ b/pkg/database/partman/intervals.go @@ -0,0 +1,152 @@ +package partman + +import ( + "fmt" + "time" +) + +const ( + daysInWeek = 7 + monthsInQuarter = 3 +) + +func getDailyBounds(date time.Time) (lowerBound, upperBound time.Time) { + lowerBound = time.Date(date.Year(), date.Month(), date.Day(), 0, 0, 0, 0, time.UTC) + upperBound = lowerBound.AddDate(0, 0, 1) + + return +} + +func getWeeklyBounds(date time.Time) (lowerBound, upperBound time.Time) { + lowerBound = time.Date(date.Year(), date.Month(), date.Day(), 0, 0, 0, 0, time.UTC).AddDate(0, 0, -int(date.Weekday()-time.Monday)) + upperBound = lowerBound.AddDate(0, 0, daysInWeek) + + return +} + +func getMonthlyBounds(date time.Time) (lowerBound, upperBound time.Time) { + lowerBound = time.Date(date.Year(), date.Month(), 1, 0, 0, 0, 0, time.UTC) + upperBound = lowerBound.AddDate(0, 1, 0) + + return +} + +func getQuarterlyBounds(date time.Time) (lowerBound, upperBound time.Time) { + year, _, _ := date.Date() + + quarter := (int(date.Month()) - 1) / monthsInQuarter + firstMonthOfTheQuarter := time.Month(quarter*monthsInQuarter + 1) + + lowerBound = time.Date(year, firstMonthOfTheQuarter, 1, 0, 0, 0, 0, time.UTC) + upperBound = lowerBound.AddDate(0, monthsInQuarter, 0) + + return +} + +func getYearlyBounds(date time.Time) (lowerBound, upperBound time.Time) { + lowerBound = time.Date(date.Year(), 1, 1, 0, 0, 0, 0, time.UTC) + upperBound = lowerBound.AddDate(1, 0, 0) + + return +} + +func (p partition) Next(i int) partition { + var t time.Time + switch p.Interval { + case Daily: + t = p.Time.AddDate(0, 0, i) + case Weekly: + t = p.Time.AddDate(0, 0, i*daysInWeek) + case Monthly: + year, month, _ := p.Time.Date() + + t = time.Date(year, month+time.Month(i), 1, 0, 0, 0, 0, p.Time.Location()) + case Quarterly: + t = p.Time.AddDate(0, i*monthsInQuarter, 0) + case Yearly: + year, _, _ := p.Time.Date() + + t = time.Date(year+i, 1, 1, 0, 0, 0, 0, p.Time.Location()) + } + np := partition{ + ParentTable: p.ParentTable, + Name: p.Name, + Schema: p.Schema, + Interval: p.Interval, + Time: t, + } + + np.setName() + + return np +} + +func (p *partition) setName() { + t := p.Time + var suffix string + + switch p.Interval { + case Daily: + suffix = t.Format("2006_01_02") + case Weekly: + year, week := t.ISOWeek() + suffix = fmt.Sprintf("%d_w%02d", year, week) + case Monthly: + suffix = t.Format("2006_01") + case Quarterly: + year, month, _ := t.Date() + + var quarter int + + switch { + case month >= 1 && month <= 3: + quarter = 1 + case month >= 4 && month <= 6: + quarter = 2 + case month >= 7 && month <= 9: + quarter = 3 + case month >= 10 && month <= 12: + quarter = 4 + } + + suffix = fmt.Sprintf("%d_q%d", year, quarter) + case Yearly: + suffix = t.Format("2006") + default: + panic(ErrInvalidInterval(p.Interval)) + } + + p.Name = fmt.Sprintf("%s_p_%s", p.ParentTable, suffix) +} + +func (p partition) Prev(i int) partition { + var t time.Time + switch p.Interval { + case Daily: + t = p.Time.AddDate(0, 0, -i) + case Weekly: + t = p.Time.AddDate(0, 0, -i*daysInWeek) + case Monthly: + year, month, _ := p.Time.Date() + + t = time.Date(year, month-time.Month(i), 1, 0, 0, 0, 0, p.Time.Location()) + case Quarterly: + t = p.Time.AddDate(0, -i*monthsInQuarter, 0) + case Yearly: + year, _, _ := p.Time.Date() + + t = time.Date(year-i, 1, 1, 0, 0, 0, 0, p.Time.Location()) + } + + pp := partition{ + ParentTable: p.ParentTable, + Name: p.Name, + Schema: p.Schema, + Interval: p.Interval, + Time: t, + } + pp.setName() + + return pp + +} diff --git a/pkg/database/partman/partman.go b/pkg/database/partman/partman.go new file mode 100644 index 0000000..bb9608a --- /dev/null +++ b/pkg/database/partman/partman.go @@ -0,0 +1,449 @@ +package partman + +// portions lifted gratefully from github.com/qonto/postgresql-partition-manager, MIT license. + +import ( + "context" + "errors" + "fmt" + "strconv" + "strings" + "time" + + "dynatron.me/x/stillbox/internal/isoweek" + "dynatron.me/x/stillbox/pkg/config" + "dynatron.me/x/stillbox/pkg/database" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgtype" + "github.com/rs/zerolog/log" +) + +const ( + callsTable = "calls" + + preProvisionDefault = 1 +) + +var ( + ErrWrongSchema = errors.New("wrong schema name") + ErrDifferentInterval = errors.New("stored partition interval differs from configured") +) + +type PartitionErr struct { + p string + err error +} + +func (pe PartitionErr) Error() string { + r := fmt.Sprintf("bad partition '%s'", pe.p) + if pe.err != nil { + r += ": " + pe.err.Error() + } + + return r +} + +func (pe PartitionErr) Unwrap() error { + return pe.err +} + +type ParsedIntvlErr struct { + parsed, start time.Time +} + +func (pie ParsedIntvlErr) Error() string { + return fmt.Sprintf("parsed interval (%s) does not match start (%s)", pie.parsed, pie.start) +} + +func PartitionError(pname string, err ...error) PartitionErr { + if len(err) > 0 { + return PartitionErr{p: pname, err: err[0]} + } + + return PartitionErr{p: pname} +} + +type ErrInvalidInterval string + +func (in ErrInvalidInterval) Error() string { + return fmt.Sprintf("invalid interval '%s'", string(in)) +} + +type Interval string + +const ( + Unknown Interval = "" + Daily Interval = "daily" + Weekly Interval = "weekly" + Monthly Interval = "monthly" + Quarterly Interval = "quarterly" + Yearly Interval = "yearly" +) + +func (p Interval) IsValid() bool { + switch p { + case Daily, Weekly, Monthly, Quarterly, Yearly: + return true + } + + return false +} + +type PartitionManager interface { + Go(ctx context.Context) + Check(ctx context.Context, now time.Time) error +} + +type partman struct { + db database.Store + cfg config.Partition + intv Interval +} + +type partition struct { + ParentTable string + Schema string + Name string + Interval Interval + Time time.Time +} + +func New(db database.Store, cfg config.Partition) (*partman, error) { + pm := &partman{ + cfg: cfg, + db: db, + intv: Interval(cfg.Interval), + } + + if !pm.intv.IsValid() { + return nil, ErrInvalidInterval(pm.intv) + } + + return pm, nil +} + +var _ PartitionManager = (*partman)(nil) + +func (pm *partman) Go(ctx context.Context) { + tick := time.NewTicker(60 * time.Minute) + + select { + case now := <-tick.C: + err := pm.Check(ctx, now) + if err != nil { + log.Error().Err(err).Msg("partman check failed") + } + case <-ctx.Done(): + return + } +} + +func (pm *partman) newPartition(t time.Time) partition { + p := partition{ + ParentTable: callsTable, + Schema: pm.cfg.Schema, + Interval: Interval(pm.cfg.Interval), + Time: t, + } + + p.setName() + + return p +} + +func (pm *partman) retentionPartitions(cur partition) []partition { + partitions := make([]partition, 0, pm.cfg.Retain) + for i := 1; i <= pm.cfg.Retain; i++ { + prev := cur.Prev(i) + partitions = append(partitions, prev) + } + + return partitions +} + +func (pm *partman) futurePartitions(cur partition) []partition { + preProv := preProvisionDefault + if pm.cfg.PreProvision != nil { + preProv = *pm.cfg.PreProvision + } + + partitions := make([]partition, 0, preProv) + for i := 1; i <= preProv; i++ { + next := cur.Next(i) + partitions = append(partitions, next) + } + + return partitions +} + +func (pm *partman) expectedPartitions(now time.Time) []partition { + curPart := pm.newPartition(now) + + shouldExist := []partition{curPart} + if pm.cfg.Retain > -1 { + retain := pm.retentionPartitions(curPart) + shouldExist = append(shouldExist, retain...) + } + + future := pm.futurePartitions(curPart) + + shouldExist = append(shouldExist, future...) + + return shouldExist +} + +func (pm *partman) comparePartitions(existingTables, expectedTables []partition) (unexpectedTables, missingTables []partition) { + existing := make(map[string]partition) + expectedAndExists := make(map[string]bool) + + for _, t := range existingTables { + existing[t.PartitionName()] = t + } + + for _, t := range expectedTables { + if _, found := existing[t.PartitionName()]; found { + expectedAndExists[t.PartitionName()] = true + } else { + missingTables = append(missingTables, t) + } + } + + for _, t := range existingTables { + if _, found := expectedAndExists[t.PartitionName()]; !found { + // Only in existingTables and not in both + unexpectedTables = append(unexpectedTables, t) + } + } + + return unexpectedTables, missingTables +} + +func (pm *partman) existingPartitions(parts []database.PartitionResult) ([]partition, error) { + existing := make([]partition, 0, len(parts)) + for _, v := range parts { + if v.Schema != pm.cfg.Schema { + return nil, PartitionError(v.Schema+"."+v.Name, ErrWrongSchema) + } + p, err := pm.verifyPartName(v) + if err != nil { + return nil, err + } + + if p.Interval != Interval(pm.cfg.Interval) { + return nil, PartitionError(v.Schema+"."+v.Name, ErrDifferentInterval) + } + + existing = append(existing, p) + } + return existing, nil +} + +func (pm *partman) fullTableName(s string) string { + return fmt.Sprintf("%s.%s", pm.cfg.Schema, s) +} + +func (pm *partman) prunePartition(ctx context.Context, tx database.Store, p partition) error { + s, e := p.Range() + start := pgtype.Timestamptz{Time: s, Valid: true} + end := pgtype.Timestamptz{Time: e, Valid: true} + fullPartName := pm.fullTableName(p.PartitionName()) + + swept, err := tx.SweepCalls(ctx, start, end) + if err != nil { + return err + } + log.Info().Int64("rows", swept).Time("start", s).Time("end", e).Msg("swept calls") + + swept, err = tx.CleanupSweptCalls(ctx, start, end) + if err != nil { + return err + } + log.Debug().Int64("rows", swept).Time("start", s).Time("end", e).Msg("cleaned up swept calls") + + log.Info().Str("partition", fullPartName).Msg("detaching partition") + err = tx.DetachPartition(ctx, callsTable, fullPartName) + if err != nil { + return err + } + + if pm.cfg.Drop { + log.Info().Str("partition", fullPartName).Msg("dropping partition") + return tx.DropPartition(ctx, fullPartName) + } + + return nil +} + +func (pm *partman) Check(ctx context.Context, now time.Time) error { + return pm.db.InTx(ctx, func(db database.Store) error { + // by default, we want to make sure a partition exists for this and next month + // since we run this at startup, it's safe to do only that. + partitions, err := db.GetTablePartitions(ctx, pm.cfg.Schema, callsTable) + if err != nil { + return err + } + + existing, err := pm.existingPartitions(partitions) + if err != nil { + return err + } + + expected := pm.expectedPartitions(now) + + unexpected, missing := pm.comparePartitions(existing, expected) + + if pm.cfg.Retain > -1 { + for _, p := range unexpected { + err := pm.prunePartition(ctx, db, p) + if err != nil { + return err + } + } + } + + for _, p := range missing { + err := pm.createPartition(ctx, db, p) + if err != nil { + return err + } + } + + return nil + + }, pgx.TxOptions{}) +} + +func (p partition) Range() (time.Time, time.Time) { + switch p.Interval { + case Daily: + return getDailyBounds(p.Time) + case Weekly: + return getWeeklyBounds(p.Time) + case Monthly: + return getMonthlyBounds(p.Time) + case Quarterly: + return getQuarterlyBounds(p.Time) + case Yearly: + return getYearlyBounds(p.Time) + } + + panic("unknown interval!") +} + +func (p partition) PartitionName() string { + return p.Name +} + +func (pm *partman) createPartition(ctx context.Context, tx database.Store, part partition) error { + start, end := part.Range() + name := part.PartitionName() + log.Info().Str("partition", name).Time("start", start).Time("end", end).Msg("creating partition") + return tx.CreatePartition(ctx, callsTable, name, start, end) +} + +/* + * Partition scheme names: + * daily: calls_p_2024_11_28 + * weekly: calls_p_2024_w48 + * monthly: calls_p_2024_11 + * quarterly: calls_p_2024_q4 + * yearly: calls_p_2024 + */ + +func (pm *partman) verifyPartName(pr database.PartitionResult) (p partition, err error) { + pn := pr.Name + low, _, err := pr.ParseBounds() + if err != nil { + return + } + p = partition{ + ParentTable: pr.ParentTable, + Name: pr.Name, + Schema: pr.Schema, + Time: low, + } + dateAr := strings.Split(pn, "calls_p_") + if len(dateAr) != 2 { + return p, PartitionError(pn) + } + + dateAr = strings.Split(dateAr[1], "_") + switch len(dateAr) { + case 3: // daily + p.Interval = Daily + ymd := [3]int{} + for i := 0; i < 3; i++ { + r, err := strconv.Atoi(dateAr[i]) + if err != nil { + return p, PartitionError(pn, err) + } + + ymd[i] = r + } + parsed := time.Date(ymd[0], time.Month(ymd[1]), ymd[2], 0, 0, 0, 0, time.UTC) + if parsed != p.Time { + return p, PartitionError(pn, ParsedIntvlErr{parsed: parsed, start: p.Time}) + } + return p, nil + case 2: + year, err := strconv.Atoi(dateAr[0]) + if err != nil { + return p, PartitionError(pn, err) + } + if strings.HasPrefix(dateAr[1], "w") { + p.Interval = Weekly + weekNum, err := strconv.Atoi(dateAr[1][1:]) + if err != nil { + return p, PartitionError(pn, err) + } + + parsed := isoweek.StartTime(year, weekNum, time.UTC) + if parsed != p.Time { + return p, PartitionError(pn, ParsedIntvlErr{parsed: parsed, start: p.Time}) + } + return p, nil + } else if strings.HasPrefix(dateAr[1], "q") { + p.Interval = Quarterly + quarterNum, err := strconv.Atoi(dateAr[1][1:]) + if err != nil { + return p, PartitionError(pn, err) + } + if quarterNum > 4 { + return p, PartitionError(pn, errors.New("invalid quarter")) + } + firstMonthOfTheQuarter := time.Month((quarterNum-1)*monthsInQuarter + 1) + parsed := time.Date(year, firstMonthOfTheQuarter, 1, 0, 0, 0, 0, time.UTC) + if parsed != p.Time { + return p, PartitionError(pn, ParsedIntvlErr{parsed: parsed, start: p.Time}) + } + return p, nil + } + // monthly + p.Interval = Monthly + month, err := strconv.Atoi(dateAr[1]) + if err != nil { + return p, PartitionError(pn) + } + + parsed := time.Date(year, time.Month(month), 1, 0, 0, 0, 0, time.UTC) + if parsed != p.Time { + return p, PartitionError(pn, ParsedIntvlErr{parsed: parsed, start: p.Time}) + } + return p, nil + case 1: // yearly + p.Interval = Yearly + year, err := strconv.Atoi(dateAr[0]) + if err != nil { + return p, PartitionError(pn, err) + } + parsed := time.Date(year, 1, 1, 0, 0, 0, 0, time.UTC) + if parsed != p.Time { + return p, PartitionError(pn, ParsedIntvlErr{parsed: parsed, start: p.Time}) + } + + return p, nil + } + + return p, PartitionError(pn) +} diff --git a/pkg/database/partman/partman_test.go b/pkg/database/partman/partman_test.go new file mode 100644 index 0000000..90615d4 --- /dev/null +++ b/pkg/database/partman/partman_test.go @@ -0,0 +1,431 @@ +package partman_test + +import ( + "context" + "testing" + "time" + + "dynatron.me/x/stillbox/internal/common" + "dynatron.me/x/stillbox/pkg/config" + "dynatron.me/x/stillbox/pkg/database" + "dynatron.me/x/stillbox/pkg/database/mocks" + "dynatron.me/x/stillbox/pkg/database/partman" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgtype" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +var mctx = mock.Anything + +func inTx(s *mocks.Store) { + s.EXPECT().InTx(mctx, mock.AnythingOfType("func(database.Store) error"), mock.AnythingOfType("pgx.TxOptions")).RunAndReturn(func(ctx context.Context, f func(db database.Store) error, po pgx.TxOptions) error { + return f(s) + }) + +} + +type timeRange struct { + start time.Time + end time.Time +} + +type partSpec struct { + name string + timeRange +} + +func TestPartman(t *testing.T) { + ctx := context.Background() + + timeInUTC := func(s string) time.Time { + t, err := time.ParseInLocation("2006-01-02 15:04:05", s, time.UTC) + if err != nil { + panic(err) + } + + return t + } + + dateInUTC := func(s string) time.Time { + t, err := time.ParseInLocation("2006-01-02", s, time.UTC) + if err != nil { + panic(err) + } + + return t + } + + partResultWithSchema := func(schema, name, low, up string) database.PartitionResult { + return database.PartitionResult{ + ParentTable: "calls", + Schema: schema, + Name: name, + LowerBound: low, + UpperBound: up, + } + } + + partResult := func(name, low, up string) database.PartitionResult { + return partResultWithSchema("public", name, low, up) + } + + dailyTR := func(tr string) timeRange { + dtr := dateInUTC(tr) + etr := dtr.AddDate(0, 0, 1) + return timeRange{start: dtr, end: etr} + } + + tests := []struct { + name string + now time.Time + cfg config.Partition + extant []database.PartitionResult + expectCreate []partSpec + expectDrop []string + expectDetach []string + expectSweep []timeRange + expectCleanup []timeRange + expectErr error + }{ + { + name: "monthly base", + now: timeInUTC("2024-11-28 11:37:04"), + cfg: config.Partition{ + Enabled: true, + Schema: "public", + Interval: "monthly", + Retain: 2, + Drop: true, + PreProvision: common.PtrTo(2), + }, + extant: []database.PartitionResult{ + partResult("calls_p_2024_10", "2024-10-01", "2024-11-01"), + partResult("calls_p_2024_09", "2024-09-01", "2024-10-01"), + partResult("calls_p_2024_08", "2024-08-01", "2024-09-01"), + partResult("calls_p_2024_07", "2024-07-01", "2024-08-01"), + }, + expectCreate: []partSpec{ + {name: "calls_p_2024_11", timeRange: timeRange{start: dateInUTC("2024-11-01"), end: dateInUTC("2024-12-01")}}, + {name: "calls_p_2024_12", timeRange: timeRange{start: dateInUTC("2024-12-01"), end: dateInUTC("2025-01-01")}}, + {name: "calls_p_2025_01", timeRange: timeRange{start: dateInUTC("2025-01-01"), end: dateInUTC("2025-02-01")}}, + }, + expectDrop: []string{ + "public.calls_p_2024_07", + "public.calls_p_2024_08", + }, + expectSweep: []timeRange{ + {start: dateInUTC("2024-07-01"), end: dateInUTC("2024-08-01")}, + {start: dateInUTC("2024-08-01"), end: dateInUTC("2024-09-01")}, + }, + expectCleanup: []timeRange{ + {start: dateInUTC("2024-07-01"), end: dateInUTC("2024-08-01")}, + {start: dateInUTC("2024-08-01"), end: dateInUTC("2024-09-01")}, + }, + expectDetach: []string{ + "public.calls_p_2024_07", + "public.calls_p_2024_08", + }, + }, + { + name: "monthly retain all", + now: timeInUTC("2024-11-28 11:37:04"), + cfg: config.Partition{ + Enabled: true, + Schema: "public", + Interval: "monthly", + Retain: -1, + Drop: true, + PreProvision: common.PtrTo(2), + }, + extant: []database.PartitionResult{ + partResult("calls_p_2024_10", "2024-10-01", "2024-11-01"), + partResult("calls_p_2024_09", "2024-09-01", "2024-10-01"), + partResult("calls_p_2024_08", "2024-08-01", "2024-09-01"), + partResult("calls_p_2024_07", "2024-07-01", "2024-08-01"), + }, + expectCreate: []partSpec{ + {name: "calls_p_2024_11", timeRange: timeRange{start: dateInUTC("2024-11-01"), end: dateInUTC("2024-12-01")}}, + {name: "calls_p_2024_12", timeRange: timeRange{start: dateInUTC("2024-12-01"), end: dateInUTC("2025-01-01")}}, + {name: "calls_p_2025_01", timeRange: timeRange{start: dateInUTC("2025-01-01"), end: dateInUTC("2025-02-01")}}, + }, + expectDrop: []string{}, + expectSweep: []timeRange{}, + expectCleanup: []timeRange{}, + expectDetach: []string{}, + }, + + { + name: "weekly base", + now: timeInUTC("2024-11-28 11:37:04"), // week 48 + cfg: config.Partition{ + Enabled: true, + Schema: "public", + Interval: "weekly", + Retain: 2, + Drop: false, + PreProvision: common.PtrTo(2), + }, + extant: []database.PartitionResult{ + partResult("calls_p_2024_w44", "2024-10-28", "2024-11-04"), + partResult("calls_p_2024_w45", "2024-11-04", "2024-11-11"), + partResult("calls_p_2024_w46", "2024-11-11", "2024-11-18"), + // missing week 47 + }, + expectCreate: []partSpec{ + {name: "calls_p_2024_w47", timeRange: timeRange{start: dateInUTC("2024-11-18"), end: dateInUTC("2024-11-25")}}, + {name: "calls_p_2024_w48", timeRange: timeRange{start: dateInUTC("2024-11-25"), end: dateInUTC("2024-12-02")}}, + {name: "calls_p_2024_w49", timeRange: timeRange{start: dateInUTC("2024-12-02"), end: dateInUTC("2024-12-09")}}, + {name: "calls_p_2024_w50", timeRange: timeRange{start: dateInUTC("2024-12-09"), end: dateInUTC("2024-12-16")}}, + }, + expectSweep: []timeRange{ + {start: dateInUTC("2024-10-28"), end: dateInUTC("2024-11-04")}, + {start: dateInUTC("2024-11-04"), end: dateInUTC("2024-11-11")}, + }, + expectCleanup: []timeRange{ + {start: dateInUTC("2024-10-28"), end: dateInUTC("2024-11-04")}, + {start: dateInUTC("2024-11-04"), end: dateInUTC("2024-11-11")}, + }, + expectDetach: []string{ + "public.calls_p_2024_w44", + "public.calls_p_2024_w45", + }, + }, + { + name: "daily base", + now: timeInUTC("2024-12-31 11:37:04"), + cfg: config.Partition{ + Enabled: true, + Schema: "public", + Interval: "daily", + Retain: 2, + Drop: true, + PreProvision: common.PtrTo(2), + }, + extant: []database.PartitionResult{ + partResult("calls_p_2024_12_26", "2024-12-26", "2024-12-27"), + partResult("calls_p_2024_12_27", "2024-12-27", "2024-12-28"), + partResult("calls_p_2024_12_30", "2024-12-30", "2024-12-31"), + partResult("calls_p_2024_12_31", "2024-12-31", "2025-01-01"), + }, + expectCreate: []partSpec{ + {name: "calls_p_2024_12_29", timeRange: dailyTR("2024-12-29")}, + {name: "calls_p_2025_01_01", timeRange: dailyTR("2025-01-01")}, + {name: "calls_p_2025_01_02", timeRange: dailyTR("2025-01-02")}, + }, + expectDrop: []string{ + "public.calls_p_2024_12_26", + "public.calls_p_2024_12_27", + }, + expectSweep: []timeRange{ + {start: dateInUTC("2024-12-26"), end: dateInUTC("2024-12-27")}, + {start: dateInUTC("2024-12-27"), end: dateInUTC("2024-12-28")}, + }, + expectCleanup: []timeRange{ + {start: dateInUTC("2024-12-26"), end: dateInUTC("2024-12-27")}, + {start: dateInUTC("2024-12-27"), end: dateInUTC("2024-12-28")}, + }, + expectDetach: []string{ + "public.calls_p_2024_12_26", + "public.calls_p_2024_12_27", + }, + }, + { + name: "quarterly base", + now: timeInUTC("2025-07-28 11:37:04"), // q3 + cfg: config.Partition{ + Enabled: true, + Schema: "public", + Interval: "quarterly", + Retain: 2, + Drop: true, + PreProvision: common.PtrTo(2), + }, + extant: []database.PartitionResult{ + partResult("calls_p_2024_q3", "2024-07-01", "2024-10-01"), + partResult("calls_p_2024_q4", "2024-10-01", "2025-01-01"), + partResult("calls_p_2025_q1", "2025-01-01", "2024-04-01"), + partResult("calls_p_2025_q2", "2025-04-01", "2024-07-01"), + }, + expectDrop: []string{ + "public.calls_p_2024_q3", + "public.calls_p_2024_q4", + }, + expectSweep: []timeRange{ + {start: dateInUTC("2024-07-01"), end: dateInUTC("2024-10-01")}, + {start: dateInUTC("2024-10-01"), end: dateInUTC("2025-01-01")}, + }, + expectCleanup: []timeRange{ + {start: dateInUTC("2024-07-01"), end: dateInUTC("2024-10-01")}, + {start: dateInUTC("2024-10-01"), end: dateInUTC("2025-01-01")}, + }, + expectCreate: []partSpec{ + {name: "calls_p_2025_q3", timeRange: timeRange{dateInUTC("2025-07-01"), dateInUTC("2025-10-01")}}, + {name: "calls_p_2025_q4", timeRange: timeRange{dateInUTC("2025-10-01"), dateInUTC("2026-01-01")}}, + {name: "calls_p_2026_q1", timeRange: timeRange{dateInUTC("2026-01-01"), dateInUTC("2026-04-01")}}, + }, + expectDetach: []string{ + "public.calls_p_2024_q3", + "public.calls_p_2024_q4", + }, + }, + { + name: "yearly base", + now: timeInUTC("2023-04-28 11:37:04"), // q3 + cfg: config.Partition{ + Enabled: true, + Schema: "public", + Interval: "yearly", + Retain: 3, + Drop: false, + PreProvision: common.PtrTo(2), + }, + extant: []database.PartitionResult{ + partResult("calls_p_2019", "2019-01-01", "2020-01-01"), + partResult("calls_p_2020", "2020-01-01", "2021-01-01"), + partResult("calls_p_2021", "2021-01-01", "2022-01-01"), + partResult("calls_p_2022", "2022-01-01", "2023-01-01"), + partResult("calls_p_2023", "2023-01-01", "2024-01-01"), + }, + expectDrop: []string{}, + expectSweep: []timeRange{ + {start: dateInUTC("2019-01-01"), end: dateInUTC("2020-01-01")}, + }, + expectCleanup: []timeRange{ + {start: dateInUTC("2019-01-01"), end: dateInUTC("2020-01-01")}, + }, + expectCreate: []partSpec{ + {name: "calls_p_2024", timeRange: timeRange{dateInUTC("2024-01-01"), dateInUTC("2025-01-01")}}, + {name: "calls_p_2025", timeRange: timeRange{dateInUTC("2025-01-01"), dateInUTC("2026-01-01")}}, + }, + expectDetach: []string{ + "public.calls_p_2019", + }, + }, + { + name: "changed monthly to daily", + now: timeInUTC("2024-11-28 11:37:04"), + cfg: config.Partition{ + Enabled: true, + Schema: "public", + Interval: "daily", + Retain: 2, + Drop: true, + PreProvision: common.PtrTo(2), + }, + extant: []database.PartitionResult{ + partResult("calls_p_2024_10", "2024-10-01", "2024-11-01"), + partResult("calls_p_2024_09", "2024-09-01", "2024-10-01"), + partResult("calls_p_2024_08", "2024-08-01", "2024-09-01"), + partResult("calls_p_2024_07", "2024-07-01", "2024-08-01"), + }, + expectErr: partman.ErrDifferentInterval, + }, + { + name: "monthly wrong schema", + now: timeInUTC("2024-11-28 11:37:04"), + cfg: config.Partition{ + Enabled: true, + Schema: "public", + Interval: "monthly", + Retain: 2, + Drop: true, + PreProvision: common.PtrTo(2), + }, + extant: []database.PartitionResult{ + partResult("calls_p_2024_10", "2024-10-01", "2024-11-01"), + partResultWithSchema("reid", "calls_p_2024_09", "2024-09-01", "2024-10-01"), + partResult("calls_p_2024_08", "2024-08-01", "2024-09-01"), + partResult("calls_p_2024_07", "2024-07-01", "2024-08-01"), + }, + expectErr: partman.ErrWrongSchema, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + db := mocks.NewStore(t) + createdPartitions := make([]partSpec, 0, len(tc.expectCreate)) + sweptRanges := make([]timeRange, 0, len(tc.expectSweep)) + droppedPartitions := make([]string, 0, len(tc.expectDrop)) + cleanupRanges := make([]timeRange, 0, len(tc.expectCleanup)) + detachedPartitions := make([]string, 0, len(tc.expectDetach)) + sweepMap := make(map[timeRange]struct{}) + + if len(tc.expectCreate) > 0 { + db.EXPECT(). + CreatePartition( + mctx, mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("time.Time"), + mock.AnythingOfType("time.Time"), + ). + Run(func(ctx context.Context, tableName, partitionName string, start, end time.Time) { + ps := partSpec{name: partitionName, timeRange: timeRange{start: start, end: end}} + createdPartitions = append(createdPartitions, ps) + }).Return(nil) + } + + if len(tc.expectSweep) > 0 { + db.EXPECT(). + SweepCalls( + mctx, mock.AnythingOfType("pgtype.Timestamptz"), mock.AnythingOfType("pgtype.Timestamptz"), + ). + Run(func(ctx context.Context, start, end pgtype.Timestamptz) { + tr := timeRange{start: start.Time, end: end.Time} + sweepMap[tr] = struct{}{} + sweptRanges = append(sweptRanges, tr) + }).Return(30, nil) + } + + if len(tc.expectCleanup) > 0 { + db.EXPECT(). + CleanupSweptCalls( + mctx, mock.AnythingOfType("pgtype.Timestamptz"), mock.AnythingOfType("pgtype.Timestamptz"), + ).Run(func(ctx context.Context, start, end pgtype.Timestamptz) { + tr := timeRange{start: start.Time, end: end.Time} + require.Contains(t, sweepMap, tr) + + cleanupRanges = append(cleanupRanges, tr) + }).Return(30, nil) + } + + if tc.cfg.Drop && len(tc.expectDrop) > 0 { + db.EXPECT(). + DropPartition(mctx, mock.AnythingOfType("string")). + Run(func(ctx context.Context, partName string) { + droppedPartitions = append(droppedPartitions, partName) + }).Return(nil) + } + + if len(tc.expectDetach) > 0 { + db.EXPECT(). + DetachPartition( + mctx, mock.AnythingOfType("string"), mock.AnythingOfType("string")). + Run(func(ctx context.Context, parentTable, partName string) { + detachedPartitions = append(detachedPartitions, partName) + }).Return(nil) + } + + inTx(db) + + db.EXPECT().GetTablePartitions(mctx, "public", "calls").Return(tc.extant, nil) + + pm, err := partman.New(db, tc.cfg) + require.NoError(t, err) + + err = pm.Check(ctx, tc.now) + if tc.expectErr != nil { + assert.ErrorIs(t, err, tc.expectErr) + } else { + require.NoError(t, err) + + assert.ElementsMatch(t, tc.expectCreate, createdPartitions, "created partitions") + assert.ElementsMatch(t, tc.expectSweep, sweptRanges, "swept ranges") + assert.ElementsMatch(t, tc.expectDrop, droppedPartitions, "dropped partitions") + assert.ElementsMatch(t, tc.expectCleanup, cleanupRanges, "cleaned up ranges") + assert.ElementsMatch(t, tc.expectDetach, detachedPartitions, "detached partitions") + } + }) + } +} diff --git a/pkg/database/querier.go b/pkg/database/querier.go index 167df43..edeba22 100644 --- a/pkg/database/querier.go +++ b/pkg/database/querier.go @@ -15,6 +15,7 @@ type Querier interface { AddAlert(ctx context.Context, arg AddAlertParams) error AddCall(ctx context.Context, arg AddCallParams) error AddLearnedTalkgroup(ctx context.Context, arg AddLearnedTalkgroupParams) (Talkgroup, error) + CleanupSweptCalls(ctx context.Context, rangeStart pgtype.Timestamptz, rangeEnd pgtype.Timestamptz) (int64, error) CreateAPIKey(ctx context.Context, owner int, expires pgtype.Timestamp, disabled *bool) (ApiKey, error) CreateUser(ctx context.Context, arg CreateUserParams) (User, error) DeleteAPIKey(ctx context.Context, apiKey string) error @@ -42,6 +43,7 @@ type Querier interface { SetCallTranscript(ctx context.Context, iD uuid.UUID, transcript *string) error SetTalkgroupTags(ctx context.Context, tags []string, systemID int32, tGID int32) error StoreTGVersion(ctx context.Context, arg []StoreTGVersionParams) *StoreTGVersionBatchResults + SweepCalls(ctx context.Context, rangeStart pgtype.Timestamptz, rangeEnd pgtype.Timestamptz) (int64, error) UpdatePassword(ctx context.Context, username string, password string) error UpdateTalkgroup(ctx context.Context, arg UpdateTalkgroupParams) (Talkgroup, error) UpsertTalkgroup(ctx context.Context, arg []UpsertTalkgroupParams) *UpsertTalkgroupBatchResults diff --git a/pkg/notify/notify.go b/pkg/notify/notify.go index 286c5a5..6711638 100644 --- a/pkg/notify/notify.go +++ b/pkg/notify/notify.go @@ -205,7 +205,7 @@ func (n *notifier) Send(ctx context.Context, alerts []alert.Alert) error { return nil } -func New(cfg config.Notify) (Notifier, error) { +func New(cfg config.Notify) (*notifier, error) { n := new(notifier) for _, s := range cfg { diff --git a/pkg/rest/api.go b/pkg/rest/api.go index 8db4f98..2e9b5ee 100644 --- a/pkg/rest/api.go +++ b/pkg/rest/api.go @@ -20,7 +20,7 @@ type API interface { type api struct { } -func New() API { +func New() *api { s := new(api) return s diff --git a/pkg/server/server.go b/pkg/server/server.go index 57a9048..9166b23 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -10,6 +10,7 @@ import ( "dynatron.me/x/stillbox/pkg/auth" "dynatron.me/x/stillbox/pkg/config" "dynatron.me/x/stillbox/pkg/database" + "dynatron.me/x/stillbox/pkg/database/partman" "dynatron.me/x/stillbox/pkg/nexus" "dynatron.me/x/stillbox/pkg/notify" "dynatron.me/x/stillbox/pkg/rest" @@ -40,6 +41,7 @@ type Server struct { hup chan os.Signal tgs tgstore.Store rest rest.API + partman partman.PartitionManager } func New(ctx context.Context, cfg *config.Configuration) (*Server, error) { @@ -79,6 +81,18 @@ func New(ctx context.Context, cfg *config.Configuration) (*Server, error) { rest: api, } + if cfg.DB.Partition.Enabled { + srv.partman, err = partman.New(db, cfg.DB.Partition) + if err != nil { + return nil, err + } + + err = srv.partman.Check(ctx, time.Now()) + if err != nil { + return nil, err + } + } + srv.sinks.Register("database", sinks.NewDatabaseSink(srv.db, tgCache), true) srv.sinks.Register("nexus", sinks.NewNexusSink(srv.nex), false) @@ -128,6 +142,10 @@ func (s *Server) Go(ctx context.Context) error { go s.nex.Go(ctx) go s.alerter.Go(ctx) + if pm := s.partman; pm != nil { + go pm.Go(ctx) + } + var err error go func() { err = httpSrv.ListenAndServe() diff --git a/pkg/sinks/sinks.go b/pkg/sinks/sinks.go index 8259f30..4992a60 100644 --- a/pkg/sinks/sinks.go +++ b/pkg/sinks/sinks.go @@ -36,7 +36,7 @@ type sinks struct { sinks map[string]sinkInstance } -func NewSinkManager() Sinks { +func NewSinkManager() *sinks { return &sinks{ sinks: make(map[string]sinkInstance), } diff --git a/pkg/talkgroups/tgstore/store.go b/pkg/talkgroups/tgstore/store.go index 81ef67c..9c9a59b 100644 --- a/pkg/talkgroups/tgstore/store.go +++ b/pkg/talkgroups/tgstore/store.go @@ -150,7 +150,7 @@ type cache struct { } // NewCache returns a new cache Store. -func NewCache() Store { +func NewCache() *cache { tgc := &cache{ tgs: make(tgMap), systems: make(map[int32]string), diff --git a/sql/postgres/migrations/001_initial.up.sql b/sql/postgres/migrations/001_initial.up.sql index 14695d5..4f493b0 100644 --- a/sql/postgres/migrations/001_initial.up.sql +++ b/sql/postgres/migrations/001_initial.up.sql @@ -78,7 +78,33 @@ CREATE TABLE IF NOT EXISTS alerts( metadata JSONB ); -CREATE TABLE IF NOT EXISTS calls( +CREATE TABLE calls ( + id UUID, + submitter INTEGER REFERENCES api_keys(id) ON DELETE SET NULL, + system INTEGER NOT NULL, + talkgroup INTEGER NOT NULL, + call_date TIMESTAMPTZ NOT NULL, + audio_name TEXT, + audio_blob BYTEA, + duration INTEGER, + audio_type TEXT, + audio_url TEXT, + frequency INTEGER NOT NULL, + frequencies INTEGER[], + patches INTEGER[], + tg_label TEXT, + tg_alpha_tag TEXT, + tg_group TEXT, + source INTEGER NOT NULL, + transcript TEXT, + PRIMARY KEY (id, call_date), + FOREIGN KEY (system, talkgroup) REFERENCES talkgroups(system_id, tgid) +) PARTITION BY RANGE (call_date); + +CREATE INDEX IF NOT EXISTS calls_transcript_idx ON calls USING GIN (to_tsvector('english', transcript)); +CREATE INDEX IF NOT EXISTS calls_call_date_tg_idx ON calls(system, talkgroup, call_date); + +CREATE TABLE swept_calls ( id UUID PRIMARY KEY, submitter INTEGER REFERENCES api_keys(id) ON DELETE SET NULL, system INTEGER NOT NULL, @@ -100,8 +126,9 @@ CREATE TABLE IF NOT EXISTS calls( FOREIGN KEY (system, talkgroup) REFERENCES talkgroups(system_id, tgid) ); -CREATE INDEX IF NOT EXISTS calls_transcript_idx ON calls USING GIN (to_tsvector('english', transcript)); -CREATE INDEX IF NOT EXISTS calls_call_date_tg_idx ON calls(system, talkgroup, call_date); +CREATE INDEX IF NOT EXISTS swept_calls_transcript_idx ON swept_calls USING GIN (to_tsvector('english', transcript)); +CREATE INDEX IF NOT EXISTS swept_calls_call_date_tg_idx ON swept_calls(system, talkgroup, call_date); + CREATE TABLE IF NOT EXISTS settings( name TEXT PRIMARY KEY, @@ -125,8 +152,14 @@ CREATE INDEX IF NOT EXISTS incidents_name_description_idx ON incidents USING GIN ); CREATE TABLE IF NOT EXISTS incidents_calls( - incident_id UUID REFERENCES incidents(id) ON UPDATE CASCADE ON DELETE CASCADE, - call_id UUID REFERENCES calls(id) ON UPDATE CASCADE, + incident_id UUID NOT NULL REFERENCES incidents(id) ON UPDATE CASCADE ON DELETE CASCADE, + call_id UUID NOT NULL, + calls_tbl_id UUID NULL, + swept_call_id UUID NULL REFERENCES swept_calls(id), + call_date TIMESTAMPTZ NULL, notes JSONB, + FOREIGN KEY (calls_tbl_id, call_date) REFERENCES calls(id, call_date), PRIMARY KEY (incident_id, call_id) ); + + diff --git a/sql/postgres/queries/calls.sql b/sql/postgres/queries/calls.sql index 5253556..1c581b1 100644 --- a/sql/postgres/queries/calls.sql +++ b/sql/postgres/queries/calls.sql @@ -56,3 +56,23 @@ VALUES -- name: GetDatabaseSize :one SELECT pg_size_pretty(pg_database_size(current_database())); + +-- name: SweepCalls :execrows +WITH to_sweep AS ( + SELECT id, submitter, system, talkgroup, calls.call_date, audio_name, audio_blob, duration, audio_type, + audio_url, frequency, frequencies, patches, tg_label, tg_alpha_tag, tg_group, source, transcript + FROM calls + JOIN incidents_calls ic ON ic.call_id = calls.id + WHERE calls.call_date >= @range_start AND calls.call_date < @range_end +) INSERT INTO swept_calls SELECT * FROM to_sweep; + +-- name: CleanupSweptCalls :execrows +WITH to_sweep AS ( + SELECT id FROM calls + JOIN incidents_calls ic ON ic.call_id = calls.id + WHERE calls.call_date >= @range_start AND calls.call_date < @range_end +) UPDATE incidents_calls + SET + swept_call_id = call_id, + calls_tbl_id = NULL + WHERE call_id IN (SELECT id FROM to_sweep); diff --git a/sql/sqlc.yaml b/sql/sqlc.yaml index c233814..0b51630 100644 --- a/sql/sqlc.yaml +++ b/sql/sqlc.yaml @@ -37,3 +37,6 @@ sql: import: "dynatron.me/x/stillbox/internal/jsontypes" type: "Metadata" nullable: true + - column: "pg_catalog.pg_tables.tablename" + go_type: string + nullable: false