Skip to content

Commit 343a36e

Browse files
Enable Data Streams without adding context into Kafka headers (#6533)
* Enable data streams without adding context into Kafka headers
* Address PR comments
* spotless apply
* add helper classes
1 parent b773b0f commit 343a36e

File tree

7 files changed

+73
-45
lines changed

7 files changed

+73
-45
lines changed

dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaConsumerInstrumentation.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ public String instrumentedType() {
4747
@Override
4848
public String[] helperClassNames() {
4949
return new String[] {
50+
packageName + ".TextMapInjectAdapterInterface",
5051
packageName + ".KafkaConsumerInfo",
5152
packageName + ".KafkaConsumerInstrumentationHelper",
5253
packageName + ".KafkaDecorator",

dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java

Lines changed: 42 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import static datadog.trace.instrumentation.kafka_clients.KafkaDecorator.KAFKA_PRODUCE;
1414
import static datadog.trace.instrumentation.kafka_clients.KafkaDecorator.PRODUCER_DECORATE;
1515
import static datadog.trace.instrumentation.kafka_clients.KafkaDecorator.TIME_IN_QUEUE_ENABLED;
16-
import static datadog.trace.instrumentation.kafka_clients.TextMapInjectAdapter.SETTER;
1716
import static datadog.trace.instrumentation.kafka_common.StreamingContext.STREAMING_CONTEXT;
1817
import static java.util.Collections.singletonMap;
1918
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
@@ -59,7 +58,9 @@ public String instrumentedType() {
5958
public String[] helperClassNames() {
6059
return new String[] {
6160
packageName + ".KafkaDecorator",
61+
packageName + ".TextMapInjectAdapterInterface",
6262
packageName + ".TextMapInjectAdapter",
63+
packageName + ".NoopTextMapInjectAdapter",
6364
packageName + ".KafkaProducerCallback",
6465
"datadog.trace.instrumentation.kafka_common.StreamingContext",
6566
packageName + ".AvroSchemaExtractor",
@@ -113,6 +114,7 @@ public static AgentScope onEnter(
113114
span.setTag(InstrumentationTags.TOMBSTONE, true);
114115
}
115116

117+
TextMapInjectAdapterInterface setter = NoopTextMapInjectAdapter.NOOP_SETTER;
116118
// Do not inject headers for batch versions below 2
117119
// This is how similar check is being done in Kafka client itself:
118120
// https://github.com/apache/kafka/blob/05fcfde8f69b0349216553f711fdfc3f0259c601/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java#L411-L412
@@ -123,50 +125,48 @@ public static AgentScope onEnter(
123125
if (apiVersions.maxUsableProduceMagic() >= RecordBatch.MAGIC_VALUE_V2
124126
&& Config.get().isKafkaClientPropagationEnabled()
125127
&& !Config.get().isKafkaClientPropagationDisabledForTopic(record.topic())) {
126-
LinkedHashMap<String, String> sortedTags = new LinkedHashMap<>();
127-
sortedTags.put(DIRECTION_TAG, DIRECTION_OUT);
128-
if (clusterId != null) {
129-
sortedTags.put(KAFKA_CLUSTER_ID_TAG, clusterId);
130-
}
131-
sortedTags.put(TOPIC_TAG, record.topic());
132-
sortedTags.put(TYPE_TAG, "kafka");
133-
try {
134-
propagate().inject(span, record.headers(), SETTER);
135-
if (STREAMING_CONTEXT.isDisabledForTopic(record.topic())
136-
|| STREAMING_CONTEXT.isSinkTopic(record.topic())) {
137-
// inject the context in the headers, but delay sending the stats until we know the
138-
// message size.
139-
// The stats are saved in the pathway context and sent in PayloadSizeAdvice.
140-
propagate()
141-
.injectPathwayContextWithoutSendingStats(
142-
span, record.headers(), SETTER, sortedTags);
143-
AvroSchemaExtractor.tryExtractProducer(record, span);
144-
}
145-
} catch (final IllegalStateException e) {
146-
// headers must be read-only from reused record. try again with new one.
147-
record =
148-
new ProducerRecord<>(
149-
record.topic(),
150-
record.partition(),
151-
record.timestamp(),
152-
record.key(),
153-
record.value(),
154-
record.headers());
155-
156-
propagate().inject(span, record.headers(), SETTER);
157-
if (STREAMING_CONTEXT.isDisabledForTopic(record.topic())
158-
|| STREAMING_CONTEXT.isSinkTopic(record.topic())) {
159-
propagate()
160-
.injectPathwayContextWithoutSendingStats(
161-
span, record.headers(), SETTER, sortedTags);
162-
AvroSchemaExtractor.tryExtractProducer(record, span);
163-
}
128+
setter = TextMapInjectAdapter.SETTER;
129+
}
130+
LinkedHashMap<String, String> sortedTags = new LinkedHashMap<>();
131+
sortedTags.put(DIRECTION_TAG, DIRECTION_OUT);
132+
if (clusterId != null) {
133+
sortedTags.put(KAFKA_CLUSTER_ID_TAG, clusterId);
134+
}
135+
sortedTags.put(TOPIC_TAG, record.topic());
136+
sortedTags.put(TYPE_TAG, "kafka");
137+
try {
138+
propagate().inject(span, record.headers(), setter);
139+
if (STREAMING_CONTEXT.isDisabledForTopic(record.topic())
140+
|| STREAMING_CONTEXT.isSinkTopic(record.topic())) {
141+
// inject the context in the headers, but delay sending the stats until we know the
142+
// message size.
143+
// The stats are saved in the pathway context and sent in PayloadSizeAdvice.
144+
propagate()
145+
.injectPathwayContextWithoutSendingStats(span, record.headers(), setter, sortedTags);
146+
AvroSchemaExtractor.tryExtractProducer(record, span);
164147
}
165-
if (TIME_IN_QUEUE_ENABLED) {
166-
SETTER.injectTimeInQueue(record.headers());
148+
} catch (final IllegalStateException e) {
149+
// headers must be read-only from reused record. try again with new one.
150+
record =
151+
new ProducerRecord<>(
152+
record.topic(),
153+
record.partition(),
154+
record.timestamp(),
155+
record.key(),
156+
record.value(),
157+
record.headers());
158+
159+
propagate().inject(span, record.headers(), setter);
160+
if (STREAMING_CONTEXT.isDisabledForTopic(record.topic())
161+
|| STREAMING_CONTEXT.isSinkTopic(record.topic())) {
162+
propagate()
163+
.injectPathwayContextWithoutSendingStats(span, record.headers(), setter, sortedTags);
164+
AvroSchemaExtractor.tryExtractProducer(record, span);
167165
}
168166
}
169-
167+
if (TIME_IN_QUEUE_ENABLED) {
168+
setter.injectTimeInQueue(record.headers());
169+
}
170170
return activateSpan(span);
171171
}
172172

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package datadog.trace.instrumentation.kafka_clients;
2+
3+
import org.apache.kafka.common.header.Headers;
4+
5+
public class NoopTextMapInjectAdapter implements TextMapInjectAdapterInterface {
6+
7+
public static final NoopTextMapInjectAdapter NOOP_SETTER = new NoopTextMapInjectAdapter();
8+
9+
@Override
10+
public void set(final Headers headers, final String key, final String value) {}
11+
12+
@Override
13+
public void set(Headers headers, String key, byte[] value) {}
14+
15+
public void injectTimeInQueue(Headers headers) {}
16+
}

dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TextMapInjectAdapter.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,11 @@
22

33
import static datadog.trace.instrumentation.kafka_clients.KafkaDecorator.KAFKA_PRODUCED_KEY;
44

5-
import datadog.trace.bootstrap.instrumentation.api.AgentPropagation;
65
import java.nio.ByteBuffer;
76
import java.nio.charset.StandardCharsets;
87
import org.apache.kafka.common.header.Headers;
98

10-
public class TextMapInjectAdapter implements AgentPropagation.BinarySetter<Headers> {
11-
9+
public class TextMapInjectAdapter implements TextMapInjectAdapterInterface {
1210
public static final TextMapInjectAdapter SETTER = new TextMapInjectAdapter();
1311

1412
@Override
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package datadog.trace.instrumentation.kafka_clients;
2+
3+
import datadog.trace.bootstrap.instrumentation.api.AgentPropagation;
4+
import org.apache.kafka.common.header.Headers;
5+
6+
public interface TextMapInjectAdapterInterface extends AgentPropagation.BinarySetter<Headers> {
7+
public void injectTimeInQueue(Headers headers);
8+
}

dd-java-agent/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -916,6 +916,10 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
916916
void onMessage(ConsumerRecord<String, String> record) {
917917
TEST_WRITER.waitForTraces(1) // ensure consistent ordering of traces
918918
records.add(record)
919+
if (isDataStreamsEnabled()) {
920+
// even if header propagation is disabled, we want data streams to work.
921+
TEST_DATA_STREAMS_WRITER.waitForGroups(2)
922+
}
919923
}
920924
})
921925

dd-java-agent/instrumentation/kafka-streams-0.11/src/main/java/datadog/trace/instrumentation/kafka_streams/KafkaStreamTaskInstrumentation.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ public String instrumentedType() {
6464
@Override
6565
public String[] helperClassNames() {
6666
return new String[] {
67+
"datadog.trace.instrumentation.kafka_clients.TextMapInjectAdapterInterface",
6768
"datadog.trace.instrumentation.kafka_clients.TracingIterableDelegator",
6869
"datadog.trace.instrumentation.kafka_common.Utils",
6970
"datadog.trace.instrumentation.kafka_common.StreamingContext",

0 commit comments

Comments (0)