diff --git a/configuration.go b/configuration.go index ad9bb61..952a188 100644 --- a/configuration.go +++ b/configuration.go @@ -53,10 +53,15 @@ type Options struct { // Defaults to "/_datastar.js" if empty. DatastarPath string - // PubSub enables publish/subscribe messaging. Use vianats.New() for an - // embedded NATS backend, or supply any PubSub implementation. + // PubSub enables publish/subscribe messaging. When nil, an embedded NATS + // server starts automatically in Start(). Supply any PubSub implementation + // to replace it. PubSub PubSub + // Streams declares JetStream streams to create when Start() initializes + // the embedded NATS server. Ignored when a custom PubSub is configured. + Streams []StreamConfig + // ContextSuspendAfter is the time a context may be disconnected before // the reaper suspends it (frees page resources but keeps the context // shell for seamless re-init on reconnect). Default: 15m. diff --git a/docs/pubsub-and-sessions.md b/docs/pubsub-and-sessions.md index 1d6f4aa..7b437b3 100644 --- a/docs/pubsub-and-sessions.md +++ b/docs/pubsub-and-sessions.md @@ -4,7 +4,7 @@ Infrastructure for multi-user real-time communication and persistent state. ## PubSub -Via includes an embedded NATS server that starts automatically with `via.New()`. No external services required — pub/sub works out of the box. +Via includes an embedded NATS server that starts automatically with `v.Start()`. No external services required — pub/sub works out of the box. ### Interface @@ -73,14 +73,18 @@ This disables the embedded NATS server. The `NATSConn()` and `JetStream()` acces NATS JetStream provides persistent, replayable message streams. Useful for chat history, event logs, or any scenario where new subscribers need to catch up on past messages. -### Ensure a stream exists +### Declaring streams + +The recommended approach is to declare streams in `Options.Streams`. They are created automatically when `v.Start()` initializes the embedded NATS server: ```go -err := via.EnsureStream(v, via.StreamConfig{ - Name: "CHAT", - Subjects: []string{"chat.>"}, - MaxMsgs: 1000, - MaxAge: 24 * time.Hour, +v.Config(via.Options{ + Streams: []via.StreamConfig{{ + Name: "CHAT", + Subjects: []string{"chat.>"}, + MaxMsgs: 1000, + MaxAge: 24 * time.Hour, + }}, }) ``` @@ -91,7 +95,16 @@ err := via.EnsureStream(v, via.StreamConfig{ | `MaxMsgs` | Maximum number of messages to retain | | `MaxAge` | Maximum age before messages are discarded | -Call `EnsureStream` during app initialization, before `v.Start()`. +For dynamic stream creation after startup, `EnsureStream` is also available: + +```go +err := via.EnsureStream(v, via.StreamConfig{ + Name: "EVENTS", + Subjects: []string{"events.>"}, + MaxMsgs: 500, + MaxAge: 12 * time.Hour, +}) +``` ### Replay history diff --git a/internal/examples/nats-chatroom/README.md b/internal/examples/nats-chatroom/README.md index 9233588..69f68b1 100644 --- a/internal/examples/nats-chatroom/README.md +++ b/internal/examples/nats-chatroom/README.md @@ -2,7 +2,7 @@ A chatroom built with Via and an **embedded NATS server**, demonstrating pub/sub messaging as an alternative to the custom `Rooms` implementation in `../chatroom`. -Uses `delaneyj/toolbelt/embeddednats` to run NATS inside the same binary - no external server required. +Via includes an embedded NATS server that starts automatically — no external server required. ## Key Differences from Original Chatroom @@ -25,21 +25,6 @@ That's it. No separate NATS server needed. Open multiple browser tabs at http://localhost:7331 to see messages broadcast across all clients. -## How Embedded NATS Works - -```go -// Start embedded NATS server (JetStream enabled by default) -ns, err := embeddednats.New(ctx, - embeddednats.WithDirectory("./data/nats"), -) -ns.WaitForServer() - -// Get client connection to embedded server -nc, err := ns.Client() -``` - -Data is persisted to `./data/nats/` for JetStream durability. - ## Architecture ``` @@ -65,14 +50,16 @@ Data is persisted to `./data/nats/` for JetStream durability. ## JetStream Durability -Messages persist to disk via JetStream: +Messages persist to disk via JetStream. Streams are declared in `Options.Streams` and created automatically when `v.Start()` initializes the embedded NATS server: ```go -js.AddStream(&nats.StreamConfig{ - Name: "CHAT", - Subjects: []string{"chat.>"}, - MaxMsgs: 1000, // Keep last 1000 messages - MaxAge: 24 * time.Hour, +v.Config(via.Options{ + Streams: []via.StreamConfig{{ + Name: "CHAT", + Subjects: []string{"chat.>"}, + MaxMsgs: 1000, + MaxAge: 24 * time.Hour, + }}, }) ``` @@ -87,23 +74,6 @@ Stop and restart the app - chat history survives. - Manual join/leave channels **This example - ~60 lines of NATS integration:** -- `embeddednats.New()` starts the server -- `nc.Subscribe(subject, handler)` for receiving -- `nc.Publish(subject, data)` for sending -- NATS handles delivery, no polling - -## Next Steps - -If this pattern proves useful, it could be promoted to a Via plugin: - -```go -// Hypothetical future API -v.Config(via.WithEmbeddedNATS("./data/nats")) - -// In page init -c.Subscribe("events.user.*", func(data []byte) { - c.Sync() -}) - -c.Publish("events.user.login", userData) -``` +- `via.Subscribe(c, subject, handler)` for receiving +- `via.Publish(c, subject, data)` for sending +- Streams declared in `Options` — NATS handles delivery, no polling diff --git a/internal/examples/nats-chatroom/main.go b/internal/examples/nats-chatroom/main.go index 530827b..863008d 100644 --- a/internal/examples/nats-chatroom/main.go +++ b/internal/examples/nats-chatroom/main.go @@ -21,18 +21,14 @@ func main() { DocumentTitle: "NATS Chat", LogLevel: via.LogLevelInfo, ServerAddress: ":7331", + Streams: []via.StreamConfig{{ + Name: "CHAT", + Subjects: []string{"chat.>"}, + MaxMsgs: 1000, + MaxAge: 24 * time.Hour, + }}, }) - err := via.EnsureStream(v, via.StreamConfig{ - Name: "CHAT", - Subjects: []string{"chat.>"}, - MaxMsgs: 1000, - MaxAge: 24 * time.Hour, - }) - if err != nil { - log.Fatalf("Failed to ensure stream: %v", err) - } - v.AppendToHead( h.Link(h.Rel("stylesheet"), h.Href("https://cdn.jsdelivr.net/npm/@picocss/pico@2/css/pico.min.css")), h.StyleEl(h.Raw(chatCSS)), @@ -76,6 +72,6 @@ func main() { protected := v.Group("", requireProfile) protected.Page("/", ChatPage) - log.Println("Starting NATS chatroom on :7331 (embedded NATS server)") + log.Println("Starting NATS chatroom on :7331") v.Start() } diff --git a/internal/examples/pubsub-crud/main.go b/internal/examples/pubsub-crud/main.go index 8fe37cb..1d0b13e 100644 --- a/internal/examples/pubsub-crud/main.go +++ b/internal/examples/pubsub-crud/main.go @@ -53,18 +53,14 @@ func main() { DocumentTitle: "Bookmarks", LogLevel: via.LogLevelInfo, ServerAddress: ":7331", + Streams: []via.StreamConfig{{ + Name: "BOOKMARKS", + Subjects: []string{"bookmarks.>"}, + MaxMsgs: 1000, + MaxAge: 24 * time.Hour, + }}, }) - err := via.EnsureStream(v, via.StreamConfig{ - Name: "BOOKMARKS", - Subjects: []string{"bookmarks.>"}, - MaxMsgs: 1000, - MaxAge: 24 * time.Hour, - }) - if err != nil { - log.Fatalf("Failed to ensure stream: %v", err) - } - v.AppendToHead( h.Link(h.Rel("stylesheet"), h.Href("https://cdn.jsdelivr.net/npm/daisyui@4/dist/full.min.css")), h.Script(h.Src("https://cdn.tailwindcss.com")), diff --git a/via.go b/via.go index eb62f85..16ff3af 100644 --- a/via.go +++ b/via.go @@ -145,6 +145,9 @@ func (v *V) Config(cfg Options) { if cfg.ContextTTL != 0 { v.cfg.ContextTTL = cfg.ContextTTL } + if cfg.Streams != nil { + v.cfg.Streams = cfg.Streams + } if cfg.ActionRateLimit.Rate != 0 || cfg.ActionRateLimit.Burst != 0 { v.actionRateLimit = cfg.ActionRateLimit } @@ -371,6 +374,12 @@ func (v *V) Start() { } } + for _, sc := range v.cfg.Streams { + if err := EnsureStream(v, sc); err != nil { + v.logger.Fatal().Err(err).Msgf("failed to create stream %q", sc.Name) + } + } + handler := http.Handler(v.mux) if v.sessionManager != nil { handler = v.sessionManager.LoadAndSave(v.mux)