diff --git a/pkg/gordio/gordio.go b/pkg/gordio/gordio.go index de7837d..528d9b8 100644 --- a/pkg/gordio/gordio.go +++ b/pkg/gordio/gordio.go @@ -1,6 +1,10 @@ package gordio import ( + "context" + "os" + "os/signal" + "dynatron.me/x/stillbox/internal/common" "dynatron.me/x/stillbox/pkg/gordio/config" "dynatron.me/x/stillbox/pkg/gordio/server" @@ -37,15 +41,27 @@ func (o *ServeOptions) Options(_ *cobra.Command, args []string) error { } func (o *ServeOptions) Execute() error { + ctx, cancel := context.WithCancel(context.Background()) + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt) + defer func() { + signal.Stop(sig) + cancel() + }() + + go func() { + select { + case <-sig: + cancel() + case <-ctx.Done(): + } + }() + srv, err := server.New(o.cfg) if err != nil { return err } - err = srv.Go() - if err != nil { - return err - } - - return nil + return srv.Go(ctx) } diff --git a/pkg/gordio/nexus/client.go b/pkg/gordio/nexus/client.go index 27026a0..475bb9c 100644 --- a/pkg/gordio/nexus/client.go +++ b/pkg/gordio/nexus/client.go @@ -45,6 +45,8 @@ type Connection interface { io.Closer CloseCh() + Shutdown() + Send(ToClient) (closed bool) } diff --git a/pkg/gordio/nexus/nexus.go b/pkg/gordio/nexus/nexus.go index 134ffe2..99462c6 100644 --- a/pkg/gordio/nexus/nexus.go +++ b/pkg/gordio/nexus/nexus.go @@ -45,6 +45,7 @@ func (n *Nexus) Go(ctx context.Context) { n.broadcastCallToClients(ctx, call) case <-ctx.Done(): + n.Shutdown() return } } @@ -87,3 +88,9 @@ func (n *Nexus) Unregister(c Client) { cl := c.(*client) delete(n.clients, cl) } + +func (n *Nexus) Shutdown() { + for c := range n.clients { + c.Shutdown() + } +} diff --git a/pkg/gordio/nexus/websocket.go b/pkg/gordio/nexus/websocket.go index 60eaabd..bf2e982 100644 --- a/pkg/gordio/nexus/websocket.go +++ b/pkg/gordio/nexus/websocket.go @@ -6,8 +6,6 @@ import ( "net/http" "time" - "dynatron.me/x/stillbox/pkg/gordio/database" - "github.com/go-chi/chi/v5" "github.com/gorilla/websocket" "github.com/rs/zerolog/log" @@ -87,16 +85,16 @@ func (wm *wsManager) serveWS(w http.ResponseWriter, r *http.Request) { cli.Hello(ctx) } +func (conn *wsConn) Shutdown() { + conn.Conn.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseGoingAway, ""), time.Now().Add(writeWait)) +} + func (conn *wsConn) readPump(ctx context.Context, reg Registry, c Client) { defer func() { reg.Unregister(c) conn.CloseCh() }() - db := database.FromCtx(ctx) - ctx, cancel := context.WithCancel(database.CtxWithDB(context.Background(), db)) - defer cancel() - conn.SetReadLimit(maxMessageSize) err := conn.SetReadDeadline(time.Now().Add(pongWait)) if err != nil { diff --git a/pkg/gordio/server/server.go b/pkg/gordio/server/server.go index 2b7e9cb..3782eb0 100644 --- a/pkg/gordio/server/server.go +++ b/pkg/gordio/server/server.go @@ -3,6 +3,7 @@ package server import ( "context" "net/http" + "time" "dynatron.me/x/stillbox/pkg/gordio/auth" "dynatron.me/x/stillbox/pkg/gordio/config" @@ -13,8 +14,11 @@ import ( "github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5/middleware" "github.com/go-chi/cors" + "github.com/rs/zerolog/log" ) +const shutdownTimeout = 5 * time.Second + type Server struct { auth auth.Authenticator conf *config.Config @@ -68,13 +72,33 @@ func New(cfg *config.Config) (*Server, error) { return srv, nil } -func (s *Server) Go() error { +func (s *Server) Go(ctx context.Context) error { defer s.db.Close() - ctx, cancel := context.WithCancel(database.CtxWithDB(context.Background(), s.db)) - defer cancel() + ctx = database.CtxWithDB(ctx, s.db) + + httpSrv := &http.Server{ + Addr: s.conf.Listen, + Handler: s.r, + } go s.nex.Go(ctx) - return http.ListenAndServe(s.conf.Listen, s.r) + var err error + go func() { + err = httpSrv.ListenAndServe() + }() + <-ctx.Done() + + ctxShutdown, cancel := context.WithTimeout(context.Background(), shutdownTimeout) + defer cancel() + + if err := httpSrv.Shutdown(ctxShutdown); err != nil { + log.Fatal().Err(err).Msg("shutdown failed") + } + if err == http.ErrServerClosed { + err = nil + } + + return err }