Skip to content

Commit 98eac93

Browse files
adinauerclaudecoolguyzone
authored
docs(java): Add Kafka queue tracing docs (#17503)
Add Kafka queue tracing docs for the Java SDK This adds a dedicated Kafka integration page for Java, covers both the Spring Boot and raw `kafka-clients` paths, and links the generic queues docs to the new page. I kept this as a dedicated integration page instead of expanding the generic queues page so the Kafka-specific setup, caveats, and examples stay in one reviewable place. This branch is written against the Java SDK PR collection branch `feat/queue-instrumentation` in `getsentry/sentry-java` (`getsentry/sentry-java#5249`). The docs also call out the current SDK caveats around Spring producer spans and Sentry OpenTelemetry integrations so the behavior matches that collection branch. --------- Co-authored-by: Claude <noreply@anthropic.com> Co-authored-by: Alex Krawiec <alex.krawiec@sentry.io>
1 parent a53ebab commit 98eac93

3 files changed

Lines changed: 455 additions & 19 deletions

File tree

docs/platforms/java/common/configuration/options.mdx

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,12 @@ Whether cache operations (`get`, `put`, `remove`, `flush`) should be traced. Whe
303303

304304
</SdkOption>
305305

306+
<SdkOption name="enableQueueTracing" type="boolean" defaultValue="false" availableSince="8.41.0">
307+
308+
Whether Kafka queue operations should be traced. When enabled, the SDK creates `queue.publish` spans for producer sends and `queue.process` transactions for consumer record processing. See the <PlatformLink to="/integrations/kafka/">Kafka integration</PlatformLink> for setup instructions.
309+
310+
</SdkOption>
311+
306312
## Profiling Options
307313

308314
<SdkOption name="profileSessionSampleRate" type="float" availableSince="8.23.0">
@@ -318,4 +324,4 @@ Whether the continuous profiling lifecycle is controlled manually or based on th
318324
- `manual`: **default** Profiler must be started and stopped through `Sentry.startProfiler()` and `Sentry.stopProfiler()` APIs
319325
- `trace`: Profiler is started and stopped automatically whenever a sampled trace starts or finishes
320326

321-
</SdkOption>
327+
</SdkOption>
Lines changed: 338 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,338 @@
1+
---
2+
title: Kafka Integration
3+
description: "Learn how to trace Kafka queue operations with Sentry."
4+
---
5+
6+
Sentry's Kafka integration lets you trace both production and consumption. In Spring Boot, this happens automatically. If you're using raw `kafka-clients`, you'll need to instrument producers and consumers with `sentry-kafka`.
7+
8+
Once configured, queue spans will appear in Sentry's [Queues dashboard](https://sentry.io/orgredirect/organizations/:orgslug/insights/backend/queues/).
9+
10+
Kafka queue tracing is available in Sentry Java SDK version `8.41.0` and later.
11+
12+
<PlatformSection supported={["java.spring-boot"]}>
13+
14+
If you're using Spring Kafka (`KafkaTemplate` / `@KafkaListener`), Sentry instruments your producers and consumers automatically. If you're using `kafka-clients` directly, see the [Java Kafka docs](/platforms/java/integrations/kafka/) for manual instrumentation with `sentry-kafka`.
15+
16+
### Install
17+
18+
Add `sentry-kafka` alongside your existing Spring Kafka dependency:
19+
20+
```groovy {tabTitle:Gradle}
21+
implementation 'io.sentry:sentry-kafka:{{@inject packages.version('sentry.java.kafka', '8.41.0') }}'
22+
implementation 'org.springframework.kafka:spring-kafka'
23+
```
24+
25+
```xml {tabTitle:Maven}
26+
<dependency>
27+
<groupId>io.sentry</groupId>
28+
<artifactId>sentry-kafka</artifactId>
29+
<version>{{@inject packages.version('sentry.java.kafka', '8.41.0') }}</version>
30+
</dependency>
31+
<dependency>
32+
<groupId>org.springframework.kafka</groupId>
33+
<artifactId>spring-kafka</artifactId>
34+
</dependency>
35+
```
36+
37+
### Configure
38+
39+
Enable queue tracing in your application properties:
40+
41+
```properties {2} {tabTitle:application.properties}
42+
sentry.dsn=___DSN___
43+
sentry.enable-queue-tracing=true
44+
sentry.traces-sample-rate=1.0
45+
```
46+
47+
```yaml {3} {tabTitle:application.yml}
48+
sentry:
49+
dsn: ___DSN___
50+
enable-queue-tracing: true
51+
traces-sample-rate: 1.0
52+
```
53+
54+
Now every `KafkaTemplate.send(...)` call produces a `queue.publish` span if there's a transaction running, and record-based `@KafkaListener` methods produce `queue.process` transactions. Sentry injects propagation headers (`sentry-trace`, `baggage`) into outgoing records and continues the trace on the consumer side.
55+
56+
### Producer
57+
58+
```java {tabTitle:Java}
59+
import org.springframework.kafka.core.KafkaTemplate;
60+
import org.springframework.web.bind.annotation.GetMapping;
61+
import org.springframework.web.bind.annotation.RestController;
62+
63+
@RestController
64+
public class OrderController {
65+
66+
private final KafkaTemplate<String, String> kafkaTemplate;
67+
68+
public OrderController(KafkaTemplate<String, String> kafkaTemplate) {
69+
this.kafkaTemplate = kafkaTemplate;
70+
}
71+
72+
@GetMapping("/order")
73+
public String placeOrder() {
74+
// Sentry automatically records a queue.publish span for this send.
75+
kafkaTemplate.send("orders", "order-payload");
76+
return "ok";
77+
}
78+
}
79+
```
80+
81+
```kotlin {tabTitle:Kotlin}
82+
import org.springframework.kafka.core.KafkaTemplate
83+
import org.springframework.web.bind.annotation.GetMapping
84+
import org.springframework.web.bind.annotation.RestController
85+
86+
@RestController
87+
class OrderController(private val kafkaTemplate: KafkaTemplate<String, String>) {
88+
89+
@GetMapping("/order")
90+
fun placeOrder(): String {
91+
// Sentry automatically records a queue.publish span for this send.
92+
kafkaTemplate.send("orders", "order-payload")
93+
return "ok"
94+
}
95+
}
96+
```
97+
98+
### Consumer
99+
100+
```java {tabTitle:Java}
101+
import org.apache.kafka.clients.consumer.ConsumerRecord;
102+
import org.springframework.kafka.annotation.KafkaListener;
103+
import org.springframework.stereotype.Component;
104+
105+
@Component
106+
public class OrderConsumer {
107+
108+
@KafkaListener(topics = "orders", groupId = "order-group")
109+
public void onOrder(ConsumerRecord<String, String> record) {
110+
// Sentry automatically records a queue.process transaction for this method.
111+
processOrder(record.value());
112+
}
113+
}
114+
```
115+
116+
```kotlin {tabTitle:Kotlin}
117+
import org.apache.kafka.clients.consumer.ConsumerRecord
118+
import org.springframework.kafka.annotation.KafkaListener
119+
import org.springframework.stereotype.Component
120+
121+
@Component
122+
class OrderConsumer {
123+
124+
@KafkaListener(topics = "orders", groupId = "order-group")
125+
fun onOrder(record: ConsumerRecord<String, String>) {
126+
// Sentry automatically records a queue.process transaction for this method.
127+
processOrder(record.value())
128+
}
129+
}
130+
```
131+
132+
### Enable Retry Count
133+
134+
Sentry sets `messaging.message.retry.count` from Spring Kafka's
135+
`kafka_deliveryAttempt` header. Spring Kafka only adds this header when delivery
136+
attempt headers are enabled and the listener uses an error handler or after-rollback
137+
processor that supports delivery attempts.
138+
139+
The following example uses Spring Kafka's `DefaultErrorHandler`. If your app already
140+
uses retry handling that supports delivery attempts, keep your existing handler and
141+
enable the header on the listener container:
142+
143+
```java {tabTitle:Java}
144+
import org.springframework.context.annotation.Bean;
145+
import org.springframework.kafka.config.ContainerCustomizer;
146+
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
147+
import org.springframework.kafka.listener.DefaultErrorHandler;
148+
import org.springframework.util.backoff.FixedBackOff;
149+
150+
@Bean
151+
DefaultErrorHandler kafkaErrorHandler() {
152+
return new DefaultErrorHandler(new FixedBackOff(0L, 1L));
153+
}
154+
155+
@Bean
156+
ContainerCustomizer<Object, Object, ConcurrentMessageListenerContainer<Object, Object>>
157+
kafkaContainerCustomizer() {
158+
return container -> container.getContainerProperties().setDeliveryAttemptHeader(true);
159+
}
160+
```
161+
162+
```kotlin {tabTitle:Kotlin}
163+
import org.springframework.context.annotation.Bean
164+
import org.springframework.kafka.config.ContainerCustomizer
165+
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer
166+
import org.springframework.kafka.listener.DefaultErrorHandler
167+
import org.springframework.util.backoff.FixedBackOff
168+
169+
@Bean
170+
fun kafkaErrorHandler(): DefaultErrorHandler {
171+
return DefaultErrorHandler(FixedBackOff(0L, 1L))
172+
}
173+
174+
@Bean
175+
fun kafkaContainerCustomizer() =
176+
ContainerCustomizer<Any, Any, ConcurrentMessageListenerContainer<Any, Any>> { container ->
177+
container.containerProperties.setDeliveryAttemptHeader(true)
178+
}
179+
```
180+
181+
Without that header, Sentry does not set `messaging.message.retry.count`.
182+
183+
</PlatformSection>
184+
185+
<PlatformSection notSupported={["java.spring-boot"]}>
186+
187+
For applications using `kafka-clients` directly (without Spring), use the `sentry-kafka` module. If you're using Kafka through Spring Boot, use the [Spring Boot Kafka docs](/platforms/java/guides/spring-boot/integrations/kafka/) instead.
188+
189+
### Install
190+
191+
```groovy {tabTitle:Gradle}
192+
implementation 'io.sentry:sentry-kafka:{{@inject packages.version('sentry.java.kafka', '8.41.0') }}'
193+
```
194+
195+
```xml {tabTitle:Maven}
196+
<dependency>
197+
<groupId>io.sentry</groupId>
198+
<artifactId>sentry-kafka</artifactId>
199+
<version>{{@inject packages.version('sentry.java.kafka', '8.41.0') }}</version>
200+
</dependency>
201+
```
202+
203+
```scala {tabTitle:SBT}
204+
libraryDependencies += "io.sentry" % "sentry-kafka" % "{{@inject packages.version('sentry.java.kafka', '8.41.0') }}"
205+
```
206+
207+
For other dependency managers, use the same Maven coordinates: `io.sentry:sentry-kafka`.
208+
209+
### Configure
210+
211+
Enable queue tracing when initializing Sentry:
212+
213+
```java {4} {tabTitle:Java}
214+
Sentry.init(options -> {
215+
options.setDsn("___DSN___");
216+
options.setTracesSampleRate(1.0);
217+
options.setEnableQueueTracing(true);
218+
});
219+
```
220+
221+
```kotlin {4} {tabTitle:Kotlin}
222+
Sentry.init { options ->
223+
options.dsn = "___DSN___"
224+
options.tracesSampleRate = 1.0
225+
options.isEnableQueueTracing = true
226+
}
227+
```
228+
229+
### Instrument the Producer
230+
231+
Wrap your `KafkaProducer` with `SentryKafkaProducer.wrap()`. Every `send()` call then records a `queue.publish` span and injects Sentry propagation headers into the record.
232+
233+
```java {7} {tabTitle:Java}
234+
import io.sentry.kafka.SentryKafkaProducer;
235+
import org.apache.kafka.clients.producer.KafkaProducer;
236+
import org.apache.kafka.clients.producer.Producer;
237+
import org.apache.kafka.clients.producer.ProducerRecord;
238+
239+
KafkaProducer<String, String> rawProducer = new KafkaProducer<>(producerProps);
240+
Producer<String, String> producer = SentryKafkaProducer.wrap(rawProducer);
241+
242+
producer.send(new ProducerRecord<>("orders", "order-payload"));
243+
```
244+
245+
```kotlin {6} {tabTitle:Kotlin}
246+
import io.sentry.kafka.SentryKafkaProducer
247+
import org.apache.kafka.clients.producer.KafkaProducer
248+
import org.apache.kafka.clients.producer.ProducerRecord
249+
250+
val rawProducer = KafkaProducer<String, String>(producerProps)
251+
val producer = SentryKafkaProducer.wrap(rawProducer)
252+
253+
producer.send(ProducerRecord("orders", "order-payload"))
254+
```
255+
256+
A `queue.publish` span is created only when there is an active transaction in scope. Sentry trace headers are always injected (even without an active span) so the consumer can continue the trace.
257+
258+
### Instrument the Consumer
259+
260+
Wrap each record's processing callback with `SentryKafkaConsumerTracing.withTracing()`. This creates a `queue.process` transaction per record, continues the distributed trace from producer headers, and calculates receive latency automatically.
261+
262+
If you're also using OpenTelemetry Kafka instrumentation, don't instrument the same consumer callback with `withTracing()`. This helper is not automatically suppressed under OpenTelemetry today, so using both can create duplicate `queue.process` transactions.
263+
264+
```java {12-14} {tabTitle:Java}
265+
import io.sentry.kafka.SentryKafkaConsumerTracing;
266+
import org.apache.kafka.clients.consumer.ConsumerRecord;
267+
import org.apache.kafka.clients.consumer.ConsumerRecords;
268+
import org.apache.kafka.clients.consumer.KafkaConsumer;
269+
270+
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
271+
consumer.subscribe(List.of("orders"));
272+
273+
while (running) {
274+
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
275+
for (ConsumerRecord<String, String> record : records) {
276+
SentryKafkaConsumerTracing.withTracing(record, () -> {
277+
processOrder(record.value());
278+
});
279+
}
280+
}
281+
}
282+
```
283+
284+
```kotlin {tabTitle:Kotlin}
285+
import io.sentry.kafka.SentryKafkaConsumerTracing
286+
import org.apache.kafka.clients.consumer.KafkaConsumer
287+
288+
KafkaConsumer<String, String>(consumerProps).use { consumer ->
289+
consumer.subscribe(listOf("orders"))
290+
291+
while (running) {
292+
val records = consumer.poll(Duration.ofMillis(500))
293+
for (record in records) {
294+
SentryKafkaConsumerTracing.withTracing(record) {
295+
processOrder(record.value())
296+
}
297+
}
298+
}
299+
}
300+
```
301+
302+
Use the `Callable` overload when your processing code throws checked exceptions:
303+
304+
```java {tabTitle:Java (Callable)}
305+
SentryKafkaConsumerTracing.withTracing(record, () -> {
306+
return processOrder(record.value()); // can throw checked exceptions
307+
});
308+
```
309+
310+
```kotlin {tabTitle:Kotlin (Callable)}
311+
import java.util.concurrent.Callable
312+
313+
SentryKafkaConsumerTracing.withTracing(
314+
record,
315+
Callable {
316+
processOrder(record.value())
317+
},
318+
)
319+
```
320+
321+
</PlatformSection>
322+
323+
## Span Data
324+
325+
| Attribute | Type | Description |
326+
| ----------------------------------- | ------ | ---------------------------------------------------------------------------------------------------- |
327+
| `messaging.system` | string | Always `"kafka"` |
328+
| `messaging.destination.name` | string | Kafka topic name |
329+
| `messaging.message.id` | string | Value of the `messaging.message.id` record header, if present |
330+
| `messaging.message.body.size` | int | Serialized value size in bytes |
331+
| `messaging.message.retry.count` | int | Number of previous delivery attempts (from Kafka's `kafka_deliveryAttempt` header), if present |
332+
| `messaging.message.receive.latency` | int | Time in milliseconds between the producer sending the record and the consumer starting to process it |
333+
334+
## Limitations
335+
336+
- **Async listeners not supported.** `@KafkaListener` methods that return a `CompletableFuture` or `Mono`/`Flux` are not instrumented correctly; use synchronous listeners.
337+
- **Batch listeners not supported.** `@KafkaListener` methods that consume batches, such as `ConsumerRecords<?, ?>` or `List<ConsumerRecord<...>>`, are not instrumented yet.
338+
- **Spring Boot auto-instrumentation is disabled when using Sentry OpenTelemetry integrations.**

0 commit comments

Comments
 (0)