Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ package org.apache.spark.sql.catalyst.analysis
import scala.collection.mutable
import scala.util.control.NonFatal

import org.apache.spark.sql.catalyst.expressions.{Attribute, EmptyRow, Expression, ExprId}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, EmptyRow, Expression, ExprId}
import org.apache.spark.sql.catalyst.plans.logical.BinByOutputAliases
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.internal.SQLConf
Expand Down Expand Up @@ -154,4 +155,22 @@ object BinByResolution {
timeZoneId = if (isLTZ) Some(sessionZone) else None
)
}

/**
 * Creates the three attributes appended to the operator's output (`bin_start`, `bin_end`,
 * `bin_distribute_ratio`), each named by the corresponding effective alias in `aliases`.
 *
 * @param rangeType data type given to the bin-start and bin-end attributes
 * @param aliases   supplies the effective output names for the three attributes
 * @return the bin-start, bin-end and ratio attributes, in that order; all are nullable and the
 *         ratio is always `DoubleType`
 */
def appendedAttributesWithAliases(
    rangeType: DataType,
    aliases: BinByOutputAliases): Seq[Attribute] = {
  // Built one at a time, in output order, so each reference is minted in the same sequence.
  val binStart = AttributeReference(aliases.effectiveBinStart, rangeType, nullable = true)()
  val binEnd = AttributeReference(aliases.effectiveBinEnd, rangeType, nullable = true)()
  val binRatio = AttributeReference(aliases.effectiveBinRatio, DoubleType, nullable = true)()
  Seq(binStart, binEnd, binRatio)
}

/**
 * Creates one new output attribute per DISTRIBUTE input column. Each result copies the input's
 * name, data type and nullability, but is a newly constructed `AttributeReference`, so it is a
 * distinct attribute from the input it mirrors.
 *
 * NOTE(review): nullability is copied verbatim from the input, while the appended ratio
 * attribute is declared nullable. Confirm the rescaled value can never be null for a
 * non-nullable input.
 *
 * @param distributeColumns resolved DISTRIBUTE input attributes
 * @return the produced attributes, positionally aligned with `distributeColumns`
 */
def scaledDistributeAttributes(distributeColumns: Seq[Attribute]): Seq[Attribute] =
  for (input <- distributeColumns)
    yield AttributeReference(input.name, input.dataType, input.nullable)()
}
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,9 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
existingRelations,
b,
_.producedAttributes.map(_.exprId.id).toSeq,
newBinBy => newBinBy.copy(appendedAttributes =
newBinBy.appendedAttributes.map(_.newInstance())))
newBinBy => newBinBy.copy(
scaledDistributeColumns = newBinBy.scaledDistributeColumns.map(_.newInstance()),
appendedAttributes = newBinBy.appendedAttributes.map(_.newInstance())))

case e: Expand =>
deduplicateAndRenew[Expand](
Expand Down Expand Up @@ -470,6 +471,7 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
case oldVersion: BinBy
if oldVersion.producedAttributes.intersect(conflictingAttributes).nonEmpty =>
val newVersion = oldVersion.copy(
scaledDistributeColumns = oldVersion.scaledDistributeColumns.map(_.newInstance()),
appendedAttributes = oldVersion.appendedAttributes.map(_.newInstance()))
newVersion.copyTagsFrom(oldVersion)
Seq((oldVersion, newVersion))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,16 @@ object ResolveBinBy extends Rule[LogicalPlan] {
originExpr = b.originExpr)

val appendedAttributes =
BinBy.appendedAttributesWithAliases(parameters.rangeType, b.outputAliases)
BinByResolution.appendedAttributesWithAliases(parameters.rangeType, b.outputAliases)
val scaledDistributeColumns = BinByResolution.scaledDistributeAttributes(distributeAttributes)

BinBy(
binWidthMicros = parameters.binWidthMicros,
rangeStart = rangeStart,
rangeEnd = rangeEnd,
originMicros = parameters.originMicros,
distributeColumns = distributeAttributes,
scaledDistributeColumns = scaledDistributeColumns,
appendedAttributes = appendedAttributes,
child = child,
timeZoneId = parameters.timeZoneId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1798,8 +1798,13 @@ case class UnresolvedBinBy(
* @param rangeEnd Resolved attribute holding each row's window-end timestamp.
* @param originMicros Alignment anchor in microseconds since the epoch: the folded value of
* `ALIGN TO`, or the type-specific default when the clause is omitted.
* @param distributeColumns Resolved columns to proportionally redistribute.
* @param appendedAttributes The three output attributes appended after `child.output`.
* @param distributeColumns Resolved input columns to proportionally redistribute. Read by the
* operator to compute the rescaled values; not part of `output`.
* @param scaledDistributeColumns
* Produced output attributes holding the rescaled values (fresh
* `ExprId`s, same names/types as `distributeColumns`); they replace
* `distributeColumns` in `output`.
* @param appendedAttributes The three output attributes appended after the child columns.
* @param child Input relation.
* @param timeZoneId Captured session local time zone for LTZ inputs; `None` for NTZ.
* Required when `rangeStart.dataType` is `TimestampType`; must be
Expand All @@ -1811,6 +1816,7 @@ case class BinBy(
rangeEnd: Attribute,
originMicros: Long,
distributeColumns: Seq[Attribute],
scaledDistributeColumns: Seq[Attribute],
appendedAttributes: Seq[Attribute],
child: LogicalPlan,
timeZoneId: Option[String])
Expand All @@ -1822,25 +1828,27 @@ case class BinBy(
s"${rangeStart.dataType}, timeZoneId=$timeZoneId")
}

override def output: Seq[Attribute] = child.output ++ appendedAttributes
assert(distributeColumns.length == scaledDistributeColumns.length,
"BinBy requires one scaled attribute per DISTRIBUTE column, got " +
s"${distributeColumns.length} distribute columns and " +
s"${scaledDistributeColumns.length} scaled attributes")

override def producedAttributes: AttributeSet = AttributeSet(appendedAttributes)
// In `output`, each DISTRIBUTE input is replaced by its scaled produced counterpart.
private lazy val distributeReplacements: AttributeMap[Attribute] =
AttributeMap(distributeColumns.zip(scaledDistributeColumns))

override def output: Seq[Attribute] =
child.output.map(a => distributeReplacements.getOrElse(a, a)) ++ appendedAttributes

override def producedAttributes: AttributeSet =
AttributeSet(scaledDistributeColumns ++ appendedAttributes)

final override val nodePatterns: Seq[TreePattern] = Seq(BIN_BY)

override protected def withNewChildInternal(newChild: LogicalPlan): BinBy =
copy(child = newChild)
}

object BinBy {
  /**
   * Builds the three nullable attributes appended to the operator's output: bin start and bin
   * end (both of `rangeType`) followed by the distribute ratio (`DoubleType`). Names are taken
   * from the effective aliases in `aliases`.
   *
   * @param rangeType data type of the bin-start and bin-end attributes
   * @param aliases   supplies the effective output names
   * @return bin start, bin end, and ratio attributes, in that order
   */
  def appendedAttributesWithAliases(
      rangeType: DataType,
      aliases: BinByOutputAliases): Seq[Attribute] = {
    // (name, type) pairs in output order; every appended attribute is nullable.
    val namedTypes = Seq(
      aliases.effectiveBinStart -> rangeType,
      aliases.effectiveBinEnd -> rangeType,
      aliases.effectiveBinRatio -> DoubleType)
    namedTypes.map { case (attrName, attrType) =>
      AttributeReference(attrName, attrType, nullable = true)()
    }
  }
}

/**
* A logical plan node for creating a logical limit, which is split into two separate logical nodes:
* a [[LocalLimit]], which is a partition local limit, followed by a [[GlobalLimit]].
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,35 @@ class ResolveBinBySuite extends AnalysisTest {
assert(bi.distributeColumns.map(_.exprId) == Seq(value.exprId))
}

test("resolved BinBy emits the DISTRIBUTE column as a produced attribute replacing the input") {
  // Fixture: `value` is placed mid-schema (not last) and carries both a qualifier (via the
  // subquery alias) and metadata, so one plan exercises in-place replacement, produced
  // identity, and the dropping of qualifier/metadata.
  val md = new MetadataBuilder().putString("comment", "a measure").build()
  val valueMd = AttributeReference("value", DoubleType, nullable = true, md)()
  val child = SubqueryAlias("m", LocalRelation(tsStart, tsEnd, valueMd, label))
  val plan = unresolved(
    child = child,
    distribute = Seq(UnresolvedAttribute(Seq("m", "value"))))
  val bi = ResolveBinBy.apply(plan).asInstanceOf[BinBy]

  // The input column is still held in distributeColumns (qualifier and metadata intact) ...
  val distributeInput = bi.distributeColumns.head
  assert(distributeInput.qualifier == Seq("m"))
  assert(distributeInput.metadata == md)
  // ... but its identity is not forwarded to the output.
  assert(!bi.output.map(_.exprId).contains(valueMd.exprId))

  // At the input's original position sits a same-name attribute with a fresh exprId.
  val valuePosition = child.output.indexWhere(_.exprId == valueMd.exprId)
  val outValue = bi.output(valuePosition)
  assert(outValue.name == "value")
  assert(outValue.exprId != valueMd.exprId)
  assert(bi.scaledDistributeColumns.map(_.exprId) == Seq(outValue.exprId))
  assert(bi.producedAttributes.contains(outValue))

  // Being a computed value, the produced attribute has neither qualifier nor metadata.
  assert(outValue.qualifier.isEmpty)
  assert(outValue.metadata == Metadata.empty)

  // Columns that are not distributed pass through with their identity unchanged.
  val forwardedIds = bi.output.map(_.exprId)
  assert(forwardedIds.contains(label.exprId))
  assert(forwardedIds.contains(tsStart.exprId))
}

test("multipart identifiers disambiguate same-name columns across a JOIN") {
val t1Start = AttributeReference("ts_start", TimestampType, nullable = true)()
val t1End = AttributeReference("ts_end", TimestampType, nullable = true)()
Expand Down Expand Up @@ -330,9 +359,10 @@ class ResolveBinBySuite extends AnalysisTest {

val binBys = analyzed.collect { case b: BinBy => b }
assert(binBys.size == 2, s"expected two BinBy nodes, got ${binBys.size}")
val appendedExprIds = binBys.flatMap(_.appendedAttributes.map(_.exprId))
assert(appendedExprIds.distinct.size == appendedExprIds.size,
"appended BinBy attributes must have distinct exprIds across the two join sides")
val producedExprIds = binBys.flatMap(b =>
(b.scaledDistributeColumns ++ b.appendedAttributes).map(_.exprId))
assert(producedExprIds.distinct.size == producedExprIds.size,
"produced BinBy attributes must have distinct exprIds across the two join sides")
}

// `super.test` escapes the suite-wide flag-on wrapper; pin the flag off explicitly.
Expand Down