Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ out/
# Visual Studio Code #
######################
.vscode
.cursor

# Others #
##########
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,7 @@
2 org.springframework.lang.*
2 org.springframework.messaging.*
0 org.springframework.messaging.handler.invocation.InvocableHandlerMethod
0 org.springframework.messaging.support.GenericMessage
2 org.springframework.objenesis.*
2 org.springframework.orm.*
2 org.springframework.remoting.*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package datadog.trace.instrumentation.aws.v2.sqs;

import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;

import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.api.InstrumenterConfig;
import datadog.trace.bootstrap.ContextStore;
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
import java.util.List;
import java.util.Map;
import net.bytebuddy.asm.Advice;
Expand Down Expand Up @@ -44,8 +45,13 @@ public String[] helperClassNames() {

@Override
public Map<String, String> contextStore() {
return singletonMap(
Map<String, String> contextStore = new java.util.HashMap<>(2);
contextStore.put(
"software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse", "java.lang.String");
contextStore.put(
"software.amazon.awssdk.services.sqs.model.Message",
"datadog.trace.bootstrap.instrumentation.java.concurrent.State");
return contextStore;
}

@Override
Expand All @@ -63,7 +69,11 @@ public static void onExit(
String queueUrl =
InstrumentationContext.get(ReceiveMessageResponse.class, String.class).get(result);
if (queueUrl != null) {
messages = new TracingList(messages, queueUrl, result.responseMetadata().requestId());
ContextStore<Message, State> messageStateStore =
InstrumentationContext.get(Message.class, State.class);
messages =
new TracingList(
messageStateStore, messages, queueUrl, result.responseMetadata().requestId());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@

import datadog.trace.api.Config;
import datadog.trace.api.datastreams.DataStreamsTags;
import datadog.trace.bootstrap.ContextStore;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext;
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
import java.util.Iterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -29,12 +31,18 @@
public class TracingIterator<L extends Iterator<Message>> implements Iterator<Message> {
private static final Logger log = LoggerFactory.getLogger(TracingIterator.class);

private final ContextStore<Message, State> messageStateStore;
protected final L delegate;
private final String queueUrl;
private final String requestId;
private AgentSpanContext batchContext;

public TracingIterator(L delegate, String queueUrl, String requestId) {
public TracingIterator(
ContextStore<Message, State> messageStateStore,
L delegate,
String queueUrl,
String requestId) {
this.messageStateStore = messageStateStore;
this.delegate = delegate;
this.queueUrl = queueUrl;
this.requestId = requestId;
Expand Down Expand Up @@ -99,6 +107,11 @@ protected void startNewMessageSpan(Message message) {
BROKER_DECORATE.beforeFinish(queueSpan);
queueSpan.finish();
}

// Capture state after data streams checkpoint is set
State state = State.FACTORY.create();
state.captureAndSetContinuation(span);
messageStateStore.put(message, state);
}
} catch (Exception e) {
log.debug("Problem tracing new SQS message span", e);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,25 @@
package datadog.trace.instrumentation.aws.v2.sqs;

import datadog.trace.bootstrap.ContextStore;
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import software.amazon.awssdk.services.sqs.model.Message;

public class TracingList implements List<Message> {
private final ContextStore<Message, State> messageStateStore;
private final List<Message> delegate;
private final String queueUrl;
private final String requestId;

public TracingList(List<Message> delegate, String queueUrl, String requestId) {
public TracingList(
ContextStore<Message, State> messageStateStore,
List<Message> delegate,
String queueUrl,
String requestId) {
this.messageStateStore = messageStateStore;
this.delegate = delegate;
this.queueUrl = queueUrl;
this.requestId = requestId;
Expand Down Expand Up @@ -125,12 +133,14 @@ public ListIterator<Message> listIterator() {
@Override
public ListIterator<Message> listIterator(int index) {
// every iteration will add spans. Not only the very first one
return new TracingListIterator(delegate.listIterator(index), queueUrl, requestId);
return new TracingListIterator(
messageStateStore, delegate.listIterator(index), queueUrl, requestId);
}

@Override
public List<Message> subList(int fromIndex, int toIndex) {
return new TracingList(delegate.subList(fromIndex, toIndex), queueUrl, requestId);
return new TracingList(
messageStateStore, delegate.subList(fromIndex, toIndex), queueUrl, requestId);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,20 @@

import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.closePrevious;

import datadog.trace.bootstrap.ContextStore;
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
import java.util.ListIterator;
import software.amazon.awssdk.services.sqs.model.Message;

public class TracingListIterator extends TracingIterator<ListIterator<Message>>
implements ListIterator<Message> {

public TracingListIterator(ListIterator<Message> delegate, String queueUrl, String requestId) {
super(delegate, queueUrl, requestId);
public TracingListIterator(
ContextStore<Message, State> messageStateStore,
ListIterator<Message> delegate,
String queueUrl,
String requestId) {
super(messageStateStore, delegate, queueUrl, requestId);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,19 @@
import static datadog.trace.instrumentation.springmessaging.SpringMessageDecorator.DECORATE;
import static datadog.trace.instrumentation.springmessaging.SpringMessageDecorator.SPRING_INBOUND;
import static datadog.trace.instrumentation.springmessaging.SpringMessageExtractAdapter.GETTER;
import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;

import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext;
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
import java.util.Map;
import net.bytebuddy.asm.Advice;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
Expand Down Expand Up @@ -53,12 +57,33 @@ public String[] helperClassNames() {
};
}

@Override
public Map<String, String> contextStore() {
return singletonMap("org.springframework.messaging.Message", State.class.getName());
}

public static class HandleMessageAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static AgentScope onEnter(
@Advice.This InvocableHandlerMethod thiz, @Advice.Argument(0) Message<?> message) {
AgentSpanContext parentContext;
AgentSpan parent = activeSpan();

// First try to get context from continuation (preferred method)
State state = InstrumentationContext.get(Message.class, State.class).get(message);
if (null != state) {
AgentScope.Continuation continuation = state.getAndResetContinuation();
if (null != continuation) {
try (AgentScope scope = continuation.activate()) {
AgentSpan span = startSpan(SPRING_INBOUND);
DECORATE.afterStart(span);
span.setResourceName(DECORATE.spanNameForMethod(thiz.getMethod()));
return activateSpan(span);
}
}
}

// Fallback to existing context or header extraction
if (null != parent) {
// prefer existing context, assume it was already extracted from this message
parentContext = parent.context();
Expand Down
59 changes: 59 additions & 0 deletions dd-java-agent/instrumentation/spring/spring-sqs/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
muzzle {
pass {
group = 'io.awspring.cloud'
module = 'spring-cloud-aws-sqs'
versions = "[3.0.0,)"
assertInverse = true
}
}

def TEST_JAVA = 17


apply from: "$rootDir/gradle/java.gradle"

addTestSuiteForDir('latestDepTest', 'test')

dependencies {
compileOnly group: 'software.amazon.awssdk', name: 'sqs', version: '2.20.162'
compileOnly group: 'org.springframework', name: 'spring-messaging', version: '5.3.23'

testImplementation project(':dd-java-agent:instrumentation:trace-annotation')
testImplementation project(':dd-java-agent:instrumentation:aws-java:aws-java-sdk-2.2')
testImplementation project(':dd-java-agent:instrumentation:aws-java:aws-java-sqs-2.0')
testImplementation project(':dd-java-agent:instrumentation:spring:spring-messaging-4.0')

testImplementation group: 'org.springframework', name: 'spring-context', version: '6.1.10'
testImplementation group: 'org.springframework', name: 'spring-test', version: '6.1.10'
testImplementation group: 'org.springframework', name: 'spring-core', version: '6.1.10'
testImplementation group: 'io.awspring.cloud', name: 'spring-cloud-aws-sqs', version: '3.1.0'
testImplementation group: 'software.amazon.awssdk', name: 'sqs', version: '2.20.162'
testImplementation group: 'software.amazon.awssdk', name: 'aws-core', version: '2.20.162'
testImplementation group: 'org.testcontainers', name: 'localstack', version: libs.versions.testcontainers.get()
testImplementation 'org.slf4j:slf4j-api:2.0.13'
testImplementation 'ch.qos.logback:logback-classic:1.4.14'
testImplementation 'ch.qos.logback:logback-core:1.4.14'

latestDepTestImplementation group: 'org.springframework', name: 'spring-context', version: '6.+'
latestDepTestImplementation group: 'org.springframework', name: 'spring-test', version: '6.+'
latestDepTestImplementation group: 'org.springframework', name: 'spring-core', version: '6.+'
latestDepTestImplementation group: 'software.amazon.awssdk', name: 'sqs', version: '2.+'
latestDepTestImplementation group: 'software.amazon.awssdk', name: 'aws-core', version: '2.+'
}

// Compile only *test* Groovy source sets (e.g., compileTestGroovy, compileLatestDepTestGroovy) with JDK 17
tasks.withType(org.gradle.api.tasks.compile.GroovyCompile).configureEach {
if (name.endsWith('TestGroovy')) {
javaLauncher = javaToolchains.launcherFor {
languageVersion = JavaLanguageVersion.of(TEST_JAVA)
}
}
}

tasks.withType(Test).configureEach {
usesService(testcontainersLimit)
javaLauncher = javaToolchains.launcherFor {
languageVersion = JavaLanguageVersion.of(TEST_JAVA)
}
jvmArgs += ['--add-opens=java.base/java.util=ALL-UNNAMED']
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package datadog.trace.instrumentation.springsqs;

import static datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers.extendsClass;
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;

import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.bootstrap.ContextStore;
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
import java.util.HashMap;
import java.util.Map;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.springframework.messaging.Message;

@AutoService(InstrumenterModule.class)
public class AbstractMessagingMessageConverterToMessagingInstrumentation
extends InstrumenterModule.Tracing
implements Instrumenter.ForTypeHierarchy, Instrumenter.HasMethodAdvice {

public AbstractMessagingMessageConverterToMessagingInstrumentation() {
super("spring-sqs");
Comment thread
piochelepiotr marked this conversation as resolved.
Outdated
}

@Override
public String hierarchyMarkerType() {
return "io.awspring.cloud.sqs.support.converter.AbstractMessagingMessageConverter";
}

@Override
public ElementMatcher<TypeDescription> hierarchyMatcher() {
return extendsClass(named(hierarchyMarkerType()));
}

@Override
public void methodAdvice(MethodTransformer transformer) {
transformer.applyAdvice(
isMethod().and(named("toMessagingMessage")),
getClass().getName() + "$ToMessagingMessageAdvice");
}

@Override
public Map<String, String> contextStore() {
Map<String, String> contextStore = new HashMap<>(2);
contextStore.put("software.amazon.awssdk.services.sqs.model.Message", State.class.getName());
contextStore.put("org.springframework.messaging.Message", State.class.getName());
return contextStore;
}

public static class ToMessagingMessageAdvice {
@Advice.OnMethodExit(suppress = Throwable.class)
public static void onExit(
@Advice.Argument(0) Object sqsMessage, @Advice.Return Message springMessage) {
// Transfer state from SQS message to Spring message
if (null != sqsMessage
&& null != springMessage
&& sqsMessage
.getClass()
.getName()
.equals("software.amazon.awssdk.services.sqs.model.Message")) {

ContextStore<software.amazon.awssdk.services.sqs.model.Message, State> from =
InstrumentationContext.get(
software.amazon.awssdk.services.sqs.model.Message.class, State.class);
State state = from.get((software.amazon.awssdk.services.sqs.model.Message) sqsMessage);
if (null != state) {
from.put((software.amazon.awssdk.services.sqs.model.Message) sqsMessage, null);
ContextStore<Message, State> to = InstrumentationContext.get(Message.class, State.class);
to.put(springMessage, state);
}
}
}
}
}
Loading