Skip to content

Commit 4d7194c

Browse files
committed
Merge remote-tracking branch 'apache/main' into file-format-parquet-impl
2 parents a2d9ea7 + 3bd5f27 commit 4d7194c

12 files changed

Lines changed: 698 additions & 53 deletions

File tree

mkdocs/docs/api.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -365,6 +365,17 @@ for buf in tbl.scan().to_arrow_batch_reader():
365365
print(f"Buffer contains {len(buf)} rows")
366366
```
367367

368+
### Streaming writes from a `RecordBatchReader`
369+
370+
`tbl.append()` and `tbl.overwrite()` also accept a `pyarrow.RecordBatchReader` directly, which lets you write datasets that don't fit in memory without materialising them as a `pa.Table` first. PyIceberg consumes the reader once and microbatches it into Parquet files targeting `write.target-file-size-bytes` (default 512 MiB). The target is currently measured in uncompressed in-memory Arrow bytes, so the Parquet files on disk are typically smaller than the target, and memory usage stays bounded by roughly the target size plus one incoming batch. All files are committed in a single snapshot.
371+
372+
```python
373+
reader = pa.RecordBatchReader.from_batches(schema, batch_iter)
374+
tbl.append(reader)
375+
```
376+
377+
Streaming writes are currently only supported on **unpartitioned** tables. For a partitioned table, materialise the reader as a `pa.Table` first, or follow [#2152](https://github.com/apache/iceberg-python/issues/2152), which tracks partitioned streaming support as a follow-up.
378+
368379
To avoid any type inconsistencies during writing, you can convert the Iceberg table schema to Arrow:
369380

370381
```python

pyiceberg/catalog/rest/__init__.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,11 @@ class IdentifierKind(Enum):
225225
VIEW = "view"
226226

227227

228+
class ScanPlanningMode(Enum):
    """Where scan planning for a table is performed.

    The member values are the strings accepted by the ``scan-planning-mode``
    catalog property. Lookup by value is case-insensitive, so a property set to
    ``"SERVER"`` or ``"Server"`` resolves to :attr:`SERVER` instead of raising.
    """

    CLIENT = "client"
    SERVER = "server"

    @classmethod
    def _missing_(cls, value: object) -> "ScanPlanningMode | None":
        """Resolve values that did not match a member exactly.

        Called by ``Enum`` only after the exact-value lookup has failed.
        Returning ``None`` lets ``Enum`` raise its usual ``ValueError`` for
        genuinely unknown values.
        """
        if isinstance(value, str):
            normalized = value.lower()
            for mode in cls:
                if mode.value == normalized:
                    return mode
        return None
231+
232+
228233
ACCESS_DELEGATION_DEFAULT = "vended-credentials"
229234
AUTHORIZATION_HEADER = "Authorization"
230235
BEARER_PREFIX = "Bearer"
@@ -255,8 +260,8 @@ class IdentifierKind(Enum):
255260
SNAPSHOT_LOADING_MODE = "snapshot-loading-mode"
256261
AUTH = "auth"
257262
CUSTOM = "custom"
258-
REST_SCAN_PLANNING_ENABLED = "rest-scan-planning-enabled"
259-
REST_SCAN_PLANNING_ENABLED_DEFAULT = False
263+
SCAN_PLANNING_MODE = "scan-planning-mode"
264+
SCAN_PLANNING_MODE_DEFAULT = ScanPlanningMode.CLIENT.value
260265
# for backwards compatibility with older REST servers where it can be assumed that a particular
261266
# server supports view endpoints but doesn't send the "endpoints" field in the ConfigResponse
262267
VIEW_ENDPOINTS_SUPPORTED = "view-endpoints-supported"
@@ -480,9 +485,8 @@ def _load_file_io(self, properties: Properties = EMPTY_DICT, location: str | Non
480485
@override
481486
def supports_server_side_planning(self) -> bool:
482487
"""Check if the catalog supports server-side scan planning."""
483-
return Capability.V1_SUBMIT_TABLE_SCAN_PLAN in self._supported_endpoints and property_as_bool(
484-
self.properties, REST_SCAN_PLANNING_ENABLED, REST_SCAN_PLANNING_ENABLED_DEFAULT
485-
)
488+
scan_planning_mode = ScanPlanningMode(self.properties.get(SCAN_PLANNING_MODE, SCAN_PLANNING_MODE_DEFAULT))
489+
return Capability.V1_SUBMIT_TABLE_SCAN_PLAN in self._supported_endpoints and scan_planning_mode == ScanPlanningMode.SERVER
486490

487491
@retry(**_RETRY_ARGS)
488492
def _plan_table_scan(self, identifier: str | Identifier, request: PlanTableScanRequest) -> PlanningResponse:

pyiceberg/io/pyarrow.py

Lines changed: 76 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2746,6 +2746,18 @@ def write_data_file(task: WriteTask) -> DataFile:
27462746

27472747

27482748
def bin_pack_arrow_table(tbl: pa.Table, target_file_size: int) -> Iterator[list[pa.RecordBatch]]:
2749+
"""Bin-pack ``tbl`` into groups of RecordBatches, each ~``target_file_size``.
2750+
2751+
Note:
2752+
``target_file_size`` is measured in **uncompressed in-memory** Arrow bytes
2753+
(``Table.nbytes`` / ``RecordBatch.nbytes``), not compressed on-disk Parquet
2754+
bytes. The resulting Parquet file after compression (zstd by default,
2755+
plus dictionary/RLE encoding) is typically 3-10× smaller than
2756+
``target_file_size``. This is a coarse proxy for the spec-defined
2757+
``write.target-file-size-bytes`` and will be tightened to true on-disk
2758+
bytes once the writer is switched to a rolling-``ParquetWriter`` with
2759+
``OutputStream.tell()`` (#2998).
2760+
"""
27492761
from pyiceberg.utils.bin_packing import PackingIterator
27502762

27512763
avg_row_size_bytes = tbl.nbytes / tbl.num_rows
@@ -2761,6 +2773,41 @@ def bin_pack_arrow_table(tbl: pa.Table, target_file_size: int) -> Iterator[list[
27612773
return bin_packed_record_batches
27622774

27632775

2776+
def bin_pack_record_batches(batches: Iterable[pa.RecordBatch], target_file_size: int) -> Iterator[list[pa.RecordBatch]]:
    """Microbatch a single-pass stream of RecordBatches into target-sized groups.

    Unlike :func:`bin_pack_arrow_table`, this consumes ``batches`` lazily and
    holds at most one in-flight buffer in memory. The buffer is flushed as soon
    as its cumulative ``nbytes`` reaches ``target_file_size``, so it can exceed
    the target by at most the size of the batch that triggered the flush.
    Suitable for streaming inputs (``pa.RecordBatchReader``,
    ``Iterator[pa.RecordBatch]``) where the total size is unknown up front and
    the caller cannot afford to materialise the full dataset.

    Each yielded list of batches is intended to be written as a single Parquet
    data file. Because this is single-pass FIFO accumulation (no lookback), the
    last bin may be smaller than ``target_file_size``.

    Zero-row batches are dropped, so an input that carries no rows yields no
    bins at all rather than a bin that would become an empty data file.

    Note:
        ``target_file_size`` is measured in **uncompressed in-memory** Arrow
        bytes (``RecordBatch.nbytes``), not compressed on-disk Parquet bytes.
        The resulting Parquet file after compression is typically 3-10×
        smaller than ``target_file_size``. Matches the existing
        :func:`bin_pack_arrow_table` semantics; both will be tightened to true
        on-disk bytes once the writer is switched to a rolling-
        ``ParquetWriter`` with ``OutputStream.tell()`` (#2998).

    Args:
        batches: Stream of record batches; iterated exactly once.
        target_file_size: Flush threshold in uncompressed Arrow bytes.

    Yields:
        Non-empty lists of non-empty record batches, in input order.
    """
    buffer: list[pa.RecordBatch] = []
    buffer_bytes = 0
    for batch in batches:
        if batch.num_rows == 0:
            # An empty batch contributes no rows; buffering it could produce a
            # bin (and therefore a data file) that contains no records.
            continue
        buffer.append(batch)
        buffer_bytes += batch.nbytes
        if buffer_bytes >= target_file_size:
            yield buffer
            # Rebind instead of clearing: the consumer still holds the yielded list.
            buffer = []
            buffer_bytes = 0
    if buffer:
        # Trailing partial bin.
        yield buffer
2810+
27642811
def _check_pyarrow_schema_compatible(
27652812
requested_schema: Schema,
27662813
provided_schema: pa.Schema,
@@ -2880,15 +2927,24 @@ def _get_parquet_writer_kwargs(table_properties: Properties) -> dict[str, Any]:
28802927

28812928
def _dataframe_to_data_files(
28822929
table_metadata: TableMetadata,
2883-
df: pa.Table,
2930+
df: pa.Table | pa.RecordBatchReader,
28842931
io: FileIO,
28852932
write_uuid: uuid.UUID | None = None,
28862933
counter: itertools.count[int] | None = None,
28872934
) -> Iterable[DataFile]:
2888-
"""Convert a PyArrow table into a DataFile.
2935+
"""Convert a PyArrow Table or RecordBatchReader into DataFiles.
2936+
2937+
For a ``pa.Table`` the data is materialised in memory and bin-packed into
2938+
target-sized files (with partition splitting if the table is partitioned).
2939+
2940+
For a ``pa.RecordBatchReader`` batches are streamed and microbatched into
2941+
target-sized files using bounded memory (see :func:`bin_pack_record_batches`).
2942+
Streaming writes are currently only supported on unpartitioned tables;
2943+
partitioned support is tracked in
2944+
https://github.com/apache/iceberg-python/issues/2152.
28892945
28902946
Returns:
2891-
An iterable that supplies datafiles that represent the table.
2947+
An iterable that supplies datafiles that represent the input data.
28922948
"""
28932949
from pyiceberg.table import DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE, TableProperties, WriteTask
28942950

@@ -2908,6 +2964,23 @@ def _dataframe_to_data_files(
29082964
format_version=table_metadata.format_version,
29092965
)
29102966

2967+
if isinstance(df, pa.RecordBatchReader):
2968+
if not table_metadata.spec().is_unpartitioned():
2969+
raise NotImplementedError(
2970+
"Writing a pa.RecordBatchReader to a partitioned table is not yet supported. "
2971+
"Materialise the reader as a pa.Table first, or follow "
2972+
"https://github.com/apache/iceberg-python/issues/2152 for partitioned streaming support."
2973+
)
2974+
yield from write_file(
2975+
io=io,
2976+
table_metadata=table_metadata,
2977+
tasks=(
2978+
WriteTask(write_uuid=write_uuid, task_id=next(counter), record_batches=batches, schema=task_schema)
2979+
for batches in bin_pack_record_batches(df, target_file_size)
2980+
),
2981+
)
2982+
return
2983+
29112984
if table_metadata.spec().is_unpartitioned():
29122985
yield from write_file(
29132986
io=io,

0 commit comments

Comments
 (0)