Skip to content

Commit d36f7e7

Browse files
piochelepiotrdevflow.devflow-routing-intake
andauthored
Add Kafka cluster ID and offset tags to producer/consumer spans (#11107)
Add Kafka cluster ID, topic, consumer group, and offset tags to spans Tag producer spans with messaging.kafka.cluster.id, topic, and broker-assigned partition/offset (via callback). Tag consumer spans with messaging.kafka.cluster.id, topic, consumer group, and offset. These tags enable linking from a span directly to a specific Kafka message. Previously the cluster ID was only extracted when Data Streams was enabled; MetadataState is now always initialized on the producer so cluster ID flows through regardless of DSM configuration. tag: ai generated Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> Update test assertions for new Kafka span tags Add assertions for messaging.kafka.cluster.id, partition, and offset tags that are now set on producer and consumer spans. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> Co-authored-by: devflow.devflow-routing-intake <devflow.devflow-routing-intake@kubernetes.us1.ddbuild.io>
1 parent 81e2294 commit d36f7e7

19 files changed

Lines changed: 83 additions & 27 deletions

File tree

dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/latestDepTest/groovy/KafkaClientTestBase.groovy

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -302,9 +302,9 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
302302
"$Tags.SPAN_KIND" Tags.SPAN_KIND_PRODUCER
303303
"$InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS" config.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)
304304
"$InstrumentationTags.MESSAGING_DESTINATION_NAME" "$SHARED_TOPIC"
305-
if (partitioned) {
306-
"$InstrumentationTags.PARTITION" { it >= 0 }
307-
}
305+
"$InstrumentationTags.PARTITION" { it >= 0 }
306+
"$InstrumentationTags.OFFSET" { it >= 0 }
307+
"$InstrumentationTags.KAFKA_CLUSTER_ID" { String }
308308
if (tombstone) {
309309
"$InstrumentationTags.TOMBSTONE" true
310310
}
@@ -381,6 +381,7 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
381381
"$InstrumentationTags.OFFSET" { offset.containsWithinBounds(it as int) }
382382
"$InstrumentationTags.CONSUMER_GROUP" "sender"
383383
"$InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS" config.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)
384+
"$InstrumentationTags.KAFKA_CLUSTER_ID" { String }
384385
"$InstrumentationTags.RECORD_QUEUE_TIME_MS" { it >= 0 }
385386
"$InstrumentationTags.RECORD_END_TO_END_DURATION_MS" { it >= 0 }
386387
"$InstrumentationTags.MESSAGING_DESTINATION_NAME" "$SHARED_TOPIC"

dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaConsumerInstrumentationHelper.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package datadog.trace.instrumentation.kafka_clients;
22

3-
import datadog.trace.api.Config;
43
import datadog.trace.bootstrap.ContextStore;
54
import datadog.trace.instrumentation.kafka_common.MetadataState;
65
import org.apache.kafka.clients.Metadata;
@@ -16,7 +15,7 @@ public static String extractGroup(KafkaConsumerInfo kafkaConsumerInfo) {
1615
public static String extractClusterId(
1716
KafkaConsumerInfo kafkaConsumerInfo,
1817
ContextStore<Metadata, MetadataState> metadataContextStore) {
19-
if (Config.get().isDataStreamsEnabled() && kafkaConsumerInfo != null) {
18+
if (kafkaConsumerInfo != null) {
2019
Metadata consumerMetadata = kafkaConsumerInfo.getClientMetadata();
2120
if (consumerMetadata != null) {
2221
MetadataState state = metadataContextStore.get(consumerMetadata);

dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaDecorator.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.CONSUMER_GROUP;
44
import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS;
5+
import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.KAFKA_CLUSTER_ID;
56
import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.MESSAGING_DESTINATION_NAME;
67
import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.OFFSET;
78
import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.PARTITION;
@@ -117,6 +118,7 @@ public void onConsume(
117118
final AgentSpan span,
118119
final ConsumerRecord record,
119120
String consumerGroup,
121+
String clusterId,
120122
String bootstrapServers) {
121123
if (record != null) {
122124
final String topic = record.topic() == null ? "kafka" : record.topic();
@@ -127,7 +129,9 @@ public void onConsume(
127129
if (consumerGroup != null) {
128130
span.setTag(CONSUMER_GROUP, consumerGroup);
129131
}
130-
132+
if (clusterId != null) {
133+
span.setTag(KAFKA_CLUSTER_ID, clusterId);
134+
}
131135
if (bootstrapServers != null) {
132136
span.setTag(KAFKA_BOOTSTRAP_SERVERS, bootstrapServers);
133137
}
@@ -152,7 +156,10 @@ public void onTimeInQueue(final AgentSpan span, final ConsumerRecord record) {
152156
}
153157

154158
public void onProduce(
155-
final AgentSpan span, final ProducerRecord record, final ProducerConfig producerConfig) {
159+
final AgentSpan span,
160+
final ProducerRecord record,
161+
final ProducerConfig producerConfig,
162+
final String clusterId) {
156163
if (record != null) {
157164
if (record.partition() != null) {
158165
span.setTag(PARTITION, record.partition());
@@ -163,6 +170,9 @@ public void onProduce(
163170
PRODUCER_BOOSTRAP_SERVERS_CACHE.computeIfAbsent(
164171
producerConfig, BOOTSTRAP_SERVERS_JOINER));
165172
}
173+
if (clusterId != null) {
174+
span.setTag(KAFKA_CLUSTER_ID, clusterId);
175+
}
166176
final String topic = record.topic() == null ? "kafka" : record.topic();
167177
span.setResourceName(PRODUCER_RESOURCE_NAME_CACHE.computeIfAbsent(topic, PRODUCER_PREFIX));
168178
span.setTag(MESSAGING_DESTINATION_NAME, topic);

dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerCallback.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package datadog.trace.instrumentation.kafka_clients;
22

33
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
4+
import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.OFFSET;
5+
import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.PARTITION;
46
import static datadog.trace.instrumentation.kafka_clients.KafkaDecorator.PRODUCER_DECORATE;
57

68
import datadog.trace.api.datastreams.DataStreamsTags;
@@ -30,6 +32,10 @@ public KafkaProducerCallback(
3032

3133
@Override
3234
public void onCompletion(final RecordMetadata metadata, final Exception exception) {
35+
if (metadata != null) {
36+
span.setTag(PARTITION, metadata.partition());
37+
span.setTag(OFFSET, metadata.offset());
38+
}
3339
PRODUCER_DECORATE.onError(span, exception);
3440
PRODUCER_DECORATE.beforeFinish(span);
3541
span.finish();

dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ public static AgentScope onEnter(
167167
callbackParentSpan = localActiveSpan;
168168
}
169169
PRODUCER_DECORATE.afterStart(span);
170-
PRODUCER_DECORATE.onProduce(span, record, producerConfig);
170+
PRODUCER_DECORATE.onProduce(span, record, producerConfig, clusterId);
171171

172172
callback = new KafkaProducerCallback(callback, callbackParentSpan, span, clusterId);
173173

@@ -267,10 +267,10 @@ public static class ProducerConstructorAdvice {
267267
public static void captureConfiguration(
268268
@Advice.FieldValue("metadata") Metadata metadata,
269269
@Advice.Argument(0) ProducerConfig producerConfig) {
270+
MetadataState state =
271+
InstrumentationContext.get(Metadata.class, MetadataState.class)
272+
.putIfAbsent(metadata, MetadataState::new);
270273
if (Config.get().isDataStreamsEnabled()) {
271-
MetadataState state =
272-
InstrumentationContext.get(Metadata.class, MetadataState.class)
273-
.putIfAbsent(metadata, MetadataState::new);
274274
KafkaConfigHelper.storePendingProducerConfig(
275275
state, KafkaConfigHelper.extractProducerConfig(producerConfig));
276276
}

dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TracingIterator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ protected void startNewRecordSpan(ConsumerRecord<?, ?> val) {
142142
span.setTag(InstrumentationTags.TOMBSTONE, true);
143143
}
144144
decorator.afterStart(span);
145-
decorator.onConsume(span, val, group, bootstrapServers);
145+
decorator.onConsume(span, val, group, clusterId, bootstrapServers);
146146
if (InstrumenterConfig.get().isLegacyContextManagerEnabled()) {
147147
activateNext(span);
148148
} else {

dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1205,9 +1205,9 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
12051205
"$Tags.SPAN_KIND" Tags.SPAN_KIND_PRODUCER
12061206
"$InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS" config.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)
12071207
"$InstrumentationTags.MESSAGING_DESTINATION_NAME" "$SHARED_TOPIC"
1208-
if (partitioned) {
1209-
"$InstrumentationTags.PARTITION" { it >= 0 }
1210-
}
1208+
"$InstrumentationTags.PARTITION" { it >= 0 }
1209+
"$InstrumentationTags.OFFSET" { it >= 0 }
1210+
"$InstrumentationTags.KAFKA_CLUSTER_ID" { String }
12111211
if (tombstone) {
12121212
"$InstrumentationTags.TOMBSTONE" true
12131213
}
@@ -1284,6 +1284,7 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
12841284
"$InstrumentationTags.OFFSET" { offset.containsWithinBounds(it as int) }
12851285
"$InstrumentationTags.CONSUMER_GROUP" "sender"
12861286
"$InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS" config.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)
1287+
"$InstrumentationTags.KAFKA_CLUSTER_ID" { String }
12871288
"$InstrumentationTags.RECORD_QUEUE_TIME_MS" { it >= 0 }
12881289
"$InstrumentationTags.RECORD_END_TO_END_DURATION_MS" { it >= 0 }
12891290
"$InstrumentationTags.MESSAGING_DESTINATION_NAME" "$SHARED_TOPIC"

dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaReactorForkedTest.groovy

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,9 @@ class KafkaReactorForkedTest extends InstrumentationSpecification {
193193
"$Tags.SPAN_KIND" Tags.SPAN_KIND_PRODUCER
194194
"$InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS" config.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)
195195
"$InstrumentationTags.MESSAGING_DESTINATION_NAME" "$KafkaClientTestBase.SHARED_TOPIC"
196+
"$InstrumentationTags.PARTITION" { it >= 0 }
197+
"$InstrumentationTags.OFFSET" { it >= 0 }
198+
"$InstrumentationTags.KAFKA_CLUSTER_ID" { String }
196199
peerServiceFrom(InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS)
197200
defaultTags()
198201
}
@@ -222,6 +225,7 @@ class KafkaReactorForkedTest extends InstrumentationSpecification {
222225
"$InstrumentationTags.OFFSET" { Integer }
223226
"$InstrumentationTags.CONSUMER_GROUP" "sender"
224227
"$InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS" config.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)
228+
"$InstrumentationTags.KAFKA_CLUSTER_ID" { String }
225229
"$InstrumentationTags.RECORD_QUEUE_TIME_MS" { it >= 0 }
226230
"$InstrumentationTags.MESSAGING_DESTINATION_NAME" "$KafkaClientTestBase.SHARED_TOPIC"
227231
defaultTags(true)

dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/KafkaConsumerInstrumentationHelper.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package datadog.trace.instrumentation.kafka_clients38;
22

3-
import datadog.trace.api.Config;
43
import datadog.trace.bootstrap.ContextStore;
54
import datadog.trace.instrumentation.kafka_common.MetadataState;
65
import org.apache.kafka.clients.Metadata;
@@ -16,7 +15,7 @@ public static String extractGroup(KafkaConsumerInfo kafkaConsumerInfo) {
1615
public static String extractClusterId(
1716
KafkaConsumerInfo kafkaConsumerInfo,
1817
ContextStore<Metadata, MetadataState> metadataContextStore) {
19-
if (Config.get().isDataStreamsEnabled() && kafkaConsumerInfo != null) {
18+
if (kafkaConsumerInfo != null) {
2019
Metadata metadata = kafkaConsumerInfo.getmetadata().get();
2120
if (metadata != null) {
2221
MetadataState state = metadataContextStore.get(metadata);

dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/KafkaDecorator.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.CONSUMER_GROUP;
44
import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS;
5+
import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.KAFKA_CLUSTER_ID;
56
import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.MESSAGING_DESTINATION_NAME;
67
import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.OFFSET;
78
import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.PARTITION;
@@ -117,6 +118,7 @@ public void onConsume(
117118
final AgentSpan span,
118119
final ConsumerRecord record,
119120
String consumerGroup,
121+
String clusterId,
120122
String bootstrapServers) {
121123
if (record != null) {
122124
final String topic = record.topic() == null ? "kafka" : record.topic();
@@ -127,7 +129,9 @@ public void onConsume(
127129
if (consumerGroup != null) {
128130
span.setTag(CONSUMER_GROUP, consumerGroup);
129131
}
130-
132+
if (clusterId != null) {
133+
span.setTag(KAFKA_CLUSTER_ID, clusterId);
134+
}
131135
if (bootstrapServers != null) {
132136
span.setTag(KAFKA_BOOTSTRAP_SERVERS, bootstrapServers);
133137
}
@@ -152,7 +156,10 @@ public void onTimeInQueue(final AgentSpan span, final ConsumerRecord record) {
152156
}
153157

154158
public void onProduce(
155-
final AgentSpan span, final ProducerRecord record, final ProducerConfig producerConfig) {
159+
final AgentSpan span,
160+
final ProducerRecord record,
161+
final ProducerConfig producerConfig,
162+
final String clusterId) {
156163
if (record != null) {
157164
if (record.partition() != null) {
158165
span.setTag(PARTITION, record.partition());
@@ -163,6 +170,9 @@ public void onProduce(
163170
PRODUCER_BOOSTRAP_SERVERS_CACHE.computeIfAbsent(
164171
producerConfig, BOOTSTRAP_SERVERS_JOINER));
165172
}
173+
if (clusterId != null) {
174+
span.setTag(KAFKA_CLUSTER_ID, clusterId);
175+
}
166176
final String topic = record.topic() == null ? "kafka" : record.topic();
167177
span.setResourceName(PRODUCER_RESOURCE_NAME_CACHE.computeIfAbsent(topic, PRODUCER_PREFIX));
168178
span.setTag(MESSAGING_DESTINATION_NAME, topic);

0 commit comments

Comments
 (0)