Skip to content

Commit 7445717

Browse files
apollo_network: track connection id in DialPeerStream dial state (#13488)
1 parent 4277bee commit 7445717

2 files changed

Lines changed: 64 additions & 41 deletions

File tree

crates/apollo_network/src/discovery/behaviours/dialing/dial_peer.rs

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use std::task::{Context, Poll, Waker};
88
use futures::Stream;
99
use libp2p::swarm::behaviour::ConnectionEstablished;
1010
use libp2p::swarm::dial_opts::{DialOpts, PeerCondition};
11-
use libp2p::swarm::{dummy, ConnectionHandler, DialFailure, FromSwarm, ToSwarm};
11+
use libp2p::swarm::{dummy, ConnectionHandler, ConnectionId, DialFailure, FromSwarm, ToSwarm};
1212
use libp2p::{Multiaddr, PeerId};
1313
use tokio::time::{Instant, Sleep};
1414
use tokio_retry::strategy::ExponentialBackoff;
@@ -34,8 +34,8 @@ pub struct DialPeerStream {
3434
enum DialState {
3535
/// Waiting to dial (immediately or after backoff).
3636
PendingDial,
37-
/// A dial attempt is in progress.
38-
Dialing,
37+
/// A dial attempt is in progress with the given connection id.
38+
Dialing(ConnectionId),
3939
/// Terminal state - connection was established after the request, no guarantee if it's still
4040
/// connected.
4141
Done,
@@ -72,10 +72,10 @@ impl DialPeerStream {
7272
self.state = DialState::Done;
7373
self.wake();
7474
}
75-
FromSwarm::DialFailure(DialFailure { peer_id: Some(peer_id), .. })
76-
if peer_id == self.peer_id =>
77-
{
78-
if self.state != DialState::Dialing {
75+
FromSwarm::DialFailure(DialFailure {
76+
peer_id: Some(peer_id), connection_id, ..
77+
}) if peer_id == self.peer_id => {
78+
if self.state != DialState::Dialing(connection_id) {
7979
return;
8080
}
8181
let backoff = self
@@ -100,14 +100,13 @@ impl DialPeerStream {
100100

101101
fn emit_dial<T, W>(&mut self) -> ToSwarm<T, W> {
102102
self.sleeper = None;
103-
self.state = DialState::Dialing;
103+
let opts = DialOpts::peer_id(self.peer_id)
104+
.addresses(self.addresses.clone())
105+
.condition(PeerCondition::DisconnectedAndNotDialing)
106+
.build();
107+
self.state = DialState::Dialing(opts.connection_id());
104108
debug!(?self.peer_id, addresses = ?self.addresses, "Dialing peer");
105-
ToSwarm::Dial {
106-
opts: DialOpts::peer_id(self.peer_id)
107-
.addresses(self.addresses.clone())
108-
.condition(PeerCondition::DisconnectedAndNotDialing)
109-
.build(),
110-
}
109+
ToSwarm::Dial { opts }
111110
}
112111
}
113112

@@ -122,7 +121,7 @@ impl Stream for DialPeerStream {
122121

123122
match self.state {
124123
DialState::Done => Poll::Ready(None),
125-
DialState::Dialing => Poll::Pending,
124+
DialState::Dialing(_) => Poll::Pending,
126125
DialState::PendingDial => {
127126
let now = Instant::now();
128127
if self.next_dial_time <= now {

crates/apollo_network/src/discovery/discovery_test.rs

Lines changed: 50 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,18 @@ impl Stream for Behaviour {
9696
// In case we have a bug when we return pending and then return an event.
9797
const TIMES_TO_CHECK_FOR_PENDING_EVENT: usize = 5;
9898

99+
async fn expect_dial_from_peer(
100+
behaviour: &mut Behaviour,
101+
expected_peer_id: PeerId,
102+
) -> ConnectionId {
103+
let event = timeout(TIMEOUT, behaviour.next()).await.unwrap().unwrap();
104+
let ToSwarm::Dial { opts } = event else {
105+
panic!("Expected Dial event");
106+
};
107+
assert_eq!(opts.get_peer_id(), Some(expected_peer_id));
108+
opts.connection_id()
109+
}
110+
99111
fn assert_no_event(behaviour: &mut Behaviour) {
100112
for _ in 0..TIMES_TO_CHECK_FOR_PENDING_EVENT {
101113
assert!(behaviour.next().now_or_never().is_none());
@@ -150,16 +162,12 @@ async fn discovery_redials_on_dial_failure(
150162
vec![(bootstrap_peer_id, bootstrap_peer_address.clone())],
151163
);
152164

153-
let event = timeout(TIMEOUT, behaviour.next()).await.unwrap().unwrap();
154-
assert_matches!(
155-
event,
156-
ToSwarm::Dial{opts} if opts.get_peer_id() == Some(bootstrap_peer_id)
157-
);
165+
let dial_connection_id = expect_dial_from_peer(&mut behaviour, bootstrap_peer_id).await;
158166

159167
behaviour.on_swarm_event(FromSwarm::DialFailure(DialFailure {
160168
peer_id: Some(bootstrap_peer_id),
161169
error: &DialError::Aborted,
162-
connection_id: ConnectionId::new_unchecked(0),
170+
connection_id: dial_connection_id,
163171
}));
164172

165173
let event = check_event_happens_after_given_duration(
@@ -173,6 +181,34 @@ async fn discovery_redials_on_dial_failure(
173181
);
174182
}
175183

184+
#[rstest::rstest]
185+
#[tokio::test]
186+
async fn discovery_ignores_dial_failure_from_other_connection_id(
187+
#[values(CONFIG_WITH_LARGE_HEARTBEAT_AND_SMALL_BOOTSTRAP_SLEEP)] config: DiscoveryConfig,
188+
bootstrap_peer_id: PeerId,
189+
bootstrap_peer_address: Multiaddr,
190+
) {
191+
let mut behaviour = Behaviour::new(
192+
dummy_local_peer_id(),
193+
config.clone(),
194+
vec![(bootstrap_peer_id, bootstrap_peer_address.clone())],
195+
);
196+
197+
let dial_connection_id = expect_dial_from_peer(&mut behaviour, bootstrap_peer_id).await;
198+
199+
// Send a dial failure with a different connection id — should be ignored.
200+
let other_connection_id =
201+
ConnectionId::new_unchecked(format!("{dial_connection_id}").parse::<usize>().unwrap() + 1);
202+
behaviour.on_swarm_event(FromSwarm::DialFailure(DialFailure {
203+
peer_id: Some(bootstrap_peer_id),
204+
error: &DialError::Aborted,
205+
connection_id: other_connection_id,
206+
}));
207+
208+
// No retry should be scheduled since the failure was for a different connection.
209+
assert_no_event(&mut behaviour);
210+
}
211+
176212
#[rstest::rstest]
177213
#[tokio::test]
178214
async fn discovery_redials_when_all_connections_closed(
@@ -328,13 +364,13 @@ async fn discovery_performs_queries_even_if_not_connected_to_bootstrap_peer(
328364
);
329365

330366
// Consume the initial dial event.
331-
timeout(TIMEOUT, behaviour.next()).await.unwrap();
367+
let dial_connection_id = expect_dial_from_peer(&mut behaviour, bootstrap_peer_id).await;
332368

333369
// Simulate dial failure.
334370
behaviour.on_swarm_event(FromSwarm::DialFailure(DialFailure {
335371
peer_id: Some(bootstrap_peer_id),
336372
error: &DialError::Aborted,
337-
connection_id: ConnectionId::new_unchecked(0),
373+
connection_id: dial_connection_id,
338374
}));
339375

340376
// Set a peer to request so the heartbeat has something to query.
@@ -386,17 +422,13 @@ async fn set_target_peers_cancels_dials_for_removed_peers(
386422
behaviour.kad_requesting.handle_kad_response(&[(peer_a, vec![peer_a_address])]);
387423

388424
// Poll to trigger dial for peer_a.
389-
let event = timeout(TIMEOUT, behaviour.next()).await.unwrap().unwrap();
390-
assert_matches!(
391-
event,
392-
ToSwarm::Dial { opts } if opts.get_peer_id() == Some(peer_a)
393-
);
425+
let dial_connection_id = expect_dial_from_peer(&mut behaviour, peer_a).await;
394426

395427
// Simulate dial failure — DialPeerStream schedules retry after some time.
396428
behaviour.on_swarm_event(FromSwarm::DialFailure(DialFailure {
397429
peer_id: Some(peer_a),
398430
error: &DialError::Aborted,
399-
connection_id: ConnectionId::new_unchecked(0),
431+
connection_id: dial_connection_id,
400432
}));
401433

402434
// Remove peer_a from target set — this cancels its DialPeerStream.
@@ -426,17 +458,13 @@ async fn set_target_peers_does_not_cancel_bootstrap_dials(
426458
);
427459

428460
// Consume the initial Dial event for bootstrap peer.
429-
let event = timeout(TIMEOUT, behaviour.next()).await.unwrap().unwrap();
430-
assert_matches!(
431-
event,
432-
ToSwarm::Dial { opts } if opts.get_peer_id() == Some(bootstrap_peer_id)
433-
);
461+
let dial_connection_id = expect_dial_from_peer(&mut behaviour, bootstrap_peer_id).await;
434462

435463
// Simulate dial failure — DialPeerStream schedules retry after some time.
436464
behaviour.on_swarm_event(FromSwarm::DialFailure(DialFailure {
437465
peer_id: Some(bootstrap_peer_id),
438466
error: &DialError::Aborted,
439-
connection_id: ConnectionId::new_unchecked(0),
467+
connection_id: dial_connection_id,
440468
}));
441469

442470
// Set empty target peers — bootstrap peer was never in the target set, so its dial persists.
@@ -473,17 +501,13 @@ async fn set_target_peers_cancels_bootstrap_dial_when_bootstrap_peer_overlaps_wi
473501
);
474502

475503
// Consume the initial Dial event for bootstrap peer.
476-
let event = timeout(TIMEOUT, behaviour.next()).await.unwrap().unwrap();
477-
assert_matches!(
478-
event,
479-
ToSwarm::Dial { opts } if opts.get_peer_id() == Some(bootstrap_peer_id)
480-
);
504+
let dial_connection_id = expect_dial_from_peer(&mut behaviour, bootstrap_peer_id).await;
481505

482506
// Simulate dial failure — DialPeerStream schedules retry after some time.
483507
behaviour.on_swarm_event(FromSwarm::DialFailure(DialFailure {
484508
peer_id: Some(bootstrap_peer_id),
485509
error: &DialError::Aborted,
486-
connection_id: ConnectionId::new_unchecked(0),
510+
connection_id: dial_connection_id,
487511
}));
488512

489513
// Add bootstrap peer to the target set, then remove it.

0 commit comments

Comments
 (0)