Skip to content

Commit a146330

Browse files
committed
Revert "Implement drop_stream function for proper memory management of ArrowArrayStream"
This reverts commit 041bae2.
1 parent 041bae2 commit a146330

1 file changed

Lines changed: 14 additions & 25 deletions

File tree

src/dataframe.rs

Lines changed: 14 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
17 17

18 18
use std::collections::HashMap;
19 19
use std::ffi::{c_void, CString};
20-
use std::sync::{Arc, OnceLock};
20+
use std::sync::Arc;
21 21

22 22
use arrow::array::{new_null_array, RecordBatch, RecordBatchReader};
23 23
use arrow::compute::can_cast_types;
@@ -39,7 +39,6 @@ use datafusion::prelude::*;
39 39
use datafusion_ffi::table_provider::FFI_TableProvider;
40 40
use futures::{StreamExt, TryStreamExt};
41 41
use pyo3::exceptions::PyValueError;
42-
use pyo3::ffi;
43 42
use pyo3::prelude::*;
44 43
use pyo3::pybacked::PyBackedStr;
45 44
use pyo3::types::{PyCapsule, PyList, PyTuple, PyTupleMethods};
@@ -59,19 +58,6 @@ use crate::{
59 58
expr::{sort_expr::PySortExpr, PyExpr},
60 59
};
61 60

62-
static ARROW_STREAM_NAME: OnceLock<CString> = OnceLock::new();
63-
64-
unsafe extern "C" fn drop_stream(capsule: *mut ffi::PyObject) {
65-
if capsule.is_null() {
66-
return;
67-
}
68-
let name = ARROW_STREAM_NAME.get_or_init(|| CString::new("arrow_array_stream").unwrap());
69-
let stream_ptr = ffi::PyCapsule_GetPointer(capsule, name.as_ptr()) as *mut FFI_ArrowArrayStream;
70-
if !stream_ptr.is_null() {
71-
drop(Box::from_raw(stream_ptr));
72-
}
73-
}
74-
75 61
// https://github.com/apache/datafusion-python/pull/1016#discussion_r1983239116
76 62
// - we have not decided on the table_provider approach yet
77 63
// this is an interim implementation
@@ -972,17 +958,20 @@ impl PyDataFrame {
972 958
!stream_ptr.is_null(),
973 959
"ArrowArrayStream pointer should never be null"
974 960
);
975-
let name = ARROW_STREAM_NAME.get_or_init(|| CString::new("arrow_array_stream").unwrap());
976-
let capsule = unsafe {
977-
ffi::PyCapsule_New(stream_ptr as *mut c_void, name.as_ptr(), Some(drop_stream))
978-
};
979-
if capsule.is_null() {
980-
unsafe { drop(Box::from_raw(stream_ptr)) };
981-
Err(PyErr::fetch(py).into())
982-
} else {
983-
let any = unsafe { Bound::from_owned_ptr(py, capsule) };
984-
Ok(any.downcast_into::<PyCapsule>().unwrap())
961+
let stream_capsule_name = CString::new("arrow_array_stream").unwrap();
962+
unsafe {
963+
PyCapsule::new_bound_with_destructor(
964+
py,
965+
stream_ptr,
966+
Some(stream_capsule_name),
967+
|ptr: *mut FFI_ArrowArrayStream, _| {
968+
if !ptr.is_null() {
969+
unsafe { Box::from_raw(ptr) };
970+
}
971+
},
972+
)
985 973
}
974+
.map_err(PyDataFusionError::from)
986 975
}
987 976

988 977
fn execute_stream(&self, py: Python) -> PyDataFusionResult<PyRecordBatchStream> {

0 commit comments

Comments
 (0)