Skip to content

Commit 47fa3c9

Browse files
committed
fix(puller): back off after non-fatal sync errors
When syncer.Sync returned a non-fatal error (connection reset, protocol error, stream timeout), the goroutine fell through to limiter.WaitN with count=0 and looped immediately. Any persistent non-fatal error caused a tight CPU spin until the peer disconnected or context was cancelled. Add syncRetryBackoff (1s) with a ctx.Done()-escape after any non-fatal sync error before the next retry. This bounds the retry rate to <=1/s per goroutine under persistent errors. Covered by TestSyncErrorBackoff (synctest).
1 parent 184af1e commit 47fa3c9

3 files changed

Lines changed: 60 additions & 1 deletion

File tree

pkg/puller/export_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,10 @@ package puller
66

77
import "github.com/ethersphere/bee/v2/pkg/swarm"
88

9-
var PeerIntervalKey = peerIntervalKey
9+
var (
10+
PeerIntervalKey = peerIntervalKey
11+
SyncRetryBackoff = syncRetryBackoff
12+
)
1013

1114
func (p *Puller) IsSyncing(addr swarm.Address) bool {
1215
p.syncPeersMtx.Lock()

pkg/puller/puller.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,8 @@ const (
7575
maxChunksPerSecond = 1000 // roughly 4 MB/s
7676

7777
maxPODelta = 2 // the lowest level of proximity order (of peers) subtracted from the storage radius allowed for chunk syncing.
78+
79+
syncRetryBackoff = time.Second // minimum wait between retries after a non-fatal sync error
7880
)
7981

8082
type Options struct {
@@ -384,6 +386,11 @@ func (p *Puller) syncPeerBin(parentCtx context.Context, peer *syncPeer, bin uint
384386
}
385387
errCount := countErrors(err)
386388
p.logger.Debug("syncWorker interval failed", "error_count", errCount, "example_error", errors.Unwrap(err), "peer_address", address, "bin", bin, "cursor", cursor, "start", start, "topmost", top)
389+
select {
390+
case <-time.After(syncRetryBackoff):
391+
case <-ctx.Done():
392+
return
393+
}
387394
}
388395

389396
_ = p.limiter.WaitN(ctx, count)

pkg/puller/puller_test.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -508,6 +508,55 @@ func TestContinueSyncing(t *testing.T) {
508508
}
509509
}
510510

511+
// TestSyncErrorBackoff verifies that a non-fatal sync error is followed by a
512+
// backoff before the next retry. Under synctest the worker blocks after one
513+
// failed call; without the backoff it would spin and consume both replies, so
514+
// the "exactly one call" check fails without the fix.
515+
func TestSyncErrorBackoff(t *testing.T) {
516+
addr := swarm.RandAddress(t)
517+
518+
synctest.Test(t, func(t *testing.T) {
519+
// Topmost=0 keeps top < start so the interval never advances and the loop
520+
// retries with the same start value. Two replies cover the failed retry.
521+
ps := mockps.NewPullSync(
522+
mockps.WithCursors([]uint64{100}, 0),
523+
mockps.WithSyncError(errors.New("stream error")),
524+
mockps.WithReplies(
525+
mockps.SyncReply{Bin: 0, Start: 1, Topmost: 0, Peer: addr},
526+
mockps.SyncReply{Bin: 0, Start: 1, Topmost: 0, Peer: addr},
527+
),
528+
)
529+
kad := kadMock.NewMockKademlia(
530+
kadMock.WithEachPeerRevCalls(kadMock.AddrTuple{Addr: addr, PO: 0}),
531+
)
532+
533+
ctx, cancel := context.WithCancel(context.Background())
534+
defer cancel()
535+
536+
p := puller.New(swarm.RandAddress(t), mock.NewStateStore(), kad, resMock.NewReserve(resMock.WithRadius(0)), ps, nil, log.Noop, puller.Options{Bins: 1})
537+
p.Start(ctx)
538+
539+
kad.Trigger()
540+
541+
// One failed call, then the worker blocks in the backoff.
542+
synctest.Wait()
543+
if got := len(ps.SyncCalls(addr)); got != 1 {
544+
t.Fatalf("expected worker to back off after 1 failed call, got %d calls", got)
545+
}
546+
547+
// Sleeping out the backoff releases exactly one retry.
548+
time.Sleep(puller.SyncRetryBackoff)
549+
synctest.Wait()
550+
if got := len(ps.SyncCalls(addr)); got != 2 {
551+
t.Fatalf("expected exactly one retry after the backoff, got %d calls", got)
552+
}
553+
554+
if err := p.Close(); err != nil {
555+
t.Errorf("close puller: %v", err)
556+
}
557+
})
558+
}
559+
511560
// TestRadiusDecreaseLiveness verifies that after a radius decrease a new sync
512561
// worker starts without the manager blocking while the disconnected peer's
513562
// in-flight worker drains. Under synctest, a blocking disconnect holds up the

0 commit comments

Comments
 (0)