Compare commits
2 Commits
5362614c3e
...
v0.13.0
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2310e45d35 | ||
|
|
10b4838f8d |
23
context.go
23
context.go
@@ -32,6 +32,7 @@ type Context struct {
|
|||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
ctxDisposedChan chan struct{}
|
ctxDisposedChan chan struct{}
|
||||||
reqCtx context.Context
|
reqCtx context.Context
|
||||||
|
fields []*Field
|
||||||
subscriptions []Subscription
|
subscriptions []Subscription
|
||||||
subsMu sync.Mutex
|
subsMu sync.Mutex
|
||||||
disposeOnce sync.Once
|
disposeOnce sync.Once
|
||||||
@@ -482,16 +483,28 @@ func (c *Context) unsubscribeAll() {
|
|||||||
|
|
||||||
// Field creates a signal with validation rules attached.
|
// Field creates a signal with validation rules attached.
|
||||||
// The initial value seeds both the signal and the reset target.
|
// The initial value seeds both the signal and the reset target.
|
||||||
|
// The field is tracked on the context so ValidateAll/ResetFields
|
||||||
|
// can operate on all fields by default.
|
||||||
func (c *Context) Field(initial any, rules ...Rule) *Field {
|
func (c *Context) Field(initial any, rules ...Rule) *Field {
|
||||||
return &Field{
|
f := &Field{
|
||||||
signal: c.Signal(initial),
|
signal: c.Signal(initial),
|
||||||
rules: rules,
|
rules: rules,
|
||||||
initialVal: initial,
|
initialVal: initial,
|
||||||
}
|
}
|
||||||
|
target := c
|
||||||
|
if c.isComponent() {
|
||||||
|
target = c.parentPageCtx
|
||||||
|
}
|
||||||
|
target.fields = append(target.fields, f)
|
||||||
|
return f
|
||||||
}
|
}
|
||||||
|
|
||||||
// ValidateAll runs Validate on every field, returning true only if all pass.
|
// ValidateAll runs Validate on each field, returning true only if all pass.
|
||||||
|
// With no arguments it validates every field tracked on this context.
|
||||||
func (c *Context) ValidateAll(fields ...*Field) bool {
|
func (c *Context) ValidateAll(fields ...*Field) bool {
|
||||||
|
if len(fields) == 0 {
|
||||||
|
fields = c.fields
|
||||||
|
}
|
||||||
ok := true
|
ok := true
|
||||||
for _, f := range fields {
|
for _, f := range fields {
|
||||||
if !f.Validate() {
|
if !f.Validate() {
|
||||||
@@ -501,8 +514,12 @@ func (c *Context) ValidateAll(fields ...*Field) bool {
|
|||||||
return ok
|
return ok
|
||||||
}
|
}
|
||||||
|
|
||||||
// ResetFields resets every field to its initial value and clears errors.
|
// ResetFields resets each field to its initial value and clears errors.
|
||||||
|
// With no arguments it resets every field tracked on this context.
|
||||||
func (c *Context) ResetFields(fields ...*Field) {
|
func (c *Context) ResetFields(fields ...*Field) {
|
||||||
|
if len(fields) == 0 {
|
||||||
|
fields = c.fields
|
||||||
|
}
|
||||||
for _, f := range fields {
|
for _, f := range fields {
|
||||||
f.Reset()
|
f.Reset()
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -96,27 +96,22 @@ func TestFieldReset(t *testing.T) {
|
|||||||
|
|
||||||
func TestValidateAll(t *testing.T) {
|
func TestValidateAll(t *testing.T) {
|
||||||
v := New()
|
v := New()
|
||||||
var username, email *Field
|
|
||||||
v.Page("/", func(c *Context) {
|
v.Page("/", func(c *Context) {
|
||||||
username = c.Field("", Required(), MinLen(3))
|
c.Field("", Required(), MinLen(3))
|
||||||
email = c.Field("", Required(), Email())
|
c.Field("", Required(), Email())
|
||||||
c.View(func() h.H { return h.Div() })
|
c.View(func() h.H { return h.Div() })
|
||||||
})
|
|
||||||
|
|
||||||
// both empty → both fail
|
// both empty → both fail
|
||||||
assert.False(t, false) // sanity
|
assert.False(t, c.ValidateAll())
|
||||||
ok := username.Validate() && email.Validate()
|
})
|
||||||
assert.False(t, ok)
|
|
||||||
|
|
||||||
// simulate ValidateAll via context
|
|
||||||
v2 := New()
|
v2 := New()
|
||||||
var u2, e2 *Field
|
|
||||||
v2.Page("/", func(c *Context) {
|
v2.Page("/", func(c *Context) {
|
||||||
u2 = c.Field("joe", Required(), MinLen(3))
|
c.Field("joe", Required(), MinLen(3))
|
||||||
e2 = c.Field("joe@x.com", Required(), Email())
|
c.Field("joe@x.com", Required(), Email())
|
||||||
c.View(func() h.H { return h.Div() })
|
c.View(func() h.H { return h.Div() })
|
||||||
|
|
||||||
assert.True(t, c.ValidateAll(u2, e2))
|
assert.True(t, c.ValidateAll())
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -127,14 +122,30 @@ func TestValidateAllPartialFailure(t *testing.T) {
|
|||||||
bad := c.Field("", Required())
|
bad := c.Field("", Required())
|
||||||
c.View(func() h.H { return h.Div() })
|
c.View(func() h.H { return h.Div() })
|
||||||
|
|
||||||
// ValidateAll must run ALL fields even if first passes
|
ok := c.ValidateAll()
|
||||||
ok := c.ValidateAll(good, bad)
|
|
||||||
assert.False(t, ok)
|
assert.False(t, ok)
|
||||||
assert.False(t, good.HasError())
|
assert.False(t, good.HasError())
|
||||||
assert.True(t, bad.HasError())
|
assert.True(t, bad.HasError())
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestValidateAllSelectiveArgs(t *testing.T) {
|
||||||
|
v := New()
|
||||||
|
v.Page("/", func(c *Context) {
|
||||||
|
a := c.Field("", Required())
|
||||||
|
b := c.Field("ok", Required())
|
||||||
|
cField := c.Field("", Required())
|
||||||
|
c.View(func() h.H { return h.Div() })
|
||||||
|
|
||||||
|
// only validate a and b — cField should be untouched
|
||||||
|
ok := c.ValidateAll(a, b)
|
||||||
|
assert.False(t, ok)
|
||||||
|
assert.True(t, a.HasError())
|
||||||
|
assert.False(t, b.HasError())
|
||||||
|
assert.False(t, cField.HasError(), "unselected field should not be validated")
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func TestResetFields(t *testing.T) {
|
func TestResetFields(t *testing.T) {
|
||||||
v := New()
|
v := New()
|
||||||
v.Page("/", func(c *Context) {
|
v.Page("/", func(c *Context) {
|
||||||
@@ -146,13 +157,30 @@ func TestResetFields(t *testing.T) {
|
|||||||
b.SetValue("changed-b")
|
b.SetValue("changed-b")
|
||||||
a.AddError("err")
|
a.AddError("err")
|
||||||
|
|
||||||
c.ResetFields(a, b)
|
c.ResetFields()
|
||||||
assert.Equal(t, "a", a.String())
|
assert.Equal(t, "a", a.String())
|
||||||
assert.Equal(t, "b", b.String())
|
assert.Equal(t, "b", b.String())
|
||||||
assert.False(t, a.HasError())
|
assert.False(t, a.HasError())
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestResetFieldsSelectiveArgs(t *testing.T) {
|
||||||
|
v := New()
|
||||||
|
v.Page("/", func(c *Context) {
|
||||||
|
a := c.Field("a")
|
||||||
|
b := c.Field("b")
|
||||||
|
c.View(func() h.H { return h.Div() })
|
||||||
|
|
||||||
|
a.SetValue("changed-a")
|
||||||
|
b.SetValue("changed-b")
|
||||||
|
|
||||||
|
// only reset a
|
||||||
|
c.ResetFields(a)
|
||||||
|
assert.Equal(t, "a", a.String())
|
||||||
|
assert.Equal(t, "changed-b", b.String(), "unselected field should not be reset")
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func TestFieldValidateClearsPreviousErrors(t *testing.T) {
|
func TestFieldValidateClearsPreviousErrors(t *testing.T) {
|
||||||
f := newTestField("", Required())
|
f := newTestField("", Required())
|
||||||
f.Validate()
|
f.Validate()
|
||||||
|
|||||||
1
go.mod
1
go.mod
@@ -38,6 +38,5 @@ require (
|
|||||||
github.com/valyala/bytebufferpool v1.0.0 // indirect
|
github.com/valyala/bytebufferpool v1.0.0 // indirect
|
||||||
golang.org/x/crypto v0.45.0 // indirect
|
golang.org/x/crypto v0.45.0 // indirect
|
||||||
golang.org/x/sys v0.38.0 // indirect
|
golang.org/x/sys v0.38.0 // indirect
|
||||||
golang.org/x/time v0.14.0
|
|
||||||
gopkg.in/yaml.v3 v3.0.1 // indirect
|
gopkg.in/yaml.v3 v3.0.1 // indirect
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -1,7 +1,6 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
"log"
|
"log"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"sync"
|
"sync"
|
||||||
@@ -9,7 +8,6 @@ import (
|
|||||||
|
|
||||||
"github.com/ryanhamamura/via"
|
"github.com/ryanhamamura/via"
|
||||||
"github.com/ryanhamamura/via/h"
|
"github.com/ryanhamamura/via/h"
|
||||||
"github.com/ryanhamamura/via/vianats"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@@ -36,15 +34,15 @@ func (u *UserInfo) Avatar() h.H {
|
|||||||
var roomNames = []string{"Go", "Rust", "Python", "JavaScript", "Clojure"}
|
var roomNames = []string{"Go", "Rust", "Python", "JavaScript", "Clojure"}
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
ctx := context.Background()
|
v := via.New()
|
||||||
|
v.Config(via.Options{
|
||||||
|
DevMode: true,
|
||||||
|
DocumentTitle: "NATS Chat",
|
||||||
|
LogLevel: via.LogLevelInfo,
|
||||||
|
ServerAddress: ":7331",
|
||||||
|
})
|
||||||
|
|
||||||
ps, err := vianats.New(ctx, "./data/nats")
|
err := via.EnsureStream(v, via.StreamConfig{
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("Failed to start embedded NATS: %v", err)
|
|
||||||
}
|
|
||||||
defer ps.Close()
|
|
||||||
|
|
||||||
err = vianats.EnsureStream(ps, vianats.StreamConfig{
|
|
||||||
Name: "CHAT",
|
Name: "CHAT",
|
||||||
Subjects: []string{"chat.>"},
|
Subjects: []string{"chat.>"},
|
||||||
MaxMsgs: 1000,
|
MaxMsgs: 1000,
|
||||||
@@ -54,15 +52,6 @@ func main() {
|
|||||||
log.Fatalf("Failed to ensure stream: %v", err)
|
log.Fatalf("Failed to ensure stream: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
v := via.New()
|
|
||||||
v.Config(via.Options{
|
|
||||||
DevMode: true,
|
|
||||||
DocumentTitle: "NATS Chat",
|
|
||||||
LogLevel: via.LogLevelInfo,
|
|
||||||
ServerAddress: ":7331",
|
|
||||||
PubSub: ps,
|
|
||||||
})
|
|
||||||
|
|
||||||
v.AppendToHead(
|
v.AppendToHead(
|
||||||
h.Link(h.Rel("stylesheet"), h.Href("https://cdn.jsdelivr.net/npm/@picocss/pico@2/css/pico.min.css")),
|
h.Link(h.Rel("stylesheet"), h.Href("https://cdn.jsdelivr.net/npm/@picocss/pico@2/css/pico.min.css")),
|
||||||
h.StyleEl(h.Raw(`
|
h.StyleEl(h.Raw(`
|
||||||
@@ -148,7 +137,7 @@ func main() {
|
|||||||
subject := "chat.room." + room
|
subject := "chat.room." + room
|
||||||
|
|
||||||
// Replay history from JetStream
|
// Replay history from JetStream
|
||||||
if hist, err := vianats.ReplayHistory[ChatMessage](ps, subject, 50); err == nil {
|
if hist, err := via.ReplayHistory[ChatMessage](v, subject, 50); err == nil {
|
||||||
messages = hist
|
messages = hist
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,7 +1,6 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
"crypto/rand"
|
"crypto/rand"
|
||||||
"fmt"
|
"fmt"
|
||||||
"html"
|
"html"
|
||||||
@@ -11,7 +10,6 @@ import (
|
|||||||
|
|
||||||
"github.com/ryanhamamura/via"
|
"github.com/ryanhamamura/via"
|
||||||
"github.com/ryanhamamura/via/h"
|
"github.com/ryanhamamura/via/h"
|
||||||
"github.com/ryanhamamura/via/vianats"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var WithSignal = via.WithSignal
|
var WithSignal = via.WithSignal
|
||||||
@@ -49,15 +47,15 @@ func findBookmark(id string) (Bookmark, int) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
ctx := context.Background()
|
v := via.New()
|
||||||
|
v.Config(via.Options{
|
||||||
|
DevMode: true,
|
||||||
|
DocumentTitle: "Bookmarks",
|
||||||
|
LogLevel: via.LogLevelInfo,
|
||||||
|
ServerAddress: ":7331",
|
||||||
|
})
|
||||||
|
|
||||||
ps, err := vianats.New(ctx, "./data/nats")
|
err := via.EnsureStream(v, via.StreamConfig{
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("Failed to start embedded NATS: %v", err)
|
|
||||||
}
|
|
||||||
defer ps.Close()
|
|
||||||
|
|
||||||
err = vianats.EnsureStream(ps, vianats.StreamConfig{
|
|
||||||
Name: "BOOKMARKS",
|
Name: "BOOKMARKS",
|
||||||
Subjects: []string{"bookmarks.>"},
|
Subjects: []string{"bookmarks.>"},
|
||||||
MaxMsgs: 1000,
|
MaxMsgs: 1000,
|
||||||
@@ -67,15 +65,6 @@ func main() {
|
|||||||
log.Fatalf("Failed to ensure stream: %v", err)
|
log.Fatalf("Failed to ensure stream: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
v := via.New()
|
|
||||||
v.Config(via.Options{
|
|
||||||
DevMode: true,
|
|
||||||
DocumentTitle: "Bookmarks",
|
|
||||||
LogLevel: via.LogLevelInfo,
|
|
||||||
ServerAddress: ":7331",
|
|
||||||
PubSub: ps,
|
|
||||||
})
|
|
||||||
|
|
||||||
v.AppendToHead(
|
v.AppendToHead(
|
||||||
h.Link(h.Rel("stylesheet"), h.Href("https://cdn.jsdelivr.net/npm/daisyui@4/dist/full.min.css")),
|
h.Link(h.Rel("stylesheet"), h.Href("https://cdn.jsdelivr.net/npm/daisyui@4/dist/full.min.css")),
|
||||||
h.Script(h.Src("https://cdn.tailwindcss.com")),
|
h.Script(h.Src("https://cdn.tailwindcss.com")),
|
||||||
|
|||||||
@@ -26,13 +26,13 @@ func main() {
|
|||||||
email := c.Field("", via.Required(), via.Email())
|
email := c.Field("", via.Required(), via.Email())
|
||||||
age := c.Field("", via.Required(), via.Min(13), via.Max(120))
|
age := c.Field("", via.Required(), via.Min(13), via.Max(120))
|
||||||
// Optional field — only validated when non-empty
|
// Optional field — only validated when non-empty
|
||||||
website := c.Field("", via.Custom(func(val string) error { return nil }), via.Pattern(`^$|^https?://\S+$`, "Must be a valid URL"))
|
website := c.Field("", via.Pattern(`^$|^https?://\S+$`, "Must be a valid URL"))
|
||||||
|
|
||||||
var success string
|
var success string
|
||||||
|
|
||||||
signup := c.Action(func() {
|
signup := c.Action(func() {
|
||||||
success = ""
|
success = ""
|
||||||
if !c.ValidateAll(username, email, age, website) {
|
if !c.ValidateAll() {
|
||||||
c.Sync()
|
c.Sync()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -43,13 +43,13 @@ func main() {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
success = "Account created for " + username.String() + "!"
|
success = "Account created for " + username.String() + "!"
|
||||||
c.ResetFields(username, email, age, website)
|
c.ResetFields()
|
||||||
c.Sync()
|
c.Sync()
|
||||||
})
|
})
|
||||||
|
|
||||||
reset := c.Action(func() {
|
reset := c.Action(func() {
|
||||||
success = ""
|
success = ""
|
||||||
c.ResetFields(username, email, age, website)
|
c.ResetFields()
|
||||||
c.Sync()
|
c.Sync()
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|||||||
190
nats.go
Normal file
190
nats.go
Normal file
@@ -0,0 +1,190 @@
|
|||||||
|
package via
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/delaneyj/toolbelt/embeddednats"
|
||||||
|
"github.com/nats-io/nats.go"
|
||||||
|
)
|
||||||
|
|
||||||
|
// defaultNATS is the process-scoped embedded NATS server.
|
||||||
|
type defaultNATS struct {
|
||||||
|
server *embeddednats.Server
|
||||||
|
nc *nats.Conn
|
||||||
|
js nats.JetStreamContext
|
||||||
|
cancel context.CancelFunc
|
||||||
|
dataDir string
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
sharedNATS *defaultNATS
|
||||||
|
sharedNATSOnce sync.Once
|
||||||
|
sharedNATSErr error
|
||||||
|
)
|
||||||
|
|
||||||
|
// getSharedNATS returns a process-level singleton embedded NATS server.
|
||||||
|
// The server starts once and is reused across all V instances.
|
||||||
|
func getSharedNATS() (*defaultNATS, error) {
|
||||||
|
sharedNATSOnce.Do(func() {
|
||||||
|
sharedNATS, sharedNATSErr = startDefaultNATS()
|
||||||
|
})
|
||||||
|
return sharedNATS, sharedNATSErr
|
||||||
|
}
|
||||||
|
|
||||||
|
func startDefaultNATS() (dn *defaultNATS, err error) {
|
||||||
|
defer func() {
|
||||||
|
if r := recover(); r != nil {
|
||||||
|
err = fmt.Errorf("nats server panic: %v", r)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
dataDir, err := os.MkdirTemp("", "via-nats-*")
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("create temp dir: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
|
||||||
|
ns, err := embeddednats.New(ctx, embeddednats.WithDirectory(dataDir))
|
||||||
|
if err != nil {
|
||||||
|
cancel()
|
||||||
|
os.RemoveAll(dataDir)
|
||||||
|
return nil, fmt.Errorf("start embedded nats: %w", err)
|
||||||
|
}
|
||||||
|
ns.WaitForServer()
|
||||||
|
|
||||||
|
nc, err := ns.Client()
|
||||||
|
if err != nil {
|
||||||
|
ns.Close()
|
||||||
|
cancel()
|
||||||
|
os.RemoveAll(dataDir)
|
||||||
|
return nil, fmt.Errorf("connect nats client: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
js, err := nc.JetStream()
|
||||||
|
if err != nil {
|
||||||
|
nc.Close()
|
||||||
|
ns.Close()
|
||||||
|
cancel()
|
||||||
|
os.RemoveAll(dataDir)
|
||||||
|
return nil, fmt.Errorf("init jetstream: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &defaultNATS{
|
||||||
|
server: ns,
|
||||||
|
nc: nc,
|
||||||
|
js: js,
|
||||||
|
cancel: cancel,
|
||||||
|
dataDir: dataDir,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *defaultNATS) Publish(subject string, data []byte) error {
|
||||||
|
return n.nc.Publish(subject, data)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *defaultNATS) Subscribe(subject string, handler func(data []byte)) (Subscription, error) {
|
||||||
|
sub, err := n.nc.Subscribe(subject, func(msg *nats.Msg) {
|
||||||
|
handler(msg.Data)
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return sub, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// natsRef wraps a shared defaultNATS as a PubSub. Close is a no-op because
|
||||||
|
// the underlying server is process-scoped and outlives individual V instances.
|
||||||
|
type natsRef struct {
|
||||||
|
dn *defaultNATS
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *natsRef) Publish(subject string, data []byte) error {
|
||||||
|
return r.dn.Publish(subject, data)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *natsRef) Subscribe(subject string, handler func(data []byte)) (Subscription, error) {
|
||||||
|
return r.dn.Subscribe(subject, handler)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *natsRef) Close() error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// NATSConn returns the underlying NATS connection from the built-in embedded
|
||||||
|
// server, or nil if a custom PubSub backend is in use.
|
||||||
|
func (v *V) NATSConn() *nats.Conn {
|
||||||
|
if v.defaultNATS != nil {
|
||||||
|
return v.defaultNATS.nc
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// JetStream returns the JetStream context from the built-in embedded server,
|
||||||
|
// or nil if a custom PubSub backend is in use.
|
||||||
|
func (v *V) JetStream() nats.JetStreamContext {
|
||||||
|
if v.defaultNATS != nil {
|
||||||
|
return v.defaultNATS.js
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// StreamConfig holds the parameters for creating or updating a JetStream stream.
|
||||||
|
type StreamConfig struct {
|
||||||
|
Name string
|
||||||
|
Subjects []string
|
||||||
|
MaxMsgs int64
|
||||||
|
MaxAge time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
|
// EnsureStream creates or updates a JetStream stream matching cfg.
|
||||||
|
func EnsureStream(v *V, cfg StreamConfig) error {
|
||||||
|
js := v.JetStream()
|
||||||
|
if js == nil {
|
||||||
|
return fmt.Errorf("jetstream not available")
|
||||||
|
}
|
||||||
|
_, err := js.AddStream(&nats.StreamConfig{
|
||||||
|
Name: cfg.Name,
|
||||||
|
Subjects: cfg.Subjects,
|
||||||
|
Retention: nats.LimitsPolicy,
|
||||||
|
MaxMsgs: cfg.MaxMsgs,
|
||||||
|
MaxAge: cfg.MaxAge,
|
||||||
|
})
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// ReplayHistory fetches the last limit messages from subject,
|
||||||
|
// deserializing each as T. Returns an empty slice if nothing is available.
|
||||||
|
func ReplayHistory[T any](v *V, subject string, limit int) ([]T, error) {
|
||||||
|
js := v.JetStream()
|
||||||
|
if js == nil {
|
||||||
|
return nil, fmt.Errorf("jetstream not available")
|
||||||
|
}
|
||||||
|
sub, err := js.SubscribeSync(subject, nats.DeliverAll(), nats.OrderedConsumer())
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer sub.Unsubscribe()
|
||||||
|
|
||||||
|
var msgs []T
|
||||||
|
for {
|
||||||
|
raw, err := sub.NextMsg(200 * time.Millisecond)
|
||||||
|
if err != nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
var msg T
|
||||||
|
if json.Unmarshal(raw.Data, &msg) == nil {
|
||||||
|
msgs = append(msgs, msg)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if limit > 0 && len(msgs) > limit {
|
||||||
|
msgs = msgs[len(msgs)-limit:]
|
||||||
|
}
|
||||||
|
return msgs, nil
|
||||||
|
}
|
||||||
106
nats_test.go
106
nats_test.go
@@ -2,7 +2,6 @@ package via
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -11,88 +10,36 @@ import (
|
|||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
type mockHandler struct {
|
|
||||||
id int64
|
|
||||||
fn func([]byte)
|
|
||||||
active atomic.Bool
|
|
||||||
}
|
|
||||||
|
|
||||||
// mockPubSub implements PubSub for testing without NATS.
|
|
||||||
type mockPubSub struct {
|
|
||||||
mu sync.Mutex
|
|
||||||
subs map[string][]*mockHandler
|
|
||||||
nextID atomic.Int64
|
|
||||||
}
|
|
||||||
|
|
||||||
func newMockPubSub() *mockPubSub {
|
|
||||||
return &mockPubSub{subs: make(map[string][]*mockHandler)}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *mockPubSub) Publish(subject string, data []byte) error {
|
|
||||||
m.mu.Lock()
|
|
||||||
handlers := make([]*mockHandler, len(m.subs[subject]))
|
|
||||||
copy(handlers, m.subs[subject])
|
|
||||||
m.mu.Unlock()
|
|
||||||
for _, h := range handlers {
|
|
||||||
if h.active.Load() {
|
|
||||||
h.fn(data)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *mockPubSub) Subscribe(subject string, handler func(data []byte)) (Subscription, error) {
|
|
||||||
m.mu.Lock()
|
|
||||||
defer m.mu.Unlock()
|
|
||||||
mh := &mockHandler{
|
|
||||||
id: m.nextID.Add(1),
|
|
||||||
fn: handler,
|
|
||||||
}
|
|
||||||
mh.active.Store(true)
|
|
||||||
m.subs[subject] = append(m.subs[subject], mh)
|
|
||||||
return &mockSub{handler: mh}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *mockPubSub) Close() error { return nil }
|
|
||||||
|
|
||||||
type mockSub struct {
|
|
||||||
handler *mockHandler
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *mockSub) Unsubscribe() error {
|
|
||||||
s.handler.active.Store(false)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestPubSub_RoundTrip(t *testing.T) {
|
func TestPubSub_RoundTrip(t *testing.T) {
|
||||||
ps := newMockPubSub()
|
|
||||||
v := New()
|
v := New()
|
||||||
v.Config(Options{PubSub: ps})
|
defer v.Shutdown()
|
||||||
|
|
||||||
var received []byte
|
var received []byte
|
||||||
var wg sync.WaitGroup
|
done := make(chan struct{})
|
||||||
wg.Add(1)
|
|
||||||
|
|
||||||
c := newContext("test-ctx", "/", v)
|
c := newContext("test-ctx", "/", v)
|
||||||
c.View(func() h.H { return h.Div() })
|
c.View(func() h.H { return h.Div() })
|
||||||
|
|
||||||
_, err := c.Subscribe("test.topic", func(data []byte) {
|
_, err := c.Subscribe("test.topic", func(data []byte) {
|
||||||
received = data
|
received = data
|
||||||
wg.Done()
|
close(done)
|
||||||
})
|
})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
err = c.Publish("test.topic", []byte("hello"))
|
err = c.Publish("test.topic", []byte("hello"))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
wg.Wait()
|
select {
|
||||||
|
case <-done:
|
||||||
|
case <-time.After(2 * time.Second):
|
||||||
|
t.Fatal("timed out waiting for message")
|
||||||
|
}
|
||||||
assert.Equal(t, []byte("hello"), received)
|
assert.Equal(t, []byte("hello"), received)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestPubSub_MultipleSubscribers(t *testing.T) {
|
func TestPubSub_MultipleSubscribers(t *testing.T) {
|
||||||
ps := newMockPubSub()
|
|
||||||
v := New()
|
v := New()
|
||||||
v.Config(Options{PubSub: ps})
|
defer v.Shutdown()
|
||||||
|
|
||||||
var mu sync.Mutex
|
var mu sync.Mutex
|
||||||
var results []string
|
var results []string
|
||||||
@@ -119,7 +66,17 @@ func TestPubSub_MultipleSubscribers(t *testing.T) {
|
|||||||
})
|
})
|
||||||
|
|
||||||
c1.Publish("broadcast", []byte("msg"))
|
c1.Publish("broadcast", []byte("msg"))
|
||||||
|
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
case <-time.After(2 * time.Second):
|
||||||
|
t.Fatal("timed out waiting for messages")
|
||||||
|
}
|
||||||
|
|
||||||
assert.Len(t, results, 2)
|
assert.Len(t, results, 2)
|
||||||
assert.Contains(t, results, "c1:msg")
|
assert.Contains(t, results, "c1:msg")
|
||||||
@@ -127,9 +84,8 @@ func TestPubSub_MultipleSubscribers(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestPubSub_SubscriptionCleanupOnDispose(t *testing.T) {
|
func TestPubSub_SubscriptionCleanupOnDispose(t *testing.T) {
|
||||||
ps := newMockPubSub()
|
|
||||||
v := New()
|
v := New()
|
||||||
v.Config(Options{PubSub: ps})
|
defer v.Shutdown()
|
||||||
|
|
||||||
c := newContext("cleanup-ctx", "/", v)
|
c := newContext("cleanup-ctx", "/", v)
|
||||||
c.View(func() h.H { return h.Div() })
|
c.View(func() h.H { return h.Div() })
|
||||||
@@ -144,9 +100,8 @@ func TestPubSub_SubscriptionCleanupOnDispose(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestPubSub_ManualUnsubscribe(t *testing.T) {
|
func TestPubSub_ManualUnsubscribe(t *testing.T) {
|
||||||
ps := newMockPubSub()
|
|
||||||
v := New()
|
v := New()
|
||||||
v.Config(Options{PubSub: ps})
|
defer v.Shutdown()
|
||||||
|
|
||||||
c := newContext("unsub-ctx", "/", v)
|
c := newContext("unsub-ctx", "/", v)
|
||||||
c.View(func() h.H { return h.Div() })
|
c.View(func() h.H { return h.Div() })
|
||||||
@@ -160,28 +115,13 @@ func TestPubSub_ManualUnsubscribe(t *testing.T) {
|
|||||||
sub.Unsubscribe()
|
sub.Unsubscribe()
|
||||||
|
|
||||||
c.Publish("topic", []byte("ignored"))
|
c.Publish("topic", []byte("ignored"))
|
||||||
time.Sleep(10 * time.Millisecond)
|
time.Sleep(50 * time.Millisecond)
|
||||||
assert.False(t, called)
|
assert.False(t, called)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestPubSub_NoOpWhenNotConfigured(t *testing.T) {
|
|
||||||
v := New()
|
|
||||||
|
|
||||||
c := newContext("noop-ctx", "/", v)
|
|
||||||
c.View(func() h.H { return h.Div() })
|
|
||||||
|
|
||||||
err := c.Publish("topic", []byte("data"))
|
|
||||||
assert.Error(t, err)
|
|
||||||
|
|
||||||
sub, err := c.Subscribe("topic", func(data []byte) {})
|
|
||||||
assert.Error(t, err)
|
|
||||||
assert.Nil(t, sub)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestPubSub_NoOpDuringPanicCheck(t *testing.T) {
|
func TestPubSub_NoOpDuringPanicCheck(t *testing.T) {
|
||||||
ps := newMockPubSub()
|
|
||||||
v := New()
|
v := New()
|
||||||
v.Config(Options{PubSub: ps})
|
defer v.Shutdown()
|
||||||
|
|
||||||
// Panic-check context has id=""
|
// Panic-check context has id=""
|
||||||
c := newContext("", "/", v)
|
c := newContext("", "/", v)
|
||||||
|
|||||||
@@ -1,7 +1,8 @@
|
|||||||
package via
|
package via
|
||||||
|
|
||||||
// PubSub is an interface for publish/subscribe messaging backends.
|
// PubSub is an interface for publish/subscribe messaging backends.
|
||||||
// The vianats sub-package provides an embedded NATS implementation.
|
// By default, New() starts an embedded NATS server. Supply a custom
|
||||||
|
// implementation via Config(Options{PubSub: yourBackend}) to override.
|
||||||
type PubSub interface {
|
type PubSub interface {
|
||||||
Publish(subject string, data []byte) error
|
Publish(subject string, data []byte) error
|
||||||
Subscribe(subject string, handler func(data []byte)) (Subscription, error)
|
Subscribe(subject string, handler func(data []byte)) (Subscription, error)
|
||||||
|
|||||||
@@ -1,8 +1,8 @@
|
|||||||
package via
|
package via
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"sync"
|
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/ryanhamamura/via/h"
|
"github.com/ryanhamamura/via/h"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
@@ -10,9 +10,8 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func TestPublishSubscribe_RoundTrip(t *testing.T) {
|
func TestPublishSubscribe_RoundTrip(t *testing.T) {
|
||||||
ps := newMockPubSub()
|
|
||||||
v := New()
|
v := New()
|
||||||
v.Config(Options{PubSub: ps})
|
defer v.Shutdown()
|
||||||
|
|
||||||
type event struct {
|
type event struct {
|
||||||
Name string `json:"name"`
|
Name string `json:"name"`
|
||||||
@@ -20,30 +19,32 @@ func TestPublishSubscribe_RoundTrip(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
var got event
|
var got event
|
||||||
var wg sync.WaitGroup
|
done := make(chan struct{})
|
||||||
wg.Add(1)
|
|
||||||
|
|
||||||
c := newContext("typed-ctx", "/", v)
|
c := newContext("typed-ctx", "/", v)
|
||||||
c.View(func() h.H { return h.Div() })
|
c.View(func() h.H { return h.Div() })
|
||||||
|
|
||||||
_, err := Subscribe(c, "events", func(e event) {
|
_, err := Subscribe(c, "events", func(e event) {
|
||||||
got = e
|
got = e
|
||||||
wg.Done()
|
close(done)
|
||||||
})
|
})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
err = Publish(c, "events", event{Name: "click", Count: 42})
|
err = Publish(c, "events", event{Name: "click", Count: 42})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
wg.Wait()
|
select {
|
||||||
|
case <-done:
|
||||||
|
case <-time.After(2 * time.Second):
|
||||||
|
t.Fatal("timed out waiting for message")
|
||||||
|
}
|
||||||
assert.Equal(t, "click", got.Name)
|
assert.Equal(t, "click", got.Name)
|
||||||
assert.Equal(t, 42, got.Count)
|
assert.Equal(t, 42, got.Count)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestSubscribe_SkipsBadJSON(t *testing.T) {
|
func TestSubscribe_SkipsBadJSON(t *testing.T) {
|
||||||
ps := newMockPubSub()
|
|
||||||
v := New()
|
v := New()
|
||||||
v.Config(Options{PubSub: ps})
|
defer v.Shutdown()
|
||||||
|
|
||||||
type msg struct {
|
type msg struct {
|
||||||
Text string `json:"text"`
|
Text string `json:"text"`
|
||||||
@@ -62,5 +63,6 @@ func TestSubscribe_SkipsBadJSON(t *testing.T) {
|
|||||||
err = c.Publish("topic", []byte("not json"))
|
err = c.Publish("topic", []byte("not json"))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
time.Sleep(50 * time.Millisecond)
|
||||||
assert.False(t, called)
|
assert.False(t, called)
|
||||||
}
|
}
|
||||||
|
|||||||
24
rule.go
24
rule.go
@@ -1,10 +1,12 @@
|
|||||||
package via
|
package via
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"regexp"
|
"regexp"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
"unicode/utf8"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Rule defines a single validation check for a Field.
|
// Rule defines a single validation check for a Field.
|
||||||
@@ -20,7 +22,7 @@ func Required(msg ...string) Rule {
|
|||||||
}
|
}
|
||||||
return Rule{func(val string) error {
|
return Rule{func(val string) error {
|
||||||
if strings.TrimSpace(val) == "" {
|
if strings.TrimSpace(val) == "" {
|
||||||
return fmt.Errorf("%s", m)
|
return errors.New(m)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}}
|
}}
|
||||||
@@ -33,8 +35,8 @@ func MinLen(n int, msg ...string) Rule {
|
|||||||
m = msg[0]
|
m = msg[0]
|
||||||
}
|
}
|
||||||
return Rule{func(val string) error {
|
return Rule{func(val string) error {
|
||||||
if len(val) < n {
|
if utf8.RuneCountInString(val) < n {
|
||||||
return fmt.Errorf("%s", m)
|
return errors.New(m)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}}
|
}}
|
||||||
@@ -47,8 +49,8 @@ func MaxLen(n int, msg ...string) Rule {
|
|||||||
m = msg[0]
|
m = msg[0]
|
||||||
}
|
}
|
||||||
return Rule{func(val string) error {
|
return Rule{func(val string) error {
|
||||||
if len(val) > n {
|
if utf8.RuneCountInString(val) > n {
|
||||||
return fmt.Errorf("%s", m)
|
return errors.New(m)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}}
|
}}
|
||||||
@@ -63,10 +65,10 @@ func Min(n int, msg ...string) Rule {
|
|||||||
return Rule{func(val string) error {
|
return Rule{func(val string) error {
|
||||||
v, err := strconv.Atoi(val)
|
v, err := strconv.Atoi(val)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("%s", m)
|
return errors.New("Must be a valid number")
|
||||||
}
|
}
|
||||||
if v < n {
|
if v < n {
|
||||||
return fmt.Errorf("%s", m)
|
return errors.New(m)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}}
|
}}
|
||||||
@@ -81,10 +83,10 @@ func Max(n int, msg ...string) Rule {
|
|||||||
return Rule{func(val string) error {
|
return Rule{func(val string) error {
|
||||||
v, err := strconv.Atoi(val)
|
v, err := strconv.Atoi(val)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("%s", m)
|
return errors.New("Must be a valid number")
|
||||||
}
|
}
|
||||||
if v > n {
|
if v > n {
|
||||||
return fmt.Errorf("%s", m)
|
return errors.New(m)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}}
|
}}
|
||||||
@@ -99,7 +101,7 @@ func Pattern(re string, msg ...string) Rule {
|
|||||||
compiled := regexp.MustCompile(re)
|
compiled := regexp.MustCompile(re)
|
||||||
return Rule{func(val string) error {
|
return Rule{func(val string) error {
|
||||||
if !compiled.MatchString(val) {
|
if !compiled.MatchString(val) {
|
||||||
return fmt.Errorf("%s", m)
|
return errors.New(m)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}}
|
}}
|
||||||
@@ -115,7 +117,7 @@ func Email(msg ...string) Rule {
|
|||||||
}
|
}
|
||||||
return Rule{func(val string) error {
|
return Rule{func(val string) error {
|
||||||
if !emailRegexp.MatchString(val) {
|
if !emailRegexp.MatchString(val) {
|
||||||
return fmt.Errorf("%s", m)
|
return errors.New(m)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}}
|
}}
|
||||||
|
|||||||
12
via.go
12
via.go
@@ -49,6 +49,7 @@ type V struct {
|
|||||||
devModePageInitFnMap map[string]func(*Context)
|
devModePageInitFnMap map[string]func(*Context)
|
||||||
sessionManager *scs.SessionManager
|
sessionManager *scs.SessionManager
|
||||||
pubsub PubSub
|
pubsub PubSub
|
||||||
|
defaultNATS *defaultNATS
|
||||||
actionRateLimit RateLimitConfig
|
actionRateLimit RateLimitConfig
|
||||||
datastarPath string
|
datastarPath string
|
||||||
datastarContent []byte
|
datastarContent []byte
|
||||||
@@ -130,6 +131,7 @@ func (v *V) Config(cfg Options) {
|
|||||||
v.datastarPath = cfg.DatastarPath
|
v.datastarPath = cfg.DatastarPath
|
||||||
}
|
}
|
||||||
if cfg.PubSub != nil {
|
if cfg.PubSub != nil {
|
||||||
|
v.defaultNATS = nil
|
||||||
v.pubsub = cfg.PubSub
|
v.pubsub = cfg.PubSub
|
||||||
}
|
}
|
||||||
if cfg.ContextTTL != 0 {
|
if cfg.ContextTTL != 0 {
|
||||||
@@ -379,6 +381,7 @@ func (v *V) Shutdown() {
|
|||||||
v.logErr(nil, "pubsub close error: %v", err)
|
v.logErr(nil, "pubsub close error: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
v.defaultNATS = nil
|
||||||
|
|
||||||
v.logInfo(nil, "shutdown complete")
|
v.logInfo(nil, "shutdown complete")
|
||||||
}
|
}
|
||||||
@@ -725,6 +728,15 @@ func New() *V {
|
|||||||
v.logDebug(c, "session close event triggered")
|
v.logDebug(c, "session close event triggered")
|
||||||
v.cleanupCtx(c)
|
v.cleanupCtx(c)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
dn, err := getSharedNATS()
|
||||||
|
if err != nil {
|
||||||
|
v.logWarn(nil, "embedded NATS unavailable: %v", err)
|
||||||
|
} else {
|
||||||
|
v.defaultNATS = dn
|
||||||
|
v.pubsub = &natsRef{dn: dn}
|
||||||
|
}
|
||||||
|
|
||||||
return v
|
return v
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,127 +0,0 @@
|
|||||||
// Package vianats provides an embedded NATS server with JetStream as a
|
|
||||||
// pub/sub backend for Via applications.
|
|
||||||
package vianats
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/delaneyj/toolbelt/embeddednats"
|
|
||||||
"github.com/nats-io/nats.go"
|
|
||||||
"github.com/ryanhamamura/via"
|
|
||||||
)
|
|
||||||
|
|
||||||
// NATS implements via.PubSub using an embedded NATS server with JetStream.
|
|
||||||
type NATS struct {
|
|
||||||
server *embeddednats.Server
|
|
||||||
nc *nats.Conn
|
|
||||||
js nats.JetStreamContext
|
|
||||||
}
|
|
||||||
|
|
||||||
// New starts an embedded NATS server with JetStream enabled and returns a
|
|
||||||
// ready-to-use NATS instance. The server stores data in dataDir and shuts
|
|
||||||
// down when ctx is cancelled.
|
|
||||||
func New(ctx context.Context, dataDir string) (*NATS, error) {
|
|
||||||
ns, err := embeddednats.New(ctx, embeddednats.WithDirectory(dataDir))
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("vianats: start server: %w", err)
|
|
||||||
}
|
|
||||||
ns.WaitForServer()
|
|
||||||
|
|
||||||
nc, err := ns.Client()
|
|
||||||
if err != nil {
|
|
||||||
ns.Close()
|
|
||||||
return nil, fmt.Errorf("vianats: connect client: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
js, err := nc.JetStream()
|
|
||||||
if err != nil {
|
|
||||||
nc.Close()
|
|
||||||
ns.Close()
|
|
||||||
return nil, fmt.Errorf("vianats: init jetstream: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return &NATS{server: ns, nc: nc, js: js}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Publish sends data to the given subject using core NATS publish.
|
|
||||||
// JetStream captures messages automatically if a matching stream exists.
|
|
||||||
func (n *NATS) Publish(subject string, data []byte) error {
|
|
||||||
return n.nc.Publish(subject, data)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Subscribe creates a core NATS subscription for real-time fan-out delivery.
|
|
||||||
func (n *NATS) Subscribe(subject string, handler func(data []byte)) (via.Subscription, error) {
|
|
||||||
sub, err := n.nc.Subscribe(subject, func(msg *nats.Msg) {
|
|
||||||
handler(msg.Data)
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return sub, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Close shuts down the client connection and embedded server.
|
|
||||||
func (n *NATS) Close() error {
|
|
||||||
n.nc.Close()
|
|
||||||
return n.server.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Conn returns the underlying NATS connection for advanced usage.
|
|
||||||
func (n *NATS) Conn() *nats.Conn {
|
|
||||||
return n.nc
|
|
||||||
}
|
|
||||||
|
|
||||||
// JetStream returns the JetStream context for stream configuration and replay.
|
|
||||||
func (n *NATS) JetStream() nats.JetStreamContext {
|
|
||||||
return n.js
|
|
||||||
}
|
|
||||||
|
|
||||||
// StreamConfig holds the parameters for creating or updating a JetStream stream.
|
|
||||||
type StreamConfig struct {
|
|
||||||
Name string
|
|
||||||
Subjects []string
|
|
||||||
MaxMsgs int64
|
|
||||||
MaxAge time.Duration
|
|
||||||
}
|
|
||||||
|
|
||||||
// EnsureStream creates or updates a JetStream stream matching cfg.
|
|
||||||
func EnsureStream(n *NATS, cfg StreamConfig) error {
|
|
||||||
_, err := n.js.AddStream(&nats.StreamConfig{
|
|
||||||
Name: cfg.Name,
|
|
||||||
Subjects: cfg.Subjects,
|
|
||||||
Retention: nats.LimitsPolicy,
|
|
||||||
MaxMsgs: cfg.MaxMsgs,
|
|
||||||
MaxAge: cfg.MaxAge,
|
|
||||||
})
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// ReplayHistory fetches the last limit messages from subject,
|
|
||||||
// deserializing each as T. Returns an empty slice if nothing is available.
|
|
||||||
func ReplayHistory[T any](n *NATS, subject string, limit int) ([]T, error) {
|
|
||||||
sub, err := n.js.SubscribeSync(subject, nats.DeliverAll(), nats.OrderedConsumer())
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
defer sub.Unsubscribe()
|
|
||||||
|
|
||||||
var msgs []T
|
|
||||||
for {
|
|
||||||
raw, err := sub.NextMsg(200 * time.Millisecond)
|
|
||||||
if err != nil {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
var msg T
|
|
||||||
if json.Unmarshal(raw.Data, &msg) == nil {
|
|
||||||
msgs = append(msgs, msg)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if limit > 0 && len(msgs) > limit {
|
|
||||||
msgs = msgs[len(msgs)-limit:]
|
|
||||||
}
|
|
||||||
return msgs, nil
|
|
||||||
}
|
|
||||||
Reference in New Issue
Block a user