Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6dcd54c88b | ||
|
|
2c44671d0e |
@@ -1,6 +1,8 @@
|
|||||||
package via
|
package via
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/alexedwards/scs/v2"
|
"github.com/alexedwards/scs/v2"
|
||||||
"github.com/rs/zerolog"
|
"github.com/rs/zerolog"
|
||||||
)
|
)
|
||||||
@@ -54,4 +56,9 @@ type Options struct {
|
|||||||
// PubSub enables publish/subscribe messaging. Use vianats.New() for an
|
// PubSub enables publish/subscribe messaging. Use vianats.New() for an
|
||||||
// embedded NATS backend, or supply any PubSub implementation.
|
// embedded NATS backend, or supply any PubSub implementation.
|
||||||
PubSub PubSub
|
PubSub PubSub
|
||||||
|
|
||||||
|
// ContextTTL is the maximum time a context may exist without an SSE
|
||||||
|
// connection before the background reaper disposes it.
|
||||||
|
// Default: 30s. Negative value disables the reaper.
|
||||||
|
ContextTTL time.Duration
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -8,6 +8,7 @@ import (
|
|||||||
"maps"
|
"maps"
|
||||||
"reflect"
|
"reflect"
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/ryanhamamura/via/h"
|
"github.com/ryanhamamura/via/h"
|
||||||
@@ -33,6 +34,8 @@ type Context struct {
|
|||||||
subscriptions []Subscription
|
subscriptions []Subscription
|
||||||
subsMu sync.Mutex
|
subsMu sync.Mutex
|
||||||
disposeOnce sync.Once
|
disposeOnce sync.Once
|
||||||
|
createdAt time.Time
|
||||||
|
sseConnected atomic.Bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// View defines the UI rendered by this context.
|
// View defines the UI rendered by this context.
|
||||||
@@ -481,5 +484,6 @@ func newContext(id string, route string, v *V) *Context {
|
|||||||
signals: new(sync.Map),
|
signals: new(sync.Map),
|
||||||
patchChan: make(chan patch, 1),
|
patchChan: make(chan patch, 1),
|
||||||
ctxDisposedChan: make(chan struct{}, 1),
|
ctxDisposedChan: make(chan struct{}, 1),
|
||||||
|
createdAt: time.Now(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,13 +2,11 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
|
||||||
"log"
|
"log"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/nats-io/nats.go"
|
|
||||||
"github.com/ryanhamamura/via"
|
"github.com/ryanhamamura/via"
|
||||||
"github.com/ryanhamamura/via/h"
|
"github.com/ryanhamamura/via/h"
|
||||||
"github.com/ryanhamamura/via/vianats"
|
"github.com/ryanhamamura/via/vianats"
|
||||||
@@ -46,15 +44,15 @@ func main() {
|
|||||||
}
|
}
|
||||||
defer ps.Close()
|
defer ps.Close()
|
||||||
|
|
||||||
// Create JetStream stream for message durability
|
err = vianats.EnsureStream(ps, vianats.StreamConfig{
|
||||||
js := ps.JetStream()
|
Name: "CHAT",
|
||||||
js.AddStream(&nats.StreamConfig{
|
Subjects: []string{"chat.>"},
|
||||||
Name: "CHAT",
|
MaxMsgs: 1000,
|
||||||
Subjects: []string{"chat.>"},
|
MaxAge: 24 * time.Hour,
|
||||||
Retention: nats.LimitsPolicy,
|
|
||||||
MaxMsgs: 1000,
|
|
||||||
MaxAge: 24 * time.Hour,
|
|
||||||
})
|
})
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Failed to ensure stream: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
v := via.New()
|
v := via.New()
|
||||||
v.Config(via.Options{
|
v.Config(via.Options{
|
||||||
@@ -147,30 +145,14 @@ func main() {
|
|||||||
currentSub.Unsubscribe()
|
currentSub.Unsubscribe()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Replay history from JetStream before subscribing for real-time
|
|
||||||
subject := "chat.room." + room
|
subject := "chat.room." + room
|
||||||
if hist, err := js.SubscribeSync(subject, nats.DeliverAll(), nats.OrderedConsumer()); err == nil {
|
|
||||||
for {
|
// Replay history from JetStream
|
||||||
msg, err := hist.NextMsg(200 * time.Millisecond)
|
if hist, err := vianats.ReplayHistory[ChatMessage](ps, subject, 50); err == nil {
|
||||||
if err != nil {
|
messages = hist
|
||||||
break
|
|
||||||
}
|
|
||||||
var chatMsg ChatMessage
|
|
||||||
if json.Unmarshal(msg.Data, &chatMsg) == nil {
|
|
||||||
messages = append(messages, chatMsg)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
hist.Unsubscribe()
|
|
||||||
if len(messages) > 50 {
|
|
||||||
messages = messages[len(messages)-50:]
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
sub, _ := c.Subscribe(subject, func(data []byte) {
|
sub, _ := via.Subscribe(c, subject, func(msg ChatMessage) {
|
||||||
var msg ChatMessage
|
|
||||||
if err := json.Unmarshal(data, &msg); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
messagesMu.Lock()
|
messagesMu.Lock()
|
||||||
messages = append(messages, msg)
|
messages = append(messages, msg)
|
||||||
if len(messages) > 50 {
|
if len(messages) > 50 {
|
||||||
@@ -203,12 +185,11 @@ func main() {
|
|||||||
}
|
}
|
||||||
statement.SetValue("")
|
statement.SetValue("")
|
||||||
|
|
||||||
data, _ := json.Marshal(ChatMessage{
|
via.Publish(c, "chat.room."+currentRoom, ChatMessage{
|
||||||
User: currentUser,
|
User: currentUser,
|
||||||
Message: msg,
|
Message: msg,
|
||||||
Time: time.Now().UnixMilli(),
|
Time: time.Now().UnixMilli(),
|
||||||
})
|
})
|
||||||
c.Publish("chat.room."+currentRoom, data)
|
|
||||||
})
|
})
|
||||||
|
|
||||||
c.View(func() h.H {
|
c.View(func() h.H {
|
||||||
|
|||||||
284
internal/examples/pubsub-crud/main.go
Normal file
284
internal/examples/pubsub-crud/main.go
Normal file
@@ -0,0 +1,284 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"crypto/rand"
|
||||||
|
"fmt"
|
||||||
|
"html"
|
||||||
|
"log"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/ryanhamamura/via"
|
||||||
|
"github.com/ryanhamamura/via/h"
|
||||||
|
"github.com/ryanhamamura/via/vianats"
|
||||||
|
)
|
||||||
|
|
||||||
|
var WithSignal = via.WithSignal
|
||||||
|
|
||||||
|
type Bookmark struct {
|
||||||
|
ID string
|
||||||
|
Title string
|
||||||
|
URL string
|
||||||
|
}
|
||||||
|
|
||||||
|
type CRUDEvent struct {
|
||||||
|
Action string `json:"action"`
|
||||||
|
Title string `json:"title"`
|
||||||
|
UserID string `json:"user_id"`
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
bookmarks []Bookmark
|
||||||
|
bookmarksMu sync.RWMutex
|
||||||
|
)
|
||||||
|
|
||||||
|
func randomHex(n int) string {
|
||||||
|
b := make([]byte, n)
|
||||||
|
rand.Read(b)
|
||||||
|
return fmt.Sprintf("%x", b)
|
||||||
|
}
|
||||||
|
|
||||||
|
func findBookmark(id string) (Bookmark, int) {
|
||||||
|
for i, bm := range bookmarks {
|
||||||
|
if bm.ID == id {
|
||||||
|
return bm, i
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return Bookmark{}, -1
|
||||||
|
}
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
ps, err := vianats.New(ctx, "./data/nats")
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Failed to start embedded NATS: %v", err)
|
||||||
|
}
|
||||||
|
defer ps.Close()
|
||||||
|
|
||||||
|
err = vianats.EnsureStream(ps, vianats.StreamConfig{
|
||||||
|
Name: "BOOKMARKS",
|
||||||
|
Subjects: []string{"bookmarks.>"},
|
||||||
|
MaxMsgs: 1000,
|
||||||
|
MaxAge: 24 * time.Hour,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Failed to ensure stream: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
v := via.New()
|
||||||
|
v.Config(via.Options{
|
||||||
|
DevMode: true,
|
||||||
|
DocumentTitle: "Bookmarks",
|
||||||
|
LogLevel: via.LogLevelInfo,
|
||||||
|
ServerAddress: ":7331",
|
||||||
|
PubSub: ps,
|
||||||
|
})
|
||||||
|
|
||||||
|
v.AppendToHead(
|
||||||
|
h.Link(h.Rel("stylesheet"), h.Href("https://cdn.jsdelivr.net/npm/daisyui@4/dist/full.min.css")),
|
||||||
|
h.Script(h.Src("https://cdn.tailwindcss.com")),
|
||||||
|
)
|
||||||
|
|
||||||
|
v.Page("/", func(c *via.Context) {
|
||||||
|
userID := randomHex(8)
|
||||||
|
|
||||||
|
titleSignal := c.Signal("")
|
||||||
|
urlSignal := c.Signal("")
|
||||||
|
targetIDSignal := c.Signal("")
|
||||||
|
|
||||||
|
via.Subscribe(c, "bookmarks.events", func(evt CRUDEvent) {
|
||||||
|
if evt.UserID == userID {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
safeTitle := html.EscapeString(evt.Title)
|
||||||
|
var alertClass string
|
||||||
|
switch evt.Action {
|
||||||
|
case "created":
|
||||||
|
alertClass = "alert-success"
|
||||||
|
case "updated":
|
||||||
|
alertClass = "alert-info"
|
||||||
|
case "deleted":
|
||||||
|
alertClass = "alert-error"
|
||||||
|
}
|
||||||
|
c.ExecScript(fmt.Sprintf(`(function(){
|
||||||
|
var tc = document.getElementById('toast-container');
|
||||||
|
if (!tc) return;
|
||||||
|
var d = document.createElement('div');
|
||||||
|
d.className = 'alert %s';
|
||||||
|
d.innerHTML = '<span>Bookmark "%s" %s</span>';
|
||||||
|
tc.appendChild(d);
|
||||||
|
setTimeout(function(){ d.remove(); }, 3000);
|
||||||
|
})()`, alertClass, safeTitle, evt.Action))
|
||||||
|
c.Sync()
|
||||||
|
})
|
||||||
|
|
||||||
|
save := c.Action(func() {
|
||||||
|
title := titleSignal.String()
|
||||||
|
url := urlSignal.String()
|
||||||
|
if title == "" || url == "" {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
targetID := targetIDSignal.String()
|
||||||
|
action := "created"
|
||||||
|
|
||||||
|
bookmarksMu.Lock()
|
||||||
|
if targetID != "" {
|
||||||
|
if _, idx := findBookmark(targetID); idx >= 0 {
|
||||||
|
bookmarks[idx].Title = title
|
||||||
|
bookmarks[idx].URL = url
|
||||||
|
action = "updated"
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
bookmarks = append(bookmarks, Bookmark{
|
||||||
|
ID: randomHex(8),
|
||||||
|
Title: title,
|
||||||
|
URL: url,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
bookmarksMu.Unlock()
|
||||||
|
|
||||||
|
titleSignal.SetValue("")
|
||||||
|
urlSignal.SetValue("")
|
||||||
|
targetIDSignal.SetValue("")
|
||||||
|
|
||||||
|
via.Publish(c, "bookmarks.events", CRUDEvent{
|
||||||
|
Action: action,
|
||||||
|
Title: title,
|
||||||
|
UserID: userID,
|
||||||
|
})
|
||||||
|
c.Sync()
|
||||||
|
})
|
||||||
|
|
||||||
|
edit := c.Action(func() {
|
||||||
|
id := targetIDSignal.String()
|
||||||
|
bookmarksMu.RLock()
|
||||||
|
bm, idx := findBookmark(id)
|
||||||
|
bookmarksMu.RUnlock()
|
||||||
|
if idx < 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
titleSignal.SetValue(bm.Title)
|
||||||
|
urlSignal.SetValue(bm.URL)
|
||||||
|
})
|
||||||
|
|
||||||
|
del := c.Action(func() {
|
||||||
|
id := targetIDSignal.String()
|
||||||
|
bookmarksMu.Lock()
|
||||||
|
bm, idx := findBookmark(id)
|
||||||
|
if idx >= 0 {
|
||||||
|
bookmarks = append(bookmarks[:idx], bookmarks[idx+1:]...)
|
||||||
|
}
|
||||||
|
bookmarksMu.Unlock()
|
||||||
|
if idx < 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
targetIDSignal.SetValue("")
|
||||||
|
|
||||||
|
via.Publish(c, "bookmarks.events", CRUDEvent{
|
||||||
|
Action: "deleted",
|
||||||
|
Title: bm.Title,
|
||||||
|
UserID: userID,
|
||||||
|
})
|
||||||
|
c.Sync()
|
||||||
|
})
|
||||||
|
|
||||||
|
cancelEdit := c.Action(func() {
|
||||||
|
titleSignal.SetValue("")
|
||||||
|
urlSignal.SetValue("")
|
||||||
|
targetIDSignal.SetValue("")
|
||||||
|
})
|
||||||
|
|
||||||
|
c.View(func() h.H {
|
||||||
|
isEditing := targetIDSignal.String() != ""
|
||||||
|
|
||||||
|
// Build table rows
|
||||||
|
bookmarksMu.RLock()
|
||||||
|
var rows []h.H
|
||||||
|
for _, bm := range bookmarks {
|
||||||
|
rows = append(rows, h.Tr(
|
||||||
|
h.Td(h.Text(bm.Title)),
|
||||||
|
h.Td(h.A(h.Href(bm.URL), h.Attr("target", "_blank"), h.Class("link link-primary"), h.Text(bm.URL))),
|
||||||
|
h.Td(
|
||||||
|
h.Div(h.Class("flex gap-1"),
|
||||||
|
h.Button(h.Class("btn btn-xs btn-ghost"), h.Text("Edit"),
|
||||||
|
edit.OnClick(WithSignal(targetIDSignal, bm.ID)),
|
||||||
|
),
|
||||||
|
h.Button(h.Class("btn btn-xs btn-ghost text-error"), h.Text("Delete"),
|
||||||
|
del.OnClick(WithSignal(targetIDSignal, bm.ID)),
|
||||||
|
),
|
||||||
|
),
|
||||||
|
),
|
||||||
|
))
|
||||||
|
}
|
||||||
|
bookmarksMu.RUnlock()
|
||||||
|
|
||||||
|
saveLabel := "Add Bookmark"
|
||||||
|
if isEditing {
|
||||||
|
saveLabel = "Update Bookmark"
|
||||||
|
}
|
||||||
|
|
||||||
|
return h.Div(h.Class("min-h-screen bg-base-200"),
|
||||||
|
// Navbar
|
||||||
|
h.Div(h.Class("navbar bg-base-100 shadow-sm"),
|
||||||
|
h.Div(h.Class("flex-1"),
|
||||||
|
h.A(h.Class("btn btn-ghost text-xl"), h.Text("Bookmarks")),
|
||||||
|
),
|
||||||
|
h.Div(h.Class("flex-none"),
|
||||||
|
h.Div(h.Class("badge badge-outline"), h.Text(userID[:8])),
|
||||||
|
),
|
||||||
|
),
|
||||||
|
|
||||||
|
h.Div(h.Class("container mx-auto p-4 max-w-3xl flex flex-col gap-4"),
|
||||||
|
// Form card
|
||||||
|
h.Div(h.Class("card bg-base-100 shadow"),
|
||||||
|
h.Div(h.Class("card-body"),
|
||||||
|
h.H2(h.Class("card-title"), h.Text(saveLabel)),
|
||||||
|
h.Div(h.Class("flex flex-col gap-2"),
|
||||||
|
h.Input(h.Class("input input-bordered w-full"), h.Type("text"), h.Placeholder("Title"), titleSignal.Bind()),
|
||||||
|
h.Input(h.Class("input input-bordered w-full"), h.Type("text"), h.Placeholder("https://example.com"), urlSignal.Bind()),
|
||||||
|
h.Div(h.Class("card-actions justify-end"),
|
||||||
|
h.If(isEditing,
|
||||||
|
h.Button(h.Class("btn btn-ghost"), h.Text("Cancel"), cancelEdit.OnClick()),
|
||||||
|
),
|
||||||
|
h.Button(h.Class("btn btn-primary"), h.Text(saveLabel), save.OnClick()),
|
||||||
|
),
|
||||||
|
),
|
||||||
|
),
|
||||||
|
),
|
||||||
|
|
||||||
|
// Table card
|
||||||
|
h.Div(h.Class("card bg-base-100 shadow"),
|
||||||
|
h.Div(h.Class("card-body"),
|
||||||
|
h.H2(h.Class("card-title"), h.Text("All Bookmarks")),
|
||||||
|
h.If(len(rows) == 0,
|
||||||
|
h.P(h.Class("text-base-content/60"), h.Text("No bookmarks yet. Add one above!")),
|
||||||
|
),
|
||||||
|
h.If(len(rows) > 0,
|
||||||
|
h.Div(h.Class("overflow-x-auto"),
|
||||||
|
h.Table(h.Class("table"),
|
||||||
|
h.THead(h.Tr(
|
||||||
|
h.Th(h.Text("Title")),
|
||||||
|
h.Th(h.Text("URL")),
|
||||||
|
h.Th(h.Text("Actions")),
|
||||||
|
)),
|
||||||
|
h.TBody(rows...),
|
||||||
|
),
|
||||||
|
),
|
||||||
|
),
|
||||||
|
),
|
||||||
|
),
|
||||||
|
),
|
||||||
|
|
||||||
|
// Toast container — ignored by morph so Sync() doesn't wipe active toasts
|
||||||
|
h.Div(h.ID("toast-container"), h.Class("toast toast-end toast-top"), h.DataIgnoreMorph()),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
log.Println("Starting pubsub-crud example on :7331")
|
||||||
|
v.Start()
|
||||||
|
}
|
||||||
23
pubsub_helpers.go
Normal file
23
pubsub_helpers.go
Normal file
@@ -0,0 +1,23 @@
|
|||||||
|
package via
|
||||||
|
|
||||||
|
import "encoding/json"
|
||||||
|
|
||||||
|
// Publish JSON-marshals msg and publishes to subject.
|
||||||
|
func Publish[T any](c *Context, subject string, msg T) error {
|
||||||
|
data, err := json.Marshal(msg)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return c.Publish(subject, data)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Subscribe JSON-unmarshals each message as T and calls handler.
|
||||||
|
func Subscribe[T any](c *Context, subject string, handler func(T)) (Subscription, error) {
|
||||||
|
return c.Subscribe(subject, func(data []byte) {
|
||||||
|
var msg T
|
||||||
|
if err := json.Unmarshal(data, &msg); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
handler(msg)
|
||||||
|
})
|
||||||
|
}
|
||||||
66
pubsub_helpers_test.go
Normal file
66
pubsub_helpers_test.go
Normal file
@@ -0,0 +1,66 @@
|
|||||||
|
package via
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/ryanhamamura/via/h"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestPublishSubscribe_RoundTrip(t *testing.T) {
|
||||||
|
ps := newMockPubSub()
|
||||||
|
v := New()
|
||||||
|
v.Config(Options{PubSub: ps})
|
||||||
|
|
||||||
|
type event struct {
|
||||||
|
Name string `json:"name"`
|
||||||
|
Count int `json:"count"`
|
||||||
|
}
|
||||||
|
|
||||||
|
var got event
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(1)
|
||||||
|
|
||||||
|
c := newContext("typed-ctx", "/", v)
|
||||||
|
c.View(func() h.H { return h.Div() })
|
||||||
|
|
||||||
|
_, err := Subscribe(c, "events", func(e event) {
|
||||||
|
got = e
|
||||||
|
wg.Done()
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
err = Publish(c, "events", event{Name: "click", Count: 42})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
assert.Equal(t, "click", got.Name)
|
||||||
|
assert.Equal(t, 42, got.Count)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSubscribe_SkipsBadJSON(t *testing.T) {
|
||||||
|
ps := newMockPubSub()
|
||||||
|
v := New()
|
||||||
|
v.Config(Options{PubSub: ps})
|
||||||
|
|
||||||
|
type msg struct {
|
||||||
|
Text string `json:"text"`
|
||||||
|
}
|
||||||
|
|
||||||
|
called := false
|
||||||
|
c := newContext("bad-json-ctx", "/", v)
|
||||||
|
c.View(func() h.H { return h.Div() })
|
||||||
|
|
||||||
|
_, err := Subscribe(c, "topic", func(m msg) {
|
||||||
|
called = true
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Publish raw invalid JSON — handler should silently skip
|
||||||
|
err = c.Publish("topic", []byte("not json"))
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
assert.False(t, called)
|
||||||
|
}
|
||||||
102
via.go
102
via.go
@@ -36,20 +36,21 @@ var datastarJS []byte
|
|||||||
// V is the root application.
|
// V is the root application.
|
||||||
// It manages page routing, user sessions, and SSE connections for live updates.
|
// It manages page routing, user sessions, and SSE connections for live updates.
|
||||||
type V struct {
|
type V struct {
|
||||||
cfg Options
|
cfg Options
|
||||||
mux *http.ServeMux
|
mux *http.ServeMux
|
||||||
server *http.Server
|
server *http.Server
|
||||||
logger zerolog.Logger
|
logger zerolog.Logger
|
||||||
contextRegistry map[string]*Context
|
contextRegistry map[string]*Context
|
||||||
contextRegistryMutex sync.RWMutex
|
contextRegistryMutex sync.RWMutex
|
||||||
documentHeadIncludes []h.H
|
documentHeadIncludes []h.H
|
||||||
documentFootIncludes []h.H
|
documentFootIncludes []h.H
|
||||||
devModePageInitFnMap map[string]func(*Context)
|
devModePageInitFnMap map[string]func(*Context)
|
||||||
sessionManager *scs.SessionManager
|
sessionManager *scs.SessionManager
|
||||||
pubsub PubSub
|
pubsub PubSub
|
||||||
datastarPath string
|
datastarPath string
|
||||||
datastarContent []byte
|
datastarContent []byte
|
||||||
datastarOnce sync.Once
|
datastarOnce sync.Once
|
||||||
|
reaperStop chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v *V) logEvent(evt *zerolog.Event, c *Context) *zerolog.Event {
|
func (v *V) logEvent(evt *zerolog.Event, c *Context) *zerolog.Event {
|
||||||
@@ -127,6 +128,9 @@ func (v *V) Config(cfg Options) {
|
|||||||
if cfg.PubSub != nil {
|
if cfg.PubSub != nil {
|
||||||
v.pubsub = cfg.PubSub
|
v.pubsub = cfg.PubSub
|
||||||
}
|
}
|
||||||
|
if cfg.ContextTTL != 0 {
|
||||||
|
v.cfg.ContextTTL = cfg.ContextTTL
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// AppendToHead appends the given h.H nodes to the head of the base HTML document.
|
// AppendToHead appends the given h.H nodes to the head of the base HTML document.
|
||||||
@@ -238,6 +242,14 @@ func (v *V) currSessionNum() int {
|
|||||||
return len(v.contextRegistry)
|
return len(v.contextRegistry)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (v *V) cleanupCtx(c *Context) {
|
||||||
|
c.dispose()
|
||||||
|
if v.cfg.DevMode {
|
||||||
|
v.devModeRemovePersisted(c)
|
||||||
|
}
|
||||||
|
v.unregisterCtx(c)
|
||||||
|
}
|
||||||
|
|
||||||
func (v *V) unregisterCtx(c *Context) {
|
func (v *V) unregisterCtx(c *Context) {
|
||||||
if c.id == "" {
|
if c.id == "" {
|
||||||
v.logErr(c, "unregister ctx failed: ctx contains empty id")
|
v.logErr(c, "unregister ctx failed: ctx contains empty id")
|
||||||
@@ -259,6 +271,50 @@ func (v *V) getCtx(id string) (*Context, error) {
|
|||||||
return nil, fmt.Errorf("ctx '%s' not found", id)
|
return nil, fmt.Errorf("ctx '%s' not found", id)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (v *V) startReaper() {
|
||||||
|
ttl := v.cfg.ContextTTL
|
||||||
|
if ttl < 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if ttl == 0 {
|
||||||
|
ttl = 30 * time.Second
|
||||||
|
}
|
||||||
|
interval := ttl / 3
|
||||||
|
if interval < 5*time.Second {
|
||||||
|
interval = 5 * time.Second
|
||||||
|
}
|
||||||
|
v.reaperStop = make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
ticker := time.NewTicker(interval)
|
||||||
|
defer ticker.Stop()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-v.reaperStop:
|
||||||
|
return
|
||||||
|
case <-ticker.C:
|
||||||
|
v.reapOrphanedContexts(ttl)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v *V) reapOrphanedContexts(ttl time.Duration) {
|
||||||
|
now := time.Now()
|
||||||
|
v.contextRegistryMutex.RLock()
|
||||||
|
var orphans []*Context
|
||||||
|
for _, c := range v.contextRegistry {
|
||||||
|
if !c.sseConnected.Load() && now.Sub(c.createdAt) > ttl {
|
||||||
|
orphans = append(orphans, c)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
v.contextRegistryMutex.RUnlock()
|
||||||
|
|
||||||
|
for _, c := range orphans {
|
||||||
|
v.logInfo(c, "reaping orphaned context (no SSE connection after %s)", ttl)
|
||||||
|
v.cleanupCtx(c)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Start starts the Via HTTP server and blocks until a SIGINT or SIGTERM
|
// Start starts the Via HTTP server and blocks until a SIGINT or SIGTERM
|
||||||
// signal is received, then performs a graceful shutdown.
|
// signal is received, then performs a graceful shutdown.
|
||||||
func (v *V) Start() {
|
func (v *V) Start() {
|
||||||
@@ -271,6 +327,8 @@ func (v *V) Start() {
|
|||||||
Handler: handler,
|
Handler: handler,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
v.startReaper()
|
||||||
|
|
||||||
errCh := make(chan error, 1)
|
errCh := make(chan error, 1)
|
||||||
go func() {
|
go func() {
|
||||||
errCh <- v.server.ListenAndServe()
|
errCh <- v.server.ListenAndServe()
|
||||||
@@ -301,6 +359,9 @@ func (v *V) Shutdown() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (v *V) shutdown() {
|
func (v *V) shutdown() {
|
||||||
|
if v.reaperStop != nil {
|
||||||
|
close(v.reaperStop)
|
||||||
|
}
|
||||||
v.logInfo(nil, "draining all contexts")
|
v.logInfo(nil, "draining all contexts")
|
||||||
v.drainAllContexts()
|
v.drainAllContexts()
|
||||||
|
|
||||||
@@ -400,10 +461,7 @@ func (v *V) devModeRemovePersisted(c *Context) {
|
|||||||
}
|
}
|
||||||
file.Close()
|
file.Close()
|
||||||
|
|
||||||
// remove ctx to persisted list
|
delete(ctxRegMap, c.id)
|
||||||
if _, ok := ctxRegMap[c.id]; !ok {
|
|
||||||
delete(ctxRegMap, c.id)
|
|
||||||
}
|
|
||||||
|
|
||||||
// write persisted list to file
|
// write persisted list to file
|
||||||
file, err = os.Create(p)
|
file, err = os.Create(p)
|
||||||
@@ -507,6 +565,7 @@ func New() *V {
|
|||||||
// use last-event-id to tell if request is a sse reconnect
|
// use last-event-id to tell if request is a sse reconnect
|
||||||
sse.Send(datastar.EventTypePatchElements, []string{}, datastar.WithSSEEventId("via"))
|
sse.Send(datastar.EventTypePatchElements, []string{}, datastar.WithSSEEventId("via"))
|
||||||
|
|
||||||
|
c.sseConnected.Store(true)
|
||||||
v.logDebug(c, "SSE connection established")
|
v.logDebug(c, "SSE connection established")
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
@@ -517,6 +576,7 @@ func New() *V {
|
|||||||
select {
|
select {
|
||||||
case <-sse.Context().Done():
|
case <-sse.Context().Done():
|
||||||
v.logDebug(c, "SSE connection ended")
|
v.logDebug(c, "SSE connection ended")
|
||||||
|
v.cleanupCtx(c)
|
||||||
return
|
return
|
||||||
case <-c.ctxDisposedChan:
|
case <-c.ctxDisposedChan:
|
||||||
v.logDebug(c, "context disposed, closing SSE")
|
v.logDebug(c, "context disposed, closing SSE")
|
||||||
@@ -603,12 +663,8 @@ func New() *V {
|
|||||||
v.logErr(c, "failed to handle session close: %v", err)
|
v.logErr(c, "failed to handle session close: %v", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
c.dispose()
|
|
||||||
v.logDebug(c, "session close event triggered")
|
v.logDebug(c, "session close event triggered")
|
||||||
if v.cfg.DevMode {
|
v.cleanupCtx(c)
|
||||||
v.devModeRemovePersisted(c)
|
|
||||||
}
|
|
||||||
v.unregisterCtx(c)
|
|
||||||
})
|
})
|
||||||
return v
|
return v
|
||||||
}
|
}
|
||||||
|
|||||||
94
via_test.go
94
via_test.go
@@ -1,9 +1,13 @@
|
|||||||
package via
|
package via
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"encoding/json"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/ryanhamamura/via/h"
|
"github.com/ryanhamamura/via/h"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
@@ -235,3 +239,93 @@ func TestPage_PanicsOnNoView(t *testing.T) {
|
|||||||
v.Page("/", func(c *Context) {})
|
v.Page("/", func(c *Context) {})
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestReaperCleansOrphanedContexts(t *testing.T) {
|
||||||
|
v := New()
|
||||||
|
c := newContext("orphan-1", "/", v)
|
||||||
|
c.createdAt = time.Now().Add(-time.Minute) // created 1 min ago
|
||||||
|
v.registerCtx(c)
|
||||||
|
|
||||||
|
_, err := v.getCtx("orphan-1")
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
v.reapOrphanedContexts(10 * time.Second)
|
||||||
|
|
||||||
|
_, err = v.getCtx("orphan-1")
|
||||||
|
assert.Error(t, err, "orphaned context should have been reaped")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestReaperIgnoresConnectedContexts(t *testing.T) {
|
||||||
|
v := New()
|
||||||
|
c := newContext("connected-1", "/", v)
|
||||||
|
c.createdAt = time.Now().Add(-time.Minute)
|
||||||
|
c.sseConnected.Store(true)
|
||||||
|
v.registerCtx(c)
|
||||||
|
|
||||||
|
v.reapOrphanedContexts(10 * time.Second)
|
||||||
|
|
||||||
|
_, err := v.getCtx("connected-1")
|
||||||
|
assert.NoError(t, err, "connected context should survive reaping")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestReaperDisabledWithNegativeTTL(t *testing.T) {
|
||||||
|
v := New()
|
||||||
|
v.cfg.ContextTTL = -1
|
||||||
|
v.startReaper()
|
||||||
|
assert.Nil(t, v.reaperStop, "reaper should not start with negative TTL")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCleanupCtxIdempotent(t *testing.T) {
|
||||||
|
v := New()
|
||||||
|
c := newContext("idempotent-1", "/", v)
|
||||||
|
v.registerCtx(c)
|
||||||
|
|
||||||
|
assert.NotPanics(t, func() {
|
||||||
|
v.cleanupCtx(c)
|
||||||
|
v.cleanupCtx(c)
|
||||||
|
})
|
||||||
|
|
||||||
|
_, err := v.getCtx("idempotent-1")
|
||||||
|
assert.Error(t, err, "context should be removed after cleanup")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestDevModeRemovePersistedFix(t *testing.T) {
|
||||||
|
v := New()
|
||||||
|
v.cfg.DevMode = true
|
||||||
|
|
||||||
|
dir := filepath.Join(t.TempDir(), ".via", "devmode")
|
||||||
|
p := filepath.Join(dir, "ctx.json")
|
||||||
|
assert.NoError(t, os.MkdirAll(dir, 0755))
|
||||||
|
|
||||||
|
// Write a persisted context
|
||||||
|
ctxRegMap := map[string]string{"test-ctx-1": "/"}
|
||||||
|
f, err := os.Create(p)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.NoError(t, json.NewEncoder(f).Encode(ctxRegMap))
|
||||||
|
f.Close()
|
||||||
|
|
||||||
|
// Patch devModeRemovePersisted to use our temp path by calling it
|
||||||
|
// directly — we need to override the path. Instead, test via the
|
||||||
|
// actual function by temporarily changing the working dir.
|
||||||
|
origDir, _ := os.Getwd()
|
||||||
|
assert.NoError(t, os.Chdir(t.TempDir()))
|
||||||
|
defer os.Chdir(origDir)
|
||||||
|
|
||||||
|
// Re-create the structure in the temp dir
|
||||||
|
assert.NoError(t, os.MkdirAll(filepath.Join(".via", "devmode"), 0755))
|
||||||
|
p2 := filepath.Join(".via", "devmode", "ctx.json")
|
||||||
|
f2, _ := os.Create(p2)
|
||||||
|
json.NewEncoder(f2).Encode(map[string]string{"test-ctx-1": "/"})
|
||||||
|
f2.Close()
|
||||||
|
|
||||||
|
c := newContext("test-ctx-1", "/", v)
|
||||||
|
v.devModeRemovePersisted(c)
|
||||||
|
|
||||||
|
// Read back and verify
|
||||||
|
f3, err := os.Open(p2)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
defer f3.Close()
|
||||||
|
var result map[string]string
|
||||||
|
assert.NoError(t, json.NewDecoder(f3).Decode(&result))
|
||||||
|
assert.Empty(t, result, "persisted context should be removed")
|
||||||
|
}
|
||||||
|
|||||||
@@ -4,7 +4,9 @@ package vianats
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/delaneyj/toolbelt/embeddednats"
|
"github.com/delaneyj/toolbelt/embeddednats"
|
||||||
"github.com/nats-io/nats.go"
|
"github.com/nats-io/nats.go"
|
||||||
@@ -76,3 +78,50 @@ func (n *NATS) Conn() *nats.Conn {
|
|||||||
func (n *NATS) JetStream() nats.JetStreamContext {
|
func (n *NATS) JetStream() nats.JetStreamContext {
|
||||||
return n.js
|
return n.js
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// StreamConfig holds the parameters for creating or updating a JetStream stream.
|
||||||
|
type StreamConfig struct {
|
||||||
|
Name string
|
||||||
|
Subjects []string
|
||||||
|
MaxMsgs int64
|
||||||
|
MaxAge time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
|
// EnsureStream creates or updates a JetStream stream matching cfg.
|
||||||
|
func EnsureStream(n *NATS, cfg StreamConfig) error {
|
||||||
|
_, err := n.js.AddStream(&nats.StreamConfig{
|
||||||
|
Name: cfg.Name,
|
||||||
|
Subjects: cfg.Subjects,
|
||||||
|
Retention: nats.LimitsPolicy,
|
||||||
|
MaxMsgs: cfg.MaxMsgs,
|
||||||
|
MaxAge: cfg.MaxAge,
|
||||||
|
})
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// ReplayHistory fetches the last limit messages from subject,
|
||||||
|
// deserializing each as T. Returns an empty slice if nothing is available.
|
||||||
|
func ReplayHistory[T any](n *NATS, subject string, limit int) ([]T, error) {
|
||||||
|
sub, err := n.js.SubscribeSync(subject, nats.DeliverAll(), nats.OrderedConsumer())
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer sub.Unsubscribe()
|
||||||
|
|
||||||
|
var msgs []T
|
||||||
|
for {
|
||||||
|
raw, err := sub.NextMsg(200 * time.Millisecond)
|
||||||
|
if err != nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
var msg T
|
||||||
|
if json.Unmarshal(raw.Data, &msg) == nil {
|
||||||
|
msgs = append(msgs, msg)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if limit > 0 && len(msgs) > limit {
|
||||||
|
msgs = msgs[len(msgs)-limit:]
|
||||||
|
}
|
||||||
|
return msgs, nil
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user