Skip to content

Commit 19e8dee

Browse files
committed
feat: Add metadata-only replace API to Table for REPLACE snapshot operations
Fixes #3130
1 parent 4173ef7 commit 19e8dee

File tree

4 files changed

+228
-2
lines changed

4 files changed

+228
-2
lines changed

pyiceberg/table/__init__.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -450,6 +450,32 @@ def update_statistics(self) -> UpdateStatistics:
450450
"""
451451
return UpdateStatistics(transaction=self)
452452

453+
def replace(
454+
self,
455+
files_to_delete: Iterable[DataFile],
456+
files_to_add: Iterable[DataFile],
457+
snapshot_properties: dict[str, str] = EMPTY_DICT,
458+
branch: str | None = MAIN_BRANCH,
459+
) -> None:
460+
"""
461+
Shorthand for replacing existing files with new files.
462+
463+
A replace will produce a REPLACE snapshot that will ignore existing
464+
files and replace them with the new files.
465+
466+
Args:
467+
files_to_delete: The files to delete
468+
files_to_add: The new files to add that replace the deleted files
469+
snapshot_properties: Custom properties to be added to the snapshot summary
470+
branch: Branch Reference to run the replace operation
471+
"""
472+
with self.update_snapshot(snapshot_properties=snapshot_properties, branch=branch).replace() as replace_snapshot:
473+
for file_to_delete in files_to_delete:
474+
replace_snapshot.delete_data_file(file_to_delete)
475+
476+
for data_file in files_to_add:
477+
replace_snapshot.append_data_file(data_file)
478+
453479
def append(self, df: pa.Table, snapshot_properties: dict[str, str] = EMPTY_DICT, branch: str | None = MAIN_BRANCH) -> None:
454480
"""
455481
Shorthand API for appending a PyArrow table to a table transaction.
@@ -1384,6 +1410,33 @@ def append(self, df: pa.Table, snapshot_properties: dict[str, str] = EMPTY_DICT,
13841410
with self.transaction() as tx:
13851411
tx.append(df=df, snapshot_properties=snapshot_properties, branch=branch)
13861412

1413+
def replace(
1414+
self,
1415+
files_to_delete: Iterable[DataFile],
1416+
files_to_add: Iterable[DataFile],
1417+
snapshot_properties: dict[str, str] = EMPTY_DICT,
1418+
branch: str | None = MAIN_BRANCH,
1419+
) -> None:
1420+
"""
1421+
Shorthand for replacing existing files with new files.
1422+
1423+
A replace will produce a REPLACE snapshot that will ignore existing
1424+
files and replace them with the new files.
1425+
1426+
Args:
1427+
files_to_delete: The files to delete
1428+
files_to_add: The new files to add that replace the deleted files
1429+
snapshot_properties: Custom properties to be added to the snapshot summary
1430+
branch: Branch Reference to run the replace operation
1431+
"""
1432+
with self.transaction() as tx:
1433+
tx.replace(
1434+
files_to_delete=files_to_delete,
1435+
files_to_add=files_to_add,
1436+
snapshot_properties=snapshot_properties,
1437+
branch=branch,
1438+
)
1439+
13871440
def dynamic_partition_overwrite(
13881441
self, df: pa.Table, snapshot_properties: dict[str, str] = EMPTY_DICT, branch: str | None = MAIN_BRANCH
13891442
) -> None:

pyiceberg/table/snapshots.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -344,7 +344,7 @@ def _partition_summary(self, update_metrics: UpdateMetrics) -> str:
344344

345345

346346
def update_snapshot_summaries(summary: Summary, previous_summary: Mapping[str, str] | None = None) -> Summary:
347-
if summary.operation not in {Operation.APPEND, Operation.OVERWRITE, Operation.DELETE}:
347+
if summary.operation not in {Operation.APPEND, Operation.OVERWRITE, Operation.DELETE, Operation.REPLACE}:
348348
raise ValueError(f"Operation not implemented: {summary.operation}")
349349

350350
if not previous_summary:

pyiceberg/table/update/snapshot.py

Lines changed: 82 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -666,6 +666,81 @@ def _get_entries(manifest: ManifestFile) -> list[ManifestEntry]:
666666
else:
667667
return []
668668

669+
class _RewriteFiles(_SnapshotProducer["_RewriteFiles"]):
670+
"""A snapshot producer that rewrites data files."""
671+
672+
def __init__(self, operation: Operation, transaction: Transaction, io: FileIO, snapshot_properties: dict[str, str]):
673+
super().__init__(operation, transaction, io, snapshot_properties)
674+
675+
def _commit(self) -> UpdatesAndRequirements:
676+
# Only produce a commit when there is something to rewrite
677+
if self._deleted_data_files or self._added_data_files:
678+
return super()._commit()
679+
else:
680+
return (), ()
681+
682+
def _deleted_entries(self) -> list[ManifestEntry]:
683+
"""Check if we need to mark the files as deleted."""
684+
if self._parent_snapshot_id is not None:
685+
previous_snapshot = self._transaction.table_metadata.snapshot_by_id(self._parent_snapshot_id)
686+
if previous_snapshot is None:
687+
raise ValueError(f"Could not find the previous snapshot: {self._parent_snapshot_id}")
688+
689+
executor = ExecutorFactory.get_or_create()
690+
691+
def _get_entries(manifest: ManifestFile) -> list[ManifestEntry]:
692+
return [
693+
ManifestEntry.from_args(
694+
status=ManifestEntryStatus.DELETED,
695+
snapshot_id=entry.snapshot_id,
696+
sequence_number=entry.sequence_number,
697+
file_sequence_number=entry.file_sequence_number,
698+
data_file=entry.data_file,
699+
)
700+
for entry in manifest.fetch_manifest_entry(self._io, discard_deleted=True)
701+
if entry.data_file.content == DataFileContent.DATA and entry.data_file in self._deleted_data_files
702+
]
703+
704+
list_of_entries = executor.map(_get_entries, previous_snapshot.manifests(self._io))
705+
return list(itertools.chain(*list_of_entries))
706+
else:
707+
return []
708+
709+
def _existing_manifests(self) -> list[ManifestFile]:
710+
"""To determine if there are any existing manifests."""
711+
existing_files = []
712+
if snapshot := self._transaction.table_metadata.snapshot_by_name(name=self._target_branch):
713+
for manifest_file in snapshot.manifests(io=self._io):
714+
entries_to_write: set[ManifestEntry] = set()
715+
found_deleted_entries: set[ManifestEntry] = set()
716+
717+
for entry in manifest_file.fetch_manifest_entry(io=self._io, discard_deleted=True):
718+
if entry.data_file in self._deleted_data_files:
719+
found_deleted_entries.add(entry)
720+
else:
721+
entries_to_write.add(entry)
722+
723+
if len(found_deleted_entries) == 0:
724+
existing_files.append(manifest_file)
725+
continue
726+
727+
if len(entries_to_write) == 0:
728+
continue
729+
730+
with self.new_manifest_writer(self.spec(manifest_file.partition_spec_id)) as writer:
731+
for entry in entries_to_write:
732+
writer.add_entry(
733+
ManifestEntry.from_args(
734+
status=ManifestEntryStatus.EXISTING,
735+
snapshot_id=entry.snapshot_id,
736+
sequence_number=entry.sequence_number,
737+
file_sequence_number=entry.file_sequence_number,
738+
data_file=entry.data_file,
739+
)
740+
)
741+
existing_files.append(writer.to_manifest_file())
742+
return existing_files
743+
669744

670745
class UpdateSnapshot:
671746
_transaction: Transaction
@@ -724,7 +799,13 @@ def delete(self) -> _DeleteFiles:
724799
snapshot_properties=self._snapshot_properties,
725800
)
726801

727-
802+
def replace(self) -> _RewriteFiles:
803+
return _RewriteFiles(
804+
operation=Operation.REPLACE,
805+
transaction=self._transaction,
806+
io=self._io,
807+
snapshot_properties=self._snapshot_properties,
808+
)
728809
class _ManifestMergeManager(Generic[U]):
729810
_target_size_bytes: int
730811
_min_count_to_merge: int

tests/table/test_replace.py

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
import uuid
2+
import pytest
3+
from pyiceberg.manifest import DataFile, DataFileContent, FileFormat
4+
from pyiceberg.table.snapshots import Operation
5+
from pyiceberg.partitioning import PartitionSpec
6+
from pyiceberg.schema import Schema
7+
from pyiceberg.typedef import Record
8+
9+
def test_replace_api(catalog):
10+
# Setup a basic table using the catalog fixture
11+
catalog.create_namespace("default")
12+
table = catalog.create_table(
13+
identifier="default.test_replace",
14+
schema=Schema(),
15+
)
16+
17+
# Create mock DataFiles for the test
18+
file_to_delete = DataFile.from_args(
19+
file_path="s3://bucket/test/data/deleted.parquet",
20+
file_format=FileFormat.PARQUET,
21+
partition=Record(),
22+
record_count=100,
23+
file_size_in_bytes=1024,
24+
content=DataFileContent.DATA,
25+
)
26+
file_to_delete.spec_id = 0
27+
28+
file_to_add = DataFile.from_args(
29+
file_path="s3://bucket/test/data/added.parquet",
30+
file_format=FileFormat.PARQUET,
31+
partition=Record(),
32+
record_count=100,
33+
file_size_in_bytes=1024,
34+
content=DataFileContent.DATA,
35+
)
36+
file_to_add.spec_id = 0
37+
38+
# Initially append to have something to replace
39+
with table.transaction() as tx:
40+
with tx.update_snapshot().fast_append() as append_snapshot:
41+
append_snapshot.append_data_file(file_to_delete)
42+
43+
# Verify initial append snapshot
44+
assert len(table.history()) == 1
45+
snapshot = table.current_snapshot()
46+
assert snapshot.summary["operation"] == Operation.APPEND
47+
48+
# Call the replace API
49+
table.replace(
50+
files_to_delete=[file_to_delete],
51+
files_to_add=[file_to_add]
52+
)
53+
54+
# Verify the replacement created a REPLACE snapshot
55+
assert len(table.history()) == 2
56+
snapshot = table.current_snapshot()
57+
assert snapshot.summary["operation"] == Operation.REPLACE
58+
59+
# Verify the correct files are added and deleted
60+
# The summary property tracks these counts
61+
assert snapshot.summary["added-data-files"] == "1"
62+
assert snapshot.summary["deleted-data-files"] == "1"
63+
assert snapshot.summary["added-records"] == "100"
64+
assert snapshot.summary["deleted-records"] == "100"
65+
66+
# Verify the new file exists in the new manifest
67+
manifest_files = snapshot.manifests(table.io)
68+
assert len(manifest_files) == 2 # One for ADDED, one for DELETED
69+
70+
# Check that sequence numbers were handled properly natively by verifying the manifest contents
71+
entries = []
72+
for manifest in manifest_files:
73+
for entry in manifest.fetch_manifest_entry(table.io, discard_deleted=False):
74+
entries.append(entry)
75+
76+
# One entry for ADDED (new file), one for DELETED (old file)
77+
assert len(entries) == 2
78+
79+
def test_replace_empty_files(catalog):
80+
# Setup a basic table using the catalog fixture
81+
catalog.create_namespace("default")
82+
table = catalog.create_table(
83+
identifier="default.test_replace_empty",
84+
schema=Schema(),
85+
)
86+
87+
# Replacing empty lists should not throw errors, but should produce no changes.
88+
table.replace([], [])
89+
90+
# History should be completely empty since no files were rewritten
91+
assert len(table.history()) == 0
92+
assert table.current_snapshot() is None

0 commit comments

Comments
 (0)