Skip to content

Commit 24cff6d

Browse files
adinauerclaude
andcommitted
fix(spring-jakarta): [Queue Instrumentation 13] Align enqueue time with Python
Store sentry-task-enqueued-time as epoch seconds and compute receive latency from seconds on the consumer side. This aligns Java Kafka queue instrumentation with sentry-python Celery behavior for cross-SDK interoperability. Co-Authored-By: Claude <noreply@anthropic.com>
1 parent e86169f commit 24cff6d

File tree

4 files changed

+24
-7
lines changed

4 files changed

+24
-7
lines changed

sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.sentry.spring.jakarta.kafka;
22

33
import io.sentry.BaggageHeader;
4+
import io.sentry.DateUtils;
45
import io.sentry.IScopes;
56
import io.sentry.ISentryLifecycleToken;
67
import io.sentry.ITransaction;
@@ -172,8 +173,9 @@ private boolean isIgnored() {
172173
headerValue(record, SentryProducerInterceptor.SENTRY_ENQUEUED_TIME_HEADER);
173174
if (enqueuedTimeStr != null) {
174175
try {
175-
final long enqueuedTime = Long.parseLong(enqueuedTimeStr);
176-
final long latencyMs = System.currentTimeMillis() - enqueuedTime;
176+
final double enqueuedTimeSeconds = Double.parseDouble(enqueuedTimeStr);
177+
final double nowSeconds = DateUtils.millisToSeconds(System.currentTimeMillis());
178+
final long latencyMs = (long) ((nowSeconds - enqueuedTimeSeconds) * 1000);
177179
if (latencyMs >= 0) {
178180
transaction.setData(SpanDataConvention.MESSAGING_MESSAGE_RECEIVE_LATENCY, latencyMs);
179181
}

sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryProducerInterceptor.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.sentry.spring.jakarta.kafka;
22

33
import io.sentry.BaggageHeader;
4+
import io.sentry.DateUtils;
45
import io.sentry.IScopes;
56
import io.sentry.ISpan;
67
import io.sentry.SentryTraceHeader;
@@ -107,6 +108,7 @@ private void injectHeaders(final @NotNull Headers headers, final @NotNull ISpan
107108
headers.remove(SENTRY_ENQUEUED_TIME_HEADER);
108109
headers.add(
109110
SENTRY_ENQUEUED_TIME_HEADER,
110-
String.valueOf(System.currentTimeMillis()).getBytes(StandardCharsets.UTF_8));
111+
String.valueOf(DateUtils.millisToSeconds(System.currentTimeMillis()))
112+
.getBytes(StandardCharsets.UTF_8));
111113
}
112114
}

sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import kotlin.test.BeforeTest
1717
import kotlin.test.Test
1818
import kotlin.test.assertEquals
1919
import kotlin.test.assertNull
20+
import kotlin.test.assertTrue
2021
import org.apache.kafka.clients.consumer.Consumer
2122
import org.apache.kafka.clients.consumer.ConsumerRecord
2223
import org.apache.kafka.common.header.internals.RecordHeaders
@@ -86,7 +87,7 @@ class SentryKafkaRecordInterceptorTest {
8687
private fun createRecordWithHeaders(
8788
sentryTrace: String? = null,
8889
baggage: String? = null,
89-
enqueuedTime: Long? = null,
90+
enqueuedTime: String? = null,
9091
deliveryAttempt: Int? = null,
9192
): ConsumerRecord<String, String> {
9293
val headers = RecordHeaders()
@@ -99,7 +100,7 @@ class SentryKafkaRecordInterceptorTest {
99100
enqueuedTime?.let {
100101
headers.add(
101102
SentryProducerInterceptor.SENTRY_ENQUEUED_TIME_HEADER,
102-
it.toString().toByteArray(StandardCharsets.UTF_8),
103+
it.toByteArray(StandardCharsets.UTF_8),
103104
)
104105
}
105106
deliveryAttempt?.let {
@@ -165,6 +166,18 @@ class SentryKafkaRecordInterceptorTest {
165166
assertNull(transaction.data?.get(SpanDataConvention.MESSAGING_MESSAGE_RETRY_COUNT))
166167
}
167168

169+
@Test
170+
fun `sets receive latency from enqueued time in epoch seconds`() {
171+
val interceptor = SentryKafkaRecordInterceptor<String, String>(scopes)
172+
val enqueuedTime = (System.currentTimeMillis() / 1000.0 - 1.0).toString()
173+
val record = createRecordWithHeaders(enqueuedTime = enqueuedTime)
174+
175+
withMockSentry { interceptor.intercept(record, consumer) }
176+
177+
val latency = transaction.data?.get(SpanDataConvention.MESSAGING_MESSAGE_RECEIVE_LATENCY)
178+
assertTrue(latency is Long && latency >= 0)
179+
}
180+
168181
@Test
169182
fun `does not create span when queue tracing is disabled`() {
170183
options.isEnableQueueTracing = false

sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryProducerInterceptorTest.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,8 +104,8 @@ class SentryProducerInterceptorTest {
104104
val enqueuedTimeHeader =
105105
resultHeaders.lastHeader(SentryProducerInterceptor.SENTRY_ENQUEUED_TIME_HEADER)
106106
assertNotNull(enqueuedTimeHeader, "sentry-task-enqueued-time header should be injected")
107-
val enqueuedTime = String(enqueuedTimeHeader.value(), StandardCharsets.UTF_8).toLong()
108-
assertTrue(enqueuedTime > 0, "enqueued time should be a positive epoch millis value")
107+
val enqueuedTime = String(enqueuedTimeHeader.value(), StandardCharsets.UTF_8).toDouble()
108+
assertTrue(enqueuedTime > 0, "enqueued time should be a positive epoch seconds value")
109109
}
110110

111111
@Test

0 commit comments

Comments
 (0)