5 Commits

Author SHA1 Message Date
Ryan Hamamura
60009124c9 feat: add declarative Options.Streams for automatic JetStream stream creation
Some checks failed
CI / Build and Test (push) Has been cancelled
Streams listed in Options.Streams are created by Start() when the
embedded NATS server initializes, replacing manual EnsureStream calls
during setup. Migrates nats-chatroom and pubsub-crud examples.
2026-02-19 12:24:44 -10:00
Ryan Hamamura
42b21348cb fix: use random port for embedded NATS to avoid binding conflicts
Port 0 is treated as default (4222) by NATS server, causing hangs when
that port is unavailable. Port -1 (RANDOM_PORT) binds to an OS-assigned
free port, which is correct for an embedded server.
2026-02-19 12:24:37 -10:00
Ryan Hamamura
58ad9a2699 feat: add SSE keepalive and liveness tracking for resilient connections
Some checks failed
CI / Build and Test (push) Has been cancelled
Add 30s keepalive pings to prevent proxy/CDN idle timeouts from killing
SSE connections silently. Track lastSeenAt on each SSE connect attempt
so Datastar's retry signals keep contexts alive through the reaper.
2026-02-19 12:07:25 -10:00
Ryan Hamamura
f3a9c8036f refactor: use computed signals in pubsub-crud and chatroom examples
Some checks failed
CI / Build and Test (push) Has been cancelled
2026-02-19 09:03:13 -10:00
Ryan Hamamura
6763e1a420 feat: add computed signals for derived reactive values
All checks were successful
CI / Build and Test (push) Successful in 33s
Read-only signals whose value is a function of other signals,
recomputed automatically at sync time. Supports String, Int, Bool,
and Text methods. Components store computed signals on the parent
page context like regular signals.
2026-02-18 09:22:40 -10:00
12 changed files with 464 additions and 97 deletions

55
computed.go Normal file
View File

@@ -0,0 +1,55 @@
package via
import (
"fmt"
"strconv"
"strings"
"github.com/ryanhamamura/via/h"
)
// computedSignal is a read-only signal whose value is derived from other signals.
// It recomputes on every read and is included in patches only when the value changes.
type computedSignal struct {
id string
compute func() string
lastVal string
changed bool
}
func (s *computedSignal) ID() string {
return s.id
}
func (s *computedSignal) String() string {
return s.compute()
}
func (s *computedSignal) Int() int {
if n, err := strconv.Atoi(s.String()); err == nil {
return n
}
return 0
}
func (s *computedSignal) Bool() bool {
val := strings.ToLower(s.String())
return val == "true" || val == "1" || val == "yes" || val == "on"
}
func (s *computedSignal) Text() h.H {
return h.Span(h.Data("text", "$"+s.id))
}
// recompute calls the compute function and sets changed if the value differs from lastVal.
func (s *computedSignal) recompute() {
val := s.compute()
if val != s.lastVal {
s.lastVal = val
s.changed = true
}
}
func (s *computedSignal) patchValue() string {
return fmt.Sprintf("%v", s.lastVal)
}

190
computed_test.go Normal file
View File

@@ -0,0 +1,190 @@
package via
import (
"bytes"
"fmt"
"testing"
"github.com/ryanhamamura/via/h"
"github.com/stretchr/testify/assert"
)
func TestComputedBasic(t *testing.T) {
v := New()
var cs *computedSignal
v.Page("/", func(c *Context) {
sig1 := c.Signal("hello")
sig2 := c.Signal("world")
cs = c.Computed(func() string {
return sig1.String() + " " + sig2.String()
})
c.View(func() h.H { return h.Div() })
})
assert.Equal(t, "hello world", cs.String())
}
func TestComputedReactivity(t *testing.T) {
v := New()
var cs *computedSignal
var sig1 *signal
v.Page("/", func(c *Context) {
sig1 = c.Signal("a")
sig2 := c.Signal("b")
cs = c.Computed(func() string {
return sig1.String() + sig2.String()
})
c.View(func() h.H { return h.Div() })
})
assert.Equal(t, "ab", cs.String())
sig1.SetValue("x")
assert.Equal(t, "xb", cs.String())
}
func TestComputedInt(t *testing.T) {
v := New()
var cs *computedSignal
v.Page("/", func(c *Context) {
sig := c.Signal(21)
cs = c.Computed(func() string {
return fmt.Sprintf("%d", sig.Int()*2)
})
c.View(func() h.H { return h.Div() })
})
assert.Equal(t, 42, cs.Int())
}
func TestComputedBool(t *testing.T) {
v := New()
var cs *computedSignal
v.Page("/", func(c *Context) {
sig := c.Signal("true")
cs = c.Computed(func() string {
return sig.String()
})
c.View(func() h.H { return h.Div() })
})
assert.True(t, cs.Bool())
}
func TestComputedText(t *testing.T) {
v := New()
var cs *computedSignal
v.Page("/", func(c *Context) {
cs = c.Computed(func() string { return "hi" })
c.View(func() h.H { return h.Div() })
})
var buf bytes.Buffer
err := cs.Text().Render(&buf)
assert.NoError(t, err)
assert.Contains(t, buf.String(), `data-text="$`+cs.ID()+`"`)
}
func TestComputedChangeDetection(t *testing.T) {
v := New()
var ctx *Context
var sig *signal
v.Page("/", func(c *Context) {
ctx = c
sig = c.Signal("a")
c.Computed(func() string {
return sig.String() + "!"
})
c.View(func() h.H { return h.Div() })
})
// First patch includes computed (changed=true from init)
patch1 := ctx.prepareSignalsForPatch()
assert.NotEmpty(t, patch1)
// Second patch: nothing changed, computed should not be included
patch2 := ctx.prepareSignalsForPatch()
// Regular signal still has changed=true (not reset in prepareSignalsForPatch),
// but computed should not appear since its value didn't change.
hasComputed := false
ctx.signals.Range(func(_, value any) bool {
if cs, ok := value.(*computedSignal); ok {
_, inPatch := patch2[cs.ID()]
hasComputed = inPatch
}
return true
})
assert.False(t, hasComputed)
// After changing dependency, computed should reappear
sig.SetValue("b")
patch3 := ctx.prepareSignalsForPatch()
found := false
ctx.signals.Range(func(_, value any) bool {
if cs, ok := value.(*computedSignal); ok {
if v, ok := patch3[cs.ID()]; ok {
assert.Equal(t, "b!", v)
found = true
}
}
return true
})
assert.True(t, found)
}
func TestComputedInComponent(t *testing.T) {
v := New()
var cs *computedSignal
var parentCtx *Context
v.Page("/", func(c *Context) {
parentCtx = c
c.Component(func(comp *Context) {
sig := comp.Signal("via")
cs = comp.Computed(func() string {
return "hello " + sig.String()
})
comp.View(func() h.H { return h.Div() })
})
c.View(func() h.H { return h.Div() })
})
assert.Equal(t, "hello via", cs.String())
// Verify it's stored on the parent page context
found := false
parentCtx.signals.Range(func(_, value any) bool {
if stored, ok := value.(*computedSignal); ok && stored.ID() == cs.ID() {
found = true
}
return true
})
assert.True(t, found)
}
func TestComputedIsReadOnly(t *testing.T) {
// Compile-time guarantee: *computedSignal has no Bind() or SetValue() methods.
// This test exists as documentation — if someone adds those methods, the
// interface assertion below will need updating and serve as a reminder.
var cs interface{} = &computedSignal{}
type writable interface {
SetValue(any)
}
type bindable interface {
Bind() h.H
}
_, isWritable := cs.(writable)
_, isBindable := cs.(bindable)
assert.False(t, isWritable, "computedSignal must not have SetValue")
assert.False(t, isBindable, "computedSignal must not have Bind")
}
func TestComputedInjectSignalsSkips(t *testing.T) {
v := New()
var ctx *Context
var cs *computedSignal
v.Page("/", func(c *Context) {
ctx = c
cs = c.Computed(func() string { return "fixed" })
c.View(func() h.H { return h.Div() })
})
// Simulate browser sending back a value for the computed signal — should be ignored
ctx.injectSignals(map[string]any{
cs.ID(): "injected",
})
assert.Equal(t, "fixed", cs.String())
}

View File

@@ -53,10 +53,15 @@ type Options struct {
// Defaults to "/_datastar.js" if empty. // Defaults to "/_datastar.js" if empty.
DatastarPath string DatastarPath string
// PubSub enables publish/subscribe messaging. Use vianats.New() for an // PubSub enables publish/subscribe messaging. When nil, an embedded NATS
// embedded NATS backend, or supply any PubSub implementation. // server starts automatically in Start(). Supply any PubSub implementation
// to replace it.
PubSub PubSub PubSub PubSub
// Streams declares JetStream streams to create when Start() initializes
// the embedded NATS server. Ignored when a custom PubSub is configured.
Streams []StreamConfig
// ContextSuspendAfter is the time a context may be disconnected before // ContextSuspendAfter is the time a context may be disconnected before
// the reaper suspends it (frees page resources but keeps the context // the reaper suspends it (frees page resources but keeps the context
// shell for seamless re-init on reconnect). Default: 15m. // shell for seamless re-init on reconnect). Default: 15m.

View File

@@ -42,6 +42,7 @@ type Context struct {
createdAt time.Time createdAt time.Time
sseConnected atomic.Bool sseConnected atomic.Bool
sseDisconnectedAt atomic.Pointer[time.Time] sseDisconnectedAt atomic.Pointer[time.Time]
lastSeenAt atomic.Pointer[time.Time]
suspended atomic.Bool suspended atomic.Bool
} }
@@ -207,6 +208,40 @@ func (c *Context) Signal(v any) *signal {
} }
// Computed creates a read-only signal whose value is derived from the given function.
// The function is called on every read (String/Int/Bool) for fresh values,
// and during sync to detect changes for browser patches.
//
// Computed signals cannot be bound to inputs or set manually.
//
// Example:
//
// full := c.Computed(func() string {
// return first.String() + " " + last.String()
// })
// c.View(func() h.H {
// return h.Span(full.Text())
// })
func (c *Context) Computed(fn func() string) *computedSignal {
sigID := genRandID()
initial := fn()
cs := &computedSignal{
id: sigID,
compute: fn,
lastVal: initial,
changed: true,
}
c.mu.Lock()
defer c.mu.Unlock()
if c.isComponent() {
c.parentPageCtx.signals.Store(sigID, cs)
} else {
c.signals.Store(sigID, cs)
}
return cs
}
func (c *Context) injectSignals(sigs map[string]any) { func (c *Context) injectSignals(sigs map[string]any) {
if sigs == nil { if sigs == nil {
c.app.logErr(c, "signal injection failed: nil signals") c.app.logErr(c, "signal injection failed: nil signals")
@@ -248,7 +283,8 @@ func (c *Context) prepareSignalsForPatch() map[string]any {
defer c.mu.RUnlock() defer c.mu.RUnlock()
updatedSigs := make(map[string]any) updatedSigs := make(map[string]any)
c.signals.Range(func(sigID, value any) bool { c.signals.Range(func(sigID, value any) bool {
if sig, ok := value.(*signal); ok { switch sig := value.(type) {
case *signal:
if sig.err != nil { if sig.err != nil {
c.app.logWarn(c, "signal '%s' is out of sync: %v", sig.id, sig.err) c.app.logWarn(c, "signal '%s' is out of sync: %v", sig.id, sig.err)
return true return true
@@ -256,6 +292,12 @@ func (c *Context) prepareSignalsForPatch() map[string]any {
if sig.changed { if sig.changed {
updatedSigs[sigID.(string)] = fmt.Sprintf("%v", sig.val) updatedSigs[sigID.(string)] = fmt.Sprintf("%v", sig.val)
} }
case *computedSignal:
sig.recompute()
if sig.changed {
updatedSigs[sigID.(string)] = sig.patchValue()
sig.changed = false
}
} }
return true return true
}) })

View File

@@ -4,7 +4,7 @@ Infrastructure for multi-user real-time communication and persistent state.
## PubSub ## PubSub
Via includes an embedded NATS server that starts automatically with `via.New()`. No external services required — pub/sub works out of the box. Via includes an embedded NATS server that starts automatically with `v.Start()`. No external services required — pub/sub works out of the box.
### Interface ### Interface
@@ -73,14 +73,18 @@ This disables the embedded NATS server. The `NATSConn()` and `JetStream()` acces
NATS JetStream provides persistent, replayable message streams. Useful for chat history, event logs, or any scenario where new subscribers need to catch up on past messages. NATS JetStream provides persistent, replayable message streams. Useful for chat history, event logs, or any scenario where new subscribers need to catch up on past messages.
### Ensure a stream exists ### Declaring streams
The recommended approach is to declare streams in `Options.Streams`. They are created automatically when `v.Start()` initializes the embedded NATS server:
```go ```go
err := via.EnsureStream(v, via.StreamConfig{ v.Config(via.Options{
Streams: []via.StreamConfig{{
Name: "CHAT", Name: "CHAT",
Subjects: []string{"chat.>"}, Subjects: []string{"chat.>"},
MaxMsgs: 1000, MaxMsgs: 1000,
MaxAge: 24 * time.Hour, MaxAge: 24 * time.Hour,
}},
}) })
``` ```
@@ -91,7 +95,16 @@ err := via.EnsureStream(v, via.StreamConfig{
| `MaxMsgs` | Maximum number of messages to retain | | `MaxMsgs` | Maximum number of messages to retain |
| `MaxAge` | Maximum age before messages are discarded | | `MaxAge` | Maximum age before messages are discarded |
Call `EnsureStream` during app initialization, before `v.Start()`. For dynamic stream creation after startup, `EnsureStream` is also available:
```go
err := via.EnsureStream(v, via.StreamConfig{
Name: "EVENTS",
Subjects: []string{"events.>"},
MaxMsgs: 500,
MaxAge: 12 * time.Hour,
})
```
### Replay history ### Replay history

View File

@@ -2,7 +2,7 @@
A chatroom built with Via and an **embedded NATS server**, demonstrating pub/sub messaging as an alternative to the custom `Rooms` implementation in `../chatroom`. A chatroom built with Via and an **embedded NATS server**, demonstrating pub/sub messaging as an alternative to the custom `Rooms` implementation in `../chatroom`.
Uses `delaneyj/toolbelt/embeddednats` to run NATS inside the same binary - no external server required. Via includes an embedded NATS server that starts automatically no external server required.
## Key Differences from Original Chatroom ## Key Differences from Original Chatroom
@@ -25,21 +25,6 @@ That's it. No separate NATS server needed.
Open multiple browser tabs at http://localhost:7331 to see messages broadcast across all clients. Open multiple browser tabs at http://localhost:7331 to see messages broadcast across all clients.
## How Embedded NATS Works
```go
// Start embedded NATS server (JetStream enabled by default)
ns, err := embeddednats.New(ctx,
embeddednats.WithDirectory("./data/nats"),
)
ns.WaitForServer()
// Get client connection to embedded server
nc, err := ns.Client()
```
Data is persisted to `./data/nats/` for JetStream durability.
## Architecture ## Architecture
``` ```
@@ -65,14 +50,16 @@ Data is persisted to `./data/nats/` for JetStream durability.
## JetStream Durability ## JetStream Durability
Messages persist to disk via JetStream: Messages persist to disk via JetStream. Streams are declared in `Options.Streams` and created automatically when `v.Start()` initializes the embedded NATS server:
```go ```go
js.AddStream(&nats.StreamConfig{ v.Config(via.Options{
Streams: []via.StreamConfig{{
Name: "CHAT", Name: "CHAT",
Subjects: []string{"chat.>"}, Subjects: []string{"chat.>"},
MaxMsgs: 1000, // Keep last 1000 messages MaxMsgs: 1000,
MaxAge: 24 * time.Hour, MaxAge: 24 * time.Hour,
}},
}) })
``` ```
@@ -87,23 +74,6 @@ Stop and restart the app - chat history survives.
- Manual join/leave channels - Manual join/leave channels
**This example - ~60 lines of NATS integration:** **This example - ~60 lines of NATS integration:**
- `embeddednats.New()` starts the server - `via.Subscribe(c, subject, handler)` for receiving
- `nc.Subscribe(subject, handler)` for receiving - `via.Publish(c, subject, data)` for sending
- `nc.Publish(subject, data)` for sending - Streams declared in `Options` — NATS handles delivery, no polling
- NATS handles delivery, no polling
## Next Steps
If this pattern proves useful, it could be promoted to a Via plugin:
```go
// Hypothetical future API
v.Config(via.WithEmbeddedNATS("./data/nats"))
// In page init
c.Subscribe("events.user.*", func(data []byte) {
c.Sync()
})
c.Publish("events.user.login", userData)
```

View File

@@ -21,17 +21,13 @@ func main() {
DocumentTitle: "NATS Chat", DocumentTitle: "NATS Chat",
LogLevel: via.LogLevelInfo, LogLevel: via.LogLevelInfo,
ServerAddress: ":7331", ServerAddress: ":7331",
}) Streams: []via.StreamConfig{{
err := via.EnsureStream(v, via.StreamConfig{
Name: "CHAT", Name: "CHAT",
Subjects: []string{"chat.>"}, Subjects: []string{"chat.>"},
MaxMsgs: 1000, MaxMsgs: 1000,
MaxAge: 24 * time.Hour, MaxAge: 24 * time.Hour,
}},
}) })
if err != nil {
log.Fatalf("Failed to ensure stream: %v", err)
}
v.AppendToHead( v.AppendToHead(
h.Link(h.Rel("stylesheet"), h.Href("https://cdn.jsdelivr.net/npm/@picocss/pico@2/css/pico.min.css")), h.Link(h.Rel("stylesheet"), h.Href("https://cdn.jsdelivr.net/npm/@picocss/pico@2/css/pico.min.css")),
@@ -76,6 +72,6 @@ func main() {
protected := v.Group("", requireProfile) protected := v.Group("", requireProfile)
protected.Page("/", ChatPage) protected.Page("/", ChatPage)
log.Println("Starting NATS chatroom on :7331 (embedded NATS server)") log.Println("Starting NATS chatroom on :7331")
v.Start() v.Start()
} }

View File

@@ -18,6 +18,12 @@ func ProfilePage(c *via.Context) {
via.MaxLen(20, "Must be at most 20 characters"), via.MaxLen(20, "Must be at most 20 characters"),
) )
selectedEmoji := c.Signal(existingEmoji) selectedEmoji := c.Signal(existingEmoji)
previewName := c.Computed(func() string {
if name := nameField.String(); name != "" {
return name
}
return "Your Name"
})
saveToSession := func() bool { saveToSession := func() bool {
if !c.ValidateAll() { if !c.ValidateAll() {
@@ -68,18 +74,13 @@ func ProfilePage(c *via.Context) {
h.Button(h.Text("Start Chatting"), saveAndChat.OnClick()), h.Button(h.Text("Start Chatting"), saveAndChat.OnClick()),
) )
previewName := nameField.String()
if previewName == "" {
previewName = "Your Name"
}
return h.Div(h.Class("profile-page"), return h.Div(h.Class("profile-page"),
h.H2(h.Text("Your Profile"), h.DataViewTransition("page-title")), h.H2(h.Text("Your Profile"), h.DataViewTransition("page-title")),
// Live preview // Live preview
h.Div(h.Class("profile-preview"), h.Div(h.Class("profile-preview"),
h.Div(h.Class("avatar avatar-lg"), h.Text(selectedEmoji.String())), h.Div(h.Class("avatar avatar-lg"), h.Text(selectedEmoji.String())),
h.Span(h.Class("preview-name"), h.Text(previewName)), h.Span(h.Class("preview-name"), previewName.Text()),
), ),
h.Div(h.Class("profile-form"), h.Div(h.Class("profile-form"),

View File

@@ -53,17 +53,13 @@ func main() {
DocumentTitle: "Bookmarks", DocumentTitle: "Bookmarks",
LogLevel: via.LogLevelInfo, LogLevel: via.LogLevelInfo,
ServerAddress: ":7331", ServerAddress: ":7331",
}) Streams: []via.StreamConfig{{
err := via.EnsureStream(v, via.StreamConfig{
Name: "BOOKMARKS", Name: "BOOKMARKS",
Subjects: []string{"bookmarks.>"}, Subjects: []string{"bookmarks.>"},
MaxMsgs: 1000, MaxMsgs: 1000,
MaxAge: 24 * time.Hour, MaxAge: 24 * time.Hour,
}},
}) })
if err != nil {
log.Fatalf("Failed to ensure stream: %v", err)
}
v.AppendToHead( v.AppendToHead(
h.Link(h.Rel("stylesheet"), h.Href("https://cdn.jsdelivr.net/npm/daisyui@4/dist/full.min.css")), h.Link(h.Rel("stylesheet"), h.Href("https://cdn.jsdelivr.net/npm/daisyui@4/dist/full.min.css")),
@@ -76,6 +72,12 @@ func main() {
titleSignal := c.Signal("") titleSignal := c.Signal("")
urlSignal := c.Signal("") urlSignal := c.Signal("")
targetIDSignal := c.Signal("") targetIDSignal := c.Signal("")
saveLabel := c.Computed(func() string {
if targetIDSignal.String() != "" {
return "Update Bookmark"
}
return "Add Bookmark"
})
via.Subscribe(c, "bookmarks.events", func(evt CRUDEvent) { via.Subscribe(c, "bookmarks.events", func(evt CRUDEvent) {
if evt.UserID == userID { if evt.UserID == userID {
@@ -205,11 +207,6 @@ func main() {
} }
bookmarksMu.RUnlock() bookmarksMu.RUnlock()
saveLabel := "Add Bookmark"
if isEditing {
saveLabel = "Update Bookmark"
}
return h.Div(h.Class("min-h-screen bg-base-200"), return h.Div(h.Class("min-h-screen bg-base-200"),
// Navbar // Navbar
h.Div(h.Class("navbar bg-base-100 shadow-sm"), h.Div(h.Class("navbar bg-base-100 shadow-sm"),
@@ -225,7 +222,7 @@ func main() {
// Form card // Form card
h.Div(h.Class("card bg-base-100 shadow"), h.Div(h.Class("card bg-base-100 shadow"),
h.Div(h.Class("card-body"), h.Div(h.Class("card-body"),
h.H2(h.Class("card-title"), h.Text(saveLabel)), h.H2(h.Class("card-title"), saveLabel.Text()),
h.Div(h.Class("flex flex-col gap-2"), h.Div(h.Class("flex flex-col gap-2"),
h.Input(h.Class("input input-bordered w-full"), h.Type("text"), h.Placeholder("Title"), titleSignal.Bind()), h.Input(h.Class("input input-bordered w-full"), h.Type("text"), h.Placeholder("Title"), titleSignal.Bind()),
h.Input(h.Class("input input-bordered w-full"), h.Type("text"), h.Placeholder("https://example.com"), urlSignal.Bind()), h.Input(h.Class("input input-bordered w-full"), h.Type("text"), h.Placeholder("https://example.com"), urlSignal.Bind()),
@@ -233,7 +230,7 @@ func main() {
h.If(isEditing, h.If(isEditing,
h.Button(h.Class("btn btn-ghost"), h.Text("Cancel"), cancelEdit.OnClick()), h.Button(h.Class("btn btn-ghost"), h.Text("Cancel"), cancelEdit.OnClick()),
), ),
h.Button(h.Class("btn btn-primary"), h.Text(saveLabel), save.OnClick()), h.Button(h.Class("btn btn-primary"), saveLabel.Text(), save.OnClick()),
), ),
), ),
), ),

10
nats.go
View File

@@ -9,6 +9,7 @@ import (
"time" "time"
"github.com/delaneyj/toolbelt/embeddednats" "github.com/delaneyj/toolbelt/embeddednats"
natsserver "github.com/nats-io/nats-server/v2/server"
"github.com/nats-io/nats.go" "github.com/nats-io/nats.go"
) )
@@ -50,7 +51,14 @@ func startDefaultNATS() (dn *defaultNATS, err error) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
ns, err := embeddednats.New(ctx, embeddednats.WithDirectory(dataDir)) ns, err := embeddednats.New(ctx,
embeddednats.WithDirectory(dataDir),
embeddednats.WithNATSServerOptions(&natsserver.Options{
JetStream: true,
StoreDir: dataDir,
Port: -1,
}),
)
if err != nil { if err != nil {
cancel() cancel()
os.RemoveAll(dataDir) os.RemoveAll(dataDir)

37
via.go
View File

@@ -145,6 +145,9 @@ func (v *V) Config(cfg Options) {
if cfg.ContextTTL != 0 { if cfg.ContextTTL != 0 {
v.cfg.ContextTTL = cfg.ContextTTL v.cfg.ContextTTL = cfg.ContextTTL
} }
if cfg.Streams != nil {
v.cfg.Streams = cfg.Streams
}
if cfg.ActionRateLimit.Rate != 0 || cfg.ActionRateLimit.Burst != 0 { if cfg.ActionRateLimit.Rate != 0 || cfg.ActionRateLimit.Burst != 0 {
v.actionRateLimit = cfg.ActionRateLimit v.actionRateLimit = cfg.ActionRateLimit
} }
@@ -331,15 +334,18 @@ func (v *V) reapOrphanedContexts(suspendAfter, ttl time.Duration) {
if c.sseConnected.Load() { if c.sseConnected.Load() {
continue continue
} }
var disconnectedFor time.Duration // Use the most recent liveness signal
if dc := c.sseDisconnectedAt.Load(); dc != nil { lastAlive := c.createdAt
disconnectedFor = now.Sub(*dc) if dc := c.sseDisconnectedAt.Load(); dc != nil && dc.After(lastAlive) {
} else { lastAlive = *dc
disconnectedFor = now.Sub(c.createdAt)
} }
if disconnectedFor > ttl { if seen := c.lastSeenAt.Load(); seen != nil && seen.After(lastAlive) {
lastAlive = *seen
}
silentFor := now.Sub(lastAlive)
if silentFor > ttl {
toReap = append(toReap, c) toReap = append(toReap, c)
} else if disconnectedFor > suspendAfter && !c.suspended.Load() { } else if silentFor > suspendAfter && !c.suspended.Load() {
toSuspend = append(toSuspend, c) toSuspend = append(toSuspend, c)
} }
} }
@@ -368,6 +374,12 @@ func (v *V) Start() {
} }
} }
for _, sc := range v.cfg.Streams {
if err := EnsureStream(v, sc); err != nil {
v.logger.Fatal().Err(err).Msgf("failed to create stream %q", sc.Name)
}
}
handler := http.Handler(v.mux) handler := http.Handler(v.mux)
if v.sessionManager != nil { if v.sessionManager != nil {
handler = v.sessionManager.LoadAndSave(v.mux) handler = v.sessionManager.LoadAndSave(v.mux)
@@ -655,6 +667,8 @@ func New() *V {
return return
} }
c.reqCtx = r.Context() c.reqCtx = r.Context()
now := time.Now()
c.lastSeenAt.Store(&now)
sse := datastar.NewSSE(w, r, datastar.WithCompression(datastar.WithBrotli(datastar.WithBrotliLevel(5)))) sse := datastar.NewSSE(w, r, datastar.WithCompression(datastar.WithBrotli(datastar.WithBrotliLevel(5))))
@@ -688,17 +702,22 @@ func New() *V {
go c.Sync() go c.Sync()
keepalive := time.NewTicker(30 * time.Second)
defer keepalive.Stop()
for { for {
select { select {
case <-sse.Context().Done(): case <-sse.Context().Done():
v.logDebug(c, "SSE connection ended") v.logDebug(c, "SSE connection ended")
c.sseConnected.Store(false) c.sseConnected.Store(false)
now := time.Now() dcNow := time.Now()
c.sseDisconnectedAt.Store(&now) c.sseDisconnectedAt.Store(&dcNow)
return return
case <-c.ctxDisposedChan: case <-c.ctxDisposedChan:
v.logDebug(c, "context disposed, closing SSE") v.logDebug(c, "context disposed, closing SSE")
return return
case <-keepalive.C:
sse.PatchSignals([]byte("{}"))
case patch := <-c.patchChan: case patch := <-c.patchChan:
switch patch.typ { switch patch.typ {
case patchTypeElements: case patchTypeElements:

View File

@@ -418,6 +418,7 @@ func TestReaperCleansOrphanedContexts(t *testing.T) {
func TestReaperSuspendsContext(t *testing.T) { func TestReaperSuspendsContext(t *testing.T) {
v := New() v := New()
c := newContext("suspend-1", "/", v) c := newContext("suspend-1", "/", v)
c.createdAt = time.Now().Add(-30 * time.Minute)
dc := time.Now().Add(-20 * time.Minute) dc := time.Now().Add(-20 * time.Minute)
c.sseDisconnectedAt.Store(&dc) c.sseDisconnectedAt.Store(&dc)
v.registerCtx(c) v.registerCtx(c)
@@ -432,6 +433,7 @@ func TestReaperSuspendsContext(t *testing.T) {
func TestReaperReapsAfterTTL(t *testing.T) { func TestReaperReapsAfterTTL(t *testing.T) {
v := New() v := New()
c := newContext("reap-1", "/", v) c := newContext("reap-1", "/", v)
c.createdAt = time.Now().Add(-3 * time.Hour)
dc := time.Now().Add(-2 * time.Hour) dc := time.Now().Add(-2 * time.Hour)
c.sseDisconnectedAt.Store(&dc) c.sseDisconnectedAt.Store(&dc)
c.suspended.Store(true) c.suspended.Store(true)
@@ -446,6 +448,7 @@ func TestReaperReapsAfterTTL(t *testing.T) {
func TestReaperIgnoresAlreadySuspended(t *testing.T) { func TestReaperIgnoresAlreadySuspended(t *testing.T) {
v := New() v := New()
c := newContext("already-sus-1", "/", v) c := newContext("already-sus-1", "/", v)
c.createdAt = time.Now().Add(-30 * time.Minute)
dc := time.Now().Add(-20 * time.Minute) dc := time.Now().Add(-20 * time.Minute)
c.sseDisconnectedAt.Store(&dc) c.sseDisconnectedAt.Store(&dc)
c.suspended.Store(true) c.suspended.Store(true)
@@ -500,6 +503,74 @@ func TestCleanupCtxIdempotent(t *testing.T) {
assert.Error(t, err, "context should be removed after cleanup") assert.Error(t, err, "context should be removed after cleanup")
} }
func TestReaperRespectsLastSeenAt(t *testing.T) {
v := New()
c := newContext("seen-1", "/", v)
c.createdAt = time.Now().Add(-30 * time.Minute)
// Disconnected 20 min ago, but client retried (lastSeenAt) 2 min ago
dc := time.Now().Add(-20 * time.Minute)
c.sseDisconnectedAt.Store(&dc)
seen := time.Now().Add(-2 * time.Minute)
c.lastSeenAt.Store(&seen)
v.registerCtx(c)
v.reapOrphanedContexts(15*time.Minute, time.Hour)
_, err := v.getCtx("seen-1")
assert.NoError(t, err, "context with recent lastSeenAt should survive suspend threshold")
assert.False(t, c.suspended.Load(), "context should not be suspended")
}
func TestReaperFallsBackWithoutLastSeenAt(t *testing.T) {
v := New()
c := newContext("noseen-1", "/", v)
c.createdAt = time.Now().Add(-30 * time.Minute)
dc := time.Now().Add(-20 * time.Minute)
c.sseDisconnectedAt.Store(&dc)
// no lastSeenAt set — should fall back to sseDisconnectedAt
v.registerCtx(c)
v.reapOrphanedContexts(15*time.Minute, time.Hour)
got, err := v.getCtx("noseen-1")
assert.NoError(t, err, "context should still be in registry (suspended, not reaped)")
assert.True(t, got.suspended.Load(), "context should be suspended using sseDisconnectedAt fallback")
}
func TestReaperReapsWithStaleLastSeenAt(t *testing.T) {
v := New()
c := newContext("stale-seen-1", "/", v)
c.createdAt = time.Now().Add(-3 * time.Hour)
dc := time.Now().Add(-2 * time.Hour)
c.sseDisconnectedAt.Store(&dc)
// lastSeenAt is also old — beyond TTL
seen := time.Now().Add(-90 * time.Minute)
c.lastSeenAt.Store(&seen)
c.suspended.Store(true)
v.registerCtx(c)
v.reapOrphanedContexts(15*time.Minute, time.Hour)
_, err := v.getCtx("stale-seen-1")
assert.Error(t, err, "context with stale lastSeenAt beyond TTL should be reaped")
}
func TestLastSeenAtUpdatedOnSSEConnect(t *testing.T) {
v := New()
c := newContext("seen-sse-1", "/", v)
v.registerCtx(c)
assert.Nil(t, c.lastSeenAt.Load(), "lastSeenAt should be nil before SSE connect")
// Simulate what the SSE handler does after getCtx
now := time.Now()
c.lastSeenAt.Store(&now)
got := c.lastSeenAt.Load()
assert.NotNil(t, got, "lastSeenAt should be set after SSE connect")
assert.WithinDuration(t, now, *got, time.Second)
}
func TestDevModeRemovePersistedFix(t *testing.T) { func TestDevModeRemovePersistedFix(t *testing.T) {
v := New() v := New()
v.cfg.DevMode = true v.cfg.DevMode = true