Commit 8457567

peter-toth authored and dongjoon-hyun committed
[SPARK-56677][SQL] Propagate filter conditions through Join nodes in PlanMerger
### What changes were proposed in this pull request?

`PlanMerger` now supports filter propagation through `Join` nodes when merging similar subplans. Previously, two subplans that contained identical `Join` nodes and differed only in a filter applied to one of the join's children could not be merged. This PR adds the ability to propagate such filter conditions through a `Join` and into the parent `Aggregate`'s `FILTER` clause.

A new `filterSafeForJoin` helper checks that the filter originates from the non-nullable (preserved) side of the join: the left side of `LeftOuter`/`LeftSemi`/`LeftAnti`, the right side of `RightOuter`, or either side of `Inner`/`Cross`. `FullOuter` joins are not eligible.

The feature is gated by a new SQL config `spark.sql.optimizer.mergeSubplans.filterPropagation.throughJoin.enabled` (default: `false`).

### Why are the changes needed?

Without this change, scalar subqueries that differ only in a filter on one side of an identical join cannot be merged, resulting in redundant scans and compute. For example:

    SELECT
      (SELECT sum(key) FROM t1 JOIN t2 ON t1.id = t2.id),
      (SELECT sum(key) FROM t1 JOIN t2 ON t1.id = t2.id WHERE t2.b > 1)

Each subquery scans and joins `t1` and `t2` on its own even though both share the same base join. After this change a single merged scan is used and the second subquery's result is derived from it via an aggregate `FILTER` clause.

### Does this PR introduce _any_ user-facing change?

Yes. When `spark.sql.optimizer.mergeSubplans.filterPropagation.throughJoin.enabled` is set to `true`, the optimizer may merge scalar subqueries that were previously kept separate, reducing the number of scan and join operations.

### How was this patch tested?

Added unit tests in `MergeSubplansSuite`:

- Merge with filter on left inner join child
- Merge with filter on right inner join child
- No merge when both join children have independent filters
- Merge with filter on the preserved side of a `LeftSemi` join
- No merge when filter is on the non-output side of a `LeftSemi` join
- No merge when filter is on the nullable side of an outer join
- No merge when the feature is disabled via config

Added an integration test in `PlanMergeSuite` verifying correctness (`checkAnswer`) and plan shape (`SubqueryExec`/`ReusedSubqueryExec` counts) for both the enabled and disabled config cases, with and without AQE.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Sonnet 4.6

Closes #55628 from peter-toth/SPARK-56677-filter-propagation-through-join.

Authored-by: Peter Toth <peter.toth@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
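The rewrite described in the commit message can be illustrated without Spark. The following is a minimal sketch using plain Scala collections; the case classes, sample rows, and the `InnerJoinMergeSketch` object are hypothetical stand-ins for `t1` and `t2` and are not part of the patch. It compares the two original subqueries against a single join that carries the predicate `b > 1` as a boolean flag, which the aggregate then consumes the way a `FILTER (WHERE flag)` clause would.

```scala
// Hypothetical in-memory stand-ins for t1 and t2 (not taken from the commit).
final case class T1Row(id: Int, key: Long)
final case class T2Row(id: Int, b: Int)

object InnerJoinMergeSketch {
  val t1: Seq[T1Row] = Seq(T1Row(1, 10L), T1Row(2, 20L), T1Row(3, 30L))
  val t2: Seq[T2Row] = Seq(T2Row(1, 0), T2Row(2, 5), T2Row(3, 7))

  // Inner join on id, written as a plain nested loop.
  def innerJoin(left: Seq[T1Row], right: Seq[T2Row]): Seq[(T1Row, T2Row)] =
    for (l <- left; r <- right if l.id == r.id) yield (l, r)

  // Original plans: each subquery performs its own join.
  val unfilteredSum: Long = innerJoin(t1, t2).map(_._1.key).sum
  val filteredSum: Long = innerJoin(t1, t2.filter(_.b > 1)).map(_._1.key).sum

  // Merged plan: join once and expose the predicate as a boolean column.
  val mergedRows: Seq[(T1Row, T2Row, Boolean)] =
    innerJoin(t1, t2).map { case (l, r) => (l, r, r.b > 1) }

  // First aggregate ignores the flag; the second keeps only flagged rows,
  // mimicking sum(key) FILTER (WHERE flag).
  val mergedUnfiltered: Long = mergedRows.map(_._1.key).sum
  val mergedFiltered: Long = mergedRows.collect { case (l, _, true) => l.key }.sum
}
```

For an inner join both sides are preserved, so dropping a right row before the join and discarding the joined row afterwards remove exactly the same contributions from the sum.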
1 parent 3416213 commit 8457567

4 files changed

Lines changed: 361 additions & 13 deletions

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PlanMerger.scala

Lines changed: 77 additions & 13 deletions
@@ -21,6 +21,7 @@ import scala.collection.mutable
 
 import org.apache.spark.sql.catalyst.expressions.{Alias, And, Attribute, AttributeMap, Expression, If, Literal, NamedExpression, Or}
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
+import org.apache.spark.sql.catalyst.plans.{Cross, Inner, JoinType, LeftAnti, LeftOuter, LeftSemi, RightOuter}
 import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, Join, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.trees.TreeNodeTag
 import org.apache.spark.sql.internal.SQLConf
@@ -85,12 +86,24 @@ object PlanMerger {
  * When `filterPropagationEnabled` is true, non-grouping [[Aggregate]]s over the same base plan
  * with different [[Filter]] conditions can also be merged. The filter conditions are exposed as
  * boolean [[Project]] attributes and consumed at the [[Aggregate]] as FILTER clauses.
- * When both sides carry a [[Filter]] (the symmetric case), merging broadens the scan to
- * OR(f1, f2), which may reduce IO pruning. This path is separately gated by
+ * When both sides carry a [[Filter]] (the symmetric case), merging broadens the scan to OR(f1, f2),
+ * which may reduce IO pruning. This path is separately gated by
  * `symmetricFilterPropagationEnabled`.
  * When plans also differ in intermediate [[Project]] expressions, those are wrapped with
- * `If(filterAttr, expr, null)` to avoid computing the expression for rows that do not
- * match that side's filter condition.
+ * `If(filterAttr, expr, null)` to avoid computing the expression for rows that do not match that
+ * side's filter condition.
+ * Filter propagation also works through [[Join]] nodes: a filter on one child of the join produces
+ * a boolean attribute that flows through the join output to the enclosing [[Aggregate]].
+ * Propagation is only safe when the filter originates from the non-nullable side of the join, as
+ * enforced by `filterSafeForJoin`. When the filter is on the nullable side, the merged base plan
+ * restores rows that were filtered out of the nullable child, turning what were unmatched
+ * NULL-padded rows in the original plan into matched rows with real column values. This changes the
+ * result of expressions like `coalesce(col, default)` in the aggregate: an originally unmatched row
+ * would have contributed `default` via `coalesce(NULL, default)`, but in the merged plan it is
+ * matched, its real column value fails the filter, and `FILTER (WHERE false)` discards it entirely.
+ * Propagation is also skipped when both the left and right children simultaneously produce filter
+ * attributes, as combining them would require an additional AND alias above the join (not yet
+ * supported).
  *
  * {{{
  * // Input plans
@@ -120,7 +133,9 @@ class PlanMerger(
     filterPropagationEnabled: Boolean =
       SQLConf.get.getConf(SQLConf.MERGE_SUBPLANS_FILTER_PROPAGATION_ENABLED),
     symmetricFilterPropagationEnabled: Boolean =
-      SQLConf.get.getConf(SQLConf.MERGE_SUBPLANS_SYMMETRIC_FILTER_PROPAGATION_ENABLED)) {
+      SQLConf.get.getConf(SQLConf.MERGE_SUBPLANS_SYMMETRIC_FILTER_PROPAGATION_ENABLED),
+    filterPropagationThroughJoinEnabled: Boolean =
+      SQLConf.get.getConf(SQLConf.MERGE_SUBPLANS_FILTER_PROPAGATION_THROUGH_JOIN_ENABLED)) {
   val cache = mutable.ArrayBuffer.empty[MergedPlan]
 
   /**
@@ -224,7 +239,8 @@ class PlanMerger(
    * - Aggregate nodes: Combines aggregate expressions if grouping is identical and both
    *   support the same aggregate implementation (hash/object-hash/sort-based)
    * - Filter nodes: Only if filter conditions are identical
-   * - Join nodes: Only if join type, hints, and conditions are identical
+   * - Join nodes: Requires identical join type, hints, and conditions; filter propagation is
+   *   forwarded into the join's children so a filter difference on one child can still be merged
    *
    * @param newPlan The plan to merge into the cached plan.
    * @param cachedPlan The cached plan to merge with.
@@ -416,18 +432,37 @@ class PlanMerger(
         }
 
       case (np: Join, cp: Join) if np.joinType == cp.joinType && np.hint == cp.hint =>
-        // Filter propagation across joins is not yet supported.
-        tryMergePlans(np.left, cp.left, false).flatMap {
-          case TryMergeResult(mergedLeft, leftNPMapping, None, None) =>
-            tryMergePlans(np.right, cp.right, false).flatMap {
-              case TryMergeResult(mergedRight, rightNPMapping, None, None) =>
+        tryMergePlans(np.left, cp.left, filterPropagationSupported).flatMap {
+          case TryMergeResult(mergedLeft, leftNPMapping, leftNPFilter, leftCPFilter) =>
+            tryMergePlans(np.right, cp.right, filterPropagationSupported).flatMap {
+              case TryMergeResult(mergedRight, rightNPMapping, rightNPFilter, rightCPFilter)
+                  // If both children independently propagate filter attributes we would need to
+                  // AND them into a new alias above the join, which is not yet supported.
+                  if !(leftNPFilter.isDefined && rightNPFilter.isDefined) &&
+                    !(leftCPFilter.isDefined && rightCPFilter.isDefined) &&
+                    // Gate join-crossing filter propagation behind its own config flag.
+                    // When no filter attributes are in play the merge is unconditionally safe.
+                    (leftNPFilter.isEmpty && leftCPFilter.isEmpty &&
+                      rightNPFilter.isEmpty && rightCPFilter.isEmpty ||
+                      filterPropagationThroughJoinEnabled) &&
+                    // A filter attribute is only safe to propagate through a join if it comes
+                    // from the "preserved" (non-nullable) side. On the nullable side, unmatched
+                    // rows are NULL-padded so f=NULL, causing FILTER (WHERE f) to incorrectly
+                    // exclude rows that should contribute to the aggregate. Right-side
+                    // attributes are also absent from semi/anti join output.
+                    (leftNPFilter.isEmpty && leftCPFilter.isEmpty ||
+                      filterSafeForJoin(fromLeft = true, cp.joinType)) &&
+                    (rightNPFilter.isEmpty && rightCPFilter.isEmpty ||
+                      filterSafeForJoin(fromLeft = false, cp.joinType)) =>
                 val npMapping = leftNPMapping ++ rightNPMapping
                 val mappedNPCondition = np.condition.map(mapAttributes(_, npMapping))
                 // Comparing the canonicalized form is required to ignore different forms of the
                 // same expression and `AttributeReference.qualifier`s in `cp.condition`.
                 if (mappedNPCondition.map(_.canonicalized) == cp.condition.map(_.canonicalized)) {
-                  val mergedPlan = cp.withNewChildren(Seq(mergedLeft, mergedRight))
-                  Some(TryMergeResult(mergedPlan, npMapping))
+                  val npFilter = leftNPFilter.orElse(rightNPFilter)
+                  val cpFilter = leftCPFilter.orElse(rightCPFilter)
+                  Some(TryMergeResult(cp.withNewChildren(Seq(mergedLeft, mergedRight)), npMapping,
+                    npFilter, cpFilter))
                 } else {
                   None
                 }
@@ -441,6 +476,35 @@ class PlanMerger(
       })
   }
 
+  // Returns true when a filter attribute originating from `fromLeft` child of a join with
+  // `joinType` can be safely propagated through that join to a parent Aggregate.
+  //
+  // Two conditions must both hold:
+  // 1. The attribute is in the join's output (rules out the right side of LeftSemi/LeftAnti).
+  // 2. The filter must originate from the non-nullable ("preserved") side of the join.
+  //    When a filter is on the nullable side, the merged base plan no longer applies it to the
+  //    nullable child's scan, so rows that were previously absent from that child reappear as
+  //    matched join rows instead of unmatched NULL-padded rows. This changes aggregate
+  //    expressions that use the NULL-padded column: e.g. for `sum(coalesce(col, default))`, an
+  //    originally unmatched row would have contributed `default` via `coalesce(NULL, default)`,
+  //    but in the merged plan the row is now matched with its real column value, fails the
+  //    filter, and FILTER (WHERE false) discards it -- losing the `default` contribution
+  //    entirely.
+  private def filterSafeForJoin(fromLeft: Boolean, joinType: JoinType): Boolean =
+    if (fromLeft) {
+      // Left side is never NULL-padded in: Inner, LeftOuter, LeftSemi, LeftAnti, Cross.
+      joinType match {
+        case Inner | LeftOuter | LeftSemi | LeftAnti | Cross => true
+        case _ => false // RightOuter and FullOuter can NULL-pad the left side
+      }
+    } else {
+      // Right side is never NULL-padded AND is in the join output in: Inner, RightOuter, Cross.
+      joinType match {
+        case Inner | RightOuter | Cross => true
+        case _ => false // LeftOuter/FullOuter can NULL-pad right; LeftSemi/LeftAnti drop right
+      }
+    }
+
   private def mapAttributes[T <: Expression](expr: T, outputMap: AttributeMap[Attribute]) = {
     expr.transform {
       case a: Attribute => outputMap.getOrElse(a, a)
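The nullable-side hazard that `filterSafeForJoin` guards against can be reproduced with plain collections. This is a minimal sketch under stated assumptions: the row types, sample data, the predicate `col > 10`, and the `default` value of 100 are all hypothetical, and the left outer join is hand-written rather than Spark's. It evaluates `sum(coalesce(col, default))` once with the filter applied to the right (nullable) child before the join, and once in a naively merged form where the filter becomes a flag consumed after the join.

```scala
// Hypothetical rows for a left outer join; the right side is the nullable side.
final case class LeftRow(id: Int)
final case class RightRow(id: Int, col: Int)

object NullableSideSketch {
  val left: Seq[LeftRow] = Seq(LeftRow(1), LeftRow(2))
  val right: Seq[RightRow] = Seq(RightRow(1, 5), RightRow(2, 50))
  val default: Int = 100

  // Assumed filter on the nullable (right) child.
  def passes(r: RightRow): Boolean = r.col > 10

  // Left outer join: unmatched left rows are padded with None (SQL NULL).
  def leftOuterJoin(ls: Seq[LeftRow], rs: Seq[RightRow]): Seq[(LeftRow, Option[RightRow])] =
    ls.flatMap { l =>
      val matches = rs.filter(_.id == l.id)
      if (matches.isEmpty) Seq((l, Option.empty[RightRow]))
      else matches.map(r => (l, Option(r)))
    }

  // Original plan: filter the right child first, then sum(coalesce(col, default)).
  // Left row 1 loses its match, is NULL-padded, and contributes `default`.
  val original: Int =
    leftOuterJoin(left, right.filter(passes))
      .map { case (_, r) => r.map(_.col).getOrElse(default) }
      .sum

  // Naively merged plan: join the unfiltered right child, carry the predicate as a
  // flag, and aggregate only flagged rows. Left row 1 is now matched, its flag is
  // false, and its `default` contribution disappears.
  val naiveMerged: Int =
    leftOuterJoin(left, right)
      .map { case (_, r) => (r.map(_.col).getOrElse(default), r.exists(passes)) }
      .collect { case (value, true) => value }
      .sum
}
```

The two results differ, which is why the patch refuses to propagate a filter that originates on the NULL-padded side of an outer join.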

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 14 additions & 0 deletions
@@ -6608,6 +6608,20 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val MERGE_SUBPLANS_FILTER_PROPAGATION_THROUGH_JOIN_ENABLED =
+    buildConf("spark.sql.optimizer.mergeSubplans.filterPropagation.throughJoin.enabled")
+      .doc("When set to true, filter attributes can propagate through Join nodes during subplan " +
+        "merging, allowing subplans that differ only in their filter conditions and share a " +
+        "common join to be merged into a single scan. A filter attribute is only propagated " +
+        "through a join when it originates from the non-nullable (preserved) side: the left side " +
+        "of LeftOuter/LeftSemi/LeftAnti, the right side of RightOuter, or either side of " +
+        "Inner/Cross. FullOuter joins are never eligible. " +
+        s"Has no effect when ${MERGE_SUBPLANS_FILTER_PROPAGATION_ENABLED.key} is false.")
+      .version("4.2.0")
+      .withBindingPolicy(ConfigBindingPolicy.SESSION)
+      .booleanConf
+      .createWithDefault(false)
+
   val ERROR_MESSAGE_FORMAT = buildConf("spark.sql.error.messageFormat")
     .doc("When PRETTY, the error message consists of textual representation of error class, " +
       "message and query context. Stack traces are only shown for internal errors " +

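A session could opt in to the new behavior roughly as follows. This is a sketch only, assuming a Spark build that contains this commit (the config is declared for version 4.2.0) and a local `SparkSession`; the object name, the application name, and the temp views built from `spark.range` are illustrative and not taken from the patch. Per the config doc, the flag has no effect unless the parent filter-propagation config is also enabled; that key is not spelled out in this diff, so it is not set here.

```scala
import org.apache.spark.sql.SparkSession

object EnableThroughJoinSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("merge-subplans-through-join") // illustrative name
      .getOrCreate()

    // Key copied from the SQLConf entry added in this commit (default: false).
    spark.conf.set(
      "spark.sql.optimizer.mergeSubplans.filterPropagation.throughJoin.enabled", "true")

    // Hypothetical stand-ins for the t1 and t2 tables from the commit message.
    spark.range(3).selectExpr("id", "id * 10 AS key").createOrReplaceTempView("t1")
    spark.range(3).selectExpr("id", "id AS b").createOrReplaceTempView("t2")

    // The example query from the commit message; explain() prints the chosen plan
    // so the number of subqueries and joins can be inspected by hand.
    spark.sql(
      """SELECT
        |  (SELECT sum(key) FROM t1 JOIN t2 ON t1.id = t2.id),
        |  (SELECT sum(key) FROM t1 JOIN t2 ON t1.id = t2.id WHERE t2.b > 1)
        |""".stripMargin).explain()

    spark.stop()
  }
}
```

Whether the optimizer actually merges the two subqueries for these toy views depends on the other merge-subplans settings and on the plan shapes Spark produces, so the sketch makes no claim about the printed plan.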