11import datadog.trace.agent.test.AgentTestRunner
2- import datadog.trace.api.Config
32import org.apache.kafka.clients.consumer.ConsumerRecord
43import org.apache.kafka.common.serialization.Serdes
54import org.apache.kafka.streams.KafkaStreams
65import org.apache.kafka.streams.StreamsConfig
76import org.apache.kafka.streams.kstream.KStream
8- import org.apache.kafka.streams.kstream.KStreamBuilder
97import org.apache.kafka.streams.kstream.ValueMapper
108import org.junit.ClassRule
119import org.springframework.kafka.core.DefaultKafkaConsumerFactory
1210import org.springframework.kafka.core.DefaultKafkaProducerFactory
1311import org.springframework.kafka.core.KafkaTemplate
1412import org.springframework.kafka.listener.KafkaMessageListenerContainer
1513import org.springframework.kafka.listener.MessageListener
16- import org.springframework.kafka.listener.config.ContainerProperties
1714import org.springframework.kafka.test.rule.KafkaEmbedded
1815import org.springframework.kafka.test.utils.ContainerTestUtils
1916import org.springframework.kafka.test.utils.KafkaTestUtils
@@ -41,7 +38,15 @@ class KafkaStreamsTest extends AgentTestRunner {
4138
4239 // CONFIGURE CONSUMER
4340 def consumerFactory = new DefaultKafkaConsumerFactory<String , String > (KafkaTestUtils . consumerProps(" sender" , " false" , embeddedKafka))
44- def consumerContainer = new KafkaMessageListenerContainer<> (consumerFactory, new ContainerProperties (STREAM_PROCESSED ))
41+
42+ def containerProperties
43+ try {
44+ // Different class names for test and latestDepTest.
45+ containerProperties = Class . forName(" org.springframework.kafka.listener.config.ContainerProperties" ). newInstance(STREAM_PROCESSED )
46+ } catch (ClassNotFoundException | NoClassDefFoundError e) {
47+ containerProperties = Class . forName(" org.springframework.kafka.listener.ContainerProperties" ). newInstance(STREAM_PROCESSED )
48+ }
49+ def consumerContainer = new KafkaMessageListenerContainer<> (consumerFactory, containerProperties)
4550
4651 // create a thread safe queue to store the processed message
4752 def records = new LinkedBlockingQueue<ConsumerRecord<String , String > > ()
@@ -65,9 +70,15 @@ class KafkaStreamsTest extends AgentTestRunner {
6570 ContainerTestUtils . waitForAssignment(consumerContainer, embeddedKafka. getPartitionsPerTopic())
6671
6772 // CONFIGURE PROCESSOR
68- final KStreamBuilder builder = new KStreamBuilder ()
73+ def builder
74+ try {
75+ // Different class names for test and latestDepTest.
76+ builder = Class . forName(" org.apache.kafka.streams.kstream.KStreamBuilder" ). newInstance()
77+ } catch (ClassNotFoundException | NoClassDefFoundError e) {
78+ builder = Class . forName(" org.apache.kafka.streams.StreamsBuilder" ). newInstance()
79+ }
6980 KStream<String , String > textLines = builder. stream(STREAM_PENDING )
70- textLines
81+ def values = textLines
7182 .mapValues(new ValueMapper<String , String > () {
7283 @Override
7384 String apply (String textLine ) {
@@ -76,8 +87,18 @@ class KafkaStreamsTest extends AgentTestRunner {
7687 return textLine. toLowerCase()
7788 }
7889 })
79- .to(Serdes.String (), Serdes.String (), STREAM_PROCESSED )
80- KafkaStreams streams = new KafkaStreams (builder, config)
90+
91+ KafkaStreams streams
92+ try {
93+ // Different api for test and latestDepTest.
94+ values. to(Serdes.String (), Serdes.String (), STREAM_PROCESSED )
95+ streams = new KafkaStreams (builder, config)
96+ } catch (MissingMethodException e) {
97+ def producer = Class . forName(" org.apache.kafka.streams.kstream.Produced" )
98+ .with(Serdes.String (), Serdes.String ())
99+ values. to(STREAM_PROCESSED , producer)
100+ streams = new KafkaStreams (builder. build(), config)
101+ }
81102 streams. start()
82103
83104 // CONFIGURE PRODUCER
@@ -94,101 +115,89 @@ class KafkaStreamsTest extends AgentTestRunner {
94115 received. value() == greeting. toLowerCase()
95116 received. key() == null
96117
97- TEST_WRITER . waitForTraces(3 )
98- TEST_WRITER . size() == 3
99-
100- def t1 = TEST_WRITER . get(0 )
101- t1. size() == 1
102- def t2 = TEST_WRITER . get(1 )
103- t2. size() == 2
104- def t3 = TEST_WRITER . get(2 )
105- t3. size() == 1
106-
107- and : // PRODUCER span 0
108- def t1span1 = t1[0 ]
109-
110- t1span1. context(). operationName == " kafka.produce"
111- t1span1. serviceName == " kafka"
112- t1span1. resourceName == " Produce Topic $STREAM_PENDING "
113- t1span1. type == " queue"
114- ! t1span1. context(). getErrorFlag()
115- t1span1. context(). parentId == " 0"
116-
117- def t1tags1 = t1span1. context(). tags
118- t1tags1[" component" ] == " java-kafka"
119- t1tags1[" span.kind" ] == " producer"
120- t1tags1[" span.type" ] == " queue"
121- t1tags1[" thread.name" ] != null
122- t1tags1[" thread.id" ] != null
123- t1tags1[Config . RUNTIME_ID_TAG ] == Config . get(). runtimeId
124- t1tags1. size() == 6
125-
126- and : // STREAMING span 0
127- def t2span1 = t2[0 ]
128-
129- t2span1. context(). operationName == " kafka.produce"
130- t2span1. serviceName == " kafka"
131- t2span1. resourceName == " Produce Topic $STREAM_PROCESSED "
132- t2span1. type == " queue"
133- ! t2span1. context(). getErrorFlag()
134-
135- def t2tags1 = t2span1. context(). tags
136- t2tags1[" component" ] == " java-kafka"
137- t2tags1[" span.kind" ] == " producer"
138- t2tags1[" span.type" ] == " queue"
139- t2tags1[" thread.name" ] != null
140- t2tags1[" thread.id" ] != null
141- t2tags1. size() == 5
142-
143- and : // STREAMING span 1
144- def t2span2 = t2[1 ]
145- t2span1. context(). parentId == t2span2. context(). spanId
146-
147- t2span2. context(). operationName == " kafka.consume"
148- t2span2. serviceName == " kafka"
149- t2span2. resourceName == " Consume Topic $STREAM_PENDING "
150- t2span2. type == " queue"
151- ! t2span2. context(). getErrorFlag()
152- t2span2. context(). parentId == t1span1. context(). spanId
153-
154- def t2tags2 = t2span2. context(). tags
155- t2tags2[" component" ] == " java-kafka"
156- t2tags2[" span.kind" ] == " consumer"
157- t2tags2[" span.type" ] == " queue"
158- t2tags2[" partition" ] >= 0
159- t2tags2[" offset" ] == 0
160- t2tags2[" thread.name" ] != null
161- t2tags2[" thread.id" ] != null
162- t2tags2[Config . RUNTIME_ID_TAG ] == Config . get(). runtimeId
163- t2tags2[" asdf" ] == " testing"
164- t2tags2. size() == 9
165-
166- and : // CONSUMER span 0
167- def t3span1 = t3[0 ]
168-
169- t3span1. context(). operationName == " kafka.consume"
170- t3span1. serviceName == " kafka"
171- t3span1. resourceName == " Consume Topic $STREAM_PROCESSED "
172- t3span1. type == " queue"
173- ! t3span1. context(). getErrorFlag()
174- t3span1. context(). parentId == t2span1. context(). spanId
175-
176- def t3tags1 = t3span1. context(). tags
177- t3tags1[" component" ] == " java-kafka"
178- t3tags1[" span.kind" ] == " consumer"
179- t3tags1[" span.type" ] == " queue"
180- t3tags1[" partition" ] >= 0
181- t3tags1[" offset" ] == 0
182- t3tags1[" thread.name" ] != null
183- t3tags1[" thread.id" ] != null
184- t3tags1[Config . RUNTIME_ID_TAG ] == Config . get(). runtimeId
185- t3tags1[" testing" ] == 123
186- t3tags1. size() == 9
118+ assertTraces(3 ) {
119+ trace(0 , 1 ) {
120+ // PRODUCER span 0
121+ span(0 ) {
122+ serviceName " kafka"
123+ operationName " kafka.produce"
124+ resourceName " Produce Topic $STREAM_PENDING "
125+ spanType " queue"
126+ errored false
127+ parent()
128+ tags {
129+ " component" " java-kafka"
130+ " span.kind" " producer"
131+ " span.type" " queue"
132+ defaultTags()
133+ }
134+ }
135+ }
136+ trace(1 , 2 ) {
137+
138+ // STREAMING span 0
139+ span(0 ) {
140+ serviceName " kafka"
141+ operationName " kafka.produce"
142+ resourceName " Produce Topic $STREAM_PROCESSED "
143+ spanType " queue"
144+ errored false
145+ childOf span(1 )
146+
147+ tags {
148+ " component" " java-kafka"
149+ " span.kind" " producer"
150+ " span.type" " queue"
151+ defaultTags()
152+ }
153+ }
154+
155+ // STREAMING span 1
156+ span(1 ) {
157+ serviceName " kafka"
158+ operationName " kafka.consume"
159+ resourceName " Consume Topic $STREAM_PENDING "
160+ spanType " queue"
161+ errored false
162+ childOf TEST_WRITER [0 ][0 ]
163+
164+ tags {
165+ " component" " java-kafka"
166+ " span.kind" " consumer"
167+ " span.type" " queue"
168+ " partition" { it >= 0 }
169+ " offset" 0
170+ defaultTags(true )
171+ " asdf" " testing"
172+ }
173+ }
174+ }
175+ trace(2 , 1 ) {
176+ // CONSUMER span 0
177+ span(0 ) {
178+ serviceName " kafka"
179+ operationName " kafka.consume"
180+ resourceName " Consume Topic $STREAM_PROCESSED "
181+ spanType " queue"
182+ errored false
183+ childOf TEST_WRITER [1 ][0 ]
184+ tags {
185+ " component" " java-kafka"
186+ " span.kind" " consumer"
187+ " span.type" " queue"
188+ " partition" { it >= 0 }
189+ " offset" 0
190+ defaultTags(true )
191+ " testing" 123
192+ }
193+ }
194+ }
195+ }
187196
188197 def headers = received. headers()
189198 headers. iterator(). hasNext()
190- new String (headers. headers(" x-datadog-trace-id" ). iterator(). next(). value()) == " $t 2span1 . traceId "
191- new String (headers. headers(" x-datadog-parent-id" ). iterator(). next(). value()) == " $t 2span1 . spanId "
199+ new String (headers. headers(" x-datadog-trace-id" ). iterator(). next(). value()) == " ${ TEST_WRITER[1][0] .traceId} "
200+ new String (headers. headers(" x-datadog-parent-id" ). iterator(). next(). value()) == " ${ TEST_WRITER[1][0] .spanId} "
192201
193202
194203 cleanup :
0 commit comments