stillbox/pkg/database/partman/partman.go

250 lines
4.6 KiB
Go
Raw Normal View History

2024-11-28 10:11:46 -05:00
package partman
import (
"fmt"
"context"
"strconv"
"strings"
"time"
"dynatron.me/x/stillbox/pkg/config"
"dynatron.me/x/stillbox/pkg/database"
"github.com/jackc/pgx/v5"
"github.com/rs/zerolog/log"
)
// PartitionError is an error carrying the name of a partition that could
// not be understood.
type PartitionError string

// Error implements the error interface, quoting the offending partition name.
func (pe PartitionError) Error() string {
	name := string(pe)
	return fmt.Sprintf("bad partition '%s'", name)
}
// ErrInvalidInterval is an error carrying an interval value that is not one
// of the recognised Interval constants.
type ErrInvalidInterval string

// Error implements the error interface, quoting the rejected interval.
func (e ErrInvalidInterval) Error() string {
	intv := string(e)
	return fmt.Sprintf("invalid interval '%s'", intv)
}
// Interval names a partitioning period.
type Interval string

// Recognised partitioning intervals.
const (
	Daily     Interval = "daily"
	Weekly    Interval = "weekly"
	Monthly   Interval = "monthly"
	Quarterly Interval = "quarterly"
	Yearly    Interval = "yearly"
)

// IsValid reports whether p is one of the Interval constants declared above.
func (p Interval) IsValid() bool {
	for _, known := range [...]Interval{Daily, Weekly, Monthly, Quarterly, Yearly} {
		if p == known {
			return true
		}
	}
	return false
}
// PartitionManager is the interface for partition maintenance; in this file
// it is implemented by *partman.
type PartitionManager interface {
// Go starts partition maintenance in the background and returns without
// reporting an error; ctx is handed to the background work.
Go(ctx context.Context)
// Check performs a single maintenance pass and returns any error it hit.
Check(ctx context.Context) error
}
// partman is the PartitionManager implementation in this file.
type partman struct {
// db is the database handle Check runs its work through (via InTx).
db *database.Postgres
// cfg is the partition configuration passed to NewPartman.
cfg config.Partition
// intv is cfg.Interval converted to Interval; NewPartman rejects values
// for which IsValid is false.
// NOTE(review): nothing visible in this file reads intv after validation;
// newPartition always builds a monthly partition.
intv Interval
}
// partition describes one partition of the calls table. Values are used as
// map keys (see partMap), so implementations must be comparable.
type partition interface {
// PartitionName returns the table name used when creating the partition.
PartitionName() string
// Next returns the partition that immediately follows this one.
Next() partition
// Range returns the two bounds that createPartition places in
// "FOR VALUES FROM (...) TO (...)", in that order.
Range() (string, string)
}
// monthlyPartition identifies a partition covering one calendar month.
type monthlyPartition struct {
	year  int
	month time.Month
}

// PartitionName returns the table name in the form calls_p_<year>_<month>,
// with the month zero-padded to two digits.
func (d monthlyPartition) PartitionName() string {
	return fmt.Sprintf("calls_p_%d_%02d", d.year, d.month)
}

// Next returns the following month as a partition.
func (d monthlyPartition) Next() partition {
	return d.next()
}

// next returns the month after d, rolling December over into January of the
// following year.
func (d monthlyPartition) next() monthlyPartition {
	if d.month != time.December {
		return monthlyPartition{year: d.year, month: d.month + 1}
	}
	return monthlyPartition{year: d.year + 1, month: time.January}
}

// Range returns the first day of d's month and the first day of the month
// after it, both formatted as <year>-<month>-01.
func (d monthlyPartition) Range() (string, string) {
	following := d.next()
	from := fmt.Sprintf("%d-%02d-01", d.year, d.month)
	to := fmt.Sprintf("%d-%02d-01", following.year, following.month)
	return from, to
}
// NewPartman builds a partition manager from db and cfg.
//
// cfg.Interval is converted to an Interval; if that value is not valid,
// NewPartman returns a nil manager and an ErrInvalidInterval carrying it.
func NewPartman(db *database.Postgres, cfg config.Partition) (*partman, error) {
	intv := Interval(cfg.Interval)
	if !intv.IsValid() {
		return nil, ErrInvalidInterval(intv)
	}

	return &partman{db: db, cfg: cfg, intv: intv}, nil
}
// Compile-time assertion that *partman satisfies PartitionManager.
var _ PartitionManager = (*partman)(nil)
// Go starts a background goroutine that calls Check every 60 minutes until
// ctx is cancelled. A failed check is logged and does not stop the loop.
//
// Go returns immediately. It does not run a check right away; the first one
// happens when the ticker first fires.
func (pm *partman) Go(ctx context.Context) {
	go func(ctx context.Context) {
		tick := time.NewTicker(60 * time.Minute)
		// Release the ticker's resources once the goroutine exits.
		defer tick.Stop()

		// Loop so that the check repeats on every tick; a bare select would
		// handle a single tick and then let the goroutine end.
		for {
			select {
			case <-tick.C:
				if err := pm.Check(ctx); err != nil {
					log.Error().Err(err).Msg("partman check failed")
				}
			case <-ctx.Done():
				return
			}
		}
	}(ctx)
}
// newPartition returns the partition containing t. It always produces a
// monthlyPartition built from t's year and month.
func (pm *partman) newPartition(t time.Time) partition {
	var p monthlyPartition
	p.year = t.Year()
	p.month = t.Month()
	return p
}
// Check makes sure the partitions for the current period and the one after
// it exist, creating whichever is missing. All of the work is executed
// through pm.db.InTx, and the first error encountered is returned.
func (pm *partman) Check(ctx context.Context) error {
	ensure := func(tx database.Store) error {
		// Only the current and the following partition are required; the
		// original author notes this is enough because the check also runs
		// at startup.
		searchPath, err := tx.GetSearchPath(ctx)
		if err != nil {
			return err
		}

		existing, err := tx.GetCallsPartitions(ctx, &searchPath)
		if err != nil {
			return err
		}

		// The lookup set is built in Go; it could equally be done in SQL.
		known, err := partitionsMap(existing)
		if err != nil {
			return err
		}

		current := pm.newPartition(time.Now())
		required := []partition{current, current.Next()}

		for _, want := range required {
			if known.exists(want) {
				continue
			}

			if err := pm.createPartition(ctx, tx, want); err != nil {
				return err
			}

			log.Info().Str("part", want.PartitionName()).Msg("created partition")
		}

		return nil
	}

	return pm.db.InTx(ctx, ensure, pgx.TxOptions{})
}
// parsePartName parses a partition table name of the exact form
// "calls_p_<year>_<month>" into a monthlyPartition.
//
// It returns a PartitionError carrying the offending name (or "<nil>" for a
// nil pointer) when:
//   - p is nil,
//   - the name does not start with "calls_p_",
//   - the remainder is not exactly two "_"-separated integers, or
//   - the month is outside 1..12.
//
// On error the returned partition is always the zero value.
func parsePartName(p *string) (monthlyPartition, error) {
	const prefix = "calls_p_"

	if p == nil {
		return monthlyPartition{}, PartitionError("<nil>")
	}
	name := *p

	// Require the prefix at the very start. Merely splitting on it would
	// accept names such as "old_calls_p_2024_11" and make a different table
	// look like the partition we are checking for.
	if !strings.HasPrefix(name, prefix) {
		return monthlyPartition{}, PartitionError(name)
	}

	fields := strings.Split(name[len(prefix):], "_")
	if len(fields) != 2 {
		return monthlyPartition{}, PartitionError(name)
	}

	year, err := strconv.Atoi(fields[0])
	if err != nil {
		return monthlyPartition{}, PartitionError(name)
	}

	month, err := strconv.Atoi(fields[1])
	if err != nil || month < int(time.January) || month > int(time.December) {
		return monthlyPartition{}, PartitionError(name)
	}

	return monthlyPartition{year: year, month: time.Month(month)}, nil
}
// partMap is a set of partitions, keyed by the partition value itself.
type partMap map[partition]struct{}

// exists reports whether dt is a member of the set.
func (pm partMap) exists(dt partition) bool {
	if _, found := pm[dt]; found {
		return true
	}
	return false
}
// partitionsMap parses every name in partitions with parsePartName and
// returns the results as a set.
//
// It returns a nil map and an error as soon as one entry cannot be used:
// PartitionError("<nil>") for a nil entry, or whatever parsePartName
// returned for a name it rejected. An empty or nil input yields an empty set.
func partitionsMap(partitions []*string) (partMap, error) {
	parts := make(partMap, len(partitions))

	for _, p := range partitions {
		if p == nil {
			// Report through the error return the caller already handles
			// rather than panicking.
			return nil, PartitionError("<nil>")
		}

		dt, err := parsePartName(p)
		if err != nil {
			return nil, err
		}

		parts[dt] = struct{}{}
	}

	return parts, nil
}
// createPartition issues the CREATE TABLE ... PARTITION OF calls statement
// for d on db, using d.PartitionName() as the table name and d.Range() as
// the FROM/TO bounds. Any execution error is returned wrapped with the
// partition name.
func (pm *partman) createPartition(ctx context.Context, db database.Store, d partition) error {
	name := d.PartitionName()
	from, to := d.Range()

	// PostgreSQL does not accept bind parameters ($1, $2) in utility
	// statements such as CREATE TABLE, so the bounds are written into the
	// statement as quoted literals.
	// NOTE(review): name and bounds are interpolated unescaped; the only
	// partition implementation in this file builds them from integers.
	stmt := fmt.Sprintf(
		`CREATE TABLE %s PARTITION OF calls FOR VALUES FROM ('%s') TO ('%s');`,
		name, from, to,
	)

	if _, err := db.DBTX().Exec(ctx, stmt); err != nil {
		return fmt.Errorf("creating partition %s: %w", name, err)
	}

	return nil
}
// monthPart builds the monthlyPartition for month m of year y.
func monthPart(m time.Month, y int) monthlyPartition {
	var p monthlyPartition
	p.month = m
	p.year = y
	return p
}