3 Commits

Author SHA1 Message Date
Ryan Hamamura
6d078f3adb feat: add connection status indicator with SSE heartbeat
All checks were successful
CI / Deploy / test (pull_request) Successful in 14s
CI / Deploy / lint (pull_request) Successful in 26s
CI / Deploy / deploy (pull_request) Has been skipped
- Add ConnectionIndicator component showing green/red dot
- Send lastPing signal every 15 seconds via SSE
- Indicator turns red if no ping received in 20 seconds
- Gives users confidence the live connection is active
2026-03-03 09:54:31 -10:00
ffbff8cca5 Merge pull request 'Simplify chat subscription API' (#9) from refactor/chat-subscribe-messages into main
All checks were successful
CI / Deploy / test (push) Successful in 14s
CI / Deploy / lint (push) Successful in 25s
CI / Deploy / deploy (push) Successful in 1m25s
2026-03-03 19:54:21 +00:00
Ryan Hamamura
bcb1fa3872 refactor: simplify chat subscription API
All checks were successful
CI / Deploy / test (pull_request) Successful in 14s
CI / Deploy / lint (pull_request) Successful in 25s
CI / Deploy / deploy (pull_request) Has been skipped
Room.Subscribe() now returns a channel of parsed Message structs
instead of raw NATS messages. The room handles NATS subscription
and message parsing internally, so callers no longer need to call
Receive() separately.
2026-03-03 09:45:56 -10:00
6 changed files with 88 additions and 35 deletions

View File

@@ -76,12 +76,11 @@ func (r *Room) Send(msg Message) {
} }
} }
// Receive processes an incoming NATS message, appending it to the buffer. // receive processes an incoming NATS message, appending it to the buffer.
// Returns the new message and a snapshot of all messages. func (r *Room) receive(data []byte) (Message, bool) {
func (r *Room) Receive(data []byte) (Message, []Message) {
var msg Message var msg Message
if err := json.Unmarshal(data, &msg); err != nil { if err := json.Unmarshal(data, &msg); err != nil {
return msg, nil return msg, false
} }
r.mu.Lock() r.mu.Lock()
@@ -89,11 +88,9 @@ func (r *Room) Receive(data []byte) (Message, []Message) {
if len(r.messages) > maxMessages { if len(r.messages) > maxMessages {
r.messages = r.messages[len(r.messages)-maxMessages:] r.messages = r.messages[len(r.messages)-maxMessages:]
} }
snapshot := make([]Message, len(r.messages))
copy(snapshot, r.messages)
r.mu.Unlock() r.mu.Unlock()
return msg, snapshot return msg, true
} }
// Messages returns a snapshot of the current message buffer. // Messages returns a snapshot of the current message buffer.
@@ -105,15 +102,32 @@ func (r *Room) Messages() []Message {
return snapshot return snapshot
} }
// Subscribe creates a NATS channel subscription for the room's subject. // Subscribe returns a channel of parsed messages and a cleanup function.
// Caller is responsible for unsubscribing. // The room handles NATS subscription internally and buffers messages.
func (r *Room) Subscribe() (chan *nats.Msg, *nats.Subscription, error) { func (r *Room) Subscribe() (<-chan Message, func()) {
ch := make(chan *nats.Msg, 64) natsCh := make(chan *nats.Msg, 64)
sub, err := r.nc.ChanSubscribe(r.subject, ch) msgCh := make(chan Message, 64)
sub, err := r.nc.ChanSubscribe(r.subject, natsCh)
if err != nil { if err != nil {
return nil, nil, err close(msgCh)
return msgCh, func() {}
} }
return ch, sub, nil
go func() {
for natsMsg := range natsCh {
if msg, ok := r.receive(natsMsg.Data); ok {
msgCh <- msg
}
}
close(msgCh)
}()
cleanup := func() {
_ = sub.Unsubscribe()
}
return msgCh, cleanup
} }
func (r *Room) saveMessage(msg Message) { func (r *Room) saveMessage(msg Message) {

View File

@@ -118,11 +118,21 @@ func HandleGameEvents(store *connect4.Store, nc *nats.Conn, sm *scs.SessionManag
return sse.PatchElementTempl(pages.GameContent(g, myColor, room.Messages(), chatCfg)) return sse.PatchElementTempl(pages.GameContent(g, myColor, room.Messages(), chatCfg))
} }
// Send initial render sendPing := func() error {
return sse.PatchSignals([]byte(fmt.Sprintf(`{"lastPing":%d}`, time.Now().UnixMilli())))
}
// Send initial render and ping
if err := sendPing(); err != nil {
return
}
if err := patchAll(); err != nil { if err := patchAll(); err != nil {
return return
} }
heartbeat := time.NewTicker(15 * time.Second)
defer heartbeat.Stop()
// Subscribe to game state updates // Subscribe to game state updates
gameCh := make(chan *nats.Msg, 64) gameCh := make(chan *nats.Msg, 64)
gameSub, err := nc.ChanSubscribe(connect4.GameSubject(gameID), gameCh) gameSub, err := nc.ChanSubscribe(connect4.GameSubject(gameID), gameCh)
@@ -132,23 +142,23 @@ func HandleGameEvents(store *connect4.Store, nc *nats.Conn, sm *scs.SessionManag
defer gameSub.Unsubscribe() //nolint:errcheck defer gameSub.Unsubscribe() //nolint:errcheck
// Subscribe to chat messages // Subscribe to chat messages
chatCh, chatSub, err := room.Subscribe() chatCh, cleanupChat := room.Subscribe()
if err != nil { defer cleanupChat()
return
}
defer chatSub.Unsubscribe() //nolint:errcheck
ctx := r.Context() ctx := r.Context()
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
case <-heartbeat.C:
if err := sendPing(); err != nil {
return
}
case <-gameCh: case <-gameCh:
if err := patchAll(); err != nil { if err := patchAll(); err != nil {
return return
} }
case msg := <-chatCh: case chatMsg := <-chatCh:
chatMsg, _ := room.Receive(msg.Data)
err := sse.PatchElementTempl( err := sse.PatchElementTempl(
chatcomponents.ChatMessage(chatMsg, chatCfg), chatcomponents.ChatMessage(chatMsg, chatCfg),
datastar.WithSelectorID("c4-chat-history"), datastar.WithSelectorID("c4-chat-history"),

View File

@@ -15,9 +15,10 @@ templ GamePage(g *connect4.Game, myColor int, messages []chat.Message, chatCfg c
@layouts.Base("Connect 4") { @layouts.Base("Connect 4") {
<main <main
class="flex flex-col items-center gap-4 p-4" class="flex flex-col items-center gap-4 p-4"
data-signals="{chatMsg: ''}" data-signals="{chatMsg: '', lastPing: 0}"
data-init={ fmt.Sprintf("@get('/games/%s/events',{requestCancellation:'disabled'})", g.ID) } data-init={ fmt.Sprintf("@get('/games/%s/events',{requestCancellation:'disabled'})", g.ID) }
> >
@sharedcomponents.ConnectionIndicator()
@GameContent(g, myColor, messages, chatCfg) @GameContent(g, myColor, messages, chatCfg)
</main> </main>
} }

View File

@@ -44,6 +44,21 @@ templ NicknamePrompt(returnPath string) {
</main> </main>
} }
// ConnectionIndicator shows a small dot indicating SSE connection status.
// It requires a `lastPing` signal (unix ms timestamp) to be set by the server.
templ ConnectionIndicator() {
<div
id="connection-indicator"
class="fixed top-2 right-2 flex items-center gap-1 text-xs text-gray-500"
title="Connection status"
>
<span
class="w-2 h-2 rounded-full transition-colors duration-300"
data-class="{'bg-green-500': Date.now() - $lastPing < 20000, 'bg-red-500': Date.now() - $lastPing >= 20000}"
></span>
</div>
}
templ GameJoinPrompt(loginURL string, registerURL string, gamePath string) { templ GameJoinPrompt(loginURL string, registerURL string, gamePath string) {
<main class="max-w-sm mx-auto mt-8 text-center"> <main class="max-w-sm mx-auto mt-8 text-center">
<h1 class="text-3xl font-bold">Join Game</h1> <h1 class="text-3xl font-bold">Join Game</h1>

View File

@@ -4,6 +4,7 @@ import (
"fmt" "fmt"
"net/http" "net/http"
"strconv" "strconv"
"time"
"github.com/alexedwards/scs/v2" "github.com/alexedwards/scs/v2"
"github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5"
@@ -123,11 +124,21 @@ func HandleSnakeEvents(snakeStore *snake.SnakeStore, nc *nats.Conn, sm *scs.Sess
return sse.PatchElementTempl(pages.GameContent(sg, mySlot, chatMessages(), chatCfg, gameID)) return sse.PatchElementTempl(pages.GameContent(sg, mySlot, chatMessages(), chatCfg, gameID))
} }
// Send initial render sendPing := func() error {
return sse.PatchSignals([]byte(fmt.Sprintf(`{"lastPing":%d}`, time.Now().UnixMilli())))
}
// Send initial render and ping
if err := sendPing(); err != nil {
return
}
if err := patchAll(); err != nil { if err := patchAll(); err != nil {
return return
} }
heartbeat := time.NewTicker(15 * time.Second)
defer heartbeat.Stop()
// Subscribe to game updates via NATS // Subscribe to game updates via NATS
gameCh := make(chan *nats.Msg, 64) gameCh := make(chan *nats.Msg, 64)
gameSub, err := nc.ChanSubscribe(snake.GameSubject(gameID), gameCh) gameSub, err := nc.ChanSubscribe(snake.GameSubject(gameID), gameCh)
@@ -137,15 +148,12 @@ func HandleSnakeEvents(snakeStore *snake.SnakeStore, nc *nats.Conn, sm *scs.Sess
defer gameSub.Unsubscribe() //nolint:errcheck defer gameSub.Unsubscribe() //nolint:errcheck
// Chat subscription (multiplayer only) // Chat subscription (multiplayer only)
var chatCh chan *nats.Msg var chatCh <-chan chat.Message
var chatSub *nats.Subscription var cleanupChat func()
if room != nil { if room != nil {
chatCh, chatSub, err = room.Subscribe() chatCh, cleanupChat = room.Subscribe()
if err != nil { defer cleanupChat()
return
}
defer chatSub.Unsubscribe() //nolint:errcheck
} }
ctx := r.Context() ctx := r.Context()
@@ -154,6 +162,11 @@ func HandleSnakeEvents(snakeStore *snake.SnakeStore, nc *nats.Conn, sm *scs.Sess
case <-ctx.Done(): case <-ctx.Done():
return return
case <-heartbeat.C:
if err := sendPing(); err != nil {
return
}
case <-gameCh: case <-gameCh:
// Drain backed-up game updates // Drain backed-up game updates
for { for {
@@ -168,11 +181,10 @@ func HandleSnakeEvents(snakeStore *snake.SnakeStore, nc *nats.Conn, sm *scs.Sess
return return
} }
case msg := <-chatCh: case chatMsg, ok := <-chatCh:
if msg == nil { if !ok {
continue continue
} }
chatMsg, _ := room.Receive(msg.Data)
err := sse.PatchElementTempl( err := sse.PatchElementTempl(
chatcomponents.ChatMessage(chatMsg, chatCfg), chatcomponents.ChatMessage(chatMsg, chatCfg),
datastar.WithSelectorID("snake-chat-history"), datastar.WithSelectorID("snake-chat-history"),

View File

@@ -32,11 +32,12 @@ templ GamePage(sg *snake.SnakeGame, mySlot int, messages []chat.Message, chatCfg
@layouts.Base("Snake") { @layouts.Base("Snake") {
<main <main
class="snake-wrapper flex flex-col items-center gap-4 p-4" class="snake-wrapper flex flex-col items-center gap-4 p-4"
data-signals={ `{"chatMsg":""}` } data-signals={ `{"chatMsg":"","lastPing":0}` }
data-init={ fmt.Sprintf("@get('/snake/%s/events',{requestCancellation:'disabled'})", gameID) } data-init={ fmt.Sprintf("@get('/snake/%s/events',{requestCancellation:'disabled'})", gameID) }
data-on:keydown__throttle.100ms={ keydownScript(gameID) } data-on:keydown__throttle.100ms={ keydownScript(gameID) }
tabindex="0" tabindex="0"
> >
@components.ConnectionIndicator()
@GameContent(sg, mySlot, messages, chatCfg, gameID) @GameContent(sg, mySlot, messages, chatCfg, gameID)
</main> </main>
} }