Skip to content

Commit 29d2faf

Browse files
committed
UNPICK
1 parent 74c95e8 commit 29d2faf

File tree

14 files changed

+86
-675
lines changed

14 files changed

+86
-675
lines changed

docs/source/conf.py

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -72,14 +72,6 @@
7272
suppress_warnings = ["autoapi.python_import_resolution"]
7373
autoapi_python_class_content = "both"
7474
autoapi_keep_files = False # set to True for debugging generated files
75-
autoapi_options = [
76-
"members",
77-
"undoc-members",
78-
"special-members",
79-
"show-inheritance",
80-
"show-module-summary",
81-
"imported-members",
82-
]
8375

8476

8577
def autoapi_skip_member_fn(app, what, name, obj, skip, options) -> bool: # noqa: ARG001

docs/source/user-guide/dataframe/index.rst

Lines changed: 1 addition & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -145,41 +145,10 @@ To materialize the results of your DataFrame operations:
145145
146146
# Display results
147147
df.show() # Print tabular format to console
148-
148+
149149
# Count rows
150150
count = df.count()
151151
152-
PyArrow Streaming
153-
-----------------
154-
155-
DataFusion DataFrames implement the ``__arrow_c_stream__`` protocol, enabling
156-
zero-copy streaming into libraries like `PyArrow <https://arrow.apache.org/>`_.
157-
Earlier versions eagerly converted the entire DataFrame when exporting to
158-
PyArrow, which could exhaust memory on large datasets. With streaming, batches
159-
are produced lazily so you can process arbitrarily large results without
160-
out-of-memory errors.
161-
162-
.. code-block:: python
163-
164-
import pyarrow as pa
165-
166-
# Create a PyArrow RecordBatchReader without materializing all batches
167-
reader = pa.RecordBatchReader._import_from_c_capsule(df.__arrow_c_stream__())
168-
for batch in reader:
169-
... # process each batch as it is produced
170-
171-
DataFrames are also iterable, yielding :class:`datafusion.RecordBatch` objects
172-
that implement the Arrow C data interface. These batches can be consumed by
173-
libraries like PyArrow without copying:
174-
175-
.. code-block:: python
176-
177-
for batch in df:
178-
pa_batch = batch.to_pyarrow() # optional conversion
179-
... # process each batch as it is produced
180-
181-
See :doc:`../io/arrow` for additional details on the Arrow interface.
182-
183152
HTML Rendering
184153
--------------
185154

python/datafusion/__init__.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@
5353
)
5454
from .io import read_avro, read_csv, read_json, read_parquet
5555
from .plan import ExecutionPlan, LogicalPlan
56-
from .record_batch import RecordBatch, RecordBatchStream, to_record_batch_stream
56+
from .record_batch import RecordBatch, RecordBatchStream
5757
from .user_defined import (
5858
Accumulator,
5959
AggregateUDF,
@@ -107,7 +107,6 @@
107107
"read_json",
108108
"read_parquet",
109109
"substrait",
110-
"to_record_batch_stream",
111110
"udaf",
112111
"udf",
113112
"udtf",

python/datafusion/dataframe.py

Lines changed: 7 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,7 @@
2525
from typing import (
2626
TYPE_CHECKING,
2727
Any,
28-
AsyncIterator,
2928
Iterable,
30-
Iterator,
3129
Literal,
3230
Optional,
3331
Union,
@@ -44,11 +42,7 @@
4442
from datafusion._internal import ParquetWriterOptions as ParquetWriterOptionsInternal
4543
from datafusion.expr import Expr, SortExpr, sort_or_default
4644
from datafusion.plan import ExecutionPlan, LogicalPlan
47-
from datafusion.record_batch import (
48-
RecordBatch,
49-
RecordBatchStream,
50-
to_record_batch_stream,
51-
)
45+
from datafusion.record_batch import RecordBatchStream
5246

5347
if TYPE_CHECKING:
5448
import pathlib
@@ -59,7 +53,6 @@
5953
import pyarrow as pa
6054

6155
from datafusion._internal import expr as expr_internal
62-
from datafusion.record_batch import RecordBatch
6356

6457
from enum import Enum
6558

@@ -296,9 +289,6 @@ def __init__(
296289
class DataFrame:
297290
"""Two dimensional table representation of data.
298291
299-
DataFrame objects are iterable; iterating over a DataFrame yields
300-
:class:`pyarrow.RecordBatch` instances lazily.
301-
302292
See :ref:`user_guide_concepts` in the online documentation for more information.
303293
"""
304294

@@ -1028,22 +1018,6 @@ def to_arrow_table(self) -> pa.Table:
10281018
"""
10291019
return self.df.to_arrow_table()
10301020

1031-
def __iter__(self) -> Iterator[pa.RecordBatch]:
1032-
"""Iterate over :py:class:`pyarrow.RecordBatch` objects.
1033-
1034-
This executes the DataFrame and yields each partition as a native
1035-
:py:class:`pyarrow.RecordBatch`.
1036-
1037-
Yields:
1038-
pyarrow.RecordBatch: the next batch in the result stream.
1039-
"""
1040-
for batch in self.execute_stream():
1041-
# ``execute_stream`` yields batches that may be ``RecordBatch``
1042-
# wrappers or ``pyarrow.RecordBatch`` objects directly. Convert
1043-
# to native PyArrow batches when necessary to provide a consistent
1044-
# iterator interface.
1045-
yield batch.to_pyarrow() if hasattr(batch, "to_pyarrow") else batch
1046-
10471021
def execute_stream(self) -> RecordBatchStream:
10481022
"""Executes this DataFrame and returns a stream over a single partition.
10491023
@@ -1124,41 +1098,21 @@ def unnest_columns(self, *columns: str, preserve_nulls: bool = True) -> DataFram
11241098
return DataFrame(self.df.unnest_columns(columns, preserve_nulls=preserve_nulls))
11251099

11261100
def __arrow_c_stream__(self, requested_schema: object | None = None) -> object:
1127-
"""Export the DataFrame as an Arrow C Stream.
1101+
"""Export an Arrow PyCapsule Stream.
11281102
1129-
The DataFrame is executed using DataFusion's streaming APIs and exposed via
1130-
Arrow's C Stream interface. Record batches are produced incrementally, so the
1131-
full result set is never materialized in memory. When ``requested_schema`` is
1132-
provided, only straightforward projections such as column selection or
1133-
reordering are applied.
1103+
This will execute and collect the DataFrame. We will attempt to respect the
1104+
requested schema, but only trivial transformations will be applied, such as
1105+
returning the fields listed in the requested schema if their data types match
1106+
those in the DataFrame.
11341107
11351108
Args:
11361109
requested_schema: Attempt to provide the DataFrame using this schema.
11371110
11381111
Returns:
1139-
Arrow PyCapsule object representing an ``ArrowArrayStream``.
1112+
Arrow PyCapsule object.
11401113
"""
1141-
# ``DataFrame.__arrow_c_stream__`` in the Rust extension leverages
1142-
# ``execute_stream_partitioned`` under the hood to stream batches while
1143-
# preserving the original partition order.
11441114
return self.df.__arrow_c_stream__(requested_schema)
11451115

1146-
def __iter__(self) -> Iterator[RecordBatch]:
1147-
"""Yield record batches from the DataFrame without materializing results.
1148-
1149-
This implementation delegates to :func:`to_record_batch_stream`, which
1150-
executes the DataFrame and returns a :class:`RecordBatchStream`.
1151-
"""
1152-
return to_record_batch_stream(self).__iter__()
1153-
1154-
def __aiter__(self) -> AsyncIterator[RecordBatch]:
1155-
"""Asynchronously yield record batches from the DataFrame.
1156-
1157-
This delegates to :func:`to_record_batch_stream` to obtain a
1158-
:class:`RecordBatchStream` and returns its asynchronous iterator.
1159-
"""
1160-
return to_record_batch_stream(self).__aiter__()
1161-
11621116
def transform(self, func: Callable[..., DataFrame], *args: Any) -> DataFrame:
11631117
"""Apply a function to the current DataFrame which returns another DataFrame.
11641118

python/datafusion/record_batch.py

Lines changed: 3 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,11 @@
2525

2626
from typing import TYPE_CHECKING
2727

28-
import datafusion._internal as df_internal
29-
3028
if TYPE_CHECKING:
3129
import pyarrow as pa
3230
import typing_extensions
3331

34-
from datafusion.dataframe import DataFrame
32+
import datafusion._internal as df_internal
3533

3634

3735
class RecordBatch:
@@ -69,10 +67,10 @@ async def __anext__(self) -> RecordBatch:
6967
next_batch = await self.rbs.__anext__()
7068
return RecordBatch(next_batch)
7169

72-
def __next__(self) -> pa.RecordBatch:
70+
def __next__(self) -> RecordBatch:
7371
"""Iterator function."""
7472
next_batch = next(self.rbs)
75-
return next_batch.to_pyarrow()
73+
return RecordBatch(next_batch)
7674

7775
def __aiter__(self) -> typing_extensions.Self:
7876
"""Async iterator function."""
@@ -81,15 +79,3 @@ def __aiter__(self) -> typing_extensions.Self:
8179
def __iter__(self) -> typing_extensions.Self:
8280
"""Iterator function."""
8381
return self
84-
85-
86-
def to_record_batch_stream(df: DataFrame) -> RecordBatchStream:
87-
"""Convert a DataFrame into a RecordBatchStream.
88-
89-
Args:
90-
df: DataFrame to convert.
91-
92-
Returns:
93-
A RecordBatchStream representing the DataFrame.
94-
"""
95-
return df.execute_stream()

python/tests/conftest.py

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
import pyarrow as pa
1919
import pytest
20-
from datafusion import DataFrame, SessionContext
20+
from datafusion import SessionContext
2121
from pyarrow.csv import write_csv
2222

2323

@@ -49,12 +49,3 @@ def database(ctx, tmp_path):
4949
delimiter=",",
5050
schema_infer_max_records=10,
5151
)
52-
53-
54-
@pytest.fixture
55-
def fail_collect(monkeypatch):
56-
def _fail_collect(self, *args, **kwargs): # pragma: no cover - failure path
57-
msg = "collect should not be called"
58-
raise AssertionError(msg)
59-
60-
monkeypatch.setattr(DataFrame, "collect", _fail_collect)

0 commit comments

Comments
 (0)