Move SaveMessage/LoadMessages logic into Room as private methods. NewPersistentRoom auto-loads history and auto-saves on Send, removing the need for handlers to coordinate persistence separately.
150 lines
3.9 KiB
Go
150 lines
3.9 KiB
Go
// Package chat provides a reusable chat room backed by NATS pub/sub
|
|
// with optional database persistence.
|
|
package chat
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"slices"
|
|
"sync"
|
|
|
|
"github.com/nats-io/nats.go"
|
|
"github.com/rs/zerolog/log"
|
|
|
|
"github.com/ryanhamamura/games/db/repository"
|
|
)
|
|
|
|
// Message is a single chat entry as it is JSON-encoded and exchanged over
// NATS.
type Message struct {
	// Nickname is the sender's display name.
	Nickname string `json:"nickname"`
	// Slot is the player slot/color index.
	Slot int `json:"slot"`
	// Message is the chat text itself.
	Message string `json:"message"`
	// Time is in unix milliseconds; zero for ephemeral messages.
	Time int64 `json:"time"`
}
|
|
|
|
// maxMessages is the maximum number of messages a Room keeps in its
// in-memory buffer; Receive drops the oldest entries beyond this limit.
const maxMessages = 50
|
|
|
|
// Room manages an in-memory message buffer and NATS pub/sub for a single
|
|
// chat room (typically one per game). When created with NewPersistentRoom,
|
|
// messages are automatically loaded from and saved to the database.
|
|
type Room struct {
|
|
subject string
|
|
nc *nats.Conn
|
|
messages []Message
|
|
mu sync.Mutex
|
|
|
|
// Optional persistence; nil for ephemeral rooms (e.g. snake).
|
|
queries *repository.Queries
|
|
roomID string
|
|
}
|
|
|
|
// NewRoom creates an ephemeral chat room with no database persistence.
|
|
func NewRoom(nc *nats.Conn, subject string) *Room {
|
|
return &Room{
|
|
subject: subject,
|
|
nc: nc,
|
|
}
|
|
}
|
|
|
|
// NewPersistentRoom creates a chat room backed by the database. It loads
|
|
// existing messages on creation and auto-saves new messages on Send.
|
|
func NewPersistentRoom(nc *nats.Conn, subject string, queries *repository.Queries, roomID string) *Room {
|
|
r := &Room{
|
|
subject: subject,
|
|
nc: nc,
|
|
queries: queries,
|
|
roomID: roomID,
|
|
}
|
|
r.messages = r.loadMessages()
|
|
return r
|
|
}
|
|
|
|
// Send publishes a message to the room's NATS subject and persists it
|
|
// if the room is backed by a database.
|
|
func (r *Room) Send(msg Message) {
|
|
if r.queries != nil {
|
|
r.saveMessage(msg)
|
|
}
|
|
|
|
data, err := json.Marshal(msg)
|
|
if err != nil {
|
|
log.Error().Err(err).Str("subject", r.subject).Msg("failed to marshal chat message")
|
|
return
|
|
}
|
|
if err := r.nc.Publish(r.subject, data); err != nil {
|
|
log.Error().Err(err).Str("subject", r.subject).Msg("failed to publish chat message")
|
|
}
|
|
}
|
|
|
|
// Receive processes an incoming NATS message, appending it to the buffer.
|
|
// Returns the new message and a snapshot of all messages.
|
|
func (r *Room) Receive(data []byte) (Message, []Message) {
|
|
var msg Message
|
|
if err := json.Unmarshal(data, &msg); err != nil {
|
|
return msg, nil
|
|
}
|
|
|
|
r.mu.Lock()
|
|
r.messages = append(r.messages, msg)
|
|
if len(r.messages) > maxMessages {
|
|
r.messages = r.messages[len(r.messages)-maxMessages:]
|
|
}
|
|
snapshot := make([]Message, len(r.messages))
|
|
copy(snapshot, r.messages)
|
|
r.mu.Unlock()
|
|
|
|
return msg, snapshot
|
|
}
|
|
|
|
// Messages returns a snapshot of the current message buffer.
|
|
func (r *Room) Messages() []Message {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
snapshot := make([]Message, len(r.messages))
|
|
copy(snapshot, r.messages)
|
|
return snapshot
|
|
}
|
|
|
|
// Subscribe creates a NATS channel subscription for the room's subject.
|
|
// Caller is responsible for unsubscribing.
|
|
func (r *Room) Subscribe() (chan *nats.Msg, *nats.Subscription, error) {
|
|
ch := make(chan *nats.Msg, 64)
|
|
sub, err := r.nc.ChanSubscribe(r.subject, ch)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
return ch, sub, nil
|
|
}
|
|
|
|
func (r *Room) saveMessage(msg Message) {
|
|
err := r.queries.CreateChatMessage(context.Background(), repository.CreateChatMessageParams{
|
|
GameID: r.roomID,
|
|
Nickname: msg.Nickname,
|
|
Color: int64(msg.Slot),
|
|
Message: msg.Message,
|
|
CreatedAt: msg.Time,
|
|
})
|
|
if err != nil {
|
|
log.Error().Err(err).Str("room_id", r.roomID).Msg("failed to save chat message")
|
|
}
|
|
}
|
|
|
|
func (r *Room) loadMessages() []Message {
|
|
rows, err := r.queries.GetChatMessages(context.Background(), r.roomID)
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
msgs := make([]Message, len(rows))
|
|
for i, row := range rows {
|
|
msgs[i] = Message{
|
|
Nickname: row.Nickname,
|
|
Slot: int(row.Color),
|
|
Message: row.Message,
|
|
Time: row.CreatedAt,
|
|
}
|
|
}
|
|
// DB returns newest-first; reverse for chronological display
|
|
slices.Reverse(msgs)
|
|
return msgs
|
|
}
|