Skip to content

Commit 1371ba4

Browse files
authored
Merge pull request #14 from imc-trading/sbuck/implement-row-group-size-bytes-on-develop
Implement write.parquet.row-group-size-bytes in the pyarrow writer
2 parents 10b74c3 + e3d394d commit 1371ba4

3 files changed

Lines changed: 103 additions & 3 deletions

File tree

pyiceberg/io/pyarrow.py

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2535,15 +2535,30 @@ def data_file_statistics_from_parquet_metadata(
25352535
)
25362536

25372537

2538+
def _resolve_row_group_size(arrow_table: pa.Table, row_group_limit: int | None, row_group_size_bytes: int | None) -> int | None:
    """Resolve how many rows go into each Parquet row group for one Arrow table.

    The byte budget is translated into a row count using the table's average
    row width (``arrow_table.nbytes / arrow_table.num_rows``). The result is
    then capped by the row limit, so whichever bound is tighter wins.

    Args:
        arrow_table: Table about to be written; only ``num_rows`` and ``nbytes`` are read.
        row_group_limit: Maximum rows per row group, or ``None`` for no row-count cap.
        row_group_size_bytes: Target bytes per row group. ``None``, zero or a
            negative value disables the byte budget.

    Returns:
        The number of rows per row group (always >= 1 when the byte budget
        applies), or ``row_group_limit`` unchanged when it does not.
    """
    num_rows = arrow_table.num_rows
    # A missing or non-positive budget means "no byte constraint"; an empty table
    # has no row width to estimate from. Either way only the row limit applies.
    if row_group_size_bytes is None or row_group_size_bytes <= 0 or num_rows == 0:
        return row_group_limit

    # Assume at least one byte per row. This keeps sub-byte-wide tables from
    # producing huge estimates and guards against dividing by a zero byte size.
    estimated_bytes = max(arrow_table.nbytes, num_rows)
    # Multiply before dividing so a fractional average row width is not floored
    # away (e.g. 1.99 bytes/row treated as 1), which would overshoot the budget.
    rows_for_byte_budget = max(1, row_group_size_bytes * num_rows // estimated_bytes)
    if row_group_limit is None:
        return rows_for_byte_budget
    return min(row_group_limit, rows_for_byte_budget)
2546+
2547+
25382548
def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteTask]) -> Iterator[DataFile]:
25392549
from pyiceberg.table import DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE, TableProperties
25402550

25412551
parquet_writer_kwargs = _get_parquet_writer_kwargs(table_metadata.properties)
2542-
row_group_size = property_as_int(
2552+
row_group_limit = property_as_int(
25432553
properties=table_metadata.properties,
25442554
property_name=TableProperties.PARQUET_ROW_GROUP_LIMIT,
25452555
default=TableProperties.PARQUET_ROW_GROUP_LIMIT_DEFAULT,
25462556
)
2557+
row_group_size_bytes = property_as_int(
2558+
properties=table_metadata.properties,
2559+
property_name=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES,
2560+
default=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT,
2561+
)
25472562
location_provider = load_location_provider(table_location=table_metadata.location, table_properties=table_metadata.properties)
25482563

25492564
def write_parquet(task: WriteTask) -> DataFile:
@@ -2567,6 +2582,7 @@ def write_parquet(task: WriteTask) -> DataFile:
25672582
for batch in task.record_batches
25682583
]
25692584
arrow_table = pa.Table.from_batches(batches)
2585+
row_group_size = _resolve_row_group_size(arrow_table, row_group_limit, row_group_size_bytes)
25702586
file_path = location_provider.new_data_location(
25712587
data_file_name=task.generate_data_file_filename("parquet"),
25722588
partition_key=task.partition_key,
@@ -2705,7 +2721,6 @@ def _get_parquet_writer_kwargs(table_properties: Properties) -> Dict[str, Any]:
27052721
from pyiceberg.table import TableProperties
27062722

27072723
for key_pattern in [
2708-
TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES,
27092724
TableProperties.PARQUET_BLOOM_FILTER_MAX_BYTES,
27102725
f"{TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX}.*",
27112726
]:

tests/integration/test_writes/test_writes.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -691,7 +691,6 @@ def test_write_parquet_other_properties(
691691
@pytest.mark.parametrize(
692692
"properties",
693693
[
694-
{"write.parquet.row-group-size-bytes": "42"},
695694
{"write.parquet.bloom-filter-enabled.column.bool": "42"},
696695
{"write.parquet.bloom-filter-max-bytes": "42"},
697696
],

tests/io/test_pyarrow.py

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import uuid
2222
import warnings
2323
from datetime import date, datetime, timezone
24+
from pathlib import Path
2425
from typing import Any, List, Optional
2526
from unittest.mock import MagicMock, patch
2627
from uuid import uuid4
@@ -71,6 +72,7 @@
7172
_determine_partitions,
7273
_primitive_to_physical,
7374
_read_deletes,
75+
_resolve_row_group_size,
7476
_task_to_record_batches,
7577
_to_requested_schema,
7678
bin_pack_arrow_table,
@@ -79,6 +81,7 @@
7981
expression_to_pyarrow,
8082
parquet_path_to_id_mapping,
8183
schema_to_pyarrow,
84+
write_file,
8285
)
8386
from pyiceberg.manifest import DataFile, DataFileContent, FileFormat
8487
from pyiceberg.partitioning import PartitionField, PartitionSpec
@@ -2825,3 +2828,86 @@ def test_parse_location_defaults() -> None:
28252828
assert scheme == "hdfs"
28262829
assert netloc == "netloc:8000"
28272830
assert path == "/foo/bar"
2831+
2832+
2833+
@pytest.mark.parametrize(
    "arrow_table,row_group_limit,row_group_size_bytes,expected",
    [
        # Byte limit tighter than row limit — 2 int64 cols => 16 bytes/row,
        # 1024-byte budget => 64 rows/group.
        (pa.table({"a": list(range(1000)), "b": list(range(1000))}), 10_000, 1024, 64),
        # Row limit tighter than byte limit.
        (pa.table({"a": list(range(1000))}), 10, 10**9, 10),
        # Byte limit disabled (0) falls back to the row limit.
        (pa.table({"a": list(range(1000))}), 500, 0, 500),
        # Byte limit unset (None) falls back to the row limit.
        (pa.table({"a": list(range(1000))}), 500, None, 500),
        # Empty input falls back to the row limit.
        (pa.table({"a": pa.array([], type=pa.int64())}), 500, 1024, 500),
        # No row limit: the byte budget alone decides (16 bytes/row => 64 rows).
        (pa.table({"a": list(range(1000)), "b": list(range(1000))}), None, 1024, 64),
        # Neither bound set: there is nothing to enforce.
        (pa.table({"a": list(range(1000))}), None, None, None),
    ],
)
def test__resolve_row_group_size(
    arrow_table: pa.Table,
    row_group_limit: Optional[int],
    row_group_size_bytes: Optional[int],
    expected: Optional[int],
) -> None:
    """Pick the tighter of the row limit and the byte budget, tolerating either being unset."""
    assert _resolve_row_group_size(arrow_table, row_group_limit, row_group_size_bytes) == expected
2850+
2851+
2852+
def test_write_file_byte_limit_produces_more_row_groups_than_row_limit_alone(tmp_path: Path) -> None:
    """Writing with a small byte budget yields several row groups where the defaults yield one."""
    from pyiceberg.table import WriteTask

    table_schema = Schema(
        NestedField(1, "a", LongType(), required=False),
        NestedField(2, "b", LongType(), required=False),
    )
    arrow_data = pa.table({"a": list(range(10_000)), "b": list(range(10_000))})

    def _write(properties: dict[str, str], subdir: str) -> Path:
        # Each call writes the same data under its own sub-directory so the
        # two configurations cannot interfere with one another.
        metadata = TableMetadataV2(
            location=f"file://{tmp_path}/{subdir}",
            last_column_id=2,
            format_version=2,
            schemas=[table_schema],
            partition_specs=[PartitionSpec()],
            properties=properties,
        )
        write_task = WriteTask(
            write_uuid=uuid.uuid4(),
            task_id=0,
            record_batches=arrow_data.to_batches(),
            schema=table_schema,
        )
        written = list(write_file(io=PyArrowFileIO(), table_metadata=metadata, tasks=iter([write_task])))
        local_path = written[0].file_path.removeprefix("file://")
        return Path(local_path)

    default_file = pq.ParquetFile(_write({}, "default"))
    default_groups = default_file.num_row_groups

    constrained_properties = {TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES: "1024"}
    constrained_file = pq.ParquetFile(_write(constrained_properties, "constrained"))
    constrained_groups = constrained_file.num_row_groups

    assert default_groups == 1
    assert constrained_groups > 1
2886+
2887+
2888+
def test_write_file_byte_limit_respects_row_limit_upper_bound(tmp_path: Path) -> None:
    """When the byte target is far larger than the data, the row limit alone sizes the row groups."""
    from pyiceberg.table import WriteTask

    table_schema = Schema(NestedField(1, "a", LongType(), required=False))
    arrow_data = pa.table({"a": list(range(10_000))})

    # 10,000 rows with a 1,000-row limit and a huge byte target.
    write_properties = {
        TableProperties.PARQUET_ROW_GROUP_LIMIT: "1000",
        TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES: str(10**12),
    }
    metadata = TableMetadataV2(
        location=f"file://{tmp_path}",
        last_column_id=1,
        format_version=2,
        schemas=[table_schema],
        partition_specs=[PartitionSpec()],
        properties=write_properties,
    )
    write_task = WriteTask(
        write_uuid=uuid.uuid4(),
        task_id=0,
        record_batches=arrow_data.to_batches(),
        schema=table_schema,
    )

    written = list(write_file(io=PyArrowFileIO(), table_metadata=metadata, tasks=iter([write_task])))
    local_path = written[0].file_path.removeprefix("file://")
    parquet_file = pq.ParquetFile(local_path)
    assert parquet_file.num_row_groups == 10

0 commit comments

Comments
 (0)