@@ -28,7 +28,7 @@ private KafkaShowcase() {}
2828
2929 public static void demonstrate (final String bootstrapServers ) {
3030 final CountDownLatch consumedLatch = new CountDownLatch (1 );
31- final Thread consumerThread = startKafkaConsumerThread (TOPIC , bootstrapServers , consumedLatch );
31+ final Thread consumerThread = startKafkaConsumerThread (bootstrapServers , consumedLatch );
3232
3333 final Properties producerProperties = getProducerProperties (bootstrapServers );
3434
@@ -60,7 +60,7 @@ public static void demonstrate(final String bootstrapServers) {
6060 }
6161
6262 private static Thread startKafkaConsumerThread (
63- final String topic , final String bootstrapServers , final CountDownLatch consumedLatch ) {
63+ final String bootstrapServers , final CountDownLatch consumedLatch ) {
6464 final Thread consumerThread =
6565 new Thread (
6666 () -> {
@@ -90,6 +90,10 @@ private static Thread startKafkaConsumerThread(
9090 private static Properties getConsumerProperties (String bootstrapServers ) {
9191 final Properties consumerProperties = new Properties ();
9292
93+ consumerProperties .put (
94+ ConsumerConfig .INTERCEPTOR_CLASSES_CONFIG ,
95+ SentryKafkaConsumerInterceptor .class .getName ());
96+
9397 consumerProperties .put (ConsumerConfig .BOOTSTRAP_SERVERS_CONFIG , bootstrapServers );
9498 consumerProperties .put (
9599 ConsumerConfig .GROUP_ID_CONFIG , "sentry-console-sample-" + UUID .randomUUID ());
@@ -99,9 +103,6 @@ private static Properties getConsumerProperties(String bootstrapServers) {
99103 consumerProperties .put (
100104 ConsumerConfig .VALUE_DESERIALIZER_CLASS_CONFIG ,
101105 StringDeserializer .class .getName ());
102- consumerProperties .put (
103- ConsumerConfig .INTERCEPTOR_CLASSES_CONFIG ,
104- SentryKafkaConsumerInterceptor .class .getName ());
105106 consumerProperties .put (ConsumerConfig .DEFAULT_API_TIMEOUT_MS_CONFIG , 2000 );
106107 consumerProperties .put (ConsumerConfig .REQUEST_TIMEOUT_MS_CONFIG , 2000 );
107108
@@ -111,13 +112,14 @@ private static Properties getConsumerProperties(String bootstrapServers) {
111112 private static Properties getProducerProperties (String bootstrapServers ) {
112113 final Properties producerProperties = new Properties ();
113114
115+ producerProperties .put (
116+ ProducerConfig .INTERCEPTOR_CLASSES_CONFIG , SentryKafkaProducerInterceptor .class .getName ());
117+
114118 producerProperties .put (ProducerConfig .BOOTSTRAP_SERVERS_CONFIG , bootstrapServers );
115119 producerProperties .put (
116120 ProducerConfig .KEY_SERIALIZER_CLASS_CONFIG , StringSerializer .class .getName ());
117121 producerProperties .put (
118122 ProducerConfig .VALUE_SERIALIZER_CLASS_CONFIG , StringSerializer .class .getName ());
119- producerProperties .put (
120- ProducerConfig .INTERCEPTOR_CLASSES_CONFIG , SentryKafkaProducerInterceptor .class .getName ());
121123 producerProperties .put (ProducerConfig .MAX_BLOCK_MS_CONFIG , 2000 );
122124 producerProperties .put (ProducerConfig .REQUEST_TIMEOUT_MS_CONFIG , 2000 );
123125 producerProperties .put (ProducerConfig .DELIVERY_TIMEOUT_MS_CONFIG , 3000 );
0 commit comments