-
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdist_chaos_test.go
More file actions
281 lines (217 loc) · 7.72 KB
/
dist_chaos_test.go
File metadata and controls
281 lines (217 loc) · 7.72 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
package backend
import (
"context"
"errors"
"sync"
"sync/atomic"
"testing"
"time"
cache "github.com/hyp3rd/hypercache/pkg/cache/v2"
)
// recordingTransport is a test double for the transport wrapped by
// newChaosTransport. It counts ForwardSet invocations so chaos tests
// can assert "the inner transport was reached" or "the inner
// transport was NOT reached" depending on whether the chaos drop
// fired. The other DistTransport methods are stubs — chaos
// applies the same shape to every method, so testing one verb is
// enough.
type recordingTransport struct {
	// calls is the number of ForwardSet invocations observed so far;
	// it is an atomic counter so concurrent tests can bump and read it.
	calls atomic.Int64
}
// ForwardSet bumps the call counter by one and returns nil. All of its
// arguments are ignored.
func (r *recordingTransport) ForwardSet(context.Context, string, *cache.Item, bool) error {
	r.calls.Add(1)

	return nil
}
// ForwardGet is a stub: it ignores its arguments and returns a nil item,
// false, and a nil error.
func (*recordingTransport) ForwardGet(context.Context, string, string) (*cache.Item, bool, error) {
	return nil, false, nil
}
// ForwardRemove is a stub: it ignores its arguments and returns nil.
func (*recordingTransport) ForwardRemove(context.Context, string, string, bool) error {
	return nil
}
// Health is a stub: it ignores its arguments and returns nil.
func (*recordingTransport) Health(context.Context, string) error {
	return nil
}
// IndirectHealth is a stub: it ignores its arguments and returns nil.
func (*recordingTransport) IndirectHealth(context.Context, string, string) error {
	return nil
}
// Gossip is a stub: it ignores its arguments and returns nil.
func (*recordingTransport) Gossip(context.Context, string, []GossipMember) error {
	return nil
}
// FetchMerkle is a stub: it ignores its arguments and returns a nil tree
// together with a nil error.
func (*recordingTransport) FetchMerkle(context.Context, string) (*MerkleTree, error) {
	return nil, nil //nolint:nilnil // stub never invoked by chaos unit tests
}
// ListKeys is a stub: it ignores its arguments and returns a nil slice
// together with a nil error.
func (*recordingTransport) ListKeys(context.Context, string, string) ([]string, error) {
	return nil, nil
}
// TestChaos_DropRateOneAlwaysDrops verifies that with DropRate=1.0 every
// ForwardSet issued through the chaos wrapper fails with ErrChaosDrop,
// that the inner transport is never invoked, and that the drop counter
// equals the number of calls made.
func TestChaos_DropRateOneAlwaysDrops(t *testing.T) {
	t.Parallel()

	const calls = 5

	chaos := NewChaos()
	chaos.SetDropRate(1.0)

	inner := &recordingTransport{}
	wrapped := newChaosTransport(inner, chaos)
	ctx := context.Background()

	for range calls {
		err := wrapped.ForwardSet(ctx, "peer", &cache.Item{Key: "k"}, false)
		if errors.Is(err, ErrChaosDrop) {
			continue
		}

		t.Fatalf("want ErrChaosDrop, got %v", err)
	}

	// The loop above is the only caller, so a single read of each
	// counter is enough for both the check and the failure message.
	if reached := inner.calls.Load(); reached != 0 {
		t.Errorf("inner transport reached %d times; want 0", reached)
	}

	if drops := chaos.Drops(); drops != int64(calls) {
		t.Errorf("Drops counter = %d, want %d", drops, calls)
	}
}
// TestChaos_DropRateZeroNeverDrops is the inverse of the always-drop
// case: the chaos wrapper is installed with DropRate=0, so every
// ForwardSet must return nil, reach the inner transport, and leave the
// drop counter at zero.
func TestChaos_DropRateZeroNeverDrops(t *testing.T) {
	t.Parallel()

	const calls = 50

	chaos := NewChaos()
	chaos.SetDropRate(0)

	inner := &recordingTransport{}
	wrapped := newChaosTransport(inner, chaos)
	ctx := context.Background()

	for range calls {
		if err := wrapped.ForwardSet(ctx, "peer", &cache.Item{Key: "k"}, false); err != nil {
			t.Fatalf("unexpected err with DropRate=0: %v", err)
		}
	}

	if reached := inner.calls.Load(); reached != int64(calls) {
		t.Errorf("inner transport reached %d times; want %d", reached, calls)
	}

	if drops := chaos.Drops(); drops != 0 {
		t.Errorf("Drops counter = %d, want 0", drops)
	}
}
// TestChaos_LatencyFiresAndDelaysCall configures a 20ms latency with a
// rate of 1.0 so the single ForwardSet below is always delayed, then
// checks the wall-clock duration against a lower bound and confirms the
// latency counter recorded exactly one injection.
func TestChaos_LatencyFiresAndDelaysCall(t *testing.T) {
	t.Parallel()

	const (
		injected = 20 * time.Millisecond
		// floor sits a little below the injected delay: the assertion is
		// "latency was clearly injected", not "exactly 20ms".
		floor = 15 * time.Millisecond
	)

	chaos := NewChaos()
	chaos.SetLatency(injected, 1.0)

	inner := &recordingTransport{}
	wrapped := newChaosTransport(inner, chaos)

	began := time.Now()

	if err := wrapped.ForwardSet(context.Background(), "peer", &cache.Item{Key: "k"}, false); err != nil {
		t.Fatalf("unexpected err: %v", err)
	}

	if elapsed := time.Since(began); elapsed < floor {
		t.Errorf("elapsed = %v, want >= ~20ms (latency should have fired)", elapsed)
	}

	if fired := chaos.Latencies(); fired != 1 {
		t.Errorf("Latencies counter = %d, want 1", fired)
	}
}
// TestChaos_NilChaosIsNoop covers the disabled path: handing a nil Chaos
// to newChaosTransport must give back the very same inner transport
// rather than a wrapper around it.
func TestChaos_NilChaosIsNoop(t *testing.T) {
	t.Parallel()

	inner := &recordingTransport{}

	if got := newChaosTransport(inner, nil); got != inner {
		t.Fatalf("nil chaos should return inner unchanged; got distinct wrapper")
	}
}
// TestChaos_DisabledChaosLeavesCallsUntouched pins that a freshly
// constructed Chaos (no drop rate or latency configured) is functionally
// a pass-through even when the wrapper is installed: every ForwardSet
// returns nil, reaches the inner transport, and neither chaos counter
// moves.
func TestChaos_DisabledChaosLeavesCallsUntouched(t *testing.T) {
	t.Parallel()

	const calls = 100

	chaos := NewChaos() // all zero
	inner := &recordingTransport{}
	wrapped := newChaosTransport(inner, chaos)
	ctx := context.Background()

	for range calls {
		// The inner stub always succeeds, so any error here was
		// introduced by the wrapper and breaks the pass-through claim
		// even if the inner transport was still reached.
		if err := wrapped.ForwardSet(ctx, "peer", &cache.Item{Key: "k"}, false); err != nil {
			t.Fatalf("unexpected err with disabled chaos: %v", err)
		}
	}

	if reached := inner.calls.Load(); reached != int64(calls) {
		t.Errorf("inner transport reached %d times; want %d", reached, calls)
	}

	if drops, latencies := chaos.Drops(), chaos.Latencies(); drops != 0 || latencies != 0 {
		t.Errorf("disabled chaos should not increment counters: drops=%d latencies=%d",
			drops, latencies)
	}
}
// TestChaos_ConcurrentCallsAreRaceFree hammers the chaos wrapper from
// several goroutines at once with both a drop rate and a latency
// configured. It is meant to be run under `-race`: the primary failure
// mode is the race detector firing. The only content assertion is the
// accounting check that drops plus inner calls add up to the number of
// calls issued.
func TestChaos_ConcurrentCallsAreRaceFree(t *testing.T) {
	t.Parallel()

	const (
		goroutines        = 8
		callsPerGoroutine = 50
	)

	chaos := NewChaos()
	chaos.SetDropRate(0.3)
	chaos.SetLatency(1*time.Millisecond, 0.3)

	inner := &recordingTransport{}
	wrapped := newChaosTransport(inner, chaos)

	worker := func() {
		for range callsPerGoroutine {
			// The outcome of an individual call is random by design, so
			// the error is deliberately discarded; only the totals below
			// are checked.
			_ = wrapped.ForwardSet(context.Background(), "peer", &cache.Item{Key: "k"}, false)
		}
	}

	var wg sync.WaitGroup

	for range goroutines {
		wg.Go(worker)
	}

	wg.Wait()

	// All workers have finished, so the counters are stable from here on.
	drops := chaos.Drops()
	reached := inner.calls.Load()
	want := int64(goroutines * callsPerGoroutine)

	if total := drops + reached; total != want {
		t.Errorf("drops+inner_calls = %d, want %d (drops=%d, inner=%d)",
			total, want, drops, reached)
	}
}
// TestChaos_SetDropRateClampsRange pins the boundary check: values
// outside [0, 1] get clamped rather than silently producing
// undefined behavior. Tests that pass 1.5 by accident should get
// 100% drop, not garbage.
//
// Each clamp is checked three ways: the error returned to the caller,
// the drop counter, and whether the inner transport was reached.
func TestChaos_SetDropRateClampsRange(t *testing.T) {
	t.Parallel()

	chaos := NewChaos()
	chaos.SetDropRate(-0.5)

	inner := &recordingTransport{}
	wrapped := newChaosTransport(inner, chaos)
	ctx := context.Background()

	// A negative rate must behave like drop=0: the call passes through.
	err := wrapped.ForwardSet(ctx, "p", &cache.Item{Key: "k"}, false)
	if err != nil {
		t.Errorf("DropRate(-0.5) should clamp to 0; got err %v", err)
	}

	if chaos.Drops() != 0 {
		t.Errorf("DropRate(-0.5) should clamp to 0; drops=%d", chaos.Drops())
	}

	if reached := inner.calls.Load(); reached != 1 {
		t.Errorf("DropRate(-0.5) should clamp to 0; inner transport reached %d times, want 1", reached)
	}

	// A rate above 1 must behave like drop=1: the call is always dropped
	// before it reaches the inner transport.
	chaos.SetDropRate(1.5)

	err = wrapped.ForwardSet(ctx, "p", &cache.Item{Key: "k"}, false)
	if err == nil {
		t.Errorf("DropRate(1.5) should clamp to 1.0 (always drop); got nil err")
	}

	if chaos.Drops() != 1 {
		t.Errorf("DropRate(1.5) should clamp to 1.0 (always drop); drops=%d", chaos.Drops())
	}

	// The inner count must still be 1: only the first call got through.
	if reached := inner.calls.Load(); reached != 1 {
		t.Errorf("DropRate(1.5) should clamp to 1.0 (always drop); inner transport reached %d times, want 1", reached)
	}
}
// TestChaos_NilReceiverIsSafe exercises the nil-*Chaos contract: the
// counter accessors must return 0 and the mutators must not panic when
// invoked on a nil receiver. The test passes simply by reaching the end
// without a panic and with both counters reading zero.
func TestChaos_NilReceiverIsSafe(t *testing.T) {
	t.Parallel()

	var c *Chaos

	drops := c.Drops()
	if drops != 0 {
		t.Errorf("nil.Drops() = %d, want 0", drops)
	}

	latencies := c.Latencies()
	if latencies != 0 {
		t.Errorf("nil.Latencies() = %d, want 0", latencies)
	}

	// The mutators are expected to be silent no-ops on nil; a panic here
	// fails the test.
	c.SetDropRate(0.5)
	c.SetLatency(time.Second, 1.0)
}