Skip to content

Commit fd826ee

Browse files
committed
update integration
1 parent d92d8cd commit fd826ee

28 files changed

+226
-81
lines changed

dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/ConsumerCoordinatorInstrumentation.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import datadog.trace.api.datastreams.DataStreamsTags;
1111
import datadog.trace.bootstrap.InstrumentationContext;
1212
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
13+
import datadog.trace.instrumentation.kafka_common.MetadataState;
1314
import java.util.HashMap;
1415
import java.util.Map;
1516
import net.bytebuddy.asm.Advice;
@@ -42,7 +43,8 @@ public ElementMatcher.Junction<ClassLoader> classLoaderMatcher() {
4243
@Override
4344
public Map<String, String> contextStore() {
4445
Map<String, String> contextStores = new HashMap<>();
45-
contextStores.put("org.apache.kafka.clients.Metadata", "java.lang.String");
46+
contextStores.put("org.apache.kafka.clients.Metadata",
47+
"datadog.trace.instrumentation.kafka_common.MetadataState");
4648
contextStores.put(
4749
"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator",
4850
KafkaConsumerInfo.class.getName());
@@ -56,7 +58,11 @@ public String instrumentedType() {
5658

5759
@Override
5860
public String[] helperClassNames() {
59-
return new String[] {packageName + ".KafkaConsumerInfo"};
61+
return new String[] {
62+
packageName + ".KafkaConsumerInfo",
63+
"datadog.trace.instrumentation.kafka_common.PendingConfig",
64+
"datadog.trace.instrumentation.kafka_common.MetadataState",
65+
};
6066
}
6167

6268
@Override
@@ -90,7 +96,9 @@ public static void trackCommitOffset(
9096
Metadata consumerMetadata = kafkaConsumerInfo.getClientMetadata();
9197
String clusterId = null;
9298
if (consumerMetadata != null) {
93-
clusterId = InstrumentationContext.get(Metadata.class, String.class).get(consumerMetadata);
99+
MetadataState metadataState =
100+
InstrumentationContext.get(Metadata.class, MetadataState.class).get(consumerMetadata);
101+
clusterId = metadataState != null ? metadataState.clusterId : null;
94102
}
95103

96104
for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {

dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaConsumerInfoInstrumentation.java

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
2525
import datadog.trace.instrumentation.kafka_common.ClusterIdHolder;
2626
import datadog.trace.instrumentation.kafka_common.KafkaConfigHelper;
27+
import datadog.trace.instrumentation.kafka_common.MetadataState;
2728
import java.util.HashMap;
2829
import java.util.List;
2930
import java.util.Map;
@@ -61,7 +62,8 @@ public ElementMatcher.Junction<ClassLoader> classLoaderMatcher() {
6162
@Override
6263
public Map<String, String> contextStore() {
6364
Map<String, String> contextStores = new HashMap<>();
64-
contextStores.put("org.apache.kafka.clients.Metadata", "java.lang.String");
65+
contextStores.put("org.apache.kafka.clients.Metadata",
66+
"datadog.trace.instrumentation.kafka_common.MetadataState");
6567
contextStores.put(
6668
"org.apache.kafka.clients.consumer.ConsumerRecords", KafkaConsumerInfo.class.getName());
6769
contextStores.put(
@@ -84,7 +86,8 @@ public String[] helperClassNames() {
8486
packageName + ".KafkaConsumerInfo",
8587
"datadog.trace.instrumentation.kafka_common.ClusterIdHolder",
8688
"datadog.trace.instrumentation.kafka_common.KafkaConfigHelper",
87-
"datadog.trace.instrumentation.kafka_common.KafkaConfigHelper$PendingConfig",
89+
"datadog.trace.instrumentation.kafka_common.PendingConfig",
90+
"datadog.trace.instrumentation.kafka_common.MetadataState",
8891
};
8992
}
9093

@@ -153,8 +156,14 @@ public static void captureGroup(
153156
}
154157

155158
if (Config.get().isDataStreamsEnabled()) {
159+
MetadataState state =
160+
InstrumentationContext.get(Metadata.class, MetadataState.class).get(metadata);
161+
if (state == null) {
162+
state = new MetadataState();
163+
InstrumentationContext.get(Metadata.class, MetadataState.class).put(metadata, state);
164+
}
156165
KafkaConfigHelper.storePendingConsumerConfig(
157-
metadata,
166+
state,
158167
normalizedConsumerGroup,
159168
KafkaConfigHelper.extractConsumerConfig(consumerConfig));
160169
}
@@ -203,8 +212,14 @@ public static void captureGroup(
203212
}
204213

205214
if (Config.get().isDataStreamsEnabled()) {
215+
MetadataState state =
216+
InstrumentationContext.get(Metadata.class, MetadataState.class).get(metadata);
217+
if (state == null) {
218+
state = new MetadataState();
219+
InstrumentationContext.get(Metadata.class, MetadataState.class).put(metadata, state);
220+
}
206221
KafkaConfigHelper.storePendingConsumerConfig(
207-
metadata,
222+
state,
208223
normalizedConsumerGroup,
209224
KafkaConfigHelper.extractConsumerConfigFromMap(consumerConfig));
210225
}
@@ -231,8 +246,9 @@ public static AgentScope onEnter(@Advice.This KafkaConsumer consumer) {
231246
if (kafkaConsumerInfo != null && Config.get().isDataStreamsEnabled()) {
232247
Metadata consumerMetadata = kafkaConsumerInfo.getClientMetadata();
233248
if (consumerMetadata != null) {
234-
String clusterId =
235-
InstrumentationContext.get(Metadata.class, String.class).get(consumerMetadata);
249+
MetadataState metadataState =
250+
InstrumentationContext.get(Metadata.class, MetadataState.class).get(consumerMetadata);
251+
String clusterId = metadataState != null ? metadataState.clusterId : null;
236252
if (clusterId != null) {
237253
ClusterIdHolder.set(clusterId);
238254
}

dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaConsumerInstrumentation.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import datadog.trace.agent.tooling.Instrumenter;
1616
import datadog.trace.agent.tooling.InstrumenterModule;
1717
import datadog.trace.bootstrap.InstrumentationContext;
18+
import datadog.trace.instrumentation.kafka_common.MetadataState;
1819
import java.util.HashMap;
1920
import java.util.Iterator;
2021
import java.util.List;
@@ -46,7 +47,8 @@ public ElementMatcher.Junction<ClassLoader> classLoaderMatcher() {
4647
@Override
4748
public Map<String, String> contextStore() {
4849
Map<String, String> contextStores = new HashMap<>(2);
49-
contextStores.put("org.apache.kafka.clients.Metadata", "java.lang.String");
50+
contextStores.put("org.apache.kafka.clients.Metadata",
51+
"datadog.trace.instrumentation.kafka_common.MetadataState");
5052
contextStores.put(
5153
"org.apache.kafka.clients.consumer.ConsumerRecords", KafkaConsumerInfo.class.getName());
5254
return contextStores;
@@ -73,6 +75,8 @@ public String[] helperClassNames() {
7375
packageName + ".TextMapInjectAdapter",
7476
"datadog.trace.instrumentation.kafka_common.Utils",
7577
"datadog.trace.instrumentation.kafka_common.StreamingContext",
78+
"datadog.trace.instrumentation.kafka_common.PendingConfig",
79+
"datadog.trace.instrumentation.kafka_common.MetadataState",
7680
};
7781
}
7882

@@ -113,7 +117,7 @@ public static void wrap(
113117
String group = KafkaConsumerInstrumentationHelper.extractGroup(kafkaConsumerInfo);
114118
String clusterId =
115119
KafkaConsumerInstrumentationHelper.extractClusterId(
116-
kafkaConsumerInfo, InstrumentationContext.get(Metadata.class, String.class));
120+
kafkaConsumerInfo, InstrumentationContext.get(Metadata.class, MetadataState.class));
117121
String bootstrapServers =
118122
KafkaConsumerInstrumentationHelper.extractBootstrapServers(kafkaConsumerInfo);
119123
iterable =
@@ -135,7 +139,7 @@ public static void wrap(
135139
String group = KafkaConsumerInstrumentationHelper.extractGroup(kafkaConsumerInfo);
136140
String clusterId =
137141
KafkaConsumerInstrumentationHelper.extractClusterId(
138-
kafkaConsumerInfo, InstrumentationContext.get(Metadata.class, String.class));
142+
kafkaConsumerInfo, InstrumentationContext.get(Metadata.class, MetadataState.class));
139143
String bootstrapServers =
140144
KafkaConsumerInstrumentationHelper.extractBootstrapServers(kafkaConsumerInfo);
141145
iterable =
@@ -157,7 +161,7 @@ public static void wrap(
157161
String group = KafkaConsumerInstrumentationHelper.extractGroup(kafkaConsumerInfo);
158162
String clusterId =
159163
KafkaConsumerInstrumentationHelper.extractClusterId(
160-
kafkaConsumerInfo, InstrumentationContext.get(Metadata.class, String.class));
164+
kafkaConsumerInfo, InstrumentationContext.get(Metadata.class, MetadataState.class));
161165
String bootstrapServers =
162166
KafkaConsumerInstrumentationHelper.extractBootstrapServers(kafkaConsumerInfo);
163167
iterator =

dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaConsumerInstrumentationHelper.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import datadog.trace.api.Config;
44
import datadog.trace.bootstrap.ContextStore;
5+
import datadog.trace.instrumentation.kafka_common.MetadataState;
56
import org.apache.kafka.clients.Metadata;
67

78
public class KafkaConsumerInstrumentationHelper {
@@ -13,11 +14,13 @@ public static String extractGroup(KafkaConsumerInfo kafkaConsumerInfo) {
1314
}
1415

1516
public static String extractClusterId(
16-
KafkaConsumerInfo kafkaConsumerInfo, ContextStore<Metadata, String> metadataContextStore) {
17+
KafkaConsumerInfo kafkaConsumerInfo,
18+
ContextStore<Metadata, MetadataState> metadataContextStore) {
1719
if (Config.get().isDataStreamsEnabled() && kafkaConsumerInfo != null) {
1820
Metadata consumerMetadata = kafkaConsumerInfo.getClientMetadata();
1921
if (consumerMetadata != null) {
20-
return metadataContextStore.get(consumerMetadata);
22+
MetadataState state = metadataContextStore.get(consumerMetadata);
23+
return state != null ? state.clusterId : null;
2124
}
2225
}
2326
return null;

dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags;
4343
import datadog.trace.instrumentation.kafka_common.ClusterIdHolder;
4444
import datadog.trace.instrumentation.kafka_common.KafkaConfigHelper;
45+
import datadog.trace.instrumentation.kafka_common.MetadataState;
4546
import java.util.Map;
4647
import net.bytebuddy.asm.Advice;
4748
import net.bytebuddy.matcher.ElementMatcher;
@@ -89,14 +90,16 @@ public String[] helperClassNames() {
8990
"datadog.trace.instrumentation.kafka_common.ClusterIdHolder",
9091
"datadog.trace.instrumentation.kafka_common.Utils",
9192
"datadog.trace.instrumentation.kafka_common.KafkaConfigHelper",
92-
"datadog.trace.instrumentation.kafka_common.KafkaConfigHelper$PendingConfig",
93+
"datadog.trace.instrumentation.kafka_common.PendingConfig",
94+
"MetadataState",
9395
packageName + ".AvroSchemaExtractor",
9496
};
9597
}
9698

9799
@Override
98100
public Map<String, String> contextStore() {
99-
return singletonMap("org.apache.kafka.clients.Metadata", "java.lang.String");
101+
return singletonMap("org.apache.kafka.clients.Metadata",
102+
"MetadataState");
100103
}
101104

102105
@Override
@@ -135,7 +138,11 @@ public static AgentScope onEnter(
135138
@Advice.FieldValue("metadata") Metadata metadata,
136139
@Advice.Argument(value = 0, readOnly = false) ProducerRecord record,
137140
@Advice.Argument(value = 1, readOnly = false) Callback callback) {
138-
String clusterId = InstrumentationContext.get(Metadata.class, String.class).get(metadata);
141+
MetadataState metadataState =
142+
InstrumentationContext.get(
143+
Metadata.class, MetadataState.class)
144+
.get(metadata);
145+
String clusterId = metadataState != null ? metadataState.clusterId : null;
139146

140147
// Set cluster ID for Schema Registry instrumentation
141148
if (clusterId != null) {
@@ -245,8 +252,19 @@ public static void captureConfiguration(
245252
@Advice.FieldValue("metadata") Metadata metadata,
246253
@Advice.Argument(0) ProducerConfig producerConfig) {
247254
if (Config.get().isDataStreamsEnabled()) {
255+
MetadataState state =
256+
InstrumentationContext.get(
257+
Metadata.class,
258+
MetadataState.class)
259+
.get(metadata);
260+
if (state == null) {
261+
state = new MetadataState();
262+
InstrumentationContext.get(
263+
Metadata.class, MetadataState.class)
264+
.put(metadata, state);
265+
}
248266
KafkaConfigHelper.storePendingProducerConfig(
249-
metadata, KafkaConfigHelper.extractProducerConfig(producerConfig));
267+
state, KafkaConfigHelper.extractProducerConfig(producerConfig));
250268
}
251269
}
252270
}

dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/MetadataInstrumentation.java

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import datadog.trace.agent.tooling.InstrumenterModule;
1414
import datadog.trace.bootstrap.InstrumentationContext;
1515
import datadog.trace.instrumentation.kafka_common.KafkaConfigHelper;
16+
import datadog.trace.instrumentation.kafka_common.MetadataState;
1617
import java.util.Map;
1718
import net.bytebuddy.asm.Advice;
1819
import net.bytebuddy.description.type.TypeDescription;
@@ -55,13 +56,15 @@ public String[] helperClassNames() {
5556
return new String[] {
5657
packageName + ".KafkaDecorator",
5758
"datadog.trace.instrumentation.kafka_common.KafkaConfigHelper",
58-
"datadog.trace.instrumentation.kafka_common.KafkaConfigHelper$PendingConfig",
59+
"datadog.trace.instrumentation.kafka_common.PendingConfig",
60+
"datadog.trace.instrumentation.kafka_common.MetadataState",
5961
};
6062
}
6163

6264
@Override
6365
public Map<String, String> contextStore() {
64-
return singletonMap("org.apache.kafka.clients.Metadata", "java.lang.String");
66+
return singletonMap("org.apache.kafka.clients.Metadata",
67+
"datadog.trace.instrumentation.kafka_common.MetadataState");
6568
}
6669

6770
@Override
@@ -84,8 +87,14 @@ public static void onEnter(
8487
@Advice.This final Metadata metadata, @Advice.Argument(0) final Cluster newCluster) {
8588
if (newCluster != null && !newCluster.isBootstrapConfigured()) {
8689
String clusterId = newCluster.clusterResource().clusterId();
87-
InstrumentationContext.get(Metadata.class, String.class).put(metadata, clusterId);
88-
KafkaConfigHelper.reportPendingConfig(metadata, clusterId);
90+
MetadataState state =
91+
InstrumentationContext.get(Metadata.class, MetadataState.class).get(metadata);
92+
if (state == null) {
93+
state = new MetadataState();
94+
InstrumentationContext.get(Metadata.class, MetadataState.class).put(metadata, state);
95+
}
96+
state.clusterId = clusterId;
97+
KafkaConfigHelper.reportPendingConfig(state, clusterId);
8998
}
9099
}
91100

@@ -102,8 +111,14 @@ public static void onEnter(
102111
@Advice.This final Metadata metadata, @Advice.Argument(1) final MetadataResponse response) {
103112
if (response != null) {
104113
String clusterId = response.clusterId();
105-
InstrumentationContext.get(Metadata.class, String.class).put(metadata, clusterId);
106-
KafkaConfigHelper.reportPendingConfig(metadata, clusterId);
114+
MetadataState state =
115+
InstrumentationContext.get(Metadata.class, MetadataState.class).get(metadata);
116+
if (state == null) {
117+
state = new MetadataState();
118+
InstrumentationContext.get(Metadata.class, MetadataState.class).put(metadata, state);
119+
}
120+
state.clusterId = clusterId;
121+
KafkaConfigHelper.reportPendingConfig(state, clusterId);
107122
}
108123
}
109124

dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java/datadog/trace/instrumentation/kafka_clients38/ConsumerCoordinatorInstrumentation.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@ public ElementMatcher.Junction<ClassLoader> classLoaderMatcher() {
2727
@Override
2828
public Map<String, String> contextStore() {
2929
Map<String, String> contextStores = new HashMap<>(2);
30-
contextStores.put("org.apache.kafka.clients.Metadata", "java.lang.String");
30+
contextStores.put("org.apache.kafka.clients.Metadata",
31+
"datadog.trace.instrumentation.kafka_common.MetadataState");
3132
contextStores.put(
3233
"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator",
3334
KafkaConsumerInfo.class.getName());
@@ -41,7 +42,11 @@ public String instrumentedType() {
4142

4243
@Override
4344
public String[] helperClassNames() {
44-
return new String[] {packageName + ".KafkaConsumerInfo"};
45+
return new String[] {
46+
packageName + ".KafkaConsumerInfo",
47+
"datadog.trace.instrumentation.kafka_common.PendingConfig",
48+
"datadog.trace.instrumentation.kafka_common.MetadataState",
49+
};
4550
}
4651

4752
@Override

dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java/datadog/trace/instrumentation/kafka_clients38/KafkaConsumerInfoInstrumentation.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@ public ElementMatcher.Junction<ClassLoader> classLoaderMatcher() {
4141
@Override
4242
public Map<String, String> contextStore() {
4343
Map<String, String> contextStores = new HashMap<>(4);
44-
contextStores.put("org.apache.kafka.clients.Metadata", "java.lang.String");
44+
contextStores.put("org.apache.kafka.clients.Metadata",
45+
"datadog.trace.instrumentation.kafka_common.MetadataState");
4546
contextStores.put(
4647
"org.apache.kafka.clients.consumer.ConsumerRecords", KafkaConsumerInfo.class.getName());
4748
// new- here we are storing the callbackinvoker and consumerdelegate in the context store
@@ -78,7 +79,8 @@ public String[] helperClassNames() {
7879
packageName + ".KafkaConsumerInstrumentationHelper",
7980
"datadog.trace.instrumentation.kafka_common.ClusterIdHolder",
8081
"datadog.trace.instrumentation.kafka_common.KafkaConfigHelper",
81-
"datadog.trace.instrumentation.kafka_common.KafkaConfigHelper$PendingConfig",
82+
"datadog.trace.instrumentation.kafka_common.PendingConfig",
83+
"datadog.trace.instrumentation.kafka_common.MetadataState",
8284
};
8385
}
8486

dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java/datadog/trace/instrumentation/kafka_clients38/KafkaConsumerInstrumentation.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@ public ElementMatcher.Junction<ClassLoader> classLoaderMatcher() {
3434
@Override
3535
public Map<String, String> contextStore() {
3636
Map<String, String> contextStores = new HashMap<>(2);
37-
contextStores.put("org.apache.kafka.clients.Metadata", "java.lang.String");
37+
contextStores.put("org.apache.kafka.clients.Metadata",
38+
"datadog.trace.instrumentation.kafka_common.MetadataState");
3839
contextStores.put(
3940
"org.apache.kafka.clients.consumer.ConsumerRecords",
4041
"datadog.trace.instrumentation.kafka_clients38.KafkaConsumerInfo");
@@ -62,6 +63,8 @@ public String[] helperClassNames() {
6263
packageName + ".TextMapInjectAdapter",
6364
"datadog.trace.instrumentation.kafka_common.Utils",
6465
"datadog.trace.instrumentation.kafka_common.StreamingContext",
66+
"datadog.trace.instrumentation.kafka_common.PendingConfig",
67+
"datadog.trace.instrumentation.kafka_common.MetadataState",
6568
};
6669
}
6770

dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java/datadog/trace/instrumentation/kafka_clients38/KafkaProducerInstrumentation.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,14 +46,16 @@ public String[] helperClassNames() {
4646
"datadog.trace.instrumentation.kafka_common.ClusterIdHolder",
4747
"datadog.trace.instrumentation.kafka_common.Utils",
4848
"datadog.trace.instrumentation.kafka_common.KafkaConfigHelper",
49-
"datadog.trace.instrumentation.kafka_common.KafkaConfigHelper$PendingConfig",
49+
"datadog.trace.instrumentation.kafka_common.PendingConfig",
50+
"datadog.trace.instrumentation.kafka_common.MetadataState",
5051
packageName + ".AvroSchemaExtractor",
5152
};
5253
}
5354

5455
@Override
5556
public Map<String, String> contextStore() {
56-
return singletonMap("org.apache.kafka.clients.Metadata", "java.lang.String");
57+
return singletonMap("org.apache.kafka.clients.Metadata",
58+
"datadog.trace.instrumentation.kafka_common.MetadataState");
5759
}
5860

5961
@Override

0 commit comments

Comments
 (0)