Skip to content

Commit 51653dd

Browse files
authored
feat: Native columnar to row conversion (Phase 2) (apache#3266)
1 parent 72496da commit 51653dd

8 files changed

Lines changed: 828 additions & 797 deletions

File tree

common/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import java.nio.channels.Channels
2626
import scala.jdk.CollectionConverters._
2727

2828
import org.apache.arrow.c.CDataDictionaryProvider
29-
import org.apache.arrow.vector.{BigIntVector, BitVector, DateDayVector, DecimalVector, FieldVector, FixedSizeBinaryVector, Float4Vector, Float8Vector, IntVector, SmallIntVector, TimeStampMicroTZVector, TimeStampMicroVector, TinyIntVector, ValueVector, VarBinaryVector, VarCharVector, VectorSchemaRoot}
29+
import org.apache.arrow.vector.{BigIntVector, BitVector, DateDayVector, DecimalVector, FieldVector, FixedSizeBinaryVector, Float4Vector, Float8Vector, IntVector, NullVector, SmallIntVector, TimeStampMicroTZVector, TimeStampMicroVector, TinyIntVector, ValueVector, VarBinaryVector, VarCharVector, VectorSchemaRoot}
3030
import org.apache.arrow.vector.complex.{ListVector, MapVector, StructVector}
3131
import org.apache.arrow.vector.dictionary.DictionaryProvider
3232
import org.apache.arrow.vector.ipc.ArrowStreamWriter
@@ -288,7 +288,7 @@ object Utils extends CometTypeShim {
288288
_: BigIntVector | _: Float4Vector | _: Float8Vector | _: VarCharVector |
289289
_: DecimalVector | _: DateDayVector | _: TimeStampMicroTZVector | _: VarBinaryVector |
290290
_: FixedSizeBinaryVector | _: TimeStampMicroVector | _: StructVector | _: ListVector |
291-
_: MapVector) =>
291+
_: MapVector | _: NullVector) =>
292292
v.asInstanceOf[FieldVector]
293293
case _ =>
294294
throw new SparkException(s"Unsupported Arrow Vector for $reason: ${valueVector.getClass}")

native/core/src/execution/columnar_to_row.rs

Lines changed: 635 additions & 772 deletions
Large diffs are not rendered by default.

spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,14 @@ package org.apache.comet.rules
2222
import org.apache.spark.sql.SparkSession
2323
import org.apache.spark.sql.catalyst.rules.Rule
2424
import org.apache.spark.sql.catalyst.util.sideBySide
25-
import org.apache.spark.sql.comet.{CometCollectLimitExec, CometColumnarToRowExec, CometNativeColumnarToRowExec, CometNativeWriteExec, CometPlan, CometSparkToColumnarExec}
25+
import org.apache.spark.sql.comet.{CometBatchScanExec, CometCollectLimitExec, CometColumnarToRowExec, CometNativeColumnarToRowExec, CometNativeWriteExec, CometPlan, CometScanExec, CometSparkToColumnarExec}
2626
import org.apache.spark.sql.comet.execution.shuffle.{CometColumnarShuffle, CometShuffleExchangeExec}
2727
import org.apache.spark.sql.execution.{ColumnarToRowExec, RowToColumnarExec, SparkPlan}
2828
import org.apache.spark.sql.execution.adaptive.QueryStageExec
2929
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
3030

3131
import org.apache.comet.CometConf
32+
import org.apache.comet.parquet.CometParquetScan
3233

3334
// This rule is responsible for eliminating redundant transitions between row-based and
3435
// columnar-based operators for Comet. Currently, three potential redundant transitions are:
@@ -139,12 +140,39 @@ case class EliminateRedundantTransitions(session: SparkSession) extends Rule[Spa
139140
private def createColumnarToRowExec(child: SparkPlan): SparkPlan = {
140141
val schema = child.schema
141142
val useNative = CometConf.COMET_NATIVE_COLUMNAR_TO_ROW_ENABLED.get() &&
142-
CometNativeColumnarToRowExec.supportsSchema(schema)
143+
CometNativeColumnarToRowExec.supportsSchema(schema) &&
144+
!hasScanUsingMutableBuffers(child)
143145

144146
if (useNative) {
145147
CometNativeColumnarToRowExec(child)
146148
} else {
147149
CometColumnarToRowExec(child)
148150
}
149151
}
152+
153+
/**
154+
* Checks if the plan contains a scan that uses mutable buffers. Native C2R is not compatible
155+
* with such scans because the buffers may be modified after C2R reads them.
156+
*
157+
* This includes:
158+
* - CometScanExec with native_comet scan implementation (V1 path) - uses BatchReader
159+
* - CometScanExec with native_iceberg_compat and partition columns - uses
160+
* ConstantColumnReader
161+
* - CometBatchScanExec with CometParquetScan (V2 Parquet path) - uses BatchReader
162+
*/
163+
private def hasScanUsingMutableBuffers(op: SparkPlan): Boolean = {
164+
op match {
165+
case c: QueryStageExec => hasScanUsingMutableBuffers(c.plan)
166+
case c: ReusedExchangeExec => hasScanUsingMutableBuffers(c.child)
167+
case _ =>
168+
op.exists {
169+
case scan: CometScanExec =>
170+
scan.scanImpl == CometConf.SCAN_NATIVE_COMET ||
171+
(scan.scanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT &&
172+
scan.relation.partitionSchema.nonEmpty)
173+
case scan: CometBatchScanExec => scan.scan.isInstanceOf[CometParquetScan]
174+
case _ => false
175+
}
176+
}
177+
}
150178
}

spark/src/main/scala/org/apache/spark/sql/comet/CometNativeColumnarToRowExec.scala

Lines changed: 134 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,25 @@
1919

2020
package org.apache.spark.sql.comet
2121

22-
import org.apache.spark.TaskContext
22+
import java.util.UUID
23+
import java.util.concurrent.{Future, TimeoutException, TimeUnit}
24+
25+
import scala.concurrent.Promise
26+
import scala.util.control.NonFatal
27+
28+
import org.apache.spark.{broadcast, SparkException, TaskContext}
2329
import org.apache.spark.rdd.RDD
2430
import org.apache.spark.sql.catalyst.InternalRow
2531
import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
2632
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
27-
import org.apache.spark.sql.execution.{ColumnarToRowTransition, SparkPlan}
33+
import org.apache.spark.sql.comet.util.{Utils => CometUtils}
34+
import org.apache.spark.sql.errors.QueryExecutionErrors
35+
import org.apache.spark.sql.execution.{ColumnarToRowTransition, SparkPlan, SQLExecution}
36+
import org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec
37+
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
2838
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
2939
import org.apache.spark.sql.types.StructType
30-
import org.apache.spark.util.Utils
40+
import org.apache.spark.util.{SparkFatalException, Utils}
3141

3242
import org.apache.comet.{CometConf, NativeColumnarToRowConverter}
3343

@@ -64,6 +74,116 @@ case class CometNativeColumnarToRowExec(child: SparkPlan)
6474
"numInputBatches" -> SQLMetrics.createMetric(sparkContext, "number of input batches"),
6575
"convertTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time in conversion"))
6676

77+
@transient
78+
private lazy val promise = Promise[broadcast.Broadcast[Any]]()
79+
80+
@transient
81+
private val timeout: Long = conf.broadcastTimeout
82+
83+
private val runId: UUID = UUID.randomUUID
84+
85+
private lazy val cometBroadcastExchange = findCometBroadcastExchange(child)
86+
87+
@transient
88+
lazy val relationFuture: Future[broadcast.Broadcast[Any]] = {
89+
SQLExecution.withThreadLocalCaptured[broadcast.Broadcast[Any]](
90+
session,
91+
CometBroadcastExchangeExec.executionContext) {
92+
try {
93+
// Setup a job group here so later it may get cancelled by groupId if necessary.
94+
sparkContext.setJobGroup(
95+
runId.toString,
96+
s"CometNativeColumnarToRow broadcast exchange (runId $runId)",
97+
interruptOnCancel = true)
98+
99+
val numOutputRows = longMetric("numOutputRows")
100+
val numInputBatches = longMetric("numInputBatches")
101+
val localSchema = this.schema
102+
val batchSize = CometConf.COMET_BATCH_SIZE.get()
103+
val broadcastColumnar = child.executeBroadcast()
104+
val serializedBatches =
105+
broadcastColumnar.value.asInstanceOf[Array[org.apache.spark.util.io.ChunkedByteBuffer]]
106+
107+
// Use native converter to convert columnar data to rows
108+
val converter = new NativeColumnarToRowConverter(localSchema, batchSize)
109+
try {
110+
val rows = serializedBatches.iterator
111+
.flatMap(CometUtils.decodeBatches(_, this.getClass.getSimpleName))
112+
.flatMap { batch =>
113+
numInputBatches += 1
114+
numOutputRows += batch.numRows()
115+
val result = converter.convert(batch)
116+
// Wrap iterator to close batch after consumption
117+
new Iterator[InternalRow] {
118+
override def hasNext: Boolean = {
119+
val hasMore = result.hasNext
120+
if (!hasMore) {
121+
batch.close()
122+
}
123+
hasMore
124+
}
125+
override def next(): InternalRow = result.next()
126+
}
127+
}
128+
129+
val mode = cometBroadcastExchange.get.mode
130+
val relation = mode.transform(rows, Some(numOutputRows.value))
131+
val broadcasted = sparkContext.broadcastInternal(relation, serializedOnly = true)
132+
val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
133+
SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, metrics.values.toSeq)
134+
promise.trySuccess(broadcasted)
135+
broadcasted
136+
} finally {
137+
converter.close()
138+
}
139+
} catch {
140+
// SPARK-24294: To bypass scala bug: https://github.com/scala/bug/issues/9554, we throw
141+
// SparkFatalException, which is a subclass of Exception. ThreadUtils.awaitResult
142+
// will catch this exception and re-throw the wrapped fatal throwable.
143+
case oe: OutOfMemoryError =>
144+
val ex = new SparkFatalException(oe)
145+
promise.tryFailure(ex)
146+
throw ex
147+
case e if !NonFatal(e) =>
148+
val ex = new SparkFatalException(e)
149+
promise.tryFailure(ex)
150+
throw ex
151+
case e: Throwable =>
152+
promise.tryFailure(e)
153+
throw e
154+
}
155+
}
156+
}
157+
158+
override def doExecuteBroadcast[T](): broadcast.Broadcast[T] = {
159+
if (cometBroadcastExchange.isEmpty) {
160+
throw new SparkException(
161+
"CometNativeColumnarToRowExec only supports doExecuteBroadcast when child contains a " +
162+
"CometBroadcastExchange, but got " + child)
163+
}
164+
165+
try {
166+
relationFuture.get(timeout, TimeUnit.SECONDS).asInstanceOf[broadcast.Broadcast[T]]
167+
} catch {
168+
case ex: TimeoutException =>
169+
logError(s"Could not execute broadcast in $timeout secs.", ex)
170+
if (!relationFuture.isDone) {
171+
sparkContext.cancelJobGroup(runId.toString)
172+
relationFuture.cancel(true)
173+
}
174+
throw QueryExecutionErrors.executeBroadcastTimeoutError(timeout, Some(ex))
175+
}
176+
}
177+
178+
private def findCometBroadcastExchange(op: SparkPlan): Option[CometBroadcastExchangeExec] = {
179+
op match {
180+
case b: CometBroadcastExchangeExec => Some(b)
181+
case b: BroadcastQueryStageExec => findCometBroadcastExchange(b.plan)
182+
case b: ReusedExchangeExec => findCometBroadcastExchange(b.child)
183+
case _ => op.children.collectFirst(Function.unlift(findCometBroadcastExchange))
184+
}
185+
}
186+
67187
override def doExecute(): RDD[InternalRow] = {
68188
val numOutputRows = longMetric("numOutputRows")
69189
val numInputBatches = longMetric("numInputBatches")
@@ -91,7 +211,17 @@ case class CometNativeColumnarToRowExec(child: SparkPlan)
91211
val result = converter.convert(batch)
92212
convertTime += System.nanoTime() - startTime
93213

94-
result
214+
// Wrap iterator to close batch after consumption
215+
new Iterator[InternalRow] {
216+
override def hasNext: Boolean = {
217+
val hasMore = result.hasNext
218+
if (!hasMore) {
219+
batch.close()
220+
}
221+
hasMore
222+
}
223+
override def next(): InternalRow = result.next()
224+
}
95225
}
96226
}
97227
}

spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ import org.apache.hadoop.fs.Path
3030
import org.apache.spark.sql.{CometTestBase, DataFrame, Row}
3131
import org.apache.spark.sql.catalyst.expressions.{Alias, Cast, FromUnixTime, Literal, TruncDate, TruncTimestamp}
3232
import org.apache.spark.sql.catalyst.optimizer.SimplifyExtractValueOps
33-
import org.apache.spark.sql.comet.{CometColumnarToRowExec, CometProjectExec}
34-
import org.apache.spark.sql.execution.{InputAdapter, ProjectExec, SparkPlan, WholeStageCodegenExec}
33+
import org.apache.spark.sql.comet.{CometNativeColumnarToRowExec, CometProjectExec}
34+
import org.apache.spark.sql.execution.{ProjectExec, SparkPlan}
3535
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
3636
import org.apache.spark.sql.functions._
3737
import org.apache.spark.sql.internal.SQLConf
@@ -1020,11 +1020,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
10201020
val query = sql(s"select cast(id as string) from $table")
10211021
val (_, cometPlan) = checkSparkAnswerAndOperator(query)
10221022
val project = cometPlan
1023-
.asInstanceOf[WholeStageCodegenExec]
1024-
.child
1025-
.asInstanceOf[CometColumnarToRowExec]
1026-
.child
1027-
.asInstanceOf[InputAdapter]
1023+
.asInstanceOf[CometNativeColumnarToRowExec]
10281024
.child
10291025
.asInstanceOf[CometProjectExec]
10301026
val id = project.expressions.head

spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo, He
3535
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateMode, BloomFilterAggregate}
3636
import org.apache.spark.sql.comet._
3737
import org.apache.spark.sql.comet.execution.shuffle.{CometColumnarShuffle, CometShuffleExchangeExec}
38-
import org.apache.spark.sql.execution.{CollectLimitExec, ProjectExec, SQLExecution, UnionExec}
38+
import org.apache.spark.sql.execution.{CollectLimitExec, ProjectExec, SparkPlan, SQLExecution, UnionExec}
3939
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, BroadcastQueryStageExec}
4040
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
4141
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec, ShuffleExchangeExec}
@@ -864,9 +864,11 @@ class CometExecSuite extends CometTestBase {
864864
checkSparkAnswerAndOperator(df)
865865

866866
// Before AQE: one CometBroadcastExchange, no CometColumnarToRow
867-
var columnarToRowExec = stripAQEPlan(df.queryExecution.executedPlan).collect {
868-
case s: CometColumnarToRowExec => s
869-
}
867+
var columnarToRowExec: Seq[SparkPlan] =
868+
stripAQEPlan(df.queryExecution.executedPlan).collect {
869+
case s: CometColumnarToRowExec => s
870+
case s: CometNativeColumnarToRowExec => s
871+
}
870872
assert(columnarToRowExec.isEmpty)
871873

872874
// Disable CometExecRule after the initial plan is generated. The CometSortMergeJoin and
@@ -880,14 +882,25 @@ class CometExecSuite extends CometTestBase {
880882
// After AQE: CometBroadcastExchange has to be converted to rows to conform to Spark
881883
// BroadcastHashJoin.
882884
val plan = stripAQEPlan(df.queryExecution.executedPlan)
883-
columnarToRowExec = plan.collect { case s: CometColumnarToRowExec =>
884-
s
885+
columnarToRowExec = plan.collect {
886+
case s: CometColumnarToRowExec => s
887+
case s: CometNativeColumnarToRowExec => s
885888
}
886889
assert(columnarToRowExec.length == 1)
887890

888-
// This ColumnarToRowExec should be the immediate child of BroadcastHashJoinExec
889-
val parent = plan.find(_.children.contains(columnarToRowExec.head))
890-
assert(parent.get.isInstanceOf[BroadcastHashJoinExec])
891+
// This ColumnarToRowExec should be a descendant of BroadcastHashJoinExec (possibly
892+
// wrapped by InputAdapter for codegen).
893+
val broadcastJoins = plan.collect { case b: BroadcastHashJoinExec => b }
894+
assert(broadcastJoins.nonEmpty, s"Expected BroadcastHashJoinExec in plan:\n$plan")
895+
val hasC2RDescendant = broadcastJoins.exists { join =>
896+
join.find {
897+
case _: CometColumnarToRowExec | _: CometNativeColumnarToRowExec => true
898+
case _ => false
899+
}.isDefined
900+
}
901+
assert(
902+
hasC2RDescendant,
903+
"BroadcastHashJoinExec should have a columnar-to-row descendant")
891904

892905
// There should be a CometBroadcastExchangeExec under CometColumnarToRowExec
893906
val broadcastQueryStage =

spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ abstract class CometTestBase
8080
conf.set(CometConf.COMET_ONHEAP_ENABLED.key, "true")
8181
conf.set(CometConf.COMET_EXEC_ENABLED.key, "true")
8282
conf.set(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key, "true")
83+
conf.set(CometConf.COMET_NATIVE_COLUMNAR_TO_ROW_ENABLED.key, "true")
8384
conf.set(CometConf.COMET_RESPECT_PARQUET_FILTER_PUSHDOWN.key, "true")
8485
conf.set(CometConf.COMET_SPARK_TO_ARROW_ENABLED.key, "true")
8586
conf.set(CometConf.COMET_NATIVE_SCAN_ENABLED.key, "true")

spark/src/test/scala/org/apache/spark/sql/comet/CometPlanChecker.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ trait CometPlanChecker {
4646
case _: CometNativeScanExec | _: CometScanExec | _: CometBatchScanExec |
4747
_: CometIcebergNativeScanExec =>
4848
case _: CometSinkPlaceHolder | _: CometScanWrapper =>
49-
case _: CometColumnarToRowExec =>
49+
case _: CometColumnarToRowExec | _: CometNativeColumnarToRowExec =>
5050
case _: CometSparkToColumnarExec =>
5151
case _: CometExec | _: CometShuffleExchangeExec =>
5252
case _: CometBroadcastExchangeExec =>

0 commit comments

Comments
 (0)