Skip to content

Commit fc23438

Browse files
adinauerclaude
andcommitted
feat(kafka): [Queue Instrumentation 34] Wrap Producer for send spans
Replace SentryKafkaProducerInterceptor with SentryKafkaProducer, a Producer<K,V> wrapper that records a queue.publish span around each send and finishes it when the broker ack callback fires. The span now reflects the full async send lifecycle, not just the synchronous onSend window. For Spring Boot, the SentryKafkaProducerBeanPostProcessor switches from patching KafkaTemplate.setProducerInterceptor(...) to installing a ProducerPostProcessor on every ProducerFactory bean via ProducerFactory.addPostProcessor(...). KafkaTemplate beans are no longer touched, so all customer-configured listeners, interceptors and observation settings are preserved. The console sample now wraps the raw KafkaProducer instead of setting INTERCEPTOR_CLASSES_CONFIG. Spring Boot samples need no change — the auto-configured ProducerPostProcessor is transparent. Co-Authored-By: Claude <noreply@anthropic.com>
1 parent cac8c1f commit fc23438

16 files changed

Lines changed: 781 additions & 487 deletions

File tree

sentry-kafka/api/sentry-kafka.api

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,27 @@ public final class io/sentry/kafka/SentryKafkaConsumerTracing {
99
public static fun withTracing (Lorg/apache/kafka/clients/consumer/ConsumerRecord;Ljava/util/concurrent/Callable;)Ljava/lang/Object;
1010
}
1111

12-
public final class io/sentry/kafka/SentryKafkaProducerInterceptor : org/apache/kafka/clients/producer/ProducerInterceptor {
12+
public final class io/sentry/kafka/SentryKafkaProducer : org/apache/kafka/clients/producer/Producer {
1313
public static final field SENTRY_ENQUEUED_TIME_HEADER Ljava/lang/String;
1414
public static final field TRACE_ORIGIN Ljava/lang/String;
15-
public fun <init> ()V
16-
public fun <init> (Lio/sentry/IScopes;)V
17-
public fun <init> (Lio/sentry/IScopes;Ljava/lang/String;)V
15+
public fun <init> (Lorg/apache/kafka/clients/producer/Producer;)V
16+
public fun <init> (Lorg/apache/kafka/clients/producer/Producer;Lio/sentry/IScopes;)V
17+
public fun <init> (Lorg/apache/kafka/clients/producer/Producer;Lio/sentry/IScopes;Ljava/lang/String;)V
18+
public fun abortTransaction ()V
19+
public fun beginTransaction ()V
20+
public fun clientInstanceId (Ljava/time/Duration;)Lorg/apache/kafka/common/Uuid;
1821
public fun close ()V
19-
public fun configure (Ljava/util/Map;)V
20-
public fun onAcknowledgement (Lorg/apache/kafka/clients/producer/RecordMetadata;Ljava/lang/Exception;)V
21-
public fun onSend (Lorg/apache/kafka/clients/producer/ProducerRecord;)Lorg/apache/kafka/clients/producer/ProducerRecord;
22+
public fun close (Ljava/time/Duration;)V
23+
public fun commitTransaction ()V
24+
public fun flush ()V
25+
public fun getDelegate ()Lorg/apache/kafka/clients/producer/Producer;
26+
public fun initTransactions ()V
27+
public fun metrics ()Ljava/util/Map;
28+
public fun partitionsFor (Ljava/lang/String;)Ljava/util/List;
29+
public fun send (Lorg/apache/kafka/clients/producer/ProducerRecord;)Ljava/util/concurrent/Future;
30+
public fun send (Lorg/apache/kafka/clients/producer/ProducerRecord;Lorg/apache/kafka/clients/producer/Callback;)Ljava/util/concurrent/Future;
31+
public fun sendOffsetsToTransaction (Ljava/util/Map;Ljava/lang/String;)V
32+
public fun sendOffsetsToTransaction (Ljava/util/Map;Lorg/apache/kafka/clients/consumer/ConsumerGroupMetadata;)V
33+
public fun toString ()Ljava/lang/String;
2234
}
2335

sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaConsumerTracing.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,7 @@ private void finishTransaction(
241241

242242
private <K, V> @Nullable Long receiveLatency(final @NotNull ConsumerRecord<K, V> record) {
243243
final @Nullable String enqueuedTimeStr =
244-
headerValue(record, SentryKafkaProducerInterceptor.SENTRY_ENQUEUED_TIME_HEADER);
244+
headerValue(record, SentryKafkaProducer.SENTRY_ENQUEUED_TIME_HEADER);
245245
if (enqueuedTimeStr == null) {
246246
return null;
247247
}
Lines changed: 299 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,299 @@
1+
package io.sentry.kafka;
2+
3+
import io.sentry.BaggageHeader;
4+
import io.sentry.DateUtils;
5+
import io.sentry.IScopes;
6+
import io.sentry.ISpan;
7+
import io.sentry.ScopesAdapter;
8+
import io.sentry.SentryLevel;
9+
import io.sentry.SentryTraceHeader;
10+
import io.sentry.SpanDataConvention;
11+
import io.sentry.SpanOptions;
12+
import io.sentry.SpanStatus;
13+
import io.sentry.util.SpanUtils;
14+
import io.sentry.util.TracingUtils;
15+
import java.nio.charset.StandardCharsets;
16+
import java.time.Duration;
17+
import java.util.ArrayList;
18+
import java.util.List;
19+
import java.util.Map;
20+
import java.util.concurrent.Future;
21+
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
22+
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
23+
import org.apache.kafka.clients.producer.Callback;
24+
import org.apache.kafka.clients.producer.Producer;
25+
import org.apache.kafka.clients.producer.ProducerRecord;
26+
import org.apache.kafka.clients.producer.RecordMetadata;
27+
import org.apache.kafka.common.Metric;
28+
import org.apache.kafka.common.MetricName;
29+
import org.apache.kafka.common.PartitionInfo;
30+
import org.apache.kafka.common.TopicPartition;
31+
import org.apache.kafka.common.Uuid;
32+
import org.apache.kafka.common.errors.ProducerFencedException;
33+
import org.apache.kafka.common.header.Header;
34+
import org.apache.kafka.common.header.Headers;
35+
import org.jetbrains.annotations.ApiStatus;
36+
import org.jetbrains.annotations.NotNull;
37+
import org.jetbrains.annotations.Nullable;
38+
39+
/**
40+
* Wraps a Kafka {@link Producer} to record a {@code queue.publish} span around each {@code send}
41+
* and to inject Sentry trace propagation headers into the produced record.
42+
*
43+
* <p>Unlike a {@link org.apache.kafka.clients.producer.ProducerInterceptor}, the wrapper keeps the
44+
* span open until the send callback fires, so the span reflects the actual broker-ack lifecycle.
45+
*
46+
* <p>For raw Kafka usage:
47+
*
48+
* <pre>{@code
49+
* Producer<String, String> producer =
50+
* new SentryKafkaProducer<>(new KafkaProducer<>(props));
51+
* }</pre>
52+
*
53+
* <p>For Spring Kafka, the {@code SentryKafkaProducerBeanPostProcessor} in {@code
54+
* sentry-spring-jakarta} installs this wrapper automatically via {@code
55+
* ProducerFactory.addPostProcessor(...)}.
56+
*/
57+
@ApiStatus.Experimental
58+
public final class SentryKafkaProducer<K, V> implements Producer<K, V> {
59+
60+
public static final @NotNull String TRACE_ORIGIN = "auto.queue.kafka.producer";
61+
public static final @NotNull String SENTRY_ENQUEUED_TIME_HEADER = "sentry-task-enqueued-time";
62+
63+
private final @NotNull Producer<K, V> delegate;
64+
private final @NotNull IScopes scopes;
65+
private final @NotNull String traceOrigin;
66+
67+
public SentryKafkaProducer(final @NotNull Producer<K, V> delegate) {
68+
this(delegate, ScopesAdapter.getInstance(), TRACE_ORIGIN);
69+
}
70+
71+
public SentryKafkaProducer(
72+
final @NotNull Producer<K, V> delegate, final @NotNull IScopes scopes) {
73+
this(delegate, scopes, TRACE_ORIGIN);
74+
}
75+
76+
public SentryKafkaProducer(
77+
final @NotNull Producer<K, V> delegate,
78+
final @NotNull IScopes scopes,
79+
final @NotNull String traceOrigin) {
80+
this.delegate = delegate;
81+
this.scopes = scopes;
82+
this.traceOrigin = traceOrigin;
83+
}
84+
85+
/** Returns the wrapped producer. */
86+
public @NotNull Producer<K, V> getDelegate() {
87+
return delegate;
88+
}
89+
90+
@Override
91+
public @NotNull Future<RecordMetadata> send(final @NotNull ProducerRecord<K, V> record) {
92+
return send(record, null);
93+
}
94+
95+
@Override
96+
public @NotNull Future<RecordMetadata> send(
97+
final @NotNull ProducerRecord<K, V> record, final @Nullable Callback callback) {
98+
if (!scopes.getOptions().isEnableQueueTracing() || isIgnored()) {
99+
return delegate.send(record, callback);
100+
}
101+
102+
final @Nullable ISpan activeSpan = scopes.getSpan();
103+
if (activeSpan == null || activeSpan.isNoOp()) {
104+
return delegate.send(record, callback);
105+
}
106+
107+
@Nullable ISpan span = null;
108+
try {
109+
final @NotNull SpanOptions spanOptions = new SpanOptions();
110+
spanOptions.setOrigin(traceOrigin);
111+
span = activeSpan.startChild("queue.publish", record.topic(), spanOptions);
112+
if (span.isNoOp()) {
113+
return delegate.send(record, callback);
114+
}
115+
116+
span.setData(SpanDataConvention.MESSAGING_SYSTEM, "kafka");
117+
span.setData(SpanDataConvention.MESSAGING_DESTINATION_NAME, record.topic());
118+
injectHeaders(record.headers(), span);
119+
} catch (Throwable t) {
120+
if (span != null) {
121+
span.setThrowable(t);
122+
span.setStatus(SpanStatus.INTERNAL_ERROR);
123+
if (!span.isFinished()) {
124+
span.finish();
125+
}
126+
}
127+
scopes
128+
.getOptions()
129+
.getLogger()
130+
.log(SentryLevel.ERROR, "Failed to instrument Kafka producer record.", t);
131+
return delegate.send(record, callback);
132+
}
133+
134+
final @NotNull ISpan finalSpan = span;
135+
final @NotNull Callback wrappedCallback = wrapCallback(callback, finalSpan);
136+
137+
try {
138+
return delegate.send(record, wrappedCallback);
139+
} catch (Throwable t) {
140+
finishWithError(finalSpan, t);
141+
throw t;
142+
}
143+
}
144+
145+
private @NotNull Callback wrapCallback(
146+
final @Nullable Callback userCallback, final @NotNull ISpan span) {
147+
return (metadata, exception) -> {
148+
try {
149+
if (exception != null) {
150+
span.setThrowable(exception);
151+
span.setStatus(SpanStatus.INTERNAL_ERROR);
152+
} else {
153+
span.setStatus(SpanStatus.OK);
154+
}
155+
} catch (Throwable t) {
156+
scopes
157+
.getOptions()
158+
.getLogger()
159+
.log(SentryLevel.ERROR, "Failed to set status on Kafka producer span.", t);
160+
} finally {
161+
if (!span.isFinished()) {
162+
span.finish();
163+
}
164+
if (userCallback != null) {
165+
userCallback.onCompletion(metadata, exception);
166+
}
167+
}
168+
};
169+
}
170+
171+
private void finishWithError(final @NotNull ISpan span, final @NotNull Throwable t) {
172+
span.setThrowable(t);
173+
span.setStatus(SpanStatus.INTERNAL_ERROR);
174+
if (!span.isFinished()) {
175+
span.finish();
176+
}
177+
}
178+
179+
private boolean isIgnored() {
180+
return SpanUtils.isIgnored(scopes.getOptions().getIgnoredSpanOrigins(), traceOrigin);
181+
}
182+
183+
private void injectHeaders(final @NotNull Headers headers, final @NotNull ISpan span) {
184+
final @Nullable List<String> existingBaggageHeaders =
185+
readHeaderValues(headers, BaggageHeader.BAGGAGE_HEADER);
186+
final @Nullable TracingUtils.TracingHeaders tracingHeaders =
187+
TracingUtils.trace(scopes, existingBaggageHeaders, span);
188+
if (tracingHeaders != null) {
189+
final @NotNull SentryTraceHeader sentryTraceHeader = tracingHeaders.getSentryTraceHeader();
190+
headers.remove(sentryTraceHeader.getName());
191+
headers.add(
192+
sentryTraceHeader.getName(),
193+
sentryTraceHeader.getValue().getBytes(StandardCharsets.UTF_8));
194+
195+
final @Nullable BaggageHeader baggageHeader = tracingHeaders.getBaggageHeader();
196+
if (baggageHeader != null) {
197+
headers.remove(baggageHeader.getName());
198+
headers.add(
199+
baggageHeader.getName(), baggageHeader.getValue().getBytes(StandardCharsets.UTF_8));
200+
}
201+
}
202+
203+
headers.remove(SENTRY_ENQUEUED_TIME_HEADER);
204+
headers.add(
205+
SENTRY_ENQUEUED_TIME_HEADER,
206+
DateUtils.doubleToBigDecimal(DateUtils.millisToSeconds(System.currentTimeMillis()))
207+
.toString()
208+
.getBytes(StandardCharsets.UTF_8));
209+
}
210+
211+
private static @Nullable List<String> readHeaderValues(
212+
final @NotNull Headers headers, final @NotNull String name) {
213+
@Nullable List<String> values = null;
214+
for (final @NotNull Header header : headers.headers(name)) {
215+
final byte @Nullable [] value = header.value();
216+
if (value != null) {
217+
if (values == null) {
218+
values = new ArrayList<>();
219+
}
220+
values.add(new String(value, StandardCharsets.UTF_8));
221+
}
222+
}
223+
return values;
224+
}
225+
226+
// --- Pure delegation for everything else ---
227+
228+
@Override
229+
public void initTransactions() {
230+
delegate.initTransactions();
231+
}
232+
233+
@Override
234+
public void beginTransaction() throws ProducerFencedException {
235+
delegate.beginTransaction();
236+
}
237+
238+
@Override
239+
@SuppressWarnings("deprecation")
240+
public void sendOffsetsToTransaction(
241+
final @NotNull Map<TopicPartition, OffsetAndMetadata> offsets,
242+
final @NotNull String consumerGroupId)
243+
throws ProducerFencedException {
244+
delegate.sendOffsetsToTransaction(offsets, consumerGroupId);
245+
}
246+
247+
@Override
248+
public void sendOffsetsToTransaction(
249+
final @NotNull Map<TopicPartition, OffsetAndMetadata> offsets,
250+
final @NotNull ConsumerGroupMetadata groupMetadata)
251+
throws ProducerFencedException {
252+
delegate.sendOffsetsToTransaction(offsets, groupMetadata);
253+
}
254+
255+
@Override
256+
public void commitTransaction() throws ProducerFencedException {
257+
delegate.commitTransaction();
258+
}
259+
260+
@Override
261+
public void abortTransaction() throws ProducerFencedException {
262+
delegate.abortTransaction();
263+
}
264+
265+
@Override
266+
public void flush() {
267+
delegate.flush();
268+
}
269+
270+
@Override
271+
public @NotNull List<PartitionInfo> partitionsFor(final @NotNull String topic) {
272+
return delegate.partitionsFor(topic);
273+
}
274+
275+
@Override
276+
public @NotNull Map<MetricName, ? extends Metric> metrics() {
277+
return delegate.metrics();
278+
}
279+
280+
@Override
281+
public @NotNull Uuid clientInstanceId(final @NotNull Duration timeout) {
282+
return delegate.clientInstanceId(timeout);
283+
}
284+
285+
@Override
286+
public void close() {
287+
delegate.close();
288+
}
289+
290+
@Override
291+
public void close(final @NotNull Duration timeout) {
292+
delegate.close(timeout);
293+
}
294+
295+
@Override
296+
public @NotNull String toString() {
297+
return "SentryKafkaProducer[delegate=" + delegate + "]";
298+
}
299+
}

0 commit comments

Comments
 (0)