Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,35 @@ public class MessageAttributeInjector implements CarrierSetter<Map<String, Messa
@Override
public void set(
final Map<String, MessageAttributeValue> carrier, final String key, final String value) {
if (carrier.size() < 10
&& !carrier.containsKey(DATADOG_KEY)
&& Config.get().isSqsInjectDatadogAttributeEnabled()) {
String jsonPathway = String.format("{\"%s\": \"%s\"}", key, value);
if (!Config.get().isSqsInjectDatadogAttributeEnabled()) {
return;
}
// A single propagator.inject() call invokes set() once per header key (e.g.
// x-datadog-trace-id, x-datadog-parent-id, dd-pathway-ctx-base64). All of them must be
// accumulated into the same _datadog JSON attribute rather than overwriting each other.
if (!carrier.containsKey(DATADOG_KEY)) {
if (carrier.size() >= 10) {
return;
}
carrier.put(
DATADOG_KEY,
new MessageAttributeValue().withDataType("String").withStringValue(jsonPathway));
new MessageAttributeValue()
.withDataType("String")
.withStringValue(String.format("{\"%s\": \"%s\"}", key, value)));
} else {
// _datadog was created by an earlier set() call in this same inject session; append to it.
String existing = carrier.get(DATADOG_KEY).getStringValue();
if (existing == null) {
return;
}
int closingBrace = existing.lastIndexOf('}');
if (closingBrace >= 0) {
String updated =
existing.substring(0, closingBrace) + String.format(", \"%s\": \"%s\"}", key, value);
carrier.put(
DATADOG_KEY,
new MessageAttributeValue().withDataType("String").withStringValue(updated));
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package datadog.trace.instrumentation.aws.v1.sqs;

import static datadog.trace.api.datastreams.PathwayContext.PROPAGATION_KEY_BASE64;

import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.MessageAttributeValue;
import com.fasterxml.jackson.databind.JsonNode;
Expand All @@ -9,6 +11,7 @@
import datadog.trace.bootstrap.instrumentation.messaging.DatadogAttributeParser;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Map;
import org.slf4j.Logger;
Expand All @@ -23,6 +26,7 @@ public final class MessageExtractAdapter implements AgentPropagation.ContextVisi

@Override
public void forEachKey(Message carrier, AgentPropagation.KeyClassifier classifier) {
boolean shouldExtractContextFromBody = SHOULD_EXTRACT_CONTEXT_FROM_BODY;
Map<String, String> systemAttributes = carrier.getAttributes();
if (systemAttributes.containsKey("AWSTraceHeader")) {
// alias 'AWSTraceHeader' to 'X-Amzn-Trace-Id' because it uses the same format
Expand All @@ -31,12 +35,27 @@ public void forEachKey(Message carrier, AgentPropagation.KeyClassifier classifie
Map<String, MessageAttributeValue> messageAttributes = carrier.getMessageAttributes();
if (messageAttributes.containsKey("_datadog")) {
MessageAttributeValue datadog = messageAttributes.get("_datadog");
boolean hasPathwayContext = false;
if ("String".equals(datadog.getDataType())) {
DatadogAttributeParser.forEachProperty(classifier, datadog.getStringValue());
String value = datadog.getStringValue();
hasPathwayContext = value != null && value.contains(PROPAGATION_KEY_BASE64);
DatadogAttributeParser.forEachProperty(classifier, value);
} else if ("Binary".equals(datadog.getDataType())) {
DatadogAttributeParser.forEachProperty(classifier, datadog.getBinaryValue());
ByteBuffer value = datadog.getBinaryValue();
if (value != null) {
ByteBuffer duplicate = value.duplicate();
hasPathwayContext =
StandardCharsets.UTF_8.decode(duplicate).toString().contains(PROPAGATION_KEY_BASE64);
}
DatadogAttributeParser.forEachProperty(classifier, value);
}
} else if (SHOULD_EXTRACT_CONTEXT_FROM_BODY) {
shouldExtractContextFromBody &= !hasPathwayContext;
}

if (shouldExtractContextFromBody) {
// The top-level SQS _datadog attribute and the SNS-style body payload are separate carriers.
// APM headers may be present in the message attribute while DSM context still only exists in
// the body payload.
try {
this.forEachKeyInBody(carrier.getBody(), classifier);
} catch (Throwable e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ public Map<String, String> contextStore() {
public static class HandlerChainAdvice {
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void addHandler(@Advice.Return final List<RequestHandler2> handlers) {
if (Config.get().isDataStreamsEnabled()) {
if (Config.get().isDataStreamsEnabled()
|| Config.get().isSqsInjectDatadogAttributeEnabled()) {
for (RequestHandler2 interceptor : handlers) {
if (interceptor instanceof SqsInterceptor) {
return; // list already has our interceptor, return to builder
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package datadog.trace.instrumentation.aws.v1.sqs;

import static datadog.context.propagation.Propagators.defaultPropagator;
import static datadog.trace.api.datastreams.DataStreamsTags.Direction.OUTBOUND;
import static datadog.trace.api.datastreams.DataStreamsTags.create;
import static datadog.trace.api.datastreams.PathwayContext.DATADOG_KEY;
import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.DSM_CONCERN;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan;
import static datadog.trace.bootstrap.instrumentation.api.URIUtils.urlFileName;
import static datadog.trace.instrumentation.aws.v1.sqs.MessageAttributeInjector.SETTER;
Expand All @@ -16,8 +16,6 @@
import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry;
import com.amazonaws.services.sqs.model.SendMessageRequest;
import datadog.context.Context;
import datadog.context.propagation.Propagator;
import datadog.context.propagation.Propagators;
import datadog.trace.api.Config;
import datadog.trace.api.datastreams.DataStreamsContext;
import datadog.trace.api.datastreams.DataStreamsTags;
Expand All @@ -44,13 +42,14 @@ public AmazonWebServiceRequest beforeMarshalling(AmazonWebServiceRequest request
String queueUrl = smRequest.getQueueUrl();
if (queueUrl == null) return request;

Propagator dsmPropagator = Propagators.forConcern(DSM_CONCERN);
Context context = newContext(request, queueUrl);
// making a copy of the MessageAttributes before modifying them because they can be stored in
// a kind of ImmutableMap
Map<String, MessageAttributeValue> messageAttributes =
new HashMap<>(smRequest.getMessageAttributes());
dsmPropagator.inject(context, messageAttributes, SETTER);
if (!messageAttributes.containsKey(DATADOG_KEY)) {
Context context = newContext(request, queueUrl);
defaultPropagator().inject(context, messageAttributes, SETTER);
}
// note: modifying message attributes has to be done before marshalling, otherwise the changes
// are not reflected in the actual request (and the MD5 check on send will fail).
smRequest.setMessageAttributes(messageAttributes);
Expand All @@ -60,12 +59,13 @@ public AmazonWebServiceRequest beforeMarshalling(AmazonWebServiceRequest request
String queueUrl = smbRequest.getQueueUrl();
if (queueUrl == null) return request;

Propagator dsmPropagator = Propagators.forConcern(DSM_CONCERN);
Context context = newContext(request, queueUrl);
for (SendMessageBatchRequestEntry entry : smbRequest.getEntries()) {
Map<String, MessageAttributeValue> messageAttributes =
new HashMap<>(entry.getMessageAttributes());
dsmPropagator.inject(context, messageAttributes, SETTER);
if (!messageAttributes.containsKey(DATADOG_KEY)) {
defaultPropagator().inject(context, messageAttributes, SETTER);
}
entry.setMessageAttributes(messageAttributes);
}
} else if (request instanceof ReceiveMessageRequest) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,100 @@ abstract class SqsClientTest extends VersionedNamingTestBase {
client.shutdown()
}

def "APM trace context is injected into _datadog message attribute on send"() {
setup:
def client = AmazonSQSClientBuilder.standard()
.withEndpointConfiguration(endpoint)
.withCredentials(credentialsProvider)
.build()
def queueUrl = client.createQueue('somequeue').queueUrl
TEST_WRITER.clear()

when:
TraceUtils.runUnderTrace('parent', {
client.sendMessage(queueUrl, 'sometext')
})
def messages = client.receiveMessage(queueUrl).messages
messages.forEach {/* consume to create message spans */ }

if (isDataStreamsEnabled()) {
TEST_DATA_STREAMS_WRITER.waitForGroups(2)
}

then:
def sendSpan
assertTraces(2) {
trace(2) {
basicSpan(it, 'parent')
span {
serviceName expectedService("SQS", "SendMessage")
operationName expectedOperation("SQS", "SendMessage")
resourceName "SQS.SendMessage"
spanType DDSpanTypes.HTTP_CLIENT
errored false
measured true
childOf(span(0))
tags {
"$Tags.COMPONENT" "java-aws-sdk"
"$Tags.SPAN_KIND" Tags.SPAN_KIND_CLIENT
"$Tags.HTTP_URL" "http://localhost:${address.port}/"
"$Tags.HTTP_METHOD" "POST"
"$Tags.HTTP_STATUS" 200
"$Tags.PEER_PORT" address.port
"$Tags.PEER_HOSTNAME" "localhost"
"aws.service" "AmazonSQS"
"aws_service" "sqs"
"aws.endpoint" "http://localhost:${address.port}"
"aws.operation" "SendMessageRequest"
"aws.agent" "java-aws-sdk"
"aws.queue.url" "http://localhost:${address.port}/000000000000/somequeue"
if ({ isDataStreamsEnabled() }) {
"$DDTags.PATHWAY_HASH" { String }
}
serviceNameSource("java-aws-sdk")
defaultTags()
}
}
sendSpan = span(1)
}
trace(1) {
span {
serviceName expectedService("SQS", "ReceiveMessage")
operationName expectedOperation("SQS", "ReceiveMessage")
resourceName "SQS.ReceiveMessage"
spanType DDSpanTypes.MESSAGE_CONSUMER
errored false
measured true
childOf(sendSpan)
tags {
"$Tags.COMPONENT" "java-aws-sdk"
"$Tags.SPAN_KIND" Tags.SPAN_KIND_CONSUMER
"aws.service" "AmazonSQS"
"aws_service" "sqs"
"aws.operation" "ReceiveMessageRequest"
"aws.agent" "java-aws-sdk"
"aws.queue.url" "http://localhost:${address.port}/000000000000/somequeue"
if ({ isDataStreamsEnabled() }) {
"$DDTags.PATHWAY_HASH" { String }
}
defaultTags(true)
}
}
}
}

def ddAttr = messages[0].messageAttributes['_datadog']
ddAttr != null
ddAttr.dataType == 'String'
ddAttr.stringValue.contains('"x-datadog-trace-id"')
ddAttr.stringValue.contains(sendSpan.traceId.toString())
ddAttr.stringValue.contains('"x-datadog-parent-id"')
ddAttr.stringValue.contains(Long.toUnsignedString(sendSpan.spanId))

cleanup:
client.shutdown()
}

@IgnoreIf({ instance.isDataStreamsEnabled() })
def "trace details propagated via embedded SQS message attribute (string)"() {
setup:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,36 @@ public class MessageAttributeInjector implements CarrierSetter<Map<String, Messa
@Override
public void set(
final Map<String, MessageAttributeValue> carrier, final String key, final String value) {
if (carrier.size() < 10
&& !carrier.containsKey(DATADOG_KEY)
&& Config.get().isSqsInjectDatadogAttributeEnabled()) {

String jsonPathway = String.format("{\"%s\": \"%s\"}", key, value);
if (!Config.get().isSqsInjectDatadogAttributeEnabled()) {
return;
}
// A single propagator.inject() call invokes set() once per header key (e.g.
// x-datadog-trace-id, x-datadog-parent-id, dd-pathway-ctx-base64). All of them must be
// accumulated into the same _datadog JSON attribute rather than overwriting each other.
if (!carrier.containsKey(DATADOG_KEY)) {
if (carrier.size() >= 10) {
return;
}
carrier.put(
DATADOG_KEY,
MessageAttributeValue.builder().dataType("String").stringValue(jsonPathway).build());
MessageAttributeValue.builder()
.dataType("String")
.stringValue(String.format("{\"%s\": \"%s\"}", key, value))
.build());
} else {
// _datadog was created by an earlier set() call in this same inject session; append to it.
String existing = carrier.get(DATADOG_KEY).stringValue();
if (existing == null) {
return;
}
int closingBrace = existing.lastIndexOf('}');
if (closingBrace >= 0) {
String updated =
existing.substring(0, closingBrace) + String.format(", \"%s\": \"%s\"}", key, value);
carrier.put(
DATADOG_KEY,
MessageAttributeValue.builder().dataType("String").stringValue(updated).build());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ public void methodAdvice(MethodTransformer transformer) {
public static class AwsSqsBuilderAdvice {
@Advice.OnMethodExit(suppress = Throwable.class)
public static void methodExit(@Advice.Return final List<ExecutionInterceptor> interceptors) {
if (Config.get().isDataStreamsEnabled()) {
if (Config.get().isDataStreamsEnabled()
|| Config.get().isSqsInjectDatadogAttributeEnabled()) {
for (ExecutionInterceptor interceptor : interceptors) {
if (interceptor instanceof SqsInterceptor) {
return; // list already has our interceptor, return to builder
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
package datadog.trace.instrumentation.aws.v2.sqs;

import static datadog.context.propagation.Propagators.defaultPropagator;
import static datadog.trace.api.datastreams.DataStreamsTags.Direction.OUTBOUND;
import static datadog.trace.api.datastreams.DataStreamsTags.create;
import static datadog.trace.api.datastreams.PathwayContext.DATADOG_KEY;
import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.DSM_CONCERN;
import static datadog.trace.bootstrap.instrumentation.api.URIUtils.urlFileName;
import static datadog.trace.instrumentation.aws.v2.sqs.MessageAttributeInjector.SETTER;

import datadog.context.Context;
import datadog.context.propagation.Propagator;
import datadog.context.propagation.Propagators;
import datadog.trace.api.Config;
import datadog.trace.api.datastreams.DataStreamsContext;
import datadog.trace.api.datastreams.DataStreamsTags;
Expand Down Expand Up @@ -47,11 +45,12 @@ public SdkRequest modifyRequest(ModifyRequest context, ExecutionAttributes execu
return request;
}

Propagator dsmPropagator = Propagators.forConcern(DSM_CONCERN);
Context ctx = getContext(executionAttributes, optionalQueueUrl.get());
Map<String, MessageAttributeValue> messageAttributes =
new HashMap<>(request.messageAttributes());
dsmPropagator.inject(ctx, messageAttributes, SETTER);
if (!messageAttributes.containsKey(DATADOG_KEY)) {
Context ctx = getContext(executionAttributes, optionalQueueUrl.get());
defaultPropagator().inject(ctx, messageAttributes, SETTER);
}

return request.toBuilder().messageAttributes(messageAttributes).build();

Expand All @@ -62,14 +61,15 @@ public SdkRequest modifyRequest(ModifyRequest context, ExecutionAttributes execu
return request;
}

Propagator dsmPropagator = Propagators.forConcern(DSM_CONCERN);
Context ctx = getContext(executionAttributes, optionalQueueUrl.get());
List<SendMessageBatchRequestEntry> entries = new ArrayList<>();

for (SendMessageBatchRequestEntry entry : request.entries()) {
Map<String, MessageAttributeValue> messageAttributes =
new HashMap<>(entry.messageAttributes());
dsmPropagator.inject(ctx, messageAttributes, SETTER);
if (!messageAttributes.containsKey(DATADOG_KEY)) {
defaultPropagator().inject(ctx, messageAttributes, SETTER);
}
entries.add(entry.toBuilder().messageAttributes(messageAttributes).build());
}

Expand Down
Loading
Loading