Skip to content

Commit e417f4b

Browse files
committed
[SPARK-57858][SQL] Emit BIN BY scaled DISTRIBUTE columns as produced attributes
1 parent db81fa6 commit e417f4b

5 files changed

Lines changed: 124 additions & 20 deletions

File tree

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/BinByResolution.scala

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ package org.apache.spark.sql.catalyst.analysis
2020
import scala.collection.mutable
2121
import scala.util.control.NonFatal
2222

23-
import org.apache.spark.sql.catalyst.expressions.{Attribute, EmptyRow, Expression, ExprId}
23+
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, EmptyRow, Expression, ExprId}
24+
import org.apache.spark.sql.catalyst.plans.logical.BinByOutputAliases
2425
import org.apache.spark.sql.catalyst.util.DateTimeUtils
2526
import org.apache.spark.sql.errors.QueryCompilationErrors
2627
import org.apache.spark.sql.internal.SQLConf
@@ -154,4 +155,23 @@ object BinByResolution {
154155
timeZoneId = if (isLTZ) Some(sessionZone) else None
155156
)
156157
}
158+
159+
/**
160+
* Builds the three appended output attributes (`bin_start`, `bin_end`, `bin_distribute_ratio`),
161+
* applying any user renames from `aliases`. `rangeType` is the RANGE column type carried by
162+
* `bin_start` / `bin_end`.
163+
*/
164+
def appendedAttributesWithAliases(
165+
rangeType: DataType,
166+
aliases: BinByOutputAliases): Seq[Attribute] = Seq(
167+
AttributeReference(aliases.effectiveBinStart, rangeType, nullable = true)(),
168+
AttributeReference(aliases.effectiveBinEnd, rangeType, nullable = true)(),
169+
AttributeReference(aliases.effectiveBinRatio, DoubleType, nullable = true)())
170+
171+
/**
172+
* Mints a produced output attribute for each DISTRIBUTE input column: same name, type, and
173+
* nullability, but a fresh `ExprId` and no qualifier or metadata, so it is distinct from the input.
174+
*/
175+
def scaledDistributeAttributes(distributeColumns: Seq[Attribute]): Seq[Attribute] =
176+
distributeColumns.map(a => AttributeReference(a.name, a.dataType, a.nullable)())
157177
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -202,8 +202,9 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
202202
existingRelations,
203203
b,
204204
_.producedAttributes.map(_.exprId.id).toSeq,
205-
newBinBy => newBinBy.copy(appendedAttributes =
206-
newBinBy.appendedAttributes.map(_.newInstance())))
205+
newBinBy => newBinBy.copy(
206+
scaledDistributeColumns = newBinBy.scaledDistributeColumns.map(_.newInstance()),
207+
appendedAttributes = newBinBy.appendedAttributes.map(_.newInstance())))
207208

208209
case e: Expand =>
209210
deduplicateAndRenew[Expand](
@@ -470,6 +471,7 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
470471
case oldVersion: BinBy
471472
if oldVersion.producedAttributes.intersect(conflictingAttributes).nonEmpty =>
472473
val newVersion = oldVersion.copy(
474+
scaledDistributeColumns = oldVersion.scaledDistributeColumns.map(_.newInstance()),
473475
appendedAttributes = oldVersion.appendedAttributes.map(_.newInstance()))
474476
newVersion.copyTagsFrom(oldVersion)
475477
Seq((oldVersion, newVersion))

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveBinBy.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,14 +65,16 @@ object ResolveBinBy extends Rule[LogicalPlan] {
6565
originExpr = b.originExpr)
6666

6767
val appendedAttributes =
68-
BinBy.appendedAttributesWithAliases(parameters.rangeType, b.outputAliases)
68+
BinByResolution.appendedAttributesWithAliases(parameters.rangeType, b.outputAliases)
69+
val scaledDistributeColumns = BinByResolution.scaledDistributeAttributes(distributeAttributes)
6970

7071
BinBy(
7172
binWidthMicros = parameters.binWidthMicros,
7273
rangeStart = rangeStart,
7374
rangeEnd = rangeEnd,
7475
originMicros = parameters.originMicros,
7576
distributeColumns = distributeAttributes,
77+
scaledDistributeColumns = scaledDistributeColumns,
7678
appendedAttributes = appendedAttributes,
7779
child = child,
7880
timeZoneId = parameters.timeZoneId)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1798,8 +1798,14 @@ case class UnresolvedBinBy(
17981798
* @param rangeEnd Resolved attribute holding each row's window-end timestamp.
17991799
* @param originMicros Alignment anchor in microseconds since the epoch: the folded value of
18001800
* `ALIGN TO`, or the type-specific default when the clause is omitted.
1801-
* @param distributeColumns Resolved columns to proportionally redistribute.
1802-
* @param appendedAttributes The three output attributes appended after `child.output`.
1801+
* @param distributeColumns Resolved input columns to proportionally redistribute. Read by the
1802+
* operator to compute the rescaled values; not part of `output`.
1803+
* @param scaledDistributeColumns
1804+
* Produced output attributes (fresh `ExprId`s, same names and types as
1805+
* `distributeColumns`) holding the rescaled values. They take the place
1806+
* of `distributeColumns` in `output` so the rescaled value carries a
1807+
* distinct identity from the input and cannot be mistaken for it.
1808+
* @param appendedAttributes The three output attributes appended after the child columns.
18031809
* @param child Input relation.
18041810
* @param timeZoneId Captured session local time zone for LTZ inputs; `None` for NTZ.
18051811
* Required when `rangeStart.dataType` is `TimestampType`; must be
@@ -1811,6 +1817,7 @@ case class BinBy(
18111817
rangeEnd: Attribute,
18121818
originMicros: Long,
18131819
distributeColumns: Seq[Attribute],
1820+
scaledDistributeColumns: Seq[Attribute],
18141821
appendedAttributes: Seq[Attribute],
18151822
child: LogicalPlan,
18161823
timeZoneId: Option[String])
@@ -1822,25 +1829,28 @@ case class BinBy(
18221829
s"${rangeStart.dataType}, timeZoneId=$timeZoneId")
18231830
}
18241831

1825-
override def output: Seq[Attribute] = child.output ++ appendedAttributes
1832+
assert(distributeColumns.length == scaledDistributeColumns.length,
1833+
"BinBy requires one scaled attribute per DISTRIBUTE column, got " +
1834+
s"${distributeColumns.length} distribute columns and " +
1835+
s"${scaledDistributeColumns.length} scaled attributes")
18261836

1827-
override def producedAttributes: AttributeSet = AttributeSet(appendedAttributes)
1837+
// Each DISTRIBUTE input column is swapped in `output` for its scaled produced counterpart
1838+
// (fresh identity); the input itself stays on the node for the executor but is not forwarded.
1839+
private lazy val distributeReplacements: AttributeMap[Attribute] =
1840+
AttributeMap(distributeColumns.zip(scaledDistributeColumns))
1841+
1842+
override def output: Seq[Attribute] =
1843+
child.output.map(a => distributeReplacements.getOrElse(a, a)) ++ appendedAttributes
1844+
1845+
override def producedAttributes: AttributeSet =
1846+
AttributeSet(scaledDistributeColumns ++ appendedAttributes)
18281847

18291848
final override val nodePatterns: Seq[TreePattern] = Seq(BIN_BY)
18301849

18311850
override protected def withNewChildInternal(newChild: LogicalPlan): BinBy =
18321851
copy(child = newChild)
18331852
}
18341853

1835-
object BinBy {
1836-
def appendedAttributesWithAliases(
1837-
rangeType: DataType,
1838-
aliases: BinByOutputAliases): Seq[Attribute] = Seq(
1839-
AttributeReference(aliases.effectiveBinStart, rangeType, nullable = true)(),
1840-
AttributeReference(aliases.effectiveBinEnd, rangeType, nullable = true)(),
1841-
AttributeReference(aliases.effectiveBinRatio, DoubleType, nullable = true)())
1842-
}
1843-
18441854
/**
18451855
* A logical plan node for creating a logical limit, which is split into two separate logical nodes:
18461856
* a [[LocalLimit]], which is a partition local limit, followed by a [[GlobalLimit]].

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveBinBySuite.scala

Lines changed: 73 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,73 @@ class ResolveBinBySuite extends AnalysisTest {
202202
assert(bi.distributeColumns.map(_.exprId) == Seq(value.exprId))
203203
}
204204

205+
test("rescaled DISTRIBUTE columns are produced attributes that shadow the input") {
206+
val bi = ResolveBinBy.apply(unresolved()).asInstanceOf[BinBy]
207+
208+
// The input column is still read (held in distributeColumns) but is not forwarded by identity.
209+
assert(bi.distributeColumns.map(_.exprId) == Seq(value.exprId))
210+
assert(!bi.output.exists(_.exprId == value.exprId))
211+
212+
// The output `value` keeps its name, type, and position, but has a fresh exprId, and it is a
213+
// produced attribute. This is what prevents the rescaled value from being confused with the
214+
// input by any rule that reasons on exprId (predicate pushdown, constraints, CSE).
215+
val outValue = bi.output(ltzChild.output.indexWhere(_.exprId == value.exprId))
216+
assert(outValue.name == "value")
217+
assert(outValue.dataType == DoubleType)
218+
assert(outValue.exprId != value.exprId)
219+
assert(bi.scaledDistributeColumns.map(_.exprId) == Seq(outValue.exprId))
220+
assert(bi.producedAttributes.contains(outValue))
221+
222+
// Forwarded (non-distribute) columns keep their identity.
223+
assert(bi.output.exists(_.exprId == label.exprId))
224+
assert(bi.output.exists(_.exprId == tsStart.exprId))
225+
}
226+
227+
test("each of multiple DISTRIBUTE columns is replaced in place with a distinct fresh id") {
228+
val value2 = $"value2".double
229+
val child = LocalRelation(tsStart, tsEnd, value, value2, label)
230+
val bi = ResolveBinBy.apply(
231+
unresolved(child = child, distribute = Seq(value, value2))).asInstanceOf[BinBy]
232+
233+
assert(bi.distributeColumns.map(_.exprId) == Seq(value.exprId, value2.exprId))
234+
assert(bi.scaledDistributeColumns.length == 2)
235+
236+
// Each input column is replaced at its own position by a fresh-id, same-name attribute.
237+
Seq(value, value2).foreach { in =>
238+
val pos = child.output.indexWhere(_.exprId == in.exprId)
239+
val out = bi.output(pos)
240+
assert(out.name == in.name)
241+
assert(out.exprId != in.exprId)
242+
assert(!bi.output.exists(_.exprId == in.exprId))
243+
}
244+
245+
// The two scaled columns are distinct; non-distribute columns keep their identity.
246+
assert(bi.scaledDistributeColumns.map(_.exprId).distinct.length == 2)
247+
assert(bi.output.exists(_.exprId == label.exprId))
248+
assert(bi.output.exists(_.exprId == tsStart.exprId))
249+
}
250+
251+
test("rescaled DISTRIBUTE column drops the input qualifier and metadata (computed value)") {
252+
// The rescaled column is a computed value, not a rename, so it must not inherit the input's
253+
// qualifier or metadata (else stale value-derived metadata such as ML min/max could ride along).
254+
val md = new MetadataBuilder().putString("comment", "a measure").build()
255+
val qualifiedValue = AttributeReference("value", DoubleType, nullable = true, md)()
256+
val child = SubqueryAlias("m", LocalRelation(tsStart, tsEnd, qualifiedValue))
257+
val bi = ResolveBinBy.apply(
258+
unresolved(child = child, distribute = Seq(UnresolvedAttribute(Seq("m", "value")))))
259+
.asInstanceOf[BinBy]
260+
261+
// The resolved input carries the qualifier and metadata...
262+
assert(bi.distributeColumns.head.qualifier == Seq("m"))
263+
assert(bi.distributeColumns.head.metadata == md)
264+
265+
// ...but the produced output column drops both and has a fresh id.
266+
val outValue = bi.output.find(_.name == "value").get
267+
assert(outValue.exprId != qualifiedValue.exprId)
268+
assert(outValue.qualifier.isEmpty)
269+
assert(outValue.metadata == Metadata.empty)
270+
}
271+
205272
test("multipart identifiers disambiguate same-name columns across a JOIN") {
206273
val t1Start = AttributeReference("ts_start", TimestampType, nullable = true)()
207274
val t1End = AttributeReference("ts_end", TimestampType, nullable = true)()
@@ -330,9 +397,12 @@ class ResolveBinBySuite extends AnalysisTest {
330397

331398
val binBys = analyzed.collect { case b: BinBy => b }
332399
assert(binBys.size == 2, s"expected two BinBy nodes, got ${binBys.size}")
333-
val appendedExprIds = binBys.flatMap(_.appendedAttributes.map(_.exprId))
334-
assert(appendedExprIds.distinct.size == appendedExprIds.size,
335-
"appended BinBy attributes must have distinct exprIds across the two join sides")
400+
// All produced attributes (the scaled DISTRIBUTE columns plus the three appended ones) must be
401+
// renewed on one side, so both dedup phases have to cover them.
402+
val producedExprIds = binBys.flatMap(b =>
403+
(b.scaledDistributeColumns ++ b.appendedAttributes).map(_.exprId))
404+
assert(producedExprIds.distinct.size == producedExprIds.size,
405+
"produced BinBy attributes must have distinct exprIds across the two join sides")
336406
}
337407

338408
// `super.test` escapes the suite-wide flag-on wrapper; pin the flag off explicitly.

0 commit comments

Comments
 (0)