Add talkgroup activity alerting #17

Merged
amigan merged 11 commits from alerting into trunk 2024-10-31 00:20:48 -04:00
38 changed files with 2749 additions and 111 deletions

2
.gitignore vendored
View file

@ -1,4 +1,5 @@
config.yaml
config.test.yaml
mydb.sql
client/calls/
!client/calls/.gitkeep
@ -6,3 +7,4 @@ client/calls/
/calls
Session.vim
*.log
*.dlv

View file

@ -4,12 +4,15 @@ BUILDDATE!=date '+%Y-%m-%e'
LDFLAGS=-ldflags="-X '${VPKG}.Version=${VER}' -X '${VPKG}.Built=${BUILDDATE}'"
all: checkcalls
go build -o gordio ${LDFLAGS} ./cmd/gordio/
go build -o calls ${LDFLAGS} ./cmd/calls/
go build -o gordio ${GOFLAGS} ${LDFLAGS} ./cmd/gordio/
go build -o calls ${GOFLAGS} ${LDFLAGS} ./cmd/calls/
buildpprof:
go build -o gordio-pprof ${GOFLAGS} ${LDFLAGS} -tags pprof ./cmd/gordio
clean:
rm -rf client/calls/ && mkdir client/calls && touch client/calls/.gitkeep
rm -f gordio calls
rm -f gordio calls gordio-pprof
checkcalls:
@test -e client/calls/index.html || make getcalls

View file

@ -24,3 +24,14 @@ rateLimit:
enable: true
requests: 200
over: 2m
alerting:
enable: true
lookbackDays: 7
halfLife: 30m
recent: 2h
alertThreshold: 0.5
renotify: 30m
notify:
- provider: slackwebhook
config:
webhookURL: "http://somewhere"

26
go.mod
View file

@ -1,38 +1,39 @@
module dynatron.me/x/stillbox
go 1.22.5
go 1.23.2
require (
dynatron.me/x/go-minimp3 v0.0.0-20240805171536-7ea857e216d6
github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de
github.com/go-audio/wav v1.1.0
github.com/go-chi/chi v1.5.5
github.com/go-chi/chi/v5 v5.1.0
github.com/go-chi/cors v1.2.1
github.com/go-chi/httprate v0.9.0
github.com/go-chi/jwtauth/v5 v5.3.1
github.com/go-chi/render v1.0.3
github.com/golang-migrate/migrate/v4 v4.17.1
github.com/google/uuid v1.4.0
github.com/google/uuid v1.6.0
github.com/gorilla/websocket v1.5.3
github.com/hajimehoshi/oto v1.0.1
github.com/jackc/pgx/v5 v5.6.0
github.com/rs/zerolog v1.33.0
github.com/spf13/cobra v1.8.1
github.com/tcolgate/mp3 v0.0.0-20170426193717-e79c5a46d300
golang.org/x/crypto v0.21.0
golang.org/x/sync v0.5.0
golang.org/x/term v0.18.0
google.golang.org/protobuf v1.33.0
golang.org/x/crypto v0.28.0
golang.org/x/sync v0.8.0
golang.org/x/term v0.25.0
google.golang.org/protobuf v1.35.1
gopkg.in/yaml.v3 v3.0.1
)
require (
github.com/ajg/form v1.5.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
github.com/go-audio/audio v1.0.0 // indirect
github.com/go-audio/riff v1.0.0 // indirect
github.com/goccy/go-json v0.10.2 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
@ -48,14 +49,17 @@ require (
github.com/lestrrat-go/jwx/v2 v2.0.20 // indirect
github.com/lestrrat-go/option v1.0.1 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/nikoksr/notify v1.0.1 // indirect
github.com/rogpeppe/go-internal v1.12.0 // indirect
github.com/segmentio/asm v1.2.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/stretchr/testify v1.9.0 // indirect
go.uber.org/atomic v1.7.0 // indirect
golang.org/x/exp/shiny v0.0.0-20240719175910-8a7402abbf56 // indirect
golang.org/x/image v0.14.0 // indirect
golang.org/x/mobile v0.0.0-20231127183840-76ac6878050a // indirect
golang.org/x/sys v0.20.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/net v0.30.0 // indirect
golang.org/x/sys v0.26.0 // indirect
golang.org/x/text v0.19.0 // indirect
)

66
go.sum
View file

@ -6,8 +6,10 @@ github.com/Microsoft/go-winio v0.6.1 h1:9/kr64B9VUZrLm5YYwbGtUJnMgqWVOdUAXu6Migc
github.com/Microsoft/go-winio v0.6.1/go.mod h1:LRdKpFKfdobln8UmuiYcKPot9D2v6svN5+sAH+4kjUM=
github.com/ajg/form v1.5.1 h1:t9c7v8JUKu/XxOGBU0yjNpaMloxGEJhUkqFRq0ibGeU=
github.com/ajg/form v1.5.1/go.mod h1:uL1WgH+h2mgNtvBq0339dVnzXdBETtL2LeUXaIv25UY=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de h1:FxWPpzIjnTlhPwqqXc4/vE0f7GvRjuAsbW+HOIe8KnA=
github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de/go.mod h1:DCaWoUhZrYW9p1lxo/cm8EmUOOzAPSEZNGF2DK1dJgw=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
@ -32,8 +34,6 @@ github.com/go-audio/riff v1.0.0 h1:d8iCGbDvox9BfLagY94fBynxSPHO80LmZCaOsmKxokA=
github.com/go-audio/riff v1.0.0/go.mod h1:l3cQwc85y79NQFCRB7TiPoNiaijp6q8Z0Uv38rVG498=
github.com/go-audio/wav v1.1.0 h1:jQgLtbqBzY7G+BM8fXF7AHUk1uHUviWS4X39d5rsL2g=
github.com/go-audio/wav v1.1.0/go.mod h1:mpe9qfwbScEbkd8uybLuIpTgHyrISw/OTuvjUW2iGtE=
github.com/go-chi/chi v1.5.5 h1:vOB/HbEMt9QqBqErz07QehcOKHaWFtuj87tTDVz2qXE=
github.com/go-chi/chi v1.5.5/go.mod h1:C9JqLr3tIYjDOZpzn+BCuxY8z8vmca43EeMgyZt7irw=
github.com/go-chi/chi/v5 v5.1.0 h1:acVI1TYaD+hhedDJ3r54HyA6sExp3HfXq7QWEEY/xMw=
github.com/go-chi/chi/v5 v5.1.0/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8=
github.com/go-chi/cors v1.2.1 h1:xEC8UT3Rlp2QuWNEr4Fs/c2EAGVKBwy/1vHx3bppil4=
@ -51,10 +51,10 @@ github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang-migrate/migrate/v4 v4.17.1 h1:4zQ6iqL6t6AiItphxJctQb3cFqWiSpMnX7wLTPnnYO4=
github.com/golang-migrate/migrate/v4 v4.17.1/go.mod h1:m8hinFyWBn0SA4QKHuKh175Pm9wjmxj3S2Mia7dbXzM=
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4=
github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/hajimehoshi/oto v1.0.1 h1:8AMnq0Yr2YmzaiqTg/k1Yzd6IygUGk2we9nmjgbgPn4=
@ -97,12 +97,16 @@ github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA=
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-runewidth v0.0.10/go.mod h1:RAqKPSqVFrSLVXbA8x7dzmKdmGzieGRCM46jaSJTDAk=
github.com/moby/term v0.5.0 h1:xt8Q1nalod/v7BqbG21f8mQPqH+xAaC9C3N3wfWbVP0=
github.com/moby/term v0.5.0/go.mod h1:8FzsFHVUBGZdbDsJw/ot+X+d5HLUbvklYLJ9uGfcI3Y=
github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A=
github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc=
github.com/nikoksr/notify v1.0.1 h1:HkUi4YHASwo3N8UEtDz9GRyEuGyX2Qwe9C6qKK24TYo=
github.com/nikoksr/notify v1.0.1/go.mod h1:w9zFImNfVM7l/gkKFAiyJITKzdC1GH458t4XqMkasZA=
github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM=
github.com/opencontainers/image-spec v1.0.2 h1:9yCKha/T5XdGtO0q9Q9a6T5NUCsTn/DrBg0D7ufOcFM=
@ -111,12 +115,14 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
github.com/rs/zerolog v1.33.0 h1:1cU2KZkvPxNyfgEmhHAz/1A9Bz+llsdYzklWFzgp0r8=
github.com/rs/zerolog v1.33.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/scylladb/termtables v0.0.0-20191203121021-c4c0b6d42ff4/go.mod h1:C1a7PQSMz9NShzorzCiG2fk9+xuCgLkPeCvMHYR2OWg=
github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys=
github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs=
github.com/spf13/cobra v1.8.1 h1:e5/vxKd/rZsfSJMUX1agtjeTDf+qv1/JdBF8gg5k9ZM=
@ -128,14 +134,14 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/tcolgate/mp3 v0.0.0-20170426193717-e79c5a46d300 h1:XQdibLKagjdevRB6vAjVY4qbSr8rQ610YzTkWcxzxSI=
github.com/tcolgate/mp3 v0.0.0-20170426193717-e79c5a46d300/go.mod h1:FNa/dfN95vAYCNFrIKRrlRo+MBLbwmR9Asa5f2ljmBI=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA=
golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs=
golang.org/x/crypto v0.28.0 h1:GBDwsMXVQi34v5CCYUm2jkJvu4cbtru2U4TN2PSyQnw=
golang.org/x/crypto v0.28.0/go.mod h1:rmgy+3RHxRZMyY0jjAJShp2zgEdOqj2AO7U0pYmeQ7U=
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp/shiny v0.0.0-20240719175910-8a7402abbf56 h1:8jM66xzUJjNInw31Y8bic4AYSLVChztDRT93+kmofUY=
golang.org/x/exp/shiny v0.0.0-20240719175910-8a7402abbf56/go.mod h1:3F+MieQB7dRYLTmnncoFbb1crS5lfQoTfDgQy6K4N0o=
@ -145,30 +151,28 @@ golang.org/x/image v0.14.0/go.mod h1:HUYqC05R2ZcZ3ejNQsIHQDQiwWM4JBqmm6MKANTp4LE
golang.org/x/mobile v0.0.0-20190415191353-3e0bab5405d6/go.mod h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o=
golang.org/x/mobile v0.0.0-20231127183840-76ac6878050a h1:sYbmY3FwUWCBTodZL1S3JUuOvaW6kM2o+clDzzDNBWg=
golang.org/x/mobile v0.0.0-20231127183840-76ac6878050a/go.mod h1:Ede7gF0KGoHlj822RtphAHK1jLdrcuRBZg0sF1Q+SPc=
golang.org/x/mod v0.14.0 h1:dGoOF9QVLYng8IHTm7BAyWqCqSheQ5pYWGhzW00YJr0=
golang.org/x/mod v0.14.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4=
golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44=
golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE=
golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/mod v0.17.0 h1:zY54UmvipHiNd+pm+m0x9KhZ9hl1/7QNMyxXbc6ICqA=
golang.org/x/mod v0.17.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/net v0.30.0 h1:AcW1SDZMkb8IpzCdQUaIq2sP4sZ4zw+55h6ynffypl4=
golang.org/x/net v0.30.0/go.mod h1:2wGyMJ5iFasEhkwi13ChkO/t1ECNC4X4eBKkVFyYFlU=
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190429190828-d89cdac9e872/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y=
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.18.0 h1:FcHjZXDMxI8mM3nwhX9HlKop4C0YQvCVCdwYl2wOtE8=
golang.org/x/term v0.18.0/go.mod h1:ILwASektA3OnRv7amZ1xhE/KTR+u50pbXfZ03+6Nx58=
golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo=
golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.25.0 h1:WtHI/ltw4NvSUig5KARz9h521QvRC8RmF/cuYqifU24=
golang.org/x/term v0.25.0/go.mod h1:RPyXicDX+6vLxogjjRxjgD2TKtmAO6NZBsBRfrOLu7M=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/tools v0.16.0 h1:GO788SKMRunPIBCXiQyo2AaexLstOrVhuAL5YwsckQM=
golang.org/x/tools v0.16.0/go.mod h1:kYVVN6I1mBNoB1OX+noeBjbRk4IUEPa7JJ+TJMEooJ0=
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk=
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8=
google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM=
golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d h1:vU5i/LfpvrRCpgM/VPfJLg5KjxD3E+hfT1SH+d9zLwg=
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk=
google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA=
google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=

View file

@ -1,6 +1,9 @@
package common
import (
"fmt"
"strconv"
"github.com/spf13/cobra"
)
@ -42,3 +45,10 @@ func PtrOrNull[T comparable](val T) *T {
return &val
}
func FmtFloat(v float64, places ...int) string {
if len(places) > 0 {
return fmt.Sprintf("%."+strconv.Itoa(places[0])+"f", v)
}
return fmt.Sprintf("%.4f", v)
}

View file

@ -0,0 +1,107 @@
package jsontime
import (
"encoding/json"
"strings"
"time"
"github.com/araddon/dateparse"
"gopkg.in/yaml.v3"
)
type Time time.Time
func (t *Time) UnmarshalYAML(n *yaml.Node) error {
var s string
err := n.Decode(&s)
if err != nil {
return err
}
tm, err := dateparse.ParseAny(s)
if err != nil {
return err
}
*t = Time(tm)
return nil
}
func (t *Time) UnmarshalJSON(b []byte) error {
s := strings.Trim(string(b), `"`)
tm, err := dateparse.ParseAny(s)
if err != nil {
return err
}
*t = Time(tm)
return nil
}
func (t Time) MarshalJSON() ([]byte, error) {
return json.Marshal(time.Time(t))
}
func (t Time) String() string {
return time.Time(t).String()
}
func (t Time) Time() time.Time {
return time.Time(t)
}
type Duration time.Duration
func (d *Duration) UnmarshalYAML(n *yaml.Node) error {
var s string
err := n.Decode(&s)
if err != nil {
return err
}
dur, err := time.ParseDuration(s)
if err != nil {
return err
}
*d = Duration(dur)
return nil
}
func (d *Duration) UnmarshalJSON(b []byte) error {
s := strings.Trim(string(b), `"`)
dur, err := time.ParseDuration(s)
if err != nil {
return err
}
*d = Duration(dur)
return nil
}
func (d Duration) MarshalJSON() ([]byte, error) {
return json.Marshal(time.Duration(d))
}
func (d Duration) Duration() time.Duration {
return time.Duration(d)
}
func (d Duration) String() string {
return time.Duration(d).String()
}
func ParseDuration(s string) (Duration, error) {
d, err := time.ParseDuration(s)
return Duration(d), err
}
func ParseAny(s string, opt ...dateparse.ParserOption) (Time, error) {
t, err := dateparse.ParseAny(s, opt...)
return Time(t), err
}
func ParseInLocal(s string, opt ...dateparse.ParserOption) (Time, error) {
t, err := dateparse.ParseIn(s, time.Now().Location(), opt...)
return Time(t), err
}

View file

@ -0,0 +1,21 @@
MIT License
Copyright (c) 2016 Neri Marschik
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View file

@ -0,0 +1,112 @@
# go-time-series
[![License](http://img.shields.io/badge/license-MIT-red.svg?style=flat)](./LICENSE)
[![GoDoc](https://godoc.org/github.com/codesuki/go-time-series?status.svg)](https://godoc.org/github.com/codesuki/go-time-series)
[![Build Status](http://img.shields.io/travis/codesuki/go-time-series.svg?style=flat)](https://travis-ci.org/codesuki/go-time-series)
[![codecov](https://codecov.io/gh/codesuki/go-time-series/branch/master/graph/badge.svg)](https://codecov.io/gh/codesuki/go-time-series)
Time series implementation in Go.
It is used in [go-trending](https://www.github.com/codesuki/go-trending) as a backend for a trending algorithm.
The time series supports storing counts at different granularities, e.g. seconds, minutes, hours, ....<br />
In case of go-trending the time series is configured to have recent data available at small granularity, i.e. the recent 60 seconds, and historical data available at large granularity, i.e. the last few hours, days of data.
A redis backend is planned.
* Simple interface
* Store time series data at different granularities
* Use your own clock implementation, e.g. for testing or similar
## Examples
### Creating a time series with default settings
The default settings use `time.Now()` as clock and `time.Second * 60`, `time.Minute * 60` and `time.Hour * 24` as granularities.
```go
import "github.com/codesuki/go-time-series"
...
ts, err := timeseries.NewTimeSeries()
if err != nil {
// handle error
}
```
### Creating a customized time series
You can specify the clock and/or granularities to use. A clock must implement the `timeseries.Clock` interface.
```go
import "github.com/codesuki/go-time-series"
...
type clock struct {}
func (c *clock) Now() {
return time.Time{} // always returns the zero time
}
var myClock clock
...
ts, err := timeseries.NewTimeSeries(
timeseries.WithGranularities(
[]timeseries.Granularity{
{Granularity: time.Second, Count: 60},
{Granularity: time.Minute, Count: 60},
{Granularity: time.Hour, Count: 24},
{Granularity: time.Hour * 24, Count: 7},
}),
timeseries.WithClock(&myClock),
)
if err != nil {
// handle error
}
```
### Filling the time series
To fill the time series with counts, e.g. events, you can use two different functions.
```go
import "github.com/codesuki/go-time-series"
...
ts, err := timeseries.NewTimeSeries()
if err != nil {
// handle error
}
ts.Increase(2) // adds 2 to the counter at the current time
ts.IncreaseAtTime(3, time.Now().Add(-2 * time.Minute)) // adds 3 to the counter 2 minutes ago
```
### Querying the time series
The `Range()` function takes 2 arguments, i.e. the start and end of a time span.
`Recent()` is a small helper function that just uses `clock.Now()` as `end` in `Range`.
Please refer to the [documentation](https://godoc.org/github.com/codesuki/go-time-series) for how `Range()` works exactly. There are some details depending on what range you query and what range is available.
```go
import "github.com/codesuki/go-time-series"
...
ts, err := timeseries.NewTimeSeries()
if err != nil {
// handle error
}
ts.Increase(2) // adds 2 to the counter at the current time
// 1s passes
ts.Increase(3)
// 1s passes
ts.Recent(5 * time.Second) // returns 5
ts.Range(time.Now().Add(-5 * time.Second), time.Now()) // returns 5
```
## Documentation
GoDoc is located [here](https://godoc.org/github.com/codesuki/go-time-series)
## License
go-time-series is [MIT licensed](./LICENSE).

View file

@ -0,0 +1,133 @@
package timeseries
import (
"time"
"github.com/rs/zerolog/log"
)
type level struct {
clock Clock
granularity time.Duration
length int
end time.Time
oldest int
newest int
buckets []int
}
func newLevel(clock Clock, granularity time.Duration, length int) level {
level := level{clock: clock, granularity: granularity, length: length}
level.init()
return level
}
func (l *level) init() {
buckets := make([]int, l.length)
l.buckets = buckets
l.clear(time.Time{})
}
func (l *level) clear(time time.Time) {
l.oldest = 1
l.newest = 0
l.end = time.Truncate(l.granularity)
for i := range l.buckets {
l.buckets[i] = 0
}
}
func (l *level) duration() time.Duration {
return l.granularity*time.Duration(l.length) - l.granularity
}
func (l *level) earliest() time.Time {
return l.end.Add(-l.duration())
}
func (l *level) latest() time.Time {
return l.end
}
func (l *level) increaseAtTime(amount int, time time.Time) {
difference := l.end.Sub(time.Truncate(l.granularity))
if difference < 0 {
// this cannot be negative because we advance before
// can at least be 0
log.Error().Time("time", time).Msg("level.increaseTime was called with a time in the future")
}
// l.length-1 because the newest element is always l.length-1 away from oldest
steps := (l.length - 1) - int(difference/l.granularity)
index := (l.oldest + steps) % l.length
l.buckets[index] += amount
}
func (l *level) advance(target time.Time) {
if !l.end.Before(target) {
return
}
for target.After(l.end) {
l.end = l.end.Add(l.granularity)
l.buckets[l.oldest] = 0
l.newest = l.oldest
l.oldest = (l.oldest + 1) % len(l.buckets)
}
}
// TODO: find a better way to handle latest parameter
// The parameter is used to avoid the overlap computation if end overlaps with the current time.
// Probably will find away when implementing redis version.
func (l *level) sumInterval(start, end time.Time, latest time.Time) float64 {
if start.Before(l.earliest()) {
start = l.earliest()
}
if end.After(l.latest()) {
end = l.latest()
}
idx := 0
// this is how many time steps start is away from earliest
startSteps := start.Sub(l.earliest()) / l.granularity
idx += int(startSteps)
currentTime := l.earliest()
currentTime = currentTime.Add(startSteps * l.granularity)
sum := 0.0
for idx < l.length && currentTime.Before(end) {
nextTime := currentTime.Add(l.granularity)
if nextTime.After(latest) {
nextTime = latest
}
if nextTime.Before(start) {
// the case nextTime.Before(start) happens when start is after latest
// therefore we don't have data and can return
break
}
count := float64(l.buckets[(l.oldest+idx)%l.length])
if currentTime.Before(start) || nextTime.After(end) {
// current bucket overlaps time range
overlapStart := max(currentTime, start)
overlapEnd := min(nextTime, end)
overlap := overlapEnd.Sub(overlapStart).Seconds() / l.granularity.Seconds()
count *= overlap
}
sum += count
idx++
currentTime = currentTime.Add(l.granularity)
}
return sum
}
func min(t1, t2 time.Time) time.Time {
if t1.Before(t2) {
return t1
}
return t2
}
func max(t1, t2 time.Time) time.Time {
if t1.After(t2) {
return t1
}
return t2
}

View file

@ -0,0 +1,274 @@
package timeseries
import (
"errors"
"time"
)
// Explanation
// Have several granularity buckets
// 1s, 1m, 5m, ...
// The buckets will be in circular arrays
//
// For example we could have
// 60 1s buckets to make up 1 minute
// 60 1m buckets to make up 1 hour
// ...
// This would enable us to get the last 1 minute data at 1s granularity (every second)
//
// Date ranges are [start, end[
//
// Put:
// Every time an event comes we add it to all corresponding buckets
//
// Example:
// Event time = 12:00:00
// 1s bucket = 12:00:00
// 1m bucket = 12:00:00
// 5m bucket = 12:00:00
//
// Event time = 12:00:01
// 1s bucket = 12:00:01
// 1m bucket = 12:00:00
// 5m bucket = 12:00:00
//
// Event time = 12:01:01
// 1s bucket = 12:01:01
// 1m bucket = 12:01:00
// 5m bucket = 12:00:00
//
// Fetch:
// Given a time span we try to find the buckets with the finest granularity
// to satisfy the time span and return the sum of their contents
//
// Example:
// Now = 12:05:30
// Time span = 12:05:00 - 12:05:02
// Return sum of 1s buckets 0,1
//
// Now = 12:10:00
// Time span = 12:05:00 - 12:07:00
// Return sum of 1m buckets 5,6
//
// Now = 12:10:00
// Time span = 12:00:00 - 12:10:00 (last 10 minutes)
// Return sum of 5m buckets 0,1
//
// Now = 12:10:01
// Time span = 12:05:01 - 12:10:01 (last 5 minutes)
// Return sum of 5m buckets (59/(5*60))*1, (1/(5*60))*2
//
// Now = 12:10:01
// Time span = 12:04:01 - 12:10:01 (last 6 minutes)
// Return sum of 1m buckets (59/60)*4, 5, 6, 7, 8, 9, (1/60)*10
var (
// ErrBadRange indicates that the given range is invalid. Start should always be <= End
ErrBadRange = errors.New("timeseries: range is invalid")
// ErrBadGranularities indicates that the provided granularities are not strictly increasing
ErrBadGranularities = errors.New("timeseries: granularities must be strictly increasing and non empty")
// ErrRangeNotCovered indicates that the provided range lies outside the time series
ErrRangeNotCovered = errors.New("timeseries: range is not convered")
)
// defaultGranularities are used in case no granularities are provided to the constructor.
var defaultGranularities = []Granularity{
{time.Second, 60},
{time.Minute, 60},
{time.Hour, 24},
}
// Clock specifies the needed time related functions used by the time series.
// To use a custom clock implement the interface and pass it to the time series constructor.
// The default clock uses time.Now()
type Clock interface {
Now() time.Time
}
// DefaultClock is used in case no clock is provided to the constructor.
type defaultClock struct{}
var DefaultClock Clock = &defaultClock{}
func (c *defaultClock) Now() time.Time {
return time.Now()
}
// Granularity describes the granularity for one level of the time series.
// Count cannot be 0.
type Granularity struct {
Granularity time.Duration
Count int
}
type options struct {
clock Clock
granularities []Granularity
}
// Option configures the time series.
type Option func(*options)
// WithClock returns a Option that sets the clock used by the time series.
func WithClock(c Clock) Option {
return func(o *options) {
o.clock = c
}
}
// WithGranularities returns a Option that sets the granularites used by the time series.
func WithGranularities(g []Granularity) Option {
return func(o *options) {
o.granularities = g
}
}
type TimeSeries struct {
clock Clock
levels []level
pending int
pendingTime time.Time
latest time.Time
}
// NewTimeSeries creates a new time series with the provided options.
// If no options are provided default values are used.
func NewTimeSeries(os ...Option) (*TimeSeries, error) {
opts := options{}
for _, o := range os {
o(&opts)
}
if opts.clock == nil {
opts.clock = DefaultClock
}
if opts.granularities == nil {
opts.granularities = defaultGranularities
}
return newTimeSeries(opts.clock, opts.granularities)
}
func newTimeSeries(clock Clock, granularities []Granularity) (*TimeSeries, error) {
err := checkGranularities(granularities)
if err != nil {
return nil, err
}
return &TimeSeries{clock: clock, levels: createLevels(clock, granularities)}, nil
}
func checkGranularities(granularities []Granularity) error {
if len(granularities) == 0 {
return ErrBadGranularities
}
last := time.Duration(0)
for i := 0; i < len(granularities); i++ {
if granularities[i].Count == 0 {
return ErrBadGranularities
}
if granularities[i].Granularity <= last {
return ErrBadGranularities
}
last = granularities[i].Granularity
}
return nil
}
func createLevels(clock Clock, granularities []Granularity) []level {
levels := make([]level, len(granularities))
for i := range granularities {
levels[i] = newLevel(clock, granularities[i].Granularity, granularities[i].Count)
}
return levels
}
// Increase adds amount at current time.
func (t *TimeSeries) Increase(amount int) {
t.IncreaseAtTime(amount, t.clock.Now())
}
// IncreaseAtTime adds amount at a specific time.
func (t *TimeSeries) IncreaseAtTime(amount int, time time.Time) {
if time.After(t.latest) {
t.latest = time
}
if time.After(t.pendingTime) {
t.advance(time)
t.pending = amount
} else if time.After(t.pendingTime.Add(-t.levels[0].granularity)) {
t.pending++
} else {
t.increaseAtTime(amount, time)
}
}
func (t *TimeSeries) increaseAtTime(amount int, time time.Time) {
for i := range t.levels {
if time.Before(t.levels[i].latest().Add(-1 * t.levels[i].duration())) {
continue
}
t.levels[i].increaseAtTime(amount, time)
}
}
func (t *TimeSeries) advance(target time.Time) {
// we need this here because advance is called from other locations
// than IncreaseAtTime that don't check by themselves
if !target.After(t.pendingTime) {
return
}
t.advanceLevels(target)
t.handlePending()
}
func (t *TimeSeries) advanceLevels(target time.Time) {
for i := range t.levels {
if !target.Before(t.levels[i].latest().Add(t.levels[i].duration())) {
t.levels[i].clear(target)
continue
}
t.levels[i].advance(target)
}
}
func (t *TimeSeries) handlePending() {
t.increaseAtTime(t.pending, t.pendingTime)
t.pending = 0
t.pendingTime = t.levels[0].latest()
}
// Recent returns the sum over [now-duration, now).
func (t *TimeSeries) Recent(duration time.Duration) (float64, error) {
now := t.clock.Now()
return t.Range(now.Add(-duration), now)
}
// Range returns the sum over the given range [start, end).
// ErrBadRange is returned if start is after end.
// ErrRangeNotCovered is returned if the range lies outside the time series.
func (t *TimeSeries) Range(start, end time.Time) (float64, error) {
if start.After(end) {
return 0, ErrBadRange
}
t.advance(t.clock.Now())
if ok, err := t.intersects(start, end); !ok {
return 0, err
}
for i := range t.levels {
// use !start.Before so earliest() is included
// if we use earliest().Before() we won't get start
if !start.Before(t.levels[i].earliest()) {
return t.levels[i].sumInterval(start, end, t.latest), nil
}
}
return t.levels[len(t.levels)-1].sumInterval(start, end, t.latest), nil
}
func (t *TimeSeries) intersects(start, end time.Time) (bool, error) {
biggestLevel := t.levels[len(t.levels)-1]
if end.Before(biggestLevel.latest().Add(-biggestLevel.duration())) {
return false, ErrRangeNotCovered
}
if start.After(t.levels[0].latest()) {
return false, ErrRangeNotCovered
}
return true, nil
}

View file

@ -0,0 +1,334 @@
package timeseries
import (
"testing"
"time"
)
// TODO: do table based testing
func setup() (*TimeSeries, *clock.Mock) {
clock := clock.NewMock()
ts, _ := NewTimeSeries(
WithClock(clock),
WithGranularities(
[]Granularity{
{time.Second, 60},
{time.Minute, 60},
},
),
)
return ts, clock
}
func TestClock(t *testing.T) {
clock := &defaultClock{}
// there is a small chance this won't pass
if clock.Now().Truncate(time.Second) != time.Now().Truncate(time.Second) {
t.Errorf("default clock does not track time.Now")
}
}
func TestNewTimeSeries(t *testing.T) {
ts, err := NewTimeSeries()
if ts == nil {
t.Errorf("constructor returned nil")
}
if err != nil {
t.Errorf("should not return error")
}
}
func TestNewTimeSeriesWithGranularities(t *testing.T) {
granularities := []Granularity{
{time.Second, 60},
{time.Minute, 60},
{time.Hour, 24},
}
ts, err := NewTimeSeries(WithGranularities(granularities))
if ts == nil || err != nil {
t.Error("could not create time series")
}
badGranularities := []Granularity{
{time.Minute, 60},
{time.Second, 60},
{time.Hour, 24},
}
_, err = NewTimeSeries(WithGranularities(badGranularities))
if err != ErrBadGranularities {
t.Error("should not accept decreasing granularities")
}
badGranularities = []Granularity{
{time.Minute, 60},
{time.Second, 0},
{time.Hour, 24},
}
_, err = NewTimeSeries(WithGranularities(badGranularities))
if err != ErrBadGranularities {
t.Error("should not accept granularities with zero count")
}
_, err = NewTimeSeries(WithGranularities([]Granularity{}))
if err != ErrBadGranularities {
t.Error("should not accept empty granularities")
}
}
func TestNewTimeSeriesWithClock(t *testing.T) {
clock := clock.NewMock()
ts, _ := NewTimeSeries(WithClock(clock))
ts.Increase(2)
clock.Add(time.Second * 1)
ts.Increase(1)
res, _ := ts.Range(time.Unix(0, 0), time.Unix(1, 0))
if res != 2 {
t.Errorf("expected %d got %f", 2, res)
}
}
func TestRecentSeconds(t *testing.T) {
ts, clock := setup()
clock.Add(time.Minute * 5)
ts.Increase(1)
clock.Add(time.Second * 1)
ts.Increase(2)
clock.Add(time.Second * 1)
ts.Increase(3)
res, _ := ts.Recent(time.Second)
if res != 2 {
t.Errorf("expected %d got %f", 2, res)
}
res, _ = ts.Recent(2 * time.Second)
if res != 3 {
t.Errorf("expected %d got %f", 3, res)
}
// test earliest second
clock.Add(57 * time.Second) // time: 09:05:59
res, _ = ts.Recent(59 * time.Second)
if res != 6 {
t.Errorf("expected %d got %f", 6, res)
}
// test future time
clock.Add(1 * time.Second)
clock.Add(57 * time.Second) // time: 09:06:00
res, _ = ts.Recent(59 * time.Second)
if res != 0 {
t.Errorf("expected %d got %f", 0, res)
}
}
func TestRecentMinutes(t *testing.T) {
ts, clock := setup()
clock.Add(time.Minute * 1) // 09:01:00
ts.Increase(60)
clock.Add(time.Minute * 1) // 09:02:00
ts.Increase(1)
clock.Add(time.Minute * 1) // 09:03:00
ts.Increase(60)
clock.Add(time.Second * 1) // 09:03:01
ts.Increase(3)
// test interpolation at beginning
// 59/60 * 60 + 1 + 60 = 120
res, _ := ts.Recent(2 * time.Minute)
if res != 120 {
t.Errorf("expected %d got %f", 120, res)
}
// test interpolation at end
// 60/2 = 30
res, _ = ts.Range(
clock.Now().Add(-2*time.Minute+-1*time.Second), // 09:01:00
clock.Now().Add(-1*time.Minute+-31*time.Second), // 09:01:30
)
if res != 30 {
t.Errorf("expected %d got %f", 30, res)
}
// get from earliest data point
clock.Add(time.Second*59 + time.Minute*56)
ts.Increase(60)
clock.Add(time.Minute * 1)
ts.Increase(70)
clock.Add(time.Minute * 59)
res, _ = ts.Recent(time.Minute * 60)
if res != 70 {
t.Errorf("expected %d got %f", 70, res)
}
}
func TestRecentWholeRange(t *testing.T) {
ts, clock := setup()
clock.Add(time.Minute * 1) // 09:01:00
ts.Increase(60)
clock.Add(time.Minute * 1) // 09:02:00
ts.Increase(1)
clock.Add(time.Minute * 1) // 09:03:00
ts.Increase(60)
clock.Add(time.Second * 1) // 09:03:01
ts.Increase(3)
// 60 + 1 + 60 = 121
res, _ := ts.Recent(60 * time.Minute)
if res != 121 {
t.Errorf("expected %d got %f", 62, res)
}
}
func TestRecentWholeRangeBig(t *testing.T) {
ts, clock := setup()
clock.Add(time.Minute * 1) // 09:01:00
ts.Increase(60)
clock.Add(time.Minute * 1) // 09:02:00
ts.Increase(1)
clock.Add(time.Minute * 1) // 09:03:00
ts.Increase(60)
clock.Add(time.Second * 1) // 09:03:01
ts.Increase(3)
// 60 + 1 + 60 = 121
res, _ := ts.Recent(120 * time.Minute)
if res != 121 {
t.Errorf("expected %d got %f", 121, res)
}
}
func TestRangeEndInFuture(t *testing.T) {
ts, clock := setup()
clock.Add(time.Minute * 1) // 09:01:00
ts.Increase(1)
res, _ := ts.Range(clock.Now().Add(-1*time.Minute), clock.Now().Add(5*time.Minute))
if res != 0 {
t.Errorf("expected %d got %f", 0, res)
}
}
func TestRangeBadRange(t *testing.T) {
ts, clock := setup()
clock.Add(time.Minute * 1) // 09:01:00
ts.Increase(60)
clock.Add(time.Minute * 1) // 09:02:00
ts.Increase(1)
clock.Add(time.Minute * 1) // 09:03:00
ts.Increase(60)
clock.Add(time.Second * 1) // 09:03:01
ts.Increase(3)
// start is after end
_, err := ts.Range(clock.Now().Add(time.Minute), clock.Now())
if err != ErrBadRange {
t.Errorf("should return ErrBadRange")
}
// range is after end
_, err = ts.Range(clock.Now().Add(time.Minute), clock.Now().Add(5*time.Minute))
if err != ErrRangeNotCovered {
t.Errorf("should return ErrRangeNotCovered")
}
// range is before start
_, err = ts.Range(clock.Now().Add(-5*time.Hour), clock.Now().Add(-4*time.Hour))
if err != ErrRangeNotCovered {
t.Errorf("should return ErrRangeNotCovered")
}
}
func TestIncrease(t *testing.T) {
ts, clock := setup()
// time 12:00
ts.Increase(2)
clock.Add(time.Minute * 1) // time: 12:01:00
ts.Increase(4)
clock.Add(time.Minute * 1) // time: 12:02:00
ts.Increase(6)
clock.Add(time.Second * 10) // time: 12:02:10
ts.Increase(2)
clock.Add(time.Second * 10) // time: 12:02:20
ts.Increase(2)
clock.Add(time.Second * 10) // time: 12:02:30
ts.Increase(2)
clock.Add(time.Second * 10) // time: 12:02:40
ts.Increase(2)
clock.Add(time.Second * 10) // time: 12:02:50
ts.Increase(2)
clock.Add(time.Second * 10) // time: 12:03:00
ts.Increase(2)
// get range from 12:00:30 - 12:02:30
// 0.5 * 2 + 4 + 0.5 * 16 = 13
res, _ := ts.Range(clock.Now().Add(-time.Second*150), clock.Now().Add(-time.Second*30))
if res != 13 {
t.Errorf("expected %d got %f", 13, res)
}
// get range from 12:01:00 - 12:02:00
// = 4
res, _ = ts.Range(clock.Now().Add(-time.Second*120), clock.Now().Add(-time.Second*60))
if res != 4 {
t.Errorf("expected %d got %f", 4, res)
}
}
func TestIncreasePending(t *testing.T) {
ts, clock := setup()
ts.Increase(1) // this should advance and reset pending
ts.Increase(1) // this should increase pending
clock.Add(time.Second)
ts.Increase(1)
res, _ := ts.Recent(59 * time.Second)
if res != 2 {
t.Errorf("expected %d got %f", 2, res)
}
clock.Add(time.Second) // the latest data gets merged in because time advanced
res, _ = ts.Recent(59 * time.Second)
if res != 3 {
t.Errorf("expected %d got %f", 3, res)
}
}
func TestIncreaseAtTime(t *testing.T) {
ts, clock := setup()
ts.Increase(60) // time: 09:00:00
clock.Add(time.Second) // time: 09:00:01
ts.IncreaseAtTime(60, clock.Now().Add(-1*time.Minute)) // time: 08:59:01
ts.Increase(1) // time: 09:00:01
// from: 08:59:01 - 09:00:01
// (59/60 * 60) + 60 = 119
res, _ := ts.Recent(time.Minute)
if res != 119 {
t.Errorf("expected %d got %f", 119, res)
}
// from: 08:59:00 - 09:00:00
// 60
res, _ = ts.Range(
clock.Now().Add(-1*time.Minute+-1*time.Second),
clock.Now().Add(-1*time.Second),
)
if res != 60 {
t.Errorf("expected %d got %f", 60, res)
}
}

21
internal/trending/LICENSE Normal file
View file

@ -0,0 +1,21 @@
MIT License
Copyright (c) 2016 Neri Marschik
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View file

@ -0,0 +1,74 @@
# go-trending
[![License](http://img.shields.io/badge/license-MIT-red.svg?style=flat)](./LICENSE)
[![GoDoc](https://godoc.org/github.com/codesuki/go-trending?status.svg)](https://godoc.org/github.com/codesuki/go-trending)
[![Build Status](http://img.shields.io/travis/codesuki/go-trending.svg?style=flat)](https://travis-ci.org/codesuki/go-trending)
[![codecov](https://codecov.io/gh/codesuki/go-trending/branch/master/graph/badge.svg)](https://codecov.io/gh/codesuki/go-trending)
Trending algorithm based on the article [Trending at Instagram](http://instagram-engineering.tumblr.com/post/122961624217/trending-at-instagram). To detect trends an items current behavior is compared to its usual behavior. The more it differes the higher / lower the score. Items will start trending if the current usage is higher than its average usage. To avoid items quickly become non-trending again the scores are smoothed.
* Configurable and simple to use
* Use your own clock implementation, e.g. for testing or similar
* Use any time series implementation as backend that implements the TimeSeries interface
### Details
Uses a [time series](https://www.github.com/codesuki/go-time-series) for each item to keep track of its past behavior and get recent behavior with small granularity. Computes the [Kullback-Leibler divergence](https://en.wikipedia.org/wiki/Kullback%E2%80%93Leibler_divergence) between recent behavior and expected, i.e. past, bahavior. Then blends the current item score with its past [decayed](https://en.wikipedia.org/wiki/Exponential_decay) maximum score to get the final score.
## Examples
### Creating a default scorer
```go
import "github.com/codesuki/go-trending"
...
scorer := trending.NewScorer()
```
### Creating a customized scorer
**Parameters**
* **Time series:** is used for creating the backing `TimeSeries` objects
* **Half-life:** controls how long an item is trending after the activity went back to normal.
* **Recent duration:** controls how much data is used to compute the current state. If there is not much activity try looking at larger duration.
* **Storage duration:** controls how much historical data is used. Trends older than the storage duration won't have any effect on the computation. The time series in use should have at least as much storage duration as specified here.
```go
import "github.com/codesuki/go-trending"
...
func NewTimeSeries(id string) TimeSeries {
// create time series that satisfies the TimeSeries interface
return timeSeries
}
...
scorer := trending.NewScorer(
WithTimeSeries(NewTimeSeries),
WithHalflife(time.Hour),
WithRecentDuration(time.Minute),
WithStorageDuration(7 * 24 * time.Hour),
)
```
### Using the scorer
```go
import "github.com/codesuki/go-trending"
...
scorer := trending.NewScorer()
scorer.AddEvent("id", time)
// add more events. maybe using an event stream.
...
trendingItems := scorer.Score()
```
## Documentation
GoDoc is located [here](https://godoc.org/github.com/codesuki/go-trending)
## License
go-trending is [MIT licensed](./LICENSE).

103
internal/trending/item.go Normal file
View file

@ -0,0 +1,103 @@
package trending
import (
"math"
"time"
)
type item[K comparable] struct {
eventSeries TimeSeries
maxSeries SlidingWindow
max float64
maxTime time.Time
options *options[K]
// TODO: move outside of item because it's the same for all items
defaultExpectation float64
defaultHourlyCount float64
}
func newItem[K comparable](id K, options *options[K]) *item[K] {
defaultHourlyCount := float64(options.baseCount) * float64(options.storageDuration/time.Hour)
defaultExpectation := float64(options.baseCount) / float64(time.Hour/options.recentDuration)
return &item[K]{
eventSeries: options.creator(id),
maxSeries: options.slidingWindowCreator(id),
options: options,
defaultExpectation: defaultExpectation,
defaultHourlyCount: defaultHourlyCount,
}
}
func (i *item[K]) score() Score[K] {
recentCount, count := i.computeCounts()
if recentCount < i.options.countThreshold {
return Score[K]{}
}
if recentCount == count {
// we see this for the first time so there is no historical data
// use a sensible default like average/median over all items
count = recentCount + i.defaultHourlyCount
}
probability := recentCount / count
// order of those two lines is important.
// if we insert before reading we might just get the same value.
expectation := i.computeRecentMax()
i.maxSeries.Insert(probability)
if expectation == 0.0 {
expectation = i.defaultExpectation
}
klScore := computeKullbackLeibler(probability, expectation)
if klScore > i.max {
i.updateMax(klScore)
}
i.decayMax()
mixedScore := 5 * (klScore + i.max)
return Score[K]{
Score: mixedScore,
Probability: probability,
Expectation: expectation,
Maximum: i.max,
KLScore: klScore,
Count: count,
RecentCount: recentCount,
}
}
func (i *item[K]) computeCounts() (float64, float64) {
now := i.options.clock.Now()
totalCount, _ := i.eventSeries.Range(now.Add(-i.options.storageDuration), now)
count, _ := i.eventSeries.Range(now.Add(-i.options.recentDuration), now)
return count, totalCount
}
func (i *item[K]) computeRecentMax() float64 {
return i.maxSeries.Max()
}
func (i *item[K]) decayMax() {
i.updateMax(i.max * i.computeExponentialDecayMultiplier())
}
func (i *item[K]) updateMax(score float64) {
i.max = score
i.maxTime = i.options.clock.Now()
}
func (i *item[K]) computeExponentialDecayMultiplier() float64 {
return math.Pow(0.5, float64(i.options.clock.Now().Unix()-i.maxTime.Unix())/i.options.halfLife.Seconds())
}
func computeKullbackLeibler(probability float64, expectation float64) float64 {
if probability == 0.0 {
return 0.0
}
return probability * math.Log(probability/expectation)
}

View file

@ -0,0 +1,42 @@
package trending
type Score[K comparable] struct {
ID K
Score float64
Probability float64
Expectation float64
Maximum float64
KLScore float64
Count float64
RecentCount float64
}
type Scores[K comparable] []Score[K]
func (s Scores[K]) Len() int {
return len(s)
}
func (s Scores[K]) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}
func (s Scores[K]) Less(i, j int) bool {
return s[i].Score > s[j].Score
}
func (s Scores[K]) take(count int) Scores[K] {
if count >= len(s) {
return s
}
return s[0 : count-1]
}
func (s Scores[K]) threshold(t float64) Scores[K] {
for i := range s {
if s[i].Score < t {
return s[0:i]
}
}
return s
}

View file

@ -0,0 +1,116 @@
package slidingwindow
import "time"
// Clock specifies the needed time related functions used by the time series.
// To use a custom clock implement the interface and pass it to the time series constructor.
// The default clock uses time.Now()
type Clock interface {
Now() time.Time
}
type slidingWindow struct {
buffer []float64
length int
end time.Time
start time.Time
oldest int
newest int
step time.Duration
duration time.Duration
clock Clock
}
var defaultStep = time.Hour * 24
var defaultDuration = time.Hour * 24 * 7
type options struct {
clock Clock
step time.Duration
duration time.Duration
}
type option func(*options)
func WithStep(step time.Duration) option {
return func(o *options) {
o.step = step
}
}
func WithDuration(duration time.Duration) option {
return func(o *options) {
o.duration = duration
}
}
func WithClock(clock Clock) option {
return func(o *options) {
o.clock = clock
}
}
func NewSlidingWindow(os ...option) *slidingWindow {
opts := options{}
for _, o := range os {
o(&opts)
}
if opts.clock == nil {
panic("clock not set")
}
if opts.step.Nanoseconds() == 0 {
opts.step = defaultStep
}
if opts.duration.Nanoseconds() == 0 {
opts.duration = defaultDuration
}
return newSlidingWindow(opts.step, opts.duration, opts.clock)
}
func newSlidingWindow(step time.Duration, duration time.Duration, clock Clock) *slidingWindow {
length := int(duration / step)
now := clock.Now()
return &slidingWindow{
buffer: make([]float64, length),
length: length,
end: now.Truncate(step).Add(-duration),
start: now,
step: step,
duration: duration,
oldest: 1,
clock: clock,
}
}
func (sw *slidingWindow) Insert(score float64) {
sw.advance()
if score > sw.buffer[sw.newest] {
sw.buffer[sw.newest] = score
}
}
func (sw *slidingWindow) Max() float64 {
sw.advance()
max := 0.0
for i := range sw.buffer {
if sw.buffer[i] > max {
max = sw.buffer[i]
}
}
return max
}
func (sw *slidingWindow) advance() {
newEnd := sw.clock.Now().Truncate(sw.step).Add(-sw.duration)
for newEnd.After(sw.end) {
sw.end = sw.end.Add(sw.step)
sw.buffer[sw.oldest] = 0.0
sw.newest = sw.oldest
sw.oldest = (sw.oldest + 1) % sw.length
}
}

View file

@ -0,0 +1,204 @@
package trending
import (
"sort"
"time"
timeseries "dynatron.me/x/stillbox/internal/timeseries"
"dynatron.me/x/stillbox/internal/trending/slidingwindow"
)
// Algorithm:
// 1. Divide one week into 5 minutes bins
// The algorithm uses expected probability to compute its ranking.
// By choosing a one week span to compute the expectation the algorithm will forget old trends.
// 2. For every play event increase the counter in the current bin
// 3. Compute the KL Divergence with the following steps
// - Compute the probability of the last full bin (this should be the current 5 minutes sliding window)
// - Compute the expected probability over the past bins including the current bin
// - Compute KL Divergence (kld = p * ln(p/e))
// 4. Keep the highest KL Divergence score together with its timestamp
// 5. Compute exponential decay multiplier and multiply with highest KL Divergence
// 6. Blend current KL Divergence score with decayed high score
var defaultHalfLife = 2 * time.Hour
var defaultRecentDuration = 5 * time.Minute
var defaultStorageDuration = 7 * 24 * time.Hour
var defaultMaxResults = 100
var defaultBaseCount = 1
var defaultScoreThreshold = 0.01
var defaultCountThreshold = 3.0
type options[K comparable] struct {
creator TimeSeriesCreator[K]
slidingWindowCreator SlidingWindowCreator[K]
clock timeseries.Clock
halfLife time.Duration
recentDuration time.Duration
storageDuration time.Duration
maxResults int
baseCount int
scoreThreshold float64
countThreshold float64
}
type Option[K comparable] func(*options[K])
func WithTimeSeries[K comparable](creator TimeSeriesCreator[K]) Option[K] {
return func(o *options[K]) {
o.creator = creator
}
}
func WithSlidingWindow[K comparable](creator SlidingWindowCreator[K]) Option[K] {
return func(o *options[K]) {
o.slidingWindowCreator = creator
}
}
func WithHalfLife[K comparable](halfLife time.Duration) Option[K] {
return func(o *options[K]) {
o.halfLife = halfLife
}
}
func WithRecentDuration[K comparable](recentDuration time.Duration) Option[K] {
return func(o *options[K]) {
o.recentDuration = recentDuration
}
}
func WithStorageDuration[K comparable](storageDuration time.Duration) Option[K] {
return func(o *options[K]) {
o.storageDuration = storageDuration
}
}
func WithMaxResults[K comparable](maxResults int) Option[K] {
return func(o *options[K]) {
o.maxResults = maxResults
}
}
func WithScoreThreshold[K comparable](threshold float64) Option[K] {
return func(o *options[K]) {
o.scoreThreshold = threshold
}
}
func WithCountThreshold[K comparable](threshold float64) Option[K] {
return func(o *options[K]) {
o.countThreshold = threshold
}
}
func WithClock[K comparable](clock timeseries.Clock) Option[K] {
return func(o *options[K]) {
o.clock = clock
}
}
type Scorer[K comparable] struct {
options options[K]
items map[K]*item[K]
}
type SlidingWindow interface {
Insert(score float64)
Max() float64
}
type SlidingWindowCreator[K comparable] func(K) SlidingWindow
type TimeSeries interface {
IncreaseAtTime(amount int, time time.Time)
Range(start, end time.Time) (float64, error)
}
type TimeSeriesCreator[K comparable] func(K) TimeSeries
func NewMemoryTimeSeries[K comparable](id K) TimeSeries {
ts, _ := timeseries.NewTimeSeries(timeseries.WithGranularities(
[]timeseries.Granularity{
{Granularity: time.Second, Count: 60},
{Granularity: time.Minute, Count: 10},
{Granularity: time.Hour, Count: 24},
{Granularity: time.Hour * 24, Count: 7},
},
))
return ts
}
func NewScorer[K comparable](options ...Option[K]) Scorer[K] {
scorer := Scorer[K]{items: make(map[K]*item[K])}
for _, o := range options {
o(&scorer.options)
}
if scorer.options.creator == nil {
scorer.options.creator = NewMemoryTimeSeries[K]
}
if scorer.options.halfLife == 0 {
scorer.options.halfLife = defaultHalfLife
}
if scorer.options.recentDuration == 0 {
scorer.options.recentDuration = defaultRecentDuration
}
if scorer.options.storageDuration == 0 {
scorer.options.storageDuration = defaultStorageDuration
}
if scorer.options.maxResults == 0 {
scorer.options.maxResults = defaultMaxResults
}
if scorer.options.scoreThreshold == 0.0 {
scorer.options.scoreThreshold = defaultScoreThreshold
}
if scorer.options.countThreshold == 0.0 {
scorer.options.countThreshold = defaultCountThreshold
}
if scorer.options.baseCount == 0.0 {
scorer.options.baseCount = defaultBaseCount
}
if scorer.options.clock == nil {
scorer.options.clock = timeseries.DefaultClock
}
if scorer.options.slidingWindowCreator == nil {
scorer.options.slidingWindowCreator = func(id K) SlidingWindow {
return slidingwindow.NewSlidingWindow(
slidingwindow.WithStep(time.Hour*24),
slidingwindow.WithDuration(scorer.options.storageDuration),
slidingwindow.WithClock(scorer.options.clock),
)
}
}
return scorer
}
func (s *Scorer[K]) AddEvent(id K, time time.Time) {
item := s.items[id]
if item == nil {
item = newItem(id, &s.options)
s.items[id] = item
}
s.addToItem(item, time)
}
func (s *Scorer[K]) addToItem(item *item[K], tm time.Time) {
item.eventSeries.IncreaseAtTime(1, tm)
}
func (s *Scorer[K]) Score() Scores[K] {
var scores Scores[K]
for id, item := range s.items {
score := item.score()
score.ID = id
scores = append(scores, score)
}
sort.Sort(scores)
if s.options.scoreThreshold > 0 {
scores = scores.threshold(s.options.scoreThreshold)
}
return scores.take(s.options.maxResults)
}

View file

@ -0,0 +1,372 @@
package alerting
import (
"bytes"
"context"
"fmt"
"net/http"
"sort"
"strconv"
"sync"
"text/template"
"time"
cl "dynatron.me/x/stillbox/pkg/calls"
"dynatron.me/x/stillbox/pkg/gordio/config"
"dynatron.me/x/stillbox/pkg/gordio/database"
"dynatron.me/x/stillbox/pkg/gordio/notify"
"dynatron.me/x/stillbox/pkg/gordio/sinks"
"dynatron.me/x/stillbox/internal/timeseries"
"dynatron.me/x/stillbox/internal/trending"
"github.com/jackc/pgx/v5"
"github.com/rs/zerolog/log"
)
const (
ScoreThreshold = -1
CountThreshold = 1.0
NotificationSubject = "Stillbox Alert"
DefaultRenotify = 30 * time.Minute
alerterTickInterval = time.Minute
)
type Alerter interface {
sinks.Sink
Enabled() bool
Go(context.Context)
stats
}
type alerter struct {
sync.RWMutex
clock timeseries.Clock
cfg config.Alerting
scorer trending.Scorer[cl.Talkgroup]
scores trending.Scores[cl.Talkgroup]
lastScore time.Time
sim *Simulation
notifyCache map[cl.Talkgroup]time.Time
renotify time.Duration
notifier notify.Notifier
}
type offsetClock time.Duration
func (c *offsetClock) Now() time.Time {
return time.Now().Add(c.Duration())
}
func (c *offsetClock) Duration() time.Duration {
return time.Duration(*c)
}
// OffsetClock returns a clock whose Now() method returns the specified offset from the current time.
func OffsetClock(d time.Duration) offsetClock {
return offsetClock(d)
}
type AlertOption func(*alerter)
// WithClock makes the alerter use a simulated clock.
func WithClock(clock timeseries.Clock) AlertOption {
return func(as *alerter) {
as.clock = clock
}
}
// WithNotifier sets the notifier
func WithNotifier(n notify.Notifier) AlertOption {
return func(as *alerter) {
as.notifier = n
}
}
// New creates a new Alerter using the provided configuration.
func New(cfg config.Alerting, opts ...AlertOption) Alerter {
if !cfg.Enable {
return &noopAlerter{}
}
as := &alerter{
cfg: cfg,
notifyCache: make(map[cl.Talkgroup]time.Time),
clock: timeseries.DefaultClock,
renotify: DefaultRenotify,
}
if cfg.Renotify != nil {
as.renotify = cfg.Renotify.Duration()
}
for _, opt := range opts {
opt(as)
}
as.scorer = trending.NewScorer[cl.Talkgroup](
trending.WithTimeSeries(as.newTimeSeries),
trending.WithStorageDuration[cl.Talkgroup](time.Hour*24*time.Duration(cfg.LookbackDays)),
trending.WithRecentDuration[cl.Talkgroup](time.Duration(cfg.Recent)),
trending.WithHalfLife[cl.Talkgroup](time.Duration(cfg.HalfLife)),
trending.WithScoreThreshold[cl.Talkgroup](ScoreThreshold),
trending.WithCountThreshold[cl.Talkgroup](CountThreshold),
trending.WithClock[cl.Talkgroup](as.clock),
)
return as
}
// Go is the alerting loop. It does not start a goroutine.
func (as *alerter) Go(ctx context.Context) {
as.startBackfill(ctx)
as.score(ctx, time.Now())
ticker := time.NewTicker(alerterTickInterval)
for {
select {
case now := <-ticker.C:
as.score(ctx, now)
err := as.notify(ctx)
if err != nil {
log.Error().Err(err).Msg("notify")
}
as.cleanCache()
case <-ctx.Done():
ticker.Stop()
return
}
}
}
const notificationTemplStr = `{{ range . -}}
{{ .TGName }} is active with a score of {{ f .Score.Score 4 }}! ({{ f .Score.RecentCount 0 }} recent calls out of {{ .Score.Count }})
{{ end }}`
var notificationTemplate = template.Must(template.New("notification").Funcs(funcMap).Parse(notificationTemplStr))
func (as *alerter) testNotifyHandler(w http.ResponseWriter, r *http.Request) {
as.RLock()
defer as.RUnlock()
ns := make([]notification, 0, len(as.scores))
ctx := r.Context()
for _, s := range as.scores {
n, err := makeNotification(ctx, s)
if err != nil {
log.Error().Err(err).Msg("test notificaiton")
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
ns = append(ns, n)
}
err := as.sendNotification(ctx, ns)
if err != nil {
log.Error().Err(err).Msg("test notification send")
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Write([]byte("Sent"))
}
// notify iterates the scores and sends out any necessary notifications
func (as *alerter) notify(ctx context.Context) error {
if as.notifier == nil {
return nil
}
now := time.Now()
as.Lock()
defer as.Unlock()
var notifications []notification
for _, s := range as.scores {
if s.Score > as.cfg.AlertThreshold {
if t, inCache := as.notifyCache[s.ID]; !inCache || now.Sub(t) > as.renotify {
as.notifyCache[s.ID] = time.Now()
n, err := makeNotification(ctx, s)
if err != nil {
return err
}
notifications = append(notifications, n)
}
}
}
if len(notifications) > 0 {
return as.sendNotification(ctx, notifications)
}
return nil
}
type notification struct {
TGName string
Score trending.Score[cl.Talkgroup]
}
// sendNotification renders and sends the notification.
func (as *alerter) sendNotification(ctx context.Context, n []notification) error {
msgBuffer := new(bytes.Buffer)
err := notificationTemplate.Execute(msgBuffer, n)
if err != nil {
return fmt.Errorf("notification template render: %w", err)
}
log.Debug().Str("msg", msgBuffer.String()).Msg("notifying")
return as.notifier.Send(ctx, NotificationSubject, msgBuffer.String())
}
// makeNotification creates a notification for later rendering by the template.
// It takes a talkgroup Score as input.
func makeNotification(ctx context.Context, tg trending.Score[cl.Talkgroup]) (notification, error) {
d := notification{
Score: tg,
}
db := database.FromCtx(ctx)
tgRecord, err := db.GetTalkgroupWithLearned(ctx, int(tg.ID.System), int(tg.ID.Talkgroup))
switch err {
case nil:
if tgRecord.SystemName == "" {
tgRecord.SystemName = strconv.Itoa(int(tg.ID.System))
}
if tgRecord.Name != nil {
d.TGName = fmt.Sprintf("%s %s", tgRecord.SystemName, *tgRecord.Name)
} else {
d.TGName = fmt.Sprintf("%s:%d", tgRecord.SystemName, int(tg.ID.Talkgroup))
}
case pgx.ErrNoRows:
system, err := db.GetSystemName(ctx, int(tg.ID.System))
switch err {
case nil:
d.TGName = fmt.Sprintf("%s:%d", system, int(tg.ID.Talkgroup))
case pgx.ErrNoRows:
d.TGName = fmt.Sprintf("%d:%d", int(tg.ID.System), int(tg.ID.Talkgroup))
default:
return d, fmt.Errorf("sendNotification get system: %w", err)
}
default:
return d, fmt.Errorf("sendNotification get talkgroup: %w", err)
}
return d, nil
}
// cleanCache clears the cache of aged-out entries
func (as *alerter) cleanCache() {
if as.notifier == nil {
return
}
now := time.Now()
as.Lock()
defer as.Unlock()
for k, t := range as.notifyCache {
if now.Sub(t) > as.renotify {
delete(as.notifyCache, k)
}
}
}
func (as *alerter) newTimeSeries(id cl.Talkgroup) trending.TimeSeries {
ts, _ := timeseries.NewTimeSeries(timeseries.WithGranularities(
[]timeseries.Granularity{
{Granularity: time.Second, Count: 60},
{Granularity: time.Minute, Count: 10},
{Granularity: time.Hour, Count: 24},
{Granularity: time.Hour * 24, Count: int(as.cfg.LookbackDays)},
},
), timeseries.WithClock(as.clock))
return ts
}
func (as *alerter) startBackfill(ctx context.Context) error {
now := time.Now()
since := now.Add(-24 * time.Hour * time.Duration(as.cfg.LookbackDays))
log.Debug().Time("since", since).Msg("starting stats backfill")
count, err := as.backfill(ctx, since, now)
if err != nil {
return fmt.Errorf("backfill failed: %w", err)
}
log.Debug().Int("callsCount", count).Str("in", time.Now().Sub(now).String()).Int("tgCount", as.scorer.Score().Len()).Msg("backfill finished")
return nil
}
func (as *alerter) score(ctx context.Context, now time.Time) {
as.Lock()
defer as.Unlock()
as.scores = as.scorer.Score()
as.lastScore = now
sort.Sort(as.scores)
}
func (as *alerter) backfill(ctx context.Context, since time.Time, until time.Time) (count int, err error) {
db := database.FromCtx(ctx)
const backfillStatsQuery = `SELECT system, talkgroup, call_date FROM calls WHERE call_date > $1 AND call_date < $2 ORDER BY call_date ASC`
rows, err := db.Query(ctx, backfillStatsQuery, since, until)
if err != nil {
return count, err
}
defer rows.Close()
as.Lock()
defer as.Unlock()
for rows.Next() {
var tg cl.Talkgroup
var callDate time.Time
if err := rows.Scan(&tg.System, &tg.Talkgroup, &callDate); err != nil {
return count, err
}
as.scorer.AddEvent(tg, callDate)
if as.sim != nil { // step the simulator if it is active
as.sim.stepClock(callDate)
}
count++
}
if err := rows.Err(); err != nil {
return count, err
}
return count, nil
}
func (as *alerter) SinkType() string {
return "alerting"
}
func (as *alerter) Call(ctx context.Context, call *cl.Call) error {
as.Lock()
defer as.Unlock()
as.scorer.AddEvent(call.TalkgroupTuple(), call.DateTime)
return nil
}
func (*alerter) Enabled() bool { return true }
// noopAlerter is used when alerting is disabled.
type noopAlerter struct{}
func (*noopAlerter) SinkType() string { return "noopAlerter" }
func (*noopAlerter) Call(_ context.Context, _ *cl.Call) error { return nil }
func (*noopAlerter) Go(_ context.Context) {}
func (*noopAlerter) Enabled() bool { return false }

View file

@ -0,0 +1,172 @@
package alerting
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"sort"
"strconv"
"time"
"dynatron.me/x/stillbox/internal/jsontime"
"dynatron.me/x/stillbox/internal/trending"
cl "dynatron.me/x/stillbox/pkg/calls"
"dynatron.me/x/stillbox/pkg/gordio/config"
"github.com/rs/zerolog/log"
)
// A Simulation simulates what happens to the alerter during a specified time
// period using past data from the database.
type Simulation struct {
// normal Alerting config
config.Alerting
// ScoreStart is the time when scoring begins
ScoreStart jsontime.Time `json:"scoreStart"`
// ScoreEnd is the time when the score simulator ends. Left blank, it defaults to time.Now()
ScoreEnd jsontime.Time `json:"scoreEnd"`
// SimInterval is the interval at which the scorer will be called
SimInterval jsontime.Duration `json:"simInterval"`
clock offsetClock `json:"-"`
*alerter `json:"-"`
}
func (s *Simulation) verify() error {
switch {
case !s.ScoreEnd.Time().IsZero() && s.ScoreStart.Time().After(s.ScoreEnd.Time()):
return errors.New("end is before start")
case s.LookbackDays > 14:
return errors.New("lookback days >14")
}
return nil
}
// stepClock is called by backfill during simulation operations.
func (s *Simulation) stepClock(t time.Time) {
now := s.clock.Now()
step := t.Sub(s.lastScore)
if step > time.Duration(s.SimInterval) {
s.clock += offsetClock(s.SimInterval)
s.scores = s.scorer.Score()
s.lastScore = now
}
}
// Simulate begins the simulation using the DB handle from ctx. It returns final scores.
func (s *Simulation) Simulate(ctx context.Context) trending.Scores[cl.Talkgroup] {
now := time.Now()
s.Enable = true
s.alerter = New(s.Alerting, WithClock(&s.clock)).(*alerter)
if time.Time(s.ScoreEnd).IsZero() {
s.ScoreEnd = jsontime.Time(now)
}
log.Debug().Time("scoreStart", s.ScoreStart.Time()).
Time("scoreEnd", s.ScoreEnd.Time()).
Str("interval", s.SimInterval.String()).
Uint("lookbackDays", s.LookbackDays).
Msg("simulation start")
scoreEnd := time.Time(s.ScoreEnd)
// compute lookback start Time
sinceLookback := time.Time(scoreEnd).Add(-24 * time.Hour * time.Duration(s.LookbackDays))
// backfill from lookback start until score start
s.backfill(ctx, sinceLookback, time.Time(s.ScoreStart))
// initial score
s.scores = s.scorer.Score()
s.lastScore = time.Time(s.ScoreStart)
ssT := time.Time(s.ScoreStart)
nowDiff := now.Sub(time.Time(ssT))
// and set the clock offset to it
s.clock -= offsetClock(nowDiff)
// turn on sim mode
s.alerter.sim = s
// compute time since score start until now
// backfill from scorestart until now. sim is enabled, so scoring will be done by stepClock()
s.backfill(ctx, time.Time(s.ScoreStart), scoreEnd)
s.lastScore = scoreEnd
sort.Sort(s.scores)
return s.scores
}
// simulateHandler is the POST endpoint handler.
func (as *alerter) simulateHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
s := new(Simulation)
switch r.Header.Get("Content-Type") {
case "application/json":
err := json.NewDecoder(r.Body).Decode(s)
if err != nil {
err = fmt.Errorf("simulate decode: %w", err)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
default:
err := r.ParseForm()
if err != nil {
err = fmt.Errorf("simulate form parse: %w", err)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
lbd, err := strconv.Atoi(r.Form["lookbackDays"][0])
if err != nil {
err = fmt.Errorf("lookbackDays parse: %w", err)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
s.LookbackDays = uint(lbd)
s.HalfLife, err = jsontime.ParseDuration(r.Form["halfLife"][0])
if err != nil {
err = fmt.Errorf("halfLife parse: %w", err)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
s.Recent, err = jsontime.ParseDuration(r.Form["recent"][0])
if err != nil {
err = fmt.Errorf("recent parse: %w", err)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
s.SimInterval, err = jsontime.ParseDuration(r.Form["simInterval"][0])
if err != nil {
err = fmt.Errorf("simInterval parse: %w", err)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
s.ScoreStart, err = jsontime.ParseInLocal(r.Form["scoreStart"][0])
if err != nil {
err = fmt.Errorf("scoreStart parse: %w", err)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
s.ScoreEnd, err = jsontime.ParseInLocal(r.Form["scoreEnd"][0])
if err != nil {
s.ScoreEnd = jsontime.Time{}
}
}
err := s.verify()
if err != nil {
err = fmt.Errorf("simulation profile verify: %w", err)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
s.Simulate(ctx)
s.tgStatsHandler(w, r)
}

View file

@ -0,0 +1,108 @@
package alerting
import (
_ "embed"
"errors"
"html/template"
"net/http"
"time"
"dynatron.me/x/stillbox/pkg/calls"
"dynatron.me/x/stillbox/pkg/gordio/config"
"dynatron.me/x/stillbox/pkg/gordio/database"
"dynatron.me/x/stillbox/internal/common"
"dynatron.me/x/stillbox/internal/jsontime"
"dynatron.me/x/stillbox/internal/trending"
"github.com/go-chi/chi/v5"
"github.com/rs/zerolog/log"
)
//go:embed stats.html
var statsTemplateFile string
type stats interface {
PrivateRoutes(chi.Router)
}
var (
funcMap = template.FuncMap{
"f": common.FmtFloat,
"dict": func(values ...interface{}) (map[string]interface{}, error) {
if len(values)%2 != 0 {
return nil, errors.New("invalid dict call")
}
dict := make(map[string]interface{}, len(values)/2)
for i := 0; i < len(values); i += 2 {
key, ok := values[i].(string)
if !ok {
return nil, errors.New("dict keys must be strings")
}
dict[key] = values[i+1]
}
return dict, nil
},
"formTime": func(t jsontime.Time) string {
return time.Time(t).Format("2006-01-02T15:04")
},
"ago": func(s string) (string, error) {
d, err := time.ParseDuration(s)
if err != nil {
return "", err
}
return time.Now().Add(-d).Format("2006-01-02T15:04"), nil
},
}
statTmpl = template.Must(template.New("stats").Funcs(funcMap).Parse(statsTemplateFile))
)
func (as *alerter) PrivateRoutes(r chi.Router) {
r.Get("/tgstats", as.tgStatsHandler)
r.Post("/tgstats", as.simulateHandler)
r.Get("/testnotify", as.testNotifyHandler)
}
func (as *noopAlerter) PrivateRoutes(r chi.Router) {}
func (as *alerter) tgStatsHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
db := database.FromCtx(ctx)
packed := make([]int64, 0, len(as.scores))
for _, s := range as.scores {
packed = append(packed, s.ID.Pack())
}
tgs, err := db.GetTalkgroupsByPackedIDs(ctx, packed)
if err != nil {
log.Error().Err(err).Msg("stats TG get failed")
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
tgMap := make(map[calls.Talkgroup]database.GetTalkgroupsByPackedIDsRow, len(tgs))
for _, t := range tgs {
tgMap[calls.Talkgroup{System: uint32(t.SystemID), Talkgroup: uint32(t.ID)}] = t
}
renderData := struct {
TGs map[calls.Talkgroup]database.GetTalkgroupsByPackedIDsRow
Scores trending.Scores[calls.Talkgroup]
LastScore time.Time
Simulation *Simulation
Config config.Alerting
}{
TGs: tgMap,
Scores: as.scores,
LastScore: as.lastScore,
Config: as.cfg,
Simulation: as.sim,
}
w.WriteHeader(http.StatusOK)
err = statTmpl.Execute(w, renderData)
if err != nil {
log.Error().Err(err).Msg("stat template exec")
}
}

View file

@ -0,0 +1,109 @@
<!DOCTYPE html>
<html>
<head>
<title>Stats</title>
<style>
*, *:before, *:after {
margin: 0;
padding: 0;
box-sizing: border-box;
}
body {
background: #105469;
font-family: sans-serif;
}
table, #simform {
background: #012B39;
border-radius: 0.25em;
border-collapse: collapse;
margin-top: 1em;
margin-bottom: 1em;
margin-left: auto;
margin-right: auto;
}
th {
border-bottom: 1px solid #364043;
color: #E2B842;
font-size: 0.85em;
font-weight: 600;
padding: 0.5em 1em;
text-align: left;
}
td, #simform {
color: #fff;
font-weight: 400;
padding: 0.65em 1em;
}
.disabled td {
color: #4F5F64;
}
tbody tr {
transition: background 0.25s ease;
}
tbody tr:hover {
background: #014055;
}
</style>
</head>
<body>
<div id="simform">
<form action="/tgstats" method="POST">
{{ define "simform" }}
<label for="lookbackDays">Lookback Days</label> <input id="lookbackDays" name="lookbackDays" type="number" min="1" max="14" value="{{ .Lookback }}" />
<label for="halfLife">Half life</label> <input id="halfLife" name="halfLife" type="text" value="{{ .HalfLife }}" />
<label for="recent">Recent</label> <input id="recent" name="recent" type="text" value="{{ .Recent }}" />
<label for="simInterval">Sim Interval</label> <input id="simInterval" name="simInterval" type="text" value="{{ .SimInterval }}" />
<label for="scoreStart">Score Start</label> <input id="scoreStart" name="scoreStart" type="datetime-local" value="{{ .ScoreStart }}" />
<label for="scoreEnd">Score End</label> <input id="scoreEnd" name="scoreEnd" type="datetime-local" value="{{ .ScoreEnd }}" />
<input type="submit" value="Simulate" />
{{end}}
{{ if .Simulation }}
{{ template "simform" dict "Lookback" .Simulation.LookbackDays "HalfLife" .Simulation.HalfLife "Recent" .Simulation.Recent "ScoreStart" (formTime .Simulation.ScoreStart) "ScoreEnd" (formTime .Simulation.ScoreEnd) "SimInterval" .Simulation.SimInterval }}
{{ else }}
{{ template "simform" dict "Lookback" .Config.LookbackDays "HalfLife" .Config.HalfLife "Recent" .Config.Recent "ScoreStart" (ago "72h") "ScoreEnd" "" "SimInterval" "5m" }}
{{ end }}
</form>
</div>
<table>
{{ if .Simulation }}
<tr>
<td colspan="10">Simulating from {{ formTime .Simulation.ScoreStart }} until {{ formTime .Simulation.ScoreEnd }}</td>
</tr>
{{ end }}
<tr>
<th>System</th>
<th>TG</th>
<th>TG ID</th>
<th>Count</th>
<th>Recent</th>
<th>Score</th>
<th>Probab</th>
<th>Expect</th>
<th>Max</th>
<th>KL</th>
</tr>
{{ range .Scores }}
{{ $tg := (index $.TGs .ID) }}
<tr>
<td>{{ $tg.Name_2}}</td>
<td>{{ $tg.Name}}</td>
<td>{{ .ID.Talkgroup }}</td>
<td>{{ f .Count 0 }}</td>
<td>{{ f .RecentCount 0 }}</td>
<td>{{ f .Score }}</td>
<td>{{ f .Probability }}</td>
<td>{{ f .Expectation }}</td>
<td>{{ f .Maximum }}</td>
<td>{{ f .KLScore }}</td>
</tr>
{{else}}
<tr>
<td colspan="10">No Data</td>
</tr>
{{ end }}
<tr>
<td colspan="10">Last updated at {{ .LastScore }}</td>
</tr>
</table>
</body>
</html>

View file

@ -2,6 +2,7 @@ package auth
import (
"context"
"encoding/json"
"net/http"
"strconv"
"strings"
@ -168,18 +169,35 @@ func (a *Auth) routeRefresh(w http.ResponseWriter, r *http.Request) {
}
func (a *Auth) routeAuth(w http.ResponseWriter, r *http.Request) {
err := r.ParseForm()
var creds struct {
Username string `json:"username"`
Password string `json:"password"`
}
var err error
switch r.Header.Get("Content-Type") {
case "application/json":
err = json.NewDecoder(r.Body).Decode(&creds)
default:
err = r.ParseForm()
if err != nil {
break
}
creds.Username, creds.Password = r.PostFormValue("username"), r.PostFormValue("password")
}
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
username, password := r.PostFormValue("username"), r.PostFormValue("password")
if username == "" || password == "" {
if creds.Username == "" || creds.Password == "" {
http.Error(w, "blank credentials", http.StatusBadRequest)
return
}
tok, err := a.Login(r.Context(), username, password)
tok, err := a.Login(r.Context(), creds.Username, creds.Password)
if err != nil {
http.Error(w, err.Error(), http.StatusUnauthorized)
return

View file

@ -5,6 +5,8 @@ import (
"sync"
"time"
"dynatron.me/x/stillbox/internal/jsontime"
"github.com/rs/zerolog/log"
"github.com/spf13/cobra"
"gopkg.in/yaml.v3"
@ -14,10 +16,12 @@ type Config struct {
DB DB `yaml:"db"`
CORS CORS `yaml:"cors"`
Auth Auth `yaml:"auth"`
Alerting Alerting `yaml:"alerting"`
Log []Logger `yaml:"log"`
Listen string `yaml:"listen"`
Public bool `yaml:"public"`
RateLimit RateLimit `yaml:"rateLimit"`
Notify Notify `yaml:"notify"`
configPath string
}
@ -49,6 +53,30 @@ type RateLimit struct {
verifyError sync.Once
}
type Alerting struct {
Enable bool `yaml:"enable"`
LookbackDays uint `yaml:"lookbackDays"`
HalfLife jsontime.Duration `yaml:"halfLife"`
Recent jsontime.Duration `yaml:"recent"`
AlertThreshold float64 `yaml:"alertThreshold"`
Renotify *jsontime.Duration `yaml:"renotify,omitempty"`
}
type Notify []NotifyService
type NotifyService struct {
Provider string `json:"provider"`
Config map[string]interface{} `json:"config"`
}
func (n *NotifyService) GetS(k, defaultVal string) string {
if v, has := n.Config[k].(string); has {
return v
}
return defaultVal
}
func (rl *RateLimit) Verify() bool {
if rl.Enable {
if rl.Requests > 0 && rl.Over > 0 {

View file

@ -7,9 +7,9 @@ package database
import (
"context"
"time"
"github.com/google/uuid"
"github.com/jackc/pgx/v5/pgtype"
)
const addCall = `-- name: AddCall :one
@ -36,22 +36,22 @@ RETURNING id
`
type AddCallParams struct {
Submitter *int32 `json:"submitter"`
System int `json:"system"`
Talkgroup int `json:"talkgroup"`
CallDate time.Time `json:"call_date"`
AudioName *string `json:"audio_name"`
AudioBlob []byte `json:"audio_blob"`
AudioType *string `json:"audio_type"`
AudioUrl *string `json:"audio_url"`
Duration *int32 `json:"duration"`
Frequency int `json:"frequency"`
Frequencies []int `json:"frequencies"`
Patches []int `json:"patches"`
TgLabel *string `json:"tg_label"`
TgAlphaTag *string `json:"tg_alpha_tag"`
TgGroup *string `json:"tg_group"`
Source int `json:"source"`
Submitter *int32 `json:"submitter"`
System int `json:"system"`
Talkgroup int `json:"talkgroup"`
CallDate pgtype.Timestamptz `json:"call_date"`
AudioName *string `json:"audio_name"`
AudioBlob []byte `json:"audio_blob"`
AudioType *string `json:"audio_type"`
AudioUrl *string `json:"audio_url"`
Duration *int32 `json:"duration"`
Frequency int `json:"frequency"`
Frequencies []int `json:"frequencies"`
Patches []int `json:"patches"`
TgLabel *string `json:"tg_label"`
TgAlphaTag *string `json:"tg_alpha_tag"`
TgGroup *string `json:"tg_group"`
Source int `json:"source"`
}
func (q *Queries) AddCall(ctx context.Context, arg AddCallParams) (uuid.UUID, error) {

View file

@ -13,8 +13,6 @@ import (
"github.com/jackc/pgx/v5/pgxpool"
)
// This file will eventually turn into a postgres driver.
// DB is a database handle.
type DB struct {
*pgxpool.Pool
@ -40,7 +38,12 @@ func NewClient(ctx context.Context, conf config.DB) (*DB, error) {
m.Close()
pool, err := pgxpool.New(ctx, conf.Connect)
pgConf, err := pgxpool.ParseConfig(conf.Connect)
if err != nil {
return nil, err
}
pool, err := pgxpool.NewWithConfig(ctx, pgConf)
if err != nil {
return nil, err
}

View file

@ -11,6 +11,17 @@ import (
"github.com/jackc/pgx/v5/pgtype"
)
type Alert struct {
ID int32 `json:"id"`
Time pgtype.Timestamptz `json:"time"`
Talkgroup int64 `json:"talkgroup"`
SystemID int32 `json:"system_id"`
Tgid int32 `json:"tgid"`
Weight *float32 `json:"weight"`
Score *float32 `json:"score"`
Metadata []byte `json:"metadata"`
}
type ApiKey struct {
ID int32 `json:"id"`
Owner int `json:"owner"`
@ -21,24 +32,24 @@ type ApiKey struct {
}
type Call struct {
ID uuid.UUID `json:"id"`
Submitter *int32 `json:"submitter"`
System int `json:"system"`
Talkgroup int `json:"talkgroup"`
CallDate time.Time `json:"call_date"`
AudioName *string `json:"audio_name"`
AudioBlob []byte `json:"audio_blob"`
Duration *int32 `json:"duration"`
AudioType *string `json:"audio_type"`
AudioUrl *string `json:"audio_url"`
Frequency int `json:"frequency"`
Frequencies []int `json:"frequencies"`
Patches []int `json:"patches"`
TgLabel *string `json:"tg_label"`
TgAlphaTag *string `json:"tg_alpha_tag"`
TgGroup *string `json:"tg_group"`
Source int `json:"source"`
Transcript *string `json:"transcript"`
ID uuid.UUID `json:"id"`
Submitter *int32 `json:"submitter"`
System int `json:"system"`
Talkgroup int `json:"talkgroup"`
CallDate pgtype.Timestamptz `json:"call_date"`
AudioName *string `json:"audio_name"`
AudioBlob []byte `json:"audio_blob"`
Duration *int32 `json:"duration"`
AudioType *string `json:"audio_type"`
AudioUrl *string `json:"audio_url"`
Frequency int `json:"frequency"`
Frequencies []int `json:"frequencies"`
Patches []int `json:"patches"`
TgLabel *string `json:"tg_label"`
TgAlphaTag *string `json:"tg_alpha_tag"`
TgGroup *string `json:"tg_group"`
Source int `json:"source"`
Transcript *string `json:"transcript"`
}
type Incident struct {
@ -78,6 +89,8 @@ type Talkgroup struct {
Frequency *int32 `json:"frequency"`
Metadata []byte `json:"metadata"`
Tags []string `json:"tags"`
Notify bool `json:"notify"`
Weight float32 `json:"weight"`
}
type TalkgroupsLearned struct {

View file

@ -20,10 +20,12 @@ type Querier interface {
DeleteUser(ctx context.Context, username string) error
GetAPIKey(ctx context.Context, apiKey string) (ApiKey, error)
GetDatabaseSize(ctx context.Context) (string, error)
GetSystemName(ctx context.Context, systemID int) (string, error)
GetTalkgroup(ctx context.Context, systemID int, tgid int) (Talkgroup, error)
GetTalkgroupIDsByTags(ctx context.Context, anytags []string, alltags []string, nottags []string) ([]GetTalkgroupIDsByTagsRow, error)
GetTalkgroupTags(ctx context.Context, sys int, tg int) ([]string, error)
GetTalkgroupWithLearned(ctx context.Context, systemID int, tgid int) (GetTalkgroupWithLearnedRow, error)
GetTalkgroupsByPackedIDs(ctx context.Context, dollar_1 []int64) ([]GetTalkgroupsByPackedIDsRow, error)
GetTalkgroupsWithAllTags(ctx context.Context, tags []string) ([]Talkgroup, error)
GetTalkgroupsWithAnyTags(ctx context.Context, tags []string) ([]Talkgroup, error)
GetUserByID(ctx context.Context, id int32) (User, error)

View file

@ -19,8 +19,19 @@ func (q *Queries) BulkSetTalkgroupTags(ctx context.Context, iD int64, tags []str
return err
}
const getSystemName = `-- name: GetSystemName :one
SELECT name FROM systems WHERE id = $1
`
func (q *Queries) GetSystemName(ctx context.Context, systemID int) (string, error) {
row := q.db.QueryRow(ctx, getSystemName, systemID)
var name string
err := row.Scan(&name)
return name, err
}
const getTalkgroup = `-- name: GetTalkgroup :one
SELECT id, system_id, tgid, name, alpha_tag, tg_group, frequency, metadata, tags FROM talkgroups
SELECT id, system_id, tgid, name, alpha_tag, tg_group, frequency, metadata, tags, notify, weight FROM talkgroups
WHERE id = systg2id($1, $2)
`
@ -37,6 +48,8 @@ func (q *Queries) GetTalkgroup(ctx context.Context, systemID int, tgid int) (Tal
&i.Frequency,
&i.Metadata,
&i.Tags,
&i.Notify,
&i.Weight,
)
return i, err
}
@ -137,8 +150,64 @@ func (q *Queries) GetTalkgroupWithLearned(ctx context.Context, systemID int, tgi
return i, err
}
const getTalkgroupsByPackedIDs = `-- name: GetTalkgroupsByPackedIDs :many
SELECT tg.id, system_id, tgid, tg.name, alpha_tag, tg_group, frequency, metadata, tags, notify, weight, sys.id, sys.name FROM talkgroups tg
JOIN systems sys ON tg.system_id = sys.id
WHERE tg.id = ANY($1::INT8[])
`
type GetTalkgroupsByPackedIDsRow struct {
ID int64 `json:"id"`
SystemID int32 `json:"system_id"`
Tgid int32 `json:"tgid"`
Name *string `json:"name"`
AlphaTag *string `json:"alpha_tag"`
TgGroup *string `json:"tg_group"`
Frequency *int32 `json:"frequency"`
Metadata []byte `json:"metadata"`
Tags []string `json:"tags"`
Notify bool `json:"notify"`
Weight float32 `json:"weight"`
ID_2 int `json:"id_2"`
Name_2 string `json:"name_2"`
}
func (q *Queries) GetTalkgroupsByPackedIDs(ctx context.Context, dollar_1 []int64) ([]GetTalkgroupsByPackedIDsRow, error) {
rows, err := q.db.Query(ctx, getTalkgroupsByPackedIDs, dollar_1)
if err != nil {
return nil, err
}
defer rows.Close()
var items []GetTalkgroupsByPackedIDsRow
for rows.Next() {
var i GetTalkgroupsByPackedIDsRow
if err := rows.Scan(
&i.ID,
&i.SystemID,
&i.Tgid,
&i.Name,
&i.AlphaTag,
&i.TgGroup,
&i.Frequency,
&i.Metadata,
&i.Tags,
&i.Notify,
&i.Weight,
&i.ID_2,
&i.Name_2,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const getTalkgroupsWithAllTags = `-- name: GetTalkgroupsWithAllTags :many
SELECT id, system_id, tgid, name, alpha_tag, tg_group, frequency, metadata, tags FROM talkgroups
SELECT id, system_id, tgid, name, alpha_tag, tg_group, frequency, metadata, tags, notify, weight FROM talkgroups
WHERE tags && ARRAY[$1]
`
@ -161,6 +230,8 @@ func (q *Queries) GetTalkgroupsWithAllTags(ctx context.Context, tags []string) (
&i.Frequency,
&i.Metadata,
&i.Tags,
&i.Notify,
&i.Weight,
); err != nil {
return nil, err
}
@ -173,7 +244,7 @@ func (q *Queries) GetTalkgroupsWithAllTags(ctx context.Context, tags []string) (
}
const getTalkgroupsWithAnyTags = `-- name: GetTalkgroupsWithAnyTags :many
SELECT id, system_id, tgid, name, alpha_tag, tg_group, frequency, metadata, tags FROM talkgroups
SELECT id, system_id, tgid, name, alpha_tag, tg_group, frequency, metadata, tags, notify, weight FROM talkgroups
WHERE tags @> ARRAY[$1]
`
@ -196,6 +267,8 @@ func (q *Queries) GetTalkgroupsWithAnyTags(ctx context.Context, tags []string) (
&i.Frequency,
&i.Metadata,
&i.Tags,
&i.Notify,
&i.Weight,
); err != nil {
return nil, err
}

View file

@ -0,0 +1,89 @@
package notify
import (
"fmt"
stdhttp "net/http"
"time"
"dynatron.me/x/stillbox/pkg/gordio/config"
"github.com/nikoksr/notify"
"github.com/nikoksr/notify/service/http"
)
type Notifier interface {
notify.Notifier
}
type notifier struct {
*notify.Notify
cfg []config.NotifyService
}
func (n *notifier) buildSlackWebhookPayload(cfg config.NotifyService) func(string, string) any {
icon := cfg.GetS("icon", "🚨")
url := cfg.GetS("messageURL", "")
type Attachment struct {
Title string `json:"title"`
Text string `json:"text"`
Fallback string `json:"fallback"`
Footer string `json:"footer"`
TitleLink string `json:"title_link"`
Timestamp int64 `json:"ts"`
}
return func(subject, message string) any {
m := struct {
Username string `json:"username"`
Attachments []Attachment `json:"attachments"`
IconEmoji string `json:"icon_emoji"`
}{
Username: "Stillbox",
Attachments: []Attachment{
{
Title: subject,
Text: message,
TitleLink: url,
Timestamp: time.Now().Unix(),
},
},
IconEmoji: icon,
}
return m
}
}
func (n *notifier) addService(cfg config.NotifyService) error {
switch cfg.Provider {
case "slackwebhook":
hs := http.New()
hs.AddReceivers(&http.Webhook{
ContentType: "application/json",
Header: make(stdhttp.Header),
Method: stdhttp.MethodPost,
URL: cfg.GetS("webhookURL", ""),
BuildPayload: n.buildSlackWebhookPayload(cfg),
})
n.UseServices(hs)
default:
return fmt.Errorf("unknown provider '%s'", cfg.Provider)
}
return nil
}
func New(cfg config.Notify) (Notifier, error) {
n := &notifier{
Notify: notify.NewWithServices(),
}
for _, s := range cfg {
err := n.addService(s)
if err != nil {
return nil, err
}
}
return n, nil
}

View file

@ -0,0 +1,6 @@
//go:build !pprof
// +build !pprof
package server
func (s *Server) installPprof() {}

View file

@ -0,0 +1,17 @@
//go:build pprof
// +build pprof
package server
import (
"net/http/pprof"
)
func (s *Server) installPprof() {
r := s.r
r.HandleFunc("/debug/pprof/", pprof.Index)
r.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
r.HandleFunc("/debug/pprof/profile", pprof.Profile)
r.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
r.HandleFunc("/debug/pprof/trace", pprof.Trace)
}

View file

@ -28,11 +28,14 @@ func (s *Server) setupRoutes() {
r := s.r
r.Use(middleware.WithValue(database.DBCTXKeyValue, s.db))
s.installPprof()
r.Group(func(r chi.Router) {
// authenticated routes
r.Use(s.auth.VerifyMiddleware(), s.auth.AuthMiddleware())
s.nex.PrivateRoutes(r)
s.auth.PrivateRoutes(r)
s.alerter.PrivateRoutes(r)
})
r.Group(func(r chi.Router) {

View file

@ -6,10 +6,12 @@ import (
"os"
"time"
"dynatron.me/x/stillbox/pkg/gordio/alerting"
"dynatron.me/x/stillbox/pkg/gordio/auth"
"dynatron.me/x/stillbox/pkg/gordio/config"
"dynatron.me/x/stillbox/pkg/gordio/database"
"dynatron.me/x/stillbox/pkg/gordio/nexus"
"dynatron.me/x/stillbox/pkg/gordio/notify"
"dynatron.me/x/stillbox/pkg/gordio/sinks"
"dynatron.me/x/stillbox/pkg/gordio/sources"
"github.com/go-chi/chi/v5"
@ -21,15 +23,17 @@ import (
const shutdownTimeout = 5 * time.Second
type Server struct {
auth *auth.Auth
conf *config.Config
db *database.DB
r *chi.Mux
sources sources.Sources
sinks sinks.Sinks
nex *nexus.Nexus
logger *Logger
hup chan os.Signal
auth *auth.Auth
conf *config.Config
db *database.DB
r *chi.Mux
sources sources.Sources
sinks sinks.Sinks
nex *nexus.Nexus
logger *Logger
alerter alerting.Alerter
notifier notify.Notifier
hup chan os.Signal
}
func New(ctx context.Context, cfg *config.Config) (*Server, error) {
@ -43,19 +47,35 @@ func New(ctx context.Context, cfg *config.Config) (*Server, error) {
return nil, err
}
ctx = database.CtxWithDB(ctx, db)
r := chi.NewRouter()
authenticator := auth.NewAuthenticator(cfg.Auth)
notifier, err := notify.New(cfg.Notify)
if err != nil {
return nil, err
}
srv := &Server{
auth: authenticator,
conf: cfg,
db: db,
r: r,
nex: nexus.New(),
logger: logger,
auth: authenticator,
conf: cfg,
db: db,
r: r,
nex: nexus.New(),
logger: logger,
alerter: alerting.New(cfg.Alerting, alerting.WithNotifier(notifier)),
notifier: notifier,
}
srv.sinks.Register("database", sinks.NewDatabaseSink(srv.db), true)
srv.sinks.Register("nexus", sinks.NewNexusSink(srv.nex), false)
if srv.alerter.Enabled() {
srv.sinks.Register("alerting", srv.alerter, false)
}
srv.sources.Register("rdio-http", sources.NewRdioHTTP(authenticator, srv))
r.Use(middleware.RequestID)
@ -88,6 +108,7 @@ func (s *Server) Go(ctx context.Context) error {
}
go s.nex.Go(ctx)
go s.alerter.Go(ctx)
var err error
go func() {
@ -95,6 +116,8 @@ func (s *Server) Go(ctx context.Context) error {
}()
<-ctx.Done()
s.sinks.Shutdown()
ctxShutdown, cancel := context.WithTimeout(context.Background(), shutdownTimeout)
defer cancel()

View file

@ -8,6 +8,7 @@ import (
"dynatron.me/x/stillbox/pkg/calls"
"dynatron.me/x/stillbox/pkg/gordio/database"
"github.com/jackc/pgx/v5/pgtype"
"github.com/rs/zerolog/log"
)
@ -24,6 +25,7 @@ func (s *DatabaseSink) Call(ctx context.Context, call *calls.Call) error {
log.Debug().Str("call", call.String()).Msg("received dontStore call")
return nil
}
dbCall, err := s.db.AddCall(ctx, s.toAddCallParams(call))
if err != nil {
return fmt.Errorf("add call: %w", err)
@ -43,7 +45,7 @@ func (s *DatabaseSink) toAddCallParams(call *calls.Call) database.AddCallParams
Submitter: call.Submitter.Int32Ptr(),
System: call.System,
Talkgroup: call.Talkgroup,
CallDate: call.DateTime,
CallDate: pgtype.Timestamptz{Time: call.DateTime, Valid: true},
AudioName: common.PtrOrNull(call.AudioName),
AudioBlob: call.Audio,
AudioType: common.PtrOrNull(call.AudioType),

View file

@ -40,6 +40,13 @@ func (s *Sinks) Register(name string, toAdd Sink, required bool) {
})
}
func (s *Sinks) Shutdown() {
s.Lock()
defer s.Unlock()
s.sinks = nil
}
func (s *Sinks) EmitCall(ctx context.Context, call *calls.Call) error {
s.Lock()
defer s.Unlock()

View file

@ -53,13 +53,13 @@ CREATE TABLE IF NOT EXISTS talkgroups(
tg_group TEXT,
frequency INTEGER,
metadata JSONB,
tags TEXT[] NOT NULL DEFAULT '{}'
tags TEXT[] NOT NULL DEFAULT '{}',
notify BOOLEAN NOT NULL DEFAULT 'true',
weight REAL NOT NULL DEFAULT 1.0
);
CREATE INDEX IF NOT EXISTS talkgroup_id_tags ON talkgroups USING GIN (tags);
CREATE TABLE IF NOT EXISTS talkgroups_learned(
id SERIAL PRIMARY KEY,
system_id INTEGER REFERENCES systems(id) NOT NULL,
@ -70,6 +70,17 @@ CREATE TABLE IF NOT EXISTS talkgroups_learned(
UNIQUE (system_id, tgid, name)
);
CREATE TABLE IF NOT EXISTS alerts(
id SERIAL PRIMARY KEY,
time TIMESTAMPTZ NOT NULL,
talkgroup INT8 REFERENCES talkgroups(id) NOT NULL,
system_id INT4 REFERENCES systems(id) NOT NULL GENERATED ALWAYS AS (talkgroup >> 32) STORED,
tgid INT4 NOT NULL GENERATED ALWAYS AS (talkgroup & x'ffffffff'::BIGINT) STORED,
weight REAL,
score REAL,
metadata JSONB
);
CREATE OR REPLACE FUNCTION learn_talkgroup()
RETURNS TRIGGER AS $$
BEGIN
@ -91,7 +102,7 @@ CREATE TABLE IF NOT EXISTS calls(
submitter INTEGER REFERENCES api_keys(id) ON DELETE SET NULL,
system INTEGER NOT NULL,
talkgroup INTEGER NOT NULL,
call_date TIMESTAMP NOT NULL,
call_date TIMESTAMPTZ NOT NULL,
audio_name TEXT,
audio_blob BYTEA,
duration INTEGER,
@ -107,7 +118,6 @@ CREATE TABLE IF NOT EXISTS calls(
transcript TEXT
);
CREATE OR REPLACE TRIGGER learn_tg AFTER INSERT ON calls
FOR EACH ROW EXECUTE FUNCTION learn_talkgroup();

View file

@ -28,6 +28,11 @@ WHERE id = ANY($1);
SELECT * FROM talkgroups
WHERE id = systg2id(sqlc.arg(system_id), sqlc.arg(tgid));
-- name: GetTalkgroupsByPackedIDs :many
SELECT * FROM talkgroups tg
JOIN systems sys ON tg.system_id = sys.id
WHERE tg.id = ANY($1::INT8[]);
-- name: GetTalkgroupWithLearned :one
SELECT
tg.id, tg.system_id, sys.name system_name, tg.tgid, tg.name,
@ -45,3 +50,6 @@ TRUE learned
FROM talkgroups_learned tgl
JOIN systems sys ON tgl.system_id = sys.id
WHERE tgl.system_id = sqlc.arg(system_id) AND tgl.tgid = sqlc.arg(tgid) AND ignored IS NOT TRUE;
-- name: GetSystemName :one
SELECT name FROM systems WHERE id = sqlc.arg(system_id);