From 58ad9a2699a435274c84337c3230797e4c8ba714 Mon Sep 17 00:00:00 2001 From: Ryan Hamamura <58859899+ryanhamamura@users.noreply.github.com> Date: Thu, 19 Feb 2026 12:07:25 -1000 Subject: [PATCH] feat: add SSE keepalive and liveness tracking for resilient connections Add 30s keepalive pings to prevent proxy/CDN idle timeouts from killing SSE connections silently. Track lastSeenAt on each SSE connect attempt so Datastar's retry signals keep contexts alive through the reaper. --- context.go | 1 + via.go | 28 ++++++++++++++------- via_test.go | 71 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 91 insertions(+), 9 deletions(-) diff --git a/context.go b/context.go index ac8e167..b143a25 100644 --- a/context.go +++ b/context.go @@ -42,6 +42,7 @@ type Context struct { createdAt time.Time sseConnected atomic.Bool sseDisconnectedAt atomic.Pointer[time.Time] + lastSeenAt atomic.Pointer[time.Time] suspended atomic.Bool } diff --git a/via.go b/via.go index b2e1568..eb62f85 100644 --- a/via.go +++ b/via.go @@ -331,15 +331,18 @@ func (v *V) reapOrphanedContexts(suspendAfter, ttl time.Duration) { if c.sseConnected.Load() { continue } - var disconnectedFor time.Duration - if dc := c.sseDisconnectedAt.Load(); dc != nil { - disconnectedFor = now.Sub(*dc) - } else { - disconnectedFor = now.Sub(c.createdAt) + // Use the most recent liveness signal + lastAlive := c.createdAt + if dc := c.sseDisconnectedAt.Load(); dc != nil && dc.After(lastAlive) { + lastAlive = *dc } - if disconnectedFor > ttl { + if seen := c.lastSeenAt.Load(); seen != nil && seen.After(lastAlive) { + lastAlive = *seen + } + silentFor := now.Sub(lastAlive) + if silentFor > ttl { toReap = append(toReap, c) - } else if disconnectedFor > suspendAfter && !c.suspended.Load() { + } else if silentFor > suspendAfter && !c.suspended.Load() { toSuspend = append(toSuspend, c) } } @@ -655,6 +658,8 @@ func New() *V { return } c.reqCtx = r.Context() + now := time.Now() + c.lastSeenAt.Store(&now) sse := datastar.NewSSE(w, r, datastar.WithCompression(datastar.WithBrotli(datastar.WithBrotliLevel(5)))) @@ -688,17 +693,22 @@ func New() *V { go c.Sync() + keepalive := time.NewTicker(30 * time.Second) + defer keepalive.Stop() + for { select { case <-sse.Context().Done(): v.logDebug(c, "SSE connection ended") c.sseConnected.Store(false) - now := time.Now() - c.sseDisconnectedAt.Store(&now) + dcNow := time.Now() + c.sseDisconnectedAt.Store(&dcNow) return case <-c.ctxDisposedChan: v.logDebug(c, "context disposed, closing SSE") return + case <-keepalive.C: + sse.PatchSignals([]byte("{}")) case patch := <-c.patchChan: switch patch.typ { case patchTypeElements: diff --git a/via_test.go b/via_test.go index aa0c52e..f485344 100644 --- a/via_test.go +++ b/via_test.go @@ -418,6 +418,7 @@ func TestReaperCleansOrphanedContexts(t *testing.T) { func TestReaperSuspendsContext(t *testing.T) { v := New() c := newContext("suspend-1", "/", v) + c.createdAt = time.Now().Add(-30 * time.Minute) dc := time.Now().Add(-20 * time.Minute) c.sseDisconnectedAt.Store(&dc) v.registerCtx(c) @@ -432,6 +433,7 @@ func TestReaperSuspendsContext(t *testing.T) { func TestReaperReapsAfterTTL(t *testing.T) { v := New() c := newContext("reap-1", "/", v) + c.createdAt = time.Now().Add(-3 * time.Hour) dc := time.Now().Add(-2 * time.Hour) c.sseDisconnectedAt.Store(&dc) c.suspended.Store(true) @@ -446,6 +448,7 @@ func TestReaperReapsAfterTTL(t *testing.T) { func TestReaperIgnoresAlreadySuspended(t *testing.T) { v := New() c := newContext("already-sus-1", "/", v) + c.createdAt = time.Now().Add(-30 * time.Minute) dc := time.Now().Add(-20 * time.Minute) c.sseDisconnectedAt.Store(&dc) c.suspended.Store(true) @@ -500,6 +503,74 @@ func TestCleanupCtxIdempotent(t *testing.T) { assert.Error(t, err, "context should be removed after cleanup") } +func TestReaperRespectsLastSeenAt(t *testing.T) { + v := New() + c := newContext("seen-1", "/", v) + c.createdAt = time.Now().Add(-30 * time.Minute) + // Disconnected 20 min ago, but client retried (lastSeenAt) 2 min ago + dc := time.Now().Add(-20 * time.Minute) + c.sseDisconnectedAt.Store(&dc) + seen := time.Now().Add(-2 * time.Minute) + c.lastSeenAt.Store(&seen) + v.registerCtx(c) + + v.reapOrphanedContexts(15*time.Minute, time.Hour) + + _, err := v.getCtx("seen-1") + assert.NoError(t, err, "context with recent lastSeenAt should survive suspend threshold") + assert.False(t, c.suspended.Load(), "context should not be suspended") +} + +func TestReaperFallsBackWithoutLastSeenAt(t *testing.T) { + v := New() + c := newContext("noseen-1", "/", v) + c.createdAt = time.Now().Add(-30 * time.Minute) + dc := time.Now().Add(-20 * time.Minute) + c.sseDisconnectedAt.Store(&dc) + // no lastSeenAt set — should fall back to sseDisconnectedAt + v.registerCtx(c) + + v.reapOrphanedContexts(15*time.Minute, time.Hour) + + got, err := v.getCtx("noseen-1") + assert.NoError(t, err, "context should still be in registry (suspended, not reaped)") + assert.True(t, got.suspended.Load(), "context should be suspended using sseDisconnectedAt fallback") +} + +func TestReaperReapsWithStaleLastSeenAt(t *testing.T) { + v := New() + c := newContext("stale-seen-1", "/", v) + c.createdAt = time.Now().Add(-3 * time.Hour) + dc := time.Now().Add(-2 * time.Hour) + c.sseDisconnectedAt.Store(&dc) + // lastSeenAt is also old — beyond TTL + seen := time.Now().Add(-90 * time.Minute) + c.lastSeenAt.Store(&seen) + c.suspended.Store(true) + v.registerCtx(c) + + v.reapOrphanedContexts(15*time.Minute, time.Hour) + + _, err := v.getCtx("stale-seen-1") + assert.Error(t, err, "context with stale lastSeenAt beyond TTL should be reaped") +} + +func TestLastSeenAtUpdatedOnSSEConnect(t *testing.T) { + v := New() + c := newContext("seen-sse-1", "/", v) + v.registerCtx(c) + + assert.Nil(t, c.lastSeenAt.Load(), "lastSeenAt should be nil before SSE connect") + + // Simulate what the SSE handler does after getCtx + now := time.Now() + c.lastSeenAt.Store(&now) + + got := c.lastSeenAt.Load() + assert.NotNil(t, got, "lastSeenAt should be set after SSE connect") + assert.WithinDuration(t, now, *got, time.Second) +} + func TestDevModeRemovePersistedFix(t *testing.T) { v := New() v.cfg.DevMode = true