Skip to content

Commit 06a3092

Browse files
committed
Merge branch 'lt/get-payload-v2-on-all' into add-ssz-to-pbs
2 parents a287e96 + de99bb9 commit 06a3092

10 files changed

Lines changed: 85 additions & 48 deletions

File tree

crates/common/src/config/mux.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -369,7 +369,7 @@ async fn fetch_lido_registry_keys(
369369
}
370370

371371
async fn fetch_ssv_pubkeys(
372-
api_url: Url,
372+
mut api_url: Url,
373373
chain: Chain,
374374
node_operator_id: U256,
375375
http_timeout: Duration,
@@ -386,6 +386,15 @@ async fn fetch_ssv_pubkeys(
386386
let mut pubkeys: Vec<BlsPublicKey> = vec![];
387387
let mut page = 1;
388388

389+
// Validate the URL - this appends a trailing slash if missing as efficiently as
390+
// possible
391+
if !api_url.path().ends_with('/') {
392+
match api_url.path_segments_mut() {
393+
Ok(mut segments) => segments.push(""), // Analogous to a trailing slash
394+
Err(_) => bail!("SSV API URL is not a valid base URL"),
395+
};
396+
}
397+
389398
loop {
390399
let route = format!(
391400
"{chain_name}/validators/in_operator/{node_operator_id}?perPage={MAX_PER_PAGE}&page={page}",

crates/common/src/config/pbs.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -404,5 +404,5 @@ pub async fn load_pbs_custom_config<T: DeserializeOwned>() -> Result<(PbsModuleC
404404

405405
/// Default URL for the SSV network API
406406
fn default_ssv_api_url() -> Url {
407-
Url::parse("https://api.ssv.network/api/v4").expect("default URL is valid")
407+
Url::parse("https://api.ssv.network/api/v4/").expect("default URL is valid")
408408
}

crates/common/src/pbs/error.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@ pub enum PbsError {
2525

2626
#[error("URL parsing error: {0}")]
2727
UrlParsing(#[from] url::ParseError),
28+
29+
#[error("tokio join error: {0}")]
30+
TokioJoinError(#[from] tokio::task::JoinError),
2831
}
2932

3033
impl PbsError {

crates/pbs/src/api.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use std::sync::Arc;
2+
13
use async_trait::async_trait;
24
use axum::{Router, http::HeaderMap};
35
use cb_common::pbs::{
@@ -34,10 +36,10 @@ pub trait BuilderApi<S: BuilderApiState>: 'static {
3436
/// https://ethereum.github.io/builder-specs/#/Builder/submitBlindedBlock and
3537
/// https://ethereum.github.io/builder-specs/#/Builder/submitBlindedBlockV2
3638
async fn submit_block(
37-
signed_blinded_block: SignedBlindedBeaconBlock,
39+
signed_blinded_block: Arc<SignedBlindedBeaconBlock>,
3840
req_headers: HeaderMap,
3941
state: PbsState<S>,
40-
api_version: &BuilderApiVersion,
42+
api_version: BuilderApiVersion,
4143
) -> eyre::Result<Option<SubmitBlindedBlockResponse>> {
4244
mev_boost::submit_block(signed_blinded_block, req_headers, state, api_version).await
4345
}

crates/pbs/src/mev_boost/register_validator.rs

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,10 @@ use cb_common::{
77
utils::{get_user_agent_with_version, read_chunked_body_with_max, utcnow_ms},
88
};
99
use eyre::bail;
10-
use futures::future::{join_all, select_ok};
10+
use futures::{
11+
FutureExt,
12+
future::{join_all, select_ok},
13+
};
1114
use reqwest::header::{CONTENT_TYPE, USER_AGENT};
1215
use tracing::{Instrument, debug, error};
1316
use url::Url;
@@ -49,32 +52,38 @@ pub async fn register_validator<S: BuilderApiState>(
4952

5053
for (n_regs, body) in bodies {
5154
for relay in state.all_relays().iter().cloned() {
52-
handles.push(tokio::spawn(
53-
send_register_validator_with_timeout(
54-
n_regs,
55-
body.clone(),
56-
relay,
57-
send_headers.clone(),
58-
state.pbs_config().timeout_register_validator_ms,
59-
state.pbs_config().register_validator_retry_limit,
55+
handles.push(
56+
tokio::spawn(
57+
send_register_validator_with_timeout(
58+
n_regs,
59+
body.clone(),
60+
relay,
61+
send_headers.clone(),
62+
state.pbs_config().timeout_register_validator_ms,
63+
state.pbs_config().register_validator_retry_limit,
64+
)
65+
.in_current_span(),
6066
)
61-
.in_current_span(),
62-
));
67+
.map(|join_result| match join_result {
68+
Ok(res) => res,
69+
Err(err) => Err(PbsError::TokioJoinError(err)),
70+
}),
71+
);
6372
}
6473
}
6574

6675
if state.pbs_config().wait_all_registrations {
6776
// wait for all relays registrations to complete
6877
let results = join_all(handles).await;
69-
if results.into_iter().any(|res| res.is_ok_and(|res| res.is_ok())) {
78+
if results.into_iter().any(|res| res.is_ok()) {
7079
Ok(())
7180
} else {
7281
bail!("No relay passed register_validator successfully")
7382
}
7483
} else {
7584
// return once first completes, others proceed in background
76-
let result = select_ok(handles).await?;
77-
match result.0 {
85+
let result = select_ok(handles).await;
86+
match result {
7887
Ok(_) => Ok(()),
7988
Err(_) => bail!("No relay passed register_validator successfully"),
8089
}

crates/pbs/src/mev_boost/submit_block.rs

Lines changed: 27 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use std::{
22
str::FromStr,
3+
sync::Arc,
34
time::{Duration, Instant},
45
};
56

@@ -14,7 +15,7 @@ use cb_common::{
1415
},
1516
utils::{get_user_agent_with_version, read_chunked_body_with_max, utcnow_ms},
1617
};
17-
use futures::future::select_ok;
18+
use futures::{FutureExt, future::select_ok};
1819
use reqwest::header::USER_AGENT;
1920
use tracing::{debug, warn};
2021
use url::Url;
@@ -31,10 +32,10 @@ use crate::{
3132
/// https://ethereum.github.io/builder-specs/#/Builder/submitBlindedBlockV2. Use `api_version` to
3233
/// distinguish between the two.
3334
pub async fn submit_block<S: BuilderApiState>(
34-
signed_blinded_block: SignedBlindedBeaconBlock,
35+
signed_blinded_block: Arc<SignedBlindedBeaconBlock>,
3536
req_headers: HeaderMap,
3637
state: PbsState<S>,
37-
api_version: &BuilderApiVersion,
38+
api_version: BuilderApiVersion,
3839
) -> eyre::Result<Option<SubmitBlindedBlockResponse>> {
3940
debug!(?req_headers, "received headers");
4041

@@ -58,17 +59,22 @@ pub async fn submit_block<S: BuilderApiState>(
5859
send_headers.insert(USER_AGENT, get_user_agent_with_version(&req_headers)?);
5960
send_headers.insert(HEADER_CONSENSUS_VERSION, consensus_version);
6061

61-
let relays = state.all_relays();
62-
let mut handles = Vec::with_capacity(relays.len());
63-
for relay in relays.iter() {
64-
handles.push(Box::pin(submit_block_with_timeout(
65-
&signed_blinded_block,
66-
relay,
67-
send_headers.clone(),
68-
state.pbs_config().timeout_get_payload_ms,
69-
api_version,
70-
fork_name,
71-
)));
62+
let mut handles = Vec::with_capacity(state.all_relays().len());
63+
for relay in state.all_relays().iter().cloned() {
64+
handles.push(
65+
tokio::spawn(submit_block_with_timeout(
66+
signed_blinded_block.clone(),
67+
relay,
68+
send_headers.clone(),
69+
state.pbs_config().timeout_get_payload_ms,
70+
api_version,
71+
fork_name,
72+
))
73+
.map(|join_result| match join_result {
74+
Ok(res) => res,
75+
Err(err) => Err(PbsError::TokioJoinError(err)),
76+
}),
77+
);
7278
}
7379

7480
let results = select_ok(handles).await;
@@ -81,14 +87,14 @@ pub async fn submit_block<S: BuilderApiState>(
8187
/// Submit blinded block to relay, retry connection errors until the
8288
/// given timeout has passed
8389
async fn submit_block_with_timeout(
84-
signed_blinded_block: &SignedBlindedBeaconBlock,
85-
relay: &RelayClient,
90+
signed_blinded_block: Arc<SignedBlindedBeaconBlock>,
91+
relay: RelayClient,
8692
headers: HeaderMap,
8793
timeout_ms: u64,
88-
api_version: &BuilderApiVersion,
94+
api_version: BuilderApiVersion,
8995
fork_name: ForkName,
9096
) -> Result<Option<SubmitBlindedBlockResponse>, PbsError> {
91-
let mut url = relay.submit_block_url(*api_version)?;
97+
let mut url = relay.submit_block_url(api_version)?;
9298
let mut remaining_timeout_ms = timeout_ms;
9399
let mut retry = 0;
94100
let mut backoff = Duration::from_millis(250);
@@ -97,12 +103,12 @@ async fn submit_block_with_timeout(
97103
let start_request = Instant::now();
98104
match send_submit_block(
99105
url.clone(),
100-
signed_blinded_block,
101-
relay,
106+
&signed_blinded_block,
107+
&relay,
102108
headers.clone(),
103109
remaining_timeout_ms,
104110
retry,
105-
api_version,
111+
&api_version,
106112
fork_name,
107113
)
108114
.await

crates/pbs/src/routes/submit_block.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use std::sync::Arc;
2+
13
use axum::{
24
Json,
35
extract::State,
@@ -45,11 +47,11 @@ async fn handle_submit_block_impl<S: BuilderApiState, A: BuilderApi<S>>(
4547
raw_request: RawRequest,
4648
api_version: BuilderApiVersion,
4749
) -> Result<impl IntoResponse, PbsClientError> {
48-
let signed_blinded_block =
50+
let signed_blinded_block = Arc::new(
4951
deserialize_body(&req_headers, raw_request.body_bytes).await.map_err(|e| {
5052
error!(%e, "failed to deserialize signed blinded block");
5153
PbsClientError::DecodeError(format!("failed to deserialize body: {e}"))
52-
})?;
54+
})?);
5355
tracing::Span::current().record("slot", signed_blinded_block.slot().as_u64() as i64);
5456
tracing::Span::current()
5557
.record("block_hash", tracing::field::debug(signed_blinded_block.block_hash()));
@@ -75,7 +77,7 @@ async fn handle_submit_block_impl<S: BuilderApiState, A: BuilderApi<S>>(
7577

7678
info!(ua, ms_into_slot = now.saturating_sub(slot_start_ms), "new request");
7779

78-
match A::submit_block(signed_blinded_block, req_headers, state, &api_version).await {
80+
match A::submit_block(signed_blinded_block, req_headers, state, api_version).await {
7981
Ok(res) => match res {
8082
Some(payload_and_blobs) => {
8183
trace!(?payload_and_blobs);

tests/src/mock_ssv.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,10 @@ pub async fn create_mock_ssv_server(
3737
force_timeout: Arc::new(RwLock::new(false)),
3838
});
3939
let router = axum::Router::new()
40-
.route("/{chain_name}/validators/in_operator/{node_operator_id}", get(handle_validators))
40+
.route(
41+
"/api/v4/{chain_name}/validators/in_operator/{node_operator_id}",
42+
get(handle_validators),
43+
)
4144
.route("/big_data", get(handle_big_data))
4245
.with_state(state)
4346
.into_make_service();

tests/tests/pbs_mux.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,9 @@ async fn test_ssv_network_fetch() -> Result<()> {
2929
// Start the mock server
3030
let port = 30100;
3131
let _server_handle = create_mock_ssv_server(port, None).await?;
32-
let url = Url::parse(&format!("http://localhost:{port}/test_chain/validators/in_operator/1"))
33-
.unwrap();
32+
let url =
33+
Url::parse(&format!("http://localhost:{port}/api/v4/test_chain/validators/in_operator/1"))
34+
.unwrap();
3435
let response =
3536
fetch_ssv_pubkeys_from_url(url, Duration::from_secs(HTTP_TIMEOUT_SECONDS_DEFAULT)).await?;
3637

@@ -100,8 +101,9 @@ async fn test_ssv_network_fetch_timeout() -> Result<()> {
100101
force_timeout: Arc::new(RwLock::new(true)),
101102
};
102103
let server_handle = create_mock_ssv_server(port, Some(state)).await?;
103-
let url = Url::parse(&format!("http://localhost:{port}/test_chain/validators/in_operator/1"))
104-
.unwrap();
104+
let url =
105+
Url::parse(&format!("http://localhost:{port}/api/v4/test_chain/validators/in_operator/1"))
106+
.unwrap();
105107
let response = fetch_ssv_pubkeys_from_url(url, Duration::from_secs(TEST_HTTP_TIMEOUT)).await;
106108

107109
// The response should fail due to timeout

tests/tests/pbs_mux_refresh.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,8 @@ async fn test_auto_refresh() -> Result<()> {
4343

4444
// Start the mock SSV API server
4545
let ssv_api_port = pbs_port + 1;
46-
let ssv_api_url = Url::parse(&format!("http://localhost:{ssv_api_port}"))?;
46+
// Intentionally missing a trailing slash to ensure this is handled properly
47+
let ssv_api_url = Url::parse(&format!("http://localhost:{ssv_api_port}/api/v4"))?;
4748
let mock_ssv_state = SsvMockState {
4849
validators: Arc::new(RwLock::new(vec![SSVValidator {
4950
pubkey: existing_mux_pubkey.clone(),

0 commit comments

Comments
 (0)