stillbox/pkg/database/partman/partman.go

460 lines
10 KiB
Go

package partman
// portions lifted gratefully from github.com/qonto/postgresql-partition-manager, MIT license.
import (
"context"
"errors"
"fmt"
"strconv"
"strings"
"time"
"dynatron.me/x/stillbox/internal/isoweek"
"dynatron.me/x/stillbox/pkg/config"
"dynatron.me/x/stillbox/pkg/database"
"dynatron.me/x/stillbox/pkg/rbac/entities"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
"github.com/rs/zerolog/log"
)
const (
// CallsTable is the name of the parent table whose partitions this package
// creates, detaches and drops.
CallsTable = "calls"
// CheckInterval is the period of the ticker created by (*partman).Go.
CheckInterval = time.Hour // every 1h
// preProvisionDefault is the number of future partitions kept provisioned
// when the configuration leaves PreProvision unset.
preProvisionDefault = 1
)
var (
// ErrWrongSchema is wrapped in a PartitionErr by ExistingPartitions when a
// partition's schema differs from the configured schema.
ErrWrongSchema = errors.New("wrong schema name")
// ErrDifferentInterval is wrapped in a PartitionErr by ExistingPartitions
// when the interval implied by a partition's name differs from the
// configured interval.
ErrDifferentInterval = errors.New("stored partition interval differs from configured")
)
// PartitionErr reports a problem with a single named partition, optionally
// carrying the underlying cause.
type PartitionErr struct {
	p   string // partition name the error refers to
	err error  // underlying cause; may be nil
}

// Error renders the partition name and, when a cause is present, appends the
// cause's message after a colon.
func (pe PartitionErr) Error() string {
	msg := fmt.Sprintf("bad partition '%s'", pe.p)
	if pe.err == nil {
		return msg
	}

	return msg + ": " + pe.err.Error()
}

// Unwrap exposes the underlying cause (nil when there is none) so that
// errors.Is and errors.As can see through a PartitionErr.
func (pe PartitionErr) Unwrap() error { return pe.err }
// ParsedIntvlErr records a mismatch between the start time derived from a
// partition's name and the start time the partition actually has.
type ParsedIntvlErr struct {
	parsed, start time.Time
}

// Error reports both timestamps so the disagreement can be inspected.
func (e ParsedIntvlErr) Error() string {
	fromName, fromBounds := e.parsed, e.start

	return fmt.Sprintf("parsed interval (%s) does not match start (%s)", fromName, fromBounds)
}
// PartitionError builds a PartitionErr for the partition named pname. If one
// or more causes are supplied, only the first is attached; the rest are
// ignored.
func PartitionError(pname string, err ...error) PartitionErr {
	pe := PartitionErr{p: pname}
	if len(err) == 0 {
		return pe
	}

	pe.err = err[0]
	return pe
}
// ErrInvalidInterval is an error whose value is the rejected interval string.
type ErrInvalidInterval string

// Error quotes the offending interval in the message.
func (in ErrInvalidInterval) Error() string {
	bad := string(in)

	return fmt.Sprintf("invalid interval '%s'", bad)
}
// Interval names the span of time covered by one partition.
type Interval string

// Recognised intervals. Unknown is the zero value and is not valid.
const (
	Unknown   Interval = ""
	Daily     Interval = "daily"
	Weekly    Interval = "weekly"
	Monthly   Interval = "monthly"
	Quarterly Interval = "quarterly"
	Yearly    Interval = "yearly"
)

// IsValid reports whether p is one of Daily, Weekly, Monthly, Quarterly or
// Yearly.
func (p Interval) IsValid() bool {
	return p == Daily ||
		p == Weekly ||
		p == Monthly ||
		p == Quarterly ||
		p == Yearly
}
// PartitionManager is the behaviour exposed by the partition manager;
// *partman in this file implements it.
type PartitionManager interface {
// Go is the long-running entry point. The implementation in this file waits
// on a CheckInterval ticker and on ctx, and returns when ctx is done.
Go(ctx context.Context)
// Check compares the partitions that exist with those expected for now and
// creates (and, when retention is enabled, prunes) partitions accordingly.
Check(ctx context.Context, now time.Time) error
// Interval returns the configured partition interval.
Interval() Interval
// ExistingPartitions converts raw partition rows into Partition values,
// failing on the first row that does not pass validation.
ExistingPartitions(parts []database.PartitionResult) ([]Partition, error)
}
// partman is the PartitionManager implementation returned by New.
type partman struct {
// db is the store on which Check opens its transaction.
db database.Store
// cfg holds the partition settings read by this file: Schema, Interval,
// Retain, PreProvision and Drop.
cfg config.Partition
// intv is cfg.Interval converted to Interval; New rejects invalid values.
intv Interval
}
// Interval returns the partition interval this manager was constructed with.
func (pm *partman) Interval() Interval {
return pm.intv
}
// Partition describes one partition of a parent table.
type Partition struct {
// ParentTable is the partitioned table this partition belongs to.
ParentTable string
// Schema is the schema holding the partition.
Schema string
// Name is the partition's table name, as returned by PartitionName.
Name string
// Interval selects which bounds Range computes from Time.
Interval Interval
// Time is the timestamp handed to the bounds helpers in Range. For
// partitions read back from the database it is the parsed lower bound.
Time time.Time
}
// New builds a partition manager backed by db and configured by cfg. It
// returns an ErrInvalidInterval (and a nil manager) when cfg.Interval is not
// one of the recognised intervals.
func New(db database.Store, cfg config.Partition) (*partman, error) {
	intv := Interval(cfg.Interval)
	if !intv.IsValid() {
		return nil, ErrInvalidInterval(intv)
	}

	return &partman{db: db, cfg: cfg, intv: intv}, nil
}

// Compile-time check that *partman satisfies PartitionManager.
var _ PartitionManager = (*partman)(nil)
// Go runs the periodic partition check until ctx is cancelled.
//
// The context is first tagged with the "partman" system-service subject. A
// ticker then fires every CheckInterval and each tick invokes Check with the
// tick's timestamp; the first check therefore happens one CheckInterval after
// Go is called, not immediately. A failed check is logged and does not stop
// the loop. Go blocks, so callers normally run it in its own goroutine.
func (pm *partman) Go(ctx context.Context) {
	ctx = entities.CtxWithSubject(ctx, &entities.SystemServiceSubject{Name: "partman"})

	tick := time.NewTicker(CheckInterval)
	// Release the ticker's resources once the loop exits.
	defer tick.Stop()

	// Without this loop the select would be evaluated once, so at most a
	// single check would ever run before Go returned.
	for {
		select {
		case now := <-tick.C:
			if err := pm.Check(ctx, now); err != nil {
				log.Error().Err(err).Msg("partman check failed")
			}
		case <-ctx.Done():
			return
		}
	}
}
// newPartition describes the calls-table partition that contains t, using the
// configured schema and interval, and fills in its name via setName.
func (pm *partman) newPartition(t time.Time) Partition {
	part := Partition{
		Time:        t,
		Interval:    Interval(pm.cfg.Interval),
		Schema:      pm.cfg.Schema,
		ParentTable: CallsTable,
	}
	part.setName()

	return part
}
// retentionPartitions returns the cfg.Retain partitions that precede cur,
// nearest first (cur.Prev(1), cur.Prev(2), ...). cur itself is not included.
func (pm *partman) retentionPartitions(cur Partition) []Partition {
	keep := pm.cfg.Retain
	older := make([]Partition, 0, keep)

	for back := 1; back <= keep; back++ {
		older = append(older, cur.Prev(back))
	}

	return older
}
// futurePartitions returns the partitions that follow cur and should be
// provisioned ahead of time, nearest first (cur.Next(1), cur.Next(2), ...).
//
// The count is cfg.PreProvision when set, otherwise preProvisionDefault. A
// negative configured value is treated as zero: passing it to make as a
// capacity would panic at run time.
func (pm *partman) futurePartitions(cur Partition) []Partition {
	preProv := preProvisionDefault
	if pm.cfg.PreProvision != nil {
		preProv = *pm.cfg.PreProvision
	}
	if preProv < 0 {
		preProv = 0
	}

	partitions := make([]Partition, 0, preProv)
	for i := 1; i <= preProv; i++ {
		partitions = append(partitions, cur.Next(i))
	}

	return partitions
}
// expectedPartitions lists the partitions that should exist at time now: the
// current one, then (only when cfg.Retain is not negative) the retained past
// partitions, then the pre-provisioned future ones.
func (pm *partman) expectedPartitions(now time.Time) []Partition {
	cur := pm.newPartition(now)
	want := []Partition{cur}

	if pm.cfg.Retain >= 0 {
		want = append(want, pm.retentionPartitions(cur)...)
	}

	return append(want, pm.futurePartitions(cur)...)
}
// comparePartitions diffs two partition lists by partition name.
//
// missingTables holds every expected partition whose name is absent from
// existingTables; unexpectedTables holds every existing partition whose name
// is absent from expectedTables. Each result keeps its input's order.
func (pm *partman) comparePartitions(existingTables, expectedTables []Partition) (unexpectedTables, missingTables []Partition) {
	// Names present in the database.
	have := make(map[string]struct{}, len(existingTables))
	for _, e := range existingTables {
		have[e.PartitionName()] = struct{}{}
	}

	// Names that are both expected and present.
	matched := make(map[string]struct{}, len(expectedTables))
	for _, w := range expectedTables {
		name := w.PartitionName()
		if _, ok := have[name]; !ok {
			missingTables = append(missingTables, w)
			continue
		}
		matched[name] = struct{}{}
	}

	// Anything present but never matched is unexpected.
	for _, e := range existingTables {
		if _, ok := matched[e.PartitionName()]; ok {
			continue
		}
		unexpectedTables = append(unexpectedTables, e)
	}

	return unexpectedTables, missingTables
}
// ExistingPartitions validates raw partition rows and converts them to
// Partition values, preserving order.
//
// It stops at the first failing row and returns a nil slice together with:
//   - a PartitionErr wrapping ErrWrongSchema when the row's schema is not the
//     configured one;
//   - whatever verifyPartName returns when the name or bounds do not check out;
//   - a PartitionErr wrapping ErrDifferentInterval when the interval implied by
//     the name differs from the configured interval.
func (pm *partman) ExistingPartitions(parts []database.PartitionResult) ([]Partition, error) {
	wantIntv := Interval(pm.cfg.Interval)
	out := make([]Partition, 0, len(parts))

	for _, pr := range parts {
		qualified := pr.Schema + "." + pr.Name

		if pr.Schema != pm.cfg.Schema {
			return nil, PartitionError(qualified, ErrWrongSchema)
		}

		part, err := pm.verifyPartName(pr)
		if err != nil {
			return nil, err
		}

		if part.Interval != wantIntv {
			return nil, PartitionError(qualified, ErrDifferentInterval)
		}

		out = append(out, part)
	}

	return out, nil
}
// fullTableName qualifies table name s with the configured schema, producing
// "<schema>.<s>".
func (pm *partman) fullTableName(s string) string {
	return pm.cfg.Schema + "." + s
}
// prunePartition retires partition p using store tx.
//
// In order, over p's time range: SweepCalls, CleanupSweptCalls, then the
// partition is detached from the calls table and, if cfg.Drop is set, dropped.
// The first error aborts the sequence and is returned unchanged.
func (pm *partman) prunePartition(ctx context.Context, tx database.Store, p Partition) error {
	from, to := p.Range()
	lower := pgtype.Timestamptz{Time: from, Valid: true}
	upper := pgtype.Timestamptz{Time: to, Valid: true}

	// Per the original author's note: calls referenced by an incident are
	// swept into swept_calls before the partition goes away.
	rows, err := tx.SweepCalls(ctx, lower, upper)
	if err != nil {
		return err
	}
	log.Info().Int64("rows", rows).Time("start", from).Time("end", to).Msg("swept calls")

	rows, err = tx.CleanupSweptCalls(ctx, lower, upper)
	if err != nil {
		return err
	}
	log.Debug().Int64("rows", rows).Time("start", from).Time("end", to).Msg("cleaned up swept calls")

	target := pm.fullTableName(p.PartitionName())

	log.Info().Str("partition", target).Msg("detaching partition")
	if err := tx.DetachPartition(ctx, CallsTable, target); err != nil {
		return err
	}

	if !pm.cfg.Drop {
		return nil
	}

	log.Info().Str("partition", target).Msg("dropping partition")
	return tx.DropPartition(ctx, target)
}
// Check reconciles the calls table's partitions with what should exist at
// time now. All work happens inside one pm.db.InTx call.
//
// It loads and validates the existing partitions, computes the expected set
// (current, retained and pre-provisioned), prunes existing partitions that are
// not expected (only when cfg.Retain is not negative), and finally creates the
// expected partitions that are missing. The first error is returned as-is.
func (pm *partman) Check(ctx context.Context, now time.Time) error {
	reconcile := func(db database.Store) error {
		rows, err := db.GetTablePartitions(ctx, pm.cfg.Schema, CallsTable)
		if err != nil {
			return err
		}

		existing, err := pm.ExistingPartitions(rows)
		if err != nil {
			return err
		}

		expected := pm.expectedPartitions(now)
		unexpected, missing := pm.comparePartitions(existing, expected)

		// A negative Retain disables pruning altogether.
		if pm.cfg.Retain >= 0 {
			for _, stale := range unexpected {
				if err := pm.prunePartition(ctx, db, stale); err != nil {
					return err
				}
			}
		}

		for _, absent := range missing {
			if err := pm.createPartition(ctx, db, absent); err != nil {
				return err
			}
		}

		return nil
	}

	return pm.db.InTx(ctx, reconcile, pgx.TxOptions{})
}
// Range returns the two bounds of the partition as computed from p.Time by the
// bounds helper matching p.Interval. It panics when p.Interval is not one of
// the five recognised intervals.
func (p Partition) Range() (time.Time, time.Time) {
	at := p.Time

	switch p.Interval {
	case Daily:
		return getDailyBounds(at)
	case Weekly:
		return getWeeklyBounds(at)
	case Monthly:
		return getMonthlyBounds(at)
	case Quarterly:
		return getQuarterlyBounds(at)
	case Yearly:
		return getYearlyBounds(at)
	default:
		panic("unknown interval!")
	}
}
// PartitionName returns the partition's table name, i.e. the Name field.
func (p Partition) PartitionName() string {
return p.Name
}
// createPartition logs and then creates partition part of the calls table via
// store tx, passing the two bounds returned by part.Range. The store's error
// is returned unchanged.
func (pm *partman) createPartition(ctx context.Context, tx database.Store, part Partition) error {
	from, to := part.Range()
	partName := part.PartitionName()

	log.Info().
		Str("partition", partName).
		Time("start", from).
		Time("end", to).
		Msg("creating partition")

	return tx.CreatePartition(ctx, CallsTable, partName, from, to)
}
/*
 * Partition scheme names:
 * daily: calls_p_2024_11_28
 * weekly: calls_p_2024_w48
 * monthly: calls_p_2024_11
 * quarterly: calls_p_2024_q4
 * yearly: calls_p_2024
 */

// verifyPartStart returns nil when the start time derived from partition name
// pn denotes the same instant as the partition's stored lower bound, and a
// PartitionErr wrapping a ParsedIntvlErr otherwise.
//
// Time.Equal is used rather than == so that two values for the same instant
// are not rejected merely because they carry different Location values.
func verifyPartStart(pn string, parsed, start time.Time) error {
	if parsed.Equal(start) {
		return nil
	}

	return PartitionError(pn, ParsedIntvlErr{parsed: parsed, start: start})
}

// verifyPartName checks that a partition row's name follows one of the naming
// schemes above and that the period encoded in the name starts at the row's
// lower bound.
//
// The interval is inferred from the shape of the name after "calls_p_": three
// "_"-separated fields mean daily, two mean weekly ("wNN"), quarterly ("qN") or
// monthly, and one means yearly. On success the returned Partition carries the
// row's parent table, name and schema, the inferred Interval, and the lower
// bound as Time.
//
// Errors: the error from pr.ParseBounds is returned unchanged (with a zero
// Partition); every other failure is a PartitionErr naming the partition,
// wrapping the strconv error, an "invalid quarter" error or a ParsedIntvlErr
// where one applies.
func (pm *partman) verifyPartName(pr database.PartitionResult) (p Partition, err error) {
	pn := pr.Name

	low, _, err := pr.ParseBounds()
	if err != nil {
		return p, err
	}

	p = Partition{
		ParentTable: pr.ParentTable,
		Name:        pr.Name,
		Schema:      pr.Schema,
		Time:        low,
	}

	// Exactly one occurrence of the prefix is required; whatever precedes it
	// is ignored.
	dateAr := strings.Split(pn, "calls_p_")
	if len(dateAr) != 2 {
		return p, PartitionError(pn)
	}

	dateAr = strings.Split(dateAr[1], "_")
	switch len(dateAr) {
	case 3: // daily: YYYY_MM_DD
		p.Interval = Daily

		var ymd [3]int
		for i, field := range dateAr {
			n, err := strconv.Atoi(field)
			if err != nil {
				return p, PartitionError(pn, err)
			}
			ymd[i] = n
		}

		parsed := time.Date(ymd[0], time.Month(ymd[1]), ymd[2], 0, 0, 0, 0, time.UTC)
		return p, verifyPartStart(pn, parsed, p.Time)

	case 2: // weekly (YYYY_wNN), quarterly (YYYY_qN) or monthly (YYYY_MM)
		year, err := strconv.Atoi(dateAr[0])
		if err != nil {
			return p, PartitionError(pn, err)
		}

		var parsed time.Time
		switch suffix := dateAr[1]; {
		case strings.HasPrefix(suffix, "w"):
			p.Interval = Weekly
			weekNum, err := strconv.Atoi(suffix[1:])
			if err != nil {
				return p, PartitionError(pn, err)
			}
			parsed = isoweek.StartTime(year, weekNum, time.UTC)

		case strings.HasPrefix(suffix, "q"):
			p.Interval = Quarterly
			quarterNum, err := strconv.Atoi(suffix[1:])
			if err != nil {
				return p, PartitionError(pn, err)
			}
			// Reject out-of-range quarters on both sides: time.Date would
			// otherwise normalise e.g. q0 into the previous year.
			if quarterNum < 1 || quarterNum > 4 {
				return p, PartitionError(pn, errors.New("invalid quarter"))
			}
			firstMonthOfTheQuarter := time.Month((quarterNum-1)*monthsInQuarter + 1)
			parsed = time.Date(year, firstMonthOfTheQuarter, 1, 0, 0, 0, 0, time.UTC)

		default: // monthly
			p.Interval = Monthly
			month, err := strconv.Atoi(suffix)
			if err != nil {
				// Keep the cause, as every other branch does.
				return p, PartitionError(pn, err)
			}
			parsed = time.Date(year, time.Month(month), 1, 0, 0, 0, 0, time.UTC)
		}

		return p, verifyPartStart(pn, parsed, p.Time)

	case 1: // yearly: YYYY
		p.Interval = Yearly
		year, err := strconv.Atoi(dateAr[0])
		if err != nil {
			return p, PartitionError(pn, err)
		}

		parsed := time.Date(year, 1, 1, 0, 0, 0, 0, time.UTC)
		return p, verifyPartStart(pn, parsed, p.Time)
	}

	return p, PartitionError(pn)
}