@@ -739,10 +739,10 @@ index 9c529d14221..ab2850b5d68 100644
739739 }
740740diff --git a/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala b/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala
741741new file mode 100644
742- index 00000000000..5691536c114
742+ index 00000000000..4b31bea33de
743743--- /dev/null
744744+++ b/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala
745- @@ -0,0 +1,45 @@
745+ @@ -0,0 +1,42 @@
746746+ /*
747747+ * Licensed to the Apache Software Foundation (ASF) under one or more
748748+ * contributor license agreements. See the NOTICE file distributed with
@@ -771,9 +771,6 @@ index 00000000000..5691536c114
771771+ * Tests with this tag will be ignored when Comet is enabled (e.g., via `ENABLE_COMET`).
772772+ */
773773+ case class IgnoreComet(reason: String) extends Tag("DisableComet")
774- + case class IgnoreCometNativeIcebergCompat(reason: String) extends Tag("DisableComet")
775- + case class IgnoreCometNativeDataFusion(reason: String) extends Tag("DisableComet")
776- + case class IgnoreCometNativeScan(reason: String) extends Tag("DisableComet")
777774+
778775+ /**
779776+ * Helper trait that disables Comet for all tests regardless of default config values.
@@ -1199,7 +1196,7 @@ index 0df7f806272..92390bd819f 100644
11991196
12001197 test("non-matching optional group") {
12011198diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
1202- index 2e33f6505ab..54f5081e10a 100644
1199+ index 2e33f6505ab..fc1a2c8f964 100644
12031200--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
12041201+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
12051202@@ -23,10 +23,11 @@ import org.apache.spark.SparkRuntimeException
@@ -1220,7 +1217,7 @@ index 2e33f6505ab..54f5081e10a 100644
12201217 _.asInstanceOf[FileScanRDD].filePartitions.forall(
12211218 _.files.forall(_.urlEncodedPath.contains("p=0"))))
12221219+ case WholeStageCodegenExec(ColumnarToRowExec(InputAdapter(
1223- + fs @ CometScanExec(_, _, _, _, partitionFilters, _, _, _, _, _, _)))) =>
1220+ + fs @ CometScanExec(_, _, _, partitionFilters, _, _, _, _, _, _)))) =>
12241221+ partitionFilters.exists(ExecSubqueryExpression.hasSubquery) &&
12251222+ fs.inputRDDs().forall(
12261223+ _.asInstanceOf[FileScanRDD].filePartitions.forall(
@@ -2544,14 +2541,14 @@ index cd6f41b4ef4..4b6a17344bc 100644
25442541 ParquetOutputFormat.WRITER_VERSION -> ParquetProperties.WriterVersion.PARQUET_2_0.toString
25452542 )
25462543diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
2547- index 6080a5e8e4b..f5dadef89ae 100644
2544+ index 6080a5e8e4b..23a451d5bcf 100644
25482545--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
25492546+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
25502547@@ -38,6 +38,7 @@ import org.apache.parquet.schema.MessageType
25512548
25522549 import org.apache.spark.{SparkConf, SparkException, SparkRuntimeException}
25532550 import org.apache.spark.sql._
2554- + import org.apache.spark.sql.IgnoreCometNativeScan
2551+ + import org.apache.spark.sql.IgnoreComet
25552552 import org.apache.spark.sql.catalyst.dsl.expressions._
25562553 import org.apache.spark.sql.catalyst.expressions._
25572554 import org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints
@@ -2574,7 +2571,7 @@ index 6080a5e8e4b..f5dadef89ae 100644
25742571
25752572- test("Filters should be pushed down for vectorized Parquet reader at row group level") {
25762573+ test("Filters should be pushed down for vectorized Parquet reader at row group level",
2577- + IgnoreCometNativeScan ("Native scans do not support the tested accumulator")) {
2574+ + IgnoreComet ("Native scans do not support the tested accumulator")) {
25782575 import testImplicits._
25792576
25802577 withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true",
@@ -2609,7 +2606,7 @@ index 6080a5e8e4b..f5dadef89ae 100644
26092606 }
26102607
26112608- test("filter pushdown - StringPredicate") {
2612- + test("filter pushdown - StringPredicate", IgnoreCometNativeScan ("cannot be pushed down")) {
2609+ + test("filter pushdown - StringPredicate", IgnoreComet ("cannot be pushed down")) {
26132610 import testImplicits._
26142611 // keep() should take effect on StartsWith/EndsWith/Contains
26152612 Seq(
@@ -2619,7 +2616,7 @@ index 6080a5e8e4b..f5dadef89ae 100644
26192616
26202617- test("SPARK-17091: Convert IN predicate to Parquet filter push-down") {
26212618+ test("SPARK-17091: Convert IN predicate to Parquet filter push-down",
2622- + IgnoreCometNativeScan ("Comet has different push-down behavior")) {
2619+ + IgnoreComet ("Comet has different push-down behavior")) {
26232620 val schema = StructType(Seq(
26242621 StructField("a", IntegerType, nullable = false)
26252622 ))
@@ -2666,7 +2663,7 @@ index 6080a5e8e4b..f5dadef89ae 100644
26662663
26672664- test("SPARK-34562: Bloom filter push down") {
26682665+ test("SPARK-34562: Bloom filter push down",
2669- + IgnoreCometNativeScan ("Native scans do not support the tested accumulator")) {
2666+ + IgnoreComet ("Native scans do not support the tested accumulator")) {
26702667 withTempPath { dir =>
26712668 val path = dir.getCanonicalPath
26722669 spark.range(100).selectExpr("id * 2 AS id")
@@ -2773,32 +2770,21 @@ index 30503af0fab..1491f4bc2d5 100644
27732770
27742771 import testImplicits._
27752772diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexSuite.scala
2776- index 08fd8a9ecb5..27aee839b8c 100644
2773+ index 08fd8a9ecb5..306958da489 100644
27772774--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexSuite.scala
27782775+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexSuite.scala
2779- @@ -20,6 +20,7 @@ import java.io.File
2780-
2781- import scala.jdk.CollectionConverters._
2782-
2783- + import org.apache.comet.CometConf
2784- import org.apache.hadoop.fs.Path
2785- import org.apache.parquet.column.ParquetProperties._
2786- import org.apache.parquet.hadoop.{ParquetFileReader, ParquetOutputFormat}
2787- @@ -27,6 +28,7 @@ import org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE
2776+ @@ -27,6 +27,7 @@ import org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE
27882777
27892778 import org.apache.spark.SparkException
27902779 import org.apache.spark.sql.QueryTest
2791- + import org.apache.spark.sql.comet.{CometBatchScanExec, CometNativeScanExec, CometScanExec }
2780+ + import org.apache.spark.sql.comet.{CometBatchScanExec, CometNativeScanExec}
27922781 import org.apache.spark.sql.execution.FileSourceScanExec
27932782 import org.apache.spark.sql.execution.datasources.FileFormat
27942783 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
2795- @@ -245,6 +247,17 @@ class ParquetRowIndexSuite extends QueryTest with SharedSparkSession {
2784+ @@ -245,6 +246,14 @@ class ParquetRowIndexSuite extends QueryTest with SharedSparkSession {
27962785 case f: FileSourceScanExec =>
27972786 numPartitions += f.inputRDD.partitions.length
27982787 numOutputRows += f.metrics("numOutputRows").value
2799- + case b: CometScanExec =>
2800- + numPartitions += b.inputRDD.partitions.length
2801- + numOutputRows += b.metrics("numOutputRows").value
28022788+ case b: CometBatchScanExec =>
28032789+ numPartitions += b.inputRDD.partitions.length
28042790+ numOutputRows += b.metrics("numOutputRows").value
@@ -2810,16 +2796,13 @@ index 08fd8a9ecb5..27aee839b8c 100644
28102796 case _ =>
28112797 }
28122798 assert(numPartitions > 0)
2813- @@ -303,6 +316,12 @@ class ParquetRowIndexSuite extends QueryTest with SharedSparkSession {
2799+ @@ -303,6 +312,9 @@ class ParquetRowIndexSuite extends QueryTest with SharedSparkSession {
28142800 val conf = RowIndexTestConf(useDataSourceV2 = useDataSourceV2)
28152801
28162802 test(s"invalid row index column type - ${conf.desc}") {
28172803+ // https://github.com/apache/datafusion-comet/issues/3886
28182804+ // Comet throws RuntimeException instead of SparkException
2819- + assume(!Seq(
2820- + CometConf.SCAN_NATIVE_DATAFUSION,
2821- + CometConf.SCAN_AUTO
2822- + ).contains(CometConf.COMET_NATIVE_SCAN_IMPL.get()))
2805+ + assume(false)
28232806 withSQLConf(conf.sqlConfs: _*) {
28242807 withTempPath{ path =>
28252808 val df = spark.range(0, 10, 1, 1).toDF("id")
@@ -2844,15 +2827,15 @@ index 5c0b7def039..151184bc98c 100644
28442827 assert(fileSourceScanSchemata.size === expectedSchemaCatalogStrings.size,
28452828 s"Found ${fileSourceScanSchemata.size} file sources in dataframe, " +
28462829diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
2847- index 0acb21f3e6f..e7c65429119 100644
2830+ index 0acb21f3e6f..15bd866d8aa 100644
28482831--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
28492832+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
28502833@@ -27,7 +27,7 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
28512834 import org.apache.parquet.schema.Type._
28522835
28532836 import org.apache.spark.SparkException
28542837- import org.apache.spark.sql.{AnalysisException, Row}
2855- + import org.apache.spark.sql.{AnalysisException, IgnoreComet, IgnoreCometNativeDataFusion, Row}
2838+ + import org.apache.spark.sql.{AnalysisException, IgnoreComet, Row}
28562839 import org.apache.spark.sql.catalyst.expressions.Cast.toSQLType
28572840 import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
28582841 import org.apache.spark.sql.functions.desc
@@ -2872,7 +2855,7 @@ index 0acb21f3e6f..e7c65429119 100644
28722855
28732856- test("schema mismatch failure error message for parquet vectorized reader") {
28742857+ test("schema mismatch failure error message for parquet vectorized reader",
2875- + IgnoreCometNativeDataFusion ("https://github.com/apache/datafusion-comet/issues/4316")) {
2858+ + IgnoreComet ("https://github.com/apache/datafusion-comet/issues/4316")) {
28762859 withTempPath { dir =>
28772860 val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = true)
28782861 assert(e.getCause.isInstanceOf[SchemaColumnConvertNotSupportedException])
@@ -3460,69 +3443,39 @@ index 86c4e49f6f6..2e639e5f38d 100644
34603443 val tblTargetName = "tbl_target"
34613444 val tblSourceQualified = s"default.$tblSourceName"
34623445diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
3463- index f0f3f94b811..be5e113c3ed 100644
3446+ index f0f3f94b811..b7d18771314 100644
34643447--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
34653448+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
3466- @@ -27,13 +27,14 @@ import scala.jdk.CollectionConverters._
3467- import scala.language.implicitConversions
3468- import scala.util.control.NonFatal
3469-
3470- + import org.apache.comet.CometConf
3471- import org.apache.hadoop.fs.Path
3472- import org.scalactic.source.Position
3473- import org.scalatest.{BeforeAndAfterAll, Suite, Tag}
3449+ @@ -33,7 +33,7 @@ import org.scalatest.{BeforeAndAfterAll, Suite, Tag}
34743450 import org.scalatest.concurrent.Eventually
34753451
34763452 import org.apache.spark.SparkFunSuite
34773453- import org.apache.spark.sql.{AnalysisException, Row}
3478- + import org.apache.spark.sql.{AnalysisException, IgnoreComet, IgnoreCometNativeDataFusion, IgnoreCometNativeIcebergCompat, IgnoreCometNativeScan, Row}
3454+ + import org.apache.spark.sql.{AnalysisException, IgnoreComet, Row}
34793455 import org.apache.spark.sql.catalyst.FunctionIdentifier
34803456 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
34813457 import org.apache.spark.sql.catalyst.catalog.SessionCatalog.DEFAULT_DATABASE
3482- @@ -42,6 +43 ,7 @@ import org.apache.spark.sql.catalyst.plans.PlanTestBase
3458+ @@ -42,6 +42 ,7 @@ import org.apache.spark.sql.catalyst.plans.PlanTestBase
34833459 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
34843460 import org.apache.spark.sql.catalyst.util._
34853461 import org.apache.spark.sql.classic.{ClassicConversions, ColumnConversions, ColumnNodeToExpressionConverter, DataFrame, Dataset, SparkSession, SQLImplicits}
34863462+ import org.apache.spark.sql.comet.{CometFilterExec, CometProjectExec}
34873463 import org.apache.spark.sql.execution.FilterExec
34883464 import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecution
34893465 import org.apache.spark.sql.execution.datasources.DataSourceUtils
3490- @@ -121,6 +123,34 @@ private[sql] trait SQLTestUtils extends SparkFunSuite with SQLTestUtilsBase with
3466+ @@ -121,6 +122,11 @@ private[sql] trait SQLTestUtils extends SparkFunSuite with SQLTestUtilsBase with
34913467
34923468 override protected def test(testName: String, testTags: Tag*)(testFun: => Any)
34933469 (implicit pos: Position): Unit = {
34943470+ // Check Comet skip tags first, before DisableAdaptiveExecution handling
34953471+ if (isCometEnabled && testTags.exists(_.isInstanceOf[IgnoreComet])) {
34963472+ ignore(testName + " (disabled when Comet is on)", testTags: _*)(testFun)
34973473+ return
3498- + }
3499- + if (isCometEnabled) {
3500- + val cometScanImpl = CometConf.COMET_NATIVE_SCAN_IMPL.get(conf)
3501- + val isNativeIcebergCompat = cometScanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT ||
3502- + cometScanImpl == CometConf.SCAN_AUTO
3503- + val isNativeDataFusion = cometScanImpl == CometConf.SCAN_NATIVE_DATAFUSION ||
3504- + cometScanImpl == CometConf.SCAN_AUTO
3505- + if (isNativeIcebergCompat &&
3506- + testTags.exists(_.isInstanceOf[IgnoreCometNativeIcebergCompat])) {
3507- + ignore(testName + " (disabled for NATIVE_ICEBERG_COMPAT)", testTags: _*)(testFun)
3508- + return
3509- + }
3510- + if (isNativeDataFusion &&
3511- + testTags.exists(_.isInstanceOf[IgnoreCometNativeDataFusion])) {
3512- + ignore(testName + " (disabled for NATIVE_DATAFUSION)", testTags: _*)(testFun)
3513- + return
3514- + }
3515- + if ((isNativeDataFusion || isNativeIcebergCompat) &&
3516- + testTags.exists(_.isInstanceOf[IgnoreCometNativeScan])) {
3517- + ignore(testName + " (disabled for NATIVE_DATAFUSION and NATIVE_ICEBERG_COMPAT)",
3518- + testTags: _*)(testFun)
3519- + return
3520- + }
35213474+ }
35223475 if (testTags.exists(_.isInstanceOf[DisableAdaptiveExecution])) {
35233476 super.test(testName, testTags: _*) {
35243477 withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
3525- @@ -248,8 +278 ,15 @@ private[sql] trait SQLTestUtilsBase
3478+ @@ -248,8 +254 ,15 @@ private[sql] trait SQLTestUtilsBase
35263479 override protected def converter: ColumnNodeToExpressionConverter = self.spark.converter
35273480 }
35283481
@@ -3538,7 +3491,7 @@ index f0f3f94b811..be5e113c3ed 100644
35383491 super.withSQLConf(pairs: _*)(f)
35393492 }
35403493
3541- @@ -451,6 +488 ,8 @@ private[sql] trait SQLTestUtilsBase
3494+ @@ -451,6 +464 ,8 @@ private[sql] trait SQLTestUtilsBase
35423495 val schema = df.schema
35433496 val withoutFilters = df.queryExecution.executedPlan.transform {
35443497 case FilterExec(_, child) => child
0 commit comments