Skip to content

Commit 5f970a8

Browse files
authored
Core: Support appending files with different specs (#9860)
1 parent bc72b2e commit 5f970a8

2 files changed

Lines changed: 145 additions & 45 deletions

File tree

core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java

Lines changed: 57 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.util.Map;
3131
import java.util.Objects;
3232
import java.util.Set;
33+
import java.util.stream.Collectors;
3334
import org.apache.iceberg.encryption.EncryptedOutputFile;
3435
import org.apache.iceberg.events.CreateSnapshotEvent;
3536
import org.apache.iceberg.exceptions.RuntimeIOException;
@@ -42,6 +43,7 @@
4243
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
4344
import org.apache.iceberg.relocated.com.google.common.base.Predicate;
4445
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
46+
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
4547
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
4648
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
4749
import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
@@ -79,7 +81,7 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
7981
private final ManifestFilterManager<DeleteFile> deleteFilterManager;
8082

8183
// update data
82-
private final List<DataFile> newDataFiles = Lists.newArrayList();
84+
private final Map<PartitionSpec, List<DataFile>> newDataFilesBySpec = Maps.newHashMap();
8385
private final CharSequenceSet newDataFilePaths = CharSequenceSet.empty();
8486
private final CharSequenceSet newDeleteFilePaths = CharSequenceSet.empty();
8587
private Long newDataFilesDataSequenceNumber;
@@ -89,10 +91,9 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
8991
private final SnapshotSummary.Builder addedFilesSummary = SnapshotSummary.builder();
9092
private final SnapshotSummary.Builder appendedManifestsSummary = SnapshotSummary.builder();
9193
private Expression deleteExpression = Expressions.alwaysFalse();
92-
private PartitionSpec dataSpec;
9394

9495
// cache new data manifests after writing
95-
private List<ManifestFile> cachedNewDataManifests = null;
96+
private final List<ManifestFile> cachedNewDataManifests = Lists.newLinkedList();
9697
private boolean hasNewDataFiles = false;
9798

9899
// cache new manifests for delete files
@@ -105,7 +106,6 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
105106
super(ops);
106107
this.tableName = tableName;
107108
this.ops = ops;
108-
this.dataSpec = null;
109109
long targetSizeBytes =
110110
ops.current()
111111
.propertyAsLong(MANIFEST_TARGET_SIZE_BYTES, MANIFEST_TARGET_SIZE_BYTES_DEFAULT);
@@ -141,18 +141,31 @@ protected boolean isCaseSensitive() {
141141
}
142142

143143
protected PartitionSpec dataSpec() {
144+
Set<PartitionSpec> specs = dataSpecs();
144145
Preconditions.checkState(
145-
dataSpec != null, "Cannot determine partition spec: no data files have been added");
146-
// the spec is set when the write is started
147-
return dataSpec;
146+
specs.size() == 1,
147+
"Cannot return a single partition spec: data files with different partition specs have been added");
148+
return specs.iterator().next();
149+
}
150+
151+
protected Set<PartitionSpec> dataSpecs() {
152+
Set<PartitionSpec> specs = newDataFilesBySpec.keySet();
153+
Preconditions.checkState(
154+
!specs.isEmpty(), "Cannot determine partition specs: no data files have been added");
155+
return ImmutableSet.copyOf(specs);
148156
}
149157

150158
protected Expression rowFilter() {
151159
return deleteExpression;
152160
}
153161

154162
protected List<DataFile> addedDataFiles() {
155-
return ImmutableList.copyOf(newDataFiles);
163+
return ImmutableList.copyOf(
164+
newDataFilesBySpec.values().stream().flatMap(List::stream).collect(Collectors.toList()));
165+
}
166+
167+
protected Map<PartitionSpec, List<DataFile>> addedDataFilesBySpec() {
168+
return ImmutableMap.copyOf(newDataFilesBySpec);
156169
}
157170

158171
protected void failAnyDelete() {
@@ -212,7 +225,7 @@ protected boolean deletesDeleteFiles() {
212225
}
213226

214227
protected boolean addsDataFiles() {
215-
return !newDataFiles.isEmpty();
228+
return !newDataFilesBySpec.isEmpty();
216229
}
217230

218231
protected boolean addsDeleteFiles() {
@@ -223,9 +236,17 @@ protected boolean addsDeleteFiles() {
223236
protected void add(DataFile file) {
224237
Preconditions.checkNotNull(file, "Invalid data file: null");
225238
if (newDataFilePaths.add(file.path())) {
226-
setDataSpec(file);
227-
addedFilesSummary.addedFile(dataSpec(), file);
239+
PartitionSpec fileSpec = ops.current().spec(file.specId());
240+
Preconditions.checkArgument(
241+
fileSpec != null,
242+
"Cannot find partition spec %s for data file: %s",
243+
file.specId(),
244+
file.path());
245+
246+
addedFilesSummary.addedFile(fileSpec, file);
228247
hasNewDataFiles = true;
248+
List<DataFile> newDataFiles =
249+
newDataFilesBySpec.computeIfAbsent(fileSpec, ignored -> Lists.newArrayList());
229250
newDataFiles.add(file);
230251
}
231252
}
@@ -255,17 +276,6 @@ private void add(DeleteFileHolder fileHolder) {
255276
}
256277
}
257278

258-
private void setDataSpec(DataFile file) {
259-
PartitionSpec fileSpec = ops.current().spec(file.specId());
260-
Preconditions.checkNotNull(
261-
fileSpec, "Cannot find partition spec for data file: %s", file.path());
262-
if (dataSpec == null) {
263-
dataSpec = fileSpec;
264-
} else if (dataSpec.specId() != file.specId()) {
265-
throw new ValidationException("Invalid data file, expected spec id: %d", dataSpec.specId());
266-
}
267-
}
268-
269279
/** Add all files in a manifest to the new snapshot. */
270280
protected void add(ManifestFile manifest) {
271281
Preconditions.checkArgument(
@@ -885,7 +895,7 @@ public Object updateEvent() {
885895

886896
@SuppressWarnings("checkstyle:CyclomaticComplexity")
887897
private void cleanUncommittedAppends(Set<ManifestFile> committed) {
888-
if (cachedNewDataManifests != null) {
898+
if (!cachedNewDataManifests.isEmpty()) {
889899
boolean hasDeletes = false;
890900
for (ManifestFile manifest : cachedNewDataManifests) {
891901
if (!committed.contains(manifest)) {
@@ -895,7 +905,7 @@ private void cleanUncommittedAppends(Set<ManifestFile> committed) {
895905
}
896906

897907
if (hasDeletes) {
898-
this.cachedNewDataManifests = null;
908+
this.cachedNewDataManifests.clear();
899909
}
900910
}
901911

@@ -941,7 +951,7 @@ protected void cleanUncommitted(Set<ManifestFile> committed) {
941951

942952
private Iterable<ManifestFile> prepareNewDataManifests() {
943953
Iterable<ManifestFile> newManifests;
944-
if (!newDataFiles.isEmpty()) {
954+
if (!newDataFilesBySpec.isEmpty()) {
945955
List<ManifestFile> dataFileManifests = newDataFilesAsManifests();
946956
newManifests = Iterables.concat(dataFileManifests, appendManifests, rewrittenAppendManifests);
947957
} else {
@@ -954,29 +964,31 @@ private Iterable<ManifestFile> prepareNewDataManifests() {
954964
}
955965

956966
private List<ManifestFile> newDataFilesAsManifests() {
957-
if (hasNewDataFiles && cachedNewDataManifests != null) {
967+
if (hasNewDataFiles && !cachedNewDataManifests.isEmpty()) {
958968
cachedNewDataManifests.forEach(file -> deleteFile(file.path()));
959-
cachedNewDataManifests = null;
969+
cachedNewDataManifests.clear();
960970
}
961971

962-
if (cachedNewDataManifests == null) {
963-
try {
964-
RollingManifestWriter<DataFile> writer = newRollingManifestWriter(dataSpec());
965-
try {
966-
if (newDataFilesDataSequenceNumber == null) {
967-
newDataFiles.forEach(writer::add);
968-
} else {
969-
newDataFiles.forEach(f -> writer.add(f, newDataFilesDataSequenceNumber));
970-
}
971-
} finally {
972-
writer.close();
973-
}
974-
975-
this.cachedNewDataManifests = writer.toManifestFiles();
976-
this.hasNewDataFiles = false;
977-
} catch (IOException e) {
978-
throw new RuntimeIOException(e, "Failed to close manifest writer");
979-
}
972+
if (cachedNewDataManifests.isEmpty()) {
973+
newDataFilesBySpec.forEach(
974+
(dataSpec, newDataFiles) -> {
975+
try {
976+
RollingManifestWriter<DataFile> writer = newRollingManifestWriter(dataSpec);
977+
try {
978+
if (newDataFilesDataSequenceNumber == null) {
979+
newDataFiles.forEach(writer::add);
980+
} else {
981+
newDataFiles.forEach(f -> writer.add(f, newDataFilesDataSequenceNumber));
982+
}
983+
} finally {
984+
writer.close();
985+
}
986+
this.cachedNewDataManifests.addAll(writer.toManifestFiles());
987+
this.hasNewDataFiles = false;
988+
} catch (IOException e) {
989+
throw new RuntimeIOException(e, "Failed to close manifest writer");
990+
}
991+
});
980992
}
981993

982994
return cachedNewDataManifests;

core/src/test/java/org/apache/iceberg/TestMergeAppend.java

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,14 @@
2727
import java.io.IOException;
2828
import java.util.Arrays;
2929
import java.util.List;
30+
import java.util.Objects;
3031
import java.util.Set;
3132
import java.util.concurrent.Executors;
3233
import java.util.concurrent.atomic.AtomicInteger;
3334
import java.util.stream.Collectors;
3435
import org.apache.iceberg.ManifestEntry.Status;
3536
import org.apache.iceberg.exceptions.CommitFailedException;
37+
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
3638
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
3739
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
3840
import org.apache.iceberg.types.Types;
@@ -90,6 +92,92 @@ public void testEmptyTableAppend() {
9092
statuses(Status.ADDED, Status.ADDED));
9193
}
9294

95+
@TestTemplate
96+
public void testEmptyTableAppendFilesWithDifferentSpecs() {
97+
assertThat(listManifestFiles()).as("Table should start empty").isEmpty();
98+
99+
TableMetadata base = readMetadata();
100+
assertThat(base.currentSnapshot()).as("Should not have a current snapshot").isNull();
101+
assertThat(base.lastSequenceNumber()).as("Last sequence number should be 0").isEqualTo(0);
102+
103+
table.updateSpec().addField("id").commit();
104+
PartitionSpec newSpec = table.spec();
105+
106+
assertThat(table.specs()).as("Table should have 2 specs").hasSize(2);
107+
108+
DataFile fileNewSpec =
109+
DataFiles.builder(newSpec)
110+
.withPath("/path/to/data-b.parquet")
111+
.withPartitionPath("data_bucket=0/id=0")
112+
.withFileSizeInBytes(10)
113+
.withRecordCount(1)
114+
.build();
115+
116+
Snapshot committedSnapshot =
117+
commit(table, table.newAppend().appendFile(FILE_A).appendFile(fileNewSpec), branch);
118+
119+
assertThat(committedSnapshot).as("Should create a snapshot").isNotNull();
120+
V1Assert.assertEquals(
121+
"Last sequence number should be 0", 0, table.ops().current().lastSequenceNumber());
122+
V2Assert.assertEquals(
123+
"Last sequence number should be 1", 1, table.ops().current().lastSequenceNumber());
124+
125+
assertThat(committedSnapshot.allManifests(table.io()))
126+
.as("Should create 2 manifests for initial write, 1 manifest per spec")
127+
.hasSize(2);
128+
129+
long snapshotId = committedSnapshot.snapshotId();
130+
131+
ImmutableMap<Integer, DataFile> expectedFileBySpec =
132+
ImmutableMap.of(SPEC.specId(), FILE_A, newSpec.specId(), fileNewSpec);
133+
134+
expectedFileBySpec.forEach(
135+
(specId, expectedDataFile) -> {
136+
ManifestFile manifestFileForSpecId =
137+
committedSnapshot.allManifests(table.io()).stream()
138+
.filter(m -> Objects.equals(m.partitionSpecId(), specId))
139+
.findAny()
140+
.get();
141+
142+
validateManifest(
143+
manifestFileForSpecId,
144+
dataSeqs(1L),
145+
fileSeqs(1L),
146+
ids(snapshotId),
147+
files(expectedDataFile),
148+
statuses(Status.ADDED));
149+
});
150+
}
151+
152+
@TestTemplate
153+
public void testDataSpecThrowsExceptionIfDataFilesWithDifferentSpecsAreAdded() {
154+
assertThat(listManifestFiles()).as("Table should start empty").isEmpty();
155+
156+
TableMetadata base = readMetadata();
157+
assertThat(base.currentSnapshot()).as("Should not have a current snapshot").isNull();
158+
assertThat(base.lastSequenceNumber()).as("Last sequence number should be 0").isEqualTo(0);
159+
160+
table.updateSpec().addField("id").commit();
161+
PartitionSpec newSpec = table.spec();
162+
163+
assertThat(table.specs()).as("Table should have 2 specs").hasSize(2);
164+
165+
DataFile fileNewSpec =
166+
DataFiles.builder(newSpec)
167+
.withPath("/path/to/data-b.parquet")
168+
.withPartitionPath("data_bucket=0/id=0")
169+
.withFileSizeInBytes(10)
170+
.withRecordCount(1)
171+
.build();
172+
173+
MergeAppend mergeAppend =
174+
(MergeAppend) table.newAppend().appendFile(FILE_A).appendFile(fileNewSpec);
175+
assertThatThrownBy(mergeAppend::dataSpec)
176+
.isInstanceOf(IllegalStateException.class)
177+
.hasMessage(
178+
"Cannot return a single partition spec: data files with different partition specs have been added");
179+
}
180+
93181
@TestTemplate
94182
public void testEmptyTableAppendManifest() throws IOException {
95183
assertThat(listManifestFiles()).isEmpty();

0 commit comments

Comments
 (0)