diff --git a/configuration.go b/configuration.go index d2ec955..ad9bb61 100644 --- a/configuration.go +++ b/configuration.go @@ -57,9 +57,14 @@ type Options struct { // embedded NATS backend, or supply any PubSub implementation. PubSub PubSub + // ContextSuspendAfter is the time a context may be disconnected before + // the reaper suspends it (frees page resources but keeps the context + // shell for seamless re-init on reconnect). Default: 15m. + ContextSuspendAfter time.Duration + // ContextTTL is the maximum time a context may exist without an SSE - // connection before the background reaper disposes it. - // Default: 30s. Negative value disables the reaper. + // connection before the background reaper fully disposes it. + // Default: 1h. Negative value disables the reaper. ContextTTL time.Duration // ActionRateLimit configures the default token-bucket rate limiter for diff --git a/context.go b/context.go index eb6d5e4..536a53a 100644 --- a/context.go +++ b/context.go @@ -42,6 +42,7 @@ type Context struct { createdAt time.Time sseConnected atomic.Bool sseDisconnectedAt atomic.Pointer[time.Time] + suspended atomic.Bool } // View defines the UI rendered by this context. @@ -400,6 +401,13 @@ func (c *Context) resetPageState() { c.mu.Unlock() } +// suspend frees page-scoped resources while keeping the context shell alive +// in the registry for seamless re-init on reconnect. +func (c *Context) suspend() { + c.resetPageState() + c.suspended.Store(true) +} + // Navigate performs an SPA navigation to the given path. It resets page state, // runs the target page's init function (with middleware), and pushes the new // view over the existing SSE connection with a view transition animation. diff --git a/via.go b/via.go index 03098fe..26b845b 100644 --- a/via.go +++ b/via.go @@ -139,6 +139,9 @@ func (v *V) Config(cfg Options) { v.defaultNATS = nil v.pubsub = cfg.PubSub } + if cfg.ContextSuspendAfter != 0 { + v.cfg.ContextSuspendAfter = cfg.ContextSuspendAfter + } if cfg.ContextTTL != 0 { v.cfg.ContextTTL = cfg.ContextTTL } @@ -292,9 +295,16 @@ func (v *V) startReaper() { return } if ttl == 0 { - ttl = 30 * time.Second + ttl = time.Hour } - interval := ttl / 3 + suspendAfter := v.cfg.ContextSuspendAfter + if suspendAfter == 0 { + suspendAfter = 15 * time.Minute + } + if suspendAfter > ttl { + suspendAfter = ttl + } + interval := suspendAfter / 3 if interval < 5*time.Second { interval = 5 * time.Second } @@ -307,35 +317,39 @@ func (v *V) startReaper() { case <-v.reaperStop: return case <-ticker.C: - v.reapOrphanedContexts(ttl) + v.reapOrphanedContexts(suspendAfter, ttl) } } }() } -func (v *V) reapOrphanedContexts(ttl time.Duration) { +func (v *V) reapOrphanedContexts(suspendAfter, ttl time.Duration) { now := time.Now() v.contextRegistryMutex.RLock() - var orphans []*Context + var toSuspend, toReap []*Context for _, c := range v.contextRegistry { if c.sseConnected.Load() { continue } + var disconnectedFor time.Duration if dc := c.sseDisconnectedAt.Load(); dc != nil { - // SSE was connected then dropped — reap if gone too long - if now.Sub(*dc) > ttl { - orphans = append(orphans, c) - } + disconnectedFor = now.Sub(*dc) } else { - // SSE never connected — reap based on creation time - if now.Sub(c.createdAt) > ttl { - orphans = append(orphans, c) - } + disconnectedFor = now.Sub(c.createdAt) + } + if disconnectedFor > ttl { + toReap = append(toReap, c) + } else if disconnectedFor > suspendAfter && !c.suspended.Load() { + toSuspend = append(toSuspend, c) } } v.contextRegistryMutex.RUnlock() - for _, c := range orphans { + for _, c := range toSuspend { + v.logInfo(c, "suspending context (no SSE connection after %s)", suspendAfter) + c.suspend() + } + for _, c := range toReap { v.logInfo(c, "reaping orphaned context (no SSE connection after %s)", ttl) v.cleanupCtx(c) } @@ -625,7 +639,9 @@ func New() *V { } c, err := v.getCtx(cID) if err != nil { - v.logErr(nil, "sse stream failed to start: %v", err) + v.logInfo(nil, "context expired, reloading client: %s", cID) + sse := datastar.NewSSE(w, r) + sse.ExecuteScript("window.location.reload()") return } c.reqCtx = r.Context() @@ -650,6 +666,16 @@ func New() *V { c.sseDisconnectedAt.Store(nil) v.logDebug(c, "SSE connection established") + if c.suspended.Load() { + c.navMu.Lock() + c.suspended.Store(false) + if initFn := v.pageRegistry[c.route]; initFn != nil { + v.logInfo(c, "resuming suspended context") + initFn(c) + } + c.navMu.Unlock() + } + go c.Sync() for { diff --git a/via_test.go b/via_test.go index 190eb14..8f22771 100644 --- a/via_test.go +++ b/via_test.go @@ -303,12 +303,63 @@ func TestReaperCleansOrphanedContexts(t *testing.T) { _, err := v.getCtx("orphan-1") assert.NoError(t, err) - v.reapOrphanedContexts(10 * time.Second) + v.reapOrphanedContexts(5*time.Second, 10*time.Second) _, err = v.getCtx("orphan-1") assert.Error(t, err, "orphaned context should have been reaped") } +func TestReaperSuspendsContext(t *testing.T) { + v := New() + c := newContext("suspend-1", "/", v) + dc := time.Now().Add(-20 * time.Minute) + c.sseDisconnectedAt.Store(&dc) + v.registerCtx(c) + + v.reapOrphanedContexts(15*time.Minute, time.Hour) + + got, err := v.getCtx("suspend-1") + assert.NoError(t, err, "suspended context should still be in registry") + assert.True(t, got.suspended.Load(), "context should be marked suspended") +} + +func TestReaperReapsAfterTTL(t *testing.T) { + v := New() + c := newContext("reap-1", "/", v) + dc := time.Now().Add(-2 * time.Hour) + c.sseDisconnectedAt.Store(&dc) + c.suspended.Store(true) + v.registerCtx(c) + + v.reapOrphanedContexts(15*time.Minute, time.Hour) + + _, err := v.getCtx("reap-1") + assert.Error(t, err, "context past TTL should have been reaped") +} + +func TestReaperIgnoresAlreadySuspended(t *testing.T) { + v := New() + c := newContext("already-sus-1", "/", v) + dc := time.Now().Add(-20 * time.Minute) + c.sseDisconnectedAt.Store(&dc) + c.suspended.Store(true) + // give it a fresh pageStopChan so we can verify it's not re-closed + c.pageStopChan = make(chan struct{}) + v.registerCtx(c) + + v.reapOrphanedContexts(15*time.Minute, time.Hour) + + got, err := v.getCtx("already-sus-1") + assert.NoError(t, err, "already-suspended context within TTL should survive") + assert.True(t, got.suspended.Load()) + // pageStopChan should still be open (not re-suspended) + select { + case <-got.pageStopChan: + t.Fatal("pageStopChan was closed — context was re-suspended") + default: + } +} + func TestReaperIgnoresConnectedContexts(t *testing.T) { v := New() c := newContext("connected-1", "/", v) @@ -316,7 +367,7 @@ func TestReaperIgnoresConnectedContexts(t *testing.T) { c.sseConnected.Store(true) v.registerCtx(c) - v.reapOrphanedContexts(10 * time.Second) + v.reapOrphanedContexts(5*time.Second, 10*time.Second) _, err := v.getCtx("connected-1") assert.NoError(t, err, "connected context should survive reaping")