Commit 71e1eb2

fix(cluster): resolve four steady-state distributed cluster regressions (#133)
2 parents 606e824 + 0168f93 commit 71e1eb2

9 files changed

Lines changed: 726 additions & 17 deletions

CHANGELOG.md

Lines changed: 75 additions & 0 deletions
@@ -286,6 +286,81 @@ All notable changes to HyperCache are recorded here. The format follows

 ### Fixed

+- **applySet now clones the key string before storing it as the shard's map key.** Under HTTP traffic
+(Fiber + the v1 cache API), path parameters returned by `c.Params("key")` are backed by a pooled
+request buffer that the framework reuses for the next request. The original `applySet` stored the
+caller's string directly as the `ConcurrentMap` key; when the next request landed, the buffer's
+bytes mutated, and so did every map key (and `Item.Key` field) we'd previously stored. The
+immediate symptom: the same logical key drifted across multiple shards (`first-24` showing up in
+shards 2, 3, 4, and twice in shard 6), and phantom keys like `first-479` materialized in the
+iteration (a "first-4" buffer overlaid with "79" from the next URL). The rebalance loop, scanning
+`sh.items.All()`, kept re-flagging these phantoms — `RebalancedPrimary` climbed at ~60/s on a
+5-node cluster after a single 100-key write batch, even though MembershipVersion, hint queues,
+merkle counters, and the `WriteApplyRefused` guard were all quiet. The fix is one
+`strings.Clone(item.Key)` call in [`applySet`](pkg/backend/dist_memory.go) before recording the
+originalPrimary and storing: the cloned key has its own backing array, fully detached from the
+caller's pooled buffer. The `Item.Key` field on the stored clone gets the same stable value so
+any downstream code observes a coherent shard entry. Post-fix the cluster's rebalance counters
+stay at exactly zero in steady state across all 5 nodes.
+- **Receiver-side ownership guard breaks the divergent-ring-view rebalance loop.** After the
+`migrateIfNeeded`-side fix (one migration per stuck key, then release) shipped, operators on a 5-node
+cluster running [`scripts/tests/30-test-cluster-writes.sh`](scripts/tests/30-test-cluster-writes.sh)
+still saw `RebalancedPrimary` climb at ~60/s post-script with no membership, hint, or merkle activity.
+Root cause: when the migration target's ring view still treated the original source as a replica, the
+target's `applySet` fan-out re-planted the key on the source. The source released it (per the earlier
+fix), then received it back on the next gossip tick, then migrated again — perpetual cycle even though
+no state was actually transitioning. New [`applyForwardedSet`](pkg/backend/dist_memory.go) is the entry
+point used by the transport-receiver paths (`InProcessTransport.ForwardSet` and the HTTP
+`/internal/set` handler) and applies an ownership guard: if the receiver's ring view says it isn't an
+owner of the key, the write is silently dropped. The sender's transport call still returns nil (no
+behavioral break — best-effort semantics already governed the hot path), but the receiver's shard
+stays clean. Merkle anti-entropy is the convergence safety net for any write refused here. The guard
+is deliberately NOT in `applySet` itself: legitimate internal callers (setImpl primary path,
+`migrateIfNeeded` forwarder, merkle pull, read-repair) have either already verified ownership or
+explicitly want to plant regardless — moving the guard would have broken `TestHTTPFetchMerkle`. New
+`dist.write.apply_refused` counter exposes how often the guard fires (zero on healthy views;
+non-zero indicates divergence operators may want to investigate). New test
+[`TestDistRebalance_ApplyOwnershipGuardRefusesForeignWrites`](tests/hypercache_distmemory_rebalance_steady_test.go)
+drives a direct `ForwardSet` to a non-owner and asserts the shard stays clean and the refused-counter
+ticks up.
+- **Rebalance counters no longer climb in a steady-state cluster.** When a key was no longer owned by the
+current node — because the ring had shifted away from it (typical after a node joins or a singleton
+cluster gains peers) — [`migrateIfNeeded`](pkg/backend/dist_memory.go) forwarded the value to the new
+primary but only scheduled the LOCAL copy for deletion when `WithDistRemovalGrace > 0`. The default
+removal-grace setting is zero, which meant the local item was never released; on every subsequent
+rebalance tick `shouldRebalance` re-flagged the same key via its `!ownsKeyInternal` branch, and
+`migrateIfNeeded` re-emitted the migration. Operators saw `RebalancedKeys` and `RebalancedPrimary`
+climb at the scan-tick rate forever — e.g. 5,326 keys / 5,102 primary migrations on a 5-node cluster
+with ~14 stuck keys and a 100ms ticker, even though no membership had actually changed. Migration now
+releases the local copy immediately when `removalGracePeriod == 0` (and continues to schedule a
+deferred delete via `shedRemovedKeys` when a grace period is configured), so each stuck key produces
+exactly one migration and the loop quiesces. Two new integration tests in
+[`tests/hypercache_distmemory_rebalance_steady_test.go`](tests/hypercache_distmemory_rebalance_steady_test.go)
+pin both contracts: `TestDistRebalance_IdleClusterIsSilent` asserts a 5-node RF=3 cluster with no
+out-of-place keys produces zero counter bumps across many ticks, and
+`TestDistRebalance_LostOwnershipDrainsOnce` plants one stuck key per node via `DebugInject` and asserts
+the counters reach exactly one bump per stuck key and never advance after that.
+- **Incarnation and MembershipVersion no longer churn on every heartbeat.** SWIM-style incarnation
+numbers and the membership version vector were both inflating roughly in lock-step with elapsed-probes —
+a 5-node cluster running for a few hours showed incarnations near 2,378 per peer and a MembershipVersion
+past 4,800, even though no nodes had actually changed state. [`Membership.Mark`](internal/cluster/membership.go)
+was unconditionally incrementing incarnation, advancing the version counter, AND firing observers on
+every call; the heartbeat-success path in `evaluateLiveness` calls `Mark(peer, NodeAlive)` once per probe
+per peer. Three downstream effects: (i) operators couldn't read incarnation as a state-change signal,
+(ii) gossip-merge fanned out spurious "version went up" deltas, (iii) SSE consumers received constant
+no-op `members` events. Mark now treats same-state as a full no-op — LastSeen still refreshes (the
+suspect-after timeout machinery needs that), but incarnation, version, and observers all stay quiet.
+Genuine state transitions (Alive↔Suspect) still bump all three, so the "higher incarnation wins" gossip
+merge continues to propagate real changes. New [`Membership.Refute`](internal/cluster/membership.go) is
+the explicit SWIM self-refute primitive: it always bumps incarnation and sets state to NodeAlive, even
+when the local view is already Alive — the one path that legitimately needs to publish a
+higher-incarnation refutation packet regardless of local-view state. `refuteIfSuspected` in
+[`pkg/backend/dist_memory.go`](pkg/backend/dist_memory.go) switched from `Mark(localID, NodeAlive)` to
+`Refute(localID)` so the divergent semantic is obvious at the call site. Five new unit tests in
+[`internal/cluster/membership_test.go`](internal/cluster/membership_test.go) pin: no-incarnation-bump on
+same-state Mark, no-version-bump and no-observer-fire on same-state Mark, bump-on-transition, refute
+always bumps, and the ghost-node guard. The existing `TestDistSWIM_SelfRefute` integration test
+continues to pass byte-identical.
 - **Remove path no longer silently succeeds when the primary is unreachable.**
 Symmetric audit-fix to the Set-forward change: [`removeImpl`](pkg/backend/dist_memory.go) used to
 swallow the `ForwardRemove` error with `_ = transport.ForwardRemove(...)` and return `nil`, so a

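The key-aliasing failure described in the first CHANGELOG entry can be reproduced outside HyperCache. The sketch below is an illustration only, not the project's code: `aliasedKey` and `storedKeyAfterReuse` are hypothetical helpers, and the zero-copy string is built with `unsafe.String` on the assumption that this approximates how a pooled request buffer backs the value returned by `c.Params("key")`.

```go
package main

import (
	"fmt"
	"strings"
	"unsafe"
)

// aliasedKey returns a string that shares memory with buf. Hypothetical
// helper standing in for a framework that hands out zero-copy strings
// over a reusable request buffer.
func aliasedKey(buf []byte) string {
	return unsafe.String(unsafe.SliceData(buf), len(buf))
}

// storedKeyAfterReuse stores a key in a map, then overwrites the buffer
// the key came from (as the next request would) and reports the key the
// map yields on iteration. With clone=true the key is detached first.
func storedKeyAfterReuse(clone bool) string {
	buf := []byte("first-24")
	key := aliasedKey(buf)

	if clone {
		// strings.Clone copies the bytes into a fresh allocation.
		key = strings.Clone(key)
	}

	m := map[string]struct{}{key: {}}

	// Simulate the framework reusing the buffer for another request.
	copy(buf, "other-99")

	for k := range m {
		return k
	}

	return ""
}

func main() {
	fmt.Println("without clone:", storedKeyAfterReuse(false))
	fmt.Println("with clone:   ", storedKeyAfterReuse(true))
}
```

Without the clone, the map entry reports the overwritten bytes even though nothing touched the map itself. With the clone, the stored key keeps its original value, which matches the behaviour the CHANGELOG entry attributes to the `strings.Clone(item.Key)` call.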
cspell.config.yaml

Lines changed: 2 additions & 0 deletions
@@ -194,6 +194,7 @@ words:
 - mset
 - msgpack
 - mvdan
+- neighbouring
 - nestif
 - Newf
 - nilnil
@@ -221,6 +222,7 @@ words:
 - pyenv
 - pygments
 - pymdownx
+- quiesces
 - reaad
 - readrepair
 - recvcheck

internal/cluster/membership.go

Lines changed: 82 additions & 2 deletions
@@ -134,7 +134,34 @@ func (m *Membership) Remove(id NodeID) bool {
 	return true
 }

-// Mark updates node state + incarnation and refreshes LastSeen. Returns true if node exists.
+// Mark records an observer-side state update for the given node.
+// Behavior depends on whether `state` represents a real transition:
+//
+//   - state == current: no-op. LastSeen refreshes (the peer answered
+//     a heartbeat probe, so its observability window resets), but
+//     incarnation, the membership version vector, and registered
+//     observers are all left alone.
+//   - state != current: incarnation bumps, version vector advances,
+//     and observers fire — this is a real membership state change
+//     and gossip-merge / SSE consumers need to know.
+//
+// Returns true if the node exists.
+//
+// Why no-op on same state: pre-fix every successful heartbeat probe
+// called Mark(peer, Alive) and unconditionally bumped both the
+// node's incarnation AND the membership version counter. Operators
+// running a 5-node cluster for a few hours saw incarnations climb
+// to ~2,378 and MembershipVersion past 4,800 — the value tracked
+// "elapsed probes," not "state changes." SWIM defines incarnation
+// as a sequence number OWNED by the node itself, and the
+// membership-version vector should signal "something real
+// happened" so gossip knows when to fan out a delta. Both must be
+// quiet during steady-state liveness churn.
+//
+// Self-refute — the one observer-side path that legitimately needs
+// to bump incarnation regardless of the local state value — lives
+// in `Refute()` so the divergent semantic is explicit at the call
+// site.
 func (m *Membership) Mark(id NodeID, state NodeState) bool {
 	m.mu.Lock()

@@ -145,9 +172,20 @@ func (m *Membership) Mark(id NodeID, state NodeState) bool {
 		return false
 	}

-	n.State = state
+	if n.State == state {
+		// Same-state no-op: refresh LastSeen so the suspect-after
+		// timeout machinery sees the probe response, but do NOT
+		// advance version / fire observers / bump incarnation.
+		// Nothing materially changed.
+		n.LastSeen = time.Now()
+		m.mu.Unlock()
+
+		return true
+	}
+
 	n.Incarnation++

+	n.State = state
 	n.LastSeen = time.Now()

 	version := m.ver.Next()
@@ -160,6 +198,48 @@ func (m *Membership) Mark(id NodeID, state NodeState) bool {
 	return true
 }

+// Refute is the SWIM self-refute primitive: when this node receives
+// gossip that it is Suspect/Dead at some incarnation N, it MUST
+// publish a higher incarnation (N+1) with state=Alive so the next
+// gossip tick disseminates the refutation cluster-wide (peers using
+// "higher incarnation wins" adopt the new value).
+//
+// Unlike `Mark()`, Refute always bumps incarnation — the local state
+// is already Alive (a node never thinks itself dead), so a
+// transition-only rule would silently no-op and the refutation would
+// fail to propagate. The dedicated method makes the divergent
+// semantic obvious at the only call site that needs it
+// (`refuteIfSuspected` in pkg/backend/dist_memory.go).
+//
+// Returns true if the node exists. State is unconditionally set to
+// NodeAlive; the typical caller already knows the local node is
+// alive but the explicit assignment guards against odd states the
+// local view might be holding.
+func (m *Membership) Refute(id NodeID) bool {
+	m.mu.Lock()
+
+	n, ok := m.nodes[id]
+	if !ok {
+		m.mu.Unlock()
+
+		return false
+	}
+
+	n.Incarnation++
+
+	n.State = NodeAlive
+	n.LastSeen = time.Now()
+
+	version := m.ver.Next()
+	observers := m.observers
+
+	m.mu.Unlock()
+
+	notify(observers, id, NodeAlive, version)
+
+	return true
+}
+
 // notify invokes each observer in registration order with the
 // resolved state and version. Pulled out so the call sites read
 // "mutate, unlock, notify" uniformly without inline loops.

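To see why the same-state short-circuit keeps the counters flat, the behaviour of `Mark` and `Refute` can be modelled in a few lines. This is a simplified sketch under stated assumptions, not the real `Membership` type: `view`, `member`, `mark`, `refute`, and `simulate` are hypothetical names, and locking, `LastSeen`, and observer notification are left out.

```go
package main

import "fmt"

type state int

const (
	alive state = iota
	suspect
)

// member is a hypothetical stand-in for one membership entry.
type member struct {
	state       state
	incarnation uint64
}

// view is a hypothetical stand-in for the membership table plus its
// version counter.
type view struct {
	nodes   map[string]*member
	version uint64
}

// mark bumps incarnation and version only on a real state transition.
func (v *view) mark(id string, s state) bool {
	n, ok := v.nodes[id]
	if !ok {
		return false
	}

	if n.state == s {
		return true // same state: nothing to publish
	}

	n.incarnation++
	n.state = s
	v.version++

	return true
}

// refute always bumps, even when the entry is already alive.
func (v *view) refute(id string) bool {
	n, ok := v.nodes[id]
	if !ok {
		return false
	}

	n.incarnation++
	n.state = alive
	v.version++

	return true
}

// simulate runs the given number of successful probes, then one
// Alive->Suspect transition and one refutation, and returns the
// resulting incarnation and version.
func simulate(probes int) (uint64, uint64) {
	v := &view{nodes: map[string]*member{"n1": {}}}

	for i := 0; i < probes; i++ {
		v.mark("n1", alive)
	}

	v.mark("n1", suspect)
	v.refute("n1")

	return v.nodes["n1"].incarnation, v.version
}

func main() {
	inc, ver := simulate(1000)
	fmt.Println("incarnation:", inc, "version:", ver)
}
```

In this model the probe count has no effect on either counter; only the transition and the refutation move them.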
internal/cluster/membership_test.go

Lines changed: 181 additions & 0 deletions
@@ -211,3 +211,184 @@ func TestMembership_OnStateChange_DoesNotBlockMutation(t *testing.T) {
 	close(releaseObserver)
 	wg.Wait()
 }
+
+// TestMembership_Mark_NoBumpOnSameState pins the
+// transition-only-bump contract: Mark() must NOT increment a node's
+// incarnation when the requested state matches the current state.
+// Without this rule, steady-state heartbeat success paths
+// (evaluateLiveness → Mark(peer, Alive)) inflate the counter once
+// per probe — operators saw incarnation values in the thousands
+// after a few hours of normal operation. Incarnation is owned by
+// the node itself in SWIM; observers should not churn it.
+func TestMembership_Mark_NoBumpOnSameState(t *testing.T) {
+	t.Parallel()
+
+	m := NewMembership(NewRing())
+	m.Upsert(NewNode("n1", "127.0.0.1:7946"))
+
+	before := m.List()[0].Incarnation
+
+	// Repeat the same Mark a few times — this models the
+	// "successful probe every second" pattern that drove the bug.
+	for range 50 {
+		m.Mark("n1", NodeAlive)
+	}
+
+	after := m.List()[0].Incarnation
+	if after != before {
+		t.Errorf("incarnation churned on same-state Mark: before=%d after=%d (want stable)",
+			before, after)
+	}
+}
+
+// TestMembership_Mark_SameStateIsFullNoOp pins the rest of the
+// no-op-Mark contract beyond just incarnation: when state matches
+// the current value, the membership version vector must NOT
+// advance and registered observers must NOT fire. Without this
+// rule, every successful heartbeat probe bumps the version
+// counter once per peer per interval — a 5-node cluster running
+// for a few hours showed MembershipVersion past 4,800 even though
+// no nodes had actually changed state. Cascading effects: gossip
+// fans out spurious "version went up" deltas, SSE consumers see
+// constant "members" event spam, and the metric stops being
+// useful as a real-membership-change indicator.
+func TestMembership_Mark_SameStateIsFullNoOp(t *testing.T) {
+	t.Parallel()
+
+	m := NewMembership(NewRing())
+	m.Upsert(NewNode("n1", "127.0.0.1:7946"))
+
+	var fired atomic.Int32
+
+	m.OnStateChange(func(_ NodeID, _ NodeState, _ uint64) {
+		fired.Add(1)
+	})
+
+	// Capture the version baseline after the Upsert above.
+	versionBefore := m.Version()
+
+	// Pound on Mark with the existing state. Models the steady
+	// "probe succeeded again" pattern that drove the bug.
+	for range 100 {
+		m.Mark("n1", NodeAlive)
+	}
+
+	if got := m.Version(); got != versionBefore {
+		t.Errorf("Version drifted on same-state Mark: before=%d after=%d (want stable)",
+			versionBefore, got)
+	}
+
+	if got := fired.Load(); got != 0 {
+		t.Errorf("observer fired %d times for same-state Mark, want 0", got)
+	}
+
+	// Sanity: LastSeen still refreshes so the suspect-timeout
+	// machinery sees probes. We can't assert exact wall-clock
+	// values cleanly here, but we can assert it advanced past the
+	// Upsert moment by being non-zero.
+	if m.List()[0].LastSeen.IsZero() {
+		t.Errorf("LastSeen wasn't refreshed by same-state Mark")
+	}
+}
+
+// TestMembership_Mark_BumpsOnTransition guards the other side of the
+// contract: a genuine state transition (Alive→Suspect, Suspect→Alive,
+// etc.) MUST bump incarnation so the gossip-merge rule
+// "higher incarnation wins" propagates the change cluster-wide. If we
+// over-suppress, transitions would silently fail to propagate and a
+// peer briefly marked Suspect would stay Suspect on neighbouring
+// nodes forever.
+func TestMembership_Mark_BumpsOnTransition(t *testing.T) {
+	t.Parallel()
+
+	m := NewMembership(NewRing())
+	m.Upsert(NewNode("n1", "127.0.0.1:7946"))
+
+	v0 := m.List()[0].Incarnation
+
+	m.Mark("n1", NodeSuspect)
+
+	v1 := m.List()[0].Incarnation
+	if v1 != v0+1 {
+		t.Errorf("Alive→Suspect: got incarnation %d, want %d", v1, v0+1)
+	}
+
+	m.Mark("n1", NodeAlive)
+
+	v2 := m.List()[0].Incarnation
+	if v2 != v1+1 {
+		t.Errorf("Suspect→Alive: got incarnation %d, want %d", v2, v1+1)
+	}
+
+	// Same-state again — must NOT bump even after recent transitions.
+	m.Mark("n1", NodeAlive)
+	m.Mark("n1", NodeAlive)
+
+	v3 := m.List()[0].Incarnation
+	if v3 != v2 {
+		t.Errorf("Alive→Alive after a transition burst: got %d, want stable at %d", v3, v2)
+	}
+}
+
+// TestMembership_Refute_AlwaysBumps pins the SWIM self-refute
+// primitive: Refute() unconditionally increments incarnation, even
+// when the local view of the node is already Alive. Without this,
+// the refutation packet a node sends back to a peer that suspected
+// it would carry the SAME incarnation as the suspect claim — and
+// "higher incarnation wins" would refuse to overwrite, so the
+// suspect claim would stick even though the node refuted it.
+func TestMembership_Refute_AlwaysBumps(t *testing.T) {
+	t.Parallel()
+
+	m := NewMembership(NewRing())
+	m.Upsert(NewNode("self", "127.0.0.1:7946"))
+
+	v0 := m.List()[0].Incarnation
+
+	// Local state is Alive — a transition-only rule would no-op
+	// here, which would silently break refutation propagation.
+	m.Refute("self")
+
+	v1 := m.List()[0].Incarnation
+	if v1 != v0+1 {
+		t.Errorf("first Refute: got incarnation %d, want %d", v1, v0+1)
+	}
+
+	// Each subsequent refute climbs one more — chained suspect
+	// claims from different peers must each be answerable with a
+	// strictly-higher incarnation.
+	m.Refute("self")
+	m.Refute("self")
+
+	v2 := m.List()[0].Incarnation
+	if v2 != v1+2 {
+		t.Errorf("chained Refute: got incarnation %d, want %d", v2, v1+2)
+	}
+
+	// State must end Alive regardless of intermediate values.
+	if got := m.List()[0].State; got != NodeAlive {
+		t.Errorf("Refute state: got %v, want NodeAlive", got)
+	}
+}
+
+// TestMembership_Refute_GhostReturnsFalse mirrors the
+// non-existent-node guard already present on Mark/Remove.
+func TestMembership_Refute_GhostReturnsFalse(t *testing.T) {
+	t.Parallel()
+
+	m := NewMembership(NewRing())
+
+	var fired atomic.Int32
+
+	m.OnStateChange(func(_ NodeID, _ NodeState, _ uint64) {
+		fired.Add(1)
+	})
+
+	if m.Refute("ghost") {
+		t.Fatal("Refute on non-existent node returned true")
+	}
+
+	if got := fired.Load(); got != 0 {
+		t.Errorf("observer fired %d times for ghost Refute, want 0", got)
+	}
+}

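The `TestMembership_Refute_AlwaysBumps` comment relies on the "higher incarnation wins" merge rule. A rough sketch of that rule shows why a refutation at the same incarnation would be ignored. The `claim` type and `merge` function are hypothetical and assume the strictly-greater comparison the test comment describes; the project's actual gossip-merge code is not shown in this commit.

```go
package main

import "fmt"

// claim is a hypothetical gossip record about one node.
type claim struct {
	Incarnation uint64
	Alive       bool
}

// merge applies a simplified "higher incarnation wins" rule: the
// remote claim replaces the local one only when its incarnation is
// strictly greater. Assumed semantics, per the test comment.
func merge(local, remote claim) claim {
	if remote.Incarnation > local.Incarnation {
		return remote
	}

	return local
}

func main() {
	// A peer holds a suspect claim about us at incarnation 5.
	suspected := claim{Incarnation: 5, Alive: false}

	// Refuting at the same incarnation does not overwrite it.
	fmt.Println(merge(suspected, claim{Incarnation: 5, Alive: true}))

	// Refuting at incarnation 6 does.
	fmt.Println(merge(suspected, claim{Incarnation: 6, Alive: true}))
}
```

Under this rule a transition-only `Mark(self, NodeAlive)` would leave the incarnation unchanged and the suspect claim would survive the merge, which is the case the dedicated `Refute` method exists to avoid.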