14 Commits

Author SHA1 Message Date
Ryan Hamamura
8aa91c577c feat: add event types OnSubmit, OnInput, OnFocus, OnBlur, OnMouseEnter, OnMouseLeave, OnScroll, OnDblClick 2026-02-06 10:54:27 -10:00
Ryan Hamamura
6dcd54c88b fix: clean up leaked contexts on SSE disconnect and add orphan reaper
When clients disconnect without beforeunload firing (network drops,
mobile kills, crashes), contexts leaked in the registry permanently.

- Extract cleanupCtx helper for dispose/unregister sequence
- Call cleanupCtx on SSE disconnect (sse.Context().Done())
- Add background reaper for contexts where SSE never connected
- Add ContextTTL config option (default 30s, negative disables)
- Fix inverted condition in devModeRemovePersisted
2026-02-06 10:34:28 -10:00
Ryan Hamamura
2c44671d0e feat: add generic pub/sub helpers and pubsub-crud example
Add typed Publish[T] and Subscribe[T] generic helpers that handle
JSON marshaling, along with vianats.EnsureStream and ReplayHistory
helpers. Refactor nats-chatroom to use the new APIs.

Add pubsub-crud example demonstrating CRUD operations with DaisyUI
toast notifications broadcast to all connected clients via NATS.
2026-02-06 09:47:39 -10:00
Ryan Hamamura
53e5733100 feat: add keyboard grid example
8x8 grid game demonstrating OnKeyDownMap with WASD and arrow key
bindings. Arrow keys use WithPreventDefault to avoid page scrolling.
2026-02-02 08:58:03 -10:00
Ryan Hamamura
11543947bd feat: add OnKeyDownMap and WithWindow for combined key bindings
Add window-scoped keydown dispatching with per-key signal and
preventDefault options. Use comma operator instead of semicolons
in generated ternary expressions to produce valid JavaScript.
2026-02-02 08:57:59 -10:00
Ryan Hamamura
e79bb0e1b0 Revert "feat: add OnKeyDownMap and WithWindow for combined key bindings"
This reverts commit d1e8e3a2ed.
2026-02-02 08:27:07 -10:00
Ryan Hamamura
d1e8e3a2ed feat: add OnKeyDownMap and WithWindow for combined key bindings
OnKeyDownMap merges multiple key bindings into a single
data-on:keydown__window attribute via a JS ternary chain,
avoiding HTML attribute deduplication. WithWindow scopes
any keydown listener to the window object.
2026-02-02 08:23:06 -10:00
Ryan Hamamura
4a7acbb630 feat: add graceful shutdown with OS signal handling
Handle SIGINT/SIGTERM in Start() to cleanly drain all contexts,
stop goroutines, close SSE connections, and tear down PubSub.

Fix stopAllRoutines() to close() the channel instead of sending a
single value, so all listening goroutines are notified.
2026-01-31 09:22:43 -10:00
Ryan Hamamura
a7ace9099f feat: replace log with rs/zerolog for structured logging
Switch from the standard library log package to rs/zerolog with
ConsoleWriter for colorful terminal output in dev mode and JSON
output in production. Users can now provide their own logger via
Options.Logger or set the level via Options.LogLevel.
2026-01-31 08:18:24 -10:00
Ryan Hamamura
d8318af9c4 feat: add JetStream message replay to chatroom example
Replay stored messages from JetStream when joining or switching rooms
so users see chat history immediately.
2026-01-26 08:10:30 -10:00
Ryan Hamamura
30cc6d88e6 feat: add embedded NATS pub/sub support on Context
Define PubSub and Subscription interfaces in the core via package with
a vianats sub-package providing the embedded NATS + JetStream
implementation. Expose c.Publish() and c.Subscribe() on Context with
automatic subscription cleanup on session close. Refactor the NATS
chatroom example to use the built-in methods instead of manual
subscription tracking.
2026-01-26 08:06:50 -10:00
Ryan Hamamura
88bd0f31df feat: add NATS chatroom example with embedded server
Demonstrates pub/sub messaging as an alternative to custom Rooms
implementation. Uses delaneyj/toolbelt/embeddednats to run NATS
with JetStream inside the binary - no external server required.
2026-01-16 00:50:05 -10:00
Ryan Hamamura
82a3314089 feat: add SQLite session store support
Add NewSQLiteSessionManager helper that creates an SCS session manager
backed by SQLite, allowing sessions to persist across server restarts.
The function handles table creation automatically.
2026-01-15 08:44:27 -10:00
Ryan Hamamura
73f4e4009b Always sync full state when SSE connects
Previously only called Sync() on SSE reconnect (detected via last-event-id
header). This caused issues when application code registered contexts for
updates before the SSE connection was established - patches sent to
patchChan could be dropped.

Now always call Sync() when SSE connects, ensuring clients receive the
full current state regardless of what happened before the connection
was established.

Fixes #2
2026-01-14 19:02:44 -10:00
23 changed files with 1970 additions and 105 deletions

4
.gitignore vendored
View File

@@ -47,3 +47,7 @@ internal/examples/picocss/picocss
internal/examples/plugins/plugins internal/examples/plugins/plugins
internal/examples/realtimechart/realtimechart internal/examples/realtimechart/realtimechart
internal/examples/shakespeare/shakespeare internal/examples/shakespeare/shakespeare
internal/examples/nats-chatroom/nats-chatroom
# NATS data directory
data/

View File

@@ -21,6 +21,8 @@ type triggerOpts struct {
hasSignal bool hasSignal bool
signalID string signalID string
value string value string
window bool
preventDefault bool
} }
type withSignalOpt struct { type withSignalOpt struct {
@@ -34,6 +36,28 @@ func (o withSignalOpt) apply(opts *triggerOpts) {
opts.value = o.value opts.value = o.value
} }
type withWindowOpt struct{}
func (o withWindowOpt) apply(opts *triggerOpts) {
opts.window = true
}
// WithWindow makes the event listener attach to the window instead of the element.
func WithWindow() ActionTriggerOption {
return withWindowOpt{}
}
type withPreventDefaultOpt struct{}
func (o withPreventDefaultOpt) apply(opts *triggerOpts) {
opts.preventDefault = true
}
// WithPreventDefault calls evt.preventDefault() for matched keys.
func WithPreventDefault() ActionTriggerOption {
return withPreventDefaultOpt{}
}
// WithSignal sets a signal value before triggering the action. // WithSignal sets a signal value before triggering the action.
func WithSignal(sig *signal, value string) ActionTriggerOption { func WithSignal(sig *signal, value string) ActionTriggerOption {
return withSignalOpt{ return withSignalOpt{
@@ -54,7 +78,7 @@ func buildOnExpr(base string, opts *triggerOpts) string {
if !opts.hasSignal { if !opts.hasSignal {
return base return base
} }
return fmt.Sprintf("$%s=%s;%s", opts.signalID, opts.value, base) return fmt.Sprintf("$%s=%s,%s", opts.signalID, opts.value, base)
} }
func applyOptions(options ...ActionTriggerOption) triggerOpts { func applyOptions(options ...ActionTriggerOption) triggerOpts {
@@ -83,6 +107,54 @@ func (a *actionTrigger) OnChange(options ...ActionTriggerOption) h.H {
return h.Data("on:change__debounce.200ms", buildOnExpr(actionURL(a.id), &opts)) return h.Data("on:change__debounce.200ms", buildOnExpr(actionURL(a.id), &opts))
} }
// OnSubmit returns a via.h DOM attribute that triggers on form submit.
func (a *actionTrigger) OnSubmit(options ...ActionTriggerOption) h.H {
opts := applyOptions(options...)
return h.Data("on:submit", buildOnExpr(actionURL(a.id), &opts))
}
// OnInput returns a via.h DOM attribute that triggers on input (without debounce).
func (a *actionTrigger) OnInput(options ...ActionTriggerOption) h.H {
opts := applyOptions(options...)
return h.Data("on:input", buildOnExpr(actionURL(a.id), &opts))
}
// OnFocus returns a via.h DOM attribute that triggers when the element gains focus.
func (a *actionTrigger) OnFocus(options ...ActionTriggerOption) h.H {
opts := applyOptions(options...)
return h.Data("on:focus", buildOnExpr(actionURL(a.id), &opts))
}
// OnBlur returns a via.h DOM attribute that triggers when the element loses focus.
func (a *actionTrigger) OnBlur(options ...ActionTriggerOption) h.H {
opts := applyOptions(options...)
return h.Data("on:blur", buildOnExpr(actionURL(a.id), &opts))
}
// OnMouseEnter returns a via.h DOM attribute that triggers when the mouse enters the element.
func (a *actionTrigger) OnMouseEnter(options ...ActionTriggerOption) h.H {
opts := applyOptions(options...)
return h.Data("on:mouseenter", buildOnExpr(actionURL(a.id), &opts))
}
// OnMouseLeave returns a via.h DOM attribute that triggers when the mouse leaves the element.
func (a *actionTrigger) OnMouseLeave(options ...ActionTriggerOption) h.H {
opts := applyOptions(options...)
return h.Data("on:mouseleave", buildOnExpr(actionURL(a.id), &opts))
}
// OnScroll returns a via.h DOM attribute that triggers on scroll.
func (a *actionTrigger) OnScroll(options ...ActionTriggerOption) h.H {
opts := applyOptions(options...)
return h.Data("on:scroll", buildOnExpr(actionURL(a.id), &opts))
}
// OnDblClick returns a via.h DOM attribute that triggers on double click.
func (a *actionTrigger) OnDblClick(options ...ActionTriggerOption) h.H {
opts := applyOptions(options...)
return h.Data("on:dblclick", buildOnExpr(actionURL(a.id), &opts))
}
// OnKeyDown returns a via.h DOM attribute that triggers when a key is pressed. // OnKeyDown returns a via.h DOM attribute that triggers when a key is pressed.
// key: optional, see https://developer.mozilla.org/en-US/docs/Web/API/KeyboardEvent/key // key: optional, see https://developer.mozilla.org/en-US/docs/Web/API/KeyboardEvent/key
// Example: OnKeyDown("Enter") // Example: OnKeyDown("Enter")
@@ -92,5 +164,49 @@ func (a *actionTrigger) OnKeyDown(key string, options ...ActionTriggerOption) h.
if key != "" { if key != "" {
condition = fmt.Sprintf("evt.key==='%s' &&", key) condition = fmt.Sprintf("evt.key==='%s' &&", key)
} }
return h.Data("on:keydown", fmt.Sprintf("%s%s", condition, buildOnExpr(actionURL(a.id), &opts))) attrName := "on:keydown"
if opts.window {
attrName = "on:keydown__window"
}
return h.Data(attrName, fmt.Sprintf("%s%s", condition, buildOnExpr(actionURL(a.id), &opts)))
}
// KeyBinding pairs a key with an action and per-binding options.
type KeyBinding struct {
Key string
Action *actionTrigger
Options []ActionTriggerOption
}
// KeyBind creates a KeyBinding for use with OnKeyDownMap.
func KeyBind(key string, action *actionTrigger, options ...ActionTriggerOption) KeyBinding {
return KeyBinding{Key: key, Action: action, Options: options}
}
// OnKeyDownMap produces a single window-scoped keydown attribute that dispatches
// to different actions based on the pressed key. Each binding can reference a
// different action and carry its own signal/preventDefault options.
func OnKeyDownMap(bindings ...KeyBinding) h.H {
if len(bindings) == 0 {
return nil
}
expr := ""
for i, b := range bindings {
opts := applyOptions(b.Options...)
branch := ""
if opts.preventDefault {
branch = "evt.preventDefault(),"
}
branch += buildOnExpr(actionURL(b.Action.id), &opts)
if i > 0 {
expr += " : "
}
expr += fmt.Sprintf("evt.key==='%s' ? (%s)", b.Key, branch)
}
expr += " : void 0"
return h.Data("on:keydown__window", expr)
} }

View File

@@ -1,15 +1,19 @@
package via package via
import "github.com/alexedwards/scs/v2" import (
"time"
type LogLevel int "github.com/alexedwards/scs/v2"
"github.com/rs/zerolog"
)
const ( func ptr(l zerolog.Level) *zerolog.Level { return &l }
undefined LogLevel = iota
LogLevelError var (
LogLevelWarn LogLevelDebug = ptr(zerolog.DebugLevel)
LogLevelInfo LogLevelInfo = ptr(zerolog.InfoLevel)
LogLevelDebug LogLevelWarn = ptr(zerolog.WarnLevel)
LogLevelError = ptr(zerolog.ErrorLevel)
) )
// Plugin is a func that can mutate the given *via.V app runtime. It is useful to integrate popular JS/CSS UI libraries or tools. // Plugin is a func that can mutate the given *via.V app runtime. It is useful to integrate popular JS/CSS UI libraries or tools.
@@ -23,9 +27,12 @@ type Options struct {
// The http server address. e.g. ':3000' // The http server address. e.g. ':3000'
ServerAddress string ServerAddress string
// Level of the logs to write to stdout. // LogLevel sets the minimum log level. nil keeps the default (Info).
// Options: Error, Warn, Info, Debug. LogLevel *zerolog.Level
LogLvl LogLevel
// Logger overrides the default logger entirely. When set, LogLevel and
// DevMode have no effect on logging.
Logger *zerolog.Logger
// The title of the HTML document. // The title of the HTML document.
DocumentTitle string DocumentTitle string
@@ -45,4 +52,13 @@ type Options struct {
// DatastarPath is the URL path where the script is served. // DatastarPath is the URL path where the script is served.
// Defaults to "/_datastar.js" if empty. // Defaults to "/_datastar.js" if empty.
DatastarPath string DatastarPath string
// PubSub enables publish/subscribe messaging. Use vianats.New() for an
// embedded NATS backend, or supply any PubSub implementation.
PubSub PubSub
// ContextTTL is the maximum time a context may exist without an SSE
// connection before the background reaper disposes it.
// Default: 30s. Negative value disables the reaper.
ContextTTL time.Duration
} }

View File

@@ -5,10 +5,10 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"log"
"maps" "maps"
"reflect" "reflect"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/ryanhamamura/via/h" "github.com/ryanhamamura/via/h"
@@ -31,6 +31,11 @@ type Context struct {
mu sync.RWMutex mu sync.RWMutex
ctxDisposedChan chan struct{} ctxDisposedChan chan struct{}
reqCtx context.Context reqCtx context.Context
subscriptions []Subscription
subsMu sync.Mutex
disposeOnce sync.Once
createdAt time.Time
sseConnected atomic.Bool
} }
// View defines the UI rendered by this context. // View defines the UI rendered by this context.
@@ -349,11 +354,23 @@ func (c *Context) ReplaceURLf(format string, a ...any) {
c.ReplaceURL(fmt.Sprintf(format, a...)) c.ReplaceURL(fmt.Sprintf(format, a...))
} }
// stopAllRoutines stops all go routines tied to this Context preventing goroutine leaks. // dispose idempotently tears down this context: unsubscribes all pubsub
// subscriptions and closes ctxDisposedChan to stop routines and exit the SSE loop.
func (c *Context) dispose() {
c.disposeOnce.Do(func() {
c.unsubscribeAll()
c.stopAllRoutines()
})
}
// stopAllRoutines closes ctxDisposedChan, broadcasting to all listening
// goroutines (OnIntervalRoutine, SSE loop) that this context is done.
func (c *Context) stopAllRoutines() { func (c *Context) stopAllRoutines() {
select { select {
case c.ctxDisposedChan <- struct{}{}: case <-c.ctxDisposedChan:
// already closed
default: default:
close(c.ctxDisposedChan)
} }
} }
@@ -403,9 +420,58 @@ func (c *Context) Session() *Session {
} }
} }
// Publish sends data to the given subject via the configured PubSub backend.
// Returns an error if no PubSub is configured. No-ops during panic-check init.
func (c *Context) Publish(subject string, data []byte) error {
if c.id == "" {
return nil
}
if c.app.pubsub == nil {
return fmt.Errorf("pubsub not configured")
}
return c.app.pubsub.Publish(subject, data)
}
// Subscribe creates a subscription on the configured PubSub backend.
// The subscription is tracked for automatic cleanup when the context is disposed.
// Returns an error if no PubSub is configured. No-ops during panic-check init.
func (c *Context) Subscribe(subject string, handler func(data []byte)) (Subscription, error) {
if c.id == "" {
return nil, nil
}
if c.app.pubsub == nil {
return nil, fmt.Errorf("pubsub not configured")
}
sub, err := c.app.pubsub.Subscribe(subject, handler)
if err != nil {
return nil, err
}
// Track on page context for cleanup (components use parent, like signals/actions)
target := c
if c.isComponent() {
target = c.parentPageCtx
}
target.subsMu.Lock()
target.subscriptions = append(target.subscriptions, sub)
target.subsMu.Unlock()
return sub, nil
}
// unsubscribeAll cleans up all tracked subscriptions for this context and its components.
func (c *Context) unsubscribeAll() {
c.subsMu.Lock()
subs := c.subscriptions
c.subscriptions = nil
c.subsMu.Unlock()
for _, sub := range subs {
sub.Unsubscribe()
}
}
func newContext(id string, route string, v *V) *Context { func newContext(id string, route string, v *V) *Context {
if v == nil { if v == nil {
log.Fatal("create context failed: app pointer is nil") panic("create context failed: app pointer is nil")
} }
return &Context{ return &Context{
@@ -418,5 +484,6 @@ func newContext(id string, route string, v *V) *Context {
signals: new(sync.Map), signals: new(sync.Map),
patchChan: make(chan patch, 1), patchChan: make(chan patch, 1),
ctxDisposedChan: make(chan struct{}, 1), ctxDisposedChan: make(chan struct{}, 1),
createdAt: time.Now(),
} }
} }

27
go.mod
View File

@@ -6,18 +6,37 @@ require maragu.dev/gomponents v1.2.0
require ( require (
github.com/DATA-DOG/go-sqlmock v1.5.2 github.com/DATA-DOG/go-sqlmock v1.5.2
github.com/alexedwards/scs/sqlite3store v0.0.0-20251002162104-209de6e426de
github.com/alexedwards/scs/v2 v2.9.0 github.com/alexedwards/scs/v2 v2.9.0
github.com/delaneyj/toolbelt v0.9.1
github.com/mattn/go-sqlite3 v1.14.32 github.com/mattn/go-sqlite3 v1.14.32
github.com/nats-io/nats.go v1.48.0
github.com/rs/zerolog v1.34.0
github.com/starfederation/datastar-go v1.0.3 github.com/starfederation/datastar-go v1.0.3
github.com/stretchr/testify v1.10.0 github.com/stretchr/testify v1.11.1
) )
require ( require (
github.com/CAFxX/httpcompression v0.0.9 // indirect github.com/CAFxX/httpcompression v0.0.9 // indirect
github.com/andybalholm/brotli v1.2.0 // indirect github.com/andybalholm/brotli v1.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect github.com/antithesishq/antithesis-sdk-go v0.5.0 // indirect
github.com/klauspost/compress v1.18.0 // indirect github.com/cenkalti/backoff v2.2.1+incompatible // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/google/go-tpm v0.9.7 // indirect
github.com/klauspost/compress v1.18.2 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/minio/highwayhash v1.0.4-0.20251030100505-070ab1a87a76 // indirect
github.com/nats-io/jwt/v2 v2.8.0 // indirect
github.com/nats-io/nats-server/v2 v2.12.2 // indirect
github.com/nats-io/nkeys v0.4.12 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/rogpeppe/go-internal v1.14.1 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect
golang.org/x/crypto v0.45.0 // indirect
golang.org/x/sys v0.38.0 // indirect
golang.org/x/time v0.14.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect
) )

69
go.sum
View File

@@ -2,27 +2,71 @@ github.com/CAFxX/httpcompression v0.0.9 h1:0ue2X8dOLEpxTm8tt+OdHcgA+gbDge0OqFQWG
github.com/CAFxX/httpcompression v0.0.9/go.mod h1:XX8oPZA+4IDcfZ0A71Hz0mZsv/YJOgYygkFhizVPilM= github.com/CAFxX/httpcompression v0.0.9/go.mod h1:XX8oPZA+4IDcfZ0A71Hz0mZsv/YJOgYygkFhizVPilM=
github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU= github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU=
github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU= github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU=
github.com/alexedwards/scs/sqlite3store v0.0.0-20251002162104-209de6e426de h1:c72K9HLu6K442et0j3BUL/9HEYaUJouLkkVANdmqTOo=
github.com/alexedwards/scs/sqlite3store v0.0.0-20251002162104-209de6e426de/go.mod h1:Iyk7S76cxGaiEX/mSYmTZzYehp4KfyylcLaV3OnToss=
github.com/alexedwards/scs/v2 v2.9.0 h1:xa05mVpwTBm1iLeTMNFfAWpKUm4fXAW7CeAViqBVS90= github.com/alexedwards/scs/v2 v2.9.0 h1:xa05mVpwTBm1iLeTMNFfAWpKUm4fXAW7CeAViqBVS90=
github.com/alexedwards/scs/v2 v2.9.0/go.mod h1:ToaROZxyKukJKT/xLcVQAChi5k6+Pn1Gvmdl7h3RRj8= github.com/alexedwards/scs/v2 v2.9.0/go.mod h1:ToaROZxyKukJKT/xLcVQAChi5k6+Pn1Gvmdl7h3RRj8=
github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
github.com/andybalholm/brotli v1.2.0 h1:ukwgCxwYrmACq68yiUqwIWnGY0cTPox/M94sVwToPjQ= github.com/andybalholm/brotli v1.2.0 h1:ukwgCxwYrmACq68yiUqwIWnGY0cTPox/M94sVwToPjQ=
github.com/andybalholm/brotli v1.2.0/go.mod h1:rzTDkvFWvIrjDXZHkuS16NPggd91W3kUSvPlQ1pLaKY= github.com/andybalholm/brotli v1.2.0/go.mod h1:rzTDkvFWvIrjDXZHkuS16NPggd91W3kUSvPlQ1pLaKY=
github.com/antithesishq/antithesis-sdk-go v0.5.0 h1:cudCFF83pDDANcXFzkQPUHHedfnnIbUO3JMr9fqwFJs=
github.com/antithesishq/antithesis-sdk-go v0.5.0/go.mod h1:IUpT2DPAKh6i/YhSbt6Gl3v2yvUZjmKncl7U91fup7E=
github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4=
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/delaneyj/toolbelt v0.9.1 h1:QJComn2qoaQ4azl5uRkGpdHSO9e+JtoxDTXCiQHvH8o=
github.com/delaneyj/toolbelt v0.9.1/go.mod h1:eNXpPuThjTD4tpRNCBl4JEz9jdg9LpyzNuyG+stnIbs=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/google/brotli/go/cbrotli v0.0.0-20230829110029-ed738e842d2f h1:jopqB+UTSdJGEJT8tEqYyE29zN91fi2827oLET8tl7k= github.com/google/brotli/go/cbrotli v0.0.0-20230829110029-ed738e842d2f h1:jopqB+UTSdJGEJT8tEqYyE29zN91fi2827oLET8tl7k=
github.com/google/brotli/go/cbrotli v0.0.0-20230829110029-ed738e842d2f/go.mod h1:nOPhAkwVliJdNTkj3gXpljmWhjc4wCaVqbMJcPKWP4s= github.com/google/brotli/go/cbrotli v0.0.0-20230829110029-ed738e842d2f/go.mod h1:nOPhAkwVliJdNTkj3gXpljmWhjc4wCaVqbMJcPKWP4s=
github.com/google/go-tpm v0.9.7 h1:u89J4tUUeDTlH8xxC3CTW7OHZjbjKoHdQ9W7gCUhtxA=
github.com/google/go-tpm v0.9.7/go.mod h1:h9jEsEECg7gtLis0upRBQU+GhYVH6jMjrFxI8u6bVUY=
github.com/kisielk/sqlstruct v0.0.0-20201105191214-5f3e10d3ab46/go.mod h1:yyMNCyc/Ib3bDTKd379tNMpB/7/H5TjM2Y9QJ5THLbE= github.com/kisielk/sqlstruct v0.0.0-20201105191214-5f3e10d3ab46/go.mod h1:yyMNCyc/Ib3bDTKd379tNMpB/7/H5TjM2Y9QJ5THLbE=
github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= github.com/klauspost/compress v1.18.2 h1:iiPHWW0YrcFgpBYhsA6D1+fqHssJscY/Tm/y2Uqnapk=
github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= github.com/klauspost/compress v1.18.2/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4=
github.com/klauspost/pgzip v1.2.6/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs= github.com/klauspost/pgzip v1.2.6/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
github.com/mattn/go-sqlite3 v1.14.32 h1:JD12Ag3oLy1zQA+BNn74xRgaBbdhbNIDYvQUEuuErjs= github.com/mattn/go-sqlite3 v1.14.32 h1:JD12Ag3oLy1zQA+BNn74xRgaBbdhbNIDYvQUEuuErjs=
github.com/mattn/go-sqlite3 v1.14.32/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= github.com/mattn/go-sqlite3 v1.14.32/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
github.com/minio/highwayhash v1.0.4-0.20251030100505-070ab1a87a76 h1:KGuD/pM2JpL9FAYvBrnBBeENKZNh6eNtjqytV6TYjnk=
github.com/minio/highwayhash v1.0.4-0.20251030100505-070ab1a87a76/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ=
github.com/nats-io/jwt/v2 v2.8.0 h1:K7uzyz50+yGZDO5o772eRE7atlcSEENpL7P+b74JV1g=
github.com/nats-io/jwt/v2 v2.8.0/go.mod h1:me11pOkwObtcBNR8AiMrUbtVOUGkqYjMQZ6jnSdVUIA=
github.com/nats-io/nats-server/v2 v2.12.2 h1:4TEQd0Y4zvcW0IsVxjlXnRso1hBkQl3TS0BI+SxgPhE=
github.com/nats-io/nats-server/v2 v2.12.2/go.mod h1:j1AAttYeu7WnvD8HLJ+WWKNMSyxsqmZ160pNtCQRMyE=
github.com/nats-io/nats.go v1.48.0 h1:pSFyXApG+yWU/TgbKCjmm5K4wrHu86231/w84qRVR+U=
github.com/nats-io/nats.go v1.48.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g=
github.com/nats-io/nkeys v0.4.12 h1:nssm7JKOG9/x4J8II47VWCL1Ds29avyiQDRn0ckMvDc=
github.com/nats-io/nkeys v0.4.12/go.mod h1:MT59A1HYcjIcyQDJStTfaOY6vhy9XTUjOFo+SVsvpBg=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/pierrec/lz4/v4 v4.1.18 h1:xaKrnTkyoqfh1YItXl56+6KJNVYWlEEPuAQW9xsplYQ= github.com/pierrec/lz4/v4 v4.1.18 h1:xaKrnTkyoqfh1YItXl56+6KJNVYWlEEPuAQW9xsplYQ=
github.com/pierrec/lz4/v4 v4.1.18/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pierrec/lz4/v4 v4.1.18/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ=
github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc=
github.com/rs/xid v1.6.0/go.mod h1:7XoLgs4eV+QndskICGsho+ADou8ySMSjJKDIan90Nz0=
github.com/rs/zerolog v1.34.0 h1:k43nTLIwcTVQAncfCw4KZ2VY6ukYoZaBPNOE8txlOeY=
github.com/rs/zerolog v1.34.0/go.mod h1:bJsvje4Z08ROH4Nhs5iH600c3IkWhwp44iRc54W6wYQ=
github.com/starfederation/datastar-go v1.0.3 h1:DnzgsJ6tDHDM6y5Nxsk0AGW/m8SyKch2vQg3P1xGTcU= github.com/starfederation/datastar-go v1.0.3 h1:DnzgsJ6tDHDM6y5Nxsk0AGW/m8SyKch2vQg3P1xGTcU=
github.com/starfederation/datastar-go v1.0.3/go.mod h1:stm83LQkhZkwa5GzzdPEN6dLuu8FVwxIv0w1DYkbD3w= github.com/starfederation/datastar-go v1.0.3/go.mod h1:stm83LQkhZkwa5GzzdPEN6dLuu8FVwxIv0w1DYkbD3w=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
@@ -31,8 +75,8 @@ github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpE
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
github.com/ulikunitz/xz v0.5.11/go.mod h1:nbz6k7qbPmH4IRqmfOplQw/tblSgqTqBwxkY0oWt/14= github.com/ulikunitz/xz v0.5.11/go.mod h1:nbz6k7qbPmH4IRqmfOplQw/tblSgqTqBwxkY0oWt/14=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
@@ -40,8 +84,19 @@ github.com/valyala/gozstd v1.20.1 h1:xPnnnvjmaDDitMFfDxmQ4vpx0+3CdTg2o3lALvXTU/g
github.com/valyala/gozstd v1.20.1/go.mod h1:y5Ew47GLlP37EkTB+B4s7r6A5rdaeB7ftbl9zoYiIPQ= github.com/valyala/gozstd v1.20.1/go.mod h1:y5Ew47GLlP37EkTB+B4s7r6A5rdaeB7ftbl9zoYiIPQ=
github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU= github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU=
github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E= github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= golang.org/x/crypto v0.45.0 h1:jMBrvKuj23MTlT0bQEOBcAE0mjg8mK9RXFhRH6nyF3Q=
golang.org/x/crypto v0.45.0/go.mod h1:XTGrrkGJve7CYK7J8PEww4aY7gM3qMCElcJQ8n8JdX4=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.38.0 h1:3yZWxaJjBmCWXqhN1qh02AkOnCQ1poK6oF+a7xWL6Gc=
golang.org/x/sys v0.38.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
golang.org/x/time v0.14.0 h1:MRx4UaLrDotUKUdCIqzPC48t1Y9hANFKIRpNx+Te8PI=
golang.org/x/time v0.14.0/go.mod h1:eL/Oa2bBBK0TkX57Fyni+NgnyQQN4LitPmob2Hjnqw4=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@@ -22,7 +22,7 @@ func main() {
v.Config(via.Options{ v.Config(via.Options{
DevMode: true, DevMode: true,
DocumentTitle: "ViaChat", DocumentTitle: "ViaChat",
LogLvl: via.LogLevelInfo, LogLevel: via.LogLevelInfo,
}) })
v.AppendToHead( v.AppendToHead(

View File

@@ -0,0 +1,74 @@
package main
import (
"fmt"
"github.com/ryanhamamura/via"
"github.com/ryanhamamura/via/h"
)
const gridSize = 8
func main() {
v := via.New()
v.Config(via.Options{DocumentTitle: "Keyboard", ServerAddress: ":7331"})
v.Page("/", func(c *via.Context) {
x, y := 0, 0
dir := c.Signal("")
move := c.Action(func() {
switch dir.String() {
case "up":
y = max(0, y-1)
case "down":
y = min(gridSize-1, y+1)
case "left":
x = max(0, x-1)
case "right":
x = min(gridSize-1, x+1)
}
c.Sync()
})
c.View(func() h.H {
var rows []h.H
for row := range gridSize {
var cells []h.H
for col := range gridSize {
bg := "#e0e0e0"
if col == x && row == y {
bg = "#4a90d9"
}
cells = append(cells, h.Div(
h.Attr("style", fmt.Sprintf(
"width:48px;height:48px;background:%s;border:1px solid #ccc;",
bg,
)),
))
}
rows = append(rows, h.Div(
append([]h.H{h.Attr("style", "display:flex;")}, cells...)...,
))
}
return h.Div(
h.H1(h.Text("Keyboard Grid")),
h.P(h.Text("Move with WASD or arrow keys")),
h.Div(rows...),
via.OnKeyDownMap(
via.KeyBind("w", move, via.WithSignal(dir, "up")),
via.KeyBind("a", move, via.WithSignal(dir, "left")),
via.KeyBind("s", move, via.WithSignal(dir, "down")),
via.KeyBind("d", move, via.WithSignal(dir, "right")),
via.KeyBind("ArrowUp", move, via.WithSignal(dir, "up"), via.WithPreventDefault()),
via.KeyBind("ArrowLeft", move, via.WithSignal(dir, "left"), via.WithPreventDefault()),
via.KeyBind("ArrowDown", move, via.WithSignal(dir, "down"), via.WithPreventDefault()),
via.KeyBind("ArrowRight", move, via.WithSignal(dir, "right"), via.WithPreventDefault()),
),
)
})
})
v.Start()
}

View File

@@ -14,7 +14,7 @@ func main() {
v.Config(via.Options{ v.Config(via.Options{
DocumentTitle: "Live Reload Demo", DocumentTitle: "Live Reload Demo",
DevMode: true, DevMode: true,
LogLvl: via.LogLevelDebug, LogLevel: via.LogLevelDebug,
Plugins: []via.Plugin{ Plugins: []via.Plugin{
// picocss.Default // picocss.Default
}, },

View File

@@ -0,0 +1,109 @@
# NATS Chatroom Example (Embedded)
A chatroom built with Via and an **embedded NATS server**, demonstrating pub/sub messaging as an alternative to the custom `Rooms` implementation in `../chatroom`.
Uses `delaneyj/toolbelt/embeddednats` to run NATS inside the same binary - no external server required.
## Key Differences from Original Chatroom
| Aspect | Original (`../chatroom`) | This Example |
|--------|-------------------------|--------------|
| Pub/sub | Custom `Rooms` struct (~160 lines) | NATS subjects |
| Member tracking | Manual `map[TU]Syncable` | NATS handles subscribers |
| Publish timing | Ticker every 100ms + dirty flag | Instant delivery |
| Durability | None (in-memory) | JetStream persists to disk |
| Multi-instance | Not supported | Works across server instances |
| External deps | None | **None** (NATS embedded in binary) |
## Run the Example
```bash
go run ./internal/examples/nats-chatroom
```
That's it. No separate NATS server needed.
Open multiple browser tabs at http://localhost:7331 to see messages broadcast across all clients.
## How Embedded NATS Works
```go
// Start embedded NATS server (JetStream enabled by default)
ns, err := embeddednats.New(ctx,
embeddednats.WithDirectory("./data/nats"),
)
ns.WaitForServer()
// Get client connection to embedded server
nc, err := ns.Client()
```
Data is persisted to `./data/nats/` for JetStream durability.
## Architecture
```
┌─────────────────────────────────────────────────────────┐
│ Single Binary │
│ │
│ Browser A Embedded NATS Browser B │
│ │ │ │ │
│ │-- Via Action ---> │ │ │
│ │ (Send msg) │ │ │
│ │ │ │ │
│ │ nc.Publish() │ │
│ │ "chat.room.Go" │ │
│ │ │ │ │
│ │<-- Subscribe -----|---- Subscribe --->│ │
│ │ callback │ callback │ │
│ │ │ │ │
│ │-- c.Sync() ------>│<--- c.Sync() -----| │
│ │ (SSE) │ (SSE) │ │
│ │
└─────────────────────────────────────────────────────────┘
```
## JetStream Durability
Messages persist to disk via JetStream:
```go
js.AddStream(&nats.StreamConfig{
Name: "CHAT",
Subjects: []string{"chat.>"},
MaxMsgs: 1000, // Keep last 1000 messages
MaxAge: 24 * time.Hour,
})
```
Stop and restart the app - chat history survives.
## Code Comparison
**Original chatroom - 160+ lines of custom pub/sub:**
- `Rooms` struct with named rooms
- `Room` with member tracking, mutex, dirty flag
- Ticker-based publish loop
- Manual join/leave channels
**This example - ~60 lines of NATS integration:**
- `embeddednats.New()` starts the server
- `nc.Subscribe(subject, handler)` for receiving
- `nc.Publish(subject, data)` for sending
- NATS handles delivery, no polling
## Next Steps
If this pattern proves useful, it could be promoted to a Via plugin:
```go
// Hypothetical future API
v.Config(via.WithEmbeddedNATS("./data/nats"))
// In page init
c.Subscribe("events.user.*", func(data []byte) {
c.Sync()
})
c.Publish("events.user.login", userData)
```

View File

@@ -0,0 +1,284 @@
package main
import (
"context"
"log"
"math/rand"
"sync"
"time"
"github.com/ryanhamamura/via"
"github.com/ryanhamamura/via/h"
"github.com/ryanhamamura/via/vianats"
)
var (
WithSignal = via.WithSignal
)
// ChatMessage represents a message in a chat room
type ChatMessage struct {
User UserInfo `json:"user"`
Message string `json:"message"`
Time int64 `json:"time"`
}
// UserInfo identifies a chat participant
type UserInfo struct {
Name string `json:"name"`
Emoji string `json:"emoji"`
}
func (u *UserInfo) Avatar() h.H {
return h.Div(h.Class("avatar"), h.Attr("title", u.Name), h.Text(u.Emoji))
}
var roomNames = []string{"Go", "Rust", "Python", "JavaScript", "Clojure"}
func main() {
ctx := context.Background()
ps, err := vianats.New(ctx, "./data/nats")
if err != nil {
log.Fatalf("Failed to start embedded NATS: %v", err)
}
defer ps.Close()
err = vianats.EnsureStream(ps, vianats.StreamConfig{
Name: "CHAT",
Subjects: []string{"chat.>"},
MaxMsgs: 1000,
MaxAge: 24 * time.Hour,
})
if err != nil {
log.Fatalf("Failed to ensure stream: %v", err)
}
v := via.New()
v.Config(via.Options{
DevMode: true,
DocumentTitle: "NATS Chat",
LogLevel: via.LogLevelInfo,
ServerAddress: ":7331",
PubSub: ps,
})
v.AppendToHead(
h.Link(h.Rel("stylesheet"), h.Href("https://cdn.jsdelivr.net/npm/@picocss/pico@2/css/pico.min.css")),
h.StyleEl(h.Raw(`
body { margin: 0; }
main {
display: flex;
flex-direction: column;
height: 100vh;
}
nav[role="tab-control"] ul li a[aria-current="page"] {
background-color: var(--pico-primary-background);
color: var(--pico-primary-inverse);
border-bottom: 2px solid var(--pico-primary);
}
.chat-message { display: flex; gap: 0.75rem; margin-bottom: 0.5rem; }
.avatar {
width: 2rem;
height: 2rem;
border-radius: 50%;
background: var(--pico-muted-border-color);
display: grid;
place-items: center;
font-size: 1.5rem;
flex-shrink: 0;
}
.bubble { flex: 1; }
.bubble p { margin: 0; }
.chat-history {
flex: 1;
overflow-y: auto;
padding: 1rem;
padding-bottom: calc(88px + env(safe-area-inset-bottom));
}
.chat-input {
position: fixed;
left: 0;
right: 0;
bottom: 0;
background: var(--pico-background-color);
display: flex;
align-items: center;
gap: 0.75rem;
padding: 0.75rem 1rem calc(0.75rem + env(safe-area-inset-bottom));
border-top: 1px solid var(--pico-muted-border-color);
}
.chat-input fieldset {
flex: 1;
margin: 0;
}
.nats-badge {
background: #27AAE1;
color: white;
padding: 0.25rem 0.5rem;
border-radius: 4px;
font-size: 0.75rem;
margin-left: auto;
}
`)),
h.Script(h.Raw(`
function scrollChatToBottom() {
const chatHistory = document.querySelector('.chat-history');
if (chatHistory) chatHistory.scrollTop = chatHistory.scrollHeight;
}
`)),
)
v.Page("/", func(c *via.Context) {
currentUser := randUser()
roomSignal := c.Signal("Go")
statement := c.Signal("")
var messages []ChatMessage
var messagesMu sync.Mutex
currentRoom := "Go"
var currentSub via.Subscription
subscribeToRoom := func(room string) {
if currentSub != nil {
currentSub.Unsubscribe()
}
subject := "chat.room." + room
// Replay history from JetStream
if hist, err := vianats.ReplayHistory[ChatMessage](ps, subject, 50); err == nil {
messages = hist
}
sub, _ := via.Subscribe(c, subject, func(msg ChatMessage) {
messagesMu.Lock()
messages = append(messages, msg)
if len(messages) > 50 {
messages = messages[len(messages)-50:]
}
messagesMu.Unlock()
c.Sync()
})
currentSub = sub
currentRoom = room
}
subscribeToRoom("Go")
switchRoom := c.Action(func() {
newRoom := roomSignal.String()
if newRoom != currentRoom {
messagesMu.Lock()
messages = nil
messagesMu.Unlock()
subscribeToRoom(newRoom)
c.Sync()
}
})
say := c.Action(func() {
msg := statement.String()
if msg == "" {
msg = randomDevQuote()
}
statement.SetValue("")
via.Publish(c, "chat.room."+currentRoom, ChatMessage{
User: currentUser,
Message: msg,
Time: time.Now().UnixMilli(),
})
})
c.View(func() h.H {
var tabs []h.H
for _, name := range roomNames {
isCurrent := name == currentRoom
tabs = append(tabs, h.Li(
h.A(
h.If(isCurrent, h.Attr("aria-current", "page")),
h.Text(name),
switchRoom.OnClick(WithSignal(roomSignal, name)),
),
))
}
messagesMu.Lock()
chatHistoryChildren := []h.H{
h.Class("chat-history"),
h.Script(h.Raw(`new MutationObserver(()=>scrollChatToBottom()).observe(document.querySelector('.chat-history'), {childList:true})`)),
}
for _, msg := range messages {
chatHistoryChildren = append(chatHistoryChildren,
h.Div(h.Class("chat-message"),
h.Div(h.Class("avatar"), h.Attr("title", msg.User.Name), h.Text(msg.User.Emoji)),
h.Div(h.Class("bubble"),
h.P(h.Text(msg.Message)),
),
),
)
}
messagesMu.Unlock()
return h.Main(h.Class("container"),
h.Nav(
h.Attr("role", "tab-control"),
h.Ul(tabs...),
h.Span(h.Class("nats-badge"), h.Text("NATS")),
),
h.Div(chatHistoryChildren...),
h.Div(
h.Class("chat-input"),
currentUser.Avatar(),
h.FieldSet(
h.Attr("role", "group"),
h.Input(
h.Type("text"),
h.Placeholder(currentUser.Name+" says..."),
statement.Bind(),
h.Attr("autofocus"),
say.OnKeyDown("Enter"),
),
h.Button(h.Text("Send"), say.OnClick()),
),
),
)
})
})
log.Println("Starting NATS chatroom on :7331 (embedded NATS server)")
v.Start()
}
func randUser() UserInfo {
adjectives := []string{"Happy", "Clever", "Brave", "Swift", "Gentle", "Wise", "Bold", "Calm", "Eager", "Fierce"}
animals := []string{"Panda", "Tiger", "Eagle", "Dolphin", "Fox", "Wolf", "Bear", "Hawk", "Otter", "Lion"}
emojis := []string{"🐼", "🐯", "🦅", "🐬", "🦊", "🐺", "🐻", "🦅", "🦦", "🦁"}
idx := rand.Intn(len(animals))
return UserInfo{
Name: adjectives[rand.Intn(len(adjectives))] + " " + animals[idx],
Emoji: emojis[idx],
}
}
var quoteIdx = rand.Intn(len(devQuotes))
var devQuotes = []string{
"Just use NATS.",
"Pub/sub all the things!",
"Messages are the new API.",
"JetStream for durability.",
"No more polling.",
"Event-driven architecture FTW.",
"Decouple everything.",
"NATS is fast.",
"Subjects are like topics.",
"Request-reply is cool.",
}
func randomDevQuote() string {
quoteIdx = (quoteIdx + 1) % len(devQuotes)
return devQuotes[quoteIdx]
}

View File

@@ -0,0 +1,284 @@
package main
import (
"context"
"crypto/rand"
"fmt"
"html"
"log"
"sync"
"time"
"github.com/ryanhamamura/via"
"github.com/ryanhamamura/via/h"
"github.com/ryanhamamura/via/vianats"
)
var WithSignal = via.WithSignal
type Bookmark struct {
ID string
Title string
URL string
}
type CRUDEvent struct {
Action string `json:"action"`
Title string `json:"title"`
UserID string `json:"user_id"`
}
var (
bookmarks []Bookmark
bookmarksMu sync.RWMutex
)
func randomHex(n int) string {
b := make([]byte, n)
rand.Read(b)
return fmt.Sprintf("%x", b)
}
func findBookmark(id string) (Bookmark, int) {
for i, bm := range bookmarks {
if bm.ID == id {
return bm, i
}
}
return Bookmark{}, -1
}
func main() {
ctx := context.Background()
ps, err := vianats.New(ctx, "./data/nats")
if err != nil {
log.Fatalf("Failed to start embedded NATS: %v", err)
}
defer ps.Close()
err = vianats.EnsureStream(ps, vianats.StreamConfig{
Name: "BOOKMARKS",
Subjects: []string{"bookmarks.>"},
MaxMsgs: 1000,
MaxAge: 24 * time.Hour,
})
if err != nil {
log.Fatalf("Failed to ensure stream: %v", err)
}
v := via.New()
v.Config(via.Options{
DevMode: true,
DocumentTitle: "Bookmarks",
LogLevel: via.LogLevelInfo,
ServerAddress: ":7331",
PubSub: ps,
})
v.AppendToHead(
h.Link(h.Rel("stylesheet"), h.Href("https://cdn.jsdelivr.net/npm/daisyui@4/dist/full.min.css")),
h.Script(h.Src("https://cdn.tailwindcss.com")),
)
v.Page("/", func(c *via.Context) {
userID := randomHex(8)
titleSignal := c.Signal("")
urlSignal := c.Signal("")
targetIDSignal := c.Signal("")
via.Subscribe(c, "bookmarks.events", func(evt CRUDEvent) {
if evt.UserID == userID {
return
}
safeTitle := html.EscapeString(evt.Title)
var alertClass string
switch evt.Action {
case "created":
alertClass = "alert-success"
case "updated":
alertClass = "alert-info"
case "deleted":
alertClass = "alert-error"
}
c.ExecScript(fmt.Sprintf(`(function(){
var tc = document.getElementById('toast-container');
if (!tc) return;
var d = document.createElement('div');
d.className = 'alert %s';
d.innerHTML = '<span>Bookmark "%s" %s</span>';
tc.appendChild(d);
setTimeout(function(){ d.remove(); }, 3000);
})()`, alertClass, safeTitle, evt.Action))
c.Sync()
})
save := c.Action(func() {
title := titleSignal.String()
url := urlSignal.String()
if title == "" || url == "" {
return
}
targetID := targetIDSignal.String()
action := "created"
bookmarksMu.Lock()
if targetID != "" {
if _, idx := findBookmark(targetID); idx >= 0 {
bookmarks[idx].Title = title
bookmarks[idx].URL = url
action = "updated"
}
} else {
bookmarks = append(bookmarks, Bookmark{
ID: randomHex(8),
Title: title,
URL: url,
})
}
bookmarksMu.Unlock()
titleSignal.SetValue("")
urlSignal.SetValue("")
targetIDSignal.SetValue("")
via.Publish(c, "bookmarks.events", CRUDEvent{
Action: action,
Title: title,
UserID: userID,
})
c.Sync()
})
edit := c.Action(func() {
id := targetIDSignal.String()
bookmarksMu.RLock()
bm, idx := findBookmark(id)
bookmarksMu.RUnlock()
if idx < 0 {
return
}
titleSignal.SetValue(bm.Title)
urlSignal.SetValue(bm.URL)
})
del := c.Action(func() {
id := targetIDSignal.String()
bookmarksMu.Lock()
bm, idx := findBookmark(id)
if idx >= 0 {
bookmarks = append(bookmarks[:idx], bookmarks[idx+1:]...)
}
bookmarksMu.Unlock()
if idx < 0 {
return
}
targetIDSignal.SetValue("")
via.Publish(c, "bookmarks.events", CRUDEvent{
Action: "deleted",
Title: bm.Title,
UserID: userID,
})
c.Sync()
})
cancelEdit := c.Action(func() {
titleSignal.SetValue("")
urlSignal.SetValue("")
targetIDSignal.SetValue("")
})
c.View(func() h.H {
isEditing := targetIDSignal.String() != ""
// Build table rows
bookmarksMu.RLock()
var rows []h.H
for _, bm := range bookmarks {
rows = append(rows, h.Tr(
h.Td(h.Text(bm.Title)),
h.Td(h.A(h.Href(bm.URL), h.Attr("target", "_blank"), h.Class("link link-primary"), h.Text(bm.URL))),
h.Td(
h.Div(h.Class("flex gap-1"),
h.Button(h.Class("btn btn-xs btn-ghost"), h.Text("Edit"),
edit.OnClick(WithSignal(targetIDSignal, bm.ID)),
),
h.Button(h.Class("btn btn-xs btn-ghost text-error"), h.Text("Delete"),
del.OnClick(WithSignal(targetIDSignal, bm.ID)),
),
),
),
))
}
bookmarksMu.RUnlock()
saveLabel := "Add Bookmark"
if isEditing {
saveLabel = "Update Bookmark"
}
return h.Div(h.Class("min-h-screen bg-base-200"),
// Navbar
h.Div(h.Class("navbar bg-base-100 shadow-sm"),
h.Div(h.Class("flex-1"),
h.A(h.Class("btn btn-ghost text-xl"), h.Text("Bookmarks")),
),
h.Div(h.Class("flex-none"),
h.Div(h.Class("badge badge-outline"), h.Text(userID[:8])),
),
),
h.Div(h.Class("container mx-auto p-4 max-w-3xl flex flex-col gap-4"),
// Form card
h.Div(h.Class("card bg-base-100 shadow"),
h.Div(h.Class("card-body"),
h.H2(h.Class("card-title"), h.Text(saveLabel)),
h.Div(h.Class("flex flex-col gap-2"),
h.Input(h.Class("input input-bordered w-full"), h.Type("text"), h.Placeholder("Title"), titleSignal.Bind()),
h.Input(h.Class("input input-bordered w-full"), h.Type("text"), h.Placeholder("https://example.com"), urlSignal.Bind()),
h.Div(h.Class("card-actions justify-end"),
h.If(isEditing,
h.Button(h.Class("btn btn-ghost"), h.Text("Cancel"), cancelEdit.OnClick()),
),
h.Button(h.Class("btn btn-primary"), h.Text(saveLabel), save.OnClick()),
),
),
),
),
// Table card
h.Div(h.Class("card bg-base-100 shadow"),
h.Div(h.Class("card-body"),
h.H2(h.Class("card-title"), h.Text("All Bookmarks")),
h.If(len(rows) == 0,
h.P(h.Class("text-base-content/60"), h.Text("No bookmarks yet. Add one above!")),
),
h.If(len(rows) > 0,
h.Div(h.Class("overflow-x-auto"),
h.Table(h.Class("table"),
h.THead(h.Tr(
h.Th(h.Text("Title")),
h.Th(h.Text("URL")),
h.Th(h.Text("Actions")),
)),
h.TBody(rows...),
),
),
),
),
),
),
// Toast container — ignored by morph so Sync() doesn't wipe active toasts
h.Div(h.ID("toast-container"), h.Class("toast toast-end toast-top"), h.DataIgnoreMorph()),
)
})
})
log.Println("Starting pubsub-crud example on :7331")
v.Start()
}

View File

@@ -14,7 +14,7 @@ func main() {
v := via.New() v := via.New()
v.Config(via.Options{ v.Config(via.Options{
LogLvl: via.LogLevelDebug, LogLevel: via.LogLevelDebug,
DevMode: true, DevMode: true,
Plugins: []via.Plugin{ Plugins: []via.Plugin{
// picocss.Default, // picocss.Default,

View File

@@ -1,13 +1,33 @@
package main package main
import ( import (
"database/sql"
"log"
_ "github.com/mattn/go-sqlite3"
"github.com/ryanhamamura/via" "github.com/ryanhamamura/via"
"github.com/ryanhamamura/via/h" "github.com/ryanhamamura/via/h"
) )
func main() { func main() {
// Open SQLite database for persistent sessions
db, err := sql.Open("sqlite3", "sessions.db")
if err != nil {
log.Fatalf("failed to open database: %v", err)
}
defer db.Close()
// Create session manager with SQLite store
sm, err := via.NewSQLiteSessionManager(db)
if err != nil {
log.Fatalf("failed to create session manager: %v", err)
}
v := via.New() v := via.New()
v.Config(via.Options{ServerAddress: ":7331"}) v.Config(via.Options{
ServerAddress: ":7331",
SessionManager: sm,
})
// Login page // Login page
v.Page("/login", func(c *via.Context) { v.Page("/login", func(c *via.Context) {

View File

@@ -54,7 +54,7 @@ func main() {
v.Config(via.Options{ v.Config(via.Options{
DevMode: true, DevMode: true,
DocumentTitle: "Search", DocumentTitle: "Search",
LogLvl: via.LogLevelWarn, LogLevel: via.LogLevelWarn,
}) })
v.AppendToHead( v.AppendToHead(

195
nats_test.go Normal file
View File

@@ -0,0 +1,195 @@
package via
import (
"sync"
"sync/atomic"
"testing"
"time"
"github.com/ryanhamamura/via/h"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
type mockHandler struct {
id int64
fn func([]byte)
active atomic.Bool
}
// mockPubSub implements PubSub for testing without NATS.
type mockPubSub struct {
mu sync.Mutex
subs map[string][]*mockHandler
nextID atomic.Int64
}
func newMockPubSub() *mockPubSub {
return &mockPubSub{subs: make(map[string][]*mockHandler)}
}
func (m *mockPubSub) Publish(subject string, data []byte) error {
m.mu.Lock()
handlers := make([]*mockHandler, len(m.subs[subject]))
copy(handlers, m.subs[subject])
m.mu.Unlock()
for _, h := range handlers {
if h.active.Load() {
h.fn(data)
}
}
return nil
}
func (m *mockPubSub) Subscribe(subject string, handler func(data []byte)) (Subscription, error) {
m.mu.Lock()
defer m.mu.Unlock()
mh := &mockHandler{
id: m.nextID.Add(1),
fn: handler,
}
mh.active.Store(true)
m.subs[subject] = append(m.subs[subject], mh)
return &mockSub{handler: mh}, nil
}
func (m *mockPubSub) Close() error { return nil }
type mockSub struct {
handler *mockHandler
}
func (s *mockSub) Unsubscribe() error {
s.handler.active.Store(false)
return nil
}
func TestPubSub_RoundTrip(t *testing.T) {
ps := newMockPubSub()
v := New()
v.Config(Options{PubSub: ps})
var received []byte
var wg sync.WaitGroup
wg.Add(1)
c := newContext("test-ctx", "/", v)
c.View(func() h.H { return h.Div() })
_, err := c.Subscribe("test.topic", func(data []byte) {
received = data
wg.Done()
})
require.NoError(t, err)
err = c.Publish("test.topic", []byte("hello"))
require.NoError(t, err)
wg.Wait()
assert.Equal(t, []byte("hello"), received)
}
func TestPubSub_MultipleSubscribers(t *testing.T) {
ps := newMockPubSub()
v := New()
v.Config(Options{PubSub: ps})
var mu sync.Mutex
var results []string
var wg sync.WaitGroup
wg.Add(2)
c1 := newContext("ctx-1", "/", v)
c1.View(func() h.H { return h.Div() })
c2 := newContext("ctx-2", "/", v)
c2.View(func() h.H { return h.Div() })
c1.Subscribe("broadcast", func(data []byte) {
mu.Lock()
results = append(results, "c1:"+string(data))
mu.Unlock()
wg.Done()
})
c2.Subscribe("broadcast", func(data []byte) {
mu.Lock()
results = append(results, "c2:"+string(data))
mu.Unlock()
wg.Done()
})
c1.Publish("broadcast", []byte("msg"))
wg.Wait()
assert.Len(t, results, 2)
assert.Contains(t, results, "c1:msg")
assert.Contains(t, results, "c2:msg")
}
func TestPubSub_SubscriptionCleanupOnDispose(t *testing.T) {
ps := newMockPubSub()
v := New()
v.Config(Options{PubSub: ps})
c := newContext("cleanup-ctx", "/", v)
c.View(func() h.H { return h.Div() })
c.Subscribe("room.1", func(data []byte) {})
c.Subscribe("room.2", func(data []byte) {})
assert.Len(t, c.subscriptions, 2)
c.unsubscribeAll()
assert.Empty(t, c.subscriptions)
}
func TestPubSub_ManualUnsubscribe(t *testing.T) {
ps := newMockPubSub()
v := New()
v.Config(Options{PubSub: ps})
c := newContext("unsub-ctx", "/", v)
c.View(func() h.H { return h.Div() })
called := false
sub, err := c.Subscribe("topic", func(data []byte) {
called = true
})
require.NoError(t, err)
sub.Unsubscribe()
c.Publish("topic", []byte("ignored"))
time.Sleep(10 * time.Millisecond)
assert.False(t, called)
}
func TestPubSub_NoOpWhenNotConfigured(t *testing.T) {
v := New()
c := newContext("noop-ctx", "/", v)
c.View(func() h.H { return h.Div() })
err := c.Publish("topic", []byte("data"))
assert.Error(t, err)
sub, err := c.Subscribe("topic", func(data []byte) {})
assert.Error(t, err)
assert.Nil(t, sub)
}
func TestPubSub_NoOpDuringPanicCheck(t *testing.T) {
ps := newMockPubSub()
v := New()
v.Config(Options{PubSub: ps})
// Panic-check context has id=""
c := newContext("", "/", v)
err := c.Publish("topic", []byte("data"))
assert.NoError(t, err)
sub, err := c.Subscribe("topic", func(data []byte) {})
assert.NoError(t, err)
assert.Nil(t, sub)
}

14
pubsub.go Normal file
View File

@@ -0,0 +1,14 @@
package via
// PubSub is an interface for publish/subscribe messaging backends.
// The vianats sub-package provides an embedded NATS implementation.
type PubSub interface {
Publish(subject string, data []byte) error
Subscribe(subject string, handler func(data []byte)) (Subscription, error)
Close() error
}
// Subscription represents an active subscription that can be manually unsubscribed.
type Subscription interface {
Unsubscribe() error
}

23
pubsub_helpers.go Normal file
View File

@@ -0,0 +1,23 @@
package via
import "encoding/json"
// Publish JSON-marshals msg and publishes to subject.
func Publish[T any](c *Context, subject string, msg T) error {
data, err := json.Marshal(msg)
if err != nil {
return err
}
return c.Publish(subject, data)
}
// Subscribe JSON-unmarshals each message as T and calls handler.
func Subscribe[T any](c *Context, subject string, handler func(T)) (Subscription, error) {
return c.Subscribe(subject, func(data []byte) {
var msg T
if err := json.Unmarshal(data, &msg); err != nil {
return
}
handler(msg)
})
}

66
pubsub_helpers_test.go Normal file
View File

@@ -0,0 +1,66 @@
package via
import (
"sync"
"testing"
"github.com/ryanhamamura/via/h"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestPublishSubscribe_RoundTrip(t *testing.T) {
ps := newMockPubSub()
v := New()
v.Config(Options{PubSub: ps})
type event struct {
Name string `json:"name"`
Count int `json:"count"`
}
var got event
var wg sync.WaitGroup
wg.Add(1)
c := newContext("typed-ctx", "/", v)
c.View(func() h.H { return h.Div() })
_, err := Subscribe(c, "events", func(e event) {
got = e
wg.Done()
})
require.NoError(t, err)
err = Publish(c, "events", event{Name: "click", Count: 42})
require.NoError(t, err)
wg.Wait()
assert.Equal(t, "click", got.Name)
assert.Equal(t, 42, got.Count)
}
func TestSubscribe_SkipsBadJSON(t *testing.T) {
ps := newMockPubSub()
v := New()
v.Config(Options{PubSub: ps})
type msg struct {
Text string `json:"text"`
}
called := false
c := newContext("bad-json-ctx", "/", v)
c.View(func() h.H { return h.Div() })
_, err := Subscribe(c, "topic", func(m msg) {
called = true
})
require.NoError(t, err)
// Publish raw invalid JSON — handler should silently skip
err = c.Publish("topic", []byte("not json"))
require.NoError(t, err)
assert.False(t, called)
}

View File

@@ -2,11 +2,34 @@ package via
import ( import (
"context" "context"
"database/sql"
"time" "time"
"github.com/alexedwards/scs/sqlite3store"
"github.com/alexedwards/scs/v2" "github.com/alexedwards/scs/v2"
) )
// NewSQLiteSessionManager creates a session manager using SQLite for persistence.
// Creates the sessions table if it doesn't exist.
// The returned manager can be configured further (Lifetime, Cookie settings, etc.)
// before passing to Options.SessionManager.
func NewSQLiteSessionManager(db *sql.DB) (*scs.SessionManager, error) {
_, err := db.Exec(`
CREATE TABLE IF NOT EXISTS sessions (
token TEXT PRIMARY KEY,
data BLOB NOT NULL,
expiry REAL NOT NULL
);
CREATE INDEX IF NOT EXISTS sessions_expiry_idx ON sessions(expiry);
`)
if err != nil {
return nil, err
}
sm := scs.New()
sm.Store = sqlite3store.New(db)
return sm, nil
}
// Session provides access to the user's session data. // Session provides access to the user's session data.
// Session data persists across page views for the same browser. // Session data persists across page views for the same browser.
type Session struct { type Session struct {

238
via.go
View File

@@ -7,21 +7,25 @@
package via package via
import ( import (
"context"
"crypto/rand" "crypto/rand"
_ "embed" _ "embed"
"encoding/hex" "encoding/hex"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io" "io"
"log"
"net/http" "net/http"
"net/url" "net/url"
"os" "os"
ossignal "os/signal"
"path/filepath" "path/filepath"
"strings" "strings"
"sync" "sync"
"syscall"
"time"
"github.com/alexedwards/scs/v2" "github.com/alexedwards/scs/v2"
"github.com/rs/zerolog"
"github.com/ryanhamamura/via/h" "github.com/ryanhamamura/via/h"
"github.com/starfederation/datastar-go/datastar" "github.com/starfederation/datastar-go/datastar"
) )
@@ -34,63 +38,67 @@ var datastarJS []byte
type V struct { type V struct {
cfg Options cfg Options
mux *http.ServeMux mux *http.ServeMux
server *http.Server
logger zerolog.Logger
contextRegistry map[string]*Context contextRegistry map[string]*Context
contextRegistryMutex sync.RWMutex contextRegistryMutex sync.RWMutex
documentHeadIncludes []h.H documentHeadIncludes []h.H
documentFootIncludes []h.H documentFootIncludes []h.H
devModePageInitFnMap map[string]func(*Context) devModePageInitFnMap map[string]func(*Context)
sessionManager *scs.SessionManager sessionManager *scs.SessionManager
pubsub PubSub
datastarPath string datastarPath string
datastarContent []byte datastarContent []byte
datastarOnce sync.Once datastarOnce sync.Once
reaperStop chan struct{}
}
func (v *V) logEvent(evt *zerolog.Event, c *Context) *zerolog.Event {
if c != nil && c.id != "" {
evt = evt.Str("via-ctx", c.id)
}
return evt
} }
func (v *V) logFatal(format string, a ...any) { func (v *V) logFatal(format string, a ...any) {
log.Printf("[fatal] msg=%q", fmt.Sprintf(format, a...)) v.logEvent(v.logger.WithLevel(zerolog.FatalLevel), nil).Msgf(format, a...)
} }
func (v *V) logErr(c *Context, format string, a ...any) { func (v *V) logErr(c *Context, format string, a ...any) {
cRef := "" v.logEvent(v.logger.Error(), c).Msgf(format, a...)
if c != nil && c.id != "" {
cRef = fmt.Sprintf("via-ctx=%q ", c.id)
}
log.Printf("[error] %smsg=%q", cRef, fmt.Sprintf(format, a...))
} }
func (v *V) logWarn(c *Context, format string, a ...any) { func (v *V) logWarn(c *Context, format string, a ...any) {
cRef := "" v.logEvent(v.logger.Warn(), c).Msgf(format, a...)
if c != nil && c.id != "" {
cRef = fmt.Sprintf("via-ctx=%q ", c.id)
}
if v.cfg.LogLvl >= LogLevelWarn {
log.Printf("[warn] %smsg=%q", cRef, fmt.Sprintf(format, a...))
}
} }
func (v *V) logInfo(c *Context, format string, a ...any) { func (v *V) logInfo(c *Context, format string, a ...any) {
cRef := "" v.logEvent(v.logger.Info(), c).Msgf(format, a...)
if c != nil && c.id != "" {
cRef = fmt.Sprintf("via-ctx=%q ", c.id)
}
if v.cfg.LogLvl >= LogLevelInfo {
log.Printf("[info] %smsg=%q", cRef, fmt.Sprintf(format, a...))
}
} }
func (v *V) logDebug(c *Context, format string, a ...any) { func (v *V) logDebug(c *Context, format string, a ...any) {
cRef := "" v.logEvent(v.logger.Debug(), c).Msgf(format, a...)
if c != nil && c.id != "" { }
cRef = fmt.Sprintf("via-ctx=%q ", c.id)
} func newConsoleLogger(level zerolog.Level) zerolog.Logger {
if v.cfg.LogLvl == LogLevelDebug { return zerolog.New(zerolog.ConsoleWriter{Out: os.Stderr, TimeFormat: "15:04:05"}).
log.Printf("[debug] %smsg=%q", cRef, fmt.Sprintf(format, a...)) With().Timestamp().Logger().Level(level)
}
} }
// Config overrides the default configuration with the given options. // Config overrides the default configuration with the given options.
func (v *V) Config(cfg Options) { func (v *V) Config(cfg Options) {
if cfg.LogLvl != undefined { if cfg.Logger != nil {
v.cfg.LogLvl = cfg.LogLvl v.logger = *cfg.Logger
} else if cfg.LogLevel != nil || cfg.DevMode != v.cfg.DevMode {
level := zerolog.InfoLevel
if cfg.LogLevel != nil {
level = *cfg.LogLevel
}
if cfg.DevMode {
v.logger = newConsoleLogger(level)
} else {
v.logger = zerolog.New(os.Stderr).With().Timestamp().Logger().Level(level)
}
} }
if cfg.DocumentTitle != "" { if cfg.DocumentTitle != "" {
v.cfg.DocumentTitle = cfg.DocumentTitle v.cfg.DocumentTitle = cfg.DocumentTitle
@@ -117,6 +125,12 @@ func (v *V) Config(cfg Options) {
if cfg.DatastarPath != "" { if cfg.DatastarPath != "" {
v.datastarPath = cfg.DatastarPath v.datastarPath = cfg.DatastarPath
} }
if cfg.PubSub != nil {
v.pubsub = cfg.PubSub
}
if cfg.ContextTTL != 0 {
v.cfg.ContextTTL = cfg.ContextTTL
}
} }
// AppendToHead appends the given h.H nodes to the head of the base HTML document. // AppendToHead appends the given h.H nodes to the head of the base HTML document.
@@ -228,6 +242,14 @@ func (v *V) currSessionNum() int {
return len(v.contextRegistry) return len(v.contextRegistry)
} }
func (v *V) cleanupCtx(c *Context) {
c.dispose()
if v.cfg.DevMode {
v.devModeRemovePersisted(c)
}
v.unregisterCtx(c)
}
func (v *V) unregisterCtx(c *Context) { func (v *V) unregisterCtx(c *Context) {
if c.id == "" { if c.id == "" {
v.logErr(c, "unregister ctx failed: ctx contains empty id") v.logErr(c, "unregister ctx failed: ctx contains empty id")
@@ -249,14 +271,131 @@ func (v *V) getCtx(id string) (*Context, error) {
return nil, fmt.Errorf("ctx '%s' not found", id) return nil, fmt.Errorf("ctx '%s' not found", id)
} }
// Start starts the Via HTTP server on the given address. func (v *V) startReaper() {
ttl := v.cfg.ContextTTL
if ttl < 0 {
return
}
if ttl == 0 {
ttl = 30 * time.Second
}
interval := ttl / 3
if interval < 5*time.Second {
interval = 5 * time.Second
}
v.reaperStop = make(chan struct{})
go func() {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-v.reaperStop:
return
case <-ticker.C:
v.reapOrphanedContexts(ttl)
}
}
}()
}
func (v *V) reapOrphanedContexts(ttl time.Duration) {
now := time.Now()
v.contextRegistryMutex.RLock()
var orphans []*Context
for _, c := range v.contextRegistry {
if !c.sseConnected.Load() && now.Sub(c.createdAt) > ttl {
orphans = append(orphans, c)
}
}
v.contextRegistryMutex.RUnlock()
for _, c := range orphans {
v.logInfo(c, "reaping orphaned context (no SSE connection after %s)", ttl)
v.cleanupCtx(c)
}
}
// Start starts the Via HTTP server and blocks until a SIGINT or SIGTERM
// signal is received, then performs a graceful shutdown.
func (v *V) Start() { func (v *V) Start() {
v.logInfo(nil, "via started at [%s]", v.cfg.ServerAddress)
handler := http.Handler(v.mux) handler := http.Handler(v.mux)
if v.sessionManager != nil { if v.sessionManager != nil {
handler = v.sessionManager.LoadAndSave(v.mux) handler = v.sessionManager.LoadAndSave(v.mux)
} }
log.Fatalf("[fatal] %v", http.ListenAndServe(v.cfg.ServerAddress, handler)) v.server = &http.Server{
Addr: v.cfg.ServerAddress,
Handler: handler,
}
v.startReaper()
errCh := make(chan error, 1)
go func() {
errCh <- v.server.ListenAndServe()
}()
v.logInfo(nil, "via started at [%s]", v.cfg.ServerAddress)
sigCh := make(chan os.Signal, 1)
ossignal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
select {
case sig := <-sigCh:
v.logInfo(nil, "received signal %v, shutting down", sig)
case err := <-errCh:
if err != nil && err != http.ErrServerClosed {
v.logger.Fatal().Err(err).Msg("http server failed")
}
return
}
v.shutdown()
}
// Shutdown gracefully shuts down the server and all contexts.
// Safe for programmatic or test use.
func (v *V) Shutdown() {
v.shutdown()
}
func (v *V) shutdown() {
if v.reaperStop != nil {
close(v.reaperStop)
}
v.logInfo(nil, "draining all contexts")
v.drainAllContexts()
if v.server != nil {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := v.server.Shutdown(ctx); err != nil {
v.logErr(nil, "http server shutdown error: %v", err)
}
}
if v.pubsub != nil {
if err := v.pubsub.Close(); err != nil {
v.logErr(nil, "pubsub close error: %v", err)
}
}
v.logInfo(nil, "shutdown complete")
}
func (v *V) drainAllContexts() {
v.contextRegistryMutex.Lock()
contexts := make([]*Context, 0, len(v.contextRegistry))
for _, c := range v.contextRegistry {
contexts = append(contexts, c)
}
v.contextRegistry = make(map[string]*Context)
v.contextRegistryMutex.Unlock()
for _, c := range contexts {
v.logDebug(c, "disposing context")
c.dispose()
}
v.logInfo(nil, "drained %d context(s)", len(contexts))
} }
// HTTPServeMux returns the underlying HTTP request multiplexer to enable user extentions, middleware and // HTTPServeMux returns the underlying HTTP request multiplexer to enable user extentions, middleware and
@@ -280,7 +419,7 @@ func (v *V) ensureDatastarHandler() {
func (v *V) devModePersist(c *Context) { func (v *V) devModePersist(c *Context) {
p := filepath.Join(".via", "devmode", "ctx.json") p := filepath.Join(".via", "devmode", "ctx.json")
if err := os.MkdirAll(filepath.Dir(p), 0755); err != nil { if err := os.MkdirAll(filepath.Dir(p), 0755); err != nil {
log.Fatalf("failed to create directory for devmode files: %v", err) v.logFatal("failed to create directory for devmode files: %v", err)
} }
// load persisted list from file, or empty list if file not found // load persisted list from file, or empty list if file not found
@@ -322,10 +461,7 @@ func (v *V) devModeRemovePersisted(c *Context) {
} }
file.Close() file.Close()
// remove ctx to persisted list
if _, ok := ctxRegMap[c.id]; !ok {
delete(ctxRegMap, c.id) delete(ctxRegMap, c.id)
}
// write persisted list to file // write persisted list to file
file, err = os.Create(p) file, err = os.Create(p)
@@ -394,6 +530,7 @@ func New() *V {
v := &V{ v := &V{
mux: mux, mux: mux,
logger: newConsoleLogger(zerolog.InfoLevel),
contextRegistry: make(map[string]*Context), contextRegistry: make(map[string]*Context),
devModePageInitFnMap: make(map[string]func(*Context)), devModePageInitFnMap: make(map[string]func(*Context)),
sessionManager: scs.New(), sessionManager: scs.New(),
@@ -402,16 +539,11 @@ func New() *V {
cfg: Options{ cfg: Options{
DevMode: false, DevMode: false,
ServerAddress: ":3000", ServerAddress: ":3000",
LogLvl: LogLevelInfo,
DocumentTitle: "⚡ Via", DocumentTitle: "⚡ Via",
}, },
} }
v.mux.HandleFunc("GET /_sse", func(w http.ResponseWriter, r *http.Request) { v.mux.HandleFunc("GET /_sse", func(w http.ResponseWriter, r *http.Request) {
isReconnect := false
if r.Header.Get("last-event-id") == "via" {
isReconnect = true
}
var sigs map[string]any var sigs map[string]any
_ = datastar.ReadSignals(r, &sigs) _ = datastar.ReadSignals(r, &sigs)
cID, _ := sigs["via-ctx"].(string) cID, _ := sigs["via-ctx"].(string)
@@ -433,25 +565,23 @@ func New() *V {
// use last-event-id to tell if request is a sse reconnect // use last-event-id to tell if request is a sse reconnect
sse.Send(datastar.EventTypePatchElements, []string{}, datastar.WithSSEEventId("via")) sse.Send(datastar.EventTypePatchElements, []string{}, datastar.WithSSEEventId("via"))
c.sseConnected.Store(true)
v.logDebug(c, "SSE connection established") v.logDebug(c, "SSE connection established")
go func() { go func() {
if isReconnect || v.cfg.DevMode {
c.Sync() c.Sync()
return
}
c.SyncSignals()
}() }()
for { for {
select { select {
case <-sse.Context().Done(): case <-sse.Context().Done():
v.logDebug(c, "SSE connection ended") v.logDebug(c, "SSE connection ended")
v.cleanupCtx(c)
return return
case patch, ok := <-c.patchChan: case <-c.ctxDisposedChan:
if !ok { v.logDebug(c, "context disposed, closing SSE")
continue return
} case patch := <-c.patchChan:
switch patch.typ { switch patch.typ {
case patchTypeElements: case patchTypeElements:
if err := sse.PatchElements(patch.content); err != nil { if err := sse.PatchElements(patch.content); err != nil {
@@ -522,7 +652,7 @@ func New() *V {
v.mux.HandleFunc("POST /_session/close", func(w http.ResponseWriter, r *http.Request) { v.mux.HandleFunc("POST /_session/close", func(w http.ResponseWriter, r *http.Request) {
body, err := io.ReadAll(r.Body) body, err := io.ReadAll(r.Body)
if err != nil { if err != nil {
log.Printf("Error reading body: %v", err) v.logErr(nil, "error reading body: %v", err)
w.WriteHeader(http.StatusBadRequest) w.WriteHeader(http.StatusBadRequest)
return return
} }
@@ -533,12 +663,8 @@ func New() *V {
v.logErr(c, "failed to handle session close: %v", err) v.logErr(c, "failed to handle session close: %v", err)
return return
} }
c.stopAllRoutines()
v.logDebug(c, "session close event triggered") v.logDebug(c, "session close event triggered")
if v.cfg.DevMode { v.cleanupCtx(c)
v.devModeRemovePersisted(c)
}
v.unregisterCtx(c)
}) })
return v return v
} }

View File

@@ -1,9 +1,13 @@
package via package via
import ( import (
"encoding/json"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"os"
"path/filepath"
"testing" "testing"
"time"
"github.com/ryanhamamura/via/h" "github.com/ryanhamamura/via/h"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
@@ -128,6 +132,155 @@ func TestAction(t *testing.T) {
assert.Contains(t, body, "/_action/") assert.Contains(t, body, "/_action/")
} }
func TestEventTypes(t *testing.T) {
tests := []struct {
name string
attr string
buildEl func(trigger *actionTrigger) h.H
}{
{"OnSubmit", "data-on:submit", func(tr *actionTrigger) h.H { return h.Form(tr.OnSubmit()) }},
{"OnInput", "data-on:input", func(tr *actionTrigger) h.H { return h.Input(tr.OnInput()) }},
{"OnFocus", "data-on:focus", func(tr *actionTrigger) h.H { return h.Input(tr.OnFocus()) }},
{"OnBlur", "data-on:blur", func(tr *actionTrigger) h.H { return h.Input(tr.OnBlur()) }},
{"OnMouseEnter", "data-on:mouseenter", func(tr *actionTrigger) h.H { return h.Div(tr.OnMouseEnter()) }},
{"OnMouseLeave", "data-on:mouseleave", func(tr *actionTrigger) h.H { return h.Div(tr.OnMouseLeave()) }},
{"OnScroll", "data-on:scroll", func(tr *actionTrigger) h.H { return h.Div(tr.OnScroll()) }},
{"OnDblClick", "data-on:dblclick", func(tr *actionTrigger) h.H { return h.Div(tr.OnDblClick()) }},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
var trigger *actionTrigger
v := New()
v.Page("/", func(c *Context) {
trigger = c.Action(func() {})
c.View(func() h.H { return tt.buildEl(trigger) })
})
req := httptest.NewRequest("GET", "/", nil)
w := httptest.NewRecorder()
v.mux.ServeHTTP(w, req)
body := w.Body.String()
assert.Contains(t, body, tt.attr)
assert.Contains(t, body, "/_action/"+trigger.id)
})
}
t.Run("WithSignal", func(t *testing.T) {
var trigger *actionTrigger
var sig *signal
v := New()
v.Page("/", func(c *Context) {
trigger = c.Action(func() {})
sig = c.Signal("val")
c.View(func() h.H {
return h.Div(trigger.OnDblClick(WithSignal(sig, "x")))
})
})
req := httptest.NewRequest("GET", "/", nil)
w := httptest.NewRecorder()
v.mux.ServeHTTP(w, req)
body := w.Body.String()
assert.Contains(t, body, "data-on:dblclick")
assert.Contains(t, body, "$"+sig.ID()+"=&#39;x&#39;")
})
}
func TestOnKeyDownWithWindow(t *testing.T) {
var trigger *actionTrigger
v := New()
v.Page("/", func(c *Context) {
trigger = c.Action(func() {})
c.View(func() h.H {
return h.Div(trigger.OnKeyDown("Enter", WithWindow()))
})
})
req := httptest.NewRequest("GET", "/", nil)
w := httptest.NewRecorder()
v.mux.ServeHTTP(w, req)
body := w.Body.String()
assert.Contains(t, body, "data-on:keydown__window")
assert.Contains(t, body, "evt.key===&#39;Enter&#39;")
}
func TestOnKeyDownMap(t *testing.T) {
t.Run("multiple bindings with different actions", func(t *testing.T) {
var move, shoot *actionTrigger
var dir *signal
v := New()
v.Page("/", func(c *Context) {
dir = c.Signal("none")
move = c.Action(func() {})
shoot = c.Action(func() {})
c.View(func() h.H {
return h.Div(
OnKeyDownMap(
KeyBind("w", move, WithSignal(dir, "up")),
KeyBind("ArrowUp", move, WithSignal(dir, "up"), WithPreventDefault()),
KeyBind(" ", shoot, WithPreventDefault()),
),
)
})
})
req := httptest.NewRequest("GET", "/", nil)
w := httptest.NewRecorder()
v.mux.ServeHTTP(w, req)
body := w.Body.String()
// Single attribute, window-scoped
assert.Contains(t, body, "data-on:keydown__window")
// Key dispatching
assert.Contains(t, body, "evt.key===&#39;w&#39;")
assert.Contains(t, body, "evt.key===&#39;ArrowUp&#39;")
assert.Contains(t, body, "evt.key===&#39; &#39;")
// Different actions referenced
assert.Contains(t, body, "/_action/"+move.id)
assert.Contains(t, body, "/_action/"+shoot.id)
// preventDefault only on ArrowUp and space branches
assert.Contains(t, body, "evt.key===&#39;ArrowUp&#39; ? (evt.preventDefault()")
assert.Contains(t, body, "evt.key===&#39; &#39; ? (evt.preventDefault()")
// 'w' branch should NOT have preventDefault
assert.NotContains(t, body, "evt.key===&#39;w&#39; ? (evt.preventDefault()")
})
t.Run("WithSignal per binding", func(t *testing.T) {
var move *actionTrigger
var dir *signal
v := New()
v.Page("/", func(c *Context) {
dir = c.Signal("none")
move = c.Action(func() {})
c.View(func() h.H {
return h.Div(
OnKeyDownMap(
KeyBind("w", move, WithSignal(dir, "up")),
KeyBind("s", move, WithSignal(dir, "down")),
),
)
})
})
req := httptest.NewRequest("GET", "/", nil)
w := httptest.NewRecorder()
v.mux.ServeHTTP(w, req)
body := w.Body.String()
assert.Contains(t, body, "$"+dir.ID()+"=&#39;up&#39;")
assert.Contains(t, body, "$"+dir.ID()+"=&#39;down&#39;")
})
t.Run("empty bindings returns nil", func(t *testing.T) {
result := OnKeyDownMap()
assert.Nil(t, result)
})
}
func TestConfig(t *testing.T) { func TestConfig(t *testing.T) {
v := New() v := New()
v.Config(Options{DocumentTitle: "Test"}) v.Config(Options{DocumentTitle: "Test"})
@@ -140,3 +293,93 @@ func TestPage_PanicsOnNoView(t *testing.T) {
v.Page("/", func(c *Context) {}) v.Page("/", func(c *Context) {})
}) })
} }
func TestReaperCleansOrphanedContexts(t *testing.T) {
v := New()
c := newContext("orphan-1", "/", v)
c.createdAt = time.Now().Add(-time.Minute) // created 1 min ago
v.registerCtx(c)
_, err := v.getCtx("orphan-1")
assert.NoError(t, err)
v.reapOrphanedContexts(10 * time.Second)
_, err = v.getCtx("orphan-1")
assert.Error(t, err, "orphaned context should have been reaped")
}
func TestReaperIgnoresConnectedContexts(t *testing.T) {
v := New()
c := newContext("connected-1", "/", v)
c.createdAt = time.Now().Add(-time.Minute)
c.sseConnected.Store(true)
v.registerCtx(c)
v.reapOrphanedContexts(10 * time.Second)
_, err := v.getCtx("connected-1")
assert.NoError(t, err, "connected context should survive reaping")
}
func TestReaperDisabledWithNegativeTTL(t *testing.T) {
v := New()
v.cfg.ContextTTL = -1
v.startReaper()
assert.Nil(t, v.reaperStop, "reaper should not start with negative TTL")
}
func TestCleanupCtxIdempotent(t *testing.T) {
v := New()
c := newContext("idempotent-1", "/", v)
v.registerCtx(c)
assert.NotPanics(t, func() {
v.cleanupCtx(c)
v.cleanupCtx(c)
})
_, err := v.getCtx("idempotent-1")
assert.Error(t, err, "context should be removed after cleanup")
}
func TestDevModeRemovePersistedFix(t *testing.T) {
v := New()
v.cfg.DevMode = true
dir := filepath.Join(t.TempDir(), ".via", "devmode")
p := filepath.Join(dir, "ctx.json")
assert.NoError(t, os.MkdirAll(dir, 0755))
// Write a persisted context
ctxRegMap := map[string]string{"test-ctx-1": "/"}
f, err := os.Create(p)
assert.NoError(t, err)
assert.NoError(t, json.NewEncoder(f).Encode(ctxRegMap))
f.Close()
// Patch devModeRemovePersisted to use our temp path by calling it
// directly — we need to override the path. Instead, test via the
// actual function by temporarily changing the working dir.
origDir, _ := os.Getwd()
assert.NoError(t, os.Chdir(t.TempDir()))
defer os.Chdir(origDir)
// Re-create the structure in the temp dir
assert.NoError(t, os.MkdirAll(filepath.Join(".via", "devmode"), 0755))
p2 := filepath.Join(".via", "devmode", "ctx.json")
f2, _ := os.Create(p2)
json.NewEncoder(f2).Encode(map[string]string{"test-ctx-1": "/"})
f2.Close()
c := newContext("test-ctx-1", "/", v)
v.devModeRemovePersisted(c)
// Read back and verify
f3, err := os.Open(p2)
assert.NoError(t, err)
defer f3.Close()
var result map[string]string
assert.NoError(t, json.NewDecoder(f3).Decode(&result))
assert.Empty(t, result, "persisted context should be removed")
}

127
vianats/vianats.go Normal file
View File

@@ -0,0 +1,127 @@
// Package vianats provides an embedded NATS server with JetStream as a
// pub/sub backend for Via applications.
package vianats
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/delaneyj/toolbelt/embeddednats"
"github.com/nats-io/nats.go"
"github.com/ryanhamamura/via"
)
// NATS implements via.PubSub using an embedded NATS server with JetStream.
type NATS struct {
server *embeddednats.Server
nc *nats.Conn
js nats.JetStreamContext
}
// New starts an embedded NATS server with JetStream enabled and returns a
// ready-to-use NATS instance. The server stores data in dataDir and shuts
// down when ctx is cancelled.
func New(ctx context.Context, dataDir string) (*NATS, error) {
ns, err := embeddednats.New(ctx, embeddednats.WithDirectory(dataDir))
if err != nil {
return nil, fmt.Errorf("vianats: start server: %w", err)
}
ns.WaitForServer()
nc, err := ns.Client()
if err != nil {
ns.Close()
return nil, fmt.Errorf("vianats: connect client: %w", err)
}
js, err := nc.JetStream()
if err != nil {
nc.Close()
ns.Close()
return nil, fmt.Errorf("vianats: init jetstream: %w", err)
}
return &NATS{server: ns, nc: nc, js: js}, nil
}
// Publish sends data to the given subject using core NATS publish.
// JetStream captures messages automatically if a matching stream exists.
func (n *NATS) Publish(subject string, data []byte) error {
return n.nc.Publish(subject, data)
}
// Subscribe creates a core NATS subscription for real-time fan-out delivery.
func (n *NATS) Subscribe(subject string, handler func(data []byte)) (via.Subscription, error) {
sub, err := n.nc.Subscribe(subject, func(msg *nats.Msg) {
handler(msg.Data)
})
if err != nil {
return nil, err
}
return sub, nil
}
// Close shuts down the client connection and embedded server.
func (n *NATS) Close() error {
n.nc.Close()
return n.server.Close()
}
// Conn returns the underlying NATS connection for advanced usage.
func (n *NATS) Conn() *nats.Conn {
return n.nc
}
// JetStream returns the JetStream context for stream configuration and replay.
func (n *NATS) JetStream() nats.JetStreamContext {
return n.js
}
// StreamConfig holds the parameters for creating or updating a JetStream stream.
type StreamConfig struct {
Name string
Subjects []string
MaxMsgs int64
MaxAge time.Duration
}
// EnsureStream creates or updates a JetStream stream matching cfg.
func EnsureStream(n *NATS, cfg StreamConfig) error {
_, err := n.js.AddStream(&nats.StreamConfig{
Name: cfg.Name,
Subjects: cfg.Subjects,
Retention: nats.LimitsPolicy,
MaxMsgs: cfg.MaxMsgs,
MaxAge: cfg.MaxAge,
})
return err
}
// ReplayHistory fetches the last limit messages from subject,
// deserializing each as T. Returns an empty slice if nothing is available.
func ReplayHistory[T any](n *NATS, subject string, limit int) ([]T, error) {
sub, err := n.js.SubscribeSync(subject, nats.DeliverAll(), nats.OrderedConsumer())
if err != nil {
return nil, err
}
defer sub.Unsubscribe()
var msgs []T
for {
raw, err := sub.NextMsg(200 * time.Millisecond)
if err != nil {
break
}
var msg T
if json.Unmarshal(raw.Data, &msg) == nil {
msgs = append(msgs, msg)
}
}
if limit > 0 && len(msgs) > limit {
msgs = msgs[len(msgs)-limit:]
}
return msgs, nil
}