fix: remove context reaper to prevent background tabs from going stale (#4)
Some checks failed
CI / Build and Test (push) Failing after 35s
Some checks failed
CI / Build and Test (push) Failing after 35s
This commit was merged in pull request #4.
This commit is contained in:
93
via.go
93
via.go
@@ -58,7 +58,6 @@ type V struct {
|
||||
datastarPath string
|
||||
datastarContent []byte
|
||||
datastarOnce sync.Once
|
||||
reaperStop chan struct{}
|
||||
middleware []Middleware
|
||||
layout func(func() h.H) h.H
|
||||
}
|
||||
@@ -139,12 +138,6 @@ func (v *V) Config(cfg Options) {
|
||||
v.defaultNATS = nil
|
||||
v.pubsub = cfg.PubSub
|
||||
}
|
||||
if cfg.ContextSuspendAfter != 0 {
|
||||
v.cfg.ContextSuspendAfter = cfg.ContextSuspendAfter
|
||||
}
|
||||
if cfg.ContextTTL != 0 {
|
||||
v.cfg.ContextTTL = cfg.ContextTTL
|
||||
}
|
||||
if cfg.Streams != nil {
|
||||
v.cfg.Streams = cfg.Streams
|
||||
}
|
||||
@@ -292,75 +285,6 @@ func (v *V) getCtx(id string) (*Context, error) {
|
||||
return nil, fmt.Errorf("ctx '%s' not found", id)
|
||||
}
|
||||
|
||||
func (v *V) startReaper() {
|
||||
ttl := v.cfg.ContextTTL
|
||||
if ttl < 0 {
|
||||
return
|
||||
}
|
||||
if ttl == 0 {
|
||||
ttl = time.Hour
|
||||
}
|
||||
suspendAfter := v.cfg.ContextSuspendAfter
|
||||
if suspendAfter == 0 {
|
||||
suspendAfter = 15 * time.Minute
|
||||
}
|
||||
if suspendAfter > ttl {
|
||||
suspendAfter = ttl
|
||||
}
|
||||
interval := suspendAfter / 3
|
||||
if interval < 5*time.Second {
|
||||
interval = 5 * time.Second
|
||||
}
|
||||
v.reaperStop = make(chan struct{})
|
||||
go func() {
|
||||
ticker := time.NewTicker(interval)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-v.reaperStop:
|
||||
return
|
||||
case <-ticker.C:
|
||||
v.reapOrphanedContexts(suspendAfter, ttl)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (v *V) reapOrphanedContexts(suspendAfter, ttl time.Duration) {
|
||||
now := time.Now()
|
||||
v.contextRegistryMutex.RLock()
|
||||
var toSuspend, toReap []*Context
|
||||
for _, c := range v.contextRegistry {
|
||||
if c.sseConnected.Load() {
|
||||
continue
|
||||
}
|
||||
// Use the most recent liveness signal
|
||||
lastAlive := c.createdAt
|
||||
if dc := c.sseDisconnectedAt.Load(); dc != nil && dc.After(lastAlive) {
|
||||
lastAlive = *dc
|
||||
}
|
||||
if seen := c.lastSeenAt.Load(); seen != nil && seen.After(lastAlive) {
|
||||
lastAlive = *seen
|
||||
}
|
||||
silentFor := now.Sub(lastAlive)
|
||||
if silentFor > ttl {
|
||||
toReap = append(toReap, c)
|
||||
} else if silentFor > suspendAfter && !c.suspended.Load() {
|
||||
toSuspend = append(toSuspend, c)
|
||||
}
|
||||
}
|
||||
v.contextRegistryMutex.RUnlock()
|
||||
|
||||
for _, c := range toSuspend {
|
||||
v.logInfo(c, "suspending context (no SSE connection after %s)", suspendAfter)
|
||||
c.suspend()
|
||||
}
|
||||
for _, c := range toReap {
|
||||
v.logInfo(c, "reaping orphaned context (no SSE connection after %s)", ttl)
|
||||
v.cleanupCtx(c)
|
||||
}
|
||||
}
|
||||
|
||||
// Start starts the Via HTTP server and blocks until a SIGINT or SIGTERM
|
||||
// signal is received, then performs a graceful shutdown.
|
||||
func (v *V) Start() {
|
||||
@@ -389,8 +313,6 @@ func (v *V) Start() {
|
||||
Handler: handler,
|
||||
}
|
||||
|
||||
v.startReaper()
|
||||
|
||||
errCh := make(chan error, 1)
|
||||
go func() {
|
||||
errCh <- v.server.ListenAndServe()
|
||||
@@ -417,9 +339,6 @@ func (v *V) Start() {
|
||||
// Shutdown gracefully shuts down the server and all contexts.
|
||||
// Safe for programmatic or test use.
|
||||
func (v *V) Shutdown() {
|
||||
if v.reaperStop != nil {
|
||||
close(v.reaperStop)
|
||||
}
|
||||
v.logInfo(nil, "draining all contexts")
|
||||
v.drainAllContexts()
|
||||
|
||||
@@ -667,8 +586,6 @@ func New() *V {
|
||||
return
|
||||
}
|
||||
c.reqCtx = r.Context()
|
||||
now := time.Now()
|
||||
c.lastSeenAt.Store(&now)
|
||||
|
||||
sse := datastar.NewSSE(w, r, datastar.WithCompression(datastar.WithBrotli(datastar.WithBrotliLevel(5))))
|
||||
|
||||
@@ -690,16 +607,6 @@ func New() *V {
|
||||
c.sseDisconnectedAt.Store(nil)
|
||||
v.logDebug(c, "SSE connection established")
|
||||
|
||||
if c.suspended.Load() {
|
||||
c.navMu.Lock()
|
||||
c.suspended.Store(false)
|
||||
if initFn := v.pageRegistry[c.route]; initFn != nil {
|
||||
v.logInfo(c, "resuming suspended context")
|
||||
initFn(c)
|
||||
}
|
||||
c.navMu.Unlock()
|
||||
}
|
||||
|
||||
go c.Sync()
|
||||
|
||||
keepalive := time.NewTicker(30 * time.Second)
|
||||
|
||||
Reference in New Issue
Block a user