11package io .sentry .spring .jakarta .kafka ;
22
3- import io .micrometer .observation .Observation ;
43import io .sentry .BaggageHeader ;
54import io .sentry .IScopes ;
65import io .sentry .ISpan ;
109import io .sentry .SpanStatus ;
1110import io .sentry .util .TracingUtils ;
1211import java .nio .charset .StandardCharsets ;
13- import java .util .concurrent .CompletableFuture ;
12+ import java .util .Map ;
13+ import org .apache .kafka .clients .producer .ProducerInterceptor ;
1414import org .apache .kafka .clients .producer .ProducerRecord ;
15+ import org .apache .kafka .clients .producer .RecordMetadata ;
1516import org .apache .kafka .common .header .Headers ;
1617import org .jetbrains .annotations .ApiStatus ;
1718import org .jetbrains .annotations .NotNull ;
1819import org .jetbrains .annotations .Nullable ;
19- import org .springframework .kafka .core .KafkaTemplate ;
20- import org .springframework .kafka .support .SendResult ;
2120
2221/**
23- * Wraps a {@link KafkaTemplate} to create {@code queue.publish} spans for Kafka send operations.
22+ * A Kafka {@link ProducerInterceptor} that creates {@code queue.publish} spans and injects tracing
23+ * headers into outgoing records.
2424 *
25- * <p>Overrides {@code doSend} which is the common path for all send variants in {@link
26- * KafkaTemplate}.
25+ * <p>The span starts and finishes synchronously in {@link #onSend(ProducerRecord)}, representing
26+ * "message enqueued" semantics. This avoids cross-thread correlation complexity since {@link
27+ * #onAcknowledgement(RecordMetadata, Exception)} runs on the Kafka I/O thread.
28+ *
29+ * <p>If the customer already has a {@link ProducerInterceptor}, the {@link
30+ * SentryKafkaProducerBeanPostProcessor} composes both using Spring's {@link
31+ * org.springframework.kafka.support.CompositeProducerInterceptor}.
2732 */
2833@ ApiStatus .Internal
29- public final class SentryKafkaProducerWrapper <K , V > extends KafkaTemplate <K , V > {
34+ public final class SentryProducerInterceptor <K , V > implements ProducerInterceptor <K , V > {
3035
3136 static final String TRACE_ORIGIN = "auto.queue.spring_jakarta.kafka.producer" ;
3237 static final String SENTRY_ENQUEUED_TIME_HEADER = "sentry-task-enqueued-time" ;
3338
3439 private final @ NotNull IScopes scopes ;
3540
36- public SentryKafkaProducerWrapper (
37- final @ NotNull KafkaTemplate <K , V > delegate , final @ NotNull IScopes scopes ) {
38- super (delegate .getProducerFactory ());
41+ public SentryProducerInterceptor (final @ NotNull IScopes scopes ) {
3942 this .scopes = scopes ;
40- this .setDefaultTopic (delegate .getDefaultTopic ());
41- if (delegate .isTransactional ()) {
42- this .setTransactionIdPrefix (delegate .getTransactionIdPrefix ());
43- }
44- this .setMessageConverter (delegate .getMessageConverter ());
45- this .setMicrometerTagsProvider (delegate .getMicrometerTagsProvider ());
4643 }
4744
4845 @ Override
49- protected @ NotNull CompletableFuture <SendResult <K , V >> doSend (
50- final @ NotNull ProducerRecord <K , V > record , final @ Nullable Observation observation ) {
46+ public @ NotNull ProducerRecord <K , V > onSend (final @ NotNull ProducerRecord <K , V > record ) {
5147 if (!scopes .getOptions ().isEnableQueueTracing ()) {
52- return super . doSend ( record , observation ) ;
48+ return record ;
5349 }
5450
5551 final @ Nullable ISpan activeSpan = scopes .getSpan ();
5652 if (activeSpan == null || activeSpan .isNoOp ()) {
57- return super . doSend ( record , observation ) ;
53+ return record ;
5854 }
5955
6056 final @ NotNull SpanOptions spanOptions = new SpanOptions ();
6157 spanOptions .setOrigin (TRACE_ORIGIN );
6258 final @ NotNull ISpan span = activeSpan .startChild ("queue.publish" , record .topic (), spanOptions );
6359 if (span .isNoOp ()) {
64- return super . doSend ( record , observation ) ;
60+ return record ;
6561 }
6662
6763 span .setData (SpanDataConvention .MESSAGING_SYSTEM , "kafka" );
@@ -73,27 +69,22 @@ public SentryKafkaProducerWrapper(
7369 // Header injection must not break the send
7470 }
7571
76- final @ NotNull CompletableFuture <SendResult <K , V >> future ;
77- try {
78- future = super .doSend (record , observation );
79- return future .whenComplete (
80- (result , throwable ) -> {
81- if (throwable != null ) {
82- span .setStatus (SpanStatus .INTERNAL_ERROR );
83- span .setThrowable (throwable );
84- } else {
85- span .setStatus (SpanStatus .OK );
86- }
87- span .finish ();
88- });
89- } catch (Throwable e ) {
90- span .setStatus (SpanStatus .INTERNAL_ERROR );
91- span .setThrowable (e );
92- span .finish ();
93- throw e ;
94- }
72+ span .setStatus (SpanStatus .OK );
73+ span .finish ();
74+
75+ return record ;
9576 }
9677
78+ @ Override
79+ public void onAcknowledgement (
80+ final @ Nullable RecordMetadata metadata , final @ Nullable Exception exception ) {}
81+
82+ @ Override
83+ public void close () {}
84+
85+ @ Override
86+ public void configure (final @ Nullable Map <String , ?> configs ) {}
87+
9788 private void injectHeaders (final @ NotNull Headers headers , final @ NotNull ISpan span ) {
9889 final @ Nullable TracingUtils .TracingHeaders tracingHeaders =
9990 TracingUtils .trace (scopes , null , span );
0 commit comments