Skip to content

Commit ccc7267

Browse files
[skip ci] commit bdbdf30
2 parents 158a9e1 + bdbdf30 commit ccc7267

39 files changed

Lines changed: 1404 additions & 47 deletions

File tree

dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/ConsumerCoordinatorInstrumentation.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import datadog.trace.api.datastreams.DataStreamsTags;
1111
import datadog.trace.bootstrap.InstrumentationContext;
1212
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
13+
import datadog.trace.instrumentation.kafka_common.MetadataState;
1314
import java.util.HashMap;
1415
import java.util.Map;
1516
import net.bytebuddy.asm.Advice;
@@ -42,7 +43,9 @@ public ElementMatcher.Junction<ClassLoader> classLoaderMatcher() {
4243
@Override
4344
public Map<String, String> contextStore() {
4445
Map<String, String> contextStores = new HashMap<>();
45-
contextStores.put("org.apache.kafka.clients.Metadata", "java.lang.String");
46+
contextStores.put(
47+
"org.apache.kafka.clients.Metadata",
48+
"datadog.trace.instrumentation.kafka_common.MetadataState");
4649
contextStores.put(
4750
"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator",
4851
KafkaConsumerInfo.class.getName());
@@ -56,7 +59,11 @@ public String instrumentedType() {
5659

5760
@Override
5861
public String[] helperClassNames() {
59-
return new String[] {packageName + ".KafkaConsumerInfo"};
62+
return new String[] {
63+
packageName + ".KafkaConsumerInfo",
64+
"datadog.trace.instrumentation.kafka_common.PendingConfig",
65+
"datadog.trace.instrumentation.kafka_common.MetadataState",
66+
};
6067
}
6168

6269
@Override
@@ -90,7 +97,9 @@ public static void trackCommitOffset(
9097
Metadata consumerMetadata = kafkaConsumerInfo.getClientMetadata();
9198
String clusterId = null;
9299
if (consumerMetadata != null) {
93-
clusterId = InstrumentationContext.get(Metadata.class, String.class).get(consumerMetadata);
100+
MetadataState metadataState =
101+
InstrumentationContext.get(Metadata.class, MetadataState.class).get(consumerMetadata);
102+
clusterId = metadataState != null ? metadataState.clusterId : null;
94103
}
95104

96105
for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {

dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaConsumerInfoInstrumentation.java

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
2424
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
2525
import datadog.trace.instrumentation.kafka_common.ClusterIdHolder;
26+
import datadog.trace.instrumentation.kafka_common.KafkaConfigHelper;
27+
import datadog.trace.instrumentation.kafka_common.MetadataState;
2628
import java.util.HashMap;
2729
import java.util.List;
2830
import java.util.Map;
@@ -60,7 +62,9 @@ public ElementMatcher.Junction<ClassLoader> classLoaderMatcher() {
6062
@Override
6163
public Map<String, String> contextStore() {
6264
Map<String, String> contextStores = new HashMap<>();
63-
contextStores.put("org.apache.kafka.clients.Metadata", "java.lang.String");
65+
contextStores.put(
66+
"org.apache.kafka.clients.Metadata",
67+
"datadog.trace.instrumentation.kafka_common.MetadataState");
6468
contextStores.put(
6569
"org.apache.kafka.clients.consumer.ConsumerRecords", KafkaConsumerInfo.class.getName());
6670
contextStores.put(
@@ -82,6 +86,9 @@ public String[] helperClassNames() {
8286
packageName + ".KafkaDecorator",
8387
packageName + ".KafkaConsumerInfo",
8488
"datadog.trace.instrumentation.kafka_common.ClusterIdHolder",
89+
"datadog.trace.instrumentation.kafka_common.KafkaConfigHelper",
90+
"datadog.trace.instrumentation.kafka_common.PendingConfig",
91+
"datadog.trace.instrumentation.kafka_common.MetadataState",
8592
};
8693
}
8794

@@ -148,6 +155,16 @@ public static void captureGroup(
148155
.put(coordinator, kafkaConsumerInfo);
149156
}
150157
}
158+
159+
if (Config.get().isDataStreamsEnabled()) {
160+
MetadataState state =
161+
InstrumentationContext.get(Metadata.class, MetadataState.class)
162+
.putIfAbsent(metadata, MetadataState::new);
163+
KafkaConfigHelper.storePendingConsumerConfig(
164+
state,
165+
normalizedConsumerGroup,
166+
KafkaConfigHelper.extractConsumerConfig(consumerConfig));
167+
}
151168
}
152169

153170
public static void muzzleCheck(ConsumerRecord record) {
@@ -191,6 +208,16 @@ public static void captureGroup(
191208
.put(coordinator, kafkaConsumerInfo);
192209
}
193210
}
211+
212+
if (Config.get().isDataStreamsEnabled()) {
213+
MetadataState state =
214+
InstrumentationContext.get(Metadata.class, MetadataState.class)
215+
.putIfAbsent(metadata, MetadataState::new);
216+
KafkaConfigHelper.storePendingConsumerConfig(
217+
state,
218+
normalizedConsumerGroup,
219+
KafkaConfigHelper.extractConsumerConfigFromMap(consumerConfig));
220+
}
194221
}
195222

196223
public static void muzzleCheck(ConsumerRecord record) {
@@ -214,8 +241,9 @@ public static AgentScope onEnter(@Advice.This KafkaConsumer consumer) {
214241
if (kafkaConsumerInfo != null && Config.get().isDataStreamsEnabled()) {
215242
Metadata consumerMetadata = kafkaConsumerInfo.getClientMetadata();
216243
if (consumerMetadata != null) {
217-
String clusterId =
218-
InstrumentationContext.get(Metadata.class, String.class).get(consumerMetadata);
244+
MetadataState metadataState =
245+
InstrumentationContext.get(Metadata.class, MetadataState.class).get(consumerMetadata);
246+
String clusterId = metadataState != null ? metadataState.clusterId : null;
219247
if (clusterId != null) {
220248
ClusterIdHolder.set(clusterId);
221249
}

dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaConsumerInstrumentation.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import datadog.trace.agent.tooling.Instrumenter;
1616
import datadog.trace.agent.tooling.InstrumenterModule;
1717
import datadog.trace.bootstrap.InstrumentationContext;
18+
import datadog.trace.instrumentation.kafka_common.MetadataState;
1819
import java.util.HashMap;
1920
import java.util.Iterator;
2021
import java.util.List;
@@ -46,7 +47,9 @@ public ElementMatcher.Junction<ClassLoader> classLoaderMatcher() {
4647
@Override
4748
public Map<String, String> contextStore() {
4849
Map<String, String> contextStores = new HashMap<>(2);
49-
contextStores.put("org.apache.kafka.clients.Metadata", "java.lang.String");
50+
contextStores.put(
51+
"org.apache.kafka.clients.Metadata",
52+
"datadog.trace.instrumentation.kafka_common.MetadataState");
5053
contextStores.put(
5154
"org.apache.kafka.clients.consumer.ConsumerRecords", KafkaConsumerInfo.class.getName());
5255
return contextStores;
@@ -73,6 +76,8 @@ public String[] helperClassNames() {
7376
packageName + ".TextMapInjectAdapter",
7477
"datadog.trace.instrumentation.kafka_common.Utils",
7578
"datadog.trace.instrumentation.kafka_common.StreamingContext",
79+
"datadog.trace.instrumentation.kafka_common.PendingConfig",
80+
"datadog.trace.instrumentation.kafka_common.MetadataState",
7681
};
7782
}
7883

@@ -113,7 +118,7 @@ public static void wrap(
113118
String group = KafkaConsumerInstrumentationHelper.extractGroup(kafkaConsumerInfo);
114119
String clusterId =
115120
KafkaConsumerInstrumentationHelper.extractClusterId(
116-
kafkaConsumerInfo, InstrumentationContext.get(Metadata.class, String.class));
121+
kafkaConsumerInfo, InstrumentationContext.get(Metadata.class, MetadataState.class));
117122
String bootstrapServers =
118123
KafkaConsumerInstrumentationHelper.extractBootstrapServers(kafkaConsumerInfo);
119124
iterable =
@@ -135,7 +140,7 @@ public static void wrap(
135140
String group = KafkaConsumerInstrumentationHelper.extractGroup(kafkaConsumerInfo);
136141
String clusterId =
137142
KafkaConsumerInstrumentationHelper.extractClusterId(
138-
kafkaConsumerInfo, InstrumentationContext.get(Metadata.class, String.class));
143+
kafkaConsumerInfo, InstrumentationContext.get(Metadata.class, MetadataState.class));
139144
String bootstrapServers =
140145
KafkaConsumerInstrumentationHelper.extractBootstrapServers(kafkaConsumerInfo);
141146
iterable =
@@ -157,7 +162,7 @@ public static void wrap(
157162
String group = KafkaConsumerInstrumentationHelper.extractGroup(kafkaConsumerInfo);
158163
String clusterId =
159164
KafkaConsumerInstrumentationHelper.extractClusterId(
160-
kafkaConsumerInfo, InstrumentationContext.get(Metadata.class, String.class));
165+
kafkaConsumerInfo, InstrumentationContext.get(Metadata.class, MetadataState.class));
161166
String bootstrapServers =
162167
KafkaConsumerInstrumentationHelper.extractBootstrapServers(kafkaConsumerInfo);
163168
iterator =

dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaConsumerInstrumentationHelper.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import datadog.trace.api.Config;
44
import datadog.trace.bootstrap.ContextStore;
5+
import datadog.trace.instrumentation.kafka_common.MetadataState;
56
import org.apache.kafka.clients.Metadata;
67

78
public class KafkaConsumerInstrumentationHelper {
@@ -13,11 +14,13 @@ public static String extractGroup(KafkaConsumerInfo kafkaConsumerInfo) {
1314
}
1415

1516
public static String extractClusterId(
16-
KafkaConsumerInfo kafkaConsumerInfo, ContextStore<Metadata, String> metadataContextStore) {
17+
KafkaConsumerInfo kafkaConsumerInfo,
18+
ContextStore<Metadata, MetadataState> metadataContextStore) {
1719
if (Config.get().isDataStreamsEnabled() && kafkaConsumerInfo != null) {
1820
Metadata consumerMetadata = kafkaConsumerInfo.getClientMetadata();
1921
if (consumerMetadata != null) {
20-
return metadataContextStore.get(consumerMetadata);
22+
MetadataState state = metadataContextStore.get(consumerMetadata);
23+
return state != null ? state.clusterId : null;
2124
}
2225
}
2326
return null;

dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import static datadog.trace.instrumentation.kafka_common.StreamingContext.STREAMING_CONTEXT;
1919
import static datadog.trace.instrumentation.kafka_common.Utils.DSM_TRANSACTION_SOURCE_READER;
2020
import static java.util.Collections.singletonMap;
21+
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
2122
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
2223
import static net.bytebuddy.matcher.ElementMatchers.isPrivate;
2324
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
@@ -42,6 +43,8 @@
4243
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
4344
import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags;
4445
import datadog.trace.instrumentation.kafka_common.ClusterIdHolder;
46+
import datadog.trace.instrumentation.kafka_common.KafkaConfigHelper;
47+
import datadog.trace.instrumentation.kafka_common.MetadataState;
4548
import java.util.Map;
4649
import net.bytebuddy.asm.Advice;
4750
import net.bytebuddy.matcher.ElementMatcher;
@@ -88,17 +91,29 @@ public String[] helperClassNames() {
8891
"datadog.trace.instrumentation.kafka_common.StreamingContext",
8992
"datadog.trace.instrumentation.kafka_common.ClusterIdHolder",
9093
"datadog.trace.instrumentation.kafka_common.Utils",
94+
"datadog.trace.instrumentation.kafka_common.KafkaConfigHelper",
95+
"datadog.trace.instrumentation.kafka_common.PendingConfig",
96+
"datadog.trace.instrumentation.kafka_common.MetadataState",
9197
packageName + ".AvroSchemaExtractor",
9298
};
9399
}
94100

95101
@Override
96102
public Map<String, String> contextStore() {
97-
return singletonMap("org.apache.kafka.clients.Metadata", "java.lang.String");
103+
return singletonMap(
104+
"org.apache.kafka.clients.Metadata",
105+
"datadog.trace.instrumentation.kafka_common.MetadataState");
98106
}
99107

100108
@Override
101109
public void methodAdvice(MethodTransformer transformer) {
110+
transformer.applyAdvice(
111+
isConstructor()
112+
.and(takesArgument(0, named("org.apache.kafka.clients.producer.ProducerConfig")))
113+
.and(takesArgument(1, named("org.apache.kafka.common.serialization.Serializer")))
114+
.and(takesArgument(2, named("org.apache.kafka.common.serialization.Serializer"))),
115+
KafkaProducerInstrumentation.class.getName() + "$ProducerConstructorAdvice");
116+
102117
transformer.applyAdvices(
103118
isMethod()
104119
.and(isPublic())
@@ -126,7 +141,9 @@ public static AgentScope onEnter(
126141
@Advice.FieldValue("metadata") Metadata metadata,
127142
@Advice.Argument(value = 0, readOnly = false) ProducerRecord record,
128143
@Advice.Argument(value = 1, readOnly = false) Callback callback) {
129-
String clusterId = InstrumentationContext.get(Metadata.class, String.class).get(metadata);
144+
MetadataState metadataState =
145+
InstrumentationContext.get(Metadata.class, MetadataState.class).get(metadata);
146+
String clusterId = metadataState != null ? metadataState.clusterId : null;
130147

131148
// Set cluster ID for Schema Registry instrumentation
132149
if (clusterId != null) {
@@ -183,7 +200,9 @@ public static void onEnter(
183200
@Advice.Argument(value = 0, readOnly = false) ProducerRecord record) {
184201
AgentSpan span = activeSpan();
185202
if (span == null) return;
186-
String clusterId = InstrumentationContext.get(Metadata.class, String.class).get(metadata);
203+
MetadataState metadataState =
204+
InstrumentationContext.get(Metadata.class, MetadataState.class).get(metadata);
205+
String clusterId = metadataState != null ? metadataState.clusterId : null;
187206
TextMapInjectAdapterInterface setter = NoopTextMapInjectAdapter.NOOP_SETTER;
188207
// Do not inject headers for batch versions below 2
189208
// This is how similar check is being done in Kafka client itself:
@@ -243,6 +262,21 @@ record =
243262
}
244263
}
245264

265+
public static class ProducerConstructorAdvice {
266+
@Advice.OnMethodExit(suppress = Throwable.class)
267+
public static void captureConfiguration(
268+
@Advice.FieldValue("metadata") Metadata metadata,
269+
@Advice.Argument(0) ProducerConfig producerConfig) {
270+
if (Config.get().isDataStreamsEnabled()) {
271+
MetadataState state =
272+
InstrumentationContext.get(Metadata.class, MetadataState.class)
273+
.putIfAbsent(metadata, MetadataState::new);
274+
KafkaConfigHelper.storePendingProducerConfig(
275+
state, KafkaConfigHelper.extractProducerConfig(producerConfig));
276+
}
277+
}
278+
}
279+
246280
public static class PayloadSizeAdvice {
247281

248282
/**

dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/MetadataInstrumentation.java

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
import datadog.trace.agent.tooling.Instrumenter;
1313
import datadog.trace.agent.tooling.InstrumenterModule;
1414
import datadog.trace.bootstrap.InstrumentationContext;
15+
import datadog.trace.instrumentation.kafka_common.KafkaConfigHelper;
16+
import datadog.trace.instrumentation.kafka_common.MetadataState;
1517
import java.util.Map;
1618
import net.bytebuddy.asm.Advice;
1719
import net.bytebuddy.description.type.TypeDescription;
@@ -51,12 +53,19 @@ public ElementMatcher<TypeDescription> hierarchyMatcher() {
5153

5254
@Override
5355
public String[] helperClassNames() {
54-
return new String[] {packageName + ".KafkaDecorator"};
56+
return new String[] {
57+
packageName + ".KafkaDecorator",
58+
"datadog.trace.instrumentation.kafka_common.KafkaConfigHelper",
59+
"datadog.trace.instrumentation.kafka_common.PendingConfig",
60+
"datadog.trace.instrumentation.kafka_common.MetadataState",
61+
};
5562
}
5663

5764
@Override
5865
public Map<String, String> contextStore() {
59-
return singletonMap("org.apache.kafka.clients.Metadata", "java.lang.String");
66+
return singletonMap(
67+
"org.apache.kafka.clients.Metadata",
68+
"datadog.trace.instrumentation.kafka_common.MetadataState");
6069
}
6170

6271
@Override
@@ -78,8 +87,12 @@ public static class MetadataUpdateBefore22Advice {
7887
public static void onEnter(
7988
@Advice.This final Metadata metadata, @Advice.Argument(0) final Cluster newCluster) {
8089
if (newCluster != null && !newCluster.isBootstrapConfigured()) {
81-
InstrumentationContext.get(Metadata.class, String.class)
82-
.put(metadata, newCluster.clusterResource().clusterId());
90+
String clusterId = newCluster.clusterResource().clusterId();
91+
MetadataState state =
92+
InstrumentationContext.get(Metadata.class, MetadataState.class)
93+
.putIfAbsent(metadata, MetadataState::new);
94+
state.clusterId = clusterId;
95+
KafkaConfigHelper.reportPendingConfig(state, clusterId);
8396
}
8497
}
8598

@@ -95,8 +108,12 @@ public static class MetadataUpdate22AndAfterAdvice {
95108
public static void onEnter(
96109
@Advice.This final Metadata metadata, @Advice.Argument(1) final MetadataResponse response) {
97110
if (response != null) {
98-
InstrumentationContext.get(Metadata.class, String.class)
99-
.put(metadata, response.clusterId());
111+
String clusterId = response.clusterId();
112+
MetadataState state =
113+
InstrumentationContext.get(Metadata.class, MetadataState.class)
114+
.putIfAbsent(metadata, MetadataState::new);
115+
state.clusterId = clusterId;
116+
KafkaConfigHelper.reportPendingConfig(state, clusterId);
100117
}
101118
}
102119

dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java/datadog/trace/instrumentation/kafka_clients38/ConsumerCoordinatorInstrumentation.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@ public ElementMatcher.Junction<ClassLoader> classLoaderMatcher() {
2727
@Override
2828
public Map<String, String> contextStore() {
2929
Map<String, String> contextStores = new HashMap<>(2);
30-
contextStores.put("org.apache.kafka.clients.Metadata", "java.lang.String");
30+
contextStores.put(
31+
"org.apache.kafka.clients.Metadata",
32+
"datadog.trace.instrumentation.kafka_common.MetadataState");
3133
contextStores.put(
3234
"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator",
3335
KafkaConsumerInfo.class.getName());
@@ -41,7 +43,11 @@ public String instrumentedType() {
4143

4244
@Override
4345
public String[] helperClassNames() {
44-
return new String[] {packageName + ".KafkaConsumerInfo"};
46+
return new String[] {
47+
packageName + ".KafkaConsumerInfo",
48+
"datadog.trace.instrumentation.kafka_common.PendingConfig",
49+
"datadog.trace.instrumentation.kafka_common.MetadataState",
50+
};
4551
}
4652

4753
@Override

dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java/datadog/trace/instrumentation/kafka_clients38/KafkaConsumerInfoInstrumentation.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,9 @@ public ElementMatcher.Junction<ClassLoader> classLoaderMatcher() {
4141
@Override
4242
public Map<String, String> contextStore() {
4343
Map<String, String> contextStores = new HashMap<>(4);
44-
contextStores.put("org.apache.kafka.clients.Metadata", "java.lang.String");
44+
contextStores.put(
45+
"org.apache.kafka.clients.Metadata",
46+
"datadog.trace.instrumentation.kafka_common.MetadataState");
4547
contextStores.put(
4648
"org.apache.kafka.clients.consumer.ConsumerRecords", KafkaConsumerInfo.class.getName());
4749
// new- here we are storing the callbackinvoker and consumerdelegate in the context store
@@ -77,6 +79,9 @@ public String[] helperClassNames() {
7779
packageName + ".KafkaConsumerInfo",
7880
packageName + ".KafkaConsumerInstrumentationHelper",
7981
"datadog.trace.instrumentation.kafka_common.ClusterIdHolder",
82+
"datadog.trace.instrumentation.kafka_common.KafkaConfigHelper",
83+
"datadog.trace.instrumentation.kafka_common.PendingConfig",
84+
"datadog.trace.instrumentation.kafka_common.MetadataState",
8085
};
8186
}
8287

0 commit comments

Comments
 (0)