Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions mkdocs/docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,13 @@ tbl.add_files(file_paths=file_paths)

<!-- prettier-ignore-start -->

!!! note "Partitions"
`add_files` only requires the client to read the metadata footers of the existing parquet files in order to infer the partition value of each file. This implementation also supports adding files to Iceberg tables with partition transforms such as `MonthTransform` and `TruncateTransform`, which preserve the order of the values after the transformation (any transform whose `preserves_order` property is `True` is supported).
Comment thread
sungwy marked this conversation as resolved.
Outdated

<!-- prettier-ignore-end -->

<!-- prettier-ignore-start -->
Comment thread
sungwy marked this conversation as resolved.
Outdated

!!! warning "Maintenance Operations"
Because `add_files` commits the existing parquet files to the Iceberg Table like any other data file, destructive maintenance operations like expiring snapshots will remove them.

Expand Down
140 changes: 94 additions & 46 deletions pyiceberg/io/pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@
DataFileContent,
FileFormat,
)
from pyiceberg.partitioning import PartitionField, PartitionSpec, partition_record_value
from pyiceberg.schema import (
PartnerAccessor,
PreOrderSchemaVisitor,
Expand All @@ -124,7 +125,7 @@
visit,
visit_with_partner,
)
from pyiceberg.table import AddFileTask, PropertyUtil, TableProperties, WriteTask
from pyiceberg.table import PropertyUtil, TableProperties, WriteTask
from pyiceberg.table.metadata import TableMetadata
from pyiceberg.table.name_mapping import NameMapping
from pyiceberg.transforms import TruncateTransform
Expand Down Expand Up @@ -1594,29 +1595,88 @@ def parquet_path_to_id_mapping(
return result


def fill_parquet_file_metadata(
Comment thread
sungwy marked this conversation as resolved.
data_file: DataFile,
@dataclass
class DataFileStatistics:
    """Statistics collected for a single data file.

    ``column_aggregates`` is looked up by a partition field's ``source_id``; the
    aggregator's running minimum and maximum are used both to infer partition
    values and to produce the serialized lower/upper bounds.
    """

    record_count: int
    column_sizes: Dict[int, int]
    value_counts: Dict[int, int]
    null_value_counts: Dict[int, int]
    nan_value_counts: Dict[int, int]
    column_aggregates: Dict[int, StatsAggregator]
    split_offsets: Optional[List[int]] = None

    def _partition_value(self, partition_field: PartitionField, schema: Schema) -> Any:
        """Infer the partition value of ``partition_field`` from the column min/max.

        Args:
            partition_field: The partition field whose value should be inferred.
            schema: Schema passed through to ``partition_record_value``.

        Returns:
            The partition record value shared by the column's minimum and maximum,
            or ``None`` when no aggregate exists for the field's source column.

        Raises:
            ValueError: If the transform does not preserve order, or if the minimum
                and maximum map to different partition values.
        """
        if partition_field.source_id not in self.column_aggregates:
            return None

        # Only an order-preserving transform lets the min and max stand in for
        # every value in between.
        if not partition_field.transform.preserves_order:
            raise ValueError(
                f"Cannot infer partition value from parquet metadata for a non-linear Partition Field. {partition_field}"
            )

        aggregate = self.column_aggregates[partition_field.source_id]
        lower_value = partition_record_value(
            partition_field=partition_field,
            value=aggregate.current_min,
            schema=schema,
        )
        upper_value = partition_record_value(
            partition_field=partition_field,
            value=aggregate.current_max,
            schema=schema,
        )
        # Both ends must land in the same partition for the file to have a
        # single partition value.
        if lower_value != upper_value:
            raise ValueError(
                f"Cannot infer partition value from parquet metadata as there are more than one partition values: {lower_value=}, {upper_value=}"
            )
        return lower_value

    def partition(self, partition_spec: PartitionSpec, schema: Schema) -> Record:
        """Build a ``Record`` with one inferred value per field of ``partition_spec``, keyed by field name."""
        return Record(**{field.name: self._partition_value(field, schema) for field in partition_spec.fields})

    def to_serialized_dict(self) -> Dict[str, Any]:
        """Return the statistics as a dict, with bounds taken from the aggregators as bytes.

        Columns whose aggregator reports ``None`` for a bound are left out of the
        corresponding ``lower_bounds`` / ``upper_bounds`` mapping.
        """
        lower_bounds = {}
        upper_bounds = {}

        for field_id, agg in self.column_aggregates.items():
            _min = agg.min_as_bytes()
            if _min is not None:
                lower_bounds[field_id] = _min
            _max = agg.max_as_bytes()
            if _max is not None:
                upper_bounds[field_id] = _max

        return {
            "record_count": self.record_count,
            "column_sizes": self.column_sizes,
            "value_counts": self.value_counts,
            "null_value_counts": self.null_value_counts,
            "nan_value_counts": self.nan_value_counts,
            "lower_bounds": lower_bounds,
            "upper_bounds": upper_bounds,
            "split_offsets": self.split_offsets,
        }


def data_file_statistics_from_parquet_metadata(
parquet_metadata: pq.FileMetaData,
stats_columns: Dict[int, StatisticsCollector],
parquet_column_mapping: Dict[str, int],
) -> None:
) -> DataFileStatistics:
"""
Compute and fill the following fields of the DataFile object.
Compute and return DataFileStatistics that includes the following.

- file_format
- record_count
- column_sizes
- value_counts
- null_value_counts
- nan_value_counts
- lower_bounds
- upper_bounds
- column_aggregates
- split_offsets

Args:
data_file (DataFile): A DataFile object representing the Parquet file for which metadata is to be filled.
parquet_metadata (pyarrow.parquet.FileMetaData): A pyarrow metadata object.
stats_columns (Dict[int, StatisticsCollector]): The statistics gathering plan. It is required to
set the mode for column metrics collection
parquet_column_mapping (Dict[str, int]): The mapping of the parquet file name to the field ID
"""
if parquet_metadata.num_columns != len(stats_columns):
raise ValueError(
Expand Down Expand Up @@ -1695,30 +1755,19 @@ def fill_parquet_file_metadata(

split_offsets.sort()

lower_bounds = {}
upper_bounds = {}

for k, agg in col_aggs.items():
_min = agg.min_as_bytes()
if _min is not None:
lower_bounds[k] = _min
_max = agg.max_as_bytes()
if _max is not None:
upper_bounds[k] = _max

for field_id in invalidate_col:
del lower_bounds[field_id]
del upper_bounds[field_id]
del col_aggs[field_id]
del null_value_counts[field_id]

data_file.record_count = parquet_metadata.num_rows
data_file.column_sizes = column_sizes
data_file.value_counts = value_counts
data_file.null_value_counts = null_value_counts
data_file.nan_value_counts = nan_value_counts
data_file.lower_bounds = lower_bounds
data_file.upper_bounds = upper_bounds
data_file.split_offsets = split_offsets
return DataFileStatistics(
record_count=parquet_metadata.num_rows,
column_sizes=column_sizes,
value_counts=value_counts,
null_value_counts=null_value_counts,
nan_value_counts=nan_value_counts,
column_aggregates=col_aggs,
split_offsets=split_offsets,
)


def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteTask]) -> Iterator[DataFile]:
Expand Down Expand Up @@ -1762,46 +1811,45 @@ def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteT
equality_ids=None,
key_metadata=None,
)

fill_parquet_file_metadata(
data_file=data_file,
statistics = data_file_statistics_from_parquet_metadata(
parquet_metadata=writer.writer.metadata,
stats_columns=compute_statistics_plan(schema, table_metadata.properties),
parquet_column_mapping=parquet_path_to_id_mapping(schema),
)
data_file.update(statistics.to_serialized_dict())
return iter([data_file])


def parquet_files_to_data_files(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[AddFileTask]) -> Iterator[DataFile]:
for task in tasks:
input_file = io.new_input(task.file_path)
def parquet_files_to_data_files(io: FileIO, table_metadata: TableMetadata, file_paths: Iterator[str]) -> Iterator[DataFile]:
for file_path in file_paths:
input_file = io.new_input(file_path)
with input_file.open() as input_stream:
parquet_metadata = pq.read_metadata(input_stream)

if visit_pyarrow(parquet_metadata.schema.to_arrow_schema(), _HasIds()):
raise NotImplementedError(
f"Cannot add file {task.file_path} because it has field IDs. `add_files` only supports addition of files without field_ids"
f"Cannot add file {file_path} because it has field IDs. `add_files` only supports addition of files without field_ids"
)

schema = table_metadata.schema()
statistics = data_file_statistics_from_parquet_metadata(
parquet_metadata=parquet_metadata,
stats_columns=compute_statistics_plan(schema, table_metadata.properties),
parquet_column_mapping=parquet_path_to_id_mapping(schema),
)
data_file = DataFile(
content=DataFileContent.DATA,
file_path=task.file_path,
file_path=file_path,
file_format=FileFormat.PARQUET,
partition=task.partition_field_value,
partition=statistics.partition(table_metadata.spec(), table_metadata.schema()),
record_count=parquet_metadata.num_rows,
file_size_in_bytes=len(input_file),
sort_order_id=None,
spec_id=table_metadata.default_spec_id,
equality_ids=None,
key_metadata=None,
)
fill_parquet_file_metadata(
data_file=data_file,
parquet_metadata=parquet_metadata,
stats_columns=compute_statistics_plan(schema, table_metadata.properties),
parquet_column_mapping=parquet_path_to_id_mapping(schema),
)

data_file.update(statistics.to_serialized_dict())
yield data_file


Expand Down
4 changes: 4 additions & 0 deletions pyiceberg/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,10 @@ def __eq__(self, other: Any) -> bool:
"""
return self.file_path == other.file_path if isinstance(other, DataFile) else False

def update(self, other: Dict[str, Any]) -> None:
    """Assign every item of ``other`` as an attribute on this object.

    Args:
        other: Mapping of attribute name to the value to set.
    """
    for name, value in other.items():
        setattr(self, name, value)


MANIFEST_ENTRY_SCHEMAS = {
1: Schema(
Expand Down
25 changes: 21 additions & 4 deletions pyiceberg/partitioning.py
Original file line number Diff line number Diff line change
Expand Up @@ -388,16 +388,33 @@ def partition(self) -> Record: # partition key transformed with iceberg interna
if len(partition_fields) != 1:
raise ValueError("partition_fields must contain exactly one field.")
partition_field = partition_fields[0]
iceberg_type = self.schema.find_field(name_or_id=raw_partition_field_value.field.source_id).field_type
iceberg_typed_value = _to_partition_representation(iceberg_type, raw_partition_field_value.value)
transformed_value = partition_field.transform.transform(iceberg_type)(iceberg_typed_value)
iceberg_typed_key_values[partition_field.name] = transformed_value
iceberg_typed_key_values[partition_field.name] = partition_record_value(
partition_field=partition_field,
value=raw_partition_field_value.value,
schema=self.schema,
)
return Record(**iceberg_typed_key_values)

def to_path(self) -> str:
    """Return the result of the partition spec's ``partition_to_path`` for this object's partition record and schema."""
    return self.partition_spec.partition_to_path(self.partition, self.schema)


def partition_record_value(partition_field: PartitionField, value: Any, schema: Schema) -> Any:
    """
    Return the Partition Record representation of the value.

    The value is first converted to its internal partition representation
    (for example, UUID is converted to str, DateType to epoch-days, etc.).
    The transform of the given PartitionField is then applied to that
    representation to produce the final partition record value.

    Args:
        partition_field: The partition field supplying the source column ID and transform.
        value: The raw value of the source column.
        schema: The schema used to look up the source column's type.
    """
    source_type = schema.find_field(name_or_id=partition_field.source_id).field_type
    internal_value = _to_partition_representation(source_type, value)
    return partition_field.transform.transform(source_type)(internal_value)


@singledispatch
def _to_partition_representation(type: IcebergType, value: Any) -> Any:
    """Fallback of the single-dispatch function, used when no more specific overload matches ``type``.

    Args:
        type: The Iceberg type of the partition source column.
        value: The value to convert (unused by this fallback).

    Raises:
        TypeError: Always, because a type that reaches this fallback is unsupported.
    """
    # Raise instead of returning the exception object; returning it would hand
    # a TypeError instance to the caller as if it were a converted value.
    raise TypeError(f"Unsupported partition field type: {type}")
Expand Down
18 changes: 1 addition & 17 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
Dict,
Generic,
Iterable,
Iterator,
List,
Literal,
Optional,
Expand Down Expand Up @@ -1170,9 +1169,6 @@ def add_files(self, file_paths: List[str]) -> None:
Raises:
FileNotFoundError: If the file does not exist.
"""
if len(self.spec().fields) > 0:
raise ValueError("Cannot add files to partitioned tables")

with self.transaction() as tx:
if self.name_mapping() is None:
tx.set_properties(**{TableProperties.DEFAULT_NAME_MAPPING: self.schema().name_mapping.model_dump_json()})
Expand Down Expand Up @@ -2515,17 +2511,6 @@ def _dataframe_to_data_files(
yield from write_file(io=io, table_metadata=table_metadata, tasks=iter([WriteTask(write_uuid, next(counter), df)]))


def add_file_tasks_from_file_paths(file_paths: List[str], table_metadata: TableMetadata) -> Iterator[AddFileTask]:
if len([spec for spec in table_metadata.partition_specs if spec.spec_id != 0]) > 0:
raise ValueError("Cannot add files to partitioned tables")

for file_path in file_paths:
yield AddFileTask(
file_path=file_path,
partition_field_value=Record(),
)


def _parquet_files_to_data_files(table_metadata: TableMetadata, file_paths: List[str], io: FileIO) -> Iterable[DataFile]:
"""Convert a list files into DataFiles.

Expand All @@ -2534,8 +2519,7 @@ def _parquet_files_to_data_files(table_metadata: TableMetadata, file_paths: List
"""
from pyiceberg.io.pyarrow import parquet_files_to_data_files

tasks = add_file_tasks_from_file_paths(file_paths, table_metadata)
yield from parquet_files_to_data_files(io=io, table_metadata=table_metadata, tasks=tasks)
yield from parquet_files_to_data_files(io=io, table_metadata=table_metadata, file_paths=iter(file_paths))


class _MergingSnapshotProducer(UpdateTableMetadata["_MergingSnapshotProducer"]):
Expand Down
Loading