Skip to content

Commit 85b9aa9

Browse files
authored
feat(p2p): add upgrade to quic (#259)
* feat: add quic upgrade * feat: add comment about peer addresses * fix: linter * fix: clean up tests * fix: linter * fix: review comments * fix: review comments
1 parent e0838ac commit 85b9aa9

8 files changed

Lines changed: 794 additions & 11 deletions

File tree

Lines changed: 248 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,248 @@
1+
//! QUIC Upgrade Example
2+
//!
3+
//! This example demonstrates the QUIC upgrade behaviour that automatically
4+
//! upgrades TCP connections to QUIC when both peers support it.
5+
//!
6+
//! # Running the Example
7+
//!
8+
//! Run two instances in separate terminals:
9+
//!
10+
//! ```bash
11+
//! # Terminal 1: Start the first node (note the peer ID printed)
12+
//! cargo run -p pluto-p2p --example quic_upgrade -- --port 9000
13+
//! # Output: Started node with peer ID: 16Uiu2HAmXXX...
14+
//!
15+
//! # Terminal 2: Start the second node with the first node's peer ID as known peer
16+
//! cargo run -p pluto-p2p --example quic_upgrade -- --port 9001 \
17+
//! --dial /ip4/127.0.0.1/tcp/9000 \
18+
//! --peer 16Uiu2HAmXXX...
19+
//! ```
20+
//!
21+
//! # What Happens
22+
//!
23+
//! 1. Node 2 connects to Node 1 via TCP
24+
//! 2. Both nodes exchange identify information (including their QUIC addresses)
25+
//! 3. The peer addresses are stored in the peer store
26+
//! 4. The QUIC upgrade behaviour detects the TCP connection to a known peer
27+
//! 5. After ~1 minute, it attempts to dial the peer's QUIC address
28+
//! 6. On success: the redundant TCP connection is closed
29+
//! 7. On failure: exponential backoff is applied before retrying
30+
//!
31+
//! # Note
32+
//!
33+
//! The QUIC upgrade behaviour only upgrades connections to "known peers"
34+
//! (cluster members). In production, these are configured at startup.
35+
//! For this example, use `--peer` to specify the peer ID to upgrade.
36+
//!
37+
//! # Expected Output
38+
//!
39+
//! ```text
40+
//! [CONNECTED] 16Uiu2HAm... via TCP at /ip4/127.0.0.1/tcp/9000
41+
//! [IDENTIFY] Received from 16Uiu2HAm...
42+
//! Agent: quic-upgrade-example/1.0.0
43+
//! Addresses:
44+
//! - [TCP] /ip4/127.0.0.1/tcp/9000
45+
//! - [QUIC] /ip4/127.0.0.1/udp/9000/quic-v1
46+
//! ... (after ~1 minute) ...
47+
//! [CONNECTED] 16Uiu2HAm... via QUIC at /ip4/127.0.0.1/udp/9000/quic-v1
48+
//! [QUIC UPGRADE] Successfully upgraded connection to 16Uiu2HAm...!
49+
//! [DISCONNECTED] 16Uiu2HAm... TCP connection closed (remaining: 1)
50+
//! ```
51+
52+
use std::str::FromStr;
53+
54+
use anyhow::Result;
55+
use clap::Parser;
56+
use k256::elliptic_curve::rand_core::OsRng;
57+
use libp2p::{Multiaddr, PeerId, futures::StreamExt, relay, swarm::SwarmEvent};
58+
use pluto_p2p::{
59+
behaviours::pluto::PlutoBehaviourEvent,
60+
config::P2PConfig,
61+
p2p::{Node, NodeType},
62+
quic_upgrade::QuicUpgradeEvent,
63+
};
64+
use tokio::signal;
65+
66+
/// Command line arguments.
67+
#[derive(Debug, Parser)]
68+
#[command(name = "quic_upgrade")]
69+
#[command(about = "Demonstrates QUIC upgrade behaviour")]
70+
pub struct Args {
71+
/// The port to listen on (both TCP and UDP/QUIC).
72+
#[arg(short, long, default_value = "9000")]
73+
pub port: u16,
74+
75+
/// Address to dial (e.g., /ip4/127.0.0.1/tcp/9000).
76+
#[arg(short, long)]
77+
pub dial: Option<Multiaddr>,
78+
79+
/// Known peer ID(s) to attempt QUIC upgrade for.
80+
/// The upgrade behaviour only upgrades connections to known peers.
81+
#[arg(long)]
82+
pub peer: Vec<String>,
83+
}
84+
85+
#[tokio::main]
86+
async fn main() -> Result<()> {
87+
let args = Args::parse();
88+
let key = k256::SecretKey::random(&mut OsRng);
89+
90+
// Create a config with the specified port
91+
// Note: P2PConfig requires a specific IP (not 0.0.0.0), so we use 127.0.0.1 for
92+
// local testing
93+
let config = P2PConfig {
94+
tcp_addrs: vec![format!("127.0.0.1:{}", args.port)],
95+
udp_addrs: vec![format!("127.0.0.1:{}", args.port)],
96+
..Default::default()
97+
};
98+
99+
// Parse known peer IDs from command line
100+
let known_peers: Vec<PeerId> = args
101+
.peer
102+
.iter()
103+
.filter_map(|s| PeerId::from_str(s).ok())
104+
.collect();
105+
106+
if !known_peers.is_empty() {
107+
println!("Known peers for QUIC upgrade: {:?}", known_peers);
108+
}
109+
110+
let mut node = Node::new(
111+
config,
112+
key,
113+
NodeType::QUIC, // Enable QUIC transport
114+
false, // Don't filter private addresses (for local testing)
115+
known_peers,
116+
|builder, _keypair, relay_client| {
117+
builder
118+
.with_user_agent("quic-upgrade-example/1.0.0")
119+
.with_quic_enabled(true) // Enable QUIC upgrade behaviour
120+
.with_inner(relay_client)
121+
},
122+
)?;
123+
124+
println!("Started node with peer ID: {}", node.local_peer_id());
125+
println!("Listening on TCP and QUIC port: {}", args.port);
126+
127+
// Dial the remote peer if specified
128+
if let Some(dial_addr) = &args.dial {
129+
println!("Dialing remote peer via TCP: {dial_addr}");
130+
node.dial(dial_addr.clone())?;
131+
}
132+
133+
println!("\nWaiting for events... (Ctrl+C to quit)");
134+
if args.peer.is_empty() {
135+
println!("Note: No --peer specified. QUIC upgrade only works for known peers.");
136+
println!(" Copy this node's peer ID and pass it to the other node with --peer\n");
137+
} else {
138+
println!("QUIC upgrade will be attempted ~1 minute after TCP connection is established.\n");
139+
}
140+
141+
// Event loop
142+
loop {
143+
tokio::select! {
144+
event = node.select_next_some() => {
145+
handle_event(event);
146+
}
147+
_ = signal::ctrl_c() => {
148+
println!("\nReceived Ctrl+C, shutting down...");
149+
break;
150+
}
151+
}
152+
}
153+
154+
Ok(())
155+
}
156+
157+
fn handle_event(event: SwarmEvent<PlutoBehaviourEvent<relay::client::Behaviour>>) {
158+
match event {
159+
// New listen address
160+
SwarmEvent::NewListenAddr { address, .. } => {
161+
println!("[LISTEN] {address}");
162+
}
163+
164+
// Connection established
165+
SwarmEvent::ConnectionEstablished {
166+
peer_id, endpoint, ..
167+
} => {
168+
let addr = match &endpoint {
169+
libp2p::core::ConnectedPoint::Dialer { address, .. } => address,
170+
libp2p::core::ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr,
171+
};
172+
let transport = if addr.to_string().contains("quic") {
173+
"QUIC"
174+
} else {
175+
"TCP"
176+
};
177+
println!("[CONNECTED] {peer_id} via {transport} at {addr}");
178+
}
179+
180+
// Connection closed
181+
SwarmEvent::ConnectionClosed {
182+
peer_id,
183+
endpoint,
184+
num_established,
185+
..
186+
} => {
187+
let addr = match &endpoint {
188+
libp2p::core::ConnectedPoint::Dialer { address, .. } => address,
189+
libp2p::core::ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr,
190+
};
191+
let transport = if addr.to_string().contains("quic") {
192+
"QUIC"
193+
} else {
194+
"TCP"
195+
};
196+
println!(
197+
"[DISCONNECTED] {peer_id} {transport} connection closed (remaining: {num_established})"
198+
);
199+
}
200+
201+
// QUIC upgrade events
202+
SwarmEvent::Behaviour(PlutoBehaviourEvent::QuicUpgrade(event)) => match event {
203+
QuicUpgradeEvent::Upgraded { peer } => {
204+
println!("[QUIC UPGRADE] Successfully upgraded connection to {peer}!");
205+
}
206+
QuicUpgradeEvent::UpgradeFailed { peer, reason } => {
207+
println!("[QUIC UPGRADE FAILED] {peer}: {reason}");
208+
}
209+
},
210+
211+
// Identify received - shows peer's addresses including QUIC
212+
SwarmEvent::Behaviour(PlutoBehaviourEvent::Identify(
213+
libp2p::identify::Event::Received { peer_id, info, .. },
214+
)) => {
215+
println!("[IDENTIFY] Received from {peer_id}");
216+
println!(" Agent: {}", info.agent_version);
217+
println!(" Addresses:");
218+
for addr in &info.listen_addrs {
219+
let transport = if addr.to_string().contains("quic") {
220+
"QUIC"
221+
} else if addr.to_string().contains("tcp") {
222+
"TCP"
223+
} else {
224+
"other"
225+
};
226+
println!(" - [{transport}] {addr}");
227+
}
228+
}
229+
230+
// Ping events
231+
SwarmEvent::Behaviour(PlutoBehaviourEvent::Ping(event)) => {
232+
if let Ok(rtt) = event.result {
233+
println!("[PING] {} RTT: {:?}", event.peer, rtt);
234+
}
235+
}
236+
237+
// Connection errors
238+
SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => {
239+
println!("[ERROR] Outgoing connection to {peer_id:?}: {error}");
240+
}
241+
SwarmEvent::IncomingConnectionError { error, .. } => {
242+
println!("[ERROR] Incoming connection: {error}");
243+
}
244+
245+
// Ignore other events
246+
_ => {}
247+
}
248+
}

crates/p2p/src/behaviours/pluto.rs

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use crate::{
1212
conn_logger::{ConnectionLoggerBehaviour, DefaultConnectionLoggerMetrics},
1313
gater::ConnGater,
1414
p2p_context::P2PContext,
15+
quic_upgrade::QuicUpgradeBehaviour,
1516
};
1617

1718
pub use super::optional::OptionalBehaviour;
@@ -39,6 +40,7 @@ pub const DEFAULT_IDENTIFY_CACHE_SIZE: usize = 100;
3940
/// - **Identify**: Exchanges peer information and supported protocols
4041
/// - **Ping**: Measures latency and keeps connections alive
4142
/// - **AutoNAT**: Detects NAT status and public reachability
43+
/// - **QUIC upgrade**: Periodically upgrades TCP connections to QUIC
4244
#[derive(NetworkBehaviour)]
4345
pub struct PlutoBehaviour<B: NetworkBehaviour> {
4446
/// Connection logger behaviour - MUST be first so peer store is updated
@@ -52,6 +54,8 @@ pub struct PlutoBehaviour<B: NetworkBehaviour> {
5254
pub ping: ping::Behaviour,
5355
/// AutoNAT behaviour for NAT detection.
5456
pub autonat: autonat::Behaviour,
57+
/// QUIC upgrade behaviour for upgrading TCP to QUIC connections.
58+
pub quic_upgrade: QuicUpgradeBehaviour,
5559
/// Inner behaviour.
5660
pub inner: OptionalBehaviour<B>,
5761
}
@@ -70,6 +74,7 @@ impl<B: NetworkBehaviour> PlutoBehaviour<B> {
7074
/// - **Identify**: Protocol and agent identification
7175
/// - **Ping**: Latency measurement and keepalive
7276
/// - **AutoNAT**: NAT traversal detection
77+
/// - **QUIC upgrade**: Periodic TCP to QUIC connection upgrades
7378
#[derive(Debug, Clone)]
7479
pub struct PlutoBehaviourBuilder<B> {
7580
// Gater config
@@ -84,6 +89,9 @@ pub struct PlutoBehaviourBuilder<B> {
8489

8590
p2p_context: P2PContext,
8691

92+
// QUIC upgrade config
93+
quic_enabled: bool,
94+
8795
// Inner behaviour
8896
inner: Option<B>,
8997
}
@@ -96,6 +104,7 @@ impl<B> Default for PlutoBehaviourBuilder<B> {
96104
user_agent: DEFAULT_USER_AGENT.clone(),
97105
autonat_config: autonat::Config::default(),
98106
p2p_context: P2PContext::default(),
107+
quic_enabled: false,
99108
inner: None,
100109
}
101110
}
@@ -166,27 +175,43 @@ impl<B: NetworkBehaviour> PlutoBehaviourBuilder<B> {
166175
self
167176
}
168177

178+
/// Sets whether QUIC is enabled.
179+
///
180+
/// When enabled, the behaviour will periodically attempt to upgrade
181+
/// TCP connections to QUIC connections.
182+
pub fn with_quic_enabled(mut self, enabled: bool) -> Self {
183+
self.quic_enabled = enabled;
184+
self
185+
}
186+
169187
/// Builds the [`PlutoBehaviour`] with the provided keypair.
170188
///
171189
/// # Arguments
172190
///
173191
/// * `key` - The keypair for this node, used for identify and autonat
174192
pub fn build(self, key: &Keypair) -> PlutoBehaviour<B> {
193+
let local_peer_id = key.public().to_peer_id();
194+
175195
let identify_config = identify::Config::new(self.identify_protocol, key.public())
176196
.with_agent_version(self.user_agent)
177197
.with_interval(DEFAULT_IDENTIFY_INTERVAL)
178198
.with_cache_size(DEFAULT_IDENTIFY_CACHE_SIZE);
179199

180200
PlutoBehaviour {
181-
conn_logger: ConnectionLoggerBehaviour::new(self.p2p_context),
201+
conn_logger: ConnectionLoggerBehaviour::new(self.p2p_context.clone()),
182202
gater: self.gater.unwrap_or_else(ConnGater::new_open_gater),
183203
identify: identify::Behaviour::new(identify_config),
184204
ping: ping::Behaviour::new(
185205
ping::Config::new()
186206
.with_interval(DEFAULT_PING_INTERVAL)
187207
.with_timeout(DEFAULT_PING_TIMEOUT),
188208
),
189-
autonat: autonat::Behaviour::new(key.public().to_peer_id(), self.autonat_config),
209+
autonat: autonat::Behaviour::new(local_peer_id, self.autonat_config),
210+
quic_upgrade: QuicUpgradeBehaviour::new(
211+
self.p2p_context,
212+
local_peer_id,
213+
self.quic_enabled,
214+
),
190215
inner: self.inner.into(),
191216
}
192217
}

crates/p2p/src/conn_logger.rs

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -219,11 +219,19 @@ impl<M: ConnectionLoggerMetrics + 'static> NetworkBehaviour for ConnectionLogger
219219
other_established = event.other_established,
220220
"connection established"
221221
);
222+
// Extract remote address from the endpoint
223+
let remote_addr = match &event.endpoint {
224+
libp2p::core::ConnectedPoint::Dialer { address, .. } => address.clone(),
225+
libp2p::core::ConnectedPoint::Listener { send_back_addr, .. } => {
226+
send_back_addr.clone()
227+
}
228+
};
222229
// Update peer store - this is done here so all other behaviours
223230
// see the updated peer store immediately
224231
self.p2p_context.peer_store_write_lock().add_peer(Peer {
225232
id: event.peer_id,
226233
connection_id: event.connection_id,
234+
remote_addr,
227235
});
228236
}
229237
libp2p::swarm::FromSwarm::ConnectionClosed(event) => {
@@ -233,17 +241,21 @@ impl<M: ConnectionLoggerMetrics + 'static> NetworkBehaviour for ConnectionLogger
233241
num_established = event.remaining_established,
234242
"connection closed"
235243
);
244+
// Extract remote address from the endpoint
245+
let addr = match &event.endpoint {
246+
libp2p::core::ConnectedPoint::Dialer { address, .. } => address.clone(),
247+
libp2p::core::ConnectedPoint::Listener { send_back_addr, .. } => {
248+
send_back_addr.clone()
249+
}
250+
};
236251
// Update peer store
237252
self.p2p_context.peer_store_write_lock().remove_peer(Peer {
238253
id: event.peer_id,
239254
connection_id: event.connection_id,
255+
remote_addr: addr.clone(),
240256
});
241257
// Decrement the connection count based on the endpoint address
242-
let addr = match &event.endpoint {
243-
libp2p::core::ConnectedPoint::Dialer { address, .. } => address,
244-
libp2p::core::ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr,
245-
};
246-
self.decrement_connection(event.peer_id, addr);
258+
self.decrement_connection(event.peer_id, &addr);
247259
}
248260
_ => {}
249261
}

crates/p2p/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,3 +40,6 @@ pub mod conn_logger;
4040

4141
/// Global context.
4242
pub mod p2p_context;
43+
44+
/// QUIC connection upgrade behaviour.
45+
pub mod quic_upgrade;

0 commit comments

Comments
 (0)