feat: add generic pub/sub helpers and pubsub-crud example

Add typed Publish[T] and Subscribe[T] generic helpers that handle
JSON marshaling, along with vianats.EnsureStream and ReplayHistory
helpers. Refactor nats-chatroom to use the new APIs.

Add pubsub-crud example demonstrating CRUD operations with DaisyUI
toast notifications broadcast to all connected clients via NATS.
This commit is contained in:
Ryan Hamamura
2026-02-06 09:47:39 -10:00
parent 53e5733100
commit 2c44671d0e
5 changed files with 436 additions and 33 deletions

View File

@@ -2,13 +2,11 @@ package main
import (
"context"
"encoding/json"
"log"
"math/rand"
"sync"
"time"
"github.com/nats-io/nats.go"
"github.com/ryanhamamura/via"
"github.com/ryanhamamura/via/h"
"github.com/ryanhamamura/via/vianats"
@@ -46,15 +44,15 @@ func main() {
}
defer ps.Close()
// Create JetStream stream for message durability
js := ps.JetStream()
js.AddStream(&nats.StreamConfig{
Name: "CHAT",
Subjects: []string{"chat.>"},
Retention: nats.LimitsPolicy,
MaxMsgs: 1000,
MaxAge: 24 * time.Hour,
err = vianats.EnsureStream(ps, vianats.StreamConfig{
Name: "CHAT",
Subjects: []string{"chat.>"},
MaxMsgs: 1000,
MaxAge: 24 * time.Hour,
})
if err != nil {
log.Fatalf("Failed to ensure stream: %v", err)
}
v := via.New()
v.Config(via.Options{
@@ -147,30 +145,14 @@ func main() {
currentSub.Unsubscribe()
}
// Replay history from JetStream before subscribing for real-time
subject := "chat.room." + room
if hist, err := js.SubscribeSync(subject, nats.DeliverAll(), nats.OrderedConsumer()); err == nil {
for {
msg, err := hist.NextMsg(200 * time.Millisecond)
if err != nil {
break
}
var chatMsg ChatMessage
if json.Unmarshal(msg.Data, &chatMsg) == nil {
messages = append(messages, chatMsg)
}
}
hist.Unsubscribe()
if len(messages) > 50 {
messages = messages[len(messages)-50:]
}
// Replay history from JetStream
if hist, err := vianats.ReplayHistory[ChatMessage](ps, subject, 50); err == nil {
messages = hist
}
sub, _ := c.Subscribe(subject, func(data []byte) {
var msg ChatMessage
if err := json.Unmarshal(data, &msg); err != nil {
return
}
sub, _ := via.Subscribe(c, subject, func(msg ChatMessage) {
messagesMu.Lock()
messages = append(messages, msg)
if len(messages) > 50 {
@@ -203,12 +185,11 @@ func main() {
}
statement.SetValue("")
data, _ := json.Marshal(ChatMessage{
via.Publish(c, "chat.room."+currentRoom, ChatMessage{
User: currentUser,
Message: msg,
Time: time.Now().UnixMilli(),
})
c.Publish("chat.room."+currentRoom, data)
})
c.View(func() h.H {

View File

@@ -0,0 +1,284 @@
package main
import (
"context"
"crypto/rand"
"fmt"
"html"
"log"
"sync"
"time"
"github.com/ryanhamamura/via"
"github.com/ryanhamamura/via/h"
"github.com/ryanhamamura/via/vianats"
)
var WithSignal = via.WithSignal
type Bookmark struct {
ID string
Title string
URL string
}
type CRUDEvent struct {
Action string `json:"action"`
Title string `json:"title"`
UserID string `json:"user_id"`
}
var (
bookmarks []Bookmark
bookmarksMu sync.RWMutex
)
func randomHex(n int) string {
b := make([]byte, n)
rand.Read(b)
return fmt.Sprintf("%x", b)
}
func findBookmark(id string) (Bookmark, int) {
for i, bm := range bookmarks {
if bm.ID == id {
return bm, i
}
}
return Bookmark{}, -1
}
func main() {
ctx := context.Background()
ps, err := vianats.New(ctx, "./data/nats")
if err != nil {
log.Fatalf("Failed to start embedded NATS: %v", err)
}
defer ps.Close()
err = vianats.EnsureStream(ps, vianats.StreamConfig{
Name: "BOOKMARKS",
Subjects: []string{"bookmarks.>"},
MaxMsgs: 1000,
MaxAge: 24 * time.Hour,
})
if err != nil {
log.Fatalf("Failed to ensure stream: %v", err)
}
v := via.New()
v.Config(via.Options{
DevMode: true,
DocumentTitle: "Bookmarks",
LogLevel: via.LogLevelInfo,
ServerAddress: ":7331",
PubSub: ps,
})
v.AppendToHead(
h.Link(h.Rel("stylesheet"), h.Href("https://cdn.jsdelivr.net/npm/daisyui@4/dist/full.min.css")),
h.Script(h.Src("https://cdn.tailwindcss.com")),
)
v.Page("/", func(c *via.Context) {
userID := randomHex(8)
titleSignal := c.Signal("")
urlSignal := c.Signal("")
targetIDSignal := c.Signal("")
via.Subscribe(c, "bookmarks.events", func(evt CRUDEvent) {
if evt.UserID == userID {
return
}
safeTitle := html.EscapeString(evt.Title)
var alertClass string
switch evt.Action {
case "created":
alertClass = "alert-success"
case "updated":
alertClass = "alert-info"
case "deleted":
alertClass = "alert-error"
}
c.ExecScript(fmt.Sprintf(`(function(){
var tc = document.getElementById('toast-container');
if (!tc) return;
var d = document.createElement('div');
d.className = 'alert %s';
d.innerHTML = '<span>Bookmark "%s" %s</span>';
tc.appendChild(d);
setTimeout(function(){ d.remove(); }, 3000);
})()`, alertClass, safeTitle, evt.Action))
c.Sync()
})
save := c.Action(func() {
title := titleSignal.String()
url := urlSignal.String()
if title == "" || url == "" {
return
}
targetID := targetIDSignal.String()
action := "created"
bookmarksMu.Lock()
if targetID != "" {
if _, idx := findBookmark(targetID); idx >= 0 {
bookmarks[idx].Title = title
bookmarks[idx].URL = url
action = "updated"
}
} else {
bookmarks = append(bookmarks, Bookmark{
ID: randomHex(8),
Title: title,
URL: url,
})
}
bookmarksMu.Unlock()
titleSignal.SetValue("")
urlSignal.SetValue("")
targetIDSignal.SetValue("")
via.Publish(c, "bookmarks.events", CRUDEvent{
Action: action,
Title: title,
UserID: userID,
})
c.Sync()
})
edit := c.Action(func() {
id := targetIDSignal.String()
bookmarksMu.RLock()
bm, idx := findBookmark(id)
bookmarksMu.RUnlock()
if idx < 0 {
return
}
titleSignal.SetValue(bm.Title)
urlSignal.SetValue(bm.URL)
})
del := c.Action(func() {
id := targetIDSignal.String()
bookmarksMu.Lock()
bm, idx := findBookmark(id)
if idx >= 0 {
bookmarks = append(bookmarks[:idx], bookmarks[idx+1:]...)
}
bookmarksMu.Unlock()
if idx < 0 {
return
}
targetIDSignal.SetValue("")
via.Publish(c, "bookmarks.events", CRUDEvent{
Action: "deleted",
Title: bm.Title,
UserID: userID,
})
c.Sync()
})
cancelEdit := c.Action(func() {
titleSignal.SetValue("")
urlSignal.SetValue("")
targetIDSignal.SetValue("")
})
c.View(func() h.H {
isEditing := targetIDSignal.String() != ""
// Build table rows
bookmarksMu.RLock()
var rows []h.H
for _, bm := range bookmarks {
rows = append(rows, h.Tr(
h.Td(h.Text(bm.Title)),
h.Td(h.A(h.Href(bm.URL), h.Attr("target", "_blank"), h.Class("link link-primary"), h.Text(bm.URL))),
h.Td(
h.Div(h.Class("flex gap-1"),
h.Button(h.Class("btn btn-xs btn-ghost"), h.Text("Edit"),
edit.OnClick(WithSignal(targetIDSignal, bm.ID)),
),
h.Button(h.Class("btn btn-xs btn-ghost text-error"), h.Text("Delete"),
del.OnClick(WithSignal(targetIDSignal, bm.ID)),
),
),
),
))
}
bookmarksMu.RUnlock()
saveLabel := "Add Bookmark"
if isEditing {
saveLabel = "Update Bookmark"
}
return h.Div(h.Class("min-h-screen bg-base-200"),
// Navbar
h.Div(h.Class("navbar bg-base-100 shadow-sm"),
h.Div(h.Class("flex-1"),
h.A(h.Class("btn btn-ghost text-xl"), h.Text("Bookmarks")),
),
h.Div(h.Class("flex-none"),
h.Div(h.Class("badge badge-outline"), h.Text(userID[:8])),
),
),
h.Div(h.Class("container mx-auto p-4 max-w-3xl flex flex-col gap-4"),
// Form card
h.Div(h.Class("card bg-base-100 shadow"),
h.Div(h.Class("card-body"),
h.H2(h.Class("card-title"), h.Text(saveLabel)),
h.Div(h.Class("flex flex-col gap-2"),
h.Input(h.Class("input input-bordered w-full"), h.Type("text"), h.Placeholder("Title"), titleSignal.Bind()),
h.Input(h.Class("input input-bordered w-full"), h.Type("text"), h.Placeholder("https://example.com"), urlSignal.Bind()),
h.Div(h.Class("card-actions justify-end"),
h.If(isEditing,
h.Button(h.Class("btn btn-ghost"), h.Text("Cancel"), cancelEdit.OnClick()),
),
h.Button(h.Class("btn btn-primary"), h.Text(saveLabel), save.OnClick()),
),
),
),
),
// Table card
h.Div(h.Class("card bg-base-100 shadow"),
h.Div(h.Class("card-body"),
h.H2(h.Class("card-title"), h.Text("All Bookmarks")),
h.If(len(rows) == 0,
h.P(h.Class("text-base-content/60"), h.Text("No bookmarks yet. Add one above!")),
),
h.If(len(rows) > 0,
h.Div(h.Class("overflow-x-auto"),
h.Table(h.Class("table"),
h.THead(h.Tr(
h.Th(h.Text("Title")),
h.Th(h.Text("URL")),
h.Th(h.Text("Actions")),
)),
h.TBody(rows...),
),
),
),
),
),
),
// Toast container — ignored by morph so Sync() doesn't wipe active toasts
h.Div(h.ID("toast-container"), h.Class("toast toast-end toast-top"), h.DataIgnoreMorph()),
)
})
})
log.Println("Starting pubsub-crud example on :7331")
v.Start()
}

23
pubsub_helpers.go Normal file
View File

@@ -0,0 +1,23 @@
package via
import "encoding/json"
// Publish JSON-marshals msg and publishes to subject.
func Publish[T any](c *Context, subject string, msg T) error {
data, err := json.Marshal(msg)
if err != nil {
return err
}
return c.Publish(subject, data)
}
// Subscribe JSON-unmarshals each message as T and calls handler.
func Subscribe[T any](c *Context, subject string, handler func(T)) (Subscription, error) {
return c.Subscribe(subject, func(data []byte) {
var msg T
if err := json.Unmarshal(data, &msg); err != nil {
return
}
handler(msg)
})
}

66
pubsub_helpers_test.go Normal file
View File

@@ -0,0 +1,66 @@
package via
import (
"sync"
"testing"
"github.com/ryanhamamura/via/h"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestPublishSubscribe_RoundTrip(t *testing.T) {
ps := newMockPubSub()
v := New()
v.Config(Options{PubSub: ps})
type event struct {
Name string `json:"name"`
Count int `json:"count"`
}
var got event
var wg sync.WaitGroup
wg.Add(1)
c := newContext("typed-ctx", "/", v)
c.View(func() h.H { return h.Div() })
_, err := Subscribe(c, "events", func(e event) {
got = e
wg.Done()
})
require.NoError(t, err)
err = Publish(c, "events", event{Name: "click", Count: 42})
require.NoError(t, err)
wg.Wait()
assert.Equal(t, "click", got.Name)
assert.Equal(t, 42, got.Count)
}
func TestSubscribe_SkipsBadJSON(t *testing.T) {
ps := newMockPubSub()
v := New()
v.Config(Options{PubSub: ps})
type msg struct {
Text string `json:"text"`
}
called := false
c := newContext("bad-json-ctx", "/", v)
c.View(func() h.H { return h.Div() })
_, err := Subscribe(c, "topic", func(m msg) {
called = true
})
require.NoError(t, err)
// Publish raw invalid JSON — handler should silently skip
err = c.Publish("topic", []byte("not json"))
require.NoError(t, err)
assert.False(t, called)
}

View File

@@ -4,7 +4,9 @@ package vianats
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/delaneyj/toolbelt/embeddednats"
"github.com/nats-io/nats.go"
@@ -76,3 +78,50 @@ func (n *NATS) Conn() *nats.Conn {
func (n *NATS) JetStream() nats.JetStreamContext {
return n.js
}
// StreamConfig holds the parameters for creating or updating a JetStream stream.
type StreamConfig struct {
Name string
Subjects []string
MaxMsgs int64
MaxAge time.Duration
}
// EnsureStream creates or updates a JetStream stream matching cfg.
func EnsureStream(n *NATS, cfg StreamConfig) error {
_, err := n.js.AddStream(&nats.StreamConfig{
Name: cfg.Name,
Subjects: cfg.Subjects,
Retention: nats.LimitsPolicy,
MaxMsgs: cfg.MaxMsgs,
MaxAge: cfg.MaxAge,
})
return err
}
// ReplayHistory fetches the last limit messages from subject,
// deserializing each as T. Returns an empty slice if nothing is available.
func ReplayHistory[T any](n *NATS, subject string, limit int) ([]T, error) {
sub, err := n.js.SubscribeSync(subject, nats.DeliverAll(), nats.OrderedConsumer())
if err != nil {
return nil, err
}
defer sub.Unsubscribe()
var msgs []T
for {
raw, err := sub.NextMsg(200 * time.Millisecond)
if err != nil {
break
}
var msg T
if json.Unmarshal(raw.Data, &msg) == nil {
msgs = append(msgs, msg)
}
}
if limit > 0 && len(msgs) > limit {
msgs = msgs[len(msgs)-limit:]
}
return msgs, nil
}