From 2c44671d0e28e050b24b555b3d916098834cf349 Mon Sep 17 00:00:00 2001 From: Ryan Hamamura <58859899+ryanhamamura@users.noreply.github.com> Date: Fri, 6 Feb 2026 09:47:39 -1000 Subject: [PATCH] feat: add generic pub/sub helpers and pubsub-crud example Add typed Publish[T] and Subscribe[T] generic helpers that handle JSON marshaling, along with vianats.EnsureStream and ReplayHistory helpers. Refactor nats-chatroom to use the new APIs. Add pubsub-crud example demonstrating CRUD operations with DaisyUI toast notifications broadcast to all connected clients via NATS. --- internal/examples/nats-chatroom/main.go | 47 ++-- internal/examples/pubsub-crud/main.go | 284 ++++++++++++++++++++++++ pubsub_helpers.go | 23 ++ pubsub_helpers_test.go | 66 ++++++ vianats/vianats.go | 49 ++++ 5 files changed, 436 insertions(+), 33 deletions(-) create mode 100644 internal/examples/pubsub-crud/main.go create mode 100644 pubsub_helpers.go create mode 100644 pubsub_helpers_test.go diff --git a/internal/examples/nats-chatroom/main.go b/internal/examples/nats-chatroom/main.go index f5c5879..e632b83 100644 --- a/internal/examples/nats-chatroom/main.go +++ b/internal/examples/nats-chatroom/main.go @@ -2,13 +2,11 @@ package main import ( "context" - "encoding/json" "log" "math/rand" "sync" "time" - "github.com/nats-io/nats.go" "github.com/ryanhamamura/via" "github.com/ryanhamamura/via/h" "github.com/ryanhamamura/via/vianats" @@ -46,15 +44,15 @@ func main() { } defer ps.Close() - // Create JetStream stream for message durability - js := ps.JetStream() - js.AddStream(&nats.StreamConfig{ - Name: "CHAT", - Subjects: []string{"chat.>"}, - Retention: nats.LimitsPolicy, - MaxMsgs: 1000, - MaxAge: 24 * time.Hour, + err = vianats.EnsureStream(ps, vianats.StreamConfig{ + Name: "CHAT", + Subjects: []string{"chat.>"}, + MaxMsgs: 1000, + MaxAge: 24 * time.Hour, }) + if err != nil { + log.Fatalf("Failed to ensure stream: %v", err) + } v := via.New() v.Config(via.Options{ @@ -147,30 +145,14 @@ func main() { currentSub.Unsubscribe() } - // Replay history from JetStream before subscribing for real-time subject := "chat.room." + room - if hist, err := js.SubscribeSync(subject, nats.DeliverAll(), nats.OrderedConsumer()); err == nil { - for { - msg, err := hist.NextMsg(200 * time.Millisecond) - if err != nil { - break - } - var chatMsg ChatMessage - if json.Unmarshal(msg.Data, &chatMsg) == nil { - messages = append(messages, chatMsg) - } - } - hist.Unsubscribe() - if len(messages) > 50 { - messages = messages[len(messages)-50:] - } + + // Replay history from JetStream + if hist, err := vianats.ReplayHistory[ChatMessage](ps, subject, 50); err == nil { + messages = hist } - sub, _ := c.Subscribe(subject, func(data []byte) { - var msg ChatMessage - if err := json.Unmarshal(data, &msg); err != nil { - return - } + sub, _ := via.Subscribe(c, subject, func(msg ChatMessage) { messagesMu.Lock() messages = append(messages, msg) if len(messages) > 50 { @@ -203,12 +185,11 @@ func main() { } statement.SetValue("") - data, _ := json.Marshal(ChatMessage{ + via.Publish(c, "chat.room."+currentRoom, ChatMessage{ User: currentUser, Message: msg, Time: time.Now().UnixMilli(), }) - c.Publish("chat.room."+currentRoom, data) }) c.View(func() h.H { diff --git a/internal/examples/pubsub-crud/main.go b/internal/examples/pubsub-crud/main.go new file mode 100644 index 0000000..df5b1c5 --- /dev/null +++ b/internal/examples/pubsub-crud/main.go @@ -0,0 +1,284 @@ +package main + +import ( + "context" + "crypto/rand" + "fmt" + "html" + "log" + "sync" + "time" + + "github.com/ryanhamamura/via" + "github.com/ryanhamamura/via/h" + "github.com/ryanhamamura/via/vianats" +) + +var WithSignal = via.WithSignal + +type Bookmark struct { + ID string + Title string + URL string +} + +type CRUDEvent struct { + Action string `json:"action"` + Title string `json:"title"` + UserID string `json:"user_id"` +} + +var ( + bookmarks []Bookmark + bookmarksMu sync.RWMutex +) + +func randomHex(n int) string { + b := make([]byte, n) + rand.Read(b) + return fmt.Sprintf("%x", b) +} + +func findBookmark(id string) (Bookmark, int) { + for i, bm := range bookmarks { + if bm.ID == id { + return bm, i + } + } + return Bookmark{}, -1 +} + +func main() { + ctx := context.Background() + + ps, err := vianats.New(ctx, "./data/nats") + if err != nil { + log.Fatalf("Failed to start embedded NATS: %v", err) + } + defer ps.Close() + + err = vianats.EnsureStream(ps, vianats.StreamConfig{ + Name: "BOOKMARKS", + Subjects: []string{"bookmarks.>"}, + MaxMsgs: 1000, + MaxAge: 24 * time.Hour, + }) + if err != nil { + log.Fatalf("Failed to ensure stream: %v", err) + } + + v := via.New() + v.Config(via.Options{ + DevMode: true, + DocumentTitle: "Bookmarks", + LogLevel: via.LogLevelInfo, + ServerAddress: ":7331", + PubSub: ps, + }) + + v.AppendToHead( + h.Link(h.Rel("stylesheet"), h.Href("https://cdn.jsdelivr.net/npm/daisyui@4/dist/full.min.css")), + h.Script(h.Src("https://cdn.tailwindcss.com")), + ) + + v.Page("/", func(c *via.Context) { + userID := randomHex(8) + + titleSignal := c.Signal("") + urlSignal := c.Signal("") + targetIDSignal := c.Signal("") + + via.Subscribe(c, "bookmarks.events", func(evt CRUDEvent) { + if evt.UserID == userID { + return + } + safeTitle := html.EscapeString(evt.Title) + var alertClass string + switch evt.Action { + case "created": + alertClass = "alert-success" + case "updated": + alertClass = "alert-info" + case "deleted": + alertClass = "alert-error" + } + c.ExecScript(fmt.Sprintf(`(function(){ + var tc = document.getElementById('toast-container'); + if (!tc) return; + var d = document.createElement('div'); + d.className = 'alert %s'; + d.innerHTML = 'Bookmark "%s" %s'; + tc.appendChild(d); + setTimeout(function(){ d.remove(); }, 3000); + })()`, alertClass, safeTitle, evt.Action)) + c.Sync() + }) + + save := c.Action(func() { + title := titleSignal.String() + url := urlSignal.String() + if title == "" || url == "" { + return + } + + targetID := targetIDSignal.String() + action := "created" + + bookmarksMu.Lock() + if targetID != "" { + if _, idx := findBookmark(targetID); idx >= 0 { + bookmarks[idx].Title = title + bookmarks[idx].URL = url + action = "updated" + } + } else { + bookmarks = append(bookmarks, Bookmark{ + ID: randomHex(8), + Title: title, + URL: url, + }) + } + bookmarksMu.Unlock() + + titleSignal.SetValue("") + urlSignal.SetValue("") + targetIDSignal.SetValue("") + + via.Publish(c, "bookmarks.events", CRUDEvent{ + Action: action, + Title: title, + UserID: userID, + }) + c.Sync() + }) + + edit := c.Action(func() { + id := targetIDSignal.String() + bookmarksMu.RLock() + bm, idx := findBookmark(id) + bookmarksMu.RUnlock() + if idx < 0 { + return + } + titleSignal.SetValue(bm.Title) + urlSignal.SetValue(bm.URL) + }) + + del := c.Action(func() { + id := targetIDSignal.String() + bookmarksMu.Lock() + bm, idx := findBookmark(id) + if idx >= 0 { + bookmarks = append(bookmarks[:idx], bookmarks[idx+1:]...) + } + bookmarksMu.Unlock() + if idx < 0 { + return + } + + targetIDSignal.SetValue("") + + via.Publish(c, "bookmarks.events", CRUDEvent{ + Action: "deleted", + Title: bm.Title, + UserID: userID, + }) + c.Sync() + }) + + cancelEdit := c.Action(func() { + titleSignal.SetValue("") + urlSignal.SetValue("") + targetIDSignal.SetValue("") + }) + + c.View(func() h.H { + isEditing := targetIDSignal.String() != "" + + // Build table rows + bookmarksMu.RLock() + var rows []h.H + for _, bm := range bookmarks { + rows = append(rows, h.Tr( + h.Td(h.Text(bm.Title)), + h.Td(h.A(h.Href(bm.URL), h.Attr("target", "_blank"), h.Class("link link-primary"), h.Text(bm.URL))), + h.Td( + h.Div(h.Class("flex gap-1"), + h.Button(h.Class("btn btn-xs btn-ghost"), h.Text("Edit"), + edit.OnClick(WithSignal(targetIDSignal, bm.ID)), + ), + h.Button(h.Class("btn btn-xs btn-ghost text-error"), h.Text("Delete"), + del.OnClick(WithSignal(targetIDSignal, bm.ID)), + ), + ), + ), + )) + } + bookmarksMu.RUnlock() + + saveLabel := "Add Bookmark" + if isEditing { + saveLabel = "Update Bookmark" + } + + return h.Div(h.Class("min-h-screen bg-base-200"), + // Navbar + h.Div(h.Class("navbar bg-base-100 shadow-sm"), + h.Div(h.Class("flex-1"), + h.A(h.Class("btn btn-ghost text-xl"), h.Text("Bookmarks")), + ), + h.Div(h.Class("flex-none"), + h.Div(h.Class("badge badge-outline"), h.Text(userID[:8])), + ), + ), + + h.Div(h.Class("container mx-auto p-4 max-w-3xl flex flex-col gap-4"), + // Form card + h.Div(h.Class("card bg-base-100 shadow"), + h.Div(h.Class("card-body"), + h.H2(h.Class("card-title"), h.Text(saveLabel)), + h.Div(h.Class("flex flex-col gap-2"), + h.Input(h.Class("input input-bordered w-full"), h.Type("text"), h.Placeholder("Title"), titleSignal.Bind()), + h.Input(h.Class("input input-bordered w-full"), h.Type("text"), h.Placeholder("https://example.com"), urlSignal.Bind()), + h.Div(h.Class("card-actions justify-end"), + h.If(isEditing, + h.Button(h.Class("btn btn-ghost"), h.Text("Cancel"), cancelEdit.OnClick()), + ), + h.Button(h.Class("btn btn-primary"), h.Text(saveLabel), save.OnClick()), + ), + ), + ), + ), + + // Table card + h.Div(h.Class("card bg-base-100 shadow"), + h.Div(h.Class("card-body"), + h.H2(h.Class("card-title"), h.Text("All Bookmarks")), + h.If(len(rows) == 0, + h.P(h.Class("text-base-content/60"), h.Text("No bookmarks yet. Add one above!")), + ), + h.If(len(rows) > 0, + h.Div(h.Class("overflow-x-auto"), + h.Table(h.Class("table"), + h.THead(h.Tr( + h.Th(h.Text("Title")), + h.Th(h.Text("URL")), + h.Th(h.Text("Actions")), + )), + h.TBody(rows...), + ), + ), + ), + ), + ), + ), + + // Toast container — ignored by morph so Sync() doesn't wipe active toasts + h.Div(h.ID("toast-container"), h.Class("toast toast-end toast-top"), h.DataIgnoreMorph()), + ) + }) + }) + + log.Println("Starting pubsub-crud example on :7331") + v.Start() +} diff --git a/pubsub_helpers.go b/pubsub_helpers.go new file mode 100644 index 0000000..265b5fd --- /dev/null +++ b/pubsub_helpers.go @@ -0,0 +1,23 @@ +package via + +import "encoding/json" + +// Publish JSON-marshals msg and publishes to subject. +func Publish[T any](c *Context, subject string, msg T) error { + data, err := json.Marshal(msg) + if err != nil { + return err + } + return c.Publish(subject, data) +} + +// Subscribe JSON-unmarshals each message as T and calls handler. +func Subscribe[T any](c *Context, subject string, handler func(T)) (Subscription, error) { + return c.Subscribe(subject, func(data []byte) { + var msg T + if err := json.Unmarshal(data, &msg); err != nil { + return + } + handler(msg) + }) +} diff --git a/pubsub_helpers_test.go b/pubsub_helpers_test.go new file mode 100644 index 0000000..9a18687 --- /dev/null +++ b/pubsub_helpers_test.go @@ -0,0 +1,66 @@ +package via + +import ( + "sync" + "testing" + + "github.com/ryanhamamura/via/h" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestPublishSubscribe_RoundTrip(t *testing.T) { + ps := newMockPubSub() + v := New() + v.Config(Options{PubSub: ps}) + + type event struct { + Name string `json:"name"` + Count int `json:"count"` + } + + var got event + var wg sync.WaitGroup + wg.Add(1) + + c := newContext("typed-ctx", "/", v) + c.View(func() h.H { return h.Div() }) + + _, err := Subscribe(c, "events", func(e event) { + got = e + wg.Done() + }) + require.NoError(t, err) + + err = Publish(c, "events", event{Name: "click", Count: 42}) + require.NoError(t, err) + + wg.Wait() + assert.Equal(t, "click", got.Name) + assert.Equal(t, 42, got.Count) +} + +func TestSubscribe_SkipsBadJSON(t *testing.T) { + ps := newMockPubSub() + v := New() + v.Config(Options{PubSub: ps}) + + type msg struct { + Text string `json:"text"` + } + + called := false + c := newContext("bad-json-ctx", "/", v) + c.View(func() h.H { return h.Div() }) + + _, err := Subscribe(c, "topic", func(m msg) { + called = true + }) + require.NoError(t, err) + + // Publish raw invalid JSON — handler should silently skip + err = c.Publish("topic", []byte("not json")) + require.NoError(t, err) + + assert.False(t, called) +} diff --git a/vianats/vianats.go b/vianats/vianats.go index 9809577..fde133a 100644 --- a/vianats/vianats.go +++ b/vianats/vianats.go @@ -4,7 +4,9 @@ package vianats import ( "context" + "encoding/json" "fmt" + "time" "github.com/delaneyj/toolbelt/embeddednats" "github.com/nats-io/nats.go" @@ -76,3 +78,50 @@ func (n *NATS) Conn() *nats.Conn { func (n *NATS) JetStream() nats.JetStreamContext { return n.js } + +// StreamConfig holds the parameters for creating or updating a JetStream stream. +type StreamConfig struct { + Name string + Subjects []string + MaxMsgs int64 + MaxAge time.Duration +} + +// EnsureStream creates or updates a JetStream stream matching cfg. +func EnsureStream(n *NATS, cfg StreamConfig) error { + _, err := n.js.AddStream(&nats.StreamConfig{ + Name: cfg.Name, + Subjects: cfg.Subjects, + Retention: nats.LimitsPolicy, + MaxMsgs: cfg.MaxMsgs, + MaxAge: cfg.MaxAge, + }) + return err +} + +// ReplayHistory fetches the last limit messages from subject, +// deserializing each as T. Returns an empty slice if nothing is available. +func ReplayHistory[T any](n *NATS, subject string, limit int) ([]T, error) { + sub, err := n.js.SubscribeSync(subject, nats.DeliverAll(), nats.OrderedConsumer()) + if err != nil { + return nil, err + } + defer sub.Unsubscribe() + + var msgs []T + for { + raw, err := sub.NextMsg(200 * time.Millisecond) + if err != nil { + break + } + var msg T + if json.Unmarshal(raw.Data, &msg) == nil { + msgs = append(msgs, msg) + } + } + + if limit > 0 && len(msgs) > limit { + msgs = msgs[len(msgs)-limit:] + } + return msgs, nil +}