@@ -26,11 +26,11 @@ public final class KafkaShowcase {
2626
2727 private KafkaShowcase () {}
2828
29- public static void demonstrate (final String bootstrapServers ) {
29+ public static void runKafkaWithSentryInterceptors (final String bootstrapServers ) {
3030 final CountDownLatch consumedLatch = new CountDownLatch (1 );
31- final Thread consumerThread = startKafkaConsumerThread ( bootstrapServers , consumedLatch );
32-
33- final Properties producerProperties = getProducerProperties (bootstrapServers );
31+ final Thread consumerThread =
32+ startConsumerWithSentryInterceptor ( bootstrapServers , consumedLatch );
33+ final Properties producerProperties = createProducerPropertiesWithSentry (bootstrapServers );
3434
3535 final ITransaction transaction = Sentry .startTransaction ("kafka-demo" , "demo" );
3636 try (ISentryLifecycleToken ignored = transaction .makeCurrent ()) {
@@ -59,12 +59,55 @@ public static void demonstrate(final String bootstrapServers) {
5959 }
6060 }
6161
62- private static Thread startKafkaConsumerThread (
62+ public static Properties createProducerPropertiesWithSentry (final String bootstrapServers ) {
63+ final Properties producerProperties = new Properties ();
64+ producerProperties .put (ProducerConfig .BOOTSTRAP_SERVERS_CONFIG , bootstrapServers );
65+ producerProperties .put (
66+ ProducerConfig .KEY_SERIALIZER_CLASS_CONFIG , StringSerializer .class .getName ());
67+ producerProperties .put (
68+ ProducerConfig .VALUE_SERIALIZER_CLASS_CONFIG , StringSerializer .class .getName ());
69+
70+ // Required for Sentry queue tracing in kafka-clients producer setup.
71+ producerProperties .put (
72+ ProducerConfig .INTERCEPTOR_CLASSES_CONFIG , SentryKafkaProducerInterceptor .class .getName ());
73+
74+ // Optional tuning for sample stability in CI/local runs.
75+ producerProperties .put (ProducerConfig .MAX_BLOCK_MS_CONFIG , 2000 );
76+ producerProperties .put (ProducerConfig .REQUEST_TIMEOUT_MS_CONFIG , 2000 );
77+ producerProperties .put (ProducerConfig .DELIVERY_TIMEOUT_MS_CONFIG , 3000 );
78+
79+ return producerProperties ;
80+ }
81+
82+ public static Properties createConsumerPropertiesWithSentry (final String bootstrapServers ) {
83+ final Properties consumerProperties = new Properties ();
84+ consumerProperties .put (ConsumerConfig .BOOTSTRAP_SERVERS_CONFIG , bootstrapServers );
85+ consumerProperties .put (
86+ ConsumerConfig .GROUP_ID_CONFIG , "sentry-console-sample-" + UUID .randomUUID ());
87+ consumerProperties .put (ConsumerConfig .AUTO_OFFSET_RESET_CONFIG , "earliest" );
88+ consumerProperties .put (
89+ ConsumerConfig .KEY_DESERIALIZER_CLASS_CONFIG , StringDeserializer .class .getName ());
90+ consumerProperties .put (
91+ ConsumerConfig .VALUE_DESERIALIZER_CLASS_CONFIG , StringDeserializer .class .getName ());
92+
93+ // Required for Sentry queue tracing in kafka-clients consumer setup.
94+ consumerProperties .put (
95+ ConsumerConfig .INTERCEPTOR_CLASSES_CONFIG , SentryKafkaConsumerInterceptor .class .getName ());
96+
97+ // Optional tuning for sample stability in CI/local runs.
98+ consumerProperties .put (ConsumerConfig .DEFAULT_API_TIMEOUT_MS_CONFIG , 2000 );
99+ consumerProperties .put (ConsumerConfig .REQUEST_TIMEOUT_MS_CONFIG , 2000 );
100+
101+ return consumerProperties ;
102+ }
103+
104+ private static Thread startConsumerWithSentryInterceptor (
63105 final String bootstrapServers , final CountDownLatch consumedLatch ) {
64106 final Thread consumerThread =
65107 new Thread (
66108 () -> {
67- final Properties consumerProperties = getConsumerProperties (bootstrapServers );
109+ final Properties consumerProperties =
110+ createConsumerPropertiesWithSentry (bootstrapServers );
68111
69112 try (KafkaConsumer <String , String > consumer =
70113 new KafkaConsumer <>(consumerProperties )) {
@@ -86,44 +129,4 @@ private static Thread startKafkaConsumerThread(
86129 consumerThread .start ();
87130 return consumerThread ;
88131 }
89-
90- private static Properties getConsumerProperties (String bootstrapServers ) {
91- final Properties consumerProperties = new Properties ();
92-
93- consumerProperties .put (
94- ConsumerConfig .INTERCEPTOR_CLASSES_CONFIG ,
95- SentryKafkaConsumerInterceptor .class .getName ());
96-
97- consumerProperties .put (ConsumerConfig .BOOTSTRAP_SERVERS_CONFIG , bootstrapServers );
98- consumerProperties .put (
99- ConsumerConfig .GROUP_ID_CONFIG , "sentry-console-sample-" + UUID .randomUUID ());
100- consumerProperties .put (ConsumerConfig .AUTO_OFFSET_RESET_CONFIG , "earliest" );
101- consumerProperties .put (
102- ConsumerConfig .KEY_DESERIALIZER_CLASS_CONFIG , StringDeserializer .class .getName ());
103- consumerProperties .put (
104- ConsumerConfig .VALUE_DESERIALIZER_CLASS_CONFIG ,
105- StringDeserializer .class .getName ());
106- consumerProperties .put (ConsumerConfig .DEFAULT_API_TIMEOUT_MS_CONFIG , 2000 );
107- consumerProperties .put (ConsumerConfig .REQUEST_TIMEOUT_MS_CONFIG , 2000 );
108-
109- return consumerProperties ;
110- }
111-
112- private static Properties getProducerProperties (String bootstrapServers ) {
113- final Properties producerProperties = new Properties ();
114-
115- producerProperties .put (
116- ProducerConfig .INTERCEPTOR_CLASSES_CONFIG , SentryKafkaProducerInterceptor .class .getName ());
117-
118- producerProperties .put (ProducerConfig .BOOTSTRAP_SERVERS_CONFIG , bootstrapServers );
119- producerProperties .put (
120- ProducerConfig .KEY_SERIALIZER_CLASS_CONFIG , StringSerializer .class .getName ());
121- producerProperties .put (
122- ProducerConfig .VALUE_SERIALIZER_CLASS_CONFIG , StringSerializer .class .getName ());
123- producerProperties .put (ProducerConfig .MAX_BLOCK_MS_CONFIG , 2000 );
124- producerProperties .put (ProducerConfig .REQUEST_TIMEOUT_MS_CONFIG , 2000 );
125- producerProperties .put (ProducerConfig .DELIVERY_TIMEOUT_MS_CONFIG , 3000 );
126-
127- return producerProperties ;
128- }
129132}
0 commit comments