Skip to content

Commit df91d0c

Browse files
adinauerclaude
andcommitted
fix(kafka): [Queue Instrumentation 21] Preserve third-party baggage on Kafka producer records
`SentryKafkaProducerInterceptor.injectHeaders(...)` previously removed and overwrote the outgoing `baggage` header on every record, discarding any third-party baggage entries already present (e.g. set by another vendor's instrumentation or the application itself). Read the existing `baggage` header values off the `ProducerRecord` and pass them to `TracingUtils.trace(...)`. The downstream `BaggageHeader.fromBaggageAndOutgoingHeader` preserves non-`sentry-*` entries in the outgoing header while Sentry continues to own its own keys. Co-Authored-By: Claude <noreply@anthropic.com>
1 parent adf85eb commit df91d0c

2 files changed

Lines changed: 53 additions & 1 deletion

File tree

sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaProducerInterceptor.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,13 @@
1313
import io.sentry.util.SpanUtils;
1414
import io.sentry.util.TracingUtils;
1515
import java.nio.charset.StandardCharsets;
16+
import java.util.ArrayList;
17+
import java.util.List;
1618
import java.util.Map;
1719
import org.apache.kafka.clients.producer.ProducerInterceptor;
1820
import org.apache.kafka.clients.producer.ProducerRecord;
1921
import org.apache.kafka.clients.producer.RecordMetadata;
22+
import org.apache.kafka.common.header.Header;
2023
import org.apache.kafka.common.header.Headers;
2124
import org.jetbrains.annotations.ApiStatus;
2225
import org.jetbrains.annotations.NotNull;
@@ -97,8 +100,10 @@ public void close() {}
97100
public void configure(final @Nullable Map<String, ?> configs) {}
98101

99102
private void injectHeaders(final @NotNull Headers headers, final @NotNull ISpan span) {
103+
final @Nullable List<String> existingBaggageHeaders =
104+
readHeaderValues(headers, BaggageHeader.BAGGAGE_HEADER);
100105
final @Nullable TracingUtils.TracingHeaders tracingHeaders =
101-
TracingUtils.trace(scopes, null, span);
106+
TracingUtils.trace(scopes, existingBaggageHeaders, span);
102107
if (tracingHeaders != null) {
103108
final @NotNull SentryTraceHeader sentryTraceHeader = tracingHeaders.getSentryTraceHeader();
104109
headers.remove(sentryTraceHeader.getName());
@@ -120,4 +125,19 @@ private void injectHeaders(final @NotNull Headers headers, final @NotNull ISpan
120125
String.valueOf(DateUtils.millisToSeconds(System.currentTimeMillis()))
121126
.getBytes(StandardCharsets.UTF_8));
122127
}
128+
129+
private static @Nullable List<String> readHeaderValues(
130+
final @NotNull Headers headers, final @NotNull String name) {
131+
@Nullable List<String> values = null;
132+
for (final @NotNull Header header : headers.headers(name)) {
133+
final byte @Nullable [] value = header.value();
134+
if (value != null) {
135+
if (values == null) {
136+
values = new ArrayList<>();
137+
}
138+
values.add(new String(value, StandardCharsets.UTF_8));
139+
}
140+
}
141+
return values;
142+
}
123143
}

sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaProducerInterceptorTest.kt

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.sentry.kafka
22

3+
import io.sentry.BaggageHeader
34
import io.sentry.IScopes
45
import io.sentry.ISentryLifecycleToken
56
import io.sentry.Sentry
@@ -79,6 +80,37 @@ class SentryKafkaProducerInterceptorTest {
7980
assertTrue(enqueuedTime > 0)
8081
}
8182

83+
@Test
84+
fun `preserves pre-existing third-party baggage header entries`() {
85+
val tx = createTransaction()
86+
val interceptor = SentryKafkaProducerInterceptor<String, String>(scopes)
87+
val record = ProducerRecord<String, String>("my-topic", "key", "value")
88+
record
89+
.headers()
90+
.add(
91+
BaggageHeader.BAGGAGE_HEADER,
92+
"othervendor=someValue,another=thing".toByteArray(StandardCharsets.UTF_8),
93+
)
94+
95+
interceptor.onSend(record)
96+
97+
val baggageHeaders = record.headers().headers(BaggageHeader.BAGGAGE_HEADER).toList()
98+
assertEquals(1, baggageHeaders.size)
99+
val baggageValue = String(baggageHeaders.first().value(), StandardCharsets.UTF_8)
100+
assertTrue(
101+
baggageValue.contains("othervendor=someValue"),
102+
"expected third-party baggage entry preserved, got: $baggageValue",
103+
)
104+
assertTrue(
105+
baggageValue.contains("another=thing"),
106+
"expected third-party baggage entry preserved, got: $baggageValue",
107+
)
108+
assertTrue(
109+
baggageValue.contains("sentry-"),
110+
"expected Sentry baggage entries appended, got: $baggageValue",
111+
)
112+
}
113+
82114
@Test
83115
fun `does not create span when queue tracing is disabled`() {
84116
val tx = createTransaction()

0 commit comments

Comments
 (0)