Skip to content

Commit d0b2380

Browse files
adinauerclaude
andcommitted
fix(kafka): [Queue Instrumentation 25] Finish producer spans on failures
Keep a local producer child span reference and always finish it when instrumentation fails after span creation. This preserves fail-open send behavior without leaking unfinished queue.publish spans. Add a regression test covering header injection failures. Co-Authored-By: Claude <noreply@anthropic.com>
1 parent f020319 commit d0b2380

2 files changed

Lines changed: 47 additions & 4 deletions

File tree

sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaProducerInterceptor.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,11 +59,11 @@ public SentryKafkaProducerInterceptor(
5959
return record;
6060
}
6161

62+
@Nullable ISpan span = null;
6263
try {
6364
final @NotNull SpanOptions spanOptions = new SpanOptions();
6465
spanOptions.setOrigin(traceOrigin);
65-
final @NotNull ISpan span =
66-
activeSpan.startChild("queue.publish", record.topic(), spanOptions);
66+
span = activeSpan.startChild("queue.publish", record.topic(), spanOptions);
6767
if (span.isNoOp()) {
6868
return record;
6969
}
@@ -72,14 +72,20 @@ public SentryKafkaProducerInterceptor(
7272
span.setData(SpanDataConvention.MESSAGING_DESTINATION_NAME, record.topic());
7373

7474
injectHeaders(record.headers(), span);
75-
7675
span.setStatus(SpanStatus.OK);
77-
span.finish();
7876
} catch (Throwable t) {
77+
if (span != null) {
78+
span.setThrowable(t);
79+
span.setStatus(SpanStatus.INTERNAL_ERROR);
80+
}
7981
scopes
8082
.getOptions()
8183
.getLogger()
8284
.log(SentryLevel.ERROR, "Failed to instrument Kafka producer record.", t);
85+
} finally {
86+
if (span != null && !span.isFinished()) {
87+
span.finish();
88+
}
8389
}
8490

8591
return record;

sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaProducerInterceptorTest.kt

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,13 @@ package io.sentry.kafka
33
import io.sentry.BaggageHeader
44
import io.sentry.IScopes
55
import io.sentry.ISentryLifecycleToken
6+
import io.sentry.ISpan
67
import io.sentry.Sentry
78
import io.sentry.SentryOptions
89
import io.sentry.SentryTraceHeader
910
import io.sentry.SentryTracer
11+
import io.sentry.SpanOptions
12+
import io.sentry.SpanStatus
1013
import io.sentry.TransactionContext
1114
import io.sentry.test.initForTest
1215
import java.nio.charset.StandardCharsets
@@ -18,7 +21,12 @@ import kotlin.test.assertNotNull
1821
import kotlin.test.assertSame
1922
import kotlin.test.assertTrue
2023
import org.apache.kafka.clients.producer.ProducerRecord
24+
import org.apache.kafka.common.header.Header
25+
import org.apache.kafka.common.header.Headers
26+
import org.mockito.kotlin.any
27+
import org.mockito.kotlin.eq
2128
import org.mockito.kotlin.mock
29+
import org.mockito.kotlin.verify
2230
import org.mockito.kotlin.whenever
2331

2432
class SentryKafkaProducerInterceptorTest {
@@ -111,6 +119,35 @@ class SentryKafkaProducerInterceptorTest {
111119
)
112120
}
113121

122+
@Test
123+
fun `finishes span with error when header injection fails`() {
124+
val activeSpan = mock<ISpan>()
125+
val span = mock<ISpan>()
126+
val headers = mock<Headers>()
127+
val record = mock<ProducerRecord<String, String>>()
128+
val exception = RuntimeException("boom")
129+
whenever(scopes.span).thenReturn(activeSpan)
130+
whenever(activeSpan.startChild(eq("queue.publish"), eq("my-topic"), any<SpanOptions>()))
131+
.thenReturn(span)
132+
whenever(span.isNoOp).thenReturn(false)
133+
whenever(span.isFinished).thenReturn(false)
134+
whenever(span.toSentryTrace())
135+
.thenReturn(SentryTraceHeader("2722d9f6ec019ade60c776169d9a8904-cedf5b7571cb4972-1"))
136+
whenever(span.toBaggageHeader(null)).thenReturn(null)
137+
whenever(record.topic()).thenReturn("my-topic")
138+
whenever(record.headers()).thenReturn(headers)
139+
whenever(headers.headers(BaggageHeader.BAGGAGE_HEADER)).thenReturn(emptyList<Header>())
140+
whenever(headers.remove(SentryTraceHeader.SENTRY_TRACE_HEADER)).thenThrow(exception)
141+
142+
val interceptor = SentryKafkaProducerInterceptor<String, String>(scopes)
143+
144+
interceptor.onSend(record)
145+
146+
verify(span).setStatus(SpanStatus.INTERNAL_ERROR)
147+
verify(span).setThrowable(exception)
148+
verify(span).finish()
149+
}
150+
114151
@Test
115152
fun `does not create span when queue tracing is disabled`() {
116153
val tx = createTransaction()

0 commit comments

Comments
 (0)