Skip to content

Commit 7e4c5c2

Browse files
committed
[SPARK-57858][SQL] Emit BIN BY scaled DISTRIBUTE columns as produced attributes
1 parent db81fa6 commit 7e4c5c2

5 files changed

Lines changed: 81 additions & 20 deletions

File tree

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/BinByResolution.scala

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ package org.apache.spark.sql.catalyst.analysis
2020
import scala.collection.mutable
2121
import scala.util.control.NonFatal
2222

23-
import org.apache.spark.sql.catalyst.expressions.{Attribute, EmptyRow, Expression, ExprId}
23+
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, EmptyRow, Expression, ExprId}
24+
import org.apache.spark.sql.catalyst.plans.logical.BinByOutputAliases
2425
import org.apache.spark.sql.catalyst.util.DateTimeUtils
2526
import org.apache.spark.sql.errors.QueryCompilationErrors
2627
import org.apache.spark.sql.internal.SQLConf
@@ -154,4 +155,22 @@ object BinByResolution {
154155
timeZoneId = if (isLTZ) Some(sessionZone) else None
155156
)
156157
}
158+
159+
/**
160+
* Builds the three appended output attributes (`bin_start`, `bin_end`, `bin_distribute_ratio`),
161+
* applying `aliases`; `rangeType` is the type of `bin_start` / `bin_end`.
162+
*/
163+
def appendedAttributesWithAliases(
164+
rangeType: DataType,
165+
aliases: BinByOutputAliases): Seq[Attribute] = Seq(
166+
AttributeReference(aliases.effectiveBinStart, rangeType, nullable = true)(),
167+
AttributeReference(aliases.effectiveBinEnd, rangeType, nullable = true)(),
168+
AttributeReference(aliases.effectiveBinRatio, DoubleType, nullable = true)())
169+
170+
/**
171+
* Mints a produced output attribute for each DISTRIBUTE input column: same name, type, and
172+
* nullability, but a fresh `ExprId` so the rescaled value is a distinct attribute from the input.
* The input's qualifier and metadata are not carried over to the produced attribute.
173+
*/
174+
def scaledDistributeAttributes(distributeColumns: Seq[Attribute]): Seq[Attribute] =
175+
distributeColumns.map(a => AttributeReference(a.name, a.dataType, a.nullable)())
157176
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -202,8 +202,9 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
202202
existingRelations,
203203
b,
204204
_.producedAttributes.map(_.exprId.id).toSeq,
205-
newBinBy => newBinBy.copy(appendedAttributes =
206-
newBinBy.appendedAttributes.map(_.newInstance())))
205+
newBinBy => newBinBy.copy(
206+
scaledDistributeColumns = newBinBy.scaledDistributeColumns.map(_.newInstance()),
207+
appendedAttributes = newBinBy.appendedAttributes.map(_.newInstance())))
207208

208209
case e: Expand =>
209210
deduplicateAndRenew[Expand](
@@ -470,6 +471,7 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
470471
case oldVersion: BinBy
471472
if oldVersion.producedAttributes.intersect(conflictingAttributes).nonEmpty =>
472473
val newVersion = oldVersion.copy(
474+
scaledDistributeColumns = oldVersion.scaledDistributeColumns.map(_.newInstance()),
473475
appendedAttributes = oldVersion.appendedAttributes.map(_.newInstance()))
474476
newVersion.copyTagsFrom(oldVersion)
475477
Seq((oldVersion, newVersion))

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveBinBy.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,14 +65,16 @@ object ResolveBinBy extends Rule[LogicalPlan] {
6565
originExpr = b.originExpr)
6666

6767
val appendedAttributes =
68-
BinBy.appendedAttributesWithAliases(parameters.rangeType, b.outputAliases)
68+
BinByResolution.appendedAttributesWithAliases(parameters.rangeType, b.outputAliases)
69+
val scaledDistributeColumns = BinByResolution.scaledDistributeAttributes(distributeAttributes)
6970

7071
BinBy(
7172
binWidthMicros = parameters.binWidthMicros,
7273
rangeStart = rangeStart,
7374
rangeEnd = rangeEnd,
7475
originMicros = parameters.originMicros,
7576
distributeColumns = distributeAttributes,
77+
scaledDistributeColumns = scaledDistributeColumns,
7678
appendedAttributes = appendedAttributes,
7779
child = child,
7880
timeZoneId = parameters.timeZoneId)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1798,8 +1798,13 @@ case class UnresolvedBinBy(
17981798
* @param rangeEnd Resolved attribute holding each row's window-end timestamp.
17991799
* @param originMicros Alignment anchor in microseconds since the epoch: the folded value of
18001800
* `ALIGN TO`, or the type-specific default when the clause is omitted.
1801-
* @param distributeColumns Resolved columns to proportionally redistribute.
1802-
* @param appendedAttributes The three output attributes appended after `child.output`.
1801+
* @param distributeColumns Resolved input columns to proportionally redistribute. Read by the
1802+
* operator to compute the rescaled values; not part of `output`.
1803+
* @param scaledDistributeColumns
1804+
* Produced output attributes holding the rescaled values (fresh
1805+
* `ExprId`s, same names/types as `distributeColumns`); they replace
1806+
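* `distributeColumns` in `output`. Must contain exactly one entry per
* `distributeColumns` element, in the same order (paired by position).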
1807+
* @param appendedAttributes The three output attributes appended after the child columns.
18031808
* @param child Input relation.
18041809
* @param timeZoneId Captured session local time zone for LTZ inputs; `None` for NTZ.
18051810
* Required when `rangeStart.dataType` is `TimestampType`; must be
@@ -1811,6 +1816,7 @@ case class BinBy(
18111816
rangeEnd: Attribute,
18121817
originMicros: Long,
18131818
distributeColumns: Seq[Attribute],
1819+
scaledDistributeColumns: Seq[Attribute],
18141820
appendedAttributes: Seq[Attribute],
18151821
child: LogicalPlan,
18161822
timeZoneId: Option[String])
@@ -1822,25 +1828,27 @@ case class BinBy(
18221828
s"${rangeStart.dataType}, timeZoneId=$timeZoneId")
18231829
}
18241830

1825-
override def output: Seq[Attribute] = child.output ++ appendedAttributes
1831+
assert(distributeColumns.length == scaledDistributeColumns.length,
1832+
"BinBy requires one scaled attribute per DISTRIBUTE column, got " +
1833+
s"${distributeColumns.length} distribute columns and " +
1834+
s"${scaledDistributeColumns.length} scaled attributes")
18261835

1827-
override def producedAttributes: AttributeSet = AttributeSet(appendedAttributes)
1836+
// In `output`, each DISTRIBUTE input is replaced by its scaled produced counterpart.
1837+
private lazy val distributeReplacements: AttributeMap[Attribute] =
1838+
AttributeMap(distributeColumns.zip(scaledDistributeColumns))
1839+
1840+
override def output: Seq[Attribute] =
1841+
child.output.map(a => distributeReplacements.getOrElse(a, a)) ++ appendedAttributes
1842+
1843+
override def producedAttributes: AttributeSet =
1844+
AttributeSet(scaledDistributeColumns ++ appendedAttributes)
18281845

18291846
final override val nodePatterns: Seq[TreePattern] = Seq(BIN_BY)
18301847

18311848
override protected def withNewChildInternal(newChild: LogicalPlan): BinBy =
18321849
copy(child = newChild)
18331850
}
18341851

1835-
object BinBy {
1836-
def appendedAttributesWithAliases(
1837-
rangeType: DataType,
1838-
aliases: BinByOutputAliases): Seq[Attribute] = Seq(
1839-
AttributeReference(aliases.effectiveBinStart, rangeType, nullable = true)(),
1840-
AttributeReference(aliases.effectiveBinEnd, rangeType, nullable = true)(),
1841-
AttributeReference(aliases.effectiveBinRatio, DoubleType, nullable = true)())
1842-
}
1843-
18441852
/**
18451853
* A logical plan node for creating a logical limit, which is split into two separate logical nodes:
18461854
* a [[LocalLimit]], which is a partition local limit, followed by a [[GlobalLimit]].

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveBinBySuite.scala

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,35 @@ class ResolveBinBySuite extends AnalysisTest {
202202
assert(bi.distributeColumns.map(_.exprId) == Seq(value.exprId))
203203
}
204204

205+
test("resolved BinBy emits the DISTRIBUTE column as a produced attribute replacing the input") {
206+
// `value` sits mid-schema (not last) and carries a qualifier + metadata, so this covers
207+
// in-place replacement, produced identity, and the qualifier/metadata drop in one go.
208+
val md = new MetadataBuilder().putString("comment", "a measure").build()
209+
val valueMd = AttributeReference("value", DoubleType, nullable = true, md)()
210+
val child = SubqueryAlias("m", LocalRelation(tsStart, tsEnd, valueMd, label))
211+
val bi = ResolveBinBy.apply(
212+
unresolved(child = child, distribute = Seq(UnresolvedAttribute(Seq("m", "value")))))
213+
.asInstanceOf[BinBy]
214+
215+
// The input is read (held in distributeColumns) but not forwarded by identity.
216+
assert(bi.distributeColumns.head.qualifier == Seq("m"))
217+
assert(bi.distributeColumns.head.metadata == md)
218+
assert(!bi.output.exists(_.exprId == valueMd.exprId))
219+
220+
// It is replaced at its own position by a fresh-id, same-name produced attribute.
221+
val outValue = bi.output(child.output.indexWhere(_.exprId == valueMd.exprId))
222+
assert(outValue.name == "value" && outValue.exprId != valueMd.exprId)
223+
assert(bi.scaledDistributeColumns.map(_.exprId) == Seq(outValue.exprId))
224+
assert(bi.producedAttributes.contains(outValue))
225+
226+
// The produced (computed) value drops the input's qualifier and metadata.
227+
assert(outValue.qualifier.isEmpty && outValue.metadata == Metadata.empty)
228+
229+
// Forwarded (non-distribute) columns keep their identity.
230+
assert(bi.output.exists(_.exprId == label.exprId))
231+
assert(bi.output.exists(_.exprId == tsStart.exprId))
232+
}
233+
205234
test("multipart identifiers disambiguate same-name columns across a JOIN") {
206235
val t1Start = AttributeReference("ts_start", TimestampType, nullable = true)()
207236
val t1End = AttributeReference("ts_end", TimestampType, nullable = true)()
@@ -330,9 +359,10 @@ class ResolveBinBySuite extends AnalysisTest {
330359

331360
val binBys = analyzed.collect { case b: BinBy => b }
332361
assert(binBys.size == 2, s"expected two BinBy nodes, got ${binBys.size}")
333-
val appendedExprIds = binBys.flatMap(_.appendedAttributes.map(_.exprId))
334-
assert(appendedExprIds.distinct.size == appendedExprIds.size,
335-
"appended BinBy attributes must have distinct exprIds across the two join sides")
362+
val producedExprIds = binBys.flatMap(b =>
363+
(b.scaledDistributeColumns ++ b.appendedAttributes).map(_.exprId))
364+
assert(producedExprIds.distinct.size == producedExprIds.size,
365+
"produced BinBy attributes must have distinct exprIds across the two join sides")
336366
}
337367

338368
// `super.test` escapes the suite-wide flag-on wrapper; pin the flag off explicitly.

0 commit comments

Comments
 (0)