1 Commits

Author SHA1 Message Date
Ryan Hamamura
58ad9a2699 feat: add SSE keepalive and liveness tracking for resilient connections
Some checks failed
CI / Build and Test (push) Has been cancelled
Add 30s keepalive pings to prevent proxy/CDN idle timeouts from killing
SSE connections silently. Track lastSeenAt on each SSE connect attempt
so Datastar's retry signals keep contexts alive through the reaper.
2026-02-19 12:07:25 -10:00
3 changed files with 91 additions and 9 deletions

View File

@@ -42,6 +42,7 @@ type Context struct {
createdAt time.Time createdAt time.Time
sseConnected atomic.Bool sseConnected atomic.Bool
sseDisconnectedAt atomic.Pointer[time.Time] sseDisconnectedAt atomic.Pointer[time.Time]
lastSeenAt atomic.Pointer[time.Time]
suspended atomic.Bool suspended atomic.Bool
} }

28
via.go
View File

@@ -331,15 +331,18 @@ func (v *V) reapOrphanedContexts(suspendAfter, ttl time.Duration) {
if c.sseConnected.Load() { if c.sseConnected.Load() {
continue continue
} }
var disconnectedFor time.Duration // Use the most recent liveness signal
if dc := c.sseDisconnectedAt.Load(); dc != nil { lastAlive := c.createdAt
disconnectedFor = now.Sub(*dc) if dc := c.sseDisconnectedAt.Load(); dc != nil && dc.After(lastAlive) {
} else { lastAlive = *dc
disconnectedFor = now.Sub(c.createdAt)
} }
if disconnectedFor > ttl { if seen := c.lastSeenAt.Load(); seen != nil && seen.After(lastAlive) {
lastAlive = *seen
}
silentFor := now.Sub(lastAlive)
if silentFor > ttl {
toReap = append(toReap, c) toReap = append(toReap, c)
} else if disconnectedFor > suspendAfter && !c.suspended.Load() { } else if silentFor > suspendAfter && !c.suspended.Load() {
toSuspend = append(toSuspend, c) toSuspend = append(toSuspend, c)
} }
} }
@@ -655,6 +658,8 @@ func New() *V {
return return
} }
c.reqCtx = r.Context() c.reqCtx = r.Context()
now := time.Now()
c.lastSeenAt.Store(&now)
sse := datastar.NewSSE(w, r, datastar.WithCompression(datastar.WithBrotli(datastar.WithBrotliLevel(5)))) sse := datastar.NewSSE(w, r, datastar.WithCompression(datastar.WithBrotli(datastar.WithBrotliLevel(5))))
@@ -688,17 +693,22 @@ func New() *V {
go c.Sync() go c.Sync()
keepalive := time.NewTicker(30 * time.Second)
defer keepalive.Stop()
for { for {
select { select {
case <-sse.Context().Done(): case <-sse.Context().Done():
v.logDebug(c, "SSE connection ended") v.logDebug(c, "SSE connection ended")
c.sseConnected.Store(false) c.sseConnected.Store(false)
now := time.Now() dcNow := time.Now()
c.sseDisconnectedAt.Store(&now) c.sseDisconnectedAt.Store(&dcNow)
return return
case <-c.ctxDisposedChan: case <-c.ctxDisposedChan:
v.logDebug(c, "context disposed, closing SSE") v.logDebug(c, "context disposed, closing SSE")
return return
case <-keepalive.C:
sse.PatchSignals([]byte("{}"))
case patch := <-c.patchChan: case patch := <-c.patchChan:
switch patch.typ { switch patch.typ {
case patchTypeElements: case patchTypeElements:

View File

@@ -418,6 +418,7 @@ func TestReaperCleansOrphanedContexts(t *testing.T) {
func TestReaperSuspendsContext(t *testing.T) { func TestReaperSuspendsContext(t *testing.T) {
v := New() v := New()
c := newContext("suspend-1", "/", v) c := newContext("suspend-1", "/", v)
c.createdAt = time.Now().Add(-30 * time.Minute)
dc := time.Now().Add(-20 * time.Minute) dc := time.Now().Add(-20 * time.Minute)
c.sseDisconnectedAt.Store(&dc) c.sseDisconnectedAt.Store(&dc)
v.registerCtx(c) v.registerCtx(c)
@@ -432,6 +433,7 @@ func TestReaperSuspendsContext(t *testing.T) {
func TestReaperReapsAfterTTL(t *testing.T) { func TestReaperReapsAfterTTL(t *testing.T) {
v := New() v := New()
c := newContext("reap-1", "/", v) c := newContext("reap-1", "/", v)
c.createdAt = time.Now().Add(-3 * time.Hour)
dc := time.Now().Add(-2 * time.Hour) dc := time.Now().Add(-2 * time.Hour)
c.sseDisconnectedAt.Store(&dc) c.sseDisconnectedAt.Store(&dc)
c.suspended.Store(true) c.suspended.Store(true)
@@ -446,6 +448,7 @@ func TestReaperReapsAfterTTL(t *testing.T) {
func TestReaperIgnoresAlreadySuspended(t *testing.T) { func TestReaperIgnoresAlreadySuspended(t *testing.T) {
v := New() v := New()
c := newContext("already-sus-1", "/", v) c := newContext("already-sus-1", "/", v)
c.createdAt = time.Now().Add(-30 * time.Minute)
dc := time.Now().Add(-20 * time.Minute) dc := time.Now().Add(-20 * time.Minute)
c.sseDisconnectedAt.Store(&dc) c.sseDisconnectedAt.Store(&dc)
c.suspended.Store(true) c.suspended.Store(true)
@@ -500,6 +503,74 @@ func TestCleanupCtxIdempotent(t *testing.T) {
assert.Error(t, err, "context should be removed after cleanup") assert.Error(t, err, "context should be removed after cleanup")
} }
// TestReaperRespectsLastSeenAt checks that a recent lastSeenAt timestamp wins
// over an older sseDisconnectedAt: the context is past the suspend threshold
// when measured from the disconnect, but not when measured from the last time
// it was seen, so the reaper must leave it registered and unsuspended.
func TestReaperRespectsLastSeenAt(t *testing.T) {
	// ago returns a pointer to the instant d before now.
	ago := func(d time.Duration) *time.Time {
		ts := time.Now().Add(-d)
		return &ts
	}

	app := New()
	tracked := newContext("seen-1", "/", app)
	tracked.createdAt = *ago(30 * time.Minute)
	// The SSE stream dropped 20 minutes ago...
	tracked.sseDisconnectedAt.Store(ago(20 * time.Minute))
	// ...but the client retried just 2 minutes ago.
	tracked.lastSeenAt.Store(ago(2 * time.Minute))
	app.registerCtx(tracked)

	// suspendAfter = 15m, ttl = 1h
	app.reapOrphanedContexts(15*time.Minute, time.Hour)

	_, lookupErr := app.getCtx("seen-1")
	assert.NoError(t, lookupErr, "context with recent lastSeenAt should survive suspend threshold")
	assert.False(t, tracked.suspended.Load(), "context should not be suspended")
}
// TestReaperFallsBackWithoutLastSeenAt checks that when lastSeenAt was never
// stored, the reaper measures silence from sseDisconnectedAt. With a
// disconnect 20 minutes old, a 15 minute suspend threshold and a 1 hour TTL,
// the context must be suspended but still present in the registry.
func TestReaperFallsBackWithoutLastSeenAt(t *testing.T) {
	// ago returns a pointer to the instant d before now.
	ago := func(d time.Duration) *time.Time {
		ts := time.Now().Add(-d)
		return &ts
	}

	app := New()
	orphan := newContext("noseen-1", "/", app)
	orphan.createdAt = *ago(30 * time.Minute)
	orphan.sseDisconnectedAt.Store(ago(20 * time.Minute))
	// lastSeenAt is deliberately left unset.
	app.registerCtx(orphan)

	// suspendAfter = 15m, ttl = 1h
	app.reapOrphanedContexts(15*time.Minute, time.Hour)

	found, lookupErr := app.getCtx("noseen-1")
	assert.NoError(t, lookupErr, "context should still be in registry (suspended, not reaped)")
	assert.True(t, found.suspended.Load(), "context should be suspended using sseDisconnectedAt fallback")
}
// TestReaperReapsWithStaleLastSeenAt checks that a lastSeenAt timestamp does
// not protect a context once it is itself older than the TTL: every liveness
// timestamp here is more than 1 hour old, so the already-suspended context
// must be removed from the registry.
func TestReaperReapsWithStaleLastSeenAt(t *testing.T) {
	// ago returns a pointer to the instant d before now.
	ago := func(d time.Duration) *time.Time {
		ts := time.Now().Add(-d)
		return &ts
	}

	app := New()
	stale := newContext("stale-seen-1", "/", app)
	stale.createdAt = *ago(3 * time.Hour)
	stale.sseDisconnectedAt.Store(ago(2 * time.Hour))
	// Most recent signal, yet still beyond the 1h TTL used below.
	stale.lastSeenAt.Store(ago(90 * time.Minute))
	stale.suspended.Store(true)
	app.registerCtx(stale)

	// suspendAfter = 15m, ttl = 1h
	app.reapOrphanedContexts(15*time.Minute, time.Hour)

	_, lookupErr := app.getCtx("stale-seen-1")
	assert.Error(t, lookupErr, "context with stale lastSeenAt beyond TTL should be reaped")
}
// TestLastSeenAtUpdatedOnSSEConnect checks that lastSeenAt starts out nil on a
// freshly registered context and holds the stored instant after a Store.
//
// NOTE(review): this test performs the Store itself instead of driving the SSE
// handler, so it only exercises the atomic pointer round-trip. It would still
// pass if the handler stopped updating lastSeenAt. Consider replacing the
// manual Store with a real request against the SSE endpoint.
func TestLastSeenAtUpdatedOnSSEConnect(t *testing.T) {
	app := New()
	fresh := newContext("seen-sse-1", "/", app)
	app.registerCtx(fresh)

	assert.Nil(t, fresh.lastSeenAt.Load(), "lastSeenAt should be nil before SSE connect")

	// Mirror the two statements the SSE handler runs once it has the context.
	connectedAt := time.Now()
	fresh.lastSeenAt.Store(&connectedAt)

	stored := fresh.lastSeenAt.Load()
	assert.NotNil(t, stored, "lastSeenAt should be set after SSE connect")
	assert.WithinDuration(t, connectedAt, *stored, time.Second)
}
func TestDevModeRemovePersistedFix(t *testing.T) { func TestDevModeRemovePersistedFix(t *testing.T) {
v := New() v := New()
v.cfg.DevMode = true v.cfg.DevMode = true