Skip to content

Commit 2bce484

Browse files
committed
Add partition field in files metadata table schema
1 parent 9fff025 commit 2bce484

2 files changed

Lines changed: 83 additions & 1 deletion

File tree

pyiceberg/table/inspect.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -557,11 +557,17 @@ def _get_files_from_manifest(
557557
}
558558
for field in self.tbl.metadata.schema().fields
559559
}
560+
partition = data_file.partition
561+
partition_record_dict = {
562+
field.name: partition[pos]
563+
for pos, field in enumerate(self.tbl.metadata.specs()[manifest_list.partition_spec_id].fields)
564+
}
560565
files.append(
561566
{
562567
"content": data_file.content,
563568
"file_path": data_file.file_path,
564569
"file_format": data_file.file_format,
570+
"partition": partition_record_dict,
565571
"spec_id": data_file.spec_id,
566572
"record_count": data_file.record_count,
567573
"file_size_in_bytes": data_file.file_size_in_bytes,
@@ -604,6 +610,9 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
604610
]
605611
)
606612

613+
partition_record = self.tbl.metadata.specs_struct()
614+
pa_record_struct = schema_to_pyarrow(partition_record)
615+
607616
for field in self.tbl.metadata.schema().fields:
608617
readable_metrics_struct.append(
609618
pa.field(schema.find_column_name(field.field_id), _readable_metrics_struct(field.field_type), nullable=False)
@@ -614,6 +623,7 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
614623
pa.field("content", pa.int8(), nullable=False),
615624
pa.field("file_path", pa.string(), nullable=False),
616625
pa.field("file_format", pa.dictionary(pa.int32(), pa.string()), nullable=False),
626+
pa.field("partition", pa_record_struct, nullable=False),
617627
pa.field("spec_id", pa.int32(), nullable=False),
618628
pa.field("record_count", pa.int64(), nullable=False),
619629
pa.field("file_size_in_bytes", pa.int64(), nullable=False),

tests/integration/test_inspect_table.py

Lines changed: 73 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ def _inspect_files_asserts(df: pa.Table, spark_df: DataFrame) -> None:
7878
"content",
7979
"file_path",
8080
"file_format",
81+
"partition",
8182
"spec_id",
8283
"record_count",
8384
"file_size_in_bytes",
@@ -141,6 +142,9 @@ def _inspect_files_asserts(df: pa.Table, spark_df: DataFrame) -> None:
141142
assert_frame_equal(lhs_subset, rhs_subset, check_dtype=False, check_categorical=False)
142143

143144
for column in df.column_names:
145+
if column == "partition":
146+
# Spark leaves out the partition if the table is unpartitioned
147+
continue
144148
for left, right in zip(lhs[column].to_list(), rhs[column].to_list()):
145149
if isinstance(left, float) and math.isnan(left) and isinstance(right, float) and math.isnan(right):
146150
# NaN != NaN in Python
@@ -833,6 +837,7 @@ def inspect_files_asserts(df: pa.Table) -> None:
833837
"content",
834838
"file_path",
835839
"file_format",
840+
"partition",
836841
"spec_id",
837842
"record_count",
838843
"file_size_in_bytes",
@@ -1010,7 +1015,6 @@ def test_inspect_files_format_version_3(spark: SparkSession, session_catalog: Ca
10101015
spark.sql(insert_data_sql)
10111016
spark.sql(f"UPDATE {identifier} SET int = 2 WHERE int = 1")
10121017
spark.sql(f"DELETE FROM {identifier} WHERE int = 9")
1013-
spark.table(identifier).show(20, False)
10141018

10151019
tbl.refresh()
10161020

@@ -1029,3 +1033,71 @@ def test_inspect_files_format_version_3(spark: SparkSession, session_catalog: Ca
10291033
_inspect_files_asserts(all_files_df, spark.table(f"{identifier}.all_files"))
10301034
_inspect_files_asserts(all_data_files_df, spark.table(f"{identifier}.all_data_files"))
10311035
_inspect_files_asserts(all_delete_files_df, spark.table(f"{identifier}.all_delete_files"))
1036+
1037+
1038+
@pytest.mark.integration
1039+
@pytest.mark.parametrize("format_version", [1, 2, 3])
1040+
def test_inspect_files_partitioned(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None:
1041+
from pandas.testing import assert_frame_equal
1042+
1043+
identifier = "default.table_metadata_files_partitioned"
1044+
try:
1045+
session_catalog.drop_table(identifier=identifier)
1046+
except NoSuchTableError:
1047+
pass
1048+
1049+
spark.sql(
1050+
f"""
1051+
CREATE TABLE {identifier} (
1052+
dt date,
1053+
int_data int
1054+
)
1055+
PARTITIONED BY (months(dt))
1056+
TBLPROPERTIES ('format-version'='{format_version}')
1057+
"""
1058+
)
1059+
1060+
if format_version > 1:
1061+
spark.sql(
1062+
f"""
1063+
ALTER TABLE {identifier} SET TBLPROPERTIES(
1064+
'write.update.mode' = 'merge-on-read',
1065+
'write.delete.mode' = 'merge-on-read',
1066+
'write.merge.mode' = 'merge-on-read')
1067+
"""
1068+
)
1069+
1070+
spark.sql(f"""
1071+
INSERT INTO {identifier} VALUES (CAST('2025-01-01' AS date), 1), (CAST('2025-01-01' AS date), 2)
1072+
""")
1073+
1074+
spark.sql(
1075+
f"""
1076+
ALTER TABLE {identifier}
1077+
REPLACE PARTITION FIELD dt_month WITH days(dt)
1078+
"""
1079+
)
1080+
1081+
spark.sql(
1082+
f"""
1083+
INSERT INTO {identifier} VALUES (CAST('2025-01-02' AS date), 2)
1084+
"""
1085+
)
1086+
1087+
spark.sql(
1088+
f"""
1089+
DELETE FROM {identifier} WHERE int_data = 1
1090+
"""
1091+
)
1092+
1093+
tbl = session_catalog.load_table(identifier)
1094+
files_df = tbl.inspect.files()
1095+
lhs = files_df.to_pandas()[["file_path", "partition"]].sort_values("file_path", ignore_index=True).reset_index()
1096+
rhs = (
1097+
spark.table(f"{identifier}.files")
1098+
.select(["file_path", "partition"])
1099+
.toPandas()
1100+
.sort_values("file_path", ignore_index=True)
1101+
.reset_index()
1102+
)
1103+
assert_frame_equal(lhs, rhs, check_dtype=False)

0 commit comments

Comments (0)