feat: add declarative Options.Streams for automatic JetStream stream creation
Some checks failed
CI / Build and Test (push) Has been cancelled

Streams listed in Options.Streams are created by Start() when the
embedded NATS server initializes, replacing manual EnsureStream calls
during setup. Migrates nats-chatroom and pubsub-crud examples.
This commit is contained in:
Ryan Hamamura
2026-02-19 12:24:44 -10:00
parent 42b21348cb
commit 60009124c9
6 changed files with 62 additions and 73 deletions

View File

@@ -53,10 +53,15 @@ type Options struct {
// Defaults to "/_datastar.js" if empty. // Defaults to "/_datastar.js" if empty.
DatastarPath string DatastarPath string
// PubSub enables publish/subscribe messaging. Use vianats.New() for an // PubSub enables publish/subscribe messaging. When nil, an embedded NATS
// embedded NATS backend, or supply any PubSub implementation. // server starts automatically in Start(). Supply any PubSub implementation
// to replace it.
PubSub PubSub PubSub PubSub
// Streams declares JetStream streams to create when Start() initializes
// the embedded NATS server. Ignored when a custom PubSub is configured.
Streams []StreamConfig
// ContextSuspendAfter is the time a context may be disconnected before // ContextSuspendAfter is the time a context may be disconnected before
// the reaper suspends it (frees page resources but keeps the context // the reaper suspends it (frees page resources but keeps the context
// shell for seamless re-init on reconnect). Default: 15m. // shell for seamless re-init on reconnect). Default: 15m.

View File

@@ -4,7 +4,7 @@ Infrastructure for multi-user real-time communication and persistent state.
## PubSub ## PubSub
Via includes an embedded NATS server that starts automatically with `via.New()`. No external services required — pub/sub works out of the box. Via includes an embedded NATS server that starts automatically with `v.Start()`. No external services required — pub/sub works out of the box.
### Interface ### Interface
@@ -73,14 +73,18 @@ This disables the embedded NATS server. The `NATSConn()` and `JetStream()` acces
NATS JetStream provides persistent, replayable message streams. Useful for chat history, event logs, or any scenario where new subscribers need to catch up on past messages. NATS JetStream provides persistent, replayable message streams. Useful for chat history, event logs, or any scenario where new subscribers need to catch up on past messages.
### Ensure a stream exists ### Declaring streams
The recommended approach is to declare streams in `Options.Streams`. They are created automatically when `v.Start()` initializes the embedded NATS server:
```go ```go
err := via.EnsureStream(v, via.StreamConfig{ v.Config(via.Options{
Name: "CHAT", Streams: []via.StreamConfig{{
Subjects: []string{"chat.>"}, Name: "CHAT",
MaxMsgs: 1000, Subjects: []string{"chat.>"},
MaxAge: 24 * time.Hour, MaxMsgs: 1000,
MaxAge: 24 * time.Hour,
}},
}) })
``` ```
@@ -91,7 +95,16 @@ err := via.EnsureStream(v, via.StreamConfig{
| `MaxMsgs` | Maximum number of messages to retain | | `MaxMsgs` | Maximum number of messages to retain |
| `MaxAge` | Maximum age before messages are discarded | | `MaxAge` | Maximum age before messages are discarded |
Call `EnsureStream` during app initialization, before `v.Start()`. For dynamic stream creation after startup, `EnsureStream` is also available:
```go
err := via.EnsureStream(v, via.StreamConfig{
Name: "EVENTS",
Subjects: []string{"events.>"},
MaxMsgs: 500,
MaxAge: 12 * time.Hour,
})
```
### Replay history ### Replay history

View File

@@ -2,7 +2,7 @@
A chatroom built with Via and an **embedded NATS server**, demonstrating pub/sub messaging as an alternative to the custom `Rooms` implementation in `../chatroom`. A chatroom built with Via and an **embedded NATS server**, demonstrating pub/sub messaging as an alternative to the custom `Rooms` implementation in `../chatroom`.
Uses `delaneyj/toolbelt/embeddednats` to run NATS inside the same binary - no external server required. Via includes an embedded NATS server that starts automatically no external server required.
## Key Differences from Original Chatroom ## Key Differences from Original Chatroom
@@ -25,21 +25,6 @@ That's it. No separate NATS server needed.
Open multiple browser tabs at http://localhost:7331 to see messages broadcast across all clients. Open multiple browser tabs at http://localhost:7331 to see messages broadcast across all clients.
## How Embedded NATS Works
```go
// Start embedded NATS server (JetStream enabled by default)
ns, err := embeddednats.New(ctx,
embeddednats.WithDirectory("./data/nats"),
)
ns.WaitForServer()
// Get client connection to embedded server
nc, err := ns.Client()
```
Data is persisted to `./data/nats/` for JetStream durability.
## Architecture ## Architecture
``` ```
@@ -65,14 +50,16 @@ Data is persisted to `./data/nats/` for JetStream durability.
## JetStream Durability ## JetStream Durability
Messages persist to disk via JetStream: Messages persist to disk via JetStream. Streams are declared in `Options.Streams` and created automatically when `v.Start()` initializes the embedded NATS server:
```go ```go
js.AddStream(&nats.StreamConfig{ v.Config(via.Options{
Name: "CHAT", Streams: []via.StreamConfig{{
Subjects: []string{"chat.>"}, Name: "CHAT",
MaxMsgs: 1000, // Keep last 1000 messages Subjects: []string{"chat.>"},
MaxAge: 24 * time.Hour, MaxMsgs: 1000,
MaxAge: 24 * time.Hour,
}},
}) })
``` ```
@@ -87,23 +74,6 @@ Stop and restart the app - chat history survives.
- Manual join/leave channels - Manual join/leave channels
**This example - ~60 lines of NATS integration:** **This example - ~60 lines of NATS integration:**
- `embeddednats.New()` starts the server - `via.Subscribe(c, subject, handler)` for receiving
- `nc.Subscribe(subject, handler)` for receiving - `via.Publish(c, subject, data)` for sending
- `nc.Publish(subject, data)` for sending - Streams declared in `Options` — NATS handles delivery, no polling
- NATS handles delivery, no polling
## Next Steps
If this pattern proves useful, it could be promoted to a Via plugin:
```go
// Hypothetical future API
v.Config(via.WithEmbeddedNATS("./data/nats"))
// In page init
c.Subscribe("events.user.*", func(data []byte) {
c.Sync()
})
c.Publish("events.user.login", userData)
```

View File

@@ -21,18 +21,14 @@ func main() {
DocumentTitle: "NATS Chat", DocumentTitle: "NATS Chat",
LogLevel: via.LogLevelInfo, LogLevel: via.LogLevelInfo,
ServerAddress: ":7331", ServerAddress: ":7331",
Streams: []via.StreamConfig{{
Name: "CHAT",
Subjects: []string{"chat.>"},
MaxMsgs: 1000,
MaxAge: 24 * time.Hour,
}},
}) })
err := via.EnsureStream(v, via.StreamConfig{
Name: "CHAT",
Subjects: []string{"chat.>"},
MaxMsgs: 1000,
MaxAge: 24 * time.Hour,
})
if err != nil {
log.Fatalf("Failed to ensure stream: %v", err)
}
v.AppendToHead( v.AppendToHead(
h.Link(h.Rel("stylesheet"), h.Href("https://cdn.jsdelivr.net/npm/@picocss/pico@2/css/pico.min.css")), h.Link(h.Rel("stylesheet"), h.Href("https://cdn.jsdelivr.net/npm/@picocss/pico@2/css/pico.min.css")),
h.StyleEl(h.Raw(chatCSS)), h.StyleEl(h.Raw(chatCSS)),
@@ -76,6 +72,6 @@ func main() {
protected := v.Group("", requireProfile) protected := v.Group("", requireProfile)
protected.Page("/", ChatPage) protected.Page("/", ChatPage)
log.Println("Starting NATS chatroom on :7331 (embedded NATS server)") log.Println("Starting NATS chatroom on :7331")
v.Start() v.Start()
} }

View File

@@ -53,18 +53,14 @@ func main() {
DocumentTitle: "Bookmarks", DocumentTitle: "Bookmarks",
LogLevel: via.LogLevelInfo, LogLevel: via.LogLevelInfo,
ServerAddress: ":7331", ServerAddress: ":7331",
Streams: []via.StreamConfig{{
Name: "BOOKMARKS",
Subjects: []string{"bookmarks.>"},
MaxMsgs: 1000,
MaxAge: 24 * time.Hour,
}},
}) })
err := via.EnsureStream(v, via.StreamConfig{
Name: "BOOKMARKS",
Subjects: []string{"bookmarks.>"},
MaxMsgs: 1000,
MaxAge: 24 * time.Hour,
})
if err != nil {
log.Fatalf("Failed to ensure stream: %v", err)
}
v.AppendToHead( v.AppendToHead(
h.Link(h.Rel("stylesheet"), h.Href("https://cdn.jsdelivr.net/npm/daisyui@4/dist/full.min.css")), h.Link(h.Rel("stylesheet"), h.Href("https://cdn.jsdelivr.net/npm/daisyui@4/dist/full.min.css")),
h.Script(h.Src("https://cdn.tailwindcss.com")), h.Script(h.Src("https://cdn.tailwindcss.com")),

9
via.go
View File

@@ -145,6 +145,9 @@ func (v *V) Config(cfg Options) {
if cfg.ContextTTL != 0 { if cfg.ContextTTL != 0 {
v.cfg.ContextTTL = cfg.ContextTTL v.cfg.ContextTTL = cfg.ContextTTL
} }
if cfg.Streams != nil {
v.cfg.Streams = cfg.Streams
}
if cfg.ActionRateLimit.Rate != 0 || cfg.ActionRateLimit.Burst != 0 { if cfg.ActionRateLimit.Rate != 0 || cfg.ActionRateLimit.Burst != 0 {
v.actionRateLimit = cfg.ActionRateLimit v.actionRateLimit = cfg.ActionRateLimit
} }
@@ -371,6 +374,12 @@ func (v *V) Start() {
} }
} }
for _, sc := range v.cfg.Streams {
if err := EnsureStream(v, sc); err != nil {
v.logger.Fatal().Err(err).Msgf("failed to create stream %q", sc.Name)
}
}
handler := http.Handler(v.mux) handler := http.Handler(v.mux)
if v.sessionManager != nil { if v.sessionManager != nil {
handler = v.sessionManager.LoadAndSave(v.mux) handler = v.sessionManager.LoadAndSave(v.mux)