diff --git a/beacon_node/http_api/src/beacon/execution_payload_envelope.rs b/beacon_node/http_api/src/beacon/execution_payload_envelope.rs index 65e1a838402..41e2cd07be6 100644 --- a/beacon_node/http_api/src/beacon/execution_payload_envelope.rs +++ b/beacon_node/http_api/src/beacon/execution_payload_envelope.rs @@ -1,5 +1,5 @@ use crate::block_id::BlockId; -use crate::publish_blocks::publish_column_sidecars; +use crate::publish_blocks::{check_slashable, publish_column_sidecars}; use crate::task_spawner::{Priority, TaskSpawner}; use crate::utils::{ChainFilter, EthV1Filter, NetworkTxFilter, ResponseFilter, TaskSpawnerFilter}; use crate::version::{ @@ -7,18 +7,20 @@ use crate::version::{ execution_optimistic_finalized_beacon_response, }; use beacon_chain::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn}; -use beacon_chain::{BeaconChain, BeaconChainTypes, NotifyExecutionLayer}; +use beacon_chain::payload_envelope_verification::EnvelopeError; +use beacon_chain::{BeaconChain, BeaconChainError, BeaconChainTypes, NotifyExecutionLayer}; use bytes::Bytes; -use eth2::types as api_types; -use eth2::{CONTENT_TYPE_HEADER, SSZ_CONTENT_TYPE_HEADER}; +use eth2::CONSENSUS_VERSION_HEADER; +use eth2::types::{self as api_types, BroadcastValidation}; use lighthouse_network::PubsubMessage; use network::NetworkMessage; use ssz::{Decode, Encode}; use std::future::Future; use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering}; use tokio::sync::mpsc::UnboundedSender; use tracing::{debug, error, info, warn}; -use types::{BlockImportSource, EthSpec, SignedExecutionPayloadEnvelope}; +use types::{BlockImportSource, EthSpec, ForkName, SignedExecutionPayloadEnvelope}; use warp::{ Filter, Rejection, Reply, hyper::{Body, Response}, @@ -34,27 +36,36 @@ pub(crate) fn post_beacon_execution_payload_envelope_ssz( eth_v1 .and(warp::path("beacon")) .and(warp::path("execution_payload_envelope")) + .and(warp::query::()) .and(warp::path::end()) - .and(warp::header::exact( - CONTENT_TYPE_HEADER, - SSZ_CONTENT_TYPE_HEADER, - )) .and(warp::body::bytes()) + .and(warp::header::header::(CONSENSUS_VERSION_HEADER)) .and(task_spawner_filter) .and(chain_filter) .and(network_tx_filter) .then( - |body_bytes: Bytes, + |validation_level: api_types::BroadcastValidationQuery, + body: Bytes, + consensus_version: ForkName, task_spawner: TaskSpawner, chain: Arc>, network_tx: UnboundedSender>| { task_spawner.spawn_async_with_rejection(Priority::P0, async move { let envelope = - SignedExecutionPayloadEnvelope::::from_ssz_bytes(&body_bytes) + SignedExecutionPayloadEnvelope::::from_ssz_bytes(&body) .map_err(|e| { - warp_utils::reject::custom_bad_request(format!("invalid SSZ: {e:?}")) - })?; - publish_execution_payload_envelope(envelope, chain, &network_tx).await + warp_utils::reject::custom_bad_request(format!( + "invalid SSZ: {e:?}" + )) + })?; + publish_execution_payload_envelope( + envelope, + validation_level.broadcast_validation, + consensus_version, + chain, + &network_tx, + ) + .await }) }, ) @@ -71,28 +82,42 @@ pub(crate) fn post_beacon_execution_payload_envelope( eth_v1 .and(warp::path("beacon")) .and(warp::path("execution_payload_envelope")) + .and(warp::query::()) .and(warp::path::end()) .and(warp::body::json()) - .and(task_spawner_filter.clone()) - .and(chain_filter.clone()) - .and(network_tx_filter.clone()) + .and(warp::header::header::(CONSENSUS_VERSION_HEADER)) + .and(task_spawner_filter) + .and(chain_filter) + .and(network_tx_filter) .then( - |envelope: SignedExecutionPayloadEnvelope, + |validation_level: api_types::BroadcastValidationQuery, + envelope: SignedExecutionPayloadEnvelope, + consensus_version: ForkName, task_spawner: TaskSpawner, chain: Arc>, network_tx: UnboundedSender>| { task_spawner.spawn_async_with_rejection(Priority::P0, async move { - publish_execution_payload_envelope(envelope, chain, &network_tx).await + publish_execution_payload_envelope( + envelope, + validation_level.broadcast_validation, + consensus_version, + chain, + &network_tx, + ) + .await }) }, ) .boxed() } + /// Publishes a signed execution payload envelope to the network. Implements -/// `POST /eth/v1/beacon/execution_payload_envelope` per the in-flight beacon-APIs PR +/// `POST /eth/v1/beacon/execution_payload_envelope` per beacon-APIs PR /// . pub async fn publish_execution_payload_envelope( envelope: SignedExecutionPayloadEnvelope, + validation_level: BroadcastValidation, + consensus_version: ForkName, chain: Arc>, network_tx: &UnboundedSender>, ) -> Result, Rejection> { @@ -109,9 +134,33 @@ pub async fn publish_execution_payload_envelope( %slot, %beacon_block_root, builder_index = envelope.message.builder_index, + ?consensus_version, + ?validation_level, "Publishing signed execution payload envelope to network" ); + // Pre-load the beacon block for the equivocation check below; needs `(slot, proposer_index)`. + let beacon_block_for_eq_check = + if validation_level == BroadcastValidation::ConsensusAndEquivocation { + let block = chain + .get_block(&beacon_block_root) + .await + .map_err(|e| { + warp_utils::reject::custom_bad_request(format!( + "failed to load beacon block for equivocation check: {e:?}" + )) + })? + .ok_or_else(|| { + warp_utils::reject::custom_bad_request(format!( + "beacon block {beacon_block_root} not known, \ + cannot verify envelope equivocation" + )) + })?; + Some(block) + } else { + None + }; + let blobs_and_proofs = chain.pending_payload_envelopes.write().take_blobs(slot); // Spawn the column-build task (CPU-bound KZG cell-and-proof computation) before @@ -142,16 +191,46 @@ pub async fn publish_execution_payload_envelope( let network_tx_clone = network_tx.clone(); let envelope_for_gossip = gossip_verified.signed_envelope.as_ref().clone(); - let publish_fn = || { + let publish_envelope_p2p = || -> Result<(), EnvelopeError> { crate::utils::publish_pubsub_message( &network_tx_clone, - PubsubMessage::ExecutionPayload(Box::new(envelope_for_gossip)), + PubsubMessage::ExecutionPayload(Box::new(envelope_for_gossip.clone())), ) - .map_err(|_| { - beacon_chain::payload_envelope_verification::EnvelopeError::BeaconChainError(Arc::new( - beacon_chain::BeaconChainError::UnableToPublish, - )) - }) + .map_err(|_| EnvelopeError::BeaconChainError(Arc::new(BeaconChainError::UnableToPublish))) + }; + + // Tracks whether broadcast occurred so post-import errors map to 202 vs 400. + let publish_fn_completed = Arc::new(AtomicBool::new(false)); + + // For `gossip` level, broadcast before consensus-verify. + if validation_level == BroadcastValidation::Gossip { + publish_envelope_p2p().map_err(|_| { + warp_utils::reject::custom_server_error("unable to publish to network channel".into()) + })?; + publish_fn_completed.store(true, Ordering::SeqCst); + } + + let publish_fn = || -> Result<(), EnvelopeError> { + match validation_level { + BroadcastValidation::Gossip => Ok(()), + BroadcastValidation::Consensus => { + publish_envelope_p2p()?; + publish_fn_completed.store(true, Ordering::SeqCst); + Ok(()) + } + BroadcastValidation::ConsensusAndEquivocation => { + let block = beacon_block_for_eq_check.as_ref().ok_or_else(|| { + EnvelopeError::InternalError( + "beacon block was not pre-loaded for ConsensusAndEquivocation".into(), + ) + })?; + check_slashable(&chain, beacon_block_root, block) + .map_err(EnvelopeError::BlockError)?; + publish_envelope_p2p()?; + publish_fn_completed.store(true, Ordering::SeqCst); + Ok(()) + } + } }; let import_result = chain @@ -165,10 +244,27 @@ pub async fn publish_execution_payload_envelope( .await; if let Err(e) = import_result { - warn!(%slot, error = ?e, "Failed to import execution payload envelope"); - return Err(warp_utils::reject::custom_server_error(format!( - "envelope import failed: {e}" - ))); + return match &e { + EnvelopeError::BeaconChainError(chain_error) + if matches!(chain_error.as_ref(), BeaconChainError::UnableToPublish) => + { + Err(warp_utils::reject::custom_server_error( + "unable to publish to network channel".into(), + )) + } + _ if publish_fn_completed.load(Ordering::SeqCst) => { + warn!(%slot, error = ?e, "Failed to import execution payload envelope after broadcast"); + Err(warp_utils::reject::broadcast_without_import(format!( + "envelope import failed: {e}" + ))) + } + _ => { + warn!(%slot, error = ?e, "Rejecting execution payload envelope before broadcast"); + Err(warp_utils::reject::custom_bad_request(format!( + "envelope rejected: {e}" + ))) + } + }; } // From here on the envelope is on the wire. `take_blobs` already consumed the cache diff --git a/beacon_node/http_api/src/publish_blocks.rs b/beacon_node/http_api/src/publish_blocks.rs index 644ade956af..b1119de65e1 100644 --- a/beacon_node/http_api/src/publish_blocks.rs +++ b/beacon_node/http_api/src/publish_blocks.rs @@ -821,7 +821,7 @@ fn late_block_logging>( } /// Check if any of the blobs or the block are slashable. Returns `BlockError::Slashable` if so. -fn check_slashable( +pub(crate) fn check_slashable( chain_clone: &BeaconChain, block_root: Hash256, block_clone: &SignedBeaconBlock>, diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index a7fe34593a7..d13a081fc29 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -4389,7 +4389,7 @@ impl ApiTester { let signed_envelope = self.sign_envelope(envelope, &sk, epoch, &fork, genesis_validators_root); self.client - .post_beacon_execution_payload_envelope(&signed_envelope, fork_name) + .post_beacon_execution_payload_envelope(&signed_envelope, fork_name, None) .await .unwrap(); @@ -4450,7 +4450,7 @@ impl ApiTester { let signed_envelope = self.sign_envelope(envelope, &sk, epoch, &fork, genesis_validators_root); self.client - .post_beacon_execution_payload_envelope_ssz(&signed_envelope, fork_name) + .post_beacon_execution_payload_envelope_ssz(&signed_envelope, fork_name, None) .await .unwrap(); @@ -4833,6 +4833,199 @@ impl ApiTester { self } + fn advance_to_gloas_slot(&self) -> Option<(Slot, Epoch, ForkName)> { + for _ in 0..E::slots_per_epoch() * 3 { + let slot = self.chain.slot().unwrap(); + let fork_name = self.chain.spec.fork_name_at_slot::(slot); + if fork_name.gloas_enabled() { + return Some((slot, self.chain.epoch().unwrap(), fork_name)); + } + self.chain.slot_clock.set_slot(slot.as_u64() + 1); + } + None + } + + async fn build_and_post_block_for_envelope( + &self, + slot: Slot, + epoch: Epoch, + fork: &Fork, + genesis_validators_root: Hash256, + ) -> (SecretKey, u64, ExecutionPayloadEnvelope) { + let (sk, randao_reveal) = self + .proposer_setup(slot, epoch, fork, genesis_validators_root) + .await; + + let (response, _metadata) = self + .client + .get_validator_blocks_v4::(slot, &randao_reveal, None, None, None, None) + .await + .unwrap(); + let block = response.data; + let proposer_index = block.proposer_index(); + + let signed_block = block.sign(&sk, fork, genesis_validators_root, &self.chain.spec); + let signed_block_request = PublishBlockRequest::try_from(Arc::new(signed_block)).unwrap(); + self.client + .post_beacon_blocks_v2(&signed_block_request, None) + .await + .unwrap(); + + let envelope = self + .client + .get_validator_execution_payload_envelope::(slot) + .await + .unwrap() + .data; + + (sk, proposer_index, envelope) + } + + pub async fn test_envelope_post_consensus_invalid_returns_400_no_broadcast(mut self) -> Self { + if !self.chain.spec.is_gloas_scheduled() { + return self; + } + + let fork = self.chain.canonical_head.cached_head().head_fork(); + let genesis_validators_root = self.chain.genesis_validators_root; + + let Some((slot, epoch, fork_name)) = self.advance_to_gloas_slot() else { + return self; + }; + + let (sk, _proposer_index, mut envelope) = self + .build_and_post_block_for_envelope(slot, epoch, &fork, genesis_validators_root) + .await; + + // Set `gas_limit` to a value that cannot match the committed bid (consensus-only check). + envelope.payload.gas_limit = envelope.payload.gas_limit.saturating_add(1); + let signed_envelope = + self.sign_envelope(envelope, &sk, epoch, &fork, genesis_validators_root); + + while self.network_rx.network_recv.recv().now_or_never().is_some() {} + + let result = self + .client + .post_beacon_execution_payload_envelope( + &signed_envelope, + fork_name, + Some(BroadcastValidation::Consensus), + ) + .await; + + let err = result.expect_err("expected error on consensus-invalid envelope POST"); + assert_eq!(err.status(), Some(StatusCode::BAD_REQUEST)); + assert!( + self.network_rx.network_recv.recv().now_or_never().is_none(), + "envelope must not be broadcast when consensus validation fails" + ); + + self + } + + pub async fn test_envelope_post_gossip_partial_pass_returns_202(mut self) -> Self { + if !self.chain.spec.is_gloas_scheduled() { + return self; + } + + let fork = self.chain.canonical_head.cached_head().head_fork(); + let genesis_validators_root = self.chain.genesis_validators_root; + + let Some((slot, epoch, fork_name)) = self.advance_to_gloas_slot() else { + return self; + }; + + let (sk, _proposer_index, mut envelope) = self + .build_and_post_block_for_envelope(slot, epoch, &fork, genesis_validators_root) + .await; + + // Set `gas_limit` to a value that cannot match the committed bid (consensus-only check). + envelope.payload.gas_limit = envelope.payload.gas_limit.saturating_add(1); + let signed_envelope = + self.sign_envelope(envelope, &sk, epoch, &fork, genesis_validators_root); + + while self.network_rx.network_recv.recv().now_or_never().is_some() {} + + // The eth2 client maps any 2xx (including 202) to `Ok`, which would erase the + // distinction we want to verify. Issue the request directly to assert the raw + // status code. + let url = self + .client + .post_beacon_execution_payload_envelope_path(Some(BroadcastValidation::Gossip)) + .unwrap(); + let response = reqwest::Client::new() + .post(url) + .header("Content-Type", "application/json") + .header(eth2::CONSENSUS_VERSION_HEADER, fork_name.to_string()) + .json(&signed_envelope) + .send() + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::ACCEPTED); + assert!( + self.network_rx.network_recv.recv().now_or_never().is_some(), + "envelope must be broadcast at gossip level even when consensus fails" + ); + + self + } + + pub async fn test_envelope_post_equivocation_returns_400(mut self) -> Self { + if !self.chain.spec.is_gloas_scheduled() { + return self; + } + + let fork = self.chain.canonical_head.cached_head().head_fork(); + let genesis_validators_root = self.chain.genesis_validators_root; + + let Some((slot, epoch, fork_name)) = self.advance_to_gloas_slot() else { + return self; + }; + + let (sk, proposer_index, envelope) = self + .build_and_post_block_for_envelope(slot, epoch, &fork, genesis_validators_root) + .await; + let signed_envelope = + self.sign_envelope(envelope, &sk, epoch, &fork, genesis_validators_root); + + // Simulate an equivocation: a different block_root observed at the same + // (slot, proposer). `is_slashable` will then return true for the real block_root. + self.chain + .observed_slashable + .write() + .observe_slashable(slot, proposer_index, Hash256::repeat_byte(0xee)) + .unwrap(); + + while self.network_rx.network_recv.recv().now_or_never().is_some() {} + + let result = self + .client + .post_beacon_execution_payload_envelope( + &signed_envelope, + fork_name, + Some(BroadcastValidation::ConsensusAndEquivocation), + ) + .await; + + let err = result.expect_err("expected 400 on equivocating envelope"); + assert_eq!(err.status(), Some(StatusCode::BAD_REQUEST)); + match err { + ServerMessage(msg) => assert!( + msg.message.contains("Slashable"), + "expected error to mention Slashable, got: {}", + msg.message, + ), + other => panic!("expected ServerMessage, got {other:?}"), + } + assert!( + self.network_rx.network_recv.recv().now_or_never().is_none(), + "envelope must not be broadcast when equivocation is detected" + ); + + self + } + /// Regression test: publishing an envelope via the HTTP API must import it locally so /// that `produce_payload_attestation_data` returns `payload_present = true`. Without /// local import, the `envelope_times_cache` is never populated and PTC voters on the @@ -4887,7 +5080,7 @@ impl ApiTester { let signed_envelope = self.sign_envelope(envelope, &sk, epoch, &fork, genesis_validators_root); self.client - .post_beacon_execution_payload_envelope(&signed_envelope, fork_name) + .post_beacon_execution_payload_envelope(&signed_envelope, fork_name, None) .await .unwrap(); @@ -8640,6 +8833,39 @@ async fn payload_attestation_present_after_envelope_publish() { .await; } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn envelope_post_consensus_invalid_returns_400_no_broadcast() { + if !fork_name_from_env().is_some_and(|f| f.gloas_enabled()) { + return; + } + ApiTester::new_with_hard_forks() + .await + .test_envelope_post_consensus_invalid_returns_400_no_broadcast() + .await; +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn envelope_post_gossip_partial_pass_returns_202() { + if !fork_name_from_env().is_some_and(|f| f.gloas_enabled()) { + return; + } + ApiTester::new_with_hard_forks() + .await + .test_envelope_post_gossip_partial_pass_returns_202() + .await; +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn envelope_post_equivocation_returns_400() { + if !fork_name_from_env().is_some_and(|f| f.gloas_enabled()) { + return; + } + ApiTester::new_with_hard_forks() + .await + .test_envelope_post_equivocation_returns_400() + .await; +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn post_beacon_pool_payload_attestations_valid() { if !fork_name_from_env().is_some_and(|f| f.gloas_enabled()) { diff --git a/common/eth2/src/lib.rs b/common/eth2/src/lib.rs index e9fb44209ba..cd98af4ea65 100644 --- a/common/eth2/src/lib.rs +++ b/common/eth2/src/lib.rs @@ -2790,21 +2790,33 @@ impl BeaconNodeHttpClient { ExecutionPayloadEnvelope::from_ssz_bytes(&response_bytes).map_err(Error::InvalidSsz) } - /// `POST v1/beacon/execution_payload_envelope` - pub async fn post_beacon_execution_payload_envelope( + pub fn post_beacon_execution_payload_envelope_path( &self, - envelope: &SignedExecutionPayloadEnvelope, - fork_name: ForkName, - ) -> Result<(), Error> { + validation_level: Option, + ) -> Result { let mut path = self.eth_path(V1)?; - path.path_segments_mut() .map_err(|()| Error::InvalidUrl(self.server.clone()))? .push("beacon") .push("execution_payload_envelope"); + if let Some(validation_level) = validation_level { + path.query_pairs_mut() + .append_pair("broadcast_validation", &validation_level.to_string()); + } + + Ok(path) + } + + /// `POST v1/beacon/execution_payload_envelope` + pub async fn post_beacon_execution_payload_envelope( + &self, + envelope: &SignedExecutionPayloadEnvelope, + fork_name: ForkName, + validation_level: Option, + ) -> Result<(), Error> { self.post_generic_with_consensus_version( - path, + self.post_beacon_execution_payload_envelope_path(validation_level)?, envelope, Some(self.timeouts.proposal), fork_name, @@ -2819,16 +2831,10 @@ impl BeaconNodeHttpClient { &self, envelope: &SignedExecutionPayloadEnvelope, fork_name: ForkName, + validation_level: Option, ) -> Result<(), Error> { - let mut path = self.eth_path(V1)?; - - path.path_segments_mut() - .map_err(|()| Error::InvalidUrl(self.server.clone()))? - .push("beacon") - .push("execution_payload_envelope"); - self.post_generic_with_consensus_version_and_ssz_body( - path, + self.post_beacon_execution_payload_envelope_path(validation_level)?, envelope.as_ssz_bytes(), Some(self.timeouts.proposal), fork_name, diff --git a/validator_client/validator_services/src/block_service.rs b/validator_client/validator_services/src/block_service.rs index 1dd1878f4c3..6b32800f016 100644 --- a/validator_client/validator_services/src/block_service.rs +++ b/validator_client/validator_services/src/block_service.rs @@ -702,7 +702,11 @@ impl BlockService { let signed_envelope = signed_envelope.clone(); async move { beacon_node - .post_beacon_execution_payload_envelope_ssz(&signed_envelope, fork_name) + .post_beacon_execution_payload_envelope_ssz( + &signed_envelope, + fork_name, + None, + ) .await .map_err(|e| { BlockError::Recoverable(format!(