Skip to content

Commit eecf395

Browse files
apollo_infra: add client connection eviction test
1 parent 732e555 commit eecf395

2 files changed

Lines changed: 106 additions & 0 deletions

File tree

crates/apollo_infra/src/tests/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,5 +3,6 @@ pub(crate) mod test_utils;
33
mod concurrent_servers_test;
44
mod local_component_client_server_test;
55
mod local_request_prioritization_test;
6+
mod remote_client_connection_eviction_test;
67
mod remote_component_client_server_test;
78
mod server_metrics_test;
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
use std::convert::Infallible;
2+
use std::sync::atomic::{AtomicUsize, Ordering};
3+
use std::sync::Arc;
4+
use std::time::Duration;
5+
6+
use apollo_proc_macros::unique_u16;
7+
use bytes::Bytes;
8+
use http::header::CONTENT_TYPE;
9+
use http::StatusCode;
10+
use http_body_util::Full;
11+
use hyper::body::Incoming;
12+
use hyper::service::service_fn;
13+
use hyper::{Request, Response};
14+
use hyper_util::rt::{TokioExecutor, TokioIo};
15+
use hyper_util::server::conn::auto::Builder as Http2ServerBuilder;
16+
use tokio::net::TcpListener;
17+
use tokio::task;
18+
use tokio::time::sleep;
19+
20+
use crate::component_client::RemoteClientConfig;
21+
use crate::component_definitions::APPLICATION_OCTET_STREAM;
22+
use crate::serde_utils::SerdeWrapper;
23+
use crate::tests::test_utils::{
24+
available_ports_factory,
25+
ComponentAClient,
26+
ComponentAClientTrait,
27+
ComponentAResponse,
28+
TEST_REMOTE_CLIENT_METRICS,
29+
VALID_VALUE_A,
30+
};
31+
32+
/// Verifies that Hyper evicts an idle connection from its pool after the keepalive timeout
33+
/// and opens a new connection for the next request, which is detected by counting server-side
34+
/// accepts.
35+
#[tokio::test]
36+
async fn idle_connection_is_evicted_after_pool_timeout() {
37+
const SUFFICIENTLY_LONG_KEEPALIVE_TIMEOUT_MS: u64 = 1000;
38+
const MARGIN_MS: u64 = 300;
39+
40+
let socket = available_ports_factory(unique_u16!()).get_next_local_host_socket();
41+
let accept_count = Arc::new(AtomicUsize::new(0));
42+
43+
// A server that accepts connections and keeps track of the number of connections it has
44+
// accepted via `accept_count`.
45+
{
46+
let accept_count = accept_count.clone();
47+
task::spawn(async move {
48+
let listener = TcpListener::bind(socket).await.unwrap();
49+
loop {
50+
let Ok((stream, _)) = listener.accept().await else { continue };
51+
accept_count.fetch_add(1, Ordering::SeqCst);
52+
let io = TokioIo::new(stream);
53+
let service = service_fn(|_req: Request<Incoming>| async {
54+
let body = ComponentAResponse::AGetValue(VALID_VALUE_A);
55+
Ok::<_, Infallible>(
56+
Response::builder()
57+
.status(StatusCode::OK)
58+
.header(CONTENT_TYPE, APPLICATION_OCTET_STREAM)
59+
.body(Full::new(Bytes::from(
60+
SerdeWrapper::new(body).wrapper_serialize().unwrap(),
61+
)))
62+
.unwrap(),
63+
)
64+
});
65+
tokio::spawn(async move {
66+
let _ = Http2ServerBuilder::new(TokioExecutor::new())
67+
.http2()
68+
.serve_connection(io, service)
69+
.await;
70+
});
71+
}
72+
});
73+
}
74+
task::yield_now().await;
75+
76+
let client = ComponentAClient::new(
77+
RemoteClientConfig {
78+
keepalive_timeout_ms: SUFFICIENTLY_LONG_KEEPALIVE_TIMEOUT_MS,
79+
..Default::default()
80+
},
81+
&socket.ip().to_string(),
82+
socket.port(),
83+
&TEST_REMOTE_CLIENT_METRICS,
84+
);
85+
86+
// Establish connection C1.
87+
client.a_get_value().await.expect("First request should succeed");
88+
client.a_get_value().await.expect("Second request should succeed");
89+
assert_eq!(
90+
accept_count.load(Ordering::SeqCst),
91+
1,
92+
"There should have been a single connection."
93+
);
94+
95+
// Let the idle timeout expire.
96+
sleep(Duration::from_millis(SUFFICIENTLY_LONG_KEEPALIVE_TIMEOUT_MS + MARGIN_MS)).await;
97+
98+
// The next checkout detects C1 is expired, drops it, and opens a new connection C2.
99+
client.a_get_value().await.expect("Third request should succeed");
100+
assert_eq!(
101+
accept_count.load(Ordering::SeqCst),
102+
2,
103+
"There should have been a new connection opened."
104+
);
105+
}

0 commit comments

Comments
 (0)