// File: stillbox/pkg/database/partman/partman_test.go

package partman_test
import (
"context"
"testing"
"time"
"dynatron.me/x/stillbox/internal/common"
"dynatron.me/x/stillbox/pkg/config"
"dynatron.me/x/stillbox/pkg/database"
"dynatron.me/x/stillbox/pkg/database/mocks"
"dynatron.me/x/stillbox/pkg/database/partman"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)
var mctx = mock.Anything
2024-11-29 23:24:43 -05:00
func inTx(s *mocks.Store) {
2024-11-29 19:22:50 -05:00
s.EXPECT().InTx(mctx, mock.AnythingOfType("func(database.Store) error"), mock.AnythingOfType("pgx.TxOptions")).RunAndReturn(func(ctx context.Context, f func(db database.Store) error, po pgx.TxOptions) error {
return f(s)
})
}
// timeRange is a pair of instants marking the two ends of a span of time.
// It is a comparable value type, so it can be used directly as a map key and
// compared element-wise in slice assertions.
type timeRange struct {
	start, end time.Time
}
func TestPartman(t *testing.T) {
ctx := context.Background()
timeInUTC := func(s string) time.Time {
t, err := time.ParseInLocation("2006-01-02 15:04:05", s, time.UTC)
if err != nil {
panic(err)
}
return t
}
dateInUTC := func(s string) time.Time {
t, err := time.ParseInLocation("2006-01-02", s, time.UTC)
if err != nil {
panic(err)
}
return t
}
2024-11-30 12:27:59 -05:00
etbWithSchema := func(schema, name, low, up string) database.PartitionResult {
2024-11-30 11:06:40 -05:00
return database.PartitionResult{
ParentTable: "calls",
2024-11-30 12:27:59 -05:00
Schema: schema,
2024-11-30 11:06:40 -05:00
Name: name,
LowerBound: low,
UpperBound: up,
}
}
2024-11-30 12:27:59 -05:00
etb := func(name, low, up string) database.PartitionResult {
return etbWithSchema("public", name, low, up)
}
2024-11-29 19:22:50 -05:00
tests := []struct {
name string
now time.Time
cfg config.Partition
2024-11-30 11:06:40 -05:00
extant []database.PartitionResult
2024-11-29 19:22:50 -05:00
expectCreate []string
expectDrop []string
expectDetach []string
expectSweep []timeRange
expectCleanup []timeRange
2024-11-30 12:27:59 -05:00
expectErr error
2024-11-29 19:22:50 -05:00
}{
{
name: "monthly base",
now: timeInUTC("2024-11-28 11:37:04"),
cfg: config.Partition{
Enabled: true,
Schema: "public",
Interval: "monthly",
Retain: 2,
Drop: true,
PreProvision: common.PtrTo(2),
},
2024-11-30 11:06:40 -05:00
extant: []database.PartitionResult{
etb("calls_p_2024_10", "2024-10-01", "2024-11-01"),
etb("calls_p_2024_09", "2024-09-01", "2024-10-01"),
etb("calls_p_2024_08", "2024-08-01", "2024-09-01"),
etb("calls_p_2024_07", "2024-07-01", "2024-08-01"),
2024-11-29 19:22:50 -05:00
},
expectCreate: []string{
"calls_p_2024_11",
"calls_p_2024_12",
"calls_p_2025_01",
},
expectDrop: []string{
"public.calls_p_2024_07",
"public.calls_p_2024_08",
},
expectSweep: []timeRange{
timeRange{start: dateInUTC("2024-07-01"), end: dateInUTC("2024-08-01")},
timeRange{start: dateInUTC("2024-08-01"), end: dateInUTC("2024-09-01")},
},
expectCleanup: []timeRange{
timeRange{start: dateInUTC("2024-07-01"), end: dateInUTC("2024-08-01")},
timeRange{start: dateInUTC("2024-08-01"), end: dateInUTC("2024-09-01")},
},
expectDetach: []string{
"public.calls_p_2024_07",
"public.calls_p_2024_08",
},
},
2024-11-29 23:24:43 -05:00
{
name: "weekly base",
now: timeInUTC("2024-11-28 11:37:04"),
cfg: config.Partition{
Enabled: true,
Schema: "public",
Interval: "weekly",
Retain: 2,
Drop: false,
PreProvision: common.PtrTo(2),
},
2024-11-30 11:06:40 -05:00
extant: []database.PartitionResult{
etb("calls_p_2024_w44", "2024-10-28", "2024-11-04"),
etb("calls_p_2024_w45", "2024-11-04", "2024-11-11"),
etb("calls_p_2024_w46", "2024-11-11", "2024-11-18"),
2024-11-29 23:24:43 -05:00
},
expectCreate: []string{
2024-11-30 11:06:40 -05:00
"calls_p_2024_w47",
2024-11-29 23:24:43 -05:00
"calls_p_2024_w48",
"calls_p_2024_w49",
"calls_p_2024_w50",
},
expectSweep: []timeRange{
2024-11-30 11:06:40 -05:00
timeRange{start: dateInUTC("2024-10-28"), end: dateInUTC("2024-11-04")},
timeRange{start: dateInUTC("2024-11-04"), end: dateInUTC("2024-11-11")},
2024-11-29 23:24:43 -05:00
},
expectCleanup: []timeRange{
2024-11-30 11:06:40 -05:00
timeRange{start: dateInUTC("2024-10-28"), end: dateInUTC("2024-11-04")},
timeRange{start: dateInUTC("2024-11-04"), end: dateInUTC("2024-11-11")},
2024-11-29 23:24:43 -05:00
},
expectDetach: []string{
"public.calls_p_2024_w44",
"public.calls_p_2024_w45",
},
},
2024-11-30 12:27:59 -05:00
{
name: "daily base",
now: timeInUTC("2024-11-28 11:37:04"),
cfg: config.Partition{
Enabled: true,
Schema: "public",
Interval: "daily",
Retain: 2,
Drop: true,
PreProvision: common.PtrTo(2),
},
extant: []database.PartitionResult{
etb("calls_p_2024_11_24", "2024-11-24", "2024-11-25"),
etb("calls_p_2024_11_25", "2024-11-25", "2024-11-26"),
etb("calls_p_2024_11_26", "2024-11-26", "2024-11-27"),
etb("calls_p_2024_11_27", "2024-11-27", "2024-11-28"),
},
expectCreate: []string{
"calls_p_2024_11_28",
"calls_p_2024_11_29",
"calls_p_2024_11_30",
},
expectDrop: []string{
"public.calls_p_2024_11_24",
"public.calls_p_2024_11_25",
},
expectSweep: []timeRange{
timeRange{start: dateInUTC("2024-11-24"), end: dateInUTC("2024-11-25")},
timeRange{start: dateInUTC("2024-11-25"), end: dateInUTC("2024-11-26")},
},
expectCleanup: []timeRange{
timeRange{start: dateInUTC("2024-11-24"), end: dateInUTC("2024-11-25")},
timeRange{start: dateInUTC("2024-11-25"), end: dateInUTC("2024-11-26")},
},
expectDetach: []string{
"public.calls_p_2024_11_24",
"public.calls_p_2024_11_25",
},
},
{
name: "quarterly base",
now: timeInUTC("2025-07-28 11:37:04"),
cfg: config.Partition{
Enabled: true,
Schema: "public",
Interval: "quarterly",
Retain: 2,
Drop: true,
PreProvision: common.PtrTo(2),
},
extant: []database.PartitionResult{
etb("calls_p_2024_q3", "2024-07-01", "2024-10-01"),
etb("calls_p_2024_q4", "2024-10-01", "2025-01-01"),
etb("calls_p_2025_q1", "2025-01-01", "2024-04-01"),
etb("calls_p_2025_q2", "2025-04-01", "2024-07-01"),
},
expectCreate: []string{},
expectDrop: []string{
"public.calls_p_2024_11_24",
"public.calls_p_2024_11_25",
},
expectSweep: []timeRange{
timeRange{start: dateInUTC("2024-11-24"), end: dateInUTC("2024-11-25")},
timeRange{start: dateInUTC("2024-11-25"), end: dateInUTC("2024-11-26")},
},
expectCleanup: []timeRange{
timeRange{start: dateInUTC("2024-11-24"), end: dateInUTC("2024-11-25")},
timeRange{start: dateInUTC("2024-11-25"), end: dateInUTC("2024-11-26")},
},
expectDetach: []string{
"public.calls_p_2024_11_24",
"public.calls_p_2024_11_25",
},
},
{
name: "changed monthly to daily",
now: timeInUTC("2024-11-28 11:37:04"),
cfg: config.Partition{
Enabled: true,
Schema: "public",
Interval: "daily",
Retain: 2,
Drop: true,
PreProvision: common.PtrTo(2),
},
extant: []database.PartitionResult{
etb("calls_p_2024_10", "2024-10-01", "2024-11-01"),
etb("calls_p_2024_09", "2024-09-01", "2024-10-01"),
etb("calls_p_2024_08", "2024-08-01", "2024-09-01"),
etb("calls_p_2024_07", "2024-07-01", "2024-08-01"),
},
expectCreate: []string{
"calls_p_2024_11",
"calls_p_2024_12",
"calls_p_2025_01",
},
expectDrop: []string{
"public.calls_p_2024_07",
"public.calls_p_2024_08",
},
expectSweep: []timeRange{
timeRange{start: dateInUTC("2024-07-01"), end: dateInUTC("2024-08-01")},
timeRange{start: dateInUTC("2024-08-01"), end: dateInUTC("2024-09-01")},
},
expectCleanup: []timeRange{
timeRange{start: dateInUTC("2024-07-01"), end: dateInUTC("2024-08-01")},
timeRange{start: dateInUTC("2024-08-01"), end: dateInUTC("2024-09-01")},
},
expectDetach: []string{
"public.calls_p_2024_07",
"public.calls_p_2024_08",
},
expectErr: partman.ErrDifferentInterval,
},
{
name: "monthly wrong schema",
now: timeInUTC("2024-11-28 11:37:04"),
cfg: config.Partition{
Enabled: true,
Schema: "public",
Interval: "monthly",
Retain: 2,
Drop: true,
PreProvision: common.PtrTo(2),
},
extant: []database.PartitionResult{
etb("calls_p_2024_10", "2024-10-01", "2024-11-01"),
etbWithSchema("reid", "calls_p_2024_09", "2024-09-01", "2024-10-01"),
etb("calls_p_2024_08", "2024-08-01", "2024-09-01"),
etb("calls_p_2024_07", "2024-07-01", "2024-08-01"),
},
expectCreate: []string{
"calls_p_2024_11",
"calls_p_2024_12",
"calls_p_2025_01",
},
expectDrop: []string{
"public.calls_p_2024_07",
"public.calls_p_2024_08",
},
expectSweep: []timeRange{
timeRange{start: dateInUTC("2024-07-01"), end: dateInUTC("2024-08-01")},
timeRange{start: dateInUTC("2024-08-01"), end: dateInUTC("2024-09-01")},
},
expectCleanup: []timeRange{
timeRange{start: dateInUTC("2024-07-01"), end: dateInUTC("2024-08-01")},
timeRange{start: dateInUTC("2024-08-01"), end: dateInUTC("2024-09-01")},
},
expectDetach: []string{
"public.calls_p_2024_07",
"public.calls_p_2024_08",
},
expectErr: partman.ErrWrongSchema,
},
2024-11-29 19:22:50 -05:00
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
db := mocks.NewStore(t)
createdPartitions := make([]string, 0, len(tc.expectCreate))
sweptRanges := make([]timeRange, 0, len(tc.expectSweep))
droppedPartitions := make([]string, 0, len(tc.expectDrop))
cleanupRanges := make([]timeRange, 0, len(tc.expectCleanup))
detachedPartitions := make([]string, 0, len(tc.expectDetach))
2024-11-30 12:27:59 -05:00
sweepMap := make(map[timeRange]struct{})
2024-11-29 19:22:50 -05:00
2024-11-30 12:27:59 -05:00
if tc.expectErr == nil {
db.EXPECT().
CreatePartition(
mctx, mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("time.Time"),
mock.AnythingOfType("time.Time"),
).
Run(func(ctx context.Context, tableName, partitionName string, start, end time.Time) {
createdPartitions = append(createdPartitions, partitionName)
}).Return(nil)
db.EXPECT().
SweepCalls(
mctx, mock.AnythingOfType("pgtype.Timestamptz"), mock.AnythingOfType("pgtype.Timestamptz"),
).
Run(func(ctx context.Context, start, end pgtype.Timestamptz) {
tr := timeRange{start: start.Time, end: end.Time}
sweepMap[tr] = struct{}{}
sweptRanges = append(sweptRanges, tr)
}).Return(nil)
db.EXPECT().
CleanupSweptCalls(
mctx, mock.AnythingOfType("pgtype.Timestamptz"), mock.AnythingOfType("pgtype.Timestamptz"),
).Run(func(ctx context.Context, start, end pgtype.Timestamptz) {
tr := timeRange{start: start.Time, end: end.Time}
require.Contains(t, sweepMap, tr)
2024-11-30 11:06:40 -05:00
2024-11-30 12:27:59 -05:00
cleanupRanges = append(cleanupRanges, tr)
2024-11-30 11:06:40 -05:00
}).Return(nil)
2024-11-30 12:27:59 -05:00
if tc.cfg.Drop {
db.EXPECT().
DropPartition(mctx, mock.AnythingOfType("string")).
Run(func(ctx context.Context, partName string) {
droppedPartitions = append(droppedPartitions, partName)
}).Return(nil)
}
2024-11-30 11:06:40 -05:00
db.EXPECT().
2024-11-30 12:27:59 -05:00
DetachPartition(
mctx, mock.AnythingOfType("string")).
2024-11-30 11:06:40 -05:00
Run(func(ctx context.Context, partName string) {
2024-11-30 12:27:59 -05:00
detachedPartitions = append(detachedPartitions, partName)
2024-11-30 11:06:40 -05:00
}).Return(nil)
}
2024-11-29 23:24:43 -05:00
inTx(db)
2024-11-30 11:06:40 -05:00
2024-11-29 19:22:50 -05:00
db.EXPECT().GetTablePartitions(mctx, "public", "calls").Return(tc.extant, nil)
2024-11-30 11:06:40 -05:00
2024-11-29 19:22:50 -05:00
pm, err := partman.New(db, tc.cfg)
require.NoError(t, err)
err = pm.Check(ctx, tc.now)
2024-11-30 12:27:59 -05:00
if tc.expectErr != nil {
assert.ErrorIs(t, err, tc.expectErr)
} else {
require.NoError(t, err)
2024-11-29 19:22:50 -05:00
2024-11-30 12:27:59 -05:00
assert.ElementsMatch(t, tc.expectCreate, createdPartitions, "created partitions")
assert.ElementsMatch(t, tc.expectSweep, sweptRanges, "swept ranges")
assert.ElementsMatch(t, tc.expectDrop, droppedPartitions, "dropped partitions")
assert.ElementsMatch(t, tc.expectCleanup, cleanupRanges, "cleaned up ranges")
assert.ElementsMatch(t, tc.expectDetach, detachedPartitions, "detached partitions")
}
2024-11-29 19:22:50 -05:00
})
}
}