Add talkgroup activity alerting #17

Merged
amigan merged 11 commits from alerting into trunk 2024-10-31 00:20:48 -04:00
12 changed files with 356 additions and 40 deletions
Showing only changes of commit ea4698a7b4 - Show all commits

View file

@ -29,3 +29,9 @@ alerting:
lookbackDays: 7
halfLife: 30m
recent: 2h
alertThreshold: 0.5
renotify: 30m
notify:
- provider: slackwebhook
config:
webhookURL: "http://somewhere"

1
go.mod
View file

@ -50,6 +50,7 @@ require (
github.com/lestrrat-go/option v1.0.1 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/nikoksr/notify v1.0.1 // indirect
github.com/rogpeppe/go-internal v1.12.0 // indirect
github.com/segmentio/asm v1.2.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect

2
go.sum
View file

@ -105,6 +105,8 @@ github.com/moby/term v0.5.0 h1:xt8Q1nalod/v7BqbG21f8mQPqH+xAaC9C3N3wfWbVP0=
github.com/moby/term v0.5.0/go.mod h1:8FzsFHVUBGZdbDsJw/ot+X+d5HLUbvklYLJ9uGfcI3Y=
github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A=
github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc=
github.com/nikoksr/notify v1.0.1 h1:HkUi4YHASwo3N8UEtDz9GRyEuGyX2Qwe9C6qKK24TYo=
github.com/nikoksr/notify v1.0.1/go.mod h1:w9zFImNfVM7l/gkKFAiyJITKzdC1GH458t4XqMkasZA=
github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM=
github.com/opencontainers/image-spec v1.0.2 h1:9yCKha/T5XdGtO0q9Q9a6T5NUCsTn/DrBg0D7ufOcFM=

View file

@ -1,6 +1,9 @@
package common
import (
"fmt"
"strconv"
"github.com/spf13/cobra"
)
@ -42,3 +45,10 @@ func PtrOrNull[T comparable](val T) *T {
return &val
}
func FmtFloat(v float64, places ...int) string {
if len(places) > 0 {
return fmt.Sprintf("%."+strconv.Itoa(places[0])+"f", v)
}
return fmt.Sprintf("%.4f", v)
}

View file

@ -1,25 +1,34 @@
package alerting
import (
"bytes"
"context"
"fmt"
"net/http"
"sort"
"strconv"
"sync"
"text/template"
"time"
cl "dynatron.me/x/stillbox/pkg/calls"
"dynatron.me/x/stillbox/pkg/gordio/config"
"dynatron.me/x/stillbox/pkg/gordio/database"
"dynatron.me/x/stillbox/pkg/gordio/notify"
"dynatron.me/x/stillbox/pkg/gordio/sinks"
"dynatron.me/x/stillbox/internal/timeseries"
"dynatron.me/x/stillbox/internal/trending"
"github.com/jackc/pgx/v5"
"github.com/rs/zerolog/log"
)
const (
ScoreThreshold = -1
CountThreshold = 1.0
NotificationSubject = "Stillbox Alert"
DefaultRenotify = 30 * time.Minute
alerterTickInterval = time.Minute
)
@ -40,6 +49,9 @@ type alerter struct {
scores trending.Scores[cl.Talkgroup]
lastScore time.Time
sim *Simulation
notifyCache map[cl.Talkgroup]time.Time
renotify time.Duration
notifier notify.Notifier
}
type offsetClock time.Duration
@ -66,6 +78,13 @@ func WithClock(clock timeseries.Clock) AlertOption {
}
}
// WithNotifier sets the notifier
func WithNotifier(n notify.Notifier) AlertOption {
return func(as *alerter) {
as.notifier = n
}
}
// New creates a new Alerter using the provided configuration.
func New(cfg config.Alerting, opts ...AlertOption) Alerter {
if !cfg.Enable {
@ -74,7 +93,13 @@ func New(cfg config.Alerting, opts ...AlertOption) Alerter {
as := &alerter{
cfg: cfg,
notifyCache: make(map[cl.Talkgroup]time.Time),
clock: timeseries.DefaultClock,
renotify: DefaultRenotify,
}
if cfg.Renotify != nil {
as.renotify = cfg.Renotify.Duration()
}
for _, opt := range opts {
@ -105,6 +130,11 @@ func (as *alerter) Go(ctx context.Context) {
select {
case now := <-ticker.C:
as.score(ctx, now)
err := as.notify(ctx)
if err != nil {
log.Error().Err(err).Msg("notify")
}
as.cleanCache()
case <-ctx.Done():
ticker.Stop()
return
@ -113,6 +143,145 @@ func (as *alerter) Go(ctx context.Context) {
}
const notificationTemplStr = `{{ range . -}}
{{ .TGName }} is active with a score of {{ f .Score.Score 4 }}! ({{ f .Score.RecentCount 0 }} recent calls out of {{ .Score.Count }})
{{ end }}`
var notificationTemplate = template.Must(template.New("notification").Funcs(funcMap).Parse(notificationTemplStr))
func (as *alerter) testNotifyHandler(w http.ResponseWriter, r *http.Request) {
as.RLock()
defer as.RUnlock()
ns := make([]notification, 0, len(as.scores))
ctx := r.Context()
for _, s := range as.scores {
n, err := makeNotification(ctx, s)
if err != nil {
log.Error().Err(err).Msg("test notificaiton")
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
ns = append(ns, n)
}
err := as.sendNotification(ctx, ns)
if err != nil {
log.Error().Err(err).Msg("test notification send")
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Write([]byte("Sent"))
}
// notify iterates the scores and sends out any necessary notifications
func (as *alerter) notify(ctx context.Context) error {
if as.notifier == nil {
return nil
}
now := time.Now()
as.Lock()
defer as.Unlock()
var notifications []notification
for _, s := range as.scores {
if s.Score > as.cfg.AlertThreshold {
if t, inCache := as.notifyCache[s.ID]; !inCache || now.Sub(t) > as.renotify {
as.notifyCache[s.ID] = time.Now()
n, err := makeNotification(ctx, s)
if err != nil {
return err
}
notifications = append(notifications, n)
}
}
}
if len(notifications) > 0 {
return as.sendNotification(ctx, notifications)
}
return nil
}
type notification struct {
TGName string
Score trending.Score[cl.Talkgroup]
}
// sendNotification renders and sends the notification.
func (as *alerter) sendNotification(ctx context.Context, n []notification) error {
msgBuffer := new(bytes.Buffer)
err := notificationTemplate.Execute(msgBuffer, n)
if err != nil {
return fmt.Errorf("notification template render: %w", err)
}
log.Debug().Str("msg", msgBuffer.String()).Msg("notifying")
return as.notifier.Send(ctx, NotificationSubject, msgBuffer.String())
}
// makeNotification creates a notification for later rendering by the template.
// It takes a talkgroup Score as input.
func makeNotification(ctx context.Context, tg trending.Score[cl.Talkgroup]) (notification, error) {
d := notification{
Score: tg,
}
db := database.FromCtx(ctx)
tgRecord, err := db.GetTalkgroupWithLearned(ctx, int(tg.ID.System), int(tg.ID.Talkgroup))
switch err {
case nil:
if tgRecord.SystemName == "" {
tgRecord.SystemName = strconv.Itoa(int(tg.ID.System))
}
if tgRecord.Name != nil {
d.TGName = fmt.Sprintf("%s %s", tgRecord.SystemName, *tgRecord.Name)
} else {
d.TGName = fmt.Sprintf("%s:%d", tgRecord.SystemName, int(tg.ID.Talkgroup))
}
case pgx.ErrNoRows:
system, err := db.GetSystemName(ctx, int(tg.ID.System))
switch err {
case nil:
d.TGName = fmt.Sprintf("%s:%d", system, int(tg.ID.Talkgroup))
case pgx.ErrNoRows:
d.TGName = fmt.Sprintf("%d:%d", int(tg.ID.System), int(tg.ID.Talkgroup))
default:
return d, fmt.Errorf("sendNotification get system: %w", err)
}
default:
return d, fmt.Errorf("sendNotification get talkgroup: %w", err)
}
return d, nil
}
// cleanCache clears the cache of aged-out entries
func (as *alerter) cleanCache() {
if as.notifier == nil {
return
}
now := time.Now()
as.Lock()
defer as.Unlock()
for k, t := range as.notifyCache {
if now.Sub(t) > as.renotify {
delete(as.notifyCache, k)
}
}
}
func (as *alerter) newTimeSeries(id cl.Talkgroup) trending.TimeSeries {
ts, _ := timeseries.NewTimeSeries(timeseries.WithGranularities(
[]timeseries.Granularity{
@ -125,16 +294,17 @@ func (as *alerter) newTimeSeries(id cl.Talkgroup) trending.TimeSeries {
return ts
}
func (as *alerter) startBackfill(ctx context.Context) {
func (as *alerter) startBackfill(ctx context.Context) error {
now := time.Now()
since := now.Add(-24 * time.Hour * time.Duration(as.cfg.LookbackDays))
log.Debug().Time("since", since).Msg("starting stats backfill")
count, err := as.backfill(ctx, since, now)
if err != nil {
log.Error().Err(err).Msg("backfill failed")
return
return fmt.Errorf("backfill failed: %w", err)
}
log.Debug().Int("callsCount", count).Str("in", time.Now().Sub(now).String()).Int("tgCount", as.scorer.Score().Len()).Msg("backfill finished")
return nil
}
func (as *alerter) score(ctx context.Context, now time.Time) {

View file

@ -3,16 +3,15 @@ package alerting
import (
_ "embed"
"errors"
"fmt"
"html/template"
"net/http"
"strconv"
"time"
"dynatron.me/x/stillbox/pkg/calls"
"dynatron.me/x/stillbox/pkg/gordio/config"
"dynatron.me/x/stillbox/pkg/gordio/database"
"dynatron.me/x/stillbox/internal/common"
"dynatron.me/x/stillbox/internal/jsontime"
"dynatron.me/x/stillbox/internal/trending"
@ -29,12 +28,7 @@ type stats interface {
var (
funcMap = template.FuncMap{
"f": func(v float64, places ...int) string {
if len(places) > 0 {
return fmt.Sprintf("%."+strconv.Itoa(places[0])+"f", v)
}
return fmt.Sprintf("%.4f", v)
},
"f": common.FmtFloat,
"dict": func(values ...interface{}) (map[string]interface{}, error) {
if len(values)%2 != 0 {
return nil, errors.New("invalid dict call")
@ -66,6 +60,7 @@ var (
func (as *alerter) PrivateRoutes(r chi.Router) {
r.Get("/tgstats", as.tgStatsHandler)
r.Post("/tgstats", as.simulateHandler)
r.Get("/testnotify", as.testNotifyHandler)
}
func (as *noopAlerter) PrivateRoutes(r chi.Router) {}

View file

@ -21,6 +21,7 @@ type Config struct {
Listen string `yaml:"listen"`
Public bool `yaml:"public"`
RateLimit RateLimit `yaml:"rateLimit"`
Notify Notify `yaml:"notify"`
configPath string
}
@ -57,6 +58,23 @@ type Alerting struct {
LookbackDays uint `yaml:"lookbackDays"`
HalfLife jsontime.Duration `yaml:"halfLife"`
Recent jsontime.Duration `yaml:"recent"`
AlertThreshold float64 `yaml:"alertThreshold"`
Renotify *jsontime.Duration `yaml:"renotify,omitempty"`
}
type Notify []NotifyService
type NotifyService struct {
Provider string `json:"provider"`
Config map[string]interface{} `json:"config"`
}
func (n *NotifyService) GetS(k, defaultVal string) string {
if v, has := n.Config[k].(string); has {
return v
}
return defaultVal
}
func (rl *RateLimit) Verify() bool {

View file

@ -20,6 +20,7 @@ type Querier interface {
DeleteUser(ctx context.Context, username string) error
GetAPIKey(ctx context.Context, apiKey string) (ApiKey, error)
GetDatabaseSize(ctx context.Context) (string, error)
GetSystemName(ctx context.Context, systemID int) (string, error)
GetTalkgroup(ctx context.Context, systemID int, tgid int) (Talkgroup, error)
GetTalkgroupIDsByTags(ctx context.Context, anytags []string, alltags []string, nottags []string) ([]GetTalkgroupIDsByTagsRow, error)
GetTalkgroupTags(ctx context.Context, sys int, tg int) ([]string, error)

View file

@ -19,6 +19,17 @@ func (q *Queries) BulkSetTalkgroupTags(ctx context.Context, iD int64, tags []str
return err
}
const getSystemName = `-- name: GetSystemName :one
SELECT name FROM systems WHERE id = $1
`
func (q *Queries) GetSystemName(ctx context.Context, systemID int) (string, error) {
row := q.db.QueryRow(ctx, getSystemName, systemID)
var name string
err := row.Scan(&name)
return name, err
}
const getTalkgroup = `-- name: GetTalkgroup :one
SELECT id, system_id, tgid, name, alpha_tag, tg_group, frequency, metadata, tags, notify, weight FROM talkgroups
WHERE id = systg2id($1, $2)

View file

@ -0,0 +1,89 @@
package notify
import (
"fmt"
stdhttp "net/http"
"time"
"dynatron.me/x/stillbox/pkg/gordio/config"
"github.com/nikoksr/notify"
"github.com/nikoksr/notify/service/http"
)
type Notifier interface {
notify.Notifier
}
type notifier struct {
*notify.Notify
cfg []config.NotifyService
}
func (n *notifier) buildSlackWebhookPayload(cfg config.NotifyService) func(string, string) any {
icon := cfg.GetS("icon", "🚨")
url := cfg.GetS("messageURL", "")
type Attachment struct {
Title string `json:"title"`
Text string `json:"text"`
Fallback string `json:"fallback"`
Footer string `json:"footer"`
TitleLink string `json:"title_link"`
Timestamp int64 `json:"ts"`
}
return func(subject, message string) any {
m := struct {
Username string `json:"username"`
Attachments []Attachment `json:"attachments"`
IconEmoji string `json:"icon_emoji"`
}{
Username: "Stillbox",
Attachments: []Attachment{
{
Title: subject,
Text: message,
TitleLink: url,
Timestamp: time.Now().Unix(),
},
},
IconEmoji: icon,
}
return m
}
}
func (n *notifier) addService(cfg config.NotifyService) error {
switch cfg.Provider {
case "slackwebhook":
hs := http.New()
hs.AddReceivers(&http.Webhook{
ContentType: "application/json",
Header: make(stdhttp.Header),
Method: stdhttp.MethodPost,
URL: cfg.GetS("webhookURL", ""),
BuildPayload: n.buildSlackWebhookPayload(cfg),
})
n.UseServices(hs)
default:
return fmt.Errorf("unknown provider '%s'", cfg.Provider)
}
return nil
}
func New(cfg config.Notify) (Notifier, error) {
n := &notifier{
Notify: notify.NewWithServices(),
}
for _, s := range cfg {
err := n.addService(s)
if err != nil {
return nil, err
}
}
return n, nil
}

View file

@ -11,6 +11,7 @@ import (
"dynatron.me/x/stillbox/pkg/gordio/config"
"dynatron.me/x/stillbox/pkg/gordio/database"
"dynatron.me/x/stillbox/pkg/gordio/nexus"
"dynatron.me/x/stillbox/pkg/gordio/notify"
"dynatron.me/x/stillbox/pkg/gordio/sinks"
"dynatron.me/x/stillbox/pkg/gordio/sources"
"github.com/go-chi/chi/v5"
@ -31,6 +32,7 @@ type Server struct {
nex *nexus.Nexus
logger *Logger
alerter alerting.Alerter
notifier notify.Notifier
hup chan os.Signal
}
@ -48,7 +50,14 @@ func New(ctx context.Context, cfg *config.Config) (*Server, error) {
ctx = database.CtxWithDB(ctx, db)
r := chi.NewRouter()
authenticator := auth.NewAuthenticator(cfg.Auth)
notifier, err := notify.New(cfg.Notify)
if err != nil {
return nil, err
}
srv := &Server{
auth: authenticator,
conf: cfg,
@ -56,7 +65,8 @@ func New(ctx context.Context, cfg *config.Config) (*Server, error) {
r: r,
nex: nexus.New(),
logger: logger,
alerter: alerting.New(cfg.Alerting),
alerter: alerting.New(cfg.Alerting, alerting.WithNotifier(notifier)),
notifier: notifier,
}
srv.sinks.Register("database", sinks.NewDatabaseSink(srv.db), true)

View file

@ -50,3 +50,6 @@ TRUE learned
FROM talkgroups_learned tgl
JOIN systems sys ON tgl.system_id = sys.id
WHERE tgl.system_id = sqlc.arg(system_id) AND tgl.tgid = sqlc.arg(tgid) AND ignored IS NOT TRUE;
-- name: GetSystemName :one
SELECT name FROM systems WHERE id = sqlc.arg(system_id);