Compare commits
3 Commits
729db559ea
...
6d078f3adb
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6d078f3adb | ||
| ffbff8cca5 | |||
|
|
bcb1fa3872 |
42
chat/chat.go
42
chat/chat.go
@@ -76,12 +76,11 @@ func (r *Room) Send(msg Message) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Receive processes an incoming NATS message, appending it to the buffer.
|
// receive processes an incoming NATS message, appending it to the buffer.
|
||||||
// Returns the new message and a snapshot of all messages.
|
func (r *Room) receive(data []byte) (Message, bool) {
|
||||||
func (r *Room) Receive(data []byte) (Message, []Message) {
|
|
||||||
var msg Message
|
var msg Message
|
||||||
if err := json.Unmarshal(data, &msg); err != nil {
|
if err := json.Unmarshal(data, &msg); err != nil {
|
||||||
return msg, nil
|
return msg, false
|
||||||
}
|
}
|
||||||
|
|
||||||
r.mu.Lock()
|
r.mu.Lock()
|
||||||
@@ -89,11 +88,9 @@ func (r *Room) Receive(data []byte) (Message, []Message) {
|
|||||||
if len(r.messages) > maxMessages {
|
if len(r.messages) > maxMessages {
|
||||||
r.messages = r.messages[len(r.messages)-maxMessages:]
|
r.messages = r.messages[len(r.messages)-maxMessages:]
|
||||||
}
|
}
|
||||||
snapshot := make([]Message, len(r.messages))
|
|
||||||
copy(snapshot, r.messages)
|
|
||||||
r.mu.Unlock()
|
r.mu.Unlock()
|
||||||
|
|
||||||
return msg, snapshot
|
return msg, true
|
||||||
}
|
}
|
||||||
|
|
||||||
// Messages returns a snapshot of the current message buffer.
|
// Messages returns a snapshot of the current message buffer.
|
||||||
@@ -105,15 +102,32 @@ func (r *Room) Messages() []Message {
|
|||||||
return snapshot
|
return snapshot
|
||||||
}
|
}
|
||||||
|
|
||||||
// Subscribe creates a NATS channel subscription for the room's subject.
|
// Subscribe returns a channel of parsed messages and a cleanup function.
|
||||||
// Caller is responsible for unsubscribing.
|
// The room handles NATS subscription internally and buffers messages.
|
||||||
func (r *Room) Subscribe() (chan *nats.Msg, *nats.Subscription, error) {
|
func (r *Room) Subscribe() (<-chan Message, func()) {
|
||||||
ch := make(chan *nats.Msg, 64)
|
natsCh := make(chan *nats.Msg, 64)
|
||||||
sub, err := r.nc.ChanSubscribe(r.subject, ch)
|
msgCh := make(chan Message, 64)
|
||||||
|
|
||||||
|
sub, err := r.nc.ChanSubscribe(r.subject, natsCh)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
close(msgCh)
|
||||||
|
return msgCh, func() {}
|
||||||
}
|
}
|
||||||
return ch, sub, nil
|
|
||||||
|
go func() {
|
||||||
|
for natsMsg := range natsCh {
|
||||||
|
if msg, ok := r.receive(natsMsg.Data); ok {
|
||||||
|
msgCh <- msg
|
||||||
|
}
|
||||||
|
}
|
||||||
|
close(msgCh)
|
||||||
|
}()
|
||||||
|
|
||||||
|
cleanup := func() {
|
||||||
|
_ = sub.Unsubscribe()
|
||||||
|
}
|
||||||
|
|
||||||
|
return msgCh, cleanup
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Room) saveMessage(msg Message) {
|
func (r *Room) saveMessage(msg Message) {
|
||||||
|
|||||||
@@ -118,11 +118,21 @@ func HandleGameEvents(store *connect4.Store, nc *nats.Conn, sm *scs.SessionManag
|
|||||||
return sse.PatchElementTempl(pages.GameContent(g, myColor, room.Messages(), chatCfg))
|
return sse.PatchElementTempl(pages.GameContent(g, myColor, room.Messages(), chatCfg))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send initial render
|
sendPing := func() error {
|
||||||
|
return sse.PatchSignals([]byte(fmt.Sprintf(`{"lastPing":%d}`, time.Now().UnixMilli())))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send initial render and ping
|
||||||
|
if err := sendPing(); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
if err := patchAll(); err != nil {
|
if err := patchAll(); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
heartbeat := time.NewTicker(15 * time.Second)
|
||||||
|
defer heartbeat.Stop()
|
||||||
|
|
||||||
// Subscribe to game state updates
|
// Subscribe to game state updates
|
||||||
gameCh := make(chan *nats.Msg, 64)
|
gameCh := make(chan *nats.Msg, 64)
|
||||||
gameSub, err := nc.ChanSubscribe(connect4.GameSubject(gameID), gameCh)
|
gameSub, err := nc.ChanSubscribe(connect4.GameSubject(gameID), gameCh)
|
||||||
@@ -132,23 +142,23 @@ func HandleGameEvents(store *connect4.Store, nc *nats.Conn, sm *scs.SessionManag
|
|||||||
defer gameSub.Unsubscribe() //nolint:errcheck
|
defer gameSub.Unsubscribe() //nolint:errcheck
|
||||||
|
|
||||||
// Subscribe to chat messages
|
// Subscribe to chat messages
|
||||||
chatCh, chatSub, err := room.Subscribe()
|
chatCh, cleanupChat := room.Subscribe()
|
||||||
if err != nil {
|
defer cleanupChat()
|
||||||
return
|
|
||||||
}
|
|
||||||
defer chatSub.Unsubscribe() //nolint:errcheck
|
|
||||||
|
|
||||||
ctx := r.Context()
|
ctx := r.Context()
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
|
case <-heartbeat.C:
|
||||||
|
if err := sendPing(); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
case <-gameCh:
|
case <-gameCh:
|
||||||
if err := patchAll(); err != nil {
|
if err := patchAll(); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
case msg := <-chatCh:
|
case chatMsg := <-chatCh:
|
||||||
chatMsg, _ := room.Receive(msg.Data)
|
|
||||||
err := sse.PatchElementTempl(
|
err := sse.PatchElementTempl(
|
||||||
chatcomponents.ChatMessage(chatMsg, chatCfg),
|
chatcomponents.ChatMessage(chatMsg, chatCfg),
|
||||||
datastar.WithSelectorID("c4-chat-history"),
|
datastar.WithSelectorID("c4-chat-history"),
|
||||||
|
|||||||
@@ -15,9 +15,10 @@ templ GamePage(g *connect4.Game, myColor int, messages []chat.Message, chatCfg c
|
|||||||
@layouts.Base("Connect 4") {
|
@layouts.Base("Connect 4") {
|
||||||
<main
|
<main
|
||||||
class="flex flex-col items-center gap-4 p-4"
|
class="flex flex-col items-center gap-4 p-4"
|
||||||
data-signals="{chatMsg: ''}"
|
data-signals="{chatMsg: '', lastPing: 0}"
|
||||||
data-init={ fmt.Sprintf("@get('/games/%s/events',{requestCancellation:'disabled'})", g.ID) }
|
data-init={ fmt.Sprintf("@get('/games/%s/events',{requestCancellation:'disabled'})", g.ID) }
|
||||||
>
|
>
|
||||||
|
@sharedcomponents.ConnectionIndicator()
|
||||||
@GameContent(g, myColor, messages, chatCfg)
|
@GameContent(g, myColor, messages, chatCfg)
|
||||||
</main>
|
</main>
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -44,6 +44,21 @@ templ NicknamePrompt(returnPath string) {
|
|||||||
</main>
|
</main>
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ConnectionIndicator shows a small dot indicating SSE connection status.
|
||||||
|
// It requires a `lastPing` signal (unix ms timestamp) to be set by the server.
|
||||||
|
templ ConnectionIndicator() {
|
||||||
|
<div
|
||||||
|
id="connection-indicator"
|
||||||
|
class="fixed top-2 right-2 flex items-center gap-1 text-xs text-gray-500"
|
||||||
|
title="Connection status"
|
||||||
|
>
|
||||||
|
<span
|
||||||
|
class="w-2 h-2 rounded-full transition-colors duration-300"
|
||||||
|
data-class="{'bg-green-500': Date.now() - $lastPing < 20000, 'bg-red-500': Date.now() - $lastPing >= 20000}"
|
||||||
|
></span>
|
||||||
|
</div>
|
||||||
|
}
|
||||||
|
|
||||||
templ GameJoinPrompt(loginURL string, registerURL string, gamePath string) {
|
templ GameJoinPrompt(loginURL string, registerURL string, gamePath string) {
|
||||||
<main class="max-w-sm mx-auto mt-8 text-center">
|
<main class="max-w-sm mx-auto mt-8 text-center">
|
||||||
<h1 class="text-3xl font-bold">Join Game</h1>
|
<h1 class="text-3xl font-bold">Join Game</h1>
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/alexedwards/scs/v2"
|
"github.com/alexedwards/scs/v2"
|
||||||
"github.com/go-chi/chi/v5"
|
"github.com/go-chi/chi/v5"
|
||||||
@@ -123,11 +124,21 @@ func HandleSnakeEvents(snakeStore *snake.SnakeStore, nc *nats.Conn, sm *scs.Sess
|
|||||||
return sse.PatchElementTempl(pages.GameContent(sg, mySlot, chatMessages(), chatCfg, gameID))
|
return sse.PatchElementTempl(pages.GameContent(sg, mySlot, chatMessages(), chatCfg, gameID))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send initial render
|
sendPing := func() error {
|
||||||
|
return sse.PatchSignals([]byte(fmt.Sprintf(`{"lastPing":%d}`, time.Now().UnixMilli())))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send initial render and ping
|
||||||
|
if err := sendPing(); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
if err := patchAll(); err != nil {
|
if err := patchAll(); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
heartbeat := time.NewTicker(15 * time.Second)
|
||||||
|
defer heartbeat.Stop()
|
||||||
|
|
||||||
// Subscribe to game updates via NATS
|
// Subscribe to game updates via NATS
|
||||||
gameCh := make(chan *nats.Msg, 64)
|
gameCh := make(chan *nats.Msg, 64)
|
||||||
gameSub, err := nc.ChanSubscribe(snake.GameSubject(gameID), gameCh)
|
gameSub, err := nc.ChanSubscribe(snake.GameSubject(gameID), gameCh)
|
||||||
@@ -137,15 +148,12 @@ func HandleSnakeEvents(snakeStore *snake.SnakeStore, nc *nats.Conn, sm *scs.Sess
|
|||||||
defer gameSub.Unsubscribe() //nolint:errcheck
|
defer gameSub.Unsubscribe() //nolint:errcheck
|
||||||
|
|
||||||
// Chat subscription (multiplayer only)
|
// Chat subscription (multiplayer only)
|
||||||
var chatCh chan *nats.Msg
|
var chatCh <-chan chat.Message
|
||||||
var chatSub *nats.Subscription
|
var cleanupChat func()
|
||||||
|
|
||||||
if room != nil {
|
if room != nil {
|
||||||
chatCh, chatSub, err = room.Subscribe()
|
chatCh, cleanupChat = room.Subscribe()
|
||||||
if err != nil {
|
defer cleanupChat()
|
||||||
return
|
|
||||||
}
|
|
||||||
defer chatSub.Unsubscribe() //nolint:errcheck
|
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx := r.Context()
|
ctx := r.Context()
|
||||||
@@ -154,6 +162,11 @@ func HandleSnakeEvents(snakeStore *snake.SnakeStore, nc *nats.Conn, sm *scs.Sess
|
|||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
|
|
||||||
|
case <-heartbeat.C:
|
||||||
|
if err := sendPing(); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
case <-gameCh:
|
case <-gameCh:
|
||||||
// Drain backed-up game updates
|
// Drain backed-up game updates
|
||||||
for {
|
for {
|
||||||
@@ -168,11 +181,10 @@ func HandleSnakeEvents(snakeStore *snake.SnakeStore, nc *nats.Conn, sm *scs.Sess
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
case msg := <-chatCh:
|
case chatMsg, ok := <-chatCh:
|
||||||
if msg == nil {
|
if !ok {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
chatMsg, _ := room.Receive(msg.Data)
|
|
||||||
err := sse.PatchElementTempl(
|
err := sse.PatchElementTempl(
|
||||||
chatcomponents.ChatMessage(chatMsg, chatCfg),
|
chatcomponents.ChatMessage(chatMsg, chatCfg),
|
||||||
datastar.WithSelectorID("snake-chat-history"),
|
datastar.WithSelectorID("snake-chat-history"),
|
||||||
|
|||||||
@@ -32,11 +32,12 @@ templ GamePage(sg *snake.SnakeGame, mySlot int, messages []chat.Message, chatCfg
|
|||||||
@layouts.Base("Snake") {
|
@layouts.Base("Snake") {
|
||||||
<main
|
<main
|
||||||
class="snake-wrapper flex flex-col items-center gap-4 p-4"
|
class="snake-wrapper flex flex-col items-center gap-4 p-4"
|
||||||
data-signals={ `{"chatMsg":""}` }
|
data-signals={ `{"chatMsg":"","lastPing":0}` }
|
||||||
data-init={ fmt.Sprintf("@get('/snake/%s/events',{requestCancellation:'disabled'})", gameID) }
|
data-init={ fmt.Sprintf("@get('/snake/%s/events',{requestCancellation:'disabled'})", gameID) }
|
||||||
data-on:keydown__throttle.100ms={ keydownScript(gameID) }
|
data-on:keydown__throttle.100ms={ keydownScript(gameID) }
|
||||||
tabindex="0"
|
tabindex="0"
|
||||||
>
|
>
|
||||||
|
@components.ConnectionIndicator()
|
||||||
@GameContent(sg, mySlot, messages, chatCfg, gameID)
|
@GameContent(sg, mySlot, messages, chatCfg, gameID)
|
||||||
</main>
|
</main>
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user