2024-08-04 00:55:28 -04:00
|
|
|
package nexus
|
|
|
|
|
|
|
|
import (
|
2024-08-06 11:19:30 -04:00
|
|
|
"context"
|
2024-08-04 00:55:28 -04:00
|
|
|
"io"
|
|
|
|
"net/http"
|
|
|
|
"time"
|
|
|
|
|
2024-08-16 16:47:39 -04:00
|
|
|
"dynatron.me/x/stillbox/pkg/gordio/database"
|
2024-08-04 00:55:28 -04:00
|
|
|
|
|
|
|
"github.com/go-chi/chi/v5"
|
|
|
|
"github.com/gorilla/websocket"
|
|
|
|
"github.com/rs/zerolog/log"
|
|
|
|
"google.golang.org/protobuf/proto"
|
|
|
|
)
|
|
|
|
|
|
|
|
const (
|
|
|
|
maxMessageSize = 1024 * 1024 * 10 // 10MB
|
|
|
|
|
|
|
|
pongWait = 60 * time.Second
|
|
|
|
pingInterval = (pongWait * 9) / 10
|
|
|
|
writeWait = 10 * time.Second
|
|
|
|
|
|
|
|
qSize = 256 // 256 messages
|
|
|
|
)
|
|
|
|
|
|
|
|
type wsManager struct {
|
|
|
|
Registry
|
|
|
|
}
|
|
|
|
|
|
|
|
func newWsManager(r Registry) *wsManager {
|
|
|
|
return &wsManager{
|
|
|
|
Registry: r,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
type wsConn struct {
|
|
|
|
*websocket.Conn
|
|
|
|
|
2024-10-19 14:14:15 -04:00
|
|
|
out chan ToClient
|
2024-08-04 00:55:28 -04:00
|
|
|
}
|
|
|
|
|
2024-10-19 14:14:15 -04:00
|
|
|
func (w *wsConn) Send(msg ToClient) (closed bool) {
|
2024-08-04 14:55:15 -04:00
|
|
|
select {
|
|
|
|
case w.out <- msg:
|
|
|
|
default:
|
2024-08-05 10:24:24 -04:00
|
|
|
log.Debug().Str("conn", w.RemoteAddr().String()).Msg("send channel not ready, closing")
|
2024-08-04 14:55:15 -04:00
|
|
|
return true
|
|
|
|
}
|
|
|
|
|
|
|
|
return false
|
2024-08-04 00:55:28 -04:00
|
|
|
}
|
|
|
|
|
|
|
|
func newWsConn(c *websocket.Conn) *wsConn {
|
2024-08-04 20:39:03 -04:00
|
|
|
wc := &wsConn{
|
2024-08-04 00:55:28 -04:00
|
|
|
Conn: c,
|
2024-10-19 14:14:15 -04:00
|
|
|
out: make(chan ToClient, qSize),
|
2024-08-04 00:55:28 -04:00
|
|
|
}
|
2024-08-04 20:39:03 -04:00
|
|
|
|
|
|
|
return wc
|
2024-08-04 00:55:28 -04:00
|
|
|
}
|
|
|
|
|
|
|
|
var upgrader = websocket.Upgrader{
|
|
|
|
ReadBufferSize: 1024,
|
|
|
|
WriteBufferSize: 1024,
|
|
|
|
}
|
|
|
|
|
2024-08-04 10:56:46 -04:00
|
|
|
func (w *wsConn) CloseCh() {
|
|
|
|
close(w.out)
|
|
|
|
}
|
|
|
|
|
2024-08-04 00:55:28 -04:00
|
|
|
func (wm *wsManager) serveWS(w http.ResponseWriter, r *http.Request) {
|
|
|
|
conn, err := upgrader.Upgrade(w, r, nil)
|
|
|
|
if err != nil {
|
|
|
|
log.Error().Err(err).Msg("upgrade failed")
|
|
|
|
http.Error(w, "upgrade failed", http.StatusInternalServerError)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2024-10-20 12:26:32 -04:00
|
|
|
ctx := r.Context()
|
2024-08-04 20:39:03 -04:00
|
|
|
wsc := newWsConn(conn)
|
|
|
|
cli := wm.NewClient(wsc)
|
2024-08-04 00:55:28 -04:00
|
|
|
wm.Register(cli)
|
|
|
|
|
2024-10-20 12:26:32 -04:00
|
|
|
go wsc.readPump(ctx, wm, cli)
|
2024-08-04 00:55:28 -04:00
|
|
|
go wsc.writePump()
|
2024-10-20 12:26:32 -04:00
|
|
|
cli.Hello(ctx)
|
2024-08-04 00:55:28 -04:00
|
|
|
}
|
|
|
|
|
2024-08-06 11:19:30 -04:00
|
|
|
func (conn *wsConn) readPump(ctx context.Context, reg Registry, c Client) {
|
2024-08-04 00:55:28 -04:00
|
|
|
defer func() {
|
|
|
|
reg.Unregister(c)
|
2024-08-04 14:55:15 -04:00
|
|
|
conn.CloseCh()
|
2024-08-04 00:55:28 -04:00
|
|
|
}()
|
|
|
|
|
2024-08-16 16:47:39 -04:00
|
|
|
db := database.FromCtx(ctx)
|
|
|
|
ctx, cancel := context.WithCancel(database.CtxWithDB(context.Background(), db))
|
|
|
|
defer cancel()
|
|
|
|
|
2024-08-04 00:55:28 -04:00
|
|
|
conn.SetReadLimit(maxMessageSize)
|
2024-08-11 14:48:17 -04:00
|
|
|
err := conn.SetReadDeadline(time.Now().Add(pongWait))
|
|
|
|
if err != nil {
|
|
|
|
log.Error().Err(err).Msg("SetReadDeadline")
|
|
|
|
}
|
2024-08-04 00:55:28 -04:00
|
|
|
conn.SetPongHandler(func(string) error {
|
2024-08-11 14:48:17 -04:00
|
|
|
return conn.SetReadDeadline(time.Now().Add(pongWait))
|
2024-08-04 00:55:28 -04:00
|
|
|
})
|
|
|
|
for {
|
|
|
|
_, message, err := conn.ReadMessage()
|
|
|
|
if err != nil {
|
2024-10-20 22:10:54 -04:00
|
|
|
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure, websocket.CloseNormalClosure) {
|
2024-08-05 10:24:24 -04:00
|
|
|
log.Debug().Err(err).Str("conn", conn.RemoteAddr().String()).Msg("unexpected close")
|
2024-08-04 07:39:52 -04:00
|
|
|
return
|
2024-08-04 00:55:28 -04:00
|
|
|
}
|
|
|
|
|
2024-10-21 12:24:10 -04:00
|
|
|
log.Debug().Str("conn", conn.RemoteAddr().String()).Msg("closing connection")
|
2024-08-04 00:55:28 -04:00
|
|
|
break
|
|
|
|
}
|
|
|
|
|
2024-08-06 11:19:30 -04:00
|
|
|
go c.HandleMessage(ctx, message)
|
2024-08-04 00:55:28 -04:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (conn *wsConn) writePump() {
|
|
|
|
pingTicker := time.NewTicker(pingInterval)
|
|
|
|
defer func() {
|
|
|
|
pingTicker.Stop()
|
|
|
|
conn.Close()
|
|
|
|
}()
|
|
|
|
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case msg, ok := <-conn.out:
|
2024-08-11 14:48:17 -04:00
|
|
|
err := conn.SetWriteDeadline(time.Now().Add(writeWait))
|
|
|
|
if err != nil {
|
|
|
|
log.Error().Err(err).Msg("SetWriteDeadline")
|
|
|
|
}
|
2024-10-21 12:24:10 -04:00
|
|
|
if !ok { // channel is closed
|
2024-08-04 00:55:28 -04:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
w, err := conn.NextWriter(websocket.BinaryMessage)
|
|
|
|
if err != nil {
|
2024-08-05 10:24:24 -04:00
|
|
|
log.Debug().Err(err).Str("conn", conn.RemoteAddr().String()).Msg("nextWriter error")
|
2024-08-04 00:55:28 -04:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2024-10-19 14:14:15 -04:00
|
|
|
conn.writeToClient(w, msg)
|
2024-08-04 00:55:28 -04:00
|
|
|
|
|
|
|
if err := w.Close(); err != nil {
|
2024-08-05 10:24:24 -04:00
|
|
|
log.Debug().Err(err).Str("conn", conn.RemoteAddr().String()).Msg("close error")
|
2024-08-04 00:55:28 -04:00
|
|
|
return
|
|
|
|
}
|
|
|
|
case <-pingTicker.C:
|
2024-08-11 14:48:17 -04:00
|
|
|
err := conn.SetWriteDeadline(time.Now().Add(writeWait))
|
|
|
|
if err != nil {
|
|
|
|
log.Error().Err(err).Msg("SetWriteDeadline")
|
|
|
|
}
|
2024-08-04 00:55:28 -04:00
|
|
|
if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil {
|
2024-08-05 10:24:24 -04:00
|
|
|
log.Debug().Err(err).Msg("x ping failed")
|
2024-08-04 00:55:28 -04:00
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2024-10-19 14:14:15 -04:00
|
|
|
func (conn *wsConn) writeToClient(w io.WriteCloser, msg ToClient) {
|
|
|
|
packWrite := func(msg ToClient) {
|
2024-08-04 00:55:28 -04:00
|
|
|
packed, err := proto.Marshal(msg)
|
|
|
|
if err != nil {
|
|
|
|
log.Error().Err(err).Msg("pack message")
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2024-08-11 14:48:17 -04:00
|
|
|
_, _ = w.Write(packed)
|
2024-08-04 00:55:28 -04:00
|
|
|
}
|
|
|
|
|
|
|
|
packWrite(msg)
|
|
|
|
|
|
|
|
// add queued messages to current payload
|
|
|
|
nQ := len(conn.out)
|
|
|
|
for i := 0; i < nQ; i++ {
|
|
|
|
packWrite(<-conn.out)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2024-08-04 07:39:52 -04:00
|
|
|
func (n *wsManager) PrivateRoutes(r chi.Router) {
|
2024-08-04 00:55:28 -04:00
|
|
|
r.HandleFunc("/ws", n.serveWS)
|
|
|
|
}
|