fix channel stuff

This commit is contained in:
Daniel 2024-08-04 14:55:15 -04:00
parent bfd6a8545b
commit 3fb4df9690
4 changed files with 25 additions and 14 deletions

View file

@ -115,13 +115,17 @@ func (a *authenticator) routeAuth(w http.ResponseWriter, r *http.Request) {
http.Error(w, err.Error(), http.StatusUnauthorized) http.Error(w, err.Error(), http.StatusUnauthorized)
return return
} }
http.SetCookie(w, &http.Cookie{ cookie := &http.Cookie{
Name: "jwt", Name: "jwt",
Value: tok, Value: tok,
HttpOnly: true, HttpOnly: true,
Secure: !a.allowInsecureCookie(r), Secure: !a.allowInsecureCookie(r),
Domain: a.cfg.Domain, }
})
if cookie.Secure {
cookie.Domain = a.cfg.Domain
}
http.SetCookie(w, cookie)
jr := struct { jr := struct {
JWT string `json:"jwt"` JWT string `json:"jwt"`

View file

@ -32,7 +32,7 @@ type Connection interface {
io.Closer io.Closer
CloseCh() CloseCh()
Send(*pb.Message) Send(*pb.Message) (closed bool)
} }
func (n *Nexus) NewClient(conn Connection) Client { func (n *Nexus) NewClient(conn Connection) Client {

View file

@ -44,7 +44,7 @@ func (n *Nexus) Go(done <-chan struct{}) {
return return
} }
go n.broadcastCallToClients(call) n.broadcastCallToClients(call)
case <-done: case <-done:
return return
} }
@ -56,16 +56,17 @@ func (n *Nexus) BroadcastCall(call *calls.Call) {
} }
func (n *Nexus) broadcastCallToClients(call *calls.Call) { func (n *Nexus) broadcastCallToClients(call *calls.Call) {
log.Info().Msg("broadcast")
message := &pb.Message{ message := &pb.Message{
ToClientMessage: &pb.Message_Call{Call: call.ToPB()}, ToClientMessage: &pb.Message_Call{Call: call.ToPB()},
} }
n.RLock() n.Lock()
defer n.RUnlock() defer n.Unlock()
for cl, _ := range n.clients { for cl, _ := range n.clients {
log.Info().Msg("client") if cl.Send(message) {
cl.Send(message) // we already hold the lock, and the channel is closed anyway
delete(n.clients, cl)
}
} }
} }

View file

@ -39,8 +39,15 @@ type wsConn struct {
out chan *pb.Message out chan *pb.Message
} }
func (w *wsConn) Send(msg *pb.Message) { func (w *wsConn) Send(msg *pb.Message) (closed bool) {
w.out <- msg select {
case w.out <- msg:
default:
close(w.out)
return true
}
return false
} }
func newWsConn(c *websocket.Conn) *wsConn { func newWsConn(c *websocket.Conn) *wsConn {
@ -79,7 +86,7 @@ func (conn *wsConn) readPump(reg Registry, c Client) {
defer func() { defer func() {
reg.Unregister(c) reg.Unregister(c)
conn.Close() conn.Close()
log.Info().Msg("readpump exiting") conn.CloseCh()
}() }()
conn.SetReadLimit(maxMessageSize) conn.SetReadLimit(maxMessageSize)
@ -107,7 +114,6 @@ func (conn *wsConn) writePump() {
defer func() { defer func() {
pingTicker.Stop() pingTicker.Stop()
conn.Close() conn.Close()
log.Info().Msg("writepump exiting")
}() }()
for { for {