Skip to content

Commit 47b2d2f

Browse files
adinauerclaude
andcommitted
fix(spring-jakarta): [Queue Instrumentation 27] Delegate Kafka record thread-state hooks
SentryKafkaRecordInterceptor wraps an existing customer RecordInterceptor when one is present on the listener container factory, but it previously only delegated intercept, success, failure, and afterRecord. setupThreadState was not overridden, so the default no-op from ThreadStateProcessor shadowed any delegate implementation. clearThreadState performed Sentry cleanup but never forwarded to the delegate either. Customers relying on these hooks for MDC, security context, or other thread-local state on Kafka listener threads would silently lose that behavior once Sentry auto-wrapped their interceptor. Delegate setupThreadState to the wrapped interceptor, and in clearThreadState run Sentry cleanup inside try and delegate to the wrapped interceptor in finally so delegate cleanup still executes if Sentry cleanup throws. Co-Authored-By: Claude <noreply@anthropic.com>
1 parent 08e6da2 commit 47b2d2f

3 files changed

Lines changed: 61 additions & 1 deletion

File tree

sentry-spring-jakarta/api/sentry-spring-jakarta.api

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,7 @@ public final class io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor :
263263
public fun clearThreadState (Lorg/apache/kafka/clients/consumer/Consumer;)V
264264
public fun failure (Lorg/apache/kafka/clients/consumer/ConsumerRecord;Ljava/lang/Exception;Lorg/apache/kafka/clients/consumer/Consumer;)V
265265
public fun intercept (Lorg/apache/kafka/clients/consumer/ConsumerRecord;Lorg/apache/kafka/clients/consumer/Consumer;)Lorg/apache/kafka/clients/consumer/ConsumerRecord;
266+
public fun setupThreadState (Lorg/apache/kafka/clients/consumer/Consumer;)V
266267
public fun success (Lorg/apache/kafka/clients/consumer/ConsumerRecord;Lorg/apache/kafka/clients/consumer/Consumer;)V
267268
}
268269

sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,9 +106,22 @@ public void afterRecord(
106106
}
107107
}
108108

109+
@Override
110+
public void setupThreadState(final @NotNull Consumer<?, ?> consumer) {
111+
if (delegate != null) {
112+
delegate.setupThreadState(consumer);
113+
}
114+
}
115+
109116
@Override
110117
public void clearThreadState(final @NotNull Consumer<?, ?> consumer) {
111-
finishStaleContext();
118+
try {
119+
finishStaleContext();
120+
} finally {
121+
if (delegate != null) {
122+
delegate.clearThreadState(consumer);
123+
}
124+
}
112125
}
113126

114127
private boolean isIgnored() {

sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,52 @@ class SentryKafkaRecordInterceptorTest {
298298
interceptor.clearThreadState(consumer)
299299
}
300300

301+
@Test
302+
fun `setupThreadState delegates to existing interceptor`() {
303+
val delegate = mock<RecordInterceptor<String, String>>()
304+
val interceptor = SentryKafkaRecordInterceptor(scopes, delegate)
305+
306+
interceptor.setupThreadState(consumer)
307+
308+
verify(delegate).setupThreadState(consumer)
309+
}
310+
311+
@Test
312+
fun `setupThreadState is no-op without delegate`() {
313+
val interceptor = SentryKafkaRecordInterceptor<String, String>(scopes)
314+
315+
// should not throw
316+
interceptor.setupThreadState(consumer)
317+
}
318+
319+
@Test
320+
fun `clearThreadState delegates to existing interceptor`() {
321+
val delegate = mock<RecordInterceptor<String, String>>()
322+
val interceptor = SentryKafkaRecordInterceptor(scopes, delegate)
323+
324+
interceptor.clearThreadState(consumer)
325+
326+
verify(delegate).clearThreadState(consumer)
327+
}
328+
329+
@Test
330+
fun `clearThreadState delegates to existing interceptor even when sentry cleanup throws`() {
331+
val delegate = mock<RecordInterceptor<String, String>>()
332+
whenever(lifecycleToken.close()).thenThrow(RuntimeException("boom"))
333+
val interceptor = SentryKafkaRecordInterceptor(scopes, delegate)
334+
val record = createRecord()
335+
336+
interceptor.intercept(record, consumer)
337+
338+
try {
339+
interceptor.clearThreadState(consumer)
340+
} catch (ignored: RuntimeException) {
341+
// expected
342+
}
343+
344+
verify(delegate).clearThreadState(consumer)
345+
}
346+
301347
@Test
302348
fun `intercept cleans up stale context from previous record`() {
303349
val lifecycleToken2 = mock<ISentryLifecycleToken>()

0 commit comments

Comments
 (0)