Fix SSE architecture for reliable connections #13

Merged
ryan merged 9 commits from fix/sse-architecture into main 2026-03-03 23:33:14 +00:00
4 changed files with 95 additions and 53 deletions
Showing only changes of commit de78ba6d39 - Show all commits

View File

@@ -1,41 +1,24 @@
package snakegame package snakegame
import ( import (
"fmt" "errors"
"net/http" "net/http"
"strconv" "strconv"
"time" "time"
"github.com/alexedwards/scs/v2" "github.com/alexedwards/scs/v2"
"github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5"
"github.com/nats-io/nats.go"
"github.com/starfederation/datastar-go/datastar" "github.com/starfederation/datastar-go/datastar"
"github.com/ryanhamamura/games/chat" "github.com/ryanhamamura/games/chat"
chatcomponents "github.com/ryanhamamura/games/chat/components" chatcomponents "github.com/ryanhamamura/games/chat/components"
sharedcomponents "github.com/ryanhamamura/games/features/common/components"
"github.com/ryanhamamura/games/features/snakegame/pages" "github.com/ryanhamamura/games/features/snakegame/pages"
"github.com/ryanhamamura/games/features/snakegame/services"
"github.com/ryanhamamura/games/sessions" "github.com/ryanhamamura/games/sessions"
"github.com/ryanhamamura/games/snake" "github.com/ryanhamamura/games/snake"
) )
func snakeChatColor(slot int) string { func HandleSnakePage(snakeStore *snake.SnakeStore, svc *services.GameService, sm *scs.SessionManager) http.HandlerFunc {
if slot >= 0 && slot < len(snake.SnakeColors) {
return snake.SnakeColors[slot]
}
return "#666"
}
func snakeChatConfig(gameID string) chatcomponents.Config {
return chatcomponents.Config{
CSSPrefix: "snake",
PostURL: fmt.Sprintf("/snake/%s/chat", gameID),
Color: snakeChatColor,
StopKeyPropagation: true,
}
}
func HandleSnakePage(snakeStore *snake.SnakeStore, sm *scs.SessionManager) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
gameID := chi.URLParam(r, "id") gameID := chi.URLParam(r, "id")
si, ok := snakeStore.Get(gameID) si, ok := snakeStore.Get(gameID)
@@ -77,13 +60,14 @@ func HandleSnakePage(snakeStore *snake.SnakeStore, sm *scs.SessionManager) http.
} }
sg := si.GetGame() sg := si.GetGame()
if err := pages.GamePage(sg, mySlot, nil, snakeChatConfig(gameID), gameID).Render(r.Context(), w); err != nil { chatCfg := svc.ChatConfig(gameID)
if err := pages.GamePage(sg, mySlot, nil, chatCfg, gameID).Render(r.Context(), w); err != nil {
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
} }
} }
} }
func HandleSnakeEvents(snakeStore *snake.SnakeStore, nc *nats.Conn, sm *scs.SessionManager) http.HandlerFunc { func HandleSnakeEvents(snakeStore *snake.SnakeStore, svc *services.GameService, sm *scs.SessionManager) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
gameID := chi.URLParam(r, "id") gameID := chi.URLParam(r, "id")
si, ok := snakeStore.Get(gameID) si, ok := snakeStore.Get(gameID)
@@ -95,17 +79,25 @@ func HandleSnakeEvents(snakeStore *snake.SnakeStore, nc *nats.Conn, sm *scs.Sess
playerID := sessions.GetPlayerID(sm, r) playerID := sessions.GetPlayerID(sm, r)
mySlot := si.GetPlayerSlot(playerID) mySlot := si.GetPlayerSlot(playerID)
// Subscribe to game updates BEFORE creating SSE (following portigo pattern)
gameSub, gameCh, err := svc.SubscribeGameUpdates(gameID)
if err != nil {
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
return
}
defer gameSub.Unsubscribe() //nolint:errcheck
sse := datastar.NewSSE(w, r, datastar.WithCompression( sse := datastar.NewSSE(w, r, datastar.WithCompression(
datastar.WithBrotli(datastar.WithBrotliLevel(5)), datastar.WithBrotli(datastar.WithBrotliLevel(5)),
)) ))
chatCfg := snakeChatConfig(gameID) chatCfg := svc.ChatConfig(gameID)
// Chat room (multiplayer only) // Chat room (multiplayer only)
var room *chat.Room var room *chat.Room
sg := si.GetGame() sg := si.GetGame()
if sg.Mode == snake.ModeMultiplayer { if sg.Mode == snake.ModeMultiplayer {
room = chat.NewRoom(nc, snake.ChatSubject(gameID)) room = svc.ChatRoom(gameID)
} }
chatMessages := func() []chat.Message { chatMessages := func() []chat.Message {
@@ -118,36 +110,21 @@ func HandleSnakeEvents(snakeStore *snake.SnakeStore, nc *nats.Conn, sm *scs.Sess
patchAll := func() error { patchAll := func() error {
si, ok = snakeStore.Get(gameID) si, ok = snakeStore.Get(gameID)
if !ok { if !ok {
return fmt.Errorf("game not found") return errors.New("game not found")
} }
mySlot = si.GetPlayerSlot(playerID) mySlot = si.GetPlayerSlot(playerID)
sg = si.GetGame() sg = si.GetGame()
return sse.PatchElementTempl(pages.GameContent(sg, mySlot, chatMessages(), chatCfg, gameID)) return sse.PatchElementTempl(pages.GameContent(sg, mySlot, chatMessages(), chatCfg, gameID))
} }
sendPing := func() error { // Send initial render
return sse.PatchElementTempl(sharedcomponents.ConnectionIndicator(time.Now().UnixMilli()))
}
// Send initial render and ping
if err := sendPing(); err != nil {
return
}
if err := patchAll(); err != nil { if err := patchAll(); err != nil {
return return
} }
heartbeat := time.NewTicker(15 * time.Second) heartbeat := time.NewTicker(10 * time.Second)
defer heartbeat.Stop() defer heartbeat.Stop()
// Subscribe to game updates via NATS
gameCh := make(chan *nats.Msg, 64)
gameSub, err := nc.ChanSubscribe(snake.GameSubject(gameID), gameCh)
if err != nil {
return
}
defer gameSub.Unsubscribe() //nolint:errcheck
// Chat subscription (multiplayer only) // Chat subscription (multiplayer only)
var chatCh <-chan chat.Message var chatCh <-chan chat.Message
var cleanupChat func() var cleanupChat func()
@@ -164,7 +141,8 @@ func HandleSnakeEvents(snakeStore *snake.SnakeStore, nc *nats.Conn, sm *scs.Sess
return return
case <-heartbeat.C: case <-heartbeat.C:
if err := sendPing(); err != nil { // Heartbeat just refreshes game state
if err := patchAll(); err != nil {
return return
} }
@@ -231,7 +209,7 @@ type chatSignals struct {
ChatMsg string `json:"chatMsg"` ChatMsg string `json:"chatMsg"`
} }
func HandleSendChat(snakeStore *snake.SnakeStore, nc *nats.Conn, sm *scs.SessionManager) http.HandlerFunc { func HandleSendChat(snakeStore *snake.SnakeStore, svc *services.GameService, sm *scs.SessionManager) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
gameID := chi.URLParam(r, "id") gameID := chi.URLParam(r, "id")
si, ok := snakeStore.Get(gameID) si, ok := snakeStore.Get(gameID)
@@ -264,7 +242,7 @@ func HandleSendChat(snakeStore *snake.SnakeStore, nc *nats.Conn, sm *scs.Session
Message: signals.ChatMsg, Message: signals.ChatMsg,
} }
room := chat.NewRoom(nc, snake.ChatSubject(gameID)) room := svc.ChatRoom(gameID)
room.Send(msg) room.Send(msg)
sse := datastar.NewSSE(w, r) sse := datastar.NewSSE(w, r)

View File

@@ -4,17 +4,17 @@ package snakegame
import ( import (
"github.com/alexedwards/scs/v2" "github.com/alexedwards/scs/v2"
"github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5"
"github.com/nats-io/nats.go"
"github.com/ryanhamamura/games/features/snakegame/services"
"github.com/ryanhamamura/games/snake" "github.com/ryanhamamura/games/snake"
) )
func SetupRoutes(router chi.Router, snakeStore *snake.SnakeStore, nc *nats.Conn, sessions *scs.SessionManager) { func SetupRoutes(router chi.Router, snakeStore *snake.SnakeStore, svc *services.GameService, sessions *scs.SessionManager) {
router.Route("/snake/{id}", func(r chi.Router) { router.Route("/snake/{id}", func(r chi.Router) {
r.Get("/", HandleSnakePage(snakeStore, sessions)) r.Get("/", HandleSnakePage(snakeStore, svc, sessions))
r.Get("/events", HandleSnakeEvents(snakeStore, nc, sessions)) r.Get("/events", HandleSnakeEvents(snakeStore, svc, sessions))
r.Post("/dir", HandleSetDirection(snakeStore, sessions)) r.Post("/dir", HandleSetDirection(snakeStore, sessions))
r.Post("/chat", HandleSendChat(snakeStore, nc, sessions)) r.Post("/chat", HandleSendChat(snakeStore, svc, sessions))
r.Post("/join", HandleSetNickname(snakeStore, sessions)) r.Post("/join", HandleSetNickname(snakeStore, sessions))
r.Post("/rematch", HandleRematch(snakeStore, sessions)) r.Post("/rematch", HandleRematch(snakeStore, sessions))
}) })

View File

@@ -0,0 +1,62 @@
// Package services provides the game service layer for Snake,
// handling NATS subscriptions and chat room management.
package services
import (
"fmt"
"github.com/nats-io/nats.go"
"github.com/ryanhamamura/games/chat"
chatcomponents "github.com/ryanhamamura/games/chat/components"
"github.com/ryanhamamura/games/snake"
)
func snakeChatColor(slot int) string {
if slot >= 0 && slot < len(snake.SnakeColors) {
return snake.SnakeColors[slot]
}
return "#666"
}
// GameService manages NATS subscriptions and chat for Snake games.
type GameService struct {
nc *nats.Conn
}
// NewGameService creates a new game service.
func NewGameService(nc *nats.Conn) *GameService {
return &GameService{
nc: nc,
}
}
// SubscribeGameUpdates returns a NATS subscription and channel for game state updates.
func (s *GameService) SubscribeGameUpdates(gameID string) (*nats.Subscription, <-chan *nats.Msg, error) {
ch := make(chan *nats.Msg, 64)
sub, err := s.nc.ChanSubscribe(snake.GameSubject(gameID), ch)
if err != nil {
return nil, nil, fmt.Errorf("subscribing to game updates: %w", err)
}
return sub, ch, nil
}
// ChatConfig returns the chat configuration for a game.
func (s *GameService) ChatConfig(gameID string) chatcomponents.Config {
return chatcomponents.Config{
CSSPrefix: "snake",
PostURL: fmt.Sprintf("/snake/%s/chat", gameID),
Color: snakeChatColor,
StopKeyPropagation: true,
}
}
// ChatRoom returns a chat room for a game (ephemeral, not persisted).
func (s *GameService) ChatRoom(gameID string) *chat.Room {
return chat.NewRoom(s.nc, snake.ChatSubject(gameID))
}
// PublishGameUpdate sends a notification that the game state has changed.
func (s *GameService) PublishGameUpdate(gameID string) error {
return s.nc.Publish(snake.GameSubject(gameID), nil)
}

View File

@@ -17,9 +17,10 @@ import (
"github.com/ryanhamamura/games/db/repository" "github.com/ryanhamamura/games/db/repository"
"github.com/ryanhamamura/games/features/auth" "github.com/ryanhamamura/games/features/auth"
"github.com/ryanhamamura/games/features/c4game" "github.com/ryanhamamura/games/features/c4game"
"github.com/ryanhamamura/games/features/c4game/services" c4services "github.com/ryanhamamura/games/features/c4game/services"
"github.com/ryanhamamura/games/features/lobby" "github.com/ryanhamamura/games/features/lobby"
"github.com/ryanhamamura/games/features/snakegame" "github.com/ryanhamamura/games/features/snakegame"
snakeservices "github.com/ryanhamamura/games/features/snakegame/services"
"github.com/ryanhamamura/games/snake" "github.com/ryanhamamura/games/snake"
) )
@@ -42,12 +43,13 @@ func SetupRoutes(
} }
// Services // Services
c4Svc := services.NewGameService(nc, queries) c4Svc := c4services.NewGameService(nc, queries)
snakeSvc := snakeservices.NewGameService(nc)
auth.SetupRoutes(router, queries, sessions) auth.SetupRoutes(router, queries, sessions)
lobby.SetupRoutes(router, queries, sessions, store, snakeStore) lobby.SetupRoutes(router, queries, sessions, store, snakeStore)
c4game.SetupRoutes(router, store, c4Svc, sessions) c4game.SetupRoutes(router, store, c4Svc, sessions)
snakegame.SetupRoutes(router, snakeStore, nc, sessions) snakegame.SetupRoutes(router, snakeStore, snakeSvc, sessions)
} }
func setupReload(router chi.Router) { func setupReload(router chi.Router) {