Partitioning (#60)
Closes #13 Reviewed-on: #60 Co-authored-by: Daniel Ponte <amigan@gmail.com> Co-committed-by: Daniel Ponte <amigan@gmail.com>
This commit is contained in:
parent
8ecc7939f6
commit
03ebf74abe
27 changed files with 2450 additions and 29 deletions
|
@ -8,3 +8,4 @@ packages:
|
|||
config:
|
||||
interfaces:
|
||||
Store:
|
||||
DBTX:
|
||||
|
|
|
@ -1,5 +1,18 @@
|
|||
db:
|
||||
connect: 'postgres://postgres:password@localhost:5432/example'
|
||||
partition:
|
||||
# whether to enable the built-in partition manager
|
||||
enabled: true
|
||||
# the postgres schema containing our tables
|
||||
schema: public
|
||||
# daily|weekly|monthly|quarterly|yearly
|
||||
interval: monthly
|
||||
# number of partitions to retain, -1 to keep all
|
||||
retain: 3
|
||||
# whether to drop or simply detach
|
||||
drop: true
|
||||
# number of partitions to prepare ahead
|
||||
preProvision: 3
|
||||
cors:
|
||||
allowedOrigins:
|
||||
- 'http://localhost:*'
|
||||
|
|
22
internal/isoweek/LICENSE
Normal file
22
internal/isoweek/LICENSE
Normal file
|
@ -0,0 +1,22 @@
|
|||
License for github.com/snabb/isoweek:
|
||||
|
||||
Copyright © 2016-2023 Janne Snabb snabb AT epipe.com
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining
|
||||
a copy of this software and associated documentation files (the
|
||||
"Software"), to deal in the Software without restriction, including
|
||||
without limitation the rights to use, copy, modify, merge, publish,
|
||||
distribute, sublicense, and/or sell copies of the Software, and to
|
||||
permit persons to whom the Software is furnished to do so, subject to
|
||||
the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included
|
||||
in all copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
|
||||
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
|
||||
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
|
||||
IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
|
||||
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
|
||||
TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
|
||||
SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
93
internal/isoweek/isoweek.go
Normal file
93
internal/isoweek/isoweek.go
Normal file
|
@ -0,0 +1,93 @@
|
|||
// Package isoweek calculates a starting date and time of [ISO 8601] week.
|
||||
//
|
||||
// ISO 8601 standard defines the common [week number] system used in Europe
|
||||
// and many other countries. Monday is the first day of a week.
|
||||
//
|
||||
// The Go standard library [time] package has [time.Time.ISOWeek] function
|
||||
// for getting ISO 8601 week number of a given [time.Time], but there is no
|
||||
// reverse functionality for getting a date from a week number. This package
|
||||
// implements that.
|
||||
//
|
||||
// Invalid input is silently accepted. There is a separate [Validate]
|
||||
// function if week number validation is needed.
|
||||
//
|
||||
// There are also functions for working with [Julian day numbers]. Using Julian
|
||||
// day numbers is often the easiest and fastest way to do date calculations.
|
||||
//
|
||||
// This package does not work with the "traditional" week system used in
|
||||
// US/Canada/Japan/etc. (weeks starting on Sundays). However the Julian day
|
||||
// number functions may be still useful.
|
||||
//
|
||||
// [ISO 8601]: https://en.wikipedia.org/wiki/ISO_8601
|
||||
// [week number]: https://en.wikipedia.org/wiki/ISO_week_date
|
||||
// [Julian day numbers]: https://en.wikipedia.org/wiki/Julian_day
|
||||
package isoweek
|
||||
|
||||
import "time"
|
||||
|
||||
// ISOWeekday returns the ISO 8601 weekday number of given day.
|
||||
// (1 = Mon, 2 = Tue,.. 7 = Sun)
|
||||
//
|
||||
// This is different from Go's standard [time.Weekday].
|
||||
func ISOWeekday(year int, month time.Month, day int) (weekday int) {
|
||||
// Richards, E. G. (2013) pp. 592, 618
|
||||
|
||||
return DateToJulian(year, month, day)%7 + 1
|
||||
}
|
||||
|
||||
// startOffset returns the offset (in days) from the start of a year to
|
||||
// Monday of the given week. Offset may be negative.
|
||||
func startOffset(y, week int) (offset int) {
|
||||
// This is optimized version of the following:
|
||||
//
|
||||
// return week*7 - ISOWeekday(y, 1, 4) - 3
|
||||
//
|
||||
// Uses Tomohiko Sakamoto's algorithm for calculating the weekday.
|
||||
|
||||
y = y - 1
|
||||
return week*7 - (y+y/4-y/100+y/400+3)%7 - 4
|
||||
}
|
||||
|
||||
// StartTime returns the starting time (Monday 00:00) of the given
|
||||
// ISO 8601 week.
|
||||
func StartTime(wyear, week int, loc *time.Location) (start time.Time) {
|
||||
y, m, d := StartDate(wyear, week)
|
||||
return time.Date(y, m, d, 0, 0, 0, 0, loc)
|
||||
}
|
||||
|
||||
// StartDate returns the starting date (Monday) of the given ISO 8601 week.
|
||||
func StartDate(wyear, week int) (year int, month time.Month, day int) {
|
||||
return JulianToDate(
|
||||
DateToJulian(wyear, 1, 1) + startOffset(wyear, week))
|
||||
}
|
||||
|
||||
// ordinalInYear returns the ordinal (within a year) day number.
|
||||
func ordinalInYear(year int, month time.Month, day int) (dayNo int) {
|
||||
return DateToJulian(year, month, day) - DateToJulian(year, 1, 1) + 1
|
||||
}
|
||||
|
||||
// FromDate returns ISO 8601 week number of a date.
|
||||
func FromDate(year int, month time.Month, day int) (wyear, week int) {
|
||||
week = (ordinalInYear(year, month, day) - ISOWeekday(year, month, day) + 10) / 7
|
||||
if week < 1 {
|
||||
return FromDate(year-1, 12, 31) // last week of preceding year
|
||||
}
|
||||
if week == 53 &&
|
||||
DateToJulian(StartDate(year+1, 1)) <= DateToJulian(year, month, day) {
|
||||
return year + 1, 1 // first week of following year
|
||||
}
|
||||
return year, week
|
||||
}
|
||||
|
||||
// Validate checks if a week number is valid. Returns true if it is valid.
|
||||
func Validate(wyear, week int) (ok bool) {
|
||||
if week < 1 || week > 53 {
|
||||
return false
|
||||
}
|
||||
wyear2, week2 := FromDate(StartDate(wyear, week))
|
||||
|
||||
if wyear == wyear2 && week == week2 {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
96
internal/isoweek/isoweek_test.go
Normal file
96
internal/isoweek/isoweek_test.go
Normal file
|
@ -0,0 +1,96 @@
|
|||
package isoweek_test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"dynatron.me/x/stillbox/internal/isoweek"
|
||||
)
|
||||
|
||||
// TestISOWeekday tests all days from year 1 until year 4000.
|
||||
// Ensures that behaviour matches the Go standard library Weekday.
|
||||
func TestISOWeekday(test *testing.T) {
|
||||
t := time.Date(1, time.January, 1, 0, 0, 0, 0, time.UTC)
|
||||
var wd1, wd2 int
|
||||
for t.Year() < 4000 {
|
||||
wd1 = int(t.Weekday())
|
||||
wd2 = isoweek.ISOWeekday(t.Date())
|
||||
|
||||
if wd2 == 7 {
|
||||
wd2 = 0
|
||||
}
|
||||
if wd1 != wd2 {
|
||||
test.Errorf("mismatch on %s", t.Format("2006-01-02"))
|
||||
}
|
||||
t = t.AddDate(0, 0, 1)
|
||||
}
|
||||
}
|
||||
|
||||
func ExampleISOWeekday() {
|
||||
fmt.Println(isoweek.ISOWeekday(1984, 1, 1))
|
||||
// Output: 7
|
||||
}
|
||||
|
||||
// TestStartTime tests all weeks from year 1 until year 4000.
|
||||
// Ensures that behaviour matches the Go standard library ISOWeek.
|
||||
func TestStartTime(test *testing.T) {
|
||||
t := time.Date(1, time.January, 1, 0, 0, 0, 0, time.UTC)
|
||||
for t.Weekday() != time.Monday {
|
||||
t = t.AddDate(0, 0, 1)
|
||||
}
|
||||
for t.Year() < 4000 {
|
||||
wy, ww := t.ISOWeek()
|
||||
wst := isoweek.StartTime(wy, ww, time.UTC)
|
||||
if !wst.Equal(t) {
|
||||
test.Errorf("mismatch: %v != %v (wy = %d, ww = %d)",
|
||||
t, wst, wy, ww)
|
||||
}
|
||||
t = t.AddDate(0, 0, 7)
|
||||
}
|
||||
}
|
||||
|
||||
func ExampleStartTime() {
|
||||
t := isoweek.StartTime(1985, 1, time.UTC)
|
||||
fmt.Println(t)
|
||||
// Output: 1984-12-31 00:00:00 +0000 UTC
|
||||
}
|
||||
|
||||
func ExampleStartDate() {
|
||||
y, m, d := isoweek.StartDate(2000, 1)
|
||||
fmt.Println(d, m, y)
|
||||
// Output: 3 January 2000
|
||||
}
|
||||
|
||||
// TestFromDate tests all days from year 1 until year 4000.
|
||||
// Ensures that behaviour matches the Go standard library ISOWeek.
|
||||
func TestFromDate(test *testing.T) {
|
||||
t := time.Date(1, time.January, 1, 0, 0, 0, 0, time.UTC)
|
||||
for t.Year() < 4000 {
|
||||
wy, ww := t.ISOWeek()
|
||||
wy2, ww2 := isoweek.FromDate(t.Date())
|
||||
if wy != wy2 || ww != ww2 {
|
||||
test.Errorf("mismatch on %s", t.Format("2006-01-02"))
|
||||
}
|
||||
t = t.AddDate(0, 0, 1)
|
||||
}
|
||||
}
|
||||
|
||||
func ExampleFromDate() {
|
||||
fmt.Println(isoweek.FromDate(1984, 1, 1))
|
||||
// Output: 1983 52
|
||||
}
|
||||
|
||||
func ExampleValidate() {
|
||||
fmt.Println(
|
||||
isoweek.Validate(2015, 52), isoweek.Validate(2015, 53),
|
||||
isoweek.Validate(2015, 54), isoweek.Validate(2016, 0),
|
||||
isoweek.Validate(2016, 1))
|
||||
fmt.Println(
|
||||
isoweek.Validate(2016, 52), isoweek.Validate(2016, 53),
|
||||
isoweek.Validate(2016, 54), isoweek.Validate(2017, 0),
|
||||
isoweek.Validate(2017, 1))
|
||||
// Output:
|
||||
// true true false false true
|
||||
// true false false false true
|
||||
}
|
32
internal/isoweek/julian.go
Normal file
32
internal/isoweek/julian.go
Normal file
|
@ -0,0 +1,32 @@
|
|||
package isoweek
|
||||
|
||||
import "time"
|
||||
|
||||
// DateToJulian converts a date to a Julian day number.
|
||||
func DateToJulian(year int, month time.Month, day int) (jdn int) {
|
||||
// Claus Tøndering's Calendar FAQ
|
||||
// http://www.tondering.dk/claus/cal/julperiod.php#formula
|
||||
|
||||
if month < 3 {
|
||||
year = year - 1
|
||||
month = month + 12
|
||||
}
|
||||
year = year + 4800
|
||||
|
||||
return day + (153*(int(month)-3)+2)/5 + 365*year +
|
||||
year/4 - year/100 + year/400 - 32045
|
||||
}
|
||||
|
||||
// JulianToDate converts a Julian day number to a date.
|
||||
func JulianToDate(jdn int) (year int, month time.Month, day int) {
|
||||
// Richards, E. G. (2013) pp. 585–624
|
||||
|
||||
e := 4*(jdn+1401+(4*jdn+274277)/146097*3/4-38) + 3
|
||||
h := e%1461/4*5 + 2
|
||||
|
||||
day = h%153/5 + 1
|
||||
month = time.Month((h/153+2)%12 + 1)
|
||||
year = e/1461 - 4716 + (14-int(month))/12
|
||||
|
||||
return year, month, day
|
||||
}
|
52
internal/isoweek/julian_test.go
Normal file
52
internal/isoweek/julian_test.go
Normal file
|
@ -0,0 +1,52 @@
|
|||
package isoweek_test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"dynatron.me/x/stillbox/internal/isoweek"
|
||||
)
|
||||
|
||||
func TestJulianToDate(test *testing.T) {
|
||||
j := isoweek.DateToJulian(1, time.January, 1)
|
||||
|
||||
for {
|
||||
y, m, d := isoweek.JulianToDate(j)
|
||||
if y >= 4000 {
|
||||
break
|
||||
}
|
||||
if j != isoweek.DateToJulian(y, m, d) {
|
||||
test.Errorf("mismatch on %04d-%02d-%02d", y, m, d)
|
||||
}
|
||||
j++
|
||||
}
|
||||
}
|
||||
|
||||
func TestDateToJulian(test *testing.T) {
|
||||
t := time.Date(1, time.January, 1, 0, 0, 0, 0, time.UTC)
|
||||
|
||||
for t.Year() < 4000 {
|
||||
j := isoweek.DateToJulian(t.Date())
|
||||
|
||||
y, m, d := isoweek.JulianToDate(j)
|
||||
|
||||
if y != t.Year() || m != t.Month() || d != t.Day() {
|
||||
test.Errorf("mismatch on %s", t.Format("2006-01-02"))
|
||||
}
|
||||
if j+1 != isoweek.DateToJulian(y, m, d+1) {
|
||||
test.Errorf("mismatch 2 on %s", t.Format("2006-01-02"))
|
||||
}
|
||||
t = t.AddDate(0, 0, 1)
|
||||
}
|
||||
}
|
||||
|
||||
func ExampleDateToJulian() {
|
||||
fmt.Println(isoweek.DateToJulian(2006, 1, 2))
|
||||
// Output: 2453738
|
||||
}
|
||||
|
||||
func ExampleJulianToDate() {
|
||||
fmt.Println(isoweek.JulianToDate(2453738))
|
||||
// Output: 2006 January 2
|
||||
}
|
|
@ -41,6 +41,16 @@ type CORS struct {
|
|||
type DB struct {
|
||||
Connect string `yaml:"connect"`
|
||||
LogQueries bool `yaml:"logQueries"`
|
||||
Partition Partition `yaml:"partition"`
|
||||
}
|
||||
|
||||
type Partition struct {
|
||||
Enabled bool `yaml:"enabled"`
|
||||
Schema string `yaml:"schema"`
|
||||
Interval string `yaml:"interval"`
|
||||
Retain int `yaml:"retain"`
|
||||
PreProvision *int `yaml:"preProvision"`
|
||||
Drop bool `yaml:"detach"`
|
||||
}
|
||||
|
||||
type Logger struct {
|
||||
|
|
|
@ -40,10 +40,13 @@ func (c *Configuration) read() error {
|
|||
return err
|
||||
}
|
||||
|
||||
k.Load(env.Provider(common.EnvPrefix, ".", func(s string) string {
|
||||
err = k.Load(env.Provider(common.EnvPrefix, ".", func(s string) string {
|
||||
return strings.Replace(strings.ToLower(
|
||||
strings.TrimPrefix(s, common.EnvPrefix)), "_", ".", -1)
|
||||
}), nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = k.UnmarshalWithConf("", &c.Config,
|
||||
koanf.UnmarshalConf{
|
||||
|
|
|
@ -135,6 +135,26 @@ func (q *Queries) AddCall(ctx context.Context, arg AddCallParams) error {
|
|||
return err
|
||||
}
|
||||
|
||||
const cleanupSweptCalls = `-- name: CleanupSweptCalls :execrows
|
||||
WITH to_sweep AS (
|
||||
SELECT id FROM calls
|
||||
JOIN incidents_calls ic ON ic.call_id = calls.id
|
||||
WHERE calls.call_date >= $1 AND calls.call_date < $2
|
||||
) UPDATE incidents_calls
|
||||
SET
|
||||
swept_call_id = call_id,
|
||||
calls_tbl_id = NULL
|
||||
WHERE call_id IN (SELECT id FROM to_sweep)
|
||||
`
|
||||
|
||||
func (q *Queries) CleanupSweptCalls(ctx context.Context, rangeStart pgtype.Timestamptz, rangeEnd pgtype.Timestamptz) (int64, error) {
|
||||
result, err := q.db.Exec(ctx, cleanupSweptCalls, rangeStart, rangeEnd)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return result.RowsAffected(), nil
|
||||
}
|
||||
|
||||
const getDatabaseSize = `-- name: GetDatabaseSize :one
|
||||
SELECT pg_size_pretty(pg_database_size(current_database()))
|
||||
`
|
||||
|
@ -154,3 +174,20 @@ func (q *Queries) SetCallTranscript(ctx context.Context, iD uuid.UUID, transcrip
|
|||
_, err := q.db.Exec(ctx, setCallTranscript, iD, transcript)
|
||||
return err
|
||||
}
|
||||
|
||||
const sweepCalls = `-- name: SweepCalls :execrows
|
||||
WITH to_sweep AS (
|
||||
SELECT id, submitter, system, talkgroup, calls.call_date, audio_name, audio_blob, duration, audio_type, audio_url, frequency, frequencies, patches, tg_label, tg_alpha_tag, tg_group, source, transcript
|
||||
FROM calls
|
||||
JOIN incidents_calls ic ON ic.call_id = calls.id
|
||||
WHERE calls.call_date >= $1 AND calls.call_date < $2
|
||||
) INSERT INTO swept_calls SELECT id, submitter, system, talkgroup, call_date, audio_name, audio_blob, duration, audio_type, audio_url, frequency, frequencies, patches, tg_label, tg_alpha_tag, tg_group, source, transcript FROM to_sweep
|
||||
`
|
||||
|
||||
func (q *Queries) SweepCalls(ctx context.Context, rangeStart pgtype.Timestamptz, rangeEnd pgtype.Timestamptz) (int64, error) {
|
||||
result, err := q.db.Exec(ctx, sweepCalls, rangeStart, rangeEnd)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return result.RowsAffected(), nil
|
||||
}
|
||||
|
|
|
@ -23,21 +23,27 @@ import (
|
|||
type Store interface {
|
||||
Querier
|
||||
talkgroupQuerier
|
||||
partitionsQuerier
|
||||
|
||||
DB() *Database
|
||||
DB() *Postgres
|
||||
DBTX() DBTX
|
||||
InTx(context.Context, func(Store) error, pgx.TxOptions) error
|
||||
}
|
||||
|
||||
type Database struct {
|
||||
type Postgres struct {
|
||||
*pgxpool.Pool
|
||||
*Queries
|
||||
}
|
||||
|
||||
func (db *Database) DB() *Database {
|
||||
func (q *Queries) DBTX() DBTX {
|
||||
return q.db
|
||||
}
|
||||
|
||||
func (db *Postgres) DB() *Postgres {
|
||||
return db
|
||||
}
|
||||
|
||||
func (db *Database) InTx(ctx context.Context, f func(Store) error, opts pgx.TxOptions) error {
|
||||
func (db *Postgres) InTx(ctx context.Context, f func(Store) error, opts pgx.TxOptions) error {
|
||||
tx, err := db.DB().Pool.BeginTx(ctx, opts)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Tx begin: %w", err)
|
||||
|
@ -46,7 +52,7 @@ func (db *Database) InTx(ctx context.Context, f func(Store) error, opts pgx.TxOp
|
|||
//nolint:errcheck
|
||||
defer tx.Rollback(ctx)
|
||||
|
||||
dbtx := &Database{Pool: db.Pool, Queries: db.Queries.WithTx(tx)}
|
||||
dbtx := &Postgres{Pool: db.Pool, Queries: db.Queries.WithTx(tx)}
|
||||
|
||||
err = f(dbtx)
|
||||
if err != nil {
|
||||
|
@ -68,11 +74,11 @@ func (m dbLogger) Log(ctx context.Context, level tracelog.LogLevel, msg string,
|
|||
}
|
||||
|
||||
func Close(c Store) {
|
||||
c.(*Database).Pool.Close()
|
||||
c.(*Postgres).Pool.Close()
|
||||
}
|
||||
|
||||
// NewClient creates a new DB using the provided config.
|
||||
func NewClient(ctx context.Context, conf config.DB) (Store, error) {
|
||||
func NewClient(ctx context.Context, conf config.DB) (*Postgres, error) {
|
||||
dir, err := iofs.New(sqlembed.Migrations, "postgres/migrations")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -88,6 +94,8 @@ func NewClient(ctx context.Context, conf config.DB) (Store, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
log.Debug().Err(err).Msg("migrations done")
|
||||
|
||||
m.Close()
|
||||
|
||||
pgConf, err := pgxpool.ParseConfig(conf.Connect)
|
||||
|
@ -107,7 +115,7 @@ func NewClient(ctx context.Context, conf config.DB) (Store, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
db := &Database{
|
||||
db := &Postgres{
|
||||
Pool: pool,
|
||||
Queries: New(pool),
|
||||
}
|
||||
|
|
287
pkg/database/mocks/DBTX.go
Normal file
287
pkg/database/mocks/DBTX.go
Normal file
|
@ -0,0 +1,287 @@
|
|||
// Code generated by mockery v2.47.0. DO NOT EDIT.
|
||||
|
||||
package mocks
|
||||
|
||||
import (
|
||||
context "context"
|
||||
|
||||
mock "github.com/stretchr/testify/mock"
|
||||
|
||||
pgconn "github.com/jackc/pgx/v5/pgconn"
|
||||
|
||||
pgx "github.com/jackc/pgx/v5"
|
||||
)
|
||||
|
||||
// DBTX is an autogenerated mock type for the DBTX type
|
||||
type DBTX struct {
|
||||
mock.Mock
|
||||
}
|
||||
|
||||
type DBTX_Expecter struct {
|
||||
mock *mock.Mock
|
||||
}
|
||||
|
||||
func (_m *DBTX) EXPECT() *DBTX_Expecter {
|
||||
return &DBTX_Expecter{mock: &_m.Mock}
|
||||
}
|
||||
|
||||
// Exec provides a mock function with given fields: _a0, _a1, _a2
|
||||
func (_m *DBTX) Exec(_a0 context.Context, _a1 string, _a2 ...interface{}) (pgconn.CommandTag, error) {
|
||||
var _ca []interface{}
|
||||
_ca = append(_ca, _a0, _a1)
|
||||
_ca = append(_ca, _a2...)
|
||||
ret := _m.Called(_ca...)
|
||||
|
||||
if len(ret) == 0 {
|
||||
panic("no return value specified for Exec")
|
||||
}
|
||||
|
||||
var r0 pgconn.CommandTag
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(0).(func(context.Context, string, ...interface{}) (pgconn.CommandTag, error)); ok {
|
||||
return rf(_a0, _a1, _a2...)
|
||||
}
|
||||
if rf, ok := ret.Get(0).(func(context.Context, string, ...interface{}) pgconn.CommandTag); ok {
|
||||
r0 = rf(_a0, _a1, _a2...)
|
||||
} else {
|
||||
r0 = ret.Get(0).(pgconn.CommandTag)
|
||||
}
|
||||
|
||||
if rf, ok := ret.Get(1).(func(context.Context, string, ...interface{}) error); ok {
|
||||
r1 = rf(_a0, _a1, _a2...)
|
||||
} else {
|
||||
r1 = ret.Error(1)
|
||||
}
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
// DBTX_Exec_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Exec'
|
||||
type DBTX_Exec_Call struct {
|
||||
*mock.Call
|
||||
}
|
||||
|
||||
// Exec is a helper method to define mock.On call
|
||||
// - _a0 context.Context
|
||||
// - _a1 string
|
||||
// - _a2 ...interface{}
|
||||
func (_e *DBTX_Expecter) Exec(_a0 interface{}, _a1 interface{}, _a2 ...interface{}) *DBTX_Exec_Call {
|
||||
return &DBTX_Exec_Call{Call: _e.mock.On("Exec",
|
||||
append([]interface{}{_a0, _a1}, _a2...)...)}
|
||||
}
|
||||
|
||||
func (_c *DBTX_Exec_Call) Run(run func(_a0 context.Context, _a1 string, _a2 ...interface{})) *DBTX_Exec_Call {
|
||||
_c.Call.Run(func(args mock.Arguments) {
|
||||
variadicArgs := make([]interface{}, len(args)-2)
|
||||
for i, a := range args[2:] {
|
||||
if a != nil {
|
||||
variadicArgs[i] = a.(interface{})
|
||||
}
|
||||
}
|
||||
run(args[0].(context.Context), args[1].(string), variadicArgs...)
|
||||
})
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *DBTX_Exec_Call) Return(_a0 pgconn.CommandTag, _a1 error) *DBTX_Exec_Call {
|
||||
_c.Call.Return(_a0, _a1)
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *DBTX_Exec_Call) RunAndReturn(run func(context.Context, string, ...interface{}) (pgconn.CommandTag, error)) *DBTX_Exec_Call {
|
||||
_c.Call.Return(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
// Query provides a mock function with given fields: _a0, _a1, _a2
|
||||
func (_m *DBTX) Query(_a0 context.Context, _a1 string, _a2 ...interface{}) (pgx.Rows, error) {
|
||||
var _ca []interface{}
|
||||
_ca = append(_ca, _a0, _a1)
|
||||
_ca = append(_ca, _a2...)
|
||||
ret := _m.Called(_ca...)
|
||||
|
||||
if len(ret) == 0 {
|
||||
panic("no return value specified for Query")
|
||||
}
|
||||
|
||||
var r0 pgx.Rows
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(0).(func(context.Context, string, ...interface{}) (pgx.Rows, error)); ok {
|
||||
return rf(_a0, _a1, _a2...)
|
||||
}
|
||||
if rf, ok := ret.Get(0).(func(context.Context, string, ...interface{}) pgx.Rows); ok {
|
||||
r0 = rf(_a0, _a1, _a2...)
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).(pgx.Rows)
|
||||
}
|
||||
}
|
||||
|
||||
if rf, ok := ret.Get(1).(func(context.Context, string, ...interface{}) error); ok {
|
||||
r1 = rf(_a0, _a1, _a2...)
|
||||
} else {
|
||||
r1 = ret.Error(1)
|
||||
}
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
// DBTX_Query_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Query'
|
||||
type DBTX_Query_Call struct {
|
||||
*mock.Call
|
||||
}
|
||||
|
||||
// Query is a helper method to define mock.On call
|
||||
// - _a0 context.Context
|
||||
// - _a1 string
|
||||
// - _a2 ...interface{}
|
||||
func (_e *DBTX_Expecter) Query(_a0 interface{}, _a1 interface{}, _a2 ...interface{}) *DBTX_Query_Call {
|
||||
return &DBTX_Query_Call{Call: _e.mock.On("Query",
|
||||
append([]interface{}{_a0, _a1}, _a2...)...)}
|
||||
}
|
||||
|
||||
func (_c *DBTX_Query_Call) Run(run func(_a0 context.Context, _a1 string, _a2 ...interface{})) *DBTX_Query_Call {
|
||||
_c.Call.Run(func(args mock.Arguments) {
|
||||
variadicArgs := make([]interface{}, len(args)-2)
|
||||
for i, a := range args[2:] {
|
||||
if a != nil {
|
||||
variadicArgs[i] = a.(interface{})
|
||||
}
|
||||
}
|
||||
run(args[0].(context.Context), args[1].(string), variadicArgs...)
|
||||
})
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *DBTX_Query_Call) Return(_a0 pgx.Rows, _a1 error) *DBTX_Query_Call {
|
||||
_c.Call.Return(_a0, _a1)
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *DBTX_Query_Call) RunAndReturn(run func(context.Context, string, ...interface{}) (pgx.Rows, error)) *DBTX_Query_Call {
|
||||
_c.Call.Return(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
// QueryRow provides a mock function with given fields: _a0, _a1, _a2
|
||||
func (_m *DBTX) QueryRow(_a0 context.Context, _a1 string, _a2 ...interface{}) pgx.Row {
|
||||
var _ca []interface{}
|
||||
_ca = append(_ca, _a0, _a1)
|
||||
_ca = append(_ca, _a2...)
|
||||
ret := _m.Called(_ca...)
|
||||
|
||||
if len(ret) == 0 {
|
||||
panic("no return value specified for QueryRow")
|
||||
}
|
||||
|
||||
var r0 pgx.Row
|
||||
if rf, ok := ret.Get(0).(func(context.Context, string, ...interface{}) pgx.Row); ok {
|
||||
r0 = rf(_a0, _a1, _a2...)
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).(pgx.Row)
|
||||
}
|
||||
}
|
||||
|
||||
return r0
|
||||
}
|
||||
|
||||
// DBTX_QueryRow_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'QueryRow'
|
||||
type DBTX_QueryRow_Call struct {
|
||||
*mock.Call
|
||||
}
|
||||
|
||||
// QueryRow is a helper method to define mock.On call
|
||||
// - _a0 context.Context
|
||||
// - _a1 string
|
||||
// - _a2 ...interface{}
|
||||
func (_e *DBTX_Expecter) QueryRow(_a0 interface{}, _a1 interface{}, _a2 ...interface{}) *DBTX_QueryRow_Call {
|
||||
return &DBTX_QueryRow_Call{Call: _e.mock.On("QueryRow",
|
||||
append([]interface{}{_a0, _a1}, _a2...)...)}
|
||||
}
|
||||
|
||||
func (_c *DBTX_QueryRow_Call) Run(run func(_a0 context.Context, _a1 string, _a2 ...interface{})) *DBTX_QueryRow_Call {
|
||||
_c.Call.Run(func(args mock.Arguments) {
|
||||
variadicArgs := make([]interface{}, len(args)-2)
|
||||
for i, a := range args[2:] {
|
||||
if a != nil {
|
||||
variadicArgs[i] = a.(interface{})
|
||||
}
|
||||
}
|
||||
run(args[0].(context.Context), args[1].(string), variadicArgs...)
|
||||
})
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *DBTX_QueryRow_Call) Return(_a0 pgx.Row) *DBTX_QueryRow_Call {
|
||||
_c.Call.Return(_a0)
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *DBTX_QueryRow_Call) RunAndReturn(run func(context.Context, string, ...interface{}) pgx.Row) *DBTX_QueryRow_Call {
|
||||
_c.Call.Return(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
// SendBatch provides a mock function with given fields: _a0, _a1
|
||||
func (_m *DBTX) SendBatch(_a0 context.Context, _a1 *pgx.Batch) pgx.BatchResults {
|
||||
ret := _m.Called(_a0, _a1)
|
||||
|
||||
if len(ret) == 0 {
|
||||
panic("no return value specified for SendBatch")
|
||||
}
|
||||
|
||||
var r0 pgx.BatchResults
|
||||
if rf, ok := ret.Get(0).(func(context.Context, *pgx.Batch) pgx.BatchResults); ok {
|
||||
r0 = rf(_a0, _a1)
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).(pgx.BatchResults)
|
||||
}
|
||||
}
|
||||
|
||||
return r0
|
||||
}
|
||||
|
||||
// DBTX_SendBatch_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SendBatch'
|
||||
type DBTX_SendBatch_Call struct {
|
||||
*mock.Call
|
||||
}
|
||||
|
||||
// SendBatch is a helper method to define mock.On call
|
||||
// - _a0 context.Context
|
||||
// - _a1 *pgx.Batch
|
||||
func (_e *DBTX_Expecter) SendBatch(_a0 interface{}, _a1 interface{}) *DBTX_SendBatch_Call {
|
||||
return &DBTX_SendBatch_Call{Call: _e.mock.On("SendBatch", _a0, _a1)}
|
||||
}
|
||||
|
||||
func (_c *DBTX_SendBatch_Call) Run(run func(_a0 context.Context, _a1 *pgx.Batch)) *DBTX_SendBatch_Call {
|
||||
_c.Call.Run(func(args mock.Arguments) {
|
||||
run(args[0].(context.Context), args[1].(*pgx.Batch))
|
||||
})
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *DBTX_SendBatch_Call) Return(_a0 pgx.BatchResults) *DBTX_SendBatch_Call {
|
||||
_c.Call.Return(_a0)
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *DBTX_SendBatch_Call) RunAndReturn(run func(context.Context, *pgx.Batch) pgx.BatchResults) *DBTX_SendBatch_Call {
|
||||
_c.Call.Return(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
// NewDBTX creates a new instance of DBTX. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
|
||||
// The first argument is typically a *testing.T value.
|
||||
func NewDBTX(t interface {
|
||||
mock.TestingT
|
||||
Cleanup(func())
|
||||
}) *DBTX {
|
||||
mock := &DBTX{}
|
||||
mock.Mock.Test(t)
|
||||
|
||||
t.Cleanup(func() { mock.AssertExpectations(t) })
|
||||
|
||||
return mock
|
||||
}
|
|
@ -12,6 +12,8 @@ import (
|
|||
|
||||
pgx "github.com/jackc/pgx/v5"
|
||||
|
||||
time "time"
|
||||
|
||||
uuid "github.com/google/uuid"
|
||||
)
|
||||
|
||||
|
@ -227,6 +229,64 @@ func (_c *Store_BulkSetTalkgroupTags_Call) RunAndReturn(run func(context.Context
|
|||
return _c
|
||||
}
|
||||
|
||||
// CleanupSweptCalls provides a mock function with given fields: ctx, rangeStart, rangeEnd
|
||||
func (_m *Store) CleanupSweptCalls(ctx context.Context, rangeStart pgtype.Timestamptz, rangeEnd pgtype.Timestamptz) (int64, error) {
|
||||
ret := _m.Called(ctx, rangeStart, rangeEnd)
|
||||
|
||||
if len(ret) == 0 {
|
||||
panic("no return value specified for CleanupSweptCalls")
|
||||
}
|
||||
|
||||
var r0 int64
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(0).(func(context.Context, pgtype.Timestamptz, pgtype.Timestamptz) (int64, error)); ok {
|
||||
return rf(ctx, rangeStart, rangeEnd)
|
||||
}
|
||||
if rf, ok := ret.Get(0).(func(context.Context, pgtype.Timestamptz, pgtype.Timestamptz) int64); ok {
|
||||
r0 = rf(ctx, rangeStart, rangeEnd)
|
||||
} else {
|
||||
r0 = ret.Get(0).(int64)
|
||||
}
|
||||
|
||||
if rf, ok := ret.Get(1).(func(context.Context, pgtype.Timestamptz, pgtype.Timestamptz) error); ok {
|
||||
r1 = rf(ctx, rangeStart, rangeEnd)
|
||||
} else {
|
||||
r1 = ret.Error(1)
|
||||
}
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
// Store_CleanupSweptCalls_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CleanupSweptCalls'
|
||||
type Store_CleanupSweptCalls_Call struct {
|
||||
*mock.Call
|
||||
}
|
||||
|
||||
// CleanupSweptCalls is a helper method to define mock.On call
|
||||
// - ctx context.Context
|
||||
// - rangeStart pgtype.Timestamptz
|
||||
// - rangeEnd pgtype.Timestamptz
|
||||
func (_e *Store_Expecter) CleanupSweptCalls(ctx interface{}, rangeStart interface{}, rangeEnd interface{}) *Store_CleanupSweptCalls_Call {
|
||||
return &Store_CleanupSweptCalls_Call{Call: _e.mock.On("CleanupSweptCalls", ctx, rangeStart, rangeEnd)}
|
||||
}
|
||||
|
||||
func (_c *Store_CleanupSweptCalls_Call) Run(run func(ctx context.Context, rangeStart pgtype.Timestamptz, rangeEnd pgtype.Timestamptz)) *Store_CleanupSweptCalls_Call {
|
||||
_c.Call.Run(func(args mock.Arguments) {
|
||||
run(args[0].(context.Context), args[1].(pgtype.Timestamptz), args[2].(pgtype.Timestamptz))
|
||||
})
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *Store_CleanupSweptCalls_Call) Return(_a0 int64, _a1 error) *Store_CleanupSweptCalls_Call {
|
||||
_c.Call.Return(_a0, _a1)
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *Store_CleanupSweptCalls_Call) RunAndReturn(run func(context.Context, pgtype.Timestamptz, pgtype.Timestamptz) (int64, error)) *Store_CleanupSweptCalls_Call {
|
||||
_c.Call.Return(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
// CreateAPIKey provides a mock function with given fields: ctx, owner, expires, disabled
|
||||
func (_m *Store) CreateAPIKey(ctx context.Context, owner int, expires pgtype.Timestamp, disabled *bool) (database.ApiKey, error) {
|
||||
ret := _m.Called(ctx, owner, expires, disabled)
|
||||
|
@ -286,6 +346,56 @@ func (_c *Store_CreateAPIKey_Call) RunAndReturn(run func(context.Context, int, p
|
|||
return _c
|
||||
}
|
||||
|
||||
// CreatePartition provides a mock function with given fields: ctx, parentTable, partitionName, start, end
|
||||
func (_m *Store) CreatePartition(ctx context.Context, parentTable string, partitionName string, start time.Time, end time.Time) error {
|
||||
ret := _m.Called(ctx, parentTable, partitionName, start, end)
|
||||
|
||||
if len(ret) == 0 {
|
||||
panic("no return value specified for CreatePartition")
|
||||
}
|
||||
|
||||
var r0 error
|
||||
if rf, ok := ret.Get(0).(func(context.Context, string, string, time.Time, time.Time) error); ok {
|
||||
r0 = rf(ctx, parentTable, partitionName, start, end)
|
||||
} else {
|
||||
r0 = ret.Error(0)
|
||||
}
|
||||
|
||||
return r0
|
||||
}
|
||||
|
||||
// Store_CreatePartition_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CreatePartition'
|
||||
type Store_CreatePartition_Call struct {
|
||||
*mock.Call
|
||||
}
|
||||
|
||||
// CreatePartition is a helper method to define mock.On call
|
||||
// - ctx context.Context
|
||||
// - parentTable string
|
||||
// - partitionName string
|
||||
// - start time.Time
|
||||
// - end time.Time
|
||||
func (_e *Store_Expecter) CreatePartition(ctx interface{}, parentTable interface{}, partitionName interface{}, start interface{}, end interface{}) *Store_CreatePartition_Call {
|
||||
return &Store_CreatePartition_Call{Call: _e.mock.On("CreatePartition", ctx, parentTable, partitionName, start, end)}
|
||||
}
|
||||
|
||||
func (_c *Store_CreatePartition_Call) Run(run func(ctx context.Context, parentTable string, partitionName string, start time.Time, end time.Time)) *Store_CreatePartition_Call {
|
||||
_c.Call.Run(func(args mock.Arguments) {
|
||||
run(args[0].(context.Context), args[1].(string), args[2].(string), args[3].(time.Time), args[4].(time.Time))
|
||||
})
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *Store_CreatePartition_Call) Return(_a0 error) *Store_CreatePartition_Call {
|
||||
_c.Call.Return(_a0)
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *Store_CreatePartition_Call) RunAndReturn(run func(context.Context, string, string, time.Time, time.Time) error) *Store_CreatePartition_Call {
|
||||
_c.Call.Return(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
// CreateUser provides a mock function with given fields: ctx, arg
|
||||
func (_m *Store) CreateUser(ctx context.Context, arg database.CreateUserParams) (database.User, error) {
|
||||
ret := _m.Called(ctx, arg)
|
||||
|
@ -344,19 +454,19 @@ func (_c *Store_CreateUser_Call) RunAndReturn(run func(context.Context, database
|
|||
}
|
||||
|
||||
// DB provides a mock function with given fields:
|
||||
func (_m *Store) DB() *database.Database {
|
||||
func (_m *Store) DB() *database.Postgres {
|
||||
ret := _m.Called()
|
||||
|
||||
if len(ret) == 0 {
|
||||
panic("no return value specified for DB")
|
||||
}
|
||||
|
||||
var r0 *database.Database
|
||||
if rf, ok := ret.Get(0).(func() *database.Database); ok {
|
||||
var r0 *database.Postgres
|
||||
if rf, ok := ret.Get(0).(func() *database.Postgres); ok {
|
||||
r0 = rf()
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).(*database.Database)
|
||||
r0 = ret.Get(0).(*database.Postgres)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -380,12 +490,59 @@ func (_c *Store_DB_Call) Run(run func()) *Store_DB_Call {
|
|||
return _c
|
||||
}
|
||||
|
||||
func (_c *Store_DB_Call) Return(_a0 *database.Database) *Store_DB_Call {
|
||||
func (_c *Store_DB_Call) Return(_a0 *database.Postgres) *Store_DB_Call {
|
||||
_c.Call.Return(_a0)
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *Store_DB_Call) RunAndReturn(run func() *database.Database) *Store_DB_Call {
|
||||
func (_c *Store_DB_Call) RunAndReturn(run func() *database.Postgres) *Store_DB_Call {
|
||||
_c.Call.Return(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
// DBTX provides a mock function with given fields:
|
||||
func (_m *Store) DBTX() database.DBTX {
|
||||
ret := _m.Called()
|
||||
|
||||
if len(ret) == 0 {
|
||||
panic("no return value specified for DBTX")
|
||||
}
|
||||
|
||||
var r0 database.DBTX
|
||||
if rf, ok := ret.Get(0).(func() database.DBTX); ok {
|
||||
r0 = rf()
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).(database.DBTX)
|
||||
}
|
||||
}
|
||||
|
||||
return r0
|
||||
}
|
||||
|
||||
// Store_DBTX_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DBTX'
|
||||
type Store_DBTX_Call struct {
|
||||
*mock.Call
|
||||
}
|
||||
|
||||
// DBTX is a helper method to define mock.On call
|
||||
func (_e *Store_Expecter) DBTX() *Store_DBTX_Call {
|
||||
return &Store_DBTX_Call{Call: _e.mock.On("DBTX")}
|
||||
}
|
||||
|
||||
func (_c *Store_DBTX_Call) Run(run func()) *Store_DBTX_Call {
|
||||
_c.Call.Run(func(args mock.Arguments) {
|
||||
run()
|
||||
})
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *Store_DBTX_Call) Return(_a0 database.DBTX) *Store_DBTX_Call {
|
||||
_c.Call.Return(_a0)
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *Store_DBTX_Call) RunAndReturn(run func() database.DBTX) *Store_DBTX_Call {
|
||||
_c.Call.Return(run)
|
||||
return _c
|
||||
}
|
||||
|
@ -484,6 +641,101 @@ func (_c *Store_DeleteUser_Call) RunAndReturn(run func(context.Context, string)
|
|||
return _c
|
||||
}
|
||||
|
||||
// DetachPartition provides a mock function with given fields: ctx, parentTable, partitionName
|
||||
func (_m *Store) DetachPartition(ctx context.Context, parentTable string, partitionName string) error {
|
||||
ret := _m.Called(ctx, parentTable, partitionName)
|
||||
|
||||
if len(ret) == 0 {
|
||||
panic("no return value specified for DetachPartition")
|
||||
}
|
||||
|
||||
var r0 error
|
||||
if rf, ok := ret.Get(0).(func(context.Context, string, string) error); ok {
|
||||
r0 = rf(ctx, parentTable, partitionName)
|
||||
} else {
|
||||
r0 = ret.Error(0)
|
||||
}
|
||||
|
||||
return r0
|
||||
}
|
||||
|
||||
// Store_DetachPartition_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DetachPartition'
|
||||
type Store_DetachPartition_Call struct {
|
||||
*mock.Call
|
||||
}
|
||||
|
||||
// DetachPartition is a helper method to define mock.On call
|
||||
// - ctx context.Context
|
||||
// - parentTable string
|
||||
// - partitionName string
|
||||
func (_e *Store_Expecter) DetachPartition(ctx interface{}, parentTable interface{}, partitionName interface{}) *Store_DetachPartition_Call {
|
||||
return &Store_DetachPartition_Call{Call: _e.mock.On("DetachPartition", ctx, parentTable, partitionName)}
|
||||
}
|
||||
|
||||
func (_c *Store_DetachPartition_Call) Run(run func(ctx context.Context, parentTable string, partitionName string)) *Store_DetachPartition_Call {
|
||||
_c.Call.Run(func(args mock.Arguments) {
|
||||
run(args[0].(context.Context), args[1].(string), args[2].(string))
|
||||
})
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *Store_DetachPartition_Call) Return(_a0 error) *Store_DetachPartition_Call {
|
||||
_c.Call.Return(_a0)
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *Store_DetachPartition_Call) RunAndReturn(run func(context.Context, string, string) error) *Store_DetachPartition_Call {
|
||||
_c.Call.Return(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
// DropPartition provides a mock function with given fields: ctx, partitionName
|
||||
func (_m *Store) DropPartition(ctx context.Context, partitionName string) error {
|
||||
ret := _m.Called(ctx, partitionName)
|
||||
|
||||
if len(ret) == 0 {
|
||||
panic("no return value specified for DropPartition")
|
||||
}
|
||||
|
||||
var r0 error
|
||||
if rf, ok := ret.Get(0).(func(context.Context, string) error); ok {
|
||||
r0 = rf(ctx, partitionName)
|
||||
} else {
|
||||
r0 = ret.Error(0)
|
||||
}
|
||||
|
||||
return r0
|
||||
}
|
||||
|
||||
// Store_DropPartition_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DropPartition'
|
||||
type Store_DropPartition_Call struct {
|
||||
*mock.Call
|
||||
}
|
||||
|
||||
// DropPartition is a helper method to define mock.On call
|
||||
// - ctx context.Context
|
||||
// - partitionName string
|
||||
func (_e *Store_Expecter) DropPartition(ctx interface{}, partitionName interface{}) *Store_DropPartition_Call {
|
||||
return &Store_DropPartition_Call{Call: _e.mock.On("DropPartition", ctx, partitionName)}
|
||||
}
|
||||
|
||||
func (_c *Store_DropPartition_Call) Run(run func(ctx context.Context, partitionName string)) *Store_DropPartition_Call {
|
||||
_c.Call.Run(func(args mock.Arguments) {
|
||||
run(args[0].(context.Context), args[1].(string))
|
||||
})
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *Store_DropPartition_Call) Return(_a0 error) *Store_DropPartition_Call {
|
||||
_c.Call.Return(_a0)
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *Store_DropPartition_Call) RunAndReturn(run func(context.Context, string) error) *Store_DropPartition_Call {
|
||||
_c.Call.Return(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
// GetAPIKey provides a mock function with given fields: ctx, apiKey
|
||||
func (_m *Store) GetAPIKey(ctx context.Context, apiKey string) (database.ApiKey, error) {
|
||||
ret := _m.Called(ctx, apiKey)
|
||||
|
@ -654,6 +906,66 @@ func (_c *Store_GetSystemName_Call) RunAndReturn(run func(context.Context, int)
|
|||
return _c
|
||||
}
|
||||
|
||||
// GetTablePartitions provides a mock function with given fields: ctx, schemaName, tableName
|
||||
func (_m *Store) GetTablePartitions(ctx context.Context, schemaName string, tableName string) ([]database.PartitionResult, error) {
|
||||
ret := _m.Called(ctx, schemaName, tableName)
|
||||
|
||||
if len(ret) == 0 {
|
||||
panic("no return value specified for GetTablePartitions")
|
||||
}
|
||||
|
||||
var r0 []database.PartitionResult
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(0).(func(context.Context, string, string) ([]database.PartitionResult, error)); ok {
|
||||
return rf(ctx, schemaName, tableName)
|
||||
}
|
||||
if rf, ok := ret.Get(0).(func(context.Context, string, string) []database.PartitionResult); ok {
|
||||
r0 = rf(ctx, schemaName, tableName)
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).([]database.PartitionResult)
|
||||
}
|
||||
}
|
||||
|
||||
if rf, ok := ret.Get(1).(func(context.Context, string, string) error); ok {
|
||||
r1 = rf(ctx, schemaName, tableName)
|
||||
} else {
|
||||
r1 = ret.Error(1)
|
||||
}
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
// Store_GetTablePartitions_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetTablePartitions'
|
||||
type Store_GetTablePartitions_Call struct {
|
||||
*mock.Call
|
||||
}
|
||||
|
||||
// GetTablePartitions is a helper method to define mock.On call
|
||||
// - ctx context.Context
|
||||
// - schemaName string
|
||||
// - tableName string
|
||||
func (_e *Store_Expecter) GetTablePartitions(ctx interface{}, schemaName interface{}, tableName interface{}) *Store_GetTablePartitions_Call {
|
||||
return &Store_GetTablePartitions_Call{Call: _e.mock.On("GetTablePartitions", ctx, schemaName, tableName)}
|
||||
}
|
||||
|
||||
func (_c *Store_GetTablePartitions_Call) Run(run func(ctx context.Context, schemaName string, tableName string)) *Store_GetTablePartitions_Call {
|
||||
_c.Call.Run(func(args mock.Arguments) {
|
||||
run(args[0].(context.Context), args[1].(string), args[2].(string))
|
||||
})
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *Store_GetTablePartitions_Call) Return(_a0 []database.PartitionResult, _a1 error) *Store_GetTablePartitions_Call {
|
||||
_c.Call.Return(_a0, _a1)
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *Store_GetTablePartitions_Call) RunAndReturn(run func(context.Context, string, string) ([]database.PartitionResult, error)) *Store_GetTablePartitions_Call {
|
||||
_c.Call.Return(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
// GetTalkgroup provides a mock function with given fields: ctx, systemID, tGID
|
||||
func (_m *Store) GetTalkgroup(ctx context.Context, systemID int32, tGID int32) (database.GetTalkgroupRow, error) {
|
||||
ret := _m.Called(ctx, systemID, tGID)
|
||||
|
@ -1958,6 +2270,64 @@ func (_c *Store_StoreTGVersion_Call) RunAndReturn(run func(context.Context, []da
|
|||
return _c
|
||||
}
|
||||
|
||||
// SweepCalls provides a mock function with given fields: ctx, rangeStart, rangeEnd
|
||||
func (_m *Store) SweepCalls(ctx context.Context, rangeStart pgtype.Timestamptz, rangeEnd pgtype.Timestamptz) (int64, error) {
|
||||
ret := _m.Called(ctx, rangeStart, rangeEnd)
|
||||
|
||||
if len(ret) == 0 {
|
||||
panic("no return value specified for SweepCalls")
|
||||
}
|
||||
|
||||
var r0 int64
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(0).(func(context.Context, pgtype.Timestamptz, pgtype.Timestamptz) (int64, error)); ok {
|
||||
return rf(ctx, rangeStart, rangeEnd)
|
||||
}
|
||||
if rf, ok := ret.Get(0).(func(context.Context, pgtype.Timestamptz, pgtype.Timestamptz) int64); ok {
|
||||
r0 = rf(ctx, rangeStart, rangeEnd)
|
||||
} else {
|
||||
r0 = ret.Get(0).(int64)
|
||||
}
|
||||
|
||||
if rf, ok := ret.Get(1).(func(context.Context, pgtype.Timestamptz, pgtype.Timestamptz) error); ok {
|
||||
r1 = rf(ctx, rangeStart, rangeEnd)
|
||||
} else {
|
||||
r1 = ret.Error(1)
|
||||
}
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
// Store_SweepCalls_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SweepCalls'
|
||||
type Store_SweepCalls_Call struct {
|
||||
*mock.Call
|
||||
}
|
||||
|
||||
// SweepCalls is a helper method to define mock.On call
|
||||
// - ctx context.Context
|
||||
// - rangeStart pgtype.Timestamptz
|
||||
// - rangeEnd pgtype.Timestamptz
|
||||
func (_e *Store_Expecter) SweepCalls(ctx interface{}, rangeStart interface{}, rangeEnd interface{}) *Store_SweepCalls_Call {
|
||||
return &Store_SweepCalls_Call{Call: _e.mock.On("SweepCalls", ctx, rangeStart, rangeEnd)}
|
||||
}
|
||||
|
||||
func (_c *Store_SweepCalls_Call) Run(run func(ctx context.Context, rangeStart pgtype.Timestamptz, rangeEnd pgtype.Timestamptz)) *Store_SweepCalls_Call {
|
||||
_c.Call.Run(func(args mock.Arguments) {
|
||||
run(args[0].(context.Context), args[1].(pgtype.Timestamptz), args[2].(pgtype.Timestamptz))
|
||||
})
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *Store_SweepCalls_Call) Return(_a0 int64, _a1 error) *Store_SweepCalls_Call {
|
||||
_c.Call.Return(_a0, _a1)
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *Store_SweepCalls_Call) RunAndReturn(run func(context.Context, pgtype.Timestamptz, pgtype.Timestamptz) (int64, error)) *Store_SweepCalls_Call {
|
||||
_c.Call.Return(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
// UpdatePassword provides a mock function with given fields: ctx, username, password
|
||||
func (_m *Store) UpdatePassword(ctx context.Context, username string, password string) error {
|
||||
ret := _m.Called(ctx, username, password)
|
||||
|
|
|
@ -55,6 +55,111 @@ type Call struct {
|
|||
Transcript *string `json:"transcript,omitempty"`
|
||||
}
|
||||
|
||||
type CallsP202407 struct {
|
||||
ID uuid.UUID `json:"id,omitempty"`
|
||||
Submitter *int32 `json:"submitter,omitempty"`
|
||||
System int `json:"system,omitempty"`
|
||||
Talkgroup int `json:"talkgroup,omitempty"`
|
||||
CallDate pgtype.Timestamptz `json:"call_date,omitempty"`
|
||||
AudioName *string `json:"audio_name,omitempty"`
|
||||
AudioBlob []byte `json:"audio_blob,omitempty"`
|
||||
Duration *int32 `json:"duration,omitempty"`
|
||||
AudioType *string `json:"audio_type,omitempty"`
|
||||
AudioUrl *string `json:"audio_url,omitempty"`
|
||||
Frequency int `json:"frequency,omitempty"`
|
||||
Frequencies []int `json:"frequencies,omitempty"`
|
||||
Patches []int `json:"patches,omitempty"`
|
||||
TGLabel *string `json:"tg_label,omitempty"`
|
||||
TGAlphaTag *string `json:"tg_alpha_tag,omitempty"`
|
||||
TGGroup *string `json:"tg_group,omitempty"`
|
||||
Source int `json:"source,omitempty"`
|
||||
Transcript *string `json:"transcript,omitempty"`
|
||||
}
|
||||
|
||||
type CallsP202408 struct {
|
||||
ID uuid.UUID `json:"id,omitempty"`
|
||||
Submitter *int32 `json:"submitter,omitempty"`
|
||||
System int `json:"system,omitempty"`
|
||||
Talkgroup int `json:"talkgroup,omitempty"`
|
||||
CallDate pgtype.Timestamptz `json:"call_date,omitempty"`
|
||||
AudioName *string `json:"audio_name,omitempty"`
|
||||
AudioBlob []byte `json:"audio_blob,omitempty"`
|
||||
Duration *int32 `json:"duration,omitempty"`
|
||||
AudioType *string `json:"audio_type,omitempty"`
|
||||
AudioUrl *string `json:"audio_url,omitempty"`
|
||||
Frequency int `json:"frequency,omitempty"`
|
||||
Frequencies []int `json:"frequencies,omitempty"`
|
||||
Patches []int `json:"patches,omitempty"`
|
||||
TGLabel *string `json:"tg_label,omitempty"`
|
||||
TGAlphaTag *string `json:"tg_alpha_tag,omitempty"`
|
||||
TGGroup *string `json:"tg_group,omitempty"`
|
||||
Source int `json:"source,omitempty"`
|
||||
Transcript *string `json:"transcript,omitempty"`
|
||||
}
|
||||
|
||||
type CallsP202409 struct {
|
||||
ID uuid.UUID `json:"id,omitempty"`
|
||||
Submitter *int32 `json:"submitter,omitempty"`
|
||||
System int `json:"system,omitempty"`
|
||||
Talkgroup int `json:"talkgroup,omitempty"`
|
||||
CallDate pgtype.Timestamptz `json:"call_date,omitempty"`
|
||||
AudioName *string `json:"audio_name,omitempty"`
|
||||
AudioBlob []byte `json:"audio_blob,omitempty"`
|
||||
Duration *int32 `json:"duration,omitempty"`
|
||||
AudioType *string `json:"audio_type,omitempty"`
|
||||
AudioUrl *string `json:"audio_url,omitempty"`
|
||||
Frequency int `json:"frequency,omitempty"`
|
||||
Frequencies []int `json:"frequencies,omitempty"`
|
||||
Patches []int `json:"patches,omitempty"`
|
||||
TGLabel *string `json:"tg_label,omitempty"`
|
||||
TGAlphaTag *string `json:"tg_alpha_tag,omitempty"`
|
||||
TGGroup *string `json:"tg_group,omitempty"`
|
||||
Source int `json:"source,omitempty"`
|
||||
Transcript *string `json:"transcript,omitempty"`
|
||||
}
|
||||
|
||||
type CallsP202410 struct {
|
||||
ID uuid.UUID `json:"id,omitempty"`
|
||||
Submitter *int32 `json:"submitter,omitempty"`
|
||||
System int `json:"system,omitempty"`
|
||||
Talkgroup int `json:"talkgroup,omitempty"`
|
||||
CallDate pgtype.Timestamptz `json:"call_date,omitempty"`
|
||||
AudioName *string `json:"audio_name,omitempty"`
|
||||
AudioBlob []byte `json:"audio_blob,omitempty"`
|
||||
Duration *int32 `json:"duration,omitempty"`
|
||||
AudioType *string `json:"audio_type,omitempty"`
|
||||
AudioUrl *string `json:"audio_url,omitempty"`
|
||||
Frequency int `json:"frequency,omitempty"`
|
||||
Frequencies []int `json:"frequencies,omitempty"`
|
||||
Patches []int `json:"patches,omitempty"`
|
||||
TGLabel *string `json:"tg_label,omitempty"`
|
||||
TGAlphaTag *string `json:"tg_alpha_tag,omitempty"`
|
||||
TGGroup *string `json:"tg_group,omitempty"`
|
||||
Source int `json:"source,omitempty"`
|
||||
Transcript *string `json:"transcript,omitempty"`
|
||||
}
|
||||
|
||||
type CallsP202411 struct {
|
||||
ID uuid.UUID `json:"id,omitempty"`
|
||||
Submitter *int32 `json:"submitter,omitempty"`
|
||||
System int `json:"system,omitempty"`
|
||||
Talkgroup int `json:"talkgroup,omitempty"`
|
||||
CallDate pgtype.Timestamptz `json:"call_date,omitempty"`
|
||||
AudioName *string `json:"audio_name,omitempty"`
|
||||
AudioBlob []byte `json:"audio_blob,omitempty"`
|
||||
Duration *int32 `json:"duration,omitempty"`
|
||||
AudioType *string `json:"audio_type,omitempty"`
|
||||
AudioUrl *string `json:"audio_url,omitempty"`
|
||||
Frequency int `json:"frequency,omitempty"`
|
||||
Frequencies []int `json:"frequencies,omitempty"`
|
||||
Patches []int `json:"patches,omitempty"`
|
||||
TGLabel *string `json:"tg_label,omitempty"`
|
||||
TGAlphaTag *string `json:"tg_alpha_tag,omitempty"`
|
||||
TGGroup *string `json:"tg_group,omitempty"`
|
||||
Source int `json:"source,omitempty"`
|
||||
Transcript *string `json:"transcript,omitempty"`
|
||||
}
|
||||
|
||||
type Incident struct {
|
||||
ID uuid.UUID `json:"id,omitempty"`
|
||||
Name string `json:"name,omitempty"`
|
||||
|
@ -68,6 +173,9 @@ type Incident struct {
|
|||
type IncidentsCall struct {
|
||||
IncidentID uuid.UUID `json:"incident_id,omitempty"`
|
||||
CallID uuid.UUID `json:"call_id,omitempty"`
|
||||
CallsTblID pgtype.UUID `json:"calls_tbl_id,omitempty"`
|
||||
SweptCallID pgtype.UUID `json:"swept_call_id,omitempty"`
|
||||
CallDate pgtype.Timestamptz `json:"call_date,omitempty"`
|
||||
Notes []byte `json:"notes,omitempty"`
|
||||
}
|
||||
|
||||
|
@ -77,6 +185,27 @@ type Setting struct {
|
|||
Value []byte `json:"value,omitempty"`
|
||||
}
|
||||
|
||||
type SweptCall struct {
|
||||
ID uuid.UUID `json:"id,omitempty"`
|
||||
Submitter *int32 `json:"submitter,omitempty"`
|
||||
System int `json:"system,omitempty"`
|
||||
Talkgroup int `json:"talkgroup,omitempty"`
|
||||
CallDate pgtype.Timestamptz `json:"call_date,omitempty"`
|
||||
AudioName *string `json:"audio_name,omitempty"`
|
||||
AudioBlob []byte `json:"audio_blob,omitempty"`
|
||||
Duration *int32 `json:"duration,omitempty"`
|
||||
AudioType *string `json:"audio_type,omitempty"`
|
||||
AudioUrl *string `json:"audio_url,omitempty"`
|
||||
Frequency int `json:"frequency,omitempty"`
|
||||
Frequencies []int `json:"frequencies,omitempty"`
|
||||
Patches []int `json:"patches,omitempty"`
|
||||
TGLabel *string `json:"tg_label,omitempty"`
|
||||
TGAlphaTag *string `json:"tg_alpha_tag,omitempty"`
|
||||
TGGroup *string `json:"tg_group,omitempty"`
|
||||
Source int `json:"source,omitempty"`
|
||||
Transcript *string `json:"transcript,omitempty"`
|
||||
}
|
||||
|
||||
type System struct {
|
||||
ID int `json:"id,omitempty"`
|
||||
Name string `json:"name,omitempty"`
|
||||
|
|
160
pkg/database/partitions.go
Normal file
160
pkg/database/partitions.go
Normal file
|
@ -0,0 +1,160 @@
|
|||
package database
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/jackc/pgx/v5"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrLowerBoundAfterUpperBound = errors.New("lower bound after upper bound")
|
||||
ErrCantDecodePartitionBounds = errors.New("cannot decode partition bounds")
|
||||
)
|
||||
|
||||
type PartitionResult struct {
|
||||
ParentTable string
|
||||
Schema string
|
||||
Name string
|
||||
LowerBound string
|
||||
UpperBound string
|
||||
}
|
||||
|
||||
type partitionsQuerier interface {
|
||||
GetTablePartitions(ctx context.Context, schemaName, tableName string) ([]PartitionResult, error)
|
||||
CreatePartition(ctx context.Context, parentTable, partitionName string, start, end time.Time) error
|
||||
DetachPartition(ctx context.Context, parentTable, partitionName string) error
|
||||
DropPartition(ctx context.Context, partitionName string) error
|
||||
}
|
||||
|
||||
func (q *Queries) GetTablePartitions(ctx context.Context, schemaName, tableName string) (partitions []PartitionResult, err error) {
|
||||
query := fmt.Sprintf(`
|
||||
WITH parts as (
|
||||
SELECT
|
||||
relnamespace::regnamespace as schema,
|
||||
c.oid::pg_catalog.regclass AS part_name,
|
||||
regexp_match(pg_get_expr(c.relpartbound, c.oid),
|
||||
'FOR VALUES FROM \(''(.*)''\) TO \(''(.*)''\)') AS bounds
|
||||
FROM
|
||||
pg_catalog.pg_class c JOIN pg_catalog.pg_inherits i ON (c.oid = i.inhrelid)
|
||||
WHERE i.inhparent = '%s.%s'::regclass
|
||||
AND c.relkind='r'
|
||||
)
|
||||
SELECT
|
||||
schema,
|
||||
part_name as name,
|
||||
'%s' as parentTable,
|
||||
bounds[1]::text AS lowerBound,
|
||||
bounds[2]::text AS upperBound
|
||||
FROM parts
|
||||
ORDER BY part_name;`, schemaName, tableName, tableName)
|
||||
|
||||
rows, err := q.db.Query(ctx, query)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get partitions: %w", err)
|
||||
}
|
||||
|
||||
partitions, err = pgx.CollectRows(rows, pgx.RowToStructByName[PartitionResult])
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to cast list: %w", err)
|
||||
}
|
||||
|
||||
return partitions, nil
|
||||
}
|
||||
|
||||
func (q *Queries) CreatePartition(ctx context.Context, parentTable, partitionName string, start, end time.Time) error {
|
||||
const boundFmt = "2006-01-02 00:00:00Z00"
|
||||
_, err := q.db.Exec(ctx, fmt.Sprintf(`CREATE TABLE %s PARTITION OF %s FOR VALUES FROM ('%s') TO ('%s');`, partitionName, parentTable, start.Format(boundFmt), end.Format(boundFmt)))
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (q *Queries) DropPartition(ctx context.Context, partitionName string) error {
|
||||
_, err := q.db.Exec(ctx, fmt.Sprintf(`DROP TABLE %s;`, partitionName))
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (q *Queries) DetachPartition(ctx context.Context, parentTable, partitionName string) error {
|
||||
_, err := q.db.Exec(ctx, fmt.Sprintf(`ALTER TABLE %s DETACH PARTITION %s;`, parentTable, partitionName))
|
||||
return err
|
||||
}
|
||||
|
||||
func (partition PartitionResult) ParseBounds() (lowerBound time.Time, upperBound time.Time, err error) {
|
||||
lowerBound, upperBound, err = parseBoundAsDate(partition)
|
||||
if err == nil {
|
||||
return lowerBound, upperBound, nil
|
||||
}
|
||||
|
||||
lowerBound, upperBound, err = parseBoundAsDateTime(partition)
|
||||
if err == nil {
|
||||
return lowerBound, upperBound, nil
|
||||
}
|
||||
|
||||
lowerBound, upperBound, err = parseBoundAsDateTimeWithTimezone(partition)
|
||||
if err == nil {
|
||||
return lowerBound, upperBound, nil
|
||||
}
|
||||
|
||||
if lowerBound.After(lowerBound) {
|
||||
return time.Time{}, time.Time{}, ErrLowerBoundAfterUpperBound
|
||||
}
|
||||
|
||||
return time.Time{}, time.Time{}, ErrCantDecodePartitionBounds
|
||||
}
|
||||
|
||||
func parseBoundAsDate(partition PartitionResult) (lowerBound, upperBound time.Time, err error) {
|
||||
lowerBound, err = time.ParseInLocation("2006-01-02", partition.LowerBound, time.UTC)
|
||||
if err != nil {
|
||||
return time.Time{}, time.Time{}, fmt.Errorf("can't parse lowerbound as date: %w", err)
|
||||
}
|
||||
|
||||
upperBound, err = time.ParseInLocation("2006-01-02", partition.UpperBound, time.UTC)
|
||||
if err != nil {
|
||||
return time.Time{}, time.Time{}, fmt.Errorf("can't parse upperbound as date: %w", err)
|
||||
}
|
||||
|
||||
return lowerBound, upperBound, nil
|
||||
}
|
||||
|
||||
func parseBoundAsDateTime(partition PartitionResult) (lowerBound, upperBound time.Time, err error) {
|
||||
lowerBound, err = time.ParseInLocation("2006-01-02 15:04:05", partition.LowerBound, time.UTC)
|
||||
if err != nil {
|
||||
return time.Time{}, time.Time{}, fmt.Errorf("can't parse lowerbound as datetime: %w", err)
|
||||
}
|
||||
|
||||
upperBound, err = time.ParseInLocation("2006-01-02 15:04:05", partition.UpperBound, time.UTC)
|
||||
if err != nil {
|
||||
return time.Time{}, time.Time{}, fmt.Errorf("can't parse upperbound as datetime: %w", err)
|
||||
}
|
||||
|
||||
return lowerBound, upperBound, nil
|
||||
}
|
||||
|
||||
func parseBoundAsDateTimeWithTimezone(partition PartitionResult) (lowerBound, upperBound time.Time, err error) {
|
||||
lowerBound, err = time.ParseInLocation("2006-01-02 15:04:05Z07", partition.LowerBound, time.UTC)
|
||||
if err != nil {
|
||||
return time.Time{}, time.Time{}, fmt.Errorf("can't parse lowerbound as datetime with timezone: %w", err)
|
||||
}
|
||||
|
||||
upperBound, err = time.ParseInLocation("2006-01-02 15:04:05Z07", partition.UpperBound, time.UTC)
|
||||
if err != nil {
|
||||
return time.Time{}, time.Time{}, fmt.Errorf("can't parse upperbound as datetime with timezone: %w", err)
|
||||
}
|
||||
|
||||
lowerBound = convertToDateTimeWithoutTimezone(lowerBound)
|
||||
upperBound = convertToDateTimeWithoutTimezone(upperBound)
|
||||
|
||||
return lowerBound, upperBound, nil
|
||||
}
|
||||
|
||||
func convertToDateTimeWithoutTimezone(bound time.Time) time.Time {
|
||||
parsedTime, err := time.Parse("2006-01-02 15:04:05", bound.UTC().Format("2006-01-02 15:04:05"))
|
||||
if err != nil {
|
||||
return time.Time{}
|
||||
}
|
||||
|
||||
return parsedTime
|
||||
}
|
152
pkg/database/partman/intervals.go
Normal file
152
pkg/database/partman/intervals.go
Normal file
|
@ -0,0 +1,152 @@
|
|||
package partman
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
daysInWeek = 7
|
||||
monthsInQuarter = 3
|
||||
)
|
||||
|
||||
func getDailyBounds(date time.Time) (lowerBound, upperBound time.Time) {
|
||||
lowerBound = time.Date(date.Year(), date.Month(), date.Day(), 0, 0, 0, 0, time.UTC)
|
||||
upperBound = lowerBound.AddDate(0, 0, 1)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func getWeeklyBounds(date time.Time) (lowerBound, upperBound time.Time) {
|
||||
lowerBound = time.Date(date.Year(), date.Month(), date.Day(), 0, 0, 0, 0, time.UTC).AddDate(0, 0, -int(date.Weekday()-time.Monday))
|
||||
upperBound = lowerBound.AddDate(0, 0, daysInWeek)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func getMonthlyBounds(date time.Time) (lowerBound, upperBound time.Time) {
|
||||
lowerBound = time.Date(date.Year(), date.Month(), 1, 0, 0, 0, 0, time.UTC)
|
||||
upperBound = lowerBound.AddDate(0, 1, 0)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func getQuarterlyBounds(date time.Time) (lowerBound, upperBound time.Time) {
|
||||
year, _, _ := date.Date()
|
||||
|
||||
quarter := (int(date.Month()) - 1) / monthsInQuarter
|
||||
firstMonthOfTheQuarter := time.Month(quarter*monthsInQuarter + 1)
|
||||
|
||||
lowerBound = time.Date(year, firstMonthOfTheQuarter, 1, 0, 0, 0, 0, time.UTC)
|
||||
upperBound = lowerBound.AddDate(0, monthsInQuarter, 0)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func getYearlyBounds(date time.Time) (lowerBound, upperBound time.Time) {
|
||||
lowerBound = time.Date(date.Year(), 1, 1, 0, 0, 0, 0, time.UTC)
|
||||
upperBound = lowerBound.AddDate(1, 0, 0)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (p partition) Next(i int) partition {
|
||||
var t time.Time
|
||||
switch p.Interval {
|
||||
case Daily:
|
||||
t = p.Time.AddDate(0, 0, i)
|
||||
case Weekly:
|
||||
t = p.Time.AddDate(0, 0, i*daysInWeek)
|
||||
case Monthly:
|
||||
year, month, _ := p.Time.Date()
|
||||
|
||||
t = time.Date(year, month+time.Month(i), 1, 0, 0, 0, 0, p.Time.Location())
|
||||
case Quarterly:
|
||||
t = p.Time.AddDate(0, i*monthsInQuarter, 0)
|
||||
case Yearly:
|
||||
year, _, _ := p.Time.Date()
|
||||
|
||||
t = time.Date(year+i, 1, 1, 0, 0, 0, 0, p.Time.Location())
|
||||
}
|
||||
np := partition{
|
||||
ParentTable: p.ParentTable,
|
||||
Name: p.Name,
|
||||
Schema: p.Schema,
|
||||
Interval: p.Interval,
|
||||
Time: t,
|
||||
}
|
||||
|
||||
np.setName()
|
||||
|
||||
return np
|
||||
}
|
||||
|
||||
func (p *partition) setName() {
|
||||
t := p.Time
|
||||
var suffix string
|
||||
|
||||
switch p.Interval {
|
||||
case Daily:
|
||||
suffix = t.Format("2006_01_02")
|
||||
case Weekly:
|
||||
year, week := t.ISOWeek()
|
||||
suffix = fmt.Sprintf("%d_w%02d", year, week)
|
||||
case Monthly:
|
||||
suffix = t.Format("2006_01")
|
||||
case Quarterly:
|
||||
year, month, _ := t.Date()
|
||||
|
||||
var quarter int
|
||||
|
||||
switch {
|
||||
case month >= 1 && month <= 3:
|
||||
quarter = 1
|
||||
case month >= 4 && month <= 6:
|
||||
quarter = 2
|
||||
case month >= 7 && month <= 9:
|
||||
quarter = 3
|
||||
case month >= 10 && month <= 12:
|
||||
quarter = 4
|
||||
}
|
||||
|
||||
suffix = fmt.Sprintf("%d_q%d", year, quarter)
|
||||
case Yearly:
|
||||
suffix = t.Format("2006")
|
||||
default:
|
||||
panic(ErrInvalidInterval(p.Interval))
|
||||
}
|
||||
|
||||
p.Name = fmt.Sprintf("%s_p_%s", p.ParentTable, suffix)
|
||||
}
|
||||
|
||||
func (p partition) Prev(i int) partition {
|
||||
var t time.Time
|
||||
switch p.Interval {
|
||||
case Daily:
|
||||
t = p.Time.AddDate(0, 0, -i)
|
||||
case Weekly:
|
||||
t = p.Time.AddDate(0, 0, -i*daysInWeek)
|
||||
case Monthly:
|
||||
year, month, _ := p.Time.Date()
|
||||
|
||||
t = time.Date(year, month-time.Month(i), 1, 0, 0, 0, 0, p.Time.Location())
|
||||
case Quarterly:
|
||||
t = p.Time.AddDate(0, -i*monthsInQuarter, 0)
|
||||
case Yearly:
|
||||
year, _, _ := p.Time.Date()
|
||||
|
||||
t = time.Date(year-i, 1, 1, 0, 0, 0, 0, p.Time.Location())
|
||||
}
|
||||
|
||||
pp := partition{
|
||||
ParentTable: p.ParentTable,
|
||||
Name: p.Name,
|
||||
Schema: p.Schema,
|
||||
Interval: p.Interval,
|
||||
Time: t,
|
||||
}
|
||||
pp.setName()
|
||||
|
||||
return pp
|
||||
|
||||
}
|
449
pkg/database/partman/partman.go
Normal file
449
pkg/database/partman/partman.go
Normal file
|
@ -0,0 +1,449 @@
|
|||
package partman
|
||||
|
||||
// portions lifted gratefully from github.com/qonto/postgresql-partition-manager, MIT license.
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"dynatron.me/x/stillbox/internal/isoweek"
|
||||
"dynatron.me/x/stillbox/pkg/config"
|
||||
"dynatron.me/x/stillbox/pkg/database"
|
||||
|
||||
"github.com/jackc/pgx/v5"
|
||||
"github.com/jackc/pgx/v5/pgtype"
|
||||
"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
const (
|
||||
callsTable = "calls"
|
||||
|
||||
preProvisionDefault = 1
|
||||
)
|
||||
|
||||
var (
|
||||
ErrWrongSchema = errors.New("wrong schema name")
|
||||
ErrDifferentInterval = errors.New("stored partition interval differs from configured")
|
||||
)
|
||||
|
||||
type PartitionErr struct {
|
||||
p string
|
||||
err error
|
||||
}
|
||||
|
||||
func (pe PartitionErr) Error() string {
|
||||
r := fmt.Sprintf("bad partition '%s'", pe.p)
|
||||
if pe.err != nil {
|
||||
r += ": " + pe.err.Error()
|
||||
}
|
||||
|
||||
return r
|
||||
}
|
||||
|
||||
func (pe PartitionErr) Unwrap() error {
|
||||
return pe.err
|
||||
}
|
||||
|
||||
type ParsedIntvlErr struct {
|
||||
parsed, start time.Time
|
||||
}
|
||||
|
||||
func (pie ParsedIntvlErr) Error() string {
|
||||
return fmt.Sprintf("parsed interval (%s) does not match start (%s)", pie.parsed, pie.start)
|
||||
}
|
||||
|
||||
func PartitionError(pname string, err ...error) PartitionErr {
|
||||
if len(err) > 0 {
|
||||
return PartitionErr{p: pname, err: err[0]}
|
||||
}
|
||||
|
||||
return PartitionErr{p: pname}
|
||||
}
|
||||
|
||||
type ErrInvalidInterval string
|
||||
|
||||
func (in ErrInvalidInterval) Error() string {
|
||||
return fmt.Sprintf("invalid interval '%s'", string(in))
|
||||
}
|
||||
|
||||
type Interval string
|
||||
|
||||
const (
|
||||
Unknown Interval = ""
|
||||
Daily Interval = "daily"
|
||||
Weekly Interval = "weekly"
|
||||
Monthly Interval = "monthly"
|
||||
Quarterly Interval = "quarterly"
|
||||
Yearly Interval = "yearly"
|
||||
)
|
||||
|
||||
func (p Interval) IsValid() bool {
|
||||
switch p {
|
||||
case Daily, Weekly, Monthly, Quarterly, Yearly:
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
type PartitionManager interface {
|
||||
Go(ctx context.Context)
|
||||
Check(ctx context.Context, now time.Time) error
|
||||
}
|
||||
|
||||
type partman struct {
|
||||
db database.Store
|
||||
cfg config.Partition
|
||||
intv Interval
|
||||
}
|
||||
|
||||
type partition struct {
|
||||
ParentTable string
|
||||
Schema string
|
||||
Name string
|
||||
Interval Interval
|
||||
Time time.Time
|
||||
}
|
||||
|
||||
func New(db database.Store, cfg config.Partition) (*partman, error) {
|
||||
pm := &partman{
|
||||
cfg: cfg,
|
||||
db: db,
|
||||
intv: Interval(cfg.Interval),
|
||||
}
|
||||
|
||||
if !pm.intv.IsValid() {
|
||||
return nil, ErrInvalidInterval(pm.intv)
|
||||
}
|
||||
|
||||
return pm, nil
|
||||
}
|
||||
|
||||
var _ PartitionManager = (*partman)(nil)
|
||||
|
||||
func (pm *partman) Go(ctx context.Context) {
|
||||
tick := time.NewTicker(60 * time.Minute)
|
||||
|
||||
select {
|
||||
case now := <-tick.C:
|
||||
err := pm.Check(ctx, now)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("partman check failed")
|
||||
}
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (pm *partman) newPartition(t time.Time) partition {
|
||||
p := partition{
|
||||
ParentTable: callsTable,
|
||||
Schema: pm.cfg.Schema,
|
||||
Interval: Interval(pm.cfg.Interval),
|
||||
Time: t,
|
||||
}
|
||||
|
||||
p.setName()
|
||||
|
||||
return p
|
||||
}
|
||||
|
||||
func (pm *partman) retentionPartitions(cur partition) []partition {
|
||||
partitions := make([]partition, 0, pm.cfg.Retain)
|
||||
for i := 1; i <= pm.cfg.Retain; i++ {
|
||||
prev := cur.Prev(i)
|
||||
partitions = append(partitions, prev)
|
||||
}
|
||||
|
||||
return partitions
|
||||
}
|
||||
|
||||
func (pm *partman) futurePartitions(cur partition) []partition {
|
||||
preProv := preProvisionDefault
|
||||
if pm.cfg.PreProvision != nil {
|
||||
preProv = *pm.cfg.PreProvision
|
||||
}
|
||||
|
||||
partitions := make([]partition, 0, preProv)
|
||||
for i := 1; i <= preProv; i++ {
|
||||
next := cur.Next(i)
|
||||
partitions = append(partitions, next)
|
||||
}
|
||||
|
||||
return partitions
|
||||
}
|
||||
|
||||
func (pm *partman) expectedPartitions(now time.Time) []partition {
|
||||
curPart := pm.newPartition(now)
|
||||
|
||||
shouldExist := []partition{curPart}
|
||||
if pm.cfg.Retain > -1 {
|
||||
retain := pm.retentionPartitions(curPart)
|
||||
shouldExist = append(shouldExist, retain...)
|
||||
}
|
||||
|
||||
future := pm.futurePartitions(curPart)
|
||||
|
||||
shouldExist = append(shouldExist, future...)
|
||||
|
||||
return shouldExist
|
||||
}
|
||||
|
||||
func (pm *partman) comparePartitions(existingTables, expectedTables []partition) (unexpectedTables, missingTables []partition) {
|
||||
existing := make(map[string]partition)
|
||||
expectedAndExists := make(map[string]bool)
|
||||
|
||||
for _, t := range existingTables {
|
||||
existing[t.PartitionName()] = t
|
||||
}
|
||||
|
||||
for _, t := range expectedTables {
|
||||
if _, found := existing[t.PartitionName()]; found {
|
||||
expectedAndExists[t.PartitionName()] = true
|
||||
} else {
|
||||
missingTables = append(missingTables, t)
|
||||
}
|
||||
}
|
||||
|
||||
for _, t := range existingTables {
|
||||
if _, found := expectedAndExists[t.PartitionName()]; !found {
|
||||
// Only in existingTables and not in both
|
||||
unexpectedTables = append(unexpectedTables, t)
|
||||
}
|
||||
}
|
||||
|
||||
return unexpectedTables, missingTables
|
||||
}
|
||||
|
||||
func (pm *partman) existingPartitions(parts []database.PartitionResult) ([]partition, error) {
|
||||
existing := make([]partition, 0, len(parts))
|
||||
for _, v := range parts {
|
||||
if v.Schema != pm.cfg.Schema {
|
||||
return nil, PartitionError(v.Schema+"."+v.Name, ErrWrongSchema)
|
||||
}
|
||||
p, err := pm.verifyPartName(v)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if p.Interval != Interval(pm.cfg.Interval) {
|
||||
return nil, PartitionError(v.Schema+"."+v.Name, ErrDifferentInterval)
|
||||
}
|
||||
|
||||
existing = append(existing, p)
|
||||
}
|
||||
return existing, nil
|
||||
}
|
||||
|
||||
func (pm *partman) fullTableName(s string) string {
|
||||
return fmt.Sprintf("%s.%s", pm.cfg.Schema, s)
|
||||
}
|
||||
|
||||
func (pm *partman) prunePartition(ctx context.Context, tx database.Store, p partition) error {
|
||||
s, e := p.Range()
|
||||
start := pgtype.Timestamptz{Time: s, Valid: true}
|
||||
end := pgtype.Timestamptz{Time: e, Valid: true}
|
||||
fullPartName := pm.fullTableName(p.PartitionName())
|
||||
|
||||
swept, err := tx.SweepCalls(ctx, start, end)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
log.Info().Int64("rows", swept).Time("start", s).Time("end", e).Msg("swept calls")
|
||||
|
||||
swept, err = tx.CleanupSweptCalls(ctx, start, end)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
log.Debug().Int64("rows", swept).Time("start", s).Time("end", e).Msg("cleaned up swept calls")
|
||||
|
||||
log.Info().Str("partition", fullPartName).Msg("detaching partition")
|
||||
err = tx.DetachPartition(ctx, callsTable, fullPartName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if pm.cfg.Drop {
|
||||
log.Info().Str("partition", fullPartName).Msg("dropping partition")
|
||||
return tx.DropPartition(ctx, fullPartName)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (pm *partman) Check(ctx context.Context, now time.Time) error {
|
||||
return pm.db.InTx(ctx, func(db database.Store) error {
|
||||
// by default, we want to make sure a partition exists for this and next month
|
||||
// since we run this at startup, it's safe to do only that.
|
||||
partitions, err := db.GetTablePartitions(ctx, pm.cfg.Schema, callsTable)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
existing, err := pm.existingPartitions(partitions)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
expected := pm.expectedPartitions(now)
|
||||
|
||||
unexpected, missing := pm.comparePartitions(existing, expected)
|
||||
|
||||
if pm.cfg.Retain > -1 {
|
||||
for _, p := range unexpected {
|
||||
err := pm.prunePartition(ctx, db, p)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for _, p := range missing {
|
||||
err := pm.createPartition(ctx, db, p)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
}, pgx.TxOptions{})
|
||||
}
|
||||
|
||||
func (p partition) Range() (time.Time, time.Time) {
|
||||
switch p.Interval {
|
||||
case Daily:
|
||||
return getDailyBounds(p.Time)
|
||||
case Weekly:
|
||||
return getWeeklyBounds(p.Time)
|
||||
case Monthly:
|
||||
return getMonthlyBounds(p.Time)
|
||||
case Quarterly:
|
||||
return getQuarterlyBounds(p.Time)
|
||||
case Yearly:
|
||||
return getYearlyBounds(p.Time)
|
||||
}
|
||||
|
||||
panic("unknown interval!")
|
||||
}
|
||||
|
||||
func (p partition) PartitionName() string {
|
||||
return p.Name
|
||||
}
|
||||
|
||||
func (pm *partman) createPartition(ctx context.Context, tx database.Store, part partition) error {
|
||||
start, end := part.Range()
|
||||
name := part.PartitionName()
|
||||
log.Info().Str("partition", name).Time("start", start).Time("end", end).Msg("creating partition")
|
||||
return tx.CreatePartition(ctx, callsTable, name, start, end)
|
||||
}
|
||||
|
||||
/*
|
||||
* Partition scheme names:
|
||||
* daily: calls_p_2024_11_28
|
||||
* weekly: calls_p_2024_w48
|
||||
* monthly: calls_p_2024_11
|
||||
* quarterly: calls_p_2024_q4
|
||||
* yearly: calls_p_2024
|
||||
*/
|
||||
|
||||
func (pm *partman) verifyPartName(pr database.PartitionResult) (p partition, err error) {
|
||||
pn := pr.Name
|
||||
low, _, err := pr.ParseBounds()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
p = partition{
|
||||
ParentTable: pr.ParentTable,
|
||||
Name: pr.Name,
|
||||
Schema: pr.Schema,
|
||||
Time: low,
|
||||
}
|
||||
dateAr := strings.Split(pn, "calls_p_")
|
||||
if len(dateAr) != 2 {
|
||||
return p, PartitionError(pn)
|
||||
}
|
||||
|
||||
dateAr = strings.Split(dateAr[1], "_")
|
||||
switch len(dateAr) {
|
||||
case 3: // daily
|
||||
p.Interval = Daily
|
||||
ymd := [3]int{}
|
||||
for i := 0; i < 3; i++ {
|
||||
r, err := strconv.Atoi(dateAr[i])
|
||||
if err != nil {
|
||||
return p, PartitionError(pn, err)
|
||||
}
|
||||
|
||||
ymd[i] = r
|
||||
}
|
||||
parsed := time.Date(ymd[0], time.Month(ymd[1]), ymd[2], 0, 0, 0, 0, time.UTC)
|
||||
if parsed != p.Time {
|
||||
return p, PartitionError(pn, ParsedIntvlErr{parsed: parsed, start: p.Time})
|
||||
}
|
||||
return p, nil
|
||||
case 2:
|
||||
year, err := strconv.Atoi(dateAr[0])
|
||||
if err != nil {
|
||||
return p, PartitionError(pn, err)
|
||||
}
|
||||
if strings.HasPrefix(dateAr[1], "w") {
|
||||
p.Interval = Weekly
|
||||
weekNum, err := strconv.Atoi(dateAr[1][1:])
|
||||
if err != nil {
|
||||
return p, PartitionError(pn, err)
|
||||
}
|
||||
|
||||
parsed := isoweek.StartTime(year, weekNum, time.UTC)
|
||||
if parsed != p.Time {
|
||||
return p, PartitionError(pn, ParsedIntvlErr{parsed: parsed, start: p.Time})
|
||||
}
|
||||
return p, nil
|
||||
} else if strings.HasPrefix(dateAr[1], "q") {
|
||||
p.Interval = Quarterly
|
||||
quarterNum, err := strconv.Atoi(dateAr[1][1:])
|
||||
if err != nil {
|
||||
return p, PartitionError(pn, err)
|
||||
}
|
||||
if quarterNum > 4 {
|
||||
return p, PartitionError(pn, errors.New("invalid quarter"))
|
||||
}
|
||||
firstMonthOfTheQuarter := time.Month((quarterNum-1)*monthsInQuarter + 1)
|
||||
parsed := time.Date(year, firstMonthOfTheQuarter, 1, 0, 0, 0, 0, time.UTC)
|
||||
if parsed != p.Time {
|
||||
return p, PartitionError(pn, ParsedIntvlErr{parsed: parsed, start: p.Time})
|
||||
}
|
||||
return p, nil
|
||||
}
|
||||
// monthly
|
||||
p.Interval = Monthly
|
||||
month, err := strconv.Atoi(dateAr[1])
|
||||
if err != nil {
|
||||
return p, PartitionError(pn)
|
||||
}
|
||||
|
||||
parsed := time.Date(year, time.Month(month), 1, 0, 0, 0, 0, time.UTC)
|
||||
if parsed != p.Time {
|
||||
return p, PartitionError(pn, ParsedIntvlErr{parsed: parsed, start: p.Time})
|
||||
}
|
||||
return p, nil
|
||||
case 1: // yearly
|
||||
p.Interval = Yearly
|
||||
year, err := strconv.Atoi(dateAr[0])
|
||||
if err != nil {
|
||||
return p, PartitionError(pn, err)
|
||||
}
|
||||
parsed := time.Date(year, 1, 1, 0, 0, 0, 0, time.UTC)
|
||||
if parsed != p.Time {
|
||||
return p, PartitionError(pn, ParsedIntvlErr{parsed: parsed, start: p.Time})
|
||||
}
|
||||
|
||||
return p, nil
|
||||
}
|
||||
|
||||
return p, PartitionError(pn)
|
||||
}
|
431
pkg/database/partman/partman_test.go
Normal file
431
pkg/database/partman/partman_test.go
Normal file
|
@ -0,0 +1,431 @@
|
|||
package partman_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"dynatron.me/x/stillbox/internal/common"
|
||||
"dynatron.me/x/stillbox/pkg/config"
|
||||
"dynatron.me/x/stillbox/pkg/database"
|
||||
"dynatron.me/x/stillbox/pkg/database/mocks"
|
||||
"dynatron.me/x/stillbox/pkg/database/partman"
|
||||
|
||||
"github.com/jackc/pgx/v5"
|
||||
"github.com/jackc/pgx/v5/pgtype"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
var mctx = mock.Anything
|
||||
|
||||
func inTx(s *mocks.Store) {
|
||||
s.EXPECT().InTx(mctx, mock.AnythingOfType("func(database.Store) error"), mock.AnythingOfType("pgx.TxOptions")).RunAndReturn(func(ctx context.Context, f func(db database.Store) error, po pgx.TxOptions) error {
|
||||
return f(s)
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
type timeRange struct {
|
||||
start time.Time
|
||||
end time.Time
|
||||
}
|
||||
|
||||
type partSpec struct {
|
||||
name string
|
||||
timeRange
|
||||
}
|
||||
|
||||
func TestPartman(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
timeInUTC := func(s string) time.Time {
|
||||
t, err := time.ParseInLocation("2006-01-02 15:04:05", s, time.UTC)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
return t
|
||||
}
|
||||
|
||||
dateInUTC := func(s string) time.Time {
|
||||
t, err := time.ParseInLocation("2006-01-02", s, time.UTC)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
return t
|
||||
}
|
||||
|
||||
partResultWithSchema := func(schema, name, low, up string) database.PartitionResult {
|
||||
return database.PartitionResult{
|
||||
ParentTable: "calls",
|
||||
Schema: schema,
|
||||
Name: name,
|
||||
LowerBound: low,
|
||||
UpperBound: up,
|
||||
}
|
||||
}
|
||||
|
||||
partResult := func(name, low, up string) database.PartitionResult {
|
||||
return partResultWithSchema("public", name, low, up)
|
||||
}
|
||||
|
||||
dailyTR := func(tr string) timeRange {
|
||||
dtr := dateInUTC(tr)
|
||||
etr := dtr.AddDate(0, 0, 1)
|
||||
return timeRange{start: dtr, end: etr}
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
now time.Time
|
||||
cfg config.Partition
|
||||
extant []database.PartitionResult
|
||||
expectCreate []partSpec
|
||||
expectDrop []string
|
||||
expectDetach []string
|
||||
expectSweep []timeRange
|
||||
expectCleanup []timeRange
|
||||
expectErr error
|
||||
}{
|
||||
{
|
||||
name: "monthly base",
|
||||
now: timeInUTC("2024-11-28 11:37:04"),
|
||||
cfg: config.Partition{
|
||||
Enabled: true,
|
||||
Schema: "public",
|
||||
Interval: "monthly",
|
||||
Retain: 2,
|
||||
Drop: true,
|
||||
PreProvision: common.PtrTo(2),
|
||||
},
|
||||
extant: []database.PartitionResult{
|
||||
partResult("calls_p_2024_10", "2024-10-01", "2024-11-01"),
|
||||
partResult("calls_p_2024_09", "2024-09-01", "2024-10-01"),
|
||||
partResult("calls_p_2024_08", "2024-08-01", "2024-09-01"),
|
||||
partResult("calls_p_2024_07", "2024-07-01", "2024-08-01"),
|
||||
},
|
||||
expectCreate: []partSpec{
|
||||
{name: "calls_p_2024_11", timeRange: timeRange{start: dateInUTC("2024-11-01"), end: dateInUTC("2024-12-01")}},
|
||||
{name: "calls_p_2024_12", timeRange: timeRange{start: dateInUTC("2024-12-01"), end: dateInUTC("2025-01-01")}},
|
||||
{name: "calls_p_2025_01", timeRange: timeRange{start: dateInUTC("2025-01-01"), end: dateInUTC("2025-02-01")}},
|
||||
},
|
||||
expectDrop: []string{
|
||||
"public.calls_p_2024_07",
|
||||
"public.calls_p_2024_08",
|
||||
},
|
||||
expectSweep: []timeRange{
|
||||
{start: dateInUTC("2024-07-01"), end: dateInUTC("2024-08-01")},
|
||||
{start: dateInUTC("2024-08-01"), end: dateInUTC("2024-09-01")},
|
||||
},
|
||||
expectCleanup: []timeRange{
|
||||
{start: dateInUTC("2024-07-01"), end: dateInUTC("2024-08-01")},
|
||||
{start: dateInUTC("2024-08-01"), end: dateInUTC("2024-09-01")},
|
||||
},
|
||||
expectDetach: []string{
|
||||
"public.calls_p_2024_07",
|
||||
"public.calls_p_2024_08",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "monthly retain all",
|
||||
now: timeInUTC("2024-11-28 11:37:04"),
|
||||
cfg: config.Partition{
|
||||
Enabled: true,
|
||||
Schema: "public",
|
||||
Interval: "monthly",
|
||||
Retain: -1,
|
||||
Drop: true,
|
||||
PreProvision: common.PtrTo(2),
|
||||
},
|
||||
extant: []database.PartitionResult{
|
||||
partResult("calls_p_2024_10", "2024-10-01", "2024-11-01"),
|
||||
partResult("calls_p_2024_09", "2024-09-01", "2024-10-01"),
|
||||
partResult("calls_p_2024_08", "2024-08-01", "2024-09-01"),
|
||||
partResult("calls_p_2024_07", "2024-07-01", "2024-08-01"),
|
||||
},
|
||||
expectCreate: []partSpec{
|
||||
{name: "calls_p_2024_11", timeRange: timeRange{start: dateInUTC("2024-11-01"), end: dateInUTC("2024-12-01")}},
|
||||
{name: "calls_p_2024_12", timeRange: timeRange{start: dateInUTC("2024-12-01"), end: dateInUTC("2025-01-01")}},
|
||||
{name: "calls_p_2025_01", timeRange: timeRange{start: dateInUTC("2025-01-01"), end: dateInUTC("2025-02-01")}},
|
||||
},
|
||||
expectDrop: []string{},
|
||||
expectSweep: []timeRange{},
|
||||
expectCleanup: []timeRange{},
|
||||
expectDetach: []string{},
|
||||
},
|
||||
|
||||
{
|
||||
name: "weekly base",
|
||||
now: timeInUTC("2024-11-28 11:37:04"), // week 48
|
||||
cfg: config.Partition{
|
||||
Enabled: true,
|
||||
Schema: "public",
|
||||
Interval: "weekly",
|
||||
Retain: 2,
|
||||
Drop: false,
|
||||
PreProvision: common.PtrTo(2),
|
||||
},
|
||||
extant: []database.PartitionResult{
|
||||
partResult("calls_p_2024_w44", "2024-10-28", "2024-11-04"),
|
||||
partResult("calls_p_2024_w45", "2024-11-04", "2024-11-11"),
|
||||
partResult("calls_p_2024_w46", "2024-11-11", "2024-11-18"),
|
||||
// missing week 47
|
||||
},
|
||||
expectCreate: []partSpec{
|
||||
{name: "calls_p_2024_w47", timeRange: timeRange{start: dateInUTC("2024-11-18"), end: dateInUTC("2024-11-25")}},
|
||||
{name: "calls_p_2024_w48", timeRange: timeRange{start: dateInUTC("2024-11-25"), end: dateInUTC("2024-12-02")}},
|
||||
{name: "calls_p_2024_w49", timeRange: timeRange{start: dateInUTC("2024-12-02"), end: dateInUTC("2024-12-09")}},
|
||||
{name: "calls_p_2024_w50", timeRange: timeRange{start: dateInUTC("2024-12-09"), end: dateInUTC("2024-12-16")}},
|
||||
},
|
||||
expectSweep: []timeRange{
|
||||
{start: dateInUTC("2024-10-28"), end: dateInUTC("2024-11-04")},
|
||||
{start: dateInUTC("2024-11-04"), end: dateInUTC("2024-11-11")},
|
||||
},
|
||||
expectCleanup: []timeRange{
|
||||
{start: dateInUTC("2024-10-28"), end: dateInUTC("2024-11-04")},
|
||||
{start: dateInUTC("2024-11-04"), end: dateInUTC("2024-11-11")},
|
||||
},
|
||||
expectDetach: []string{
|
||||
"public.calls_p_2024_w44",
|
||||
"public.calls_p_2024_w45",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "daily base",
|
||||
now: timeInUTC("2024-12-31 11:37:04"),
|
||||
cfg: config.Partition{
|
||||
Enabled: true,
|
||||
Schema: "public",
|
||||
Interval: "daily",
|
||||
Retain: 2,
|
||||
Drop: true,
|
||||
PreProvision: common.PtrTo(2),
|
||||
},
|
||||
extant: []database.PartitionResult{
|
||||
partResult("calls_p_2024_12_26", "2024-12-26", "2024-12-27"),
|
||||
partResult("calls_p_2024_12_27", "2024-12-27", "2024-12-28"),
|
||||
partResult("calls_p_2024_12_30", "2024-12-30", "2024-12-31"),
|
||||
partResult("calls_p_2024_12_31", "2024-12-31", "2025-01-01"),
|
||||
},
|
||||
expectCreate: []partSpec{
|
||||
{name: "calls_p_2024_12_29", timeRange: dailyTR("2024-12-29")},
|
||||
{name: "calls_p_2025_01_01", timeRange: dailyTR("2025-01-01")},
|
||||
{name: "calls_p_2025_01_02", timeRange: dailyTR("2025-01-02")},
|
||||
},
|
||||
expectDrop: []string{
|
||||
"public.calls_p_2024_12_26",
|
||||
"public.calls_p_2024_12_27",
|
||||
},
|
||||
expectSweep: []timeRange{
|
||||
{start: dateInUTC("2024-12-26"), end: dateInUTC("2024-12-27")},
|
||||
{start: dateInUTC("2024-12-27"), end: dateInUTC("2024-12-28")},
|
||||
},
|
||||
expectCleanup: []timeRange{
|
||||
{start: dateInUTC("2024-12-26"), end: dateInUTC("2024-12-27")},
|
||||
{start: dateInUTC("2024-12-27"), end: dateInUTC("2024-12-28")},
|
||||
},
|
||||
expectDetach: []string{
|
||||
"public.calls_p_2024_12_26",
|
||||
"public.calls_p_2024_12_27",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "quarterly base",
|
||||
now: timeInUTC("2025-07-28 11:37:04"), // q3
|
||||
cfg: config.Partition{
|
||||
Enabled: true,
|
||||
Schema: "public",
|
||||
Interval: "quarterly",
|
||||
Retain: 2,
|
||||
Drop: true,
|
||||
PreProvision: common.PtrTo(2),
|
||||
},
|
||||
extant: []database.PartitionResult{
|
||||
partResult("calls_p_2024_q3", "2024-07-01", "2024-10-01"),
|
||||
partResult("calls_p_2024_q4", "2024-10-01", "2025-01-01"),
|
||||
partResult("calls_p_2025_q1", "2025-01-01", "2024-04-01"),
|
||||
partResult("calls_p_2025_q2", "2025-04-01", "2024-07-01"),
|
||||
},
|
||||
expectDrop: []string{
|
||||
"public.calls_p_2024_q3",
|
||||
"public.calls_p_2024_q4",
|
||||
},
|
||||
expectSweep: []timeRange{
|
||||
{start: dateInUTC("2024-07-01"), end: dateInUTC("2024-10-01")},
|
||||
{start: dateInUTC("2024-10-01"), end: dateInUTC("2025-01-01")},
|
||||
},
|
||||
expectCleanup: []timeRange{
|
||||
{start: dateInUTC("2024-07-01"), end: dateInUTC("2024-10-01")},
|
||||
{start: dateInUTC("2024-10-01"), end: dateInUTC("2025-01-01")},
|
||||
},
|
||||
expectCreate: []partSpec{
|
||||
{name: "calls_p_2025_q3", timeRange: timeRange{dateInUTC("2025-07-01"), dateInUTC("2025-10-01")}},
|
||||
{name: "calls_p_2025_q4", timeRange: timeRange{dateInUTC("2025-10-01"), dateInUTC("2026-01-01")}},
|
||||
{name: "calls_p_2026_q1", timeRange: timeRange{dateInUTC("2026-01-01"), dateInUTC("2026-04-01")}},
|
||||
},
|
||||
expectDetach: []string{
|
||||
"public.calls_p_2024_q3",
|
||||
"public.calls_p_2024_q4",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "yearly base",
|
||||
now: timeInUTC("2023-04-28 11:37:04"), // q3
|
||||
cfg: config.Partition{
|
||||
Enabled: true,
|
||||
Schema: "public",
|
||||
Interval: "yearly",
|
||||
Retain: 3,
|
||||
Drop: false,
|
||||
PreProvision: common.PtrTo(2),
|
||||
},
|
||||
extant: []database.PartitionResult{
|
||||
partResult("calls_p_2019", "2019-01-01", "2020-01-01"),
|
||||
partResult("calls_p_2020", "2020-01-01", "2021-01-01"),
|
||||
partResult("calls_p_2021", "2021-01-01", "2022-01-01"),
|
||||
partResult("calls_p_2022", "2022-01-01", "2023-01-01"),
|
||||
partResult("calls_p_2023", "2023-01-01", "2024-01-01"),
|
||||
},
|
||||
expectDrop: []string{},
|
||||
expectSweep: []timeRange{
|
||||
{start: dateInUTC("2019-01-01"), end: dateInUTC("2020-01-01")},
|
||||
},
|
||||
expectCleanup: []timeRange{
|
||||
{start: dateInUTC("2019-01-01"), end: dateInUTC("2020-01-01")},
|
||||
},
|
||||
expectCreate: []partSpec{
|
||||
{name: "calls_p_2024", timeRange: timeRange{dateInUTC("2024-01-01"), dateInUTC("2025-01-01")}},
|
||||
{name: "calls_p_2025", timeRange: timeRange{dateInUTC("2025-01-01"), dateInUTC("2026-01-01")}},
|
||||
},
|
||||
expectDetach: []string{
|
||||
"public.calls_p_2019",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "changed monthly to daily",
|
||||
now: timeInUTC("2024-11-28 11:37:04"),
|
||||
cfg: config.Partition{
|
||||
Enabled: true,
|
||||
Schema: "public",
|
||||
Interval: "daily",
|
||||
Retain: 2,
|
||||
Drop: true,
|
||||
PreProvision: common.PtrTo(2),
|
||||
},
|
||||
extant: []database.PartitionResult{
|
||||
partResult("calls_p_2024_10", "2024-10-01", "2024-11-01"),
|
||||
partResult("calls_p_2024_09", "2024-09-01", "2024-10-01"),
|
||||
partResult("calls_p_2024_08", "2024-08-01", "2024-09-01"),
|
||||
partResult("calls_p_2024_07", "2024-07-01", "2024-08-01"),
|
||||
},
|
||||
expectErr: partman.ErrDifferentInterval,
|
||||
},
|
||||
{
|
||||
name: "monthly wrong schema",
|
||||
now: timeInUTC("2024-11-28 11:37:04"),
|
||||
cfg: config.Partition{
|
||||
Enabled: true,
|
||||
Schema: "public",
|
||||
Interval: "monthly",
|
||||
Retain: 2,
|
||||
Drop: true,
|
||||
PreProvision: common.PtrTo(2),
|
||||
},
|
||||
extant: []database.PartitionResult{
|
||||
partResult("calls_p_2024_10", "2024-10-01", "2024-11-01"),
|
||||
partResultWithSchema("reid", "calls_p_2024_09", "2024-09-01", "2024-10-01"),
|
||||
partResult("calls_p_2024_08", "2024-08-01", "2024-09-01"),
|
||||
partResult("calls_p_2024_07", "2024-07-01", "2024-08-01"),
|
||||
},
|
||||
expectErr: partman.ErrWrongSchema,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
db := mocks.NewStore(t)
|
||||
createdPartitions := make([]partSpec, 0, len(tc.expectCreate))
|
||||
sweptRanges := make([]timeRange, 0, len(tc.expectSweep))
|
||||
droppedPartitions := make([]string, 0, len(tc.expectDrop))
|
||||
cleanupRanges := make([]timeRange, 0, len(tc.expectCleanup))
|
||||
detachedPartitions := make([]string, 0, len(tc.expectDetach))
|
||||
sweepMap := make(map[timeRange]struct{})
|
||||
|
||||
if len(tc.expectCreate) > 0 {
|
||||
db.EXPECT().
|
||||
CreatePartition(
|
||||
mctx, mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("time.Time"),
|
||||
mock.AnythingOfType("time.Time"),
|
||||
).
|
||||
Run(func(ctx context.Context, tableName, partitionName string, start, end time.Time) {
|
||||
ps := partSpec{name: partitionName, timeRange: timeRange{start: start, end: end}}
|
||||
createdPartitions = append(createdPartitions, ps)
|
||||
}).Return(nil)
|
||||
}
|
||||
|
||||
if len(tc.expectSweep) > 0 {
|
||||
db.EXPECT().
|
||||
SweepCalls(
|
||||
mctx, mock.AnythingOfType("pgtype.Timestamptz"), mock.AnythingOfType("pgtype.Timestamptz"),
|
||||
).
|
||||
Run(func(ctx context.Context, start, end pgtype.Timestamptz) {
|
||||
tr := timeRange{start: start.Time, end: end.Time}
|
||||
sweepMap[tr] = struct{}{}
|
||||
sweptRanges = append(sweptRanges, tr)
|
||||
}).Return(30, nil)
|
||||
}
|
||||
|
||||
if len(tc.expectCleanup) > 0 {
|
||||
db.EXPECT().
|
||||
CleanupSweptCalls(
|
||||
mctx, mock.AnythingOfType("pgtype.Timestamptz"), mock.AnythingOfType("pgtype.Timestamptz"),
|
||||
).Run(func(ctx context.Context, start, end pgtype.Timestamptz) {
|
||||
tr := timeRange{start: start.Time, end: end.Time}
|
||||
require.Contains(t, sweepMap, tr)
|
||||
|
||||
cleanupRanges = append(cleanupRanges, tr)
|
||||
}).Return(30, nil)
|
||||
}
|
||||
|
||||
if tc.cfg.Drop && len(tc.expectDrop) > 0 {
|
||||
db.EXPECT().
|
||||
DropPartition(mctx, mock.AnythingOfType("string")).
|
||||
Run(func(ctx context.Context, partName string) {
|
||||
droppedPartitions = append(droppedPartitions, partName)
|
||||
}).Return(nil)
|
||||
}
|
||||
|
||||
if len(tc.expectDetach) > 0 {
|
||||
db.EXPECT().
|
||||
DetachPartition(
|
||||
mctx, mock.AnythingOfType("string"), mock.AnythingOfType("string")).
|
||||
Run(func(ctx context.Context, parentTable, partName string) {
|
||||
detachedPartitions = append(detachedPartitions, partName)
|
||||
}).Return(nil)
|
||||
}
|
||||
|
||||
inTx(db)
|
||||
|
||||
db.EXPECT().GetTablePartitions(mctx, "public", "calls").Return(tc.extant, nil)
|
||||
|
||||
pm, err := partman.New(db, tc.cfg)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = pm.Check(ctx, tc.now)
|
||||
if tc.expectErr != nil {
|
||||
assert.ErrorIs(t, err, tc.expectErr)
|
||||
} else {
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.ElementsMatch(t, tc.expectCreate, createdPartitions, "created partitions")
|
||||
assert.ElementsMatch(t, tc.expectSweep, sweptRanges, "swept ranges")
|
||||
assert.ElementsMatch(t, tc.expectDrop, droppedPartitions, "dropped partitions")
|
||||
assert.ElementsMatch(t, tc.expectCleanup, cleanupRanges, "cleaned up ranges")
|
||||
assert.ElementsMatch(t, tc.expectDetach, detachedPartitions, "detached partitions")
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
|
@ -15,6 +15,7 @@ type Querier interface {
|
|||
AddAlert(ctx context.Context, arg AddAlertParams) error
|
||||
AddCall(ctx context.Context, arg AddCallParams) error
|
||||
AddLearnedTalkgroup(ctx context.Context, arg AddLearnedTalkgroupParams) (Talkgroup, error)
|
||||
CleanupSweptCalls(ctx context.Context, rangeStart pgtype.Timestamptz, rangeEnd pgtype.Timestamptz) (int64, error)
|
||||
CreateAPIKey(ctx context.Context, owner int, expires pgtype.Timestamp, disabled *bool) (ApiKey, error)
|
||||
CreateUser(ctx context.Context, arg CreateUserParams) (User, error)
|
||||
DeleteAPIKey(ctx context.Context, apiKey string) error
|
||||
|
@ -42,6 +43,7 @@ type Querier interface {
|
|||
SetCallTranscript(ctx context.Context, iD uuid.UUID, transcript *string) error
|
||||
SetTalkgroupTags(ctx context.Context, tags []string, systemID int32, tGID int32) error
|
||||
StoreTGVersion(ctx context.Context, arg []StoreTGVersionParams) *StoreTGVersionBatchResults
|
||||
SweepCalls(ctx context.Context, rangeStart pgtype.Timestamptz, rangeEnd pgtype.Timestamptz) (int64, error)
|
||||
UpdatePassword(ctx context.Context, username string, password string) error
|
||||
UpdateTalkgroup(ctx context.Context, arg UpdateTalkgroupParams) (Talkgroup, error)
|
||||
UpsertTalkgroup(ctx context.Context, arg []UpsertTalkgroupParams) *UpsertTalkgroupBatchResults
|
||||
|
|
|
@ -205,7 +205,7 @@ func (n *notifier) Send(ctx context.Context, alerts []alert.Alert) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func New(cfg config.Notify) (Notifier, error) {
|
||||
func New(cfg config.Notify) (*notifier, error) {
|
||||
n := new(notifier)
|
||||
|
||||
for _, s := range cfg {
|
||||
|
|
|
@ -20,7 +20,7 @@ type API interface {
|
|||
type api struct {
|
||||
}
|
||||
|
||||
func New() API {
|
||||
func New() *api {
|
||||
s := new(api)
|
||||
|
||||
return s
|
||||
|
|
|
@ -10,6 +10,7 @@ import (
|
|||
"dynatron.me/x/stillbox/pkg/auth"
|
||||
"dynatron.me/x/stillbox/pkg/config"
|
||||
"dynatron.me/x/stillbox/pkg/database"
|
||||
"dynatron.me/x/stillbox/pkg/database/partman"
|
||||
"dynatron.me/x/stillbox/pkg/nexus"
|
||||
"dynatron.me/x/stillbox/pkg/notify"
|
||||
"dynatron.me/x/stillbox/pkg/rest"
|
||||
|
@ -40,6 +41,7 @@ type Server struct {
|
|||
hup chan os.Signal
|
||||
tgs tgstore.Store
|
||||
rest rest.API
|
||||
partman partman.PartitionManager
|
||||
}
|
||||
|
||||
func New(ctx context.Context, cfg *config.Configuration) (*Server, error) {
|
||||
|
@ -79,6 +81,18 @@ func New(ctx context.Context, cfg *config.Configuration) (*Server, error) {
|
|||
rest: api,
|
||||
}
|
||||
|
||||
if cfg.DB.Partition.Enabled {
|
||||
srv.partman, err = partman.New(db, cfg.DB.Partition)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = srv.partman.Check(ctx, time.Now())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
srv.sinks.Register("database", sinks.NewDatabaseSink(srv.db, tgCache), true)
|
||||
srv.sinks.Register("nexus", sinks.NewNexusSink(srv.nex), false)
|
||||
|
||||
|
@ -128,6 +142,10 @@ func (s *Server) Go(ctx context.Context) error {
|
|||
go s.nex.Go(ctx)
|
||||
go s.alerter.Go(ctx)
|
||||
|
||||
if pm := s.partman; pm != nil {
|
||||
go pm.Go(ctx)
|
||||
}
|
||||
|
||||
var err error
|
||||
go func() {
|
||||
err = httpSrv.ListenAndServe()
|
||||
|
|
|
@ -36,7 +36,7 @@ type sinks struct {
|
|||
sinks map[string]sinkInstance
|
||||
}
|
||||
|
||||
func NewSinkManager() Sinks {
|
||||
func NewSinkManager() *sinks {
|
||||
return &sinks{
|
||||
sinks: make(map[string]sinkInstance),
|
||||
}
|
||||
|
|
|
@ -150,7 +150,7 @@ type cache struct {
|
|||
}
|
||||
|
||||
// NewCache returns a new cache Store.
|
||||
func NewCache() Store {
|
||||
func NewCache() *cache {
|
||||
tgc := &cache{
|
||||
tgs: make(tgMap),
|
||||
systems: make(map[int32]string),
|
||||
|
|
|
@ -78,7 +78,33 @@ CREATE TABLE IF NOT EXISTS alerts(
|
|||
metadata JSONB
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS calls(
|
||||
CREATE TABLE calls (
|
||||
id UUID,
|
||||
submitter INTEGER REFERENCES api_keys(id) ON DELETE SET NULL,
|
||||
system INTEGER NOT NULL,
|
||||
talkgroup INTEGER NOT NULL,
|
||||
call_date TIMESTAMPTZ NOT NULL,
|
||||
audio_name TEXT,
|
||||
audio_blob BYTEA,
|
||||
duration INTEGER,
|
||||
audio_type TEXT,
|
||||
audio_url TEXT,
|
||||
frequency INTEGER NOT NULL,
|
||||
frequencies INTEGER[],
|
||||
patches INTEGER[],
|
||||
tg_label TEXT,
|
||||
tg_alpha_tag TEXT,
|
||||
tg_group TEXT,
|
||||
source INTEGER NOT NULL,
|
||||
transcript TEXT,
|
||||
PRIMARY KEY (id, call_date),
|
||||
FOREIGN KEY (system, talkgroup) REFERENCES talkgroups(system_id, tgid)
|
||||
) PARTITION BY RANGE (call_date);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS calls_transcript_idx ON calls USING GIN (to_tsvector('english', transcript));
|
||||
CREATE INDEX IF NOT EXISTS calls_call_date_tg_idx ON calls(system, talkgroup, call_date);
|
||||
|
||||
CREATE TABLE swept_calls (
|
||||
id UUID PRIMARY KEY,
|
||||
submitter INTEGER REFERENCES api_keys(id) ON DELETE SET NULL,
|
||||
system INTEGER NOT NULL,
|
||||
|
@ -100,8 +126,9 @@ CREATE TABLE IF NOT EXISTS calls(
|
|||
FOREIGN KEY (system, talkgroup) REFERENCES talkgroups(system_id, tgid)
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS calls_transcript_idx ON calls USING GIN (to_tsvector('english', transcript));
|
||||
CREATE INDEX IF NOT EXISTS calls_call_date_tg_idx ON calls(system, talkgroup, call_date);
|
||||
CREATE INDEX IF NOT EXISTS swept_calls_transcript_idx ON swept_calls USING GIN (to_tsvector('english', transcript));
|
||||
CREATE INDEX IF NOT EXISTS swept_calls_call_date_tg_idx ON swept_calls(system, talkgroup, call_date);
|
||||
|
||||
|
||||
CREATE TABLE IF NOT EXISTS settings(
|
||||
name TEXT PRIMARY KEY,
|
||||
|
@ -125,8 +152,14 @@ CREATE INDEX IF NOT EXISTS incidents_name_description_idx ON incidents USING GIN
|
|||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS incidents_calls(
|
||||
incident_id UUID REFERENCES incidents(id) ON UPDATE CASCADE ON DELETE CASCADE,
|
||||
call_id UUID REFERENCES calls(id) ON UPDATE CASCADE,
|
||||
incident_id UUID NOT NULL REFERENCES incidents(id) ON UPDATE CASCADE ON DELETE CASCADE,
|
||||
call_id UUID NOT NULL,
|
||||
calls_tbl_id UUID NULL,
|
||||
swept_call_id UUID NULL REFERENCES swept_calls(id),
|
||||
call_date TIMESTAMPTZ NULL,
|
||||
notes JSONB,
|
||||
FOREIGN KEY (calls_tbl_id, call_date) REFERENCES calls(id, call_date),
|
||||
PRIMARY KEY (incident_id, call_id)
|
||||
);
|
||||
|
||||
|
||||
|
|
|
@ -56,3 +56,23 @@ VALUES
|
|||
|
||||
-- name: GetDatabaseSize :one
|
||||
SELECT pg_size_pretty(pg_database_size(current_database()));
|
||||
|
||||
-- name: SweepCalls :execrows
|
||||
WITH to_sweep AS (
|
||||
SELECT id, submitter, system, talkgroup, calls.call_date, audio_name, audio_blob, duration, audio_type,
|
||||
audio_url, frequency, frequencies, patches, tg_label, tg_alpha_tag, tg_group, source, transcript
|
||||
FROM calls
|
||||
JOIN incidents_calls ic ON ic.call_id = calls.id
|
||||
WHERE calls.call_date >= @range_start AND calls.call_date < @range_end
|
||||
) INSERT INTO swept_calls SELECT * FROM to_sweep;
|
||||
|
||||
-- name: CleanupSweptCalls :execrows
|
||||
WITH to_sweep AS (
|
||||
SELECT id FROM calls
|
||||
JOIN incidents_calls ic ON ic.call_id = calls.id
|
||||
WHERE calls.call_date >= @range_start AND calls.call_date < @range_end
|
||||
) UPDATE incidents_calls
|
||||
SET
|
||||
swept_call_id = call_id,
|
||||
calls_tbl_id = NULL
|
||||
WHERE call_id IN (SELECT id FROM to_sweep);
|
||||
|
|
|
@ -37,3 +37,6 @@ sql:
|
|||
import: "dynatron.me/x/stillbox/internal/jsontypes"
|
||||
type: "Metadata"
|
||||
nullable: true
|
||||
- column: "pg_catalog.pg_tables.tablename"
|
||||
go_type: string
|
||||
nullable: false
|
||||
|
|
Loading…
Reference in a new issue