Skip to content

Commit a7280b8

Browse files
authored
FFI: plumb placement for FFI_ScalarUDF (#22608)
## Which issue does this PR close? - Part of #22330. This is the first of the per-method PRs that issue describes. It plumbs `placement` only; the remaining defaulted methods follow separately, so the umbrella issue stays open. ## Rationale for this change `FFI_ScalarUDF` (`datafusion/ffi/src/udf/mod.rs`) carried no function pointer for `placement`, and `ForeignScalarUDF` did not override it, so a producer's override of `ScalarUDFImpl::placement` (default body at `datafusion/expr/src/udf.rs:1028`) was dropped on the consumer side and every foreign UDF fell back to `KeepInPlace`. A UDF loaded over FFI never delivered its leaf-pushdown hint to the optimizer. ## What changes are included in this PR? - New `FFI_ExpressionPlacement` enum bridge in `datafusion/ffi/src/placement.rs`, in the shape of `FFI_Volatility`: `#[repr(u8)]` with `From` impls both ways and a round-trip test over every variant. - A `placement` function pointer on `FFI_ScalarUDF`, populated in the `From<Arc<ScalarUDF>>` constructor, with `placement_fn_wrapper` on the producer side and a forwarding `ForeignScalarUDF::placement` on the consumer side. `placement` is infallible, so the pointer returns the enum directly rather than `FFI_Result`. Adding a field to the `#[repr(C)]` struct changes its layout, so this is an API change and should carry the `api change` label (I can't add it myself). It targets `main` and should not be back-ported to a release branch. `display_name` is also on the issue's list, but it has been deprecated since 50.0.0, so it should be dropped from the gap list rather than plumbed. I have left it and the remaining methods to follow-up PRs. ## Are these changes tested? Yes. - Unit: a round-trip test over all four `ExpressionPlacement` variants, plus a forced-foreign test (`mock_foreign_marker_id`) using a UDF whose `placement` override depends on its arguments. The assertions cover ordered, reordered, and empty argument slices, so argument marshalling is checked, not just the return value. - Integration: `tests/ffi_udf.rs` loads the UDF from the real cdylib and asserts the override survives the boundary, which is the surface a layout change needs. Run with `cargo test -p datafusion-ffi` and `cargo test -p datafusion-ffi --features integration-tests`. ## Are there any user-facing changes? A `placement` override on a `ScalarUDFImpl` is now preserved across the FFI boundary instead of being silently replaced by the default. This is an ABI change to `FFI_ScalarUDF`; consumers must be recompiled against the new layout. --------- Signed-off-by: Amogh Ramesh <ramogh2404@gmail.com>
1 parent 58e37a0 commit a7280b8

6 files changed

Lines changed: 263 additions & 5 deletions

File tree

datafusion/ffi/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ pub mod ffi_option;
3636
pub mod insert_op;
3737
pub mod physical_expr;
3838
pub mod physical_optimizer;
39+
pub mod placement;
3940
pub mod plan_properties;
4041
pub mod proto;
4142
pub mod record_batch_stream;

datafusion/ffi/src/placement.rs

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use datafusion_expr::ExpressionPlacement;
19+
20+
#[expect(non_camel_case_types)]
21+
#[repr(u8)]
22+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
23+
pub enum FFI_ExpressionPlacement {
24+
Literal,
25+
Column,
26+
MoveTowardsLeafNodes,
27+
KeepInPlace,
28+
}
29+
30+
impl From<ExpressionPlacement> for FFI_ExpressionPlacement {
31+
fn from(value: ExpressionPlacement) -> Self {
32+
match value {
33+
ExpressionPlacement::Literal => Self::Literal,
34+
ExpressionPlacement::Column => Self::Column,
35+
ExpressionPlacement::MoveTowardsLeafNodes => Self::MoveTowardsLeafNodes,
36+
ExpressionPlacement::KeepInPlace => Self::KeepInPlace,
37+
}
38+
}
39+
}
40+
41+
impl From<FFI_ExpressionPlacement> for ExpressionPlacement {
42+
fn from(value: FFI_ExpressionPlacement) -> Self {
43+
match value {
44+
FFI_ExpressionPlacement::Literal => Self::Literal,
45+
FFI_ExpressionPlacement::Column => Self::Column,
46+
FFI_ExpressionPlacement::MoveTowardsLeafNodes => Self::MoveTowardsLeafNodes,
47+
FFI_ExpressionPlacement::KeepInPlace => Self::KeepInPlace,
48+
}
49+
}
50+
}
51+
52+
#[cfg(test)]
53+
mod tests {
54+
use datafusion::logical_expr::ExpressionPlacement;
55+
56+
use super::FFI_ExpressionPlacement;
57+
58+
fn test_round_trip_placement(placement: ExpressionPlacement) {
59+
let ffi_placement: FFI_ExpressionPlacement = placement.into();
60+
let round_trip: ExpressionPlacement = ffi_placement.into();
61+
62+
assert_eq!(placement, round_trip);
63+
}
64+
65+
#[test]
66+
fn test_all_round_trip_placement() {
67+
test_round_trip_placement(ExpressionPlacement::Literal);
68+
test_round_trip_placement(ExpressionPlacement::Column);
69+
test_round_trip_placement(ExpressionPlacement::MoveTowardsLeafNodes);
70+
test_round_trip_placement(ExpressionPlacement::KeepInPlace);
71+
}
72+
}

datafusion/ffi/src/tests/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,8 @@ pub struct ForeignLibraryModule {
9090

9191
pub create_timezone_udf: extern "C" fn() -> FFI_ScalarUDF,
9292

93+
pub create_placement_udf: extern "C" fn() -> FFI_ScalarUDF,
94+
9395
pub create_table_function:
9496
extern "C" fn(FFI_LogicalExtensionCodec) -> FFI_TableFunction,
9597

@@ -251,6 +253,7 @@ pub extern "C" fn datafusion_ffi_get_module() -> ForeignLibraryModule {
251253
create_scalar_udf: create_ffi_abs_func,
252254
create_nullary_udf: create_ffi_random_func,
253255
create_timezone_udf: udf_udaf_udwf::create_timezone_func,
256+
create_placement_udf: udf_udaf_udwf::create_placement_func,
254257
create_table_function: create_ffi_table_func,
255258
create_sum_udaf: create_ffi_sum_func,
256259
create_stddev_udaf: create_ffi_stddev_func,

datafusion/ffi/src/tests/udf_udaf_udwf.rs

Lines changed: 52 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ use arrow_schema::DataType;
2121
use datafusion_catalog::TableFunctionImpl;
2222
use datafusion_common::ScalarValue;
2323
use datafusion_expr::{
24-
AggregateUDF, ColumnarValue, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature,
25-
Volatility, WindowUDF,
24+
AggregateUDF, ColumnarValue, ExpressionPlacement, ScalarFunctionArgs, ScalarUDF,
25+
ScalarUDFImpl, Signature, Volatility, WindowUDF,
2626
};
2727
use datafusion_functions::math::abs::AbsFunc;
2828
use datafusion_functions::math::random::RandomFunc;
@@ -112,6 +112,56 @@ pub(crate) extern "C" fn create_timezone_func() -> FFI_ScalarUDF {
112112
udf.into()
113113
}
114114

115+
#[derive(Debug, PartialEq, Eq, Hash)]
116+
struct PlacementUDF {
117+
signature: Signature,
118+
}
119+
120+
impl ScalarUDFImpl for PlacementUDF {
121+
fn name(&self) -> &str {
122+
"placement_udf"
123+
}
124+
125+
fn signature(&self) -> &Signature {
126+
&self.signature
127+
}
128+
129+
fn return_type(
130+
&self,
131+
_arg_types: &[DataType],
132+
) -> datafusion_common::Result<DataType> {
133+
Ok(DataType::Int64)
134+
}
135+
136+
fn invoke_with_args(
137+
&self,
138+
_args: ScalarFunctionArgs,
139+
) -> datafusion_common::Result<ColumnarValue> {
140+
datafusion_common::internal_err!("placement_udf is not meant to be invoked")
141+
}
142+
143+
fn placement(&self, args: &[ExpressionPlacement]) -> ExpressionPlacement {
144+
// Push to the leaves only for a (Column, Literal) pairing, so the
145+
// test catches dropped, reordered, or truncated arguments.
146+
if matches!(
147+
args,
148+
[ExpressionPlacement::Column, ExpressionPlacement::Literal]
149+
) {
150+
ExpressionPlacement::MoveTowardsLeafNodes
151+
} else {
152+
ExpressionPlacement::KeepInPlace
153+
}
154+
}
155+
}
156+
157+
pub(crate) extern "C" fn create_placement_func() -> FFI_ScalarUDF {
158+
let udf: Arc<ScalarUDF> = Arc::new(ScalarUDF::from(PlacementUDF {
159+
signature: Signature::uniform(1, vec![DataType::Int64], Volatility::Immutable),
160+
}));
161+
162+
udf.into()
163+
}
164+
115165
pub(crate) extern "C" fn create_ffi_table_func(
116166
codec: FFI_LogicalExtensionCodec,
117167
) -> FFI_TableFunction {

datafusion/ffi/src/udf/mod.rs

Lines changed: 109 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@ use datafusion_common::config::ConfigOptions;
2828
use datafusion_common::{DataFusionError, Result, internal_err};
2929
use datafusion_expr::type_coercion::functions::fields_with_udf;
3030
use datafusion_expr::{
31-
ColumnarValue, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl,
32-
Signature,
31+
ColumnarValue, ExpressionPlacement, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDF,
32+
ScalarUDFImpl, Signature,
3333
};
3434
use return_type_args::{
3535
FFI_ReturnFieldArgs, ForeignReturnFieldArgs, ForeignReturnFieldArgsOwned,
@@ -41,6 +41,7 @@ use stabby::vec::Vec as SVec;
4141
use crate::arrow_wrappers::{WrappedArray, WrappedSchema};
4242
use crate::config::FFI_ConfigOptions;
4343
use crate::expr::columnar_value::FFI_ColumnarValue;
44+
use crate::placement::FFI_ExpressionPlacement;
4445
use crate::util::{
4546
FFI_Result, rvec_wrapped_to_vec_datatype, vec_datatype_to_rvec_wrapped,
4647
};
@@ -91,6 +92,14 @@ pub struct FFI_ScalarUDF {
9192
arg_types: SVec<WrappedSchema>,
9293
) -> FFI_Result<SVec<WrappedSchema>>,
9394

95+
/// FFI equivalent to the `placement` of a [`ScalarUDFImpl`]. Returns the
96+
/// placement hint for the underlying [`ScalarUDF`] given each argument's
97+
/// placement. Infallible, so it returns the value directly, not an `FFI_Result`.
98+
pub placement: unsafe extern "C" fn(
99+
udf: &Self,
100+
args: SVec<FFI_ExpressionPlacement>,
101+
) -> FFI_ExpressionPlacement,
102+
94103
/// Used to create a clone on the provider of the udf. This should
95104
/// only need to be called by the receiver of the udf.
96105
pub clone: unsafe extern "C" fn(udf: &Self) -> Self,
@@ -157,6 +166,18 @@ unsafe extern "C" fn coerce_types_fn_wrapper(
157166
sresult!(vec_datatype_to_rvec_wrapped(&return_types))
158167
}
159168

169+
unsafe extern "C" fn placement_fn_wrapper(
170+
udf: &FFI_ScalarUDF,
171+
args: SVec<FFI_ExpressionPlacement>,
172+
) -> FFI_ExpressionPlacement {
173+
let args = args
174+
.into_iter()
175+
.map(ExpressionPlacement::from)
176+
.collect::<Vec<_>>();
177+
178+
udf.inner().placement(&args).into()
179+
}
180+
160181
unsafe extern "C" fn invoke_with_args_fn_wrapper(
161182
udf: &FFI_ScalarUDF,
162183
args: SVec<WrappedArray>,
@@ -250,6 +271,7 @@ impl From<Arc<ScalarUDF>> for FFI_ScalarUDF {
250271
invoke_with_args: invoke_with_args_fn_wrapper,
251272
return_field_from_args: return_field_from_args_fn_wrapper,
252273
coerce_types: coerce_types_fn_wrapper,
274+
placement: placement_fn_wrapper,
253275
clone: clone_fn_wrapper,
254276
release: release_fn_wrapper,
255277
private_data: Box::into_raw(private_data) as *mut c_void,
@@ -427,12 +449,59 @@ impl ScalarUDFImpl for ForeignScalarUDF {
427449
Ok(rvec_wrapped_to_vec_datatype(&result_types)?)
428450
}
429451
}
452+
453+
fn placement(&self, args: &[ExpressionPlacement]) -> ExpressionPlacement {
454+
let args = args
455+
.iter()
456+
.map(|p| FFI_ExpressionPlacement::from(*p))
457+
.collect::<SVec<_>>();
458+
459+
let result = unsafe { (self.udf.placement)(&self.udf, args) };
460+
461+
result.into()
462+
}
430463
}
431464

432465
#[cfg(test)]
433466
mod tests {
434467
use super::*;
435468

469+
#[derive(Debug, PartialEq, Eq, Hash)]
470+
struct PlacementUDF {
471+
signature: Signature,
472+
}
473+
474+
impl ScalarUDFImpl for PlacementUDF {
475+
fn name(&self) -> &str {
476+
"placement_udf"
477+
}
478+
479+
fn signature(&self) -> &Signature {
480+
&self.signature
481+
}
482+
483+
fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
484+
Ok(DataType::Int64)
485+
}
486+
487+
fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result<ColumnarValue> {
488+
internal_err!("placement_udf is not meant to be invoked")
489+
}
490+
491+
fn placement(&self, args: &[ExpressionPlacement]) -> ExpressionPlacement {
492+
// Push to the leaves only for a (Column, Literal) pairing, so the
493+
// test catches dropped, reordered, or truncated arguments.
494+
if matches!(
495+
args,
496+
[ExpressionPlacement::Column, ExpressionPlacement::Literal]
497+
) {
498+
ExpressionPlacement::MoveTowardsLeafNodes
499+
} else {
500+
ExpressionPlacement::KeepInPlace
501+
}
502+
}
503+
}
504+
436505
#[test]
437506
fn test_round_trip_scalar_udf() -> Result<()> {
438507
let original_udf = datafusion::functions::math::abs::AbsFunc::new();
@@ -467,4 +536,42 @@ mod tests {
467536

468537
Ok(())
469538
}
539+
540+
#[test]
541+
fn test_ffi_udf_placement_round_trip() -> Result<()> {
542+
use datafusion_expr::Volatility;
543+
544+
let original_udf = Arc::new(ScalarUDF::from(PlacementUDF {
545+
signature: Signature::uniform(
546+
1,
547+
vec![DataType::Int64],
548+
Volatility::Immutable,
549+
),
550+
}));
551+
552+
let mut ffi_udf = FFI_ScalarUDF::from(original_udf);
553+
554+
// Force the foreign path so the call travels through the FFI vtable
555+
// rather than downcasting back to the original local type.
556+
ffi_udf.library_marker_id = crate::mock_foreign_marker_id;
557+
let foreign_udf: Arc<dyn ScalarUDFImpl> = (&ffi_udf).into();
558+
assert!(foreign_udf.is::<ForeignScalarUDF>());
559+
560+
// Without the plumbing the override is dropped and every call is
561+
// KeepInPlace. The three cases also check the arguments survive the
562+
// round trip in order.
563+
assert_eq!(
564+
foreign_udf
565+
.placement(&[ExpressionPlacement::Column, ExpressionPlacement::Literal]),
566+
ExpressionPlacement::MoveTowardsLeafNodes
567+
);
568+
assert_eq!(
569+
foreign_udf
570+
.placement(&[ExpressionPlacement::Literal, ExpressionPlacement::Column]),
571+
ExpressionPlacement::KeepInPlace
572+
);
573+
assert_eq!(foreign_udf.placement(&[]), ExpressionPlacement::KeepInPlace);
574+
575+
Ok(())
576+
}
470577
}

datafusion/ffi/tests/ffi_udf.rs

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ mod tests {
2323
use arrow::datatypes::DataType;
2424
use datafusion::common::record_batch;
2525
use datafusion::error::Result;
26-
use datafusion::logical_expr::{ScalarUDF, ScalarUDFImpl};
26+
use datafusion::logical_expr::{ExpressionPlacement, ScalarUDF, ScalarUDFImpl};
2727
use datafusion::prelude::{SessionContext, col};
2828
use datafusion_execution::config::SessionConfig;
2929
use datafusion_expr::lit;
@@ -91,6 +91,31 @@ mod tests {
9191
Ok(())
9292
}
9393

94+
/// This test validates that a producer's `placement` override survives the
95+
/// FFI boundary instead of collapsing to the default `KeepInPlace`.
96+
#[tokio::test]
97+
async fn test_scalar_udf_placement() -> Result<()> {
98+
let module = get_module()?;
99+
100+
let ffi_placement_func = (module.create_placement_udf)();
101+
let foreign_func: Arc<dyn ScalarUDFImpl> = (&ffi_placement_func).into();
102+
103+
// The override pushes to the leaves only for (Column, Literal), so these
104+
// also check the arguments cross the boundary in order.
105+
assert_eq!(
106+
foreign_func
107+
.placement(&[ExpressionPlacement::Column, ExpressionPlacement::Literal]),
108+
ExpressionPlacement::MoveTowardsLeafNodes
109+
);
110+
assert_eq!(
111+
foreign_func
112+
.placement(&[ExpressionPlacement::Literal, ExpressionPlacement::Column]),
113+
ExpressionPlacement::KeepInPlace
114+
);
115+
116+
Ok(())
117+
}
118+
94119
#[tokio::test]
95120
async fn test_config_on_scalar_udf() -> Result<()> {
96121
let module = get_module()?;

0 commit comments

Comments
 (0)