From bdee792d75d2d4e0999f6f37f8f86be5986341fe Mon Sep 17 00:00:00 2001 From: Daniel Ponte Date: Thu, 7 Nov 2024 23:17:16 -0500 Subject: [PATCH] #28 per backend notification templates --- config.sample.yaml | 6 ++ internal/common/common.go | 10 --- internal/common/template.go | 48 ++++++++++++++ pkg/alerting/alert/alert.go | 81 ++++++++++++++++++++++++ pkg/alerting/alerting.go | 112 ++++----------------------------- pkg/alerting/stats.go | 37 ++--------- pkg/config/config.go | 6 +- pkg/notify/notify.go | 121 ++++++++++++++++++++++++++++++++---- pkg/talkgroups/talkgroup.go | 5 ++ 9 files changed, 269 insertions(+), 157 deletions(-) create mode 100644 internal/common/template.go create mode 100644 pkg/alerting/alert/alert.go diff --git a/config.sample.yaml b/config.sample.yaml index cee3a9f..5a71064 100644 --- a/config.sample.yaml +++ b/config.sample.yaml @@ -33,5 +33,11 @@ alerting: renotify: 30m notify: - provider: slackwebhook +# subjectTemplate: "Stillbox Alert ({{ highest . }})" +# bodyTemplate: | +# {{ range . -}} +# {{ .TGName }} is active with a score of {{ f .Score.Score 4 }}! ({{ f .Score.RecentCount 0 }}/{{ .Score.Count }} recent calls) +# +# {{ end -}} config: webhookURL: "http://somewhere" diff --git a/internal/common/common.go b/internal/common/common.go index ca3b18c..01b6971 100644 --- a/internal/common/common.go +++ b/internal/common/common.go @@ -1,9 +1,6 @@ package common import ( - "fmt" - "strconv" - "github.com/spf13/cobra" ) @@ -47,10 +44,3 @@ func PtrOrNull[T comparable](val T) *T { return &val } - -func FmtFloat(v float64, places ...int) string { - if len(places) > 0 { - return fmt.Sprintf("%."+strconv.Itoa(places[0])+"f", v) - } - return fmt.Sprintf("%.4f", v) -} diff --git a/internal/common/template.go b/internal/common/template.go new file mode 100644 index 0000000..f4724c1 --- /dev/null +++ b/internal/common/template.go @@ -0,0 +1,48 @@ +package common + +import ( + "errors" + "fmt" + "strconv" + "text/template" + "time" + + "dynatron.me/x/stillbox/internal/jsontime" +) + +var ( + FuncMap = template.FuncMap{ + "f": fmtFloat, + "dict": func(values ...interface{}) (map[string]interface{}, error) { + if len(values)%2 != 0 { + return nil, errors.New("invalid dict call") + } + dict := make(map[string]interface{}, len(values)/2) + for i := 0; i < len(values); i += 2 { + key, ok := values[i].(string) + if !ok { + return nil, errors.New("dict keys must be strings") + } + dict[key] = values[i+1] + } + return dict, nil + }, + "formTime": func(t jsontime.Time) string { + return time.Time(t).Format("2006-01-02T15:04") + }, + "ago": func(s string) (string, error) { + d, err := time.ParseDuration(s) + if err != nil { + return "", err + } + return time.Now().Add(-d).Format("2006-01-02T15:04"), nil + }, + } +) + +func fmtFloat(v float64, places ...int) string { + if len(places) > 0 { + return fmt.Sprintf("%."+strconv.Itoa(places[0])+"f", v) + } + return fmt.Sprintf("%.4f", v) +} diff --git a/pkg/alerting/alert/alert.go b/pkg/alerting/alert/alert.go new file mode 100644 index 0000000..c96b038 --- /dev/null +++ b/pkg/alerting/alert/alert.go @@ -0,0 +1,81 @@ +package alert + +import ( + "context" + "fmt" + "strconv" + "time" + + "dynatron.me/x/stillbox/internal/trending" + "dynatron.me/x/stillbox/pkg/database" + "dynatron.me/x/stillbox/pkg/talkgroups" + + "github.com/google/uuid" + "github.com/jackc/pgx/v5/pgtype" +) + +type Alert struct { + ID uuid.UUID + Timestamp time.Time + TGName string + Score trending.Score[talkgroups.ID] + OrigScore float64 + Weight float32 + Suppressed bool +} + +func (a *Alert) ToAddAlertParams() database.AddAlertParams { + f32score := float32(a.Score.Score) + f32origscore := float32(a.OrigScore) + + var origScore *float32 + if a.Score.Score != a.OrigScore { + origScore = &f32origscore + } + + return database.AddAlertParams{ + ID: a.ID, + Time: pgtype.Timestamptz{Time: a.Timestamp, Valid: true}, + PackedTg: a.Score.ID.Pack(), + Weight: &a.Weight, + Score: &f32score, + OrigScore: origScore, + Notified: !a.Suppressed, + } +} + +// makeAlert creates a notification for later rendering by the template. +// It takes a talkgroup Score as input. +func Make(ctx context.Context, store talkgroups.Store, score trending.Score[talkgroups.ID], origScore float64) (Alert, error) { + d := Alert{ + ID: uuid.New(), + Score: score, + Timestamp: time.Now(), + Weight: 1.0, + OrigScore: origScore, + } + + tgRecord, err := store.TG(ctx, score.ID) + switch err { + case nil: + d.Weight = tgRecord.Talkgroup.Weight + if tgRecord.System.Name == "" { + tgRecord.System.Name = strconv.Itoa(int(score.ID.System)) + } + + if tgRecord.Talkgroup.Name != nil { + d.TGName = fmt.Sprintf("%s %s (%d)", tgRecord.System.Name, *tgRecord.Talkgroup.Name, score.ID.Talkgroup) + } else { + d.TGName = fmt.Sprintf("%s:%d", tgRecord.System.Name, int(score.ID.Talkgroup)) + } + default: + system, has := store.SystemName(ctx, int(score.ID.System)) + if has { + d.TGName = fmt.Sprintf("%s:%d", system, int(score.ID.Talkgroup)) + } else { + d.TGName = fmt.Sprintf("%d:%d", int(score.ID.System), int(score.ID.Talkgroup)) + } + } + + return d, nil +} diff --git a/pkg/alerting/alerting.go b/pkg/alerting/alerting.go index 3222a3e..db4c1ff 100644 --- a/pkg/alerting/alerting.go +++ b/pkg/alerting/alerting.go @@ -1,16 +1,14 @@ package alerting import ( - "bytes" "context" "fmt" "net/http" "sort" - "strconv" "sync" - "text/template" "time" + "dynatron.me/x/stillbox/pkg/alerting/alert" "dynatron.me/x/stillbox/pkg/calls" "dynatron.me/x/stillbox/pkg/config" "dynatron.me/x/stillbox/pkg/database" @@ -21,8 +19,6 @@ import ( "dynatron.me/x/stillbox/internal/timeseries" "dynatron.me/x/stillbox/internal/trending" - "github.com/google/uuid" - "github.com/jackc/pgx/v5/pgtype" "github.com/rs/zerolog/log" ) @@ -51,7 +47,7 @@ type alerter struct { scores trending.Scores[talkgroups.ID] lastScore time.Time sim *Simulation - alertCache map[talkgroups.ID]Alert + alertCache map[talkgroups.ID]alert.Alert renotify time.Duration notifier notify.Notifier tgCache talkgroups.Store @@ -96,7 +92,7 @@ func New(cfg config.Alerting, tgCache talkgroups.Store, opts ...AlertOption) Ale as := &alerter{ cfg: cfg, - alertCache: make(map[talkgroups.ID]Alert), + alertCache: make(map[talkgroups.ID]alert.Alert), clock: timeseries.DefaultClock, renotify: DefaultRenotify, tgCache: tgCache, @@ -150,22 +146,18 @@ func (as *alerter) Go(ctx context.Context) { } -const notificationTemplStr = `{{ range . -}} -{{ .TGName }} is active with a score of {{ f .Score.Score 4 }}! ({{ f .Score.RecentCount 0 }}/{{ .Score.Count }} recent calls) - -{{ end -}}` - -var notificationTemplate = template.Must(template.New("notification").Funcs(funcMap).Parse(notificationTemplStr)) - -func (as *alerter) eval(ctx context.Context, now time.Time, testMode bool) ([]Alert, error) { +func (as *alerter) eval(ctx context.Context, now time.Time, testMode bool) ([]alert.Alert, error) { err := as.tgCache.Hint(ctx, as.scoredTGs()) if err != nil { return nil, fmt.Errorf("prime TG cache: %w", err) } + as.Lock() + defer as.Unlock() + db := database.FromCtx(ctx) - var notifications []Alert + var notifications []alert.Alert for _, s := range as.scores { origScore := s.Score tgr, err := as.tgCache.TG(ctx, s.ID) @@ -176,7 +168,7 @@ func (as *alerter) eval(ctx context.Context, now time.Time, testMode bool) ([]Al if s.Score > as.cfg.AlertThreshold || testMode { if old, inCache := as.alertCache[s.ID]; !inCache || now.Sub(old.Timestamp) > as.renotify { s.Score *= as.tgCache.Weight(ctx, s.ID, now) - a, err := as.makeAlert(ctx, s, origScore) + a, err := alert.Make(ctx, as.tgCache, s, origScore) if err != nil { return nil, fmt.Errorf("makeAlert: %w", err) } @@ -206,7 +198,7 @@ func (as *alerter) eval(ctx context.Context, now time.Time, testMode bool) ([]Al } func (as *alerter) testNotifyHandler(w http.ResponseWriter, r *http.Request) { - alerts := make([]Alert, 0, len(as.scores)) + alerts := make([]alert.Alert, 0, len(as.scores)) ctx := r.Context() alerts, err := as.eval(ctx, time.Now(), true) @@ -216,7 +208,7 @@ func (as *alerter) testNotifyHandler(w http.ResponseWriter, r *http.Request) { return } - err = as.sendNotification(ctx, alerts) + err = as.notifier.Send(ctx, alerts) if err != nil { log.Error().Err(err).Msg("test notification send") http.Error(w, err.Error(), http.StatusInternalServerError) @@ -258,92 +250,12 @@ func (as *alerter) notify(ctx context.Context) error { } if len(notifications) > 0 { - return as.sendNotification(ctx, notifications) + return as.notifier.Send(ctx, notifications) } return nil } -type Alert struct { - ID uuid.UUID - Timestamp time.Time - TGName string - Score trending.Score[talkgroups.ID] - OrigScore float64 - Weight float32 - Suppressed bool -} - -func (a *Alert) ToAddAlertParams() database.AddAlertParams { - f32score := float32(a.Score.Score) - f32origscore := float32(a.OrigScore) - - var origScore *float32 - if a.Score.Score != a.OrigScore { - origScore = &f32origscore - } - - return database.AddAlertParams{ - ID: a.ID, - Time: pgtype.Timestamptz{Time: a.Timestamp, Valid: true}, - PackedTg: a.Score.ID.Pack(), - Weight: &a.Weight, - Score: &f32score, - OrigScore: origScore, - Notified: !a.Suppressed, - } -} - -// sendNotification renders and sends the notification. -func (as *alerter) sendNotification(ctx context.Context, n []Alert) error { - msgBuffer := new(bytes.Buffer) - - err := notificationTemplate.Execute(msgBuffer, n) - if err != nil { - return fmt.Errorf("notification template render: %w", err) - } - - log.Debug().Str("msg", msgBuffer.String()).Msg("notifying") - - return as.notifier.Send(ctx, NotificationSubject, msgBuffer.String()) -} - -// makeAlert creates a notification for later rendering by the template. -// It takes a talkgroup Score as input. -func (as *alerter) makeAlert(ctx context.Context, score trending.Score[talkgroups.ID], origScore float64) (Alert, error) { - d := Alert{ - ID: uuid.New(), - Score: score, - Timestamp: time.Now(), - Weight: 1.0, - OrigScore: origScore, - } - - tgRecord, err := as.tgCache.TG(ctx, score.ID) - switch err { - case nil: - d.Weight = tgRecord.Talkgroup.Weight - if tgRecord.System.Name == "" { - tgRecord.System.Name = strconv.Itoa(int(score.ID.System)) - } - - if tgRecord.Talkgroup.Name != nil { - d.TGName = fmt.Sprintf("%s %s (%d)", tgRecord.System.Name, *tgRecord.Talkgroup.Name, score.ID.Talkgroup) - } else { - d.TGName = fmt.Sprintf("%s:%d", tgRecord.System.Name, int(score.ID.Talkgroup)) - } - default: - system, has := as.tgCache.SystemName(ctx, int(score.ID.System)) - if has { - d.TGName = fmt.Sprintf("%s:%d", system, int(score.ID.Talkgroup)) - } else { - d.TGName = fmt.Sprintf("%d:%d", int(score.ID.System), int(score.ID.Talkgroup)) - } - } - - return d, nil -} - // cleanCache clears the cache of aged-out entries func (as *alerter) cleanCache() { if as.notifier == nil { diff --git a/pkg/alerting/stats.go b/pkg/alerting/stats.go index 0d5c3be..b724c01 100644 --- a/pkg/alerting/stats.go +++ b/pkg/alerting/stats.go @@ -2,7 +2,6 @@ package alerting import ( _ "embed" - "errors" "html/template" "net/http" "time" @@ -12,7 +11,6 @@ import ( "dynatron.me/x/stillbox/pkg/talkgroups" "dynatron.me/x/stillbox/internal/common" - "dynatron.me/x/stillbox/internal/jsontime" "dynatron.me/x/stillbox/internal/trending" "github.com/go-chi/chi/v5" @@ -22,41 +20,14 @@ import ( //go:embed stats.html var statsTemplateFile string +var ( + statTmpl = template.Must(template.New("stats").Funcs(common.FuncMap).Parse(statsTemplateFile)) +) + type stats interface { PrivateRoutes(chi.Router) } -var ( - funcMap = template.FuncMap{ - "f": common.FmtFloat, - "dict": func(values ...interface{}) (map[string]interface{}, error) { - if len(values)%2 != 0 { - return nil, errors.New("invalid dict call") - } - dict := make(map[string]interface{}, len(values)/2) - for i := 0; i < len(values); i += 2 { - key, ok := values[i].(string) - if !ok { - return nil, errors.New("dict keys must be strings") - } - dict[key] = values[i+1] - } - return dict, nil - }, - "formTime": func(t jsontime.Time) string { - return time.Time(t).Format("2006-01-02T15:04") - }, - "ago": func(s string) (string, error) { - d, err := time.ParseDuration(s) - if err != nil { - return "", err - } - return time.Now().Add(-d).Format("2006-01-02T15:04"), nil - }, - } - statTmpl = template.Must(template.New("stats").Funcs(funcMap).Parse(statsTemplateFile)) -) - func (as *alerter) PrivateRoutes(r chi.Router) { r.Get("/tgstats", as.tgStatsHandler) r.Post("/tgstats", as.simulateHandler) diff --git a/pkg/config/config.go b/pkg/config/config.go index 757842b..6a848f9 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -65,8 +65,10 @@ type Alerting struct { type Notify []NotifyService type NotifyService struct { - Provider string `json:"provider"` - Config map[string]interface{} `json:"config"` + Provider string `yaml:"provider" json:"provider"` + SubjectTemplate *string `yaml:"subjectTemplate" json:"subjectTemplate"` + BodyTemplate *string `yaml:"bodyTemplate" json:"bodyTemplate"` + Config map[string]interface{} `yaml:"config" json:"config"` } func (n *NotifyService) GetS(k, defaultVal string) string { diff --git a/pkg/notify/notify.go b/pkg/notify/notify.go index cf9716e..19b0de3 100644 --- a/pkg/notify/notify.go +++ b/pkg/notify/notify.go @@ -1,10 +1,15 @@ package notify import ( + "bytes" + "context" "fmt" stdhttp "net/http" + "text/template" "time" + "dynatron.me/x/stillbox/internal/common" + "dynatron.me/x/stillbox/pkg/alerting/alert" "dynatron.me/x/stillbox/pkg/config" "github.com/go-viper/mapstructure/v2" @@ -13,15 +18,70 @@ import ( ) type Notifier interface { - notify.Notifier + Send(ctx context.Context, alerts []alert.Alert) error +} + +type backend struct { + *notify.Notify + subject *template.Template + body *template.Template } type notifier struct { - *notify.Notify + backends []backend } -func (n *notifier) buildSlackWebhookPayload(cfg *slackWebhookConfig) func(string, string) any { +func highest(a []alert.Alert) string { + if len(a) < 1 { + return "none" + } + top := a[0] + for _, a := range a { + if a.Score.Score > top.Score.Score { + top = a + } + } + + return top.TGName +} + +var alertFm = template.FuncMap{ + "highest": highest, +} + +const defaultBodyTemplStr = `{{ range . -}} +{{ .TGName }} is active with a score of {{ f .Score.Score 4 }}! ({{ f .Score.RecentCount 0 }}/{{ .Score.Count }} recent calls) + +{{ end -}}` + +var defaultBodyTemplate = template.Must(template.New("body").Funcs(common.FuncMap).Funcs(alertFm).Parse(defaultBodyTemplStr)) + +var defaultSubjectTemplStr = `Stillbox Alert ({{ highest . }}` +var defaultSubjectTemplate = template.Must(template.New("subject").Funcs(common.FuncMap).Funcs(alertFm).Parse(defaultSubjectTemplStr)) + +// Send renders and sends the Alerts. +func (b *backend) Send(ctx context.Context, alerts []alert.Alert) (err error) { + var subject, body bytes.Buffer + err = b.subject.ExecuteTemplate(&subject, "subject", alerts) + if err != nil { + return err + } + + err = b.body.ExecuteTemplate(&body, "body", alerts) + if err != nil { + return err + } + + err = b.Notify.Send(ctx, subject.String(), body.String()) + if err != nil { + return err + } + + return nil +} + +func buildSlackWebhookPayload(cfg *slackWebhookConfig) func(string, string) any { type Attachment struct { Title string `json:"title"` Text string `json:"text"` @@ -53,12 +113,38 @@ func (n *notifier) buildSlackWebhookPayload(cfg *slackWebhookConfig) func(string } type slackWebhookConfig struct { - WebhookURL string `mapstructure:"webhookURL"` - Icon string `mapstructure:"icon"` - MessageURL string `mapstructure:"messageURL"` + WebhookURL string `mapstructure:"webhookURL"` + Icon string `mapstructure:"icon"` + MessageURL string `mapstructure:"messageURL"` + SubjectTemplate string `mapstructure:"subjectTemplate"` + BodyTemplate string `mapstructure:"bodyTemplate"` } -func (n *notifier) addService(cfg config.NotifyService) error { +func (n *notifier) addService(cfg config.NotifyService) (err error) { + be := backend{} + + switch cfg.SubjectTemplate { + case nil: + be.subject = defaultSubjectTemplate + default: + be.subject, err = template.New("subject").Funcs(common.FuncMap).Funcs(alertFm).Parse(*cfg.SubjectTemplate) + if err != nil { + return err + } + } + + switch cfg.BodyTemplate { + case nil: + be.body = defaultBodyTemplate + default: + be.body, err = template.New("body").Funcs(common.FuncMap).Funcs(alertFm).Parse(*cfg.BodyTemplate) + if err != nil { + return err + } + } + + be.Notify = notify.New() + switch cfg.Provider { case "slackwebhook": swc := &slackWebhookConfig{ @@ -74,20 +160,31 @@ func (n *notifier) addService(cfg config.NotifyService) error { Header: make(stdhttp.Header), Method: stdhttp.MethodPost, URL: swc.WebhookURL, - BuildPayload: n.buildSlackWebhookPayload(swc), + BuildPayload: buildSlackWebhookPayload(swc), }) - n.UseServices(hs) + be.UseServices(hs) default: return fmt.Errorf("unknown provider '%s'", cfg.Provider) } + n.backends = append(n.backends, be) + + return nil +} + +func (n *notifier) Send(ctx context.Context, alerts []alert.Alert) error { + for _, be := range n.backends { + err := be.Send(ctx, alerts) + if err != nil { + return err + } + } + return nil } func New(cfg config.Notify) (Notifier, error) { - n := ¬ifier{ - Notify: notify.NewWithServices(), - } + n := new(notifier) for _, s := range cfg { err := n.addService(s) diff --git a/pkg/talkgroups/talkgroup.go b/pkg/talkgroups/talkgroup.go index cd8f9c9..288e488 100644 --- a/pkg/talkgroups/talkgroup.go +++ b/pkg/talkgroups/talkgroup.go @@ -12,6 +12,11 @@ type Talkgroup struct { Learned bool `json:"learned"` } +type Names struct { + System string + Talkgroup string +} + type ID struct { System uint32 `json:"sys"` Talkgroup uint32 `json:"tg"` -- 2.47.0