Commit 55659d9

lym953 and claude authored
feat(traces): [SVLS-8734] respect Datadog-Client-Computed-Stats header (#1118)
## Background

Traces/stats that go through the Lambda extension can generate stats in three ways:

1. The tracer generates stats and sends them to the extension at `/v0.6/stats`. Works only if the env var `DD_TRACE_STATS_COMPUTATION_ENABLED` is `true`.
2. The extension generates stats from traces. Off by default; works only if the env var `DD_COMPUTE_TRACE_STATS_ON_EXTENSION` is `true`.
3. The Datadog backend generates stats from traces. Works only if the trace tag `_dd.compute_stats` is `1`.

Separately, the Go tracer sends a dummy `aws.lambda` span to the extension purely to carry metadata. The extension extracts metadata from this span and sets it on the real `aws.lambda` span generated on the extension side, which is what is finally sent to the trace intake.

## Summary

If a trace carries the header `Datadog-Client-Computed-Stats` with a truthy value (which means way 1 is on), turn off ways 2 and 3 to avoid duplicate counts:

1. The extension does not generate stats.
2. The trace tag `_dd.compute_stats` is set to `0`.

As a special case, if the dummy `aws.lambda` span sent by the Go tracer carries a truthy `Datadog-Client-Computed-Stats` header, the extension propagates that value to the `aws.lambda` span it generates.

The `_dd.compute_stats` tag is determined by the combination of `compute_trace_stats_on_extension` and `client_computed_stats`:

| Input: `compute_trace_stats_on_extension` | Input: `client_computed_stats` | Expected: `_dd.compute_stats` | Expected: Extension generates stats? |
|-------------------------------------------|--------------------------------|-------------------------------|--------------------------------------|
| `false` | `false` | `"1"` | No |
| `false` | `true` | `"0"` | No |
| `true` | `false` | `"0"` | Yes |
| `true` | `true` | `"0"` | No |

## Details

- `_dd.compute_stats` is now always set in `process_traces()` (centralized from `tags_from_env()` in `tags.rs`), with the value determined by both `compute_trace_stats_on_extension` and `client_computed_stats` as shown in the table above.
- When `client_computed_stats` is true or `compute_trace_stats_on_extension` is true, the extension skips stats generation in `send_processed_traces()` to avoid double-counting.
- `COMPUTE_STATS_KEY` is now `pub(crate)` in `tags.rs` so it can be referenced from `trace_processor`.
- Propagates `client_computed_stats` to the extension-generated `aws.lambda` invocation span: when the Go tracer sends its placeholder span (`dd-tracer-serverless-span`) with `Datadog-Client-Computed-Stats: t`, that flag is stored in the invocation `Context` and passed through to `send_spans`, so the extension also skips stats generation for the `aws.lambda` span it creates.

## Test plan

### Automated tests

The new unit tests pass.

### Manual tests

#### Steps

1. Deploy a Lambda function that uses `github.com/DataDog/dd-trace-go/v2 v2.6.0`.
2. Set env vars:
   1. `DD_TRACE_STATS_COMPUTATION_ENABLED`: `true`, which turns on way 1.
   2. `DD_COMPUTE_TRACE_STATS_ON_EXTENSION`: `true`, which tries to turn on way 2.

#### Result

Before: 2 hits (duplicates):

1. one with `resource_name=dd-tracer-serverless-span`, generated by the Go tracer
2. one with `resource_name=yiming2-xxx`

<img width="699" height="286" alt="image" src="https://github.com/user-attachments/assets/6545cf12-c90b-4ed1-832e-1dab7b6c4d7c" />

After: 1 hit, only the one with `resource_name=dd-tracer-serverless-span`.

<img width="696" height="283" alt="image" src="https://github.com/user-attachments/assets/d2e49c3a-473c-4043-a952-9a555eada0e7" />

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
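The decision table in the summary can be sketched as a pair of pure functions. This is an illustrative sketch only; the function names are hypothetical, and the real logic lives in `process_traces()` and `send_processed_traces()`:

```rust
// Hypothetical sketch of the decision table; names are illustrative,
// not the actual functions in the extension.

/// Value of the `_dd.compute_stats` trace tag. The backend computes stats
/// only when neither the extension nor the tracer already has, i.e. the tag
/// stays "1" only in the false/false case.
fn compute_stats_tag(compute_trace_stats_on_extension: bool, client_computed_stats: bool) -> &'static str {
    if compute_trace_stats_on_extension || client_computed_stats {
        "0"
    } else {
        "1"
    }
}

/// Whether the extension itself generates stats: only when enabled AND the
/// tracer has not already computed them, avoiding duplicate counts.
fn extension_generates_stats(compute_trace_stats_on_extension: bool, client_computed_stats: bool) -> bool {
    compute_trace_stats_on_extension && !client_computed_stats
}

fn main() {
    // Walk all four rows of the table.
    for (ext, client) in [(false, false), (false, true), (true, false), (true, true)] {
        println!(
            "ext={ext} client={client} -> tag={} ext_stats={}",
            compute_stats_tag(ext, client),
            extension_generates_stats(ext, client)
        );
    }
}
```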
1 parent f7cc7a0 commit 55659d9

File tree

6 files changed (+316 −24 lines)


#### `bottlecap/src/lifecycle/invocation/context.rs` (18 additions, 1 deletion)

```diff
@@ -43,6 +43,16 @@ pub struct Context {
     /// tracing.
     ///
     pub extracted_span_context: Option<SpanContext>,
+    /// Whether the tracer has already generated stats for the dummy `aws.lambda` span
+    /// for this invocation (which only exists for universal instrumentation
+    /// languages).
+    ///
+    /// Set from the `Datadog-Client-Computed-Stats` header when the tracer's
+    /// dummy `aws.lambda` span (with `resource_name="dd-tracer-serverless-span"`) is received.
+    /// The value will be propagated to the same field of the real `aws.lambda` span generated by
+    /// the extension, which will be used to decide whether stats should be generated on extension
+    /// or backend.
+    pub client_computed_stats: bool,
 }
 
 /// Struct containing the information needed to reparent a span.
@@ -94,6 +104,7 @@ impl Default for Context {
             snapstart_restore_span: None,
             tracer_span: None,
             extracted_span_context: None,
+            client_computed_stats: false,
         }
     }
 }
@@ -508,7 +519,12 @@ impl ContextBuffer {
 
     /// Adds the tracer span to a `Context` in the buffer.
    ///
-    pub fn add_tracer_span(&mut self, request_id: &String, tracer_span: &Span) {
+    pub fn add_tracer_span(
+        &mut self,
+        request_id: &String,
+        tracer_span: &Span,
+        client_computed_stats: bool,
+    ) {
         if let Some(context) = self
             .buffer
             .iter_mut()
@@ -528,6 +544,7 @@ impl ContextBuffer {
                 .extend(tracer_span.metrics.clone());
 
             context.tracer_span = Some(tracer_span.clone());
+            context.client_computed_stats = client_computed_stats;
         } else {
             debug!("Could not add tracer span - context not found");
         }
```

#### `bottlecap/src/lifecycle/invocation/processor.rs` (105 additions, 6 deletions)

```diff
@@ -610,9 +610,16 @@ impl Processor {
         trace_sender: &Arc<SendingTraceProcessor>,
         context: Context,
     ) {
+        let client_computed_stats = context.client_computed_stats;
         let (traces, body_size) = self.get_ctx_spans(context);
-        self.send_spans(traces, body_size, tags_provider, trace_sender)
-            .await;
+        self.send_spans(
+            traces,
+            body_size,
+            client_computed_stats,
+            tags_provider,
+            trace_sender,
+        )
+        .await;
     }
 
     fn get_ctx_spans(&mut self, context: Context) -> (Vec<Span>, usize) {
@@ -677,7 +684,7 @@ impl Processor {
             let traces = vec![cold_start_span.clone()];
             let body_size = size_of_val(cold_start_span);
 
-            self.send_spans(traces, body_size, tags_provider, trace_sender)
+            self.send_spans(traces, body_size, false, tags_provider, trace_sender)
                 .await;
         }
     }
@@ -689,6 +696,7 @@ impl Processor {
         &mut self,
         traces: Vec<Span>,
         body_size: usize,
+        client_computed_stats: bool,
         tags_provider: &Arc<provider::Provider>,
         trace_sender: &Arc<SendingTraceProcessor>,
     ) {
@@ -701,7 +709,7 @@ impl Processor {
             tracer_version: "",
             container_id: "",
             client_computed_top_level: false,
-            client_computed_stats: false,
+            client_computed_stats,
             dropped_p0_traces: 0,
             dropped_p0_spans: 0,
         };
@@ -1337,9 +1345,10 @@ impl Processor {
     ///
     /// This is used to enrich the invocation span with additional metadata from the tracers
     /// top level span, since we discard the tracer span when we create the invocation span.
-    pub fn add_tracer_span(&mut self, span: &Span) {
+    pub fn add_tracer_span(&mut self, span: &Span, client_computed_stats: bool) {
         if let Some(request_id) = span.meta.get("request_id") {
-            self.context_buffer.add_tracer_span(request_id, span);
+            self.context_buffer
+                .add_tracer_span(request_id, span, client_computed_stats);
         }
     }
 }
@@ -2110,4 +2119,94 @@ mod tests {
             "Should return None when no trace context found"
         );
     }
+
+    /// Verifies that `client_computed_stats` set on a context via `add_tracer_span` is
+    /// propagated all the way through `send_ctx_spans` to the `aws.lambda` payload sent
+    /// to the backend, so the extension does not generate duplicate stats.
+    #[tokio::test]
+    #[allow(clippy::unwrap_used)]
+    async fn test_client_computed_stats_propagated_to_aws_lambda_span() {
+        use crate::traces::stats_concentrator_service::StatsConcentratorService;
+        use crate::traces::stats_generator::StatsGenerator;
+        use libdd_trace_obfuscation::obfuscation_config::ObfuscationConfig;
+        use tokio::sync::mpsc;
+
+        let config = Arc::new(config::Config {
+            apm_dd_url: "https://trace.agent.datadoghq.com".to_string(),
+            ..config::Config::default()
+        });
+        let tags_provider = Arc::new(provider::Provider::new(
+            Arc::clone(&config),
+            LAMBDA_RUNTIME_SLUG.to_string(),
+            &HashMap::from([("function_arn".to_string(), "test-arn".to_string())]),
+        ));
+        let aws_config = Arc::new(AwsConfig {
+            region: "us-east-1".into(),
+            aws_lwa_proxy_lambda_runtime_api: None,
+            function_name: "test-function".into(),
+            sandbox_init_time: Instant::now(),
+            runtime_api: "***".into(),
+            exec_wrapper: None,
+            initialization_type: "on-demand".into(),
+        });
+        let (aggregator_service, aggregator_handle) =
+            AggregatorService::new(EMPTY_TAGS, 1024).expect("failed to create aggregator service");
+        tokio::spawn(aggregator_service.run());
+        let propagator = Arc::new(DatadogCompositePropagator::new(Arc::clone(&config)));
+        let mut p = Processor::new(
+            Arc::clone(&tags_provider),
+            Arc::clone(&config),
+            aws_config,
+            aggregator_handle,
+            propagator,
+        );
+
+        let (trace_tx, mut trace_rx) = mpsc::channel(10);
+        let (stats_concentrator_service, stats_concentrator_handle) =
+            StatsConcentratorService::new(Arc::clone(&config));
+        tokio::spawn(stats_concentrator_service.run());
+        let trace_sender = Arc::new(SendingTraceProcessor {
+            appsec: None,
+            processor: Arc::new(trace_processor::ServerlessTraceProcessor {
+                obfuscation_config: Arc::new(
+                    ObfuscationConfig::new().expect("Failed to create ObfuscationConfig"),
+                ),
+            }),
+            trace_tx,
+            stats_generator: Arc::new(StatsGenerator::new(stats_concentrator_handle)),
+        });
+
+        let mut context = Context::from_request_id("req-1");
+        context.invocation_span.trace_id = 1;
+        context.invocation_span.span_id = 2;
+        context.client_computed_stats = true;
+
+        p.send_ctx_spans(&tags_provider, &trace_sender, context)
+            .await;
+
+        let payload = trace_rx
+            .recv()
+            .await
+            .expect("expected payload from trace_tx");
+        assert!(
+            payload.header_tags.client_computed_stats,
+            "client_computed_stats must be propagated to the aws.lambda span payload"
+        );
+
+        // Verify _dd.compute_stats is "0" in the built payload tags: client_computed_stats=true
+        // means the tracer has already computed stats, so neither extension nor backend should.
+        let send_data = payload.builder.build();
+        let libdd_trace_utils::tracer_payload::TracerPayloadCollection::V07(payloads) =
+            send_data.get_payloads()
+        else {
+            panic!("expected V07 payload");
+        };
+        for p in payloads {
+            assert_eq!(
+                p.tags.get(crate::tags::lambda::tags::COMPUTE_STATS_KEY),
+                Some(&"0".to_string()),
+                "_dd.compute_stats must be 0 when client_computed_stats is true"
+            );
+        }
+    }
 }
```

#### `bottlecap/src/lifecycle/invocation/processor_service.rs` (8 additions, 2 deletions)

```diff
@@ -110,6 +110,7 @@ pub enum ProcessorCommand {
     },
     AddTracerSpan {
         span: Box<Span>,
+        client_computed_stats: bool,
     },
     OnOutOfMemoryError {
         timestamp: i64,
@@ -372,10 +373,12 @@ impl InvocationProcessorHandle {
     pub async fn add_tracer_span(
         &self,
         span: Span,
+        client_computed_stats: bool,
     ) -> Result<(), mpsc::error::SendError<ProcessorCommand>> {
         self.sender
             .send(ProcessorCommand::AddTracerSpan {
                 span: Box::new(span),
+                client_computed_stats,
             })
             .await
     }
@@ -585,8 +588,11 @@ impl InvocationProcessorService {
                 let result = Ok(self.processor.set_cold_start_span_trace_id(trace_id));
                 let _ = response.send(result);
             }
-            ProcessorCommand::AddTracerSpan { span } => {
-                self.processor.add_tracer_span(&span);
+            ProcessorCommand::AddTracerSpan {
+                span,
+                client_computed_stats,
+            } => {
+                self.processor.add_tracer_span(&span, client_computed_stats);
             }
             ProcessorCommand::OnOutOfMemoryError { timestamp } => {
                 self.processor.on_out_of_memory_error(timestamp);
```

#### `bottlecap/src/tags/lambda/tags.rs` (4 additions, 11 deletions)

```diff
@@ -39,7 +39,7 @@ const VERSION_KEY: &str = "version";
 const SERVICE_KEY: &str = "service";
 
 // ComputeStatsKey is the tag key indicating whether trace stats should be computed
-const COMPUTE_STATS_KEY: &str = "_dd.compute_stats";
+pub(crate) const COMPUTE_STATS_KEY: &str = "_dd.compute_stats";
 // FunctionTagsKey is the tag key for a function's tags to be set on the top level tracepayload
 const FUNCTION_TAGS_KEY: &str = "_dd.tags.function";
 // TODO(astuyve) decide what to do with the version
@@ -127,12 +127,6 @@ fn tags_from_env(
         tags_map.extend(config.tags.clone());
     }
 
-    // The value of _dd.compute_stats is the opposite of config.compute_trace_stats_on_extension.
-    // "config.compute_trace_stats_on_extension == true" means computing stats on the extension side,
-    // so we set _dd.compute_stats to 0 so stats won't be computed on the backend side.
-    let compute_stats = i32::from(!config.compute_trace_stats_on_extension);
-    tags_map.insert(COMPUTE_STATS_KEY.to_string(), compute_stats.to_string());
-
     tags_map
 }
 
@@ -292,8 +286,7 @@ mod tests {
     fn test_new_from_config() {
         let metadata = HashMap::new();
         let tags = Lambda::new_from_config(Arc::new(Config::default()), &metadata);
-        assert_eq!(tags.tags_map.len(), 3);
-        assert_eq!(tags.tags_map.get(COMPUTE_STATS_KEY).unwrap(), "1");
+        assert_eq!(tags.tags_map.len(), 2);
         let arch = arch_to_platform();
         assert_eq!(
             tags.tags_map.get(ARCHITECTURE_KEY).unwrap(),
@@ -428,7 +421,7 @@ mod tests {
                 (parts[0].to_string(), parts[1].to_string())
             })
             .collect();
-        assert_eq!(fn_tags_map.len(), 14);
+        assert_eq!(fn_tags_map.len(), 13);
         assert_eq!(fn_tags_map.get("key1").unwrap(), "value1");
         assert_eq!(fn_tags_map.get("key2").unwrap(), "value2");
         assert_eq!(fn_tags_map.get(ACCOUNT_ID_KEY).unwrap(), "123456789012");
@@ -470,7 +463,7 @@ mod tests {
                 (parts[0].to_string(), parts[1].to_string())
            })
             .collect();
-        assert_eq!(fn_tags_map.len(), 14);
+        assert_eq!(fn_tags_map.len(), 13);
         assert_eq!(fn_tags_map.get("key1").unwrap(), "value1");
         assert_eq!(fn_tags_map.get("key2").unwrap(), "value2");
         assert_eq!(fn_tags_map.get(ACCOUNT_ID_KEY).unwrap(), "123456789012");
```

#### `bottlecap/src/traces/trace_agent.rs` (3 additions, 3 deletions)

```diff
@@ -501,7 +501,8 @@ impl TraceAgent {
             );
         }
 
-        let tracer_header_tags = (&parts.headers).into();
+        let tracer_header_tags: libdd_trace_utils::tracer_header_tags::TracerHeaderTags<'_> =
+            (&parts.headers).into();
 
         let (body_size, mut traces): (usize, Vec<Vec<pb::Span>>) = match version {
             ApiVersion::V04 => {
@@ -531,7 +532,6 @@ impl TraceAgent {
                 }
             },
         };
-
         let mut reparenting_info = match invocation_processor_handle.get_reparenting_info().await {
             Ok(info) => info,
             Err(e) => {
@@ -586,7 +586,7 @@ impl TraceAgent {
 
         if span.resource == INVOCATION_SPAN_RESOURCE
             && let Err(e) = invocation_processor_handle
-                .add_tracer_span(span.clone())
+                .add_tracer_span(span.clone(), tracer_header_tags.client_computed_stats)
                 .await
         {
             error!("Failed to add tracer span to processor: {}", e);
```
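The manual test above relies on `Datadog-Client-Computed-Stats: t` being read as truthy. As a minimal sketch of such a check (illustrative only — the real parsing happens in the `TracerHeaderTags` conversion in `libdd_trace_utils`, and the accepted spellings there may differ):

```rust
// Hypothetical truthy check for a Datadog-Client-Computed-Stats header value.
// Illustrative only; not the actual parsing in libdd_trace_utils.
fn is_truthy(value: &str) -> bool {
    matches!(
        value.trim().to_ascii_lowercase().as_str(),
        "1" | "t" | "true" | "yes"
    )
}

fn main() {
    // "t" is the value the Go tracer sends on the dummy span's header.
    assert!(is_truthy("t"));
    assert!(is_truthy("TRUE"));
    assert!(!is_truthy("0"));
    assert!(!is_truthy(""));
    println!("ok");
}
```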
