Skip to content

Commit 37b7d28

Browse files
authored
Merge pull request #5320 from getsentry/fix/queue-instrumentation-kafka-baggage-headers
fix(kafka): [Queue Instrumentation 24] Read all baggage headers on consumers
2 parents 6186aca + f020319 commit 37b7d28

4 files changed

Lines changed: 74 additions & 6 deletions

File tree

sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaConsumerTracing.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
import io.sentry.util.SpanUtils;
1616
import java.nio.ByteBuffer;
1717
import java.nio.charset.StandardCharsets;
18-
import java.util.Collections;
18+
import java.util.ArrayList;
1919
import java.util.List;
2020
import java.util.concurrent.Callable;
2121
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -215,9 +215,8 @@ private void finishTransaction(
215215
private <K, V> @Nullable TransactionContext continueTrace(
216216
final @NotNull IScopes forkedScopes, final @NotNull ConsumerRecord<K, V> record) {
217217
final @Nullable String sentryTrace = headerValue(record, SentryTraceHeader.SENTRY_TRACE_HEADER);
218-
final @Nullable String baggage = headerValue(record, BaggageHeader.BAGGAGE_HEADER);
219218
final @Nullable List<String> baggageHeaders =
220-
baggage != null ? Collections.singletonList(baggage) : null;
219+
headerValues(record, BaggageHeader.BAGGAGE_HEADER);
221220
return forkedScopes.continueTrace(sentryTrace, baggageHeaders);
222221
}
223222

@@ -265,4 +264,18 @@ private void finishTransaction(
265264
}
266265
return new String(header.value(), StandardCharsets.UTF_8);
267266
}
267+
268+
private <K, V> @Nullable List<String> headerValues(
269+
final @NotNull ConsumerRecord<K, V> record, final @NotNull String headerName) {
270+
@Nullable List<String> values = null;
271+
for (final @NotNull Header header : record.headers().headers(headerName)) {
272+
if (header.value() != null) {
273+
if (values == null) {
274+
values = new ArrayList<>();
275+
}
276+
values.add(new String(header.value(), StandardCharsets.UTF_8));
277+
}
278+
}
279+
return values;
280+
}
268281
}

sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaConsumerTracingTest.kt

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,21 @@ class SentryKafkaConsumerTracingTest {
111111
verify(lifecycleToken).close()
112112
}
113113

114+
@Test
115+
fun `withTracing passes all baggage headers to continueTrace`() {
116+
val sentryTraceValue = "2722d9f6ec019ade60c776169d9a8904-cedf5b7571cb4972-1"
117+
val record =
118+
createRecord(
119+
sentryTrace = sentryTraceValue,
120+
baggageHeaders = listOf("third=party", "sentry-sample_rate=1"),
121+
)
122+
123+
tracing.withTracingImpl(record, Callable { "done" })
124+
125+
verify(forkedScopes)
126+
.continueTrace(eq(sentryTraceValue), eq(listOf("third=party", "sentry-sample_rate=1")))
127+
}
128+
114129
@Test
115130
fun `withTracing skips scope forking when queue tracing is disabled`() {
116131
options.isEnableQueueTracing = false
@@ -193,6 +208,7 @@ class SentryKafkaConsumerTracingTest {
193208
topic: String = "my-topic",
194209
sentryTrace: String? = null,
195210
baggage: String? = null,
211+
baggageHeaders: List<String>? = null,
196212
messageId: String? = null,
197213
deliveryAttempt: Int? = null,
198214
enqueuedTime: String? = null,
@@ -205,6 +221,9 @@ class SentryKafkaConsumerTracingTest {
205221
baggage?.let {
206222
headers.add(BaggageHeader.BAGGAGE_HEADER, it.toByteArray(StandardCharsets.UTF_8))
207223
}
224+
baggageHeaders?.forEach {
225+
headers.add(BaggageHeader.BAGGAGE_HEADER, it.toByteArray(StandardCharsets.UTF_8))
226+
}
208227
messageId?.let {
209228
headers.add(SpanDataConvention.MESSAGING_MESSAGE_ID, it.toByteArray(StandardCharsets.UTF_8))
210229
}

sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
import io.sentry.util.SpanUtils;
1515
import java.nio.ByteBuffer;
1616
import java.nio.charset.StandardCharsets;
17-
import java.util.Collections;
17+
import java.util.ArrayList;
1818
import java.util.List;
1919
import org.apache.kafka.clients.consumer.Consumer;
2020
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -126,9 +126,8 @@ private boolean isIgnored() {
126126
private @Nullable TransactionContext continueTrace(
127127
final @NotNull IScopes forkedScopes, final @NotNull ConsumerRecord<K, V> record) {
128128
final @Nullable String sentryTrace = headerValue(record, SentryTraceHeader.SENTRY_TRACE_HEADER);
129-
final @Nullable String baggage = headerValue(record, BaggageHeader.BAGGAGE_HEADER);
130129
final @Nullable List<String> baggageHeaders =
131-
baggage != null ? Collections.singletonList(baggage) : null;
130+
headerValues(record, BaggageHeader.BAGGAGE_HEADER);
132131
return forkedScopes.continueTrace(sentryTrace, baggageHeaders);
133132
}
134133

@@ -243,6 +242,20 @@ private void finishSpan(final @NotNull SpanStatus status, final @Nullable Throwa
243242
return new String(header.value(), StandardCharsets.UTF_8);
244243
}
245244

245+
private @Nullable List<String> headerValues(
246+
final @NotNull ConsumerRecord<K, V> record, final @NotNull String headerName) {
247+
@Nullable List<String> values = null;
248+
for (final @NotNull Header header : record.headers().headers(headerName)) {
249+
if (header.value() != null) {
250+
if (values == null) {
251+
values = new ArrayList<>();
252+
}
253+
values.add(new String(header.value(), StandardCharsets.UTF_8));
254+
}
255+
}
256+
return values;
257+
}
258+
246259
private static final class SentryRecordContext {
247260
final @NotNull ISentryLifecycleToken lifecycleToken;
248261
final @Nullable ITransaction transaction;

sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ class SentryKafkaRecordInterceptorTest {
8181
private fun createRecordWithHeaders(
8282
sentryTrace: String? = null,
8383
baggage: String? = null,
84+
baggageHeaders: List<String>? = null,
8485
enqueuedTime: String? = null,
8586
deliveryAttempt: Int? = null,
8687
): ConsumerRecord<String, String> {
@@ -91,6 +92,9 @@ class SentryKafkaRecordInterceptorTest {
9192
baggage?.let {
9293
headers.add(BaggageHeader.BAGGAGE_HEADER, it.toByteArray(StandardCharsets.UTF_8))
9394
}
95+
baggageHeaders?.forEach {
96+
headers.add(BaggageHeader.BAGGAGE_HEADER, it.toByteArray(StandardCharsets.UTF_8))
97+
}
9498
enqueuedTime?.let {
9599
headers.add(
96100
SentryKafkaProducerInterceptor.SENTRY_ENQUEUED_TIME_HEADER,
@@ -141,6 +145,25 @@ class SentryKafkaRecordInterceptorTest {
141145
verify(forkedScopes).continueTrace(org.mockito.kotlin.isNull(), org.mockito.kotlin.isNull())
142146
}
143147

148+
@Test
149+
fun `intercept passes all baggage headers to continueTrace`() {
150+
val interceptor = SentryKafkaRecordInterceptor<String, String>(scopes)
151+
val sentryTraceValue = "2722d9f6ec019ade60c776169d9a8904-cedf5b7571cb4972-1"
152+
val record =
153+
createRecordWithHeaders(
154+
sentryTrace = sentryTraceValue,
155+
baggageHeaders = listOf("third=party", "sentry-sample_rate=1"),
156+
)
157+
158+
interceptor.intercept(record, consumer)
159+
160+
verify(forkedScopes)
161+
.continueTrace(
162+
org.mockito.kotlin.eq(sentryTraceValue),
163+
org.mockito.kotlin.eq(listOf("third=party", "sentry-sample_rate=1")),
164+
)
165+
}
166+
144167
@Test
145168
fun `sets retry count from delivery attempt header`() {
146169
val interceptor = SentryKafkaRecordInterceptor<String, String>(scopes)

0 commit comments

Comments
 (0)