Arrow Interface

DataFusion DataFrames can stream results to Arrow-based Python libraries without first materializing all batches in memory. This page focuses on the Arrow interfaces exposed by DataFrame and when to use each one.

Zero-copy Streaming

DataFusion DataFrames implement the __arrow_c_stream__ protocol, enabling zero-copy, lazy streaming into Arrow-based Python libraries. With the streaming protocol, batches are produced on demand.

Note

The protocol is implementation-agnostic and works with any Python library that understands the Arrow C streaming interface (for example, PyArrow or other Arrow-compatible implementations). The sections below provide a short PyArrow-specific example and general guidance for other implementations.

PyArrow

import pyarrow as pa

# Create a PyArrow RecordBatchReader without materializing all batches
reader = pa.RecordBatchReader.from_stream(df)
for batch in reader:
    ...  # process each batch as it is produced

DataFrames are also iterable, yielding :class:`datafusion.RecordBatch` objects lazily so you can loop over results directly without importing PyArrow:

for batch in df:
    ...  # each batch is a ``datafusion.RecordBatch``

Each batch exposes to_pyarrow(), which converts it to a pyarrow.RecordBatch. By contrast, pa.table(df) collects the entire DataFrame eagerly into a PyArrow table:

import pyarrow as pa

table = pa.table(df)

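Asynchronous iteration is supported as well, allowing integration with asyncio event loops. Because async for is only valid inside a coroutine, place the loop in an async function:

async def process(df):
    async for batch in df:
        ...  # process each batch as it is produced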

Execute as Stream

For finer control over streaming execution, use :py:meth:`~datafusion.DataFrame.execute_stream` to obtain a :py:class:`datafusion.RecordBatchStream`:

stream = df.execute_stream()
for batch in stream:
    ...  # process each batch as it is produced

Tip

To get a PyArrow reader instead, call pa.RecordBatchReader.from_stream(df).

When partition boundaries are important, :py:meth:`~datafusion.DataFrame.execute_stream_partitioned` returns an iterable of :py:class:`datafusion.RecordBatchStream` objects, one per partition:

for stream in df.execute_stream_partitioned():
    for batch in stream:
        ...  # each stream yields RecordBatches

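To process partitions concurrently, first collect the streams into a list and then consume each one in its own asyncio task. Because await is only valid inside a coroutine, the loop is driven with asyncio.run:

import asyncio

async def consume(stream):
    async for batch in stream:
        ...  # process batches from a single partition

async def main(df):
    streams = list(df.execute_stream_partitioned())
    # gather wraps each coroutine in a task and runs them concurrently
    await asyncio.gather(*(consume(s) for s in streams))

asyncio.run(main(df))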

See :doc:`../io/arrow` for additional details on the Arrow interface.