diff --git a/.gitignore b/.gitignore index c0dcdb32d1b..0a31bde39f8 100644 --- a/.gitignore +++ b/.gitignore @@ -45,6 +45,7 @@ out/ # Visual Studio Code # ###################### .vscode +.cursor # Others # ########## diff --git a/dd-java-agent/agent-tooling/src/main/resources/datadog/trace/agent/tooling/bytebuddy/matcher/ignored_class_name.trie b/dd-java-agent/agent-tooling/src/main/resources/datadog/trace/agent/tooling/bytebuddy/matcher/ignored_class_name.trie index e479329c814..3bba510b412 100644 --- a/dd-java-agent/agent-tooling/src/main/resources/datadog/trace/agent/tooling/bytebuddy/matcher/ignored_class_name.trie +++ b/dd-java-agent/agent-tooling/src/main/resources/datadog/trace/agent/tooling/bytebuddy/matcher/ignored_class_name.trie @@ -334,6 +334,7 @@ 2 org.springframework.lang.* 2 org.springframework.messaging.* 0 org.springframework.messaging.handler.invocation.InvocableHandlerMethod +0 org.springframework.messaging.support.GenericMessage 2 org.springframework.objenesis.* 2 org.springframework.orm.* 2 org.springframework.remoting.* diff --git a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsAsyncClientInstrumentation.java b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsAsyncClientInstrumentation.java new file mode 100644 index 00000000000..8f8fa80851d --- /dev/null +++ b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsAsyncClientInstrumentation.java @@ -0,0 +1,67 @@ +package datadog.trace.instrumentation.aws.v2.sqs; + +import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; + +import com.google.auto.service.AutoService; +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.agent.tooling.InstrumenterModule; +import datadog.trace.bootstrap.ContextStore; +import datadog.trace.bootstrap.InstrumentationContext; +import java.util.Map; +import net.bytebuddy.asm.Advice; +import software.amazon.awssdk.services.sqs.SqsAsyncClient; +import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest; + +/** + * Instrumentation for SqsAsyncClient receiveMessage calls to track when Spring-managed clients are + * making receive operations and mark the responses accordingly. + */ +@AutoService(InstrumenterModule.class) +public class SqsAsyncClientInstrumentation extends AbstractSqsInstrumentation + implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice { + + @Override + public String instrumentedType() { + return "software.amazon.awssdk.services.sqs.DefaultSqsAsyncClient"; + } + + @Override + public Map contextStore() { + Map contextStore = new java.util.HashMap<>(2); + contextStore.put("software.amazon.awssdk.services.sqs.SqsAsyncClient", Boolean.class.getName()); + // Map queue URL to Spring management status + contextStore.put("java.lang.String", "java.lang.Boolean"); + return contextStore; + } + + @Override + public void methodAdvice(MethodTransformer transformer) { + // Instrument the receiveMessage method to map queue URLs to Spring management status + transformer.applyAdvice( + isMethod() + .and(named("receiveMessage")) + .and( + takesArgument( + 0, named("software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest"))), + getClass().getName() + "$ReceiveMessageAdvice"); + } + + public static class ReceiveMessageAdvice { + @Advice.OnMethodExit(suppress = Throwable.class) + static void onExit( + @Advice.This SqsAsyncClient client, @Advice.Argument(0) ReceiveMessageRequest req) { + + Boolean isSpringClient = + InstrumentationContext.get(SqsAsyncClient.class, Boolean.class).get(client); + + if (Boolean.TRUE.equals(isSpringClient)) { + // Map the queue URL to Spring management status + final ContextStore queueUrlFlags = + InstrumentationContext.get(String.class, Boolean.class); + queueUrlFlags.put(req.queueUrl(), Boolean.TRUE); + } + } + } +} diff --git a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsReceiveResultInstrumentation.java b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsReceiveResultInstrumentation.java index e0e00d33e7b..ea6c3eb87e9 100644 --- a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsReceiveResultInstrumentation.java +++ b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsReceiveResultInstrumentation.java @@ -1,14 +1,15 @@ package datadog.trace.instrumentation.aws.v2.sqs; import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; -import static java.util.Collections.singletonMap; import static net.bytebuddy.matcher.ElementMatchers.isMethod; import com.google.auto.service.AutoService; import datadog.trace.agent.tooling.Instrumenter; import datadog.trace.agent.tooling.InstrumenterModule; import datadog.trace.api.InstrumenterConfig; +import datadog.trace.bootstrap.ContextStore; import datadog.trace.bootstrap.InstrumentationContext; +import datadog.trace.bootstrap.instrumentation.java.concurrent.State; import java.util.List; import java.util.Map; import net.bytebuddy.asm.Advice; @@ -44,8 +45,16 @@ public String[] helperClassNames() { @Override public Map contextStore() { - return singletonMap( + Map contextStore = new java.util.HashMap<>(3); + // Keep original String context for backward compatibility with TracingExecutionInterceptor + contextStore.put( "software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse", "java.lang.String"); + contextStore.put( + "software.amazon.awssdk.services.sqs.model.Message", + "datadog.trace.bootstrap.instrumentation.java.concurrent.State"); + // Map queue URL to Spring management status (shared with SqsAsyncClientInstrumentation) + contextStore.put("java.lang.String", "java.lang.Boolean"); + return contextStore; } @Override @@ -63,7 +72,20 @@ public static void onExit( String queueUrl = InstrumentationContext.get(ReceiveMessageResponse.class, String.class).get(result); if (queueUrl != null) { - messages = new TracingList(messages, queueUrl, result.responseMetadata().requestId()); + // Check if this queue URL is from a Spring-managed client + Boolean isFromSpringClient = + InstrumentationContext.get(String.class, Boolean.class).get(queueUrl); + + ContextStore messageStateStore = null; + if (Boolean.TRUE.equals(isFromSpringClient)) { + // Only continue span if message has been retrieved by spring-messaging. + // Only set messageStateStore for Spring clients + messageStateStore = InstrumentationContext.get(Message.class, State.class); + } + + messages = + new TracingList( + messageStateStore, messages, queueUrl, result.responseMetadata().requestId()); } } } diff --git a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/TracingIterator.java b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/TracingIterator.java index 3991bfc63b1..854b421fba4 100644 --- a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/TracingIterator.java +++ b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/TracingIterator.java @@ -18,9 +18,11 @@ import datadog.trace.api.Config; import datadog.trace.api.datastreams.DataStreamsTags; +import datadog.trace.bootstrap.ContextStore; import datadog.trace.bootstrap.instrumentation.api.AgentSpan; import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext; import datadog.trace.bootstrap.instrumentation.api.AgentTracer; +import datadog.trace.bootstrap.instrumentation.java.concurrent.State; import java.util.Iterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,12 +31,18 @@ public class TracingIterator> implements Iterator { private static final Logger log = LoggerFactory.getLogger(TracingIterator.class); + private final ContextStore messageStateStore; protected final L delegate; private final String queueUrl; private final String requestId; private AgentSpanContext batchContext; - public TracingIterator(L delegate, String queueUrl, String requestId) { + public TracingIterator( + ContextStore messageStateStore, + L delegate, + String queueUrl, + String requestId) { + this.messageStateStore = messageStateStore; this.delegate = delegate; this.queueUrl = queueUrl; this.requestId = requestId; @@ -99,6 +107,13 @@ protected void startNewMessageSpan(Message message) { BROKER_DECORATE.beforeFinish(queueSpan); queueSpan.finish(); } + + if (messageStateStore != null) { + // Capture state after data streams checkpoint is set for spring applications + State state = State.FACTORY.create(); + state.captureAndSetContinuation(span); + messageStateStore.put(message, state); + } } } catch (Exception e) { log.debug("Problem tracing new SQS message span", e); diff --git a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/TracingList.java b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/TracingList.java index abb0e637f27..340a0bd12dd 100644 --- a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/TracingList.java +++ b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/TracingList.java @@ -1,5 +1,7 @@ package datadog.trace.instrumentation.aws.v2.sqs; +import datadog.trace.bootstrap.ContextStore; +import datadog.trace.bootstrap.instrumentation.java.concurrent.State; import java.util.Collection; import java.util.Iterator; import java.util.List; @@ -7,11 +9,17 @@ import software.amazon.awssdk.services.sqs.model.Message; public class TracingList implements List { + private final ContextStore messageStateStore; private final List delegate; private final String queueUrl; private final String requestId; - public TracingList(List delegate, String queueUrl, String requestId) { + public TracingList( + ContextStore messageStateStore, + List delegate, + String queueUrl, + String requestId) { + this.messageStateStore = messageStateStore; this.delegate = delegate; this.queueUrl = queueUrl; this.requestId = requestId; @@ -125,12 +133,14 @@ public ListIterator listIterator() { @Override public ListIterator listIterator(int index) { // every iteration will add spans. Not only the very first one - return new TracingListIterator(delegate.listIterator(index), queueUrl, requestId); + return new TracingListIterator( + messageStateStore, delegate.listIterator(index), queueUrl, requestId); } @Override public List subList(int fromIndex, int toIndex) { - return new TracingList(delegate.subList(fromIndex, toIndex), queueUrl, requestId); + return new TracingList( + messageStateStore, delegate.subList(fromIndex, toIndex), queueUrl, requestId); } @Override diff --git a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/TracingListIterator.java b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/TracingListIterator.java index 184d05815fc..5c0fac639e6 100644 --- a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/TracingListIterator.java +++ b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/TracingListIterator.java @@ -2,14 +2,20 @@ import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.closePrevious; +import datadog.trace.bootstrap.ContextStore; +import datadog.trace.bootstrap.instrumentation.java.concurrent.State; import java.util.ListIterator; import software.amazon.awssdk.services.sqs.model.Message; public class TracingListIterator extends TracingIterator> implements ListIterator { - public TracingListIterator(ListIterator delegate, String queueUrl, String requestId) { - super(delegate, queueUrl, requestId); + public TracingListIterator( + ContextStore messageStateStore, + ListIterator delegate, + String queueUrl, + String requestId) { + super(messageStateStore, delegate, queueUrl, requestId); } @Override diff --git a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/test/groovy/SqsClientTest.groovy b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/test/groovy/SqsClientTest.groovy index fe196f12a6f..aa24de058f9 100644 --- a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/test/groovy/SqsClientTest.groovy +++ b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/test/groovy/SqsClientTest.groovy @@ -198,7 +198,7 @@ abstract class SqsClientTest extends VersionedNamingTestBase { def message = Message.builder().messageAttributes(['_datadog': MessageAttributeValue.builder().dataType('String').stringValue( "{\"x-datadog-trace-id\": \"4948377316357291421\", \"x-datadog-parent-id\": \"6746998015037429512\", \"x-datadog-sampling-priority\": \"1\"}" ).build()]).build() - def messages = new TracingList([message], + def messages = new TracingList(null, [message], "http://localhost:${address.port}/000000000000/somequeue", "00000000-0000-0000-0000-000000000000") @@ -241,7 +241,7 @@ abstract class SqsClientTest extends VersionedNamingTestBase { def message = Message.builder().messageAttributes(['_datadog': MessageAttributeValue.builder().dataType('Binary').binaryValue(SdkBytes.fromByteBuffer( headerValue )).build()]).build() - def messages = new TracingList([message], + def messages = new TracingList(null, [message], "http://localhost:${address.port}/000000000000/somequeue", "00000000-0000-0000-0000-000000000000") diff --git a/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/SpringMessageHandlerInstrumentation.java b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/SpringMessageHandlerInstrumentation.java index edc4bcfe589..14fcfff86ed 100644 --- a/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/SpringMessageHandlerInstrumentation.java +++ b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/SpringMessageHandlerInstrumentation.java @@ -8,15 +8,19 @@ import static datadog.trace.instrumentation.springmessaging.SpringMessageDecorator.DECORATE; import static datadog.trace.instrumentation.springmessaging.SpringMessageDecorator.SPRING_INBOUND; import static datadog.trace.instrumentation.springmessaging.SpringMessageExtractAdapter.GETTER; +import static java.util.Collections.singletonMap; import static net.bytebuddy.matcher.ElementMatchers.isMethod; import static net.bytebuddy.matcher.ElementMatchers.takesArgument; import com.google.auto.service.AutoService; import datadog.trace.agent.tooling.Instrumenter; import datadog.trace.agent.tooling.InstrumenterModule; +import datadog.trace.bootstrap.InstrumentationContext; import datadog.trace.bootstrap.instrumentation.api.AgentScope; import datadog.trace.bootstrap.instrumentation.api.AgentSpan; import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext; +import datadog.trace.bootstrap.instrumentation.java.concurrent.State; +import java.util.Map; import net.bytebuddy.asm.Advice; import org.springframework.messaging.Message; import org.springframework.messaging.handler.invocation.InvocableHandlerMethod; @@ -53,12 +57,33 @@ public String[] helperClassNames() { }; } + @Override + public Map contextStore() { + return singletonMap("org.springframework.messaging.Message", State.class.getName()); + } + public static class HandleMessageAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) public static AgentScope onEnter( @Advice.This InvocableHandlerMethod thiz, @Advice.Argument(0) Message message) { AgentSpanContext parentContext; AgentSpan parent = activeSpan(); + + // First try to get context from continuation (preferred method) + State state = InstrumentationContext.get(Message.class, State.class).get(message); + if (null != state) { + AgentScope.Continuation continuation = state.getAndResetContinuation(); + if (null != continuation) { + try (AgentScope scope = continuation.activate()) { + AgentSpan span = startSpan(SPRING_INBOUND); + DECORATE.afterStart(span); + span.setResourceName(DECORATE.spanNameForMethod(thiz.getMethod())); + return activateSpan(span); + } + } + } + + // Fallback to existing context or header extraction if (null != parent) { // prefer existing context, assume it was already extracted from this message parentContext = parent.context(); diff --git a/dd-java-agent/instrumentation/spring/spring-sqs-3.0/build.gradle b/dd-java-agent/instrumentation/spring/spring-sqs-3.0/build.gradle new file mode 100644 index 00000000000..23118bebd72 --- /dev/null +++ b/dd-java-agent/instrumentation/spring/spring-sqs-3.0/build.gradle @@ -0,0 +1,50 @@ +muzzle { + pass { + group = 'io.awspring.cloud' + module = 'spring-cloud-aws-sqs' + versions = "[3.0.0,)" + assertInverse = true + } +} + +ext { + minJavaVersionForTests = JavaVersion.VERSION_17 +} + + +apply from: "$rootDir/gradle/java.gradle" + +addTestSuiteForDir('latestDepTest', 'test') + +["compileTestGroovy", "compileLatestDepTestGroovy"].each { name -> + tasks.named(name, GroovyCompile) { + javaLauncher = getJavaLauncherFor(17) + } +} + +dependencies { + compileOnly group: 'software.amazon.awssdk', name: 'sqs', version: '2.20.162' + compileOnly group: 'org.springframework', name: 'spring-messaging', version: '5.3.23' + + testImplementation project(':dd-java-agent:instrumentation:trace-annotation') + testImplementation project(':dd-java-agent:instrumentation:aws-java:aws-java-sdk-2.2') + testImplementation project(':dd-java-agent:instrumentation:aws-java:aws-java-sqs-2.0') + testImplementation project(':dd-java-agent:instrumentation:spring:spring-messaging-4.0') + + testImplementation group: 'org.springframework', name: 'spring-context', version: '6.1.10' + testImplementation group: 'org.springframework', name: 'spring-test', version: '6.1.10' + testImplementation group: 'org.springframework', name: 'spring-core', version: '6.1.10' + testImplementation group: 'io.awspring.cloud', name: 'spring-cloud-aws-sqs', version: '3.1.0' + testImplementation group: 'software.amazon.awssdk', name: 'sqs', version: '2.20.162' + testImplementation group: 'software.amazon.awssdk', name: 'aws-core', version: '2.20.162' + testImplementation group: 'org.testcontainers', name: 'localstack', version: libs.versions.testcontainers.get() + testImplementation 'org.slf4j:slf4j-api:2.0.13' + testImplementation 'ch.qos.logback:logback-classic:1.4.14' + testImplementation 'ch.qos.logback:logback-core:1.4.14' + + latestDepTestImplementation group: 'org.springframework', name: 'spring-context', version: '6.+' + latestDepTestImplementation group: 'org.springframework', name: 'spring-test', version: '6.+' + latestDepTestImplementation group: 'org.springframework', name: 'spring-core', version: '6.+' + latestDepTestImplementation group: 'software.amazon.awssdk', name: 'sqs', version: '2.+' + latestDepTestImplementation group: 'software.amazon.awssdk', name: 'aws-core', version: '2.+' +} diff --git a/dd-java-agent/instrumentation/spring/spring-sqs-3.0/src/main/java/datadog/trace/instrumentation/springsqs/AbstractMessagingMessageConverterToMessagingInstrumentation.java b/dd-java-agent/instrumentation/spring/spring-sqs-3.0/src/main/java/datadog/trace/instrumentation/springsqs/AbstractMessagingMessageConverterToMessagingInstrumentation.java new file mode 100644 index 00000000000..559956bbdac --- /dev/null +++ b/dd-java-agent/instrumentation/spring/spring-sqs-3.0/src/main/java/datadog/trace/instrumentation/springsqs/AbstractMessagingMessageConverterToMessagingInstrumentation.java @@ -0,0 +1,78 @@ +package datadog.trace.instrumentation.springsqs; + +import static datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers.extendsClass; +import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; + +import com.google.auto.service.AutoService; +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.agent.tooling.InstrumenterModule; +import datadog.trace.bootstrap.ContextStore; +import datadog.trace.bootstrap.InstrumentationContext; +import datadog.trace.bootstrap.instrumentation.java.concurrent.State; +import java.util.HashMap; +import java.util.Map; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.springframework.messaging.Message; + +@AutoService(InstrumenterModule.class) +public class AbstractMessagingMessageConverterToMessagingInstrumentation + extends InstrumenterModule.Tracing + implements Instrumenter.ForTypeHierarchy, Instrumenter.HasMethodAdvice { + + public AbstractMessagingMessageConverterToMessagingInstrumentation() { + super("spring-sqs", "spring-sqs-3.0"); + } + + @Override + public String hierarchyMarkerType() { + return "io.awspring.cloud.sqs.support.converter.AbstractMessagingMessageConverter"; + } + + @Override + public ElementMatcher hierarchyMatcher() { + return extendsClass(named(hierarchyMarkerType())); + } + + @Override + public void methodAdvice(MethodTransformer transformer) { + transformer.applyAdvice( + isMethod().and(named("toMessagingMessage")), + getClass().getName() + "$ToMessagingMessageAdvice"); + } + + @Override + public Map contextStore() { + Map contextStore = new HashMap<>(2); + contextStore.put("software.amazon.awssdk.services.sqs.model.Message", State.class.getName()); + contextStore.put("org.springframework.messaging.Message", State.class.getName()); + return contextStore; + } + + public static class ToMessagingMessageAdvice { + @Advice.OnMethodExit(suppress = Throwable.class) + public static void onExit( + @Advice.Argument(0) Object sqsMessage, @Advice.Return Message springMessage) { + // Transfer state from SQS message to Spring message + if (null != sqsMessage + && null != springMessage + && sqsMessage + .getClass() + .getName() + .equals("software.amazon.awssdk.services.sqs.model.Message")) { + + ContextStore from = + InstrumentationContext.get( + software.amazon.awssdk.services.sqs.model.Message.class, State.class); + State state = from.get((software.amazon.awssdk.services.sqs.model.Message) sqsMessage); + if (null != state) { + from.put((software.amazon.awssdk.services.sqs.model.Message) sqsMessage, null); + ContextStore to = InstrumentationContext.get(Message.class, State.class); + to.put(springMessage, state); + } + } + } + } +} diff --git a/dd-java-agent/instrumentation/spring/spring-sqs-3.0/src/main/java/datadog/trace/instrumentation/springsqs/SqsMessageListenerContainerFactoryInstrumentation.java b/dd-java-agent/instrumentation/spring/spring-sqs-3.0/src/main/java/datadog/trace/instrumentation/springsqs/SqsMessageListenerContainerFactoryInstrumentation.java new file mode 100644 index 00000000000..45894c886e8 --- /dev/null +++ b/dd-java-agent/instrumentation/spring/spring-sqs-3.0/src/main/java/datadog/trace/instrumentation/springsqs/SqsMessageListenerContainerFactoryInstrumentation.java @@ -0,0 +1,58 @@ +package datadog.trace.instrumentation.springsqs; + +import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; +import static java.util.Collections.singletonMap; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; + +import com.google.auto.service.AutoService; +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.agent.tooling.InstrumenterModule; +import datadog.trace.bootstrap.InstrumentationContext; +import java.util.Map; +import net.bytebuddy.asm.Advice; +import software.amazon.awssdk.services.sqs.SqsAsyncClient; + +/** + * Instrumentation for SqsMessageListenerContainerFactory to mark SqsAsyncClient instances as being + * used for Spring SQS message listening. + */ +@AutoService(InstrumenterModule.class) +public class SqsMessageListenerContainerFactoryInstrumentation extends InstrumenterModule.Tracing + implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice { + + public SqsMessageListenerContainerFactoryInstrumentation() { + super("spring-sqs"); + } + + @Override + public String instrumentedType() { + return "io.awspring.cloud.sqs.config.SqsMessageListenerContainerFactory"; + } + + @Override + public Map contextStore() { + return singletonMap("software.amazon.awssdk.services.sqs.SqsAsyncClient", "java.lang.Boolean"); + } + + @Override + public void methodAdvice(MethodTransformer transformer) { + // Instrument the setSqsAsyncClient method to mark the client as being used for Spring + transformer.applyAdvice( + isMethod() + .and(named("setSqsAsyncClient")) + .and(takesArgument(0, named("software.amazon.awssdk.services.sqs.SqsAsyncClient"))), + getClass().getName() + "$SetSqsAsyncClientAdvice"); + } + + public static class SetSqsAsyncClientAdvice { + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void onEnter(@Advice.Argument(0) SqsAsyncClient sqsAsyncClient) { + if (sqsAsyncClient != null) { + // Mark this SqsAsyncClient as being used for Spring SQS + InstrumentationContext.get(SqsAsyncClient.class, Boolean.class) + .put(sqsAsyncClient, Boolean.TRUE); + } + } + } +} diff --git a/dd-java-agent/instrumentation/spring/spring-sqs-3.0/src/test/groovy/SpringSqsTest.groovy b/dd-java-agent/instrumentation/spring/spring-sqs-3.0/src/test/groovy/SpringSqsTest.groovy new file mode 100644 index 00000000000..513fc469269 --- /dev/null +++ b/dd-java-agent/instrumentation/spring/spring-sqs-3.0/src/test/groovy/SpringSqsTest.groovy @@ -0,0 +1,188 @@ +import datadog.trace.agent.test.InstrumentationSpecification +import datadog.trace.agent.test.utils.PortUtils +import io.awspring.cloud.sqs.config.SqsMessageListenerContainerFactory +import org.springframework.context.annotation.AnnotationConfigApplicationContext +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.context.annotation.Import +import org.springframework.stereotype.Component +import org.testcontainers.containers.localstack.LocalStackContainer +import org.testcontainers.utility.DockerImageName +import software.amazon.awssdk.services.sqs.model.CreateQueueRequest +import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest +import software.amazon.awssdk.services.sqs.model.SendMessageRequest +import io.awspring.cloud.sqs.config.SqsBootstrapConfiguration +import io.awspring.cloud.sqs.annotation.SqsListener +import spock.lang.Shared +import software.amazon.awssdk.services.sqs.SqsAsyncClient + +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicReference + +import static datadog.trace.agent.test.utils.TraceUtils.runUnderTrace +import static org.testcontainers.containers.localstack.LocalStackContainer.Service.SQS + +class SpringSqsTest extends InstrumentationSpecification { + + static final String QUEUE_NAME = "test-queue" + + @Shared + LocalStackContainer localstack = new LocalStackContainer(DockerImageName.parse("localstack/localstack:4.2.0")) + .withServices(SQS) + + @Override + def setupSpec() { + localstack.start() + + def endpoint = localstack.getEndpointOverride(SQS).toString() + + PortUtils.waitForPortToOpen( + localstack.getHost(), + localstack.getMappedPort(4566), + 10, + TimeUnit.SECONDS) + + def sqsClient = SqsAsyncClient.builder() + .endpointOverride(new URI(endpoint)) + .region(software.amazon.awssdk.regions.Region.US_EAST_1) + .credentialsProvider(software.amazon.awssdk.auth.credentials.StaticCredentialsProvider.create( + software.amazon.awssdk.auth.credentials.AwsBasicCredentials.create("test", "test"))) + .build() + sqsClient.createQueue(CreateQueueRequest.builder().queueName(QUEUE_NAME).build()).get() + } + + @Override + def cleanupSpec() { + if (null != localstack) { + localstack.close() + } + } + + def "test basic SQS message send and receive"() { + setup: + def endpoint = localstack.getEndpointOverride(SQS).toString() + def sqsClient = SqsAsyncClient.builder() + .endpointOverride(new URI(endpoint)) + .region(software.amazon.awssdk.regions.Region.US_EAST_1) + .credentialsProvider(software.amazon.awssdk.auth.credentials.StaticCredentialsProvider.create( + software.amazon.awssdk.auth.credentials.AwsBasicCredentials.create("test", "test"))) + .build() + def queueUrl = sqsClient + .getQueueUrl(GetQueueUrlRequest.builder().queueName(QUEUE_NAME).build()) + .get() + .queueUrl() + + + when: + + def latch = new CountDownLatch(1) + def received = new AtomicReference() + + TestConfig.endpoint = endpoint + TestConfig.latch = latch + TestConfig.received = received + + // Create a configuration class that properly sets up Spring SQS + def ctx = new AnnotationConfigApplicationContext(TestConfig) + + // various setup actions are traced + TEST_WRITER.waitForTraces(2) + TEST_WRITER.clear() + + runUnderTrace("parent") { + sqsClient + .sendMessage(SendMessageRequest.builder().queueUrl(queueUrl).messageBody("hello").build()) + .get() + } + + then: + latch.await(2, TimeUnit.SECONDS) + received.get() == "hello" + assertTraces(3) { + trace(2, true) { + span(0) { + childOf(span(1)) + operationName "aws.http" + resourceName "Sqs.SendMessage" + spanType "http" + } + span(1) { + operationName "parent" + } + } + trace(2, true) { + span(0) { + operationName "aws.http" + resourceName "Sqs.ReceiveMessage" + spanType "queue" + } + span(1) { + // the main test is here. spring.consume needs to be the child of ReceiveMessage + childOf(span(0)) + operationName "spring.consume" + resourceName "TestListener.onMessage" + spanType "queue" + } + } + trace(1) { + span(0) { + operationName "aws.http" + resourceName "Sqs.DeleteMessageBatch" + spanType "http" + } + } + } + + cleanup: + ctx?.close() + sqsClient?.close() + } + + @Configuration + @Import(SqsBootstrapConfiguration) + static class TestConfig { + static String endpoint + static CountDownLatch latch + static AtomicReference received + + @Bean + SqsAsyncClient sqsAsyncClient() { + return SqsAsyncClient.builder() + .endpointOverride(new URI(endpoint)) + .region(software.amazon.awssdk.regions.Region.US_EAST_1) + .credentialsProvider(software.amazon.awssdk.auth.credentials.StaticCredentialsProvider.create( + software.amazon.awssdk.auth.credentials.AwsBasicCredentials.create("test", "test"))) + .build() + } + + @Bean + SqsMessageListenerContainerFactory defaultSqsListenerContainerFactory() { + def factory = new SqsMessageListenerContainerFactory() + factory.setSqsAsyncClient(sqsAsyncClient()) + return factory + } + + @Bean + TestListener testListener() { + return new TestListener(latch, received) + } + } + + @Component + static class TestListener { + private final CountDownLatch latch + private final AtomicReference received + + TestListener(CountDownLatch latch, AtomicReference received) { + this.latch = latch + this.received = received + } + + @SqsListener(queueNames = QUEUE_NAME) + void onMessage(String payload) { + received.set(payload) + latch.countDown() + } + } +} diff --git a/dd-java-agent/instrumentation/spring/spring-sqs-3.0/src/test/resources/logback-test.xml b/dd-java-agent/instrumentation/spring/spring-sqs-3.0/src/test/resources/logback-test.xml new file mode 100644 index 00000000000..590e3e87316 --- /dev/null +++ b/dd-java-agent/instrumentation/spring/spring-sqs-3.0/src/test/resources/logback-test.xml @@ -0,0 +1,16 @@ + + + + + %d{HH:mm:ss.SSS} %-5level [%thread] %logger{36} - %msg%n + + + + + + + + + + + diff --git a/settings.gradle.kts b/settings.gradle.kts index 3f5b677ed2b..839643c4b70 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -547,6 +547,7 @@ include( ":dd-java-agent:instrumentation:spring:spring-jms-3.1", ":dd-java-agent:instrumentation:spring:spring-messaging-4.0", ":dd-java-agent:instrumentation:spring:spring-rabbit-1.5", + ":dd-java-agent:instrumentation:spring:spring-sqs-3.0", ":dd-java-agent:instrumentation:spring:spring-scheduling-3.1", ":dd-java-agent:instrumentation:spring:spring-security:spring-security-5.0", ":dd-java-agent:instrumentation:spring:spring-security:spring-security-6.0",