Skip to content

Commit f5fe433

Browse files
committed
fix: enable native_datafusion Spark SQL tests for issues #3320 and #3401
Remove IgnoreCometNativeDataFusion tags from 4 tests that now pass: - ParquetFilterSuite: SPARK-31026 (dotted field names) and row group level filter pushdown - conditional assertions already guard the stripSparkFilter checks for native execution - StreamingSelfUnionSuite: DSv1 self-union via DataStreamReader and table API - streaming works correctly with native_datafusion
1 parent cb6d5b6 commit f5fe433

1 file changed

Lines changed: 28 additions & 80 deletions

File tree

dev/diffs/3.5.8.diff

Lines changed: 28 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
diff --git a/pom.xml b/pom.xml
2-
index edd2ad57880..77a975ea48f 100644
2+
index edd2ad57880..837b95d1ada 100644
33
--- a/pom.xml
44
+++ b/pom.xml
55
@@ -152,6 +152,8 @@
@@ -485,7 +485,7 @@ index a206e97c353..fea1149b67d 100644
485485

486486
test("SPARK-35884: Explain Formatted") {
487487
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
488-
index 93275487f29..ca79ad8b6d9 100644
488+
index 93275487f29..6a62cc06535 100644
489489
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
490490
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
491491
@@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterTha
@@ -502,7 +502,7 @@ index 93275487f29..ca79ad8b6d9 100644
502502
Seq("parquet", "orc").foreach { format =>
503503
- test(s"Spark native readers should respect spark.sql.caseSensitive - ${format}") {
504504
+ test(s"Spark native readers should respect spark.sql.caseSensitive - ${format}",
505-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) {
505+
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3719")) {
506506
withTempDir { dir =>
507507
val tableName = s"spark_25132_${format}_native"
508508
val tableDir = dir.getCanonicalPath + s"/$tableName"
@@ -1969,7 +1969,7 @@ index 07e2849ce6f..3e73645b638 100644
19691969
ParquetOutputFormat.WRITER_VERSION -> ParquetProperties.WriterVersion.PARQUET_2_0.toString
19701970
)
19711971
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
1972-
index 8e88049f51e..6150a556f9b 100644
1972+
index 8e88049f51e..c85cf751871 100644
19731973
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
19741974
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
19751975
@@ -1095,7 +1095,11 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
@@ -1995,17 +1995,7 @@ index 8e88049f51e..6150a556f9b 100644
19951995
import testImplicits._
19961996

19971997
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true",
1998-
@@ -1548,7 +1553,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
1999-
}
2000-
}
2001-
2002-
- test("SPARK-31026: Parquet predicate pushdown for fields having dots in the names") {
2003-
+ test("SPARK-31026: Parquet predicate pushdown for fields having dots in the names",
2004-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3320")) {
2005-
import testImplicits._
2006-
2007-
withAllParquetReaders {
2008-
@@ -1580,13 +1586,18 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
1998+
@@ -1580,7 +1585,11 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
20091999
// than the total length but should not be a single record.
20102000
// Note that, if record level filtering is enabled, it should be a single record.
20112001
// If no filter is pushed down to Parquet, it should be the total length of data.
@@ -2018,15 +2008,7 @@ index 8e88049f51e..6150a556f9b 100644
20182008
}
20192009
}
20202010
}
2021-
}
2022-
2023-
- test("Filters should be pushed down for Parquet readers at row group level") {
2024-
+ test("Filters should be pushed down for Parquet readers at row group level",
2025-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3320")) {
2026-
import testImplicits._
2027-
2028-
withSQLConf(
2029-
@@ -1607,7 +1618,11 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
2011+
@@ -1607,7 +1616,11 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
20302012
// than the total length but should not be a single record.
20312013
// Note that, if record level filtering is enabled, it should be a single record.
20322014
// If no filter is pushed down to Parquet, it should be the total length of data.
@@ -2039,7 +2021,7 @@ index 8e88049f51e..6150a556f9b 100644
20392021
}
20402022
}
20412023
}
2042-
@@ -1699,7 +1714,7 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
2024+
@@ -1699,7 +1712,7 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
20432025
(attr, value) => sources.StringContains(attr, value))
20442026
}
20452027

@@ -2048,7 +2030,7 @@ index 8e88049f51e..6150a556f9b 100644
20482030
import testImplicits._
20492031
// keep() should take effect on StartsWith/EndsWith/Contains
20502032
Seq(
2051-
@@ -1743,7 +1758,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
2033+
@@ -1743,7 +1756,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
20522034
}
20532035
}
20542036

@@ -2058,7 +2040,7 @@ index 8e88049f51e..6150a556f9b 100644
20582040
val schema = StructType(Seq(
20592041
StructField("a", IntegerType, nullable = false)
20602042
))
2061-
@@ -1952,8 +1968,14 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
2043+
@@ -1952,8 +1966,14 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
20622044
val e = intercept[SparkException] {
20632045
sql(s"select a from $tableName where b > 0").collect()
20642046
}
@@ -2075,7 +2057,7 @@ index 8e88049f51e..6150a556f9b 100644
20752057
}
20762058

20772059
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
2078-
@@ -1984,7 +2006,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
2060+
@@ -1984,7 +2004,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
20792061
}
20802062
}
20812063

@@ -2085,7 +2067,7 @@ index 8e88049f51e..6150a556f9b 100644
20852067
// block 1:
20862068
// null count min max
20872069
// page-0 0 0 99
2088-
@@ -2044,7 +2067,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
2070+
@@ -2044,7 +2065,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
20892071
}
20902072
}
20912073

@@ -2095,7 +2077,7 @@ index 8e88049f51e..6150a556f9b 100644
20952077
withTempPath { dir =>
20962078
val path = dir.getCanonicalPath
20972079
spark.range(100).selectExpr("id * 2 AS id")
2098-
@@ -2276,7 +2300,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite {
2080+
@@ -2276,7 +2298,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite {
20992081
assert(pushedParquetFilters.exists(_.getClass === filterClass),
21002082
s"${pushedParquetFilters.map(_.getClass).toList} did not contain ${filterClass}.")
21012083

@@ -2108,7 +2090,7 @@ index 8e88049f51e..6150a556f9b 100644
21082090
} else {
21092091
assert(selectedFilters.isEmpty, "There is filter pushed down")
21102092
}
2111-
@@ -2336,7 +2364,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite {
2093+
@@ -2336,7 +2362,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite {
21122094
assert(pushedParquetFilters.exists(_.getClass === filterClass),
21132095
s"${pushedParquetFilters.map(_.getClass).toList} did not contain ${filterClass}.")
21142096

@@ -2122,7 +2104,7 @@ index 8e88049f51e..6150a556f9b 100644
21222104
case _ =>
21232105
throw new AnalysisException("Can not match ParquetTable in the query.")
21242106
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
2125-
index 8ed9ef1630e..f312174b182 100644
2107+
index 8ed9ef1630e..a865928c1b2 100644
21262108
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
21272109
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
21282110
@@ -1064,7 +1064,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
@@ -2131,7 +2113,7 @@ index 8ed9ef1630e..f312174b182 100644
21312113

21322114
- test("SPARK-35640: read binary as timestamp should throw schema incompatible error") {
21332115
+ test("SPARK-35640: read binary as timestamp should throw schema incompatible error",
2134-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) {
2116+
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) {
21352117
val data = (1 to 4).map(i => Tuple1(i.toString))
21362118
val readSchema = StructType(Seq(StructField("_1", DataTypes.TimestampType)))
21372119

@@ -2141,7 +2123,7 @@ index 8ed9ef1630e..f312174b182 100644
21412123

21422124
- test("SPARK-35640: int as long should throw schema incompatible error") {
21432125
+ test("SPARK-35640: int as long should throw schema incompatible error",
2144-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) {
2126+
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) {
21452127
val data = (1 to 4).map(i => Tuple1(i))
21462128
val readSchema = StructType(Seq(StructField("_1", DataTypes.LongType)))
21472129

@@ -2156,7 +2138,7 @@ index 8ed9ef1630e..f312174b182 100644
21562138
checkAnswer(
21572139
// "fruit" column in this file is encoded using DELTA_LENGTH_BYTE_ARRAY.
21582140
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
2159-
index f6472ba3d9d..ce39ebb52e6 100644
2141+
index f6472ba3d9d..5ea2d938664 100644
21602142
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
21612143
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
21622144
@@ -185,7 +185,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
@@ -2165,7 +2147,7 @@ index f6472ba3d9d..ce39ebb52e6 100644
21652147

21662148
- test("SPARK-36182: can't read TimestampLTZ as TimestampNTZ") {
21672149
+ test("SPARK-36182: can't read TimestampLTZ as TimestampNTZ",
2168-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) {
2150+
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) {
21692151
val data = (1 to 1000).map { i =>
21702152
val ts = new java.sql.Timestamp(i)
21712153
Row(ts)
@@ -2185,7 +2167,7 @@ index f6472ba3d9d..ce39ebb52e6 100644
21852167

21862168
- test("SPARK-34212 Parquet should read decimals correctly") {
21872169
+ test("SPARK-34212 Parquet should read decimals correctly",
2188-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) {
2170+
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) {
21892171
def readParquet(schema: String, path: File): DataFrame = {
21902172
spark.read.schema(schema).parquet(path.toString)
21912173
}
@@ -2215,7 +2197,7 @@ index f6472ba3d9d..ce39ebb52e6 100644
22152197

22162198
- test("row group skipping doesn't overflow when reading into larger type") {
22172199
+ test("row group skipping doesn't overflow when reading into larger type",
2218-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) {
2200+
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) {
22192201
withTempPath { path =>
22202202
Seq(0).toDF("a").write.parquet(path.toString)
22212203
// The vectorized and non-vectorized readers will produce different exceptions, we don't need
@@ -2324,7 +2306,7 @@ index 5c0b7def039..151184bc98c 100644
23242306
assert(fileSourceScanSchemata.size === expectedSchemaCatalogStrings.size,
23252307
s"Found ${fileSourceScanSchemata.size} file sources in dataframe, " +
23262308
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
2327-
index 3f47c5e506f..92a5eafec84 100644
2309+
index 3f47c5e506f..f1ce3194279 100644
23282310
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
23292311
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
23302312
@@ -27,6 +27,7 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
@@ -2351,7 +2333,7 @@ index 3f47c5e506f..92a5eafec84 100644
23512333

23522334
- test("schema mismatch failure error message for parquet vectorized reader") {
23532335
+ test("schema mismatch failure error message for parquet vectorized reader",
2354-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) {
2336+
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) {
23552337
withTempPath { dir =>
23562338
val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = true)
23572339
assert(e.getCause.isInstanceOf[SparkException])
@@ -2361,7 +2343,7 @@ index 3f47c5e506f..92a5eafec84 100644
23612343

23622344
- test("SPARK-45604: schema mismatch failure error on timestamp_ntz to array<timestamp_ntz>") {
23632345
+ test("SPARK-45604: schema mismatch failure error on timestamp_ntz to array<timestamp_ntz>",
2364-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) {
2346+
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) {
23652347
import testImplicits._
23662348

23672349
withTempPath { dir =>
@@ -2868,39 +2850,6 @@ index aad91601758..201083bd621 100644
28682850
})
28692851
}
28702852

2871-
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSelfUnionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSelfUnionSuite.scala
2872-
index 8f099c31e6b..ce4b7ad25b3 100644
2873-
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSelfUnionSuite.scala
2874-
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSelfUnionSuite.scala
2875-
@@ -20,7 +20,7 @@ package org.apache.spark.sql.streaming
2876-
import org.scalatest.BeforeAndAfter
2877-
import org.scalatest.concurrent.PatienceConfiguration.Timeout
2878-
2879-
-import org.apache.spark.sql.SaveMode
2880-
+import org.apache.spark.sql.{IgnoreCometNativeDataFusion, SaveMode}
2881-
import org.apache.spark.sql.connector.catalog.Identifier
2882-
import org.apache.spark.sql.execution.streaming.MemoryStream
2883-
import org.apache.spark.sql.streaming.test.{InMemoryStreamTable, InMemoryStreamTableCatalog}
2884-
@@ -42,7 +42,8 @@ class StreamingSelfUnionSuite extends StreamTest with BeforeAndAfter {
2885-
sqlContext.streams.active.foreach(_.stop())
2886-
}
2887-
2888-
- test("self-union, DSv1, read via DataStreamReader API") {
2889-
+ test("self-union, DSv1, read via DataStreamReader API",
2890-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3401")) {
2891-
withTempPath { dir =>
2892-
val dataLocation = dir.getAbsolutePath
2893-
spark.range(1, 4).write.format("parquet").save(dataLocation)
2894-
@@ -66,7 +67,8 @@ class StreamingSelfUnionSuite extends StreamTest with BeforeAndAfter {
2895-
}
2896-
}
2897-
2898-
- test("self-union, DSv1, read via table API") {
2899-
+ test("self-union, DSv1, read via table API",
2900-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3401")) {
2901-
withTable("parquet_streaming_tbl") {
2902-
spark.sql("CREATE TABLE parquet_streaming_tbl (key integer) USING parquet")
2903-
29042853
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
29052854
index abe606ad9c1..2d930b64cca 100644
29062855
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
@@ -2925,7 +2874,7 @@ index abe606ad9c1..2d930b64cca 100644
29252874
val tblTargetName = "tbl_target"
29262875
val tblSourceQualified = s"default.$tblSourceName"
29272876
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
2928-
index e937173a590..7d20538bc68 100644
2877+
index e937173a590..242b20f0745 100644
29292878
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
29302879
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
29312880
@@ -27,6 +27,7 @@ import scala.concurrent.duration._
@@ -2944,7 +2893,7 @@ index e937173a590..7d20538bc68 100644
29442893
import org.apache.spark.sql.execution.FilterExec
29452894
import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecution
29462895
import org.apache.spark.sql.execution.datasources.DataSourceUtils
2947-
@@ -119,6 +121,34 @@ private[sql] trait SQLTestUtils extends SparkFunSuite with SQLTestUtilsBase with
2896+
@@ -119,6 +121,33 @@ private[sql] trait SQLTestUtils extends SparkFunSuite with SQLTestUtilsBase with
29482897

29492898
override protected def test(testName: String, testTags: Tag*)(testFun: => Any)
29502899
(implicit pos: Position): Unit = {
@@ -2957,8 +2906,7 @@ index e937173a590..7d20538bc68 100644
29572906
+ val cometScanImpl = CometConf.COMET_NATIVE_SCAN_IMPL.get(conf)
29582907
+ val isNativeIcebergCompat = cometScanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT ||
29592908
+ cometScanImpl == CometConf.SCAN_AUTO
2960-
+ val isNativeDataFusion = cometScanImpl == CometConf.SCAN_NATIVE_DATAFUSION ||
2961-
+ cometScanImpl == CometConf.SCAN_AUTO
2909+
+ val isNativeDataFusion = cometScanImpl == CometConf.SCAN_NATIVE_DATAFUSION
29622910
+ if (isNativeIcebergCompat &&
29632911
+ testTags.exists(_.isInstanceOf[IgnoreCometNativeIcebergCompat])) {
29642912
+ ignore(testName + " (disabled for NATIVE_ICEBERG_COMPAT)", testTags: _*)(testFun)
@@ -2979,7 +2927,7 @@ index e937173a590..7d20538bc68 100644
29792927
if (testTags.exists(_.isInstanceOf[DisableAdaptiveExecution])) {
29802928
super.test(testName, testTags: _*) {
29812929
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
2982-
@@ -242,6 +272,29 @@ private[sql] trait SQLTestUtilsBase
2930+
@@ -242,6 +271,29 @@ private[sql] trait SQLTestUtilsBase
29832931
protected override def _sqlContext: SQLContext = self.spark.sqlContext
29842932
}
29852933

@@ -3009,7 +2957,7 @@ index e937173a590..7d20538bc68 100644
30092957
protected override def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
30102958
SparkSession.setActiveSession(spark)
30112959
super.withSQLConf(pairs: _*)(f)
3012-
@@ -435,6 +488,8 @@ private[sql] trait SQLTestUtilsBase
2960+
@@ -435,6 +487,8 @@ private[sql] trait SQLTestUtilsBase
30132961
val schema = df.schema
30142962
val withoutFilters = df.queryExecution.executedPlan.transform {
30152963
case FilterExec(_, child) => child

0 commit comments

Comments
 (0)