Skip to content

Commit ab461a4

Browse files
[APMSVLS-487] respect Datadog-Client-Computed-Stats header (#1245)
## Overview Makes the extension respect the tracer's `Datadog-Client-Computed-Stats` header and moves `_dd.compute_stats` from a baked-in function tag to a per-span backend directive. We tried supporting the `Datadog-Client-Computed-Stats` header before in #1118, but that was reverted in #1176. ### Background Span attribute `_dd.compute_stats` asks the backend to compute trace stats. It must be set to `"1"` only when nobody else computed them — neither the extension (agent) nor the tracer. Previously the extension: 1. **Baked `_dd.compute_stats` into the function tags** unconditionally (`tags_from_env`), which also leaked the key into `_dd.tags.function`. 2. **Ignored `Datadog-Client-Computed-Stats`** entirely, so when a tracer computed stats client-side, the backend was still asked to compute them. ### Canonical semantics (validated against the Go agent) The Go agent (`pkg/serverless/tags/tags.go`) only ever sets `_dd.compute_stats = "1"`, never `"0"`, and leaves the key absent otherwise. This PR matches that: > Set `_dd.compute_stats="1"` iff `!compute_trace_stats_on_extension && !client_computed_stats`; otherwise leave it absent. | `client_computed_stats` (header from tracer) | `compute_on_extension` (extension config) | who computes | stamp `_dd.compute_stats="1"`? | -------------------- | --------------------- | --------------- | --- | true | (ignored) | tracer | ❌ no | false | true | extension | ❌ no | false | false | backend | ✅ yes ### Changes - **`tags/lambda/tags.rs`** — stop baking `_dd.compute_stats` in `tags_from_env` (no longer leaks into `_dd.tags.function`); `COMPUTE_STATS_KEY` is now `pub` so the integration test can reuse it instead of re-declaring the literal. - **Path A: `traces/trace_processor.rs`** — `ChunkProcessor` gains `client_computed_stats` and stamps `_dd.compute_stats="1"` per-span only when neither side computes stats; the extension-side stats-generation guard in `send_processed_traces` now also skips when `client_computed_stats` is set. - **Path B (extension-generated `aws.lambda` span)** — `client_computed_stats` is propagated from the tracer's placeholder span through `context.rs` → `processor.rs` → `processor_service.rs` → `trace_agent.rs`, so Path B reuses the same `ChunkProcessor` stamping (single source of truth). - **OTLP: `otlp/agent.rs`** — the OTLP stats-generation guard previously checked only `compute_trace_stats_on_extension`, so an OTLP request carrying `Datadog-Client-Computed-Stats` would still generate extension-side stats and double-count against the tracer. It now also skips when `client_computed_stats` is set, mirroring the `send_processed_traces` guard. - **Single source of truth: `traces/trace_processor.rs`** — the three decisions over the same two inputs (the per-span `_dd.compute_stats` stamp, plus the extension-side stats-generation guards in `send_processed_traces` and `otlp/agent.rs`) are now derived from one `StatsComputedBy::resolve(compute_on_extension, client_computed_stats)` helper, so the stamp and the guards can't silently drift apart. ### Note on the header value (cross-runtime) `Datadog-Client-Computed-Stats` is not standardized (`"true"` .NET/Java/PHP/Python, `"yes"` JS/Ruby/C++, `"t"` Go). bottlecap consumes the already-parsed `client_computed_stats` bool from `libdd_trace_utils`, where any non-empty value → `true`, so the fix triggers on every runtime. A separate libdatadog PR ([DataDog/libdatadog#2071](DataDog/libdatadog#2071)) aligns the header parsing with the Go agent's `isHeaderTrue`/`ParseBool` rules; this PR only consumes the bool and does not depend on that change. ## Testing - ~**Tier 0** (header-parsing contract)~ — moved to the libdatadog bump PR #1244, since those tests assert libdatadog parsing behavior that the `db05e1f → 48da0d8` bump changes. This branch keeps Tiers 1–3 and rebases onto [#1244](//pull/1244) after it merges. - **Tier 1** (`trace_processor.rs`) — truth-table on `Span.meta`, stats-skip guard via the real `StatsConcentratorService`, and updated `tags.rs` unit tests asserting the key no longer appears in the tags map. Fixed the logs/metrics integration tests that asserted the old leak. - **Tier 2** (`context.rs`, `processor.rs`) — context-level flag recording and an end-to-end Path B test driving `send_ctx_spans` through the `trace_tx` channel, asserting `_dd.compute_stats` on the `aws.lambda` span across the truth table. - **Tier 3** (`apm_integration_test.rs`) — full fake-intake E2E routing a trace through `SendingTraceProcessor::send_processed_traces`: asserts on the captured `AgentPayload` span meta and on `stats_payloads()` (stats suppressed unless the extension computes and the tracer did not). ### ⚠️ TODO before merging - [x] Rebase onto the libdatadog bump PR ([#1244](#1244)) after it merges. - [ ] E2E tests. --- > *"Computing stats twice doesn't make them twice as true, it just makes the backend twice as grumpy."* — Claude 🤖 [APMSVLS-487](https://datadoghq.atlassian.net/browse/APMSVLS-487) [APMSVLS-487]: https://datadoghq.atlassian.net/browse/APMSVLS-487?atlOrigin=eyJpIjoiNWRkNTljNzYxNjVmNDY3MDlhMDU5Y2ZhYzA5YTRkZjUiLCJwIjoiZ2l0aHViLWNvbS1KU1cifQ
1 parent d2d30fc commit ab461a4

10 files changed

Lines changed: 696 additions & 30 deletions

File tree

bottlecap/src/lifecycle/invocation/context.rs

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,12 @@ pub struct Context {
4949
/// `PlatformRuntimeDone`, `max_memory_used == memory_size` in `PlatformReport`);
5050
/// this flag dedupes them.
5151
pub oom_emitted: bool,
52+
/// Whether the tracer signaled (via the `Datadog-Client-Computed-Stats` header) that it
53+
/// already computed trace stats client-side, propagated from the tracer's placeholder span.
54+
///
55+
/// Used when generating the extension-side `aws.lambda` span (Path B) so the backend
56+
/// stats directive (`_dd.compute_stats`) is stamped consistently with Path A.
57+
pub client_computed_stats: bool,
5258
}
5359

5460
/// Struct containing the information needed to reparent a span.
@@ -101,6 +107,7 @@ impl Default for Context {
101107
tracer_span: None,
102108
extracted_span_context: None,
103109
oom_emitted: false,
110+
client_computed_stats: false,
104111
}
105112
}
106113
}
@@ -515,12 +522,20 @@ impl ContextBuffer {
515522

516523
/// Adds the tracer span to a `Context` in the buffer.
517524
///
518-
pub fn add_tracer_span(&mut self, request_id: &String, tracer_span: &Span) {
525+
/// `client_computed_stats` carries the tracer's `Datadog-Client-Computed-Stats` signal so
526+
/// the extension-generated `aws.lambda` span can stamp `_dd.compute_stats` consistently.
527+
pub fn add_tracer_span(
528+
&mut self,
529+
request_id: &String,
530+
tracer_span: &Span,
531+
client_computed_stats: bool,
532+
) {
519533
if let Some(context) = self
520534
.buffer
521535
.iter_mut()
522536
.find(|context| context.request_id == *request_id)
523537
{
538+
context.client_computed_stats = client_computed_stats;
524539
context
525540
.invocation_span
526541
.meta
@@ -646,6 +661,34 @@ mod tests {
646661
assert!(buffer.get(&unexistent_request_id).is_none());
647662
}
648663

664+
/// APMSVLS-487 Tier 2: `add_tracer_span` records the tracer's `client_computed_stats`
665+
/// signal onto the matching context (and is a no-op when no context matches).
666+
#[test]
667+
fn test_add_tracer_span_sets_client_computed_stats() {
668+
for client_computed_stats in [true, false] {
669+
let mut buffer = ContextBuffer::with_capacity(2);
670+
let request_id = String::from("req-1");
671+
buffer.insert(Context::from_request_id(&request_id));
672+
673+
let mut tracer_span = Span::default();
674+
tracer_span
675+
.meta
676+
.insert("request_id".to_string(), request_id.clone());
677+
678+
buffer.add_tracer_span(&request_id, &tracer_span, client_computed_stats);
679+
680+
assert_eq!(
681+
buffer.get(&request_id).unwrap().client_computed_stats,
682+
client_computed_stats
683+
);
684+
}
685+
686+
// No matching context -> no panic, nothing recorded.
687+
let mut buffer = ContextBuffer::with_capacity(2);
688+
buffer.add_tracer_span(&String::from("missing"), &Span::default(), true);
689+
assert!(buffer.get(&String::from("missing")).is_none());
690+
}
691+
649692
#[test]
650693
fn test_add_start_time() {
651694
let mut buffer = ContextBuffer::with_capacity(2);

bottlecap/src/lifecycle/invocation/processor.rs

Lines changed: 154 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -665,9 +665,17 @@ impl Processor {
665665
trace_sender: &Arc<SendingTraceProcessor>,
666666
context: Context,
667667
) {
668+
// Capture before `get_ctx_spans` consumes `context`.
669+
let client_computed_stats = context.client_computed_stats;
668670
let (traces, body_size) = self.get_ctx_spans(context);
669-
self.send_spans(traces, body_size, tags_provider, trace_sender)
670-
.await;
671+
self.send_spans(
672+
traces,
673+
body_size,
674+
tags_provider,
675+
trace_sender,
676+
client_computed_stats,
677+
)
678+
.await;
671679
}
672680

673681
fn get_ctx_spans(&mut self, context: Context) -> (Vec<Span>, usize) {
@@ -732,7 +740,9 @@ impl Processor {
732740
let traces = vec![cold_start_span.clone()];
733741
let body_size = size_of_val(cold_start_span);
734742

735-
self.send_spans(traces, body_size, tags_provider, trace_sender)
743+
// The cold start span is extension-generated and not tied to a tracer's stats
744+
// signal, so the backend should compute its stats unless the extension does.
745+
self.send_spans(traces, body_size, tags_provider, trace_sender, false)
736746
.await;
737747
}
738748
}
@@ -746,8 +756,12 @@ impl Processor {
746756
body_size: usize,
747757
tags_provider: &Arc<provider::Provider>,
748758
trace_sender: &Arc<SendingTraceProcessor>,
759+
client_computed_stats: bool,
749760
) {
750761
// todo: figure out what to do here
762+
// `client_computed_stats` is propagated from the tracer's placeholder span so the
763+
// downstream `ChunkProcessor` (reused via `send_processed_traces` -> `process_traces`)
764+
// stamps `_dd.compute_stats` on these extension-generated spans consistently with Path A.
751765
let header_tags = tracer_header_tags::TracerHeaderTags {
752766
lang: "",
753767
lang_version: "",
@@ -756,7 +770,7 @@ impl Processor {
756770
tracer_version: "",
757771
container_id: "",
758772
client_computed_top_level: false,
759-
client_computed_stats: false,
773+
client_computed_stats,
760774
dropped_p0_traces: 0,
761775
dropped_p0_spans: 0,
762776
};
@@ -1448,9 +1462,10 @@ impl Processor {
14481462
///
14491463
/// This is used to enrich the invocation span with additional metadata from the tracers
14501464
/// top level span, since we discard the tracer span when we create the invocation span.
1451-
pub fn add_tracer_span(&mut self, span: &Span) {
1465+
pub fn add_tracer_span(&mut self, span: &Span, client_computed_stats: bool) {
14521466
if let Some(request_id) = span.meta.get("request_id") {
1453-
self.context_buffer.add_tracer_span(request_id, span);
1467+
self.context_buffer
1468+
.add_tracer_span(request_id, span, client_computed_stats);
14541469
}
14551470
}
14561471

@@ -2622,4 +2637,137 @@ mod tests {
26222637
"OOM must be emitted when max_memory_used_mb == memory_size_mb"
26232638
);
26242639
}
2640+
2641+
/// Build a [`Processor`] with a caller-supplied config (for toggling
2642+
/// `compute_trace_stats_on_extension`).
2643+
fn setup_with_config(config: Arc<config::Config>) -> Processor {
2644+
let aws_config = Arc::new(AwsConfig {
2645+
region: "us-east-1".into(),
2646+
aws_lwa_proxy_lambda_runtime_api: Some("***".into()),
2647+
function_name: "test-function".into(),
2648+
sandbox_init_time: Instant::now(),
2649+
runtime_api: "***".into(),
2650+
exec_wrapper: None,
2651+
initialization_type: "on-demand".into(),
2652+
});
2653+
let tags_provider = Arc::new(provider::Provider::new(
2654+
Arc::clone(&config),
2655+
LAMBDA_RUNTIME_SLUG.to_string(),
2656+
&HashMap::from([("function_arn".to_string(), "test-arn".to_string())]),
2657+
));
2658+
let (service, handle) =
2659+
AggregatorService::new(EMPTY_TAGS, 1024).expect("failed to create aggregator service");
2660+
tokio::spawn(service.run());
2661+
let propagator = Arc::new(DatadogCompositePropagator::new(Arc::clone(&config)));
2662+
let (durable_context_tx, _) = tokio::sync::mpsc::channel(1);
2663+
Processor::new(
2664+
tags_provider,
2665+
config,
2666+
aws_config,
2667+
handle,
2668+
propagator,
2669+
durable_context_tx,
2670+
)
2671+
}
2672+
2673+
/// Like [`make_trace_sender`], but returns the receiver so the test can inspect the
2674+
/// processed payload that Path B sends downstream.
2675+
fn make_trace_sender_with_rx(
2676+
config: Arc<config::Config>,
2677+
) -> (
2678+
Arc<SendingTraceProcessor>,
2679+
tokio::sync::mpsc::Receiver<crate::traces::trace_aggregator::SendDataBuilderInfo>,
2680+
) {
2681+
use libdd_trace_obfuscation::obfuscation_config::ObfuscationConfig;
2682+
let (stats_concentrator_service, stats_concentrator_handle) =
2683+
StatsConcentratorService::new(Arc::clone(&config));
2684+
tokio::spawn(stats_concentrator_service.run());
2685+
let (trace_tx, trace_rx) = tokio::sync::mpsc::channel(8);
2686+
let sender = Arc::new(SendingTraceProcessor {
2687+
appsec: None,
2688+
processor: Arc::new(trace_processor::ServerlessTraceProcessor {
2689+
obfuscation_config: Arc::new(
2690+
ObfuscationConfig::new().expect("Failed to create ObfuscationConfig"),
2691+
),
2692+
}),
2693+
trace_tx,
2694+
stats_generator: Arc::new(StatsGenerator::new(stats_concentrator_handle)),
2695+
});
2696+
(sender, trace_rx)
2697+
}
2698+
2699+
/// APMSVLS-487 Tier 2: the extension-generated `aws.lambda` span (Path B) stamps
2700+
/// `_dd.compute_stats="1"` only when neither the extension nor the tracer computes stats;
2701+
/// otherwise the key is absent. `client_computed_stats` is propagated from the context.
2702+
#[tokio::test]
2703+
#[allow(clippy::unwrap_used)]
2704+
async fn test_send_ctx_spans_stamps_compute_stats() {
2705+
use crate::tags::lambda::tags::COMPUTE_STATS_KEY;
2706+
use libdd_trace_utils::tracer_payload::TracerPayloadCollection;
2707+
2708+
#[cfg(feature = "fips")]
2709+
let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
2710+
#[cfg(not(feature = "fips"))]
2711+
let _ = rustls::crypto::ring::default_provider().install_default();
2712+
2713+
// (compute_on_extension, client_computed_stats) -> expected meta value on aws.lambda span
2714+
let cases = [
2715+
(false, false, Some("1")),
2716+
(false, true, None),
2717+
(true, false, None),
2718+
(true, true, None),
2719+
];
2720+
2721+
for (compute_on_extension, client_computed_stats, expected) in cases {
2722+
let config = Arc::new(config::Config {
2723+
apm_dd_url: "https://trace.agent.datadoghq.com".to_string(),
2724+
service: Some("test-service".to_string()),
2725+
compute_trace_stats_on_extension: compute_on_extension,
2726+
..config::Config::default()
2727+
});
2728+
let mut processor = setup_with_config(Arc::clone(&config));
2729+
let (trace_sender, mut trace_rx) = make_trace_sender_with_rx(Arc::clone(&config));
2730+
2731+
let mut context = Context::from_request_id("req-1");
2732+
context.client_computed_stats = client_computed_stats;
2733+
context.invocation_span = Span {
2734+
name: "aws.lambda".to_string(),
2735+
resource: "test-resource".to_string(),
2736+
service: "test-service".to_string(),
2737+
span_id: 1,
2738+
trace_id: 100,
2739+
..Default::default()
2740+
};
2741+
2742+
let tags_provider = Arc::new(provider::Provider::new(
2743+
Arc::clone(&config),
2744+
LAMBDA_RUNTIME_SLUG.to_string(),
2745+
&HashMap::from([("function_arn".to_string(), "test-arn".to_string())]),
2746+
));
2747+
processor
2748+
.send_ctx_spans(&tags_provider, &trace_sender, context)
2749+
.await;
2750+
2751+
let info = trace_rx.recv().await.expect("expected a sent payload");
2752+
let send_data = info.builder.build();
2753+
let TracerPayloadCollection::V07(payloads) = send_data.get_payloads() else {
2754+
panic!("expected V07 payload");
2755+
};
2756+
let aws_lambda_span = payloads
2757+
.iter()
2758+
.flat_map(|p| &p.chunks)
2759+
.flat_map(|c| &c.spans)
2760+
.find(|s| s.name == "aws.lambda")
2761+
.expect("aws.lambda span should be present");
2762+
2763+
assert_eq!(
2764+
aws_lambda_span
2765+
.meta
2766+
.get(COMPUTE_STATS_KEY)
2767+
.map(String::as_str),
2768+
expected,
2769+
"compute_on_extension={compute_on_extension}, client_computed_stats={client_computed_stats}"
2770+
);
2771+
}
2772+
}
26252773
}

bottlecap/src/lifecycle/invocation/processor_service.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ pub enum ProcessorCommand {
109109
},
110110
AddTracerSpan {
111111
span: Box<Span>,
112+
client_computed_stats: bool,
112113
},
113114
ForwardDurableContext {
114115
request_id: String,
@@ -379,10 +380,12 @@ impl InvocationProcessorHandle {
379380
pub async fn add_tracer_span(
380381
&self,
381382
span: Span,
383+
client_computed_stats: bool,
382384
) -> Result<(), mpsc::error::SendError<ProcessorCommand>> {
383385
self.sender
384386
.send(ProcessorCommand::AddTracerSpan {
385387
span: Box::new(span),
388+
client_computed_stats,
386389
})
387390
.await
388391
}
@@ -617,8 +620,11 @@ impl InvocationProcessorService {
617620
let result = Ok(self.processor.set_cold_start_span_trace_id(trace_id));
618621
let _ = response.send(result);
619622
}
620-
ProcessorCommand::AddTracerSpan { span } => {
621-
self.processor.add_tracer_span(&span);
623+
ProcessorCommand::AddTracerSpan {
624+
span,
625+
client_computed_stats,
626+
} => {
627+
self.processor.add_tracer_span(&span, client_computed_stats);
622628
}
623629
ProcessorCommand::ForwardDurableContext {
624630
request_id,

bottlecap/src/otlp/agent.rs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,9 @@ use crate::{
2525
otlp::processor::{OtlpEncoding, Processor as OtlpProcessor},
2626
tags::provider,
2727
traces::{
28-
stats_generator::StatsGenerator, trace_aggregator::SendDataBuilderInfo,
29-
trace_processor::TraceProcessor,
28+
stats_generator::StatsGenerator,
29+
trace_aggregator::SendDataBuilderInfo,
30+
trace_processor::{StatsComputedBy, TraceProcessor},
3031
},
3132
};
3233

@@ -58,6 +59,8 @@ impl TracePipeline {
5859
}
5960

6061
let compute_trace_stats_on_extension = self.config.compute_trace_stats_on_extension;
62+
// Capture before `tracer_header_tags` is moved into process_traces below.
63+
let client_computed_stats = tracer_header_tags.client_computed_stats;
6164
let (send_data_builder, processed_traces) = self.trace_processor.process_traces(
6265
self.config.clone(),
6366
self.tags_provider.clone(),
@@ -78,7 +81,10 @@ impl TracePipeline {
7881

7982
// This needs to be after process_traces() because process_traces()
8083
// performs obfuscation, and we need to compute stats on the obfuscated traces.
81-
if compute_trace_stats_on_extension
84+
// Skip extension-side stats generation when the tracer already computed stats
85+
// client-side (Datadog-Client-Computed-Stats), to avoid double-counting.
86+
if StatsComputedBy::resolve(compute_trace_stats_on_extension, client_computed_stats)
87+
== StatsComputedBy::Extension
8288
&& let Err(err) = self.stats_generator.send(&processed_traces)
8389
{
8490
// Just log the error. We don't think trace stats are critical, so we don't want to
@@ -111,6 +117,10 @@ impl TraceService for TracePipeline {
111117
}
112118
};
113119

120+
// Default tags mean client_computed_stats is always false for OTLP gRPC, so
121+
// Datadog-Client-Computed-Stats metadata is ignored. Safe today: Datadog tracers
122+
// that compute stats never export via OTLP. Revisit if that ever changes, to
123+
// avoid double-counting stats.
114124
let tracer_header_tags = DatadogTracerHeaderTags::default();
115125

116126
self.process_and_send_traces(tracer_header_tags, traces, body_size)

0 commit comments

Comments
 (0)