Skip to content

Commit 4a48e54

Browse files
committed
fix(spring-jakarta): [Queue Instrumentation 29] Set body_size on Spring Kafka consumer transaction
The Spring Kafka consumer path (`SentryKafkaRecordInterceptor`) never set `messaging.message.body_size`, while the raw Kafka consumer helper (`SentryKafkaConsumerTracing`) already sets it from `ConsumerRecord#serializedValueSize()`. Both are first-party Kafka consumer integrations shipped in the same stack and should emit the same messaging schema so dashboards and queries remain consistent across Spring vs. raw Kafka setups. Mirror the raw helper: set `SpanDataConvention.MESSAGING_MESSAGE_BODY_SIZE` on the `queue.process` transaction when `serializedValueSize() >= 0`. Add regression tests for both the positive and the -1 (unknown) cases. #skip-changelog
1 parent e0bb87f commit 4a48e54

2 files changed

Lines changed: 41 additions & 3 deletions

File tree

sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,11 @@ private boolean isIgnored() {
177177
transaction.setData(SpanDataConvention.MESSAGING_MESSAGE_ID, messageId);
178178
}
179179

180+
final int bodySize = record.serializedValueSize();
181+
if (bodySize >= 0) {
182+
transaction.setData(SpanDataConvention.MESSAGING_MESSAGE_BODY_SIZE, bodySize);
183+
}
184+
180185
final @Nullable Integer retryCount = retryCount(record);
181186
if (retryCount != null) {
182187
transaction.setData(SpanDataConvention.MESSAGING_MESSAGE_RETRY_COUNT, retryCount);

sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import io.sentry.kafka.SentryKafkaProducerInterceptor
1313
import io.sentry.test.initForTest
1414
import java.nio.ByteBuffer
1515
import java.nio.charset.StandardCharsets
16+
import java.util.Optional
1617
import kotlin.test.AfterTest
1718
import kotlin.test.BeforeTest
1819
import kotlin.test.Test
@@ -22,6 +23,7 @@ import kotlin.test.assertTrue
2223
import org.apache.kafka.clients.consumer.Consumer
2324
import org.apache.kafka.clients.consumer.ConsumerRecord
2425
import org.apache.kafka.common.header.internals.RecordHeaders
26+
import org.apache.kafka.common.record.TimestampType
2527
import org.mockito.kotlin.any
2628
import org.mockito.kotlin.mock
2729
import org.mockito.kotlin.never
@@ -72,10 +74,21 @@ class SentryKafkaRecordInterceptorTest {
7274
private fun createRecord(
7375
topic: String = "my-topic",
7476
headers: RecordHeaders = RecordHeaders(),
77+
serializedValueSize: Int = -1,
7578
): ConsumerRecord<String, String> {
76-
val record = ConsumerRecord<String, String>(topic, 0, 0L, "key", "value")
77-
headers.forEach { record.headers().add(it) }
78-
return record
79+
return ConsumerRecord(
80+
topic,
81+
0,
82+
0L,
83+
System.currentTimeMillis(),
84+
TimestampType.CREATE_TIME,
85+
3,
86+
serializedValueSize,
87+
"key",
88+
"value",
89+
headers,
90+
Optional.empty(),
91+
)
7992
}
8093

8194
private fun createRecordWithHeaders(
@@ -164,6 +177,26 @@ class SentryKafkaRecordInterceptorTest {
164177
)
165178
}
166179

180+
@Test
181+
fun `sets body size from serializedValueSize`() {
182+
val interceptor = SentryKafkaRecordInterceptor<String, String>(scopes)
183+
val record = createRecord(serializedValueSize = 42)
184+
185+
interceptor.intercept(record, consumer)
186+
187+
assertEquals(42, transaction.data?.get(SpanDataConvention.MESSAGING_MESSAGE_BODY_SIZE))
188+
}
189+
190+
@Test
191+
fun `does not set body size when serializedValueSize is negative`() {
192+
val interceptor = SentryKafkaRecordInterceptor<String, String>(scopes)
193+
val record = createRecord(serializedValueSize = -1)
194+
195+
interceptor.intercept(record, consumer)
196+
197+
assertNull(transaction.data?.get(SpanDataConvention.MESSAGING_MESSAGE_BODY_SIZE))
198+
}
199+
167200
@Test
168201
fun `sets retry count from delivery attempt header`() {
169202
val interceptor = SentryKafkaRecordInterceptor<String, String>(scopes)

0 commit comments

Comments
 (0)