From 64504ee9d237a9806456a59fb43bb653bc9adfe8 Mon Sep 17 00:00:00 2001 From: Daniel Ponte Date: Tue, 6 Aug 2024 13:54:15 -0400 Subject: [PATCH] moar --- pkg/calls/filter.go | 66 +++++++++++++++++++++++++++++++------- pkg/gordio/nexus/client.go | 6 ++-- pkg/gordio/nexus/nexus.go | 3 ++ 3 files changed, 61 insertions(+), 14 deletions(-) diff --git a/pkg/calls/filter.go b/pkg/calls/filter.go index 097862e..26e478c 100644 --- a/pkg/calls/filter.go +++ b/pkg/calls/filter.go @@ -1,6 +1,12 @@ package calls -type filterQuery struct { +import ( + "cmp" + "encoding/json" + "slices" +) + +type FilterQuery struct { Query string Params []interface{} } @@ -16,25 +22,25 @@ func (t Talkgroup) Pack() int64 { } type Filter struct { - Talkgroups []Talkgroup `json:"talkgroups"` - TalkgroupsNot []Talkgroup `json:"talkgroupsNot"` - TalkgroupTagsAll []string `json:"talkgroupTagsAll"` - TalkgroupTagsAny []string `json:"talkgroupTagsAny"` - TalkgroupTagsNot []string `json:"talkgroupTagsNot"` + Talkgroups []Talkgroup `json:"talkgroups,omitempty"` + TalkgroupsNot []Talkgroup `json:"talkgroupsNot,omitempty"` + TalkgroupTagsAll []string `json:"talkgroupTagsAll,omitempty"` + TalkgroupTagsAny []string `json:"talkgroupTagsAny,omitempty"` + TalkgroupTagsNot []string `json:"talkgroupTagsNot,omitempty"` talkgroups map[Talkgroup]bool talkgroupTagsAll map[string]bool talkgroupTagsAny map[string]bool talkgroupTagsNot map[string]bool - query *filterQuery + query *FilterQuery } func queryParams(s string, p ...any) (string, []any) { return s, p } -func (f *Filter) filterQuery() filterQuery { +func (f *Filter) filterQuery() FilterQuery { var q string var args []interface{} @@ -42,11 +48,11 @@ func (f *Filter) filterQuery() filterQuery { `((talkgroups.id = ANY(?) OR talkgroups.tags @> ARRAY[?]) OR (talkgroups.tags && ARRAY[?])) AND (talkgroups.id != ANY(?) AND NOT talkgroups.tags @> ARRAY[?])`, f.Talkgroups, f.TalkgroupTagsAny, f.TalkgroupTagsAll, f.TalkgroupsNot, f.TalkgroupTagsNot) - return filterQuery{Query: q, Params: args} + return FilterQuery{Query: q, Params: args} } -func (f *Filter) Packed(tg []Talkgroup) []int64 { - s := make([]int64, len(f.Talkgroups)) +func PackedTGs(tg []Talkgroup) []int64 { + s := make([]int64, len(tg)) for i, v := range tg { s[i] = v.Pack() @@ -55,7 +61,7 @@ func (f *Filter) Packed(tg []Talkgroup) []int64 { return s } -func (f *Filter) Compile() *Filter { +func (f *Filter) compile() *Filter { f.talkgroups = make(map[Talkgroup]bool) for _, tg := range f.Talkgroups { f.talkgroups[tg] = true @@ -85,6 +91,42 @@ func (f *Filter) Compile() *Filter { return f } +func (f *Filter) normalize() { + tgSort := func(a, b Talkgroup) int { + if n := cmp.Compare(a.System, b.System); n != 0 { + return n + } + + return cmp.Compare(a.Talkgroup, b.Talkgroup) + } + + slices.SortFunc(f.Talkgroups, tgSort) + slices.SortFunc(f.TalkgroupsNot, tgSort) + slices.SortFunc(f.TalkgroupTagsAll, cmp.Compare) + slices.SortFunc(f.TalkgroupTagsAny, cmp.Compare) + slices.SortFunc(f.TalkgroupTagsNot, cmp.Compare) +} + +func (f *Filter) cacheKey() string { + f.normalize() + buf, err := json.Marshal(f) + if err != nil { + panic(err) + } + + return string(buf) +} + +func (f *Filter) Test(call *Call) bool { + return false +} + type FilterCache struct { cache map[string]Filter } + +func NewFilterCache() *FilterCache { + return &FilterCache{ + cache: make(map[string]Filter), + } +} diff --git a/pkg/gordio/nexus/client.go b/pkg/gordio/nexus/client.go index 4801343..0237f35 100644 --- a/pkg/gordio/nexus/client.go +++ b/pkg/gordio/nexus/client.go @@ -4,6 +4,7 @@ import ( "io" "sync" + "dynatron.me/x/stillbox/pkg/calls" "dynatron.me/x/stillbox/pkg/live" "dynatron.me/x/stillbox/pkg/pb" @@ -25,8 +26,9 @@ type client struct { Connection - live live.Listener - nexus *Nexus + filter *calls.Filter + live live.Listener + nexus *Nexus } type Connection interface { diff --git a/pkg/gordio/nexus/nexus.go b/pkg/gordio/nexus/nexus.go index 9a9409e..65350be 100644 --- a/pkg/gordio/nexus/nexus.go +++ b/pkg/gordio/nexus/nexus.go @@ -61,6 +61,9 @@ func (n *Nexus) broadcastCallToClients(call *calls.Call) { defer n.Unlock() for cl, _ := range n.clients { + if cl.filter != nil && !cl.filter.Test(call) { + continue + } if cl.Send(message) { // we already hold the lock, and the channel is closed anyway delete(n.clients, cl)