package partman import ( "context" "fmt" "strconv" "strings" "time" "dynatron.me/x/stillbox/pkg/config" "dynatron.me/x/stillbox/pkg/database" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgtype" "github.com/rs/zerolog/log" ) const ( callsTable = "calls" preProvisionDefault = 1 ) /* * Partition scheme names: * daily: calls_p_2024_11_28 * weekly: calls_p_2024_w48 * monthly: calls_p_2024_11 * quarterly: calls_p_2024_q4 * yearly: calls_p_2024 */ type PartitionError string func (pe PartitionError) Error() string { return fmt.Sprintf("bad partition '%s'", string(pe)) } type ErrInvalidInterval string func (in ErrInvalidInterval) Error() string { return fmt.Sprintf("invalid interval '%s'", string(in)) } type Interval string const ( Daily Interval = "daily" Weekly Interval = "weekly" Monthly Interval = "monthly" Quarterly Interval = "quarterly" Yearly Interval = "yearly" ) func (p Interval) IsValid() bool { switch p { case Daily, Weekly, Monthly, Quarterly, Yearly: return true } return false } type PartitionManager interface { Go(ctx context.Context) Check(ctx context.Context, now time.Time) error } type partman struct { db database.Store cfg config.Partition intv Interval } type partition interface { PartitionName() string Next(i int) partition Prev(i int) partition Range() (time.Time, time.Time) } type monthlyPartition struct { t time.Time } func (d monthlyPartition) PartitionName() string { return fmt.Sprintf("calls_p_%d_%02d", d.t.Year(), d.t.Month()) } func (d monthlyPartition) Next(i int) partition { return d.next(i) } func (d monthlyPartition) Prev(i int) partition { return d.prev(i) } func (d monthlyPartition) Range() (start, end time.Time) { start = time.Date(d.t.Year(), d.t.Month(), 1, 0, 0, 0, 0, time.UTC) end = start.AddDate(0, 1, 0) return } func (d monthlyPartition) next(i int) monthlyPartition { year, month, _ := d.t.Date() return monthlyPartition{ t: time.Date(year, month+time.Month(i), 1, 0, 0, 0, 0, time.UTC), } } func (d monthlyPartition) prev(i int) monthlyPartition { year, month, _ := d.t.Date() return monthlyPartition{ t: time.Date(year, month-time.Month(i), 1, 0, 0, 0, 0, d.t.Location()), } } func New(db database.Store, cfg config.Partition) (*partman, error) { pm := &partman{ cfg: cfg, db: db, intv: Interval(cfg.Interval), } if !pm.intv.IsValid() { return nil, ErrInvalidInterval(pm.intv) } return pm, nil } var _ PartitionManager = (*partman)(nil) func (pm *partman) Go(ctx context.Context) { go func(ctx context.Context) { tick := time.NewTicker(60 * time.Minute) select { case now := <-tick.C: err := pm.Check(ctx, now) if err != nil { log.Error().Err(err).Msg("partman check failed") } case <-ctx.Done(): return } }(ctx) } func (pm *partman) newPartition(t time.Time) partition { switch pm.intv { case Monthly: return monthlyPartition{t} } return nil } func (pm *partman) retentionPartitions(cur partition) []partition { partitions := make([]partition, 0, pm.cfg.Retain) for i := 1; i <= pm.cfg.Retain; i++ { prev := cur.Prev(i) partitions = append(partitions, prev) } return partitions } func (pm *partman) futurePartitions(cur partition) []partition { preProv := preProvisionDefault if pm.cfg.PreProvision != nil { preProv = *pm.cfg.PreProvision } partitions := make([]partition, 0, pm.cfg.Retain) for i := 1; i <= preProv; i++ { next := cur.Next(i) partitions = append(partitions, next) } return partitions } func (pm *partman) expectedPartitions(now time.Time) []partition { curPart := pm.newPartition(now) retain := pm.retentionPartitions(curPart) future := pm.futurePartitions(curPart) shouldExist := append(retain, curPart) shouldExist = append(shouldExist, future...) return shouldExist } func (pm *partman) comparePartitions(existingTables, expectedTables []partition) (unexpectedTables, missingTables []partition) { existing := make(map[string]partition) expectedAndExists := make(map[string]bool) for _, t := range existingTables { existing[t.PartitionName()] = t } for _, t := range expectedTables { if _, found := existing[t.PartitionName()]; found { expectedAndExists[t.PartitionName()] = true } else { missingTables = append(missingTables, t) } } for _, t := range existingTables { if _, found := expectedAndExists[t.PartitionName()]; !found { // Only in existingTables and not in both unexpectedTables = append(unexpectedTables, t) } } return unexpectedTables, missingTables } func (pm *partman) existingPartitions(parts []string) ([]partition, error) { existing := make([]partition, 0, len(parts)) for _, v := range parts { p, err := parsePartName(v) if err != nil { return nil, err } existing = append(existing, p) } return existing, nil } func (pm *partman) fullTableName(s string) string { return fmt.Sprintf("%s.%s", pm.cfg.Schema, s) } func (pm *partman) prunePartition(ctx context.Context, tx database.Store, p partition) error { s, e := p.Range() start := pgtype.Timestamptz{Time: s, Valid: true} end := pgtype.Timestamptz{Time: e, Valid: true} err := tx.SweepCalls(ctx, start, end) if err != nil { return err } err = tx.CleanupSweptCalls(ctx, start, end) if err != nil { return err } err = tx.DetachPartition(ctx, pm.fullTableName(p.PartitionName())) if err != nil { return err } if pm.cfg.Drop { return tx.DropPartition(ctx, pm.fullTableName(p.PartitionName())) } return nil } func (pm *partman) Check(ctx context.Context, now time.Time) error { return pm.db.InTx(ctx, func(db database.Store) error { // by default, we want to make sure a partition exists for this and next month // since we run this at startup, it's safe to do only that. partitions, err := db.GetTablePartitions(ctx, pm.cfg.Schema, callsTable) if err != nil { return err } existing, err := pm.existingPartitions(partitions) if err != nil { return err } expected := pm.expectedPartitions(now) unexpected, missing := pm.comparePartitions(existing, expected) for _, p := range unexpected { err := pm.prunePartition(ctx, db, p) if err != nil { return err } } for _, p := range missing { err := pm.createPartition(ctx, db, p) if err != nil { return err } } return nil }, pgx.TxOptions{}) } func (pm *partman) createPartition(ctx context.Context, tx database.Store, part partition) error { start, end := part.Range() return tx.CreatePartition(ctx, callsTable, part.PartitionName(), start, end) } func parsePartName(p string) (partition, error) { dateAr := strings.Split(p, "calls_p_") if len(dateAr) != 2 { return nil, PartitionError(p) } dateAr = strings.Split(dateAr[1], "_") if len(dateAr) != 2 { return nil, PartitionError(p) } year, err := strconv.Atoi(dateAr[0]) if err != nil { return nil, PartitionError(p) } month, err := strconv.Atoi(dateAr[1]) if err != nil { return nil, PartitionError(p) } r := monthlyPartition{time.Date(year, time.Month(month), 1, 0, 0, 0, 0, time.UTC)} return r, nil } type partMap map[partition]struct{} func (pm partMap) exists(dt partition) bool { _, ex := pm[dt] return ex } func partitionsMap(partitions []string, mustExist map[partition]struct{}) (partMap, error) { partsDate := make(partMap, len(partitions)) for _, p := range partitions { dt, err := parsePartName(p) if err != nil { return nil, err } partsDate[dt] = struct{}{} } return partsDate, nil } func runRetention() { // make sure to check if partition was attached first // before dropping. don't want to accidentally drop pre-detached partitions. } func dropPart() { // intx // SweepCalls // DropPart }