Skip to content

Commit 9959196

Browse files
committed
Moved read_chunked_body_with_max to cb-common
1 parent 9709e2a commit 9959196

5 files changed

Lines changed: 69 additions & 32 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/common/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ ethereum_serde_utils.workspace = true
2323
ethereum_ssz.workspace = true
2424
ethereum_ssz_derive.workspace = true
2525
eyre.workspace = true
26+
futures.workspace = true
2627
jsonwebtoken.workspace = true
2728
pbkdf2.workspace = true
2829
rand.workspace = true

crates/common/src/config/utils.rs

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
use std::{collections::HashMap, env, path::Path};
22

33
use alloy::rpc::types::beacon::BlsPublicKey;
4-
use bytes::{BufMut, BytesMut};
54
use eyre::{bail, Context, Result};
65
use serde::de::DeserializeOwned;
76

87
use super::JWTS_ENV;
9-
use crate::{config::MUXER_HTTP_MAX_LENGTH, types::ModuleId};
8+
use crate::{
9+
config::MUXER_HTTP_MAX_LENGTH,
10+
types::ModuleId,
11+
utils::{read_chunked_body_with_max, ResponseReadError},
12+
};
1013

1114
pub const CB_TEST_HTTP_DISABLE_CONTENT_LENGTH_ENV: &str = "CB_TEST_HTTP_DISABLE_CONTENT_LENGTH";
1215

@@ -36,7 +39,7 @@ pub fn load_jwt_secrets() -> Result<HashMap<ModuleId, String>> {
3639

3740
/// Reads an HTTP response safely, erroring out if it failed or if the body is
3841
/// too large.
39-
pub async fn safe_read_http_response(mut response: reqwest::Response) -> Result<String> {
42+
pub async fn safe_read_http_response(response: reqwest::Response) -> Result<String> {
4043
// Get the content length from the response headers
4144
let mut content_length = response.content_length();
4245
if env::var(CB_TEST_HTTP_DISABLE_CONTENT_LENGTH_ENV).is_ok() {
@@ -56,18 +59,16 @@ pub async fn safe_read_http_response(mut response: reqwest::Response) -> Result<
5659
}
5760

5861
// Read the response to a buffer in chunks
59-
let mut buffer = BytesMut::with_capacity(1024);
60-
while let Some(chunk) = response.chunk().await? {
61-
if buffer.len() > MUXER_HTTP_MAX_LENGTH as usize {
62-
bail!(
63-
"Response body exceeds the maximum allowed length ({MUXER_HTTP_MAX_LENGTH} bytes)"
64-
);
65-
}
66-
buffer.put(chunk);
67-
}
62+
let result = read_chunked_body_with_max(response, MUXER_HTTP_MAX_LENGTH as usize).await;
63+
let bytes = match result {
64+
Ok(bytes) => Ok(bytes),
65+
Err(ResponseReadError::PayloadTooLarge { max: _, raw: _ }) => bail!(
66+
"Response body exceeds the maximum allowed length ({MUXER_HTTP_MAX_LENGTH} bytes)"
67+
),
68+
Err(ResponseReadError::ChunkError { inner }) => Err(inner),
69+
}?;
6870

6971
// Convert the buffer to a string
70-
let bytes = buffer.freeze();
7172
match std::str::from_utf8(&bytes) {
7273
Ok(s) => Ok(s.to_string()),
7374
Err(e) => bail!("Failed to decode response body as UTF-8: {e}"),

crates/common/src/utils.rs

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,13 @@ use alloy::{
99
};
1010
use axum::http::HeaderValue;
1111
use blst::min_pk::{PublicKey, Signature};
12+
use futures::StreamExt;
1213
use rand::{distr::Alphanumeric, Rng};
13-
use reqwest::header::HeaderMap;
14+
use reqwest::{header::HeaderMap, Error as ReqwestError, Response};
1415
use serde::{de::DeserializeOwned, Serialize};
1516
use serde_json::Value;
1617
use ssz::{Decode, Encode};
18+
use thiserror::Error;
1719
use tracing::Level;
1820
use tracing_appender::{non_blocking::WorkerGuard, rolling::Rotation};
1921
use tracing_subscriber::{
@@ -31,6 +33,46 @@ use crate::{
3133

3234
const MILLIS_PER_SECOND: u64 = 1_000;
3335

36+
#[derive(Debug, Error)]
37+
pub enum ResponseReadError {
38+
#[error("response size exceeds max size: max: {max} raw: {raw}")]
39+
PayloadTooLarge { max: usize, raw: String },
40+
41+
#[error("error reading chunk from response: {inner:?}")]
42+
ChunkError { inner: ReqwestError },
43+
}
44+
45+
/// Reads the body of a response as a chunked stream, ensuring the size does not
46+
/// exceed `max_size`.
47+
pub async fn read_chunked_body_with_max(
48+
res: Response,
49+
max_size: usize,
50+
) -> Result<Vec<u8>, ResponseReadError> {
51+
let mut stream = res.bytes_stream();
52+
let mut response_bytes = Vec::new();
53+
54+
while let Some(chunk) = stream.next().await {
55+
let chunk = match chunk {
56+
Ok(c) => c,
57+
Err(e) => {
58+
return Err(ResponseReadError::ChunkError { inner: e });
59+
}
60+
};
61+
if response_bytes.len() + chunk.len() > max_size {
62+
// avoid spamming logs if the message is too large
63+
response_bytes.truncate(1024);
64+
return Err(ResponseReadError::PayloadTooLarge {
65+
max: max_size,
66+
raw: String::from_utf8_lossy(&response_bytes).into_owned(),
67+
});
68+
}
69+
70+
response_bytes.extend_from_slice(&chunk);
71+
}
72+
73+
Ok(response_bytes)
74+
}
75+
3476
pub fn timestamp_of_slot_start_sec(slot: u64, chain: Chain) -> u64 {
3577
chain.genesis_time_sec() + slot * chain.slot_time_sec()
3678
}

crates/pbs/src/utils.rs

Lines changed: 10 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,21 @@
1-
use cb_common::pbs::error::PbsError;
2-
use futures::StreamExt;
1+
use cb_common::{
2+
pbs::error::PbsError,
3+
utils::{read_chunked_body_with_max as read_chunked_body_with_max_impl, ResponseReadError},
4+
};
35
use reqwest::Response;
46

57
pub async fn read_chunked_body_with_max(
68
res: Response,
79
max_size: usize,
810
) -> Result<Vec<u8>, PbsError> {
9-
let mut stream = res.bytes_stream();
10-
let mut response_bytes = Vec::new();
11-
12-
while let Some(chunk) = stream.next().await {
13-
let chunk = chunk?;
14-
if response_bytes.len() + chunk.len() > max_size {
15-
// avoid spamming logs if the message is too large
16-
response_bytes.truncate(1024);
17-
return Err(PbsError::PayloadTooLarge {
18-
max: max_size,
19-
raw: String::from_utf8_lossy(&response_bytes).into_owned(),
20-
});
11+
let result = read_chunked_body_with_max_impl(res, max_size).await;
12+
match result {
13+
Ok(bytes) => Ok(bytes),
14+
Err(ResponseReadError::PayloadTooLarge { max, raw }) => {
15+
Err(PbsError::PayloadTooLarge { max, raw })
2116
}
22-
23-
response_bytes.extend_from_slice(&chunk);
17+
Err(ResponseReadError::ChunkError { inner }) => Err(PbsError::Reqwest(inner)),
2418
}
25-
26-
Ok(response_bytes)
2719
}
2820

2921
const GAS_LIMIT_ADJUSTMENT_FACTOR: u64 = 1024;

0 commit comments

Comments
 (0)