diff --git a/Cargo.lock b/Cargo.lock index 078f699f3c8..4821daf0bac 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2855,8 +2855,10 @@ dependencies = [ "fs2", "hex", "kzg", + "lighthouse_network", "logging", "milhouse", + "network", "rayon", "serde", "serde_json", diff --git a/beacon_node/network/src/lib.rs b/beacon_node/network/src/lib.rs index 2a7fedb53e9..dc45f53c705 100644 --- a/beacon_node/network/src/lib.rs +++ b/beacon_node/network/src/lib.rs @@ -11,6 +11,7 @@ mod subnet_service; mod sync; pub use lighthouse_network::NetworkConfig; +pub use network_beacon_processor::NetworkBeaconProcessor; pub use service::{ NetworkMessage, NetworkReceivers, NetworkSenders, NetworkService, ValidatorSubscriptionMessage, }; diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index d34668b1387..3caacb9d76a 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -174,6 +174,17 @@ impl FailedAtt { } } +/// `MessageAcceptance` doesn't implement clone so we do a manual match here. +/// TODO: remove this once `Clone` is available on this type: +/// https://github.com/libp2p/rust-libp2p/pull/6445 +fn clone_message_acceptance(a: &MessageAcceptance) -> MessageAcceptance { + match a { + MessageAcceptance::Accept => MessageAcceptance::Accept, + MessageAcceptance::Reject => MessageAcceptance::Reject, + MessageAcceptance::Ignore => MessageAcceptance::Ignore, + } +} + impl NetworkBeaconProcessor { /* Auxiliary functions */ @@ -2190,14 +2201,14 @@ impl NetworkBeaconProcessor { message_id: MessageId, peer_id: PeerId, proposer_slashing: ProposerSlashing, - ) { + ) -> MessageAcceptance { let validator_index = proposer_slashing.signed_header_1.message.proposer_index; - let slashing = match self + let (validation_result, verified_slashing_opt) = match self .chain .verify_proposer_slashing_for_gossip(proposer_slashing) { - Ok(ObservationOutcome::New(slashing)) => slashing, + Ok(ObservationOutcome::New(slashing)) => (MessageAcceptance::Accept, Some(slashing)), Ok(ObservationOutcome::AlreadyKnown) => { debug!( reason = "Already seen a proposer slashing for that validator", @@ -2205,44 +2216,54 @@ impl NetworkBeaconProcessor { peer = %peer_id, "Dropping proposer slashing" ); - self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); - return; + (MessageAcceptance::Ignore, None) } Err(e) => { - // This is likely a fault with the beacon chain and not necessarily a - // malicious message from the peer. debug!( validator_index, %peer_id, error = ?e, - "Dropping invalid proposer slashing" + "Dropping proposer slashing due to an error" ); - self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); - // Penalize peer slightly for invalids. - self.gossip_penalize_peer( - peer_id, - PeerAction::HighToleranceError, - "invalid_gossip_proposer_slashing", - ); - return; + if matches!(e, BeaconChainError::ProposerSlashingValidationError(_)) { + // Penalize peer slightly for invalids. + self.gossip_penalize_peer( + peer_id, + PeerAction::HighToleranceError, + "invalid_gossip_proposer_slashing", + ); + (MessageAcceptance::Reject, None) + } else { + // This is likely a fault with the beacon chain and not necessarily a + // malicious message from the peer. + (MessageAcceptance::Ignore, None) + } } }; - metrics::inc_counter(&metrics::BEACON_PROCESSOR_PROPOSER_SLASHING_VERIFIED_TOTAL); + self.propagate_validation_result( + message_id, + peer_id, + clone_message_acceptance(&validation_result), + ); - self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept); + if let Some(slashing) = verified_slashing_opt { + metrics::inc_counter(&metrics::BEACON_PROCESSOR_PROPOSER_SLASHING_VERIFIED_TOTAL); - // Register the slashing with any monitored validators. - self.chain - .validator_monitor - .read() - .register_gossip_proposer_slashing(slashing.as_inner()); + // Register the slashing with any monitored validators. + self.chain + .validator_monitor + .read() + .register_gossip_proposer_slashing(slashing.as_inner()); + + self.chain.import_proposer_slashing(slashing); + debug!("Successfully imported proposer slashing"); - self.chain.import_proposer_slashing(slashing); - debug!("Successfully imported proposer slashing"); + metrics::inc_counter(&metrics::BEACON_PROCESSOR_PROPOSER_SLASHING_IMPORTED_TOTAL); + } - metrics::inc_counter(&metrics::BEACON_PROCESSOR_PROPOSER_SLASHING_IMPORTED_TOTAL); + validation_result } pub fn process_gossip_attester_slashing( @@ -2250,51 +2271,64 @@ impl NetworkBeaconProcessor { message_id: MessageId, peer_id: PeerId, attester_slashing: AttesterSlashing, - ) { - let slashing = match self + ) -> MessageAcceptance { + let (validation_result, verified_slashing_opt) = match self .chain .verify_attester_slashing_for_gossip(attester_slashing) { - Ok(ObservationOutcome::New(slashing)) => slashing, + Ok(ObservationOutcome::New(slashing)) => (MessageAcceptance::Accept, Some(slashing)), Ok(ObservationOutcome::AlreadyKnown) => { debug!( reason = "Slashings already known for all slashed validators", peer = %peer_id, "Dropping attester slashing" ); - self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); - return; + (MessageAcceptance::Ignore, None) } Err(e) => { debug!( %peer_id, error = ?e, - "Dropping invalid attester slashing" + "Dropping attester slashing due to an error" ); - self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); - // Penalize peer slightly for invalids. - self.gossip_penalize_peer( - peer_id, - PeerAction::HighToleranceError, - "invalid_gossip_attester_slashing", - ); - return; + + if matches!(e, BeaconChainError::AttesterSlashingValidationError(_)) { + // Penalize peer slightly for invalids. + self.gossip_penalize_peer( + peer_id, + PeerAction::HighToleranceError, + "invalid_gossip_attester_slashing", + ); + (MessageAcceptance::Reject, None) + } else { + // This is likely a fault with the beacon chain and not necessarily a + // malicious message from the peer. + (MessageAcceptance::Ignore, None) + } } }; - metrics::inc_counter(&metrics::BEACON_PROCESSOR_ATTESTER_SLASHING_VERIFIED_TOTAL); + self.propagate_validation_result( + message_id, + peer_id, + clone_message_acceptance(&validation_result), + ); - self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept); + if let Some(slashing) = verified_slashing_opt { + metrics::inc_counter(&metrics::BEACON_PROCESSOR_ATTESTER_SLASHING_VERIFIED_TOTAL); - // Register the slashing with any monitored validators. - self.chain - .validator_monitor - .read() - .register_gossip_attester_slashing(slashing.as_inner().to_ref()); + // Register the slashing with any monitored validators. + self.chain + .validator_monitor + .read() + .register_gossip_attester_slashing(slashing.as_inner().to_ref()); + + self.chain.import_attester_slashing(slashing); + debug!("Successfully imported attester slashing"); + metrics::inc_counter(&metrics::BEACON_PROCESSOR_ATTESTER_SLASHING_IMPORTED_TOTAL); + } - self.chain.import_attester_slashing(slashing); - debug!("Successfully imported attester slashing"); - metrics::inc_counter(&metrics::BEACON_PROCESSOR_ATTESTER_SLASHING_IMPORTED_TOTAL); + validation_result } pub fn process_gossip_bls_to_execution_change( diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index 7817feb0bdf..d5aacee621c 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -7,6 +7,7 @@ use beacon_chain::data_column_verification::{GossipDataColumnError, observe_goss use beacon_chain::fetch_blobs::{ EngineGetBlobsOutput, FetchEngineBlobError, fetch_and_process_engine_blobs, }; +use beacon_chain::test_utils::{BeaconChainHarness, EphemeralHarnessType}; use beacon_chain::{AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes, BlockError}; use beacon_processor::{ BeaconProcessorSend, DuplicateCache, GossipAggregatePackage, GossipAttestationPackage, Work, @@ -20,7 +21,7 @@ use lighthouse_network::rpc::methods::{ }; use lighthouse_network::service::api_types::CustodyBackfillBatchId; use lighthouse_network::{ - Client, GossipTopic, MessageId, NetworkGlobals, PeerId, PubsubMessage, + Client, GossipTopic, MessageId, NetworkConfig, NetworkGlobals, PeerId, PubsubMessage, rpc::{BlocksByRangeRequest, BlocksByRootRequest, LightClientBootstrapRequest, StatusMessage}, }; use rand::prelude::SliceRandom; @@ -31,6 +32,10 @@ use task_executor::TaskExecutor; use tokio::sync::mpsc::{self, error::TrySendError}; use tracing::{debug, error, instrument, trace, warn}; use types::*; +use { + beacon_chain::builder::Witness, beacon_processor::BeaconProcessorChannels, + slot_clock::ManualSlotClock, store::MemoryStore, tokio::sync::mpsc::UnboundedSender, +}; pub use sync_methods::ChainSegmentProcessId; use types::data::FixedBlobSidecarList; @@ -353,7 +358,7 @@ impl NetworkBeaconProcessor { ) -> Result<(), Error> { let processor = self.clone(); let process_fn = move || { - processor.process_gossip_proposer_slashing(message_id, peer_id, *proposer_slashing) + processor.process_gossip_proposer_slashing(message_id, peer_id, *proposer_slashing); }; self.try_send(BeaconWorkEvent { @@ -420,7 +425,7 @@ impl NetworkBeaconProcessor { ) -> Result<(), Error> { let processor = self.clone(); let process_fn = move || { - processor.process_gossip_attester_slashing(message_id, peer_id, *attester_slashing) + processor.process_gossip_attester_slashing(message_id, peer_id, *attester_slashing); }; self.try_send(BeaconWorkEvent { @@ -1260,17 +1265,9 @@ impl NetworkBeaconProcessor { } } -#[cfg(test)] -use { - beacon_chain::builder::Witness, beacon_processor::BeaconProcessorChannels, - slot_clock::ManualSlotClock, store::MemoryStore, tokio::sync::mpsc::UnboundedSender, -}; - -#[cfg(test)] pub(crate) type TestBeaconChainType = Witness, MemoryStore>; -#[cfg(test)] impl NetworkBeaconProcessor> { // Instantiates a mostly non-functional version of `Self` and returns the // event receiver that would normally go to the beacon processor. This is @@ -1302,4 +1299,22 @@ impl NetworkBeaconProcessor> { (network_beacon_processor, beacon_processor_rx) } + + /// Constructs a mostly non-functional `NetworkBeaconProcessor` from a test harness, + /// suitable for directly calling gossip processing methods in tests. + pub fn null_from_harness(harness: &BeaconChainHarness>) -> Self { + let network_globals = NetworkGlobals::new_test_globals( + vec![], + Arc::new(NetworkConfig::default()), + harness.spec.clone(), + ); + + Self::null_for_testing( + Arc::new(network_globals), + mpsc::unbounded_channel().0, + harness.chain.clone(), + harness.runtime.task_executor.clone(), + ) + .0 + } } diff --git a/testing/ef_tests/Cargo.toml b/testing/ef_tests/Cargo.toml index 9d09c3dfe68..ca472b7187b 100644 --- a/testing/ef_tests/Cargo.toml +++ b/testing/ef_tests/Cargo.toml @@ -26,8 +26,10 @@ fork_choice = { workspace = true } fs2 = { workspace = true } hex = { workspace = true } kzg = { workspace = true } +lighthouse_network = { workspace = true } logging = { workspace = true } milhouse = { workspace = true } +network = { workspace = true } rayon = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } diff --git a/testing/ef_tests/check_all_files_accessed.py b/testing/ef_tests/check_all_files_accessed.py index 53fb626e7e6..0a7fd196cf6 100755 --- a/testing/ef_tests/check_all_files_accessed.py +++ b/testing/ef_tests/check_all_files_accessed.py @@ -75,8 +75,14 @@ "tests/.*/compute_challenge/.*", # We don't need these manifest files at the moment. "tests/.*/manifest.yaml", - # TODO: gossip condition tests not implemented yet - "tests/.*/.*/networking/.*", + # TODO: Remaining gossip validation topics not yet implemented + "tests/.*/.*/networking/gossip_beacon_block/.*", + "tests/.*/.*/networking/gossip_beacon_attestation/.*", + "tests/.*/.*/networking/gossip_beacon_aggregate_and_proof/.*", + "tests/.*/.*/networking/gossip_voluntary_exit/.*", + "tests/.*/.*/networking/gossip_bls_to_execution_change/.*", + "tests/.*/.*/networking/gossip_sync_committee_message/.*", + "tests/.*/.*/networking/gossip_sync_committee_contribution_and_proof/.*", # TODO: fast confirmation rule not merged yet "tests/.*/.*/fast_confirmation", ] diff --git a/testing/ef_tests/src/cases.rs b/testing/ef_tests/src/cases.rs index b2e02763539..b2386f6fa50 100644 --- a/testing/ef_tests/src/cases.rs +++ b/testing/ef_tests/src/cases.rs @@ -20,6 +20,7 @@ mod fork_choice; mod genesis_initialization; mod genesis_validity; mod get_custody_groups; +mod gossip_validation; mod kzg_blob_to_kzg_commitment; mod kzg_compute_blob_kzg_proof; mod kzg_compute_cells; @@ -57,6 +58,7 @@ pub use fork::ForkTest; pub use genesis_initialization::*; pub use genesis_validity::*; pub use get_custody_groups::*; +pub use gossip_validation::*; pub use kzg_blob_to_kzg_commitment::*; pub use kzg_compute_blob_kzg_proof::*; pub use kzg_compute_cells::*; diff --git a/testing/ef_tests/src/cases/gossip_validation.rs b/testing/ef_tests/src/cases/gossip_validation.rs new file mode 100644 index 00000000000..3dbbcae5a72 --- /dev/null +++ b/testing/ef_tests/src/cases/gossip_validation.rs @@ -0,0 +1,206 @@ +use super::*; +use crate::bls_setting::BlsSetting; +use crate::decode::{ssz_decode_file, ssz_decode_state, yaml_decode_file}; +use crate::type_name::TypeName; +use beacon_chain::test_utils::{BeaconChainHarness, EphemeralHarnessType}; +use lighthouse_network::{MessageAcceptance, MessageId, PeerId}; +use network::NetworkBeaconProcessor; +use serde::Deserialize; +use std::path::{Path, PathBuf}; +use std::sync::Arc; +use types::{AttesterSlashing, BeaconState, EthSpec, ForkName, ProposerSlashing}; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize)] +#[serde(rename_all = "snake_case")] +enum ExpectedOutcome { + Valid, + Ignore, + Reject, +} + +impl PartialEq for ExpectedOutcome { + fn eq(&self, other: &MessageAcceptance) -> bool { + matches!( + (self, other), + (Self::Valid, MessageAcceptance::Accept) + | (Self::Ignore, MessageAcceptance::Ignore) + | (Self::Reject, MessageAcceptance::Reject) + ) + } +} + +#[derive(Debug, Clone, Deserialize)] +struct Meta { + topic: Topic, + #[serde(default)] + messages: Vec, + #[serde(default)] + bls_setting: Option, +} + +#[derive(Debug, Clone, Deserialize)] +#[serde(deny_unknown_fields)] +struct MessageMeta { + message: String, + expected: ExpectedOutcome, + #[serde(default)] + reason: Option, + #[serde(default)] + #[allow(dead_code)] + subnet_id: Option, + #[serde(default)] + #[allow(dead_code)] + offset_ms: Option, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize)] +#[serde(rename_all = "snake_case")] +enum Topic { + ProposerSlashing, + AttesterSlashing, + // TODO: add support for these topics + // VoluntaryExit, + // BlsToExecutionChange, + // SyncCommittee, + // SyncCommitteeContributionAndProof, + // BeaconBlock, + // BeaconAttestation, + // BeaconAggregateAndProof, +} + +#[derive(Debug)] +pub struct GossipValidation { + path: PathBuf, + meta: Meta, + state: BeaconState, +} + +impl LoadCase for GossipValidation { + fn load_from_dir(path: &Path, fork_name: ForkName) -> Result { + let meta: Meta = yaml_decode_file(&path.join("meta.yaml"))?; + let spec = &testing_spec::(fork_name); + let state = ssz_decode_state(&path.join("state.ssz_snappy"), spec)?; + + Ok(Self { + path: path.to_path_buf(), + meta, + state, + }) + } +} + +impl Case for GossipValidation { + fn description(&self) -> String { + self.path + .iter() + .next_back() + .map(|s| s.to_string_lossy().to_string()) + .unwrap_or_default() + } + + fn result(&self, _case_index: usize, fork_name: ForkName) -> Result<(), Error> { + if let Some(bls_setting) = self.meta.bls_setting { + bls_setting.check()?; + } + + let spec = testing_spec::(fork_name); + let tester = GossipTester::new(self, spec)?; + + for message_meta in &self.meta.messages { + let actual = + tester.validate_message(&self.path, &self.meta.topic, message_meta, fork_name)?; + + if message_meta.expected != actual { + return Err(Error::NotEqual(format!( + "{}: expected {:?}, got {:?}{}", + self.path.display(), + message_meta.expected, + actual, + message_meta + .reason + .as_ref() + .map(|r| format!(" ({r})")) + .unwrap_or_default() + ))); + } + } + + Ok(()) + } +} + +struct GossipTester { + network_beacon_processor: Arc>>, +} + +impl GossipTester { + fn new(case: &GossipValidation, spec: ChainSpec) -> Result { + let genesis_time = case.state.genesis_time(); + let spec = Arc::new(spec); + + let harness = BeaconChainHarness::>::builder(E::default()) + .spec(spec.clone()) + .keypairs(vec![]) + .genesis_state_ephemeral_store(case.state.clone()) + .mock_execution_layer() + .recalculate_fork_times_with_genesis(genesis_time) + .mock_execution_layer_all_payloads_valid() + .build(); + + let network_beacon_processor = NetworkBeaconProcessor::null_from_harness(&harness); + + Ok(Self { + network_beacon_processor: Arc::new(network_beacon_processor), + }) + } + + fn validate_message( + &self, + path: &Path, + topic: &Topic, + message_meta: &MessageMeta, + fork_name: ForkName, + ) -> Result { + match topic { + Topic::ProposerSlashing => self.validate_proposer_slashing(path, message_meta), + Topic::AttesterSlashing => { + self.validate_attester_slashing(path, message_meta, fork_name) + } + } + } + + fn validate_proposer_slashing( + &self, + path: &Path, + message_meta: &MessageMeta, + ) -> Result { + let slashing: ProposerSlashing = + ssz_decode_file(&path.join(format!("{}.ssz_snappy", message_meta.message)))?; + + let message_id = MessageId::new(&[]); + let peer_id = PeerId::random(); + Ok(self + .network_beacon_processor + .process_gossip_proposer_slashing(message_id, peer_id, slashing)) + } + + fn validate_attester_slashing( + &self, + path: &Path, + message_meta: &MessageMeta, + fork_name: ForkName, + ) -> Result { + let ssz_path = path.join(format!("{}.ssz_snappy", message_meta.message)); + let slashing: AttesterSlashing = if fork_name.electra_enabled() { + ssz_decode_file(&ssz_path).map(AttesterSlashing::Electra)? + } else { + ssz_decode_file(&ssz_path).map(AttesterSlashing::Base)? + }; + + let message_id = MessageId::new(&[]); + let peer_id = PeerId::random(); + Ok(self + .network_beacon_processor + .process_gossip_attester_slashing(message_id, peer_id, slashing)) + } +} diff --git a/testing/ef_tests/src/handler.rs b/testing/ef_tests/src/handler.rs index e380f51c0af..cc7a8b0896b 100644 --- a/testing/ef_tests/src/handler.rs +++ b/testing/ef_tests/src/handler.rs @@ -980,6 +980,36 @@ impl Handler for ComputeColumnsForCustodyGroupHandler } } +pub struct GossipValidationHandler { + handler_name: &'static str, + _phantom: PhantomData, +} + +impl GossipValidationHandler { + pub const fn new(handler_name: &'static str) -> Self { + Self { + handler_name, + _phantom: PhantomData, + } + } +} + +impl Handler for GossipValidationHandler { + type Case = cases::GossipValidation; + + fn config_name() -> &'static str { + E::name() + } + + fn runner_name() -> &'static str { + "networking" + } + + fn handler_name(&self) -> String { + self.handler_name.into() + } +} + #[derive(Educe)] #[educe(Default)] pub struct KZGComputeCellsHandler(PhantomData); diff --git a/testing/ef_tests/tests/tests.rs b/testing/ef_tests/tests/tests.rs index ca383efdb04..6c629b9e55f 100644 --- a/testing/ef_tests/tests/tests.rs +++ b/testing/ef_tests/tests/tests.rs @@ -1183,3 +1183,15 @@ fn compute_columns_for_custody_group() { ComputeColumnsForCustodyGroupHandler::::default().run(); ComputeColumnsForCustodyGroupHandler::::default().run(); } + +#[test] +fn gossip_proposer_slashing() { + GossipValidationHandler::::new("gossip_proposer_slashing").run(); + GossipValidationHandler::::new("gossip_proposer_slashing").run(); +} + +#[test] +fn gossip_attester_slashing() { + GossipValidationHandler::::new("gossip_attester_slashing").run(); + GossipValidationHandler::::new("gossip_attester_slashing").run(); +}