stillbox/pkg/gordio/nexus/websocket.go

179 lines
3.5 KiB
Go
Raw Normal View History

2024-08-04 00:55:28 -04:00
package nexus
import (
	"io"
	"net/http"
	"sync"
	"time"

	"dynatron.me/x/stillbox/pkg/pb"
	"github.com/go-chi/chi/v5"
	"github.com/gorilla/websocket"
	"github.com/rs/zerolog/log"
	"google.golang.org/protobuf/proto"
)
// Tunables for websocket connections.
const (
	// maxMessageSize is the read limit applied to each connection (10 MiB).
	maxMessageSize = 10 << 20

	// pongWait is how far the read deadline is pushed out when the
	// connection is set up and every time a pong arrives.
	pongWait = 60 * time.Second

	// pingInterval is the period between pings; at 90% of pongWait a ping
	// always goes out before the read deadline can lapse.
	pingInterval = pongWait * 9 / 10

	// writeWait is the deadline given to each individual write.
	writeWait = 10 * time.Second

	// qSize is the capacity, in messages, of a connection's outbound queue.
	qSize = 1 << 8
)
// wsManager serves websocket connections. It embeds Registry, so the
// registry's methods (such as NewClient and Register, used by serveWS) are
// callable directly on the manager.
type wsManager struct {
	Registry
}
// newWsManager returns a wsManager whose embedded Registry is r.
func newWsManager(r Registry) *wsManager {
	wm := new(wsManager)
	wm.Registry = r
	return wm
}
type wsConn struct {
*websocket.Conn
out chan *pb.Message
}
2024-08-04 14:55:15 -04:00
func (w *wsConn) Send(msg *pb.Message) (closed bool) {
select {
case w.out <- msg:
default:
2024-08-05 10:24:24 -04:00
log.Debug().Str("conn", w.RemoteAddr().String()).Msg("send channel not ready, closing")
2024-08-04 14:55:15 -04:00
close(w.out)
return true
}
return false
2024-08-04 00:55:28 -04:00
}
func newWsConn(c *websocket.Conn) *wsConn {
2024-08-04 20:39:03 -04:00
wc := &wsConn{
2024-08-04 00:55:28 -04:00
Conn: c,
2024-08-04 20:39:03 -04:00
out: make(chan *pb.Message, qSize),
2024-08-04 00:55:28 -04:00
}
2024-08-04 20:39:03 -04:00
return wc
2024-08-04 00:55:28 -04:00
}
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
}
2024-08-04 10:56:46 -04:00
func (w *wsConn) CloseCh() {
close(w.out)
}
2024-08-04 00:55:28 -04:00
func (wm *wsManager) serveWS(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Error().Err(err).Msg("upgrade failed")
http.Error(w, "upgrade failed", http.StatusInternalServerError)
return
}
2024-08-04 20:39:03 -04:00
wsc := newWsConn(conn)
cli := wm.NewClient(wsc)
2024-08-04 00:55:28 -04:00
wm.Register(cli)
go wsc.readPump(wm, cli)
go wsc.writePump()
}
func (conn *wsConn) readPump(reg Registry, c Client) {
defer func() {
reg.Unregister(c)
conn.Close()
2024-08-04 14:55:15 -04:00
conn.CloseCh()
2024-08-04 00:55:28 -04:00
}()
conn.SetReadLimit(maxMessageSize)
conn.SetReadDeadline(time.Now().Add(pongWait))
conn.SetPongHandler(func(string) error {
conn.SetReadDeadline(time.Now().Add(pongWait))
return nil
})
for {
_, message, err := conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
2024-08-05 10:24:24 -04:00
log.Debug().Err(err).Str("conn", conn.RemoteAddr().String()).Msg("unexpected close")
2024-08-04 07:39:52 -04:00
return
2024-08-04 00:55:28 -04:00
}
2024-08-05 10:24:24 -04:00
log.Debug().Err(err).Str("conn", conn.RemoteAddr().String()).Msg("closing connection")
2024-08-04 00:55:28 -04:00
break
}
go c.HandleMessage(message)
}
}
func (conn *wsConn) writePump() {
pingTicker := time.NewTicker(pingInterval)
defer func() {
pingTicker.Stop()
conn.Close()
}()
for {
select {
case msg, ok := <-conn.out:
conn.SetWriteDeadline(time.Now().Add(writeWait))
if !ok {
2024-08-04 08:41:35 -04:00
// nexus closed us
2024-08-04 00:55:28 -04:00
conn.WriteMessage(websocket.CloseMessage, []byte{})
return
}
w, err := conn.NextWriter(websocket.BinaryMessage)
if err != nil {
2024-08-05 10:24:24 -04:00
log.Debug().Err(err).Str("conn", conn.RemoteAddr().String()).Msg("nextWriter error")
2024-08-04 00:55:28 -04:00
return
}
conn.writeMessage(w, msg)
if err := w.Close(); err != nil {
2024-08-05 10:24:24 -04:00
log.Debug().Err(err).Str("conn", conn.RemoteAddr().String()).Msg("close error")
2024-08-04 00:55:28 -04:00
return
}
case <-pingTicker.C:
conn.SetWriteDeadline(time.Now().Add(writeWait))
if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil {
2024-08-05 10:24:24 -04:00
log.Debug().Err(err).Msg("x ping failed")
2024-08-04 00:55:28 -04:00
return
}
}
}
}
// writeMessage marshals msg and writes it to w as the payload of the current
// frame. It does not close w; the caller does.
//
// Exactly one message is written per frame. A serialized protobuf message
// carries no length prefix or terminator, so packing several of them
// back-to-back into one frame would make the receiver parse them as a single
// merged message. Messages still waiting in the queue are sent as their own
// frames by writePump's loop.
//
// A marshal failure is logged and nothing is written; a write failure is
// logged at debug level.
func (conn *wsConn) writeMessage(w io.WriteCloser, msg *pb.Message) {
	packed, err := proto.Marshal(msg)
	if err != nil {
		log.Error().Err(err).Msg("pack message")
		return
	}

	if _, err := w.Write(packed); err != nil {
		log.Debug().Err(err).Str("conn", conn.RemoteAddr().String()).Msg("write message")
	}
}
2024-08-04 07:39:52 -04:00
func (n *wsManager) PrivateRoutes(r chi.Router) {
2024-08-04 00:55:28 -04:00
r.HandleFunc("/ws", n.serveWS)
}