diff --git a/go.mod b/go.mod
index cf602914109..3e474dd7899 100644
--- a/go.mod
+++ b/go.mod
@@ -11,7 +11,7 @@ require (
 	github.com/caddyserver/certmagic v0.21.6
 	github.com/coreos/go-semver v0.3.0
 	github.com/ethereum/go-ethereum v1.15.11
-	github.com/ethersphere/batch-archive v0.0.5
+	github.com/ethersphere/batch-archive v0.0.6
 	github.com/ethersphere/go-price-oracle-abi v0.6.9
 	github.com/ethersphere/go-storage-incentives-abi v0.9.4
 	github.com/ethersphere/go-sw3-abi v0.6.9
diff --git a/go.sum b/go.sum
index fab9e1c2f69..c3653e0463e 100644
--- a/go.sum
+++ b/go.sum
@@ -236,8 +236,8 @@ github.com/ethereum/go-ethereum v1.15.11 h1:JK73WKeu0WC0O1eyX+mdQAVHUV+UR1a9VB/d
 github.com/ethereum/go-ethereum v1.15.11/go.mod h1:mf8YiHIb0GR4x4TipcvBUPxJLw1mFdmxzoDi11sDRoI=
 github.com/ethereum/go-verkle v0.2.2 h1:I2W0WjnrFUIzzVPwm8ykY+7pL2d4VhlsePn4j7cnFk8=
 github.com/ethereum/go-verkle v0.2.2/go.mod h1:M3b90YRnzqKyyzBEWJGqj8Qff4IDeXnzFw0P9bFw3uk=
-github.com/ethersphere/batch-archive v0.0.5 h1:SM3g7Tuge4KhOn+NKgPcg2Uz2p8a/MLKZvZpmkKCyU4=
-github.com/ethersphere/batch-archive v0.0.5/go.mod h1:41BPb192NoK9CYjNB8BAE1J2MtiI/5aq0Wtas5O7A7Q=
+github.com/ethersphere/batch-archive v0.0.6 h1:Nt9mundj8CXT42qgGdq1sqKIVOk4KkKC438/FFZdONY=
+github.com/ethersphere/batch-archive v0.0.6/go.mod h1:41BPb192NoK9CYjNB8BAE1J2MtiI/5aq0Wtas5O7A7Q=
 github.com/ethersphere/go-price-oracle-abi v0.6.9 h1:bseen6he3PZv5GHOm+KD6s4awaFmVSD9LFx+HpB6rCU=
 github.com/ethersphere/go-price-oracle-abi v0.6.9/go.mod h1:sI/Qj4/zJ23/b1enzwMMv0/hLTpPNVNacEwCWjo6yBk=
 github.com/ethersphere/go-storage-incentives-abi v0.9.4 h1:mSIWXQXg5OQmH10QvXMV5w0vbSibFMaRlBL37gPLTM0=
diff --git a/pkg/puller/puller.go b/pkg/puller/puller.go
index 0bcdcfae9cb..bf0e88f1925 100644
--- a/pkg/puller/puller.go
+++ b/pkg/puller/puller.go
@@ -42,6 +42,8 @@ const (
 	maxChunksPerSecond = 1000 // roughly 4 MB/s
 
 	maxPODelta = 2 // the lowest level of proximity order (of peers) subtracted from the storage radius allowed for chunk syncing.
+
+	syncRetryBackoff = time.Second // minimum wait between retries after a non-fatal sync error
 )
 
 type Options struct {
@@ -189,12 +191,13 @@ func (p *Puller) manage(ctx context.Context) {
 }
 
 // disconnectPeer cancels all existing syncing and removes the peer entry from the syncing map.
+// Goroutines drain in the background; the caller is not blocked waiting for them.
 // Must be called under lock.
 func (p *Puller) disconnectPeer(addr swarm.Address) {
 	p.logger.Debug("disconnecting peer", "peer_address", addr)
 	if peer, ok := p.syncPeers[addr.ByteString()]; ok {
 		peer.mtx.Lock()
-		peer.stop()
+		peer.cancelBins()
 		peer.mtx.Unlock()
 	}
 	delete(p.syncPeers, addr.ByteString())
@@ -349,6 +352,11 @@ func (p *Puller) syncPeerBin(parentCtx context.Context, peer *syncPeer, bin uint
 				return
 			}
 			p.logger.Debug("syncWorker interval failed", "error", err, "peer_address", address, "bin", bin, "cursor", cursor, "start", start, "topmost", top)
+			select {
+			case <-time.After(syncRetryBackoff):
+			case <-ctx.Done():
+				return
+			}
 		}
 
 		_ = p.limiter.WaitN(ctx, count)
@@ -548,12 +556,19 @@ func newSyncPeer(addr swarm.Address, bins, po uint8) *syncPeer {
 	}
 }
 
-// called when peer disconnects or on shutdown, cleans up ongoing sync operations
-func (p *syncPeer) stop() {
+// cancelBins cancels all per-bin sync contexts and clears the cancel-func map.
+// Does not wait for goroutines to exit. Must be called under peer.mtx.
+func (p *syncPeer) cancelBins() {
 	for bin, c := range p.binCancelFuncs {
 		c()
 		delete(p.binCancelFuncs, bin)
 	}
+}
+
+// stop cancels all per-bin sync contexts and waits for goroutines to exit.
+// Used on shutdown. Must be called under peer.mtx.
+func (p *syncPeer) stop() {
+	p.cancelBins()
 	p.wg.Wait()
 }
 
diff --git a/pkg/puller/puller_test.go b/pkg/puller/puller_test.go
index 29687138b9c..e12007d9a3f 100644
--- a/pkg/puller/puller_test.go
+++ b/pkg/puller/puller_test.go
@@ -507,6 +507,59 @@ func TestContinueSyncing(t *testing.T) {
 	}
 }
 
+// TestSyncErrorBackoff verifies that a non-fatal sync error is followed by a
+// backoff before the next retry, bounding the retry rate to roughly 1/s.
+func TestSyncErrorBackoff(t *testing.T) {
+	t.Parallel()
+
+	addr := swarm.RandAddress(t)
+
+	// Use Topmost=0 so that top < start and the interval is never advanced,
+	// causing the loop to retry with the same start value each time.
+	// Provide two replies so we can observe two successive sync calls.
+	_, _, kad, ps := newPuller(t, opts{
+		kad: []kadMock.Option{
+			kadMock.WithEachPeerRevCalls(kadMock.AddrTuple{Addr: addr, PO: 0}),
+		},
+		pullSync: []mockps.Option{
+			mockps.WithCursors([]uint64{100}, 0),
+			mockps.WithSyncError(errors.New("stream error")),
+			mockps.WithReplies(
+				mockps.SyncReply{Bin: 0, Start: 1, Topmost: 0, Peer: addr},
+				mockps.SyncReply{Bin: 0, Start: 1, Topmost: 0, Peer: addr},
+			),
+		},
+		bins: 1,
+		rs:   resMock.NewReserve(resMock.WithRadius(0)),
+	})
+
+	time.Sleep(100 * time.Millisecond)
+	kad.Trigger()
+
+	// wait for the first call
+	err := spinlock.Wait(2*time.Second, func() bool {
+		return len(ps.SyncCalls(addr)) >= 1
+	})
+	if err != nil {
+		t.Fatal("timed out waiting for first sync call")
+	}
+	t1 := time.Now()
+
+	// wait for the second call; it must be separated by at least syncRetryBackoff
+	err = spinlock.Wait(3*time.Second, func() bool {
+		return len(ps.SyncCalls(addr)) >= 2
+	})
+	if err != nil {
+		t.Fatal("timed out waiting for second sync call")
+	}
+	elapsed := time.Since(t1)
+
+	const minBackoff = 800 * time.Millisecond // allow 200ms tolerance below syncRetryBackoff
+	if elapsed < minBackoff {
+		t.Fatalf("retry happened too fast: elapsed %v, want >= %v", elapsed, minBackoff)
+	}
+}
+
 func TestPeerGone(t *testing.T) {
 	t.Parallel()
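
Note: the core of the puller.go change is the context-aware backoff added to the sync retry loop. The following standalone sketch is not part of the diff; it only illustrates the same pattern in isolation, and names such as retryBackoff, doSync, and syncLoop are illustrative placeholders. After a non-fatal error the loop waits a fixed backoff before retrying, but aborts the wait immediately if the context is cancelled, so shutdown is never delayed by the backoff.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

const retryBackoff = time.Second // stands in for syncRetryBackoff in the diff

// syncLoop retries doSync after non-fatal errors, waiting retryBackoff between
// attempts while staying responsive to cancellation of ctx.
func syncLoop(ctx context.Context, doSync func(context.Context) error) error {
	for {
		err := doSync(ctx)
		if err == nil {
			return nil
		}
		if errors.Is(err, context.Canceled) {
			return err
		}
		// Non-fatal error: back off before the next attempt, but return
		// immediately if the context is cancelled in the meantime.
		select {
		case <-time.After(retryBackoff):
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	attempts := 0
	err := syncLoop(ctx, func(context.Context) error {
		attempts++
		fmt.Println("attempt", attempts)
		return errors.New("stream error") // always fails in this demo
	})
	fmt.Println("stopped after", attempts, "attempts:", err)
}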