2 changes: 1 addition & 1 deletion go.mod
@@ -11,7 +11,7 @@ require (
 	github.com/caddyserver/certmagic v0.21.6
 	github.com/coreos/go-semver v0.3.0
 	github.com/ethereum/go-ethereum v1.15.11
-	github.com/ethersphere/batch-archive v0.0.5
+	github.com/ethersphere/batch-archive v0.0.6
 	github.com/ethersphere/go-price-oracle-abi v0.6.9
 	github.com/ethersphere/go-storage-incentives-abi v0.9.4
 	github.com/ethersphere/go-sw3-abi v0.6.9
4 changes: 2 additions & 2 deletions go.sum
@@ -236,8 +236,8 @@ github.com/ethereum/go-ethereum v1.15.11 h1:JK73WKeu0WC0O1eyX+mdQAVHUV+UR1a9VB/d
 github.com/ethereum/go-ethereum v1.15.11/go.mod h1:mf8YiHIb0GR4x4TipcvBUPxJLw1mFdmxzoDi11sDRoI=
 github.com/ethereum/go-verkle v0.2.2 h1:I2W0WjnrFUIzzVPwm8ykY+7pL2d4VhlsePn4j7cnFk8=
 github.com/ethereum/go-verkle v0.2.2/go.mod h1:M3b90YRnzqKyyzBEWJGqj8Qff4IDeXnzFw0P9bFw3uk=
-github.com/ethersphere/batch-archive v0.0.5 h1:SM3g7Tuge4KhOn+NKgPcg2Uz2p8a/MLKZvZpmkKCyU4=
-github.com/ethersphere/batch-archive v0.0.5/go.mod h1:41BPb192NoK9CYjNB8BAE1J2MtiI/5aq0Wtas5O7A7Q=
+github.com/ethersphere/batch-archive v0.0.6 h1:Nt9mundj8CXT42qgGdq1sqKIVOk4KkKC438/FFZdONY=
+github.com/ethersphere/batch-archive v0.0.6/go.mod h1:41BPb192NoK9CYjNB8BAE1J2MtiI/5aq0Wtas5O7A7Q=
 github.com/ethersphere/go-price-oracle-abi v0.6.9 h1:bseen6he3PZv5GHOm+KD6s4awaFmVSD9LFx+HpB6rCU=
 github.com/ethersphere/go-price-oracle-abi v0.6.9/go.mod h1:sI/Qj4/zJ23/b1enzwMMv0/hLTpPNVNacEwCWjo6yBk=
 github.com/ethersphere/go-storage-incentives-abi v0.9.4 h1:mSIWXQXg5OQmH10QvXMV5w0vbSibFMaRlBL37gPLTM0=
21 changes: 18 additions & 3 deletions pkg/puller/puller.go
@@ -42,6 +42,8 @@ const (
 	maxChunksPerSecond = 1000 // roughly 4 MB/s
 
 	maxPODelta = 2 // the lowest level of proximity order (of peers) subtracted from the storage radius allowed for chunk syncing.
+
+	syncRetryBackoff = time.Second // minimum wait between retries after a non-fatal sync error
 )
 
 type Options struct {
@@ -189,12 +191,13 @@ func (p *Puller) manage(ctx context.Context) {
 }
 
 // disconnectPeer cancels all existing syncing and removes the peer entry from the syncing map.
+// Goroutines drain in the background; the caller is not blocked waiting for them.
 // Must be called under lock.
 func (p *Puller) disconnectPeer(addr swarm.Address) {
 	p.logger.Debug("disconnecting peer", "peer_address", addr)
 	if peer, ok := p.syncPeers[addr.ByteString()]; ok {
 		peer.mtx.Lock()
-		peer.stop()
+		peer.cancelBins()
 		peer.mtx.Unlock()
 	}
 	delete(p.syncPeers, addr.ByteString())
@@ -349,6 +352,11 @@ func (p *Puller) syncPeerBin(parentCtx context.Context, peer *syncPeer, bin uint
 			return
 		}
 		p.logger.Debug("syncWorker interval failed", "error", err, "peer_address", address, "bin", bin, "cursor", cursor, "start", start, "topmost", top)
+		select {
+		case <-time.After(syncRetryBackoff):
+		case <-ctx.Done():
+			return
+		}
 	}
 
 	_ = p.limiter.WaitN(ctx, count)
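
As a standalone illustration of the wait pattern introduced above (a generic sketch, not part of the PR): the select sleeps for the backoff but returns early if the context is cancelled first, so a disconnecting peer does not pin its sync goroutine for the full second.

package main

import (
	"context"
	"fmt"
	"time"
)

// sleepCtx waits for d or until ctx is cancelled, whichever comes first,
// mirroring the backoff select in syncPeerBin above. Note that time.After's
// timer is only reclaimed when it fires; fine for a sketch, but a reusable
// helper might prefer time.NewTimer with a deferred Stop.
func sleepCtx(ctx context.Context, d time.Duration) bool {
	select {
	case <-time.After(d):
		return true // waited out the full backoff
	case <-ctx.Done():
		return false // cancelled mid-backoff
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		time.Sleep(100 * time.Millisecond)
		cancel() // simulate a peer disconnect during the backoff
	}()
	fmt.Println("completed full backoff:", sleepCtx(ctx, time.Second))
}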
@@ -548,12 +556,19 @@ func newSyncPeer(addr swarm.Address, bins, po uint8) *syncPeer {
 	}
 }
 
-// called when peer disconnects or on shutdown, cleans up ongoing sync operations
-func (p *syncPeer) stop() {
+// cancelBins cancels all per-bin sync contexts and clears the cancel-func map.
+// Does not wait for goroutines to exit. Must be called under peer.mtx.
+func (p *syncPeer) cancelBins() {
 	for bin, c := range p.binCancelFuncs {
 		c()
 		delete(p.binCancelFuncs, bin)
 	}
+}
+
+// stop cancels all per-bin sync contexts and waits for goroutines to exit.
+// Used on shutdown. Must be called under peer.mtx.
+func (p *syncPeer) stop() {
+	p.cancelBins()
 	p.wg.Wait()
 }
 
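To make the resulting shape easy to see outside the diff context, here is a self-contained sketch of the split (trimmed to essentials; the real syncPeer also guards binCancelFuncs with peer.mtx): cancelBins only signals cancellation, so the disconnect path never blocks while holding the lock, while stop additionally drains the workers, which is acceptable on shutdown.

package main

import (
	"context"
	"fmt"
	"sync"
)

// Trimmed-down syncPeer: one cancel func per bin plus a WaitGroup tracking
// the bin workers.
type syncPeer struct {
	wg             sync.WaitGroup
	binCancelFuncs map[uint8]context.CancelFunc
}

// cancelBins signals every bin worker to stop, without waiting.
func (p *syncPeer) cancelBins() {
	for bin, cancel := range p.binCancelFuncs {
		cancel()
		delete(p.binCancelFuncs, bin)
	}
}

// stop signals and then waits for the workers to drain.
func (p *syncPeer) stop() {
	p.cancelBins()
	p.wg.Wait()
}

func main() {
	p := &syncPeer{binCancelFuncs: make(map[uint8]context.CancelFunc)}
	ctx, cancel := context.WithCancel(context.Background())
	p.binCancelFuncs[0] = cancel
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		<-ctx.Done() // bin worker exits once its context is cancelled
		fmt.Println("bin 0 worker drained")
	}()
	p.stop() // prints "bin 0 worker drained" before returning
}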
53 changes: 53 additions & 0 deletions pkg/puller/puller_test.go
@@ -507,6 +507,59 @@ func TestContinueSyncing(t *testing.T) {
 	}
 }
 
+// TestSyncErrorBackoff verifies that a non-fatal sync error is followed by a
+// backoff before the next retry, bounding the retry rate to roughly 1/s.
+func TestSyncErrorBackoff(t *testing.T) {
+	t.Parallel()
[inline review comment, Contributor]
i wonder if we could use synctest here instead of the sleeps, spinlock etc.

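A hedged sketch of what that suggestion could look like, assuming Go 1.24+ with GOEXPERIMENT=synctest enabled (testing/synctest is experimental, and newer releases expose a slightly different entry point; the puller and mock wiring are omitted here). Inside the bubble, time.After runs on a fake clock that advances once every goroutine is durably blocked, so the backoff can be asserted deterministically without real sleeps or the spinlock helper:

//go:build goexperiment.synctest

package puller_test

import (
	"testing"
	"testing/synctest"
	"time"
)

// Sketch only: exercises the synctest mechanism, not the real puller harness.
// Within synctest.Run, timers use a fake clock, so the one-second backoff
// elapses instantly in wall-clock terms yet reads as exactly one second.
func TestBackoffWithSynctest(t *testing.T) {
	synctest.Run(func() {
		start := time.Now()
		<-time.After(time.Second) // fires on the fake clock, no real wait
		if got := time.Since(start); got != time.Second {
			t.Fatalf("fake clock advanced %v, want exactly 1s", got)
		}
	})
}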
+	addr := swarm.RandAddress(t)
+
+	// Use Topmost=0 so that top < start and the interval is never advanced,
+	// causing the loop to retry with the same start value each time.
+	// Provide two replies so we can observe two successive sync calls.
+	_, _, kad, ps := newPuller(t, opts{
+		kad: []kadMock.Option{
+			kadMock.WithEachPeerRevCalls(kadMock.AddrTuple{Addr: addr, PO: 0}),
+		},
+		pullSync: []mockps.Option{
+			mockps.WithCursors([]uint64{100}, 0),
+			mockps.WithSyncError(errors.New("stream error")),
+			mockps.WithReplies(
+				mockps.SyncReply{Bin: 0, Start: 1, Topmost: 0, Peer: addr},
+				mockps.SyncReply{Bin: 0, Start: 1, Topmost: 0, Peer: addr},
+			),
+		},
+		bins: 1,
+		rs:   resMock.NewReserve(resMock.WithRadius(0)),
+	})
+
+	time.Sleep(100 * time.Millisecond)
+	kad.Trigger()
+
+	// wait for the first call
+	err := spinlock.Wait(2*time.Second, func() bool {
+		return len(ps.SyncCalls(addr)) >= 1
+	})
+	if err != nil {
+		t.Fatal("timed out waiting for first sync call")
+	}
+	t1 := time.Now()
+
+	// wait for the second call — must be separated by at least syncRetryBackoff
+	err = spinlock.Wait(3*time.Second, func() bool {
+		return len(ps.SyncCalls(addr)) >= 2
+	})
+	if err != nil {
+		t.Fatal("timed out waiting for second sync call")
+	}
+	elapsed := time.Since(t1)
+
+	const minBackoff = 800 * time.Millisecond // allow 200ms tolerance below syncRetryBackoff
+	if elapsed < minBackoff {
+		t.Fatalf("retry happened too fast: elapsed %v, want >= %v", elapsed, minBackoff)
+	}
+}
+
 func TestPeerGone(t *testing.T) {
 	t.Parallel()
 