@@ -30,17 +30,7 @@ public static void demonstrate(final String bootstrapServers) {
3030 final CountDownLatch consumedLatch = new CountDownLatch (1 );
3131 final Thread consumerThread = startKafkaConsumerThread (TOPIC , bootstrapServers , consumedLatch );
3232
33- final Properties producerProperties = new Properties ();
34- producerProperties .put (ProducerConfig .BOOTSTRAP_SERVERS_CONFIG , bootstrapServers );
35- producerProperties .put (
36- ProducerConfig .KEY_SERIALIZER_CLASS_CONFIG , StringSerializer .class .getName ());
37- producerProperties .put (
38- ProducerConfig .VALUE_SERIALIZER_CLASS_CONFIG , StringSerializer .class .getName ());
39- producerProperties .put (
40- ProducerConfig .INTERCEPTOR_CLASSES_CONFIG , SentryKafkaProducerInterceptor .class .getName ());
41- producerProperties .put (ProducerConfig .MAX_BLOCK_MS_CONFIG , 2000 );
42- producerProperties .put (ProducerConfig .REQUEST_TIMEOUT_MS_CONFIG , 2000 );
43- producerProperties .put (ProducerConfig .DELIVERY_TIMEOUT_MS_CONFIG , 3000 );
33+ final Properties producerProperties = getProducerProperties (bootstrapServers );
4434
4535 final ITransaction transaction = Sentry .startTransaction ("kafka-demo" , "demo" );
4636 try (ISentryLifecycleToken ignored = transaction .makeCurrent ()) {
@@ -74,21 +64,7 @@ private static Thread startKafkaConsumerThread(
7464 final Thread consumerThread =
7565 new Thread (
7666 () -> {
77- final Properties consumerProperties = new Properties ();
78- consumerProperties .put (ConsumerConfig .BOOTSTRAP_SERVERS_CONFIG , bootstrapServers );
79- consumerProperties .put (
80- ConsumerConfig .GROUP_ID_CONFIG , "sentry-console-sample-" + UUID .randomUUID ());
81- consumerProperties .put (ConsumerConfig .AUTO_OFFSET_RESET_CONFIG , "earliest" );
82- consumerProperties .put (
83- ConsumerConfig .KEY_DESERIALIZER_CLASS_CONFIG , StringDeserializer .class .getName ());
84- consumerProperties .put (
85- ConsumerConfig .VALUE_DESERIALIZER_CLASS_CONFIG ,
86- StringDeserializer .class .getName ());
87- consumerProperties .put (
88- ConsumerConfig .INTERCEPTOR_CLASSES_CONFIG ,
89- SentryKafkaConsumerInterceptor .class .getName ());
90- consumerProperties .put (ConsumerConfig .DEFAULT_API_TIMEOUT_MS_CONFIG , 2000 );
91- consumerProperties .put (ConsumerConfig .REQUEST_TIMEOUT_MS_CONFIG , 2000 );
67+ final Properties consumerProperties = getConsumerProperties (bootstrapServers );
9268
9369 try (KafkaConsumer <String , String > consumer =
9470 new KafkaConsumer <>(consumerProperties )) {
@@ -110,4 +86,42 @@ private static Thread startKafkaConsumerThread(
11086 consumerThread .start ();
11187 return consumerThread ;
11288 }
89+
90+ private static Properties getConsumerProperties (String bootstrapServers ) {
91+ final Properties consumerProperties = new Properties ();
92+
93+ consumerProperties .put (ConsumerConfig .BOOTSTRAP_SERVERS_CONFIG , bootstrapServers );
94+ consumerProperties .put (
95+ ConsumerConfig .GROUP_ID_CONFIG , "sentry-console-sample-" + UUID .randomUUID ());
96+ consumerProperties .put (ConsumerConfig .AUTO_OFFSET_RESET_CONFIG , "earliest" );
97+ consumerProperties .put (
98+ ConsumerConfig .KEY_DESERIALIZER_CLASS_CONFIG , StringDeserializer .class .getName ());
99+ consumerProperties .put (
100+ ConsumerConfig .VALUE_DESERIALIZER_CLASS_CONFIG ,
101+ StringDeserializer .class .getName ());
102+ consumerProperties .put (
103+ ConsumerConfig .INTERCEPTOR_CLASSES_CONFIG ,
104+ SentryKafkaConsumerInterceptor .class .getName ());
105+ consumerProperties .put (ConsumerConfig .DEFAULT_API_TIMEOUT_MS_CONFIG , 2000 );
106+ consumerProperties .put (ConsumerConfig .REQUEST_TIMEOUT_MS_CONFIG , 2000 );
107+
108+ return consumerProperties ;
109+ }
110+
111+ private static Properties getProducerProperties (String bootstrapServers ) {
112+ final Properties producerProperties = new Properties ();
113+
114+ producerProperties .put (ProducerConfig .BOOTSTRAP_SERVERS_CONFIG , bootstrapServers );
115+ producerProperties .put (
116+ ProducerConfig .KEY_SERIALIZER_CLASS_CONFIG , StringSerializer .class .getName ());
117+ producerProperties .put (
118+ ProducerConfig .VALUE_SERIALIZER_CLASS_CONFIG , StringSerializer .class .getName ());
119+ producerProperties .put (
120+ ProducerConfig .INTERCEPTOR_CLASSES_CONFIG , SentryKafkaProducerInterceptor .class .getName ());
121+ producerProperties .put (ProducerConfig .MAX_BLOCK_MS_CONFIG , 2000 );
122+ producerProperties .put (ProducerConfig .REQUEST_TIMEOUT_MS_CONFIG , 2000 );
123+ producerProperties .put (ProducerConfig .DELIVERY_TIMEOUT_MS_CONFIG , 3000 );
124+
125+ return producerProperties ;
126+ }
113127}
0 commit comments