Skip to content

Commit 5caa6f6

Browse files
ZouxxyyJingsongLi
authored andcommitted
[core] Add dv conflict detection during commit (#6303)
1 parent c4bf236 commit 5caa6f6

16 files changed

Lines changed: 955 additions & 120 deletions

File tree

paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -298,7 +298,10 @@ public FileStoreCommitImpl newCommit(String commitUser, FileStoreTable table) {
298298
options.commitMinRetryWait(),
299299
options.commitMaxRetryWait(),
300300
options.commitStrictModeLastSafeSnapshot().orElse(null),
301-
options.rowTrackingEnabled());
301+
options.rowTrackingEnabled(),
302+
!schema.primaryKeys().isEmpty(),
303+
options.deletionVectorsEnabled(),
304+
newIndexFileHandler());
302305
}
303306

304307
@Override

paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.apache.paimon.data.InternalRow;
2323
import org.apache.paimon.utils.FileStorePathFactory;
2424
import org.apache.paimon.utils.Filter;
25-
import org.apache.paimon.utils.Preconditions;
2625

2726
import javax.annotation.Nullable;
2827

@@ -40,6 +39,7 @@
4039

4140
import static org.apache.paimon.utils.ManifestReadThreadPool.randomlyExecuteSequentialReturn;
4241
import static org.apache.paimon.utils.ManifestReadThreadPool.sequentialBatchedExecute;
42+
import static org.apache.paimon.utils.Preconditions.checkState;
4343

4444
/** Entry representing a file. */
4545
public interface FileEntry {
@@ -77,7 +77,7 @@ class Identifier {
7777
public final int level;
7878
public final String fileName;
7979
public final List<String> extraFiles;
80-
@Nullable private final byte[] embeddedIndex;
80+
@Nullable public final byte[] embeddedIndex;
8181
@Nullable public final String externalPath;
8282

8383
/* Cache the hash code for the string */
@@ -190,7 +190,7 @@ static <T extends FileEntry> void mergeEntries(Iterable<T> entries, Map<Identifi
190190
Identifier identifier = entry.identifier();
191191
switch (entry.kind()) {
192192
case ADD:
193-
Preconditions.checkState(
193+
checkState(
194194
!map.containsKey(identifier),
195195
"Trying to add file %s which is already added.",
196196
identifier);

paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,21 @@ public static SimpleFileEntry from(ManifestEntry entry) {
8181
entry.externalPath());
8282
}
8383

84+
public SimpleFileEntry toDelete() {
85+
return new SimpleFileEntry(
86+
FileKind.DELETE,
87+
partition,
88+
bucket,
89+
totalBuckets,
90+
level,
91+
fileName,
92+
extraFiles,
93+
embeddedIndex,
94+
minKey,
95+
maxKey,
96+
externalPath);
97+
}
98+
8499
public static List<SimpleFileEntry> from(List<ManifestEntry> entries) {
85100
return entries.stream().map(SimpleFileEntry::from).collect(Collectors.toList());
86101
}
@@ -115,6 +130,11 @@ public String fileName() {
115130
return fileName;
116131
}
117132

133+
@Nullable
134+
public byte[] embeddedIndex() {
135+
return embeddedIndex;
136+
}
137+
118138
@Nullable
119139
@Override
120140
public String externalPath() {
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.manifest;
20+
21+
import javax.annotation.Nullable;
22+
23+
import java.util.Objects;
24+
25+
/** A {@link FileEntry} contains {@link SimpleFileEntry} and dv file name. */
26+
public class SimpleFileEntryWithDV extends SimpleFileEntry {
27+
28+
@Nullable private final String dvFileName;
29+
30+
public SimpleFileEntryWithDV(SimpleFileEntry entry, @Nullable String dvFileName) {
31+
super(
32+
entry.kind(),
33+
entry.partition(),
34+
entry.bucket(),
35+
entry.totalBuckets(),
36+
entry.level(),
37+
entry.fileName(),
38+
entry.extraFiles(),
39+
entry.embeddedIndex(),
40+
entry.minKey(),
41+
entry.maxKey(),
42+
entry.externalPath());
43+
this.dvFileName = dvFileName;
44+
}
45+
46+
public Identifier identifier() {
47+
return new IdentifierWithDv(super.identifier(), dvFileName);
48+
}
49+
50+
@Nullable
51+
public String dvFileName() {
52+
return dvFileName;
53+
}
54+
55+
public SimpleFileEntry toDelete() {
56+
return new SimpleFileEntryWithDV(super.toDelete(), dvFileName);
57+
}
58+
59+
@Override
60+
public boolean equals(Object o) {
61+
if (this == o) {
62+
return true;
63+
}
64+
if (o == null || getClass() != o.getClass()) {
65+
return false;
66+
}
67+
if (!super.equals(o)) {
68+
return false;
69+
}
70+
SimpleFileEntryWithDV that = (SimpleFileEntryWithDV) o;
71+
return Objects.equals(dvFileName, that.dvFileName);
72+
}
73+
74+
@Override
75+
public int hashCode() {
76+
return Objects.hash(super.hashCode(), dvFileName);
77+
}
78+
79+
@Override
80+
public String toString() {
81+
return super.toString() + ", {dvFileName=" + dvFileName + '}';
82+
}
83+
84+
/**
85+
* The same {@link Identifier} indicates that the {@link ManifestEntry} refers to the same data
86+
* file.
87+
*/
88+
static class IdentifierWithDv extends Identifier {
89+
90+
private final String dvFileName;
91+
92+
public IdentifierWithDv(Identifier identifier, String dvFileName) {
93+
super(
94+
identifier.partition,
95+
identifier.bucket,
96+
identifier.level,
97+
identifier.fileName,
98+
identifier.extraFiles,
99+
identifier.embeddedIndex,
100+
identifier.externalPath);
101+
this.dvFileName = dvFileName;
102+
}
103+
104+
@Override
105+
public boolean equals(Object o) {
106+
if (this == o) {
107+
return true;
108+
}
109+
if (o == null || getClass() != o.getClass()) {
110+
return false;
111+
}
112+
if (!super.equals(o)) {
113+
return false;
114+
}
115+
IdentifierWithDv that = (IdentifierWithDv) o;
116+
return Objects.equals(dvFileName, that.dvFileName);
117+
}
118+
119+
@Override
120+
public int hashCode() {
121+
return Objects.hash(super.hashCode(), dvFileName);
122+
}
123+
}
124+
}

0 commit comments

Comments
 (0)