From f2a32a7836f3079baba87a174a810d110edcf8af Mon Sep 17 00:00:00 2001 From: Piotr Wolski Date: Mon, 13 Apr 2026 15:28:51 -0600 Subject: [PATCH] Add Kafka cluster ID, topic, consumer group, and offset tags to spans Tag producer spans with messaging.kafka.cluster.id, topic, and broker-assigned partition/offset (via callback). Tag consumer spans with messaging.kafka.cluster.id, topic, consumer group, and offset. These tags enable linking from a span directly to a specific Kafka message. Previously the cluster ID was only extracted when Data Streams was enabled; MetadataState is now always initialized on the producer so cluster ID flows through regardless of DSM configuration. tag: ai generated Co-Authored-By: Claude Sonnet 4.6 --- .../KafkaConsumerInstrumentationHelper.java | 3 +-- .../kafka_clients/KafkaDecorator.java | 14 ++++++++++++-- .../kafka_clients/KafkaProducerCallback.java | 6 ++++++ .../KafkaProducerInstrumentation.java | 8 ++++---- .../kafka_clients/TracingIterator.java | 2 +- .../KafkaConsumerInstrumentationHelper.java | 3 +-- .../kafka_clients38/KafkaDecorator.java | 14 ++++++++++++-- .../kafka_clients38/KafkaProducerCallback.java | 6 ++++++ .../kafka_clients38/ProducerAdvice.java | 2 +- .../kafka_clients38/ProducerConstructorAdvice.java | 6 +++--- .../kafka_clients38/TracingIterator.java | 2 +- .../instrumentation/api/InstrumentationTags.java | 1 + 12 files changed, 49 insertions(+), 18 deletions(-) diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaConsumerInstrumentationHelper.java b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaConsumerInstrumentationHelper.java index 9b3faf8e33a..685a4d4f03f 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaConsumerInstrumentationHelper.java +++ b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaConsumerInstrumentationHelper.java @@ -1,6 +1,5 @@ package datadog.trace.instrumentation.kafka_clients; -import datadog.trace.api.Config; import datadog.trace.bootstrap.ContextStore; import datadog.trace.instrumentation.kafka_common.MetadataState; import org.apache.kafka.clients.Metadata; @@ -16,7 +15,7 @@ public static String extractGroup(KafkaConsumerInfo kafkaConsumerInfo) { public static String extractClusterId( KafkaConsumerInfo kafkaConsumerInfo, ContextStore metadataContextStore) { - if (Config.get().isDataStreamsEnabled() && kafkaConsumerInfo != null) { + if (kafkaConsumerInfo != null) { Metadata consumerMetadata = kafkaConsumerInfo.getClientMetadata(); if (consumerMetadata != null) { MetadataState state = metadataContextStore.get(consumerMetadata); diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaDecorator.java b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaDecorator.java index 18add9fdf3d..53c579a4dee 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaDecorator.java +++ b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaDecorator.java @@ -2,6 +2,7 @@ import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.CONSUMER_GROUP; import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS; +import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.KAFKA_CLUSTER_ID; import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.MESSAGING_DESTINATION_NAME; import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.OFFSET; import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.PARTITION; @@ -117,6 +118,7 @@ public void onConsume( final AgentSpan span, final ConsumerRecord record, String consumerGroup, + String clusterId, String bootstrapServers) { if (record != null) { final String topic = record.topic() == null ? "kafka" : record.topic(); @@ -127,7 +129,9 @@ public void onConsume( if (consumerGroup != null) { span.setTag(CONSUMER_GROUP, consumerGroup); } - + if (clusterId != null) { + span.setTag(KAFKA_CLUSTER_ID, clusterId); + } if (bootstrapServers != null) { span.setTag(KAFKA_BOOTSTRAP_SERVERS, bootstrapServers); } @@ -152,7 +156,10 @@ public void onTimeInQueue(final AgentSpan span, final ConsumerRecord record) { } public void onProduce( - final AgentSpan span, final ProducerRecord record, final ProducerConfig producerConfig) { + final AgentSpan span, + final ProducerRecord record, + final ProducerConfig producerConfig, + final String clusterId) { if (record != null) { if (record.partition() != null) { span.setTag(PARTITION, record.partition()); @@ -163,6 +170,9 @@ public void onProduce( PRODUCER_BOOSTRAP_SERVERS_CACHE.computeIfAbsent( producerConfig, BOOTSTRAP_SERVERS_JOINER)); } + if (clusterId != null) { + span.setTag(KAFKA_CLUSTER_ID, clusterId); + } final String topic = record.topic() == null ? "kafka" : record.topic(); span.setResourceName(PRODUCER_RESOURCE_NAME_CACHE.computeIfAbsent(topic, PRODUCER_PREFIX)); span.setTag(MESSAGING_DESTINATION_NAME, topic); diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerCallback.java b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerCallback.java index 83962b9c56e..d96d99916d1 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerCallback.java +++ b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerCallback.java @@ -1,6 +1,8 @@ package datadog.trace.instrumentation.kafka_clients; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan; +import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.OFFSET; +import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.PARTITION; import static datadog.trace.instrumentation.kafka_clients.KafkaDecorator.PRODUCER_DECORATE; import datadog.trace.api.datastreams.DataStreamsTags; @@ -30,6 +32,10 @@ public KafkaProducerCallback( @Override public void onCompletion(final RecordMetadata metadata, final Exception exception) { + if (metadata != null) { + span.setTag(PARTITION, metadata.partition()); + span.setTag(OFFSET, metadata.offset()); + } PRODUCER_DECORATE.onError(span, exception); PRODUCER_DECORATE.beforeFinish(span); span.finish(); diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java index 87d74ac7cac..2bf584932eb 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java +++ b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java @@ -167,7 +167,7 @@ public static AgentScope onEnter( callbackParentSpan = localActiveSpan; } PRODUCER_DECORATE.afterStart(span); - PRODUCER_DECORATE.onProduce(span, record, producerConfig); + PRODUCER_DECORATE.onProduce(span, record, producerConfig, clusterId); callback = new KafkaProducerCallback(callback, callbackParentSpan, span, clusterId); @@ -267,10 +267,10 @@ public static class ProducerConstructorAdvice { public static void captureConfiguration( @Advice.FieldValue("metadata") Metadata metadata, @Advice.Argument(0) ProducerConfig producerConfig) { + MetadataState state = + InstrumentationContext.get(Metadata.class, MetadataState.class) + .putIfAbsent(metadata, MetadataState::new); if (Config.get().isDataStreamsEnabled()) { - MetadataState state = - InstrumentationContext.get(Metadata.class, MetadataState.class) - .putIfAbsent(metadata, MetadataState::new); KafkaConfigHelper.storePendingProducerConfig( state, KafkaConfigHelper.extractProducerConfig(producerConfig)); } diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TracingIterator.java b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TracingIterator.java index 90de628e7bd..fca185438fe 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TracingIterator.java +++ b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TracingIterator.java @@ -142,7 +142,7 @@ protected void startNewRecordSpan(ConsumerRecord val) { span.setTag(InstrumentationTags.TOMBSTONE, true); } decorator.afterStart(span); - decorator.onConsume(span, val, group, bootstrapServers); + decorator.onConsume(span, val, group, clusterId, bootstrapServers); if (InstrumenterConfig.get().isLegacyContextManagerEnabled()) { activateNext(span); } else { diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/KafkaConsumerInstrumentationHelper.java b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/KafkaConsumerInstrumentationHelper.java index 94f95eff851..11c020e78e3 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/KafkaConsumerInstrumentationHelper.java +++ b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/KafkaConsumerInstrumentationHelper.java @@ -1,6 +1,5 @@ package datadog.trace.instrumentation.kafka_clients38; -import datadog.trace.api.Config; import datadog.trace.bootstrap.ContextStore; import datadog.trace.instrumentation.kafka_common.MetadataState; import org.apache.kafka.clients.Metadata; @@ -16,7 +15,7 @@ public static String extractGroup(KafkaConsumerInfo kafkaConsumerInfo) { public static String extractClusterId( KafkaConsumerInfo kafkaConsumerInfo, ContextStore metadataContextStore) { - if (Config.get().isDataStreamsEnabled() && kafkaConsumerInfo != null) { + if (kafkaConsumerInfo != null) { Metadata metadata = kafkaConsumerInfo.getmetadata().get(); if (metadata != null) { MetadataState state = metadataContextStore.get(metadata); diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/KafkaDecorator.java b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/KafkaDecorator.java index 8a3cd612718..d2d6f53b8a9 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/KafkaDecorator.java +++ b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/KafkaDecorator.java @@ -2,6 +2,7 @@ import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.CONSUMER_GROUP; import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS; +import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.KAFKA_CLUSTER_ID; import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.MESSAGING_DESTINATION_NAME; import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.OFFSET; import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.PARTITION; @@ -117,6 +118,7 @@ public void onConsume( final AgentSpan span, final ConsumerRecord record, String consumerGroup, + String clusterId, String bootstrapServers) { if (record != null) { final String topic = record.topic() == null ? "kafka" : record.topic(); @@ -127,7 +129,9 @@ public void onConsume( if (consumerGroup != null) { span.setTag(CONSUMER_GROUP, consumerGroup); } - + if (clusterId != null) { + span.setTag(KAFKA_CLUSTER_ID, clusterId); + } if (bootstrapServers != null) { span.setTag(KAFKA_BOOTSTRAP_SERVERS, bootstrapServers); } @@ -152,7 +156,10 @@ public void onTimeInQueue(final AgentSpan span, final ConsumerRecord record) { } public void onProduce( - final AgentSpan span, final ProducerRecord record, final ProducerConfig producerConfig) { + final AgentSpan span, + final ProducerRecord record, + final ProducerConfig producerConfig, + final String clusterId) { if (record != null) { if (record.partition() != null) { span.setTag(PARTITION, record.partition()); @@ -163,6 +170,9 @@ public void onProduce( PRODUCER_BOOSTRAP_SERVERS_CACHE.computeIfAbsent( producerConfig, BOOTSTRAP_SERVERS_JOINER)); } + if (clusterId != null) { + span.setTag(KAFKA_CLUSTER_ID, clusterId); + } final String topic = record.topic() == null ? "kafka" : record.topic(); span.setResourceName(PRODUCER_RESOURCE_NAME_CACHE.computeIfAbsent(topic, PRODUCER_PREFIX)); span.setTag(MESSAGING_DESTINATION_NAME, topic); diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/KafkaProducerCallback.java b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/KafkaProducerCallback.java index 58c6bbbb8a1..898508dc541 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/KafkaProducerCallback.java +++ b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/KafkaProducerCallback.java @@ -1,6 +1,8 @@ package datadog.trace.instrumentation.kafka_clients38; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan; +import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.OFFSET; +import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.PARTITION; import static datadog.trace.instrumentation.kafka_clients38.KafkaDecorator.PRODUCER_DECORATE; import datadog.trace.api.datastreams.DataStreamsTags; @@ -29,6 +31,10 @@ public KafkaProducerCallback( @Override public void onCompletion(final RecordMetadata metadata, final Exception exception) { + if (metadata != null) { + span.setTag(PARTITION, metadata.partition()); + span.setTag(OFFSET, metadata.offset()); + } PRODUCER_DECORATE.onError(span, exception); PRODUCER_DECORATE.beforeFinish(span); span.finish(); diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ProducerAdvice.java b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ProducerAdvice.java index 483bd934b93..bf9930e8c2b 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ProducerAdvice.java +++ b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ProducerAdvice.java @@ -56,7 +56,7 @@ public static AgentScope onEnter( callbackParentSpan = localActiveSpan; } PRODUCER_DECORATE.afterStart(span); - PRODUCER_DECORATE.onProduce(span, record, producerConfig); + PRODUCER_DECORATE.onProduce(span, record, producerConfig, clusterId); callback = new KafkaProducerCallback(callback, callbackParentSpan, span, clusterId); diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ProducerConstructorAdvice.java b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ProducerConstructorAdvice.java index f37611a4332..a97caca83b2 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ProducerConstructorAdvice.java +++ b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ProducerConstructorAdvice.java @@ -14,10 +14,10 @@ public class ProducerConstructorAdvice { public static void captureConfiguration( @Advice.FieldValue("metadata") Metadata metadata, @Advice.Argument(0) ProducerConfig producerConfig) { + MetadataState state = + InstrumentationContext.get(Metadata.class, MetadataState.class) + .putIfAbsent(metadata, MetadataState::new); if (Config.get().isDataStreamsEnabled()) { - MetadataState state = - InstrumentationContext.get(Metadata.class, MetadataState.class) - .putIfAbsent(metadata, MetadataState::new); KafkaConfigHelper.storePendingProducerConfig( state, KafkaConfigHelper.extractProducerConfig(producerConfig)); } diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/TracingIterator.java b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/TracingIterator.java index d3b4dd75a8b..28bf79c3c13 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/TracingIterator.java +++ b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/TracingIterator.java @@ -141,7 +141,7 @@ protected void startNewRecordSpan(ConsumerRecord val) { span.setTag(InstrumentationTags.TOMBSTONE, true); } decorator.afterStart(span); - decorator.onConsume(span, val, group, bootstrapServers); + decorator.onConsume(span, val, group, clusterId, bootstrapServers); if (InstrumenterConfig.get().isLegacyContextManagerEnabled()) { activateNext(span); } else { diff --git a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/InstrumentationTags.java b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/InstrumentationTags.java index c7397a18114..0c1054e7776 100644 --- a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/InstrumentationTags.java +++ b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/InstrumentationTags.java @@ -102,6 +102,7 @@ public class InstrumentationTags { public static final String MESSAGE = "message"; public static final String HANDLER_TYPE = "handler.type"; public static final String KAFKA_BOOTSTRAP_SERVERS = "messaging.kafka.bootstrap.servers"; + public static final String KAFKA_CLUSTER_ID = "messaging.kafka.cluster.id"; public static final String MESSAGING_DESTINATION_NAME = "messaging.destination.name"; public static final String QUARTZ_JOB_NAME = "quartz.job.name"; public static final String QUARTZ_JOB_GROUP = "quartz.job.group";