Room.Subscribe() now returns a channel of parsed Message structs instead of raw NATS messages. The room handles NATS subscription and message parsing internally, so callers no longer need to call Receive() separately.
164 lines
4.0 KiB
Go
164 lines
4.0 KiB
Go
// Package chat provides a reusable chat room backed by NATS pub/sub
|
|
// with optional database persistence.
|
|
package chat
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"slices"
|
|
"sync"
|
|
|
|
"github.com/nats-io/nats.go"
|
|
"github.com/rs/zerolog/log"
|
|
|
|
"github.com/ryanhamamura/games/db/repository"
|
|
)
|
|
|
|
// Message is the wire format for chat messages over NATS.
|
|
type Message struct {
|
|
Nickname string `json:"nickname"`
|
|
Slot int `json:"slot"` // player slot/color index
|
|
Message string `json:"message"`
|
|
Time int64 `json:"time"` // unix millis, zero for ephemeral messages
|
|
}
|
|
|
|
const maxMessages = 50
|
|
|
|
// Room manages an in-memory message buffer and NATS pub/sub for a single
|
|
// chat room (typically one per game). When created with NewPersistentRoom,
|
|
// messages are automatically loaded from and saved to the database.
|
|
type Room struct {
|
|
subject string
|
|
nc *nats.Conn
|
|
messages []Message
|
|
mu sync.Mutex
|
|
|
|
// Optional persistence; nil for ephemeral rooms (e.g. snake).
|
|
queries *repository.Queries
|
|
roomID string
|
|
}
|
|
|
|
// NewRoom creates an ephemeral chat room with no database persistence.
|
|
func NewRoom(nc *nats.Conn, subject string) *Room {
|
|
return &Room{
|
|
subject: subject,
|
|
nc: nc,
|
|
}
|
|
}
|
|
|
|
// NewPersistentRoom creates a chat room backed by the database. It loads
|
|
// existing messages on creation and auto-saves new messages on Send.
|
|
func NewPersistentRoom(nc *nats.Conn, subject string, queries *repository.Queries, roomID string) *Room {
|
|
r := &Room{
|
|
subject: subject,
|
|
nc: nc,
|
|
queries: queries,
|
|
roomID: roomID,
|
|
}
|
|
r.messages = r.loadMessages()
|
|
return r
|
|
}
|
|
|
|
// Send publishes a message to the room's NATS subject and persists it
|
|
// if the room is backed by a database.
|
|
func (r *Room) Send(msg Message) {
|
|
if r.queries != nil {
|
|
r.saveMessage(msg)
|
|
}
|
|
|
|
data, err := json.Marshal(msg)
|
|
if err != nil {
|
|
log.Error().Err(err).Str("subject", r.subject).Msg("failed to marshal chat message")
|
|
return
|
|
}
|
|
if err := r.nc.Publish(r.subject, data); err != nil {
|
|
log.Error().Err(err).Str("subject", r.subject).Msg("failed to publish chat message")
|
|
}
|
|
}
|
|
|
|
// receive processes an incoming NATS message, appending it to the buffer.
|
|
func (r *Room) receive(data []byte) (Message, bool) {
|
|
var msg Message
|
|
if err := json.Unmarshal(data, &msg); err != nil {
|
|
return msg, false
|
|
}
|
|
|
|
r.mu.Lock()
|
|
r.messages = append(r.messages, msg)
|
|
if len(r.messages) > maxMessages {
|
|
r.messages = r.messages[len(r.messages)-maxMessages:]
|
|
}
|
|
r.mu.Unlock()
|
|
|
|
return msg, true
|
|
}
|
|
|
|
// Messages returns a snapshot of the current message buffer.
|
|
func (r *Room) Messages() []Message {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
snapshot := make([]Message, len(r.messages))
|
|
copy(snapshot, r.messages)
|
|
return snapshot
|
|
}
|
|
|
|
// Subscribe returns a channel of parsed messages and a cleanup function.
|
|
// The room handles NATS subscription internally and buffers messages.
|
|
func (r *Room) Subscribe() (<-chan Message, func()) {
|
|
natsCh := make(chan *nats.Msg, 64)
|
|
msgCh := make(chan Message, 64)
|
|
|
|
sub, err := r.nc.ChanSubscribe(r.subject, natsCh)
|
|
if err != nil {
|
|
close(msgCh)
|
|
return msgCh, func() {}
|
|
}
|
|
|
|
go func() {
|
|
for natsMsg := range natsCh {
|
|
if msg, ok := r.receive(natsMsg.Data); ok {
|
|
msgCh <- msg
|
|
}
|
|
}
|
|
close(msgCh)
|
|
}()
|
|
|
|
cleanup := func() {
|
|
_ = sub.Unsubscribe()
|
|
}
|
|
|
|
return msgCh, cleanup
|
|
}
|
|
|
|
func (r *Room) saveMessage(msg Message) {
|
|
err := r.queries.CreateChatMessage(context.Background(), repository.CreateChatMessageParams{
|
|
GameID: r.roomID,
|
|
Nickname: msg.Nickname,
|
|
Color: int64(msg.Slot),
|
|
Message: msg.Message,
|
|
CreatedAt: msg.Time,
|
|
})
|
|
if err != nil {
|
|
log.Error().Err(err).Str("room_id", r.roomID).Msg("failed to save chat message")
|
|
}
|
|
}
|
|
|
|
func (r *Room) loadMessages() []Message {
|
|
rows, err := r.queries.GetChatMessages(context.Background(), r.roomID)
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
msgs := make([]Message, len(rows))
|
|
for i, row := range rows {
|
|
msgs[i] = Message{
|
|
Nickname: row.Nickname,
|
|
Slot: int(row.Color),
|
|
Message: row.Message,
|
|
Time: row.CreatedAt,
|
|
}
|
|
}
|
|
// DB returns newest-first; reverse for chronological display
|
|
slices.Reverse(msgs)
|
|
return msgs
|
|
}
|