Skip to content

Commit 7384727

Browse files
adinauerclaude
andcommitted
test(spring-jakarta): [Queue Instrumentation 30] Cover Kafka record interceptor lifecycle edge cases
Add three regression tests for SentryKafkaRecordInterceptor that pin down the lifecycle contract around clearThreadState cleanup: - full lifecycle intercept -> success -> clearThreadState closes the lifecycle token exactly once and does not double-finish the transaction - when a delegating interceptor returns null from intercept (filtering the record), the safety net in clearThreadState still finishes the transaction and closes the token - when a delegating interceptor throws from intercept, clearThreadState still finishes the transaction and closes the token after the exception has propagated Addresses review finding R6-F001. Co-Authored-By: Claude <noreply@anthropic.com>
1 parent 4a48e54 commit 7384727

1 file changed

Lines changed: 60 additions & 0 deletions

File tree

sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import kotlin.test.AfterTest
1818
import kotlin.test.BeforeTest
1919
import kotlin.test.Test
2020
import kotlin.test.assertEquals
21+
import kotlin.test.assertFailsWith
2122
import kotlin.test.assertNull
2223
import kotlin.test.assertTrue
2324
import org.apache.kafka.clients.consumer.Consumer
@@ -27,6 +28,7 @@ import org.apache.kafka.common.record.TimestampType
2728
import org.mockito.kotlin.any
2829
import org.mockito.kotlin.mock
2930
import org.mockito.kotlin.never
31+
import org.mockito.kotlin.times
3032
import org.mockito.kotlin.verify
3133
import org.mockito.kotlin.whenever
3234
import org.springframework.kafka.listener.RecordInterceptor
@@ -377,6 +379,64 @@ class SentryKafkaRecordInterceptorTest {
377379
verify(delegate).clearThreadState(consumer)
378380
}
379381

382+
@Test
383+
fun `full lifecycle intercept success clearThreadState closes token exactly once`() {
384+
val delegate = mock<RecordInterceptor<String, String>>()
385+
val record = createRecord()
386+
whenever(delegate.intercept(record, consumer)).thenReturn(record)
387+
val interceptor = SentryKafkaRecordInterceptor(scopes, delegate)
388+
389+
interceptor.setupThreadState(consumer)
390+
interceptor.intercept(record, consumer)
391+
interceptor.success(record, consumer)
392+
interceptor.clearThreadState(consumer)
393+
394+
// token closed once by success(); clearThreadState must not re-close it
395+
verify(lifecycleToken, times(1)).close()
396+
assertTrue(transaction.isFinished)
397+
// delegate hooks still delegated across the full lifecycle
398+
verify(delegate).setupThreadState(consumer)
399+
verify(delegate).success(record, consumer)
400+
verify(delegate).clearThreadState(consumer)
401+
}
402+
403+
@Test
404+
fun `when delegate intercept returns null clearThreadState still finishes transaction and closes token`() {
405+
val delegate = mock<RecordInterceptor<String, String>>()
406+
val record = createRecord()
407+
// delegate filters the record — per Spring Kafka contract, success/failure will not be invoked
408+
whenever(delegate.intercept(record, consumer)).thenReturn(null)
409+
val interceptor = SentryKafkaRecordInterceptor(scopes, delegate)
410+
411+
interceptor.setupThreadState(consumer)
412+
val result = interceptor.intercept(record, consumer)
413+
interceptor.clearThreadState(consumer)
414+
415+
assertNull(result)
416+
verify(lifecycleToken, times(1)).close()
417+
assertTrue(transaction.isFinished)
418+
verify(delegate).clearThreadState(consumer)
419+
}
420+
421+
@Test
422+
fun `when delegate intercept throws clearThreadState still finishes transaction and closes token`() {
423+
val delegate = mock<RecordInterceptor<String, String>>()
424+
val record = createRecord()
425+
val boom = RuntimeException("delegate boom")
426+
whenever(delegate.intercept(record, consumer)).thenThrow(boom)
427+
val interceptor = SentryKafkaRecordInterceptor(scopes, delegate)
428+
429+
interceptor.setupThreadState(consumer)
430+
val thrown = assertFailsWith<RuntimeException> { interceptor.intercept(record, consumer) }
431+
assertEquals(boom, thrown)
432+
433+
interceptor.clearThreadState(consumer)
434+
435+
verify(lifecycleToken, times(1)).close()
436+
assertTrue(transaction.isFinished)
437+
verify(delegate).clearThreadState(consumer)
438+
}
439+
380440
@Test
381441
fun `intercept cleans up stale context from previous record`() {
382442
val lifecycleToken2 = mock<ISentryLifecycleToken>()

0 commit comments

Comments
 (0)