2 Commits

Author SHA1 Message Date
Ryan Hamamura
2310e45d35 feat: auto-start embedded NATS server in New()
Pub/sub now works out of the box — New() starts a process-scoped
embedded NATS server with JetStream. The PubSub interface remains
for custom backends via Config(Options{PubSub: ...}).

- Move vianats functionality into nats.go (eliminates circular import)
- Add NATSConn(), JetStream(), EnsureStream(), ReplayHistory[T]() to via
- Delete vianats/ package
- Simplify nats-chatroom and pubsub-crud examples
- Rewrite pubsub tests to use real embedded NATS
2026-02-12 08:54:44 -10:00
Ryan Hamamura
10b4838f8d feat: auto-track fields on context for zero-arg ValidateAll/ResetFields
Fields created via Context.Field are now tracked on the page context,
so ValidateAll() and ResetFields() with no arguments operate on all
fields by default. Explicit field args still work for selective use.

Also switches MinLen/MaxLen to utf8.RuneCountInString for correct
unicode handling and replaces fmt.Errorf with errors.New where
format strings are unnecessary.
2026-02-11 19:57:13 -10:00
13 changed files with 337 additions and 295 deletions

View File

@@ -32,6 +32,7 @@ type Context struct {
mu sync.RWMutex mu sync.RWMutex
ctxDisposedChan chan struct{} ctxDisposedChan chan struct{}
reqCtx context.Context reqCtx context.Context
fields []*Field
subscriptions []Subscription subscriptions []Subscription
subsMu sync.Mutex subsMu sync.Mutex
disposeOnce sync.Once disposeOnce sync.Once
@@ -482,16 +483,28 @@ func (c *Context) unsubscribeAll() {
// Field creates a signal with validation rules attached. // Field creates a signal with validation rules attached.
// The initial value seeds both the signal and the reset target. // The initial value seeds both the signal and the reset target.
// The field is tracked on the context so ValidateAll/ResetFields
// can operate on all fields by default.
func (c *Context) Field(initial any, rules ...Rule) *Field { func (c *Context) Field(initial any, rules ...Rule) *Field {
return &Field{ f := &Field{
signal: c.Signal(initial), signal: c.Signal(initial),
rules: rules, rules: rules,
initialVal: initial, initialVal: initial,
} }
target := c
if c.isComponent() {
target = c.parentPageCtx
}
target.fields = append(target.fields, f)
return f
} }
// ValidateAll runs Validate on every field, returning true only if all pass. // ValidateAll runs Validate on each field, returning true only if all pass.
// With no arguments it validates every field tracked on this context.
func (c *Context) ValidateAll(fields ...*Field) bool { func (c *Context) ValidateAll(fields ...*Field) bool {
if len(fields) == 0 {
fields = c.fields
}
ok := true ok := true
for _, f := range fields { for _, f := range fields {
if !f.Validate() { if !f.Validate() {
@@ -501,8 +514,12 @@ func (c *Context) ValidateAll(fields ...*Field) bool {
return ok return ok
} }
// ResetFields resets every field to its initial value and clears errors. // ResetFields resets each field to its initial value and clears errors.
// With no arguments it resets every field tracked on this context.
func (c *Context) ResetFields(fields ...*Field) { func (c *Context) ResetFields(fields ...*Field) {
if len(fields) == 0 {
fields = c.fields
}
for _, f := range fields { for _, f := range fields {
f.Reset() f.Reset()
} }

View File

@@ -96,27 +96,22 @@ func TestFieldReset(t *testing.T) {
func TestValidateAll(t *testing.T) { func TestValidateAll(t *testing.T) {
v := New() v := New()
var username, email *Field
v.Page("/", func(c *Context) { v.Page("/", func(c *Context) {
username = c.Field("", Required(), MinLen(3)) c.Field("", Required(), MinLen(3))
email = c.Field("", Required(), Email()) c.Field("", Required(), Email())
c.View(func() h.H { return h.Div() }) c.View(func() h.H { return h.Div() })
})
// both empty → both fail // both empty → both fail
assert.False(t, false) // sanity assert.False(t, c.ValidateAll())
ok := username.Validate() && email.Validate() })
assert.False(t, ok)
// simulate ValidateAll via context
v2 := New() v2 := New()
var u2, e2 *Field
v2.Page("/", func(c *Context) { v2.Page("/", func(c *Context) {
u2 = c.Field("joe", Required(), MinLen(3)) c.Field("joe", Required(), MinLen(3))
e2 = c.Field("joe@x.com", Required(), Email()) c.Field("joe@x.com", Required(), Email())
c.View(func() h.H { return h.Div() }) c.View(func() h.H { return h.Div() })
assert.True(t, c.ValidateAll(u2, e2)) assert.True(t, c.ValidateAll())
}) })
} }
@@ -127,14 +122,30 @@ func TestValidateAllPartialFailure(t *testing.T) {
bad := c.Field("", Required()) bad := c.Field("", Required())
c.View(func() h.H { return h.Div() }) c.View(func() h.H { return h.Div() })
// ValidateAll must run ALL fields even if first passes ok := c.ValidateAll()
ok := c.ValidateAll(good, bad)
assert.False(t, ok) assert.False(t, ok)
assert.False(t, good.HasError()) assert.False(t, good.HasError())
assert.True(t, bad.HasError()) assert.True(t, bad.HasError())
}) })
} }
func TestValidateAllSelectiveArgs(t *testing.T) {
v := New()
v.Page("/", func(c *Context) {
a := c.Field("", Required())
b := c.Field("ok", Required())
cField := c.Field("", Required())
c.View(func() h.H { return h.Div() })
// only validate a and b — cField should be untouched
ok := c.ValidateAll(a, b)
assert.False(t, ok)
assert.True(t, a.HasError())
assert.False(t, b.HasError())
assert.False(t, cField.HasError(), "unselected field should not be validated")
})
}
func TestResetFields(t *testing.T) { func TestResetFields(t *testing.T) {
v := New() v := New()
v.Page("/", func(c *Context) { v.Page("/", func(c *Context) {
@@ -146,13 +157,30 @@ func TestResetFields(t *testing.T) {
b.SetValue("changed-b") b.SetValue("changed-b")
a.AddError("err") a.AddError("err")
c.ResetFields(a, b) c.ResetFields()
assert.Equal(t, "a", a.String()) assert.Equal(t, "a", a.String())
assert.Equal(t, "b", b.String()) assert.Equal(t, "b", b.String())
assert.False(t, a.HasError()) assert.False(t, a.HasError())
}) })
} }
func TestResetFieldsSelectiveArgs(t *testing.T) {
v := New()
v.Page("/", func(c *Context) {
a := c.Field("a")
b := c.Field("b")
c.View(func() h.H { return h.Div() })
a.SetValue("changed-a")
b.SetValue("changed-b")
// only reset a
c.ResetFields(a)
assert.Equal(t, "a", a.String())
assert.Equal(t, "changed-b", b.String(), "unselected field should not be reset")
})
}
func TestFieldValidateClearsPreviousErrors(t *testing.T) { func TestFieldValidateClearsPreviousErrors(t *testing.T) {
f := newTestField("", Required()) f := newTestField("", Required())
f.Validate() f.Validate()

1
go.mod
View File

@@ -38,6 +38,5 @@ require (
github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect
golang.org/x/crypto v0.45.0 // indirect golang.org/x/crypto v0.45.0 // indirect
golang.org/x/sys v0.38.0 // indirect golang.org/x/sys v0.38.0 // indirect
golang.org/x/time v0.14.0
gopkg.in/yaml.v3 v3.0.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect
) )

View File

@@ -1,7 +1,6 @@
package main package main
import ( import (
"context"
"log" "log"
"math/rand" "math/rand"
"sync" "sync"
@@ -9,7 +8,6 @@ import (
"github.com/ryanhamamura/via" "github.com/ryanhamamura/via"
"github.com/ryanhamamura/via/h" "github.com/ryanhamamura/via/h"
"github.com/ryanhamamura/via/vianats"
) )
var ( var (
@@ -36,15 +34,15 @@ func (u *UserInfo) Avatar() h.H {
var roomNames = []string{"Go", "Rust", "Python", "JavaScript", "Clojure"} var roomNames = []string{"Go", "Rust", "Python", "JavaScript", "Clojure"}
func main() { func main() {
ctx := context.Background() v := via.New()
v.Config(via.Options{
DevMode: true,
DocumentTitle: "NATS Chat",
LogLevel: via.LogLevelInfo,
ServerAddress: ":7331",
})
ps, err := vianats.New(ctx, "./data/nats") err := via.EnsureStream(v, via.StreamConfig{
if err != nil {
log.Fatalf("Failed to start embedded NATS: %v", err)
}
defer ps.Close()
err = vianats.EnsureStream(ps, vianats.StreamConfig{
Name: "CHAT", Name: "CHAT",
Subjects: []string{"chat.>"}, Subjects: []string{"chat.>"},
MaxMsgs: 1000, MaxMsgs: 1000,
@@ -54,15 +52,6 @@ func main() {
log.Fatalf("Failed to ensure stream: %v", err) log.Fatalf("Failed to ensure stream: %v", err)
} }
v := via.New()
v.Config(via.Options{
DevMode: true,
DocumentTitle: "NATS Chat",
LogLevel: via.LogLevelInfo,
ServerAddress: ":7331",
PubSub: ps,
})
v.AppendToHead( v.AppendToHead(
h.Link(h.Rel("stylesheet"), h.Href("https://cdn.jsdelivr.net/npm/@picocss/pico@2/css/pico.min.css")), h.Link(h.Rel("stylesheet"), h.Href("https://cdn.jsdelivr.net/npm/@picocss/pico@2/css/pico.min.css")),
h.StyleEl(h.Raw(` h.StyleEl(h.Raw(`
@@ -148,7 +137,7 @@ func main() {
subject := "chat.room." + room subject := "chat.room." + room
// Replay history from JetStream // Replay history from JetStream
if hist, err := vianats.ReplayHistory[ChatMessage](ps, subject, 50); err == nil { if hist, err := via.ReplayHistory[ChatMessage](v, subject, 50); err == nil {
messages = hist messages = hist
} }

View File

@@ -1,7 +1,6 @@
package main package main
import ( import (
"context"
"crypto/rand" "crypto/rand"
"fmt" "fmt"
"html" "html"
@@ -11,7 +10,6 @@ import (
"github.com/ryanhamamura/via" "github.com/ryanhamamura/via"
"github.com/ryanhamamura/via/h" "github.com/ryanhamamura/via/h"
"github.com/ryanhamamura/via/vianats"
) )
var WithSignal = via.WithSignal var WithSignal = via.WithSignal
@@ -49,15 +47,15 @@ func findBookmark(id string) (Bookmark, int) {
} }
func main() { func main() {
ctx := context.Background() v := via.New()
v.Config(via.Options{
DevMode: true,
DocumentTitle: "Bookmarks",
LogLevel: via.LogLevelInfo,
ServerAddress: ":7331",
})
ps, err := vianats.New(ctx, "./data/nats") err := via.EnsureStream(v, via.StreamConfig{
if err != nil {
log.Fatalf("Failed to start embedded NATS: %v", err)
}
defer ps.Close()
err = vianats.EnsureStream(ps, vianats.StreamConfig{
Name: "BOOKMARKS", Name: "BOOKMARKS",
Subjects: []string{"bookmarks.>"}, Subjects: []string{"bookmarks.>"},
MaxMsgs: 1000, MaxMsgs: 1000,
@@ -67,15 +65,6 @@ func main() {
log.Fatalf("Failed to ensure stream: %v", err) log.Fatalf("Failed to ensure stream: %v", err)
} }
v := via.New()
v.Config(via.Options{
DevMode: true,
DocumentTitle: "Bookmarks",
LogLevel: via.LogLevelInfo,
ServerAddress: ":7331",
PubSub: ps,
})
v.AppendToHead( v.AppendToHead(
h.Link(h.Rel("stylesheet"), h.Href("https://cdn.jsdelivr.net/npm/daisyui@4/dist/full.min.css")), h.Link(h.Rel("stylesheet"), h.Href("https://cdn.jsdelivr.net/npm/daisyui@4/dist/full.min.css")),
h.Script(h.Src("https://cdn.tailwindcss.com")), h.Script(h.Src("https://cdn.tailwindcss.com")),

View File

@@ -26,13 +26,13 @@ func main() {
email := c.Field("", via.Required(), via.Email()) email := c.Field("", via.Required(), via.Email())
age := c.Field("", via.Required(), via.Min(13), via.Max(120)) age := c.Field("", via.Required(), via.Min(13), via.Max(120))
// Optional field — only validated when non-empty // Optional field — only validated when non-empty
website := c.Field("", via.Custom(func(val string) error { return nil }), via.Pattern(`^$|^https?://\S+$`, "Must be a valid URL")) website := c.Field("", via.Pattern(`^$|^https?://\S+$`, "Must be a valid URL"))
var success string var success string
signup := c.Action(func() { signup := c.Action(func() {
success = "" success = ""
if !c.ValidateAll(username, email, age, website) { if !c.ValidateAll() {
c.Sync() c.Sync()
return return
} }
@@ -43,13 +43,13 @@ func main() {
return return
} }
success = "Account created for " + username.String() + "!" success = "Account created for " + username.String() + "!"
c.ResetFields(username, email, age, website) c.ResetFields()
c.Sync() c.Sync()
}) })
reset := c.Action(func() { reset := c.Action(func() {
success = "" success = ""
c.ResetFields(username, email, age, website) c.ResetFields()
c.Sync() c.Sync()
}) })

190
nats.go Normal file
View File

@@ -0,0 +1,190 @@
package via
import (
"context"
"encoding/json"
"fmt"
"os"
"sync"
"time"
"github.com/delaneyj/toolbelt/embeddednats"
"github.com/nats-io/nats.go"
)
// defaultNATS is the process-scoped embedded NATS server.
type defaultNATS struct {
server *embeddednats.Server
nc *nats.Conn
js nats.JetStreamContext
cancel context.CancelFunc
dataDir string
}
var (
sharedNATS *defaultNATS
sharedNATSOnce sync.Once
sharedNATSErr error
)
// getSharedNATS returns a process-level singleton embedded NATS server.
// The server starts once and is reused across all V instances.
func getSharedNATS() (*defaultNATS, error) {
sharedNATSOnce.Do(func() {
sharedNATS, sharedNATSErr = startDefaultNATS()
})
return sharedNATS, sharedNATSErr
}
func startDefaultNATS() (dn *defaultNATS, err error) {
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("nats server panic: %v", r)
}
}()
dataDir, err := os.MkdirTemp("", "via-nats-*")
if err != nil {
return nil, fmt.Errorf("create temp dir: %w", err)
}
ctx, cancel := context.WithCancel(context.Background())
ns, err := embeddednats.New(ctx, embeddednats.WithDirectory(dataDir))
if err != nil {
cancel()
os.RemoveAll(dataDir)
return nil, fmt.Errorf("start embedded nats: %w", err)
}
ns.WaitForServer()
nc, err := ns.Client()
if err != nil {
ns.Close()
cancel()
os.RemoveAll(dataDir)
return nil, fmt.Errorf("connect nats client: %w", err)
}
js, err := nc.JetStream()
if err != nil {
nc.Close()
ns.Close()
cancel()
os.RemoveAll(dataDir)
return nil, fmt.Errorf("init jetstream: %w", err)
}
return &defaultNATS{
server: ns,
nc: nc,
js: js,
cancel: cancel,
dataDir: dataDir,
}, nil
}
func (n *defaultNATS) Publish(subject string, data []byte) error {
return n.nc.Publish(subject, data)
}
func (n *defaultNATS) Subscribe(subject string, handler func(data []byte)) (Subscription, error) {
sub, err := n.nc.Subscribe(subject, func(msg *nats.Msg) {
handler(msg.Data)
})
if err != nil {
return nil, err
}
return sub, nil
}
// natsRef wraps a shared defaultNATS as a PubSub. Close is a no-op because
// the underlying server is process-scoped and outlives individual V instances.
type natsRef struct {
dn *defaultNATS
}
func (r *natsRef) Publish(subject string, data []byte) error {
return r.dn.Publish(subject, data)
}
func (r *natsRef) Subscribe(subject string, handler func(data []byte)) (Subscription, error) {
return r.dn.Subscribe(subject, handler)
}
func (r *natsRef) Close() error {
return nil
}
// NATSConn returns the underlying NATS connection from the built-in embedded
// server, or nil if a custom PubSub backend is in use.
func (v *V) NATSConn() *nats.Conn {
if v.defaultNATS != nil {
return v.defaultNATS.nc
}
return nil
}
// JetStream returns the JetStream context from the built-in embedded server,
// or nil if a custom PubSub backend is in use.
func (v *V) JetStream() nats.JetStreamContext {
if v.defaultNATS != nil {
return v.defaultNATS.js
}
return nil
}
// StreamConfig holds the parameters for creating or updating a JetStream stream.
type StreamConfig struct {
Name string
Subjects []string
MaxMsgs int64
MaxAge time.Duration
}
// EnsureStream creates or updates a JetStream stream matching cfg.
func EnsureStream(v *V, cfg StreamConfig) error {
js := v.JetStream()
if js == nil {
return fmt.Errorf("jetstream not available")
}
_, err := js.AddStream(&nats.StreamConfig{
Name: cfg.Name,
Subjects: cfg.Subjects,
Retention: nats.LimitsPolicy,
MaxMsgs: cfg.MaxMsgs,
MaxAge: cfg.MaxAge,
})
return err
}
// ReplayHistory fetches the last limit messages from subject,
// deserializing each as T. Returns an empty slice if nothing is available.
func ReplayHistory[T any](v *V, subject string, limit int) ([]T, error) {
js := v.JetStream()
if js == nil {
return nil, fmt.Errorf("jetstream not available")
}
sub, err := js.SubscribeSync(subject, nats.DeliverAll(), nats.OrderedConsumer())
if err != nil {
return nil, err
}
defer sub.Unsubscribe()
var msgs []T
for {
raw, err := sub.NextMsg(200 * time.Millisecond)
if err != nil {
break
}
var msg T
if json.Unmarshal(raw.Data, &msg) == nil {
msgs = append(msgs, msg)
}
}
if limit > 0 && len(msgs) > limit {
msgs = msgs[len(msgs)-limit:]
}
return msgs, nil
}

View File

@@ -2,7 +2,6 @@ package via
import ( import (
"sync" "sync"
"sync/atomic"
"testing" "testing"
"time" "time"
@@ -11,88 +10,36 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
type mockHandler struct {
id int64
fn func([]byte)
active atomic.Bool
}
// mockPubSub implements PubSub for testing without NATS.
type mockPubSub struct {
mu sync.Mutex
subs map[string][]*mockHandler
nextID atomic.Int64
}
func newMockPubSub() *mockPubSub {
return &mockPubSub{subs: make(map[string][]*mockHandler)}
}
func (m *mockPubSub) Publish(subject string, data []byte) error {
m.mu.Lock()
handlers := make([]*mockHandler, len(m.subs[subject]))
copy(handlers, m.subs[subject])
m.mu.Unlock()
for _, h := range handlers {
if h.active.Load() {
h.fn(data)
}
}
return nil
}
func (m *mockPubSub) Subscribe(subject string, handler func(data []byte)) (Subscription, error) {
m.mu.Lock()
defer m.mu.Unlock()
mh := &mockHandler{
id: m.nextID.Add(1),
fn: handler,
}
mh.active.Store(true)
m.subs[subject] = append(m.subs[subject], mh)
return &mockSub{handler: mh}, nil
}
func (m *mockPubSub) Close() error { return nil }
type mockSub struct {
handler *mockHandler
}
func (s *mockSub) Unsubscribe() error {
s.handler.active.Store(false)
return nil
}
func TestPubSub_RoundTrip(t *testing.T) { func TestPubSub_RoundTrip(t *testing.T) {
ps := newMockPubSub()
v := New() v := New()
v.Config(Options{PubSub: ps}) defer v.Shutdown()
var received []byte var received []byte
var wg sync.WaitGroup done := make(chan struct{})
wg.Add(1)
c := newContext("test-ctx", "/", v) c := newContext("test-ctx", "/", v)
c.View(func() h.H { return h.Div() }) c.View(func() h.H { return h.Div() })
_, err := c.Subscribe("test.topic", func(data []byte) { _, err := c.Subscribe("test.topic", func(data []byte) {
received = data received = data
wg.Done() close(done)
}) })
require.NoError(t, err) require.NoError(t, err)
err = c.Publish("test.topic", []byte("hello")) err = c.Publish("test.topic", []byte("hello"))
require.NoError(t, err) require.NoError(t, err)
wg.Wait() select {
case <-done:
case <-time.After(2 * time.Second):
t.Fatal("timed out waiting for message")
}
assert.Equal(t, []byte("hello"), received) assert.Equal(t, []byte("hello"), received)
} }
func TestPubSub_MultipleSubscribers(t *testing.T) { func TestPubSub_MultipleSubscribers(t *testing.T) {
ps := newMockPubSub()
v := New() v := New()
v.Config(Options{PubSub: ps}) defer v.Shutdown()
var mu sync.Mutex var mu sync.Mutex
var results []string var results []string
@@ -119,7 +66,17 @@ func TestPubSub_MultipleSubscribers(t *testing.T) {
}) })
c1.Publish("broadcast", []byte("msg")) c1.Publish("broadcast", []byte("msg"))
done := make(chan struct{})
go func() {
wg.Wait() wg.Wait()
close(done)
}()
select {
case <-done:
case <-time.After(2 * time.Second):
t.Fatal("timed out waiting for messages")
}
assert.Len(t, results, 2) assert.Len(t, results, 2)
assert.Contains(t, results, "c1:msg") assert.Contains(t, results, "c1:msg")
@@ -127,9 +84,8 @@ func TestPubSub_MultipleSubscribers(t *testing.T) {
} }
func TestPubSub_SubscriptionCleanupOnDispose(t *testing.T) { func TestPubSub_SubscriptionCleanupOnDispose(t *testing.T) {
ps := newMockPubSub()
v := New() v := New()
v.Config(Options{PubSub: ps}) defer v.Shutdown()
c := newContext("cleanup-ctx", "/", v) c := newContext("cleanup-ctx", "/", v)
c.View(func() h.H { return h.Div() }) c.View(func() h.H { return h.Div() })
@@ -144,9 +100,8 @@ func TestPubSub_SubscriptionCleanupOnDispose(t *testing.T) {
} }
func TestPubSub_ManualUnsubscribe(t *testing.T) { func TestPubSub_ManualUnsubscribe(t *testing.T) {
ps := newMockPubSub()
v := New() v := New()
v.Config(Options{PubSub: ps}) defer v.Shutdown()
c := newContext("unsub-ctx", "/", v) c := newContext("unsub-ctx", "/", v)
c.View(func() h.H { return h.Div() }) c.View(func() h.H { return h.Div() })
@@ -160,28 +115,13 @@ func TestPubSub_ManualUnsubscribe(t *testing.T) {
sub.Unsubscribe() sub.Unsubscribe()
c.Publish("topic", []byte("ignored")) c.Publish("topic", []byte("ignored"))
time.Sleep(10 * time.Millisecond) time.Sleep(50 * time.Millisecond)
assert.False(t, called) assert.False(t, called)
} }
func TestPubSub_NoOpWhenNotConfigured(t *testing.T) {
v := New()
c := newContext("noop-ctx", "/", v)
c.View(func() h.H { return h.Div() })
err := c.Publish("topic", []byte("data"))
assert.Error(t, err)
sub, err := c.Subscribe("topic", func(data []byte) {})
assert.Error(t, err)
assert.Nil(t, sub)
}
func TestPubSub_NoOpDuringPanicCheck(t *testing.T) { func TestPubSub_NoOpDuringPanicCheck(t *testing.T) {
ps := newMockPubSub()
v := New() v := New()
v.Config(Options{PubSub: ps}) defer v.Shutdown()
// Panic-check context has id="" // Panic-check context has id=""
c := newContext("", "/", v) c := newContext("", "/", v)

View File

@@ -1,7 +1,8 @@
package via package via
// PubSub is an interface for publish/subscribe messaging backends. // PubSub is an interface for publish/subscribe messaging backends.
// The vianats sub-package provides an embedded NATS implementation. // By default, New() starts an embedded NATS server. Supply a custom
// implementation via Config(Options{PubSub: yourBackend}) to override.
type PubSub interface { type PubSub interface {
Publish(subject string, data []byte) error Publish(subject string, data []byte) error
Subscribe(subject string, handler func(data []byte)) (Subscription, error) Subscribe(subject string, handler func(data []byte)) (Subscription, error)

View File

@@ -1,8 +1,8 @@
package via package via
import ( import (
"sync"
"testing" "testing"
"time"
"github.com/ryanhamamura/via/h" "github.com/ryanhamamura/via/h"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
@@ -10,9 +10,8 @@ import (
) )
func TestPublishSubscribe_RoundTrip(t *testing.T) { func TestPublishSubscribe_RoundTrip(t *testing.T) {
ps := newMockPubSub()
v := New() v := New()
v.Config(Options{PubSub: ps}) defer v.Shutdown()
type event struct { type event struct {
Name string `json:"name"` Name string `json:"name"`
@@ -20,30 +19,32 @@ func TestPublishSubscribe_RoundTrip(t *testing.T) {
} }
var got event var got event
var wg sync.WaitGroup done := make(chan struct{})
wg.Add(1)
c := newContext("typed-ctx", "/", v) c := newContext("typed-ctx", "/", v)
c.View(func() h.H { return h.Div() }) c.View(func() h.H { return h.Div() })
_, err := Subscribe(c, "events", func(e event) { _, err := Subscribe(c, "events", func(e event) {
got = e got = e
wg.Done() close(done)
}) })
require.NoError(t, err) require.NoError(t, err)
err = Publish(c, "events", event{Name: "click", Count: 42}) err = Publish(c, "events", event{Name: "click", Count: 42})
require.NoError(t, err) require.NoError(t, err)
wg.Wait() select {
case <-done:
case <-time.After(2 * time.Second):
t.Fatal("timed out waiting for message")
}
assert.Equal(t, "click", got.Name) assert.Equal(t, "click", got.Name)
assert.Equal(t, 42, got.Count) assert.Equal(t, 42, got.Count)
} }
func TestSubscribe_SkipsBadJSON(t *testing.T) { func TestSubscribe_SkipsBadJSON(t *testing.T) {
ps := newMockPubSub()
v := New() v := New()
v.Config(Options{PubSub: ps}) defer v.Shutdown()
type msg struct { type msg struct {
Text string `json:"text"` Text string `json:"text"`
@@ -62,5 +63,6 @@ func TestSubscribe_SkipsBadJSON(t *testing.T) {
err = c.Publish("topic", []byte("not json")) err = c.Publish("topic", []byte("not json"))
require.NoError(t, err) require.NoError(t, err)
time.Sleep(50 * time.Millisecond)
assert.False(t, called) assert.False(t, called)
} }

24
rule.go
View File

@@ -1,10 +1,12 @@
package via package via
import ( import (
"errors"
"fmt" "fmt"
"regexp" "regexp"
"strconv" "strconv"
"strings" "strings"
"unicode/utf8"
) )
// Rule defines a single validation check for a Field. // Rule defines a single validation check for a Field.
@@ -20,7 +22,7 @@ func Required(msg ...string) Rule {
} }
return Rule{func(val string) error { return Rule{func(val string) error {
if strings.TrimSpace(val) == "" { if strings.TrimSpace(val) == "" {
return fmt.Errorf("%s", m) return errors.New(m)
} }
return nil return nil
}} }}
@@ -33,8 +35,8 @@ func MinLen(n int, msg ...string) Rule {
m = msg[0] m = msg[0]
} }
return Rule{func(val string) error { return Rule{func(val string) error {
if len(val) < n { if utf8.RuneCountInString(val) < n {
return fmt.Errorf("%s", m) return errors.New(m)
} }
return nil return nil
}} }}
@@ -47,8 +49,8 @@ func MaxLen(n int, msg ...string) Rule {
m = msg[0] m = msg[0]
} }
return Rule{func(val string) error { return Rule{func(val string) error {
if len(val) > n { if utf8.RuneCountInString(val) > n {
return fmt.Errorf("%s", m) return errors.New(m)
} }
return nil return nil
}} }}
@@ -63,10 +65,10 @@ func Min(n int, msg ...string) Rule {
return Rule{func(val string) error { return Rule{func(val string) error {
v, err := strconv.Atoi(val) v, err := strconv.Atoi(val)
if err != nil { if err != nil {
return fmt.Errorf("%s", m) return errors.New("Must be a valid number")
} }
if v < n { if v < n {
return fmt.Errorf("%s", m) return errors.New(m)
} }
return nil return nil
}} }}
@@ -81,10 +83,10 @@ func Max(n int, msg ...string) Rule {
return Rule{func(val string) error { return Rule{func(val string) error {
v, err := strconv.Atoi(val) v, err := strconv.Atoi(val)
if err != nil { if err != nil {
return fmt.Errorf("%s", m) return errors.New("Must be a valid number")
} }
if v > n { if v > n {
return fmt.Errorf("%s", m) return errors.New(m)
} }
return nil return nil
}} }}
@@ -99,7 +101,7 @@ func Pattern(re string, msg ...string) Rule {
compiled := regexp.MustCompile(re) compiled := regexp.MustCompile(re)
return Rule{func(val string) error { return Rule{func(val string) error {
if !compiled.MatchString(val) { if !compiled.MatchString(val) {
return fmt.Errorf("%s", m) return errors.New(m)
} }
return nil return nil
}} }}
@@ -115,7 +117,7 @@ func Email(msg ...string) Rule {
} }
return Rule{func(val string) error { return Rule{func(val string) error {
if !emailRegexp.MatchString(val) { if !emailRegexp.MatchString(val) {
return fmt.Errorf("%s", m) return errors.New(m)
} }
return nil return nil
}} }}

12
via.go
View File

@@ -49,6 +49,7 @@ type V struct {
devModePageInitFnMap map[string]func(*Context) devModePageInitFnMap map[string]func(*Context)
sessionManager *scs.SessionManager sessionManager *scs.SessionManager
pubsub PubSub pubsub PubSub
defaultNATS *defaultNATS
actionRateLimit RateLimitConfig actionRateLimit RateLimitConfig
datastarPath string datastarPath string
datastarContent []byte datastarContent []byte
@@ -130,6 +131,7 @@ func (v *V) Config(cfg Options) {
v.datastarPath = cfg.DatastarPath v.datastarPath = cfg.DatastarPath
} }
if cfg.PubSub != nil { if cfg.PubSub != nil {
v.defaultNATS = nil
v.pubsub = cfg.PubSub v.pubsub = cfg.PubSub
} }
if cfg.ContextTTL != 0 { if cfg.ContextTTL != 0 {
@@ -379,6 +381,7 @@ func (v *V) Shutdown() {
v.logErr(nil, "pubsub close error: %v", err) v.logErr(nil, "pubsub close error: %v", err)
} }
} }
v.defaultNATS = nil
v.logInfo(nil, "shutdown complete") v.logInfo(nil, "shutdown complete")
} }
@@ -725,6 +728,15 @@ func New() *V {
v.logDebug(c, "session close event triggered") v.logDebug(c, "session close event triggered")
v.cleanupCtx(c) v.cleanupCtx(c)
}) })
dn, err := getSharedNATS()
if err != nil {
v.logWarn(nil, "embedded NATS unavailable: %v", err)
} else {
v.defaultNATS = dn
v.pubsub = &natsRef{dn: dn}
}
return v return v
} }

View File

@@ -1,127 +0,0 @@
// Package vianats provides an embedded NATS server with JetStream as a
// pub/sub backend for Via applications.
package vianats
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/delaneyj/toolbelt/embeddednats"
"github.com/nats-io/nats.go"
"github.com/ryanhamamura/via"
)
// NATS implements via.PubSub using an embedded NATS server with JetStream.
type NATS struct {
server *embeddednats.Server
nc *nats.Conn
js nats.JetStreamContext
}
// New starts an embedded NATS server with JetStream enabled and returns a
// ready-to-use NATS instance. The server stores data in dataDir and shuts
// down when ctx is cancelled.
func New(ctx context.Context, dataDir string) (*NATS, error) {
ns, err := embeddednats.New(ctx, embeddednats.WithDirectory(dataDir))
if err != nil {
return nil, fmt.Errorf("vianats: start server: %w", err)
}
ns.WaitForServer()
nc, err := ns.Client()
if err != nil {
ns.Close()
return nil, fmt.Errorf("vianats: connect client: %w", err)
}
js, err := nc.JetStream()
if err != nil {
nc.Close()
ns.Close()
return nil, fmt.Errorf("vianats: init jetstream: %w", err)
}
return &NATS{server: ns, nc: nc, js: js}, nil
}
// Publish sends data to the given subject using core NATS publish.
// JetStream captures messages automatically if a matching stream exists.
func (n *NATS) Publish(subject string, data []byte) error {
return n.nc.Publish(subject, data)
}
// Subscribe creates a core NATS subscription for real-time fan-out delivery.
func (n *NATS) Subscribe(subject string, handler func(data []byte)) (via.Subscription, error) {
sub, err := n.nc.Subscribe(subject, func(msg *nats.Msg) {
handler(msg.Data)
})
if err != nil {
return nil, err
}
return sub, nil
}
// Close shuts down the client connection and embedded server.
func (n *NATS) Close() error {
n.nc.Close()
return n.server.Close()
}
// Conn returns the underlying NATS connection for advanced usage.
func (n *NATS) Conn() *nats.Conn {
return n.nc
}
// JetStream returns the JetStream context for stream configuration and replay.
func (n *NATS) JetStream() nats.JetStreamContext {
return n.js
}
// StreamConfig holds the parameters for creating or updating a JetStream stream.
type StreamConfig struct {
Name string
Subjects []string
MaxMsgs int64
MaxAge time.Duration
}
// EnsureStream creates or updates a JetStream stream matching cfg.
func EnsureStream(n *NATS, cfg StreamConfig) error {
_, err := n.js.AddStream(&nats.StreamConfig{
Name: cfg.Name,
Subjects: cfg.Subjects,
Retention: nats.LimitsPolicy,
MaxMsgs: cfg.MaxMsgs,
MaxAge: cfg.MaxAge,
})
return err
}
// ReplayHistory fetches the last limit messages from subject,
// deserializing each as T. Returns an empty slice if nothing is available.
func ReplayHistory[T any](n *NATS, subject string, limit int) ([]T, error) {
sub, err := n.js.SubscribeSync(subject, nats.DeliverAll(), nats.OrderedConsumer())
if err != nil {
return nil, err
}
defer sub.Unsubscribe()
var msgs []T
for {
raw, err := sub.NextMsg(200 * time.Millisecond)
if err != nil {
break
}
var msg T
if json.Unmarshal(raw.Data, &msg) == nil {
msgs = append(msgs, msg)
}
}
if limit > 0 && len(msgs) > limit {
msgs = msgs[len(msgs)-limit:]
}
return msgs, nil
}