
Commit 706390f

paultmathew and Paul Mathew authored
Support pa.RecordBatchReader in Table.{append,overwrite} (#3335)
# Rationale for this change

Closes #2152, addresses the long-standing memory problem reported in #1004 and re-discovered by dlt-hub#3753.

`Table.append(df)` and `Table.overwrite(df)` currently require a fully materialised `pa.Table`. For large or unbounded inputs this means loading the entire dataset into memory before writing: fatal at any non-trivial scale and a recurring complaint going back to #1004 (Aug 2024). The reference Java implementation has streaming append; iceberg-go shipped it in iceberg-go#369 (Apr 2025). Python is the last major SDK without it.

This PR adds `pa.RecordBatchReader` as a valid input to `Table.append/overwrite` (and `Transaction.append/overwrite`). The reader is consumed lazily, microbatched into Parquet files via the new `bin_pack_record_batches` helper, and committed in a single snapshot via the existing `fast_append` pipeline.

```python
reader = pa.RecordBatchReader.from_batches(schema, batch_iter)
tbl.append(reader)     # ← streams, doesn't materialise
tbl.overwrite(reader)  # ← also supported
```

## Scope (unpartitioned only)

Streaming into a partitioned table raises `NotImplementedError` pointing back to #2152. Partitioned support is genuinely the harder case (it needs design discussion around partition cardinality bounds, per-partition rolling writers, and idempotency on retry), so I'm proposing to land in three reviewable PRs:

1. **This PR**: API + unpartitioned + buffered byte-budget bin-packing.
2. **PR2 (next)**: switch internals to a rolling `pq.ParquetWriter` + `OutputStream.tell()` for constant-memory streaming. No API change. Detailed plan below.
3. **PR3 (later)**: partitioned streaming, after design discussion on #2152.

This mirrors iceberg-go#369's staging: ship the unpartitioned API first, iterate from there.

## Implementation

The streaming path reuses the existing `WriteTask` → `write_file` → `fast_append` pipeline. The only new primitive is `bin_pack_record_batches` (sibling of the existing `bin_pack_arrow_table`):

- Accumulates incoming `RecordBatch`es into an in-memory buffer.
- Flushes when `sum(batch.nbytes) >= write.target-file-size-bytes`.
- Each flushed buffer becomes one parquet file via the existing `write_parquet` task.
- Schema check (`_check_pyarrow_schema_compatible`) runs against `reader.schema` before the snapshot producer opens: schema mismatches fail before any data file is written, so no orphans.

## Acknowledged trade-offs

**Memory**: peak memory is bounded by `N_workers × write.target-file-size-bytes` (default 8 × 512 MiB ≈ 4 GiB), not constant. This is materially better than today's "materialise everything" but isn't yet "constant memory streaming". PR2 fixes this.

**Byte semantics**: `write.target-file-size-bytes` is currently interpreted as **uncompressed in-memory Arrow bytes** (`RecordBatch.nbytes`, the bin-packing weight), not compressed on-disk Parquet bytes. The resulting files are typically 3-10× smaller than the property suggests after zstd / dictionary / RLE encoding. This matches the existing `pa.Table` write path (`bin_pack_arrow_table` uses the same accounting); this PR doesn't change pyiceberg's existing semantics, it only documents them in the docstrings of both helpers and the `Transaction.append/overwrite` `Note:` blocks. PR2 fixes this too.

**Retry**: `pa.RecordBatchReader` is single-pass, so a failed catalog commit leaves the reader drained and a naive retry writes zero rows. Documented in the `Note:` block: callers needing at-least-once semantics should reconstruct the reader on each attempt via a factory callable, or use the two-stage `add_files` pattern (whose input is a replayable list of paths).

## PR2: proposed scope (FYI, not in this PR)

Drop the buffer-and-flush approach and use a rolling `pq.ParquetWriter` driven by `OutputStream.tell()` (added in #2998 specifically for this kind of use case):

```python
# sketch
writer = pq.ParquetWriter(fos, schema, **kwargs)
for batch in reader:
    writer.write_batch(batch)
    if fos.tell() >= target_file_size:  # compressed on-disk bytes
        writer.close()
        finalize_data_file(...)
        # open next file
        fos = io.new_output(next_path).create(overwrite=True)
        writer = pq.ParquetWriter(fos, schema, **kwargs)
writer.close()
```

What this delivers:

- **Constant memory**: `O(1 batch)` per worker (~10s of MB) regardless of `target_file_size`. The 4 GiB peak in this PR drops to ~50-100 MB.
- **Spec-correct byte semantics**: `write.target-file-size-bytes` becomes actual on-disk compressed bytes, matching the Java/Spark/Flink writers and the spec.
- **No public API change**: same `tx.append(reader)` / `tx.overwrite(reader)`; internals only.

Open design questions for PR2 (will surface on the issue thread before coding):

- **Parallelism**: a single rolling writer is serial. Either accept that for streaming (memory-vs-throughput trade), or add a hybrid (N rolling writers fed via a queue) and pick a default that matches today's `executor.map(write_parquet, tasks)` parallelism.
- **Backwards compat**: switching `bin_pack_arrow_table` to the same rolling-writer mechanism would also tighten the `pa.Table` path's byte semantics. That changes file-size characteristics for every existing pyiceberg writer. Probably worth a separate change with a deprecation note, or a feature flag.
- **`add_files` interaction**: rolling writes produce data files we know about directly; we shouldn't go through the parquet-footer round-trip in `_dataframe_to_data_files`. Means a small refactor in the streaming-only path.

## Are these changes tested?

Yes, comprehensively at four layers.

**1. Unit tests** (`tests/io/test_pyarrow.py`): 4 new tests for `bin_pack_record_batches` covering single-bin, microbatched, empty input, and lazy generator consumption.

**2. End-to-end behaviour tests** (`tests/catalog/test_catalog_behaviors.py`): 8 new tests parametrised across all three in-process catalog backends (`memory`, `sql`, `sql_without_rowcount`) → 24 test runs covering append, overwrite, microbatch verification (multiple files in one snapshot), empty reader, partitioned-table-raises, invalid-input-rejected, reader-consumed-exactly-once, and schema-mismatch-writes-no-files.

**3. Integration tests** (`tests/integration/test_writes/test_writes.py`): 6 new Spark-readback tests for v1 + v2 format versions covering append, overwrite, and multi-file microbatch. Proves Spark can read tables written via the streaming path against the docker-compose stack.

**4. Smoke test on a real production stack**: verified end-to-end against AWS Glue + S3 in our staging account: 100 k-row streaming append in 17 s, 20-file microbatched commit, Athena read-back (`COUNT(*)` and `MAX(id)` matched the input exactly), schema-mismatch rejection leaving no orphan files.

Full unit suite: 3 647 passed. Full integration suite: 122 passed, 1 skipped.

## Are there any user-facing changes?

Yes, intentionally:

- `Transaction.append(df)`, `Transaction.overwrite(df)`, `Table.append(df)`, `Table.overwrite(df)` accept `pa.Table | pa.RecordBatchReader`.
- The `ValueError` raised on bad input changes from `"Expected PyArrow table, got: ..."` to `"Expected pa.Table or pa.RecordBatchReader, got: ..."`. Updated `test_invalid_arguments` accordingly.
- New module-level helper `bin_pack_record_batches` in `pyiceberg.io.pyarrow` (sibling of `bin_pack_arrow_table`).
- `bin_pack_arrow_table` gained its first docstring, documenting the existing uncompressed-Arrow-bytes accounting.
- Docs: new "Streaming writes from a RecordBatchReader" subsection in `mkdocs/docs/api.md`.
- Docstrings on `Transaction.append/overwrite` document retry semantics and the byte-semantics caveat.

## Related

- Closes #2152
- Addresses #1004 (closed by reporter without a fix)
- Reference implementation: iceberg-go#369
- Downstream consumer hitting the same problem: dlt-hub/dlt#3753 (independent rediscovery of the same approach)
- Builds on the maintainer-blessed pattern from #1742's review (`_dataframe_to_data_files` + `fast_append.append_data_file()`, no separate `write_parquet` API)
- Companion fix (already merged separately): test-state isolation in `test_write_optional_list`
- PR2 will build on `OutputStream.tell()` from #2998

---------

Co-authored-by: Paul Mathew <paul.mathew@aircall.io>
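The memory and file-size figures quoted in the trade-offs follow from simple arithmetic. A quick check, assuming the 8-worker and 512 MiB defaults and the 3-10× compression range stated in the message; the helper names are illustrative only and are not part of pyiceberg:

```python
MIB = 1024 ** 2
GIB = 1024 ** 3


def peak_buffer_bytes(n_workers: int, target_file_size: int) -> int:
    """Approximate bound on buffered Arrow bytes: one full buffer per worker."""
    return n_workers * target_file_size


def expected_on_disk_range(
    target_file_size: int, min_ratio: float = 3.0, max_ratio: float = 10.0
) -> tuple[float, float]:
    """Rough on-disk size range for a given compression-ratio range."""
    return target_file_size / max_ratio, target_file_size / min_ratio


peak = peak_buffer_bytes(8, 512 * MIB)
small, large = expected_on_disk_range(512 * MIB)
print(f"peak buffered: {peak / GIB:.1f} GiB")                           # 4.0 GiB
print(f"expected file size: {small / MIB:.0f}-{large / MIB:.0f} MiB")  # 51-171 MiB
```

The bound is approximate because a buffer is flushed only once it reaches the target, so each one can overshoot by up to one batch.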
1 parent 43d1f1f commit 706390f

7 files changed

Lines changed: 498 additions & 30 deletions


mkdocs/docs/api.md

Lines changed: 11 additions & 0 deletions
@@ -365,6 +365,17 @@ for buf in tbl.scan().to_arrow_batch_reader():
     print(f"Buffer contains {len(buf)} rows")
 ```
 
+### Streaming writes from a `RecordBatchReader`
+
+`tbl.append()` and `tbl.overwrite()` also accept a `pyarrow.RecordBatchReader` directly, which lets you write datasets that don't fit in memory without materialising them as a `pa.Table` first. PyIceberg consumes the reader once and microbatches it into Parquet files of approximately `write.target-file-size-bytes` (default 512 MiB), keeping memory usage bounded by the target size. All files are committed in a single snapshot.
+
+```python
+reader = pa.RecordBatchReader.from_batches(schema, batch_iter)
+tbl.append(reader)
+```
+
+Streaming writes are currently only supported on **unpartitioned** tables. For a partitioned table, materialise the reader as a `pa.Table` first, or follow [#2152](https://github.com/apache/iceberg-python/issues/2152) for the partitioned support tracked as a follow-up.
+
 To avoid any type inconsistencies during writing, you can convert the Iceberg table schema to Arrow:
 
 ```python

pyiceberg/io/pyarrow.py

Lines changed: 76 additions & 3 deletions
@@ -2675,6 +2675,18 @@ def write_parquet(task: WriteTask) -> DataFile:
 
 
 def bin_pack_arrow_table(tbl: pa.Table, target_file_size: int) -> Iterator[list[pa.RecordBatch]]:
+    """Bin-pack ``tbl`` into groups of RecordBatches, each ~``target_file_size``.
+
+    Note:
+        ``target_file_size`` is measured in **uncompressed in-memory** Arrow bytes
+        (``Table.nbytes`` / ``RecordBatch.nbytes``), not compressed on-disk Parquet
+        bytes. The resulting Parquet file after compression (zstd by default,
+        plus dictionary/RLE encoding) is typically 3-10× smaller than
+        ``target_file_size``. This is a coarse proxy for the spec-defined
+        ``write.target-file-size-bytes`` and will be tightened to true on-disk
+        bytes once the writer is switched to a rolling-``ParquetWriter`` with
+        ``OutputStream.tell()`` (#2998).
+    """
     from pyiceberg.utils.bin_packing import PackingIterator
 
     avg_row_size_bytes = tbl.nbytes / tbl.num_rows
@@ -2690,6 +2702,41 @@ def bin_pack_arrow_table(tbl: pa.Table, target_file_size: int) -> Iterator[list[
     return bin_packed_record_batches
 
 
+def bin_pack_record_batches(batches: Iterable[pa.RecordBatch], target_file_size: int) -> Iterator[list[pa.RecordBatch]]:
+    """Microbatch a single-pass stream of RecordBatches into target-sized groups.
+
+    Unlike :func:`bin_pack_arrow_table`, this consumes ``batches`` lazily and
+    holds at most one in-flight buffer in memory, bounded by ``target_file_size``.
+    Suitable for streaming inputs (``pa.RecordBatchReader``,
+    ``Iterator[pa.RecordBatch]``) where the total size is unknown up front and
+    the caller cannot afford to materialise the full dataset.
+
+    Each yielded list of batches is intended to be written as a single Parquet
+    data file. Because this is single-pass FIFO accumulation (no lookback), the
+    last bin may be smaller than ``target_file_size``.
+
+    Note:
+        ``target_file_size`` is measured in **uncompressed in-memory** Arrow
+        bytes (``RecordBatch.nbytes``), not compressed on-disk Parquet bytes.
+        The resulting Parquet file after compression is typically 3-10×
+        smaller than ``target_file_size``. Matches the existing
+        :func:`bin_pack_arrow_table` semantics; both will be tightened to true
+        on-disk bytes once the writer is switched to a rolling-
+        ``ParquetWriter`` with ``OutputStream.tell()`` (#2998).
+    """
+    buffer: list[pa.RecordBatch] = []
+    buffer_bytes = 0
+    for batch in batches:
+        buffer.append(batch)
+        buffer_bytes += batch.nbytes
+        if buffer_bytes >= target_file_size:
+            yield buffer
+            buffer = []
+            buffer_bytes = 0
+    if buffer:
+        yield buffer
+
+
 def _check_pyarrow_schema_compatible(
     requested_schema: Schema,
     provided_schema: pa.Schema,
@@ -2809,15 +2856,24 @@ def _get_parquet_writer_kwargs(table_properties: Properties) -> dict[str, Any]:
 
 def _dataframe_to_data_files(
     table_metadata: TableMetadata,
-    df: pa.Table,
+    df: pa.Table | pa.RecordBatchReader,
     io: FileIO,
     write_uuid: uuid.UUID | None = None,
     counter: itertools.count[int] | None = None,
 ) -> Iterable[DataFile]:
-    """Convert a PyArrow table into a DataFile.
+    """Convert a PyArrow Table or RecordBatchReader into DataFiles.
+
+    For a ``pa.Table`` the data is materialised in memory and bin-packed into
+    target-sized files (with partition splitting if the table is partitioned).
+
+    For a ``pa.RecordBatchReader`` batches are streamed and microbatched into
+    target-sized files using bounded memory (see :func:`bin_pack_record_batches`).
+    Streaming writes are currently only supported on unpartitioned tables;
+    partitioned support is tracked in
+    https://github.com/apache/iceberg-python/issues/2152.
 
     Returns:
-        An iterable that supplies datafiles that represent the table.
+        An iterable that supplies datafiles that represent the input data.
     """
     from pyiceberg.table import DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE, TableProperties, WriteTask
 
@@ -2837,6 +2893,23 @@ def _dataframe_to_data_files(
         format_version=table_metadata.format_version,
     )
 
+    if isinstance(df, pa.RecordBatchReader):
+        if not table_metadata.spec().is_unpartitioned():
+            raise NotImplementedError(
+                "Writing a pa.RecordBatchReader to a partitioned table is not yet supported. "
+                "Materialise the reader as a pa.Table first, or follow "
+                "https://github.com/apache/iceberg-python/issues/2152 for partitioned streaming support."
+            )
+        yield from write_file(
+            io=io,
+            table_metadata=table_metadata,
+            tasks=(
+                WriteTask(write_uuid=write_uuid, task_id=next(counter), record_batches=batches, schema=task_schema)
+                for batches in bin_pack_record_batches(df, target_file_size)
+            ),
+        )
+        return
+
     if table_metadata.spec().is_unpartitioned():
         yield from write_file(
             io=io,
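
To see how the FIFO accumulation in `bin_pack_record_batches` groups a stream, the same loop can be run against stand-in objects. This is a minimal sketch, assuming only that each item exposes an `nbytes` attribute; `FakeBatch` and `group_by_bytes` are hypothetical names for illustration, not pyiceberg or PyArrow APIs:

```python
from dataclasses import dataclass
from typing import Iterable, Iterator


@dataclass
class FakeBatch:
    """Stand-in for pa.RecordBatch; only the nbytes attribute is used."""

    nbytes: int


def group_by_bytes(batches: Iterable[FakeBatch], target_file_size: int) -> Iterator[list[FakeBatch]]:
    # Same accumulate-then-flush loop as in the diff, applied to stand-in batches.
    buffer: list[FakeBatch] = []
    buffer_bytes = 0
    for batch in batches:
        buffer.append(batch)
        buffer_bytes += batch.nbytes
        if buffer_bytes >= target_file_size:
            yield buffer
            buffer, buffer_bytes = [], 0
    if buffer:
        yield buffer  # trailing, possibly undersized group


stream = (FakeBatch(nbytes=40) for _ in range(7))  # lazy generator, single pass
groups = list(group_by_bytes(stream, target_file_size=100))
sizes = [sum(b.nbytes for b in g) for g in groups]
print(sizes)  # [120, 120, 40]
```

Because the flush check runs after a batch is appended, a group can exceed the target by up to one batch (120 bytes against a target of 100 here), and only the final group can fall short of it.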

pyiceberg/table/__init__.py

Lines changed: 116 additions & 24 deletions
@@ -450,12 +450,53 @@ def update_statistics(self) -> UpdateStatistics:
         """
         return UpdateStatistics(transaction=self)
 
-    def append(self, df: pa.Table, snapshot_properties: dict[str, str] = EMPTY_DICT, branch: str | None = MAIN_BRANCH) -> None:
+    def append(
+        self,
+        df: pa.Table | pa.RecordBatchReader,
+        snapshot_properties: dict[str, str] = EMPTY_DICT,
+        branch: str | None = MAIN_BRANCH,
+    ) -> None:
         """
-        Shorthand API for appending a PyArrow table to a table transaction.
+        Shorthand API for appending PyArrow data to a table transaction.
+
+        Accepts either a fully materialised ``pa.Table`` or a streaming
+        ``pa.RecordBatchReader``. Streaming is microbatched by
+        ``write.target-file-size-bytes`` so memory stays bounded; the reader is
+        consumed once and cannot be reused.
+
+        Streaming writes are currently only supported on unpartitioned tables;
+        passing a ``pa.RecordBatchReader`` for a partitioned table raises
+        ``NotImplementedError``. See
+        https://github.com/apache/iceberg-python/issues/2152.
+
+        Note:
+            When ``df`` is a ``pa.RecordBatchReader`` the reader is consumed
+            once and cannot be replayed. If the catalog commit fails (e.g.
+            ``CommitFailedException`` from a concurrent writer) the reader is
+            already drained and a naive retry will append zero rows. Callers
+            that need at-least-once semantics should either:
+
+            - reconstruct the reader on each attempt via a factory callable,
+              or
+            - use a two-stage pattern — write Parquet files explicitly and
+              then call :meth:`add_files` (whose input is a replayable list of
+              paths) within a retry loop.
+
+            Failures during the write stage (mid-stream reader exception, S3
+            errors) do not commit a snapshot, but may leave orphan data files
+            in storage that are not referenced by any snapshot. Clean these
+            up with expire/orphan-file maintenance jobs.
+
+            ``write.target-file-size-bytes`` is currently interpreted as
+            uncompressed in-memory Arrow bytes (the bin-packing weight) rather
+            than compressed on-disk Parquet bytes. The resulting files are
+            typically 3-10× smaller than the property suggests after
+            compression. This matches the existing ``pa.Table`` write path and
+            will be tightened once the writer is switched to a
+            rolling-``ParquetWriter`` with ``OutputStream.tell()`` (#2998).
 
         Args:
-            df: The Arrow dataframe that will be appended to overwrite the table
+            df: An Arrow Table or a RecordBatchReader of records to append.
             snapshot_properties: Custom properties to be added to the snapshot summary
             branch: Branch Reference to run the append operation
         """
@@ -466,8 +507,8 @@ def append(self, df: pa.Table, snapshot_properties: dict[str, str] = EMPTY_DICT,
 
         from pyiceberg.io.pyarrow import _check_pyarrow_schema_compatible, _dataframe_to_data_files
 
-        if not isinstance(df, pa.Table):
-            raise ValueError(f"Expected PyArrow table, got: {df}")
+        if not isinstance(df, (pa.Table, pa.RecordBatchReader)):
+            raise ValueError(f"Expected pa.Table or pa.RecordBatchReader, got: {df}")
 
         downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
         _check_pyarrow_schema_compatible(
@@ -478,12 +519,14 @@ def append(self, df: pa.Table, snapshot_properties: dict[str, str] = EMPTY_DICT,
         )
 
         with self._append_snapshot_producer(snapshot_properties, branch=branch) as append_files:
-            # skip writing data files if the dataframe is empty
-            if df.shape[0] > 0:
-                data_files = list(
-                    _dataframe_to_data_files(
-                        table_metadata=self.table_metadata, write_uuid=append_files.commit_uuid, df=df, io=self._table.io
-                    )
+            # For pa.Table we can short-circuit empty inputs cheaply. For a
+            # RecordBatchReader the stream is consumed lazily by
+            # _dataframe_to_data_files and an empty reader simply yields zero
+            # data files (the snapshot is still committed for symmetry with the
+            # pa.Table case where empty inputs also produce a snapshot).
+            if isinstance(df, pa.RecordBatchReader) or df.shape[0] > 0:
+                data_files = _dataframe_to_data_files(
+                    table_metadata=self.table_metadata, write_uuid=append_files.commit_uuid, df=df, io=self._table.io
                 )
                 for data_file in data_files:
                     append_files.append_data_file(data_file)
@@ -555,14 +598,50 @@ def dynamic_partition_overwrite(
 
     def overwrite(
         self,
-        df: pa.Table,
+        df: pa.Table | pa.RecordBatchReader,
         overwrite_filter: BooleanExpression | str = ALWAYS_TRUE,
         snapshot_properties: dict[str, str] = EMPTY_DICT,
         case_sensitive: bool = True,
         branch: str | None = MAIN_BRANCH,
     ) -> None:
         """
-        Shorthand for adding a table overwrite with a PyArrow table to the transaction.
+        Shorthand for adding a table overwrite with a PyArrow table or RecordBatchReader to the transaction.
+
+        Accepts either a fully materialised ``pa.Table`` or a streaming
+        ``pa.RecordBatchReader``. Streaming is microbatched by
+        ``write.target-file-size-bytes`` so memory stays bounded; the reader is
+        consumed once and cannot be reused.
+
+        Streaming writes are currently only supported on unpartitioned tables;
+        passing a ``pa.RecordBatchReader`` for a partitioned table raises
+        ``NotImplementedError``. See
+        https://github.com/apache/iceberg-python/issues/2152.
+
+        Note:
+            When ``df`` is a ``pa.RecordBatchReader`` the reader is consumed
+            once and cannot be replayed. If the catalog commit fails (e.g.
+            ``CommitFailedException`` from a concurrent writer) the reader is
+            already drained and a naive retry will write zero rows. Callers
+            that need at-least-once semantics should either:
+
+            - reconstruct the reader on each attempt via a factory callable,
+              or
+            - use a two-stage pattern — write Parquet files explicitly and
+              then call :meth:`add_files` (whose input is a replayable list
+              of paths) within a retry loop.
+
+            Failures during the write stage (mid-stream reader exception, S3
+            errors) do not commit a snapshot, but may leave orphan data files
+            in storage that are not referenced by any snapshot. Clean these
+            up with expire/orphan-file maintenance jobs.
+
+            ``write.target-file-size-bytes`` is currently interpreted as
+            uncompressed in-memory Arrow bytes (the bin-packing weight) rather
+            than compressed on-disk Parquet bytes. The resulting files are
+            typically 3-10× smaller than the property suggests after
+            compression. This matches the existing ``pa.Table`` write path and
+            will be tightened once the writer is switched to a
+            rolling-``ParquetWriter`` with ``OutputStream.tell()`` (#2998).
 
         An overwrite may produce zero or more snapshots based on the operation:
 
@@ -571,7 +650,7 @@ def overwrite(
             - APPEND: In case new data is being inserted into the table.
 
         Args:
-            df: The Arrow dataframe that will be used to overwrite the table
+            df: An Arrow Table or a RecordBatchReader of records to write.
             overwrite_filter: ALWAYS_TRUE when you overwrite all the data,
                               or a boolean expression in case of a partial overwrite
             snapshot_properties: Custom properties to be added to the snapshot summary
@@ -585,8 +664,8 @@ def overwrite(
 
         from pyiceberg.io.pyarrow import _check_pyarrow_schema_compatible, _dataframe_to_data_files
 
-        if not isinstance(df, pa.Table):
-            raise ValueError(f"Expected PyArrow table, got: {df}")
+        if not isinstance(df, (pa.Table, pa.RecordBatchReader)):
+            raise ValueError(f"Expected pa.Table or pa.RecordBatchReader, got: {df}")
 
         downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
         _check_pyarrow_schema_compatible(
@@ -606,8 +685,8 @@ def overwrite(
             )
 
         with self._append_snapshot_producer(snapshot_properties, branch=branch) as append_files:
-            # skip writing data files if the dataframe is empty
-            if df.shape[0] > 0:
+            # See append() for the empty-input handling rationale.
+            if isinstance(df, pa.RecordBatchReader) or df.shape[0] > 0:
                 data_files = _dataframe_to_data_files(
                     table_metadata=self.table_metadata, write_uuid=append_files.commit_uuid, df=df, io=self._table.io
                 )
@@ -1373,12 +1452,21 @@ def upsert(
                 snapshot_properties=snapshot_properties,
             )
 
-    def append(self, df: pa.Table, snapshot_properties: dict[str, str] = EMPTY_DICT, branch: str | None = MAIN_BRANCH) -> None:
+    def append(
+        self,
+        df: pa.Table | pa.RecordBatchReader,
+        snapshot_properties: dict[str, str] = EMPTY_DICT,
+        branch: str | None = MAIN_BRANCH,
+    ) -> None:
         """
-        Shorthand API for appending a PyArrow table to the table.
+        Shorthand API for appending PyArrow data to the table.
+
+        Accepts either a ``pa.Table`` or a streaming ``pa.RecordBatchReader``.
+        See :meth:`Transaction.append` for streaming semantics and partition
+        limitations.
 
         Args:
-            df: The Arrow dataframe that will be appended to overwrite the table
+            df: An Arrow Table or a RecordBatchReader of records to append.
             snapshot_properties: Custom properties to be added to the snapshot summary
             branch: Branch Reference to run the append operation
         """
@@ -1401,14 +1489,18 @@ def dynamic_partition_overwrite(
 
     def overwrite(
         self,
-        df: pa.Table,
+        df: pa.Table | pa.RecordBatchReader,
         overwrite_filter: BooleanExpression | str = ALWAYS_TRUE,
         snapshot_properties: dict[str, str] = EMPTY_DICT,
         case_sensitive: bool = True,
         branch: str | None = MAIN_BRANCH,
     ) -> None:
         """
-        Shorthand for overwriting the table with a PyArrow table.
+        Shorthand for overwriting the table with a PyArrow Table or RecordBatchReader.
+
+        Accepts either a ``pa.Table`` or a streaming ``pa.RecordBatchReader``.
+        See :meth:`Transaction.overwrite` for streaming semantics and partition
+        limitations.
 
         An overwrite may produce zero or more snapshots based on the operation:
 
@@ -1417,7 +1509,7 @@ def overwrite(
             - APPEND: In case new data is being inserted into the table.
 
         Args:
-            df: The Arrow dataframe that will be used to overwrite the table
+            df: An Arrow Table or a RecordBatchReader of records to write.
             overwrite_filter: ALWAYS_TRUE when you overwrite all the data,
                               or a boolean expression in case of a partial overwrite
             snapshot_properties: Custom properties to be added to the snapshot summary
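
The docstrings recommend rebuilding the reader through a factory callable when a commit may need to be retried. A minimal sketch of that pattern follows, assuming a duck-typed table object with an `append` method. `append_with_retry` and `FlakyTable` are hypothetical names, not pyiceberg APIs, and the exception types to retry on are passed in by the caller rather than assumed:

```python
from typing import Any, Callable, Tuple, Type


def append_with_retry(
    tbl: Any,
    reader_factory: Callable[[], Any],
    retry_on: Tuple[Type[BaseException], ...],
    max_attempts: int = 3,
) -> int:
    """Call tbl.append() with a fresh reader per attempt; return attempts used."""
    for attempt in range(1, max_attempts + 1):
        try:
            tbl.append(reader_factory())  # new single-pass reader each time
            return attempt
        except retry_on:
            if attempt == max_attempts:
                raise
    raise ValueError("max_attempts must be at least 1")


class FlakyTable:
    """Test double: drains the input, fails the first commit, then succeeds."""

    def __init__(self) -> None:
        self.calls = 0
        self.rows: list = []

    def append(self, reader: Any) -> None:
        self.calls += 1
        rows = list(reader)  # consumes the single-pass input
        if self.calls == 1:
            raise RuntimeError("simulated commit conflict")
        self.rows.extend(rows)


tbl = FlakyTable()
attempts = append_with_retry(tbl, lambda: iter(range(5)), retry_on=(RuntimeError,))
print(attempts, tbl.rows)  # 2 [0, 1, 2, 3, 4]
```

Reusing one iterator across both attempts would have appended nothing on the second call, which is the failure mode the `Note:` blocks describe. Against a real catalog, a retry loop would presumably also need to refresh table metadata between attempts and account for the orphan data files written by the failed attempt; neither is modelled here.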
