diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs
index 12ae368f7..185001f20 100644
--- a/bottlecap/src/bin/bottlecap/main.rs
+++ b/bottlecap/src/bin/bottlecap/main.rs
@@ -66,14 +66,14 @@ use bottlecap::{
         span_dedup_service,
         stats_aggregator::StatsAggregator,
         stats_concentrator_service::{StatsConcentratorHandle, StatsConcentratorService},
-        stats_flusher::{self, StatsFlusher},
+        stats_flusher,
         stats_generator::StatsGenerator,
         stats_processor, trace_agent,
         trace_aggregator::SendDataBuilderInfo,
         trace_aggregator_service::{
            AggregatorHandle as TraceAggregatorHandle, AggregatorService as TraceAggregatorService,
        },
-        trace_flusher::{self, TraceFlusher},
+        trace_flusher,
        trace_processor::{self, SendingTraceProcessor},
    },
};
@@ -1081,9 +1081,9 @@ fn start_trace_agent(
     appsec_processor: Option>>,
 ) -> (
     Sender<SendDataBuilderInfo>,
-    Arc<trace_flusher::ServerlessTraceFlusher>,
+    Arc<trace_flusher::TraceFlusher>,
     Arc,
-    Arc<stats_flusher::ServerlessStatsFlusher>,
+    Arc<stats_flusher::StatsFlusher>,
     Arc,
     tokio_util::sync::CancellationToken,
     StatsConcentratorHandle,
@@ -1096,7 +1096,7 @@ fn start_trace_agent(
     let stats_aggregator: Arc<TokioMutex<StatsAggregator>> = Arc::new(TokioMutex::new(
         StatsAggregator::new_with_concentrator(stats_concentrator_handle.clone()),
     ));
-    let stats_flusher = Arc::new(stats_flusher::ServerlessStatsFlusher::new(
+    let stats_flusher = Arc::new(stats_flusher::StatsFlusher::new(
         api_key_factory.clone(),
         stats_aggregator.clone(),
         Arc::clone(config),
@@ -1108,7 +1108,7 @@ fn start_trace_agent(
     let (trace_aggregator_service, trace_aggregator_handle) = TraceAggregatorService::default();
     tokio::spawn(trace_aggregator_service.run());
 
-    let trace_flusher = Arc::new(trace_flusher::ServerlessTraceFlusher::new(
+    let trace_flusher = Arc::new(trace_flusher::TraceFlusher::new(
         trace_aggregator_handle.clone(),
         config.clone(),
         api_key_factory.clone(),
@@ -1394,6 +1394,7 @@ mod flush_handles_tests {
         let mut handles = FlushHandles::new();
         let handle = tokio::spawn(async {
             sleep(Duration::from_millis(5)).await;
+            Vec::new() // Return empty Vec for stats retry
         });

         handles.stats_flush_handles.push(handle);
diff --git a/bottlecap/src/flushing/handles.rs b/bottlecap/src/flushing/handles.rs
index 851d8e4a9..61376ac4b 100644
--- a/bottlecap/src/flushing/handles.rs
+++ b/bottlecap/src/flushing/handles.rs
@@ -2,6 +2,7 @@
 use datadog_protos::metrics::SketchPayload;
 use dogstatsd::datadog::Series;
+use libdd_trace_protobuf::pb;
 use libdd_trace_utils::send_data::SendData;
 use tokio::task::JoinHandle;

@@ -32,8 +33,8 @@ pub struct FlushHandles {
     pub metric_flush_handles: Vec<JoinHandle<(Vec<Series>, Vec<SketchPayload>)>>,
     /// Handles for proxy flush operations. Returns failed request builders for retry.
     pub proxy_flush_handles: Vec<JoinHandle<Vec<reqwest::RequestBuilder>>>,
-    /// Handles for stats flush operations. Stats don't support retry.
-    pub stats_flush_handles: Vec<JoinHandle<()>>,
+    /// Handles for stats flush operations. Returns failed stats payloads for retry.
+    pub stats_flush_handles: Vec<JoinHandle<Vec<pb::ClientStatsPayload>>>,
 }

 impl FlushHandles {
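The `FlushHandles` change above is the hinge for stats retry: a stats flush task now resolves to the payloads that still need sending instead of `()`. A minimal standalone sketch of that contract, using a stand-in payload type rather than the real `pb::ClientStatsPayload`:

```rust
use tokio::task::JoinHandle;

/// Stand-in for `pb::ClientStatsPayload`.
#[derive(Debug, Clone)]
struct StatsPayload(u64);

/// Spawn a flush task; the handle resolves to whatever could not be sent.
fn spawn_stats_flush(payloads: Vec<StatsPayload>) -> JoinHandle<Vec<StatsPayload>> {
    tokio::spawn(async move {
        // Pretend everything after the first payload failed and must be retried.
        payloads.into_iter().skip(1).collect()
    })
}

#[tokio::main]
async fn main() {
    let handle = spawn_stats_flush(vec![StatsPayload(1), StatsPayload(2), StatsPayload(3)]);
    // A panicked task yields Err; treat that as "nothing to retry" in this sketch.
    let to_retry = handle.await.unwrap_or_default();
    assert_eq!(to_retry.len(), 2);
}
```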
diff --git a/bottlecap/src/flushing/service.rs b/bottlecap/src/flushing/service.rs
index 152a13140..046f65649 100644
--- a/bottlecap/src/flushing/service.rs
+++ b/bottlecap/src/flushing/service.rs
@@ -23,20 +23,11 @@ use crate::traces::{
 /// - Spawning non-blocking flush tasks
 /// - Awaiting pending flush handles with retry logic
 /// - Performing blocking flushes (spawn + await)
-///
-/// # Type Parameters
-///
-/// * `TF` - Trace flusher type implementing `TraceFlusher`
-/// * `SF` - Stats flusher type implementing `StatsFlusher`
-pub struct FlushingService<TF, SF>
-where
-    TF: TraceFlusher + Send + Sync + 'static,
-    SF: StatsFlusher + Send + Sync + 'static,
-{
+pub struct FlushingService {
     // Flushers
     logs_flusher: LogsFlusher,
-    trace_flusher: Arc<TF>,
-    stats_flusher: Arc<SF>,
+    trace_flusher: Arc<TraceFlusher>,
+    stats_flusher: Arc<StatsFlusher>,
     proxy_flusher: Arc,
     metrics_flushers: Arc>>,

@@ -47,17 +38,13 @@ where
     handles: FlushHandles,
 }

-impl<TF, SF> FlushingService<TF, SF>
-where
-    TF: TraceFlusher + Send + Sync + 'static,
-    SF: StatsFlusher + Send + Sync + 'static,
-{
+impl FlushingService {
     /// Creates a new `FlushingService` with the given flushers.
     #[must_use]
     pub fn new(
         logs_flusher: LogsFlusher,
-        trace_flusher: Arc<TF>,
-        stats_flusher: Arc<SF>,
+        trace_flusher: Arc<TraceFlusher>,
+        stats_flusher: Arc<StatsFlusher>,
         proxy_flusher: Arc,
         metrics_flushers: Arc>>,
         metrics_aggr_handle: MetricsAggregatorHandle,
@@ -135,11 +122,13 @@ where
             self.handles.metric_flush_handles.push(handle);
         }

-        // Spawn stats flush (fire-and-forget, no retry)
+        // Spawn stats flush
         let sf = Arc::clone(&self.stats_flusher);
         self.handles
             .stats_flush_handles
-            .push(tokio::spawn(async move { sf.flush(false).await }));
+            .push(tokio::spawn(async move {
+                sf.flush(false, None).await.unwrap_or_default()
+            }));

         // Spawn proxy flush
         let pf = self.proxy_flusher.clone();
@@ -166,11 +155,25 @@ where
         let mut joinset = tokio::task::JoinSet::new();
         let mut flush_error = false;

-        // Await stats handles (no retry)
+        // Await stats handles with retry
         for handle in self.handles.stats_flush_handles.drain(..) {
-            if let Err(e) = handle.await {
-                error!("FLUSHING_SERVICE | stats flush error {e:?}");
-                flush_error = true;
+            match handle.await {
+                Ok(retry) => {
+                    let sf = self.stats_flusher.clone();
+                    if !retry.is_empty() {
+                        debug!(
+                            "FLUSHING_SERVICE | redriving {:?} stats payloads",
+                            retry.len()
+                        );
+                        joinset.spawn(async move {
+                            sf.flush(false, Some(retry)).await;
+                        });
+                    }
+                }
+                Err(e) => {
+                    error!("FLUSHING_SERVICE | stats flush error {e:?}");
+                    flush_error = true;
+                }
             }
         }

@@ -325,7 +328,7 @@ where
             self.logs_flusher.flush(None),
             futures::future::join_all(metrics_futures),
             self.trace_flusher.flush(None),
-            self.stats_flusher.flush(force_stats),
+            self.stats_flusher.flush(force_stats, None),
             self.proxy_flusher.flush(None),
         );
     }
@@ -340,11 +343,7 @@ where
     }
 }

-impl<TF, SF> std::fmt::Debug for FlushingService<TF, SF>
-where
-    TF: TraceFlusher + Send + Sync + 'static,
-    SF: StatsFlusher + Send + Sync + 'static,
-{
+impl std::fmt::Debug for FlushingService {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         f.debug_struct("FlushingService")
             .field("handles", &self.handles)
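The await loop in `service.rs` follows a simple redrive shape: collect whatever each stats handle returns, and if anything came back, spawn one more flush seeded with those payloads. A simplified, self-contained sketch of the pattern (the `flush(force, Some(retry))` shape mirrors the new `StatsFlusher::flush`, but the payload type here is a stand-in):

```rust
use tokio::task::{JoinHandle, JoinSet};

/// Stand-in for `pb::ClientStatsPayload`.
type Payload = u64;

/// Placeholder for `StatsFlusher::flush(force, failed)`: send the seeded payloads first,
/// then drain the aggregator, and return whatever still failed.
async fn flush(_force: bool, _seed: Option<Vec<Payload>>) -> Option<Vec<Payload>> {
    None
}

/// Await every stats handle and redrive any payloads that a flush task handed back.
async fn await_stats_handles(handles: Vec<JoinHandle<Vec<Payload>>>) {
    let mut joinset = JoinSet::new();
    for handle in handles {
        match handle.await {
            Ok(retry) if !retry.is_empty() => {
                // One extra attempt, seeded with the payloads that failed the first time.
                joinset.spawn(async move {
                    flush(false, Some(retry)).await;
                });
            }
            Ok(_) => {}
            Err(e) => eprintln!("stats flush task failed: {e:?}"),
        }
    }
    // Wait for the redrive attempts; anything that fails again is dropped here.
    while joinset.join_next().await.is_some() {}
}
```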
diff --git a/bottlecap/src/traces/hyper_client.rs b/bottlecap/src/traces/hyper_client.rs
new file mode 100644
index 000000000..99cd60fe5
--- /dev/null
+++ b/bottlecap/src/traces/hyper_client.rs
@@ -0,0 +1,113 @@
+// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/
+// SPDX-License-Identifier: Apache-2.0
+
+//! Hyper-based HTTP client for trace and stats flushers.
+//!
+//! This module provides the HTTP client type required by `libdd_trace_utils`
+//! for sending traces and stats to Datadog intake endpoints.
+
+use hyper_http_proxy;
+use hyper_rustls::HttpsConnectorBuilder;
+use libdd_common::{GenericHttpClient, hyper_migration};
+use rustls::RootCertStore;
+use rustls_pki_types::CertificateDer;
+use std::error::Error;
+use std::fs::File;
+use std::io::BufReader;
+use std::sync::LazyLock;
+use tracing::debug;
+
+/// Type alias for the HTTP client used by trace and stats flushers.
+///
+/// This is the client type expected by `libdd_trace_utils::SendData::send()`.
+pub type HyperClient =
+    GenericHttpClient<hyper_http_proxy::ProxyConnector<libdd_common::connector::Connector>>;
+
+/// Initialize the crypto provider needed for setting custom root certificates.
+fn ensure_crypto_provider_initialized() {
+    static INIT_CRYPTO_PROVIDER: LazyLock<()> = LazyLock::new(|| {
+        #[cfg(unix)]
+        rustls::crypto::aws_lc_rs::default_provider()
+            .install_default()
+            .expect("Failed to install default CryptoProvider");
+    });
+
+    let () = &*INIT_CRYPTO_PROVIDER;
+}
+
+/// Creates a new hyper-based HTTP client with the given configuration.
+///
+/// This client is compatible with `libdd_trace_utils` and supports:
+/// - HTTPS proxy configuration
+/// - Custom TLS root certificates
+///
+/// # Arguments
+///
+/// * `proxy_https` - Optional HTTPS proxy URL
+/// * `tls_cert_file` - Optional path to a PEM file containing root certificates
+///
+/// # Errors
+///
+/// Returns an error if:
+/// - The proxy URL cannot be parsed
+/// - The TLS certificate file cannot be read or parsed
+pub fn create_client(
+    proxy_https: Option<&String>,
+    tls_cert_file: Option<&String>,
+) -> Result<HyperClient, Box<dyn Error>> {
+    // Create the base connector with optional custom TLS config
+    let connector = if let Some(ca_cert_path) = tls_cert_file {
+        // Ensure crypto provider is initialized before creating TLS config
+        ensure_crypto_provider_initialized();
+
+        // Load the custom certificate
+        let cert_file = File::open(ca_cert_path)?;
+        let mut reader = BufReader::new(cert_file);
+        let certs: Vec<CertificateDer> =
+            rustls_pemfile::certs(&mut reader).collect::<Result<Vec<_>, _>>()?;
+
+        // Create a root certificate store and add custom certs
+        let mut root_store = RootCertStore::empty();
+        for cert in certs {
+            root_store.add(cert)?;
+        }
+
+        // Build the TLS config with custom root certificates
+        let tls_config = rustls::ClientConfig::builder()
+            .with_root_certificates(root_store)
+            .with_no_client_auth();
+
+        // Build the HTTPS connector with custom config
+        let https_connector = HttpsConnectorBuilder::new()
+            .with_tls_config(tls_config)
+            .https_or_http()
+            .enable_http1()
+            .build();
+
+        debug!(
+            "HYPER_CLIENT | Added root certificate from {}",
+            ca_cert_path
+        );
+
+        // Construct the Connector::Https variant directly
+        libdd_common::connector::Connector::Https(https_connector)
+    } else {
+        // Use default connector
+        libdd_common::connector::Connector::default()
+    };
+
+    if let Some(proxy) = proxy_https {
+        let proxy =
+            hyper_http_proxy::Proxy::new(hyper_http_proxy::Intercept::Https, proxy.parse()?);
+        let proxy_connector = hyper_http_proxy::ProxyConnector::from_proxy(connector, proxy)?;
+        let client = hyper_migration::client_builder().build(proxy_connector);
+        debug!(
+            "HYPER_CLIENT | Proxy connector created with proxy: {:?}",
+            proxy_https
+        );
+        Ok(client)
+    } else {
+        let proxy_connector = hyper_http_proxy::ProxyConnector::new(connector)?;
+        Ok(hyper_migration::client_builder().build(proxy_connector))
+    }
+}
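Callers only need the two optional knobs that `create_client` exposes. A usage sketch, assuming the module is reachable as `bottlecap::traces::hyper_client` and using illustrative proxy and CA-bundle values that are not taken from the diff:

```rust
use bottlecap::traces::hyper_client;

fn build_clients() -> Result<(), Box<dyn std::error::Error>> {
    // Default client: no proxy, system root certificates.
    let _plain = hyper_client::create_client(None, None)?;

    // Client for an environment behind an egress proxy with a private CA.
    // The proxy URL and bundle path below are hypothetical example values.
    let proxy = "http://proxy.internal:3128".to_string();
    let ca_bundle = "/var/task/ca-bundle.pem".to_string();
    let _custom = hyper_client::create_client(Some(&proxy), Some(&ca_bundle))?;

    Ok(())
}
```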
diff --git a/bottlecap/src/traces/mod.rs b/bottlecap/src/traces/mod.rs
index 5a2a515dc..7d1581c2e 100644
--- a/bottlecap/src/traces/mod.rs
+++ b/bottlecap/src/traces/mod.rs
@@ -2,6 +2,7 @@
 // SPDX-License-Identifier: Apache-2.0

 pub mod context;
+pub mod hyper_client;
 pub mod propagation;
 pub mod proxy_aggregator;
 pub mod proxy_flusher;
diff --git a/bottlecap/src/traces/stats_flusher.rs b/bottlecap/src/traces/stats_flusher.rs
index 7f2a0d998..222dddbd7 100644
--- a/bottlecap/src/traces/stats_flusher.rs
+++ b/bottlecap/src/traces/stats_flusher.rs
@@ -1,7 +1,6 @@
 // Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/
 // SPDX-License-Identifier: Apache-2.0

-use async_trait::async_trait;
 use std::str::FromStr;
 use std::sync::Arc;
 use tokio::sync::Mutex;
@@ -9,62 +8,56 @@ use tokio::sync::OnceCell;

 use crate::config;
 use crate::lifecycle::invocation::processor::S_TO_MS;
+use crate::traces::hyper_client::{self, HyperClient};
 use crate::traces::stats_aggregator::StatsAggregator;
-use crate::traces::trace_flusher::ServerlessTraceFlusher;
 use dogstatsd::api_key::ApiKeyFactory;
 use libdd_common::Endpoint;
 use libdd_trace_protobuf::pb;
 use libdd_trace_utils::{config_utils::trace_stats_url, stats_utils};
 use tracing::{debug, error};

-#[async_trait]
-pub trait StatsFlusher {
-    fn new(
-        api_key_factory: Arc<ApiKeyFactory>,
-        aggregator: Arc<Mutex<StatsAggregator>>,
-        config: Arc<config::Config>,
-    ) -> Self
-    where
-        Self: Sized;
-    /// Flushes stats to the Datadog trace stats intake.
-    async fn send(&self, traces: Vec<pb::ClientStatsPayload>);
-
-    async fn flush(&self, force_flush: bool);
-}
-
-#[allow(clippy::module_name_repetitions)]
-#[derive(Clone)]
-pub struct ServerlessStatsFlusher {
-    // pub buffer: Arc<Mutex<Vec<pb::ClientStatsPayload>>>,
+pub struct StatsFlusher {
     aggregator: Arc<Mutex<StatsAggregator>>,
     config: Arc<config::Config>,
     api_key_factory: Arc<ApiKeyFactory>,
     endpoint: OnceCell<Endpoint>,
+    /// Cached HTTP client, lazily initialized on first use.
+    /// TODO: `StatsFlusher` and `TraceFlusher` both hit trace.agent.datadoghq.{site} and could
+    /// share a single HTTP client for better connection pooling.
+    http_client: OnceCell<HyperClient>,
 }

-#[async_trait]
-impl StatsFlusher for ServerlessStatsFlusher {
-    fn new(
+impl StatsFlusher {
+    #[must_use]
+    pub fn new(
         api_key_factory: Arc<ApiKeyFactory>,
         aggregator: Arc<Mutex<StatsAggregator>>,
         config: Arc<config::Config>,
     ) -> Self {
-        ServerlessStatsFlusher {
+        StatsFlusher {
             aggregator,
             config,
             api_key_factory,
             endpoint: OnceCell::new(),
+            http_client: OnceCell::new(),
         }
     }

-    async fn send(&self, stats: Vec<pb::ClientStatsPayload>) {
+    /// Flushes stats to the Datadog trace stats intake.
+    ///
+    /// Returns `None` on success, or `Some(failed_stats)` if the flush failed and should be retried.
+    pub async fn send(
+        &self,
+        stats: Vec<pb::ClientStatsPayload>,
+    ) -> Option<Vec<pb::ClientStatsPayload>> {
         if stats.is_empty() {
-            return;
+            return None;
         }

         let Some(api_key) = self.api_key_factory.get_api_key().await else {
-            error!("Skipping flushing stats: Failed to resolve API key");
-            return;
+            error!("STATS | Skipping flushing stats: Failed to resolve API key");
+            // No API key means we can't send - don't retry as it won't help
+            return None;
         };

         let api_key_clone = api_key.to_string();
@@ -84,17 +77,18 @@ impl StatsFlusher for ServerlessStatsFlusher {
             })
             .await;

-        debug!("Flushing {} stats", stats.len());
+        debug!("STATS | Flushing {} stats", stats.len());

-        let stats_payload = stats_utils::construct_stats_payload(stats);
+        let stats_payload = stats_utils::construct_stats_payload(stats.clone());

-        debug!("Stats payload to be sent: {stats_payload:?}");
+        debug!("STATS | Stats payload to be sent: {stats_payload:?}");

         let serialized_stats_payload = match stats_utils::serialize_stats_payload(stats_payload) {
             Ok(res) => res,
             Err(err) => {
-                error!("Failed to serialize stats payload, dropping stats: {err}");
-                return;
+                // Serialization errors are permanent - data is malformed, don't retry
+                error!("STATS | Failed to serialize stats payload, dropping stats: {err}");
+                return None;
             }
         };

@@ -102,43 +96,102 @@ impl StatsFlusher for ServerlessStatsFlusher {
         let start = std::time::Instant::now();

-        let Ok(http_client) = ServerlessTraceFlusher::get_http_client(
-            self.config.proxy_https.as_ref(),
-            self.config.tls_cert_file.as_ref(),
-        ) else {
-            error!("STATS_FLUSHER | Failed to create HTTP client");
-            return;
+        // Get or create the cached HTTP client
+        let http_client = self.get_or_init_http_client().await;
+        let Some(http_client) = http_client else {
+            error!("STATS | Failed to create HTTP client, will retry");
+            return Some(stats);
         };

         let resp = stats_utils::send_stats_payload_with_client(
             serialized_stats_payload,
             endpoint,
             api_key.as_str(),
-            Some(&http_client),
+            Some(http_client),
         )
         .await;

         let elapsed = start.elapsed();
         debug!(
-            "Stats request to {} took {} ms",
+            "STATS | Stats request to {} took {} ms",
             stats_url,
             elapsed.as_millis()
         );

         match resp {
-            Ok(()) => debug!("Successfully flushed stats"),
+            Ok(()) => {
+                debug!("STATS | Successfully flushed stats");
+                None
+            }
             Err(e) => {
-                error!("Error sending stats: {e:?}");
+                // Network/server errors are temporary - return stats for retry
+                error!("STATS | Error sending stats: {e:?}");
+                Some(stats)
             }
-        };
+        }
     }

-    async fn flush(&self, force_flush: bool) {
-        let mut guard = self.aggregator.lock().await;
+    /// Flushes stats from the aggregator.
+    ///
+    /// Returns `None` on success, or `Some(failed_stats)` if any flush failed and should be retried.
+    /// If `failed_stats` is provided, it will attempt to send those first before fetching new stats.
+    pub async fn flush(
+        &self,
+        force_flush: bool,
+        failed_stats: Option<Vec<pb::ClientStatsPayload>>,
+    ) -> Option<Vec<pb::ClientStatsPayload>> {
+        let mut all_failed: Vec<pb::ClientStatsPayload> = Vec::new();
+
+        // First, retry any previously failed stats
+        if let Some(retry_stats) = failed_stats {
+            if !retry_stats.is_empty() {
+                debug!(
+                    "STATS | Retrying {} previously failed stats",
+                    retry_stats.len()
+                );
+                if let Some(still_failed) = self.send(retry_stats).await {
+                    all_failed.extend(still_failed);
+                }
+            }
+        }
+        // Then flush new stats from the aggregator
+        let mut guard = self.aggregator.lock().await;
         let mut stats = guard.get_batch(force_flush).await;
         while !stats.is_empty() {
-            self.send(stats).await;
-
+            if let Some(failed) = self.send(stats).await {
+                all_failed.extend(failed);
+            }
             stats = guard.get_batch(force_flush).await;
         }
+
+        if all_failed.is_empty() {
+            None
+        } else {
+            Some(all_failed)
+        }
+    }
+
+    /// Returns a reference to the cached HTTP client, initializing it if necessary.
+    ///
+    /// The client is created once and reused for all subsequent flushes,
+    /// providing connection pooling and TLS session reuse.
+    ///
+    /// Returns `None` if client creation fails. The error is logged but not cached,
+    /// allowing retry on subsequent calls.
+    async fn get_or_init_http_client(&self) -> Option<&HyperClient> {
+        match self
+            .http_client
+            .get_or_try_init(|| async {
+                hyper_client::create_client(
+                    self.config.proxy_https.as_ref(),
+                    self.config.tls_cert_file.as_ref(),
+                )
+            })
+            .await
+        {
+            Ok(client) => Some(client),
+            Err(e) => {
+                error!("STATS_FLUSHER | Failed to create HTTP client: {e}");
+                None
+            }
+        }
     }
 }
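Both flushers lean on the same property of `tokio::sync::OnceCell::get_or_try_init`: only a successful value is cached, so a failed client build is retried on the next flush while a success is reused from then on. A reduced illustration of that behavior:

```rust
use std::sync::atomic::{AtomicU32, Ordering};
use tokio::sync::OnceCell;

static ATTEMPTS: AtomicU32 = AtomicU32::new(0);

/// Stand-in for `hyper_client::create_client`: fail once, then succeed.
async fn build_client() -> Result<String, String> {
    if ATTEMPTS.fetch_add(1, Ordering::SeqCst) == 0 {
        Err("first attempt fails".to_string())
    } else {
        Ok("client".to_string())
    }
}

#[tokio::main]
async fn main() {
    let cell: OnceCell<String> = OnceCell::new();

    // The error is returned to the caller and nothing is cached...
    assert!(cell.get_or_try_init(build_client).await.is_err());
    // ...so the next call runs the initializer again and caches the success.
    assert!(cell.get_or_try_init(build_client).await.is_ok());
    // Further calls reuse the cached value without re-running the initializer.
    assert!(cell.get_or_try_init(build_client).await.is_ok());
    assert_eq!(ATTEMPTS.load(Ordering::SeqCst), 2);
}
```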
diff --git a/bottlecap/src/traces/trace_flusher.rs b/bottlecap/src/traces/trace_flusher.rs
index 6d88a12d2..811751d8b 100644
--- a/bottlecap/src/traces/trace_flusher.rs
+++ b/bottlecap/src/traces/trace_flusher.rs
@@ -1,73 +1,49 @@
 // Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/
 // SPDX-License-Identifier: Apache-2.0

-use async_trait::async_trait;
 use dogstatsd::api_key::ApiKeyFactory;
-use hyper_http_proxy;
-use hyper_rustls::HttpsConnectorBuilder;
-use libdd_common::{Endpoint, GenericHttpClient, hyper_migration};
+use libdd_common::Endpoint;
 use libdd_trace_utils::{
     config_utils::trace_intake_url_prefixed,
     send_data::SendDataBuilder,
     trace_utils::{self, SendData},
 };
-use rustls::RootCertStore;
-use rustls_pki_types::CertificateDer;
-use std::error::Error;
-use std::fs::File;
-use std::io::BufReader;
 use std::str::FromStr;
 use std::sync::Arc;
-use std::sync::LazyLock;
+use tokio::sync::OnceCell;
 use tokio::task::JoinSet;
 use tracing::{debug, error};

 use crate::config::Config;
 use crate::lifecycle::invocation::processor::S_TO_MS;
+use crate::traces::hyper_client::{self, HyperClient};
 use crate::traces::trace_aggregator_service::AggregatorHandle;

-#[async_trait]
-pub trait TraceFlusher {
-    fn new(
-        aggregator_handle: AggregatorHandle,
-        config: Arc<Config>,
-        api_key_factory: Arc<ApiKeyFactory>,
-    ) -> Self
-    where
-        Self: Sized;
-    /// Given a `Vec<SendData>`, a tracer payload, send it to the Datadog intake endpoint.
-    /// Returns the traces back if there was an error sending them.
-    async fn send(
-        traces: Vec<SendData>,
-        endpoint: Option<&Endpoint>,
-        proxy_https: &Option<String>,
-        tls_cert_file: &Option<String>,
-    ) -> Option<Vec<SendData>>;
-
-    /// Flushes traces by getting every available batch on the aggregator.
-    /// If `failed_traces` is provided, it will attempt to send those instead of fetching new traces.
-    /// Returns any traces that failed to send and should be retried.
-    async fn flush(&self, failed_traces: Option<Vec<SendData>>) -> Option<Vec<SendData>>;
-}
-
-#[derive(Clone)]
-#[allow(clippy::module_name_repetitions)]
-pub struct ServerlessTraceFlusher {
+pub struct TraceFlusher {
     pub aggregator_handle: AggregatorHandle,
     pub config: Arc<Config>,
     pub api_key_factory: Arc<ApiKeyFactory>,
+    /// Additional endpoints for dual-shipping traces to multiple Datadog sites.
+    /// Configured via `DD_APM_ADDITIONAL_ENDPOINTS` (e.g., sending to both US and EU).
+    /// Each trace batch is sent to the primary endpoint AND all additional endpoints.
     pub additional_endpoints: Vec<Endpoint>,
+    /// Cached HTTP client, lazily initialized on first use.
+    /// TODO: `TraceFlusher` and `StatsFlusher` both hit trace.agent.datadoghq.{site} and could
+    /// share a single HTTP client for better connection pooling.
+    http_client: OnceCell<HyperClient>,
 }

-#[async_trait]
-impl TraceFlusher for ServerlessTraceFlusher {
-    fn new(
+impl TraceFlusher {
+    #[must_use]
+    pub fn new(
         aggregator_handle: AggregatorHandle,
         config: Arc<Config>,
         api_key_factory: Arc<ApiKeyFactory>,
     ) -> Self {
+        // Parse additional endpoints for dual-shipping from config.
+        // Format: { "https://trace.agent.datadoghq.eu": ["api-key-1", "api-key-2"], ... }
+        // Each URL + API key combination becomes a separate endpoint.
         let mut additional_endpoints: Vec<Endpoint> = Vec::new();
-
         for (endpoint_url, api_keys) in config.apm_additional_endpoints.clone() {
             for api_key in api_keys {
                 let trace_intake_url = trace_intake_url_prefixed(&endpoint_url);
@@ -78,20 +54,23 @@ impl TraceFlusher for ServerlessTraceFlusher {
                 timeout_ms: config.flush_timeout * S_TO_MS,
                 test_token: None,
             };
-
                additional_endpoints.push(endpoint);
            }
        }

-        ServerlessTraceFlusher {
+        TraceFlusher {
             aggregator_handle,
             config,
             api_key_factory,
             additional_endpoints,
+            http_client: OnceCell::new(),
         }
     }

-    async fn flush(&self, failed_traces: Option<Vec<SendData>>) -> Option<Vec<SendData>> {
+    /// Flushes traces by getting every available batch on the aggregator.
+    /// If `failed_traces` is provided, it will attempt to send those instead of fetching new traces.
+    /// Returns any traces that failed to send and should be retried.
+    pub async fn flush(&self, failed_traces: Option<Vec<SendData>>) -> Option<Vec<SendData>> {
         let Some(api_key) = self.api_key_factory.get_api_key().await else {
             error!(
                 "TRACES | Failed to resolve API key, dropping aggregated data and skipping flushing."
@@ -102,22 +81,27 @@ impl TraceFlusher for ServerlessTraceFlusher {
             return None;
         };

+        // Get or create the cached HTTP client
+        let Some(http_client) = self.get_or_init_http_client().await else {
+            error!("TRACES | Failed to create HTTP client, skipping flush");
+            return None;
+        };
+
         let mut failed_batch: Vec<SendData> = Vec::new();

         if let Some(traces) = failed_traces {
-            // If we have traces from a previous failed attempt, try to send those first
+            // If we have traces from a previous failed attempt, try to send those first.
+            // TODO: Currently retries always go to the primary endpoint (None), even if the
+            // original failure was for an additional endpoint. This means traces that failed
+            // to send to additional endpoints will be retried to the primary endpoint instead.
+            // To fix this, we need to track which endpoint each failed trace was destined for,
+            // possibly by storing (Vec<SendData>, Option<Endpoint>) pairs in failed_batch.
             if !traces.is_empty() {
                 debug!(
                     "TRACES | Retrying to send {} previously failed batches",
                     traces.len()
                 );
-                let retry_result = Self::send(
-                    traces,
-                    None,
-                    &self.config.proxy_https,
-                    &self.config.tls_cert_file,
-                )
-                .await;
+                let retry_result = Self::send_traces(traces, None, http_client.clone()).await;
                 if retry_result.is_some() {
                     // Still failed, return to retry later
                     return retry_result;
@@ -142,22 +126,26 @@ impl TraceFlusher for ServerlessTraceFlusher {
                 .map(SendDataBuilder::build)
                 .collect();

+            // Send to PRIMARY endpoint (the default endpoint configured in the trace).
+            // Passing None means "use the endpoint already configured in the SendData".
             let traces_clone = traces.clone();
-            let proxy_https = self.config.proxy_https.clone();
-            let tls_cert_file = self.config.tls_cert_file.clone();
-            batch_tasks.spawn(async move {
-                Self::send(traces_clone, None, &proxy_https, &tls_cert_file).await
-            });
+            let client_clone = http_client.clone();
+            batch_tasks
+                .spawn(async move { Self::send_traces(traces_clone, None, client_clone).await });

+            // Send to ADDITIONAL endpoints for dual-shipping.
+            // Each additional endpoint gets the same traces, enabling multi-region delivery.
             for endpoint in self.additional_endpoints.clone() {
                 let traces_clone = traces.clone();
-                let proxy_https = self.config.proxy_https.clone();
-                let tls_cert_file = self.config.tls_cert_file.clone();
+                let client_clone = http_client.clone();
                 batch_tasks.spawn(async move {
-                    Self::send(traces_clone, Some(&endpoint), &proxy_https, &tls_cert_file).await
+                    Self::send_traces(traces_clone, Some(endpoint), client_clone).await
                 });
             }
         }

+        // Collect failed traces from all endpoints (primary + additional).
+        // Note: We lose track of which endpoint each failure came from here.
+        // All failures are mixed together and will be retried to the primary endpoint only.
         while let Some(result) = batch_tasks.join_next().await {
             if let Ok(Some(mut failed)) = result {
                 failed_batch.append(&mut failed);
@@ -171,11 +159,48 @@ impl TraceFlusher for ServerlessTraceFlusher {
             None
         }

-    async fn send(
+    /// Returns a clone of the cached HTTP client, initializing it if necessary.
+    ///
+    /// The client is created once and reused for all subsequent flushes,
+    /// providing connection pooling and TLS session reuse.
+    ///
+    /// Returns `None` if client creation fails. The error is logged but not cached,
+    /// allowing retry on subsequent calls.
+    async fn get_or_init_http_client(&self) -> Option<HyperClient> {
+        match self
+            .http_client
+            .get_or_try_init(|| async {
+                hyper_client::create_client(
+                    self.config.proxy_https.as_ref(),
+                    self.config.tls_cert_file.as_ref(),
+                )
+            })
+            .await
+        {
+            Ok(client) => Some(client.clone()),
+            Err(e) => {
+                error!("TRACES | Failed to create HTTP client: {e}");
+                None
+            }
+        }
+    }
+
+    /// Sends traces to the Datadog intake endpoint using the provided HTTP client.
+    ///
+    /// # Arguments
+    ///
+    /// * `traces` - The traces to send
+    /// * `override_endpoint` - If `Some`, sends to this endpoint instead of the trace's
+    ///   configured endpoint. Used for sending to additional endpoints.
+    /// * `http_client` - The HTTP client to use for sending
+    ///
+    /// # Returns
+    ///
+    /// Returns the traces back if there was an error sending them (for retry).
+    async fn send_traces(
         traces: Vec<SendData>,
-        endpoint: Option<&Endpoint>,
-        proxy_https: &Option<String>,
-        tls_cert_file: &Option<String>,
+        override_endpoint: Option<Endpoint>,
+        http_client: HyperClient,
     ) -> Option<Vec<SendData>> {
         if traces.is_empty() {
             return None;
@@ -185,20 +210,13 @@ impl TraceFlusher for ServerlessTraceFlusher {
         tokio::task::yield_now().await;
         debug!("TRACES | Flushing {} traces", coalesced_traces.len());

-        let Ok(http_client) =
-            ServerlessTraceFlusher::get_http_client(proxy_https.as_ref(), tls_cert_file.as_ref())
-        else {
-            error!("TRACES | Failed to create HTTP client");
-            return None;
-        };
-
         for trace in &coalesced_traces {
-            let trace_with_endpoint = match endpoint {
-                Some(additional_endpoint) => trace.with_endpoint(additional_endpoint.clone()),
+            let trace_to_send = match &override_endpoint {
+                Some(endpoint) => trace.with_endpoint(endpoint.clone()),
                 None => trace.clone(),
             };

-            let send_result = trace_with_endpoint.send(&http_client).await.last_result;
+            let send_result = trace_to_send.send(&http_client).await.last_result;

             if let Err(e) = send_result {
                 error!("TRACES | Request failed: {e:?}");
@@ -211,81 +229,3 @@ impl TraceFlusher for ServerlessTraceFlusher {
         None
     }
 }
-
-// Initialize the crypto provider needed for setting custom root certificates
-fn ensure_crypto_provider_initialized() {
-    static INIT_CRYPTO_PROVIDER: LazyLock<()> = LazyLock::new(|| {
-        #[cfg(unix)]
-        rustls::crypto::aws_lc_rs::default_provider()
-            .install_default()
-            .expect("Failed to install default CryptoProvider");
-    });
-
-    let () = &*INIT_CRYPTO_PROVIDER;
-}
-
-impl ServerlessTraceFlusher {
-    pub fn get_http_client(
-        proxy_https: Option<&String>,
-        tls_cert_file: Option<&String>,
-    ) -> Result<
-        GenericHttpClient<hyper_http_proxy::ProxyConnector<libdd_common::connector::Connector>>,
-        Box<dyn Error>,
-    > {
-        // Create the base connector with optional custom TLS config
-        let connector = if let Some(ca_cert_path) = tls_cert_file {
-            // Ensure crypto provider is initialized before creating TLS config
-            ensure_crypto_provider_initialized();
-
-            // Load the custom certificate
-            let cert_file = File::open(ca_cert_path)?;
-            let mut reader = BufReader::new(cert_file);
-            let certs: Vec<CertificateDer> =
-                rustls_pemfile::certs(&mut reader).collect::<Result<Vec<_>, _>>()?;
-
-            // Create a root certificate store and add custom certs
-            let mut root_store = RootCertStore::empty();
-            for cert in certs {
-                root_store.add(cert)?;
-            }
-
-            // Build the TLS config with custom root certificates
-            let tls_config = rustls::ClientConfig::builder()
-                .with_root_certificates(root_store)
-                .with_no_client_auth();
-
-            // Build the HTTPS connector with custom config
-            let https_connector = HttpsConnectorBuilder::new()
-                .with_tls_config(tls_config)
-                .https_or_http()
-                .enable_http1()
-                .build();
-
-            debug!(
-                "TRACES | GET_HTTP_CLIENT | Added root certificate from {}",
-                ca_cert_path
-            );
-
-            // Construct the Connector::Https variant directly
-            libdd_common::connector::Connector::Https(https_connector)
-        } else {
-            // Use default connector
-            libdd_common::connector::Connector::default()
-        };
-
-        if let Some(proxy) = proxy_https {
-            let proxy =
-                hyper_http_proxy::Proxy::new(hyper_http_proxy::Intercept::Https, proxy.parse()?);
-            let proxy_connector = hyper_http_proxy::ProxyConnector::from_proxy(connector, proxy)?;
-            let client = hyper_migration::client_builder().build(proxy_connector);
-            debug!(
-                "TRACES | GET_HTTP_CLIENT | Proxy connector created with proxy: {:?}",
-                proxy_https
-            );
-            Ok(client)
-        } else {
-            let proxy_connector = hyper_http_proxy::ProxyConnector::new(connector)?;
-            Ok(hyper_migration::client_builder().build(proxy_connector))
-        }
-    }
-}
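For reference, the dual-shipping fan-out described in the `TraceFlusher::new` comments amounts to flattening a map of intake URL to API keys into one endpoint per (URL, key) pair. A simplified sketch with plain structs standing in for the real `Endpoint` and `trace_intake_url_prefixed` machinery:

```rust
use std::collections::HashMap;

/// Simplified stand-in for `libdd_common::Endpoint`.
#[derive(Debug, Clone)]
struct Endpoint {
    url: String,
    api_key: String,
}

/// Flatten `DD_APM_ADDITIONAL_ENDPOINTS`-style config into one endpoint per URL + key pair.
fn additional_endpoints(config: &HashMap<String, Vec<String>>) -> Vec<Endpoint> {
    let mut endpoints = Vec::new();
    for (endpoint_url, api_keys) in config {
        for api_key in api_keys {
            endpoints.push(Endpoint {
                url: endpoint_url.clone(),
                api_key: api_key.clone(),
            });
        }
    }
    endpoints
}

fn main() {
    let mut config = HashMap::new();
    config.insert(
        "https://trace.agent.datadoghq.eu".to_string(),
        vec!["api-key-1".to_string(), "api-key-2".to_string()],
    );
    // Two endpoints: one per API key, both pointing at the EU intake.
    assert_eq!(additional_endpoints(&config).len(), 2);
}
```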