fix: try solution for race conditions; use brotli included in datastar sdk; small improvements

This commit is contained in:
Joao Goncalves
2025-11-14 17:16:09 -01:00
parent 351bed3ea1
commit 808d4dd0d1
6 changed files with 166 additions and 99 deletions

View File

@@ -2,13 +2,13 @@ package via
import (
"bytes"
"encoding/json"
"fmt"
"log"
"reflect"
"sync"
"github.com/go-via/via/h"
"github.com/starfederation/datastar-go/datastar"
)
// Context is the living bridge between Go and the browser.
@@ -21,10 +21,10 @@ type Context struct {
view func() h.H
componentRegistry map[string]*Context
parentPageCtx *Context
sse *datastar.ServerSentEventGenerator
patchChan chan patch
actionRegistry map[string]func()
signals map[string]*signal
signalsMux sync.Mutex
signals *sync.Map
mutex sync.RWMutex
}
// View defines the UI rendered by this context.
@@ -155,9 +155,9 @@ func (c *Context) Signal(v any) *signal {
// components register signals on parent page
if c.isComponent() {
c.parentPageCtx.signals[sigID] = sig
c.parentPageCtx.signals.Store(sigID, sig)
} else {
c.signals[sigID] = sig
c.signals.Store(sigID, sig)
}
return sig
@@ -168,56 +168,72 @@ func (c *Context) injectSignals(sigs map[string]any) {
c.app.logErr(c, "signal injection failed: nil signals in ctx")
return
}
for k, v := range sigs {
if _, ok := c.signals[k]; !ok {
c.signals[k] = &signal{
id: k,
t: reflect.TypeOf(v),
v: reflect.ValueOf(v),
}
c.mutex.Lock()
defer c.mutex.Unlock()
for sigID, val := range sigs {
if _, ok := c.signals.Load(sigID); !ok {
c.signals.Store(sigID, &signal{
id: sigID,
t: reflect.TypeOf(val),
v: reflect.ValueOf(val),
})
continue
}
c.signals[k].v = reflect.ValueOf(v)
c.signals[k].changed = false
item, _ := c.signals.Load(sigID)
if sig, ok := item.(*signal); ok {
sig.v = reflect.ValueOf(val)
sig.changed = false
}
}
}
func (c *Context) getSSE() *datastar.ServerSentEventGenerator {
func (c *Context) getSSE() chan patch {
// components use parent page sse stream
var sse *datastar.ServerSentEventGenerator
var patchChan chan patch
if c.isComponent() {
sse = c.parentPageCtx.sse
patchChan = c.parentPageCtx.parentPageCtx.patchChan
} else {
sse = c.sse
patchChan = c.patchChan
}
return sse
return patchChan
}
func (c *Context) prepareSignalsForPatch() map[string]any {
updatedSigs := make(map[string]any)
c.signals.Range(func(sigID, value any) bool {
if sig, ok := value.(*signal); ok {
if sig.err != nil {
c.app.logWarn(c, "signal '%s' is out of sync: %v", sig.id, sig.err)
return true
}
if sig.changed {
updatedSigs[sigID.(string)] = fmt.Sprintf("%v", sig.v)
}
}
return true
})
return updatedSigs
}
// Sync pushes the current view state and signal changes to the browser immediately
// over the live SSE event stream.
func (c *Context) Sync() {
sse := c.getSSE()
if sse == nil {
c.app.logWarn(c, "view out of sync: no sse stream")
return
}
elemsPatch := bytes.NewBuffer(make([]byte, 0))
if err := c.view().Render(elemsPatch); err != nil {
c.app.logErr(c, "sync view failed: %v", err)
return
}
_ = sse.PatchElements(elemsPatch.String())
updatedSigs := make(map[string]any)
for id, sig := range c.signals {
if sig.err != nil {
c.app.logWarn(c, "failed to sync signal '%s': %v", sig.id, sig.err)
}
if sig.changed && sig.err == nil {
updatedSigs[id] = fmt.Sprintf("%v", sig.v)
}
}
c.patchChan <- patch{patchTypeElements, elemsPatch.String()}
c.mutex.RLock()
defer c.mutex.RUnlock()
updatedSigs := c.prepareSignalsForPatch()
if len(updatedSigs) != 0 {
_ = sse.MarshalAndPatchSignals(updatedSigs)
outgoingSigs, _ := json.Marshal(updatedSigs)
c.patchChan <- patch{patchTypeSignals, string(outgoingSigs)}
}
}
@@ -254,28 +270,19 @@ func (c *Context) SyncElements(elem h.H) {
}
b := bytes.NewBuffer(make([]byte, 0))
_ = elem.Render(b)
_ = sse.PatchElements(b.String())
c.patchChan <- patch{patchTypeElements, b.String()}
}
// SyncSignals pushes the current signal changes to the browser immediately
// over the live SSE event stream.
func (c *Context) SyncSignals() {
sse := c.getSSE()
if sse == nil {
c.app.logWarn(c, "signals out of sync: no sse stream")
return
}
updatedSigs := make(map[string]any)
for id, sig := range c.signals {
if sig.err != nil {
c.app.logWarn(c, "signal out of sync'%s': %v", sig.id, sig.err)
}
if sig.changed && sig.err == nil {
updatedSigs[id] = fmt.Sprintf("%v", sig.v)
}
}
c.mutex.RLock()
updatedSigs := c.prepareSignalsForPatch()
defer c.mutex.RUnlock()
if len(updatedSigs) != 0 {
_ = sse.MarshalAndPatchSignals(updatedSigs)
outgoingSignals, _ := json.Marshal(updatedSigs)
c.patchChan <- patch{patchTypeSignals, string(outgoingSignals)}
}
}
@@ -285,7 +292,7 @@ func (c *Context) ExecScript(s string) {
c.app.logWarn(c, "script out of sync: no sse stream")
return
}
_ = sse.ExecuteScript(s)
c.patchChan <- patch{patchTypeScript, s}
}
func newContext(id string, route string, app *V) *Context {
@@ -299,6 +306,7 @@ func newContext(id string, route string, app *V) *Context {
app: app,
componentRegistry: make(map[string]*Context),
actionRegistry: make(map[string]func()),
signals: make(map[string]*signal),
signals: new(sync.Map),
patchChan: make(chan patch, 100),
}
}