Skip to content

Commit 4047f1b

Browse files
authored
Merge pull request #275 from DataDog/tyler/fix-kafka-consume
Advice shouldn’t reference fields from non-injected classes
2 parents cb604a0 + a1a2a0d commit 4047f1b

3 files changed

Lines changed: 52 additions & 58 deletions

File tree

dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaConsumerInstrumentation.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,6 @@ public final class KafkaConsumerInstrumentation extends Instrumenter.Configurabl
3434
"datadog.trace.instrumentation.kafka_clients.TracingIterable$TracingIterator",
3535
"datadog.trace.instrumentation.kafka_clients.TracingIterable$SpanBuilderDecorator",
3636
"datadog.trace.instrumentation.kafka_clients.KafkaConsumerInstrumentation$ConsumeScopeAction");
37-
public static final ConsumeScopeAction CONSUME_ACTION = new ConsumeScopeAction();
38-
39-
private static final String OPERATION = "kafka.consume";
40-
private static final String COMPONENT_NAME = "java-kafka";
4137

4238
public KafkaConsumerInstrumentation() {
4339
super("kafka");
@@ -75,20 +71,23 @@ public static class IterableAdvice {
7571

7672
@Advice.OnMethodExit(suppress = Throwable.class)
7773
public static void wrap(@Advice.Return(readOnly = false) Iterable<ConsumerRecord> iterable) {
78-
iterable = new TracingIterable(iterable, OPERATION, CONSUME_ACTION);
74+
iterable = new TracingIterable(iterable, "kafka.consume", ConsumeScopeAction.INSTANCE);
7975
}
8076
}
8177

8278
public static class IteratorAdvice {
8379

8480
@Advice.OnMethodExit(suppress = Throwable.class)
8581
public static void wrap(@Advice.Return(readOnly = false) Iterator<ConsumerRecord> iterator) {
86-
iterator = new TracingIterable.TracingIterator(iterator, OPERATION, CONSUME_ACTION);
82+
iterator =
83+
new TracingIterable.TracingIterator(
84+
iterator, "kafka.consume", ConsumeScopeAction.INSTANCE);
8785
}
8886
}
8987

9088
public static class ConsumeScopeAction
9189
implements TracingIterable.SpanBuilderDecorator<ConsumerRecord> {
90+
public static final ConsumeScopeAction INSTANCE = new ConsumeScopeAction();
9291

9392
@Override
9493
public void decorate(final Tracer.SpanBuilder spanBuilder, final ConsumerRecord record) {
@@ -101,7 +100,7 @@ public void decorate(final Tracer.SpanBuilder spanBuilder, final ConsumerRecord
101100
.withTag(DDTags.SERVICE_NAME, "kafka")
102101
.withTag(DDTags.RESOURCE_NAME, "Consume Topic " + topic)
103102
.withTag(DDTags.SPAN_TYPE, DDSpanTypes.MESSAGE_CONSUMER)
104-
.withTag(Tags.COMPONENT.getKey(), COMPONENT_NAME)
103+
.withTag(Tags.COMPONENT.getKey(), "java-kafka")
105104
.withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CONSUMER)
106105
.withTag("partition", record.partition())
107106
.withTag("offset", record.offset());

dd-java-agent/instrumentation/kafka-streams-0.11/src/main/java/datadog/trace/instrumentation/kafka_streams/KafkaStreamsProcessorInstrumentation.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,6 @@ public class KafkaStreamsProcessorInstrumentation {
3434
public static final HelperInjector HELPER_INJECTOR =
3535
new HelperInjector("datadog.trace.instrumentation.kafka_streams.TextMapExtractAdapter");
3636

37-
private static final String OPERATION = "kafka.consume";
38-
private static final String COMPONENT_NAME = "java-kafka";
39-
4037
@AutoService(Instrumenter.class)
4138
public static class StartInstrumentation extends Instrumenter.Configurable {
4239

@@ -80,12 +77,12 @@ public static void startSpan(@Advice.Return final StampedRecord record) {
8077
Format.Builtin.TEXT_MAP, new TextMapExtractAdapter(record.value.headers()));
8178

8279
GlobalTracer.get()
83-
.buildSpan(OPERATION)
80+
.buildSpan("kafka.consume")
8481
.asChildOf(extractedContext)
8582
.withTag(DDTags.SERVICE_NAME, "kafka")
8683
.withTag(DDTags.RESOURCE_NAME, "Consume Topic " + record.topic())
8784
.withTag(DDTags.SPAN_TYPE, DDSpanTypes.MESSAGE_CONSUMER)
88-
.withTag(Tags.COMPONENT.getKey(), COMPONENT_NAME)
85+
.withTag(Tags.COMPONENT.getKey(), "java-kafka")
8986
.withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CONSUMER)
9087
.withTag("partition", record.partition())
9188
.withTag("offset", record.offset())

dd-java-agent/instrumentation/kafka-streams-0.11/src/main/java/datadog/trace/instrumentation/kafka_streams/KafkaStreamsSourceNodeRecordDeserializerInstrumentation.java

Lines changed: 44 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -17,56 +17,54 @@
1717
import org.apache.kafka.common.record.TimestampType;
1818

1919
// This is necessary because SourceNodeRecordDeserializer drops the headers. :-(
20-
public class KafkaStreamsSourceNodeRecordDeserializerInstrumentation {
20+
@AutoService(Instrumenter.class)
21+
public class KafkaStreamsSourceNodeRecordDeserializerInstrumentation
22+
extends Instrumenter.Configurable {
2123

22-
@AutoService(Instrumenter.class)
23-
public static class StartInstrumentation extends Instrumenter.Configurable {
24-
25-
public StartInstrumentation() {
26-
super("kafka", "kafka-streams");
27-
}
24+
public KafkaStreamsSourceNodeRecordDeserializerInstrumentation() {
25+
super("kafka", "kafka-streams");
26+
}
2827

29-
@Override
30-
public AgentBuilder apply(final AgentBuilder agentBuilder) {
31-
return agentBuilder
32-
.type(
33-
named("org.apache.kafka.streams.processor.internals.SourceNodeRecordDeserializer"),
34-
classLoaderHasClasses("org.apache.kafka.streams.state.internals.KeyValueIterators"))
35-
.transform(DDTransformers.defaultTransformers())
36-
.transform(
37-
DDAdvice.create()
38-
.advice(
39-
isMethod()
40-
.and(isPublic())
41-
.and(named("deserialize"))
42-
.and(
43-
takesArgument(
44-
0, named("org.apache.kafka.clients.consumer.ConsumerRecord")))
45-
.and(returns(named("org.apache.kafka.clients.consumer.ConsumerRecord"))),
46-
SaveHeadersAdvice.class.getName()))
47-
.asDecorator();
48-
}
28+
@Override
29+
public AgentBuilder apply(final AgentBuilder agentBuilder) {
30+
return agentBuilder
31+
.type(
32+
named("org.apache.kafka.streams.processor.internals.SourceNodeRecordDeserializer"),
33+
classLoaderHasClasses("org.apache.kafka.streams.state.internals.KeyValueIterators"))
34+
.transform(DDTransformers.defaultTransformers())
35+
.transform(
36+
DDAdvice.create()
37+
.advice(
38+
isMethod()
39+
.and(isPublic())
40+
.and(named("deserialize"))
41+
.and(
42+
takesArgument(
43+
0, named("org.apache.kafka.clients.consumer.ConsumerRecord")))
44+
.and(returns(named("org.apache.kafka.clients.consumer.ConsumerRecord"))),
45+
SaveHeadersAdvice.class.getName()))
46+
.asDecorator();
47+
}
4948

50-
public static class SaveHeadersAdvice {
49+
public static class SaveHeadersAdvice {
5150

52-
@Advice.OnMethodExit(suppress = Throwable.class)
53-
public static void saveHeaders(
54-
@Advice.Argument(0) final ConsumerRecord incoming,
55-
@Advice.Return(readOnly = false) ConsumerRecord result) {
56-
result =
57-
new ConsumerRecord<>(
58-
result.topic(),
59-
result.partition(),
60-
result.offset(),
61-
result.timestamp(),
62-
TimestampType.CREATE_TIME,
63-
result.checksum(),
64-
result.serializedKeySize(),
65-
result.serializedValueSize(),
66-
result.key(),
67-
result.value(),
68-
incoming.headers());
69-
}
51+
@Advice.OnMethodExit(suppress = Throwable.class)
52+
public static void saveHeaders(
53+
@Advice.Argument(0) final ConsumerRecord incoming,
54+
@Advice.Return(readOnly = false) ConsumerRecord result) {
55+
result =
56+
new ConsumerRecord<>(
57+
result.topic(),
58+
result.partition(),
59+
result.offset(),
60+
result.timestamp(),
61+
TimestampType.CREATE_TIME,
62+
result.checksum(),
63+
result.serializedKeySize(),
64+
result.serializedValueSize(),
65+
result.key(),
66+
result.value(),
67+
incoming.headers());
7068
}
7169
}
7270
}

0 commit comments

Comments
 (0)