Skip to content

Commit 3498b89

Browse files
authored
fix(http): consolidate clients (#1062)
## Overview

The extension was constructing up to 6 separate HTTP clients across different subsystems (metrics, logs, trace proxy, trace flusher, stats flusher, and Lambda API). Three of the reqwest::Client instances (metrics, logs, trace proxy) were identically configured via get_client(config), and two HttpClient (hyper) instances (trace flusher, stats flusher) were identically configured via http_client::create_client().

Each independent client maintains its own connection pool and TLS sessions, which means:

- Duplicate TLS handshakes to the same Datadog intake hosts
- No connection reuse across subsystems hitting the same endpoints
- Unnecessary memory overhead from redundant connection pools

This change consolidates to 3 clients total:

1. reqwest::Client (no-proxy) — Lambda Extensions API (intentionally isolated, localhost only)
2. reqwest::Client (shared) — Metrics, Logs, Trace proxy
3. HttpClient (shared) — Trace flusher, Stats flusher

Both reqwest::Client and HttpClient are Arc-based internally, so cloning is a refcount increment that shares the underlying connection pool.

This also removes the lazy OnceCell initialization from TraceFlusher and StatsFlusher in favor of eager construction at startup. The lazy init was retrying on failure, but the only failure modes (bad proxy URL, unreadable cert file) are configuration errors that won't self-resolve between retries, so failing fast is preferable.

## Testing

Unit tests, integration tests, and self monitoring
1 parent 321fb09 commit 3498b89

File tree

6 files changed

+54
-88
lines changed

6 files changed

+54
-88
lines changed

bottlecap/src/bin/bottlecap/main.rs

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ use bottlecap::{
6060
provider::Provider as TagProvider,
6161
},
6262
traces::{
63+
http_client as trace_http_client,
6364
propagation::DatadogCompositePropagator,
6465
proxy_aggregator,
6566
proxy_flusher::Flusher as ProxyFlusher,
@@ -292,17 +293,28 @@ async fn extension_loop_active(
292293
let account_id = r.account_id.as_ref().unwrap_or(&"none".to_string()).clone();
293294
let tags_provider = setup_tag_provider(&Arc::clone(&aws_config), config, &account_id);
294295

296+
// Build one shared reqwest::Client for metrics, logs, and trace proxy flushing.
297+
// reqwest::Client is Arc-based internally, so cloning just increments a refcount
298+
// and shares the connection pool.
299+
let shared_client = bottlecap::http::get_client(config);
300+
295301
let (logs_agent_channel, logs_flusher, logs_agent_cancel_token, logs_aggregator_handle) =
296302
start_logs_agent(
297303
config,
298304
Arc::clone(&api_key_factory),
299305
&tags_provider,
300306
event_bus_tx.clone(),
301307
aws_config.is_managed_instance_mode(),
308+
&shared_client,
302309
);
303310

304-
let (metrics_flushers, metrics_aggregator_handle, dogstatsd_cancel_token) =
305-
start_dogstatsd(tags_provider.clone(), Arc::clone(&api_key_factory), config).await;
311+
let (metrics_flushers, metrics_aggregator_handle, dogstatsd_cancel_token) = start_dogstatsd(
312+
tags_provider.clone(),
313+
Arc::clone(&api_key_factory),
314+
config,
315+
&shared_client,
316+
)
317+
.await;
306318

307319
let propagator = Arc::new(DatadogCompositePropagator::new(Arc::clone(config)));
308320
// Lifecycle Invocation Processor
@@ -345,6 +357,7 @@ async fn extension_loop_active(
345357
&tags_provider,
346358
invocation_processor_handle.clone(),
347359
appsec_processor.clone(),
360+
&shared_client,
348361
);
349362

350363
let api_runtime_proxy_shutdown_signal = start_api_runtime_proxy(
@@ -1020,6 +1033,7 @@ fn start_logs_agent(
10201033
tags_provider: &Arc<TagProvider>,
10211034
event_bus: Sender<Event>,
10221035
is_managed_instance_mode: bool,
1036+
client: &Client,
10231037
) -> (
10241038
Sender<TelemetryEvent>,
10251039
LogsFlusher,
@@ -1048,7 +1062,12 @@ fn start_logs_agent(
10481062
drop(agent);
10491063
});
10501064

1051-
let flusher = LogsFlusher::new(api_key_factory, aggregator_handle.clone(), config.clone());
1065+
let flusher = LogsFlusher::new(
1066+
api_key_factory,
1067+
aggregator_handle.clone(),
1068+
config.clone(),
1069+
client.clone(),
1070+
);
10521071
(tx, flusher, cancel_token, aggregator_handle)
10531072
}
10541073

@@ -1059,6 +1078,7 @@ fn start_trace_agent(
10591078
tags_provider: &Arc<TagProvider>,
10601079
invocation_processor_handle: InvocationProcessorHandle,
10611080
appsec_processor: Option<Arc<TokioMutex<AppSecProcessor>>>,
1081+
client: &Client,
10621082
) -> (
10631083
Sender<SendDataBuilderInfo>,
10641084
Arc<trace_flusher::TraceFlusher>,
@@ -1069,6 +1089,14 @@ fn start_trace_agent(
10691089
StatsConcentratorHandle,
10701090
TraceAggregatorHandle,
10711091
) {
1092+
// Build one shared hyper-based HTTP client for trace and stats flushing.
1093+
// This client type is required by libdd_trace_utils for SendData::send().
1094+
let trace_http_client = trace_http_client::create_client(
1095+
config.proxy_https.as_ref(),
1096+
config.tls_cert_file.as_ref(),
1097+
)
1098+
.expect("Failed to create trace HTTP client");
1099+
10721100
// Stats
10731101
let (stats_concentrator_service, stats_concentrator_handle) =
10741102
StatsConcentratorService::new(Arc::clone(config));
@@ -1080,6 +1108,7 @@ fn start_trace_agent(
10801108
api_key_factory.clone(),
10811109
stats_aggregator.clone(),
10821110
Arc::clone(config),
1111+
trace_http_client.clone(),
10831112
));
10841113

10851114
let stats_processor = Arc::new(stats_processor::ServerlessStatsProcessor {});
@@ -1092,6 +1121,7 @@ fn start_trace_agent(
10921121
trace_aggregator_handle.clone(),
10931122
config.clone(),
10941123
api_key_factory.clone(),
1124+
trace_http_client,
10951125
));
10961126

10971127
let obfuscation_config = obfuscation_config::ObfuscationConfig {
@@ -1117,6 +1147,7 @@ fn start_trace_agent(
11171147
Arc::clone(&proxy_aggregator),
11181148
Arc::clone(tags_provider),
11191149
Arc::clone(config),
1150+
client.clone(),
11201151
));
11211152

11221153
let trace_agent = trace_agent::TraceAgent::new(
@@ -1157,6 +1188,7 @@ async fn start_dogstatsd(
11571188
tags_provider: Arc<TagProvider>,
11581189
api_key_factory: Arc<ApiKeyFactory>,
11591190
config: &Arc<Config>,
1191+
client: &Client,
11601192
) -> (
11611193
Arc<Vec<MetricsFlusher>>,
11621194
MetricsAggregatorHandle,
@@ -1180,12 +1212,11 @@ async fn start_dogstatsd(
11801212
});
11811213

11821214
// Get flushers with aggregator handle
1183-
let metrics_client = bottlecap::http::get_client(config);
11841215
let flushers = Arc::new(start_metrics_flushers(
11851216
Arc::clone(&api_key_factory),
11861217
&aggregator_handle,
11871218
config,
1188-
&metrics_client,
1219+
client,
11891220
));
11901221

11911222
// Create Dogstatsd server

bottlecap/src/logs/flusher.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
use crate::FLUSH_RETRY_COUNT;
22
use crate::config;
3-
use crate::http::get_client;
43
use crate::logs::aggregator_service::AggregatorHandle;
54
use dogstatsd::api_key::ApiKeyFactory;
65
use futures::future::join_all;
@@ -36,8 +35,8 @@ impl Flusher {
3635
api_key_factory: Arc<ApiKeyFactory>,
3736
endpoint: String,
3837
config: Arc<config::Config>,
38+
client: reqwest::Client,
3939
) -> Self {
40-
let client = get_client(&config);
4140
Flusher {
4241
client,
4342
endpoint,
@@ -199,6 +198,7 @@ impl LogsFlusher {
199198
api_key_factory: Arc<ApiKeyFactory>,
200199
aggregator_handle: AggregatorHandle,
201200
config: Arc<config::Config>,
201+
client: reqwest::Client,
202202
) -> Self {
203203
let mut flushers = Vec::new();
204204

@@ -216,6 +216,7 @@ impl LogsFlusher {
216216
Arc::clone(&api_key_factory),
217217
endpoint,
218218
config.clone(),
219+
client.clone(),
219220
));
220221

221222
// Create flushers for additional endpoints
@@ -227,6 +228,7 @@ impl LogsFlusher {
227228
additional_api_key_factory,
228229
endpoint_url,
229230
config.clone(),
231+
client.clone(),
230232
));
231233
}
232234

bottlecap/src/traces/proxy_flusher.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ use tracing::{debug, error};
99

1010
use crate::{
1111
FLUSH_RETRY_COUNT, config,
12-
http::get_client,
1312
tags::provider,
1413
traces::{
1514
DD_ADDITIONAL_TAGS_HEADER,
@@ -39,9 +38,8 @@ impl Flusher {
3938
aggregator: Arc<Mutex<Aggregator>>,
4039
tags_provider: Arc<provider::Provider>,
4140
config: Arc<config::Config>,
41+
client: reqwest::Client,
4242
) -> Self {
43-
let client = get_client(&config);
44-
4543
Flusher {
4644
client,
4745
aggregator,

bottlecap/src/traces/stats_flusher.rs

Lines changed: 5 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use tokio::sync::OnceCell;
88

99
use crate::config;
1010
use crate::lifecycle::invocation::processor::S_TO_MS;
11-
use crate::traces::http_client::{self, HttpClient};
11+
use crate::traces::http_client::HttpClient;
1212
use crate::traces::stats_aggregator::StatsAggregator;
1313
use dogstatsd::api_key::ApiKeyFactory;
1414
use libdd_common::Endpoint;
@@ -21,10 +21,7 @@ pub struct StatsFlusher {
2121
config: Arc<config::Config>,
2222
api_key_factory: Arc<ApiKeyFactory>,
2323
endpoint: OnceCell<Endpoint>,
24-
/// Cached HTTP client, lazily initialized on first use.
25-
/// TODO: `StatsFlusher` and `TraceFlusher` both hit trace.agent.datadoghq.{site} and could
26-
/// share a single HTTP client for better connection pooling.
27-
http_client: OnceCell<HttpClient>,
24+
http_client: HttpClient,
2825
}
2926

3027
impl StatsFlusher {
@@ -33,13 +30,14 @@ impl StatsFlusher {
3330
api_key_factory: Arc<ApiKeyFactory>,
3431
aggregator: Arc<Mutex<StatsAggregator>>,
3532
config: Arc<config::Config>,
33+
http_client: HttpClient,
3634
) -> Self {
3735
StatsFlusher {
3836
aggregator,
3937
config,
4038
api_key_factory,
4139
endpoint: OnceCell::new(),
42-
http_client: OnceCell::new(),
40+
http_client,
4341
}
4442
}
4543

@@ -97,18 +95,11 @@ impl StatsFlusher {
9795

9896
let start = std::time::Instant::now();
9997

100-
// Get or create the cached HTTP client
101-
let http_client = self.get_or_init_http_client().await;
102-
let Some(http_client) = http_client else {
103-
error!("STATS | Failed to create HTTP client, will retry");
104-
return Some(stats);
105-
};
106-
10798
let resp = stats_utils::send_stats_payload_with_client(
10899
serialized_stats_payload,
109100
endpoint,
110101
api_key.as_str(),
111-
Some(http_client),
102+
Some(&self.http_client),
112103
)
113104
.await;
114105
let elapsed = start.elapsed();
@@ -170,29 +161,4 @@ impl StatsFlusher {
170161
Some(all_failed)
171162
}
172163
}
173-
/// Returns a reference to the cached HTTP client, initializing it if necessary.
174-
///
175-
/// The client is created once and reused for all subsequent flushes,
176-
/// providing connection pooling and TLS session reuse.
177-
///
178-
/// Returns `None` if client creation fails. The error is logged but not cached,
179-
/// allowing retry on subsequent calls.
180-
async fn get_or_init_http_client(&self) -> Option<&HttpClient> {
181-
match self
182-
.http_client
183-
.get_or_try_init(|| async {
184-
http_client::create_client(
185-
self.config.proxy_https.as_ref(),
186-
self.config.tls_cert_file.as_ref(),
187-
)
188-
})
189-
.await
190-
{
191-
Ok(client) => Some(client),
192-
Err(e) => {
193-
error!("STATS_FLUSHER | Failed to create HTTP client: {e}");
194-
None
195-
}
196-
}
197-
}
198164
}

bottlecap/src/traces/trace_flusher.rs

Lines changed: 5 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,12 @@ use libdd_trace_utils::{
1111
};
1212
use std::str::FromStr;
1313
use std::sync::Arc;
14-
use tokio::sync::OnceCell;
1514
use tokio::task::JoinSet;
1615
use tracing::{debug, error};
1716

1817
use crate::config::Config;
1918
use crate::lifecycle::invocation::processor::S_TO_MS;
20-
use crate::traces::http_client::{self, HttpClient};
19+
use crate::traces::http_client::HttpClient;
2120
use crate::traces::trace_aggregator_service::AggregatorHandle;
2221

2322
pub struct TraceFlusher {
@@ -28,10 +27,7 @@ pub struct TraceFlusher {
2827
/// Configured via `DD_APM_ADDITIONAL_ENDPOINTS` (e.g., sending to both US and EU).
2928
/// Each trace batch is sent to the primary endpoint AND all additional endpoints.
3029
pub additional_endpoints: Vec<Endpoint>,
31-
/// Cached HTTP client, lazily initialized on first use.
32-
/// TODO: `TraceFlusher` and `StatsFlusher` both hit trace.agent.datadoghq.{site} and could
33-
/// share a single HTTP client for better connection pooling.
34-
http_client: OnceCell<HttpClient>,
30+
http_client: HttpClient,
3531
}
3632

3733
impl TraceFlusher {
@@ -40,6 +36,7 @@ impl TraceFlusher {
4036
aggregator_handle: AggregatorHandle,
4137
config: Arc<Config>,
4238
api_key_factory: Arc<ApiKeyFactory>,
39+
http_client: HttpClient,
4340
) -> Self {
4441
// Parse additional endpoints for dual-shipping from config.
4542
// Format: { "https://trace.agent.datadoghq.eu": ["api-key-1", "api-key-2"], ... }
@@ -65,7 +62,7 @@ impl TraceFlusher {
6562
config,
6663
api_key_factory,
6764
additional_endpoints,
68-
http_client: OnceCell::new(),
65+
http_client,
6966
}
7067
}
7168

@@ -83,11 +80,7 @@ impl TraceFlusher {
8380
return None;
8481
};
8582

86-
// Get or create the cached HTTP client
87-
let Some(http_client) = self.get_or_init_http_client().await else {
88-
error!("TRACES | Failed to create HTTP client, skipping flush");
89-
return None;
90-
};
83+
let http_client = &self.http_client;
9184

9285
let mut failed_batch: Vec<SendData> = Vec::new();
9386

@@ -167,32 +160,6 @@ impl TraceFlusher {
167160
None
168161
}
169162

170-
/// Returns a clone of the cached HTTP client, initializing it if necessary.
171-
///
172-
/// The client is created once and reused for all subsequent flushes,
173-
/// providing connection pooling and TLS session reuse.
174-
///
175-
/// Returns `None` if client creation fails. The error is logged but not cached,
176-
/// allowing retry on subsequent calls.
177-
async fn get_or_init_http_client(&self) -> Option<HttpClient> {
178-
match self
179-
.http_client
180-
.get_or_try_init(|| async {
181-
http_client::create_client(
182-
self.config.proxy_https.as_ref(),
183-
self.config.tls_cert_file.as_ref(),
184-
)
185-
})
186-
.await
187-
{
188-
Ok(client) => Some(client.clone()),
189-
Err(e) => {
190-
error!("TRACES | Failed to create HTTP client: {e}");
191-
None
192-
}
193-
}
194-
}
195-
196163
/// Sends traces to the Datadog intake endpoint using the provided HTTP client.
197164
///
198165
/// Each `SendData` is sent to its own configured target endpoint.

bottlecap/tests/logs_integration_test.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,9 @@ async fn test_logs() {
7070
false,
7171
);
7272
let api_key_factory = Arc::new(ApiKeyFactory::new(dd_api_key));
73-
let logs_flusher = LogsFlusher::new(api_key_factory, logs_aggr_handle, arc_conf.clone());
73+
let client = bottlecap::http::get_client(&Arc::clone(&arc_conf));
74+
let logs_flusher =
75+
LogsFlusher::new(api_key_factory, logs_aggr_handle, arc_conf.clone(), client);
7476

7577
let telemetry_events: Vec<TelemetryEvent> = serde_json::from_str(
7678
r#"[{"time":"2022-10-21T14:05:03.165Z","type":"platform.start","record":{"requestId":"459921b5-681c-4a96-beb0-81e0aa586026","version":"$LATEST","tracing":{"spanId":"24cd7d670fa455f0","type":"X-Amzn-Trace-Id","value":"Root=1-6352a70e-1e2c502e358361800241fd45;Parent=35465b3a9e2f7c6a;Sampled=1"}}}]"#)

0 commit comments

Comments
 (0)