@@ -3,10 +3,13 @@ package io.sentry.kafka
33import io.sentry.BaggageHeader
44import io.sentry.IScopes
55import io.sentry.ISentryLifecycleToken
6+ import io.sentry.ISpan
67import io.sentry.Sentry
78import io.sentry.SentryOptions
89import io.sentry.SentryTraceHeader
910import io.sentry.SentryTracer
11+ import io.sentry.SpanOptions
12+ import io.sentry.SpanStatus
1013import io.sentry.TransactionContext
1114import io.sentry.test.initForTest
1215import java.nio.charset.StandardCharsets
@@ -18,7 +21,12 @@ import kotlin.test.assertNotNull
1821import kotlin.test.assertSame
1922import kotlin.test.assertTrue
2023import org.apache.kafka.clients.producer.ProducerRecord
24+ import org.apache.kafka.common.header.Header
25+ import org.apache.kafka.common.header.Headers
26+ import org.mockito.kotlin.any
27+ import org.mockito.kotlin.eq
2128import org.mockito.kotlin.mock
29+ import org.mockito.kotlin.verify
2230import org.mockito.kotlin.whenever
2331
2432class SentryKafkaProducerInterceptorTest {
@@ -111,6 +119,35 @@ class SentryKafkaProducerInterceptorTest {
111119 )
112120 }
113121
122+ @Test
123+ fun `finishes span with error when header injection fails` () {
124+ val activeSpan = mock<ISpan >()
125+ val span = mock<ISpan >()
126+ val headers = mock<Headers >()
127+ val record = mock<ProducerRecord <String , String >>()
128+ val exception = RuntimeException (" boom" )
129+ whenever(scopes.span).thenReturn(activeSpan)
130+ whenever(activeSpan.startChild(eq(" queue.publish" ), eq(" my-topic" ), any<SpanOptions >()))
131+ .thenReturn(span)
132+ whenever(span.isNoOp).thenReturn(false )
133+ whenever(span.isFinished).thenReturn(false )
134+ whenever(span.toSentryTrace())
135+ .thenReturn(SentryTraceHeader (" 2722d9f6ec019ade60c776169d9a8904-cedf5b7571cb4972-1" ))
136+ whenever(span.toBaggageHeader(null )).thenReturn(null )
137+ whenever(record.topic()).thenReturn(" my-topic" )
138+ whenever(record.headers()).thenReturn(headers)
139+ whenever(headers.headers(BaggageHeader .BAGGAGE_HEADER )).thenReturn(emptyList<Header >())
140+ whenever(headers.remove(SentryTraceHeader .SENTRY_TRACE_HEADER )).thenThrow(exception)
141+
142+ val interceptor = SentryKafkaProducerInterceptor <String , String >(scopes)
143+
144+ interceptor.onSend(record)
145+
146+ verify(span).setStatus(SpanStatus .INTERNAL_ERROR )
147+ verify(span).setThrowable(exception)
148+ verify(span).finish()
149+ }
150+
114151 @Test
115152 fun `does not create span when queue tracing is disabled` () {
116153 val tx = createTransaction()
0 commit comments