Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 8 additions & 1 deletion crates/common/src/config/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::path::PathBuf;

use eyre::Result;
use eyre::{Result, bail};
use serde::{Deserialize, Serialize};

use crate::types::{Chain, ChainLoader, ForkVersion, load_chain_from_file};
Expand Down Expand Up @@ -45,6 +45,13 @@ impl CommitBoostConfig {
if let Some(signer) = &self.signer {
signer.validate().await?;
}

if self.relays.iter().any(|r| r.validator_registration_batch_size.is_some()) {
bail!(
"validator_registration_batch_size is now deprecated on a per-relay basis. Please use validator_registration_batch_size in the [pbs] section instead"
Comment thread
ltitanb marked this conversation as resolved.
Outdated
)
}

Ok(())
}

Expand Down
5 changes: 5 additions & 0 deletions crates/common/src/config/pbs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use crate::{
};

#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct RelayConfig {
/// Relay ID, if missing will default to the URL hostname from the entry
pub id: Option<String>,
Expand Down Expand Up @@ -128,6 +129,10 @@ pub struct PbsConfig {
/// Maximum number of retries for validator registration request per relay
#[serde(default = "default_u32::<REGISTER_VALIDATOR_RETRY_LIMIT>")]
pub register_validator_retry_limit: u32,
/// Maximum number of validators to send to relays in a single registration
/// request
#[serde(deserialize_with = "empty_string_as_none", default)]
pub validator_registration_batch_size: Option<usize>,
}

impl PbsConfig {
Expand Down
1 change: 1 addition & 0 deletions crates/pbs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ lazy_static.workspace = true
parking_lot.workspace = true
prometheus.workspace = true
reqwest.workspace = true
serde.workspace = true
serde_json.workspace = true
tokio.workspace = true
tower-http.workspace = true
Expand Down
3 changes: 1 addition & 2 deletions crates/pbs/src/api.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use alloy::rpc::types::beacon::relay::ValidatorRegistration;
use async_trait::async_trait;
use axum::{Router, http::HeaderMap};
use cb_common::pbs::{
Expand Down Expand Up @@ -45,7 +44,7 @@ pub trait BuilderApi<S: BuilderApiState>: 'static {

/// https://ethereum.github.io/builder-specs/#/Builder/registerValidator
async fn register_validator(
registrations: Vec<ValidatorRegistration>,
registrations: Vec<serde_json::Value>,
req_headers: HeaderMap,
state: PbsState<S>,
) -> eyre::Result<()> {
Expand Down
57 changes: 31 additions & 26 deletions crates/pbs/src/mev_boost/register_validator.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use std::time::{Duration, Instant};

use alloy::rpc::types::beacon::relay::ValidatorRegistration;
use alloy::primitives::Bytes;
use axum::http::{HeaderMap, HeaderValue};
use cb_common::{
pbs::{HEADER_START_TIME_UNIX_MS, RelayClient, error::PbsError},
utils::{get_user_agent_with_version, read_chunked_body_with_max, utcnow_ms},
};
use eyre::bail;
use futures::future::{join_all, select_ok};
use reqwest::header::USER_AGENT;
use reqwest::header::{CONTENT_TYPE, USER_AGENT};
use tracing::{Instrument, debug, error};
use url::Url;

Expand All @@ -21,7 +21,7 @@ use crate::{
/// Implements https://ethereum.github.io/builder-specs/#/Builder/registerValidator
/// Returns 200 if at least one relay returns 200, else 503
pub async fn register_validator<S: BuilderApiState>(
registrations: Vec<ValidatorRegistration>,
registrations: Vec<serde_json::Value>,
req_headers: HeaderMap,
state: PbsState<S>,
) -> eyre::Result<()> {
Expand All @@ -31,27 +31,29 @@ pub async fn register_validator<S: BuilderApiState>(
.insert(HEADER_START_TIME_UNIX_MS, HeaderValue::from_str(&utcnow_ms().to_string())?);
send_headers.insert(USER_AGENT, get_user_agent_with_version(&req_headers)?);

let relays = state.all_relays().to_vec();
let mut handles = Vec::with_capacity(relays.len());
for relay in relays.clone() {
if let Some(batch_size) = relay.config.validator_registration_batch_size {
for batch in registrations.chunks(batch_size) {
handles.push(tokio::spawn(
send_register_validator_with_timeout(
batch.to_vec(),
relay.clone(),
send_headers.clone(),
state.pbs_config().timeout_register_validator_ms,
state.pbs_config().register_validator_retry_limit,
)
.in_current_span(),
));
}
// prepare the body in advance, ugly dyn
let bodies: Box<dyn Iterator<Item = (usize, Bytes)>> =
if let Some(batch_size) = state.config.pbs_config.validator_registration_batch_size {
Box::new(registrations.chunks(batch_size).map(|batch| {
// SAFETY: unwrap is ok because we're serializing a &[serde_json::Value]
let body = serde_json::to_vec(batch).unwrap();
(batch.len(), Bytes::from(body))
}))
} else {
let body = serde_json::to_vec(&registrations).unwrap();
Box::new(std::iter::once((registrations.len(), Bytes::from(body))))
};
send_headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));

let mut handles = Vec::with_capacity(state.all_relays().len());

for (n_regs, body) in bodies {
for relay in state.all_relays().iter().cloned() {
handles.push(tokio::spawn(
send_register_validator_with_timeout(
registrations.clone(),
relay.clone(),
n_regs,
body.clone(),
relay,
send_headers.clone(),
state.pbs_config().timeout_register_validator_ms,
state.pbs_config().register_validator_retry_limit,
Expand Down Expand Up @@ -82,7 +84,8 @@ pub async fn register_validator<S: BuilderApiState>(
/// Register validator to relay, retry connection errors until the
/// given timeout has passed
async fn send_register_validator_with_timeout(
registrations: Vec<ValidatorRegistration>,
n_regs: usize,
body: Bytes,
relay: RelayClient,
headers: HeaderMap,
timeout_ms: u64,
Expand All @@ -97,7 +100,8 @@ async fn send_register_validator_with_timeout(
let start_request = Instant::now();
match send_register_validator(
url.clone(),
&registrations,
n_regs,
body.clone(),
&relay,
headers.clone(),
remaining_timeout_ms,
Expand Down Expand Up @@ -134,7 +138,8 @@ async fn send_register_validator_with_timeout(

async fn send_register_validator(
url: Url,
registrations: &[ValidatorRegistration],
n_regs: usize,
body: Bytes,
relay: &RelayClient,
headers: HeaderMap,
timeout_ms: u64,
Expand All @@ -146,7 +151,7 @@ async fn send_register_validator(
.post(url)
.timeout(Duration::from_millis(timeout_ms))
.headers(headers)
.json(&registrations)
.body(body.0)
.send()
.await
{
Expand Down Expand Up @@ -189,7 +194,7 @@ async fn send_register_validator(
retry,
?code,
latency = ?request_latency,
num_registrations = registrations.len(),
num_registrations = n_regs,
"registration successful"
);

Expand Down
3 changes: 1 addition & 2 deletions crates/pbs/src/routes/register_validator.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use alloy::rpc::types::beacon::relay::ValidatorRegistration;
use axum::{Json, extract::State, http::HeaderMap, response::IntoResponse};
use cb_common::utils::get_user_agent;
use reqwest::StatusCode;
Expand All @@ -15,7 +14,7 @@ use crate::{
pub async fn handle_register_validator<S: BuilderApiState, A: BuilderApi<S>>(
State(state): State<PbsStateGuard<S>>,
req_headers: HeaderMap,
Json(registrations): Json<Vec<ValidatorRegistration>>,
Json(registrations): Json<Vec<serde_json::Value>>,
) -> Result<impl IntoResponse, PbsClientError> {
let state = state.read().clone();

Expand Down
1 change: 1 addition & 0 deletions tests/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ pub fn get_pbs_static_config(port: u16) -> PbsConfig {
rpc_url: None,
http_timeout_seconds: 10,
register_validator_retry_limit: u32::MAX,
validator_registration_batch_size: None,
}
}

Expand Down
140 changes: 0 additions & 140 deletions tests/tests/pbs_post_validators.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,146 +61,6 @@ async fn test_register_validators() -> Result<()> {
Ok(())
}

#[tokio::test]
async fn test_register_validators_returns_422_if_request_is_malformed() -> Result<()> {
setup_test_env();
let signer = random_secret();
let pubkey: BlsPublicKey = signer.public_key();

let chain = Chain::Holesky;
let pbs_port = 4100;

// Run a mock relay
let relays = vec![generate_mock_relay(pbs_port + 1, pubkey)?];
let mock_state = Arc::new(MockRelayState::new(chain, signer));
tokio::spawn(start_mock_relay_service(mock_state.clone(), pbs_port + 1));

// Run the PBS service
let config = to_pbs_config(chain, get_pbs_static_config(pbs_port), relays);
let state = PbsState::new(config);
tokio::spawn(PbsService::run::<(), DefaultBuilderApi>(state));

// leave some time to start servers
tokio::time::sleep(Duration::from_millis(100)).await;

let mock_validator = MockValidator::new(pbs_port)?;
let url = mock_validator.comm_boost.register_validator_url().unwrap();
info!("Sending register validator");

// Bad fee recipient
let bad_json = r#"[{
"message": {
"fee_recipient": "0xaa",
"gas_limit": "100000",
"timestamp": "1000000",
"pubkey": "0xbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"
},
"signature": "0xcccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc"
}]"#;

let res = mock_validator
.comm_boost
.client
.post(url.clone())
.header("Content-Type", "application/json")
.body(bad_json)
.send()
.await?;

assert_eq!(res.status(), StatusCode::UNPROCESSABLE_ENTITY);

// Bad pubkey
let bad_json = r#"[{
"message": {
"fee_recipient": "0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
"gas_limit": "100000",
"timestamp": "1000000",
"pubkey": "0xbbb"
},
"signature": "0xcccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc"
}]"#;

let res = mock_validator
.comm_boost
.client
.post(url.clone())
.header("Content-Type", "application/json")
.body(bad_json)
.send()
.await?;

assert_eq!(res.status(), StatusCode::UNPROCESSABLE_ENTITY);

// Bad signature
let bad_json = r#"[{
"message": {
"fee_recipient": "0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
"gas_limit": "100000",
"timestamp": "1000000",
"pubkey": "0xbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"
},
"signature": "0xcccc"
}]"#;

let res = mock_validator
.comm_boost
.client
.post(url.clone())
.header("Content-Type", "application/json")
.body(bad_json)
.send()
.await?;

assert_eq!(res.status(), StatusCode::UNPROCESSABLE_ENTITY);

// gas limit too high
let bad_json = r#"[{
"message": {
"fee_recipient": "0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
"gas_limit": "10000000000000000000000000000000000000000000000000000000",
"timestamp": "1000000",
"pubkey": "0xcccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc"
},
"signature": "0xcccc"
}]"#;

let res = mock_validator
.comm_boost
.client
.post(url.clone())
.header("Content-Type", "application/json")
.body(bad_json)
.send()
.await?;

assert_eq!(res.status(), StatusCode::UNPROCESSABLE_ENTITY);

// timestamp too high
let bad_json = r#"[{
"message": {
"fee_recipient": "0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
"gas_limit": "1000000",
"timestamp": "10000000000000000000000000000000000000000000000000000000",
"pubkey": "0xcccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc"
},
"signature": "0xcccc"
}]"#;

let res = mock_validator
.comm_boost
.client
.post(url.clone())
.header("Content-Type", "application/json")
.body(bad_json)
.send()
.await?;

assert_eq!(res.status(), StatusCode::UNPROCESSABLE_ENTITY);

assert_eq!(mock_state.received_register_validator(), 0);
Ok(())
}

#[tokio::test]
async fn test_register_validators_does_not_retry_on_429() -> Result<()> {
setup_test_env();
Expand Down
Loading