Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
60009124c9 | ||
|
|
42b21348cb | ||
|
|
58ad9a2699 | ||
|
|
f3a9c8036f | ||
|
|
6763e1a420 |
55
computed.go
Normal file
55
computed.go
Normal file
@@ -0,0 +1,55 @@
|
|||||||
|
package via
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/ryanhamamura/via/h"
|
||||||
|
)
|
||||||
|
|
||||||
|
// computedSignal is a read-only signal whose value is derived from other signals.
|
||||||
|
// It recomputes on every read and is included in patches only when the value changes.
|
||||||
|
type computedSignal struct {
|
||||||
|
id string
|
||||||
|
compute func() string
|
||||||
|
lastVal string
|
||||||
|
changed bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *computedSignal) ID() string {
|
||||||
|
return s.id
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *computedSignal) String() string {
|
||||||
|
return s.compute()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *computedSignal) Int() int {
|
||||||
|
if n, err := strconv.Atoi(s.String()); err == nil {
|
||||||
|
return n
|
||||||
|
}
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *computedSignal) Bool() bool {
|
||||||
|
val := strings.ToLower(s.String())
|
||||||
|
return val == "true" || val == "1" || val == "yes" || val == "on"
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *computedSignal) Text() h.H {
|
||||||
|
return h.Span(h.Data("text", "$"+s.id))
|
||||||
|
}
|
||||||
|
|
||||||
|
// recompute calls the compute function and sets changed if the value differs from lastVal.
|
||||||
|
func (s *computedSignal) recompute() {
|
||||||
|
val := s.compute()
|
||||||
|
if val != s.lastVal {
|
||||||
|
s.lastVal = val
|
||||||
|
s.changed = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *computedSignal) patchValue() string {
|
||||||
|
return fmt.Sprintf("%v", s.lastVal)
|
||||||
|
}
|
||||||
190
computed_test.go
Normal file
190
computed_test.go
Normal file
@@ -0,0 +1,190 @@
|
|||||||
|
package via
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"fmt"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/ryanhamamura/via/h"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestComputedBasic(t *testing.T) {
|
||||||
|
v := New()
|
||||||
|
var cs *computedSignal
|
||||||
|
v.Page("/", func(c *Context) {
|
||||||
|
sig1 := c.Signal("hello")
|
||||||
|
sig2 := c.Signal("world")
|
||||||
|
cs = c.Computed(func() string {
|
||||||
|
return sig1.String() + " " + sig2.String()
|
||||||
|
})
|
||||||
|
c.View(func() h.H { return h.Div() })
|
||||||
|
})
|
||||||
|
assert.Equal(t, "hello world", cs.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestComputedReactivity(t *testing.T) {
|
||||||
|
v := New()
|
||||||
|
var cs *computedSignal
|
||||||
|
var sig1 *signal
|
||||||
|
v.Page("/", func(c *Context) {
|
||||||
|
sig1 = c.Signal("a")
|
||||||
|
sig2 := c.Signal("b")
|
||||||
|
cs = c.Computed(func() string {
|
||||||
|
return sig1.String() + sig2.String()
|
||||||
|
})
|
||||||
|
c.View(func() h.H { return h.Div() })
|
||||||
|
})
|
||||||
|
assert.Equal(t, "ab", cs.String())
|
||||||
|
sig1.SetValue("x")
|
||||||
|
assert.Equal(t, "xb", cs.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestComputedInt(t *testing.T) {
|
||||||
|
v := New()
|
||||||
|
var cs *computedSignal
|
||||||
|
v.Page("/", func(c *Context) {
|
||||||
|
sig := c.Signal(21)
|
||||||
|
cs = c.Computed(func() string {
|
||||||
|
return fmt.Sprintf("%d", sig.Int()*2)
|
||||||
|
})
|
||||||
|
c.View(func() h.H { return h.Div() })
|
||||||
|
})
|
||||||
|
assert.Equal(t, 42, cs.Int())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestComputedBool(t *testing.T) {
|
||||||
|
v := New()
|
||||||
|
var cs *computedSignal
|
||||||
|
v.Page("/", func(c *Context) {
|
||||||
|
sig := c.Signal("true")
|
||||||
|
cs = c.Computed(func() string {
|
||||||
|
return sig.String()
|
||||||
|
})
|
||||||
|
c.View(func() h.H { return h.Div() })
|
||||||
|
})
|
||||||
|
assert.True(t, cs.Bool())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestComputedText(t *testing.T) {
|
||||||
|
v := New()
|
||||||
|
var cs *computedSignal
|
||||||
|
v.Page("/", func(c *Context) {
|
||||||
|
cs = c.Computed(func() string { return "hi" })
|
||||||
|
c.View(func() h.H { return h.Div() })
|
||||||
|
})
|
||||||
|
|
||||||
|
var buf bytes.Buffer
|
||||||
|
err := cs.Text().Render(&buf)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Contains(t, buf.String(), `data-text="$`+cs.ID()+`"`)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestComputedChangeDetection(t *testing.T) {
|
||||||
|
v := New()
|
||||||
|
var ctx *Context
|
||||||
|
var sig *signal
|
||||||
|
v.Page("/", func(c *Context) {
|
||||||
|
ctx = c
|
||||||
|
sig = c.Signal("a")
|
||||||
|
c.Computed(func() string {
|
||||||
|
return sig.String() + "!"
|
||||||
|
})
|
||||||
|
c.View(func() h.H { return h.Div() })
|
||||||
|
})
|
||||||
|
|
||||||
|
// First patch includes computed (changed=true from init)
|
||||||
|
patch1 := ctx.prepareSignalsForPatch()
|
||||||
|
assert.NotEmpty(t, patch1)
|
||||||
|
|
||||||
|
// Second patch: nothing changed, computed should not be included
|
||||||
|
patch2 := ctx.prepareSignalsForPatch()
|
||||||
|
// Regular signal still has changed=true (not reset in prepareSignalsForPatch),
|
||||||
|
// but computed should not appear since its value didn't change.
|
||||||
|
hasComputed := false
|
||||||
|
ctx.signals.Range(func(_, value any) bool {
|
||||||
|
if cs, ok := value.(*computedSignal); ok {
|
||||||
|
_, inPatch := patch2[cs.ID()]
|
||||||
|
hasComputed = inPatch
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
assert.False(t, hasComputed)
|
||||||
|
|
||||||
|
// After changing dependency, computed should reappear
|
||||||
|
sig.SetValue("b")
|
||||||
|
patch3 := ctx.prepareSignalsForPatch()
|
||||||
|
found := false
|
||||||
|
ctx.signals.Range(func(_, value any) bool {
|
||||||
|
if cs, ok := value.(*computedSignal); ok {
|
||||||
|
if v, ok := patch3[cs.ID()]; ok {
|
||||||
|
assert.Equal(t, "b!", v)
|
||||||
|
found = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
assert.True(t, found)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestComputedInComponent(t *testing.T) {
|
||||||
|
v := New()
|
||||||
|
var cs *computedSignal
|
||||||
|
var parentCtx *Context
|
||||||
|
v.Page("/", func(c *Context) {
|
||||||
|
parentCtx = c
|
||||||
|
c.Component(func(comp *Context) {
|
||||||
|
sig := comp.Signal("via")
|
||||||
|
cs = comp.Computed(func() string {
|
||||||
|
return "hello " + sig.String()
|
||||||
|
})
|
||||||
|
comp.View(func() h.H { return h.Div() })
|
||||||
|
})
|
||||||
|
c.View(func() h.H { return h.Div() })
|
||||||
|
})
|
||||||
|
|
||||||
|
assert.Equal(t, "hello via", cs.String())
|
||||||
|
// Verify it's stored on the parent page context
|
||||||
|
found := false
|
||||||
|
parentCtx.signals.Range(func(_, value any) bool {
|
||||||
|
if stored, ok := value.(*computedSignal); ok && stored.ID() == cs.ID() {
|
||||||
|
found = true
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
assert.True(t, found)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestComputedIsReadOnly(t *testing.T) {
|
||||||
|
// Compile-time guarantee: *computedSignal has no Bind() or SetValue() methods.
|
||||||
|
// This test exists as documentation — if someone adds those methods, the
|
||||||
|
// interface assertion below will need updating and serve as a reminder.
|
||||||
|
var cs interface{} = &computedSignal{}
|
||||||
|
type writable interface {
|
||||||
|
SetValue(any)
|
||||||
|
}
|
||||||
|
type bindable interface {
|
||||||
|
Bind() h.H
|
||||||
|
}
|
||||||
|
_, isWritable := cs.(writable)
|
||||||
|
_, isBindable := cs.(bindable)
|
||||||
|
assert.False(t, isWritable, "computedSignal must not have SetValue")
|
||||||
|
assert.False(t, isBindable, "computedSignal must not have Bind")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestComputedInjectSignalsSkips(t *testing.T) {
|
||||||
|
v := New()
|
||||||
|
var ctx *Context
|
||||||
|
var cs *computedSignal
|
||||||
|
v.Page("/", func(c *Context) {
|
||||||
|
ctx = c
|
||||||
|
cs = c.Computed(func() string { return "fixed" })
|
||||||
|
c.View(func() h.H { return h.Div() })
|
||||||
|
})
|
||||||
|
|
||||||
|
// Simulate browser sending back a value for the computed signal — should be ignored
|
||||||
|
ctx.injectSignals(map[string]any{
|
||||||
|
cs.ID(): "injected",
|
||||||
|
})
|
||||||
|
assert.Equal(t, "fixed", cs.String())
|
||||||
|
}
|
||||||
@@ -53,10 +53,15 @@ type Options struct {
|
|||||||
// Defaults to "/_datastar.js" if empty.
|
// Defaults to "/_datastar.js" if empty.
|
||||||
DatastarPath string
|
DatastarPath string
|
||||||
|
|
||||||
// PubSub enables publish/subscribe messaging. Use vianats.New() for an
|
// PubSub enables publish/subscribe messaging. When nil, an embedded NATS
|
||||||
// embedded NATS backend, or supply any PubSub implementation.
|
// server starts automatically in Start(). Supply any PubSub implementation
|
||||||
|
// to replace it.
|
||||||
PubSub PubSub
|
PubSub PubSub
|
||||||
|
|
||||||
|
// Streams declares JetStream streams to create when Start() initializes
|
||||||
|
// the embedded NATS server. Ignored when a custom PubSub is configured.
|
||||||
|
Streams []StreamConfig
|
||||||
|
|
||||||
// ContextSuspendAfter is the time a context may be disconnected before
|
// ContextSuspendAfter is the time a context may be disconnected before
|
||||||
// the reaper suspends it (frees page resources but keeps the context
|
// the reaper suspends it (frees page resources but keeps the context
|
||||||
// shell for seamless re-init on reconnect). Default: 15m.
|
// shell for seamless re-init on reconnect). Default: 15m.
|
||||||
|
|||||||
44
context.go
44
context.go
@@ -42,6 +42,7 @@ type Context struct {
|
|||||||
createdAt time.Time
|
createdAt time.Time
|
||||||
sseConnected atomic.Bool
|
sseConnected atomic.Bool
|
||||||
sseDisconnectedAt atomic.Pointer[time.Time]
|
sseDisconnectedAt atomic.Pointer[time.Time]
|
||||||
|
lastSeenAt atomic.Pointer[time.Time]
|
||||||
suspended atomic.Bool
|
suspended atomic.Bool
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -207,6 +208,40 @@ func (c *Context) Signal(v any) *signal {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Computed creates a read-only signal whose value is derived from the given function.
|
||||||
|
// The function is called on every read (String/Int/Bool) for fresh values,
|
||||||
|
// and during sync to detect changes for browser patches.
|
||||||
|
//
|
||||||
|
// Computed signals cannot be bound to inputs or set manually.
|
||||||
|
//
|
||||||
|
// Example:
|
||||||
|
//
|
||||||
|
// full := c.Computed(func() string {
|
||||||
|
// return first.String() + " " + last.String()
|
||||||
|
// })
|
||||||
|
// c.View(func() h.H {
|
||||||
|
// return h.Span(full.Text())
|
||||||
|
// })
|
||||||
|
func (c *Context) Computed(fn func() string) *computedSignal {
|
||||||
|
sigID := genRandID()
|
||||||
|
initial := fn()
|
||||||
|
cs := &computedSignal{
|
||||||
|
id: sigID,
|
||||||
|
compute: fn,
|
||||||
|
lastVal: initial,
|
||||||
|
changed: true,
|
||||||
|
}
|
||||||
|
|
||||||
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
if c.isComponent() {
|
||||||
|
c.parentPageCtx.signals.Store(sigID, cs)
|
||||||
|
} else {
|
||||||
|
c.signals.Store(sigID, cs)
|
||||||
|
}
|
||||||
|
return cs
|
||||||
|
}
|
||||||
|
|
||||||
func (c *Context) injectSignals(sigs map[string]any) {
|
func (c *Context) injectSignals(sigs map[string]any) {
|
||||||
if sigs == nil {
|
if sigs == nil {
|
||||||
c.app.logErr(c, "signal injection failed: nil signals")
|
c.app.logErr(c, "signal injection failed: nil signals")
|
||||||
@@ -248,7 +283,8 @@ func (c *Context) prepareSignalsForPatch() map[string]any {
|
|||||||
defer c.mu.RUnlock()
|
defer c.mu.RUnlock()
|
||||||
updatedSigs := make(map[string]any)
|
updatedSigs := make(map[string]any)
|
||||||
c.signals.Range(func(sigID, value any) bool {
|
c.signals.Range(func(sigID, value any) bool {
|
||||||
if sig, ok := value.(*signal); ok {
|
switch sig := value.(type) {
|
||||||
|
case *signal:
|
||||||
if sig.err != nil {
|
if sig.err != nil {
|
||||||
c.app.logWarn(c, "signal '%s' is out of sync: %v", sig.id, sig.err)
|
c.app.logWarn(c, "signal '%s' is out of sync: %v", sig.id, sig.err)
|
||||||
return true
|
return true
|
||||||
@@ -256,6 +292,12 @@ func (c *Context) prepareSignalsForPatch() map[string]any {
|
|||||||
if sig.changed {
|
if sig.changed {
|
||||||
updatedSigs[sigID.(string)] = fmt.Sprintf("%v", sig.val)
|
updatedSigs[sigID.(string)] = fmt.Sprintf("%v", sig.val)
|
||||||
}
|
}
|
||||||
|
case *computedSignal:
|
||||||
|
sig.recompute()
|
||||||
|
if sig.changed {
|
||||||
|
updatedSigs[sigID.(string)] = sig.patchValue()
|
||||||
|
sig.changed = false
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return true
|
return true
|
||||||
})
|
})
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ Infrastructure for multi-user real-time communication and persistent state.
|
|||||||
|
|
||||||
## PubSub
|
## PubSub
|
||||||
|
|
||||||
Via includes an embedded NATS server that starts automatically with `via.New()`. No external services required — pub/sub works out of the box.
|
Via includes an embedded NATS server that starts automatically with `v.Start()`. No external services required — pub/sub works out of the box.
|
||||||
|
|
||||||
### Interface
|
### Interface
|
||||||
|
|
||||||
@@ -73,14 +73,18 @@ This disables the embedded NATS server. The `NATSConn()` and `JetStream()` acces
|
|||||||
|
|
||||||
NATS JetStream provides persistent, replayable message streams. Useful for chat history, event logs, or any scenario where new subscribers need to catch up on past messages.
|
NATS JetStream provides persistent, replayable message streams. Useful for chat history, event logs, or any scenario where new subscribers need to catch up on past messages.
|
||||||
|
|
||||||
### Ensure a stream exists
|
### Declaring streams
|
||||||
|
|
||||||
|
The recommended approach is to declare streams in `Options.Streams`. They are created automatically when `v.Start()` initializes the embedded NATS server:
|
||||||
|
|
||||||
```go
|
```go
|
||||||
err := via.EnsureStream(v, via.StreamConfig{
|
v.Config(via.Options{
|
||||||
Name: "CHAT",
|
Streams: []via.StreamConfig{{
|
||||||
Subjects: []string{"chat.>"},
|
Name: "CHAT",
|
||||||
MaxMsgs: 1000,
|
Subjects: []string{"chat.>"},
|
||||||
MaxAge: 24 * time.Hour,
|
MaxMsgs: 1000,
|
||||||
|
MaxAge: 24 * time.Hour,
|
||||||
|
}},
|
||||||
})
|
})
|
||||||
```
|
```
|
||||||
|
|
||||||
@@ -91,7 +95,16 @@ err := via.EnsureStream(v, via.StreamConfig{
|
|||||||
| `MaxMsgs` | Maximum number of messages to retain |
|
| `MaxMsgs` | Maximum number of messages to retain |
|
||||||
| `MaxAge` | Maximum age before messages are discarded |
|
| `MaxAge` | Maximum age before messages are discarded |
|
||||||
|
|
||||||
Call `EnsureStream` during app initialization, before `v.Start()`.
|
For dynamic stream creation after startup, `EnsureStream` is also available:
|
||||||
|
|
||||||
|
```go
|
||||||
|
err := via.EnsureStream(v, via.StreamConfig{
|
||||||
|
Name: "EVENTS",
|
||||||
|
Subjects: []string{"events.>"},
|
||||||
|
MaxMsgs: 500,
|
||||||
|
MaxAge: 12 * time.Hour,
|
||||||
|
})
|
||||||
|
```
|
||||||
|
|
||||||
### Replay history
|
### Replay history
|
||||||
|
|
||||||
|
|||||||
@@ -2,7 +2,7 @@
|
|||||||
|
|
||||||
A chatroom built with Via and an **embedded NATS server**, demonstrating pub/sub messaging as an alternative to the custom `Rooms` implementation in `../chatroom`.
|
A chatroom built with Via and an **embedded NATS server**, demonstrating pub/sub messaging as an alternative to the custom `Rooms` implementation in `../chatroom`.
|
||||||
|
|
||||||
Uses `delaneyj/toolbelt/embeddednats` to run NATS inside the same binary - no external server required.
|
Via includes an embedded NATS server that starts automatically — no external server required.
|
||||||
|
|
||||||
## Key Differences from Original Chatroom
|
## Key Differences from Original Chatroom
|
||||||
|
|
||||||
@@ -25,21 +25,6 @@ That's it. No separate NATS server needed.
|
|||||||
|
|
||||||
Open multiple browser tabs at http://localhost:7331 to see messages broadcast across all clients.
|
Open multiple browser tabs at http://localhost:7331 to see messages broadcast across all clients.
|
||||||
|
|
||||||
## How Embedded NATS Works
|
|
||||||
|
|
||||||
```go
|
|
||||||
// Start embedded NATS server (JetStream enabled by default)
|
|
||||||
ns, err := embeddednats.New(ctx,
|
|
||||||
embeddednats.WithDirectory("./data/nats"),
|
|
||||||
)
|
|
||||||
ns.WaitForServer()
|
|
||||||
|
|
||||||
// Get client connection to embedded server
|
|
||||||
nc, err := ns.Client()
|
|
||||||
```
|
|
||||||
|
|
||||||
Data is persisted to `./data/nats/` for JetStream durability.
|
|
||||||
|
|
||||||
## Architecture
|
## Architecture
|
||||||
|
|
||||||
```
|
```
|
||||||
@@ -65,14 +50,16 @@ Data is persisted to `./data/nats/` for JetStream durability.
|
|||||||
|
|
||||||
## JetStream Durability
|
## JetStream Durability
|
||||||
|
|
||||||
Messages persist to disk via JetStream:
|
Messages persist to disk via JetStream. Streams are declared in `Options.Streams` and created automatically when `v.Start()` initializes the embedded NATS server:
|
||||||
|
|
||||||
```go
|
```go
|
||||||
js.AddStream(&nats.StreamConfig{
|
v.Config(via.Options{
|
||||||
Name: "CHAT",
|
Streams: []via.StreamConfig{{
|
||||||
Subjects: []string{"chat.>"},
|
Name: "CHAT",
|
||||||
MaxMsgs: 1000, // Keep last 1000 messages
|
Subjects: []string{"chat.>"},
|
||||||
MaxAge: 24 * time.Hour,
|
MaxMsgs: 1000,
|
||||||
|
MaxAge: 24 * time.Hour,
|
||||||
|
}},
|
||||||
})
|
})
|
||||||
```
|
```
|
||||||
|
|
||||||
@@ -87,23 +74,6 @@ Stop and restart the app - chat history survives.
|
|||||||
- Manual join/leave channels
|
- Manual join/leave channels
|
||||||
|
|
||||||
**This example - ~60 lines of NATS integration:**
|
**This example - ~60 lines of NATS integration:**
|
||||||
- `embeddednats.New()` starts the server
|
- `via.Subscribe(c, subject, handler)` for receiving
|
||||||
- `nc.Subscribe(subject, handler)` for receiving
|
- `via.Publish(c, subject, data)` for sending
|
||||||
- `nc.Publish(subject, data)` for sending
|
- Streams declared in `Options` — NATS handles delivery, no polling
|
||||||
- NATS handles delivery, no polling
|
|
||||||
|
|
||||||
## Next Steps
|
|
||||||
|
|
||||||
If this pattern proves useful, it could be promoted to a Via plugin:
|
|
||||||
|
|
||||||
```go
|
|
||||||
// Hypothetical future API
|
|
||||||
v.Config(via.WithEmbeddedNATS("./data/nats"))
|
|
||||||
|
|
||||||
// In page init
|
|
||||||
c.Subscribe("events.user.*", func(data []byte) {
|
|
||||||
c.Sync()
|
|
||||||
})
|
|
||||||
|
|
||||||
c.Publish("events.user.login", userData)
|
|
||||||
```
|
|
||||||
|
|||||||
@@ -21,18 +21,14 @@ func main() {
|
|||||||
DocumentTitle: "NATS Chat",
|
DocumentTitle: "NATS Chat",
|
||||||
LogLevel: via.LogLevelInfo,
|
LogLevel: via.LogLevelInfo,
|
||||||
ServerAddress: ":7331",
|
ServerAddress: ":7331",
|
||||||
|
Streams: []via.StreamConfig{{
|
||||||
|
Name: "CHAT",
|
||||||
|
Subjects: []string{"chat.>"},
|
||||||
|
MaxMsgs: 1000,
|
||||||
|
MaxAge: 24 * time.Hour,
|
||||||
|
}},
|
||||||
})
|
})
|
||||||
|
|
||||||
err := via.EnsureStream(v, via.StreamConfig{
|
|
||||||
Name: "CHAT",
|
|
||||||
Subjects: []string{"chat.>"},
|
|
||||||
MaxMsgs: 1000,
|
|
||||||
MaxAge: 24 * time.Hour,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("Failed to ensure stream: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
v.AppendToHead(
|
v.AppendToHead(
|
||||||
h.Link(h.Rel("stylesheet"), h.Href("https://cdn.jsdelivr.net/npm/@picocss/pico@2/css/pico.min.css")),
|
h.Link(h.Rel("stylesheet"), h.Href("https://cdn.jsdelivr.net/npm/@picocss/pico@2/css/pico.min.css")),
|
||||||
h.StyleEl(h.Raw(chatCSS)),
|
h.StyleEl(h.Raw(chatCSS)),
|
||||||
@@ -76,6 +72,6 @@ func main() {
|
|||||||
protected := v.Group("", requireProfile)
|
protected := v.Group("", requireProfile)
|
||||||
protected.Page("/", ChatPage)
|
protected.Page("/", ChatPage)
|
||||||
|
|
||||||
log.Println("Starting NATS chatroom on :7331 (embedded NATS server)")
|
log.Println("Starting NATS chatroom on :7331")
|
||||||
v.Start()
|
v.Start()
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -18,6 +18,12 @@ func ProfilePage(c *via.Context) {
|
|||||||
via.MaxLen(20, "Must be at most 20 characters"),
|
via.MaxLen(20, "Must be at most 20 characters"),
|
||||||
)
|
)
|
||||||
selectedEmoji := c.Signal(existingEmoji)
|
selectedEmoji := c.Signal(existingEmoji)
|
||||||
|
previewName := c.Computed(func() string {
|
||||||
|
if name := nameField.String(); name != "" {
|
||||||
|
return name
|
||||||
|
}
|
||||||
|
return "Your Name"
|
||||||
|
})
|
||||||
|
|
||||||
saveToSession := func() bool {
|
saveToSession := func() bool {
|
||||||
if !c.ValidateAll() {
|
if !c.ValidateAll() {
|
||||||
@@ -68,18 +74,13 @@ func ProfilePage(c *via.Context) {
|
|||||||
h.Button(h.Text("Start Chatting"), saveAndChat.OnClick()),
|
h.Button(h.Text("Start Chatting"), saveAndChat.OnClick()),
|
||||||
)
|
)
|
||||||
|
|
||||||
previewName := nameField.String()
|
|
||||||
if previewName == "" {
|
|
||||||
previewName = "Your Name"
|
|
||||||
}
|
|
||||||
|
|
||||||
return h.Div(h.Class("profile-page"),
|
return h.Div(h.Class("profile-page"),
|
||||||
h.H2(h.Text("Your Profile"), h.DataViewTransition("page-title")),
|
h.H2(h.Text("Your Profile"), h.DataViewTransition("page-title")),
|
||||||
|
|
||||||
// Live preview
|
// Live preview
|
||||||
h.Div(h.Class("profile-preview"),
|
h.Div(h.Class("profile-preview"),
|
||||||
h.Div(h.Class("avatar avatar-lg"), h.Text(selectedEmoji.String())),
|
h.Div(h.Class("avatar avatar-lg"), h.Text(selectedEmoji.String())),
|
||||||
h.Span(h.Class("preview-name"), h.Text(previewName)),
|
h.Span(h.Class("preview-name"), previewName.Text()),
|
||||||
),
|
),
|
||||||
|
|
||||||
h.Div(h.Class("profile-form"),
|
h.Div(h.Class("profile-form"),
|
||||||
|
|||||||
@@ -53,18 +53,14 @@ func main() {
|
|||||||
DocumentTitle: "Bookmarks",
|
DocumentTitle: "Bookmarks",
|
||||||
LogLevel: via.LogLevelInfo,
|
LogLevel: via.LogLevelInfo,
|
||||||
ServerAddress: ":7331",
|
ServerAddress: ":7331",
|
||||||
|
Streams: []via.StreamConfig{{
|
||||||
|
Name: "BOOKMARKS",
|
||||||
|
Subjects: []string{"bookmarks.>"},
|
||||||
|
MaxMsgs: 1000,
|
||||||
|
MaxAge: 24 * time.Hour,
|
||||||
|
}},
|
||||||
})
|
})
|
||||||
|
|
||||||
err := via.EnsureStream(v, via.StreamConfig{
|
|
||||||
Name: "BOOKMARKS",
|
|
||||||
Subjects: []string{"bookmarks.>"},
|
|
||||||
MaxMsgs: 1000,
|
|
||||||
MaxAge: 24 * time.Hour,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("Failed to ensure stream: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
v.AppendToHead(
|
v.AppendToHead(
|
||||||
h.Link(h.Rel("stylesheet"), h.Href("https://cdn.jsdelivr.net/npm/daisyui@4/dist/full.min.css")),
|
h.Link(h.Rel("stylesheet"), h.Href("https://cdn.jsdelivr.net/npm/daisyui@4/dist/full.min.css")),
|
||||||
h.Script(h.Src("https://cdn.tailwindcss.com")),
|
h.Script(h.Src("https://cdn.tailwindcss.com")),
|
||||||
@@ -76,6 +72,12 @@ func main() {
|
|||||||
titleSignal := c.Signal("")
|
titleSignal := c.Signal("")
|
||||||
urlSignal := c.Signal("")
|
urlSignal := c.Signal("")
|
||||||
targetIDSignal := c.Signal("")
|
targetIDSignal := c.Signal("")
|
||||||
|
saveLabel := c.Computed(func() string {
|
||||||
|
if targetIDSignal.String() != "" {
|
||||||
|
return "Update Bookmark"
|
||||||
|
}
|
||||||
|
return "Add Bookmark"
|
||||||
|
})
|
||||||
|
|
||||||
via.Subscribe(c, "bookmarks.events", func(evt CRUDEvent) {
|
via.Subscribe(c, "bookmarks.events", func(evt CRUDEvent) {
|
||||||
if evt.UserID == userID {
|
if evt.UserID == userID {
|
||||||
@@ -205,11 +207,6 @@ func main() {
|
|||||||
}
|
}
|
||||||
bookmarksMu.RUnlock()
|
bookmarksMu.RUnlock()
|
||||||
|
|
||||||
saveLabel := "Add Bookmark"
|
|
||||||
if isEditing {
|
|
||||||
saveLabel = "Update Bookmark"
|
|
||||||
}
|
|
||||||
|
|
||||||
return h.Div(h.Class("min-h-screen bg-base-200"),
|
return h.Div(h.Class("min-h-screen bg-base-200"),
|
||||||
// Navbar
|
// Navbar
|
||||||
h.Div(h.Class("navbar bg-base-100 shadow-sm"),
|
h.Div(h.Class("navbar bg-base-100 shadow-sm"),
|
||||||
@@ -225,7 +222,7 @@ func main() {
|
|||||||
// Form card
|
// Form card
|
||||||
h.Div(h.Class("card bg-base-100 shadow"),
|
h.Div(h.Class("card bg-base-100 shadow"),
|
||||||
h.Div(h.Class("card-body"),
|
h.Div(h.Class("card-body"),
|
||||||
h.H2(h.Class("card-title"), h.Text(saveLabel)),
|
h.H2(h.Class("card-title"), saveLabel.Text()),
|
||||||
h.Div(h.Class("flex flex-col gap-2"),
|
h.Div(h.Class("flex flex-col gap-2"),
|
||||||
h.Input(h.Class("input input-bordered w-full"), h.Type("text"), h.Placeholder("Title"), titleSignal.Bind()),
|
h.Input(h.Class("input input-bordered w-full"), h.Type("text"), h.Placeholder("Title"), titleSignal.Bind()),
|
||||||
h.Input(h.Class("input input-bordered w-full"), h.Type("text"), h.Placeholder("https://example.com"), urlSignal.Bind()),
|
h.Input(h.Class("input input-bordered w-full"), h.Type("text"), h.Placeholder("https://example.com"), urlSignal.Bind()),
|
||||||
@@ -233,7 +230,7 @@ func main() {
|
|||||||
h.If(isEditing,
|
h.If(isEditing,
|
||||||
h.Button(h.Class("btn btn-ghost"), h.Text("Cancel"), cancelEdit.OnClick()),
|
h.Button(h.Class("btn btn-ghost"), h.Text("Cancel"), cancelEdit.OnClick()),
|
||||||
),
|
),
|
||||||
h.Button(h.Class("btn btn-primary"), h.Text(saveLabel), save.OnClick()),
|
h.Button(h.Class("btn btn-primary"), saveLabel.Text(), save.OnClick()),
|
||||||
),
|
),
|
||||||
),
|
),
|
||||||
),
|
),
|
||||||
|
|||||||
10
nats.go
10
nats.go
@@ -9,6 +9,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/delaneyj/toolbelt/embeddednats"
|
"github.com/delaneyj/toolbelt/embeddednats"
|
||||||
|
natsserver "github.com/nats-io/nats-server/v2/server"
|
||||||
"github.com/nats-io/nats.go"
|
"github.com/nats-io/nats.go"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -50,7 +51,14 @@ func startDefaultNATS() (dn *defaultNATS, err error) {
|
|||||||
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
|
||||||
ns, err := embeddednats.New(ctx, embeddednats.WithDirectory(dataDir))
|
ns, err := embeddednats.New(ctx,
|
||||||
|
embeddednats.WithDirectory(dataDir),
|
||||||
|
embeddednats.WithNATSServerOptions(&natsserver.Options{
|
||||||
|
JetStream: true,
|
||||||
|
StoreDir: dataDir,
|
||||||
|
Port: -1,
|
||||||
|
}),
|
||||||
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cancel()
|
cancel()
|
||||||
os.RemoveAll(dataDir)
|
os.RemoveAll(dataDir)
|
||||||
|
|||||||
37
via.go
37
via.go
@@ -145,6 +145,9 @@ func (v *V) Config(cfg Options) {
|
|||||||
if cfg.ContextTTL != 0 {
|
if cfg.ContextTTL != 0 {
|
||||||
v.cfg.ContextTTL = cfg.ContextTTL
|
v.cfg.ContextTTL = cfg.ContextTTL
|
||||||
}
|
}
|
||||||
|
if cfg.Streams != nil {
|
||||||
|
v.cfg.Streams = cfg.Streams
|
||||||
|
}
|
||||||
if cfg.ActionRateLimit.Rate != 0 || cfg.ActionRateLimit.Burst != 0 {
|
if cfg.ActionRateLimit.Rate != 0 || cfg.ActionRateLimit.Burst != 0 {
|
||||||
v.actionRateLimit = cfg.ActionRateLimit
|
v.actionRateLimit = cfg.ActionRateLimit
|
||||||
}
|
}
|
||||||
@@ -331,15 +334,18 @@ func (v *V) reapOrphanedContexts(suspendAfter, ttl time.Duration) {
|
|||||||
if c.sseConnected.Load() {
|
if c.sseConnected.Load() {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
var disconnectedFor time.Duration
|
// Use the most recent liveness signal
|
||||||
if dc := c.sseDisconnectedAt.Load(); dc != nil {
|
lastAlive := c.createdAt
|
||||||
disconnectedFor = now.Sub(*dc)
|
if dc := c.sseDisconnectedAt.Load(); dc != nil && dc.After(lastAlive) {
|
||||||
} else {
|
lastAlive = *dc
|
||||||
disconnectedFor = now.Sub(c.createdAt)
|
|
||||||
}
|
}
|
||||||
if disconnectedFor > ttl {
|
if seen := c.lastSeenAt.Load(); seen != nil && seen.After(lastAlive) {
|
||||||
|
lastAlive = *seen
|
||||||
|
}
|
||||||
|
silentFor := now.Sub(lastAlive)
|
||||||
|
if silentFor > ttl {
|
||||||
toReap = append(toReap, c)
|
toReap = append(toReap, c)
|
||||||
} else if disconnectedFor > suspendAfter && !c.suspended.Load() {
|
} else if silentFor > suspendAfter && !c.suspended.Load() {
|
||||||
toSuspend = append(toSuspend, c)
|
toSuspend = append(toSuspend, c)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -368,6 +374,12 @@ func (v *V) Start() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for _, sc := range v.cfg.Streams {
|
||||||
|
if err := EnsureStream(v, sc); err != nil {
|
||||||
|
v.logger.Fatal().Err(err).Msgf("failed to create stream %q", sc.Name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
handler := http.Handler(v.mux)
|
handler := http.Handler(v.mux)
|
||||||
if v.sessionManager != nil {
|
if v.sessionManager != nil {
|
||||||
handler = v.sessionManager.LoadAndSave(v.mux)
|
handler = v.sessionManager.LoadAndSave(v.mux)
|
||||||
@@ -655,6 +667,8 @@ func New() *V {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
c.reqCtx = r.Context()
|
c.reqCtx = r.Context()
|
||||||
|
now := time.Now()
|
||||||
|
c.lastSeenAt.Store(&now)
|
||||||
|
|
||||||
sse := datastar.NewSSE(w, r, datastar.WithCompression(datastar.WithBrotli(datastar.WithBrotliLevel(5))))
|
sse := datastar.NewSSE(w, r, datastar.WithCompression(datastar.WithBrotli(datastar.WithBrotliLevel(5))))
|
||||||
|
|
||||||
@@ -688,17 +702,22 @@ func New() *V {
|
|||||||
|
|
||||||
go c.Sync()
|
go c.Sync()
|
||||||
|
|
||||||
|
keepalive := time.NewTicker(30 * time.Second)
|
||||||
|
defer keepalive.Stop()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-sse.Context().Done():
|
case <-sse.Context().Done():
|
||||||
v.logDebug(c, "SSE connection ended")
|
v.logDebug(c, "SSE connection ended")
|
||||||
c.sseConnected.Store(false)
|
c.sseConnected.Store(false)
|
||||||
now := time.Now()
|
dcNow := time.Now()
|
||||||
c.sseDisconnectedAt.Store(&now)
|
c.sseDisconnectedAt.Store(&dcNow)
|
||||||
return
|
return
|
||||||
case <-c.ctxDisposedChan:
|
case <-c.ctxDisposedChan:
|
||||||
v.logDebug(c, "context disposed, closing SSE")
|
v.logDebug(c, "context disposed, closing SSE")
|
||||||
return
|
return
|
||||||
|
case <-keepalive.C:
|
||||||
|
sse.PatchSignals([]byte("{}"))
|
||||||
case patch := <-c.patchChan:
|
case patch := <-c.patchChan:
|
||||||
switch patch.typ {
|
switch patch.typ {
|
||||||
case patchTypeElements:
|
case patchTypeElements:
|
||||||
|
|||||||
71
via_test.go
71
via_test.go
@@ -418,6 +418,7 @@ func TestReaperCleansOrphanedContexts(t *testing.T) {
|
|||||||
func TestReaperSuspendsContext(t *testing.T) {
|
func TestReaperSuspendsContext(t *testing.T) {
|
||||||
v := New()
|
v := New()
|
||||||
c := newContext("suspend-1", "/", v)
|
c := newContext("suspend-1", "/", v)
|
||||||
|
c.createdAt = time.Now().Add(-30 * time.Minute)
|
||||||
dc := time.Now().Add(-20 * time.Minute)
|
dc := time.Now().Add(-20 * time.Minute)
|
||||||
c.sseDisconnectedAt.Store(&dc)
|
c.sseDisconnectedAt.Store(&dc)
|
||||||
v.registerCtx(c)
|
v.registerCtx(c)
|
||||||
@@ -432,6 +433,7 @@ func TestReaperSuspendsContext(t *testing.T) {
|
|||||||
func TestReaperReapsAfterTTL(t *testing.T) {
|
func TestReaperReapsAfterTTL(t *testing.T) {
|
||||||
v := New()
|
v := New()
|
||||||
c := newContext("reap-1", "/", v)
|
c := newContext("reap-1", "/", v)
|
||||||
|
c.createdAt = time.Now().Add(-3 * time.Hour)
|
||||||
dc := time.Now().Add(-2 * time.Hour)
|
dc := time.Now().Add(-2 * time.Hour)
|
||||||
c.sseDisconnectedAt.Store(&dc)
|
c.sseDisconnectedAt.Store(&dc)
|
||||||
c.suspended.Store(true)
|
c.suspended.Store(true)
|
||||||
@@ -446,6 +448,7 @@ func TestReaperReapsAfterTTL(t *testing.T) {
|
|||||||
func TestReaperIgnoresAlreadySuspended(t *testing.T) {
|
func TestReaperIgnoresAlreadySuspended(t *testing.T) {
|
||||||
v := New()
|
v := New()
|
||||||
c := newContext("already-sus-1", "/", v)
|
c := newContext("already-sus-1", "/", v)
|
||||||
|
c.createdAt = time.Now().Add(-30 * time.Minute)
|
||||||
dc := time.Now().Add(-20 * time.Minute)
|
dc := time.Now().Add(-20 * time.Minute)
|
||||||
c.sseDisconnectedAt.Store(&dc)
|
c.sseDisconnectedAt.Store(&dc)
|
||||||
c.suspended.Store(true)
|
c.suspended.Store(true)
|
||||||
@@ -500,6 +503,74 @@ func TestCleanupCtxIdempotent(t *testing.T) {
|
|||||||
assert.Error(t, err, "context should be removed after cleanup")
|
assert.Error(t, err, "context should be removed after cleanup")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestReaperRespectsLastSeenAt(t *testing.T) {
|
||||||
|
v := New()
|
||||||
|
c := newContext("seen-1", "/", v)
|
||||||
|
c.createdAt = time.Now().Add(-30 * time.Minute)
|
||||||
|
// Disconnected 20 min ago, but client retried (lastSeenAt) 2 min ago
|
||||||
|
dc := time.Now().Add(-20 * time.Minute)
|
||||||
|
c.sseDisconnectedAt.Store(&dc)
|
||||||
|
seen := time.Now().Add(-2 * time.Minute)
|
||||||
|
c.lastSeenAt.Store(&seen)
|
||||||
|
v.registerCtx(c)
|
||||||
|
|
||||||
|
v.reapOrphanedContexts(15*time.Minute, time.Hour)
|
||||||
|
|
||||||
|
_, err := v.getCtx("seen-1")
|
||||||
|
assert.NoError(t, err, "context with recent lastSeenAt should survive suspend threshold")
|
||||||
|
assert.False(t, c.suspended.Load(), "context should not be suspended")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestReaperFallsBackWithoutLastSeenAt(t *testing.T) {
|
||||||
|
v := New()
|
||||||
|
c := newContext("noseen-1", "/", v)
|
||||||
|
c.createdAt = time.Now().Add(-30 * time.Minute)
|
||||||
|
dc := time.Now().Add(-20 * time.Minute)
|
||||||
|
c.sseDisconnectedAt.Store(&dc)
|
||||||
|
// no lastSeenAt set — should fall back to sseDisconnectedAt
|
||||||
|
v.registerCtx(c)
|
||||||
|
|
||||||
|
v.reapOrphanedContexts(15*time.Minute, time.Hour)
|
||||||
|
|
||||||
|
got, err := v.getCtx("noseen-1")
|
||||||
|
assert.NoError(t, err, "context should still be in registry (suspended, not reaped)")
|
||||||
|
assert.True(t, got.suspended.Load(), "context should be suspended using sseDisconnectedAt fallback")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestReaperReapsWithStaleLastSeenAt(t *testing.T) {
|
||||||
|
v := New()
|
||||||
|
c := newContext("stale-seen-1", "/", v)
|
||||||
|
c.createdAt = time.Now().Add(-3 * time.Hour)
|
||||||
|
dc := time.Now().Add(-2 * time.Hour)
|
||||||
|
c.sseDisconnectedAt.Store(&dc)
|
||||||
|
// lastSeenAt is also old — beyond TTL
|
||||||
|
seen := time.Now().Add(-90 * time.Minute)
|
||||||
|
c.lastSeenAt.Store(&seen)
|
||||||
|
c.suspended.Store(true)
|
||||||
|
v.registerCtx(c)
|
||||||
|
|
||||||
|
v.reapOrphanedContexts(15*time.Minute, time.Hour)
|
||||||
|
|
||||||
|
_, err := v.getCtx("stale-seen-1")
|
||||||
|
assert.Error(t, err, "context with stale lastSeenAt beyond TTL should be reaped")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestLastSeenAtUpdatedOnSSEConnect(t *testing.T) {
|
||||||
|
v := New()
|
||||||
|
c := newContext("seen-sse-1", "/", v)
|
||||||
|
v.registerCtx(c)
|
||||||
|
|
||||||
|
assert.Nil(t, c.lastSeenAt.Load(), "lastSeenAt should be nil before SSE connect")
|
||||||
|
|
||||||
|
// Simulate what the SSE handler does after getCtx
|
||||||
|
now := time.Now()
|
||||||
|
c.lastSeenAt.Store(&now)
|
||||||
|
|
||||||
|
got := c.lastSeenAt.Load()
|
||||||
|
assert.NotNil(t, got, "lastSeenAt should be set after SSE connect")
|
||||||
|
assert.WithinDuration(t, now, *got, time.Second)
|
||||||
|
}
|
||||||
|
|
||||||
func TestDevModeRemovePersistedFix(t *testing.T) {
|
func TestDevModeRemovePersistedFix(t *testing.T) {
|
||||||
v := New()
|
v := New()
|
||||||
v.cfg.DevMode = true
|
v.cfg.DevMode = true
|
||||||
|
|||||||
Reference in New Issue
Block a user