Skip to content

Commit 498ec63

Browse files
committed
fix(puller): back off after non-fatal sync errors
When syncer.Sync returned a non-fatal error (connection reset, protocol error, stream timeout), the goroutine fell through to limiter.WaitN with count=0 and looped immediately. Any persistent non-fatal error therefore caused a tight CPU spin until the peer disconnected or the context was cancelled. Add syncRetryBackoff (1s): after any non-fatal sync error the worker now waits for the backoff before the next retry, returning early if ctx is cancelled. This bounds the retry rate to <=1/s per goroutine under persistent errors. Covered by TestSyncErrorBackoff (synctest).
1 parent baecac8 commit 498ec63

3 files changed

Lines changed: 62 additions & 1 deletion

File tree

pkg/puller/export_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,10 @@ package puller
66

77
import "github.com/ethersphere/bee/v2/pkg/swarm"
88

9-
var PeerIntervalKey = peerIntervalKey
9+
var (
10+
PeerIntervalKey = peerIntervalKey
11+
SyncRetryBackoff = syncRetryBackoff
12+
)
1013

1114
func (p *Puller) IsSyncing(addr swarm.Address) bool {
1215
p.syncPeersMtx.Lock()

pkg/puller/puller.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,8 @@ const (
7575
maxChunksPerSecond = 1000 // roughly 4 MB/s
7676

7777
maxPODelta = 2 // the lowest level of proximity order (of peers) subtracted from the storage radius allowed for chunk syncing.
78+
79+
syncRetryBackoff = time.Second // minimum wait between retries after a non-fatal sync error
7880
)
7981

8082
type Options struct {
@@ -384,6 +386,11 @@ func (p *Puller) syncPeerBin(parentCtx context.Context, peer *syncPeer, bin uint
384386
}
385387
errCount := countErrors(err)
386388
p.logger.Debug("syncWorker interval failed", "error_count", errCount, "example_error", errors.Unwrap(err), "peer_address", address, "bin", bin, "cursor", cursor, "start", start, "topmost", top)
389+
select {
390+
case <-time.After(syncRetryBackoff):
391+
case <-ctx.Done():
392+
return
393+
}
387394
}
388395

389396
_ = p.limiter.WaitN(ctx, count)

pkg/puller/puller_test.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -508,6 +508,57 @@ func TestContinueSyncing(t *testing.T) {
508508
}
509509
}
510510

511+
// TestSyncErrorBackoff verifies that a non-fatal sync error is followed by a
512+
// backoff before the next retry. Under synctest the worker blocks after one
513+
// failed call; without the backoff it would spin and consume both replies, so
514+
// the "exactly one call" check fails without the fix.
515+
func TestSyncErrorBackoff(t *testing.T) {
516+
t.Parallel()
517+
518+
addr := swarm.RandAddress(t)
519+
520+
synctest.Test(t, func(t *testing.T) {
521+
// Topmost=0 keeps top < start so the interval never advances and the loop
522+
// retries with the same start value. Two replies cover the failed retry.
523+
ps := mockps.NewPullSync(
524+
mockps.WithCursors([]uint64{100}, 0),
525+
mockps.WithSyncError(errors.New("stream error")),
526+
mockps.WithReplies(
527+
mockps.SyncReply{Bin: 0, Start: 1, Topmost: 0, Peer: addr},
528+
mockps.SyncReply{Bin: 0, Start: 1, Topmost: 0, Peer: addr},
529+
),
530+
)
531+
kad := kadMock.NewMockKademlia(
532+
kadMock.WithEachPeerRevCalls(kadMock.AddrTuple{Addr: addr, PO: 0}),
533+
)
534+
535+
ctx, cancel := context.WithCancel(context.Background())
536+
defer cancel()
537+
538+
p := puller.New(swarm.RandAddress(t), mock.NewStateStore(), kad, resMock.NewReserve(resMock.WithRadius(0)), ps, nil, log.Noop, puller.Options{Bins: 1})
539+
p.Start(ctx)
540+
541+
kad.Trigger()
542+
543+
// One failed call, then the worker blocks in the backoff.
544+
synctest.Wait()
545+
if got := len(ps.SyncCalls(addr)); got != 1 {
546+
t.Fatalf("expected worker to back off after 1 failed call, got %d calls", got)
547+
}
548+
549+
// Sleeping out the backoff releases exactly one retry.
550+
time.Sleep(puller.SyncRetryBackoff)
551+
synctest.Wait()
552+
if got := len(ps.SyncCalls(addr)); got != 2 {
553+
t.Fatalf("expected exactly one retry after the backoff, got %d calls", got)
554+
}
555+
556+
if err := p.Close(); err != nil {
557+
t.Errorf("close puller: %v", err)
558+
}
559+
})
560+
}
561+
511562
// TestRadiusDecreaseRecalcPeersLiveness verifies that after a radius decrease a
512563
// new sync worker starts without the manager blocking while the disconnected
513564
// peer's in-flight worker drains. Under synctest, a blocking disconnect holds

0 commit comments

Comments
 (0)