Commit fa956d8

astubbs and claude committed
fix: make committed offset accurate on partition assignment (confluentinc#893)
Cherry-pick of confluentinc#893 (author: sangreal).

Race condition in PartitionState: createOffsetAndMetadata() called tryToEncodeOffsets() and getOffsetToCommit() separately. Between the two calls, incompletes could drain, causing a higher offset to be committed than intended. After rebalance, the consumer fetches a non-existent offset and triggers auto.offset.reset (data loss or replay).

Fix: tryToEncodeOffsets() now calls getOffsetToCommit() once at the top and returns a Tuple<Optional<String>, Long> so the offset and payload are computed atomically from the same state snapshot.

Upstream PR: confluentinc#893
Approved by Roman Kolesnev, run in production for >1 week.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 37e57fb commit fa956d8
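
The race described in the commit message can be reproduced deterministically in a toy model. The sketch below is not the library's code: `ToyPartitionState`, `twoReads`, `oneRead`, and the `betweenReads` hook are hypothetical names, and the hook merely stands in for a worker thread that completes records between the two reads. It assumes the offset to commit is the lowest incomplete offset, or one past the highest succeeded offset when nothing is pending.

```java
import java.util.TreeSet;

class OffsetSnapshotSketch {

    // Hypothetical toy model of per-partition offset state; not the real PartitionState.
    static final class ToyPartitionState {
        private final TreeSet<Long> incompleteOffsets = new TreeSet<>();
        private final long highestSucceeded;

        ToyPartitionState(long highestSucceeded, long... incompletes) {
            this.highestSucceeded = highestSucceeded;
            for (long offset : incompletes) {
                incompleteOffsets.add(offset);
            }
        }

        // Assumption: lowest incomplete offset, else one past the highest succeeded offset.
        synchronized long getOffsetToCommit() {
            return incompleteOffsets.isEmpty() ? highestSucceeded + 1 : incompleteOffsets.first();
        }

        // Simulates a worker finishing a record.
        synchronized void complete(long offset) {
            incompleteOffsets.remove(offset);
        }
    }

    // Old shape: the offset is read twice, and state may change between the reads.
    // Returns {offset the payload was encoded for, offset that gets committed}.
    static long[] twoReads(ToyPartitionState state, Runnable betweenReads) {
        long encodedFor = state.getOffsetToCommit();
        betweenReads.run();
        long committed = state.getOffsetToCommit();
        return new long[]{encodedFor, committed};
    }

    // New shape: the offset is read once and reused for both purposes.
    static long[] oneRead(ToyPartitionState state, Runnable betweenReads) {
        long snapshot = state.getOffsetToCommit();
        betweenReads.run();
        return new long[]{snapshot, snapshot};
    }

    public static void main(String[] args) {
        ToyPartitionState a = new ToyPartitionState(12L, 10L, 11L);
        long[] racy = twoReads(a, () -> { a.complete(10L); a.complete(11L); });
        System.out.println("two reads: encoded for " + racy[0] + ", committed " + racy[1]);

        ToyPartitionState b = new ToyPartitionState(12L, 10L, 11L);
        long[] safe = oneRead(b, () -> { b.complete(10L); b.complete(11L); });
        System.out.println("one read:  encoded for " + safe[0] + ", committed " + safe[1]);
    }
}
```

In the two-read variant the payload is built for offset 10 while offset 13 is committed, which is the kind of mismatch the fix removes. With a single read, both values come from the same snapshot even though the state changes afterwards.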

1 file changed

Lines changed: 13 additions & 8 deletions

parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java

@@ -4,6 +4,7 @@
  * Copyright (C) 2020-2024 Confluent, Inc.
  */
 
+import io.confluent.parallelconsumer.ParallelConsumer;
 import io.confluent.parallelconsumer.internal.BrokerPollSystem;
 import io.confluent.parallelconsumer.internal.EpochAndRecordsMap;
 import io.confluent.parallelconsumer.internal.PCModule;
@@ -410,8 +411,11 @@ public Optional<OffsetAndMetadata> getCommitDataIfDirty() {
 
     // visible for testing
     protected OffsetAndMetadata createOffsetAndMetadata() {
-        Optional<String> payloadOpt = tryToEncodeOffsets();
-        long nextOffset = getOffsetToCommit();
+        // use tuple to make sure getOffsetToCommit is invoked only once to avoid dirty read
+        // and commit the wrong offset
+        ParallelConsumer.Tuple<Optional<String>, Long> tuple = tryToEncodeOffsets();
+        Optional<String> payloadOpt = tuple.getLeft();
+        long nextOffset = tuple.getRight();
         return payloadOpt
                 .map(encodedOffsets -> new OffsetAndMetadata(nextOffset, encodedOffsets))
                 .orElseGet(() -> new OffsetAndMetadata(nextOffset));
@@ -481,29 +485,30 @@ public long getOffsetHighestSequentialSucceeded() {
      *
      * @return if possible, the String encoded offset map
      */
-    private Optional<String> tryToEncodeOffsets() {
+    private ParallelConsumer.Tuple<Optional<String>, Long> tryToEncodeOffsets() {
+        long offsetOfNextExpectedMessage = getOffsetToCommit();
+
         if (incompleteOffsets.isEmpty()) {
             setAllowedMoreRecords(true);
-            return empty();
+            return ParallelConsumer.Tuple.pairOf(empty(), offsetOfNextExpectedMessage);
         }
 
         try {
             // todo refactor use of null shouldn't be needed. Is OffsetMapCodecManager stateful? remove null #233
-            long offsetOfNextExpectedMessage = getOffsetToCommit();
             var offsetRange = getOffsetHighestSucceeded() - offsetOfNextExpectedMessage;
             String offsetMapPayload = om.makeOffsetMetadataPayload(offsetOfNextExpectedMessage, this);
             ratioPayloadUsedDistributionSummary.record(offsetMapPayload.length() / (double) offsetRange);
             ratioMetadataSpaceUsedDistributionSummary.record(offsetMapPayload.length() / (double) OffsetMapCodecManager.DefaultMaxMetadataSize);
             boolean mustStrip = updateBlockFromEncodingResult(offsetMapPayload);
             if (mustStrip) {
-                return empty();
+                return ParallelConsumer.Tuple.pairOf(empty(), offsetOfNextExpectedMessage);
             } else {
-                return of(offsetMapPayload);
+                return ParallelConsumer.Tuple.pairOf(of(offsetMapPayload), offsetOfNextExpectedMessage);
             }
         } catch (NoEncodingPossibleException e) {
             setAllowedMoreRecords(false);
             log.warn("No encodings could be used to encode the offset map, skipping. Warning: messages might be replayed on rebalance.", e);
-            return empty();
+            return ParallelConsumer.Tuple.pairOf(empty(), offsetOfNextExpectedMessage);
         }
     }
 
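
The shape of the change in the hunks above, where every return path carries the offset that was read once at the top and the caller unpacks the pair, can be sketched without any Kafka dependency. This is only an illustration under stated assumptions: `Pair` is a hypothetical stand-in for `ParallelConsumer.Tuple`, `CommitData` is a hypothetical stand-in for Kafka's `OffsetAndMetadata`, and the boolean flags replace the real encoding logic.

```java
import java.util.Optional;

class TupleReturnSketch {

    // Hypothetical stand-in for ParallelConsumer.Tuple; only the shape matters here.
    static final class Pair<L, R> {
        final L left;
        final R right;

        Pair(L left, R right) {
            this.left = left;
            this.right = right;
        }
    }

    // Hypothetical stand-in for Kafka's OffsetAndMetadata.
    static final class CommitData {
        final long offset;
        final String metadata;

        CommitData(long offset, String metadata) {
            this.offset = offset;
            this.metadata = metadata;
        }
    }

    // The offset is passed in as a single snapshot; every return path reuses it.
    // The flags are illustrative placeholders for the real encoding decisions.
    static Pair<Optional<String>, Long> encode(long offsetToCommit, boolean hasIncompletes, boolean payloadFits) {
        if (!hasIncompletes || !payloadFits) {
            return new Pair<>(Optional.empty(), offsetToCommit);
        }
        return new Pair<>(Optional.of("payload@" + offsetToCommit), offsetToCommit);
    }

    // Mirrors the caller in the diff: unpack once, then build commit data with or without a payload.
    static CommitData toCommitData(Pair<Optional<String>, Long> result) {
        long nextOffset = result.right;
        return result.left
                .map(payload -> new CommitData(nextOffset, payload))
                .orElseGet(() -> new CommitData(nextOffset, ""));
    }

    public static void main(String[] args) {
        CommitData withPayload = toCommitData(encode(10L, true, true));
        CommitData withoutPayload = toCommitData(encode(10L, false, true));
        System.out.println(withPayload.offset + " " + withPayload.metadata);
        System.out.println(withoutPayload.offset + " [" + withoutPayload.metadata + "]");
    }
}
```

Returning the pair keeps the offset and the payload tied to one read of the state, so the caller has no reason to query the offset a second time.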