
Commit 51998da

Inject APM trace headers into SQS _datadog message attribute on send (#10843)
Add failing tests for missing APM header injection into SQS _datadog attribute

Documents the missing send-side feature: Java never writes x-datadog-trace-id or x-datadog-parent-id into _datadog, breaking Java→other-tracer propagation.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

Inject APM trace headers into SQS _datadog message attribute on send

Previously, the Java SQS instrumentation never wrote APM trace context (x-datadog-trace-id, x-datadog-parent-id) into the _datadog message attribute, breaking distributed tracing from Java to other language tracers (Python, Node, etc.) that rely on that attribute.

Changes:
- SqsInterceptor (v1 + v2): call defaultPropagator().inject() before DSM inject so APM headers are written into _datadog on SendMessage and SendMessageBatch
- MessageAttributeInjector (v1 + v2): accumulate multiple key-value pairs into the _datadog JSON blob instead of short-circuiting on the second key
- SqsClientInstrumentation (v1 + v2): register SqsInterceptor when isSqsInjectDatadogAttributeEnabled() is true, not only when DSM is enabled, so APM injection works even without Data Streams
- Tests: add APM trace context injection test for both v1 and v2, covering DSM-on and DSM-off variants

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

Guard against null stringValue in MessageAttributeInjector

When a message is sent with an existing _datadog attribute of Binary type (e.g. from the SQS-JMS library), getStringValue() returns null. Skip injection in that case rather than NPE.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

Remove redundant dsmPropagator.inject() in SqsInterceptor

DSM_CONCERN is registered with usedAsDefault=true, so defaultPropagator() already includes DSM injection when Data Streams is enabled. The separate dsmPropagator.inject() call was therefore always redundant: a no-op when DSM is off, a duplicate when DSM is on.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

Don't overwrite pre-existing _datadog attribute; clarify accumulation

SqsInterceptor: skip injection if _datadog is already present in the message attributes. Appending to a caller-provided attribute could produce duplicate JSON keys or corrupt propagation context set intentionally upstream.

MessageAttributeInjector: add comments explaining why set() accumulates into the existing _datadog value rather than overwriting: a single propagator.inject() call fires set() once per header key, so all keys (x-datadog-trace-id, x-datadog-parent-id, dd-pathway-ctx-base64, ...) must land in the same JSON attribute.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

Fix span leak in SQS SendMessageBatch: create context once per batch

Moving newContext/getContext outside the per-entry loop prevents creating a new span per entry where each overwrites the same contextStore slot, leaking all but the last span and injecting mismatched parent IDs into earlier batch messages.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

Merge branch 'master' into ygree/sqs-inject-tracing-context

Fix SQS v1 body propagation fallback when _datadog lacks DSM context

Merge branch 'master' into ygree/sqs-inject-tracing-context

Co-authored-by: valentin.zakharov <valentin.zakharov@datadoghq.com>
1 parent e4d4017 commit 51998da

9 files changed: +288 -32 lines changed

dd-java-agent/instrumentation/aws-java/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/MessageAttributeInjector.java

Lines changed: 27 additions & 5 deletions
@@ -14,13 +14,35 @@ public class MessageAttributeInjector implements CarrierSetter<Map<String, Messa
   @Override
   public void set(
       final Map<String, MessageAttributeValue> carrier, final String key, final String value) {
-    if (carrier.size() < 10
-        && !carrier.containsKey(DATADOG_KEY)
-        && Config.get().isSqsInjectDatadogAttributeEnabled()) {
-      String jsonPathway = String.format("{\"%s\": \"%s\"}", key, value);
+    if (!Config.get().isSqsInjectDatadogAttributeEnabled()) {
+      return;
+    }
+    // A single propagator.inject() call invokes set() once per header key (e.g.
+    // x-datadog-trace-id, x-datadog-parent-id, dd-pathway-ctx-base64). All of them must be
+    // accumulated into the same _datadog JSON attribute rather than overwriting each other.
+    if (!carrier.containsKey(DATADOG_KEY)) {
+      if (carrier.size() >= 10) {
+        return;
+      }
       carrier.put(
           DATADOG_KEY,
-          new MessageAttributeValue().withDataType("String").withStringValue(jsonPathway));
+          new MessageAttributeValue()
+              .withDataType("String")
+              .withStringValue(String.format("{\"%s\": \"%s\"}", key, value)));
+    } else {
+      // _datadog was created by an earlier set() call in this same inject session; append to it.
+      String existing = carrier.get(DATADOG_KEY).getStringValue();
+      if (existing == null) {
+        return;
+      }
+      int closingBrace = existing.lastIndexOf('}');
+      if (closingBrace >= 0) {
+        String updated =
+            existing.substring(0, closingBrace) + String.format(", \"%s\": \"%s\"}", key, value);
+        carrier.put(
+            DATADOG_KEY,
+            new MessageAttributeValue().withDataType("String").withStringValue(updated));
+      }
     }
   }
 }
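
The accumulation behaviour of set() in the hunk above can be illustrated in isolation. The following is a minimal sketch, not the agent's code: the class name DatadogAttributeSketch is hypothetical, and the carrier maps attribute names to plain strings instead of the SDK's MessageAttributeValue so that the example runs without the AWS SDK. The limit of 10 mirrors the carrier.size() check in the diff.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for MessageAttributeInjector.set(); values are plain strings
// rather than MessageAttributeValue so the sketch needs no AWS SDK on the classpath.
final class DatadogAttributeSketch {
  static final String DATADOG_KEY = "_datadog";
  static final int MAX_ATTRIBUTES = 10; // same threshold as the carrier.size() check in the diff

  static void set(Map<String, String> carrier, String key, String value) {
    if (!carrier.containsKey(DATADOG_KEY)) {
      // First key of this inject call: create the JSON object, unless the message is full.
      if (carrier.size() >= MAX_ATTRIBUTES) {
        return;
      }
      carrier.put(DATADOG_KEY, String.format("{\"%s\": \"%s\"}", key, value));
      return;
    }
    String existing = carrier.get(DATADOG_KEY);
    if (existing == null) {
      return; // models a non-String (e.g. Binary) attribute whose string value is null
    }
    int closingBrace = existing.lastIndexOf('}');
    if (closingBrace >= 0) {
      // Later keys: reopen the object and append one more pair before the closing brace.
      carrier.put(
          DATADOG_KEY,
          existing.substring(0, closingBrace) + String.format(", \"%s\": \"%s\"}", key, value));
    }
  }

  public static void main(String[] args) {
    Map<String, String> carrier = new HashMap<>();
    set(carrier, "x-datadog-trace-id", "123");
    set(carrier, "x-datadog-parent-id", "456");
    // prints {"x-datadog-trace-id": "123", "x-datadog-parent-id": "456"}
    System.out.println(carrier.get(DATADOG_KEY));
  }
}
```

Like the code in the diff, the sketch does not escape keys or values, so it assumes header names and values contain no quotes or backslashes.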

dd-java-agent/instrumentation/aws-java/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/MessageExtractAdapter.java

Lines changed: 22 additions & 3 deletions
@@ -1,5 +1,7 @@
 package datadog.trace.instrumentation.aws.v1.sqs;

+import static datadog.trace.api.datastreams.PathwayContext.PROPAGATION_KEY_BASE64;
+
 import com.amazonaws.services.sqs.model.Message;
 import com.amazonaws.services.sqs.model.MessageAttributeValue;
 import com.fasterxml.jackson.databind.JsonNode;
@@ -9,6 +11,7 @@
 import datadog.trace.bootstrap.instrumentation.messaging.DatadogAttributeParser;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.util.Base64;
 import java.util.Map;
 import org.slf4j.Logger;
@@ -23,6 +26,7 @@ public final class MessageExtractAdapter implements AgentPropagation.ContextVisi

   @Override
   public void forEachKey(Message carrier, AgentPropagation.KeyClassifier classifier) {
+    boolean shouldExtractContextFromBody = SHOULD_EXTRACT_CONTEXT_FROM_BODY;
     Map<String, String> systemAttributes = carrier.getAttributes();
     if (systemAttributes.containsKey("AWSTraceHeader")) {
       // alias 'AWSTraceHeader' to 'X-Amzn-Trace-Id' because it uses the same format
@@ -31,12 +35,27 @@ public void forEachKey(Message carrier, AgentPropagation.KeyClassifier classifie
     Map<String, MessageAttributeValue> messageAttributes = carrier.getMessageAttributes();
     if (messageAttributes.containsKey("_datadog")) {
       MessageAttributeValue datadog = messageAttributes.get("_datadog");
+      boolean hasPathwayContext = false;
       if ("String".equals(datadog.getDataType())) {
-        DatadogAttributeParser.forEachProperty(classifier, datadog.getStringValue());
+        String value = datadog.getStringValue();
+        hasPathwayContext = value != null && value.contains(PROPAGATION_KEY_BASE64);
+        DatadogAttributeParser.forEachProperty(classifier, value);
       } else if ("Binary".equals(datadog.getDataType())) {
-        DatadogAttributeParser.forEachProperty(classifier, datadog.getBinaryValue());
+        ByteBuffer value = datadog.getBinaryValue();
+        if (value != null) {
+          ByteBuffer duplicate = value.duplicate();
+          hasPathwayContext =
+              StandardCharsets.UTF_8.decode(duplicate).toString().contains(PROPAGATION_KEY_BASE64);
+        }
+        DatadogAttributeParser.forEachProperty(classifier, value);
       }
-    } else if (SHOULD_EXTRACT_CONTEXT_FROM_BODY) {
+      shouldExtractContextFromBody &= !hasPathwayContext;
+    }
+
+    if (shouldExtractContextFromBody) {
+      // The top-level SQS _datadog attribute and the SNS-style body payload are separate carriers.
+      // APM headers may be present in the message attribute while DSM context still only exists in
+      // the body payload.
       try {
         this.forEachKeyInBody(carrier.getBody(), classifier);
       } catch (Throwable e) {
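
The fallback decision in the hunk above reduces to two checks: does the _datadog attribute already carry a pathway context, and is body extraction enabled at all. The sketch below isolates that logic under stated assumptions: the class and method names are hypothetical, and the literal "dd-pathway-ctx-base64" is assumed to be the value of PathwayContext.PROPAGATION_KEY_BASE64 (the commit message names that header, but the constant's definition is not part of this diff).

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper mirroring the decision made in MessageExtractAdapter.forEachKey().
final class BodyFallbackSketch {
  // Assumption: this is the value behind PathwayContext.PROPAGATION_KEY_BASE64.
  static final String PROPAGATION_KEY_BASE64 = "dd-pathway-ctx-base64";

  static boolean stringHasPathwayContext(String value) {
    return value != null && value.contains(PROPAGATION_KEY_BASE64);
  }

  static boolean binaryHasPathwayContext(ByteBuffer value) {
    if (value == null) {
      return false;
    }
    // Charset.decode() advances the position of the buffer it reads. Decoding a duplicate
    // leaves the original buffer's position intact for whoever parses it afterwards.
    ByteBuffer duplicate = value.duplicate();
    return StandardCharsets.UTF_8.decode(duplicate).toString().contains(PROPAGATION_KEY_BASE64);
  }

  // The body is still consulted unless the attribute already supplied DSM context.
  static boolean shouldExtractFromBody(boolean bodyExtractionEnabled, boolean hasPathwayContext) {
    return bodyExtractionEnabled && !hasPathwayContext;
  }
}
```

With this shape, a message whose _datadog attribute holds only APM headers still falls through to the body, which matches the comment added in the diff about the attribute and the SNS-style body being separate carriers.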

dd-java-agent/instrumentation/aws-java/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/SqsClientInstrumentation.java

Lines changed: 2 additions & 1 deletion
@@ -52,7 +52,8 @@ public Map<String, String> contextStore() {
   public static class HandlerChainAdvice {
     @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
     public static void addHandler(@Advice.Return final List<RequestHandler2> handlers) {
-      if (Config.get().isDataStreamsEnabled()) {
+      if (Config.get().isDataStreamsEnabled()
+          || Config.get().isSqsInjectDatadogAttributeEnabled()) {
         for (RequestHandler2 interceptor : handlers) {
           if (interceptor instanceof SqsInterceptor) {
             return; // list already has our interceptor, return to builder

dd-java-agent/instrumentation/aws-java/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/SqsInterceptor.java

Lines changed: 8 additions & 8 deletions
@@ -1,9 +1,9 @@
 package datadog.trace.instrumentation.aws.v1.sqs;

+import static datadog.context.propagation.Propagators.defaultPropagator;
 import static datadog.trace.api.datastreams.DataStreamsTags.Direction.OUTBOUND;
 import static datadog.trace.api.datastreams.DataStreamsTags.create;
 import static datadog.trace.api.datastreams.PathwayContext.DATADOG_KEY;
-import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.DSM_CONCERN;
 import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan;
 import static datadog.trace.bootstrap.instrumentation.api.URIUtils.urlFileName;
 import static datadog.trace.instrumentation.aws.v1.sqs.MessageAttributeInjector.SETTER;
@@ -16,8 +16,6 @@
 import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry;
 import com.amazonaws.services.sqs.model.SendMessageRequest;
 import datadog.context.Context;
-import datadog.context.propagation.Propagator;
-import datadog.context.propagation.Propagators;
 import datadog.trace.api.Config;
 import datadog.trace.api.datastreams.DataStreamsContext;
 import datadog.trace.api.datastreams.DataStreamsTags;
@@ -44,13 +42,14 @@ public AmazonWebServiceRequest beforeMarshalling(AmazonWebServiceRequest request
       String queueUrl = smRequest.getQueueUrl();
       if (queueUrl == null) return request;

-      Propagator dsmPropagator = Propagators.forConcern(DSM_CONCERN);
-      Context context = newContext(request, queueUrl);
       // making a copy of the MessageAttributes before modifying them because they can be stored in
       // a kind of ImmutableMap
       Map<String, MessageAttributeValue> messageAttributes =
           new HashMap<>(smRequest.getMessageAttributes());
-      dsmPropagator.inject(context, messageAttributes, SETTER);
+      if (!messageAttributes.containsKey(DATADOG_KEY)) {
+        Context context = newContext(request, queueUrl);
+        defaultPropagator().inject(context, messageAttributes, SETTER);
+      }
       // note: modifying message attributes has to be done before marshalling, otherwise the changes
       // are not reflected in the actual request (and the MD5 check on send will fail).
       smRequest.setMessageAttributes(messageAttributes);
@@ -60,12 +59,13 @@ public AmazonWebServiceRequest beforeMarshalling(AmazonWebServiceRequest request
       String queueUrl = smbRequest.getQueueUrl();
       if (queueUrl == null) return request;

-      Propagator dsmPropagator = Propagators.forConcern(DSM_CONCERN);
       Context context = newContext(request, queueUrl);
       for (SendMessageBatchRequestEntry entry : smbRequest.getEntries()) {
         Map<String, MessageAttributeValue> messageAttributes =
             new HashMap<>(entry.getMessageAttributes());
-        dsmPropagator.inject(context, messageAttributes, SETTER);
+        if (!messageAttributes.containsKey(DATADOG_KEY)) {
+          defaultPropagator().inject(context, messageAttributes, SETTER);
+        }
         entry.setMessageAttributes(messageAttributes);
       }
     } else if (request instanceof ReceiveMessageRequest) {
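
The batch path above follows a simple per-entry rule: copy the attributes into a mutable map, then inject only when the caller has not already supplied _datadog. The sketch below shows that rule with hypothetical names (BatchInjectSketch, injectBatch) and a Consumer standing in for defaultPropagator().inject(context, attributes, SETTER); string-valued maps replace the SDK types so it runs standalone.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical model of the SendMessageBatch branch; not the interceptor itself.
final class BatchInjectSketch {
  static final String DATADOG_KEY = "_datadog";

  static List<Map<String, String>> injectBatch(
      List<Map<String, String>> entries, Consumer<Map<String, String>> injector) {
    List<Map<String, String>> result = new ArrayList<>();
    for (Map<String, String> attributes : entries) {
      // Copy first: the incoming map may be immutable.
      Map<String, String> copy = new HashMap<>(attributes);
      // Leave caller-provided propagation context untouched.
      if (!copy.containsKey(DATADOG_KEY)) {
        injector.accept(copy);
      }
      result.add(copy);
    }
    return result;
  }
}
```

Because the same injector (and, in the real interceptor, the same context created before the loop) is reused for every entry, all injected entries of a batch carry the same parent span.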

dd-java-agent/instrumentation/aws-java/aws-java-sqs-1.0/src/test/groovy/SqsClientTest.groovy

Lines changed: 94 additions & 0 deletions
@@ -265,6 +265,100 @@ abstract class SqsClientTest extends VersionedNamingTestBase {
     client.shutdown()
   }

+  def "APM trace context is injected into _datadog message attribute on send"() {
+    setup:
+    def client = AmazonSQSClientBuilder.standard()
+      .withEndpointConfiguration(endpoint)
+      .withCredentials(credentialsProvider)
+      .build()
+    def queueUrl = client.createQueue('somequeue').queueUrl
+    TEST_WRITER.clear()
+
+    when:
+    TraceUtils.runUnderTrace('parent', {
+      client.sendMessage(queueUrl, 'sometext')
+    })
+    def messages = client.receiveMessage(queueUrl).messages
+    messages.forEach {/* consume to create message spans */ }
+
+    if (isDataStreamsEnabled()) {
+      TEST_DATA_STREAMS_WRITER.waitForGroups(2)
+    }
+
+    then:
+    def sendSpan
+    assertTraces(2) {
+      trace(2) {
+        basicSpan(it, 'parent')
+        span {
+          serviceName expectedService("SQS", "SendMessage")
+          operationName expectedOperation("SQS", "SendMessage")
+          resourceName "SQS.SendMessage"
+          spanType DDSpanTypes.HTTP_CLIENT
+          errored false
+          measured true
+          childOf(span(0))
+          tags {
+            "$Tags.COMPONENT" "java-aws-sdk"
+            "$Tags.SPAN_KIND" Tags.SPAN_KIND_CLIENT
+            "$Tags.HTTP_URL" "http://localhost:${address.port}/"
+            "$Tags.HTTP_METHOD" "POST"
+            "$Tags.HTTP_STATUS" 200
+            "$Tags.PEER_PORT" address.port
+            "$Tags.PEER_HOSTNAME" "localhost"
+            "aws.service" "AmazonSQS"
+            "aws_service" "sqs"
+            "aws.endpoint" "http://localhost:${address.port}"
+            "aws.operation" "SendMessageRequest"
+            "aws.agent" "java-aws-sdk"
+            "aws.queue.url" "http://localhost:${address.port}/000000000000/somequeue"
+            if ({ isDataStreamsEnabled() }) {
+              "$DDTags.PATHWAY_HASH" { String }
+            }
+            serviceNameSource("java-aws-sdk")
+            defaultTags()
+          }
+        }
+        sendSpan = span(1)
+      }
+      trace(1) {
+        span {
+          serviceName expectedService("SQS", "ReceiveMessage")
+          operationName expectedOperation("SQS", "ReceiveMessage")
+          resourceName "SQS.ReceiveMessage"
+          spanType DDSpanTypes.MESSAGE_CONSUMER
+          errored false
+          measured true
+          childOf(sendSpan)
+          tags {
+            "$Tags.COMPONENT" "java-aws-sdk"
+            "$Tags.SPAN_KIND" Tags.SPAN_KIND_CONSUMER
+            "aws.service" "AmazonSQS"
+            "aws_service" "sqs"
+            "aws.operation" "ReceiveMessageRequest"
+            "aws.agent" "java-aws-sdk"
+            "aws.queue.url" "http://localhost:${address.port}/000000000000/somequeue"
+            if ({ isDataStreamsEnabled() }) {
+              "$DDTags.PATHWAY_HASH" { String }
+            }
+            defaultTags(true)
+          }
+        }
+      }
+    }
+
+    def ddAttr = messages[0].messageAttributes['_datadog']
+    ddAttr != null
+    ddAttr.dataType == 'String'
+    ddAttr.stringValue.contains('"x-datadog-trace-id"')
+    ddAttr.stringValue.contains(sendSpan.traceId.toString())
+    ddAttr.stringValue.contains('"x-datadog-parent-id"')
+    ddAttr.stringValue.contains(Long.toUnsignedString(sendSpan.spanId))
+
+    cleanup:
+    client.shutdown()
+  }
+
   @IgnoreIf({ instance.isDataStreamsEnabled() })
   def "trace details propagated via embedded SQS message attribute (string)"() {
     setup:

dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/MessageAttributeInjector.java

Lines changed: 28 additions & 6 deletions
@@ -16,14 +16,36 @@ public class MessageAttributeInjector implements CarrierSetter<Map<String, Messa
   @Override
   public void set(
       final Map<String, MessageAttributeValue> carrier, final String key, final String value) {
-    if (carrier.size() < 10
-        && !carrier.containsKey(DATADOG_KEY)
-        && Config.get().isSqsInjectDatadogAttributeEnabled()) {
-
-      String jsonPathway = String.format("{\"%s\": \"%s\"}", key, value);
+    if (!Config.get().isSqsInjectDatadogAttributeEnabled()) {
+      return;
+    }
+    // A single propagator.inject() call invokes set() once per header key (e.g.
+    // x-datadog-trace-id, x-datadog-parent-id, dd-pathway-ctx-base64). All of them must be
+    // accumulated into the same _datadog JSON attribute rather than overwriting each other.
+    if (!carrier.containsKey(DATADOG_KEY)) {
+      if (carrier.size() >= 10) {
+        return;
+      }
       carrier.put(
           DATADOG_KEY,
-          MessageAttributeValue.builder().dataType("String").stringValue(jsonPathway).build());
+          MessageAttributeValue.builder()
+              .dataType("String")
+              .stringValue(String.format("{\"%s\": \"%s\"}", key, value))
+              .build());
+    } else {
+      // _datadog was created by an earlier set() call in this same inject session; append to it.
+      String existing = carrier.get(DATADOG_KEY).stringValue();
+      if (existing == null) {
+        return;
+      }
+      int closingBrace = existing.lastIndexOf('}');
+      if (closingBrace >= 0) {
+        String updated =
+            existing.substring(0, closingBrace) + String.format(", \"%s\": \"%s\"}", key, value);
+        carrier.put(
+            DATADOG_KEY,
+            MessageAttributeValue.builder().dataType("String").stringValue(updated).build());
+      }
     }
   }
 }

dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsClientInstrumentation.java

Lines changed: 2 additions & 1 deletion
@@ -28,7 +28,8 @@ public void methodAdvice(MethodTransformer transformer) {
   public static class AwsSqsBuilderAdvice {
     @Advice.OnMethodExit(suppress = Throwable.class)
     public static void methodExit(@Advice.Return final List<ExecutionInterceptor> interceptors) {
-      if (Config.get().isDataStreamsEnabled()) {
+      if (Config.get().isDataStreamsEnabled()
+          || Config.get().isSqsInjectDatadogAttributeEnabled()) {
         for (ExecutionInterceptor interceptor : interceptors) {
           if (interceptor instanceof SqsInterceptor) {
             return; // list already has our interceptor, return to builder

dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsInterceptor.java

Lines changed: 8 additions & 8 deletions
@@ -1,15 +1,13 @@
 package datadog.trace.instrumentation.aws.v2.sqs;

+import static datadog.context.propagation.Propagators.defaultPropagator;
 import static datadog.trace.api.datastreams.DataStreamsTags.Direction.OUTBOUND;
 import static datadog.trace.api.datastreams.DataStreamsTags.create;
 import static datadog.trace.api.datastreams.PathwayContext.DATADOG_KEY;
-import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.DSM_CONCERN;
 import static datadog.trace.bootstrap.instrumentation.api.URIUtils.urlFileName;
 import static datadog.trace.instrumentation.aws.v2.sqs.MessageAttributeInjector.SETTER;

 import datadog.context.Context;
-import datadog.context.propagation.Propagator;
-import datadog.context.propagation.Propagators;
 import datadog.trace.api.Config;
 import datadog.trace.api.datastreams.DataStreamsContext;
 import datadog.trace.api.datastreams.DataStreamsTags;
@@ -47,11 +45,12 @@ public SdkRequest modifyRequest(ModifyRequest context, ExecutionAttributes execu
         return request;
       }

-      Propagator dsmPropagator = Propagators.forConcern(DSM_CONCERN);
-      Context ctx = getContext(executionAttributes, optionalQueueUrl.get());
       Map<String, MessageAttributeValue> messageAttributes =
           new HashMap<>(request.messageAttributes());
-      dsmPropagator.inject(ctx, messageAttributes, SETTER);
+      if (!messageAttributes.containsKey(DATADOG_KEY)) {
+        Context ctx = getContext(executionAttributes, optionalQueueUrl.get());
+        defaultPropagator().inject(ctx, messageAttributes, SETTER);
+      }

       return request.toBuilder().messageAttributes(messageAttributes).build();

@@ -62,14 +61,15 @@ public SdkRequest modifyRequest(ModifyRequest context, ExecutionAttributes execu
         return request;
       }

-      Propagator dsmPropagator = Propagators.forConcern(DSM_CONCERN);
       Context ctx = getContext(executionAttributes, optionalQueueUrl.get());
       List<SendMessageBatchRequestEntry> entries = new ArrayList<>();

       for (SendMessageBatchRequestEntry entry : request.entries()) {
         Map<String, MessageAttributeValue> messageAttributes =
             new HashMap<>(entry.messageAttributes());
-        dsmPropagator.inject(ctx, messageAttributes, SETTER);
+        if (!messageAttributes.containsKey(DATADOG_KEY)) {
+          defaultPropagator().inject(ctx, messageAttributes, SETTER);
+        }
         entries.add(entry.toBuilder().messageAttributes(messageAttributes).build());
       }
