From a71d6f0960767b650f5c0216082d450a6fc32f96 Mon Sep 17 00:00:00 2001 From: Joao Goncalves Date: Tue, 25 Nov 2025 22:56:21 -0100 Subject: [PATCH] feat: introduce via routine; update realtime chart example --- context.go | 18 ++++++++------- routine.go | 19 +++++++++++----- signal_test.go | 59 +++++++++++++++++++++++++++----------------------- via.go | 6 ++--- 4 files changed, 58 insertions(+), 44 deletions(-) diff --git a/context.go b/context.go index 4ce73d2..01b30fc 100644 --- a/context.go +++ b/context.go @@ -171,8 +171,9 @@ func (c *Context) Signal(v any) *signal { changed: true, } - // components register signals on parent page - if c.isComponent() { + c.mutex.Lock() + defer c.mutex.Unlock() + if c.isComponent() { // components register signals on parent page c.parentPageCtx.signals.Store(sigID, sig) } else { c.signals.Store(sigID, sig) @@ -240,11 +241,8 @@ func (c *Context) prepareSignalsForPatch() map[string]any { // is dropped to prevent runtime blocks. func (c *Context) sendPatch(p patch) { patchChan := c.getPatchChan() - if patchChan == nil { - c.app.logWarn(c, "view out of sync: sse stream closed") - } select { - case patchChan <- p: //queue patch + case patchChan <- p: default: // closed or buffer full - drop patch without blocking c.app.logWarn(c, "view out of sync: sse stream closed or queue is full") } @@ -318,9 +316,13 @@ func (c *Context) ExecScript(s string) { c.sendPatch(patch{patchTypeScript, s}) } -// stopAllRoutines safely stops all go routines tied to this Context. Prevents goroutine leakage. +// stopAllRoutines stops all go routines tied to this Context preventing goroutine leaks. func (c *Context) stopAllRoutines() { - close(c.ctxDisposedChan) + select { + case c.ctxDisposedChan <- struct{}{}: + default: + } + } func newContext(id string, route string, v *V) *Context { diff --git a/routine.go b/routine.go index 5aaf7c7..d0276db 100644 --- a/routine.go +++ b/routine.go @@ -9,13 +9,13 @@ import ( // Routine allows for defining concurrent goroutines safely. Goroutines started by *Routine // are tied to the *Context lifecycle. type Routine struct { - mu sync.Mutex + mu sync.RWMutex ctxDisposed chan struct{} localInterrupt chan struct{} isRunning atomic.Bool routineFn func() tckDuration time.Duration - tkr *time.Ticker + updateTkrChan chan time.Duration } // OnInterval starts a go routine that sets a time.Ticker with the given duration and executes @@ -27,15 +27,19 @@ func (r *Routine) OnInterval(d time.Duration, fn func()) { } r.tckDuration = d r.routineFn = func() { - r.tkr = time.NewTicker(r.tckDuration) - defer r.tkr.Stop() // clean up the ticker when routine stops + r.mu.RLock() + tkr := time.NewTicker(r.tckDuration) + r.mu.RUnlock() + defer tkr.Stop() // clean up the ticker when routine stops for { select { case <-r.ctxDisposed: // dispose of the routine when ctx is disposed return case <-r.localInterrupt: // dispose of the routine on interrupt signal return - case <-r.tkr.C: + case d := <-r.updateTkrChan: + tkr.Reset(d) + case <-tkr.C: fn() } } @@ -45,8 +49,10 @@ func (r *Routine) OnInterval(d time.Duration, fn func()) { // UpdateInterval sets a new interval duration for the internal *time.Ticker. If the provided // duration is equal of less than 0, UpdateInterval does nothing. func (r *Routine) UpdateInterval(d time.Duration) { + r.mu.Lock() + defer r.mu.Unlock() r.tckDuration = d - r.tkr.Reset(d) + r.updateTkrChan <- d } @@ -72,5 +78,6 @@ func newRoutine(ctxDisposedChan chan struct{}) *Routine { return &Routine{ ctxDisposed: ctxDisposedChan, localInterrupt: make(chan struct{}), + updateTkrChan: make(chan time.Duration), } } diff --git a/signal_test.go b/signal_test.go index 6abec3a..3cdfa8e 100644 --- a/signal_test.go +++ b/signal_test.go @@ -10,55 +10,60 @@ import ( func TestSignalReturnAsString(t *testing.T) { testcases := []struct { + desc string given any expected string }{ - {"test", "test"}, - {"another", "another"}, - {1, "1"}, - {-99, "-99"}, - {1.1, "1.1"}, - {-34.345, "-34.345"}, - {true, "true"}, - {false, "false"}, + {"string", "test", "test"}, + {"other string", "another", "another"}, + {"int", 1, "1"}, + {"negative int", -99, "-99"}, + {"float", 1.1, "1.1"}, + {"negative float", -34.345, "-34.345"}, + {"positive bool", true, "true"}, + {"negative bool", false, "false"}, } for _, testcase := range testcases { - var sig *signal - v := New() - v.Page("/", func(c *Context) { - c.View(func() h.H { return nil }) - sig = c.Signal(testcase.given) + t.Run(testcase.desc, func(t *testing.T) { + t.Parallel() + var sig *signal + v := New() + v.Page("/", func(c *Context) { + sig = c.Signal(testcase.given) + c.View(func() h.H { return h.Div() }) + }) + assert.Equal(t, testcase.expected, sig.String()) }) - assert.Equal(t, testcase.expected, sig.String()) - } } func TestSignalReturnAsStringComplexTypes(t *testing.T) { testcases := []struct { + desc string given any expected string }{ - {[]string{"test"}, `["test"]`}, - {[]int{1, 2}, "[1, 2]"}, - {struct{ Val string }{"test"}, `{"Val": "test"}`}, - {struct { + {"string slice", []string{"test"}, `["test"]`}, + {"int slice", []int{1, 2}, "[1, 2]"}, + {"struct1", struct{ Val string }{"test"}, `{"Val": "test"}`}, + {"struct2", struct { Num int IsPositive bool }{1, true}, `{"Num": 1, "IsPositive": true}`}, } for _, testcase := range testcases { - var sig *signal - v := New() - v.Page("/", func(c *Context) { - c.View(func() h.H { return nil }) - sig = c.Signal(testcase.given) + t.Run(testcase.desc, func(t *testing.T) { + t.Parallel() + var sig *signal + v := New() + v.Page("/", func(c *Context) { + c.View(func() h.H { return nil }) + sig = c.Signal(testcase.given) + }) + assert.JSONEq(t, testcase.expected, sig.String()) }) - - assert.JSONEq(t, testcase.expected, sig.String()) - } } diff --git a/via.go b/via.go index 10324b3..2e2a9b5 100644 --- a/via.go +++ b/via.go @@ -207,14 +207,13 @@ func (v *V) currSessionNum() int { } func (v *V) unregisterCtx(c *Context) { - v.contextRegistryMutex.Lock() - defer v.contextRegistryMutex.Unlock() if c.id == "" { v.logErr(c, "unregister ctx failed: ctx contains empty id") return } + v.contextRegistryMutex.Lock() + defer v.contextRegistryMutex.Unlock() v.logDebug(c, "ctx removed from registry") - c.stopAllRoutines() delete(v.contextRegistry, c.id) v.logDebug(nil, "number of sessions in registry: %d", v.currSessionNum()) } @@ -463,6 +462,7 @@ func New() *V { defer r.Body.Close() cID := string(body) c, err := v.getCtx(cID) + c.stopAllRoutines() if err != nil { v.logErr(c, "failed to handle session close: %v", err) return