Skip to content

Commit 82cfc37

Browse files
adinauerclaude
andcommitted
feat(kafka): Add consumer demo to console sample
Show end-to-end Kafka queue tracing in the console sample by running a background consumer thread, producing a message, and waiting for consume before exit.\n\nAdd a no-arg constructor to SentryKafkaConsumerInterceptor so kafka-clients can instantiate it from interceptor.classes, and add test coverage for that constructor. Co-Authored-By: Claude <noreply@anthropic.com>
1 parent 007d27f commit 82cfc37

File tree

4 files changed

+101
-1
lines changed

4 files changed

+101
-1
lines changed

sentry-kafka/api/sentry-kafka.api

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ public final class io/sentry/kafka/BuildConfig {
55

66
public final class io/sentry/kafka/SentryKafkaConsumerInterceptor : org/apache/kafka/clients/consumer/ConsumerInterceptor {
77
public static final field TRACE_ORIGIN Ljava/lang/String;
8+
public fun <init> ()V
89
public fun <init> (Lio/sentry/IScopes;)V
910
public fun close ()V
1011
public fun configure (Ljava/util/Map;)V

sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaConsumerInterceptor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import io.sentry.BaggageHeader;
44
import io.sentry.IScopes;
55
import io.sentry.ITransaction;
6+
import io.sentry.ScopesAdapter;
67
import io.sentry.SentryTraceHeader;
78
import io.sentry.SpanDataConvention;
89
import io.sentry.SpanStatus;
@@ -29,6 +30,10 @@ public final class SentryKafkaConsumerInterceptor<K, V> implements ConsumerInter
2930

3031
private final @NotNull IScopes scopes;
3132

33+
public SentryKafkaConsumerInterceptor() {
34+
this(ScopesAdapter.getInstance());
35+
}
36+
3237
public SentryKafkaConsumerInterceptor(final @NotNull IScopes scopes) {
3338
this.scopes = scopes;
3439
}

sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaConsumerInterceptorTest.kt

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,13 @@ package io.sentry.kafka
22

33
import io.sentry.IScopes
44
import io.sentry.ITransaction
5+
import io.sentry.Sentry
56
import io.sentry.SentryOptions
67
import io.sentry.TransactionContext
78
import io.sentry.TransactionOptions
9+
import io.sentry.test.initForTest
10+
import kotlin.test.AfterTest
11+
import kotlin.test.BeforeTest
812
import kotlin.test.Test
913
import kotlin.test.assertSame
1014
import org.apache.kafka.clients.consumer.ConsumerRecord
@@ -19,6 +23,20 @@ import org.mockito.kotlin.whenever
1923

2024
class SentryKafkaConsumerInterceptorTest {
2125

26+
@BeforeTest
27+
fun setup() {
28+
initForTest {
29+
it.dsn = "https://key@sentry.io/proj"
30+
it.isEnableQueueTracing = true
31+
it.tracesSampleRate = 1.0
32+
}
33+
}
34+
35+
@AfterTest
36+
fun teardown() {
37+
Sentry.close()
38+
}
39+
2240
@Test
2341
fun `does nothing when queue tracing is disabled`() {
2442
val scopes = mock<IScopes>()
@@ -64,6 +82,16 @@ class SentryKafkaConsumerInterceptorTest {
6482
interceptor.onCommit(mapOf(TopicPartition("my-topic", 0) to OffsetAndMetadata(1)))
6583
}
6684

85+
@Test
86+
fun `no-arg constructor uses current scopes`() {
87+
val interceptor = SentryKafkaConsumerInterceptor<String, String>()
88+
val records = singleRecordBatch()
89+
90+
val result = interceptor.onConsume(records)
91+
92+
assertSame(records, result)
93+
}
94+
6795
private fun singleRecordBatch(): ConsumerRecords<String, String> {
6896
val partition = TopicPartition("my-topic", 0)
6997
val record = ConsumerRecord("my-topic", 0, 0L, "key", "value")

sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/Main.java

Lines changed: 67 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,27 @@
33
import io.sentry.*;
44
import io.sentry.clientreport.DiscardReason;
55
import io.sentry.jcache.SentryJCacheWrapper;
6+
import io.sentry.kafka.SentryKafkaConsumerInterceptor;
67
import io.sentry.kafka.SentryKafkaProducerInterceptor;
78
import io.sentry.protocol.Message;
89
import io.sentry.protocol.User;
10+
import java.time.Duration;
911
import java.util.Collections;
1012
import java.util.Properties;
13+
import java.util.UUID;
14+
import java.util.concurrent.CountDownLatch;
15+
import java.util.concurrent.TimeUnit;
1116
import javax.cache.Cache;
1217
import javax.cache.CacheManager;
1318
import javax.cache.Caching;
1419
import javax.cache.configuration.MutableConfiguration;
20+
import org.apache.kafka.clients.consumer.ConsumerConfig;
21+
import org.apache.kafka.clients.consumer.ConsumerRecords;
22+
import org.apache.kafka.clients.consumer.KafkaConsumer;
1523
import org.apache.kafka.clients.producer.KafkaProducer;
1624
import org.apache.kafka.clients.producer.ProducerConfig;
1725
import org.apache.kafka.clients.producer.ProducerRecord;
26+
import org.apache.kafka.common.serialization.StringDeserializer;
1827
import org.apache.kafka.common.serialization.StringSerializer;
1928

2029
public class Main {
@@ -261,6 +270,10 @@ private static void captureMetrics() {
261270
}
262271

263272
private static void demonstrateKafkaTracing() {
273+
final String topic = "sentry-topic-console-sample";
274+
final CountDownLatch consumedLatch = new CountDownLatch(1);
275+
final Thread consumerThread = startKafkaConsumerThread(topic, consumedLatch);
276+
264277
final Properties producerProperties = new Properties();
265278
producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
266279
producerProperties.put(
@@ -273,17 +286,70 @@ private static void demonstrateKafkaTracing() {
273286
final ITransaction transaction = Sentry.startTransaction("kafka-demo", "demo");
274287
try (ISentryLifecycleToken ignored = transaction.makeCurrent()) {
275288
try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProperties)) {
276-
producer.send(new ProducerRecord<>("sentry-topic", "sentry-kafka sample message")).get();
289+
Thread.sleep(500);
290+
producer.send(new ProducerRecord<>(topic, "sentry-kafka sample message")).get();
277291
} catch (InterruptedException e) {
278292
Thread.currentThread().interrupt();
279293
} catch (Exception ignoredException) {
280294
// local broker may not be available when running the sample
281295
}
296+
297+
try {
298+
consumedLatch.await(5, TimeUnit.SECONDS);
299+
} catch (InterruptedException e) {
300+
Thread.currentThread().interrupt();
301+
}
282302
} finally {
303+
consumerThread.interrupt();
304+
try {
305+
consumerThread.join(1000);
306+
} catch (InterruptedException e) {
307+
Thread.currentThread().interrupt();
308+
}
283309
transaction.finish();
284310
}
285311
}
286312

313+
private static Thread startKafkaConsumerThread(
314+
final String topic, final CountDownLatch consumedLatch) {
315+
final Thread consumerThread =
316+
new Thread(
317+
() -> {
318+
final Properties consumerProperties = new Properties();
319+
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
320+
consumerProperties.put(
321+
ConsumerConfig.GROUP_ID_CONFIG, "sentry-console-sample-" + UUID.randomUUID());
322+
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
323+
consumerProperties.put(
324+
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
325+
consumerProperties.put(
326+
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
327+
StringDeserializer.class.getName());
328+
consumerProperties.put(
329+
ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
330+
SentryKafkaConsumerInterceptor.class.getName());
331+
332+
try (KafkaConsumer<String, String> consumer =
333+
new KafkaConsumer<>(consumerProperties)) {
334+
consumer.subscribe(Collections.singletonList(topic));
335+
336+
while (!Thread.currentThread().isInterrupted() && consumedLatch.getCount() > 0) {
337+
final ConsumerRecords<String, String> records =
338+
consumer.poll(Duration.ofMillis(500));
339+
if (!records.isEmpty()) {
340+
consumedLatch.countDown();
341+
break;
342+
}
343+
}
344+
} catch (Exception ignored) {
345+
// local broker may not be available when running the sample
346+
}
347+
},
348+
"sentry-kafka-sample-consumer");
349+
consumerThread.start();
350+
return consumerThread;
351+
}
352+
287353
private static class SomeEventProcessor implements EventProcessor {
288354
@Override
289355
public SentryEvent process(SentryEvent event, Hint hint) {

0 commit comments

Comments
 (0)