|
| 1 | +//! QUIC Upgrade Example |
| 2 | +//! |
| 3 | +//! This example demonstrates the QUIC upgrade behaviour that automatically |
| 4 | +//! upgrades TCP connections to QUIC when both peers support it. |
| 5 | +//! |
| 6 | +//! # Running the Example |
| 7 | +//! |
| 8 | +//! Run two instances in separate terminals: |
| 9 | +//! |
| 10 | +//! ```bash |
| 11 | +//! # Terminal 1: Start the first node (note the peer ID printed) |
| 12 | +//! cargo run -p pluto-p2p --example quic_upgrade -- --port 9000 |
| 13 | +//! # Output: Started node with peer ID: 16Uiu2HAmXXX... |
| 14 | +//! |
| 15 | +//! # Terminal 2: Start the second node with the first node's peer ID as known peer |
| 16 | +//! cargo run -p pluto-p2p --example quic_upgrade -- --port 9001 \ |
| 17 | +//! --dial /ip4/127.0.0.1/tcp/9000 \ |
| 18 | +//! --peer 16Uiu2HAmXXX... |
| 19 | +//! ``` |
| 20 | +//! |
| 21 | +//! # What Happens |
| 22 | +//! |
| 23 | +//! 1. Node 2 connects to Node 1 via TCP |
| 24 | +//! 2. Both nodes exchange identify information (including their QUIC addresses) |
| 25 | +//! 3. The peer addresses are stored in the peer store |
| 26 | +//! 4. The QUIC upgrade behaviour detects the TCP connection to a known peer |
| 27 | +//! 5. After ~1 minute, it attempts to dial the peer's QUIC address |
| 28 | +//! 6. On success: the redundant TCP connection is closed |
| 29 | +//! 7. On failure: exponential backoff is applied before retrying |
| 30 | +//! |
| 31 | +//! # Note |
| 32 | +//! |
| 33 | +//! The QUIC upgrade behaviour only upgrades connections to "known peers" |
| 34 | +//! (cluster members). In production, these are configured at startup. |
| 35 | +//! For this example, use `--peer` to specify the peer ID to upgrade. |
| 36 | +//! |
| 37 | +//! # Expected Output |
| 38 | +//! |
| 39 | +//! ```text |
| 40 | +//! [CONNECTED] 16Uiu2HAm... via TCP at /ip4/127.0.0.1/tcp/9000 |
| 41 | +//! [IDENTIFY] Received from 16Uiu2HAm... |
| 42 | +//! Agent: quic-upgrade-example/1.0.0 |
| 43 | +//! Addresses: |
| 44 | +//! - [TCP] /ip4/127.0.0.1/tcp/9000 |
| 45 | +//! - [QUIC] /ip4/127.0.0.1/udp/9000/quic-v1 |
| 46 | +//! ... (after ~1 minute) ... |
| 47 | +//! [CONNECTED] 16Uiu2HAm... via QUIC at /ip4/127.0.0.1/udp/9000/quic-v1 |
| 48 | +//! [QUIC UPGRADE] Successfully upgraded connection to 16Uiu2HAm...! |
| 49 | +//! [DISCONNECTED] 16Uiu2HAm... TCP connection closed (remaining: 1) |
| 50 | +//! ``` |
| 51 | +
|
| 52 | +use std::str::FromStr; |
| 53 | + |
| 54 | +use anyhow::Result; |
| 55 | +use clap::Parser; |
| 56 | +use k256::elliptic_curve::rand_core::OsRng; |
| 57 | +use libp2p::{Multiaddr, PeerId, futures::StreamExt, relay, swarm::SwarmEvent}; |
| 58 | +use pluto_p2p::{ |
| 59 | + behaviours::pluto::PlutoBehaviourEvent, |
| 60 | + config::P2PConfig, |
| 61 | + p2p::{Node, NodeType}, |
| 62 | + quic_upgrade::QuicUpgradeEvent, |
| 63 | +}; |
| 64 | +use tokio::signal; |
| 65 | + |
| 66 | +/// Command line arguments. |
| 67 | +#[derive(Debug, Parser)] |
| 68 | +#[command(name = "quic_upgrade")] |
| 69 | +#[command(about = "Demonstrates QUIC upgrade behaviour")] |
| 70 | +pub struct Args { |
| 71 | + /// The port to listen on (both TCP and UDP/QUIC). |
| 72 | + #[arg(short, long, default_value = "9000")] |
| 73 | + pub port: u16, |
| 74 | + |
| 75 | + /// Address to dial (e.g., /ip4/127.0.0.1/tcp/9000). |
| 76 | + #[arg(short, long)] |
| 77 | + pub dial: Option<Multiaddr>, |
| 78 | + |
| 79 | + /// Known peer ID(s) to attempt QUIC upgrade for. |
| 80 | + /// The upgrade behaviour only upgrades connections to known peers. |
| 81 | + #[arg(long)] |
| 82 | + pub peer: Vec<String>, |
| 83 | +} |
| 84 | + |
| 85 | +#[tokio::main] |
| 86 | +async fn main() -> Result<()> { |
| 87 | + let args = Args::parse(); |
| 88 | + let key = k256::SecretKey::random(&mut OsRng); |
| 89 | + |
| 90 | + // Create a config with the specified port |
| 91 | + // Note: P2PConfig requires a specific IP (not 0.0.0.0), so we use 127.0.0.1 for |
| 92 | + // local testing |
| 93 | + let config = P2PConfig { |
| 94 | + tcp_addrs: vec![format!("127.0.0.1:{}", args.port)], |
| 95 | + udp_addrs: vec![format!("127.0.0.1:{}", args.port)], |
| 96 | + ..Default::default() |
| 97 | + }; |
| 98 | + |
| 99 | + // Parse known peer IDs from command line |
| 100 | + let known_peers: Vec<PeerId> = args |
| 101 | + .peer |
| 102 | + .iter() |
| 103 | + .filter_map(|s| PeerId::from_str(s).ok()) |
| 104 | + .collect(); |
| 105 | + |
| 106 | + if !known_peers.is_empty() { |
| 107 | + println!("Known peers for QUIC upgrade: {:?}", known_peers); |
| 108 | + } |
| 109 | + |
| 110 | + let mut node = Node::new( |
| 111 | + config, |
| 112 | + key, |
| 113 | + NodeType::QUIC, // Enable QUIC transport |
| 114 | + false, // Don't filter private addresses (for local testing) |
| 115 | + known_peers, |
| 116 | + |builder, _keypair, relay_client| { |
| 117 | + builder |
| 118 | + .with_user_agent("quic-upgrade-example/1.0.0") |
| 119 | + .with_quic_enabled(true) // Enable QUIC upgrade behaviour |
| 120 | + .with_inner(relay_client) |
| 121 | + }, |
| 122 | + )?; |
| 123 | + |
| 124 | + println!("Started node with peer ID: {}", node.local_peer_id()); |
| 125 | + println!("Listening on TCP and QUIC port: {}", args.port); |
| 126 | + |
| 127 | + // Dial the remote peer if specified |
| 128 | + if let Some(dial_addr) = &args.dial { |
| 129 | + println!("Dialing remote peer via TCP: {dial_addr}"); |
| 130 | + node.dial(dial_addr.clone())?; |
| 131 | + } |
| 132 | + |
| 133 | + println!("\nWaiting for events... (Ctrl+C to quit)"); |
| 134 | + if args.peer.is_empty() { |
| 135 | + println!("Note: No --peer specified. QUIC upgrade only works for known peers."); |
| 136 | + println!(" Copy this node's peer ID and pass it to the other node with --peer\n"); |
| 137 | + } else { |
| 138 | + println!("QUIC upgrade will be attempted ~1 minute after TCP connection is established.\n"); |
| 139 | + } |
| 140 | + |
| 141 | + // Event loop |
| 142 | + loop { |
| 143 | + tokio::select! { |
| 144 | + event = node.select_next_some() => { |
| 145 | + handle_event(event); |
| 146 | + } |
| 147 | + _ = signal::ctrl_c() => { |
| 148 | + println!("\nReceived Ctrl+C, shutting down..."); |
| 149 | + break; |
| 150 | + } |
| 151 | + } |
| 152 | + } |
| 153 | + |
| 154 | + Ok(()) |
| 155 | +} |
| 156 | + |
| 157 | +fn handle_event(event: SwarmEvent<PlutoBehaviourEvent<relay::client::Behaviour>>) { |
| 158 | + match event { |
| 159 | + // New listen address |
| 160 | + SwarmEvent::NewListenAddr { address, .. } => { |
| 161 | + println!("[LISTEN] {address}"); |
| 162 | + } |
| 163 | + |
| 164 | + // Connection established |
| 165 | + SwarmEvent::ConnectionEstablished { |
| 166 | + peer_id, endpoint, .. |
| 167 | + } => { |
| 168 | + let addr = match &endpoint { |
| 169 | + libp2p::core::ConnectedPoint::Dialer { address, .. } => address, |
| 170 | + libp2p::core::ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr, |
| 171 | + }; |
| 172 | + let transport = if addr.to_string().contains("quic") { |
| 173 | + "QUIC" |
| 174 | + } else { |
| 175 | + "TCP" |
| 176 | + }; |
| 177 | + println!("[CONNECTED] {peer_id} via {transport} at {addr}"); |
| 178 | + } |
| 179 | + |
| 180 | + // Connection closed |
| 181 | + SwarmEvent::ConnectionClosed { |
| 182 | + peer_id, |
| 183 | + endpoint, |
| 184 | + num_established, |
| 185 | + .. |
| 186 | + } => { |
| 187 | + let addr = match &endpoint { |
| 188 | + libp2p::core::ConnectedPoint::Dialer { address, .. } => address, |
| 189 | + libp2p::core::ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr, |
| 190 | + }; |
| 191 | + let transport = if addr.to_string().contains("quic") { |
| 192 | + "QUIC" |
| 193 | + } else { |
| 194 | + "TCP" |
| 195 | + }; |
| 196 | + println!( |
| 197 | + "[DISCONNECTED] {peer_id} {transport} connection closed (remaining: {num_established})" |
| 198 | + ); |
| 199 | + } |
| 200 | + |
| 201 | + // QUIC upgrade events |
| 202 | + SwarmEvent::Behaviour(PlutoBehaviourEvent::QuicUpgrade(event)) => match event { |
| 203 | + QuicUpgradeEvent::Upgraded { peer } => { |
| 204 | + println!("[QUIC UPGRADE] Successfully upgraded connection to {peer}!"); |
| 205 | + } |
| 206 | + QuicUpgradeEvent::UpgradeFailed { peer, reason } => { |
| 207 | + println!("[QUIC UPGRADE FAILED] {peer}: {reason}"); |
| 208 | + } |
| 209 | + }, |
| 210 | + |
| 211 | + // Identify received - shows peer's addresses including QUIC |
| 212 | + SwarmEvent::Behaviour(PlutoBehaviourEvent::Identify( |
| 213 | + libp2p::identify::Event::Received { peer_id, info, .. }, |
| 214 | + )) => { |
| 215 | + println!("[IDENTIFY] Received from {peer_id}"); |
| 216 | + println!(" Agent: {}", info.agent_version); |
| 217 | + println!(" Addresses:"); |
| 218 | + for addr in &info.listen_addrs { |
| 219 | + let transport = if addr.to_string().contains("quic") { |
| 220 | + "QUIC" |
| 221 | + } else if addr.to_string().contains("tcp") { |
| 222 | + "TCP" |
| 223 | + } else { |
| 224 | + "other" |
| 225 | + }; |
| 226 | + println!(" - [{transport}] {addr}"); |
| 227 | + } |
| 228 | + } |
| 229 | + |
| 230 | + // Ping events |
| 231 | + SwarmEvent::Behaviour(PlutoBehaviourEvent::Ping(event)) => { |
| 232 | + if let Ok(rtt) = event.result { |
| 233 | + println!("[PING] {} RTT: {:?}", event.peer, rtt); |
| 234 | + } |
| 235 | + } |
| 236 | + |
| 237 | + // Connection errors |
| 238 | + SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => { |
| 239 | + println!("[ERROR] Outgoing connection to {peer_id:?}: {error}"); |
| 240 | + } |
| 241 | + SwarmEvent::IncomingConnectionError { error, .. } => { |
| 242 | + println!("[ERROR] Incoming connection: {error}"); |
| 243 | + } |
| 244 | + |
| 245 | + // Ignore other events |
| 246 | + _ => {} |
| 247 | + } |
| 248 | +} |
0 commit comments