diff --git a/pkg/gordio/auth/jwt.go b/pkg/gordio/auth/jwt.go index 306cbe8..c5c86f3 100644 --- a/pkg/gordio/auth/jwt.go +++ b/pkg/gordio/auth/jwt.go @@ -95,7 +95,7 @@ func (a *authenticator) PublicRoutes(r chi.Router) { func (a *authenticator) allowInsecureCookie(r *http.Request) bool { v, has := a.cfg.AllowInsecure[r.Host] - return has && v == true + return has && v } func (a *authenticator) routeAuth(w http.ResponseWriter, r *http.Request) { @@ -119,7 +119,7 @@ func (a *authenticator) routeAuth(w http.ResponseWriter, r *http.Request) { Name: "jwt", Value: tok, HttpOnly: true, - Secure: a.allowInsecureCookie(r), + Secure: !a.allowInsecureCookie(r), Domain: a.cfg.Domain, }) diff --git a/pkg/gordio/nexus/client.go b/pkg/gordio/nexus/client.go index 2d899cc..177d4ef 100644 --- a/pkg/gordio/nexus/client.go +++ b/pkg/gordio/nexus/client.go @@ -30,6 +30,7 @@ type client struct { type Connection interface { io.Closer + CloseCh() Send(*pb.Message) } diff --git a/pkg/gordio/nexus/nexus.go b/pkg/gordio/nexus/nexus.go index 501ad2a..294c1fc 100644 --- a/pkg/gordio/nexus/nexus.go +++ b/pkg/gordio/nexus/nexus.go @@ -5,6 +5,8 @@ import ( "dynatron.me/x/stillbox/pkg/gordio/calls" "dynatron.me/x/stillbox/pkg/pb" + + "github.com/rs/zerolog/log" ) type Nexus struct { @@ -26,7 +28,7 @@ type Registry interface { func New() *Nexus { n := &Nexus{ clients: make(map[*client]struct{}), - callCh: make(chan *calls.Call, 256), + callCh: make(chan *calls.Call), } n.wsManager = newWsManager(n) @@ -34,9 +36,7 @@ func New() *Nexus { return n } -func (n *Nexus) Go(wg *sync.WaitGroup) { - defer wg.Done() - +func (n *Nexus) Go(done <-chan struct{}) { for { select { case call, ok := <-n.callCh: @@ -44,12 +44,19 @@ func (n *Nexus) Go(wg *sync.WaitGroup) { return } - go n.emitCall(call) + go n.broadcastCallToClients(call) + case <-done: + return } } } -func (n *Nexus) emitCall(call *calls.Call) { +func (n *Nexus) BroadcastCall(call *calls.Call) { + n.callCh <- call +} + +func (n *Nexus) broadcastCallToClients(call *calls.Call) { + log.Info().Msg("broadcast") message := &pb.Message{ ToClientMessage: &pb.Message_Call{Call: call.ToPB()}, } @@ -57,6 +64,7 @@ func (n *Nexus) emitCall(call *calls.Call) { defer n.RUnlock() for cl, _ := range n.clients { + log.Info().Msg("client") cl.Send(message) } } @@ -73,5 +81,6 @@ func (n *Nexus) Unregister(c Client) { defer n.Unlock() cl := c.(*client) + cl.Connection.CloseCh() delete(n.clients, cl) } diff --git a/pkg/gordio/nexus/websocket.go b/pkg/gordio/nexus/websocket.go index 55f520e..cd0ba6d 100644 --- a/pkg/gordio/nexus/websocket.go +++ b/pkg/gordio/nexus/websocket.go @@ -46,7 +46,7 @@ func (w *wsConn) Send(msg *pb.Message) { func newWsConn(c *websocket.Conn) *wsConn { return &wsConn{ Conn: c, - out: make(chan *pb.Message, qSize), + out: make(chan *pb.Message), } } @@ -55,6 +55,10 @@ var upgrader = websocket.Upgrader{ WriteBufferSize: 1024, } +func (w *wsConn) CloseCh() { + close(w.out) +} + func (wm *wsManager) serveWS(w http.ResponseWriter, r *http.Request) { conn, err := upgrader.Upgrade(w, r, nil) if err != nil { @@ -75,6 +79,7 @@ func (conn *wsConn) readPump(reg Registry, c Client) { defer func() { reg.Unregister(c) conn.Close() + log.Info().Msg("readpump exiting") }() conn.SetReadLimit(maxMessageSize) @@ -102,6 +107,7 @@ func (conn *wsConn) writePump() { defer func() { pingTicker.Stop() conn.Close() + log.Info().Msg("writepump exiting") }() for { diff --git a/pkg/gordio/server/server.go b/pkg/gordio/server/server.go index ed95bdf..e6b394c 100644 --- a/pkg/gordio/server/server.go +++ b/pkg/gordio/server/server.go @@ -39,7 +39,7 @@ func New(cfg *config.Config) (*Server, error) { nex: nexus.New(), } - srv.sinks.Register("database", sinks.NewDatabaseSink()) + srv.sinks.Register("database", sinks.NewDatabaseSink(srv.db)) srv.sinks.Register("nexus", sinks.NewNexusSink(srv.nex)) srv.sources.Register("rdio-http", sources.NewRdioHTTP(authenticator, srv)) @@ -54,6 +54,12 @@ func New(cfg *config.Config) (*Server, error) { func (s *Server) Go() error { defer s.db.Close() + done := make(chan struct{}) + + defer func() { + close(done) + }() + go s.nex.Go(done) http.ListenAndServe(s.conf.Listen, s.r) return nil diff --git a/pkg/gordio/sinks/database.go b/pkg/gordio/sinks/database.go index 85e8565..5cf77bc 100644 --- a/pkg/gordio/sinks/database.go +++ b/pkg/gordio/sinks/database.go @@ -12,16 +12,15 @@ import ( ) type DatabaseSink struct { + db *database.DB } -func NewDatabaseSink() *DatabaseSink { - return &DatabaseSink{} +func NewDatabaseSink(db *database.DB) *DatabaseSink { + return &DatabaseSink{db: db} } func (s *DatabaseSink) Call(ctx context.Context, call *calls.Call) error { - db := database.FromCtx(ctx) - - dbCall, err := db.AddCall(ctx, s.toAddCallParams(call)) + dbCall, err := s.db.AddCall(ctx, s.toAddCallParams(call)) if err != nil { return fmt.Errorf("add call: %w", err) } diff --git a/pkg/gordio/sinks/nexus.go b/pkg/gordio/sinks/nexus.go index e7f3a55..789eabd 100644 --- a/pkg/gordio/sinks/nexus.go +++ b/pkg/gordio/sinks/nexus.go @@ -24,5 +24,6 @@ func (ns *NexusSink) SinkType() string { } func (ns *NexusSink) Call(ctx context.Context, call *calls.Call) error { + ns.nexus.BroadcastCall(call) return nil } diff --git a/pkg/gordio/sinks/sinks.go b/pkg/gordio/sinks/sinks.go index fb36536..fc78dd4 100644 --- a/pkg/gordio/sinks/sinks.go +++ b/pkg/gordio/sinks/sinks.go @@ -2,6 +2,7 @@ package sinks import ( "context" + "dynatron.me/x/stillbox/pkg/gordio/calls" "github.com/rs/zerolog/log" @@ -27,8 +28,8 @@ func (s *Sinks) Register(name string, toAdd Sink) { } func (s *Sinks) EmitCall(ctx context.Context, call *calls.Call) { - for _, sink := range *s { - go sink.emitCallLogErr(ctx, call) + for i := range *s { + go (*s)[i].emitCallLogErr(ctx, call) } }