Skip to content

Commit ed5da2f

Browse files
committed
with_gil is deprecated
1 parent 70a688b commit ed5da2f

File tree

12 files changed

+55
-57
lines changed

12 files changed

+55
-57
lines changed

src/catalog.rs

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ impl PyCatalog {
8888
"Schema with name {name} doesn't exist."
8989
)))?;
9090

91-
Python::with_gil(|py| {
91+
Python::attach(|py| {
9292
match schema
9393
.as_any()
9494
.downcast_ref::<RustWrappedPySchemaProvider>()
@@ -207,7 +207,7 @@ pub(crate) struct RustWrappedPySchemaProvider {
207207

208208
impl RustWrappedPySchemaProvider {
209209
pub fn new(schema_provider: PyObject) -> Self {
210-
let owner_name = Python::with_gil(|py| {
210+
let owner_name = Python::attach(|py| {
211211
schema_provider
212212
.bind(py)
213213
.getattr("owner_name")
@@ -222,7 +222,7 @@ impl RustWrappedPySchemaProvider {
222222
}
223223

224224
fn table_inner(&self, name: &str) -> PyResult<Option<Arc<dyn TableProvider>>> {
225-
Python::with_gil(|py| {
225+
Python::attach(|py| {
226226
let provider = self.schema_provider.bind(py);
227227
let py_table_method = provider.getattr("table")?;
228228

@@ -249,7 +249,7 @@ impl SchemaProvider for RustWrappedPySchemaProvider {
249249
}
250250

251251
fn table_names(&self) -> Vec<String> {
252-
Python::with_gil(|py| {
252+
Python::attach(|py| {
253253
let provider = self.schema_provider.bind(py);
254254

255255
provider
@@ -275,7 +275,7 @@ impl SchemaProvider for RustWrappedPySchemaProvider {
275275
table: Arc<dyn TableProvider>,
276276
) -> datafusion::common::Result<Option<Arc<dyn TableProvider>>> {
277277
let py_table = PyTable::from(table);
278-
Python::with_gil(|py| {
278+
Python::attach(|py| {
279279
let provider = self.schema_provider.bind(py);
280280
let _ = provider
281281
.call_method1("register_table", (name, py_table))
@@ -291,7 +291,7 @@ impl SchemaProvider for RustWrappedPySchemaProvider {
291291
&self,
292292
name: &str,
293293
) -> datafusion::common::Result<Option<Arc<dyn TableProvider>>> {
294-
Python::with_gil(|py| {
294+
Python::attach(|py| {
295295
let provider = self.schema_provider.bind(py);
296296
let table = provider
297297
.call_method1("deregister_table", (name,))
@@ -312,7 +312,7 @@ impl SchemaProvider for RustWrappedPySchemaProvider {
312312
}
313313

314314
fn table_exist(&self, name: &str) -> bool {
315-
Python::with_gil(|py| {
315+
Python::attach(|py| {
316316
let provider = self.schema_provider.bind(py);
317317
provider
318318
.call_method1("table_exist", (name,))
@@ -333,7 +333,7 @@ impl RustWrappedPyCatalogProvider {
333333
}
334334

335335
fn schema_inner(&self, name: &str) -> PyResult<Option<Arc<dyn SchemaProvider>>> {
336-
Python::with_gil(|py| {
336+
Python::attach(|py| {
337337
let provider = self.catalog_provider.bind(py);
338338

339339
let py_schema = provider.call_method1("schema", (name,))?;
@@ -378,7 +378,7 @@ impl CatalogProvider for RustWrappedPyCatalogProvider {
378378
}
379379

380380
fn schema_names(&self) -> Vec<String> {
381-
Python::with_gil(|py| {
381+
Python::attach(|py| {
382382
let provider = self.catalog_provider.bind(py);
383383
provider
384384
.getattr("schema_names")
@@ -402,9 +402,7 @@ impl CatalogProvider for RustWrappedPyCatalogProvider {
402402
name: &str,
403403
schema: Arc<dyn SchemaProvider>,
404404
) -> datafusion::common::Result<Option<Arc<dyn SchemaProvider>>> {
405-
// JRIGHT HERE
406-
// let py_schema: PySchema = schema.into();
407-
Python::with_gil(|py| {
405+
Python::attach(|py| {
408406
let py_schema = match schema
409407
.as_any()
410408
.downcast_ref::<RustWrappedPySchemaProvider>()
@@ -435,7 +433,7 @@ impl CatalogProvider for RustWrappedPyCatalogProvider {
435433
name: &str,
436434
cascade: bool,
437435
) -> datafusion::common::Result<Option<Arc<dyn SchemaProvider>>> {
438-
Python::with_gil(|py| {
436+
Python::attach(|py| {
439437
let provider = self.catalog_provider.bind(py);
440438
let schema = provider
441439
.call_method1("deregister_schema", (name, cascade))

src/context.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -855,7 +855,7 @@ impl PySessionContext {
855855
"Catalog with name {name} doesn't exist."
856856
)))?;
857857

858-
Python::with_gil(|py| {
858+
Python::attach(|py| {
859859
match catalog
860860
.as_any()
861861
.downcast_ref::<RustWrappedPyCatalogProvider>()

src/dataset.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ impl TableProvider for Dataset {
7373

7474
/// Get a reference to the schema for this table
7575
fn schema(&self) -> SchemaRef {
76-
Python::with_gil(|py| {
76+
Python::attach(|py| {
7777
let dataset = self.dataset.bind(py);
7878
// This can panic but since we checked that self.dataset is a pyarrow.dataset.Dataset it should never
7979
Arc::new(
@@ -107,7 +107,7 @@ impl TableProvider for Dataset {
107107
// The datasource should return *at least* this number of rows if available.
108108
_limit: Option<usize>,
109109
) -> DFResult<Arc<dyn ExecutionPlan>> {
110-
Python::with_gil(|py| {
110+
Python::attach(|py| {
111111
let plan: Arc<dyn ExecutionPlan> = Arc::new(
112112
DatasetExec::new(py, self.dataset.bind(py), projection.cloned(), filters)
113113
.map_err(|err| DataFusionError::External(Box::new(err)))?,

src/dataset_exec.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ impl Iterator for PyArrowBatchesAdapter {
5353
type Item = ArrowResult<RecordBatch>;
5454

5555
fn next(&mut self) -> Option<Self::Item> {
56-
Python::with_gil(|py| {
56+
Python::attach(|py| {
5757
let mut batches = self.batches.clone_ref(py).into_bound(py);
5858
Some(
5959
batches
@@ -187,7 +187,7 @@ impl ExecutionPlan for DatasetExec {
187187
context: Arc<TaskContext>,
188188
) -> DFResult<SendableRecordBatchStream> {
189189
let batch_size = context.session_config().batch_size();
190-
Python::with_gil(|py| {
190+
Python::attach(|py| {
191191
let dataset = self.dataset.bind(py);
192192
let fragments = self.fragments.bind(py);
193193
let fragment = fragments
@@ -272,7 +272,7 @@ impl ExecutionPlanProperties for DatasetExec {
272272

273273
impl DisplayAs for DatasetExec {
274274
fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
275-
Python::with_gil(|py| {
275+
Python::attach(|py| {
276276
let number_of_fragments = self.fragments.bind(py).len();
277277
match t {
278278
DisplayFormatType::Default

src/expr.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -142,8 +142,7 @@ pub fn py_expr_list(expr: &[Expr]) -> PyResult<Vec<PyExpr>> {
142142
impl PyExpr {
143143
/// Return the specific expression
144144
fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
145-
Python::with_gil(|_| {
146-
match &self.expr {
145+
match &self.expr {
147146
Expr::Alias(alias) => Ok(PyAlias::from(alias.clone()).into_bound_py_any(py)?),
148147
Expr::Column(col) => Ok(PyColumn::from(col.clone()).into_bound_py_any(py)?),
149148
Expr::ScalarVariable(data_type, variables) => {
@@ -199,7 +198,6 @@ impl PyExpr {
199198
))),
200199
Expr::Unnest(value) => Ok(unnest_expr::PyUnnestExpr::from(value.clone()).into_bound_py_any(py)?),
201200
}
202-
})
203201
}
204202

205203
/// Returns the name of this expression as it should appear in a schema. This name

src/pyarrow_filter_expression.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ impl TryFrom<&Expr> for PyArrowFilterExpression {
101101
// isin, is_null, and is_valid (~is_null) are methods of pyarrow.dataset.Expression
102102
// https://arrow.apache.org/docs/python/generated/pyarrow.dataset.Expression.html#pyarrow-dataset-expression
103103
fn try_from(expr: &Expr) -> Result<Self, Self::Error> {
104-
Python::with_gil(|py| {
104+
Python::attach(|py| {
105105
let pc = Python::import(py, "pyarrow.compute")?;
106106
let op_module = Python::import(py, "operator")?;
107107
let pc_expr: PyDataFusionResult<Bound<'_, PyAny>> = match expr {

src/pyarrow_util.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,10 @@ impl<'source> FromPyObject<'source> for PyScalarValue {
5151
}
5252
}
5353

54-
pub fn scalar_to_pyarrow<'py>(scalar: &ScalarValue, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
54+
pub fn scalar_to_pyarrow<'py>(
55+
scalar: &ScalarValue,
56+
py: Python<'py>,
57+
) -> PyResult<Bound<'py, PyAny>> {
5558
let array = scalar.to_array().map_err(PyDataFusionError::from)?;
5659
// convert to pyarrow array using C data interface
5760
let pyarray = array.to_data().to_pyarrow(py)?;

src/udaf.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -36,18 +36,18 @@ use pyo3::types::PyCapsule;
3636

3737
#[derive(Debug)]
3838
struct RustAccumulator {
39-
accum: PyObject,
39+
accum: Py<PyAny>,
4040
}
4141

4242
impl RustAccumulator {
43-
fn new(accum: PyObject) -> Self {
43+
fn new(accum: Py<PyAny>) -> Self {
4444
Self { accum }
4545
}
4646
}
4747

4848
impl Accumulator for RustAccumulator {
4949
fn state(&mut self) -> Result<Vec<ScalarValue>> {
50-
Python::with_gil(|py| {
50+
Python::attach(|py| {
5151
self.accum
5252
.bind(py)
5353
.call_method0("state")?
@@ -58,7 +58,7 @@ impl Accumulator for RustAccumulator {
5858
}
5959

6060
fn evaluate(&mut self) -> Result<ScalarValue> {
61-
Python::with_gil(|py| {
61+
Python::attach(|py| {
6262
self.accum
6363
.bind(py)
6464
.call_method0("evaluate")?
@@ -69,7 +69,7 @@ impl Accumulator for RustAccumulator {
6969
}
7070

7171
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
72-
Python::with_gil(|py| {
72+
Python::attach(|py| {
7373
// 1. cast args to Pyarrow array
7474
let py_args = values
7575
.iter()
@@ -88,7 +88,7 @@ impl Accumulator for RustAccumulator {
8888
}
8989

9090
fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
91-
Python::with_gil(|py| {
91+
Python::attach(|py| {
9292
// // 1. cast states to Pyarrow arrays
9393
let py_states: Result<Vec<Bound<PyAny>>> = states
9494
.iter()
@@ -115,7 +115,7 @@ impl Accumulator for RustAccumulator {
115115
}
116116

117117
fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
118-
Python::with_gil(|py| {
118+
Python::attach(|py| {
119119
// 1. cast args to Pyarrow array
120120
let py_args = values
121121
.iter()
@@ -134,7 +134,7 @@ impl Accumulator for RustAccumulator {
134134
}
135135

136136
fn supports_retract_batch(&self) -> bool {
137-
Python::with_gil(
137+
Python::attach(
138138
|py| match self.accum.bind(py).call_method0("supports_retract_batch") {
139139
Ok(x) => x.extract().unwrap_or(false),
140140
Err(_) => false,
@@ -143,9 +143,9 @@ impl Accumulator for RustAccumulator {
143143
}
144144
}
145145

146-
pub fn to_rust_accumulator(accum: PyObject) -> AccumulatorFactoryFunction {
146+
pub fn to_rust_accumulator(accum: Py<PyAny>) -> AccumulatorFactoryFunction {
147147
Arc::new(move |_| -> Result<Box<dyn Accumulator>> {
148-
let accum = Python::with_gil(|py| {
148+
let accum = Python::attach(|py| {
149149
accum
150150
.call0(py)
151151
.map_err(|e| DataFusionError::Execution(format!("{e}")))
@@ -167,7 +167,7 @@ impl PyAggregateUDF {
167167
#[pyo3(signature=(name, accumulator, input_type, return_type, state_type, volatility))]
168168
fn new(
169169
name: &str,
170-
accumulator: PyObject,
170+
accumulator: Py<PyAny>,
171171
input_type: PyArrowType<Vec<DataType>>,
172172
return_type: PyArrowType<DataType>,
173173
state_type: PyArrowType<Vec<DataType>>,

src/udf.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,10 @@ use crate::utils::{parse_volatility, validate_pycapsule};
3737

3838
/// Create a Rust callable function from a python function that expects pyarrow arrays
3939
fn pyarrow_function_to_rust(
40-
func: PyObject,
40+
func: Bound<PyAny>,
4141
) -> impl Fn(&[ArrayRef]) -> Result<ArrayRef, DataFusionError> {
4242
move |args: &[ArrayRef]| -> Result<ArrayRef, DataFusionError> {
43-
Python::with_gil(|py| {
43+
Python::attach(|py| {
4444
// 1. cast args to Pyarrow arrays
4545
let py_args = args
4646
.iter()
@@ -54,11 +54,11 @@ fn pyarrow_function_to_rust(
5454

5555
// 2. call function
5656
let value = func
57-
.call(py, py_args, None)
57+
.call(py_args, None)
5858
.map_err(|e| DataFusionError::Execution(format!("{e:?}")))?;
5959

6060
// 3. cast to arrow::array::Array
61-
let array_data = ArrayData::from_pyarrow_bound(value.bind(py))
61+
let array_data = ArrayData::from_pyarrow_bound(&value)
6262
.map_err(|e| DataFusionError::Execution(format!("{e:?}")))?;
6363
Ok(make_array(array_data))
6464
})
@@ -68,7 +68,7 @@ fn pyarrow_function_to_rust(
6868
/// Create a DataFusion's UDF implementation from a python function
6969
/// that expects pyarrow arrays. This is more efficient as it performs
7070
/// a zero-copy of the contents.
71-
fn to_scalar_function_impl(func: PyObject) -> ScalarFunctionImplementation {
71+
fn to_scalar_function_impl(func: Bound<PyAny>) -> ScalarFunctionImplementation {
7272
// Make the python function callable from rust
7373
let pyarrow_func = pyarrow_function_to_rust(func);
7474

@@ -93,7 +93,7 @@ impl PyScalarUDF {
9393
#[pyo3(signature=(name, func, input_types, return_type, volatility))]
9494
fn new(
9595
name: &str,
96-
func: PyObject,
96+
func: Bound<PyAny>,
9797
input_types: PyArrowType<Vec<DataType>>,
9898
return_type: PyArrowType<DataType>,
9999
volatility: &str,

src/udtf.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ pub struct PyTableFunction {
3939
// TODO: Implement pure python based user defined table functions
4040
#[derive(Debug, Clone)]
4141
pub(crate) enum PyTableFunctionInner {
42-
PythonFunction(Arc<PyObject>),
42+
PythonFunction(Arc<Py<PyAny>>),
4343
FFIFunction(Arc<dyn TableFunctionImpl>),
4444
}
4545

@@ -83,16 +83,15 @@ impl PyTableFunction {
8383

8484
#[allow(clippy::result_large_err)]
8585
fn call_python_table_function(
86-
func: &Arc<PyObject>,
86+
func: &Arc<Py<PyAny>>,
8787
args: &[Expr],
8888
) -> DataFusionResult<Arc<dyn TableProvider>> {
8989
let args = args
9090
.iter()
9191
.map(|arg| PyExpr::from(arg.clone()))
9292
.collect::<Vec<_>>();
9393

94-
// move |args: &[ArrayRef]| -> Result<ArrayRef, DataFusionError> {
95-
Python::with_gil(|py| {
94+
Python::attach(|py| {
9695
let py_args = PyTuple::new(py, args)?;
9796
let provider_obj = func.call1(py, py_args)?;
9897
let provider = provider_obj.bind(py);

0 commit comments

Comments
 (0)