Skip to content

Commit 925ab2b

Browse files
adinauerclaude
andcommitted
fix(kafka): Inject trace headers even without active span
Decouple header injection from span creation in SentryKafkaProducer so that distributed tracing works for background workers, @scheduled jobs, and startup publishers that have no active span. Restructure send() to match the SentryFeignClient/OkHttp pattern: - isIgnored: pure delegate, no headers, no span - No active span: inject headers from PropagationContext, no span - Active span: start child span, inject headers, wrap callback Also simplify the implementation: - Rename injectHeaders to maybeInjectHeaders with encapsulated try/catch (matches Feign's maybeAddTracingHeaders pattern) - Remove outer try/catch around span setup - Remove redundant span.isNoOp() early-return branch - Remove redundant isFinished() guards before finish() calls Co-Authored-By: Claude <noreply@anthropic.com>
1 parent fc23438 commit 925ab2b

2 files changed

Lines changed: 69 additions & 80 deletions

File tree

sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaProducer.java

Lines changed: 41 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -101,43 +101,22 @@ public SentryKafkaProducer(
101101

102102
final @Nullable ISpan activeSpan = scopes.getSpan();
103103
if (activeSpan == null || activeSpan.isNoOp()) {
104+
maybeInjectHeaders(record.headers(), null);
104105
return delegate.send(record, callback);
105106
}
106107

107-
@Nullable ISpan span = null;
108-
try {
109-
final @NotNull SpanOptions spanOptions = new SpanOptions();
110-
spanOptions.setOrigin(traceOrigin);
111-
span = activeSpan.startChild("queue.publish", record.topic(), spanOptions);
112-
if (span.isNoOp()) {
113-
return delegate.send(record, callback);
114-
}
115-
116-
span.setData(SpanDataConvention.MESSAGING_SYSTEM, "kafka");
117-
span.setData(SpanDataConvention.MESSAGING_DESTINATION_NAME, record.topic());
118-
injectHeaders(record.headers(), span);
119-
} catch (Throwable t) {
120-
if (span != null) {
121-
span.setThrowable(t);
122-
span.setStatus(SpanStatus.INTERNAL_ERROR);
123-
if (!span.isFinished()) {
124-
span.finish();
125-
}
126-
}
127-
scopes
128-
.getOptions()
129-
.getLogger()
130-
.log(SentryLevel.ERROR, "Failed to instrument Kafka producer record.", t);
131-
return delegate.send(record, callback);
132-
}
108+
final @NotNull SpanOptions spanOptions = new SpanOptions();
109+
spanOptions.setOrigin(traceOrigin);
110+
final @NotNull ISpan span = activeSpan.startChild("queue.publish", record.topic(), spanOptions);
133111

134-
final @NotNull ISpan finalSpan = span;
135-
final @NotNull Callback wrappedCallback = wrapCallback(callback, finalSpan);
112+
span.setData(SpanDataConvention.MESSAGING_SYSTEM, "kafka");
113+
span.setData(SpanDataConvention.MESSAGING_DESTINATION_NAME, record.topic());
114+
maybeInjectHeaders(record.headers(), span);
136115

137116
try {
138-
return delegate.send(record, wrappedCallback);
117+
return delegate.send(record, wrapCallback(callback, span));
139118
} catch (Throwable t) {
140-
finishWithError(finalSpan, t);
119+
finishWithError(span, t);
141120
throw t;
142121
}
143122
}
@@ -158,9 +137,7 @@ public SentryKafkaProducer(
158137
.getLogger()
159138
.log(SentryLevel.ERROR, "Failed to set status on Kafka producer span.", t);
160139
} finally {
161-
if (!span.isFinished()) {
162-
span.finish();
163-
}
140+
span.finish();
164141
if (userCallback != null) {
165142
userCallback.onCompletion(metadata, exception);
166143
}
@@ -171,41 +148,46 @@ public SentryKafkaProducer(
171148
private void finishWithError(final @NotNull ISpan span, final @NotNull Throwable t) {
172149
span.setThrowable(t);
173150
span.setStatus(SpanStatus.INTERNAL_ERROR);
174-
if (!span.isFinished()) {
175-
span.finish();
176-
}
151+
span.finish();
177152
}
178153

179154
private boolean isIgnored() {
180155
return SpanUtils.isIgnored(scopes.getOptions().getIgnoredSpanOrigins(), traceOrigin);
181156
}
182157

183-
private void injectHeaders(final @NotNull Headers headers, final @NotNull ISpan span) {
184-
final @Nullable List<String> existingBaggageHeaders =
185-
readHeaderValues(headers, BaggageHeader.BAGGAGE_HEADER);
186-
final @Nullable TracingUtils.TracingHeaders tracingHeaders =
187-
TracingUtils.trace(scopes, existingBaggageHeaders, span);
188-
if (tracingHeaders != null) {
189-
final @NotNull SentryTraceHeader sentryTraceHeader = tracingHeaders.getSentryTraceHeader();
190-
headers.remove(sentryTraceHeader.getName());
191-
headers.add(
192-
sentryTraceHeader.getName(),
193-
sentryTraceHeader.getValue().getBytes(StandardCharsets.UTF_8));
194-
195-
final @Nullable BaggageHeader baggageHeader = tracingHeaders.getBaggageHeader();
196-
if (baggageHeader != null) {
197-
headers.remove(baggageHeader.getName());
158+
private void maybeInjectHeaders(final @NotNull Headers headers, final @Nullable ISpan span) {
159+
try {
160+
final @Nullable List<String> existingBaggageHeaders =
161+
readHeaderValues(headers, BaggageHeader.BAGGAGE_HEADER);
162+
final @Nullable TracingUtils.TracingHeaders tracingHeaders =
163+
TracingUtils.trace(scopes, existingBaggageHeaders, span);
164+
if (tracingHeaders != null) {
165+
final @NotNull SentryTraceHeader sentryTraceHeader = tracingHeaders.getSentryTraceHeader();
166+
headers.remove(sentryTraceHeader.getName());
198167
headers.add(
199-
baggageHeader.getName(), baggageHeader.getValue().getBytes(StandardCharsets.UTF_8));
168+
sentryTraceHeader.getName(),
169+
sentryTraceHeader.getValue().getBytes(StandardCharsets.UTF_8));
170+
171+
final @Nullable BaggageHeader baggageHeader = tracingHeaders.getBaggageHeader();
172+
if (baggageHeader != null) {
173+
headers.remove(baggageHeader.getName());
174+
headers.add(
175+
baggageHeader.getName(), baggageHeader.getValue().getBytes(StandardCharsets.UTF_8));
176+
}
200177
}
201-
}
202178

203-
headers.remove(SENTRY_ENQUEUED_TIME_HEADER);
204-
headers.add(
205-
SENTRY_ENQUEUED_TIME_HEADER,
206-
DateUtils.doubleToBigDecimal(DateUtils.millisToSeconds(System.currentTimeMillis()))
207-
.toString()
208-
.getBytes(StandardCharsets.UTF_8));
179+
headers.remove(SENTRY_ENQUEUED_TIME_HEADER);
180+
headers.add(
181+
SENTRY_ENQUEUED_TIME_HEADER,
182+
DateUtils.doubleToBigDecimal(DateUtils.millisToSeconds(System.currentTimeMillis()))
183+
.toString()
184+
.getBytes(StandardCharsets.UTF_8));
185+
} catch (Throwable t) {
186+
scopes
187+
.getOptions()
188+
.getLogger()
189+
.log(SentryLevel.ERROR, "Failed to inject Sentry headers into Kafka record.", t);
190+
}
209191
}
210192

211193
private static @Nullable List<String> readHeaderValues(

sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaProducerTest.kt

Lines changed: 28 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import io.sentry.BaggageHeader
44
import io.sentry.IScopes
55
import io.sentry.ISentryLifecycleToken
66
import io.sentry.ISpan
7+
import io.sentry.Scope
8+
import io.sentry.ScopeCallback
79
import io.sentry.Sentry
810
import io.sentry.SentryOptions
911
import io.sentry.SentryTraceHeader
@@ -31,10 +33,10 @@ import org.apache.kafka.common.header.Header
3133
import org.apache.kafka.common.header.Headers
3234
import org.mockito.kotlin.any
3335
import org.mockito.kotlin.argumentCaptor
36+
import org.mockito.kotlin.doAnswer
3437
import org.mockito.kotlin.eq
3538
import org.mockito.kotlin.isNull
3639
import org.mockito.kotlin.mock
37-
import org.mockito.kotlin.never
3840
import org.mockito.kotlin.verify
3941
import org.mockito.kotlin.whenever
4042

@@ -58,6 +60,9 @@ class SentryKafkaProducerTest {
5860
isEnableQueueTracing = true
5961
}
6062
whenever(scopes.options).thenReturn(options)
63+
doAnswer { (it.arguments[0] as ScopeCallback).run(Scope(options)) }
64+
.whenever(scopes)
65+
.configureScope(any())
6166
delegate = mock()
6267
whenever(delegate.send(any(), any())).thenReturn(CompletableFuture.completedFuture(null))
6368
}
@@ -213,14 +218,18 @@ class SentryKafkaProducerTest {
213218
}
214219

215220
@Test
216-
fun `delegates send without span when no active span`() {
221+
fun `injects headers but creates no span when no active span`() {
217222
whenever(scopes.span).thenReturn(null)
218223
val producer = SentryKafkaProducer(delegate, scopes)
219224
val record = ProducerRecord<String, String>("my-topic", "key", "value")
220225

221226
producer.send(record)
222227

223228
verify(delegate).send(eq(record), isNull())
229+
// Headers should still be injected from PropagationContext
230+
assertNotNull(record.headers().lastHeader(SentryTraceHeader.SENTRY_TRACE_HEADER))
231+
assertNotNull(record.headers().lastHeader(BaggageHeader.BAGGAGE_HEADER))
232+
assertNotNull(record.headers().lastHeader(SentryKafkaProducer.SENTRY_ENQUEUED_TIME_HEADER))
224233
}
225234

226235
@Test
@@ -246,12 +255,11 @@ class SentryKafkaProducerTest {
246255
}
247256

248257
@Test
249-
fun `finishes span with error when header injection fails`() {
258+
fun `header injection failure does not prevent send`() {
250259
val activeSpan = mock<ISpan>()
251260
val span = mock<ISpan>()
252261
val headers = mock<Headers>()
253262
val record = mock<ProducerRecord<String, String>>()
254-
val exception = RuntimeException("boom")
255263
whenever(scopes.span).thenReturn(activeSpan)
256264
whenever(activeSpan.startChild(eq("queue.publish"), eq("my-topic"), any<SpanOptions>()))
257265
.thenReturn(span)
@@ -263,16 +271,15 @@ class SentryKafkaProducerTest {
263271
whenever(record.topic()).thenReturn("my-topic")
264272
whenever(record.headers()).thenReturn(headers)
265273
whenever(headers.headers(BaggageHeader.BAGGAGE_HEADER)).thenReturn(emptyList<Header>())
266-
whenever(headers.remove(SentryTraceHeader.SENTRY_TRACE_HEADER)).thenThrow(exception)
274+
whenever(headers.remove(SentryTraceHeader.SENTRY_TRACE_HEADER))
275+
.thenThrow(RuntimeException("boom"))
267276

268277
val producer = SentryKafkaProducer(delegate, scopes)
269278
producer.send(record)
270279

271-
verify(span).setStatus(SpanStatus.INTERNAL_ERROR)
272-
verify(span).setThrowable(exception)
273-
verify(span).finish()
274-
// After header-injection failure, falls back to a plain delegate send (no Sentry callback).
275-
verify(delegate).send(eq(record), isNull())
280+
// Header injection failed silently; send still proceeds with wrapped callback for span
281+
// lifecycle.
282+
verify(delegate).send(eq(record), any<Callback>())
276283
}
277284

278285
@Test
@@ -319,20 +326,20 @@ class SentryKafkaProducerTest {
319326
}
320327

321328
@Test
322-
fun `does not invoke sentry callback wrap when no-op span returned`() {
323-
val activeSpan = mock<ISpan>()
324-
val span = mock<ISpan>()
329+
fun `wraps callback even when child span is no-op`() {
330+
val tx = createTransaction()
331+
// Set max spans to 1 so the child span is no-op (over limit)
332+
options.maxSpans = 0
333+
val producer = SentryKafkaProducer(delegate, scopes)
325334
val record = ProducerRecord<String, String>("my-topic", "key", "value")
326-
whenever(scopes.span).thenReturn(activeSpan)
327-
whenever(activeSpan.isNoOp).thenReturn(false)
328-
whenever(activeSpan.startChild(eq("queue.publish"), eq("my-topic"), any<SpanOptions>()))
329-
.thenReturn(span)
330-
whenever(span.isNoOp).thenReturn(true)
331335

332-
val producer = SentryKafkaProducer(delegate, scopes)
333336
producer.send(record)
334337

335-
verify(delegate).send(eq(record), isNull())
336-
verify(span, never()).finish()
338+
// Callback is still wrapped (no-op span finish is harmless)
339+
verify(delegate).send(eq(record), any<Callback>())
340+
// Headers should still be injected from PropagationContext
341+
assertNotNull(record.headers().lastHeader(SentryTraceHeader.SENTRY_TRACE_HEADER))
342+
assertNotNull(record.headers().lastHeader(BaggageHeader.BAGGAGE_HEADER))
343+
assertNotNull(record.headers().lastHeader(SentryKafkaProducer.SENTRY_ENQUEUED_TIME_HEADER))
337344
}
338345
}

0 commit comments

Comments
 (0)