Skip to content

Commit 860047f

Browse files
piochelepiotrdevflow.devflow-routing-intake
andauthored
Capture kafka producer and consumer configs (#10697)
wip wip remove test application update integration Fix producer instrumentation FQN and use allowlist for config capture - Fix MetadataState class name to use fully qualified name in KafkaProducerInstrumentation (helperClassNames and contextStore), which was causing producer spans to break due to context store mismatch - Replace sensitive keys denylist with an allowlist of safe config keys - Mask values of non-allowed keys with "****" instead of dropping them, to enable monitoring of which configs are seen but not yet allowed Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> Apply spotless formatting Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> Fix Groovy evaluation order bug in DataStreamsWritingTest In Groovy, `map[expr1] = expr2` evaluates expr2 before expr1, which swapped msgpack key/value reads. Use temp variables instead. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> Fix CodeNarc PackageName violation in KafkaConfigHelperTest Move test to default package to avoid underscore in package name, matching the convention used by other kafka test files. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> Fix review issues: type values, debug logging, dedup map, and null checks - Use "kafka_producer"/"kafka_consumer" consistently in KafkaConfigHelper - Remove log.warn debug logging from MsgPackDatastreamsPayloadWriter - Remove unbounded reportedKafkaConfigs dedup map; configs flow through StatsBucket like other stats points - Make MASKED_VALUE and ALLOWED_KEYS public in KafkaConfigHelper - Remove redundant null checks in writeKafkaConfigs (constructor normalizes) - Update tests to reflect removal of dedup behavior Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> Fix context store type mismatch in producer context propagation advices The ContextPropagationAdvice classes were still using InstrumentationContext.get(Metadata.class, String.class) after the context store value type was changed from String to MetadataState, causing an instrumentation error at runtime. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> Use putIfAbsent for MetadataState context store to avoid double lookup Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> Use factory overload of putIfAbsent to avoid unnecessary MetadataState allocation Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> Co-authored-by: devflow.devflow-routing-intake <devflow.devflow-routing-intake@kubernetes.us1.ddbuild.io>
1 parent daa425b commit 860047f

File tree

39 files changed

+1404
-47
lines changed

39 files changed

+1404
-47
lines changed

dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/ConsumerCoordinatorInstrumentation.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import datadog.trace.api.datastreams.DataStreamsTags;
1111
import datadog.trace.bootstrap.InstrumentationContext;
1212
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
13+
import datadog.trace.instrumentation.kafka_common.MetadataState;
1314
import java.util.HashMap;
1415
import java.util.Map;
1516
import net.bytebuddy.asm.Advice;
@@ -42,7 +43,9 @@ public ElementMatcher.Junction<ClassLoader> classLoaderMatcher() {
4243
@Override
4344
public Map<String, String> contextStore() {
4445
Map<String, String> contextStores = new HashMap<>();
45-
contextStores.put("org.apache.kafka.clients.Metadata", "java.lang.String");
46+
contextStores.put(
47+
"org.apache.kafka.clients.Metadata",
48+
"datadog.trace.instrumentation.kafka_common.MetadataState");
4649
contextStores.put(
4750
"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator",
4851
KafkaConsumerInfo.class.getName());
@@ -56,7 +59,11 @@ public String instrumentedType() {
5659

5760
@Override
5861
public String[] helperClassNames() {
59-
return new String[] {packageName + ".KafkaConsumerInfo"};
62+
return new String[] {
63+
packageName + ".KafkaConsumerInfo",
64+
"datadog.trace.instrumentation.kafka_common.PendingConfig",
65+
"datadog.trace.instrumentation.kafka_common.MetadataState",
66+
};
6067
}
6168

6269
@Override
@@ -90,7 +97,9 @@ public static void trackCommitOffset(
9097
Metadata consumerMetadata = kafkaConsumerInfo.getClientMetadata();
9198
String clusterId = null;
9299
if (consumerMetadata != null) {
93-
clusterId = InstrumentationContext.get(Metadata.class, String.class).get(consumerMetadata);
100+
MetadataState metadataState =
101+
InstrumentationContext.get(Metadata.class, MetadataState.class).get(consumerMetadata);
102+
clusterId = metadataState != null ? metadataState.clusterId : null;
94103
}
95104

96105
for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {

dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaConsumerInfoInstrumentation.java

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
2424
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
2525
import datadog.trace.instrumentation.kafka_common.ClusterIdHolder;
26+
import datadog.trace.instrumentation.kafka_common.KafkaConfigHelper;
27+
import datadog.trace.instrumentation.kafka_common.MetadataState;
2628
import java.util.HashMap;
2729
import java.util.List;
2830
import java.util.Map;
@@ -60,7 +62,9 @@ public ElementMatcher.Junction<ClassLoader> classLoaderMatcher() {
6062
@Override
6163
public Map<String, String> contextStore() {
6264
Map<String, String> contextStores = new HashMap<>();
63-
contextStores.put("org.apache.kafka.clients.Metadata", "java.lang.String");
65+
contextStores.put(
66+
"org.apache.kafka.clients.Metadata",
67+
"datadog.trace.instrumentation.kafka_common.MetadataState");
6468
contextStores.put(
6569
"org.apache.kafka.clients.consumer.ConsumerRecords", KafkaConsumerInfo.class.getName());
6670
contextStores.put(
@@ -82,6 +86,9 @@ public String[] helperClassNames() {
8286
packageName + ".KafkaDecorator",
8387
packageName + ".KafkaConsumerInfo",
8488
"datadog.trace.instrumentation.kafka_common.ClusterIdHolder",
89+
"datadog.trace.instrumentation.kafka_common.KafkaConfigHelper",
90+
"datadog.trace.instrumentation.kafka_common.PendingConfig",
91+
"datadog.trace.instrumentation.kafka_common.MetadataState",
8592
};
8693
}
8794

@@ -148,6 +155,16 @@ public static void captureGroup(
148155
.put(coordinator, kafkaConsumerInfo);
149156
}
150157
}
158+
159+
if (Config.get().isDataStreamsEnabled()) {
160+
MetadataState state =
161+
InstrumentationContext.get(Metadata.class, MetadataState.class)
162+
.putIfAbsent(metadata, MetadataState::new);
163+
KafkaConfigHelper.storePendingConsumerConfig(
164+
state,
165+
normalizedConsumerGroup,
166+
KafkaConfigHelper.extractConsumerConfig(consumerConfig));
167+
}
151168
}
152169

153170
public static void muzzleCheck(ConsumerRecord record) {
@@ -191,6 +208,16 @@ public static void captureGroup(
191208
.put(coordinator, kafkaConsumerInfo);
192209
}
193210
}
211+
212+
if (Config.get().isDataStreamsEnabled()) {
213+
MetadataState state =
214+
InstrumentationContext.get(Metadata.class, MetadataState.class)
215+
.putIfAbsent(metadata, MetadataState::new);
216+
KafkaConfigHelper.storePendingConsumerConfig(
217+
state,
218+
normalizedConsumerGroup,
219+
KafkaConfigHelper.extractConsumerConfigFromMap(consumerConfig));
220+
}
194221
}
195222

196223
public static void muzzleCheck(ConsumerRecord record) {
@@ -214,8 +241,9 @@ public static AgentScope onEnter(@Advice.This KafkaConsumer consumer) {
214241
if (kafkaConsumerInfo != null && Config.get().isDataStreamsEnabled()) {
215242
Metadata consumerMetadata = kafkaConsumerInfo.getClientMetadata();
216243
if (consumerMetadata != null) {
217-
String clusterId =
218-
InstrumentationContext.get(Metadata.class, String.class).get(consumerMetadata);
244+
MetadataState metadataState =
245+
InstrumentationContext.get(Metadata.class, MetadataState.class).get(consumerMetadata);
246+
String clusterId = metadataState != null ? metadataState.clusterId : null;
219247
if (clusterId != null) {
220248
ClusterIdHolder.set(clusterId);
221249
}

dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaConsumerInstrumentation.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import datadog.trace.agent.tooling.Instrumenter;
1616
import datadog.trace.agent.tooling.InstrumenterModule;
1717
import datadog.trace.bootstrap.InstrumentationContext;
18+
import datadog.trace.instrumentation.kafka_common.MetadataState;
1819
import java.util.HashMap;
1920
import java.util.Iterator;
2021
import java.util.List;
@@ -46,7 +47,9 @@ public ElementMatcher.Junction<ClassLoader> classLoaderMatcher() {
4647
@Override
4748
public Map<String, String> contextStore() {
4849
Map<String, String> contextStores = new HashMap<>(2);
49-
contextStores.put("org.apache.kafka.clients.Metadata", "java.lang.String");
50+
contextStores.put(
51+
"org.apache.kafka.clients.Metadata",
52+
"datadog.trace.instrumentation.kafka_common.MetadataState");
5053
contextStores.put(
5154
"org.apache.kafka.clients.consumer.ConsumerRecords", KafkaConsumerInfo.class.getName());
5255
return contextStores;
@@ -73,6 +76,8 @@ public String[] helperClassNames() {
7376
packageName + ".TextMapInjectAdapter",
7477
"datadog.trace.instrumentation.kafka_common.Utils",
7578
"datadog.trace.instrumentation.kafka_common.StreamingContext",
79+
"datadog.trace.instrumentation.kafka_common.PendingConfig",
80+
"datadog.trace.instrumentation.kafka_common.MetadataState",
7681
};
7782
}
7883

@@ -113,7 +118,7 @@ public static void wrap(
113118
String group = KafkaConsumerInstrumentationHelper.extractGroup(kafkaConsumerInfo);
114119
String clusterId =
115120
KafkaConsumerInstrumentationHelper.extractClusterId(
116-
kafkaConsumerInfo, InstrumentationContext.get(Metadata.class, String.class));
121+
kafkaConsumerInfo, InstrumentationContext.get(Metadata.class, MetadataState.class));
117122
String bootstrapServers =
118123
KafkaConsumerInstrumentationHelper.extractBootstrapServers(kafkaConsumerInfo);
119124
iterable =
@@ -135,7 +140,7 @@ public static void wrap(
135140
String group = KafkaConsumerInstrumentationHelper.extractGroup(kafkaConsumerInfo);
136141
String clusterId =
137142
KafkaConsumerInstrumentationHelper.extractClusterId(
138-
kafkaConsumerInfo, InstrumentationContext.get(Metadata.class, String.class));
143+
kafkaConsumerInfo, InstrumentationContext.get(Metadata.class, MetadataState.class));
139144
String bootstrapServers =
140145
KafkaConsumerInstrumentationHelper.extractBootstrapServers(kafkaConsumerInfo);
141146
iterable =
@@ -157,7 +162,7 @@ public static void wrap(
157162
String group = KafkaConsumerInstrumentationHelper.extractGroup(kafkaConsumerInfo);
158163
String clusterId =
159164
KafkaConsumerInstrumentationHelper.extractClusterId(
160-
kafkaConsumerInfo, InstrumentationContext.get(Metadata.class, String.class));
165+
kafkaConsumerInfo, InstrumentationContext.get(Metadata.class, MetadataState.class));
161166
String bootstrapServers =
162167
KafkaConsumerInstrumentationHelper.extractBootstrapServers(kafkaConsumerInfo);
163168
iterator =

dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaConsumerInstrumentationHelper.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import datadog.trace.api.Config;
44
import datadog.trace.bootstrap.ContextStore;
5+
import datadog.trace.instrumentation.kafka_common.MetadataState;
56
import org.apache.kafka.clients.Metadata;
67

78
public class KafkaConsumerInstrumentationHelper {
@@ -13,11 +14,13 @@ public static String extractGroup(KafkaConsumerInfo kafkaConsumerInfo) {
1314
}
1415

1516
public static String extractClusterId(
16-
KafkaConsumerInfo kafkaConsumerInfo, ContextStore<Metadata, String> metadataContextStore) {
17+
KafkaConsumerInfo kafkaConsumerInfo,
18+
ContextStore<Metadata, MetadataState> metadataContextStore) {
1719
if (Config.get().isDataStreamsEnabled() && kafkaConsumerInfo != null) {
1820
Metadata consumerMetadata = kafkaConsumerInfo.getClientMetadata();
1921
if (consumerMetadata != null) {
20-
return metadataContextStore.get(consumerMetadata);
22+
MetadataState state = metadataContextStore.get(consumerMetadata);
23+
return state != null ? state.clusterId : null;
2124
}
2225
}
2326
return null;

dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import static datadog.trace.instrumentation.kafka_common.StreamingContext.STREAMING_CONTEXT;
1919
import static datadog.trace.instrumentation.kafka_common.Utils.DSM_TRANSACTION_SOURCE_READER;
2020
import static java.util.Collections.singletonMap;
21+
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
2122
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
2223
import static net.bytebuddy.matcher.ElementMatchers.isPrivate;
2324
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
@@ -42,6 +43,8 @@
4243
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
4344
import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags;
4445
import datadog.trace.instrumentation.kafka_common.ClusterIdHolder;
46+
import datadog.trace.instrumentation.kafka_common.KafkaConfigHelper;
47+
import datadog.trace.instrumentation.kafka_common.MetadataState;
4548
import java.util.Map;
4649
import net.bytebuddy.asm.Advice;
4750
import net.bytebuddy.matcher.ElementMatcher;
@@ -88,17 +91,29 @@ public String[] helperClassNames() {
8891
"datadog.trace.instrumentation.kafka_common.StreamingContext",
8992
"datadog.trace.instrumentation.kafka_common.ClusterIdHolder",
9093
"datadog.trace.instrumentation.kafka_common.Utils",
94+
"datadog.trace.instrumentation.kafka_common.KafkaConfigHelper",
95+
"datadog.trace.instrumentation.kafka_common.PendingConfig",
96+
"datadog.trace.instrumentation.kafka_common.MetadataState",
9197
packageName + ".AvroSchemaExtractor",
9298
};
9399
}
94100

95101
@Override
96102
public Map<String, String> contextStore() {
97-
return singletonMap("org.apache.kafka.clients.Metadata", "java.lang.String");
103+
return singletonMap(
104+
"org.apache.kafka.clients.Metadata",
105+
"datadog.trace.instrumentation.kafka_common.MetadataState");
98106
}
99107

100108
@Override
101109
public void methodAdvice(MethodTransformer transformer) {
110+
transformer.applyAdvice(
111+
isConstructor()
112+
.and(takesArgument(0, named("org.apache.kafka.clients.producer.ProducerConfig")))
113+
.and(takesArgument(1, named("org.apache.kafka.common.serialization.Serializer")))
114+
.and(takesArgument(2, named("org.apache.kafka.common.serialization.Serializer"))),
115+
KafkaProducerInstrumentation.class.getName() + "$ProducerConstructorAdvice");
116+
102117
transformer.applyAdvices(
103118
isMethod()
104119
.and(isPublic())
@@ -126,7 +141,9 @@ public static AgentScope onEnter(
126141
@Advice.FieldValue("metadata") Metadata metadata,
127142
@Advice.Argument(value = 0, readOnly = false) ProducerRecord record,
128143
@Advice.Argument(value = 1, readOnly = false) Callback callback) {
129-
String clusterId = InstrumentationContext.get(Metadata.class, String.class).get(metadata);
144+
MetadataState metadataState =
145+
InstrumentationContext.get(Metadata.class, MetadataState.class).get(metadata);
146+
String clusterId = metadataState != null ? metadataState.clusterId : null;
130147

131148
// Set cluster ID for Schema Registry instrumentation
132149
if (clusterId != null) {
@@ -183,7 +200,9 @@ public static void onEnter(
183200
@Advice.Argument(value = 0, readOnly = false) ProducerRecord record) {
184201
AgentSpan span = activeSpan();
185202
if (span == null) return;
186-
String clusterId = InstrumentationContext.get(Metadata.class, String.class).get(metadata);
203+
MetadataState metadataState =
204+
InstrumentationContext.get(Metadata.class, MetadataState.class).get(metadata);
205+
String clusterId = metadataState != null ? metadataState.clusterId : null;
187206
TextMapInjectAdapterInterface setter = NoopTextMapInjectAdapter.NOOP_SETTER;
188207
// Do not inject headers for batch versions below 2
189208
// This is how similar check is being done in Kafka client itself:
@@ -243,6 +262,21 @@ record =
243262
}
244263
}
245264

265+
public static class ProducerConstructorAdvice {
266+
@Advice.OnMethodExit(suppress = Throwable.class)
267+
public static void captureConfiguration(
268+
@Advice.FieldValue("metadata") Metadata metadata,
269+
@Advice.Argument(0) ProducerConfig producerConfig) {
270+
if (Config.get().isDataStreamsEnabled()) {
271+
MetadataState state =
272+
InstrumentationContext.get(Metadata.class, MetadataState.class)
273+
.putIfAbsent(metadata, MetadataState::new);
274+
KafkaConfigHelper.storePendingProducerConfig(
275+
state, KafkaConfigHelper.extractProducerConfig(producerConfig));
276+
}
277+
}
278+
}
279+
246280
public static class PayloadSizeAdvice {
247281

248282
/**

dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/MetadataInstrumentation.java

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
import datadog.trace.agent.tooling.Instrumenter;
1313
import datadog.trace.agent.tooling.InstrumenterModule;
1414
import datadog.trace.bootstrap.InstrumentationContext;
15+
import datadog.trace.instrumentation.kafka_common.KafkaConfigHelper;
16+
import datadog.trace.instrumentation.kafka_common.MetadataState;
1517
import java.util.Map;
1618
import net.bytebuddy.asm.Advice;
1719
import net.bytebuddy.description.type.TypeDescription;
@@ -51,12 +53,19 @@ public ElementMatcher<TypeDescription> hierarchyMatcher() {
5153

5254
@Override
5355
public String[] helperClassNames() {
54-
return new String[] {packageName + ".KafkaDecorator"};
56+
return new String[] {
57+
packageName + ".KafkaDecorator",
58+
"datadog.trace.instrumentation.kafka_common.KafkaConfigHelper",
59+
"datadog.trace.instrumentation.kafka_common.PendingConfig",
60+
"datadog.trace.instrumentation.kafka_common.MetadataState",
61+
};
5562
}
5663

5764
@Override
5865
public Map<String, String> contextStore() {
59-
return singletonMap("org.apache.kafka.clients.Metadata", "java.lang.String");
66+
return singletonMap(
67+
"org.apache.kafka.clients.Metadata",
68+
"datadog.trace.instrumentation.kafka_common.MetadataState");
6069
}
6170

6271
@Override
@@ -78,8 +87,12 @@ public static class MetadataUpdateBefore22Advice {
7887
public static void onEnter(
7988
@Advice.This final Metadata metadata, @Advice.Argument(0) final Cluster newCluster) {
8089
if (newCluster != null && !newCluster.isBootstrapConfigured()) {
81-
InstrumentationContext.get(Metadata.class, String.class)
82-
.put(metadata, newCluster.clusterResource().clusterId());
90+
String clusterId = newCluster.clusterResource().clusterId();
91+
MetadataState state =
92+
InstrumentationContext.get(Metadata.class, MetadataState.class)
93+
.putIfAbsent(metadata, MetadataState::new);
94+
state.clusterId = clusterId;
95+
KafkaConfigHelper.reportPendingConfig(state, clusterId);
8396
}
8497
}
8598

@@ -95,8 +108,12 @@ public static class MetadataUpdate22AndAfterAdvice {
95108
public static void onEnter(
96109
@Advice.This final Metadata metadata, @Advice.Argument(1) final MetadataResponse response) {
97110
if (response != null) {
98-
InstrumentationContext.get(Metadata.class, String.class)
99-
.put(metadata, response.clusterId());
111+
String clusterId = response.clusterId();
112+
MetadataState state =
113+
InstrumentationContext.get(Metadata.class, MetadataState.class)
114+
.putIfAbsent(metadata, MetadataState::new);
115+
state.clusterId = clusterId;
116+
KafkaConfigHelper.reportPendingConfig(state, clusterId);
100117
}
101118
}
102119

dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java/datadog/trace/instrumentation/kafka_clients38/ConsumerCoordinatorInstrumentation.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@ public ElementMatcher.Junction<ClassLoader> classLoaderMatcher() {
2727
@Override
2828
public Map<String, String> contextStore() {
2929
Map<String, String> contextStores = new HashMap<>(2);
30-
contextStores.put("org.apache.kafka.clients.Metadata", "java.lang.String");
30+
contextStores.put(
31+
"org.apache.kafka.clients.Metadata",
32+
"datadog.trace.instrumentation.kafka_common.MetadataState");
3133
contextStores.put(
3234
"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator",
3335
KafkaConsumerInfo.class.getName());
@@ -41,7 +43,11 @@ public String instrumentedType() {
4143

4244
@Override
4345
public String[] helperClassNames() {
44-
return new String[] {packageName + ".KafkaConsumerInfo"};
46+
return new String[] {
47+
packageName + ".KafkaConsumerInfo",
48+
"datadog.trace.instrumentation.kafka_common.PendingConfig",
49+
"datadog.trace.instrumentation.kafka_common.MetadataState",
50+
};
4551
}
4652

4753
@Override

dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java/datadog/trace/instrumentation/kafka_clients38/KafkaConsumerInfoInstrumentation.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,9 @@ public ElementMatcher.Junction<ClassLoader> classLoaderMatcher() {
4141
@Override
4242
public Map<String, String> contextStore() {
4343
Map<String, String> contextStores = new HashMap<>(4);
44-
contextStores.put("org.apache.kafka.clients.Metadata", "java.lang.String");
44+
contextStores.put(
45+
"org.apache.kafka.clients.Metadata",
46+
"datadog.trace.instrumentation.kafka_common.MetadataState");
4547
contextStores.put(
4648
"org.apache.kafka.clients.consumer.ConsumerRecords", KafkaConsumerInfo.class.getName());
4749
// new- here we are storing the callbackinvoker and consumerdelegate in the context store
@@ -77,6 +79,9 @@ public String[] helperClassNames() {
7779
packageName + ".KafkaConsumerInfo",
7880
packageName + ".KafkaConsumerInstrumentationHelper",
7981
"datadog.trace.instrumentation.kafka_common.ClusterIdHolder",
82+
"datadog.trace.instrumentation.kafka_common.KafkaConfigHelper",
83+
"datadog.trace.instrumentation.kafka_common.PendingConfig",
84+
"datadog.trace.instrumentation.kafka_common.MetadataState",
8085
};
8186
}
8287

0 commit comments

Comments
 (0)