feat: three-tier context lifecycle (grace → suspended → reaped)
All checks were successful
CI / Build and Test (push) Successful in 1m22s
All checks were successful
CI / Build and Test (push) Successful in 1m22s
Contexts that lose their SSE connection now pass through a suspended state before being fully reaped. Suspended contexts keep their shell (ID, route, CSRF token) but free page resources. On reconnect, the page init function is re-run for a seamless resume. Contexts past the TTL trigger a client-side reload instead of a silent dead page. Configurable via ContextSuspendAfter (default 15m) and ContextTTL (default 1h).
This commit is contained in:
@@ -57,9 +57,14 @@ type Options struct {
|
|||||||
// embedded NATS backend, or supply any PubSub implementation.
|
// embedded NATS backend, or supply any PubSub implementation.
|
||||||
PubSub PubSub
|
PubSub PubSub
|
||||||
|
|
||||||
|
// ContextSuspendAfter is the time a context may be disconnected before
|
||||||
|
// the reaper suspends it (frees page resources but keeps the context
|
||||||
|
// shell for seamless re-init on reconnect). Default: 15m.
|
||||||
|
ContextSuspendAfter time.Duration
|
||||||
|
|
||||||
// ContextTTL is the maximum time a context may exist without an SSE
|
// ContextTTL is the maximum time a context may exist without an SSE
|
||||||
// connection before the background reaper disposes it.
|
// connection before the background reaper fully disposes it.
|
||||||
// Default: 30s. Negative value disables the reaper.
|
// Default: 1h. Negative value disables the reaper.
|
||||||
ContextTTL time.Duration
|
ContextTTL time.Duration
|
||||||
|
|
||||||
// ActionRateLimit configures the default token-bucket rate limiter for
|
// ActionRateLimit configures the default token-bucket rate limiter for
|
||||||
|
|||||||
@@ -42,6 +42,7 @@ type Context struct {
|
|||||||
createdAt time.Time
|
createdAt time.Time
|
||||||
sseConnected atomic.Bool
|
sseConnected atomic.Bool
|
||||||
sseDisconnectedAt atomic.Pointer[time.Time]
|
sseDisconnectedAt atomic.Pointer[time.Time]
|
||||||
|
suspended atomic.Bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// View defines the UI rendered by this context.
|
// View defines the UI rendered by this context.
|
||||||
@@ -400,6 +401,13 @@ func (c *Context) resetPageState() {
|
|||||||
c.mu.Unlock()
|
c.mu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// suspend frees page-scoped resources while keeping the context shell alive
|
||||||
|
// in the registry for seamless re-init on reconnect.
|
||||||
|
func (c *Context) suspend() {
|
||||||
|
c.resetPageState()
|
||||||
|
c.suspended.Store(true)
|
||||||
|
}
|
||||||
|
|
||||||
// Navigate performs an SPA navigation to the given path. It resets page state,
|
// Navigate performs an SPA navigation to the given path. It resets page state,
|
||||||
// runs the target page's init function (with middleware), and pushes the new
|
// runs the target page's init function (with middleware), and pushes the new
|
||||||
// view over the existing SSE connection with a view transition animation.
|
// view over the existing SSE connection with a view transition animation.
|
||||||
|
|||||||
54
via.go
54
via.go
@@ -139,6 +139,9 @@ func (v *V) Config(cfg Options) {
|
|||||||
v.defaultNATS = nil
|
v.defaultNATS = nil
|
||||||
v.pubsub = cfg.PubSub
|
v.pubsub = cfg.PubSub
|
||||||
}
|
}
|
||||||
|
if cfg.ContextSuspendAfter != 0 {
|
||||||
|
v.cfg.ContextSuspendAfter = cfg.ContextSuspendAfter
|
||||||
|
}
|
||||||
if cfg.ContextTTL != 0 {
|
if cfg.ContextTTL != 0 {
|
||||||
v.cfg.ContextTTL = cfg.ContextTTL
|
v.cfg.ContextTTL = cfg.ContextTTL
|
||||||
}
|
}
|
||||||
@@ -292,9 +295,16 @@ func (v *V) startReaper() {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
if ttl == 0 {
|
if ttl == 0 {
|
||||||
ttl = 30 * time.Second
|
ttl = time.Hour
|
||||||
}
|
}
|
||||||
interval := ttl / 3
|
suspendAfter := v.cfg.ContextSuspendAfter
|
||||||
|
if suspendAfter == 0 {
|
||||||
|
suspendAfter = 15 * time.Minute
|
||||||
|
}
|
||||||
|
if suspendAfter > ttl {
|
||||||
|
suspendAfter = ttl
|
||||||
|
}
|
||||||
|
interval := suspendAfter / 3
|
||||||
if interval < 5*time.Second {
|
if interval < 5*time.Second {
|
||||||
interval = 5 * time.Second
|
interval = 5 * time.Second
|
||||||
}
|
}
|
||||||
@@ -307,35 +317,39 @@ func (v *V) startReaper() {
|
|||||||
case <-v.reaperStop:
|
case <-v.reaperStop:
|
||||||
return
|
return
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
v.reapOrphanedContexts(ttl)
|
v.reapOrphanedContexts(suspendAfter, ttl)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v *V) reapOrphanedContexts(ttl time.Duration) {
|
func (v *V) reapOrphanedContexts(suspendAfter, ttl time.Duration) {
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
v.contextRegistryMutex.RLock()
|
v.contextRegistryMutex.RLock()
|
||||||
var orphans []*Context
|
var toSuspend, toReap []*Context
|
||||||
for _, c := range v.contextRegistry {
|
for _, c := range v.contextRegistry {
|
||||||
if c.sseConnected.Load() {
|
if c.sseConnected.Load() {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
var disconnectedFor time.Duration
|
||||||
if dc := c.sseDisconnectedAt.Load(); dc != nil {
|
if dc := c.sseDisconnectedAt.Load(); dc != nil {
|
||||||
// SSE was connected then dropped — reap if gone too long
|
disconnectedFor = now.Sub(*dc)
|
||||||
if now.Sub(*dc) > ttl {
|
|
||||||
orphans = append(orphans, c)
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
// SSE never connected — reap based on creation time
|
disconnectedFor = now.Sub(c.createdAt)
|
||||||
if now.Sub(c.createdAt) > ttl {
|
|
||||||
orphans = append(orphans, c)
|
|
||||||
}
|
}
|
||||||
|
if disconnectedFor > ttl {
|
||||||
|
toReap = append(toReap, c)
|
||||||
|
} else if disconnectedFor > suspendAfter && !c.suspended.Load() {
|
||||||
|
toSuspend = append(toSuspend, c)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
v.contextRegistryMutex.RUnlock()
|
v.contextRegistryMutex.RUnlock()
|
||||||
|
|
||||||
for _, c := range orphans {
|
for _, c := range toSuspend {
|
||||||
|
v.logInfo(c, "suspending context (no SSE connection after %s)", suspendAfter)
|
||||||
|
c.suspend()
|
||||||
|
}
|
||||||
|
for _, c := range toReap {
|
||||||
v.logInfo(c, "reaping orphaned context (no SSE connection after %s)", ttl)
|
v.logInfo(c, "reaping orphaned context (no SSE connection after %s)", ttl)
|
||||||
v.cleanupCtx(c)
|
v.cleanupCtx(c)
|
||||||
}
|
}
|
||||||
@@ -625,7 +639,9 @@ func New() *V {
|
|||||||
}
|
}
|
||||||
c, err := v.getCtx(cID)
|
c, err := v.getCtx(cID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
v.logErr(nil, "sse stream failed to start: %v", err)
|
v.logInfo(nil, "context expired, reloading client: %s", cID)
|
||||||
|
sse := datastar.NewSSE(w, r)
|
||||||
|
sse.ExecuteScript("window.location.reload()")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
c.reqCtx = r.Context()
|
c.reqCtx = r.Context()
|
||||||
@@ -650,6 +666,16 @@ func New() *V {
|
|||||||
c.sseDisconnectedAt.Store(nil)
|
c.sseDisconnectedAt.Store(nil)
|
||||||
v.logDebug(c, "SSE connection established")
|
v.logDebug(c, "SSE connection established")
|
||||||
|
|
||||||
|
if c.suspended.Load() {
|
||||||
|
c.navMu.Lock()
|
||||||
|
c.suspended.Store(false)
|
||||||
|
if initFn := v.pageRegistry[c.route]; initFn != nil {
|
||||||
|
v.logInfo(c, "resuming suspended context")
|
||||||
|
initFn(c)
|
||||||
|
}
|
||||||
|
c.navMu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
go c.Sync()
|
go c.Sync()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
|
|||||||
55
via_test.go
55
via_test.go
@@ -303,12 +303,63 @@ func TestReaperCleansOrphanedContexts(t *testing.T) {
|
|||||||
_, err := v.getCtx("orphan-1")
|
_, err := v.getCtx("orphan-1")
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
|
||||||
v.reapOrphanedContexts(10 * time.Second)
|
v.reapOrphanedContexts(5*time.Second, 10*time.Second)
|
||||||
|
|
||||||
_, err = v.getCtx("orphan-1")
|
_, err = v.getCtx("orphan-1")
|
||||||
assert.Error(t, err, "orphaned context should have been reaped")
|
assert.Error(t, err, "orphaned context should have been reaped")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestReaperSuspendsContext(t *testing.T) {
|
||||||
|
v := New()
|
||||||
|
c := newContext("suspend-1", "/", v)
|
||||||
|
dc := time.Now().Add(-20 * time.Minute)
|
||||||
|
c.sseDisconnectedAt.Store(&dc)
|
||||||
|
v.registerCtx(c)
|
||||||
|
|
||||||
|
v.reapOrphanedContexts(15*time.Minute, time.Hour)
|
||||||
|
|
||||||
|
got, err := v.getCtx("suspend-1")
|
||||||
|
assert.NoError(t, err, "suspended context should still be in registry")
|
||||||
|
assert.True(t, got.suspended.Load(), "context should be marked suspended")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestReaperReapsAfterTTL(t *testing.T) {
|
||||||
|
v := New()
|
||||||
|
c := newContext("reap-1", "/", v)
|
||||||
|
dc := time.Now().Add(-2 * time.Hour)
|
||||||
|
c.sseDisconnectedAt.Store(&dc)
|
||||||
|
c.suspended.Store(true)
|
||||||
|
v.registerCtx(c)
|
||||||
|
|
||||||
|
v.reapOrphanedContexts(15*time.Minute, time.Hour)
|
||||||
|
|
||||||
|
_, err := v.getCtx("reap-1")
|
||||||
|
assert.Error(t, err, "context past TTL should have been reaped")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestReaperIgnoresAlreadySuspended(t *testing.T) {
|
||||||
|
v := New()
|
||||||
|
c := newContext("already-sus-1", "/", v)
|
||||||
|
dc := time.Now().Add(-20 * time.Minute)
|
||||||
|
c.sseDisconnectedAt.Store(&dc)
|
||||||
|
c.suspended.Store(true)
|
||||||
|
// give it a fresh pageStopChan so we can verify it's not re-closed
|
||||||
|
c.pageStopChan = make(chan struct{})
|
||||||
|
v.registerCtx(c)
|
||||||
|
|
||||||
|
v.reapOrphanedContexts(15*time.Minute, time.Hour)
|
||||||
|
|
||||||
|
got, err := v.getCtx("already-sus-1")
|
||||||
|
assert.NoError(t, err, "already-suspended context within TTL should survive")
|
||||||
|
assert.True(t, got.suspended.Load())
|
||||||
|
// pageStopChan should still be open (not re-suspended)
|
||||||
|
select {
|
||||||
|
case <-got.pageStopChan:
|
||||||
|
t.Fatal("pageStopChan was closed — context was re-suspended")
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestReaperIgnoresConnectedContexts(t *testing.T) {
|
func TestReaperIgnoresConnectedContexts(t *testing.T) {
|
||||||
v := New()
|
v := New()
|
||||||
c := newContext("connected-1", "/", v)
|
c := newContext("connected-1", "/", v)
|
||||||
@@ -316,7 +367,7 @@ func TestReaperIgnoresConnectedContexts(t *testing.T) {
|
|||||||
c.sseConnected.Store(true)
|
c.sseConnected.Store(true)
|
||||||
v.registerCtx(c)
|
v.registerCtx(c)
|
||||||
|
|
||||||
v.reapOrphanedContexts(10 * time.Second)
|
v.reapOrphanedContexts(5*time.Second, 10*time.Second)
|
||||||
|
|
||||||
_, err := v.getCtx("connected-1")
|
_, err := v.getCtx("connected-1")
|
||||||
assert.NoError(t, err, "connected context should survive reaping")
|
assert.NoError(t, err, "connected context should survive reaping")
|
||||||
|
|||||||
Reference in New Issue
Block a user