Skip to content

Commit 4c62d35

Browse files
committed
chore: update Spark diffs after scan-impl constant removal
Updates the Spark test diffs to compile against the trimmed CometConf API and the new 10-arg CometScanExec signature: - Drop references to COMET_NATIVE_SCAN_IMPL, SCAN_NATIVE_DATAFUSION, SCAN_NATIVE_ICEBERG_COMPAT, and SCAN_AUTO in patched Spark sources. - Fix the CometScanExec extractor pattern in SubquerySuite to use 10 placeholders now that scanImpl is gone. - Collapse the SQLTestUtils per-impl branching to a single check for IgnoreCometNativeDataFusion / IgnoreCometNativeScan, since native_datafusion is the only Parquet scan impl. - Remove the unused IgnoreCometNativeIcebergCompat tag.
1 parent 7f7c414 commit 4c62d35

4 files changed

Lines changed: 80 additions & 216 deletions

File tree

dev/diffs/3.4.3.diff

Lines changed: 14 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -608,10 +608,10 @@ index 2796b1cf154..53dcfde932e 100644
608608
}
609609
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala b/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala
610610
new file mode 100644
611-
index 00000000000..5691536c114
611+
index 00000000000..c528360742a
612612
--- /dev/null
613613
+++ b/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala
614-
@@ -0,0 +1,45 @@
614+
@@ -0,0 +1,44 @@
615615
+/*
616616
+ * Licensed to the Apache Software Foundation (ASF) under one or more
617617
+ * contributor license agreements. See the NOTICE file distributed with
@@ -640,7 +640,6 @@ index 00000000000..5691536c114
640640
+ * Tests with this tag will be ignored when Comet is enabled (e.g., via `ENABLE_COMET`).
641641
+ */
642642
+case class IgnoreComet(reason: String) extends Tag("DisableComet")
643-
+case class IgnoreCometNativeIcebergCompat(reason: String) extends Tag("DisableComet")
644643
+case class IgnoreCometNativeDataFusion(reason: String) extends Tag("DisableComet")
645644
+case class IgnoreCometNativeScan(reason: String) extends Tag("DisableComet")
646645
+
@@ -1040,7 +1039,7 @@ index 18123a4d6ec..0fe185baa33 100644
10401039

10411040
test("non-matching optional group") {
10421041
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
1043-
index 75eabcb96f2..7a681f147e4 100644
1042+
index 75eabcb96f2..f8141c28f60 100644
10441043
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
10451044
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
10461045
@@ -21,10 +21,11 @@ import scala.collection.mutable.ArrayBuffer
@@ -1061,7 +1060,7 @@ index 75eabcb96f2..7a681f147e4 100644
10611060
_.asInstanceOf[FileScanRDD].filePartitions.forall(
10621061
_.files.forall(_.urlEncodedPath.contains("p=0"))))
10631062
+ case WholeStageCodegenExec(ColumnarToRowExec(InputAdapter(
1064-
+ fs @ CometScanExec(_, _, _, _, partitionFilters, _, _, _, _, _, _)))) =>
1063+
+ fs @ CometScanExec(_, _, _, partitionFilters, _, _, _, _, _, _)))) =>
10651064
+ partitionFilters.exists(ExecSubqueryExpression.hasSubquery) &&
10661065
+ fs.inputRDDs().forall(
10671066
+ _.asInstanceOf[FileScanRDD].filePartitions.forall(
@@ -2864,26 +2863,18 @@ index abe606ad9c1..2d930b64cca 100644
28642863
val tblTargetName = "tbl_target"
28652864
val tblSourceQualified = s"default.$tblSourceName"
28662865
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
2867-
index dd55fcfe42c..cd18a23d4de 100644
2866+
index dd55fcfe42c..d4f94b1d608 100644
28682867
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
28692868
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
2870-
@@ -27,6 +27,7 @@ import scala.concurrent.duration._
2871-
import scala.language.implicitConversions
2872-
import scala.util.control.NonFatal
2873-
2874-
+import org.apache.comet.CometConf
2875-
import org.apache.hadoop.fs.Path
2876-
import org.scalactic.source.Position
2877-
import org.scalatest.{BeforeAndAfterAll, Suite, Tag}
2878-
@@ -41,6 +42,7 @@ import org.apache.spark.sql.catalyst.plans.PlanTest
2869+
@@ -41,6 +41,7 @@ import org.apache.spark.sql.catalyst.plans.PlanTest
28792870
import org.apache.spark.sql.catalyst.plans.PlanTestBase
28802871
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
28812872
import org.apache.spark.sql.catalyst.util._
28822873
+import org.apache.spark.sql.comet._
28832874
import org.apache.spark.sql.execution.FilterExec
28842875
import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecution
28852876
import org.apache.spark.sql.execution.datasources.DataSourceUtils
2886-
@@ -119,6 +121,34 @@ private[sql] trait SQLTestUtils extends SparkFunSuite with SQLTestUtilsBase with
2877+
@@ -119,6 +120,17 @@ private[sql] trait SQLTestUtils extends SparkFunSuite with SQLTestUtilsBase with
28872878

28882879
override protected def test(testName: String, testTags: Tag*)(testFun: => Any)
28892880
(implicit pos: Position): Unit = {
@@ -2892,33 +2883,16 @@ index dd55fcfe42c..cd18a23d4de 100644
28922883
+ ignore(testName + " (disabled when Comet is on)", testTags: _*)(testFun)
28932884
+ return
28942885
+ }
2895-
+ if (isCometEnabled) {
2896-
+ val cometScanImpl = CometConf.COMET_NATIVE_SCAN_IMPL.get(conf)
2897-
+ val isNativeIcebergCompat = cometScanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT ||
2898-
+ cometScanImpl == CometConf.SCAN_AUTO
2899-
+ val isNativeDataFusion = cometScanImpl == CometConf.SCAN_NATIVE_DATAFUSION ||
2900-
+ cometScanImpl == CometConf.SCAN_AUTO
2901-
+ if (isNativeIcebergCompat &&
2902-
+ testTags.exists(_.isInstanceOf[IgnoreCometNativeIcebergCompat])) {
2903-
+ ignore(testName + " (disabled for NATIVE_ICEBERG_COMPAT)", testTags: _*)(testFun)
2904-
+ return
2905-
+ }
2906-
+ if (isNativeDataFusion &&
2907-
+ testTags.exists(_.isInstanceOf[IgnoreCometNativeDataFusion])) {
2908-
+ ignore(testName + " (disabled for NATIVE_DATAFUSION)", testTags: _*)(testFun)
2909-
+ return
2910-
+ }
2911-
+ if ((isNativeDataFusion || isNativeIcebergCompat) &&
2912-
+ testTags.exists(_.isInstanceOf[IgnoreCometNativeScan])) {
2913-
+ ignore(testName + " (disabled for NATIVE_DATAFUSION and NATIVE_ICEBERG_COMPAT)",
2914-
+ testTags: _*)(testFun)
2915-
+ return
2916-
+ }
2886+
+ if (isCometEnabled &&
2887+
+ testTags.exists(t =>
2888+
+ t.isInstanceOf[IgnoreCometNativeDataFusion] || t.isInstanceOf[IgnoreCometNativeScan])) {
2889+
+ ignore(testName + " (disabled for NATIVE_DATAFUSION)", testTags: _*)(testFun)
2890+
+ return
29172891
+ }
29182892
if (testTags.exists(_.isInstanceOf[DisableAdaptiveExecution])) {
29192893
super.test(testName, testTags: _*) {
29202894
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
2921-
@@ -242,6 +272,11 @@ private[sql] trait SQLTestUtilsBase
2895+
@@ -242,6 +254,11 @@ private[sql] trait SQLTestUtilsBase
29222896
protected override def _sqlContext: SQLContext = self.spark.sqlContext
29232897
}
29242898

@@ -2930,7 +2904,7 @@ index dd55fcfe42c..cd18a23d4de 100644
29302904
protected override def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
29312905
SparkSession.setActiveSession(spark)
29322906
super.withSQLConf(pairs: _*)(f)
2933-
@@ -434,6 +469,8 @@ private[sql] trait SQLTestUtilsBase
2907+
@@ -434,6 +451,8 @@ private[sql] trait SQLTestUtilsBase
29342908
val schema = df.schema
29352909
val withoutFilters = df.queryExecution.executedPlan.transform {
29362910
case FilterExec(_, child) => child

dev/diffs/3.5.8.diff

Lines changed: 24 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -503,39 +503,27 @@ index a206e97c353..fea1149b67d 100644
503503

504504
test("SPARK-35884: Explain Formatted") {
505505
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
506-
index 93275487f29..78150c9163e 100644
506+
index 93275487f29..a5208b8d54b 100644
507507
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
508508
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
509-
@@ -23,6 +23,7 @@ import java.nio.file.{Files, StandardOpenOption}
510-
511-
import scala.collection.mutable
512-
513-
+import org.apache.comet.CometConf
514-
import org.apache.hadoop.conf.Configuration
515-
import org.apache.hadoop.fs.{LocalFileSystem, Path}
516-
517-
@@ -33,6 +34,7 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterTha
509+
@@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterTha
518510
import org.apache.spark.sql.catalyst.expressions.IntegralLiteralTestUtils.{negativeInt, positiveInt}
519511
import org.apache.spark.sql.catalyst.plans.logical.Filter
520512
import org.apache.spark.sql.catalyst.types.DataTypeUtils
521513
+import org.apache.spark.sql.comet.{CometBatchScanExec, CometNativeScanExec, CometScanExec, CometSortMergeJoinExec}
522514
import org.apache.spark.sql.execution.{FileSourceScanLike, SimpleMode}
523515
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
524516
import org.apache.spark.sql.execution.datasources.FilePartition
525-
@@ -250,6 +252,12 @@ class FileBasedDataSourceSuite extends QueryTest
517+
@@ -250,6 +251,8 @@ class FileBasedDataSourceSuite extends QueryTest
526518
case "" => "_LEGACY_ERROR_TEMP_2062"
527519
case _ => "_LEGACY_ERROR_TEMP_2055"
528520
}
529-
+ // native_datafusion Parquet scan cannot throw
530-
+ // a SparkFileNotFoundException
531-
+ assume(!Seq(
532-
+ CometConf.SCAN_NATIVE_DATAFUSION,
533-
+ CometConf.SCAN_AUTO
534-
+ ).contains(CometConf.COMET_NATIVE_SCAN_IMPL.get()))
521+
+ // native_datafusion Parquet scan cannot throw a SparkFileNotFoundException
522+
+ assume(false)
535523
checkErrorMatchPVals(
536524
exception = intercept[SparkException] {
537525
testIgnoreMissingFiles(options)
538-
@@ -656,18 +664,25 @@ class FileBasedDataSourceSuite extends QueryTest
526+
@@ -656,18 +659,25 @@ class FileBasedDataSourceSuite extends QueryTest
539527
checkAnswer(sql(s"select A from $tableName"), data.select("A"))
540528

541529
// RuntimeException is triggered at executor side, which is then wrapped as
@@ -568,31 +556,31 @@ index 93275487f29..78150c9163e 100644
568556
errorClass = "_LEGACY_ERROR_TEMP_2093",
569557
parameters = Map("requiredFieldName" -> "b", "matchedOrcFields" -> "[b, B]")
570558
)
571-
@@ -955,6 +970,7 @@ class FileBasedDataSourceSuite extends QueryTest
559+
@@ -955,6 +965,7 @@ class FileBasedDataSourceSuite extends QueryTest
572560
assert(bJoinExec.isEmpty)
573561
val smJoinExec = collect(joinedDF.queryExecution.executedPlan) {
574562
case smJoin: SortMergeJoinExec => smJoin
575563
+ case smJoin: CometSortMergeJoinExec => smJoin
576564
}
577565
assert(smJoinExec.nonEmpty)
578566
}
579-
@@ -1015,6 +1031,7 @@ class FileBasedDataSourceSuite extends QueryTest
567+
@@ -1015,6 +1026,7 @@ class FileBasedDataSourceSuite extends QueryTest
580568

581569
val fileScan = df.queryExecution.executedPlan collectFirst {
582570
case BatchScanExec(_, f: FileScan, _, _, _, _) => f
583571
+ case CometBatchScanExec(BatchScanExec(_, f: FileScan, _, _, _, _), _, _) => f
584572
}
585573
assert(fileScan.nonEmpty)
586574
assert(fileScan.get.partitionFilters.nonEmpty)
587-
@@ -1056,6 +1073,7 @@ class FileBasedDataSourceSuite extends QueryTest
575+
@@ -1056,6 +1068,7 @@ class FileBasedDataSourceSuite extends QueryTest
588576

589577
val fileScan = df.queryExecution.executedPlan collectFirst {
590578
case BatchScanExec(_, f: FileScan, _, _, _, _) => f
591579
+ case CometBatchScanExec(BatchScanExec(_, f: FileScan, _, _, _, _), _, _) => f
592580
}
593581
assert(fileScan.nonEmpty)
594582
assert(fileScan.get.partitionFilters.isEmpty)
595-
@@ -1240,6 +1258,9 @@ class FileBasedDataSourceSuite extends QueryTest
583+
@@ -1240,6 +1253,9 @@ class FileBasedDataSourceSuite extends QueryTest
596584
val filters = df.queryExecution.executedPlan.collect {
597585
case f: FileSourceScanLike => f.dataFilters
598586
case b: BatchScanExec => b.scan.asInstanceOf[FileScan].dataFilters
@@ -604,10 +592,10 @@ index 93275487f29..78150c9163e 100644
604592
}
605593
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala b/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala
606594
new file mode 100644
607-
index 00000000000..1ee842b6f62
595+
index 00000000000..903e6b1342a
608596
--- /dev/null
609597
+++ b/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala
610-
@@ -0,0 +1,45 @@
598+
@@ -0,0 +1,44 @@
611599
+/*
612600
+ * Licensed to the Apache Software Foundation (ASF) under one or more
613601
+ * contributor license agreements. See the NOTICE file distributed with
@@ -636,7 +624,6 @@ index 00000000000..1ee842b6f62
636624
+ * Tests with this tag will be ignored when Comet is enabled (e.g., via `ENABLE_COMET`).
637625
+ */
638626
+case class IgnoreComet(reason: String) extends Tag("DisableComet")
639-
+case class IgnoreCometNativeIcebergCompat(reason: String) extends Tag("DisableComet")
640627
+case class IgnoreCometNativeDataFusion(reason: String) extends Tag("DisableComet")
641628
+case class IgnoreCometNativeScan(reason: String) extends Tag("DisableComet")
642629
+
@@ -992,7 +979,7 @@ index 8b4ac474f87..3f79f20822f 100644
992979
extensions.injectColumnar(session =>
993980
MyColumnarRule(PreRuleReplaceAddWithBrokenVersion(), MyPostRule())) }
994981
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
995-
index 04702201f82..a4b5c6c93ce 100644
982+
index 04702201f82..c5ab5443ff7 100644
996983
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
997984
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
998985
@@ -22,10 +22,11 @@ import scala.collection.mutable.ArrayBuffer
@@ -1013,7 +1000,7 @@ index 04702201f82..a4b5c6c93ce 100644
10131000
_.asInstanceOf[FileScanRDD].filePartitions.forall(
10141001
_.files.forall(_.urlEncodedPath.contains("p=0"))))
10151002
+ case WholeStageCodegenExec(ColumnarToRowExec(InputAdapter(
1016-
+ fs @ CometScanExec(_, _, _, _, partitionFilters, _, _, _, _, _, _)))) =>
1003+
+ fs @ CometScanExec(_, _, _, partitionFilters, _, _, _, _, _, _)))) =>
10171004
+ partitionFilters.exists(ExecSubqueryExpression.hasSubquery) &&
10181005
+ fs.inputRDDs().forall(
10191006
+ _.asInstanceOf[FileScanRDD].filePartitions.forall(
@@ -2824,26 +2811,18 @@ index abe606ad9c1..2d930b64cca 100644
28242811
val tblTargetName = "tbl_target"
28252812
val tblSourceQualified = s"default.$tblSourceName"
28262813
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
2827-
index e937173a590..3134078a122 100644
2814+
index e937173a590..5fa7207a3d9 100644
28282815
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
28292816
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
2830-
@@ -27,6 +27,7 @@ import scala.concurrent.duration._
2831-
import scala.language.implicitConversions
2832-
import scala.util.control.NonFatal
2833-
2834-
+import org.apache.comet.CometConf
2835-
import org.apache.hadoop.fs.Path
2836-
import org.scalactic.source.Position
2837-
import org.scalatest.{BeforeAndAfterAll, Suite, Tag}
2838-
@@ -41,6 +42,7 @@ import org.apache.spark.sql.catalyst.plans.PlanTest
2817+
@@ -41,6 +41,7 @@ import org.apache.spark.sql.catalyst.plans.PlanTest
28392818
import org.apache.spark.sql.catalyst.plans.PlanTestBase
28402819
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
28412820
import org.apache.spark.sql.catalyst.util._
28422821
+import org.apache.spark.sql.comet._
28432822
import org.apache.spark.sql.execution.FilterExec
28442823
import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecution
28452824
import org.apache.spark.sql.execution.datasources.DataSourceUtils
2846-
@@ -119,6 +121,34 @@ private[sql] trait SQLTestUtils extends SparkFunSuite with SQLTestUtilsBase with
2825+
@@ -119,6 +120,17 @@ private[sql] trait SQLTestUtils extends SparkFunSuite with SQLTestUtilsBase with
28472826

28482827
override protected def test(testName: String, testTags: Tag*)(testFun: => Any)
28492828
(implicit pos: Position): Unit = {
@@ -2852,33 +2831,16 @@ index e937173a590..3134078a122 100644
28522831
+ ignore(testName + " (disabled when Comet is on)", testTags: _*)(testFun)
28532832
+ return
28542833
+ }
2855-
+ if (isCometEnabled) {
2856-
+ val cometScanImpl = CometConf.COMET_NATIVE_SCAN_IMPL.get(conf)
2857-
+ val isNativeIcebergCompat = cometScanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT ||
2858-
+ cometScanImpl == CometConf.SCAN_AUTO
2859-
+ val isNativeDataFusion = cometScanImpl == CometConf.SCAN_NATIVE_DATAFUSION ||
2860-
+ cometScanImpl == CometConf.SCAN_AUTO
2861-
+ if (isNativeIcebergCompat &&
2862-
+ testTags.exists(_.isInstanceOf[IgnoreCometNativeIcebergCompat])) {
2863-
+ ignore(testName + " (disabled for NATIVE_ICEBERG_COMPAT)", testTags: _*)(testFun)
2864-
+ return
2865-
+ }
2866-
+ if (isNativeDataFusion &&
2867-
+ testTags.exists(_.isInstanceOf[IgnoreCometNativeDataFusion])) {
2868-
+ ignore(testName + " (disabled for NATIVE_DATAFUSION)", testTags: _*)(testFun)
2869-
+ return
2870-
+ }
2871-
+ if ((isNativeDataFusion || isNativeIcebergCompat) &&
2872-
+ testTags.exists(_.isInstanceOf[IgnoreCometNativeScan])) {
2873-
+ ignore(testName + " (disabled for NATIVE_DATAFUSION and NATIVE_ICEBERG_COMPAT)",
2874-
+ testTags: _*)(testFun)
2875-
+ return
2876-
+ }
2834+
+ if (isCometEnabled &&
2835+
+ testTags.exists(t =>
2836+
+ t.isInstanceOf[IgnoreCometNativeDataFusion] || t.isInstanceOf[IgnoreCometNativeScan])) {
2837+
+ ignore(testName + " (disabled for NATIVE_DATAFUSION)", testTags: _*)(testFun)
2838+
+ return
28772839
+ }
28782840
if (testTags.exists(_.isInstanceOf[DisableAdaptiveExecution])) {
28792841
super.test(testName, testTags: _*) {
28802842
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
2881-
@@ -242,6 +272,11 @@ private[sql] trait SQLTestUtilsBase
2843+
@@ -242,6 +254,11 @@ private[sql] trait SQLTestUtilsBase
28822844
protected override def _sqlContext: SQLContext = self.spark.sqlContext
28832845
}
28842846

@@ -2890,7 +2852,7 @@ index e937173a590..3134078a122 100644
28902852
protected override def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
28912853
SparkSession.setActiveSession(spark)
28922854
super.withSQLConf(pairs: _*)(f)
2893-
@@ -435,6 +470,8 @@ private[sql] trait SQLTestUtilsBase
2855+
@@ -435,6 +452,8 @@ private[sql] trait SQLTestUtilsBase
28942856
val schema = df.schema
28952857
val withoutFilters = df.queryExecution.executedPlan.transform {
28962858
case FilterExec(_, child) => child

0 commit comments

Comments
 (0)