stillbox/pkg/nexus/websocket.go

188 lines
3.9 KiB
Go
Raw Normal View History

2024-08-04 00:55:28 -04:00
package nexus
import (
2024-08-06 11:19:30 -04:00
"context"
2024-08-04 00:55:28 -04:00
"io"
"net/http"
"time"
"github.com/go-chi/chi/v5"
"github.com/gorilla/websocket"
"github.com/rs/zerolog/log"
"google.golang.org/protobuf/proto"
)
const (
maxMessageSize = 1024 * 1024 * 10 // 10MB
pongWait = 60 * time.Second
pingInterval = (pongWait * 9) / 10
writeWait = 10 * time.Second
qSize = 256 // 256 messages
)
type wsManager struct {
Registry
}
func newWsManager(r Registry) *wsManager {
return &wsManager{
Registry: r,
}
}
type wsConn struct {
*websocket.Conn
2024-10-19 14:14:15 -04:00
out chan ToClient
2024-08-04 00:55:28 -04:00
}
2024-10-21 22:19:31 -04:00
func (w *wsConn) Send(msg ToClient) error {
2024-08-04 14:55:15 -04:00
select {
case w.out <- msg:
default:
2024-08-05 10:24:24 -04:00
log.Debug().Str("conn", w.RemoteAddr().String()).Msg("send channel not ready, closing")
2024-10-21 22:19:31 -04:00
return ErrSentToClosed
2024-08-04 14:55:15 -04:00
}
2024-10-21 22:19:31 -04:00
return nil
2024-08-04 00:55:28 -04:00
}
func newWsConn(c *websocket.Conn) *wsConn {
2024-08-04 20:39:03 -04:00
wc := &wsConn{
2024-08-04 00:55:28 -04:00
Conn: c,
2024-10-19 14:14:15 -04:00
out: make(chan ToClient, qSize),
2024-08-04 00:55:28 -04:00
}
2024-08-04 20:39:03 -04:00
return wc
2024-08-04 00:55:28 -04:00
}
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
}
2024-08-04 10:56:46 -04:00
func (w *wsConn) CloseCh() {
close(w.out)
}
2024-08-04 00:55:28 -04:00
func (wm *wsManager) serveWS(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Error().Err(err).Msg("upgrade failed")
http.Error(w, "upgrade failed", http.StatusInternalServerError)
return
}
2024-10-20 12:26:32 -04:00
ctx := r.Context()
2024-08-04 20:39:03 -04:00
wsc := newWsConn(conn)
cli := wm.NewClient(wsc)
2024-08-04 00:55:28 -04:00
wm.Register(cli)
2024-10-20 12:26:32 -04:00
go wsc.readPump(ctx, wm, cli)
2024-08-04 00:55:28 -04:00
go wsc.writePump()
2024-10-20 12:26:32 -04:00
cli.Hello(ctx)
2024-08-04 00:55:28 -04:00
}
2024-10-21 13:49:20 -04:00
func (conn *wsConn) Shutdown() {
2024-10-31 16:50:08 -04:00
_ = conn.Conn.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseGoingAway, ""), time.Now().Add(writeWait))
2024-10-21 13:49:20 -04:00
}
2024-08-06 11:19:30 -04:00
func (conn *wsConn) readPump(ctx context.Context, reg Registry, c Client) {
2024-08-04 00:55:28 -04:00
defer func() {
reg.Unregister(c)
2024-08-04 14:55:15 -04:00
conn.CloseCh()
2024-08-04 00:55:28 -04:00
}()
conn.SetReadLimit(maxMessageSize)
2024-08-11 14:48:17 -04:00
err := conn.SetReadDeadline(time.Now().Add(pongWait))
if err != nil {
log.Error().Err(err).Msg("SetReadDeadline")
}
2024-08-04 00:55:28 -04:00
conn.SetPongHandler(func(string) error {
2024-08-11 14:48:17 -04:00
return conn.SetReadDeadline(time.Now().Add(pongWait))
2024-08-04 00:55:28 -04:00
})
for {
_, message, err := conn.ReadMessage()
if err != nil {
2024-10-20 22:10:54 -04:00
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure, websocket.CloseNormalClosure) {
2024-08-05 10:24:24 -04:00
log.Debug().Err(err).Str("conn", conn.RemoteAddr().String()).Msg("unexpected close")
2024-08-04 07:39:52 -04:00
return
2024-08-04 00:55:28 -04:00
}
2024-10-21 12:24:10 -04:00
log.Debug().Str("conn", conn.RemoteAddr().String()).Msg("closing connection")
2024-08-04 00:55:28 -04:00
break
}
2024-10-21 14:04:50 -04:00
go c.HandleMessage(context.WithoutCancel(ctx), message)
2024-08-04 00:55:28 -04:00
}
}
func (conn *wsConn) writePump() {
pingTicker := time.NewTicker(pingInterval)
defer func() {
pingTicker.Stop()
conn.Close()
}()
for {
select {
case msg, ok := <-conn.out:
2024-08-11 14:48:17 -04:00
err := conn.SetWriteDeadline(time.Now().Add(writeWait))
if err != nil {
log.Error().Err(err).Msg("SetWriteDeadline")
}
2024-10-21 12:24:10 -04:00
if !ok { // channel is closed
2024-08-04 00:55:28 -04:00
return
}
w, err := conn.NextWriter(websocket.BinaryMessage)
if err != nil {
2024-08-05 10:24:24 -04:00
log.Debug().Err(err).Str("conn", conn.RemoteAddr().String()).Msg("nextWriter error")
2024-08-04 00:55:28 -04:00
return
}
2024-10-19 14:14:15 -04:00
conn.writeToClient(w, msg)
2024-08-04 00:55:28 -04:00
if err := w.Close(); err != nil {
2024-08-05 10:24:24 -04:00
log.Debug().Err(err).Str("conn", conn.RemoteAddr().String()).Msg("close error")
2024-08-04 00:55:28 -04:00
return
}
case <-pingTicker.C:
2024-08-11 14:48:17 -04:00
err := conn.SetWriteDeadline(time.Now().Add(writeWait))
if err != nil {
log.Error().Err(err).Msg("SetWriteDeadline")
}
2024-08-04 00:55:28 -04:00
if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil {
2024-08-05 10:24:24 -04:00
log.Debug().Err(err).Msg("x ping failed")
2024-08-04 00:55:28 -04:00
return
}
}
}
}
2024-10-19 14:14:15 -04:00
func (conn *wsConn) writeToClient(w io.WriteCloser, msg ToClient) {
packWrite := func(msg ToClient) {
2024-08-04 00:55:28 -04:00
packed, err := proto.Marshal(msg)
if err != nil {
log.Error().Err(err).Msg("pack message")
return
}
2024-08-11 14:48:17 -04:00
_, _ = w.Write(packed)
2024-08-04 00:55:28 -04:00
}
packWrite(msg)
// add queued messages to current payload
nQ := len(conn.out)
for i := 0; i < nQ; i++ {
packWrite(<-conn.out)
}
}
2024-08-04 07:39:52 -04:00
func (n *wsManager) PrivateRoutes(r chi.Router) {
2024-08-04 00:55:28 -04:00
r.HandleFunc("/ws", n.serveWS)
}