Skip to content

Commit f0875cd

Browse files
committed
Merge remote-tracking branch 'eserilev/add-builder-ssz-flow' into add-ssz-to-pbs
2 parents 24fabca + 7005422 commit f0875cd

9 files changed

Lines changed: 297 additions & 16 deletions

File tree

crates/cli/src/docker_init.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -154,9 +154,10 @@ pub async fn handle_docker_init(config_path: PathBuf, output_dir: PathBuf) -> Re
154154

155155
// depends_on
156156
let mut module_dependencies = IndexMap::new();
157-
module_dependencies.insert("cb_signer".into(), DependsCondition {
158-
condition: "service_healthy".into(),
159-
});
157+
module_dependencies.insert(
158+
"cb_signer".into(),
159+
DependsCondition { condition: "service_healthy".into() },
160+
);
160161

161162
Service {
162163
container_name: Some(module_cid.clone()),

crates/common/src/signer/store.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -302,8 +302,8 @@ impl ProxyStore {
302302
let entry = entry?;
303303
let path = entry.path();
304304

305-
if !path.is_file() ||
306-
path.extension().is_none_or(|ext| ext != "json")
305+
if !path.is_file()
306+
|| path.extension().is_none_or(|ext| ext != "json")
307307
{
308308
continue;
309309
}
@@ -376,8 +376,8 @@ impl ProxyStore {
376376
let entry = entry?;
377377
let path = entry.path();
378378

379-
if !path.is_file() ||
380-
path.extension().is_none_or(|ext| ext != "json")
379+
if !path.is_file()
380+
|| path.extension().is_none_or(|ext| ext != "json")
381381
{
382382
continue;
383383
}

crates/common/src/types.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -358,11 +358,14 @@ mod tests {
358358
fn test_load_custom() {
359359
let s = r#"chain = { genesis_time_secs = 1, slot_time_secs = 2, genesis_fork_version = "0x01000000" }"#;
360360
let decoded: MockConfig = toml::from_str(s).unwrap();
361-
assert_eq!(decoded.chain, Chain::Custom {
362-
genesis_time_secs: 1,
363-
slot_time_secs: 2,
364-
genesis_fork_version: [1, 0, 0, 0]
365-
})
361+
assert_eq!(
362+
decoded.chain,
363+
Chain::Custom {
364+
genesis_time_secs: 1,
365+
slot_time_secs: 2,
366+
genesis_fork_version: [1, 0, 0, 0]
367+
}
368+
)
366369
}
367370

368371
#[test]

crates/common/src/utils.rs

Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -603,6 +603,192 @@ where
603603
}
604604
}
605605

606+
/// Parse ACCEPT header, default to JSON if missing or mal-formatted
607+
pub fn get_accept_header(req_headers: &HeaderMap) -> Accept {
608+
Accept::from_str(
609+
req_headers.get(ACCEPT).and_then(|value| value.to_str().ok()).unwrap_or("application/json"),
610+
)
611+
.unwrap_or(Accept::Json)
612+
}
613+
614+
/// Parse CONTENT TYPE header, default to JSON if missing or mal-formatted
615+
pub fn get_content_type_header(req_headers: &HeaderMap) -> ContentType {
616+
ContentType::from_str(
617+
req_headers
618+
.get(CONTENT_TYPE)
619+
.and_then(|value| value.to_str().ok())
620+
.unwrap_or("application/json"),
621+
)
622+
.unwrap_or(ContentType::Json)
623+
}
624+
625+
/// Parse CONSENSUS_VERSION header
626+
pub fn get_consensus_version_header(req_headers: &HeaderMap) -> Option<ForkName> {
627+
ForkName::from_str(
628+
req_headers
629+
.get(CONSENSUS_VERSION_HEADER)
630+
.and_then(|value| value.to_str().ok())
631+
.unwrap_or(""),
632+
)
633+
.ok()
634+
}
635+
636+
#[derive(Debug, Clone, Copy, PartialEq)]
637+
pub enum ForkName {
638+
Deneb,
639+
Electra,
640+
}
641+
642+
impl std::fmt::Display for ForkName {
643+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
644+
match self {
645+
ForkName::Deneb => write!(f, "deneb"),
646+
ForkName::Electra => write!(f, "electra"),
647+
}
648+
}
649+
}
650+
651+
impl FromStr for ForkName {
652+
type Err = String;
653+
fn from_str(value: &str) -> Result<Self, Self::Err> {
654+
match value {
655+
"deneb" => Ok(ForkName::Deneb),
656+
"electra" => Ok(ForkName::Electra),
657+
_ => Err(format!("Invalid fork name {}", value)),
658+
}
659+
}
660+
}
661+
662+
#[derive(Debug, Clone, Copy, PartialEq)]
663+
pub enum ContentType {
664+
Json,
665+
Ssz,
666+
}
667+
668+
impl std::fmt::Display for ContentType {
669+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
670+
match self {
671+
ContentType::Json => write!(f, "application/json"),
672+
ContentType::Ssz => write!(f, "application/octet-stream"),
673+
}
674+
}
675+
}
676+
677+
impl FromStr for ContentType {
678+
type Err = String;
679+
fn from_str(value: &str) -> Result<Self, Self::Err> {
680+
match value {
681+
"application/json" => Ok(ContentType::Json),
682+
"application/octet-stream" => Ok(ContentType::Ssz),
683+
_ => Ok(ContentType::Json),
684+
}
685+
}
686+
}
687+
688+
#[derive(Debug, Clone, Copy, PartialEq)]
689+
pub enum Accept {
690+
Json,
691+
Ssz,
692+
Any,
693+
}
694+
695+
impl fmt::Display for Accept {
696+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
697+
match self {
698+
Accept::Ssz => write!(f, "application/octet-stream"),
699+
Accept::Json => write!(f, "application/json"),
700+
Accept::Any => write!(f, "*/*"),
701+
}
702+
}
703+
}
704+
705+
impl FromStr for Accept {
706+
type Err = String;
707+
708+
fn from_str(s: &str) -> Result<Self, Self::Err> {
709+
let media_type_list = MediaTypeList::new(s);
710+
711+
// [q-factor weighting]: https://datatracker.ietf.org/doc/html/rfc7231#section-5.3.2
712+
// find the highest q-factor supported accept type
713+
let mut highest_q = 0_u16;
714+
let mut accept_type = None;
715+
716+
const APPLICATION: &str = names::APPLICATION.as_str();
717+
const OCTET_STREAM: &str = names::OCTET_STREAM.as_str();
718+
const JSON: &str = names::JSON.as_str();
719+
const STAR: &str = names::_STAR.as_str();
720+
const Q: &str = names::Q.as_str();
721+
722+
media_type_list.into_iter().for_each(|item| {
723+
if let Ok(MediaType { ty, subty, suffix: _, params }) = item {
724+
let q_accept = match (ty.as_str(), subty.as_str()) {
725+
(APPLICATION, OCTET_STREAM) => Some(Accept::Ssz),
726+
(APPLICATION, JSON) => Some(Accept::Json),
727+
(STAR, STAR) => Some(Accept::Any),
728+
_ => None,
729+
}
730+
.map(|item_accept_type| {
731+
let q_val = params
732+
.iter()
733+
.find_map(|(n, v)| match n.as_str() {
734+
Q => {
735+
Some((v.as_str().parse::<f32>().unwrap_or(0_f32) * 1000_f32) as u16)
736+
}
737+
_ => None,
738+
})
739+
.or(Some(1000_u16));
740+
741+
(q_val.unwrap(), item_accept_type)
742+
});
743+
744+
match q_accept {
745+
Some((q, accept)) if q > highest_q => {
746+
highest_q = q;
747+
accept_type = Some(accept);
748+
}
749+
_ => (),
750+
}
751+
}
752+
});
753+
accept_type.ok_or_else(|| "accept header is not supported".to_string())
754+
}
755+
}
756+
757+
#[must_use]
758+
#[derive(Debug, Clone, Copy, Default)]
759+
pub struct JsonOrSsz<T>(pub T);
760+
761+
impl<T, S> FromRequest<S> for JsonOrSsz<T>
762+
where
763+
T: serde::de::DeserializeOwned + ssz::Decode + 'static,
764+
S: Send + Sync,
765+
{
766+
type Rejection = Response;
767+
768+
async fn from_request(req: Request, _state: &S) -> Result<Self, Self::Rejection> {
769+
let headers = req.headers().clone();
770+
let content_type = headers.get(CONTENT_TYPE).and_then(|value| value.to_str().ok());
771+
772+
let bytes = Bytes::from_request(req, _state).await.map_err(IntoResponse::into_response)?;
773+
774+
if let Some(content_type) = content_type {
775+
if content_type.starts_with(&ContentType::Json.to_string()) {
776+
let payload: T = serde_json::from_slice(&bytes)
777+
.map_err(|_| StatusCode::BAD_REQUEST.into_response())?;
778+
return Ok(Self(payload));
779+
}
780+
781+
if content_type.starts_with(&ContentType::Ssz.to_string()) {
782+
let payload = T::from_ssz_bytes(&bytes)
783+
.map_err(|_| StatusCode::BAD_REQUEST.into_response())?;
784+
return Ok(Self(payload));
785+
}
786+
}
787+
788+
Err(StatusCode::UNSUPPORTED_MEDIA_TYPE.into_response())
789+
}
790+
}
791+
606792
#[cfg(unix)]
607793
pub async fn wait_for_signal() -> eyre::Result<()> {
608794
use tokio::signal::unix::{SignalKind, signal};

crates/pbs/src/mev_boost/submit_block.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -208,9 +208,9 @@ fn validate_unblinded_block_electra(
208208
let blobs = &block_response.blobs_bundle;
209209

210210
let expected_commitments = &signed_blinded_block.body.blob_kzg_commitments;
211-
if expected_commitments.len() != blobs.blobs.len() ||
212-
expected_commitments.len() != blobs.commitments.len() ||
213-
expected_commitments.len() != blobs.proofs.len()
211+
if expected_commitments.len() != blobs.blobs.len()
212+
|| expected_commitments.len() != blobs.commitments.len()
213+
|| expected_commitments.len() != blobs.proofs.len()
214214
{
215215
return Err(PbsError::Validation(ValidationError::KzgCommitments {
216216
expected_blobs: expected_commitments.len(),

crates/pbs/src/routes/get_header.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ pub async fn handle_get_header<S: BuilderApiState, A: BuilderApi<S>>(
4141
Ok(res) => {
4242
if let Some(max_bid) = res {
4343
info!(value_eth = format_ether(max_bid.value()), block_hash =% max_bid.block_hash(), "received header");
44-
4544
BEACON_NODE_STATUS.with_label_values(&["200", GET_HEADER_ENDPOINT_TAG]).inc();
4645
let response = match accept_header {
4746
Accept::Ssz => {

tests/src/mock_validator.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,13 @@ impl MockValidator {
115115
ContentType::Ssz => signed_blinded_block.as_ssz_bytes(),
116116
};
117117

118+
let signed_blinded_block = signed_blinded_block_opt.unwrap_or_default();
119+
120+
let body = match content_type {
121+
ContentType::Json => serde_json::to_vec(&signed_blinded_block).unwrap(),
122+
ContentType::Ssz => signed_blinded_block.as_ssz_bytes(),
123+
};
124+
118125
Ok(self
119126
.comm_boost
120127
.client

tests/tests/pbs_get_header.rs

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,50 @@ async fn test_get_header_ssz() -> Result<()> {
108108
Ok(())
109109
}
110110

111+
#[tokio::test]
112+
async fn test_get_header_ssz() -> Result<()> {
113+
setup_test_env();
114+
let signer = random_secret();
115+
let pubkey: BlsPublicKey = blst_pubkey_to_alloy(&signer.sk_to_pk()).into();
116+
117+
let chain = Chain::Holesky;
118+
let pbs_port = 3200;
119+
let relay_port = pbs_port + 1;
120+
121+
// Run a mock relay
122+
let mock_state = Arc::new(MockRelayState::new(chain, signer));
123+
let mock_relay = generate_mock_relay(relay_port, *pubkey)?;
124+
tokio::spawn(start_mock_relay_service(mock_state.clone(), relay_port));
125+
126+
// Run the PBS service
127+
let config = to_pbs_config(chain, get_pbs_static_config(pbs_port), vec![mock_relay.clone()]);
128+
let state = PbsState::new(config);
129+
tokio::spawn(PbsService::run::<(), DefaultBuilderApi>(state));
130+
131+
// leave some time to start servers
132+
tokio::time::sleep(Duration::from_millis(100)).await;
133+
134+
let mock_validator = MockValidator::new(pbs_port)?;
135+
info!("Sending get header");
136+
let res = mock_validator.do_get_header(None, Some(Accept::Ssz), ForkName::Electra).await?;
137+
assert_eq!(res.status(), StatusCode::OK);
138+
139+
let res: SignedExecutionPayloadHeader<ExecutionPayloadHeaderMessageElectra> =
140+
SignedExecutionPayloadHeader::from_ssz_bytes(&res.bytes().await?).unwrap();
141+
142+
assert_eq!(mock_state.received_get_header(), 1);
143+
assert_eq!(res.message.header.block_hash.0[0], 1);
144+
assert_eq!(res.message.header.parent_hash, B256::ZERO);
145+
assert_eq!(res.message.value, U256::from(10));
146+
assert_eq!(res.message.pubkey, blst_pubkey_to_alloy(&mock_state.signer.sk_to_pk()));
147+
assert_eq!(res.message.header.timestamp, timestamp_of_slot_start_sec(0, chain));
148+
assert_eq!(
149+
res.signature,
150+
sign_builder_root(chain, &mock_state.signer, res.message.tree_hash_root().0)
151+
);
152+
Ok(())
153+
}
154+
111155
#[tokio::test]
112156
async fn test_get_header_returns_204_if_relay_down() -> Result<()> {
113157
setup_test_env();

tests/tests/pbs_post_blinded_blocks.rs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,47 @@ async fn test_submit_block_v2_ssz() -> Result<()> {
5757
Ok(())
5858
}
5959

60+
#[tokio::test]
61+
async fn test_submit_block_ssz() -> Result<()> {
62+
setup_test_env();
63+
let signer = random_secret();
64+
let pubkey: BlsPublicKey = blst_pubkey_to_alloy(&signer.sk_to_pk()).into();
65+
66+
let chain = Chain::Holesky;
67+
let pbs_port = 3800;
68+
69+
// Run a mock relay
70+
let relays = vec![generate_mock_relay(pbs_port + 1, *pubkey)?];
71+
let mock_state = Arc::new(MockRelayState::new(chain, signer));
72+
tokio::spawn(start_mock_relay_service(mock_state.clone(), pbs_port + 1));
73+
74+
// Run the PBS service
75+
let config = to_pbs_config(chain, get_pbs_static_config(pbs_port), relays);
76+
let state = PbsState::new(config);
77+
tokio::spawn(PbsService::run::<(), DefaultBuilderApi>(state));
78+
79+
// leave some time to start servers
80+
tokio::time::sleep(Duration::from_millis(100)).await;
81+
82+
let mock_validator = MockValidator::new(pbs_port)?;
83+
info!("Sending submit block");
84+
let res = mock_validator
85+
.do_submit_block(
86+
Some(SignedBlindedBeaconBlock::default()),
87+
Accept::Ssz,
88+
ContentType::Ssz,
89+
ForkName::Electra,
90+
)
91+
.await?;
92+
93+
assert_eq!(res.status(), StatusCode::OK);
94+
assert_eq!(mock_state.received_submit_block(), 1);
95+
96+
let response_body = PayloadAndBlobsElectra::from_ssz_bytes(&res.bytes().await?).unwrap();
97+
assert_eq!(response_body.block_hash(), SubmitBlindedBlockResponse::default().block_hash());
98+
Ok(())
99+
}
100+
60101
#[tokio::test]
61102
async fn test_submit_block_too_large() -> Result<()> {
62103
setup_test_env();

0 commit comments

Comments
 (0)