|
3 | 3 | import io.sentry.*; |
4 | 4 | import io.sentry.clientreport.DiscardReason; |
5 | 5 | import io.sentry.jcache.SentryJCacheWrapper; |
6 | | -import io.sentry.kafka.SentryKafkaConsumerInterceptor; |
7 | | -import io.sentry.kafka.SentryKafkaProducerInterceptor; |
8 | 6 | import io.sentry.protocol.Message; |
9 | 7 | import io.sentry.protocol.User; |
10 | | -import java.time.Duration; |
11 | 8 | import java.util.Collections; |
12 | | -import java.util.Properties; |
13 | | -import java.util.UUID; |
14 | | -import java.util.concurrent.CountDownLatch; |
15 | | -import java.util.concurrent.TimeUnit; |
16 | 9 | import javax.cache.Cache; |
17 | 10 | import javax.cache.CacheManager; |
18 | 11 | import javax.cache.Caching; |
19 | 12 | import javax.cache.configuration.MutableConfiguration; |
20 | | -import org.apache.kafka.clients.consumer.ConsumerConfig; |
21 | | -import org.apache.kafka.clients.consumer.ConsumerRecords; |
22 | | -import org.apache.kafka.clients.consumer.KafkaConsumer; |
23 | | -import org.apache.kafka.clients.producer.KafkaProducer; |
24 | | -import org.apache.kafka.clients.producer.ProducerConfig; |
25 | | -import org.apache.kafka.clients.producer.ProducerRecord; |
26 | | -import org.apache.kafka.common.serialization.StringDeserializer; |
27 | | -import org.apache.kafka.common.serialization.StringSerializer; |
28 | 13 |
|
29 | 14 | public class Main { |
30 | 15 |
|
@@ -198,7 +183,7 @@ public static void main(String[] args) throws InterruptedException { |
198 | 183 | // |
199 | 184 | // This uses the native producer interceptor from sentry-kafka. |
200 | 185 | // If no local Kafka broker is available, this block exits quietly. |
201 | | - demonstrateKafkaTracing(); |
| 186 | + KafkaShowcase.demonstrate(); |
202 | 187 |
|
203 | 188 | // Performance feature |
204 | 189 | // |
@@ -269,87 +254,6 @@ private static void captureMetrics() { |
269 | 254 | Sentry.metrics().distribution("distributionMetric", 7.0); |
270 | 255 | } |
271 | 256 |
|
272 | | - private static void demonstrateKafkaTracing() { |
273 | | - final String topic = "sentry-topic-console-sample"; |
274 | | - final CountDownLatch consumedLatch = new CountDownLatch(1); |
275 | | - final Thread consumerThread = startKafkaConsumerThread(topic, consumedLatch); |
276 | | - |
277 | | - final Properties producerProperties = new Properties(); |
278 | | - producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); |
279 | | - producerProperties.put( |
280 | | - ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); |
281 | | - producerProperties.put( |
282 | | - ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); |
283 | | - producerProperties.put( |
284 | | - ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, SentryKafkaProducerInterceptor.class.getName()); |
285 | | - |
286 | | - final ITransaction transaction = Sentry.startTransaction("kafka-demo", "demo"); |
287 | | - try (ISentryLifecycleToken ignored = transaction.makeCurrent()) { |
288 | | - try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProperties)) { |
289 | | - Thread.sleep(500); |
290 | | - producer.send(new ProducerRecord<>(topic, "sentry-kafka sample message")).get(); |
291 | | - } catch (InterruptedException e) { |
292 | | - Thread.currentThread().interrupt(); |
293 | | - } catch (Exception ignoredException) { |
294 | | - // local broker may not be available when running the sample |
295 | | - } |
296 | | - |
297 | | - try { |
298 | | - consumedLatch.await(5, TimeUnit.SECONDS); |
299 | | - } catch (InterruptedException e) { |
300 | | - Thread.currentThread().interrupt(); |
301 | | - } |
302 | | - } finally { |
303 | | - consumerThread.interrupt(); |
304 | | - try { |
305 | | - consumerThread.join(1000); |
306 | | - } catch (InterruptedException e) { |
307 | | - Thread.currentThread().interrupt(); |
308 | | - } |
309 | | - transaction.finish(); |
310 | | - } |
311 | | - } |
312 | | - |
313 | | - private static Thread startKafkaConsumerThread( |
314 | | - final String topic, final CountDownLatch consumedLatch) { |
315 | | - final Thread consumerThread = |
316 | | - new Thread( |
317 | | - () -> { |
318 | | - final Properties consumerProperties = new Properties(); |
319 | | - consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); |
320 | | - consumerProperties.put( |
321 | | - ConsumerConfig.GROUP_ID_CONFIG, "sentry-console-sample-" + UUID.randomUUID()); |
322 | | - consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); |
323 | | - consumerProperties.put( |
324 | | - ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); |
325 | | - consumerProperties.put( |
326 | | - ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, |
327 | | - StringDeserializer.class.getName()); |
328 | | - consumerProperties.put( |
329 | | - ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, |
330 | | - SentryKafkaConsumerInterceptor.class.getName()); |
331 | | - |
332 | | - try (KafkaConsumer<String, String> consumer = |
333 | | - new KafkaConsumer<>(consumerProperties)) { |
334 | | - consumer.subscribe(Collections.singletonList(topic)); |
335 | | - |
336 | | - while (!Thread.currentThread().isInterrupted() && consumedLatch.getCount() > 0) { |
337 | | - final ConsumerRecords<String, String> records = |
338 | | - consumer.poll(Duration.ofMillis(500)); |
339 | | - if (!records.isEmpty()) { |
340 | | - consumedLatch.countDown(); |
341 | | - break; |
342 | | - } |
343 | | - } |
344 | | - } catch (Exception ignored) { |
345 | | - // local broker may not be available when running the sample |
346 | | - } |
347 | | - }, |
348 | | - "sentry-kafka-sample-consumer"); |
349 | | - consumerThread.start(); |
350 | | - return consumerThread; |
351 | | - } |
352 | | - |
353 | 257 | private static class SomeEventProcessor implements EventProcessor { |
354 | 258 | @Override |
355 | 259 | public SentryEvent process(SentryEvent event, Hint hint) { |
|
0 commit comments