From bcb1fa3872cd1c86df05445e6e756fa4d353394b Mon Sep 17 00:00:00 2001 From: Ryan Hamamura <58859899+ryanhamamura@users.noreply.github.com> Date: Tue, 3 Mar 2026 09:45:56 -1000 Subject: [PATCH] refactor: simplify chat subscription API Room.Subscribe() now returns a channel of parsed Message structs instead of raw NATS messages. The room handles NATS subscription and message parsing internally, so callers no longer need to call Receive() separately. --- chat/chat.go | 42 ++++++++++++++++++++++------------ features/c4game/handlers.go | 10 +++----- features/snakegame/handlers.go | 16 +++++-------- 3 files changed, 37 insertions(+), 31 deletions(-) diff --git a/chat/chat.go b/chat/chat.go index c5b98be..2c9f1c4 100644 --- a/chat/chat.go +++ b/chat/chat.go @@ -76,12 +76,11 @@ func (r *Room) Send(msg Message) { } } -// Receive processes an incoming NATS message, appending it to the buffer. -// Returns the new message and a snapshot of all messages. -func (r *Room) Receive(data []byte) (Message, []Message) { +// receive processes an incoming NATS message, appending it to the buffer. +func (r *Room) receive(data []byte) (Message, bool) { var msg Message if err := json.Unmarshal(data, &msg); err != nil { - return msg, nil + return msg, false } r.mu.Lock() @@ -89,11 +88,9 @@ func (r *Room) Receive(data []byte) (Message, []Message) { if len(r.messages) > maxMessages { r.messages = r.messages[len(r.messages)-maxMessages:] } - snapshot := make([]Message, len(r.messages)) - copy(snapshot, r.messages) r.mu.Unlock() - return msg, snapshot + return msg, true } // Messages returns a snapshot of the current message buffer. @@ -105,15 +102,32 @@ func (r *Room) Messages() []Message { return snapshot } -// Subscribe creates a NATS channel subscription for the room's subject. -// Caller is responsible for unsubscribing. -func (r *Room) Subscribe() (chan *nats.Msg, *nats.Subscription, error) { - ch := make(chan *nats.Msg, 64) - sub, err := r.nc.ChanSubscribe(r.subject, ch) +// Subscribe returns a channel of parsed messages and a cleanup function. +// The room handles NATS subscription internally and buffers messages. +func (r *Room) Subscribe() (<-chan Message, func()) { + natsCh := make(chan *nats.Msg, 64) + msgCh := make(chan Message, 64) + + sub, err := r.nc.ChanSubscribe(r.subject, natsCh) if err != nil { - return nil, nil, err + close(msgCh) + return msgCh, func() {} } - return ch, sub, nil + + go func() { + for natsMsg := range natsCh { + if msg, ok := r.receive(natsMsg.Data); ok { + msgCh <- msg + } + } + close(msgCh) + }() + + cleanup := func() { + _ = sub.Unsubscribe() + } + + return msgCh, cleanup } func (r *Room) saveMessage(msg Message) { diff --git a/features/c4game/handlers.go b/features/c4game/handlers.go index be6c09b..e6f5b52 100644 --- a/features/c4game/handlers.go +++ b/features/c4game/handlers.go @@ -132,11 +132,8 @@ func HandleGameEvents(store *connect4.Store, nc *nats.Conn, sm *scs.SessionManag defer gameSub.Unsubscribe() //nolint:errcheck // Subscribe to chat messages - chatCh, chatSub, err := room.Subscribe() - if err != nil { - return - } - defer chatSub.Unsubscribe() //nolint:errcheck + chatCh, cleanupChat := room.Subscribe() + defer cleanupChat() ctx := r.Context() for { @@ -147,8 +144,7 @@ func HandleGameEvents(store *connect4.Store, nc *nats.Conn, sm *scs.SessionManag if err := patchAll(); err != nil { return } - case msg := <-chatCh: - chatMsg, _ := room.Receive(msg.Data) + case chatMsg := <-chatCh: err := sse.PatchElementTempl( chatcomponents.ChatMessage(chatMsg, chatCfg), datastar.WithSelectorID("c4-chat-history"), diff --git a/features/snakegame/handlers.go b/features/snakegame/handlers.go index 7a96ed4..afb6f3a 100644 --- a/features/snakegame/handlers.go +++ b/features/snakegame/handlers.go @@ -137,15 +137,12 @@ func HandleSnakeEvents(snakeStore *snake.SnakeStore, nc *nats.Conn, sm *scs.Sess defer gameSub.Unsubscribe() //nolint:errcheck // Chat subscription (multiplayer only) - var chatCh chan *nats.Msg - var chatSub *nats.Subscription + var chatCh <-chan chat.Message + var cleanupChat func() if room != nil { - chatCh, chatSub, err = room.Subscribe() - if err != nil { - return - } - defer chatSub.Unsubscribe() //nolint:errcheck + chatCh, cleanupChat = room.Subscribe() + defer cleanupChat() } ctx := r.Context() @@ -168,11 +165,10 @@ func HandleSnakeEvents(snakeStore *snake.SnakeStore, nc *nats.Conn, sm *scs.Sess return } - case msg := <-chatCh: - if msg == nil { + case chatMsg, ok := <-chatCh: + if !ok { continue } - chatMsg, _ := room.Receive(msg.Data) err := sse.PatchElementTempl( chatcomponents.ChatMessage(chatMsg, chatCfg), datastar.WithSelectorID("snake-chat-history"),