feat: add embedded NATS pub/sub support on Context

Define PubSub and Subscription interfaces in the core via package with
a vianats sub-package providing the embedded NATS + JetStream
implementation. Expose c.Publish() and c.Subscribe() on Context with
automatic subscription cleanup on session close. Refactor the NATS
chatroom example to use the built-in methods instead of manual
subscription tracking.
This commit is contained in:
Ryan Hamamura
2026-01-26 08:06:50 -10:00
parent 88bd0f31df
commit 30cc6d88e6
7 changed files with 374 additions and 153 deletions

View File

@@ -45,4 +45,8 @@ type Options struct {
// DatastarPath is the URL path where the script is served. // DatastarPath is the URL path where the script is served.
// Defaults to "/_datastar.js" if empty. // Defaults to "/_datastar.js" if empty.
DatastarPath string DatastarPath string
// PubSub enables publish/subscribe messaging. Use vianats.New() for an
// embedded NATS backend, or supply any PubSub implementation.
PubSub PubSub
} }

View File

@@ -31,6 +31,8 @@ type Context struct {
mu sync.RWMutex mu sync.RWMutex
ctxDisposedChan chan struct{} ctxDisposedChan chan struct{}
reqCtx context.Context reqCtx context.Context
subscriptions []Subscription
subsMu sync.Mutex
} }
// View defines the UI rendered by this context. // View defines the UI rendered by this context.
@@ -403,6 +405,55 @@ func (c *Context) Session() *Session {
} }
} }
// Publish sends data to the given subject via the configured PubSub backend.
// Returns an error if no PubSub is configured. No-ops during panic-check init.
func (c *Context) Publish(subject string, data []byte) error {
if c.id == "" {
return nil
}
if c.app.pubsub == nil {
return fmt.Errorf("pubsub not configured")
}
return c.app.pubsub.Publish(subject, data)
}
// Subscribe creates a subscription on the configured PubSub backend.
// The subscription is tracked for automatic cleanup when the context is disposed.
// Returns an error if no PubSub is configured. No-ops during panic-check init.
func (c *Context) Subscribe(subject string, handler func(data []byte)) (Subscription, error) {
if c.id == "" {
return nil, nil
}
if c.app.pubsub == nil {
return nil, fmt.Errorf("pubsub not configured")
}
sub, err := c.app.pubsub.Subscribe(subject, handler)
if err != nil {
return nil, err
}
// Track on page context for cleanup (components use parent, like signals/actions)
target := c
if c.isComponent() {
target = c.parentPageCtx
}
target.subsMu.Lock()
target.subscriptions = append(target.subscriptions, sub)
target.subsMu.Unlock()
return sub, nil
}
// unsubscribeAll cleans up all tracked subscriptions for this context and its components.
func (c *Context) unsubscribeAll() {
c.subsMu.Lock()
subs := c.subscriptions
c.subscriptions = nil
c.subsMu.Unlock()
for _, sub := range subs {
sub.Unsubscribe()
}
}
func newContext(id string, route string, v *V) *Context { func newContext(id string, route string, v *V) *Context {
if v == nil { if v == nil {
log.Fatal("create context failed: app pointer is nil") log.Fatal("create context failed: app pointer is nil")

View File

@@ -8,10 +8,10 @@ import (
"sync" "sync"
"time" "time"
"github.com/delaneyj/toolbelt/embeddednats"
"github.com/nats-io/nats.go" "github.com/nats-io/nats.go"
"github.com/ryanhamamura/via" "github.com/ryanhamamura/via"
"github.com/ryanhamamura/via/h" "github.com/ryanhamamura/via/h"
"github.com/ryanhamamura/via/vianats"
) )
var ( var (
@@ -35,147 +35,26 @@ func (u *UserInfo) Avatar() h.H {
return h.Div(h.Class("avatar"), h.Attr("title", u.Name), h.Text(u.Emoji)) return h.Div(h.Class("avatar"), h.Attr("title", u.Name), h.Text(u.Emoji))
} }
// NATSChatroom manages NATS connections and per-context subscriptions
type NATSChatroom struct {
nc *nats.Conn
js nats.JetStreamContext
subs map[string]*nats.Subscription
mu sync.RWMutex
}
func NewNATSChatroom(nc *nats.Conn) (*NATSChatroom, error) {
js, err := nc.JetStream()
if err != nil {
return nil, err
}
// Create or update the CHAT stream for durability
_, err = js.AddStream(&nats.StreamConfig{
Name: "CHAT",
Subjects: []string{"chat.>"},
Retention: nats.LimitsPolicy,
MaxMsgs: 1000, // Keep last 1000 messages per room
MaxAge: 24 * time.Hour,
})
if err != nil && err != nats.ErrStreamNameAlreadyInUse {
// Stream might already exist, that's fine
log.Printf("Note: stream creation returned: %v", err)
}
return &NATSChatroom{
nc: nc,
js: js,
subs: make(map[string]*nats.Subscription),
}, nil
}
// Subscribe creates a subscription for a context to a room
func (chat *NATSChatroom) Subscribe(ctxID, room string, handler func(msg *ChatMessage)) error {
subject := "chat.room." + room
sub, err := chat.nc.Subscribe(subject, func(m *nats.Msg) {
var msg ChatMessage
if err := json.Unmarshal(m.Data, &msg); err != nil {
log.Printf("Failed to unmarshal message: %v", err)
return
}
handler(&msg)
})
if err != nil {
return err
}
chat.mu.Lock()
// Clean up old subscription if exists
if old, exists := chat.subs[ctxID]; exists {
old.Unsubscribe()
}
chat.subs[ctxID] = sub
chat.mu.Unlock()
return nil
}
// Unsubscribe removes a context's subscription
func (chat *NATSChatroom) Unsubscribe(ctxID string) {
chat.mu.Lock()
defer chat.mu.Unlock()
if sub, exists := chat.subs[ctxID]; exists {
sub.Unsubscribe()
delete(chat.subs, ctxID)
}
}
// Publish sends a message to a room
func (chat *NATSChatroom) Publish(room string, msg ChatMessage) error {
subject := "chat.room." + room
data, err := json.Marshal(msg)
if err != nil {
return err
}
return chat.nc.Publish(subject, data)
}
// GetHistory retrieves recent messages from JetStream
func (chat *NATSChatroom) GetHistory(room string, limit int) ([]ChatMessage, error) {
subject := "chat.room." + room
// Create an ephemeral consumer to replay messages
sub, err := chat.js.SubscribeSync(subject, nats.DeliverLast())
if err != nil {
// No messages yet
return nil, nil
}
defer sub.Unsubscribe()
var messages []ChatMessage
for i := 0; i < limit; i++ {
msg, err := sub.NextMsg(100 * time.Millisecond)
if err != nil {
break
}
var chatMsg ChatMessage
if err := json.Unmarshal(msg.Data, &chatMsg); err == nil {
messages = append(messages, chatMsg)
}
}
return messages, nil
}
func (chat *NATSChatroom) Close() {
chat.mu.Lock()
for _, sub := range chat.subs {
sub.Unsubscribe()
}
chat.mu.Unlock()
chat.nc.Close()
}
var roomNames = []string{"Go", "Rust", "Python", "JavaScript", "Clojure"} var roomNames = []string{"Go", "Rust", "Python", "JavaScript", "Clojure"}
func main() { func main() {
ctx := context.Background() ctx := context.Background()
// Start embedded NATS server (JetStream enabled by default) ps, err := vianats.New(ctx, "./data/nats")
ns, err := embeddednats.New(ctx,
embeddednats.WithDirectory("./data/nats"),
)
if err != nil { if err != nil {
log.Fatalf("Failed to start embedded NATS: %v", err) log.Fatalf("Failed to start embedded NATS: %v", err)
} }
ns.WaitForServer() defer ps.Close()
// Get client connection to embedded server // Create JetStream stream for message durability
nc, err := ns.Client() js := ps.JetStream()
if err != nil { js.AddStream(&nats.StreamConfig{
log.Fatalf("Failed to connect to embedded NATS: %v", err) Name: "CHAT",
} Subjects: []string{"chat.>"},
Retention: nats.LimitsPolicy,
chat, err := NewNATSChatroom(nc) MaxMsgs: 1000,
if err != nil { MaxAge: 24 * time.Hour,
log.Fatalf("Failed to initialize chatroom: %v", err) })
}
defer chat.Close()
v := via.New() v := via.New()
v.Config(via.Options{ v.Config(via.Options{
@@ -183,6 +62,7 @@ func main() {
DocumentTitle: "NATS Chat", DocumentTitle: "NATS Chat",
LogLvl: via.LogLevelInfo, LogLvl: via.LogLevelInfo,
ServerAddress: ":7331", ServerAddress: ":7331",
PubSub: ps,
}) })
v.AppendToHead( v.AppendToHead(
@@ -256,26 +136,30 @@ func main() {
roomSignal := c.Signal("Go") roomSignal := c.Signal("Go")
statement := c.Signal("") statement := c.Signal("")
// Local message cache for this context
var messages []ChatMessage var messages []ChatMessage
var messagesMu sync.Mutex var messagesMu sync.Mutex
currentRoom := "Go" currentRoom := "Go"
// Context ID for subscription management var currentSub via.Subscription
ctxID := randID()
// Subscribe to current room
subscribeToRoom := func(room string) { subscribeToRoom := func(room string) {
chat.Subscribe(ctxID, room, func(msg *ChatMessage) { if currentSub != nil {
currentSub.Unsubscribe()
}
sub, _ := c.Subscribe("chat.room."+room, func(data []byte) {
var msg ChatMessage
if err := json.Unmarshal(data, &msg); err != nil {
return
}
messagesMu.Lock() messagesMu.Lock()
messages = append(messages, *msg) messages = append(messages, msg)
// Keep only last 50 messages
if len(messages) > 50 { if len(messages) > 50 {
messages = messages[len(messages)-50:] messages = messages[len(messages)-50:]
} }
messagesMu.Unlock() messagesMu.Unlock()
c.Sync() c.Sync()
}) })
currentSub = sub
currentRoom = room currentRoom = room
} }
@@ -285,7 +169,7 @@ func main() {
newRoom := roomSignal.String() newRoom := roomSignal.String()
if newRoom != currentRoom { if newRoom != currentRoom {
messagesMu.Lock() messagesMu.Lock()
messages = nil // Clear messages for new room messages = nil
messagesMu.Unlock() messagesMu.Unlock()
subscribeToRoom(newRoom) subscribeToRoom(newRoom)
c.Sync() c.Sync()
@@ -299,15 +183,15 @@ func main() {
} }
statement.SetValue("") statement.SetValue("")
chat.Publish(currentRoom, ChatMessage{ data, _ := json.Marshal(ChatMessage{
User: currentUser, User: currentUser,
Message: msg, Message: msg,
Time: time.Now().UnixMilli(), Time: time.Now().UnixMilli(),
}) })
c.Publish("chat.room."+currentRoom, data)
}) })
c.View(func() h.H { c.View(func() h.H {
// Build room tabs
var tabs []h.H var tabs []h.H
for _, name := range roomNames { for _, name := range roomNames {
isCurrent := name == currentRoom isCurrent := name == currentRoom
@@ -320,7 +204,6 @@ func main() {
)) ))
} }
// Build message list
messagesMu.Lock() messagesMu.Lock()
chatHistoryChildren := []h.H{ chatHistoryChildren := []h.H{
h.Class("chat-history"), h.Class("chat-history"),
@@ -380,15 +263,6 @@ func randUser() UserInfo {
} }
} }
func randID() string {
const chars = "abcdefghijklmnopqrstuvwxyz0123456789"
b := make([]byte, 8)
for i := range b {
b[i] = chars[rand.Intn(len(chars))]
}
return string(b)
}
var quoteIdx = rand.Intn(len(devQuotes)) var quoteIdx = rand.Intn(len(devQuotes))
var devQuotes = []string{ var devQuotes = []string{
"Just use NATS.", "Just use NATS.",

195
nats_test.go Normal file
View File

@@ -0,0 +1,195 @@
package via
import (
"sync"
"sync/atomic"
"testing"
"time"
"github.com/ryanhamamura/via/h"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
type mockHandler struct {
id int64
fn func([]byte)
active atomic.Bool
}
// mockPubSub implements PubSub for testing without NATS.
type mockPubSub struct {
mu sync.Mutex
subs map[string][]*mockHandler
nextID atomic.Int64
}
func newMockPubSub() *mockPubSub {
return &mockPubSub{subs: make(map[string][]*mockHandler)}
}
func (m *mockPubSub) Publish(subject string, data []byte) error {
m.mu.Lock()
handlers := make([]*mockHandler, len(m.subs[subject]))
copy(handlers, m.subs[subject])
m.mu.Unlock()
for _, h := range handlers {
if h.active.Load() {
h.fn(data)
}
}
return nil
}
func (m *mockPubSub) Subscribe(subject string, handler func(data []byte)) (Subscription, error) {
m.mu.Lock()
defer m.mu.Unlock()
mh := &mockHandler{
id: m.nextID.Add(1),
fn: handler,
}
mh.active.Store(true)
m.subs[subject] = append(m.subs[subject], mh)
return &mockSub{handler: mh}, nil
}
func (m *mockPubSub) Close() error { return nil }
type mockSub struct {
handler *mockHandler
}
func (s *mockSub) Unsubscribe() error {
s.handler.active.Store(false)
return nil
}
func TestPubSub_RoundTrip(t *testing.T) {
ps := newMockPubSub()
v := New()
v.Config(Options{PubSub: ps})
var received []byte
var wg sync.WaitGroup
wg.Add(1)
c := newContext("test-ctx", "/", v)
c.View(func() h.H { return h.Div() })
_, err := c.Subscribe("test.topic", func(data []byte) {
received = data
wg.Done()
})
require.NoError(t, err)
err = c.Publish("test.topic", []byte("hello"))
require.NoError(t, err)
wg.Wait()
assert.Equal(t, []byte("hello"), received)
}
func TestPubSub_MultipleSubscribers(t *testing.T) {
ps := newMockPubSub()
v := New()
v.Config(Options{PubSub: ps})
var mu sync.Mutex
var results []string
var wg sync.WaitGroup
wg.Add(2)
c1 := newContext("ctx-1", "/", v)
c1.View(func() h.H { return h.Div() })
c2 := newContext("ctx-2", "/", v)
c2.View(func() h.H { return h.Div() })
c1.Subscribe("broadcast", func(data []byte) {
mu.Lock()
results = append(results, "c1:"+string(data))
mu.Unlock()
wg.Done()
})
c2.Subscribe("broadcast", func(data []byte) {
mu.Lock()
results = append(results, "c2:"+string(data))
mu.Unlock()
wg.Done()
})
c1.Publish("broadcast", []byte("msg"))
wg.Wait()
assert.Len(t, results, 2)
assert.Contains(t, results, "c1:msg")
assert.Contains(t, results, "c2:msg")
}
func TestPubSub_SubscriptionCleanupOnDispose(t *testing.T) {
ps := newMockPubSub()
v := New()
v.Config(Options{PubSub: ps})
c := newContext("cleanup-ctx", "/", v)
c.View(func() h.H { return h.Div() })
c.Subscribe("room.1", func(data []byte) {})
c.Subscribe("room.2", func(data []byte) {})
assert.Len(t, c.subscriptions, 2)
c.unsubscribeAll()
assert.Empty(t, c.subscriptions)
}
func TestPubSub_ManualUnsubscribe(t *testing.T) {
ps := newMockPubSub()
v := New()
v.Config(Options{PubSub: ps})
c := newContext("unsub-ctx", "/", v)
c.View(func() h.H { return h.Div() })
called := false
sub, err := c.Subscribe("topic", func(data []byte) {
called = true
})
require.NoError(t, err)
sub.Unsubscribe()
c.Publish("topic", []byte("ignored"))
time.Sleep(10 * time.Millisecond)
assert.False(t, called)
}
func TestPubSub_NoOpWhenNotConfigured(t *testing.T) {
v := New()
c := newContext("noop-ctx", "/", v)
c.View(func() h.H { return h.Div() })
err := c.Publish("topic", []byte("data"))
assert.Error(t, err)
sub, err := c.Subscribe("topic", func(data []byte) {})
assert.Error(t, err)
assert.Nil(t, sub)
}
func TestPubSub_NoOpDuringPanicCheck(t *testing.T) {
ps := newMockPubSub()
v := New()
v.Config(Options{PubSub: ps})
// Panic-check context has id=""
c := newContext("", "/", v)
err := c.Publish("topic", []byte("data"))
assert.NoError(t, err)
sub, err := c.Subscribe("topic", func(data []byte) {})
assert.NoError(t, err)
assert.Nil(t, sub)
}

14
pubsub.go Normal file
View File

@@ -0,0 +1,14 @@
package via
// PubSub is an interface for publish/subscribe messaging backends.
// The vianats sub-package provides an embedded NATS implementation.
type PubSub interface {
Publish(subject string, data []byte) error
Subscribe(subject string, handler func(data []byte)) (Subscription, error)
Close() error
}
// Subscription represents an active subscription that can be manually unsubscribed.
type Subscription interface {
Unsubscribe() error
}

5
via.go
View File

@@ -40,6 +40,7 @@ type V struct {
documentFootIncludes []h.H documentFootIncludes []h.H
devModePageInitFnMap map[string]func(*Context) devModePageInitFnMap map[string]func(*Context)
sessionManager *scs.SessionManager sessionManager *scs.SessionManager
pubsub PubSub
datastarPath string datastarPath string
datastarContent []byte datastarContent []byte
datastarOnce sync.Once datastarOnce sync.Once
@@ -117,6 +118,9 @@ func (v *V) Config(cfg Options) {
if cfg.DatastarPath != "" { if cfg.DatastarPath != "" {
v.datastarPath = cfg.DatastarPath v.datastarPath = cfg.DatastarPath
} }
if cfg.PubSub != nil {
v.pubsub = cfg.PubSub
}
} }
// AppendToHead appends the given h.H nodes to the head of the base HTML document. // AppendToHead appends the given h.H nodes to the head of the base HTML document.
@@ -525,6 +529,7 @@ func New() *V {
v.logErr(c, "failed to handle session close: %v", err) v.logErr(c, "failed to handle session close: %v", err)
return return
} }
c.unsubscribeAll()
c.stopAllRoutines() c.stopAllRoutines()
v.logDebug(c, "session close event triggered") v.logDebug(c, "session close event triggered")
if v.cfg.DevMode { if v.cfg.DevMode {

78
vianats/vianats.go Normal file
View File

@@ -0,0 +1,78 @@
// Package vianats provides an embedded NATS server with JetStream as a
// pub/sub backend for Via applications.
package vianats
import (
"context"
"fmt"
"github.com/delaneyj/toolbelt/embeddednats"
"github.com/nats-io/nats.go"
"github.com/ryanhamamura/via"
)
// NATS implements via.PubSub using an embedded NATS server with JetStream.
type NATS struct {
server *embeddednats.Server
nc *nats.Conn
js nats.JetStreamContext
}
// New starts an embedded NATS server with JetStream enabled and returns a
// ready-to-use NATS instance. The server stores data in dataDir and shuts
// down when ctx is cancelled.
func New(ctx context.Context, dataDir string) (*NATS, error) {
ns, err := embeddednats.New(ctx, embeddednats.WithDirectory(dataDir))
if err != nil {
return nil, fmt.Errorf("vianats: start server: %w", err)
}
ns.WaitForServer()
nc, err := ns.Client()
if err != nil {
ns.Close()
return nil, fmt.Errorf("vianats: connect client: %w", err)
}
js, err := nc.JetStream()
if err != nil {
nc.Close()
ns.Close()
return nil, fmt.Errorf("vianats: init jetstream: %w", err)
}
return &NATS{server: ns, nc: nc, js: js}, nil
}
// Publish sends data to the given subject using core NATS publish.
// JetStream captures messages automatically if a matching stream exists.
func (n *NATS) Publish(subject string, data []byte) error {
return n.nc.Publish(subject, data)
}
// Subscribe creates a core NATS subscription for real-time fan-out delivery.
func (n *NATS) Subscribe(subject string, handler func(data []byte)) (via.Subscription, error) {
sub, err := n.nc.Subscribe(subject, func(msg *nats.Msg) {
handler(msg.Data)
})
if err != nil {
return nil, err
}
return sub, nil
}
// Close shuts down the client connection and embedded server.
func (n *NATS) Close() error {
n.nc.Close()
return n.server.Close()
}
// Conn returns the underlying NATS connection for advanced usage.
func (n *NATS) Conn() *nats.Conn {
return n.nc
}
// JetStream returns the JetStream context for stream configuration and replay.
func (n *NATS) JetStream() nats.JetStreamContext {
return n.js
}