Skip to content

Commit 184af1e

Browse files
committed
fix(puller): non-blocking peer disconnect
When onChange held syncPeersMtx and called disconnectPeer, the inner peer.stop() cancelled the per-bin goroutine contexts and then called peer.wg.Wait(). Live goroutines were blocked in Sync() -> ReadMsgWithContext and unblocked only after pageTimeout (1s) per stream. With N peers disconnecting during a radius decrease, the outer lock was therefore held for up to N × 1s, stalling all queued topology-change notifications for the same duration. The fix splits syncPeer.stop() into cancelBins() (cancel all per-bin contexts and clear the map, without waiting) and stop() (cancel and wait, used only in Close()). disconnectPeer now calls cancelBins(): the peer is removed from the sync map immediately and its goroutines drain in the background. Close() already calls p.wg.Wait(), so shutdown correctness is unchanged. Covered by TestRadiusDecreaseLiveness (synctest).
1 parent 025d9d4 commit 184af1e

3 files changed

Lines changed: 106 additions & 3 deletions

File tree

pkg/puller/puller.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -222,12 +222,13 @@ func (p *Puller) manage(ctx context.Context) {
222222
}
223223

224224
// disconnectPeer cancels all ongoing sync operations for the peer and removes its entry from the syncing map.
225+
// Goroutines drain in the background; the caller is not blocked waiting for them.
225226
// Must be called under lock.
226227
func (p *Puller) disconnectPeer(addr swarm.Address) {
227228
p.logger.Debug("disconnecting peer", "peer_address", addr)
228229
if peer, ok := p.syncPeers[addr.ByteString()]; ok {
229230
peer.mtx.Lock()
230-
peer.stop()
231+
peer.cancelBins()
231232
peer.mtx.Unlock()
232233
}
233234
delete(p.syncPeers, addr.ByteString())
@@ -582,12 +583,19 @@ func newSyncPeer(addr swarm.Address, bins, po uint8) *syncPeer {
582583
}
583584
}
584585

585-
// called when peer disconnects or on shutdown, cleans up ongoing sync operations
586-
func (p *syncPeer) stop() {
586+
// cancelBins cancels all per-bin sync contexts and clears the cancel-func map.
587+
// Does not wait for goroutines to exit. Must be called under peer.mtx.
588+
func (p *syncPeer) cancelBins() {
587589
for bin, c := range p.binCancelFuncs {
588590
c()
589591
delete(p.binCancelFuncs, bin)
590592
}
593+
}
594+
595+
// stop cancels all per-bin sync contexts and waits for goroutines to exit.
596+
// Used on shutdown. Must be called under peer.mtx.
597+
func (p *syncPeer) stop() {
598+
p.cancelBins()
591599
p.wg.Wait()
592600
}
593601

pkg/puller/puller_test.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"errors"
1010
"fmt"
1111
"testing"
12+
"testing/synctest"
1213
"time"
1314

1415
"github.com/ethersphere/bee/v2/pkg/log"
@@ -507,6 +508,73 @@ func TestContinueSyncing(t *testing.T) {
507508
}
508509
}
509510

511+
// TestRadiusDecreaseLiveness verifies that after a radius decrease a new sync
512+
// worker starts without the manager blocking while the disconnected peer's
513+
// in-flight worker drains. Under synctest, a blocking disconnect holds up the
514+
// restart by cancelDelay; a non-blocking one restarts immediately.
515+
func TestRadiusDecreaseLiveness(t *testing.T) {
516+
synctest.Test(t, func(t *testing.T) {
517+
const (
518+
bins = 4
519+
initialRadius = 2 // peer within depth: only bins >= 2 sync initially
520+
newRadius = 0 // radius decrease: all bins must be resynced
521+
// cancelDelay models a slow drain of the in-flight Sync after its
522+
// context is cancelled; > 0 and < Close's 10s drain timeout.
523+
cancelDelay = time.Second
524+
)
525+
526+
base := swarm.RandAddress(t)
527+
// peer PO matches initialRadius so it participates both before and after.
528+
peerAddr := swarm.RandAddressAt(t, base, initialRadius)
529+
530+
rs := resMock.NewReserve(resMock.WithRadius(initialRadius))
531+
ps := mockps.NewPullSync(
532+
mockps.WithCursors(make([]uint64, bins), 0),
533+
mockps.WithSyncCancelDelay(cancelDelay),
534+
)
535+
kad := kadMock.NewMockKademlia(
536+
kadMock.WithEachPeerRevCalls(kadMock.AddrTuple{Addr: peerAddr, PO: initialRadius}),
537+
)
538+
539+
ctx, cancel := context.WithCancel(context.Background())
540+
defer cancel()
541+
542+
p := puller.New(base, mock.NewStateStore(), kad, rs, ps, nil, log.Noop, puller.Options{Bins: bins})
543+
p.Start(ctx)
544+
545+
// Wait for syncing to start (peer is within depth at radius=2).
546+
kad.Trigger()
547+
synctest.Wait()
548+
if got := ps.TotalSyncCalls(); got < 1 {
549+
t.Fatalf("expected syncing to start, got %d sync calls", got)
550+
}
551+
snapshot := ps.TotalSyncCalls()
552+
553+
// Radius decrease tears down the peer's workers and restarts them.
554+
rs.SetStorageRadius(newRadius)
555+
start := time.Now()
556+
kad.Trigger()
557+
synctest.Wait()
558+
559+
// A new worker must have started without the manager blocking on the
560+
// in-flight drain. Use Errorf not Fatalf so we still reach Close: on the
561+
// bug the manager is blocked waiting for the drain and Close's cancel
562+
// is what lets the bubble exit.
563+
if ps.TotalSyncCalls() <= snapshot {
564+
t.Errorf("no new sync worker started after radius decrease (still %d); "+
565+
"manager blocked draining the in-flight goroutine during disconnect", ps.TotalSyncCalls())
566+
}
567+
if elapsed := time.Since(start); elapsed >= cancelDelay {
568+
t.Errorf("manager blocked on in-flight drain: elapsed %v, want < %v", elapsed, cancelDelay)
569+
}
570+
571+
// Close inside the bubble so the sleeping goroutines drain.
572+
if err := p.Close(); err != nil {
573+
t.Errorf("close puller: %v", err)
574+
}
575+
})
576+
}
577+
510578
func TestPeerGone(t *testing.T) {
511579
t.Parallel()
512580

pkg/pullsync/mock/pullsync.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"context"
99
"fmt"
1010
"sync"
11+
"time"
1112

1213
"github.com/ethersphere/bee/v2/pkg/pullsync"
1314
"github.com/ethersphere/bee/v2/pkg/swarm"
@@ -36,6 +37,15 @@ func WithReplies(replies ...SyncReply) Option {
3637
})
3738
}
3839

40+
// WithSyncCancelDelay adds an artificial delay after context cancellation in
41+
// the blocking Sync path (when no reply is configured), simulating slow TCP
42+
// teardown latency.
43+
func WithSyncCancelDelay(d time.Duration) Option {
44+
return optionFunc(func(p *PullSyncMock) {
45+
p.cancelDelay = d
46+
})
47+
}
48+
3949
func toID(a swarm.Address, bin uint8, start uint64) string {
4050
return fmt.Sprintf("%s-%d-%d", a, bin, start)
4151
}
@@ -51,11 +61,13 @@ type SyncReply struct {
5161
type PullSyncMock struct {
5262
mtx sync.Mutex
5363
syncCalls []SyncReply
64+
totalSyncCalls int // every Sync() entry, including blocking ones
5465
syncErr error
5566
cursors []uint64
5667
epoch uint64
5768
getCursorsPeers []swarm.Address
5869
replies map[string][]SyncReply
70+
cancelDelay time.Duration // extra delay after ctx cancellation in blocking path
5971

6072
quit chan struct{}
6173
}
@@ -74,6 +86,8 @@ func NewPullSync(opts ...Option) *PullSyncMock {
7486
func (p *PullSyncMock) Sync(ctx context.Context, peer swarm.Address, bin uint8, start uint64) (topmost uint64, count int, err error) {
7587
p.mtx.Lock()
7688

89+
p.totalSyncCalls++
90+
7791
id := toID(peer, bin, start)
7892
replies := p.replies[id]
7993

@@ -84,11 +98,24 @@ func (p *PullSyncMock) Sync(ctx context.Context, peer swarm.Address, bin uint8,
8498
p.mtx.Unlock()
8599
return reply.Topmost, reply.Count, p.syncErr
86100
}
101+
102+
cancelDelay := p.cancelDelay
87103
p.mtx.Unlock()
104+
88105
<-ctx.Done()
106+
if cancelDelay > 0 {
107+
time.Sleep(cancelDelay)
108+
}
89109
return 0, 0, ctx.Err()
90110
}
91111

112+
// TotalSyncCalls returns the total number of Sync() invocations, including blocking ones.
113+
func (p *PullSyncMock) TotalSyncCalls() int {
114+
p.mtx.Lock()
115+
defer p.mtx.Unlock()
116+
return p.totalSyncCalls
117+
}
118+
92119
func (p *PullSyncMock) GetCursors(_ context.Context, peer swarm.Address) ([]uint64, uint64, error) {
93120
p.mtx.Lock()
94121
defer p.mtx.Unlock()

0 commit comments

Comments
 (0)