33import io .sentry .ISentryLifecycleToken ;
44import io .sentry .ITransaction ;
55import io .sentry .Sentry ;
6- import io .sentry .kafka .SentryKafkaConsumerInterceptor ;
6+ import io .sentry .kafka .SentryKafkaConsumerTracing ;
77import io .sentry .kafka .SentryKafkaProducerInterceptor ;
88import java .time .Duration ;
99import java .util .Collections ;
1212import java .util .concurrent .CountDownLatch ;
1313import java .util .concurrent .TimeUnit ;
1414import org .apache .kafka .clients .consumer .ConsumerConfig ;
15+ import org .apache .kafka .clients .consumer .ConsumerRecord ;
1516import org .apache .kafka .clients .consumer .ConsumerRecords ;
1617import org .apache .kafka .clients .consumer .KafkaConsumer ;
1718import org .apache .kafka .clients .producer .KafkaProducer ;
@@ -26,10 +27,9 @@ public final class KafkaShowcase {
2627
2728 private KafkaShowcase () {}
2829
29- public static void runKafkaWithSentryInterceptors (final String bootstrapServers ) {
30+ public static void runKafkaWithSentryTracing (final String bootstrapServers ) {
3031 final CountDownLatch consumedLatch = new CountDownLatch (1 );
31- final Thread consumerThread =
32- startConsumerWithSentryInterceptor (bootstrapServers , consumedLatch );
32+ final Thread consumerThread = startConsumerWithSentryTracing (bootstrapServers , consumedLatch );
3333 final Properties producerProperties = createProducerPropertiesWithSentry (bootstrapServers );
3434
3535 final ITransaction transaction = Sentry .startTransaction ("kafka-demo" , "demo" );
@@ -79,7 +79,7 @@ public static Properties createProducerPropertiesWithSentry(final String bootstr
7979 return producerProperties ;
8080 }
8181
82- public static Properties createConsumerPropertiesWithSentry (final String bootstrapServers ) {
82+ public static Properties createConsumerProperties (final String bootstrapServers ) {
8383 final Properties consumerProperties = new Properties ();
8484 consumerProperties .put (ConsumerConfig .BOOTSTRAP_SERVERS_CONFIG , bootstrapServers );
8585 consumerProperties .put (
@@ -90,24 +90,19 @@ public static Properties createConsumerPropertiesWithSentry(final String bootstr
9090 consumerProperties .put (
9191 ConsumerConfig .VALUE_DESERIALIZER_CLASS_CONFIG , StringDeserializer .class .getName ());
9292
93- // Required for Sentry queue tracing in kafka-clients consumer setup.
94- consumerProperties .put (
95- ConsumerConfig .INTERCEPTOR_CLASSES_CONFIG , SentryKafkaConsumerInterceptor .class .getName ());
96-
9793 // Optional tuning for sample stability in CI/local runs.
9894 consumerProperties .put (ConsumerConfig .DEFAULT_API_TIMEOUT_MS_CONFIG , 2000 );
9995 consumerProperties .put (ConsumerConfig .REQUEST_TIMEOUT_MS_CONFIG , 2000 );
10096
10197 return consumerProperties ;
10298 }
10399
104- private static Thread startConsumerWithSentryInterceptor (
100+ private static Thread startConsumerWithSentryTracing (
105101 final String bootstrapServers , final CountDownLatch consumedLatch ) {
106102 final Thread consumerThread =
107103 new Thread (
108104 () -> {
109- final Properties consumerProperties =
110- createConsumerPropertiesWithSentry (bootstrapServers );
105+ final Properties consumerProperties = createConsumerProperties (bootstrapServers );
111106
112107 try (KafkaConsumer <String , String > consumer =
113108 new KafkaConsumer <>(consumerProperties )) {
@@ -116,9 +111,11 @@ private static Thread startConsumerWithSentryInterceptor(
116111 while (!Thread .currentThread ().isInterrupted () && consumedLatch .getCount () > 0 ) {
117112 final ConsumerRecords <String , String > records =
118113 consumer .poll (Duration .ofMillis (500 ));
119- if (!records .isEmpty ()) {
120- consumedLatch .countDown ();
121- break ;
114+ for (final ConsumerRecord <String , String > record : records ) {
115+ SentryKafkaConsumerTracing .withTracing (record , consumedLatch ::countDown );
116+ if (consumedLatch .getCount () == 0 ) {
117+ break ;
118+ }
122119 }
123120 }
124121 } catch (Exception ignored ) {
0 commit comments