fix: make embedded NATS opt-in so tests don't hang
All checks were successful
CI / Build and Test (push) Successful in 31s
All checks were successful
CI / Build and Test (push) Successful in 31s
Move NATS startup from New() to Start(), so tests that don't use pubsub never block on server initialization. Add a 10s timeout to WaitForServer() and skip NATS tests gracefully when unavailable.
This commit is contained in:
12
nats.go
12
nats.go
@@ -56,7 +56,19 @@ func startDefaultNATS() (dn *defaultNATS, err error) {
|
|||||||
os.RemoveAll(dataDir)
|
os.RemoveAll(dataDir)
|
||||||
return nil, fmt.Errorf("start embedded nats: %w", err)
|
return nil, fmt.Errorf("start embedded nats: %w", err)
|
||||||
}
|
}
|
||||||
|
ready := make(chan struct{})
|
||||||
|
go func() {
|
||||||
ns.WaitForServer()
|
ns.WaitForServer()
|
||||||
|
close(ready)
|
||||||
|
}()
|
||||||
|
select {
|
||||||
|
case <-ready:
|
||||||
|
case <-time.After(10 * time.Second):
|
||||||
|
ns.Close()
|
||||||
|
cancel()
|
||||||
|
os.RemoveAll(dataDir)
|
||||||
|
return nil, fmt.Errorf("embedded nats server did not start within 10s")
|
||||||
|
}
|
||||||
|
|
||||||
nc, err := ns.Client()
|
nc, err := ns.Client()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
31
nats_test.go
31
nats_test.go
@@ -10,9 +10,24 @@ import (
|
|||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestPubSub_RoundTrip(t *testing.T) {
|
// setupNATSTest creates a *V with an embedded NATS server.
|
||||||
|
// Skips the test if NATS fails to start (e.g. port conflict in CI).
|
||||||
|
func setupNATSTest(t *testing.T) *V {
|
||||||
|
t.Helper()
|
||||||
v := New()
|
v := New()
|
||||||
defer v.Shutdown()
|
dn, err := getSharedNATS()
|
||||||
|
if err != nil {
|
||||||
|
v.Shutdown()
|
||||||
|
t.Skipf("embedded NATS unavailable: %v", err)
|
||||||
|
}
|
||||||
|
v.defaultNATS = dn
|
||||||
|
v.pubsub = &natsRef{dn: dn}
|
||||||
|
t.Cleanup(v.Shutdown)
|
||||||
|
return v
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPubSub_RoundTrip(t *testing.T) {
|
||||||
|
v := setupNATSTest(t)
|
||||||
|
|
||||||
var received []byte
|
var received []byte
|
||||||
done := make(chan struct{})
|
done := make(chan struct{})
|
||||||
@@ -38,8 +53,7 @@ func TestPubSub_RoundTrip(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestPubSub_MultipleSubscribers(t *testing.T) {
|
func TestPubSub_MultipleSubscribers(t *testing.T) {
|
||||||
v := New()
|
v := setupNATSTest(t)
|
||||||
defer v.Shutdown()
|
|
||||||
|
|
||||||
var mu sync.Mutex
|
var mu sync.Mutex
|
||||||
var results []string
|
var results []string
|
||||||
@@ -84,8 +98,7 @@ func TestPubSub_MultipleSubscribers(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestPubSub_SubscriptionCleanupOnDispose(t *testing.T) {
|
func TestPubSub_SubscriptionCleanupOnDispose(t *testing.T) {
|
||||||
v := New()
|
v := setupNATSTest(t)
|
||||||
defer v.Shutdown()
|
|
||||||
|
|
||||||
c := newContext("cleanup-ctx", "/", v)
|
c := newContext("cleanup-ctx", "/", v)
|
||||||
c.View(func() h.H { return h.Div() })
|
c.View(func() h.H { return h.Div() })
|
||||||
@@ -100,8 +113,7 @@ func TestPubSub_SubscriptionCleanupOnDispose(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestPubSub_ManualUnsubscribe(t *testing.T) {
|
func TestPubSub_ManualUnsubscribe(t *testing.T) {
|
||||||
v := New()
|
v := setupNATSTest(t)
|
||||||
defer v.Shutdown()
|
|
||||||
|
|
||||||
c := newContext("unsub-ctx", "/", v)
|
c := newContext("unsub-ctx", "/", v)
|
||||||
c.View(func() h.H { return h.Div() })
|
c.View(func() h.H { return h.Div() })
|
||||||
@@ -120,8 +132,7 @@ func TestPubSub_ManualUnsubscribe(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestPubSub_NoOpDuringPanicCheck(t *testing.T) {
|
func TestPubSub_NoOpDuringPanicCheck(t *testing.T) {
|
||||||
v := New()
|
v := setupNATSTest(t)
|
||||||
defer v.Shutdown()
|
|
||||||
|
|
||||||
// Panic-check context has id=""
|
// Panic-check context has id=""
|
||||||
c := newContext("", "/", v)
|
c := newContext("", "/", v)
|
||||||
|
|||||||
@@ -1,8 +1,9 @@
|
|||||||
package via
|
package via
|
||||||
|
|
||||||
// PubSub is an interface for publish/subscribe messaging backends.
|
// PubSub is an interface for publish/subscribe messaging backends.
|
||||||
// By default, New() starts an embedded NATS server. Supply a custom
|
// By default, Start() launches an embedded NATS server if no backend
|
||||||
// implementation via Config(Options{PubSub: yourBackend}) to override.
|
// has been configured. Supply a custom implementation via
|
||||||
|
// Config(Options{PubSub: yourBackend}) to override.
|
||||||
type PubSub interface {
|
type PubSub interface {
|
||||||
Publish(subject string, data []byte) error
|
Publish(subject string, data []byte) error
|
||||||
Subscribe(subject string, handler func(data []byte)) (Subscription, error)
|
Subscribe(subject string, handler func(data []byte)) (Subscription, error)
|
||||||
|
|||||||
@@ -10,8 +10,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func TestPublishSubscribe_RoundTrip(t *testing.T) {
|
func TestPublishSubscribe_RoundTrip(t *testing.T) {
|
||||||
v := New()
|
v := setupNATSTest(t)
|
||||||
defer v.Shutdown()
|
|
||||||
|
|
||||||
type event struct {
|
type event struct {
|
||||||
Name string `json:"name"`
|
Name string `json:"name"`
|
||||||
@@ -43,8 +42,7 @@ func TestPublishSubscribe_RoundTrip(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestSubscribe_SkipsBadJSON(t *testing.T) {
|
func TestSubscribe_SkipsBadJSON(t *testing.T) {
|
||||||
v := New()
|
v := setupNATSTest(t)
|
||||||
defer v.Shutdown()
|
|
||||||
|
|
||||||
type msg struct {
|
type msg struct {
|
||||||
Text string `json:"text"`
|
Text string `json:"text"`
|
||||||
|
|||||||
18
via.go
18
via.go
@@ -358,6 +358,16 @@ func (v *V) reapOrphanedContexts(suspendAfter, ttl time.Duration) {
|
|||||||
// Start starts the Via HTTP server and blocks until a SIGINT or SIGTERM
|
// Start starts the Via HTTP server and blocks until a SIGINT or SIGTERM
|
||||||
// signal is received, then performs a graceful shutdown.
|
// signal is received, then performs a graceful shutdown.
|
||||||
func (v *V) Start() {
|
func (v *V) Start() {
|
||||||
|
if v.pubsub == nil {
|
||||||
|
dn, err := getSharedNATS()
|
||||||
|
if err != nil {
|
||||||
|
v.logWarn(nil, "embedded NATS unavailable: %v", err)
|
||||||
|
} else {
|
||||||
|
v.defaultNATS = dn
|
||||||
|
v.pubsub = &natsRef{dn: dn}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
handler := http.Handler(v.mux)
|
handler := http.Handler(v.mux)
|
||||||
if v.sessionManager != nil {
|
if v.sessionManager != nil {
|
||||||
handler = v.sessionManager.LoadAndSave(v.mux)
|
handler = v.sessionManager.LoadAndSave(v.mux)
|
||||||
@@ -833,14 +843,6 @@ func New() *V {
|
|||||||
v.cleanupCtx(c)
|
v.cleanupCtx(c)
|
||||||
})
|
})
|
||||||
|
|
||||||
dn, err := getSharedNATS()
|
|
||||||
if err != nil {
|
|
||||||
v.logWarn(nil, "embedded NATS unavailable: %v", err)
|
|
||||||
} else {
|
|
||||||
v.defaultNATS = dn
|
|
||||||
v.pubsub = &natsRef{dn: dn}
|
|
||||||
}
|
|
||||||
|
|
||||||
return v
|
return v
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user