Skip to content

Commit c1f2818

Browse files
mhlidddevflow.devflow-routing-intake
andauthored
Support CompletableFuture on Spring Messaging Spans (#10979)
init Merge branch 'master' into mhlidd/async_spring_messaging spotless spotless pt 2 adding child span to async exec Co-authored-by: devflow.devflow-routing-intake <devflow.devflow-routing-intake@kubernetes.us1.ddbuild.io>
1 parent b52e28e commit c1f2818

File tree

3 files changed

+100
-15
lines changed

3 files changed

+100
-15
lines changed

dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/SpringMessageHandlerInstrumentation.java

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import datadog.trace.agent.tooling.annotation.AppliesOn;
2121
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
2222
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
23+
import datadog.trace.bootstrap.instrumentation.java.concurrent.AsyncResultExtensions;
24+
import java.util.concurrent.CompletionStage;
2325
import net.bytebuddy.asm.Advice;
2426
import org.springframework.messaging.Message;
2527
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
@@ -84,17 +86,24 @@ public static AgentScope onEnter(@Advice.This InvocableHandlerMethod thiz) {
8486
}
8587

8688
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
87-
public static void onExit(@Advice.Enter AgentScope scope, @Advice.Thrown Throwable error) {
89+
public static void onExit(
90+
@Advice.Enter AgentScope scope,
91+
@Advice.Return(readOnly = false) Object result,
92+
@Advice.Thrown Throwable error) {
8893
if (null == scope) {
8994
return;
9095
}
9196
AgentSpan span = scope.span();
92-
if (null != error) {
93-
DECORATE.onError(span, error);
94-
}
9597
scope.close();
96-
DECORATE.beforeFinish(span);
97-
span.finish();
98+
if (result instanceof CompletionStage) {
99+
result = ((CompletionStage<?>) result).whenComplete(AsyncResultExtensions.finishSpan(span));
100+
} else {
101+
if (null != error) {
102+
DECORATE.onError(span, error);
103+
}
104+
DECORATE.beforeFinish(span);
105+
span.finish();
106+
}
98107
}
99108
}
100109
}

dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/test/groovy/SpringListenerSQSTest.groovy

Lines changed: 67 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@ import org.springframework.context.annotation.AnnotationConfigApplicationContext
1515
import org.springframework.messaging.support.GenericMessage
1616
import software.amazon.awssdk.services.sqs.SqsAsyncClient
1717

18+
import java.util.concurrent.TimeUnit
19+
20+
1821
class SpringListenerSQSTest extends InstrumentationSpecification {
1922

2023
@Override
@@ -128,7 +131,62 @@ class SpringListenerSQSTest extends InstrumentationSpecification {
128131
}
129132
}
130133

131-
static sendMessage(TraceAssert traceAssert, InetSocketAddress address, DDSpan parentSpan) {
134+
def "async handler keeps spring.consume span active during CompletableFuture execution"() {
135+
setup:
136+
def context = new AnnotationConfigApplicationContext(Config)
137+
def address = context.getBean(SQSRestServer).waitUntilStarted().localAddress()
138+
def template = SqsTemplate.newTemplate(context.getBean(SqsAsyncClient))
139+
TEST_WRITER.waitForTraces(2)
140+
TEST_WRITER.clear()
141+
142+
when:
143+
TraceUtils.runUnderTrace("parent") {
144+
template.sendAsync("SpringListenerSQSAsync", "an async message").get()
145+
}
146+
147+
then:
148+
def sendingSpan
149+
assertTraces(4, SORT_TRACES_BY_START) {
150+
sortSpansByStart()
151+
trace(3) {
152+
basicSpan(it, "parent")
153+
getQueueUrl(it, address, span(0), "SpringListenerSQSAsync")
154+
sendMessage(it, address, span(0), "SpringListenerSQSAsync")
155+
sendingSpan = span(2)
156+
}
157+
trace(1) {
158+
receiveMessage(it, address, sendingSpan, "SpringListenerSQSAsync")
159+
}
160+
trace(2) {
161+
span {
162+
serviceName "my-service"
163+
operationName "spring.consume"
164+
resourceName "TestListener.observeAsync"
165+
spanType DDSpanTypes.MESSAGE_CONSUMER
166+
errored false
167+
measured true
168+
childOf(sendingSpan)
169+
// The span duration should be at least 500ms since the async handler sleeps 500ms
170+
assert span(0).durationNano > TimeUnit.MILLISECONDS.toNanos(500)
171+
tags {
172+
"$Tags.COMPONENT" "spring-messaging"
173+
"$Tags.SPAN_KIND" Tags.SPAN_KIND_CONSUMER
174+
defaultTags(true)
175+
}
176+
}
177+
// Child span created inside the CompletableFuture proves spring.consume was active
178+
span {
179+
operationName "async.child"
180+
childOf(span(0))
181+
}
182+
}
183+
trace(1) {
184+
deleteMessageBatch(it, address, "SpringListenerSQSAsync")
185+
}
186+
}
187+
}
188+
189+
static sendMessage(TraceAssert traceAssert, InetSocketAddress address, DDSpan parentSpan, String queueName = "SpringListenerSQS") {
132190
traceAssert.span {
133191
serviceName "sqs"
134192
operationName "aws.http"
@@ -148,15 +206,15 @@ class SpringListenerSQSTest extends InstrumentationSpecification {
148206
"aws_service" "Sqs"
149207
"aws.operation" "SendMessage"
150208
"aws.agent" "java-aws-sdk"
151-
"aws.queue.url" "http://localhost:${address.port}/000000000000/SpringListenerSQS"
209+
"aws.queue.url" "http://localhost:${address.port}/000000000000/${queueName}"
152210
"aws.requestId" "00000000-0000-0000-0000-000000000000"
153211
urlTags("http://localhost:${address.port}/", ExpectedQueryParams.getExpectedQueryParams("SendMessage"))
154212
defaultTags()
155213
}
156214
}
157215
}
158216

159-
static getQueueUrl(TraceAssert traceAssert, InetSocketAddress address, DDSpan parentSpan) {
217+
static getQueueUrl(TraceAssert traceAssert, InetSocketAddress address, DDSpan parentSpan, String queueName = "SpringListenerSQS") {
160218
traceAssert.span {
161219
serviceName "java-aws-sdk"
162220
operationName "aws.http"
@@ -176,16 +234,16 @@ class SpringListenerSQSTest extends InstrumentationSpecification {
176234
"aws_service" "Sqs"
177235
"aws.operation" "GetQueueUrl"
178236
"aws.agent" "java-aws-sdk"
179-
"aws.queue.name" "SpringListenerSQS"
237+
"aws.queue.name" queueName
180238
"aws.requestId" "00000000-0000-0000-0000-000000000000"
181-
"queuename" "SpringListenerSQS"
239+
"queuename" queueName
182240
urlTags("http://localhost:${address.port}/", ExpectedQueryParams.getExpectedQueryParams("GetQueueUrl"))
183241
defaultTags()
184242
}
185243
}
186244
}
187245

188-
static receiveMessage(TraceAssert traceAssert, InetSocketAddress address, DDSpan parentSpan) {
246+
static receiveMessage(TraceAssert traceAssert, InetSocketAddress address, DDSpan parentSpan, String queueName = "SpringListenerSQS") {
189247
traceAssert.span {
190248
serviceName "sqs"
191249
operationName "aws.http"
@@ -201,7 +259,7 @@ class SpringListenerSQSTest extends InstrumentationSpecification {
201259
"aws_service" "Sqs"
202260
"aws.operation" "ReceiveMessage"
203261
"aws.agent" "java-aws-sdk"
204-
"aws.queue.url" "http://localhost:${address.port}/000000000000/SpringListenerSQS"
262+
"aws.queue.url" "http://localhost:${address.port}/000000000000/${queueName}"
205263
"aws.requestId" "00000000-0000-0000-0000-000000000000"
206264
defaultTags(true)
207265
}
@@ -225,7 +283,7 @@ class SpringListenerSQSTest extends InstrumentationSpecification {
225283
}
226284
}
227285

228-
static deleteMessageBatch(TraceAssert traceAssert, InetSocketAddress address) {
286+
static deleteMessageBatch(TraceAssert traceAssert, InetSocketAddress address, String queueName = "SpringListenerSQS") {
229287
traceAssert.span {
230288
serviceName "sqs"
231289
operationName "aws.http"
@@ -245,7 +303,7 @@ class SpringListenerSQSTest extends InstrumentationSpecification {
245303
"aws_service" "Sqs"
246304
"aws.operation" "DeleteMessageBatch"
247305
"aws.agent" "java-aws-sdk"
248-
"aws.queue.url" "http://localhost:${address.port}/000000000000/SpringListenerSQS"
306+
"aws.queue.url" "http://localhost:${address.port}/000000000000/${queueName}"
249307
"aws.requestId" "00000000-0000-0000-0000-000000000000"
250308
urlTags("http://localhost:${address.port}/", ExpectedQueryParams.getExpectedQueryParams("DeleteMessageBatch"))
251309
defaultTags()
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,30 @@
11
package listener
22

3+
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan
4+
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan
5+
36
import io.awspring.cloud.sqs.annotation.SqsListener
47
import org.springframework.stereotype.Component
58

9+
import java.util.concurrent.CompletableFuture
10+
611
@Component
712
class TestListener {
813
@SqsListener(queueNames = "SpringListenerSQS")
914
void observe(String message) {
1015
println "Received $message"
1116
}
17+
18+
@SqsListener(queueNames = "SpringListenerSQSAsync")
19+
CompletableFuture<Void> observeAsync(String message) {
20+
return CompletableFuture.runAsync {
21+
Thread.sleep(500)
22+
// Asserting spring.consume root span is active during async execution
23+
def childSpan = startSpan("async.child")
24+
def childScope = activateSpan(childSpan)
25+
childScope.close()
26+
childSpan.finish()
27+
println "Async received $message"
28+
}
29+
}
1230
}

0 commit comments

Comments
 (0)