refactor: simplify chat subscription API
All checks were successful
CI / Deploy / test (pull_request) Successful in 14s
CI / Deploy / lint (pull_request) Successful in 25s
CI / Deploy / deploy (pull_request) Has been skipped

Room.Subscribe() now returns a channel of parsed Message structs
instead of raw NATS messages. The room handles NATS subscription
and message parsing internally, so callers no longer need to call
Receive() separately.
This commit is contained in:
Ryan Hamamura
2026-03-03 09:45:56 -10:00
parent bf9a8755f0
commit bcb1fa3872
3 changed files with 37 additions and 31 deletions

View File

@@ -76,12 +76,11 @@ func (r *Room) Send(msg Message) {
} }
} }
// Receive processes an incoming NATS message, appending it to the buffer. // receive processes an incoming NATS message, appending it to the buffer.
// Returns the new message and a snapshot of all messages. func (r *Room) receive(data []byte) (Message, bool) {
func (r *Room) Receive(data []byte) (Message, []Message) {
var msg Message var msg Message
if err := json.Unmarshal(data, &msg); err != nil { if err := json.Unmarshal(data, &msg); err != nil {
return msg, nil return msg, false
} }
r.mu.Lock() r.mu.Lock()
@@ -89,11 +88,9 @@ func (r *Room) Receive(data []byte) (Message, []Message) {
if len(r.messages) > maxMessages { if len(r.messages) > maxMessages {
r.messages = r.messages[len(r.messages)-maxMessages:] r.messages = r.messages[len(r.messages)-maxMessages:]
} }
snapshot := make([]Message, len(r.messages))
copy(snapshot, r.messages)
r.mu.Unlock() r.mu.Unlock()
return msg, snapshot return msg, true
} }
// Messages returns a snapshot of the current message buffer. // Messages returns a snapshot of the current message buffer.
@@ -105,15 +102,32 @@ func (r *Room) Messages() []Message {
return snapshot return snapshot
} }
// Subscribe creates a NATS channel subscription for the room's subject. // Subscribe returns a channel of parsed messages and a cleanup function.
// Caller is responsible for unsubscribing. // The room handles NATS subscription internally and buffers messages.
func (r *Room) Subscribe() (chan *nats.Msg, *nats.Subscription, error) { func (r *Room) Subscribe() (<-chan Message, func()) {
ch := make(chan *nats.Msg, 64) natsCh := make(chan *nats.Msg, 64)
sub, err := r.nc.ChanSubscribe(r.subject, ch) msgCh := make(chan Message, 64)
sub, err := r.nc.ChanSubscribe(r.subject, natsCh)
if err != nil { if err != nil {
return nil, nil, err close(msgCh)
return msgCh, func() {}
} }
return ch, sub, nil
go func() {
for natsMsg := range natsCh {
if msg, ok := r.receive(natsMsg.Data); ok {
msgCh <- msg
}
}
close(msgCh)
}()
cleanup := func() {
_ = sub.Unsubscribe()
}
return msgCh, cleanup
} }
func (r *Room) saveMessage(msg Message) { func (r *Room) saveMessage(msg Message) {

View File

@@ -132,11 +132,8 @@ func HandleGameEvents(store *connect4.Store, nc *nats.Conn, sm *scs.SessionManag
defer gameSub.Unsubscribe() //nolint:errcheck defer gameSub.Unsubscribe() //nolint:errcheck
// Subscribe to chat messages // Subscribe to chat messages
chatCh, chatSub, err := room.Subscribe() chatCh, cleanupChat := room.Subscribe()
if err != nil { defer cleanupChat()
return
}
defer chatSub.Unsubscribe() //nolint:errcheck
ctx := r.Context() ctx := r.Context()
for { for {
@@ -147,8 +144,7 @@ func HandleGameEvents(store *connect4.Store, nc *nats.Conn, sm *scs.SessionManag
if err := patchAll(); err != nil { if err := patchAll(); err != nil {
return return
} }
case msg := <-chatCh: case chatMsg := <-chatCh:
chatMsg, _ := room.Receive(msg.Data)
err := sse.PatchElementTempl( err := sse.PatchElementTempl(
chatcomponents.ChatMessage(chatMsg, chatCfg), chatcomponents.ChatMessage(chatMsg, chatCfg),
datastar.WithSelectorID("c4-chat-history"), datastar.WithSelectorID("c4-chat-history"),

View File

@@ -137,15 +137,12 @@ func HandleSnakeEvents(snakeStore *snake.SnakeStore, nc *nats.Conn, sm *scs.Sess
defer gameSub.Unsubscribe() //nolint:errcheck defer gameSub.Unsubscribe() //nolint:errcheck
// Chat subscription (multiplayer only) // Chat subscription (multiplayer only)
var chatCh chan *nats.Msg var chatCh <-chan chat.Message
var chatSub *nats.Subscription var cleanupChat func()
if room != nil { if room != nil {
chatCh, chatSub, err = room.Subscribe() chatCh, cleanupChat = room.Subscribe()
if err != nil { defer cleanupChat()
return
}
defer chatSub.Unsubscribe() //nolint:errcheck
} }
ctx := r.Context() ctx := r.Context()
@@ -168,11 +165,10 @@ func HandleSnakeEvents(snakeStore *snake.SnakeStore, nc *nats.Conn, sm *scs.Sess
return return
} }
case msg := <-chatCh: case chatMsg, ok := <-chatCh:
if msg == nil { if !ok {
continue continue
} }
chatMsg, _ := room.Receive(msg.Data)
err := sse.PatchElementTempl( err := sse.PatchElementTempl(
chatcomponents.ChatMessage(chatMsg, chatCfg), chatcomponents.ChatMessage(chatMsg, chatCfg),
datastar.WithSelectorID("snake-chat-history"), datastar.WithSelectorID("snake-chat-history"),