Skip to content

Commit 67997a3

Browse files
Remove Guava Stopwatch from Kafka consumer.poll() loop
Replace Stopwatch (backed by System.nanoTime()) with lightweight System.currentTimeMillis() for timeout tracking in the hot consumer.poll() loop. The Stopwatch was adding more latency than it measured when Kafka had prefetched records ready to return immediately. This change keeps the updateSuccessfulRpcMetrics() calls and all shared metric infrastructure (KafkaMetrics, KafkaSinkMetrics) intact.
1 parent f47f81c commit 67997a3

2 files changed

Lines changed: 5 additions & 14 deletions

File tree

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java

Lines changed: 3 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -53,7 +53,6 @@
5353
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
5454
import org.apache.beam.sdk.util.Preconditions;
5555
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
56-
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Stopwatch;
5756
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
5857
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterators;
5958
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.Closeables;
@@ -411,7 +410,6 @@ public boolean offsetBasedDeduplicationSupported() {
411410
private static Instant initialWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
412411

413412
private KafkaMetrics kafkaResults = KafkaSinkMetrics.kafkaMetrics();
414-
private Stopwatch stopwatch = Stopwatch.createUnstarted();
415413

416414
private Set<String> kafkaTopics;
417415

@@ -580,13 +578,12 @@ private void consumerPollLoop() {
580578
while (!closed.get()) {
581579
try {
582580
if (records.isEmpty()) {
583-
stopwatch.start();
581+
long startMillis = System.currentTimeMillis();
584582
records = consumer.poll(KAFKA_POLL_TIMEOUT.getMillis());
585-
stopwatch.stop();
583+
long elapsedMillis = System.currentTimeMillis() - startMillis;
586584
for (String kafkaTopic : kafkaTopics) {
587585
kafkaResults.updateSuccessfulRpcMetrics(
588-
kafkaTopic,
589-
java.time.Duration.ofMillis(stopwatch.elapsed(TimeUnit.MILLISECONDS)));
586+
kafkaTopic, java.time.Duration.ofMillis(elapsedMillis));
590587
}
591588
} else if (availableRecordsQueue.offer(
592589
records, RECORDS_ENQUEUE_POLL_TIMEOUT.getMillis(), TimeUnit.MILLISECONDS)) {

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java

Lines changed: 2 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -58,7 +58,6 @@
5858
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
5959
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Joiner;
6060
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
61-
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Stopwatch;
6261
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
6362
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheLoader;
6463
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LoadingCache;
@@ -567,19 +566,14 @@ public ProcessContinuation processElement(
567566
long expectedOffset = tracker.currentRestriction().getFrom();
568567
consumer.resume(Collections.singleton(topicPartition));
569568
consumer.seek(topicPartition, expectedOffset);
570-
final Stopwatch pollTimer = Stopwatch.createUnstarted();
571569

572570
final KafkaMetrics kafkaMetrics = KafkaSinkMetrics.kafkaMetrics();
573571
try {
574572
while (Duration.ZERO.compareTo(remainingTimeout) < 0) {
575-
// TODO: Remove this timer and use the existing fetch-latency-avg metric.
576-
// A consumer will often have prefetches waiting to be returned immediately in which case
577-
// this timer may contribute more latency than it measures.
578-
// See https://shipilev.net/blog/2014/nanotrusting-nanotime/ for more information.
579-
pollTimer.reset().start();
573+
long startMillis = System.currentTimeMillis();
580574
// Fetch the next records.
581575
final ConsumerRecords<byte[], byte[]> rawRecords = consumer.poll(remainingTimeout);
582-
final Duration elapsed = pollTimer.elapsed();
576+
final Duration elapsed = Duration.ofMillis(System.currentTimeMillis() - startMillis);
583577
try {
584578
remainingTimeout = remainingTimeout.minus(elapsed);
585579
} catch (ArithmeticException e) {

0 commit comments

Comments
 (0)