Graceful shutdown
This commit is contained in:
parent
a5dd8639fc
commit
2cee60c040
5 changed files with 63 additions and 16 deletions
|
@ -1,6 +1,10 @@
|
||||||
package gordio
|
package gordio
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
|
"os"
|
||||||
|
"os/signal"
|
||||||
|
|
||||||
"dynatron.me/x/stillbox/internal/common"
|
"dynatron.me/x/stillbox/internal/common"
|
||||||
"dynatron.me/x/stillbox/pkg/gordio/config"
|
"dynatron.me/x/stillbox/pkg/gordio/config"
|
||||||
"dynatron.me/x/stillbox/pkg/gordio/server"
|
"dynatron.me/x/stillbox/pkg/gordio/server"
|
||||||
|
@ -37,15 +41,27 @@ func (o *ServeOptions) Options(_ *cobra.Command, args []string) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (o *ServeOptions) Execute() error {
|
func (o *ServeOptions) Execute() error {
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
|
||||||
|
sig := make(chan os.Signal, 1)
|
||||||
|
signal.Notify(sig, os.Interrupt)
|
||||||
|
defer func() {
|
||||||
|
signal.Stop(sig)
|
||||||
|
cancel()
|
||||||
|
}()
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
select {
|
||||||
|
case <-sig:
|
||||||
|
cancel()
|
||||||
|
case <-ctx.Done():
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
srv, err := server.New(o.cfg)
|
srv, err := server.New(o.cfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = srv.Go()
|
return srv.Go(ctx)
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -45,6 +45,8 @@ type Connection interface {
|
||||||
io.Closer
|
io.Closer
|
||||||
CloseCh()
|
CloseCh()
|
||||||
|
|
||||||
|
Shutdown()
|
||||||
|
|
||||||
Send(ToClient) (closed bool)
|
Send(ToClient) (closed bool)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -45,6 +45,7 @@ func (n *Nexus) Go(ctx context.Context) {
|
||||||
|
|
||||||
n.broadcastCallToClients(ctx, call)
|
n.broadcastCallToClients(ctx, call)
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
|
n.Shutdown()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -87,3 +88,9 @@ func (n *Nexus) Unregister(c Client) {
|
||||||
cl := c.(*client)
|
cl := c.(*client)
|
||||||
delete(n.clients, cl)
|
delete(n.clients, cl)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (n *Nexus) Shutdown() {
|
||||||
|
for c := range n.clients {
|
||||||
|
c.Shutdown()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -6,8 +6,6 @@ import (
|
||||||
"net/http"
|
"net/http"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"dynatron.me/x/stillbox/pkg/gordio/database"
|
|
||||||
|
|
||||||
"github.com/go-chi/chi/v5"
|
"github.com/go-chi/chi/v5"
|
||||||
"github.com/gorilla/websocket"
|
"github.com/gorilla/websocket"
|
||||||
"github.com/rs/zerolog/log"
|
"github.com/rs/zerolog/log"
|
||||||
|
@ -87,16 +85,16 @@ func (wm *wsManager) serveWS(w http.ResponseWriter, r *http.Request) {
|
||||||
cli.Hello(ctx)
|
cli.Hello(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (conn *wsConn) Shutdown() {
|
||||||
|
conn.Conn.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseGoingAway, ""), time.Now().Add(writeWait))
|
||||||
|
}
|
||||||
|
|
||||||
func (conn *wsConn) readPump(ctx context.Context, reg Registry, c Client) {
|
func (conn *wsConn) readPump(ctx context.Context, reg Registry, c Client) {
|
||||||
defer func() {
|
defer func() {
|
||||||
reg.Unregister(c)
|
reg.Unregister(c)
|
||||||
conn.CloseCh()
|
conn.CloseCh()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
db := database.FromCtx(ctx)
|
|
||||||
ctx, cancel := context.WithCancel(database.CtxWithDB(context.Background(), db))
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
conn.SetReadLimit(maxMessageSize)
|
conn.SetReadLimit(maxMessageSize)
|
||||||
err := conn.SetReadDeadline(time.Now().Add(pongWait))
|
err := conn.SetReadDeadline(time.Now().Add(pongWait))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -3,6 +3,7 @@ package server
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"time"
|
||||||
|
|
||||||
"dynatron.me/x/stillbox/pkg/gordio/auth"
|
"dynatron.me/x/stillbox/pkg/gordio/auth"
|
||||||
"dynatron.me/x/stillbox/pkg/gordio/config"
|
"dynatron.me/x/stillbox/pkg/gordio/config"
|
||||||
|
@ -13,8 +14,11 @@ import (
|
||||||
"github.com/go-chi/chi/v5"
|
"github.com/go-chi/chi/v5"
|
||||||
"github.com/go-chi/chi/v5/middleware"
|
"github.com/go-chi/chi/v5/middleware"
|
||||||
"github.com/go-chi/cors"
|
"github.com/go-chi/cors"
|
||||||
|
"github.com/rs/zerolog/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const shutdownTimeout = 5 * time.Second
|
||||||
|
|
||||||
type Server struct {
|
type Server struct {
|
||||||
auth auth.Authenticator
|
auth auth.Authenticator
|
||||||
conf *config.Config
|
conf *config.Config
|
||||||
|
@ -68,13 +72,33 @@ func New(cfg *config.Config) (*Server, error) {
|
||||||
return srv, nil
|
return srv, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) Go() error {
|
func (s *Server) Go(ctx context.Context) error {
|
||||||
defer s.db.Close()
|
defer s.db.Close()
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(database.CtxWithDB(context.Background(), s.db))
|
ctx = database.CtxWithDB(ctx, s.db)
|
||||||
defer cancel()
|
|
||||||
|
httpSrv := &http.Server{
|
||||||
|
Addr: s.conf.Listen,
|
||||||
|
Handler: s.r,
|
||||||
|
}
|
||||||
|
|
||||||
go s.nex.Go(ctx)
|
go s.nex.Go(ctx)
|
||||||
|
|
||||||
return http.ListenAndServe(s.conf.Listen, s.r)
|
var err error
|
||||||
|
go func() {
|
||||||
|
err = httpSrv.ListenAndServe()
|
||||||
|
}()
|
||||||
|
<-ctx.Done()
|
||||||
|
|
||||||
|
ctxShutdown, cancel := context.WithTimeout(context.Background(), shutdownTimeout)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
if err := httpSrv.Shutdown(ctxShutdown); err != nil {
|
||||||
|
log.Fatal().Err(err).Msg("shutdown failed")
|
||||||
|
}
|
||||||
|
if err == http.ErrServerClosed {
|
||||||
|
err = nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue