Skip to content

Commit 78061b8

Browse files
If the partition count or kafka IO size is large, then skip committin… (#37510)
* If the partition count or kafka IO size is large, then skip committing offsets that are not changed. Reduce kafka commit load * Address PR review feedback for idle partition optimization - Refactor commitCheckpointMark to use Java streams (per @johnjcasey) Changed from explicit for-loop to streams-based filtering for better code consistency with existing patterns - Add debug logging for idle partitions (per @tomstepp) Log the count of idle partitions skipped during each commit to aid in monitoring and debugging the optimization - Implement time-based periodic commits (per @tomstepp) Track last commit time per partition and ensure commits happen at least every 10 minutes even for idle partitions. This supports time lag monitoring use cases where customers track time since last commit. - Add unit test for idle partition behavior (per @tomstepp) New test KafkaUnboundedReaderIdlePartitionTest verifies that: * Idle partitions are not committed repeatedly * Active partitions trigger commits correctly * Uses mock consumer to track commit calls All changes maintain backward compatibility and follow Apache Beam coding standards (spotless formatting applied). * Fix test to follow Beam patterns for MockConsumer initialization Rewrote KafkaUnboundedReaderIdlePartitionTest to follow the exact pattern used in KafkaIOTest.java: - Proper MockConsumer initialization with partition metadata - Correct setup of beginning/end offsets - Consumer records with proper offsets and timestamps - schedulePollTask for record enqueueing based on position - Override commitSync to track commit calls - Use reader.start() before reader.advance() This ensures the test properly initializes the Kafka consumer and doesn't fail with IllegalStateException during source.split(). --------- Co-authored-by: Kishore Pola <kpola@paloaltonetworks.com>
1 parent 927ee2c commit 78061b8

2 files changed

Lines changed: 361 additions & 29 deletions

File tree

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java

Lines changed: 88 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.ArrayList;
2424
import java.util.Collections;
2525
import java.util.Comparator;
26+
import java.util.HashMap;
2627
import java.util.Iterator;
2728
import java.util.List;
2829
import java.util.Map;
@@ -79,6 +80,11 @@
7980
*/
8081
class KafkaUnboundedReader<K, V> extends UnboundedReader<KafkaRecord<K, V>> {
8182

83+
// Track last successfully committed offsets to suppress no-op commits for idle partitions.
84+
private final Map<TopicPartition, Long> lastCommittedOffsets = new HashMap<>();
85+
// Track last commit time per partition to ensure periodic commits for time lag monitoring.
86+
private final Map<TopicPartition, Instant> lastCommitTimes = new HashMap<>();
87+
8288
///////////////////// Reader API ////////////////////////////////////////////////////////////
8389
@SuppressWarnings("FutureReturnValueIgnored")
8490
@Override
@@ -375,6 +381,8 @@ public boolean offsetBasedDeduplicationSupported() {
375381
private static final Duration RECORDS_DEQUEUE_POLL_TIMEOUT_MAX = Duration.millis(20);
376382
private static final Duration RECORDS_ENQUEUE_POLL_TIMEOUT = Duration.millis(100);
377383
private static final Duration MIN_COMMIT_FAIL_LOG_INTERVAL = Duration.standardMinutes(10);
384+
// Maximum time between commits for idle partitions (for time lag monitoring).
385+
private static final Duration MAX_IDLE_COMMIT_INTERVAL = Duration.standardMinutes(10);
378386

379387
// Use a separate thread to read Kafka messages. Kafka Consumer does all its work including
380388
// network I/O inside poll(). Polling only inside #advance(), especially with a small timeout
@@ -611,37 +619,88 @@ private void consumerPollLoop() {
611619

612620
private void commitCheckpointMark() {
613621
KafkaCheckpointMark checkpointMark = finalizedCheckpointMark.getAndSet(null);
614-
if (checkpointMark != null) {
615-
LOG.debug("{}: Committing finalized checkpoint {}", this, checkpointMark);
616-
Consumer<byte[], byte[]> consumer = Preconditions.checkStateNotNull(this.consumer);
617-
Instant now = Instant.now();
622+
if (checkpointMark == null) {
623+
return;
624+
}
618625

619-
try {
620-
consumer.commitSync(
621-
checkpointMark.getPartitions().stream()
622-
.filter(p -> p.getNextOffset() != UNINITIALIZED_OFFSET)
623-
.collect(
624-
Collectors.toMap(
625-
p -> new TopicPartition(p.getTopic(), p.getPartition()),
626-
p -> new OffsetAndMetadata(p.getNextOffset()))));
626+
LOG.debug("{}: Committing finalized checkpoint {}", this, checkpointMark);
627+
Consumer<byte[], byte[]> consumer = Preconditions.checkStateNotNull(this.consumer);
628+
Instant now = Instant.now();
629+
630+
try {
631+
// Commit only partitions whose offsets have advanced since the last successful commit
632+
// for this reader, or partitions that haven't been committed within MAX_IDLE_COMMIT_INTERVAL.
633+
// This suppresses no-op commits for idle partitions while ensuring periodic commits
634+
// for time lag monitoring.
635+
Map<TopicPartition, OffsetAndMetadata> toCommit =
636+
checkpointMark.getPartitions().stream()
637+
.filter(p -> p.getNextOffset() != UNINITIALIZED_OFFSET)
638+
.filter(
639+
p -> {
640+
TopicPartition tp = new TopicPartition(p.getTopic(), p.getPartition());
641+
Long prev = lastCommittedOffsets.get(tp);
642+
long next = p.getNextOffset();
643+
Instant lastCommitTime = lastCommitTimes.get(tp);
644+
645+
// Commit if offset has advanced
646+
if (prev == null || next > prev) {
647+
return true;
648+
}
649+
650+
// Also commit if partition hasn't been committed within max idle interval
651+
if (lastCommitTime == null
652+
|| now.isAfter(lastCommitTime.plus(MAX_IDLE_COMMIT_INTERVAL))) {
653+
return true;
654+
}
655+
656+
return false;
657+
})
658+
.collect(
659+
Collectors.toMap(
660+
p -> new TopicPartition(p.getTopic(), p.getPartition()),
661+
p -> new OffsetAndMetadata(p.getNextOffset())));
662+
663+
int totalPartitions = checkpointMark.getPartitions().size();
664+
int idlePartitions = totalPartitions - toCommit.size();
665+
if (idlePartitions > 0) {
666+
LOG.debug(
667+
"{}: Skipping commit for {} idle partitions ({} of {} partitions active)",
668+
this,
669+
idlePartitions,
670+
toCommit.size(),
671+
totalPartitions);
672+
}
673+
674+
if (toCommit.isEmpty()) {
675+
// Nothing advanced since last successful commit; avoid noisy commitSync().
676+
return;
677+
}
678+
679+
consumer.commitSync(toCommit);
680+
681+
// Only update after a successful commit.
682+
for (Map.Entry<TopicPartition, OffsetAndMetadata> e : toCommit.entrySet()) {
683+
lastCommittedOffsets.put(e.getKey(), e.getValue().offset());
684+
lastCommitTimes.put(e.getKey(), now);
685+
}
686+
687+
nextAllowedCommitFailLogTime = now.plus(MIN_COMMIT_FAIL_LOG_INTERVAL);
688+
} catch (Exception e) {
689+
// Log but ignore the exception. Committing consumer offsets to Kafka is not critical for
690+
// KafkaIO because it relies on the offsets stored in KafkaCheckpointMark.
691+
if (now.isAfter(nextAllowedCommitFailLogTime)) {
692+
LOG.warn(
693+
String.format(
694+
"%s: Did not successfully commit finalized checkpoint for > %s. Current checkpoint: %s",
695+
this, MIN_COMMIT_FAIL_LOG_INTERVAL, checkpointMark),
696+
e);
627697
nextAllowedCommitFailLogTime = now.plus(MIN_COMMIT_FAIL_LOG_INTERVAL);
628-
} catch (Exception e) {
629-
// Log but ignore the exception. Committing consumer offsets to Kafka is not critical for
630-
// KafkaIO because it relies on the offsets stored in KafkaCheckpointMark.
631-
if (now.isAfter(nextAllowedCommitFailLogTime)) {
632-
LOG.warn(
633-
String.format(
634-
"%s: Did not successfully commit finalized checkpoint for > %s. Current checkpoint: %s",
635-
this, MIN_COMMIT_FAIL_LOG_INTERVAL, checkpointMark),
636-
e);
637-
nextAllowedCommitFailLogTime = now.plus(MIN_COMMIT_FAIL_LOG_INTERVAL);
638-
} else {
639-
LOG.info(
640-
String.format(
641-
"%s: Could not commit finalized checkpoint. Commit will be retried with subsequent reads. Current checkpoint: %s",
642-
this, checkpointMark),
643-
e);
644-
}
698+
} else {
699+
LOG.info(
700+
String.format(
701+
"%s: Could not commit finalized checkpoint. Commit will be retried with subsequent reads. Current checkpoint: %s",
702+
this, checkpointMark),
703+
e);
645704
}
646705
}
647706
}

0 commit comments

Comments
 (0)