Skip to content

Commit 2f1dade

Browse files
authored
test: fix flaky reserve worker startup (#5384)
1 parent ece6db4 commit 2f1dade

5 files changed

Lines changed: 111 additions & 24 deletions

File tree

pkg/node/node.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1131,7 +1131,8 @@ func NewBee(
11311131
pullerService = puller.New(swarmAddress, stateStore, kad, localStore, pullSyncProtocol, p2ps, logger, puller.Options{})
11321132
b.pullerCloser = pullerService
11331133

1134-
localStore.StartReserveWorker(ctx, pullerService, waitNetworkRFunc)
1134+
// we pass a nil channel since startup synchronization is not needed in production code, only in tests.
1135+
localStore.StartReserveWorker(ctx, pullerService, waitNetworkRFunc, nil)
11351136
nodeStatus.SetSync(pullerService)
11361137

11371138
// measure full sync duration

pkg/storer/compact_test.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,9 @@ func TestCompact(t *testing.T) {
3535
if err != nil {
3636
t.Fatal(err)
3737
}
38-
st.StartReserveWorker(ctx, pullerMock.NewMockRateReporter(0), networkRadiusFunc(0))
38+
readyC := make(chan struct{})
39+
st.StartReserveWorker(ctx, pullerMock.NewMockRateReporter(0), networkRadiusFunc(0), readyC)
40+
<-readyC
3941

4042
batches := []*postage.Batch{postagetesting.MustNewBatch(), postagetesting.MustNewBatch(), postagetesting.MustNewBatch()}
4143
evictBatch := batches[1]
@@ -134,7 +136,9 @@ func TestCompactNoEvictions(t *testing.T) {
134136
if err != nil {
135137
t.Fatal(err)
136138
}
137-
st.StartReserveWorker(ctx, pullerMock.NewMockRateReporter(0), networkRadiusFunc(0))
139+
readyC := make(chan struct{})
140+
st.StartReserveWorker(ctx, pullerMock.NewMockRateReporter(0), networkRadiusFunc(0), readyC)
141+
<-readyC
138142

139143
batches := []*postage.Batch{postagetesting.MustNewBatch(), postagetesting.MustNewBatch(), postagetesting.MustNewBatch()}
140144

pkg/storer/reserve.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ func threshold(capacity int) int { return capacity * 5 / 10 }
4747
func (db *DB) startReserveWorkers(
4848
ctx context.Context,
4949
radius func() (uint8, error),
50+
ready chan<- struct{},
5051
) {
5152
ctx, cancel := context.WithCancel(ctx)
5253
go func() {
@@ -55,7 +56,7 @@ func (db *DB) startReserveWorkers(
5556
}()
5657

5758
db.inFlight.Add(1)
58-
go db.reserveWorker(ctx)
59+
go db.reserveWorker(ctx, ready)
5960

6061
sub, unsubscribe := db.reserveOptions.startupStabilizer.Subscribe()
6162
defer unsubscribe()
@@ -117,7 +118,7 @@ func (db *DB) countWithinRadius(ctx context.Context) (int, error) {
117118
return count, err
118119
}
119120

120-
func (db *DB) reserveWorker(ctx context.Context) {
121+
func (db *DB) reserveWorker(ctx context.Context, ready chan<- struct{}) {
121122
defer db.inFlight.Done()
122123

123124
batchExpiryTrigger, batchExpiryUnsub := db.events.Subscribe(batchExpiry)
@@ -135,6 +136,10 @@ func (db *DB) reserveWorker(ctx context.Context) {
135136
db.events.Trigger(reserveOverCapacity)
136137
}
137138

139+
if ready != nil {
140+
close(ready)
141+
}
142+
138143
for {
139144
select {
140145
case <-ctx.Done():
@@ -467,7 +472,6 @@ func (db *DB) SubscribeBin(ctx context.Context, bin uint8, start uint64) (<-chan
467472
errC := make(chan error, 1)
468473

469474
db.inFlight.Go(func() {
470-
471475
trigger, unsub := db.reserveBinEvents.Subscribe(string(bin))
472476
defer unsub()
473477
defer close(out)

pkg/storer/reserve_test.go

Lines changed: 92 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,13 @@ func TestIndexCollision(t *testing.T) {
7575
if err != nil {
7676
t.Fatal(err)
7777
}
78-
storer.StartReserveWorker(context.Background(), pullerMock.NewMockRateReporter(0), networkRadiusFunc(0))
78+
readyC := make(chan struct{})
79+
storer.StartReserveWorker(context.Background(), pullerMock.NewMockRateReporter(0), networkRadiusFunc(0), readyC)
80+
select {
81+
case <-readyC:
82+
case <-t.Context().Done():
83+
t.Fatal("start reserve worker timeout")
84+
}
7985
testF(t, baseAddr, storer)
8086
})
8187
t.Run("mem", func(t *testing.T) {
@@ -85,7 +91,13 @@ func TestIndexCollision(t *testing.T) {
8591
if err != nil {
8692
t.Fatal(err)
8793
}
88-
storer.StartReserveWorker(context.Background(), pullerMock.NewMockRateReporter(0), networkRadiusFunc(0))
94+
readyC := make(chan struct{})
95+
storer.StartReserveWorker(context.Background(), pullerMock.NewMockRateReporter(0), networkRadiusFunc(0), readyC)
96+
select {
97+
case <-readyC:
98+
case <-t.Context().Done():
99+
t.Fatal("start reserve worker timeout")
100+
}
89101
testF(t, baseAddr, storer)
90102
})
91103
}
@@ -163,7 +175,13 @@ func TestReplaceOldIndex(t *testing.T) {
163175
if err != nil {
164176
t.Fatal(err)
165177
}
166-
storer.StartReserveWorker(context.Background(), pullerMock.NewMockRateReporter(0), networkRadiusFunc(0))
178+
readyC := make(chan struct{})
179+
storer.StartReserveWorker(context.Background(), pullerMock.NewMockRateReporter(0), networkRadiusFunc(0), readyC)
180+
select {
181+
case <-readyC:
182+
case <-t.Context().Done():
183+
t.Fatal("start reserve worker timeout")
184+
}
167185
testF(t, baseAddr, storer)
168186
})
169187
t.Run("mem", func(t *testing.T) {
@@ -173,7 +191,13 @@ func TestReplaceOldIndex(t *testing.T) {
173191
if err != nil {
174192
t.Fatal(err)
175193
}
176-
storer.StartReserveWorker(context.Background(), pullerMock.NewMockRateReporter(0), networkRadiusFunc(0))
194+
readyC := make(chan struct{})
195+
storer.StartReserveWorker(context.Background(), pullerMock.NewMockRateReporter(0), networkRadiusFunc(0), readyC)
196+
select {
197+
case <-readyC:
198+
case <-t.Context().Done():
199+
t.Fatal("start reserve worker timeout")
200+
}
177201
testF(t, baseAddr, storer)
178202
})
179203
}
@@ -187,8 +211,13 @@ func TestEvictBatch(t *testing.T) {
187211
if err != nil {
188212
t.Fatal(err)
189213
}
190-
st.StartReserveWorker(context.Background(), pullerMock.NewMockRateReporter(0), networkRadiusFunc(0))
191-
214+
readyC := make(chan struct{})
215+
st.StartReserveWorker(context.Background(), pullerMock.NewMockRateReporter(0), networkRadiusFunc(0), readyC)
216+
select {
217+
case <-readyC:
218+
case <-t.Context().Done():
219+
t.Fatal("start reserve worker timeout")
220+
}
192221
ctx := context.Background()
193222

194223
var chunks []swarm.Chunk
@@ -274,7 +303,7 @@ func TestUnreserveCap(t *testing.T) {
274303
testF := func(t *testing.T, baseAddr swarm.Address, bs *batchstore.BatchStore, storer *storer.DB) {
275304
t.Helper()
276305

277-
var chunksPO = make([][]swarm.Chunk, 5)
306+
chunksPO := make([][]swarm.Chunk, 5)
278307
var chunksPerPO uint64 = 10
279308

280309
batch := postagetesting.MustNewBatch()
@@ -348,7 +377,13 @@ func TestUnreserveCap(t *testing.T) {
348377
if err != nil {
349378
t.Fatal(err)
350379
}
351-
storer.StartReserveWorker(context.Background(), pullerMock.NewMockRateReporter(0), networkRadiusFunc(0))
380+
readyC := make(chan struct{})
381+
storer.StartReserveWorker(context.Background(), pullerMock.NewMockRateReporter(0), networkRadiusFunc(0), readyC)
382+
select {
383+
case <-readyC:
384+
case <-t.Context().Done():
385+
t.Fatal("start reserve worker timeout")
386+
}
352387
testF(t, baseAddr, bs, storer)
353388
})
354389
t.Run("mem", func(t *testing.T) {
@@ -359,7 +394,13 @@ func TestUnreserveCap(t *testing.T) {
359394
if err != nil {
360395
t.Fatal(err)
361396
}
362-
storer.StartReserveWorker(context.Background(), pullerMock.NewMockRateReporter(0), networkRadiusFunc(0))
397+
readyC := make(chan struct{})
398+
storer.StartReserveWorker(context.Background(), pullerMock.NewMockRateReporter(0), networkRadiusFunc(0), readyC)
399+
select {
400+
case <-readyC:
401+
case <-t.Context().Done():
402+
t.Fatal("start reserve worker timeout")
403+
}
363404
testF(t, baseAddr, bs, storer)
364405
})
365406
}
@@ -374,7 +415,13 @@ func TestNetworkRadius(t *testing.T) {
374415
if err != nil {
375416
t.Fatal(err)
376417
}
377-
storer.StartReserveWorker(context.Background(), pullerMock.NewMockRateReporter(0), networkRadiusFunc(1))
418+
readyC := make(chan struct{})
419+
storer.StartReserveWorker(context.Background(), pullerMock.NewMockRateReporter(0), networkRadiusFunc(1), readyC)
420+
select {
421+
case <-readyC:
422+
case <-t.Context().Done():
423+
t.Fatal("start reserve worker timeout")
424+
}
378425
time.Sleep(time.Second)
379426
if want, got := uint8(1), storer.StorageRadius(); want != got {
380427
t.Fatalf("want radius %d, got radius %d", want, got)
@@ -387,7 +434,13 @@ func TestNetworkRadius(t *testing.T) {
387434
if err != nil {
388435
t.Fatal(err)
389436
}
390-
storer.StartReserveWorker(context.Background(), pullerMock.NewMockRateReporter(0), networkRadiusFunc(1))
437+
readyC := make(chan struct{})
438+
storer.StartReserveWorker(context.Background(), pullerMock.NewMockRateReporter(0), networkRadiusFunc(1), readyC)
439+
select {
440+
case <-readyC:
441+
case <-t.Context().Done():
442+
t.Fatal("start reserve worker timeout")
443+
}
391444
time.Sleep(time.Second)
392445
if want, got := uint8(1), storer.StorageRadius(); want != got {
393446
t.Fatalf("want radius %d, got radius %d", want, got)
@@ -428,8 +481,13 @@ func TestRadiusManager(t *testing.T) {
428481
if err != nil {
429482
t.Fatal(err)
430483
}
431-
storer.StartReserveWorker(context.Background(), pullerMock.NewMockRateReporter(0), networkRadiusFunc(3))
432-
484+
readyC := make(chan struct{})
485+
storer.StartReserveWorker(context.Background(), pullerMock.NewMockRateReporter(0), networkRadiusFunc(3), readyC)
486+
select {
487+
case <-readyC:
488+
case <-t.Context().Done():
489+
t.Fatal("start reserve worker timeout")
490+
}
433491
batch := postagetesting.MustNewBatch()
434492
err = bs.Save(batch)
435493
if err != nil {
@@ -464,7 +522,13 @@ func TestRadiusManager(t *testing.T) {
464522
if err != nil {
465523
t.Fatal(err)
466524
}
467-
storer.StartReserveWorker(context.Background(), pullerMock.NewMockRateReporter(1), networkRadiusFunc(3))
525+
readyC := make(chan struct{})
526+
storer.StartReserveWorker(context.Background(), pullerMock.NewMockRateReporter(1), networkRadiusFunc(3), readyC)
527+
select {
528+
case <-readyC:
529+
case <-t.Context().Done():
530+
t.Fatal("start reserve worker timeout")
531+
}
468532
waitForRadius(t, storer.Reserve(), 3)
469533
})
470534
}
@@ -749,7 +813,13 @@ func TestNeighborhoodStats(t *testing.T) {
749813
if err != nil {
750814
t.Fatal(err)
751815
}
752-
storer.StartReserveWorker(context.Background(), pullerMock.NewMockRateReporter(0), networkRadiusFunc(responsibiliyDepth))
816+
readyC := make(chan struct{})
817+
storer.StartReserveWorker(context.Background(), pullerMock.NewMockRateReporter(0), networkRadiusFunc(responsibiliyDepth), readyC)
818+
select {
819+
case <-readyC:
820+
case <-t.Context().Done():
821+
t.Fatal("start reserve worker timeout")
822+
}
753823
err = spinlock.Wait(time.Minute, func() bool { return storer.StorageRadius() == responsibiliyDepth })
754824
if err != nil {
755825
t.Fatal(err)
@@ -764,7 +834,13 @@ func TestNeighborhoodStats(t *testing.T) {
764834
if err != nil {
765835
t.Fatal(err)
766836
}
767-
storer.StartReserveWorker(context.Background(), pullerMock.NewMockRateReporter(0), networkRadiusFunc(responsibiliyDepth))
837+
readyC := make(chan struct{})
838+
storer.StartReserveWorker(context.Background(), pullerMock.NewMockRateReporter(0), networkRadiusFunc(responsibiliyDepth), readyC)
839+
select {
840+
case <-readyC:
841+
case <-t.Context().Done():
842+
t.Fatal("start reserve worker timeout")
843+
}
768844
err = spinlock.Wait(time.Minute, func() bool { return storer.StorageRadius() == responsibiliyDepth })
769845
if err != nil {
770846
t.Fatal(err)

pkg/storer/storer.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -674,10 +674,12 @@ func (db *DB) SetRetrievalService(r retrieval.Interface) {
674674
db.retrieval = r
675675
}
676676

677-
func (db *DB) StartReserveWorker(ctx context.Context, s Syncer, radius func() (uint8, error)) {
677+
// StartReserveWorker starts the reserve worker. It takes an optional ready channel that is closed once the reserve
678+
// worker has finished starting, as this synchronization is needed by some tests. The channel is only ever closed, never written to; pass nil when no synchronization is required.
679+
func (db *DB) StartReserveWorker(ctx context.Context, s Syncer, radius func() (uint8, error), ready chan<- struct{}) {
678680
db.setSyncerOnce.Do(func() {
679681
db.syncer = s
680-
go db.startReserveWorkers(ctx, radius)
682+
go db.startReserveWorkers(ctx, radius, ready)
681683
})
682684
}
683685

0 commit comments

Comments
 (0)