Skip to content

Commit d2d30fc

Browse files
authored
fix(http): disable pooling on shared reqwest client (#1092) (#1248)
## Overview Fixes the remaining cases of [#1092](#1092) — customers continuing to see `Max retries exceeded, returning request error` and `TRACES | Request failed: No requests sent` after the v94 patch in #1094. PR #1094 disabled connection pooling on the **hyper** `HttpClient` used by the trace flusher and stats flusher. It did not touch the **reqwest** `Client` in `bottlecap/src/http.rs`, which is used by: - the trace proxy (`proxy_flusher`) — the path AppSec and many tracer configurations route through - the metrics flusher - the logs flusher That client was still configured with `pool_idle_timeout(270s)` and `tcp_keepalive(120s)`, leaving idle connections in the pool across Lambda freeze/resume cycles. Connections go stale during a freeze (the OS tears down the remote side while the local kernel still considers them established), and the next request after resume fails with broken pipe / RST → retries exhaust → the user-visible errors. ## Change `bottlecap/src/http.rs`: replace `pool_idle_timeout(270s) + tcp_keepalive(120s)` with `pool_max_idle_per_host(0)`. Mirrors the fix in `bottlecap/src/traces/http_client.rs` and the pattern in libdatadog's `new_client_periodic`. Each request opens a fresh connection; TLS session resumption inside the client still amortizes the handshake. Trade-off: one extra TLS handshake per flush. Matches the pre-v93 behavior that customers had no complaints about. ## Testing - New unit test `shared_client_does_not_pool_connections` in `bottlecap/src/http.rs`: - Spawns a minimal HTTP/1.1 keep-alive server that counts accepted TCP connections. - Issues two sequential GETs through `get_client(...)`. - Asserts the server saw **2** connections. - Verified the test **fails** against the unpatched code (`opened=1`, connection reused from pool) — proves it catches the regression. - Verified the test **passes** against the patched code (`opened=2`, fresh connection each time). - `cargo test --lib` — 547 tests pass. - `cargo clippy --lib --tests -- -D warnings` — clean. - `cargo fmt --check` — clean. ## Other clients (unchanged) | Client | File | Pooling | Notes | |---|---|---|---| | hyper `HttpClient` | `traces/http_client.rs` | already disabled (#1094) | trace + stats flushers | | reqwest `Client` (shared) | `bottlecap/src/http.rs` | **disabled here** | trace proxy, metrics, logs | | reqwest `Client` (Lambda Extensions API) | `bin/bottlecap/main.rs` | unchanged | localhost-only; pooling irrelevant |
1 parent 1c181be commit d2d30fc

1 file changed

Lines changed: 104 additions & 3 deletions

File tree

bottlecap/src/http.rs

Lines changed: 104 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,12 @@ pub fn get_client(config: &Arc<config::Config>) -> reqwest::Client {
2626
fn build_client(config: &Arc<config::Config>) -> Result<reqwest::Client, Box<dyn Error>> {
2727
let mut client = create_reqwest_client_builder()?
2828
.timeout(Duration::from_secs(config.flush_timeout))
29-
.pool_idle_timeout(Some(Duration::from_secs(270)))
30-
// Enable TCP keepalive
31-
.tcp_keepalive(Some(Duration::from_secs(120)))
29+
// Disable connection pooling to avoid stale connections after Lambda freeze/resume.
30+
// The execution environment can be frozen for seconds to minutes between invocations;
31+
// pooled connections become stale during this time and fail on reuse, surfacing as
32+
// "Max retries exceeded" in the trace proxy / metrics / logs flushers. Mirrors the
33+
// pattern in `traces/http_client.rs` and libdatadog's `new_client_periodic`.
34+
.pool_max_idle_per_host(0)
3235
.danger_accept_invalid_certs(config.skip_ssl_validation);
3336

3437
// Determine if we should use HTTP/1 or HTTP/2
@@ -145,3 +148,101 @@ pub fn headers_to_map(headers: &HeaderMap) -> HashMap<String, String> {
145148
})
146149
.collect()
147150
}
151+
152+
#[cfg(test)]
153+
mod tests {
154+
use super::*;
155+
use std::sync::atomic::{AtomicUsize, Ordering};
156+
use tokio::io::{AsyncReadExt, AsyncWriteExt};
157+
use tokio::net::TcpListener;
158+
159+
/// Spawns a minimal HTTP/1.1 keep-alive server that:
160+
/// - increments `connection_counter` on every accepted TCP connection
161+
/// - serves any number of requests on the same connection until the client
162+
/// closes it (matching how Datadog intakes behave in production).
163+
///
164+
/// With pooling enabled, a client making two sequential requests will reuse
165+
/// the same TCP connection → counter stays at 1. With pooling disabled,
166+
/// each request opens a fresh connection → counter reaches 2.
167+
async fn spawn_keepalive_server(connection_counter: Arc<AtomicUsize>) -> u16 {
168+
let listener = TcpListener::bind("127.0.0.1:0")
169+
.await
170+
.expect("bind test listener");
171+
let port = listener.local_addr().expect("listener local_addr").port();
172+
tokio::spawn(async move {
173+
loop {
174+
let Ok((mut sock, _)) = listener.accept().await else {
175+
return;
176+
};
177+
connection_counter.fetch_add(1, Ordering::SeqCst);
178+
tokio::spawn(async move {
179+
let mut buf = vec![0u8; 8192];
180+
let mut accumulated = Vec::new();
181+
loop {
182+
let n = match sock.read(&mut buf).await {
183+
Ok(0) | Err(_) => return,
184+
Ok(n) => n,
185+
};
186+
accumulated.extend_from_slice(&buf[..n]);
187+
while let Some(idx) = accumulated.windows(4).position(|w| w == b"\r\n\r\n")
188+
{
189+
accumulated.drain(..idx + 4);
190+
if sock
191+
.write_all(
192+
b"HTTP/1.1 200 OK\r\nContent-Length: 2\r\nConnection: keep-alive\r\n\r\nok",
193+
)
194+
.await
195+
.is_err()
196+
{
197+
return;
198+
}
199+
}
200+
}
201+
});
202+
}
203+
});
204+
port
205+
}
206+
207+
/// Verifies that the shared reqwest client does not pool idle connections:
208+
/// two sequential requests to the same host must result in two distinct
209+
/// TCP connections at the server. This guards against regression of
210+
/// [issue #1092](https://github.com/DataDog/datadog-lambda-extension/issues/1092):
211+
/// pooled connections go stale across Lambda freeze/resume and surface as
212+
/// "Max retries exceeded" errors on the proxy / metrics / logs flushers.
213+
#[tokio::test]
214+
async fn shared_client_does_not_pool_connections() {
215+
let counter = Arc::new(AtomicUsize::new(0));
216+
let port = spawn_keepalive_server(counter.clone()).await;
217+
218+
let cfg = config::Config {
219+
// Force HTTP/1 path; HTTP/2 would also work but H1 makes the pool
220+
// behavior easiest to reason about with a hand-rolled server.
221+
http_protocol: Some("http1".to_string()),
222+
flush_timeout: 5,
223+
..config::Config::default()
224+
};
225+
let client = get_client(&Arc::new(cfg));
226+
227+
let url = format!("http://127.0.0.1:{port}/");
228+
229+
// First request: opens connection #1.
230+
let r1 = client.get(&url).send().await.expect("first request");
231+
assert_eq!(r1.status(), 200);
232+
// Drain body so reqwest can decide whether to return the conn to the pool.
233+
let _ = r1.bytes().await;
234+
235+
// Second request: with pooling disabled, must open a fresh connection #2.
236+
// With pooling enabled (the v93-era regression), reqwest would reuse
237+
// conn #1, which after a Lambda freeze would be stale.
238+
let r2 = client.get(&url).send().await.expect("second request");
239+
assert_eq!(r2.status(), 200);
240+
let _ = r2.bytes().await;
241+
242+
let opened = counter.load(Ordering::SeqCst);
243+
assert_eq!(
244+
opened, 2,
245+
"expected 2 fresh TCP connections, got {opened} (pooling not disabled?)"
246+
);
247+
}
248+
}

0 commit comments

Comments
 (0)