Skip to content

Commit a4e21ac

Browse files
committed
fix comments
1 parent 33454d4 commit a4e21ac

3 files changed

Lines changed: 63 additions & 28 deletions

File tree

paimon-common/src/main/java/org/apache/paimon/utils/RowRangeIndex.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,14 +43,28 @@ private RowRangeIndex(List<Range> ranges) {
4343
}
4444

4545
public static RowRangeIndex create(List<Range> ranges) {
46+
return create(ranges, true);
47+
}
48+
49+
public static RowRangeIndex create(List<Range> ranges, boolean mergeAdjacent) {
4650
checkArgument(ranges != null, "Ranges cannot be null");
47-
return new RowRangeIndex(Range.sortAndMergeOverlap(ranges, true));
51+
return new RowRangeIndex(Range.sortAndMergeOverlap(ranges, mergeAdjacent));
4852
}
4953

5054
public List<Range> ranges() {
5155
return Collections.unmodifiableList(ranges);
5256
}
5357

58+
public boolean contains(long start, long end) {
59+
int candidate = lowerBound(ends, start);
60+
return candidate < starts.length && starts[candidate] <= start && ends[candidate] >= end;
61+
}
62+
63+
public boolean containsExactly(long start, long end) {
64+
int candidate = lowerBound(starts, start);
65+
return candidate < starts.length && starts[candidate] == start && ends[candidate] == end;
66+
}
67+
5468
public boolean intersects(long start, long end) {
5569
int candidate = lowerBound(ends, start);
5670
return candidate < starts.length && starts[candidate] <= end;

paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java

Lines changed: 14 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,9 @@
3737
import org.apache.paimon.types.RowType;
3838
import org.apache.paimon.utils.FileStorePathFactory;
3939
import org.apache.paimon.utils.Pair;
40+
import org.apache.paimon.utils.Range;
4041
import org.apache.paimon.utils.RangeHelper;
42+
import org.apache.paimon.utils.RowRangeIndex;
4143
import org.apache.paimon.utils.SnapshotManager;
4244

4345
import org.slf4j.Logger;
@@ -54,12 +56,9 @@
5456
import java.util.LinkedHashMap;
5557
import java.util.List;
5658
import java.util.Map;
57-
import java.util.Map.Entry;
58-
import java.util.NavigableMap;
5959
import java.util.Objects;
6060
import java.util.Optional;
6161
import java.util.Set;
62-
import java.util.TreeMap;
6362
import java.util.function.Function;
6463
import java.util.stream.Collectors;
6564

@@ -569,20 +568,22 @@ Optional<RuntimeException> checkRowIdExistence(
569568
return Optional.empty();
570569
}
571570

572-
NavigableMap<Long, Long> existingRanges = new TreeMap<>();
573-
for (SimpleFileEntry base : baseEntries) {
574-
if (base.firstRowId() != null && !dedicatedStorageFile(base.fileName())) {
575-
existingRanges.put(base.firstRowId(), base.rowCount());
576-
}
577-
}
571+
List<Range> existingRanges =
572+
baseEntries.stream()
573+
.filter(
574+
base ->
575+
base.firstRowId() != null
576+
&& !dedicatedStorageFile(base.fileName()))
577+
.map(SimpleFileEntry::nonNullRowIdRange)
578+
.collect(Collectors.toList());
579+
RowRangeIndex existingIndex = RowRangeIndex.create(existingRanges, false);
578580

579581
for (SimpleFileEntry entry : filesToCheck) {
582+
long rowRangeEnd = entry.firstRowId() + entry.rowCount() - 1;
580583
boolean exists =
581584
dedicatedStorageFile(entry.fileName())
582-
? rowIdRangeCovered(
583-
existingRanges, entry.firstRowId(), entry.rowCount())
584-
: rowIdRangeExists(
585-
existingRanges, entry.firstRowId(), entry.rowCount());
585+
? existingIndex.contains(entry.firstRowId(), rowRangeEnd)
586+
: existingIndex.containsExactly(entry.firstRowId(), rowRangeEnd);
586587
if (!exists) {
587588
return Optional.of(
588589
new RuntimeException(
@@ -601,20 +602,6 @@ Optional<RuntimeException> checkRowIdExistence(
601602
return Optional.empty();
602603
}
603604

604-
private static boolean rowIdRangeCovered(
605-
NavigableMap<Long, Long> ranges, long firstRowId, long rowCount) {
606-
Entry<Long, Long> range = ranges.floorEntry(firstRowId);
607-
return range != null
608-
&& range.getKey() <= firstRowId
609-
&& range.getKey() + range.getValue() >= firstRowId + rowCount;
610-
}
611-
612-
private static boolean rowIdRangeExists(
613-
NavigableMap<Long, Long> ranges, long firstRowId, long rowCount) {
614-
Long existingRowCount = ranges.get(firstRowId);
615-
return existingRowCount != null && existingRowCount == rowCount;
616-
}
617-
618605
private static boolean dedicatedStorageFile(String fileName) {
619606
return isBlobFile(fileName) || isVectorStoreFile(fileName);
620607
}

paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -419,6 +419,23 @@ void testCheckRowIdExistenceBaseFileRewritten() {
419419
assertThat(result.get().getMessage()).contains("Row ID existence conflict");
420420
}
421421

422+
@Test
423+
void testCheckRowIdExistenceNormalFileRejectsAdjacentDataFiles() {
424+
ConflictDetection detection = createConflictDetection();
425+
426+
List<SimpleFileEntry> baseEntries = new ArrayList<>();
427+
baseEntries.add(createFileEntryWithRowId("f1", ADD, 0L, 2L));
428+
baseEntries.add(createFileEntryWithRowId("f2", ADD, 2L, 2L));
429+
430+
List<SimpleFileEntry> deltaEntries = new ArrayList<>();
431+
deltaEntries.add(createFileEntryWithRowId("p1", ADD, 0L, 4L));
432+
433+
Optional<RuntimeException> result =
434+
detection.checkRowIdExistence(baseEntries, deltaEntries, 4L);
435+
assertThat(result).isPresent();
436+
assertThat(result.get().getMessage()).contains("Row ID existence conflict");
437+
}
438+
422439
@Test
423440
void testCheckRowIdExistenceDedicatedFileCoveredByDataFiles() {
424441
ConflictDetection detection = createConflictDetection();
@@ -432,6 +449,23 @@ void testCheckRowIdExistenceDedicatedFileCoveredByDataFiles() {
432449
assertThat(detection.checkRowIdExistence(baseEntries, deltaEntries, 4L)).isEmpty();
433450
}
434451

452+
@Test
453+
void testCheckRowIdExistenceDedicatedFileRejectsAdjacentDataFiles() {
454+
ConflictDetection detection = createConflictDetection();
455+
456+
List<SimpleFileEntry> baseEntries = new ArrayList<>();
457+
baseEntries.add(createFileEntryWithRowId("f1", ADD, 0L, 2L));
458+
baseEntries.add(createFileEntryWithRowId("f2", ADD, 2L, 2L));
459+
460+
List<SimpleFileEntry> deltaEntries = new ArrayList<>();
461+
deltaEntries.add(createFileEntryWithRowId("p1.blob", ADD, 0L, 4L));
462+
463+
Optional<RuntimeException> result =
464+
detection.checkRowIdExistence(baseEntries, deltaEntries, 4L);
465+
assertThat(result).isPresent();
466+
assertThat(result.get().getMessage()).contains("Row ID existence conflict");
467+
}
468+
435469
@Test
436470
void testCheckRowIdExistenceDedicatedFileRejectsRangeNotCoveredByOneDataFile() {
437471
ConflictDetection detection = createConflictDetection();

0 commit comments

Comments
 (0)