Skip to content

Commit e86169f

Browse files
adinauerclaude
andcommitted
fix(spring-jakarta): [Queue Instrumentation 12] Add Kafka retry count attribute
Set messaging.message.retry.count on queue.process transactions when the Spring Kafka delivery attempt header is present. This keeps retry context on consumer traces without changing transaction lifecycle behavior. Co-Authored-By: Claude <noreply@anthropic.com>
1 parent 6d91bdc commit e86169f

File tree

2 files changed

+61
-2
lines changed

2 files changed

+61
-2
lines changed

sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import io.sentry.TransactionContext;
1212
import io.sentry.TransactionOptions;
1313
import io.sentry.util.SpanUtils;
14+
import java.nio.ByteBuffer;
1415
import java.nio.charset.StandardCharsets;
1516
import java.util.Collections;
1617
import java.util.List;
@@ -21,6 +22,7 @@
2122
import org.jetbrains.annotations.NotNull;
2223
import org.jetbrains.annotations.Nullable;
2324
import org.springframework.kafka.listener.RecordInterceptor;
25+
import org.springframework.kafka.support.KafkaHeaders;
2426

2527
/**
2628
* A {@link RecordInterceptor} that creates {@code queue.process} transactions for incoming Kafka
@@ -161,6 +163,11 @@ private boolean isIgnored() {
161163
transaction.setData(SpanDataConvention.MESSAGING_MESSAGE_ID, messageId);
162164
}
163165

166+
final @Nullable Integer retryCount = retryCount(record);
167+
if (retryCount != null) {
168+
transaction.setData(SpanDataConvention.MESSAGING_MESSAGE_RETRY_COUNT, retryCount);
169+
}
170+
164171
final @Nullable String enqueuedTimeStr =
165172
headerValue(record, SentryProducerInterceptor.SENTRY_ENQUEUED_TIME_HEADER);
166173
if (enqueuedTimeStr != null) {
@@ -178,6 +185,25 @@ private boolean isIgnored() {
178185
return transaction;
179186
}
180187

188+
private @Nullable Integer retryCount(final @NotNull ConsumerRecord<K, V> record) {
189+
final @Nullable Header header = record.headers().lastHeader(KafkaHeaders.DELIVERY_ATTEMPT);
190+
if (header == null) {
191+
return null;
192+
}
193+
194+
final byte[] value = header.value();
195+
if (value == null || value.length != Integer.BYTES) {
196+
return null;
197+
}
198+
199+
final int attempt = ByteBuffer.wrap(value).getInt();
200+
if (attempt <= 0) {
201+
return null;
202+
}
203+
204+
return attempt - 1;
205+
}
206+
181207
private void finishStaleContext() {
182208
if (currentContext.get() != null) {
183209
finishSpan(SpanStatus.UNKNOWN, null);

sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,16 @@ import io.sentry.Sentry
77
import io.sentry.SentryOptions
88
import io.sentry.SentryTraceHeader
99
import io.sentry.SentryTracer
10+
import io.sentry.SpanDataConvention
1011
import io.sentry.TransactionContext
1112
import io.sentry.test.initForTest
13+
import java.nio.ByteBuffer
1214
import java.nio.charset.StandardCharsets
1315
import kotlin.test.AfterTest
1416
import kotlin.test.BeforeTest
1517
import kotlin.test.Test
1618
import kotlin.test.assertEquals
19+
import kotlin.test.assertNull
1720
import org.apache.kafka.clients.consumer.Consumer
1821
import org.apache.kafka.clients.consumer.ConsumerRecord
1922
import org.apache.kafka.common.header.internals.RecordHeaders
@@ -24,6 +27,7 @@ import org.mockito.kotlin.never
2427
import org.mockito.kotlin.verify
2528
import org.mockito.kotlin.whenever
2629
import org.springframework.kafka.listener.RecordInterceptor
30+
import org.springframework.kafka.support.KafkaHeaders
2731

2832
class SentryKafkaRecordInterceptorTest {
2933

@@ -32,6 +36,7 @@ class SentryKafkaRecordInterceptorTest {
3236
private lateinit var options: SentryOptions
3337
private lateinit var consumer: Consumer<String, String>
3438
private lateinit var lifecycleToken: ISentryLifecycleToken
39+
private lateinit var transaction: SentryTracer
3540

3641
@BeforeTest
3742
fun setup() {
@@ -52,8 +57,9 @@ class SentryKafkaRecordInterceptorTest {
5257
whenever(forkedScopes.options).thenReturn(options)
5358
whenever(forkedScopes.makeCurrent()).thenReturn(lifecycleToken)
5459

55-
val tx = SentryTracer(TransactionContext("queue.process", "queue.process"), forkedScopes)
56-
whenever(forkedScopes.startTransaction(any<TransactionContext>(), any())).thenReturn(tx)
60+
transaction = SentryTracer(TransactionContext("queue.process", "queue.process"), forkedScopes)
61+
whenever(forkedScopes.startTransaction(any<TransactionContext>(), any()))
62+
.thenReturn(transaction)
5763
}
5864

5965
@AfterTest
@@ -81,6 +87,7 @@ class SentryKafkaRecordInterceptorTest {
8187
sentryTrace: String? = null,
8288
baggage: String? = null,
8389
enqueuedTime: Long? = null,
90+
deliveryAttempt: Int? = null,
8491
): ConsumerRecord<String, String> {
8592
val headers = RecordHeaders()
8693
sentryTrace?.let {
@@ -95,6 +102,12 @@ class SentryKafkaRecordInterceptorTest {
95102
it.toString().toByteArray(StandardCharsets.UTF_8),
96103
)
97104
}
105+
deliveryAttempt?.let {
106+
headers.add(
107+
KafkaHeaders.DELIVERY_ATTEMPT,
108+
ByteBuffer.allocate(Int.SIZE_BYTES).putInt(it).array(),
109+
)
110+
}
98111
val record = ConsumerRecord<String, String>("my-topic", 0, 0L, "key", "value")
99112
headers.forEach { record.headers().add(it) }
100113
return record
@@ -132,6 +145,26 @@ class SentryKafkaRecordInterceptorTest {
132145
verify(forkedScopes).continueTrace(org.mockito.kotlin.isNull(), org.mockito.kotlin.isNull())
133146
}
134147

148+
@Test
149+
fun `sets retry count from delivery attempt header`() {
150+
val interceptor = SentryKafkaRecordInterceptor<String, String>(scopes)
151+
val record = createRecordWithHeaders(deliveryAttempt = 3)
152+
153+
withMockSentry { interceptor.intercept(record, consumer) }
154+
155+
assertEquals(2, transaction.data?.get(SpanDataConvention.MESSAGING_MESSAGE_RETRY_COUNT))
156+
}
157+
158+
@Test
159+
fun `does not set retry count when delivery attempt header is missing`() {
160+
val interceptor = SentryKafkaRecordInterceptor<String, String>(scopes)
161+
val record = createRecord()
162+
163+
withMockSentry { interceptor.intercept(record, consumer) }
164+
165+
assertNull(transaction.data?.get(SpanDataConvention.MESSAGING_MESSAGE_RETRY_COUNT))
166+
}
167+
135168
@Test
136169
fun `does not create span when queue tracing is disabled`() {
137170
options.isEnableQueueTracing = false

0 commit comments

Comments
 (0)