Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion pkg/puller/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ package puller

import "github.com/ethersphere/bee/v2/pkg/swarm"

var PeerIntervalKey = peerIntervalKey
var (
PeerIntervalKey = peerIntervalKey
SyncRetryBackoff = syncRetryBackoff
)

func (p *Puller) IsSyncing(addr swarm.Address) bool {
p.syncPeersMtx.Lock()
Expand Down
21 changes: 18 additions & 3 deletions pkg/puller/puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ const (
maxChunksPerSecond = 1000 // roughly 4 MB/s

maxPODelta = 2 // the lowest level of proximity order (of peers) subtracted from the storage radius allowed for chunk syncing.

syncRetryBackoff = time.Second // minimum wait between retries after a non-fatal sync error
)

type Options struct {
Expand Down Expand Up @@ -222,12 +224,13 @@ func (p *Puller) manage(ctx context.Context) {
}

// disconnectPeer cancels all existing syncing and removes the peer entry from the syncing map.
// Goroutines drain in the background; the caller is not blocked waiting for them.
// Must be called under lock.
func (p *Puller) disconnectPeer(addr swarm.Address) {
p.logger.Debug("disconnecting peer", "peer_address", addr)
if peer, ok := p.syncPeers[addr.ByteString()]; ok {
peer.mtx.Lock()
peer.stop()
peer.cancelBins()
peer.mtx.Unlock()
}
delete(p.syncPeers, addr.ByteString())
Expand Down Expand Up @@ -383,6 +386,11 @@ func (p *Puller) syncPeerBin(parentCtx context.Context, peer *syncPeer, bin uint
}
errCount := countErrors(err)
p.logger.Debug("syncWorker interval failed", "error_count", errCount, "example_error", errors.Unwrap(err), "peer_address", address, "bin", bin, "cursor", cursor, "start", start, "topmost", top)
select {
case <-time.After(syncRetryBackoff):
case <-ctx.Done():
return
}
}

_ = p.limiter.WaitN(ctx, count)
Expand Down Expand Up @@ -582,12 +590,19 @@ func newSyncPeer(addr swarm.Address, bins, po uint8) *syncPeer {
}
}

// called when peer disconnects or on shutdown, cleans up ongoing sync operations
func (p *syncPeer) stop() {
// cancelBins cancels all per-bin sync contexts and clears the cancel-func map.
// Does not wait for goroutines to exit. Must be called under peer.mtx.
func (p *syncPeer) cancelBins() {
for bin, c := range p.binCancelFuncs {
c()
delete(p.binCancelFuncs, bin)
}
}

// stop cancels all per-bin sync contexts and waits for goroutines to exit.
// Used on shutdown. Must be called under peer.mtx.
func (p *syncPeer) stop() {
p.cancelBins()
p.wg.Wait()
}

Expand Down
117 changes: 117 additions & 0 deletions pkg/puller/puller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"errors"
"fmt"
"testing"
"testing/synctest"
"time"

"github.com/ethersphere/bee/v2/pkg/log"
Expand Down Expand Up @@ -507,6 +508,122 @@ func TestContinueSyncing(t *testing.T) {
}
}

// TestSyncErrorBackoff verifies that a non-fatal sync error is followed by a
// backoff before the next retry. Under synctest the worker blocks after one
// failed call; without the backoff it would spin and consume both replies, so
// the "exactly one call" check fails without the fix.
func TestSyncErrorBackoff(t *testing.T) {
addr := swarm.RandAddress(t)

synctest.Test(t, func(t *testing.T) {
// Topmost=0 keeps top < start so the interval never advances and the loop
// retries with the same start value. Two replies cover the failed retry.
ps := mockps.NewPullSync(
mockps.WithCursors([]uint64{100}, 0),
mockps.WithSyncError(errors.New("stream error")),
mockps.WithReplies(
mockps.SyncReply{Bin: 0, Start: 1, Topmost: 0, Peer: addr},
mockps.SyncReply{Bin: 0, Start: 1, Topmost: 0, Peer: addr},
),
)
kad := kadMock.NewMockKademlia(
kadMock.WithEachPeerRevCalls(kadMock.AddrTuple{Addr: addr, PO: 0}),
)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

p := puller.New(swarm.RandAddress(t), mock.NewStateStore(), kad, resMock.NewReserve(resMock.WithRadius(0)), ps, nil, log.Noop, puller.Options{Bins: 1})
p.Start(ctx)

kad.Trigger()

// One failed call, then the worker blocks in the backoff.
synctest.Wait()
if got := len(ps.SyncCalls(addr)); got != 1 {
t.Fatalf("expected worker to back off after 1 failed call, got %d calls", got)
}

// Sleeping out the backoff releases exactly one retry.
time.Sleep(puller.SyncRetryBackoff)
synctest.Wait()
if got := len(ps.SyncCalls(addr)); got != 2 {
t.Fatalf("expected exactly one retry after the backoff, got %d calls", got)
}

if err := p.Close(); err != nil {
t.Errorf("close puller: %v", err)
}
})
}

// TestRadiusDecreaseLiveness verifies that after a radius decrease a new sync
// worker starts without the manager blocking while the disconnected peer's
// in-flight worker drains. Under synctest, a blocking disconnect holds up the
// restart by cancelDelay; a non-blocking one restarts immediately.
func TestRadiusDecreaseLiveness(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
const (
bins = 4
initialRadius = 2 // peer within depth: only bins >= 2 sync initially
newRadius = 0 // radius decrease: all bins must be resynced
// cancelDelay models a slow drain of the in-flight Sync after its
// context is cancelled; > 0 and < Close's 10s drain timeout.
cancelDelay = time.Second
)

base := swarm.RandAddress(t)
// peer PO matches initialRadius so it participates both before and after.
peerAddr := swarm.RandAddressAt(t, base, initialRadius)

rs := resMock.NewReserve(resMock.WithRadius(initialRadius))
ps := mockps.NewPullSync(
mockps.WithCursors(make([]uint64, bins), 0),
mockps.WithSyncCancelDelay(cancelDelay),
)
kad := kadMock.NewMockKademlia(
kadMock.WithEachPeerRevCalls(kadMock.AddrTuple{Addr: peerAddr, PO: initialRadius}),
)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

p := puller.New(base, mock.NewStateStore(), kad, rs, ps, nil, log.Noop, puller.Options{Bins: bins})
p.Start(ctx)

// Wait for syncing to start (peer is within depth at radius=2).
kad.Trigger()
synctest.Wait()
if got := ps.TotalSyncCalls(); got < 1 {
t.Fatalf("expected syncing to start, got %d sync calls", got)
}
snapshot := ps.TotalSyncCalls()

// Radius decrease tears down the peer's workers and restarts them.
rs.SetStorageRadius(newRadius)
start := time.Now()
kad.Trigger()
synctest.Wait()

// A new worker must have started without the manager blocking on the
// in-flight drain. Use Errorf not Fatalf so we still reach Close: on the
// bug the manager is blocked waiting for the drain and Close's cancel
// is what lets the bubble exit.
if ps.TotalSyncCalls() <= snapshot {
t.Errorf("no new sync worker started after radius decrease (still %d); "+
"manager blocked draining the in-flight goroutine during disconnect", ps.TotalSyncCalls())
}
if elapsed := time.Since(start); elapsed >= cancelDelay {
t.Errorf("manager blocked on in-flight drain: elapsed %v, want < %v", elapsed, cancelDelay)
}

// Close inside the bubble so the sleeping goroutines drain.
if err := p.Close(); err != nil {
t.Errorf("close puller: %v", err)
}
})
}

func TestPeerGone(t *testing.T) {
t.Parallel()

Expand Down
27 changes: 27 additions & 0 deletions pkg/pullsync/mock/pullsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"context"
"fmt"
"sync"
"time"

"github.com/ethersphere/bee/v2/pkg/pullsync"
"github.com/ethersphere/bee/v2/pkg/swarm"
Expand Down Expand Up @@ -36,6 +37,15 @@ func WithReplies(replies ...SyncReply) Option {
})
}

// WithSyncCancelDelay adds an artificial delay after context cancellation in
// the blocking Sync path (when no reply is configured), simulating slow TCP
// teardown latency.
func WithSyncCancelDelay(d time.Duration) Option {
return optionFunc(func(p *PullSyncMock) {
p.cancelDelay = d
})
}

func toID(a swarm.Address, bin uint8, start uint64) string {
return fmt.Sprintf("%s-%d-%d", a, bin, start)
}
Expand All @@ -51,11 +61,13 @@ type SyncReply struct {
type PullSyncMock struct {
mtx sync.Mutex
syncCalls []SyncReply
totalSyncCalls int // every Sync() entry, including blocking ones
syncErr error
cursors []uint64
epoch uint64
getCursorsPeers []swarm.Address
replies map[string][]SyncReply
cancelDelay time.Duration // extra delay after ctx cancellation in blocking path

quit chan struct{}
}
Expand All @@ -74,6 +86,8 @@ func NewPullSync(opts ...Option) *PullSyncMock {
func (p *PullSyncMock) Sync(ctx context.Context, peer swarm.Address, bin uint8, start uint64) (topmost uint64, count int, err error) {
p.mtx.Lock()

p.totalSyncCalls++

id := toID(peer, bin, start)
replies := p.replies[id]

Expand All @@ -84,11 +98,24 @@ func (p *PullSyncMock) Sync(ctx context.Context, peer swarm.Address, bin uint8,
p.mtx.Unlock()
return reply.Topmost, reply.Count, p.syncErr
}

cancelDelay := p.cancelDelay
p.mtx.Unlock()

<-ctx.Done()
if cancelDelay > 0 {
time.Sleep(cancelDelay)
}
return 0, 0, ctx.Err()
}

// TotalSyncCalls returns the total number of Sync() invocations, including blocking ones.
func (p *PullSyncMock) TotalSyncCalls() int {
p.mtx.Lock()
defer p.mtx.Unlock()
return p.totalSyncCalls
}

func (p *PullSyncMock) GetCursors(_ context.Context, peer swarm.Address) ([]uint64, uint64, error) {
p.mtx.Lock()
defer p.mtx.Unlock()
Expand Down
Loading