diff --git a/pkg/gordio/calls/call.go b/pkg/gordio/calls/call.go index 4eb3ae2..9f37ae2 100644 --- a/pkg/gordio/calls/call.go +++ b/pkg/gordio/calls/call.go @@ -2,6 +2,8 @@ package calls import ( "dynatron.me/x/stillbox/pkg/gordio/auth" + "dynatron.me/x/stillbox/pkg/pb" + "google.golang.org/protobuf/types/known/timestamppb" "time" ) @@ -24,3 +26,37 @@ type Call struct { TalkgroupLabel *string TalkgroupTag *string } + +func toInt64Slice(s []int) []int64 { + n := make([]int64, len(s)) + for i := range s { + n[i] = int64(s[i]) + } + + return n +} + +func toInt32Slice(s []int) []int32 { + n := make([]int32, len(s)) + for i := range s { + n[i] = int32(s[i]) + } + + return n +} + +func (c *Call) ToPB() *pb.Call { + return &pb.Call{ + AudioName: c.AudioName, + AudioType: c.AudioType, + DateTime: timestamppb.New(c.DateTime), + System: int32(c.System), + Talkgroup: int32(c.Talkgroup), + Source: int32(c.Source), + Frequency: int64(c.Frequency), + Frequencies: toInt64Slice(c.Frequencies), + Patches: toInt32Slice(c.Patches), + Sources: toInt32Slice(c.Sources), + Audio: c.Audio, + } +} diff --git a/pkg/gordio/nexus/nexus.go b/pkg/gordio/nexus/nexus.go index 03175f2..501ad2a 100644 --- a/pkg/gordio/nexus/nexus.go +++ b/pkg/gordio/nexus/nexus.go @@ -2,6 +2,9 @@ package nexus import ( "sync" + + "dynatron.me/x/stillbox/pkg/gordio/calls" + "dynatron.me/x/stillbox/pkg/pb" ) type Nexus struct { @@ -10,6 +13,8 @@ type Nexus struct { clients map[*client]struct{} *wsManager + + callCh chan *calls.Call } type Registry interface { @@ -21,6 +26,7 @@ type Registry interface { func New() *Nexus { n := &Nexus{ clients: make(map[*client]struct{}), + callCh: make(chan *calls.Call, 256), } n.wsManager = newWsManager(n) @@ -28,6 +34,33 @@ func New() *Nexus { return n } +func (n *Nexus) Go(wg *sync.WaitGroup) { + defer wg.Done() + + for { + select { + case call, ok := <-n.callCh: + if !ok { + return + } + + go n.emitCall(call) + } + } +} + +func (n *Nexus) emitCall(call *calls.Call) { + message := &pb.Message{ + ToClientMessage: &pb.Message_Call{Call: call.ToPB()}, + } + n.RLock() + defer n.RUnlock() + + for cl, _ := range n.clients { + cl.Send(message) + } +} + func (n *Nexus) Register(c Client) { n.Lock() defer n.Unlock() @@ -39,5 +72,6 @@ func (n *Nexus) Unregister(c Client) { n.Lock() defer n.Unlock() - delete(n.clients, c.(*client)) + cl := c.(*client) + delete(n.clients, cl) } diff --git a/pkg/gordio/nexus/websocket.go b/pkg/gordio/nexus/websocket.go index f33c632..55f520e 100644 --- a/pkg/gordio/nexus/websocket.go +++ b/pkg/gordio/nexus/websocket.go @@ -109,7 +109,7 @@ func (conn *wsConn) writePump() { case msg, ok := <-conn.out: conn.SetWriteDeadline(time.Now().Add(writeWait)) if !ok { - // nexus closed + // nexus closed us conn.WriteMessage(websocket.CloseMessage, []byte{}) return } diff --git a/pkg/gordio/server/server.go b/pkg/gordio/server/server.go index 0f4a4af..d36ec00 100644 --- a/pkg/gordio/server/server.go +++ b/pkg/gordio/server/server.go @@ -39,7 +39,8 @@ func New(cfg *config.Config) (*Server, error) { nex: nexus.New(), } - srv.sinks.Register("database", sinks.NewDatabaseSink(db)) + srv.sinks.Register("database", sinks.NewDatabaseSink()) + srv.sinks.Register("nexus", sinks.NewNexusSink(srv.nex)) srv.sources.Register("rdio-http", sources.NewRdioHTTP(authenticator, srv)) r.Use(middleware.RequestID) diff --git a/pkg/gordio/sinks/database.go b/pkg/gordio/sinks/database.go index 389f885..85e8565 100644 --- a/pkg/gordio/sinks/database.go +++ b/pkg/gordio/sinks/database.go @@ -12,13 +12,10 @@ import ( ) type DatabaseSink struct { - db *database.DB } -func NewDatabaseSink(db *database.DB) *DatabaseSink { - return &DatabaseSink{ - db: db, - } +func NewDatabaseSink() *DatabaseSink { + return &DatabaseSink{} } func (s *DatabaseSink) Call(ctx context.Context, call *calls.Call) error { diff --git a/pkg/gordio/sinks/nexus.go b/pkg/gordio/sinks/nexus.go new file mode 100644 index 0000000..e7f3a55 --- /dev/null +++ b/pkg/gordio/sinks/nexus.go @@ -0,0 +1,28 @@ +package sinks + +import ( + "context" + + "dynatron.me/x/stillbox/pkg/gordio/calls" + "dynatron.me/x/stillbox/pkg/gordio/nexus" +) + +type NexusSink struct { + nexus *nexus.Nexus +} + +func NewNexusSink(nexus *nexus.Nexus) *NexusSink { + ns := &NexusSink{ + nexus: nexus, + } + + return ns +} + +func (ns *NexusSink) SinkType() string { + return "nexus" +} + +func (ns *NexusSink) Call(ctx context.Context, call *calls.Call) error { + return nil +}