Skip to content

Commit a304870

Browse files
committed
wip
1 parent 35cee76 commit a304870

8 files changed

Lines changed: 303 additions & 190 deletions

File tree

dd-java-agent/instrumentation/google-pubsub-1.116/src/main/java/datadog/trace/instrumentation/googlepubsub/PublisherInstrumentation.java

Lines changed: 28 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
package datadog.trace.instrumentation.googlepubsub;
22

33
import static datadog.context.propagation.Propagators.defaultPropagator;
4+
import static datadog.trace.agent.tooling.InstrumenterModule.TargetSystem.CONTEXT_TRACKING;
45
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
56
import static datadog.trace.api.datastreams.DataStreamsTags.Direction.OUTBOUND;
67
import static datadog.trace.api.datastreams.DataStreamsTags.create;
78
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
9+
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan;
810
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan;
911
import static datadog.trace.instrumentation.googlepubsub.PubSubDecorator.PRODUCER_DECORATE;
1012
import static datadog.trace.instrumentation.googlepubsub.PubSubDecorator.PUBSUB_PRODUCE;
@@ -14,6 +16,7 @@
1416
import com.google.cloud.pubsub.v1.Publisher;
1517
import com.google.pubsub.v1.PubsubMessage;
1618
import datadog.trace.agent.tooling.Instrumenter;
19+
import datadog.trace.agent.tooling.annotation.AppliesOn;
1720
import datadog.trace.api.datastreams.DataStreamsContext;
1821
import datadog.trace.api.datastreams.DataStreamsTags;
1922
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
@@ -30,25 +33,21 @@ public String instrumentedType() {
3033

3134
@Override
3235
public void methodAdvice(MethodTransformer transformer) {
33-
transformer.applyAdvice(isMethod().and(named("publish")), getClass().getName() + "$Wrap");
36+
transformer.applyAdvices(
37+
isMethod().and(named("publish")),
38+
getClass().getName() + "$Wrap",
39+
getClass().getName() + "$ContextPropagationAdvice");
3440
}
3541

3642
public static final class Wrap {
3743
@Advice.OnMethodEnter(suppress = Throwable.class)
38-
public static AgentScope before(
39-
@Advice.Argument(value = 0, readOnly = false) PubsubMessage msg,
40-
@Advice.This Publisher publisher) {
44+
public static AgentScope before(@Advice.This Publisher publisher) {
4145
final AgentSpan span = startSpan(PUBSUB_PRODUCE);
4246

4347
final CharSequence topicName = PRODUCER_DECORATE.extractTopic(publisher.getTopicNameString());
4448
PRODUCER_DECORATE.afterStart(span);
4549
PRODUCER_DECORATE.onProduce(span, topicName);
4650

47-
DataStreamsTags tags = create("google-pubsub", OUTBOUND, topicName.toString());
48-
PubsubMessage.Builder builder = msg.toBuilder();
49-
DataStreamsContext dsmContext = DataStreamsContext.fromTags(tags);
50-
defaultPropagator().inject(span.with(dsmContext), builder, SETTER);
51-
msg = builder.build();
5251
return activateSpan(span);
5352
}
5453

@@ -61,4 +60,24 @@ public static void stopSpan(
6160
scope.close();
6261
}
6362
}
63+
64+
@AppliesOn(CONTEXT_TRACKING)
65+
public static final class ContextPropagationAdvice {
66+
@Advice.OnMethodEnter(suppress = Throwable.class)
67+
public static void onEnter(
68+
@Advice.Argument(value = 0, readOnly = false) PubsubMessage msg,
69+
@Advice.This Publisher publisher) {
70+
AgentSpan span = activeSpan();
71+
if (span == null) return;
72+
DataStreamsTags tags =
73+
create(
74+
"google-pubsub",
75+
OUTBOUND,
76+
PRODUCER_DECORATE.extractTopic(publisher.getTopicNameString()).toString());
77+
PubsubMessage.Builder builder = msg.toBuilder();
78+
DataStreamsContext dsmContext = DataStreamsContext.fromTags(tags);
79+
defaultPropagator().inject(span.with(dsmContext), builder, SETTER);
80+
msg = builder.build();
81+
}
82+
}
6483
}

dd-java-agent/instrumentation/jms/javax-jms-1.1/src/main/java/datadog/trace/instrumentation/jms/MDBMessageConsumerInstrumentation.java

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
package datadog.trace.instrumentation.jms;
22

3+
import static datadog.context.Context.root;
4+
import static datadog.context.propagation.Propagators.defaultPropagator;
5+
import static datadog.trace.agent.tooling.InstrumenterModule.TargetSystem.CONTEXT_TRACKING;
36
import static datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers.declaresAnnotation;
47
import static datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers.hasSuperType;
58
import static datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers.implementsInterface;
69
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
7-
import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.extractContextAndGetSpanContext;
810
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
911
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan;
1012
import static datadog.trace.instrumentation.jms.JMSDecorator.CONSUMER_DECORATE;
@@ -16,11 +18,12 @@
1618
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
1719
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
1820

21+
import datadog.context.ContextScope;
1922
import datadog.trace.agent.tooling.Instrumenter;
23+
import datadog.trace.agent.tooling.annotation.AppliesOn;
2024
import datadog.trace.bootstrap.CallDepthThreadLocalMap;
2125
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
2226
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
23-
import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext;
2427
import javax.jms.Destination;
2528
import javax.jms.JMSException;
2629
import javax.jms.Message;
@@ -52,23 +55,37 @@ public ElementMatcher<TypeDescription> hierarchyMatcher() {
5255

5356
@Override
5457
public void methodAdvice(MethodTransformer transformer) {
55-
transformer.applyAdvice(
58+
transformer.applyAdvices(
5659
isMethod()
5760
.and(isPublic())
5861
.and(named("onMessage"))
5962
.and(takesArguments(1))
6063
.and(takesArgument(0, (named(namespace + ".jms.Message")))),
64+
getClass().getName() + "$ContextPropagationAdvice",
6165
getClass().getName() + "$MDBAdvice");
6266
}
6367

68+
@AppliesOn(CONTEXT_TRACKING)
69+
public static class ContextPropagationAdvice {
70+
@Advice.OnMethodEnter(suppress = Throwable.class)
71+
public static void onEnter(
72+
@Advice.Argument(0) final Message message, @Advice.Local("ctxScope") ContextScope scope) {
73+
scope = defaultPropagator().extract(root(), message, GETTER).attach();
74+
}
75+
76+
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
77+
public static void onExit(@Advice.Local("ctxScope") ContextScope scope) {
78+
if (scope != null) scope.close();
79+
}
80+
}
81+
6482
public static class MDBAdvice {
6583
@Advice.OnMethodEnter(suppress = Throwable.class)
6684
public static AgentScope methodEnter(@Advice.Argument(0) final Message message) {
6785
if (CallDepthThreadLocalMap.incrementCallDepth(MessageListener.class) > 0) {
6886
return null;
6987
}
70-
AgentSpanContext propagatedContext = extractContextAndGetSpanContext(message, GETTER);
71-
AgentSpan span = startSpan(JMS_CONSUME, propagatedContext);
88+
AgentSpan span = startSpan(JMS_CONSUME);
7289
CONSUMER_DECORATE.afterStart(span);
7390
CharSequence consumerResourceName;
7491
try {

dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java

Lines changed: 31 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package datadog.trace.instrumentation.kafka_clients;
22

33
import static datadog.context.propagation.Propagators.defaultPropagator;
4+
import static datadog.trace.agent.tooling.InstrumenterModule.TargetSystem.CONTEXT_TRACKING;
45
import static datadog.trace.agent.tooling.bytebuddy.matcher.ClassLoaderMatchers.hasClassNamed;
56
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
67
import static datadog.trace.api.datastreams.DataStreamsContext.fromTagsWithoutCheckpoint;
@@ -28,6 +29,7 @@
2829
import datadog.context.propagation.Propagators;
2930
import datadog.trace.agent.tooling.Instrumenter;
3031
import datadog.trace.agent.tooling.InstrumenterModule;
32+
import datadog.trace.agent.tooling.annotation.AppliesOn;
3133
import datadog.trace.api.Config;
3234
import datadog.trace.api.datastreams.DataStreamsContext;
3335
import datadog.trace.api.datastreams.DataStreamsTags;
@@ -97,13 +99,14 @@ public Map<String, String> contextStore() {
9799

98100
@Override
99101
public void methodAdvice(MethodTransformer transformer) {
100-
transformer.applyAdvice(
102+
transformer.applyAdvices(
101103
isMethod()
102104
.and(isPublic())
103105
.and(named("send"))
104106
.and(takesArgument(0, named("org.apache.kafka.clients.producer.ProducerRecord")))
105107
.and(takesArgument(1, named("org.apache.kafka.clients.producer.Callback"))),
106-
KafkaProducerInstrumentation.class.getName() + "$ProducerAdvice");
108+
KafkaProducerInstrumentation.class.getName() + "$ProducerAdvice",
109+
KafkaProducerInstrumentation.class.getName() + "$ContextPropagationAdvice");
107110

108111
transformer.applyAdvice(
109112
isMethod()
@@ -118,7 +121,6 @@ public static class ProducerAdvice {
118121

119122
@Advice.OnMethodEnter(suppress = Throwable.class)
120123
public static AgentScope onEnter(
121-
@Advice.FieldValue("apiVersions") final ApiVersions apiVersions,
122124
@Advice.FieldValue("producerConfig") ProducerConfig producerConfig,
123125
@Advice.FieldValue("sender") Sender sender,
124126
@Advice.FieldValue("metadata") Metadata metadata,
@@ -156,6 +158,32 @@ public static AgentScope onEnter(
156158
span.setTag(InstrumentationTags.TOMBSTONE, true);
157159
}
158160

161+
return activateSpan(span);
162+
}
163+
164+
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
165+
public static void stopSpan(
166+
@Advice.Enter final AgentScope scope, @Advice.Thrown final Throwable throwable) {
167+
// Clear cluster ID from Schema Registry instrumentation
168+
ClusterIdHolder.clear();
169+
170+
PRODUCER_DECORATE.onError(scope, throwable);
171+
PRODUCER_DECORATE.beforeFinish(scope);
172+
scope.close();
173+
}
174+
}
175+
176+
@AppliesOn(CONTEXT_TRACKING)
177+
public static class ContextPropagationAdvice {
178+
179+
@Advice.OnMethodEnter(suppress = Throwable.class)
180+
public static void onEnter(
181+
@Advice.FieldValue("apiVersions") final ApiVersions apiVersions,
182+
@Advice.FieldValue("metadata") Metadata metadata,
183+
@Advice.Argument(value = 0, readOnly = false) ProducerRecord record) {
184+
AgentSpan span = activeSpan();
185+
if (span == null) return;
186+
String clusterId = InstrumentationContext.get(Metadata.class, String.class).get(metadata);
159187
TextMapInjectAdapterInterface setter = NoopTextMapInjectAdapter.NOOP_SETTER;
160188
// Do not inject headers for batch versions below 2
161189
// This is how similar check is being done in Kafka client itself:
@@ -205,26 +233,13 @@ record =
205233
if (TIME_IN_QUEUE_ENABLED) {
206234
setter.injectTimeInQueue(record.headers());
207235
}
208-
209236
AgentTracer.get()
210237
.getDataStreamsMonitoring()
211238
.trackTransaction(
212239
span,
213240
DataStreamsTransactionExtractor.Type.KAFKA_PRODUCE_HEADERS,
214241
record.headers(),
215242
DSM_TRANSACTION_SOURCE_READER);
216-
return activateSpan(span);
217-
}
218-
219-
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
220-
public static void stopSpan(
221-
@Advice.Enter final AgentScope scope, @Advice.Thrown final Throwable throwable) {
222-
// Clear cluster ID from Schema Registry instrumentation
223-
ClusterIdHolder.clear();
224-
225-
PRODUCER_DECORATE.onError(scope, throwable);
226-
PRODUCER_DECORATE.beforeFinish(scope);
227-
scope.close();
228243
}
229244
}
230245

dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java/datadog/trace/instrumentation/kafka_clients38/KafkaProducerInstrumentation.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ public String[] helperClassNames() {
4545
"datadog.trace.instrumentation.kafka_common.ClusterIdHolder",
4646
"datadog.trace.instrumentation.kafka_common.Utils",
4747
packageName + ".AvroSchemaExtractor",
48+
packageName + ".ProducerContextPropagationAdvice",
4849
};
4950
}
5051

@@ -55,13 +56,14 @@ public Map<String, String> contextStore() {
5556

5657
@Override
5758
public void methodAdvice(MethodTransformer transformer) {
58-
transformer.applyAdvice(
59+
transformer.applyAdvices(
5960
isMethod()
6061
.and(isPublic())
6162
.and(named("send"))
6263
.and(takesArgument(0, named("org.apache.kafka.clients.producer.ProducerRecord")))
6364
.and(takesArgument(1, named("org.apache.kafka.clients.producer.Callback"))),
64-
packageName + ".ProducerAdvice");
65+
packageName + ".ProducerAdvice",
66+
packageName + ".ProducerContextPropagationAdvice");
6567

6668
transformer.applyAdvice(
6769
isMethod()

dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ProducerAdvice.java

Lines changed: 0 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,12 @@
11
package datadog.trace.instrumentation.kafka_clients38;
22

3-
import static datadog.context.propagation.Propagators.defaultPropagator;
4-
import static datadog.trace.api.datastreams.DataStreamsContext.fromTagsWithoutCheckpoint;
5-
import static datadog.trace.api.datastreams.DataStreamsTags.Direction.OUTBOUND;
6-
import static datadog.trace.api.datastreams.DataStreamsTags.create;
7-
import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.DSM_CONCERN;
83
import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.extractContextAndGetSpanContext;
94
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
105
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan;
116
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan;
127
import static datadog.trace.instrumentation.kafka_clients38.KafkaDecorator.KAFKA_PRODUCE;
138
import static datadog.trace.instrumentation.kafka_clients38.KafkaDecorator.PRODUCER_DECORATE;
14-
import static datadog.trace.instrumentation.kafka_clients38.KafkaDecorator.TIME_IN_QUEUE_ENABLED;
15-
import static datadog.trace.instrumentation.kafka_common.StreamingContext.STREAMING_CONTEXT;
169

17-
import datadog.context.propagation.Propagator;
18-
import datadog.context.propagation.Propagators;
19-
import datadog.trace.api.Config;
20-
import datadog.trace.api.datastreams.DataStreamsContext;
21-
import datadog.trace.api.datastreams.DataStreamsTags;
2210
import datadog.trace.bootstrap.InstrumentationContext;
2311
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
2412
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
@@ -73,56 +61,6 @@ public static AgentScope onEnter(
7361
span.setTag(InstrumentationTags.TOMBSTONE, true);
7462
}
7563

76-
TextMapInjectAdapterInterface setter = NoopTextMapInjectAdapter.NOOP_SETTER;
77-
// Do not inject headers for batch versions below 2
78-
// This is how similar check is being done in Kafka client itself:
79-
// https://github.com/apache/kafka/blob/05fcfde8f69b0349216553f711fdfc3f0259c601/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java#L411-L412
80-
// Also, do not inject headers if specified by JVM option or environment variable
81-
// This can help in mixed client environments where clients < 0.11 that do not support
82-
// headers attempt to read messages that were produced by clients > 0.11 and the magic
83-
// value of the broker(s) is >= 2
84-
85-
// Please note that the minimum magic for kafka 3.8+ is 2 so there is no need to check this
86-
if (Config.get().isKafkaClientPropagationEnabled()
87-
&& !Config.get().isKafkaClientPropagationDisabledForTopic(record.topic())) {
88-
setter = TextMapInjectAdapter.SETTER;
89-
}
90-
DataStreamsTags tags = create("kafka", OUTBOUND, record.topic(), null, clusterId);
91-
try {
92-
defaultPropagator().inject(span, record.headers(), setter);
93-
if (STREAMING_CONTEXT.isDisabledForTopic(record.topic())
94-
|| STREAMING_CONTEXT.isSinkTopic(record.topic())) {
95-
// inject the context in the headers, but delay sending the stats until we know the
96-
// message size.
97-
// The stats are saved in the pathway context and sent in PayloadSizeAdvice.
98-
Propagator dsmPropagator = Propagators.forConcern(DSM_CONCERN);
99-
DataStreamsContext dsmContext = fromTagsWithoutCheckpoint(tags);
100-
dsmPropagator.inject(span.with(dsmContext), record.headers(), setter);
101-
AvroSchemaExtractor.tryExtractProducer(record, span);
102-
}
103-
} catch (final IllegalStateException e) {
104-
// headers must be read-only from reused record. try again with new one.
105-
record =
106-
new ProducerRecord<>(
107-
record.topic(),
108-
record.partition(),
109-
record.timestamp(),
110-
record.key(),
111-
record.value(),
112-
record.headers());
113-
114-
defaultPropagator().inject(span, record.headers(), setter);
115-
if (STREAMING_CONTEXT.isDisabledForTopic(record.topic())
116-
|| STREAMING_CONTEXT.isSinkTopic(record.topic())) {
117-
Propagator dsmPropagator = Propagators.forConcern(DSM_CONCERN);
118-
DataStreamsContext dsmContext = fromTagsWithoutCheckpoint(tags);
119-
dsmPropagator.inject(span.with(dsmContext), record.headers(), setter);
120-
AvroSchemaExtractor.tryExtractProducer(record, span);
121-
}
122-
}
123-
if (TIME_IN_QUEUE_ENABLED) {
124-
setter.injectTimeInQueue(record.headers());
125-
}
12664
return activateSpan(span);
12765
}
12866

0 commit comments

Comments
 (0)