Skip to content

Commit 1d3b70f

Browse files
committed
Add metadata tables for data_files and delete_files
1 parent c96cee2 commit 1d3b70f

File tree

2 files changed: 165 additions and 130 deletions.

2 files changed: 165 additions and 130 deletions.

pyiceberg/table/__init__.py

Lines changed: 20 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -4308,7 +4308,7 @@ def history(self) -> "pa.Table":
43084308

43094309
return pa.Table.from_pylist(history, schema=history_schema)
43104310

4311-
def files(self, snapshot_id: Optional[int] = None) -> "pa.Table":
4311+
def _files(self, snapshot_id: Optional[int] = None, file_content_type: str = "all") -> "pa.Table":
43124312
import pyarrow as pa
43134313

43144314
from pyiceberg.io.pyarrow import schema_to_pyarrow
@@ -4365,6 +4365,10 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
43654365
for manifest_list in snapshot.manifests(io):
43664366
for manifest_entry in manifest_list.fetch_manifest_entry(io):
43674367
data_file = manifest_entry.data_file
4368+
if file_content_type == "data" and data_file.content != DataFileContent.DATA:
4369+
continue
4370+
if file_content_type == "delete" and data_file.content == DataFileContent.DATA:
4371+
continue
43684372
column_sizes = data_file.column_sizes or {}
43694373
value_counts = data_file.value_counts or {}
43704374
null_value_counts = data_file.null_value_counts or {}
@@ -4393,12 +4397,12 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
43934397
"spec_id": data_file.spec_id,
43944398
"record_count": data_file.record_count,
43954399
"file_size_in_bytes": data_file.file_size_in_bytes,
4396-
"column_sizes": dict(data_file.column_sizes),
4397-
"value_counts": dict(data_file.value_counts),
4398-
"null_value_counts": dict(data_file.null_value_counts),
4399-
"nan_value_counts": dict(data_file.nan_value_counts),
4400-
"lower_bounds": dict(data_file.lower_bounds),
4401-
"upper_bounds": dict(data_file.upper_bounds),
4400+
"column_sizes": dict(data_file.column_sizes) if data_file.column_sizes is not None else None,
4401+
"value_counts": dict(data_file.value_counts) if data_file.value_counts is not None else None,
4402+
"null_value_counts": dict(data_file.null_value_counts) if data_file.null_value_counts is not None else None,
4403+
"nan_value_counts": dict(data_file.nan_value_counts) if data_file.nan_value_counts is not None else None,
4404+
"lower_bounds": dict(data_file.lower_bounds) if data_file.lower_bounds is not None else None,
4405+
"upper_bounds": dict(data_file.upper_bounds) if data_file.upper_bounds is not None else None,
44024406
"key_metadata": data_file.key_metadata,
44034407
"split_offsets": data_file.split_offsets,
44044408
"equality_ids": data_file.equality_ids,
@@ -4411,6 +4415,15 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
44114415
schema=files_schema,
44124416
)
44134417

4418+
def files(self, snapshot_id: Optional[int] = None) -> "pa.Table":
4419+
return self._files(snapshot_id)
4420+
4421+
def data_files(self, snapshot_id: Optional[int] = None) -> "pa.Table":
4422+
return self._files(snapshot_id, "data")
4423+
4424+
def delete_files(self, snapshot_id: Optional[int] = None) -> "pa.Table":
4425+
return self._files(snapshot_id, "delete")
4426+
44144427

44154428
class _ManifestMergeManager(Generic[U]):
44164429
_target_size_bytes: int

tests/integration/test_inspect_table.py

Lines changed: 145 additions & 123 deletions
Original file line number | Diff line number | Diff line change
@@ -672,126 +672,141 @@ def test_inspect_files(
672672
# append more data
673673
tbl.append(arrow_table_with_null)
674674

675-
df = tbl.refresh().inspect.files()
675+
# configure table properties
676+
if format_version == 2:
677+
with tbl.transaction() as txn:
678+
txn.set_properties({"write.delete.mode": "merge-on-read"})
679+
spark.sql(f"DELETE FROM {identifier} WHERE int = 1")
676680

677-
assert df.column_names == [
678-
"content",
679-
"file_path",
680-
"file_format",
681-
"spec_id",
682-
"record_count",
683-
"file_size_in_bytes",
684-
"column_sizes",
685-
"value_counts",
686-
"null_value_counts",
687-
"nan_value_counts",
688-
"lower_bounds",
689-
"upper_bounds",
690-
"key_metadata",
691-
"split_offsets",
692-
"equality_ids",
693-
"sort_order_id",
694-
"readable_metrics",
695-
]
696-
697-
# make sure the non-nullable fields are filled
698-
for int_column in ["content", "spec_id", "record_count", "file_size_in_bytes"]:
699-
for value in df[int_column]:
700-
assert isinstance(value.as_py(), int)
701-
702-
for split_offsets in df["split_offsets"]:
703-
assert isinstance(split_offsets.as_py(), list)
704-
705-
for file_format in df["file_format"]:
706-
assert file_format.as_py() == "PARQUET"
681+
files_df = tbl.refresh().inspect.files()
707682

708-
for file_path in df["file_path"]:
709-
assert file_path.as_py().startswith("s3://")
683+
data_files_df = tbl.inspect.data_files()
710684

711-
lhs = df.to_pandas()
712-
rhs = spark.table(f"{identifier}.files").toPandas()
685+
delete_files_df = tbl.inspect.delete_files()
713686

714-
lhs_subset = lhs[
715-
[
687+
def inspect_files_asserts(df: pa.Table, spark_df: DataFrame) -> None:
688+
assert df.column_names == [
716689
"content",
717690
"file_path",
718691
"file_format",
719692
"spec_id",
720693
"record_count",
721694
"file_size_in_bytes",
695+
"column_sizes",
696+
"value_counts",
697+
"null_value_counts",
698+
"nan_value_counts",
699+
"lower_bounds",
700+
"upper_bounds",
701+
"key_metadata",
722702
"split_offsets",
723703
"equality_ids",
724704
"sort_order_id",
705+
"readable_metrics",
725706
]
726-
]
727-
rhs_subset = rhs[
728-
[
729-
"content",
730-
"file_path",
731-
"file_format",
732-
"spec_id",
733-
"record_count",
734-
"file_size_in_bytes",
735-
"split_offsets",
736-
"equality_ids",
737-
"sort_order_id",
707+
708+
# make sure the non-nullable fields are filled
709+
for int_column in ["content", "spec_id", "record_count", "file_size_in_bytes"]:
710+
for value in df[int_column]:
711+
assert isinstance(value.as_py(), int)
712+
713+
for split_offsets in df["split_offsets"]:
714+
assert isinstance(split_offsets.as_py(), list)
715+
716+
for file_format in df["file_format"]:
717+
assert file_format.as_py() == "PARQUET"
718+
719+
for file_path in df["file_path"]:
720+
assert file_path.as_py().startswith("s3://")
721+
722+
lhs = df.to_pandas()
723+
rhs = spark_df.toPandas()
724+
725+
lhs_subset = lhs[
726+
[
727+
"content",
728+
"file_path",
729+
"file_format",
730+
"spec_id",
731+
"record_count",
732+
"file_size_in_bytes",
733+
"split_offsets",
734+
"equality_ids",
735+
"sort_order_id",
736+
]
737+
]
738+
rhs_subset = rhs[
739+
[
740+
"content",
741+
"file_path",
742+
"file_format",
743+
"spec_id",
744+
"record_count",
745+
"file_size_in_bytes",
746+
"split_offsets",
747+
"equality_ids",
748+
"sort_order_id",
749+
]
738750
]
739-
]
740751

741-
assert_frame_equal(lhs_subset, rhs_subset, check_dtype=False, check_categorical=False)
752+
assert_frame_equal(lhs_subset, rhs_subset, check_dtype=False, check_categorical=False)
742753

743-
for column in df.column_names:
744-
for left, right in zip(lhs[column].to_list(), rhs[column].to_list()):
745-
if isinstance(left, float) and math.isnan(left) and isinstance(right, float) and math.isnan(right):
746-
# NaN != NaN in Python
747-
continue
748-
if column in [
749-
"column_sizes",
750-
"value_counts",
751-
"null_value_counts",
752-
"nan_value_counts",
753-
"lower_bounds",
754-
"upper_bounds",
755-
]:
756-
if isinstance(right, dict):
757-
left = dict(left)
758-
assert left == right, f"Difference in column {column}: {left} != {right}"
754+
for column in df.column_names:
755+
for left, right in zip(lhs[column].to_list(), rhs[column].to_list()):
756+
if isinstance(left, float) and math.isnan(left) and isinstance(right, float) and math.isnan(right):
757+
# NaN != NaN in Python
758+
continue
759+
if column in [
760+
"column_sizes",
761+
"value_counts",
762+
"null_value_counts",
763+
"nan_value_counts",
764+
"lower_bounds",
765+
"upper_bounds",
766+
]:
767+
if isinstance(right, dict):
768+
left = dict(left)
769+
assert left == right, f"Difference in column {column}: {left} != {right}"
759770

760-
elif column == "readable_metrics":
761-
assert list(left.keys()) == [
762-
"bool",
763-
"string",
764-
"string_long",
765-
"int",
766-
"long",
767-
"float",
768-
"double",
769-
"timestamp",
770-
"timestamptz",
771-
"date",
772-
"binary",
773-
"fixed",
774-
]
775-
assert left.keys() == right.keys()
776-
777-
for rm_column in left.keys():
778-
rm_lhs = left[rm_column]
779-
rm_rhs = right[rm_column]
780-
781-
assert rm_lhs["column_size"] == rm_rhs["column_size"]
782-
assert rm_lhs["value_count"] == rm_rhs["value_count"]
783-
assert rm_lhs["null_value_count"] == rm_rhs["null_value_count"]
784-
assert rm_lhs["nan_value_count"] == rm_rhs["nan_value_count"]
785-
786-
if rm_column == "timestamptz":
787-
# PySpark does not correctly set the timstamptz
788-
rm_rhs["lower_bound"] = rm_rhs["lower_bound"].replace(tzinfo=pytz.utc)
789-
rm_rhs["upper_bound"] = rm_rhs["upper_bound"].replace(tzinfo=pytz.utc)
790-
791-
assert rm_lhs["lower_bound"] == rm_rhs["lower_bound"]
792-
assert rm_lhs["upper_bound"] == rm_rhs["upper_bound"]
793-
else:
794-
assert left == right, f"Difference in column {column}: {left} != {right}"
771+
elif column == "readable_metrics":
772+
assert list(left.keys()) == [
773+
"bool",
774+
"string",
775+
"string_long",
776+
"int",
777+
"long",
778+
"float",
779+
"double",
780+
"timestamp",
781+
"timestamptz",
782+
"date",
783+
"binary",
784+
"fixed",
785+
]
786+
assert left.keys() == right.keys()
787+
788+
for rm_column in left.keys():
789+
rm_lhs = left[rm_column]
790+
rm_rhs = right[rm_column]
791+
792+
assert rm_lhs["column_size"] == rm_rhs["column_size"]
793+
assert rm_lhs["value_count"] == rm_rhs["value_count"]
794+
assert rm_lhs["null_value_count"] == rm_rhs["null_value_count"]
795+
assert rm_lhs["nan_value_count"] == rm_rhs["nan_value_count"]
796+
797+
if rm_column == "timestamptz" and rm_rhs["lower_bound"] and rm_rhs["upper_bound"]:
798+
# PySpark does not correctly set the timestamptz
799+
rm_rhs["lower_bound"] = rm_rhs["lower_bound"].replace(tzinfo=pytz.utc)
800+
rm_rhs["upper_bound"] = rm_rhs["upper_bound"].replace(tzinfo=pytz.utc)
801+
802+
assert rm_lhs["lower_bound"] == rm_rhs["lower_bound"]
803+
assert rm_lhs["upper_bound"] == rm_rhs["upper_bound"]
804+
else:
805+
assert left == right, f"Difference in column {column}: {left} != {right}"
806+
807+
inspect_files_asserts(files_df, spark.table(f"{identifier}.files"))
808+
inspect_files_asserts(data_files_df, spark.table(f"{identifier}.data_files"))
809+
inspect_files_asserts(delete_files_df, spark.table(f"{identifier}.delete_files"))
795810

796811

797812
@pytest.mark.integration
@@ -801,26 +816,33 @@ def test_inspect_files_no_snapshot(spark: SparkSession, session_catalog: Catalog
801816

802817
tbl = _create_table(session_catalog, identifier, properties={"format-version": format_version})
803818

804-
df = tbl.refresh().inspect.files()
819+
files_df = tbl.refresh().inspect.files()
820+
data_files_df = tbl.inspect.data_files()
821+
delete_files_df = tbl.inspect.delete_files()
805822

806-
assert df.column_names == [
807-
"content",
808-
"file_path",
809-
"file_format",
810-
"spec_id",
811-
"record_count",
812-
"file_size_in_bytes",
813-
"column_sizes",
814-
"value_counts",
815-
"null_value_counts",
816-
"nan_value_counts",
817-
"lower_bounds",
818-
"upper_bounds",
819-
"key_metadata",
820-
"split_offsets",
821-
"equality_ids",
822-
"sort_order_id",
823-
"readable_metrics",
824-
]
823+
def inspect_files_asserts(df: pa.Table) -> None:
824+
assert df.column_names == [
825+
"content",
826+
"file_path",
827+
"file_format",
828+
"spec_id",
829+
"record_count",
830+
"file_size_in_bytes",
831+
"column_sizes",
832+
"value_counts",
833+
"null_value_counts",
834+
"nan_value_counts",
835+
"lower_bounds",
836+
"upper_bounds",
837+
"key_metadata",
838+
"split_offsets",
839+
"equality_ids",
840+
"sort_order_id",
841+
"readable_metrics",
842+
]
843+
844+
assert df.to_pandas().empty is True
825845

826-
assert df.to_pandas().empty is True
846+
inspect_files_asserts(files_df)
847+
inspect_files_asserts(data_files_df)
848+
inspect_files_asserts(delete_files_df)

0 commit comments

Comments
 (0)