diff --git a/config.sample.yaml b/config.sample.yaml index 4e66425..500cb1c 100644 --- a/config.sample.yaml +++ b/config.sample.yaml @@ -1,5 +1,18 @@ db: connect: 'postgres://postgres:password@localhost:5432/example' + partition: + # whether to enable the built-in partition manager + enabled: true + # the postgres schema containing our tables + schema: public + # daily|weekly|monthly|quarterly|yearly + interval: monthly + # number of partitions to retain, -1 to keep all + retain: 3 + # whether to drop or simply detach + drop: true + # number of partitions to prepare ahead + preProvision: 3 cors: allowedOrigins: - 'http://localhost:*' diff --git a/pkg/config/config.go b/pkg/config/config.go index bb2bf23..5a8d110 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -51,7 +51,6 @@ type Partition struct { Retain int `yaml:"retain"` PreProvision *int `yaml:"preProvision"` Drop bool `yaml:"detach"` - LocalTime bool `yaml:"local"` } type Logger struct { diff --git a/pkg/database/calls.sql.go b/pkg/database/calls.sql.go index daa7312..af8be5c 100644 --- a/pkg/database/calls.sql.go +++ b/pkg/database/calls.sql.go @@ -177,10 +177,11 @@ func (q *Queries) SetCallTranscript(ctx context.Context, iD uuid.UUID, transcrip const sweepCalls = `-- name: SweepCalls :execrows WITH to_sweep AS ( - SELECT id, submitter, system, talkgroup, calls.call_date, audio_name, audio_blob, duration, audio_type, audio_url, frequency, frequencies, patches, tg_label, tg_alpha_tag, tg_group, source, transcript, incident_id, call_id, calls_tbl_id, swept_call_id, ic.call_date, notes FROM calls + SELECT id, submitter, system, talkgroup, calls.call_date, audio_name, audio_blob, duration, audio_type, audio_url, frequency, frequencies, patches, tg_label, tg_alpha_tag, tg_group, source, transcript + FROM calls JOIN incidents_calls ic ON ic.call_id = calls.id WHERE calls.call_date >= $1 AND calls.call_date < $2 -) INSERT INTO swept_calls SELECT id, submitter, system, talkgroup, to_sweep.call_date, audio_name, audio_blob, duration, audio_type, audio_url, frequency, frequencies, patches, tg_label, tg_alpha_tag, tg_group, source, transcript, incident_id, call_id, calls_tbl_id, swept_call_id, to_sweep.call_date, notes FROM to_sweep +) INSERT INTO swept_calls SELECT id, submitter, system, talkgroup, call_date, audio_name, audio_blob, duration, audio_type, audio_url, frequency, frequencies, patches, tg_label, tg_alpha_tag, tg_group, source, transcript FROM to_sweep ` func (q *Queries) SweepCalls(ctx context.Context, rangeStart pgtype.Timestamptz, rangeEnd pgtype.Timestamptz) (int64, error) { diff --git a/pkg/database/database.go b/pkg/database/database.go index fa3a25b..b833adb 100644 --- a/pkg/database/database.go +++ b/pkg/database/database.go @@ -94,6 +94,8 @@ func NewClient(ctx context.Context, conf config.DB) (*Postgres, error) { return nil, err } + log.Debug().Err(err).Msg("migrations done") + m.Close() pgConf, err := pgxpool.ParseConfig(conf.Connect) diff --git a/pkg/database/mocks/Store.go b/pkg/database/mocks/Store.go index 4efa7ea..564dcd1 100644 --- a/pkg/database/mocks/Store.go +++ b/pkg/database/mocks/Store.go @@ -641,17 +641,17 @@ func (_c *Store_DeleteUser_Call) RunAndReturn(run func(context.Context, string) return _c } -// DetachPartition provides a mock function with given fields: ctx, partitionName -func (_m *Store) DetachPartition(ctx context.Context, partitionName string) error { - ret := _m.Called(ctx, partitionName) +// DetachPartition provides a mock function with given fields: ctx, parentTable, partitionName +func (_m *Store) DetachPartition(ctx context.Context, parentTable string, partitionName string) error { + ret := _m.Called(ctx, parentTable, partitionName) if len(ret) == 0 { panic("no return value specified for DetachPartition") } var r0 error - if rf, ok := ret.Get(0).(func(context.Context, string) error); ok { - r0 = rf(ctx, partitionName) + if rf, ok := ret.Get(0).(func(context.Context, string, string) error); ok { + r0 = rf(ctx, parentTable, partitionName) } else { r0 = ret.Error(0) } @@ -666,14 +666,15 @@ type Store_DetachPartition_Call struct { // DetachPartition is a helper method to define mock.On call // - ctx context.Context +// - parentTable string // - partitionName string -func (_e *Store_Expecter) DetachPartition(ctx interface{}, partitionName interface{}) *Store_DetachPartition_Call { - return &Store_DetachPartition_Call{Call: _e.mock.On("DetachPartition", ctx, partitionName)} +func (_e *Store_Expecter) DetachPartition(ctx interface{}, parentTable interface{}, partitionName interface{}) *Store_DetachPartition_Call { + return &Store_DetachPartition_Call{Call: _e.mock.On("DetachPartition", ctx, parentTable, partitionName)} } -func (_c *Store_DetachPartition_Call) Run(run func(ctx context.Context, partitionName string)) *Store_DetachPartition_Call { +func (_c *Store_DetachPartition_Call) Run(run func(ctx context.Context, parentTable string, partitionName string)) *Store_DetachPartition_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(string)) + run(args[0].(context.Context), args[1].(string), args[2].(string)) }) return _c } @@ -683,7 +684,7 @@ func (_c *Store_DetachPartition_Call) Return(_a0 error) *Store_DetachPartition_C return _c } -func (_c *Store_DetachPartition_Call) RunAndReturn(run func(context.Context, string) error) *Store_DetachPartition_Call { +func (_c *Store_DetachPartition_Call) RunAndReturn(run func(context.Context, string, string) error) *Store_DetachPartition_Call { _c.Call.Return(run) return _c } diff --git a/pkg/database/partitions.go b/pkg/database/partitions.go index f93c749..636c549 100644 --- a/pkg/database/partitions.go +++ b/pkg/database/partitions.go @@ -25,7 +25,7 @@ type PartitionResult struct { type partitionsQuerier interface { GetTablePartitions(ctx context.Context, schemaName, tableName string) ([]PartitionResult, error) CreatePartition(ctx context.Context, parentTable, partitionName string, start, end time.Time) error - DetachPartition(ctx context.Context, partitionName string) error + DetachPartition(ctx context.Context, parentTable, partitionName string) error DropPartition(ctx context.Context, partitionName string) error } @@ -65,19 +65,20 @@ func (q *Queries) GetTablePartitions(ctx context.Context, schemaName, tableName } func (q *Queries) CreatePartition(ctx context.Context, parentTable, partitionName string, start, end time.Time) error { - _, err := q.db.Exec(ctx, `CREATE TABLE $1 PARTITION OF $2 FOR VALUES FROM ($2) TO ($3);`, partitionName, parentTable, start, end) + const boundFmt = "2006-01-02 00:00:00Z00" + _, err := q.db.Exec(ctx, fmt.Sprintf(`CREATE TABLE %s PARTITION OF %s FOR VALUES FROM ('%s') TO ('%s');`, partitionName, parentTable, start.Format(boundFmt), end.Format(boundFmt))) return err } func (q *Queries) DropPartition(ctx context.Context, partitionName string) error { - _, err := q.db.Exec(ctx, `DROP TABLE $1;`, partitionName) + _, err := q.db.Exec(ctx, fmt.Sprintf(`DROP TABLE %s;`, partitionName)) return err } -func (q *Queries) DetachPartition(ctx context.Context, partitionName string) error { - _, err := q.db.Exec(ctx, `ALTER TABLE $1 DETACH PARTITION;`, partitionName) +func (q *Queries) DetachPartition(ctx context.Context, parentTable, partitionName string) error { + _, err := q.db.Exec(ctx, fmt.Sprintf(`ALTER TABLE %s DETACH PARTITION %s;`, parentTable, partitionName)) return err } @@ -105,12 +106,12 @@ func (partition PartitionResult) ParseBounds() (lowerBound time.Time, upperBound } func parseBoundAsDate(partition PartitionResult) (lowerBound, upperBound time.Time, err error) { - lowerBound, err = time.Parse("2006-01-02", partition.LowerBound) + lowerBound, err = time.ParseInLocation("2006-01-02", partition.LowerBound, time.UTC) if err != nil { return time.Time{}, time.Time{}, fmt.Errorf("can't parse lowerbound as date: %w", err) } - upperBound, err = time.Parse("2006-01-02", partition.UpperBound) + upperBound, err = time.ParseInLocation("2006-01-02", partition.UpperBound, time.UTC) if err != nil { return time.Time{}, time.Time{}, fmt.Errorf("can't parse upperbound as date: %w", err) } @@ -119,12 +120,12 @@ func parseBoundAsDate(partition PartitionResult) (lowerBound, upperBound time.Ti } func parseBoundAsDateTime(partition PartitionResult) (lowerBound, upperBound time.Time, err error) { - lowerBound, err = time.Parse("2006-01-02 15:04:05", partition.LowerBound) + lowerBound, err = time.ParseInLocation("2006-01-02 15:04:05", partition.LowerBound, time.UTC) if err != nil { return time.Time{}, time.Time{}, fmt.Errorf("can't parse lowerbound as datetime: %w", err) } - upperBound, err = time.Parse("2006-01-02 15:04:05", partition.UpperBound) + upperBound, err = time.ParseInLocation("2006-01-02 15:04:05", partition.UpperBound, time.UTC) if err != nil { return time.Time{}, time.Time{}, fmt.Errorf("can't parse upperbound as datetime: %w", err) } @@ -133,12 +134,12 @@ func parseBoundAsDateTime(partition PartitionResult) (lowerBound, upperBound tim } func parseBoundAsDateTimeWithTimezone(partition PartitionResult) (lowerBound, upperBound time.Time, err error) { - lowerBound, err = time.Parse("2006-01-02 15:04:05Z07", partition.LowerBound) + lowerBound, err = time.ParseInLocation("2006-01-02 15:04:05Z07", partition.LowerBound, time.UTC) if err != nil { return time.Time{}, time.Time{}, fmt.Errorf("can't parse lowerbound as datetime with timezone: %w", err) } - upperBound, err = time.Parse("2006-01-02 15:04:05Z07", partition.UpperBound) + upperBound, err = time.ParseInLocation("2006-01-02 15:04:05Z07", partition.UpperBound, time.UTC) if err != nil { return time.Time{}, time.Time{}, fmt.Errorf("can't parse upperbound as datetime with timezone: %w", err) } diff --git a/pkg/database/partman/partman.go b/pkg/database/partman/partman.go index ec8c04c..e158a13 100644 --- a/pkg/database/partman/partman.go +++ b/pkg/database/partman/partman.go @@ -259,7 +259,7 @@ func (pm *partman) prunePartition(ctx context.Context, tx database.Store, p part log.Debug().Int64("rows", swept).Time("start", s).Time("end", e).Msg("cleaned up swept calls") log.Info().Str("partition", fullPartName).Msg("detaching partition") - err = tx.DetachPartition(ctx, fullPartName) + err = tx.DetachPartition(ctx, callsTable, fullPartName) if err != nil { return err } diff --git a/pkg/server/server.go b/pkg/server/server.go index 0011a23..9166b23 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -64,18 +64,6 @@ func New(ctx context.Context, cfg *config.Configuration) (*Server, error) { return nil, err } - pm, err := partman.New(db, cfg.DB.Partition) - if err != nil { - return nil, err - } - - err = pm.Check(ctx, time.Now()) - if err != nil { - return nil, err - } - - go pm.Go(ctx) - tgCache := tgstore.NewCache() api := rest.New() @@ -91,7 +79,18 @@ func New(ctx context.Context, cfg *config.Configuration) (*Server, error) { tgs: tgCache, sinks: sinks.NewSinkManager(), rest: api, - partman: pm, + } + + if cfg.DB.Partition.Enabled { + srv.partman, err = partman.New(db, cfg.DB.Partition) + if err != nil { + return nil, err + } + + err = srv.partman.Check(ctx, time.Now()) + if err != nil { + return nil, err + } } srv.sinks.Register("database", sinks.NewDatabaseSink(srv.db, tgCache), true) @@ -143,6 +142,10 @@ func (s *Server) Go(ctx context.Context) error { go s.nex.Go(ctx) go s.alerter.Go(ctx) + if pm := s.partman; pm != nil { + go pm.Go(ctx) + } + var err error go func() { err = httpSrv.ListenAndServe() diff --git a/sql/postgres/migrations/001_initial.up.sql b/sql/postgres/migrations/001_initial.up.sql index 14695d5..4f493b0 100644 --- a/sql/postgres/migrations/001_initial.up.sql +++ b/sql/postgres/migrations/001_initial.up.sql @@ -78,7 +78,33 @@ CREATE TABLE IF NOT EXISTS alerts( metadata JSONB ); -CREATE TABLE IF NOT EXISTS calls( +CREATE TABLE calls ( + id UUID, + submitter INTEGER REFERENCES api_keys(id) ON DELETE SET NULL, + system INTEGER NOT NULL, + talkgroup INTEGER NOT NULL, + call_date TIMESTAMPTZ NOT NULL, + audio_name TEXT, + audio_blob BYTEA, + duration INTEGER, + audio_type TEXT, + audio_url TEXT, + frequency INTEGER NOT NULL, + frequencies INTEGER[], + patches INTEGER[], + tg_label TEXT, + tg_alpha_tag TEXT, + tg_group TEXT, + source INTEGER NOT NULL, + transcript TEXT, + PRIMARY KEY (id, call_date), + FOREIGN KEY (system, talkgroup) REFERENCES talkgroups(system_id, tgid) +) PARTITION BY RANGE (call_date); + +CREATE INDEX IF NOT EXISTS calls_transcript_idx ON calls USING GIN (to_tsvector('english', transcript)); +CREATE INDEX IF NOT EXISTS calls_call_date_tg_idx ON calls(system, talkgroup, call_date); + +CREATE TABLE swept_calls ( id UUID PRIMARY KEY, submitter INTEGER REFERENCES api_keys(id) ON DELETE SET NULL, system INTEGER NOT NULL, @@ -100,8 +126,9 @@ CREATE TABLE IF NOT EXISTS calls( FOREIGN KEY (system, talkgroup) REFERENCES talkgroups(system_id, tgid) ); -CREATE INDEX IF NOT EXISTS calls_transcript_idx ON calls USING GIN (to_tsvector('english', transcript)); -CREATE INDEX IF NOT EXISTS calls_call_date_tg_idx ON calls(system, talkgroup, call_date); +CREATE INDEX IF NOT EXISTS swept_calls_transcript_idx ON swept_calls USING GIN (to_tsvector('english', transcript)); +CREATE INDEX IF NOT EXISTS swept_calls_call_date_tg_idx ON swept_calls(system, talkgroup, call_date); + CREATE TABLE IF NOT EXISTS settings( name TEXT PRIMARY KEY, @@ -125,8 +152,14 @@ CREATE INDEX IF NOT EXISTS incidents_name_description_idx ON incidents USING GIN ); CREATE TABLE IF NOT EXISTS incidents_calls( - incident_id UUID REFERENCES incidents(id) ON UPDATE CASCADE ON DELETE CASCADE, - call_id UUID REFERENCES calls(id) ON UPDATE CASCADE, + incident_id UUID NOT NULL REFERENCES incidents(id) ON UPDATE CASCADE ON DELETE CASCADE, + call_id UUID NOT NULL, + calls_tbl_id UUID NULL, + swept_call_id UUID NULL REFERENCES swept_calls(id), + call_date TIMESTAMPTZ NULL, notes JSONB, + FOREIGN KEY (calls_tbl_id, call_date) REFERENCES calls(id, call_date), PRIMARY KEY (incident_id, call_id) ); + + diff --git a/sql/postgres/migrations/002_partition.up.sql b/sql/postgres/migrations/002_partition.up.sql deleted file mode 100644 index 1c63d25..0000000 --- a/sql/postgres/migrations/002_partition.up.sql +++ /dev/null @@ -1,81 +0,0 @@ -BEGIN; - -ALTER TABLE calls RENAME TO calls_unpart; - -CREATE TABLE calls ( - id UUID, - submitter INTEGER REFERENCES api_keys(id) ON DELETE SET NULL, - system INTEGER NOT NULL, - talkgroup INTEGER NOT NULL, - call_date TIMESTAMPTZ NOT NULL, - audio_name TEXT, - audio_blob BYTEA, - duration INTEGER, - audio_type TEXT, - audio_url TEXT, - frequency INTEGER NOT NULL, - frequencies INTEGER[], - patches INTEGER[], - tg_label TEXT, - tg_alpha_tag TEXT, - tg_group TEXT, - source INTEGER NOT NULL, - transcript TEXT, - PRIMARY KEY (id, call_date), - FOREIGN KEY (system, talkgroup) REFERENCES talkgroups(system_id, tgid) -) PARTITION BY RANGE (call_date); - --- for our prod -create table calls_p_2024_07 partition of calls for values from ('2024-07-01 00:00:00-00') to ('2024-08-01 00:00:00-00'); -create table calls_p_2024_08 partition of calls for values from ('2024-08-01 00:00:00-00') to ('2024-09-01 00:00:00-00'); -create table calls_p_2024_09 partition of calls for values from ('2024-09-01 00:00:00-00') to ('2024-10-01 00:00:00-00'); -create table calls_p_2024_10 partition of calls for values from ('2024-10-01 00:00:00-00') to ('2024-11-01 00:00:00-00'); -create table calls_p_2024_11 partition of calls for values from ('2024-11-01 00:00:00-00') to ('2024-12-01 00:00:00-00'); -create table calls_p_2024_12 partition of calls for values from ('2024-12-01 00:00:00-00') to ('2025-01-01 00:00:00-00'); - - -insert into calls (id, submitter, system, talkgroup, call_date, audio_name, audio_blob, audio_url, frequency, frequencies, patches, tg_label, tg_alpha_tag, tg_group, source, transcript, duration) select id, submitter, system, talkgroup, call_date, audio_name, audio_blob, audio_url, frequency, frequencies, patches, tg_label, tg_alpha_tag, tg_group, source, transcript, duration from calls_unpart; - -drop table calls_unpart cascade; - -CREATE TABLE swept_calls ( - id UUID PRIMARY KEY, - submitter INTEGER REFERENCES api_keys(id) ON DELETE SET NULL, - system INTEGER NOT NULL, - talkgroup INTEGER NOT NULL, - call_date TIMESTAMPTZ NOT NULL, - audio_name TEXT, - audio_blob BYTEA, - duration INTEGER, - audio_type TEXT, - audio_url TEXT, - frequency INTEGER NOT NULL, - frequencies INTEGER[], - patches INTEGER[], - tg_label TEXT, - tg_alpha_tag TEXT, - tg_group TEXT, - source INTEGER NOT NULL, - transcript TEXT, - FOREIGN KEY (system, talkgroup) REFERENCES talkgroups(system_id, tgid) -); - -DROP TABLE IF EXISTS incidents_calls; -- DATA LOSS - -CREATE TABLE IF NOT EXISTS incidents_calls( - incident_id UUID NOT NULL REFERENCES incidents(id) ON UPDATE CASCADE ON DELETE CASCADE, - call_id UUID NOT NULL, - calls_tbl_id UUID NULL, - swept_call_id UUID NULL REFERENCES swept_calls(id), - call_date TIMESTAMPTZ NULL, - notes JSONB, - FOREIGN KEY (calls_tbl_id, call_date) REFERENCES calls(id, call_date), - PRIMARY KEY (incident_id, call_id) -); - - --- ALTER TABLE incidents_calls ADD COLUMN call_date TIMESTAMPTZ NOT NULL; --- ALTER TABLE incidents_calls DROP CONSTRAINT incidents_calls_call_id_fkey; --- ALTER TABLE incidents_calls ADD CONSTRAINT incidents_calls_call_id_call_date_fkey FOREIGN KEY (call_id, call_date) REFERENCES calls(id, call_date) ON UPDATE CASCADE; - -COMMIT; diff --git a/sql/postgres/queries/calls.sql b/sql/postgres/queries/calls.sql index 4720797..1c581b1 100644 --- a/sql/postgres/queries/calls.sql +++ b/sql/postgres/queries/calls.sql @@ -59,7 +59,9 @@ SELECT pg_size_pretty(pg_database_size(current_database())); -- name: SweepCalls :execrows WITH to_sweep AS ( - SELECT * FROM calls + SELECT id, submitter, system, talkgroup, calls.call_date, audio_name, audio_blob, duration, audio_type, + audio_url, frequency, frequencies, patches, tg_label, tg_alpha_tag, tg_group, source, transcript + FROM calls JOIN incidents_calls ic ON ic.call_id = calls.id WHERE calls.call_date >= @range_start AND calls.call_date < @range_end ) INSERT INTO swept_calls SELECT * FROM to_sweep;