Skip to content

Commit 3123b19

Browse files
authored
fix: spring messaging kotlin async aware instrumentation (#11047)
Add support for KotlinAwareInvocableHandlerMethod to pass the spring.consume context to a kotlin suspend consume fun Keep spring.consume span open for async result using AsyncResultDecorator Merge branch 'master' into ygree/kotlin-aware-spring-kafka-batch-listener-instrumentation Assert span length in KafkaBatchListenerCoroutineTest Move wrapAsyncResult to AsyncResultExtensions Remove unused registered() Strengthen spring messaging async span tests Refine Spring messaging Kotlin handler instrumentation Merge branch 'master' into ygree/spring-messaging-async-aware-instrumentation Co-authored-by: yury.gribkov <yury.gribkov@datadoghq.com>
1 parent 0d8066b commit 3123b19

File tree

11 files changed

+485
-38
lines changed

11 files changed

+485
-38
lines changed

dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/decorator/AsyncResultDecorator.java

Lines changed: 3 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -11,17 +11,6 @@
1111
*/
1212
public abstract class AsyncResultDecorator extends BaseDecorator {
1313

14-
private static final ClassValue<AsyncResultExtension> EXTENSION_CLASS_VALUE =
15-
new ClassValue<AsyncResultExtension>() {
16-
@Override
17-
protected AsyncResultExtension computeValue(Class<?> type) {
18-
return AsyncResultExtensions.registered().stream()
19-
.filter(extension -> extension.supports(type))
20-
.findFirst()
21-
.orElse(null);
22-
}
23-
};
24-
2514
/**
2615
* Look for asynchronous result and decorate it with span finisher. If the result is not
2716
* asynchronous, it will be return unmodified and span will be finished.
@@ -33,12 +22,9 @@ protected AsyncResultExtension computeValue(Class<?> type) {
3322
*/
3423
public Object wrapAsyncResultOrFinishSpan(
3524
final Object result, final Class<?> methodReturnType, final AgentSpan span) {
36-
AsyncResultExtension extension;
37-
if (result != null && (extension = EXTENSION_CLASS_VALUE.get(methodReturnType)) != null) {
38-
Object applied = extension.apply(result, span);
39-
if (applied != null) {
40-
return applied;
41-
}
25+
Object applied = AsyncResultExtensions.wrapAsyncResult(result, methodReturnType, span);
26+
if (applied != null) {
27+
return applied;
4228
}
4329
// If no extension was applied, immediately finish the span and return the original result
4430
span.finish();

dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/java/concurrent/AsyncResultExtensions.java

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,32 @@ public final class AsyncResultExtensions {
1616
private static final List<AsyncResultExtension> EXTENSIONS =
1717
new CopyOnWriteArrayList<>(singletonList(new CompletableAsyncResultExtension()));
1818

19+
private static final ClassValue<AsyncResultExtension> EXTENSION_CLASS_VALUE =
20+
new ClassValue<AsyncResultExtension>() {
21+
@Override
22+
protected AsyncResultExtension computeValue(Class<?> type) {
23+
return EXTENSIONS.stream()
24+
.filter(extension -> extension.supports(type))
25+
.findFirst()
26+
.orElse(null);
27+
}
28+
};
29+
30+
/**
31+
* Wraps a supported async result so the span is finished when the async computation completes.
32+
*
33+
* @return the wrapped async result, or {@code null} if the result type is unsupported or no
34+
* wrapping is applied
35+
*/
36+
public static Object wrapAsyncResult(
37+
final Object result, final Class<?> resultType, final AgentSpan span) {
38+
AsyncResultExtension extension;
39+
if (result != null && (extension = EXTENSION_CLASS_VALUE.get(resultType)) != null) {
40+
return extension.apply(result, span);
41+
}
42+
return null;
43+
}
44+
1945
/**
2046
* Registers an extension to add supported async types.
2147
*
@@ -36,11 +62,6 @@ public static void register(AsyncResultExtension extension) {
3662
}
3763
}
3864

39-
/** Returns the list of currently registered extensions. */
40-
public static List<AsyncResultExtension> registered() {
41-
return EXTENSIONS;
42-
}
43-
4465
static final class CompletableAsyncResultExtension implements AsyncResultExtension {
4566
@Override
4667
public boolean supports(Class<?> result) {

dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/ReactorAsyncResultExtension.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ public static void init() {}
2323

2424
@Override
2525
public boolean supports(Class<?> result) {
26-
return result == Flux.class || result == Mono.class;
26+
return Flux.class.isAssignableFrom(result) || Mono.class.isAssignableFrom(result);
2727
}
2828

2929
@Override

dd-java-agent/instrumentation/spring/spring-messaging-4.0/build.gradle

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,24 @@
1+
import org.jetbrains.kotlin.gradle.dsl.JvmTarget
2+
import org.jetbrains.kotlin.gradle.dsl.KotlinVersion
3+
4+
plugins {
5+
id 'org.jetbrains.kotlin.jvm'
6+
}
7+
18
muzzle {
29
pass {
310
group = 'org.springframework'
411
module = 'spring-messaging'
512
versions = "[4.0.0.RELEASE,)"
613
assertInverse = true
14+
// KotlinAwareHandlerInstrumentation references Publisher from reactive-streams,
15+
// which is not bundled in spring-messaging but is always present when Spring Kafka is.
16+
extraDependency 'org.reactivestreams:reactive-streams:1.0.4'
717
}
818
}
919

1020
apply from: "$rootDir/gradle/java.gradle"
21+
apply from: "$rootDir/gradle/test-with-kotlin.gradle"
1122

1223
testJvmConstraints {
1324
minJavaVersion = JavaVersion.VERSION_17
@@ -16,13 +27,24 @@ testJvmConstraints {
1627
addTestSuiteForDir('latestDepTest', 'test')
1728

1829
["compileTestGroovy", "compileLatestDepTestGroovy"].each { name ->
30+
def kotlinTaskName = name.replace("Groovy", "Kotlin")
1931
tasks.named(name, GroovyCompile) {
2032
configureCompiler(it, 17)
33+
classpath += files(tasks.named(kotlinTaskName).map { it.destinationDirectory })
34+
}
35+
}
36+
37+
kotlin {
38+
compilerOptions {
39+
jvmTarget = JvmTarget.JVM_1_8
40+
apiVersion = KotlinVersion.KOTLIN_1_9
41+
languageVersion = KotlinVersion.KOTLIN_1_9
2142
}
2243
}
2344

2445
dependencies {
2546
compileOnly group: 'org.springframework', name: 'spring-messaging', version: '4.0.0.RELEASE'
47+
compileOnly 'org.reactivestreams:reactive-streams:1.0.4'
2648
testImplementation project(':dd-java-agent:instrumentation:aws-java:aws-java-common')
2749

2850
// capture SQS send and receive spans, propagate trace details in messages
@@ -36,6 +58,33 @@ dependencies {
3658
}
3759
testImplementation group: 'org.elasticmq', name: 'elasticmq-rest-sqs_2.13', version: '1.2.3'
3860

61+
// Spring Kafka + embedded Kafka broker for coroutine tests
62+
testImplementation group: 'org.springframework.kafka', name: 'spring-kafka', version: '3.3.4', {
63+
exclude group: 'org.apache.kafka'
64+
}
65+
testImplementation group: 'org.springframework.kafka', name: 'spring-kafka-test', version: '3.3.4', {
66+
exclude group: 'org.apache.kafka'
67+
}
68+
69+
// KotlinAwareHandlerInstrumentation relies on the reactive-streams and reactor instrumentation
70+
testImplementation project(':dd-java-agent:instrumentation:reactive-streams-1.0')
71+
testImplementation project(':dd-java-agent:instrumentation:reactor-core-3.1')
72+
73+
testImplementation 'org.apache.kafka:kafka-server-common:3.8.0:test'
74+
testImplementation 'org.apache.kafka:kafka-clients:3.8.0'
75+
testImplementation 'org.apache.kafka:kafka-clients:3.8.0:test'
76+
testImplementation 'org.apache.kafka:kafka_2.13:3.8.0'
77+
testImplementation 'org.apache.kafka:kafka_2.13:3.8.0:test'
78+
79+
testImplementation libs.kotlin
80+
testImplementation "org.jetbrains.kotlin:kotlin-reflect"
81+
testImplementation "org.jetbrains.kotlinx:kotlinx-coroutines-core-jvm:1.8.+"
82+
testImplementation "org.jetbrains.kotlinx:kotlinx-coroutines-reactor:1.8.+"
83+
testImplementation "io.projectreactor:reactor-core:3.+"
84+
85+
testRuntimeOnly project(':dd-java-agent:instrumentation:kotlin-coroutines-1.3')
86+
testRuntimeOnly project(':dd-java-agent:instrumentation:kafka:kafka-clients-3.8')
87+
3988
latestDepTestImplementation group: 'org.springframework', name: 'spring-messaging', version: '6.+', {
4089
exclude group: 'org.slf4j', module: 'slf4j-api'
4190
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
package datadog.trace.instrumentation.springmessaging;
2+
3+
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
4+
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
5+
6+
import com.google.auto.service.AutoService;
7+
import datadog.context.Context;
8+
import datadog.trace.agent.tooling.Instrumenter;
9+
import datadog.trace.agent.tooling.InstrumenterModule;
10+
import datadog.trace.bootstrap.InstrumentationContext;
11+
import java.util.Collections;
12+
import java.util.Map;
13+
import net.bytebuddy.asm.Advice;
14+
import org.reactivestreams.Publisher;
15+
16+
/**
17+
* Instruments {@code KotlinAwareInvocableHandlerMethod.doInvoke()} to attach the current {@link
18+
* Context} to the returned {@link Publisher} so that the reactive-streams instrumentation activates
19+
* it during subscription.
20+
*
21+
* <p>When a Spring Kafka listener is a Kotlin {@code suspend fun}, {@code
22+
* KotlinAwareInvocableHandlerMethod.doInvoke()} returns a cold {@code Mono} immediately, before the
23+
* listener body runs. By the time the {@code Mono} is subscribed (and the underlying {@code
24+
* AbstractCoroutine} is constructed), the {@code spring.consume} scope opened by {@link
25+
* SpringMessageHandlerInstrumentation} has already been closed. This advice captures {@link
26+
* Context#current()} at {@code doInvoke()} exit — while {@code spring.consume} is still active —
27+
* and stores it on the Publisher. The reactive-streams {@code PublisherInstrumentation} then
28+
* retrieves and activates it during subscription so that {@code DatadogThreadContextElement} picks
29+
* up the correct parent context when the underlying {@code AbstractCoroutine} is constructed.
30+
*/
31+
@AutoService(InstrumenterModule.class)
32+
public class KotlinAwareHandlerInstrumentation extends InstrumenterModule.Tracing
33+
implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice {
34+
35+
public KotlinAwareHandlerInstrumentation() {
36+
super("spring-messaging", "spring-messaging-4", "spring-messaging-kotlin");
37+
}
38+
39+
@Override
40+
public Map<String, String> contextStore() {
41+
return Collections.singletonMap("org.reactivestreams.Publisher", Context.class.getName());
42+
}
43+
44+
@Override
45+
public String instrumentedType() {
46+
return "org.springframework.kafka.listener.adapter.KotlinAwareInvocableHandlerMethod";
47+
}
48+
49+
@Override
50+
public void methodAdvice(MethodTransformer transformer) {
51+
transformer.applyAdvice(
52+
isMethod().and(named("doInvoke")),
53+
KotlinAwareHandlerInstrumentation.class.getName() + "$DoInvokeAdvice");
54+
}
55+
56+
public static class DoInvokeAdvice {
57+
58+
@Advice.OnMethodExit(suppress = Throwable.class)
59+
public static void onExit(@Advice.Return Object result) {
60+
if (result instanceof Publisher) {
61+
InstrumentationContext.get(Publisher.class, Context.class)
62+
.put((Publisher<?>) result, Context.current());
63+
}
64+
}
65+
}
66+
}

dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/SpringMessageHandlerInstrumentation.java

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
2222
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
2323
import datadog.trace.bootstrap.instrumentation.java.concurrent.AsyncResultExtensions;
24-
import java.util.concurrent.CompletionStage;
2524
import net.bytebuddy.asm.Advice;
2625
import org.springframework.messaging.Message;
2726
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
@@ -55,12 +54,13 @@ public String[] helperClassNames() {
5554
return new String[] {
5655
packageName + ".SpringMessageDecorator",
5756
packageName + ".SpringMessageExtractAdapter",
58-
packageName + ".SpringMessageExtractAdapter$1"
57+
packageName + ".SpringMessageExtractAdapter$1",
5958
};
6059
}
6160

6261
@AppliesOn(CONTEXT_TRACKING)
6362
public static class ContextPropagationAdvice {
63+
6464
@Advice.OnMethodEnter(suppress = Throwable.class)
6565
public static void onEnter(
6666
@Advice.Argument(0) Message<?> message, @Advice.Local("ctxScope") ContextScope scope) {
@@ -77,6 +77,7 @@ public static void onExit(@Advice.Local("ctxScope") ContextScope scope) {
7777
}
7878

7979
public static class HandleMessageAdvice {
80+
8081
@Advice.OnMethodEnter(suppress = Throwable.class)
8182
public static AgentScope onEnter(@Advice.This InvocableHandlerMethod thiz) {
8283
AgentSpan span = startSpan(SPRING_INBOUND);
@@ -95,15 +96,20 @@ public static void onExit(
9596
}
9697
AgentSpan span = scope.span();
9798
scope.close();
98-
if (result instanceof CompletionStage) {
99-
result = ((CompletionStage<?>) result).whenComplete(AsyncResultExtensions.finishSpan(span));
100-
} else {
101-
if (null != error) {
102-
DECORATE.onError(span, error);
99+
if (null != error) {
100+
DECORATE.onError(span, error);
101+
}
102+
if (result != null) {
103+
Object wrappedResult =
104+
AsyncResultExtensions.wrapAsyncResult(result, result.getClass(), span);
105+
if (wrappedResult != null) {
106+
result = wrappedResult;
107+
// span will be finished by the wrapper
108+
return;
103109
}
104-
DECORATE.beforeFinish(span);
105-
span.finish();
106110
}
111+
DECORATE.beforeFinish(span);
112+
span.finish();
107113
}
108114
}
109115
}

0 commit comments

Comments
 (0)