diff --git a/configuration.go b/configuration.go index 75923d2..999da00 100644 --- a/configuration.go +++ b/configuration.go @@ -45,4 +45,8 @@ type Options struct { // DatastarPath is the URL path where the script is served. // Defaults to "/_datastar.js" if empty. DatastarPath string + + // PubSub enables publish/subscribe messaging. Use vianats.New() for an + // embedded NATS backend, or supply any PubSub implementation. + PubSub PubSub } diff --git a/context.go b/context.go index 152f752..77b4d2b 100644 --- a/context.go +++ b/context.go @@ -31,6 +31,8 @@ type Context struct { mu sync.RWMutex ctxDisposedChan chan struct{} reqCtx context.Context + subscriptions []Subscription + subsMu sync.Mutex } // View defines the UI rendered by this context. @@ -403,6 +405,55 @@ func (c *Context) Session() *Session { } } +// Publish sends data to the given subject via the configured PubSub backend. +// Returns an error if no PubSub is configured. No-ops during panic-check init. +func (c *Context) Publish(subject string, data []byte) error { + if c.id == "" { + return nil + } + if c.app.pubsub == nil { + return fmt.Errorf("pubsub not configured") + } + return c.app.pubsub.Publish(subject, data) +} + +// Subscribe creates a subscription on the configured PubSub backend. +// The subscription is tracked for automatic cleanup when the context is disposed. +// Returns an error if no PubSub is configured. No-ops during panic-check init. +func (c *Context) Subscribe(subject string, handler func(data []byte)) (Subscription, error) { + if c.id == "" { + return nil, nil + } + if c.app.pubsub == nil { + return nil, fmt.Errorf("pubsub not configured") + } + sub, err := c.app.pubsub.Subscribe(subject, handler) + if err != nil { + return nil, err + } + + // Track on page context for cleanup (components use parent, like signals/actions) + target := c + if c.isComponent() { + target = c.parentPageCtx + } + target.subsMu.Lock() + target.subscriptions = append(target.subscriptions, sub) + target.subsMu.Unlock() + return sub, nil +} + +// unsubscribeAll cleans up all tracked subscriptions for this context and its components. +func (c *Context) unsubscribeAll() { + c.subsMu.Lock() + subs := c.subscriptions + c.subscriptions = nil + c.subsMu.Unlock() + for _, sub := range subs { + sub.Unsubscribe() + } +} + func newContext(id string, route string, v *V) *Context { if v == nil { log.Fatal("create context failed: app pointer is nil") diff --git a/internal/examples/nats-chatroom/main.go b/internal/examples/nats-chatroom/main.go index bed5bf0..5a00780 100644 --- a/internal/examples/nats-chatroom/main.go +++ b/internal/examples/nats-chatroom/main.go @@ -8,10 +8,10 @@ import ( "sync" "time" - "github.com/delaneyj/toolbelt/embeddednats" "github.com/nats-io/nats.go" "github.com/ryanhamamura/via" "github.com/ryanhamamura/via/h" + "github.com/ryanhamamura/via/vianats" ) var ( @@ -35,147 +35,26 @@ func (u *UserInfo) Avatar() h.H { return h.Div(h.Class("avatar"), h.Attr("title", u.Name), h.Text(u.Emoji)) } -// NATSChatroom manages NATS connections and per-context subscriptions -type NATSChatroom struct { - nc *nats.Conn - js nats.JetStreamContext - subs map[string]*nats.Subscription - mu sync.RWMutex -} - -func NewNATSChatroom(nc *nats.Conn) (*NATSChatroom, error) { - js, err := nc.JetStream() - if err != nil { - return nil, err - } - - // Create or update the CHAT stream for durability - _, err = js.AddStream(&nats.StreamConfig{ - Name: "CHAT", - Subjects: []string{"chat.>"}, - Retention: nats.LimitsPolicy, - MaxMsgs: 1000, // Keep last 1000 messages per room - MaxAge: 24 * time.Hour, - }) - if err != nil && err != nats.ErrStreamNameAlreadyInUse { - // Stream might already exist, that's fine - log.Printf("Note: stream creation returned: %v", err) - } - - return &NATSChatroom{ - nc: nc, - js: js, - subs: make(map[string]*nats.Subscription), - }, nil -} - -// Subscribe creates a subscription for a context to a room -func (chat *NATSChatroom) Subscribe(ctxID, room string, handler func(msg *ChatMessage)) error { - subject := "chat.room." + room - - sub, err := chat.nc.Subscribe(subject, func(m *nats.Msg) { - var msg ChatMessage - if err := json.Unmarshal(m.Data, &msg); err != nil { - log.Printf("Failed to unmarshal message: %v", err) - return - } - handler(&msg) - }) - if err != nil { - return err - } - - chat.mu.Lock() - // Clean up old subscription if exists - if old, exists := chat.subs[ctxID]; exists { - old.Unsubscribe() - } - chat.subs[ctxID] = sub - chat.mu.Unlock() - - return nil -} - -// Unsubscribe removes a context's subscription -func (chat *NATSChatroom) Unsubscribe(ctxID string) { - chat.mu.Lock() - defer chat.mu.Unlock() - if sub, exists := chat.subs[ctxID]; exists { - sub.Unsubscribe() - delete(chat.subs, ctxID) - } -} - -// Publish sends a message to a room -func (chat *NATSChatroom) Publish(room string, msg ChatMessage) error { - subject := "chat.room." + room - data, err := json.Marshal(msg) - if err != nil { - return err - } - return chat.nc.Publish(subject, data) -} - -// GetHistory retrieves recent messages from JetStream -func (chat *NATSChatroom) GetHistory(room string, limit int) ([]ChatMessage, error) { - subject := "chat.room." + room - - // Create an ephemeral consumer to replay messages - sub, err := chat.js.SubscribeSync(subject, nats.DeliverLast()) - if err != nil { - // No messages yet - return nil, nil - } - defer sub.Unsubscribe() - - var messages []ChatMessage - for i := 0; i < limit; i++ { - msg, err := sub.NextMsg(100 * time.Millisecond) - if err != nil { - break - } - var chatMsg ChatMessage - if err := json.Unmarshal(msg.Data, &chatMsg); err == nil { - messages = append(messages, chatMsg) - } - } - return messages, nil -} - -func (chat *NATSChatroom) Close() { - chat.mu.Lock() - for _, sub := range chat.subs { - sub.Unsubscribe() - } - chat.mu.Unlock() - chat.nc.Close() -} - var roomNames = []string{"Go", "Rust", "Python", "JavaScript", "Clojure"} func main() { ctx := context.Background() - // Start embedded NATS server (JetStream enabled by default) - ns, err := embeddednats.New(ctx, - embeddednats.WithDirectory("./data/nats"), - ) + ps, err := vianats.New(ctx, "./data/nats") if err != nil { log.Fatalf("Failed to start embedded NATS: %v", err) } - ns.WaitForServer() + defer ps.Close() - // Get client connection to embedded server - nc, err := ns.Client() - if err != nil { - log.Fatalf("Failed to connect to embedded NATS: %v", err) - } - - chat, err := NewNATSChatroom(nc) - if err != nil { - log.Fatalf("Failed to initialize chatroom: %v", err) - } - defer chat.Close() + // Create JetStream stream for message durability + js := ps.JetStream() + js.AddStream(&nats.StreamConfig{ + Name: "CHAT", + Subjects: []string{"chat.>"}, + Retention: nats.LimitsPolicy, + MaxMsgs: 1000, + MaxAge: 24 * time.Hour, + }) v := via.New() v.Config(via.Options{ @@ -183,6 +62,7 @@ func main() { DocumentTitle: "NATS Chat", LogLvl: via.LogLevelInfo, ServerAddress: ":7331", + PubSub: ps, }) v.AppendToHead( @@ -256,26 +136,30 @@ func main() { roomSignal := c.Signal("Go") statement := c.Signal("") - // Local message cache for this context var messages []ChatMessage var messagesMu sync.Mutex currentRoom := "Go" - // Context ID for subscription management - ctxID := randID() + var currentSub via.Subscription - // Subscribe to current room subscribeToRoom := func(room string) { - chat.Subscribe(ctxID, room, func(msg *ChatMessage) { + if currentSub != nil { + currentSub.Unsubscribe() + } + sub, _ := c.Subscribe("chat.room."+room, func(data []byte) { + var msg ChatMessage + if err := json.Unmarshal(data, &msg); err != nil { + return + } messagesMu.Lock() - messages = append(messages, *msg) - // Keep only last 50 messages + messages = append(messages, msg) if len(messages) > 50 { messages = messages[len(messages)-50:] } messagesMu.Unlock() c.Sync() }) + currentSub = sub currentRoom = room } @@ -285,7 +169,7 @@ func main() { newRoom := roomSignal.String() if newRoom != currentRoom { messagesMu.Lock() - messages = nil // Clear messages for new room + messages = nil messagesMu.Unlock() subscribeToRoom(newRoom) c.Sync() @@ -299,15 +183,15 @@ func main() { } statement.SetValue("") - chat.Publish(currentRoom, ChatMessage{ + data, _ := json.Marshal(ChatMessage{ User: currentUser, Message: msg, Time: time.Now().UnixMilli(), }) + c.Publish("chat.room."+currentRoom, data) }) c.View(func() h.H { - // Build room tabs var tabs []h.H for _, name := range roomNames { isCurrent := name == currentRoom @@ -320,7 +204,6 @@ func main() { )) } - // Build message list messagesMu.Lock() chatHistoryChildren := []h.H{ h.Class("chat-history"), @@ -380,15 +263,6 @@ func randUser() UserInfo { } } -func randID() string { - const chars = "abcdefghijklmnopqrstuvwxyz0123456789" - b := make([]byte, 8) - for i := range b { - b[i] = chars[rand.Intn(len(chars))] - } - return string(b) -} - var quoteIdx = rand.Intn(len(devQuotes)) var devQuotes = []string{ "Just use NATS.", diff --git a/nats_test.go b/nats_test.go new file mode 100644 index 0000000..87f486d --- /dev/null +++ b/nats_test.go @@ -0,0 +1,195 @@ +package via + +import ( + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/ryanhamamura/via/h" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type mockHandler struct { + id int64 + fn func([]byte) + active atomic.Bool +} + +// mockPubSub implements PubSub for testing without NATS. +type mockPubSub struct { + mu sync.Mutex + subs map[string][]*mockHandler + nextID atomic.Int64 +} + +func newMockPubSub() *mockPubSub { + return &mockPubSub{subs: make(map[string][]*mockHandler)} +} + +func (m *mockPubSub) Publish(subject string, data []byte) error { + m.mu.Lock() + handlers := make([]*mockHandler, len(m.subs[subject])) + copy(handlers, m.subs[subject]) + m.mu.Unlock() + for _, h := range handlers { + if h.active.Load() { + h.fn(data) + } + } + return nil +} + +func (m *mockPubSub) Subscribe(subject string, handler func(data []byte)) (Subscription, error) { + m.mu.Lock() + defer m.mu.Unlock() + mh := &mockHandler{ + id: m.nextID.Add(1), + fn: handler, + } + mh.active.Store(true) + m.subs[subject] = append(m.subs[subject], mh) + return &mockSub{handler: mh}, nil +} + +func (m *mockPubSub) Close() error { return nil } + +type mockSub struct { + handler *mockHandler +} + +func (s *mockSub) Unsubscribe() error { + s.handler.active.Store(false) + return nil +} + +func TestPubSub_RoundTrip(t *testing.T) { + ps := newMockPubSub() + v := New() + v.Config(Options{PubSub: ps}) + + var received []byte + var wg sync.WaitGroup + wg.Add(1) + + c := newContext("test-ctx", "/", v) + c.View(func() h.H { return h.Div() }) + + _, err := c.Subscribe("test.topic", func(data []byte) { + received = data + wg.Done() + }) + require.NoError(t, err) + + err = c.Publish("test.topic", []byte("hello")) + require.NoError(t, err) + + wg.Wait() + assert.Equal(t, []byte("hello"), received) +} + +func TestPubSub_MultipleSubscribers(t *testing.T) { + ps := newMockPubSub() + v := New() + v.Config(Options{PubSub: ps}) + + var mu sync.Mutex + var results []string + var wg sync.WaitGroup + wg.Add(2) + + c1 := newContext("ctx-1", "/", v) + c1.View(func() h.H { return h.Div() }) + c2 := newContext("ctx-2", "/", v) + c2.View(func() h.H { return h.Div() }) + + c1.Subscribe("broadcast", func(data []byte) { + mu.Lock() + results = append(results, "c1:"+string(data)) + mu.Unlock() + wg.Done() + }) + + c2.Subscribe("broadcast", func(data []byte) { + mu.Lock() + results = append(results, "c2:"+string(data)) + mu.Unlock() + wg.Done() + }) + + c1.Publish("broadcast", []byte("msg")) + wg.Wait() + + assert.Len(t, results, 2) + assert.Contains(t, results, "c1:msg") + assert.Contains(t, results, "c2:msg") +} + +func TestPubSub_SubscriptionCleanupOnDispose(t *testing.T) { + ps := newMockPubSub() + v := New() + v.Config(Options{PubSub: ps}) + + c := newContext("cleanup-ctx", "/", v) + c.View(func() h.H { return h.Div() }) + + c.Subscribe("room.1", func(data []byte) {}) + c.Subscribe("room.2", func(data []byte) {}) + + assert.Len(t, c.subscriptions, 2) + + c.unsubscribeAll() + assert.Empty(t, c.subscriptions) +} + +func TestPubSub_ManualUnsubscribe(t *testing.T) { + ps := newMockPubSub() + v := New() + v.Config(Options{PubSub: ps}) + + c := newContext("unsub-ctx", "/", v) + c.View(func() h.H { return h.Div() }) + + called := false + sub, err := c.Subscribe("topic", func(data []byte) { + called = true + }) + require.NoError(t, err) + + sub.Unsubscribe() + + c.Publish("topic", []byte("ignored")) + time.Sleep(10 * time.Millisecond) + assert.False(t, called) +} + +func TestPubSub_NoOpWhenNotConfigured(t *testing.T) { + v := New() + + c := newContext("noop-ctx", "/", v) + c.View(func() h.H { return h.Div() }) + + err := c.Publish("topic", []byte("data")) + assert.Error(t, err) + + sub, err := c.Subscribe("topic", func(data []byte) {}) + assert.Error(t, err) + assert.Nil(t, sub) +} + +func TestPubSub_NoOpDuringPanicCheck(t *testing.T) { + ps := newMockPubSub() + v := New() + v.Config(Options{PubSub: ps}) + + // Panic-check context has id="" + c := newContext("", "/", v) + + err := c.Publish("topic", []byte("data")) + assert.NoError(t, err) + + sub, err := c.Subscribe("topic", func(data []byte) {}) + assert.NoError(t, err) + assert.Nil(t, sub) +} diff --git a/pubsub.go b/pubsub.go new file mode 100644 index 0000000..c1a3c35 --- /dev/null +++ b/pubsub.go @@ -0,0 +1,14 @@ +package via + +// PubSub is an interface for publish/subscribe messaging backends. +// The vianats sub-package provides an embedded NATS implementation. +type PubSub interface { + Publish(subject string, data []byte) error + Subscribe(subject string, handler func(data []byte)) (Subscription, error) + Close() error +} + +// Subscription represents an active subscription that can be manually unsubscribed. +type Subscription interface { + Unsubscribe() error +} diff --git a/via.go b/via.go index 516f339..2f03f08 100644 --- a/via.go +++ b/via.go @@ -40,6 +40,7 @@ type V struct { documentFootIncludes []h.H devModePageInitFnMap map[string]func(*Context) sessionManager *scs.SessionManager + pubsub PubSub datastarPath string datastarContent []byte datastarOnce sync.Once @@ -117,6 +118,9 @@ func (v *V) Config(cfg Options) { if cfg.DatastarPath != "" { v.datastarPath = cfg.DatastarPath } + if cfg.PubSub != nil { + v.pubsub = cfg.PubSub + } } // AppendToHead appends the given h.H nodes to the head of the base HTML document. @@ -525,6 +529,7 @@ func New() *V { v.logErr(c, "failed to handle session close: %v", err) return } + c.unsubscribeAll() c.stopAllRoutines() v.logDebug(c, "session close event triggered") if v.cfg.DevMode { diff --git a/vianats/vianats.go b/vianats/vianats.go new file mode 100644 index 0000000..9809577 --- /dev/null +++ b/vianats/vianats.go @@ -0,0 +1,78 @@ +// Package vianats provides an embedded NATS server with JetStream as a +// pub/sub backend for Via applications. +package vianats + +import ( + "context" + "fmt" + + "github.com/delaneyj/toolbelt/embeddednats" + "github.com/nats-io/nats.go" + "github.com/ryanhamamura/via" +) + +// NATS implements via.PubSub using an embedded NATS server with JetStream. +type NATS struct { + server *embeddednats.Server + nc *nats.Conn + js nats.JetStreamContext +} + +// New starts an embedded NATS server with JetStream enabled and returns a +// ready-to-use NATS instance. The server stores data in dataDir and shuts +// down when ctx is cancelled. +func New(ctx context.Context, dataDir string) (*NATS, error) { + ns, err := embeddednats.New(ctx, embeddednats.WithDirectory(dataDir)) + if err != nil { + return nil, fmt.Errorf("vianats: start server: %w", err) + } + ns.WaitForServer() + + nc, err := ns.Client() + if err != nil { + ns.Close() + return nil, fmt.Errorf("vianats: connect client: %w", err) + } + + js, err := nc.JetStream() + if err != nil { + nc.Close() + ns.Close() + return nil, fmt.Errorf("vianats: init jetstream: %w", err) + } + + return &NATS{server: ns, nc: nc, js: js}, nil +} + +// Publish sends data to the given subject using core NATS publish. +// JetStream captures messages automatically if a matching stream exists. +func (n *NATS) Publish(subject string, data []byte) error { + return n.nc.Publish(subject, data) +} + +// Subscribe creates a core NATS subscription for real-time fan-out delivery. +func (n *NATS) Subscribe(subject string, handler func(data []byte)) (via.Subscription, error) { + sub, err := n.nc.Subscribe(subject, func(msg *nats.Msg) { + handler(msg.Data) + }) + if err != nil { + return nil, err + } + return sub, nil +} + +// Close shuts down the client connection and embedded server. +func (n *NATS) Close() error { + n.nc.Close() + return n.server.Close() +} + +// Conn returns the underlying NATS connection for advanced usage. +func (n *NATS) Conn() *nats.Conn { + return n.nc +} + +// JetStream returns the JetStream context for stream configuration and replay. +func (n *NATS) JetStream() nats.JetStreamContext { + return n.js +}