6 Commits

Author SHA1 Message Date
Ryan Hamamura
742212fd20 feat: add maplibre subpackage for type-safe MapLibre GL JS maps
Some checks failed
CI / Build and Test (push) Has been cancelled
Provides a Go API for interactive maps within Via applications:
- Plugin serves vendored MapLibre GL JS v4.7.1 assets
- Map struct with pre/post-render source, layer, marker, popup management
- Viewport signal sync (center, zoom, bearing, pitch) via hidden inputs
- FlyTo, SetCenter, SetZoom and other viewport setters via ExecScript
- Idempotent init script with SPA cleanup via MutationObserver
- Example app demonstrating markers, GeoJSON layers, and FlyTo actions
2026-02-19 13:37:16 -10:00
Ryan Hamamura
60009124c9 feat: add declarative Options.Streams for automatic JetStream stream creation
Some checks failed
CI / Build and Test (push) Has been cancelled
Streams listed in Options.Streams are created by Start() when the
embedded NATS server initializes, replacing manual EnsureStream calls
during setup. Migrates nats-chatroom and pubsub-crud examples.
2026-02-19 12:24:44 -10:00
Ryan Hamamura
42b21348cb fix: use random port for embedded NATS to avoid binding conflicts
Port 0 is treated as default (4222) by NATS server, causing hangs when
that port is unavailable. Port -1 (RANDOM_PORT) binds to an OS-assigned
free port, which is correct for an embedded server.
2026-02-19 12:24:37 -10:00
Ryan Hamamura
58ad9a2699 feat: add SSE keepalive and liveness tracking for resilient connections
Some checks failed
CI / Build and Test (push) Has been cancelled
Add 30s keepalive pings to prevent proxy/CDN idle timeouts from killing
SSE connections silently. Track lastSeenAt on each SSE connect attempt
so Datastar's retry signals keep contexts alive through the reaper.
2026-02-19 12:07:25 -10:00
Ryan Hamamura
f3a9c8036f refactor: use computed signals in pubsub-crud and chatroom examples
Some checks failed
CI / Build and Test (push) Has been cancelled
2026-02-19 09:03:13 -10:00
Ryan Hamamura
6763e1a420 feat: add computed signals for derived reactive values
All checks were successful
CI / Build and Test (push) Successful in 33s
Read-only signals whose value is a function of other signals,
recomputed automatically at sync time. Supports String, Int, Bool,
and Text methods. Components store computed signals on the parent
page context like regular signals.
2026-02-18 09:22:40 -10:00
18 changed files with 1362 additions and 97 deletions

55
computed.go Normal file
View File

@@ -0,0 +1,55 @@
package via
import (
"fmt"
"strconv"
"strings"
"github.com/ryanhamamura/via/h"
)
// computedSignal is a read-only signal whose value is derived from other signals.
// It recomputes on every read and is included in patches only when the value changes.
type computedSignal struct {
id string
compute func() string
lastVal string
changed bool
}
func (s *computedSignal) ID() string {
return s.id
}
func (s *computedSignal) String() string {
return s.compute()
}
func (s *computedSignal) Int() int {
if n, err := strconv.Atoi(s.String()); err == nil {
return n
}
return 0
}
func (s *computedSignal) Bool() bool {
val := strings.ToLower(s.String())
return val == "true" || val == "1" || val == "yes" || val == "on"
}
func (s *computedSignal) Text() h.H {
return h.Span(h.Data("text", "$"+s.id))
}
// recompute calls the compute function and sets changed if the value differs from lastVal.
func (s *computedSignal) recompute() {
val := s.compute()
if val != s.lastVal {
s.lastVal = val
s.changed = true
}
}
func (s *computedSignal) patchValue() string {
return fmt.Sprintf("%v", s.lastVal)
}

190
computed_test.go Normal file
View File

@@ -0,0 +1,190 @@
package via
import (
"bytes"
"fmt"
"testing"
"github.com/ryanhamamura/via/h"
"github.com/stretchr/testify/assert"
)
func TestComputedBasic(t *testing.T) {
v := New()
var cs *computedSignal
v.Page("/", func(c *Context) {
sig1 := c.Signal("hello")
sig2 := c.Signal("world")
cs = c.Computed(func() string {
return sig1.String() + " " + sig2.String()
})
c.View(func() h.H { return h.Div() })
})
assert.Equal(t, "hello world", cs.String())
}
func TestComputedReactivity(t *testing.T) {
v := New()
var cs *computedSignal
var sig1 *signal
v.Page("/", func(c *Context) {
sig1 = c.Signal("a")
sig2 := c.Signal("b")
cs = c.Computed(func() string {
return sig1.String() + sig2.String()
})
c.View(func() h.H { return h.Div() })
})
assert.Equal(t, "ab", cs.String())
sig1.SetValue("x")
assert.Equal(t, "xb", cs.String())
}
func TestComputedInt(t *testing.T) {
v := New()
var cs *computedSignal
v.Page("/", func(c *Context) {
sig := c.Signal(21)
cs = c.Computed(func() string {
return fmt.Sprintf("%d", sig.Int()*2)
})
c.View(func() h.H { return h.Div() })
})
assert.Equal(t, 42, cs.Int())
}
func TestComputedBool(t *testing.T) {
v := New()
var cs *computedSignal
v.Page("/", func(c *Context) {
sig := c.Signal("true")
cs = c.Computed(func() string {
return sig.String()
})
c.View(func() h.H { return h.Div() })
})
assert.True(t, cs.Bool())
}
func TestComputedText(t *testing.T) {
v := New()
var cs *computedSignal
v.Page("/", func(c *Context) {
cs = c.Computed(func() string { return "hi" })
c.View(func() h.H { return h.Div() })
})
var buf bytes.Buffer
err := cs.Text().Render(&buf)
assert.NoError(t, err)
assert.Contains(t, buf.String(), `data-text="$`+cs.ID()+`"`)
}
func TestComputedChangeDetection(t *testing.T) {
v := New()
var ctx *Context
var sig *signal
v.Page("/", func(c *Context) {
ctx = c
sig = c.Signal("a")
c.Computed(func() string {
return sig.String() + "!"
})
c.View(func() h.H { return h.Div() })
})
// First patch includes computed (changed=true from init)
patch1 := ctx.prepareSignalsForPatch()
assert.NotEmpty(t, patch1)
// Second patch: nothing changed, computed should not be included
patch2 := ctx.prepareSignalsForPatch()
// Regular signal still has changed=true (not reset in prepareSignalsForPatch),
// but computed should not appear since its value didn't change.
hasComputed := false
ctx.signals.Range(func(_, value any) bool {
if cs, ok := value.(*computedSignal); ok {
_, inPatch := patch2[cs.ID()]
hasComputed = inPatch
}
return true
})
assert.False(t, hasComputed)
// After changing dependency, computed should reappear
sig.SetValue("b")
patch3 := ctx.prepareSignalsForPatch()
found := false
ctx.signals.Range(func(_, value any) bool {
if cs, ok := value.(*computedSignal); ok {
if v, ok := patch3[cs.ID()]; ok {
assert.Equal(t, "b!", v)
found = true
}
}
return true
})
assert.True(t, found)
}
func TestComputedInComponent(t *testing.T) {
v := New()
var cs *computedSignal
var parentCtx *Context
v.Page("/", func(c *Context) {
parentCtx = c
c.Component(func(comp *Context) {
sig := comp.Signal("via")
cs = comp.Computed(func() string {
return "hello " + sig.String()
})
comp.View(func() h.H { return h.Div() })
})
c.View(func() h.H { return h.Div() })
})
assert.Equal(t, "hello via", cs.String())
// Verify it's stored on the parent page context
found := false
parentCtx.signals.Range(func(_, value any) bool {
if stored, ok := value.(*computedSignal); ok && stored.ID() == cs.ID() {
found = true
}
return true
})
assert.True(t, found)
}
func TestComputedIsReadOnly(t *testing.T) {
// Compile-time guarantee: *computedSignal has no Bind() or SetValue() methods.
// This test exists as documentation — if someone adds those methods, the
// interface assertion below will need updating and serve as a reminder.
var cs interface{} = &computedSignal{}
type writable interface {
SetValue(any)
}
type bindable interface {
Bind() h.H
}
_, isWritable := cs.(writable)
_, isBindable := cs.(bindable)
assert.False(t, isWritable, "computedSignal must not have SetValue")
assert.False(t, isBindable, "computedSignal must not have Bind")
}
func TestComputedInjectSignalsSkips(t *testing.T) {
v := New()
var ctx *Context
var cs *computedSignal
v.Page("/", func(c *Context) {
ctx = c
cs = c.Computed(func() string { return "fixed" })
c.View(func() h.H { return h.Div() })
})
// Simulate browser sending back a value for the computed signal — should be ignored
ctx.injectSignals(map[string]any{
cs.ID(): "injected",
})
assert.Equal(t, "fixed", cs.String())
}

View File

@@ -53,10 +53,15 @@ type Options struct {
// Defaults to "/_datastar.js" if empty.
DatastarPath string
// PubSub enables publish/subscribe messaging. Use vianats.New() for an
// embedded NATS backend, or supply any PubSub implementation.
// PubSub enables publish/subscribe messaging. When nil, an embedded NATS
// server starts automatically in Start(). Supply any PubSub implementation
// to replace it.
PubSub PubSub
// Streams declares JetStream streams to create when Start() initializes
// the embedded NATS server. Ignored when a custom PubSub is configured.
Streams []StreamConfig
// ContextSuspendAfter is the time a context may be disconnected before
// the reaper suspends it (frees page resources but keeps the context
// shell for seamless re-init on reconnect). Default: 15m.

View File

@@ -42,6 +42,7 @@ type Context struct {
createdAt time.Time
sseConnected atomic.Bool
sseDisconnectedAt atomic.Pointer[time.Time]
lastSeenAt atomic.Pointer[time.Time]
suspended atomic.Bool
}
@@ -207,6 +208,40 @@ func (c *Context) Signal(v any) *signal {
}
// Computed creates a read-only signal whose value is derived from the given function.
// The function is called on every read (String/Int/Bool) for fresh values,
// and during sync to detect changes for browser patches.
//
// Computed signals cannot be bound to inputs or set manually.
//
// Example:
//
// full := c.Computed(func() string {
// return first.String() + " " + last.String()
// })
// c.View(func() h.H {
// return h.Span(full.Text())
// })
func (c *Context) Computed(fn func() string) *computedSignal {
sigID := genRandID()
initial := fn()
cs := &computedSignal{
id: sigID,
compute: fn,
lastVal: initial,
changed: true,
}
c.mu.Lock()
defer c.mu.Unlock()
if c.isComponent() {
c.parentPageCtx.signals.Store(sigID, cs)
} else {
c.signals.Store(sigID, cs)
}
return cs
}
func (c *Context) injectSignals(sigs map[string]any) {
if sigs == nil {
c.app.logErr(c, "signal injection failed: nil signals")
@@ -248,7 +283,8 @@ func (c *Context) prepareSignalsForPatch() map[string]any {
defer c.mu.RUnlock()
updatedSigs := make(map[string]any)
c.signals.Range(func(sigID, value any) bool {
if sig, ok := value.(*signal); ok {
switch sig := value.(type) {
case *signal:
if sig.err != nil {
c.app.logWarn(c, "signal '%s' is out of sync: %v", sig.id, sig.err)
return true
@@ -256,6 +292,12 @@ func (c *Context) prepareSignalsForPatch() map[string]any {
if sig.changed {
updatedSigs[sigID.(string)] = fmt.Sprintf("%v", sig.val)
}
case *computedSignal:
sig.recompute()
if sig.changed {
updatedSigs[sigID.(string)] = sig.patchValue()
sig.changed = false
}
}
return true
})

View File

@@ -4,7 +4,7 @@ Infrastructure for multi-user real-time communication and persistent state.
## PubSub
Via includes an embedded NATS server that starts automatically with `via.New()`. No external services required — pub/sub works out of the box.
Via includes an embedded NATS server that starts automatically with `v.Start()`. No external services required — pub/sub works out of the box.
### Interface
@@ -73,14 +73,18 @@ This disables the embedded NATS server. The `NATSConn()` and `JetStream()` acces
NATS JetStream provides persistent, replayable message streams. Useful for chat history, event logs, or any scenario where new subscribers need to catch up on past messages.
### Ensure a stream exists
### Declaring streams
The recommended approach is to declare streams in `Options.Streams`. They are created automatically when `v.Start()` initializes the embedded NATS server:
```go
err := via.EnsureStream(v, via.StreamConfig{
Name: "CHAT",
Subjects: []string{"chat.>"},
MaxMsgs: 1000,
MaxAge: 24 * time.Hour,
v.Config(via.Options{
Streams: []via.StreamConfig{{
Name: "CHAT",
Subjects: []string{"chat.>"},
MaxMsgs: 1000,
MaxAge: 24 * time.Hour,
}},
})
```
@@ -91,7 +95,16 @@ err := via.EnsureStream(v, via.StreamConfig{
| `MaxMsgs` | Maximum number of messages to retain |
| `MaxAge` | Maximum age before messages are discarded |
Call `EnsureStream` during app initialization, before `v.Start()`.
For dynamic stream creation after startup, `EnsureStream` is also available:
```go
err := via.EnsureStream(v, via.StreamConfig{
Name: "EVENTS",
Subjects: []string{"events.>"},
MaxMsgs: 500,
MaxAge: 12 * time.Hour,
})
```
### Replay history

View File

@@ -0,0 +1,111 @@
package main
import (
"fmt"
"github.com/ryanhamamura/via"
"github.com/ryanhamamura/via/h"
"github.com/ryanhamamura/via/maplibre"
)
func main() {
v := via.New()
v.Config(via.Options{
DocumentTitle: "MapLibre GL Example",
ServerAddress: ":7331",
DevMode: true,
Plugins: []via.Plugin{maplibre.Plugin},
})
v.Page("/", func(c *via.Context) {
m := maplibre.New(c, maplibre.Options{
Style: "https://demotiles.maplibre.org/style.json",
Center: maplibre.LngLat{Lng: -122.4194, Lat: 37.7749},
Zoom: 10,
Height: "500px",
})
// Markers with popups
m.AddMarker("sf", maplibre.Marker{
LngLat: maplibre.LngLat{Lng: -122.4194, Lat: 37.7749},
Color: "#e74c3c",
Popup: &maplibre.Popup{
Content: "<strong>San Francisco</strong><p>The Golden City</p>",
},
})
m.AddMarker("oak", maplibre.Marker{
LngLat: maplibre.LngLat{Lng: -122.2711, Lat: 37.8044},
Color: "#2ecc71",
Popup: &maplibre.Popup{
Content: "<strong>Oakland</strong>",
},
})
// GeoJSON polygon source + fill layer
m.AddSource("park", maplibre.GeoJSONSource{
Data: map[string]any{
"type": "Feature",
"geometry": map[string]any{
"type": "Polygon",
"coordinates": []any{[]any{
[]float64{-122.4547, 37.7654},
[]float64{-122.4547, 37.7754},
[]float64{-122.4387, 37.7754},
[]float64{-122.4387, 37.7654},
[]float64{-122.4547, 37.7654},
}},
},
"properties": map[string]any{
"name": "Golden Gate Park",
},
},
})
m.AddLayer(maplibre.Layer{
ID: "park-fill",
Type: "fill",
Source: "park",
Paint: map[string]any{
"fill-color": "#2ecc71",
"fill-opacity": 0.3,
},
})
// Viewport info signal (updated on action)
viewportInfo := c.Signal("")
// FlyTo action
flyToSF := c.Action(func() {
m.FlyTo(maplibre.LngLat{Lng: -122.4194, Lat: 37.7749}, 14)
})
flyToOak := c.Action(func() {
m.FlyTo(maplibre.LngLat{Lng: -122.2711, Lat: 37.8044}, 14)
})
// Read viewport action
readViewport := c.Action(func() {
center := m.Center()
zoom := m.Zoom()
viewportInfo.SetValue(fmt.Sprintf("Center: %.4f, %.4f | Zoom: %.1f", center.Lng, center.Lat, zoom))
c.Sync()
})
c.View(func() h.H {
return h.Div(
h.Div(
h.Attr("style", "max-width:960px;margin:0 auto;padding:1rem;font-family:sans-serif"),
h.H1(h.Text("MapLibre GL Example")),
m.Element(),
h.Div(h.Attr("style", "margin-top:1rem;display:flex;gap:0.5rem"),
h.Button(h.Text("Fly to San Francisco"), flyToSF.OnClick()),
h.Button(h.Text("Fly to Oakland"), flyToOak.OnClick()),
h.Button(h.Text("Read Viewport"), readViewport.OnClick()),
),
h.P(viewportInfo.Text()),
),
)
})
})
v.Start()
}

View File

@@ -2,7 +2,7 @@
A chatroom built with Via and an **embedded NATS server**, demonstrating pub/sub messaging as an alternative to the custom `Rooms` implementation in `../chatroom`.
Uses `delaneyj/toolbelt/embeddednats` to run NATS inside the same binary - no external server required.
Via includes an embedded NATS server that starts automatically no external server required.
## Key Differences from Original Chatroom
@@ -25,21 +25,6 @@ That's it. No separate NATS server needed.
Open multiple browser tabs at http://localhost:7331 to see messages broadcast across all clients.
## How Embedded NATS Works
```go
// Start embedded NATS server (JetStream enabled by default)
ns, err := embeddednats.New(ctx,
embeddednats.WithDirectory("./data/nats"),
)
ns.WaitForServer()
// Get client connection to embedded server
nc, err := ns.Client()
```
Data is persisted to `./data/nats/` for JetStream durability.
## Architecture
```
@@ -65,14 +50,16 @@ Data is persisted to `./data/nats/` for JetStream durability.
## JetStream Durability
Messages persist to disk via JetStream:
Messages persist to disk via JetStream. Streams are declared in `Options.Streams` and created automatically when `v.Start()` initializes the embedded NATS server:
```go
js.AddStream(&nats.StreamConfig{
Name: "CHAT",
Subjects: []string{"chat.>"},
MaxMsgs: 1000, // Keep last 1000 messages
MaxAge: 24 * time.Hour,
v.Config(via.Options{
Streams: []via.StreamConfig{{
Name: "CHAT",
Subjects: []string{"chat.>"},
MaxMsgs: 1000,
MaxAge: 24 * time.Hour,
}},
})
```
@@ -87,23 +74,6 @@ Stop and restart the app - chat history survives.
- Manual join/leave channels
**This example - ~60 lines of NATS integration:**
- `embeddednats.New()` starts the server
- `nc.Subscribe(subject, handler)` for receiving
- `nc.Publish(subject, data)` for sending
- NATS handles delivery, no polling
## Next Steps
If this pattern proves useful, it could be promoted to a Via plugin:
```go
// Hypothetical future API
v.Config(via.WithEmbeddedNATS("./data/nats"))
// In page init
c.Subscribe("events.user.*", func(data []byte) {
c.Sync()
})
c.Publish("events.user.login", userData)
```
- `via.Subscribe(c, subject, handler)` for receiving
- `via.Publish(c, subject, data)` for sending
- Streams declared in `Options` — NATS handles delivery, no polling

View File

@@ -21,18 +21,14 @@ func main() {
DocumentTitle: "NATS Chat",
LogLevel: via.LogLevelInfo,
ServerAddress: ":7331",
Streams: []via.StreamConfig{{
Name: "CHAT",
Subjects: []string{"chat.>"},
MaxMsgs: 1000,
MaxAge: 24 * time.Hour,
}},
})
err := via.EnsureStream(v, via.StreamConfig{
Name: "CHAT",
Subjects: []string{"chat.>"},
MaxMsgs: 1000,
MaxAge: 24 * time.Hour,
})
if err != nil {
log.Fatalf("Failed to ensure stream: %v", err)
}
v.AppendToHead(
h.Link(h.Rel("stylesheet"), h.Href("https://cdn.jsdelivr.net/npm/@picocss/pico@2/css/pico.min.css")),
h.StyleEl(h.Raw(chatCSS)),
@@ -76,6 +72,6 @@ func main() {
protected := v.Group("", requireProfile)
protected.Page("/", ChatPage)
log.Println("Starting NATS chatroom on :7331 (embedded NATS server)")
log.Println("Starting NATS chatroom on :7331")
v.Start()
}

View File

@@ -18,6 +18,12 @@ func ProfilePage(c *via.Context) {
via.MaxLen(20, "Must be at most 20 characters"),
)
selectedEmoji := c.Signal(existingEmoji)
previewName := c.Computed(func() string {
if name := nameField.String(); name != "" {
return name
}
return "Your Name"
})
saveToSession := func() bool {
if !c.ValidateAll() {
@@ -68,18 +74,13 @@ func ProfilePage(c *via.Context) {
h.Button(h.Text("Start Chatting"), saveAndChat.OnClick()),
)
previewName := nameField.String()
if previewName == "" {
previewName = "Your Name"
}
return h.Div(h.Class("profile-page"),
h.H2(h.Text("Your Profile"), h.DataViewTransition("page-title")),
// Live preview
h.Div(h.Class("profile-preview"),
h.Div(h.Class("avatar avatar-lg"), h.Text(selectedEmoji.String())),
h.Span(h.Class("preview-name"), h.Text(previewName)),
h.Span(h.Class("preview-name"), previewName.Text()),
),
h.Div(h.Class("profile-form"),

View File

@@ -53,18 +53,14 @@ func main() {
DocumentTitle: "Bookmarks",
LogLevel: via.LogLevelInfo,
ServerAddress: ":7331",
Streams: []via.StreamConfig{{
Name: "BOOKMARKS",
Subjects: []string{"bookmarks.>"},
MaxMsgs: 1000,
MaxAge: 24 * time.Hour,
}},
})
err := via.EnsureStream(v, via.StreamConfig{
Name: "BOOKMARKS",
Subjects: []string{"bookmarks.>"},
MaxMsgs: 1000,
MaxAge: 24 * time.Hour,
})
if err != nil {
log.Fatalf("Failed to ensure stream: %v", err)
}
v.AppendToHead(
h.Link(h.Rel("stylesheet"), h.Href("https://cdn.jsdelivr.net/npm/daisyui@4/dist/full.min.css")),
h.Script(h.Src("https://cdn.tailwindcss.com")),
@@ -76,6 +72,12 @@ func main() {
titleSignal := c.Signal("")
urlSignal := c.Signal("")
targetIDSignal := c.Signal("")
saveLabel := c.Computed(func() string {
if targetIDSignal.String() != "" {
return "Update Bookmark"
}
return "Add Bookmark"
})
via.Subscribe(c, "bookmarks.events", func(evt CRUDEvent) {
if evt.UserID == userID {
@@ -205,11 +207,6 @@ func main() {
}
bookmarksMu.RUnlock()
saveLabel := "Add Bookmark"
if isEditing {
saveLabel = "Update Bookmark"
}
return h.Div(h.Class("min-h-screen bg-base-200"),
// Navbar
h.Div(h.Class("navbar bg-base-100 shadow-sm"),
@@ -225,7 +222,7 @@ func main() {
// Form card
h.Div(h.Class("card bg-base-100 shadow"),
h.Div(h.Class("card-body"),
h.H2(h.Class("card-title"), h.Text(saveLabel)),
h.H2(h.Class("card-title"), saveLabel.Text()),
h.Div(h.Class("flex flex-col gap-2"),
h.Input(h.Class("input input-bordered w-full"), h.Type("text"), h.Placeholder("Title"), titleSignal.Bind()),
h.Input(h.Class("input input-bordered w-full"), h.Type("text"), h.Placeholder("https://example.com"), urlSignal.Bind()),
@@ -233,7 +230,7 @@ func main() {
h.If(isEditing,
h.Button(h.Class("btn btn-ghost"), h.Text("Cancel"), cancelEdit.OnClick()),
),
h.Button(h.Class("btn btn-primary"), h.Text(saveLabel), save.OnClick()),
h.Button(h.Class("btn btn-primary"), saveLabel.Text(), save.OnClick()),
),
),
),

226
maplibre/js.go Normal file
View File

@@ -0,0 +1,226 @@
package maplibre
import (
"encoding/json"
"fmt"
"strings"
)
// guard wraps JS code so it only runs when the map instance exists.
// The body can reference the map as `m`.
func guard(mapID, body string) string {
return fmt.Sprintf(
`(function(){var m=window.__via_maps&&window.__via_maps[%s];if(!m)return;%s})()`,
jsonStr(mapID), body,
)
}
// jsonStr JSON-encodes a string for safe embedding in JS.
func jsonStr(s string) string {
b, _ := json.Marshal(s)
return string(b)
}
// jsonVal JSON-encodes an arbitrary value for safe embedding in JS.
func jsonVal(v any) string {
b, _ := json.Marshal(v)
return string(b)
}
// initScript generates the idempotent map initialization JS.
func initScript(m *Map) string {
var b strings.Builder
b.WriteString(fmt.Sprintf(
`(function(){if(window.__via_maps&&window.__via_maps[%[1]s])return;`,
jsonStr(m.id),
))
b.WriteString(fmt.Sprintf(
`var map=new maplibregl.Map({container:%s,style:%s,center:[%s,%s],zoom:%s`,
jsonStr("_vmap_"+m.id),
jsonStr(m.opts.Style),
formatFloat(m.opts.Center.Lng),
formatFloat(m.opts.Center.Lat),
formatFloat(m.opts.Zoom),
))
if m.opts.Bearing != 0 {
b.WriteString(fmt.Sprintf(`,bearing:%s`, formatFloat(m.opts.Bearing)))
}
if m.opts.Pitch != 0 {
b.WriteString(fmt.Sprintf(`,pitch:%s`, formatFloat(m.opts.Pitch)))
}
if m.opts.MinZoom != 0 {
b.WriteString(fmt.Sprintf(`,minZoom:%s`, formatFloat(m.opts.MinZoom)))
}
if m.opts.MaxZoom != 0 {
b.WriteString(fmt.Sprintf(`,maxZoom:%s`, formatFloat(m.opts.MaxZoom)))
}
b.WriteString(`});`)
b.WriteString(`if(!window.__via_maps)window.__via_maps={};`)
b.WriteString(fmt.Sprintf(`window.__via_maps[%s]=map;`, jsonStr(m.id)))
b.WriteString(`map._via_markers={};map._via_popups={};`)
// Pre-render sources, layers, markers, popups run on 'load'
if len(m.sources) > 0 || len(m.layers) > 0 || len(m.markers) > 0 || len(m.popups) > 0 {
b.WriteString(`map.on('load',function(){`)
for _, src := range m.sources {
b.WriteString(fmt.Sprintf(`map.addSource(%s,%s);`, jsonStr(src.id), src.js))
}
for _, layer := range m.layers {
if layer.Before != "" {
b.WriteString(fmt.Sprintf(`map.addLayer(%s,%s);`, layer.toJS(), jsonStr(layer.Before)))
} else {
b.WriteString(fmt.Sprintf(`map.addLayer(%s);`, layer.toJS()))
}
}
for _, me := range m.markers {
b.WriteString(markerBodyJS(me.id, me.marker))
}
for _, pe := range m.popups {
b.WriteString(popupBodyJS(pe.id, pe.popup))
}
b.WriteString(`});`)
}
// Sync viewport signals on moveend via hidden inputs
b.WriteString(fmt.Sprintf(`map.on('moveend',function(){`+
`var c=map.getCenter();`+
`var el=document.getElementById(%[1]s);if(!el)return;`+
`var inputs=el.querySelectorAll('input[data-bind]');`+
`inputs.forEach(function(inp){`+
`var sig=inp.getAttribute('data-bind');`+
`if(sig===%[2]s)inp.value=c.lng;`+
`else if(sig===%[3]s)inp.value=c.lat;`+
`else if(sig===%[4]s)inp.value=map.getZoom();`+
`else if(sig===%[5]s)inp.value=map.getBearing();`+
`else if(sig===%[6]s)inp.value=map.getPitch();`+
`inp.dispatchEvent(new Event('input',{bubbles:true}));`+
`});`+
`});`,
jsonStr("_vwrap_"+m.id),
jsonStr(m.centerLng.ID()),
jsonStr(m.centerLat.ID()),
jsonStr(m.zoom.ID()),
jsonStr(m.bearing.ID()),
jsonStr(m.pitch.ID()),
))
// ResizeObserver for auto-resize
b.WriteString(fmt.Sprintf(
`var ro=new ResizeObserver(function(){map.resize();});`+
`ro.observe(document.getElementById(%s));`,
jsonStr("_vmap_"+m.id),
))
// MutationObserver to clean up on DOM removal (SPA nav)
b.WriteString(fmt.Sprintf(
`var container=document.getElementById(%[1]s);`+
`if(container){var mo=new MutationObserver(function(){`+
`if(!document.contains(container)){`+
`mo.disconnect();ro.disconnect();map.remove();`+
`delete window.__via_maps[%[2]s];`+
`}});`+
`mo.observe(document.body,{childList:true,subtree:true});}`,
jsonStr("_vmap_"+m.id),
jsonStr(m.id),
))
b.WriteString(`})()`)
return b.String()
}
// markerBodyJS generates JS to add a marker, assuming `map` is in scope.
// Used inside the init script's load callback.
func markerBodyJS(markerID string, mk Marker) string {
var b strings.Builder
opts := "{"
if mk.Color != "" {
opts += fmt.Sprintf(`color:%s,`, jsonStr(mk.Color))
}
if mk.Draggable {
opts += `draggable:true,`
}
opts += "}"
b.WriteString(fmt.Sprintf(`var mk=new maplibregl.Marker(%s).setLngLat([%s,%s]);`,
opts, formatFloat(mk.LngLat.Lng), formatFloat(mk.LngLat.Lat)))
if mk.Popup != nil {
b.WriteString(popupConstructorJS(*mk.Popup, "pk"))
b.WriteString(`mk.setPopup(pk);`)
}
b.WriteString(fmt.Sprintf(`mk.addTo(map);map._via_markers[%s]=mk;`, jsonStr(markerID)))
return b.String()
}
// addMarkerJS generates a self-contained IIFE to add a marker post-render.
func addMarkerJS(mapID, markerID string, mk Marker) string {
var b strings.Builder
b.WriteString(fmt.Sprintf(
`(function(){var map=window.__via_maps&&window.__via_maps[%s];if(!map)return;`,
jsonStr(mapID)))
// Remove existing marker with same ID
b.WriteString(fmt.Sprintf(
`if(map._via_markers[%[1]s]){map._via_markers[%[1]s].remove();delete map._via_markers[%[1]s];}`,
jsonStr(markerID)))
b.WriteString(markerBodyJS(markerID, mk))
b.WriteString(`})()`)
return b.String()
}
// removeMarkerJS generates JS to remove a marker. Expects `m` in scope (used inside guard).
func removeMarkerJS(markerID string) string {
return fmt.Sprintf(
`if(m._via_markers[%[1]s]){m._via_markers[%[1]s].remove();delete m._via_markers[%[1]s];}`,
jsonStr(markerID))
}
// popupBodyJS generates JS to show a popup, assuming `map` is in scope.
func popupBodyJS(popupID string, p Popup) string {
var b strings.Builder
b.WriteString(popupConstructorJS(p, "p"))
b.WriteString(fmt.Sprintf(
`p.setLngLat([%s,%s]).addTo(map);map._via_popups[%s]=p;`,
formatFloat(p.LngLat.Lng), formatFloat(p.LngLat.Lat), jsonStr(popupID)))
return b.String()
}
// showPopupJS generates a self-contained IIFE to show a popup post-render.
func showPopupJS(mapID, popupID string, p Popup) string {
var b strings.Builder
b.WriteString(fmt.Sprintf(
`(function(){var map=window.__via_maps&&window.__via_maps[%s];if(!map)return;`,
jsonStr(mapID)))
// Close existing popup with same ID
b.WriteString(fmt.Sprintf(
`if(map._via_popups[%[1]s]){map._via_popups[%[1]s].remove();delete map._via_popups[%[1]s];}`,
jsonStr(popupID)))
b.WriteString(popupBodyJS(popupID, p))
b.WriteString(`})()`)
return b.String()
}
// closePopupJS generates JS to close a popup. Expects `m` in scope (used inside guard).
func closePopupJS(popupID string) string {
return fmt.Sprintf(
`if(m._via_popups[%[1]s]){m._via_popups[%[1]s].remove();delete m._via_popups[%[1]s];}`,
jsonStr(popupID))
}
// popupConstructorJS generates JS to create a Popup object stored in varName.
func popupConstructorJS(p Popup, varName string) string {
opts := "{"
if p.HideCloseButton {
opts += `closeButton:false,`
}
if p.MaxWidth != "" {
opts += fmt.Sprintf(`maxWidth:%s,`, jsonStr(p.MaxWidth))
}
opts += "}"
return fmt.Sprintf(`var %s=new maplibregl.Popup(%s).setHTML(%s);`,
varName, opts, jsonStr(p.Content))
}
func formatFloat(f float64) string {
return fmt.Sprintf("%g", f)
}

1
maplibre/maplibre-gl.css Normal file

File diff suppressed because one or more lines are too long

59
maplibre/maplibre-gl.js Normal file

File diff suppressed because one or more lines are too long

326
maplibre/maplibre.go Normal file
View File

@@ -0,0 +1,326 @@
// Package maplibre provides a Go API for MapLibre GL JS maps within Via applications.
//
// It follows the same ExecScript + DataIgnoreMorph pattern used for other client-side
// JS library integrations (e.g. ECharts in the realtimechart example).
package maplibre
import (
"crypto/rand"
_ "embed"
"encoding/hex"
"fmt"
"net/http"
"strconv"
"github.com/ryanhamamura/via"
"github.com/ryanhamamura/via/h"
)
//go:embed maplibre-gl.js
var maplibreJS []byte
//go:embed maplibre-gl.css
var maplibreCSS []byte
// Plugin serves the embedded MapLibre GL JS/CSS and injects them into the document head.
func Plugin(v *via.V) {
v.HTTPServeMux().HandleFunc("GET /_maplibre/maplibre-gl.js", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/javascript")
_, _ = w.Write(maplibreJS)
})
v.HTTPServeMux().HandleFunc("GET /_maplibre/maplibre-gl.css", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/css")
_, _ = w.Write(maplibreCSS)
})
v.AppendToHead(
h.Link(h.Rel("stylesheet"), h.Href("/_maplibre/maplibre-gl.css")),
h.Script(h.Src("/_maplibre/maplibre-gl.js")),
)
}
// viaSignal is the interface satisfied by via's *signal type.
type viaSignal interface {
ID() string
String() string
SetValue(any)
Bind() h.H
}
// Map represents a MapLibre GL map instance bound to a Via context.
type Map struct {
id string
ctx *via.Context
opts Options
// Viewport signals for browser → server sync
centerLng, centerLat viaSignal
zoom, bearing, pitch viaSignal
// Pre-render accumulation
sources []sourceEntry
layers []Layer
markers []markerEntry
popups []popupEntry
rendered bool
}
// New creates a Map bound to the given Via context with the provided options.
// It registers viewport signals on the context for browser → server sync.
func New(c *via.Context, opts Options) *Map {
if opts.Width == "" {
opts.Width = "100%"
}
if opts.Height == "" {
opts.Height = "400px"
}
m := &Map{
id: genID(),
ctx: c,
opts: opts,
}
m.centerLng = c.Signal(opts.Center.Lng)
m.centerLat = c.Signal(opts.Center.Lat)
m.zoom = c.Signal(opts.Zoom)
m.bearing = c.Signal(opts.Bearing)
m.pitch = c.Signal(opts.Pitch)
return m
}
// Element returns the h.H DOM tree for the map. Call this once inside your View function.
// After Element() is called, subsequent source/layer/marker/popup operations
// use ExecScript instead of accumulating for the init script.
func (m *Map) Element() h.H {
m.rendered = true
return h.Div(h.ID("_vwrap_"+m.id),
// Map container — morph-ignored so MapLibre's DOM isn't destroyed on Sync()
h.Div(
h.ID("_vmap_"+m.id),
h.DataIgnoreMorph(),
h.Attr("style", fmt.Sprintf("width:%s;height:%s", m.opts.Width, m.opts.Height)),
),
// Hidden inputs for viewport signal binding (outside morph-ignored zone)
h.Input(h.Type("hidden"), m.centerLng.Bind()),
h.Input(h.Type("hidden"), m.centerLat.Bind()),
h.Input(h.Type("hidden"), m.zoom.Bind()),
h.Input(h.Type("hidden"), m.bearing.Bind()),
h.Input(h.Type("hidden"), m.pitch.Bind()),
// Init script
h.Script(h.Raw(initScript(m))),
)
}
// --- Viewport readers (signal → Go) ---
// Center returns the current map center from synced signals.
func (m *Map) Center() LngLat {
return LngLat{
Lng: parseFloat(m.centerLng.String()),
Lat: parseFloat(m.centerLat.String()),
}
}
// Zoom returns the current map zoom level from the synced signal.
func (m *Map) Zoom() float64 {
return parseFloat(m.zoom.String())
}
// Bearing returns the current map bearing from the synced signal.
func (m *Map) Bearing() float64 {
return parseFloat(m.bearing.String())
}
// Pitch returns the current map pitch from the synced signal.
func (m *Map) Pitch() float64 {
return parseFloat(m.pitch.String())
}
// --- Viewport setters (Go → browser) ---
// FlyTo animates the map to the given center and zoom.
func (m *Map) FlyTo(center LngLat, zoom float64) {
m.exec(fmt.Sprintf(`m.flyTo({center:[%s,%s],zoom:%s});`,
formatFloat(center.Lng), formatFloat(center.Lat), formatFloat(zoom)))
}
// SetCenter sets the map center without animation.
func (m *Map) SetCenter(ll LngLat) {
m.exec(fmt.Sprintf(`m.setCenter([%s,%s]);`,
formatFloat(ll.Lng), formatFloat(ll.Lat)))
}
// SetZoom sets the map zoom level without animation.
func (m *Map) SetZoom(z float64) {
m.exec(fmt.Sprintf(`m.setZoom(%s);`, formatFloat(z)))
}
// SetBearing sets the map bearing without animation.
func (m *Map) SetBearing(b float64) {
m.exec(fmt.Sprintf(`m.setBearing(%s);`, formatFloat(b)))
}
// SetPitch sets the map pitch without animation.
func (m *Map) SetPitch(p float64) {
m.exec(fmt.Sprintf(`m.setPitch(%s);`, formatFloat(p)))
}
// SetStyle changes the map's style URL.
func (m *Map) SetStyle(url string) {
m.exec(fmt.Sprintf(`m.setStyle(%s);`, jsonStr(url)))
}
// --- Source methods ---
// AddSource adds a source to the map. src should be a GeoJSONSource,
// VectorSource, RasterSource, or any JSON-marshalable value.
func (m *Map) AddSource(id string, src any) {
js := sourceJSON(src)
if !m.rendered {
m.sources = append(m.sources, sourceEntry{id: id, js: js})
return
}
m.exec(fmt.Sprintf(`m.addSource(%s,%s);`, jsonStr(id), js))
}
// RemoveSource removes a source from the map.
// Before render, it removes a previously accumulated source. After render, it issues an ExecScript.
func (m *Map) RemoveSource(id string) {
if !m.rendered {
for i, s := range m.sources {
if s.id == id {
m.sources = append(m.sources[:i], m.sources[i+1:]...)
return
}
}
return
}
m.exec(fmt.Sprintf(`m.removeSource(%s);`, jsonStr(id)))
}
// UpdateGeoJSONSource replaces the data of an existing GeoJSON source.
func (m *Map) UpdateGeoJSONSource(sourceID string, data any) {
m.exec(fmt.Sprintf(`m.getSource(%s).setData(%s);`, jsonStr(sourceID), jsonVal(data)))
}
// --- Layer methods ---
// AddLayer adds a layer to the map.
func (m *Map) AddLayer(layer Layer) {
if !m.rendered {
m.layers = append(m.layers, layer)
return
}
before := "undefined"
if layer.Before != "" {
before = jsonStr(layer.Before)
}
m.exec(fmt.Sprintf(`m.addLayer(%s,%s);`, layer.toJS(), before))
}
// RemoveLayer removes a layer from the map.
// Before render, it removes a previously accumulated layer. After render, it issues an ExecScript.
func (m *Map) RemoveLayer(id string) {
if !m.rendered {
for i, l := range m.layers {
if l.ID == id {
m.layers = append(m.layers[:i], m.layers[i+1:]...)
return
}
}
return
}
m.exec(fmt.Sprintf(`m.removeLayer(%s);`, jsonStr(id)))
}
// SetPaintProperty sets a paint property on a layer.
func (m *Map) SetPaintProperty(layerID, name string, value any) {
m.exec(fmt.Sprintf(`m.setPaintProperty(%s,%s,%s);`,
jsonStr(layerID), jsonStr(name), jsonVal(value)))
}
// SetLayoutProperty sets a layout property on a layer.
func (m *Map) SetLayoutProperty(layerID, name string, value any) {
m.exec(fmt.Sprintf(`m.setLayoutProperty(%s,%s,%s);`,
jsonStr(layerID), jsonStr(name), jsonVal(value)))
}
// --- Marker methods ---
// AddMarker adds or replaces a marker on the map.
func (m *Map) AddMarker(id string, marker Marker) {
if !m.rendered {
m.markers = append(m.markers, markerEntry{id: id, marker: marker})
return
}
js := addMarkerJS(m.id, id, marker)
m.ctx.ExecScript(js)
}
// RemoveMarker removes a marker from the map.
// Before render, it removes a previously accumulated marker. After render, it issues an ExecScript.
func (m *Map) RemoveMarker(id string) {
if !m.rendered {
for i, me := range m.markers {
if me.id == id {
m.markers = append(m.markers[:i], m.markers[i+1:]...)
return
}
}
return
}
m.exec(removeMarkerJS(id))
}
// --- Popup methods ---
// ShowPopup shows a standalone popup on the map.
func (m *Map) ShowPopup(id string, popup Popup) {
if !m.rendered {
m.popups = append(m.popups, popupEntry{id: id, popup: popup})
return
}
js := showPopupJS(m.id, id, popup)
m.ctx.ExecScript(js)
}
// ClosePopup closes a standalone popup on the map.
// Before render, it removes a previously accumulated popup. After render, it issues an ExecScript.
func (m *Map) ClosePopup(id string) {
if !m.rendered {
for i, pe := range m.popups {
if pe.id == id {
m.popups = append(m.popups[:i], m.popups[i+1:]...)
return
}
}
return
}
m.exec(closePopupJS(id))
}
// --- Escape hatch ---
// Exec runs arbitrary JS with the map available as `m`.
func (m *Map) Exec(js string) {
m.exec(js)
}
// exec sends guarded JS to the browser via ExecScript.
func (m *Map) exec(body string) {
m.ctx.ExecScript(guard(m.id, body))
}
func parseFloat(s string) float64 {
f, _ := strconv.ParseFloat(s, 64)
return f
}
func genID() string {
b := make([]byte, 4)
rand.Read(b)
return hex.EncodeToString(b)
}

175
maplibre/types.go Normal file
View File

@@ -0,0 +1,175 @@
package maplibre
import "encoding/json"
// LngLat represents a geographic coordinate.
type LngLat struct {
Lng float64
Lat float64
}
// Options configures the initial map state.
type Options struct {
// Style is the map style URL (required).
Style string
Center LngLat
Zoom float64
Bearing float64
Pitch float64
MinZoom float64
MaxZoom float64
// CSS dimensions for the map container. Defaults: "100%", "400px".
Width string
Height string
}
// GeoJSONSource provides inline GeoJSON data to MapLibre.
// Data should be a GeoJSON-marshalable value (struct, map, or json.RawMessage).
type GeoJSONSource struct {
Data any
}
func (s GeoJSONSource) toJS() string {
data, _ := json.Marshal(s.Data)
return `{"type":"geojson","data":` + string(data) + `}`
}
// VectorSource references a vector tile source.
type VectorSource struct {
URL string
Tiles []string
}
func (s VectorSource) toJS() string {
obj := map[string]any{"type": "vector"}
if s.URL != "" {
obj["url"] = s.URL
}
if len(s.Tiles) > 0 {
obj["tiles"] = s.Tiles
}
b, _ := json.Marshal(obj)
return string(b)
}
// RasterSource references a raster tile source.
type RasterSource struct {
URL string
Tiles []string
TileSize int
}
func (s RasterSource) toJS() string {
obj := map[string]any{"type": "raster"}
if s.URL != "" {
obj["url"] = s.URL
}
if len(s.Tiles) > 0 {
obj["tiles"] = s.Tiles
}
if s.TileSize > 0 {
obj["tileSize"] = s.TileSize
}
b, _ := json.Marshal(obj)
return string(b)
}
// sourceJSON converts a source value to its JS object literal string.
func sourceJSON(src any) string {
switch s := src.(type) {
case GeoJSONSource:
return s.toJS()
case VectorSource:
return s.toJS()
case RasterSource:
return s.toJS()
default:
b, _ := json.Marshal(src)
return string(b)
}
}
// Layer describes a MapLibre style layer.
type Layer struct {
ID string
Type string
Source string
SourceLayer string
Paint map[string]any
Layout map[string]any
Filter any
MinZoom float64
MaxZoom float64
// Before inserts this layer before the given layer ID in the stack.
Before string
}
func (l Layer) toJS() string {
obj := map[string]any{
"id": l.ID,
"type": l.Type,
}
if l.Source != "" {
obj["source"] = l.Source
}
if l.SourceLayer != "" {
obj["source-layer"] = l.SourceLayer
}
if l.Paint != nil {
obj["paint"] = l.Paint
}
if l.Layout != nil {
obj["layout"] = l.Layout
}
if l.Filter != nil {
obj["filter"] = l.Filter
}
if l.MinZoom > 0 {
obj["minzoom"] = l.MinZoom
}
if l.MaxZoom > 0 {
obj["maxzoom"] = l.MaxZoom
}
b, _ := json.Marshal(obj)
return string(b)
}
// Marker describes a map marker.
type Marker struct {
LngLat LngLat
Color string
Draggable bool
Popup *Popup
}
// Popup describes a map popup.
//
// Content is rendered as HTML via MapLibre's setHTML. Do not pass untrusted
// user input without sanitizing it first.
type Popup struct {
Content string // HTML content
LngLat LngLat
HideCloseButton bool // true removes the close button (MapLibre shows it by default)
MaxWidth string
}
// sourceEntry pairs a source ID with its JS representation for pre-render accumulation.
type sourceEntry struct {
id string
js string
}
// markerEntry pairs a marker ID with its definition for pre-render accumulation.
type markerEntry struct {
id string
marker Marker
}
// popupEntry pairs a popup ID with its definition for pre-render accumulation.
type popupEntry struct {
id string
popup Popup
}

10
nats.go
View File

@@ -9,6 +9,7 @@ import (
"time"
"github.com/delaneyj/toolbelt/embeddednats"
natsserver "github.com/nats-io/nats-server/v2/server"
"github.com/nats-io/nats.go"
)
@@ -50,7 +51,14 @@ func startDefaultNATS() (dn *defaultNATS, err error) {
ctx, cancel := context.WithCancel(context.Background())
ns, err := embeddednats.New(ctx, embeddednats.WithDirectory(dataDir))
ns, err := embeddednats.New(ctx,
embeddednats.WithDirectory(dataDir),
embeddednats.WithNATSServerOptions(&natsserver.Options{
JetStream: true,
StoreDir: dataDir,
Port: -1,
}),
)
if err != nil {
cancel()
os.RemoveAll(dataDir)

37
via.go
View File

@@ -145,6 +145,9 @@ func (v *V) Config(cfg Options) {
if cfg.ContextTTL != 0 {
v.cfg.ContextTTL = cfg.ContextTTL
}
if cfg.Streams != nil {
v.cfg.Streams = cfg.Streams
}
if cfg.ActionRateLimit.Rate != 0 || cfg.ActionRateLimit.Burst != 0 {
v.actionRateLimit = cfg.ActionRateLimit
}
@@ -331,15 +334,18 @@ func (v *V) reapOrphanedContexts(suspendAfter, ttl time.Duration) {
if c.sseConnected.Load() {
continue
}
var disconnectedFor time.Duration
if dc := c.sseDisconnectedAt.Load(); dc != nil {
disconnectedFor = now.Sub(*dc)
} else {
disconnectedFor = now.Sub(c.createdAt)
// Use the most recent liveness signal
lastAlive := c.createdAt
if dc := c.sseDisconnectedAt.Load(); dc != nil && dc.After(lastAlive) {
lastAlive = *dc
}
if disconnectedFor > ttl {
if seen := c.lastSeenAt.Load(); seen != nil && seen.After(lastAlive) {
lastAlive = *seen
}
silentFor := now.Sub(lastAlive)
if silentFor > ttl {
toReap = append(toReap, c)
} else if disconnectedFor > suspendAfter && !c.suspended.Load() {
} else if silentFor > suspendAfter && !c.suspended.Load() {
toSuspend = append(toSuspend, c)
}
}
@@ -368,6 +374,12 @@ func (v *V) Start() {
}
}
for _, sc := range v.cfg.Streams {
if err := EnsureStream(v, sc); err != nil {
v.logger.Fatal().Err(err).Msgf("failed to create stream %q", sc.Name)
}
}
handler := http.Handler(v.mux)
if v.sessionManager != nil {
handler = v.sessionManager.LoadAndSave(v.mux)
@@ -655,6 +667,8 @@ func New() *V {
return
}
c.reqCtx = r.Context()
now := time.Now()
c.lastSeenAt.Store(&now)
sse := datastar.NewSSE(w, r, datastar.WithCompression(datastar.WithBrotli(datastar.WithBrotliLevel(5))))
@@ -688,17 +702,22 @@ func New() *V {
go c.Sync()
keepalive := time.NewTicker(30 * time.Second)
defer keepalive.Stop()
for {
select {
case <-sse.Context().Done():
v.logDebug(c, "SSE connection ended")
c.sseConnected.Store(false)
now := time.Now()
c.sseDisconnectedAt.Store(&now)
dcNow := time.Now()
c.sseDisconnectedAt.Store(&dcNow)
return
case <-c.ctxDisposedChan:
v.logDebug(c, "context disposed, closing SSE")
return
case <-keepalive.C:
sse.PatchSignals([]byte("{}"))
case patch := <-c.patchChan:
switch patch.typ {
case patchTypeElements:

View File

@@ -418,6 +418,7 @@ func TestReaperCleansOrphanedContexts(t *testing.T) {
func TestReaperSuspendsContext(t *testing.T) {
v := New()
c := newContext("suspend-1", "/", v)
c.createdAt = time.Now().Add(-30 * time.Minute)
dc := time.Now().Add(-20 * time.Minute)
c.sseDisconnectedAt.Store(&dc)
v.registerCtx(c)
@@ -432,6 +433,7 @@ func TestReaperSuspendsContext(t *testing.T) {
func TestReaperReapsAfterTTL(t *testing.T) {
v := New()
c := newContext("reap-1", "/", v)
c.createdAt = time.Now().Add(-3 * time.Hour)
dc := time.Now().Add(-2 * time.Hour)
c.sseDisconnectedAt.Store(&dc)
c.suspended.Store(true)
@@ -446,6 +448,7 @@ func TestReaperReapsAfterTTL(t *testing.T) {
func TestReaperIgnoresAlreadySuspended(t *testing.T) {
v := New()
c := newContext("already-sus-1", "/", v)
c.createdAt = time.Now().Add(-30 * time.Minute)
dc := time.Now().Add(-20 * time.Minute)
c.sseDisconnectedAt.Store(&dc)
c.suspended.Store(true)
@@ -500,6 +503,74 @@ func TestCleanupCtxIdempotent(t *testing.T) {
assert.Error(t, err, "context should be removed after cleanup")
}
func TestReaperRespectsLastSeenAt(t *testing.T) {
v := New()
c := newContext("seen-1", "/", v)
c.createdAt = time.Now().Add(-30 * time.Minute)
// Disconnected 20 min ago, but client retried (lastSeenAt) 2 min ago
dc := time.Now().Add(-20 * time.Minute)
c.sseDisconnectedAt.Store(&dc)
seen := time.Now().Add(-2 * time.Minute)
c.lastSeenAt.Store(&seen)
v.registerCtx(c)
v.reapOrphanedContexts(15*time.Minute, time.Hour)
_, err := v.getCtx("seen-1")
assert.NoError(t, err, "context with recent lastSeenAt should survive suspend threshold")
assert.False(t, c.suspended.Load(), "context should not be suspended")
}
func TestReaperFallsBackWithoutLastSeenAt(t *testing.T) {
v := New()
c := newContext("noseen-1", "/", v)
c.createdAt = time.Now().Add(-30 * time.Minute)
dc := time.Now().Add(-20 * time.Minute)
c.sseDisconnectedAt.Store(&dc)
// no lastSeenAt set — should fall back to sseDisconnectedAt
v.registerCtx(c)
v.reapOrphanedContexts(15*time.Minute, time.Hour)
got, err := v.getCtx("noseen-1")
assert.NoError(t, err, "context should still be in registry (suspended, not reaped)")
assert.True(t, got.suspended.Load(), "context should be suspended using sseDisconnectedAt fallback")
}
func TestReaperReapsWithStaleLastSeenAt(t *testing.T) {
v := New()
c := newContext("stale-seen-1", "/", v)
c.createdAt = time.Now().Add(-3 * time.Hour)
dc := time.Now().Add(-2 * time.Hour)
c.sseDisconnectedAt.Store(&dc)
// lastSeenAt is also old — beyond TTL
seen := time.Now().Add(-90 * time.Minute)
c.lastSeenAt.Store(&seen)
c.suspended.Store(true)
v.registerCtx(c)
v.reapOrphanedContexts(15*time.Minute, time.Hour)
_, err := v.getCtx("stale-seen-1")
assert.Error(t, err, "context with stale lastSeenAt beyond TTL should be reaped")
}
func TestLastSeenAtUpdatedOnSSEConnect(t *testing.T) {
v := New()
c := newContext("seen-sse-1", "/", v)
v.registerCtx(c)
assert.Nil(t, c.lastSeenAt.Load(), "lastSeenAt should be nil before SSE connect")
// Simulate what the SSE handler does after getCtx
now := time.Now()
c.lastSeenAt.Store(&now)
got := c.lastSeenAt.Load()
assert.NotNil(t, got, "lastSeenAt should be set after SSE connect")
assert.WithinDuration(t, now, *got, time.Second)
}
func TestDevModeRemovePersistedFix(t *testing.T) {
v := New()
v.cfg.DevMode = true