fix: clean up leaked contexts on SSE disconnect and add orphan reaper

When clients disconnect without beforeunload firing (network drops,
mobile kills, crashes), contexts leaked in the registry permanently.

- Extract cleanupCtx helper for dispose/unregister sequence
- Call cleanupCtx on SSE disconnect (sse.Context().Done())
- Add background reaper for contexts where SSE never connected
- Add ContextTTL config option (default 30s, negative disables)
- Fix inverted condition in devModeRemovePersisted
This commit is contained in:
Ryan Hamamura
2026-02-06 10:34:28 -10:00
parent 2c44671d0e
commit 6dcd54c88b
4 changed files with 184 additions and 23 deletions

View File

@@ -1,6 +1,8 @@
package via package via
import ( import (
"time"
"github.com/alexedwards/scs/v2" "github.com/alexedwards/scs/v2"
"github.com/rs/zerolog" "github.com/rs/zerolog"
) )
@@ -54,4 +56,9 @@ type Options struct {
// PubSub enables publish/subscribe messaging. Use vianats.New() for an // PubSub enables publish/subscribe messaging. Use vianats.New() for an
// embedded NATS backend, or supply any PubSub implementation. // embedded NATS backend, or supply any PubSub implementation.
PubSub PubSub PubSub PubSub
// ContextTTL is the maximum time a context may exist without an SSE
// connection before the background reaper disposes it.
// Default: 30s. Negative value disables the reaper.
ContextTTL time.Duration
} }

View File

@@ -8,6 +8,7 @@ import (
"maps" "maps"
"reflect" "reflect"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/ryanhamamura/via/h" "github.com/ryanhamamura/via/h"
@@ -33,6 +34,8 @@ type Context struct {
subscriptions []Subscription subscriptions []Subscription
subsMu sync.Mutex subsMu sync.Mutex
disposeOnce sync.Once disposeOnce sync.Once
createdAt time.Time
sseConnected atomic.Bool
} }
// View defines the UI rendered by this context. // View defines the UI rendered by this context.
@@ -481,5 +484,6 @@ func newContext(id string, route string, v *V) *Context {
signals: new(sync.Map), signals: new(sync.Map),
patchChan: make(chan patch, 1), patchChan: make(chan patch, 1),
ctxDisposedChan: make(chan struct{}, 1), ctxDisposedChan: make(chan struct{}, 1),
createdAt: time.Now(),
} }
} }

72
via.go
View File

@@ -50,6 +50,7 @@ type V struct {
datastarPath string datastarPath string
datastarContent []byte datastarContent []byte
datastarOnce sync.Once datastarOnce sync.Once
reaperStop chan struct{}
} }
func (v *V) logEvent(evt *zerolog.Event, c *Context) *zerolog.Event { func (v *V) logEvent(evt *zerolog.Event, c *Context) *zerolog.Event {
@@ -127,6 +128,9 @@ func (v *V) Config(cfg Options) {
if cfg.PubSub != nil { if cfg.PubSub != nil {
v.pubsub = cfg.PubSub v.pubsub = cfg.PubSub
} }
if cfg.ContextTTL != 0 {
v.cfg.ContextTTL = cfg.ContextTTL
}
} }
// AppendToHead appends the given h.H nodes to the head of the base HTML document. // AppendToHead appends the given h.H nodes to the head of the base HTML document.
@@ -238,6 +242,14 @@ func (v *V) currSessionNum() int {
return len(v.contextRegistry) return len(v.contextRegistry)
} }
func (v *V) cleanupCtx(c *Context) {
c.dispose()
if v.cfg.DevMode {
v.devModeRemovePersisted(c)
}
v.unregisterCtx(c)
}
func (v *V) unregisterCtx(c *Context) { func (v *V) unregisterCtx(c *Context) {
if c.id == "" { if c.id == "" {
v.logErr(c, "unregister ctx failed: ctx contains empty id") v.logErr(c, "unregister ctx failed: ctx contains empty id")
@@ -259,6 +271,50 @@ func (v *V) getCtx(id string) (*Context, error) {
return nil, fmt.Errorf("ctx '%s' not found", id) return nil, fmt.Errorf("ctx '%s' not found", id)
} }
func (v *V) startReaper() {
ttl := v.cfg.ContextTTL
if ttl < 0 {
return
}
if ttl == 0 {
ttl = 30 * time.Second
}
interval := ttl / 3
if interval < 5*time.Second {
interval = 5 * time.Second
}
v.reaperStop = make(chan struct{})
go func() {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-v.reaperStop:
return
case <-ticker.C:
v.reapOrphanedContexts(ttl)
}
}
}()
}
func (v *V) reapOrphanedContexts(ttl time.Duration) {
now := time.Now()
v.contextRegistryMutex.RLock()
var orphans []*Context
for _, c := range v.contextRegistry {
if !c.sseConnected.Load() && now.Sub(c.createdAt) > ttl {
orphans = append(orphans, c)
}
}
v.contextRegistryMutex.RUnlock()
for _, c := range orphans {
v.logInfo(c, "reaping orphaned context (no SSE connection after %s)", ttl)
v.cleanupCtx(c)
}
}
// Start starts the Via HTTP server and blocks until a SIGINT or SIGTERM // Start starts the Via HTTP server and blocks until a SIGINT or SIGTERM
// signal is received, then performs a graceful shutdown. // signal is received, then performs a graceful shutdown.
func (v *V) Start() { func (v *V) Start() {
@@ -271,6 +327,8 @@ func (v *V) Start() {
Handler: handler, Handler: handler,
} }
v.startReaper()
errCh := make(chan error, 1) errCh := make(chan error, 1)
go func() { go func() {
errCh <- v.server.ListenAndServe() errCh <- v.server.ListenAndServe()
@@ -301,6 +359,9 @@ func (v *V) Shutdown() {
} }
func (v *V) shutdown() { func (v *V) shutdown() {
if v.reaperStop != nil {
close(v.reaperStop)
}
v.logInfo(nil, "draining all contexts") v.logInfo(nil, "draining all contexts")
v.drainAllContexts() v.drainAllContexts()
@@ -400,10 +461,7 @@ func (v *V) devModeRemovePersisted(c *Context) {
} }
file.Close() file.Close()
// remove ctx to persisted list
if _, ok := ctxRegMap[c.id]; !ok {
delete(ctxRegMap, c.id) delete(ctxRegMap, c.id)
}
// write persisted list to file // write persisted list to file
file, err = os.Create(p) file, err = os.Create(p)
@@ -507,6 +565,7 @@ func New() *V {
// use last-event-id to tell if request is a sse reconnect // use last-event-id to tell if request is a sse reconnect
sse.Send(datastar.EventTypePatchElements, []string{}, datastar.WithSSEEventId("via")) sse.Send(datastar.EventTypePatchElements, []string{}, datastar.WithSSEEventId("via"))
c.sseConnected.Store(true)
v.logDebug(c, "SSE connection established") v.logDebug(c, "SSE connection established")
go func() { go func() {
@@ -517,6 +576,7 @@ func New() *V {
select { select {
case <-sse.Context().Done(): case <-sse.Context().Done():
v.logDebug(c, "SSE connection ended") v.logDebug(c, "SSE connection ended")
v.cleanupCtx(c)
return return
case <-c.ctxDisposedChan: case <-c.ctxDisposedChan:
v.logDebug(c, "context disposed, closing SSE") v.logDebug(c, "context disposed, closing SSE")
@@ -603,12 +663,8 @@ func New() *V {
v.logErr(c, "failed to handle session close: %v", err) v.logErr(c, "failed to handle session close: %v", err)
return return
} }
c.dispose()
v.logDebug(c, "session close event triggered") v.logDebug(c, "session close event triggered")
if v.cfg.DevMode { v.cleanupCtx(c)
v.devModeRemovePersisted(c)
}
v.unregisterCtx(c)
}) })
return v return v
} }

View File

@@ -1,9 +1,13 @@
package via package via
import ( import (
"encoding/json"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"os"
"path/filepath"
"testing" "testing"
"time"
"github.com/ryanhamamura/via/h" "github.com/ryanhamamura/via/h"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
@@ -235,3 +239,93 @@ func TestPage_PanicsOnNoView(t *testing.T) {
v.Page("/", func(c *Context) {}) v.Page("/", func(c *Context) {})
}) })
} }
func TestReaperCleansOrphanedContexts(t *testing.T) {
v := New()
c := newContext("orphan-1", "/", v)
c.createdAt = time.Now().Add(-time.Minute) // created 1 min ago
v.registerCtx(c)
_, err := v.getCtx("orphan-1")
assert.NoError(t, err)
v.reapOrphanedContexts(10 * time.Second)
_, err = v.getCtx("orphan-1")
assert.Error(t, err, "orphaned context should have been reaped")
}
func TestReaperIgnoresConnectedContexts(t *testing.T) {
v := New()
c := newContext("connected-1", "/", v)
c.createdAt = time.Now().Add(-time.Minute)
c.sseConnected.Store(true)
v.registerCtx(c)
v.reapOrphanedContexts(10 * time.Second)
_, err := v.getCtx("connected-1")
assert.NoError(t, err, "connected context should survive reaping")
}
func TestReaperDisabledWithNegativeTTL(t *testing.T) {
v := New()
v.cfg.ContextTTL = -1
v.startReaper()
assert.Nil(t, v.reaperStop, "reaper should not start with negative TTL")
}
func TestCleanupCtxIdempotent(t *testing.T) {
v := New()
c := newContext("idempotent-1", "/", v)
v.registerCtx(c)
assert.NotPanics(t, func() {
v.cleanupCtx(c)
v.cleanupCtx(c)
})
_, err := v.getCtx("idempotent-1")
assert.Error(t, err, "context should be removed after cleanup")
}
func TestDevModeRemovePersistedFix(t *testing.T) {
v := New()
v.cfg.DevMode = true
dir := filepath.Join(t.TempDir(), ".via", "devmode")
p := filepath.Join(dir, "ctx.json")
assert.NoError(t, os.MkdirAll(dir, 0755))
// Write a persisted context
ctxRegMap := map[string]string{"test-ctx-1": "/"}
f, err := os.Create(p)
assert.NoError(t, err)
assert.NoError(t, json.NewEncoder(f).Encode(ctxRegMap))
f.Close()
// Patch devModeRemovePersisted to use our temp path by calling it
// directly — we need to override the path. Instead, test via the
// actual function by temporarily changing the working dir.
origDir, _ := os.Getwd()
assert.NoError(t, os.Chdir(t.TempDir()))
defer os.Chdir(origDir)
// Re-create the structure in the temp dir
assert.NoError(t, os.MkdirAll(filepath.Join(".via", "devmode"), 0755))
p2 := filepath.Join(".via", "devmode", "ctx.json")
f2, _ := os.Create(p2)
json.NewEncoder(f2).Encode(map[string]string{"test-ctx-1": "/"})
f2.Close()
c := newContext("test-ctx-1", "/", v)
v.devModeRemovePersisted(c)
// Read back and verify
f3, err := os.Open(p2)
assert.NoError(t, err)
defer f3.Close()
var result map[string]string
assert.NoError(t, json.NewDecoder(f3).Decode(&result))
assert.Empty(t, result, "persisted context should be removed")
}