diff --git a/cmd/stillbox/main.go b/cmd/stillbox/main.go index 32eec62..a0903f4 100644 --- a/cmd/stillbox/main.go +++ b/cmd/stillbox/main.go @@ -52,7 +52,7 @@ func main() { }, Commands: []*cli.Command{ serve.Command(cfg), - admin.Command(cfg), + admin.AdminCommand(cfg), }, } diff --git a/pkg/cmd/admin/admin.go b/pkg/cmd/admin/admin.go index 8a0aae8..a463abc 100644 --- a/pkg/cmd/admin/admin.go +++ b/pkg/cmd/admin/admin.go @@ -1,195 +1,22 @@ package admin import ( - "context" - "errors" - "fmt" - "syscall" - "dynatron.me/x/stillbox/pkg/config" - "dynatron.me/x/stillbox/pkg/database" + "github.com/urfave/cli/v2" - "golang.org/x/crypto/bcrypt" - "golang.org/x/term" ) -const ( - PromptPassword = "Password: " - PromptAgain = "Again: " -) - -var ( - ErrDontMatch = errors.New("passwords do not match") - ErrInvalidArguments = errors.New("invalid arguments") -) - -// AddUser adds a new user to the database. It asks for the password on the terminal. -func AddUser(ctx context.Context, username, email string, isAdmin bool) error { - if username == "" || email == "" { - return ErrInvalidArguments - } - - db := database.FromCtx(ctx) - - pw, err := readPassword(PromptPassword) - if err != nil { - return err - } - - pwAgain, err := readPassword(PromptAgain) - if err != nil { - return err - } - - if pwAgain != pw { - return ErrDontMatch - } - - if pw == "" { - return ErrInvalidArguments - } - - hashpw, err := bcrypt.GenerateFromPassword([]byte(pw), bcrypt.DefaultCost) - if err != nil { - return err - } - - _, err = db.CreateUser(context.Background(), database.CreateUserParams{ - Username: username, - Password: string(hashpw), - Email: email, - IsAdmin: isAdmin, - }) - - return err -} - -// Passwd changes a user's password. It asks for the password on the terminal. -func Passwd(ctx context.Context, username string) error { - if username == "" { - return ErrInvalidArguments - } - - db := database.FromCtx(ctx) - - _, err := db.GetUserByUsername(ctx, username) - if err != nil && database.IsNoRows(err) { - return fmt.Errorf("no such user %s", username) - } - - if err != nil { - return err - } - - pw, err := readPassword(PromptPassword) - if err != nil { - return err - } - - pwAgain, err := readPassword(PromptAgain) - if err != nil { - return err - } - - if pwAgain != pw { - return ErrDontMatch - } - - if pw == "" { - return ErrInvalidArguments - } - - hashpw, err := bcrypt.GenerateFromPassword([]byte(pw), bcrypt.DefaultCost) - if err != nil { - return err - } - - return db.UpdatePassword(context.Background(), username, string(hashpw)) -} - -func readPassword(prompt string) (string, error) { - fmt.Print(prompt) - pw, err := term.ReadPassword(int(syscall.Stdin)) - fmt.Println() - return string(pw), err -} - -// Command is the users command. -func Command(cfg *config.Configuration) *cli.Command { - c := &cfg.Config +// AdminCommand is the admin command. +func AdminCommand(cfg *config.Configuration) *cli.Command { userCmd := &cli.Command{ - Name: "users", - Aliases: []string{"u"}, - Usage: "administers users", + Name: "admin", + Aliases: []string{"a"}, + Usage: "administers stillbox", Subcommands: []*cli.Command{ - addUserCommand(c), - passwdCommand(c), + UsersCommand(cfg), + DatabaseCommand(cfg), }, } return userCmd } - -func addUserCommand(cfg *config.Config) *cli.Command { - c := &cli.Command{ - Name: "add", - Description: "adds a user", - UsageText: "stillbox users add [-a] [-m email] [username]", - Args: true, - Action: func(ctx *cli.Context) error { - if ctx.Args().Len() != 1 { - return errors.New(ctx.Command.Usage) - } - - db, err := database.NewClient(context.Background(), cfg.DB) - if err != nil { - return err - } - - username := ctx.Args().Get(0) - isAdmin := ctx.Bool("admin") - email := ctx.String("email") - - return AddUser(database.CtxWithDB(context.Background(), db), username, email, isAdmin) - }, - Flags: []cli.Flag{ - &cli.BoolFlag{ - Name: "admin", - Aliases: []string{"a"}, - Value: false, - Usage: "user is an admin", - }, - &cli.StringFlag{ - Name: "email", - Usage: "email address", - Aliases: []string{"m"}, - }, - }, - } - - return c -} - -func passwdCommand(cfg *config.Config) *cli.Command { - c := &cli.Command{ - Name: "passwd", - Usage: "changes password for a user", - UsageText: "stillbox users passwd [username]", - Args: true, - Action: func(ctx *cli.Context) error { - if ctx.Args().Len() != 1 { - return errors.New(ctx.Command.Usage) - } - - db, err := database.NewClient(context.Background(), cfg.DB) - if err != nil { - return err - } - username := ctx.Args().Get(0) - - return Passwd(database.CtxWithDB(context.Background(), db), username) - }, - } - - return c -} diff --git a/pkg/cmd/admin/database.go b/pkg/cmd/admin/database.go new file mode 100644 index 0000000..075715d --- /dev/null +++ b/pkg/cmd/admin/database.go @@ -0,0 +1,99 @@ +package admin + +import ( + "context" + "errors" + "fmt" + "os" + + "dynatron.me/x/stillbox/pkg/config" + "dynatron.me/x/stillbox/pkg/database" + "dynatron.me/x/stillbox/pkg/database/partman" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgconn" + "github.com/urfave/cli/v2" +) + +// DatabaseCommand is the Database command. +func DatabaseCommand(cfg *config.Configuration) *cli.Command { + c := &cfg.Config + userCmd := &cli.Command{ + Name: "database", + Aliases: []string{"db"}, + Usage: "administers database", + Subcommands: []*cli.Command{ + partitioningCommand(c), + }, + } + + return userCmd +} + +func partitioningCommand(cfg *config.Config) *cli.Command { + c := &cli.Command{ + Name: "partitioning", + Aliases: []string{"part"}, + // someday this will say "changes" instead of "checks" + Usage: "checks partition interval", + Description: "checks partition interval matches whatever is specified in the config", + UsageText: "stillbox admin database partitioning", + Args: true, + Action: func(cctx *cli.Context) error { + ctx := context.Background() + db, err := database.NewClient(ctx, cfg.DB) + if err != nil { + return err + } + + return Repartition(ctx, db, cfg.DB.Partition) + }, + } + + return c +} + +func Repartition(ctx context.Context, db database.Store, cfg config.Partition) error { + cfgIntv := partman.Interval(cfg.Interval) + if !cfgIntv.IsValid() { + return fmt.Errorf("invalid partitioning interval '%s'", string(cfgIntv)) + } + + pm, err := partman.New(db, cfg) + if err != nil { + return err + } + + return db.InTx(ctx, func(db database.Store) error { + parts, err := db.GetTablePartitions(ctx, cfg.Schema, partman.CallsTable) + if err != nil { + return err + } + + exist, err := pm.ExistingPartitions(parts) + if err != nil { + return err + } + + if len(exist) < 1 { + return errors.New("no partitions found") + } + + intv := exist[0].Interval + for _, p := range exist { + if p.Interval != intv { + return errors.New("inconsistent partition intervals found") + } + } + + if cfgIntv == intv { + fmt.Fprintf(os.Stderr, "config interval '%s' and all extant, attached partitions agree; doing nothing\n", string(cfgIntv)) + + return nil + } + + panic("not implemented") + + return nil + }, pgx.TxOptions{}) +} diff --git a/pkg/cmd/admin/users.go b/pkg/cmd/admin/users.go new file mode 100644 index 0000000..fb608ff --- /dev/null +++ b/pkg/cmd/admin/users.go @@ -0,0 +1,195 @@ +package admin + +import ( + "context" + "errors" + "fmt" + "syscall" + + "dynatron.me/x/stillbox/pkg/config" + "dynatron.me/x/stillbox/pkg/database" + "github.com/urfave/cli/v2" + "golang.org/x/crypto/bcrypt" + "golang.org/x/term" +) + +const ( + PromptPassword = "Password: " + PromptAgain = "Again: " +) + +var ( + ErrDontMatch = errors.New("passwords do not match") + ErrInvalidArguments = errors.New("invalid arguments") +) + +// AddUser adds a new user to the database. It asks for the password on the terminal. +func AddUser(ctx context.Context, username, email string, isAdmin bool) error { + if username == "" || email == "" { + return ErrInvalidArguments + } + + db := database.FromCtx(ctx) + + pw, err := readPassword(PromptPassword) + if err != nil { + return err + } + + pwAgain, err := readPassword(PromptAgain) + if err != nil { + return err + } + + if pwAgain != pw { + return ErrDontMatch + } + + if pw == "" { + return ErrInvalidArguments + } + + hashpw, err := bcrypt.GenerateFromPassword([]byte(pw), bcrypt.DefaultCost) + if err != nil { + return err + } + + _, err = db.CreateUser(context.Background(), database.CreateUserParams{ + Username: username, + Password: string(hashpw), + Email: email, + IsAdmin: isAdmin, + }) + + return err +} + +// Passwd changes a user's password. It asks for the password on the terminal. +func Passwd(ctx context.Context, username string) error { + if username == "" { + return ErrInvalidArguments + } + + db := database.FromCtx(ctx) + + _, err := db.GetUserByUsername(ctx, username) + if err != nil && database.IsNoRows(err) { + return fmt.Errorf("no such user %s", username) + } + + if err != nil { + return err + } + + pw, err := readPassword(PromptPassword) + if err != nil { + return err + } + + pwAgain, err := readPassword(PromptAgain) + if err != nil { + return err + } + + if pwAgain != pw { + return ErrDontMatch + } + + if pw == "" { + return ErrInvalidArguments + } + + hashpw, err := bcrypt.GenerateFromPassword([]byte(pw), bcrypt.DefaultCost) + if err != nil { + return err + } + + return db.UpdatePassword(context.Background(), username, string(hashpw)) +} + +func readPassword(prompt string) (string, error) { + fmt.Print(prompt) + pw, err := term.ReadPassword(int(syscall.Stdin)) + fmt.Println() + return string(pw), err +} + +// UsersCommand is the users command. +func UsersCommand(cfg *config.Configuration) *cli.Command { + c := &cfg.Config + userCmd := &cli.Command{ + Name: "users", + Aliases: []string{"u"}, + Usage: "administers users", + Subcommands: []*cli.Command{ + addUserCommand(c), + passwdCommand(c), + }, + } + + return userCmd +} + +func addUserCommand(cfg *config.Config) *cli.Command { + c := &cli.Command{ + Name: "add", + Description: "adds a user", + UsageText: "stillbox users add [-a] [-m email] [username]", + Args: true, + Action: func(ctx *cli.Context) error { + if ctx.Args().Len() != 1 { + return errors.New(ctx.Command.Usage) + } + + db, err := database.NewClient(context.Background(), cfg.DB) + if err != nil { + return err + } + + username := ctx.Args().Get(0) + isAdmin := ctx.Bool("admin") + email := ctx.String("email") + + return AddUser(database.CtxWithDB(context.Background(), db), username, email, isAdmin) + }, + Flags: []cli.Flag{ + &cli.BoolFlag{ + Name: "admin", + Aliases: []string{"a"}, + Value: false, + Usage: "user is an admin", + }, + &cli.StringFlag{ + Name: "email", + Usage: "email address", + Aliases: []string{"m"}, + }, + }, + } + + return c +} + +func passwdCommand(cfg *config.Config) *cli.Command { + c := &cli.Command{ + Name: "passwd", + Usage: "changes password for a user", + UsageText: "stillbox users passwd [username]", + Args: true, + Action: func(ctx *cli.Context) error { + if ctx.Args().Len() != 1 { + return errors.New(ctx.Command.Usage) + } + + db, err := database.NewClient(context.Background(), cfg.DB) + if err != nil { + return err + } + username := ctx.Args().Get(0) + + return Passwd(database.CtxWithDB(context.Background(), db), username) + }, + } + + return c +} diff --git a/pkg/database/partman/intervals.go b/pkg/database/partman/intervals.go index b80d330..0252302 100644 --- a/pkg/database/partman/intervals.go +++ b/pkg/database/partman/intervals.go @@ -50,7 +50,7 @@ func getYearlyBounds(date time.Time) (lowerBound, upperBound time.Time) { return } -func (p partition) Next(i int) partition { +func (p Partition) Next(i int) Partition { var t time.Time switch p.Interval { case Daily: @@ -68,7 +68,7 @@ func (p partition) Next(i int) partition { t = time.Date(year+i, 1, 1, 0, 0, 0, 0, p.Time.Location()) } - np := partition{ + np := Partition{ ParentTable: p.ParentTable, Name: p.Name, Schema: p.Schema, @@ -81,7 +81,7 @@ func (p partition) Next(i int) partition { return np } -func (p *partition) setName() { +func (p *Partition) setName() { t := p.Time var suffix string @@ -119,7 +119,7 @@ func (p *partition) setName() { p.Name = fmt.Sprintf("%s_p_%s", p.ParentTable, suffix) } -func (p partition) Prev(i int) partition { +func (p Partition) Prev(i int) Partition { var t time.Time switch p.Interval { case Daily: @@ -138,7 +138,7 @@ func (p partition) Prev(i int) partition { t = time.Date(year-i, 1, 1, 0, 0, 0, 0, p.Time.Location()) } - pp := partition{ + pp := Partition{ ParentTable: p.ParentTable, Name: p.Name, Schema: p.Schema, diff --git a/pkg/database/partman/partman.go b/pkg/database/partman/partman.go index bb9608a..3676cec 100644 --- a/pkg/database/partman/partman.go +++ b/pkg/database/partman/partman.go @@ -20,7 +20,7 @@ import ( ) const ( - callsTable = "calls" + CallsTable = "calls" preProvisionDefault = 1 ) @@ -93,6 +93,7 @@ func (p Interval) IsValid() bool { type PartitionManager interface { Go(ctx context.Context) Check(ctx context.Context, now time.Time) error + ExistingPartitions(parts []database.PartitionResult) ([]Partition, error) } type partman struct { @@ -101,7 +102,7 @@ type partman struct { intv Interval } -type partition struct { +type Partition struct { ParentTable string Schema string Name string @@ -139,9 +140,9 @@ func (pm *partman) Go(ctx context.Context) { } } -func (pm *partman) newPartition(t time.Time) partition { - p := partition{ - ParentTable: callsTable, +func (pm *partman) newPartition(t time.Time) Partition { + p := Partition{ + ParentTable: CallsTable, Schema: pm.cfg.Schema, Interval: Interval(pm.cfg.Interval), Time: t, @@ -152,8 +153,8 @@ func (pm *partman) newPartition(t time.Time) partition { return p } -func (pm *partman) retentionPartitions(cur partition) []partition { - partitions := make([]partition, 0, pm.cfg.Retain) +func (pm *partman) retentionPartitions(cur Partition) []Partition { + partitions := make([]Partition, 0, pm.cfg.Retain) for i := 1; i <= pm.cfg.Retain; i++ { prev := cur.Prev(i) partitions = append(partitions, prev) @@ -162,13 +163,13 @@ func (pm *partman) retentionPartitions(cur partition) []partition { return partitions } -func (pm *partman) futurePartitions(cur partition) []partition { +func (pm *partman) futurePartitions(cur Partition) []Partition { preProv := preProvisionDefault if pm.cfg.PreProvision != nil { preProv = *pm.cfg.PreProvision } - partitions := make([]partition, 0, preProv) + partitions := make([]Partition, 0, preProv) for i := 1; i <= preProv; i++ { next := cur.Next(i) partitions = append(partitions, next) @@ -177,10 +178,10 @@ func (pm *partman) futurePartitions(cur partition) []partition { return partitions } -func (pm *partman) expectedPartitions(now time.Time) []partition { +func (pm *partman) expectedPartitions(now time.Time) []Partition { curPart := pm.newPartition(now) - shouldExist := []partition{curPart} + shouldExist := []Partition{curPart} if pm.cfg.Retain > -1 { retain := pm.retentionPartitions(curPart) shouldExist = append(shouldExist, retain...) @@ -193,8 +194,8 @@ func (pm *partman) expectedPartitions(now time.Time) []partition { return shouldExist } -func (pm *partman) comparePartitions(existingTables, expectedTables []partition) (unexpectedTables, missingTables []partition) { - existing := make(map[string]partition) +func (pm *partman) comparePartitions(existingTables, expectedTables []Partition) (unexpectedTables, missingTables []Partition) { + existing := make(map[string]Partition) expectedAndExists := make(map[string]bool) for _, t := range existingTables { @@ -219,8 +220,8 @@ func (pm *partman) comparePartitions(existingTables, expectedTables []partition) return unexpectedTables, missingTables } -func (pm *partman) existingPartitions(parts []database.PartitionResult) ([]partition, error) { - existing := make([]partition, 0, len(parts)) +func (pm *partman) ExistingPartitions(parts []database.PartitionResult) ([]Partition, error) { + existing := make([]Partition, 0, len(parts)) for _, v := range parts { if v.Schema != pm.cfg.Schema { return nil, PartitionError(v.Schema+"."+v.Name, ErrWrongSchema) @@ -243,7 +244,7 @@ func (pm *partman) fullTableName(s string) string { return fmt.Sprintf("%s.%s", pm.cfg.Schema, s) } -func (pm *partman) prunePartition(ctx context.Context, tx database.Store, p partition) error { +func (pm *partman) prunePartition(ctx context.Context, tx database.Store, p Partition) error { s, e := p.Range() start := pgtype.Timestamptz{Time: s, Valid: true} end := pgtype.Timestamptz{Time: e, Valid: true} @@ -262,7 +263,7 @@ func (pm *partman) prunePartition(ctx context.Context, tx database.Store, p part log.Debug().Int64("rows", swept).Time("start", s).Time("end", e).Msg("cleaned up swept calls") log.Info().Str("partition", fullPartName).Msg("detaching partition") - err = tx.DetachPartition(ctx, callsTable, fullPartName) + err = tx.DetachPartition(ctx, CallsTable, fullPartName) if err != nil { return err } @@ -279,12 +280,12 @@ func (pm *partman) Check(ctx context.Context, now time.Time) error { return pm.db.InTx(ctx, func(db database.Store) error { // by default, we want to make sure a partition exists for this and next month // since we run this at startup, it's safe to do only that. - partitions, err := db.GetTablePartitions(ctx, pm.cfg.Schema, callsTable) + partitions, err := db.GetTablePartitions(ctx, pm.cfg.Schema, CallsTable) if err != nil { return err } - existing, err := pm.existingPartitions(partitions) + existing, err := pm.ExistingPartitions(partitions) if err != nil { return err } @@ -314,7 +315,7 @@ func (pm *partman) Check(ctx context.Context, now time.Time) error { }, pgx.TxOptions{}) } -func (p partition) Range() (time.Time, time.Time) { +func (p Partition) Range() (time.Time, time.Time) { switch p.Interval { case Daily: return getDailyBounds(p.Time) @@ -331,15 +332,15 @@ func (p partition) Range() (time.Time, time.Time) { panic("unknown interval!") } -func (p partition) PartitionName() string { +func (p Partition) PartitionName() string { return p.Name } -func (pm *partman) createPartition(ctx context.Context, tx database.Store, part partition) error { +func (pm *partman) createPartition(ctx context.Context, tx database.Store, part Partition) error { start, end := part.Range() name := part.PartitionName() log.Info().Str("partition", name).Time("start", start).Time("end", end).Msg("creating partition") - return tx.CreatePartition(ctx, callsTable, name, start, end) + return tx.CreatePartition(ctx, CallsTable, name, start, end) } /* @@ -351,13 +352,13 @@ func (pm *partman) createPartition(ctx context.Context, tx database.Store, part * yearly: calls_p_2024 */ -func (pm *partman) verifyPartName(pr database.PartitionResult) (p partition, err error) { +func (pm *partman) verifyPartName(pr database.PartitionResult) (p Partition, err error) { pn := pr.Name low, _, err := pr.ParseBounds() if err != nil { return } - p = partition{ + p = Partition{ ParentTable: pr.ParentTable, Name: pr.Name, Schema: pr.Schema, diff --git a/pkg/sinks/database.go b/pkg/sinks/database.go index 7b01794..68cf70b 100644 --- a/pkg/sinks/database.go +++ b/pkg/sinks/database.go @@ -34,7 +34,6 @@ func (s *DatabaseSink) Call(ctx context.Context, call *calls.Call) error { err := s.db.InTx(ctx, func(tx database.Store) error { err := tx.AddCall(ctx, params) if err != nil { - return fmt.Errorf("add call: %w", err) }