Compare commits
7 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
785f11e52d | ||
|
|
2f19874c17 | ||
|
|
27b8540b71 | ||
|
|
532651552a | ||
|
|
2310e45d35 | ||
|
|
10b4838f8d | ||
|
|
5362614c3e |
@@ -69,7 +69,7 @@ func main() {
|
||||
- **CSRF protection** — automatic token generation and validation on every action
|
||||
- **Rate limiting** — token-bucket algorithm, configurable globally and per-action
|
||||
- **Event handling** — `OnClick`, `OnChange`, `OnSubmit`, `OnInput`, `OnFocus`, `OnBlur`, `OnMouseEnter`, `OnMouseLeave`, `OnScroll`, `OnDblClick`, `OnKeyDown`, and `OnKeyDownMap` for multi-key bindings
|
||||
- **Timed routines** — `OnInterval` with start/stop/update controls, tied to context lifecycle
|
||||
- **Timed routines** — `OnInterval` auto-starts a ticker goroutine, returns a stop function, tied to context lifecycle
|
||||
- **Redirects** — `Redirect`, `ReplaceURL`, and format-string variants
|
||||
- **Plugin system** — `func(v *V)` hooks for integrating CSS/JS libraries
|
||||
- **Structured logging** — zerolog with configurable levels; console output in dev, JSON in production
|
||||
|
||||
130
context.go
130
context.go
@@ -6,6 +6,7 @@ import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
@@ -30,8 +31,11 @@ type Context struct {
|
||||
actionRegistry map[string]actionEntry
|
||||
signals *sync.Map
|
||||
mu sync.RWMutex
|
||||
navMu sync.Mutex
|
||||
ctxDisposedChan chan struct{}
|
||||
pageStopChan chan struct{}
|
||||
reqCtx context.Context
|
||||
fields []*Field
|
||||
subscriptions []Subscription
|
||||
subsMu sync.Mutex
|
||||
disposeOnce sync.Once
|
||||
@@ -47,7 +51,11 @@ func (c *Context) View(f func() h.H) {
|
||||
if f == nil {
|
||||
panic("nil viewfn")
|
||||
}
|
||||
c.view = func() h.H { return h.Div(h.ID(c.id), f()) }
|
||||
if c.app.layout != nil {
|
||||
c.view = func() h.H { return h.Div(h.ID(c.id), c.app.layout(f)) }
|
||||
} else {
|
||||
c.view = func() h.H { return h.Div(h.ID(c.id), f()) }
|
||||
}
|
||||
}
|
||||
|
||||
// Component registers a subcontext that has self contained data, actions and signals.
|
||||
@@ -130,17 +138,19 @@ func (c *Context) getAction(id string) (actionEntry, error) {
|
||||
return actionEntry{}, fmt.Errorf("action '%s' not found", id)
|
||||
}
|
||||
|
||||
// OnInterval starts a go routine that sets a time.Ticker with the given duration and executes
|
||||
// the given handler func() on every tick. Use *Routine.UpdateInterval to update the interval.
|
||||
func (c *Context) OnInterval(duration time.Duration, handler func()) *OnIntervalRoutine {
|
||||
var cn chan struct{}
|
||||
if c.isComponent() { // components use the chan on the parent page ctx
|
||||
cn = c.parentPageCtx.ctxDisposedChan
|
||||
// OnInterval starts a goroutine that executes handler on every tick of the given duration.
|
||||
// The goroutine is tied to the context lifecycle and will stop when the context is disposed.
|
||||
// Returns a func() that stops the interval when called.
|
||||
func (c *Context) OnInterval(duration time.Duration, handler func()) func() {
|
||||
var disposeCh, pageCh chan struct{}
|
||||
if c.isComponent() {
|
||||
disposeCh = c.parentPageCtx.ctxDisposedChan
|
||||
pageCh = c.parentPageCtx.pageStopChan
|
||||
} else {
|
||||
cn = c.ctxDisposedChan
|
||||
disposeCh = c.ctxDisposedChan
|
||||
pageCh = c.pageStopChan
|
||||
}
|
||||
r := newOnIntervalRoutine(cn, duration, handler)
|
||||
return r
|
||||
return newOnInterval(disposeCh, pageCh, duration, handler)
|
||||
}
|
||||
|
||||
// Signal creates a reactive signal and initializes it with the given value.
|
||||
@@ -263,15 +273,22 @@ func (c *Context) sendPatch(p patch) {
|
||||
// Sync pushes the current view state and signal changes to the browser immediately
|
||||
// over the live SSE event stream.
|
||||
func (c *Context) Sync() {
|
||||
c.syncView(false)
|
||||
}
|
||||
|
||||
func (c *Context) syncView(viewTransition bool) {
|
||||
elemsPatch := new(bytes.Buffer)
|
||||
if err := c.view().Render(elemsPatch); err != nil {
|
||||
c.app.logErr(c, "sync view failed: %v", err)
|
||||
return
|
||||
}
|
||||
c.sendPatch(patch{patchTypeElements, elemsPatch.String()})
|
||||
typ := patchType(patchTypeElements)
|
||||
if viewTransition {
|
||||
typ = patchTypeElementsWithVT
|
||||
}
|
||||
c.sendPatch(patch{typ, elemsPatch.String()})
|
||||
|
||||
updatedSigs := c.prepareSignalsForPatch()
|
||||
|
||||
if len(updatedSigs) != 0 {
|
||||
outgoingSigs, _ := json.Marshal(updatedSigs)
|
||||
c.sendPatch(patch{patchTypeSignals, string(outgoingSigs)})
|
||||
@@ -368,6 +385,46 @@ func (c *Context) ReplaceURLf(format string, a ...any) {
|
||||
c.ReplaceURL(fmt.Sprintf(format, a...))
|
||||
}
|
||||
|
||||
// resetPageState tears down page-specific state (intervals, subscriptions,
|
||||
// actions, signals, fields) without disposing the context itself. The SSE
|
||||
// connection and context lifetime are unaffected.
|
||||
func (c *Context) resetPageState() {
|
||||
close(c.pageStopChan)
|
||||
c.unsubscribeAll()
|
||||
c.mu.Lock()
|
||||
c.actionRegistry = make(map[string]actionEntry)
|
||||
c.signals = new(sync.Map)
|
||||
c.fields = nil
|
||||
c.pageStopChan = make(chan struct{})
|
||||
c.mu.Unlock()
|
||||
}
|
||||
|
||||
// Navigate performs an SPA navigation to the given path. It resets page state,
|
||||
// runs the target page's init function (with middleware), and pushes the new
|
||||
// view over the existing SSE connection with a view transition animation.
|
||||
// If popstate is true, replaceState is used instead of pushState.
|
||||
func (c *Context) Navigate(path string, popstate bool) {
|
||||
c.navMu.Lock()
|
||||
defer c.navMu.Unlock()
|
||||
|
||||
route, initFn, params := c.app.matchRoute(path)
|
||||
if initFn == nil {
|
||||
c.Redirect(path)
|
||||
return
|
||||
}
|
||||
c.resetPageState()
|
||||
c.route = route
|
||||
c.injectRouteParams(params)
|
||||
initFn(c)
|
||||
c.syncView(true)
|
||||
safe := strings.NewReplacer(`\`, `\\`, `'`, `\'`).Replace(path)
|
||||
if popstate {
|
||||
c.ExecScript(fmt.Sprintf("history.replaceState({},'','%s')", safe))
|
||||
} else {
|
||||
c.ExecScript(fmt.Sprintf("history.pushState({},'','%s')", safe))
|
||||
}
|
||||
}
|
||||
|
||||
// dispose idempotently tears down this context: unsubscribes all pubsub
|
||||
// subscriptions and closes ctxDisposedChan to stop routines and exit the SSE loop.
|
||||
func (c *Context) dispose() {
|
||||
@@ -378,7 +435,7 @@ func (c *Context) dispose() {
|
||||
}
|
||||
|
||||
// stopAllRoutines closes ctxDisposedChan, broadcasting to all listening
|
||||
// goroutines (OnIntervalRoutine, SSE loop) that this context is done.
|
||||
// goroutines (OnInterval, SSE loop) that this context is done.
|
||||
func (c *Context) stopAllRoutines() {
|
||||
select {
|
||||
case <-c.ctxDisposedChan:
|
||||
@@ -480,6 +537,50 @@ func (c *Context) unsubscribeAll() {
|
||||
}
|
||||
}
|
||||
|
||||
// Field creates a signal with validation rules attached.
|
||||
// The initial value seeds both the signal and the reset target.
|
||||
// The field is tracked on the context so ValidateAll/ResetFields
|
||||
// can operate on all fields by default.
|
||||
func (c *Context) Field(initial any, rules ...Rule) *Field {
|
||||
f := &Field{
|
||||
signal: c.Signal(initial),
|
||||
rules: rules,
|
||||
initialVal: initial,
|
||||
}
|
||||
target := c
|
||||
if c.isComponent() {
|
||||
target = c.parentPageCtx
|
||||
}
|
||||
target.fields = append(target.fields, f)
|
||||
return f
|
||||
}
|
||||
|
||||
// ValidateAll runs Validate on each field, returning true only if all pass.
|
||||
// With no arguments it validates every field tracked on this context.
|
||||
func (c *Context) ValidateAll(fields ...*Field) bool {
|
||||
if len(fields) == 0 {
|
||||
fields = c.fields
|
||||
}
|
||||
ok := true
|
||||
for _, f := range fields {
|
||||
if !f.Validate() {
|
||||
ok = false
|
||||
}
|
||||
}
|
||||
return ok
|
||||
}
|
||||
|
||||
// ResetFields resets each field to its initial value and clears errors.
|
||||
// With no arguments it resets every field tracked on this context.
|
||||
func (c *Context) ResetFields(fields ...*Field) {
|
||||
if len(fields) == 0 {
|
||||
fields = c.fields
|
||||
}
|
||||
for _, f := range fields {
|
||||
f.Reset()
|
||||
}
|
||||
}
|
||||
|
||||
func newContext(id string, route string, v *V) *Context {
|
||||
if v == nil {
|
||||
panic("create context failed: app pointer is nil")
|
||||
@@ -494,8 +595,9 @@ func newContext(id string, route string, v *V) *Context {
|
||||
actionLimiter: newLimiter(v.actionRateLimit, defaultActionRate, defaultActionBurst),
|
||||
actionRegistry: make(map[string]actionEntry),
|
||||
signals: new(sync.Map),
|
||||
patchChan: make(chan patch, 1),
|
||||
patchChan: make(chan patch, 8),
|
||||
ctxDisposedChan: make(chan struct{}, 1),
|
||||
pageStopChan: make(chan struct{}),
|
||||
createdAt: time.Now(),
|
||||
}
|
||||
}
|
||||
|
||||
58
field.go
Normal file
58
field.go
Normal file
@@ -0,0 +1,58 @@
|
||||
package via
|
||||
|
||||
// Field is a signal with built-in validation rules and error state.
|
||||
// It embeds *signal, so all signal methods (Bind, String, Int, Bool, SetValue, Text, ID)
|
||||
// work transparently.
|
||||
type Field struct {
|
||||
*signal
|
||||
rules []Rule
|
||||
errors []string
|
||||
initialVal any
|
||||
}
|
||||
|
||||
// Validate runs all rules against the current value.
|
||||
// Clears previous errors, populates new ones, returns true if all rules pass.
|
||||
func (f *Field) Validate() bool {
|
||||
f.errors = nil
|
||||
val := f.String()
|
||||
for _, r := range f.rules {
|
||||
if err := r.validate(val); err != nil {
|
||||
f.errors = append(f.errors, err.Error())
|
||||
}
|
||||
}
|
||||
return len(f.errors) == 0
|
||||
}
|
||||
|
||||
// HasError returns true if this field has any validation errors.
|
||||
func (f *Field) HasError() bool {
|
||||
return len(f.errors) > 0
|
||||
}
|
||||
|
||||
// FirstError returns the first validation error message, or "" if valid.
|
||||
func (f *Field) FirstError() string {
|
||||
if len(f.errors) > 0 {
|
||||
return f.errors[0]
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
// Errors returns all current validation error messages.
|
||||
func (f *Field) Errors() []string {
|
||||
return f.errors
|
||||
}
|
||||
|
||||
// AddError manually adds an error message (useful for server-side or cross-field validation).
|
||||
func (f *Field) AddError(msg string) {
|
||||
f.errors = append(f.errors, msg)
|
||||
}
|
||||
|
||||
// ClearErrors removes all validation errors from this field.
|
||||
func (f *Field) ClearErrors() {
|
||||
f.errors = nil
|
||||
}
|
||||
|
||||
// Reset restores the field value to its initial value and clears all errors.
|
||||
func (f *Field) Reset() {
|
||||
f.SetValue(f.initialVal)
|
||||
f.errors = nil
|
||||
}
|
||||
206
field_test.go
Normal file
206
field_test.go
Normal file
@@ -0,0 +1,206 @@
|
||||
package via
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/ryanhamamura/via/h"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func newTestField(initial any, rules ...Rule) *Field {
|
||||
v := New()
|
||||
var f *Field
|
||||
v.Page("/", func(c *Context) {
|
||||
f = c.Field(initial, rules...)
|
||||
c.View(func() h.H { return h.Div() })
|
||||
})
|
||||
return f
|
||||
}
|
||||
|
||||
func TestFieldCreation(t *testing.T) {
|
||||
f := newTestField("hello", Required())
|
||||
assert.Equal(t, "hello", f.String())
|
||||
assert.NotEmpty(t, f.ID())
|
||||
}
|
||||
|
||||
func TestFieldSignalDelegation(t *testing.T) {
|
||||
f := newTestField(42)
|
||||
assert.Equal(t, "42", f.String())
|
||||
assert.Equal(t, 42, f.Int())
|
||||
|
||||
f.SetValue("new")
|
||||
assert.Equal(t, "new", f.String())
|
||||
|
||||
// Bind returns an h.H element
|
||||
assert.NotNil(t, f.Bind())
|
||||
}
|
||||
|
||||
func TestFieldValidateSingleRule(t *testing.T) {
|
||||
f := newTestField("", Required())
|
||||
assert.False(t, f.Validate())
|
||||
assert.True(t, f.HasError())
|
||||
assert.Equal(t, "This field is required", f.FirstError())
|
||||
|
||||
f.SetValue("ok")
|
||||
assert.True(t, f.Validate())
|
||||
assert.False(t, f.HasError())
|
||||
assert.Equal(t, "", f.FirstError())
|
||||
}
|
||||
|
||||
func TestFieldValidateMultipleRules(t *testing.T) {
|
||||
f := newTestField("ab", Required(), MinLen(3))
|
||||
assert.False(t, f.Validate())
|
||||
errs := f.Errors()
|
||||
assert.Len(t, errs, 1)
|
||||
assert.Equal(t, "Must be at least 3 characters", errs[0])
|
||||
|
||||
f.SetValue("")
|
||||
assert.False(t, f.Validate())
|
||||
errs = f.Errors()
|
||||
assert.Len(t, errs, 2)
|
||||
}
|
||||
|
||||
func TestFieldErrors(t *testing.T) {
|
||||
f := newTestField("")
|
||||
assert.Nil(t, f.Errors())
|
||||
assert.False(t, f.HasError())
|
||||
assert.Equal(t, "", f.FirstError())
|
||||
}
|
||||
|
||||
func TestFieldAddError(t *testing.T) {
|
||||
f := newTestField("ok")
|
||||
f.AddError("username taken")
|
||||
assert.True(t, f.HasError())
|
||||
assert.Equal(t, "username taken", f.FirstError())
|
||||
assert.Len(t, f.Errors(), 1)
|
||||
}
|
||||
|
||||
func TestFieldClearErrors(t *testing.T) {
|
||||
f := newTestField("", Required())
|
||||
f.Validate()
|
||||
assert.True(t, f.HasError())
|
||||
f.ClearErrors()
|
||||
assert.False(t, f.HasError())
|
||||
}
|
||||
|
||||
func TestFieldReset(t *testing.T) {
|
||||
f := newTestField("initial", Required(), MinLen(3))
|
||||
f.SetValue("changed")
|
||||
f.AddError("some error")
|
||||
|
||||
f.Reset()
|
||||
assert.Equal(t, "initial", f.String())
|
||||
assert.False(t, f.HasError())
|
||||
}
|
||||
|
||||
func TestValidateAll(t *testing.T) {
|
||||
v := New()
|
||||
v.Page("/", func(c *Context) {
|
||||
c.Field("", Required(), MinLen(3))
|
||||
c.Field("", Required(), Email())
|
||||
c.View(func() h.H { return h.Div() })
|
||||
|
||||
// both empty → both fail
|
||||
assert.False(t, c.ValidateAll())
|
||||
})
|
||||
|
||||
v2 := New()
|
||||
v2.Page("/", func(c *Context) {
|
||||
c.Field("joe", Required(), MinLen(3))
|
||||
c.Field("joe@x.com", Required(), Email())
|
||||
c.View(func() h.H { return h.Div() })
|
||||
|
||||
assert.True(t, c.ValidateAll())
|
||||
})
|
||||
}
|
||||
|
||||
func TestValidateAllPartialFailure(t *testing.T) {
|
||||
v := New()
|
||||
v.Page("/", func(c *Context) {
|
||||
good := c.Field("valid", Required())
|
||||
bad := c.Field("", Required())
|
||||
c.View(func() h.H { return h.Div() })
|
||||
|
||||
ok := c.ValidateAll()
|
||||
assert.False(t, ok)
|
||||
assert.False(t, good.HasError())
|
||||
assert.True(t, bad.HasError())
|
||||
})
|
||||
}
|
||||
|
||||
func TestValidateAllSelectiveArgs(t *testing.T) {
|
||||
v := New()
|
||||
v.Page("/", func(c *Context) {
|
||||
a := c.Field("", Required())
|
||||
b := c.Field("ok", Required())
|
||||
cField := c.Field("", Required())
|
||||
c.View(func() h.H { return h.Div() })
|
||||
|
||||
// only validate a and b — cField should be untouched
|
||||
ok := c.ValidateAll(a, b)
|
||||
assert.False(t, ok)
|
||||
assert.True(t, a.HasError())
|
||||
assert.False(t, b.HasError())
|
||||
assert.False(t, cField.HasError(), "unselected field should not be validated")
|
||||
})
|
||||
}
|
||||
|
||||
func TestResetFields(t *testing.T) {
|
||||
v := New()
|
||||
v.Page("/", func(c *Context) {
|
||||
a := c.Field("a", Required())
|
||||
b := c.Field("b", Required())
|
||||
c.View(func() h.H { return h.Div() })
|
||||
|
||||
a.SetValue("changed-a")
|
||||
b.SetValue("changed-b")
|
||||
a.AddError("err")
|
||||
|
||||
c.ResetFields()
|
||||
assert.Equal(t, "a", a.String())
|
||||
assert.Equal(t, "b", b.String())
|
||||
assert.False(t, a.HasError())
|
||||
})
|
||||
}
|
||||
|
||||
func TestResetFieldsSelectiveArgs(t *testing.T) {
|
||||
v := New()
|
||||
v.Page("/", func(c *Context) {
|
||||
a := c.Field("a")
|
||||
b := c.Field("b")
|
||||
c.View(func() h.H { return h.Div() })
|
||||
|
||||
a.SetValue("changed-a")
|
||||
b.SetValue("changed-b")
|
||||
|
||||
// only reset a
|
||||
c.ResetFields(a)
|
||||
assert.Equal(t, "a", a.String())
|
||||
assert.Equal(t, "changed-b", b.String(), "unselected field should not be reset")
|
||||
})
|
||||
}
|
||||
|
||||
func TestFieldValidateClearsPreviousErrors(t *testing.T) {
|
||||
f := newTestField("", Required())
|
||||
f.Validate()
|
||||
assert.True(t, f.HasError())
|
||||
|
||||
f.SetValue("ok")
|
||||
f.Validate()
|
||||
assert.False(t, f.HasError())
|
||||
}
|
||||
|
||||
func TestFieldCustomValidator(t *testing.T) {
|
||||
f := newTestField("bad", Custom(func(val string) error {
|
||||
if val == "bad" {
|
||||
return fmt.Errorf("no bad words")
|
||||
}
|
||||
return nil
|
||||
}))
|
||||
assert.False(t, f.Validate())
|
||||
assert.Equal(t, "no bad words", f.FirstError())
|
||||
|
||||
f.SetValue("good")
|
||||
assert.True(t, f.Validate())
|
||||
}
|
||||
1
go.mod
1
go.mod
@@ -38,6 +38,5 @@ require (
|
||||
github.com/valyala/bytebufferpool v1.0.0 // indirect
|
||||
golang.org/x/crypto v0.45.0 // indirect
|
||||
golang.org/x/sys v0.38.0 // indirect
|
||||
golang.org/x/time v0.14.0
|
||||
gopkg.in/yaml.v3 v3.0.1 // indirect
|
||||
)
|
||||
|
||||
@@ -11,3 +11,11 @@ func DataEffect(expression string) H {
|
||||
func DataIgnoreMorph() H {
|
||||
return Attr("data-ignore-morph")
|
||||
}
|
||||
|
||||
// DataViewTransition sets the view-transition-name CSS property on an element
|
||||
// via an inline style. Elements with matching names animate between pages
|
||||
// during SPA navigation. If the element also needs other inline styles,
|
||||
// include view-transition-name directly in the Style() call instead.
|
||||
func DataViewTransition(name string) H {
|
||||
return Attr("style", "view-transition-name: "+name)
|
||||
}
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
"math/rand"
|
||||
"sync"
|
||||
@@ -9,7 +8,6 @@ import (
|
||||
|
||||
"github.com/ryanhamamura/via"
|
||||
"github.com/ryanhamamura/via/h"
|
||||
"github.com/ryanhamamura/via/vianats"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -36,15 +34,15 @@ func (u *UserInfo) Avatar() h.H {
|
||||
var roomNames = []string{"Go", "Rust", "Python", "JavaScript", "Clojure"}
|
||||
|
||||
func main() {
|
||||
ctx := context.Background()
|
||||
v := via.New()
|
||||
v.Config(via.Options{
|
||||
DevMode: true,
|
||||
DocumentTitle: "NATS Chat",
|
||||
LogLevel: via.LogLevelInfo,
|
||||
ServerAddress: ":7331",
|
||||
})
|
||||
|
||||
ps, err := vianats.New(ctx, "./data/nats")
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to start embedded NATS: %v", err)
|
||||
}
|
||||
defer ps.Close()
|
||||
|
||||
err = vianats.EnsureStream(ps, vianats.StreamConfig{
|
||||
err := via.EnsureStream(v, via.StreamConfig{
|
||||
Name: "CHAT",
|
||||
Subjects: []string{"chat.>"},
|
||||
MaxMsgs: 1000,
|
||||
@@ -54,15 +52,6 @@ func main() {
|
||||
log.Fatalf("Failed to ensure stream: %v", err)
|
||||
}
|
||||
|
||||
v := via.New()
|
||||
v.Config(via.Options{
|
||||
DevMode: true,
|
||||
DocumentTitle: "NATS Chat",
|
||||
LogLevel: via.LogLevelInfo,
|
||||
ServerAddress: ":7331",
|
||||
PubSub: ps,
|
||||
})
|
||||
|
||||
v.AppendToHead(
|
||||
h.Link(h.Rel("stylesheet"), h.Href("https://cdn.jsdelivr.net/npm/@picocss/pico@2/css/pico.min.css")),
|
||||
h.StyleEl(h.Raw(`
|
||||
@@ -148,7 +137,7 @@ func main() {
|
||||
subject := "chat.room." + room
|
||||
|
||||
// Replay history from JetStream
|
||||
if hist, err := vianats.ReplayHistory[ChatMessage](ps, subject, 50); err == nil {
|
||||
if hist, err := via.ReplayHistory[ChatMessage](v, subject, 50); err == nil {
|
||||
messages = hist
|
||||
}
|
||||
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"fmt"
|
||||
"html"
|
||||
@@ -11,7 +10,6 @@ import (
|
||||
|
||||
"github.com/ryanhamamura/via"
|
||||
"github.com/ryanhamamura/via/h"
|
||||
"github.com/ryanhamamura/via/vianats"
|
||||
)
|
||||
|
||||
var WithSignal = via.WithSignal
|
||||
@@ -49,15 +47,15 @@ func findBookmark(id string) (Bookmark, int) {
|
||||
}
|
||||
|
||||
func main() {
|
||||
ctx := context.Background()
|
||||
v := via.New()
|
||||
v.Config(via.Options{
|
||||
DevMode: true,
|
||||
DocumentTitle: "Bookmarks",
|
||||
LogLevel: via.LogLevelInfo,
|
||||
ServerAddress: ":7331",
|
||||
})
|
||||
|
||||
ps, err := vianats.New(ctx, "./data/nats")
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to start embedded NATS: %v", err)
|
||||
}
|
||||
defer ps.Close()
|
||||
|
||||
err = vianats.EnsureStream(ps, vianats.StreamConfig{
|
||||
err := via.EnsureStream(v, via.StreamConfig{
|
||||
Name: "BOOKMARKS",
|
||||
Subjects: []string{"bookmarks.>"},
|
||||
MaxMsgs: 1000,
|
||||
@@ -67,15 +65,6 @@ func main() {
|
||||
log.Fatalf("Failed to ensure stream: %v", err)
|
||||
}
|
||||
|
||||
v := via.New()
|
||||
v.Config(via.Options{
|
||||
DevMode: true,
|
||||
DocumentTitle: "Bookmarks",
|
||||
LogLevel: via.LogLevelInfo,
|
||||
ServerAddress: ":7331",
|
||||
PubSub: ps,
|
||||
})
|
||||
|
||||
v.AppendToHead(
|
||||
h.Link(h.Rel("stylesheet"), h.Href("https://cdn.jsdelivr.net/npm/daisyui@4/dist/full.min.css")),
|
||||
h.Script(h.Src("https://cdn.tailwindcss.com")),
|
||||
|
||||
@@ -37,29 +37,33 @@ func main() {
|
||||
return 1000 / time.Duration(refreshRate.Int()) * time.Millisecond
|
||||
}
|
||||
|
||||
updateData := c.OnInterval(computedTickDuration(), func() {
|
||||
ts := time.Now().UnixMilli()
|
||||
val := rand.ExpFloat64() * 10
|
||||
var stopUpdate func()
|
||||
startInterval := func() {
|
||||
stopUpdate = c.OnInterval(computedTickDuration(), func() {
|
||||
ts := time.Now().UnixMilli()
|
||||
val := rand.ExpFloat64() * 10
|
||||
|
||||
c.ExecScript(fmt.Sprintf(`
|
||||
if (myChart) {
|
||||
myChart.appendData({seriesIndex: 0, data: [[%d, %f]]});
|
||||
myChart.setOption({},{notMerge:false,lazyUpdate:true});
|
||||
};
|
||||
`, ts, val))
|
||||
})
|
||||
updateData.Start()
|
||||
c.ExecScript(fmt.Sprintf(`
|
||||
if (myChart) {
|
||||
myChart.appendData({seriesIndex: 0, data: [[%d, %f]]});
|
||||
myChart.setOption({},{notMerge:false,lazyUpdate:true});
|
||||
};
|
||||
`, ts, val))
|
||||
})
|
||||
}
|
||||
startInterval()
|
||||
|
||||
updateRefreshRate := c.Action(func() {
|
||||
updateData.UpdateInterval(computedTickDuration())
|
||||
stopUpdate()
|
||||
startInterval()
|
||||
})
|
||||
|
||||
toggleIsLive := c.Action(func() {
|
||||
isLive = isLiveSig.Bool()
|
||||
if isLive {
|
||||
updateData.Start()
|
||||
startInterval()
|
||||
} else {
|
||||
updateData.Stop()
|
||||
stopUpdate()
|
||||
}
|
||||
})
|
||||
c.View(func() h.H {
|
||||
|
||||
87
internal/examples/signup/main.go
Normal file
87
internal/examples/signup/main.go
Normal file
@@ -0,0 +1,87 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"github.com/ryanhamamura/via"
|
||||
"github.com/ryanhamamura/via/h"
|
||||
)
|
||||
|
||||
func main() {
|
||||
v := via.New()
|
||||
v.Config(via.Options{
|
||||
DocumentTitle: "Signup",
|
||||
ServerAddress: ":8080",
|
||||
})
|
||||
|
||||
v.AppendToHead(h.StyleEl(h.Raw(`
|
||||
body { font-family: system-ui, sans-serif; max-width: 420px; margin: 2rem auto; padding: 0 1rem; }
|
||||
label { display: block; font-weight: 600; margin-top: 1rem; }
|
||||
input { display: block; width: 100%; padding: 0.4rem; margin-top: 0.25rem; box-sizing: border-box; }
|
||||
.error { color: #c00; font-size: 0.85rem; margin-top: 0.2rem; }
|
||||
.success { color: #080; margin-top: 1rem; }
|
||||
.actions { margin-top: 1.5rem; display: flex; gap: 0.5rem; }
|
||||
`)))
|
||||
|
||||
v.Page("/", func(c *via.Context) {
|
||||
username := c.Field("", via.Required(), via.MinLen(3), via.MaxLen(20))
|
||||
email := c.Field("", via.Required(), via.Email())
|
||||
age := c.Field("", via.Required(), via.Min(13), via.Max(120))
|
||||
// Optional field — only validated when non-empty
|
||||
website := c.Field("", via.Pattern(`^$|^https?://\S+$`, "Must be a valid URL"))
|
||||
|
||||
var success string
|
||||
|
||||
signup := c.Action(func() {
|
||||
success = ""
|
||||
if !c.ValidateAll() {
|
||||
c.Sync()
|
||||
return
|
||||
}
|
||||
// Server-side check
|
||||
if username.String() == "admin" {
|
||||
username.AddError("Username is already taken")
|
||||
c.Sync()
|
||||
return
|
||||
}
|
||||
success = "Account created for " + username.String() + "!"
|
||||
c.ResetFields()
|
||||
c.Sync()
|
||||
})
|
||||
|
||||
reset := c.Action(func() {
|
||||
success = ""
|
||||
c.ResetFields()
|
||||
c.Sync()
|
||||
})
|
||||
|
||||
c.View(func() h.H {
|
||||
return h.Div(
|
||||
h.H1(h.Text("Sign Up")),
|
||||
|
||||
h.Label(h.Text("Username")),
|
||||
h.Input(h.Type("text"), h.Placeholder("pick a username"), username.Bind()),
|
||||
h.If(username.HasError(), h.Div(h.Class("error"), h.Text(username.FirstError()))),
|
||||
|
||||
h.Label(h.Text("Email")),
|
||||
h.Input(h.Type("email"), h.Placeholder("you@example.com"), email.Bind()),
|
||||
h.If(email.HasError(), h.Div(h.Class("error"), h.Text(email.FirstError()))),
|
||||
|
||||
h.Label(h.Text("Age")),
|
||||
h.Input(h.Type("number"), h.Placeholder("your age"), age.Bind()),
|
||||
h.If(age.HasError(), h.Div(h.Class("error"), h.Text(age.FirstError()))),
|
||||
|
||||
h.Label(h.Text("Website (optional)")),
|
||||
h.Input(h.Type("url"), h.Placeholder("https://example.com"), website.Bind()),
|
||||
h.If(website.HasError(), h.Div(h.Class("error"), h.Text(website.FirstError()))),
|
||||
|
||||
h.Div(h.Class("actions"),
|
||||
h.Button(h.Text("Sign Up"), signup.OnClick()),
|
||||
h.Button(h.Text("Reset"), reset.OnClick()),
|
||||
),
|
||||
|
||||
h.If(success != "", h.P(h.Class("success"), h.Text(success))),
|
||||
)
|
||||
})
|
||||
})
|
||||
|
||||
v.Start()
|
||||
}
|
||||
91
internal/examples/spa/main.go
Normal file
91
internal/examples/spa/main.go
Normal file
@@ -0,0 +1,91 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/ryanhamamura/via"
|
||||
. "github.com/ryanhamamura/via/h"
|
||||
)
|
||||
|
||||
func main() {
|
||||
v := via.New()
|
||||
v.Config(via.Options{
|
||||
DocumentTitle: "SPA Navigation",
|
||||
ServerAddress: ":7331",
|
||||
})
|
||||
|
||||
v.AppendToHead(
|
||||
Raw(`<link rel="preconnect" href="https://fonts.googleapis.com">`),
|
||||
Raw(`<link rel="preconnect" href="https://fonts.gstatic.com" crossorigin>`),
|
||||
Raw(`<link href="https://fonts.googleapis.com/css2?family=Inter:wght@400;600;700&display=swap" rel="stylesheet">`),
|
||||
Raw(`<style>body{font-family:'Inter',sans-serif;margin:0;background:#111;color:#eee}</style>`),
|
||||
)
|
||||
|
||||
v.Layout(func(content func() H) H {
|
||||
return Div(
|
||||
Nav(
|
||||
Style("display:flex;gap:1rem;padding:1rem;background:#222;"),
|
||||
A(Href("/"), Text("Home"), Style("color:#fff")),
|
||||
A(Href("/counter"), Text("Counter"), Style("color:#fff")),
|
||||
A(Href("/clock"), Text("Clock"), Style("color:#fff")),
|
||||
A(Href("https://github.com"), Text("GitHub (external)"), Style("color:#888")),
|
||||
A(Href("/"), Text("Full Reload"), Attr("data-via-no-boost"), Style("color:#f88")),
|
||||
),
|
||||
Main(Style("padding:1rem"), content()),
|
||||
)
|
||||
})
|
||||
|
||||
// Home page
|
||||
v.Page("/", func(c *via.Context) {
|
||||
goCounter := c.Action(func() { c.Navigate("/counter", false) })
|
||||
|
||||
c.View(func() H {
|
||||
return Div(
|
||||
H1(Text("Home"), DataViewTransition("page-title")),
|
||||
P(Text("Click the nav links above — no page reload, no white flash.")),
|
||||
P(Text("Or navigate programmatically:")),
|
||||
Button(Text("Go to Counter"), goCounter.OnClick()),
|
||||
)
|
||||
})
|
||||
})
|
||||
|
||||
// Counter page — demonstrates signals and actions survive within a page,
|
||||
// but reset on navigate away and back.
|
||||
v.Page("/counter", func(c *via.Context) {
|
||||
count := 0
|
||||
increment := c.Action(func() { count++; c.Sync() })
|
||||
goHome := c.Action(func() { c.Navigate("/", false) })
|
||||
|
||||
c.View(func() H {
|
||||
return Div(
|
||||
H1(Text("Counter"), DataViewTransition("page-title")),
|
||||
P(Textf("Count: %d", count)),
|
||||
Button(Text("+1"), increment.OnClick()),
|
||||
Button(Text("Go Home"), goHome.OnClick(), Style("margin-left:0.5rem")),
|
||||
)
|
||||
})
|
||||
})
|
||||
|
||||
// Clock page — demonstrates OnInterval cleanup on navigate.
|
||||
v.Page("/clock", func(c *via.Context) {
|
||||
now := time.Now().Format("15:04:05")
|
||||
c.OnInterval(time.Second, func() {
|
||||
now = time.Now().Format("15:04:05")
|
||||
c.Sync()
|
||||
})
|
||||
|
||||
c.View(func() H {
|
||||
return Div(
|
||||
H1(Text("Clock"), DataViewTransition("page-title")),
|
||||
P(Text("This page has an OnInterval that ticks every second.")),
|
||||
P(Textf("Current time: %s", now)),
|
||||
P(Text("Navigate away and back — the old interval stops, a new one starts.")),
|
||||
P(Textf("Proof this is a fresh page init: random = %d", time.Now().UnixNano()%1000)),
|
||||
)
|
||||
})
|
||||
})
|
||||
|
||||
fmt.Println("SPA example running at http://localhost:7331")
|
||||
v.Start()
|
||||
}
|
||||
190
nats.go
Normal file
190
nats.go
Normal file
@@ -0,0 +1,190 @@
|
||||
package via
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/delaneyj/toolbelt/embeddednats"
|
||||
"github.com/nats-io/nats.go"
|
||||
)
|
||||
|
||||
// defaultNATS is the process-scoped embedded NATS server.
|
||||
type defaultNATS struct {
|
||||
server *embeddednats.Server
|
||||
nc *nats.Conn
|
||||
js nats.JetStreamContext
|
||||
cancel context.CancelFunc
|
||||
dataDir string
|
||||
}
|
||||
|
||||
var (
|
||||
sharedNATS *defaultNATS
|
||||
sharedNATSOnce sync.Once
|
||||
sharedNATSErr error
|
||||
)
|
||||
|
||||
// getSharedNATS returns a process-level singleton embedded NATS server.
|
||||
// The server starts once and is reused across all V instances.
|
||||
func getSharedNATS() (*defaultNATS, error) {
|
||||
sharedNATSOnce.Do(func() {
|
||||
sharedNATS, sharedNATSErr = startDefaultNATS()
|
||||
})
|
||||
return sharedNATS, sharedNATSErr
|
||||
}
|
||||
|
||||
func startDefaultNATS() (dn *defaultNATS, err error) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
err = fmt.Errorf("nats server panic: %v", r)
|
||||
}
|
||||
}()
|
||||
|
||||
dataDir, err := os.MkdirTemp("", "via-nats-*")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create temp dir: %w", err)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
ns, err := embeddednats.New(ctx, embeddednats.WithDirectory(dataDir))
|
||||
if err != nil {
|
||||
cancel()
|
||||
os.RemoveAll(dataDir)
|
||||
return nil, fmt.Errorf("start embedded nats: %w", err)
|
||||
}
|
||||
ns.WaitForServer()
|
||||
|
||||
nc, err := ns.Client()
|
||||
if err != nil {
|
||||
ns.Close()
|
||||
cancel()
|
||||
os.RemoveAll(dataDir)
|
||||
return nil, fmt.Errorf("connect nats client: %w", err)
|
||||
}
|
||||
|
||||
js, err := nc.JetStream()
|
||||
if err != nil {
|
||||
nc.Close()
|
||||
ns.Close()
|
||||
cancel()
|
||||
os.RemoveAll(dataDir)
|
||||
return nil, fmt.Errorf("init jetstream: %w", err)
|
||||
}
|
||||
|
||||
return &defaultNATS{
|
||||
server: ns,
|
||||
nc: nc,
|
||||
js: js,
|
||||
cancel: cancel,
|
||||
dataDir: dataDir,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (n *defaultNATS) Publish(subject string, data []byte) error {
|
||||
return n.nc.Publish(subject, data)
|
||||
}
|
||||
|
||||
func (n *defaultNATS) Subscribe(subject string, handler func(data []byte)) (Subscription, error) {
|
||||
sub, err := n.nc.Subscribe(subject, func(msg *nats.Msg) {
|
||||
handler(msg.Data)
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return sub, nil
|
||||
}
|
||||
|
||||
// natsRef wraps a shared defaultNATS as a PubSub. Close is a no-op because
|
||||
// the underlying server is process-scoped and outlives individual V instances.
|
||||
type natsRef struct {
|
||||
dn *defaultNATS
|
||||
}
|
||||
|
||||
func (r *natsRef) Publish(subject string, data []byte) error {
|
||||
return r.dn.Publish(subject, data)
|
||||
}
|
||||
|
||||
func (r *natsRef) Subscribe(subject string, handler func(data []byte)) (Subscription, error) {
|
||||
return r.dn.Subscribe(subject, handler)
|
||||
}
|
||||
|
||||
func (r *natsRef) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// NATSConn returns the underlying NATS connection from the built-in embedded
|
||||
// server, or nil if a custom PubSub backend is in use.
|
||||
func (v *V) NATSConn() *nats.Conn {
|
||||
if v.defaultNATS != nil {
|
||||
return v.defaultNATS.nc
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// JetStream returns the JetStream context from the built-in embedded server,
|
||||
// or nil if a custom PubSub backend is in use.
|
||||
func (v *V) JetStream() nats.JetStreamContext {
|
||||
if v.defaultNATS != nil {
|
||||
return v.defaultNATS.js
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// StreamConfig holds the parameters for creating or updating a JetStream stream.
|
||||
type StreamConfig struct {
|
||||
Name string
|
||||
Subjects []string
|
||||
MaxMsgs int64
|
||||
MaxAge time.Duration
|
||||
}
|
||||
|
||||
// EnsureStream creates or updates a JetStream stream matching cfg.
|
||||
func EnsureStream(v *V, cfg StreamConfig) error {
|
||||
js := v.JetStream()
|
||||
if js == nil {
|
||||
return fmt.Errorf("jetstream not available")
|
||||
}
|
||||
_, err := js.AddStream(&nats.StreamConfig{
|
||||
Name: cfg.Name,
|
||||
Subjects: cfg.Subjects,
|
||||
Retention: nats.LimitsPolicy,
|
||||
MaxMsgs: cfg.MaxMsgs,
|
||||
MaxAge: cfg.MaxAge,
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
// ReplayHistory fetches the last limit messages from subject,
|
||||
// deserializing each as T. Returns an empty slice if nothing is available.
|
||||
func ReplayHistory[T any](v *V, subject string, limit int) ([]T, error) {
|
||||
js := v.JetStream()
|
||||
if js == nil {
|
||||
return nil, fmt.Errorf("jetstream not available")
|
||||
}
|
||||
sub, err := js.SubscribeSync(subject, nats.DeliverAll(), nats.OrderedConsumer())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer sub.Unsubscribe()
|
||||
|
||||
var msgs []T
|
||||
for {
|
||||
raw, err := sub.NextMsg(200 * time.Millisecond)
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
var msg T
|
||||
if json.Unmarshal(raw.Data, &msg) == nil {
|
||||
msgs = append(msgs, msg)
|
||||
}
|
||||
}
|
||||
|
||||
if limit > 0 && len(msgs) > limit {
|
||||
msgs = msgs[len(msgs)-limit:]
|
||||
}
|
||||
return msgs, nil
|
||||
}
|
||||
108
nats_test.go
108
nats_test.go
@@ -2,7 +2,6 @@ package via
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@@ -11,88 +10,36 @@ import (
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
type mockHandler struct {
|
||||
id int64
|
||||
fn func([]byte)
|
||||
active atomic.Bool
|
||||
}
|
||||
|
||||
// mockPubSub implements PubSub for testing without NATS.
|
||||
type mockPubSub struct {
|
||||
mu sync.Mutex
|
||||
subs map[string][]*mockHandler
|
||||
nextID atomic.Int64
|
||||
}
|
||||
|
||||
func newMockPubSub() *mockPubSub {
|
||||
return &mockPubSub{subs: make(map[string][]*mockHandler)}
|
||||
}
|
||||
|
||||
func (m *mockPubSub) Publish(subject string, data []byte) error {
|
||||
m.mu.Lock()
|
||||
handlers := make([]*mockHandler, len(m.subs[subject]))
|
||||
copy(handlers, m.subs[subject])
|
||||
m.mu.Unlock()
|
||||
for _, h := range handlers {
|
||||
if h.active.Load() {
|
||||
h.fn(data)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mockPubSub) Subscribe(subject string, handler func(data []byte)) (Subscription, error) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
mh := &mockHandler{
|
||||
id: m.nextID.Add(1),
|
||||
fn: handler,
|
||||
}
|
||||
mh.active.Store(true)
|
||||
m.subs[subject] = append(m.subs[subject], mh)
|
||||
return &mockSub{handler: mh}, nil
|
||||
}
|
||||
|
||||
func (m *mockPubSub) Close() error { return nil }
|
||||
|
||||
type mockSub struct {
|
||||
handler *mockHandler
|
||||
}
|
||||
|
||||
func (s *mockSub) Unsubscribe() error {
|
||||
s.handler.active.Store(false)
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestPubSub_RoundTrip(t *testing.T) {
|
||||
ps := newMockPubSub()
|
||||
v := New()
|
||||
v.Config(Options{PubSub: ps})
|
||||
defer v.Shutdown()
|
||||
|
||||
var received []byte
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
done := make(chan struct{})
|
||||
|
||||
c := newContext("test-ctx", "/", v)
|
||||
c.View(func() h.H { return h.Div() })
|
||||
|
||||
_, err := c.Subscribe("test.topic", func(data []byte) {
|
||||
received = data
|
||||
wg.Done()
|
||||
close(done)
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
err = c.Publish("test.topic", []byte("hello"))
|
||||
require.NoError(t, err)
|
||||
|
||||
wg.Wait()
|
||||
select {
|
||||
case <-done:
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("timed out waiting for message")
|
||||
}
|
||||
assert.Equal(t, []byte("hello"), received)
|
||||
}
|
||||
|
||||
func TestPubSub_MultipleSubscribers(t *testing.T) {
|
||||
ps := newMockPubSub()
|
||||
v := New()
|
||||
v.Config(Options{PubSub: ps})
|
||||
defer v.Shutdown()
|
||||
|
||||
var mu sync.Mutex
|
||||
var results []string
|
||||
@@ -119,7 +66,17 @@ func TestPubSub_MultipleSubscribers(t *testing.T) {
|
||||
})
|
||||
|
||||
c1.Publish("broadcast", []byte("msg"))
|
||||
wg.Wait()
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
wg.Wait()
|
||||
close(done)
|
||||
}()
|
||||
select {
|
||||
case <-done:
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("timed out waiting for messages")
|
||||
}
|
||||
|
||||
assert.Len(t, results, 2)
|
||||
assert.Contains(t, results, "c1:msg")
|
||||
@@ -127,9 +84,8 @@ func TestPubSub_MultipleSubscribers(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestPubSub_SubscriptionCleanupOnDispose(t *testing.T) {
|
||||
ps := newMockPubSub()
|
||||
v := New()
|
||||
v.Config(Options{PubSub: ps})
|
||||
defer v.Shutdown()
|
||||
|
||||
c := newContext("cleanup-ctx", "/", v)
|
||||
c.View(func() h.H { return h.Div() })
|
||||
@@ -144,9 +100,8 @@ func TestPubSub_SubscriptionCleanupOnDispose(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestPubSub_ManualUnsubscribe(t *testing.T) {
|
||||
ps := newMockPubSub()
|
||||
v := New()
|
||||
v.Config(Options{PubSub: ps})
|
||||
defer v.Shutdown()
|
||||
|
||||
c := newContext("unsub-ctx", "/", v)
|
||||
c.View(func() h.H { return h.Div() })
|
||||
@@ -160,28 +115,13 @@ func TestPubSub_ManualUnsubscribe(t *testing.T) {
|
||||
sub.Unsubscribe()
|
||||
|
||||
c.Publish("topic", []byte("ignored"))
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
assert.False(t, called)
|
||||
}
|
||||
|
||||
func TestPubSub_NoOpWhenNotConfigured(t *testing.T) {
|
||||
v := New()
|
||||
|
||||
c := newContext("noop-ctx", "/", v)
|
||||
c.View(func() h.H { return h.Div() })
|
||||
|
||||
err := c.Publish("topic", []byte("data"))
|
||||
assert.Error(t, err)
|
||||
|
||||
sub, err := c.Subscribe("topic", func(data []byte) {})
|
||||
assert.Error(t, err)
|
||||
assert.Nil(t, sub)
|
||||
}
|
||||
|
||||
func TestPubSub_NoOpDuringPanicCheck(t *testing.T) {
|
||||
ps := newMockPubSub()
|
||||
v := New()
|
||||
v.Config(Options{PubSub: ps})
|
||||
defer v.Shutdown()
|
||||
|
||||
// Panic-check context has id=""
|
||||
c := newContext("", "/", v)
|
||||
|
||||
51
navigate.js
Normal file
51
navigate.js
Normal file
@@ -0,0 +1,51 @@
|
||||
(function() {
|
||||
const meta = document.querySelector('meta[data-signals]');
|
||||
if (!meta) return;
|
||||
const raw = meta.getAttribute('data-signals');
|
||||
const parsed = JSON.parse(raw.replace(/'/g, '"'));
|
||||
const ctxID = parsed['via-ctx'];
|
||||
const csrf = parsed['via-csrf'];
|
||||
if (!ctxID || !csrf) return;
|
||||
|
||||
function navigate(url, popstate) {
|
||||
const params = new URLSearchParams({
|
||||
'via-ctx': ctxID,
|
||||
'via-csrf': csrf,
|
||||
'url': url,
|
||||
});
|
||||
if (popstate) params.set('popstate', '1');
|
||||
fetch('/_navigate', {
|
||||
method: 'POST',
|
||||
headers: {'Content-Type': 'application/x-www-form-urlencoded'},
|
||||
body: params.toString()
|
||||
}).then(function(res) {
|
||||
if (!res.ok) window.location.href = url;
|
||||
}).catch(function() {
|
||||
window.location.href = url;
|
||||
});
|
||||
}
|
||||
|
||||
document.addEventListener('click', function(e) {
|
||||
var el = e.target;
|
||||
while (el && el.tagName !== 'A') el = el.parentElement;
|
||||
if (!el) return;
|
||||
if (e.ctrlKey || e.metaKey || e.shiftKey || e.altKey) return;
|
||||
if (el.hasAttribute('target')) return;
|
||||
if (el.hasAttribute('data-via-no-boost')) return;
|
||||
var href = el.getAttribute('href');
|
||||
if (!href || href.startsWith('#')) return;
|
||||
try {
|
||||
var url = new URL(href, window.location.origin);
|
||||
if (url.origin !== window.location.origin) return;
|
||||
e.preventDefault();
|
||||
navigate(url.pathname + url.search + url.hash);
|
||||
} catch(_) {}
|
||||
});
|
||||
|
||||
var ready = false;
|
||||
window.addEventListener('popstate', function() {
|
||||
if (!ready) return;
|
||||
navigate(window.location.pathname + window.location.search + window.location.hash, true);
|
||||
});
|
||||
setTimeout(function() { ready = true; }, 0);
|
||||
})();
|
||||
@@ -1,7 +1,8 @@
|
||||
package via
|
||||
|
||||
// PubSub is an interface for publish/subscribe messaging backends.
|
||||
// The vianats sub-package provides an embedded NATS implementation.
|
||||
// By default, New() starts an embedded NATS server. Supply a custom
|
||||
// implementation via Config(Options{PubSub: yourBackend}) to override.
|
||||
type PubSub interface {
|
||||
Publish(subject string, data []byte) error
|
||||
Subscribe(subject string, handler func(data []byte)) (Subscription, error)
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
package via
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/ryanhamamura/via/h"
|
||||
"github.com/stretchr/testify/assert"
|
||||
@@ -10,9 +10,8 @@ import (
|
||||
)
|
||||
|
||||
func TestPublishSubscribe_RoundTrip(t *testing.T) {
|
||||
ps := newMockPubSub()
|
||||
v := New()
|
||||
v.Config(Options{PubSub: ps})
|
||||
defer v.Shutdown()
|
||||
|
||||
type event struct {
|
||||
Name string `json:"name"`
|
||||
@@ -20,30 +19,32 @@ func TestPublishSubscribe_RoundTrip(t *testing.T) {
|
||||
}
|
||||
|
||||
var got event
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
done := make(chan struct{})
|
||||
|
||||
c := newContext("typed-ctx", "/", v)
|
||||
c.View(func() h.H { return h.Div() })
|
||||
|
||||
_, err := Subscribe(c, "events", func(e event) {
|
||||
got = e
|
||||
wg.Done()
|
||||
close(done)
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
err = Publish(c, "events", event{Name: "click", Count: 42})
|
||||
require.NoError(t, err)
|
||||
|
||||
wg.Wait()
|
||||
select {
|
||||
case <-done:
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("timed out waiting for message")
|
||||
}
|
||||
assert.Equal(t, "click", got.Name)
|
||||
assert.Equal(t, 42, got.Count)
|
||||
}
|
||||
|
||||
func TestSubscribe_SkipsBadJSON(t *testing.T) {
|
||||
ps := newMockPubSub()
|
||||
v := New()
|
||||
v.Config(Options{PubSub: ps})
|
||||
defer v.Shutdown()
|
||||
|
||||
type msg struct {
|
||||
Text string `json:"text"`
|
||||
@@ -62,5 +63,6 @@ func TestSubscribe_SkipsBadJSON(t *testing.T) {
|
||||
err = c.Publish("topic", []byte("not json"))
|
||||
require.NoError(t, err)
|
||||
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
assert.False(t, called)
|
||||
}
|
||||
|
||||
74
routine.go
74
routine.go
@@ -1,76 +1,34 @@
|
||||
package via
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
// OnIntervalRoutine allows for defining concurrent goroutines safely. Goroutines started by *OnIntervalRoutine
|
||||
// are tied to the *Context lifecycle.
|
||||
type OnIntervalRoutine struct {
|
||||
mu sync.RWMutex
|
||||
ctxDisposed chan struct{}
|
||||
localInterrupt chan struct{}
|
||||
isRunning atomic.Bool
|
||||
routineFn func()
|
||||
tckDuration time.Duration
|
||||
updateTkrChan chan time.Duration
|
||||
}
|
||||
func newOnInterval(ctxDisposedChan, pageStopChan chan struct{}, duration time.Duration, handler func()) func() {
|
||||
localInterrupt := make(chan struct{})
|
||||
var stopped atomic.Bool
|
||||
|
||||
// UpdateInterval sets a new interval duration for the internal *time.Ticker. If the provided
|
||||
// duration is equal of less than 0, UpdateInterval does nothing.
|
||||
func (r *OnIntervalRoutine) UpdateInterval(d time.Duration) {
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
r.tckDuration = d
|
||||
r.updateTkrChan <- d
|
||||
|
||||
}
|
||||
|
||||
// Start executes the predifined goroutine. If no predifined goroutine exists, or it already
|
||||
// started, Start does nothing.
|
||||
func (r *OnIntervalRoutine) Start() {
|
||||
if !r.isRunning.CompareAndSwap(false, true) || r.routineFn == nil {
|
||||
return
|
||||
}
|
||||
go r.routineFn()
|
||||
}
|
||||
|
||||
// Stop interrupts the predifined goroutine. If no predifined goroutine exists, or it already
|
||||
// ustopped, Stop does nothing.
|
||||
func (r *OnIntervalRoutine) Stop() {
|
||||
if !r.isRunning.CompareAndSwap(true, false) || r.routineFn == nil {
|
||||
return
|
||||
}
|
||||
r.localInterrupt <- struct{}{}
|
||||
}
|
||||
|
||||
func newOnIntervalRoutine(ctxDisposedChan chan struct{},
|
||||
duration time.Duration, handler func()) *OnIntervalRoutine {
|
||||
r := &OnIntervalRoutine{
|
||||
ctxDisposed: ctxDisposedChan,
|
||||
localInterrupt: make(chan struct{}),
|
||||
updateTkrChan: make(chan time.Duration),
|
||||
}
|
||||
r.tckDuration = duration
|
||||
r.routineFn = func() {
|
||||
r.mu.RLock()
|
||||
tkr := time.NewTicker(r.tckDuration)
|
||||
r.mu.RUnlock()
|
||||
defer tkr.Stop() // clean up the ticker when routine stops
|
||||
go func() {
|
||||
tkr := time.NewTicker(duration)
|
||||
defer tkr.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-r.ctxDisposed: // dispose of the routine when ctx is disposed
|
||||
case <-ctxDisposedChan:
|
||||
return
|
||||
case <-r.localInterrupt: // dispose of the routine on interrupt signal
|
||||
case <-pageStopChan:
|
||||
return
|
||||
case <-localInterrupt:
|
||||
return
|
||||
case d := <-r.updateTkrChan:
|
||||
tkr.Reset(d)
|
||||
case <-tkr.C:
|
||||
handler()
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return func() {
|
||||
if stopped.CompareAndSwap(false, true) {
|
||||
close(localInterrupt)
|
||||
}
|
||||
}
|
||||
return r
|
||||
}
|
||||
|
||||
130
rule.go
Normal file
130
rule.go
Normal file
@@ -0,0 +1,130 @@
|
||||
package via
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
"unicode/utf8"
|
||||
)
|
||||
|
||||
// Rule defines a single validation check for a Field.
|
||||
type Rule struct {
|
||||
validate func(val string) error
|
||||
}
|
||||
|
||||
// Required rejects empty or whitespace-only values.
|
||||
func Required(msg ...string) Rule {
|
||||
m := "This field is required"
|
||||
if len(msg) > 0 {
|
||||
m = msg[0]
|
||||
}
|
||||
return Rule{func(val string) error {
|
||||
if strings.TrimSpace(val) == "" {
|
||||
return errors.New(m)
|
||||
}
|
||||
return nil
|
||||
}}
|
||||
}
|
||||
|
||||
// MinLen rejects values shorter than n characters.
|
||||
func MinLen(n int, msg ...string) Rule {
|
||||
m := fmt.Sprintf("Must be at least %d characters", n)
|
||||
if len(msg) > 0 {
|
||||
m = msg[0]
|
||||
}
|
||||
return Rule{func(val string) error {
|
||||
if utf8.RuneCountInString(val) < n {
|
||||
return errors.New(m)
|
||||
}
|
||||
return nil
|
||||
}}
|
||||
}
|
||||
|
||||
// MaxLen rejects values longer than n characters.
|
||||
func MaxLen(n int, msg ...string) Rule {
|
||||
m := fmt.Sprintf("Must be at most %d characters", n)
|
||||
if len(msg) > 0 {
|
||||
m = msg[0]
|
||||
}
|
||||
return Rule{func(val string) error {
|
||||
if utf8.RuneCountInString(val) > n {
|
||||
return errors.New(m)
|
||||
}
|
||||
return nil
|
||||
}}
|
||||
}
|
||||
|
||||
// Min parses the value as an integer and rejects values less than n.
|
||||
func Min(n int, msg ...string) Rule {
|
||||
m := fmt.Sprintf("Must be at least %d", n)
|
||||
if len(msg) > 0 {
|
||||
m = msg[0]
|
||||
}
|
||||
return Rule{func(val string) error {
|
||||
v, err := strconv.Atoi(val)
|
||||
if err != nil {
|
||||
return errors.New("Must be a valid number")
|
||||
}
|
||||
if v < n {
|
||||
return errors.New(m)
|
||||
}
|
||||
return nil
|
||||
}}
|
||||
}
|
||||
|
||||
// Max parses the value as an integer and rejects values greater than n.
|
||||
func Max(n int, msg ...string) Rule {
|
||||
m := fmt.Sprintf("Must be at most %d", n)
|
||||
if len(msg) > 0 {
|
||||
m = msg[0]
|
||||
}
|
||||
return Rule{func(val string) error {
|
||||
v, err := strconv.Atoi(val)
|
||||
if err != nil {
|
||||
return errors.New("Must be a valid number")
|
||||
}
|
||||
if v > n {
|
||||
return errors.New(m)
|
||||
}
|
||||
return nil
|
||||
}}
|
||||
}
|
||||
|
||||
// Pattern rejects values that don't match the regular expression re.
|
||||
func Pattern(re string, msg ...string) Rule {
|
||||
m := "Invalid format"
|
||||
if len(msg) > 0 {
|
||||
m = msg[0]
|
||||
}
|
||||
compiled := regexp.MustCompile(re)
|
||||
return Rule{func(val string) error {
|
||||
if !compiled.MatchString(val) {
|
||||
return errors.New(m)
|
||||
}
|
||||
return nil
|
||||
}}
|
||||
}
|
||||
|
||||
var emailRegexp = regexp.MustCompile(`^[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}$`)
|
||||
|
||||
// Email rejects values that don't look like an email address.
|
||||
func Email(msg ...string) Rule {
|
||||
m := "Invalid email address"
|
||||
if len(msg) > 0 {
|
||||
m = msg[0]
|
||||
}
|
||||
return Rule{func(val string) error {
|
||||
if !emailRegexp.MatchString(val) {
|
||||
return errors.New(m)
|
||||
}
|
||||
return nil
|
||||
}}
|
||||
}
|
||||
|
||||
// Custom creates a rule from a user-provided validation function.
|
||||
// The function should return nil for valid input and an error for invalid input.
|
||||
func Custom(fn func(string) error) Rule {
|
||||
return Rule{validate: fn}
|
||||
}
|
||||
116
rule_test.go
Normal file
116
rule_test.go
Normal file
@@ -0,0 +1,116 @@
|
||||
package via
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestRequired(t *testing.T) {
|
||||
r := Required()
|
||||
assert.NoError(t, r.validate("hello"))
|
||||
assert.Error(t, r.validate(""))
|
||||
assert.Error(t, r.validate(" "))
|
||||
}
|
||||
|
||||
func TestRequiredCustomMessage(t *testing.T) {
|
||||
r := Required("name needed")
|
||||
err := r.validate("")
|
||||
assert.EqualError(t, err, "name needed")
|
||||
}
|
||||
|
||||
func TestMinLen(t *testing.T) {
|
||||
r := MinLen(3)
|
||||
assert.NoError(t, r.validate("abc"))
|
||||
assert.NoError(t, r.validate("abcd"))
|
||||
assert.Error(t, r.validate("ab"))
|
||||
assert.Error(t, r.validate(""))
|
||||
}
|
||||
|
||||
func TestMinLenCustomMessage(t *testing.T) {
|
||||
r := MinLen(5, "too short")
|
||||
err := r.validate("ab")
|
||||
assert.EqualError(t, err, "too short")
|
||||
}
|
||||
|
||||
func TestMaxLen(t *testing.T) {
|
||||
r := MaxLen(5)
|
||||
assert.NoError(t, r.validate("abc"))
|
||||
assert.NoError(t, r.validate("abcde"))
|
||||
assert.Error(t, r.validate("abcdef"))
|
||||
}
|
||||
|
||||
func TestMaxLenCustomMessage(t *testing.T) {
|
||||
r := MaxLen(2, "too long")
|
||||
err := r.validate("abc")
|
||||
assert.EqualError(t, err, "too long")
|
||||
}
|
||||
|
||||
func TestMin(t *testing.T) {
|
||||
r := Min(5)
|
||||
assert.NoError(t, r.validate("5"))
|
||||
assert.NoError(t, r.validate("10"))
|
||||
assert.Error(t, r.validate("4"))
|
||||
assert.Error(t, r.validate("abc"))
|
||||
}
|
||||
|
||||
func TestMinCustomMessage(t *testing.T) {
|
||||
r := Min(10, "need 10+")
|
||||
err := r.validate("3")
|
||||
assert.EqualError(t, err, "need 10+")
|
||||
}
|
||||
|
||||
func TestMax(t *testing.T) {
|
||||
r := Max(10)
|
||||
assert.NoError(t, r.validate("10"))
|
||||
assert.NoError(t, r.validate("5"))
|
||||
assert.Error(t, r.validate("11"))
|
||||
assert.Error(t, r.validate("abc"))
|
||||
}
|
||||
|
||||
func TestMaxCustomMessage(t *testing.T) {
|
||||
r := Max(5, "too big")
|
||||
err := r.validate("6")
|
||||
assert.EqualError(t, err, "too big")
|
||||
}
|
||||
|
||||
func TestPattern(t *testing.T) {
|
||||
r := Pattern(`^\d{3}$`)
|
||||
assert.NoError(t, r.validate("123"))
|
||||
assert.Error(t, r.validate("12"))
|
||||
assert.Error(t, r.validate("abcd"))
|
||||
}
|
||||
|
||||
func TestPatternCustomMessage(t *testing.T) {
|
||||
r := Pattern(`^\d+$`, "digits only")
|
||||
err := r.validate("abc")
|
||||
assert.EqualError(t, err, "digits only")
|
||||
}
|
||||
|
||||
func TestEmail(t *testing.T) {
|
||||
r := Email()
|
||||
assert.NoError(t, r.validate("user@example.com"))
|
||||
assert.NoError(t, r.validate("a.b+c@foo.co"))
|
||||
assert.Error(t, r.validate("notanemail"))
|
||||
assert.Error(t, r.validate("@example.com"))
|
||||
assert.Error(t, r.validate("user@"))
|
||||
assert.Error(t, r.validate(""))
|
||||
}
|
||||
|
||||
func TestEmailCustomMessage(t *testing.T) {
|
||||
r := Email("bad email")
|
||||
err := r.validate("nope")
|
||||
assert.EqualError(t, err, "bad email")
|
||||
}
|
||||
|
||||
func TestCustom(t *testing.T) {
|
||||
r := Custom(func(val string) error {
|
||||
if val != "magic" {
|
||||
return fmt.Errorf("must be magic")
|
||||
}
|
||||
return nil
|
||||
})
|
||||
assert.NoError(t, r.validate("magic"))
|
||||
assert.EqualError(t, r.validate("other"), "must be magic")
|
||||
}
|
||||
83
via.go
83
via.go
@@ -35,6 +35,9 @@ import (
|
||||
//go:embed datastar.js
|
||||
var datastarJS []byte
|
||||
|
||||
//go:embed navigate.js
|
||||
var navigateJS []byte
|
||||
|
||||
// V is the root application.
|
||||
// It manages page routing, user sessions, and SSE connections for live updates.
|
||||
type V struct {
|
||||
@@ -47,14 +50,17 @@ type V struct {
|
||||
documentHeadIncludes []h.H
|
||||
documentFootIncludes []h.H
|
||||
devModePageInitFnMap map[string]func(*Context)
|
||||
pageRegistry map[string]func(*Context)
|
||||
sessionManager *scs.SessionManager
|
||||
pubsub PubSub
|
||||
defaultNATS *defaultNATS
|
||||
actionRateLimit RateLimitConfig
|
||||
datastarPath string
|
||||
datastarContent []byte
|
||||
datastarOnce sync.Once
|
||||
reaperStop chan struct{}
|
||||
middleware []Middleware
|
||||
layout func(func() h.H) h.H
|
||||
}
|
||||
|
||||
func (v *V) logEvent(evt *zerolog.Event, c *Context) *zerolog.Event {
|
||||
@@ -130,6 +136,7 @@ func (v *V) Config(cfg Options) {
|
||||
v.datastarPath = cfg.DatastarPath
|
||||
}
|
||||
if cfg.PubSub != nil {
|
||||
v.defaultNATS = nil
|
||||
v.pubsub = cfg.PubSub
|
||||
}
|
||||
if cfg.ContextTTL != 0 {
|
||||
@@ -194,6 +201,7 @@ func (v *V) page(route string, raw, wrapped func(*Context)) {
|
||||
c.stopAllRoutines()
|
||||
}()
|
||||
|
||||
v.pageRegistry[route] = wrapped
|
||||
if v.cfg.DevMode {
|
||||
v.devModePageInitFnMap[route] = wrapped
|
||||
}
|
||||
@@ -221,6 +229,8 @@ func (v *V) page(route string, raw, wrapped func(*Context)) {
|
||||
h.Meta(h.Data("init", "@get('/_sse')")),
|
||||
h.Meta(h.Data("init", fmt.Sprintf(`window.addEventListener('beforeunload', (evt) => {
|
||||
navigator.sendBeacon('/_session/close', '%s');});`, c.id))),
|
||||
h.Meta(h.Attr("name", "view-transition"), h.Attr("content", "same-origin")),
|
||||
h.Script(h.Raw(string(navigateJS))),
|
||||
)
|
||||
|
||||
bodyElements := []h.H{c.view()}
|
||||
@@ -379,6 +389,7 @@ func (v *V) Shutdown() {
|
||||
v.logErr(nil, "pubsub close error: %v", err)
|
||||
}
|
||||
}
|
||||
v.defaultNATS = nil
|
||||
|
||||
v.logInfo(nil, "shutdown complete")
|
||||
}
|
||||
@@ -408,6 +419,11 @@ func (v *V) HTTPServeMux() *http.ServeMux {
|
||||
return v.mux
|
||||
}
|
||||
|
||||
// PubSub returns the configured PubSub backend, or nil if none is set.
|
||||
func (v *V) PubSub() PubSub {
|
||||
return v.pubsub
|
||||
}
|
||||
|
||||
// Static serves files from a filesystem directory at the given URL prefix.
|
||||
//
|
||||
// Example:
|
||||
@@ -554,6 +570,7 @@ type patchType int
|
||||
|
||||
const (
|
||||
patchTypeElements = iota
|
||||
patchTypeElementsWithVT
|
||||
patchTypeSignals
|
||||
patchTypeScript
|
||||
patchTypeRedirect
|
||||
@@ -574,6 +591,7 @@ func New() *V {
|
||||
logger: newConsoleLogger(zerolog.InfoLevel),
|
||||
contextRegistry: make(map[string]*Context),
|
||||
devModePageInitFnMap: make(map[string]func(*Context)),
|
||||
pageRegistry: make(map[string]func(*Context)),
|
||||
sessionManager: scs.New(),
|
||||
datastarPath: "/_datastar.js",
|
||||
datastarContent: datastarJS,
|
||||
@@ -624,11 +642,16 @@ func New() *V {
|
||||
switch patch.typ {
|
||||
case patchTypeElements:
|
||||
if err := sse.PatchElements(patch.content); err != nil {
|
||||
// Only log if connection wasn't closed (avoids noise during shutdown/tests)
|
||||
if sse.Context().Err() == nil {
|
||||
v.logErr(c, "PatchElements failed: %v", err)
|
||||
}
|
||||
}
|
||||
case patchTypeElementsWithVT:
|
||||
if err := sse.PatchElements(patch.content, datastar.WithViewTransitions()); err != nil {
|
||||
if sse.Context().Err() == nil {
|
||||
v.logErr(c, "PatchElements (view transition) failed: %v", err)
|
||||
}
|
||||
}
|
||||
case patchTypeSignals:
|
||||
if err := sse.PatchSignals([]byte(patch.content)); err != nil {
|
||||
if sse.Context().Err() == nil {
|
||||
@@ -708,6 +731,39 @@ func New() *V {
|
||||
}
|
||||
})
|
||||
|
||||
v.mux.HandleFunc("POST /_navigate", func(w http.ResponseWriter, r *http.Request) {
|
||||
_ = r.ParseForm()
|
||||
cID := r.FormValue("via-ctx")
|
||||
csrfToken := r.FormValue("via-csrf")
|
||||
navURL := r.FormValue("url")
|
||||
popstate := r.FormValue("popstate") == "1"
|
||||
|
||||
if cID == "" || navURL == "" || !strings.HasPrefix(navURL, "/") {
|
||||
http.Error(w, "missing or invalid parameters", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
c, err := v.getCtx(cID)
|
||||
if err != nil {
|
||||
v.logErr(nil, "navigate failed: %v", err)
|
||||
http.Error(w, "context not found", http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
if subtle.ConstantTimeCompare([]byte(csrfToken), []byte(c.csrfToken)) != 1 {
|
||||
v.logWarn(c, "navigate rejected: invalid CSRF token")
|
||||
http.Error(w, "invalid CSRF token", http.StatusForbidden)
|
||||
return
|
||||
}
|
||||
if c.actionLimiter != nil && !c.actionLimiter.Allow() {
|
||||
v.logWarn(c, "navigate rate limited")
|
||||
http.Error(w, "rate limited", http.StatusTooManyRequests)
|
||||
return
|
||||
}
|
||||
c.reqCtx = r.Context()
|
||||
v.logDebug(c, "SPA navigate to %s", navURL)
|
||||
c.Navigate(navURL, popstate)
|
||||
w.WriteHeader(http.StatusOK)
|
||||
})
|
||||
|
||||
v.mux.HandleFunc("POST /_session/close", func(w http.ResponseWriter, r *http.Request) {
|
||||
body, err := io.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
@@ -725,6 +781,15 @@ func New() *V {
|
||||
v.logDebug(c, "session close event triggered")
|
||||
v.cleanupCtx(c)
|
||||
})
|
||||
|
||||
dn, err := getSharedNATS()
|
||||
if err != nil {
|
||||
v.logWarn(nil, "embedded NATS unavailable: %v", err)
|
||||
} else {
|
||||
v.defaultNATS = dn
|
||||
v.pubsub = &natsRef{dn: dn}
|
||||
}
|
||||
|
||||
return v
|
||||
}
|
||||
|
||||
@@ -757,3 +822,19 @@ func extractParams(pattern, path string) map[string]string {
|
||||
}
|
||||
return params
|
||||
}
|
||||
|
||||
// matchRoute finds the registered page init function and extracted params for the given path.
|
||||
func (v *V) matchRoute(path string) (route string, initFn func(*Context), params map[string]string) {
|
||||
for pattern, fn := range v.pageRegistry {
|
||||
if p := extractParams(pattern, path); p != nil {
|
||||
return pattern, fn, p
|
||||
}
|
||||
}
|
||||
return "", nil, nil
|
||||
}
|
||||
|
||||
// Layout sets a layout function that wraps every page's view.
|
||||
// The layout receives the page content as a function and returns the full view.
|
||||
func (v *V) Layout(f func(func() h.H) h.H) {
|
||||
v.layout = f
|
||||
}
|
||||
|
||||
@@ -1,127 +0,0 @@
|
||||
// Package vianats provides an embedded NATS server with JetStream as a
|
||||
// pub/sub backend for Via applications.
|
||||
package vianats
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/delaneyj/toolbelt/embeddednats"
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/ryanhamamura/via"
|
||||
)
|
||||
|
||||
// NATS implements via.PubSub using an embedded NATS server with JetStream.
|
||||
type NATS struct {
|
||||
server *embeddednats.Server
|
||||
nc *nats.Conn
|
||||
js nats.JetStreamContext
|
||||
}
|
||||
|
||||
// New starts an embedded NATS server with JetStream enabled and returns a
|
||||
// ready-to-use NATS instance. The server stores data in dataDir and shuts
|
||||
// down when ctx is cancelled.
|
||||
func New(ctx context.Context, dataDir string) (*NATS, error) {
|
||||
ns, err := embeddednats.New(ctx, embeddednats.WithDirectory(dataDir))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("vianats: start server: %w", err)
|
||||
}
|
||||
ns.WaitForServer()
|
||||
|
||||
nc, err := ns.Client()
|
||||
if err != nil {
|
||||
ns.Close()
|
||||
return nil, fmt.Errorf("vianats: connect client: %w", err)
|
||||
}
|
||||
|
||||
js, err := nc.JetStream()
|
||||
if err != nil {
|
||||
nc.Close()
|
||||
ns.Close()
|
||||
return nil, fmt.Errorf("vianats: init jetstream: %w", err)
|
||||
}
|
||||
|
||||
return &NATS{server: ns, nc: nc, js: js}, nil
|
||||
}
|
||||
|
||||
// Publish sends data to the given subject using core NATS publish.
|
||||
// JetStream captures messages automatically if a matching stream exists.
|
||||
func (n *NATS) Publish(subject string, data []byte) error {
|
||||
return n.nc.Publish(subject, data)
|
||||
}
|
||||
|
||||
// Subscribe creates a core NATS subscription for real-time fan-out delivery.
|
||||
func (n *NATS) Subscribe(subject string, handler func(data []byte)) (via.Subscription, error) {
|
||||
sub, err := n.nc.Subscribe(subject, func(msg *nats.Msg) {
|
||||
handler(msg.Data)
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return sub, nil
|
||||
}
|
||||
|
||||
// Close shuts down the client connection and embedded server.
|
||||
func (n *NATS) Close() error {
|
||||
n.nc.Close()
|
||||
return n.server.Close()
|
||||
}
|
||||
|
||||
// Conn returns the underlying NATS connection for advanced usage.
|
||||
func (n *NATS) Conn() *nats.Conn {
|
||||
return n.nc
|
||||
}
|
||||
|
||||
// JetStream returns the JetStream context for stream configuration and replay.
|
||||
func (n *NATS) JetStream() nats.JetStreamContext {
|
||||
return n.js
|
||||
}
|
||||
|
||||
// StreamConfig holds the parameters for creating or updating a JetStream stream.
|
||||
type StreamConfig struct {
|
||||
Name string
|
||||
Subjects []string
|
||||
MaxMsgs int64
|
||||
MaxAge time.Duration
|
||||
}
|
||||
|
||||
// EnsureStream creates or updates a JetStream stream matching cfg.
|
||||
func EnsureStream(n *NATS, cfg StreamConfig) error {
|
||||
_, err := n.js.AddStream(&nats.StreamConfig{
|
||||
Name: cfg.Name,
|
||||
Subjects: cfg.Subjects,
|
||||
Retention: nats.LimitsPolicy,
|
||||
MaxMsgs: cfg.MaxMsgs,
|
||||
MaxAge: cfg.MaxAge,
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
// ReplayHistory fetches the last limit messages from subject,
|
||||
// deserializing each as T. Returns an empty slice if nothing is available.
|
||||
func ReplayHistory[T any](n *NATS, subject string, limit int) ([]T, error) {
|
||||
sub, err := n.js.SubscribeSync(subject, nats.DeliverAll(), nats.OrderedConsumer())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer sub.Unsubscribe()
|
||||
|
||||
var msgs []T
|
||||
for {
|
||||
raw, err := sub.NextMsg(200 * time.Millisecond)
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
var msg T
|
||||
if json.Unmarshal(raw.Data, &msg) == nil {
|
||||
msgs = append(msgs, msg)
|
||||
}
|
||||
}
|
||||
|
||||
if limit > 0 && len(msgs) > limit {
|
||||
msgs = msgs[len(msgs)-limit:]
|
||||
}
|
||||
return msgs, nil
|
||||
}
|
||||
Reference in New Issue
Block a user