Fix SSE architecture for reliable connections (#13)
All checks were successful
CI / Deploy / test (push) Successful in 14s
CI / Deploy / lint (push) Successful in 25s
CI / Deploy / deploy (push) Successful in 1m32s

This commit was merged in pull request #13.
This commit is contained in:
2026-03-03 23:33:13 +00:00
parent 331c4c8759
commit 67a768ea22
20 changed files with 2950 additions and 231 deletions

View File

@@ -1,47 +1,23 @@
package c4game
import (
"fmt"
"net/http"
"strconv"
"time"
"github.com/alexedwards/scs/v2"
"github.com/go-chi/chi/v5"
"github.com/nats-io/nats.go"
"github.com/starfederation/datastar-go/datastar"
"github.com/ryanhamamura/games/chat"
chatcomponents "github.com/ryanhamamura/games/chat/components"
"github.com/ryanhamamura/games/connect4"
"github.com/ryanhamamura/games/db/repository"
"github.com/ryanhamamura/games/features/c4game/pages"
sharedcomponents "github.com/ryanhamamura/games/features/common/components"
"github.com/ryanhamamura/games/features/c4game/services"
"github.com/ryanhamamura/games/sessions"
)
// c4ChatColors maps player color (1=Red, 2=Yellow) to CSS background colors.
var c4ChatColors = map[int]string{
0: "#4a2a3a", // color 1 stored as slot 0
1: "#2a4545", // color 2 stored as slot 1
}
func c4ChatColor(slot int) string {
if c, ok := c4ChatColors[slot]; ok {
return c
}
return "#666"
}
func c4ChatConfig(gameID string) chatcomponents.Config {
return chatcomponents.Config{
CSSPrefix: "c4",
PostURL: fmt.Sprintf("/games/%s/chat", gameID),
Color: c4ChatColor,
}
}
func HandleGamePage(store *connect4.Store, sm *scs.SessionManager, queries *repository.Queries) http.HandlerFunc {
func HandleGamePage(store *connect4.Store, svc *services.GameService, sm *scs.SessionManager) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
gameID := chi.URLParam(r, "id")
@@ -85,16 +61,17 @@ func HandleGamePage(store *connect4.Store, sm *scs.SessionManager, queries *repo
}
g := gi.GetGame()
room := chat.NewPersistentRoom(nil, "", queries, gameID)
room := svc.ChatRoom(gameID)
if err := pages.GamePage(g, myColor, room.Messages(), c4ChatConfig(gameID)).Render(r.Context(), w); err != nil {
if err := pages.GamePage(g, myColor, room.Messages(), svc.ChatConfig(gameID)).Render(r.Context(), w); err != nil {
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
}
}
}
func HandleGameEvents(store *connect4.Store, nc *nats.Conn, sm *scs.SessionManager, queries *repository.Queries) http.HandlerFunc {
func HandleGameEvents(store *connect4.Store, svc *services.GameService, sm *scs.SessionManager) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
gameID := chi.URLParam(r, "id")
gi, exists := store.Get(gameID)
@@ -104,68 +81,74 @@ func HandleGameEvents(store *connect4.Store, nc *nats.Conn, sm *scs.SessionManag
}
playerID := sessions.GetPlayerID(sm, r)
myColor := gi.GetPlayerColor(playerID)
sse := datastar.NewSSE(w, r, datastar.WithCompression(
datastar.WithBrotli(datastar.WithBrotliLevel(5)),
))
chatCfg := c4ChatConfig(gameID)
room := chat.NewPersistentRoom(nc, connect4.ChatSubject(gameID), queries, gameID)
patchAll := func() error {
myColor = gi.GetPlayerColor(playerID)
g := gi.GetGame()
return sse.PatchElementTempl(pages.GameContent(g, myColor, room.Messages(), chatCfg))
}
sendPing := func() error {
return sse.PatchElementTempl(sharedcomponents.ConnectionIndicator(time.Now().UnixMilli()))
}
// Send initial render and ping
if err := sendPing(); err != nil {
return
}
if err := patchAll(); err != nil {
return
}
heartbeat := time.NewTicker(15 * time.Second)
defer heartbeat.Stop()
// Subscribe to game state updates
gameCh := make(chan *nats.Msg, 64)
gameSub, err := nc.ChanSubscribe(connect4.GameSubject(gameID), gameCh)
// Subscribe to game state updates BEFORE creating SSE
gameSub, gameCh, err := svc.SubscribeGameUpdates(gameID)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
defer gameSub.Unsubscribe() //nolint:errcheck
// Subscribe to chat messages
// Subscribe to chat messages BEFORE creating SSE
chatCfg := svc.ChatConfig(gameID)
room := svc.ChatRoom(gameID)
chatCh, cleanupChat := room.Subscribe()
defer cleanupChat()
ctx := r.Context()
// Setup heartbeat BEFORE creating SSE
heartbeat := time.NewTicker(1 * time.Second)
defer heartbeat.Stop()
// NOW create SSE
sse := datastar.NewSSE(w, r, datastar.WithCompression(
datastar.WithBrotli(datastar.WithBrotliLevel(5)),
))
// Define patch function
patchAll := func() error {
myColor := gi.GetPlayerColor(playerID)
g := gi.GetGame()
return sse.PatchElementTempl(pages.GameContent(g, myColor, room.Messages(), chatCfg))
}
// Send initial state
if err := patchAll(); err != nil {
return
}
// Event loop
for {
select {
case <-ctx.Done():
return
case <-heartbeat.C:
if err := sendPing(); err != nil {
return
}
case <-gameCh:
// Drain rapid-fire notifications
drainGame:
for {
select {
case <-gameCh:
default:
break drainGame
}
}
if err := patchAll(); err != nil {
return
}
case chatMsg := <-chatCh:
err := sse.PatchElementTempl(
if err := sse.PatchElementTempl(
chatcomponents.ChatMessage(chatMsg, chatCfg),
datastar.WithSelectorID("c4-chat-history"),
datastar.WithModeAppend(),
)
if err != nil {
); err != nil {
return
}
case <-heartbeat.C:
// Heartbeat refreshes game state to keep connection alive
if err := patchAll(); err != nil {
return
}
}
@@ -202,7 +185,7 @@ func HandleDropPiece(store *connect4.Store, sm *scs.SessionManager) http.Handler
}
}
func HandleSendChat(store *connect4.Store, nc *nats.Conn, sm *scs.SessionManager, queries *repository.Queries) http.HandlerFunc {
func HandleSendChat(store *connect4.Store, svc *services.GameService, sm *scs.SessionManager) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
gameID := chi.URLParam(r, "id")
@@ -249,7 +232,7 @@ func HandleSendChat(store *connect4.Store, nc *nats.Conn, sm *scs.SessionManager
Message: signals.ChatMsg,
Time: time.Now().UnixMilli(),
}
room := chat.NewPersistentRoom(nc, connect4.ChatSubject(gameID), queries, gameID)
room := svc.ChatRoom(gameID)
room.Send(msg)
sse := datastar.NewSSE(w, r)

View File

@@ -18,7 +18,6 @@ templ GamePage(g *connect4.Game, myColor int, messages []chat.Message, chatCfg c
data-signals="{chatMsg: ''}"
data-init={ fmt.Sprintf("@get('/games/%s/events',{requestCancellation:'disabled'})", g.ID) }
>
@sharedcomponents.ConnectionIndicator(0)
@GameContent(g, myColor, messages, chatCfg)
</main>
}
@@ -26,6 +25,7 @@ templ GamePage(g *connect4.Game, myColor int, messages []chat.Message, chatCfg c
templ GameContent(g *connect4.Game, myColor int, messages []chat.Message, chatCfg chatcomponents.Config) {
<div id="game-content">
@sharedcomponents.LiveClock()
@sharedcomponents.BackToLobby()
@sharedcomponents.StealthTitle("text-3xl font-bold")
@components.PlayerInfo(g, myColor)

View File

@@ -4,24 +4,22 @@ package c4game
import (
"github.com/alexedwards/scs/v2"
"github.com/go-chi/chi/v5"
"github.com/nats-io/nats.go"
"github.com/ryanhamamura/games/connect4"
"github.com/ryanhamamura/games/db/repository"
"github.com/ryanhamamura/games/features/c4game/services"
)
func SetupRoutes(
router chi.Router,
store *connect4.Store,
nc *nats.Conn,
svc *services.GameService,
sessions *scs.SessionManager,
queries *repository.Queries,
) {
router.Route("/games/{id}", func(r chi.Router) {
r.Get("/", HandleGamePage(store, sessions, queries))
r.Get("/events", HandleGameEvents(store, nc, sessions, queries))
r.Get("/", HandleGamePage(store, svc, sessions))
r.Get("/events", HandleGameEvents(store, svc, sessions))
r.Post("/drop", HandleDropPiece(store, sessions))
r.Post("/chat", HandleSendChat(store, nc, sessions, queries))
r.Post("/chat", HandleSendChat(store, svc, sessions))
r.Post("/join", HandleSetNickname(store, sessions))
r.Post("/rematch", HandleRematch(store, sessions))
})

View File

@@ -0,0 +1,70 @@
// Package services provides the game service layer for Connect 4,
// handling NATS subscriptions and chat room management.
package services
import (
"fmt"
"github.com/nats-io/nats.go"
"github.com/ryanhamamura/games/chat"
chatcomponents "github.com/ryanhamamura/games/chat/components"
"github.com/ryanhamamura/games/connect4"
"github.com/ryanhamamura/games/db/repository"
)
// c4ChatColors maps player slot (0-indexed) to CSS background colors.
var c4ChatColors = map[int]string{
0: "#4a2a3a", // Red player
1: "#2a4545", // Yellow player
}
func c4ChatColor(slot int) string {
if c, ok := c4ChatColors[slot]; ok {
return c
}
return "#666"
}
// GameService manages NATS subscriptions and chat for Connect 4 games.
type GameService struct {
nc *nats.Conn
queries *repository.Queries
}
// NewGameService creates a new game service.
func NewGameService(nc *nats.Conn, queries *repository.Queries) *GameService {
return &GameService{
nc: nc,
queries: queries,
}
}
// SubscribeGameUpdates returns a NATS subscription and channel for game state updates.
func (s *GameService) SubscribeGameUpdates(gameID string) (*nats.Subscription, <-chan *nats.Msg, error) {
ch := make(chan *nats.Msg, 64)
sub, err := s.nc.ChanSubscribe(connect4.GameSubject(gameID), ch)
if err != nil {
return nil, nil, fmt.Errorf("subscribing to game updates: %w", err)
}
return sub, ch, nil
}
// ChatConfig returns the chat configuration for a game.
func (s *GameService) ChatConfig(gameID string) chatcomponents.Config {
return chatcomponents.Config{
CSSPrefix: "c4",
PostURL: fmt.Sprintf("/games/%s/chat", gameID),
Color: c4ChatColor,
}
}
// ChatRoom returns a persistent chat room for a game.
func (s *GameService) ChatRoom(gameID string) *chat.Room {
return chat.NewPersistentRoom(s.nc, connect4.ChatSubject(gameID), s.queries, gameID)
}
// PublishGameUpdate sends a notification that the game state has changed.
func (s *GameService) PublishGameUpdate(gameID string) error {
return s.nc.Publish(connect4.GameSubject(gameID), nil)
}