finally fixed

This commit is contained in:
Daniel 2024-08-04 20:39:03 -04:00
parent 0eb2fdd293
commit e822b4b461
3 changed files with 6 additions and 16 deletions

View file

@ -5,8 +5,6 @@ import (
"dynatron.me/x/stillbox/pkg/gordio/calls" "dynatron.me/x/stillbox/pkg/gordio/calls"
"dynatron.me/x/stillbox/pkg/pb" "dynatron.me/x/stillbox/pkg/pb"
"github.com/rs/zerolog/log"
) )
type Nexus struct { type Nexus struct {
@ -40,12 +38,10 @@ func (n *Nexus) Go(done <-chan struct{}) {
for { for {
select { select {
case call, ok := <-n.callCh: case call, ok := <-n.callCh:
log.Debug().Msg("call received from ch")
if !ok { if !ok {
return return
} }
log.Debug().Msg("broadcasting call")
n.broadcastCallToClients(call) n.broadcastCallToClients(call)
case <-done: case <-done:
return return
@ -55,7 +51,6 @@ func (n *Nexus) Go(done <-chan struct{}) {
func (n *Nexus) BroadcastCall(call *calls.Call) { func (n *Nexus) BroadcastCall(call *calls.Call) {
n.callCh <- call n.callCh <- call
log.Debug().Msg("call sent to ch")
} }
func (n *Nexus) broadcastCallToClients(call *calls.Call) { func (n *Nexus) broadcastCallToClients(call *calls.Call) {
@ -66,9 +61,7 @@ func (n *Nexus) broadcastCallToClients(call *calls.Call) {
defer n.Unlock() defer n.Unlock()
for cl, _ := range n.clients { for cl, _ := range n.clients {
log.Debug().Msg("sending")
if cl.Send(message) { if cl.Send(message) {
log.Debug().Msg("channel was closed")
// we already hold the lock, and the channel is closed anyway // we already hold the lock, and the channel is closed anyway
delete(n.clients, cl) delete(n.clients, cl)
} }

View file

@ -40,10 +40,8 @@ type wsConn struct {
} }
func (w *wsConn) Send(msg *pb.Message) (closed bool) { func (w *wsConn) Send(msg *pb.Message) (closed bool) {
log.Debug().Msg("sending wsc")
select { select {
case w.out <- msg: case w.out <- msg:
log.Debug().Str("msg", msg.String()).Msg("sent wsc")
default: default:
close(w.out) close(w.out)
return true return true
@ -53,10 +51,12 @@ func (w *wsConn) Send(msg *pb.Message) (closed bool) {
} }
func newWsConn(c *websocket.Conn) *wsConn { func newWsConn(c *websocket.Conn) *wsConn {
return &wsConn{ wc := &wsConn{
Conn: c, Conn: c,
out: make(chan *pb.Message), out: make(chan *pb.Message, qSize),
} }
return wc
} }
var upgrader = websocket.Upgrader{ var upgrader = websocket.Upgrader{
@ -76,10 +76,10 @@ func (wm *wsManager) serveWS(w http.ResponseWriter, r *http.Request) {
return return
} }
cli := wm.NewClient(newWsConn(conn)) wsc := newWsConn(conn)
cli := wm.NewClient(wsc)
wm.Register(cli) wm.Register(cli)
wsc := newWsConn(conn)
go wsc.readPump(wm, cli) go wsc.readPump(wm, cli)
go wsc.writePump() go wsc.writePump()
} }

View file

@ -5,8 +5,6 @@ import (
"dynatron.me/x/stillbox/pkg/gordio/calls" "dynatron.me/x/stillbox/pkg/gordio/calls"
"dynatron.me/x/stillbox/pkg/gordio/nexus" "dynatron.me/x/stillbox/pkg/gordio/nexus"
"github.com/rs/zerolog/log"
) )
type NexusSink struct { type NexusSink struct {
@ -26,7 +24,6 @@ func (ns *NexusSink) SinkType() string {
} }
func (ns *NexusSink) Call(ctx context.Context, call *calls.Call) error { func (ns *NexusSink) Call(ctx context.Context, call *calls.Call) error {
log.Debug().Msg("nexus Call()")
ns.nexus.BroadcastCall(call) ns.nexus.BroadcastCall(call)
return nil return nil
} }