From 88bd0f31df31997a270583f55868c15ceb92b6e5 Mon Sep 17 00:00:00 2001 From: Ryan Hamamura <58859899+ryanhamamura@users.noreply.github.com> Date: Fri, 16 Jan 2026 00:50:05 -1000 Subject: [PATCH] feat: add NATS chatroom example with embedded server Demonstrates pub/sub messaging as an alternative to custom Rooms implementation. Uses delaneyj/toolbelt/embeddednats to run NATS with JetStream inside the binary - no external server required. --- .gitignore | 4 + go.mod | 23 +- go.sum | 51 ++- internal/examples/nats-chatroom/README.md | 109 ++++++ internal/examples/nats-chatroom/main.go | 409 ++++++++++++++++++++++ 5 files changed, 585 insertions(+), 11 deletions(-) create mode 100644 internal/examples/nats-chatroom/README.md create mode 100644 internal/examples/nats-chatroom/main.go diff --git a/.gitignore b/.gitignore index f45c3d1..3d6612f 100644 --- a/.gitignore +++ b/.gitignore @@ -47,3 +47,7 @@ internal/examples/picocss/picocss internal/examples/plugins/plugins internal/examples/realtimechart/realtimechart internal/examples/shakespeare/shakespeare +internal/examples/nats-chatroom/nats-chatroom + +# NATS data directory +data/ diff --git a/go.mod b/go.mod index 7668c1a..e744470 100644 --- a/go.mod +++ b/go.mod @@ -8,17 +8,32 @@ require ( github.com/DATA-DOG/go-sqlmock v1.5.2 github.com/alexedwards/scs/sqlite3store v0.0.0-20251002162104-209de6e426de github.com/alexedwards/scs/v2 v2.9.0 + github.com/delaneyj/toolbelt v0.9.1 github.com/mattn/go-sqlite3 v1.14.32 + github.com/nats-io/nats.go v1.48.0 github.com/starfederation/datastar-go v1.0.3 - github.com/stretchr/testify v1.10.0 + github.com/stretchr/testify v1.11.1 ) require ( github.com/CAFxX/httpcompression v0.0.9 // indirect github.com/andybalholm/brotli v1.2.0 // indirect - github.com/davecgh/go-spew v1.1.1 // indirect - github.com/klauspost/compress v1.18.0 // indirect - github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/antithesishq/antithesis-sdk-go v0.5.0 // indirect + github.com/cenkalti/backoff v2.2.1+incompatible // indirect + github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect + github.com/google/go-tpm v0.9.7 // indirect + github.com/klauspost/compress v1.18.2 // indirect + github.com/kr/text v0.2.0 // indirect + github.com/minio/highwayhash v1.0.4-0.20251030100505-070ab1a87a76 // indirect + github.com/nats-io/jwt/v2 v2.8.0 // indirect + github.com/nats-io/nats-server/v2 v2.12.2 // indirect + github.com/nats-io/nkeys v0.4.12 // indirect + github.com/nats-io/nuid v1.0.1 // indirect + github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect + github.com/rogpeppe/go-internal v1.14.1 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect + golang.org/x/crypto v0.45.0 // indirect + golang.org/x/sys v0.38.0 // indirect + golang.org/x/time v0.14.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 470891f..05cc239 100644 --- a/go.sum +++ b/go.sum @@ -9,23 +9,52 @@ github.com/alexedwards/scs/v2 v2.9.0/go.mod h1:ToaROZxyKukJKT/xLcVQAChi5k6+Pn1Gv github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= github.com/andybalholm/brotli v1.2.0 h1:ukwgCxwYrmACq68yiUqwIWnGY0cTPox/M94sVwToPjQ= github.com/andybalholm/brotli v1.2.0/go.mod h1:rzTDkvFWvIrjDXZHkuS16NPggd91W3kUSvPlQ1pLaKY= +github.com/antithesishq/antithesis-sdk-go v0.5.0 h1:cudCFF83pDDANcXFzkQPUHHedfnnIbUO3JMr9fqwFJs= +github.com/antithesishq/antithesis-sdk-go v0.5.0/go.mod h1:IUpT2DPAKh6i/YhSbt6Gl3v2yvUZjmKncl7U91fup7E= +github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4= +github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/delaneyj/toolbelt v0.9.1 h1:QJComn2qoaQ4azl5uRkGpdHSO9e+JtoxDTXCiQHvH8o= +github.com/delaneyj/toolbelt v0.9.1/go.mod h1:eNXpPuThjTD4tpRNCBl4JEz9jdg9LpyzNuyG+stnIbs= github.com/google/brotli/go/cbrotli v0.0.0-20230829110029-ed738e842d2f h1:jopqB+UTSdJGEJT8tEqYyE29zN91fi2827oLET8tl7k= github.com/google/brotli/go/cbrotli v0.0.0-20230829110029-ed738e842d2f/go.mod h1:nOPhAkwVliJdNTkj3gXpljmWhjc4wCaVqbMJcPKWP4s= +github.com/google/go-tpm v0.9.7 h1:u89J4tUUeDTlH8xxC3CTW7OHZjbjKoHdQ9W7gCUhtxA= +github.com/google/go-tpm v0.9.7/go.mod h1:h9jEsEECg7gtLis0upRBQU+GhYVH6jMjrFxI8u6bVUY= github.com/kisielk/sqlstruct v0.0.0-20201105191214-5f3e10d3ab46/go.mod h1:yyMNCyc/Ib3bDTKd379tNMpB/7/H5TjM2Y9QJ5THLbE= github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= -github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= -github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= +github.com/klauspost/compress v1.18.2 h1:iiPHWW0YrcFgpBYhsA6D1+fqHssJscY/Tm/y2Uqnapk= +github.com/klauspost/compress v1.18.2/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4= github.com/klauspost/pgzip v1.2.6/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= github.com/mattn/go-sqlite3 v1.14.32 h1:JD12Ag3oLy1zQA+BNn74xRgaBbdhbNIDYvQUEuuErjs= github.com/mattn/go-sqlite3 v1.14.32/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= +github.com/minio/highwayhash v1.0.4-0.20251030100505-070ab1a87a76 h1:KGuD/pM2JpL9FAYvBrnBBeENKZNh6eNtjqytV6TYjnk= +github.com/minio/highwayhash v1.0.4-0.20251030100505-070ab1a87a76/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ= +github.com/nats-io/jwt/v2 v2.8.0 h1:K7uzyz50+yGZDO5o772eRE7atlcSEENpL7P+b74JV1g= +github.com/nats-io/jwt/v2 v2.8.0/go.mod h1:me11pOkwObtcBNR8AiMrUbtVOUGkqYjMQZ6jnSdVUIA= +github.com/nats-io/nats-server/v2 v2.12.2 h1:4TEQd0Y4zvcW0IsVxjlXnRso1hBkQl3TS0BI+SxgPhE= +github.com/nats-io/nats-server/v2 v2.12.2/go.mod h1:j1AAttYeu7WnvD8HLJ+WWKNMSyxsqmZ160pNtCQRMyE= +github.com/nats-io/nats.go v1.48.0 h1:pSFyXApG+yWU/TgbKCjmm5K4wrHu86231/w84qRVR+U= +github.com/nats-io/nats.go v1.48.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g= +github.com/nats-io/nkeys v0.4.12 h1:nssm7JKOG9/x4J8II47VWCL1Ds29avyiQDRn0ckMvDc= +github.com/nats-io/nkeys v0.4.12/go.mod h1:MT59A1HYcjIcyQDJStTfaOY6vhy9XTUjOFo+SVsvpBg= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/pierrec/lz4/v4 v4.1.18 h1:xaKrnTkyoqfh1YItXl56+6KJNVYWlEEPuAQW9xsplYQ= github.com/pierrec/lz4/v4 v4.1.18/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= -github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= +github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= github.com/starfederation/datastar-go v1.0.3 h1:DnzgsJ6tDHDM6y5Nxsk0AGW/m8SyKch2vQg3P1xGTcU= github.com/starfederation/datastar-go v1.0.3/go.mod h1:stm83LQkhZkwa5GzzdPEN6dLuu8FVwxIv0w1DYkbD3w= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= @@ -34,8 +63,8 @@ github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpE github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= -github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= -github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/ulikunitz/xz v0.5.11/go.mod h1:nbz6k7qbPmH4IRqmfOplQw/tblSgqTqBwxkY0oWt/14= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= @@ -43,8 +72,16 @@ github.com/valyala/gozstd v1.20.1 h1:xPnnnvjmaDDitMFfDxmQ4vpx0+3CdTg2o3lALvXTU/g github.com/valyala/gozstd v1.20.1/go.mod h1:y5Ew47GLlP37EkTB+B4s7r6A5rdaeB7ftbl9zoYiIPQ= github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU= github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +golang.org/x/crypto v0.45.0 h1:jMBrvKuj23MTlT0bQEOBcAE0mjg8mK9RXFhRH6nyF3Q= +golang.org/x/crypto v0.45.0/go.mod h1:XTGrrkGJve7CYK7J8PEww4aY7gM3qMCElcJQ8n8JdX4= +golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.38.0 h1:3yZWxaJjBmCWXqhN1qh02AkOnCQ1poK6oF+a7xWL6Gc= +golang.org/x/sys v0.38.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/time v0.14.0 h1:MRx4UaLrDotUKUdCIqzPC48t1Y9hANFKIRpNx+Te8PI= +golang.org/x/time v0.14.0/go.mod h1:eL/Oa2bBBK0TkX57Fyni+NgnyQQN4LitPmob2Hjnqw4= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/examples/nats-chatroom/README.md b/internal/examples/nats-chatroom/README.md new file mode 100644 index 0000000..9233588 --- /dev/null +++ b/internal/examples/nats-chatroom/README.md @@ -0,0 +1,109 @@ +# NATS Chatroom Example (Embedded) + +A chatroom built with Via and an **embedded NATS server**, demonstrating pub/sub messaging as an alternative to the custom `Rooms` implementation in `../chatroom`. + +Uses `delaneyj/toolbelt/embeddednats` to run NATS inside the same binary - no external server required. + +## Key Differences from Original Chatroom + +| Aspect | Original (`../chatroom`) | This Example | +|--------|-------------------------|--------------| +| Pub/sub | Custom `Rooms` struct (~160 lines) | NATS subjects | +| Member tracking | Manual `map[TU]Syncable` | NATS handles subscribers | +| Publish timing | Ticker every 100ms + dirty flag | Instant delivery | +| Durability | None (in-memory) | JetStream persists to disk | +| Multi-instance | Not supported | Works across server instances | +| External deps | None | **None** (NATS embedded in binary) | + +## Run the Example + +```bash +go run ./internal/examples/nats-chatroom +``` + +That's it. No separate NATS server needed. + +Open multiple browser tabs at http://localhost:7331 to see messages broadcast across all clients. + +## How Embedded NATS Works + +```go +// Start embedded NATS server (JetStream enabled by default) +ns, err := embeddednats.New(ctx, + embeddednats.WithDirectory("./data/nats"), +) +ns.WaitForServer() + +// Get client connection to embedded server +nc, err := ns.Client() +``` + +Data is persisted to `./data/nats/` for JetStream durability. + +## Architecture + +``` +┌─────────────────────────────────────────────────────────┐ +│ Single Binary │ +│ │ +│ Browser A Embedded NATS Browser B │ +│ │ │ │ │ +│ │-- Via Action ---> │ │ │ +│ │ (Send msg) │ │ │ +│ │ │ │ │ +│ │ nc.Publish() │ │ +│ │ "chat.room.Go" │ │ +│ │ │ │ │ +│ │<-- Subscribe -----|---- Subscribe --->│ │ +│ │ callback │ callback │ │ +│ │ │ │ │ +│ │-- c.Sync() ------>│<--- c.Sync() -----| │ +│ │ (SSE) │ (SSE) │ │ +│ │ +└─────────────────────────────────────────────────────────┘ +``` + +## JetStream Durability + +Messages persist to disk via JetStream: + +```go +js.AddStream(&nats.StreamConfig{ + Name: "CHAT", + Subjects: []string{"chat.>"}, + MaxMsgs: 1000, // Keep last 1000 messages + MaxAge: 24 * time.Hour, +}) +``` + +Stop and restart the app - chat history survives. + +## Code Comparison + +**Original chatroom - 160+ lines of custom pub/sub:** +- `Rooms` struct with named rooms +- `Room` with member tracking, mutex, dirty flag +- Ticker-based publish loop +- Manual join/leave channels + +**This example - ~60 lines of NATS integration:** +- `embeddednats.New()` starts the server +- `nc.Subscribe(subject, handler)` for receiving +- `nc.Publish(subject, data)` for sending +- NATS handles delivery, no polling + +## Next Steps + +If this pattern proves useful, it could be promoted to a Via plugin: + +```go +// Hypothetical future API +v.Config(via.WithEmbeddedNATS("./data/nats")) + +// In page init +c.Subscribe("events.user.*", func(data []byte) { + c.Sync() +}) + +c.Publish("events.user.login", userData) +``` diff --git a/internal/examples/nats-chatroom/main.go b/internal/examples/nats-chatroom/main.go new file mode 100644 index 0000000..bed5bf0 --- /dev/null +++ b/internal/examples/nats-chatroom/main.go @@ -0,0 +1,409 @@ +package main + +import ( + "context" + "encoding/json" + "log" + "math/rand" + "sync" + "time" + + "github.com/delaneyj/toolbelt/embeddednats" + "github.com/nats-io/nats.go" + "github.com/ryanhamamura/via" + "github.com/ryanhamamura/via/h" +) + +var ( + WithSignal = via.WithSignal +) + +// ChatMessage represents a message in a chat room +type ChatMessage struct { + User UserInfo `json:"user"` + Message string `json:"message"` + Time int64 `json:"time"` +} + +// UserInfo identifies a chat participant +type UserInfo struct { + Name string `json:"name"` + Emoji string `json:"emoji"` +} + +func (u *UserInfo) Avatar() h.H { + return h.Div(h.Class("avatar"), h.Attr("title", u.Name), h.Text(u.Emoji)) +} + +// NATSChatroom manages NATS connections and per-context subscriptions +type NATSChatroom struct { + nc *nats.Conn + js nats.JetStreamContext + subs map[string]*nats.Subscription + mu sync.RWMutex +} + +func NewNATSChatroom(nc *nats.Conn) (*NATSChatroom, error) { + js, err := nc.JetStream() + if err != nil { + return nil, err + } + + // Create or update the CHAT stream for durability + _, err = js.AddStream(&nats.StreamConfig{ + Name: "CHAT", + Subjects: []string{"chat.>"}, + Retention: nats.LimitsPolicy, + MaxMsgs: 1000, // Keep last 1000 messages per room + MaxAge: 24 * time.Hour, + }) + if err != nil && err != nats.ErrStreamNameAlreadyInUse { + // Stream might already exist, that's fine + log.Printf("Note: stream creation returned: %v", err) + } + + return &NATSChatroom{ + nc: nc, + js: js, + subs: make(map[string]*nats.Subscription), + }, nil +} + +// Subscribe creates a subscription for a context to a room +func (chat *NATSChatroom) Subscribe(ctxID, room string, handler func(msg *ChatMessage)) error { + subject := "chat.room." + room + + sub, err := chat.nc.Subscribe(subject, func(m *nats.Msg) { + var msg ChatMessage + if err := json.Unmarshal(m.Data, &msg); err != nil { + log.Printf("Failed to unmarshal message: %v", err) + return + } + handler(&msg) + }) + if err != nil { + return err + } + + chat.mu.Lock() + // Clean up old subscription if exists + if old, exists := chat.subs[ctxID]; exists { + old.Unsubscribe() + } + chat.subs[ctxID] = sub + chat.mu.Unlock() + + return nil +} + +// Unsubscribe removes a context's subscription +func (chat *NATSChatroom) Unsubscribe(ctxID string) { + chat.mu.Lock() + defer chat.mu.Unlock() + if sub, exists := chat.subs[ctxID]; exists { + sub.Unsubscribe() + delete(chat.subs, ctxID) + } +} + +// Publish sends a message to a room +func (chat *NATSChatroom) Publish(room string, msg ChatMessage) error { + subject := "chat.room." + room + data, err := json.Marshal(msg) + if err != nil { + return err + } + return chat.nc.Publish(subject, data) +} + +// GetHistory retrieves recent messages from JetStream +func (chat *NATSChatroom) GetHistory(room string, limit int) ([]ChatMessage, error) { + subject := "chat.room." + room + + // Create an ephemeral consumer to replay messages + sub, err := chat.js.SubscribeSync(subject, nats.DeliverLast()) + if err != nil { + // No messages yet + return nil, nil + } + defer sub.Unsubscribe() + + var messages []ChatMessage + for i := 0; i < limit; i++ { + msg, err := sub.NextMsg(100 * time.Millisecond) + if err != nil { + break + } + var chatMsg ChatMessage + if err := json.Unmarshal(msg.Data, &chatMsg); err == nil { + messages = append(messages, chatMsg) + } + } + return messages, nil +} + +func (chat *NATSChatroom) Close() { + chat.mu.Lock() + for _, sub := range chat.subs { + sub.Unsubscribe() + } + chat.mu.Unlock() + chat.nc.Close() +} + +var roomNames = []string{"Go", "Rust", "Python", "JavaScript", "Clojure"} + +func main() { + ctx := context.Background() + + // Start embedded NATS server (JetStream enabled by default) + ns, err := embeddednats.New(ctx, + embeddednats.WithDirectory("./data/nats"), + ) + if err != nil { + log.Fatalf("Failed to start embedded NATS: %v", err) + } + ns.WaitForServer() + + // Get client connection to embedded server + nc, err := ns.Client() + if err != nil { + log.Fatalf("Failed to connect to embedded NATS: %v", err) + } + + chat, err := NewNATSChatroom(nc) + if err != nil { + log.Fatalf("Failed to initialize chatroom: %v", err) + } + defer chat.Close() + + v := via.New() + v.Config(via.Options{ + DevMode: true, + DocumentTitle: "NATS Chat", + LogLvl: via.LogLevelInfo, + ServerAddress: ":7331", + }) + + v.AppendToHead( + h.Link(h.Rel("stylesheet"), h.Href("https://cdn.jsdelivr.net/npm/@picocss/pico@2/css/pico.min.css")), + h.StyleEl(h.Raw(` + body { margin: 0; } + main { + display: flex; + flex-direction: column; + height: 100vh; + } + nav[role="tab-control"] ul li a[aria-current="page"] { + background-color: var(--pico-primary-background); + color: var(--pico-primary-inverse); + border-bottom: 2px solid var(--pico-primary); + } + .chat-message { display: flex; gap: 0.75rem; margin-bottom: 0.5rem; } + .avatar { + width: 2rem; + height: 2rem; + border-radius: 50%; + background: var(--pico-muted-border-color); + display: grid; + place-items: center; + font-size: 1.5rem; + flex-shrink: 0; + } + .bubble { flex: 1; } + .bubble p { margin: 0; } + .chat-history { + flex: 1; + overflow-y: auto; + padding: 1rem; + padding-bottom: calc(88px + env(safe-area-inset-bottom)); + } + .chat-input { + position: fixed; + left: 0; + right: 0; + bottom: 0; + background: var(--pico-background-color); + display: flex; + align-items: center; + gap: 0.75rem; + padding: 0.75rem 1rem calc(0.75rem + env(safe-area-inset-bottom)); + border-top: 1px solid var(--pico-muted-border-color); + } + .chat-input fieldset { + flex: 1; + margin: 0; + } + .nats-badge { + background: #27AAE1; + color: white; + padding: 0.25rem 0.5rem; + border-radius: 4px; + font-size: 0.75rem; + margin-left: auto; + } + `)), + h.Script(h.Raw(` + function scrollChatToBottom() { + const chatHistory = document.querySelector('.chat-history'); + if (chatHistory) chatHistory.scrollTop = chatHistory.scrollHeight; + } + `)), + ) + + v.Page("/", func(c *via.Context) { + currentUser := randUser() + roomSignal := c.Signal("Go") + statement := c.Signal("") + + // Local message cache for this context + var messages []ChatMessage + var messagesMu sync.Mutex + currentRoom := "Go" + + // Context ID for subscription management + ctxID := randID() + + // Subscribe to current room + subscribeToRoom := func(room string) { + chat.Subscribe(ctxID, room, func(msg *ChatMessage) { + messagesMu.Lock() + messages = append(messages, *msg) + // Keep only last 50 messages + if len(messages) > 50 { + messages = messages[len(messages)-50:] + } + messagesMu.Unlock() + c.Sync() + }) + currentRoom = room + } + + subscribeToRoom("Go") + + switchRoom := c.Action(func() { + newRoom := roomSignal.String() + if newRoom != currentRoom { + messagesMu.Lock() + messages = nil // Clear messages for new room + messagesMu.Unlock() + subscribeToRoom(newRoom) + c.Sync() + } + }) + + say := c.Action(func() { + msg := statement.String() + if msg == "" { + msg = randomDevQuote() + } + statement.SetValue("") + + chat.Publish(currentRoom, ChatMessage{ + User: currentUser, + Message: msg, + Time: time.Now().UnixMilli(), + }) + }) + + c.View(func() h.H { + // Build room tabs + var tabs []h.H + for _, name := range roomNames { + isCurrent := name == currentRoom + tabs = append(tabs, h.Li( + h.A( + h.If(isCurrent, h.Attr("aria-current", "page")), + h.Text(name), + switchRoom.OnClick(WithSignal(roomSignal, name)), + ), + )) + } + + // Build message list + messagesMu.Lock() + chatHistoryChildren := []h.H{ + h.Class("chat-history"), + h.Script(h.Raw(`new MutationObserver(()=>scrollChatToBottom()).observe(document.querySelector('.chat-history'), {childList:true})`)), + } + for _, msg := range messages { + chatHistoryChildren = append(chatHistoryChildren, + h.Div(h.Class("chat-message"), + h.Div(h.Class("avatar"), h.Attr("title", msg.User.Name), h.Text(msg.User.Emoji)), + h.Div(h.Class("bubble"), + h.P(h.Text(msg.Message)), + ), + ), + ) + } + messagesMu.Unlock() + + return h.Main(h.Class("container"), + h.Nav( + h.Attr("role", "tab-control"), + h.Ul(tabs...), + h.Span(h.Class("nats-badge"), h.Text("NATS")), + ), + h.Div(chatHistoryChildren...), + h.Div( + h.Class("chat-input"), + currentUser.Avatar(), + h.FieldSet( + h.Attr("role", "group"), + h.Input( + h.Type("text"), + h.Placeholder(currentUser.Name+" says..."), + statement.Bind(), + h.Attr("autofocus"), + say.OnKeyDown("Enter"), + ), + h.Button(h.Text("Send"), say.OnClick()), + ), + ), + ) + }) + }) + + log.Println("Starting NATS chatroom on :7331 (embedded NATS server)") + v.Start() +} + +func randUser() UserInfo { + adjectives := []string{"Happy", "Clever", "Brave", "Swift", "Gentle", "Wise", "Bold", "Calm", "Eager", "Fierce"} + animals := []string{"Panda", "Tiger", "Eagle", "Dolphin", "Fox", "Wolf", "Bear", "Hawk", "Otter", "Lion"} + emojis := []string{"🐼", "🐯", "🦅", "🐬", "🦊", "🐺", "🐻", "🦅", "🦦", "🦁"} + + idx := rand.Intn(len(animals)) + return UserInfo{ + Name: adjectives[rand.Intn(len(adjectives))] + " " + animals[idx], + Emoji: emojis[idx], + } +} + +func randID() string { + const chars = "abcdefghijklmnopqrstuvwxyz0123456789" + b := make([]byte, 8) + for i := range b { + b[i] = chars[rand.Intn(len(chars))] + } + return string(b) +} + +var quoteIdx = rand.Intn(len(devQuotes)) +var devQuotes = []string{ + "Just use NATS.", + "Pub/sub all the things!", + "Messages are the new API.", + "JetStream for durability.", + "No more polling.", + "Event-driven architecture FTW.", + "Decouple everything.", + "NATS is fast.", + "Subjects are like topics.", + "Request-reply is cool.", +} + +func randomDevQuote() string { + quoteIdx = (quoteIdx + 1) % len(devQuotes) + return devQuotes[quoteIdx] +}