Skip to content

Commit 915e42b

Browse files
adinauerclaude
andcommitted
ref(spring-jakarta): Replace SentryKafkaProducerWrapper with SentryProducerInterceptor
Replace the KafkaTemplate subclass approach with a Kafka-native ProducerInterceptor. The BeanPostProcessor now sets the interceptor on the existing KafkaTemplate instead of replacing the bean, which preserves any custom configuration on the template. Existing customer interceptors are composed using Spring's CompositeProducerInterceptor. If reflection fails to read the existing interceptor, a warning is logged. Co-Authored-By: Claude <noreply@anthropic.com>
1 parent 5049ffc commit 915e42b

File tree

6 files changed

+272
-209
lines changed

6 files changed

+272
-209
lines changed

sentry-spring-jakarta/api/sentry-spring-jakarta.api

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -250,8 +250,12 @@ public final class io/sentry/spring/jakarta/kafka/SentryKafkaProducerBeanPostPro
250250
public fun postProcessAfterInitialization (Ljava/lang/Object;Ljava/lang/String;)Ljava/lang/Object;
251251
}
252252

253-
public final class io/sentry/spring/jakarta/kafka/SentryKafkaProducerWrapper : org/springframework/kafka/core/KafkaTemplate {
254-
public fun <init> (Lorg/springframework/kafka/core/KafkaTemplate;Lio/sentry/IScopes;)V
253+
public final class io/sentry/spring/jakarta/kafka/SentryProducerInterceptor : org/apache/kafka/clients/producer/ProducerInterceptor {
254+
public fun <init> (Lio/sentry/IScopes;)V
255+
public fun close ()V
256+
public fun configure (Ljava/util/Map;)V
257+
public fun onAcknowledgement (Lorg/apache/kafka/clients/producer/RecordMetadata;Ljava/lang/Exception;)V
258+
public fun onSend (Lorg/apache/kafka/clients/producer/ProducerRecord;)Lorg/apache/kafka/clients/producer/ProducerRecord;
255259
}
256260

257261
public class io/sentry/spring/jakarta/opentelemetry/SentryOpenTelemetryAgentWithoutAutoInitConfiguration {

sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaProducerBeanPostProcessor.java

Lines changed: 54 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,28 @@
11
package io.sentry.spring.jakarta.kafka;
22

33
import io.sentry.ScopesAdapter;
4+
import io.sentry.SentryLevel;
5+
import java.lang.reflect.Field;
6+
import org.apache.kafka.clients.producer.ProducerInterceptor;
47
import org.jetbrains.annotations.ApiStatus;
58
import org.jetbrains.annotations.NotNull;
9+
import org.jetbrains.annotations.Nullable;
610
import org.springframework.beans.BeansException;
711
import org.springframework.beans.factory.config.BeanPostProcessor;
812
import org.springframework.core.Ordered;
913
import org.springframework.core.PriorityOrdered;
1014
import org.springframework.kafka.core.KafkaTemplate;
15+
import org.springframework.kafka.support.CompositeProducerInterceptor;
1116

12-
/** Wraps {@link KafkaTemplate} beans in {@link SentryKafkaProducerWrapper} for instrumentation. */
17+
/**
18+
* Sets a {@link SentryProducerInterceptor} on {@link KafkaTemplate} beans via {@link
19+
* KafkaTemplate#setProducerInterceptor(ProducerInterceptor)}. The original bean is not replaced.
20+
*
21+
* <p>If the template already has a {@link ProducerInterceptor}, both are composed using {@link
22+
* CompositeProducerInterceptor}. Reading the existing interceptor requires reflection (no public
23+
* getter in Spring Kafka 3.x); if reflection fails, a warning is logged and only the Sentry
24+
* interceptor is set.
25+
*/
1326
@ApiStatus.Internal
1427
public final class SentryKafkaProducerBeanPostProcessor
1528
implements BeanPostProcessor, PriorityOrdered {
@@ -18,13 +31,50 @@ public final class SentryKafkaProducerBeanPostProcessor
1831
@SuppressWarnings("unchecked")
1932
public @NotNull Object postProcessAfterInitialization(
2033
final @NotNull Object bean, final @NotNull String beanName) throws BeansException {
21-
if (bean instanceof KafkaTemplate && !(bean instanceof SentryKafkaProducerWrapper)) {
22-
return new SentryKafkaProducerWrapper<>(
23-
(KafkaTemplate<?, ?>) bean, ScopesAdapter.getInstance());
34+
if (bean instanceof KafkaTemplate) {
35+
final @NotNull KafkaTemplate<?, ?> template = (KafkaTemplate<?, ?>) bean;
36+
final @Nullable ProducerInterceptor<?, ?> existing = getExistingInterceptor(template);
37+
38+
if (existing instanceof SentryProducerInterceptor) {
39+
return bean;
40+
}
41+
42+
@SuppressWarnings("rawtypes")
43+
final SentryProducerInterceptor sentryInterceptor =
44+
new SentryProducerInterceptor<>(ScopesAdapter.getInstance());
45+
46+
if (existing != null) {
47+
@SuppressWarnings("rawtypes")
48+
final CompositeProducerInterceptor composite =
49+
new CompositeProducerInterceptor(sentryInterceptor, existing);
50+
template.setProducerInterceptor(composite);
51+
} else {
52+
template.setProducerInterceptor(sentryInterceptor);
53+
}
2454
}
2555
return bean;
2656
}
2757

58+
@SuppressWarnings("unchecked")
59+
private @Nullable ProducerInterceptor<?, ?> getExistingInterceptor(
60+
final @NotNull KafkaTemplate<?, ?> template) {
61+
try {
62+
final @NotNull Field field = KafkaTemplate.class.getDeclaredField("producerInterceptor");
63+
field.setAccessible(true);
64+
return (ProducerInterceptor<?, ?>) field.get(template);
65+
} catch (NoSuchFieldException | IllegalAccessException e) {
66+
ScopesAdapter.getInstance()
67+
.getOptions()
68+
.getLogger()
69+
.log(
70+
SentryLevel.WARNING,
71+
"Unable to read existing producerInterceptor from KafkaTemplate via reflection. "
72+
+ "If you had a custom ProducerInterceptor, it may be overwritten by Sentry's interceptor.",
73+
e);
74+
return null;
75+
}
76+
}
77+
2878
@Override
2979
public int getOrder() {
3080
return Ordered.LOWEST_PRECEDENCE;

sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaProducerWrapper.java renamed to sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryProducerInterceptor.java

Lines changed: 32 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package io.sentry.spring.jakarta.kafka;
22

3-
import io.micrometer.observation.Observation;
43
import io.sentry.BaggageHeader;
54
import io.sentry.IScopes;
65
import io.sentry.ISpan;
@@ -10,58 +9,55 @@
109
import io.sentry.SpanStatus;
1110
import io.sentry.util.TracingUtils;
1211
import java.nio.charset.StandardCharsets;
13-
import java.util.concurrent.CompletableFuture;
12+
import java.util.Map;
13+
import org.apache.kafka.clients.producer.ProducerInterceptor;
1414
import org.apache.kafka.clients.producer.ProducerRecord;
15+
import org.apache.kafka.clients.producer.RecordMetadata;
1516
import org.apache.kafka.common.header.Headers;
1617
import org.jetbrains.annotations.ApiStatus;
1718
import org.jetbrains.annotations.NotNull;
1819
import org.jetbrains.annotations.Nullable;
19-
import org.springframework.kafka.core.KafkaTemplate;
20-
import org.springframework.kafka.support.SendResult;
2120

2221
/**
23-
* Wraps a {@link KafkaTemplate} to create {@code queue.publish} spans for Kafka send operations.
22+
* A Kafka {@link ProducerInterceptor} that creates {@code queue.publish} spans and injects tracing
23+
* headers into outgoing records.
2424
*
25-
* <p>Overrides {@code doSend} which is the common path for all send variants in {@link
26-
* KafkaTemplate}.
25+
* <p>The span starts and finishes synchronously in {@link #onSend(ProducerRecord)}, representing
26+
* "message enqueued" semantics. This avoids cross-thread correlation complexity since {@link
27+
* #onAcknowledgement(RecordMetadata, Exception)} runs on the Kafka I/O thread.
28+
*
29+
* <p>If the customer already has a {@link ProducerInterceptor}, the {@link
30+
* SentryKafkaProducerBeanPostProcessor} composes both using Spring's {@link
31+
* org.springframework.kafka.support.CompositeProducerInterceptor}.
2732
*/
2833
@ApiStatus.Internal
29-
public final class SentryKafkaProducerWrapper<K, V> extends KafkaTemplate<K, V> {
34+
public final class SentryProducerInterceptor<K, V> implements ProducerInterceptor<K, V> {
3035

3136
static final String TRACE_ORIGIN = "auto.queue.spring_jakarta.kafka.producer";
3237
static final String SENTRY_ENQUEUED_TIME_HEADER = "sentry-task-enqueued-time";
3338

3439
private final @NotNull IScopes scopes;
3540

36-
public SentryKafkaProducerWrapper(
37-
final @NotNull KafkaTemplate<K, V> delegate, final @NotNull IScopes scopes) {
38-
super(delegate.getProducerFactory());
41+
public SentryProducerInterceptor(final @NotNull IScopes scopes) {
3942
this.scopes = scopes;
40-
this.setDefaultTopic(delegate.getDefaultTopic());
41-
if (delegate.isTransactional()) {
42-
this.setTransactionIdPrefix(delegate.getTransactionIdPrefix());
43-
}
44-
this.setMessageConverter(delegate.getMessageConverter());
45-
this.setMicrometerTagsProvider(delegate.getMicrometerTagsProvider());
4643
}
4744

4845
@Override
49-
protected @NotNull CompletableFuture<SendResult<K, V>> doSend(
50-
final @NotNull ProducerRecord<K, V> record, final @Nullable Observation observation) {
46+
public @NotNull ProducerRecord<K, V> onSend(final @NotNull ProducerRecord<K, V> record) {
5147
if (!scopes.getOptions().isEnableQueueTracing()) {
52-
return super.doSend(record, observation);
48+
return record;
5349
}
5450

5551
final @Nullable ISpan activeSpan = scopes.getSpan();
5652
if (activeSpan == null || activeSpan.isNoOp()) {
57-
return super.doSend(record, observation);
53+
return record;
5854
}
5955

6056
final @NotNull SpanOptions spanOptions = new SpanOptions();
6157
spanOptions.setOrigin(TRACE_ORIGIN);
6258
final @NotNull ISpan span = activeSpan.startChild("queue.publish", record.topic(), spanOptions);
6359
if (span.isNoOp()) {
64-
return super.doSend(record, observation);
60+
return record;
6561
}
6662

6763
span.setData(SpanDataConvention.MESSAGING_SYSTEM, "kafka");
@@ -73,27 +69,22 @@ public SentryKafkaProducerWrapper(
7369
// Header injection must not break the send
7470
}
7571

76-
final @NotNull CompletableFuture<SendResult<K, V>> future;
77-
try {
78-
future = super.doSend(record, observation);
79-
return future.whenComplete(
80-
(result, throwable) -> {
81-
if (throwable != null) {
82-
span.setStatus(SpanStatus.INTERNAL_ERROR);
83-
span.setThrowable(throwable);
84-
} else {
85-
span.setStatus(SpanStatus.OK);
86-
}
87-
span.finish();
88-
});
89-
} catch (Throwable e) {
90-
span.setStatus(SpanStatus.INTERNAL_ERROR);
91-
span.setThrowable(e);
92-
span.finish();
93-
throw e;
94-
}
72+
span.setStatus(SpanStatus.OK);
73+
span.finish();
74+
75+
return record;
9576
}
9677

78+
@Override
79+
public void onAcknowledgement(
80+
final @Nullable RecordMetadata metadata, final @Nullable Exception exception) {}
81+
82+
@Override
83+
public void close() {}
84+
85+
@Override
86+
public void configure(final @Nullable Map<String, ?> configs) {}
87+
9788
private void injectHeaders(final @NotNull Headers headers, final @NotNull ISpan span) {
9889
final @Nullable TracingUtils.TracingHeaders tracingHeaders =
9990
TracingUtils.trace(scopes, null, span);
Lines changed: 47 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,56 +1,78 @@
11
package io.sentry.spring.jakarta.kafka
22

3-
import io.sentry.IScopes
43
import kotlin.test.Test
54
import kotlin.test.assertSame
65
import kotlin.test.assertTrue
6+
import org.apache.kafka.clients.producer.ProducerInterceptor
77
import org.mockito.kotlin.mock
8-
import org.mockito.kotlin.whenever
98
import org.springframework.kafka.core.KafkaTemplate
109
import org.springframework.kafka.core.ProducerFactory
10+
import org.springframework.kafka.support.CompositeProducerInterceptor
1111

1212
class SentryKafkaProducerBeanPostProcessorTest {
1313

14-
@Test
15-
fun `wraps KafkaTemplate beans in SentryKafkaProducerWrapper`() {
16-
val producerFactory = mock<ProducerFactory<String, String>>()
17-
val kafkaTemplate = mock<KafkaTemplate<String, String>>()
18-
whenever(kafkaTemplate.producerFactory).thenReturn(producerFactory)
19-
whenever(kafkaTemplate.defaultTopic).thenReturn("")
20-
whenever(kafkaTemplate.messageConverter).thenReturn(mock())
21-
whenever(kafkaTemplate.micrometerTagsProvider).thenReturn(null)
14+
private fun readInterceptor(template: KafkaTemplate<*, *>): Any? {
15+
val field = KafkaTemplate::class.java.getDeclaredField("producerInterceptor")
16+
field.isAccessible = true
17+
return field.get(template)
18+
}
2219

20+
@Test
21+
fun `sets SentryProducerInterceptor on KafkaTemplate`() {
22+
val template = KafkaTemplate<String, String>(mock<ProducerFactory<String, String>>())
2323
val processor = SentryKafkaProducerBeanPostProcessor()
24-
val result = processor.postProcessAfterInitialization(kafkaTemplate, "kafkaTemplate")
2524

26-
assertTrue(result is SentryKafkaProducerWrapper<*, *>)
25+
processor.postProcessAfterInitialization(template, "kafkaTemplate")
26+
27+
assertTrue(readInterceptor(template) is SentryProducerInterceptor<*, *>)
2728
}
2829

2930
@Test
30-
fun `does not double-wrap SentryKafkaProducerWrapper`() {
31-
val producerFactory = mock<ProducerFactory<String, String>>()
32-
val kafkaTemplate = mock<KafkaTemplate<String, String>>()
33-
whenever(kafkaTemplate.producerFactory).thenReturn(producerFactory)
34-
whenever(kafkaTemplate.defaultTopic).thenReturn("")
35-
whenever(kafkaTemplate.messageConverter).thenReturn(mock())
36-
whenever(kafkaTemplate.micrometerTagsProvider).thenReturn(null)
37-
38-
val scopes = mock<IScopes>()
39-
val alreadyWrapped = SentryKafkaProducerWrapper(kafkaTemplate, scopes)
31+
fun `does not double-wrap when SentryProducerInterceptor already set`() {
32+
val template = KafkaTemplate<String, String>(mock<ProducerFactory<String, String>>())
4033
val processor = SentryKafkaProducerBeanPostProcessor()
4134

42-
val result = processor.postProcessAfterInitialization(alreadyWrapped, "kafkaTemplate")
35+
processor.postProcessAfterInitialization(template, "kafkaTemplate")
36+
val firstInterceptor = readInterceptor(template)
37+
38+
processor.postProcessAfterInitialization(template, "kafkaTemplate")
39+
val secondInterceptor = readInterceptor(template)
4340

44-
assertSame(alreadyWrapped, result)
41+
assertSame(firstInterceptor, secondInterceptor)
4542
}
4643

4744
@Test
48-
fun `does not wrap non-KafkaTemplate beans`() {
45+
fun `does not modify non-KafkaTemplate beans`() {
4946
val someBean = "not a kafka template"
5047
val processor = SentryKafkaProducerBeanPostProcessor()
5148

5249
val result = processor.postProcessAfterInitialization(someBean, "someBean")
5350

5451
assertSame(someBean, result)
5552
}
53+
54+
@Test
55+
fun `returns the same bean instance`() {
56+
val template = KafkaTemplate<String, String>(mock<ProducerFactory<String, String>>())
57+
val processor = SentryKafkaProducerBeanPostProcessor()
58+
59+
val result = processor.postProcessAfterInitialization(template, "kafkaTemplate")
60+
61+
assertSame(template, result, "BPP should return the same bean, not a replacement")
62+
}
63+
64+
@Test
65+
fun `composes with existing customer interceptor using CompositeProducerInterceptor`() {
66+
val template = KafkaTemplate<String, String>(mock<ProducerFactory<String, String>>())
67+
val customerInterceptor = mock<ProducerInterceptor<String, String>>()
68+
template.setProducerInterceptor(customerInterceptor)
69+
70+
val processor = SentryKafkaProducerBeanPostProcessor()
71+
processor.postProcessAfterInitialization(template, "kafkaTemplate")
72+
73+
assertTrue(
74+
readInterceptor(template) is CompositeProducerInterceptor<*, *>,
75+
"Should use CompositeProducerInterceptor when existing interceptor is present",
76+
)
77+
}
5678
}

0 commit comments

Comments
 (0)