Skip to content

Commit 3c7dc6d

Browse files
authored
feat(p2p): add force direct connections (#260)
* feat: add quic upgrade * feat: add comment about peer addresses * fix: linter * fix: clean up tests * fix: linter * feat: add force direct connections * fix: linter * fix: review comments * fix: review comments * fix: review comments
1 parent 85b9aa9 commit 3c7dc6d

3 files changed

Lines changed: 288 additions & 0 deletions

File tree

crates/p2p/src/force_direct.rs

Lines changed: 280 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,280 @@
1+
//! Force direct connection behaviour.
2+
3+
use std::{
4+
collections::{HashSet, VecDeque},
5+
convert::Infallible,
6+
task::{Context, Poll},
7+
};
8+
9+
use libp2p::{
10+
Multiaddr, PeerId,
11+
swarm::{
12+
ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour, THandler, ToSwarm,
13+
behaviour::ConnectionEstablished,
14+
dial_opts::{DialOpts, PeerCondition},
15+
dummy,
16+
},
17+
};
18+
use std::time::Duration;
19+
use tokio::time::Interval;
20+
use tracing::{debug, warn};
21+
22+
use crate::{name::peer_name, p2p_context::P2PContext, utils};
23+
24+
const FORCE_DIRECT_INTERVAL: Duration = Duration::from_secs(60);
25+
26+
/// Force direct connection behaviour.
27+
pub struct ForceDirectBehaviour {
28+
/// P2P context for accessing peer store and known peers.
29+
p2p_context: P2PContext,
30+
31+
/// Local peer ID (to skip self).
32+
local_peer_id: PeerId,
33+
34+
/// Pending events to emit.
35+
pending_events: VecDeque<ToSwarm<ForceDirectEvent, Infallible>>,
36+
37+
/// Pending forcings to emit.
38+
pending_forcings: HashSet<PeerId>,
39+
40+
/// Interval timer for running force direct logic periodically.
41+
ticker: Interval,
42+
}
43+
44+
impl std::fmt::Debug for ForceDirectBehaviour {
45+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46+
f.debug_struct("ForceDirectBehaviour")
47+
.field("p2p_context", &self.p2p_context)
48+
.field("local_peer_id", &self.local_peer_id)
49+
.field("pending_events", &self.pending_events.len())
50+
.field("ticker", &"<Interval>")
51+
.finish()
52+
}
53+
}
54+
55+
/// Events emitted by the force direct behaviour.
56+
#[derive(Debug, Clone)]
57+
pub enum ForceDirectEvent {
58+
/// Force direct connection to a peer.
59+
ForceDirectSuccess {
60+
/// The peer to force direct connection to.
61+
peer: PeerId,
62+
},
63+
/// Force direct connection failed.
64+
ForceDirectFailure {
65+
/// The peer to force direct connection to.
66+
peer: PeerId,
67+
/// The reason for the failure.
68+
reason: String,
69+
},
70+
}
71+
72+
impl ForceDirectBehaviour {
73+
/// Creates a new force direct behaviour.
74+
pub fn new(p2p_context: P2PContext, local_peer_id: PeerId) -> Self {
75+
let mut ticker = tokio::time::interval(FORCE_DIRECT_INTERVAL);
76+
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
77+
78+
Self {
79+
p2p_context,
80+
local_peer_id,
81+
pending_events: VecDeque::new(),
82+
ticker,
83+
pending_forcings: HashSet::new(),
84+
}
85+
}
86+
87+
/// Runs force direct connection logic for all known peers.
88+
///
89+
/// For each known peer:
90+
/// 1. Skip if it's the local peer
91+
/// 2. Skip if already attempting to force direct connection
92+
/// 3. Skip if no connections exist
93+
/// 4. Skip if any connection is not through relay
94+
/// 5. Attempt to dial direct addresses
95+
fn force_direct_connections(&mut self) {
96+
let peers = self.p2p_context.known_peers();
97+
98+
for peer in peers {
99+
if *peer == self.local_peer_id {
100+
continue;
101+
}
102+
103+
if self.pending_forcings.contains(peer) {
104+
continue;
105+
}
106+
107+
let (connections, available_addresses): (
108+
Vec<crate::p2p_context::Peer>,
109+
Option<Vec<Multiaddr>>,
110+
) = {
111+
let lock = self.p2p_context.peer_store_lock();
112+
113+
(
114+
lock.connections_to_peer(peer)
115+
.into_iter()
116+
.cloned()
117+
.collect::<Vec<_>>(),
118+
lock.peer_addresses(peer)
119+
.cloned()
120+
.map(|v| v.into_iter().collect()),
121+
)
122+
};
123+
124+
if connections.is_empty() {
125+
warn!(
126+
peer = %peer_name(peer),
127+
"no connections to peer"
128+
);
129+
continue;
130+
}
131+
132+
if connections
133+
.iter()
134+
.any(|c| !utils::is_relay_addr(&c.remote_addr))
135+
{
136+
debug!(
137+
peer = %peer_name(peer),
138+
"not all connections to peer are relay connections, skipping force direct"
139+
);
140+
continue;
141+
}
142+
143+
let Some(addresses) = available_addresses else {
144+
warn!(
145+
peer = %peer_name(peer),
146+
"no known addresses for peer"
147+
);
148+
continue;
149+
};
150+
151+
// Find non-relay addresses
152+
let direct_addresses: Vec<Multiaddr> = addresses
153+
.iter()
154+
.filter(|addr| utils::is_direct_addr(addr))
155+
.cloned()
156+
.collect();
157+
158+
if direct_addresses.is_empty() {
159+
warn!(
160+
peer = %peer_name(peer),
161+
"no direct addresses for peer, cannot force direct connection"
162+
);
163+
continue;
164+
}
165+
166+
debug!(
167+
peer = %peer_name(peer),
168+
direct_addresses = ?direct_addresses,
169+
"forcing direct connection to peer using {} available addresses",
170+
direct_addresses.len()
171+
);
172+
173+
self.pending_forcings.insert(*peer);
174+
175+
self.pending_events.push_back(ToSwarm::Dial {
176+
opts: DialOpts::peer_id(*peer)
177+
.addresses(direct_addresses)
178+
.condition(PeerCondition::Always)
179+
.build(),
180+
});
181+
}
182+
}
183+
184+
fn handle_connection_established(&mut self, event: ConnectionEstablished) {
185+
let addr = match &event.endpoint {
186+
libp2p::core::ConnectedPoint::Dialer { address, .. } => address,
187+
libp2p::core::ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr,
188+
};
189+
190+
if self.pending_forcings.contains(&event.peer_id) && utils::is_direct_addr(addr) {
191+
self.pending_forcings.remove(&event.peer_id);
192+
self.pending_events.push_back(ToSwarm::GenerateEvent(
193+
ForceDirectEvent::ForceDirectSuccess {
194+
peer: event.peer_id,
195+
},
196+
));
197+
}
198+
}
199+
200+
fn handle_dial_failure(&mut self, peer_id: Option<PeerId>) {
201+
let Some(peer_id) = peer_id else {
202+
return;
203+
};
204+
205+
if self.pending_forcings.remove(&peer_id) {
206+
self.pending_events.push_back(ToSwarm::GenerateEvent(
207+
ForceDirectEvent::ForceDirectFailure {
208+
peer: peer_id,
209+
reason: "dial failed".to_string(),
210+
},
211+
));
212+
}
213+
}
214+
}
215+
216+
impl NetworkBehaviour for ForceDirectBehaviour {
217+
type ConnectionHandler = dummy::ConnectionHandler;
218+
type ToSwarm = ForceDirectEvent;
219+
220+
fn handle_established_inbound_connection(
221+
&mut self,
222+
_connection_id: ConnectionId,
223+
_peer: PeerId,
224+
_local_addr: &Multiaddr,
225+
_remote_addr: &Multiaddr,
226+
) -> Result<THandler<Self>, ConnectionDenied> {
227+
Ok(dummy::ConnectionHandler)
228+
}
229+
230+
fn handle_established_outbound_connection(
231+
&mut self,
232+
_connection_id: ConnectionId,
233+
_peer: PeerId,
234+
_addr: &Multiaddr,
235+
_role_override: libp2p::core::Endpoint,
236+
_port_use: libp2p::core::transport::PortUse,
237+
) -> Result<THandler<Self>, ConnectionDenied> {
238+
Ok(dummy::ConnectionHandler)
239+
}
240+
241+
fn on_swarm_event(&mut self, event: libp2p::swarm::FromSwarm) {
242+
match event {
243+
FromSwarm::ConnectionEstablished(event) => {
244+
self.handle_connection_established(event);
245+
}
246+
FromSwarm::DialFailure(event) => {
247+
self.handle_dial_failure(event.peer_id);
248+
}
249+
_ => {}
250+
}
251+
}
252+
253+
fn on_connection_handler_event(
254+
&mut self,
255+
_peer_id: PeerId,
256+
_connection_id: ConnectionId,
257+
_event: libp2p::swarm::THandlerOutEvent<Self>,
258+
) {
259+
// Handler emits Infallible, so this is unreachable
260+
}
261+
262+
fn poll(
263+
&mut self,
264+
cx: &mut Context<'_>,
265+
) -> std::task::Poll<ToSwarm<Self::ToSwarm, libp2p::swarm::THandlerInEvent<Self>>> {
266+
if let Some(event) = self.pending_events.pop_front() {
267+
return Poll::Ready(event);
268+
}
269+
270+
if self.ticker.poll_tick(cx).is_ready() {
271+
self.force_direct_connections();
272+
273+
if let Some(event) = self.pending_events.pop_front() {
274+
return Poll::Ready(event);
275+
}
276+
}
277+
278+
Poll::Pending
279+
}
280+
}

crates/p2p/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,3 +43,6 @@ pub mod p2p_context;
4343

4444
/// QUIC connection upgrade behaviour.
4545
pub mod quic_upgrade;
46+
47+
/// Force direct connection behaviour.
48+
pub mod force_direct;

crates/p2p/src/utils.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,3 +194,8 @@ pub fn filter_direct_quic_addrs(addrs: impl Iterator<Item = Multiaddr>) -> Vec<M
194194
.filter(|a| is_quic_addr(a) && !is_relay_addr(a))
195195
.collect()
196196
}
197+
198+
/// Returns true if the multiaddr is a direct (non-relay) address.
199+
pub fn is_direct_addr(addr: &Multiaddr) -> bool {
200+
!is_relay_addr(addr)
201+
}

0 commit comments

Comments
 (0)