Skip to content

Commit 8e23f2c

Browse files
Initial implementation of FFI_QueryPlanner
Co-authored-by: Tim Saucer <timsaucer@gmail.com>
1 parent 937dfda commit 8e23f2c

6 files changed

Lines changed: 528 additions & 3 deletions

File tree

datafusion/core/src/execution/context/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
//! [`SessionContext`] API for registering data sources and executing queries
1919
20+
use std::any::Any;
2021
use std::collections::HashSet;
2122
use std::fmt::Debug;
2223
use std::sync::{Arc, Weak};
@@ -2073,7 +2074,7 @@ impl From<SessionContext> for SessionStateBuilder {
20732074

20742075
/// A planner used to add extensions to DataFusion logical and physical plans.
20752076
#[async_trait]
2076-
pub trait QueryPlanner: Debug {
2077+
pub trait QueryPlanner: Any + Debug {
20772078
/// Given a [`LogicalPlan`], create an [`ExecutionPlan`] suitable for execution
20782079
async fn create_physical_plan(
20792080
&self,

datafusion/ffi/src/session/mod.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,9 @@ use datafusion_expr::{
3737
};
3838
use datafusion_physical_expr::PhysicalExpr;
3939
use datafusion_physical_plan::ExecutionPlan;
40-
use datafusion_proto::bytes::{logical_plan_from_bytes, logical_plan_to_bytes};
40+
use datafusion_proto::bytes::{
41+
logical_plan_from_bytes, logical_plan_to_bytes_with_extension_codec,
42+
};
4143
use datafusion_proto::logical_plan::LogicalExtensionCodec;
4244
use datafusion_proto::logical_plan::from_proto::parse_expr;
4345
use datafusion_proto::logical_plan::to_proto::serialize_expr;
@@ -63,6 +65,7 @@ use crate::util::FFI_Result;
6365
use crate::{df_result, sresult, sresult_return};
6466

6567
pub mod config;
68+
pub mod planner;
6669

6770
/// A stable struct for sharing [`Session`] across FFI boundaries.
6871
///
@@ -550,8 +553,10 @@ impl Session for ForeignSession {
550553
&self,
551554
logical_plan: &LogicalPlan,
552555
) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
556+
let codec: Arc<dyn LogicalExtensionCodec> = (&self.session.logical_codec).into();
553557
unsafe {
554-
let logical_plan = logical_plan_to_bytes(logical_plan)?;
558+
let logical_plan =
559+
logical_plan_to_bytes_with_extension_codec(logical_plan, codec.as_ref())?;
555560
let physical_plan = df_result!(
556561
(self.session.create_physical_plan)(
557562
&self.session,
Lines changed: 313 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,313 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::ffi::c_void;
19+
use std::sync::Arc;
20+
21+
use async_ffi::{FfiFuture, FutureExt};
22+
use async_trait::async_trait;
23+
use datafusion_common::DataFusionError;
24+
use datafusion_execution::TaskContext;
25+
use datafusion_expr::LogicalPlan;
26+
use datafusion_physical_plan::ExecutionPlan;
27+
use datafusion_proto::bytes::{
28+
logical_plan_from_bytes_with_extension_codec,
29+
logical_plan_to_bytes_with_extension_codec,
30+
};
31+
use datafusion_proto::logical_plan::{
32+
DefaultLogicalExtensionCodec, LogicalExtensionCodec,
33+
};
34+
use datafusion_session::Session;
35+
use stabby::vec::Vec as SVec;
36+
use tokio::runtime::Handle;
37+
38+
use crate::execution::FFI_TaskContextProvider;
39+
use crate::execution_plan::FFI_ExecutionPlan;
40+
use crate::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
41+
use crate::session::{FFI_SessionRef, ForeignSession};
42+
use crate::util::FFI_Result;
43+
use crate::{df_result, sresult, sresult_return};
44+
45+
/// FFI-compatible variant of `QueryPlanner` that accepts a `&dyn Session` instead of a
46+
/// concrete session type.
47+
///
48+
/// `QueryPlanner` requires a specific `SessionState` type, which cannot cross an FFI
49+
/// boundary. `QueryPlannerWeak` relaxes that requirement by accepting `&dyn Session`, making
50+
/// it possible for a planner implemented in a foreign library to participate in DataFusion's
51+
/// planning pipeline.
52+
///
53+
/// # Usage
54+
///
55+
/// This trait is not registered with DataFusion directly. Instead, wrap it in a
56+
/// `QueryPlanner` implementation (e.g. `DynamicForeignQueryPlaner` in the integration
57+
/// tests) and register that wrapper with the `SessionContext`.
58+
#[async_trait]
59+
pub trait QueryPlannerWeak: std::fmt::Debug + Send + Sync {
60+
/// Converts a [LogicalPlan] into an [ExecutionPlan] suitable for execution.
61+
///
62+
/// Mirrors `QueryPlanner::create_physical_plan` but receives session as
63+
/// `&dyn Session` rather than `&SessionState`.
64+
async fn create_physical_plan(
65+
&self,
66+
logical_plan: &LogicalPlan,
67+
session: &dyn Session,
68+
) -> datafusion_common::Result<Arc<dyn ExecutionPlan>>;
69+
}
70+
71+
#[repr(C)]
72+
#[derive(Debug)]
73+
pub struct FFI_QueryPlanner {
74+
// it would make sense, for ballista, to get access to this method.
75+
// as the plan is going to be decode at the scheduler side.
76+
//
77+
// at the moment,FFI_SessionRef is not public, so we'd need to change it
78+
/// Given a [`LogicalPlan`], create an [`ExecutionPlan`] suitable for execution
79+
create_physical_plan:
80+
unsafe extern "C" fn(
81+
&Self,
82+
logical_plan_serialized: SVec<u8>,
83+
session: FFI_SessionRef,
84+
) -> FfiFuture<FFI_Result<FFI_ExecutionPlan>>,
85+
86+
/// Logical codec used to provide encoding and decoding of plans.
87+
pub logical_codec: FFI_LogicalExtensionCodec,
88+
89+
/// Release the memory of the private data when it is no longer being used.
90+
pub release: unsafe extern "C" fn(arg: &mut Self),
91+
92+
/// Internal data. This is only to be accessed by the planner of the plan.
93+
/// The foreign library should never attempt to access this data.
94+
pub private_data: *mut c_void,
95+
96+
/// Used to create a clone on the planner . This should
97+
/// only need to be called by the receiver of the plan.
98+
pub clone: unsafe extern "C" fn(plan: &Self) -> Self,
99+
100+
/// Utility to identify when FFI objects are accessed locally through
101+
/// the foreign interface. See [`crate::get_library_marker_id`] and
102+
/// the crate's `README.md` for more information.
103+
pub library_marker_id: extern "C" fn() -> usize,
104+
}
105+
106+
unsafe impl Send for FFI_QueryPlanner {}
107+
unsafe impl Sync for FFI_QueryPlanner {}
108+
109+
struct QueryPlannerPrivateData {
110+
planner: Arc<dyn QueryPlannerWeak>,
111+
}
112+
113+
impl FFI_QueryPlanner {
114+
fn inner(&self) -> &Arc<dyn QueryPlannerWeak> {
115+
unsafe {
116+
let private_data = self.private_data as *const QueryPlannerPrivateData;
117+
&(*private_data).planner
118+
}
119+
}
120+
}
121+
122+
unsafe extern "C" fn release_fn_wrapper(ctx: &mut FFI_QueryPlanner) {
123+
unsafe {
124+
let private_data =
125+
Box::from_raw(ctx.private_data as *mut QueryPlannerPrivateData);
126+
drop(private_data);
127+
}
128+
}
129+
130+
unsafe extern "C" fn create_physical_plan_fn_wrapper(
131+
planner: &FFI_QueryPlanner,
132+
logical_plan_serialized: SVec<u8>,
133+
session: FFI_SessionRef,
134+
) -> FfiFuture<FFI_Result<FFI_ExecutionPlan>> {
135+
unsafe {
136+
let planner = Arc::clone(planner.inner());
137+
let codec: Arc<dyn LogicalExtensionCodec> = (&session.logical_codec).into();
138+
let runtime = session.runtime().clone();
139+
140+
async move {
141+
let mut foreign_session = None;
142+
let session = sresult_return!(
143+
session
144+
.as_local()
145+
.map(Ok::<&(dyn Session + Send + Sync), DataFusionError>)
146+
.unwrap_or_else(|| {
147+
foreign_session = Some(ForeignSession::try_from(&session)?);
148+
Ok(foreign_session.as_ref().unwrap())
149+
})
150+
);
151+
152+
let task_ctx: Arc<TaskContext> = session.task_ctx();
153+
154+
let logical_plan =
155+
sresult_return!(logical_plan_from_bytes_with_extension_codec(
156+
logical_plan_serialized.as_slice(),
157+
task_ctx.as_ref(),
158+
codec.as_ref()
159+
));
160+
161+
let physical_plan =
162+
planner.create_physical_plan(&logical_plan, session).await;
163+
164+
sresult!(physical_plan.map(|plan| FFI_ExecutionPlan::new(plan, runtime)))
165+
}
166+
.into_ffi()
167+
}
168+
}
169+
170+
unsafe extern "C" fn clone_fn_wrapper(planner: &FFI_QueryPlanner) -> FFI_QueryPlanner {
171+
let codec = planner.logical_codec.clone();
172+
let planner = Arc::clone(planner.inner());
173+
FFI_QueryPlanner::new_with_ffi_codec(planner, codec)
174+
}
175+
176+
impl Drop for FFI_QueryPlanner {
177+
fn drop(&mut self) {
178+
unsafe { (self.release)(self) }
179+
}
180+
}
181+
182+
impl FFI_QueryPlanner {
183+
pub fn new(
184+
planner: Arc<dyn QueryPlannerWeak>,
185+
runtime: Option<&Handle>,
186+
task_ctx_provider: impl Into<FFI_TaskContextProvider>,
187+
logical_codec: Option<Arc<dyn LogicalExtensionCodec>>,
188+
) -> Self {
189+
let logical_codec =
190+
logical_codec.unwrap_or_else(|| Arc::new(DefaultLogicalExtensionCodec {}));
191+
let logical_codec = FFI_LogicalExtensionCodec::new(
192+
logical_codec,
193+
runtime.cloned(),
194+
task_ctx_provider.into(),
195+
);
196+
197+
Self::new_with_ffi_codec(planner, logical_codec)
198+
}
199+
200+
pub fn new_with_ffi_codec(
201+
planner: Arc<dyn QueryPlannerWeak>,
202+
codec: FFI_LogicalExtensionCodec,
203+
) -> Self {
204+
let private_data = Box::new(QueryPlannerPrivateData { planner });
205+
206+
Self {
207+
create_physical_plan: create_physical_plan_fn_wrapper,
208+
logical_codec: codec,
209+
clone: clone_fn_wrapper,
210+
release: release_fn_wrapper,
211+
private_data: Box::into_raw(private_data) as *mut c_void,
212+
library_marker_id: crate::get_library_marker_id,
213+
}
214+
}
215+
}
216+
217+
impl Clone for FFI_QueryPlanner {
218+
fn clone(&self) -> Self {
219+
unsafe { (self.clone)(self) }
220+
}
221+
}
222+
223+
#[derive(Debug)]
224+
pub struct ForeignQueryPlanner(pub FFI_QueryPlanner);
225+
226+
impl From<&FFI_QueryPlanner> for Arc<dyn QueryPlannerWeak> {
227+
fn from(planner: &FFI_QueryPlanner) -> Self {
228+
if (planner.library_marker_id)() == crate::get_library_marker_id() {
229+
Arc::clone(planner.inner())
230+
} else {
231+
Arc::new(ForeignQueryPlanner(planner.clone()))
232+
}
233+
}
234+
}
235+
236+
#[async_trait]
237+
impl QueryPlannerWeak for ForeignQueryPlanner {
238+
/// Given a [`LogicalPlan`], create an [`ExecutionPlan`] suitable for execution
239+
async fn create_physical_plan(
240+
&self,
241+
logical_plan: &LogicalPlan,
242+
session_state: &dyn Session,
243+
) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
244+
let codec: Arc<dyn LogicalExtensionCodec> = (&self.0.logical_codec).into();
245+
let logical_plan_buf =
246+
logical_plan_to_bytes_with_extension_codec(logical_plan, codec.as_ref())?;
247+
248+
let session_ref =
249+
FFI_SessionRef::new(session_state, None, self.0.logical_codec.clone());
250+
251+
let plan = df_result!(unsafe {
252+
(self.0.create_physical_plan)(
253+
&self.0,
254+
logical_plan_buf.as_ref().into(),
255+
session_ref,
256+
)
257+
.await
258+
})?;
259+
260+
Ok((&plan).try_into()?)
261+
}
262+
}
263+
264+
#[cfg(test)]
265+
mod tests {
266+
use std::sync::Arc;
267+
268+
use datafusion_expr::LogicalPlan;
269+
use datafusion_physical_plan::ExecutionPlan;
270+
use datafusion_physical_plan::empty::EmptyExec;
271+
use datafusion_session::Session;
272+
273+
use crate::session::planner::{FFI_QueryPlanner, QueryPlannerWeak};
274+
275+
#[derive(Debug, Default)]
276+
struct DummyPlanner {}
277+
278+
#[async_trait::async_trait]
279+
impl QueryPlannerWeak for DummyPlanner {
280+
async fn create_physical_plan(
281+
&self,
282+
logical_plan: &LogicalPlan,
283+
_session_state: &dyn Session,
284+
) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
285+
let schema = logical_plan.schema().as_arrow().clone();
286+
// will need better test
287+
Ok(Arc::new(EmptyExec::new(Arc::new(schema))))
288+
}
289+
}
290+
291+
#[tokio::test]
292+
async fn test_end_to_end() -> datafusion::common::Result<()> {
293+
let (ctx, task_ctx_provider) = crate::util::tests::test_session_and_ctx();
294+
295+
let df = ctx.sql("select 1 as i").await?;
296+
let logical_plan = df.logical_plan();
297+
298+
let planner: Arc<dyn QueryPlannerWeak> = Arc::new(DummyPlanner::default());
299+
300+
let mut ffi_planner =
301+
FFI_QueryPlanner::new(planner, None, task_ctx_provider, None);
302+
ffi_planner.library_marker_id = crate::mock_foreign_marker_id;
303+
304+
let foreign_planner: Arc<dyn QueryPlannerWeak> = (&ffi_planner).into();
305+
306+
let empty_exec = foreign_planner
307+
.create_physical_plan(logical_plan, &ctx.state())
308+
.await?;
309+
310+
assert!(empty_exec.downcast_ref::<EmptyExec>().is_some());
311+
Ok(())
312+
}
313+
}

datafusion/ffi/src/tests/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ use crate::execution_plan::FFI_ExecutionPlan;
3535
use crate::execution_plan::tests::EmptyExec;
3636
use crate::physical_optimizer::FFI_PhysicalOptimizerRule;
3737
use crate::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
38+
use crate::session::planner::FFI_QueryPlanner;
3839
use crate::table_provider::FFI_TableProvider;
3940
use crate::table_provider_factory::FFI_TableProviderFactory;
4041
use crate::tests::catalog::create_catalog_provider_list;
@@ -47,6 +48,7 @@ mod async_provider;
4748
pub mod catalog;
4849
pub mod config;
4950
mod physical_optimizer;
51+
mod planner;
5052
mod sync_provider;
5153
mod table_provider_factory;
5254
mod udf_udaf_udwf;
@@ -100,6 +102,9 @@ pub struct ForeignLibraryModule {
100102

101103
pub create_physical_optimizer_rule: extern "C" fn() -> FFI_PhysicalOptimizerRule,
102104

105+
pub create_query_planner:
106+
extern "C" fn(FFI_LogicalExtensionCodec) -> FFI_QueryPlanner,
107+
103108
pub version: extern "C" fn() -> u64,
104109
}
105110

@@ -164,6 +169,7 @@ pub extern "C" fn datafusion_ffi_get_module() -> ForeignLibraryModule {
164169
create_empty_exec,
165170
create_physical_optimizer_rule:
166171
physical_optimizer::create_physical_optimizer_rule,
172+
create_query_planner: planner::create_query_planner,
167173
version: super::version,
168174
}
169175
}

0 commit comments

Comments
 (0)