Skip to content

Commit 606e824

Browse files
authored
fix(dist): plug Remove silent-swallow and hint-replay abandon on transport errors (#132)
2 parents b64c273 + d344c2e commit 606e824

5 files changed

Lines changed: 240 additions & 30 deletions

File tree

CHANGELOG.md

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,34 @@ All notable changes to HyperCache are recorded here. The format follows
286286

287287
### Fixed
288288

289+
- **Remove path no longer silently succeeds when the primary is unreachable.**
290+
Symmetric audit-fix to the Set-forward change: [`removeImpl`](pkg/backend/dist_memory.go) used to
291+
swallow the `ForwardRemove` error with `_ = transport.ForwardRemove(...)` and return `nil`, so a
292+
Remove against a downed primary "succeeded" while the stale value lingered on every owner. Promotion
293+
is now extracted into `forwardOrPromoteRemove`, mirroring `handleForwardPrimary`'s contract: on any
294+
non-nil error, if the local node is a replica owner, apply the remove locally + fan out to peer
295+
replicas via the existing `applyRemove(replicate=true)` path; otherwise return the error. The
296+
promotion path bumps the shared `dist.write.forward_promotion` counter, so operators see Set + Remove
297+
promotions on the same observable instrument. The dead primary catches up via merkle anti-entropy on
298+
restart — the same convergence mechanism that already handles replica-side tombstones in
299+
`replicateRemoveWithSpan`. New test [`TestDistRemove_PromotesOnGenericForwardError`](tests/hypercache_distmemory_audit_fixes_test.go)
300+
drives chaos at `DropRate=1.0` and asserts the Remove returns `nil` (promotion succeeded), the local
301+
copy is cleared, and the promotion counter is bumped.
302+
- **Hint replay retains the queue on any transient transport error.**
303+
[`processHint`](pkg/backend/dist_memory.go) used to drop the hint unless the in-process
304+
`errors.Is(err, sentinel.ErrBackendNotFound)` matched. Production HTTP/gRPC transports surface
305+
`net.OpError` / `io.EOF` / `context.DeadlineExceeded` for a peer that's mid-restart or briefly
306+
unreachable — none of which matched the gate, so the hint was abandoned on its very first replay
307+
attempt instead of being retained through the outage. This is the exact failure mode behind the
308+
`recovery on :8083 timed out after 60s: pre=50/50, during=43/50` symptom in the cluster-resilience
309+
workflow: even with the Set-forward promotion in place, the hint queue lost the writes to the dead
310+
primary before it came back. Now any non-nil error retains the hint; the configured `WithDistHintTTL`
311+
bounds total retry time, so a permanently-broken target still drains. The deprecated `HintedDropped`
312+
/ `MigrationHintDropped` OTel counters remain registered for stability but now only bump on
313+
queue-capacity overflow, not replay errors. New test
314+
[`TestDistHintReplay_RetainsOnGenericReplayError`](tests/hypercache_distmemory_audit_fixes_test.go)
315+
forces a 150ms window of failed replays under chaos, heals chaos, and asserts the hint still replays
316+
onto the recovered peer.
289317
- **Set-forward promotion no longer requires the in-process `ErrBackendNotFound` sentinel, and the dead
290318
primary now converges via the hint queue (not just the next merkle tick).**
291319
[`handleForwardPrimary`](pkg/backend/dist_memory.go) used to gate "primary unreachable → promote to

cspell.config.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ words:
5353
- benchstat
5454
- benchtime
5555
- bitnami
56+
- blackholed
5657
- bodyclose
5758
- bufbuild
5859
- buildx

pkg/backend/dist_memory.go

Lines changed: 55 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1255,13 +1255,13 @@ type distMetrics struct {
12551255
hintedQueued atomic.Int64 // hints queued (both sources)
12561256
hintedReplayed atomic.Int64 // hints successfully replayed
12571257
hintedExpired atomic.Int64 // hints expired before delivery
1258-
hintedDropped atomic.Int64 // hints dropped due to non-not-found transport errors
1258+
hintedDropped atomic.Int64 // Deprecated: no longer bumped — processHint retains on any error; OTel-registered for stability
12591259
hintedGlobalDropped atomic.Int64 // hints dropped due to global caps (count/bytes)
12601260
hintedBytes atomic.Int64 // approximate total bytes currently queued (best-effort)
12611261
migrationHintQueued atomic.Int64 // subset of hintedQueued: rebalance migration source only
12621262
migrationHintReplayed atomic.Int64 // subset of hintedReplayed: migration-source hints that drained
12631263
migrationHintExpired atomic.Int64 // subset of hintedExpired: migration-source hints aged out
1264-
migrationHintDropped atomic.Int64 // subset of hintedDropped + hintedGlobalDropped: migration-source hints that died
1264+
migrationHintDropped atomic.Int64 // migration-source hints dropped by global cap overflow (replay errors no longer bump this)
12651265
migrationHintLastAgeNanos atomic.Int64 // queue residency of the most-recently-replayed migration hint (ns)
12661266
merkleSyncs atomic.Int64 // merkle sync operations completed
12671267
merkleKeysPulled atomic.Int64 // keys applied during sync
@@ -3140,24 +3140,23 @@ func (dm *DistMemory) processHint(ctx context.Context, nodeID string, entry hint
31403140
return 1
31413141
}
31423142

3143-
if errors.Is(err, sentinel.ErrBackendNotFound) { // keep – backend still absent
3144-
return 0
3145-
}
3146-
3147-
dm.metrics.hintedDropped.Add(1)
3148-
3149-
if entry.source == hintSourceMigration {
3150-
dm.metrics.migrationHintDropped.Add(1)
3151-
}
3152-
3153-
dm.logger.Warn(
3154-
"hint dropped after replay error",
3143+
// Retain on any non-nil error. Pre-fix this dropped the hint
3144+
// unless the in-process `ErrBackendNotFound` sentinel matched,
3145+
// but production HTTP/gRPC transports surface `net.OpError`,
3146+
// `io.EOF`, or `context.DeadlineExceeded` for a briefly-
3147+
// unreachable peer (mid-restart, network blip), causing the
3148+
// hint to be abandoned on its very first replay attempt
3149+
// instead of being retained through the outage. The hint TTL
3150+
// bounds total retry time, so a permanently-broken target
3151+
// still drains; flapping targets converge once they stabilize.
3152+
dm.logger.Debug(
3153+
"hint replay attempt failed; retaining for retry",
31553154
slog.String("peer_id", nodeID),
31563155
slog.String("key", entry.item.Key),
31573156
slog.Any("err", err),
31583157
)
31593158

3160-
return 1
3159+
return 0
31613160
}
31623161

31633162
// --- Simple gossip (in-process only) ---.
@@ -3908,14 +3907,52 @@ func (dm *DistMemory) removeImpl(ctx context.Context, keys []string) error {
39083907
return sentinel.ErrNotOwner
39093908
}
39103909

3911-
dm.metrics.forwardRemove.Add(1)
3912-
3913-
_ = transport.ForwardRemove(ctx, string(owners[0]), key, true)
3910+
err := dm.forwardOrPromoteRemove(ctx, transport, key, owners)
3911+
if err != nil {
3912+
return err
3913+
}
39143914
}
39153915

39163916
return nil
39173917
}
39183918

3919+
// forwardOrPromoteRemove forwards a Remove to the listed primary; on
3920+
// any non-nil transport error, promotes to a local replica owner if
3921+
// possible (apply locally + fan out to peer replicas). This mirrors
3922+
// handleForwardPrimary's promotion contract for the write path —
3923+
// pre-fix the Remove path silently swallowed the forward error and
3924+
// returned `nil` to the caller, so a `docker stop` of the primary
3925+
// caused deletes to be lost on every owner. The dead primary catches
3926+
// up via merkle anti-entropy on restart, the same convergence
3927+
// mechanism that already handles replica-side tombstones in
3928+
// `replicateRemoveWithSpan`.
3929+
func (dm *DistMemory) forwardOrPromoteRemove(
3930+
ctx context.Context,
3931+
transport DistTransport,
3932+
key string,
3933+
owners []cluster.NodeID,
3934+
) error {
3935+
dm.metrics.forwardRemove.Add(1)
3936+
3937+
err := transport.ForwardRemove(ctx, string(owners[0]), key, true)
3938+
if err == nil {
3939+
return nil
3940+
}
3941+
3942+
if len(owners) > 1 {
3943+
for _, oid := range owners[1:] {
3944+
if oid == dm.localNode.ID && dm.ownsKeyInternal(key) {
3945+
dm.metrics.writeForwardPromotion.Add(1)
3946+
dm.applyRemove(ctx, key, true)
3947+
3948+
return nil
3949+
}
3950+
}
3951+
}
3952+
3953+
return err
3954+
}
3955+
39193956
// distMetricSpec describes one OTel observable instrument backed by a
39203957
// field on DistMetrics. The kind selects between cumulative-counter and
39213958
// gauge semantics (OTel exporters render them differently); `get` reads

pkg/backend/dist_migration_hint_test.go

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -213,12 +213,17 @@ func TestMigrationHint_ExpiredBumpsPerSourceCounter(t *testing.T) {
213213
}
214214
}
215215

216-
// TestMigrationHint_TransportErrorBumpsDroppedCounter pins the drop
217-
// path: when the transport returns a non-NotFound error (auth failure,
218-
// 5xx, parse error), the hint is removed and the per-source counter
219-
// bumps. ErrBackendNotFound stays "keep" — that path is exercised in
220-
// the next test.
221-
func TestMigrationHint_TransportErrorBumpsDroppedCounter(t *testing.T) {
216+
// TestMigrationHint_TransportErrorKeepsEntry pins the new
217+
// retain-on-any-error contract: when a hint replay fails with any
218+
// non-nil transport error (e.g. net.OpError, io.EOF, a scripted
219+
// generic error like here), the hint stays queued for the next
220+
// replay tick. Pre-fix this branch dropped the hint and bumped
221+
// hintedDropped / migrationHintDropped; the keep path only matched the
222+
// in-process ErrBackendNotFound sentinel — production HTTP/gRPC
223+
// transports surfaced a different error class and lost the hint on
224+
// the first replay attempt. TTL bounds total retry time, so a
225+
// permanently-broken target still drains.
226+
func TestMigrationHint_TransportErrorKeepsEntry(t *testing.T) {
222227
t.Parallel()
223228

224229
dm, transport := newMigrationHintTestDM(t)
@@ -234,16 +239,16 @@ func TestMigrationHint_TransportErrorBumpsDroppedCounter(t *testing.T) {
234239
source: hintSourceMigration,
235240
}
236241

237-
if action := dm.processHint(context.Background(), "peer-B", migEntry, now); action != 1 {
238-
t.Errorf("transport error: want action=1 (remove), got %d", action)
242+
if action := dm.processHint(context.Background(), "peer-B", migEntry, now); action != 0 {
243+
t.Errorf("transport error: want action=0 (keep for retry), got %d", action)
239244
}
240245

241-
if got := dm.metrics.hintedDropped.Load(); got != 1 {
242-
t.Errorf("aggregate hintedDropped: want 1, got %d", got)
246+
if got := dm.metrics.hintedDropped.Load(); got != 0 {
247+
t.Errorf("aggregate hintedDropped on retainable error: want 0 (no longer bumped on replay failures), got %d", got)
243248
}
244249

245-
if got := dm.metrics.migrationHintDropped.Load(); got != 1 {
246-
t.Errorf("migrationHintDropped: want 1, got %d", got)
250+
if got := dm.metrics.migrationHintDropped.Load(); got != 0 {
251+
t.Errorf("migrationHintDropped on retainable error: want 0, got %d", got)
247252
}
248253
}
249254

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
package tests
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
8+
"github.com/hyp3rd/hypercache/internal/cluster"
9+
"github.com/hyp3rd/hypercache/pkg/backend"
10+
cache "github.com/hyp3rd/hypercache/pkg/cache/v2"
11+
)
12+
13+
// TestDistRemove_PromotesOnGenericForwardError pins the resilience
14+
// contract for the Remove path symmetric to the Set path: a
15+
// `ForwardRemove` that fails for any reason (not just the in-process
16+
// `ErrBackendNotFound` sentinel) must promote to a local replica
17+
// owner and apply the remove locally + fan out to peer replicas.
18+
// Pre-fix the error was blackholed via `_ = transport.ForwardRemove(...)`,
19+
// so Remove silently succeeded on a downed primary while leaving the
20+
// stale value on every owner.
21+
func TestDistRemove_PromotesOnGenericForwardError(t *testing.T) {
22+
t.Parallel()
23+
24+
chaos := backend.NewChaos()
25+
26+
dc := SetupInProcessClusterRF(
27+
t, 3, 3,
28+
backend.WithDistChaos(chaos),
29+
backend.WithDistWriteConsistency(backend.ConsistencyOne),
30+
)
31+
32+
a := dc.Nodes[0]
33+
b := dc.Nodes[1]
34+
c := dc.Nodes[2]
35+
36+
desired := []cluster.NodeID{b.LocalNodeID(), a.LocalNodeID(), c.LocalNodeID()}
37+
38+
key, ok := FindOwnerKey(a, "remove-promote-", desired, 5000)
39+
if !ok {
40+
t.Fatalf("could not find key with owner ordering [B, A, C]")
41+
}
42+
43+
// Seed: write the key while chaos is off so it lands on every
44+
// owner via the normal replication path.
45+
err := a.Set(context.Background(), &cache.Item{Key: key, Value: "v1"})
46+
if err != nil {
47+
t.Fatalf("seed Set: %v", err)
48+
}
49+
50+
if !a.LocalContains(key) {
51+
t.Fatalf("seed: A.LocalContains is false; replication failed")
52+
}
53+
54+
// Now block every forward and call Remove from A. The Remove
55+
// must NOT silently succeed; it must promote and clear the
56+
// local copy. The promotion counter (shared with the Set path
57+
// fix) must bump at least once.
58+
chaos.SetDropRate(1.0)
59+
60+
rerr := a.Remove(context.Background(), key)
61+
if rerr != nil {
62+
t.Fatalf("Remove: got %v, want nil (promotion should have succeeded)", rerr)
63+
}
64+
65+
if a.LocalContains(key) {
66+
t.Errorf("LocalContains(%q) after promoted Remove: got true, want false (local applyRemove didn't fire)",
67+
key)
68+
}
69+
70+
if got := a.Metrics().WriteForwardPromotion; got == 0 {
71+
t.Errorf("WriteForwardPromotion: got 0, want > 0 (Remove promotion didn't bump the counter)")
72+
}
73+
}
74+
75+
// TestDistHintReplay_RetainsOnGenericReplayError pins the hint-queue
76+
// recovery contract: a replay attempt that fails with anything other
77+
// than the in-process `ErrBackendNotFound` sentinel — including the
78+
// network errors production transports surface — must keep the hint
79+
// queued for a later retry, not abandon it on the first failure.
80+
// Pre-fix the hint was dropped on `net.OpError` / `io.EOF` etc.,
81+
// meaning a peer that was briefly unreachable during replay lost the
82+
// queued writes entirely.
83+
func TestDistHintReplay_RetainsOnGenericReplayError(t *testing.T) {
84+
t.Parallel()
85+
86+
chaos := backend.NewChaos()
87+
88+
dc := SetupInProcessClusterRF(
89+
t, 3, 3,
90+
backend.WithDistChaos(chaos),
91+
backend.WithDistWriteConsistency(backend.ConsistencyOne),
92+
backend.WithDistHintTTL(time.Minute),
93+
backend.WithDistHintReplayInterval(20*time.Millisecond),
94+
)
95+
96+
a := dc.Nodes[0]
97+
b := dc.Nodes[1]
98+
c := dc.Nodes[2]
99+
100+
desired := []cluster.NodeID{b.LocalNodeID(), a.LocalNodeID(), c.LocalNodeID()}
101+
102+
key, ok := FindOwnerKey(a, "hint-retain-", desired, 5000)
103+
if !ok {
104+
t.Fatalf("could not find key with owner ordering [B, A, C]")
105+
}
106+
107+
// Drop every forward, then write. The Set promotes locally and
108+
// queues a hint for the dead primary B; the replay tick fires
109+
// shortly after, also fails (chaos still on), and pre-fix would
110+
// have dropped the hint.
111+
chaos.SetDropRate(1.0)
112+
113+
err := a.Set(context.Background(), &cache.Item{Key: key, Value: "v1"})
114+
if err != nil {
115+
t.Fatalf("Set: %v", err)
116+
}
117+
118+
queued := a.Metrics().HintedQueued
119+
if queued == 0 {
120+
t.Fatalf("HintedQueued: got 0, want > 0 (promotion didn't queue a hint)")
121+
}
122+
123+
// Let the replay loop tick a few times against the still-chaotic
124+
// transport. Pre-fix: the very first tick drops the hint and
125+
// HintedReplayed stays 0 forever even after chaos clears.
126+
time.Sleep(150 * time.Millisecond)
127+
128+
// Now heal chaos. The retained hint replays on the next tick
129+
// and lands on B.
130+
chaos.SetDropRate(0)
131+
132+
if !waitForLocalContains(b, key, 2*time.Second) {
133+
t.Errorf("primary did not receive the write after chaos cleared (hint was dropped on the failed replay attempts instead of being retained)")
134+
}
135+
136+
if got := a.Metrics().HintedReplayed; got == 0 {
137+
t.Errorf("HintedReplayed: got 0, want > 0 (hint was abandoned before chaos cleared)")
138+
}
139+
}

0 commit comments

Comments
 (0)