From d9aaac3b0cda6d7012625ee8256eeee20ba10e1f Mon Sep 17 00:00:00 2001 From: Daniel Ponte Date: Mon, 21 Oct 2024 22:19:31 -0400 Subject: [PATCH] Safety improvements --- pkg/gordio/nexus/client.go | 7 ++++++- pkg/gordio/nexus/nexus.go | 13 ++++++++++++- pkg/gordio/nexus/websocket.go | 6 +++--- pkg/gordio/sinks/sinks.go | 19 +++++++++++++++---- 4 files changed, 36 insertions(+), 9 deletions(-) diff --git a/pkg/gordio/nexus/client.go b/pkg/gordio/nexus/client.go index 475bb9c..63e1e18 100644 --- a/pkg/gordio/nexus/client.go +++ b/pkg/gordio/nexus/client.go @@ -2,6 +2,7 @@ package nexus import ( "context" + "errors" "io" "runtime" "sync" @@ -41,13 +42,17 @@ type ToClient interface { protoreflect.ProtoMessage } +var ( + ErrSentToClosed = errors.New("sent to closed connection") +) + type Connection interface { io.Closer CloseCh() Shutdown() - Send(ToClient) (closed bool) + Send(ToClient) error } func (n *Nexus) NewClient(conn Connection) Client { diff --git a/pkg/gordio/nexus/nexus.go b/pkg/gordio/nexus/nexus.go index 99462c6..14d52ad 100644 --- a/pkg/gordio/nexus/nexus.go +++ b/pkg/gordio/nexus/nexus.go @@ -6,6 +6,8 @@ import ( "dynatron.me/x/stillbox/pkg/calls" "dynatron.me/x/stillbox/pkg/pb" + + "github.com/rs/zerolog/log" ) type Nexus struct { @@ -67,9 +69,13 @@ func (n *Nexus) broadcastCallToClients(ctx context.Context, call *calls.Call) { continue } - if cl.Send(message) { + switch err := cl.Send(message); err { + case ErrSentToClosed: // we already hold the lock, and the channel is closed anyway delete(n.clients, cl) + case nil: + default: + log.Error().Err(err).Msg("broadcast send failed") } } } @@ -90,6 +96,11 @@ func (n *Nexus) Unregister(c Client) { } func (n *Nexus) Shutdown() { + n.Lock() + defer n.Unlock() + + close(n.callCh) + for c := range n.clients { c.Shutdown() } diff --git a/pkg/gordio/nexus/websocket.go b/pkg/gordio/nexus/websocket.go index efcc331..d7b7d5c 100644 --- a/pkg/gordio/nexus/websocket.go +++ b/pkg/gordio/nexus/websocket.go @@ -38,15 +38,15 @@ type wsConn struct { out chan ToClient } -func (w *wsConn) Send(msg ToClient) (closed bool) { +func (w *wsConn) Send(msg ToClient) error { select { case w.out <- msg: default: log.Debug().Str("conn", w.RemoteAddr().String()).Msg("send channel not ready, closing") - return true + return ErrSentToClosed } - return false + return nil } func newWsConn(c *websocket.Conn) *wsConn { diff --git a/pkg/gordio/sinks/sinks.go b/pkg/gordio/sinks/sinks.go index eb0a8ad..7495370 100644 --- a/pkg/gordio/sinks/sinks.go +++ b/pkg/gordio/sinks/sinks.go @@ -2,6 +2,8 @@ package sinks import ( "context" + "sync" + "golang.org/x/sync/errgroup" "dynatron.me/x/stillbox/pkg/calls" @@ -22,10 +24,16 @@ type sinkInstance struct { Required bool } -type Sinks []sinkInstance +type Sinks struct { + sync.RWMutex + sinks []sinkInstance +} func (s *Sinks) Register(name string, toAdd Sink, required bool) { - *s = append(*s, sinkInstance{ + s.Lock() + defer s.Unlock() + + s.sinks = append(s.sinks, sinkInstance{ Name: name, Sink: toAdd, Required: required, @@ -33,9 +41,12 @@ func (s *Sinks) Register(name string, toAdd Sink, required bool) { } func (s *Sinks) EmitCall(ctx context.Context, call *calls.Call) error { + s.Lock() + defer s.Unlock() + g, ctx := errgroup.WithContext(ctx) - for i := range *s { - sink := (*s)[i] + for i := range s.sinks { + sink := s.sinks[i] g.Go(sink.callEmitter(ctx, call)) }