From 320e80598309c4aa0937a36f7fd2b9a5ce03d34b Mon Sep 17 00:00:00 2001 From: Alexander Dinauer Date: Fri, 10 Apr 2026 12:23:20 +0200 Subject: [PATCH] fix(spring-jakarta): Fork root scopes and skip when OTel is active in Kafka consumer interceptor Use Sentry.forkedRootScopes() instead of scopes.forkedScopes() so each Kafka message starts with a clean scope from root, matching the pattern used by SentryWebFilter for reactive request boundaries. Add isIgnored() check using SpanUtils.isIgnored() on the trace origin so the interceptor no-ops when OTel is active and the origin is in the ignored span origins list, consistent with SentryTracingFilter. Co-Authored-By: Claude --- .../kafka/SentryKafkaRecordInterceptor.java | 10 +- .../kafka/SentryKafkaRecordInterceptorTest.kt | 95 +++++++++---------- 2 files changed, 54 insertions(+), 51 deletions(-) diff --git a/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java index d11f7f8a67..e07f86fa26 100644 --- a/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java +++ b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java @@ -4,11 +4,13 @@ import io.sentry.IScopes; import io.sentry.ISentryLifecycleToken; import io.sentry.ITransaction; +import io.sentry.Sentry; import io.sentry.SentryTraceHeader; import io.sentry.SpanDataConvention; import io.sentry.SpanStatus; import io.sentry.TransactionContext; import io.sentry.TransactionOptions; +import io.sentry.util.SpanUtils; import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.List; @@ -48,13 +50,13 @@ public SentryKafkaRecordInterceptor( @Override public @Nullable ConsumerRecord intercept( final @NotNull ConsumerRecord record, final @NotNull Consumer consumer) { - if (!scopes.getOptions().isEnableQueueTracing()) { + if (!scopes.getOptions().isEnableQueueTracing() || isIgnored()) { return delegateIntercept(record, consumer); } finishStaleContext(); - final @NotNull IScopes forkedScopes = scopes.forkedScopes("SentryKafkaRecordInterceptor"); + final @NotNull IScopes forkedScopes = Sentry.forkedRootScopes("SentryKafkaRecordInterceptor"); final @NotNull ISentryLifecycleToken lifecycleToken = forkedScopes.makeCurrent(); final @Nullable TransactionContext transactionContext = continueTrace(forkedScopes, record); @@ -105,6 +107,10 @@ public void clearThreadState(final @NotNull Consumer consumer) { finishStaleContext(); } + private boolean isIgnored() { + return SpanUtils.isIgnored(scopes.getOptions().getIgnoredSpanOrigins(), TRACE_ORIGIN); + } + private @Nullable ConsumerRecord delegateIntercept( final @NotNull ConsumerRecord record, final @NotNull Consumer consumer) { if (delegate != null) { diff --git a/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt b/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt index 0688af70db..206a43298e 100644 --- a/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt +++ b/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt @@ -17,6 +17,7 @@ import kotlin.test.assertEquals import org.apache.kafka.clients.consumer.Consumer import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.header.internals.RecordHeaders +import org.mockito.Mockito import org.mockito.kotlin.any import org.mockito.kotlin.mock import org.mockito.kotlin.never @@ -27,6 +28,7 @@ import org.springframework.kafka.listener.RecordInterceptor class SentryKafkaRecordInterceptorTest { private lateinit var scopes: IScopes + private lateinit var forkedScopes: IScopes private lateinit var options: SentryOptions private lateinit var consumer: Consumer private lateinit var lifecycleToken: ISentryLifecycleToken @@ -46,10 +48,9 @@ class SentryKafkaRecordInterceptorTest { whenever(scopes.options).thenReturn(options) whenever(scopes.isEnabled).thenReturn(true) - val forkedScopes = mock() + forkedScopes = mock() whenever(forkedScopes.options).thenReturn(options) whenever(forkedScopes.makeCurrent()).thenReturn(lifecycleToken) - whenever(scopes.forkedScopes(any())).thenReturn(forkedScopes) val tx = SentryTracer(TransactionContext("queue.process", "queue.process"), forkedScopes) whenever(forkedScopes.startTransaction(any(), any())).thenReturn(tx) @@ -60,6 +61,13 @@ class SentryKafkaRecordInterceptorTest { Sentry.close() } + private fun withMockSentry(closure: () -> T): T = + Mockito.mockStatic(Sentry::class.java).use { + it.`when` { Sentry.forkedRootScopes(any()) }.thenReturn(forkedScopes) + it.`when` { Sentry.getCurrentScopes() }.thenReturn(scopes) + closure.invoke() + } + private fun createRecord( topic: String = "my-topic", headers: RecordHeaders = RecordHeaders(), @@ -93,30 +101,22 @@ class SentryKafkaRecordInterceptorTest { } @Test - fun `intercept creates forked scopes`() { + fun `intercept forks root scopes`() { val interceptor = SentryKafkaRecordInterceptor(scopes) val record = createRecord() - interceptor.intercept(record, consumer) + withMockSentry { interceptor.intercept(record, consumer) } - verify(scopes).forkedScopes("SentryKafkaRecordInterceptor") + verify(forkedScopes).makeCurrent() } @Test fun `intercept continues trace from headers`() { - val forkedScopes = mock() - whenever(forkedScopes.options).thenReturn(options) - whenever(forkedScopes.makeCurrent()).thenReturn(lifecycleToken) - whenever(scopes.forkedScopes(any())).thenReturn(forkedScopes) - - val tx = SentryTracer(TransactionContext("queue.process", "queue.process"), forkedScopes) - whenever(forkedScopes.startTransaction(any(), any())).thenReturn(tx) - val interceptor = SentryKafkaRecordInterceptor(scopes) val sentryTraceValue = "2722d9f6ec019ade60c776169d9a8904-cedf5b7571cb4972-1" val record = createRecordWithHeaders(sentryTrace = sentryTraceValue) - interceptor.intercept(record, consumer) + withMockSentry { interceptor.intercept(record, consumer) } verify(forkedScopes) .continueTrace(org.mockito.kotlin.eq(sentryTraceValue), org.mockito.kotlin.isNull()) @@ -124,18 +124,10 @@ class SentryKafkaRecordInterceptorTest { @Test fun `intercept calls continueTrace with null when no headers`() { - val forkedScopes = mock() - whenever(forkedScopes.options).thenReturn(options) - whenever(forkedScopes.makeCurrent()).thenReturn(lifecycleToken) - whenever(scopes.forkedScopes(any())).thenReturn(forkedScopes) - - val tx = SentryTracer(TransactionContext("queue.process", "queue.process"), forkedScopes) - whenever(forkedScopes.startTransaction(any(), any())).thenReturn(tx) - val interceptor = SentryKafkaRecordInterceptor(scopes) val record = createRecord() - interceptor.intercept(record, consumer) + withMockSentry { interceptor.intercept(record, consumer) } verify(forkedScopes).continueTrace(org.mockito.kotlin.isNull(), org.mockito.kotlin.isNull()) } @@ -148,7 +140,19 @@ class SentryKafkaRecordInterceptorTest { val result = interceptor.intercept(record, consumer) - verify(scopes, never()).forkedScopes(any()) + verify(forkedScopes, never()).makeCurrent() + assertEquals(record, result) + } + + @Test + fun `does not create span when origin is ignored`() { + options.setIgnoredSpanOrigins(listOf(SentryKafkaRecordInterceptor.TRACE_ORIGIN)) + val interceptor = SentryKafkaRecordInterceptor(scopes) + val record = createRecord() + + val result = interceptor.intercept(record, consumer) + + verify(forkedScopes, never()).makeCurrent() assertEquals(record, result) } @@ -159,7 +163,7 @@ class SentryKafkaRecordInterceptorTest { whenever(delegate.intercept(record, consumer)).thenReturn(record) val interceptor = SentryKafkaRecordInterceptor(scopes, delegate) - interceptor.intercept(record, consumer) + withMockSentry { interceptor.intercept(record, consumer) } verify(delegate).intercept(record, consumer) } @@ -170,8 +174,7 @@ class SentryKafkaRecordInterceptorTest { val interceptor = SentryKafkaRecordInterceptor(scopes, delegate) val record = createRecord() - // intercept first to set up context - interceptor.intercept(record, consumer) + withMockSentry { interceptor.intercept(record, consumer) } interceptor.success(record, consumer) verify(delegate).success(record, consumer) @@ -184,7 +187,7 @@ class SentryKafkaRecordInterceptorTest { val record = createRecord() val exception = RuntimeException("processing failed") - interceptor.intercept(record, consumer) + withMockSentry { interceptor.intercept(record, consumer) } interceptor.failure(record, exception, consumer) verify(delegate).failure(record, exception, consumer) @@ -214,13 +217,10 @@ class SentryKafkaRecordInterceptorTest { val interceptor = SentryKafkaRecordInterceptor(scopes) val record = createRecord() - // intercept sets up context in ThreadLocal - interceptor.intercept(record, consumer) + withMockSentry { interceptor.intercept(record, consumer) } - // clearThreadState should clean up without success/failure being called interceptor.clearThreadState(consumer) - // lifecycle token should have been closed verify(lifecycleToken).close() } @@ -242,28 +242,25 @@ class SentryKafkaRecordInterceptorTest { whenever(forkedScopes2.startTransaction(any(), any())).thenReturn(tx2) var callCount = 0 - whenever(scopes.forkedScopes(any())).thenAnswer { - callCount++ - if (callCount == 1) { - val forkedScopes1 = mock() - whenever(forkedScopes1.options).thenReturn(options) - whenever(forkedScopes1.makeCurrent()).thenReturn(lifecycleToken) - val tx1 = SentryTracer(TransactionContext("queue.process", "queue.process"), forkedScopes1) - whenever(forkedScopes1.startTransaction(any(), any())).thenReturn(tx1) - forkedScopes1 - } else { - forkedScopes2 - } - } val interceptor = SentryKafkaRecordInterceptor(scopes) val record = createRecord() - // First intercept sets up context - interceptor.intercept(record, consumer) + Mockito.mockStatic(Sentry::class.java).use { mockSentry -> + mockSentry.`when` { Sentry.getCurrentScopes() }.thenReturn(scopes) + mockSentry + .`when` { Sentry.forkedRootScopes(any()) } + .thenAnswer { + callCount++ + if (callCount == 1) forkedScopes else forkedScopes2 + } + + // First intercept sets up context + interceptor.intercept(record, consumer) - // Second intercept without success/failure — should clean up stale context first - interceptor.intercept(record, consumer) + // Second intercept without success/failure — should clean up stale context first + interceptor.intercept(record, consumer) + } // First lifecycle token should have been closed by the defensive cleanup verify(lifecycleToken).close()