From 7e4c5c24920133c59a8ce5956e38ecdb24415d9a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nikolina=20Vrane=C5=A1?= Date: Wed, 1 Jul 2026 16:18:20 +0000 Subject: [PATCH] [SPARK-57858][SQL] Emit BIN BY scaled DISTRIBUTE columns as produced attributes --- .../catalyst/analysis/BinByResolution.scala | 21 ++++++++++- .../analysis/DeduplicateRelations.scala | 6 ++-- .../sql/catalyst/analysis/ResolveBinBy.scala | 4 ++- .../plans/logical/basicLogicalOperators.scala | 34 +++++++++++------- .../catalyst/analysis/ResolveBinBySuite.scala | 36 +++++++++++++++++-- 5 files changed, 81 insertions(+), 20 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/BinByResolution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/BinByResolution.scala index 890df8eb83fc5..d7dad79b07f60 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/BinByResolution.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/BinByResolution.scala @@ -20,7 +20,8 @@ package org.apache.spark.sql.catalyst.analysis import scala.collection.mutable import scala.util.control.NonFatal -import org.apache.spark.sql.catalyst.expressions.{Attribute, EmptyRow, Expression, ExprId} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, EmptyRow, Expression, ExprId} +import org.apache.spark.sql.catalyst.plans.logical.BinByOutputAliases import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.internal.SQLConf @@ -154,4 +155,22 @@ object BinByResolution { timeZoneId = if (isLTZ) Some(sessionZone) else None ) } + + /** + * Builds the three appended output attributes (`bin_start`, `bin_end`, `bin_distribute_ratio`), + * applying `aliases`; `rangeType` is the type of `bin_start` / `bin_end`. + */ + def appendedAttributesWithAliases( + rangeType: DataType, + aliases: BinByOutputAliases): Seq[Attribute] = Seq( + AttributeReference(aliases.effectiveBinStart, rangeType, nullable = true)(), + AttributeReference(aliases.effectiveBinEnd, rangeType, nullable = true)(), + AttributeReference(aliases.effectiveBinRatio, DoubleType, nullable = true)()) + + /** + * Mints a produced output attribute for each DISTRIBUTE input column: same name, type, and + * nullability, but a fresh `ExprId` so the rescaled value is a distinct attribute from the input. + */ + def scaledDistributeAttributes(distributeColumns: Seq[Attribute]): Seq[Attribute] = + distributeColumns.map(a => AttributeReference(a.name, a.dataType, a.nullable)()) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala index 57aede9805d7d..1fb703814fb91 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala @@ -202,8 +202,9 @@ object DeduplicateRelations extends Rule[LogicalPlan] { existingRelations, b, _.producedAttributes.map(_.exprId.id).toSeq, - newBinBy => newBinBy.copy(appendedAttributes = - newBinBy.appendedAttributes.map(_.newInstance()))) + newBinBy => newBinBy.copy( + scaledDistributeColumns = newBinBy.scaledDistributeColumns.map(_.newInstance()), + appendedAttributes = newBinBy.appendedAttributes.map(_.newInstance()))) case e: Expand => deduplicateAndRenew[Expand]( @@ -470,6 +471,7 @@ object DeduplicateRelations extends Rule[LogicalPlan] { case oldVersion: BinBy if oldVersion.producedAttributes.intersect(conflictingAttributes).nonEmpty => val newVersion = oldVersion.copy( + scaledDistributeColumns = oldVersion.scaledDistributeColumns.map(_.newInstance()), appendedAttributes = oldVersion.appendedAttributes.map(_.newInstance())) newVersion.copyTagsFrom(oldVersion) Seq((oldVersion, newVersion)) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveBinBy.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveBinBy.scala index 9cd225628d646..d90326cc3e0b1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveBinBy.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveBinBy.scala @@ -65,7 +65,8 @@ object ResolveBinBy extends Rule[LogicalPlan] { originExpr = b.originExpr) val appendedAttributes = - BinBy.appendedAttributesWithAliases(parameters.rangeType, b.outputAliases) + BinByResolution.appendedAttributesWithAliases(parameters.rangeType, b.outputAliases) + val scaledDistributeColumns = BinByResolution.scaledDistributeAttributes(distributeAttributes) BinBy( binWidthMicros = parameters.binWidthMicros, @@ -73,6 +74,7 @@ object ResolveBinBy extends Rule[LogicalPlan] { rangeEnd = rangeEnd, originMicros = parameters.originMicros, distributeColumns = distributeAttributes, + scaledDistributeColumns = scaledDistributeColumns, appendedAttributes = appendedAttributes, child = child, timeZoneId = parameters.timeZoneId) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 0246d0b204ffd..fc4646e29ea16 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -1798,8 +1798,13 @@ case class UnresolvedBinBy( * @param rangeEnd Resolved attribute holding each row's window-end timestamp. * @param originMicros Alignment anchor in microseconds since the epoch: the folded value of * `ALIGN TO`, or the type-specific default when the clause is omitted. - * @param distributeColumns Resolved columns to proportionally redistribute. - * @param appendedAttributes The three output attributes appended after `child.output`. + * @param distributeColumns Resolved input columns to proportionally redistribute. Read by the + * operator to compute the rescaled values; not part of `output`. + * @param scaledDistributeColumns + * Produced output attributes holding the rescaled values (fresh + * `ExprId`s, same names/types as `distributeColumns`); they replace + * `distributeColumns` in `output`. + * @param appendedAttributes The three output attributes appended after the child columns. * @param child Input relation. * @param timeZoneId Captured session local time zone for LTZ inputs; `None` for NTZ. * Required when `rangeStart.dataType` is `TimestampType`; must be @@ -1811,6 +1816,7 @@ case class BinBy( rangeEnd: Attribute, originMicros: Long, distributeColumns: Seq[Attribute], + scaledDistributeColumns: Seq[Attribute], appendedAttributes: Seq[Attribute], child: LogicalPlan, timeZoneId: Option[String]) @@ -1822,9 +1828,20 @@ case class BinBy( s"${rangeStart.dataType}, timeZoneId=$timeZoneId") } - override def output: Seq[Attribute] = child.output ++ appendedAttributes + assert(distributeColumns.length == scaledDistributeColumns.length, + "BinBy requires one scaled attribute per DISTRIBUTE column, got " + + s"${distributeColumns.length} distribute columns and " + + s"${scaledDistributeColumns.length} scaled attributes") - override def producedAttributes: AttributeSet = AttributeSet(appendedAttributes) + // In `output`, each DISTRIBUTE input is replaced by its scaled produced counterpart. + private lazy val distributeReplacements: AttributeMap[Attribute] = + AttributeMap(distributeColumns.zip(scaledDistributeColumns)) + + override def output: Seq[Attribute] = + child.output.map(a => distributeReplacements.getOrElse(a, a)) ++ appendedAttributes + + override def producedAttributes: AttributeSet = + AttributeSet(scaledDistributeColumns ++ appendedAttributes) final override val nodePatterns: Seq[TreePattern] = Seq(BIN_BY) @@ -1832,15 +1849,6 @@ case class BinBy( copy(child = newChild) } -object BinBy { - def appendedAttributesWithAliases( - rangeType: DataType, - aliases: BinByOutputAliases): Seq[Attribute] = Seq( - AttributeReference(aliases.effectiveBinStart, rangeType, nullable = true)(), - AttributeReference(aliases.effectiveBinEnd, rangeType, nullable = true)(), - AttributeReference(aliases.effectiveBinRatio, DoubleType, nullable = true)()) -} - /** * A logical plan node for creating a logical limit, which is split into two separate logical nodes: * a [[LocalLimit]], which is a partition local limit, followed by a [[GlobalLimit]]. diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveBinBySuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveBinBySuite.scala index 43134eed760d0..3ae6f217ec293 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveBinBySuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveBinBySuite.scala @@ -202,6 +202,35 @@ class ResolveBinBySuite extends AnalysisTest { assert(bi.distributeColumns.map(_.exprId) == Seq(value.exprId)) } + test("resolved BinBy emits the DISTRIBUTE column as a produced attribute replacing the input") { + // `value` sits mid-schema (not last) and carries a qualifier + metadata, so this covers + // in-place replacement, produced identity, and the qualifier/metadata drop in one go. + val md = new MetadataBuilder().putString("comment", "a measure").build() + val valueMd = AttributeReference("value", DoubleType, nullable = true, md)() + val child = SubqueryAlias("m", LocalRelation(tsStart, tsEnd, valueMd, label)) + val bi = ResolveBinBy.apply( + unresolved(child = child, distribute = Seq(UnresolvedAttribute(Seq("m", "value"))))) + .asInstanceOf[BinBy] + + // The input is read (held in distributeColumns) but not forwarded by identity. + assert(bi.distributeColumns.head.qualifier == Seq("m")) + assert(bi.distributeColumns.head.metadata == md) + assert(!bi.output.exists(_.exprId == valueMd.exprId)) + + // It is replaced at its own position by a fresh-id, same-name produced attribute. + val outValue = bi.output(child.output.indexWhere(_.exprId == valueMd.exprId)) + assert(outValue.name == "value" && outValue.exprId != valueMd.exprId) + assert(bi.scaledDistributeColumns.map(_.exprId) == Seq(outValue.exprId)) + assert(bi.producedAttributes.contains(outValue)) + + // The produced (computed) value drops the input's qualifier and metadata. + assert(outValue.qualifier.isEmpty && outValue.metadata == Metadata.empty) + + // Forwarded (non-distribute) columns keep their identity. + assert(bi.output.exists(_.exprId == label.exprId)) + assert(bi.output.exists(_.exprId == tsStart.exprId)) + } + test("multipart identifiers disambiguate same-name columns across a JOIN") { val t1Start = AttributeReference("ts_start", TimestampType, nullable = true)() val t1End = AttributeReference("ts_end", TimestampType, nullable = true)() @@ -330,9 +359,10 @@ class ResolveBinBySuite extends AnalysisTest { val binBys = analyzed.collect { case b: BinBy => b } assert(binBys.size == 2, s"expected two BinBy nodes, got ${binBys.size}") - val appendedExprIds = binBys.flatMap(_.appendedAttributes.map(_.exprId)) - assert(appendedExprIds.distinct.size == appendedExprIds.size, - "appended BinBy attributes must have distinct exprIds across the two join sides") + val producedExprIds = binBys.flatMap(b => + (b.scaledDistributeColumns ++ b.appendedAttributes).map(_.exprId)) + assert(producedExprIds.distinct.size == producedExprIds.size, + "produced BinBy attributes must have distinct exprIds across the two join sides") } // `super.test` escapes the suite-wide flag-on wrapper; pin the flag off explicitly.