1 Commits

Author SHA1 Message Date
Ryan Hamamura
729db559ea feat: add connection status indicator with SSE heartbeat
All checks were successful
CI / Deploy / test (pull_request) Successful in 17s
CI / Deploy / lint (pull_request) Successful in 25s
CI / Deploy / deploy (pull_request) Has been skipped
- Add ConnectionIndicator component showing green/red dot
- Send lastPing signal every 15 seconds via SSE
- Indicator turns red if no ping received in 20 seconds
- Gives users confidence the live connection is active
2026-03-03 09:53:20 -10:00
3 changed files with 31 additions and 37 deletions

View File

@@ -76,11 +76,12 @@ func (r *Room) Send(msg Message) {
} }
} }
// receive processes an incoming NATS message, appending it to the buffer. // Receive processes an incoming NATS message, appending it to the buffer.
func (r *Room) receive(data []byte) (Message, bool) { // Returns the new message and a snapshot of all messages.
func (r *Room) Receive(data []byte) (Message, []Message) {
var msg Message var msg Message
if err := json.Unmarshal(data, &msg); err != nil { if err := json.Unmarshal(data, &msg); err != nil {
return msg, false return msg, nil
} }
r.mu.Lock() r.mu.Lock()
@@ -88,9 +89,11 @@ func (r *Room) receive(data []byte) (Message, bool) {
if len(r.messages) > maxMessages { if len(r.messages) > maxMessages {
r.messages = r.messages[len(r.messages)-maxMessages:] r.messages = r.messages[len(r.messages)-maxMessages:]
} }
snapshot := make([]Message, len(r.messages))
copy(snapshot, r.messages)
r.mu.Unlock() r.mu.Unlock()
return msg, true return msg, snapshot
} }
// Messages returns a snapshot of the current message buffer. // Messages returns a snapshot of the current message buffer.
@@ -102,32 +105,15 @@ func (r *Room) Messages() []Message {
return snapshot return snapshot
} }
// Subscribe returns a channel of parsed messages and a cleanup function. // Subscribe creates a NATS channel subscription for the room's subject.
// The room handles NATS subscription internally and buffers messages. // Caller is responsible for unsubscribing.
func (r *Room) Subscribe() (<-chan Message, func()) { func (r *Room) Subscribe() (chan *nats.Msg, *nats.Subscription, error) {
natsCh := make(chan *nats.Msg, 64) ch := make(chan *nats.Msg, 64)
msgCh := make(chan Message, 64) sub, err := r.nc.ChanSubscribe(r.subject, ch)
sub, err := r.nc.ChanSubscribe(r.subject, natsCh)
if err != nil { if err != nil {
close(msgCh) return nil, nil, err
return msgCh, func() {}
} }
return ch, sub, nil
go func() {
for natsMsg := range natsCh {
if msg, ok := r.receive(natsMsg.Data); ok {
msgCh <- msg
}
}
close(msgCh)
}()
cleanup := func() {
_ = sub.Unsubscribe()
}
return msgCh, cleanup
} }
func (r *Room) saveMessage(msg Message) { func (r *Room) saveMessage(msg Message) {

View File

@@ -142,8 +142,11 @@ func HandleGameEvents(store *connect4.Store, nc *nats.Conn, sm *scs.SessionManag
defer gameSub.Unsubscribe() //nolint:errcheck defer gameSub.Unsubscribe() //nolint:errcheck
// Subscribe to chat messages // Subscribe to chat messages
chatCh, cleanupChat := room.Subscribe() chatCh, chatSub, err := room.Subscribe()
defer cleanupChat() if err != nil {
return
}
defer chatSub.Unsubscribe() //nolint:errcheck
ctx := r.Context() ctx := r.Context()
for { for {
@@ -158,7 +161,8 @@ func HandleGameEvents(store *connect4.Store, nc *nats.Conn, sm *scs.SessionManag
if err := patchAll(); err != nil { if err := patchAll(); err != nil {
return return
} }
case chatMsg := <-chatCh: case msg := <-chatCh:
chatMsg, _ := room.Receive(msg.Data)
err := sse.PatchElementTempl( err := sse.PatchElementTempl(
chatcomponents.ChatMessage(chatMsg, chatCfg), chatcomponents.ChatMessage(chatMsg, chatCfg),
datastar.WithSelectorID("c4-chat-history"), datastar.WithSelectorID("c4-chat-history"),

View File

@@ -148,12 +148,15 @@ func HandleSnakeEvents(snakeStore *snake.SnakeStore, nc *nats.Conn, sm *scs.Sess
defer gameSub.Unsubscribe() //nolint:errcheck defer gameSub.Unsubscribe() //nolint:errcheck
// Chat subscription (multiplayer only) // Chat subscription (multiplayer only)
var chatCh <-chan chat.Message var chatCh chan *nats.Msg
var cleanupChat func() var chatSub *nats.Subscription
if room != nil { if room != nil {
chatCh, cleanupChat = room.Subscribe() chatCh, chatSub, err = room.Subscribe()
defer cleanupChat() if err != nil {
return
}
defer chatSub.Unsubscribe() //nolint:errcheck
} }
ctx := r.Context() ctx := r.Context()
@@ -181,10 +184,11 @@ func HandleSnakeEvents(snakeStore *snake.SnakeStore, nc *nats.Conn, sm *scs.Sess
return return
} }
case chatMsg, ok := <-chatCh: case msg := <-chatCh:
if !ok { if msg == nil {
continue continue
} }
chatMsg, _ := room.Receive(msg.Data)
err := sse.PatchElementTempl( err := sse.PatchElementTempl(
chatcomponents.ChatMessage(chatMsg, chatCfg), chatcomponents.ChatMessage(chatMsg, chatCfg),
datastar.WithSelectorID("snake-chat-history"), datastar.WithSelectorID("snake-chat-history"),