Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions bottlecap/src/bin/bottlecap/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,14 @@ use bottlecap::{
span_dedup_service,
stats_aggregator::StatsAggregator,
stats_concentrator_service::{StatsConcentratorHandle, StatsConcentratorService},
stats_flusher::{self, StatsFlusher},
stats_flusher,
stats_generator::StatsGenerator,
stats_processor, trace_agent,
trace_aggregator::SendDataBuilderInfo,
trace_aggregator_service::{
AggregatorHandle as TraceAggregatorHandle, AggregatorService as TraceAggregatorService,
},
trace_flusher::{self, TraceFlusher},
trace_flusher,
trace_processor::{self, SendingTraceProcessor},
},
};
Expand Down Expand Up @@ -1081,9 +1081,9 @@ fn start_trace_agent(
appsec_processor: Option<Arc<TokioMutex<AppSecProcessor>>>,
) -> (
Sender<SendDataBuilderInfo>,
Arc<trace_flusher::ServerlessTraceFlusher>,
Arc<trace_flusher::TraceFlusher>,
Arc<trace_processor::ServerlessTraceProcessor>,
Arc<stats_flusher::ServerlessStatsFlusher>,
Arc<stats_flusher::StatsFlusher>,
Arc<ProxyFlusher>,
tokio_util::sync::CancellationToken,
StatsConcentratorHandle,
Expand All @@ -1096,7 +1096,7 @@ fn start_trace_agent(
let stats_aggregator: Arc<TokioMutex<StatsAggregator>> = Arc::new(TokioMutex::new(
StatsAggregator::new_with_concentrator(stats_concentrator_handle.clone()),
));
let stats_flusher = Arc::new(stats_flusher::ServerlessStatsFlusher::new(
let stats_flusher = Arc::new(stats_flusher::StatsFlusher::new(
api_key_factory.clone(),
stats_aggregator.clone(),
Arc::clone(config),
Expand All @@ -1108,7 +1108,7 @@ fn start_trace_agent(
let (trace_aggregator_service, trace_aggregator_handle) = TraceAggregatorService::default();
tokio::spawn(trace_aggregator_service.run());

let trace_flusher = Arc::new(trace_flusher::ServerlessTraceFlusher::new(
let trace_flusher = Arc::new(trace_flusher::TraceFlusher::new(
trace_aggregator_handle.clone(),
config.clone(),
api_key_factory.clone(),
Expand Down Expand Up @@ -1394,6 +1394,7 @@ mod flush_handles_tests {
let mut handles = FlushHandles::new();
let handle = tokio::spawn(async {
sleep(Duration::from_millis(5)).await;
Vec::new() // Return empty Vec for stats retry
});
handles.stats_flush_handles.push(handle);

Expand Down
5 changes: 3 additions & 2 deletions bottlecap/src/flushing/handles.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

use datadog_protos::metrics::SketchPayload;
use dogstatsd::datadog::Series;
use libdd_trace_protobuf::pb;
use libdd_trace_utils::send_data::SendData;
use tokio::task::JoinHandle;

Expand Down Expand Up @@ -32,8 +33,8 @@ pub struct FlushHandles {
pub metric_flush_handles: Vec<JoinHandle<MetricsRetryBatch>>,
/// Handles for proxy flush operations. Returns failed request builders for retry.
pub proxy_flush_handles: Vec<JoinHandle<Vec<reqwest::RequestBuilder>>>,
/// Handles for stats flush operations. Stats don't support retry.
pub stats_flush_handles: Vec<JoinHandle<()>>,
/// Handles for stats flush operations. Returns failed stats payloads for retry.
pub stats_flush_handles: Vec<JoinHandle<Vec<pb::ClientStatsPayload>>>,
}

impl FlushHandles {
Expand Down
61 changes: 30 additions & 31 deletions bottlecap/src/flushing/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,11 @@ use crate::traces::{
/// - Spawning non-blocking flush tasks
/// - Awaiting pending flush handles with retry logic
/// - Performing blocking flushes (spawn + await)
///
/// # Type Parameters
///
/// * `TF` - Trace flusher type implementing `TraceFlusher`
/// * `SF` - Stats flusher type implementing `StatsFlusher`
pub struct FlushingService<TF, SF>
where
TF: TraceFlusher + Send + Sync + 'static,
SF: StatsFlusher + Send + Sync + 'static,
{
pub struct FlushingService {
// Flushers
logs_flusher: LogsFlusher,
trace_flusher: Arc<TF>,
stats_flusher: Arc<SF>,
trace_flusher: Arc<TraceFlusher>,
stats_flusher: Arc<StatsFlusher>,
proxy_flusher: Arc<ProxyFlusher>,
metrics_flushers: Arc<TokioMutex<Vec<MetricsFlusher>>>,

Expand All @@ -47,17 +38,13 @@ where
handles: FlushHandles,
}

impl<TF, SF> FlushingService<TF, SF>
where
TF: TraceFlusher + Send + Sync + 'static,
SF: StatsFlusher + Send + Sync + 'static,
{
impl FlushingService {
/// Creates a new `FlushingService` with the given flushers.
#[must_use]
pub fn new(
logs_flusher: LogsFlusher,
trace_flusher: Arc<TF>,
stats_flusher: Arc<SF>,
trace_flusher: Arc<TraceFlusher>,
stats_flusher: Arc<StatsFlusher>,
proxy_flusher: Arc<ProxyFlusher>,
metrics_flushers: Arc<TokioMutex<Vec<MetricsFlusher>>>,
metrics_aggr_handle: MetricsAggregatorHandle,
Expand Down Expand Up @@ -135,11 +122,13 @@ where
self.handles.metric_flush_handles.push(handle);
}

// Spawn stats flush (fire-and-forget, no retry)
// Spawn stats flush
let sf = Arc::clone(&self.stats_flusher);
self.handles
.stats_flush_handles
.push(tokio::spawn(async move { sf.flush(false).await }));
.push(tokio::spawn(async move {
sf.flush(false, None).await.unwrap_or_default()
}));

// Spawn proxy flush
let pf = self.proxy_flusher.clone();
Expand All @@ -166,11 +155,25 @@ where
let mut joinset = tokio::task::JoinSet::new();
let mut flush_error = false;

// Await stats handles (no retry)
// Await stats handles with retry
for handle in self.handles.stats_flush_handles.drain(..) {
if let Err(e) = handle.await {
error!("FLUSHING_SERVICE | stats flush error {e:?}");
flush_error = true;
match handle.await {
Ok(retry) => {
let sf = self.stats_flusher.clone();
if !retry.is_empty() {
debug!(
"FLUSHING_SERVICE | redriving {:?} stats payloads",
retry.len()
);
joinset.spawn(async move {
sf.flush(false, Some(retry)).await;
});
}
}
Err(e) => {
error!("FLUSHING_SERVICE | stats flush error {e:?}");
flush_error = true;
}
}
}

Expand Down Expand Up @@ -325,7 +328,7 @@ where
self.logs_flusher.flush(None),
futures::future::join_all(metrics_futures),
self.trace_flusher.flush(None),
self.stats_flusher.flush(force_stats),
self.stats_flusher.flush(force_stats, None),
self.proxy_flusher.flush(None),
);
}
Expand All @@ -340,11 +343,7 @@ where
}
}

impl<TF, SF> std::fmt::Debug for FlushingService<TF, SF>
where
TF: TraceFlusher + Send + Sync + 'static,
SF: StatsFlusher + Send + Sync + 'static,
{
impl std::fmt::Debug for FlushingService {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("FlushingService")
.field("handles", &self.handles)
Expand Down
113 changes: 113 additions & 0 deletions bottlecap/src/traces/hyper_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

//! Hyper-based HTTP client for trace and stats flushers.
//!
//! This module provides the HTTP client type required by `libdd_trace_utils`
//! for sending traces and stats to Datadog intake endpoints.

use hyper_http_proxy;
use hyper_rustls::HttpsConnectorBuilder;
use libdd_common::{GenericHttpClient, hyper_migration};
use rustls::RootCertStore;
use rustls_pki_types::CertificateDer;
use std::error::Error;
use std::fs::File;
use std::io::BufReader;
use std::sync::LazyLock;
use tracing::debug;

/// Type alias for the HTTP client used by trace and stats flushers.
///
/// This is the client type expected by `libdd_trace_utils::SendData::send()`.
pub type HyperClient =
GenericHttpClient<hyper_http_proxy::ProxyConnector<libdd_common::connector::Connector>>;

/// Initialize the crypto provider needed for setting custom root certificates.
fn ensure_crypto_provider_initialized() {
static INIT_CRYPTO_PROVIDER: LazyLock<()> = LazyLock::new(|| {
#[cfg(unix)]
rustls::crypto::aws_lc_rs::default_provider()
.install_default()
.expect("Failed to install default CryptoProvider");
});

let () = &*INIT_CRYPTO_PROVIDER;
}

/// Creates a new hyper-based HTTP client with the given configuration.
///
/// This client is compatible with `libdd_trace_utils` and supports:
/// - HTTPS proxy configuration
/// - Custom TLS root certificates
///
/// # Arguments
///
/// * `proxy_https` - Optional HTTPS proxy URL
/// * `tls_cert_file` - Optional path to a PEM file containing root certificates
///
/// # Errors
///
/// Returns an error if:
/// - The proxy URL cannot be parsed
/// - The TLS certificate file cannot be read or parsed
pub fn create_client(
proxy_https: Option<&String>,
tls_cert_file: Option<&String>,
) -> Result<HyperClient, Box<dyn Error>> {
// Create the base connector with optional custom TLS config
let connector = if let Some(ca_cert_path) = tls_cert_file {
// Ensure crypto provider is initialized before creating TLS config
ensure_crypto_provider_initialized();

// Load the custom certificate
let cert_file = File::open(ca_cert_path)?;
let mut reader = BufReader::new(cert_file);
let certs: Vec<CertificateDer> =
rustls_pemfile::certs(&mut reader).collect::<Result<Vec<_>, _>>()?;

// Create a root certificate store and add custom certs
let mut root_store = RootCertStore::empty();
for cert in certs {
root_store.add(cert)?;
}

// Build the TLS config with custom root certificates
let tls_config = rustls::ClientConfig::builder()
.with_root_certificates(root_store)
.with_no_client_auth();

// Build the HTTPS connector with custom config
let https_connector = HttpsConnectorBuilder::new()
.with_tls_config(tls_config)
.https_or_http()
.enable_http1()
.build();

debug!(
"HYPER_CLIENT | Added root certificate from {}",
ca_cert_path
);

// Construct the Connector::Https variant directly
libdd_common::connector::Connector::Https(https_connector)
} else {
// Use default connector
libdd_common::connector::Connector::default()
};

if let Some(proxy) = proxy_https {
let proxy =
hyper_http_proxy::Proxy::new(hyper_http_proxy::Intercept::Https, proxy.parse()?);
let proxy_connector = hyper_http_proxy::ProxyConnector::from_proxy(connector, proxy)?;
let client = hyper_migration::client_builder().build(proxy_connector);
debug!(
"HYPER_CLIENT | Proxy connector created with proxy: {:?}",
proxy_https
);
Ok(client)
} else {
let proxy_connector = hyper_http_proxy::ProxyConnector::new(connector)?;
Ok(hyper_migration::client_builder().build(proxy_connector))
}
}
1 change: 1 addition & 0 deletions bottlecap/src/traces/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

pub mod context;
pub mod hyper_client;
pub mod propagation;
pub mod proxy_aggregator;
pub mod proxy_flusher;
Expand Down
Loading
Loading