Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 127 additions & 31 deletions beacon_node/http_api/src/beacon/execution_payload_envelope.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,26 @@
use crate::block_id::BlockId;
use crate::publish_blocks::publish_column_sidecars;
use crate::publish_blocks::{check_slashable, publish_column_sidecars};
use crate::task_spawner::{Priority, TaskSpawner};
use crate::utils::{ChainFilter, EthV1Filter, NetworkTxFilter, ResponseFilter, TaskSpawnerFilter};
use crate::version::{
ResponseIncludesVersion, add_consensus_version_header, add_ssz_content_type_header,
execution_optimistic_finalized_beacon_response,
};
use beacon_chain::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn};
use beacon_chain::{BeaconChain, BeaconChainTypes, NotifyExecutionLayer};
use beacon_chain::payload_envelope_verification::EnvelopeError;
use beacon_chain::{BeaconChain, BeaconChainError, BeaconChainTypes, NotifyExecutionLayer};
use bytes::Bytes;
use eth2::types as api_types;
use eth2::{CONTENT_TYPE_HEADER, SSZ_CONTENT_TYPE_HEADER};
use eth2::CONSENSUS_VERSION_HEADER;
use eth2::types::{self as api_types, BroadcastValidation};
use lighthouse_network::PubsubMessage;
use network::NetworkMessage;
use ssz::{Decode, Encode};
use std::future::Future;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use tokio::sync::mpsc::UnboundedSender;
use tracing::{debug, error, info, warn};
use types::{BlockImportSource, EthSpec, SignedExecutionPayloadEnvelope};
use types::{BlockImportSource, EthSpec, ForkName, SignedExecutionPayloadEnvelope};
use warp::{
Filter, Rejection, Reply,
hyper::{Body, Response},
Expand All @@ -34,27 +36,36 @@ pub(crate) fn post_beacon_execution_payload_envelope_ssz<T: BeaconChainTypes>(
eth_v1
.and(warp::path("beacon"))
.and(warp::path("execution_payload_envelope"))
.and(warp::query::<api_types::BroadcastValidationQuery>())
.and(warp::path::end())
.and(warp::header::exact(
CONTENT_TYPE_HEADER,
SSZ_CONTENT_TYPE_HEADER,
))
.and(warp::body::bytes())
.and(warp::header::header::<ForkName>(CONSENSUS_VERSION_HEADER))
.and(task_spawner_filter)
.and(chain_filter)
.and(network_tx_filter)
.then(
|body_bytes: Bytes,
|validation_level: api_types::BroadcastValidationQuery,
body: Bytes,
consensus_version: ForkName,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>| {
task_spawner.spawn_async_with_rejection(Priority::P0, async move {
let envelope =
SignedExecutionPayloadEnvelope::<T::EthSpec>::from_ssz_bytes(&body_bytes)
SignedExecutionPayloadEnvelope::<T::EthSpec>::from_ssz_bytes(&body)
.map_err(|e| {
warp_utils::reject::custom_bad_request(format!("invalid SSZ: {e:?}"))
})?;
publish_execution_payload_envelope(envelope, chain, &network_tx).await
warp_utils::reject::custom_bad_request(format!(
"invalid SSZ: {e:?}"
))
})?;
publish_execution_payload_envelope(
envelope,
validation_level.broadcast_validation,
consensus_version,
chain,
&network_tx,
)
.await
})
},
)
Expand All @@ -71,28 +82,42 @@ pub(crate) fn post_beacon_execution_payload_envelope<T: BeaconChainTypes>(
eth_v1
.and(warp::path("beacon"))
.and(warp::path("execution_payload_envelope"))
.and(warp::query::<api_types::BroadcastValidationQuery>())
.and(warp::path::end())
.and(warp::body::json())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and(network_tx_filter.clone())
.and(warp::header::header::<ForkName>(CONSENSUS_VERSION_HEADER))
.and(task_spawner_filter)
.and(chain_filter)
.and(network_tx_filter)
.then(
|envelope: SignedExecutionPayloadEnvelope<T::EthSpec>,
|validation_level: api_types::BroadcastValidationQuery,
envelope: SignedExecutionPayloadEnvelope<T::EthSpec>,
consensus_version: ForkName,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>| {
task_spawner.spawn_async_with_rejection(Priority::P0, async move {
publish_execution_payload_envelope(envelope, chain, &network_tx).await
publish_execution_payload_envelope(
envelope,
validation_level.broadcast_validation,
consensus_version,
chain,
&network_tx,
)
.await
})
},
)
.boxed()
}

/// Publishes a signed execution payload envelope to the network. Implements
/// `POST /eth/v1/beacon/execution_payload_envelope` per the in-flight beacon-APIs PR
/// `POST /eth/v1/beacon/execution_payload_envelope` per beacon-APIs PR
/// <https://github.com/ethereum/beacon-APIs/pull/580>.
pub async fn publish_execution_payload_envelope<T: BeaconChainTypes>(
envelope: SignedExecutionPayloadEnvelope<T::EthSpec>,
validation_level: BroadcastValidation,
consensus_version: ForkName,
chain: Arc<BeaconChain<T>>,
network_tx: &UnboundedSender<NetworkMessage<T::EthSpec>>,
) -> Result<Response<Body>, Rejection> {
Expand All @@ -109,9 +134,33 @@ pub async fn publish_execution_payload_envelope<T: BeaconChainTypes>(
%slot,
%beacon_block_root,
builder_index = envelope.message.builder_index,
?consensus_version,
?validation_level,
"Publishing signed execution payload envelope to network"
);

// Pre-load the beacon block for the equivocation check below; needs `(slot, proposer_index)`.
let beacon_block_for_eq_check =
if validation_level == BroadcastValidation::ConsensusAndEquivocation {
let block = chain
.get_block(&beacon_block_root)
.await
.map_err(|e| {
warp_utils::reject::custom_bad_request(format!(
"failed to load beacon block for equivocation check: {e:?}"
))
})?
.ok_or_else(|| {
warp_utils::reject::custom_bad_request(format!(
"beacon block {beacon_block_root} not known, \
cannot verify envelope equivocation"
))
})?;
Some(block)
} else {
None
};

let blobs_and_proofs = chain.pending_payload_envelopes.write().take_blobs(slot);

// Spawn the column-build task (CPU-bound KZG cell-and-proof computation) before
Expand Down Expand Up @@ -142,16 +191,46 @@ pub async fn publish_execution_payload_envelope<T: BeaconChainTypes>(

let network_tx_clone = network_tx.clone();
let envelope_for_gossip = gossip_verified.signed_envelope.as_ref().clone();
let publish_fn = || {
let publish_envelope_p2p = || -> Result<(), EnvelopeError> {
crate::utils::publish_pubsub_message(
&network_tx_clone,
PubsubMessage::ExecutionPayload(Box::new(envelope_for_gossip)),
PubsubMessage::ExecutionPayload(Box::new(envelope_for_gossip.clone())),
)
.map_err(|_| {
beacon_chain::payload_envelope_verification::EnvelopeError::BeaconChainError(Arc::new(
beacon_chain::BeaconChainError::UnableToPublish,
))
})
.map_err(|_| EnvelopeError::BeaconChainError(Arc::new(BeaconChainError::UnableToPublish)))
};

// Tracks whether broadcast occurred so post-import errors map to 202 vs 400.
let publish_fn_completed = Arc::new(AtomicBool::new(false));

// For `gossip` level, broadcast before consensus-verify.
if validation_level == BroadcastValidation::Gossip {
publish_envelope_p2p().map_err(|_| {
warp_utils::reject::custom_server_error("unable to publish to network channel".into())
})?;
publish_fn_completed.store(true, Ordering::SeqCst);
}

let publish_fn = || -> Result<(), EnvelopeError> {
match validation_level {
BroadcastValidation::Gossip => Ok(()),
BroadcastValidation::Consensus => {
publish_envelope_p2p()?;
publish_fn_completed.store(true, Ordering::SeqCst);
Ok(())
}
BroadcastValidation::ConsensusAndEquivocation => {
let block = beacon_block_for_eq_check.as_ref().ok_or_else(|| {
EnvelopeError::InternalError(
"beacon block was not pre-loaded for ConsensusAndEquivocation".into(),
)
})?;
check_slashable(&chain, beacon_block_root, block)
.map_err(EnvelopeError::BlockError)?;
publish_envelope_p2p()?;
publish_fn_completed.store(true, Ordering::SeqCst);
Ok(())
}
}
};

let import_result = chain
Expand All @@ -165,10 +244,27 @@ pub async fn publish_execution_payload_envelope<T: BeaconChainTypes>(
.await;

if let Err(e) = import_result {
warn!(%slot, error = ?e, "Failed to import execution payload envelope");
return Err(warp_utils::reject::custom_server_error(format!(
"envelope import failed: {e}"
)));
return match &e {
EnvelopeError::BeaconChainError(chain_error)
if matches!(chain_error.as_ref(), BeaconChainError::UnableToPublish) =>
{
Err(warp_utils::reject::custom_server_error(
"unable to publish to network channel".into(),
))
}
_ if publish_fn_completed.load(Ordering::SeqCst) => {
warn!(%slot, error = ?e, "Failed to import execution payload envelope after broadcast");
Err(warp_utils::reject::broadcast_without_import(format!(
"envelope import failed: {e}"
)))
}
_ => {
warn!(%slot, error = ?e, "Rejecting execution payload envelope before broadcast");
Err(warp_utils::reject::custom_bad_request(format!(
"envelope rejected: {e}"
)))
}
};
}

// From here on the envelope is on the wire. `take_blobs` already consumed the cache
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/http_api/src/publish_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -821,7 +821,7 @@ fn late_block_logging<T: BeaconChainTypes, P: AbstractExecPayload<T::EthSpec>>(
}

/// Check if any of the blobs or the block are slashable. Returns `BlockError::Slashable` if so.
fn check_slashable<T: BeaconChainTypes>(
pub(crate) fn check_slashable<T: BeaconChainTypes>(
chain_clone: &BeaconChain<T>,
block_root: Hash256,
block_clone: &SignedBeaconBlock<T::EthSpec, FullPayload<T::EthSpec>>,
Expand Down
Loading
Loading