Skip to content

Commit 4664fdd

Browse files
committed
wrap calls to read_chunked_body_with_max with safe_read_http_response
1 parent 90ca2ea commit 4664fdd

9 files changed

Lines changed: 98 additions & 93 deletions

File tree

crates/common/src/config/mux.rs

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use crate::{
2525
pbs::RelayClient,
2626
types::{BlsPublicKey, Chain},
2727
utils::default_bool,
28-
wire::read_chunked_body_with_max,
28+
wire::safe_read_http_response,
2929
};
3030

3131
#[derive(Debug, Clone, Deserialize, Serialize)]
@@ -236,18 +236,11 @@ impl MuxKeysLoader {
236236
"Mux keys URL {url} is insecure; consider using HTTPS if possible instead"
237237
);
238238
}
239+
let url = url.as_str();
239240
let client = reqwest::ClientBuilder::new().timeout(http_timeout).build()?;
240241
let response = client.get(url).send().await?;
241-
let status = response.status();
242-
let pubkey_bytes = read_chunked_body_with_max(response, MUXER_HTTP_MAX_LENGTH)
243-
.await
244-
.wrap_err("Failed to read response body")?;
245-
if !status.is_success() {
246-
bail!(
247-
"Request failed with status: {status}, body: {}",
248-
String::from_utf8_lossy(&pubkey_bytes)
249-
);
250-
}
242+
let pubkey_bytes =
243+
safe_read_http_response(response, MUXER_HTTP_MAX_LENGTH, url).await?;
251244
serde_json::from_slice(&pubkey_bytes)
252245
.wrap_err("failed to fetch mux keys from HTTP endpoint")
253246
}
Lines changed: 6 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,22 @@
11
use std::time::Duration;
22

33
use alloy::primitives::U256;
4-
use eyre::{Context, bail};
4+
use eyre::Context;
55
use serde_json::json;
66
use url::Url;
77

88
use crate::{
99
config::MUXER_HTTP_MAX_LENGTH,
1010
interop::ssv::types::{SSVNodeResponse, SSVPublicResponse},
11-
wire::read_chunked_body_with_max,
11+
wire::safe_read_http_response,
1212
};
1313

1414
pub async fn request_ssv_pubkeys_from_ssv_node(
1515
url: Url,
1616
node_operator_id: U256,
1717
http_timeout: Duration,
1818
) -> eyre::Result<SSVNodeResponse> {
19+
let url = url.as_str();
1920
let client = reqwest::ClientBuilder::new().timeout(http_timeout).build()?;
2021
let body = json!({
2122
"operators": [node_operator_id]
@@ -29,23 +30,15 @@ pub async fn request_ssv_pubkeys_from_ssv_node(
2930
})?;
3031

3132
// Parse the response as JSON
32-
let status = response.status();
33-
let body_bytes = read_chunked_body_with_max(response, MUXER_HTTP_MAX_LENGTH)
34-
.await
35-
.wrap_err("Failed to read response body")?;
36-
if !status.is_success() {
37-
bail!(
38-
"Request failed with status: {status}, body: {}",
39-
String::from_utf8_lossy(&body_bytes)
40-
);
41-
}
33+
let body_bytes = safe_read_http_response(response, MUXER_HTTP_MAX_LENGTH, url).await?;
4234
serde_json::from_slice::<SSVNodeResponse>(&body_bytes).wrap_err("failed to parse SSV response")
4335
}
4436

4537
pub async fn request_ssv_pubkeys_from_public_api(
4638
url: Url,
4739
http_timeout: Duration,
4840
) -> eyre::Result<SSVPublicResponse> {
41+
let url = url.as_str();
4942
let client = reqwest::ClientBuilder::new().timeout(http_timeout).build()?;
5043
let response = client.get(url).send().await.map_err(|e| {
5144
if e.is_timeout() {
@@ -56,16 +49,7 @@ pub async fn request_ssv_pubkeys_from_public_api(
5649
})?;
5750

5851
// Parse the response as JSON
59-
let status = response.status();
60-
let body_bytes = read_chunked_body_with_max(response, MUXER_HTTP_MAX_LENGTH)
61-
.await
62-
.wrap_err("Failed to read response body")?;
63-
if !status.is_success() {
64-
bail!(
65-
"Request failed with status: {status}, body: {}",
66-
String::from_utf8_lossy(&body_bytes)
67-
);
68-
}
52+
let body_bytes = safe_read_http_response(response, MUXER_HTTP_MAX_LENGTH, url).await?;
6953
serde_json::from_slice::<SSVPublicResponse>(&body_bytes)
7054
.wrap_err("failed to parse SSV response")
7155
}

crates/common/src/pbs/error.rs

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -39,24 +39,27 @@ impl PbsError {
3939
matches!(self, PbsError::Reqwest(err) if err.is_timeout())
4040
}
4141

42+
/// Extract the HTTP status code from relay-originated errors.
43+
fn relay_status_code(&self) -> Option<u16> {
44+
match self {
45+
PbsError::RelayResponse { code, .. } => Some(*code),
46+
PbsError::ReadResponse(ResponseReadError::NonSuccess { status_code, .. }) => {
47+
Some(*status_code)
48+
}
49+
_ => None,
50+
}
51+
}
52+
4253
/// Whether the error is retryable in requests to relays
4354
pub fn should_retry(&self) -> bool {
4455
match self {
45-
PbsError::Reqwest(err) => {
46-
// Retry on timeout or connection error
47-
err.is_timeout() || err.is_connect()
48-
}
49-
PbsError::RelayResponse { code, .. } => match *code {
50-
500..509 => true, // Retry on server errors
51-
400 | 429 => false, // Do not retry if rate limited or bad request
52-
_ => false,
53-
},
54-
_ => false,
56+
PbsError::Reqwest(err) => err.is_timeout() || err.is_connect(),
57+
_ => matches!(self.relay_status_code(), Some(500..=508)),
5558
}
5659
}
5760

5861
pub fn is_not_found(&self) -> bool {
59-
matches!(&self, PbsError::RelayResponse { code: 404, .. })
62+
self.relay_status_code() == Some(404)
6063
}
6164
}
6265

crates/common/src/wire.rs

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,18 @@ pub const CONSENSUS_VERSION_HEADER: &str = "Eth-Consensus-Version";
2323

2424
#[derive(Debug, Error)]
2525
pub enum ResponseReadError {
26-
#[error("response size exceeds max size; max: {max}, content_length: {content_length}")]
27-
PayloadTooLarge { max: usize, content_length: usize },
26+
#[error(
27+
"response size exceeds max size; max: {max}, content_length: {content_length}, request_url: {request_url}, request_id: {request_id}"
28+
)]
29+
PayloadTooLarge { max: usize, content_length: usize, request_url: String, request_id: String },
2830

2931
#[error("error reading response stream: {0}")]
3032
ReqwestError(#[from] reqwest::Error),
33+
34+
#[error(
35+
"request failed with status: {status_code}, request_url: {request_url}, request_id: {request_id}, body: {error_msg}"
36+
)]
37+
NonSuccess { status_code: u16, error_msg: String, request_url: String, request_id: String },
3138
}
3239

3340
#[cfg(feature = "testing-flags")]
@@ -51,6 +58,8 @@ fn should_ignore_content_length() -> bool {
5158
pub async fn read_chunked_body_with_max(
5259
res: Response,
5360
max_size: usize,
61+
request_id: &str,
62+
request_url: &str,
5463
) -> Result<Vec<u8>, ResponseReadError> {
5564
// Get the content length from the response headers
5665
#[cfg(not(feature = "testing-flags"))]
@@ -72,6 +81,8 @@ pub async fn read_chunked_body_with_max(
7281
return Err(ResponseReadError::PayloadTooLarge {
7382
max: max_size,
7483
content_length: length as usize,
84+
request_url: request_url.to_string(),
85+
request_id: request_id.to_string(),
7586
});
7687
}
7788

@@ -86,6 +97,8 @@ pub async fn read_chunked_body_with_max(
8697
return Err(ResponseReadError::PayloadTooLarge {
8798
max: max_size,
8899
content_length: content_length.unwrap_or(0) as usize,
100+
request_url: request_url.to_string(),
101+
request_id: request_id.to_string(),
89102
});
90103
}
91104

@@ -95,6 +108,28 @@ pub async fn read_chunked_body_with_max(
95108
Ok(response_bytes)
96109
}
97110

111+
/// Reads an HTTP response body with a size limit, erroring on non-success
112+
/// status or read failure.
113+
pub async fn safe_read_http_response(
114+
response: reqwest::Response,
115+
max_size: usize,
116+
request_id: &str,
117+
) -> Result<Vec<u8>, ResponseReadError> {
118+
let status_code = response.status();
119+
let request_url = response.url().to_string();
120+
let body = read_chunked_body_with_max(response, max_size, request_id, &request_url).await?;
121+
if status_code.is_success() {
122+
Ok(body)
123+
} else {
124+
Err(ResponseReadError::NonSuccess {
125+
status_code: status_code.as_u16(),
126+
error_msg: String::from_utf8_lossy(&body).into_owned(),
127+
request_url: request_url.to_string(),
128+
request_id: request_id.to_string(),
129+
})
130+
}
131+
}
132+
98133
/// Returns the user agent from the request headers or an empty string if not
99134
/// present
100135
pub fn get_user_agent(req_headers: &HeaderMap) -> String {

crates/pbs/src/mev_boost/get_header.rs

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use cb_common::{
1919
signature::verify_signed_message,
2020
types::{BlsPublicKey, BlsPublicKeyBytes, BlsSignature, Chain},
2121
utils::{ms_into_slot, timestamp_of_slot_start_sec, utcnow_ms},
22-
wire::{get_user_agent_with_version, read_chunked_body_with_max},
22+
wire::{get_user_agent_with_version, safe_read_http_response},
2323
};
2424
use futures::future::join_all;
2525
use parking_lot::RwLock;
@@ -351,14 +351,9 @@ async fn send_one_get_header(
351351
let code = res.status();
352352
RELAY_STATUS_CODE.with_label_values(&[code.as_str(), GET_HEADER_ENDPOINT_TAG, &relay.id]).inc();
353353

354-
let response_bytes = read_chunked_body_with_max(res, MAX_SIZE_GET_HEADER_RESPONSE).await?;
354+
let response_bytes =
355+
safe_read_http_response(res, MAX_SIZE_GET_HEADER_RESPONSE, relay.id.as_ref()).await?;
355356
let header_size_bytes = response_bytes.len();
356-
if !code.is_success() {
357-
return Err(PbsError::RelayResponse {
358-
error_msg: String::from_utf8_lossy(&response_bytes).into_owned(),
359-
code: code.as_u16(),
360-
});
361-
};
362357
if code == StatusCode::NO_CONTENT {
363358
debug!(
364359
relay_id = relay.id.as_ref(),

crates/pbs/src/mev_boost/register_validator.rs

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use axum::http::{HeaderMap, HeaderValue};
55
use cb_common::{
66
pbs::{HEADER_START_TIME_UNIX_MS, RelayClient, error::PbsError},
77
utils::utcnow_ms,
8-
wire::{get_user_agent_with_version, read_chunked_body_with_max},
8+
wire::{get_user_agent_with_version, safe_read_http_response},
99
};
1010
use eyre::bail;
1111
use futures::{
@@ -187,17 +187,9 @@ async fn send_register_validator(
187187
.with_label_values(&[code.as_str(), REGISTER_VALIDATOR_ENDPOINT_TAG, &relay.id])
188188
.inc();
189189

190-
if !code.is_success() {
191-
let response_bytes = read_chunked_body_with_max(res, MAX_SIZE_DEFAULT).await?;
192-
let err = PbsError::RelayResponse {
193-
error_msg: String::from_utf8_lossy(&response_bytes).into_owned(),
194-
code: code.as_u16(),
195-
};
196-
197-
// error here since we check if any success above
198-
error!(relay_id = relay.id.as_ref(), retry, %err, "failed registration");
199-
return Err(err);
200-
};
190+
safe_read_http_response(res, MAX_SIZE_DEFAULT, relay.id.as_ref()).await.inspect_err(|e| {
191+
error!(relay_id = relay.id.as_ref(), retry, %e, "failed registration");
192+
})?;
201193

202194
debug!(
203195
relay_id = relay.id.as_ref(),

crates/pbs/src/mev_boost/status.rs

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::time::{Duration, Instant};
33
use axum::http::HeaderMap;
44
use cb_common::{
55
pbs::{RelayClient, error::PbsError},
6-
wire::{get_user_agent_with_version, read_chunked_body_with_max},
6+
wire::{get_user_agent_with_version, safe_read_http_response},
77
};
88
use futures::future::select_ok;
99
use reqwest::header::USER_AGENT;
@@ -73,16 +73,9 @@ async fn send_relay_check(relay: &RelayClient, headers: HeaderMap) -> Result<(),
7373
let code = res.status();
7474
RELAY_STATUS_CODE.with_label_values(&[code.as_str(), STATUS_ENDPOINT_TAG, &relay.id]).inc();
7575

76-
if !code.is_success() {
77-
let response_bytes = read_chunked_body_with_max(res, MAX_SIZE_DEFAULT).await?;
78-
let err = PbsError::RelayResponse {
79-
error_msg: String::from_utf8_lossy(&response_bytes).into_owned(),
80-
code: code.as_u16(),
81-
};
82-
83-
error!(relay_id = relay.id.as_ref(),%err, "status failed");
84-
return Err(err);
85-
};
76+
safe_read_http_response(res, MAX_SIZE_DEFAULT, relay.id.as_ref()).await.inspect_err(|e| {
77+
error!(relay_id = relay.id.as_ref(), %e, "status failed");
78+
})?;
8679

8780
debug!(relay_id = relay.id.as_ref(),?code, latency = ?request_latency, "status passed");
8881

crates/pbs/src/mev_boost/submit_block.rs

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use cb_common::{
1414
error::{PbsError, ValidationError},
1515
},
1616
utils::utcnow_ms,
17-
wire::{get_user_agent_with_version, read_chunked_body_with_max},
17+
wire::{get_user_agent_with_version, safe_read_http_response},
1818
};
1919
use futures::{FutureExt, future::select_ok};
2020
use reqwest::header::USER_AGENT;
@@ -201,17 +201,11 @@ async fn send_submit_block(
201201
.with_label_values(&[code.as_str(), SUBMIT_BLINDED_BLOCK_ENDPOINT_TAG, &relay.id])
202202
.inc();
203203

204-
let response_bytes = read_chunked_body_with_max(res, MAX_SIZE_SUBMIT_BLOCK_RESPONSE).await?;
205-
if !code.is_success() {
206-
let err = PbsError::RelayResponse {
207-
error_msg: String::from_utf8_lossy(&response_bytes).into_owned(),
208-
code: code.as_u16(),
209-
};
210-
211-
// we requested the payload from all relays, but some may have not received it
212-
warn!(relay_id = relay.id.as_ref(), retry, %err, "failed to get payload (this might be ok if other relays have it)");
213-
return Err(err);
214-
};
204+
let response_bytes = safe_read_http_response(res, MAX_SIZE_SUBMIT_BLOCK_RESPONSE, relay.id.as_ref())
205+
.await
206+
.inspect_err(|e| {
207+
warn!(relay_id = relay.id.as_ref(), retry, %e, "failed to get payload (this might be ok if other relays have it)");
208+
})?;
215209

216210
if api_version != &BuilderApiVersion::V1 {
217211
// v2 response is going to be empty, so just break here

tests/tests/pbs_mux.rs

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,17 +78,25 @@ async fn test_ssv_network_fetch_big_data() -> Result<()> {
7878
let server_handle =
7979
cb_tests::mock_ssv_public::create_mock_public_ssv_server(port, None).await?;
8080
let url = Url::parse(&format!("http://localhost:{port}/big_data")).unwrap();
81-
let response = request_ssv_pubkeys_from_public_api(url, Duration::from_secs(120)).await;
81+
let response = request_ssv_pubkeys_from_public_api(url.clone(), Duration::from_secs(120)).await;
8282

8383
// The response should fail due to content length being too big
8484
match response {
8585
Ok(_) => {
8686
panic!("Expected an error due to big content length, but got a successful response")
8787
}
8888
Err(e) => match e.downcast_ref::<ResponseReadError>() {
89-
Some(ResponseReadError::PayloadTooLarge { max, content_length }) => {
89+
Some(ResponseReadError::PayloadTooLarge {
90+
max,
91+
content_length,
92+
request_url,
93+
request_id,
94+
}) => {
9095
assert_eq!(*max, MUXER_HTTP_MAX_LENGTH);
9196
assert!(*content_length > MUXER_HTTP_MAX_LENGTH);
97+
assert_eq!(url.as_str(), request_url.as_str());
98+
// url used as request id
99+
assert_eq!(request_id.as_str(), request_url.as_str());
92100
}
93101
_ => panic!("Expected PayloadTooLarge error, got: {}", e),
94102
},
@@ -138,17 +146,25 @@ async fn test_ssv_network_fetch_big_data_without_content_length() -> Result<()>
138146
set_ignore_content_length(true);
139147
let server_handle = create_mock_public_ssv_server(port, None).await?;
140148
let url = Url::parse(&format!("http://localhost:{port}/big_data")).unwrap();
141-
let response = request_ssv_pubkeys_from_public_api(url, Duration::from_secs(120)).await;
149+
let response = request_ssv_pubkeys_from_public_api(url.clone(), Duration::from_secs(120)).await;
142150

143151
// The response should fail due to the body being too big
144152
match response {
145153
Ok(_) => {
146154
panic!("Expected an error due to excessive data, but got a successful response")
147155
}
148156
Err(e) => match e.downcast_ref::<ResponseReadError>() {
149-
Some(ResponseReadError::PayloadTooLarge { max, content_length }) => {
157+
Some(ResponseReadError::PayloadTooLarge {
158+
max,
159+
content_length,
160+
request_url,
161+
request_id,
162+
}) => {
150163
assert_eq!(*max, MUXER_HTTP_MAX_LENGTH);
151164
assert_eq!(*content_length, 0);
165+
assert_eq!(url.as_str(), request_url.as_str());
166+
// url used as request id
167+
assert_eq!(request_id.as_str(), request_url.as_str());
152168
}
153169
_ => panic!("Expected PayloadTooLarge error, got: {}", e),
154170
},

0 commit comments

Comments
 (0)