From 5d61149fa34cdc0e3df451fd714c7eadbffb24c5 Mon Sep 17 00:00:00 2001 From: Ryan Hamamura <58859899+ryanhamamura@users.noreply.github.com> Date: Wed, 18 Feb 2026 08:45:03 -1000 Subject: [PATCH] fix: make embedded NATS opt-in so tests don't hang Move NATS startup from New() to Start(), so tests that don't use pubsub never block on server initialization. Add a 10s timeout to WaitForServer() and skip NATS tests gracefully when unavailable. --- nats.go | 14 +++++++++++++- nats_test.go | 31 +++++++++++++++++++++---------- pubsub.go | 5 +++-- pubsub_helpers_test.go | 6 ++---- via.go | 18 ++++++++++-------- 5 files changed, 49 insertions(+), 25 deletions(-) diff --git a/nats.go b/nats.go index e22b850..fc69822 100644 --- a/nats.go +++ b/nats.go @@ -56,7 +56,19 @@ func startDefaultNATS() (dn *defaultNATS, err error) { os.RemoveAll(dataDir) return nil, fmt.Errorf("start embedded nats: %w", err) } - ns.WaitForServer() + ready := make(chan struct{}) + go func() { + ns.WaitForServer() + close(ready) + }() + select { + case <-ready: + case <-time.After(10 * time.Second): + ns.Close() + cancel() + os.RemoveAll(dataDir) + return nil, fmt.Errorf("embedded nats server did not start within 10s") + } nc, err := ns.Client() if err != nil { diff --git a/nats_test.go b/nats_test.go index bb44a2f..748c783 100644 --- a/nats_test.go +++ b/nats_test.go @@ -10,9 +10,24 @@ import ( "github.com/stretchr/testify/require" ) -func TestPubSub_RoundTrip(t *testing.T) { +// setupNATSTest creates a *V with an embedded NATS server. +// Skips the test if NATS fails to start (e.g. port conflict in CI). +func setupNATSTest(t *testing.T) *V { + t.Helper() v := New() - defer v.Shutdown() + dn, err := getSharedNATS() + if err != nil { + v.Shutdown() + t.Skipf("embedded NATS unavailable: %v", err) + } + v.defaultNATS = dn + v.pubsub = &natsRef{dn: dn} + t.Cleanup(v.Shutdown) + return v +} + +func TestPubSub_RoundTrip(t *testing.T) { + v := setupNATSTest(t) var received []byte done := make(chan struct{}) @@ -38,8 +53,7 @@ func TestPubSub_RoundTrip(t *testing.T) { } func TestPubSub_MultipleSubscribers(t *testing.T) { - v := New() - defer v.Shutdown() + v := setupNATSTest(t) var mu sync.Mutex var results []string @@ -84,8 +98,7 @@ func TestPubSub_MultipleSubscribers(t *testing.T) { } func TestPubSub_SubscriptionCleanupOnDispose(t *testing.T) { - v := New() - defer v.Shutdown() + v := setupNATSTest(t) c := newContext("cleanup-ctx", "/", v) c.View(func() h.H { return h.Div() }) @@ -100,8 +113,7 @@ func TestPubSub_SubscriptionCleanupOnDispose(t *testing.T) { } func TestPubSub_ManualUnsubscribe(t *testing.T) { - v := New() - defer v.Shutdown() + v := setupNATSTest(t) c := newContext("unsub-ctx", "/", v) c.View(func() h.H { return h.Div() }) @@ -120,8 +132,7 @@ func TestPubSub_ManualUnsubscribe(t *testing.T) { } func TestPubSub_NoOpDuringPanicCheck(t *testing.T) { - v := New() - defer v.Shutdown() + v := setupNATSTest(t) // Panic-check context has id="" c := newContext("", "/", v) diff --git a/pubsub.go b/pubsub.go index 594a3dc..977a60f 100644 --- a/pubsub.go +++ b/pubsub.go @@ -1,8 +1,9 @@ package via // PubSub is an interface for publish/subscribe messaging backends. -// By default, New() starts an embedded NATS server. Supply a custom -// implementation via Config(Options{PubSub: yourBackend}) to override. +// By default, Start() launches an embedded NATS server if no backend +// has been configured. Supply a custom implementation via +// Config(Options{PubSub: yourBackend}) to override. type PubSub interface { Publish(subject string, data []byte) error Subscribe(subject string, handler func(data []byte)) (Subscription, error) diff --git a/pubsub_helpers_test.go b/pubsub_helpers_test.go index 497273d..a5b5c8e 100644 --- a/pubsub_helpers_test.go +++ b/pubsub_helpers_test.go @@ -10,8 +10,7 @@ import ( ) func TestPublishSubscribe_RoundTrip(t *testing.T) { - v := New() - defer v.Shutdown() + v := setupNATSTest(t) type event struct { Name string `json:"name"` @@ -43,8 +42,7 @@ func TestPublishSubscribe_RoundTrip(t *testing.T) { } func TestSubscribe_SkipsBadJSON(t *testing.T) { - v := New() - defer v.Shutdown() + v := setupNATSTest(t) type msg struct { Text string `json:"text"` diff --git a/via.go b/via.go index 26b845b..b2e1568 100644 --- a/via.go +++ b/via.go @@ -358,6 +358,16 @@ func (v *V) reapOrphanedContexts(suspendAfter, ttl time.Duration) { // Start starts the Via HTTP server and blocks until a SIGINT or SIGTERM // signal is received, then performs a graceful shutdown. func (v *V) Start() { + if v.pubsub == nil { + dn, err := getSharedNATS() + if err != nil { + v.logWarn(nil, "embedded NATS unavailable: %v", err) + } else { + v.defaultNATS = dn + v.pubsub = &natsRef{dn: dn} + } + } + handler := http.Handler(v.mux) if v.sessionManager != nil { handler = v.sessionManager.LoadAndSave(v.mux) @@ -833,14 +843,6 @@ func New() *V { v.cleanupCtx(c) }) - dn, err := getSharedNATS() - if err != nil { - v.logWarn(nil, "embedded NATS unavailable: %v", err) - } else { - v.defaultNATS = dn - v.pubsub = &natsRef{dn: dn} - } - return v }