Skip to content

Commit fc3db69

Browse files
committed
Fix copy plan to include all live files regardless of snapshot expiration in RewriteTablePath
1 parent b0f022f commit fc3db69

10 files changed

Lines changed: 485 additions & 193 deletions

File tree

core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java

Lines changed: 5 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -331,7 +331,6 @@ private static List<ManifestFile> manifestFilesInSnapshot(FileIO io, Snapshot sn
331331
* Rewrite a data manifest, replacing path references.
332332
*
333333
* @param manifestFile source manifest file to rewrite
334-
* @param snapshotIds snapshot ids for filtering returned data manifest entries
335334
* @param outputFile output file to rewrite manifest file to
336335
* @param io file io
337336
* @param format format of the manifest file
@@ -342,7 +341,6 @@ private static List<ManifestFile> manifestFilesInSnapshot(FileIO io, Snapshot sn
342341
*/
343342
public static RewriteResult<DataFile> rewriteDataManifest(
344343
ManifestFile manifestFile,
345-
Set<Long> snapshotIds,
346344
OutputFile outputFile,
347345
FileIO io,
348346
int format,
@@ -356,9 +354,7 @@ public static RewriteResult<DataFile> rewriteDataManifest(
356354
ManifestReader<DataFile> reader =
357355
ManifestFiles.read(manifestFile, io, specsById).select(Arrays.asList("*"))) {
358356
return StreamSupport.stream(reader.entries().spliterator(), false)
359-
.map(
360-
entry ->
361-
writeDataFileEntry(entry, snapshotIds, spec, sourcePrefix, targetPrefix, writer))
357+
.map(entry -> writeDataFileEntry(entry, spec, sourcePrefix, targetPrefix, writer))
362358
.reduce(new RewriteResult<>(), RewriteResult::append);
363359
}
364360
}
@@ -367,7 +363,6 @@ public static RewriteResult<DataFile> rewriteDataManifest(
367363
* Rewrite a delete manifest, replacing path references.
368364
*
369365
* @param manifestFile source delete manifest to rewrite
370-
* @param snapshotIds snapshot ids for filtering returned delete manifest entries
371366
* @param outputFile output file to rewrite manifest file to
372367
* @param io file io
373368
* @param format format of the manifest file
@@ -380,7 +375,6 @@ public static RewriteResult<DataFile> rewriteDataManifest(
380375
*/
381376
public static RewriteResult<DeleteFile> rewriteDeleteManifest(
382377
ManifestFile manifestFile,
383-
Set<Long> snapshotIds,
384378
OutputFile outputFile,
385379
FileIO io,
386380
int format,
@@ -399,20 +393,13 @@ public static RewriteResult<DeleteFile> rewriteDeleteManifest(
399393
.map(
400394
entry ->
401395
writeDeleteFileEntry(
402-
entry,
403-
snapshotIds,
404-
spec,
405-
sourcePrefix,
406-
targetPrefix,
407-
stagingLocation,
408-
writer))
396+
entry, spec, sourcePrefix, targetPrefix, stagingLocation, writer))
409397
.reduce(new RewriteResult<>(), RewriteResult::append);
410398
}
411399
}
412400

413401
private static RewriteResult<DataFile> writeDataFileEntry(
414402
ManifestEntry<DataFile> entry,
415-
Set<Long> snapshotIds,
416403
PartitionSpec spec,
417404
String sourcePrefix,
418405
String targetPrefix,
@@ -429,18 +416,14 @@ private static RewriteResult<DataFile> writeDataFileEntry(
429416
DataFile newDataFile =
430417
DataFiles.builder(spec).copy(entry.file()).withPath(targetDataFilePath).build();
431418
appendEntryWithFile(entry, writer, newDataFile);
432-
// keep the following entries in metadata but exclude them from copyPlan
433-
// 1) deleted data files
434-
// 2) entries not changed by snapshotIds
435-
if (entry.isLive() && snapshotIds.contains(entry.snapshotId())) {
419+
if (entry.isLive()) {
436420
result.copyPlan().add(Pair.of(sourceDataFilePath, newDataFile.location()));
437421
}
438422
return result;
439423
}
440424

441425
private static RewriteResult<DeleteFile> writeDeleteFileEntry(
442426
ManifestEntry<DeleteFile> entry,
443-
Set<Long> snapshotIds,
444427
PartitionSpec spec,
445428
String sourcePrefix,
446429
String targetPrefix,
@@ -454,10 +437,7 @@ private static RewriteResult<DeleteFile> writeDeleteFileEntry(
454437
case POSITION_DELETES:
455438
DeleteFile posDeleteFile = newPositionDeleteEntry(file, spec, sourcePrefix, targetPrefix);
456439
appendEntryWithFile(entry, writer, posDeleteFile);
457-
// keep the following entries in metadata but exclude them from copyPlan
458-
// 1) deleted position delete files
459-
// 2) entries not changed by snapshotIds
460-
if (entry.isLive() && snapshotIds.contains(entry.snapshotId())) {
440+
if (entry.isLive()) {
461441
result
462442
.copyPlan()
463443
.add(
@@ -470,11 +450,7 @@ private static RewriteResult<DeleteFile> writeDeleteFileEntry(
470450
case EQUALITY_DELETES:
471451
DeleteFile eqDeleteFile = newEqualityDeleteEntry(file, spec, sourcePrefix, targetPrefix);
472452
appendEntryWithFile(entry, writer, eqDeleteFile);
473-
// keep the following entries in metadata but exclude them from copyPlan
474-
// 1) deleted equality delete files
475-
// 2) entries not changed by snapshotIds
476-
if (entry.isLive() && snapshotIds.contains(entry.snapshotId())) {
477-
// No need to rewrite equality delete files as they do not contain absolute file paths.
453+
if (entry.isLive()) {
478454
result.copyPlan().add(Pair.of(file.location(), eqDeleteFile.location()));
479455
}
480456
return result;

core/src/test/java/org/apache/iceberg/TestRewriteTablePathUtil.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import static org.assertj.core.api.Assumptions.assumeThat;
2424

2525
import java.io.IOException;
26-
import java.util.Set;
2726
import org.junit.jupiter.api.Test;
2827
import org.junit.jupiter.api.TestTemplate;
2928
import org.junit.jupiter.api.extension.ExtendWith;
@@ -268,7 +267,6 @@ public void testRewritingMultiplePositionDeleteEntriesWithinManifestFile() throw
268267
RewriteTablePathUtil.RewriteResult<DeleteFile> deleteFileRewriteResult =
269268
RewriteTablePathUtil.rewriteDeleteManifest(
270269
manifest,
271-
Set.of(1000L),
272270
Files.localOutput(
273271
FileFormat.AVRO.addExtension(
274272
temp.resolve("junit" + System.nanoTime()).toFile().toString())),

spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java

Lines changed: 20 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -311,7 +311,21 @@ private Result rebuildMetadata() {
311311
// rebuild manifest files
312312
Set<ManifestFile> metaFiles = rewriteManifestListResult.toRewrite();
313313
RewriteContentFileResult rewriteManifestResult =
314-
rewriteManifests(deltaSnapshots, endMetadata, metaFiles);
314+
rewriteManifests(endMetadata, metaFiles);
315+
316+
// For incremental copies, filter out files that were already copied in previous replication
317+
if (startMetadata != null) {
318+
Table startTable = newStaticTable(startVersionName, table.io());
319+
Set<String> previouslyCopiedPaths =
320+
Sets.newHashSet(
321+
contentFileDS(startTable)
322+
.select("path")
323+
.as(Encoders.STRING())
324+
.collectAsList());
325+
rewriteManifestResult
326+
.copyPlan()
327+
.removeIf(pair -> previouslyCopiedPaths.contains(pair.first()));
328+
}
315329

316330
// rebuild position delete files
317331
Set<DeleteFile> deleteFiles =
@@ -561,23 +575,20 @@ public RewriteContentFileResult appendDeleteFile(RewriteResult<DeleteFile> r1) {
561575

562576
/** Rewrite manifest files in a distributed manner and return rewritten data files path pairs. */
563577
private RewriteContentFileResult rewriteManifests(
564-
Set<Snapshot> deltaSnapshots, TableMetadata tableMetadata, Set<ManifestFile> toRewrite) {
578+
TableMetadata tableMetadata, Set<ManifestFile> toRewrite) {
565579
if (toRewrite.isEmpty()) {
566580
return new RewriteContentFileResult();
567581
}
568582

569583
Encoder<ManifestFile> manifestFileEncoder = Encoders.javaSerialization(ManifestFile.class);
570584
Dataset<ManifestFile> manifestDS =
571585
spark().createDataset(Lists.newArrayList(toRewrite), manifestFileEncoder);
572-
Set<Long> deltaSnapshotIds =
573-
deltaSnapshots.stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
574586

575587
return manifestDS
576588
.repartition(toRewrite.size())
577589
.map(
578590
toManifests(
579591
tableBroadcast(),
580-
sparkContext().broadcast(deltaSnapshotIds),
581592
stagingDir,
582593
tableMetadata.formatVersion(),
583594
sourcePrefix,
@@ -590,7 +601,6 @@ private RewriteContentFileResult rewriteManifests(
590601

591602
private static MapFunction<ManifestFile, RewriteContentFileResult> toManifests(
592603
Broadcast<Table> table,
593-
Broadcast<Set<Long>> deltaSnapshotIds,
594604
String stagingLocation,
595605
int format,
596606
String sourcePrefix,
@@ -602,24 +612,12 @@ private static MapFunction<ManifestFile, RewriteContentFileResult> toManifests(
602612
case DATA:
603613
result.appendDataFile(
604614
writeDataManifest(
605-
manifestFile,
606-
table,
607-
deltaSnapshotIds,
608-
stagingLocation,
609-
format,
610-
sourcePrefix,
611-
targetPrefix));
615+
manifestFile, table, stagingLocation, format, sourcePrefix, targetPrefix));
612616
break;
613617
case DELETES:
614618
result.appendDeleteFile(
615619
writeDeleteManifest(
616-
manifestFile,
617-
table,
618-
deltaSnapshotIds,
619-
stagingLocation,
620-
format,
621-
sourcePrefix,
622-
targetPrefix));
620+
manifestFile, table, stagingLocation, format, sourcePrefix, targetPrefix));
623621
break;
624622
default:
625623
throw new UnsupportedOperationException(
@@ -632,7 +630,6 @@ private static MapFunction<ManifestFile, RewriteContentFileResult> toManifests(
632630
private static RewriteResult<DataFile> writeDataManifest(
633631
ManifestFile manifestFile,
634632
Broadcast<Table> table,
635-
Broadcast<Set<Long>> snapshotIds,
636633
String stagingLocation,
637634
int format,
638635
String sourcePrefix,
@@ -643,16 +640,8 @@ private static RewriteResult<DataFile> writeDataManifest(
643640
FileIO io = table.getValue().io();
644641
OutputFile outputFile = io.newOutputFile(stagingPath);
645642
Map<Integer, PartitionSpec> specsById = table.getValue().specs();
646-
Set<Long> deltaSnapshotIds = snapshotIds.value();
647643
return RewriteTablePathUtil.rewriteDataManifest(
648-
manifestFile,
649-
deltaSnapshotIds,
650-
outputFile,
651-
io,
652-
format,
653-
specsById,
654-
sourcePrefix,
655-
targetPrefix);
644+
manifestFile, outputFile, io, format, specsById, sourcePrefix, targetPrefix);
656645
} catch (IOException e) {
657646
throw new RuntimeIOException(e);
658647
}
@@ -661,7 +650,6 @@ private static RewriteResult<DataFile> writeDataManifest(
661650
private static RewriteResult<DeleteFile> writeDeleteManifest(
662651
ManifestFile manifestFile,
663652
Broadcast<Table> table,
664-
Broadcast<Set<Long>> snapshotIds,
665653
String stagingLocation,
666654
int format,
667655
String sourcePrefix,
@@ -672,16 +660,8 @@ private static RewriteResult<DeleteFile> writeDeleteManifest(
672660
FileIO io = table.getValue().io();
673661
OutputFile outputFile = io.newOutputFile(stagingPath);
674662
Map<Integer, PartitionSpec> specsById = table.getValue().specs();
675-
Set<Long> deltaSnapshotIds = snapshotIds.value();
676663
return RewriteTablePathUtil.rewriteDeleteManifest(
677-
manifestFile,
678-
deltaSnapshotIds,
679-
outputFile,
680-
io,
681-
format,
682-
specsById,
683-
sourcePrefix,
684-
targetPrefix,
664+
manifestFile, outputFile, io, format, specsById, sourcePrefix, targetPrefix,
685665
stagingLocation);
686666
} catch (IOException e) {
687667
throw new RuntimeIOException(e);

spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java

Lines changed: 101 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -316,6 +316,104 @@ public void testIncrementalRewrite() throws Exception {
316316
assertEquals("Rows should match after copy", expected, actual);
317317
}
318318

319+
@TestTemplate
320+
public void testIncrementalRewriteWithRewriteManifestsAndExpire() throws Exception {
321+
String location = newTableLocation();
322+
Table sourceTable =
323+
TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), Maps.newHashMap(), location);
324+
325+
// Write 2 initial snapshots
326+
List<ThreeColumnRecord> recordsA =
327+
Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
328+
Dataset<Row> dfA = spark.createDataFrame(recordsA, ThreeColumnRecord.class).coalesce(1);
329+
dfA.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(location);
330+
331+
List<ThreeColumnRecord> recordsB =
332+
Lists.newArrayList(new ThreeColumnRecord(2, "BBBBBBBBBB", "BBBB"));
333+
Dataset<Row> dfB = spark.createDataFrame(recordsB, ThreeColumnRecord.class).coalesce(1);
334+
dfB.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(location);
335+
sourceTable.refresh();
336+
337+
// Full replication
338+
RewriteTablePath.Result initialResult =
339+
actions()
340+
.rewriteTablePath(sourceTable)
341+
.rewriteLocationPrefix(sourceTable.location(), targetTableLocation())
342+
.stagingLocation(stagingLocation())
343+
.execute();
344+
copyTableFiles(initialResult);
345+
assertThat(spark.read().format("iceberg").load(targetTableLocation()).collectAsList())
346+
.hasSize(2);
347+
348+
// Append new data
349+
List<ThreeColumnRecord> recordsC =
350+
Lists.newArrayList(new ThreeColumnRecord(3, "CCCCCCCCCC", "CCCC"));
351+
Dataset<Row> dfC = spark.createDataFrame(recordsC, ThreeColumnRecord.class).coalesce(1);
352+
dfC.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(location);
353+
sourceTable.refresh();
354+
355+
// RewriteManifests merges all manifests
356+
actions().rewriteManifests(sourceTable).execute();
357+
sourceTable.refresh();
358+
359+
// Expire old snapshots including the one that added file C
360+
actions()
361+
.expireSnapshots(sourceTable)
362+
.retainLast(1)
363+
.expireOlderThan(sourceTable.currentSnapshot().timestampMillis() + 1)
364+
.execute();
365+
sourceTable.refresh();
366+
367+
// Incremental replication should still copy file C
368+
Table targetTable = TABLES.load(targetTableLocation());
369+
String startVersion = fileName(currentMetadata(targetTable).metadataFileLocation());
370+
RewriteTablePath.Result incrementalResult =
371+
actions()
372+
.rewriteTablePath(sourceTable)
373+
.rewriteLocationPrefix(sourceTable.location(), targetTableLocation())
374+
.stagingLocation(stagingLocation())
375+
.startVersion(startVersion)
376+
.execute();
377+
copyTableFiles(incrementalResult);
378+
379+
List<Object[]> actual = rowsSorted(targetTableLocation(), "c1");
380+
List<Object[]> expected = rowsSorted(location, "c1");
381+
assertEquals("Rows should match after incremental copy with expired snapshots", expected,
382+
actual);
383+
}
384+
385+
@TestTemplate
386+
public void testFullCopyWithRewriteManifestsAndExpire() throws Exception {
387+
String location = newTableLocation();
388+
Table sourceTable = createTableWithSnapshots(location, 2);
389+
390+
// RewriteManifests
391+
actions().rewriteManifests(sourceTable).execute();
392+
sourceTable.refresh();
393+
394+
// Expire all but the latest snapshot
395+
actions()
396+
.expireSnapshots(sourceTable)
397+
.retainLast(1)
398+
.expireOlderThan(sourceTable.currentSnapshot().timestampMillis() + 1)
399+
.execute();
400+
sourceTable.refresh();
401+
402+
// Full copy should include all live data files despite expired snapshot IDs
403+
RewriteTablePath.Result result =
404+
actions()
405+
.rewriteTablePath(sourceTable)
406+
.rewriteLocationPrefix(sourceTable.location(), targetTableLocation())
407+
.stagingLocation(stagingLocation())
408+
.execute();
409+
copyTableFiles(result);
410+
411+
List<Object[]> actual = rowsSorted(targetTableLocation(), "c1");
412+
List<Object[]> expected = rowsSorted(location, "c1");
413+
assertEquals("All rows should be present after full copy with expired snapshots", expected,
414+
actual);
415+
}
416+
319417
@TestTemplate
320418
public void testTableWith3Snapshots(@TempDir Path location1, @TempDir Path location2)
321419
throws Exception {
@@ -733,8 +831,9 @@ public void testFullTableRewritePathWithDeletedVersionFiles() throws Exception {
733831
// the first snapshot
734832
// from the version file history
735833
int missingVersionFile = 1;
736-
// since first snapshot cannot be found, first data files will also be skipped
737-
int missingDataFile = 1;
834+
// the first snapshot was expired, but its data file is still live and
835+
// included in the copy plan
836+
int missingDataFile = 0;
738837
RewriteTablePath.Result result =
739838
actions()
740839
.rewriteTablePath(sourceTable)

0 commit comments

Comments
 (0)