Skip to content

Commit e1d8d46

Browse files
nathanb9 and Nathan Bezualem authored
Add optimize_with_context to FFI_PhysicalOptimizerRule (apache#22584)
## Which issue does this PR close?

Closes apache#22334

## Rationale for this change

`FFI_PhysicalOptimizerRule` only plumbed `optimize`, `name`, and `schema_check` — not `optimize_with_context`. Foreign rules that override the context-aware variant had their override silently discarded.

## What changes are included in this PR?

- Added `FFI_PhysicalOptimizerContext` struct to pass the optimizer context's configuration options across FFI (the statistics registry is not plumbed, because it contains trait object vtables that are only valid within the originating library)
- Added `optimize_with_context` function pointer to `FFI_PhysicalOptimizerRule`
- `ForeignPhysicalOptimizerRule` now overrides `optimize_with_context` to route through FFI
- Unit tests for context-aware round-trip (with and without a statistics registry supplied by the caller)

## Are these changes tested?

Yes — two new tests (`test_optimize_with_context_round_trip`, `test_optimize_with_context_with_registry`) plus all existing tests continue to pass.

## Are there any user-facing changes?

API change: `FFI_PhysicalOptimizerRule` gains a new field (`optimize_with_context`). This is a layout change for any external consumer of this struct.

---------

Co-authored-by: Nathan Bezualem <nbez@amazon.com>
1 parent d1ec74e commit e1d8d46

4 files changed

Lines changed: 299 additions & 4 deletions

File tree

datafusion/ffi/src/physical_optimizer.rs

Lines changed: 225 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use std::sync::Arc;
2121
use async_trait::async_trait;
2222
use datafusion_common::config::ConfigOptions;
2323
use datafusion_common::error::Result;
24-
use datafusion_physical_optimizer::PhysicalOptimizerRule;
24+
use datafusion_physical_optimizer::{PhysicalOptimizerContext, PhysicalOptimizerRule};
2525
use datafusion_physical_plan::ExecutionPlan;
2626
use stabby::string::String as SString;
2727
use tokio::runtime::Handle;
@@ -31,6 +31,84 @@ use crate::execution_plan::FFI_ExecutionPlan;
3131
use crate::util::FFI_Result;
3232
use crate::{df_result, sresult_return};
3333

34+
/// A stable struct for sharing [`PhysicalOptimizerContext`] across FFI boundaries.
35+
///
36+
/// This provides access to configuration options for optimizer rules that need
37+
/// extended context beyond the plan itself. Configuration options are the only
/// piece of context this struct exposes; it has no accessor for a statistics
/// registry.
38+
#[repr(C)]
39+
#[derive(Debug)]
40+
pub struct FFI_PhysicalOptimizerContext {
41+
/// Return the configuration options held by this context as an
/// [`FFI_ConfigOptions`].
pub config_options:
42+
unsafe extern "C" fn(&FFI_PhysicalOptimizerContext) -> FFI_ConfigOptions,
43+
44+
/// Release the memory of the private data.
45+
pub release: unsafe extern "C" fn(&mut FFI_PhysicalOptimizerContext),
46+
47+
/// Internal data. Only accessed by the provider.
48+
pub private_data: *const c_void,
49+
}
50+
51+
// NOTE(review): these impls assert that the raw `private_data` pointer may be
// moved to and shared between threads. That presumably relies on the boxed
// `OptimizerContextPrivateData` being `Send + Sync` — confirm.
unsafe impl Send for FFI_PhysicalOptimizerContext {}
52+
unsafe impl Sync for FFI_PhysicalOptimizerContext {}
53+
54+
/// Data stored behind [`FFI_PhysicalOptimizerContext::private_data`].
///
/// Holds the owned copy of the configuration options that
/// [`FFI_PhysicalOptimizerContext::new`] clones from the source context.
struct OptimizerContextPrivateData {
55+
config: ConfigOptions,
56+
}
57+
58+
impl FFI_PhysicalOptimizerContext {
59+
/// Build an FFI context from `context`.
///
/// The configuration options of `context` are cloned into a heap allocation
/// that the returned struct points to through `private_data`. The installed
/// `release` callback (`context_release_fn`) frees that allocation.
pub fn new(context: &dyn PhysicalOptimizerContext) -> Self {
60+
let private_data = Box::new(OptimizerContextPrivateData {
61+
config: context.config_options().clone(),
62+
});
63+
// Ownership of the box moves into the raw pointer; `context_release_fn`
// turns it back into a `Box` to free it.
let private_data = Box::into_raw(private_data) as *const c_void;
64+
65+
Self {
66+
config_options: context_config_options_fn,
67+
release: context_release_fn,
68+
private_data,
69+
}
70+
}
71+
72+
/// Borrow the private data behind `private_data`.
///
/// NOTE(review): there is no null check here, so this assumes the context
/// has not been released yet — confirm no caller can reach it afterwards.
fn inner(&self) -> &OptimizerContextPrivateData {
73+
// SAFETY: `new` stores a pointer obtained from `Box::into_raw` of an
// `OptimizerContextPrivateData`. Presumably only contexts created by
// `new` reach this method — confirm.
unsafe { &*(self.private_data as *const OptimizerContextPrivateData) }
74+
}
75+
}
76+
77+
// Dropping the context calls the `release` function pointer stored in the
// struct, passing the struct itself.
impl Drop for FFI_PhysicalOptimizerContext {
78+
fn drop(&mut self) {
79+
// SAFETY: NOTE(review) — assumes `release` is the callback that matches how
// `private_data` was allocated; `new` installs `context_release_fn`.
unsafe { (self.release)(self) }
80+
}
81+
}
82+
83+
/// `config_options` callback installed by [`FFI_PhysicalOptimizerContext::new`].
///
/// Converts the [`ConfigOptions`] stored in the context's private data into an
/// [`FFI_ConfigOptions`] and returns it by value.
unsafe extern "C" fn context_config_options_fn(
84+
ctx: &FFI_PhysicalOptimizerContext,
85+
) -> FFI_ConfigOptions {
86+
FFI_ConfigOptions::from(&ctx.inner().config)
87+
}
88+
89+
/// `release` callback installed by [`FFI_PhysicalOptimizerContext::new`].
///
/// Frees the boxed private data if the pointer is non-null, then sets the
/// pointer to null so that a second call does nothing.
unsafe extern "C" fn context_release_fn(ctx: &mut FFI_PhysicalOptimizerContext) {
90+
if !ctx.private_data.is_null() {
91+
unsafe {
92+
// Rebuild the `Box` from the raw pointer; binding it to `_` drops it
// immediately, which frees the allocation.
let _ = Box::from_raw(ctx.private_data as *mut OptimizerContextPrivateData);
93+
}
94+
ctx.private_data = std::ptr::null();
95+
}
96+
}
97+
98+
/// [`PhysicalOptimizerContext`] rebuilt from the configuration options received
/// through an [`FFI_PhysicalOptimizerContext`]; it is constructed inside
/// `optimize_with_context_fn_wrapper` before the wrapped rule is called.
99+
///
100+
/// `StatisticsRegistry` is not plumbed because it contains trait object vtables
101+
/// that are only valid within the originating library.
102+
struct ForeignOptimizerContext {
103+
/// Configuration options converted back from their FFI representation.
config: ConfigOptions,
104+
}
105+
106+
// Only `config_options` is implemented here; every other method of the trait
// keeps its default implementation.
impl PhysicalOptimizerContext for ForeignOptimizerContext {
107+
fn config_options(&self) -> &ConfigOptions {
108+
&self.config
109+
}
110+
}
111+
34112
/// A stable struct for sharing [`PhysicalOptimizerRule`] across FFI boundaries.
35113
#[repr(C)]
36114
#[derive(Debug)]
@@ -55,6 +133,12 @@ pub struct FFI_PhysicalOptimizerRule {
55133
/// Return the major DataFusion version number of this rule.
56134
pub version: unsafe extern "C" fn() -> u64,
57135

136+
pub optimize_with_context: unsafe extern "C" fn(
137+
&Self,
138+
plan: &FFI_ExecutionPlan,
139+
context: &FFI_PhysicalOptimizerContext,
140+
) -> FFI_Result<FFI_ExecutionPlan>,
141+
58142
/// Internal data. This is only to be accessed by the provider of the rule.
59143
/// A [`ForeignPhysicalOptimizerRule`] should never attempt to access this data.
60144
pub private_data: *mut c_void,
@@ -98,6 +182,23 @@ unsafe extern "C" fn optimize_fn_wrapper(
98182
FFI_Result::Ok(FFI_ExecutionPlan::new(optimized_plan, runtime))
99183
}
100184

185+
/// `optimize_with_context` callback installed on [`FFI_PhysicalOptimizerRule`].
///
/// Converts the FFI plan and the context's configuration options back into
/// native types, calls the wrapped rule's `optimize_with_context` with a
/// [`ForeignOptimizerContext`], and wraps the optimized plan in a new
/// [`FFI_ExecutionPlan`].
///
/// NOTE(review): each `sresult_return!` presumably returns early with the error
/// converted into the `FFI_Result` error variant — confirm against the macro.
unsafe extern "C" fn optimize_with_context_fn_wrapper(
186+
rule: &FFI_PhysicalOptimizerRule,
187+
plan: &FFI_ExecutionPlan,
188+
context: &FFI_PhysicalOptimizerContext,
189+
) -> FFI_Result<FFI_ExecutionPlan> {
190+
let runtime = rule.runtime();
191+
let inner = rule.inner();
192+
let plan: Arc<dyn ExecutionPlan> = sresult_return!(plan.try_into());
193+
// Fetch the options through the context's function pointer rather than
// touching its private data directly.
let config = sresult_return!(ConfigOptions::try_from(unsafe {
194+
(context.config_options)(context)
195+
}));
196+
// Only the configuration options are carried over into the rebuilt context.
let foreign_ctx = ForeignOptimizerContext { config };
197+
let optimized_plan = sresult_return!(inner.optimize_with_context(plan, &foreign_ctx));
198+
199+
FFI_Result::Ok(FFI_ExecutionPlan::new(optimized_plan, runtime))
200+
}
201+
101202
unsafe extern "C" fn name_fn_wrapper(rule: &FFI_PhysicalOptimizerRule) -> SString {
102203
let rule = rule.inner();
103204
rule.name().into()
@@ -127,6 +228,7 @@ unsafe extern "C" fn clone_fn_wrapper(
127228

128229
FFI_PhysicalOptimizerRule {
129230
optimize: optimize_fn_wrapper,
231+
optimize_with_context: optimize_with_context_fn_wrapper,
130232
name: name_fn_wrapper,
131233
schema_check: schema_check_fn_wrapper,
132234
clone: clone_fn_wrapper,
@@ -160,6 +262,7 @@ impl FFI_PhysicalOptimizerRule {
160262

161263
Self {
162264
optimize: optimize_fn_wrapper,
265+
optimize_with_context: optimize_with_context_fn_wrapper,
163266
name: name_fn_wrapper,
164267
schema_check: schema_check_fn_wrapper,
165268
clone: clone_fn_wrapper,
@@ -220,6 +323,24 @@ impl PhysicalOptimizerRule for ForeignPhysicalOptimizerRule {
220323
(&optimized_plan).try_into()
221324
}
222325

326+
/// Run the wrapped rule's context-aware optimization through its
/// `optimize_with_context` function pointer.
///
/// `context` is converted with [`FFI_PhysicalOptimizerContext::new`], which
/// clones only its configuration options. The plan is wrapped in an
/// [`FFI_ExecutionPlan`] for the call and the returned FFI plan is converted
/// back into an `Arc<dyn ExecutionPlan>`.
fn optimize_with_context(
327+
&self,
328+
plan: Arc<dyn ExecutionPlan>,
329+
context: &dyn PhysicalOptimizerContext,
330+
) -> Result<Arc<dyn ExecutionPlan>> {
331+
let ffi_context = FFI_PhysicalOptimizerContext::new(context);
332+
let plan = FFI_ExecutionPlan::new(plan, None);
333+
334+
// NOTE(review): `df_result!` presumably converts the `FFI_Result` into a
// DataFusion `Result` so that `?` can propagate the error — confirm.
let optimized_plan = unsafe {
335+
df_result!((self.rule.optimize_with_context)(
336+
&self.rule,
337+
&plan,
338+
&ffi_context
339+
))?
340+
};
341+
(&optimized_plan).try_into()
342+
}
343+
223344
fn name(&self) -> &str {
224345
&self.name
225346
}
@@ -236,8 +357,11 @@ mod tests {
236357
use arrow::datatypes::{DataType, Field, Schema};
237358
use datafusion_common::config::ConfigOptions;
238359
use datafusion_common::error::Result;
239-
use datafusion_physical_optimizer::PhysicalOptimizerRule;
360+
use datafusion_physical_optimizer::{
361+
ConfigOnlyContext, PhysicalOptimizerContext, PhysicalOptimizerRule,
362+
};
240363
use datafusion_physical_plan::ExecutionPlan;
364+
use datafusion_physical_plan::operator_statistics::StatisticsRegistry;
241365

242366
use super::*;
243367
use crate::execution_plan::tests::EmptyExec;
@@ -265,6 +389,39 @@ mod tests {
265389
}
266390
}
267391

392+
/// A rule that returns an error from `optimize` but succeeds when
393+
/// called via `optimize_with_context`, proving the context path is taken:
/// a test using it can only pass if `optimize_with_context` was the method invoked.
394+
#[derive(Debug)]
395+
struct ContextAwareRule;
396+
397+
impl PhysicalOptimizerRule for ContextAwareRule {
398+
// Always fails, so any caller that falls back to plain `optimize` surfaces
// as a test error.
fn optimize(
399+
&self,
400+
_plan: Arc<dyn ExecutionPlan>,
401+
_config: &ConfigOptions,
402+
) -> Result<Arc<dyn ExecutionPlan>> {
403+
Err(datafusion_common::DataFusionError::Plan(
404+
"optimize should not be called directly".to_string(),
405+
))
406+
}
407+
408+
// Returns the input plan unchanged; the context argument is ignored.
fn optimize_with_context(
409+
&self,
410+
plan: Arc<dyn ExecutionPlan>,
411+
_context: &dyn PhysicalOptimizerContext,
412+
) -> Result<Arc<dyn ExecutionPlan>> {
413+
Ok(plan)
414+
}
415+
416+
fn name(&self) -> &str {
417+
"context_aware_rule"
418+
}
419+
420+
fn schema_check(&self) -> bool {
421+
true
422+
}
423+
}
424+
268425
fn create_test_plan() -> Arc<dyn ExecutionPlan> {
269426
let schema =
270427
Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)]));
@@ -374,4 +531,70 @@ mod tests {
374531

375532
Ok(())
376533
}
534+
535+
// Wraps `ContextAwareRule` in the FFI struct, converts it back into a
// `PhysicalOptimizerRule`, and checks that `optimize_with_context` succeeds.
// `ContextAwareRule::optimize` always errors, so success shows that the
// context-aware method was the one invoked.
#[test]
536+
fn test_optimize_with_context_round_trip() -> Result<()> {
537+
let rule: Arc<dyn PhysicalOptimizerRule + Send + Sync> =
538+
Arc::new(ContextAwareRule);
539+
540+
let mut ffi_rule = FFI_PhysicalOptimizerRule::new(rule, None);
541+
// NOTE(review): presumably makes the conversion below treat the rule as
// coming from another library, so the FFI path is exercised — confirm.
ffi_rule.library_marker_id = crate::mock_foreign_marker_id;
542+
543+
let foreign_rule: Arc<dyn PhysicalOptimizerRule + Send + Sync> =
544+
(&ffi_rule).into();
545+
546+
let plan = create_test_plan();
547+
let config = ConfigOptions::new();
548+
let context = ConfigOnlyContext::new(&config);
549+
550+
let optimized = foreign_rule.optimize_with_context(plan, &context)?;
551+
assert_eq!(optimized.name(), "empty-exec");
552+
553+
Ok(())
554+
}
555+
556+
/// Tests that `optimize_with_context` works even when the caller supplies a
557+
/// statistics registry. The registry cannot survive the FFI round-trip (it
558+
/// contains trait object vtables that are library-local), so the provider
559+
/// side will always see `None`. This test verifies the context-aware path
560+
/// still succeeds in that scenario.
///
/// Note that the assertion only inspects the returned plan; the rule under
/// test ignores its context, so the test does not itself observe what the
/// provider side sees for the registry.
561+
#[test]
562+
fn test_optimize_with_context_with_registry() -> Result<()> {
563+
let rule: Arc<dyn PhysicalOptimizerRule + Send + Sync> =
564+
Arc::new(ContextAwareRule);
565+
566+
let mut ffi_rule = FFI_PhysicalOptimizerRule::new(rule, None);
567+
ffi_rule.library_marker_id = crate::mock_foreign_marker_id;
568+
569+
let foreign_rule: Arc<dyn PhysicalOptimizerRule + Send + Sync> =
570+
(&ffi_rule).into();
571+
572+
// Local context type whose `statistics_registry` returns `Some`.
struct ContextWithRegistry {
573+
config: ConfigOptions,
574+
registry: StatisticsRegistry,
575+
}
576+
577+
impl PhysicalOptimizerContext for ContextWithRegistry {
578+
fn config_options(&self) -> &ConfigOptions {
579+
&self.config
580+
}
581+
582+
fn statistics_registry(&self) -> Option<&StatisticsRegistry> {
583+
Some(&self.registry)
584+
}
585+
}
586+
587+
let ctx = ContextWithRegistry {
588+
config: ConfigOptions::new(),
589+
registry: StatisticsRegistry::default_with_builtin_providers(),
590+
};
591+
592+
let plan = create_test_plan();
593+
// The optimize_with_context path works, but the registry is not
594+
// available on the provider side (it will be None).
595+
let optimized = foreign_rule.optimize_with_context(plan, &ctx)?;
596+
assert_eq!(optimized.name(), "empty-exec");
597+
598+
Ok(())
599+
}
377600
}

datafusion/ffi/src/tests/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,8 @@ pub struct ForeignLibraryModule {
113113

114114
pub create_physical_optimizer_rule: extern "C" fn() -> FFI_PhysicalOptimizerRule,
115115

116+
pub create_context_aware_optimizer_rule: extern "C" fn() -> FFI_PhysicalOptimizerRule,
117+
116118
pub version: extern "C" fn() -> u64,
117119
}
118120

@@ -259,6 +261,8 @@ pub extern "C" fn datafusion_ffi_get_module() -> ForeignLibraryModule {
259261
create_table_with_statistics,
260262
create_physical_optimizer_rule:
261263
physical_optimizer::create_physical_optimizer_rule,
264+
create_context_aware_optimizer_rule:
265+
physical_optimizer::create_context_aware_optimizer_rule,
262266
version: super::version,
263267
}
264268
}

datafusion/ffi/src/tests/physical_optimizer.rs

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use std::sync::Arc;
1919

2020
use datafusion_common::config::ConfigOptions;
2121
use datafusion_common::error::Result;
22-
use datafusion_physical_optimizer::PhysicalOptimizerRule;
22+
use datafusion_physical_optimizer::{PhysicalOptimizerContext, PhysicalOptimizerRule};
2323
use datafusion_physical_plan::ExecutionPlan;
2424
use datafusion_physical_plan::limit::GlobalLimitExec;
2525

@@ -52,3 +52,45 @@ pub(crate) extern "C" fn create_physical_optimizer_rule() -> FFI_PhysicalOptimiz
5252
let rule: Arc<dyn PhysicalOptimizerRule + Send + Sync> = Arc::new(AddLimitRule);
5353
FFI_PhysicalOptimizerRule::new(rule, None)
5454
}
55+
56+
/// A rule that returns an error from `optimize()` (proving the context path must
57+
/// be taken) but succeeds in `optimize_with_context()` by wrapping the plan in a
58+
/// `GlobalLimitExec`. It is exposed through `create_context_aware_optimizer_rule`.
59+
#[derive(Debug)]
60+
struct ContextAwareAddLimitRule;
61+
62+
impl PhysicalOptimizerRule for ContextAwareAddLimitRule {
63+
// Always fails, so a caller that only routes plain `optimize` gets an error
// instead of a plan.
fn optimize(
64+
&self,
65+
_plan: Arc<dyn ExecutionPlan>,
66+
_config: &ConfigOptions,
67+
) -> Result<Arc<dyn ExecutionPlan>> {
68+
Err(datafusion_common::DataFusionError::Plan(
69+
"optimize should not be called directly; use optimize_with_context"
70+
.to_string(),
71+
))
72+
}
73+
74+
// Wraps the input plan in a `GlobalLimitExec`; the context argument is ignored.
fn optimize_with_context(
75+
&self,
76+
plan: Arc<dyn ExecutionPlan>,
77+
_context: &dyn PhysicalOptimizerContext,
78+
) -> Result<Arc<dyn ExecutionPlan>> {
79+
// NOTE(review): the arguments are presumably skip = 0 and fetch = 10 — confirm
// against `GlobalLimitExec::new`.
Ok(Arc::new(GlobalLimitExec::new(plan, 0, Some(10))))
80+
}
81+
82+
fn name(&self) -> &str {
83+
"context_aware_add_limit_rule"
84+
}
85+
86+
fn schema_check(&self) -> bool {
87+
true
88+
}
89+
}
90+
91+
/// Create an [`FFI_PhysicalOptimizerRule`] wrapping a `ContextAwareAddLimitRule`.
///
/// `None` is passed as the second argument of `FFI_PhysicalOptimizerRule::new`.
pub(crate) extern "C" fn create_context_aware_optimizer_rule() -> FFI_PhysicalOptimizerRule
92+
{
93+
let rule: Arc<dyn PhysicalOptimizerRule + Send + Sync> =
94+
Arc::new(ContextAwareAddLimitRule);
95+
FFI_PhysicalOptimizerRule::new(rule, None)
96+
}

0 commit comments

Comments
 (0)