Skip to content

Commit d54af04

Browse files
Merge remote-tracking branch 'upstream/main' into sm/replace-table
# Conflicts: # tests/catalog/test_catalog_behaviors.py
2 parents 42c0fe7 + d339391 commit d54af04

11 files changed

Lines changed: 692 additions & 47 deletions

File tree

mkdocs/docs/api.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -380,6 +380,17 @@ for buf in tbl.scan().to_arrow_batch_reader():
380380
print(f"Buffer contains {len(buf)} rows")
381381
```
382382

383+
### Streaming writes from a `RecordBatchReader`
384+
385+
`tbl.append()` and `tbl.overwrite()` also accept a `pyarrow.RecordBatchReader` directly, which lets you write datasets that don't fit in memory without first materialising them as a `pa.Table`. PyIceberg consumes the reader once and microbatches it into Parquet files sized by `write.target-file-size-bytes` (default 512 MiB), so memory usage stays bounded by roughly the target size. The target is measured in uncompressed in-memory Arrow bytes, so the resulting Parquet files on disk are typically smaller than the target. All files are committed in a single snapshot.
386+
387+
```python
388+
reader = pa.RecordBatchReader.from_batches(schema, batch_iter)
389+
tbl.append(reader)
390+
```
391+
392+
Streaming writes are currently only supported on **unpartitioned** tables; passing a `RecordBatchReader` for a partitioned table raises `NotImplementedError`. For a partitioned table, materialise the reader as a `pa.Table` first. Partitioned streaming support is tracked in [#2152](https://github.com/apache/iceberg-python/issues/2152).
393+
383394
To avoid any type inconsistencies during writing, you can convert the Iceberg table schema to Arrow:
384395

385396
```python

pyiceberg/catalog/rest/__init__.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,11 @@ class IdentifierKind(Enum):
230230
VIEW = "view"
231231

232232

233+
class ScanPlanningMode(Enum):
    """Accepted values of the ``scan-planning-mode`` catalog property.

    The property value is parsed with ``ScanPlanningMode(value)``, so any
    string other than the member values below raises ``ValueError``.
    """

    # Default mode: the catalog does not report server-side planning support.
    CLIENT = "client"
    # Opt in to server-side planning; the catalog must also advertise the
    # scan-plan endpoint for it to be reported as supported.
    SERVER = "server"
236+
237+
233238
ACCESS_DELEGATION_DEFAULT = "vended-credentials"
234239
AUTHORIZATION_HEADER = "Authorization"
235240
BEARER_PREFIX = "Bearer"
@@ -260,8 +265,8 @@ class IdentifierKind(Enum):
260265
SNAPSHOT_LOADING_MODE = "snapshot-loading-mode"
261266
AUTH = "auth"
262267
CUSTOM = "custom"
263-
REST_SCAN_PLANNING_ENABLED = "rest-scan-planning-enabled"
264-
REST_SCAN_PLANNING_ENABLED_DEFAULT = False
268+
SCAN_PLANNING_MODE = "scan-planning-mode"
269+
SCAN_PLANNING_MODE_DEFAULT = ScanPlanningMode.CLIENT.value
265270
# for backwards compatibility with older REST servers where it can be assumed that a particular
266271
# server supports view endpoints but doesn't send the "endpoints" field in the ConfigResponse
267272
VIEW_ENDPOINTS_SUPPORTED = "view-endpoints-supported"
@@ -485,9 +490,8 @@ def _load_file_io(self, properties: Properties = EMPTY_DICT, location: str | Non
485490
@override
def supports_server_side_planning(self) -> bool:
    """Report whether server-side scan planning should be used with this catalog.

    Returns:
        True only when the catalog advertises the submit-table-scan-plan
        endpoint and the ``scan-planning-mode`` property is ``server``.

    Raises:
        ValueError: If the configured ``scan-planning-mode`` value is not a
            valid ``ScanPlanningMode``.
    """
    # Parse the property first so a misconfigured value fails loudly even when
    # the endpoint is not advertised.
    configured_mode = self.properties.get(SCAN_PLANNING_MODE, SCAN_PLANNING_MODE_DEFAULT)
    scan_planning_mode = ScanPlanningMode(configured_mode)
    endpoint_advertised = Capability.V1_SUBMIT_TABLE_SCAN_PLAN in self._supported_endpoints
    return endpoint_advertised and scan_planning_mode == ScanPlanningMode.SERVER
491495

492496
@retry(**_RETRY_ARGS)
493497
def _plan_table_scan(self, identifier: str | Identifier, request: PlanTableScanRequest) -> PlanningResponse:

pyiceberg/io/pyarrow.py

Lines changed: 76 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2675,6 +2675,18 @@ def write_parquet(task: WriteTask) -> DataFile:
26752675

26762676

26772677
def bin_pack_arrow_table(tbl: pa.Table, target_file_size: int) -> Iterator[list[pa.RecordBatch]]:
2678+
"""Bin-pack ``tbl`` into groups of RecordBatches, each ~``target_file_size``.
2679+
2680+
Note:
2681+
``target_file_size`` is measured in **uncompressed in-memory** Arrow bytes
2682+
(``Table.nbytes`` / ``RecordBatch.nbytes``), not compressed on-disk Parquet
2683+
bytes. The resulting Parquet file after compression (zstd by default,
2684+
plus dictionary/RLE encoding) is typically 3-10× smaller than
2685+
``target_file_size``. This is a coarse proxy for the spec-defined
2686+
``write.target-file-size-bytes`` and will be tightened to true on-disk
2687+
bytes once the writer is switched to a rolling-``ParquetWriter`` with
2688+
``OutputStream.tell()`` (#2998).
2689+
"""
26782690
from pyiceberg.utils.bin_packing import PackingIterator
26792691

26802692
avg_row_size_bytes = tbl.nbytes / tbl.num_rows
@@ -2690,6 +2702,41 @@ def bin_pack_arrow_table(tbl: pa.Table, target_file_size: int) -> Iterator[list[
26902702
return bin_packed_record_batches
26912703

26922704

2705+
def bin_pack_record_batches(batches: Iterable[pa.RecordBatch], target_file_size: int) -> Iterator[list[pa.RecordBatch]]:
    """Microbatch a single-pass stream of RecordBatches into target-sized groups.

    Unlike :func:`bin_pack_arrow_table`, this consumes ``batches`` lazily and
    keeps only one in-flight group in memory. A group is emitted as soon as its
    accumulated ``nbytes`` reaches or exceeds ``target_file_size``, so a group
    may overshoot the target by at most the size of its final batch.
    Suitable for streaming inputs (``pa.RecordBatchReader``,
    ``Iterator[pa.RecordBatch]``) where the total size is unknown up front and
    the caller cannot afford to materialise the full dataset.

    Each yielded list of batches is intended to be written as a single Parquet
    data file. Because this is single-pass FIFO accumulation (no lookback), the
    last group may be smaller than ``target_file_size``.

    Zero-row batches are skipped, so an empty stream, or one that contains only
    empty batches, yields nothing rather than a group that would be written as
    a zero-row data file.

    Note:
        ``target_file_size`` is measured in **uncompressed in-memory** Arrow
        bytes (``RecordBatch.nbytes``), not compressed on-disk Parquet bytes.
        The resulting Parquet file after compression is typically 3-10×
        smaller than ``target_file_size``. Matches the existing
        :func:`bin_pack_arrow_table` semantics; both will be tightened to true
        on-disk bytes once the writer is switched to a rolling-
        ``ParquetWriter`` with ``OutputStream.tell()`` (#2998).

    Args:
        batches: Stream of record batches; iterated exactly once.
        target_file_size: Flush threshold in uncompressed Arrow bytes.

    Yields:
        Non-empty lists of non-empty record batches, in input order.
    """
    pending: list[pa.RecordBatch] = []
    pending_bytes = 0
    for batch in batches:
        if batch.num_rows == 0:
            # An empty batch contributes no data; buffering it could produce a
            # group (and therefore a data file) with zero rows.
            continue
        pending.append(batch)
        pending_bytes += batch.nbytes
        if pending_bytes >= target_file_size:
            yield pending
            # Rebind instead of clearing: the consumer still holds the yielded list.
            pending = []
            pending_bytes = 0
    if pending:
        # Flush the trailing, possibly undersized, group.
        yield pending
2738+
2739+
26932740
def _check_pyarrow_schema_compatible(
26942741
requested_schema: Schema,
26952742
provided_schema: pa.Schema,
@@ -2809,15 +2856,24 @@ def _get_parquet_writer_kwargs(table_properties: Properties) -> dict[str, Any]:
28092856

28102857
def _dataframe_to_data_files(
28112858
table_metadata: TableMetadata,
2812-
df: pa.Table,
2859+
df: pa.Table | pa.RecordBatchReader,
28132860
io: FileIO,
28142861
write_uuid: uuid.UUID | None = None,
28152862
counter: itertools.count[int] | None = None,
28162863
) -> Iterable[DataFile]:
2817-
"""Convert a PyArrow table into a DataFile.
2864+
"""Convert a PyArrow Table or RecordBatchReader into DataFiles.
2865+
2866+
For a ``pa.Table`` the data is materialised in memory and bin-packed into
2867+
target-sized files (with partition splitting if the table is partitioned).
2868+
2869+
For a ``pa.RecordBatchReader`` batches are streamed and microbatched into
2870+
target-sized files using bounded memory (see :func:`bin_pack_record_batches`).
2871+
Streaming writes are currently only supported on unpartitioned tables;
2872+
partitioned support is tracked in
2873+
https://github.com/apache/iceberg-python/issues/2152.
28182874
28192875
Returns:
2820-
An iterable that supplies datafiles that represent the table.
2876+
An iterable that supplies datafiles that represent the input data.
28212877
"""
28222878
from pyiceberg.table import DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE, TableProperties, WriteTask
28232879

@@ -2837,6 +2893,23 @@ def _dataframe_to_data_files(
28372893
format_version=table_metadata.format_version,
28382894
)
28392895

2896+
if isinstance(df, pa.RecordBatchReader):
2897+
if not table_metadata.spec().is_unpartitioned():
2898+
raise NotImplementedError(
2899+
"Writing a pa.RecordBatchReader to a partitioned table is not yet supported. "
2900+
"Materialise the reader as a pa.Table first, or follow "
2901+
"https://github.com/apache/iceberg-python/issues/2152 for partitioned streaming support."
2902+
)
2903+
yield from write_file(
2904+
io=io,
2905+
table_metadata=table_metadata,
2906+
tasks=(
2907+
WriteTask(write_uuid=write_uuid, task_id=next(counter), record_batches=batches, schema=task_schema)
2908+
for batches in bin_pack_record_batches(df, target_file_size)
2909+
),
2910+
)
2911+
return
2912+
28402913
if table_metadata.spec().is_unpartitioned():
28412914
yield from write_file(
28422915
io=io,

0 commit comments

Comments
 (0)