Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,6 @@
*/
public abstract class AsyncResultDecorator extends BaseDecorator {

private static final ClassValue<AsyncResultExtension> EXTENSION_CLASS_VALUE =
new ClassValue<AsyncResultExtension>() {
@Override
protected AsyncResultExtension computeValue(Class<?> type) {
return AsyncResultExtensions.registered().stream()
.filter(extension -> extension.supports(type))
.findFirst()
.orElse(null);
}
};

/**
* Look for asynchronous result and decorate it with span finisher. If the result is not
* asynchronous, it will be return unmodified and span will be finished.
Expand All @@ -33,12 +22,9 @@ protected AsyncResultExtension computeValue(Class<?> type) {
*/
public Object wrapAsyncResultOrFinishSpan(
final Object result, final Class<?> methodReturnType, final AgentSpan span) {
AsyncResultExtension extension;
if (result != null && (extension = EXTENSION_CLASS_VALUE.get(methodReturnType)) != null) {
Object applied = extension.apply(result, span);
if (applied != null) {
return applied;
}
Object applied = AsyncResultExtensions.wrapAsyncResult(result, methodReturnType, span);
if (applied != null) {
return applied;
}
// If no extension was applied, immediately finish the span and return the original result
span.finish();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,32 @@ public final class AsyncResultExtensions {
private static final List<AsyncResultExtension> EXTENSIONS =
new CopyOnWriteArrayList<>(singletonList(new CompletableAsyncResultExtension()));

private static final ClassValue<AsyncResultExtension> EXTENSION_CLASS_VALUE =
new ClassValue<AsyncResultExtension>() {
@Override
protected AsyncResultExtension computeValue(Class<?> type) {
return EXTENSIONS.stream()
.filter(extension -> extension.supports(type))
.findFirst()
.orElse(null);
}
};

/**
* Wraps a supported async result so the span is finished when the async computation completes.
*
* @return the wrapped async result, or {@code null} if the result type is unsupported or no
* wrapping is applied
*/
public static Object wrapAsyncResult(
final Object result, final Class<?> resultType, final AgentSpan span) {
AsyncResultExtension extension;
if (result != null && (extension = EXTENSION_CLASS_VALUE.get(resultType)) != null) {
return extension.apply(result, span);
}
return null;
}

/**
* Registers an extension to add supported async types.
*
Expand All @@ -36,11 +62,6 @@ public static void register(AsyncResultExtension extension) {
}
}

/** Returns the list of currently registered extensions. */
public static List<AsyncResultExtension> registered() {
return EXTENSIONS;
}

static final class CompletableAsyncResultExtension implements AsyncResultExtension {
@Override
public boolean supports(Class<?> result) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public static void init() {}

@Override
public boolean supports(Class<?> result) {
return result == Flux.class || result == Mono.class;
return Flux.class.isAssignableFrom(result) || Mono.class.isAssignableFrom(result);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,24 @@
import org.jetbrains.kotlin.gradle.dsl.JvmTarget
import org.jetbrains.kotlin.gradle.dsl.KotlinVersion

plugins {
id 'org.jetbrains.kotlin.jvm'
}

muzzle {
pass {
group = 'org.springframework'
module = 'spring-messaging'
versions = "[4.0.0.RELEASE,)"
assertInverse = true
// KotlinAwareHandlerInstrumentation references Publisher from reactive-streams,
// which is not bundled in spring-messaging but is always present when Spring Kafka is.
extraDependency 'org.reactivestreams:reactive-streams:1.0.4'
}
}

apply from: "$rootDir/gradle/java.gradle"
apply from: "$rootDir/gradle/test-with-kotlin.gradle"

testJvmConstraints {
minJavaVersion = JavaVersion.VERSION_17
Expand All @@ -16,13 +27,24 @@ testJvmConstraints {
addTestSuiteForDir('latestDepTest', 'test')

["compileTestGroovy", "compileLatestDepTestGroovy"].each { name ->
def kotlinTaskName = name.replace("Groovy", "Kotlin")
tasks.named(name, GroovyCompile) {
configureCompiler(it, 17)
classpath += files(tasks.named(kotlinTaskName).map { it.destinationDirectory })
}
}

kotlin {
compilerOptions {
jvmTarget = JvmTarget.JVM_1_8
apiVersion = KotlinVersion.KOTLIN_1_9
languageVersion = KotlinVersion.KOTLIN_1_9
}
}

dependencies {
compileOnly group: 'org.springframework', name: 'spring-messaging', version: '4.0.0.RELEASE'
compileOnly 'org.reactivestreams:reactive-streams:1.0.4'
testImplementation project(':dd-java-agent:instrumentation:aws-java:aws-java-common')

// capture SQS send and receive spans, propagate trace details in messages
Expand All @@ -36,6 +58,33 @@ dependencies {
}
testImplementation group: 'org.elasticmq', name: 'elasticmq-rest-sqs_2.13', version: '1.2.3'

// Spring Kafka + embedded Kafka broker for coroutine tests
testImplementation group: 'org.springframework.kafka', name: 'spring-kafka', version: '3.3.4', {
exclude group: 'org.apache.kafka'
}
testImplementation group: 'org.springframework.kafka', name: 'spring-kafka-test', version: '3.3.4', {
exclude group: 'org.apache.kafka'
}

// KotlinAwareHandlerInstrumentation relies on the reactive-streams and reactor instrumentation
testImplementation project(':dd-java-agent:instrumentation:reactive-streams-1.0')
testImplementation project(':dd-java-agent:instrumentation:reactor-core-3.1')

testImplementation 'org.apache.kafka:kafka-server-common:3.8.0:test'
testImplementation 'org.apache.kafka:kafka-clients:3.8.0'
testImplementation 'org.apache.kafka:kafka-clients:3.8.0:test'
testImplementation 'org.apache.kafka:kafka_2.13:3.8.0'
testImplementation 'org.apache.kafka:kafka_2.13:3.8.0:test'

testImplementation libs.kotlin
testImplementation "org.jetbrains.kotlin:kotlin-reflect"
testImplementation "org.jetbrains.kotlinx:kotlinx-coroutines-core-jvm:1.8.+"
testImplementation "org.jetbrains.kotlinx:kotlinx-coroutines-reactor:1.8.+"
testImplementation "io.projectreactor:reactor-core:3.+"

testRuntimeOnly project(':dd-java-agent:instrumentation:kotlin-coroutines-1.3')
testRuntimeOnly project(':dd-java-agent:instrumentation:kafka:kafka-clients-3.8')

latestDepTestImplementation group: 'org.springframework', name: 'spring-messaging', version: '6.+', {
exclude group: 'org.slf4j', module: 'slf4j-api'
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package datadog.trace.instrumentation.springmessaging;

import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;

import com.google.auto.service.AutoService;
import datadog.context.Context;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.bootstrap.InstrumentationContext;
import java.util.Collections;
import java.util.Map;
import net.bytebuddy.asm.Advice;
import org.reactivestreams.Publisher;

/**
* Instruments {@code KotlinAwareInvocableHandlerMethod.doInvoke()} to attach the current {@link
* Context} to the returned {@link Publisher} so that the reactive-streams instrumentation activates
* it during subscription.
*
* <p>When a Spring Kafka listener is a Kotlin {@code suspend fun}, {@code
* KotlinAwareInvocableHandlerMethod.doInvoke()} returns a cold {@code Mono} immediately, before the
* listener body runs. By the time the {@code Mono} is subscribed (and the underlying {@code
* AbstractCoroutine} is constructed), the {@code spring.consume} scope opened by {@link
* SpringMessageHandlerInstrumentation} has already been closed. This advice captures {@link
* Context#current()} at {@code doInvoke()} exit — while {@code spring.consume} is still active —
* and stores it on the Publisher. The reactive-streams {@code PublisherInstrumentation} then
* retrieves and activates it during subscription so that {@code DatadogThreadContextElement} picks
* up the correct parent context when the underlying {@code AbstractCoroutine} is constructed.
*/
@AutoService(InstrumenterModule.class)
public class KotlinAwareHandlerInstrumentation extends InstrumenterModule.Tracing
implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice {

public KotlinAwareHandlerInstrumentation() {
super("spring-messaging", "spring-messaging-4", "spring-messaging-kotlin");
}

@Override
public Map<String, String> contextStore() {
return Collections.singletonMap("org.reactivestreams.Publisher", Context.class.getName());
}

@Override
public String instrumentedType() {
return "org.springframework.kafka.listener.adapter.KotlinAwareInvocableHandlerMethod";
}

@Override
public void methodAdvice(MethodTransformer transformer) {
transformer.applyAdvice(
isMethod().and(named("doInvoke")),
KotlinAwareHandlerInstrumentation.class.getName() + "$DoInvokeAdvice");
}

public static class DoInvokeAdvice {

@Advice.OnMethodExit(suppress = Throwable.class)
public static void onExit(@Advice.Return Object result) {
if (result instanceof Publisher) {
InstrumentationContext.get(Publisher.class, Context.class)
.put((Publisher<?>) result, Context.current());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.java.concurrent.AsyncResultExtensions;
import java.util.concurrent.CompletionStage;
import net.bytebuddy.asm.Advice;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
Expand Down Expand Up @@ -55,12 +54,13 @@ public String[] helperClassNames() {
return new String[] {
packageName + ".SpringMessageDecorator",
packageName + ".SpringMessageExtractAdapter",
packageName + ".SpringMessageExtractAdapter$1"
packageName + ".SpringMessageExtractAdapter$1",
};
}

@AppliesOn(CONTEXT_TRACKING)
public static class ContextPropagationAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.Argument(0) Message<?> message, @Advice.Local("ctxScope") ContextScope scope) {
Expand All @@ -77,6 +77,7 @@ public static void onExit(@Advice.Local("ctxScope") ContextScope scope) {
}

public static class HandleMessageAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static AgentScope onEnter(@Advice.This InvocableHandlerMethod thiz) {
AgentSpan span = startSpan(SPRING_INBOUND);
Expand All @@ -95,15 +96,20 @@ public static void onExit(
}
AgentSpan span = scope.span();
scope.close();
if (result instanceof CompletionStage) {
result = ((CompletionStage<?>) result).whenComplete(AsyncResultExtensions.finishSpan(span));
} else {
if (null != error) {
DECORATE.onError(span, error);
if (null != error) {
DECORATE.onError(span, error);
}
if (result != null) {
Object wrappedResult =
AsyncResultExtensions.wrapAsyncResult(result, result.getClass(), span);
if (wrappedResult != null) {
result = wrappedResult;
// span will be finished by the wrapper
return;
}
DECORATE.beforeFinish(span);
span.finish();
}
DECORATE.beforeFinish(span);
span.finish();
}
}
}
Loading
Loading