Skip to content

Commit e63feff

Browse files
committed
Added implementation to delete files.
1 parent e522567 commit e63feff

File tree

2 files changed

+163
-0
lines changed

2 files changed

+163
-0
lines changed

pyiceberg/table/__init__.py

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -936,6 +936,40 @@ def add_files(
936936
for data_file in data_files:
937937
append_files.append_data_file(data_file)
938938

939+
def delete_files(
    self,
    file_paths: list[str],
    snapshot_properties: dict[str, str] = EMPTY_DICT,
    branch: str | None = MAIN_BRANCH,
) -> None:
    """
    Shorthand API for removing data files from the table transaction by their paths.

    Args:
        file_paths: The list of full file paths to be removed from the table
        snapshot_properties: Custom properties to be added to the snapshot summary
        branch: Branch to delete files from

    Raises:
        ValueError: If file_paths contains duplicates
        ValueError: If any file paths are not found in the table
    """
    requested_paths = set(file_paths)
    # A shorter set than the input list means at least one path repeats.
    if len(requested_paths) != len(file_paths):
        raise ValueError("File paths must be unique")

    found_files = _get_data_files_from_snapshot(
        table_metadata=self.table_metadata, file_paths=requested_paths, io=self._table.io, branch=branch
    )

    missing_files = requested_paths - set(found_files)
    if missing_files:
        raise ValueError(f"Cannot delete files that are not referenced by table, files: {', '.join(sorted(missing_files))}")

    snapshot_update = self.update_snapshot(snapshot_properties=snapshot_properties, branch=branch)
    with snapshot_update.overwrite() as overwrite_snapshot:
        for data_file in found_files.values():
            overwrite_snapshot.delete_data_file(data_file)
972+
939973
def update_spec(self) -> UpdateSpec:
940974
"""Create a new UpdateSpec to update the partitioning of the table.
941975
@@ -1506,6 +1540,31 @@ def add_files(
15061540
branch=branch,
15071541
)
15081542

1543+
def delete_files(
    self,
    file_paths: list[str],
    snapshot_properties: dict[str, str] = EMPTY_DICT,
    branch: str | None = MAIN_BRANCH,
) -> None:
    """
    Shorthand API for removing data files from the table by their paths.

    Opens a single transaction and delegates to the transaction-level
    ``delete_files`` implementation.

    Args:
        file_paths: The list of full file paths to be removed from the table
        snapshot_properties: Custom properties to be added to the snapshot summary
        branch: Branch to delete files from

    Raises:
        ValueError: If file_paths contains duplicates
        ValueError: If any file paths are not found in the table
    """
    with self.transaction() as txn:
        txn.delete_files(file_paths=file_paths, snapshot_properties=snapshot_properties, branch=branch)
1567+
15091568
def update_spec(self, case_sensitive: bool = True) -> UpdateSpec:
15101569
return UpdateSpec(Transaction(self, autocommit=True), case_sensitive=case_sensitive)
15111570

@@ -2175,3 +2234,21 @@ def _parquet_files_to_data_files(table_metadata: TableMetadata, file_paths: list
21752234
futures = [executor.submit(parquet_file_to_data_file, io, table_metadata, file_path) for file_path in file_paths]
21762235

21772236
return [f.result() for f in futures if f.result()]
2237+
2238+
2239+
def _get_data_files_from_snapshot(
    table_metadata: TableMetadata, file_paths: set[str], io: FileIO, branch: str | None = MAIN_BRANCH
) -> dict[str, DataFile]:
    """Resolve the requested file paths to their DataFile entries in a snapshot.

    Args:
        table_metadata: Metadata of the table whose snapshot is inspected.
        file_paths: Full file paths to look up in the snapshot's data manifests.
        io: FileIO used to read manifest files.
        branch: Branch whose head snapshot is searched; when None the current
            snapshot is used instead.

    Returns:
        Mapping of file path to DataFile for every requested path referenced by
        a live entry in a DATA manifest. Paths not referenced by the snapshot
        are absent from the result (the caller decides how to report them).
    """
    result: dict[str, DataFile] = {}
    # Nothing requested: avoid loading any manifests at all.
    if not file_paths:
        return result

    snapshot = table_metadata.snapshot_by_name(branch) if branch else table_metadata.current_snapshot()
    if snapshot is None:
        return result

    for manifest in snapshot.manifests(io):
        if manifest.content != ManifestContent.DATA:
            continue
        for entry in manifest.fetch_manifest_entry(io, discard_deleted=True):
            file_path = entry.data_file.file_path
            if file_path in file_paths:
                result[file_path] = entry.data_file
                # Stop as soon as every requested path is resolved, instead of
                # finishing the current manifest and scanning the remaining ones.
                if len(result) == len(file_paths):
                    return result
    return result

tests/integration/test_add_files.py

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1040,3 +1040,89 @@ def test_add_files_to_branch(spark: SparkSession, session_catalog: Catalog, form
10401040

10411041
for col in branch_df.columns:
10421042
assert branch_df.filter(branch_df[col].isNotNull()).count() == 6, "Expected all 6 rows to be non-null"
1043+
1044+
1045+
@pytest.mark.integration
def test_delete_files_from_unpartitioned_table(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None:
    """Deleting a subset of files removes exactly their rows and records the deletions in the manifests."""
    identifier = f"default.delete_files_unpartitioned_v{format_version}"
    tbl = _create_table(session_catalog, identifier, format_version)

    file_paths = [f"s3://warehouse/default/delete_unpartitioned/v{format_version}/test-{i}.parquet" for i in range(5)]
    for path in file_paths:
        _write_parquet(tbl.io, path, ARROW_SCHEMA, ARROW_TABLE)

    tbl.add_files(file_paths=file_paths)
    assert len(tbl.scan().to_arrow()) == 5

    # Drop the first two files; three single-row files should remain.
    tbl.delete_files(file_paths=file_paths[:2])

    manifest_rows = spark.sql(
        f"""
        SELECT added_data_files_count, existing_data_files_count, deleted_data_files_count
        FROM {identifier}.all_manifests
        """
    ).collect()
    assert sum(row.deleted_data_files_count for row in manifest_rows) == 2

    assert spark.table(identifier).count() == 3
    assert len(tbl.scan().to_arrow()) == 3
1073+
1074+
@pytest.mark.integration
def test_delete_files_raises_on_nonexistent_file(session_catalog: Catalog, format_version: int) -> None:
    """Requesting deletion of a path the table does not reference raises ValueError."""
    identifier = f"default.delete_files_nonexistent_v{format_version}"
    tbl = _create_table(session_catalog, identifier, format_version)

    known_paths = [f"s3://warehouse/default/delete_nonexistent/v{format_version}/test-{i}.parquet" for i in range(3)]
    for path in known_paths:
        _write_parquet(tbl.io, path, ARROW_SCHEMA, ARROW_TABLE)
    tbl.add_files(file_paths=known_paths)

    with pytest.raises(ValueError, match="Cannot delete files that are not referenced by table"):
        tbl.delete_files(file_paths=["s3://warehouse/default/does-not-exist.parquet"])
1087+
1088+
1089+
@pytest.mark.integration
def test_delete_files_raises_on_duplicate_paths(session_catalog: Catalog, format_version: int) -> None:
    """Passing the same path twice to delete_files raises ValueError before any delete happens."""
    identifier = f"default.delete_files_duplicate_v{format_version}"
    tbl = _create_table(session_catalog, identifier, format_version)

    path = f"s3://warehouse/default/delete_duplicate/v{format_version}/test.parquet"
    _write_parquet(tbl.io, path, ARROW_SCHEMA, ARROW_TABLE)
    tbl.add_files(file_paths=[path])

    with pytest.raises(ValueError, match="File paths must be unique"):
        tbl.delete_files(file_paths=[path, path])
1101+
1102+
1103+
@pytest.mark.integration
def test_delete_files_from_branch(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None:
    """A branch-scoped delete removes rows only from that branch, leaving main untouched."""
    identifier = f"default.delete_files_branch_v{format_version}"
    branch = "branch1"

    tbl = _create_table(session_catalog, identifier, format_version)

    file_paths = [f"s3://warehouse/default/delete_branch/v{format_version}/test-{i}.parquet" for i in range(5)]
    for path in file_paths:
        _write_parquet(tbl.io, path, ARROW_SCHEMA, ARROW_TABLE)

    # Seed main with one row so the branch is created from a real snapshot.
    tbl.append(ARROW_TABLE)
    assert tbl.metadata.current_snapshot_id is not None
    tbl.manage_snapshots().create_branch(snapshot_id=tbl.metadata.current_snapshot_id, branch_name=branch).commit()

    tbl.add_files(file_paths=file_paths, branch=branch)
    assert spark.table(f"{identifier}.branch_{branch}").count() == 6

    tbl.delete_files(file_paths=file_paths[:3], branch=branch)
    assert spark.table(f"{identifier}.branch_{branch}").count() == 3

    # Main still holds only the single seeded row.
    assert spark.table(identifier).count() == 1

0 commit comments

Comments
 (0)