Skip to content

Commit 9c1e7ab

Browse files
xudong963claude
andauthored
Refactor: expose predicate constant inference from physical-expr (#21167)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Closes #. ## Rationale for this change <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> The predicate constant inference logic is helpful, we also use it in in our product, but we just copied that becuase it's now in the filter's method. ## What changes are included in this PR? <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> The PR extracts the related logic into a helper method in physical expr. ## Are these changes tested? <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> Yes ## Are there any user-facing changes? <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> <!-- If there are any breaking changes to public APIs, please add the `api change` label. --> --------- Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent ccaf802 commit 9c1e7ab

File tree

2 files changed

+98
-56
lines changed

2 files changed

+98
-56
lines changed

datafusion/physical-expr/src/utils/mod.rs

Lines changed: 91 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,11 @@ pub use guarantee::{Guarantee, LiteralGuarantee};
2121
use std::borrow::Borrow;
2222
use std::sync::Arc;
2323

24-
use crate::PhysicalExpr;
25-
use crate::PhysicalSortExpr;
26-
use crate::expressions::{BinaryExpr, Column};
24+
use crate::expressions::{BinaryExpr, Column, Literal};
2725
use crate::tree_node::ExprContext;
26+
use crate::{
27+
AcrossPartitions, ConstExpr, EquivalenceProperties, PhysicalExpr, PhysicalSortExpr,
28+
};
2829

2930
use arrow::datatypes::Schema;
3031
use datafusion_common::tree_node::{
@@ -45,6 +46,66 @@ pub fn split_conjunction(
4546
split_impl(Operator::And, predicate, vec![])
4647
}
4748

49+
impl ConstExpr {
50+
/// Collects predicate-derived constants from equality conjunctions.
51+
///
52+
/// For each equality predicate of the form `lhs = rhs`, if either side is
53+
/// already known constant according to `input_eqs`, or is a literal, then
54+
/// the other side is also constant and will be returned as a [`ConstExpr`].
55+
///
56+
/// Literals are treated as uniform constants across partitions, so
57+
/// `col = literal` produces a constant for `col` with the literal value.
58+
///
59+
/// For example, given predicate `a = 5 AND b = c` where `c` is already
60+
/// known constant, this returns constants for both `a` (Uniform with value
61+
/// 5) and `b` (propagating `c`'s across-partitions value).
62+
pub fn collect_predicate_constants(
63+
input_eqs: &EquivalenceProperties,
64+
predicate: &Arc<dyn PhysicalExpr>,
65+
) -> Vec<ConstExpr> {
66+
/// Returns the `AcrossPartitions` value for `expr` if it is constant:
67+
/// either already known constant in `input_eqs`, or a `Literal`
68+
/// (which is inherently constant across all partitions).
69+
fn expr_constant_or_literal(
70+
expr: &Arc<dyn PhysicalExpr>,
71+
input_eqs: &EquivalenceProperties,
72+
) -> Option<AcrossPartitions> {
73+
input_eqs.is_expr_constant(expr).or_else(|| {
74+
expr.as_any()
75+
.downcast_ref::<Literal>()
76+
.map(|l| AcrossPartitions::Uniform(Some(l.value().clone())))
77+
})
78+
}
79+
80+
let mut constants = Vec::new();
81+
for conjunction in split_conjunction(predicate) {
82+
if let Some(binary) = conjunction.as_any().downcast_ref::<BinaryExpr>()
83+
&& binary.op() == &Operator::Eq
84+
{
85+
// Check if either side is constant — either already known
86+
// constant from the input equivalence properties, or a literal
87+
// value (which is inherently constant across all partitions).
88+
let left_const = expr_constant_or_literal(binary.left(), input_eqs);
89+
let right_const = expr_constant_or_literal(binary.right(), input_eqs);
90+
91+
if let Some(left_across) = left_const {
92+
// LEFT is constant, so RIGHT must also be constant.
93+
// Use RIGHT's known across value if available, otherwise
94+
// propagate LEFT's (e.g. Uniform from a literal).
95+
let across = right_const.unwrap_or(left_across);
96+
constants.push(ConstExpr::new(Arc::clone(binary.right()), across));
97+
} else if let Some(right_across) = right_const {
98+
// RIGHT is constant, so LEFT must also be constant.
99+
constants
100+
.push(ConstExpr::new(Arc::clone(binary.left()), right_across));
101+
}
102+
}
103+
}
104+
105+
constants
106+
}
107+
}
108+
48109
/// Create a conjunction of the given predicates.
49110
/// If the input is empty, return a literal true.
50111
/// If the input contains a single predicate, return the predicate.
@@ -559,4 +620,31 @@ pub(crate) mod tests {
559620
assert_eq!(collect_columns(&expr3), expected);
560621
Ok(())
561622
}
623+
624+
#[test]
625+
fn test_collect_predicate_constants_propagates_uniform_literal_value() -> Result<()> {
626+
let schema = Arc::new(Schema::new(vec![Field::new(
627+
"ticker",
628+
DataType::Utf8,
629+
false,
630+
)]));
631+
let predicate = binary(
632+
col("ticker", schema.as_ref())?,
633+
Operator::Eq,
634+
lit(ScalarValue::Utf8(Some("NGJ26".to_string()))),
635+
schema.as_ref(),
636+
)?;
637+
let eq_properties = EquivalenceProperties::new(schema);
638+
639+
let constants =
640+
ConstExpr::collect_predicate_constants(&eq_properties, &predicate);
641+
642+
assert_eq!(constants.len(), 1);
643+
assert_eq!(
644+
constants[0].across_partitions,
645+
AcrossPartitions::Uniform(Some(ScalarValue::Utf8(Some("NGJ26".to_string()))))
646+
);
647+
648+
Ok(())
649+
}
562650
}

datafusion/physical-plan/src/filter.rs

Lines changed: 7 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -59,12 +59,12 @@ use datafusion_common::{
5959
use datafusion_execution::TaskContext;
6060
use datafusion_expr::Operator;
6161
use datafusion_physical_expr::equivalence::ProjectionMapping;
62-
use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal, lit};
62+
use datafusion_physical_expr::expressions::{BinaryExpr, Column, lit};
6363
use datafusion_physical_expr::intervals::utils::check_support;
6464
use datafusion_physical_expr::utils::{collect_columns, reassign_expr_columns};
6565
use datafusion_physical_expr::{
66-
AcrossPartitions, AnalysisContext, ConstExpr, EquivalenceProperties, ExprBoundaries,
67-
PhysicalExpr, analyze, conjunction, split_conjunction,
66+
AcrossPartitions, AnalysisContext, ConstExpr, ExprBoundaries, PhysicalExpr, analyze,
67+
conjunction, split_conjunction,
6868
};
6969

7070
use datafusion_physical_expr_common::physical_expr::fmt_sql;
@@ -350,55 +350,6 @@ impl FilterExec {
350350
})
351351
}
352352

353-
/// Returns the `AcrossPartitions` value for `expr` if it is constant:
354-
/// either already known constant in `input_eqs`, or a `Literal`
355-
/// (which is inherently constant across all partitions).
356-
fn expr_constant_or_literal(
357-
expr: &Arc<dyn PhysicalExpr>,
358-
input_eqs: &EquivalenceProperties,
359-
) -> Option<AcrossPartitions> {
360-
input_eqs.is_expr_constant(expr).or_else(|| {
361-
expr.as_any()
362-
.downcast_ref::<Literal>()
363-
.map(|l| AcrossPartitions::Uniform(Some(l.value().clone())))
364-
})
365-
}
366-
367-
fn extend_constants(
368-
input: &Arc<dyn ExecutionPlan>,
369-
predicate: &Arc<dyn PhysicalExpr>,
370-
) -> Vec<ConstExpr> {
371-
let mut res_constants = Vec::new();
372-
let input_eqs = input.equivalence_properties();
373-
374-
let conjunctions = split_conjunction(predicate);
375-
for conjunction in conjunctions {
376-
if let Some(binary) = conjunction.as_any().downcast_ref::<BinaryExpr>()
377-
&& binary.op() == &Operator::Eq
378-
{
379-
// Check if either side is constant — either already known
380-
// constant from the input equivalence properties, or a literal
381-
// value (which is inherently constant across all partitions).
382-
let left_const = Self::expr_constant_or_literal(binary.left(), input_eqs);
383-
let right_const =
384-
Self::expr_constant_or_literal(binary.right(), input_eqs);
385-
386-
if let Some(left_across) = left_const {
387-
// LEFT is constant, so RIGHT must also be constant.
388-
// Use RIGHT's known across value if available, otherwise
389-
// propagate LEFT's (e.g. Uniform from a literal).
390-
let across = right_const.unwrap_or(left_across);
391-
res_constants
392-
.push(ConstExpr::new(Arc::clone(binary.right()), across));
393-
} else if let Some(right_across) = right_const {
394-
// RIGHT is constant, so LEFT must also be constant.
395-
res_constants
396-
.push(ConstExpr::new(Arc::clone(binary.left()), right_across));
397-
}
398-
}
399-
}
400-
res_constants
401-
}
402353
/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
403354
fn compute_properties(
404355
input: &Arc<dyn ExecutionPlan>,
@@ -436,7 +387,10 @@ impl FilterExec {
436387
eq_properties.add_constants(constants)?;
437388
// This is for logical constant (for example: a = '1', then a could be marked as a constant)
438389
// to do: how to deal with multiple situation to represent = (for example c1 between 0 and 0)
439-
eq_properties.add_constants(Self::extend_constants(input, predicate))?;
390+
eq_properties.add_constants(ConstExpr::collect_predicate_constants(
391+
input.equivalence_properties(),
392+
predicate,
393+
))?;
440394

441395
let mut output_partitioning = input.output_partitioning().clone();
442396
// If contains projection, update the PlanProperties.

0 commit comments

Comments
 (0)