Skip to content

Commit 4fa767e

Browse files
committed
Merge branch 'feat/queue-instrumentation-e2e' into feat/queue-instrumentation-otel-samples
2 parents 10a5c63 + 5d297ac commit 4fa767e

File tree

9 files changed

+284
-211
lines changed

9 files changed

+284
-211
lines changed

sentry-spring-jakarta/api/sentry-spring-jakarta.api

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -256,8 +256,12 @@ public final class io/sentry/spring/jakarta/kafka/SentryKafkaProducerBeanPostPro
256256
public fun postProcessAfterInitialization (Ljava/lang/Object;Ljava/lang/String;)Ljava/lang/Object;
257257
}
258258

259-
public final class io/sentry/spring/jakarta/kafka/SentryKafkaProducerWrapper : org/springframework/kafka/core/KafkaTemplate {
260-
public fun <init> (Lorg/springframework/kafka/core/KafkaTemplate;Lio/sentry/IScopes;)V
259+
public final class io/sentry/spring/jakarta/kafka/SentryProducerInterceptor : org/apache/kafka/clients/producer/ProducerInterceptor {
260+
public fun <init> (Lio/sentry/IScopes;)V
261+
public fun close ()V
262+
public fun configure (Ljava/util/Map;)V
263+
public fun onAcknowledgement (Lorg/apache/kafka/clients/producer/RecordMetadata;Ljava/lang/Exception;)V
264+
public fun onSend (Lorg/apache/kafka/clients/producer/ProducerRecord;)Lorg/apache/kafka/clients/producer/ProducerRecord;
261265
}
262266

263267
public final class io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor : org/springframework/kafka/listener/RecordInterceptor {

sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaConsumerBeanPostProcessor.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.sentry.spring.jakarta.kafka;
22

33
import io.sentry.ScopesAdapter;
4+
import io.sentry.SentryLevel;
45
import java.lang.reflect.Field;
56
import org.jetbrains.annotations.ApiStatus;
67
import org.jetbrains.annotations.NotNull;
@@ -50,6 +51,15 @@ public final class SentryKafkaConsumerBeanPostProcessor
5051
field.setAccessible(true);
5152
return (RecordInterceptor<?, ?>) field.get(factory);
5253
} catch (NoSuchFieldException | IllegalAccessException e) {
54+
ScopesAdapter.getInstance()
55+
.getOptions()
56+
.getLogger()
57+
.log(
58+
SentryLevel.WARNING,
59+
"Unable to read existing recordInterceptor from "
60+
+ "AbstractKafkaListenerContainerFactory via reflection. "
61+
+ "If you had a custom RecordInterceptor, it may not be chained with Sentry's interceptor.",
62+
e);
5363
return null;
5464
}
5565
}

sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaProducerBeanPostProcessor.java

Lines changed: 54 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,28 @@
11
package io.sentry.spring.jakarta.kafka;
22

33
import io.sentry.ScopesAdapter;
4+
import io.sentry.SentryLevel;
5+
import java.lang.reflect.Field;
6+
import org.apache.kafka.clients.producer.ProducerInterceptor;
47
import org.jetbrains.annotations.ApiStatus;
58
import org.jetbrains.annotations.NotNull;
9+
import org.jetbrains.annotations.Nullable;
610
import org.springframework.beans.BeansException;
711
import org.springframework.beans.factory.config.BeanPostProcessor;
812
import org.springframework.core.Ordered;
913
import org.springframework.core.PriorityOrdered;
1014
import org.springframework.kafka.core.KafkaTemplate;
15+
import org.springframework.kafka.support.CompositeProducerInterceptor;
1116

12-
/** Wraps {@link KafkaTemplate} beans in {@link SentryKafkaProducerWrapper} for instrumentation. */
17+
/**
18+
* Sets a {@link SentryProducerInterceptor} on {@link KafkaTemplate} beans via {@link
19+
* KafkaTemplate#setProducerInterceptor(ProducerInterceptor)}. The original bean is not replaced.
20+
*
21+
* <p>If the template already has a {@link ProducerInterceptor}, both are composed using {@link
22+
* CompositeProducerInterceptor}. Reading the existing interceptor requires reflection (no public
23+
* getter in Spring Kafka 3.x); if reflection fails, a warning is logged and only the Sentry
24+
* interceptor is set.
25+
*/
1326
@ApiStatus.Internal
1427
public final class SentryKafkaProducerBeanPostProcessor
1528
implements BeanPostProcessor, PriorityOrdered {
@@ -18,13 +31,50 @@ public final class SentryKafkaProducerBeanPostProcessor
1831
@SuppressWarnings("unchecked")
1932
public @NotNull Object postProcessAfterInitialization(
2033
final @NotNull Object bean, final @NotNull String beanName) throws BeansException {
21-
if (bean instanceof KafkaTemplate && !(bean instanceof SentryKafkaProducerWrapper)) {
22-
return new SentryKafkaProducerWrapper<>(
23-
(KafkaTemplate<?, ?>) bean, ScopesAdapter.getInstance());
34+
if (bean instanceof KafkaTemplate) {
35+
final @NotNull KafkaTemplate<?, ?> template = (KafkaTemplate<?, ?>) bean;
36+
final @Nullable ProducerInterceptor<?, ?> existing = getExistingInterceptor(template);
37+
38+
if (existing instanceof SentryProducerInterceptor) {
39+
return bean;
40+
}
41+
42+
@SuppressWarnings("rawtypes")
43+
final SentryProducerInterceptor sentryInterceptor =
44+
new SentryProducerInterceptor<>(ScopesAdapter.getInstance());
45+
46+
if (existing != null) {
47+
@SuppressWarnings("rawtypes")
48+
final CompositeProducerInterceptor composite =
49+
new CompositeProducerInterceptor(sentryInterceptor, existing);
50+
template.setProducerInterceptor(composite);
51+
} else {
52+
template.setProducerInterceptor(sentryInterceptor);
53+
}
2454
}
2555
return bean;
2656
}
2757

58+
@SuppressWarnings("unchecked")
59+
private @Nullable ProducerInterceptor<?, ?> getExistingInterceptor(
60+
final @NotNull KafkaTemplate<?, ?> template) {
61+
try {
62+
final @NotNull Field field = KafkaTemplate.class.getDeclaredField("producerInterceptor");
63+
field.setAccessible(true);
64+
return (ProducerInterceptor<?, ?>) field.get(template);
65+
} catch (NoSuchFieldException | IllegalAccessException e) {
66+
ScopesAdapter.getInstance()
67+
.getOptions()
68+
.getLogger()
69+
.log(
70+
SentryLevel.WARNING,
71+
"Unable to read existing producerInterceptor from KafkaTemplate via reflection. "
72+
+ "If you had a custom ProducerInterceptor, it may be overwritten by Sentry's interceptor.",
73+
e);
74+
return null;
75+
}
76+
}
77+
2878
@Override
2979
public int getOrder() {
3080
return Ordered.LOWEST_PRECEDENCE;

sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ public void afterRecord(
149149
}
150150

151151
final @Nullable String enqueuedTimeStr =
152-
headerValue(record, SentryKafkaProducerWrapper.SENTRY_ENQUEUED_TIME_HEADER);
152+
headerValue(record, SentryProducerInterceptor.SENTRY_ENQUEUED_TIME_HEADER);
153153
if (enqueuedTimeStr != null) {
154154
try {
155155
final long enqueuedTime = Long.parseLong(enqueuedTimeStr);

sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaProducerWrapper.java renamed to sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryProducerInterceptor.java

Lines changed: 32 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package io.sentry.spring.jakarta.kafka;
22

3-
import io.micrometer.observation.Observation;
43
import io.sentry.BaggageHeader;
54
import io.sentry.IScopes;
65
import io.sentry.ISpan;
@@ -10,58 +9,55 @@
109
import io.sentry.SpanStatus;
1110
import io.sentry.util.TracingUtils;
1211
import java.nio.charset.StandardCharsets;
13-
import java.util.concurrent.CompletableFuture;
12+
import java.util.Map;
13+
import org.apache.kafka.clients.producer.ProducerInterceptor;
1414
import org.apache.kafka.clients.producer.ProducerRecord;
15+
import org.apache.kafka.clients.producer.RecordMetadata;
1516
import org.apache.kafka.common.header.Headers;
1617
import org.jetbrains.annotations.ApiStatus;
1718
import org.jetbrains.annotations.NotNull;
1819
import org.jetbrains.annotations.Nullable;
19-
import org.springframework.kafka.core.KafkaTemplate;
20-
import org.springframework.kafka.support.SendResult;
2120

2221
/**
23-
* Wraps a {@link KafkaTemplate} to create {@code queue.publish} spans for Kafka send operations.
22+
* A Kafka {@link ProducerInterceptor} that creates {@code queue.publish} spans and injects tracing
23+
* headers into outgoing records.
2424
*
25-
* <p>Overrides {@code doSend} which is the common path for all send variants in {@link
26-
* KafkaTemplate}.
25+
* <p>The span starts and finishes synchronously in {@link #onSend(ProducerRecord)}, representing
26+
* "message enqueued" semantics. This avoids cross-thread correlation complexity since {@link
27+
* #onAcknowledgement(RecordMetadata, Exception)} runs on the Kafka I/O thread.
28+
*
29+
* <p>If the customer already has a {@link ProducerInterceptor}, the {@link
30+
* SentryKafkaProducerBeanPostProcessor} composes both using Spring's {@link
31+
* org.springframework.kafka.support.CompositeProducerInterceptor}.
2732
*/
2833
@ApiStatus.Internal
29-
public final class SentryKafkaProducerWrapper<K, V> extends KafkaTemplate<K, V> {
34+
public final class SentryProducerInterceptor<K, V> implements ProducerInterceptor<K, V> {
3035

3136
static final String TRACE_ORIGIN = "auto.queue.spring_jakarta.kafka.producer";
3237
static final String SENTRY_ENQUEUED_TIME_HEADER = "sentry-task-enqueued-time";
3338

3439
private final @NotNull IScopes scopes;
3540

36-
public SentryKafkaProducerWrapper(
37-
final @NotNull KafkaTemplate<K, V> delegate, final @NotNull IScopes scopes) {
38-
super(delegate.getProducerFactory());
41+
public SentryProducerInterceptor(final @NotNull IScopes scopes) {
3942
this.scopes = scopes;
40-
this.setDefaultTopic(delegate.getDefaultTopic());
41-
if (delegate.isTransactional()) {
42-
this.setTransactionIdPrefix(delegate.getTransactionIdPrefix());
43-
}
44-
this.setMessageConverter(delegate.getMessageConverter());
45-
this.setMicrometerTagsProvider(delegate.getMicrometerTagsProvider());
4643
}
4744

4845
@Override
49-
protected @NotNull CompletableFuture<SendResult<K, V>> doSend(
50-
final @NotNull ProducerRecord<K, V> record, final @Nullable Observation observation) {
46+
public @NotNull ProducerRecord<K, V> onSend(final @NotNull ProducerRecord<K, V> record) {
5147
if (!scopes.getOptions().isEnableQueueTracing()) {
52-
return super.doSend(record, observation);
48+
return record;
5349
}
5450

5551
final @Nullable ISpan activeSpan = scopes.getSpan();
5652
if (activeSpan == null || activeSpan.isNoOp()) {
57-
return super.doSend(record, observation);
53+
return record;
5854
}
5955

6056
final @NotNull SpanOptions spanOptions = new SpanOptions();
6157
spanOptions.setOrigin(TRACE_ORIGIN);
6258
final @NotNull ISpan span = activeSpan.startChild("queue.publish", record.topic(), spanOptions);
6359
if (span.isNoOp()) {
64-
return super.doSend(record, observation);
60+
return record;
6561
}
6662

6763
span.setData(SpanDataConvention.MESSAGING_SYSTEM, "kafka");
@@ -73,27 +69,22 @@ public SentryKafkaProducerWrapper(
7369
// Header injection must not break the send
7470
}
7571

76-
final @NotNull CompletableFuture<SendResult<K, V>> future;
77-
try {
78-
future = super.doSend(record, observation);
79-
return future.whenComplete(
80-
(result, throwable) -> {
81-
if (throwable != null) {
82-
span.setStatus(SpanStatus.INTERNAL_ERROR);
83-
span.setThrowable(throwable);
84-
} else {
85-
span.setStatus(SpanStatus.OK);
86-
}
87-
span.finish();
88-
});
89-
} catch (Throwable e) {
90-
span.setStatus(SpanStatus.INTERNAL_ERROR);
91-
span.setThrowable(e);
92-
span.finish();
93-
throw e;
94-
}
72+
span.setStatus(SpanStatus.OK);
73+
span.finish();
74+
75+
return record;
9576
}
9677

78+
@Override
79+
public void onAcknowledgement(
80+
final @Nullable RecordMetadata metadata, final @Nullable Exception exception) {}
81+
82+
@Override
83+
public void close() {}
84+
85+
@Override
86+
public void configure(final @Nullable Map<String, ?> configs) {}
87+
9788
private void injectHeaders(final @NotNull Headers headers, final @NotNull ISpan span) {
9889
final @Nullable TracingUtils.TracingHeaders tracingHeaders =
9990
TracingUtils.trace(scopes, null, span);
Lines changed: 47 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,56 +1,78 @@
11
package io.sentry.spring.jakarta.kafka
22

3-
import io.sentry.IScopes
43
import kotlin.test.Test
54
import kotlin.test.assertSame
65
import kotlin.test.assertTrue
6+
import org.apache.kafka.clients.producer.ProducerInterceptor
77
import org.mockito.kotlin.mock
8-
import org.mockito.kotlin.whenever
98
import org.springframework.kafka.core.KafkaTemplate
109
import org.springframework.kafka.core.ProducerFactory
10+
import org.springframework.kafka.support.CompositeProducerInterceptor
1111

1212
class SentryKafkaProducerBeanPostProcessorTest {
1313

14-
@Test
15-
fun `wraps KafkaTemplate beans in SentryKafkaProducerWrapper`() {
16-
val producerFactory = mock<ProducerFactory<String, String>>()
17-
val kafkaTemplate = mock<KafkaTemplate<String, String>>()
18-
whenever(kafkaTemplate.producerFactory).thenReturn(producerFactory)
19-
whenever(kafkaTemplate.defaultTopic).thenReturn("")
20-
whenever(kafkaTemplate.messageConverter).thenReturn(mock())
21-
whenever(kafkaTemplate.micrometerTagsProvider).thenReturn(null)
14+
private fun readInterceptor(template: KafkaTemplate<*, *>): Any? {
15+
val field = KafkaTemplate::class.java.getDeclaredField("producerInterceptor")
16+
field.isAccessible = true
17+
return field.get(template)
18+
}
2219

20+
@Test
21+
fun `sets SentryProducerInterceptor on KafkaTemplate`() {
22+
val template = KafkaTemplate<String, String>(mock<ProducerFactory<String, String>>())
2323
val processor = SentryKafkaProducerBeanPostProcessor()
24-
val result = processor.postProcessAfterInitialization(kafkaTemplate, "kafkaTemplate")
2524

26-
assertTrue(result is SentryKafkaProducerWrapper<*, *>)
25+
processor.postProcessAfterInitialization(template, "kafkaTemplate")
26+
27+
assertTrue(readInterceptor(template) is SentryProducerInterceptor<*, *>)
2728
}
2829

2930
@Test
30-
fun `does not double-wrap SentryKafkaProducerWrapper`() {
31-
val producerFactory = mock<ProducerFactory<String, String>>()
32-
val kafkaTemplate = mock<KafkaTemplate<String, String>>()
33-
whenever(kafkaTemplate.producerFactory).thenReturn(producerFactory)
34-
whenever(kafkaTemplate.defaultTopic).thenReturn("")
35-
whenever(kafkaTemplate.messageConverter).thenReturn(mock())
36-
whenever(kafkaTemplate.micrometerTagsProvider).thenReturn(null)
37-
38-
val scopes = mock<IScopes>()
39-
val alreadyWrapped = SentryKafkaProducerWrapper(kafkaTemplate, scopes)
31+
fun `does not double-wrap when SentryProducerInterceptor already set`() {
32+
val template = KafkaTemplate<String, String>(mock<ProducerFactory<String, String>>())
4033
val processor = SentryKafkaProducerBeanPostProcessor()
4134

42-
val result = processor.postProcessAfterInitialization(alreadyWrapped, "kafkaTemplate")
35+
processor.postProcessAfterInitialization(template, "kafkaTemplate")
36+
val firstInterceptor = readInterceptor(template)
37+
38+
processor.postProcessAfterInitialization(template, "kafkaTemplate")
39+
val secondInterceptor = readInterceptor(template)
4340

44-
assertSame(alreadyWrapped, result)
41+
assertSame(firstInterceptor, secondInterceptor)
4542
}
4643

4744
@Test
48-
fun `does not wrap non-KafkaTemplate beans`() {
45+
fun `does not modify non-KafkaTemplate beans`() {
4946
val someBean = "not a kafka template"
5047
val processor = SentryKafkaProducerBeanPostProcessor()
5148

5249
val result = processor.postProcessAfterInitialization(someBean, "someBean")
5350

5451
assertSame(someBean, result)
5552
}
53+
54+
@Test
55+
fun `returns the same bean instance`() {
56+
val template = KafkaTemplate<String, String>(mock<ProducerFactory<String, String>>())
57+
val processor = SentryKafkaProducerBeanPostProcessor()
58+
59+
val result = processor.postProcessAfterInitialization(template, "kafkaTemplate")
60+
61+
assertSame(template, result, "BPP should return the same bean, not a replacement")
62+
}
63+
64+
@Test
65+
fun `composes with existing customer interceptor using CompositeProducerInterceptor`() {
66+
val template = KafkaTemplate<String, String>(mock<ProducerFactory<String, String>>())
67+
val customerInterceptor = mock<ProducerInterceptor<String, String>>()
68+
template.setProducerInterceptor(customerInterceptor)
69+
70+
val processor = SentryKafkaProducerBeanPostProcessor()
71+
processor.postProcessAfterInitialization(template, "kafkaTemplate")
72+
73+
assertTrue(
74+
readInterceptor(template) is CompositeProducerInterceptor<*, *>,
75+
"Should use CompositeProducerInterceptor when existing interceptor is present",
76+
)
77+
}
5678
}

0 commit comments

Comments
 (0)