Notify and alerting

This commit is contained in:
Daniel 2024-10-31 00:10:53 -04:00
parent 26846e02bd
commit 0d793cb31a
13 changed files with 357 additions and 42 deletions

View file

@ -29,3 +29,9 @@ alerting:
lookbackDays: 7 lookbackDays: 7
halfLife: 30m halfLife: 30m
recent: 2h recent: 2h
alertThreshold: 0.5
renotify: 30m
notify:
- provider: slackwebhook
config:
webhookURL: "http://somewhere"

1
go.mod
View file

@ -50,6 +50,7 @@ require (
github.com/lestrrat-go/option v1.0.1 // indirect github.com/lestrrat-go/option v1.0.1 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect github.com/mattn/go-isatty v0.0.20 // indirect
github.com/nikoksr/notify v1.0.1 // indirect
github.com/rogpeppe/go-internal v1.12.0 // indirect github.com/rogpeppe/go-internal v1.12.0 // indirect
github.com/segmentio/asm v1.2.0 // indirect github.com/segmentio/asm v1.2.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect github.com/spf13/pflag v1.0.5 // indirect

2
go.sum
View file

@ -105,6 +105,8 @@ github.com/moby/term v0.5.0 h1:xt8Q1nalod/v7BqbG21f8mQPqH+xAaC9C3N3wfWbVP0=
github.com/moby/term v0.5.0/go.mod h1:8FzsFHVUBGZdbDsJw/ot+X+d5HLUbvklYLJ9uGfcI3Y= github.com/moby/term v0.5.0/go.mod h1:8FzsFHVUBGZdbDsJw/ot+X+d5HLUbvklYLJ9uGfcI3Y=
github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A= github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A=
github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc= github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc=
github.com/nikoksr/notify v1.0.1 h1:HkUi4YHASwo3N8UEtDz9GRyEuGyX2Qwe9C6qKK24TYo=
github.com/nikoksr/notify v1.0.1/go.mod h1:w9zFImNfVM7l/gkKFAiyJITKzdC1GH458t4XqMkasZA=
github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM=
github.com/opencontainers/image-spec v1.0.2 h1:9yCKha/T5XdGtO0q9Q9a6T5NUCsTn/DrBg0D7ufOcFM= github.com/opencontainers/image-spec v1.0.2 h1:9yCKha/T5XdGtO0q9Q9a6T5NUCsTn/DrBg0D7ufOcFM=

View file

@ -1,6 +1,9 @@
package common package common
import ( import (
"fmt"
"strconv"
"github.com/spf13/cobra" "github.com/spf13/cobra"
) )
@ -42,3 +45,10 @@ func PtrOrNull[T comparable](val T) *T {
return &val return &val
} }
func FmtFloat(v float64, places ...int) string {
if len(places) > 0 {
return fmt.Sprintf("%."+strconv.Itoa(places[0])+"f", v)
}
return fmt.Sprintf("%.4f", v)
}

View file

@ -1,25 +1,34 @@
package alerting package alerting
import ( import (
"bytes"
"context" "context"
"fmt"
"net/http"
"sort" "sort"
"strconv"
"sync" "sync"
"text/template"
"time" "time"
cl "dynatron.me/x/stillbox/pkg/calls" cl "dynatron.me/x/stillbox/pkg/calls"
"dynatron.me/x/stillbox/pkg/gordio/config" "dynatron.me/x/stillbox/pkg/gordio/config"
"dynatron.me/x/stillbox/pkg/gordio/database" "dynatron.me/x/stillbox/pkg/gordio/database"
"dynatron.me/x/stillbox/pkg/gordio/notify"
"dynatron.me/x/stillbox/pkg/gordio/sinks" "dynatron.me/x/stillbox/pkg/gordio/sinks"
"dynatron.me/x/stillbox/internal/timeseries" "dynatron.me/x/stillbox/internal/timeseries"
"dynatron.me/x/stillbox/internal/trending" "dynatron.me/x/stillbox/internal/trending"
"github.com/jackc/pgx/v5"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
) )
const ( const (
ScoreThreshold = -1 ScoreThreshold = -1
CountThreshold = 1.0 CountThreshold = 1.0
NotificationSubject = "Stillbox Alert"
DefaultRenotify = 30 * time.Minute
alerterTickInterval = time.Minute alerterTickInterval = time.Minute
) )
@ -34,12 +43,15 @@ type Alerter interface {
type alerter struct { type alerter struct {
sync.RWMutex sync.RWMutex
clock timeseries.Clock clock timeseries.Clock
cfg config.Alerting cfg config.Alerting
scorer trending.Scorer[cl.Talkgroup] scorer trending.Scorer[cl.Talkgroup]
scores trending.Scores[cl.Talkgroup] scores trending.Scores[cl.Talkgroup]
lastScore time.Time lastScore time.Time
sim *Simulation sim *Simulation
notifyCache map[cl.Talkgroup]time.Time
renotify time.Duration
notifier notify.Notifier
} }
type offsetClock time.Duration type offsetClock time.Duration
@ -66,6 +78,13 @@ func WithClock(clock timeseries.Clock) AlertOption {
} }
} }
// WithNotifier sets the notifier
func WithNotifier(n notify.Notifier) AlertOption {
return func(as *alerter) {
as.notifier = n
}
}
// New creates a new Alerter using the provided configuration. // New creates a new Alerter using the provided configuration.
func New(cfg config.Alerting, opts ...AlertOption) Alerter { func New(cfg config.Alerting, opts ...AlertOption) Alerter {
if !cfg.Enable { if !cfg.Enable {
@ -73,8 +92,14 @@ func New(cfg config.Alerting, opts ...AlertOption) Alerter {
} }
as := &alerter{ as := &alerter{
cfg: cfg, cfg: cfg,
clock: timeseries.DefaultClock, notifyCache: make(map[cl.Talkgroup]time.Time),
clock: timeseries.DefaultClock,
renotify: DefaultRenotify,
}
if cfg.Renotify != nil {
as.renotify = cfg.Renotify.Duration()
} }
for _, opt := range opts { for _, opt := range opts {
@ -105,6 +130,11 @@ func (as *alerter) Go(ctx context.Context) {
select { select {
case now := <-ticker.C: case now := <-ticker.C:
as.score(ctx, now) as.score(ctx, now)
err := as.notify(ctx)
if err != nil {
log.Error().Err(err).Msg("notify")
}
as.cleanCache()
case <-ctx.Done(): case <-ctx.Done():
ticker.Stop() ticker.Stop()
return return
@ -113,6 +143,145 @@ func (as *alerter) Go(ctx context.Context) {
} }
const notificationTemplStr = `{{ range . -}}
{{ .TGName }} is active with a score of {{ f .Score.Score 4 }}! ({{ f .Score.RecentCount 0 }} recent calls out of {{ .Score.Count }})
{{ end }}`
var notificationTemplate = template.Must(template.New("notification").Funcs(funcMap).Parse(notificationTemplStr))
func (as *alerter) testNotifyHandler(w http.ResponseWriter, r *http.Request) {
as.RLock()
defer as.RUnlock()
ns := make([]notification, 0, len(as.scores))
ctx := r.Context()
for _, s := range as.scores {
n, err := makeNotification(ctx, s)
if err != nil {
log.Error().Err(err).Msg("test notificaiton")
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
ns = append(ns, n)
}
err := as.sendNotification(ctx, ns)
if err != nil {
log.Error().Err(err).Msg("test notification send")
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Write([]byte("Sent"))
}
// notify iterates the scores and sends out any necessary notifications
func (as *alerter) notify(ctx context.Context) error {
if as.notifier == nil {
return nil
}
now := time.Now()
as.Lock()
defer as.Unlock()
var notifications []notification
for _, s := range as.scores {
if s.Score > as.cfg.AlertThreshold {
if t, inCache := as.notifyCache[s.ID]; !inCache || now.Sub(t) > as.renotify {
as.notifyCache[s.ID] = time.Now()
n, err := makeNotification(ctx, s)
if err != nil {
return err
}
notifications = append(notifications, n)
}
}
}
if len(notifications) > 0 {
return as.sendNotification(ctx, notifications)
}
return nil
}
type notification struct {
TGName string
Score trending.Score[cl.Talkgroup]
}
// sendNotification renders and sends the notification.
func (as *alerter) sendNotification(ctx context.Context, n []notification) error {
msgBuffer := new(bytes.Buffer)
err := notificationTemplate.Execute(msgBuffer, n)
if err != nil {
return fmt.Errorf("notification template render: %w", err)
}
log.Debug().Str("msg", msgBuffer.String()).Msg("notifying")
return as.notifier.Send(ctx, NotificationSubject, msgBuffer.String())
}
// makeNotification creates a notification for later rendering by the template.
// It takes a talkgroup Score as input.
func makeNotification(ctx context.Context, tg trending.Score[cl.Talkgroup]) (notification, error) {
d := notification{
Score: tg,
}
db := database.FromCtx(ctx)
tgRecord, err := db.GetTalkgroupWithLearned(ctx, int(tg.ID.System), int(tg.ID.Talkgroup))
switch err {
case nil:
if tgRecord.SystemName == "" {
tgRecord.SystemName = strconv.Itoa(int(tg.ID.System))
}
if tgRecord.Name != nil {
d.TGName = fmt.Sprintf("%s %s", tgRecord.SystemName, *tgRecord.Name)
} else {
d.TGName = fmt.Sprintf("%s:%d", tgRecord.SystemName, int(tg.ID.Talkgroup))
}
case pgx.ErrNoRows:
system, err := db.GetSystemName(ctx, int(tg.ID.System))
switch err {
case nil:
d.TGName = fmt.Sprintf("%s:%d", system, int(tg.ID.Talkgroup))
case pgx.ErrNoRows:
d.TGName = fmt.Sprintf("%d:%d", int(tg.ID.System), int(tg.ID.Talkgroup))
default:
return d, fmt.Errorf("sendNotification get system: %w", err)
}
default:
return d, fmt.Errorf("sendNotification get talkgroup: %w", err)
}
return d, nil
}
// cleanCache clears the cache of aged-out entries
func (as *alerter) cleanCache() {
if as.notifier == nil {
return
}
now := time.Now()
as.Lock()
defer as.Unlock()
for k, t := range as.notifyCache {
if now.Sub(t) > as.renotify {
delete(as.notifyCache, k)
}
}
}
func (as *alerter) newTimeSeries(id cl.Talkgroup) trending.TimeSeries { func (as *alerter) newTimeSeries(id cl.Talkgroup) trending.TimeSeries {
ts, _ := timeseries.NewTimeSeries(timeseries.WithGranularities( ts, _ := timeseries.NewTimeSeries(timeseries.WithGranularities(
[]timeseries.Granularity{ []timeseries.Granularity{
@ -125,16 +294,17 @@ func (as *alerter) newTimeSeries(id cl.Talkgroup) trending.TimeSeries {
return ts return ts
} }
func (as *alerter) startBackfill(ctx context.Context) { func (as *alerter) startBackfill(ctx context.Context) error {
now := time.Now() now := time.Now()
since := now.Add(-24 * time.Hour * time.Duration(as.cfg.LookbackDays)) since := now.Add(-24 * time.Hour * time.Duration(as.cfg.LookbackDays))
log.Debug().Time("since", since).Msg("starting stats backfill") log.Debug().Time("since", since).Msg("starting stats backfill")
count, err := as.backfill(ctx, since, now) count, err := as.backfill(ctx, since, now)
if err != nil { if err != nil {
log.Error().Err(err).Msg("backfill failed") return fmt.Errorf("backfill failed: %w", err)
return
} }
log.Debug().Int("callsCount", count).Str("in", time.Now().Sub(now).String()).Int("tgCount", as.scorer.Score().Len()).Msg("backfill finished") log.Debug().Int("callsCount", count).Str("in", time.Now().Sub(now).String()).Int("tgCount", as.scorer.Score().Len()).Msg("backfill finished")
return nil
} }
func (as *alerter) score(ctx context.Context, now time.Time) { func (as *alerter) score(ctx context.Context, now time.Time) {

View file

@ -18,8 +18,7 @@ import (
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
) )
// A Simulation simulates what happens to the alerter during a specified time // A Simulation simulates what happens to the alerter during a specified time period using past data from the database.
// period using past data from the database.
type Simulation struct { type Simulation struct {
// normal Alerting config // normal Alerting config
config.Alerting config.Alerting

View file

@ -3,16 +3,15 @@ package alerting
import ( import (
_ "embed" _ "embed"
"errors" "errors"
"fmt"
"html/template" "html/template"
"net/http" "net/http"
"strconv"
"time" "time"
"dynatron.me/x/stillbox/pkg/calls" "dynatron.me/x/stillbox/pkg/calls"
"dynatron.me/x/stillbox/pkg/gordio/config" "dynatron.me/x/stillbox/pkg/gordio/config"
"dynatron.me/x/stillbox/pkg/gordio/database" "dynatron.me/x/stillbox/pkg/gordio/database"
"dynatron.me/x/stillbox/internal/common"
"dynatron.me/x/stillbox/internal/jsontime" "dynatron.me/x/stillbox/internal/jsontime"
"dynatron.me/x/stillbox/internal/trending" "dynatron.me/x/stillbox/internal/trending"
@ -29,12 +28,7 @@ type stats interface {
var ( var (
funcMap = template.FuncMap{ funcMap = template.FuncMap{
"f": func(v float64, places ...int) string { "f": common.FmtFloat,
if len(places) > 0 {
return fmt.Sprintf("%."+strconv.Itoa(places[0])+"f", v)
}
return fmt.Sprintf("%.4f", v)
},
"dict": func(values ...interface{}) (map[string]interface{}, error) { "dict": func(values ...interface{}) (map[string]interface{}, error) {
if len(values)%2 != 0 { if len(values)%2 != 0 {
return nil, errors.New("invalid dict call") return nil, errors.New("invalid dict call")
@ -66,6 +60,7 @@ var (
func (as *alerter) PrivateRoutes(r chi.Router) { func (as *alerter) PrivateRoutes(r chi.Router) {
r.Get("/tgstats", as.tgStatsHandler) r.Get("/tgstats", as.tgStatsHandler)
r.Post("/tgstats", as.simulateHandler) r.Post("/tgstats", as.simulateHandler)
r.Get("/testnotify", as.testNotifyHandler)
} }
func (as *noopAlerter) PrivateRoutes(r chi.Router) {} func (as *noopAlerter) PrivateRoutes(r chi.Router) {}

View file

@ -21,6 +21,7 @@ type Config struct {
Listen string `yaml:"listen"` Listen string `yaml:"listen"`
Public bool `yaml:"public"` Public bool `yaml:"public"`
RateLimit RateLimit `yaml:"rateLimit"` RateLimit RateLimit `yaml:"rateLimit"`
Notify Notify `yaml:"notify"`
configPath string configPath string
} }
@ -53,10 +54,27 @@ type RateLimit struct {
} }
type Alerting struct { type Alerting struct {
Enable bool `yaml:"enable"` Enable bool `yaml:"enable"`
LookbackDays uint `yaml:"lookbackDays"` LookbackDays uint `yaml:"lookbackDays"`
HalfLife jsontime.Duration `yaml:"halfLife"` HalfLife jsontime.Duration `yaml:"halfLife"`
Recent jsontime.Duration `yaml:"recent"` Recent jsontime.Duration `yaml:"recent"`
AlertThreshold float64 `yaml:"alertThreshold"`
Renotify *jsontime.Duration `yaml:"renotify,omitempty"`
}
type Notify []NotifyService
type NotifyService struct {
Provider string `json:"provider"`
Config map[string]interface{} `json:"config"`
}
func (n *NotifyService) GetS(k, defaultVal string) string {
if v, has := n.Config[k].(string); has {
return v
}
return defaultVal
} }
func (rl *RateLimit) Verify() bool { func (rl *RateLimit) Verify() bool {

View file

@ -20,6 +20,7 @@ type Querier interface {
DeleteUser(ctx context.Context, username string) error DeleteUser(ctx context.Context, username string) error
GetAPIKey(ctx context.Context, apiKey string) (ApiKey, error) GetAPIKey(ctx context.Context, apiKey string) (ApiKey, error)
GetDatabaseSize(ctx context.Context) (string, error) GetDatabaseSize(ctx context.Context) (string, error)
GetSystemName(ctx context.Context, systemID int) (string, error)
GetTalkgroup(ctx context.Context, systemID int, tgid int) (Talkgroup, error) GetTalkgroup(ctx context.Context, systemID int, tgid int) (Talkgroup, error)
GetTalkgroupIDsByTags(ctx context.Context, anytags []string, alltags []string, nottags []string) ([]GetTalkgroupIDsByTagsRow, error) GetTalkgroupIDsByTags(ctx context.Context, anytags []string, alltags []string, nottags []string) ([]GetTalkgroupIDsByTagsRow, error)
GetTalkgroupTags(ctx context.Context, sys int, tg int) ([]string, error) GetTalkgroupTags(ctx context.Context, sys int, tg int) ([]string, error)

View file

@ -19,6 +19,17 @@ func (q *Queries) BulkSetTalkgroupTags(ctx context.Context, iD int64, tags []str
return err return err
} }
const getSystemName = `-- name: GetSystemName :one
SELECT name FROM systems WHERE id = $1
`
func (q *Queries) GetSystemName(ctx context.Context, systemID int) (string, error) {
row := q.db.QueryRow(ctx, getSystemName, systemID)
var name string
err := row.Scan(&name)
return name, err
}
const getTalkgroup = `-- name: GetTalkgroup :one const getTalkgroup = `-- name: GetTalkgroup :one
SELECT id, system_id, tgid, name, alpha_tag, tg_group, frequency, metadata, tags, notify, weight FROM talkgroups SELECT id, system_id, tgid, name, alpha_tag, tg_group, frequency, metadata, tags, notify, weight FROM talkgroups
WHERE id = systg2id($1, $2) WHERE id = systg2id($1, $2)

View file

@ -0,0 +1,89 @@
package notify
import (
"fmt"
stdhttp "net/http"
"time"
"dynatron.me/x/stillbox/pkg/gordio/config"
"github.com/nikoksr/notify"
"github.com/nikoksr/notify/service/http"
)
type Notifier interface {
notify.Notifier
}
type notifier struct {
*notify.Notify
cfg []config.NotifyService
}
func (n *notifier) buildSlackWebhookPayload(cfg config.NotifyService) func(string, string) any {
icon := cfg.GetS("icon", "🚨")
url := cfg.GetS("messageURL", "")
type Attachment struct {
Title string `json:"title"`
Text string `json:"text"`
Fallback string `json:"fallback"`
Footer string `json:"footer"`
TitleLink string `json:"title_link"`
Timestamp int64 `json:"ts"`
}
return func(subject, message string) any {
m := struct {
Username string `json:"username"`
Attachments []Attachment `json:"attachments"`
IconEmoji string `json:"icon_emoji"`
}{
Username: "Stillbox",
Attachments: []Attachment{
{
Title: subject,
Text: message,
TitleLink: url,
Timestamp: time.Now().Unix(),
},
},
IconEmoji: icon,
}
return m
}
}
func (n *notifier) addService(cfg config.NotifyService) error {
switch cfg.Provider {
case "slackwebhook":
hs := http.New()
hs.AddReceivers(&http.Webhook{
ContentType: "application/json",
Header: make(stdhttp.Header),
Method: stdhttp.MethodPost,
URL: cfg.GetS("webhookURL", ""),
BuildPayload: n.buildSlackWebhookPayload(cfg),
})
n.UseServices(hs)
default:
return fmt.Errorf("unknown provider '%s'", cfg.Provider)
}
return nil
}
func New(cfg config.Notify) (Notifier, error) {
n := &notifier{
Notify: notify.NewWithServices(),
}
for _, s := range cfg {
err := n.addService(s)
if err != nil {
return nil, err
}
}
return n, nil
}

View file

@ -11,6 +11,7 @@ import (
"dynatron.me/x/stillbox/pkg/gordio/config" "dynatron.me/x/stillbox/pkg/gordio/config"
"dynatron.me/x/stillbox/pkg/gordio/database" "dynatron.me/x/stillbox/pkg/gordio/database"
"dynatron.me/x/stillbox/pkg/gordio/nexus" "dynatron.me/x/stillbox/pkg/gordio/nexus"
"dynatron.me/x/stillbox/pkg/gordio/notify"
"dynatron.me/x/stillbox/pkg/gordio/sinks" "dynatron.me/x/stillbox/pkg/gordio/sinks"
"dynatron.me/x/stillbox/pkg/gordio/sources" "dynatron.me/x/stillbox/pkg/gordio/sources"
"github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5"
@ -22,16 +23,17 @@ import (
const shutdownTimeout = 5 * time.Second const shutdownTimeout = 5 * time.Second
type Server struct { type Server struct {
auth *auth.Auth auth *auth.Auth
conf *config.Config conf *config.Config
db *database.DB db *database.DB
r *chi.Mux r *chi.Mux
sources sources.Sources sources sources.Sources
sinks sinks.Sinks sinks sinks.Sinks
nex *nexus.Nexus nex *nexus.Nexus
logger *Logger logger *Logger
alerter alerting.Alerter alerter alerting.Alerter
hup chan os.Signal notifier notify.Notifier
hup chan os.Signal
} }
func New(ctx context.Context, cfg *config.Config) (*Server, error) { func New(ctx context.Context, cfg *config.Config) (*Server, error) {
@ -48,15 +50,23 @@ func New(ctx context.Context, cfg *config.Config) (*Server, error) {
ctx = database.CtxWithDB(ctx, db) ctx = database.CtxWithDB(ctx, db)
r := chi.NewRouter() r := chi.NewRouter()
authenticator := auth.NewAuthenticator(cfg.Auth) authenticator := auth.NewAuthenticator(cfg.Auth)
notifier, err := notify.New(cfg.Notify)
if err != nil {
return nil, err
}
srv := &Server{ srv := &Server{
auth: authenticator, auth: authenticator,
conf: cfg, conf: cfg,
db: db, db: db,
r: r, r: r,
nex: nexus.New(), nex: nexus.New(),
logger: logger, logger: logger,
alerter: alerting.New(cfg.Alerting), alerter: alerting.New(cfg.Alerting, alerting.WithNotifier(notifier)),
notifier: notifier,
} }
srv.sinks.Register("database", sinks.NewDatabaseSink(srv.db), true) srv.sinks.Register("database", sinks.NewDatabaseSink(srv.db), true)

View file

@ -50,3 +50,6 @@ TRUE learned
FROM talkgroups_learned tgl FROM talkgroups_learned tgl
JOIN systems sys ON tgl.system_id = sys.id JOIN systems sys ON tgl.system_id = sys.id
WHERE tgl.system_id = sqlc.arg(system_id) AND tgl.tgid = sqlc.arg(tgid) AND ignored IS NOT TRUE; WHERE tgl.system_id = sqlc.arg(system_id) AND tgl.tgid = sqlc.arg(tgid) AND ignored IS NOT TRUE;
-- name: GetSystemName :one
SELECT name FROM systems WHERE id = sqlc.arg(system_id);