Skip to content

Commit 85126d1

Browse files
committed
fix: enable native_datafusion Spark SQL tests for issues #3320 and #3401
Remove IgnoreCometNativeDataFusion tags from 4 tests that now pass:
- ParquetFilterSuite: SPARK-31026 (dotted field names) and row group level filter pushdown. Conditional assertions already guard the stripSparkFilter checks for native execution.
- StreamingSelfUnionSuite: DSv1 self-union via DataStreamReader and table API. Streaming works correctly with native_datafusion.
1 parent cb6d5b6 commit 85126d1

1 file changed

Lines changed: 11 additions & 62 deletions

File tree

dev/diffs/3.5.8.diff

Lines changed: 11 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
diff --git a/pom.xml b/pom.xml
2-
index edd2ad57880..77a975ea48f 100644
2+
index edd2ad57880..837b95d1ada 100644
33
--- a/pom.xml
44
+++ b/pom.xml
55
@@ -152,6 +152,8 @@
@@ -1969,7 +1969,7 @@ index 07e2849ce6f..3e73645b638 100644
19691969
ParquetOutputFormat.WRITER_VERSION -> ParquetProperties.WriterVersion.PARQUET_2_0.toString
19701970
)
19711971
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
1972-
index 8e88049f51e..6150a556f9b 100644
1972+
index 8e88049f51e..c85cf751871 100644
19731973
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
19741974
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
19751975
@@ -1095,7 +1095,11 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
@@ -1995,17 +1995,7 @@ index 8e88049f51e..6150a556f9b 100644
19951995
import testImplicits._
19961996

19971997
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true",
1998-
@@ -1548,7 +1553,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
1999-
}
2000-
}
2001-
2002-
- test("SPARK-31026: Parquet predicate pushdown for fields having dots in the names") {
2003-
+ test("SPARK-31026: Parquet predicate pushdown for fields having dots in the names",
2004-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3320")) {
2005-
import testImplicits._
2006-
2007-
withAllParquetReaders {
2008-
@@ -1580,13 +1586,18 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
1998+
@@ -1580,7 +1585,11 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
20091999
// than the total length but should not be a single record.
20102000
// Note that, if record level filtering is enabled, it should be a single record.
20112001
// If no filter is pushed down to Parquet, it should be the total length of data.
@@ -2018,15 +2008,7 @@ index 8e88049f51e..6150a556f9b 100644
20182008
}
20192009
}
20202010
}
2021-
}
2022-
2023-
- test("Filters should be pushed down for Parquet readers at row group level") {
2024-
+ test("Filters should be pushed down for Parquet readers at row group level",
2025-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3320")) {
2026-
import testImplicits._
2027-
2028-
withSQLConf(
2029-
@@ -1607,7 +1618,11 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
2011+
@@ -1607,7 +1616,11 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
20302012
// than the total length but should not be a single record.
20312013
// Note that, if record level filtering is enabled, it should be a single record.
20322014
// If no filter is pushed down to Parquet, it should be the total length of data.
@@ -2039,7 +2021,7 @@ index 8e88049f51e..6150a556f9b 100644
20392021
}
20402022
}
20412023
}
2042-
@@ -1699,7 +1714,7 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
2024+
@@ -1699,7 +1712,7 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
20432025
(attr, value) => sources.StringContains(attr, value))
20442026
}
20452027

@@ -2048,7 +2030,7 @@ index 8e88049f51e..6150a556f9b 100644
20482030
import testImplicits._
20492031
// keep() should take effect on StartsWith/EndsWith/Contains
20502032
Seq(
2051-
@@ -1743,7 +1758,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
2033+
@@ -1743,7 +1756,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
20522034
}
20532035
}
20542036

@@ -2058,7 +2040,7 @@ index 8e88049f51e..6150a556f9b 100644
20582040
val schema = StructType(Seq(
20592041
StructField("a", IntegerType, nullable = false)
20602042
))
2061-
@@ -1952,8 +1968,14 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
2043+
@@ -1952,8 +1966,14 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
20622044
val e = intercept[SparkException] {
20632045
sql(s"select a from $tableName where b > 0").collect()
20642046
}
@@ -2075,7 +2057,7 @@ index 8e88049f51e..6150a556f9b 100644
20752057
}
20762058

20772059
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
2078-
@@ -1984,7 +2006,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
2060+
@@ -1984,7 +2004,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
20792061
}
20802062
}
20812063

@@ -2085,7 +2067,7 @@ index 8e88049f51e..6150a556f9b 100644
20852067
// block 1:
20862068
// null count min max
20872069
// page-0 0 0 99
2088-
@@ -2044,7 +2067,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
2070+
@@ -2044,7 +2065,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
20892071
}
20902072
}
20912073

@@ -2095,7 +2077,7 @@ index 8e88049f51e..6150a556f9b 100644
20952077
withTempPath { dir =>
20962078
val path = dir.getCanonicalPath
20972079
spark.range(100).selectExpr("id * 2 AS id")
2098-
@@ -2276,7 +2300,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite {
2080+
@@ -2276,7 +2298,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite {
20992081
assert(pushedParquetFilters.exists(_.getClass === filterClass),
21002082
s"${pushedParquetFilters.map(_.getClass).toList} did not contain ${filterClass}.")
21012083

@@ -2108,7 +2090,7 @@ index 8e88049f51e..6150a556f9b 100644
21082090
} else {
21092091
assert(selectedFilters.isEmpty, "There is filter pushed down")
21102092
}
2111-
@@ -2336,7 +2364,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite {
2093+
@@ -2336,7 +2362,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite {
21122094
assert(pushedParquetFilters.exists(_.getClass === filterClass),
21132095
s"${pushedParquetFilters.map(_.getClass).toList} did not contain ${filterClass}.")
21142096

@@ -2868,39 +2850,6 @@ index aad91601758..201083bd621 100644
28682850
})
28692851
}
28702852

2871-
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSelfUnionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSelfUnionSuite.scala
2872-
index 8f099c31e6b..ce4b7ad25b3 100644
2873-
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSelfUnionSuite.scala
2874-
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSelfUnionSuite.scala
2875-
@@ -20,7 +20,7 @@ package org.apache.spark.sql.streaming
2876-
import org.scalatest.BeforeAndAfter
2877-
import org.scalatest.concurrent.PatienceConfiguration.Timeout
2878-
2879-
-import org.apache.spark.sql.SaveMode
2880-
+import org.apache.spark.sql.{IgnoreCometNativeDataFusion, SaveMode}
2881-
import org.apache.spark.sql.connector.catalog.Identifier
2882-
import org.apache.spark.sql.execution.streaming.MemoryStream
2883-
import org.apache.spark.sql.streaming.test.{InMemoryStreamTable, InMemoryStreamTableCatalog}
2884-
@@ -42,7 +42,8 @@ class StreamingSelfUnionSuite extends StreamTest with BeforeAndAfter {
2885-
sqlContext.streams.active.foreach(_.stop())
2886-
}
2887-
2888-
- test("self-union, DSv1, read via DataStreamReader API") {
2889-
+ test("self-union, DSv1, read via DataStreamReader API",
2890-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3401")) {
2891-
withTempPath { dir =>
2892-
val dataLocation = dir.getAbsolutePath
2893-
spark.range(1, 4).write.format("parquet").save(dataLocation)
2894-
@@ -66,7 +67,8 @@ class StreamingSelfUnionSuite extends StreamTest with BeforeAndAfter {
2895-
}
2896-
}
2897-
2898-
- test("self-union, DSv1, read via table API") {
2899-
+ test("self-union, DSv1, read via table API",
2900-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3401")) {
2901-
withTable("parquet_streaming_tbl") {
2902-
spark.sql("CREATE TABLE parquet_streaming_tbl (key integer) USING parquet")
2903-
29042853
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
29052854
index abe606ad9c1..2d930b64cca 100644
29062855
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala

0 commit comments

Comments
 (0)