Skip to content

Commit 22fdec9

Browse files
authored
feat: Enable native columnar-to-row by default (#3299)
1 parent adc5013 commit 22fdec9

5 files changed

Lines changed: 67 additions & 35 deletions

File tree

common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -304,9 +304,9 @@ object CometConf extends ShimCometConf {
304304
"Whether to enable native columnar to row conversion. When enabled, Comet will use " +
305305
"native Rust code to convert Arrow columnar data to Spark UnsafeRow format instead " +
306306
"of the JVM implementation. This can improve performance for queries that need to " +
307-
"convert between columnar and row formats. This is an experimental feature.")
307+
"convert between columnar and row formats.")
308308
.booleanConf
309-
.createWithDefault(false)
309+
.createWithDefault(true)
310310

311311
val COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED: ConfigEntry[Boolean] =
312312
conf("spark.comet.exec.sortMergeJoinWithJoinFilter.enabled")

dev/diffs/3.4.3.diff

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
diff --git a/pom.xml b/pom.xml
2-
index d3544881af1..07d1ed97925 100644
2+
index d3544881af1..9c16099090c 100644
33
--- a/pom.xml
44
+++ b/pom.xml
55
@@ -148,6 +148,8 @@
@@ -1434,25 +1434,26 @@ index eec396b2e39..bf3f1c769d6 100644
14341434
nums.createOrReplaceTempView("nums")
14351435

14361436
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
1437-
index b14f4a405f6..ab7baf434a5 100644
1437+
index b14f4a405f6..90bed10eca9 100644
14381438
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
14391439
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
14401440
@@ -23,6 +23,7 @@ import org.apache.spark.sql.QueryTest
14411441
import org.apache.spark.sql.catalyst.InternalRow
14421442
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
14431443
import org.apache.spark.sql.catalyst.plans.logical.Deduplicate
1444-
+import org.apache.spark.sql.comet.CometColumnarToRowExec
1444+
+import org.apache.spark.sql.comet.{CometColumnarToRowExec, CometNativeColumnarToRowExec}
14451445
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
14461446
import org.apache.spark.sql.internal.SQLConf
14471447
import org.apache.spark.sql.test.SharedSparkSession
1448-
@@ -131,7 +132,10 @@ class SparkPlanSuite extends QueryTest with SharedSparkSession {
1448+
@@ -131,7 +132,11 @@ class SparkPlanSuite extends QueryTest with SharedSparkSession {
14491449
spark.range(1).write.parquet(path.getAbsolutePath)
14501450
val df = spark.read.parquet(path.getAbsolutePath)
14511451
val columnarToRowExec =
14521452
- df.queryExecution.executedPlan.collectFirst { case p: ColumnarToRowExec => p }.get
14531453
+ df.queryExecution.executedPlan.collectFirst {
14541454
+ case p: ColumnarToRowExec => p
14551455
+ case p: CometColumnarToRowExec => p
1456+
+ case p: CometNativeColumnarToRowExec => p
14561457
+ }.get
14571458
try {
14581459
spark.range(1).foreach { _ =>
@@ -2384,7 +2385,7 @@ index d083cac48ff..3c11bcde807 100644
23842385
import testImplicits._
23852386

23862387
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
2387-
index 266bb343526..6675cf7b636 100644
2388+
index 266bb343526..e58a2f49eb9 100644
23882389
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
23892390
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
23902391
@@ -19,15 +19,18 @@ package org.apache.spark.sql.sources
@@ -2441,7 +2442,7 @@ index 266bb343526..6675cf7b636 100644
24412442

24422443
val bucketColumnType = bucketedDataFrame.schema.apply(bucketColumnIndex).dataType
24432444
val rowsWithInvalidBuckets = fileScan.execute().filter(row => {
2444-
@@ -451,28 +463,49 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
2445+
@@ -451,28 +463,54 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
24452446
val joinOperator = if (joined.sqlContext.conf.adaptiveExecutionEnabled) {
24462447
val executedPlan =
24472448
joined.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
@@ -2472,6 +2473,11 @@ index 266bb343526..6675cf7b636 100644
24722473
+ case s: SortMergeJoinExec => s
24732474
+ case o => fail(s"expected SortMergeJoinExec, but found\n$o")
24742475
+ }
2476+
+ case CometNativeColumnarToRowExec(child) =>
2477+
+ child.asInstanceOf[CometSortMergeJoinExec].originalPlan match {
2478+
+ case s: SortMergeJoinExec => s
2479+
+ case o => fail(s"expected SortMergeJoinExec, but found\n$o")
2480+
+ }
24752481
+ case o => fail(s"expected SortMergeJoinExec, but found\n$o")
24762482
+ }
24772483
}
@@ -2499,7 +2505,7 @@ index 266bb343526..6675cf7b636 100644
24992505
s"expected sort in the right child to be $sortRight but found\n${joinOperator.right}")
25002506

25012507
// check the output partitioning
2502-
@@ -835,11 +868,11 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
2508+
@@ -835,11 +873,11 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
25032509
df1.write.format("parquet").bucketBy(8, "i").saveAsTable("bucketed_table")
25042510

25052511
val scanDF = spark.table("bucketed_table").select("j")
@@ -2513,7 +2519,7 @@ index 266bb343526..6675cf7b636 100644
25132519
checkAnswer(aggDF, df1.groupBy("j").agg(max("k")))
25142520
}
25152521
}
2516-
@@ -894,7 +927,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
2522+
@@ -894,7 +932,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
25172523
}
25182524

25192525
test("SPARK-29655 Read bucketed tables obeys spark.sql.shuffle.partitions") {
@@ -2524,7 +2530,7 @@ index 266bb343526..6675cf7b636 100644
25242530
SQLConf.SHUFFLE_PARTITIONS.key -> "5",
25252531
SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "7") {
25262532
val bucketSpec = Some(BucketSpec(6, Seq("i", "j"), Nil))
2527-
@@ -913,7 +949,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
2533+
@@ -913,7 +954,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
25282534
}
25292535

25302536
test("SPARK-32767 Bucket join should work if SHUFFLE_PARTITIONS larger than bucket number") {
@@ -2535,7 +2541,7 @@ index 266bb343526..6675cf7b636 100644
25352541
SQLConf.SHUFFLE_PARTITIONS.key -> "9",
25362542
SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "10") {
25372543

2538-
@@ -943,7 +982,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
2544+
@@ -943,7 +987,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
25392545
}
25402546

25412547
test("bucket coalescing eliminates shuffle") {
@@ -2546,7 +2552,7 @@ index 266bb343526..6675cf7b636 100644
25462552
SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true",
25472553
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
25482554
// The side with bucketedTableTestSpec1 will be coalesced to have 4 output partitions.
2549-
@@ -1026,15 +1068,23 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
2555+
@@ -1026,15 +1073,23 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
25502556
expectedNumShuffles: Int,
25512557
expectedCoalescedNumBuckets: Option[Int]): Unit = {
25522558
val plan = sql(query).queryExecution.executedPlan

dev/diffs/3.5.7.diff

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
diff --git a/pom.xml b/pom.xml
2-
index a0e25ce4d8d..29d3b93f994 100644
2+
index a0e25ce4d8d..b95fba458f2 100644
33
--- a/pom.xml
44
+++ b/pom.xml
55
@@ -152,6 +152,8 @@
@@ -1402,25 +1402,26 @@ index eec396b2e39..bf3f1c769d6 100644
14021402
nums.createOrReplaceTempView("nums")
14031403

14041404
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
1405-
index b14f4a405f6..ab7baf434a5 100644
1405+
index b14f4a405f6..90bed10eca9 100644
14061406
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
14071407
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
14081408
@@ -23,6 +23,7 @@ import org.apache.spark.sql.QueryTest
14091409
import org.apache.spark.sql.catalyst.InternalRow
14101410
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
14111411
import org.apache.spark.sql.catalyst.plans.logical.Deduplicate
1412-
+import org.apache.spark.sql.comet.CometColumnarToRowExec
1412+
+import org.apache.spark.sql.comet.{CometColumnarToRowExec, CometNativeColumnarToRowExec}
14131413
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
14141414
import org.apache.spark.sql.internal.SQLConf
14151415
import org.apache.spark.sql.test.SharedSparkSession
1416-
@@ -131,7 +132,10 @@ class SparkPlanSuite extends QueryTest with SharedSparkSession {
1416+
@@ -131,7 +132,11 @@ class SparkPlanSuite extends QueryTest with SharedSparkSession {
14171417
spark.range(1).write.parquet(path.getAbsolutePath)
14181418
val df = spark.read.parquet(path.getAbsolutePath)
14191419
val columnarToRowExec =
14201420
- df.queryExecution.executedPlan.collectFirst { case p: ColumnarToRowExec => p }.get
14211421
+ df.queryExecution.executedPlan.collectFirst {
14221422
+ case p: ColumnarToRowExec => p
14231423
+ case p: CometColumnarToRowExec => p
1424+
+ case p: CometNativeColumnarToRowExec => p
14241425
+ }.get
14251426
try {
14261427
spark.range(1).foreach { _ =>
@@ -2383,7 +2384,7 @@ index d083cac48ff..3c11bcde807 100644
23832384
import testImplicits._
23842385

23852386
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
2386-
index 746f289c393..a90106a1463 100644
2387+
index 746f289c393..a773971d3c1 100644
23872388
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
23882389
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
23892390
@@ -19,16 +19,19 @@ package org.apache.spark.sql.sources
@@ -2441,7 +2442,7 @@ index 746f289c393..a90106a1463 100644
24412442

24422443
val bucketColumnType = bucketedDataFrame.schema.apply(bucketColumnIndex).dataType
24432444
val rowsWithInvalidBuckets = fileScan.execute().filter(row => {
2444-
@@ -452,28 +464,49 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
2445+
@@ -452,28 +464,54 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
24452446
val joinOperator = if (joined.sqlContext.conf.adaptiveExecutionEnabled) {
24462447
val executedPlan =
24472448
joined.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
@@ -2472,6 +2473,11 @@ index 746f289c393..a90106a1463 100644
24722473
+ case s: SortMergeJoinExec => s
24732474
+ case o => fail(s"expected SortMergeJoinExec, but found\n$o")
24742475
+ }
2476+
+ case CometNativeColumnarToRowExec(child) =>
2477+
+ child.asInstanceOf[CometSortMergeJoinExec].originalPlan match {
2478+
+ case s: SortMergeJoinExec => s
2479+
+ case o => fail(s"expected SortMergeJoinExec, but found\n$o")
2480+
+ }
24752481
+ case o => fail(s"expected SortMergeJoinExec, but found\n$o")
24762482
+ }
24772483
}
@@ -2499,7 +2505,7 @@ index 746f289c393..a90106a1463 100644
24992505
s"expected sort in the right child to be $sortRight but found\n${joinOperator.right}")
25002506

25012507
// check the output partitioning
2502-
@@ -836,11 +869,11 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
2508+
@@ -836,11 +874,11 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
25032509
df1.write.format("parquet").bucketBy(8, "i").saveAsTable("bucketed_table")
25042510

25052511
val scanDF = spark.table("bucketed_table").select("j")
@@ -2513,7 +2519,7 @@ index 746f289c393..a90106a1463 100644
25132519
checkAnswer(aggDF, df1.groupBy("j").agg(max("k")))
25142520
}
25152521
}
2516-
@@ -895,7 +928,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
2522+
@@ -895,7 +933,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
25172523
}
25182524

25192525
test("SPARK-29655 Read bucketed tables obeys spark.sql.shuffle.partitions") {
@@ -2524,7 +2530,7 @@ index 746f289c393..a90106a1463 100644
25242530
SQLConf.SHUFFLE_PARTITIONS.key -> "5",
25252531
SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "7") {
25262532
val bucketSpec = Some(BucketSpec(6, Seq("i", "j"), Nil))
2527-
@@ -914,7 +950,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
2533+
@@ -914,7 +955,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
25282534
}
25292535

25302536
test("SPARK-32767 Bucket join should work if SHUFFLE_PARTITIONS larger than bucket number") {
@@ -2535,7 +2541,7 @@ index 746f289c393..a90106a1463 100644
25352541
SQLConf.SHUFFLE_PARTITIONS.key -> "9",
25362542
SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "10") {
25372543

2538-
@@ -944,7 +983,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
2544+
@@ -944,7 +988,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
25392545
}
25402546

25412547
test("bucket coalescing eliminates shuffle") {
@@ -2546,7 +2552,7 @@ index 746f289c393..a90106a1463 100644
25462552
SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true",
25472553
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
25482554
// The side with bucketedTableTestSpec1 will be coalesced to have 4 output partitions.
2549-
@@ -1029,15 +1071,21 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
2555+
@@ -1029,15 +1076,21 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
25502556
Seq(true, false).foreach { aqeEnabled =>
25512557
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqeEnabled.toString) {
25522558
val plan = sql(query).queryExecution.executedPlan

0 commit comments

Comments
 (0)