Skip to content

Commit 94b468a

Browse files
authored
[core] Drop unsafe global indexes during row-id reassign (#8166)
This PR updates metadata-only row-id reassignment to drop individual global index entries whose row ranges cannot be safely rewritten, instead of failing the whole row-id reassignment. When a global index entry's row range is not fully covered by the data-file row-id mapping, keeping the old entry would be unsafe because the index file still stores row IDs relative to the old range. Dropping only that entry lets row-id reassignment proceed while allowing the missing global index range to be rebuilt later.
1 parent 1384d72 commit 94b468a

4 files changed

Lines changed: 147 additions & 24 deletions

File tree

paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionRowIdReassigner.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import java.util.LinkedHashMap;
5757
import java.util.List;
5858
import java.util.Map;
59+
import java.util.Optional;
5960
import java.util.Set;
6061

6162
import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile;
@@ -665,12 +666,21 @@ private RewrittenIndexManifest rewriteIndexManifest(
665666
continue;
666667
}
667668

669+
Optional<Range> newRange = mappingIndex.map(globalIndex.rowRange());
670+
if (!newRange.isPresent()) {
671+
LOG.warn(
672+
"Drop global index file '{}' from table {} during row-id reassignment because its row range {} cannot be rewritten safely.",
673+
indexFile.fileName(),
674+
table.name(),
675+
globalIndex.rowRange());
676+
continue;
677+
}
678+
Range rewrittenRange = newRange.get();
668679
globalIndexFileCount++;
669-
Range newRange = mappingIndex.map(globalIndex.rowRange());
670680
GlobalIndexMeta newGlobalIndex =
671681
new GlobalIndexMeta(
672-
newRange.from,
673-
newRange.to,
682+
rewrittenRange.from,
683+
rewrittenRange.to,
674684
globalIndex.indexFieldId(),
675685
globalIndex.extraFieldIds(),
676686
globalIndex.indexMeta());

paimon-core/src/main/java/org/apache/paimon/append/dataevolution/RowRangeMappingIndex.java

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,9 @@
2424
import java.util.Collections;
2525
import java.util.Comparator;
2626
import java.util.List;
27+
import java.util.Optional;
2728

2829
import static org.apache.paimon.utils.Preconditions.checkArgument;
29-
import static org.apache.paimon.utils.Preconditions.checkState;
3030

3131
/** Index for row-range mappings. */
3232
final class RowRangeMappingIndex {
@@ -70,13 +70,14 @@ static Mapping mapping(long oldStart, long oldEnd, long newStart) {
7070
return new Mapping(oldStart, oldEnd, newStart);
7171
}
7272

73-
Range map(Range oldRange) {
73+
Optional<Range> map(Range oldRange) {
7474
checkArgument(oldRange != null, "Old row range cannot be null.");
7575
checkArgument(oldRange.from <= oldRange.to, "Invalid old row range %s.", oldRange);
7676

7777
long cursor = oldRange.from;
78-
Long newFrom = null;
78+
long newFrom = Long.MIN_VALUE;
7979
long newTo = Long.MIN_VALUE;
80+
boolean mapped = false;
8081

8182
for (int i = lowerBound(oldEnds, cursor); i < mappings.size(); i++) {
8283
Mapping mapping = mappings.get(i);
@@ -88,13 +89,11 @@ Range map(Range oldRange) {
8889
long segmentNewFrom = mapping.newStart + cursor - mapping.oldStart;
8990
long segmentNewTo = mapping.newStart + segmentTo - mapping.oldStart;
9091

91-
if (newFrom == null) {
92+
if (!mapped) {
9293
newFrom = segmentNewFrom;
93-
} else {
94-
checkState(
95-
newTo + 1 == segmentNewFrom,
96-
"Global index row range %s maps to non-contiguous new row range.",
97-
oldRange);
94+
mapped = true;
95+
} else if (newTo + 1 != segmentNewFrom) {
96+
return Optional.empty();
9897
}
9998
newTo = segmentNewTo;
10099
cursor = segmentTo + 1;
@@ -103,11 +102,10 @@ Range map(Range oldRange) {
103102
}
104103
}
105104

106-
checkState(
107-
cursor > oldRange.to && newFrom != null,
108-
"Global index row range %s is not fully covered by data file row-id mappings.",
109-
oldRange);
110-
return new Range(newFrom, newTo);
105+
if (cursor <= oldRange.to) {
106+
return Optional.empty();
107+
}
108+
return Optional.of(new Range(newFrom, newTo));
111109
}
112110

113111
private static int lowerBound(long[] sorted, long target) {

paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionRowIdReassignerTest.java

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,13 @@
2929
import org.apache.paimon.globalindex.btree.BTreeGlobalIndexBuilder;
3030
import org.apache.paimon.globalindex.btree.BTreeIndexOptions;
3131
import org.apache.paimon.index.GlobalIndexMeta;
32+
import org.apache.paimon.index.IndexFileMeta;
3233
import org.apache.paimon.io.DataFileMeta;
3334
import org.apache.paimon.manifest.FileEntry;
3435
import org.apache.paimon.manifest.FileKind;
3536
import org.apache.paimon.manifest.FileSource;
3637
import org.apache.paimon.manifest.IndexManifestEntry;
38+
import org.apache.paimon.manifest.IndexManifestFile;
3739
import org.apache.paimon.manifest.ManifestEntry;
3840
import org.apache.paimon.manifest.ManifestFile;
3941
import org.apache.paimon.manifest.ManifestFileMeta;
@@ -55,6 +57,7 @@
5557
import org.apache.paimon.types.DataTypes;
5658
import org.apache.paimon.utils.Pair;
5759
import org.apache.paimon.utils.Range;
60+
import org.apache.paimon.utils.SnapshotManager;
5861

5962
import org.junit.jupiter.api.Disabled;
6063
import org.junit.jupiter.api.Test;
@@ -385,6 +388,47 @@ public void testReassignGlobalIndexRowRanges() throws Exception {
385388
assertThat(readPayloads(table, predicate)).containsExactly("v4");
386389
}
387390

391+
@Test
392+
public void testDropUnsafeGlobalIndexEntryWhenRangeCannotBeRewrittenByMetadataOnly()
393+
throws Exception {
394+
FileStoreTable table = createTableWithInterleavedPartitions();
395+
createBTreeIndex(table);
396+
replaceGlobalIndexRangesWithPartitionSpanningRanges(table);
397+
Snapshot before = table.snapshotManager().latestSnapshot();
398+
399+
assertThat(globalIndexRanges(table))
400+
.containsExactly(
401+
new Range(0, 4),
402+
new Range(0, 4),
403+
new Range(0, 4),
404+
new Range(1, 3),
405+
new Range(1, 3));
406+
407+
DataEvolutionRowIdReassigner.Result result =
408+
new DataEvolutionRowIdReassigner(table)
409+
.reassign("test-drop-unsafe-sparse-index-range-entry");
410+
411+
assertThat(result.reassigned).isTrue();
412+
assertThat(result.skipReason).isNull();
413+
assertThat(result.previousSnapshotId).isEqualTo(before.id());
414+
assertThat(result.newSnapshotId).isEqualTo(before.id() + 1);
415+
assertThat(result.firstAssignedRowId).isEqualTo(5L);
416+
assertThat(result.nextRowId).isEqualTo(10L);
417+
assertThat(result.fileCount).isEqualTo(5L);
418+
assertThat(result.rowCount).isEqualTo(5L);
419+
assertThat(result.indexFileCount).isEqualTo(0L);
420+
421+
Map<String, List<Long>> rowIdsByPartition = rowIdsByPartition(table);
422+
assertThat(rowIdsByPartition).hasSize(2);
423+
assertThat(rowIdsByPartition).containsEntry("pt=a/", Arrays.asList(5L, 6L, 7L));
424+
assertThat(rowIdsByPartition).containsEntry("pt=b/", Arrays.asList(8L, 9L));
425+
assertThat(globalIndexRanges(table)).isEmpty();
426+
427+
Predicate predicate =
428+
new PredicateBuilder(table.rowType()).equal(table.rowType().getFieldIndex("id"), 4);
429+
assertThat(readPayloads(table, predicate)).containsExactly("v4");
430+
}
431+
388432
@Test
389433
public void testReassignManyOutOfOrderPartitionEntries() throws Exception {
390434
verifyReassignOutOfOrderPartitionEntries(LARGE_ENTRY_COUNT, LARGE_MANIFEST_FILE_COUNT);
@@ -883,6 +927,80 @@ private void createBTreeIndex(FileStoreTable table) throws Exception {
883927
}
884928
}
885929

930+
private void replaceGlobalIndexRangesWithPartitionSpanningRanges(FileStoreTable table)
931+
throws Exception {
932+
Snapshot latest = table.snapshotManager().latestSnapshot();
933+
IndexManifestFile indexManifestFile = table.store().indexManifestFileFactory().create();
934+
List<IndexManifestEntry> rewritten = new ArrayList<>();
935+
for (IndexManifestEntry entry : indexManifestFile.read(latest.indexManifest())) {
936+
GlobalIndexMeta globalIndex = entry.indexFile().globalIndexMeta();
937+
assertThat(globalIndex).isNotNull();
938+
939+
Range staleRowRange;
940+
String partition = table.store().pathFactory().getPartitionString(entry.partition());
941+
if (partition.equals("pt=a/")) {
942+
staleRowRange = new Range(0, 4);
943+
} else if (partition.equals("pt=b/")) {
944+
staleRowRange = new Range(1, 3);
945+
} else {
946+
throw new IllegalStateException("Unexpected partition " + partition);
947+
}
948+
949+
GlobalIndexMeta staleGlobalIndex =
950+
new GlobalIndexMeta(
951+
staleRowRange.from,
952+
staleRowRange.to,
953+
globalIndex.indexFieldId(),
954+
globalIndex.extraFieldIds(),
955+
globalIndex.indexMeta());
956+
IndexFileMeta indexFile = entry.indexFile();
957+
rewritten.add(
958+
new IndexManifestEntry(
959+
entry.kind(),
960+
entry.partition(),
961+
entry.bucket(),
962+
new IndexFileMeta(
963+
indexFile.indexType(),
964+
indexFile.fileName(),
965+
indexFile.fileSize(),
966+
indexFile.rowCount(),
967+
indexFile.dvRanges(),
968+
indexFile.externalPath(),
969+
staleGlobalIndex)));
970+
}
971+
972+
String staleIndexManifest = indexManifestFile.writeWithoutRolling(rewritten);
973+
Snapshot staleSnapshot =
974+
new Snapshot(
975+
latest.version(),
976+
latest.id(),
977+
latest.schemaId(),
978+
latest.baseManifestList(),
979+
latest.baseManifestListSize(),
980+
latest.deltaManifestList(),
981+
latest.deltaManifestListSize(),
982+
latest.changelogManifestList(),
983+
latest.changelogManifestListSize(),
984+
staleIndexManifest,
985+
latest.commitUser(),
986+
latest.commitIdentifier(),
987+
latest.commitKind(),
988+
latest.timeMillis(),
989+
latest.totalRecordCount(),
990+
latest.deltaRecordCount(),
991+
latest.changelogRecordCount(),
992+
latest.watermark(),
993+
latest.statistics(),
994+
latest.properties(),
995+
latest.nextRowId());
996+
SnapshotManager snapshotManager = table.snapshotManager();
997+
snapshotManager
998+
.fileIO()
999+
.overwriteFileUtf8(
1000+
snapshotManager.snapshotPath(latest.id()), staleSnapshot.toJson());
1001+
snapshotManager.invalidateCache();
1002+
}
1003+
8861004
private List<Range> globalIndexRanges(FileStoreTable table) {
8871005
List<Range> ranges = new ArrayList<>();
8881006
List<IndexManifestEntry> entries =

paimon-core/src/test/java/org/apache/paimon/append/dataevolution/RowRangeMappingIndexTest.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import java.util.Arrays;
2626

2727
import static org.assertj.core.api.Assertions.assertThat;
28-
import static org.assertj.core.api.Assertions.assertThatThrownBy;
2928

3029
/** Tests for {@link RowRangeMappingIndex}. */
3130
public class RowRangeMappingIndexTest {
@@ -36,7 +35,7 @@ public void testMapSingleRange() {
3635
RowRangeMappingIndex.create(
3736
Arrays.asList(RowRangeMappingIndex.mapping(10, 19, 100)));
3837

39-
assertThat(index.map(new Range(12, 15))).isEqualTo(new Range(102, 105));
38+
assertThat(index.map(new Range(12, 15))).hasValue(new Range(102, 105));
4039
}
4140

4241
@Test
@@ -48,7 +47,7 @@ public void testMapAcrossContiguousRanges() {
4847
RowRangeMappingIndex.mapping(15, 19, 105),
4948
RowRangeMappingIndex.mapping(20, 24, 110)));
5049

51-
assertThat(index.map(new Range(12, 22))).isEqualTo(new Range(102, 112));
50+
assertThat(index.map(new Range(12, 22))).hasValue(new Range(102, 112));
5251
}
5352

5453
@Test
@@ -59,8 +58,7 @@ public void testMapFailsWhenOldRangeIsNotCovered() {
5958
RowRangeMappingIndex.mapping(10, 14, 100),
6059
RowRangeMappingIndex.mapping(20, 24, 105)));
6160

62-
assertThatThrownBy(() -> index.map(new Range(12, 22)))
63-
.hasMessageContaining("is not fully covered");
61+
assertThat(index.map(new Range(12, 22))).isEmpty();
6462
}
6563

6664
@Test
@@ -71,7 +69,6 @@ public void testMapFailsWhenNewRangeIsNotContiguous() {
7169
RowRangeMappingIndex.mapping(10, 14, 100),
7270
RowRangeMappingIndex.mapping(15, 19, 200)));
7371

74-
assertThatThrownBy(() -> index.map(new Range(12, 17)))
75-
.hasMessageContaining("maps to non-contiguous new row range");
72+
assertThat(index.map(new Range(12, 17))).isEmpty();
7673
}
7774
}

0 commit comments

Comments
 (0)