Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ out/
# Visual Studio Code #
######################
.vscode
.cursor

# Others #
##########
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,7 @@
2 org.springframework.lang.*
2 org.springframework.messaging.*
0 org.springframework.messaging.handler.invocation.InvocableHandlerMethod
0 org.springframework.messaging.support.GenericMessage
2 org.springframework.objenesis.*
2 org.springframework.orm.*
2 org.springframework.remoting.*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package datadog.trace.instrumentation.aws.v2.sqs;

import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;

import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.bootstrap.ContextStore;
import datadog.trace.bootstrap.InstrumentationContext;
import java.util.Map;
import net.bytebuddy.asm.Advice;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;

/**
* Instrumentation for SqsAsyncClient receiveMessage calls to track when Spring-managed clients are
* making receive operations and mark the responses accordingly.
*/
@AutoService(InstrumenterModule.class)
public class SqsAsyncClientInstrumentation extends AbstractSqsInstrumentation
implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice {

@Override
public String instrumentedType() {
return "software.amazon.awssdk.services.sqs.DefaultSqsAsyncClient";
}

@Override
public Map<String, String> contextStore() {
Map<String, String> contextStore = new java.util.HashMap<>(2);
contextStore.put("software.amazon.awssdk.services.sqs.SqsAsyncClient", Boolean.class.getName());
// Map queue URL to Spring management status
contextStore.put("java.lang.String", "java.lang.Boolean");

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm, this seems suspicious. Do we really need a context store for string types? Perhaps there is a more specific class for this context store.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it's not possible to find a more specific class for the context store, then perhaps a value class with a more specific type than Boolean can be used for both queueURL and asyncClient instead?

return contextStore;
}

@Override
public void methodAdvice(MethodTransformer transformer) {
// Instrument the receiveMessage method to map queue URLs to Spring management status
transformer.applyAdvice(
isMethod()
.and(named("receiveMessage"))
.and(
takesArgument(
0, named("software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest"))),
getClass().getName() + "$ReceiveMessageAdvice");
}

public static class ReceiveMessageAdvice {
@Advice.OnMethodExit(suppress = Throwable.class)
static void onExit(
@Advice.This SqsAsyncClient client, @Advice.Argument(0) ReceiveMessageRequest req) {

Boolean isSpringClient =
InstrumentationContext.get(SqsAsyncClient.class, Boolean.class).get(client);

if (Boolean.TRUE.equals(isSpringClient)) {
// Map the queue URL to Spring management status
final ContextStore<String, Boolean> queueUrlFlags =
InstrumentationContext.get(String.class, Boolean.class);
queueUrlFlags.put(req.queueUrl(), Boolean.TRUE);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package datadog.trace.instrumentation.aws.v2.sqs;

import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;

import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.api.InstrumenterConfig;
import datadog.trace.bootstrap.ContextStore;
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
import java.util.List;
import java.util.Map;
import net.bytebuddy.asm.Advice;
Expand Down Expand Up @@ -44,8 +45,16 @@ public String[] helperClassNames() {

@Override
public Map<String, String> contextStore() {
return singletonMap(
Map<String, String> contextStore = new java.util.HashMap<>(3);
// Keep original String context for backward compatibility with TracingExecutionInterceptor
contextStore.put(
"software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse", "java.lang.String");
contextStore.put(
"software.amazon.awssdk.services.sqs.model.Message",
"datadog.trace.bootstrap.instrumentation.java.concurrent.State");
// Map queue URL to Spring management status (shared with SqsAsyncClientInstrumentation)
contextStore.put("java.lang.String", "java.lang.Boolean");
return contextStore;
}

@Override
Expand All @@ -63,7 +72,20 @@ public static void onExit(
String queueUrl =
InstrumentationContext.get(ReceiveMessageResponse.class, String.class).get(result);
if (queueUrl != null) {
messages = new TracingList(messages, queueUrl, result.responseMetadata().requestId());
// Check if this queue URL is from a Spring-managed client
Boolean isFromSpringClient =
InstrumentationContext.get(String.class, Boolean.class).get(queueUrl);

ContextStore<Message, State> messageStateStore = null;
if (Boolean.TRUE.equals(isFromSpringClient)) {
// Only continue span if message has been retrieved by spring-messaging.
// Only set messageStateStore for Spring clients
messageStateStore = InstrumentationContext.get(Message.class, State.class);
}

messages =
new TracingList(
messageStateStore, messages, queueUrl, result.responseMetadata().requestId());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@

import datadog.trace.api.Config;
import datadog.trace.api.datastreams.DataStreamsTags;
import datadog.trace.bootstrap.ContextStore;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext;
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
import java.util.Iterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -29,12 +31,18 @@
public class TracingIterator<L extends Iterator<Message>> implements Iterator<Message> {
private static final Logger log = LoggerFactory.getLogger(TracingIterator.class);

private final ContextStore<Message, State> messageStateStore;
protected final L delegate;
private final String queueUrl;
private final String requestId;
private AgentSpanContext batchContext;

public TracingIterator(L delegate, String queueUrl, String requestId) {
public TracingIterator(
ContextStore<Message, State> messageStateStore,
L delegate,
String queueUrl,
String requestId) {
this.messageStateStore = messageStateStore;
this.delegate = delegate;
this.queueUrl = queueUrl;
this.requestId = requestId;
Expand Down Expand Up @@ -99,6 +107,13 @@ protected void startNewMessageSpan(Message message) {
BROKER_DECORATE.beforeFinish(queueSpan);
queueSpan.finish();
}

if (messageStateStore != null) {
// Capture state after data streams checkpoint is set for spring applications
State state = State.FACTORY.create();
state.captureAndSetContinuation(span);
messageStateStore.put(message, state);
}
}
} catch (Exception e) {
log.debug("Problem tracing new SQS message span", e);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,25 @@
package datadog.trace.instrumentation.aws.v2.sqs;

import datadog.trace.bootstrap.ContextStore;
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import software.amazon.awssdk.services.sqs.model.Message;

public class TracingList implements List<Message> {
private final ContextStore<Message, State> messageStateStore;
private final List<Message> delegate;
private final String queueUrl;
private final String requestId;

public TracingList(List<Message> delegate, String queueUrl, String requestId) {
public TracingList(
ContextStore<Message, State> messageStateStore,
List<Message> delegate,
String queueUrl,
String requestId) {
this.messageStateStore = messageStateStore;
this.delegate = delegate;
this.queueUrl = queueUrl;
this.requestId = requestId;
Expand Down Expand Up @@ -125,12 +133,14 @@ public ListIterator<Message> listIterator() {
@Override
public ListIterator<Message> listIterator(int index) {
// every iteration will add spans. Not only the very first one
return new TracingListIterator(delegate.listIterator(index), queueUrl, requestId);
return new TracingListIterator(
messageStateStore, delegate.listIterator(index), queueUrl, requestId);
}

@Override
public List<Message> subList(int fromIndex, int toIndex) {
return new TracingList(delegate.subList(fromIndex, toIndex), queueUrl, requestId);
return new TracingList(
messageStateStore, delegate.subList(fromIndex, toIndex), queueUrl, requestId);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,20 @@

import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.closePrevious;

import datadog.trace.bootstrap.ContextStore;
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
import java.util.ListIterator;
import software.amazon.awssdk.services.sqs.model.Message;

public class TracingListIterator extends TracingIterator<ListIterator<Message>>
implements ListIterator<Message> {

public TracingListIterator(ListIterator<Message> delegate, String queueUrl, String requestId) {
super(delegate, queueUrl, requestId);
public TracingListIterator(
ContextStore<Message, State> messageStateStore,
ListIterator<Message> delegate,
String queueUrl,
String requestId) {
super(messageStateStore, delegate, queueUrl, requestId);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ abstract class SqsClientTest extends VersionedNamingTestBase {
def message = Message.builder().messageAttributes(['_datadog': MessageAttributeValue.builder().dataType('String').stringValue(
"{\"x-datadog-trace-id\": \"4948377316357291421\", \"x-datadog-parent-id\": \"6746998015037429512\", \"x-datadog-sampling-priority\": \"1\"}"
).build()]).build()
def messages = new TracingList([message],
def messages = new TracingList(null, [message],
"http://localhost:${address.port}/000000000000/somequeue",
"00000000-0000-0000-0000-000000000000")

Expand Down Expand Up @@ -241,7 +241,7 @@ abstract class SqsClientTest extends VersionedNamingTestBase {
def message = Message.builder().messageAttributes(['_datadog': MessageAttributeValue.builder().dataType('Binary').binaryValue(SdkBytes.fromByteBuffer(
headerValue
)).build()]).build()
def messages = new TracingList([message],
def messages = new TracingList(null, [message],
"http://localhost:${address.port}/000000000000/somequeue",
"00000000-0000-0000-0000-000000000000")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,19 @@
import static datadog.trace.instrumentation.springmessaging.SpringMessageDecorator.DECORATE;
import static datadog.trace.instrumentation.springmessaging.SpringMessageDecorator.SPRING_INBOUND;
import static datadog.trace.instrumentation.springmessaging.SpringMessageExtractAdapter.GETTER;
import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;

import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext;
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
import java.util.Map;
import net.bytebuddy.asm.Advice;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
Expand Down Expand Up @@ -53,12 +57,33 @@ public String[] helperClassNames() {
};
}

@Override
public Map<String, String> contextStore() {
return singletonMap("org.springframework.messaging.Message", State.class.getName());
}

public static class HandleMessageAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static AgentScope onEnter(
@Advice.This InvocableHandlerMethod thiz, @Advice.Argument(0) Message<?> message) {
AgentSpanContext parentContext;
AgentSpan parent = activeSpan();

// First try to get context from continuation (preferred method)
State state = InstrumentationContext.get(Message.class, State.class).get(message);
if (null != state) {
AgentScope.Continuation continuation = state.getAndResetContinuation();
if (null != continuation) {
try (AgentScope scope = continuation.activate()) {
AgentSpan span = startSpan(SPRING_INBOUND);
DECORATE.afterStart(span);
span.setResourceName(DECORATE.spanNameForMethod(thiz.getMethod()));
return activateSpan(span);
}
}
}

// Fallback to existing context or header extraction
if (null != parent) {
// prefer existing context, assume it was already extracted from this message
parentContext = parent.context();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
muzzle {
pass {
group = 'io.awspring.cloud'
module = 'spring-cloud-aws-sqs'
versions = "[3.0.0,)"
assertInverse = true
}
}

ext {
minJavaVersionForTests = JavaVersion.VERSION_17
}


apply from: "$rootDir/gradle/java.gradle"

addTestSuiteForDir('latestDepTest', 'test')

["compileTestGroovy", "compileLatestDepTestGroovy"].each { name ->
tasks.named(name, GroovyCompile) {
javaLauncher = getJavaLauncherFor(17)
}
}

dependencies {
compileOnly group: 'software.amazon.awssdk', name: 'sqs', version: '2.20.162'
compileOnly group: 'org.springframework', name: 'spring-messaging', version: '5.3.23'

testImplementation project(':dd-java-agent:instrumentation:trace-annotation')
testImplementation project(':dd-java-agent:instrumentation:aws-java:aws-java-sdk-2.2')
testImplementation project(':dd-java-agent:instrumentation:aws-java:aws-java-sqs-2.0')
testImplementation project(':dd-java-agent:instrumentation:spring:spring-messaging-4.0')

testImplementation group: 'org.springframework', name: 'spring-context', version: '6.1.10'
testImplementation group: 'org.springframework', name: 'spring-test', version: '6.1.10'
testImplementation group: 'org.springframework', name: 'spring-core', version: '6.1.10'
testImplementation group: 'io.awspring.cloud', name: 'spring-cloud-aws-sqs', version: '3.1.0'
testImplementation group: 'software.amazon.awssdk', name: 'sqs', version: '2.20.162'
testImplementation group: 'software.amazon.awssdk', name: 'aws-core', version: '2.20.162'
testImplementation group: 'org.testcontainers', name: 'localstack', version: libs.versions.testcontainers.get()
testImplementation 'org.slf4j:slf4j-api:2.0.13'
testImplementation 'ch.qos.logback:logback-classic:1.4.14'
testImplementation 'ch.qos.logback:logback-core:1.4.14'

latestDepTestImplementation group: 'org.springframework', name: 'spring-context', version: '6.+'
latestDepTestImplementation group: 'org.springframework', name: 'spring-test', version: '6.+'
latestDepTestImplementation group: 'org.springframework', name: 'spring-core', version: '6.+'
latestDepTestImplementation group: 'software.amazon.awssdk', name: 'sqs', version: '2.+'
latestDepTestImplementation group: 'software.amazon.awssdk', name: 'aws-core', version: '2.+'
}
Loading