Skip to content

Commit aae59a5

Browse files
authored
Merge pull request #2679 from CortexFoundation/dev
p2p: track in-progress inbound node IDs
2 parents: ecdb898 + 2f236df; commit: aae59a5

3 files changed

Lines changed: 137 additions & 18 deletions

File tree

p2p/dial.go

Lines changed: 51 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -76,6 +76,7 @@ var (
7676
errSelf = errors.New("is self")
7777
errAlreadyDialing = errors.New("already dialing")
7878
errAlreadyConnected = errors.New("already connected")
79+
errPendingInbound = errors.New("peer has pending inbound connection")
7980
errRecentlyDialed = errors.New("recently dialed")
8081
errNetRestrict = errors.New("not contained in netrestrict list")
8182
errNoPort = errors.New("node does not provide TCP port")
@@ -104,12 +105,15 @@ type dialScheduler struct {
104105
remStaticCh chan *enode.Node
105106
addPeerCh chan *conn
106107
remPeerCh chan *conn
108+
addPendingCh chan enode.ID
109+
remPendingCh chan enode.ID
107110

108111
// Everything below here belongs to loop and
109112
// should only be accessed by code on the loop goroutine.
110-
dialing map[enode.ID]*dialTask // active tasks
111-
peers map[enode.ID]struct{} // all connected peers
112-
dialPeers int // current number of dialed peers
113+
dialing map[enode.ID]*dialTask // active tasks
114+
peers map[enode.ID]struct{} // all connected peers
115+
pendingInbound map[enode.ID]struct{} // in-progress inbound connections
116+
dialPeers int // current number of dialed peers
113117

114118
// The static map tracks all static dial tasks. The subset of usable static dial tasks
115119
// (i.e. those passing checkDial) is kept in staticPool. The scheduler prefers
@@ -163,19 +167,22 @@ func (cfg dialConfig) withDefaults() dialConfig {
163167
func newDialScheduler(config dialConfig, it enode.Iterator, setupFunc dialSetupFunc) *dialScheduler {
164168
cfg := config.withDefaults()
165169
d := &dialScheduler{
166-
dialConfig: cfg,
167-
historyTimer: mclock.NewAlarm(cfg.clock),
168-
setupFunc: setupFunc,
169-
dnsLookupFunc: net.DefaultResolver.LookupNetIP,
170-
dialing: make(map[enode.ID]*dialTask),
171-
static: make(map[enode.ID]*dialTask),
172-
peers: make(map[enode.ID]struct{}),
173-
doneCh: make(chan *dialTask),
174-
nodesIn: make(chan *enode.Node),
175-
addStaticCh: make(chan *enode.Node),
176-
remStaticCh: make(chan *enode.Node),
177-
addPeerCh: make(chan *conn),
178-
remPeerCh: make(chan *conn),
170+
dialConfig: cfg,
171+
historyTimer: mclock.NewAlarm(cfg.clock),
172+
setupFunc: setupFunc,
173+
dnsLookupFunc: net.DefaultResolver.LookupNetIP,
174+
dialing: make(map[enode.ID]*dialTask),
175+
static: make(map[enode.ID]*dialTask),
176+
peers: make(map[enode.ID]struct{}),
177+
pendingInbound: make(map[enode.ID]struct{}),
178+
doneCh: make(chan *dialTask),
179+
nodesIn: make(chan *enode.Node),
180+
addStaticCh: make(chan *enode.Node),
181+
remStaticCh: make(chan *enode.Node),
182+
addPeerCh: make(chan *conn),
183+
remPeerCh: make(chan *conn),
184+
addPendingCh: make(chan enode.ID),
185+
remPendingCh: make(chan enode.ID),
179186
}
180187
d.lastStatsLog = d.clock.Now()
181188
d.ctx, d.cancel = context.WithCancel(context.Background())
@@ -223,6 +230,22 @@ func (d *dialScheduler) peerRemoved(c *conn) {
223230
}
224231
}
225232

233+
// inboundPending tells the scheduler loop that node id has an in-progress inbound connection by sending id on addPendingCh; it returns without sending once d.ctx is done.
234+
func (d *dialScheduler) inboundPending(id enode.ID) {
235+
select {
236+
case d.addPendingCh <- id:
237+
case <-d.ctx.Done():
238+
}
239+
}
240+
241+
// inboundCompleted tells the scheduler loop that the inbound connection from node id has completed or failed by sending id on remPendingCh; it returns without sending once d.ctx is done.
242+
func (d *dialScheduler) inboundCompleted(id enode.ID) {
243+
select {
244+
case d.remPendingCh <- id:
245+
case <-d.ctx.Done():
246+
}
247+
}
248+
226249
// loop is the main loop of the dialer.
227250
func (d *dialScheduler) loop(it enode.Iterator) {
228251
var (
@@ -276,6 +299,15 @@ loop:
276299
delete(d.peers, c.node.ID())
277300
d.updateStaticPool(c.node.ID())
278301

302+
case id := <-d.addPendingCh:
303+
d.pendingInbound[id] = struct{}{}
304+
d.log.Trace("Marked node as pending inbound", "id", id)
305+
306+
case id := <-d.remPendingCh:
307+
delete(d.pendingInbound, id)
308+
d.updateStaticPool(id)
309+
d.log.Trace("Unmarked node as pending inbound", "id", id)
310+
279311
case node := <-d.addStaticCh:
280312
id := node.ID()
281313
_, exists := d.static[id]
@@ -390,6 +422,9 @@ func (d *dialScheduler) checkDial(n *enode.Node) error {
390422
if _, ok := d.peers[n.ID()]; ok {
391423
return errAlreadyConnected
392424
}
425+
if _, ok := d.pendingInbound[n.ID()]; ok {
426+
return errPendingInbound
427+
}
393428
if d.netRestrict != nil && !d.netRestrict.ContainsAddr(n.IPAddr()) {
394429
return errNetRestrict
395430
}

p2p/dial_test.go

Lines changed: 76 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -423,6 +423,82 @@ func TestDialSchedDNSHostname(t *testing.T) {
423423
})
424424
}
425425

426+
// This test checks that nodes with pending inbound connections are not dialed, and that a node can be dialed again once its inbound peer has disconnected.
427+
func TestDialSchedPendingInbound(t *testing.T) {
428+
t.Parallel()
429+
430+
config := dialConfig{
431+
maxActiveDials: 5,
432+
maxDialPeers: 4,
433+
}
434+
runDialTest(t, config, []dialTestRound{
435+
// 2 peers are connected, leaving 2 dial slots.
436+
// Node 0x03 has a pending inbound connection.
437+
// Discovered nodes 0x03, 0x04, 0x05 but only 0x04 and 0x05 should be dialed.
438+
{
439+
peersAdded: []*conn{
440+
{flags: dynDialedConn, node: newNode(uintID(0x01), "127.0.0.1:30303")},
441+
{flags: dynDialedConn, node: newNode(uintID(0x02), "127.0.0.2:30303")},
442+
},
443+
update: func(d *dialScheduler) {
444+
d.inboundPending(uintID(0x03))
445+
},
446+
discovered: []*enode.Node{
447+
newNode(uintID(0x03), "127.0.0.3:30303"), // not dialed because pending inbound
448+
newNode(uintID(0x04), "127.0.0.4:30303"),
449+
newNode(uintID(0x05), "127.0.0.5:30303"),
450+
},
451+
wantNewDials: []*enode.Node{
452+
newNode(uintID(0x04), "127.0.0.4:30303"),
453+
newNode(uintID(0x05), "127.0.0.5:30303"),
454+
},
455+
},
456+
// Pending inbound connection for 0x03 completes successfully.
457+
// Node 0x03 becomes a connected peer.
458+
// One dial slot remains, node 0x06 is dialed.
459+
{
460+
update: func(d *dialScheduler) {
461+
// Pending inbound for 0x03 completes; the peer itself is added via peersAdded below.
462+
d.inboundCompleted(uintID(0x03))
463+
},
464+
peersAdded: []*conn{
465+
{flags: inboundConn, node: newNode(uintID(0x03), "127.0.0.3:30303")},
466+
},
467+
succeeded: []enode.ID{
468+
uintID(0x04),
469+
},
470+
failed: []enode.ID{
471+
uintID(0x05),
472+
},
473+
discovered: []*enode.Node{
474+
newNode(uintID(0x03), "127.0.0.3:30303"), // not dialed, now connected
475+
newNode(uintID(0x06), "127.0.0.6:30303"),
476+
},
477+
wantNewDials: []*enode.Node{
478+
newNode(uintID(0x06), "127.0.0.6:30303"),
479+
},
480+
},
481+
// Inbound peer 0x03 disconnects.
482+
// Another pending inbound starts for 0x07.
483+
// Only 0x03 should be dialed, not 0x07.
484+
{
485+
peersRemoved: []enode.ID{
486+
uintID(0x03),
487+
},
488+
update: func(d *dialScheduler) {
489+
d.inboundPending(uintID(0x07))
490+
},
491+
discovered: []*enode.Node{
492+
newNode(uintID(0x03), "127.0.0.3:30303"),
493+
newNode(uintID(0x07), "127.0.0.7:30303"), // not dialed because pending inbound
494+
},
495+
wantNewDials: []*enode.Node{
496+
newNode(uintID(0x03), "127.0.0.3:30303"),
497+
},
498+
},
499+
})
500+
}
501+
426502
// -------
427503
// Code below here is the framework for the tests above.
428504

p2p/server.go

Lines changed: 10 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -686,8 +686,11 @@ running:
686686
// Ensure that the trusted flag is set before checking against MaxPeers.
687687
c.flags |= trustedConn
688688
}
689-
// TODO: track in-progress inbound node IDs (pre-Peer) to avoid dialing them.
690-
c.cont <- srv.postHandshakeChecks(peers, inboundCount, c)
689+
err := srv.postHandshakeChecks(peers, inboundCount, c)
690+
if err == nil && c.flags&inboundConn != 0 {
691+
srv.dialsched.inboundPending(c.node.ID())
692+
}
693+
c.cont <- err
691694

692695
case c := <-srv.checkpointAddPeer:
693696
// At this point the connection is past the protocol handshake.
@@ -870,6 +873,11 @@ func (srv *Server) checkInboundConn(remoteIP netip.Addr) error {
870873
// or the handshakes have failed.
871874
func (srv *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *enode.Node) error {
872875
c := &conn{fd: fd, flags: flags, cont: make(chan error)}
876+
defer func() {
877+
if c.is(inboundConn) && c.node != nil {
878+
srv.dialsched.inboundCompleted(c.node.ID())
879+
}
880+
}()
873881
if dialDest == nil {
874882
c.transport = srv.newTransport(fd, nil)
875883
} else {

0 commit comments

Comments (0)