Skip to content

Commit f020319

Browse files
committed
fix(kafka): [Queue Instrumentation 24] Read all baggage headers on consumers
Pass every Kafka baggage header through trace continuation in both the raw Kafka helper and the Spring Kafka record interceptor. Previously both consumer paths used lastHeader("baggage"), which dropped all earlier baggage values and could break interop with upstream OTel or other W3C baggage producers. Reading the full header list preserves the existing baggage context during queue trace continuation.
1 parent 19cb740 commit f020319

4 files changed

Lines changed: 74 additions & 6 deletions

File tree

sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaConsumerTracing.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
import io.sentry.util.SpanUtils;
1616
import java.nio.ByteBuffer;
1717
import java.nio.charset.StandardCharsets;
18-
import java.util.Collections;
18+
import java.util.ArrayList;
1919
import java.util.List;
2020
import java.util.concurrent.Callable;
2121
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -215,9 +215,8 @@ private void finishTransaction(
215215
private <K, V> @Nullable TransactionContext continueTrace(
216216
final @NotNull IScopes forkedScopes, final @NotNull ConsumerRecord<K, V> record) {
217217
final @Nullable String sentryTrace = headerValue(record, SentryTraceHeader.SENTRY_TRACE_HEADER);
218-
final @Nullable String baggage = headerValue(record, BaggageHeader.BAGGAGE_HEADER);
219218
final @Nullable List<String> baggageHeaders =
220-
baggage != null ? Collections.singletonList(baggage) : null;
219+
headerValues(record, BaggageHeader.BAGGAGE_HEADER);
221220
return forkedScopes.continueTrace(sentryTrace, baggageHeaders);
222221
}
223222

@@ -265,4 +264,18 @@ private void finishTransaction(
265264
}
266265
return new String(header.value(), StandardCharsets.UTF_8);
267266
}
267+
268+
private <K, V> @Nullable List<String> headerValues(
269+
final @NotNull ConsumerRecord<K, V> record, final @NotNull String headerName) {
270+
@Nullable List<String> values = null;
271+
for (final @NotNull Header header : record.headers().headers(headerName)) {
272+
if (header.value() != null) {
273+
if (values == null) {
274+
values = new ArrayList<>();
275+
}
276+
values.add(new String(header.value(), StandardCharsets.UTF_8));
277+
}
278+
}
279+
return values;
280+
}
268281
}

sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaConsumerTracingTest.kt

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,21 @@ class SentryKafkaConsumerTracingTest {
111111
verify(lifecycleToken).close()
112112
}
113113

114+
@Test
115+
fun `withTracing passes all baggage headers to continueTrace`() {
116+
val sentryTraceValue = "2722d9f6ec019ade60c776169d9a8904-cedf5b7571cb4972-1"
117+
val record =
118+
createRecord(
119+
sentryTrace = sentryTraceValue,
120+
baggageHeaders = listOf("third=party", "sentry-sample_rate=1"),
121+
)
122+
123+
tracing.withTracingImpl(record, Callable { "done" })
124+
125+
verify(forkedScopes)
126+
.continueTrace(eq(sentryTraceValue), eq(listOf("third=party", "sentry-sample_rate=1")))
127+
}
128+
114129
@Test
115130
fun `withTracing skips scope forking when queue tracing is disabled`() {
116131
options.isEnableQueueTracing = false
@@ -193,6 +208,7 @@ class SentryKafkaConsumerTracingTest {
193208
topic: String = "my-topic",
194209
sentryTrace: String? = null,
195210
baggage: String? = null,
211+
baggageHeaders: List<String>? = null,
196212
messageId: String? = null,
197213
deliveryAttempt: Int? = null,
198214
enqueuedTime: String? = null,
@@ -205,6 +221,9 @@ class SentryKafkaConsumerTracingTest {
205221
baggage?.let {
206222
headers.add(BaggageHeader.BAGGAGE_HEADER, it.toByteArray(StandardCharsets.UTF_8))
207223
}
224+
baggageHeaders?.forEach {
225+
headers.add(BaggageHeader.BAGGAGE_HEADER, it.toByteArray(StandardCharsets.UTF_8))
226+
}
208227
messageId?.let {
209228
headers.add(SpanDataConvention.MESSAGING_MESSAGE_ID, it.toByteArray(StandardCharsets.UTF_8))
210229
}

sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
import io.sentry.util.SpanUtils;
1515
import java.nio.ByteBuffer;
1616
import java.nio.charset.StandardCharsets;
17-
import java.util.Collections;
17+
import java.util.ArrayList;
1818
import java.util.List;
1919
import org.apache.kafka.clients.consumer.Consumer;
2020
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -126,9 +126,8 @@ private boolean isIgnored() {
126126
private @Nullable TransactionContext continueTrace(
127127
final @NotNull IScopes forkedScopes, final @NotNull ConsumerRecord<K, V> record) {
128128
final @Nullable String sentryTrace = headerValue(record, SentryTraceHeader.SENTRY_TRACE_HEADER);
129-
final @Nullable String baggage = headerValue(record, BaggageHeader.BAGGAGE_HEADER);
130129
final @Nullable List<String> baggageHeaders =
131-
baggage != null ? Collections.singletonList(baggage) : null;
130+
headerValues(record, BaggageHeader.BAGGAGE_HEADER);
132131
return forkedScopes.continueTrace(sentryTrace, baggageHeaders);
133132
}
134133

@@ -243,6 +242,20 @@ private void finishSpan(final @NotNull SpanStatus status, final @Nullable Throwa
243242
return new String(header.value(), StandardCharsets.UTF_8);
244243
}
245244

245+
private @Nullable List<String> headerValues(
246+
final @NotNull ConsumerRecord<K, V> record, final @NotNull String headerName) {
247+
@Nullable List<String> values = null;
248+
for (final @NotNull Header header : record.headers().headers(headerName)) {
249+
if (header.value() != null) {
250+
if (values == null) {
251+
values = new ArrayList<>();
252+
}
253+
values.add(new String(header.value(), StandardCharsets.UTF_8));
254+
}
255+
}
256+
return values;
257+
}
258+
246259
private static final class SentryRecordContext {
247260
final @NotNull ISentryLifecycleToken lifecycleToken;
248261
final @Nullable ITransaction transaction;

sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ class SentryKafkaRecordInterceptorTest {
8181
private fun createRecordWithHeaders(
8282
sentryTrace: String? = null,
8383
baggage: String? = null,
84+
baggageHeaders: List<String>? = null,
8485
enqueuedTime: String? = null,
8586
deliveryAttempt: Int? = null,
8687
): ConsumerRecord<String, String> {
@@ -91,6 +92,9 @@ class SentryKafkaRecordInterceptorTest {
9192
baggage?.let {
9293
headers.add(BaggageHeader.BAGGAGE_HEADER, it.toByteArray(StandardCharsets.UTF_8))
9394
}
95+
baggageHeaders?.forEach {
96+
headers.add(BaggageHeader.BAGGAGE_HEADER, it.toByteArray(StandardCharsets.UTF_8))
97+
}
9498
enqueuedTime?.let {
9599
headers.add(
96100
SentryKafkaProducerInterceptor.SENTRY_ENQUEUED_TIME_HEADER,
@@ -141,6 +145,25 @@ class SentryKafkaRecordInterceptorTest {
141145
verify(forkedScopes).continueTrace(org.mockito.kotlin.isNull(), org.mockito.kotlin.isNull())
142146
}
143147

148+
@Test
149+
fun `intercept passes all baggage headers to continueTrace`() {
150+
val interceptor = SentryKafkaRecordInterceptor<String, String>(scopes)
151+
val sentryTraceValue = "2722d9f6ec019ade60c776169d9a8904-cedf5b7571cb4972-1"
152+
val record =
153+
createRecordWithHeaders(
154+
sentryTrace = sentryTraceValue,
155+
baggageHeaders = listOf("third=party", "sentry-sample_rate=1"),
156+
)
157+
158+
interceptor.intercept(record, consumer)
159+
160+
verify(forkedScopes)
161+
.continueTrace(
162+
org.mockito.kotlin.eq(sentryTraceValue),
163+
org.mockito.kotlin.eq(listOf("third=party", "sentry-sample_rate=1")),
164+
)
165+
}
166+
144167
@Test
145168
fun `sets retry count from delivery attempt header`() {
146169
val interceptor = SentryKafkaRecordInterceptor<String, String>(scopes)

0 commit comments

Comments
 (0)