Skip to content

Commit 99e4d0c

Browse files
committed
fix comments
1 parent 3faf1c8 commit 99e4d0c

3 files changed

Lines changed: 61 additions & 28 deletions

File tree

paimon-common/src/main/java/org/apache/paimon/utils/RowRangeIndex.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,14 +43,28 @@ private RowRangeIndex(List<Range> ranges) {
4343
}
4444

4545
public static RowRangeIndex create(List<Range> ranges) {
46+
return create(ranges, true);
47+
}
48+
49+
public static RowRangeIndex create(List<Range> ranges, boolean mergeAdjacent) {
4650
checkArgument(ranges != null, "Ranges cannot be null");
47-
return new RowRangeIndex(Range.sortAndMergeOverlap(ranges, true));
51+
return new RowRangeIndex(Range.sortAndMergeOverlap(ranges, mergeAdjacent));
4852
}
4953

5054
public List<Range> ranges() {
5155
return Collections.unmodifiableList(ranges);
5256
}
5357

58+
public boolean contains(long start, long end) {
59+
int candidate = lowerBound(ends, start);
60+
return candidate < starts.length && starts[candidate] <= start && ends[candidate] >= end;
61+
}
62+
63+
public boolean containsExactly(long start, long end) {
64+
int candidate = lowerBound(starts, start);
65+
return candidate < starts.length && starts[candidate] == start && ends[candidate] == end;
66+
}
67+
5468
public boolean intersects(long start, long end) {
5569
int candidate = lowerBound(ends, start);
5670
return candidate < starts.length && starts[candidate] <= end;

paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java

Lines changed: 12 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -57,12 +57,9 @@
5757
import java.util.LinkedHashMap;
5858
import java.util.List;
5959
import java.util.Map;
60-
import java.util.Map.Entry;
61-
import java.util.NavigableMap;
6260
import java.util.Objects;
6361
import java.util.Optional;
6462
import java.util.Set;
65-
import java.util.TreeMap;
6663
import java.util.function.Function;
6764
import java.util.stream.Collectors;
6865

@@ -636,20 +633,22 @@ Optional<RuntimeException> checkRowIdExistence(
636633
return Optional.empty();
637634
}
638635

639-
NavigableMap<Long, Long> existingRanges = new TreeMap<>();
640-
for (SimpleFileEntry base : baseEntries) {
641-
if (base.firstRowId() != null && !dedicatedStorageFile(base.fileName())) {
642-
existingRanges.put(base.firstRowId(), base.rowCount());
643-
}
644-
}
636+
List<Range> existingRanges =
637+
baseEntries.stream()
638+
.filter(
639+
base ->
640+
base.firstRowId() != null
641+
&& !dedicatedStorageFile(base.fileName()))
642+
.map(SimpleFileEntry::nonNullRowIdRange)
643+
.collect(Collectors.toList());
644+
RowRangeIndex existingIndex = RowRangeIndex.create(existingRanges, false);
645645

646646
for (SimpleFileEntry entry : filesToCheck) {
647+
long rowRangeEnd = entry.firstRowId() + entry.rowCount() - 1;
647648
boolean exists =
648649
dedicatedStorageFile(entry.fileName())
649-
? rowIdRangeCovered(
650-
existingRanges, entry.firstRowId(), entry.rowCount())
651-
: rowIdRangeExists(
652-
existingRanges, entry.firstRowId(), entry.rowCount());
650+
? existingIndex.contains(entry.firstRowId(), rowRangeEnd)
651+
: existingIndex.containsExactly(entry.firstRowId(), rowRangeEnd);
653652
if (!exists) {
654653
return Optional.of(
655654
new RuntimeException(
@@ -668,20 +667,6 @@ Optional<RuntimeException> checkRowIdExistence(
668667
return Optional.empty();
669668
}
670669

671-
private static boolean rowIdRangeCovered(
672-
NavigableMap<Long, Long> ranges, long firstRowId, long rowCount) {
673-
Entry<Long, Long> range = ranges.floorEntry(firstRowId);
674-
return range != null
675-
&& range.getKey() <= firstRowId
676-
&& range.getKey() + range.getValue() >= firstRowId + rowCount;
677-
}
678-
679-
private static boolean rowIdRangeExists(
680-
NavigableMap<Long, Long> ranges, long firstRowId, long rowCount) {
681-
Long existingRowCount = ranges.get(firstRowId);
682-
return existingRowCount != null && existingRowCount == rowCount;
683-
}
684-
685670
private static boolean dedicatedStorageFile(String fileName) {
686671
return isBlobFile(fileName) || isVectorStoreFile(fileName);
687672
}

paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -454,6 +454,23 @@ void testCheckRowIdExistenceBaseFileRewritten() {
454454
assertThat(result.get().getMessage()).contains("Row ID existence conflict");
455455
}
456456

457+
@Test
458+
void testCheckRowIdExistenceNormalFileRejectsAdjacentDataFiles() {
459+
ConflictDetection detection = createConflictDetection();
460+
461+
List<SimpleFileEntry> baseEntries = new ArrayList<>();
462+
baseEntries.add(createFileEntryWithRowId("f1", ADD, 0L, 2L));
463+
baseEntries.add(createFileEntryWithRowId("f2", ADD, 2L, 2L));
464+
465+
List<SimpleFileEntry> deltaEntries = new ArrayList<>();
466+
deltaEntries.add(createFileEntryWithRowId("p1", ADD, 0L, 4L));
467+
468+
Optional<RuntimeException> result =
469+
detection.checkRowIdExistence(baseEntries, deltaEntries, 4L);
470+
assertThat(result).isPresent();
471+
assertThat(result.get().getMessage()).contains("Row ID existence conflict");
472+
}
473+
457474
@Test
458475
void testCheckRowIdExistenceDedicatedFileCoveredByDataFiles() {
459476
ConflictDetection detection = createConflictDetection();
@@ -467,6 +484,23 @@ void testCheckRowIdExistenceDedicatedFileCoveredByDataFiles() {
467484
assertThat(detection.checkRowIdExistence(baseEntries, deltaEntries, 4L)).isEmpty();
468485
}
469486

487+
@Test
488+
void testCheckRowIdExistenceDedicatedFileRejectsAdjacentDataFiles() {
489+
ConflictDetection detection = createConflictDetection();
490+
491+
List<SimpleFileEntry> baseEntries = new ArrayList<>();
492+
baseEntries.add(createFileEntryWithRowId("f1", ADD, 0L, 2L));
493+
baseEntries.add(createFileEntryWithRowId("f2", ADD, 2L, 2L));
494+
495+
List<SimpleFileEntry> deltaEntries = new ArrayList<>();
496+
deltaEntries.add(createFileEntryWithRowId("p1.blob", ADD, 0L, 4L));
497+
498+
Optional<RuntimeException> result =
499+
detection.checkRowIdExistence(baseEntries, deltaEntries, 4L);
500+
assertThat(result).isPresent();
501+
assertThat(result.get().getMessage()).contains("Row ID existence conflict");
502+
}
503+
470504
@Test
471505
void testCheckRowIdExistenceDedicatedFileRejectsRangeNotCoveredByOneDataFile() {
472506
ConflictDetection detection = createConflictDetection();

0 commit comments

Comments
 (0)