Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 124 additions & 0 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
from typing_extensions import Annotated

import pyiceberg.expressions.parser as parser
from pyiceberg.conversions import from_bytes
from pyiceberg.exceptions import CommitFailedException, ResolveError, ValidationError
from pyiceberg.expressions import (
AlwaysTrue,
Expand Down Expand Up @@ -3264,3 +3265,126 @@ def snapshots(self) -> "pa.Table":
snapshots,
schema=snapshots_schema,
)

def entries(self) -> "pa.Table":
    """Return the current snapshot's manifest entries as a PyArrow table.

    Mirrors Iceberg's ``entries`` metadata table: every manifest of the
    current snapshot is read and one row is emitted per manifest entry.
    The raw metric maps (sizes, counts, bounds) are keyed by field-id;
    ``readable_metrics`` re-keys them by column name and decodes the
    serialized lower/upper bounds into values of the column's own type.

    Returns:
        pa.Table: one row per manifest entry; an empty table when the
            table has no current snapshot.
    """
    import pyarrow as pa

    from pyiceberg.io.pyarrow import schema_to_pyarrow

    schema = self.tbl.metadata.schema()

    def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
        # Bounds are decoded to the column's own type; sizes/counts stay int64.
        pa_bound_type = schema_to_pyarrow(bound_type)
        return pa.struct([
            pa.field("column_size", pa.int64(), nullable=True),
            pa.field("value_count", pa.int64(), nullable=True),
            pa.field("null_value_count", pa.int64(), nullable=True),
            pa.field("nan_value_count", pa.int64(), nullable=True),
            pa.field("lower_bound", pa_bound_type, nullable=True),
            pa.field("upper_bound", pa_bound_type, nullable=True),
        ])

    # One readable-metrics sub-struct per top-level column of the table schema.
    readable_metrics_struct = [
        pa.field(schema.find_column_name(field.field_id), _readable_metrics_struct(field.field_type), nullable=False)
        for field in schema.fields
    ]

    # A single struct covering the union of all historic partition specs, so
    # rows written under different specs fit one 'partition' column.
    pa_record_struct = schema_to_pyarrow(self.tbl.metadata.specs_struct())

    entries_schema = pa.schema([
        pa.field('status', pa.int8(), nullable=False),
        pa.field('snapshot_id', pa.int64(), nullable=False),
        pa.field('sequence_number', pa.int64(), nullable=False),
        pa.field('file_sequence_number', pa.int64(), nullable=False),
        pa.field(
            'data_file',
            pa.struct([
                pa.field('content', pa.int8(), nullable=False),
                pa.field('file_path', pa.string(), nullable=False),
                pa.field('file_format', pa.string(), nullable=False),
                pa.field('partition', pa_record_struct, nullable=False),
                pa.field('record_count', pa.int64(), nullable=False),
                pa.field('file_size_in_bytes', pa.int64(), nullable=False),
                pa.field('column_sizes', pa.map_(pa.int32(), pa.int64()), nullable=True),
                pa.field('value_counts', pa.map_(pa.int32(), pa.int64()), nullable=True),
                pa.field('null_value_counts', pa.map_(pa.int32(), pa.int64()), nullable=True),
                pa.field('nan_value_counts', pa.map_(pa.int32(), pa.int64()), nullable=True),
                pa.field('lower_bounds', pa.map_(pa.int32(), pa.binary()), nullable=True),
                pa.field('upper_bounds', pa.map_(pa.int32(), pa.binary()), nullable=True),
                pa.field('key_metadata', pa.binary(), nullable=True),
                pa.field('split_offsets', pa.list_(pa.int64()), nullable=True),
                pa.field('equality_ids', pa.list_(pa.int32()), nullable=True),
                pa.field('sort_order_id', pa.int32(), nullable=True),
            ]),
            nullable=False,
        ),
        pa.field('readable_metrics', pa.struct(readable_metrics_struct), nullable=True),
    ])

    entries = []
    if snapshot := self.tbl.metadata.current_snapshot():
        for manifest in snapshot.manifests(self.tbl.io):
            for entry in manifest.fetch_manifest_entry(io=self.tbl.io):
                data_file = entry.data_file
                # All metric maps are optional: normalize each one to a plain
                # dict exactly once and reuse it everywhere below. The original
                # computed these None-safe locals but then called
                # dict(entry.data_file.column_sizes) directly, which raises
                # TypeError whenever a metric map is absent (None), and passed
                # nan_value_counts/lower_bounds/upper_bounds through raw.
                column_sizes = dict(data_file.column_sizes or {})
                value_counts = dict(data_file.value_counts or {})
                null_value_counts = dict(data_file.null_value_counts or {})
                nan_value_counts = dict(data_file.nan_value_counts or {})
                lower_bounds = dict(data_file.lower_bounds or {})
                upper_bounds = dict(data_file.upper_bounds or {})

                readable_metrics = {
                    schema.find_column_name(field.field_id): {
                        "column_size": column_sizes.get(field.field_id),
                        "value_count": value_counts.get(field.field_id),
                        "null_value_count": null_value_counts.get(field.field_id),
                        "nan_value_count": nan_value_counts.get(field.field_id),
                        # Decode the serialized bounds into readable values
                        "lower_bound": from_bytes(field.field_type, lower_bound)
                        if (lower_bound := lower_bounds.get(field.field_id))
                        else None,
                        "upper_bound": from_bytes(field.field_type, upper_bound)
                        if (upper_bound := upper_bounds.get(field.field_id))
                        else None,
                    }
                    for field in schema.fields
                }

                # Map the positional partition record onto the field names of
                # the spec this manifest was written with.
                partition = data_file.partition
                spec = self.tbl.metadata.specs()[manifest.partition_spec_id]
                partition_record_dict = {field.name: partition[pos] for pos, field in enumerate(spec.fields)}

                entries.append({
                    'status': entry.status.value,
                    'snapshot_id': entry.snapshot_id,
                    'sequence_number': entry.data_sequence_number,
                    'file_sequence_number': entry.file_sequence_number,
                    'data_file': {
                        "content": data_file.content,
                        "file_path": data_file.file_path,
                        "file_format": data_file.file_format,
                        "partition": partition_record_dict,
                        "record_count": data_file.record_count,
                        "file_size_in_bytes": data_file.file_size_in_bytes,
                        "column_sizes": column_sizes,
                        "value_counts": value_counts,
                        "null_value_counts": null_value_counts,
                        "nan_value_counts": nan_value_counts,
                        "lower_bounds": lower_bounds,
                        "upper_bounds": upper_bounds,
                        "key_metadata": data_file.key_metadata,
                        "split_offsets": data_file.split_offsets,
                        "equality_ids": data_file.equality_ids,
                        "sort_order_id": data_file.sort_order_id,
                        # Not present in entries_schema; from_pylist drops it.
                        # Kept for parity with the DataFile fields.
                        "spec_id": data_file.spec_id,
                    },
                    'readable_metrics': readable_metrics,
                })

    return pa.Table.from_pylist(
        entries,
        schema=entries_schema,
    )
27 changes: 26 additions & 1 deletion pyiceberg/table/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
IcebergRootModel,
Properties,
)
from pyiceberg.types import transform_dict_value_to_str
from pyiceberg.types import NestedField, StructType, transform_dict_value_to_str
from pyiceberg.utils.config import Config
from pyiceberg.utils.datetime import datetime_to_millis

Expand Down Expand Up @@ -245,6 +245,31 @@ def specs(self) -> Dict[int, PartitionSpec]:
"""Return a dict the partition specs this table."""
return {spec.spec_id: spec for spec in self.partition_specs}

def specs_struct(self) -> StructType:
    """Produce a struct of all the combined PartitionSpecs.

    The partition fields should be optional: a partition field may be
    introduced after some files were written, in which case those older
    files carry no value for it and the field may be null.

    :return: A StructType that represents all the combined PartitionSpecs of the table
    """
    # Union of every partition field across all specs, keyed by field-id;
    # later specs overwrite earlier entries for the same id.
    combined = {pf.field_id: pf for spec in self.specs().values() for pf in spec.fields}

    table_schema = self.schema()

    # Sort by field-id so the resulting struct is deterministic.
    members = [
        NestedField(
            field_id=combined[fid].field_id,
            name=combined[fid].name,
            type=combined[fid].transform.result_type(table_schema.find_type(combined[fid].source_id)),
            required=False,
        )
        for fid in sorted(combined)
    ]
    return StructType(*members)

def new_snapshot_id(self) -> int:
"""Generate a new snapshot-id that's not in use."""
snapshot_id = _generate_snapshot_id()
Expand Down
4 changes: 4 additions & 0 deletions pyiceberg/utils/lazydict.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,7 @@ def __len__(self) -> int:
"""Return the number of items in the dictionary."""
source = self._dict or self._build_dict()
return len(source)

# NOTE(review): defining a *method* named ``__dict__`` shadows the class-level
# ``__dict__`` descriptor, so ``instance.__dict__`` becomes a callable instead
# of the usual attribute mapping — this can confuse debuggers, ``vars()``, and
# serializers. Consider a conventional name such as ``to_dict`` instead.
def __dict__(self) -> Dict[K, V]:  # type: ignore
    """Convert the lazy dict into a plain dict.

    Returns the already-materialized ``_dict`` when it is non-empty;
    otherwise builds (and caches) it via ``_build_dict()``.
    """
    return self._dict or self._build_dict()
2 changes: 1 addition & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2043,5 +2043,5 @@ def pa_schema() -> "pa.Schema":
def arrow_table_with_null(pa_schema: "pa.Schema") -> "pa.Table":
    """PyArrow table with all kinds of columns."""
    # The docstring must be the function's first statement; in the original it
    # sat *after* the import, making it a no-op string expression instead of a
    # docstring.
    import pyarrow as pa

    return pa.Table.from_pydict(TEST_DATA_WITH_NULL, schema=pa_schema)
Loading