Skip to content

Commit 97d82f3

Browse files
adinauerclaude
andcommitted
ref(spring): Use injected scopes in Kafka interceptor
Stop the Spring Kafka record interceptor from reaching through the static Sentry API when forking root scopes. This keeps the raw Kafka and Spring Kafka paths aligned and makes the interceptor easier to test. Co-Authored-By: Claude <noreply@anthropic.com>
1 parent d2f4d8c commit 97d82f3

2 files changed

Lines changed: 23 additions & 33 deletions

File tree

sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
import io.sentry.IScopes;
66
import io.sentry.ISentryLifecycleToken;
77
import io.sentry.ITransaction;
8-
import io.sentry.Sentry;
98
import io.sentry.SentryTraceHeader;
109
import io.sentry.SpanDataConvention;
1110
import io.sentry.SpanStatus;
@@ -60,7 +59,7 @@ public SentryKafkaRecordInterceptor(
6059

6160
finishStaleContext();
6261

63-
final @NotNull IScopes forkedScopes = Sentry.forkedRootScopes("SentryKafkaRecordInterceptor");
62+
final @NotNull IScopes forkedScopes = scopes.forkedRootScopes("SentryKafkaRecordInterceptor");
6463
final @NotNull ISentryLifecycleToken lifecycleToken = forkedScopes.makeCurrent();
6564

6665
final @Nullable TransactionContext transactionContext = continueTrace(forkedScopes, record);

sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt

Lines changed: 22 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import kotlin.test.assertTrue
2222
import org.apache.kafka.clients.consumer.Consumer
2323
import org.apache.kafka.clients.consumer.ConsumerRecord
2424
import org.apache.kafka.common.header.internals.RecordHeaders
25-
import org.mockito.Mockito
2625
import org.mockito.kotlin.any
2726
import org.mockito.kotlin.mock
2827
import org.mockito.kotlin.never
@@ -56,6 +55,7 @@ class SentryKafkaRecordInterceptorTest {
5655
whenever(scopes.isEnabled).thenReturn(true)
5756

5857
forkedScopes = mock()
58+
whenever(scopes.forkedRootScopes(any())).thenReturn(forkedScopes)
5959
whenever(forkedScopes.options).thenReturn(options)
6060
whenever(forkedScopes.makeCurrent()).thenReturn(lifecycleToken)
6161

@@ -69,13 +69,6 @@ class SentryKafkaRecordInterceptorTest {
6969
Sentry.close()
7070
}
7171

72-
private fun <T> withMockSentry(closure: () -> T): T =
73-
Mockito.mockStatic(Sentry::class.java).use {
74-
it.`when`<Any> { Sentry.forkedRootScopes(any()) }.thenReturn(forkedScopes)
75-
it.`when`<Any> { Sentry.getCurrentScopes() }.thenReturn(scopes)
76-
closure.invoke()
77-
}
78-
7972
private fun createRecord(
8073
topic: String = "my-topic",
8174
headers: RecordHeaders = RecordHeaders(),
@@ -120,8 +113,9 @@ class SentryKafkaRecordInterceptorTest {
120113
val interceptor = SentryKafkaRecordInterceptor<String, String>(scopes)
121114
val record = createRecord()
122115

123-
withMockSentry { interceptor.intercept(record, consumer) }
116+
interceptor.intercept(record, consumer)
124117

118+
verify(scopes).forkedRootScopes("SentryKafkaRecordInterceptor")
125119
verify(forkedScopes).makeCurrent()
126120
}
127121

@@ -131,7 +125,7 @@ class SentryKafkaRecordInterceptorTest {
131125
val sentryTraceValue = "2722d9f6ec019ade60c776169d9a8904-cedf5b7571cb4972-1"
132126
val record = createRecordWithHeaders(sentryTrace = sentryTraceValue)
133127

134-
withMockSentry { interceptor.intercept(record, consumer) }
128+
interceptor.intercept(record, consumer)
135129

136130
verify(forkedScopes)
137131
.continueTrace(org.mockito.kotlin.eq(sentryTraceValue), org.mockito.kotlin.isNull())
@@ -142,7 +136,7 @@ class SentryKafkaRecordInterceptorTest {
142136
val interceptor = SentryKafkaRecordInterceptor<String, String>(scopes)
143137
val record = createRecord()
144138

145-
withMockSentry { interceptor.intercept(record, consumer) }
139+
interceptor.intercept(record, consumer)
146140

147141
verify(forkedScopes).continueTrace(org.mockito.kotlin.isNull(), org.mockito.kotlin.isNull())
148142
}
@@ -152,7 +146,7 @@ class SentryKafkaRecordInterceptorTest {
152146
val interceptor = SentryKafkaRecordInterceptor<String, String>(scopes)
153147
val record = createRecordWithHeaders(deliveryAttempt = 3)
154148

155-
withMockSentry { interceptor.intercept(record, consumer) }
149+
interceptor.intercept(record, consumer)
156150

157151
assertEquals(2, transaction.data?.get(SpanDataConvention.MESSAGING_MESSAGE_RETRY_COUNT))
158152
}
@@ -162,7 +156,7 @@ class SentryKafkaRecordInterceptorTest {
162156
val interceptor = SentryKafkaRecordInterceptor<String, String>(scopes)
163157
val record = createRecord()
164158

165-
withMockSentry { interceptor.intercept(record, consumer) }
159+
interceptor.intercept(record, consumer)
166160

167161
assertNull(transaction.data?.get(SpanDataConvention.MESSAGING_MESSAGE_RETRY_COUNT))
168162
}
@@ -173,7 +167,7 @@ class SentryKafkaRecordInterceptorTest {
173167
val enqueuedTime = (System.currentTimeMillis() / 1000.0 - 1.0).toString()
174168
val record = createRecordWithHeaders(enqueuedTime = enqueuedTime)
175169

176-
withMockSentry { interceptor.intercept(record, consumer) }
170+
interceptor.intercept(record, consumer)
177171

178172
val latency = transaction.data?.get(SpanDataConvention.MESSAGING_MESSAGE_RECEIVE_LATENCY)
179173
assertTrue(latency is Long && latency >= 0)
@@ -187,6 +181,7 @@ class SentryKafkaRecordInterceptorTest {
187181

188182
val result = interceptor.intercept(record, consumer)
189183

184+
verify(scopes, never()).forkedRootScopes(any())
190185
verify(forkedScopes, never()).makeCurrent()
191186
assertEquals(record, result)
192187
}
@@ -199,6 +194,7 @@ class SentryKafkaRecordInterceptorTest {
199194

200195
val result = interceptor.intercept(record, consumer)
201196

197+
verify(scopes, never()).forkedRootScopes(any())
202198
verify(forkedScopes, never()).makeCurrent()
203199
assertEquals(record, result)
204200
}
@@ -210,7 +206,7 @@ class SentryKafkaRecordInterceptorTest {
210206
whenever(delegate.intercept(record, consumer)).thenReturn(record)
211207

212208
val interceptor = SentryKafkaRecordInterceptor(scopes, delegate)
213-
withMockSentry { interceptor.intercept(record, consumer) }
209+
interceptor.intercept(record, consumer)
214210

215211
verify(delegate).intercept(record, consumer)
216212
}
@@ -221,7 +217,7 @@ class SentryKafkaRecordInterceptorTest {
221217
val interceptor = SentryKafkaRecordInterceptor(scopes, delegate)
222218
val record = createRecord()
223219

224-
withMockSentry { interceptor.intercept(record, consumer) }
220+
interceptor.intercept(record, consumer)
225221
interceptor.success(record, consumer)
226222

227223
verify(delegate).success(record, consumer)
@@ -234,7 +230,7 @@ class SentryKafkaRecordInterceptorTest {
234230
val record = createRecord()
235231
val exception = RuntimeException("processing failed")
236232

237-
withMockSentry { interceptor.intercept(record, consumer) }
233+
interceptor.intercept(record, consumer)
238234
interceptor.failure(record, exception, consumer)
239235

240236
verify(delegate).failure(record, exception, consumer)
@@ -264,7 +260,7 @@ class SentryKafkaRecordInterceptorTest {
264260
val interceptor = SentryKafkaRecordInterceptor<String, String>(scopes)
265261
val record = createRecord()
266262

267-
withMockSentry { interceptor.intercept(record, consumer) }
263+
interceptor.intercept(record, consumer)
268264

269265
interceptor.clearThreadState(consumer)
270266

@@ -293,21 +289,16 @@ class SentryKafkaRecordInterceptorTest {
293289
val interceptor = SentryKafkaRecordInterceptor<String, String>(scopes)
294290
val record = createRecord()
295291

296-
Mockito.mockStatic(Sentry::class.java).use { mockSentry ->
297-
mockSentry.`when`<Any> { Sentry.getCurrentScopes() }.thenReturn(scopes)
298-
mockSentry
299-
.`when`<Any> { Sentry.forkedRootScopes(any()) }
300-
.thenAnswer {
301-
callCount++
302-
if (callCount == 1) forkedScopes else forkedScopes2
303-
}
292+
whenever(scopes.forkedRootScopes(any())).thenAnswer {
293+
callCount++
294+
if (callCount == 1) forkedScopes else forkedScopes2
295+
}
304296

305-
// First intercept sets up context
306-
interceptor.intercept(record, consumer)
297+
// First intercept sets up context
298+
interceptor.intercept(record, consumer)
307299

308-
// Second intercept without success/failure — should clean up stale context first
309-
interceptor.intercept(record, consumer)
310-
}
300+
// Second intercept without success/failure — should clean up stale context first
301+
interceptor.intercept(record, consumer)
311302

312303
// First lifecycle token should have been closed by the defensive cleanup
313304
verify(lifecycleToken).close()

0 commit comments

Comments
 (0)