Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@
import io.sentry.IScopes;
import io.sentry.ISentryLifecycleToken;
import io.sentry.ITransaction;
import io.sentry.Sentry;
import io.sentry.SentryTraceHeader;
import io.sentry.SpanDataConvention;
import io.sentry.SpanStatus;
import io.sentry.TransactionContext;
import io.sentry.TransactionOptions;
import io.sentry.util.SpanUtils;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -48,13 +50,13 @@ public SentryKafkaRecordInterceptor(
@Override
public @Nullable ConsumerRecord<K, V> intercept(
final @NotNull ConsumerRecord<K, V> record, final @NotNull Consumer<K, V> consumer) {
if (!scopes.getOptions().isEnableQueueTracing()) {
if (!scopes.getOptions().isEnableQueueTracing() || isIgnored()) {
return delegateIntercept(record, consumer);
}

finishStaleContext();

final @NotNull IScopes forkedScopes = scopes.forkedScopes("SentryKafkaRecordInterceptor");
final @NotNull IScopes forkedScopes = Sentry.forkedRootScopes("SentryKafkaRecordInterceptor");
final @NotNull ISentryLifecycleToken lifecycleToken = forkedScopes.makeCurrent();

final @Nullable TransactionContext transactionContext = continueTrace(forkedScopes, record);
Expand Down Expand Up @@ -105,6 +107,10 @@ public void clearThreadState(final @NotNull Consumer<?, ?> consumer) {
finishStaleContext();
}

private boolean isIgnored() {
return SpanUtils.isIgnored(scopes.getOptions().getIgnoredSpanOrigins(), TRACE_ORIGIN);
}

private @Nullable ConsumerRecord<K, V> delegateIntercept(
final @NotNull ConsumerRecord<K, V> record, final @NotNull Consumer<K, V> consumer) {
if (delegate != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import kotlin.test.assertEquals
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.header.internals.RecordHeaders
import org.mockito.Mockito
import org.mockito.kotlin.any
import org.mockito.kotlin.mock
import org.mockito.kotlin.never
Expand All @@ -27,6 +28,7 @@ import org.springframework.kafka.listener.RecordInterceptor
class SentryKafkaRecordInterceptorTest {

private lateinit var scopes: IScopes
private lateinit var forkedScopes: IScopes
private lateinit var options: SentryOptions
private lateinit var consumer: Consumer<String, String>
private lateinit var lifecycleToken: ISentryLifecycleToken
Expand All @@ -46,10 +48,9 @@ class SentryKafkaRecordInterceptorTest {
whenever(scopes.options).thenReturn(options)
whenever(scopes.isEnabled).thenReturn(true)

val forkedScopes = mock<IScopes>()
forkedScopes = mock()
whenever(forkedScopes.options).thenReturn(options)
whenever(forkedScopes.makeCurrent()).thenReturn(lifecycleToken)
whenever(scopes.forkedScopes(any())).thenReturn(forkedScopes)

val tx = SentryTracer(TransactionContext("queue.process", "queue.process"), forkedScopes)
whenever(forkedScopes.startTransaction(any<TransactionContext>(), any())).thenReturn(tx)
Expand All @@ -60,6 +61,13 @@ class SentryKafkaRecordInterceptorTest {
Sentry.close()
}

private fun <T> withMockSentry(closure: () -> T): T =
Mockito.mockStatic(Sentry::class.java).use {
it.`when`<Any> { Sentry.forkedRootScopes(any()) }.thenReturn(forkedScopes)
it.`when`<Any> { Sentry.getCurrentScopes() }.thenReturn(scopes)
closure.invoke()
}

private fun createRecord(
topic: String = "my-topic",
headers: RecordHeaders = RecordHeaders(),
Expand Down Expand Up @@ -93,49 +101,33 @@ class SentryKafkaRecordInterceptorTest {
}

@Test
fun `intercept creates forked scopes`() {
fun `intercept forks root scopes`() {
val interceptor = SentryKafkaRecordInterceptor<String, String>(scopes)
val record = createRecord()

interceptor.intercept(record, consumer)
withMockSentry { interceptor.intercept(record, consumer) }

verify(scopes).forkedScopes("SentryKafkaRecordInterceptor")
verify(forkedScopes).makeCurrent()
}

@Test
fun `intercept continues trace from headers`() {
val forkedScopes = mock<IScopes>()
whenever(forkedScopes.options).thenReturn(options)
whenever(forkedScopes.makeCurrent()).thenReturn(lifecycleToken)
whenever(scopes.forkedScopes(any())).thenReturn(forkedScopes)

val tx = SentryTracer(TransactionContext("queue.process", "queue.process"), forkedScopes)
whenever(forkedScopes.startTransaction(any<TransactionContext>(), any())).thenReturn(tx)

val interceptor = SentryKafkaRecordInterceptor<String, String>(scopes)
val sentryTraceValue = "2722d9f6ec019ade60c776169d9a8904-cedf5b7571cb4972-1"
val record = createRecordWithHeaders(sentryTrace = sentryTraceValue)

interceptor.intercept(record, consumer)
withMockSentry { interceptor.intercept(record, consumer) }

verify(forkedScopes)
.continueTrace(org.mockito.kotlin.eq(sentryTraceValue), org.mockito.kotlin.isNull())
}

@Test
fun `intercept calls continueTrace with null when no headers`() {
val forkedScopes = mock<IScopes>()
whenever(forkedScopes.options).thenReturn(options)
whenever(forkedScopes.makeCurrent()).thenReturn(lifecycleToken)
whenever(scopes.forkedScopes(any())).thenReturn(forkedScopes)

val tx = SentryTracer(TransactionContext("queue.process", "queue.process"), forkedScopes)
whenever(forkedScopes.startTransaction(any<TransactionContext>(), any())).thenReturn(tx)

val interceptor = SentryKafkaRecordInterceptor<String, String>(scopes)
val record = createRecord()

interceptor.intercept(record, consumer)
withMockSentry { interceptor.intercept(record, consumer) }

verify(forkedScopes).continueTrace(org.mockito.kotlin.isNull(), org.mockito.kotlin.isNull())
}
Expand All @@ -148,7 +140,19 @@ class SentryKafkaRecordInterceptorTest {

val result = interceptor.intercept(record, consumer)

verify(scopes, never()).forkedScopes(any())
verify(forkedScopes, never()).makeCurrent()
assertEquals(record, result)
}

@Test
fun `does not create span when origin is ignored`() {
options.setIgnoredSpanOrigins(listOf(SentryKafkaRecordInterceptor.TRACE_ORIGIN))
val interceptor = SentryKafkaRecordInterceptor<String, String>(scopes)
val record = createRecord()

val result = interceptor.intercept(record, consumer)

verify(forkedScopes, never()).makeCurrent()
assertEquals(record, result)
}

Expand All @@ -159,7 +163,7 @@ class SentryKafkaRecordInterceptorTest {
whenever(delegate.intercept(record, consumer)).thenReturn(record)

val interceptor = SentryKafkaRecordInterceptor(scopes, delegate)
interceptor.intercept(record, consumer)
withMockSentry { interceptor.intercept(record, consumer) }

verify(delegate).intercept(record, consumer)
}
Expand All @@ -170,8 +174,7 @@ class SentryKafkaRecordInterceptorTest {
val interceptor = SentryKafkaRecordInterceptor(scopes, delegate)
val record = createRecord()

// intercept first to set up context
interceptor.intercept(record, consumer)
withMockSentry { interceptor.intercept(record, consumer) }
interceptor.success(record, consumer)

verify(delegate).success(record, consumer)
Expand All @@ -184,7 +187,7 @@ class SentryKafkaRecordInterceptorTest {
val record = createRecord()
val exception = RuntimeException("processing failed")

interceptor.intercept(record, consumer)
withMockSentry { interceptor.intercept(record, consumer) }
interceptor.failure(record, exception, consumer)

verify(delegate).failure(record, exception, consumer)
Expand Down Expand Up @@ -214,13 +217,10 @@ class SentryKafkaRecordInterceptorTest {
val interceptor = SentryKafkaRecordInterceptor<String, String>(scopes)
val record = createRecord()

// intercept sets up context in ThreadLocal
interceptor.intercept(record, consumer)
withMockSentry { interceptor.intercept(record, consumer) }

// clearThreadState should clean up without success/failure being called
interceptor.clearThreadState(consumer)

// lifecycle token should have been closed
verify(lifecycleToken).close()
}

Expand All @@ -242,28 +242,25 @@ class SentryKafkaRecordInterceptorTest {
whenever(forkedScopes2.startTransaction(any<TransactionContext>(), any())).thenReturn(tx2)

var callCount = 0
whenever(scopes.forkedScopes(any())).thenAnswer {
callCount++
if (callCount == 1) {
val forkedScopes1 = mock<IScopes>()
whenever(forkedScopes1.options).thenReturn(options)
whenever(forkedScopes1.makeCurrent()).thenReturn(lifecycleToken)
val tx1 = SentryTracer(TransactionContext("queue.process", "queue.process"), forkedScopes1)
whenever(forkedScopes1.startTransaction(any<TransactionContext>(), any())).thenReturn(tx1)
forkedScopes1
} else {
forkedScopes2
}
}

val interceptor = SentryKafkaRecordInterceptor<String, String>(scopes)
val record = createRecord()

// First intercept sets up context
interceptor.intercept(record, consumer)
Mockito.mockStatic(Sentry::class.java).use { mockSentry ->
mockSentry.`when`<Any> { Sentry.getCurrentScopes() }.thenReturn(scopes)
mockSentry
.`when`<Any> { Sentry.forkedRootScopes(any()) }
.thenAnswer {
callCount++
if (callCount == 1) forkedScopes else forkedScopes2
}

// First intercept sets up context
interceptor.intercept(record, consumer)

// Second intercept without success/failure โ€” should clean up stale context first
interceptor.intercept(record, consumer)
// Second intercept without success/failure โ€” should clean up stale context first
interceptor.intercept(record, consumer)
}

// First lifecycle token should have been closed by the defensive cleanup
verify(lifecycleToken).close()
Expand Down
Loading