-
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdist_read_repair_test.go
More file actions
433 lines (343 loc) · 12 KB
/
dist_read_repair_test.go
File metadata and controls
433 lines (343 loc) · 12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
package backend
import (
"context"
"log/slog"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/hyp3rd/hypercache/internal/cluster"
cache "github.com/hyp3rd/hypercache/pkg/cache/v2"
)
// captureTransport records every ForwardSet call's (peer, key,
// version) tuple so unit tests can assert "exactly these repairs
// fired". Other DistTransport methods are stubs — the queue only
// dispatches via ForwardSet, so the other verbs aren't exercised.
type captureTransport struct {
mu sync.Mutex
calls []capturedSet
// flushDelay simulates per-call latency so parallelism is
// visible in wall-clock measurements (used by the parallel-flush
// test). Zero disables the delay.
flushDelay time.Duration
}
type capturedSet struct {
peer string
key string
version uint64
}
func (c *captureTransport) ForwardSet(_ context.Context, peer string, item *cache.Item, _ bool) error {
if c.flushDelay > 0 {
time.Sleep(c.flushDelay)
}
c.mu.Lock()
defer c.mu.Unlock()
c.calls = append(c.calls, capturedSet{peer: peer, key: item.Key, version: item.Version})
return nil
}
func (*captureTransport) ForwardGet(_ context.Context, _, _ string) (*cache.Item, bool, error) {
return nil, false, nil
}
func (*captureTransport) ForwardRemove(_ context.Context, _, _ string, _ bool) error {
return nil
}
func (*captureTransport) Health(_ context.Context, _ string) error { return nil }
func (*captureTransport) IndirectHealth(_ context.Context, _, _ string) error { return nil }
func (*captureTransport) Gossip(_ context.Context, _ string, _ []GossipMember) error {
return nil
}
func (*captureTransport) FetchMerkle(_ context.Context, _ string) (*MerkleTree, error) {
return nil, nil //nolint:nilnil // stub never invoked by these unit tests
}
func (*captureTransport) ListKeys(_ context.Context, _, _ string) ([]string, error) {
return nil, nil
}
func (c *captureTransport) callCount() int {
c.mu.Lock()
defer c.mu.Unlock()
return len(c.calls)
}
// newQueueForTest builds a repairQueue wired to the supplied
// transport. Interval is set high enough (5s) that no tick fires
// during the test body unless the test explicitly flushes — that
// keeps the assertions deterministic.
func newQueueForTest(transport DistTransport, batchSize int) (*repairQueue, *distMetrics) {
metrics := &distMetrics{}
q := newRepairQueue(
5*time.Second, // interval far enough out that tick doesn't fire mid-test
batchSize,
func() DistTransport { return transport },
metrics,
slog.New(slog.DiscardHandler),
)
return q, metrics
}
// TestRepairQueue_CoalesceByPeerKey pins the central contract:
// two enqueues for the same (peer, key) collapse to one entry
// (highest version wins), and ReadRepairCoalesced bumps for
// every collapsed duplicate.
func TestRepairQueue_CoalesceByPeerKey(t *testing.T) {
t.Parallel()
transport := &captureTransport{}
q, metrics := newQueueForTest(transport, 100)
peer := cluster.NodeID("peer-1")
// First enqueue: version 1.
q.enqueue(context.Background(), peer, &cache.Item{Key: "k1", Version: 1, Origin: "A"})
// Second enqueue: same key, higher version → replaces, coalesce++.
q.enqueue(context.Background(), peer, &cache.Item{Key: "k1", Version: 2, Origin: "A"})
// Third enqueue: same key, LOWER version than current → dropped, coalesce++.
q.enqueue(context.Background(), peer, &cache.Item{Key: "k1", Version: 1, Origin: "A"})
// Flush and inspect what actually went on the wire.
q.flushAll(context.Background())
if got := transport.callCount(); got != 1 {
t.Errorf("ForwardSet calls: got %d, want 1 (three enqueues should coalesce to one)", got)
}
if got := metrics.readRepairCoalesced.Load(); got != 2 {
t.Errorf("readRepairCoalesced: got %d, want 2 (two duplicates collapsed)", got)
}
if got := transport.calls[0].version; got != 2 {
t.Errorf("dispatched version: got %d, want 2 (highest of {1,2,1})", got)
}
}
// TestRepairQueue_DistinctPeersAreIndependent pins that the queue
// keys correctly: same key to different peers are NOT coalesced.
func TestRepairQueue_DistinctPeersAreIndependent(t *testing.T) {
t.Parallel()
transport := &captureTransport{}
q, metrics := newQueueForTest(transport, 100)
q.enqueue(context.Background(), cluster.NodeID("peer-A"), &cache.Item{Key: "k1", Version: 1, Origin: "X"})
q.enqueue(context.Background(), cluster.NodeID("peer-B"), &cache.Item{Key: "k1", Version: 1, Origin: "X"})
q.flushAll(context.Background())
if got := transport.callCount(); got != 2 {
t.Errorf("ForwardSet calls: got %d, want 2 (different peers, no coalesce)", got)
}
if got := metrics.readRepairCoalesced.Load(); got != 0 {
t.Errorf("readRepairCoalesced: got %d, want 0", got)
}
}
// TestRepairQueue_FlushPeerRunsParallel pins the per-peer
// parallelism contract. Each peer's flush dispatches its entries
// in parallel via errgroup. We measure wall-clock against a stub
// transport that sleeps per-call; sequential dispatch would take
// N×delay, parallel takes ~delay.
func TestRepairQueue_FlushPeerRunsParallel(t *testing.T) {
t.Parallel()
const (
entries = 6
perCallLatency = 50 * time.Millisecond
)
transport := &captureTransport{flushDelay: perCallLatency}
q, _ := newQueueForTest(transport, 100)
peer := cluster.NodeID("peer-1")
for i := range entries {
q.enqueue(context.Background(), peer, &cache.Item{Key: keyfmt(i), Version: 1, Origin: "A"})
}
start := time.Now()
q.flushPeer(context.Background(), peer)
elapsed := time.Since(start)
// Sequential would take ≥ entries × delay = 300ms.
// Parallel via errgroup completes in ~delay (plus scheduling slack).
maxParallel := 3 * perCallLatency // generous ceiling for CI scheduler noise
if elapsed > maxParallel {
t.Errorf("flushPeer wall-clock = %v, want < %v (per-peer dispatch should be parallel)",
elapsed, maxParallel)
}
if got := transport.callCount(); got != entries {
t.Errorf("dispatched: got %d, want %d", got, entries)
}
}
// TestRepairQueue_NilTransportIsNoop documents the defensive path:
// if the transport closure returns nil mid-flush (e.g. Stop
// landed between two flushes), the queue drops the entries without
// panicking. Merkle anti-entropy is the safety net for any repair
// that doesn't make it to the wire.
func TestRepairQueue_NilTransportIsNoop(t *testing.T) {
t.Parallel()
metrics := &distMetrics{}
q := newRepairQueue(
5*time.Second, 100,
func() DistTransport { return nil },
metrics,
slog.New(slog.DiscardHandler),
)
q.enqueue(context.Background(), cluster.NodeID("peer-1"), &cache.Item{Key: "k1", Version: 1})
// Should not panic.
q.flushAll(context.Background())
// Queue is drained even when transport is nil — the entries
// are dropped, not retained, because the queue can't know
// when transport will return non-nil again.
q.mu.Lock()
remaining := len(q.entries)
q.mu.Unlock()
if remaining != 0 {
t.Errorf("entries remaining after nil-transport flush: got %d, want 0", remaining)
}
}
// TestRepairQueue_StopDrainsPending pins the clean-shutdown
// contract: stop() doesn't return until pending entries have been
// flushed. The flusher goroutine's final flushAll runs on the
// stopCh path.
func TestRepairQueue_StopDrainsPending(t *testing.T) {
t.Parallel()
transport := &captureTransport{}
metrics := &distMetrics{}
q := newRepairQueue(
10*time.Second, // long interval — only stop() drives the final flush
100,
func() DistTransport { return transport },
metrics,
slog.New(slog.DiscardHandler),
)
q.start(context.Background())
q.enqueue(context.Background(), cluster.NodeID("peer-1"), &cache.Item{Key: "k1", Version: 1, Origin: "A"})
q.enqueue(context.Background(), cluster.NodeID("peer-2"), &cache.Item{Key: "k2", Version: 1, Origin: "A"})
q.stop()
if got := transport.callCount(); got != 2 {
t.Errorf("ForwardSet calls after stop: got %d, want 2 (stop must drain)", got)
}
}
// TestRepairQueue_SizeThresholdFlush pins that hitting
// maxBatchSize triggers an inline flush of that peer's entries
// without waiting for the interval tick.
func TestRepairQueue_SizeThresholdFlush(t *testing.T) {
t.Parallel()
transport := &captureTransport{}
q, _ := newQueueForTest(transport, 3) // tiny batch — easy to trip
peer := cluster.NodeID("peer-1")
q.enqueue(context.Background(), peer, &cache.Item{Key: "k1", Version: 1, Origin: "A"})
q.enqueue(context.Background(), peer, &cache.Item{Key: "k2", Version: 1, Origin: "A"})
if transport.callCount() != 0 {
t.Fatalf("flush fired before threshold: %d calls", transport.callCount())
}
// Third enqueue trips threshold (entries map hits maxBatchSize=3).
// The flush runs in a background goroutine; poll for completion.
q.enqueue(context.Background(), peer, &cache.Item{Key: "k3", Version: 1, Origin: "A"})
deadline := time.Now().Add(2 * time.Second)
for time.Now().Before(deadline) {
if transport.callCount() == 3 {
break
}
time.Sleep(5 * time.Millisecond)
}
if got := transport.callCount(); got != 3 {
t.Errorf("ForwardSet calls: got %d, want 3 (size-threshold should have triggered flush)", got)
}
}
// TestIsHigherVersion pins the chosen-version tie-break rule
// the coalescer uses. Mirrors repairLocalReplica's logic in
// dist_memory.go so the local and queued paths agree on which
// item wins.
func TestIsHigherVersion(t *testing.T) {
t.Parallel()
tests := []struct {
name string
candidate cache.Item
existing cache.Item
want bool
}{
{
"higher version wins",
cache.Item{Version: 2, Origin: "A"},
cache.Item{Version: 1, Origin: "A"},
true,
},
{
"lower version loses",
cache.Item{Version: 1, Origin: "A"},
cache.Item{Version: 2, Origin: "A"},
false,
},
{
"same version, lower origin wins",
cache.Item{Version: 1, Origin: "A"},
cache.Item{Version: 1, Origin: "B"},
true,
},
{
"same version, higher origin loses",
cache.Item{Version: 1, Origin: "B"},
cache.Item{Version: 1, Origin: "A"},
false,
},
{
"same version, same origin loses (no replace)",
cache.Item{Version: 1, Origin: "A"},
cache.Item{Version: 1, Origin: "A"},
false,
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
candidate := tc.candidate
existing := tc.existing
if got := isHigherVersion(&candidate, &existing); got != tc.want {
t.Errorf("isHigherVersion(%+v, %+v) = %v, want %v",
candidate, existing, got, tc.want)
}
})
}
}
// TestRepairQueue_ConcurrentEnqueueIsRaceFree drives many
// goroutines through enqueue + concurrent flushes; race detector
// catches any unsynchronized access. Failure manifests as a -race
// trip, not a content assertion.
func TestRepairQueue_ConcurrentEnqueueIsRaceFree(t *testing.T) {
t.Parallel()
transport := &captureTransport{}
q, _ := newQueueForTest(transport, 1000)
const (
goroutines = 8
enqueuesPerWorker = 50
)
var wg sync.WaitGroup
for g := range goroutines {
gid := g
wg.Go(func() {
peer := cluster.NodeID("peer-" + keyfmt(gid%3))
for i := range enqueuesPerWorker {
q.enqueue(context.Background(), peer, &cache.Item{
Key: keyfmt(i),
Version: uint64(i + 1),
Origin: "A",
})
}
})
}
wg.Wait()
q.flushAll(context.Background())
// Sanity: at least one dispatch happened.
if transport.callCount() == 0 {
t.Errorf("no ForwardSet calls after concurrent enqueue (race or fixture bug)")
}
}
// keyfmt is a tiny formatter helper so the tests aren't full of
// strconv.Itoa noise. Public-shaped so it's reusable from sibling
// tests in this package.
func keyfmt(i int) string {
return "k" + itoa(i)
}
func itoa(i int) string {
if i == 0 {
return "0"
}
var buf [20]byte
pos := len(buf)
for i > 0 {
pos--
buf[pos] = byte('0' + i%10)
i /= 10
}
return string(buf[pos:])
}
// Compile-time guard: captureTransport must satisfy DistTransport.
var _ DistTransport = (*captureTransport)(nil)
// Compile-time check on metrics shape — if the field is removed
// the test file fails to compile, surfacing the breakage at
// `go build` rather than a runtime nil deref.
var _ = func() {
var m distMetrics
_ = m.readRepairBatched.Load()
_ = m.readRepairCoalesced.Load()
_ = atomic.Int64{}
}