diff --git a/Cargo.lock b/Cargo.lock index e4b4700e..bde35f64 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5764,6 +5764,7 @@ dependencies = [ "hex", "k256", "libp2p", + "pin-project", "pluto-cluster", "pluto-core", "pluto-eth2util", diff --git a/Cargo.toml b/Cargo.toml index f70094df..9ab3e1f5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -66,6 +66,7 @@ aes = "0.8.4" ctr = "0.9.2" cipher = "0.4.4" pbkdf2 = "0.12.2" +pin-project = "1" sha2 = "0.10.9" scrypt = "0.11.0" unicode-normalization = "0.1.25" diff --git a/crates/cli/src/commands/relay.rs b/crates/cli/src/commands/relay.rs index ab86bc54..1c92663a 100644 --- a/crates/cli/src/commands/relay.rs +++ b/crates/cli/src/commands/relay.rs @@ -415,17 +415,29 @@ mod tests { } #[tokio::test] - #[ignore = "/metrics endpoint not implemented"] async fn serve_addr_metrics() { + let monitoring_addr = net::TcpListener::bind("127.0.0.1:0") + .await + .unwrap() + .local_addr() + .unwrap() + .to_string(); + let monitoring_url = format!("http://{monitoring_addr}/metrics"); + with_relay_server( - |_| {}, - async |cfg| { - let response = relay_server_get(cfg, "/metrics").await.unwrap(); + move |args| { + args.debug_monitoring.monitor_addr = Some(monitoring_addr); + }, + async move |_cfg| { + let response = retry_get(&monitoring_url).await.unwrap(); let body = response.text().await.unwrap(); - dbg!(&body); - - assert!(body.contains("libp2p_relaysvc_")); + assert!(body.contains("relay_p2p_connection_total")); + assert!(body.contains("relay_p2p_active_connections")); + assert!(body.contains("relay_p2p_ping_latency")); + assert!(body.contains("relay_p2p_network_sent_bytes")); + assert!(body.contains("relay_p2p_network_receive_bytes")); + assert!(body.ends_with("# EOF\n")); }, ) .await @@ -523,12 +535,11 @@ mod tests { path: &str, ) -> Result { let http_address = cfg.http_addr.unwrap(); - let request = async || { - reqwest::get(format!("http://{}{}", http_address, path)) - .await - .and_then(|r| r.error_for_status()) - }; + retry_get(&format!("http://{}{}", http_address, path)).await + } + async fn retry_get(url: &str) -> Result { + let request = async || reqwest::get(url).await.and_then(|r| r.error_for_status()); let mut backoff = backon::ExponentialBuilder::default() .with_min_delay(time::Duration::from_millis(200)) .with_max_delay(time::Duration::from_secs(2)) diff --git a/crates/dkg/src/bcast/behaviour.rs b/crates/dkg/src/bcast/behaviour.rs index de441def..45a53ed1 100644 --- a/crates/dkg/src/bcast/behaviour.rs +++ b/crates/dkg/src/bcast/behaviour.rs @@ -701,6 +701,7 @@ mod tests { NodeType::TCP, false, p2p_context.clone(), + None, move |builder, _keypair| builder.with_inner(behaviour), )?; let addr: Multiaddr = format!("/ip4/127.0.0.1/tcp/{}", ports[index]).parse()?; diff --git a/crates/dkg/src/exchanger.rs b/crates/dkg/src/exchanger.rs index a48f84d1..b01c477a 100644 --- a/crates/dkg/src/exchanger.rs +++ b/crates/dkg/src/exchanger.rs @@ -434,6 +434,7 @@ mod tests { NodeType::TCP, false, p2p_context, + None, move |builder, _key| builder.with_inner(behaviour), )?; diff --git a/crates/dkg/src/nodesigs.rs b/crates/dkg/src/nodesigs.rs index 322a57b1..641f384f 100644 --- a/crates/dkg/src/nodesigs.rs +++ b/crates/dkg/src/nodesigs.rs @@ -570,6 +570,7 @@ mod tests { NodeType::TCP, false, p2p_context, + None, move |builder, _| builder.with_inner(behaviour), )?; diff --git a/crates/p2p/Cargo.toml b/crates/p2p/Cargo.toml index e36cba00..040d77cd 100644 --- a/crates/p2p/Cargo.toml +++ b/crates/p2p/Cargo.toml @@ -28,6 +28,7 @@ serde.workspace = true serde_json.workspace = true pluto-core.workspace = true backon.workspace = true +pin-project.workspace = true reqwest.workspace = true unsigned-varint.workspace = true url.workspace = true diff --git a/crates/p2p/src/bandwidth.rs b/crates/p2p/src/bandwidth.rs new file mode 100644 index 00000000..a79e069f --- /dev/null +++ b/crates/p2p/src/bandwidth.rs @@ -0,0 +1,363 @@ +use std::{ + convert::TryFrom as _, + io, + pin::Pin, + sync::Arc, + task::{Context, Poll}, +}; + +use futures::{ + future::{MapOk, TryFutureExt as _}, + io::{IoSlice, IoSliceMut}, + prelude::*, + ready, +}; +use libp2p::{ + Multiaddr, PeerId, + core::{ + muxing::{StreamMuxer, StreamMuxerEvent}, + transport::{DialOpts, ListenerId, TransportError, TransportEvent}, + }, +}; +use vise::Counter; + +/// Per-peer bandwidth counters injected into [`PeerBandwidthTransport`]. +pub struct PeerConnectionMetrics { + /// Bytes sent to the peer. + pub sent: Counter, + /// Bytes received from the peer. + pub received: Counter, +} + +/// Factory that creates [`PeerConnectionMetrics`] for a given peer. +pub type BandwidthFactory = Arc PeerConnectionMetrics + Send + Sync>; + +/// Per-peer bandwidth tracking transport wrapper. +/// +/// Calls the supplied [`BandwidthFactory`] for every established connection and +/// records bytes through the returned [`PeerConnectionMetrics`] counters. +#[pin_project::pin_project] +pub(crate) struct PeerBandwidthTransport { + #[pin] + inner: T, + factory: BandwidthFactory, +} + +impl PeerBandwidthTransport { + pub(crate) fn new(inner: T, factory: BandwidthFactory) -> Self { + PeerBandwidthTransport { inner, factory } + } +} + +impl libp2p::core::Transport for PeerBandwidthTransport +where + T: libp2p::core::Transport, + M: StreamMuxer + Send + 'static, + M::Substream: Send + 'static, + M::Error: Send + Sync + 'static, +{ + type Dial = MapOk (PeerId, PeerMuxer) + Send>>; + type Error = T::Error; + type ListenerUpgrade = + MapOk (PeerId, PeerMuxer) + Send>>; + type Output = (PeerId, PeerMuxer); + + fn listen_on( + &mut self, + id: ListenerId, + addr: Multiaddr, + ) -> Result<(), TransportError> { + self.inner.listen_on(id, addr) + } + + fn remove_listener(&mut self, id: ListenerId) -> bool { + self.inner.remove_listener(id) + } + + fn dial( + &mut self, + addr: Multiaddr, + dial_opts: DialOpts, + ) -> Result> { + let factory = Arc::clone(&self.factory); + Ok(self + .inner + .dial(addr, dial_opts)? + .map_ok(Box::new(move |(peer_id, muxer)| { + let metrics = factory(&peer_id); + (peer_id, PeerMuxer::new(muxer, metrics)) + }))) + } + + fn poll( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let this = self.project(); + let factory = Arc::clone(this.factory); + match this.inner.poll(cx) { + Poll::Ready(TransportEvent::Incoming { + listener_id, + upgrade, + local_addr, + send_back_addr, + }) => Poll::Ready(TransportEvent::Incoming { + listener_id, + upgrade: upgrade.map_ok(Box::new(move |(peer_id, muxer)| { + let metrics = factory(&peer_id); + (peer_id, PeerMuxer::new(muxer, metrics)) + })), + local_addr, + send_back_addr, + }), + Poll::Ready(other) => { + Poll::Ready(other.map_upgrade(|_| unreachable!("case already matched"))) + } + Poll::Pending => Poll::Pending, + } + } +} + +#[pin_project::pin_project] +pub(crate) struct PeerMuxer { + #[pin] + inner: M, + metrics: PeerConnectionMetrics, +} + +impl PeerMuxer { + fn new(inner: M, metrics: PeerConnectionMetrics) -> Self { + Self { inner, metrics } + } +} + +impl StreamMuxer for PeerMuxer { + type Error = M::Error; + type Substream = PeerInstrumentedStream; + + fn poll( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + self.project().inner.poll(cx) + } + + fn poll_inbound( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let this = self.project(); + let inner = ready!(this.inner.poll_inbound(cx)?); + Poll::Ready(Ok(PeerInstrumentedStream { + inner, + sent: this.metrics.sent.clone(), + received: this.metrics.received.clone(), + })) + } + + fn poll_outbound( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let this = self.project(); + let inner = ready!(this.inner.poll_outbound(cx)?); + Poll::Ready(Ok(PeerInstrumentedStream { + inner, + sent: this.metrics.sent.clone(), + received: this.metrics.received.clone(), + })) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().inner.poll_close(cx) + } +} + +#[pin_project::pin_project] +pub(crate) struct PeerInstrumentedStream { + #[pin] + inner: S, + sent: Counter, + received: Counter, +} + +impl AsyncRead for PeerInstrumentedStream { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + let this = self.project(); + let num_bytes = ready!(this.inner.poll_read(cx, buf))?; + this.received + .inc_by(u64::try_from(num_bytes).unwrap_or(u64::MAX)); + Poll::Ready(Ok(num_bytes)) + } + + fn poll_read_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &mut [IoSliceMut<'_>], + ) -> Poll> { + let this = self.project(); + let num_bytes = ready!(this.inner.poll_read_vectored(cx, bufs))?; + this.received + .inc_by(u64::try_from(num_bytes).unwrap_or(u64::MAX)); + Poll::Ready(Ok(num_bytes)) + } +} + +impl AsyncWrite for PeerInstrumentedStream { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + let this = self.project(); + let num_bytes = ready!(this.inner.poll_write(cx, buf))?; + this.sent + .inc_by(u64::try_from(num_bytes).unwrap_or(u64::MAX)); + Poll::Ready(Ok(num_bytes)) + } + + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[IoSlice<'_>], + ) -> Poll> { + let this = self.project(); + let num_bytes = ready!(this.inner.poll_write_vectored(cx, bufs))?; + this.sent + .inc_by(u64::try_from(num_bytes).unwrap_or(u64::MAX)); + Poll::Ready(Ok(num_bytes)) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().inner.poll_flush(cx) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().inner.poll_close(cx) + } +} + +#[cfg(test)] +#[allow(clippy::arithmetic_side_effects)] +mod tests { + use super::*; + + struct MockStream { + read_data: Vec, + write_buffer: Vec, + read_pos: usize, + } + + impl MockStream { + fn new(read_data: Vec) -> Self { + Self { + read_data, + write_buffer: Vec::new(), + read_pos: 0, + } + } + } + + impl AsyncRead for MockStream { + fn poll_read( + mut self: Pin<&mut Self>, + _cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + let remaining = self.read_data.len() - self.read_pos; + let to_read = std::cmp::min(buf.len(), remaining); + if to_read > 0 { + buf[..to_read] + .copy_from_slice(&self.read_data[self.read_pos..self.read_pos + to_read]); + self.read_pos += to_read; + } + Poll::Ready(Ok(to_read)) + } + } + + impl AsyncWrite for MockStream { + fn poll_write( + mut self: Pin<&mut Self>, + _cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + self.write_buffer.extend_from_slice(buf); + Poll::Ready(Ok(buf.len())) + } + + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + } + + fn make_stream() -> (PeerInstrumentedStream, Counter, Counter) { + let sent = Counter::default(); + let received = Counter::default(); + let stream = PeerInstrumentedStream { + inner: MockStream::new(vec![1, 2, 3, 4, 5]), + sent: sent.clone(), + received: received.clone(), + }; + (stream, sent, received) + } + + #[test] + fn bandwidth_received() { + let (mut stream, _, received) = make_stream(); + let initial = received.get(); + + let mut buf = [0u8; 3]; + let mut cx = Context::from_waker(futures::task::noop_waker_ref()); + let _ = Pin::new(&mut stream).poll_read(&mut cx, &mut buf); + + assert_eq!(received.get(), initial + 3); + } + + #[test] + fn bandwidth_sent() { + let (mut stream, sent, _) = make_stream(); + let initial = sent.get(); + + let data = b"hello"; + let mut cx = Context::from_waker(futures::task::noop_waker_ref()); + let _ = Pin::new(&mut stream).poll_write(&mut cx, data); + + assert_eq!(sent.get(), initial + 5); + } + + #[test] + fn bandwidth_multiple_operations() { + let (mut stream, sent, received) = make_stream(); + let mut stream2 = PeerInstrumentedStream { + inner: MockStream::new(vec![1, 2, 3, 4, 5, 6, 7, 8]), + sent: sent.clone(), + received: received.clone(), + }; + + let initial_recv = received.get(); + let initial_sent = sent.get(); + + let mut cx = Context::from_waker(futures::task::noop_waker_ref()); + + let mut buf = [0u8; 3]; + let _ = Pin::new(&mut stream2).poll_read(&mut cx, &mut buf); + + let _ = Pin::new(&mut stream).poll_write(&mut cx, b"hello"); + + let mut buf2 = [0u8; 2]; + let _ = Pin::new(&mut stream2).poll_read(&mut cx, &mut buf2); + + let _ = Pin::new(&mut stream).poll_write(&mut cx, b"test"); + + assert_eq!(received.get(), initial_recv + 5); + assert_eq!(sent.get(), initial_sent + 9); + } +} diff --git a/crates/p2p/src/lib.rs b/crates/p2p/src/lib.rs index 91d7ff40..c5ede616 100644 --- a/crates/p2p/src/lib.rs +++ b/crates/p2p/src/lib.rs @@ -5,6 +5,10 @@ //! and communication mechanisms for validator nodes to coordinate and exchange //! information. +/// Per-peer bandwidth tracking transport wrapper. +pub(crate) mod bandwidth; +pub use bandwidth::{BandwidthFactory, PeerConnectionMetrics}; + /// Bootnode and relay resolution. pub mod bootnode; diff --git a/crates/p2p/src/p2p.rs b/crates/p2p/src/p2p.rs index 32e96024..f51ccfa5 100644 --- a/crates/p2p/src/p2p.rs +++ b/crates/p2p/src/p2p.rs @@ -121,10 +121,6 @@ fn yamux_config() -> yamux::Config { /// P2P error. #[derive(Debug, thiserror::Error)] pub enum P2PError { - /// Failed to build the swarm. - #[error("Failed to build the swarm: {0}")] - FailedToBuildSwarm(Box), - /// Failed to convert the secret key to a libp2p keypair. #[error("Failed to convert the secret key to a libp2p keypair: {0}")] FailedToConvertSecretKeyToLibp2pKeypair(#[from] k256::pkcs8::der::Error), @@ -157,12 +153,58 @@ pub enum P2PError { /// Local peer ID already bound in the shared P2P context. actual: Box, }, + + /// Failed to configure Noise encryption. + #[error("Failed to configure Noise encryption: {0}")] + FailedToConfigureNoise(Box), + + /// Failed to configure DNS transport. + #[error("Failed to configure DNS transport: {0}")] + FailedToConfigureDns(Box), + + /// Failed to configure TCP transport (includes Noise and Yamux). + #[error("Failed to configure TCP transport: {0}")] + FailedToConfigureTcp(Box), + + /// Failed to configure relay client. + #[error("Failed to configure relay client: {0}")] + FailedToConfigureRelayClient(Box), + + /// Failed to build behaviour. + #[error("Failed to build behaviour: {0}")] + FailedToBuildBehaviour(Box), } impl P2PError { - /// Failed to build the swarm. - pub fn failed_to_build_swarm(error: impl std::error::Error + Send + Sync + 'static) -> Self { - Self::FailedToBuildSwarm(Box::new(error)) + /// Failed to configure Noise encryption. + pub fn failed_to_configure_noise( + error: impl std::error::Error + Send + Sync + 'static, + ) -> Self { + Self::FailedToConfigureNoise(Box::new(error)) + } + + /// Failed to configure DNS transport. + pub fn failed_to_configure_dns(error: impl std::error::Error + Send + Sync + 'static) -> Self { + Self::FailedToConfigureDns(Box::new(error)) + } + + /// Failed to configure TCP transport. + pub fn failed_to_configure_tcp(error: impl std::error::Error + Send + Sync + 'static) -> Self { + Self::FailedToConfigureTcp(Box::new(error)) + } + + /// Failed to configure relay client. + pub fn failed_to_configure_relay_client( + error: impl std::error::Error + Send + Sync + 'static, + ) -> Self { + Self::FailedToConfigureRelayClient(Box::new(error)) + } + + /// Failed to build behaviour. + pub fn failed_to_build_behaviour( + error: impl std::error::Error + Send + Sync + 'static, + ) -> Self { + Self::FailedToBuildBehaviour(Box::new(error)) } } @@ -259,12 +301,21 @@ impl Node { /// The `behaviour_fn` receives a `PlutoBehaviourBuilder` and keypair. It /// should configure the builder (e.g., set user agent, inner behaviour) /// and return it. + /// + /// Pass a [`crate::BandwidthFactory`] to track per-peer bytes + /// sent/received. The factory is called once per established connection + /// and should return the appropriate [`crate::PeerConnectionMetrics`] + /// counters. Pass `None` to skip bandwidth tracking. + /// + /// Note: upstream rust-libp2p does not yet expose per-peer callbacks + /// natively; see . pub fn new_server( cfg: P2PConfig, key: k256::SecretKey, node_type: NodeType, filter_private_addrs: bool, p2p_context: P2PContext, + bandwidth: Option, behaviour_fn: F, ) -> Result where @@ -274,8 +325,10 @@ impl Node { Self::bind_local_peer_id(&p2p_context, keypair.public().to_peer_id())?; let mut node = match node_type { - NodeType::TCP => Self::build_tcp_server(keypair, p2p_context, behaviour_fn), - NodeType::QUIC => Self::build_quic_server(keypair, p2p_context, behaviour_fn), + NodeType::TCP => Self::build_tcp_server(keypair, p2p_context, bandwidth, behaviour_fn), + NodeType::QUIC => { + Self::build_quic_server(keypair, p2p_context, bandwidth, behaviour_fn) + } }?; node.apply_config(&cfg, filter_private_addrs)?; @@ -357,18 +410,18 @@ impl Node { let swarm = SwarmBuilder::with_existing_identity(keypair) .with_tokio() .with_tcp(tcp::Config::default(), noise::Config::new, yamux_config) - .map_err(P2PError::failed_to_build_swarm)? + .map_err(P2PError::failed_to_configure_tcp)? .with_quic() .with_dns() - .map_err(P2PError::failed_to_build_swarm)? + .map_err(P2PError::failed_to_configure_dns)? .with_relay_client(noise::Config::new, yamux_config) - .map_err(P2PError::failed_to_build_swarm)? + .map_err(P2PError::failed_to_configure_relay_client)? .with_behaviour(|key, relay_client| { let builder = PlutoBehaviourBuilder::new(p2p_context.clone()).with_quic_enabled(true); behaviour_fn(builder, key, relay_client).build(key) }) - .map_err(P2PError::failed_to_build_swarm)? + .map_err(P2PError::failed_to_build_behaviour)? .with_swarm_config(utils::default_swarm_config) .build(); @@ -394,16 +447,16 @@ impl Node { let swarm = SwarmBuilder::with_existing_identity(keypair) .with_tokio() .with_tcp(tcp::Config::default(), noise::Config::new, yamux_config) - .map_err(P2PError::failed_to_build_swarm)? + .map_err(P2PError::failed_to_configure_tcp)? .with_dns() - .map_err(P2PError::failed_to_build_swarm)? + .map_err(P2PError::failed_to_configure_dns)? .with_relay_client(noise::Config::new, yamux_config) - .map_err(P2PError::failed_to_build_swarm)? + .map_err(P2PError::failed_to_configure_relay_client)? .with_behaviour(|key, relay_client| { let builder = PlutoBehaviourBuilder::new(p2p_context.clone()); behaviour_fn(builder, key, relay_client).build(key) }) - .map_err(P2PError::failed_to_build_swarm)? + .map_err(P2PError::failed_to_build_behaviour)? .with_swarm_config(utils::default_swarm_config) .build(); @@ -417,26 +470,14 @@ impl Node { fn build_quic_server( keypair: Keypair, p2p_context: P2PContext, + bandwidth: Option, behaviour_fn: F, ) -> Result where F: FnOnce(PlutoBehaviourBuilder, &Keypair) -> PlutoBehaviourBuilder, { - let swarm = SwarmBuilder::with_existing_identity(keypair) - .with_tokio() - .with_tcp(tcp::Config::default(), noise::Config::new, yamux_config) - .map_err(P2PError::failed_to_build_swarm)? - .with_quic() - .with_dns() - .map_err(P2PError::failed_to_build_swarm)? - .with_behaviour(|key| { - let builder = PlutoBehaviourBuilder::new(p2p_context.clone()); - behaviour_fn(builder, key).build(key) - }) - .map_err(P2PError::failed_to_build_swarm)? - .with_swarm_config(utils::default_swarm_config) - .build(); - + let swarm = + Self::build_server_swarm(keypair, p2p_context.clone(), bandwidth, behaviour_fn)?; Ok(Node { swarm, node_type: NodeType::QUIC, @@ -447,26 +488,14 @@ impl Node { fn build_tcp_server( keypair: Keypair, p2p_context: P2PContext, + bandwidth: Option, behaviour_fn: F, ) -> Result where F: FnOnce(PlutoBehaviourBuilder, &Keypair) -> PlutoBehaviourBuilder, { - let swarm = SwarmBuilder::with_existing_identity(keypair) - .with_tokio() - .with_tcp(tcp::Config::default(), noise::Config::new, yamux_config) - .map_err(P2PError::failed_to_build_swarm)? - .with_quic() - .with_dns() - .map_err(P2PError::failed_to_build_swarm)? - .with_behaviour(|key| { - let builder = PlutoBehaviourBuilder::new(p2p_context.clone()); - behaviour_fn(builder, key).build(key) - }) - .map_err(P2PError::failed_to_build_swarm)? - .with_swarm_config(utils::default_swarm_config) - .build(); - + let swarm = + Self::build_server_swarm(keypair, p2p_context.clone(), bandwidth, behaviour_fn)?; Ok(Node { swarm, node_type: NodeType::TCP, @@ -474,6 +503,57 @@ impl Node { }) } + fn build_server_swarm( + keypair: Keypair, + p2p_context: P2PContext, + bandwidth: Option, + behaviour_fn: F, + ) -> Result>> + where + F: FnOnce(PlutoBehaviourBuilder, &Keypair) -> PlutoBehaviourBuilder, + { + use libp2p::{ + core::{Transport as _, muxing::StreamMuxerBox, upgrade::Version}, + dns, quic, + }; + let local_peer_id = keypair.public().to_peer_id(); + + let tcp_transport = tcp::tokio::Transport::new(tcp::Config::default()) + .upgrade(Version::V1Lazy) + .authenticate( + noise::Config::new(&keypair).map_err(P2PError::failed_to_configure_noise)?, + ) + .multiplex(yamux_config()) + .map(|(p, c), _| (p, StreamMuxerBox::new(c))); + + let quic_transport = quic::tokio::Transport::new(quic::Config::new(&keypair)) + .map(|(peer_id, conn), _| (peer_id, StreamMuxerBox::new(conn))); + + let combined = tcp_transport + .or_transport(quic_transport) + .map(|either, _| either.into_inner()); + + let dns = + dns::tokio::Transport::system(combined).map_err(P2PError::failed_to_configure_dns)?; + + let transport = match bandwidth { + Some(factory) => crate::bandwidth::PeerBandwidthTransport::new(dns, factory) + .map(|(peer_id, conn), _| (peer_id, StreamMuxerBox::new(conn))) + .boxed(), + None => dns.boxed(), + }; + + let behaviour = + behaviour_fn(PlutoBehaviourBuilder::new(p2p_context), &keypair).build(&keypair); + + Ok(Swarm::new( + transport, + behaviour, + local_peer_id, + utils::default_swarm_config(libp2p::swarm::Config::with_tokio_executor()), + )) + } + /// Returns the node type. pub fn node_type(&self) -> NodeType { self.node_type diff --git a/crates/relay-server/Cargo.toml b/crates/relay-server/Cargo.toml index 5623b9eb..f508bc1b 100644 --- a/crates/relay-server/Cargo.toml +++ b/crates/relay-server/Cargo.toml @@ -12,6 +12,7 @@ bon.workspace = true futures.workspace = true libp2p.workspace = true thiserror.workspace = true +vise-exporter.workspace = true k256.workspace = true pluto-eth2util.workspace = true vise.workspace = true @@ -24,7 +25,6 @@ pluto-p2p.workspace = true pluto-core.workspace = true [dev-dependencies] -vise-exporter.workspace = true [lints] workspace = true diff --git a/crates/relay-server/src/error.rs b/crates/relay-server/src/error.rs index 9be220f5..8cd118f3 100644 --- a/crates/relay-server/src/error.rs +++ b/crates/relay-server/src/error.rs @@ -28,6 +28,10 @@ pub enum RelayP2PError { /// Failed to parse multiaddress. #[error("Failed to parse multiaddress: {0}")] FailedToParseMultiaddr(#[from] multiaddr::Error), + + /// Failed to parse monitoring address. + #[error("Failed to parse monitoring address: {0}")] + FailedToParseMonitoringAddr(String), } /// Relay P2P result. diff --git a/crates/relay-server/src/metrics.rs b/crates/relay-server/src/metrics.rs index 7b741479..136e66a9 100644 --- a/crates/relay-server/src/metrics.rs +++ b/crates/relay-server/src/metrics.rs @@ -7,27 +7,39 @@ use pluto_p2p::metrics::BUCKETS; #[metrics(prefix = "relay_p2p")] pub struct RelayMetrics { /// Total number of new connections by peer and cluster. - connection_total: Family, + pub connection_total: Family, /// Current number of active connections by peer and cluster. - active_connections: Family, + pub active_connections: Family, + + /// Ping latency by peer and cluster. + #[metrics(buckets = &BUCKETS)] + pub ping_latency: Family, /// Total number of network bytes sent to the peer and cluster. - network_sent_bytes_total: Family, + pub network_sent_bytes_total: Family, /// Total number of network bytes received from the peer and cluster. - network_received_bytes_total: Family, - - /// Ping latency by peer and cluster. - #[metrics(buckets = &BUCKETS)] - ping_latency: Family, + pub network_receive_bytes_total: Family, } /// Labels for peer with peer cluster. #[derive(Debug, Clone, PartialEq, Eq, Hash, EncodeLabelSet)] pub struct PeerWithPeerClusterLabels { - peer: String, - peer_cluster: String, + /// Peer name. + pub peer: String, + /// Peer cluster identifier (empty when unknown). + pub peer_cluster: String, +} + +impl PeerWithPeerClusterLabels { + /// Creates a new label set with the given peer name and cluster. + pub fn new(peer: impl Into, peer_cluster: impl Into) -> Self { + Self { + peer: peer.into(), + peer_cluster: peer_cluster.into(), + } + } } /// Global metrics for the relay P2P layer. diff --git a/crates/relay-server/src/p2p.rs b/crates/relay-server/src/p2p.rs index 60973a41..0a5eaea2 100644 --- a/crates/relay-server/src/p2p.rs +++ b/crates/relay-server/src/p2p.rs @@ -1,10 +1,11 @@ -#![allow(missing_docs)] +//! Relay P2P node implementation. -use std::{sync::Arc, time::Duration}; +use std::{net::SocketAddr, sync::Arc, time::Duration}; use futures::StreamExt; use k256::SecretKey; -use libp2p::{relay, swarm::SwarmEvent}; +use libp2p::{PeerId, relay, swarm::SwarmEvent}; +use pluto_p2p::{behaviours::pluto::PlutoBehaviourEvent, name::peer_name}; use tokio::sync::{RwLock, mpsc}; use tokio_util::sync::CancellationToken; use tracing::{debug, info, instrument, warn}; @@ -12,9 +13,12 @@ use tracing::{debug, info, instrument, warn}; use crate::{ Result, config::{Config, create_relay_config}, - web::enr_server, + error::RelayP2PError, + metrics::{PeerWithPeerClusterLabels, RELAY_METRICS}, + web::{enr_server, monitoring_server}, }; use pluto_p2p::{ + BandwidthFactory, PeerConnectionMetrics, p2p::{Node, NodeType}, p2p_context::P2PContext, }; @@ -27,6 +31,10 @@ pub async fn run_relay_p2p_node( ct: CancellationToken, ) -> Result> { let relay_config = create_relay_config(config); + let bandwidth: BandwidthFactory = std::sync::Arc::new(|peer_id| PeerConnectionMetrics { + sent: RELAY_METRICS.network_sent_bytes_total[&relay_labels(peer_id)].clone(), + received: RELAY_METRICS.network_receive_bytes_total[&relay_labels(peer_id)].clone(), + }); let mut node = Node::new_server( config.p2p_config.clone(), key.clone(), @@ -34,6 +42,7 @@ pub async fn run_relay_p2p_node( false, // Relay servers don't track cluster peers - they serve all connections. P2PContext::default(), + Some(bandwidth), |builder, keypair| { builder.with_inner(relay::Behaviour::new( keypair.public().to_peer_id(), @@ -42,12 +51,19 @@ pub async fn run_relay_p2p_node( }, )?; - // todo: change to version::log_info - info!("Pluto relay starting"); + let (git_hash, build_time) = pluto_core::version::git_commit(); + info!( + version = %*pluto_core::version::VERSION, + git_hash = %git_hash, + build_time = %build_time, + "Pluto relay starting" + ); - // todo: configure libp2p log level + for udp_addr in config.p2p_config.udp_multiaddrs()? { + debug!("Listening on UDP address {}", udp_addr); + node.listen_on(udp_addr)?; + } - // todo: monitor connections let (server_errors, mut server_errors_receiver) = mpsc::channel(3); let listeners = Arc::new(RwLock::new(Vec::new())); @@ -62,11 +78,22 @@ pub async fn run_relay_p2p_node( )); if let Some(http_addr) = config.http_addr.clone() { - info!("Runtime multiaddrs available via http at {http_addr}",); + info!("Runtime multiaddrs available via http at {http_addr}"); } else { info!("Runtime multiaddrs not available via http, since http-address flag is not set"); } + // Start monitoring server if configured + let monitoring_handle = if let Some(monitoring_addr) = config.monitoring_addr.clone() { + let bind_addr = monitoring_addr + .parse::() + .map_err(|_| RelayP2PError::FailedToParseMonitoringAddr(monitoring_addr))?; + Some(tokio::spawn(monitoring_server(bind_addr, ct.child_token()))) + } else { + info!("Prometheus monitoring not available, since monitoring-address flag is not set"); + None + }; + loop { tokio::select! { biased; @@ -81,23 +108,23 @@ pub async fn run_relay_p2p_node( } }, event = node.select_next_some() => { - // todo: handle swarm events - debug!(?event, "Swarm event"); + let address_update = handle_swarm_event(&event); - match event { - SwarmEvent::NewListenAddr { address, .. } => { - let mut listeners = listeners.write().await; - listeners.push(address); + // Update listener address list + match address_update { + AddrUpdate::Add(address) => { + listeners.write().await.push(address); } - SwarmEvent::ListenerClosed { addresses, .. } => { - let mut listeners = listeners.write().await; - listeners.retain(|addr| !addresses.contains(addr)); + AddrUpdate::Remove(address) => { + listeners.write().await.retain(|a| *a != address); } - SwarmEvent::ExpiredListenAddr { address, .. } => { - let mut listeners = listeners.write().await; - listeners.retain(|addr| *addr != address); + AddrUpdate::RemoveAll(addresses) => { + listeners + .write() + .await + .retain(|a| !addresses.contains(a)); } - _ => {} + AddrUpdate::None => {} } } } @@ -117,5 +144,143 @@ pub async fn run_relay_p2p_node( } } + if let Some(handle) = monitoring_handle { + match tokio::time::timeout(Duration::from_secs(2), handle).await { + Ok(Ok(())) => { + info!("Monitoring server shutdown complete"); + } + Ok(Err(e)) => { + warn!("Monitoring server shutdown error: {}", e); + } + Err(_) => { + warn!("Monitoring server shutdown timeout"); + } + } + } + Ok(node) } + +/// Result of a swarm event that may require updating the listener address list. +enum AddrUpdate { + /// Add an address. + Add(libp2p::Multiaddr), + /// Remove a specific address. + Remove(libp2p::Multiaddr), + /// Remove all addresses in the list. + RemoveAll(Vec), + /// No address update needed. + None, +} + +/// Handles a relay swarm event, updating metrics and logging. +/// +/// Returns an [`AddrUpdate`] describing any change to the listener address +/// list that the caller should apply. +fn handle_swarm_event(event: &SwarmEvent>) -> AddrUpdate { + match event { + // Track listener address changes + SwarmEvent::NewListenAddr { address, .. } => { + debug!(%address, "listening on new address"); + AddrUpdate::Add(address.clone()) + } + SwarmEvent::ListenerClosed { addresses, .. } => { + for address in addresses { + debug!(%address, "listener closed"); + } + AddrUpdate::RemoveAll(addresses.clone()) + } + SwarmEvent::ExpiredListenAddr { address, .. } => { + debug!(%address, "listen address expired"); + AddrUpdate::Remove(address.clone()) + } + + // Track connections for metrics + SwarmEvent::ConnectionEstablished { peer_id, .. } => { + debug!(peer = %peer_name(peer_id), "connection established"); + let labels = relay_labels(peer_id); + RELAY_METRICS.connection_total[&labels].inc(); + RELAY_METRICS.active_connections[&labels].inc_by(1); + AddrUpdate::None + } + SwarmEvent::ConnectionClosed { peer_id, cause, .. } => { + debug!(peer = %peer_name(peer_id), cause = ?cause, "connection closed"); + let labels = relay_labels(peer_id); + RELAY_METRICS.active_connections[&labels].dec_by(1); + AddrUpdate::None + } + + // Relay-specific events + SwarmEvent::Behaviour(PlutoBehaviourEvent::Inner( + relay::Event::ReservationReqAccepted { + src_peer_id, + renewed, + }, + )) => { + info!(peer = %peer_name(src_peer_id), renewed, "relay reservation accepted"); + AddrUpdate::None + } + SwarmEvent::Behaviour(PlutoBehaviourEvent::Inner(relay::Event::ReservationReqDenied { + src_peer_id, + .. + })) => { + warn!(peer = %peer_name(src_peer_id), "relay reservation denied"); + AddrUpdate::None + } + SwarmEvent::Behaviour(PlutoBehaviourEvent::Inner(relay::Event::ReservationTimedOut { + src_peer_id, + })) => { + debug!(peer = %peer_name(src_peer_id), "relay reservation timed out"); + AddrUpdate::None + } + SwarmEvent::Behaviour(PlutoBehaviourEvent::Inner(relay::Event::CircuitReqAccepted { + src_peer_id, + dst_peer_id, + })) => { + info!( + src = %peer_name(src_peer_id), + dst = %peer_name(dst_peer_id), + "relay circuit accepted" + ); + AddrUpdate::None + } + SwarmEvent::Behaviour(PlutoBehaviourEvent::Inner(relay::Event::CircuitReqDenied { + src_peer_id, + dst_peer_id, + .. + })) => { + warn!( + src = %peer_name(src_peer_id), + dst = %peer_name(dst_peer_id), + "relay circuit denied" + ); + AddrUpdate::None + } + SwarmEvent::Behaviour(PlutoBehaviourEvent::Inner(relay::Event::CircuitClosed { + src_peer_id, + dst_peer_id, + error, + })) => { + debug!( + src = %peer_name(src_peer_id), + dst = %peer_name(dst_peer_id), + error = ?error, + "relay circuit closed" + ); + AddrUpdate::None + } + SwarmEvent::ListenerError { listener_id, error } => { + warn!(?listener_id, ?error, "listener error"); + AddrUpdate::None + } + _ => AddrUpdate::None, + } +} + +/// Returns relay metric labels for a peer. +/// +/// The `peer_cluster` label is left empty since the relay server does not +/// track cluster membership. +fn relay_labels(peer_id: &PeerId) -> PeerWithPeerClusterLabels { + PeerWithPeerClusterLabels::new(peer_name(peer_id), "") +} diff --git a/crates/relay-server/src/web.rs b/crates/relay-server/src/web.rs index 215ac4d0..97576a0a 100644 --- a/crates/relay-server/src/web.rs +++ b/crates/relay-server/src/web.rs @@ -1,5 +1,5 @@ use std::{ - net::{IpAddr, Ipv4Addr}, + net::{IpAddr, Ipv4Addr, SocketAddr}, sync::Arc, }; @@ -19,6 +19,7 @@ use tokio::{ }; use tokio_util::sync::CancellationToken; use tracing::{debug, info, instrument, warn}; +use vise_exporter::MetricsExporter; use crate::{ config::{Config, EXTERNAL_HOST_RESOLVE_INTERVAL}, @@ -142,6 +143,18 @@ pub async fn enr_server( } } +/// Starts the Prometheus monitoring server on the given address. +#[instrument(skip(ct))] +pub async fn monitoring_server(bind_addr: SocketAddr, ct: CancellationToken) { + info!("Starting monitoring server on {bind_addr}"); + + MetricsExporter::default() + .with_graceful_shutdown(ct.cancelled_owned()) + .start(bind_addr) + .await + .unwrap_or_else(|e| warn!("Monitoring server error: {e}")); +} + /// Error response for HTTP handlers. pub struct HandlerError { status: StatusCode, @@ -216,8 +229,18 @@ pub async fn enr_handler( } (tcp_ip, tcp_p, udp_p) } - (Some((ip, tcp_p)), None) => (ip, tcp_p, 9999), // Dummy UDP port - (None, Some((ip, udp_p))) => (ip, 9999, udp_p), // Dummy TCP port + (Some(_), None) => { + return Err(HandlerError { + status: StatusCode::INTERNAL_SERVER_ERROR, + message: "no udp address available".to_string(), + }); + } + (None, Some(_)) => { + return Err(HandlerError { + status: StatusCode::INTERNAL_SERVER_ERROR, + message: "no tcp address available".to_string(), + }); + } (None, None) => { return Err(HandlerError { status: StatusCode::INTERNAL_SERVER_ERROR,