Skip to content

Commit a4417eb

Browse files
authored
Merge pull request #5323 from getsentry/fix/queue-instrumentation-kafka-record-interceptor-thread-state
fix(spring-jakarta): [Queue Instrumentation 27] Delegate Kafka record thread-state hooks
2 parents 29f7f02 + 47b2d2f commit a4417eb

3 files changed

Lines changed: 61 additions & 1 deletion

File tree

sentry-spring-jakarta/api/sentry-spring-jakarta.api

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,7 @@ public final class io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor :
263263
public fun clearThreadState (Lorg/apache/kafka/clients/consumer/Consumer;)V
264264
public fun failure (Lorg/apache/kafka/clients/consumer/ConsumerRecord;Ljava/lang/Exception;Lorg/apache/kafka/clients/consumer/Consumer;)V
265265
public fun intercept (Lorg/apache/kafka/clients/consumer/ConsumerRecord;Lorg/apache/kafka/clients/consumer/Consumer;)Lorg/apache/kafka/clients/consumer/ConsumerRecord;
266+
public fun setupThreadState (Lorg/apache/kafka/clients/consumer/Consumer;)V
266267
public fun success (Lorg/apache/kafka/clients/consumer/ConsumerRecord;Lorg/apache/kafka/clients/consumer/Consumer;)V
267268
}
268269

sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,9 +106,22 @@ public void afterRecord(
106106
}
107107
}
108108

109+
@Override
110+
public void setupThreadState(final @NotNull Consumer<?, ?> consumer) {
111+
if (delegate != null) {
112+
delegate.setupThreadState(consumer);
113+
}
114+
}
115+
109116
@Override
110117
public void clearThreadState(final @NotNull Consumer<?, ?> consumer) {
111-
finishStaleContext();
118+
try {
119+
finishStaleContext();
120+
} finally {
121+
if (delegate != null) {
122+
delegate.clearThreadState(consumer);
123+
}
124+
}
112125
}
113126

114127
private boolean isIgnored() {

sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,52 @@ class SentryKafkaRecordInterceptorTest {
298298
interceptor.clearThreadState(consumer)
299299
}
300300

301+
@Test
302+
fun `setupThreadState delegates to existing interceptor`() {
303+
val delegate = mock<RecordInterceptor<String, String>>()
304+
val interceptor = SentryKafkaRecordInterceptor(scopes, delegate)
305+
306+
interceptor.setupThreadState(consumer)
307+
308+
verify(delegate).setupThreadState(consumer)
309+
}
310+
311+
@Test
312+
fun `setupThreadState is no-op without delegate`() {
313+
val interceptor = SentryKafkaRecordInterceptor<String, String>(scopes)
314+
315+
// should not throw
316+
interceptor.setupThreadState(consumer)
317+
}
318+
319+
@Test
320+
fun `clearThreadState delegates to existing interceptor`() {
321+
val delegate = mock<RecordInterceptor<String, String>>()
322+
val interceptor = SentryKafkaRecordInterceptor(scopes, delegate)
323+
324+
interceptor.clearThreadState(consumer)
325+
326+
verify(delegate).clearThreadState(consumer)
327+
}
328+
329+
@Test
330+
fun `clearThreadState delegates to existing interceptor even when sentry cleanup throws`() {
331+
val delegate = mock<RecordInterceptor<String, String>>()
332+
whenever(lifecycleToken.close()).thenThrow(RuntimeException("boom"))
333+
val interceptor = SentryKafkaRecordInterceptor(scopes, delegate)
334+
val record = createRecord()
335+
336+
interceptor.intercept(record, consumer)
337+
338+
try {
339+
interceptor.clearThreadState(consumer)
340+
} catch (ignored: RuntimeException) {
341+
// expected
342+
}
343+
344+
verify(delegate).clearThreadState(consumer)
345+
}
346+
301347
@Test
302348
fun `intercept cleans up stale context from previous record`() {
303349
val lifecycleToken2 = mock<ISentryLifecycleToken>()

0 commit comments

Comments
 (0)