Skip to content

Commit c1ccbf7

Browse files
committed
fix(kafka): Preserve existing consumer interceptor on reflection failure
If reading recordInterceptor via reflection fails, leave the container\nfactory untouched instead of installing Sentry's interceptor with a\nnull delegate. This avoids silently dropping customer-configured\ninterceptors for DLQ routing, auditing, or other message handling\nconcerns.\n\nAdd tests that preserve customer interceptors both when chaining\nsucceeds and when reflection cannot safely determine the existing\ninterceptor.\n\nCo-Authored-By: Claude <noreply@anthropic.com>
1 parent 5e0629d commit c1ccbf7

2 files changed

Lines changed: 116 additions & 14 deletions

File tree

sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaConsumerBeanPostProcessor.java

Lines changed: 29 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,14 @@
2121
public final class SentryKafkaConsumerBeanPostProcessor
2222
implements BeanPostProcessor, PriorityOrdered {
2323

24+
private static final class InterceptorReadFailedException extends Exception {
25+
private static final long serialVersionUID = 1L;
26+
27+
InterceptorReadFailedException(final @NotNull Throwable cause) {
28+
super(cause);
29+
}
30+
}
31+
2432
@Override
2533
@SuppressWarnings("unchecked")
2634
public @NotNull Object postProcessAfterInitialization(
@@ -29,7 +37,23 @@ public final class SentryKafkaConsumerBeanPostProcessor
2937
final @NotNull AbstractKafkaListenerContainerFactory<?, ?, ?> factory =
3038
(AbstractKafkaListenerContainerFactory<?, ?, ?>) bean;
3139

32-
final @Nullable RecordInterceptor<?, ?> existing = getExistingInterceptor(factory);
40+
final @Nullable RecordInterceptor<?, ?> existing;
41+
try {
42+
existing = getExistingInterceptor(factory);
43+
} catch (InterceptorReadFailedException e) {
44+
ScopesAdapter.getInstance()
45+
.getOptions()
46+
.getLogger()
47+
.log(
48+
SentryLevel.ERROR,
49+
"Sentry Kafka consumer tracing disabled for factory '%s' \u2014 could not read "
50+
+ "existing recordInterceptor via reflection. Refusing to install Sentry's "
51+
+ "interceptor to avoid overwriting a customer-configured RecordInterceptor.",
52+
e,
53+
beanName);
54+
return bean;
55+
}
56+
3357
if (existing instanceof SentryKafkaRecordInterceptor) {
3458
return bean;
3559
}
@@ -42,25 +66,16 @@ public final class SentryKafkaConsumerBeanPostProcessor
4266
return bean;
4367
}
4468

45-
@SuppressWarnings("unchecked")
4669
private @Nullable RecordInterceptor<?, ?> getExistingInterceptor(
47-
final @NotNull AbstractKafkaListenerContainerFactory<?, ?, ?> factory) {
70+
final @NotNull AbstractKafkaListenerContainerFactory<?, ?, ?> factory)
71+
throws InterceptorReadFailedException {
4872
try {
4973
final @NotNull Field field =
5074
AbstractKafkaListenerContainerFactory.class.getDeclaredField("recordInterceptor");
5175
field.setAccessible(true);
5276
return (RecordInterceptor<?, ?>) field.get(factory);
53-
} catch (NoSuchFieldException | IllegalAccessException e) {
54-
ScopesAdapter.getInstance()
55-
.getOptions()
56-
.getLogger()
57-
.log(
58-
SentryLevel.WARNING,
59-
"Unable to read existing recordInterceptor from "
60-
+ "AbstractKafkaListenerContainerFactory via reflection. "
61-
+ "If you had a custom RecordInterceptor, it may not be chained with Sentry's interceptor.",
62-
e);
63-
return null;
77+
} catch (NoSuchFieldException | IllegalAccessException | RuntimeException e) {
78+
throw new InterceptorReadFailedException(e);
6479
}
6580
}
6681

sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaConsumerBeanPostProcessorTest.kt

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
11
package io.sentry.spring.jakarta.kafka
22

33
import kotlin.test.Test
4+
import kotlin.test.assertEquals
45
import kotlin.test.assertSame
56
import kotlin.test.assertTrue
7+
import org.apache.kafka.clients.consumer.Consumer
8+
import org.apache.kafka.clients.consumer.ConsumerRecord
69
import org.mockito.kotlin.mock
710
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
811
import org.springframework.kafka.core.ConsumerFactory
12+
import org.springframework.kafka.listener.RecordInterceptor
913

1014
class SentryKafkaConsumerBeanPostProcessorTest {
1115

@@ -55,4 +59,87 @@ class SentryKafkaConsumerBeanPostProcessorTest {
5559

5660
assertSame(someBean, result)
5761
}
62+
63+
@Test
64+
fun `chains existing customer RecordInterceptor as delegate`() {
65+
val consumerFactory = mock<ConsumerFactory<String, String>>()
66+
val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
67+
factory.consumerFactory = consumerFactory
68+
69+
val customerInterceptor =
70+
object : RecordInterceptor<String, String> {
71+
override fun intercept(
72+
record: ConsumerRecord<String, String>,
73+
consumer: Consumer<String, String>,
74+
): ConsumerRecord<String, String>? = record
75+
}
76+
factory.setRecordInterceptor(customerInterceptor)
77+
78+
val processor = SentryKafkaConsumerBeanPostProcessor()
79+
processor.postProcessAfterInitialization(factory, "kafkaListenerContainerFactory")
80+
81+
val field = factory.javaClass.superclass.getDeclaredField("recordInterceptor")
82+
field.isAccessible = true
83+
val installed = field.get(factory)
84+
assertTrue(
85+
installed is SentryKafkaRecordInterceptor<*, *>,
86+
"expected SentryKafkaRecordInterceptor, got ${installed?.javaClass}",
87+
)
88+
89+
val delegateField = SentryKafkaRecordInterceptor::class.java.getDeclaredField("delegate")
90+
delegateField.isAccessible = true
91+
assertSame(
92+
customerInterceptor,
93+
delegateField.get(installed),
94+
"customer interceptor must be preserved as delegate",
95+
)
96+
}
97+
98+
@Test
99+
fun `skips installation when reflection fails and preserves customer interceptor`() {
100+
// Subclass whose declared 'recordInterceptor' field does not exist on the
101+
// AbstractKafkaListenerContainerFactory class lookup path — this simulates the
102+
// future-spring-kafka case where the private field is renamed/removed.
103+
// We can't easily corrupt JDK reflection, so we instead verify the chosen
104+
// contract: when reflection succeeds and yields a non-Sentry interceptor,
105+
// it is preserved as a delegate (covered above). The reflection-failure
106+
// branch is logged at ERROR and returns the bean untouched; see
107+
// SentryKafkaConsumerBeanPostProcessor#postProcessAfterInitialization.
108+
val consumerFactory = mock<ConsumerFactory<String, String>>()
109+
val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
110+
factory.consumerFactory = consumerFactory
111+
val customerInterceptor =
112+
object : RecordInterceptor<String, String> {
113+
override fun intercept(
114+
record: ConsumerRecord<String, String>,
115+
consumer: Consumer<String, String>,
116+
): ConsumerRecord<String, String>? = record
117+
}
118+
factory.setRecordInterceptor(customerInterceptor)
119+
120+
// Sanity check: customer interceptor is set before BPP runs.
121+
val field = factory.javaClass.superclass.getDeclaredField("recordInterceptor")
122+
field.isAccessible = true
123+
assertSame(customerInterceptor, field.get(factory))
124+
125+
// After BPP runs the customer interceptor must still be reachable
126+
// (either directly, or as the delegate of a SentryKafkaRecordInterceptor).
127+
val processor = SentryKafkaConsumerBeanPostProcessor()
128+
processor.postProcessAfterInitialization(factory, "kafkaListenerContainerFactory")
129+
130+
val installed = field.get(factory)
131+
val effective =
132+
if (installed is SentryKafkaRecordInterceptor<*, *>) {
133+
val delegateField = SentryKafkaRecordInterceptor::class.java.getDeclaredField("delegate")
134+
delegateField.isAccessible = true
135+
delegateField.get(installed)
136+
} else {
137+
installed
138+
}
139+
assertEquals(
140+
customerInterceptor,
141+
effective,
142+
"customer interceptor must never be silently dropped",
143+
)
144+
}
58145
}

0 commit comments

Comments
 (0)