This commit is contained in:
Daniel Ponte 2024-08-06 13:54:15 -04:00
parent 93c742895f
commit 64504ee9d2
3 changed files with 61 additions and 14 deletions

View file

@ -1,6 +1,12 @@
package calls package calls
type filterQuery struct { import (
"cmp"
"encoding/json"
"slices"
)
type FilterQuery struct {
Query string Query string
Params []interface{} Params []interface{}
} }
@ -16,25 +22,25 @@ func (t Talkgroup) Pack() int64 {
} }
type Filter struct { type Filter struct {
Talkgroups []Talkgroup `json:"talkgroups"` Talkgroups []Talkgroup `json:"talkgroups,omitempty"`
TalkgroupsNot []Talkgroup `json:"talkgroupsNot"` TalkgroupsNot []Talkgroup `json:"talkgroupsNot,omitempty"`
TalkgroupTagsAll []string `json:"talkgroupTagsAll"` TalkgroupTagsAll []string `json:"talkgroupTagsAll,omitempty"`
TalkgroupTagsAny []string `json:"talkgroupTagsAny"` TalkgroupTagsAny []string `json:"talkgroupTagsAny,omitempty"`
TalkgroupTagsNot []string `json:"talkgroupTagsNot"` TalkgroupTagsNot []string `json:"talkgroupTagsNot,omitempty"`
talkgroups map[Talkgroup]bool talkgroups map[Talkgroup]bool
talkgroupTagsAll map[string]bool talkgroupTagsAll map[string]bool
talkgroupTagsAny map[string]bool talkgroupTagsAny map[string]bool
talkgroupTagsNot map[string]bool talkgroupTagsNot map[string]bool
query *filterQuery query *FilterQuery
} }
func queryParams(s string, p ...any) (string, []any) { func queryParams(s string, p ...any) (string, []any) {
return s, p return s, p
} }
func (f *Filter) filterQuery() filterQuery { func (f *Filter) filterQuery() FilterQuery {
var q string var q string
var args []interface{} var args []interface{}
@ -42,11 +48,11 @@ func (f *Filter) filterQuery() filterQuery {
`((talkgroups.id = ANY(?) OR talkgroups.tags @> ARRAY[?]) OR (talkgroups.tags && ARRAY[?])) AND (talkgroups.id != ANY(?) AND NOT talkgroups.tags @> ARRAY[?])`, `((talkgroups.id = ANY(?) OR talkgroups.tags @> ARRAY[?]) OR (talkgroups.tags && ARRAY[?])) AND (talkgroups.id != ANY(?) AND NOT talkgroups.tags @> ARRAY[?])`,
f.Talkgroups, f.TalkgroupTagsAny, f.TalkgroupTagsAll, f.TalkgroupsNot, f.TalkgroupTagsNot) f.Talkgroups, f.TalkgroupTagsAny, f.TalkgroupTagsAll, f.TalkgroupsNot, f.TalkgroupTagsNot)
return filterQuery{Query: q, Params: args} return FilterQuery{Query: q, Params: args}
} }
func (f *Filter) Packed(tg []Talkgroup) []int64 { func PackedTGs(tg []Talkgroup) []int64 {
s := make([]int64, len(f.Talkgroups)) s := make([]int64, len(tg))
for i, v := range tg { for i, v := range tg {
s[i] = v.Pack() s[i] = v.Pack()
@ -55,7 +61,7 @@ func (f *Filter) Packed(tg []Talkgroup) []int64 {
return s return s
} }
func (f *Filter) Compile() *Filter { func (f *Filter) compile() *Filter {
f.talkgroups = make(map[Talkgroup]bool) f.talkgroups = make(map[Talkgroup]bool)
for _, tg := range f.Talkgroups { for _, tg := range f.Talkgroups {
f.talkgroups[tg] = true f.talkgroups[tg] = true
@ -85,6 +91,42 @@ func (f *Filter) Compile() *Filter {
return f return f
} }
func (f *Filter) normalize() {
tgSort := func(a, b Talkgroup) int {
if n := cmp.Compare(a.System, b.System); n != 0 {
return n
}
return cmp.Compare(a.Talkgroup, b.Talkgroup)
}
slices.SortFunc(f.Talkgroups, tgSort)
slices.SortFunc(f.TalkgroupsNot, tgSort)
slices.SortFunc(f.TalkgroupTagsAll, cmp.Compare)
slices.SortFunc(f.TalkgroupTagsAny, cmp.Compare)
slices.SortFunc(f.TalkgroupTagsNot, cmp.Compare)
}
func (f *Filter) cacheKey() string {
f.normalize()
buf, err := json.Marshal(f)
if err != nil {
panic(err)
}
return string(buf)
}
func (f *Filter) Test(call *Call) bool {
return false
}
type FilterCache struct { type FilterCache struct {
cache map[string]Filter cache map[string]Filter
} }
func NewFilterCache() *FilterCache {
return &FilterCache{
cache: make(map[string]Filter),
}
}

View file

@ -4,6 +4,7 @@ import (
"io" "io"
"sync" "sync"
"dynatron.me/x/stillbox/pkg/calls"
"dynatron.me/x/stillbox/pkg/live" "dynatron.me/x/stillbox/pkg/live"
"dynatron.me/x/stillbox/pkg/pb" "dynatron.me/x/stillbox/pkg/pb"
@ -25,8 +26,9 @@ type client struct {
Connection Connection
live live.Listener filter *calls.Filter
nexus *Nexus live live.Listener
nexus *Nexus
} }
type Connection interface { type Connection interface {

View file

@ -61,6 +61,9 @@ func (n *Nexus) broadcastCallToClients(call *calls.Call) {
defer n.Unlock() defer n.Unlock()
for cl, _ := range n.clients { for cl, _ := range n.clients {
if cl.filter != nil && !cl.filter.Test(call) {
continue
}
if cl.Send(message) { if cl.Send(message) {
// we already hold the lock, and the channel is closed anyway // we already hold the lock, and the channel is closed anyway
delete(n.clients, cl) delete(n.clients, cl)