Add typed Publish[T] and Subscribe[T] generic helpers that handle JSON marshaling, along with vianats.EnsureStream and ReplayHistory helpers. Refactor nats-chatroom to use the new APIs. Add pubsub-crud example demonstrating CRUD operations with DaisyUI toast notifications broadcast to all connected clients via NATS.
128 lines
3.2 KiB
Go
128 lines
3.2 KiB
Go
// Package vianats provides an embedded NATS server with JetStream as a
|
|
// pub/sub backend for Via applications.
|
|
package vianats
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/delaneyj/toolbelt/embeddednats"
|
|
"github.com/nats-io/nats.go"
|
|
"github.com/ryanhamamura/via"
|
|
)
|
|
|
|
// NATS implements via.PubSub using an embedded NATS server with JetStream.
|
|
type NATS struct {
|
|
server *embeddednats.Server
|
|
nc *nats.Conn
|
|
js nats.JetStreamContext
|
|
}
|
|
|
|
// New starts an embedded NATS server with JetStream enabled and returns a
|
|
// ready-to-use NATS instance. The server stores data in dataDir and shuts
|
|
// down when ctx is cancelled.
|
|
func New(ctx context.Context, dataDir string) (*NATS, error) {
|
|
ns, err := embeddednats.New(ctx, embeddednats.WithDirectory(dataDir))
|
|
if err != nil {
|
|
return nil, fmt.Errorf("vianats: start server: %w", err)
|
|
}
|
|
ns.WaitForServer()
|
|
|
|
nc, err := ns.Client()
|
|
if err != nil {
|
|
ns.Close()
|
|
return nil, fmt.Errorf("vianats: connect client: %w", err)
|
|
}
|
|
|
|
js, err := nc.JetStream()
|
|
if err != nil {
|
|
nc.Close()
|
|
ns.Close()
|
|
return nil, fmt.Errorf("vianats: init jetstream: %w", err)
|
|
}
|
|
|
|
return &NATS{server: ns, nc: nc, js: js}, nil
|
|
}
|
|
|
|
// Publish sends data to the given subject using core NATS publish.
|
|
// JetStream captures messages automatically if a matching stream exists.
|
|
func (n *NATS) Publish(subject string, data []byte) error {
|
|
return n.nc.Publish(subject, data)
|
|
}
|
|
|
|
// Subscribe creates a core NATS subscription for real-time fan-out delivery.
|
|
func (n *NATS) Subscribe(subject string, handler func(data []byte)) (via.Subscription, error) {
|
|
sub, err := n.nc.Subscribe(subject, func(msg *nats.Msg) {
|
|
handler(msg.Data)
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return sub, nil
|
|
}
|
|
|
|
// Close shuts down the client connection and embedded server.
|
|
func (n *NATS) Close() error {
|
|
n.nc.Close()
|
|
return n.server.Close()
|
|
}
|
|
|
|
// Conn returns the underlying NATS connection for advanced usage.
|
|
func (n *NATS) Conn() *nats.Conn {
|
|
return n.nc
|
|
}
|
|
|
|
// JetStream returns the JetStream context for stream configuration and replay.
|
|
func (n *NATS) JetStream() nats.JetStreamContext {
|
|
return n.js
|
|
}
|
|
|
|
// StreamConfig holds the parameters for creating or updating a JetStream stream.
|
|
type StreamConfig struct {
|
|
Name string
|
|
Subjects []string
|
|
MaxMsgs int64
|
|
MaxAge time.Duration
|
|
}
|
|
|
|
// EnsureStream creates or updates a JetStream stream matching cfg.
|
|
func EnsureStream(n *NATS, cfg StreamConfig) error {
|
|
_, err := n.js.AddStream(&nats.StreamConfig{
|
|
Name: cfg.Name,
|
|
Subjects: cfg.Subjects,
|
|
Retention: nats.LimitsPolicy,
|
|
MaxMsgs: cfg.MaxMsgs,
|
|
MaxAge: cfg.MaxAge,
|
|
})
|
|
return err
|
|
}
|
|
|
|
// ReplayHistory fetches the last limit messages from subject,
|
|
// deserializing each as T. Returns an empty slice if nothing is available.
|
|
func ReplayHistory[T any](n *NATS, subject string, limit int) ([]T, error) {
|
|
sub, err := n.js.SubscribeSync(subject, nats.DeliverAll(), nats.OrderedConsumer())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer sub.Unsubscribe()
|
|
|
|
var msgs []T
|
|
for {
|
|
raw, err := sub.NextMsg(200 * time.Millisecond)
|
|
if err != nil {
|
|
break
|
|
}
|
|
var msg T
|
|
if json.Unmarshal(raw.Data, &msg) == nil {
|
|
msgs = append(msgs, msg)
|
|
}
|
|
}
|
|
|
|
if limit > 0 && len(msgs) > limit {
|
|
msgs = msgs[len(msgs)-limit:]
|
|
}
|
|
return msgs, nil
|
|
}
|