Skip to content

Commit 2501e57

Browse files
adinauerclaude
andcommitted
fix(spring-jakarta): Clean up stale ThreadLocal context in Kafka consumer interceptor
Implement clearThreadState() and defensive cleanup in intercept() to prevent ThreadLocal leaks of SentryRecordContext. Spring Kafka calls clearThreadState() in the poll loop's finally block, making it the most reliable cleanup hook for edge cases where success()/failure() callbacks are skipped (e.g. Error thrown by listener). Also add defensive cleanup at the start of intercept() to handle any stale context from a previous record that was not properly cleaned up. Co-Authored-By: Claude <noreply@anthropic.com>
1 parent 24348b2 commit 2501e57

File tree

3 files changed

+74
-0
lines changed

3 files changed

+74
-0
lines changed

sentry-spring-jakarta/api/sentry-spring-jakarta.api

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,7 @@ public final class io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor :
260260
public fun <init> (Lio/sentry/IScopes;)V
261261
public fun <init> (Lio/sentry/IScopes;Lorg/springframework/kafka/listener/RecordInterceptor;)V
262262
public fun afterRecord (Lorg/apache/kafka/clients/consumer/ConsumerRecord;Lorg/apache/kafka/clients/consumer/Consumer;)V
263+
public fun clearThreadState (Lorg/apache/kafka/clients/consumer/Consumer;)V
263264
public fun failure (Lorg/apache/kafka/clients/consumer/ConsumerRecord;Ljava/lang/Exception;Lorg/apache/kafka/clients/consumer/Consumer;)V
264265
public fun intercept (Lorg/apache/kafka/clients/consumer/ConsumerRecord;Lorg/apache/kafka/clients/consumer/Consumer;)Lorg/apache/kafka/clients/consumer/ConsumerRecord;
265266
public fun success (Lorg/apache/kafka/clients/consumer/ConsumerRecord;Lorg/apache/kafka/clients/consumer/Consumer;)V

sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ public SentryKafkaRecordInterceptor(
5252
return delegateIntercept(record, consumer);
5353
}
5454

55+
finishStaleContext();
56+
5557
final @NotNull IScopes forkedScopes = scopes.forkedScopes("SentryKafkaRecordInterceptor");
5658
final @NotNull ISentryLifecycleToken lifecycleToken = forkedScopes.makeCurrent();
5759

@@ -98,6 +100,11 @@ public void afterRecord(
98100
}
99101
}
100102

103+
@Override
104+
public void clearThreadState(final @NotNull Consumer<?, ?> consumer) {
105+
finishStaleContext();
106+
}
107+
101108
private @Nullable ConsumerRecord<K, V> delegateIntercept(
102109
final @NotNull ConsumerRecord<K, V> record, final @NotNull Consumer<K, V> consumer) {
103110
if (delegate != null) {
@@ -165,6 +172,12 @@ public void afterRecord(
165172
return transaction;
166173
}
167174

175+
private void finishStaleContext() {
176+
if (currentContext.get() != null) {
177+
finishSpan(SpanStatus.UNKNOWN, null);
178+
}
179+
}
180+
168181
private void finishSpan(final @NotNull SpanStatus status, final @Nullable Throwable throwable) {
169182
final @Nullable SentryRecordContext ctx = currentContext.get();
170183
if (ctx == null) {

sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,4 +208,64 @@ class SentryKafkaRecordInterceptorTest {
208208
SentryKafkaRecordInterceptor.TRACE_ORIGIN,
209209
)
210210
}
211+
212+
@Test
213+
fun `clearThreadState cleans up stale context`() {
214+
val interceptor = SentryKafkaRecordInterceptor<String, String>(scopes)
215+
val record = createRecord()
216+
217+
// intercept sets up context in ThreadLocal
218+
interceptor.intercept(record, consumer)
219+
220+
// clearThreadState should clean up without success/failure being called
221+
interceptor.clearThreadState(consumer)
222+
223+
// lifecycle token should have been closed
224+
verify(lifecycleToken).close()
225+
}
226+
227+
@Test
228+
fun `clearThreadState is no-op when no context exists`() {
229+
val interceptor = SentryKafkaRecordInterceptor<String, String>(scopes)
230+
231+
// should not throw
232+
interceptor.clearThreadState(consumer)
233+
}
234+
235+
@Test
236+
fun `intercept cleans up stale context from previous record`() {
237+
val lifecycleToken2 = mock<ISentryLifecycleToken>()
238+
val forkedScopes2 = mock<IScopes>()
239+
whenever(forkedScopes2.options).thenReturn(options)
240+
whenever(forkedScopes2.makeCurrent()).thenReturn(lifecycleToken2)
241+
val tx2 = SentryTracer(TransactionContext("queue.process", "queue.process"), forkedScopes2)
242+
whenever(forkedScopes2.startTransaction(any<TransactionContext>(), any())).thenReturn(tx2)
243+
244+
var callCount = 0
245+
whenever(scopes.forkedScopes(any())).thenAnswer {
246+
callCount++
247+
if (callCount == 1) {
248+
val forkedScopes1 = mock<IScopes>()
249+
whenever(forkedScopes1.options).thenReturn(options)
250+
whenever(forkedScopes1.makeCurrent()).thenReturn(lifecycleToken)
251+
val tx1 = SentryTracer(TransactionContext("queue.process", "queue.process"), forkedScopes1)
252+
whenever(forkedScopes1.startTransaction(any<TransactionContext>(), any())).thenReturn(tx1)
253+
forkedScopes1
254+
} else {
255+
forkedScopes2
256+
}
257+
}
258+
259+
val interceptor = SentryKafkaRecordInterceptor<String, String>(scopes)
260+
val record = createRecord()
261+
262+
// First intercept sets up context
263+
interceptor.intercept(record, consumer)
264+
265+
// Second intercept without success/failure — should clean up stale context first
266+
interceptor.intercept(record, consumer)
267+
268+
// First lifecycle token should have been closed by the defensive cleanup
269+
verify(lifecycleToken).close()
270+
}
211271
}

0 commit comments

Comments
 (0)