Skip to content

Commit b4bb433

Browse files
authored
chore(flushing): standardize code with refactoring on some flushers and retries (#1018)
## Overview Simplify code for flushing, trying to standardize everything by avoiding code all over the place, ensuring that we only create one client and we can reuse as much as possible for performance improvements ## Motivation [SVLS-8507](https://datadoghq.atlassian.net/browse/SVLS-8507) [SVLS-8507]: https://datadoghq.atlassian.net/browse/SVLS-8507?atlOrigin=eyJpIjoiNWRkNTljNzYxNjVmNDY3MDlhMDU5Y2ZhYzA5YTRkZjUiLCJwIjoiZ2l0aHViLWNvbS1KU1cifQ
1 parent 2ed0e31 commit b4bb433

7 files changed

Lines changed: 348 additions & 240 deletions

File tree

bottlecap/src/bin/bottlecap/main.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -66,14 +66,14 @@ use bottlecap::{
6666
span_dedup_service,
6767
stats_aggregator::StatsAggregator,
6868
stats_concentrator_service::{StatsConcentratorHandle, StatsConcentratorService},
69-
stats_flusher::{self, StatsFlusher},
69+
stats_flusher,
7070
stats_generator::StatsGenerator,
7171
stats_processor, trace_agent,
7272
trace_aggregator::SendDataBuilderInfo,
7373
trace_aggregator_service::{
7474
AggregatorHandle as TraceAggregatorHandle, AggregatorService as TraceAggregatorService,
7575
},
76-
trace_flusher::{self, TraceFlusher},
76+
trace_flusher,
7777
trace_processor::{self, SendingTraceProcessor},
7878
},
7979
};
@@ -1081,9 +1081,9 @@ fn start_trace_agent(
10811081
appsec_processor: Option<Arc<TokioMutex<AppSecProcessor>>>,
10821082
) -> (
10831083
Sender<SendDataBuilderInfo>,
1084-
Arc<trace_flusher::ServerlessTraceFlusher>,
1084+
Arc<trace_flusher::TraceFlusher>,
10851085
Arc<trace_processor::ServerlessTraceProcessor>,
1086-
Arc<stats_flusher::ServerlessStatsFlusher>,
1086+
Arc<stats_flusher::StatsFlusher>,
10871087
Arc<ProxyFlusher>,
10881088
tokio_util::sync::CancellationToken,
10891089
StatsConcentratorHandle,
@@ -1096,7 +1096,7 @@ fn start_trace_agent(
10961096
let stats_aggregator: Arc<TokioMutex<StatsAggregator>> = Arc::new(TokioMutex::new(
10971097
StatsAggregator::new_with_concentrator(stats_concentrator_handle.clone()),
10981098
));
1099-
let stats_flusher = Arc::new(stats_flusher::ServerlessStatsFlusher::new(
1099+
let stats_flusher = Arc::new(stats_flusher::StatsFlusher::new(
11001100
api_key_factory.clone(),
11011101
stats_aggregator.clone(),
11021102
Arc::clone(config),
@@ -1108,7 +1108,7 @@ fn start_trace_agent(
11081108
let (trace_aggregator_service, trace_aggregator_handle) = TraceAggregatorService::default();
11091109
tokio::spawn(trace_aggregator_service.run());
11101110

1111-
let trace_flusher = Arc::new(trace_flusher::ServerlessTraceFlusher::new(
1111+
let trace_flusher = Arc::new(trace_flusher::TraceFlusher::new(
11121112
trace_aggregator_handle.clone(),
11131113
config.clone(),
11141114
api_key_factory.clone(),
@@ -1394,6 +1394,7 @@ mod flush_handles_tests {
13941394
let mut handles = FlushHandles::new();
13951395
let handle = tokio::spawn(async {
13961396
sleep(Duration::from_millis(5)).await;
1397+
Vec::new() // Return empty Vec for stats retry
13971398
});
13981399
handles.stats_flush_handles.push(handle);
13991400

bottlecap/src/flushing/handles.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
33
use datadog_protos::metrics::SketchPayload;
44
use dogstatsd::datadog::Series;
5+
use libdd_trace_protobuf::pb;
56
use libdd_trace_utils::send_data::SendData;
67
use tokio::task::JoinHandle;
78

@@ -32,8 +33,8 @@ pub struct FlushHandles {
3233
pub metric_flush_handles: Vec<JoinHandle<MetricsRetryBatch>>,
3334
/// Handles for proxy flush operations. Returns failed request builders for retry.
3435
pub proxy_flush_handles: Vec<JoinHandle<Vec<reqwest::RequestBuilder>>>,
35-
/// Handles for stats flush operations. Stats don't support retry.
36-
pub stats_flush_handles: Vec<JoinHandle<()>>,
36+
/// Handles for stats flush operations. Returns failed stats payloads for retry.
37+
pub stats_flush_handles: Vec<JoinHandle<Vec<pb::ClientStatsPayload>>>,
3738
}
3839

3940
impl FlushHandles {

bottlecap/src/flushing/service.rs

Lines changed: 30 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -23,20 +23,11 @@ use crate::traces::{
2323
/// - Spawning non-blocking flush tasks
2424
/// - Awaiting pending flush handles with retry logic
2525
/// - Performing blocking flushes (spawn + await)
26-
///
27-
/// # Type Parameters
28-
///
29-
/// * `TF` - Trace flusher type implementing `TraceFlusher`
30-
/// * `SF` - Stats flusher type implementing `StatsFlusher`
31-
pub struct FlushingService<TF, SF>
32-
where
33-
TF: TraceFlusher + Send + Sync + 'static,
34-
SF: StatsFlusher + Send + Sync + 'static,
35-
{
26+
pub struct FlushingService {
3627
// Flushers
3728
logs_flusher: LogsFlusher,
38-
trace_flusher: Arc<TF>,
39-
stats_flusher: Arc<SF>,
29+
trace_flusher: Arc<TraceFlusher>,
30+
stats_flusher: Arc<StatsFlusher>,
4031
proxy_flusher: Arc<ProxyFlusher>,
4132
metrics_flushers: Arc<TokioMutex<Vec<MetricsFlusher>>>,
4233

@@ -47,17 +38,13 @@ where
4738
handles: FlushHandles,
4839
}
4940

50-
impl<TF, SF> FlushingService<TF, SF>
51-
where
52-
TF: TraceFlusher + Send + Sync + 'static,
53-
SF: StatsFlusher + Send + Sync + 'static,
54-
{
41+
impl FlushingService {
5542
/// Creates a new `FlushingService` with the given flushers.
5643
#[must_use]
5744
pub fn new(
5845
logs_flusher: LogsFlusher,
59-
trace_flusher: Arc<TF>,
60-
stats_flusher: Arc<SF>,
46+
trace_flusher: Arc<TraceFlusher>,
47+
stats_flusher: Arc<StatsFlusher>,
6148
proxy_flusher: Arc<ProxyFlusher>,
6249
metrics_flushers: Arc<TokioMutex<Vec<MetricsFlusher>>>,
6350
metrics_aggr_handle: MetricsAggregatorHandle,
@@ -135,11 +122,13 @@ where
135122
self.handles.metric_flush_handles.push(handle);
136123
}
137124

138-
// Spawn stats flush (fire-and-forget, no retry)
125+
// Spawn stats flush
139126
let sf = Arc::clone(&self.stats_flusher);
140127
self.handles
141128
.stats_flush_handles
142-
.push(tokio::spawn(async move { sf.flush(false).await }));
129+
.push(tokio::spawn(async move {
130+
sf.flush(false, None).await.unwrap_or_default()
131+
}));
143132

144133
// Spawn proxy flush
145134
let pf = self.proxy_flusher.clone();
@@ -166,11 +155,25 @@ where
166155
let mut joinset = tokio::task::JoinSet::new();
167156
let mut flush_error = false;
168157

169-
// Await stats handles (no retry)
158+
// Await stats handles with retry
170159
for handle in self.handles.stats_flush_handles.drain(..) {
171-
if let Err(e) = handle.await {
172-
error!("FLUSHING_SERVICE | stats flush error {e:?}");
173-
flush_error = true;
160+
match handle.await {
161+
Ok(retry) => {
162+
let sf = self.stats_flusher.clone();
163+
if !retry.is_empty() {
164+
debug!(
165+
"FLUSHING_SERVICE | redriving {:?} stats payloads",
166+
retry.len()
167+
);
168+
joinset.spawn(async move {
169+
sf.flush(false, Some(retry)).await;
170+
});
171+
}
172+
}
173+
Err(e) => {
174+
error!("FLUSHING_SERVICE | stats flush error {e:?}");
175+
flush_error = true;
176+
}
174177
}
175178
}
176179

@@ -325,7 +328,7 @@ where
325328
self.logs_flusher.flush(None),
326329
futures::future::join_all(metrics_futures),
327330
self.trace_flusher.flush(None),
328-
self.stats_flusher.flush(force_stats),
331+
self.stats_flusher.flush(force_stats, None),
329332
self.proxy_flusher.flush(None),
330333
);
331334
}
@@ -340,11 +343,7 @@ where
340343
}
341344
}
342345

343-
impl<TF, SF> std::fmt::Debug for FlushingService<TF, SF>
344-
where
345-
TF: TraceFlusher + Send + Sync + 'static,
346-
SF: StatsFlusher + Send + Sync + 'static,
347-
{
346+
impl std::fmt::Debug for FlushingService {
348347
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
349348
f.debug_struct("FlushingService")
350349
.field("handles", &self.handles)
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
//! Hyper-based HTTP client for trace and stats flushers.
5+
//!
6+
//! This module provides the HTTP client type required by `libdd_trace_utils`
7+
//! for sending traces and stats to Datadog intake endpoints.
8+
9+
use hyper_http_proxy;
10+
use hyper_rustls::HttpsConnectorBuilder;
11+
use libdd_common::{GenericHttpClient, hyper_migration};
12+
use rustls::RootCertStore;
13+
use rustls_pki_types::CertificateDer;
14+
use std::error::Error;
15+
use std::fs::File;
16+
use std::io::BufReader;
17+
use std::sync::LazyLock;
18+
use tracing::debug;
19+
20+
/// Type alias for the HTTP client used by trace and stats flushers.
21+
///
22+
/// This is the client type expected by `libdd_trace_utils::SendData::send()`.
23+
pub type HyperClient =
24+
GenericHttpClient<hyper_http_proxy::ProxyConnector<libdd_common::connector::Connector>>;
25+
26+
/// Initialize the crypto provider needed for setting custom root certificates.
27+
fn ensure_crypto_provider_initialized() {
28+
static INIT_CRYPTO_PROVIDER: LazyLock<()> = LazyLock::new(|| {
29+
#[cfg(unix)]
30+
rustls::crypto::aws_lc_rs::default_provider()
31+
.install_default()
32+
.expect("Failed to install default CryptoProvider");
33+
});
34+
35+
let () = &*INIT_CRYPTO_PROVIDER;
36+
}
37+
38+
/// Creates a new hyper-based HTTP client with the given configuration.
39+
///
40+
/// This client is compatible with `libdd_trace_utils` and supports:
41+
/// - HTTPS proxy configuration
42+
/// - Custom TLS root certificates
43+
///
44+
/// # Arguments
45+
///
46+
/// * `proxy_https` - Optional HTTPS proxy URL
47+
/// * `tls_cert_file` - Optional path to a PEM file containing root certificates
48+
///
49+
/// # Errors
50+
///
51+
/// Returns an error if:
52+
/// - The proxy URL cannot be parsed
53+
/// - The TLS certificate file cannot be read or parsed
54+
pub fn create_client(
55+
proxy_https: Option<&String>,
56+
tls_cert_file: Option<&String>,
57+
) -> Result<HyperClient, Box<dyn Error>> {
58+
// Create the base connector with optional custom TLS config
59+
let connector = if let Some(ca_cert_path) = tls_cert_file {
60+
// Ensure crypto provider is initialized before creating TLS config
61+
ensure_crypto_provider_initialized();
62+
63+
// Load the custom certificate
64+
let cert_file = File::open(ca_cert_path)?;
65+
let mut reader = BufReader::new(cert_file);
66+
let certs: Vec<CertificateDer> =
67+
rustls_pemfile::certs(&mut reader).collect::<Result<Vec<_>, _>>()?;
68+
69+
// Create a root certificate store and add custom certs
70+
let mut root_store = RootCertStore::empty();
71+
for cert in certs {
72+
root_store.add(cert)?;
73+
}
74+
75+
// Build the TLS config with custom root certificates
76+
let tls_config = rustls::ClientConfig::builder()
77+
.with_root_certificates(root_store)
78+
.with_no_client_auth();
79+
80+
// Build the HTTPS connector with custom config
81+
let https_connector = HttpsConnectorBuilder::new()
82+
.with_tls_config(tls_config)
83+
.https_or_http()
84+
.enable_http1()
85+
.build();
86+
87+
debug!(
88+
"HYPER_CLIENT | Added root certificate from {}",
89+
ca_cert_path
90+
);
91+
92+
// Construct the Connector::Https variant directly
93+
libdd_common::connector::Connector::Https(https_connector)
94+
} else {
95+
// Use default connector
96+
libdd_common::connector::Connector::default()
97+
};
98+
99+
if let Some(proxy) = proxy_https {
100+
let proxy =
101+
hyper_http_proxy::Proxy::new(hyper_http_proxy::Intercept::Https, proxy.parse()?);
102+
let proxy_connector = hyper_http_proxy::ProxyConnector::from_proxy(connector, proxy)?;
103+
let client = hyper_migration::client_builder().build(proxy_connector);
104+
debug!(
105+
"HYPER_CLIENT | Proxy connector created with proxy: {:?}",
106+
proxy_https
107+
);
108+
Ok(client)
109+
} else {
110+
let proxy_connector = hyper_http_proxy::ProxyConnector::new(connector)?;
111+
Ok(hyper_migration::client_builder().build(proxy_connector))
112+
}
113+
}

bottlecap/src/traces/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// SPDX-License-Identifier: Apache-2.0
33

44
pub mod context;
5+
pub mod hyper_client;
56
pub mod propagation;
67
pub mod proxy_aggregator;
78
pub mod proxy_flusher;

0 commit comments

Comments
 (0)