Skip to content

Commit 571647d

Browse files
author
Stephen Buck
committed
Implement write.parquet.row-group-size-bytes in the pyarrow writer
The pyiceberg writer has historically ignored write.parquet.row-group-size-bytes (logging 'not implemented') and honored only write.parquet.row-group-limit (a row count). For wide tables this means a single row group can reach gigabytes — e.g. 337 cols × 1,048,576 default rows ≈ 1.7 GiB uncompressed per row group — which drives the polars / pyarrow reader's decode peak into the tens of GiB on production reads. Now write_file resolves row_group_size as min(row_group_limit, row_group_size_bytes // bytes_per_row), where bytes_per_row is approximated as the in-memory arrow_table's nbytes divided by its row count. This matches the Spark / parquet-mr 'whichever limit fires first' semantics and lets the existing PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT (128 MiB) actually take effect.
1 parent c84017d commit 571647d

2 files changed

Lines changed: 103 additions & 2 deletions

File tree

pyiceberg/io/pyarrow.py

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2404,15 +2404,30 @@ def data_file_statistics_from_parquet_metadata(
24042404
)
24052405

24062406

2407+
def _resolve_row_group_size(arrow_table: pa.Table, row_group_limit: int | None, row_group_size_bytes: int | None) -> int | None:
2408+
if not row_group_size_bytes or arrow_table.num_rows == 0:
2409+
return row_group_limit
2410+
bytes_per_row = max(1, arrow_table.nbytes // arrow_table.num_rows)
2411+
rows_for_byte_budget = max(1, row_group_size_bytes // bytes_per_row)
2412+
if row_group_limit is None:
2413+
return rows_for_byte_budget
2414+
return min(row_group_limit, rows_for_byte_budget)
2415+
2416+
24072417
def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteTask]) -> Iterator[DataFile]:
24082418
from pyiceberg.table import DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE, TableProperties
24092419

24102420
parquet_writer_kwargs = _get_parquet_writer_kwargs(table_metadata.properties)
2411-
row_group_size = property_as_int(
2421+
row_group_limit = property_as_int(
24122422
properties=table_metadata.properties,
24132423
property_name=TableProperties.PARQUET_ROW_GROUP_LIMIT,
24142424
default=TableProperties.PARQUET_ROW_GROUP_LIMIT_DEFAULT,
24152425
)
2426+
row_group_size_bytes = property_as_int(
2427+
properties=table_metadata.properties,
2428+
property_name=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES,
2429+
default=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT,
2430+
)
24162431
location_provider = load_location_provider(table_location=table_metadata.location, table_properties=table_metadata.properties)
24172432

24182433
def write_parquet(task: WriteTask) -> DataFile:
@@ -2436,6 +2451,7 @@ def write_parquet(task: WriteTask) -> DataFile:
24362451
for batch in task.record_batches
24372452
]
24382453
arrow_table = pa.Table.from_batches(batches)
2454+
row_group_size = _resolve_row_group_size(arrow_table, row_group_limit, row_group_size_bytes)
24392455
file_path = location_provider.new_data_location(
24402456
data_file_name=task.generate_data_file_filename("parquet"),
24412457
partition_key=task.partition_key,
@@ -2564,7 +2580,6 @@ def _get_parquet_writer_kwargs(table_properties: Properties) -> Dict[str, Any]:
25642580
from pyiceberg.table import TableProperties
25652581

25662582
for key_pattern in [
2567-
TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES,
25682583
TableProperties.PARQUET_BLOOM_FILTER_MAX_BYTES,
25692584
f"{TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX}.*",
25702585
]:

tests/io/test_pyarrow.py

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import tempfile
2121
import uuid
2222
from datetime import date
23+
from pathlib import Path
2324
from typing import Any, List, Optional
2425
from unittest.mock import MagicMock, patch
2526
from uuid import uuid4
@@ -67,13 +68,15 @@
6768
_determine_partitions,
6869
_primitive_to_physical,
6970
_read_deletes,
71+
_resolve_row_group_size,
7072
_to_requested_schema,
7173
bin_pack_arrow_table,
7274
compute_statistics_plan,
7375
data_file_statistics_from_parquet_metadata,
7476
expression_to_pyarrow,
7577
parquet_path_to_id_mapping,
7678
schema_to_pyarrow,
79+
write_file,
7780
)
7881
from pyiceberg.manifest import DataFile, DataFileContent, FileFormat
7982
from pyiceberg.partitioning import PartitionField, PartitionSpec
@@ -2319,3 +2322,86 @@ def test_pyarrow_io_multi_fs() -> None:
23192322

23202323
# Same PyArrowFileIO instance resolves local file input to LocalFileSystem
23212324
assert isinstance(pyarrow_file_io.new_input("file:///path/to/file")._filesystem, LocalFileSystem)
2325+
2326+
2327+
@pytest.mark.parametrize(
    "arrow_table,row_group_limit,row_group_size_bytes,expected",
    [
        # Two int64 columns cost 16 bytes per row, so a 1024-byte budget fits
        # 64 rows and undercuts the 10_000-row limit.
        (pa.table({"a": list(range(1000)), "b": list(range(1000))}), 10_000, 1024, 64),
        # A huge byte budget leaves the 10-row limit as the binding constraint.
        (pa.table({"a": list(range(1000))}), 10, 10**9, 10),
        # A byte budget of 0 means "disabled": the row limit is returned as-is.
        (pa.table({"a": list(range(1000))}), 500, 0, 500),
        # With no rows there is no per-row estimate: the row limit is returned.
        (pa.table({"a": pa.array([], type=pa.int64())}), 500, 1024, 500),
    ],
)
def test__resolve_row_group_size(arrow_table: pa.Table, row_group_limit: int, row_group_size_bytes: int, expected: int) -> None:
    """The resolved size is the tighter of the row limit and the byte-derived row count."""
    resolved = _resolve_row_group_size(arrow_table, row_group_limit, row_group_size_bytes)
    assert resolved == expected
2344+
2345+
2346+
def test_write_file_byte_limit_produces_more_row_groups_than_row_limit_alone(tmp_path: Path) -> None:
    """Writing the same data with a 1024-byte row-group budget yields more than one row group.

    The same 10_000-row, two-column table is written twice: once with no
    properties (expected to land in a single row group) and once with
    ``PARQUET_ROW_GROUP_SIZE_BYTES`` set to ``"1024"``.
    """
    from pyiceberg.table import WriteTask

    table_schema = Schema(
        NestedField(1, "a", LongType(), required=False),
        NestedField(2, "b", LongType(), required=False),
    )
    arrow_data = pa.table({"a": list(range(10_000)), "b": list(range(10_000))})

    def _row_groups_written(properties: dict[str, str], subdir: str) -> int:
        # Each run gets its own sub-directory so the two files cannot collide.
        metadata = TableMetadataV2(
            location=f"file://{tmp_path}/{subdir}",
            last_column_id=2,
            format_version=2,
            schemas=[table_schema],
            partition_specs=[PartitionSpec()],
            properties=properties,
        )
        write_task = WriteTask(
            write_uuid=uuid.uuid4(),
            task_id=0,
            record_batches=arrow_data.to_batches(),
            schema=table_schema,
        )
        written = list(write_file(io=PyArrowFileIO(), table_metadata=metadata, tasks=iter([write_task])))
        parquet_path = Path(written[0].file_path.removeprefix("file://"))
        return pq.ParquetFile(parquet_path).num_row_groups

    default_groups = _row_groups_written({}, "default")
    constrained_groups = _row_groups_written(
        {TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES: "1024"},
        "constrained",
    )

    assert default_groups == 1
    assert constrained_groups > 1
2380+
2381+
2382+
def test_write_file_byte_limit_respects_row_limit_upper_bound(tmp_path: Path) -> None:
    """A 1000-row limit still caps row groups when the byte budget is 10**12.

    10_000 rows written under a 1000-row limit must produce exactly 10 row groups.
    """
    from pyiceberg.table import WriteTask

    table_schema = Schema(NestedField(1, "a", LongType(), required=False))
    arrow_data = pa.table({"a": list(range(10_000))})

    # The byte budget is far larger than the data, so only the row limit can bind.
    writer_properties = {
        TableProperties.PARQUET_ROW_GROUP_LIMIT: "1000",
        TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES: str(10**12),
    }
    metadata = TableMetadataV2(
        location=f"file://{tmp_path}",
        last_column_id=1,
        format_version=2,
        schemas=[table_schema],
        partition_specs=[PartitionSpec()],
        properties=writer_properties,
    )
    write_task = WriteTask(
        write_uuid=uuid.uuid4(),
        task_id=0,
        record_batches=arrow_data.to_batches(),
        schema=table_schema,
    )

    written = list(write_file(io=PyArrowFileIO(), table_metadata=metadata, tasks=iter([write_task])))
    parquet_file = pq.ParquetFile(written[0].file_path.removeprefix("file://"))

    assert parquet_file.num_row_groups == 10

0 commit comments

Comments
 (0)