This commit is contained in:
Daniel 2024-08-04 08:41:35 -04:00
parent 95452e4e02
commit aff226940f
6 changed files with 104 additions and 8 deletions

View file

@ -2,6 +2,8 @@ package calls
import (
"dynatron.me/x/stillbox/pkg/gordio/auth"
"dynatron.me/x/stillbox/pkg/pb"
"google.golang.org/protobuf/types/known/timestamppb"
"time"
)
@ -24,3 +26,37 @@ type Call struct {
TalkgroupLabel *string
TalkgroupTag *string
}
func toInt64Slice(s []int) []int64 {
n := make([]int64, len(s))
for i := range s {
n[i] = int64(s[i])
}
return n
}
func toInt32Slice(s []int) []int32 {
n := make([]int32, len(s))
for i := range s {
n[i] = int32(s[i])
}
return n
}
func (c *Call) ToPB() *pb.Call {
return &pb.Call{
AudioName: c.AudioName,
AudioType: c.AudioType,
DateTime: timestamppb.New(c.DateTime),
System: int32(c.System),
Talkgroup: int32(c.Talkgroup),
Source: int32(c.Source),
Frequency: int64(c.Frequency),
Frequencies: toInt64Slice(c.Frequencies),
Patches: toInt32Slice(c.Patches),
Sources: toInt32Slice(c.Sources),
Audio: c.Audio,
}
}

View file

@ -2,6 +2,9 @@ package nexus
import (
"sync"
"dynatron.me/x/stillbox/pkg/gordio/calls"
"dynatron.me/x/stillbox/pkg/pb"
)
type Nexus struct {
@ -10,6 +13,8 @@ type Nexus struct {
clients map[*client]struct{}
*wsManager
callCh chan *calls.Call
}
type Registry interface {
@ -21,6 +26,7 @@ type Registry interface {
func New() *Nexus {
n := &Nexus{
clients: make(map[*client]struct{}),
callCh: make(chan *calls.Call, 256),
}
n.wsManager = newWsManager(n)
@ -28,6 +34,33 @@ func New() *Nexus {
return n
}
func (n *Nexus) Go(wg *sync.WaitGroup) {
defer wg.Done()
for {
select {
case call, ok := <-n.callCh:
if !ok {
return
}
go n.emitCall(call)
}
}
}
func (n *Nexus) emitCall(call *calls.Call) {
message := &pb.Message{
ToClientMessage: &pb.Message_Call{Call: call.ToPB()},
}
n.RLock()
defer n.RUnlock()
for cl, _ := range n.clients {
cl.Send(message)
}
}
func (n *Nexus) Register(c Client) {
n.Lock()
defer n.Unlock()
@ -39,5 +72,6 @@ func (n *Nexus) Unregister(c Client) {
n.Lock()
defer n.Unlock()
delete(n.clients, c.(*client))
cl := c.(*client)
delete(n.clients, cl)
}

View file

@ -109,7 +109,7 @@ func (conn *wsConn) writePump() {
case msg, ok := <-conn.out:
conn.SetWriteDeadline(time.Now().Add(writeWait))
if !ok {
// nexus closed
// nexus closed us
conn.WriteMessage(websocket.CloseMessage, []byte{})
return
}

View file

@ -39,7 +39,8 @@ func New(cfg *config.Config) (*Server, error) {
nex: nexus.New(),
}
srv.sinks.Register("database", sinks.NewDatabaseSink(db))
srv.sinks.Register("database", sinks.NewDatabaseSink())
srv.sinks.Register("nexus", sinks.NewNexusSink(srv.nex))
srv.sources.Register("rdio-http", sources.NewRdioHTTP(authenticator, srv))
r.Use(middleware.RequestID)

View file

@ -12,13 +12,10 @@ import (
)
type DatabaseSink struct {
db *database.DB
}
func NewDatabaseSink(db *database.DB) *DatabaseSink {
return &DatabaseSink{
db: db,
}
func NewDatabaseSink() *DatabaseSink {
return &DatabaseSink{}
}
func (s *DatabaseSink) Call(ctx context.Context, call *calls.Call) error {

28
pkg/gordio/sinks/nexus.go Normal file
View file

@ -0,0 +1,28 @@
package sinks
import (
"context"
"dynatron.me/x/stillbox/pkg/gordio/calls"
"dynatron.me/x/stillbox/pkg/gordio/nexus"
)
type NexusSink struct {
nexus *nexus.Nexus
}
func NewNexusSink(nexus *nexus.Nexus) *NexusSink {
ns := &NexusSink{
nexus: nexus,
}
return ns
}
func (ns *NexusSink) SinkType() string {
return "nexus"
}
func (ns *NexusSink) Call(ctx context.Context, call *calls.Call) error {
return nil
}