Skip to content

Commit e99a190

Browse files
apollo_infra: add request max size tests (#13559)
1 parent c7cda8b commit e99a190

6 files changed

Lines changed: 171 additions & 17 deletions

File tree

crates/apollo_infra/src/component_definitions.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,8 @@ where
125125
pub enum ServerError {
126126
#[error("Could not deserialize client request: {0}")]
127127
RequestDeserializationFailure(String),
128+
#[error("Request body too large: {0}")]
129+
RequestBodyTooLarge(String),
128130
}
129131

130132
#[derive(Debug)]

crates/apollo_infra/src/component_server/remote_component_server.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -190,10 +190,8 @@ where
190190
match Limited::new(http_request.into_body(), max_request_body_bytes).collect().await {
191191
Ok(collected) => collected.to_bytes(),
192192
Err(err) => {
193-
error!("Failed to collect request body: {err}");
194-
let server_error = ServerError::RequestDeserializationFailure(
195-
"Request body too large".to_string(),
196-
);
193+
warn!("Request body too large: {err}");
194+
let server_error = ServerError::RequestBodyTooLarge(err.to_string());
197195
return Ok(HyperResponse::builder()
198196
.status(StatusCode::PAYLOAD_TOO_LARGE)
199197
.header(CONTENT_TYPE, APPLICATION_OCTET_STREAM)
Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
use std::convert::Infallible;
2+
3+
use apollo_proc_macros::unique_u16;
4+
use bytes::Bytes;
5+
use http::header::CONTENT_TYPE;
6+
use http::{StatusCode, Uri};
7+
use http_body_util::{BodyExt, Full};
8+
use hyper::body::Incoming;
9+
use hyper::service::service_fn;
10+
use hyper::{Request, Response};
11+
use hyper_util::client::legacy::Client;
12+
use hyper_util::rt::{TokioExecutor, TokioIo};
13+
use hyper_util::server::conn::auto::Builder as Http2ServerBuilder;
14+
use tokio::net::TcpListener;
15+
use tokio::sync::mpsc::channel;
16+
use tokio::task;
17+
18+
use crate::component_client::{ClientError, LocalComponentClient, RemoteClientConfig};
19+
use crate::component_definitions::{
20+
RequestId,
21+
RequestWrapper,
22+
ServerError,
23+
APPLICATION_OCTET_STREAM,
24+
REQUEST_ID_HEADER,
25+
};
26+
use crate::component_server::{
27+
ComponentServerStarter,
28+
LocalComponentServer,
29+
LocalServerConfig,
30+
RemoteComponentServer,
31+
RemoteServerConfig,
32+
};
33+
use crate::serde_utils::SerdeWrapper;
34+
use crate::tests::test_utils::{
35+
available_ports_factory,
36+
ComponentA,
37+
ComponentAClient,
38+
ComponentAClientTrait,
39+
ComponentARequest,
40+
ComponentAResponse,
41+
ComponentBClient,
42+
FAST_FAILING_CLIENT_CONFIG,
43+
TEST_LOCAL_CLIENT_METRICS,
44+
TEST_LOCAL_SERVER_METRICS,
45+
TEST_REMOTE_CLIENT_METRICS,
46+
TEST_REMOTE_SERVER_METRICS,
47+
};
48+
49+
/// Server rejects a request whose body exceeds `max_request_body_bytes` with 413 and
50+
/// `ServerError::RequestBodyTooLarge`.
51+
#[tokio::test]
52+
async fn request_body_too_large() {
53+
let mut available_ports = available_ports_factory(unique_u16!());
54+
let a_socket = available_ports.get_next_local_host_socket();
55+
let dummy_b_socket = available_ports.get_next_local_host_socket();
56+
57+
// B client points at a non-existent server; it will never be called because the oversized
58+
// request is rejected at the HTTP layer before any component logic runs.
59+
let b_remote_client = ComponentBClient::new(
60+
RemoteClientConfig::default(),
61+
&dummy_b_socket.ip().to_string(),
62+
dummy_b_socket.port(),
63+
&TEST_REMOTE_CLIENT_METRICS,
64+
);
65+
let component_a = ComponentA::new(Box::new(b_remote_client));
66+
67+
let (tx_a, rx_a) = channel::<RequestWrapper<ComponentARequest, ComponentAResponse>>(32);
68+
let a_local_client = LocalComponentClient::new(tx_a, &TEST_LOCAL_CLIENT_METRICS);
69+
70+
let mut local_server = LocalComponentServer::new(
71+
component_a,
72+
&LocalServerConfig::default(),
73+
rx_a,
74+
&TEST_LOCAL_SERVER_METRICS,
75+
);
76+
task::spawn(async move {
77+
let _ = local_server.start().await;
78+
});
79+
80+
let server_config = RemoteServerConfig { max_request_body_bytes: 1, ..Default::default() };
81+
let mut remote_server = RemoteComponentServer::new(
82+
a_local_client,
83+
server_config,
84+
a_socket.port(),
85+
&TEST_REMOTE_SERVER_METRICS,
86+
);
87+
task::spawn(async move {
88+
let _ = remote_server.start().await;
89+
});
90+
task::yield_now().await;
91+
92+
let uri: Uri = format!("http://[{}]:{}/", a_socket.ip(), a_socket.port()).parse().unwrap();
93+
let http_request = Request::post(uri)
94+
.header(CONTENT_TYPE, APPLICATION_OCTET_STREAM)
95+
.header(REQUEST_ID_HEADER, RequestId::generate().to_string())
96+
.body(Full::new(Bytes::from("x".repeat(1024))))
97+
.unwrap();
98+
let http_response =
99+
Client::builder(TokioExecutor::new()).build_http().request(http_request).await.unwrap();
100+
101+
assert_eq!(http_response.status(), StatusCode::PAYLOAD_TOO_LARGE);
102+
let body_bytes = http_response.into_body().collect().await.unwrap().to_bytes();
103+
let server_error = SerdeWrapper::<ServerError>::wrapper_deserialize(&body_bytes).unwrap();
104+
assert!(matches!(server_error, ServerError::RequestBodyTooLarge(_)));
105+
}
106+
107+
/// Client returns `ResponseParsingFailure` when the server's response body exceeds
108+
/// `max_response_body_bytes`.
109+
#[tokio::test]
110+
async fn response_body_too_large() {
111+
let socket = available_ports_factory(unique_u16!()).get_next_local_host_socket();
112+
task::spawn(async move {
113+
async fn handler(
114+
_http_request: Request<Incoming>,
115+
) -> Result<Response<Full<Bytes>>, Infallible> {
116+
Ok(Response::builder()
117+
.status(StatusCode::OK)
118+
.header(CONTENT_TYPE, APPLICATION_OCTET_STREAM)
119+
.body(Full::new(Bytes::from(vec![0u8; 1024])))
120+
.unwrap())
121+
}
122+
123+
let listener = TcpListener::bind(&socket).await.unwrap();
124+
loop {
125+
let Ok((stream, _)) = listener.accept().await else { continue };
126+
let io = TokioIo::new(stream);
127+
let service = service_fn(|req| async move { handler(req).await });
128+
tokio::spawn(async move {
129+
let _ = Http2ServerBuilder::new(TokioExecutor::new())
130+
.http2()
131+
.serve_connection(io, service)
132+
.await;
133+
});
134+
}
135+
});
136+
task::yield_now().await;
137+
138+
let client_config =
139+
RemoteClientConfig { max_response_body_bytes: 1, retries: 0, ..FAST_FAILING_CLIENT_CONFIG };
140+
let client = ComponentAClient::new(
141+
client_config,
142+
&socket.ip().to_string(),
143+
socket.port(),
144+
&TEST_REMOTE_CLIENT_METRICS,
145+
);
146+
147+
let Err(error) = client.a_get_value().await else {
148+
panic!("Expected an error");
149+
};
150+
assert!(matches!(error, ClientError::ResponseParsingFailure(_)), "unexpected error: {error}");
151+
}

crates/apollo_infra/src/tests/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ pub(crate) mod test_utils;
33
mod concurrent_servers_test;
44
mod local_component_client_server_test;
55
mod local_request_prioritization_test;
6+
mod max_request_size_test;
67
mod remote_client_connection_eviction_test;
78
mod remote_component_client_server_test;
89
mod remote_server_connection_eviction_test;

crates/apollo_infra/src/tests/remote_component_client_server_test.rs

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ use crate::tests::test_utils::{
7272
ResultA,
7373
ResultB,
7474
ValueB,
75+
FAST_FAILING_CLIENT_CONFIG,
7576
MAX_CONCURRENCY,
7677
TEST_LOCAL_CLIENT_METRICS,
7778
TEST_LOCAL_SERVER_METRICS,
@@ -85,18 +86,6 @@ const ARBITRARY_DATA: &str = "arbitrary data";
8586
// ServerError::RequestDeserializationFailure error message.
8687
const DESERIALIZE_REQ_ERROR_MESSAGE: &str = "Could not deserialize client request";
8788
const BAD_REQUEST_ERROR_MESSAGE: &str = "Got status code: 400 Bad Request";
88-
const FAST_FAILING_CLIENT_CONFIG: RemoteClientConfig = RemoteClientConfig {
89-
retries: 0,
90-
idle_connections: 0,
91-
keepalive_timeout_ms: 0,
92-
max_retry_interval_ms: 0,
93-
initial_retry_delay_ms: 0,
94-
attempts_per_log: 1,
95-
connection_timeout_ms: 500,
96-
request_timeout_ms: 1000,
97-
set_tcp_nodelay: true,
98-
max_response_body_bytes: usize::MAX,
99-
};
10089

10190
#[async_trait]
10291
impl ComponentAClientTrait for RemoteComponentClient<ComponentARequest, ComponentAResponse> {

crates/apollo_infra/src/tests/test_utils.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt};
2121
use tokio::net::TcpStream;
2222
use tokio::sync::Semaphore;
2323

24-
use crate::component_client::{ClientResult, RemoteComponentClient};
24+
use crate::component_client::{ClientResult, RemoteClientConfig, RemoteComponentClient};
2525
use crate::component_definitions::{ComponentRequestHandler, ComponentStarter, PrioritizedRequest};
2626
use crate::component_server::RemoteServerConfig;
2727
use crate::metrics::{
@@ -43,6 +43,19 @@ pub(crate) type ComponentBClient = RemoteComponentClient<ComponentBRequest, Comp
4343
pub(crate) const VALID_VALUE_A: ValueA = Felt::ONE;
4444
pub(crate) const MAX_CONCURRENCY: usize = 10;
4545

46+
pub(crate) const FAST_FAILING_CLIENT_CONFIG: RemoteClientConfig = RemoteClientConfig {
47+
retries: 0,
48+
idle_connections: 0,
49+
keepalive_timeout_ms: 0,
50+
max_retry_interval_ms: 0,
51+
initial_retry_delay_ms: 0,
52+
attempts_per_log: 1,
53+
connection_timeout_ms: 500,
54+
request_timeout_ms: 1000,
55+
set_tcp_nodelay: true,
56+
max_response_body_bytes: usize::MAX,
57+
};
58+
4659
#[derive(Serialize, Deserialize, Clone, AsRefStr, EnumDiscriminants)]
4760
#[strum_discriminants(
4861
name(ComponentARequestLabelValue),

0 commit comments

Comments
 (0)