From 61fab37b0f031b1aabbd33c26262d10598d86103 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ljubi=C5=A1a=20Ga=C4=8Devi=C4=87?= <35105035+gacevicljubisa@users.noreply.github.com> Date: Wed, 18 Mar 2026 08:10:10 +0100 Subject: [PATCH 1/2] chore: update postage snapshot to v0.0.6 (#5401) --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index cf602914109..3e474dd7899 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,7 @@ require ( github.com/caddyserver/certmagic v0.21.6 github.com/coreos/go-semver v0.3.0 github.com/ethereum/go-ethereum v1.15.11 - github.com/ethersphere/batch-archive v0.0.5 + github.com/ethersphere/batch-archive v0.0.6 github.com/ethersphere/go-price-oracle-abi v0.6.9 github.com/ethersphere/go-storage-incentives-abi v0.9.4 github.com/ethersphere/go-sw3-abi v0.6.9 diff --git a/go.sum b/go.sum index fab9e1c2f69..c3653e0463e 100644 --- a/go.sum +++ b/go.sum @@ -236,8 +236,8 @@ github.com/ethereum/go-ethereum v1.15.11 h1:JK73WKeu0WC0O1eyX+mdQAVHUV+UR1a9VB/d github.com/ethereum/go-ethereum v1.15.11/go.mod h1:mf8YiHIb0GR4x4TipcvBUPxJLw1mFdmxzoDi11sDRoI= github.com/ethereum/go-verkle v0.2.2 h1:I2W0WjnrFUIzzVPwm8ykY+7pL2d4VhlsePn4j7cnFk8= github.com/ethereum/go-verkle v0.2.2/go.mod h1:M3b90YRnzqKyyzBEWJGqj8Qff4IDeXnzFw0P9bFw3uk= -github.com/ethersphere/batch-archive v0.0.5 h1:SM3g7Tuge4KhOn+NKgPcg2Uz2p8a/MLKZvZpmkKCyU4= -github.com/ethersphere/batch-archive v0.0.5/go.mod h1:41BPb192NoK9CYjNB8BAE1J2MtiI/5aq0Wtas5O7A7Q= +github.com/ethersphere/batch-archive v0.0.6 h1:Nt9mundj8CXT42qgGdq1sqKIVOk4KkKC438/FFZdONY= +github.com/ethersphere/batch-archive v0.0.6/go.mod h1:41BPb192NoK9CYjNB8BAE1J2MtiI/5aq0Wtas5O7A7Q= github.com/ethersphere/go-price-oracle-abi v0.6.9 h1:bseen6he3PZv5GHOm+KD6s4awaFmVSD9LFx+HpB6rCU= github.com/ethersphere/go-price-oracle-abi v0.6.9/go.mod h1:sI/Qj4/zJ23/b1enzwMMv0/hLTpPNVNacEwCWjo6yBk= github.com/ethersphere/go-storage-incentives-abi v0.9.4 
h1:mSIWXQXg5OQmH10QvXMV5w0vbSibFMaRlBL37gPLTM0= From 8f239857b11275264a5c02a51b55ff611ce29385 Mon Sep 17 00:00:00 2001 From: Marios Isaakidis Date: Mon, 30 Mar 2026 13:30:11 +0300 Subject: [PATCH 2/2] fix(puller): non-blocking peer disconnect and sync error backoff MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When onChange held syncPeersMtx and called disconnectPeer, the inner peer.stop() cancelled per-bin goroutine contexts and then called peer.wg.Wait(). Live goroutines blocked in Sync() → ReadMsgWithContext and only unblocked after pageTimeout (1s) per stream. For N peers disconnecting during a radius decrease, the outer lock was held for up to N×1s, stalling all queued topology-change notifications for the same duration. When syncer.Sync returned a non-fatal error (connection reset, protocol error, stream timeout), the goroutine fell through to limiter.WaitN with count=0 and looped immediately. Any persistent non-fatal error caused a tight CPU spin until the peer disconnected or context was cancelled. Split syncPeer.stop() into cancelBins() (cancel all per-bin contexts, clear the map, no wait) and stop() (cancel + wait, used only in Close()). disconnectPeer now calls cancelBins(): the peer is removed from the sync map immediately and its goroutines drain in the background. Close() already calls p.wg.Wait(), so shutdown correctness is unchanged. After any non-fatal sync error, wait syncRetryBackoff (1s) before the next retry, returning early if ctx is cancelled. This bounds the retry rate to ≤1/s per goroutine under persistent errors. 
--- pkg/puller/puller.go | 21 +++++++++++++--- pkg/puller/puller_test.go | 53 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 71 insertions(+), 3 deletions(-) diff --git a/pkg/puller/puller.go b/pkg/puller/puller.go index 0bcdcfae9cb..bf0e88f1925 100644 --- a/pkg/puller/puller.go +++ b/pkg/puller/puller.go @@ -42,6 +42,8 @@ const ( maxChunksPerSecond = 1000 // roughly 4 MB/s maxPODelta = 2 // the lowest level of proximity order (of peers) subtracted from the storage radius allowed for chunk syncing. + + syncRetryBackoff = time.Second // minimum wait between retries after a non-fatal sync error ) type Options struct { @@ -189,12 +191,13 @@ func (p *Puller) manage(ctx context.Context) { } // disconnectPeer cancels all existing syncing and removes the peer entry from the syncing map. +// Goroutines drain in the background; the caller is not blocked waiting for them. // Must be called under lock. func (p *Puller) disconnectPeer(addr swarm.Address) { p.logger.Debug("disconnecting peer", "peer_address", addr) if peer, ok := p.syncPeers[addr.ByteString()]; ok { peer.mtx.Lock() - peer.stop() + peer.cancelBins() peer.mtx.Unlock() } delete(p.syncPeers, addr.ByteString()) @@ -349,6 +352,11 @@ func (p *Puller) syncPeerBin(parentCtx context.Context, peer *syncPeer, bin uint return } p.logger.Debug("syncWorker interval failed", "error", err, "peer_address", address, "bin", bin, "cursor", cursor, "start", start, "topmost", top) + select { + case <-time.After(syncRetryBackoff): + case <-ctx.Done(): + return + } } _ = p.limiter.WaitN(ctx, count) @@ -548,12 +556,19 @@ func newSyncPeer(addr swarm.Address, bins, po uint8) *syncPeer { } } -// called when peer disconnects or on shutdown, cleans up ongoing sync operations -func (p *syncPeer) stop() { +// cancelBins cancels all per-bin sync contexts and clears the cancel-func map. +// Does not wait for goroutines to exit. Must be called under peer.mtx. 
+func (p *syncPeer) cancelBins() { for bin, c := range p.binCancelFuncs { c() delete(p.binCancelFuncs, bin) } +} + +// stop cancels all per-bin sync contexts and waits for goroutines to exit. +// Used on shutdown. Must be called under peer.mtx. +func (p *syncPeer) stop() { + p.cancelBins() p.wg.Wait() } diff --git a/pkg/puller/puller_test.go b/pkg/puller/puller_test.go index 29687138b9c..e12007d9a3f 100644 --- a/pkg/puller/puller_test.go +++ b/pkg/puller/puller_test.go @@ -507,6 +507,59 @@ func TestContinueSyncing(t *testing.T) { } } +// TestSyncErrorBackoff verifies that a non-fatal sync error is followed by a +// backoff before the next retry, bounding the retry rate to roughly 1/s. +func TestSyncErrorBackoff(t *testing.T) { + t.Parallel() + + addr := swarm.RandAddress(t) + + // Use Topmost=0 so that top < start and the interval is never advanced, + // causing the loop to retry with the same start value each time. + // Provide two replies so we can observe two successive sync calls. + _, _, kad, ps := newPuller(t, opts{ + kad: []kadMock.Option{ + kadMock.WithEachPeerRevCalls(kadMock.AddrTuple{Addr: addr, PO: 0}), + }, + pullSync: []mockps.Option{ + mockps.WithCursors([]uint64{100}, 0), + mockps.WithSyncError(errors.New("stream error")), + mockps.WithReplies( + mockps.SyncReply{Bin: 0, Start: 1, Topmost: 0, Peer: addr}, + mockps.SyncReply{Bin: 0, Start: 1, Topmost: 0, Peer: addr}, + ), + }, + bins: 1, + rs: resMock.NewReserve(resMock.WithRadius(0)), + }) + + time.Sleep(100 * time.Millisecond) + kad.Trigger() + + // wait for the first call + err := spinlock.Wait(2*time.Second, func() bool { + return len(ps.SyncCalls(addr)) >= 1 + }) + if err != nil { + t.Fatal("timed out waiting for first sync call") + } + t1 := time.Now() + + // wait for the second call — must be separated by at least syncRetryBackoff + err = spinlock.Wait(3*time.Second, func() bool { + return len(ps.SyncCalls(addr)) >= 2 + }) + if err != nil { + t.Fatal("timed out waiting for second sync call") + 
} + elapsed := time.Since(t1) + + const minBackoff = 800 * time.Millisecond // allow 200ms tolerance below syncRetryBackoff + if elapsed < minBackoff { + t.Fatalf("retry happened too fast: elapsed %v, want >= %v", elapsed, minBackoff) + } +} + func TestPeerGone(t *testing.T) { t.Parallel()