5 Commits

Author SHA1 Message Date
Ryan Hamamura
58ad9a2699 feat: add SSE keepalive and liveness tracking for resilient connections
Some checks failed
CI / Build and Test (push) Has been cancelled
Add 30s keepalive pings to prevent proxy/CDN idle timeouts from killing
SSE connections silently. Track lastSeenAt on each SSE connect attempt
so Datastar's retry signals keep contexts alive through the reaper.
2026-02-19 12:07:25 -10:00
Ryan Hamamura
f3a9c8036f refactor: use computed signals in pubsub-crud and chatroom examples
Some checks failed
CI / Build and Test (push) Has been cancelled
2026-02-19 09:03:13 -10:00
Ryan Hamamura
6763e1a420 feat: add computed signals for derived reactive values
All checks were successful
CI / Build and Test (push) Successful in 33s
Read-only signals whose value is a function of other signals,
recomputed automatically at sync time. Supports String, Int, Bool,
and Text methods. Components store computed signals on the parent
page context like regular signals.
2026-02-18 09:22:40 -10:00
Ryan Hamamura
5d61149fa3 fix: make embedded NATS opt-in so tests don't hang
All checks were successful
CI / Build and Test (push) Successful in 31s
Move NATS startup from New() to Start(), so tests that don't use
pubsub never block on server initialization. Add a 10s timeout to
WaitForServer() and skip NATS tests gracefully when unavailable.
2026-02-18 08:45:03 -10:00
Ryan Hamamura
08b7dbd17f feat: add WithDebounce and WithThrottle action trigger options
Unify event attribute construction through buildAttrKey() so debounce,
throttle, and window modifiers compose cleanly. OnChange no longer
hardcodes a 200ms debounce — callers opt in explicitly.
2026-02-18 08:44:58 -10:00
12 changed files with 598 additions and 64 deletions

View File

@@ -3,6 +3,7 @@ package via
import ( import (
"fmt" "fmt"
"strconv" "strconv"
"time"
"github.com/ryanhamamura/via/h" "github.com/ryanhamamura/via/h"
) )
@@ -23,6 +24,8 @@ type triggerOpts struct {
value string value string
window bool window bool
preventDefault bool preventDefault bool
debounce time.Duration
throttle time.Duration
} }
type withSignalOpt struct { type withSignalOpt struct {
@@ -58,6 +61,41 @@ func WithPreventDefault() ActionTriggerOption {
return withPreventDefaultOpt{} return withPreventDefaultOpt{}
} }
type withDebounceOpt struct{ d time.Duration }
func (o withDebounceOpt) apply(opts *triggerOpts) { opts.debounce = o.d }
// WithDebounce adds a debounce modifier to the event trigger.
func WithDebounce(d time.Duration) ActionTriggerOption { return withDebounceOpt{d} }
type withThrottleOpt struct{ d time.Duration }
func (o withThrottleOpt) apply(opts *triggerOpts) { opts.throttle = o.d }
// WithThrottle adds a throttle modifier to the event trigger.
func WithThrottle(d time.Duration) ActionTriggerOption { return withThrottleOpt{d} }
// formatDuration renders a duration as e.g. "200ms" for Datastar modifiers.
func formatDuration(d time.Duration) string {
return fmt.Sprintf("%dms", d.Milliseconds())
}
// buildAttrKey constructs a Datastar attribute key with modifiers.
// Order: event → debounce/throttle → window.
func buildAttrKey(event string, opts *triggerOpts) string {
key := "on:" + event
if opts.debounce > 0 {
key += "__debounce." + formatDuration(opts.debounce)
}
if opts.throttle > 0 {
key += "__throttle." + formatDuration(opts.throttle)
}
if opts.window {
key += "__window"
}
return key
}
// WithSignal sets a signal value before triggering the action. // WithSignal sets a signal value before triggering the action.
func WithSignal(sig *signal, value string) ActionTriggerOption { func WithSignal(sig *signal, value string) ActionTriggerOption {
return withSignalOpt{ return withSignalOpt{
@@ -97,62 +135,62 @@ func actionURL(id string) string {
// to element nodes in a view. // to element nodes in a view.
func (a *actionTrigger) OnClick(options ...ActionTriggerOption) h.H { func (a *actionTrigger) OnClick(options ...ActionTriggerOption) h.H {
opts := applyOptions(options...) opts := applyOptions(options...)
return h.Data("on:click", buildOnExpr(actionURL(a.id), &opts)) return h.Data(buildAttrKey("click", &opts), buildOnExpr(actionURL(a.id), &opts))
} }
// OnChange returns a via.h DOM attribute that triggers on input change. It can be added // OnChange returns a via.h DOM attribute that triggers on input change. It can be added
// to element nodes in a view. // to element nodes in a view.
func (a *actionTrigger) OnChange(options ...ActionTriggerOption) h.H { func (a *actionTrigger) OnChange(options ...ActionTriggerOption) h.H {
opts := applyOptions(options...) opts := applyOptions(options...)
return h.Data("on:change__debounce.200ms", buildOnExpr(actionURL(a.id), &opts)) return h.Data(buildAttrKey("change", &opts), buildOnExpr(actionURL(a.id), &opts))
} }
// OnSubmit returns a via.h DOM attribute that triggers on form submit. // OnSubmit returns a via.h DOM attribute that triggers on form submit.
func (a *actionTrigger) OnSubmit(options ...ActionTriggerOption) h.H { func (a *actionTrigger) OnSubmit(options ...ActionTriggerOption) h.H {
opts := applyOptions(options...) opts := applyOptions(options...)
return h.Data("on:submit", buildOnExpr(actionURL(a.id), &opts)) return h.Data(buildAttrKey("submit", &opts), buildOnExpr(actionURL(a.id), &opts))
} }
// OnInput returns a via.h DOM attribute that triggers on input (without debounce). // OnInput returns a via.h DOM attribute that triggers on input (without debounce).
func (a *actionTrigger) OnInput(options ...ActionTriggerOption) h.H { func (a *actionTrigger) OnInput(options ...ActionTriggerOption) h.H {
opts := applyOptions(options...) opts := applyOptions(options...)
return h.Data("on:input", buildOnExpr(actionURL(a.id), &opts)) return h.Data(buildAttrKey("input", &opts), buildOnExpr(actionURL(a.id), &opts))
} }
// OnFocus returns a via.h DOM attribute that triggers when the element gains focus. // OnFocus returns a via.h DOM attribute that triggers when the element gains focus.
func (a *actionTrigger) OnFocus(options ...ActionTriggerOption) h.H { func (a *actionTrigger) OnFocus(options ...ActionTriggerOption) h.H {
opts := applyOptions(options...) opts := applyOptions(options...)
return h.Data("on:focus", buildOnExpr(actionURL(a.id), &opts)) return h.Data(buildAttrKey("focus", &opts), buildOnExpr(actionURL(a.id), &opts))
} }
// OnBlur returns a via.h DOM attribute that triggers when the element loses focus. // OnBlur returns a via.h DOM attribute that triggers when the element loses focus.
func (a *actionTrigger) OnBlur(options ...ActionTriggerOption) h.H { func (a *actionTrigger) OnBlur(options ...ActionTriggerOption) h.H {
opts := applyOptions(options...) opts := applyOptions(options...)
return h.Data("on:blur", buildOnExpr(actionURL(a.id), &opts)) return h.Data(buildAttrKey("blur", &opts), buildOnExpr(actionURL(a.id), &opts))
} }
// OnMouseEnter returns a via.h DOM attribute that triggers when the mouse enters the element. // OnMouseEnter returns a via.h DOM attribute that triggers when the mouse enters the element.
func (a *actionTrigger) OnMouseEnter(options ...ActionTriggerOption) h.H { func (a *actionTrigger) OnMouseEnter(options ...ActionTriggerOption) h.H {
opts := applyOptions(options...) opts := applyOptions(options...)
return h.Data("on:mouseenter", buildOnExpr(actionURL(a.id), &opts)) return h.Data(buildAttrKey("mouseenter", &opts), buildOnExpr(actionURL(a.id), &opts))
} }
// OnMouseLeave returns a via.h DOM attribute that triggers when the mouse leaves the element. // OnMouseLeave returns a via.h DOM attribute that triggers when the mouse leaves the element.
func (a *actionTrigger) OnMouseLeave(options ...ActionTriggerOption) h.H { func (a *actionTrigger) OnMouseLeave(options ...ActionTriggerOption) h.H {
opts := applyOptions(options...) opts := applyOptions(options...)
return h.Data("on:mouseleave", buildOnExpr(actionURL(a.id), &opts)) return h.Data(buildAttrKey("mouseleave", &opts), buildOnExpr(actionURL(a.id), &opts))
} }
// OnScroll returns a via.h DOM attribute that triggers on scroll. // OnScroll returns a via.h DOM attribute that triggers on scroll.
func (a *actionTrigger) OnScroll(options ...ActionTriggerOption) h.H { func (a *actionTrigger) OnScroll(options ...ActionTriggerOption) h.H {
opts := applyOptions(options...) opts := applyOptions(options...)
return h.Data("on:scroll", buildOnExpr(actionURL(a.id), &opts)) return h.Data(buildAttrKey("scroll", &opts), buildOnExpr(actionURL(a.id), &opts))
} }
// OnDblClick returns a via.h DOM attribute that triggers on double click. // OnDblClick returns a via.h DOM attribute that triggers on double click.
func (a *actionTrigger) OnDblClick(options ...ActionTriggerOption) h.H { func (a *actionTrigger) OnDblClick(options ...ActionTriggerOption) h.H {
opts := applyOptions(options...) opts := applyOptions(options...)
return h.Data("on:dblclick", buildOnExpr(actionURL(a.id), &opts)) return h.Data(buildAttrKey("dblclick", &opts), buildOnExpr(actionURL(a.id), &opts))
} }
// OnKeyDown returns a via.h DOM attribute that triggers when a key is pressed. // OnKeyDown returns a via.h DOM attribute that triggers when a key is pressed.
@@ -164,11 +202,7 @@ func (a *actionTrigger) OnKeyDown(key string, options ...ActionTriggerOption) h.
if key != "" { if key != "" {
condition = fmt.Sprintf("evt.key==='%s' &&", key) condition = fmt.Sprintf("evt.key==='%s' &&", key)
} }
attrName := "on:keydown" return h.Data(buildAttrKey("keydown", &opts), fmt.Sprintf("%s%s", condition, buildOnExpr(actionURL(a.id), &opts)))
if opts.window {
attrName = "on:keydown__window"
}
return h.Data(attrName, fmt.Sprintf("%s%s", condition, buildOnExpr(actionURL(a.id), &opts)))
} }
// KeyBinding pairs a key with an action and per-binding options. // KeyBinding pairs a key with an action and per-binding options.

55
computed.go Normal file
View File

@@ -0,0 +1,55 @@
package via
import (
"fmt"
"strconv"
"strings"
"github.com/ryanhamamura/via/h"
)
// computedSignal is a read-only signal whose value is derived from other signals.
// It recomputes on every read and is included in patches only when the value changes.
type computedSignal struct {
id string
compute func() string
lastVal string
changed bool
}
func (s *computedSignal) ID() string {
return s.id
}
func (s *computedSignal) String() string {
return s.compute()
}
func (s *computedSignal) Int() int {
if n, err := strconv.Atoi(s.String()); err == nil {
return n
}
return 0
}
func (s *computedSignal) Bool() bool {
val := strings.ToLower(s.String())
return val == "true" || val == "1" || val == "yes" || val == "on"
}
func (s *computedSignal) Text() h.H {
return h.Span(h.Data("text", "$"+s.id))
}
// recompute calls the compute function and sets changed if the value differs from lastVal.
func (s *computedSignal) recompute() {
val := s.compute()
if val != s.lastVal {
s.lastVal = val
s.changed = true
}
}
func (s *computedSignal) patchValue() string {
return fmt.Sprintf("%v", s.lastVal)
}

190
computed_test.go Normal file
View File

@@ -0,0 +1,190 @@
package via
import (
"bytes"
"fmt"
"testing"
"github.com/ryanhamamura/via/h"
"github.com/stretchr/testify/assert"
)
func TestComputedBasic(t *testing.T) {
v := New()
var cs *computedSignal
v.Page("/", func(c *Context) {
sig1 := c.Signal("hello")
sig2 := c.Signal("world")
cs = c.Computed(func() string {
return sig1.String() + " " + sig2.String()
})
c.View(func() h.H { return h.Div() })
})
assert.Equal(t, "hello world", cs.String())
}
func TestComputedReactivity(t *testing.T) {
v := New()
var cs *computedSignal
var sig1 *signal
v.Page("/", func(c *Context) {
sig1 = c.Signal("a")
sig2 := c.Signal("b")
cs = c.Computed(func() string {
return sig1.String() + sig2.String()
})
c.View(func() h.H { return h.Div() })
})
assert.Equal(t, "ab", cs.String())
sig1.SetValue("x")
assert.Equal(t, "xb", cs.String())
}
func TestComputedInt(t *testing.T) {
v := New()
var cs *computedSignal
v.Page("/", func(c *Context) {
sig := c.Signal(21)
cs = c.Computed(func() string {
return fmt.Sprintf("%d", sig.Int()*2)
})
c.View(func() h.H { return h.Div() })
})
assert.Equal(t, 42, cs.Int())
}
func TestComputedBool(t *testing.T) {
v := New()
var cs *computedSignal
v.Page("/", func(c *Context) {
sig := c.Signal("true")
cs = c.Computed(func() string {
return sig.String()
})
c.View(func() h.H { return h.Div() })
})
assert.True(t, cs.Bool())
}
func TestComputedText(t *testing.T) {
v := New()
var cs *computedSignal
v.Page("/", func(c *Context) {
cs = c.Computed(func() string { return "hi" })
c.View(func() h.H { return h.Div() })
})
var buf bytes.Buffer
err := cs.Text().Render(&buf)
assert.NoError(t, err)
assert.Contains(t, buf.String(), `data-text="$`+cs.ID()+`"`)
}
func TestComputedChangeDetection(t *testing.T) {
v := New()
var ctx *Context
var sig *signal
v.Page("/", func(c *Context) {
ctx = c
sig = c.Signal("a")
c.Computed(func() string {
return sig.String() + "!"
})
c.View(func() h.H { return h.Div() })
})
// First patch includes computed (changed=true from init)
patch1 := ctx.prepareSignalsForPatch()
assert.NotEmpty(t, patch1)
// Second patch: nothing changed, computed should not be included
patch2 := ctx.prepareSignalsForPatch()
// Regular signal still has changed=true (not reset in prepareSignalsForPatch),
// but computed should not appear since its value didn't change.
hasComputed := false
ctx.signals.Range(func(_, value any) bool {
if cs, ok := value.(*computedSignal); ok {
_, inPatch := patch2[cs.ID()]
hasComputed = inPatch
}
return true
})
assert.False(t, hasComputed)
// After changing dependency, computed should reappear
sig.SetValue("b")
patch3 := ctx.prepareSignalsForPatch()
found := false
ctx.signals.Range(func(_, value any) bool {
if cs, ok := value.(*computedSignal); ok {
if v, ok := patch3[cs.ID()]; ok {
assert.Equal(t, "b!", v)
found = true
}
}
return true
})
assert.True(t, found)
}
func TestComputedInComponent(t *testing.T) {
v := New()
var cs *computedSignal
var parentCtx *Context
v.Page("/", func(c *Context) {
parentCtx = c
c.Component(func(comp *Context) {
sig := comp.Signal("via")
cs = comp.Computed(func() string {
return "hello " + sig.String()
})
comp.View(func() h.H { return h.Div() })
})
c.View(func() h.H { return h.Div() })
})
assert.Equal(t, "hello via", cs.String())
// Verify it's stored on the parent page context
found := false
parentCtx.signals.Range(func(_, value any) bool {
if stored, ok := value.(*computedSignal); ok && stored.ID() == cs.ID() {
found = true
}
return true
})
assert.True(t, found)
}
func TestComputedIsReadOnly(t *testing.T) {
// Compile-time guarantee: *computedSignal has no Bind() or SetValue() methods.
// This test exists as documentation — if someone adds those methods, the
// interface assertion below will need updating and serve as a reminder.
var cs interface{} = &computedSignal{}
type writable interface {
SetValue(any)
}
type bindable interface {
Bind() h.H
}
_, isWritable := cs.(writable)
_, isBindable := cs.(bindable)
assert.False(t, isWritable, "computedSignal must not have SetValue")
assert.False(t, isBindable, "computedSignal must not have Bind")
}
func TestComputedInjectSignalsSkips(t *testing.T) {
v := New()
var ctx *Context
var cs *computedSignal
v.Page("/", func(c *Context) {
ctx = c
cs = c.Computed(func() string { return "fixed" })
c.View(func() h.H { return h.Div() })
})
// Simulate browser sending back a value for the computed signal — should be ignored
ctx.injectSignals(map[string]any{
cs.ID(): "injected",
})
assert.Equal(t, "fixed", cs.String())
}

View File

@@ -42,6 +42,7 @@ type Context struct {
createdAt time.Time createdAt time.Time
sseConnected atomic.Bool sseConnected atomic.Bool
sseDisconnectedAt atomic.Pointer[time.Time] sseDisconnectedAt atomic.Pointer[time.Time]
lastSeenAt atomic.Pointer[time.Time]
suspended atomic.Bool suspended atomic.Bool
} }
@@ -207,6 +208,40 @@ func (c *Context) Signal(v any) *signal {
} }
// Computed creates a read-only signal whose value is derived from the given function.
// The function is called on every read (String/Int/Bool) for fresh values,
// and during sync to detect changes for browser patches.
//
// Computed signals cannot be bound to inputs or set manually.
//
// Example:
//
// full := c.Computed(func() string {
// return first.String() + " " + last.String()
// })
// c.View(func() h.H {
// return h.Span(full.Text())
// })
func (c *Context) Computed(fn func() string) *computedSignal {
sigID := genRandID()
initial := fn()
cs := &computedSignal{
id: sigID,
compute: fn,
lastVal: initial,
changed: true,
}
c.mu.Lock()
defer c.mu.Unlock()
if c.isComponent() {
c.parentPageCtx.signals.Store(sigID, cs)
} else {
c.signals.Store(sigID, cs)
}
return cs
}
func (c *Context) injectSignals(sigs map[string]any) { func (c *Context) injectSignals(sigs map[string]any) {
if sigs == nil { if sigs == nil {
c.app.logErr(c, "signal injection failed: nil signals") c.app.logErr(c, "signal injection failed: nil signals")
@@ -248,7 +283,8 @@ func (c *Context) prepareSignalsForPatch() map[string]any {
defer c.mu.RUnlock() defer c.mu.RUnlock()
updatedSigs := make(map[string]any) updatedSigs := make(map[string]any)
c.signals.Range(func(sigID, value any) bool { c.signals.Range(func(sigID, value any) bool {
if sig, ok := value.(*signal); ok { switch sig := value.(type) {
case *signal:
if sig.err != nil { if sig.err != nil {
c.app.logWarn(c, "signal '%s' is out of sync: %v", sig.id, sig.err) c.app.logWarn(c, "signal '%s' is out of sync: %v", sig.id, sig.err)
return true return true
@@ -256,6 +292,12 @@ func (c *Context) prepareSignalsForPatch() map[string]any {
if sig.changed { if sig.changed {
updatedSigs[sigID.(string)] = fmt.Sprintf("%v", sig.val) updatedSigs[sigID.(string)] = fmt.Sprintf("%v", sig.val)
} }
case *computedSignal:
sig.recompute()
if sig.changed {
updatedSigs[sigID.(string)] = sig.patchValue()
sig.changed = false
}
} }
return true return true
}) })

View File

@@ -18,6 +18,12 @@ func ProfilePage(c *via.Context) {
via.MaxLen(20, "Must be at most 20 characters"), via.MaxLen(20, "Must be at most 20 characters"),
) )
selectedEmoji := c.Signal(existingEmoji) selectedEmoji := c.Signal(existingEmoji)
previewName := c.Computed(func() string {
if name := nameField.String(); name != "" {
return name
}
return "Your Name"
})
saveToSession := func() bool { saveToSession := func() bool {
if !c.ValidateAll() { if !c.ValidateAll() {
@@ -68,18 +74,13 @@ func ProfilePage(c *via.Context) {
h.Button(h.Text("Start Chatting"), saveAndChat.OnClick()), h.Button(h.Text("Start Chatting"), saveAndChat.OnClick()),
) )
previewName := nameField.String()
if previewName == "" {
previewName = "Your Name"
}
return h.Div(h.Class("profile-page"), return h.Div(h.Class("profile-page"),
h.H2(h.Text("Your Profile"), h.DataViewTransition("page-title")), h.H2(h.Text("Your Profile"), h.DataViewTransition("page-title")),
// Live preview // Live preview
h.Div(h.Class("profile-preview"), h.Div(h.Class("profile-preview"),
h.Div(h.Class("avatar avatar-lg"), h.Text(selectedEmoji.String())), h.Div(h.Class("avatar avatar-lg"), h.Text(selectedEmoji.String())),
h.Span(h.Class("preview-name"), h.Text(previewName)), h.Span(h.Class("preview-name"), previewName.Text()),
), ),
h.Div(h.Class("profile-form"), h.Div(h.Class("profile-form"),

View File

@@ -76,6 +76,12 @@ func main() {
titleSignal := c.Signal("") titleSignal := c.Signal("")
urlSignal := c.Signal("") urlSignal := c.Signal("")
targetIDSignal := c.Signal("") targetIDSignal := c.Signal("")
saveLabel := c.Computed(func() string {
if targetIDSignal.String() != "" {
return "Update Bookmark"
}
return "Add Bookmark"
})
via.Subscribe(c, "bookmarks.events", func(evt CRUDEvent) { via.Subscribe(c, "bookmarks.events", func(evt CRUDEvent) {
if evt.UserID == userID { if evt.UserID == userID {
@@ -205,11 +211,6 @@ func main() {
} }
bookmarksMu.RUnlock() bookmarksMu.RUnlock()
saveLabel := "Add Bookmark"
if isEditing {
saveLabel = "Update Bookmark"
}
return h.Div(h.Class("min-h-screen bg-base-200"), return h.Div(h.Class("min-h-screen bg-base-200"),
// Navbar // Navbar
h.Div(h.Class("navbar bg-base-100 shadow-sm"), h.Div(h.Class("navbar bg-base-100 shadow-sm"),
@@ -225,7 +226,7 @@ func main() {
// Form card // Form card
h.Div(h.Class("card bg-base-100 shadow"), h.Div(h.Class("card bg-base-100 shadow"),
h.Div(h.Class("card-body"), h.Div(h.Class("card-body"),
h.H2(h.Class("card-title"), h.Text(saveLabel)), h.H2(h.Class("card-title"), saveLabel.Text()),
h.Div(h.Class("flex flex-col gap-2"), h.Div(h.Class("flex flex-col gap-2"),
h.Input(h.Class("input input-bordered w-full"), h.Type("text"), h.Placeholder("Title"), titleSignal.Bind()), h.Input(h.Class("input input-bordered w-full"), h.Type("text"), h.Placeholder("Title"), titleSignal.Bind()),
h.Input(h.Class("input input-bordered w-full"), h.Type("text"), h.Placeholder("https://example.com"), urlSignal.Bind()), h.Input(h.Class("input input-bordered w-full"), h.Type("text"), h.Placeholder("https://example.com"), urlSignal.Bind()),
@@ -233,7 +234,7 @@ func main() {
h.If(isEditing, h.If(isEditing,
h.Button(h.Class("btn btn-ghost"), h.Text("Cancel"), cancelEdit.OnClick()), h.Button(h.Class("btn btn-ghost"), h.Text("Cancel"), cancelEdit.OnClick()),
), ),
h.Button(h.Class("btn btn-primary"), h.Text(saveLabel), save.OnClick()), h.Button(h.Class("btn btn-primary"), saveLabel.Text(), save.OnClick()),
), ),
), ),
), ),

14
nats.go
View File

@@ -56,7 +56,19 @@ func startDefaultNATS() (dn *defaultNATS, err error) {
os.RemoveAll(dataDir) os.RemoveAll(dataDir)
return nil, fmt.Errorf("start embedded nats: %w", err) return nil, fmt.Errorf("start embedded nats: %w", err)
} }
ns.WaitForServer() ready := make(chan struct{})
go func() {
ns.WaitForServer()
close(ready)
}()
select {
case <-ready:
case <-time.After(10 * time.Second):
ns.Close()
cancel()
os.RemoveAll(dataDir)
return nil, fmt.Errorf("embedded nats server did not start within 10s")
}
nc, err := ns.Client() nc, err := ns.Client()
if err != nil { if err != nil {

View File

@@ -10,9 +10,24 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
func TestPubSub_RoundTrip(t *testing.T) { // setupNATSTest creates a *V with an embedded NATS server.
// Skips the test if NATS fails to start (e.g. port conflict in CI).
func setupNATSTest(t *testing.T) *V {
t.Helper()
v := New() v := New()
defer v.Shutdown() dn, err := getSharedNATS()
if err != nil {
v.Shutdown()
t.Skipf("embedded NATS unavailable: %v", err)
}
v.defaultNATS = dn
v.pubsub = &natsRef{dn: dn}
t.Cleanup(v.Shutdown)
return v
}
func TestPubSub_RoundTrip(t *testing.T) {
v := setupNATSTest(t)
var received []byte var received []byte
done := make(chan struct{}) done := make(chan struct{})
@@ -38,8 +53,7 @@ func TestPubSub_RoundTrip(t *testing.T) {
} }
func TestPubSub_MultipleSubscribers(t *testing.T) { func TestPubSub_MultipleSubscribers(t *testing.T) {
v := New() v := setupNATSTest(t)
defer v.Shutdown()
var mu sync.Mutex var mu sync.Mutex
var results []string var results []string
@@ -84,8 +98,7 @@ func TestPubSub_MultipleSubscribers(t *testing.T) {
} }
func TestPubSub_SubscriptionCleanupOnDispose(t *testing.T) { func TestPubSub_SubscriptionCleanupOnDispose(t *testing.T) {
v := New() v := setupNATSTest(t)
defer v.Shutdown()
c := newContext("cleanup-ctx", "/", v) c := newContext("cleanup-ctx", "/", v)
c.View(func() h.H { return h.Div() }) c.View(func() h.H { return h.Div() })
@@ -100,8 +113,7 @@ func TestPubSub_SubscriptionCleanupOnDispose(t *testing.T) {
} }
func TestPubSub_ManualUnsubscribe(t *testing.T) { func TestPubSub_ManualUnsubscribe(t *testing.T) {
v := New() v := setupNATSTest(t)
defer v.Shutdown()
c := newContext("unsub-ctx", "/", v) c := newContext("unsub-ctx", "/", v)
c.View(func() h.H { return h.Div() }) c.View(func() h.H { return h.Div() })
@@ -120,8 +132,7 @@ func TestPubSub_ManualUnsubscribe(t *testing.T) {
} }
func TestPubSub_NoOpDuringPanicCheck(t *testing.T) { func TestPubSub_NoOpDuringPanicCheck(t *testing.T) {
v := New() v := setupNATSTest(t)
defer v.Shutdown()
// Panic-check context has id="" // Panic-check context has id=""
c := newContext("", "/", v) c := newContext("", "/", v)

View File

@@ -1,8 +1,9 @@
package via package via
// PubSub is an interface for publish/subscribe messaging backends. // PubSub is an interface for publish/subscribe messaging backends.
// By default, New() starts an embedded NATS server. Supply a custom // By default, Start() launches an embedded NATS server if no backend
// implementation via Config(Options{PubSub: yourBackend}) to override. // has been configured. Supply a custom implementation via
// Config(Options{PubSub: yourBackend}) to override.
type PubSub interface { type PubSub interface {
Publish(subject string, data []byte) error Publish(subject string, data []byte) error
Subscribe(subject string, handler func(data []byte)) (Subscription, error) Subscribe(subject string, handler func(data []byte)) (Subscription, error)

View File

@@ -10,8 +10,7 @@ import (
) )
func TestPublishSubscribe_RoundTrip(t *testing.T) { func TestPublishSubscribe_RoundTrip(t *testing.T) {
v := New() v := setupNATSTest(t)
defer v.Shutdown()
type event struct { type event struct {
Name string `json:"name"` Name string `json:"name"`
@@ -43,8 +42,7 @@ func TestPublishSubscribe_RoundTrip(t *testing.T) {
} }
func TestSubscribe_SkipsBadJSON(t *testing.T) { func TestSubscribe_SkipsBadJSON(t *testing.T) {
v := New() v := setupNATSTest(t)
defer v.Shutdown()
type msg struct { type msg struct {
Text string `json:"text"` Text string `json:"text"`

46
via.go
View File

@@ -331,15 +331,18 @@ func (v *V) reapOrphanedContexts(suspendAfter, ttl time.Duration) {
if c.sseConnected.Load() { if c.sseConnected.Load() {
continue continue
} }
var disconnectedFor time.Duration // Use the most recent liveness signal
if dc := c.sseDisconnectedAt.Load(); dc != nil { lastAlive := c.createdAt
disconnectedFor = now.Sub(*dc) if dc := c.sseDisconnectedAt.Load(); dc != nil && dc.After(lastAlive) {
} else { lastAlive = *dc
disconnectedFor = now.Sub(c.createdAt)
} }
if disconnectedFor > ttl { if seen := c.lastSeenAt.Load(); seen != nil && seen.After(lastAlive) {
lastAlive = *seen
}
silentFor := now.Sub(lastAlive)
if silentFor > ttl {
toReap = append(toReap, c) toReap = append(toReap, c)
} else if disconnectedFor > suspendAfter && !c.suspended.Load() { } else if silentFor > suspendAfter && !c.suspended.Load() {
toSuspend = append(toSuspend, c) toSuspend = append(toSuspend, c)
} }
} }
@@ -358,6 +361,16 @@ func (v *V) reapOrphanedContexts(suspendAfter, ttl time.Duration) {
// Start starts the Via HTTP server and blocks until a SIGINT or SIGTERM // Start starts the Via HTTP server and blocks until a SIGINT or SIGTERM
// signal is received, then performs a graceful shutdown. // signal is received, then performs a graceful shutdown.
func (v *V) Start() { func (v *V) Start() {
if v.pubsub == nil {
dn, err := getSharedNATS()
if err != nil {
v.logWarn(nil, "embedded NATS unavailable: %v", err)
} else {
v.defaultNATS = dn
v.pubsub = &natsRef{dn: dn}
}
}
handler := http.Handler(v.mux) handler := http.Handler(v.mux)
if v.sessionManager != nil { if v.sessionManager != nil {
handler = v.sessionManager.LoadAndSave(v.mux) handler = v.sessionManager.LoadAndSave(v.mux)
@@ -645,6 +658,8 @@ func New() *V {
return return
} }
c.reqCtx = r.Context() c.reqCtx = r.Context()
now := time.Now()
c.lastSeenAt.Store(&now)
sse := datastar.NewSSE(w, r, datastar.WithCompression(datastar.WithBrotli(datastar.WithBrotliLevel(5)))) sse := datastar.NewSSE(w, r, datastar.WithCompression(datastar.WithBrotli(datastar.WithBrotliLevel(5))))
@@ -678,17 +693,22 @@ func New() *V {
go c.Sync() go c.Sync()
keepalive := time.NewTicker(30 * time.Second)
defer keepalive.Stop()
for { for {
select { select {
case <-sse.Context().Done(): case <-sse.Context().Done():
v.logDebug(c, "SSE connection ended") v.logDebug(c, "SSE connection ended")
c.sseConnected.Store(false) c.sseConnected.Store(false)
now := time.Now() dcNow := time.Now()
c.sseDisconnectedAt.Store(&now) c.sseDisconnectedAt.Store(&dcNow)
return return
case <-c.ctxDisposedChan: case <-c.ctxDisposedChan:
v.logDebug(c, "context disposed, closing SSE") v.logDebug(c, "context disposed, closing SSE")
return return
case <-keepalive.C:
sse.PatchSignals([]byte("{}"))
case patch := <-c.patchChan: case patch := <-c.patchChan:
switch patch.typ { switch patch.typ {
case patchTypeElements: case patchTypeElements:
@@ -833,14 +853,6 @@ func New() *V {
v.cleanupCtx(c) v.cleanupCtx(c)
}) })
dn, err := getSharedNATS()
if err != nil {
v.logWarn(nil, "embedded NATS unavailable: %v", err)
} else {
v.defaultNATS = dn
v.pubsub = &natsRef{dn: dn}
}
return v return v
} }

View File

@@ -127,7 +127,7 @@ func TestAction(t *testing.T) {
v.mux.ServeHTTP(w, req) v.mux.ServeHTTP(w, req)
body := w.Body.String() body := w.Body.String()
assert.Contains(t, body, "data-on:click") assert.Contains(t, body, "data-on:click")
assert.Contains(t, body, "data-on:change__debounce.200ms") assert.Contains(t, body, "data-on:change")
assert.Contains(t, body, "data-on:keydown") assert.Contains(t, body, "data-on:keydown")
assert.Contains(t, body, "/_action/") assert.Contains(t, body, "/_action/")
} }
@@ -281,6 +281,112 @@ func TestOnKeyDownMap(t *testing.T) {
}) })
} }
func TestFormatDuration(t *testing.T) {
tests := []struct {
d time.Duration
want string
}{
{200 * time.Millisecond, "200ms"},
{1 * time.Second, "1000ms"},
{50 * time.Millisecond, "50ms"},
}
for _, tt := range tests {
assert.Equal(t, tt.want, formatDuration(tt.d))
}
}
func TestBuildAttrKey(t *testing.T) {
tests := []struct {
name string
event string
opts triggerOpts
want string
}{
{"bare event", "click", triggerOpts{}, "on:click"},
{"debounce only", "change", triggerOpts{debounce: 200 * time.Millisecond}, "on:change__debounce.200ms"},
{"throttle only", "scroll", triggerOpts{throttle: 100 * time.Millisecond}, "on:scroll__throttle.100ms"},
{"window only", "keydown", triggerOpts{window: true}, "on:keydown__window"},
{"debounce + window", "input", triggerOpts{debounce: 300 * time.Millisecond, window: true}, "on:input__debounce.300ms__window"},
{"throttle + window", "scroll", triggerOpts{throttle: 500 * time.Millisecond, window: true}, "on:scroll__throttle.500ms__window"},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert.Equal(t, tt.want, buildAttrKey(tt.event, &tt.opts))
})
}
}
func TestWithDebounce(t *testing.T) {
var trigger *actionTrigger
v := New()
v.Page("/", func(c *Context) {
trigger = c.Action(func() {})
c.View(func() h.H {
return h.Button(trigger.OnClick(WithDebounce(300 * time.Millisecond)))
})
})
req := httptest.NewRequest("GET", "/", nil)
w := httptest.NewRecorder()
v.mux.ServeHTTP(w, req)
body := w.Body.String()
assert.Contains(t, body, "data-on:click__debounce.300ms")
assert.Contains(t, body, "/_action/"+trigger.id)
}
func TestWithThrottle(t *testing.T) {
var trigger *actionTrigger
v := New()
v.Page("/", func(c *Context) {
trigger = c.Action(func() {})
c.View(func() h.H {
return h.Div(trigger.OnScroll(WithThrottle(100 * time.Millisecond)))
})
})
req := httptest.NewRequest("GET", "/", nil)
w := httptest.NewRecorder()
v.mux.ServeHTTP(w, req)
body := w.Body.String()
assert.Contains(t, body, "data-on:scroll__throttle.100ms")
assert.Contains(t, body, "/_action/"+trigger.id)
}
func TestWithDebounceOnChange(t *testing.T) {
var trigger *actionTrigger
v := New()
v.Page("/", func(c *Context) {
trigger = c.Action(func() {})
c.View(func() h.H {
return h.Input(trigger.OnChange(WithDebounce(200 * time.Millisecond)))
})
})
req := httptest.NewRequest("GET", "/", nil)
w := httptest.NewRecorder()
v.mux.ServeHTTP(w, req)
body := w.Body.String()
assert.Contains(t, body, "data-on:change__debounce.200ms")
assert.Contains(t, body, "/_action/"+trigger.id)
}
func TestDebounceWithWindow(t *testing.T) {
var trigger *actionTrigger
v := New()
v.Page("/", func(c *Context) {
trigger = c.Action(func() {})
c.View(func() h.H {
return h.Div(trigger.OnKeyDown("Enter", WithDebounce(150*time.Millisecond), WithWindow()))
})
})
req := httptest.NewRequest("GET", "/", nil)
w := httptest.NewRecorder()
v.mux.ServeHTTP(w, req)
body := w.Body.String()
assert.Contains(t, body, "data-on:keydown__debounce.150ms__window")
}
func TestConfig(t *testing.T) { func TestConfig(t *testing.T) {
v := New() v := New()
v.Config(Options{DocumentTitle: "Test"}) v.Config(Options{DocumentTitle: "Test"})
@@ -312,6 +418,7 @@ func TestReaperCleansOrphanedContexts(t *testing.T) {
func TestReaperSuspendsContext(t *testing.T) { func TestReaperSuspendsContext(t *testing.T) {
v := New() v := New()
c := newContext("suspend-1", "/", v) c := newContext("suspend-1", "/", v)
c.createdAt = time.Now().Add(-30 * time.Minute)
dc := time.Now().Add(-20 * time.Minute) dc := time.Now().Add(-20 * time.Minute)
c.sseDisconnectedAt.Store(&dc) c.sseDisconnectedAt.Store(&dc)
v.registerCtx(c) v.registerCtx(c)
@@ -326,6 +433,7 @@ func TestReaperSuspendsContext(t *testing.T) {
func TestReaperReapsAfterTTL(t *testing.T) { func TestReaperReapsAfterTTL(t *testing.T) {
v := New() v := New()
c := newContext("reap-1", "/", v) c := newContext("reap-1", "/", v)
c.createdAt = time.Now().Add(-3 * time.Hour)
dc := time.Now().Add(-2 * time.Hour) dc := time.Now().Add(-2 * time.Hour)
c.sseDisconnectedAt.Store(&dc) c.sseDisconnectedAt.Store(&dc)
c.suspended.Store(true) c.suspended.Store(true)
@@ -340,6 +448,7 @@ func TestReaperReapsAfterTTL(t *testing.T) {
func TestReaperIgnoresAlreadySuspended(t *testing.T) { func TestReaperIgnoresAlreadySuspended(t *testing.T) {
v := New() v := New()
c := newContext("already-sus-1", "/", v) c := newContext("already-sus-1", "/", v)
c.createdAt = time.Now().Add(-30 * time.Minute)
dc := time.Now().Add(-20 * time.Minute) dc := time.Now().Add(-20 * time.Minute)
c.sseDisconnectedAt.Store(&dc) c.sseDisconnectedAt.Store(&dc)
c.suspended.Store(true) c.suspended.Store(true)
@@ -394,6 +503,74 @@ func TestCleanupCtxIdempotent(t *testing.T) {
assert.Error(t, err, "context should be removed after cleanup") assert.Error(t, err, "context should be removed after cleanup")
} }
func TestReaperRespectsLastSeenAt(t *testing.T) {
v := New()
c := newContext("seen-1", "/", v)
c.createdAt = time.Now().Add(-30 * time.Minute)
// Disconnected 20 min ago, but client retried (lastSeenAt) 2 min ago
dc := time.Now().Add(-20 * time.Minute)
c.sseDisconnectedAt.Store(&dc)
seen := time.Now().Add(-2 * time.Minute)
c.lastSeenAt.Store(&seen)
v.registerCtx(c)
v.reapOrphanedContexts(15*time.Minute, time.Hour)
_, err := v.getCtx("seen-1")
assert.NoError(t, err, "context with recent lastSeenAt should survive suspend threshold")
assert.False(t, c.suspended.Load(), "context should not be suspended")
}
func TestReaperFallsBackWithoutLastSeenAt(t *testing.T) {
v := New()
c := newContext("noseen-1", "/", v)
c.createdAt = time.Now().Add(-30 * time.Minute)
dc := time.Now().Add(-20 * time.Minute)
c.sseDisconnectedAt.Store(&dc)
// no lastSeenAt set — should fall back to sseDisconnectedAt
v.registerCtx(c)
v.reapOrphanedContexts(15*time.Minute, time.Hour)
got, err := v.getCtx("noseen-1")
assert.NoError(t, err, "context should still be in registry (suspended, not reaped)")
assert.True(t, got.suspended.Load(), "context should be suspended using sseDisconnectedAt fallback")
}
func TestReaperReapsWithStaleLastSeenAt(t *testing.T) {
v := New()
c := newContext("stale-seen-1", "/", v)
c.createdAt = time.Now().Add(-3 * time.Hour)
dc := time.Now().Add(-2 * time.Hour)
c.sseDisconnectedAt.Store(&dc)
// lastSeenAt is also old — beyond TTL
seen := time.Now().Add(-90 * time.Minute)
c.lastSeenAt.Store(&seen)
c.suspended.Store(true)
v.registerCtx(c)
v.reapOrphanedContexts(15*time.Minute, time.Hour)
_, err := v.getCtx("stale-seen-1")
assert.Error(t, err, "context with stale lastSeenAt beyond TTL should be reaped")
}
func TestLastSeenAtUpdatedOnSSEConnect(t *testing.T) {
v := New()
c := newContext("seen-sse-1", "/", v)
v.registerCtx(c)
assert.Nil(t, c.lastSeenAt.Load(), "lastSeenAt should be nil before SSE connect")
// Simulate what the SSE handler does after getCtx
now := time.Now()
c.lastSeenAt.Store(&now)
got := c.lastSeenAt.Load()
assert.NotNil(t, got, "lastSeenAt should be set after SSE connect")
assert.WithinDuration(t, now, *got, time.Second)
}
func TestDevModeRemovePersistedFix(t *testing.T) { func TestDevModeRemovePersistedFix(t *testing.T) {
v := New() v := New()
v.cfg.DevMode = true v.cfg.DevMode = true