Skip to content

Commit b394cee

Browse files
authored
Merge pull request #119 from hyp3rd/feat/dist-mem-cache
feat(mgmt): add SSE endpoint for live cluster topology events
2 parents 9c1961c + fbf2954 commit b394cee

11 files changed

Lines changed: 1307 additions & 15 deletions

File tree

CHANGELOG.md

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,36 @@ adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
88

99
### Added
1010

11+
- **`GET /cluster/events` — SSE stream of topology updates.** New
12+
Server-Sent Events endpoint on the management HTTP port that
13+
pushes `members` (full membership snapshot) and `heartbeat`
14+
(counters snapshot) frames to subscribers, replacing the
15+
monitor's 2-second poll cadence with a live stream. Connect-time
16+
frames carry the current snapshot so a fresh subscriber doesn't
17+
wait for the next mutation. The cache wires an in-process
18+
broadcaster ([`internal/eventbus`](internal/eventbus/bus.go))
19+
that drops events for slow consumers (per-subscriber bounded
20+
buffer) so the SWIM heartbeat loop never backpressures on a
21+
stuck operator's browser. Membership state changes propagate via
22+
the new [`Membership.OnStateChange`](internal/cluster/membership.go)
23+
observer hook; heartbeat snapshots tick at 1 Hz aligned with
24+
the SWIM interval. Read-scope auth (matches existing
25+
`/cluster/*` routes); honors the management server's lifecycle
26+
context for graceful drain on `Stop()`.
27+
28+
### Changed
29+
30+
- **Management server `defaultWriteTimeout` 5 s → 0 (no cap).**
31+
fasthttp resets WriteTimeout per response; the 5 s default
32+
force-closed the new SSE stream at exactly that mark, the
33+
consumer saw "other side closed", and the monitor fell back
34+
to polling. Lifting the cap is safe for the mgmt port because
35+
it's internal-only and JSON handlers complete in milliseconds;
36+
idle keep-alive connections are still bounded by the 60 s
37+
IdleTimeout. Operators who need a write cap can opt in via
38+
[`WithMgmtWriteTimeout`](management_http.go).
39+
Regression-pinned by `TestManagementHTTP_ClusterEvents`, which
40+
reads frames for 6 s past the historic deadline.
1141
- **Per-route scope enforcement on the management HTTP port.**
1242
`WithMgmtControlAuth` is a new option that wraps the cluster-
1343
mutating control endpoints (`POST /evict`, `POST /clear`,

cspell.config.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ words:
4040
- assertable
4141
- autosync
4242
- backpressure
43+
- backpressures
4344
- baselining
4445
- benchmarkdist
4546
- benchmem
@@ -49,6 +50,7 @@ words:
4950
- bodyclose
5051
- bufbuild
5152
- buildx
53+
- bursty
5254
- cacheerrors
5355
- cachev
5456
- calledback
@@ -80,6 +82,7 @@ words:
8082
- Equalf
8183
- errcheck
8284
- errp
85+
- eventbus
8386
- ewrap
8487
- excalidraw
8588
- excludeonly
@@ -230,6 +233,8 @@ words:
230233
- unmarshals
231234
- unpadded
232235
- unsharded
236+
- unsub
237+
- unsubbed
233238
- upserted
234239
- upserts
235240
- varnamelen

hypercache_dist.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,28 @@ package hypercache
33
import (
44
"context"
55

6+
"github.com/hyp3rd/hypercache/internal/eventbus"
67
"github.com/hyp3rd/hypercache/pkg/backend"
78
)
89

10+
// EventBus returns the in-process broadcaster the distributed
11+
// backend uses to publish topology changes (`members`,
12+
// `heartbeat`). Returns nil when the backend is not DistMemory —
13+
// the management HTTP server's SSE handler treats nil as
14+
// "streaming unsupported" and falls back to a 503.
15+
//
16+
// The bus is owned by DistMemory and lives until the backend's
17+
// lifecycle context is cancelled. SSE handlers Subscribe via the
18+
// request context so they're reaped when either the client
19+
// disconnects or the cache shuts down.
20+
func (hyperCache *HyperCache[T]) EventBus() *eventbus.Bus {
21+
if dm, ok := any(hyperCache.backend).(*backend.DistMemory); ok {
22+
return dm.EventBus()
23+
}
24+
25+
return nil
26+
}
27+
928
// DistMetrics returns distributed backend metrics if the underlying backend is DistMemory.
1029
// Returns nil if unsupported.
1130
func (hyperCache *HyperCache[T]) DistMetrics() any { // generic any to avoid exporting type into core interface

internal/cluster/membership.go

Lines changed: 75 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,54 @@ import (
77
"time"
88
)
99

10+
// StateChangeObserver is invoked AFTER a membership mutation
11+
// (Upsert / Mark / Remove) commits, with the membership lock
12+
// already released. Observers MUST NOT call back into the
13+
// Membership they were registered on (deadlock); they SHOULD do
14+
// minimal work (publish to a channel, increment a counter) and
15+
// return promptly, since multiple observers run sequentially on
16+
// the goroutine that performed the mutation.
17+
//
18+
// Phase C SSE: the cache binary registers one observer that
19+
// publishes a `members` event onto the in-process event bus, so
20+
// SSE subscribers see state transitions without re-deriving them
21+
// by polling.
22+
type StateChangeObserver func(id NodeID, state NodeState, version uint64)
23+
1024
// Membership tracks current cluster nodes (static MVP, future: gossip/swim).
1125
type Membership struct {
12-
mu sync.RWMutex
13-
nodes map[NodeID]*Node
14-
ring *Ring
15-
ver MembershipVersion
26+
mu sync.RWMutex
27+
nodes map[NodeID]*Node
28+
ring *Ring
29+
ver MembershipVersion
30+
observers []StateChangeObserver
1631
}
1732

1833
// NewMembership creates a new membership container bound to a ring.
1934
func NewMembership(ring *Ring) *Membership { return &Membership{nodes: map[NodeID]*Node{}, ring: ring} }
2035

36+
// OnStateChange registers a callback invoked after every membership
37+
// mutation (Upsert, Mark, Remove). Registration is append-only and
38+
// not safe for concurrent use with mutations — call this once at
39+
// construction before the cache starts driving heartbeats / gossip.
40+
//
41+
// The callback runs OUTSIDE the membership lock so a slow observer
42+
// does not block the SWIM heartbeat loop. Observers fire in
43+
// registration order; one panicking observer would skip every
44+
// observer registered after it, so observer authors must recover
45+
// in their own code (the package itself does not wrap with
46+
// recover() because that would mask programming bugs).
47+
func (m *Membership) OnStateChange(fn StateChangeObserver) {
48+
if fn == nil {
49+
return
50+
}
51+
52+
m.mu.Lock()
53+
54+
m.observers = append(m.observers, fn)
55+
m.mu.Unlock()
56+
}
57+
2158
// Upsert adds or updates a node and rebuilds ring.
2259
func (m *Membership) Upsert(n *Node) {
2360
m.mu.Lock()
@@ -32,10 +69,14 @@ func (m *Membership) Upsert(n *Node) {
3269
}
3370
}
3471

35-
m.ver.Next()
72+
version := m.ver.Next()
73+
observers := m.observers
74+
id := n.ID
75+
state := n.State
3676
m.mu.Unlock()
3777

3878
m.ring.Build(nodes)
79+
notify(observers, id, state, version)
3980
}
4081

4182
// List returns current nodes snapshot.
@@ -80,10 +121,15 @@ func (m *Membership) Remove(id NodeID) bool {
80121
}
81122
}
82123

83-
m.ver.Next()
124+
version := m.ver.Next()
125+
observers := m.observers
84126
m.mu.Unlock()
85127

86128
m.ring.Build(nodes)
129+
// State NodeDead represents "removed from membership" for
130+
// observer purposes; the node is gone from the map and will
131+
// not appear in any subsequent List() call.
132+
notify(observers, id, NodeDead, version)
87133

88134
return true
89135
}
@@ -93,18 +139,34 @@ func (m *Membership) Mark(id NodeID, state NodeState) bool {
93139
m.mu.Lock()
94140

95141
n, ok := m.nodes[id]
96-
if ok {
97-
n.State = state
98-
n.Incarnation++
99-
100-
n.LastSeen = time.Now()
142+
if !ok {
143+
m.mu.Unlock()
101144

102-
m.ver.Next()
145+
return false
103146
}
104147

148+
n.State = state
149+
n.Incarnation++
150+
151+
n.LastSeen = time.Now()
152+
153+
version := m.ver.Next()
154+
observers := m.observers
155+
105156
m.mu.Unlock()
106157

107-
return ok
158+
notify(observers, id, state, version)
159+
160+
return true
161+
}
162+
163+
// notify invokes each observer in registration order with the
164+
// resolved state and version. Pulled out so the call sites read
165+
// "mutate, unlock, notify" uniformly without inline loops.
166+
func notify(observers []StateChangeObserver, id NodeID, state NodeState, version uint64) {
167+
for _, fn := range observers {
168+
fn(id, state, version)
169+
}
108170
}
109171

110172
// Version returns current membership version.

0 commit comments

Comments
 (0)