Files
games/chat/chat.go
Ryan Hamamura bcb1fa3872
All checks were successful
CI / Deploy / test (pull_request) Successful in 14s
CI / Deploy / lint (pull_request) Successful in 25s
CI / Deploy / deploy (pull_request) Has been skipped
refactor: simplify chat subscription API
Room.Subscribe() now returns a channel of parsed Message structs
instead of raw NATS messages. The room handles NATS subscription
and message parsing internally, so callers no longer need to call
Receive() separately.
2026-03-03 09:45:56 -10:00

164 lines
4.0 KiB
Go

// Package chat provides a reusable chat room backed by NATS pub/sub
// with optional database persistence.
package chat
import (
"context"
"encoding/json"
"slices"
"sync"
"github.com/nats-io/nats.go"
"github.com/rs/zerolog/log"
"github.com/ryanhamamura/games/db/repository"
)
// Message is the wire format for chat messages over NATS. Send encodes it
// as JSON using the tags below and receive decodes it the same way. For
// persistent rooms, saveMessage stores Slot as Color and Time as CreatedAt.
type Message struct {
Nickname string `json:"nickname"`
Slot int `json:"slot"` // player slot/color index
Message string `json:"message"`
Time int64 `json:"time"` // unix millis, zero for ephemeral messages
}
// maxMessages caps the in-memory buffer: once receive has appended a
// message, only the newest maxMessages entries are kept.
const maxMessages = 50
// Room is a single chat room (typically one per game): an in-memory
// message buffer fed by NATS pub/sub. Rooms built with NewPersistentRoom
// additionally load their history from, and save sent messages to, the
// database.
type Room struct {
	// NATS connection and the subject this room publishes and subscribes on.
	nc      *nats.Conn
	subject string

	// mu guards messages.
	mu       sync.Mutex
	messages []Message

	// Optional persistence; queries is nil for ephemeral rooms (e.g. snake).
	// roomID identifies this room's rows when saving and loading.
	queries *repository.Queries
	roomID  string
}
// NewRoom returns an ephemeral chat room bound to the given NATS connection
// and subject. No persistence is configured, so Send only publishes.
func NewRoom(nc *nats.Conn, subject string) *Room {
	room := new(Room)
	room.nc = nc
	room.subject = subject
	return room
}
// NewPersistentRoom returns a chat room whose history lives in the database.
// The buffer is seeded from the rows stored under roomID at construction
// time, and every later Send also writes the message through queries.
func NewPersistentRoom(nc *nats.Conn, subject string, queries *repository.Queries, roomID string) *Room {
	room := NewRoom(nc, subject)
	room.queries = queries
	room.roomID = roomID
	// Persistence fields must be set before loading: loadMessages reads both.
	room.messages = room.loadMessages()
	return room
}
// Send persists msg when the room has a database behind it, then publishes
// the JSON-encoded message on the room's NATS subject. Marshal and publish
// failures are logged rather than returned.
func (r *Room) Send(msg Message) {
	if persistent := r.queries != nil; persistent {
		r.saveMessage(msg)
	}

	payload, marshalErr := json.Marshal(msg)
	if marshalErr != nil {
		log.Error().Err(marshalErr).Str("subject", r.subject).Msg("failed to marshal chat message")
		return
	}

	pubErr := r.nc.Publish(r.subject, payload)
	if pubErr == nil {
		return
	}
	log.Error().Err(pubErr).Str("subject", r.subject).Msg("failed to publish chat message")
}
// receive decodes a raw NATS payload into a Message and appends it to the
// room's buffer, keeping only the newest maxMessages entries. It reports
// false, leaving the buffer untouched, when the payload is not valid JSON.
func (r *Room) receive(data []byte) (Message, bool) {
	var decoded Message
	if json.Unmarshal(data, &decoded) != nil {
		return decoded, false
	}

	r.mu.Lock()
	defer r.mu.Unlock()

	buffered := append(r.messages, decoded)
	if overflow := len(buffered) - maxMessages; overflow > 0 {
		// Drop the oldest entries from the front.
		buffered = buffered[overflow:]
	}
	r.messages = buffered
	return decoded, true
}
// Messages returns a copy of the buffered messages taken under the room's
// lock, so callers can read or modify the result without affecting the room.
func (r *Room) Messages() []Message {
	r.mu.Lock()
	// Appending onto a fresh, pre-sized slice yields a non-nil copy even
	// when the buffer is empty.
	out := append(make([]Message, 0, len(r.messages)), r.messages...)
	r.mu.Unlock()
	return out
}
// Subscribe returns a channel of parsed messages and a cleanup function.
// The room subscribes to its NATS subject internally; every payload that
// decodes successfully is appended to the room's buffer and then forwarded
// on the returned channel. Malformed payloads are skipped.
//
// The returned channel is closed once cleanup has been called, or
// immediately if the NATS subscription could not be created (the error is
// logged). cleanup is safe to call more than once and must be called to
// release the subscription and its forwarding goroutine.
//
// NOTE(review): each subscription calls receive independently, so with
// several concurrent subscribers every message is appended to the shared
// buffer once per subscriber.
func (r *Room) Subscribe() (<-chan Message, func()) {
	natsCh := make(chan *nats.Msg, 64)
	msgCh := make(chan Message, 64)

	sub, err := r.nc.ChanSubscribe(r.subject, natsCh)
	if err != nil {
		log.Error().Err(err).Str("subject", r.subject).Msg("failed to subscribe to chat subject")
		close(msgCh)
		return msgCh, func() {}
	}

	// done tells the forwarding goroutine to stop. Unsubscribe does not
	// close natsCh, so ranging over natsCh alone would block forever after
	// cleanup and leak the goroutine.
	done := make(chan struct{})

	go func() {
		defer close(msgCh)
		for {
			select {
			case <-done:
				return
			case natsMsg, ok := <-natsCh:
				if !ok {
					return
				}
				msg, parsed := r.receive(natsMsg.Data)
				if !parsed {
					continue
				}
				// Also watch done while sending so a consumer that has
				// stopped reading cannot pin this goroutine.
				select {
				case msgCh <- msg:
				case <-done:
					return
				}
			}
		}
	}()

	var once sync.Once
	cleanup := func() {
		once.Do(func() {
			// Best effort: the subscription may already be invalid if the
			// connection was closed, and there is nothing useful to do then.
			_ = sub.Unsubscribe()
			close(done)
		})
	}
	return msgCh, cleanup
}
// saveMessage writes msg to the database under the room's ID, storing Slot
// as Color and Time as CreatedAt. A failed write is logged, not returned.
func (r *Room) saveMessage(msg Message) {
	params := repository.CreateChatMessageParams{
		GameID:    r.roomID,
		Nickname:  msg.Nickname,
		Color:     int64(msg.Slot),
		Message:   msg.Message,
		CreatedAt: msg.Time,
	}
	if err := r.queries.CreateChatMessage(context.Background(), params); err != nil {
		log.Error().Err(err).Str("room_id", r.roomID).Msg("failed to save chat message")
	}
}
// loadMessages reads the room's stored chat history and returns it in
// chronological order (oldest first), limited to the newest maxMessages
// entries so the initial buffer obeys the same cap receive enforces.
// It returns nil, after logging the error, when the query fails.
func (r *Room) loadMessages() []Message {
	rows, err := r.queries.GetChatMessages(context.Background(), r.roomID)
	if err != nil {
		// Log the failure like saveMessage does; the room then starts
		// with an empty history.
		log.Error().Err(err).Str("room_id", r.roomID).Msg("failed to load chat messages")
		return nil
	}

	// DB returns newest-first, so the leading rows are the ones to keep.
	if len(rows) > maxMessages {
		rows = rows[:maxMessages]
	}

	msgs := make([]Message, len(rows))
	for i, row := range rows {
		msgs[i] = Message{
			Nickname: row.Nickname,
			Slot:     int(row.Color),
			Message:  row.Message,
			Time:     row.CreatedAt,
		}
	}
	// Reverse newest-first into chronological order for display.
	slices.Reverse(msgs)
	return msgs
}