Compare commits
1 Commits
v0.21.3
...
b3c2d3ae32
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b3c2d3ae32 |
@@ -1,8 +1,6 @@
|
||||
package via
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/alexedwards/scs/v2"
|
||||
"github.com/rs/zerolog"
|
||||
)
|
||||
@@ -62,16 +60,6 @@ type Options struct {
|
||||
// the embedded NATS server. Ignored when a custom PubSub is configured.
|
||||
Streams []StreamConfig
|
||||
|
||||
// ContextSuspendAfter is the time a context may be disconnected before
|
||||
// the reaper suspends it (frees page resources but keeps the context
|
||||
// shell for seamless re-init on reconnect). Default: 15m.
|
||||
ContextSuspendAfter time.Duration
|
||||
|
||||
// ContextTTL is the maximum time a context may exist without an SSE
|
||||
// connection before the background reaper fully disposes it.
|
||||
// Default: 1h. Negative value disables the reaper.
|
||||
ContextTTL time.Duration
|
||||
|
||||
// ActionRateLimit configures the default token-bucket rate limiter for
|
||||
// action endpoints. Zero values use built-in defaults (10 req/s, burst 20).
|
||||
// Set Rate to -1 to disable rate limiting entirely.
|
||||
|
||||
@@ -42,8 +42,6 @@ type Context struct {
|
||||
createdAt time.Time
|
||||
sseConnected atomic.Bool
|
||||
sseDisconnectedAt atomic.Pointer[time.Time]
|
||||
lastSeenAt atomic.Pointer[time.Time]
|
||||
suspended atomic.Bool
|
||||
}
|
||||
|
||||
// View defines the UI rendered by this context.
|
||||
@@ -444,13 +442,6 @@ func (c *Context) resetPageState() {
|
||||
c.mu.Unlock()
|
||||
}
|
||||
|
||||
// suspend frees page-scoped resources while keeping the context shell alive
|
||||
// in the registry for seamless re-init on reconnect.
|
||||
func (c *Context) suspend() {
|
||||
c.resetPageState()
|
||||
c.suspended.Store(true)
|
||||
}
|
||||
|
||||
// Navigate performs an SPA navigation to the given path. It resets page state,
|
||||
// runs the target page's init function (with middleware), and pushes the new
|
||||
// view over the existing SSE connection with a view transition animation.
|
||||
|
||||
@@ -66,7 +66,6 @@ v.Config(via.Options{
|
||||
Plugins: []via.Plugin{MyPlugin},
|
||||
SessionManager: sm,
|
||||
PubSub: customBackend,
|
||||
ContextTTL: 60 * time.Second,
|
||||
ActionRateLimit: via.RateLimitConfig{Rate: 20, Burst: 40},
|
||||
})
|
||||
```
|
||||
@@ -83,7 +82,6 @@ v.Config(via.Options{
|
||||
| `DatastarContent` | (embedded) | Custom Datastar JS bytes |
|
||||
| `DatastarPath` | `"/_datastar.js"` | URL path for the Datastar script |
|
||||
| `PubSub` | embedded NATS | Custom PubSub backend. Replaces the default NATS. See [PubSub and Sessions](pubsub-and-sessions.md) |
|
||||
| `ContextTTL` | `30s` | Max time a context survives without an SSE connection before cleanup. Negative value disables the reaper |
|
||||
| `ActionRateLimit` | `10 req/s, burst 20` | Default token-bucket rate limiter for action endpoints. Rate of `-1` disables limiting |
|
||||
|
||||
## Static Files
|
||||
|
||||
@@ -14,7 +14,7 @@ Browser hits page → new Context created → init function runs → HTML render
|
||||
action fires → signals injected from browser → handler runs → Sync() → DOM patched
|
||||
```
|
||||
|
||||
The context is disposed when the SSE connection closes (tab close, navigation away, network loss). A background reaper also cleans up contexts that never establish an SSE connection within `ContextTTL` (default 30s).
|
||||
The context lives until the browser tab closes (detected via a `beforeunload` beacon) or the server shuts down. There is no background reaper — contexts persist across temporary SSE disconnections so backgrounded tabs resume seamlessly.
|
||||
|
||||
During [SPA navigation](routing-and-navigation.md#spa-navigation), the context itself survives — only page-level state (signals, actions, fields, intervals, subscriptions) is reset. The SSE connection persists.
|
||||
|
||||
|
||||
93
via.go
93
via.go
@@ -58,7 +58,6 @@ type V struct {
|
||||
datastarPath string
|
||||
datastarContent []byte
|
||||
datastarOnce sync.Once
|
||||
reaperStop chan struct{}
|
||||
middleware []Middleware
|
||||
layout func(func() h.H) h.H
|
||||
}
|
||||
@@ -139,12 +138,6 @@ func (v *V) Config(cfg Options) {
|
||||
v.defaultNATS = nil
|
||||
v.pubsub = cfg.PubSub
|
||||
}
|
||||
if cfg.ContextSuspendAfter != 0 {
|
||||
v.cfg.ContextSuspendAfter = cfg.ContextSuspendAfter
|
||||
}
|
||||
if cfg.ContextTTL != 0 {
|
||||
v.cfg.ContextTTL = cfg.ContextTTL
|
||||
}
|
||||
if cfg.Streams != nil {
|
||||
v.cfg.Streams = cfg.Streams
|
||||
}
|
||||
@@ -292,75 +285,6 @@ func (v *V) getCtx(id string) (*Context, error) {
|
||||
return nil, fmt.Errorf("ctx '%s' not found", id)
|
||||
}
|
||||
|
||||
func (v *V) startReaper() {
|
||||
ttl := v.cfg.ContextTTL
|
||||
if ttl < 0 {
|
||||
return
|
||||
}
|
||||
if ttl == 0 {
|
||||
ttl = time.Hour
|
||||
}
|
||||
suspendAfter := v.cfg.ContextSuspendAfter
|
||||
if suspendAfter == 0 {
|
||||
suspendAfter = 15 * time.Minute
|
||||
}
|
||||
if suspendAfter > ttl {
|
||||
suspendAfter = ttl
|
||||
}
|
||||
interval := suspendAfter / 3
|
||||
if interval < 5*time.Second {
|
||||
interval = 5 * time.Second
|
||||
}
|
||||
v.reaperStop = make(chan struct{})
|
||||
go func() {
|
||||
ticker := time.NewTicker(interval)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-v.reaperStop:
|
||||
return
|
||||
case <-ticker.C:
|
||||
v.reapOrphanedContexts(suspendAfter, ttl)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (v *V) reapOrphanedContexts(suspendAfter, ttl time.Duration) {
|
||||
now := time.Now()
|
||||
v.contextRegistryMutex.RLock()
|
||||
var toSuspend, toReap []*Context
|
||||
for _, c := range v.contextRegistry {
|
||||
if c.sseConnected.Load() {
|
||||
continue
|
||||
}
|
||||
// Use the most recent liveness signal
|
||||
lastAlive := c.createdAt
|
||||
if dc := c.sseDisconnectedAt.Load(); dc != nil && dc.After(lastAlive) {
|
||||
lastAlive = *dc
|
||||
}
|
||||
if seen := c.lastSeenAt.Load(); seen != nil && seen.After(lastAlive) {
|
||||
lastAlive = *seen
|
||||
}
|
||||
silentFor := now.Sub(lastAlive)
|
||||
if silentFor > ttl {
|
||||
toReap = append(toReap, c)
|
||||
} else if silentFor > suspendAfter && !c.suspended.Load() {
|
||||
toSuspend = append(toSuspend, c)
|
||||
}
|
||||
}
|
||||
v.contextRegistryMutex.RUnlock()
|
||||
|
||||
for _, c := range toSuspend {
|
||||
v.logInfo(c, "suspending context (no SSE connection after %s)", suspendAfter)
|
||||
c.suspend()
|
||||
}
|
||||
for _, c := range toReap {
|
||||
v.logInfo(c, "reaping orphaned context (no SSE connection after %s)", ttl)
|
||||
v.cleanupCtx(c)
|
||||
}
|
||||
}
|
||||
|
||||
// Start starts the Via HTTP server and blocks until a SIGINT or SIGTERM
|
||||
// signal is received, then performs a graceful shutdown.
|
||||
func (v *V) Start() {
|
||||
@@ -389,8 +313,6 @@ func (v *V) Start() {
|
||||
Handler: handler,
|
||||
}
|
||||
|
||||
v.startReaper()
|
||||
|
||||
errCh := make(chan error, 1)
|
||||
go func() {
|
||||
errCh <- v.server.ListenAndServe()
|
||||
@@ -417,9 +339,6 @@ func (v *V) Start() {
|
||||
// Shutdown gracefully shuts down the server and all contexts.
|
||||
// Safe for programmatic or test use.
|
||||
func (v *V) Shutdown() {
|
||||
if v.reaperStop != nil {
|
||||
close(v.reaperStop)
|
||||
}
|
||||
v.logInfo(nil, "draining all contexts")
|
||||
v.drainAllContexts()
|
||||
|
||||
@@ -667,8 +586,6 @@ func New() *V {
|
||||
return
|
||||
}
|
||||
c.reqCtx = r.Context()
|
||||
now := time.Now()
|
||||
c.lastSeenAt.Store(&now)
|
||||
|
||||
sse := datastar.NewSSE(w, r, datastar.WithCompression(datastar.WithBrotli(datastar.WithBrotliLevel(5))))
|
||||
|
||||
@@ -690,16 +607,6 @@ func New() *V {
|
||||
c.sseDisconnectedAt.Store(nil)
|
||||
v.logDebug(c, "SSE connection established")
|
||||
|
||||
if c.suspended.Load() {
|
||||
c.navMu.Lock()
|
||||
c.suspended.Store(false)
|
||||
if initFn := v.pageRegistry[c.route]; initFn != nil {
|
||||
v.logInfo(c, "resuming suspended context")
|
||||
initFn(c)
|
||||
}
|
||||
c.navMu.Unlock()
|
||||
}
|
||||
|
||||
go c.Sync()
|
||||
|
||||
keepalive := time.NewTicker(30 * time.Second)
|
||||
|
||||
157
via_test.go
157
via_test.go
@@ -400,95 +400,6 @@ func TestPage_PanicsOnNoView(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestReaperCleansOrphanedContexts(t *testing.T) {
|
||||
v := New()
|
||||
c := newContext("orphan-1", "/", v)
|
||||
c.createdAt = time.Now().Add(-time.Minute) // created 1 min ago
|
||||
v.registerCtx(c)
|
||||
|
||||
_, err := v.getCtx("orphan-1")
|
||||
assert.NoError(t, err)
|
||||
|
||||
v.reapOrphanedContexts(5*time.Second, 10*time.Second)
|
||||
|
||||
_, err = v.getCtx("orphan-1")
|
||||
assert.Error(t, err, "orphaned context should have been reaped")
|
||||
}
|
||||
|
||||
func TestReaperSuspendsContext(t *testing.T) {
|
||||
v := New()
|
||||
c := newContext("suspend-1", "/", v)
|
||||
c.createdAt = time.Now().Add(-30 * time.Minute)
|
||||
dc := time.Now().Add(-20 * time.Minute)
|
||||
c.sseDisconnectedAt.Store(&dc)
|
||||
v.registerCtx(c)
|
||||
|
||||
v.reapOrphanedContexts(15*time.Minute, time.Hour)
|
||||
|
||||
got, err := v.getCtx("suspend-1")
|
||||
assert.NoError(t, err, "suspended context should still be in registry")
|
||||
assert.True(t, got.suspended.Load(), "context should be marked suspended")
|
||||
}
|
||||
|
||||
func TestReaperReapsAfterTTL(t *testing.T) {
|
||||
v := New()
|
||||
c := newContext("reap-1", "/", v)
|
||||
c.createdAt = time.Now().Add(-3 * time.Hour)
|
||||
dc := time.Now().Add(-2 * time.Hour)
|
||||
c.sseDisconnectedAt.Store(&dc)
|
||||
c.suspended.Store(true)
|
||||
v.registerCtx(c)
|
||||
|
||||
v.reapOrphanedContexts(15*time.Minute, time.Hour)
|
||||
|
||||
_, err := v.getCtx("reap-1")
|
||||
assert.Error(t, err, "context past TTL should have been reaped")
|
||||
}
|
||||
|
||||
func TestReaperIgnoresAlreadySuspended(t *testing.T) {
|
||||
v := New()
|
||||
c := newContext("already-sus-1", "/", v)
|
||||
c.createdAt = time.Now().Add(-30 * time.Minute)
|
||||
dc := time.Now().Add(-20 * time.Minute)
|
||||
c.sseDisconnectedAt.Store(&dc)
|
||||
c.suspended.Store(true)
|
||||
// give it a fresh pageStopChan so we can verify it's not re-closed
|
||||
c.pageStopChan = make(chan struct{})
|
||||
v.registerCtx(c)
|
||||
|
||||
v.reapOrphanedContexts(15*time.Minute, time.Hour)
|
||||
|
||||
got, err := v.getCtx("already-sus-1")
|
||||
assert.NoError(t, err, "already-suspended context within TTL should survive")
|
||||
assert.True(t, got.suspended.Load())
|
||||
// pageStopChan should still be open (not re-suspended)
|
||||
select {
|
||||
case <-got.pageStopChan:
|
||||
t.Fatal("pageStopChan was closed — context was re-suspended")
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
func TestReaperIgnoresConnectedContexts(t *testing.T) {
|
||||
v := New()
|
||||
c := newContext("connected-1", "/", v)
|
||||
c.createdAt = time.Now().Add(-time.Minute)
|
||||
c.sseConnected.Store(true)
|
||||
v.registerCtx(c)
|
||||
|
||||
v.reapOrphanedContexts(5*time.Second, 10*time.Second)
|
||||
|
||||
_, err := v.getCtx("connected-1")
|
||||
assert.NoError(t, err, "connected context should survive reaping")
|
||||
}
|
||||
|
||||
func TestReaperDisabledWithNegativeTTL(t *testing.T) {
|
||||
v := New()
|
||||
v.cfg.ContextTTL = -1
|
||||
v.startReaper()
|
||||
assert.Nil(t, v.reaperStop, "reaper should not start with negative TTL")
|
||||
}
|
||||
|
||||
func TestCleanupCtxIdempotent(t *testing.T) {
|
||||
v := New()
|
||||
c := newContext("idempotent-1", "/", v)
|
||||
@@ -503,74 +414,6 @@ func TestCleanupCtxIdempotent(t *testing.T) {
|
||||
assert.Error(t, err, "context should be removed after cleanup")
|
||||
}
|
||||
|
||||
func TestReaperRespectsLastSeenAt(t *testing.T) {
|
||||
v := New()
|
||||
c := newContext("seen-1", "/", v)
|
||||
c.createdAt = time.Now().Add(-30 * time.Minute)
|
||||
// Disconnected 20 min ago, but client retried (lastSeenAt) 2 min ago
|
||||
dc := time.Now().Add(-20 * time.Minute)
|
||||
c.sseDisconnectedAt.Store(&dc)
|
||||
seen := time.Now().Add(-2 * time.Minute)
|
||||
c.lastSeenAt.Store(&seen)
|
||||
v.registerCtx(c)
|
||||
|
||||
v.reapOrphanedContexts(15*time.Minute, time.Hour)
|
||||
|
||||
_, err := v.getCtx("seen-1")
|
||||
assert.NoError(t, err, "context with recent lastSeenAt should survive suspend threshold")
|
||||
assert.False(t, c.suspended.Load(), "context should not be suspended")
|
||||
}
|
||||
|
||||
func TestReaperFallsBackWithoutLastSeenAt(t *testing.T) {
|
||||
v := New()
|
||||
c := newContext("noseen-1", "/", v)
|
||||
c.createdAt = time.Now().Add(-30 * time.Minute)
|
||||
dc := time.Now().Add(-20 * time.Minute)
|
||||
c.sseDisconnectedAt.Store(&dc)
|
||||
// no lastSeenAt set — should fall back to sseDisconnectedAt
|
||||
v.registerCtx(c)
|
||||
|
||||
v.reapOrphanedContexts(15*time.Minute, time.Hour)
|
||||
|
||||
got, err := v.getCtx("noseen-1")
|
||||
assert.NoError(t, err, "context should still be in registry (suspended, not reaped)")
|
||||
assert.True(t, got.suspended.Load(), "context should be suspended using sseDisconnectedAt fallback")
|
||||
}
|
||||
|
||||
func TestReaperReapsWithStaleLastSeenAt(t *testing.T) {
|
||||
v := New()
|
||||
c := newContext("stale-seen-1", "/", v)
|
||||
c.createdAt = time.Now().Add(-3 * time.Hour)
|
||||
dc := time.Now().Add(-2 * time.Hour)
|
||||
c.sseDisconnectedAt.Store(&dc)
|
||||
// lastSeenAt is also old — beyond TTL
|
||||
seen := time.Now().Add(-90 * time.Minute)
|
||||
c.lastSeenAt.Store(&seen)
|
||||
c.suspended.Store(true)
|
||||
v.registerCtx(c)
|
||||
|
||||
v.reapOrphanedContexts(15*time.Minute, time.Hour)
|
||||
|
||||
_, err := v.getCtx("stale-seen-1")
|
||||
assert.Error(t, err, "context with stale lastSeenAt beyond TTL should be reaped")
|
||||
}
|
||||
|
||||
func TestLastSeenAtUpdatedOnSSEConnect(t *testing.T) {
|
||||
v := New()
|
||||
c := newContext("seen-sse-1", "/", v)
|
||||
v.registerCtx(c)
|
||||
|
||||
assert.Nil(t, c.lastSeenAt.Load(), "lastSeenAt should be nil before SSE connect")
|
||||
|
||||
// Simulate what the SSE handler does after getCtx
|
||||
now := time.Now()
|
||||
c.lastSeenAt.Store(&now)
|
||||
|
||||
got := c.lastSeenAt.Load()
|
||||
assert.NotNil(t, got, "lastSeenAt should be set after SSE connect")
|
||||
assert.WithinDuration(t, now, *got, time.Second)
|
||||
}
|
||||
|
||||
func TestDevModeRemovePersistedFix(t *testing.T) {
|
||||
v := New()
|
||||
v.cfg.DevMode = true
|
||||
|
||||
Reference in New Issue
Block a user