Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions beacon_node/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ mod subnet_service;
mod sync;

pub use lighthouse_network::NetworkConfig;
pub use network_beacon_processor::NetworkBeaconProcessor;
pub use service::{
NetworkMessage, NetworkReceivers, NetworkSenders, NetworkService, ValidatorSubscriptionMessage,
};
134 changes: 84 additions & 50 deletions beacon_node/network/src/network_beacon_processor/gossip_methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,17 @@ impl<E: EthSpec> FailedAtt<E> {
}
}

/// `MessageAcceptance` doesn't implement clone so we do a manual match here.
/// TODO: remove this once `Clone` is available on this type:
/// https://github.com/libp2p/rust-libp2p/pull/6445
fn clone_message_acceptance(a: &MessageAcceptance) -> MessageAcceptance {
match a {
MessageAcceptance::Accept => MessageAcceptance::Accept,
MessageAcceptance::Reject => MessageAcceptance::Reject,
MessageAcceptance::Ignore => MessageAcceptance::Ignore,
}
}
Comment on lines +177 to +186
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oof. I opened libp2p/rust-libp2p#6445, let's add a comment here to remove this fn once that is available.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, added comment


impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
/* Auxiliary functions */

Expand Down Expand Up @@ -2190,111 +2201,134 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
message_id: MessageId,
peer_id: PeerId,
proposer_slashing: ProposerSlashing,
) {
) -> MessageAcceptance {
let validator_index = proposer_slashing.signed_header_1.message.proposer_index;

let slashing = match self
let (validation_result, verified_slashing_opt) = match self
.chain
.verify_proposer_slashing_for_gossip(proposer_slashing)
{
Ok(ObservationOutcome::New(slashing)) => slashing,
Ok(ObservationOutcome::New(slashing)) => (MessageAcceptance::Accept, Some(slashing)),
Ok(ObservationOutcome::AlreadyKnown) => {
debug!(
reason = "Already seen a proposer slashing for that validator",
validator_index,
peer = %peer_id,
"Dropping proposer slashing"
);
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
return;
(MessageAcceptance::Ignore, None)
}
Err(e) => {
// This is likely a fault with the beacon chain and not necessarily a
// malicious message from the peer.
debug!(
validator_index,
%peer_id,
error = ?e,
"Dropping invalid proposer slashing"
"Dropping proposer slashing due to an error"
);
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);

// Penalize peer slightly for invalids.
self.gossip_penalize_peer(
peer_id,
PeerAction::HighToleranceError,
"invalid_gossip_proposer_slashing",
);
return;
if matches!(e, BeaconChainError::ProposerSlashingValidationError(_)) {
// Penalize peer slightly for invalids.
self.gossip_penalize_peer(
peer_id,
PeerAction::HighToleranceError,
"invalid_gossip_proposer_slashing",
);
Comment on lines +2230 to +2235
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it intentional that this now only happens on ProposerSlashingValidationError?

My intuition would actually keep or remove a HighToleranceError for the ignore case but upgrade to a LowToleranceError for the reject case.

Dito for attestatiion slashings.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My original intention was to not change any scoring logic in this PR unless it's not compliant with the spec. However, it doesn't really make sense to penalize peer if the node fails to load the head state at current slot (peer has nothing to do with it) - so I removed that but maintain the current HighToleranceError for potential invalids.

verify_proposer_slashing_for_gossip can return error from one of these sources:

  1. BeaconChainError from wall_clock_state(): Error loading head state at current slot - this is an internal error and can't be triggered by peer, so it doesn't make sense to penalize peers
  2. ProposerSlashingValidationError are mostly caused by peers, how could also be internal errors like BeaconStateError - we could try to categorise them but it gives us little gain - slashing processing is pretty low priority in the beacon processor queue so they just get dropped if malicious peers try to spam us with invalids, therefore I think the current high tolerance makes sense.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI the penalty was originally introduced here a long while ago: #1602

(MessageAcceptance::Reject, None)
} else {
// This is likely a fault with the beacon chain and not necessarily a
// malicious message from the peer.
(MessageAcceptance::Ignore, None)
}
}
};

metrics::inc_counter(&metrics::BEACON_PROCESSOR_PROPOSER_SLASHING_VERIFIED_TOTAL);
self.propagate_validation_result(
message_id,
peer_id,
clone_message_acceptance(&validation_result),
);

self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept);
if let Some(slashing) = verified_slashing_opt {
metrics::inc_counter(&metrics::BEACON_PROCESSOR_PROPOSER_SLASHING_VERIFIED_TOTAL);

// Register the slashing with any monitored validators.
self.chain
.validator_monitor
.read()
.register_gossip_proposer_slashing(slashing.as_inner());
// Register the slashing with any monitored validators.
self.chain
.validator_monitor
.read()
.register_gossip_proposer_slashing(slashing.as_inner());

self.chain.import_proposer_slashing(slashing);
debug!("Successfully imported proposer slashing");

self.chain.import_proposer_slashing(slashing);
debug!("Successfully imported proposer slashing");
metrics::inc_counter(&metrics::BEACON_PROCESSOR_PROPOSER_SLASHING_IMPORTED_TOTAL);
}

metrics::inc_counter(&metrics::BEACON_PROCESSOR_PROPOSER_SLASHING_IMPORTED_TOTAL);
validation_result
}

pub fn process_gossip_attester_slashing(
self: &Arc<Self>,
message_id: MessageId,
peer_id: PeerId,
attester_slashing: AttesterSlashing<T::EthSpec>,
) {
let slashing = match self
) -> MessageAcceptance {
let (validation_result, verified_slashing_opt) = match self
.chain
.verify_attester_slashing_for_gossip(attester_slashing)
{
Ok(ObservationOutcome::New(slashing)) => slashing,
Ok(ObservationOutcome::New(slashing)) => (MessageAcceptance::Accept, Some(slashing)),
Ok(ObservationOutcome::AlreadyKnown) => {
debug!(
reason = "Slashings already known for all slashed validators",
peer = %peer_id,
"Dropping attester slashing"
);
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
return;
(MessageAcceptance::Ignore, None)
}
Err(e) => {
debug!(
%peer_id,
error = ?e,
"Dropping invalid attester slashing"
"Dropping attester slashing due to an error"
);
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
// Penalize peer slightly for invalids.
self.gossip_penalize_peer(
peer_id,
PeerAction::HighToleranceError,
"invalid_gossip_attester_slashing",
);
return;

if matches!(e, BeaconChainError::AttesterSlashingValidationError(_)) {
// Penalize peer slightly for invalids.
self.gossip_penalize_peer(
peer_id,
PeerAction::HighToleranceError,
"invalid_gossip_attester_slashing",
);
(MessageAcceptance::Reject, None)
} else {
// This is likely a fault with the beacon chain and not necessarily a
// malicious message from the peer.
(MessageAcceptance::Ignore, None)
}
}
};

metrics::inc_counter(&metrics::BEACON_PROCESSOR_ATTESTER_SLASHING_VERIFIED_TOTAL);
self.propagate_validation_result(
message_id,
peer_id,
clone_message_acceptance(&validation_result),
);

self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept);
if let Some(slashing) = verified_slashing_opt {
metrics::inc_counter(&metrics::BEACON_PROCESSOR_ATTESTER_SLASHING_VERIFIED_TOTAL);

// Register the slashing with any monitored validators.
self.chain
.validator_monitor
.read()
.register_gossip_attester_slashing(slashing.as_inner().to_ref());
// Register the slashing with any monitored validators.
self.chain
.validator_monitor
.read()
.register_gossip_attester_slashing(slashing.as_inner().to_ref());

self.chain.import_attester_slashing(slashing);
debug!("Successfully imported attester slashing");
metrics::inc_counter(&metrics::BEACON_PROCESSOR_ATTESTER_SLASHING_IMPORTED_TOTAL);
}

self.chain.import_attester_slashing(slashing);
debug!("Successfully imported attester slashing");
metrics::inc_counter(&metrics::BEACON_PROCESSOR_ATTESTER_SLASHING_IMPORTED_TOTAL);
validation_result
}

pub fn process_gossip_bls_to_execution_change(
Expand Down
37 changes: 26 additions & 11 deletions beacon_node/network/src/network_beacon_processor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use beacon_chain::data_column_verification::{GossipDataColumnError, observe_goss
use beacon_chain::fetch_blobs::{
EngineGetBlobsOutput, FetchEngineBlobError, fetch_and_process_engine_blobs,
};
use beacon_chain::test_utils::{BeaconChainHarness, EphemeralHarnessType};
use beacon_chain::{AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes, BlockError};
use beacon_processor::{
BeaconProcessorSend, DuplicateCache, GossipAggregatePackage, GossipAttestationPackage, Work,
Expand All @@ -20,7 +21,7 @@ use lighthouse_network::rpc::methods::{
};
use lighthouse_network::service::api_types::CustodyBackfillBatchId;
use lighthouse_network::{
Client, GossipTopic, MessageId, NetworkGlobals, PeerId, PubsubMessage,
Client, GossipTopic, MessageId, NetworkConfig, NetworkGlobals, PeerId, PubsubMessage,
rpc::{BlocksByRangeRequest, BlocksByRootRequest, LightClientBootstrapRequest, StatusMessage},
};
use rand::prelude::SliceRandom;
Expand All @@ -31,6 +32,10 @@ use task_executor::TaskExecutor;
use tokio::sync::mpsc::{self, error::TrySendError};
use tracing::{debug, error, instrument, trace, warn};
use types::*;
use {
beacon_chain::builder::Witness, beacon_processor::BeaconProcessorChannels,
slot_clock::ManualSlotClock, store::MemoryStore, tokio::sync::mpsc::UnboundedSender,
};

pub use sync_methods::ChainSegmentProcessId;
use types::data::FixedBlobSidecarList;
Expand Down Expand Up @@ -353,7 +358,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
) -> Result<(), Error<T::EthSpec>> {
let processor = self.clone();
let process_fn = move || {
processor.process_gossip_proposer_slashing(message_id, peer_id, *proposer_slashing)
processor.process_gossip_proposer_slashing(message_id, peer_id, *proposer_slashing);
};

self.try_send(BeaconWorkEvent {
Expand Down Expand Up @@ -420,7 +425,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
) -> Result<(), Error<T::EthSpec>> {
let processor = self.clone();
let process_fn = move || {
processor.process_gossip_attester_slashing(message_id, peer_id, *attester_slashing)
processor.process_gossip_attester_slashing(message_id, peer_id, *attester_slashing);
};

self.try_send(BeaconWorkEvent {
Expand Down Expand Up @@ -1260,17 +1265,9 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
}
}

#[cfg(test)]
use {
beacon_chain::builder::Witness, beacon_processor::BeaconProcessorChannels,
slot_clock::ManualSlotClock, store::MemoryStore, tokio::sync::mpsc::UnboundedSender,
};

#[cfg(test)]
pub(crate) type TestBeaconChainType<E> =
Witness<ManualSlotClock, E, MemoryStore<E>, MemoryStore<E>>;

#[cfg(test)]
impl<E: EthSpec> NetworkBeaconProcessor<TestBeaconChainType<E>> {
// Instantiates a mostly non-functional version of `Self` and returns the
// event receiver that would normally go to the beacon processor. This is
Expand Down Expand Up @@ -1302,4 +1299,22 @@ impl<E: EthSpec> NetworkBeaconProcessor<TestBeaconChainType<E>> {

(network_beacon_processor, beacon_processor_rx)
}

/// Constructs a mostly non-functional `NetworkBeaconProcessor` from a test harness,
/// suitable for directly calling gossip processing methods in tests.
pub fn null_from_harness(harness: &BeaconChainHarness<EphemeralHarnessType<E>>) -> Self {
let network_globals = NetworkGlobals::new_test_globals(
vec![],
Arc::new(NetworkConfig::default()),
harness.spec.clone(),
);

Self::null_for_testing(
Arc::new(network_globals),
mpsc::unbounded_channel().0,
harness.chain.clone(),
harness.runtime.task_executor.clone(),
)
.0
}
}
2 changes: 2 additions & 0 deletions testing/ef_tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ fork_choice = { workspace = true }
fs2 = { workspace = true }
hex = { workspace = true }
kzg = { workspace = true }
lighthouse_network = { workspace = true }
logging = { workspace = true }
milhouse = { workspace = true }
network = { workspace = true }
rayon = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
Expand Down
10 changes: 8 additions & 2 deletions testing/ef_tests/check_all_files_accessed.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,14 @@
"tests/.*/compute_challenge/.*",
# We don't need these manifest files at the moment.
"tests/.*/manifest.yaml",
# TODO: gossip condition tests not implemented yet
"tests/.*/.*/networking/.*",
# TODO: Remaining gossip validation topics not yet implemented
"tests/.*/.*/networking/gossip_beacon_block/.*",
"tests/.*/.*/networking/gossip_beacon_attestation/.*",
"tests/.*/.*/networking/gossip_beacon_aggregate_and_proof/.*",
"tests/.*/.*/networking/gossip_voluntary_exit/.*",
"tests/.*/.*/networking/gossip_bls_to_execution_change/.*",
"tests/.*/.*/networking/gossip_sync_committee_message/.*",
"tests/.*/.*/networking/gossip_sync_committee_contribution_and_proof/.*",
# TODO: fast confirmation rule not merged yet
"tests/.*/.*/fast_confirmation",
]
Expand Down
2 changes: 2 additions & 0 deletions testing/ef_tests/src/cases.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ mod fork_choice;
mod genesis_initialization;
mod genesis_validity;
mod get_custody_groups;
mod gossip_validation;
mod kzg_blob_to_kzg_commitment;
mod kzg_compute_blob_kzg_proof;
mod kzg_compute_cells;
Expand Down Expand Up @@ -57,6 +58,7 @@ pub use fork::ForkTest;
pub use genesis_initialization::*;
pub use genesis_validity::*;
pub use get_custody_groups::*;
pub use gossip_validation::*;
pub use kzg_blob_to_kzg_commitment::*;
pub use kzg_compute_blob_kzg_proof::*;
pub use kzg_compute_cells::*;
Expand Down
Loading
Loading