Skip to content

Commit 23c38f7

Browse files
committed
Add support for KotlinAwareInvocableHandlerMethod to pass the spring.consume context to a Kotlin `suspend` consume function
1 parent c04d61b commit 23c38f7

File tree

4 files changed

+355
-0
lines changed

4 files changed

+355
-0
lines changed

dd-java-agent/instrumentation/spring/spring-messaging-4.0/build.gradle

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,24 @@
1+
import org.jetbrains.kotlin.gradle.dsl.JvmTarget
2+
import org.jetbrains.kotlin.gradle.dsl.KotlinVersion
3+
4+
plugins {
5+
id 'org.jetbrains.kotlin.jvm'
6+
}
7+
18
muzzle {
29
pass {
310
group = 'org.springframework'
411
module = 'spring-messaging'
512
versions = "[4.0.0.RELEASE,)"
613
assertInverse = true
14+
// KotlinAwareHandlerInstrumentation references Publisher from reactive-streams,
15+
// which is not bundled in spring-messaging but is always present when Spring Kafka is.
16+
extraDependency 'org.reactivestreams:reactive-streams:1.0.4'
717
}
818
}
919

1020
apply from: "$rootDir/gradle/java.gradle"
21+
apply from: "$rootDir/gradle/test-with-kotlin.gradle"
1122

1223
testJvmConstraints {
1324
minJavaVersion = JavaVersion.VERSION_17
@@ -16,13 +27,24 @@ testJvmConstraints {
1627
addTestSuiteForDir('latestDepTest', 'test')
1728

1829
["compileTestGroovy", "compileLatestDepTestGroovy"].each { name ->
30+
def kotlinTaskName = name.replace("Groovy", "Kotlin")
1931
tasks.named(name, GroovyCompile) {
2032
configureCompiler(it, 17)
33+
classpath += files(tasks.named(kotlinTaskName).map { it.destinationDirectory })
34+
}
35+
}
36+
37+
kotlin {
38+
compilerOptions {
39+
jvmTarget = JvmTarget.JVM_1_8
40+
apiVersion = KotlinVersion.KOTLIN_1_9
41+
languageVersion = KotlinVersion.KOTLIN_1_9
2142
}
2243
}
2344

2445
dependencies {
2546
compileOnly group: 'org.springframework', name: 'spring-messaging', version: '4.0.0.RELEASE'
47+
compileOnly 'org.reactivestreams:reactive-streams:1.0.4'
2648
testImplementation project(':dd-java-agent:instrumentation:aws-java:aws-java-common')
2749

2850
// capture SQS send and receive spans, propagate trace details in messages
@@ -36,6 +58,32 @@ dependencies {
3658
}
3759
testImplementation group: 'org.elasticmq', name: 'elasticmq-rest-sqs_2.13', version: '1.2.3'
3860

61+
// Spring Kafka + embedded Kafka broker for coroutine tests
62+
testImplementation group: 'org.springframework.kafka', name: 'spring-kafka', version: '3.3.4', {
63+
exclude group: 'org.apache.kafka'
64+
}
65+
testImplementation group: 'org.springframework.kafka', name: 'spring-kafka-test', version: '3.3.4', {
66+
exclude group: 'org.apache.kafka'
67+
}
68+
69+
// KotlinAwareHandlerInstrumentation relies on the reactive-streams instrumentation
70+
testImplementation project(':dd-java-agent:instrumentation:reactive-streams-1.0')
71+
72+
testImplementation 'org.apache.kafka:kafka-server-common:3.8.0:test'
73+
testImplementation 'org.apache.kafka:kafka-clients:3.8.0'
74+
testImplementation 'org.apache.kafka:kafka-clients:3.8.0:test'
75+
testImplementation 'org.apache.kafka:kafka_2.13:3.8.0'
76+
testImplementation 'org.apache.kafka:kafka_2.13:3.8.0:test'
77+
78+
testImplementation libs.kotlin
79+
testImplementation "org.jetbrains.kotlin:kotlin-reflect"
80+
testImplementation "org.jetbrains.kotlinx:kotlinx-coroutines-core-jvm:1.8.+"
81+
testImplementation "org.jetbrains.kotlinx:kotlinx-coroutines-reactor:1.8.+"
82+
testImplementation "io.projectreactor:reactor-core:3.+"
83+
84+
testRuntimeOnly project(':dd-java-agent:instrumentation:kotlin-coroutines-1.3')
85+
testRuntimeOnly project(':dd-java-agent:instrumentation:kafka:kafka-clients-3.8')
86+
3987
latestDepTestImplementation group: 'org.springframework', name: 'spring-messaging', version: '6.+', {
4088
exclude group: 'org.slf4j', module: 'slf4j-api'
4189
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
package datadog.trace.instrumentation.springmessaging;

import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;

import com.google.auto.service.AutoService;
import datadog.context.Context;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.bootstrap.InstrumentationContext;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import net.bytebuddy.asm.Advice;
import org.reactivestreams.Publisher;

/**
 * Instruments {@code KotlinAwareInvocableHandlerMethod.doInvoke()} to attach the current {@link
 * Context} to the returned {@link Publisher} so that the reactive-streams instrumentation activates
 * it during subscription.
 *
 * <p>When a Spring Kafka listener is a Kotlin {@code suspend fun}, {@code
 * KotlinAwareInvocableHandlerMethod.doInvoke()} returns a cold {@code Mono} immediately, before the
 * listener body runs. By the time the {@code Mono} is subscribed (and the underlying {@code
 * AbstractCoroutine} is constructed), the {@code spring.consume} scope opened by {@link
 * SpringMessageHandlerInstrumentation} has already been closed. This advice captures {@link
 * Context#current()} at {@code doInvoke()} exit — while {@code spring.consume} is still active —
 * and stores it on the Publisher. The reactive-streams {@code PublisherInstrumentation} then
 * retrieves and activates it during subscription so that {@code DatadogThreadContextElement} picks
 * up the correct parent context when the underlying {@code AbstractCoroutine} is constructed.
 */
@AutoService(InstrumenterModule.class)
public class KotlinAwareHandlerInstrumentation extends InstrumenterModule.Tracing
    implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice {

  public KotlinAwareHandlerInstrumentation() {
    super("spring-messaging", "spring-messaging-4");
  }

  @Override
  public Map<String, String> contextStore() {
    // Maps each returned Publisher to the Context captured at doInvoke() exit; read back by the
    // reactive-streams PublisherInstrumentation during subscription.
    return Collections.singletonMap("org.reactivestreams.Publisher", Context.class.getName());
  }

  @Override
  public List<Instrumenter> typeInstrumentations() {
    // Return this module instance rather than allocating a second one: the registered module and
    // the Instrumenter applying the advice must be the same object so the context-store
    // declaration above and the advice below stay associated.
    return Collections.singletonList(this);
  }

  @Override
  public String instrumentedType() {
    return "org.springframework.kafka.listener.adapter.KotlinAwareInvocableHandlerMethod";
  }

  @Override
  public void methodAdvice(MethodTransformer transformer) {
    transformer.applyAdvice(
        isMethod().and(named("doInvoke")),
        KotlinAwareHandlerInstrumentation.class.getName() + "$DoInvokeAdvice");
  }

  public static class DoInvokeAdvice {
    /**
     * Runs at {@code doInvoke()} exit, while the {@code spring.consume} scope is still open, and
     * stashes the active {@link Context} on the returned {@link Publisher} (non-Publisher returns
     * are ignored — only suspend-fun listeners yield a Mono here).
     */
    @Advice.OnMethodExit(suppress = Throwable.class)
    public static void onExit(@Advice.Return Object result) {
      if (result instanceof Publisher) {
        InstrumentationContext.get(Publisher.class, Context.class)
            .put((Publisher<?>) result, Context.current());
      }
    }
  }
}
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
import datadog.trace.agent.test.InstrumentationSpecification
import datadog.trace.agent.test.asserts.TraceAssert
import listener.KafkaBatchCoroutineConfig
import listener.KafkaBatchCoroutineListener
import datadog.trace.api.DDSpanTypes
import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags
import datadog.trace.bootstrap.instrumentation.api.Tags
import datadog.trace.core.DDSpan
import org.apache.kafka.clients.producer.ProducerRecord
import org.springframework.context.annotation.AnnotationConfigApplicationContext
import org.springframework.kafka.config.KafkaListenerEndpointRegistry
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.test.EmbeddedKafkaBroker
import org.springframework.kafka.test.utils.ContainerTestUtils

import java.util.concurrent.TimeUnit

/**
 * Verifies that when a batch @KafkaListener is a Kotlin suspend fun, a span started inside the
 * coroutine body ("child.work") lands in the same trace as the "spring.consume" span — i.e. the
 * captured context survives the hop from doInvoke() to coroutine execution.
 */
class KafkaBatchListenerCoroutineTest extends InstrumentationSpecification {

  private static final String TOPIC = "batch-coroutine-topic"
  private static final String CONSUMER_GROUP = "batch-coroutine-group"

  def "batch @KafkaListener suspend fun - spans must be in the same trace as kafka.consume"() {
    setup:
    def appContext = new AnnotationConfigApplicationContext(KafkaBatchCoroutineConfig)
    def listener = appContext.getBean(KafkaBatchCoroutineListener)
    def template = appContext.getBean(KafkaTemplate)
    def broker = appContext.getBean(EmbeddedKafkaBroker)
    def registry = appContext.getBean(KafkaListenerEndpointRegistry)

    // Wait until listener container has been assigned partitions before sending.
    registry.listenerContainers.each { container ->
      ContainerTestUtils.waitForAssignment(container, broker.partitionsPerTopic)
    }

    // Drop any traces produced during container startup/assignment so assertions below
    // only see the spans generated by this test's sends.
    TEST_WRITER.clear()

    when: "two messages are sent before the consumer polls so they arrive in one batch"
    // Stopping the containers first guarantees both records are buffered on the broker
    // and delivered in a single poll (one batch) once the containers restart.
    registry.listenerContainers.each { it.stop() }
    template.send(new ProducerRecord(TOPIC, "key", "hello-batch"))
    template.send(new ProducerRecord(TOPIC, "key", "hello-batch"))
    template.flush()
    registry.listenerContainers.each { it.start() }

    then: "the listener processes the batch within 15 s"
    listener.latch.await(15, TimeUnit.SECONDS)
    listener.receivedValues == ["hello-batch", "hello-batch"]

    and: "child.work is a child of spring.consume"
    DDSpan produce1Span, produce2Span, springConsumeParent
    assertTraces(10, SORT_TRACES_BY_ID) {
      // Two producer traces, one per send.
      trace(1) {
        produceSpan(it)
        produce1Span = span(0)
      }
      trace(1) {
        produceSpan(it)
        produce2Span = span(0)
      }

      // kafka.consume spans, each linked back to its producing span via context propagation.
      trace(1) { kafkaConsumeSpan(it, produce1Span, 0) }
      trace(1) { kafkaConsumeSpan(it, produce2Span, 1) }
      trace(1) { kafkaConsumeSpan(it, produce1Span, 0) }
      trace(1) { kafkaConsumeSpan(it, produce2Span, 1) }

      trace(1) {
        // consume messages in one batch
        springConsumeSpan(it)
        springConsumeParent = span(0)
      }
      // child work span connected to the spring consume span
      trace(1) { childWorkSpan(it, springConsumeParent) }

      trace(1) { kafkaConsumeSpan(it, produce1Span, 0) }
      trace(1) { kafkaConsumeSpan(it, produce2Span, 1) }
    }

    cleanup:
    appContext.close()
  }

  // Asserts a root kafka.produce span with the standard producer tags.
  private static void produceSpan(TraceAssert trace) {
    trace.span {
      operationName "kafka.produce"
      resourceName "Produce Topic $TOPIC"
      spanType "queue"
      errored false
      measured true
      parent()
      tags {
        "$Tags.COMPONENT" "java-kafka"
        "$Tags.SPAN_KIND" Tags.SPAN_KIND_PRODUCER
        "$InstrumentationTags.MESSAGING_DESTINATION_NAME" TOPIC
        "$InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS" { String }
        peerServiceFrom(InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS)
        defaultTags()
      }
    }
  }

  // Asserts a kafka.consume span that is a child of the given produce span, at the given offset.
  private static void kafkaConsumeSpan(TraceAssert trace, DDSpan parent, int offset) {
    trace.span {
      operationName "kafka.consume"
      resourceName "Consume Topic $TOPIC"
      spanType "queue"
      errored false
      measured true
      childOf parent
      tags {
        "$Tags.COMPONENT" "java-kafka"
        "$Tags.SPAN_KIND" Tags.SPAN_KIND_CONSUMER
        "$InstrumentationTags.MESSAGING_DESTINATION_NAME" TOPIC
        "$InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS" { String }
        peerServiceFrom(InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS)
        "$InstrumentationTags.CONSUMER_GROUP" CONSUMER_GROUP
        "$InstrumentationTags.OFFSET" offset
        "$InstrumentationTags.PARTITION" { Integer }
        "$InstrumentationTags.RECORD_QUEUE_TIME_MS" { Long }
        "$InstrumentationTags.RECORD_END_TO_END_DURATION_MS" { Long }
        defaultTags(true)
      }
    }
  }

  // Asserts the root spring.consume span emitted around the listener method invocation.
  private static void springConsumeSpan(TraceAssert trace) {
    trace.span {
      operationName "spring.consume"
      resourceName "KafkaBatchCoroutineListener.consume"
      spanType DDSpanTypes.MESSAGE_CONSUMER
      errored false
      measured true
      parent()
      tags {
        "$Tags.COMPONENT" "spring-messaging"
        "$Tags.SPAN_KIND" Tags.SPAN_KIND_CONSUMER
        defaultTags(true)
      }
    }
  }

  // Asserts the span started inside the coroutine body is parented to spring.consume.
  private static void childWorkSpan(TraceAssert trace, DDSpan parent) {
    trace.span {
      operationName "child.work"
      childOf parent
      tags { defaultTags() }
    }
  }
}
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
package listener

import datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.annotation.EnableKafka
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.test.EmbeddedKafkaBroker
import org.springframework.kafka.test.EmbeddedKafkaKraftBroker
import org.springframework.kafka.test.utils.KafkaTestUtils
import org.springframework.stereotype.Component
import java.util.concurrent.CountDownLatch

// Topic name must match the TOPIC constant used by the Groovy test class.
const val BATCH_COROUTINE_TOPIC = "batch-coroutine-topic"

/**
 * Spring test configuration wiring an embedded KRaft Kafka broker, a String/String
 * producer/consumer pair, and a batch-mode listener container factory for the
 * suspend-fun batch-listener test.
 */
@Configuration(proxyBeanMethods = false)
@EnableKafka
class KafkaBatchCoroutineConfig {

  // destroyMethod = "destroy" shuts the embedded broker down when the context closes.
  @Bean(destroyMethod = "destroy")
  fun embeddedKafkaBroker(): EmbeddedKafkaBroker {
    // 1 broker, 2 partitions — two partitions so the two test records can land on
    // distinct partitions (the test asserts per-partition offsets 0 and 1).
    val broker = EmbeddedKafkaKraftBroker(1, 2, BATCH_COROUTINE_TOPIC)
    broker.afterPropertiesSet()
    return broker
  }

  @Bean
  fun producerFactory(broker: EmbeddedKafkaBroker): DefaultKafkaProducerFactory<String, String> {
    val props = HashMap<String, Any>(KafkaTestUtils.producerProps(broker.brokersAsString))
    props["key.serializer"] = StringSerializer::class.java.name
    props["value.serializer"] = StringSerializer::class.java.name
    return DefaultKafkaProducerFactory(props)
  }

  @Bean
  fun kafkaTemplate(pf: DefaultKafkaProducerFactory<String, String>) = KafkaTemplate(pf)

  @Bean
  fun consumerFactory(broker: EmbeddedKafkaBroker): DefaultKafkaConsumerFactory<String, String> {
    // "false" disables auto-commit; AUTO_OFFSET_RESET "earliest" makes the consumer read
    // records sent while its container was stopped.
    val props = HashMap<String, Any>(KafkaTestUtils.consumerProps("batch-coroutine-group", "false", broker))
    props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
    props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name
    props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name
    return DefaultKafkaConsumerFactory(props)
  }

  @Bean
  fun batchListenerContainerFactory(
    cf: DefaultKafkaConsumerFactory<String, String>
  ): ConcurrentKafkaListenerContainerFactory<String, String> {
    val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
    factory.consumerFactory = cf
    // Batch mode: the listener receives List<ConsumerRecord> per poll instead of one record.
    factory.isBatchListener = true
    return factory
  }

  @Bean
  fun kafkaBatchCoroutineListener() = KafkaBatchCoroutineListener()
}
68+
69+
@Component
class KafkaBatchCoroutineListener {

  // Released after the first batch is fully processed; the test awaits it with a timeout.
  val latch = CountDownLatch(1)
  // Values seen by the listener, in arrival order; the test asserts its contents.
  val receivedValues = mutableListOf<String>()

  /**
   * Batch suspend-fun listener. Spring Kafka routes this through
   * KotlinAwareInvocableHandlerMethod, which returns a cold Mono — the case the
   * instrumentation under test must handle.
   */
  @KafkaListener(
    topics = [BATCH_COROUTINE_TOPIC],
    containerFactory = "batchListenerContainerFactory"
  )
  suspend fun consume(records: List<ConsumerRecord<String, String>>) {
    // Create a child span inside the coroutine body.
    // It should be linked to spring.consume, which should be linked to kafka.consume.
    val childSpan = startSpan("child.work")
    records.forEach { receivedValues.add(it.value()) }
    childSpan.finish()
    latch.countDown()
  }
}

0 commit comments

Comments
 (0)