Skip to content

Commit 7b725b5

Browse files
committed
chore: collapse IgnoreCometNativeDataFusion/Scan into IgnoreComet
With native_datafusion as the only Parquet scan implementation, the per-impl tag variants are equivalent to the plain IgnoreComet tag. - Replace all IgnoreCometNativeDataFusion and IgnoreCometNativeScan call sites with IgnoreComet. - Drop the corresponding case-class definitions from IgnoreComet.scala. - Remove the now-redundant secondary check in SQLTestUtils; the existing IgnoreComet check handles all usages.
1 parent 4c62d35 commit 7b725b5

4 files changed

Lines changed: 72 additions & 104 deletions

File tree

dev/diffs/3.4.3.diff

Lines changed: 21 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -608,10 +608,10 @@ index 2796b1cf154..53dcfde932e 100644
608608
}
609609
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala b/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala
610610
new file mode 100644
611-
index 00000000000..c528360742a
611+
index 00000000000..4b31bea33de
612612
--- /dev/null
613613
+++ b/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala
614-
@@ -0,0 +1,44 @@
614+
@@ -0,0 +1,42 @@
615615
+/*
616616
+ * Licensed to the Apache Software Foundation (ASF) under one or more
617617
+ * contributor license agreements. See the NOTICE file distributed with
@@ -640,8 +640,6 @@ index 00000000000..c528360742a
640640
+ * Tests with this tag will be ignored when Comet is enabled (e.g., via `ENABLE_COMET`).
641641
+ */
642642
+case class IgnoreComet(reason: String) extends Tag("DisableComet")
643-
+case class IgnoreCometNativeDataFusion(reason: String) extends Tag("DisableComet")
644-
+case class IgnoreCometNativeScan(reason: String) extends Tag("DisableComet")
645643
+
646644
+/**
647645
+ * Helper trait that disables Comet for all tests regardless of default config values.
@@ -1968,14 +1966,14 @@ index 07e2849ce6f..3e73645b638 100644
19681966
ParquetOutputFormat.WRITER_VERSION -> ParquetProperties.WriterVersion.PARQUET_2_0.toString
19691967
)
19701968
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
1971-
index 104b4e416cd..4adb273170a 100644
1969+
index 104b4e416cd..835aaa18e39 100644
19721970
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
19731971
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
19741972
@@ -38,6 +38,7 @@ import org.apache.parquet.schema.MessageType
19751973

19761974
import org.apache.spark.{SparkConf, SparkException}
19771975
import org.apache.spark.sql._
1978-
+import org.apache.spark.sql.{IgnoreCometNativeDataFusion, IgnoreCometNativeScan}
1976+
+import org.apache.spark.sql.IgnoreComet
19791977
import org.apache.spark.sql.catalyst.dsl.expressions._
19801978
import org.apache.spark.sql.catalyst.expressions._
19811979
import org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints
@@ -1998,7 +1996,7 @@ index 104b4e416cd..4adb273170a 100644
19981996

19991997
- test("Filters should be pushed down for vectorized Parquet reader at row group level") {
20001998
+ test("Filters should be pushed down for vectorized Parquet reader at row group level",
2001-
+ IgnoreCometNativeScan("Native scans do not support the tested accumulator")) {
1999+
+ IgnoreComet("Native scans do not support the tested accumulator")) {
20022000
import testImplicits._
20032001

20042002
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true",
@@ -2034,7 +2032,7 @@ index 104b4e416cd..4adb273170a 100644
20342032

20352033
- test("filter pushdown - StringPredicate") {
20362034
+ test("filter pushdown - StringPredicate",
2037-
+ IgnoreCometNativeDataFusion("cannot be pushed down")) {
2035+
+ IgnoreComet("cannot be pushed down")) {
20382036
import testImplicits._
20392037
// keep() should take effect on StartsWith/EndsWith/Contains
20402038
Seq(
@@ -2044,7 +2042,7 @@ index 104b4e416cd..4adb273170a 100644
20442042

20452043
- test("SPARK-17091: Convert IN predicate to Parquet filter push-down") {
20462044
+ test("SPARK-17091: Convert IN predicate to Parquet filter push-down",
2047-
+ IgnoreCometNativeScan("Comet has different push-down behavior")) {
2045+
+ IgnoreComet("Comet has different push-down behavior")) {
20482046
val schema = StructType(Seq(
20492047
StructField("a", IntegerType, nullable = false)
20502048
))
@@ -2089,7 +2087,7 @@ index 104b4e416cd..4adb273170a 100644
20892087

20902088
- test("SPARK-34562: Bloom filter push down") {
20912089
+ test("SPARK-34562: Bloom filter push down",
2092-
+ IgnoreCometNativeScan("Native scans do not support the tested accumulator")) {
2090+
+ IgnoreComet("Native scans do not support the tested accumulator")) {
20932091
withTempPath { dir =>
20942092
val path = dir.getCanonicalPath
20952093
spark.range(100).selectExpr("id * 2 AS id")
@@ -2134,14 +2132,14 @@ index 8670d95c65e..b624c3811dd 100644
21342132
checkAnswer(
21352133
// "fruit" column in this file is encoded using DELTA_LENGTH_BYTE_ARRAY.
21362134
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
2137-
index 29cb224c878..dcb8a0e9bef 100644
2135+
index 29cb224c878..62e3ab96004 100644
21382136
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
21392137
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
21402138
@@ -27,6 +27,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat
21412139

21422140
import org.apache.spark.{DebugFilesystem, SparkConf, SparkException}
21432141
import org.apache.spark.sql._
2144-
+import org.apache.spark.sql.IgnoreCometNativeDataFusion
2142+
+import org.apache.spark.sql.IgnoreComet
21452143
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
21462144
import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
21472145
import org.apache.spark.sql.catalyst.util.ArrayData
@@ -2151,7 +2149,7 @@ index 29cb224c878..dcb8a0e9bef 100644
21512149

21522150
- test("SPARK-36182: can't read TimestampLTZ as TimestampNTZ") {
21532151
+ test("SPARK-36182: can't read TimestampLTZ as TimestampNTZ",
2154-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4219")) {
2152+
+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/4219")) {
21552153
val data = (1 to 1000).map { i =>
21562154
val ts = new java.sql.Timestamp(i)
21572155
Row(ts)
@@ -2161,7 +2159,7 @@ index 29cb224c878..dcb8a0e9bef 100644
21612159

21622160
- test("SPARK-26677: negated null-safe equality comparison should not filter matched row groups") {
21632161
+ test("SPARK-26677: negated null-safe equality comparison should not filter matched row groups",
2164-
+ IgnoreCometNativeScan("Native scans had the filter pushed into DF operator, cannot strip")) {
2162+
+ IgnoreComet("Native scans had the filter pushed into DF operator, cannot strip")) {
21652163
withAllParquetReaders {
21662164
withTempPath { path =>
21672165
// Repeated values for dictionary encoding.
@@ -2171,7 +2169,7 @@ index 29cb224c878..dcb8a0e9bef 100644
21712169

21722170
- test("SPARK-34212 Parquet should read decimals correctly") {
21732171
+ test("SPARK-34212 Parquet should read decimals correctly",
2174-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4354")) {
2172+
+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/4354")) {
21752173
def readParquet(schema: String, path: File): DataFrame = {
21762174
spark.read.schema(schema).parquet(path.toString)
21772175
}
@@ -2201,7 +2199,7 @@ index 29cb224c878..dcb8a0e9bef 100644
22012199

22022200
- test("row group skipping doesn't overflow when reading into larger type") {
22032201
+ test("row group skipping doesn't overflow when reading into larger type",
2204-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4354")) {
2202+
+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/4354")) {
22052203
withTempPath { path =>
22062204
Seq(0).toDF("a").write.parquet(path.toString)
22072205
// The vectorized and non-vectorized readers will produce different exceptions, we don't need
@@ -2290,14 +2288,14 @@ index 5c0b7def039..151184bc98c 100644
22902288
assert(fileSourceScanSchemata.size === expectedSchemaCatalogStrings.size,
22912289
s"Found ${fileSourceScanSchemata.size} file sources in dataframe, " +
22922290
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
2293-
index bf5c51b89bb..7e143a0e0f9 100644
2291+
index bf5c51b89bb..f7402b7d883 100644
22942292
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
22952293
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
22962294
@@ -27,6 +27,7 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
22972295
import org.apache.parquet.schema.Type._
22982296

22992297
import org.apache.spark.SparkException
2300-
+import org.apache.spark.sql.{IgnoreComet, IgnoreCometNativeDataFusion}
2298+
+import org.apache.spark.sql.IgnoreComet
23012299
import org.apache.spark.sql.catalyst.ScalaReflection
23022300
import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
23032301
import org.apache.spark.sql.functions.desc
@@ -2317,7 +2315,7 @@ index bf5c51b89bb..7e143a0e0f9 100644
23172315

23182316
- test("schema mismatch failure error message for parquet vectorized reader") {
23192317
+ test("schema mismatch failure error message for parquet vectorized reader",
2320-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4316")) {
2318+
+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/4316")) {
23212319
withTempPath { dir =>
23222320
val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = true)
23232321
assert(e.getCause.isInstanceOf[SparkException])
@@ -2863,7 +2861,7 @@ index abe606ad9c1..2d930b64cca 100644
28632861
val tblTargetName = "tbl_target"
28642862
val tblSourceQualified = s"default.$tblSourceName"
28652863
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
2866-
index dd55fcfe42c..d4f94b1d608 100644
2864+
index dd55fcfe42c..d9a3f2df535 100644
28672865
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
28682866
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
28692867
@@ -41,6 +41,7 @@ import org.apache.spark.sql.catalyst.plans.PlanTest
@@ -2874,25 +2872,19 @@ index dd55fcfe42c..d4f94b1d608 100644
28742872
import org.apache.spark.sql.execution.FilterExec
28752873
import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecution
28762874
import org.apache.spark.sql.execution.datasources.DataSourceUtils
2877-
@@ -119,6 +120,17 @@ private[sql] trait SQLTestUtils extends SparkFunSuite with SQLTestUtilsBase with
2875+
@@ -119,6 +120,11 @@ private[sql] trait SQLTestUtils extends SparkFunSuite with SQLTestUtilsBase with
28782876

28792877
override protected def test(testName: String, testTags: Tag*)(testFun: => Any)
28802878
(implicit pos: Position): Unit = {
28812879
+ // Check Comet skip tags first, before DisableAdaptiveExecution handling
28822880
+ if (isCometEnabled && testTags.exists(_.isInstanceOf[IgnoreComet])) {
28832881
+ ignore(testName + " (disabled when Comet is on)", testTags: _*)(testFun)
28842882
+ return
2885-
+ }
2886-
+ if (isCometEnabled &&
2887-
+ testTags.exists(t =>
2888-
+ t.isInstanceOf[IgnoreCometNativeDataFusion] || t.isInstanceOf[IgnoreCometNativeScan])) {
2889-
+ ignore(testName + " (disabled for NATIVE_DATAFUSION)", testTags: _*)(testFun)
2890-
+ return
28912883
+ }
28922884
if (testTags.exists(_.isInstanceOf[DisableAdaptiveExecution])) {
28932885
super.test(testName, testTags: _*) {
28942886
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
2895-
@@ -242,6 +254,11 @@ private[sql] trait SQLTestUtilsBase
2887+
@@ -242,6 +248,11 @@ private[sql] trait SQLTestUtilsBase
28962888
protected override def _sqlContext: SQLContext = self.spark.sqlContext
28972889
}
28982890

@@ -2904,7 +2896,7 @@ index dd55fcfe42c..d4f94b1d608 100644
29042896
protected override def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
29052897
SparkSession.setActiveSession(spark)
29062898
super.withSQLConf(pairs: _*)(f)
2907-
@@ -434,6 +451,8 @@ private[sql] trait SQLTestUtilsBase
2899+
@@ -434,6 +445,8 @@ private[sql] trait SQLTestUtilsBase
29082900
val schema = df.schema
29092901
val withoutFilters = df.queryExecution.executedPlan.transform {
29102902
case FilterExec(_, child) => child

0 commit comments

Comments
 (0)