diff --git a/context.go b/context.go index 7e1b554..eb6d5e4 100644 --- a/context.go +++ b/context.go @@ -39,8 +39,9 @@ type Context struct { subscriptions []Subscription subsMu sync.Mutex disposeOnce sync.Once - createdAt time.Time - sseConnected atomic.Bool + createdAt time.Time + sseConnected atomic.Bool + sseDisconnectedAt atomic.Pointer[time.Time] } // View defines the UI rendered by this context. diff --git a/via.go b/via.go index 5b5668a..03098fe 100644 --- a/via.go +++ b/via.go @@ -318,8 +318,19 @@ func (v *V) reapOrphanedContexts(ttl time.Duration) { v.contextRegistryMutex.RLock() var orphans []*Context for _, c := range v.contextRegistry { - if !c.sseConnected.Load() && now.Sub(c.createdAt) > ttl { - orphans = append(orphans, c) + if c.sseConnected.Load() { + continue + } + if dc := c.sseDisconnectedAt.Load(); dc != nil { + // SSE was connected then dropped — reap if gone too long + if now.Sub(*dc) > ttl { + orphans = append(orphans, c) + } + } else { + // SSE never connected — reap based on creation time + if now.Sub(c.createdAt) > ttl { + orphans = append(orphans, c) + } } } v.contextRegistryMutex.RUnlock() @@ -624,7 +635,19 @@ func New() *V { // use last-event-id to tell if request is a sse reconnect sse.Send(datastar.EventTypePatchElements, []string{}, datastar.WithSSEEventId("via")) + // Drain stale patches on reconnect so the client gets fresh state + if c.sseDisconnectedAt.Load() != nil { + for { + select { + case <-c.patchChan: + default: + goto drained + } + } + drained: + } c.sseConnected.Store(true) + c.sseDisconnectedAt.Store(nil) v.logDebug(c, "SSE connection established") go c.Sync() @@ -633,7 +656,9 @@ func New() *V { select { case <-sse.Context().Done(): v.logDebug(c, "SSE connection ended") - v.cleanupCtx(c) + c.sseConnected.Store(false) + now := time.Now() + c.sseDisconnectedAt.Store(&now) return case <-c.ctxDisposedChan: v.logDebug(c, "context disposed, closing SSE")