feat: introduce via routine; update realtime chart example

This commit is contained in:
Joao Goncalves
2025-11-25 22:56:21 -01:00
parent 6edace647e
commit a71d6f0960
4 changed files with 58 additions and 44 deletions

View File

@@ -171,8 +171,9 @@ func (c *Context) Signal(v any) *signal {
changed: true, changed: true,
} }
// components register signals on parent page c.mutex.Lock()
if c.isComponent() { defer c.mutex.Unlock()
if c.isComponent() { // components register signals on parent page
c.parentPageCtx.signals.Store(sigID, sig) c.parentPageCtx.signals.Store(sigID, sig)
} else { } else {
c.signals.Store(sigID, sig) c.signals.Store(sigID, sig)
@@ -240,11 +241,8 @@ func (c *Context) prepareSignalsForPatch() map[string]any {
// is dropped to prevent runtime blocks. // is dropped to prevent runtime blocks.
func (c *Context) sendPatch(p patch) { func (c *Context) sendPatch(p patch) {
patchChan := c.getPatchChan() patchChan := c.getPatchChan()
if patchChan == nil {
c.app.logWarn(c, "view out of sync: sse stream closed")
}
select { select {
case patchChan <- p: //queue patch case patchChan <- p:
default: // closed or buffer full - drop patch without blocking default: // closed or buffer full - drop patch without blocking
c.app.logWarn(c, "view out of sync: sse stream closed or queue is full") c.app.logWarn(c, "view out of sync: sse stream closed or queue is full")
} }
@@ -318,9 +316,13 @@ func (c *Context) ExecScript(s string) {
c.sendPatch(patch{patchTypeScript, s}) c.sendPatch(patch{patchTypeScript, s})
} }
// stopAllRoutines safely stops all go routines tied to this Context. Prevents goroutine leakage. // stopAllRoutines stops all go routines tied to this Context preventing goroutine leaks.
func (c *Context) stopAllRoutines() { func (c *Context) stopAllRoutines() {
close(c.ctxDisposedChan) select {
case c.ctxDisposedChan <- struct{}{}:
default:
}
} }
func newContext(id string, route string, v *V) *Context { func newContext(id string, route string, v *V) *Context {

View File

@@ -9,13 +9,13 @@ import (
// Routine allows for defining concurrent goroutines safely. Goroutines started by *Routine // Routine allows for defining concurrent goroutines safely. Goroutines started by *Routine
// are tied to the *Context lifecycle. // are tied to the *Context lifecycle.
type Routine struct { type Routine struct {
mu sync.Mutex mu sync.RWMutex
ctxDisposed chan struct{} ctxDisposed chan struct{}
localInterrupt chan struct{} localInterrupt chan struct{}
isRunning atomic.Bool isRunning atomic.Bool
routineFn func() routineFn func()
tckDuration time.Duration tckDuration time.Duration
tkr *time.Ticker updateTkrChan chan time.Duration
} }
// OnInterval starts a go routine that sets a time.Ticker with the given duration and executes // OnInterval starts a go routine that sets a time.Ticker with the given duration and executes
@@ -27,15 +27,19 @@ func (r *Routine) OnInterval(d time.Duration, fn func()) {
} }
r.tckDuration = d r.tckDuration = d
r.routineFn = func() { r.routineFn = func() {
r.tkr = time.NewTicker(r.tckDuration) r.mu.RLock()
defer r.tkr.Stop() // clean up the ticker when routine stops tkr := time.NewTicker(r.tckDuration)
r.mu.RUnlock()
defer tkr.Stop() // clean up the ticker when routine stops
for { for {
select { select {
case <-r.ctxDisposed: // dispose of the routine when ctx is disposed case <-r.ctxDisposed: // dispose of the routine when ctx is disposed
return return
case <-r.localInterrupt: // dispose of the routine on interrupt signal case <-r.localInterrupt: // dispose of the routine on interrupt signal
return return
case <-r.tkr.C: case d := <-r.updateTkrChan:
tkr.Reset(d)
case <-tkr.C:
fn() fn()
} }
} }
@@ -45,8 +49,10 @@ func (r *Routine) OnInterval(d time.Duration, fn func()) {
// UpdateInterval sets a new interval duration for the internal *time.Ticker. If the provided // UpdateInterval sets a new interval duration for the internal *time.Ticker. If the provided
// duration is equal of less than 0, UpdateInterval does nothing. // duration is equal of less than 0, UpdateInterval does nothing.
func (r *Routine) UpdateInterval(d time.Duration) { func (r *Routine) UpdateInterval(d time.Duration) {
r.mu.Lock()
defer r.mu.Unlock()
r.tckDuration = d r.tckDuration = d
r.tkr.Reset(d) r.updateTkrChan <- d
} }
@@ -72,5 +78,6 @@ func newRoutine(ctxDisposedChan chan struct{}) *Routine {
return &Routine{ return &Routine{
ctxDisposed: ctxDisposedChan, ctxDisposed: ctxDisposedChan,
localInterrupt: make(chan struct{}), localInterrupt: make(chan struct{}),
updateTkrChan: make(chan time.Duration),
} }
} }

View File

@@ -10,55 +10,60 @@ import (
func TestSignalReturnAsString(t *testing.T) { func TestSignalReturnAsString(t *testing.T) {
testcases := []struct { testcases := []struct {
desc string
given any given any
expected string expected string
}{ }{
{"test", "test"}, {"string", "test", "test"},
{"another", "another"}, {"other string", "another", "another"},
{1, "1"}, {"int", 1, "1"},
{-99, "-99"}, {"negative int", -99, "-99"},
{1.1, "1.1"}, {"float", 1.1, "1.1"},
{-34.345, "-34.345"}, {"negative float", -34.345, "-34.345"},
{true, "true"}, {"positive bool", true, "true"},
{false, "false"}, {"negative bool", false, "false"},
} }
for _, testcase := range testcases { for _, testcase := range testcases {
t.Run(testcase.desc, func(t *testing.T) {
t.Parallel()
var sig *signal var sig *signal
v := New() v := New()
v.Page("/", func(c *Context) { v.Page("/", func(c *Context) {
c.View(func() h.H { return nil })
sig = c.Signal(testcase.given) sig = c.Signal(testcase.given)
c.View(func() h.H { return h.Div() })
}) })
assert.Equal(t, testcase.expected, sig.String()) assert.Equal(t, testcase.expected, sig.String())
})
} }
} }
func TestSignalReturnAsStringComplexTypes(t *testing.T) { func TestSignalReturnAsStringComplexTypes(t *testing.T) {
testcases := []struct { testcases := []struct {
desc string
given any given any
expected string expected string
}{ }{
{[]string{"test"}, `["test"]`}, {"string slice", []string{"test"}, `["test"]`},
{[]int{1, 2}, "[1, 2]"}, {"int slice", []int{1, 2}, "[1, 2]"},
{struct{ Val string }{"test"}, `{"Val": "test"}`}, {"struct1", struct{ Val string }{"test"}, `{"Val": "test"}`},
{struct { {"struct2", struct {
Num int Num int
IsPositive bool IsPositive bool
}{1, true}, `{"Num": 1, "IsPositive": true}`}, }{1, true}, `{"Num": 1, "IsPositive": true}`},
} }
for _, testcase := range testcases { for _, testcase := range testcases {
t.Run(testcase.desc, func(t *testing.T) {
t.Parallel()
var sig *signal var sig *signal
v := New() v := New()
v.Page("/", func(c *Context) { v.Page("/", func(c *Context) {
c.View(func() h.H { return nil }) c.View(func() h.H { return nil })
sig = c.Signal(testcase.given) sig = c.Signal(testcase.given)
}) })
assert.JSONEq(t, testcase.expected, sig.String()) assert.JSONEq(t, testcase.expected, sig.String())
})
} }
} }

6
via.go
View File

@@ -207,14 +207,13 @@ func (v *V) currSessionNum() int {
} }
func (v *V) unregisterCtx(c *Context) { func (v *V) unregisterCtx(c *Context) {
v.contextRegistryMutex.Lock()
defer v.contextRegistryMutex.Unlock()
if c.id == "" { if c.id == "" {
v.logErr(c, "unregister ctx failed: ctx contains empty id") v.logErr(c, "unregister ctx failed: ctx contains empty id")
return return
} }
v.contextRegistryMutex.Lock()
defer v.contextRegistryMutex.Unlock()
v.logDebug(c, "ctx removed from registry") v.logDebug(c, "ctx removed from registry")
c.stopAllRoutines()
delete(v.contextRegistry, c.id) delete(v.contextRegistry, c.id)
v.logDebug(nil, "number of sessions in registry: %d", v.currSessionNum()) v.logDebug(nil, "number of sessions in registry: %d", v.currSessionNum())
} }
@@ -463,6 +462,7 @@ func New() *V {
defer r.Body.Close() defer r.Body.Close()
cID := string(body) cID := string(body)
c, err := v.getCtx(cID) c, err := v.getCtx(cID)
c.stopAllRoutines()
if err != nil { if err != nil {
v.logErr(c, "failed to handle session close: %v", err) v.logErr(c, "failed to handle session close: %v", err)
return return