Core, Spark: Fix invalid snapshot ID filtering in RewriteTablePath copy plan #16220
krisnaru wants to merge 1 commit into
fc3db69 to 805b520
f9503fd to ce254d6
anuragmantri left a comment
The convention we follow is to raise the initial PR against the latest Spark branch. After the first review, we can bulk backport to the rest of the branches.
@krisnaru - Could you create a Spark 4.1-only PR out of this for easier review, please?
List<Object[]> actual = rowsSorted(targetTableLocation(), "c1");
List<Object[]> expected = rowsSorted(location, "c1");
assertEquals(
Worth asserting the incremental result only contains 1 new file (fileC) to validate dedup is actually filtering.
.copyPlan()
.removeIf(pair -> previouslyCopiedPaths.contains(pair.first()));
} catch (Exception e) {
LOG.warn("Unable to read content files from start version for incremental filtering", e);
Can you also add the consequence to the warning: that we fall back to the full copy plan?
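A minimal sketch of what this suggestion could look like, not the PR's actual code. It assumes the copy plan is a plain list of source paths, uses a hypothetical `Supplier` in place of reading the start version's content files, and uses `java.util.logging` instead of the project's logger. The class and method names are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical stand-in; not part of Iceberg.
final class IncrementalCopyPlanSketch {
  private static final Logger LOG =
      Logger.getLogger(IncrementalCopyPlanSketch.class.getName());

  // Removes already-copied paths from the plan. If the previously copied
  // paths cannot be read, logs the consequence and returns the full plan.
  static List<String> dedup(
      List<String> fullCopyPlan, Supplier<Set<String>> previouslyCopiedPaths) {
    List<String> plan = new ArrayList<>(fullCopyPlan);
    try {
      Set<String> copied = previouslyCopiedPaths.get();
      plan.removeIf(copied::contains);
      return plan;
    } catch (Exception e) {
      LOG.log(
          Level.WARNING,
          "Unable to read content files from start version for incremental filtering; "
              + "falling back to the full copy plan",
          e);
      // Return an untouched copy so a partial removeIf cannot leak through.
      return new ArrayList<>(fullCopyPlan);
    }
  }
}
```

With this wording, the log itself explains why an incremental run copied more files than expected.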
ce254d6 to 3f72d71
This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@iceberg.apache.org list. Thank you for your contributions.
Fixes #14458
Summary
Problem
RewriteTablePathUtil.writeDataFileEntry filtered copy plan entries using snapshotIds.contains(entry.snapshotId()). This is fundamentally broken because entry.snapshotId() reflects the snapshot that originally added the file, and table maintenance can expire that snapshot at any time. When RewriteManifests reorganizes entries from expired snapshots into new manifests, the manifest is correctly selected for processing, but the entry-level filter drops the files, causing silent data loss at the target.
Scenario:
Append (S3) → RewriteManifests (S4) → Expire S3 → Incremental replication builds deltaSnapshotIds = {S4}, but file C has entry.snapshotId() = S3 → {S4}.contains(S3) == false → file C excluded from copy plan.
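The scenario can be reproduced in a small, self-contained sketch. This is a simplified model, not the Iceberg API: manifest entries are a map from file path to the snapshot ID that added the file, and the class, method, and file names are hypothetical. Files A and B (added by S1 and S2 and already copied) are assumptions added for illustration. The path-based filter mirrors the `previouslyCopiedPaths.contains(...)` check quoted in the review above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the bug; not Iceberg code.
final class SnapshotFilterDemo {

  // Entry-level filter as described in the Problem section:
  // keep a file only if the snapshot that added it is in the delta set.
  static List<String> filterBySnapshotId(
      Map<String, Long> entries, Set<Long> deltaSnapshotIds) {
    List<String> kept = new ArrayList<>();
    for (Map.Entry<String, Long> entry : entries.entrySet()) {
      if (deltaSnapshotIds.contains(entry.getValue())) {
        kept.add(entry.getKey());
      }
    }
    return kept;
  }

  // Path-based alternative: keep every file that was not copied before.
  static List<String> filterByCopiedPaths(
      Map<String, Long> entries, Set<String> previouslyCopiedPaths) {
    List<String> kept = new ArrayList<>(entries.keySet());
    kept.removeIf(previouslyCopiedPaths::contains);
    return kept;
  }

  // Entries of the manifest written by RewriteManifests (S4).
  // Each entry still carries the ID of the snapshot that added the file.
  static Map<String, Long> rewrittenManifest() {
    Map<String, Long> entries = new LinkedHashMap<>();
    entries.put("data/fileA.parquet", 1L); // assumed: added by S1
    entries.put("data/fileB.parquet", 2L); // assumed: added by S2
    entries.put("data/fileC.parquet", 3L); // added by S3, which has since expired
    return entries;
  }

  public static void main(String[] args) {
    Set<Long> deltaSnapshotIds = new HashSet<>(Arrays.asList(4L)); // {S4}
    Set<String> copied =
        new HashSet<>(Arrays.asList("data/fileA.parquet", "data/fileB.parquet"));

    // prints [] because {S4} does not contain S3, so file C is dropped
    System.out.println(filterBySnapshotId(rewrittenManifest(), deltaSnapshotIds));
    // prints [data/fileC.parquet]
    System.out.println(filterByCopiedPaths(rewrittenManifest(), copied));
  }
}
```

The snapshot-ID filter returns an empty plan even though file C has never reached the target, while the path-based filter does not depend on which snapshots still exist.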
Approach
Test plan