Skip to content

Commit 63bb76c

Browse files
adinauerclaude
andcommitted
docs(java): Add Kotlin Kafka snippets
Add Kotlin variants for Kafka retry-count setup and consumer tracing examples. Update custom queue instrumentation docs with Kotlin examples and clarify Kafka queue instrumentation wording. Co-Authored-By: Claude <noreply@anthropic.com>
1 parent 2e2f2f4 commit 63bb76c

2 files changed

Lines changed: 131 additions & 7 deletions

File tree

docs/platforms/java/common/integrations/kafka.mdx

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ title: Kafka Integration
33
description: "Learn how to trace Kafka queue operations with Sentry."
44
---
55

6-
Sentry's Kafka integration lets you trace both production and consumption. In Spring Boot, this happens automatically. If you're using raw `kafka-clients`, you'll need to instrument producers and consumers with `sentry-kafka`.
6+
Sentry's Kafka integration lets you trace both production and consumption. In Spring Boot, this happens automatically. If you're using raw `kafka-clients`, you'll need to instrument producers and consumers with `sentry-kafka`.
77

88
Once configured, queue spans will appear in Sentry's [Queues dashboard](https://sentry.io/orgredirect/organizations/:orgslug/insights/backend/queues/).
99

@@ -157,6 +157,25 @@ ContainerCustomizer<Object, Object, ConcurrentMessageListenerContainer<Object, O
157157
}
158158
```
159159

160+
```kotlin {tabTitle:Kotlin}
161+
import org.springframework.context.annotation.Bean
162+
import org.springframework.kafka.config.ContainerCustomizer
163+
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer
164+
import org.springframework.kafka.listener.DefaultErrorHandler
165+
import org.springframework.util.backoff.FixedBackOff
166+
167+
@Bean
168+
fun kafkaErrorHandler(): DefaultErrorHandler {
169+
return DefaultErrorHandler(FixedBackOff(0L, 1L))
170+
}
171+
172+
@Bean
173+
fun kafkaContainerCustomizer() =
174+
ContainerCustomizer<Any, Any, ConcurrentMessageListenerContainer<Any, Any>> { container ->
175+
container.containerProperties.setDeliveryAttemptHeader(true)
176+
}
177+
```
178+
160179
Without that header, Sentry does not set `messaging.message.retry.count`.
161180

162181
</PlatformSection>
@@ -183,7 +202,7 @@ implementation 'io.sentry:sentry-kafka:{{@inject packages.version('sentry.java.k
183202
libraryDependencies += "io.sentry" % "sentry-kafka" % "{{@inject packages.version('sentry.java.kafka', '8.37.0') }}"
184203
```
185204

186-
For other dependency managers, check out the [central Maven repository](https://search.maven.org/artifact/io.sentry/sentry-kafka).
205+
For other dependency managers, use the same Maven coordinates: `io.sentry:sentry-kafka`.
187206

188207
### Configure
189208

@@ -286,6 +305,17 @@ SentryKafkaConsumerTracing.withTracing(record, () -> {
286305
});
287306
```
288307

308+
```kotlin {tabTitle:Kotlin (Callable)}
309+
import java.util.concurrent.Callable
310+
311+
SentryKafkaConsumerTracing.withTracing(
312+
record,
313+
Callable {
314+
processOrder(record.value())
315+
},
316+
)
317+
```
318+
289319
</PlatformSection>
290320

291321
## Span Data

docs/platforms/java/common/tracing/instrumentation/custom-instrumentation/queues-module.mdx

Lines changed: 99 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ sidebar_order: 3000
44
description: "Learn how to manually instrument your code to use Sentry's Queues module. "
55
---
66

7-
Sentry comes with automatic instrumentation for the most common messaging queue systems. If you're using Kafka, see the <PlatformLink to="/integrations/kafka/">Kafka integration</PlatformLink> — it instruments producers and consumers automatically without code changes. For other queue systems that aren't supported yet, you can still instrument custom spans and transactions around your queue producers and consumers to ensure that you have performance data about your messaging queues.
7+
Sentry provides queue instrumentation for Kafka. If you're using Kafka, see the <PlatformLink to="/integrations/kafka/">Kafka integration</PlatformLink> for the supported automatic and manual setup paths. For other queue systems, you can instrument custom spans and transactions around your queue producers and consumers to capture performance data about your messaging queues.
88

99
## Producer Instrumentation
1010

@@ -20,7 +20,7 @@ Your `queue.publish` span must exist inside a transaction in order to be recogni
2020

2121
You must also include trace headers in your message so that your consumers can continue your trace once your message is picked up.
2222

23-
```java
23+
```java {tabTitle:Java}
2424
MyCustomQueue connection = MyCustomQueue.connect();
2525

2626
// The message you want to send to the queue
@@ -62,6 +62,48 @@ try {
6262
}
6363
```
6464

65+
```kotlin {tabTitle:Kotlin}
66+
val connection = MyCustomQueue.connect()
67+
68+
// The message you want to send to the queue
69+
val queue = "messages"
70+
val message = "Hello World!"
71+
val messageId = "abc123"
72+
73+
// Create transaction
74+
val transaction = Sentry.startTransaction("queue_producer_transaction", "function")
75+
76+
try {
77+
val traceparent = Sentry.getTraceparent()
78+
val baggage = Sentry.getBaggage()
79+
80+
val span = transaction.startChild("queue.publish", "queue_producer")
81+
try {
82+
// Set span data
83+
span.setData("messaging.message.id", messageId)
84+
span.setData("messaging.destination.name", queue)
85+
span.setData("messaging.message.body.size", message.toByteArray(StandardCharsets.UTF_8).size)
86+
87+
// Publish the message to the queue (including current time stamp)
88+
val now = Instant.now().epochSecond
89+
90+
connection.publish(queue, message, now, traceparent, baggage)
91+
} catch (e: Exception) {
92+
span.setThrowable(e)
93+
span.setStatus(SpanStatus.INTERNAL_ERROR)
94+
throw e
95+
} finally {
96+
span.finish()
97+
}
98+
} catch (e: Exception) {
99+
transaction.setThrowable(e)
100+
transaction.setStatus(SpanStatus.INTERNAL_ERROR)
101+
throw e
102+
} finally {
103+
transaction.finish()
104+
}
105+
```
106+
65107
## Consumer Instrumentation
66108

67109
To start capturing performance metrics, use the `startChild()` function to wrap your queue consumers. Your span `op` must be set to `queue.process`. Include the following span data to enrich your consumer spans with queue metrics:
@@ -78,7 +120,7 @@ Your `queue.process` span must exist inside a transaction in order to be recogni
78120

79121
Use `Sentry.continueTrace()` to connect your consumer spans to their associated producer spans, and `setStatus()` to mark the trace of your message as success or failed.
80122

81-
```java
123+
```java {tabTitle:Java}
82124
MyCustomQueue connection = MyCustomQueue.connect();
83125

84126
// Pick up message from queues
@@ -90,18 +132,21 @@ Instant now = Instant.now();
90132
Instant messageTime = Instant.ofEpochSecond(message.getTimestamp());
91133
Duration latency = Duration.between(messageTime, now);
92134

135+
// Extract the Sentry trace headers from the message
136+
SentryTraceHeader sentryTraceHeader = message.getSentryTraceHeader();
137+
BaggageHeader baggageHeader = message.getBaggageHeader();
138+
93139
// Create transaction
94140
final TransactionContext transactionContext = Sentry.continueTrace(sentryTraceHeader, baggageHeader);
95141
ITransaction transaction = Sentry.startTransaction(transactionContext, "queue_consumer_transaction", "function");
96142

97143
try {
98-
ISpan span = transaction.startChild("queue.process", "queue_consumer")
144+
ISpan span = transaction.startChild("queue.process", "queue_consumer");
99145

100146
try {
101147
// Set span data
102148
span.setData("messaging.message.id", message.getMessageId());
103149
span.setData("messaging.destination.name", queue);
104-
//
105150
span.setData("messaging.message.body.size", message.getBody().getBytes(StandardCharsets.UTF_8).length);
106151
span.setData("messaging.message.receive.latency", latency.toMillis());
107152
span.setData("messaging.message.retry.count", 0);
@@ -123,3 +168,52 @@ try {
123168
transaction.finish();
124169
}
125170
```
171+
172+
```kotlin {tabTitle:Kotlin}
173+
val connection = MyCustomQueue.connect()
174+
175+
// Pick up message from queues
176+
val queue = "messages"
177+
val message = connection.consume(queue)
178+
179+
// Calculate latency (optional, but valuable)
180+
val now = Instant.now()
181+
val messageTime = Instant.ofEpochSecond(message.getTimestamp())
182+
val latency = Duration.between(messageTime, now)
183+
184+
// Extract the Sentry trace headers from the message
185+
val sentryTraceHeader = message.getSentryTraceHeader()
186+
val baggageHeader = message.getBaggageHeader()
187+
188+
// Create transaction
189+
val transactionContext = Sentry.continueTrace(sentryTraceHeader, baggageHeader)
190+
val transaction = Sentry.startTransaction(transactionContext, "queue_consumer_transaction", "function")
191+
192+
try {
193+
val span = transaction.startChild("queue.process", "queue_consumer")
194+
195+
try {
196+
// Set span data
197+
span.setData("messaging.message.id", message.getMessageId())
198+
span.setData("messaging.destination.name", queue)
199+
span.setData("messaging.message.body.size", message.getBody().toByteArray(StandardCharsets.UTF_8).size)
200+
span.setData("messaging.message.receive.latency", latency.toMillis())
201+
span.setData("messaging.message.retry.count", 0)
202+
203+
// Process the message
204+
processMessage(message)
205+
} catch (e: Exception) {
206+
span.setThrowable(e)
207+
span.setStatus(SpanStatus.INTERNAL_ERROR)
208+
throw e
209+
} finally {
210+
span.finish()
211+
}
212+
} catch (e: Exception) {
213+
transaction.setThrowable(e)
214+
transaction.setStatus(SpanStatus.INTERNAL_ERROR)
215+
throw e
216+
} finally {
217+
transaction.finish()
218+
}
219+
```

0 commit comments

Comments
 (0)