3 Commits

Author SHA1 Message Date
Ryan Hamamura
d8318af9c4 feat: add JetStream message replay to chatroom example
Replay stored messages from JetStream when joining or switching rooms
so users see chat history immediately.
2026-01-26 08:10:30 -10:00
Ryan Hamamura
30cc6d88e6 feat: add embedded NATS pub/sub support on Context
Define PubSub and Subscription interfaces in the core via package with
a vianats sub-package providing the embedded NATS + JetStream
implementation. Expose c.Publish() and c.Subscribe() on Context with
automatic subscription cleanup on session close. Refactor the NATS
chatroom example to use the built-in methods instead of manual
subscription tracking.
2026-01-26 08:06:50 -10:00
Ryan Hamamura
88bd0f31df feat: add NATS chatroom example with embedded server
Demonstrates pub/sub messaging as an alternative to custom Rooms
implementation. Uses delaneyj/toolbelt/embeddednats to run NATS
with JetStream inside the binary - no external server required.
2026-01-16 00:50:05 -10:00
11 changed files with 826 additions and 11 deletions

4
.gitignore vendored
View File

@@ -47,3 +47,7 @@ internal/examples/picocss/picocss
internal/examples/plugins/plugins
internal/examples/realtimechart/realtimechart
internal/examples/shakespeare/shakespeare
internal/examples/nats-chatroom/nats-chatroom
# NATS data directory
data/

View File

@@ -45,4 +45,8 @@ type Options struct {
// DatastarPath is the URL path where the script is served.
// Defaults to "/_datastar.js" if empty.
DatastarPath string
// PubSub enables publish/subscribe messaging. Use vianats.New() for an
// embedded NATS backend, or supply any PubSub implementation.
PubSub PubSub
}

View File

@@ -31,6 +31,8 @@ type Context struct {
mu sync.RWMutex
ctxDisposedChan chan struct{}
reqCtx context.Context
subscriptions []Subscription
subsMu sync.Mutex
}
// View defines the UI rendered by this context.
@@ -403,6 +405,55 @@ func (c *Context) Session() *Session {
}
}
// Publish sends data to the given subject via the configured PubSub backend.
// Returns an error if no PubSub is configured. No-ops during panic-check init.
func (c *Context) Publish(subject string, data []byte) error {
if c.id == "" {
return nil
}
if c.app.pubsub == nil {
return fmt.Errorf("pubsub not configured")
}
return c.app.pubsub.Publish(subject, data)
}
// Subscribe creates a subscription on the configured PubSub backend.
// The subscription is tracked for automatic cleanup when the context is disposed.
// Returns an error if no PubSub is configured. No-ops during panic-check init.
func (c *Context) Subscribe(subject string, handler func(data []byte)) (Subscription, error) {
if c.id == "" {
return nil, nil
}
if c.app.pubsub == nil {
return nil, fmt.Errorf("pubsub not configured")
}
sub, err := c.app.pubsub.Subscribe(subject, handler)
if err != nil {
return nil, err
}
// Track on page context for cleanup (components use parent, like signals/actions)
target := c
if c.isComponent() {
target = c.parentPageCtx
}
target.subsMu.Lock()
target.subscriptions = append(target.subscriptions, sub)
target.subsMu.Unlock()
return sub, nil
}
// unsubscribeAll cleans up all tracked subscriptions for this context and its components.
func (c *Context) unsubscribeAll() {
c.subsMu.Lock()
subs := c.subscriptions
c.subscriptions = nil
c.subsMu.Unlock()
for _, sub := range subs {
sub.Unsubscribe()
}
}
func newContext(id string, route string, v *V) *Context {
if v == nil {
log.Fatal("create context failed: app pointer is nil")

23
go.mod
View File

@@ -8,17 +8,32 @@ require (
github.com/DATA-DOG/go-sqlmock v1.5.2
github.com/alexedwards/scs/sqlite3store v0.0.0-20251002162104-209de6e426de
github.com/alexedwards/scs/v2 v2.9.0
github.com/delaneyj/toolbelt v0.9.1
github.com/mattn/go-sqlite3 v1.14.32
github.com/nats-io/nats.go v1.48.0
github.com/starfederation/datastar-go v1.0.3
github.com/stretchr/testify v1.10.0
github.com/stretchr/testify v1.11.1
)
require (
github.com/CAFxX/httpcompression v0.0.9 // indirect
github.com/andybalholm/brotli v1.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/klauspost/compress v1.18.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/antithesishq/antithesis-sdk-go v0.5.0 // indirect
github.com/cenkalti/backoff v2.2.1+incompatible // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/google/go-tpm v0.9.7 // indirect
github.com/klauspost/compress v1.18.2 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/minio/highwayhash v1.0.4-0.20251030100505-070ab1a87a76 // indirect
github.com/nats-io/jwt/v2 v2.8.0 // indirect
github.com/nats-io/nats-server/v2 v2.12.2 // indirect
github.com/nats-io/nkeys v0.4.12 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/rogpeppe/go-internal v1.14.1 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
golang.org/x/crypto v0.45.0 // indirect
golang.org/x/sys v0.38.0 // indirect
golang.org/x/time v0.14.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

51
go.sum
View File

@@ -9,23 +9,52 @@ github.com/alexedwards/scs/v2 v2.9.0/go.mod h1:ToaROZxyKukJKT/xLcVQAChi5k6+Pn1Gv
github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
github.com/andybalholm/brotli v1.2.0 h1:ukwgCxwYrmACq68yiUqwIWnGY0cTPox/M94sVwToPjQ=
github.com/andybalholm/brotli v1.2.0/go.mod h1:rzTDkvFWvIrjDXZHkuS16NPggd91W3kUSvPlQ1pLaKY=
github.com/antithesishq/antithesis-sdk-go v0.5.0 h1:cudCFF83pDDANcXFzkQPUHHedfnnIbUO3JMr9fqwFJs=
github.com/antithesishq/antithesis-sdk-go v0.5.0/go.mod h1:IUpT2DPAKh6i/YhSbt6Gl3v2yvUZjmKncl7U91fup7E=
github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4=
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/delaneyj/toolbelt v0.9.1 h1:QJComn2qoaQ4azl5uRkGpdHSO9e+JtoxDTXCiQHvH8o=
github.com/delaneyj/toolbelt v0.9.1/go.mod h1:eNXpPuThjTD4tpRNCBl4JEz9jdg9LpyzNuyG+stnIbs=
github.com/google/brotli/go/cbrotli v0.0.0-20230829110029-ed738e842d2f h1:jopqB+UTSdJGEJT8tEqYyE29zN91fi2827oLET8tl7k=
github.com/google/brotli/go/cbrotli v0.0.0-20230829110029-ed738e842d2f/go.mod h1:nOPhAkwVliJdNTkj3gXpljmWhjc4wCaVqbMJcPKWP4s=
github.com/google/go-tpm v0.9.7 h1:u89J4tUUeDTlH8xxC3CTW7OHZjbjKoHdQ9W7gCUhtxA=
github.com/google/go-tpm v0.9.7/go.mod h1:h9jEsEECg7gtLis0upRBQU+GhYVH6jMjrFxI8u6bVUY=
github.com/kisielk/sqlstruct v0.0.0-20201105191214-5f3e10d3ab46/go.mod h1:yyMNCyc/Ib3bDTKd379tNMpB/7/H5TjM2Y9QJ5THLbE=
github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
github.com/klauspost/compress v1.18.2 h1:iiPHWW0YrcFgpBYhsA6D1+fqHssJscY/Tm/y2Uqnapk=
github.com/klauspost/compress v1.18.2/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4=
github.com/klauspost/pgzip v1.2.6/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
github.com/mattn/go-sqlite3 v1.14.32 h1:JD12Ag3oLy1zQA+BNn74xRgaBbdhbNIDYvQUEuuErjs=
github.com/mattn/go-sqlite3 v1.14.32/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
github.com/minio/highwayhash v1.0.4-0.20251030100505-070ab1a87a76 h1:KGuD/pM2JpL9FAYvBrnBBeENKZNh6eNtjqytV6TYjnk=
github.com/minio/highwayhash v1.0.4-0.20251030100505-070ab1a87a76/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ=
github.com/nats-io/jwt/v2 v2.8.0 h1:K7uzyz50+yGZDO5o772eRE7atlcSEENpL7P+b74JV1g=
github.com/nats-io/jwt/v2 v2.8.0/go.mod h1:me11pOkwObtcBNR8AiMrUbtVOUGkqYjMQZ6jnSdVUIA=
github.com/nats-io/nats-server/v2 v2.12.2 h1:4TEQd0Y4zvcW0IsVxjlXnRso1hBkQl3TS0BI+SxgPhE=
github.com/nats-io/nats-server/v2 v2.12.2/go.mod h1:j1AAttYeu7WnvD8HLJ+WWKNMSyxsqmZ160pNtCQRMyE=
github.com/nats-io/nats.go v1.48.0 h1:pSFyXApG+yWU/TgbKCjmm5K4wrHu86231/w84qRVR+U=
github.com/nats-io/nats.go v1.48.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g=
github.com/nats-io/nkeys v0.4.12 h1:nssm7JKOG9/x4J8II47VWCL1Ds29avyiQDRn0ckMvDc=
github.com/nats-io/nkeys v0.4.12/go.mod h1:MT59A1HYcjIcyQDJStTfaOY6vhy9XTUjOFo+SVsvpBg=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/pierrec/lz4/v4 v4.1.18 h1:xaKrnTkyoqfh1YItXl56+6KJNVYWlEEPuAQW9xsplYQ=
github.com/pierrec/lz4/v4 v4.1.18/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ=
github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc=
github.com/starfederation/datastar-go v1.0.3 h1:DnzgsJ6tDHDM6y5Nxsk0AGW/m8SyKch2vQg3P1xGTcU=
github.com/starfederation/datastar-go v1.0.3/go.mod h1:stm83LQkhZkwa5GzzdPEN6dLuu8FVwxIv0w1DYkbD3w=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
@@ -34,8 +63,8 @@ github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpE
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
github.com/ulikunitz/xz v0.5.11/go.mod h1:nbz6k7qbPmH4IRqmfOplQw/tblSgqTqBwxkY0oWt/14=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
@@ -43,8 +72,16 @@ github.com/valyala/gozstd v1.20.1 h1:xPnnnvjmaDDitMFfDxmQ4vpx0+3CdTg2o3lALvXTU/g
github.com/valyala/gozstd v1.20.1/go.mod h1:y5Ew47GLlP37EkTB+B4s7r6A5rdaeB7ftbl9zoYiIPQ=
github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU=
github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
golang.org/x/crypto v0.45.0 h1:jMBrvKuj23MTlT0bQEOBcAE0mjg8mK9RXFhRH6nyF3Q=
golang.org/x/crypto v0.45.0/go.mod h1:XTGrrkGJve7CYK7J8PEww4aY7gM3qMCElcJQ8n8JdX4=
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.38.0 h1:3yZWxaJjBmCWXqhN1qh02AkOnCQ1poK6oF+a7xWL6Gc=
golang.org/x/sys v0.38.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
golang.org/x/time v0.14.0 h1:MRx4UaLrDotUKUdCIqzPC48t1Y9hANFKIRpNx+Te8PI=
golang.org/x/time v0.14.0/go.mod h1:eL/Oa2bBBK0TkX57Fyni+NgnyQQN4LitPmob2Hjnqw4=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@@ -0,0 +1,109 @@
# NATS Chatroom Example (Embedded)
A chatroom built with Via and an **embedded NATS server**, demonstrating pub/sub messaging as an alternative to the custom `Rooms` implementation in `../chatroom`.
Uses `delaneyj/toolbelt/embeddednats` to run NATS inside the same binary - no external server required.
## Key Differences from Original Chatroom
| Aspect | Original (`../chatroom`) | This Example |
|--------|-------------------------|--------------|
| Pub/sub | Custom `Rooms` struct (~160 lines) | NATS subjects |
| Member tracking | Manual `map[TU]Syncable` | NATS handles subscribers |
| Publish timing | Ticker every 100ms + dirty flag | Instant delivery |
| Durability | None (in-memory) | JetStream persists to disk |
| Multi-instance | Not supported | Works across server instances |
| External deps | None | **None** (NATS embedded in binary) |
## Run the Example
```bash
go run ./internal/examples/nats-chatroom
```
That's it. No separate NATS server needed.
Open multiple browser tabs at http://localhost:7331 to see messages broadcast across all clients.
## How Embedded NATS Works
```go
// Start embedded NATS server (JetStream enabled by default)
ns, err := embeddednats.New(ctx,
embeddednats.WithDirectory("./data/nats"),
)
ns.WaitForServer()
// Get client connection to embedded server
nc, err := ns.Client()
```
Data is persisted to `./data/nats/` for JetStream durability.
## Architecture
```
┌─────────────────────────────────────────────────────────┐
│ Single Binary │
│ │
│ Browser A Embedded NATS Browser B │
│ │ │ │ │
│ │-- Via Action ---> │ │ │
│ │ (Send msg) │ │ │
│ │ │ │ │
│ │ nc.Publish() │ │
│ │ "chat.room.Go" │ │
│ │ │ │ │
│ │<-- Subscribe -----|---- Subscribe --->│ │
│ │ callback │ callback │ │
│ │ │ │ │
│ │-- c.Sync() ------>│<--- c.Sync() -----| │
│ │ (SSE) │ (SSE) │ │
│ │
└─────────────────────────────────────────────────────────┘
```
## JetStream Durability
Messages persist to disk via JetStream:
```go
js.AddStream(&nats.StreamConfig{
Name: "CHAT",
Subjects: []string{"chat.>"},
MaxMsgs: 1000, // Keep last 1000 messages
MaxAge: 24 * time.Hour,
})
```
Stop and restart the app - chat history survives.
## Code Comparison
**Original chatroom - 160+ lines of custom pub/sub:**
- `Rooms` struct with named rooms
- `Room` with member tracking, mutex, dirty flag
- Ticker-based publish loop
- Manual join/leave channels
**This example - ~60 lines of NATS integration:**
- `embeddednats.New()` starts the server
- `nc.Subscribe(subject, handler)` for receiving
- `nc.Publish(subject, data)` for sending
- NATS handles delivery, no polling
## Next Steps
If this pattern proves useful, it could be promoted to a Via plugin:
```go
// Hypothetical future API
v.Config(via.WithEmbeddedNATS("./data/nats"))
// In page init
c.Subscribe("events.user.*", func(data []byte) {
c.Sync()
})
c.Publish("events.user.login", userData)
```

View File

@@ -0,0 +1,303 @@
package main
import (
"context"
"encoding/json"
"log"
"math/rand"
"sync"
"time"
"github.com/nats-io/nats.go"
"github.com/ryanhamamura/via"
"github.com/ryanhamamura/via/h"
"github.com/ryanhamamura/via/vianats"
)
var (
WithSignal = via.WithSignal
)
// ChatMessage represents a message in a chat room
type ChatMessage struct {
User UserInfo `json:"user"`
Message string `json:"message"`
Time int64 `json:"time"`
}
// UserInfo identifies a chat participant
type UserInfo struct {
Name string `json:"name"`
Emoji string `json:"emoji"`
}
func (u *UserInfo) Avatar() h.H {
return h.Div(h.Class("avatar"), h.Attr("title", u.Name), h.Text(u.Emoji))
}
var roomNames = []string{"Go", "Rust", "Python", "JavaScript", "Clojure"}
func main() {
ctx := context.Background()
ps, err := vianats.New(ctx, "./data/nats")
if err != nil {
log.Fatalf("Failed to start embedded NATS: %v", err)
}
defer ps.Close()
// Create JetStream stream for message durability
js := ps.JetStream()
js.AddStream(&nats.StreamConfig{
Name: "CHAT",
Subjects: []string{"chat.>"},
Retention: nats.LimitsPolicy,
MaxMsgs: 1000,
MaxAge: 24 * time.Hour,
})
v := via.New()
v.Config(via.Options{
DevMode: true,
DocumentTitle: "NATS Chat",
LogLvl: via.LogLevelInfo,
ServerAddress: ":7331",
PubSub: ps,
})
v.AppendToHead(
h.Link(h.Rel("stylesheet"), h.Href("https://cdn.jsdelivr.net/npm/@picocss/pico@2/css/pico.min.css")),
h.StyleEl(h.Raw(`
body { margin: 0; }
main {
display: flex;
flex-direction: column;
height: 100vh;
}
nav[role="tab-control"] ul li a[aria-current="page"] {
background-color: var(--pico-primary-background);
color: var(--pico-primary-inverse);
border-bottom: 2px solid var(--pico-primary);
}
.chat-message { display: flex; gap: 0.75rem; margin-bottom: 0.5rem; }
.avatar {
width: 2rem;
height: 2rem;
border-radius: 50%;
background: var(--pico-muted-border-color);
display: grid;
place-items: center;
font-size: 1.5rem;
flex-shrink: 0;
}
.bubble { flex: 1; }
.bubble p { margin: 0; }
.chat-history {
flex: 1;
overflow-y: auto;
padding: 1rem;
padding-bottom: calc(88px + env(safe-area-inset-bottom));
}
.chat-input {
position: fixed;
left: 0;
right: 0;
bottom: 0;
background: var(--pico-background-color);
display: flex;
align-items: center;
gap: 0.75rem;
padding: 0.75rem 1rem calc(0.75rem + env(safe-area-inset-bottom));
border-top: 1px solid var(--pico-muted-border-color);
}
.chat-input fieldset {
flex: 1;
margin: 0;
}
.nats-badge {
background: #27AAE1;
color: white;
padding: 0.25rem 0.5rem;
border-radius: 4px;
font-size: 0.75rem;
margin-left: auto;
}
`)),
h.Script(h.Raw(`
function scrollChatToBottom() {
const chatHistory = document.querySelector('.chat-history');
if (chatHistory) chatHistory.scrollTop = chatHistory.scrollHeight;
}
`)),
)
v.Page("/", func(c *via.Context) {
currentUser := randUser()
roomSignal := c.Signal("Go")
statement := c.Signal("")
var messages []ChatMessage
var messagesMu sync.Mutex
currentRoom := "Go"
var currentSub via.Subscription
subscribeToRoom := func(room string) {
if currentSub != nil {
currentSub.Unsubscribe()
}
// Replay history from JetStream before subscribing for real-time
subject := "chat.room." + room
if hist, err := js.SubscribeSync(subject, nats.DeliverAll(), nats.OrderedConsumer()); err == nil {
for {
msg, err := hist.NextMsg(200 * time.Millisecond)
if err != nil {
break
}
var chatMsg ChatMessage
if json.Unmarshal(msg.Data, &chatMsg) == nil {
messages = append(messages, chatMsg)
}
}
hist.Unsubscribe()
if len(messages) > 50 {
messages = messages[len(messages)-50:]
}
}
sub, _ := c.Subscribe(subject, func(data []byte) {
var msg ChatMessage
if err := json.Unmarshal(data, &msg); err != nil {
return
}
messagesMu.Lock()
messages = append(messages, msg)
if len(messages) > 50 {
messages = messages[len(messages)-50:]
}
messagesMu.Unlock()
c.Sync()
})
currentSub = sub
currentRoom = room
}
subscribeToRoom("Go")
switchRoom := c.Action(func() {
newRoom := roomSignal.String()
if newRoom != currentRoom {
messagesMu.Lock()
messages = nil
messagesMu.Unlock()
subscribeToRoom(newRoom)
c.Sync()
}
})
say := c.Action(func() {
msg := statement.String()
if msg == "" {
msg = randomDevQuote()
}
statement.SetValue("")
data, _ := json.Marshal(ChatMessage{
User: currentUser,
Message: msg,
Time: time.Now().UnixMilli(),
})
c.Publish("chat.room."+currentRoom, data)
})
c.View(func() h.H {
var tabs []h.H
for _, name := range roomNames {
isCurrent := name == currentRoom
tabs = append(tabs, h.Li(
h.A(
h.If(isCurrent, h.Attr("aria-current", "page")),
h.Text(name),
switchRoom.OnClick(WithSignal(roomSignal, name)),
),
))
}
messagesMu.Lock()
chatHistoryChildren := []h.H{
h.Class("chat-history"),
h.Script(h.Raw(`new MutationObserver(()=>scrollChatToBottom()).observe(document.querySelector('.chat-history'), {childList:true})`)),
}
for _, msg := range messages {
chatHistoryChildren = append(chatHistoryChildren,
h.Div(h.Class("chat-message"),
h.Div(h.Class("avatar"), h.Attr("title", msg.User.Name), h.Text(msg.User.Emoji)),
h.Div(h.Class("bubble"),
h.P(h.Text(msg.Message)),
),
),
)
}
messagesMu.Unlock()
return h.Main(h.Class("container"),
h.Nav(
h.Attr("role", "tab-control"),
h.Ul(tabs...),
h.Span(h.Class("nats-badge"), h.Text("NATS")),
),
h.Div(chatHistoryChildren...),
h.Div(
h.Class("chat-input"),
currentUser.Avatar(),
h.FieldSet(
h.Attr("role", "group"),
h.Input(
h.Type("text"),
h.Placeholder(currentUser.Name+" says..."),
statement.Bind(),
h.Attr("autofocus"),
say.OnKeyDown("Enter"),
),
h.Button(h.Text("Send"), say.OnClick()),
),
),
)
})
})
log.Println("Starting NATS chatroom on :7331 (embedded NATS server)")
v.Start()
}
func randUser() UserInfo {
adjectives := []string{"Happy", "Clever", "Brave", "Swift", "Gentle", "Wise", "Bold", "Calm", "Eager", "Fierce"}
animals := []string{"Panda", "Tiger", "Eagle", "Dolphin", "Fox", "Wolf", "Bear", "Hawk", "Otter", "Lion"}
emojis := []string{"🐼", "🐯", "🦅", "🐬", "🦊", "🐺", "🐻", "🦅", "🦦", "🦁"}
idx := rand.Intn(len(animals))
return UserInfo{
Name: adjectives[rand.Intn(len(adjectives))] + " " + animals[idx],
Emoji: emojis[idx],
}
}
var quoteIdx = rand.Intn(len(devQuotes))
var devQuotes = []string{
"Just use NATS.",
"Pub/sub all the things!",
"Messages are the new API.",
"JetStream for durability.",
"No more polling.",
"Event-driven architecture FTW.",
"Decouple everything.",
"NATS is fast.",
"Subjects are like topics.",
"Request-reply is cool.",
}
func randomDevQuote() string {
quoteIdx = (quoteIdx + 1) % len(devQuotes)
return devQuotes[quoteIdx]
}

195
nats_test.go Normal file
View File

@@ -0,0 +1,195 @@
package via
import (
"sync"
"sync/atomic"
"testing"
"time"
"github.com/ryanhamamura/via/h"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
type mockHandler struct {
id int64
fn func([]byte)
active atomic.Bool
}
// mockPubSub implements PubSub for testing without NATS.
type mockPubSub struct {
mu sync.Mutex
subs map[string][]*mockHandler
nextID atomic.Int64
}
func newMockPubSub() *mockPubSub {
return &mockPubSub{subs: make(map[string][]*mockHandler)}
}
func (m *mockPubSub) Publish(subject string, data []byte) error {
m.mu.Lock()
handlers := make([]*mockHandler, len(m.subs[subject]))
copy(handlers, m.subs[subject])
m.mu.Unlock()
for _, h := range handlers {
if h.active.Load() {
h.fn(data)
}
}
return nil
}
func (m *mockPubSub) Subscribe(subject string, handler func(data []byte)) (Subscription, error) {
m.mu.Lock()
defer m.mu.Unlock()
mh := &mockHandler{
id: m.nextID.Add(1),
fn: handler,
}
mh.active.Store(true)
m.subs[subject] = append(m.subs[subject], mh)
return &mockSub{handler: mh}, nil
}
func (m *mockPubSub) Close() error { return nil }
type mockSub struct {
handler *mockHandler
}
func (s *mockSub) Unsubscribe() error {
s.handler.active.Store(false)
return nil
}
func TestPubSub_RoundTrip(t *testing.T) {
ps := newMockPubSub()
v := New()
v.Config(Options{PubSub: ps})
var received []byte
var wg sync.WaitGroup
wg.Add(1)
c := newContext("test-ctx", "/", v)
c.View(func() h.H { return h.Div() })
_, err := c.Subscribe("test.topic", func(data []byte) {
received = data
wg.Done()
})
require.NoError(t, err)
err = c.Publish("test.topic", []byte("hello"))
require.NoError(t, err)
wg.Wait()
assert.Equal(t, []byte("hello"), received)
}
func TestPubSub_MultipleSubscribers(t *testing.T) {
ps := newMockPubSub()
v := New()
v.Config(Options{PubSub: ps})
var mu sync.Mutex
var results []string
var wg sync.WaitGroup
wg.Add(2)
c1 := newContext("ctx-1", "/", v)
c1.View(func() h.H { return h.Div() })
c2 := newContext("ctx-2", "/", v)
c2.View(func() h.H { return h.Div() })
c1.Subscribe("broadcast", func(data []byte) {
mu.Lock()
results = append(results, "c1:"+string(data))
mu.Unlock()
wg.Done()
})
c2.Subscribe("broadcast", func(data []byte) {
mu.Lock()
results = append(results, "c2:"+string(data))
mu.Unlock()
wg.Done()
})
c1.Publish("broadcast", []byte("msg"))
wg.Wait()
assert.Len(t, results, 2)
assert.Contains(t, results, "c1:msg")
assert.Contains(t, results, "c2:msg")
}
func TestPubSub_SubscriptionCleanupOnDispose(t *testing.T) {
ps := newMockPubSub()
v := New()
v.Config(Options{PubSub: ps})
c := newContext("cleanup-ctx", "/", v)
c.View(func() h.H { return h.Div() })
c.Subscribe("room.1", func(data []byte) {})
c.Subscribe("room.2", func(data []byte) {})
assert.Len(t, c.subscriptions, 2)
c.unsubscribeAll()
assert.Empty(t, c.subscriptions)
}
func TestPubSub_ManualUnsubscribe(t *testing.T) {
ps := newMockPubSub()
v := New()
v.Config(Options{PubSub: ps})
c := newContext("unsub-ctx", "/", v)
c.View(func() h.H { return h.Div() })
called := false
sub, err := c.Subscribe("topic", func(data []byte) {
called = true
})
require.NoError(t, err)
sub.Unsubscribe()
c.Publish("topic", []byte("ignored"))
time.Sleep(10 * time.Millisecond)
assert.False(t, called)
}
func TestPubSub_NoOpWhenNotConfigured(t *testing.T) {
v := New()
c := newContext("noop-ctx", "/", v)
c.View(func() h.H { return h.Div() })
err := c.Publish("topic", []byte("data"))
assert.Error(t, err)
sub, err := c.Subscribe("topic", func(data []byte) {})
assert.Error(t, err)
assert.Nil(t, sub)
}
func TestPubSub_NoOpDuringPanicCheck(t *testing.T) {
ps := newMockPubSub()
v := New()
v.Config(Options{PubSub: ps})
// Panic-check context has id=""
c := newContext("", "/", v)
err := c.Publish("topic", []byte("data"))
assert.NoError(t, err)
sub, err := c.Subscribe("topic", func(data []byte) {})
assert.NoError(t, err)
assert.Nil(t, sub)
}

14
pubsub.go Normal file
View File

@@ -0,0 +1,14 @@
package via
// PubSub is an interface for publish/subscribe messaging backends.
// The vianats sub-package provides an embedded NATS implementation.
type PubSub interface {
Publish(subject string, data []byte) error
Subscribe(subject string, handler func(data []byte)) (Subscription, error)
Close() error
}
// Subscription represents an active subscription that can be manually unsubscribed.
type Subscription interface {
Unsubscribe() error
}

5
via.go
View File

@@ -40,6 +40,7 @@ type V struct {
documentFootIncludes []h.H
devModePageInitFnMap map[string]func(*Context)
sessionManager *scs.SessionManager
pubsub PubSub
datastarPath string
datastarContent []byte
datastarOnce sync.Once
@@ -117,6 +118,9 @@ func (v *V) Config(cfg Options) {
if cfg.DatastarPath != "" {
v.datastarPath = cfg.DatastarPath
}
if cfg.PubSub != nil {
v.pubsub = cfg.PubSub
}
}
// AppendToHead appends the given h.H nodes to the head of the base HTML document.
@@ -525,6 +529,7 @@ func New() *V {
v.logErr(c, "failed to handle session close: %v", err)
return
}
c.unsubscribeAll()
c.stopAllRoutines()
v.logDebug(c, "session close event triggered")
if v.cfg.DevMode {

78
vianats/vianats.go Normal file
View File

@@ -0,0 +1,78 @@
// Package vianats provides an embedded NATS server with JetStream as a
// pub/sub backend for Via applications.
package vianats
import (
"context"
"fmt"
"github.com/delaneyj/toolbelt/embeddednats"
"github.com/nats-io/nats.go"
"github.com/ryanhamamura/via"
)
// NATS implements via.PubSub using an embedded NATS server with JetStream.
type NATS struct {
server *embeddednats.Server
nc *nats.Conn
js nats.JetStreamContext
}
// New starts an embedded NATS server with JetStream enabled and returns a
// ready-to-use NATS instance. The server stores data in dataDir and shuts
// down when ctx is cancelled.
func New(ctx context.Context, dataDir string) (*NATS, error) {
ns, err := embeddednats.New(ctx, embeddednats.WithDirectory(dataDir))
if err != nil {
return nil, fmt.Errorf("vianats: start server: %w", err)
}
ns.WaitForServer()
nc, err := ns.Client()
if err != nil {
ns.Close()
return nil, fmt.Errorf("vianats: connect client: %w", err)
}
js, err := nc.JetStream()
if err != nil {
nc.Close()
ns.Close()
return nil, fmt.Errorf("vianats: init jetstream: %w", err)
}
return &NATS{server: ns, nc: nc, js: js}, nil
}
// Publish sends data to the given subject using core NATS publish.
// JetStream captures messages automatically if a matching stream exists.
func (n *NATS) Publish(subject string, data []byte) error {
return n.nc.Publish(subject, data)
}
// Subscribe creates a core NATS subscription for real-time fan-out delivery.
func (n *NATS) Subscribe(subject string, handler func(data []byte)) (via.Subscription, error) {
sub, err := n.nc.Subscribe(subject, func(msg *nats.Msg) {
handler(msg.Data)
})
if err != nil {
return nil, err
}
return sub, nil
}
// Close shuts down the client connection and embedded server.
func (n *NATS) Close() error {
n.nc.Close()
return n.server.Close()
}
// Conn returns the underlying NATS connection for advanced usage.
func (n *NATS) Conn() *nats.Conn {
return n.nc
}
// JetStream returns the JetStream context for stream configuration and replay.
func (n *NATS) JetStream() nats.JetStreamContext {
return n.js
}