Skip to content

Commit 9816eca

Browse files
authored
Merge pull request #209 from DataDog/tyler/kafka
Kafka and Kafka Streams instrumentation
2 parents a2010ef + 4314f71 commit 9816eca

15 files changed

Lines changed: 1038 additions & 2 deletions

File tree

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
apply plugin: 'version-scan'
2+
3+
versionScan {
4+
group = "org.apache.kafka"
5+
module = "kafka-clients"
6+
versions = "[0.11.0.0,)"
7+
verifyPresent = [
8+
'org.apache.kafka.common.header.Header' : null,
9+
'org.apache.kafka.common.header.Headers': null,
10+
]
11+
}
12+
13+
apply from: "${rootDir}/gradle/java.gradle"
14+
15+
dependencies {
16+
compileOnly group: 'org.apache.kafka', name: 'kafka-clients', version: '0.11.0.0'
17+
18+
compile project(':dd-trace-ot')
19+
compile project(':dd-java-agent:tooling')
20+
21+
compile deps.bytebuddy
22+
compile deps.opentracing
23+
24+
testCompile project(':dd-java-agent:testing')
25+
testCompile group: 'org.apache.kafka', name: 'kafka-clients', version: '0.11.0.0'
26+
testCompile group: 'org.springframework.kafka', name: 'spring-kafka', version: '1.3.3.RELEASE'
27+
testCompile group: 'org.springframework.kafka', name: 'spring-kafka-test', version: '1.3.3.RELEASE'
28+
testCompile group: 'javax.xml.bind', name: 'jaxb-api', version: '2.3.0'
29+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
package datadog.trace.instrumentation.kafka_clients;
2+
3+
import static datadog.trace.agent.tooling.ClassLoaderMatcher.classLoaderHasClasses;
4+
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
5+
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
6+
import static net.bytebuddy.matcher.ElementMatchers.named;
7+
import static net.bytebuddy.matcher.ElementMatchers.returns;
8+
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
9+
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
10+
11+
import com.google.auto.service.AutoService;
12+
import datadog.trace.agent.tooling.DDAdvice;
13+
import datadog.trace.agent.tooling.HelperInjector;
14+
import datadog.trace.agent.tooling.Instrumenter;
15+
import datadog.trace.api.DDSpanTypes;
16+
import datadog.trace.api.DDTags;
17+
import io.opentracing.SpanContext;
18+
import io.opentracing.Tracer;
19+
import io.opentracing.propagation.Format;
20+
import io.opentracing.tag.Tags;
21+
import io.opentracing.util.GlobalTracer;
22+
import java.util.Iterator;
23+
import net.bytebuddy.agent.builder.AgentBuilder;
24+
import net.bytebuddy.asm.Advice;
25+
import org.apache.kafka.clients.consumer.ConsumerRecord;
26+
27+
@AutoService(Instrumenter.class)
28+
public final class KafkaConsumerInstrumentation implements Instrumenter {
29+
public static final HelperInjector HELPER_INJECTOR =
30+
new HelperInjector(
31+
"datadog.trace.instrumentation.kafka_clients.TextMapExtractAdapter",
32+
"datadog.trace.instrumentation.kafka_clients.TracingIterable",
33+
"datadog.trace.instrumentation.kafka_clients.TracingIterable$TracingIterator",
34+
"datadog.trace.instrumentation.kafka_clients.TracingIterable$SpanBuilderDecorator",
35+
"datadog.trace.instrumentation.kafka_clients.KafkaConsumerInstrumentation$ConsumeScopeAction");
36+
public static final ConsumeScopeAction CONSUME_ACTION = new ConsumeScopeAction();
37+
38+
private static final String OPERATION = "kafka.consume";
39+
private static final String COMPONENT_NAME = "java-kafka";
40+
41+
@Override
42+
public AgentBuilder instrument(final AgentBuilder agentBuilder) {
43+
return agentBuilder
44+
.type(
45+
named("org.apache.kafka.clients.consumer.ConsumerRecords"),
46+
classLoaderHasClasses(
47+
"org.apache.kafka.common.header.Header", "org.apache.kafka.common.header.Headers"))
48+
.transform(HELPER_INJECTOR)
49+
.transform(
50+
DDAdvice.create()
51+
.advice(
52+
isMethod()
53+
.and(isPublic())
54+
.and(named("records"))
55+
.and(takesArgument(0, String.class))
56+
.and(returns(Iterable.class)),
57+
IterableAdvice.class.getName())
58+
.advice(
59+
isMethod()
60+
.and(isPublic())
61+
.and(named("iterator"))
62+
.and(takesArguments(0))
63+
.and(returns(Iterator.class)),
64+
IteratorAdvice.class.getName()))
65+
.asDecorator();
66+
}
67+
68+
public static class IterableAdvice {
69+
70+
@Advice.OnMethodExit(suppress = Throwable.class)
71+
public static void wrap(@Advice.Return(readOnly = false) Iterable<ConsumerRecord> iterable) {
72+
iterable = new TracingIterable(iterable, OPERATION, CONSUME_ACTION);
73+
}
74+
}
75+
76+
public static class IteratorAdvice {
77+
78+
@Advice.OnMethodExit(suppress = Throwable.class)
79+
public static void wrap(@Advice.Return(readOnly = false) Iterator<ConsumerRecord> iterator) {
80+
iterator = new TracingIterable.TracingIterator(iterator, OPERATION, CONSUME_ACTION);
81+
}
82+
}
83+
84+
public static class ConsumeScopeAction
85+
implements TracingIterable.SpanBuilderDecorator<ConsumerRecord> {
86+
87+
@Override
88+
public void decorate(final Tracer.SpanBuilder spanBuilder, final ConsumerRecord record) {
89+
final String topic = record.topic() == null ? "unknown" : record.topic();
90+
final SpanContext spanContext =
91+
GlobalTracer.get()
92+
.extract(Format.Builtin.TEXT_MAP, new TextMapExtractAdapter(record.headers()));
93+
spanBuilder
94+
.asChildOf(spanContext)
95+
.withTag(DDTags.SERVICE_NAME, "kafka")
96+
.withTag(DDTags.RESOURCE_NAME, "Consume Topic " + topic)
97+
.withTag(DDTags.SPAN_TYPE, DDSpanTypes.MESSAGE_CONSUMER)
98+
.withTag(Tags.COMPONENT.getKey(), COMPONENT_NAME)
99+
.withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CONSUMER)
100+
.withTag("partition", record.partition())
101+
.withTag("offset", record.offset());
102+
}
103+
}
104+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
package datadog.trace.instrumentation.kafka_clients;
2+
3+
import static datadog.trace.agent.tooling.ClassLoaderMatcher.classLoaderHasClasses;
4+
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
5+
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
6+
import static net.bytebuddy.matcher.ElementMatchers.named;
7+
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
8+
9+
import com.google.auto.service.AutoService;
10+
import datadog.trace.agent.tooling.DDAdvice;
11+
import datadog.trace.agent.tooling.HelperInjector;
12+
import datadog.trace.agent.tooling.Instrumenter;
13+
import datadog.trace.api.DDSpanTypes;
14+
import datadog.trace.api.DDTags;
15+
import io.opentracing.Scope;
16+
import io.opentracing.Span;
17+
import io.opentracing.propagation.Format;
18+
import io.opentracing.tag.Tags;
19+
import io.opentracing.util.GlobalTracer;
20+
import java.util.Collections;
21+
import net.bytebuddy.agent.builder.AgentBuilder;
22+
import net.bytebuddy.asm.Advice;
23+
import org.apache.kafka.clients.producer.Callback;
24+
import org.apache.kafka.clients.producer.ProducerRecord;
25+
import org.apache.kafka.clients.producer.RecordMetadata;
26+
27+
@AutoService(Instrumenter.class)
28+
public final class KafkaProducerInstrumentation implements Instrumenter {
29+
public static final HelperInjector HELPER_INJECTOR =
30+
new HelperInjector(
31+
"datadog.trace.instrumentation.kafka_clients.TextMapInjectAdapter",
32+
KafkaProducerInstrumentation.class.getName() + "$ProducerCallback");
33+
34+
private static final String OPERATION = "kafka.produce";
35+
private static final String COMPONENT_NAME = "java-kafka";
36+
37+
@Override
38+
public AgentBuilder instrument(final AgentBuilder agentBuilder) {
39+
return agentBuilder
40+
.type(
41+
named("org.apache.kafka.clients.producer.KafkaProducer"),
42+
classLoaderHasClasses(
43+
"org.apache.kafka.common.header.Header", "org.apache.kafka.common.header.Headers"))
44+
.transform(HELPER_INJECTOR)
45+
.transform(
46+
DDAdvice.create()
47+
.advice(
48+
isMethod()
49+
.and(isPublic())
50+
.and(named("send"))
51+
.and(
52+
takesArgument(
53+
0, named("org.apache.kafka.clients.producer.ProducerRecord")))
54+
.and(takesArgument(1, named("org.apache.kafka.clients.producer.Callback"))),
55+
ProducerAdvice.class.getName()))
56+
.asDecorator();
57+
}
58+
59+
public static class ProducerAdvice {
60+
61+
@Advice.OnMethodEnter(suppress = Throwable.class)
62+
public static Scope startSpan(
63+
@Advice.Argument(value = 0, readOnly = false) ProducerRecord record,
64+
@Advice.Argument(value = 1, readOnly = false) Callback callback) {
65+
final Scope scope = GlobalTracer.get().buildSpan(OPERATION).startActive(false);
66+
callback = new ProducerCallback(callback, scope);
67+
68+
final Span span = scope.span();
69+
final String topic = record.topic() == null ? "unknown" : record.topic();
70+
if (record.partition() != null) {
71+
span.setTag("kafka.partition", record.partition());
72+
}
73+
74+
Tags.COMPONENT.set(span, COMPONENT_NAME);
75+
Tags.SPAN_KIND.set(span, Tags.SPAN_KIND_PRODUCER);
76+
77+
span.setTag(DDTags.RESOURCE_NAME, "Produce Topic " + topic);
78+
span.setTag(DDTags.SPAN_TYPE, DDSpanTypes.MESSAGE_PRODUCER);
79+
span.setTag(DDTags.SERVICE_NAME, "kafka");
80+
81+
try {
82+
GlobalTracer.get()
83+
.inject(
84+
scope.span().context(),
85+
Format.Builtin.TEXT_MAP,
86+
new TextMapInjectAdapter(record.headers()));
87+
} catch (final IllegalStateException e) {
88+
//headers must be read-only from reused record. try again with new one.
89+
record =
90+
new ProducerRecord<>(
91+
record.topic(),
92+
record.partition(),
93+
record.timestamp(),
94+
record.key(),
95+
record.value(),
96+
record.headers());
97+
98+
GlobalTracer.get()
99+
.inject(
100+
scope.span().context(),
101+
Format.Builtin.TEXT_MAP,
102+
new TextMapInjectAdapter(record.headers()));
103+
}
104+
105+
return scope;
106+
}
107+
108+
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
109+
public static void stopSpan(
110+
@Advice.Enter final Scope scope, @Advice.Thrown final Throwable throwable) {
111+
if (throwable != null) {
112+
final Span span = scope.span();
113+
Tags.ERROR.set(span, true);
114+
span.log(Collections.singletonMap("error.object", throwable));
115+
span.finish();
116+
}
117+
scope.close();
118+
}
119+
}
120+
121+
public static class ProducerCallback implements Callback {
122+
private final Callback callback;
123+
private final Scope scope;
124+
125+
public ProducerCallback(final Callback callback, final Scope scope) {
126+
this.callback = callback;
127+
this.scope = scope;
128+
}
129+
130+
@Override
131+
public void onCompletion(final RecordMetadata metadata, final Exception exception) {
132+
if (exception != null) {
133+
Tags.ERROR.set(scope.span(), Boolean.TRUE);
134+
scope.span().log(Collections.singletonMap("error.object", exception));
135+
}
136+
try {
137+
if (callback != null) {
138+
callback.onCompletion(metadata, exception);
139+
}
140+
} finally {
141+
scope.span().finish();
142+
scope.close();
143+
}
144+
}
145+
}
146+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package datadog.trace.instrumentation.kafka_clients;
2+
3+
import io.opentracing.propagation.TextMap;
4+
import java.nio.charset.StandardCharsets;
5+
import java.util.HashMap;
6+
import java.util.Iterator;
7+
import java.util.Map;
8+
import org.apache.kafka.common.header.Header;
9+
import org.apache.kafka.common.header.Headers;
10+
11+
public class TextMapExtractAdapter implements TextMap {
12+
13+
private final Map<String, String> map = new HashMap<>();
14+
15+
public TextMapExtractAdapter(final Headers headers) {
16+
for (final Header header : headers) {
17+
map.put(header.key(), new String(header.value(), StandardCharsets.UTF_8));
18+
}
19+
}
20+
21+
@Override
22+
public Iterator<Map.Entry<String, String>> iterator() {
23+
return map.entrySet().iterator();
24+
}
25+
26+
@Override
27+
public void put(final String key, final String value) {
28+
throw new UnsupportedOperationException("Use inject adapter instead");
29+
}
30+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package datadog.trace.instrumentation.kafka_clients;
2+
3+
import io.opentracing.propagation.TextMap;
4+
import java.nio.charset.StandardCharsets;
5+
import java.util.Iterator;
6+
import java.util.Map;
7+
import org.apache.kafka.common.header.Headers;
8+
9+
public class TextMapInjectAdapter implements TextMap {
10+
11+
private final Headers headers;
12+
13+
public TextMapInjectAdapter(final Headers headers) {
14+
this.headers = headers;
15+
}
16+
17+
@Override
18+
public Iterator<Map.Entry<String, String>> iterator() {
19+
throw new UnsupportedOperationException("Use extract adapter instead");
20+
}
21+
22+
@Override
23+
public void put(final String key, final String value) {
24+
headers.add(key, value.getBytes(StandardCharsets.UTF_8));
25+
}
26+
}

0 commit comments

Comments
 (0)