Skip to content

Commit dfdd011

Browse files
committed
Revert to 787d0a4
1 parent ff034f2 commit dfdd011

File tree

3 files changed

+91
-117
lines changed

3 files changed

+91
-117
lines changed

datafusion/optimizer/src/push_down_filter.rs

Lines changed: 47 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -285,27 +285,26 @@ fn can_evaluate_as_join_condition(predicate: &Expr) -> Result<bool> {
285285
Ok(is_evaluate)
286286
}
287287

288-
fn is_scalar_subquery_cross_join(join: &Join) -> bool {
289-
fn is_scalar_aggregate_or_derived_relation(plan: &LogicalPlan) -> (bool, bool) {
290-
match plan {
291-
LogicalPlan::SubqueryAlias(subquery_alias) => {
292-
let (is_scalar_aggregate, _) = is_scalar_aggregate_or_derived_relation(
293-
subquery_alias.input.as_ref(),
294-
);
295-
(is_scalar_aggregate, true)
296-
}
297-
LogicalPlan::Projection(projection) => {
298-
is_scalar_aggregate_or_derived_relation(projection.input.as_ref())
299-
}
300-
LogicalPlan::Aggregate(aggregate) => (aggregate.group_expr.is_empty(), false),
301-
_ => (false, false),
288+
fn classify_join_input(plan: &LogicalPlan) -> (bool, bool) {
289+
match plan {
290+
LogicalPlan::SubqueryAlias(subquery_alias) => {
291+
let (is_scalar_aggregate, _) =
292+
classify_join_input(subquery_alias.input.as_ref());
293+
(is_scalar_aggregate, true)
294+
}
295+
LogicalPlan::Projection(projection) => {
296+
classify_join_input(projection.input.as_ref())
302297
}
298+
LogicalPlan::Aggregate(aggregate) => (aggregate.group_expr.is_empty(), false),
299+
_ => (false, false),
303300
}
301+
}
304302

303+
fn is_scalar_subquery_cross_join(join: &Join) -> bool {
305304
let (left_scalar_aggregate, left_is_derived_relation) =
306-
is_scalar_aggregate_or_derived_relation(join.left.as_ref());
305+
classify_join_input(join.left.as_ref());
307306
let (right_scalar_aggregate, right_is_derived_relation) =
308-
is_scalar_aggregate_or_derived_relation(join.right.as_ref());
307+
classify_join_input(join.right.as_ref());
309308
join.on.is_empty()
310309
&& join.filter.is_none()
311310
&& ((left_scalar_aggregate && right_is_derived_relation)
@@ -456,12 +455,9 @@ fn push_down_all_join(
456455
let keep_mixed_scalar_subquery_filters =
457456
is_inner_join && is_scalar_subquery_cross_join(&join);
458457
for predicate in predicates {
459-
let left_only = left_preserved && checker.is_left_only(&predicate);
460-
let right_only =
461-
!left_only && right_preserved && checker.is_right_only(&predicate);
462-
if left_only {
458+
if left_preserved && checker.is_left_only(&predicate) {
463459
left_push.push(predicate);
464-
} else if right_only {
460+
} else if right_preserved && checker.is_right_only(&predicate) {
465461
right_push.push(predicate);
466462
} else if is_inner_join
467463
&& !keep_mixed_scalar_subquery_filters
@@ -487,63 +483,43 @@ fn push_down_all_join(
487483
let mut on_filter_join_conditions = vec![];
488484
let (on_left_preserved, on_right_preserved) = on_lr_is_preserved(join.join_type);
489485

490-
for on in on_filter {
491-
if on_left_preserved && checker.is_left_only(&on) {
492-
left_push.push(on)
493-
} else if on_right_preserved && checker.is_right_only(&on) {
494-
right_push.push(on)
495-
} else {
496-
on_filter_join_conditions.push(on)
486+
if !on_filter.is_empty() {
487+
for on in on_filter {
488+
if on_left_preserved && checker.is_left_only(&on) {
489+
left_push.push(on)
490+
} else if on_right_preserved && checker.is_right_only(&on) {
491+
right_push.push(on)
492+
} else {
493+
on_filter_join_conditions.push(on)
494+
}
497495
}
498496
}
499497

500498
// Extract from OR clause, generate new predicates for both side of join if possible.
501499
// We only track the unpushable predicates above.
502-
let extend_or_clauses =
503-
|target: &mut Vec<Expr>, filters: &[Expr], schema: &DFSchema, preserved| {
504-
if preserved {
505-
target.extend(extract_or_clauses_for_join(filters, schema));
506-
}
507-
};
508-
extend_or_clauses(
509-
&mut left_push,
510-
&keep_predicates,
511-
left_schema,
512-
left_preserved,
513-
);
514-
extend_or_clauses(
515-
&mut left_push,
516-
&join_conditions,
517-
left_schema,
518-
left_preserved,
519-
);
520-
extend_or_clauses(
521-
&mut right_push,
522-
&keep_predicates,
523-
right_schema,
524-
right_preserved,
525-
);
526-
extend_or_clauses(
527-
&mut right_push,
528-
&join_conditions,
529-
right_schema,
530-
right_preserved,
531-
);
500+
if left_preserved {
501+
left_push.extend(extract_or_clauses_for_join(&keep_predicates, left_schema));
502+
left_push.extend(extract_or_clauses_for_join(&join_conditions, left_schema));
503+
}
504+
if right_preserved {
505+
right_push.extend(extract_or_clauses_for_join(&keep_predicates, right_schema));
506+
right_push.extend(extract_or_clauses_for_join(&join_conditions, right_schema));
507+
}
532508

533509
// For predicates from join filter, we should check with if a join side is preserved
534510
// in term of join filtering.
535-
extend_or_clauses(
536-
&mut left_push,
537-
&on_filter_join_conditions,
538-
left_schema,
539-
on_left_preserved,
540-
);
541-
extend_or_clauses(
542-
&mut right_push,
543-
&on_filter_join_conditions,
544-
right_schema,
545-
on_right_preserved,
546-
);
511+
if on_left_preserved {
512+
left_push.extend(extract_or_clauses_for_join(
513+
&on_filter_join_conditions,
514+
left_schema,
515+
));
516+
}
517+
if on_right_preserved {
518+
right_push.extend(extract_or_clauses_for_join(
519+
&on_filter_join_conditions,
520+
right_schema,
521+
));
522+
}
547523

548524
if let Some(predicate) = conjunction(left_push) {
549525
join.left = Arc::new(LogicalPlan::Filter(Filter::try_new(predicate, join.left)?));
@@ -778,11 +754,10 @@ fn infer_join_predicates_impl<
778754
inferred_predicates: &mut InferredPredicates,
779755
) -> Result<()> {
780756
for predicate in input_predicates {
781-
let column_refs = predicate.column_refs();
782757
let mut join_cols_to_replace = HashMap::new();
783758
let mut saw_non_replaceable_ref = false;
784759

785-
for &col in &column_refs {
760+
for &col in &predicate.column_refs() {
786761
let replacement = join_col_keys.iter().find_map(|(l, r)| {
787762
if ENABLE_LEFT_TO_RIGHT && col == *l {
788763
Some((col, *r))

datafusion/optimizer/src/utils.rs

Lines changed: 18 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,12 @@ use log::{debug, trace};
3838
/// as it was initially placed here and then moved elsewhere.
3939
pub use datafusion_expr::expr_rewriter::NamePreserver;
4040

41+
#[cfg(test)]
42+
use self::test_eval_mode::{
43+
NullRestrictionEvalMode, null_restriction_eval_mode,
44+
set_null_restriction_eval_mode_for_test, with_null_restriction_eval_mode_for_test,
45+
};
46+
4147
/// Returns true if `expr` contains all columns in `schema_cols`
4248
pub(crate) fn has_all_column_refs(expr: &Expr, schema_cols: &HashSet<Column>) -> bool {
4349
expr.column_refs()
@@ -73,7 +79,7 @@ pub fn is_restrict_null_predicate<'a>(
7379
predicate: Expr,
7480
join_cols_of_predicate: impl IntoIterator<Item = &'a Column>,
7581
) -> Result<bool> {
76-
if is_column_expr(&predicate) {
82+
if matches!(predicate, Expr::Column(_)) {
7783
return Ok(true);
7884
}
7985

@@ -93,8 +99,8 @@ pub fn is_restrict_null_predicate<'a>(
9399

94100
#[cfg(test)]
95101
if matches!(
96-
test_eval_mode::null_restriction_eval_mode(),
97-
test_eval_mode::NullRestrictionEvalMode::AuthoritativeOnly
102+
null_restriction_eval_mode(),
103+
NullRestrictionEvalMode::AuthoritativeOnly
98104
) {
99105
return authoritative_restrict_null_predicate(predicate, join_cols);
100106
}
@@ -127,16 +133,16 @@ pub fn evaluates_to_null<'a>(
127133
predicate: Expr,
128134
null_columns: impl IntoIterator<Item = &'a Column>,
129135
) -> Result<bool> {
130-
if is_column_expr(&predicate) {
136+
if matches!(predicate, Expr::Column(_)) {
131137
return Ok(true);
132138
}
133139

134-
evaluate_with_null_columns(predicate, null_columns, |result| {
135-
Ok(match result {
140+
Ok(
141+
match evaluate_expr_with_null_column(predicate, null_columns)? {
136142
ColumnarValue::Array(_) => false,
137143
ColumnarValue::Scalar(scalar) => scalar.is_null(),
138-
})
139-
})
144+
},
145+
)
140146
}
141147

142148
fn evaluate_expr_with_null_column<'a>(
@@ -170,8 +176,8 @@ fn authoritative_restrict_null_predicate<'a>(
170176
predicate: Expr,
171177
join_cols_of_predicate: impl IntoIterator<Item = &'a Column>,
172178
) -> Result<bool> {
173-
evaluate_with_null_columns(predicate, join_cols_of_predicate, |result| {
174-
Ok(match result {
179+
Ok(
180+
match evaluate_expr_with_null_column(predicate, join_cols_of_predicate)? {
175181
ColumnarValue::Array(array) if array.len() == 1 => {
176182
let boolean_array = as_boolean_array(&array)?;
177183
boolean_array.is_null(0) || !boolean_array.value(0)
@@ -183,27 +189,15 @@ fn authoritative_restrict_null_predicate<'a>(
183189
| ScalarValue::Boolean(Some(false))
184190
| ScalarValue::Null
185191
),
186-
})
187-
})
192+
},
193+
)
188194
}
189195

190196
fn coerce(expr: Expr, schema: &DFSchema) -> Result<Expr> {
191197
let mut expr_rewrite = TypeCoercionRewriter { schema };
192198
expr.rewrite(&mut expr_rewrite).data()
193199
}
194200

195-
fn is_column_expr(expr: &Expr) -> bool {
196-
matches!(expr, Expr::Column(_))
197-
}
198-
199-
fn evaluate_with_null_columns<'a, T>(
200-
predicate: Expr,
201-
null_columns: impl IntoIterator<Item = &'a Column>,
202-
f: impl FnOnce(ColumnarValue) -> Result<T>,
203-
) -> Result<T> {
204-
f(evaluate_expr_with_null_column(predicate, null_columns)?)
205-
}
206-
207201
#[cfg(test)]
208202
mod test_eval_mode {
209203
use std::cell::Cell;
@@ -252,11 +246,6 @@ mod tests {
252246
use super::*;
253247
use std::panic::{AssertUnwindSafe, catch_unwind};
254248

255-
use crate::utils::test_eval_mode::{
256-
NullRestrictionEvalMode, null_restriction_eval_mode,
257-
set_null_restriction_eval_mode_for_test,
258-
with_null_restriction_eval_mode_for_test,
259-
};
260249
use datafusion_expr::{
261250
Operator, binary_expr, case, col, in_list, is_null, lit, when,
262251
};

datafusion/optimizer/src/utils/null_restriction.rs

Lines changed: 26 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ pub(super) fn syntactic_restrict_null_predicate(
3939
match syntactic_null_substitution_value(predicate, join_cols) {
4040
Some(NullSubstitutionValue::Boolean(value)) => Some(!value),
4141
Some(NullSubstitutionValue::Null) => Some(true),
42-
_ => None,
42+
Some(NullSubstitutionValue::NonNull) | None => None,
4343
}
4444
}
4545

@@ -49,7 +49,7 @@ fn not(value: Option<NullSubstitutionValue>) -> Option<NullSubstitutionValue> {
4949
Some(NullSubstitutionValue::Boolean(!value))
5050
}
5151
Some(NullSubstitutionValue::Null) => Some(NullSubstitutionValue::Null),
52-
_ => None,
52+
Some(NullSubstitutionValue::NonNull) | None => None,
5353
}
5454
}
5555

@@ -86,10 +86,15 @@ fn null_check_value(
8686
value: Option<NullSubstitutionValue>,
8787
is_not_null: bool,
8888
) -> Option<NullSubstitutionValue> {
89-
value.map(|value| match value {
90-
NullSubstitutionValue::Null => NullSubstitutionValue::Boolean(!is_not_null),
91-
_ => NullSubstitutionValue::Boolean(is_not_null),
92-
})
89+
match value {
90+
Some(NullSubstitutionValue::Null) => {
91+
Some(NullSubstitutionValue::Boolean(!is_not_null))
92+
}
93+
Some(NullSubstitutionValue::NonNull | NullSubstitutionValue::Boolean(_)) => {
94+
Some(NullSubstitutionValue::Boolean(is_not_null))
95+
}
96+
None => None,
97+
}
9398
}
9499

95100
fn null_if_contains_null(
@@ -101,6 +106,12 @@ fn null_if_contains_null(
101106
.then_some(NullSubstitutionValue::Null)
102107
}
103108

109+
fn strict_null_only(
110+
value: Option<NullSubstitutionValue>,
111+
) -> Option<NullSubstitutionValue> {
112+
value.filter(|value| matches!(value, NullSubstitutionValue::Null))
113+
}
114+
104115
fn syntactic_null_substitution_value(
105116
expr: &Expr,
106117
join_cols: &HashSet<&Column>,
@@ -139,17 +150,16 @@ fn syntactic_null_substitution_value(
139150
syntactic_null_substitution_value(between.low.as_ref(), join_cols),
140151
syntactic_null_substitution_value(between.high.as_ref(), join_cols),
141152
]),
142-
Expr::Cast(cast) => {
143-
syntactic_null_substitution_value(cast.expr.as_ref(), join_cols)
144-
.filter(|value| matches!(value, NullSubstitutionValue::Null))
145-
}
146-
Expr::TryCast(try_cast) => {
147-
syntactic_null_substitution_value(try_cast.expr.as_ref(), join_cols)
148-
.filter(|value| matches!(value, NullSubstitutionValue::Null))
149-
}
153+
Expr::Cast(cast) => strict_null_only(syntactic_null_substitution_value(
154+
cast.expr.as_ref(),
155+
join_cols,
156+
)),
157+
Expr::TryCast(try_cast) => strict_null_only(syntactic_null_substitution_value(
158+
try_cast.expr.as_ref(),
159+
join_cols,
160+
)),
150161
Expr::Negative(expr) => {
151-
syntactic_null_substitution_value(expr.as_ref(), join_cols)
152-
.filter(|value| matches!(value, NullSubstitutionValue::Null))
162+
strict_null_only(syntactic_null_substitution_value(expr.as_ref(), join_cols))
153163
}
154164
Expr::Like(like) | Expr::SimilarTo(like) => null_if_contains_null([
155165
syntactic_null_substitution_value(like.expr.as_ref(), join_cols),

0 commit comments

Comments
 (0)