switchpart #62

Merged
amigan merged 2 commits from switchpart into trunk 2024-12-01 17:26:11 -05:00
7 changed files with 334 additions and 213 deletions
Showing only changes of commit fa709665a0 - Show all commits

View file

@ -52,7 +52,7 @@ func main() {
}, },
Commands: []*cli.Command{ Commands: []*cli.Command{
serve.Command(cfg), serve.Command(cfg),
admin.Command(cfg), admin.AdminCommand(cfg),
}, },
} }

View file

@ -1,195 +1,22 @@
package admin package admin
import ( import (
"context"
"errors"
"fmt"
"syscall"
"dynatron.me/x/stillbox/pkg/config" "dynatron.me/x/stillbox/pkg/config"
"dynatron.me/x/stillbox/pkg/database"
"github.com/urfave/cli/v2" "github.com/urfave/cli/v2"
"golang.org/x/crypto/bcrypt"
"golang.org/x/term"
) )
const ( // AdminCommand is the admin command.
PromptPassword = "Password: " func AdminCommand(cfg *config.Configuration) *cli.Command {
PromptAgain = "Again: "
)
var (
ErrDontMatch = errors.New("passwords do not match")
ErrInvalidArguments = errors.New("invalid arguments")
)
// AddUser adds a new user to the database. It asks for the password on the terminal.
func AddUser(ctx context.Context, username, email string, isAdmin bool) error {
if username == "" || email == "" {
return ErrInvalidArguments
}
db := database.FromCtx(ctx)
pw, err := readPassword(PromptPassword)
if err != nil {
return err
}
pwAgain, err := readPassword(PromptAgain)
if err != nil {
return err
}
if pwAgain != pw {
return ErrDontMatch
}
if pw == "" {
return ErrInvalidArguments
}
hashpw, err := bcrypt.GenerateFromPassword([]byte(pw), bcrypt.DefaultCost)
if err != nil {
return err
}
_, err = db.CreateUser(context.Background(), database.CreateUserParams{
Username: username,
Password: string(hashpw),
Email: email,
IsAdmin: isAdmin,
})
return err
}
// Passwd changes a user's password. It asks for the password on the terminal.
func Passwd(ctx context.Context, username string) error {
if username == "" {
return ErrInvalidArguments
}
db := database.FromCtx(ctx)
_, err := db.GetUserByUsername(ctx, username)
if err != nil && database.IsNoRows(err) {
return fmt.Errorf("no such user %s", username)
}
if err != nil {
return err
}
pw, err := readPassword(PromptPassword)
if err != nil {
return err
}
pwAgain, err := readPassword(PromptAgain)
if err != nil {
return err
}
if pwAgain != pw {
return ErrDontMatch
}
if pw == "" {
return ErrInvalidArguments
}
hashpw, err := bcrypt.GenerateFromPassword([]byte(pw), bcrypt.DefaultCost)
if err != nil {
return err
}
return db.UpdatePassword(context.Background(), username, string(hashpw))
}
func readPassword(prompt string) (string, error) {
fmt.Print(prompt)
pw, err := term.ReadPassword(int(syscall.Stdin))
fmt.Println()
return string(pw), err
}
// Command is the users command.
func Command(cfg *config.Configuration) *cli.Command {
c := &cfg.Config
userCmd := &cli.Command{ userCmd := &cli.Command{
Name: "users", Name: "admin",
Aliases: []string{"u"}, Aliases: []string{"a"},
Usage: "administers users", Usage: "administers stillbox",
Subcommands: []*cli.Command{ Subcommands: []*cli.Command{
addUserCommand(c), UsersCommand(cfg),
passwdCommand(c), DatabaseCommand(cfg),
}, },
} }
return userCmd return userCmd
} }
func addUserCommand(cfg *config.Config) *cli.Command {
c := &cli.Command{
Name: "add",
Description: "adds a user",
UsageText: "stillbox users add [-a] [-m email] [username]",
Args: true,
Action: func(ctx *cli.Context) error {
if ctx.Args().Len() != 1 {
return errors.New(ctx.Command.Usage)
}
db, err := database.NewClient(context.Background(), cfg.DB)
if err != nil {
return err
}
username := ctx.Args().Get(0)
isAdmin := ctx.Bool("admin")
email := ctx.String("email")
return AddUser(database.CtxWithDB(context.Background(), db), username, email, isAdmin)
},
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "admin",
Aliases: []string{"a"},
Value: false,
Usage: "user is an admin",
},
&cli.StringFlag{
Name: "email",
Usage: "email address",
Aliases: []string{"m"},
},
},
}
return c
}
func passwdCommand(cfg *config.Config) *cli.Command {
c := &cli.Command{
Name: "passwd",
Usage: "changes password for a user",
UsageText: "stillbox users passwd [username]",
Args: true,
Action: func(ctx *cli.Context) error {
if ctx.Args().Len() != 1 {
return errors.New(ctx.Command.Usage)
}
db, err := database.NewClient(context.Background(), cfg.DB)
if err != nil {
return err
}
username := ctx.Args().Get(0)
return Passwd(database.CtxWithDB(context.Background(), db), username)
},
}
return c
}

99
pkg/cmd/admin/database.go Normal file
View file

@ -0,0 +1,99 @@
package admin
import (
"context"
"errors"
"fmt"
"os"
"dynatron.me/x/stillbox/pkg/config"
"dynatron.me/x/stillbox/pkg/database"
"dynatron.me/x/stillbox/pkg/database/partman"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/urfave/cli/v2"
)
// DatabaseCommand is the Database command.
func DatabaseCommand(cfg *config.Configuration) *cli.Command {
c := &cfg.Config
userCmd := &cli.Command{
Name: "database",
Aliases: []string{"db"},
Usage: "administers database",
Subcommands: []*cli.Command{
partitioningCommand(c),
},
}
return userCmd
}
func partitioningCommand(cfg *config.Config) *cli.Command {
c := &cli.Command{
Name: "partitioning",
Aliases: []string{"part"},
// someday this will say "changes" instead of "checks"
Usage: "checks partition interval",
Description: "checks partition interval matches whatever is specified in the config",
UsageText: "stillbox admin database partitioning",
Args: true,
Action: func(cctx *cli.Context) error {
ctx := context.Background()
db, err := database.NewClient(ctx, cfg.DB)
if err != nil {
return err
}
return Repartition(ctx, db, cfg.DB.Partition)
},
}
return c
}
func Repartition(ctx context.Context, db database.Store, cfg config.Partition) error {
cfgIntv := partman.Interval(cfg.Interval)
if !cfgIntv.IsValid() {
return fmt.Errorf("invalid partitioning interval '%s'", string(cfgIntv))
}
pm, err := partman.New(db, cfg)
if err != nil {
return err
}
return db.InTx(ctx, func(db database.Store) error {
parts, err := db.GetTablePartitions(ctx, cfg.Schema, partman.CallsTable)
if err != nil {
return err
}
exist, err := pm.ExistingPartitions(parts)
if err != nil {
return err
}
if len(exist) < 1 {
return errors.New("no partitions found")
}
intv := exist[0].Interval
for _, p := range exist {
if p.Interval != intv {
return errors.New("inconsistent partition intervals found")
}
}
if cfgIntv == intv {
fmt.Fprintf(os.Stderr, "config interval '%s' and all extant, attached partitions agree; doing nothing\n", string(cfgIntv))
return nil
}
panic("not implemented")
return nil
}, pgx.TxOptions{})
}

195
pkg/cmd/admin/users.go Normal file
View file

@ -0,0 +1,195 @@
package admin
import (
"context"
"errors"
"fmt"
"syscall"
"dynatron.me/x/stillbox/pkg/config"
"dynatron.me/x/stillbox/pkg/database"
"github.com/urfave/cli/v2"
"golang.org/x/crypto/bcrypt"
"golang.org/x/term"
)
const (
PromptPassword = "Password: "
PromptAgain = "Again: "
)
var (
ErrDontMatch = errors.New("passwords do not match")
ErrInvalidArguments = errors.New("invalid arguments")
)
// AddUser adds a new user to the database. It asks for the password on the terminal.
func AddUser(ctx context.Context, username, email string, isAdmin bool) error {
if username == "" || email == "" {
return ErrInvalidArguments
}
db := database.FromCtx(ctx)
pw, err := readPassword(PromptPassword)
if err != nil {
return err
}
pwAgain, err := readPassword(PromptAgain)
if err != nil {
return err
}
if pwAgain != pw {
return ErrDontMatch
}
if pw == "" {
return ErrInvalidArguments
}
hashpw, err := bcrypt.GenerateFromPassword([]byte(pw), bcrypt.DefaultCost)
if err != nil {
return err
}
_, err = db.CreateUser(context.Background(), database.CreateUserParams{
Username: username,
Password: string(hashpw),
Email: email,
IsAdmin: isAdmin,
})
return err
}
// Passwd changes a user's password. It asks for the password on the terminal.
func Passwd(ctx context.Context, username string) error {
if username == "" {
return ErrInvalidArguments
}
db := database.FromCtx(ctx)
_, err := db.GetUserByUsername(ctx, username)
if err != nil && database.IsNoRows(err) {
return fmt.Errorf("no such user %s", username)
}
if err != nil {
return err
}
pw, err := readPassword(PromptPassword)
if err != nil {
return err
}
pwAgain, err := readPassword(PromptAgain)
if err != nil {
return err
}
if pwAgain != pw {
return ErrDontMatch
}
if pw == "" {
return ErrInvalidArguments
}
hashpw, err := bcrypt.GenerateFromPassword([]byte(pw), bcrypt.DefaultCost)
if err != nil {
return err
}
return db.UpdatePassword(context.Background(), username, string(hashpw))
}
func readPassword(prompt string) (string, error) {
fmt.Print(prompt)
pw, err := term.ReadPassword(int(syscall.Stdin))
fmt.Println()
return string(pw), err
}
// UsersCommand is the users command.
func UsersCommand(cfg *config.Configuration) *cli.Command {
c := &cfg.Config
userCmd := &cli.Command{
Name: "users",
Aliases: []string{"u"},
Usage: "administers users",
Subcommands: []*cli.Command{
addUserCommand(c),
passwdCommand(c),
},
}
return userCmd
}
func addUserCommand(cfg *config.Config) *cli.Command {
c := &cli.Command{
Name: "add",
Description: "adds a user",
UsageText: "stillbox users add [-a] [-m email] [username]",
Args: true,
Action: func(ctx *cli.Context) error {
if ctx.Args().Len() != 1 {
return errors.New(ctx.Command.Usage)
}
db, err := database.NewClient(context.Background(), cfg.DB)
if err != nil {
return err
}
username := ctx.Args().Get(0)
isAdmin := ctx.Bool("admin")
email := ctx.String("email")
return AddUser(database.CtxWithDB(context.Background(), db), username, email, isAdmin)
},
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "admin",
Aliases: []string{"a"},
Value: false,
Usage: "user is an admin",
},
&cli.StringFlag{
Name: "email",
Usage: "email address",
Aliases: []string{"m"},
},
},
}
return c
}
func passwdCommand(cfg *config.Config) *cli.Command {
c := &cli.Command{
Name: "passwd",
Usage: "changes password for a user",
UsageText: "stillbox users passwd [username]",
Args: true,
Action: func(ctx *cli.Context) error {
if ctx.Args().Len() != 1 {
return errors.New(ctx.Command.Usage)
}
db, err := database.NewClient(context.Background(), cfg.DB)
if err != nil {
return err
}
username := ctx.Args().Get(0)
return Passwd(database.CtxWithDB(context.Background(), db), username)
},
}
return c
}

View file

@ -50,7 +50,7 @@ func getYearlyBounds(date time.Time) (lowerBound, upperBound time.Time) {
return return
} }
func (p partition) Next(i int) partition { func (p Partition) Next(i int) Partition {
var t time.Time var t time.Time
switch p.Interval { switch p.Interval {
case Daily: case Daily:
@ -68,7 +68,7 @@ func (p partition) Next(i int) partition {
t = time.Date(year+i, 1, 1, 0, 0, 0, 0, p.Time.Location()) t = time.Date(year+i, 1, 1, 0, 0, 0, 0, p.Time.Location())
} }
np := partition{ np := Partition{
ParentTable: p.ParentTable, ParentTable: p.ParentTable,
Name: p.Name, Name: p.Name,
Schema: p.Schema, Schema: p.Schema,
@ -81,7 +81,7 @@ func (p partition) Next(i int) partition {
return np return np
} }
func (p *partition) setName() { func (p *Partition) setName() {
t := p.Time t := p.Time
var suffix string var suffix string
@ -119,7 +119,7 @@ func (p *partition) setName() {
p.Name = fmt.Sprintf("%s_p_%s", p.ParentTable, suffix) p.Name = fmt.Sprintf("%s_p_%s", p.ParentTable, suffix)
} }
func (p partition) Prev(i int) partition { func (p Partition) Prev(i int) Partition {
var t time.Time var t time.Time
switch p.Interval { switch p.Interval {
case Daily: case Daily:
@ -138,7 +138,7 @@ func (p partition) Prev(i int) partition {
t = time.Date(year-i, 1, 1, 0, 0, 0, 0, p.Time.Location()) t = time.Date(year-i, 1, 1, 0, 0, 0, 0, p.Time.Location())
} }
pp := partition{ pp := Partition{
ParentTable: p.ParentTable, ParentTable: p.ParentTable,
Name: p.Name, Name: p.Name,
Schema: p.Schema, Schema: p.Schema,

View file

@ -20,7 +20,7 @@ import (
) )
const ( const (
callsTable = "calls" CallsTable = "calls"
preProvisionDefault = 1 preProvisionDefault = 1
) )
@ -93,6 +93,7 @@ func (p Interval) IsValid() bool {
type PartitionManager interface { type PartitionManager interface {
Go(ctx context.Context) Go(ctx context.Context)
Check(ctx context.Context, now time.Time) error Check(ctx context.Context, now time.Time) error
ExistingPartitions(parts []database.PartitionResult) ([]Partition, error)
} }
type partman struct { type partman struct {
@ -101,7 +102,7 @@ type partman struct {
intv Interval intv Interval
} }
type partition struct { type Partition struct {
ParentTable string ParentTable string
Schema string Schema string
Name string Name string
@ -139,9 +140,9 @@ func (pm *partman) Go(ctx context.Context) {
} }
} }
func (pm *partman) newPartition(t time.Time) partition { func (pm *partman) newPartition(t time.Time) Partition {
p := partition{ p := Partition{
ParentTable: callsTable, ParentTable: CallsTable,
Schema: pm.cfg.Schema, Schema: pm.cfg.Schema,
Interval: Interval(pm.cfg.Interval), Interval: Interval(pm.cfg.Interval),
Time: t, Time: t,
@ -152,8 +153,8 @@ func (pm *partman) newPartition(t time.Time) partition {
return p return p
} }
func (pm *partman) retentionPartitions(cur partition) []partition { func (pm *partman) retentionPartitions(cur Partition) []Partition {
partitions := make([]partition, 0, pm.cfg.Retain) partitions := make([]Partition, 0, pm.cfg.Retain)
for i := 1; i <= pm.cfg.Retain; i++ { for i := 1; i <= pm.cfg.Retain; i++ {
prev := cur.Prev(i) prev := cur.Prev(i)
partitions = append(partitions, prev) partitions = append(partitions, prev)
@ -162,13 +163,13 @@ func (pm *partman) retentionPartitions(cur partition) []partition {
return partitions return partitions
} }
func (pm *partman) futurePartitions(cur partition) []partition { func (pm *partman) futurePartitions(cur Partition) []Partition {
preProv := preProvisionDefault preProv := preProvisionDefault
if pm.cfg.PreProvision != nil { if pm.cfg.PreProvision != nil {
preProv = *pm.cfg.PreProvision preProv = *pm.cfg.PreProvision
} }
partitions := make([]partition, 0, preProv) partitions := make([]Partition, 0, preProv)
for i := 1; i <= preProv; i++ { for i := 1; i <= preProv; i++ {
next := cur.Next(i) next := cur.Next(i)
partitions = append(partitions, next) partitions = append(partitions, next)
@ -177,10 +178,10 @@ func (pm *partman) futurePartitions(cur partition) []partition {
return partitions return partitions
} }
func (pm *partman) expectedPartitions(now time.Time) []partition { func (pm *partman) expectedPartitions(now time.Time) []Partition {
curPart := pm.newPartition(now) curPart := pm.newPartition(now)
shouldExist := []partition{curPart} shouldExist := []Partition{curPart}
if pm.cfg.Retain > -1 { if pm.cfg.Retain > -1 {
retain := pm.retentionPartitions(curPart) retain := pm.retentionPartitions(curPart)
shouldExist = append(shouldExist, retain...) shouldExist = append(shouldExist, retain...)
@ -193,8 +194,8 @@ func (pm *partman) expectedPartitions(now time.Time) []partition {
return shouldExist return shouldExist
} }
func (pm *partman) comparePartitions(existingTables, expectedTables []partition) (unexpectedTables, missingTables []partition) { func (pm *partman) comparePartitions(existingTables, expectedTables []Partition) (unexpectedTables, missingTables []Partition) {
existing := make(map[string]partition) existing := make(map[string]Partition)
expectedAndExists := make(map[string]bool) expectedAndExists := make(map[string]bool)
for _, t := range existingTables { for _, t := range existingTables {
@ -219,8 +220,8 @@ func (pm *partman) comparePartitions(existingTables, expectedTables []partition)
return unexpectedTables, missingTables return unexpectedTables, missingTables
} }
func (pm *partman) existingPartitions(parts []database.PartitionResult) ([]partition, error) { func (pm *partman) ExistingPartitions(parts []database.PartitionResult) ([]Partition, error) {
existing := make([]partition, 0, len(parts)) existing := make([]Partition, 0, len(parts))
for _, v := range parts { for _, v := range parts {
if v.Schema != pm.cfg.Schema { if v.Schema != pm.cfg.Schema {
return nil, PartitionError(v.Schema+"."+v.Name, ErrWrongSchema) return nil, PartitionError(v.Schema+"."+v.Name, ErrWrongSchema)
@ -243,7 +244,7 @@ func (pm *partman) fullTableName(s string) string {
return fmt.Sprintf("%s.%s", pm.cfg.Schema, s) return fmt.Sprintf("%s.%s", pm.cfg.Schema, s)
} }
func (pm *partman) prunePartition(ctx context.Context, tx database.Store, p partition) error { func (pm *partman) prunePartition(ctx context.Context, tx database.Store, p Partition) error {
s, e := p.Range() s, e := p.Range()
start := pgtype.Timestamptz{Time: s, Valid: true} start := pgtype.Timestamptz{Time: s, Valid: true}
end := pgtype.Timestamptz{Time: e, Valid: true} end := pgtype.Timestamptz{Time: e, Valid: true}
@ -262,7 +263,7 @@ func (pm *partman) prunePartition(ctx context.Context, tx database.Store, p part
log.Debug().Int64("rows", swept).Time("start", s).Time("end", e).Msg("cleaned up swept calls") log.Debug().Int64("rows", swept).Time("start", s).Time("end", e).Msg("cleaned up swept calls")
log.Info().Str("partition", fullPartName).Msg("detaching partition") log.Info().Str("partition", fullPartName).Msg("detaching partition")
err = tx.DetachPartition(ctx, callsTable, fullPartName) err = tx.DetachPartition(ctx, CallsTable, fullPartName)
if err != nil { if err != nil {
return err return err
} }
@ -279,12 +280,12 @@ func (pm *partman) Check(ctx context.Context, now time.Time) error {
return pm.db.InTx(ctx, func(db database.Store) error { return pm.db.InTx(ctx, func(db database.Store) error {
// by default, we want to make sure a partition exists for this and next month // by default, we want to make sure a partition exists for this and next month
// since we run this at startup, it's safe to do only that. // since we run this at startup, it's safe to do only that.
partitions, err := db.GetTablePartitions(ctx, pm.cfg.Schema, callsTable) partitions, err := db.GetTablePartitions(ctx, pm.cfg.Schema, CallsTable)
if err != nil { if err != nil {
return err return err
} }
existing, err := pm.existingPartitions(partitions) existing, err := pm.ExistingPartitions(partitions)
if err != nil { if err != nil {
return err return err
} }
@ -314,7 +315,7 @@ func (pm *partman) Check(ctx context.Context, now time.Time) error {
}, pgx.TxOptions{}) }, pgx.TxOptions{})
} }
func (p partition) Range() (time.Time, time.Time) { func (p Partition) Range() (time.Time, time.Time) {
switch p.Interval { switch p.Interval {
case Daily: case Daily:
return getDailyBounds(p.Time) return getDailyBounds(p.Time)
@ -331,15 +332,15 @@ func (p partition) Range() (time.Time, time.Time) {
panic("unknown interval!") panic("unknown interval!")
} }
func (p partition) PartitionName() string { func (p Partition) PartitionName() string {
return p.Name return p.Name
} }
func (pm *partman) createPartition(ctx context.Context, tx database.Store, part partition) error { func (pm *partman) createPartition(ctx context.Context, tx database.Store, part Partition) error {
start, end := part.Range() start, end := part.Range()
name := part.PartitionName() name := part.PartitionName()
log.Info().Str("partition", name).Time("start", start).Time("end", end).Msg("creating partition") log.Info().Str("partition", name).Time("start", start).Time("end", end).Msg("creating partition")
return tx.CreatePartition(ctx, callsTable, name, start, end) return tx.CreatePartition(ctx, CallsTable, name, start, end)
} }
/* /*
@ -351,13 +352,13 @@ func (pm *partman) createPartition(ctx context.Context, tx database.Store, part
* yearly: calls_p_2024 * yearly: calls_p_2024
*/ */
func (pm *partman) verifyPartName(pr database.PartitionResult) (p partition, err error) { func (pm *partman) verifyPartName(pr database.PartitionResult) (p Partition, err error) {
pn := pr.Name pn := pr.Name
low, _, err := pr.ParseBounds() low, _, err := pr.ParseBounds()
if err != nil { if err != nil {
return return
} }
p = partition{ p = Partition{
ParentTable: pr.ParentTable, ParentTable: pr.ParentTable,
Name: pr.Name, Name: pr.Name,
Schema: pr.Schema, Schema: pr.Schema,

View file

@ -34,7 +34,6 @@ func (s *DatabaseSink) Call(ctx context.Context, call *calls.Call) error {
err := s.db.InTx(ctx, func(tx database.Store) error { err := s.db.InTx(ctx, func(tx database.Store) error {
err := tx.AddCall(ctx, params) err := tx.AddCall(ctx, params)
if err != nil { if err != nil {
return fmt.Errorf("add call: %w", err) return fmt.Errorf("add call: %w", err)
} }