diff --git a/config.sample.yaml b/config.sample.yaml index 3b15d97..14a251b 100644 --- a/config.sample.yaml +++ b/config.sample.yaml @@ -29,3 +29,9 @@ alerting: lookbackDays: 7 halfLife: 30m recent: 2h + alertThreshold: 0.5 + renotify: 30m +notify: + - provider: slackwebhook + config: + webhookURL: "http://somewhere" diff --git a/go.mod b/go.mod index 345f628..36e4b52 100644 --- a/go.mod +++ b/go.mod @@ -50,6 +50,7 @@ require ( github.com/lestrrat-go/option v1.0.1 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect + github.com/nikoksr/notify v1.0.1 // indirect github.com/rogpeppe/go-internal v1.12.0 // indirect github.com/segmentio/asm v1.2.0 // indirect github.com/spf13/pflag v1.0.5 // indirect diff --git a/go.sum b/go.sum index f712c84..da2c282 100644 --- a/go.sum +++ b/go.sum @@ -105,6 +105,8 @@ github.com/moby/term v0.5.0 h1:xt8Q1nalod/v7BqbG21f8mQPqH+xAaC9C3N3wfWbVP0= github.com/moby/term v0.5.0/go.mod h1:8FzsFHVUBGZdbDsJw/ot+X+d5HLUbvklYLJ9uGfcI3Y= github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A= github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc= +github.com/nikoksr/notify v1.0.1 h1:HkUi4YHASwo3N8UEtDz9GRyEuGyX2Qwe9C6qKK24TYo= +github.com/nikoksr/notify v1.0.1/go.mod h1:w9zFImNfVM7l/gkKFAiyJITKzdC1GH458t4XqMkasZA= github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= github.com/opencontainers/image-spec v1.0.2 h1:9yCKha/T5XdGtO0q9Q9a6T5NUCsTn/DrBg0D7ufOcFM= diff --git a/internal/common/common.go b/internal/common/common.go index 345fdd9..226b022 100644 --- a/internal/common/common.go +++ b/internal/common/common.go @@ -1,6 +1,9 @@ package common import ( + "fmt" + "strconv" + "github.com/spf13/cobra" ) @@ -42,3 +45,10 @@ func PtrOrNull[T comparable](val T) *T { return &val } + +func FmtFloat(v float64, places ...int) string { + if len(places) > 0 { + return fmt.Sprintf("%."+strconv.Itoa(places[0])+"f", v) + } + return fmt.Sprintf("%.4f", v) +} diff --git a/pkg/gordio/alerting/alerting.go b/pkg/gordio/alerting/alerting.go index ad1db21..2136f90 100644 --- a/pkg/gordio/alerting/alerting.go +++ b/pkg/gordio/alerting/alerting.go @@ -1,25 +1,34 @@ package alerting import ( + "bytes" "context" + "fmt" + "net/http" "sort" + "strconv" "sync" + "text/template" "time" cl "dynatron.me/x/stillbox/pkg/calls" "dynatron.me/x/stillbox/pkg/gordio/config" "dynatron.me/x/stillbox/pkg/gordio/database" + "dynatron.me/x/stillbox/pkg/gordio/notify" "dynatron.me/x/stillbox/pkg/gordio/sinks" "dynatron.me/x/stillbox/internal/timeseries" "dynatron.me/x/stillbox/internal/trending" + "github.com/jackc/pgx/v5" "github.com/rs/zerolog/log" ) const ( ScoreThreshold = -1 CountThreshold = 1.0 + NotificationSubject = "Stillbox Alert" + DefaultRenotify = 30 * time.Minute alerterTickInterval = time.Minute ) @@ -34,12 +43,15 @@ type Alerter interface { type alerter struct { sync.RWMutex - clock timeseries.Clock - cfg config.Alerting - scorer trending.Scorer[cl.Talkgroup] - scores trending.Scores[cl.Talkgroup] - lastScore time.Time - sim *Simulation + clock timeseries.Clock + cfg config.Alerting + scorer trending.Scorer[cl.Talkgroup] + scores trending.Scores[cl.Talkgroup] + lastScore time.Time + sim *Simulation + notifyCache map[cl.Talkgroup]time.Time + renotify time.Duration + notifier notify.Notifier } type offsetClock time.Duration @@ -66,6 +78,13 @@ func WithClock(clock timeseries.Clock) AlertOption { } } +// WithNotifier sets the notifier +func WithNotifier(n notify.Notifier) AlertOption { + return func(as *alerter) { + as.notifier = n + } +} + // New creates a new Alerter using the provided configuration. func New(cfg config.Alerting, opts ...AlertOption) Alerter { if !cfg.Enable { @@ -73,8 +92,14 @@ func New(cfg config.Alerting, opts ...AlertOption) Alerter { } as := &alerter{ - cfg: cfg, - clock: timeseries.DefaultClock, + cfg: cfg, + notifyCache: make(map[cl.Talkgroup]time.Time), + clock: timeseries.DefaultClock, + renotify: DefaultRenotify, + } + + if cfg.Renotify != nil { + as.renotify = cfg.Renotify.Duration() } for _, opt := range opts { @@ -105,6 +130,11 @@ func (as *alerter) Go(ctx context.Context) { select { case now := <-ticker.C: as.score(ctx, now) + err := as.notify(ctx) + if err != nil { + log.Error().Err(err).Msg("notify") + } + as.cleanCache() case <-ctx.Done(): ticker.Stop() return @@ -113,6 +143,145 @@ func (as *alerter) Go(ctx context.Context) { } +const notificationTemplStr = `{{ range . -}} +{{ .TGName }} is active with a score of {{ f .Score.Score 4 }}! ({{ f .Score.RecentCount 0 }} recent calls out of {{ .Score.Count }}) +{{ end }}` + +var notificationTemplate = template.Must(template.New("notification").Funcs(funcMap).Parse(notificationTemplStr)) + +func (as *alerter) testNotifyHandler(w http.ResponseWriter, r *http.Request) { + as.RLock() + defer as.RUnlock() + ns := make([]notification, 0, len(as.scores)) + ctx := r.Context() + + for _, s := range as.scores { + n, err := makeNotification(ctx, s) + if err != nil { + log.Error().Err(err).Msg("test notificaiton") + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + ns = append(ns, n) + } + + err := as.sendNotification(ctx, ns) + if err != nil { + log.Error().Err(err).Msg("test notification send") + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + w.Write([]byte("Sent")) +} + +// notify iterates the scores and sends out any necessary notifications +func (as *alerter) notify(ctx context.Context) error { + if as.notifier == nil { + return nil + } + + now := time.Now() + + as.Lock() + defer as.Unlock() + + var notifications []notification + for _, s := range as.scores { + if s.Score > as.cfg.AlertThreshold { + if t, inCache := as.notifyCache[s.ID]; !inCache || now.Sub(t) > as.renotify { + as.notifyCache[s.ID] = time.Now() + n, err := makeNotification(ctx, s) + if err != nil { + return err + } + + notifications = append(notifications, n) + } + } + } + + if len(notifications) > 0 { + return as.sendNotification(ctx, notifications) + } + + return nil +} + +type notification struct { + TGName string + Score trending.Score[cl.Talkgroup] +} + +// sendNotification renders and sends the notification. +func (as *alerter) sendNotification(ctx context.Context, n []notification) error { + msgBuffer := new(bytes.Buffer) + + err := notificationTemplate.Execute(msgBuffer, n) + if err != nil { + return fmt.Errorf("notification template render: %w", err) + } + + log.Debug().Str("msg", msgBuffer.String()).Msg("notifying") + + return as.notifier.Send(ctx, NotificationSubject, msgBuffer.String()) +} + +// makeNotification creates a notification for later rendering by the template. +// It takes a talkgroup Score as input. +func makeNotification(ctx context.Context, tg trending.Score[cl.Talkgroup]) (notification, error) { + d := notification{ + Score: tg, + } + + db := database.FromCtx(ctx) + tgRecord, err := db.GetTalkgroupWithLearned(ctx, int(tg.ID.System), int(tg.ID.Talkgroup)) + switch err { + case nil: + if tgRecord.SystemName == "" { + tgRecord.SystemName = strconv.Itoa(int(tg.ID.System)) + } + + if tgRecord.Name != nil { + d.TGName = fmt.Sprintf("%s %s", tgRecord.SystemName, *tgRecord.Name) + } else { + d.TGName = fmt.Sprintf("%s:%d", tgRecord.SystemName, int(tg.ID.Talkgroup)) + } + case pgx.ErrNoRows: + system, err := db.GetSystemName(ctx, int(tg.ID.System)) + switch err { + case nil: + d.TGName = fmt.Sprintf("%s:%d", system, int(tg.ID.Talkgroup)) + case pgx.ErrNoRows: + d.TGName = fmt.Sprintf("%d:%d", int(tg.ID.System), int(tg.ID.Talkgroup)) + default: + return d, fmt.Errorf("sendNotification get system: %w", err) + } + default: + return d, fmt.Errorf("sendNotification get talkgroup: %w", err) + } + + return d, nil +} + +// cleanCache clears the cache of aged-out entries +func (as *alerter) cleanCache() { + if as.notifier == nil { + return + } + + now := time.Now() + + as.Lock() + defer as.Unlock() + + for k, t := range as.notifyCache { + if now.Sub(t) > as.renotify { + delete(as.notifyCache, k) + } + } +} + func (as *alerter) newTimeSeries(id cl.Talkgroup) trending.TimeSeries { ts, _ := timeseries.NewTimeSeries(timeseries.WithGranularities( []timeseries.Granularity{ @@ -125,16 +294,17 @@ func (as *alerter) newTimeSeries(id cl.Talkgroup) trending.TimeSeries { return ts } -func (as *alerter) startBackfill(ctx context.Context) { +func (as *alerter) startBackfill(ctx context.Context) error { now := time.Now() since := now.Add(-24 * time.Hour * time.Duration(as.cfg.LookbackDays)) log.Debug().Time("since", since).Msg("starting stats backfill") count, err := as.backfill(ctx, since, now) if err != nil { - log.Error().Err(err).Msg("backfill failed") - return + return fmt.Errorf("backfill failed: %w", err) } log.Debug().Int("callsCount", count).Str("in", time.Now().Sub(now).String()).Int("tgCount", as.scorer.Score().Len()).Msg("backfill finished") + + return nil } func (as *alerter) score(ctx context.Context, now time.Time) { diff --git a/pkg/gordio/alerting/simulate.go b/pkg/gordio/alerting/simulate.go index f8bf15f..ccf7cdc 100644 --- a/pkg/gordio/alerting/simulate.go +++ b/pkg/gordio/alerting/simulate.go @@ -18,8 +18,7 @@ import ( "github.com/rs/zerolog/log" ) -// A Simulation simulates what happens to the alerter during a specified time -// period using past data from the database. +// A Simulation simulates what happens to the alerter during a specified time period using past data from the database. type Simulation struct { // normal Alerting config config.Alerting diff --git a/pkg/gordio/alerting/stats.go b/pkg/gordio/alerting/stats.go index 2c69dec..0efdcd3 100644 --- a/pkg/gordio/alerting/stats.go +++ b/pkg/gordio/alerting/stats.go @@ -3,16 +3,15 @@ package alerting import ( _ "embed" "errors" - "fmt" "html/template" "net/http" - "strconv" "time" "dynatron.me/x/stillbox/pkg/calls" "dynatron.me/x/stillbox/pkg/gordio/config" "dynatron.me/x/stillbox/pkg/gordio/database" + "dynatron.me/x/stillbox/internal/common" "dynatron.me/x/stillbox/internal/jsontime" "dynatron.me/x/stillbox/internal/trending" @@ -29,12 +28,7 @@ type stats interface { var ( funcMap = template.FuncMap{ - "f": func(v float64, places ...int) string { - if len(places) > 0 { - return fmt.Sprintf("%."+strconv.Itoa(places[0])+"f", v) - } - return fmt.Sprintf("%.4f", v) - }, + "f": common.FmtFloat, "dict": func(values ...interface{}) (map[string]interface{}, error) { if len(values)%2 != 0 { return nil, errors.New("invalid dict call") @@ -66,6 +60,7 @@ var ( func (as *alerter) PrivateRoutes(r chi.Router) { r.Get("/tgstats", as.tgStatsHandler) r.Post("/tgstats", as.simulateHandler) + r.Get("/testnotify", as.testNotifyHandler) } func (as *noopAlerter) PrivateRoutes(r chi.Router) {} diff --git a/pkg/gordio/config/config.go b/pkg/gordio/config/config.go index 3ba54d3..9abfe55 100644 --- a/pkg/gordio/config/config.go +++ b/pkg/gordio/config/config.go @@ -21,6 +21,7 @@ type Config struct { Listen string `yaml:"listen"` Public bool `yaml:"public"` RateLimit RateLimit `yaml:"rateLimit"` + Notify Notify `yaml:"notify"` configPath string } @@ -53,10 +54,27 @@ type RateLimit struct { } type Alerting struct { - Enable bool `yaml:"enable"` - LookbackDays uint `yaml:"lookbackDays"` - HalfLife jsontime.Duration `yaml:"halfLife"` - Recent jsontime.Duration `yaml:"recent"` + Enable bool `yaml:"enable"` + LookbackDays uint `yaml:"lookbackDays"` + HalfLife jsontime.Duration `yaml:"halfLife"` + Recent jsontime.Duration `yaml:"recent"` + AlertThreshold float64 `yaml:"alertThreshold"` + Renotify *jsontime.Duration `yaml:"renotify,omitempty"` +} + +type Notify []NotifyService + +type NotifyService struct { + Provider string `json:"provider"` + Config map[string]interface{} `json:"config"` +} + +func (n *NotifyService) GetS(k, defaultVal string) string { + if v, has := n.Config[k].(string); has { + return v + } + + return defaultVal } func (rl *RateLimit) Verify() bool { diff --git a/pkg/gordio/database/querier.go b/pkg/gordio/database/querier.go index 50ffb6b..9fbbd17 100644 --- a/pkg/gordio/database/querier.go +++ b/pkg/gordio/database/querier.go @@ -20,6 +20,7 @@ type Querier interface { DeleteUser(ctx context.Context, username string) error GetAPIKey(ctx context.Context, apiKey string) (ApiKey, error) GetDatabaseSize(ctx context.Context) (string, error) + GetSystemName(ctx context.Context, systemID int) (string, error) GetTalkgroup(ctx context.Context, systemID int, tgid int) (Talkgroup, error) GetTalkgroupIDsByTags(ctx context.Context, anytags []string, alltags []string, nottags []string) ([]GetTalkgroupIDsByTagsRow, error) GetTalkgroupTags(ctx context.Context, sys int, tg int) ([]string, error) diff --git a/pkg/gordio/database/talkgroups.sql.go b/pkg/gordio/database/talkgroups.sql.go index c5b3fa4..2d2c5ff 100644 --- a/pkg/gordio/database/talkgroups.sql.go +++ b/pkg/gordio/database/talkgroups.sql.go @@ -19,6 +19,17 @@ func (q *Queries) BulkSetTalkgroupTags(ctx context.Context, iD int64, tags []str return err } +const getSystemName = `-- name: GetSystemName :one +SELECT name FROM systems WHERE id = $1 +` + +func (q *Queries) GetSystemName(ctx context.Context, systemID int) (string, error) { + row := q.db.QueryRow(ctx, getSystemName, systemID) + var name string + err := row.Scan(&name) + return name, err +} + const getTalkgroup = `-- name: GetTalkgroup :one SELECT id, system_id, tgid, name, alpha_tag, tg_group, frequency, metadata, tags, notify, weight FROM talkgroups WHERE id = systg2id($1, $2) diff --git a/pkg/gordio/notify/notify.go b/pkg/gordio/notify/notify.go new file mode 100644 index 0000000..e073ac7 --- /dev/null +++ b/pkg/gordio/notify/notify.go @@ -0,0 +1,89 @@ +package notify + +import ( + "fmt" + stdhttp "net/http" + "time" + + "dynatron.me/x/stillbox/pkg/gordio/config" + + "github.com/nikoksr/notify" + "github.com/nikoksr/notify/service/http" +) + +type Notifier interface { + notify.Notifier +} + +type notifier struct { + *notify.Notify + cfg []config.NotifyService +} + +func (n *notifier) buildSlackWebhookPayload(cfg config.NotifyService) func(string, string) any { + icon := cfg.GetS("icon", "🚨") + url := cfg.GetS("messageURL", "") + + type Attachment struct { + Title string `json:"title"` + Text string `json:"text"` + Fallback string `json:"fallback"` + Footer string `json:"footer"` + TitleLink string `json:"title_link"` + Timestamp int64 `json:"ts"` + } + return func(subject, message string) any { + m := struct { + Username string `json:"username"` + Attachments []Attachment `json:"attachments"` + IconEmoji string `json:"icon_emoji"` + }{ + Username: "Stillbox", + Attachments: []Attachment{ + { + Title: subject, + Text: message, + TitleLink: url, + Timestamp: time.Now().Unix(), + }, + }, + IconEmoji: icon, + } + + return m + } +} + +func (n *notifier) addService(cfg config.NotifyService) error { + switch cfg.Provider { + case "slackwebhook": + hs := http.New() + hs.AddReceivers(&http.Webhook{ + ContentType: "application/json", + Header: make(stdhttp.Header), + Method: stdhttp.MethodPost, + URL: cfg.GetS("webhookURL", ""), + BuildPayload: n.buildSlackWebhookPayload(cfg), + }) + n.UseServices(hs) + default: + return fmt.Errorf("unknown provider '%s'", cfg.Provider) + } + + return nil +} + +func New(cfg config.Notify) (Notifier, error) { + n := ¬ifier{ + Notify: notify.NewWithServices(), + } + + for _, s := range cfg { + err := n.addService(s) + if err != nil { + return nil, err + } + } + + return n, nil +} diff --git a/pkg/gordio/server/server.go b/pkg/gordio/server/server.go index 27b725b..444af30 100644 --- a/pkg/gordio/server/server.go +++ b/pkg/gordio/server/server.go @@ -11,6 +11,7 @@ import ( "dynatron.me/x/stillbox/pkg/gordio/config" "dynatron.me/x/stillbox/pkg/gordio/database" "dynatron.me/x/stillbox/pkg/gordio/nexus" + "dynatron.me/x/stillbox/pkg/gordio/notify" "dynatron.me/x/stillbox/pkg/gordio/sinks" "dynatron.me/x/stillbox/pkg/gordio/sources" "github.com/go-chi/chi/v5" @@ -22,16 +23,17 @@ import ( const shutdownTimeout = 5 * time.Second type Server struct { - auth *auth.Auth - conf *config.Config - db *database.DB - r *chi.Mux - sources sources.Sources - sinks sinks.Sinks - nex *nexus.Nexus - logger *Logger - alerter alerting.Alerter - hup chan os.Signal + auth *auth.Auth + conf *config.Config + db *database.DB + r *chi.Mux + sources sources.Sources + sinks sinks.Sinks + nex *nexus.Nexus + logger *Logger + alerter alerting.Alerter + notifier notify.Notifier + hup chan os.Signal } func New(ctx context.Context, cfg *config.Config) (*Server, error) { @@ -48,15 +50,23 @@ func New(ctx context.Context, cfg *config.Config) (*Server, error) { ctx = database.CtxWithDB(ctx, db) r := chi.NewRouter() + authenticator := auth.NewAuthenticator(cfg.Auth) + + notifier, err := notify.New(cfg.Notify) + if err != nil { + return nil, err + } + srv := &Server{ - auth: authenticator, - conf: cfg, - db: db, - r: r, - nex: nexus.New(), - logger: logger, - alerter: alerting.New(cfg.Alerting), + auth: authenticator, + conf: cfg, + db: db, + r: r, + nex: nexus.New(), + logger: logger, + alerter: alerting.New(cfg.Alerting, alerting.WithNotifier(notifier)), + notifier: notifier, } srv.sinks.Register("database", sinks.NewDatabaseSink(srv.db), true) diff --git a/sql/postgres/queries/talkgroups.sql b/sql/postgres/queries/talkgroups.sql index fe902e9..18f6dfb 100644 --- a/sql/postgres/queries/talkgroups.sql +++ b/sql/postgres/queries/talkgroups.sql @@ -50,3 +50,6 @@ TRUE learned FROM talkgroups_learned tgl JOIN systems sys ON tgl.system_id = sys.id WHERE tgl.system_id = sqlc.arg(system_id) AND tgl.tgid = sqlc.arg(tgid) AND ignored IS NOT TRUE; + +-- name: GetSystemName :one +SELECT name FROM systems WHERE id = sqlc.arg(system_id);