This commit is contained in:
Daniel 2024-08-04 10:56:46 -04:00
parent 59fb6e047d
commit bfd6a8545b
8 changed files with 40 additions and 17 deletions

View file

@ -95,7 +95,7 @@ func (a *authenticator) PublicRoutes(r chi.Router) {
func (a *authenticator) allowInsecureCookie(r *http.Request) bool { func (a *authenticator) allowInsecureCookie(r *http.Request) bool {
v, has := a.cfg.AllowInsecure[r.Host] v, has := a.cfg.AllowInsecure[r.Host]
return has && v == true return has && v
} }
func (a *authenticator) routeAuth(w http.ResponseWriter, r *http.Request) { func (a *authenticator) routeAuth(w http.ResponseWriter, r *http.Request) {
@ -119,7 +119,7 @@ func (a *authenticator) routeAuth(w http.ResponseWriter, r *http.Request) {
Name: "jwt", Name: "jwt",
Value: tok, Value: tok,
HttpOnly: true, HttpOnly: true,
Secure: a.allowInsecureCookie(r), Secure: !a.allowInsecureCookie(r),
Domain: a.cfg.Domain, Domain: a.cfg.Domain,
}) })

View file

@ -30,6 +30,7 @@ type client struct {
type Connection interface { type Connection interface {
io.Closer io.Closer
CloseCh()
Send(*pb.Message) Send(*pb.Message)
} }

View file

@ -5,6 +5,8 @@ import (
"dynatron.me/x/stillbox/pkg/gordio/calls" "dynatron.me/x/stillbox/pkg/gordio/calls"
"dynatron.me/x/stillbox/pkg/pb" "dynatron.me/x/stillbox/pkg/pb"
"github.com/rs/zerolog/log"
) )
type Nexus struct { type Nexus struct {
@ -26,7 +28,7 @@ type Registry interface {
func New() *Nexus { func New() *Nexus {
n := &Nexus{ n := &Nexus{
clients: make(map[*client]struct{}), clients: make(map[*client]struct{}),
callCh: make(chan *calls.Call, 256), callCh: make(chan *calls.Call),
} }
n.wsManager = newWsManager(n) n.wsManager = newWsManager(n)
@ -34,9 +36,7 @@ func New() *Nexus {
return n return n
} }
func (n *Nexus) Go(wg *sync.WaitGroup) { func (n *Nexus) Go(done <-chan struct{}) {
defer wg.Done()
for { for {
select { select {
case call, ok := <-n.callCh: case call, ok := <-n.callCh:
@ -44,12 +44,19 @@ func (n *Nexus) Go(wg *sync.WaitGroup) {
return return
} }
go n.emitCall(call) go n.broadcastCallToClients(call)
case <-done:
return
} }
} }
} }
func (n *Nexus) emitCall(call *calls.Call) { func (n *Nexus) BroadcastCall(call *calls.Call) {
n.callCh <- call
}
func (n *Nexus) broadcastCallToClients(call *calls.Call) {
log.Info().Msg("broadcast")
message := &pb.Message{ message := &pb.Message{
ToClientMessage: &pb.Message_Call{Call: call.ToPB()}, ToClientMessage: &pb.Message_Call{Call: call.ToPB()},
} }
@ -57,6 +64,7 @@ func (n *Nexus) emitCall(call *calls.Call) {
defer n.RUnlock() defer n.RUnlock()
for cl, _ := range n.clients { for cl, _ := range n.clients {
log.Info().Msg("client")
cl.Send(message) cl.Send(message)
} }
} }
@ -73,5 +81,6 @@ func (n *Nexus) Unregister(c Client) {
defer n.Unlock() defer n.Unlock()
cl := c.(*client) cl := c.(*client)
cl.Connection.CloseCh()
delete(n.clients, cl) delete(n.clients, cl)
} }

View file

@ -46,7 +46,7 @@ func (w *wsConn) Send(msg *pb.Message) {
func newWsConn(c *websocket.Conn) *wsConn { func newWsConn(c *websocket.Conn) *wsConn {
return &wsConn{ return &wsConn{
Conn: c, Conn: c,
out: make(chan *pb.Message, qSize), out: make(chan *pb.Message),
} }
} }
@ -55,6 +55,10 @@ var upgrader = websocket.Upgrader{
WriteBufferSize: 1024, WriteBufferSize: 1024,
} }
func (w *wsConn) CloseCh() {
close(w.out)
}
func (wm *wsManager) serveWS(w http.ResponseWriter, r *http.Request) { func (wm *wsManager) serveWS(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil) conn, err := upgrader.Upgrade(w, r, nil)
if err != nil { if err != nil {
@ -75,6 +79,7 @@ func (conn *wsConn) readPump(reg Registry, c Client) {
defer func() { defer func() {
reg.Unregister(c) reg.Unregister(c)
conn.Close() conn.Close()
log.Info().Msg("readpump exiting")
}() }()
conn.SetReadLimit(maxMessageSize) conn.SetReadLimit(maxMessageSize)
@ -102,6 +107,7 @@ func (conn *wsConn) writePump() {
defer func() { defer func() {
pingTicker.Stop() pingTicker.Stop()
conn.Close() conn.Close()
log.Info().Msg("writepump exiting")
}() }()
for { for {

View file

@ -39,7 +39,7 @@ func New(cfg *config.Config) (*Server, error) {
nex: nexus.New(), nex: nexus.New(),
} }
srv.sinks.Register("database", sinks.NewDatabaseSink()) srv.sinks.Register("database", sinks.NewDatabaseSink(srv.db))
srv.sinks.Register("nexus", sinks.NewNexusSink(srv.nex)) srv.sinks.Register("nexus", sinks.NewNexusSink(srv.nex))
srv.sources.Register("rdio-http", sources.NewRdioHTTP(authenticator, srv)) srv.sources.Register("rdio-http", sources.NewRdioHTTP(authenticator, srv))
@ -54,6 +54,12 @@ func New(cfg *config.Config) (*Server, error) {
func (s *Server) Go() error { func (s *Server) Go() error {
defer s.db.Close() defer s.db.Close()
done := make(chan struct{})
defer func() {
close(done)
}()
go s.nex.Go(done)
http.ListenAndServe(s.conf.Listen, s.r) http.ListenAndServe(s.conf.Listen, s.r)
return nil return nil

View file

@ -12,16 +12,15 @@ import (
) )
type DatabaseSink struct { type DatabaseSink struct {
db *database.DB
} }
func NewDatabaseSink() *DatabaseSink { func NewDatabaseSink(db *database.DB) *DatabaseSink {
return &DatabaseSink{} return &DatabaseSink{db: db}
} }
func (s *DatabaseSink) Call(ctx context.Context, call *calls.Call) error { func (s *DatabaseSink) Call(ctx context.Context, call *calls.Call) error {
db := database.FromCtx(ctx) dbCall, err := s.db.AddCall(ctx, s.toAddCallParams(call))
dbCall, err := db.AddCall(ctx, s.toAddCallParams(call))
if err != nil { if err != nil {
return fmt.Errorf("add call: %w", err) return fmt.Errorf("add call: %w", err)
} }

View file

@ -24,5 +24,6 @@ func (ns *NexusSink) SinkType() string {
} }
func (ns *NexusSink) Call(ctx context.Context, call *calls.Call) error { func (ns *NexusSink) Call(ctx context.Context, call *calls.Call) error {
ns.nexus.BroadcastCall(call)
return nil return nil
} }

View file

@ -2,6 +2,7 @@ package sinks
import ( import (
"context" "context"
"dynatron.me/x/stillbox/pkg/gordio/calls" "dynatron.me/x/stillbox/pkg/gordio/calls"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
@ -27,8 +28,8 @@ func (s *Sinks) Register(name string, toAdd Sink) {
} }
func (s *Sinks) EmitCall(ctx context.Context, call *calls.Call) { func (s *Sinks) EmitCall(ctx context.Context, call *calls.Call) {
for _, sink := range *s { for i := range *s {
go sink.emitCallLogErr(ctx, call) go (*s)[i].emitCallLogErr(ctx, call)
} }
} }