Skip to content

Commit 6c836c1

Browse files
authored
Merge pull request #5338 from getsentry/feat/kafka-producer-wrapper
fix(kafka): [Queue Instrumentation 35] Inject trace headers without real active span
2 parents 2064336 + 18e05fa commit 6c836c1

17 files changed

Lines changed: 749 additions & 489 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
### Fixes
1818

19+
- Inject Kafka trace headers even without an active span so distributed tracing works for background workers and `@Scheduled` jobs ([#5338](https://github.com/getsentry/sentry-java/pull/5338))
1920
- Write the `sentry-task-enqueued-time` Kafka header as a plain decimal so cross-SDK consumers (e.g. sentry-python) can parse it ([#5328](https://github.com/getsentry/sentry-java/pull/5328))
2021

2122
## 8.37.1

sentry-kafka/api/sentry-kafka.api

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,11 @@ public final class io/sentry/kafka/SentryKafkaConsumerTracing {
99
public static fun withTracing (Lorg/apache/kafka/clients/consumer/ConsumerRecord;Ljava/util/concurrent/Callable;)Ljava/lang/Object;
1010
}
1111

12-
public final class io/sentry/kafka/SentryKafkaProducerInterceptor : org/apache/kafka/clients/producer/ProducerInterceptor {
12+
public final class io/sentry/kafka/SentryKafkaProducer {
1313
public static final field SENTRY_ENQUEUED_TIME_HEADER Ljava/lang/String;
1414
public static final field TRACE_ORIGIN Ljava/lang/String;
15-
public fun <init> ()V
16-
public fun <init> (Lio/sentry/IScopes;)V
17-
public fun <init> (Lio/sentry/IScopes;Ljava/lang/String;)V
18-
public fun close ()V
19-
public fun configure (Ljava/util/Map;)V
20-
public fun onAcknowledgement (Lorg/apache/kafka/clients/producer/RecordMetadata;Ljava/lang/Exception;)V
21-
public fun onSend (Lorg/apache/kafka/clients/producer/ProducerRecord;)Lorg/apache/kafka/clients/producer/ProducerRecord;
15+
public static fun wrap (Lorg/apache/kafka/clients/producer/Producer;)Lorg/apache/kafka/clients/producer/Producer;
16+
public static fun wrap (Lorg/apache/kafka/clients/producer/Producer;Lio/sentry/IScopes;)Lorg/apache/kafka/clients/producer/Producer;
17+
public static fun wrap (Lorg/apache/kafka/clients/producer/Producer;Lio/sentry/IScopes;Ljava/lang/String;)Lorg/apache/kafka/clients/producer/Producer;
2218
}
2319

sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaConsumerTracing.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,7 @@ private void finishTransaction(
241241

242242
private <K, V> @Nullable Long receiveLatency(final @NotNull ConsumerRecord<K, V> record) {
243243
final @Nullable String enqueuedTimeStr =
244-
headerValue(record, SentryKafkaProducerInterceptor.SENTRY_ENQUEUED_TIME_HEADER);
244+
headerValue(record, SentryKafkaProducer.SENTRY_ENQUEUED_TIME_HEADER);
245245
if (enqueuedTimeStr == null) {
246246
return null;
247247
}
Lines changed: 264 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,264 @@
1+
package io.sentry.kafka;
2+
3+
import io.sentry.BaggageHeader;
4+
import io.sentry.DateUtils;
5+
import io.sentry.IScopes;
6+
import io.sentry.ISpan;
7+
import io.sentry.ScopesAdapter;
8+
import io.sentry.SentryLevel;
9+
import io.sentry.SentryTraceHeader;
10+
import io.sentry.SpanDataConvention;
11+
import io.sentry.SpanOptions;
12+
import io.sentry.SpanStatus;
13+
import io.sentry.util.SpanUtils;
14+
import io.sentry.util.TracingUtils;
15+
import java.lang.reflect.InvocationHandler;
16+
import java.lang.reflect.InvocationTargetException;
17+
import java.lang.reflect.Method;
18+
import java.lang.reflect.Proxy;
19+
import java.nio.charset.StandardCharsets;
20+
import java.util.ArrayList;
21+
import java.util.List;
22+
import org.apache.kafka.clients.producer.Callback;
23+
import org.apache.kafka.clients.producer.Producer;
24+
import org.apache.kafka.clients.producer.ProducerRecord;
25+
import org.apache.kafka.common.header.Header;
26+
import org.apache.kafka.common.header.Headers;
27+
import org.jetbrains.annotations.ApiStatus;
28+
import org.jetbrains.annotations.NotNull;
29+
import org.jetbrains.annotations.Nullable;
30+
31+
/**
32+
* Wraps a Kafka {@link Producer} via {@link Proxy} to record a {@code queue.publish} span around
33+
* each {@code send} and to inject Sentry trace propagation headers into the produced record.
34+
*
35+
* <p>Only the two {@code send} overloads are intercepted; every other {@link Producer} method is
36+
* forwarded directly to the delegate. Because the wrapper is a dynamic proxy, it is compatible with
37+
* any Kafka client version — new methods added to the {@link Producer} interface in future Kafka
38+
* releases are forwarded automatically without recompilation.
39+
*
40+
* <p>For raw Kafka usage:
41+
*
42+
* <pre>{@code
43+
* Producer<String, String> producer =
44+
* SentryKafkaProducer.wrap(new KafkaProducer<>(props));
45+
* }</pre>
46+
*
47+
* <p>For Spring Kafka, the {@code SentryKafkaProducerBeanPostProcessor} in {@code
48+
* sentry-spring-jakarta} installs this wrapper automatically via {@code
49+
* ProducerFactory.addPostProcessor(...)}.
50+
*/
51+
@ApiStatus.Experimental
52+
public final class SentryKafkaProducer {
53+
54+
public static final @NotNull String TRACE_ORIGIN = "auto.queue.kafka.producer";
55+
public static final @NotNull String SENTRY_ENQUEUED_TIME_HEADER = "sentry-task-enqueued-time";
56+
57+
private SentryKafkaProducer() {}
58+
59+
/**
60+
* Wraps the given producer with Sentry instrumentation using the global scopes.
61+
*
62+
* @param delegate the Kafka producer to wrap
63+
* @return an instrumented producer that records {@code queue.publish} spans
64+
* @param <K> the Kafka record key type
65+
* @param <V> the Kafka record value type
66+
*/
67+
public static <K, V> @NotNull Producer<K, V> wrap(final @NotNull Producer<K, V> delegate) {
68+
return wrap(delegate, ScopesAdapter.getInstance(), TRACE_ORIGIN);
69+
}
70+
71+
/**
72+
* Wraps the given producer with Sentry instrumentation using the provided scopes.
73+
*
74+
* @param delegate the Kafka producer to wrap
75+
* @param scopes the Sentry scopes to use for span creation and header injection
76+
* @return an instrumented producer that records {@code queue.publish} spans
77+
* @param <K> the Kafka record key type
78+
* @param <V> the Kafka record value type
79+
*/
80+
public static <K, V> @NotNull Producer<K, V> wrap(
81+
final @NotNull Producer<K, V> delegate, final @NotNull IScopes scopes) {
82+
return wrap(delegate, scopes, TRACE_ORIGIN);
83+
}
84+
85+
/**
86+
* Wraps the given producer with Sentry instrumentation.
87+
*
88+
* @param delegate the Kafka producer to wrap
89+
* @param scopes the Sentry scopes to use for span creation and header injection
90+
* @param traceOrigin the trace origin to set on created spans
91+
* @return an instrumented producer that records {@code queue.publish} spans
92+
* @param <K> the Kafka record key type
93+
* @param <V> the Kafka record value type
94+
*/
95+
@SuppressWarnings("unchecked")
96+
public static <K, V> @NotNull Producer<K, V> wrap(
97+
final @NotNull Producer<K, V> delegate,
98+
final @NotNull IScopes scopes,
99+
final @NotNull String traceOrigin) {
100+
return (Producer<K, V>)
101+
Proxy.newProxyInstance(
102+
delegate.getClass().getClassLoader(),
103+
new Class<?>[] {Producer.class},
104+
new SentryProducerHandler<>(delegate, scopes, traceOrigin));
105+
}
106+
107+
static final class SentryProducerHandler<K, V> implements InvocationHandler {
108+
109+
final @NotNull Producer<K, V> delegate;
110+
private final @NotNull IScopes scopes;
111+
private final @NotNull String traceOrigin;
112+
113+
SentryProducerHandler(
114+
final @NotNull Producer<K, V> delegate,
115+
final @NotNull IScopes scopes,
116+
final @NotNull String traceOrigin) {
117+
this.delegate = delegate;
118+
this.scopes = scopes;
119+
this.traceOrigin = traceOrigin;
120+
}
121+
122+
@Override
123+
@SuppressWarnings("unchecked")
124+
public @Nullable Object invoke(
125+
final @NotNull Object proxy, final @NotNull Method method, final @Nullable Object[] args)
126+
throws Throwable {
127+
if ("send".equals(method.getName()) && args != null) {
128+
if (args.length == 1) {
129+
return instrumentedSend((ProducerRecord<K, V>) args[0], null);
130+
} else if (args.length == 2) {
131+
return instrumentedSend((ProducerRecord<K, V>) args[0], (Callback) args[1]);
132+
}
133+
}
134+
135+
if ("toString".equals(method.getName()) && (args == null || args.length == 0)) {
136+
return "SentryKafkaProducer[delegate=" + delegate + "]";
137+
}
138+
139+
try {
140+
return method.invoke(delegate, args);
141+
} catch (InvocationTargetException e) {
142+
throw e.getCause();
143+
}
144+
}
145+
146+
@SuppressWarnings("unchecked")
147+
private @NotNull Object instrumentedSend(
148+
final @NotNull ProducerRecord<K, V> record, final @Nullable Callback callback) {
149+
if (!scopes.getOptions().isEnableQueueTracing() || isIgnored()) {
150+
return delegate.send(record, callback);
151+
}
152+
153+
final @Nullable ISpan activeSpan = scopes.getSpan();
154+
if (activeSpan == null || activeSpan.isNoOp()) {
155+
maybeInjectHeaders(record.headers(), null);
156+
return delegate.send(record, callback);
157+
}
158+
159+
final @NotNull SpanOptions spanOptions = new SpanOptions();
160+
spanOptions.setOrigin(traceOrigin);
161+
final @NotNull ISpan span =
162+
activeSpan.startChild("queue.publish", record.topic(), spanOptions);
163+
164+
span.setData(SpanDataConvention.MESSAGING_SYSTEM, "kafka");
165+
span.setData(SpanDataConvention.MESSAGING_DESTINATION_NAME, record.topic());
166+
maybeInjectHeaders(record.headers(), span);
167+
168+
try {
169+
return delegate.send(record, wrapCallback(callback, span));
170+
} catch (Throwable t) {
171+
finishWithError(span, t);
172+
throw t;
173+
}
174+
}
175+
176+
private @NotNull Callback wrapCallback(
177+
final @Nullable Callback userCallback, final @NotNull ISpan span) {
178+
return (metadata, exception) -> {
179+
try {
180+
if (exception != null) {
181+
span.setThrowable(exception);
182+
span.setStatus(SpanStatus.INTERNAL_ERROR);
183+
} else {
184+
span.setStatus(SpanStatus.OK);
185+
}
186+
} catch (Throwable t) {
187+
scopes
188+
.getOptions()
189+
.getLogger()
190+
.log(SentryLevel.ERROR, "Failed to set status on Kafka producer span.", t);
191+
} finally {
192+
try {
193+
span.finish();
194+
} finally {
195+
if (userCallback != null) {
196+
userCallback.onCompletion(metadata, exception);
197+
}
198+
}
199+
}
200+
};
201+
}
202+
203+
private void finishWithError(final @NotNull ISpan span, final @NotNull Throwable t) {
204+
span.setThrowable(t);
205+
span.setStatus(SpanStatus.INTERNAL_ERROR);
206+
span.finish();
207+
}
208+
209+
private boolean isIgnored() {
210+
return SpanUtils.isIgnored(scopes.getOptions().getIgnoredSpanOrigins(), traceOrigin);
211+
}
212+
213+
private void maybeInjectHeaders(final @NotNull Headers headers, final @Nullable ISpan span) {
214+
try {
215+
final @Nullable List<String> existingBaggageHeaders =
216+
readHeaderValues(headers, BaggageHeader.BAGGAGE_HEADER);
217+
final @Nullable TracingUtils.TracingHeaders tracingHeaders =
218+
TracingUtils.trace(scopes, existingBaggageHeaders, span);
219+
if (tracingHeaders != null) {
220+
final @NotNull SentryTraceHeader sentryTraceHeader =
221+
tracingHeaders.getSentryTraceHeader();
222+
headers.remove(sentryTraceHeader.getName());
223+
headers.add(
224+
sentryTraceHeader.getName(),
225+
sentryTraceHeader.getValue().getBytes(StandardCharsets.UTF_8));
226+
227+
final @Nullable BaggageHeader baggageHeader = tracingHeaders.getBaggageHeader();
228+
if (baggageHeader != null) {
229+
headers.remove(baggageHeader.getName());
230+
headers.add(
231+
baggageHeader.getName(), baggageHeader.getValue().getBytes(StandardCharsets.UTF_8));
232+
}
233+
}
234+
235+
headers.remove(SENTRY_ENQUEUED_TIME_HEADER);
236+
headers.add(
237+
SENTRY_ENQUEUED_TIME_HEADER,
238+
DateUtils.doubleToBigDecimal(DateUtils.millisToSeconds(System.currentTimeMillis()))
239+
.toString()
240+
.getBytes(StandardCharsets.UTF_8));
241+
} catch (Throwable t) {
242+
scopes
243+
.getOptions()
244+
.getLogger()
245+
.log(SentryLevel.ERROR, "Failed to inject Sentry headers into Kafka record.", t);
246+
}
247+
}
248+
249+
private static @Nullable List<String> readHeaderValues(
250+
final @NotNull Headers headers, final @NotNull String name) {
251+
@Nullable List<String> values = null;
252+
for (final @NotNull Header header : headers.headers(name)) {
253+
final byte @Nullable [] value = header.value();
254+
if (value != null) {
255+
if (values == null) {
256+
values = new ArrayList<>();
257+
}
258+
values.add(new String(value, StandardCharsets.UTF_8));
259+
}
260+
}
261+
return values;
262+
}
263+
}
264+
}

0 commit comments

Comments
 (0)