diff --git a/datadog-opentelemetry/src/abandoned_traces.rs b/datadog-opentelemetry/src/abandoned_traces.rs new file mode 100644 index 00000000..1b787ef3 --- /dev/null +++ b/datadog-opentelemetry/src/abandoned_traces.rs @@ -0,0 +1,270 @@ +// Copyright 2025-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +use crate::span_processor::ShardedTraces; +use hashbrown::{hash_map::Entry, HashMap}; +use std::time::{Duration, Instant}; + +#[derive(Debug)] +struct TraceInfo { + open_span_names: HashMap, + start_ts: Instant, + open_spans: usize, +} + +pub struct OldTrace { + pub tid: u128, + pub open_span_names: HashMap, + pub age: Duration, + pub open_spans: usize, +} + +#[derive(Clone, Debug)] +/// This registry tracks the age and name of currently open traces +pub struct AbandonedTracesRegistry { + shards: ShardedTraces, +} + +impl AbandonedTracesRegistry { + pub fn new() -> Self { + Self { + shards: ShardedTraces::new(|_| InnerAbandonedTracesRegistry { + traces: HashMap::new(), + }), + } + } + pub fn register_root_span_sampling(&self, trace_id: [u8; 16], name: String) { + self.shards + .write_shard(trace_id) + .register_root_span_sampling(trace_id, name); + } + + pub fn register_local_root_span(&self, trace_id: [u8; 16]) { + self.shards + .write_shard(trace_id) + .register_root_span(trace_id); + } + + pub fn register_span_sampling(&self, trace_id: [u8; 16], name: String) { + self.shards + .write_shard(trace_id) + .register_span_sampling(trace_id, name); + } + + pub fn register_span(&self, trace_id: [u8; 16]) { + self.shards.write_shard(trace_id).register_span(trace_id); + } + + pub fn finish_span(&self, trace_id: [u8; 16], name: &str) { + self.shards + .write_shard(trace_id) + .finish_span(trace_id, name); + } + + pub fn iter_open_traces(&self) -> impl Iterator + use<'_> { + let now = Instant::now(); + self.shards.iter().flat_map(move |shard| { + let shard = shard + .read() + .expect("failed to lock the abandoned spans registry"); + let now = now; + shard + .traces + .iter() + .filter_map(|(tid, trace)| { + let age: Duration = now.checked_duration_since(trace.start_ts)?; + Some(OldTrace { + tid: u128::from_be_bytes(*tid), + open_span_names: trace.open_span_names.clone(), + age, + open_spans: trace.open_spans, + }) + }) + .collect::>() + }) + } + + pub fn iter_old_traces(&self, min_age: Duration) -> impl Iterator + use<'_> { + let now = Instant::now(); + self.shards.iter().flat_map(move |shard| { + let shard = shard + .read() + .expect("failed to lock the abandoned spans registry"); + let now = now; + shard + .traces + .iter() + .filter_map(|(tid, trace)| { + let age = now.checked_duration_since(trace.start_ts)?; + if age < min_age { + return None; + } + Some(OldTrace { + tid: u128::from_be_bytes(*tid), + open_span_names: trace.open_span_names.clone(), + age, + open_spans: trace.open_spans, + }) + }) + .collect::>() + }) + } +} + +#[derive(Debug)] +struct InnerAbandonedTracesRegistry { + traces: HashMap<[u8; 16], TraceInfo>, +} + +impl InnerAbandonedTracesRegistry { + fn register_root_span_sampling(&mut self, trace_id: [u8; 16], name: String) { + self.traces.entry(trace_id).or_insert_with(|| TraceInfo { + open_spans: 1, + open_span_names: HashMap::from_iter([(name, 1)]), + start_ts: Instant::now(), + }); + } + + fn register_root_span(&mut self, trace_id: [u8; 16]) { + let Entry::Vacant(e) = self.traces.entry(trace_id) else { + return; + }; + e.insert(TraceInfo { + open_spans: 1, + open_span_names: HashMap::new(), + start_ts: Instant::now(), + }); + } + + fn register_span_sampling(&mut self, trace_id: [u8; 16], name: String) { + let c = self + .traces + .entry(trace_id) + .or_insert(TraceInfo { + open_spans: 0, + start_ts: Instant::now(), + open_span_names: HashMap::new(), + }) + .open_span_names + .entry(name) + .or_default(); + *c += 1; + } + + fn register_span(&mut self, trace_id: [u8; 16]) { + self.traces + .entry(trace_id) + .or_insert(TraceInfo { + open_spans: 0, + open_span_names: HashMap::new(), + start_ts: Instant::now(), + }) + .open_spans += 1; + } + + fn finish_span(&mut self, trace_id: [u8; 16], name: &str) { + let Entry::Occupied(mut e) = self.traces.entry(trace_id) else { + return; + }; + let trace = e.get_mut(); + if *trace + .open_span_names + .entry_ref(name) + .and_modify(|c| *c = c.saturating_sub(1)) + .or_default() + == 0 + { + trace.open_span_names.remove(name); + }; + trace.open_spans -= 1; + if trace.open_spans == 0 { + e.remove(); + } + } +} + +#[cfg(test)] +mod tests { + use std::{thread, time::Duration}; + + use hashbrown::HashSet; + + use crate::abandoned_traces::AbandonedTracesRegistry; + + fn active_traces(r: &AbandonedTracesRegistry) -> usize { + r.shards + .iter() + .map(|s| s.read().unwrap().traces.len()) + .sum::() + } + + #[test] + fn test_span_registration() { + let registry = AbandonedTracesRegistry::new(); + let trace_id = [1; 16]; + registry.register_root_span_sampling(trace_id, "root_span".to_owned()); + registry.register_local_root_span(trace_id); + for i in 0..16 { + registry.register_span(trace_id); + registry.finish_span(trace_id, &i.to_string()); + } + assert_eq!(active_traces(®istry), 1); + + registry.finish_span(trace_id, "root_span"); + + assert_eq!(active_traces(®istry), 0); + } + + #[test] + fn test_abandoned_spans() { + let registry = AbandonedTracesRegistry::new(); + for i in 1..=2 { + let trace_id = (i as u128).to_be_bytes(); + registry.register_root_span_sampling(trace_id, format!("root_span_{i}")); + registry.register_local_root_span(trace_id); + } + thread::sleep(Duration::from_millis(50)); + + let trace_id = 3_u128.to_be_bytes(); + registry.register_root_span_sampling(trace_id, format!("root_span_{}", 3)); + registry.register_local_root_span(trace_id); + + let collect_old_traces = || { + registry + .iter_old_traces(Duration::from_millis(10)) + .map(|t| { + ( + t.tid, + t.open_span_names + .iter() + .map(|(k, v)| (k.to_owned(), *v)) + .collect::>(), + t.open_spans, + ) + }) + .collect::>() + }; + + let old_traces = collect_old_traces(); + assert_eq!(active_traces(®istry), 3); + assert_eq!( + old_traces, + HashSet::from_iter([ + (1, vec![("root_span_1".to_owned(), 1)], 1), + (2, vec![("root_span_2".to_owned(), 1)], 1), + ]) + ); + + for i in 1..=2 { + let trace_id = (i as u128).to_be_bytes(); + registry.finish_span(trace_id, &format!("root_span_{}", 3)); + } + thread::sleep(Duration::from_millis(50)); + let old_traces = collect_old_traces(); + assert_eq!(active_traces(®istry), 1); + assert_eq!( + old_traces, + HashSet::from_iter([(3, vec![("root_span_3".to_owned(), 1)], 1),]) + ); + } +} diff --git a/datadog-opentelemetry/src/lib.rs b/datadog-opentelemetry/src/lib.rs index f4776960..f26c7746 100644 --- a/datadog-opentelemetry/src/lib.rs +++ b/datadog-opentelemetry/src/lib.rs @@ -63,6 +63,7 @@ //! .init(); //! ``` +mod abandoned_traces; mod ddtrace_transform; mod sampler; mod span_exporter; diff --git a/datadog-opentelemetry/src/sampler.rs b/datadog-opentelemetry/src/sampler.rs index e724c40d..6fa0da31 100644 --- a/datadog-opentelemetry/src/sampler.rs +++ b/datadog-opentelemetry/src/sampler.rs @@ -144,6 +144,7 @@ impl ShouldSample for Sampler { .register_local_root_trace_propagation_data( trace_id.to_bytes(), trace_propagation_data, + self.cfg.trace_debug_open_spans().then(|| name.to_string()), ) { RegisterTracePropagationResult::Existing(sampling_decision) => { return opentelemetry::trace::SamplingResult { @@ -164,6 +165,11 @@ impl ShouldSample for Sampler { } RegisterTracePropagationResult::New => {} } + } else { + self.trace_registry.register_span_sampling( + trace_id.to_bytes(), + self.cfg.trace_debug_open_spans().then(|| name.to_string()), + ) } opentelemetry::trace::SamplingResult { diff --git a/datadog-opentelemetry/src/span_exporter.rs b/datadog-opentelemetry/src/span_exporter.rs index 06367660..709a2b30 100644 --- a/datadog-opentelemetry/src/span_exporter.rs +++ b/datadog-opentelemetry/src/span_exporter.rs @@ -589,25 +589,23 @@ impl TraceExporterWorker { agent_response_handler: Option Fn(&'a str) + Send + Sync>>, ) -> TraceExporterHandle { - let handle = thread::spawn({ - move || { - let trace_exporter = match builder.build() { - Ok(exporter) => exporter, - Err(e) => { - return Err(e); - } - }; - let cached_config = CachedConfig::new(&cfg); - let task = Self { - trace_exporter, - cached_config, - rx, - otel_resource, - agent_response_handler, - }; - task.run() - } - }); + let handle = thread::spawn(dd_trace::log::with_local_logger(move || { + let trace_exporter = match builder.build() { + Ok(exporter) => exporter, + Err(e) => { + return Err(e); + } + }; + let cached_config = CachedConfig::new(&cfg); + let task = Self { + trace_exporter, + cached_config, + rx, + otel_resource, + agent_response_handler, + }; + task.run() + })); TraceExporterHandle { handle: Mutex::new(Some(handle)), } diff --git a/datadog-opentelemetry/src/span_processor.rs b/datadog-opentelemetry/src/span_processor.rs index c441d487..569c3bb8 100644 --- a/datadog-opentelemetry/src/span_processor.rs +++ b/datadog-opentelemetry/src/span_processor.rs @@ -5,8 +5,10 @@ use hashbrown::{hash_map, HashMap as BHashMap}; use std::{ collections::HashMap, fmt::Debug, + ops::DerefMut, str::FromStr, sync::{Arc, RwLock}, + time::Duration, }; use dd_trace::{ @@ -29,6 +31,7 @@ use opentelemetry_sdk::Resource; use opentelemetry_semantic_conventions::resource::SERVICE_NAME; use crate::{ + abandoned_traces::{self, AbandonedTracesRegistry}, create_dd_resource, span_exporter::DatadogExporter, spans_metrics::{TelemetryMetricsCollector, TelemetryMetricsCollectorHandle}, @@ -41,7 +44,6 @@ struct Trace { /// Root span will always be the first span in this vector if it is present finished_spans: Vec, open_span_count: usize, - propagation_data: TracePropagationData, } @@ -247,6 +249,47 @@ impl InnerTraceRegistry { } } +#[derive(Debug)] +pub struct ShardedTraces { + shards: Arc<[CachePadded>; TRACE_REGISTRY_SHARDS]>, + hasher: foldhash::fast::RandomState, +} + +impl ShardedTraces { + pub fn new T>(mut f: F) -> Self { + Self { + shards: Arc::new(std::array::from_fn(|i| CachePadded(RwLock::new(f(i))))), + hasher: foldhash::fast::RandomState::default(), + } + } + + pub fn write_shard(&self, trace_id: [u8; 16]) -> impl DerefMut + use<'_, T> { + self.get_shard(trace_id) + .write() + .expect("Failed to acquire lock on trace registry") + } + + pub fn iter(&self) -> impl Iterator> { + self.shards.iter().map(|i| &i.0) + } + + fn get_shard(&self, trace_id: [u8; 16]) -> &RwLock { + use std::hash::BuildHasher; + let hash = self.hasher.hash_one(u128::from_ne_bytes(trace_id)); + let shard: usize = hash as usize % TRACE_REGISTRY_SHARDS; + &self.shards[shard].0 + } +} + +impl Clone for ShardedTraces { + fn clone(&self) -> Self { + Self { + shards: self.shards.clone(), + hasher: self.hasher, + } + } +} + const TRACE_REGISTRY_SHARDS: usize = 64; #[repr(align(128))] @@ -262,34 +305,24 @@ struct CachePadded(T); /// - The number of open spans in the trace /// - The sampling decision of the trace pub(crate) struct TraceRegistry { - // Example: - // inner: Arc<[CacheAligned>; N]>; - // to access a trace we do inner[hash(trace_id) % N].read() - inner: Arc<[CachePadded>; TRACE_REGISTRY_SHARDS]>, - hasher: foldhash::fast::RandomState, + shards: ShardedTraces, + abandoned_spans: Option, } impl TraceRegistry { pub fn new(config: Arc) -> Self { Self { - inner: Arc::new(std::array::from_fn(|_| { - CachePadded(RwLock::new(InnerTraceRegistry { - registry: BHashMap::new(), - metrics: TraceRegistryMetrics::default(), - config: config.clone(), - })) - })), - hasher: foldhash::fast::RandomState::default(), + shards: ShardedTraces::new(|_| InnerTraceRegistry { + registry: BHashMap::new(), + metrics: TraceRegistryMetrics::default(), + config: config.clone(), + }), + abandoned_spans: config + .trace_debug_open_spans() + .then(AbandonedTracesRegistry::new), } } - fn get_shard(&self, trace_id: [u8; 16]) -> &RwLock { - use std::hash::BuildHasher; - let hash = self.hasher.hash_one(u128::from_ne_bytes(trace_id)); - let shard = hash as usize % TRACE_REGISTRY_SHARDS; - &self.inner[shard].0 - } - /// Register the trace propagation data for a given trace ID /// This increases the open span count for the trace by 1, but does not set the root span ID. /// You will then need to call `register_local_root_span` to set the root span ID @@ -300,11 +333,14 @@ impl TraceRegistry { &self, trace_id: [u8; 16], propagation_data: TracePropagationData, + span_name: Option, ) -> RegisterTracePropagationResult { - let mut inner = self - .get_shard(trace_id) - .write() - .expect("Failed to acquire lock on trace registry"); + self.abandoned_spans.as_ref().and_then(|a| { + let _: () = a.register_root_span_sampling(trace_id, span_name?); + Some(()) + }); + + let mut inner = self.shards.write_shard(trace_id); inner.register_local_root_trace_propagation_data(trace_id, propagation_data) } @@ -312,13 +348,20 @@ impl TraceRegistry { /// This will also increment the open span count for the trace. /// If the trace is already registered, it will ignore the new root span ID and log a warning. pub fn register_local_root_span(&self, trace_id: [u8; 16], root_span_id: [u8; 8]) { - let mut inner = self - .get_shard(trace_id) - .write() - .expect("Failed to acquire lock on trace registry"); + if let Some(a) = self.abandoned_spans.as_ref() { + a.register_local_root_span(trace_id) + } + + let mut inner = self.shards.write_shard(trace_id); inner.register_local_root_span(trace_id, root_span_id); } + pub fn register_span_sampling(&self, trace_id: [u8; 16], span_name: Option) { + self.abandoned_spans + .as_ref() + .and_then(|a| Some(a.register_span_sampling(trace_id, span_name?))); + } + /// Register a new span with the given trace ID and span ID. pub fn register_span( &self, @@ -326,10 +369,11 @@ impl TraceRegistry { span_id: [u8; 8], propagation_data: TracePropagationData, ) { - let mut inner = self - .get_shard(trace_id) - .write() - .expect("Failed to acquire lock on trace registry"); + if let Some(a) = self.abandoned_spans.as_ref() { + a.register_span(trace_id) + } + + let mut inner = self.shards.write_shard(trace_id); inner.register_span(trace_id, span_id, propagation_data); } @@ -337,26 +381,23 @@ impl TraceRegistry { /// If the trace is finished (i.e., all spans are finished), return the full trace chunk to /// flush fn finish_span(&self, trace_id: [u8; 16], span_data: SpanData) -> Option { - let mut inner = self - .get_shard(trace_id) - .write() - .expect("Failed to acquire lock on trace registry"); + if let Some(a) = self.abandoned_spans.as_ref() { + a.finish_span(trace_id, &span_data.name) + } + + let mut inner = self.shards.write_shard(trace_id); inner.finish_span(trace_id, span_data) } pub fn get_trace_propagation_data(&self, trace_id: [u8; 16]) -> TracePropagationData { - let inner = self - .get_shard(trace_id) - .read() - .expect("Failed to acquire lock on trace registry"); - + let inner = self.shards.write_shard(trace_id); inner.get_trace_propagation_data(trace_id).clone() } pub fn get_metrics(&self) -> TraceRegistryMetrics { let mut stats = TraceRegistryMetrics::default(); - for shard_idx in 0..TRACE_REGISTRY_SHARDS { - let mut shard = self.inner[shard_idx].0.write().unwrap(); + for shard in self.shards.iter() { + let mut shard = shard.write().unwrap(); let shard_stats = shard.get_metrics(); stats.spans_created += shard_stats.spans_created; stats.spans_finished += shard_stats.spans_finished; @@ -366,6 +407,21 @@ impl TraceRegistry { } stats } + + pub fn iter_old_traces( + &self, + min_age: Duration, + ) -> impl Iterator + use<'_> { + self.abandoned_spans + .iter() + .flat_map(move |a| a.iter_old_traces(min_age)) + } + + pub fn iter_lost_traces(&self) -> impl Iterator + use<'_> { + self.abandoned_spans + .iter() + .flat_map(move |a| a.iter_open_traces()) + } } #[derive(Default, Debug)] @@ -412,9 +468,15 @@ impl DatadogSpanProcessor { } else { None }; + let span_exporter = DatadogExporter::new(config.clone(), agent_response_handler); + let telemetry_metrics_handle = config.telemetry_enabled().then(|| { - TelemetryMetricsCollector::start(registry.clone(), span_exporter.queue_metrics()) + TelemetryMetricsCollector::start( + config.clone(), + registry.clone(), + span_exporter.queue_metrics(), + ) }); Self { @@ -868,6 +930,7 @@ mod tests { "foobar".to_string(), )])), }, + None, ); } tr @@ -956,7 +1019,7 @@ mod tests { let span_id = [1u8; 8]; // Register and finish a single span - registry.register_local_root_trace_propagation_data(trace_id, EMPTY_PROPAGATION_DATA); + registry.register_local_root_trace_propagation_data(trace_id, EMPTY_PROPAGATION_DATA, None); registry.register_local_root_span(trace_id, span_id); let span_data = create_test_span_data(trace_id, span_id); @@ -978,7 +1041,7 @@ mod tests { let child2_span_id = [3u8; 8]; // Register root span - registry.register_local_root_trace_propagation_data(trace_id, EMPTY_PROPAGATION_DATA); + registry.register_local_root_trace_propagation_data(trace_id, EMPTY_PROPAGATION_DATA, None); registry.register_local_root_span(trace_id, root_span_id); // Register child spans @@ -1019,7 +1082,11 @@ mod tests { let trace_id = [i; 16]; let span_id = [i; 8]; - registry.register_local_root_trace_propagation_data(trace_id, EMPTY_PROPAGATION_DATA); + registry.register_local_root_trace_propagation_data( + trace_id, + EMPTY_PROPAGATION_DATA, + None, + ); registry.register_local_root_span(trace_id, span_id); let span_data = create_test_span_data(trace_id, span_id); @@ -1047,7 +1114,7 @@ mod tests { let child_span_id = [2u8; 8]; // Register root and child spans - registry.register_local_root_trace_propagation_data(trace_id, EMPTY_PROPAGATION_DATA); + registry.register_local_root_trace_propagation_data(trace_id, EMPTY_PROPAGATION_DATA, None); registry.register_local_root_span(trace_id, root_span_id); registry.register_span(trace_id, child_span_id, EMPTY_PROPAGATION_DATA); @@ -1092,7 +1159,7 @@ mod tests { let span_id = [1u8; 8]; // Create and finish a trace - registry.register_local_root_trace_propagation_data(trace_id, EMPTY_PROPAGATION_DATA); + registry.register_local_root_trace_propagation_data(trace_id, EMPTY_PROPAGATION_DATA, None); registry.register_local_root_span(trace_id, span_id); let span_data = create_test_span_data(trace_id, span_id); registry.finish_span(trace_id, span_data); @@ -1119,7 +1186,11 @@ mod tests { let trace_id = (i as u128).to_be_bytes(); let span_id = [i as u8; 8]; - registry.register_local_root_trace_propagation_data(trace_id, EMPTY_PROPAGATION_DATA); + registry.register_local_root_trace_propagation_data( + trace_id, + EMPTY_PROPAGATION_DATA, + None, + ); registry.register_local_root_span(trace_id, span_id); let span_data = create_test_span_data(trace_id, span_id); @@ -1140,7 +1211,7 @@ mod tests { let root_span_id = [1u8; 8]; // Register root - registry.register_local_root_trace_propagation_data(trace_id, EMPTY_PROPAGATION_DATA); + registry.register_local_root_trace_propagation_data(trace_id, EMPTY_PROPAGATION_DATA, None); registry.register_local_root_span(trace_id, root_span_id); // Register 5 child spans @@ -1178,7 +1249,7 @@ mod tests { let root_span_id = [1u8; 8]; // Register root span - registry.register_local_root_trace_propagation_data(trace_id, EMPTY_PROPAGATION_DATA); + registry.register_local_root_trace_propagation_data(trace_id, EMPTY_PROPAGATION_DATA, None); registry.register_local_root_span(trace_id, root_span_id); // Register and finish more than default min_spans @@ -1217,7 +1288,7 @@ mod tests { let root_span_id = [1u8; 8]; // Register root span - registry.register_local_root_trace_propagation_data(trace_id, EMPTY_PROPAGATION_DATA); + registry.register_local_root_trace_propagation_data(trace_id, EMPTY_PROPAGATION_DATA, None); registry.register_local_root_span(trace_id, root_span_id); // Register 15 child spans @@ -1266,7 +1337,7 @@ mod tests { let root_span_id = [1u8; 8]; // Register root span - registry.register_local_root_trace_propagation_data(trace_id, EMPTY_PROPAGATION_DATA); + registry.register_local_root_trace_propagation_data(trace_id, EMPTY_PROPAGATION_DATA, None); registry.register_local_root_span(trace_id, root_span_id); // Register 20 child spans diff --git a/datadog-opentelemetry/src/spans_metrics.rs b/datadog-opentelemetry/src/spans_metrics.rs index c5c7a50a..d2493cd6 100644 --- a/datadog-opentelemetry/src/spans_metrics.rs +++ b/datadog-opentelemetry/src/spans_metrics.rs @@ -1,13 +1,17 @@ // Copyright 2025-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 -use std::{sync::Arc, time::Duration}; +use std::{fmt, sync::Arc, time::Duration}; -use dd_trace::utils::{ShutdownSignaler, WorkerError, WorkerHandle}; +use dd_trace::{ + utils::{ShutdownSignaler, WorkerError, WorkerHandle}, + Config, +}; use crate::{span_exporter::QueueMetricsFetcher, TraceRegistry}; pub struct TelemetryMetricsCollector { + config: Arc, registry: TraceRegistry, exporter_queue_metrics: QueueMetricsFetcher, shutdown_rx: std::sync::mpsc::Receiver<()>, @@ -37,18 +41,20 @@ impl Drop for TelemetryMetricsCollector { impl TelemetryMetricsCollector { pub fn start( + config: Arc, registry: TraceRegistry, exporter_queue_metrics: QueueMetricsFetcher, ) -> TelemetryMetricsCollectorHandle { let (shutdown_tx, shutdown_rx) = std::sync::mpsc::sync_channel(1); let shutdown_finished = ShutdownSignaler::new(); let worker = Self { + config, registry, shutdown_rx, shutdown_finished: shutdown_finished.clone(), exporter_queue_metrics, }; - let handle = std::thread::spawn(|| worker.run()); + let handle = std::thread::spawn(dd_trace::log::with_local_logger(|| worker.run())); TelemetryMetricsCollectorHandle { shutdown_tx, worker_handle: WorkerHandle::new(shutdown_finished, handle), @@ -56,12 +62,55 @@ impl TelemetryMetricsCollector { } fn run(mut self) { + let interval; + #[cfg(feature = "test-utils")] + { + interval = self.config.__internal_span_metrics_interval(); + } + #[cfg(not(feature = "test-utils"))] + { + interval = Duration::from_secs(10); + } + #[allow(clippy::while_let_loop)] loop { - match self.shutdown_rx.recv_timeout(Duration::from_secs(10)) { + match self.shutdown_rx.recv_timeout(interval) { Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {} - Err(std::sync::mpsc::RecvTimeoutError::Disconnected) | Ok(()) => return, + Err(std::sync::mpsc::RecvTimeoutError::Disconnected) | Ok(()) => break, }; self.emit_metrics(); + if self.config.trace_debug_open_spans() { + self.warn_maybe_abandoned_traces(); + } + } + if self.config.trace_debug_open_spans() { + self.warn_shutdown_abandoned_traces(); + } + } + + fn warn_shutdown_abandoned_traces(&self) { + for t in self.registry.iter_lost_traces().take(100) { + // Log at most 100 traces + dd_trace::dd_warn!( + "lost trace not finished during shutdown trace_id={} age={}ms open_spans={} open_span_names={} ", + t.tid, + t.age.as_millis(), + t.open_spans, + SpanNamesDisplay(&t.open_span_names), + ) + } + } + + fn warn_maybe_abandoned_traces(&self) { + let min_age = self.config.trace_debug_open_spans_timeout(); + // Log at most 100 traces + for t in self.registry.iter_old_traces(min_age).take(100) { + dd_trace::dd_warn!( + "possibly abandoned trace trace_id={} age={}ms open_spans={} open_span_names={} ", + t.tid, + t.age.as_millis(), + t.open_spans, + SpanNamesDisplay(&t.open_span_names), + ) } } @@ -96,3 +145,15 @@ impl TelemetryMetricsCollector { ]); } } + +struct SpanNamesDisplay<'a>(&'a hashbrown::HashMap); + +impl fmt::Display for SpanNamesDisplay<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "[")?; + for (k, v) in self.0.iter() { + write!(f, "({},{}),", k, v)?; + } + write!(f, "]") + } +} diff --git a/datadog-opentelemetry/tests/integration_tests/opentelemetry_api.rs b/datadog-opentelemetry/tests/integration_tests/opentelemetry_api.rs index db294b3a..a802ff4a 100644 --- a/datadog-opentelemetry/tests/integration_tests/opentelemetry_api.rs +++ b/datadog-opentelemetry/tests/integration_tests/opentelemetry_api.rs @@ -1,14 +1,16 @@ // Copyright 2025-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 +use std::thread; +use std::time::Duration; use std::{collections::HashMap, ops::Deref, sync::Arc}; use datadog_opentelemetry::make_test_tracer; use dd_trace::configuration::{SamplingRuleConfig, TracePropagationStyle}; use opentelemetry::global::ObjectSafeSpan; use opentelemetry::trace::{ - SamplingDecision, SamplingResult, SpanBuilder, TraceContextExt, TraceState, Tracer, - TracerProvider, + mark_span_as_active, SamplingDecision, SamplingResult, SpanBuilder, TraceContextExt, + TraceState, Tracer, TracerProvider, }; use opentelemetry::Context; @@ -306,3 +308,52 @@ async fn test_tracing_disabled() { }) .await } + +#[tokio::test] +async fn test_debug_open_spans() { + const SESSION_NAME: &str = "opentelemetry_api/test_debug_open_spans"; + let mut cfg = dd_trace::Config::builder(); + cfg.set_log_level_filter(dd_trace::log::LevelFilter::Debug) + .set_trace_debug_open_spans(true) + .set_trace_debug_open_spans_timeout(Duration::from_millis(1)) + .__internal_set_span_metrics_interval(Duration::from_millis(100)); + let _logger_guard = dd_trace::log::test_logger::activate_test_logger(); + with_test_agent_session(SESSION_NAME, cfg, |_, tracer_provider, _, _| { + let tracer = tracer_provider.tracer("test_debug_open_spans"); + let _child_span_1; + let child_span_2; + + // leak a span + { + let _root = mark_span_as_active(tracer.start("root_span")); + _child_span_1 = tracer.start("child_span"); + child_span_2 = tracer.start("child_span"); + } + thread::sleep(Duration::from_millis(300)); + + let test_logs = dd_trace::log::test_logger::take_test_logs().unwrap(); + let abandoned_logs = test_logs + .iter() + .filter(|(lvl, msg)| { + *lvl == dd_trace::log::Level::Warn + && msg.contains("possibly abandoned trace") + && msg.contains("open_span_names=") + && msg.contains("[(child_span,2),]") + }) + .collect::>(); + assert!(!abandoned_logs.is_empty()); + std::mem::forget(child_span_2); + }) + .await; + + let test_logs = dd_trace::log::test_logger::take_test_logs().unwrap(); + let abandoned_logs = test_logs + .iter() + .filter(|(lvl, msg)| { + *lvl == dd_trace::log::Level::Warn + && msg.contains("lost trace not finished during shutdown") + && msg.contains("[(child_span,1),]") + }) + .collect::>(); + assert!(!abandoned_logs.is_empty()) +} diff --git a/datadog-opentelemetry/tests/integration_tests/tracing_api.rs b/datadog-opentelemetry/tests/integration_tests/tracing_api.rs index cd65b15a..feaf3a7c 100644 --- a/datadog-opentelemetry/tests/integration_tests/tracing_api.rs +++ b/datadog-opentelemetry/tests/integration_tests/tracing_api.rs @@ -1,7 +1,7 @@ // Copyright 2025-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 -use std::collections::HashMap; +use std::{collections::HashMap, thread, time::Duration}; use opentelemetry::{ trace::{TraceContextExt, TracerProvider}, @@ -25,7 +25,7 @@ async fn test_smoke() { let span = tracing::trace_span!("test_span", _sampling_priority_v1 = 2); span.in_scope(|| { { - tracing::trace_span!("child_span_1") + tracing::trace_span!("child_span_1"); }; { tracing::trace_span!("child_span_2") @@ -85,3 +85,61 @@ async fn test_remote_span_extraction_propagation() { }) .await; } + +#[tokio::test] +async fn test_debug_open_spans() { + const SESSION_NAME: &str = "tracing_api/test_debug_open_spans"; + let mut cfg = dd_trace::Config::builder(); + cfg.set_log_level_filter(dd_trace::log::LevelFilter::Debug) + .set_trace_debug_open_spans(true) + .set_trace_debug_open_spans_timeout(Duration::from_millis(1)) + .__internal_set_span_metrics_interval(Duration::from_millis(100)); + let _logger_guard = dd_trace::log::test_logger::activate_test_logger(); + with_test_agent_session(SESSION_NAME, cfg, |_, tracer_provider, _, _| { + let subscriber = tracing_subscriber::registry() + .with(tracing_subscriber::fmt::layer()) + .with( + tracing_opentelemetry::layer() + .with_context_activation(true) + .with_tracer(tracer_provider.tracer("test")), + ); + let _guard = subscriber.set_default(); + let _child_span_1; + let child_span_2; + + // leak a span + { + let _root_span = tracing::trace_span!("root_span").entered(); + _child_span_1 = tracing::trace_span!("child_span").entered().exit(); + child_span_2 = tracing::trace_span!("child_span").entered().exit(); + } + thread::sleep(Duration::from_millis(300)); + + let test_logs = dd_trace::log::test_logger::take_test_logs().unwrap(); + let abandoned_logs = test_logs + .iter() + .filter(|(lvl, msg)| { + *lvl == dd_trace::log::Level::Warn + && msg.contains("possibly abandoned trace") + && msg.contains("open_span_names=") + && msg.contains("(root_span,1)") + && msg.contains("(child_span,2)") + }) + .collect::>(); + assert!(!abandoned_logs.is_empty()); + std::mem::forget(child_span_2); + }) + .await; + + let test_logs = dd_trace::log::test_logger::take_test_logs().unwrap(); + let abandoned_logs = test_logs + .iter() + .filter(|(lvl, msg)| { + *lvl == dd_trace::log::Level::Warn + && msg.contains("lost trace not finished during shutdown") + && msg.contains("(root_span,1)") + && msg.contains("(child_span,1)") + }) + .collect::>(); + assert!(!abandoned_logs.is_empty()) +} diff --git a/dd-trace-propagation/benches/inject_benchmark.rs b/dd-trace-propagation/benches/inject_benchmark.rs index 28bf17d0..f964fda3 100644 --- a/dd-trace-propagation/benches/inject_benchmark.rs +++ b/dd-trace-propagation/benches/inject_benchmark.rs @@ -101,7 +101,7 @@ fn bench_datadog_only_inject> { } } -impl_config_value_provider!(simple: Cow<'static, str>, bool, u32, usize, i32, f64, ServiceName, LevelFilter, ParsedSamplingRules); +impl_config_value_provider!(simple: Cow<'static, str>, bool, u32, u64, usize, i32, f64, ServiceName, LevelFilter, ParsedSamplingRules); impl_config_value_provider!(option: String); +impl ConfigurationValueProvider for Duration { + fn get_configuration_value(&self) -> String { + self.as_secs_f64().to_string() + } +} + #[derive(Clone)] #[non_exhaustive] /// Configuration for the Datadog Tracer @@ -811,6 +818,7 @@ pub struct Config { /// Configurations for testing. Not exposed to customer #[cfg(feature = "test-utils")] wait_agent_info_ready: bool, + span_metrics_interval: Duration, // # Telemetry configuration /// Disables telemetry if false @@ -824,6 +832,10 @@ pub struct Config { trace_partial_flush_enabled: ConfigItem, trace_partial_flush_min_spans: ConfigItem, + /// Debug potentially abandoned spans + trace_debug_open_spans: ConfigItem, + trace_debug_open_spans_timeout: ConfigItem, + /// Trace propagation configuration trace_propagation_style: ConfigItem>>, trace_propagation_style_extract: ConfigItem>>, @@ -986,6 +998,15 @@ impl Config { default.telemetry_heartbeat_interval, |interval: f64| interval.abs(), ), + trace_debug_open_spans: cisu.update_parsed( + SupportedConfigurations::DD_TRACE_DEBUG_OPEN_SPANS, + default.trace_debug_open_spans, + ), + trace_debug_open_spans_timeout: cisu.update_parsed_with_transform( + SupportedConfigurations::DD_TRACE_DEBUG_OPEN_SPANS_TIMEOUT, + default.trace_debug_open_spans_timeout, + |val: u64| Duration::from_secs(val.max(1)), + ), trace_propagation_style: cisu.update_parsed_with_transform( SupportedConfigurations::DD_TRACE_PROPAGATION_STYLE, default.trace_propagation_style, @@ -1005,8 +1026,6 @@ impl Config { SupportedConfigurations::DD_TRACE_PROPAGATION_EXTRACT_FIRST, default.trace_propagation_extract_first, ), - #[cfg(feature = "test-utils")] - wait_agent_info_ready: default.wait_agent_info_ready, extra_services_tracker: ExtraServicesTracker::new(), remote_config_enabled: cisu.update_parsed( SupportedConfigurations::DD_REMOTE_CONFIGURATION_ENABLED, @@ -1023,6 +1042,11 @@ impl Config { default.datadog_tags_max_length, |max: usize| max.min(DATADOG_TAGS_MAX_LENGTH), ), + + // Test only configs + #[cfg(feature = "test-utils")] + wait_agent_info_ready: default.wait_agent_info_ready, + span_metrics_interval: default.span_metrics_interval, } } @@ -1127,6 +1151,14 @@ impl Config { self.dogstatsd_agent_url.value() } + pub fn trace_debug_open_spans(&self) -> bool { + *self.trace_debug_open_spans.value() + } + + pub fn trace_debug_open_spans_timeout(&self) -> Duration { + *self.trace_debug_open_spans_timeout.value() + } + pub fn trace_sampling_rules(&self) -> impl Deref + use<'_> { self.trace_sampling_rules.value() } @@ -1147,11 +1179,6 @@ impl Config { *self.trace_stats_computation_enabled.value() } - #[cfg(feature = "test-utils")] - pub fn __internal_wait_agent_info_ready(&self) -> bool { - self.wait_agent_info_ready - } - /// Static runtime id if the process fn process_runtime_id() -> &'static str { // TODO(paullgdc): Regenerate on fork? Would we even support forks? @@ -1308,6 +1335,17 @@ impl Config { pub fn datadog_tags_max_length(&self) -> usize { *self.datadog_tags_max_length.value() } + + // Test only configs + #[cfg(feature = "test-utils")] + pub fn __internal_wait_agent_info_ready(&self) -> bool { + self.wait_agent_info_ready + } + + #[cfg(feature = "test-utils")] + pub fn __internal_span_metrics_interval(&self) -> Duration { + self.span_metrics_interval + } } impl std::fmt::Debug for Config { @@ -1326,6 +1364,11 @@ impl std::fmt::Debug for Config { .field("trace_rate_limit", &self.trace_rate_limit) .field("enabled", &self.enabled) .field("log_level_filter", &self.log_level_filter) + .field("trace_debug_open_spans", &self.trace_debug_open_spans) + .field( + "trace_debug_open_spans_timeout_secs", + &self.trace_debug_open_spans_timeout, + ) .field( "trace_stats_computation_enabled", &self.trace_stats_computation_enabled, @@ -1384,6 +1427,14 @@ fn default_config() -> Config { SupportedConfigurations::DD_DOGSTATSD_URL, Cow::Borrowed(""), ), + trace_debug_open_spans: ConfigItem::new( + SupportedConfigurations::DD_TRACE_DEBUG_OPEN_SPANS, + false, + ), + trace_debug_open_spans_timeout: ConfigItem::new( + SupportedConfigurations::DD_TRACE_DEBUG_OPEN_SPANS_TIMEOUT, + Duration::from_secs(60), + ), trace_sampling_rules: ConfigItemWithOverride::new_rc( SupportedConfigurations::DD_TRACE_SAMPLING_RULES, ParsedSamplingRules::default(), // Empty rules by default @@ -1401,9 +1452,6 @@ fn default_config() -> Config { SupportedConfigurations::DD_TRACE_STATS_COMPUTATION_ENABLED, true, ), - #[cfg(feature = "test-utils")] - wait_agent_info_ready: false, - telemetry_enabled: ConfigItem::new( SupportedConfigurations::DD_INSTRUMENTATION_TELEMETRY_ENABLED, true, @@ -1457,6 +1505,11 @@ fn default_config() -> Config { SupportedConfigurations::DD_TRACE_X_DATADOG_TAGS_MAX_LENGTH, DATADOG_TAGS_MAX_LENGTH, ), + + // Test only configs + #[cfg(feature = "test-utils")] + wait_agent_info_ready: false, + span_metrics_interval: Duration::from_secs(10), } } @@ -1570,6 +1623,18 @@ impl ConfigBuilder { self } + pub fn set_trace_debug_open_spans(&mut self, enabled: bool) -> &mut Self { + self.config.trace_debug_open_spans.set_code(enabled); + self + } + + pub fn set_trace_debug_open_spans_timeout(&mut self, timeout: Duration) -> &mut Self { + self.config + .trace_debug_open_spans_timeout + .set_code(timeout.max(Duration::from_millis(1))); + self + } + pub fn set_trace_partial_flush_enabled(&mut self, enabled: bool) -> &mut Self { self.config.trace_partial_flush_enabled.set_code(enabled); self @@ -1664,7 +1729,10 @@ impl ConfigBuilder { } #[cfg(feature = "test-utils")] - pub fn set_datadog_tags_max_length_with_no_limit(&mut self, length: usize) -> &mut Self { + pub fn __internal_set_datadog_tags_max_length_with_no_limit( + &mut self, + length: usize, + ) -> &mut Self { self.config.datadog_tags_max_length.set_code(length); self } @@ -1677,6 +1745,12 @@ impl ConfigBuilder { self.config.wait_agent_info_ready = wait_agent_info_ready; self } + + #[cfg(feature = "test-utils")] + pub fn __internal_set_span_metrics_interval(&mut self, interval: Duration) -> &mut Self { + self.config.span_metrics_interval = interval; + self + } } #[cfg(test)] diff --git a/dd-trace/src/configuration/remote_config.rs b/dd-trace/src/configuration/remote_config.rs index 15d24532..0004563b 100644 --- a/dd-trace/src/configuration/remote_config.rs +++ b/dd-trace/src/configuration/remote_config.rs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use crate::configuration::Config; +use crate::log; use crate::utils::{ShutdownSignaler, WorkerHandle}; use anyhow::Result; @@ -330,7 +331,7 @@ impl RemoteConfigClientWorker { client: RemoteConfigClient::new(config)?, shutdown_receiver, }; - let join_handle = thread::spawn(move || worker.run()); + let join_handle = thread::spawn(log::with_local_logger(move || worker.run())); Ok(RemoteConfigClientHandle { cancel_token, worker_handle: WorkerHandle::new(shutdown_finished, join_handle), diff --git a/dd-trace/src/configuration/supported_configurations.rs b/dd-trace/src/configuration/supported_configurations.rs index 6bd12191..1a10b160 100644 --- a/dd-trace/src/configuration/supported_configurations.rs +++ b/dd-trace/src/configuration/supported_configurations.rs @@ -23,6 +23,8 @@ pub(crate) enum SupportedConfigurations { DD_TELEMETRY_LOG_COLLECTION_ENABLED, DD_TRACE_AGENT_PORT, DD_TRACE_AGENT_URL, + DD_TRACE_DEBUG_OPEN_SPANS, + DD_TRACE_DEBUG_OPEN_SPANS_TIMEOUT, DD_TRACE_ENABLED, DD_TRACE_PARTIAL_FLUSH_ENABLED, DD_TRACE_PARTIAL_FLUSH_MIN_SPANS, @@ -79,6 +81,10 @@ impl SupportedConfigurations { } SupportedConfigurations::DD_TRACE_AGENT_PORT => "DD_TRACE_AGENT_PORT", SupportedConfigurations::DD_TRACE_AGENT_URL => "DD_TRACE_AGENT_URL", + SupportedConfigurations::DD_TRACE_DEBUG_OPEN_SPANS => "DD_TRACE_DEBUG_OPEN_SPANS", + SupportedConfigurations::DD_TRACE_DEBUG_OPEN_SPANS_TIMEOUT => { + "DD_TRACE_DEBUG_OPEN_SPANS_TIMEOUT" + } SupportedConfigurations::DD_TRACE_ENABLED => "DD_TRACE_ENABLED", SupportedConfigurations::DD_TRACE_PARTIAL_FLUSH_ENABLED => { "DD_TRACE_PARTIAL_FLUSH_ENABLED" diff --git a/dd-trace/src/log.rs b/dd-trace/src/log.rs index 3bfd62e2..721bd890 100644 --- a/dd-trace/src/log.rs +++ b/dd-trace/src/log.rs @@ -66,7 +66,7 @@ impl Display for LevelFilter { } #[repr(usize)] -#[derive(Copy, Debug, Hash)] +#[derive(Copy, Debug, Hash, PartialEq)] pub enum Level { Error = 1, // this value must match with LogLevelFilter::Error Warn, @@ -128,6 +128,100 @@ impl PartialOrd for Level { } } +#[cfg(feature = "test-utils")] +pub mod test_logger { + //! Implements a thread local, overridable logger + //! + //! Tests can locally intercept logs by calling to `activate_test_logger` + //! + //! ```no_run + //! let _log_guard = dd_trace::log::test_logger::activate_test_logger; + //! (); + //! // whatever is logged by the dd_(level)! macros will be stored + //! dd_trace::dd_debug!("my log"); + //! let logs = dd_trace::log::test_logger::take_test_logs().unwrap(); + //! // logs should contain (Debug, "my log") + //! + //! // to see logs in threads spawned from the test, the function passed to spawn + //! // should be wrapped by `with_local_logger` + //! std::thread::spawn(dd_trace::log::with_local_logger(|| { + //! dd_trace::dd_debug!("my log"); + //! })) + //! .join(); + //! ``` + use std::{cell::RefCell, sync::Arc}; + + #[derive(Default)] + struct TestLogger(std::sync::Mutex>); + + pub fn print_log( + lvl: crate::log::Level, + log: std::fmt::Arguments, + _file: &str, + _line: u32, + _template: Option<&str>, + ) { + let _ = LOCAL_LOGGER.try_with(|l| { + if let Some(l) = &*l.borrow() { + l.0.lock().unwrap().push((lvl, log.to_string())) + } + }); + } + + thread_local! { + static LOCAL_LOGGER: RefCell>> = const { RefCell::new(None) }; + } + + pub fn with_local_logger R, R>(f: F) -> impl FnOnce() -> R { + let logger = LOCAL_LOGGER.try_with(|l| l.borrow().clone()).ok().flatten(); + move || { + let _guard = LoggerGuard { + prev: LOCAL_LOGGER.replace(logger), + }; + f() + } + } + + pub struct LoggerGuard { + prev: Option>, + } + + impl Drop for LoggerGuard { + fn drop(&mut self) { + LOCAL_LOGGER.set(self.prev.take()); + } + } + + pub fn activate_test_logger() -> LoggerGuard { + let prev = LOCAL_LOGGER.replace(Some(Arc::new(TestLogger::default()))); + LoggerGuard { prev } + } + + pub fn take_test_logs() -> Option> { + use std::ops::DerefMut; + + LOCAL_LOGGER + .try_with(|l| { + l.borrow() + .as_deref() + .map(|l| std::mem::take(l.0.lock().unwrap().deref_mut())) + }) + .ok() + .flatten() + } +} + +pub fn with_local_logger R, R>(f: F) -> impl FnOnce() -> R { + #[cfg(feature = "test-utils")] + { + test_logger::with_local_logger(f) + } + #[cfg(not(feature = "test-utils"))] + { + f + } +} + pub fn print_log( lvl: crate::log::Level, log: fmt::Arguments, @@ -190,23 +284,21 @@ macro_rules! dd_log { let loc = std::panic::Location::caller(); $crate::log::print_log(lvl, format_args!($first, $($rest)*), loc.file(), loc.line(), Some($first)); } + #[cfg(feature = "test-utils")] + { + let loc = std::panic::Location::caller(); + $crate::log::test_logger::print_log(lvl, format_args!($first, $($rest)*), loc.file(), loc.line(), Some($first)) + } }}; ($lvl:expr, $first:expr) => { - let lvl = $lvl; - if lvl <= $crate::log::max_level() { - let loc = std::panic::Location::caller(); - $crate::log::print_log(lvl, format_args!($first), loc.file(), loc.line(), Some($first)); - } + $crate::dd_log!($lvl, $first,) }; } #[cfg(test)] mod tests { - use crate::{ - log::LevelFilter, - log::{max_level, set_max_level, Level}, - }; + use crate::log::{max_level, set_max_level, test_logger, Level, LevelFilter}; #[test] fn test_default_max_level() { @@ -245,4 +337,23 @@ mod tests { } } } + + #[test] + fn test_test_logger() { + let _g = test_logger::activate_test_logger(); + dd_debug!("debug log {}", "foo"); + std::thread::spawn(test_logger::with_local_logger(|| { + dd_warn!("debug log {}", "bar"); + })) + .join() + .unwrap(); + let test_logs = test_logger::take_test_logs().unwrap(); + assert_eq!( + &test_logs, + &[ + (Level::Debug, "debug log foo".into()), + (Level::Warn, "debug log bar".into()) + ] + ); + } } diff --git a/supported-configurations.json b/supported-configurations.json index f48403df..07022b96 100644 --- a/supported-configurations.json +++ b/supported-configurations.json @@ -123,6 +123,22 @@ "propertyKeys": ["trace_agent_url"] } ], + "DD_TRACE_DEBUG_OPEN_SPANS": [ + { + "version": "A", + "type": "boolean", + "default_value": "false", + "propertyKeys": ["trace_debug_open_spans"] + } + ], + "DD_TRACE_DEBUG_OPEN_SPANS_TIMEOUT": [ + { + "version": "A", + "type": "seconds", + "default_value": "60", + "propertyKeys": ["trace_debug_open_spans_timeout"] + } + ], "DD_TRACE_ENABLED": [ { "version": "A",