Skip to content

Commit 1f848a7

Browse files
adinauerclaude
andcommitted
feat(kafka): [Queue Instrumentation 17] Add manual consumer tracing helper
Add an experimental helper for wrapping raw Kafka consumer record processing in queue.process transactions. This exposes Kafka consumer tracing outside interceptor-based integrations. Capture messaging metadata and distributed tracing context in the helper so future queue instrumentation can reuse the same behavior. Co-Authored-By: Claude <noreply@anthropic.com>
1 parent bd9d3b5 commit 1f848a7

3 files changed

Lines changed: 496 additions & 0 deletions

File tree

sentry-kafka/api/sentry-kafka.api

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,12 @@ public final class io/sentry/kafka/SentryKafkaConsumerInterceptor : org/apache/k
1313
public fun onConsume (Lorg/apache/kafka/clients/consumer/ConsumerRecords;)Lorg/apache/kafka/clients/consumer/ConsumerRecords;
1414
}
1515

16+
public final class io/sentry/kafka/SentryKafkaConsumerTracing {
17+
public static final field TRACE_ORIGIN Ljava/lang/String;
18+
public static fun withTracing (Lorg/apache/kafka/clients/consumer/ConsumerRecord;Ljava/lang/Runnable;)V
19+
public static fun withTracing (Lorg/apache/kafka/clients/consumer/ConsumerRecord;Ljava/util/concurrent/Callable;)Ljava/lang/Object;
20+
}
21+
1622
public final class io/sentry/kafka/SentryKafkaProducerInterceptor : org/apache/kafka/clients/producer/ProducerInterceptor {
1723
public static final field SENTRY_ENQUEUED_TIME_HEADER Ljava/lang/String;
1824
public static final field TRACE_ORIGIN Ljava/lang/String;
Lines changed: 255 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,255 @@
1+
package io.sentry.kafka;
2+
3+
import io.sentry.BaggageHeader;
4+
import io.sentry.DateUtils;
5+
import io.sentry.IScopes;
6+
import io.sentry.ISentryLifecycleToken;
7+
import io.sentry.ITransaction;
8+
import io.sentry.ScopesAdapter;
9+
import io.sentry.SentryTraceHeader;
10+
import io.sentry.SpanDataConvention;
11+
import io.sentry.SpanStatus;
12+
import io.sentry.TransactionContext;
13+
import io.sentry.TransactionOptions;
14+
import io.sentry.util.SpanUtils;
15+
import java.nio.ByteBuffer;
16+
import java.nio.charset.StandardCharsets;
17+
import java.util.Collections;
18+
import java.util.List;
19+
import java.util.concurrent.Callable;
20+
import org.apache.kafka.clients.consumer.ConsumerRecord;
21+
import org.apache.kafka.common.header.Header;
22+
import org.jetbrains.annotations.ApiStatus;
23+
import org.jetbrains.annotations.NotNull;
24+
import org.jetbrains.annotations.Nullable;
25+
26+
/** Helper methods for instrumenting raw Kafka consumer record processing. */
27+
@ApiStatus.Experimental
28+
public final class SentryKafkaConsumerTracing {
29+
30+
public static final @NotNull String TRACE_ORIGIN = "manual.queue.kafka.consumer";
31+
32+
private static final @NotNull String CREATOR = "SentryKafkaConsumerTracing";
33+
private static final @NotNull String DELIVERY_ATTEMPT_HEADER = "kafka_deliveryAttempt";
34+
private static final @NotNull String MESSAGE_ID_HEADER = "messaging.message.id";
35+
36+
private final @NotNull IScopes scopes;
37+
38+
SentryKafkaConsumerTracing(final @NotNull IScopes scopes) {
39+
this.scopes = scopes;
40+
}
41+
42+
/**
43+
* Runs the provided {@link Callable} with a Kafka consumer processing transaction for the given
44+
* record.
45+
*
46+
* @param record the Kafka record being processed
47+
* @param callable the processing callback
48+
* @return the return value of the callback
49+
* @param <K> the Kafka record key type
50+
* @param <V> the Kafka record value type
51+
* @param <U> the callback return type
52+
*/
53+
public static <K, V, U> U withTracing(
54+
final @NotNull ConsumerRecord<K, V> record, final @NotNull Callable<U> callable)
55+
throws Exception {
56+
return new SentryKafkaConsumerTracing(ScopesAdapter.getInstance())
57+
.withTracingImpl(record, callable);
58+
}
59+
60+
/**
61+
* Runs the provided {@link Runnable} with a Kafka consumer processing transaction for the given
62+
* record.
63+
*
64+
* @param record the Kafka record being processed
65+
* @param runnable the processing callback
66+
* @param <K> the Kafka record key type
67+
* @param <V> the Kafka record value type
68+
*/
69+
public static <K, V> void withTracing(
70+
final @NotNull ConsumerRecord<K, V> record, final @NotNull Runnable runnable) {
71+
new SentryKafkaConsumerTracing(ScopesAdapter.getInstance()).withTracingImpl(record, runnable);
72+
}
73+
74+
<K, V, U> U withTracingImpl(
75+
final @NotNull ConsumerRecord<K, V> record, final @NotNull Callable<U> callable)
76+
throws Exception {
77+
if (!scopes.getOptions().isEnableQueueTracing() || isIgnored()) {
78+
return callable.call();
79+
}
80+
81+
final @NotNull IScopes forkedScopes;
82+
final @NotNull ISentryLifecycleToken lifecycleToken;
83+
try {
84+
forkedScopes = scopes.forkedRootScopes(CREATOR);
85+
lifecycleToken = forkedScopes.makeCurrent();
86+
} catch (Throwable ignored) {
87+
return callable.call();
88+
}
89+
90+
try (final @NotNull ISentryLifecycleToken ignored = lifecycleToken) {
91+
final @Nullable ITransaction transaction = startTransaction(forkedScopes, record);
92+
boolean didError = false;
93+
@Nullable Throwable callbackThrowable = null;
94+
95+
try {
96+
return callable.call();
97+
} catch (Throwable t) {
98+
didError = true;
99+
callbackThrowable = t;
100+
throw t;
101+
} finally {
102+
finishTransaction(
103+
transaction, didError ? SpanStatus.INTERNAL_ERROR : SpanStatus.OK, callbackThrowable);
104+
}
105+
}
106+
}
107+
108+
<K, V> void withTracingImpl(
109+
final @NotNull ConsumerRecord<K, V> record, final @NotNull Runnable runnable) {
110+
try {
111+
withTracingImpl(
112+
record,
113+
() -> {
114+
runnable.run();
115+
return null;
116+
});
117+
} catch (Throwable t) {
118+
throwUnchecked(t);
119+
}
120+
}
121+
122+
@SuppressWarnings("unchecked")
123+
private static <T extends Throwable> void throwUnchecked(final @NotNull Throwable throwable)
124+
throws T {
125+
throw (T) throwable;
126+
}
127+
128+
private boolean isIgnored() {
129+
return SpanUtils.isIgnored(scopes.getOptions().getIgnoredSpanOrigins(), TRACE_ORIGIN);
130+
}
131+
132+
private <K, V> @Nullable ITransaction startTransaction(
133+
final @NotNull IScopes forkedScopes, final @NotNull ConsumerRecord<K, V> record) {
134+
try {
135+
final @Nullable TransactionContext continued = continueTrace(forkedScopes, record);
136+
if (!forkedScopes.getOptions().isTracingEnabled()) {
137+
return null;
138+
}
139+
140+
final @NotNull TransactionContext txContext =
141+
continued != null ? continued : new TransactionContext("queue.process", "queue.process");
142+
txContext.setName("queue.process");
143+
txContext.setOperation("queue.process");
144+
145+
final @NotNull TransactionOptions txOptions = new TransactionOptions();
146+
txOptions.setOrigin(TRACE_ORIGIN);
147+
txOptions.setBindToScope(true);
148+
149+
final @NotNull ITransaction transaction = forkedScopes.startTransaction(txContext, txOptions);
150+
if (transaction.isNoOp()) {
151+
return null;
152+
}
153+
154+
transaction.setData(SpanDataConvention.MESSAGING_SYSTEM, "kafka");
155+
transaction.setData(SpanDataConvention.MESSAGING_DESTINATION_NAME, record.topic());
156+
157+
final @Nullable String messageId = headerValue(record, MESSAGE_ID_HEADER);
158+
if (messageId != null) {
159+
transaction.setData(SpanDataConvention.MESSAGING_MESSAGE_ID, messageId);
160+
}
161+
162+
final int bodySize = record.serializedValueSize();
163+
if (bodySize >= 0) {
164+
transaction.setData(SpanDataConvention.MESSAGING_MESSAGE_BODY_SIZE, bodySize);
165+
}
166+
167+
final @Nullable Integer retryCount = retryCount(record);
168+
if (retryCount != null) {
169+
transaction.setData(SpanDataConvention.MESSAGING_MESSAGE_RETRY_COUNT, retryCount);
170+
}
171+
172+
final @Nullable Long receiveLatency = receiveLatency(record);
173+
if (receiveLatency != null) {
174+
transaction.setData(SpanDataConvention.MESSAGING_MESSAGE_RECEIVE_LATENCY, receiveLatency);
175+
}
176+
177+
return transaction;
178+
} catch (Throwable ignored) {
179+
return null;
180+
}
181+
}
182+
183+
private void finishTransaction(
184+
final @Nullable ITransaction transaction,
185+
final @NotNull SpanStatus status,
186+
final @Nullable Throwable throwable) {
187+
if (transaction == null || transaction.isNoOp()) {
188+
return;
189+
}
190+
191+
try {
192+
transaction.setStatus(status);
193+
if (throwable != null) {
194+
transaction.setThrowable(throwable);
195+
}
196+
transaction.finish();
197+
} catch (Throwable ignored) {
198+
// Instrumentation must never break customer processing.
199+
}
200+
}
201+
202+
private <K, V> @Nullable TransactionContext continueTrace(
203+
final @NotNull IScopes forkedScopes, final @NotNull ConsumerRecord<K, V> record) {
204+
final @Nullable String sentryTrace = headerValue(record, SentryTraceHeader.SENTRY_TRACE_HEADER);
205+
final @Nullable String baggage = headerValue(record, BaggageHeader.BAGGAGE_HEADER);
206+
final @Nullable List<String> baggageHeaders =
207+
baggage != null ? Collections.singletonList(baggage) : null;
208+
return forkedScopes.continueTrace(sentryTrace, baggageHeaders);
209+
}
210+
211+
private <K, V> @Nullable Integer retryCount(final @NotNull ConsumerRecord<K, V> record) {
212+
final @Nullable Header header = record.headers().lastHeader(DELIVERY_ATTEMPT_HEADER);
213+
if (header == null) {
214+
return null;
215+
}
216+
217+
final byte[] value = header.value();
218+
if (value == null || value.length != Integer.BYTES) {
219+
return null;
220+
}
221+
222+
final int attempt = ByteBuffer.wrap(value).getInt();
223+
if (attempt <= 0) {
224+
return null;
225+
}
226+
227+
return attempt - 1;
228+
}
229+
230+
private <K, V> @Nullable Long receiveLatency(final @NotNull ConsumerRecord<K, V> record) {
231+
final @Nullable String enqueuedTimeStr =
232+
headerValue(record, SentryKafkaProducerInterceptor.SENTRY_ENQUEUED_TIME_HEADER);
233+
if (enqueuedTimeStr == null) {
234+
return null;
235+
}
236+
237+
try {
238+
final double enqueuedTimeSeconds = Double.parseDouble(enqueuedTimeStr);
239+
final double nowSeconds = DateUtils.millisToSeconds(System.currentTimeMillis());
240+
final long latencyMs = (long) ((nowSeconds - enqueuedTimeSeconds) * 1000);
241+
return latencyMs >= 0 ? latencyMs : null;
242+
} catch (NumberFormatException ignored) {
243+
return null;
244+
}
245+
}
246+
247+
private <K, V> @Nullable String headerValue(
248+
final @NotNull ConsumerRecord<K, V> record, final @NotNull String headerName) {
249+
final @Nullable Header header = record.headers().lastHeader(headerName);
250+
if (header == null || header.value() == null) {
251+
return null;
252+
}
253+
return new String(header.value(), StandardCharsets.UTF_8);
254+
}
255+
}

0 commit comments

Comments
 (0)