Skip to content

Commit 431445e

Browse files
LLDayaskalt
authored and committed
feat: support placeholders in execution plans
Introduces `PlaceholderExpr`, allowing placeholder parameters to be preserved in the physical plan. Previously, placeholders had to be resolved to literals before physical planning.
1 parent 801caa2 commit 431445e

10 files changed

Lines changed: 832 additions & 20 deletions

File tree

datafusion/physical-expr/src/expressions/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ mod literal;
3232
mod negative;
3333
mod no_op;
3434
mod not;
35+
mod placeholder;
3536
mod try_cast;
3637
mod unknown_column;
3738

@@ -54,5 +55,6 @@ pub use literal::{Literal, lit};
5455
pub use negative::{NegativeExpr, negative};
5556
pub use no_op::NoOp;
5657
pub use not::{NotExpr, not};
58+
pub use placeholder::{PlaceholderExpr, has_placeholders, placeholder};
5759
pub use try_cast::{TryCastExpr, try_cast};
5860
pub use unknown_column::UnKnownColumn;
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Placeholder expression.
19+
20+
use std::{
21+
any::Any,
22+
fmt::{self, Formatter},
23+
sync::Arc,
24+
};
25+
26+
use arrow::{
27+
array::RecordBatch,
28+
datatypes::{DataType, Field, FieldRef, Schema},
29+
};
30+
use datafusion_common::{
31+
DataFusionError, Result, exec_datafusion_err, tree_node::TreeNode,
32+
};
33+
use datafusion_expr::ColumnarValue;
34+
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
35+
use std::hash::Hash;
36+
37+
/// Physical expression representing a placeholder parameter (e.g., $1, $2, or named parameters) in
38+
/// the physical plan.
39+
///
40+
/// This expression serves as a placeholder that will be resolved to a literal value during
41+
/// execution. It should not be evaluated directly.
42+
#[derive(Debug, PartialEq, Eq, Hash)]
43+
pub struct PlaceholderExpr {
44+
/// Placeholder id, e.g. $1 or $a.
45+
pub id: String,
46+
/// Field inferred from the expression in which the placeholder appears, if known.
47+
pub field: Option<FieldRef>,
48+
}
49+
50+
impl PlaceholderExpr {
51+
/// Create a new placeholder expression.
52+
pub fn new(id: String, data_type: DataType) -> Self {
53+
let field = Arc::new(Field::new("", data_type, true));
54+
Self::new_with_field(id, field)
55+
}
56+
57+
/// Create a new placeholder expression from a field.
58+
pub fn new_with_field(id: String, field: FieldRef) -> Self {
59+
Self {
60+
id,
61+
field: Some(field),
62+
}
63+
}
64+
65+
/// Create a new placeholder expression without a specified data type.
66+
pub fn new_without_data_type(id: String) -> Self {
67+
Self { id, field: None }
68+
}
69+
70+
fn execution_error(&self) -> DataFusionError {
71+
exec_datafusion_err!(
72+
"Placeholder '{}' was not provided a value for execution.",
73+
self.id
74+
)
75+
}
76+
}
77+
78+
/// Create a placeholder expression.
79+
pub fn placeholder<I: Into<String>>(id: I, data_type: DataType) -> Arc<dyn PhysicalExpr> {
80+
Arc::new(PlaceholderExpr::new(id.into(), data_type))
81+
}
82+
83+
/// Returns `true` if the expression tree contains at least one placeholder.
84+
pub fn has_placeholders(expr: &Arc<dyn PhysicalExpr>) -> bool {
85+
expr.exists(|e| Ok(e.as_any().is::<PlaceholderExpr>()))
86+
.expect("do not return errors")
87+
}
88+
89+
impl fmt::Display for PlaceholderExpr {
90+
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
91+
write!(f, "{}", self.id)
92+
}
93+
}
94+
95+
impl PhysicalExpr for PlaceholderExpr {
96+
fn as_any(&self) -> &dyn Any {
97+
self
98+
}
99+
100+
fn return_field(&self, _input_schema: &Schema) -> Result<FieldRef> {
101+
self.field
102+
.as_ref()
103+
.map(Arc::clone)
104+
.ok_or_else(|| self.execution_error())
105+
}
106+
107+
fn evaluate(&self, _batch: &RecordBatch) -> Result<ColumnarValue> {
108+
Err(self.execution_error())
109+
}
110+
111+
fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
112+
vec![]
113+
}
114+
115+
fn with_new_children(
116+
self: Arc<Self>,
117+
children: Vec<Arc<dyn PhysicalExpr>>,
118+
) -> Result<Arc<dyn PhysicalExpr>> {
119+
assert!(children.is_empty());
120+
Ok(self)
121+
}
122+
123+
fn fmt_sql(&self, f: &mut Formatter<'_>) -> fmt::Result {
124+
fmt::Display::fmt(self, f)
125+
}
126+
}

datafusion/physical-expr/src/planner.rs

Lines changed: 28 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,10 @@ use std::sync::Arc;
2020
use crate::ScalarFunctionExpr;
2121
use crate::{
2222
PhysicalExpr,
23-
expressions::{self, Column, Literal, binary, like, similar_to},
23+
expressions::{self, Column, Literal, PlaceholderExpr, binary, like, similar_to},
2424
};
2525

26-
use arrow::datatypes::Schema;
26+
use arrow::datatypes::{DataType, Schema};
2727
use datafusion_common::config::ConfigOptions;
2828
use datafusion_common::metadata::{FieldMetadata, format_type_and_metadata};
2929
use datafusion_common::{
@@ -301,11 +301,15 @@ pub fn create_physical_expr(
301301
);
302302
}
303303

304-
expressions::cast(
305-
create_physical_expr(expr, input_dfschema, execution_props)?,
306-
input_schema,
307-
field.data_type().clone(),
308-
)
304+
let data_type = field.data_type().clone();
305+
let mut expr = create_physical_expr(expr, input_dfschema, execution_props)?;
306+
if let Some(placeholder) = expr.as_any().downcast_ref::<PlaceholderExpr>()
307+
&& placeholder.field.is_none()
308+
{
309+
expr = expressions::placeholder(&placeholder.id, data_type.clone());
310+
}
311+
312+
expressions::cast(expr, input_schema, data_type)
309313
}
310314
Expr::TryCast(TryCast { expr, field }) => {
311315
if !field.metadata().is_empty() {
@@ -320,11 +324,16 @@ pub fn create_physical_expr(
320324
);
321325
}
322326

323-
expressions::try_cast(
324-
create_physical_expr(expr, input_dfschema, execution_props)?,
325-
input_schema,
326-
field.data_type().clone(),
327-
)
327+
let mut expr = create_physical_expr(expr, input_dfschema, execution_props)?;
328+
if let Some(placeholder) = expr.as_any().downcast_ref::<PlaceholderExpr>()
329+
&& placeholder.field.is_none()
330+
{
331+
// To maintain try_cast behavior, we initially resolve the placeholder with the
332+
// Utf8 data type.
333+
expr = expressions::placeholder(&placeholder.id, DataType::Utf8);
334+
}
335+
336+
expressions::try_cast(expr, input_schema, field.data_type().clone())
328337
}
329338
Expr::Not(expr) => {
330339
expressions::not(create_physical_expr(expr, input_dfschema, execution_props)?)
@@ -409,9 +418,13 @@ pub fn create_physical_expr(
409418
expressions::in_list(value_expr, list_exprs, negated, input_schema)
410419
}
411420
},
412-
Expr::Placeholder(Placeholder { id, .. }) => {
413-
exec_err!("Placeholder '{id}' was not provided a value for execution.")
414-
}
421+
Expr::Placeholder(Placeholder { id, field }) => match field {
422+
Some(field) => Ok(Arc::new(PlaceholderExpr::new_with_field(
423+
id.clone(),
424+
Arc::clone(field),
425+
))),
426+
None => Ok(Arc::new(PlaceholderExpr::new_without_data_type(id.clone()))),
427+
},
415428
other => {
416429
not_impl_err!("Physical plan does not support logical expression {other:?}")
417430
}

datafusion/physical-expr/src/simplifier/const_evaluator.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use datafusion_common::{Result, ScalarValue};
2727
use datafusion_expr_common::columnar_value::ColumnarValue;
2828

2929
use crate::PhysicalExpr;
30-
use crate::expressions::{Column, Literal};
30+
use crate::expressions::{Column, Literal, PlaceholderExpr};
3131

3232
/// Simplify expressions that consist only of literals by evaluating them.
3333
///
@@ -157,7 +157,10 @@ fn can_evaluate_as_constant(expr: &Arc<dyn PhysicalExpr>) -> bool {
157157
let mut can_evaluate = true;
158158

159159
expr.apply(|e| {
160-
if e.as_any().is::<Column>() || e.is_volatile_node() {
160+
if e.as_any().is::<Column>()
161+
|| e.is_volatile_node()
162+
|| e.as_any().is::<PlaceholderExpr>()
163+
{
161164
can_evaluate = false;
162165
Ok(TreeNodeRecursion::Stop)
163166
} else {

datafusion/proto/proto/datafusion.proto

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -920,6 +920,8 @@ message PhysicalExprNode {
920920
UnknownColumn unknown_column = 20;
921921

922922
PhysicalHashExprNode hash_expr = 21;
923+
924+
PhysicalPlaceholderNode placeholder_expr = 22;
923925
}
924926
}
925927

@@ -1047,6 +1049,11 @@ message PhysicalHashExprNode {
10471049
string description = 6;
10481050
}
10491051

1052+
message PhysicalPlaceholderNode {
1053+
string id = 1;
1054+
optional datafusion_common.Field field = 2;
1055+
}
1056+
10501057
message FilterExecNode {
10511058
PhysicalPlanNode input = 1;
10521059
PhysicalExprNode expr = 2;

0 commit comments

Comments
 (0)