diff --git a/pkg/gordio/auth/jwt.go b/pkg/gordio/auth/jwt.go index c5c86f3..71eaf47 100644 --- a/pkg/gordio/auth/jwt.go +++ b/pkg/gordio/auth/jwt.go @@ -115,13 +115,17 @@ func (a *authenticator) routeAuth(w http.ResponseWriter, r *http.Request) { http.Error(w, err.Error(), http.StatusUnauthorized) return } - http.SetCookie(w, &http.Cookie{ + cookie := &http.Cookie{ Name: "jwt", Value: tok, HttpOnly: true, Secure: !a.allowInsecureCookie(r), - Domain: a.cfg.Domain, - }) + } + + if cookie.Secure { + cookie.Domain = a.cfg.Domain + } + http.SetCookie(w, cookie) jr := struct { JWT string `json:"jwt"` diff --git a/pkg/gordio/nexus/client.go b/pkg/gordio/nexus/client.go index 177d4ef..f79949b 100644 --- a/pkg/gordio/nexus/client.go +++ b/pkg/gordio/nexus/client.go @@ -32,7 +32,7 @@ type Connection interface { io.Closer CloseCh() - Send(*pb.Message) + Send(*pb.Message) (closed bool) } func (n *Nexus) NewClient(conn Connection) Client { diff --git a/pkg/gordio/nexus/nexus.go b/pkg/gordio/nexus/nexus.go index 294c1fc..75e8b6a 100644 --- a/pkg/gordio/nexus/nexus.go +++ b/pkg/gordio/nexus/nexus.go @@ -44,7 +44,7 @@ func (n *Nexus) Go(done <-chan struct{}) { return } - go n.broadcastCallToClients(call) + n.broadcastCallToClients(call) case <-done: return } @@ -56,16 +56,17 @@ func (n *Nexus) BroadcastCall(call *calls.Call) { } func (n *Nexus) broadcastCallToClients(call *calls.Call) { - log.Info().Msg("broadcast") message := &pb.Message{ ToClientMessage: &pb.Message_Call{Call: call.ToPB()}, } - n.RLock() - defer n.RUnlock() + n.Lock() + defer n.Unlock() for cl, _ := range n.clients { - log.Info().Msg("client") - cl.Send(message) + if cl.Send(message) { + // we already hold the lock, and the channel is closed anyway + delete(n.clients, cl) + } } } diff --git a/pkg/gordio/nexus/websocket.go b/pkg/gordio/nexus/websocket.go index cd0ba6d..21c344b 100644 --- a/pkg/gordio/nexus/websocket.go +++ b/pkg/gordio/nexus/websocket.go @@ -39,8 +39,15 @@ type wsConn struct { out chan *pb.Message } -func (w *wsConn) Send(msg *pb.Message) { - w.out <- msg +func (w *wsConn) Send(msg *pb.Message) (closed bool) { + select { + case w.out <- msg: + default: + close(w.out) + return true + } + + return false } func newWsConn(c *websocket.Conn) *wsConn { @@ -79,7 +86,7 @@ func (conn *wsConn) readPump(reg Registry, c Client) { defer func() { reg.Unregister(c) conn.Close() - log.Info().Msg("readpump exiting") + conn.CloseCh() }() conn.SetReadLimit(maxMessageSize) @@ -107,7 +114,6 @@ func (conn *wsConn) writePump() { defer func() { pingTicker.Stop() conn.Close() - log.Info().Msg("writepump exiting") }() for {