@@ -4,7 +4,7 @@ use std::sync::Arc;
44use arrow:: array:: {
55 ArrayRef , RecordBatch , StringBuilder , TimestampMicrosecondBuilder , UInt64Builder ,
66} ;
7- use arrow:: datatypes:: { DataType , Field , Schema , SchemaRef , TimeUnit } ;
7+ use arrow:: datatypes:: { DataType , Field , Schema , TimeUnit } ;
88use futures:: stream:: { BoxStream , Fuse } ;
99use futures:: StreamExt ;
1010use indexmap:: IndexMap ;
@@ -14,7 +14,8 @@ use pyo3::exceptions::{PyImportError, PyStopAsyncIteration, PyStopIteration};
1414use pyo3:: prelude:: * ;
1515use pyo3:: types:: PyDict ;
1616use pyo3:: { intern, IntoPyObjectExt } ;
17- use pyo3_arrow:: { PyRecordBatch , PyTable } ;
17+ use pyo3_arrow:: export:: { Arro3RecordBatch , Arro3Table } ;
18+ use pyo3_arrow:: PyTable ;
1819use pyo3_async_runtimes:: tokio:: get_runtime;
1920use pyo3_object_store:: { PyObjectStore , PyObjectStoreError , PyObjectStoreResult } ;
2021use tokio:: sync:: Mutex ;
@@ -139,7 +140,7 @@ impl PyListStream {
139140
140141#[ derive( IntoPyObject ) ]
141142enum PyListIterResult {
142- Arrow ( PyRecordBatchWrapper ) ,
143+ Arrow ( Arro3RecordBatch ) ,
143144 Native ( Vec < PyObjectMeta > ) ,
144145}
145146
@@ -158,7 +159,9 @@ async fn next_stream(
158159 if metas. len ( ) >= chunk_size {
159160 match return_arrow {
160161 true => {
161- return Ok ( PyListIterResult :: Arrow ( object_meta_to_arrow ( & metas) ) ) ;
162+ return Ok ( PyListIterResult :: Arrow (
163+ object_meta_to_arrow ( & metas) . into ( ) ,
164+ ) ) ;
162165 }
163166 false => {
164167 return Ok ( PyListIterResult :: Native ( metas) ) ;
@@ -179,7 +182,9 @@ async fn next_stream(
179182 } else {
180183 match return_arrow {
181184 true => {
182- return Ok ( PyListIterResult :: Arrow ( object_meta_to_arrow ( & metas) ) ) ;
185+ return Ok ( PyListIterResult :: Arrow (
186+ object_meta_to_arrow ( & metas) . into ( ) ,
187+ ) ) ;
183188 }
184189 false => {
185190 return Ok ( PyListIterResult :: Native ( metas) ) ;
@@ -205,7 +210,7 @@ async fn collect_stream(
205210 Some ( Err ( e) ) => return Err ( PyObjectStoreError :: from ( e) . into ( ) ) ,
206211 None => match return_arrow {
207212 true => {
208- return Ok ( PyListIterResult :: Arrow ( object_meta_to_arrow ( & metas) ) ) ;
213+ return Ok ( PyListIterResult :: Arrow ( object_meta_to_arrow ( & metas) . into ( ) ) ) ;
209214 }
210215 false => {
211216 return Ok ( PyListIterResult :: Native ( metas) ) ;
@@ -215,58 +220,6 @@ async fn collect_stream(
215220 }
216221}
217222
218- struct PyRecordBatchWrapper ( PyRecordBatch ) ;
219-
220- impl PyRecordBatchWrapper {
221- fn new ( batch : RecordBatch ) -> Self {
222- Self ( PyRecordBatch :: new ( batch) )
223- }
224-
225- fn into_table ( self ) -> PyResult < PyTableWrapper > {
226- let batch = self . 0 . into_inner ( ) ;
227- let schema = batch. schema ( ) ;
228- PyTableWrapper :: new ( vec ! [ batch] , schema)
229- }
230- }
231-
232- impl < ' py > IntoPyObject < ' py > for PyRecordBatchWrapper {
233- type Target = PyAny ;
234- type Output = Bound < ' py , PyAny > ;
235- type Error = PyErr ;
236-
237- fn into_pyobject ( self , py : Python < ' py > ) -> Result < Self :: Output , Self :: Error > {
238- py. import ( intern ! ( py, "arro3.core" ) ) . map_err ( |_| {
239- PyImportError :: new_err (
240- "Could not import arro3.core. Install with\n pip install arro3-core" ,
241- )
242- } ) ?;
243- self . 0 . into_arro3 ( py)
244- }
245- }
246-
247- struct PyTableWrapper ( PyTable ) ;
248-
249- impl PyTableWrapper {
250- fn new ( batches : Vec < RecordBatch > , schema : SchemaRef ) -> PyResult < Self > {
251- Ok ( Self ( PyTable :: try_new ( batches, schema) ?) )
252- }
253- }
254-
255- impl < ' py > IntoPyObject < ' py > for PyTableWrapper {
256- type Target = PyAny ;
257- type Output = Bound < ' py , PyAny > ;
258- type Error = PyErr ;
259-
260- fn into_pyobject ( self , py : Python < ' py > ) -> Result < Self :: Output , Self :: Error > {
261- py. import ( intern ! ( py, "arro3.core" ) ) . map_err ( |_| {
262- PyImportError :: new_err (
263- "Could not import arro3.core. Install with\n pip install arro3-core" ,
264- )
265- } ) ?;
266- self . 0 . into_arro3 ( py)
267- }
268- }
269-
270223/// Array capacities for each string array
271224struct ObjectMetaCapacity {
272225 location : usize ,
@@ -304,7 +257,7 @@ fn object_meta_capacities(metas: &[PyObjectMeta]) -> ObjectMetaCapacity {
304257 capacity
305258}
306259
307- fn object_meta_to_arrow ( metas : & [ PyObjectMeta ] ) -> PyRecordBatchWrapper {
260+ fn object_meta_to_arrow ( metas : & [ PyObjectMeta ] ) -> RecordBatch {
308261 let capacity = object_meta_capacities ( metas) ;
309262
310263 let mut location = StringBuilder :: with_capacity ( metas. len ( ) , capacity. location ) ;
@@ -344,8 +297,7 @@ fn object_meta_to_arrow(metas: &[PyObjectMeta]) -> PyRecordBatchWrapper {
344297 Arc :: new( version. finish( ) ) ,
345298 ] ;
346299 // This unwrap is ok because we know the RecordBatch is valid.
347- let batch = RecordBatch :: try_new ( schema. into ( ) , columns) . unwrap ( ) ;
348- PyRecordBatchWrapper :: new ( batch)
300+ RecordBatch :: try_new ( schema. into ( ) , columns) . unwrap ( )
349301}
350302
351303pub ( crate ) struct PyListResult {
@@ -385,9 +337,9 @@ impl<'py> IntoPyObject<'py> for PyListResult {
385337 . map ( PyObjectMeta )
386338 . collect :: < Vec < _ > > ( ) ;
387339 let objects = if self . return_arrow {
388- object_meta_to_arrow ( & objects)
389- . into_table ( ) ?
390- . into_bound_py_any ( py)
340+ let batch = object_meta_to_arrow ( & objects) ;
341+ let schema = batch . schema ( ) ;
342+ Arro3Table :: from ( PyTable :: try_new ( vec ! [ batch ] , schema ) . unwrap ( ) ) . into_bound_py_any ( py)
391343 } else {
392344 objects. into_bound_py_any ( py)
393345 } ?;
0 commit comments