From b2b06a062bc736219b6dd7cf4318cfc6cd58589c Mon Sep 17 00:00:00 2001 From: Ryan Hamamura <58859899+ryanhamamura@users.noreply.github.com> Date: Tue, 3 Mar 2026 11:57:58 -1000 Subject: [PATCH] fix: align SSE architecture with portigo for reliable connections - Reorder HandleGameEvents to create NATS subscriptions before SSE - Use chi's middleware.NewWrapResponseWriter for proper http.Flusher support - Add slog-zerolog adapter for unified logging - Add ErrorLog to HTTP server for better error visibility - Change session Cookie.Secure to false for HTTP support - Change heartbeat from 15s to 10s - Remove ConnectionIndicator patching (was causing PatchElementsNoTargetsFound) The key fix was using chi's response writer wrapper which properly implements http.Flusher, allowing SSE data to be flushed immediately instead of being buffered. --- features/c4game/handlers.go | 87 +++++++++++++++++--------------- features/c4game/pages/game.templ | 1 - go.mod | 3 ++ go.sum | 6 +++ logging/middleware.go | 21 +++----- main.go | 22 +++++--- sessions/sessions.go | 2 +- 7 files changed, 79 insertions(+), 63 deletions(-) diff --git a/features/c4game/handlers.go b/features/c4game/handlers.go index 7b153d9..3fa7565 100644 --- a/features/c4game/handlers.go +++ b/features/c4game/handlers.go @@ -16,7 +16,6 @@ import ( "github.com/ryanhamamura/games/connect4" "github.com/ryanhamamura/games/db/repository" "github.com/ryanhamamura/games/features/c4game/pages" - sharedcomponents "github.com/ryanhamamura/games/features/common/components" "github.com/ryanhamamura/games/sessions" ) @@ -95,6 +94,7 @@ func HandleGamePage(store *connect4.Store, sm *scs.SessionManager, queries *repo func HandleGameEvents(store *connect4.Store, nc *nats.Conn, sm *scs.SessionManager, queries *repository.Queries) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() gameID := chi.URLParam(r, "id") gi, exists := store.Get(gameID) @@ -104,68 +104,75 @@ func HandleGameEvents(store *connect4.Store, nc *nats.Conn, sm *scs.SessionManag } playerID := sessions.GetPlayerID(sm, r) - myColor := gi.GetPlayerColor(playerID) - sse := datastar.NewSSE(w, r, datastar.WithCompression( - datastar.WithBrotli(datastar.WithBrotliLevel(5)), - )) - - chatCfg := c4ChatConfig(gameID) - room := chat.NewPersistentRoom(nc, connect4.ChatSubject(gameID), queries, gameID) - - patchAll := func() error { - myColor = gi.GetPlayerColor(playerID) - g := gi.GetGame() - return sse.PatchElementTempl(pages.GameContent(g, myColor, room.Messages(), chatCfg)) - } - - sendPing := func() error { - return sse.PatchElementTempl(sharedcomponents.ConnectionIndicator(time.Now().UnixMilli())) - } - - // Send initial render and ping - if err := sendPing(); err != nil { - return - } - if err := patchAll(); err != nil { - return - } - - heartbeat := time.NewTicker(15 * time.Second) - defer heartbeat.Stop() - - // Subscribe to game state updates + // Subscribe to game state updates BEFORE creating SSE gameCh := make(chan *nats.Msg, 64) gameSub, err := nc.ChanSubscribe(connect4.GameSubject(gameID), gameCh) if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) return } defer gameSub.Unsubscribe() //nolint:errcheck - // Subscribe to chat messages + // Subscribe to chat messages BEFORE creating SSE + chatCfg := c4ChatConfig(gameID) + room := chat.NewPersistentRoom(nc, connect4.ChatSubject(gameID), queries, gameID) chatCh, cleanupChat := room.Subscribe() defer cleanupChat() - ctx := r.Context() + // Setup heartbeat BEFORE creating SSE + heartbeat := time.NewTicker(10 * time.Second) + defer heartbeat.Stop() + + // NOW create SSE + sse := datastar.NewSSE(w, r, datastar.WithCompression( + datastar.WithBrotli(datastar.WithBrotliLevel(5)), + )) + + // Define patch function + patchAll := func() error { + myColor := gi.GetPlayerColor(playerID) + g := gi.GetGame() + return sse.PatchElementTempl(pages.GameContent(g, myColor, room.Messages(), chatCfg)) + } + + // Send initial state + if err := patchAll(); err != nil { + return + } + + // Event loop for { select { case <-ctx.Done(): return - case <-heartbeat.C: - if err := sendPing(); err != nil { - return - } + case <-gameCh: + // Drain rapid-fire notifications + drainGame: + for { + select { + case <-gameCh: + default: + break drainGame + } + } if err := patchAll(); err != nil { return } + case chatMsg := <-chatCh: - err := sse.PatchElementTempl( + if err := sse.PatchElementTempl( chatcomponents.ChatMessage(chatMsg, chatCfg), datastar.WithSelectorID("c4-chat-history"), datastar.WithModeAppend(), - ) - if err != nil { + ); err != nil { + return + } + + case <-heartbeat.C: + // Heartbeat just keeps the connection alive by triggering a game state refresh + if err := patchAll(); err != nil { return } } diff --git a/features/c4game/pages/game.templ b/features/c4game/pages/game.templ index 455c91d..c5d9f11 100644 --- a/features/c4game/pages/game.templ +++ b/features/c4game/pages/game.templ @@ -18,7 +18,6 @@ templ GamePage(g *connect4.Game, myColor int, messages []chat.Message, chatCfg c data-signals="{chatMsg: ''}" data-init={ fmt.Sprintf("@get('/games/%s/events',{requestCancellation:'disabled'})", g.ID) } > - @sharedcomponents.ConnectionIndicator(0) @GameContent(g, myColor, messages, chatCfg) } diff --git a/go.mod b/go.mod index 069eb86..ad1794a 100644 --- a/go.mod +++ b/go.mod @@ -170,6 +170,9 @@ require ( github.com/rivo/uniseg v0.4.7 // indirect github.com/riza-io/grpc-go v0.2.0 // indirect github.com/sajari/fuzzy v1.0.0 // indirect + github.com/samber/lo v1.52.0 // indirect + github.com/samber/slog-common v0.20.0 // indirect + github.com/samber/slog-zerolog/v2 v2.9.1 // indirect github.com/segmentio/asm v1.2.1 // indirect github.com/sethvargo/go-retry v0.3.0 // indirect github.com/shopspring/decimal v1.4.0 // indirect diff --git a/go.sum b/go.sum index df2ccbd..150f2e4 100644 --- a/go.sum +++ b/go.sum @@ -565,6 +565,12 @@ github.com/rs/zerolog v1.34.0/go.mod h1:bJsvje4Z08ROH4Nhs5iH600c3IkWhwp44iRc54W6 github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/sajari/fuzzy v1.0.0 h1:+FmwVvJErsd0d0hAPlj4CxqxUtQY/fOoY0DwX4ykpRY= github.com/sajari/fuzzy v1.0.0/go.mod h1:OjYR6KxoWOe9+dOlXeiCJd4dIbED4Oo8wpS89o0pwOo= +github.com/samber/lo v1.52.0 h1:Rvi+3BFHES3A8meP33VPAxiBZX/Aws5RxrschYGjomw= +github.com/samber/lo v1.52.0/go.mod h1:4+MXEGsJzbKGaUEQFKBq2xtfuznW9oz/WrgyzMzRoM0= +github.com/samber/slog-common v0.20.0 h1:WaLnm/aCvBJSk5nR5aXZTFBaV0B47A+AEaEOiZDeUnc= +github.com/samber/slog-common v0.20.0/go.mod h1:+Ozat1jgnnE59UAlmNX1IF3IByHsODnnwf9jUcBZ+m8= +github.com/samber/slog-zerolog/v2 v2.9.1 h1:RMOq8XqzfuGx1X0TEIlS9OXbbFmqLY2/wJppghz66YY= +github.com/samber/slog-zerolog/v2 v2.9.1/go.mod h1:DQYYve14WgCRN/XnKeHl4266jXK0DgYkYXkfZ4Fp98k= github.com/sebdah/goldie/v2 v2.8.0 h1:dZb9wR8q5++oplmEiJT+U/5KyotVD+HNGCAc5gNr8rc= github.com/sebdah/goldie/v2 v2.8.0/go.mod h1:oZ9fp0+se1eapSRjfYbsV/0Hqhbuu3bJVvKI/NNtssI= github.com/segmentio/asm v1.2.1 h1:DTNbBqs57ioxAD4PrArqftgypG4/qNpXoJx8TVXxPR0= diff --git a/logging/middleware.go b/logging/middleware.go index be6c21a..7cdc9e8 100644 --- a/logging/middleware.go +++ b/logging/middleware.go @@ -5,10 +5,11 @@ import ( "net/http" "time" - "github.com/ryanhamamura/games/config" - + "github.com/go-chi/chi/v5/middleware" "github.com/rs/zerolog" "github.com/rs/zerolog/log" + + "github.com/ryanhamamura/games/config" ) const ( @@ -64,25 +65,15 @@ func colorLatency(d time.Duration, useColor bool) string { } } -type responseWriter struct { - http.ResponseWriter - status int -} - -func (rw *responseWriter) WriteHeader(code int) { - rw.status = code - rw.ResponseWriter.WriteHeader(code) -} - func RequestLogger(logger *zerolog.Logger, env config.Environment) func(http.Handler) http.Handler { return func(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { start := time.Now() - rw := &responseWriter{ResponseWriter: w} + ww := middleware.NewWrapResponseWriter(w, r.ProtoMajor) - next.ServeHTTP(rw, r) + next.ServeHTTP(ww, r) - status := rw.status + status := ww.Status() if status == 0 { status = http.StatusOK } diff --git a/main.go b/main.go index 9170eb1..34cde03 100644 --- a/main.go +++ b/main.go @@ -11,6 +11,12 @@ import ( "syscall" "time" + "github.com/go-chi/chi/v5" + "github.com/go-chi/chi/v5/middleware" + "github.com/rs/zerolog/log" + slogzerolog "github.com/samber/slog-zerolog/v2" + "golang.org/x/sync/errgroup" + "github.com/ryanhamamura/games/config" "github.com/ryanhamamura/games/connect4" "github.com/ryanhamamura/games/db" @@ -21,11 +27,6 @@ import ( "github.com/ryanhamamura/games/sessions" "github.com/ryanhamamura/games/snake" "github.com/ryanhamamura/games/version" - - "github.com/go-chi/chi/v5" - "github.com/go-chi/chi/v5/middleware" - "github.com/rs/zerolog/log" - "golang.org/x/sync/errgroup" ) //go:embed assets @@ -36,7 +37,12 @@ func main() { defer cancel() cfg := config.Global - logging.SetupLogger(cfg.Environment, cfg.LogLevel) + zerologLogger := logging.SetupLogger(cfg.Environment, cfg.LogLevel) + slog.SetDefault(slog.New(slogzerolog.Option{ + Level: slogzerolog.ZeroLogLeveler{Logger: zerologLogger}, + Logger: zerologLogger, + NoTimestamp: true, + }.NewZerologHandler())) if err := run(ctx); err != nil && err != http.ErrServerClosed { log.Fatal().Err(err).Msg("server error") @@ -101,6 +107,10 @@ func run(ctx context.Context) error { BaseContext: func(l net.Listener) context.Context { return egctx }, + ErrorLog: slog.NewLogLogger( + slog.Default().Handler(), + slog.LevelError, + ), } eg.Go(func() error { diff --git a/sessions/sessions.go b/sessions/sessions.go index 3880d47..fe8e0e0 100644 --- a/sessions/sessions.go +++ b/sessions/sessions.go @@ -33,7 +33,7 @@ func SetupSessionManager(db *sql.DB) (*scs.SessionManager, func()) { sessionManager.Cookie.Name = "games_session" sessionManager.Cookie.Path = "/" sessionManager.Cookie.HttpOnly = true - sessionManager.Cookie.Secure = true + sessionManager.Cookie.Secure = false sessionManager.Cookie.SameSite = http.SameSiteLaxMode slog.Info("session manager configured")