@@ -18,6 +18,7 @@ import kotlin.test.AfterTest
1818import kotlin.test.BeforeTest
1919import kotlin.test.Test
2020import kotlin.test.assertEquals
21+ import kotlin.test.assertFailsWith
2122import kotlin.test.assertNull
2223import kotlin.test.assertTrue
2324import org.apache.kafka.clients.consumer.Consumer
@@ -27,6 +28,7 @@ import org.apache.kafka.common.record.TimestampType
2728import org.mockito.kotlin.any
2829import org.mockito.kotlin.mock
2930import org.mockito.kotlin.never
31+ import org.mockito.kotlin.times
3032import org.mockito.kotlin.verify
3133import org.mockito.kotlin.whenever
3234import org.springframework.kafka.listener.RecordInterceptor
@@ -377,6 +379,64 @@ class SentryKafkaRecordInterceptorTest {
377379 verify(delegate).clearThreadState(consumer)
378380 }
379381
382+ @Test
383+ fun `full lifecycle intercept success clearThreadState closes token exactly once` () {
384+ val delegate = mock<RecordInterceptor <String , String >>()
385+ val record = createRecord()
386+ whenever(delegate.intercept(record, consumer)).thenReturn(record)
387+ val interceptor = SentryKafkaRecordInterceptor (scopes, delegate)
388+
389+ interceptor.setupThreadState(consumer)
390+ interceptor.intercept(record, consumer)
391+ interceptor.success(record, consumer)
392+ interceptor.clearThreadState(consumer)
393+
394+ // token closed once by success(); clearThreadState must not re-close it
395+ verify(lifecycleToken, times(1 )).close()
396+ assertTrue(transaction.isFinished)
397+ // delegate hooks still delegated across the full lifecycle
398+ verify(delegate).setupThreadState(consumer)
399+ verify(delegate).success(record, consumer)
400+ verify(delegate).clearThreadState(consumer)
401+ }
402+
403+ @Test
404+ fun `when delegate intercept returns null clearThreadState still finishes transaction and closes token` () {
405+ val delegate = mock<RecordInterceptor <String , String >>()
406+ val record = createRecord()
407+ // delegate filters the record — per Spring Kafka contract, success/failure will not be invoked
408+ whenever(delegate.intercept(record, consumer)).thenReturn(null )
409+ val interceptor = SentryKafkaRecordInterceptor (scopes, delegate)
410+
411+ interceptor.setupThreadState(consumer)
412+ val result = interceptor.intercept(record, consumer)
413+ interceptor.clearThreadState(consumer)
414+
415+ assertNull(result)
416+ verify(lifecycleToken, times(1 )).close()
417+ assertTrue(transaction.isFinished)
418+ verify(delegate).clearThreadState(consumer)
419+ }
420+
421+ @Test
422+ fun `when delegate intercept throws clearThreadState still finishes transaction and closes token` () {
423+ val delegate = mock<RecordInterceptor <String , String >>()
424+ val record = createRecord()
425+ val boom = RuntimeException (" delegate boom" )
426+ whenever(delegate.intercept(record, consumer)).thenThrow(boom)
427+ val interceptor = SentryKafkaRecordInterceptor (scopes, delegate)
428+
429+ interceptor.setupThreadState(consumer)
430+ val thrown = assertFailsWith<RuntimeException > { interceptor.intercept(record, consumer) }
431+ assertEquals(boom, thrown)
432+
433+ interceptor.clearThreadState(consumer)
434+
435+ verify(lifecycleToken, times(1 )).close()
436+ assertTrue(transaction.isFinished)
437+ verify(delegate).clearThreadState(consumer)
438+ }
439+
380440 @Test
381441 fun `intercept cleans up stale context from previous record` () {
382442 val lifecycleToken2 = mock<ISentryLifecycleToken >()
0 commit comments