Skip to content

Commit 6099047

Browse files
adinauer and Claude committed
feat(spring-jakarta): Add Kafka consumer instrumentation
Add SentryKafkaRecordInterceptor that creates queue.process transactions for incoming Kafka records. Forks scopes per record, extracts sentry-trace and baggage headers for distributed tracing via continueTrace, and calculates messaging.message.receive.latency from the enqueued-time header. Composes with existing RecordInterceptor via delegation. Span lifecycle is managed through success/failure callbacks. Add SentryKafkaConsumerBeanPostProcessor to register the interceptor on ConcurrentKafkaListenerContainerFactory beans. Co-Authored-By: Claude <noreply@anthropic.com>
1 parent 5049ffc commit 6099047

File tree

5 files changed

+537
-0
lines changed

5 files changed

+537
-0
lines changed

sentry-spring-jakarta/api/sentry-spring-jakarta.api

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,12 @@ public final class io/sentry/spring/jakarta/graphql/SentrySpringSubscriptionHand
244244
public fun onSubscriptionResult (Ljava/lang/Object;Lio/sentry/IScopes;Lio/sentry/graphql/ExceptionReporter;Lgraphql/execution/instrumentation/parameters/InstrumentationFieldFetchParameters;)Ljava/lang/Object;
245245
}
246246

247+
public final class io/sentry/spring/jakarta/kafka/SentryKafkaConsumerBeanPostProcessor : org/springframework/beans/factory/config/BeanPostProcessor, org/springframework/core/PriorityOrdered {
248+
public fun <init> ()V
249+
public fun getOrder ()I
250+
public fun postProcessAfterInitialization (Ljava/lang/Object;Ljava/lang/String;)Ljava/lang/Object;
251+
}
252+
247253
public final class io/sentry/spring/jakarta/kafka/SentryKafkaProducerBeanPostProcessor : org/springframework/beans/factory/config/BeanPostProcessor, org/springframework/core/PriorityOrdered {
248254
public fun <init> ()V
249255
public fun getOrder ()I
@@ -254,6 +260,15 @@ public final class io/sentry/spring/jakarta/kafka/SentryKafkaProducerWrapper : o
254260
public fun <init> (Lorg/springframework/kafka/core/KafkaTemplate;Lio/sentry/IScopes;)V
255261
}
256262

263+
public final class io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor : org/springframework/kafka/listener/RecordInterceptor {
264+
public fun <init> (Lio/sentry/IScopes;)V
265+
public fun <init> (Lio/sentry/IScopes;Lorg/springframework/kafka/listener/RecordInterceptor;)V
266+
public fun afterRecord (Lorg/apache/kafka/clients/consumer/ConsumerRecord;Lorg/apache/kafka/clients/consumer/Consumer;)V
267+
public fun failure (Lorg/apache/kafka/clients/consumer/ConsumerRecord;Ljava/lang/Exception;Lorg/apache/kafka/clients/consumer/Consumer;)V
268+
public fun intercept (Lorg/apache/kafka/clients/consumer/ConsumerRecord;Lorg/apache/kafka/clients/consumer/Consumer;)Lorg/apache/kafka/clients/consumer/ConsumerRecord;
269+
public fun success (Lorg/apache/kafka/clients/consumer/ConsumerRecord;Lorg/apache/kafka/clients/consumer/Consumer;)V
270+
}
271+
257272
public class io/sentry/spring/jakarta/opentelemetry/SentryOpenTelemetryAgentWithoutAutoInitConfiguration {
258273
public fun <init> ()V
259274
public fun sentryOpenTelemetryOptionsConfiguration ()Lio/sentry/Sentry$OptionsConfiguration;
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package io.sentry.spring.jakarta.kafka;
2+
3+
import io.sentry.ScopesAdapter;
4+
import java.lang.reflect.Field;
5+
import org.jetbrains.annotations.ApiStatus;
6+
import org.jetbrains.annotations.NotNull;
7+
import org.jetbrains.annotations.Nullable;
8+
import org.springframework.beans.BeansException;
9+
import org.springframework.beans.factory.config.BeanPostProcessor;
10+
import org.springframework.core.Ordered;
11+
import org.springframework.core.PriorityOrdered;
12+
import org.springframework.kafka.config.AbstractKafkaListenerContainerFactory;
13+
import org.springframework.kafka.listener.RecordInterceptor;
14+
15+
/**
16+
* Registers {@link SentryKafkaRecordInterceptor} on {@link AbstractKafkaListenerContainerFactory}
17+
* beans. If an existing {@link RecordInterceptor} is already set, it is composed as a delegate.
18+
*/
19+
@ApiStatus.Internal
20+
public final class SentryKafkaConsumerBeanPostProcessor
21+
implements BeanPostProcessor, PriorityOrdered {
22+
23+
@Override
24+
@SuppressWarnings("unchecked")
25+
public @NotNull Object postProcessAfterInitialization(
26+
final @NotNull Object bean, final @NotNull String beanName) throws BeansException {
27+
if (bean instanceof AbstractKafkaListenerContainerFactory) {
28+
final @NotNull AbstractKafkaListenerContainerFactory<?, ?, ?> factory =
29+
(AbstractKafkaListenerContainerFactory<?, ?, ?>) bean;
30+
31+
final @Nullable RecordInterceptor<?, ?> existing = getExistingInterceptor(factory);
32+
if (existing instanceof SentryKafkaRecordInterceptor) {
33+
return bean;
34+
}
35+
36+
@SuppressWarnings("rawtypes")
37+
final RecordInterceptor sentryInterceptor =
38+
new SentryKafkaRecordInterceptor<>(ScopesAdapter.getInstance(), existing);
39+
factory.setRecordInterceptor(sentryInterceptor);
40+
}
41+
return bean;
42+
}
43+
44+
@SuppressWarnings("unchecked")
45+
private @Nullable RecordInterceptor<?, ?> getExistingInterceptor(
46+
final @NotNull AbstractKafkaListenerContainerFactory<?, ?, ?> factory) {
47+
try {
48+
final @NotNull Field field =
49+
AbstractKafkaListenerContainerFactory.class.getDeclaredField("recordInterceptor");
50+
field.setAccessible(true);
51+
return (RecordInterceptor<?, ?>) field.get(factory);
52+
} catch (NoSuchFieldException | IllegalAccessException e) {
53+
return null;
54+
}
55+
}
56+
57+
@Override
58+
public int getOrder() {
59+
return Ordered.LOWEST_PRECEDENCE;
60+
}
61+
}
Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
package io.sentry.spring.jakarta.kafka;
2+
3+
import io.sentry.BaggageHeader;
4+
import io.sentry.IScopes;
5+
import io.sentry.ISentryLifecycleToken;
6+
import io.sentry.ITransaction;
7+
import io.sentry.SentryTraceHeader;
8+
import io.sentry.SpanDataConvention;
9+
import io.sentry.SpanStatus;
10+
import io.sentry.TransactionContext;
11+
import io.sentry.TransactionOptions;
12+
import java.nio.charset.StandardCharsets;
13+
import java.util.Collections;
14+
import java.util.List;
15+
import org.apache.kafka.clients.consumer.Consumer;
16+
import org.apache.kafka.clients.consumer.ConsumerRecord;
17+
import org.apache.kafka.common.header.Header;
18+
import org.jetbrains.annotations.ApiStatus;
19+
import org.jetbrains.annotations.NotNull;
20+
import org.jetbrains.annotations.Nullable;
21+
import org.springframework.kafka.listener.RecordInterceptor;
22+
23+
/**
 * A {@link RecordInterceptor} that creates {@code queue.process} transactions for incoming Kafka
 * records with distributed tracing support.
 *
 * <p>Lifecycle: {@link #intercept} runs before the listener — it forks Sentry scopes, continues
 * the incoming trace from the {@code sentry-trace} / {@code baggage} headers, starts a
 * transaction, and stashes both in a ThreadLocal. {@link #success} or {@link #failure} then
 * finishes the transaction and restores the previous scopes. An optional delegate interceptor is
 * invoked at each step so existing interceptors keep working.
 */
@ApiStatus.Internal
public final class SentryKafkaRecordInterceptor<K, V> implements RecordInterceptor<K, V> {

  // Trace origin reported on the transactions created by this interceptor.
  static final String TRACE_ORIGIN = "auto.queue.spring_jakarta.kafka.consumer";

  private final @NotNull IScopes scopes;
  // Pre-existing interceptor composed via delegation; null when none was configured.
  private final @Nullable RecordInterceptor<K, V> delegate;

  // Per-thread state bridging intercept() to success()/failure(). Static, so all interceptor
  // instances on the same consumer thread share one slot — assumes at most one record is being
  // processed per thread at a time (TODO confirm this holds for all container configurations).
  private static final @NotNull ThreadLocal<SentryRecordContext> currentContext =
      new ThreadLocal<>();

  public SentryKafkaRecordInterceptor(final @NotNull IScopes scopes) {
    this(scopes, null);
  }

  /**
   * @param scopes the Sentry scopes to fork per record
   * @param delegate an existing interceptor to compose with, or {@code null}
   */
  public SentryKafkaRecordInterceptor(
      final @NotNull IScopes scopes, final @Nullable RecordInterceptor<K, V> delegate) {
    this.scopes = scopes;
    this.delegate = delegate;
  }

  /**
   * Called before the listener receives the record. Starts the Sentry transaction and makes the
   * forked scopes current for the duration of record processing.
   *
   * <p>NOTE(review): if the delegate filters the record by returning {@code null},
   * success/failure may not fire for it and the ThreadLocal entry would only be cleaned up on the
   * next record on this thread — confirm against spring-kafka's container behavior.
   */
  @Override
  public @Nullable ConsumerRecord<K, V> intercept(
      final @NotNull ConsumerRecord<K, V> record, final @NotNull Consumer<K, V> consumer) {
    if (!scopes.getOptions().isEnableQueueTracing()) {
      // Feature disabled: pass straight through to the delegate (or the record itself).
      return delegateIntercept(record, consumer);
    }

    final @NotNull IScopes forkedScopes = scopes.forkedScopes("SentryKafkaRecordInterceptor");
    final @NotNull ISentryLifecycleToken lifecycleToken = forkedScopes.makeCurrent();

    // Must happen before startTransaction so the transaction joins the incoming trace.
    continueTrace(forkedScopes, record);

    final @Nullable ITransaction transaction = startTransaction(forkedScopes, record);
    // Stash state for success()/failure(); transaction may be null when tracing is off/sampled out,
    // but the lifecycle token must still be closed later, so a context is always stored.
    currentContext.set(new SentryRecordContext(lifecycleToken, transaction));

    return delegateIntercept(record, consumer);
  }

  /** Called after the listener completed normally; finishes the transaction with status OK. */
  @Override
  public void success(
      final @NotNull ConsumerRecord<K, V> record, final @NotNull Consumer<K, V> consumer) {
    try {
      if (delegate != null) {
        delegate.success(record, consumer);
      }
    } finally {
      // finally: finish the span even if the delegate throws.
      finishSpan(SpanStatus.OK, null);
    }
  }

  /**
   * Called when the listener threw; finishes the transaction with INTERNAL_ERROR and attaches the
   * exception.
   */
  @Override
  public void failure(
      final @NotNull ConsumerRecord<K, V> record,
      final @NotNull Exception exception,
      final @NotNull Consumer<K, V> consumer) {
    try {
      if (delegate != null) {
        delegate.failure(record, exception, consumer);
      }
    } finally {
      finishSpan(SpanStatus.INTERNAL_ERROR, exception);
    }
  }

  /** Pure pass-through; all Sentry cleanup already happened in success()/failure(). */
  @Override
  public void afterRecord(
      final @NotNull ConsumerRecord<K, V> record, final @NotNull Consumer<K, V> consumer) {
    if (delegate != null) {
      delegate.afterRecord(record, consumer);
    }
  }

  // Invokes the delegate's intercept when present; otherwise returns the record unchanged.
  private @Nullable ConsumerRecord<K, V> delegateIntercept(
      final @NotNull ConsumerRecord<K, V> record, final @NotNull Consumer<K, V> consumer) {
    if (delegate != null) {
      return delegate.intercept(record, consumer);
    }
    return record;
  }

  // Feeds the incoming sentry-trace/baggage headers (if any) into the forked scopes so the
  // transaction started afterwards continues the producer's trace.
  private void continueTrace(
      final @NotNull IScopes forkedScopes, final @NotNull ConsumerRecord<K, V> record) {
    final @Nullable String sentryTrace = headerValue(record, SentryTraceHeader.SENTRY_TRACE_HEADER);
    final @Nullable String baggage = headerValue(record, BaggageHeader.BAGGAGE_HEADER);
    final @Nullable List<String> baggageHeaders =
        baggage != null ? Collections.singletonList(baggage) : null;
    forkedScopes.continueTrace(sentryTrace, baggageHeaders);
  }

  /**
   * Starts the {@code queue.process} transaction for this record and attaches messaging data
   * (system, destination topic, message id, receive latency).
   *
   * @return the started transaction, or {@code null} when tracing is disabled or the transaction
   *     is a no-op (e.g. not sampled)
   */
  private @Nullable ITransaction startTransaction(
      final @NotNull IScopes forkedScopes, final @NotNull ConsumerRecord<K, V> record) {
    if (!forkedScopes.getOptions().isTracingEnabled()) {
      return null;
    }

    final @NotNull TransactionOptions txOptions = new TransactionOptions();
    txOptions.setOrigin(TRACE_ORIGIN);
    // Bind so spans created inside the listener attach to this transaction.
    txOptions.setBindToScope(true);

    final @NotNull ITransaction transaction =
        forkedScopes.startTransaction(
            new TransactionContext("queue.process", "queue.process"), txOptions);

    if (transaction.isNoOp()) {
      // Not sampled / no-op: report "no transaction" so finishSpan skips finishing it.
      return null;
    }

    transaction.setData(SpanDataConvention.MESSAGING_SYSTEM, "kafka");
    transaction.setData(SpanDataConvention.MESSAGING_DESTINATION_NAME, record.topic());

    final @Nullable String messageId = headerValue(record, "messaging.message.id");
    if (messageId != null) {
      transaction.setData(SpanDataConvention.MESSAGING_MESSAGE_ID, messageId);
    }

    // Receive latency = now - producer-side enqueue timestamp (epoch millis written by
    // SentryKafkaProducerWrapper into a record header).
    final @Nullable String enqueuedTimeStr =
        headerValue(record, SentryKafkaProducerWrapper.SENTRY_ENQUEUED_TIME_HEADER);
    if (enqueuedTimeStr != null) {
      try {
        final long enqueuedTime = Long.parseLong(enqueuedTimeStr);
        final long latencyMs = System.currentTimeMillis() - enqueuedTime;
        // Guard against clock skew producing a negative latency.
        if (latencyMs >= 0) {
          transaction.setData(SpanDataConvention.MESSAGING_MESSAGE_RECEIVE_LATENCY, latencyMs);
        }
      } catch (NumberFormatException ignored) {
        // ignore malformed header
      }
    }

    return transaction;
  }

  /**
   * Finishes the transaction stored by {@link #intercept} (if any) with the given status and
   * restores the previous Sentry scopes. Clears the ThreadLocal first so a failure while
   * finishing cannot leave stale state; the lifecycle token is closed in {@code finally} for the
   * same reason.
   */
  private void finishSpan(final @NotNull SpanStatus status, final @Nullable Throwable throwable) {
    final @Nullable SentryRecordContext ctx = currentContext.get();
    if (ctx == null) {
      // No context: tracing was disabled in intercept(), or success/failure fired without it.
      return;
    }
    currentContext.remove();

    try {
      final @Nullable ITransaction transaction = ctx.transaction;
      if (transaction != null) {
        transaction.setStatus(status);
        if (throwable != null) {
          transaction.setThrowable(throwable);
        }
        transaction.finish();
      }
    } finally {
      // Always restore the previous scopes, even if finishing the transaction throws.
      ctx.lifecycleToken.close();
    }
  }

  /**
   * Reads the last occurrence of the named header as a UTF-8 string.
   *
   * @return the decoded header value, or {@code null} if the header is absent or has no value
   */
  private @Nullable String headerValue(
      final @NotNull ConsumerRecord<K, V> record, final @NotNull String headerName) {
    final @Nullable Header header = record.headers().lastHeader(headerName);
    if (header == null || header.value() == null) {
      return null;
    }
    return new String(header.value(), StandardCharsets.UTF_8);
  }

  // Immutable holder pairing the scope-restoration token with the (possibly null) transaction,
  // carried from intercept() to success()/failure() via the ThreadLocal above.
  private static final class SentryRecordContext {
    final @NotNull ISentryLifecycleToken lifecycleToken;
    final @Nullable ITransaction transaction;

    SentryRecordContext(
        final @NotNull ISentryLifecycleToken lifecycleToken,
        final @Nullable ITransaction transaction) {
      this.lifecycleToken = lifecycleToken;
      this.transaction = transaction;
    }
  }
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package io.sentry.spring.jakarta.kafka
2+
3+
import kotlin.test.Test
4+
import kotlin.test.assertSame
5+
import kotlin.test.assertTrue
6+
import org.mockito.kotlin.mock
7+
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
8+
import org.springframework.kafka.core.ConsumerFactory
9+
10+
class SentryKafkaConsumerBeanPostProcessorTest {

    // Builds a container factory with a mocked consumer factory, as the post processor
    // only inspects/replaces the record interceptor.
    private fun newContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
        val containerFactory = ConcurrentKafkaListenerContainerFactory<String, String>()
        containerFactory.consumerFactory = mock<ConsumerFactory<String, String>>()
        return containerFactory
    }

    // Reads the private `recordInterceptor` field reflectively, since the factory
    // exposes no public getter for it.
    private fun interceptorOf(containerFactory: ConcurrentKafkaListenerContainerFactory<String, String>): Any? {
        val field = containerFactory.javaClass.superclass.getDeclaredField("recordInterceptor")
        field.isAccessible = true
        return field.get(containerFactory)
    }

    @Test
    fun `wraps ConcurrentKafkaListenerContainerFactory with SentryKafkaRecordInterceptor`() {
        val containerFactory = newContainerFactory()

        SentryKafkaConsumerBeanPostProcessor()
            .postProcessAfterInitialization(containerFactory, "kafkaListenerContainerFactory")

        // Verify via reflection that the interceptor was set
        assertTrue(interceptorOf(containerFactory) is SentryKafkaRecordInterceptor<*, *>)
    }

    @Test
    fun `does not double-wrap when SentryKafkaRecordInterceptor already set`() {
        val containerFactory = newContainerFactory()
        val processor = SentryKafkaConsumerBeanPostProcessor()

        // First wrap
        processor.postProcessAfterInitialization(containerFactory, "kafkaListenerContainerFactory")
        val firstInterceptor = interceptorOf(containerFactory)

        // Second wrap — should be idempotent
        processor.postProcessAfterInitialization(containerFactory, "kafkaListenerContainerFactory")
        val secondInterceptor = interceptorOf(containerFactory)

        assertSame(firstInterceptor, secondInterceptor)
    }

    @Test
    fun `does not wrap non-factory beans`() {
        val plainBean = "not a factory"

        val returned = SentryKafkaConsumerBeanPostProcessor()
            .postProcessAfterInitialization(plainBean, "someBean")

        assertSame(plainBean, returned)
    }
}

0 commit comments

Comments
 (0)