Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
60009124c9 | ||
|
|
42b21348cb | ||
|
|
58ad9a2699 |
@@ -53,10 +53,15 @@ type Options struct {
|
||||
// Defaults to "/_datastar.js" if empty.
|
||||
DatastarPath string
|
||||
|
||||
// PubSub enables publish/subscribe messaging. Use vianats.New() for an
|
||||
// embedded NATS backend, or supply any PubSub implementation.
|
||||
// PubSub enables publish/subscribe messaging. When nil, an embedded NATS
|
||||
// server starts automatically in Start(). Supply any PubSub implementation
|
||||
// to replace it.
|
||||
PubSub PubSub
|
||||
|
||||
// Streams declares JetStream streams to create when Start() initializes
|
||||
// the embedded NATS server. Ignored when a custom PubSub is configured.
|
||||
Streams []StreamConfig
|
||||
|
||||
// ContextSuspendAfter is the time a context may be disconnected before
|
||||
// the reaper suspends it (frees page resources but keeps the context
|
||||
// shell for seamless re-init on reconnect). Default: 15m.
|
||||
|
||||
@@ -42,6 +42,7 @@ type Context struct {
|
||||
createdAt time.Time
|
||||
sseConnected atomic.Bool
|
||||
sseDisconnectedAt atomic.Pointer[time.Time]
|
||||
lastSeenAt atomic.Pointer[time.Time]
|
||||
suspended atomic.Bool
|
||||
}
|
||||
|
||||
|
||||
@@ -4,7 +4,7 @@ Infrastructure for multi-user real-time communication and persistent state.
|
||||
|
||||
## PubSub
|
||||
|
||||
Via includes an embedded NATS server that starts automatically with `via.New()`. No external services required — pub/sub works out of the box.
|
||||
Via includes an embedded NATS server that starts automatically with `v.Start()`. No external services required — pub/sub works out of the box.
|
||||
|
||||
### Interface
|
||||
|
||||
@@ -73,14 +73,18 @@ This disables the embedded NATS server. The `NATSConn()` and `JetStream()` acces
|
||||
|
||||
NATS JetStream provides persistent, replayable message streams. Useful for chat history, event logs, or any scenario where new subscribers need to catch up on past messages.
|
||||
|
||||
### Ensure a stream exists
|
||||
### Declaring streams
|
||||
|
||||
The recommended approach is to declare streams in `Options.Streams`. They are created automatically when `v.Start()` initializes the embedded NATS server:
|
||||
|
||||
```go
|
||||
err := via.EnsureStream(v, via.StreamConfig{
|
||||
Name: "CHAT",
|
||||
Subjects: []string{"chat.>"},
|
||||
MaxMsgs: 1000,
|
||||
MaxAge: 24 * time.Hour,
|
||||
v.Config(via.Options{
|
||||
Streams: []via.StreamConfig{{
|
||||
Name: "CHAT",
|
||||
Subjects: []string{"chat.>"},
|
||||
MaxMsgs: 1000,
|
||||
MaxAge: 24 * time.Hour,
|
||||
}},
|
||||
})
|
||||
```
|
||||
|
||||
@@ -91,7 +95,16 @@ err := via.EnsureStream(v, via.StreamConfig{
|
||||
| `MaxMsgs` | Maximum number of messages to retain |
|
||||
| `MaxAge` | Maximum age before messages are discarded |
|
||||
|
||||
Call `EnsureStream` during app initialization, before `v.Start()`.
|
||||
For dynamic stream creation after startup, `EnsureStream` is also available:
|
||||
|
||||
```go
|
||||
err := via.EnsureStream(v, via.StreamConfig{
|
||||
Name: "EVENTS",
|
||||
Subjects: []string{"events.>"},
|
||||
MaxMsgs: 500,
|
||||
MaxAge: 12 * time.Hour,
|
||||
})
|
||||
```
|
||||
|
||||
### Replay history
|
||||
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
|
||||
A chatroom built with Via and an **embedded NATS server**, demonstrating pub/sub messaging as an alternative to the custom `Rooms` implementation in `../chatroom`.
|
||||
|
||||
Uses `delaneyj/toolbelt/embeddednats` to run NATS inside the same binary - no external server required.
|
||||
Via includes an embedded NATS server that starts automatically — no external server required.
|
||||
|
||||
## Key Differences from Original Chatroom
|
||||
|
||||
@@ -25,21 +25,6 @@ That's it. No separate NATS server needed.
|
||||
|
||||
Open multiple browser tabs at http://localhost:7331 to see messages broadcast across all clients.
|
||||
|
||||
## How Embedded NATS Works
|
||||
|
||||
```go
|
||||
// Start embedded NATS server (JetStream enabled by default)
|
||||
ns, err := embeddednats.New(ctx,
|
||||
embeddednats.WithDirectory("./data/nats"),
|
||||
)
|
||||
ns.WaitForServer()
|
||||
|
||||
// Get client connection to embedded server
|
||||
nc, err := ns.Client()
|
||||
```
|
||||
|
||||
Data is persisted to `./data/nats/` for JetStream durability.
|
||||
|
||||
## Architecture
|
||||
|
||||
```
|
||||
@@ -65,14 +50,16 @@ Data is persisted to `./data/nats/` for JetStream durability.
|
||||
|
||||
## JetStream Durability
|
||||
|
||||
Messages persist to disk via JetStream:
|
||||
Messages persist to disk via JetStream. Streams are declared in `Options.Streams` and created automatically when `v.Start()` initializes the embedded NATS server:
|
||||
|
||||
```go
|
||||
js.AddStream(&nats.StreamConfig{
|
||||
Name: "CHAT",
|
||||
Subjects: []string{"chat.>"},
|
||||
MaxMsgs: 1000, // Keep last 1000 messages
|
||||
MaxAge: 24 * time.Hour,
|
||||
v.Config(via.Options{
|
||||
Streams: []via.StreamConfig{{
|
||||
Name: "CHAT",
|
||||
Subjects: []string{"chat.>"},
|
||||
MaxMsgs: 1000,
|
||||
MaxAge: 24 * time.Hour,
|
||||
}},
|
||||
})
|
||||
```
|
||||
|
||||
@@ -87,23 +74,6 @@ Stop and restart the app - chat history survives.
|
||||
- Manual join/leave channels
|
||||
|
||||
**This example - ~60 lines of NATS integration:**
|
||||
- `embeddednats.New()` starts the server
|
||||
- `nc.Subscribe(subject, handler)` for receiving
|
||||
- `nc.Publish(subject, data)` for sending
|
||||
- NATS handles delivery, no polling
|
||||
|
||||
## Next Steps
|
||||
|
||||
If this pattern proves useful, it could be promoted to a Via plugin:
|
||||
|
||||
```go
|
||||
// Hypothetical future API
|
||||
v.Config(via.WithEmbeddedNATS("./data/nats"))
|
||||
|
||||
// In page init
|
||||
c.Subscribe("events.user.*", func(data []byte) {
|
||||
c.Sync()
|
||||
})
|
||||
|
||||
c.Publish("events.user.login", userData)
|
||||
```
|
||||
- `via.Subscribe(c, subject, handler)` for receiving
|
||||
- `via.Publish(c, subject, data)` for sending
|
||||
- Streams declared in `Options` — NATS handles delivery, no polling
|
||||
|
||||
@@ -21,18 +21,14 @@ func main() {
|
||||
DocumentTitle: "NATS Chat",
|
||||
LogLevel: via.LogLevelInfo,
|
||||
ServerAddress: ":7331",
|
||||
Streams: []via.StreamConfig{{
|
||||
Name: "CHAT",
|
||||
Subjects: []string{"chat.>"},
|
||||
MaxMsgs: 1000,
|
||||
MaxAge: 24 * time.Hour,
|
||||
}},
|
||||
})
|
||||
|
||||
err := via.EnsureStream(v, via.StreamConfig{
|
||||
Name: "CHAT",
|
||||
Subjects: []string{"chat.>"},
|
||||
MaxMsgs: 1000,
|
||||
MaxAge: 24 * time.Hour,
|
||||
})
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to ensure stream: %v", err)
|
||||
}
|
||||
|
||||
v.AppendToHead(
|
||||
h.Link(h.Rel("stylesheet"), h.Href("https://cdn.jsdelivr.net/npm/@picocss/pico@2/css/pico.min.css")),
|
||||
h.StyleEl(h.Raw(chatCSS)),
|
||||
@@ -76,6 +72,6 @@ func main() {
|
||||
protected := v.Group("", requireProfile)
|
||||
protected.Page("/", ChatPage)
|
||||
|
||||
log.Println("Starting NATS chatroom on :7331 (embedded NATS server)")
|
||||
log.Println("Starting NATS chatroom on :7331")
|
||||
v.Start()
|
||||
}
|
||||
|
||||
@@ -53,18 +53,14 @@ func main() {
|
||||
DocumentTitle: "Bookmarks",
|
||||
LogLevel: via.LogLevelInfo,
|
||||
ServerAddress: ":7331",
|
||||
Streams: []via.StreamConfig{{
|
||||
Name: "BOOKMARKS",
|
||||
Subjects: []string{"bookmarks.>"},
|
||||
MaxMsgs: 1000,
|
||||
MaxAge: 24 * time.Hour,
|
||||
}},
|
||||
})
|
||||
|
||||
err := via.EnsureStream(v, via.StreamConfig{
|
||||
Name: "BOOKMARKS",
|
||||
Subjects: []string{"bookmarks.>"},
|
||||
MaxMsgs: 1000,
|
||||
MaxAge: 24 * time.Hour,
|
||||
})
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to ensure stream: %v", err)
|
||||
}
|
||||
|
||||
v.AppendToHead(
|
||||
h.Link(h.Rel("stylesheet"), h.Href("https://cdn.jsdelivr.net/npm/daisyui@4/dist/full.min.css")),
|
||||
h.Script(h.Src("https://cdn.tailwindcss.com")),
|
||||
|
||||
10
nats.go
10
nats.go
@@ -9,6 +9,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/delaneyj/toolbelt/embeddednats"
|
||||
natsserver "github.com/nats-io/nats-server/v2/server"
|
||||
"github.com/nats-io/nats.go"
|
||||
)
|
||||
|
||||
@@ -50,7 +51,14 @@ func startDefaultNATS() (dn *defaultNATS, err error) {
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
ns, err := embeddednats.New(ctx, embeddednats.WithDirectory(dataDir))
|
||||
ns, err := embeddednats.New(ctx,
|
||||
embeddednats.WithDirectory(dataDir),
|
||||
embeddednats.WithNATSServerOptions(&natsserver.Options{
|
||||
JetStream: true,
|
||||
StoreDir: dataDir,
|
||||
Port: -1,
|
||||
}),
|
||||
)
|
||||
if err != nil {
|
||||
cancel()
|
||||
os.RemoveAll(dataDir)
|
||||
|
||||
37
via.go
37
via.go
@@ -145,6 +145,9 @@ func (v *V) Config(cfg Options) {
|
||||
if cfg.ContextTTL != 0 {
|
||||
v.cfg.ContextTTL = cfg.ContextTTL
|
||||
}
|
||||
if cfg.Streams != nil {
|
||||
v.cfg.Streams = cfg.Streams
|
||||
}
|
||||
if cfg.ActionRateLimit.Rate != 0 || cfg.ActionRateLimit.Burst != 0 {
|
||||
v.actionRateLimit = cfg.ActionRateLimit
|
||||
}
|
||||
@@ -331,15 +334,18 @@ func (v *V) reapOrphanedContexts(suspendAfter, ttl time.Duration) {
|
||||
if c.sseConnected.Load() {
|
||||
continue
|
||||
}
|
||||
var disconnectedFor time.Duration
|
||||
if dc := c.sseDisconnectedAt.Load(); dc != nil {
|
||||
disconnectedFor = now.Sub(*dc)
|
||||
} else {
|
||||
disconnectedFor = now.Sub(c.createdAt)
|
||||
// Use the most recent liveness signal
|
||||
lastAlive := c.createdAt
|
||||
if dc := c.sseDisconnectedAt.Load(); dc != nil && dc.After(lastAlive) {
|
||||
lastAlive = *dc
|
||||
}
|
||||
if disconnectedFor > ttl {
|
||||
if seen := c.lastSeenAt.Load(); seen != nil && seen.After(lastAlive) {
|
||||
lastAlive = *seen
|
||||
}
|
||||
silentFor := now.Sub(lastAlive)
|
||||
if silentFor > ttl {
|
||||
toReap = append(toReap, c)
|
||||
} else if disconnectedFor > suspendAfter && !c.suspended.Load() {
|
||||
} else if silentFor > suspendAfter && !c.suspended.Load() {
|
||||
toSuspend = append(toSuspend, c)
|
||||
}
|
||||
}
|
||||
@@ -368,6 +374,12 @@ func (v *V) Start() {
|
||||
}
|
||||
}
|
||||
|
||||
for _, sc := range v.cfg.Streams {
|
||||
if err := EnsureStream(v, sc); err != nil {
|
||||
v.logger.Fatal().Err(err).Msgf("failed to create stream %q", sc.Name)
|
||||
}
|
||||
}
|
||||
|
||||
handler := http.Handler(v.mux)
|
||||
if v.sessionManager != nil {
|
||||
handler = v.sessionManager.LoadAndSave(v.mux)
|
||||
@@ -655,6 +667,8 @@ func New() *V {
|
||||
return
|
||||
}
|
||||
c.reqCtx = r.Context()
|
||||
now := time.Now()
|
||||
c.lastSeenAt.Store(&now)
|
||||
|
||||
sse := datastar.NewSSE(w, r, datastar.WithCompression(datastar.WithBrotli(datastar.WithBrotliLevel(5))))
|
||||
|
||||
@@ -688,17 +702,22 @@ func New() *V {
|
||||
|
||||
go c.Sync()
|
||||
|
||||
keepalive := time.NewTicker(30 * time.Second)
|
||||
defer keepalive.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-sse.Context().Done():
|
||||
v.logDebug(c, "SSE connection ended")
|
||||
c.sseConnected.Store(false)
|
||||
now := time.Now()
|
||||
c.sseDisconnectedAt.Store(&now)
|
||||
dcNow := time.Now()
|
||||
c.sseDisconnectedAt.Store(&dcNow)
|
||||
return
|
||||
case <-c.ctxDisposedChan:
|
||||
v.logDebug(c, "context disposed, closing SSE")
|
||||
return
|
||||
case <-keepalive.C:
|
||||
sse.PatchSignals([]byte("{}"))
|
||||
case patch := <-c.patchChan:
|
||||
switch patch.typ {
|
||||
case patchTypeElements:
|
||||
|
||||
71
via_test.go
71
via_test.go
@@ -418,6 +418,7 @@ func TestReaperCleansOrphanedContexts(t *testing.T) {
|
||||
func TestReaperSuspendsContext(t *testing.T) {
|
||||
v := New()
|
||||
c := newContext("suspend-1", "/", v)
|
||||
c.createdAt = time.Now().Add(-30 * time.Minute)
|
||||
dc := time.Now().Add(-20 * time.Minute)
|
||||
c.sseDisconnectedAt.Store(&dc)
|
||||
v.registerCtx(c)
|
||||
@@ -432,6 +433,7 @@ func TestReaperSuspendsContext(t *testing.T) {
|
||||
func TestReaperReapsAfterTTL(t *testing.T) {
|
||||
v := New()
|
||||
c := newContext("reap-1", "/", v)
|
||||
c.createdAt = time.Now().Add(-3 * time.Hour)
|
||||
dc := time.Now().Add(-2 * time.Hour)
|
||||
c.sseDisconnectedAt.Store(&dc)
|
||||
c.suspended.Store(true)
|
||||
@@ -446,6 +448,7 @@ func TestReaperReapsAfterTTL(t *testing.T) {
|
||||
func TestReaperIgnoresAlreadySuspended(t *testing.T) {
|
||||
v := New()
|
||||
c := newContext("already-sus-1", "/", v)
|
||||
c.createdAt = time.Now().Add(-30 * time.Minute)
|
||||
dc := time.Now().Add(-20 * time.Minute)
|
||||
c.sseDisconnectedAt.Store(&dc)
|
||||
c.suspended.Store(true)
|
||||
@@ -500,6 +503,74 @@ func TestCleanupCtxIdempotent(t *testing.T) {
|
||||
assert.Error(t, err, "context should be removed after cleanup")
|
||||
}
|
||||
|
||||
func TestReaperRespectsLastSeenAt(t *testing.T) {
|
||||
v := New()
|
||||
c := newContext("seen-1", "/", v)
|
||||
c.createdAt = time.Now().Add(-30 * time.Minute)
|
||||
// Disconnected 20 min ago, but client retried (lastSeenAt) 2 min ago
|
||||
dc := time.Now().Add(-20 * time.Minute)
|
||||
c.sseDisconnectedAt.Store(&dc)
|
||||
seen := time.Now().Add(-2 * time.Minute)
|
||||
c.lastSeenAt.Store(&seen)
|
||||
v.registerCtx(c)
|
||||
|
||||
v.reapOrphanedContexts(15*time.Minute, time.Hour)
|
||||
|
||||
_, err := v.getCtx("seen-1")
|
||||
assert.NoError(t, err, "context with recent lastSeenAt should survive suspend threshold")
|
||||
assert.False(t, c.suspended.Load(), "context should not be suspended")
|
||||
}
|
||||
|
||||
func TestReaperFallsBackWithoutLastSeenAt(t *testing.T) {
|
||||
v := New()
|
||||
c := newContext("noseen-1", "/", v)
|
||||
c.createdAt = time.Now().Add(-30 * time.Minute)
|
||||
dc := time.Now().Add(-20 * time.Minute)
|
||||
c.sseDisconnectedAt.Store(&dc)
|
||||
// no lastSeenAt set — should fall back to sseDisconnectedAt
|
||||
v.registerCtx(c)
|
||||
|
||||
v.reapOrphanedContexts(15*time.Minute, time.Hour)
|
||||
|
||||
got, err := v.getCtx("noseen-1")
|
||||
assert.NoError(t, err, "context should still be in registry (suspended, not reaped)")
|
||||
assert.True(t, got.suspended.Load(), "context should be suspended using sseDisconnectedAt fallback")
|
||||
}
|
||||
|
||||
func TestReaperReapsWithStaleLastSeenAt(t *testing.T) {
|
||||
v := New()
|
||||
c := newContext("stale-seen-1", "/", v)
|
||||
c.createdAt = time.Now().Add(-3 * time.Hour)
|
||||
dc := time.Now().Add(-2 * time.Hour)
|
||||
c.sseDisconnectedAt.Store(&dc)
|
||||
// lastSeenAt is also old — beyond TTL
|
||||
seen := time.Now().Add(-90 * time.Minute)
|
||||
c.lastSeenAt.Store(&seen)
|
||||
c.suspended.Store(true)
|
||||
v.registerCtx(c)
|
||||
|
||||
v.reapOrphanedContexts(15*time.Minute, time.Hour)
|
||||
|
||||
_, err := v.getCtx("stale-seen-1")
|
||||
assert.Error(t, err, "context with stale lastSeenAt beyond TTL should be reaped")
|
||||
}
|
||||
|
||||
func TestLastSeenAtUpdatedOnSSEConnect(t *testing.T) {
|
||||
v := New()
|
||||
c := newContext("seen-sse-1", "/", v)
|
||||
v.registerCtx(c)
|
||||
|
||||
assert.Nil(t, c.lastSeenAt.Load(), "lastSeenAt should be nil before SSE connect")
|
||||
|
||||
// Simulate what the SSE handler does after getCtx
|
||||
now := time.Now()
|
||||
c.lastSeenAt.Store(&now)
|
||||
|
||||
got := c.lastSeenAt.Load()
|
||||
assert.NotNil(t, got, "lastSeenAt should be set after SSE connect")
|
||||
assert.WithinDuration(t, now, *got, time.Second)
|
||||
}
|
||||
|
||||
func TestDevModeRemovePersistedFix(t *testing.T) {
|
||||
v := New()
|
||||
v.cfg.DevMode = true
|
||||
|
||||
Reference in New Issue
Block a user