Commit 62e4e16

[SPARK-56395][SQL][FOLLOWUP] Fix RewriteNearestByJoin nullability for LEFT OUTER
### What changes were proposed in this pull request?

Followup to #55682.

In `RewriteNearestByJoin`, when the `NEAREST BY` join type is `LEFT OUTER`, the synthesized `Join` widens the right-side columns to nullable. However, the synthesized `Aggregate` (and the optional `__ranking__` `Project`) built on top of that join still referenced the right-side columns via `right.output` and `rankingExpression` with their original (non-nullable) nullability. As a result the rewritten plan can declare a right-side column as non-nullable while its child -- the join -- produces it as nullable.

This PR maps the right-side attributes to their widened (nullable) form for `LEFT OUTER` and rewrites both the `CreateStruct(right.*)` and the ranking expression to use that widened nullability, so the rewritten plan's schema is consistent with its child. For `INNER` joins the right side is not widened, so this is a no-op.

### Why are the changes needed?

Without this fix the rewritten plan for a `LEFT OUTER NEAREST BY` declares right-side columns non-nullable while its join child produces them nullable -- an inconsistency that nullability/plan-integrity validation flags as a regression.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Added a regression test to `RewriteNearestByJoinSuite` that, for both `INNER` and `LEFT OUTER`, asserts every right-side attribute the synthesized `Aggregate` references agrees on nullability with its join child. The test uses **non-nullable** right-side columns so that `LEFT OUTER`'s widening is observable -- it fails without this fix (`x#.. declared nullable=false but its child produces nullable=true`) and passes with it, while `INNER` stays a no-op. The suite's expected-plan helper was also updated to mirror the widened nullability.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #56484 from cloud-fan/SPARK-56395-followup.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent 875982c commit 62e4e16
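
The remapping described in the commit message can be illustrated with a small, Spark-free sketch. Everything in it (`Attr`, `JoinKind`, `Expr`, `WidenSketch`) is a hypothetical stand-in invented for illustration, not a Catalyst class; the real rule uses `Attribute.withNullability`, `AttributeMap`, and `Expression.transform` as shown in the diff. The sketch only shows the idea: compute the right-side attributes as the join emits them, then rewrite every reference to a right-side attribute to that form.

```scala
// Toy stand-ins for Catalyst attributes; these are NOT Spark classes.
final case class Attr(name: String, id: Int, nullable: Boolean) {
  def withNullability(n: Boolean): Attr = copy(nullable = n)
}

sealed trait JoinKind
case object InnerJoin extends JoinKind
case object LeftOuterJoin extends JoinKind

// A tiny expression tree, just large enough to hold attribute references.
sealed trait Expr
final case class Ref(attr: Attr) extends Expr
final case class Add(left: Expr, right: Expr) extends Expr

object WidenSketch {
  // Right-side attributes as the join emits them: widened only for LEFT OUTER.
  def rightAttrsAfterJoin(kind: JoinKind, right: Seq[Attr]): Seq[Attr] = kind match {
    case LeftOuterJoin => right.map(_.withNullability(true))
    case InnerJoin     => right
  }

  // Replace each reference whose id appears in `widened` with the widened
  // attribute; references to other attributes are left untouched.
  def remap(e: Expr, widened: Map[Int, Attr]): Expr = e match {
    case Ref(a)    => Ref(widened.getOrElse(a.id, a))
    case Add(l, r) => Add(remap(l, widened), remap(r, widened))
  }
}
```

Under these assumptions, a ranking such as `Add(Ref(a), Ref(x))` with a non-nullable right-side `x` comes back referencing a nullable `x` for `LeftOuterJoin` and is returned unchanged for `InnerJoin`.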

2 files changed

Lines changed: 91 additions & 7 deletions

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteNearestByJoin.scala

Lines changed: 19 additions & 4 deletions
@@ -92,19 +92,34 @@ object RewriteNearestByJoin extends Rule[LogicalPlan] {
 // cross-product today -- should not silently bypass that choice.
 val join = Join(taggedLeft, right, joinType, None, JoinHint.NONE)
 
-val (aggInput, rankingForAgg) = if (!rankingExpression.deterministic) {
-  val rankingAlias = Alias(rankingExpression, "__ranking__")()
+// A LEFT OUTER join widens the right-side columns to nullable. The synthesized Aggregate
+// (and the optional `__ranking__` Project) below sit directly on top of this join, so every
+// reference to a right-side column must carry that widened nullability. Otherwise the
+// rewritten plan would declare a right column non-nullable while its child -- the join --
+// produces it as nullable, which plan-integrity validation flags as a nullability
+// regression. INNER joins do not widen the right side, so this is a no-op there.
+val rightAttrs = joinType match {
+  case LeftOuter => right.output.map(_.withNullability(true))
+  case _ => right.output
+}
+val rightNullabilityMap = AttributeMap(right.output.zip(rightAttrs))
+val rankingInJoin = rankingExpression.transform {
+  case a: Attribute => rightNullabilityMap.getOrElse(a, a)
+}
+
+val (aggInput, rankingForAgg) = if (!rankingInJoin.deterministic) {
+  val rankingAlias = Alias(rankingInJoin, "__ranking__")()
   Project(join.output :+ rankingAlias, join) -> rankingAlias.toAttribute
 } else {
-  join -> rankingExpression
+  join -> rankingInJoin
 }
 
 // 4. Aggregate grouped by `__qid`:
 //    - first(col) for every left column so it flows to the output.
 //    - max_by/min_by(struct(right.*), ranking, k) as `_matches`.
 //    The ranking expression references left and right columns directly; no outer
 //    reference is needed because both sides are present in the joined input.
-val rightStruct = CreateStruct(right.output)
+val rightStruct = CreateStruct(rightAttrs)
 // reverse = true  -> MIN_BY (smallest ranking value first, for DISTANCE)
 // reverse = false -> MAX_BY (largest ranking value first, for SIMILARITY)
 val reverse = direction match {

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RewriteNearestByJoinSuite.scala

Lines changed: 72 additions & 3 deletions
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.optimizer
 
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
-import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, CreateStruct, Inline, Literal, Rand, Uuid}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, AttributeReference, CreateStruct, Inline, Literal, Rand, Uuid}
 import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, First, MaxMinByK}
 import org.apache.spark.sql.catalyst.plans.{Inner, JoinType, LeftOuter, NearestByDistance, NearestBySimilarity, PlanTest}
 import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Generate, Join, JoinHint, LocalRelation, NearestByJoin, Project}
@@ -46,9 +46,19 @@ class RewriteNearestByJoinSuite extends PlanTest {
 val taggedLeft = Project(left.output :+ qidAlias, left)
 val join = Join(taggedLeft, right, joinType, None, JoinHint.NONE)
 
-val rightStruct = CreateStruct(right.output)
+// Mirror the rewrite: a LEFT OUTER join widens right-side columns to nullable, so the
+// struct and ranking that sit on top of the join must reference them with that nullability.
+val rightAttrs = joinType match {
+  case LeftOuter => right.output.map(_.withNullability(true))
+  case _ => right.output
+}
+val rightNullabilityMap = AttributeMap(right.output.zip(rightAttrs))
+val rankingInJoin = ranking.transform {
+  case a: Attribute => rightNullabilityMap.getOrElse(a, a)
+}
+val rightStruct = CreateStruct(rightAttrs)
 val topKAgg = MaxMinByK(
-  rightStruct, ranking, Literal(numResults), reverse = reverse)
+  rightStruct, rankingInJoin, Literal(numResults), reverse = reverse)
   .toAggregateExpression()
 val matchesAlias = Alias(topKAgg, "__nearest_matches__")()
 val firstLeftAggs = left.output.map { attr =>
@@ -145,6 +155,65 @@ class RewriteNearestByJoinSuite extends PlanTest {
   comparePlans(normalizeUuidSeed(rewritten), expected, checkAnalysis = false)
 }
 
+test("SPARK-56395: LEFT OUTER rewrite keeps right-side nullability consistent with its child") {
+  // A LEFT OUTER NEAREST BY widens the synthetic join's right-side columns to nullable. Every
+  // operator built on top of that join that references those columns (the `_matches` struct,
+  // the ranking) must carry the widened nullability -- otherwise the rewritten plan declares a
+  // column non-nullable while its child produces it as nullable, an internal inconsistency that
+  // no framework check catches (`LogicalPlanIntegrity` compares types `asNullable` and schemas
+  // `equalsIgnoreNullability`), so this assertion is the only guard. INNER does not widen the
+  // right side, so it stays a no-op.
+  //
+  // The right-side columns are declared non-nullable here: that is what makes LEFT OUTER's
+  // widening observable (with nullable columns the widening is a no-op and the bug is hidden).
+  // The ranking is exercised both deterministic (the reference lands directly in the Aggregate)
+  // and nondeterministic (the rule pre-materializes it into a `__ranking__` Project above the
+  // Join), so the widening is checked wherever the reference ends up.
+  val left = LocalRelation($"a".int, $"b".int)
+  val right = LocalRelation(
+    AttributeReference("x", IntegerType, nullable = false)(),
+    AttributeReference("y", IntegerType, nullable = false)())
+  val rankings = Seq(
+    "deterministic" -> (left.output(0) + right.output(0)),
+    "nondeterministic" -> (Rand(Literal(0L)) + right.output(0)))
+  for ((joinType, rightNullable) <- Seq(Inner -> false, LeftOuter -> true);
+       (label, ranking) <- rankings) {
+    val query = NearestByJoin(
+      left, right, joinType, approx = true, numResults = 1,
+      rankingExpression = ranking,
+      direction = NearestBySimilarity)
+
+    val rewritten = RewriteNearestByJoin(query.analyze)
+    val join = rewritten.collect { case j: Join => j }.head
+
+    // Sanity-check the fixture: the synthetic join widens its right-side output to nullable
+    // iff it is LEFT OUTER. (`join.right` is the right relation as it appears in the rewritten
+    // plan, so its ExprIds line up with the join's output.)
+    val rightExprIds = join.right.output.map(_.exprId).toSet
+    val joinRightOutput = join.output.filter(a => rightExprIds.contains(a.exprId))
+    assert(joinRightOutput.nonEmpty)
+    assert(joinRightOutput.forall(_.nullable == rightNullable))
+
+    // Whole-plan integrity: at every operator, an attribute reference whose ExprId is produced
+    // by one of that operator's children must agree with the child on nullability -- this is
+    // exactly what the fix corrects for LEFT OUTER. Walking the whole plan (rather than just
+    // the Aggregate) also covers the `__ranking__` Project that the nondeterministic path
+    // inserts above the Join, where the widened ranking reference lands.
+    rewritten.foreach { node =>
+      val childNullability =
+        node.children.flatMap(_.output).map(a => a.exprId -> a.nullable).toMap
+      node.expressions.foreach(_.foreach {
+        case ref: AttributeReference if childNullability.contains(ref.exprId) =>
+          assert(ref.nullable == childNullability(ref.exprId),
+            s"$joinType/$label: ${ref.name}#${ref.exprId.id} declared " +
+              s"nullable=${ref.nullable} but its child produces " +
+              s"nullable=${childNullability(ref.exprId)}")
+        case _ =>
+      })
+    }
+  }
+}
+
 test("synthetic Join uses the user's joinType") {
   // Locks in that the rewrite's synthetic Join carries the user's `joinType`
   // (Inner or LeftOuter).