Skip to content

Commit bfdd516

Browse files
authored
[core] Minor refactor conflicts check for deletion vectors files (#6369)
1 parent d0fc63c commit bfdd516

5 files changed

Lines changed: 163 additions & 74 deletions

File tree

paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -299,7 +299,6 @@ public FileStoreCommitImpl newCommit(String commitUser, FileStoreTable table) {
299299
options.commitMaxRetryWait(),
300300
options.commitStrictModeLastSafeSnapshot().orElse(null),
301301
options.rowTrackingEnabled(),
302-
!schema.primaryKeys().isEmpty(),
303302
options.deletionVectorsEnabled(),
304303
newIndexFileHandler());
305304
}

paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFileHandler.java

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.paimon.manifest;
2020

2121
import org.apache.paimon.data.BinaryRow;
22+
import org.apache.paimon.index.DeletionVectorMeta;
2223
import org.apache.paimon.index.IndexFileMeta;
2324
import org.apache.paimon.table.BucketMode;
2425
import org.apache.paimon.utils.Pair;
@@ -27,14 +28,18 @@
2728

2829
import java.util.ArrayList;
2930
import java.util.HashMap;
31+
import java.util.HashSet;
32+
import java.util.LinkedHashMap;
3033
import java.util.List;
3134
import java.util.Map;
3235
import java.util.Objects;
36+
import java.util.Set;
3337
import java.util.stream.Collectors;
3438

3539
import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
3640
import static org.apache.paimon.index.HashIndexFile.HASH_INDEX;
3741
import static org.apache.paimon.utils.Preconditions.checkArgument;
42+
import static org.apache.paimon.utils.Preconditions.checkState;
3843

3944
/** IndexManifestFile Handler. */
4045
public class IndexManifestFileHandler {
@@ -115,15 +120,48 @@ static class GlobalCombiner implements IndexManifestFileCombiner {
115120
public List<IndexManifestEntry> combine(
116121
List<IndexManifestEntry> prevIndexFiles, List<IndexManifestEntry> newIndexFiles) {
117122
Map<String, IndexManifestEntry> indexEntries = new HashMap<>();
123+
Set<String> dvDataFiles = new HashSet<>();
118124
for (IndexManifestEntry entry : prevIndexFiles) {
119125
indexEntries.put(entry.indexFile().fileName(), entry);
126+
LinkedHashMap<String, DeletionVectorMeta> dvRanges = entry.indexFile().dvRanges();
127+
if (dvRanges != null) {
128+
dvDataFiles.addAll(dvRanges.keySet());
129+
}
120130
}
121131

122132
for (IndexManifestEntry entry : newIndexFiles) {
133+
String fileName = entry.indexFile().fileName();
134+
LinkedHashMap<String, DeletionVectorMeta> dvRanges = entry.indexFile().dvRanges();
123135
if (entry.kind() == FileKind.ADD) {
124-
indexEntries.put(entry.indexFile().fileName(), entry);
136+
checkState(
137+
!indexEntries.containsKey(fileName),
138+
"Trying to add file %s which is already added.",
139+
fileName);
140+
if (dvRanges != null) {
141+
for (String dataFile : dvRanges.keySet()) {
142+
checkState(
143+
!dvDataFiles.contains(dataFile),
144+
"Trying to add dv for data file %s which is already added.",
145+
dataFile);
146+
dvDataFiles.add(dataFile);
147+
}
148+
}
149+
indexEntries.put(fileName, entry);
125150
} else {
126-
indexEntries.remove(entry.indexFile().fileName());
151+
checkState(
152+
indexEntries.containsKey(fileName),
153+
"Trying to delete file %s which is not exists.",
154+
fileName);
155+
if (dvRanges != null) {
156+
for (String dataFile : dvRanges.keySet()) {
157+
checkState(
158+
dvDataFiles.contains(dataFile),
159+
"Trying to delete dv for data file %s which is not exists.",
160+
dataFile);
161+
dvDataFiles.remove(dataFile);
162+
}
163+
}
164+
indexEntries.remove(fileName);
127165
}
128166
}
129167
return new ArrayList<>(indexEntries.values());

paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java

Lines changed: 69 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,6 @@ public class FileStoreCommitImpl implements FileStoreCommit {
154154
@Nullable private Long strictModeLastSafeSnapshot;
155155
private final InternalRowPartitionComputer partitionComputer;
156156
private final boolean rowTrackingEnabled;
157-
private final boolean isPkTable;
158157
private final boolean deletionVectorsEnabled;
159158
private final IndexFileHandler indexFileHandler;
160159

@@ -194,7 +193,6 @@ public FileStoreCommitImpl(
194193
long commitMaxRetryWait,
195194
@Nullable Long strictModeLastSafeSnapshot,
196195
boolean rowTrackingEnabled,
197-
boolean isPkTable,
198196
boolean deletionVectorsEnabled,
199197
IndexFileHandler indexFileHandler) {
200198
this.snapshotCommit = snapshotCommit;
@@ -240,7 +238,6 @@ public FileStoreCommitImpl(
240238
this.statsFileHandler = statsFileHandler;
241239
this.bucketMode = bucketMode;
242240
this.rowTrackingEnabled = rowTrackingEnabled;
243-
this.isPkTable = isPkTable;
244241
this.deletionVectorsEnabled = deletionVectorsEnabled;
245242
this.indexFileHandler = indexFileHandler;
246243
}
@@ -728,23 +725,23 @@ private void collectChanges(
728725
.forEach(m -> appendChangelog.add(makeEntry(FileKind.ADD, commitMessage, m)));
729726
commitMessage
730727
.newFilesIncrement()
731-
.newIndexFiles()
728+
.deletedIndexFiles()
732729
.forEach(
733730
m ->
734731
appendIndexFiles.add(
735732
new IndexManifestEntry(
736-
FileKind.ADD,
733+
FileKind.DELETE,
737734
commitMessage.partition(),
738735
commitMessage.bucket(),
739736
m)));
740737
commitMessage
741738
.newFilesIncrement()
742-
.deletedIndexFiles()
739+
.newIndexFiles()
743740
.forEach(
744741
m ->
745742
appendIndexFiles.add(
746743
new IndexManifestEntry(
747-
FileKind.DELETE,
744+
FileKind.ADD,
748745
commitMessage.partition(),
749746
commitMessage.bucket(),
750747
m)));
@@ -766,23 +763,23 @@ private void collectChanges(
766763
.forEach(m -> compactChangelog.add(makeEntry(FileKind.ADD, commitMessage, m)));
767764
commitMessage
768765
.compactIncrement()
769-
.newIndexFiles()
766+
.deletedIndexFiles()
770767
.forEach(
771768
m ->
772769
compactIndexFiles.add(
773770
new IndexManifestEntry(
774-
FileKind.ADD,
771+
FileKind.DELETE,
775772
commitMessage.partition(),
776773
commitMessage.bucket(),
777774
m)));
778775
commitMessage
779776
.compactIncrement()
780-
.deletedIndexFiles()
777+
.newIndexFiles()
781778
.forEach(
782779
m ->
783780
compactIndexFiles.add(
784781
new IndexManifestEntry(
785-
FileKind.DELETE,
782+
FileKind.ADD,
786783
commitMessage.partition(),
787784
commitMessage.bucket(),
788785
m)));
@@ -1419,7 +1416,7 @@ private void noConflictsOrFail(
14191416
List<IndexManifestEntry> deltaIndexEntries,
14201417
CommitKind commitKind) {
14211418
String baseCommitUser = snapshot.commitUser();
1422-
if (checkForDeletionVector(commitKind)) {
1419+
if (checkForDeletionVector()) {
14231420
// Enrich dvName in fileEntry to check for base ADD dv and delta DELETE dv.
14241421
// For example:
14251422
// If the base file is <ADD baseFile1, ADD dv1>,
@@ -1443,52 +1440,72 @@ private void noConflictsOrFail(
14431440
List<SimpleFileEntry> allEntries = new ArrayList<>(baseEntries);
14441441
allEntries.addAll(deltaEntries);
14451442

1446-
if (commitKind != CommitKind.OVERWRITE) {
1447-
// total buckets within the same partition should remain the same
1448-
Map<BinaryRow, Integer> totalBuckets = new HashMap<>();
1449-
for (SimpleFileEntry entry : allEntries) {
1450-
if (entry.totalBuckets() <= 0) {
1451-
continue;
1452-
}
1453-
1454-
if (!totalBuckets.containsKey(entry.partition())) {
1455-
totalBuckets.put(entry.partition(), entry.totalBuckets());
1456-
continue;
1457-
}
1458-
1459-
int old = totalBuckets.get(entry.partition());
1460-
if (old == entry.totalBuckets()) {
1461-
continue;
1462-
}
1463-
1464-
Pair<RuntimeException, RuntimeException> conflictException =
1465-
createConflictException(
1466-
"Total buckets of partition "
1467-
+ entry.partition()
1468-
+ " changed from "
1469-
+ old
1470-
+ " to "
1471-
+ entry.totalBuckets()
1472-
+ " without overwrite. Give up committing.",
1473-
baseCommitUser,
1474-
baseEntries,
1475-
deltaEntries,
1476-
null);
1477-
LOG.warn("", conflictException.getLeft());
1478-
throw conflictException.getRight();
1479-
}
1480-
}
1443+
checkBucketKeepSame(baseEntries, deltaEntries, commitKind, allEntries, baseCommitUser);
14811444

1445+
Function<Throwable, RuntimeException> conflictException =
1446+
conflictException(baseCommitUser, baseEntries, deltaEntries);
14821447
Collection<SimpleFileEntry> mergedEntries;
14831448
try {
14841449
// merge manifest entries and also check if the files we want to delete are still there
14851450
mergedEntries = FileEntry.mergeEntries(allEntries);
14861451
} catch (Throwable e) {
1487-
throw conflictException(commitUser, baseEntries, deltaEntries).apply(e);
1452+
throw conflictException.apply(e);
1453+
}
1454+
1455+
checkNoDeleteInMergedEntries(mergedEntries, conflictException);
1456+
checkKeyRangeNoConflicts(baseEntries, deltaEntries, mergedEntries, baseCommitUser);
1457+
}
1458+
1459+
private void checkBucketKeepSame(
1460+
List<SimpleFileEntry> baseEntries,
1461+
List<SimpleFileEntry> deltaEntries,
1462+
CommitKind commitKind,
1463+
List<SimpleFileEntry> allEntries,
1464+
String baseCommitUser) {
1465+
if (commitKind == CommitKind.OVERWRITE) {
1466+
return;
14881467
}
14891468

1490-
assertNoDelete(mergedEntries, conflictException(commitUser, baseEntries, deltaEntries));
1469+
// total buckets within the same partition should remain the same
1470+
Map<BinaryRow, Integer> totalBuckets = new HashMap<>();
1471+
for (SimpleFileEntry entry : allEntries) {
1472+
if (entry.totalBuckets() <= 0) {
1473+
continue;
1474+
}
1475+
1476+
if (!totalBuckets.containsKey(entry.partition())) {
1477+
totalBuckets.put(entry.partition(), entry.totalBuckets());
1478+
continue;
1479+
}
1480+
1481+
int old = totalBuckets.get(entry.partition());
1482+
if (old == entry.totalBuckets()) {
1483+
continue;
1484+
}
1485+
1486+
Pair<RuntimeException, RuntimeException> conflictException =
1487+
createConflictException(
1488+
"Total buckets of partition "
1489+
+ entry.partition()
1490+
+ " changed from "
1491+
+ old
1492+
+ " to "
1493+
+ entry.totalBuckets()
1494+
+ " without overwrite. Give up committing.",
1495+
baseCommitUser,
1496+
baseEntries,
1497+
deltaEntries,
1498+
null);
1499+
LOG.warn("", conflictException.getLeft());
1500+
throw conflictException.getRight();
1501+
}
1502+
}
14911503

1504+
private void checkKeyRangeNoConflicts(
1505+
List<SimpleFileEntry> baseEntries,
1506+
List<SimpleFileEntry> deltaEntries,
1507+
Collection<SimpleFileEntry> mergedEntries,
1508+
String baseCommitUser) {
14921509
// fast exit for file store without keys
14931510
if (keyComparator == null) {
14941511
return;
@@ -1548,26 +1565,11 @@ private Function<Throwable, RuntimeException> conflictException(
15481565
};
15491566
}
15501567

1551-
private boolean checkForDeletionVector(CommitKind commitKind) {
1552-
if (!deletionVectorsEnabled) {
1553-
return false;
1554-
}
1555-
1556-
// todo: Add them once contains DELETE type.
1557-
// PK table's compact dv index only contains ADD type, skip conflict detection.
1558-
if (isPkTable && commitKind == CommitKind.COMPACT) {
1559-
return false;
1560-
}
1561-
1562-
// Non-PK table's hash fixed bucket mode only contains ADD type, skip conflict detection.
1563-
if (!isPkTable && bucketMode.equals(BucketMode.HASH_FIXED)) {
1564-
return false;
1565-
}
1566-
1567-
return true;
1568+
private boolean checkForDeletionVector() {
1569+
return deletionVectorsEnabled && bucketMode.equals(BucketMode.BUCKET_UNAWARE);
15681570
}
15691571

1570-
private void assertNoDelete(
1572+
private void checkNoDeleteInMergedEntries(
15711573
Collection<SimpleFileEntry> mergedEntries,
15721574
Function<Throwable, RuntimeException> exceptionFunction) {
15731575
try {

paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -912,13 +912,13 @@ public void testDVIndexFiles(boolean bitmap64) throws Exception {
912912
assertThat(dvs.get("f2").isDeleted(4)).isTrue();
913913

914914
// commit 2
915-
CommitMessage commitMessage3 =
916-
store.writeDVIndexFiles(
917-
partition, 0, Collections.singletonMap("f2", Arrays.asList(3)));
918915
List<IndexFileMeta> deleted =
919916
new ArrayList<>(commitMessage1.newFilesIncrement().newIndexFiles());
920917
deleted.addAll(commitMessage2.newFilesIncrement().newIndexFiles());
921-
CommitMessage commitMessage4 = store.removeIndexFiles(partition, 0, deleted);
918+
CommitMessage commitMessage3 = store.removeIndexFiles(partition, 0, deleted);
919+
CommitMessageImpl commitMessage4 =
920+
store.writeDVIndexFiles(
921+
partition, 0, Collections.singletonMap("f2", Arrays.asList(3)));
922922
store.commit(commitMessage3, commitMessage4);
923923

924924
// assert 2

paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ import org.apache.paimon.spark.{PaimonAppendTable, PaimonPrimaryKeyTable, Paimon
2323

2424
import org.apache.spark.sql.Row
2525

26+
import java.util.concurrent.Executors
27+
2628
import scala.concurrent.{Await, Future}
2729
import scala.concurrent.ExecutionContext.Implicits.global
2830
import scala.concurrent.duration.DurationInt
@@ -805,4 +807,52 @@ trait MergeIntoAppendTableTest extends PaimonSparkTestBase with PaimonAppendTabl
805807
}
806808
}
807809
}
810+
811+
test("Paimon MergeInto: concurrent two merge") {
812+
for (dvEnabled <- Seq("true", "false")) {
813+
withTable("s", "t") {
814+
sql("CREATE TABLE s (id INT, b INT, c INT)")
815+
sql(
816+
"INSERT INTO s VALUES (1, 1, 1), (2, 2, 2), (3, 3, 3), (4, 4, 4), (5, 5, 5), (6, 6, 6), (7, 7, 7), (8, 8, 8), (9, 9, 9)")
817+
818+
sql(
819+
s"CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES ('deletion-vectors.enabled' = '$dvEnabled')")
820+
sql(
821+
"INSERT INTO t VALUES (1, 1, 1), (2, 2, 2), (3, 3, 3), (4, 4, 4), (5, 5, 5), (6, 6, 6), (7, 7, 7), (8, 8, 8), (9, 9, 9)")
822+
823+
def doMergeInto(): Unit = {
824+
for (i <- 1 to 9) {
825+
try {
826+
sql(s"""
827+
|MERGE INTO t
828+
|USING (SELECT * FROM s WHERE id = $i)
829+
|ON t.id = s.id
830+
|WHEN MATCHED THEN
831+
|UPDATE SET t.id = s.id, t.b = s.b + t.b, t.c = s.c + t.c
832+
|""".stripMargin)
833+
} catch {
834+
case a: Throwable =>
835+
assert(
836+
a.getMessage.contains("Conflicts during commits") || a.getMessage.contains(
837+
"Missing file"))
838+
}
839+
checkAnswer(sql("SELECT count(*) FROM t"), Seq(Row(9)))
840+
}
841+
}
842+
843+
val executor = Executors.newFixedThreadPool(2)
844+
val runnable = new Runnable {
845+
override def run(): Unit = doMergeInto()
846+
}
847+
848+
val future1 = executor.submit(runnable)
849+
val future2 = executor.submit(runnable)
850+
851+
future1.get()
852+
future2.get()
853+
854+
executor.shutdown()
855+
}
856+
}
857+
}
808858
}

0 commit comments

Comments
 (0)