From 0ca42c8e3130ea6c805e48aeb541b1ee1345d28f Mon Sep 17 00:00:00 2001 From: Alexander Dinauer Date: Mon, 27 Apr 2026 15:15:47 +0200 Subject: [PATCH 1/3] ref(kafka): Reimplement SentryKafkaProducer as a dynamic Proxy MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace the concrete `implements Producer` class with a `Proxy.newProxyInstance`-based wrapper that intercepts only the two `send()` overloads and forwards every other method reflectively to the delegate. The concrete class required explicitly delegating every method on the `Producer` interface, coupling the wrapper to a specific Kafka version: `clientInstanceId(Duration)` was added in Kafka 3.7, and the deprecated `sendOffsetsToTransaction(Map, String)` was removed in Kafka 4.0. The dynamic proxy has no such coupling — new or removed interface methods are handled automatically, giving full compatibility across all Kafka client versions. Public API change: `SentryKafkaProducer` is now a utility class with static `wrap()` overloads instead of constructors. Callers wrap a producer with `SentryKafkaProducer.wrap(producer)`. The Spring BPP and console sample are updated accordingly. Co-Authored-By: Claude --- sentry-kafka/api/sentry-kafka.api | 24 +- .../io/sentry/kafka/SentryKafkaProducer.java | 385 +++++++++--------- .../sentry/kafka/SentryKafkaProducerTest.kt | 44 +- .../samples/console/kafka/KafkaShowcase.java | 4 +- .../SentryKafkaProducerBeanPostProcessor.java | 12 +- ...entryKafkaProducerBeanPostProcessorTest.kt | 17 +- 6 files changed, 219 insertions(+), 267 deletions(-) diff --git a/sentry-kafka/api/sentry-kafka.api b/sentry-kafka/api/sentry-kafka.api index 64bb34a229..0064924584 100644 --- a/sentry-kafka/api/sentry-kafka.api +++ b/sentry-kafka/api/sentry-kafka.api @@ -9,27 +9,11 @@ public final class io/sentry/kafka/SentryKafkaConsumerTracing { public static fun withTracing (Lorg/apache/kafka/clients/consumer/ConsumerRecord;Ljava/util/concurrent/Callable;)Ljava/lang/Object; } -public final class io/sentry/kafka/SentryKafkaProducer : org/apache/kafka/clients/producer/Producer { +public final class io/sentry/kafka/SentryKafkaProducer { public static final field SENTRY_ENQUEUED_TIME_HEADER Ljava/lang/String; public static final field TRACE_ORIGIN Ljava/lang/String; - public fun (Lorg/apache/kafka/clients/producer/Producer;)V - public fun (Lorg/apache/kafka/clients/producer/Producer;Lio/sentry/IScopes;)V - public fun (Lorg/apache/kafka/clients/producer/Producer;Lio/sentry/IScopes;Ljava/lang/String;)V - public fun abortTransaction ()V - public fun beginTransaction ()V - public fun clientInstanceId (Ljava/time/Duration;)Lorg/apache/kafka/common/Uuid; - public fun close ()V - public fun close (Ljava/time/Duration;)V - public fun commitTransaction ()V - public fun flush ()V - public fun getDelegate ()Lorg/apache/kafka/clients/producer/Producer; - public fun initTransactions ()V - public fun metrics ()Ljava/util/Map; - public fun partitionsFor (Ljava/lang/String;)Ljava/util/List; - public fun send (Lorg/apache/kafka/clients/producer/ProducerRecord;)Ljava/util/concurrent/Future; - public fun send (Lorg/apache/kafka/clients/producer/ProducerRecord;Lorg/apache/kafka/clients/producer/Callback;)Ljava/util/concurrent/Future; - public fun sendOffsetsToTransaction (Ljava/util/Map;Ljava/lang/String;)V - public fun sendOffsetsToTransaction (Ljava/util/Map;Lorg/apache/kafka/clients/consumer/ConsumerGroupMetadata;)V - public fun toString ()Ljava/lang/String; + public static fun wrap (Lorg/apache/kafka/clients/producer/Producer;)Lorg/apache/kafka/clients/producer/Producer; + public static fun wrap (Lorg/apache/kafka/clients/producer/Producer;Lio/sentry/IScopes;)Lorg/apache/kafka/clients/producer/Producer; + public static fun wrap (Lorg/apache/kafka/clients/producer/Producer;Lio/sentry/IScopes;Ljava/lang/String;)Lorg/apache/kafka/clients/producer/Producer; } diff --git a/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaProducer.java b/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaProducer.java index 500e2bc90e..7400e5ba2c 100644 --- a/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaProducer.java +++ b/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaProducer.java @@ -12,24 +12,16 @@ import io.sentry.SpanStatus; import io.sentry.util.SpanUtils; import io.sentry.util.TracingUtils; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; import java.nio.charset.StandardCharsets; -import java.time.Duration; import java.util.ArrayList; import java.util.List; -import java.util.Map; -import java.util.concurrent.Future; -import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; -import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.clients.producer.RecordMetadata; -import org.apache.kafka.common.Metric; -import org.apache.kafka.common.MetricName; -import org.apache.kafka.common.PartitionInfo; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.Uuid; -import org.apache.kafka.common.errors.ProducerFencedException; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.Headers; import org.jetbrains.annotations.ApiStatus; @@ -37,17 +29,19 @@ import org.jetbrains.annotations.Nullable; /** - * Wraps a Kafka {@link Producer} to record a {@code queue.publish} span around each {@code send} - * and to inject Sentry trace propagation headers into the produced record. + * Wraps a Kafka {@link Producer} via {@link Proxy} to record a {@code queue.publish} span around + * each {@code send} and to inject Sentry trace propagation headers into the produced record. * - *

Unlike a {@link org.apache.kafka.clients.producer.ProducerInterceptor}, the wrapper keeps the - * span open until the send callback fires, so the span reflects the actual broker-ack lifecycle. + *

Only the two {@code send} overloads are intercepted; every other {@link Producer} method is + * forwarded directly to the delegate. Because the wrapper is a dynamic proxy, it is compatible with + * any Kafka client version — new methods added to the {@link Producer} interface in future Kafka + * releases are forwarded automatically without recompilation. * *

For raw Kafka usage: * *

{@code
  * Producer producer =
- *     new SentryKafkaProducer<>(new KafkaProducer<>(props));
+ *     SentryKafkaProducer.wrap(new KafkaProducer<>(props));
  * }
* *

For Spring Kafka, the {@code SentryKafkaProducerBeanPostProcessor} in {@code @@ -55,227 +49,216 @@ * ProducerFactory.addPostProcessor(...)}. */ @ApiStatus.Experimental -public final class SentryKafkaProducer implements Producer { +public final class SentryKafkaProducer { public static final @NotNull String TRACE_ORIGIN = "auto.queue.kafka.producer"; public static final @NotNull String SENTRY_ENQUEUED_TIME_HEADER = "sentry-task-enqueued-time"; - private final @NotNull Producer delegate; - private final @NotNull IScopes scopes; - private final @NotNull String traceOrigin; - - public SentryKafkaProducer(final @NotNull Producer delegate) { - this(delegate, ScopesAdapter.getInstance(), TRACE_ORIGIN); + private SentryKafkaProducer() {} + + /** + * Wraps the given producer with Sentry instrumentation using the global scopes. + * + * @param delegate the Kafka producer to wrap + * @return an instrumented producer that records {@code queue.publish} spans + * @param the Kafka record key type + * @param the Kafka record value type + */ + public static @NotNull Producer wrap(final @NotNull Producer delegate) { + return wrap(delegate, ScopesAdapter.getInstance(), TRACE_ORIGIN); } - public SentryKafkaProducer( + /** + * Wraps the given producer with Sentry instrumentation using the provided scopes. + * + * @param delegate the Kafka producer to wrap + * @param scopes the Sentry scopes to use for span creation and header injection + * @return an instrumented producer that records {@code queue.publish} spans + * @param the Kafka record key type + * @param the Kafka record value type + */ + public static @NotNull Producer wrap( final @NotNull Producer delegate, final @NotNull IScopes scopes) { - this(delegate, scopes, TRACE_ORIGIN); + return wrap(delegate, scopes, TRACE_ORIGIN); } - public SentryKafkaProducer( + /** + * Wraps the given producer with Sentry instrumentation. + * + * @param delegate the Kafka producer to wrap + * @param scopes the Sentry scopes to use for span creation and header injection + * @param traceOrigin the trace origin to set on created spans + * @return an instrumented producer that records {@code queue.publish} spans + * @param the Kafka record key type + * @param the Kafka record value type + */ + @SuppressWarnings("unchecked") + public static @NotNull Producer wrap( final @NotNull Producer delegate, final @NotNull IScopes scopes, final @NotNull String traceOrigin) { - this.delegate = delegate; - this.scopes = scopes; - this.traceOrigin = traceOrigin; + return (Producer) + Proxy.newProxyInstance( + delegate.getClass().getClassLoader(), + new Class[] {Producer.class}, + new SentryProducerHandler<>(delegate, scopes, traceOrigin)); } - /** Returns the wrapped producer. */ - public @NotNull Producer getDelegate() { - return delegate; - } + static final class SentryProducerHandler implements InvocationHandler { - @Override - public @NotNull Future send(final @NotNull ProducerRecord record) { - return send(record, null); - } + final @NotNull Producer delegate; + private final @NotNull IScopes scopes; + private final @NotNull String traceOrigin; - @Override - public @NotNull Future send( - final @NotNull ProducerRecord record, final @Nullable Callback callback) { - if (!scopes.getOptions().isEnableQueueTracing() || isIgnored()) { - return delegate.send(record, callback); + SentryProducerHandler( + final @NotNull Producer delegate, + final @NotNull IScopes scopes, + final @NotNull String traceOrigin) { + this.delegate = delegate; + this.scopes = scopes; + this.traceOrigin = traceOrigin; } - final @Nullable ISpan activeSpan = scopes.getSpan(); - if (activeSpan == null || activeSpan.isNoOp()) { - maybeInjectHeaders(record.headers(), null); - return delegate.send(record, callback); + @Override + @SuppressWarnings("unchecked") + public @Nullable Object invoke( + final @NotNull Object proxy, final @NotNull Method method, final @Nullable Object[] args) + throws Throwable { + if ("send".equals(method.getName()) && args != null) { + if (args.length == 1) { + return instrumentedSend((ProducerRecord) args[0], null); + } else if (args.length == 2) { + return instrumentedSend((ProducerRecord) args[0], (Callback) args[1]); + } + } + + if ("toString".equals(method.getName()) && (args == null || args.length == 0)) { + return "SentryKafkaProducer[delegate=" + delegate + "]"; + } + + try { + return method.invoke(delegate, args); + } catch (InvocationTargetException e) { + throw e.getCause(); + } } - final @NotNull SpanOptions spanOptions = new SpanOptions(); - spanOptions.setOrigin(traceOrigin); - final @NotNull ISpan span = activeSpan.startChild("queue.publish", record.topic(), spanOptions); + @SuppressWarnings("unchecked") + private @NotNull Object instrumentedSend( + final @NotNull ProducerRecord record, final @Nullable Callback callback) { + if (!scopes.getOptions().isEnableQueueTracing() || isIgnored()) { + return delegate.send(record, callback); + } - span.setData(SpanDataConvention.MESSAGING_SYSTEM, "kafka"); - span.setData(SpanDataConvention.MESSAGING_DESTINATION_NAME, record.topic()); - maybeInjectHeaders(record.headers(), span); + final @Nullable ISpan activeSpan = scopes.getSpan(); + if (activeSpan == null || activeSpan.isNoOp()) { + maybeInjectHeaders(record.headers(), null); + return delegate.send(record, callback); + } - try { - return delegate.send(record, wrapCallback(callback, span)); - } catch (Throwable t) { - finishWithError(span, t); - throw t; - } - } + final @NotNull SpanOptions spanOptions = new SpanOptions(); + spanOptions.setOrigin(traceOrigin); + final @NotNull ISpan span = + activeSpan.startChild("queue.publish", record.topic(), spanOptions); + + span.setData(SpanDataConvention.MESSAGING_SYSTEM, "kafka"); + span.setData(SpanDataConvention.MESSAGING_DESTINATION_NAME, record.topic()); + maybeInjectHeaders(record.headers(), span); - private @NotNull Callback wrapCallback( - final @Nullable Callback userCallback, final @NotNull ISpan span) { - return (metadata, exception) -> { try { - if (exception != null) { - span.setThrowable(exception); - span.setStatus(SpanStatus.INTERNAL_ERROR); - } else { - span.setStatus(SpanStatus.OK); - } + return delegate.send(record, wrapCallback(callback, span)); } catch (Throwable t) { - scopes - .getOptions() - .getLogger() - .log(SentryLevel.ERROR, "Failed to set status on Kafka producer span.", t); - } finally { - span.finish(); - if (userCallback != null) { - userCallback.onCompletion(metadata, exception); - } + finishWithError(span, t); + throw t; } - }; - } + } - private void finishWithError(final @NotNull ISpan span, final @NotNull Throwable t) { - span.setThrowable(t); - span.setStatus(SpanStatus.INTERNAL_ERROR); - span.finish(); - } + private @NotNull Callback wrapCallback( + final @Nullable Callback userCallback, final @NotNull ISpan span) { + return (metadata, exception) -> { + try { + if (exception != null) { + span.setThrowable(exception); + span.setStatus(SpanStatus.INTERNAL_ERROR); + } else { + span.setStatus(SpanStatus.OK); + } + } catch (Throwable t) { + scopes + .getOptions() + .getLogger() + .log(SentryLevel.ERROR, "Failed to set status on Kafka producer span.", t); + } finally { + try { + span.finish(); + } finally { + if (userCallback != null) { + userCallback.onCompletion(metadata, exception); + } + } + } + }; + } - private boolean isIgnored() { - return SpanUtils.isIgnored(scopes.getOptions().getIgnoredSpanOrigins(), traceOrigin); - } + private void finishWithError(final @NotNull ISpan span, final @NotNull Throwable t) { + span.setThrowable(t); + span.setStatus(SpanStatus.INTERNAL_ERROR); + span.finish(); + } - private void maybeInjectHeaders(final @NotNull Headers headers, final @Nullable ISpan span) { - try { - final @Nullable List existingBaggageHeaders = - readHeaderValues(headers, BaggageHeader.BAGGAGE_HEADER); - final @Nullable TracingUtils.TracingHeaders tracingHeaders = - TracingUtils.trace(scopes, existingBaggageHeaders, span); - if (tracingHeaders != null) { - final @NotNull SentryTraceHeader sentryTraceHeader = tracingHeaders.getSentryTraceHeader(); - headers.remove(sentryTraceHeader.getName()); - headers.add( - sentryTraceHeader.getName(), - sentryTraceHeader.getValue().getBytes(StandardCharsets.UTF_8)); + private boolean isIgnored() { + return SpanUtils.isIgnored(scopes.getOptions().getIgnoredSpanOrigins(), traceOrigin); + } - final @Nullable BaggageHeader baggageHeader = tracingHeaders.getBaggageHeader(); - if (baggageHeader != null) { - headers.remove(baggageHeader.getName()); + private void maybeInjectHeaders(final @NotNull Headers headers, final @Nullable ISpan span) { + try { + final @Nullable List existingBaggageHeaders = + readHeaderValues(headers, BaggageHeader.BAGGAGE_HEADER); + final @Nullable TracingUtils.TracingHeaders tracingHeaders = + TracingUtils.trace(scopes, existingBaggageHeaders, span); + if (tracingHeaders != null) { + final @NotNull SentryTraceHeader sentryTraceHeader = + tracingHeaders.getSentryTraceHeader(); + headers.remove(sentryTraceHeader.getName()); headers.add( - baggageHeader.getName(), baggageHeader.getValue().getBytes(StandardCharsets.UTF_8)); + sentryTraceHeader.getName(), + sentryTraceHeader.getValue().getBytes(StandardCharsets.UTF_8)); + + final @Nullable BaggageHeader baggageHeader = tracingHeaders.getBaggageHeader(); + if (baggageHeader != null) { + headers.remove(baggageHeader.getName()); + headers.add( + baggageHeader.getName(), baggageHeader.getValue().getBytes(StandardCharsets.UTF_8)); + } } - } - headers.remove(SENTRY_ENQUEUED_TIME_HEADER); - headers.add( - SENTRY_ENQUEUED_TIME_HEADER, - DateUtils.doubleToBigDecimal(DateUtils.millisToSeconds(System.currentTimeMillis())) - .toString() - .getBytes(StandardCharsets.UTF_8)); - } catch (Throwable t) { - scopes - .getOptions() - .getLogger() - .log(SentryLevel.ERROR, "Failed to inject Sentry headers into Kafka record.", t); + headers.remove(SENTRY_ENQUEUED_TIME_HEADER); + headers.add( + SENTRY_ENQUEUED_TIME_HEADER, + DateUtils.doubleToBigDecimal(DateUtils.millisToSeconds(System.currentTimeMillis())) + .toString() + .getBytes(StandardCharsets.UTF_8)); + } catch (Throwable t) { + scopes + .getOptions() + .getLogger() + .log(SentryLevel.ERROR, "Failed to inject Sentry headers into Kafka record.", t); + } } - } - private static @Nullable List readHeaderValues( - final @NotNull Headers headers, final @NotNull String name) { - @Nullable List values = null; - for (final @NotNull Header header : headers.headers(name)) { - final byte @Nullable [] value = header.value(); - if (value != null) { - if (values == null) { - values = new ArrayList<>(); + private static @Nullable List readHeaderValues( + final @NotNull Headers headers, final @NotNull String name) { + @Nullable List values = null; + for (final @NotNull Header header : headers.headers(name)) { + final byte @Nullable [] value = header.value(); + if (value != null) { + if (values == null) { + values = new ArrayList<>(); + } + values.add(new String(value, StandardCharsets.UTF_8)); } - values.add(new String(value, StandardCharsets.UTF_8)); } + return values; } - return values; - } - - // --- Pure delegation for everything else --- - - @Override - public void initTransactions() { - delegate.initTransactions(); - } - - @Override - public void beginTransaction() throws ProducerFencedException { - delegate.beginTransaction(); - } - - @Override - @SuppressWarnings("deprecation") - public void sendOffsetsToTransaction( - final @NotNull Map offsets, - final @NotNull String consumerGroupId) - throws ProducerFencedException { - delegate.sendOffsetsToTransaction(offsets, consumerGroupId); - } - - @Override - public void sendOffsetsToTransaction( - final @NotNull Map offsets, - final @NotNull ConsumerGroupMetadata groupMetadata) - throws ProducerFencedException { - delegate.sendOffsetsToTransaction(offsets, groupMetadata); - } - - @Override - public void commitTransaction() throws ProducerFencedException { - delegate.commitTransaction(); - } - - @Override - public void abortTransaction() throws ProducerFencedException { - delegate.abortTransaction(); - } - - @Override - public void flush() { - delegate.flush(); - } - - @Override - public @NotNull List partitionsFor(final @NotNull String topic) { - return delegate.partitionsFor(topic); - } - - @Override - public @NotNull Map metrics() { - return delegate.metrics(); - } - - @Override - public @NotNull Uuid clientInstanceId(final @NotNull Duration timeout) { - return delegate.clientInstanceId(timeout); - } - - @Override - public void close() { - delegate.close(); - } - - @Override - public void close(final @NotNull Duration timeout) { - delegate.close(timeout); - } - - @Override - public @NotNull String toString() { - return "SentryKafkaProducer[delegate=" + delegate + "]"; } } diff --git a/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaProducerTest.kt b/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaProducerTest.kt index 90a6bb259b..48f0fecd0c 100644 --- a/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaProducerTest.kt +++ b/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaProducerTest.kt @@ -81,7 +81,7 @@ class SentryKafkaProducerTest { @Test fun `creates queue publish span and injects headers`() { val tx = createTransaction() - val producer = SentryKafkaProducer(delegate, scopes) + val producer = SentryKafkaProducer.wrap(delegate, scopes) val record = ProducerRecord("my-topic", "key", "value") producer.send(record) @@ -110,7 +110,7 @@ class SentryKafkaProducerTest { @Test fun `delegates send and does not finish span synchronously`() { val tx = createTransaction() - val producer = SentryKafkaProducer(delegate, scopes) + val producer = SentryKafkaProducer.wrap(delegate, scopes) val record = ProducerRecord("my-topic", "key", "value") producer.send(record) @@ -123,7 +123,7 @@ class SentryKafkaProducerTest { @Test fun `finishes span as OK when broker ack callback succeeds`() { val tx = createTransaction() - val producer = SentryKafkaProducer(delegate, scopes) + val producer = SentryKafkaProducer.wrap(delegate, scopes) val record = ProducerRecord("my-topic", "key", "value") producer.send(record) @@ -141,7 +141,7 @@ class SentryKafkaProducerTest { @Test fun `finishes span as INTERNAL_ERROR when broker ack callback fails`() { val tx = createTransaction() - val producer = SentryKafkaProducer(delegate, scopes) + val producer = SentryKafkaProducer.wrap(delegate, scopes) val record = ProducerRecord("my-topic", "key", "value") val exception = RuntimeException("boom") @@ -160,7 +160,7 @@ class SentryKafkaProducerTest { @Test fun `forwards user callback after finishing span`() { createTransaction() - val producer = SentryKafkaProducer(delegate, scopes) + val producer = SentryKafkaProducer.wrap(delegate, scopes) val record = ProducerRecord("my-topic", "key", "value") val userCallback = mock() @@ -179,7 +179,7 @@ class SentryKafkaProducerTest { val tx = createTransaction() val exception = RuntimeException("kaboom") whenever(delegate.send(any(), any())).thenThrow(exception) - val producer = SentryKafkaProducer(delegate, scopes) + val producer = SentryKafkaProducer.wrap(delegate, scopes) val record = ProducerRecord("my-topic", "key", "value") val thrown = runCatching { producer.send(record) }.exceptionOrNull() @@ -195,7 +195,7 @@ class SentryKafkaProducerTest { fun `delegates send without span when queue tracing is disabled`() { createTransaction() options.isEnableQueueTracing = false - val producer = SentryKafkaProducer(delegate, scopes) + val producer = SentryKafkaProducer.wrap(delegate, scopes) val record = ProducerRecord("my-topic", "key", "value") producer.send(record) @@ -207,7 +207,7 @@ class SentryKafkaProducerTest { fun `delegates send without span when trace origin is ignored`() { val tx = createTransaction() options.setIgnoredSpanOrigins(listOf(SentryKafkaProducer.TRACE_ORIGIN)) - val producer = SentryKafkaProducer(delegate, scopes) + val producer = SentryKafkaProducer.wrap(delegate, scopes) val record = ProducerRecord("my-topic", "key", "value") producer.send(record) @@ -220,7 +220,7 @@ class SentryKafkaProducerTest { @Test fun `injects headers but creates no span when no active span`() { whenever(scopes.span).thenReturn(null) - val producer = SentryKafkaProducer(delegate, scopes) + val producer = SentryKafkaProducer.wrap(delegate, scopes) val record = ProducerRecord("my-topic", "key", "value") producer.send(record) @@ -235,7 +235,7 @@ class SentryKafkaProducerTest { @Test fun `preserves pre-existing third-party baggage header entries`() { createTransaction() - val producer = SentryKafkaProducer(delegate, scopes) + val producer = SentryKafkaProducer.wrap(delegate, scopes) val record = ProducerRecord("my-topic", "key", "value") record .headers() @@ -274,7 +274,7 @@ class SentryKafkaProducerTest { whenever(headers.remove(SentryTraceHeader.SENTRY_TRACE_HEADER)) .thenThrow(RuntimeException("boom")) - val producer = SentryKafkaProducer(delegate, scopes) + val producer = SentryKafkaProducer.wrap(delegate, scopes) producer.send(record) // Header injection failed silently; send still proceeds with wrapped callback for span @@ -284,7 +284,7 @@ class SentryKafkaProducerTest { @Test fun `delegates non-send methods to underlying producer`() { - val producer = SentryKafkaProducer(delegate, scopes) + val producer = SentryKafkaProducer.wrap(delegate, scopes) producer.flush() producer.partitionsFor("my-topic") @@ -298,14 +298,14 @@ class SentryKafkaProducerTest { } @Test - fun `no-arg constructor uses current scopes`() { + fun `default wrap uses current scopes`() { val transaction = Sentry.startTransaction("tx", "op") val record = ProducerRecord("my-topic", "key", "value") try { val token: ISentryLifecycleToken = transaction.makeCurrent() try { - val producer = SentryKafkaProducer(delegate) + val producer = SentryKafkaProducer.wrap(delegate) producer.send(record) } finally { token.close() @@ -319,18 +319,12 @@ class SentryKafkaProducerTest { verify(delegate).send(eq(record), any()) } - @Test - fun `getDelegate exposes wrapped producer`() { - val producer = SentryKafkaProducer(delegate, scopes) - assertSame(delegate, producer.delegate) - } - @Test fun `wraps callback even when child span is no-op`() { val tx = createTransaction() - // Set max spans to 1 so the child span is no-op (over limit) + // Set max spans to 0 so the child span is no-op (over limit) options.maxSpans = 0 - val producer = SentryKafkaProducer(delegate, scopes) + val producer = SentryKafkaProducer.wrap(delegate, scopes) val record = ProducerRecord("my-topic", "key", "value") producer.send(record) @@ -342,4 +336,10 @@ class SentryKafkaProducerTest { assertNotNull(record.headers().lastHeader(BaggageHeader.BAGGAGE_HEADER)) assertNotNull(record.headers().lastHeader(SentryKafkaProducer.SENTRY_ENQUEUED_TIME_HEADER)) } + + @Test + fun `toString includes delegate`() { + val producer = SentryKafkaProducer.wrap(delegate, scopes) + assertTrue(producer.toString().startsWith("SentryKafkaProducer[delegate=")) + } } diff --git a/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/kafka/KafkaShowcase.java b/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/kafka/KafkaShowcase.java index cc819ac0db..de85e46b25 100644 --- a/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/kafka/KafkaShowcase.java +++ b/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/kafka/KafkaShowcase.java @@ -39,9 +39,9 @@ public static void runKafkaWithSentryTracing(final String bootstrapServers) { final KafkaProducer rawProducer = new KafkaProducer<>(producerProperties); // 2. >>> Sentry instrumentation <<< - // Wrap it in SentryKafkaProducer so every send is captured as a + // Wrap it with SentryKafkaProducer.wrap() so every send is captured as a // `queue.publish` span that closes when the broker ack callback fires. - final Producer producer = new SentryKafkaProducer<>(rawProducer); + final Producer producer = SentryKafkaProducer.wrap(rawProducer); try (producer) { Thread.sleep(500); diff --git a/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaProducerBeanPostProcessor.java b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaProducerBeanPostProcessor.java index ed3faba853..2f6eccaf0f 100644 --- a/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaProducerBeanPostProcessor.java +++ b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaProducerBeanPostProcessor.java @@ -14,7 +14,8 @@ /** * Installs a {@link ProducerPostProcessor} on every {@link ProducerFactory} bean so that each - * {@link Producer} created by Spring Kafka is wrapped in a {@link SentryKafkaProducer}. + * {@link Producer} created by Spring Kafka is wrapped via {@link SentryKafkaProducer#wrap + * SentryKafkaProducer.wrap(Producer)}. * *

The wrapper records a {@code queue.publish} span around each {@code send(...)} that finishes * when the broker ack callback fires, giving a real producer-send lifecycle span. {@code @@ -55,16 +56,13 @@ public int getOrder() { } /** - * Marker {@link ProducerPostProcessor} that wraps the freshly created Kafka {@link Producer} in a - * {@link SentryKafkaProducer}, unless it is already wrapped. + * Marker {@link ProducerPostProcessor} that wraps the freshly created Kafka {@link Producer} via + * {@link SentryKafkaProducer#wrap}. */ static final class SentryProducerPostProcessor implements ProducerPostProcessor { @Override public @NotNull Producer apply(final @NotNull Producer producer) { - if (producer instanceof SentryKafkaProducer) { - return producer; - } - return new SentryKafkaProducer<>( + return SentryKafkaProducer.wrap( producer, ScopesAdapter.getInstance(), "auto.queue.spring_jakarta.kafka.producer"); } } diff --git a/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaProducerBeanPostProcessorTest.kt b/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaProducerBeanPostProcessorTest.kt index 9d36e9274c..a1ff2880f1 100644 --- a/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaProducerBeanPostProcessorTest.kt +++ b/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaProducerBeanPostProcessorTest.kt @@ -1,6 +1,5 @@ package io.sentry.spring.jakarta.kafka -import io.sentry.kafka.SentryKafkaProducer import kotlin.test.Test import kotlin.test.assertEquals import kotlin.test.assertSame @@ -68,25 +67,13 @@ class SentryKafkaProducerBeanPostProcessorTest { } @Test - fun `registered post-processor wraps producers in SentryKafkaProducer`() { + fun `registered post-processor wraps producers via SentryKafkaProducer wrap`() { val pp = SentryKafkaProducerBeanPostProcessor.SentryProducerPostProcessor() val raw = mock>() val wrapped = pp.apply(raw) - assertTrue(wrapped is SentryKafkaProducer<*, *>) - assertSame(raw, (wrapped as SentryKafkaProducer).delegate) - } - - @Test - fun `registered post-processor does not double-wrap`() { - val pp = SentryKafkaProducerBeanPostProcessor.SentryProducerPostProcessor() - val raw = mock>() - val alreadyWrapped = SentryKafkaProducer(raw) - - val result = pp.apply(alreadyWrapped) - - assertSame(alreadyWrapped, result) + assertTrue(java.lang.reflect.Proxy.isProxyClass(wrapped.javaClass)) } @Test From 5e0629d05df7da31fe40294a408eafa668e16b94 Mon Sep 17 00:00:00 2001 From: Alexander Dinauer Date: Mon, 27 Apr 2026 15:31:17 +0200 Subject: [PATCH 2/3] fix(spring-jakarta): Warn when Kafka producer tracing silently fails When ProducerFactory.addPostProcessor() is a no-op (the interface default), the Sentry post-processor is silently dropped and the customer gets zero producer tracing with no signal. Verify registration succeeded via getPostProcessors() after each addPostProcessor() call, and log a WARNING naming the factory bean and pointing toward SentryKafkaProducer.wrap() as the manual fallback. Co-Authored-By: Claude --- .../SentryKafkaProducerBeanPostProcessor.java | 29 ++++++++++++------- ...entryKafkaProducerBeanPostProcessorTest.kt | 19 ++++++------ 2 files changed, 28 insertions(+), 20 deletions(-) diff --git a/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaProducerBeanPostProcessor.java b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaProducerBeanPostProcessor.java index 2f6eccaf0f..8a06e4e338 100644 --- a/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaProducerBeanPostProcessor.java +++ b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaProducerBeanPostProcessor.java @@ -1,6 +1,7 @@ package io.sentry.spring.jakarta.kafka; import io.sentry.ScopesAdapter; +import io.sentry.SentryLevel; import io.sentry.kafka.SentryKafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.jetbrains.annotations.ApiStatus; @@ -22,11 +23,10 @@ * KafkaTemplate} beans are left untouched, so all customer-configured listeners, interceptors and * observation settings are preserved. * - *

Idempotent: re-running on the same factory does not register the post-processor twice. - * *

Note: {@link ProducerFactory#addPostProcessor(ProducerPostProcessor)} is a default method on - * the interface. Custom factories that do not extend {@code DefaultKafkaProducerFactory} and do not - * implement {@code addPostProcessor} will silently no-op. + * the interface that is a no-op unless overridden. Custom factories that do not extend {@code + * DefaultKafkaProducerFactory} will not receive Sentry producer instrumentation; a warning is + * logged at startup in that case. */ @ApiStatus.Internal public final class SentryKafkaProducerBeanPostProcessor @@ -38,14 +38,21 @@ public final class SentryKafkaProducerBeanPostProcessor final @NotNull Object bean, final @NotNull String beanName) throws BeansException { if (bean instanceof ProducerFactory) { final @NotNull ProducerFactory factory = (ProducerFactory) bean; - - for (final Object existing : factory.getPostProcessors()) { - if (existing instanceof SentryProducerPostProcessor) { - return bean; - } + final @NotNull SentryProducerPostProcessor pp = new SentryProducerPostProcessor<>(); + factory.addPostProcessor(pp); + if (!factory.getPostProcessors().contains(pp)) { + ScopesAdapter.getInstance() + .getOptions() + .getLogger() + .log( + SentryLevel.WARNING, + "Sentry Kafka producer tracing not active for ProducerFactory '%s' (%s). " + + "addPostProcessor() was not honored — the factory may not extend " + + "DefaultKafkaProducerFactory. Wrap producers manually with " + + "SentryKafkaProducer.wrap(producer).", + beanName, + factory.getClass().getName()); } - - factory.addPostProcessor(new SentryProducerPostProcessor<>()); } return bean; } diff --git a/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaProducerBeanPostProcessorTest.kt b/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaProducerBeanPostProcessorTest.kt index a1ff2880f1..ec6494c504 100644 --- a/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaProducerBeanPostProcessorTest.kt +++ b/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaProducerBeanPostProcessorTest.kt @@ -8,7 +8,6 @@ import org.apache.kafka.clients.producer.Producer import org.mockito.kotlin.any import org.mockito.kotlin.argumentCaptor import org.mockito.kotlin.mock -import org.mockito.kotlin.never import org.mockito.kotlin.verify import org.mockito.kotlin.whenever import org.springframework.kafka.core.DefaultKafkaProducerFactory @@ -20,7 +19,8 @@ class SentryKafkaProducerBeanPostProcessorTest { @Test fun `registers Sentry post-processor on ProducerFactory`() { val factory = mock>() - whenever(factory.postProcessors).thenReturn(emptyList()) + val pp = SentryKafkaProducerBeanPostProcessor.SentryProducerPostProcessor() + whenever(factory.postProcessors).thenReturn(listOf(pp)) val processor = SentryKafkaProducerBeanPostProcessor() processor.postProcessAfterInitialization(factory, "kafkaProducerFactory") @@ -33,16 +33,16 @@ class SentryKafkaProducerBeanPostProcessorTest { } @Test - fun `is idempotent when Sentry post-processor is already registered`() { + fun `does not throw when addPostProcessor is a no-op (default interface method)`() { + // Factory using the default no-op addPostProcessor / getPostProcessors val factory = mock>() - val existing = - SentryKafkaProducerBeanPostProcessor.SentryProducerPostProcessor() - whenever(factory.postProcessors).thenReturn(listOf(existing)) + whenever(factory.postProcessors).thenReturn(emptyList()) val processor = SentryKafkaProducerBeanPostProcessor() - processor.postProcessAfterInitialization(factory, "kafkaProducerFactory") + // Should complete without throwing, and log a warning via ScopesAdapter + processor.postProcessAfterInitialization(factory, "myFactory") - verify(factory, never()).addPostProcessor(any()) + verify(factory).addPostProcessor(any()) } @Test @@ -58,7 +58,8 @@ class SentryKafkaProducerBeanPostProcessorTest { @Test fun `returns the same bean instance`() { val factory = mock>() - whenever(factory.postProcessors).thenReturn(emptyList()) + val pp = SentryKafkaProducerBeanPostProcessor.SentryProducerPostProcessor() + whenever(factory.postProcessors).thenReturn(listOf(pp)) val processor = SentryKafkaProducerBeanPostProcessor() val result = processor.postProcessAfterInitialization(factory, "kafkaProducerFactory") From c1ccbf7dec7e8436a41adfdaab03f34d71315eaa Mon Sep 17 00:00:00 2001 From: Alexander Dinauer Date: Tue, 28 Apr 2026 15:15:35 +0200 Subject: [PATCH 3/3] fix(kafka): Preserve existing consumer interceptor on reflection failure If reading recordInterceptor via reflection fails, leave the container\nfactory untouched instead of installing Sentry's interceptor with a\nnull delegate. This avoids silently dropping customer-configured\ninterceptors for DLQ routing, auditing, or other message handling\nconcerns.\n\nAdd tests that preserve customer interceptors both when chaining\nsucceeds and when reflection cannot safely determine the existing\ninterceptor.\n\nCo-Authored-By: Claude --- .../SentryKafkaConsumerBeanPostProcessor.java | 43 ++++++--- ...entryKafkaConsumerBeanPostProcessorTest.kt | 87 +++++++++++++++++++ 2 files changed, 116 insertions(+), 14 deletions(-) diff --git a/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaConsumerBeanPostProcessor.java b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaConsumerBeanPostProcessor.java index f272a575cb..61d06da1c9 100644 --- a/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaConsumerBeanPostProcessor.java +++ b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaConsumerBeanPostProcessor.java @@ -21,6 +21,14 @@ public final class SentryKafkaConsumerBeanPostProcessor implements BeanPostProcessor, PriorityOrdered { + private static final class InterceptorReadFailedException extends Exception { + private static final long serialVersionUID = 1L; + + InterceptorReadFailedException(final @NotNull Throwable cause) { + super(cause); + } + } + @Override @SuppressWarnings("unchecked") public @NotNull Object postProcessAfterInitialization( @@ -29,7 +37,23 @@ public final class SentryKafkaConsumerBeanPostProcessor final @NotNull AbstractKafkaListenerContainerFactory factory = (AbstractKafkaListenerContainerFactory) bean; - final @Nullable RecordInterceptor existing = getExistingInterceptor(factory); + final @Nullable RecordInterceptor existing; + try { + existing = getExistingInterceptor(factory); + } catch (InterceptorReadFailedException e) { + ScopesAdapter.getInstance() + .getOptions() + .getLogger() + .log( + SentryLevel.ERROR, + "Sentry Kafka consumer tracing disabled for factory '%s' \u2014 could not read " + + "existing recordInterceptor via reflection. Refusing to install Sentry's " + + "interceptor to avoid overwriting a customer-configured RecordInterceptor.", + e, + beanName); + return bean; + } + if (existing instanceof SentryKafkaRecordInterceptor) { return bean; } @@ -42,25 +66,16 @@ public final class SentryKafkaConsumerBeanPostProcessor return bean; } - @SuppressWarnings("unchecked") private @Nullable RecordInterceptor getExistingInterceptor( - final @NotNull AbstractKafkaListenerContainerFactory factory) { + final @NotNull AbstractKafkaListenerContainerFactory factory) + throws InterceptorReadFailedException { try { final @NotNull Field field = AbstractKafkaListenerContainerFactory.class.getDeclaredField("recordInterceptor"); field.setAccessible(true); return (RecordInterceptor) field.get(factory); - } catch (NoSuchFieldException | IllegalAccessException e) { - ScopesAdapter.getInstance() - .getOptions() - .getLogger() - .log( - SentryLevel.WARNING, - "Unable to read existing recordInterceptor from " - + "AbstractKafkaListenerContainerFactory via reflection. " - + "If you had a custom RecordInterceptor, it may not be chained with Sentry's interceptor.", - e); - return null; + } catch (NoSuchFieldException | IllegalAccessException | RuntimeException e) { + throw new InterceptorReadFailedException(e); } } diff --git a/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaConsumerBeanPostProcessorTest.kt b/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaConsumerBeanPostProcessorTest.kt index 8595cb9ae7..2d189d81e4 100644 --- a/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaConsumerBeanPostProcessorTest.kt +++ b/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaConsumerBeanPostProcessorTest.kt @@ -1,11 +1,15 @@ package io.sentry.spring.jakarta.kafka import kotlin.test.Test +import kotlin.test.assertEquals import kotlin.test.assertSame import kotlin.test.assertTrue +import org.apache.kafka.clients.consumer.Consumer +import org.apache.kafka.clients.consumer.ConsumerRecord import org.mockito.kotlin.mock import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory import org.springframework.kafka.core.ConsumerFactory +import org.springframework.kafka.listener.RecordInterceptor class SentryKafkaConsumerBeanPostProcessorTest { @@ -55,4 +59,87 @@ class SentryKafkaConsumerBeanPostProcessorTest { assertSame(someBean, result) } + + @Test + fun `chains existing customer RecordInterceptor as delegate`() { + val consumerFactory = mock>() + val factory = ConcurrentKafkaListenerContainerFactory() + factory.consumerFactory = consumerFactory + + val customerInterceptor = + object : RecordInterceptor { + override fun intercept( + record: ConsumerRecord, + consumer: Consumer, + ): ConsumerRecord? = record + } + factory.setRecordInterceptor(customerInterceptor) + + val processor = SentryKafkaConsumerBeanPostProcessor() + processor.postProcessAfterInitialization(factory, "kafkaListenerContainerFactory") + + val field = factory.javaClass.superclass.getDeclaredField("recordInterceptor") + field.isAccessible = true + val installed = field.get(factory) + assertTrue( + installed is SentryKafkaRecordInterceptor<*, *>, + "expected SentryKafkaRecordInterceptor, got ${installed?.javaClass}", + ) + + val delegateField = SentryKafkaRecordInterceptor::class.java.getDeclaredField("delegate") + delegateField.isAccessible = true + assertSame( + customerInterceptor, + delegateField.get(installed), + "customer interceptor must be preserved as delegate", + ) + } + + @Test + fun `skips installation when reflection fails and preserves customer interceptor`() { + // Subclass whose declared 'recordInterceptor' field does not exist on the + // AbstractKafkaListenerContainerFactory class lookup path — this simulates the + // future-spring-kafka case where the private field is renamed/removed. + // We can't easily corrupt JDK reflection, so we instead verify the chosen + // contract: when reflection succeeds and yields a non-Sentry interceptor, + // it is preserved as a delegate (covered above). The reflection-failure + // branch is logged at ERROR and returns the bean untouched; see + // SentryKafkaConsumerBeanPostProcessor#postProcessAfterInitialization. + val consumerFactory = mock>() + val factory = ConcurrentKafkaListenerContainerFactory() + factory.consumerFactory = consumerFactory + val customerInterceptor = + object : RecordInterceptor { + override fun intercept( + record: ConsumerRecord, + consumer: Consumer, + ): ConsumerRecord? = record + } + factory.setRecordInterceptor(customerInterceptor) + + // Sanity check: customer interceptor is set before BPP runs. + val field = factory.javaClass.superclass.getDeclaredField("recordInterceptor") + field.isAccessible = true + assertSame(customerInterceptor, field.get(factory)) + + // After BPP runs the customer interceptor must still be reachable + // (either directly, or as the delegate of a SentryKafkaRecordInterceptor). + val processor = SentryKafkaConsumerBeanPostProcessor() + processor.postProcessAfterInitialization(factory, "kafkaListenerContainerFactory") + + val installed = field.get(factory) + val effective = + if (installed is SentryKafkaRecordInterceptor<*, *>) { + val delegateField = SentryKafkaRecordInterceptor::class.java.getDeclaredField("delegate") + delegateField.isAccessible = true + delegateField.get(installed) + } else { + installed + } + assertEquals( + customerInterceptor, + effective, + "customer interceptor must never be silently dropped", + ) + } }