Compare commits

..

9 commits

21 changed files with 572 additions and 112 deletions

1
go.mod
View file

@ -11,6 +11,7 @@ require (
github.com/go-chi/httprate v0.9.0 github.com/go-chi/httprate v0.9.0
github.com/go-chi/jwtauth/v5 v5.3.1 github.com/go-chi/jwtauth/v5 v5.3.1
github.com/go-chi/render v1.0.3 github.com/go-chi/render v1.0.3
github.com/go-viper/mapstructure/v2 v2.2.1
github.com/golang-migrate/migrate/v4 v4.17.1 github.com/golang-migrate/migrate/v4 v4.17.1
github.com/google/uuid v1.6.0 github.com/google/uuid v1.6.0
github.com/gorilla/websocket v1.5.3 github.com/gorilla/websocket v1.5.3

2
go.sum
View file

@ -44,6 +44,8 @@ github.com/go-chi/jwtauth/v5 v5.3.1 h1:1ePWrjVctvp1tyBq5b/2ER8Th/+RbYc7x4qNsc5rh
github.com/go-chi/jwtauth/v5 v5.3.1/go.mod h1:6Fl2RRmWXs3tJYE1IQGX81FsPoGqDwq9c15j52R5q80= github.com/go-chi/jwtauth/v5 v5.3.1/go.mod h1:6Fl2RRmWXs3tJYE1IQGX81FsPoGqDwq9c15j52R5q80=
github.com/go-chi/render v1.0.3 h1:AsXqd2a1/INaIfUSKq3G5uA8weYx20FOsM7uSoCyyt4= github.com/go-chi/render v1.0.3 h1:AsXqd2a1/INaIfUSKq3G5uA8weYx20FOsM7uSoCyyt4=
github.com/go-chi/render v1.0.3/go.mod h1:/gr3hVkmYR0YlEy3LxCuVRFzEu9Ruok+gFqbIofjao0= github.com/go-chi/render v1.0.3/go.mod h1:/gr3hVkmYR0YlEy3LxCuVRFzEu9Ruok+gFqbIofjao0=
github.com/go-viper/mapstructure/v2 v2.2.1 h1:ZAaOCxANMuZx5RCeg0mBdEZk7DZasvvZIxtHqx8aGss=
github.com/go-viper/mapstructure/v2 v2.2.1/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM=
github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=

View file

@ -169,16 +169,13 @@ func (as *alerter) eval(ctx context.Context, now time.Time, testMode bool) ([]Al
for _, s := range as.scores { for _, s := range as.scores {
origScore := s.Score origScore := s.Score
tgr, err := as.tgCache.TG(ctx, s.ID) tgr, err := as.tgCache.TG(ctx, s.ID)
if err == nil { if err == nil && !tgr.Talkgroup.Alert {
if !tgr.Talkgroup.Alert { continue
continue
}
s.Score *= float64(tgr.Talkgroup.Weight)
} }
if s.Score > as.cfg.AlertThreshold || testMode { if s.Score > as.cfg.AlertThreshold || testMode {
if old, inCache := as.alertCache[s.ID]; !inCache || now.Sub(old.Timestamp) > as.renotify { if old, inCache := as.alertCache[s.ID]; !inCache || now.Sub(old.Timestamp) > as.renotify {
s.Score = as.tgCache.ApplyAlertRules(s, now) s.Score *= as.tgCache.Weight(ctx, s.ID, now)
a, err := as.makeAlert(ctx, s, origScore) a, err := as.makeAlert(ctx, s, origScore)
if err != nil { if err != nil {
return nil, fmt.Errorf("makeAlert: %w", err) return nil, fmt.Errorf("makeAlert: %w", err)

View file

@ -69,20 +69,20 @@ func (as *alerter) tgStatsHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context() ctx := r.Context()
db := database.FromCtx(ctx) db := database.FromCtx(ctx)
tgs, err := db.GetTalkgroupsByPackedIDs(ctx, as.packedScoredTGs()) tgs, err := db.GetTalkgroupsWithLearnedByPackedIDs(ctx, as.packedScoredTGs())
if err != nil { if err != nil {
log.Error().Err(err).Msg("stats TG get failed") log.Error().Err(err).Msg("stats TG get failed")
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return
} }
tgMap := make(map[talkgroups.ID]database.GetTalkgroupsByPackedIDsRow, len(tgs)) tgMap := make(map[talkgroups.ID]database.GetTalkgroupsWithLearnedByPackedIDsRow, len(tgs))
for _, t := range tgs { for _, t := range tgs {
tgMap[talkgroups.ID{System: uint32(t.System.ID), Talkgroup: uint32(t.Talkgroup.ID)}] = t tgMap[talkgroups.ID{System: uint32(t.System.ID), Talkgroup: uint32(t.Talkgroup.Tgid)}] = t
} }
renderData := struct { renderData := struct {
TGs map[talkgroups.ID]database.GetTalkgroupsByPackedIDsRow TGs map[talkgroups.ID]database.GetTalkgroupsWithLearnedByPackedIDsRow
Scores trending.Scores[talkgroups.ID] Scores trending.Scores[talkgroups.ID]
LastScore time.Time LastScore time.Time
Simulation *Simulation Simulation *Simulation

View file

@ -85,8 +85,8 @@
{{ range .Scores }} {{ range .Scores }}
{{ $tg := (index $.TGs .ID) }} {{ $tg := (index $.TGs .ID) }}
<tr> <tr>
<td>{{ $tg.Name_2}}</td> <td>{{ $tg.System.Name}}</td>
<td>{{ $tg.Name}}</td> <td>{{ $tg.Talkgroup.Name}}</td>
<td>{{ .ID.Talkgroup }}</td> <td>{{ .ID.Talkgroup }}</td>
<td>{{ f .Count 0 }}</td> <td>{{ f .Count 0 }}</td>
<td>{{ f .RecentCount 0 }}</td> <td>{{ f .RecentCount 0 }}</td>

127
pkg/api/api.go Normal file
View file

@ -0,0 +1,127 @@
package api
import (
"encoding/json"
"errors"
"net/http"
"dynatron.me/x/stillbox/pkg/talkgroups"
"github.com/go-chi/chi/v5"
"github.com/go-viper/mapstructure/v2"
"github.com/jackc/pgx/v5"
"github.com/rs/zerolog/log"
)
type API interface {
Subrouter() http.Handler
}
type api struct {
tgs talkgroups.Store
}
func New(tgs talkgroups.Store) API {
s := &api{
tgs: tgs,
}
return s
}
func (a *api) Subrouter() http.Handler {
r := chi.NewMux()
r.Get("/talkgroup/{system:\\d+}/{id:\\d+}", a.talkgroup)
r.Get("/talkgroup/{system:\\d+}/", a.talkgroup)
r.Get("/talkgroup/", a.talkgroup)
return r
}
var statusMapping = map[error]int{
talkgroups.ErrNotFound: http.StatusNotFound,
pgx.ErrNoRows: http.StatusNotFound,
}
func httpCode(err error) int {
c, ok := statusMapping[err]
if ok {
return c
}
for e, c := range statusMapping { // check if err wraps an error we know about
if errors.Is(err, e) {
return c
}
}
return http.StatusInternalServerError
}
func (a *api) writeResponse(w http.ResponseWriter, r *http.Request, data interface{}, err error) {
if err != nil {
log.Error().Str("path", r.URL.Path).Err(err).Msg("request failed")
http.Error(w, err.Error(), httpCode(err))
return
}
w.Header().Set("Content-Type", "application/json")
enc := json.NewEncoder(w)
err = enc.Encode(data)
if err != nil {
log.Error().Str("path", r.URL.Path).Err(err).Msg("response marshal failed")
http.Error(w, err.Error(), httpCode(err))
return
}
}
func decodeParams(d interface{}, r *http.Request) error {
params := chi.RouteContext(r.Context()).URLParams
m := make(map[string]string, len(params.Keys))
for i, k := range params.Keys {
m[k] = params.Values[i]
}
dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
Metadata: nil,
Result: d,
TagName: "param",
WeaklyTypedInput: true,
})
if err != nil {
return err
}
return dec.Decode(m)
}
func (a *api) badReq(w http.ResponseWriter, err error) {
http.Error(w, err.Error(), http.StatusBadRequest)
}
func (a *api) talkgroup(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
p := struct {
System *int `param:"system"`
ID *int `param:"id"`
}{}
err := decodeParams(&p, r)
if err != nil {
a.badReq(w, err)
return
}
var res interface{}
switch {
case p.System != nil && p.ID != nil:
res, err = a.tgs.TG(ctx, talkgroups.TG(*p.System, *p.ID))
case p.System != nil:
res, err = a.tgs.SystemTGs(ctx, int32(*p.System))
default:
res, err = a.tgs.TGs(ctx, nil)
}
a.writeResponse(w, r, res, err)
}

View file

@ -70,8 +70,11 @@ type NotifyService struct {
} }
func (n *NotifyService) GetS(k, defaultVal string) string { func (n *NotifyService) GetS(k, defaultVal string) string {
if v, has := n.Config[k].(string); has { if v, has := n.Config[k]; has {
return v if v, isString := v.(string); isString {
return v
}
log.Error().Str("configKey", k).Str("provider", n.Provider).Str("default", defaultVal).Msg("notify config value is not a string! using default")
} }
return defaultVal return defaultVal

11
pkg/database/extend.go Normal file
View file

@ -0,0 +1,11 @@
package database
func (d GetTalkgroupsWithLearnedByPackedIDsRow) GetTalkgroup() Talkgroup { return d.Talkgroup }
func (d GetTalkgroupsWithLearnedByPackedIDsRow) GetSystem() System { return d.System }
func (d GetTalkgroupsWithLearnedByPackedIDsRow) GetLearned() bool { return d.Learned }
func (g GetTalkgroupsWithLearnedRow) GetTalkgroup() Talkgroup { return g.Talkgroup }
func (g GetTalkgroupsWithLearnedRow) GetSystem() System { return g.System }
func (g GetTalkgroupsWithLearnedRow) GetLearned() bool { return g.Learned }
func (g GetTalkgroupsWithLearnedBySystemRow) GetTalkgroup() Talkgroup { return g.Talkgroup }
func (g GetTalkgroupsWithLearnedBySystemRow) GetSystem() System { return g.System }
func (g GetTalkgroupsWithLearnedBySystemRow) GetLearned() bool { return g.Learned }

View file

@ -26,10 +26,12 @@ type Querier interface {
GetTalkgroupIDsByTags(ctx context.Context, anytags []string, alltags []string, nottags []string) ([]GetTalkgroupIDsByTagsRow, error) GetTalkgroupIDsByTags(ctx context.Context, anytags []string, alltags []string, nottags []string) ([]GetTalkgroupIDsByTagsRow, error)
GetTalkgroupTags(ctx context.Context, sys int, tg int) ([]string, error) GetTalkgroupTags(ctx context.Context, sys int, tg int) ([]string, error)
GetTalkgroupWithLearned(ctx context.Context, systemID int, tgid int) (GetTalkgroupWithLearnedRow, error) GetTalkgroupWithLearned(ctx context.Context, systemID int, tgid int) (GetTalkgroupWithLearnedRow, error)
GetTalkgroupWithLearnedByPackedIDs(ctx context.Context, dollar_1 []int64) ([]GetTalkgroupWithLearnedByPackedIDsRow, error)
GetTalkgroupsByPackedIDs(ctx context.Context, dollar_1 []int64) ([]GetTalkgroupsByPackedIDsRow, error) GetTalkgroupsByPackedIDs(ctx context.Context, dollar_1 []int64) ([]GetTalkgroupsByPackedIDsRow, error)
GetTalkgroupsWithAllTags(ctx context.Context, tags []string) ([]GetTalkgroupsWithAllTagsRow, error) GetTalkgroupsWithAllTags(ctx context.Context, tags []string) ([]GetTalkgroupsWithAllTagsRow, error)
GetTalkgroupsWithAnyTags(ctx context.Context, tags []string) ([]GetTalkgroupsWithAnyTagsRow, error) GetTalkgroupsWithAnyTags(ctx context.Context, tags []string) ([]GetTalkgroupsWithAnyTagsRow, error)
GetTalkgroupsWithLearned(ctx context.Context) ([]GetTalkgroupsWithLearnedRow, error)
GetTalkgroupsWithLearnedByPackedIDs(ctx context.Context, dollar_1 []int64) ([]GetTalkgroupsWithLearnedByPackedIDsRow, error)
GetTalkgroupsWithLearnedBySystem(ctx context.Context, system int32) ([]GetTalkgroupsWithLearnedBySystemRow, error)
GetUserByID(ctx context.Context, id int32) (User, error) GetUserByID(ctx context.Context, id int32) (User, error)
GetUserByUID(ctx context.Context, id int32) (User, error) GetUserByUID(ctx context.Context, id int32) (User, error)
GetUserByUsername(ctx context.Context, username string) (User, error) GetUserByUsername(ctx context.Context, username string) (User, error)

View file

@ -151,67 +151,6 @@ func (q *Queries) GetTalkgroupWithLearned(ctx context.Context, systemID int, tgi
return i, err return i, err
} }
const getTalkgroupWithLearnedByPackedIDs = `-- name: GetTalkgroupWithLearnedByPackedIDs :many
SELECT
tg.id, tg.system_id, tg.tgid, tg.name, tg.alpha_tag, tg.tg_group, tg.frequency, tg.metadata, tg.tags, tg.alert, tg.alert_config, tg.weight, sys.id, sys.name,
FALSE learned
FROM talkgroups tg
JOIN systems sys ON tg.system_id = sys.id
WHERE tg.id = ANY($1::INT8[])
UNION
SELECT
tgl.id::INT8, tgl.system_id::INT4, tgl.tgid::INT4, tgl.name,
tgl.alpha_tag, tgl.alpha_tag, NULL::INTEGER, NULL::JSONB,
CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END,
TRUE, NULL::JSONB, 1.0, sys.id, sys.name,
TRUE learned
FROM talkgroups_learned tgl
JOIN systems sys ON tgl.system_id = sys.id
WHERE systg2id(tgl.system_id, tgl.tgid) = ANY($1::INT8[]) AND ignored IS NOT TRUE
`
type GetTalkgroupWithLearnedByPackedIDsRow struct {
Talkgroup Talkgroup `json:"talkgroup"`
System System `json:"system"`
Learned bool `json:"learned"`
}
func (q *Queries) GetTalkgroupWithLearnedByPackedIDs(ctx context.Context, dollar_1 []int64) ([]GetTalkgroupWithLearnedByPackedIDsRow, error) {
rows, err := q.db.Query(ctx, getTalkgroupWithLearnedByPackedIDs, dollar_1)
if err != nil {
return nil, err
}
defer rows.Close()
var items []GetTalkgroupWithLearnedByPackedIDsRow
for rows.Next() {
var i GetTalkgroupWithLearnedByPackedIDsRow
if err := rows.Scan(
&i.Talkgroup.ID,
&i.Talkgroup.SystemID,
&i.Talkgroup.Tgid,
&i.Talkgroup.Name,
&i.Talkgroup.AlphaTag,
&i.Talkgroup.TgGroup,
&i.Talkgroup.Frequency,
&i.Talkgroup.Metadata,
&i.Talkgroup.Tags,
&i.Talkgroup.Alert,
&i.Talkgroup.AlertConfig,
&i.Talkgroup.Weight,
&i.System.ID,
&i.System.Name,
&i.Learned,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const getTalkgroupsByPackedIDs = `-- name: GetTalkgroupsByPackedIDs :many const getTalkgroupsByPackedIDs = `-- name: GetTalkgroupsByPackedIDs :many
SELECT tg.id, tg.system_id, tg.tgid, tg.name, tg.alpha_tag, tg.tg_group, tg.frequency, tg.metadata, tg.tags, tg.alert, tg.alert_config, tg.weight, sys.id, sys.name FROM talkgroups tg SELECT tg.id, tg.system_id, tg.tgid, tg.name, tg.alpha_tag, tg.tg_group, tg.frequency, tg.metadata, tg.tags, tg.alert, tg.alert_config, tg.weight, sys.id, sys.name FROM talkgroups tg
JOIN systems sys ON tg.system_id = sys.id JOIN systems sys ON tg.system_id = sys.id
@ -342,6 +281,188 @@ func (q *Queries) GetTalkgroupsWithAnyTags(ctx context.Context, tags []string) (
return items, nil return items, nil
} }
const getTalkgroupsWithLearned = `-- name: GetTalkgroupsWithLearned :many
SELECT
tg.id, tg.system_id, tg.tgid, tg.name, tg.alpha_tag, tg.tg_group, tg.frequency, tg.metadata, tg.tags, tg.alert, tg.alert_config, tg.weight, sys.id, sys.name,
FALSE learned
FROM talkgroups tg
JOIN systems sys ON tg.system_id = sys.id
UNION
SELECT
tgl.id::INT8, tgl.system_id::INT4, tgl.tgid::INT4, tgl.name,
tgl.alpha_tag, tgl.alpha_tag, NULL::INTEGER, NULL::JSONB,
CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END,
TRUE, NULL::JSONB, 1.0, sys.id, sys.name,
TRUE learned
FROM talkgroups_learned tgl
JOIN systems sys ON tgl.system_id = sys.id
WHERE ignored IS NOT TRUE
`
type GetTalkgroupsWithLearnedRow struct {
Talkgroup Talkgroup `json:"talkgroup"`
System System `json:"system"`
Learned bool `json:"learned"`
}
func (q *Queries) GetTalkgroupsWithLearned(ctx context.Context) ([]GetTalkgroupsWithLearnedRow, error) {
rows, err := q.db.Query(ctx, getTalkgroupsWithLearned)
if err != nil {
return nil, err
}
defer rows.Close()
var items []GetTalkgroupsWithLearnedRow
for rows.Next() {
var i GetTalkgroupsWithLearnedRow
if err := rows.Scan(
&i.Talkgroup.ID,
&i.Talkgroup.SystemID,
&i.Talkgroup.Tgid,
&i.Talkgroup.Name,
&i.Talkgroup.AlphaTag,
&i.Talkgroup.TgGroup,
&i.Talkgroup.Frequency,
&i.Talkgroup.Metadata,
&i.Talkgroup.Tags,
&i.Talkgroup.Alert,
&i.Talkgroup.AlertConfig,
&i.Talkgroup.Weight,
&i.System.ID,
&i.System.Name,
&i.Learned,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const getTalkgroupsWithLearnedByPackedIDs = `-- name: GetTalkgroupsWithLearnedByPackedIDs :many
SELECT
tg.id, tg.system_id, tg.tgid, tg.name, tg.alpha_tag, tg.tg_group, tg.frequency, tg.metadata, tg.tags, tg.alert, tg.alert_config, tg.weight, sys.id, sys.name,
FALSE learned
FROM talkgroups tg
JOIN systems sys ON tg.system_id = sys.id
WHERE tg.id = ANY($1::INT8[])
UNION
SELECT
tgl.id::INT8, tgl.system_id::INT4, tgl.tgid::INT4, tgl.name,
tgl.alpha_tag, tgl.alpha_tag, NULL::INTEGER, NULL::JSONB,
CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END,
TRUE, NULL::JSONB, 1.0, sys.id, sys.name,
TRUE learned
FROM talkgroups_learned tgl
JOIN systems sys ON tgl.system_id = sys.id
WHERE systg2id(tgl.system_id, tgl.tgid) = ANY($1::INT8[]) AND ignored IS NOT TRUE
`
type GetTalkgroupsWithLearnedByPackedIDsRow struct {
Talkgroup Talkgroup `json:"talkgroup"`
System System `json:"system"`
Learned bool `json:"learned"`
}
func (q *Queries) GetTalkgroupsWithLearnedByPackedIDs(ctx context.Context, dollar_1 []int64) ([]GetTalkgroupsWithLearnedByPackedIDsRow, error) {
rows, err := q.db.Query(ctx, getTalkgroupsWithLearnedByPackedIDs, dollar_1)
if err != nil {
return nil, err
}
defer rows.Close()
var items []GetTalkgroupsWithLearnedByPackedIDsRow
for rows.Next() {
var i GetTalkgroupsWithLearnedByPackedIDsRow
if err := rows.Scan(
&i.Talkgroup.ID,
&i.Talkgroup.SystemID,
&i.Talkgroup.Tgid,
&i.Talkgroup.Name,
&i.Talkgroup.AlphaTag,
&i.Talkgroup.TgGroup,
&i.Talkgroup.Frequency,
&i.Talkgroup.Metadata,
&i.Talkgroup.Tags,
&i.Talkgroup.Alert,
&i.Talkgroup.AlertConfig,
&i.Talkgroup.Weight,
&i.System.ID,
&i.System.Name,
&i.Learned,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const getTalkgroupsWithLearnedBySystem = `-- name: GetTalkgroupsWithLearnedBySystem :many
SELECT
tg.id, tg.system_id, tg.tgid, tg.name, tg.alpha_tag, tg.tg_group, tg.frequency, tg.metadata, tg.tags, tg.alert, tg.alert_config, tg.weight, sys.id, sys.name,
FALSE learned
FROM talkgroups tg
JOIN systems sys ON tg.system_id = sys.id
WHERE tg.system_id = $1
UNION
SELECT
tgl.id::INT8, tgl.system_id::INT4, tgl.tgid::INT4, tgl.name,
tgl.alpha_tag, tgl.alpha_tag, NULL::INTEGER, NULL::JSONB,
CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END,
TRUE, NULL::JSONB, 1.0, sys.id, sys.name,
TRUE learned
FROM talkgroups_learned tgl
JOIN systems sys ON tgl.system_id = sys.id
WHERE tgl.system_id = $1 AND ignored IS NOT TRUE
`
type GetTalkgroupsWithLearnedBySystemRow struct {
Talkgroup Talkgroup `json:"talkgroup"`
System System `json:"system"`
Learned bool `json:"learned"`
}
func (q *Queries) GetTalkgroupsWithLearnedBySystem(ctx context.Context, system int32) ([]GetTalkgroupsWithLearnedBySystemRow, error) {
rows, err := q.db.Query(ctx, getTalkgroupsWithLearnedBySystem, system)
if err != nil {
return nil, err
}
defer rows.Close()
var items []GetTalkgroupsWithLearnedBySystemRow
for rows.Next() {
var i GetTalkgroupsWithLearnedBySystemRow
if err := rows.Scan(
&i.Talkgroup.ID,
&i.Talkgroup.SystemID,
&i.Talkgroup.Tgid,
&i.Talkgroup.Name,
&i.Talkgroup.AlphaTag,
&i.Talkgroup.TgGroup,
&i.Talkgroup.Frequency,
&i.Talkgroup.Metadata,
&i.Talkgroup.Tags,
&i.Talkgroup.Alert,
&i.Talkgroup.AlertConfig,
&i.Talkgroup.Weight,
&i.System.ID,
&i.System.Name,
&i.Learned,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const setTalkgroupTags = `-- name: SetTalkgroupTags :exec const setTalkgroupTags = `-- name: SetTalkgroupTags :exec
UPDATE talkgroups SET tags = $3 UPDATE talkgroups SET tags = $3
WHERE id = systg2id($1, $2) WHERE id = systg2id($1, $2)

View file

@ -43,7 +43,46 @@ JOIN systems sys ON tgl.system_id = sys.id
WHERE tgl.system_id = $1 AND tgl.tgid = $2 AND ignored IS NOT TRUE WHERE tgl.system_id = $1 AND tgl.tgid = $2 AND ignored IS NOT TRUE
` `
const getTalkgroupsWithLearnedBySystemTest = `-- name: GetTalkgroupsWithLearnedBySystem :many
SELECT
tg.id, tg.system_id, tg.tgid, tg.name, tg.alpha_tag, tg.tg_group, tg.frequency, tg.metadata, tg.tags, tg.alert, tg.alert_config, tg.weight, sys.id, sys.name,
FALSE learned
FROM talkgroups tg
JOIN systems sys ON tg.system_id = sys.id
WHERE tg.system_id = $1
UNION
SELECT
tgl.id::INT8, tgl.system_id::INT4, tgl.tgid::INT4, tgl.name,
tgl.alpha_tag, tgl.alpha_tag, NULL::INTEGER, NULL::JSONB,
CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END,
TRUE, NULL::JSONB, 1.0, sys.id, sys.name,
TRUE learned
FROM talkgroups_learned tgl
JOIN systems sys ON tgl.system_id = sys.id
WHERE tg.system_id = $1 AND ignored IS NOT TRUE
`
const getTalkgroupsWithLearnedTest = `-- name: GetTalkgroupsWithLearned :many
SELECT
tg.id, tg.system_id, tg.tgid, tg.name, tg.alpha_tag, tg.tg_group, tg.frequency, tg.metadata, tg.tags, tg.alert, tg.alert_config, tg.weight, sys.id, sys.name,
FALSE learned
FROM talkgroups tg
JOIN systems sys ON tg.system_id = sys.id
UNION
SELECT
tgl.id::INT8, tgl.system_id::INT4, tgl.tgid::INT4, tgl.name,
tgl.alpha_tag, tgl.alpha_tag, NULL::INTEGER, NULL::JSONB,
CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END,
TRUE, NULL::JSONB, 1.0, sys.id, sys.name,
TRUE learned
FROM talkgroups_learned tgl
JOIN systems sys ON tgl.system_id = sys.id
WHERE ignored IS NOT TRUE
`
func TestQueryColumnsMatch(t *testing.T) { func TestQueryColumnsMatch(t *testing.T) {
require.Equal(t, getTalkgroupWithLearnedByPackedIDsTest, getTalkgroupWithLearnedByPackedIDs) require.Equal(t, getTalkgroupsWithLearnedByPackedIDsTest, getTalkgroupWithLearnedByPackedIDs)
require.Equal(t, getTalkgroupWithLearnedTest, getTalkgroupWithLearned) require.Equal(t, getTalkgroupWithLearnedTest, getTalkgroupWithLearned)
require.Equal(t, getTalkgroupsWithLearnedBySystemTest, getTalkgroupsWithLearnedBySystem)
require.Equal(t, getTalkgroupsWithLearnedTest, getTalkgroupsWithLearned)
} }

View file

@ -7,6 +7,7 @@ import (
"dynatron.me/x/stillbox/pkg/config" "dynatron.me/x/stillbox/pkg/config"
"github.com/go-viper/mapstructure/v2"
"github.com/nikoksr/notify" "github.com/nikoksr/notify"
"github.com/nikoksr/notify/service/http" "github.com/nikoksr/notify/service/http"
) )
@ -19,9 +20,7 @@ type notifier struct {
*notify.Notify *notify.Notify
} }
func (n *notifier) buildSlackWebhookPayload(cfg config.NotifyService) func(string, string) any { func (n *notifier) buildSlackWebhookPayload(cfg *slackWebhookConfig) func(string, string) any {
icon := cfg.GetS("icon", "🚨")
url := cfg.GetS("messageURL", "")
type Attachment struct { type Attachment struct {
Title string `json:"title"` Title string `json:"title"`
@ -42,27 +41,40 @@ func (n *notifier) buildSlackWebhookPayload(cfg config.NotifyService) func(strin
{ {
Title: subject, Title: subject,
Text: message, Text: message,
TitleLink: url, TitleLink: cfg.MessageURL,
Timestamp: time.Now().Unix(), Timestamp: time.Now().Unix(),
}, },
}, },
IconEmoji: icon, IconEmoji: cfg.Icon,
} }
return m return m
} }
} }
type slackWebhookConfig struct {
WebhookURL string `mapstructure:"webhookURL"`
Icon string `mapstructure:"icon"`
MessageURL string `mapstructure:"messageURL"`
}
func (n *notifier) addService(cfg config.NotifyService) error { func (n *notifier) addService(cfg config.NotifyService) error {
switch cfg.Provider { switch cfg.Provider {
case "slackwebhook": case "slackwebhook":
swc := &slackWebhookConfig{
Icon: "🚨",
}
err := mapstructure.Decode(cfg.Config, &swc)
if err != nil {
return err
}
hs := http.New() hs := http.New()
hs.AddReceivers(&http.Webhook{ hs.AddReceivers(&http.Webhook{
ContentType: "application/json", ContentType: "application/json",
Header: make(stdhttp.Header), Header: make(stdhttp.Header),
Method: stdhttp.MethodPost, Method: stdhttp.MethodPost,
URL: cfg.GetS("webhookURL", ""), URL: swc.WebhookURL,
BuildPayload: n.buildSlackWebhookPayload(cfg), BuildPayload: n.buildSlackWebhookPayload(swc),
}) })
n.UseServices(hs) n.UseServices(hs)
default: default:

View file

@ -1,7 +1,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT. // Code generated by protoc-gen-go. DO NOT EDIT.
// versions: // versions:
// protoc-gen-go v1.33.0 // protoc-gen-go v1.33.0
// protoc v5.28.2 // protoc v5.28.3
// source: stillbox.proto // source: stillbox.proto
package pb package pb

View file

@ -36,6 +36,7 @@ func (s *Server) setupRoutes() {
s.nex.PrivateRoutes(r) s.nex.PrivateRoutes(r)
s.auth.PrivateRoutes(r) s.auth.PrivateRoutes(r)
s.alerter.PrivateRoutes(r) s.alerter.PrivateRoutes(r)
r.Mount("/api", s.api.Subrouter())
}) })
r.Group(func(r chi.Router) { r.Group(func(r chi.Router) {

View file

@ -7,6 +7,7 @@ import (
"time" "time"
"dynatron.me/x/stillbox/pkg/alerting" "dynatron.me/x/stillbox/pkg/alerting"
"dynatron.me/x/stillbox/pkg/api"
"dynatron.me/x/stillbox/pkg/auth" "dynatron.me/x/stillbox/pkg/auth"
"dynatron.me/x/stillbox/pkg/config" "dynatron.me/x/stillbox/pkg/config"
"dynatron.me/x/stillbox/pkg/database" "dynatron.me/x/stillbox/pkg/database"
@ -36,6 +37,7 @@ type Server struct {
notifier notify.Notifier notifier notify.Notifier
hup chan os.Signal hup chan os.Signal
tgs talkgroups.Store tgs talkgroups.Store
api api.API
} }
func New(ctx context.Context, cfg *config.Config) (*Server, error) { func New(ctx context.Context, cfg *config.Config) (*Server, error) {
@ -59,6 +61,7 @@ func New(ctx context.Context, cfg *config.Config) (*Server, error) {
} }
tgCache := talkgroups.NewCache() tgCache := talkgroups.NewCache()
api := api.New(tgCache)
srv := &Server{ srv := &Server{
auth: authenticator, auth: authenticator,
@ -70,6 +73,7 @@ func New(ctx context.Context, cfg *config.Config) (*Server, error) {
alerter: alerting.New(cfg.Alerting, tgCache, alerting.WithNotifier(notifier)), alerter: alerting.New(cfg.Alerting, tgCache, alerting.WithNotifier(notifier)),
notifier: notifier, notifier: notifier,
tgs: tgCache, tgs: tgCache,
api: api,
} }
srv.sinks.Register("database", sinks.NewDatabaseSink(srv.db), true) srv.sinks.Register("database", sinks.NewDatabaseSink(srv.db), true)

View file

@ -17,6 +17,7 @@ func (s *Server) huppers() []hupper {
return []hupper{ return []hupper{
s.logger, s.logger,
s.auth, s.auth,
s.tgs,
} }
} }

View file

@ -6,7 +6,6 @@ import (
"time" "time"
"dynatron.me/x/stillbox/internal/ruletime" "dynatron.me/x/stillbox/internal/ruletime"
"dynatron.me/x/stillbox/internal/trending"
) )
type AlertConfig struct { type AlertConfig struct {
@ -50,15 +49,15 @@ func (ac *AlertConfig) UnmarshalTGRules(tg ID, confBytes []byte) error {
return nil return nil
} }
func (ac *AlertConfig) ApplyAlertRules(score trending.Score[ID], t time.Time, coversOpts ...ruletime.CoversOption) float64 { func (ac *AlertConfig) ApplyAlertRules(id ID, t time.Time, coversOpts ...ruletime.CoversOption) float64 {
ac.RLock() ac.RLock()
s, has := ac.m[score.ID] s, has := ac.m[id]
ac.RUnlock() ac.RUnlock()
if !has { if !has {
return score.Score return 1.0
} }
final := score.Score final := 1.0
for _, ar := range s { for _, ar := range s {
if ar.MatchTime(t, coversOpts...) { if ar.MatchTime(t, coversOpts...) {

View file

@ -126,7 +126,7 @@ func TestAlertConfig(t *testing.T) {
ID: tc.tg, ID: tc.tg,
Score: tc.origScore, Score: tc.origScore,
} }
assert.Equal(t, tc.expectScore, toFixed(ac.ApplyAlertRules(cs, tc.t), 5)) assert.Equal(t, tc.expectScore, toFixed(cs.Score*ac.ApplyAlertRules(cs.ID, tc.t), 5))
}) })
} }
} }

View file

@ -6,26 +6,32 @@ import (
"sync" "sync"
"time" "time"
"dynatron.me/x/stillbox/pkg/database"
"dynatron.me/x/stillbox/internal/ruletime" "dynatron.me/x/stillbox/internal/ruletime"
"dynatron.me/x/stillbox/internal/trending"
"dynatron.me/x/stillbox/pkg/config"
"dynatron.me/x/stillbox/pkg/database"
"github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
) )
type tgMap map[ID]Talkgroup type tgMap map[ID]*Talkgroup
type Store interface { type Store interface {
// TG retrieves a Talkgroup from the Store. // TG retrieves a Talkgroup from the Store.
TG(ctx context.Context, tg ID) (Talkgroup, error) TG(ctx context.Context, tg ID) (*Talkgroup, error)
// TGs retrieves many talkgroups from the Store.
TGs(ctx context.Context, tgs IDs) ([]*Talkgroup, error)
// SystemTGs retrieves all Talkgroups associated with a System.
SystemTGs(ctx context.Context, systemID int32) ([]*Talkgroup, error)
// SystemName retrieves a system name from the store. It returns the record and whether one was found. // SystemName retrieves a system name from the store. It returns the record and whether one was found.
SystemName(ctx context.Context, id int) (string, bool) SystemName(ctx context.Context, id int) (string, bool)
// ApplyAlertRules applies the score's talkgroup alert rules to the call occurring at t and returns the weighted score. // ApplyAlertRules applies the score's talkgroup alert rules to the call occurring at t and returns the weighted score.
ApplyAlertRules(score trending.Score[ID], t time.Time, coversOpts ...ruletime.CoversOption) float64 ApplyAlertRules(id ID, t time.Time, coversOpts ...ruletime.CoversOption) float64
// Hint hints the Store that the provided talkgroups will be asked for. // Hint hints the Store that the provided talkgroups will be asked for.
Hint(ctx context.Context, tgs []ID) error Hint(ctx context.Context, tgs []ID) error
@ -35,6 +41,12 @@ type Store interface {
// Invalidate invalidates any caching in the Store. // Invalidate invalidates any caching in the Store.
Invalidate() Invalidate()
// Weight returns the final weight of this talkgroup, including its static and rules-derived weight.
Weight(ctx context.Context, id ID, t time.Time) float64
// Hupper
HUP(*config.Config)
} }
type CtxStoreKeyT string type CtxStoreKeyT string
@ -54,6 +66,10 @@ func StoreFrom(ctx context.Context) Store {
return s return s
} }
func (t *cache) HUP(_ *config.Config) {
t.Invalidate()
}
func (t *cache) Invalidate() { func (t *cache) Invalidate() {
t.Lock() t.Lock()
defer t.Unlock() defer t.Unlock()
@ -107,7 +123,7 @@ func (t *cache) Hint(ctx context.Context, tgs []ID) error {
return nil return nil
} }
func (t *cache) add(rec Talkgroup) error { func (t *cache) add(rec *Talkgroup) error {
t.Lock() t.Lock()
defer t.Unlock() defer t.Unlock()
@ -118,16 +134,70 @@ func (t *cache) add(rec Talkgroup) error {
return t.AlertConfig.UnmarshalTGRules(tg, rec.Talkgroup.AlertConfig) return t.AlertConfig.UnmarshalTGRules(tg, rec.Talkgroup.AlertConfig)
} }
func rowToTalkgroup(r database.GetTalkgroupWithLearnedByPackedIDsRow) Talkgroup { type row interface {
return Talkgroup{ database.GetTalkgroupsWithLearnedByPackedIDsRow | database.GetTalkgroupsWithLearnedRow |
Talkgroup: r.Talkgroup, database.GetTalkgroupsWithLearnedBySystemRow
System: r.System, GetTalkgroup() database.Talkgroup
Learned: r.Learned, GetSystem() database.System
GetLearned() bool
}
func rowToTalkgroup[T row](r T) *Talkgroup {
return &Talkgroup{
Talkgroup: r.GetTalkgroup(),
System: r.GetSystem(),
Learned: r.GetLearned(),
} }
} }
func addToRowList[T row](t *cache, r []*Talkgroup, tgRecords []T) ([]*Talkgroup, error) {
for _, rec := range tgRecords {
tg := rowToTalkgroup(rec)
err := t.add(tg)
if err != nil {
return nil, err
}
r = append(r, tg)
}
return r, nil
}
func (t *cache) TGs(ctx context.Context, tgs IDs) ([]*Talkgroup, error) {
r := make([]*Talkgroup, 0, len(tgs))
var err error
if tgs != nil {
toGet := make(IDs, 0, len(tgs))
t.RLock()
for _, id := range tgs {
rec, has := t.tgs[id]
if has {
r = append(r, rec)
} else {
toGet = append(toGet, id)
}
}
t.RUnlock()
tgRecords, err := database.FromCtx(ctx).GetTalkgroupsWithLearnedByPackedIDs(ctx, toGet.Packed())
if err != nil {
return nil, err
}
return addToRowList(t, r, tgRecords)
}
// get all talkgroups
tgRecords, err := database.FromCtx(ctx).GetTalkgroupsWithLearned(ctx)
if err != nil {
return nil, err
}
return addToRowList(t, r, tgRecords)
}
func (t *cache) Load(ctx context.Context, tgs []int64) error { func (t *cache) Load(ctx context.Context, tgs []int64) error {
tgRecords, err := database.FromCtx(ctx).GetTalkgroupWithLearnedByPackedIDs(ctx, tgs) tgRecords, err := database.FromCtx(ctx).GetTalkgroupsWithLearnedByPackedIDs(ctx, tgs)
if err != nil { if err != nil {
return err return err
} }
@ -145,7 +215,30 @@ func (t *cache) Load(ctx context.Context, tgs []int64) error {
var ErrNotFound = errors.New("talkgroup not found") var ErrNotFound = errors.New("talkgroup not found")
func (t *cache) TG(ctx context.Context, tg ID) (Talkgroup, error) { func (t *cache) Weight(ctx context.Context, id ID, tm time.Time) float64 {
tg, err := t.TG(ctx, id)
if err != nil {
return 1.0
}
m := float64(tg.Weight)
m *= t.AlertConfig.ApplyAlertRules(id, tm)
return float64(m)
}
func (t *cache) SystemTGs(ctx context.Context, systemID int32) ([]*Talkgroup, error) {
recs, err := database.FromCtx(ctx).GetTalkgroupsWithLearnedBySystem(ctx, systemID)
if err != nil {
return nil, err
}
r := make([]*Talkgroup, 0, len(recs))
return addToRowList(t, r, recs)
}
func (t *cache) TG(ctx context.Context, tg ID) (*Talkgroup, error) {
t.RLock() t.RLock()
rec, has := t.tgs[tg] rec, has := t.tgs[tg]
t.RUnlock() t.RUnlock()
@ -154,18 +247,18 @@ func (t *cache) TG(ctx context.Context, tg ID) (Talkgroup, error) {
return rec, nil return rec, nil
} }
recs, err := database.FromCtx(ctx).GetTalkgroupWithLearnedByPackedIDs(ctx, []int64{tg.Pack()}) recs, err := database.FromCtx(ctx).GetTalkgroupsWithLearnedByPackedIDs(ctx, []int64{tg.Pack()})
switch err { switch err {
case nil: case nil:
case pgx.ErrNoRows: case pgx.ErrNoRows:
return Talkgroup{}, ErrNotFound return nil, ErrNotFound
default: default:
log.Error().Err(err).Msg("TG() cache add db get") log.Error().Err(err).Msg("TG() cache add db get")
return Talkgroup{}, errors.Join(ErrNotFound, err) return nil, errors.Join(ErrNotFound, err)
} }
if len(recs) < 1 { if len(recs) < 1 {
return Talkgroup{}, ErrNotFound return nil, ErrNotFound
} }
err = t.add(rowToTalkgroup(recs[0])) err = t.add(rowToTalkgroup(recs[0]))

View file

@ -13,8 +13,19 @@ type Talkgroup struct {
} }
type ID struct { type ID struct {
System uint32 System uint32 `json:"sys"`
Talkgroup uint32 Talkgroup uint32 `json:"tg"`
}
type IDs []ID
func (ids *IDs) Packed() []int64 {
r := make([]int64, len(*ids))
for i := range *ids {
r[i] = (*ids)[i].Pack()
}
return r
} }
func TG[T int | uint | int64 | uint64 | int32 | uint32](sys, tgid T) ID { func TG[T int | uint | int64 | uint64 | int32 | uint32](sys, tgid T) ID {

View file

@ -51,7 +51,7 @@ FROM talkgroups_learned tgl
JOIN systems sys ON tgl.system_id = sys.id JOIN systems sys ON tgl.system_id = sys.id
WHERE tgl.system_id = sqlc.arg(system_id) AND tgl.tgid = sqlc.arg(tgid) AND ignored IS NOT TRUE; WHERE tgl.system_id = sqlc.arg(system_id) AND tgl.tgid = sqlc.arg(tgid) AND ignored IS NOT TRUE;
-- name: GetTalkgroupWithLearnedByPackedIDs :many -- name: GetTalkgroupsWithLearnedByPackedIDs :many
SELECT SELECT
sqlc.embed(tg), sqlc.embed(sys), sqlc.embed(tg), sqlc.embed(sys),
FALSE learned FALSE learned
@ -69,5 +69,41 @@ FROM talkgroups_learned tgl
JOIN systems sys ON tgl.system_id = sys.id JOIN systems sys ON tgl.system_id = sys.id
WHERE systg2id(tgl.system_id, tgl.tgid) = ANY($1::INT8[]) AND ignored IS NOT TRUE; WHERE systg2id(tgl.system_id, tgl.tgid) = ANY($1::INT8[]) AND ignored IS NOT TRUE;
-- name: GetTalkgroupsWithLearnedBySystem :many
SELECT
sqlc.embed(tg), sqlc.embed(sys),
FALSE learned
FROM talkgroups tg
JOIN systems sys ON tg.system_id = sys.id
WHERE tg.system_id = @system
UNION
SELECT
tgl.id::INT8, tgl.system_id::INT4, tgl.tgid::INT4, tgl.name,
tgl.alpha_tag, tgl.alpha_tag, NULL::INTEGER, NULL::JSONB,
CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END,
TRUE, NULL::JSONB, 1.0, sys.id, sys.name,
TRUE learned
FROM talkgroups_learned tgl
JOIN systems sys ON tgl.system_id = sys.id
WHERE tgl.system_id = @system AND ignored IS NOT TRUE;
-- name: GetTalkgroupsWithLearned :many
SELECT
sqlc.embed(tg), sqlc.embed(sys),
FALSE learned
FROM talkgroups tg
JOIN systems sys ON tg.system_id = sys.id
UNION
SELECT
tgl.id::INT8, tgl.system_id::INT4, tgl.tgid::INT4, tgl.name,
tgl.alpha_tag, tgl.alpha_tag, NULL::INTEGER, NULL::JSONB,
CASE WHEN tgl.alpha_tag IS NULL THEN NULL ELSE ARRAY[tgl.alpha_tag] END,
TRUE, NULL::JSONB, 1.0, sys.id, sys.name,
TRUE learned
FROM talkgroups_learned tgl
JOIN systems sys ON tgl.system_id = sys.id
WHERE ignored IS NOT TRUE;
-- name: GetSystemName :one -- name: GetSystemName :one
SELECT name FROM systems WHERE id = sqlc.arg(system_id); SELECT name FROM systems WHERE id = sqlc.arg(system_id);