Compare commits
1 Commits
6d078f3adb
...
729db559ea
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
729db559ea |
42
chat/chat.go
42
chat/chat.go
@@ -76,11 +76,12 @@ func (r *Room) Send(msg Message) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// receive processes an incoming NATS message, appending it to the buffer.
|
// Receive processes an incoming NATS message, appending it to the buffer.
|
||||||
func (r *Room) receive(data []byte) (Message, bool) {
|
// Returns the new message and a snapshot of all messages.
|
||||||
|
func (r *Room) Receive(data []byte) (Message, []Message) {
|
||||||
var msg Message
|
var msg Message
|
||||||
if err := json.Unmarshal(data, &msg); err != nil {
|
if err := json.Unmarshal(data, &msg); err != nil {
|
||||||
return msg, false
|
return msg, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
r.mu.Lock()
|
r.mu.Lock()
|
||||||
@@ -88,9 +89,11 @@ func (r *Room) receive(data []byte) (Message, bool) {
|
|||||||
if len(r.messages) > maxMessages {
|
if len(r.messages) > maxMessages {
|
||||||
r.messages = r.messages[len(r.messages)-maxMessages:]
|
r.messages = r.messages[len(r.messages)-maxMessages:]
|
||||||
}
|
}
|
||||||
|
snapshot := make([]Message, len(r.messages))
|
||||||
|
copy(snapshot, r.messages)
|
||||||
r.mu.Unlock()
|
r.mu.Unlock()
|
||||||
|
|
||||||
return msg, true
|
return msg, snapshot
|
||||||
}
|
}
|
||||||
|
|
||||||
// Messages returns a snapshot of the current message buffer.
|
// Messages returns a snapshot of the current message buffer.
|
||||||
@@ -102,32 +105,15 @@ func (r *Room) Messages() []Message {
|
|||||||
return snapshot
|
return snapshot
|
||||||
}
|
}
|
||||||
|
|
||||||
// Subscribe returns a channel of parsed messages and a cleanup function.
|
// Subscribe creates a NATS channel subscription for the room's subject.
|
||||||
// The room handles NATS subscription internally and buffers messages.
|
// Caller is responsible for unsubscribing.
|
||||||
func (r *Room) Subscribe() (<-chan Message, func()) {
|
func (r *Room) Subscribe() (chan *nats.Msg, *nats.Subscription, error) {
|
||||||
natsCh := make(chan *nats.Msg, 64)
|
ch := make(chan *nats.Msg, 64)
|
||||||
msgCh := make(chan Message, 64)
|
sub, err := r.nc.ChanSubscribe(r.subject, ch)
|
||||||
|
|
||||||
sub, err := r.nc.ChanSubscribe(r.subject, natsCh)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
close(msgCh)
|
return nil, nil, err
|
||||||
return msgCh, func() {}
|
|
||||||
}
|
}
|
||||||
|
return ch, sub, nil
|
||||||
go func() {
|
|
||||||
for natsMsg := range natsCh {
|
|
||||||
if msg, ok := r.receive(natsMsg.Data); ok {
|
|
||||||
msgCh <- msg
|
|
||||||
}
|
|
||||||
}
|
|
||||||
close(msgCh)
|
|
||||||
}()
|
|
||||||
|
|
||||||
cleanup := func() {
|
|
||||||
_ = sub.Unsubscribe()
|
|
||||||
}
|
|
||||||
|
|
||||||
return msgCh, cleanup
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Room) saveMessage(msg Message) {
|
func (r *Room) saveMessage(msg Message) {
|
||||||
|
|||||||
@@ -142,8 +142,11 @@ func HandleGameEvents(store *connect4.Store, nc *nats.Conn, sm *scs.SessionManag
|
|||||||
defer gameSub.Unsubscribe() //nolint:errcheck
|
defer gameSub.Unsubscribe() //nolint:errcheck
|
||||||
|
|
||||||
// Subscribe to chat messages
|
// Subscribe to chat messages
|
||||||
chatCh, cleanupChat := room.Subscribe()
|
chatCh, chatSub, err := room.Subscribe()
|
||||||
defer cleanupChat()
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer chatSub.Unsubscribe() //nolint:errcheck
|
||||||
|
|
||||||
ctx := r.Context()
|
ctx := r.Context()
|
||||||
for {
|
for {
|
||||||
@@ -158,7 +161,8 @@ func HandleGameEvents(store *connect4.Store, nc *nats.Conn, sm *scs.SessionManag
|
|||||||
if err := patchAll(); err != nil {
|
if err := patchAll(); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
case chatMsg := <-chatCh:
|
case msg := <-chatCh:
|
||||||
|
chatMsg, _ := room.Receive(msg.Data)
|
||||||
err := sse.PatchElementTempl(
|
err := sse.PatchElementTempl(
|
||||||
chatcomponents.ChatMessage(chatMsg, chatCfg),
|
chatcomponents.ChatMessage(chatMsg, chatCfg),
|
||||||
datastar.WithSelectorID("c4-chat-history"),
|
datastar.WithSelectorID("c4-chat-history"),
|
||||||
|
|||||||
@@ -148,12 +148,15 @@ func HandleSnakeEvents(snakeStore *snake.SnakeStore, nc *nats.Conn, sm *scs.Sess
|
|||||||
defer gameSub.Unsubscribe() //nolint:errcheck
|
defer gameSub.Unsubscribe() //nolint:errcheck
|
||||||
|
|
||||||
// Chat subscription (multiplayer only)
|
// Chat subscription (multiplayer only)
|
||||||
var chatCh <-chan chat.Message
|
var chatCh chan *nats.Msg
|
||||||
var cleanupChat func()
|
var chatSub *nats.Subscription
|
||||||
|
|
||||||
if room != nil {
|
if room != nil {
|
||||||
chatCh, cleanupChat = room.Subscribe()
|
chatCh, chatSub, err = room.Subscribe()
|
||||||
defer cleanupChat()
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer chatSub.Unsubscribe() //nolint:errcheck
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx := r.Context()
|
ctx := r.Context()
|
||||||
@@ -181,10 +184,11 @@ func HandleSnakeEvents(snakeStore *snake.SnakeStore, nc *nats.Conn, sm *scs.Sess
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
case chatMsg, ok := <-chatCh:
|
case msg := <-chatCh:
|
||||||
if !ok {
|
if msg == nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
chatMsg, _ := room.Receive(msg.Data)
|
||||||
err := sse.PatchElementTempl(
|
err := sse.PatchElementTempl(
|
||||||
chatcomponents.ChatMessage(chatMsg, chatCfg),
|
chatcomponents.ChatMessage(chatMsg, chatCfg),
|
||||||
datastar.WithSelectorID("snake-chat-history"),
|
datastar.WithSelectorID("snake-chat-history"),
|
||||||
|
|||||||
Reference in New Issue
Block a user