Skip to content

Commit 6538866

Browse files
committed
feat: add validation for peerinfo
1 parent 7f7ab4e commit 6538866

9 files changed

Lines changed: 337 additions & 74 deletions

File tree

Cargo.lock

Lines changed: 4 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ tracing-subscriber = { version = "0.3.9", features = ["env-filter"] }
6161
tracing-loki = "0.2.6"
6262
vise = "0.3.2"
6363
vise-exporter = "0.3.2"
64+
parking_lot = "0.12.2"
6465

6566
# Crates in the workspace
6667
charon = { path = "crates/charon" }

crates/peerinfo/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@ tracing.workspace = true
1818
chrono.workspace = true
1919
unsigned-varint = { version = "0.8", features = ["futures"] }
2020
vise.workspace = true
21+
charon-core.workspace = true
22+
charon-p2p.workspace = true
23+
parking_lot.workspace = true
2124

2225
[build-dependencies]
2326
charon-build-proto.workspace = true
@@ -28,6 +31,7 @@ clap.workspace = true
2831
tokio.workspace = true
2932
tracing-subscriber.workspace = true
3033
hex.workspace = true
34+
vise-exporter.workspace = true
3135

3236
[lints]
3337
workspace = true

crates/peerinfo/examples/peerinfo.rs

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
//! Terminal 1: `cargo run --example peerinfo -p charon-peerinfo -- --port 4001`
1414
//! Terminal 2: `cargo run --example peerinfo -p charon-peerinfo -- --port 4002`
1515
#![allow(missing_docs)]
16-
use std::time::Duration;
16+
use std::{net::SocketAddr, time::Duration};
1717

1818
use charon_peerinfo::{Behaviour, Config, Event, LocalPeerInfo};
1919
use clap::Parser;
@@ -26,6 +26,7 @@ use libp2p::{
2626
};
2727
use tokio::signal;
2828
use tracing_subscriber::EnvFilter;
29+
use vise_exporter::MetricsExporter;
2930

3031
/// Command line arguments
3132
#[derive(Debug, Parser)]
@@ -73,9 +74,7 @@ fn build_swarm(
7374
)?
7475
.with_behaviour(|key| {
7576
Ok(CombinedBehaviour {
76-
peer_info: Behaviour::new(
77-
Config::new(peerinfo_config, key.public().to_peer_id()).with_interval(interval),
78-
),
77+
peer_info: Behaviour::new(Config::new(peerinfo_config).with_interval(interval)),
7978
identify: identify::Behaviour::new(identify::Config::new(
8079
"/peerinfo-example/1.0.0".to_string(),
8180
key.public(),
@@ -175,6 +174,20 @@ async fn main() -> anyhow::Result<()> {
175174
.with_env_filter(EnvFilter::from_default_env().add_directive("debug".parse()?))
176175
.init();
177176

177+
// Run the metrics exporter
178+
let bind_address = SocketAddr::from(([0, 0, 0, 0], 9465));
179+
180+
let exporter = MetricsExporter::default()
181+
.bind(bind_address)
182+
.await
183+
.expect("Failed to bind metrics exporter");
184+
tokio::spawn(async move {
185+
exporter
186+
.start()
187+
.await
188+
.expect("Failed to start metrics exporter");
189+
});
190+
178191
let args = Args::parse();
179192

180193
// Create local peer info

crates/peerinfo/src/behaviour.rs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,6 @@ impl Behaviour {
7777
chrono::Utc::now().timestamp()
7878
};
7979

80-
let started_at = started_at as f64;
81-
8280
PEERINFO_METRICS.start_time_secs[&name].set(started_at);
8381

8482
if config.local_info().builder_api_enabled {
@@ -87,6 +85,11 @@ impl Behaviour {
8785
PEERINFO_METRICS.builder_api_enabled[&name].set(0);
8886
}
8987

88+
for (idx, peer) in config.peers().iter().enumerate() {
89+
let peer_name = charon_p2p::name::peer_name(peer);
90+
PEERINFO_METRICS.index[&peer_name].set(idx);
91+
}
92+
9093
Self {
9194
config,
9295
events: VecDeque::new(),
@@ -106,22 +109,22 @@ impl NetworkBehaviour for Behaviour {
106109
fn handle_established_inbound_connection(
107110
&mut self,
108111
_connection_id: ConnectionId,
109-
_peer: PeerId,
112+
peer: PeerId,
110113
_local_addr: &Multiaddr,
111114
_remote_addr: &Multiaddr,
112115
) -> Result<THandler<Self>, ConnectionDenied> {
113-
Ok(Handler::new(self.config.clone()))
116+
Ok(Handler::new(self.config.clone(), peer))
114117
}
115118

116119
fn handle_established_outbound_connection(
117120
&mut self,
118121
_connection_id: ConnectionId,
119-
_peer: PeerId,
122+
peer: PeerId,
120123
_addr: &Multiaddr,
121124
_role_override: libp2p::core::Endpoint,
122125
_port_use: libp2p::core::transport::PortUse,
123126
) -> Result<THandler<Self>, ConnectionDenied> {
124-
Ok(Handler::new(self.config.clone()))
127+
Ok(Handler::new(self.config.clone(), peer))
125128
}
126129

127130
fn on_swarm_event(&mut self, _event: FromSwarm) {

crates/peerinfo/src/config.rs

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@ pub struct Config {
2222
interval: Duration,
2323
/// Local peer info to send to other peers.
2424
local_info: LocalPeerInfo,
25-
/// Peer ID of the local peer.
26-
peer_id: PeerId,
25+
/// Known peers.
26+
peers: Vec<PeerId>,
2727
}
2828

2929
/// Local peer information to be shared with other peers.
@@ -74,7 +74,7 @@ impl LocalPeerInfo {
7474
git_hash: self.git_hash.clone(),
7575
sent_at: Some(Timestamp {
7676
seconds: now.timestamp(),
77-
nanos: 0,
77+
nanos: i32::try_from(now.timestamp_subsec_nanos()).unwrap_or(0),
7878
}),
7979
started_at: self.started_at,
8080
builder_api_enabled: self.builder_api_enabled,
@@ -94,12 +94,12 @@ impl Config {
9494
/// * A peer info request is sent every 60 seconds on a healthy connection.
9595
/// * Every request must yield a response within 20 seconds to be
9696
/// successful.
97-
pub fn new(local_info: LocalPeerInfo, peer_id: PeerId) -> Self {
97+
pub fn new(local_info: LocalPeerInfo) -> Self {
9898
Self {
9999
timeout: DEFAULT_TIMEOUT,
100100
interval: DEFAULT_INTERVAL,
101101
local_info,
102-
peer_id,
102+
peers: Vec::new(),
103103
}
104104
}
105105

@@ -121,9 +121,9 @@ impl Config {
121121
self
122122
}
123123

124-
/// Sets the peer ID.
125-
pub fn with_peer_id(mut self, peer_id: PeerId) -> Self {
126-
self.peer_id = peer_id;
124+
/// Sets the known peers.
125+
pub fn with_peers(mut self, peers: Vec<PeerId>) -> Self {
126+
self.peers = peers;
127127
self
128128
}
129129

@@ -132,11 +132,6 @@ impl Config {
132132
&self.local_info
133133
}
134134

135-
/// Returns the peer ID.
136-
pub fn peer_id(&self) -> &PeerId {
137-
&self.peer_id
138-
}
139-
140135
/// Returns the timeout.
141136
pub fn timeout(&self) -> Duration {
142137
self.timeout
@@ -146,4 +141,9 @@ impl Config {
146141
pub fn interval(&self) -> Duration {
147142
self.interval
148143
}
144+
145+
/// Returns the known peers.
146+
pub fn peers(&self) -> &[PeerId] {
147+
&self.peers
148+
}
149149
}

crates/peerinfo/src/handler.rs

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,15 @@
88
use std::{
99
collections::VecDeque,
1010
convert::Infallible,
11+
sync::Arc,
1112
task::{Context, Poll},
1213
time::Duration,
1314
};
1415

1516
use futures::{future::BoxFuture, prelude::*};
1617
use futures_timer::Delay;
1718
use libp2p::{
19+
PeerId,
1820
core::upgrade::ReadyUpgrade,
1921
swarm::{
2022
ConnectionHandler, ConnectionHandlerEvent, Stream, StreamProtocol, StreamUpgradeError,
@@ -26,7 +28,8 @@ use libp2p::{
2628
};
2729

2830
use crate::{
29-
PROTOCOL_NAME, config::Config, failure::Failure, peerinfopb::v1::peerinfo::PeerInfo, protocol,
31+
PROTOCOL_NAME, config::Config, failure::Failure, peerinfopb::v1::peerinfo::PeerInfo,
32+
protocol::ProtocolState,
3033
};
3134

3235
/// Result of a successful peer info exchange.
@@ -55,6 +58,8 @@ pub struct Handler {
5558
inbound: Option<InboundFuture>,
5659
/// Tracks the state of our handler.
5760
state: State,
61+
/// The protocol state.
62+
protocol: Arc<ProtocolState>,
5863
}
5964

6065
/// Tracks the state of the handler.
@@ -71,8 +76,9 @@ enum State {
7176

7277
impl Handler {
7378
/// Builds a new [`Handler`] with the given configuration.
74-
pub fn new(config: Config) -> Self {
79+
pub fn new(config: Config, peer: PeerId) -> Self {
7580
let interval = config.interval();
81+
let local_info = config.local_info().clone();
7682
Handler {
7783
config,
7884
interval: Delay::new(interval),
@@ -81,6 +87,7 @@ impl Handler {
8187
outbound: None,
8288
inbound: None,
8389
state: State::Active,
90+
protocol: Arc::new(ProtocolState::new(peer, local_info)),
8491
}
8592
}
8693

@@ -158,8 +165,14 @@ impl ConnectionHandler for Handler {
158165
}
159166
Poll::Ready(Ok((stream, _request))) => {
160167
tracing::trace!("Answered inbound peerinfo request from peer");
161-
self.inbound =
162-
Some(recv_peer_info(stream, self.config.local_info().to_proto()).boxed());
168+
self.inbound = Some(
169+
recv_peer_info(
170+
self.protocol.clone(),
171+
stream,
172+
self.config.local_info().to_proto(),
173+
)
174+
.boxed(),
175+
);
163176
}
164177
}
165178
}
@@ -207,6 +220,7 @@ impl ConnectionHandler for Handler {
207220
Poll::Ready(_) => {
208221
self.outbound = Some(OutboundState::Request(
209222
send_peer_info(
223+
self.protocol.clone(),
210224
stream,
211225
self.config.local_info().to_proto(),
212226
self.config.timeout(),
@@ -246,7 +260,8 @@ impl ConnectionHandler for Handler {
246260
}) => {
247261
stream.ignore_for_keep_alive();
248262
let local_info = self.config.local_info().to_proto();
249-
self.inbound = Some(recv_peer_info(stream, local_info).boxed());
263+
self.inbound =
264+
Some(recv_peer_info(self.protocol.clone(), stream, local_info).boxed());
250265
}
251266
ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound {
252267
protocol: mut stream,
@@ -256,7 +271,13 @@ impl ConnectionHandler for Handler {
256271
self.interval.reset(Duration::new(0, 0));
257272
let request = self.config.local_info().to_proto();
258273
self.outbound = Some(OutboundState::Request(
259-
send_peer_info(stream, request, self.config.timeout()).boxed(),
274+
send_peer_info(
275+
self.protocol.clone(),
276+
stream,
277+
request,
278+
self.config.timeout(),
279+
)
280+
.boxed(),
260281
));
261282
}
262283
ConnectionEvent::DialUpgradeError(dial_upgrade_error) => {
@@ -282,11 +303,12 @@ enum OutboundState {
282303

283304
/// A wrapper around [`protocol::send_peer_info`] that enforces a timeout.
284305
async fn send_peer_info(
306+
protocol: Arc<ProtocolState>,
285307
stream: Stream,
286308
request: PeerInfo,
287309
timeout: Duration,
288310
) -> Result<(Stream, PeerInfo), Failure> {
289-
let send = protocol::send_peer_info(stream, &request);
311+
let send = protocol.send_peer_info(stream, &request);
290312
futures::pin_mut!(send);
291313

292314
match future::select(send, Delay::new(timeout)).await {
@@ -299,8 +321,9 @@ async fn send_peer_info(
299321
/// A wrapper around [`protocol::recv_peer_info`] that returns only the stream
300322
/// and request (for use in inbound handling).
301323
async fn recv_peer_info(
324+
protocol: Arc<ProtocolState>,
302325
stream: Stream,
303326
local_info: PeerInfo,
304327
) -> Result<(Stream, PeerInfo), std::io::Error> {
305-
protocol::recv_peer_info(stream, &local_info).await
328+
protocol.recv_peer_info(stream, &local_info).await
306329
}

crates/peerinfo/src/metrics.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use vise::*;
88
pub struct PeerInfoMetrics {
99
/// Peer clock offset in seconds.
1010
#[metrics(labels = ["peer"])]
11-
pub clock_offset_seconds: LabeledFamily<String, Gauge<f64>>,
11+
pub clock_offset_seconds: LabeledFamily<String, Gauge<i64>>,
1212

1313
/// Constant gauge with version label set to peer's charon version.
1414
pub version: Family<PeerVersionLabels, Gauge>,
@@ -18,11 +18,11 @@ pub struct PeerInfoMetrics {
1818

1919
/// Constant gauge set to the peer start time of the binary in unix seconds.
2020
#[metrics(labels = ["peer"])]
21-
pub start_time_secs: LabeledFamily<String, Gauge<f64>>,
21+
pub start_time_secs: LabeledFamily<String, Gauge<i64>>,
2222

2323
/// Constant gauge set to the peer index in the cluster definition.
2424
#[metrics(labels = ["peer"])]
25-
pub index: LabeledFamily<String, Gauge>,
25+
pub index: LabeledFamily<String, Gauge<usize>>,
2626

2727
/// Set to 1 if the peer's version is supported by (compatible with) the
2828
/// current version, else 0 if unsupported.

0 commit comments

Comments
 (0)