Skip to content

Commit 72941ec

Browse files
committed
Refactored read_chunked_body_with_max, added http_timeout to Lido key retrieval
1 parent fdc3cc3 commit 72941ec

14 files changed

Lines changed: 137 additions & 171 deletions

File tree

Cargo.lock

Lines changed: 0 additions & 42 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,11 +55,9 @@ prost = "0.13.4"
5555
rand = { version = "0.9", features = ["os_rng"] }
5656
rayon = "1.10.0"
5757
reqwest = { version = "0.12.4", features = ["json", "stream"] }
58-
scopeguard = "1.2.0"
5958
serde = { version = "1.0.202", features = ["derive"] }
6059
serde_json = "1.0.117"
6160
serde_yaml = "0.9.33"
62-
serial_test = "3.2.0"
6361
sha2 = "0.10.8"
6462
ssz_types = "0.10"
6563
tempfile = "3.20.0"

crates/common/Cargo.toml

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ reqwest.workspace = true
3232
serde.workspace = true
3333
serde_json.workspace = true
3434
serde_yaml.workspace = true
35-
serial_test.workspace = true
3635
sha2.workspace = true
3736
ssz_types.workspace = true
3837
thiserror.workspace = true
@@ -46,6 +45,3 @@ tree_hash.workspace = true
4645
tree_hash_derive.workspace = true
4746
unicode-normalization.workspace = true
4847
url.workspace = true
49-
50-
[dev-dependencies]
51-
scopeguard.workspace = true

crates/common/src/config/constants.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ pub const PROXY_DIR_SECRETS_DEFAULT: &str = "/proxy_secrets";
7878
pub const HTTP_TIMEOUT_SECONDS_DEFAULT: u64 = 10;
7979

8080
/// Max content length for Muxer HTTP responses, in bytes
81-
pub const MUXER_HTTP_MAX_LENGTH: u64 = 1024 * 1024 * 1024 * 10; // 10 MiB
81+
pub const MUXER_HTTP_MAX_LENGTH: usize = 1024 * 1024 * 10; // 10 MiB
8282

8383
///////////////////////// MODULES /////////////////////////
8484

crates/common/src/config/mux.rs

Lines changed: 57 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,12 @@ use std::{
88
use alloy::{
99
primitives::{address, Address, U256},
1010
providers::ProviderBuilder,
11-
rpc::types::beacon::BlsPublicKey,
11+
rpc::{client::RpcClient, types::beacon::BlsPublicKey},
1212
sol,
13+
transports::http::Http,
1314
};
1415
use eyre::{bail, ensure, Context};
16+
use reqwest::Client;
1517
use serde::{Deserialize, Serialize};
1618
use tracing::{debug, info, warn};
1719
use url::Url;
@@ -193,8 +195,8 @@ impl MuxKeysLoader {
193195
}
194196
let client = reqwest::ClientBuilder::new().timeout(http_timeout).build()?;
195197
let response = client.get(url).send().await?;
196-
let pubkeys = safe_read_http_response(response).await?;
197-
serde_json::from_str(&pubkeys)
198+
let pubkey_bytes = safe_read_http_response(response).await?;
199+
serde_json::from_slice(&pubkey_bytes)
198200
.wrap_err("failed to fetch mux keys from HTTP endpoint")
199201
}
200202

@@ -204,7 +206,13 @@ impl MuxKeysLoader {
204206
bail!("Lido registry requires RPC URL to be set in the PBS config");
205207
};
206208

207-
fetch_lido_registry_keys(rpc_url, chain, U256::from(*node_operator_id)).await
209+
fetch_lido_registry_keys(
210+
rpc_url,
211+
chain,
212+
U256::from(*node_operator_id),
213+
http_timeout,
214+
)
215+
.await
208216
}
209217
NORegistry::SSV => {
210218
fetch_ssv_pubkeys(chain, U256::from(*node_operator_id), http_timeout).await
@@ -254,10 +262,17 @@ async fn fetch_lido_registry_keys(
254262
rpc_url: Url,
255263
chain: Chain,
256264
node_operator_id: U256,
265+
http_timeout: Duration,
257266
) -> eyre::Result<Vec<BlsPublicKey>> {
258267
debug!(?chain, %node_operator_id, "loading operator keys from Lido registry");
259268

260-
let provider = ProviderBuilder::new().on_http(rpc_url);
269+
// Create an RPC provider with HTTP timeout support
270+
let client = Client::builder().timeout(http_timeout).build()?;
271+
let http = Http::with_client(client, rpc_url);
272+
let is_local = http.guess_local();
273+
let rpc_client = RpcClient::new(http, is_local);
274+
let provider = ProviderBuilder::new().on_client(rpc_client);
275+
261276
let registry_address = lido_registry_address(chain)?;
262277
let registry = LidoRegistry::new(registry_address, provider);
263278

@@ -362,9 +377,8 @@ async fn fetch_ssv_pubkeys_from_url(
362377
})?;
363378

364379
// Parse the response as JSON
365-
let body_string = safe_read_http_response(response).await?;
366-
serde_json::from_slice::<SSVResponse>(body_string.as_bytes())
367-
.wrap_err("failed to parse SSV response")
380+
let body_bytes = safe_read_http_response(response).await?;
381+
serde_json::from_slice::<SSVResponse>(&body_bytes).wrap_err("failed to parse SSV response")
368382
}
369383

370384
#[derive(Deserialize)]
@@ -386,19 +400,17 @@ struct SSVPagination {
386400

387401
#[cfg(test)]
388402
mod tests {
389-
use std::{env, net::SocketAddr};
403+
use std::net::SocketAddr;
390404

391405
use alloy::{hex::FromHex, primitives::U256, providers::ProviderBuilder};
392406
use axum::{response::Response, routing::get};
393-
use scopeguard::defer;
394-
use serial_test::serial;
395407
use tokio::{net::TcpListener, task::JoinHandle};
396408
use url::Url;
397409

398410
use super::*;
399-
use crate::config::{
400-
CB_TEST_HTTP_DISABLE_CONTENT_LENGTH_ENV, HTTP_TIMEOUT_SECONDS_DEFAULT,
401-
MUXER_HTTP_MAX_LENGTH,
411+
use crate::{
412+
config::{HTTP_TIMEOUT_SECONDS_DEFAULT, MUXER_HTTP_MAX_LENGTH},
413+
utils::{set_ignore_content_length, ResponseReadError},
402414
};
403415

404416
const TEST_HTTP_TIMEOUT: u64 = 2;
@@ -471,25 +483,28 @@ mod tests {
471483
}
472484

473485
#[tokio::test]
474-
#[serial]
475486
/// Tests that the SSV network fetch is handled properly when the response's
476487
/// body is too large
477488
async fn test_ssv_network_fetch_big_data() -> eyre::Result<()> {
478489
// Start the mock server
479490
let port = 30101;
480-
env::remove_var(CB_TEST_HTTP_DISABLE_CONTENT_LENGTH_ENV);
481491
let _server_handle = create_mock_server(port).await?;
482492
let url = format!("http://localhost:{port}/big_data");
483493
let response = fetch_ssv_pubkeys_from_url(&url, Duration::from_secs(120)).await;
484494

485495
// The response should fail due to content length being too big
486-
assert!(response.is_err(), "Expected error due to big content length, but got success");
487-
if let Err(e) = response {
488-
assert!(
489-
e.to_string().contains("content length") &&
490-
e.to_string().contains("exceeds the maximum allowed length"),
491-
"Expected content length error, got: {e}",
492-
);
496+
match response {
497+
Ok(_) => {
498+
panic!("Expected an error due to big content length, but got a successful response")
499+
}
500+
Err(e) => match e.downcast_ref::<ResponseReadError>() {
501+
Some(ResponseReadError::PayloadTooLarge { max, content_length, raw }) => {
502+
assert_eq!(*max, MUXER_HTTP_MAX_LENGTH);
503+
assert!(*content_length > MUXER_HTTP_MAX_LENGTH);
504+
assert!(raw.is_empty());
505+
}
506+
_ => panic!("Expected PayloadTooLarge error, got: {}", e),
507+
},
493508
}
494509

495510
// Clean up the server handle
@@ -522,25 +537,29 @@ mod tests {
522537
}
523538

524539
#[tokio::test]
525-
#[serial]
526540
/// Tests that the SSV network fetch is handled properly when the response's
527541
/// content-length header is missing
528542
async fn test_ssv_network_fetch_big_data_without_content_length() -> eyre::Result<()> {
529543
// Start the mock server
530544
let port = 30103;
531-
env::set_var(CB_TEST_HTTP_DISABLE_CONTENT_LENGTH_ENV, "1");
532-
defer! { env::remove_var(CB_TEST_HTTP_DISABLE_CONTENT_LENGTH_ENV); }
545+
set_ignore_content_length(true);
533546
let _server_handle = create_mock_server(port).await?;
534547
let url = format!("http://localhost:{port}/big_data");
535548
let response = fetch_ssv_pubkeys_from_url(&url, Duration::from_secs(120)).await;
536549

537-
// The response should fail due to timeout
538-
assert!(response.is_err(), "Expected error due to body size, but got success");
539-
if let Err(e) = response {
540-
assert!(
541-
e.to_string().contains("Response body exceeds the maximum allowed length "),
542-
"Expected content length error, got: {e}",
543-
);
550+
// The response should fail due to the body being too big
551+
match response {
552+
Ok(_) => {
553+
panic!("Expected an error due to excessive data, but got a successful response")
554+
}
555+
Err(e) => match e.downcast_ref::<ResponseReadError>() {
556+
Some(ResponseReadError::PayloadTooLarge { max, content_length, raw }) => {
557+
assert_eq!(*max, MUXER_HTTP_MAX_LENGTH);
558+
assert_eq!(*content_length, 0);
559+
assert!(!raw.is_empty());
560+
}
561+
_ => panic!("Expected PayloadTooLarge error, got: {}", e),
562+
},
544563
}
545564

546565
// Clean up the server handle
@@ -585,10 +604,12 @@ mod tests {
585604
.unwrap()
586605
}
587606

588-
/// Sends a response with a large body but no content length
607+
/// Sends a response with a large body - larger than the maximum allowed.
608+
/// Note that hyper overwrites the content-length header automatically, so
609+
/// setting it here wouldn't actually change the value that ultimately
610+
/// gets sent to the server.
589611
async fn handle_big_data() -> Response {
590-
// Create a response with a large body but no content length
591-
let body = "f".repeat(2 * MUXER_HTTP_MAX_LENGTH as usize);
612+
let body = "f".repeat(2 * MUXER_HTTP_MAX_LENGTH);
592613
Response::builder()
593614
.status(200)
594615
.header("Content-Type", "application/text")

crates/common/src/config/utils.rs

Lines changed: 26 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,11 @@
1-
use std::{collections::HashMap, env, path::Path};
1+
use std::{collections::HashMap, path::Path};
22

33
use alloy::rpc::types::beacon::BlsPublicKey;
44
use eyre::{bail, Context, Result};
55
use serde::de::DeserializeOwned;
66

77
use super::JWTS_ENV;
8-
use crate::{
9-
config::MUXER_HTTP_MAX_LENGTH,
10-
types::ModuleId,
11-
utils::{read_chunked_body_with_max, ResponseReadError},
12-
};
13-
14-
pub const CB_TEST_HTTP_DISABLE_CONTENT_LENGTH_ENV: &str = "CB_TEST_HTTP_DISABLE_CONTENT_LENGTH";
8+
use crate::{config::MUXER_HTTP_MAX_LENGTH, types::ModuleId, utils::read_chunked_body_with_max};
159

1610
pub fn load_env_var(env: &str) -> Result<String> {
1711
std::env::var(env).wrap_err(format!("{env} is not set"))
@@ -39,40 +33,33 @@ pub fn load_jwt_secrets() -> Result<HashMap<ModuleId, String>> {
3933

4034
/// Reads an HTTP response safely, erroring out if it failed or if the body is
4135
/// too large.
42-
pub async fn safe_read_http_response(response: reqwest::Response) -> Result<String> {
43-
// Get the content length from the response headers
44-
let mut content_length = response.content_length();
45-
if env::var(CB_TEST_HTTP_DISABLE_CONTENT_LENGTH_ENV).is_ok() {
46-
content_length = None;
47-
}
48-
49-
// Break if content length is provided but it's too big
50-
if let Some(length) = content_length {
51-
if length > MUXER_HTTP_MAX_LENGTH {
52-
bail!("Response content length ({length}) exceeds the maximum allowed length ({MUXER_HTTP_MAX_LENGTH} bytes)");
53-
}
54-
}
36+
pub async fn safe_read_http_response(response: reqwest::Response) -> Result<Vec<u8>> {
37+
// Read the response to a buffer in chunks
38+
let status_code = response.status();
39+
let response_bytes = read_chunked_body_with_max(response, MUXER_HTTP_MAX_LENGTH)
40+
.await
41+
.map_err(|e| eyre::Report::new(e));
5542

5643
// Make sure the response is a 200
57-
if response.status() != reqwest::StatusCode::OK {
58-
bail!("Request failed with status: {}", response.status());
59-
}
60-
61-
// Read the response to a buffer in chunks
62-
let result = read_chunked_body_with_max(response, MUXER_HTTP_MAX_LENGTH as usize).await;
63-
let bytes = match result {
64-
Ok(bytes) => Ok(bytes),
65-
Err(ResponseReadError::PayloadTooLarge { max: _, raw: _ }) => bail!(
66-
"Response body exceeds the maximum allowed length ({MUXER_HTTP_MAX_LENGTH} bytes)"
67-
),
68-
Err(ResponseReadError::ChunkError { inner }) => Err(inner),
69-
}?;
70-
71-
// Convert the buffer to a string
72-
match std::str::from_utf8(&bytes) {
73-
Ok(s) => Ok(s.to_string()),
74-
Err(e) => bail!("Failed to decode response body as UTF-8: {e}"),
44+
if status_code != reqwest::StatusCode::OK {
45+
match response_bytes {
46+
Ok(bytes) => {
47+
bail!(
48+
"Request failed with status: {}, body: {}",
49+
status_code,
50+
String::from_utf8_lossy(&bytes)
51+
);
52+
}
53+
Err(e) => {
54+
bail!(
55+
"Request failed with status: {} but decoding the response body failed: {}",
56+
status_code,
57+
e
58+
);
59+
}
60+
}
7561
}
62+
response_bytes
7663
}
7764

7865
/// Removes duplicate entries from a vector of BlsPublicKey

0 commit comments

Comments
 (0)