-
Notifications
You must be signed in to change notification settings - Fork 510
Add all_manifests metadata table with tests
#1241
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
73e9bc8
044512e
952fa1c
ef991e6
3a920f9
0b7747e
6be1a3c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,13 +17,14 @@ | |
| from __future__ import annotations | ||
|
|
||
| from datetime import datetime | ||
| from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Tuple | ||
| from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Set, Tuple | ||
|
|
||
| from pyiceberg.conversions import from_bytes | ||
| from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, PartitionFieldSummary | ||
| from pyiceberg.partitioning import PartitionSpec | ||
| from pyiceberg.table.snapshots import Snapshot, ancestors_of | ||
| from pyiceberg.types import PrimitiveType | ||
| from pyiceberg.utils.concurrent import ExecutorFactory | ||
| from pyiceberg.utils.singleton import _convert_to_hashable_type | ||
|
|
||
| if TYPE_CHECKING: | ||
|
|
@@ -32,6 +33,41 @@ | |
| from pyiceberg.table import Table | ||
|
|
||
|
|
||
| def get_manifests_schema() -> "pa.Schema": | ||
| import pyarrow as pa | ||
|
|
||
| partition_summary_schema = pa.struct([ | ||
| pa.field("contains_null", pa.bool_(), nullable=False), | ||
| pa.field("contains_nan", pa.bool_(), nullable=True), | ||
| pa.field("lower_bound", pa.string(), nullable=True), | ||
| pa.field("upper_bound", pa.string(), nullable=True), | ||
| ]) | ||
|
|
||
| manifest_schema = pa.schema([ | ||
| pa.field("content", pa.int8(), nullable=False), | ||
| pa.field("path", pa.string(), nullable=False), | ||
| pa.field("length", pa.int64(), nullable=False), | ||
| pa.field("partition_spec_id", pa.int32(), nullable=False), | ||
| pa.field("added_snapshot_id", pa.int64(), nullable=False), | ||
| pa.field("added_data_files_count", pa.int32(), nullable=False), | ||
| pa.field("existing_data_files_count", pa.int32(), nullable=False), | ||
| pa.field("deleted_data_files_count", pa.int32(), nullable=False), | ||
| pa.field("added_delete_files_count", pa.int32(), nullable=False), | ||
| pa.field("existing_delete_files_count", pa.int32(), nullable=False), | ||
| pa.field("deleted_delete_files_count", pa.int32(), nullable=False), | ||
| pa.field("partition_summaries", pa.list_(partition_summary_schema), nullable=False), | ||
| ]) | ||
| return manifest_schema | ||
|
|
||
|
|
||
| def get_all_manifests_schema() -> "pa.Schema": | ||
| import pyarrow as pa | ||
|
|
||
| all_manifests_schema = get_manifests_schema() | ||
| all_manifests_schema = all_manifests_schema.append(pa.field("reference_snapshot_id", pa.int64(), nullable=False)) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. interestingly, this isnt in the documentation https://iceberg.apache.org/docs/latest/spark-queries/#all-manifests
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, it's not present in iceberg docs. |
||
| return all_manifests_schema | ||
|
|
||
|
|
||
| class InspectTable: | ||
| tbl: Table | ||
|
|
||
|
|
@@ -326,31 +362,9 @@ def update_partitions_map( | |
| schema=table_schema, | ||
| ) | ||
|
|
||
| def manifests(self) -> "pa.Table": | ||
| def _generate_manifests_table(self, snapshot: Optional[Snapshot], is_all_manifests_table: bool = False) -> "pa.Table": | ||
| import pyarrow as pa | ||
|
|
||
| partition_summary_schema = pa.struct([ | ||
| pa.field("contains_null", pa.bool_(), nullable=False), | ||
| pa.field("contains_nan", pa.bool_(), nullable=True), | ||
| pa.field("lower_bound", pa.string(), nullable=True), | ||
| pa.field("upper_bound", pa.string(), nullable=True), | ||
| ]) | ||
|
|
||
| manifest_schema = pa.schema([ | ||
| pa.field("content", pa.int8(), nullable=False), | ||
| pa.field("path", pa.string(), nullable=False), | ||
| pa.field("length", pa.int64(), nullable=False), | ||
| pa.field("partition_spec_id", pa.int32(), nullable=False), | ||
| pa.field("added_snapshot_id", pa.int64(), nullable=False), | ||
| pa.field("added_data_files_count", pa.int32(), nullable=False), | ||
| pa.field("existing_data_files_count", pa.int32(), nullable=False), | ||
| pa.field("deleted_data_files_count", pa.int32(), nullable=False), | ||
| pa.field("added_delete_files_count", pa.int32(), nullable=False), | ||
| pa.field("existing_delete_files_count", pa.int32(), nullable=False), | ||
| pa.field("deleted_delete_files_count", pa.int32(), nullable=False), | ||
| pa.field("partition_summaries", pa.list_(partition_summary_schema), nullable=False), | ||
| ]) | ||
|
|
||
| def _partition_summaries_to_rows( | ||
| spec: PartitionSpec, partition_summaries: List[PartitionFieldSummary] | ||
| ) -> List[Dict[str, Any]]: | ||
|
|
@@ -386,11 +400,11 @@ def _partition_summaries_to_rows( | |
|
|
||
| specs = self.tbl.metadata.specs() | ||
| manifests = [] | ||
| if snapshot := self.tbl.metadata.current_snapshot(): | ||
| if snapshot: | ||
| for manifest in snapshot.manifests(self.tbl.io): | ||
| is_data_file = manifest.content == ManifestContent.DATA | ||
| is_delete_file = manifest.content == ManifestContent.DELETES | ||
| manifests.append({ | ||
| manifest_row = { | ||
| "content": manifest.content, | ||
| "path": manifest.manifest_path, | ||
| "length": manifest.manifest_length, | ||
|
|
@@ -405,13 +419,19 @@ def _partition_summaries_to_rows( | |
| "partition_summaries": _partition_summaries_to_rows(specs[manifest.partition_spec_id], manifest.partitions) | ||
| if manifest.partitions | ||
| else [], | ||
| }) | ||
| } | ||
| if is_all_manifests_table: | ||
| manifest_row["reference_snapshot_id"] = snapshot.snapshot_id | ||
|
soumya-ghosh marked this conversation as resolved.
|
||
| manifests.append(manifest_row) | ||
|
|
||
| return pa.Table.from_pylist( | ||
| manifests, | ||
| schema=manifest_schema, | ||
| schema=get_all_manifests_schema() if is_all_manifests_table else get_manifests_schema(), | ||
| ) | ||
|
|
||
| def manifests(self) -> "pa.Table": | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. wdyt about adding an optional
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, I am aligned with this.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yea i think thats fine since |
||
| return self._generate_manifests_table(self.tbl.current_snapshot()) | ||
|
|
||
| def metadata_log_entries(self) -> "pa.Table": | ||
| import pyarrow as pa | ||
|
|
||
|
|
@@ -586,3 +606,16 @@ def data_files(self, snapshot_id: Optional[int] = None) -> "pa.Table": | |
|
|
||
| def delete_files(self, snapshot_id: Optional[int] = None) -> "pa.Table": | ||
| return self._files(snapshot_id, {DataFileContent.POSITION_DELETES, DataFileContent.EQUALITY_DELETES}) | ||
|
|
||
| def all_manifests(self) -> "pa.Table": | ||
| import pyarrow as pa | ||
|
|
||
| snapshots = self.tbl.snapshots() | ||
| if not snapshots: | ||
| return pa.Table.from_pylist([], schema=get_all_manifests_schema()) | ||
|
|
||
| executor = ExecutorFactory.get_or_create() | ||
| manifests_by_snapshots: Iterator["pa.Table"] = executor.map( | ||
| lambda args: self._generate_manifests_table(*args), [(snapshot, True) for snapshot in snapshots] | ||
| ) | ||
| return pa.concat_tables(manifests_by_snapshots) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -846,3 +846,95 @@ def inspect_files_asserts(df: pa.Table) -> None: | |
| inspect_files_asserts(files_df) | ||
| inspect_files_asserts(data_files_df) | ||
| inspect_files_asserts(delete_files_df) | ||
|
|
||
|
|
||
| @pytest.mark.integration | ||
| @pytest.mark.parametrize("format_version", [1, 2]) | ||
| def test_inspect_all_manifests(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None: | ||
| identifier = "default.table_metadata_all_manifests" | ||
| try: | ||
| session_catalog.drop_table(identifier=identifier) | ||
| except NoSuchTableError: | ||
| pass | ||
|
|
||
| spark.sql( | ||
| f""" | ||
| CREATE TABLE {identifier} ( | ||
| id int, | ||
| data string | ||
| ) | ||
| PARTITIONED BY (data) | ||
| TBLPROPERTIES ('write.update.mode'='merge-on-read', | ||
| 'write.delete.mode'='merge-on-read') | ||
| """ | ||
| ) | ||
| tbl = session_catalog.load_table(identifier) | ||
|
|
||
| # check all_manifests when there are no snapshots | ||
| lhs = tbl.inspect.all_manifests().to_pandas() | ||
| rhs = spark.table(f"{identifier}.all_manifests").toPandas() | ||
| assert lhs.empty | ||
| assert rhs.empty | ||
|
|
||
| spark.sql(f"INSERT INTO {identifier} VALUES (1, 'a')") | ||
|
|
||
| spark.sql(f"INSERT INTO {identifier} VALUES (2, 'b')") | ||
|
|
||
| spark.sql(f"UPDATE {identifier} SET data = 'c' WHERE id = 1") | ||
|
|
||
| spark.sql(f"DELETE FROM {identifier} WHERE id = 2") | ||
|
|
||
| spark.sql(f"INSERT OVERWRITE {identifier} VALUES (1, 'a')") | ||
|
|
||
| df = tbl.inspect.all_manifests() | ||
|
|
||
| assert df.column_names == [ | ||
| "content", | ||
| "path", | ||
| "length", | ||
| "partition_spec_id", | ||
| "added_snapshot_id", | ||
| "added_data_files_count", | ||
| "existing_data_files_count", | ||
| "deleted_data_files_count", | ||
| "added_delete_files_count", | ||
| "existing_delete_files_count", | ||
| "deleted_delete_files_count", | ||
| "partition_summaries", | ||
| "reference_snapshot_id", | ||
| ] | ||
|
|
||
| int_cols = [ | ||
| "content", | ||
| "length", | ||
| "partition_spec_id", | ||
| "added_snapshot_id", | ||
| "added_data_files_count", | ||
| "existing_data_files_count", | ||
| "deleted_data_files_count", | ||
| "added_delete_files_count", | ||
| "existing_delete_files_count", | ||
| "deleted_delete_files_count", | ||
| "reference_snapshot_id", | ||
| ] | ||
|
|
||
| for column in int_cols: | ||
| for value in df[column]: | ||
| assert isinstance(value.as_py(), int) | ||
|
|
||
| for value in df["path"]: | ||
| assert isinstance(value.as_py(), str) | ||
|
|
||
| for value in df["partition_summaries"]: | ||
| assert isinstance(value.as_py(), list) | ||
| for row in value: | ||
| assert isinstance(row["contains_null"].as_py(), bool) | ||
| assert isinstance(row["contains_nan"].as_py(), (bool, type(None))) | ||
| assert isinstance(row["lower_bound"].as_py(), (str, type(None))) | ||
| assert isinstance(row["upper_bound"].as_py(), (str, type(None))) | ||
|
|
||
| lhs = spark.table(f"{identifier}.all_manifests").toPandas() | ||
| rhs = df.to_pandas() | ||
| for column in df.column_names: | ||
| for left, right in zip(lhs[column].to_list(), rhs[column].to_list()): | ||
| assert left == right, f"Difference in column {column}: {left} != {right}" | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: is it possible to use
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, making the change. |
||
Uh oh!
There was an error while loading. Please reload this page.