Skip to content

Commit adc4389

Browse files
committed
Ported HTTP timeout to the PBS event publisher
1 parent dcf1b0f commit adc4389

4 files changed

Lines changed: 58 additions & 31 deletions

File tree

crates/common/src/config/constants.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,9 @@ pub const PROXY_DIR_SECRETS_DEFAULT: &str = "/proxy_secrets";
7474

7575
////////////////////////// MUXER //////////////////////////
7676

77-
/// Timeout for Muxer HTTP requests, in seconds
78-
pub const MUXER_HTTP_TIMEOUT_DEFAULT: u64 = 10;
77+
/// Timeout for HTTP requests, in seconds
78+
pub const HTTP_TIMEOUT_SECONDS_ENV: &str = "CB_HTTP_TIMEOUT_SECONDS";
79+
pub const HTTP_TIMEOUT_SECONDS_DEFAULT: u64 = 10;
7980

8081
/// Max content length for Muxer HTTP responses, in bytes
8182
pub const MUXER_HTTP_MAX_LENGTH: u64 = 1024 * 1024 * 1024 * 10; // 10 MiB

crates/common/src/config/mux.rs

Lines changed: 27 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use url::Url;
1818

1919
use super::{load_optional_env_var, PbsConfig, RelayConfig, MUX_PATH_ENV};
2020
use crate::{
21-
config::{safe_read_http_response, MUXER_HTTP_TIMEOUT_DEFAULT},
21+
config::{safe_read_http_response, HTTP_TIMEOUT_SECONDS_DEFAULT, HTTP_TIMEOUT_SECONDS_ENV},
2222
pbs::RelayClient,
2323
types::Chain,
2424
};
@@ -43,13 +43,19 @@ impl PbsMuxes {
4343
chain: Chain,
4444
default_pbs: &PbsConfig,
4545
) -> eyre::Result<HashMap<BlsPublicKey, RuntimeMuxConfig>> {
46+
let http_timeout = match load_optional_env_var(HTTP_TIMEOUT_SECONDS_ENV) {
47+
Some(timeout_str) => Duration::from_secs(timeout_str.parse::<u64>()?),
48+
None => Duration::from_secs(default_pbs.http_timeout_seconds),
49+
};
50+
4651
let mut muxes = self.muxes;
4752

4853
for mux in muxes.iter_mut() {
4954
ensure!(!mux.relays.is_empty(), "mux config {} must have at least one relay", mux.id);
5055

5156
if let Some(loader) = &mux.loader {
52-
let extra_keys = loader.load(&mux.id, chain, default_pbs.rpc_url.clone()).await?;
57+
let extra_keys =
58+
loader.load(&mux.id, chain, default_pbs.rpc_url.clone(), http_timeout).await?;
5359
mux.validator_pubkeys.extend(extra_keys);
5460
}
5561

@@ -147,12 +153,10 @@ pub enum MuxKeysLoader {
147153
File(PathBuf),
148154
HTTP {
149155
url: String,
150-
timeout: Option<u64>,
151156
},
152157
Registry {
153158
registry: NORegistry,
154159
node_operator_id: u64,
155-
timeout: Option<u64>,
156160
},
157161
}
158162

@@ -170,6 +174,7 @@ impl MuxKeysLoader {
170174
mux_id: &str,
171175
chain: Chain,
172176
rpc_url: Option<Url>,
177+
http_timeout: Duration,
173178
) -> eyre::Result<Vec<BlsPublicKey>> {
174179
match self {
175180
Self::File(config_path) => {
@@ -181,21 +186,19 @@ impl MuxKeysLoader {
181186
serde_json::from_str(&file).wrap_err("failed to parse mux keys file")
182187
}
183188

184-
Self::HTTP { url, timeout } => {
189+
Self::HTTP { url } => {
185190
let url = Url::parse(url).wrap_err("failed to parse mux keys URL")?;
186191
if url.scheme() != "https" {
187192
bail!("mux keys URL must use HTTPS");
188193
}
189-
let client = reqwest::ClientBuilder::new()
190-
.timeout(Duration::from_secs(timeout.unwrap_or(MUXER_HTTP_TIMEOUT_DEFAULT)))
191-
.build()?;
194+
let client = reqwest::ClientBuilder::new().timeout(http_timeout).build()?;
192195
let response = client.get(url).send().await?;
193196
let pubkeys = safe_read_http_response(response).await?;
194197
serde_json::from_str(&pubkeys)
195198
.wrap_err("failed to fetch mux keys from HTTP endpoint")
196199
}
197200

198-
Self::Registry { registry, node_operator_id, timeout } => match registry {
201+
Self::Registry { registry, node_operator_id } => match registry {
199202
NORegistry::Lido => {
200203
let Some(rpc_url) = rpc_url else {
201204
bail!("Lido registry requires RPC URL to be set in the PBS config");
@@ -204,7 +207,7 @@ impl MuxKeysLoader {
204207
fetch_lido_registry_keys(rpc_url, chain, U256::from(*node_operator_id)).await
205208
}
206209
NORegistry::SSV => {
207-
fetch_ssv_pubkeys(chain, U256::from(*node_operator_id), timeout).await
210+
fetch_ssv_pubkeys(chain, U256::from(*node_operator_id), http_timeout).await
208211
}
209212
},
210213
}
@@ -305,7 +308,7 @@ async fn fetch_lido_registry_keys(
305308
async fn fetch_ssv_pubkeys(
306309
chain: Chain,
307310
node_operator_id: U256,
308-
timeout: &Option<u64>,
311+
http_timeout: Duration,
309312
) -> eyre::Result<Vec<BlsPublicKey>> {
310313
const MAX_PER_PAGE: usize = 100;
311314

@@ -325,7 +328,7 @@ async fn fetch_ssv_pubkeys(
325328
chain_name, node_operator_id, MAX_PER_PAGE, page
326329
);
327330

328-
let response = fetch_ssv_pubkeys_from_url(&url, timeout).await?;
331+
let response = fetch_ssv_pubkeys_from_url(&url, http_timeout).await?;
329332
pubkeys.extend(response.validators.iter().map(|v| v.pubkey).collect::<Vec<BlsPublicKey>>());
330333
page += 1;
331334

@@ -346,10 +349,11 @@ async fn fetch_ssv_pubkeys(
346349
Ok(pubkeys)
347350
}
348351

349-
async fn fetch_ssv_pubkeys_from_url(url: &str, timeout: &Option<u64>) -> eyre::Result<SSVResponse> {
350-
let client = reqwest::ClientBuilder::new()
351-
.timeout(Duration::from_secs(timeout.unwrap_or(MUXER_HTTP_TIMEOUT_DEFAULT)))
352-
.build()?;
352+
async fn fetch_ssv_pubkeys_from_url(
353+
url: &str,
354+
http_timeout: Duration,
355+
) -> eyre::Result<SSVResponse> {
356+
let client = reqwest::ClientBuilder::new().timeout(http_timeout).build()?;
353357
let response = client.get(url).send().await.map_err(|e| {
354358
if e.is_timeout() {
355359
eyre::eyre!("Request to SSV network API timed out: {e}")
@@ -436,7 +440,9 @@ mod tests {
436440
let port = 30100;
437441
let _server_handle = create_mock_server(port).await?;
438442
let url = format!("http://localhost:{port}/ssv");
439-
let response = fetch_ssv_pubkeys_from_url(&url, &None).await?;
443+
let response =
444+
fetch_ssv_pubkeys_from_url(&url, Duration::from_secs(HTTP_TIMEOUT_SECONDS_DEFAULT))
445+
.await?;
440446

441447
// Make sure the response is correct
442448
// NOTE: requires that ssv_data.json dpesn't change
@@ -472,7 +478,7 @@ mod tests {
472478
env::remove_var(CB_TEST_HTTP_DISABLE_CONTENT_LENGTH_ENV);
473479
let _server_handle = create_mock_server(port).await?;
474480
let url = format!("http://localhost:{port}/big_data");
475-
let response = fetch_ssv_pubkeys_from_url(&url, &Some(120)).await;
481+
let response = fetch_ssv_pubkeys_from_url(&url, Duration::from_secs(120)).await;
476482

477483
// The response should fail due to content length being too big
478484
assert!(response.is_err(), "Expected error due to big content length, but got success");
@@ -498,7 +504,8 @@ mod tests {
498504
let port = 30102;
499505
let _server_handle = create_mock_server(port).await?;
500506
let url = format!("http://localhost:{port}/timeout");
501-
let response = fetch_ssv_pubkeys_from_url(&url, &Some(TEST_HTTP_TIMEOUT)).await;
507+
let response =
508+
fetch_ssv_pubkeys_from_url(&url, Duration::from_secs(TEST_HTTP_TIMEOUT)).await;
502509

503510
// The response should fail due to timeout
504511
assert!(response.is_err(), "Expected timeout error, but got success");
@@ -523,7 +530,7 @@ mod tests {
523530
defer! { env::remove_var(CB_TEST_HTTP_DISABLE_CONTENT_LENGTH_ENV); }
524531
let _server_handle = create_mock_server(port).await?;
525532
let url = format!("http://localhost:{port}/big_data");
526-
let response = fetch_ssv_pubkeys_from_url(&url, &Some(120)).await;
533+
let response = fetch_ssv_pubkeys_from_url(&url, Duration::from_secs(120)).await;
527534

528535
// The response should fail due to timeout
529536
assert!(response.is_err(), "Expected error due to body size, but got success");

crates/common/src/config/pbs.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use url::Url;
1717

1818
use super::{
1919
constants::PBS_IMAGE_DEFAULT, load_optional_env_var, CommitBoostConfig, RuntimeMuxConfig,
20-
PBS_ENDPOINT_ENV,
20+
HTTP_TIMEOUT_SECONDS_DEFAULT, PBS_ENDPOINT_ENV,
2121
};
2222
use crate::{
2323
commit::client::SignerClient,
@@ -122,6 +122,9 @@ pub struct PbsConfig {
122122
pub extra_validation_enabled: bool,
123123
/// Execution Layer RPC url to use for extra validation
124124
pub rpc_url: Option<Url>,
125+
/// Timeout for HTTP requests in seconds
126+
#[serde(default = "default_u64::<HTTP_TIMEOUT_SECONDS_DEFAULT>")]
127+
pub http_timeout_seconds: u64,
125128
}
126129

127130
impl PbsConfig {

crates/common/src/pbs/event.rs

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::net::SocketAddr;
1+
use std::{net::SocketAddr, time::Duration};
22

33
use alloy::{primitives::B256, rpc::types::beacon::relay::ValidatorRegistration};
44
use async_trait::async_trait;
@@ -8,7 +8,7 @@ use axum::{
88
routing::post,
99
Json,
1010
};
11-
use eyre::bail;
11+
use eyre::{bail, Result};
1212
use reqwest::StatusCode;
1313
use serde::{Deserialize, Serialize};
1414
use tokio::net::TcpListener;
@@ -19,7 +19,10 @@ use super::{
1919
GetHeaderParams, GetHeaderResponse, SignedBlindedBeaconBlock, SubmitBlindedBlockResponse,
2020
};
2121
use crate::{
22-
config::{load_optional_env_var, BUILDER_URLS_ENV},
22+
config::{
23+
load_optional_env_var, BUILDER_URLS_ENV, HTTP_TIMEOUT_SECONDS_DEFAULT,
24+
HTTP_TIMEOUT_SECONDS_ENV,
25+
},
2326
pbs::BUILDER_EVENTS_PATH,
2427
};
2528

@@ -48,11 +51,24 @@ pub struct BuilderEventPublisher {
4851
}
4952

5053
impl BuilderEventPublisher {
51-
pub fn new(endpoints: Vec<Url>) -> Self {
52-
Self { client: reqwest::Client::new(), endpoints }
54+
pub fn new(endpoints: Vec<Url>, http_timeout: Duration) -> Result<Self> {
55+
for endpoint in &endpoints {
56+
if endpoint.scheme() != "https" {
57+
bail!("BuilderEventPublisher endpoints must use HTTPS (endpoint {endpoint} is invalid)");
58+
}
59+
}
60+
Ok(Self {
61+
client: reqwest::ClientBuilder::new().timeout(http_timeout).build().unwrap(),
62+
endpoints,
63+
})
5364
}
5465

55-
pub fn new_from_env() -> eyre::Result<Option<Self>> {
66+
pub fn new_from_env() -> Result<Option<Self>> {
67+
let http_timeout = match load_optional_env_var(HTTP_TIMEOUT_SECONDS_ENV) {
68+
Some(timeout_str) => Duration::from_secs(timeout_str.parse::<u64>()?),
69+
None => Duration::from_secs(HTTP_TIMEOUT_SECONDS_DEFAULT),
70+
};
71+
5672
load_optional_env_var(BUILDER_URLS_ENV)
5773
.map(|joined| {
5874
let endpoints = joined
@@ -62,9 +78,9 @@ impl BuilderEventPublisher {
6278
let url = base.trim().parse::<Url>()?.join(BUILDER_EVENTS_PATH)?;
6379
Ok(url)
6480
})
65-
.collect::<eyre::Result<Vec<_>>>()?;
81+
.collect::<Result<Vec<_>>>()?;
6682

67-
Ok(Self::new(endpoints))
83+
Self::new(endpoints, http_timeout)
6884
})
6985
.transpose()
7086
}

0 commit comments

Comments
 (0)