Fix SSE architecture for reliable connections #13

Merged
ryan merged 9 commits from fix/sse-architecture into main 2026-03-03 23:33:14 +00:00
7 changed files with 79 additions and 63 deletions
Showing only changes of commit b2b06a062b - Show all commits

View File

@@ -16,7 +16,6 @@ import (
"github.com/ryanhamamura/games/connect4" "github.com/ryanhamamura/games/connect4"
"github.com/ryanhamamura/games/db/repository" "github.com/ryanhamamura/games/db/repository"
"github.com/ryanhamamura/games/features/c4game/pages" "github.com/ryanhamamura/games/features/c4game/pages"
sharedcomponents "github.com/ryanhamamura/games/features/common/components"
"github.com/ryanhamamura/games/sessions" "github.com/ryanhamamura/games/sessions"
) )
@@ -95,6 +94,7 @@ func HandleGamePage(store *connect4.Store, sm *scs.SessionManager, queries *repo
func HandleGameEvents(store *connect4.Store, nc *nats.Conn, sm *scs.SessionManager, queries *repository.Queries) http.HandlerFunc { func HandleGameEvents(store *connect4.Store, nc *nats.Conn, sm *scs.SessionManager, queries *repository.Queries) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
gameID := chi.URLParam(r, "id") gameID := chi.URLParam(r, "id")
gi, exists := store.Get(gameID) gi, exists := store.Get(gameID)
@@ -104,68 +104,75 @@ func HandleGameEvents(store *connect4.Store, nc *nats.Conn, sm *scs.SessionManag
} }
playerID := sessions.GetPlayerID(sm, r) playerID := sessions.GetPlayerID(sm, r)
myColor := gi.GetPlayerColor(playerID)
sse := datastar.NewSSE(w, r, datastar.WithCompression( // Subscribe to game state updates BEFORE creating SSE
datastar.WithBrotli(datastar.WithBrotliLevel(5)),
))
chatCfg := c4ChatConfig(gameID)
room := chat.NewPersistentRoom(nc, connect4.ChatSubject(gameID), queries, gameID)
patchAll := func() error {
myColor = gi.GetPlayerColor(playerID)
g := gi.GetGame()
return sse.PatchElementTempl(pages.GameContent(g, myColor, room.Messages(), chatCfg))
}
sendPing := func() error {
return sse.PatchElementTempl(sharedcomponents.ConnectionIndicator(time.Now().UnixMilli()))
}
// Send initial render and ping
if err := sendPing(); err != nil {
return
}
if err := patchAll(); err != nil {
return
}
heartbeat := time.NewTicker(15 * time.Second)
defer heartbeat.Stop()
// Subscribe to game state updates
gameCh := make(chan *nats.Msg, 64) gameCh := make(chan *nats.Msg, 64)
gameSub, err := nc.ChanSubscribe(connect4.GameSubject(gameID), gameCh) gameSub, err := nc.ChanSubscribe(connect4.GameSubject(gameID), gameCh)
if err != nil { if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return return
} }
defer gameSub.Unsubscribe() //nolint:errcheck defer gameSub.Unsubscribe() //nolint:errcheck
// Subscribe to chat messages // Subscribe to chat messages BEFORE creating SSE
chatCfg := c4ChatConfig(gameID)
room := chat.NewPersistentRoom(nc, connect4.ChatSubject(gameID), queries, gameID)
chatCh, cleanupChat := room.Subscribe() chatCh, cleanupChat := room.Subscribe()
defer cleanupChat() defer cleanupChat()
ctx := r.Context() // Setup heartbeat BEFORE creating SSE
heartbeat := time.NewTicker(10 * time.Second)
defer heartbeat.Stop()
// NOW create SSE
sse := datastar.NewSSE(w, r, datastar.WithCompression(
datastar.WithBrotli(datastar.WithBrotliLevel(5)),
))
// Define patch function
patchAll := func() error {
myColor := gi.GetPlayerColor(playerID)
g := gi.GetGame()
return sse.PatchElementTempl(pages.GameContent(g, myColor, room.Messages(), chatCfg))
}
// Send initial state
if err := patchAll(); err != nil {
return
}
// Event loop
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
case <-heartbeat.C:
if err := sendPing(); err != nil {
return
}
case <-gameCh: case <-gameCh:
// Drain rapid-fire notifications
drainGame:
for {
select {
case <-gameCh:
default:
break drainGame
}
}
if err := patchAll(); err != nil { if err := patchAll(); err != nil {
return return
} }
case chatMsg := <-chatCh: case chatMsg := <-chatCh:
err := sse.PatchElementTempl( if err := sse.PatchElementTempl(
chatcomponents.ChatMessage(chatMsg, chatCfg), chatcomponents.ChatMessage(chatMsg, chatCfg),
datastar.WithSelectorID("c4-chat-history"), datastar.WithSelectorID("c4-chat-history"),
datastar.WithModeAppend(), datastar.WithModeAppend(),
) ); err != nil {
if err != nil { return
}
case <-heartbeat.C:
// Heartbeat just keeps the connection alive by triggering a game state refresh
if err := patchAll(); err != nil {
return return
} }
} }

View File

@@ -18,7 +18,6 @@ templ GamePage(g *connect4.Game, myColor int, messages []chat.Message, chatCfg c
data-signals="{chatMsg: ''}" data-signals="{chatMsg: ''}"
data-init={ fmt.Sprintf("@get('/games/%s/events',{requestCancellation:'disabled'})", g.ID) } data-init={ fmt.Sprintf("@get('/games/%s/events',{requestCancellation:'disabled'})", g.ID) }
> >
@sharedcomponents.ConnectionIndicator(0)
@GameContent(g, myColor, messages, chatCfg) @GameContent(g, myColor, messages, chatCfg)
</main> </main>
} }

3
go.mod
View File

@@ -170,6 +170,9 @@ require (
github.com/rivo/uniseg v0.4.7 // indirect github.com/rivo/uniseg v0.4.7 // indirect
github.com/riza-io/grpc-go v0.2.0 // indirect github.com/riza-io/grpc-go v0.2.0 // indirect
github.com/sajari/fuzzy v1.0.0 // indirect github.com/sajari/fuzzy v1.0.0 // indirect
github.com/samber/lo v1.52.0 // indirect
github.com/samber/slog-common v0.20.0 // indirect
github.com/samber/slog-zerolog/v2 v2.9.1 // indirect
github.com/segmentio/asm v1.2.1 // indirect github.com/segmentio/asm v1.2.1 // indirect
github.com/sethvargo/go-retry v0.3.0 // indirect github.com/sethvargo/go-retry v0.3.0 // indirect
github.com/shopspring/decimal v1.4.0 // indirect github.com/shopspring/decimal v1.4.0 // indirect

6
go.sum
View File

@@ -565,6 +565,12 @@ github.com/rs/zerolog v1.34.0/go.mod h1:bJsvje4Z08ROH4Nhs5iH600c3IkWhwp44iRc54W6
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/sajari/fuzzy v1.0.0 h1:+FmwVvJErsd0d0hAPlj4CxqxUtQY/fOoY0DwX4ykpRY= github.com/sajari/fuzzy v1.0.0 h1:+FmwVvJErsd0d0hAPlj4CxqxUtQY/fOoY0DwX4ykpRY=
github.com/sajari/fuzzy v1.0.0/go.mod h1:OjYR6KxoWOe9+dOlXeiCJd4dIbED4Oo8wpS89o0pwOo= github.com/sajari/fuzzy v1.0.0/go.mod h1:OjYR6KxoWOe9+dOlXeiCJd4dIbED4Oo8wpS89o0pwOo=
github.com/samber/lo v1.52.0 h1:Rvi+3BFHES3A8meP33VPAxiBZX/Aws5RxrschYGjomw=
github.com/samber/lo v1.52.0/go.mod h1:4+MXEGsJzbKGaUEQFKBq2xtfuznW9oz/WrgyzMzRoM0=
github.com/samber/slog-common v0.20.0 h1:WaLnm/aCvBJSk5nR5aXZTFBaV0B47A+AEaEOiZDeUnc=
github.com/samber/slog-common v0.20.0/go.mod h1:+Ozat1jgnnE59UAlmNX1IF3IByHsODnnwf9jUcBZ+m8=
github.com/samber/slog-zerolog/v2 v2.9.1 h1:RMOq8XqzfuGx1X0TEIlS9OXbbFmqLY2/wJppghz66YY=
github.com/samber/slog-zerolog/v2 v2.9.1/go.mod h1:DQYYve14WgCRN/XnKeHl4266jXK0DgYkYXkfZ4Fp98k=
github.com/sebdah/goldie/v2 v2.8.0 h1:dZb9wR8q5++oplmEiJT+U/5KyotVD+HNGCAc5gNr8rc= github.com/sebdah/goldie/v2 v2.8.0 h1:dZb9wR8q5++oplmEiJT+U/5KyotVD+HNGCAc5gNr8rc=
github.com/sebdah/goldie/v2 v2.8.0/go.mod h1:oZ9fp0+se1eapSRjfYbsV/0Hqhbuu3bJVvKI/NNtssI= github.com/sebdah/goldie/v2 v2.8.0/go.mod h1:oZ9fp0+se1eapSRjfYbsV/0Hqhbuu3bJVvKI/NNtssI=
github.com/segmentio/asm v1.2.1 h1:DTNbBqs57ioxAD4PrArqftgypG4/qNpXoJx8TVXxPR0= github.com/segmentio/asm v1.2.1 h1:DTNbBqs57ioxAD4PrArqftgypG4/qNpXoJx8TVXxPR0=

View File

@@ -5,10 +5,11 @@ import (
"net/http" "net/http"
"time" "time"
"github.com/ryanhamamura/games/config" "github.com/go-chi/chi/v5/middleware"
"github.com/rs/zerolog" "github.com/rs/zerolog"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
"github.com/ryanhamamura/games/config"
) )
const ( const (
@@ -64,25 +65,15 @@ func colorLatency(d time.Duration, useColor bool) string {
} }
} }
type responseWriter struct {
http.ResponseWriter
status int
}
func (rw *responseWriter) WriteHeader(code int) {
rw.status = code
rw.ResponseWriter.WriteHeader(code)
}
func RequestLogger(logger *zerolog.Logger, env config.Environment) func(http.Handler) http.Handler { func RequestLogger(logger *zerolog.Logger, env config.Environment) func(http.Handler) http.Handler {
return func(next http.Handler) http.Handler { return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
start := time.Now() start := time.Now()
rw := &responseWriter{ResponseWriter: w} ww := middleware.NewWrapResponseWriter(w, r.ProtoMajor)
next.ServeHTTP(rw, r) next.ServeHTTP(ww, r)
status := rw.status status := ww.Status()
if status == 0 { if status == 0 {
status = http.StatusOK status = http.StatusOK
} }

22
main.go
View File

@@ -11,6 +11,12 @@ import (
"syscall" "syscall"
"time" "time"
"github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"
"github.com/rs/zerolog/log"
slogzerolog "github.com/samber/slog-zerolog/v2"
"golang.org/x/sync/errgroup"
"github.com/ryanhamamura/games/config" "github.com/ryanhamamura/games/config"
"github.com/ryanhamamura/games/connect4" "github.com/ryanhamamura/games/connect4"
"github.com/ryanhamamura/games/db" "github.com/ryanhamamura/games/db"
@@ -21,11 +27,6 @@ import (
"github.com/ryanhamamura/games/sessions" "github.com/ryanhamamura/games/sessions"
"github.com/ryanhamamura/games/snake" "github.com/ryanhamamura/games/snake"
"github.com/ryanhamamura/games/version" "github.com/ryanhamamura/games/version"
"github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"
"github.com/rs/zerolog/log"
"golang.org/x/sync/errgroup"
) )
//go:embed assets //go:embed assets
@@ -36,7 +37,12 @@ func main() {
defer cancel() defer cancel()
cfg := config.Global cfg := config.Global
logging.SetupLogger(cfg.Environment, cfg.LogLevel) zerologLogger := logging.SetupLogger(cfg.Environment, cfg.LogLevel)
slog.SetDefault(slog.New(slogzerolog.Option{
Level: slogzerolog.ZeroLogLeveler{Logger: zerologLogger},
Logger: zerologLogger,
NoTimestamp: true,
}.NewZerologHandler()))
if err := run(ctx); err != nil && err != http.ErrServerClosed { if err := run(ctx); err != nil && err != http.ErrServerClosed {
log.Fatal().Err(err).Msg("server error") log.Fatal().Err(err).Msg("server error")
@@ -101,6 +107,10 @@ func run(ctx context.Context) error {
BaseContext: func(l net.Listener) context.Context { BaseContext: func(l net.Listener) context.Context {
return egctx return egctx
}, },
ErrorLog: slog.NewLogLogger(
slog.Default().Handler(),
slog.LevelError,
),
} }
eg.Go(func() error { eg.Go(func() error {

View File

@@ -33,7 +33,7 @@ func SetupSessionManager(db *sql.DB) (*scs.SessionManager, func()) {
sessionManager.Cookie.Name = "games_session" sessionManager.Cookie.Name = "games_session"
sessionManager.Cookie.Path = "/" sessionManager.Cookie.Path = "/"
sessionManager.Cookie.HttpOnly = true sessionManager.Cookie.HttpOnly = true
sessionManager.Cookie.Secure = true sessionManager.Cookie.Secure = false
sessionManager.Cookie.SameSite = http.SameSiteLaxMode sessionManager.Cookie.SameSite = http.SameSiteLaxMode
slog.Info("session manager configured") slog.Info("session manager configured")