Skip to content

Commit 682dd03

Browse files
committed
add sqs spring messaging context propagation support
1 parent e992b65 commit 682dd03

12 files changed

Lines changed: 417 additions & 9 deletions

File tree

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ out/
4545
# Visual Studio Code #
4646
######################
4747
.vscode
48+
.cursor
4849

4950
# Others #
5051
##########

dd-java-agent/agent-tooling/src/main/resources/datadog/trace/agent/tooling/bytebuddy/matcher/ignored_class_name.trie

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -334,6 +334,7 @@
334334
2 org.springframework.lang.*
335335
2 org.springframework.messaging.*
336336
0 org.springframework.messaging.handler.invocation.InvocableHandlerMethod
337+
0 org.springframework.messaging.support.GenericMessage
337338
2 org.springframework.objenesis.*
338339
2 org.springframework.orm.*
339340
2 org.springframework.remoting.*

dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsReceiveResultInstrumentation.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
11
package datadog.trace.instrumentation.aws.v2.sqs;
22

33
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
4-
import static java.util.Collections.singletonMap;
54
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
65

76
import com.google.auto.service.AutoService;
87
import datadog.trace.agent.tooling.Instrumenter;
98
import datadog.trace.agent.tooling.InstrumenterModule;
109
import datadog.trace.api.InstrumenterConfig;
10+
import datadog.trace.bootstrap.ContextStore;
1111
import datadog.trace.bootstrap.InstrumentationContext;
12+
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
1213
import java.util.List;
1314
import java.util.Map;
1415
import net.bytebuddy.asm.Advice;
@@ -44,8 +45,13 @@ public String[] helperClassNames() {
4445

4546
@Override
4647
public Map<String, String> contextStore() {
47-
return singletonMap(
48+
Map<String, String> contextStore = new java.util.HashMap<>(2);
49+
contextStore.put(
4850
"software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse", "java.lang.String");
51+
contextStore.put(
52+
"software.amazon.awssdk.services.sqs.model.Message",
53+
"datadog.trace.bootstrap.instrumentation.java.concurrent.State");
54+
return contextStore;
4955
}
5056

5157
@Override
@@ -63,7 +69,11 @@ public static void onExit(
6369
String queueUrl =
6470
InstrumentationContext.get(ReceiveMessageResponse.class, String.class).get(result);
6571
if (queueUrl != null) {
66-
messages = new TracingList(messages, queueUrl, result.responseMetadata().requestId());
72+
ContextStore<Message, State> messageStateStore =
73+
InstrumentationContext.get(Message.class, State.class);
74+
messages =
75+
new TracingList(
76+
messageStateStore, messages, queueUrl, result.responseMetadata().requestId());
6777
}
6878
}
6979
}

dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/TracingIterator.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,11 @@
1818

1919
import datadog.trace.api.Config;
2020
import datadog.trace.api.datastreams.DataStreamsTags;
21+
import datadog.trace.bootstrap.ContextStore;
2122
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
2223
import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext;
2324
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
25+
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
2426
import java.util.Iterator;
2527
import org.slf4j.Logger;
2628
import org.slf4j.LoggerFactory;
@@ -29,12 +31,18 @@
2931
public class TracingIterator<L extends Iterator<Message>> implements Iterator<Message> {
3032
private static final Logger log = LoggerFactory.getLogger(TracingIterator.class);
3133

34+
private final ContextStore<Message, State> messageStateStore;
3235
protected final L delegate;
3336
private final String queueUrl;
3437
private final String requestId;
3538
private AgentSpanContext batchContext;
3639

37-
public TracingIterator(L delegate, String queueUrl, String requestId) {
40+
public TracingIterator(
41+
ContextStore<Message, State> messageStateStore,
42+
L delegate,
43+
String queueUrl,
44+
String requestId) {
45+
this.messageStateStore = messageStateStore;
3846
this.delegate = delegate;
3947
this.queueUrl = queueUrl;
4048
this.requestId = requestId;
@@ -99,6 +107,11 @@ protected void startNewMessageSpan(Message message) {
99107
BROKER_DECORATE.beforeFinish(queueSpan);
100108
queueSpan.finish();
101109
}
110+
111+
// Capture state after data streams checkpoint is set
112+
State state = State.FACTORY.create();
113+
state.captureAndSetContinuation(span);
114+
messageStateStore.put(message, state);
102115
}
103116
} catch (Exception e) {
104117
log.debug("Problem tracing new SQS message span", e);

dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/TracingList.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,25 @@
11
package datadog.trace.instrumentation.aws.v2.sqs;
22

3+
import datadog.trace.bootstrap.ContextStore;
4+
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
35
import java.util.Collection;
46
import java.util.Iterator;
57
import java.util.List;
68
import java.util.ListIterator;
79
import software.amazon.awssdk.services.sqs.model.Message;
810

911
public class TracingList implements List<Message> {
12+
private final ContextStore<Message, State> messageStateStore;
1013
private final List<Message> delegate;
1114
private final String queueUrl;
1215
private final String requestId;
1316

14-
public TracingList(List<Message> delegate, String queueUrl, String requestId) {
17+
public TracingList(
18+
ContextStore<Message, State> messageStateStore,
19+
List<Message> delegate,
20+
String queueUrl,
21+
String requestId) {
22+
this.messageStateStore = messageStateStore;
1523
this.delegate = delegate;
1624
this.queueUrl = queueUrl;
1725
this.requestId = requestId;
@@ -125,12 +133,14 @@ public ListIterator<Message> listIterator() {
125133
@Override
126134
public ListIterator<Message> listIterator(int index) {
127135
// every iteration will add spans. Not only the very first one
128-
return new TracingListIterator(delegate.listIterator(index), queueUrl, requestId);
136+
return new TracingListIterator(
137+
messageStateStore, delegate.listIterator(index), queueUrl, requestId);
129138
}
130139

131140
@Override
132141
public List<Message> subList(int fromIndex, int toIndex) {
133-
return new TracingList(delegate.subList(fromIndex, toIndex), queueUrl, requestId);
142+
return new TracingList(
143+
messageStateStore, delegate.subList(fromIndex, toIndex), queueUrl, requestId);
134144
}
135145

136146
@Override

dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/TracingListIterator.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,20 @@
22

33
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.closePrevious;
44

5+
import datadog.trace.bootstrap.ContextStore;
6+
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
57
import java.util.ListIterator;
68
import software.amazon.awssdk.services.sqs.model.Message;
79

810
public class TracingListIterator extends TracingIterator<ListIterator<Message>>
911
implements ListIterator<Message> {
1012

11-
public TracingListIterator(ListIterator<Message> delegate, String queueUrl, String requestId) {
12-
super(delegate, queueUrl, requestId);
13+
public TracingListIterator(
14+
ContextStore<Message, State> messageStateStore,
15+
ListIterator<Message> delegate,
16+
String queueUrl,
17+
String requestId) {
18+
super(messageStateStore, delegate, queueUrl, requestId);
1319
}
1420

1521
@Override

dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/SpringMessageHandlerInstrumentation.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,19 @@
88
import static datadog.trace.instrumentation.springmessaging.SpringMessageDecorator.DECORATE;
99
import static datadog.trace.instrumentation.springmessaging.SpringMessageDecorator.SPRING_INBOUND;
1010
import static datadog.trace.instrumentation.springmessaging.SpringMessageExtractAdapter.GETTER;
11+
import static java.util.Collections.singletonMap;
1112
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
1213
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
1314

1415
import com.google.auto.service.AutoService;
1516
import datadog.trace.agent.tooling.Instrumenter;
1617
import datadog.trace.agent.tooling.InstrumenterModule;
18+
import datadog.trace.bootstrap.InstrumentationContext;
1719
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
1820
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
1921
import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext;
22+
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
23+
import java.util.Map;
2024
import net.bytebuddy.asm.Advice;
2125
import org.springframework.messaging.Message;
2226
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
@@ -53,12 +57,33 @@ public String[] helperClassNames() {
5357
};
5458
}
5559

60+
@Override
61+
public Map<String, String> contextStore() {
62+
return singletonMap("org.springframework.messaging.Message", State.class.getName());
63+
}
64+
5665
public static class HandleMessageAdvice {
5766
@Advice.OnMethodEnter(suppress = Throwable.class)
5867
public static AgentScope onEnter(
5968
@Advice.This InvocableHandlerMethod thiz, @Advice.Argument(0) Message<?> message) {
6069
AgentSpanContext parentContext;
6170
AgentSpan parent = activeSpan();
71+
72+
// First try to get context from continuation (preferred method)
73+
State state = InstrumentationContext.get(Message.class, State.class).get(message);
74+
if (null != state) {
75+
AgentScope.Continuation continuation = state.getAndResetContinuation();
76+
if (null != continuation) {
77+
try (AgentScope scope = continuation.activate()) {
78+
AgentSpan span = startSpan(SPRING_INBOUND);
79+
DECORATE.afterStart(span);
80+
span.setResourceName(DECORATE.spanNameForMethod(thiz.getMethod()));
81+
return activateSpan(span);
82+
}
83+
}
84+
}
85+
86+
// Fallback to existing context or header extraction
6287
if (null != parent) {
6388
// prefer existing context, assume it was already extracted from this message
6489
parentContext = parent.context();
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
muzzle {
2+
pass {
3+
group = 'io.awspring.cloud'
4+
module = 'spring-cloud-aws-sqs'
5+
versions = "[3.0.0,)"
6+
assertInverse = true
7+
}
8+
}
9+
10+
def TEST_JAVA = 17
11+
12+
13+
apply from: "$rootDir/gradle/java.gradle"
14+
15+
addTestSuiteForDir('latestDepTest', 'test')
16+
17+
dependencies {
18+
compileOnly group: 'software.amazon.awssdk', name: 'sqs', version: '2.20.162'
19+
compileOnly group: 'org.springframework', name: 'spring-messaging', version: '5.3.23'
20+
21+
testImplementation project(':dd-java-agent:instrumentation:trace-annotation')
22+
testImplementation project(':dd-java-agent:instrumentation:aws-java:aws-java-sdk-2.2')
23+
testImplementation project(':dd-java-agent:instrumentation:aws-java:aws-java-sqs-2.0')
24+
testImplementation project(':dd-java-agent:instrumentation:spring:spring-messaging-4.0')
25+
26+
testImplementation group: 'org.springframework', name: 'spring-context', version: '6.1.10'
27+
testImplementation group: 'org.springframework', name: 'spring-test', version: '6.1.10'
28+
testImplementation group: 'org.springframework', name: 'spring-core', version: '6.1.10'
29+
testImplementation group: 'io.awspring.cloud', name: 'spring-cloud-aws-sqs', version: '3.1.0'
30+
testImplementation group: 'software.amazon.awssdk', name: 'sqs', version: '2.20.162'
31+
testImplementation group: 'software.amazon.awssdk', name: 'aws-core', version: '2.20.162'
32+
testImplementation group: 'org.testcontainers', name: 'localstack', version: libs.versions.testcontainers.get()
33+
testImplementation 'org.slf4j:slf4j-api:2.0.13'
34+
testImplementation 'ch.qos.logback:logback-classic:1.4.14'
35+
testImplementation 'ch.qos.logback:logback-core:1.4.14'
36+
37+
latestDepTestImplementation group: 'org.springframework', name: 'spring-context', version: '6.+'
38+
latestDepTestImplementation group: 'org.springframework', name: 'spring-test', version: '6.+'
39+
latestDepTestImplementation group: 'org.springframework', name: 'spring-core', version: '6.+'
40+
latestDepTestImplementation group: 'software.amazon.awssdk', name: 'sqs', version: '2.+'
41+
latestDepTestImplementation group: 'software.amazon.awssdk', name: 'aws-core', version: '2.+'
42+
}
43+
44+
// Compile only *test* Groovy source sets (e.g., compileTestGroovy, compileLatestDepTestGroovy) with JDK 17
45+
tasks.withType(org.gradle.api.tasks.compile.GroovyCompile).configureEach {
46+
if (name.endsWith('TestGroovy')) {
47+
javaLauncher = javaToolchains.launcherFor {
48+
languageVersion = JavaLanguageVersion.of(TEST_JAVA)
49+
}
50+
}
51+
}
52+
53+
tasks.withType(Test).configureEach {
54+
usesService(testcontainersLimit)
55+
javaLauncher = javaToolchains.launcherFor {
56+
languageVersion = JavaLanguageVersion.of(TEST_JAVA)
57+
}
58+
jvmArgs += ['--add-opens=java.base/java.util=ALL-UNNAMED']
59+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
package datadog.trace.instrumentation.springsqs;
2+
3+
import static datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers.extendsClass;
4+
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
5+
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
6+
7+
import com.google.auto.service.AutoService;
8+
import datadog.trace.agent.tooling.Instrumenter;
9+
import datadog.trace.agent.tooling.InstrumenterModule;
10+
import datadog.trace.bootstrap.ContextStore;
11+
import datadog.trace.bootstrap.InstrumentationContext;
12+
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
13+
import java.util.HashMap;
14+
import java.util.Map;
15+
import net.bytebuddy.asm.Advice;
16+
import net.bytebuddy.description.type.TypeDescription;
17+
import net.bytebuddy.matcher.ElementMatcher;
18+
import org.springframework.messaging.Message;
19+
20+
@AutoService(InstrumenterModule.class)
21+
public class AbstractMessagingMessageConverterToMessagingInstrumentation
22+
extends InstrumenterModule.Tracing
23+
implements Instrumenter.ForTypeHierarchy, Instrumenter.HasMethodAdvice {
24+
25+
public AbstractMessagingMessageConverterToMessagingInstrumentation() {
26+
super("spring-sqs");
27+
}
28+
29+
@Override
30+
public String hierarchyMarkerType() {
31+
return "io.awspring.cloud.sqs.support.converter.AbstractMessagingMessageConverter";
32+
}
33+
34+
@Override
35+
public ElementMatcher<TypeDescription> hierarchyMatcher() {
36+
return extendsClass(named(hierarchyMarkerType()));
37+
}
38+
39+
@Override
40+
public void methodAdvice(MethodTransformer transformer) {
41+
transformer.applyAdvice(
42+
isMethod().and(named("toMessagingMessage")),
43+
getClass().getName() + "$ToMessagingMessageAdvice");
44+
}
45+
46+
@Override
47+
public Map<String, String> contextStore() {
48+
Map<String, String> contextStore = new HashMap<>(2);
49+
contextStore.put("software.amazon.awssdk.services.sqs.model.Message", State.class.getName());
50+
contextStore.put("org.springframework.messaging.Message", State.class.getName());
51+
return contextStore;
52+
}
53+
54+
public static class ToMessagingMessageAdvice {
55+
@Advice.OnMethodExit(suppress = Throwable.class)
56+
public static void onExit(
57+
@Advice.Argument(0) Object sqsMessage, @Advice.Return Message springMessage) {
58+
// Transfer state from SQS message to Spring message
59+
if (null != sqsMessage
60+
&& null != springMessage
61+
&& sqsMessage
62+
.getClass()
63+
.getName()
64+
.equals("software.amazon.awssdk.services.sqs.model.Message")) {
65+
66+
ContextStore<software.amazon.awssdk.services.sqs.model.Message, State> from =
67+
InstrumentationContext.get(
68+
software.amazon.awssdk.services.sqs.model.Message.class, State.class);
69+
State state = from.get((software.amazon.awssdk.services.sqs.model.Message) sqsMessage);
70+
if (null != state) {
71+
from.put((software.amazon.awssdk.services.sqs.model.Message) sqsMessage, null);
72+
ContextStore<Message, State> to = InstrumentationContext.get(Message.class, State.class);
73+
to.put(springMessage, state);
74+
}
75+
}
76+
}
77+
}
78+
}

0 commit comments

Comments
 (0)