@@ -41,6 +41,11 @@ use datafusion::execution::context::TaskContext;
41   41   use datafusion::logical_expr::SortExpr;
42   42   use datafusion::logical_expr::dml::InsertOp;
43   43   use datafusion::parquet::basic::{BrotliLevel, Compression, GzipLevel, ZstdLevel};
     44  +use datafusion::physical_plan::{
     45  +    ExecutionPlan as DFExecutionPlan, collect as df_collect,
     46  +    collect_partitioned as df_collect_partitioned, execute_stream as df_execute_stream,
     47  +    execute_stream_partitioned as df_execute_stream_partitioned,
     48  +};
44   49   use datafusion::prelude::*;
45   50   use datafusion_python_util::{is_ipython_env, spawn_future, wait_for_future};
46   51   use futures::{StreamExt, TryStreamExt};
@@ -52,13 +57,6 @@ use pyo3::pybacked::PyBackedStr;
52   57   use pyo3::types::{PyCapsule, PyList, PyTuple, PyTupleMethods};
53   58
54   59   use crate::common::data_type::PyScalarValue;
55        -use datafusion::physical_plan::{
56        -    ExecutionPlan as DFExecutionPlan,
57        -    collect as df_collect,
58        -    collect_partitioned as df_collect_partitioned,
59        -    execute_stream as df_execute_stream,
60        -    execute_stream_partitioned as df_execute_stream_partitioned,
61        -};
62   60   use crate::errors::{PyDataFusionError, PyDataFusionResult, py_datafusion_err};
63   61   use crate::expr::PyExpr;
64   62   use crate::expr::sort_expr::{PySortExpr, to_sort_expressions};
6361use crate :: expr:: PyExpr ;
6462use crate :: expr:: sort_expr:: { PySortExpr , to_sort_expressions} ;
@@ -672,8 +670,8 @@ impl PyDataFrame {
672  670      /// guarantee of the order of the result.
673  671      fn collect<'py>(&self, py: Python<'py>) -> PyResult<Vec<Bound<'py, PyAny>>> {
674  672          let (plan, task_ctx) = self.create_and_cache_plan(py)?;
675           -        let batches = wait_for_future(py, df_collect(plan, task_ctx))?
676           -            .map_err(PyDataFusionError::from)?;
     673      +        let batches =
     674      +            wait_for_future(py, df_collect(plan, task_ctx))?.map_err(PyDataFusionError::from)?;
677  675          // cannot use PyResult<Vec<RecordBatch>> return type due to
678  676          // https://github.com/PyO3/pyo3/issues/1813
679  677          batches.into_iter().map(|rb| rb.to_pyarrow(py)).collect()
@@ -1187,9 +1185,10 @@ impl PyDataFrame {
1187 1185
1188 1186     fn execute_stream_partitioned(&self, py: Python) -> PyResult<Vec<PyRecordBatchStream>> {
1189 1187         let (plan, task_ctx) = self.create_and_cache_plan(py)?;
1190          -        let streams = spawn_future(py, async move {
1191          -            df_execute_stream_partitioned(plan, task_ctx)
1192          -        })?;
     1188     +        let streams = spawn_future(
     1189     +            py,
     1190     +            async move { df_execute_stream_partitioned(plan, task_ctx) },
     1191     +        )?;
1193 1192         Ok(streams.into_iter().map(PyRecordBatchStream::new).collect())
1194 1193     }
11951194
0 commit comments