Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 59 additions & 8 deletions src/peer_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use crate::peer_manager::peerdb::client::ClientKind;
use crate::types::GossipKind;
use libp2p::multiaddr;
pub use peerdb::peer_info::{ConnectionDirection, PeerConnectionStatus, PeerInfo};
use peerdb::score::{PeerAction, ReportSource};
use peerdb::score::{DisconnectDirection, LastDisconnect, PeerAction, ReportSource};
pub use peerdb::sync_status::{SyncInfo, SyncStatus};
use std::collections::{HashMap, HashSet, hash_map::Entry};
use std::net::IpAddr;
Expand Down Expand Up @@ -150,6 +150,50 @@ pub enum PeerManagerEvent {
DiscoverSubnetPeers(Vec<SubnetDiscovery>),
}

/// Translates an [`RPCError`] into the fixed tag that is passed as the `msg`
/// argument of `report_peer`.
///
/// Each variant gets its own `'static` label, and an
/// [`RPCError::ErrorResponse`] is split further by the status code it carries,
/// so that external consumers can tell apart, for example, a rate limit, an
/// ssz decode failure and an unsupported protocol.
fn rpc_error_msg(err: &RPCError) -> &'static str {
    match err {
        // Error responses are tagged by the status code they carry; the
        // accompanying payload is ignored.
        RPCError::ErrorResponse(RpcErrorResponse::Unknown, _) => "rpc_unknown_status",
        RPCError::ErrorResponse(RpcErrorResponse::ResourceUnavailable, _) => {
            "rpc_resource_unavailable"
        }
        RPCError::ErrorResponse(RpcErrorResponse::ServerError, _) => "rpc_server_error",
        RPCError::ErrorResponse(RpcErrorResponse::InvalidRequest, _) => "rpc_invalid_request",
        RPCError::ErrorResponse(RpcErrorResponse::RateLimited, _) => "rpc_rate_limited",
        RPCError::ErrorResponse(RpcErrorResponse::BlobsNotFoundForBlock, _) => {
            "rpc_blobs_not_found"
        }

        // Every other variant maps to exactly one tag.
        RPCError::SszReadError(_) => "rpc_ssz_decode_error",
        RPCError::SszWriteError(_) => "rpc_ssz_encode_error",
        RPCError::InvalidData(_) => "rpc_invalid_data",
        RPCError::IoError(_) => "rpc_io_error",
        RPCError::InternalError(_) => "rpc_internal_error",
        RPCError::IncompleteStream => "rpc_incomplete_stream",
        RPCError::HandlerRejected => "rpc_handler_rejected",
        RPCError::NegotiationTimeout => "rpc_negotiation_timeout",
        RPCError::StreamTimeout => "rpc_stream_timeout",
        RPCError::UnsupportedProtocol => "rpc_unsupported_protocol",
        RPCError::Disconnected => "rpc_disconnected",
    }
}

/// Maps a [`GoodbyeReason`] to the name of its variant as a `'static` string.
///
/// The name is stored in `LastDisconnect::reason`, which is serialized to
/// JSON for the `/eth/v1/node/peers` HTTP endpoint.
pub(crate) fn goodbye_reason_name(reason: &GoodbyeReason) -> &'static str {
    // Only unit variants are matched, so going through the dereference does
    // not move anything out of `reason`.
    match *reason {
        GoodbyeReason::ClientShutdown => "ClientShutdown",
        GoodbyeReason::IrrelevantNetwork => "IrrelevantNetwork",
        GoodbyeReason::Fault => "Fault",
        GoodbyeReason::UnableToVerifyNetwork => "UnableToVerifyNetwork",
        GoodbyeReason::TooManyPeers => "TooManyPeers",
        GoodbyeReason::BadScore => "BadScore",
        GoodbyeReason::Banned => "Banned",
        GoodbyeReason::BannedIP => "BannedIP",
        GoodbyeReason::Unknown => "Unknown",
    }
}

impl PeerManager {
// NOTE: Must be run inside a tokio executor.
pub fn new<P: Preset>(
Expand Down Expand Up @@ -214,12 +258,24 @@ impl PeerManager {
///
/// This will send a goodbye and disconnect the peer if it is connected or dialing.
pub fn goodbye_peer(&mut self, peer_id: &PeerId, reason: GoodbyeReason, source: ReportSource) {
// Capture the goodbye details for the `/eth/v1/node/peers` API before
// `reason` is consumed by `report_peer` below.
let reason_name = goodbye_reason_name(&reason);
let reason_code: u64 = reason.into();
let last_disconnect = LastDisconnect {
reason: reason_name,
code: reason_code,
direction: DisconnectDirection::Sent,
at: Instant::now(),
};

// Update the sync status if required
if let Some(info) = self.network_globals.peers.write().peer_info_mut(peer_id) {
debug!(%peer_id, %reason, score = %info.score(), "Sending goodbye to peer");
if matches!(reason, GoodbyeReason::IrrelevantNetwork) {
info.update_sync_status(SyncStatus::IrrelevantPeer);
}
info.set_last_disconnect(last_disconnect);
}

self.report_peer(
Expand Down Expand Up @@ -657,13 +713,8 @@ impl PeerManager {
RPCError::Disconnected => return, // No penalty for a graceful disconnection
};

self.report_peer(
peer_id,
peer_action,
ReportSource::RPC,
None,
"handle_rpc_error",
);
let msg = rpc_error_msg(err);
self.report_peer(peer_id, peer_action, ReportSource::RPC, None, msg);
}

/// A ping request has been received.
Expand Down
31 changes: 30 additions & 1 deletion src/peer_manager/peerdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use helper_functions::misc;
use itertools::Itertools as _;
use logging::exception;
use peer_info::{ConnectionDirection, PeerConnectionStatus, PeerInfo};
use score::{PeerAction, ReportSource, Score, ScoreState};
use score::{LastAction, PeerAction, ReportSource, Score, ScoreState};
use ssz::H256;
use std::net::IpAddr;
use std::time::Instant;
Expand Down Expand Up @@ -133,6 +133,21 @@ impl PeerDB {
self.peers.get_mut(peer_id)
}

/// Stores `last_disconnect` as the latest disconnect event of `peer_id`.
///
/// Visible crate-wide so that code outside the peer manager (the service
/// layer records received `Goodbye` requests this way) can fill in the
/// disconnect details that the `/eth/v1/node/peers` HTTP endpoint shows.
/// Does nothing when the peer is not in the database.
pub(crate) fn record_last_disconnect(
    &mut self,
    peer_id: &PeerId,
    last_disconnect: score::LastDisconnect,
) {
    let Some(info) = self.peers.get_mut(peer_id) else {
        // Unknown peer: there is no entry to attach the event to.
        return;
    };

    info.set_last_disconnect(last_disconnect);
}

/// Returns if the peer is already connected.
pub fn is_connected(&self, peer_id: &PeerId) -> bool {
matches!(
Expand Down Expand Up @@ -648,7 +663,21 @@ impl PeerDB {
match self.peers.get_mut(peer_id) {
Some(info) => {
let previous_state = info.score_state();
let pre_score = info.score().score();
info.apply_peer_action_to_score(action);
let post_score = info.score().score();
// Record the most recent score-affecting event so the HTTP
// API can expose *why* a peer's score moved. Trusted peers
// are skipped because their score is never mutated.
if !info.is_trusted {
info.set_last_action(LastAction {
reason: msg,
source,
action,
delta: post_score - pre_score,
at: Instant::now(),
});
}
crate::common::metrics::inc_counter_vec(
&metrics::PEER_ACTION_EVENTS_PER_CLIENT,
&[info.client().kind.as_ref(), action.as_ref(), source.into()],
Expand Down
40 changes: 39 additions & 1 deletion src/peer_manager/peerdb/peer_info.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::client::Client;
use super::score::{PeerAction, Score, ScoreState};
use super::score::{LastAction, LastDisconnect, PeerAction, Score, ScoreState};
use super::sync_status::SyncStatus;
use crate::discovery::Eth2Enr;
use crate::{rpc::MetaData, types::Subnet};
Expand Down Expand Up @@ -55,6 +55,19 @@ pub struct PeerInfo {
connection_direction: Option<ConnectionDirection>,
/// The enr of the peer, if known.
enr: Option<Enr>,
/// The most recent score-affecting event for this peer (if any).
///
/// Exposed via the `/eth/v1/node/peers` HTTP endpoint so external tooling
/// can correlate score changes with their cause. Skipped from JSON when
/// the peer has never been scored.
#[serde(skip_serializing_if = "Option::is_none")]
last_action: Option<LastAction>,
/// The most recent disconnect event for this peer (if any).
///
/// Records the goodbye reason and direction (sent/received). Skipped from
/// JSON when the peer has never disconnected.
#[serde(skip_serializing_if = "Option::is_none")]
last_disconnect: Option<LastDisconnect>,
}

impl Default for PeerInfo {
Expand All @@ -73,6 +86,8 @@ impl Default for PeerInfo {
is_trusted: false,
connection_direction: None,
enr: None,
last_action: None,
last_disconnect: None,
}
}
}
Expand Down Expand Up @@ -311,6 +326,29 @@ impl PeerInfo {
self.score.state()
}

/// Returns the most recent score-affecting event recorded for this peer,
/// or `None` if no such event has been recorded.
pub fn last_action(&self) -> Option<&LastAction> {
self.last_action.as_ref()
}

/// Returns the most recent disconnect event recorded for this peer, or
/// `None` if no disconnect has been recorded.
pub fn last_disconnect(&self) -> Option<&LastDisconnect> {
self.last_disconnect.as_ref()
}

/// Records the most recent score-affecting event for this peer.
///
/// Any previously recorded event is overwritten; only the latest one is kept.
// VISIBILITY: The peer manager populates this from `report_peer`.
pub(in crate::peer_manager) fn set_last_action(&mut self, last_action: LastAction) {
self.last_action = Some(last_action);
}

/// Records the most recent disconnect event for this peer.
///
/// Any previously recorded event is overwritten; only the latest one is kept.
// VISIBILITY: Populated from goodbye send/receive paths in both the peer
// manager and the service layer.
pub(crate) fn set_last_disconnect(&mut self, last_disconnect: LastDisconnect) {
self.last_disconnect = Some(last_disconnect);
}

/// Returns true if the gossipsub score is sufficient.
pub fn is_good_gossipsub_peer(&self) -> bool {
self.score.is_good_gossipsub_peer()
Expand Down
65 changes: 62 additions & 3 deletions src/peer_manager/peerdb/score.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
//!
//! The scoring algorithms are currently experimental.
use crate::service::gossipsub_scoring_parameters::GREYLIST_THRESHOLD as GOSSIPSUB_GREYLIST_THRESHOLD;
use serde::Serialize;
use serde::{Serialize, Serializer};
use std::cmp::Ordering;
use std::sync::LazyLock;
use std::time::Instant;
Expand Down Expand Up @@ -43,7 +43,7 @@ const GOSSIPSUB_POSITIVE_SCORE_WEIGHT: f64 = GOSSIPSUB_NEGATIVE_SCORE_WEIGHT;
/// Each variant has an associated score change.
// To easily assess the behaviour of scores changes the number of variants should stay low, and
// somewhat generic.
#[derive(Debug, Clone, Copy, AsRefStr)]
#[derive(Debug, Clone, Copy, AsRefStr, Serialize)]
#[strum(serialize_all = "snake_case")]
pub enum PeerAction {
/// We should not communicate more with this peer.
Expand All @@ -66,7 +66,7 @@ pub enum PeerAction {
}

/// Service reporting a `PeerAction` for a peer.
#[derive(Debug)]
#[derive(Debug, Clone, Copy, Serialize)]
pub enum ReportSource {
Gossipsub,
RPC,
Expand All @@ -87,6 +87,65 @@ impl From<ReportSource> for &'static str {
}
}

/// Serializes an [`Instant`] as the number of whole seconds elapsed since it.
///
/// An `Instant` is opaque and has no meaningful wire representation, so the
/// value written is "how many seconds ago this happened", measured at the
/// moment of serialization. Consumers can render it without any wall-clock
/// context. Intended for use with `#[serde(serialize_with = "...")]`.
pub(crate) fn serialize_instant_seconds_ago<S: Serializer>(
    instant: &Instant,
    serializer: S,
) -> Result<S::Ok, S::Error> {
    // `as_secs` drops the sub-second part of the elapsed duration.
    let seconds_ago = instant.elapsed().as_secs();
    serializer.serialize_u64(seconds_ago)
}

/// The most recent score-affecting event applied to a peer.
///
/// Exposed on the `/eth/v1/node/peers` HTTP endpoint so external tools (e.g.
/// dora) can show *why* a peer was scored, not just the resulting number.
#[derive(Debug, Clone, Serialize)]
pub struct LastAction {
/// Static reason tag for the event, e.g. `rpc_invalid_data` or
/// `goodbye_peer`. This is the `msg` that was passed to `report_peer`.
pub reason: &'static str,
/// Which subsystem reported the action.
pub source: ReportSource,
/// The action category (Fatal/Low/Mid/High tolerance).
pub action: PeerAction,
/// Signed score change caused by this event (`post_score - pre_score`),
/// where both scores are read directly around applying the action.
pub delta: f64,
/// When the event happened. Serialized under the key `seconds_ago` as the
/// number of whole seconds elapsed at serialization time, not as a
/// timestamp.
#[serde(serialize_with = "serialize_instant_seconds_ago", rename = "seconds_ago")]
pub at: Instant,
}

/// The direction of a disconnect event.
///
/// Serialized in lowercase, i.e. as `sent` or `received`.
#[derive(Debug, Clone, Copy, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum DisconnectDirection {
/// We sent the goodbye / initiated the disconnect.
Sent,
/// The peer sent us a goodbye.
Received,
}

/// The most recent disconnect event for a peer.
///
/// Holds the goodbye reason (as a name and as a numeric code), whether the
/// goodbye was sent or received, and when it happened.
#[derive(Debug, Clone, Serialize)]
pub struct LastDisconnect {
/// Human-readable variant name of the goodbye reason (e.g. `BadScore`).
pub reason: &'static str,
/// Numeric goodbye code, obtained by converting the `GoodbyeReason` into a
/// `u64`.
pub code: u64,
/// Whether we sent or received the goodbye.
pub direction: DisconnectDirection,
/// When the disconnect happened. Serialized under the key `seconds_ago`
/// as the number of whole seconds elapsed at serialization time, not as a
/// timestamp.
#[serde(serialize_with = "serialize_instant_seconds_ago", rename = "seconds_ago")]
pub at: Instant,
}

impl std::fmt::Display for PeerAction {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Expand Down
17 changes: 15 additions & 2 deletions src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use crate::discovery::{
};
use crate::peer_manager::{
ConnectionDirection, PeerManager, PeerManagerEvent, config::Config as PeerManagerCfg,
peerdb::score::PeerAction, peerdb::score::ReportSource,
goodbye_reason_name,
peerdb::score::{DisconnectDirection, LastDisconnect, PeerAction, ReportSource},
};
use crate::peer_manager::{MIN_OUTBOUND_ONLY_FACTOR, PEER_EXCESS_FACTOR, PRIORITY_PEER_EXCESS};
use crate::rpc::methods::MetadataRequest;
Expand Down Expand Up @@ -40,7 +41,7 @@ use std::num::{NonZeroU8, NonZeroUsize};
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, Instant};
use std::usize;
use std_ext::ArcExt as _;
use tracing::{debug, error, info, trace, warn};
Expand Down Expand Up @@ -1534,6 +1535,18 @@ impl<P: Preset> Network<P> {
client = %self.network_globals.client(&peer_id),
"Peer sent Goodbye"
);
// Record the received goodbye for the `/eth/v1/node/peers` API.
let reason_name = goodbye_reason_name(&reason);
let reason_code: u64 = reason.into();
self.network_globals.peers.write().record_last_disconnect(
&peer_id,
LastDisconnect {
reason: reason_name,
code: reason_code,
direction: DisconnectDirection::Received,
at: Instant::now(),
},
);
// NOTE: We currently do not inform the application that we are
// disconnecting here. The RPC handler will automatically
// disconnect for us.
Expand Down