From e60984153d7be5fbdb46e40a13e7a0f3fd3c0044 Mon Sep 17 00:00:00 2001 From: Piotr Wolski Date: Mon, 29 Sep 2025 10:16:48 -0600 Subject: [PATCH 1/3] instrument sqs spring context propagation --- .../aws/v1/sqs/TracingIterator.java | 1 + .../aws/v2/sqs/SqsMessageInstrumentation.java | 56 +++++++++ ...ssageSourceToMessagingInstrumentation.java | 110 ++++++++++++++++++ .../SpringMessageHandlerInstrumentation.java | 78 ++++++++++++- ...oSpringMessageTransferInstrumentation.java | 105 +++++++++++++++++ 5 files changed, 347 insertions(+), 3 deletions(-) create mode 100644 dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsMessageInstrumentation.java create mode 100644 dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/AbstractMessageConvertingMessageSourceToMessagingInstrumentation.java create mode 100644 dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/SqsToSpringMessageTransferInstrumentation.java diff --git a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/TracingIterator.java b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/TracingIterator.java index 98aeca1ec84..a6ac3ea538d 100644 --- a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/TracingIterator.java +++ b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/TracingIterator.java @@ -89,6 +89,7 @@ protected void startNewMessageSpan(Message message) { DataStreamsTags tags = create("sqs", INBOUND, urlFileName(queueUrl)); AgentTracer.get().getDataStreamsMonitoring().setCheckpoint(span, create(tags, 0, 0)); + System.out.println("Setting a checkpoint in thread" + Thread.currentThread().getId()); CONSUMER_DECORATE.afterStart(span); CONSUMER_DECORATE.onConsume(span, queueUrl); diff --git a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsMessageInstrumentation.java b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsMessageInstrumentation.java new file mode 100644 index 00000000000..3884445ed9a --- /dev/null +++ b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsMessageInstrumentation.java @@ -0,0 +1,56 @@ +package datadog.trace.instrumentation.aws.v2.sqs; + +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan; +import static java.util.Collections.singletonMap; +import static net.bytebuddy.matcher.ElementMatchers.isConstructor; + +import com.google.auto.service.AutoService; +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.agent.tooling.InstrumenterModule; +import datadog.trace.bootstrap.InstrumentationContext; +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import datadog.trace.bootstrap.instrumentation.java.concurrent.State; +import java.util.Map; +import net.bytebuddy.asm.Advice; +import software.amazon.awssdk.services.sqs.model.Message; + +@AutoService(InstrumenterModule.class) +public class SqsMessageInstrumentation extends InstrumenterModule.Tracing + implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice { + + public SqsMessageInstrumentation() { + super("aws-java-sqs-2.0"); + } + + @Override + public String instrumentedType() { + return "software.amazon.awssdk.services.sqs.model.Message"; + } + + @Override + public void methodAdvice(MethodTransformer transformer) { + transformer.applyAdvice(isConstructor(), getClass().getName() + "$CaptureActiveScope"); + } + + @Override + public Map contextStore() { + return singletonMap("software.amazon.awssdk.services.sqs.model.Message", State.class.getName()); + } + + public static class CaptureActiveScope { + @Advice.OnMethodExit(suppress = Throwable.class) + public static void captureActiveScope(@Advice.This Message message) { + AgentSpan span = activeSpan(); + if (span != null) { + State state = State.FACTORY.create(); + state.captureAndSetContinuation(span); + InstrumentationContext.get(Message.class, State.class).put(message, state); + System.out.println("[SQS] Captured state for SQS message: " + message.messageId() + + " with span: " + span.getSpanId() + " on thread: " + Thread.currentThread().getId()); + } else { + System.out.println("[SQS] No active span found when creating SQS message: " + + message.messageId() + " on thread: " + Thread.currentThread().getId()); + } + } + } +} diff --git a/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/AbstractMessageConvertingMessageSourceToMessagingInstrumentation.java b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/AbstractMessageConvertingMessageSourceToMessagingInstrumentation.java new file mode 100644 index 00000000000..dd7215597db --- /dev/null +++ b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/AbstractMessageConvertingMessageSourceToMessagingInstrumentation.java @@ -0,0 +1,110 @@ +package datadog.trace.instrumentation.springmessaging; + +import static datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers.extendsClass; +import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.isConstructor; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; + +import com.google.auto.service.AutoService; +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.agent.tooling.InstrumenterModule; +import datadog.trace.bootstrap.ContextStore; +import datadog.trace.bootstrap.InstrumentationContext; +import datadog.trace.bootstrap.instrumentation.java.concurrent.State; +import java.util.Map; +import java.util.TreeMap; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.springframework.messaging.Message; + +// @AutoService(InstrumenterModule.class) // Temporarily disabled to test SqsToSpringMessageTransferInstrumentation +public class AbstractMessageConvertingMessageSourceToMessagingInstrumentation extends InstrumenterModule.Tracing + implements Instrumenter.ForTypeHierarchy, Instrumenter.HasMethodAdvice { + + public AbstractMessageConvertingMessageSourceToMessagingInstrumentation() { + super("spring-messaging", "spring-messaging-4"); + } + + @Override + public String hierarchyMarkerType() { + return "io.awspring.cloud.sqs.support.converter.AbstractMessagingMessageConverter"; + } + + @Override + public ElementMatcher hierarchyMatcher() { + return extendsClass(named(hierarchyMarkerType())); + } + + @Override + public void methodAdvice(MethodTransformer transformer) { + // Instrument toMessagingMessage method + transformer.applyAdvice( + named("toMessagingMessage"), + getClass().getName() + "$ToMessagingMessageAdvice"); + + } + + @Override + public Map contextStore() { + Map contextStore = new TreeMap<>(); + // contextStore.put("Object", State.class.getName()); + // contextStore.put("org.springframework.messaging.Message", State.class.getName()); + return contextStore; + } + + public static class ToMessagingMessageAdvice { + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void onEnter(@Advice.Argument(0) Object sqsMessage, @Advice.This Object converter) { + System.out.println("[ToMessaging] toMessagingMessage called with SQS message: " + + sqsMessage + " on thread: " + Thread.currentThread().getId()); + + // Print the actual child class being used + System.out.println("[ToMessaging] Converter class: " + converter.getClass().getName()); + System.out.println("[ToMessaging] Converter class hierarchy:"); + Class currentClass = converter.getClass(); + int level = 0; + while (currentClass != null && level < 3) { + System.out.println("[ToMessaging] Level " + level + ": " + currentClass.getName()); + currentClass = currentClass.getSuperclass(); + level++; + } + + // Print stack trace to see the call flow + System.out.println("[ToMessaging] Stack trace:"); + StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); + for (int i = 0; i < Math.min(stackTrace.length, 15); i++) { + System.out.println("[ToMessaging] at " + stackTrace[i]); + } + } + + @Advice.OnMethodExit(suppress = Throwable.class) + public static void onExit( + @Advice.Argument(0) Object sqsMessage, + @Advice.Return Message springMessage) { + System.out.println("[ToMessaging] toMessagingMessage completed - SQS: " + sqsMessage + + " -> Spring: " + springMessage + " on thread: " + Thread.currentThread().getId()); + + // Transfer state from SQS message to Spring message + // if (null != sqsMessage && null != springMessage && + // sqsMessage.getClass().getName().equals("software.amazon.awssdk.services.sqs.model.Message")) { + // + // ContextStore from = + // InstrumentationContext.get(Object.class, State.class); + // State state = from.get(sqsMessage); + // if (null != state) { + // from.put(sqsMessage, null); + // // InstrumentationContext.get(Message.class, State.class).put(springMessage, state); + // System.out.println("[ToMessaging] Transferred state from SQS message to Spring message on thread: " + + // Thread.currentThread().getId()); + // } else { + // System.out.println("[ToMessaging] No state found in SQS message during conversion on thread: " + + // Thread.currentThread().getId()); + // } + // } else { + // System.out.println("[ToMessaging] Skipping transfer - not an SQS message or null message on thread: " + + // Thread.currentThread().getId()); + // } + } + } +} diff --git a/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/SpringMessageHandlerInstrumentation.java b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/SpringMessageHandlerInstrumentation.java index edc4bcfe589..dce4f7c5fb3 100644 --- a/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/SpringMessageHandlerInstrumentation.java +++ b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/SpringMessageHandlerInstrumentation.java @@ -1,22 +1,28 @@ package datadog.trace.instrumentation.springmessaging; import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; +import static datadog.trace.api.datastreams.DataStreamsContext.create; +import static datadog.trace.api.datastreams.DataStreamsTags.Direction.INBOUND; import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.extractContextAndGetSpanContext; -import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan; -import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan; -import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.*; import static datadog.trace.instrumentation.springmessaging.SpringMessageDecorator.DECORATE; import static datadog.trace.instrumentation.springmessaging.SpringMessageDecorator.SPRING_INBOUND; import static datadog.trace.instrumentation.springmessaging.SpringMessageExtractAdapter.GETTER; +import static java.util.Collections.singletonMap; import static net.bytebuddy.matcher.ElementMatchers.isMethod; import static net.bytebuddy.matcher.ElementMatchers.takesArgument; import com.google.auto.service.AutoService; import datadog.trace.agent.tooling.Instrumenter; import datadog.trace.agent.tooling.InstrumenterModule; +import datadog.trace.api.datastreams.DataStreamsTags; +import datadog.trace.bootstrap.InstrumentationContext; import datadog.trace.bootstrap.instrumentation.api.AgentScope; import datadog.trace.bootstrap.instrumentation.api.AgentSpan; import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext; +import datadog.trace.bootstrap.instrumentation.api.AgentTracer; +import datadog.trace.bootstrap.instrumentation.java.concurrent.State; +import java.util.Map; import net.bytebuddy.asm.Advice; import org.springframework.messaging.Message; import org.springframework.messaging.handler.invocation.InvocableHandlerMethod; @@ -53,22 +59,88 @@ public String[] helperClassNames() { }; } + @Override + public Map contextStore() { + return singletonMap("org.springframework.messaging.Message", State.class.getName()); + } + public static class HandleMessageAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) public static AgentScope onEnter( @Advice.This InvocableHandlerMethod thiz, @Advice.Argument(0) Message message) { AgentSpanContext parentContext; AgentSpan parent = activeSpan(); + + // First try to get context from continuation (preferred method) + State state = InstrumentationContext.get(Message.class, State.class).get(message); + if (null != state) { + System.out.println("[Spring] Found state in Spring message, attempting to activate continuation on thread: " + + Thread.currentThread().getId()); + AgentScope.Continuation continuation = state.getAndResetContinuation(); + if (null != continuation) { + try (AgentScope scope = continuation.activate()) { + AgentSpan span = startSpan(SPRING_INBOUND); + DECORATE.afterStart(span); + span.setResourceName(DECORATE.spanNameForMethod(thiz.getMethod())); + System.out.println("[Spring] Successfully activated continuation from Spring Message with span: " + + span.getSpanId() + " on thread: " + Thread.currentThread().getId()); + return activateSpan(span); + } + } else { + System.out.println("[Spring] No continuation found in state on thread: " + Thread.currentThread().getId()); + } + } else { + System.out.println("[Spring] No state found in Spring message 2, falling back to header extraction on thread: " + + Thread.currentThread().getId()); + } + + // Fallback to existing context or header extraction if (null != parent) { // prefer existing context, assume it was already extracted from this message parentContext = parent.context(); + System.out.println("[Spring] Using existing active span context on thread: " + Thread.currentThread().getId()); } else { // otherwise try to re-extract the message context to avoid disconnected trace parentContext = extractContextAndGetSpanContext(message, GETTER); + System.out.println("[Spring] Extracted context from message headers on thread: " + Thread.currentThread().getId()); } + AgentSpan span = startSpan(SPRING_INBOUND, parentContext); DECORATE.afterStart(span); span.setResourceName(DECORATE.spanNameForMethod(thiz.getMethod())); + + // Extract SQS queue information - try different header patterns + Object queueUrl = message.getHeaders().get("Sqs_QueueUrl"); + Object queueName = message.getHeaders().get("Sqs_QueueName"); + + // If not found in Sqs_ prefixed headers, try aws. prefixed headers + if (queueUrl == null) { + queueUrl = message.getHeaders().get("aws.queue.url"); + } + if (queueName == null) { + queueName = message.getHeaders().get("aws.queue.name"); + } + + // If still not found, try to extract from QueueAttributes + if (queueUrl == null || queueName == null) { + Object queueAttributes = message.getHeaders().get("Sqs_QueueAttributes"); + if (queueAttributes != null) { + String attributesStr = queueAttributes.toString(); + // Extract queue name from attributes if available + if (queueName == null && attributesStr.contains("queueName=")) { + queueName = attributesStr.substring(attributesStr.indexOf("queueName=") + 10).split(",")[0]; + } + } + } + + // Add SQS queue tags to the span + if (queueUrl != null) { + span.setTag("aws.sqs.queue_url", queueUrl.toString()); + } + if (queueName != null) { + span.setTag("aws.sqs.queue_name", queueName.toString()); + } + return activateSpan(span); } diff --git a/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/SqsToSpringMessageTransferInstrumentation.java b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/SqsToSpringMessageTransferInstrumentation.java new file mode 100644 index 00000000000..4e9fbd94034 --- /dev/null +++ b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/SqsToSpringMessageTransferInstrumentation.java @@ -0,0 +1,105 @@ +package datadog.trace.instrumentation.springmessaging; + +import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.isConstructor; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; + +import com.google.auto.service.AutoService; +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.agent.tooling.InstrumenterModule; +import datadog.trace.bootstrap.ContextStore; +import datadog.trace.bootstrap.InstrumentationContext; +import datadog.trace.bootstrap.instrumentation.java.concurrent.State; +import java.util.Map; +import java.util.TreeMap; +import net.bytebuddy.asm.Advice; +import org.springframework.messaging.Message; + +@AutoService(InstrumenterModule.class) +public class SqsToSpringMessageTransferInstrumentation extends InstrumenterModule.Tracing + implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice { + + public SqsToSpringMessageTransferInstrumentation() { + super("spring-messaging", "spring-messaging-4"); + } + + @Override + public String instrumentedType() { + return "io.awspring.cloud.sqs.support.converter.SqsMessagingMessageConverter"; + } + + @Override + public void methodAdvice(MethodTransformer transformer) { + // Try to instrument toMessagingMessage method + transformer.applyAdvice( + named("toMessagingMessage"), + getClass().getName() + "$TransferState"); + + // Instrument constructor to see if class is being instantiated + transformer.applyAdvice( + isConstructor(), + getClass().getName() + "$ConstructorAdvice"); + + // Also try to instrument any method to see what's available + transformer.applyAdvice( + net.bytebuddy.matcher.ElementMatchers.any(), + getClass().getName() + "$AnyMethodAdvice"); + } + + @Override + public Map contextStore() { + Map contextStore = new TreeMap<>(); + // contextStore.put("Object", State.class.getName()); + // contextStore.put("org.springframework.messaging.Message", State.class.getName()); + return contextStore; + } + + public static class TransferState { + @Advice.OnMethodExit(suppress = Throwable.class) + public static void transfer( + @Advice.Argument(0) Object sqsMessage, + @Advice.Return Message springMessage) { + System.out.println("[SQS->Spring] Transferred state from SQS message to Spring message on thread: " + + Thread.currentThread().getId()); + + //if (null != sqsMessage && null != springMessage && + // sqsMessage.getClass().getName().equals("software.amazon.awssdk.services.sqs.model.Message")) { + // ContextStore from = + // InstrumentationContext.get(Object.class, State.class); + // State state = from.get(sqsMessage); + // if (null != state) { + // from.put(sqsMessage, null); + // InstrumentationContext.get(Message.class, State.class).put(springMessage, state); + // System.out.println("[SQS->Spring] Transferred state from SQS message to Spring message on thread: " + + // Thread.currentThread().getId()); + // } else { + // System.out.println("[SQS->Spring] No state found in SQS message during conversion on thread: " + + // Thread.currentThread().getId()); + // } + //} else { + // System.out.println("[SQS->Spring] Skipping transfer - not an SQS message or null message on thread: " + + // Thread.currentThread().getId()); + //} + } + } + + public static class ConstructorAdvice { + @Advice.OnMethodExit(suppress = Throwable.class) + public static void onConstructor(@Advice.This Object converter) { + System.out.println("[SQS->Spring] SqsMessagingMessageConverter constructor called on thread: " + + Thread.currentThread().getId() + " - " + converter.getClass().getName()); + } + } + + public static class AnyMethodAdvice { + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void onEnter(@Advice.Origin String method) { + System.out.println("[SQS->Spring] Method called: " + method + " on thread: " + Thread.currentThread().getId()); + } + + @Advice.OnMethodExit(suppress = Throwable.class) + public static void onExit(@Advice.Origin String method) { + System.out.println("[SQS->Spring] Method completed: " + method + " on thread: " + Thread.currentThread().getId()); + } + } +} From 76ad3bdc462e1fb9a9d2b114c906b31d72eb90d1 Mon Sep 17 00:00:00 2001 From: Piotr Wolski Date: Mon, 29 Sep 2025 13:30:11 -0600 Subject: [PATCH 2/3] working capture of SQS context --- .gitignore | 1 + ...ssageSourceToMessagingInstrumentation.java | 110 ------------------ ...oSpringMessageTransferInstrumentation.java | 105 ----------------- .../spring/spring-sqs/build.gradle | 25 ++++ ...geConverterToMessagingInstrumentation.java | 86 ++++++++++++++ settings.gradle.kts | 1 + 6 files changed, 113 insertions(+), 215 deletions(-) delete mode 100644 dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/AbstractMessageConvertingMessageSourceToMessagingInstrumentation.java delete mode 100644 dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/SqsToSpringMessageTransferInstrumentation.java create mode 100644 dd-java-agent/instrumentation/spring/spring-sqs/build.gradle create mode 100644 dd-java-agent/instrumentation/spring/spring-sqs/src/main/java/datadog/trace/instrumentation/springsqs/AbstractMessagingMessageConverterToMessagingInstrumentation.java diff --git a/.gitignore b/.gitignore index c0dcdb32d1b..0a31bde39f8 100644 --- a/.gitignore +++ b/.gitignore @@ -45,6 +45,7 @@ out/ # Visual Studio Code # ###################### .vscode +.cursor # Others # ########## diff --git a/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/AbstractMessageConvertingMessageSourceToMessagingInstrumentation.java b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/AbstractMessageConvertingMessageSourceToMessagingInstrumentation.java deleted file mode 100644 index dd7215597db..00000000000 --- a/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/AbstractMessageConvertingMessageSourceToMessagingInstrumentation.java +++ /dev/null @@ -1,110 +0,0 @@ -package datadog.trace.instrumentation.springmessaging; - -import static datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers.extendsClass; -import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; -import static net.bytebuddy.matcher.ElementMatchers.isConstructor; -import static net.bytebuddy.matcher.ElementMatchers.takesArgument; - -import com.google.auto.service.AutoService; -import datadog.trace.agent.tooling.Instrumenter; -import datadog.trace.agent.tooling.InstrumenterModule; -import datadog.trace.bootstrap.ContextStore; -import datadog.trace.bootstrap.InstrumentationContext; -import datadog.trace.bootstrap.instrumentation.java.concurrent.State; -import java.util.Map; -import java.util.TreeMap; -import net.bytebuddy.asm.Advice; -import net.bytebuddy.description.type.TypeDescription; -import net.bytebuddy.matcher.ElementMatcher; -import org.springframework.messaging.Message; - -// @AutoService(InstrumenterModule.class) // Temporarily disabled to test SqsToSpringMessageTransferInstrumentation -public class AbstractMessageConvertingMessageSourceToMessagingInstrumentation extends InstrumenterModule.Tracing - implements Instrumenter.ForTypeHierarchy, Instrumenter.HasMethodAdvice { - - public AbstractMessageConvertingMessageSourceToMessagingInstrumentation() { - super("spring-messaging", "spring-messaging-4"); - } - - @Override - public String hierarchyMarkerType() { - return "io.awspring.cloud.sqs.support.converter.AbstractMessagingMessageConverter"; - } - - @Override - public ElementMatcher hierarchyMatcher() { - return extendsClass(named(hierarchyMarkerType())); - } - - @Override - public void methodAdvice(MethodTransformer transformer) { - // Instrument toMessagingMessage method - transformer.applyAdvice( - named("toMessagingMessage"), - getClass().getName() + "$ToMessagingMessageAdvice"); - - } - - @Override - public Map contextStore() { - Map contextStore = new TreeMap<>(); - // contextStore.put("Object", State.class.getName()); - // contextStore.put("org.springframework.messaging.Message", State.class.getName()); - return contextStore; - } - - public static class ToMessagingMessageAdvice { - @Advice.OnMethodEnter(suppress = Throwable.class) - public static void onEnter(@Advice.Argument(0) Object sqsMessage, @Advice.This Object converter) { - System.out.println("[ToMessaging] toMessagingMessage called with SQS message: " + - sqsMessage + " on thread: " + Thread.currentThread().getId()); - - // Print the actual child class being used - System.out.println("[ToMessaging] Converter class: " + converter.getClass().getName()); - System.out.println("[ToMessaging] Converter class hierarchy:"); - Class currentClass = converter.getClass(); - int level = 0; - while (currentClass != null && level < 3) { - System.out.println("[ToMessaging] Level " + level + ": " + currentClass.getName()); - currentClass = currentClass.getSuperclass(); - level++; - } - - // Print stack trace to see the call flow - System.out.println("[ToMessaging] Stack trace:"); - StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); - for (int i = 0; i < Math.min(stackTrace.length, 15); i++) { - System.out.println("[ToMessaging] at " + stackTrace[i]); - } - } - - @Advice.OnMethodExit(suppress = Throwable.class) - public static void onExit( - @Advice.Argument(0) Object sqsMessage, - @Advice.Return Message springMessage) { - System.out.println("[ToMessaging] toMessagingMessage completed - SQS: " + sqsMessage + - " -> Spring: " + springMessage + " on thread: " + Thread.currentThread().getId()); - - // Transfer state from SQS message to Spring message - // if (null != sqsMessage && null != springMessage && - // sqsMessage.getClass().getName().equals("software.amazon.awssdk.services.sqs.model.Message")) { - // - // ContextStore from = - // InstrumentationContext.get(Object.class, State.class); - // State state = from.get(sqsMessage); - // if (null != state) { - // from.put(sqsMessage, null); - // // InstrumentationContext.get(Message.class, State.class).put(springMessage, state); - // System.out.println("[ToMessaging] Transferred state from SQS message to Spring message on thread: " + - // Thread.currentThread().getId()); - // } else { - // System.out.println("[ToMessaging] No state found in SQS message during conversion on thread: " + - // Thread.currentThread().getId()); - // } - // } else { - // System.out.println("[ToMessaging] Skipping transfer - not an SQS message or null message on thread: " + - // Thread.currentThread().getId()); - // } - } - } -} diff --git a/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/SqsToSpringMessageTransferInstrumentation.java b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/SqsToSpringMessageTransferInstrumentation.java deleted file mode 100644 index 4e9fbd94034..00000000000 --- a/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/SqsToSpringMessageTransferInstrumentation.java +++ /dev/null @@ -1,105 +0,0 @@ -package datadog.trace.instrumentation.springmessaging; - -import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; -import static net.bytebuddy.matcher.ElementMatchers.isConstructor; -import static net.bytebuddy.matcher.ElementMatchers.takesArgument; - -import com.google.auto.service.AutoService; -import datadog.trace.agent.tooling.Instrumenter; -import datadog.trace.agent.tooling.InstrumenterModule; -import datadog.trace.bootstrap.ContextStore; -import datadog.trace.bootstrap.InstrumentationContext; -import datadog.trace.bootstrap.instrumentation.java.concurrent.State; -import java.util.Map; -import java.util.TreeMap; -import net.bytebuddy.asm.Advice; -import org.springframework.messaging.Message; - -@AutoService(InstrumenterModule.class) -public class SqsToSpringMessageTransferInstrumentation extends InstrumenterModule.Tracing - implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice { - - public SqsToSpringMessageTransferInstrumentation() { - super("spring-messaging", "spring-messaging-4"); - } - - @Override - public String instrumentedType() { - return "io.awspring.cloud.sqs.support.converter.SqsMessagingMessageConverter"; - } - - @Override - public void methodAdvice(MethodTransformer transformer) { - // Try to instrument toMessagingMessage method - transformer.applyAdvice( - named("toMessagingMessage"), - getClass().getName() + "$TransferState"); - - // Instrument constructor to see if class is being instantiated - transformer.applyAdvice( - isConstructor(), - getClass().getName() + "$ConstructorAdvice"); - - // Also try to instrument any method to see what's available - transformer.applyAdvice( - net.bytebuddy.matcher.ElementMatchers.any(), - getClass().getName() + "$AnyMethodAdvice"); - } - - @Override - public Map contextStore() { - Map contextStore = new TreeMap<>(); - // contextStore.put("Object", State.class.getName()); - // contextStore.put("org.springframework.messaging.Message", State.class.getName()); - return contextStore; - } - - public static class TransferState { - @Advice.OnMethodExit(suppress = Throwable.class) - public static void transfer( - @Advice.Argument(0) Object sqsMessage, - @Advice.Return Message springMessage) { - System.out.println("[SQS->Spring] Transferred state from SQS message to Spring message on thread: " + - Thread.currentThread().getId()); - - //if (null != sqsMessage && null != springMessage && - // sqsMessage.getClass().getName().equals("software.amazon.awssdk.services.sqs.model.Message")) { - // ContextStore from = - // InstrumentationContext.get(Object.class, State.class); - // State state = from.get(sqsMessage); - // if (null != state) { - // from.put(sqsMessage, null); - // InstrumentationContext.get(Message.class, State.class).put(springMessage, state); - // System.out.println("[SQS->Spring] Transferred state from SQS message to Spring message on thread: " + - // Thread.currentThread().getId()); - // } else { - // System.out.println("[SQS->Spring] No state found in SQS message during conversion on thread: " + - // Thread.currentThread().getId()); - // } - //} else { - // System.out.println("[SQS->Spring] Skipping transfer - not an SQS message or null message on thread: " + - // Thread.currentThread().getId()); - //} - } - } - - public static class ConstructorAdvice { - @Advice.OnMethodExit(suppress = Throwable.class) - public static void onConstructor(@Advice.This Object converter) { - System.out.println("[SQS->Spring] SqsMessagingMessageConverter constructor called on thread: " + - Thread.currentThread().getId() + " - " + converter.getClass().getName()); - } - } - - public static class AnyMethodAdvice { - @Advice.OnMethodEnter(suppress = Throwable.class) - public static void onEnter(@Advice.Origin String method) { - System.out.println("[SQS->Spring] Method called: " + method + " on thread: " + Thread.currentThread().getId()); - } - - @Advice.OnMethodExit(suppress = Throwable.class) - public static void onExit(@Advice.Origin String method) { - System.out.println("[SQS->Spring] Method completed: " + method + " on thread: " + Thread.currentThread().getId()); - } - } -} diff --git a/dd-java-agent/instrumentation/spring/spring-sqs/build.gradle b/dd-java-agent/instrumentation/spring/spring-sqs/build.gradle new file mode 100644 index 00000000000..82c44601c81 --- /dev/null +++ b/dd-java-agent/instrumentation/spring/spring-sqs/build.gradle @@ -0,0 +1,25 @@ +muzzle { + pass { + group = 'io.awspring.cloud' + module = 'spring-cloud-aws-sqs' + versions = "[3.0.0,)" + assertInverse = true + } +} + +ext { + minJavaVersionForTests = JavaVersion.VERSION_17 +} + +apply from: "$rootDir/gradle/java.gradle" + +addTestSuiteForDir('latestDepTest', 'test') + +[compileTestGroovy, compileLatestDepTestGroovy].each { + it.javaLauncher = getJavaLauncherFor(17) +} + +dependencies { + compileOnly group: 'software.amazon.awssdk', name: 'sqs', version: '2.20.162' + compileOnly group: 'org.springframework', name: 'spring-messaging', version: '5.3.23' +} diff --git a/dd-java-agent/instrumentation/spring/spring-sqs/src/main/java/datadog/trace/instrumentation/springsqs/AbstractMessagingMessageConverterToMessagingInstrumentation.java b/dd-java-agent/instrumentation/spring/spring-sqs/src/main/java/datadog/trace/instrumentation/springsqs/AbstractMessagingMessageConverterToMessagingInstrumentation.java new file mode 100644 index 00000000000..18a91aefb42 --- /dev/null +++ b/dd-java-agent/instrumentation/spring/spring-sqs/src/main/java/datadog/trace/instrumentation/springsqs/AbstractMessagingMessageConverterToMessagingInstrumentation.java @@ -0,0 +1,86 @@ +package datadog.trace.instrumentation.springsqs; + +import static datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers.extendsClass; +import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.isConstructor; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; + +import com.google.auto.service.AutoService; +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.agent.tooling.InstrumenterModule; +import datadog.trace.bootstrap.ContextStore; +import datadog.trace.bootstrap.InstrumentationContext; +import datadog.trace.bootstrap.instrumentation.java.concurrent.State; +import java.util.Map; +import java.util.HashMap; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.springframework.messaging.Message; + +@AutoService(InstrumenterModule.class) +public class AbstractMessagingMessageConverterToMessagingInstrumentation extends InstrumenterModule.Tracing + implements Instrumenter.ForTypeHierarchy, Instrumenter.HasMethodAdvice { + + public AbstractMessagingMessageConverterToMessagingInstrumentation() { + super("spring-sqs"); + } + + @Override + public String hierarchyMarkerType() { + return "io.awspring.cloud.sqs.support.converter.AbstractMessagingMessageConverter"; + } + + @Override + public ElementMatcher hierarchyMatcher() { + return extendsClass(named(hierarchyMarkerType())); + } + + @Override + public void methodAdvice(MethodTransformer transformer) { + // Instrument toMessagingMessage method + transformer.applyAdvice( + named("toMessagingMessage"), + getClass().getName() + "$ToMessagingMessageAdvice"); + + } + + @Override + public Map contextStore() { + Map contextStore = new HashMap<>(2); + contextStore.put("software.amazon.awssdk.services.sqs.model.Message", State.class.getName()); + contextStore.put("org.springframework.messaging.Message", State.class.getName()); + return contextStore; + } + + public static class ToMessagingMessageAdvice { + @Advice.OnMethodExit(suppress = Throwable.class) + public static void onExit( + @Advice.Argument(0) Object sqsMessage, + @Advice.Return Message springMessage) { + // Transfer state from SQS message to Spring message + if (null != sqsMessage && null != springMessage && + sqsMessage.getClass().getName().equals("software.amazon.awssdk.services.sqs.model.Message")) { + + ContextStore from = + InstrumentationContext.get(software.amazon.awssdk.services.sqs.model.Message.class, State.class); + State state = from.get((software.amazon.awssdk.services.sqs.model.Message) sqsMessage); + if (null != state) { + from.put((software.amazon.awssdk.services.sqs.model.Message) sqsMessage, null); + // Transfer state from SQS Message to Spring Message + ContextStore to = + InstrumentationContext.get(Message.class, State.class); + to.put(springMessage, state); + System.out.println("[ToMessaging] Transferred state from SQS message to Spring message on thread: " + + Thread.currentThread().getId()); + } else { + System.out.println("[ToMessaging] No state found in SQS message during conversion on thread: " + + Thread.currentThread().getId()); + } + } else { + System.out.println("[ToMessaging] Skipping transfer - not an SQS message or null message on thread: " + + Thread.currentThread().getId()); + } + } + } +} diff --git a/settings.gradle.kts b/settings.gradle.kts index 0db78b03163..a3c74aa48f1 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -541,6 +541,7 @@ include( ":dd-java-agent:instrumentation:spring:spring-jms-3.1", ":dd-java-agent:instrumentation:spring:spring-messaging-4.0", ":dd-java-agent:instrumentation:spring:spring-rabbit-1.5", + ":dd-java-agent:instrumentation:spring:spring-sqs", ":dd-java-agent:instrumentation:spring:spring-scheduling-3.1", ":dd-java-agent:instrumentation:spring:spring-security:spring-security-5.0", ":dd-java-agent:instrumentation:spring:spring-security:spring-security-6.0", From 6575d6769fbf029170380a8c4a844e3980115a17 Mon Sep 17 00:00:00 2001 From: Piotr Wolski Date: Mon, 29 Sep 2025 14:16:39 -0600 Subject: [PATCH 3/3] working version with data streams propagation --- .../aws/v2/sqs/SqsMessageInstrumentation.java | 56 ------------------- .../sqs/SqsReceiveResultInstrumentation.java | 16 +++++- .../aws/v2/sqs/TracingIterator.java | 26 ++++++++- .../aws/v2/sqs/TracingList.java | 16 +++++- .../aws/v2/sqs/TracingListIterator.java | 10 +++- .../SpringMessageHandlerInstrumentation.java | 52 +++++++++-------- ...geConverterToMessagingInstrumentation.java | 45 ++++++++------- 7 files changed, 113 insertions(+), 108 deletions(-) delete mode 100644 dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsMessageInstrumentation.java diff --git a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsMessageInstrumentation.java b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsMessageInstrumentation.java deleted file mode 100644 index 3884445ed9a..00000000000 --- a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsMessageInstrumentation.java +++ /dev/null @@ -1,56 +0,0 @@ -package datadog.trace.instrumentation.aws.v2.sqs; - -import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan; -import static java.util.Collections.singletonMap; -import static net.bytebuddy.matcher.ElementMatchers.isConstructor; - -import com.google.auto.service.AutoService; -import datadog.trace.agent.tooling.Instrumenter; -import datadog.trace.agent.tooling.InstrumenterModule; -import datadog.trace.bootstrap.InstrumentationContext; -import datadog.trace.bootstrap.instrumentation.api.AgentSpan; -import datadog.trace.bootstrap.instrumentation.java.concurrent.State; -import java.util.Map; -import net.bytebuddy.asm.Advice; -import software.amazon.awssdk.services.sqs.model.Message; - -@AutoService(InstrumenterModule.class) -public class SqsMessageInstrumentation extends InstrumenterModule.Tracing - implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice { - - public SqsMessageInstrumentation() { - super("aws-java-sqs-2.0"); - } - - @Override - public String instrumentedType() { - return "software.amazon.awssdk.services.sqs.model.Message"; - } - - @Override - public void methodAdvice(MethodTransformer transformer) { - transformer.applyAdvice(isConstructor(), getClass().getName() + "$CaptureActiveScope"); - } - - @Override - public Map contextStore() { - return singletonMap("software.amazon.awssdk.services.sqs.model.Message", State.class.getName()); - } - - public static class CaptureActiveScope { - @Advice.OnMethodExit(suppress = Throwable.class) - public static void captureActiveScope(@Advice.This Message message) { - AgentSpan span = activeSpan(); - if (span != null) { - State state = State.FACTORY.create(); - state.captureAndSetContinuation(span); - InstrumentationContext.get(Message.class, State.class).put(message, state); - System.out.println("[SQS] Captured state for SQS message: " + message.messageId() + - " with span: " + span.getSpanId() + " on thread: " + Thread.currentThread().getId()); - } else { - System.out.println("[SQS] No active span found when creating SQS message: " + - message.messageId() + " on thread: " + Thread.currentThread().getId()); - } - } - } -} diff --git a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsReceiveResultInstrumentation.java b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsReceiveResultInstrumentation.java index e0e00d33e7b..53d2a4f5736 100644 --- a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsReceiveResultInstrumentation.java +++ b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsReceiveResultInstrumentation.java @@ -1,14 +1,15 @@ package datadog.trace.instrumentation.aws.v2.sqs; import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; -import static java.util.Collections.singletonMap; import static net.bytebuddy.matcher.ElementMatchers.isMethod; import com.google.auto.service.AutoService; import datadog.trace.agent.tooling.Instrumenter; import datadog.trace.agent.tooling.InstrumenterModule; import datadog.trace.api.InstrumenterConfig; +import datadog.trace.bootstrap.ContextStore; import datadog.trace.bootstrap.InstrumentationContext; +import datadog.trace.bootstrap.instrumentation.java.concurrent.State; import java.util.List; import java.util.Map; import net.bytebuddy.asm.Advice; @@ -44,8 +45,13 @@ public String[] helperClassNames() { @Override public Map contextStore() { - return singletonMap( + Map contextStore = new java.util.HashMap<>(2); + contextStore.put( "software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse", "java.lang.String"); + contextStore.put( + "software.amazon.awssdk.services.sqs.model.Message", + "datadog.trace.bootstrap.instrumentation.java.concurrent.State"); + return contextStore; } @Override @@ -63,7 +69,11 @@ public static void onExit( String queueUrl = InstrumentationContext.get(ReceiveMessageResponse.class, String.class).get(result); if (queueUrl != null) { - messages = new TracingList(messages, queueUrl, result.responseMetadata().requestId()); + ContextStore messageStateStore = + InstrumentationContext.get(Message.class, State.class); + messages = + new TracingList( + messageStateStore, messages, queueUrl, result.responseMetadata().requestId()); } } } diff --git a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/TracingIterator.java b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/TracingIterator.java index 3991bfc63b1..d597d93e31c 100644 --- a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/TracingIterator.java +++ b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/TracingIterator.java @@ -18,9 +18,11 @@ import datadog.trace.api.Config; import datadog.trace.api.datastreams.DataStreamsTags; +import datadog.trace.bootstrap.ContextStore; import datadog.trace.bootstrap.instrumentation.api.AgentSpan; import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext; import datadog.trace.bootstrap.instrumentation.api.AgentTracer; +import datadog.trace.bootstrap.instrumentation.java.concurrent.State; import java.util.Iterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,12 +31,18 @@ public class TracingIterator> implements Iterator { private static final Logger log = LoggerFactory.getLogger(TracingIterator.class); + private final ContextStore messageStateStore; protected final L delegate; private final String queueUrl; private final String requestId; private AgentSpanContext batchContext; - public TracingIterator(L delegate, String queueUrl, String requestId) { + public TracingIterator( + ContextStore messageStateStore, + L delegate, + String queueUrl, + String requestId) { + this.messageStateStore = messageStateStore; this.delegate = delegate; this.queueUrl = queueUrl; this.requestId = requestId; @@ -99,6 +107,22 @@ protected void startNewMessageSpan(Message message) { BROKER_DECORATE.beforeFinish(queueSpan); queueSpan.finish(); } + + // Capture state after data streams checkpoint is set + try { + State state = State.FACTORY.create(); + state.captureAndSetContinuation(span); + messageStateStore.put(message, state); + System.out.println( + "[TracingIterator] Captured state for SQS message: " + + message.messageId() + + " with span: " + + span.getSpanId() + + " on thread: " + + Thread.currentThread().getId()); + } catch (Exception stateException) { + log.debug("Problem capturing state for SQS message", stateException); + } } } catch (Exception e) { log.debug("Problem tracing new SQS message span", e); diff --git a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/TracingList.java b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/TracingList.java index abb0e637f27..340a0bd12dd 100644 --- a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/TracingList.java +++ b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/TracingList.java @@ -1,5 +1,7 @@ package datadog.trace.instrumentation.aws.v2.sqs; +import datadog.trace.bootstrap.ContextStore; +import datadog.trace.bootstrap.instrumentation.java.concurrent.State; import java.util.Collection; import java.util.Iterator; import java.util.List; @@ -7,11 +9,17 @@ import software.amazon.awssdk.services.sqs.model.Message; public class TracingList implements List { + private final ContextStore messageStateStore; private final List delegate; private final String queueUrl; private final String requestId; - public TracingList(List delegate, String queueUrl, String requestId) { + public TracingList( + ContextStore messageStateStore, + List delegate, + String queueUrl, + String requestId) { + this.messageStateStore = messageStateStore; this.delegate = delegate; this.queueUrl = queueUrl; this.requestId = requestId; @@ -125,12 +133,14 @@ public ListIterator listIterator() { @Override public ListIterator listIterator(int index) { // every iteration will add spans. Not only the very first one - return new TracingListIterator(delegate.listIterator(index), queueUrl, requestId); + return new TracingListIterator( + messageStateStore, delegate.listIterator(index), queueUrl, requestId); } @Override public List subList(int fromIndex, int toIndex) { - return new TracingList(delegate.subList(fromIndex, toIndex), queueUrl, requestId); + return new TracingList( + messageStateStore, delegate.subList(fromIndex, toIndex), queueUrl, requestId); } @Override diff --git a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/TracingListIterator.java b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/TracingListIterator.java index 184d05815fc..5c0fac639e6 100644 --- a/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/TracingListIterator.java +++ b/dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/TracingListIterator.java @@ -2,14 +2,20 @@ import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.closePrevious; +import datadog.trace.bootstrap.ContextStore; +import datadog.trace.bootstrap.instrumentation.java.concurrent.State; import java.util.ListIterator; import software.amazon.awssdk.services.sqs.model.Message; public class TracingListIterator extends TracingIterator> implements ListIterator { - public TracingListIterator(ListIterator delegate, String queueUrl, String requestId) { - super(delegate, queueUrl, requestId); + public TracingListIterator( + ContextStore messageStateStore, + ListIterator delegate, + String queueUrl, + String requestId) { + super(messageStateStore, delegate, queueUrl, requestId); } @Override diff --git a/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/SpringMessageHandlerInstrumentation.java b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/SpringMessageHandlerInstrumentation.java index dce4f7c5fb3..bb600d57f3a 100644 --- a/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/SpringMessageHandlerInstrumentation.java +++ b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/SpringMessageHandlerInstrumentation.java @@ -1,8 +1,6 @@ package datadog.trace.instrumentation.springmessaging; import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; -import static datadog.trace.api.datastreams.DataStreamsContext.create; -import static datadog.trace.api.datastreams.DataStreamsTags.Direction.INBOUND; import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.extractContextAndGetSpanContext; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.*; import static datadog.trace.instrumentation.springmessaging.SpringMessageDecorator.DECORATE; @@ -15,12 +13,10 @@ import com.google.auto.service.AutoService; import datadog.trace.agent.tooling.Instrumenter; import datadog.trace.agent.tooling.InstrumenterModule; -import datadog.trace.api.datastreams.DataStreamsTags; import datadog.trace.bootstrap.InstrumentationContext; import datadog.trace.bootstrap.instrumentation.api.AgentScope; import datadog.trace.bootstrap.instrumentation.api.AgentSpan; import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext; -import datadog.trace.bootstrap.instrumentation.api.AgentTracer; import datadog.trace.bootstrap.instrumentation.java.concurrent.State; import java.util.Map; import net.bytebuddy.asm.Advice; @@ -70,49 +66,60 @@ public static AgentScope onEnter( @Advice.This InvocableHandlerMethod thiz, @Advice.Argument(0) Message message) { AgentSpanContext parentContext; AgentSpan parent = activeSpan(); - + // First try to get context from continuation (preferred method) State state = InstrumentationContext.get(Message.class, State.class).get(message); if (null != state) { - System.out.println("[Spring] Found state in Spring message, attempting to activate continuation on thread: " + - Thread.currentThread().getId()); + System.out.println( + "[Spring] Found state in Spring message, attempting to activate continuation on thread: " + + Thread.currentThread().getId()); AgentScope.Continuation continuation = state.getAndResetContinuation(); if (null != continuation) { try (AgentScope scope = continuation.activate()) { AgentSpan span = startSpan(SPRING_INBOUND); DECORATE.afterStart(span); span.setResourceName(DECORATE.spanNameForMethod(thiz.getMethod())); - System.out.println("[Spring] Successfully activated continuation from Spring Message with span: " + - span.getSpanId() + " on thread: " + Thread.currentThread().getId()); + System.out.println( + "[Spring] Successfully activated continuation from Spring Message with span: " + + span.getSpanId() + + " on thread: " + + Thread.currentThread().getId()); return activateSpan(span); } } else { - System.out.println("[Spring] No continuation found in state on thread: " + Thread.currentThread().getId()); + System.out.println( + "[Spring] No continuation found in state on thread: " + + Thread.currentThread().getId()); } } else { - System.out.println("[Spring] No state found in Spring message 2, falling back to header extraction on thread: " + - Thread.currentThread().getId()); + System.out.println( + "[Spring] No state found in Spring message 2, falling back to header extraction on thread: " + + Thread.currentThread().getId()); } - + // Fallback to existing context or header extraction if (null != parent) { // prefer existing context, assume it was already extracted from this message parentContext = parent.context(); - System.out.println("[Spring] Using existing active span context on thread: " + Thread.currentThread().getId()); + System.out.println( + "[Spring] Using existing active span context on thread: " + + Thread.currentThread().getId()); } else { // otherwise try to re-extract the message context to avoid disconnected trace parentContext = extractContextAndGetSpanContext(message, GETTER); - System.out.println("[Spring] Extracted context from message headers on thread: " + Thread.currentThread().getId()); + System.out.println( + "[Spring] Extracted context from message headers on thread: " + + Thread.currentThread().getId()); } - + AgentSpan span = startSpan(SPRING_INBOUND, parentContext); DECORATE.afterStart(span); span.setResourceName(DECORATE.spanNameForMethod(thiz.getMethod())); - + // Extract SQS queue information - try different header patterns Object queueUrl = message.getHeaders().get("Sqs_QueueUrl"); Object queueName = message.getHeaders().get("Sqs_QueueName"); - + // If not found in Sqs_ prefixed headers, try aws. prefixed headers if (queueUrl == null) { queueUrl = message.getHeaders().get("aws.queue.url"); @@ -120,7 +127,7 @@ public static AgentScope onEnter( if (queueName == null) { queueName = message.getHeaders().get("aws.queue.name"); } - + // If still not found, try to extract from QueueAttributes if (queueUrl == null || queueName == null) { Object queueAttributes = message.getHeaders().get("Sqs_QueueAttributes"); @@ -128,11 +135,12 @@ public static AgentScope onEnter( String attributesStr = queueAttributes.toString(); // Extract queue name from attributes if available if (queueName == null && attributesStr.contains("queueName=")) { - queueName = attributesStr.substring(attributesStr.indexOf("queueName=") + 10).split(",")[0]; + queueName = + attributesStr.substring(attributesStr.indexOf("queueName=") + 10).split(",")[0]; } } } - + // Add SQS queue tags to the span if (queueUrl != null) { span.setTag("aws.sqs.queue_url", queueUrl.toString()); @@ -140,7 +148,7 @@ public static AgentScope onEnter( if (queueName != null) { span.setTag("aws.sqs.queue_name", queueName.toString()); } - + return activateSpan(span); } diff --git a/dd-java-agent/instrumentation/spring/spring-sqs/src/main/java/datadog/trace/instrumentation/springsqs/AbstractMessagingMessageConverterToMessagingInstrumentation.java b/dd-java-agent/instrumentation/spring/spring-sqs/src/main/java/datadog/trace/instrumentation/springsqs/AbstractMessagingMessageConverterToMessagingInstrumentation.java index 18a91aefb42..22a212b7c80 100644 --- a/dd-java-agent/instrumentation/spring/spring-sqs/src/main/java/datadog/trace/instrumentation/springsqs/AbstractMessagingMessageConverterToMessagingInstrumentation.java +++ b/dd-java-agent/instrumentation/spring/spring-sqs/src/main/java/datadog/trace/instrumentation/springsqs/AbstractMessagingMessageConverterToMessagingInstrumentation.java @@ -2,8 +2,6 @@ import static datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers.extendsClass; import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; -import static net.bytebuddy.matcher.ElementMatchers.isConstructor; -import static net.bytebuddy.matcher.ElementMatchers.takesArgument; import com.google.auto.service.AutoService; import datadog.trace.agent.tooling.Instrumenter; @@ -11,15 +9,16 @@ import datadog.trace.bootstrap.ContextStore; import datadog.trace.bootstrap.InstrumentationContext; import datadog.trace.bootstrap.instrumentation.java.concurrent.State; -import java.util.Map; import java.util.HashMap; +import java.util.Map; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; import org.springframework.messaging.Message; @AutoService(InstrumenterModule.class) -public class AbstractMessagingMessageConverterToMessagingInstrumentation extends InstrumenterModule.Tracing +public class AbstractMessagingMessageConverterToMessagingInstrumentation + extends InstrumenterModule.Tracing implements Instrumenter.ForTypeHierarchy, Instrumenter.HasMethodAdvice { public AbstractMessagingMessageConverterToMessagingInstrumentation() { @@ -40,9 +39,7 @@ public ElementMatcher hierarchyMatcher() { public void methodAdvice(MethodTransformer transformer) { // Instrument toMessagingMessage method transformer.applyAdvice( - named("toMessagingMessage"), - getClass().getName() + "$ToMessagingMessageAdvice"); - + named("toMessagingMessage"), getClass().getName() + "$ToMessagingMessageAdvice"); } @Override @@ -56,30 +53,36 @@ public Map contextStore() { public static class ToMessagingMessageAdvice { @Advice.OnMethodExit(suppress = Throwable.class) public static void onExit( - @Advice.Argument(0) Object sqsMessage, - @Advice.Return Message springMessage) { + @Advice.Argument(0) Object sqsMessage, @Advice.Return Message springMessage) { // Transfer state from SQS message to Spring message - if (null != sqsMessage && null != springMessage && - sqsMessage.getClass().getName().equals("software.amazon.awssdk.services.sqs.model.Message")) { - + if (null != sqsMessage + && null != springMessage + && sqsMessage + .getClass() + .getName() + .equals("software.amazon.awssdk.services.sqs.model.Message")) { + ContextStore from = - InstrumentationContext.get(software.amazon.awssdk.services.sqs.model.Message.class, State.class); + InstrumentationContext.get( + software.amazon.awssdk.services.sqs.model.Message.class, State.class); State state = from.get((software.amazon.awssdk.services.sqs.model.Message) sqsMessage); if (null != state) { from.put((software.amazon.awssdk.services.sqs.model.Message) sqsMessage, null); // Transfer state from SQS Message to Spring Message - ContextStore to = - InstrumentationContext.get(Message.class, State.class); + ContextStore to = InstrumentationContext.get(Message.class, State.class); to.put(springMessage, state); - System.out.println("[ToMessaging] Transferred state from SQS message to Spring message on thread: " + - Thread.currentThread().getId()); + System.out.println( + "[ToMessaging] Transferred state from SQS message to Spring message on thread: " + + Thread.currentThread().getId()); } else { - System.out.println("[ToMessaging] No state found in SQS message during conversion on thread: " + - Thread.currentThread().getId()); + System.out.println( + "[ToMessaging] No state found in SQS message during conversion on thread: " + + Thread.currentThread().getId()); } } else { - System.out.println("[ToMessaging] Skipping transfer - not an SQS message or null message on thread: " + - Thread.currentThread().getId()); + System.out.println( + "[ToMessaging] Skipping transfer - not an SQS message or null message on thread: " + + Thread.currentThread().getId()); } } }