This commit is contained in:
Daniel Ponte 2024-12-01 02:28:27 -05:00
parent 16caa0a3a4
commit 68f542d23a
11 changed files with 99 additions and 125 deletions

View file

@ -1,5 +1,18 @@
db: db:
connect: 'postgres://postgres:password@localhost:5432/example' connect: 'postgres://postgres:password@localhost:5432/example'
partition:
# whether to enable the built-in partition manager
enabled: true
# the postgres schema containing our tables
schema: public
# daily|weekly|monthly|quarterly|yearly
interval: monthly
# number of partitions to retain, -1 to keep all
retain: 3
# whether to drop or simply detach
drop: true
# number of partitions to prepare ahead
preProvision: 3
cors: cors:
allowedOrigins: allowedOrigins:
- 'http://localhost:*' - 'http://localhost:*'

View file

@ -51,7 +51,6 @@ type Partition struct {
Retain int `yaml:"retain"` Retain int `yaml:"retain"`
PreProvision *int `yaml:"preProvision"` PreProvision *int `yaml:"preProvision"`
Drop bool `yaml:"detach"` Drop bool `yaml:"detach"`
LocalTime bool `yaml:"local"`
} }
type Logger struct { type Logger struct {

View file

@ -177,10 +177,11 @@ func (q *Queries) SetCallTranscript(ctx context.Context, iD uuid.UUID, transcrip
const sweepCalls = `-- name: SweepCalls :execrows const sweepCalls = `-- name: SweepCalls :execrows
WITH to_sweep AS ( WITH to_sweep AS (
SELECT id, submitter, system, talkgroup, calls.call_date, audio_name, audio_blob, duration, audio_type, audio_url, frequency, frequencies, patches, tg_label, tg_alpha_tag, tg_group, source, transcript, incident_id, call_id, calls_tbl_id, swept_call_id, ic.call_date, notes FROM calls SELECT id, submitter, system, talkgroup, calls.call_date, audio_name, audio_blob, duration, audio_type, audio_url, frequency, frequencies, patches, tg_label, tg_alpha_tag, tg_group, source, transcript
FROM calls
JOIN incidents_calls ic ON ic.call_id = calls.id JOIN incidents_calls ic ON ic.call_id = calls.id
WHERE calls.call_date >= $1 AND calls.call_date < $2 WHERE calls.call_date >= $1 AND calls.call_date < $2
) INSERT INTO swept_calls SELECT id, submitter, system, talkgroup, to_sweep.call_date, audio_name, audio_blob, duration, audio_type, audio_url, frequency, frequencies, patches, tg_label, tg_alpha_tag, tg_group, source, transcript, incident_id, call_id, calls_tbl_id, swept_call_id, to_sweep.call_date, notes FROM to_sweep ) INSERT INTO swept_calls SELECT id, submitter, system, talkgroup, call_date, audio_name, audio_blob, duration, audio_type, audio_url, frequency, frequencies, patches, tg_label, tg_alpha_tag, tg_group, source, transcript FROM to_sweep
` `
func (q *Queries) SweepCalls(ctx context.Context, rangeStart pgtype.Timestamptz, rangeEnd pgtype.Timestamptz) (int64, error) { func (q *Queries) SweepCalls(ctx context.Context, rangeStart pgtype.Timestamptz, rangeEnd pgtype.Timestamptz) (int64, error) {

View file

@ -94,6 +94,8 @@ func NewClient(ctx context.Context, conf config.DB) (*Postgres, error) {
return nil, err return nil, err
} }
log.Debug().Err(err).Msg("migrations done")
m.Close() m.Close()
pgConf, err := pgxpool.ParseConfig(conf.Connect) pgConf, err := pgxpool.ParseConfig(conf.Connect)

View file

@ -641,17 +641,17 @@ func (_c *Store_DeleteUser_Call) RunAndReturn(run func(context.Context, string)
return _c return _c
} }
// DetachPartition provides a mock function with given fields: ctx, partitionName // DetachPartition provides a mock function with given fields: ctx, parentTable, partitionName
func (_m *Store) DetachPartition(ctx context.Context, partitionName string) error { func (_m *Store) DetachPartition(ctx context.Context, parentTable string, partitionName string) error {
ret := _m.Called(ctx, partitionName) ret := _m.Called(ctx, parentTable, partitionName)
if len(ret) == 0 { if len(ret) == 0 {
panic("no return value specified for DetachPartition") panic("no return value specified for DetachPartition")
} }
var r0 error var r0 error
if rf, ok := ret.Get(0).(func(context.Context, string) error); ok { if rf, ok := ret.Get(0).(func(context.Context, string, string) error); ok {
r0 = rf(ctx, partitionName) r0 = rf(ctx, parentTable, partitionName)
} else { } else {
r0 = ret.Error(0) r0 = ret.Error(0)
} }
@ -666,14 +666,15 @@ type Store_DetachPartition_Call struct {
// DetachPartition is a helper method to define mock.On call // DetachPartition is a helper method to define mock.On call
// - ctx context.Context // - ctx context.Context
// - parentTable string
// - partitionName string // - partitionName string
func (_e *Store_Expecter) DetachPartition(ctx interface{}, partitionName interface{}) *Store_DetachPartition_Call { func (_e *Store_Expecter) DetachPartition(ctx interface{}, parentTable interface{}, partitionName interface{}) *Store_DetachPartition_Call {
return &Store_DetachPartition_Call{Call: _e.mock.On("DetachPartition", ctx, partitionName)} return &Store_DetachPartition_Call{Call: _e.mock.On("DetachPartition", ctx, parentTable, partitionName)}
} }
func (_c *Store_DetachPartition_Call) Run(run func(ctx context.Context, partitionName string)) *Store_DetachPartition_Call { func (_c *Store_DetachPartition_Call) Run(run func(ctx context.Context, parentTable string, partitionName string)) *Store_DetachPartition_Call {
_c.Call.Run(func(args mock.Arguments) { _c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(string)) run(args[0].(context.Context), args[1].(string), args[2].(string))
}) })
return _c return _c
} }
@ -683,7 +684,7 @@ func (_c *Store_DetachPartition_Call) Return(_a0 error) *Store_DetachPartition_C
return _c return _c
} }
func (_c *Store_DetachPartition_Call) RunAndReturn(run func(context.Context, string) error) *Store_DetachPartition_Call { func (_c *Store_DetachPartition_Call) RunAndReturn(run func(context.Context, string, string) error) *Store_DetachPartition_Call {
_c.Call.Return(run) _c.Call.Return(run)
return _c return _c
} }

View file

@ -25,7 +25,7 @@ type PartitionResult struct {
type partitionsQuerier interface { type partitionsQuerier interface {
GetTablePartitions(ctx context.Context, schemaName, tableName string) ([]PartitionResult, error) GetTablePartitions(ctx context.Context, schemaName, tableName string) ([]PartitionResult, error)
CreatePartition(ctx context.Context, parentTable, partitionName string, start, end time.Time) error CreatePartition(ctx context.Context, parentTable, partitionName string, start, end time.Time) error
DetachPartition(ctx context.Context, partitionName string) error DetachPartition(ctx context.Context, parentTable, partitionName string) error
DropPartition(ctx context.Context, partitionName string) error DropPartition(ctx context.Context, partitionName string) error
} }
@ -65,19 +65,20 @@ func (q *Queries) GetTablePartitions(ctx context.Context, schemaName, tableName
} }
func (q *Queries) CreatePartition(ctx context.Context, parentTable, partitionName string, start, end time.Time) error { func (q *Queries) CreatePartition(ctx context.Context, parentTable, partitionName string, start, end time.Time) error {
_, err := q.db.Exec(ctx, `CREATE TABLE $1 PARTITION OF $2 FOR VALUES FROM ($2) TO ($3);`, partitionName, parentTable, start, end) const boundFmt = "2006-01-02 00:00:00Z00"
_, err := q.db.Exec(ctx, fmt.Sprintf(`CREATE TABLE %s PARTITION OF %s FOR VALUES FROM ('%s') TO ('%s');`, partitionName, parentTable, start.Format(boundFmt), end.Format(boundFmt)))
return err return err
} }
func (q *Queries) DropPartition(ctx context.Context, partitionName string) error { func (q *Queries) DropPartition(ctx context.Context, partitionName string) error {
_, err := q.db.Exec(ctx, `DROP TABLE $1;`, partitionName) _, err := q.db.Exec(ctx, fmt.Sprintf(`DROP TABLE %s;`, partitionName))
return err return err
} }
func (q *Queries) DetachPartition(ctx context.Context, partitionName string) error { func (q *Queries) DetachPartition(ctx context.Context, parentTable, partitionName string) error {
_, err := q.db.Exec(ctx, `ALTER TABLE $1 DETACH PARTITION;`, partitionName) _, err := q.db.Exec(ctx, fmt.Sprintf(`ALTER TABLE %s DETACH PARTITION %s;`, parentTable, partitionName))
return err return err
} }
@ -105,12 +106,12 @@ func (partition PartitionResult) ParseBounds() (lowerBound time.Time, upperBound
} }
func parseBoundAsDate(partition PartitionResult) (lowerBound, upperBound time.Time, err error) { func parseBoundAsDate(partition PartitionResult) (lowerBound, upperBound time.Time, err error) {
lowerBound, err = time.Parse("2006-01-02", partition.LowerBound) lowerBound, err = time.ParseInLocation("2006-01-02", partition.LowerBound, time.UTC)
if err != nil { if err != nil {
return time.Time{}, time.Time{}, fmt.Errorf("can't parse lowerbound as date: %w", err) return time.Time{}, time.Time{}, fmt.Errorf("can't parse lowerbound as date: %w", err)
} }
upperBound, err = time.Parse("2006-01-02", partition.UpperBound) upperBound, err = time.ParseInLocation("2006-01-02", partition.UpperBound, time.UTC)
if err != nil { if err != nil {
return time.Time{}, time.Time{}, fmt.Errorf("can't parse upperbound as date: %w", err) return time.Time{}, time.Time{}, fmt.Errorf("can't parse upperbound as date: %w", err)
} }
@ -119,12 +120,12 @@ func parseBoundAsDate(partition PartitionResult) (lowerBound, upperBound time.Ti
} }
func parseBoundAsDateTime(partition PartitionResult) (lowerBound, upperBound time.Time, err error) { func parseBoundAsDateTime(partition PartitionResult) (lowerBound, upperBound time.Time, err error) {
lowerBound, err = time.Parse("2006-01-02 15:04:05", partition.LowerBound) lowerBound, err = time.ParseInLocation("2006-01-02 15:04:05", partition.LowerBound, time.UTC)
if err != nil { if err != nil {
return time.Time{}, time.Time{}, fmt.Errorf("can't parse lowerbound as datetime: %w", err) return time.Time{}, time.Time{}, fmt.Errorf("can't parse lowerbound as datetime: %w", err)
} }
upperBound, err = time.Parse("2006-01-02 15:04:05", partition.UpperBound) upperBound, err = time.ParseInLocation("2006-01-02 15:04:05", partition.UpperBound, time.UTC)
if err != nil { if err != nil {
return time.Time{}, time.Time{}, fmt.Errorf("can't parse upperbound as datetime: %w", err) return time.Time{}, time.Time{}, fmt.Errorf("can't parse upperbound as datetime: %w", err)
} }
@ -133,12 +134,12 @@ func parseBoundAsDateTime(partition PartitionResult) (lowerBound, upperBound tim
} }
func parseBoundAsDateTimeWithTimezone(partition PartitionResult) (lowerBound, upperBound time.Time, err error) { func parseBoundAsDateTimeWithTimezone(partition PartitionResult) (lowerBound, upperBound time.Time, err error) {
lowerBound, err = time.Parse("2006-01-02 15:04:05Z07", partition.LowerBound) lowerBound, err = time.ParseInLocation("2006-01-02 15:04:05Z07", partition.LowerBound, time.UTC)
if err != nil { if err != nil {
return time.Time{}, time.Time{}, fmt.Errorf("can't parse lowerbound as datetime with timezone: %w", err) return time.Time{}, time.Time{}, fmt.Errorf("can't parse lowerbound as datetime with timezone: %w", err)
} }
upperBound, err = time.Parse("2006-01-02 15:04:05Z07", partition.UpperBound) upperBound, err = time.ParseInLocation("2006-01-02 15:04:05Z07", partition.UpperBound, time.UTC)
if err != nil { if err != nil {
return time.Time{}, time.Time{}, fmt.Errorf("can't parse upperbound as datetime with timezone: %w", err) return time.Time{}, time.Time{}, fmt.Errorf("can't parse upperbound as datetime with timezone: %w", err)
} }

View file

@ -259,7 +259,7 @@ func (pm *partman) prunePartition(ctx context.Context, tx database.Store, p part
log.Debug().Int64("rows", swept).Time("start", s).Time("end", e).Msg("cleaned up swept calls") log.Debug().Int64("rows", swept).Time("start", s).Time("end", e).Msg("cleaned up swept calls")
log.Info().Str("partition", fullPartName).Msg("detaching partition") log.Info().Str("partition", fullPartName).Msg("detaching partition")
err = tx.DetachPartition(ctx, fullPartName) err = tx.DetachPartition(ctx, callsTable, fullPartName)
if err != nil { if err != nil {
return err return err
} }

View file

@ -64,18 +64,6 @@ func New(ctx context.Context, cfg *config.Configuration) (*Server, error) {
return nil, err return nil, err
} }
pm, err := partman.New(db, cfg.DB.Partition)
if err != nil {
return nil, err
}
err = pm.Check(ctx, time.Now())
if err != nil {
return nil, err
}
go pm.Go(ctx)
tgCache := tgstore.NewCache() tgCache := tgstore.NewCache()
api := rest.New() api := rest.New()
@ -91,7 +79,18 @@ func New(ctx context.Context, cfg *config.Configuration) (*Server, error) {
tgs: tgCache, tgs: tgCache,
sinks: sinks.NewSinkManager(), sinks: sinks.NewSinkManager(),
rest: api, rest: api,
partman: pm, }
if cfg.DB.Partition.Enabled {
srv.partman, err = partman.New(db, cfg.DB.Partition)
if err != nil {
return nil, err
}
err = srv.partman.Check(ctx, time.Now())
if err != nil {
return nil, err
}
} }
srv.sinks.Register("database", sinks.NewDatabaseSink(srv.db, tgCache), true) srv.sinks.Register("database", sinks.NewDatabaseSink(srv.db, tgCache), true)
@ -143,6 +142,10 @@ func (s *Server) Go(ctx context.Context) error {
go s.nex.Go(ctx) go s.nex.Go(ctx)
go s.alerter.Go(ctx) go s.alerter.Go(ctx)
if pm := s.partman; pm != nil {
go pm.Go(ctx)
}
var err error var err error
go func() { go func() {
err = httpSrv.ListenAndServe() err = httpSrv.ListenAndServe()

View file

@ -78,7 +78,33 @@ CREATE TABLE IF NOT EXISTS alerts(
metadata JSONB metadata JSONB
); );
CREATE TABLE IF NOT EXISTS calls( CREATE TABLE calls (
id UUID,
submitter INTEGER REFERENCES api_keys(id) ON DELETE SET NULL,
system INTEGER NOT NULL,
talkgroup INTEGER NOT NULL,
call_date TIMESTAMPTZ NOT NULL,
audio_name TEXT,
audio_blob BYTEA,
duration INTEGER,
audio_type TEXT,
audio_url TEXT,
frequency INTEGER NOT NULL,
frequencies INTEGER[],
patches INTEGER[],
tg_label TEXT,
tg_alpha_tag TEXT,
tg_group TEXT,
source INTEGER NOT NULL,
transcript TEXT,
PRIMARY KEY (id, call_date),
FOREIGN KEY (system, talkgroup) REFERENCES talkgroups(system_id, tgid)
) PARTITION BY RANGE (call_date);
CREATE INDEX IF NOT EXISTS calls_transcript_idx ON calls USING GIN (to_tsvector('english', transcript));
CREATE INDEX IF NOT EXISTS calls_call_date_tg_idx ON calls(system, talkgroup, call_date);
CREATE TABLE swept_calls (
id UUID PRIMARY KEY, id UUID PRIMARY KEY,
submitter INTEGER REFERENCES api_keys(id) ON DELETE SET NULL, submitter INTEGER REFERENCES api_keys(id) ON DELETE SET NULL,
system INTEGER NOT NULL, system INTEGER NOT NULL,
@ -100,8 +126,9 @@ CREATE TABLE IF NOT EXISTS calls(
FOREIGN KEY (system, talkgroup) REFERENCES talkgroups(system_id, tgid) FOREIGN KEY (system, talkgroup) REFERENCES talkgroups(system_id, tgid)
); );
CREATE INDEX IF NOT EXISTS calls_transcript_idx ON calls USING GIN (to_tsvector('english', transcript)); CREATE INDEX IF NOT EXISTS swept_calls_transcript_idx ON swept_calls USING GIN (to_tsvector('english', transcript));
CREATE INDEX IF NOT EXISTS calls_call_date_tg_idx ON calls(system, talkgroup, call_date); CREATE INDEX IF NOT EXISTS swept_calls_call_date_tg_idx ON swept_calls(system, talkgroup, call_date);
CREATE TABLE IF NOT EXISTS settings( CREATE TABLE IF NOT EXISTS settings(
name TEXT PRIMARY KEY, name TEXT PRIMARY KEY,
@ -125,8 +152,14 @@ CREATE INDEX IF NOT EXISTS incidents_name_description_idx ON incidents USING GIN
); );
CREATE TABLE IF NOT EXISTS incidents_calls( CREATE TABLE IF NOT EXISTS incidents_calls(
incident_id UUID REFERENCES incidents(id) ON UPDATE CASCADE ON DELETE CASCADE, incident_id UUID NOT NULL REFERENCES incidents(id) ON UPDATE CASCADE ON DELETE CASCADE,
call_id UUID REFERENCES calls(id) ON UPDATE CASCADE, call_id UUID NOT NULL,
calls_tbl_id UUID NULL,
swept_call_id UUID NULL REFERENCES swept_calls(id),
call_date TIMESTAMPTZ NULL,
notes JSONB, notes JSONB,
FOREIGN KEY (calls_tbl_id, call_date) REFERENCES calls(id, call_date),
PRIMARY KEY (incident_id, call_id) PRIMARY KEY (incident_id, call_id)
); );

View file

@ -1,81 +0,0 @@
BEGIN;
ALTER TABLE calls RENAME TO calls_unpart;
CREATE TABLE calls (
id UUID,
submitter INTEGER REFERENCES api_keys(id) ON DELETE SET NULL,
system INTEGER NOT NULL,
talkgroup INTEGER NOT NULL,
call_date TIMESTAMPTZ NOT NULL,
audio_name TEXT,
audio_blob BYTEA,
duration INTEGER,
audio_type TEXT,
audio_url TEXT,
frequency INTEGER NOT NULL,
frequencies INTEGER[],
patches INTEGER[],
tg_label TEXT,
tg_alpha_tag TEXT,
tg_group TEXT,
source INTEGER NOT NULL,
transcript TEXT,
PRIMARY KEY (id, call_date),
FOREIGN KEY (system, talkgroup) REFERENCES talkgroups(system_id, tgid)
) PARTITION BY RANGE (call_date);
-- for our prod
create table calls_p_2024_07 partition of calls for values from ('2024-07-01 00:00:00-00') to ('2024-08-01 00:00:00-00');
create table calls_p_2024_08 partition of calls for values from ('2024-08-01 00:00:00-00') to ('2024-09-01 00:00:00-00');
create table calls_p_2024_09 partition of calls for values from ('2024-09-01 00:00:00-00') to ('2024-10-01 00:00:00-00');
create table calls_p_2024_10 partition of calls for values from ('2024-10-01 00:00:00-00') to ('2024-11-01 00:00:00-00');
create table calls_p_2024_11 partition of calls for values from ('2024-11-01 00:00:00-00') to ('2024-12-01 00:00:00-00');
create table calls_p_2024_12 partition of calls for values from ('2024-12-01 00:00:00-00') to ('2025-01-01 00:00:00-00');
insert into calls (id, submitter, system, talkgroup, call_date, audio_name, audio_blob, audio_url, frequency, frequencies, patches, tg_label, tg_alpha_tag, tg_group, source, transcript, duration) select id, submitter, system, talkgroup, call_date, audio_name, audio_blob, audio_url, frequency, frequencies, patches, tg_label, tg_alpha_tag, tg_group, source, transcript, duration from calls_unpart;
drop table calls_unpart cascade;
CREATE TABLE swept_calls (
id UUID PRIMARY KEY,
submitter INTEGER REFERENCES api_keys(id) ON DELETE SET NULL,
system INTEGER NOT NULL,
talkgroup INTEGER NOT NULL,
call_date TIMESTAMPTZ NOT NULL,
audio_name TEXT,
audio_blob BYTEA,
duration INTEGER,
audio_type TEXT,
audio_url TEXT,
frequency INTEGER NOT NULL,
frequencies INTEGER[],
patches INTEGER[],
tg_label TEXT,
tg_alpha_tag TEXT,
tg_group TEXT,
source INTEGER NOT NULL,
transcript TEXT,
FOREIGN KEY (system, talkgroup) REFERENCES talkgroups(system_id, tgid)
);
DROP TABLE IF EXISTS incidents_calls; -- DATA LOSS
CREATE TABLE IF NOT EXISTS incidents_calls(
incident_id UUID NOT NULL REFERENCES incidents(id) ON UPDATE CASCADE ON DELETE CASCADE,
call_id UUID NOT NULL,
calls_tbl_id UUID NULL,
swept_call_id UUID NULL REFERENCES swept_calls(id),
call_date TIMESTAMPTZ NULL,
notes JSONB,
FOREIGN KEY (calls_tbl_id, call_date) REFERENCES calls(id, call_date),
PRIMARY KEY (incident_id, call_id)
);
-- ALTER TABLE incidents_calls ADD COLUMN call_date TIMESTAMPTZ NOT NULL;
-- ALTER TABLE incidents_calls DROP CONSTRAINT incidents_calls_call_id_fkey;
-- ALTER TABLE incidents_calls ADD CONSTRAINT incidents_calls_call_id_call_date_fkey FOREIGN KEY (call_id, call_date) REFERENCES calls(id, call_date) ON UPDATE CASCADE;
COMMIT;

View file

@ -59,7 +59,9 @@ SELECT pg_size_pretty(pg_database_size(current_database()));
-- name: SweepCalls :execrows -- name: SweepCalls :execrows
WITH to_sweep AS ( WITH to_sweep AS (
SELECT * FROM calls SELECT id, submitter, system, talkgroup, calls.call_date, audio_name, audio_blob, duration, audio_type,
audio_url, frequency, frequencies, patches, tg_label, tg_alpha_tag, tg_group, source, transcript
FROM calls
JOIN incidents_calls ic ON ic.call_id = calls.id JOIN incidents_calls ic ON ic.call_id = calls.id
WHERE calls.call_date >= @range_start AND calls.call_date < @range_end WHERE calls.call_date >= @range_start AND calls.call_date < @range_end
) INSERT INTO swept_calls SELECT * FROM to_sweep; ) INSERT INTO swept_calls SELECT * FROM to_sweep;