@@ -697,7 +697,7 @@ index 6df8d66ee7f..35e270c7241 100644
697697 assert(exchanges.size == 2)
698698 }
699699diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
700- index e1a2fd33c7c..9a93daa8f5a 100644
700+ index e1a2fd33c7c..632f4b695df 100644
701701--- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
702702+++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
703703@@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen
@@ -757,43 +757,15 @@ index e1a2fd33c7c..9a93daa8f5a 100644
757757
758758 assert(countSubqueryBroadcasts == 1)
759759 assert(countReusedSubqueryBroadcasts == 1)
760- @@ -1215,7 +1231,8 @@ abstract class DynamicPartitionPruningSuiteBase
761- }
762-
763- test("SPARK-32509: Unused Dynamic Pruning filter shouldn't affect " +
764- - "canonicalization and exchange reuse") {
765- + "canonicalization and exchange reuse",
766- + IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/4045")) {
767- withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
768- withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
769- SQLConf.V2_BUCKETING_ENABLED.key -> "false") {
770- @@ -1331,6 +1348,7 @@ abstract class DynamicPartitionPruningSuiteBase
771- }
772-
773- test("Subquery reuse across the whole plan",
774- + IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3313"),
775- DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) {
776- withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
777- SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
778- @@ -1425,7 +1443,8 @@ abstract class DynamicPartitionPruningSuiteBase
779- }
780- }
781-
782- - test("SPARK-34637: DPP side broadcast query stage is created firstly") {
783- + test("SPARK-34637: DPP side broadcast query stage is created firstly",
784- + IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/4045")) {
785- withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
786- val df = sql(
787- """ WITH v as (
788- @@ -1579,6 +1598,7 @@ abstract class DynamicPartitionPruningSuiteBase
760+ @@ -1579,6 +1595,7 @@ abstract class DynamicPartitionPruningSuiteBase
789761
790762 val subqueryBroadcastExecs = collectWithSubqueries(df.queryExecution.executedPlan) {
791763 case s: SubqueryBroadcastExec => s
792764+ case s: CometSubqueryBroadcastExec => s
793765 }
794766 assert(subqueryBroadcastExecs.size === 1)
795767 subqueryBroadcastExecs.foreach { subqueryBroadcastExec =>
796- @@ -1731,6 +1751 ,10 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat
768+ @@ -1731,6 +1748 ,10 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat
797769 case s: BatchScanExec =>
798770 // we use f1 col for v2 tables due to schema pruning
799771 s.output.exists(_.exists(_.argString(maxFields = 100).contains("f1")))
@@ -830,41 +802,18 @@ index b27122a8de2..a4c5aac8212 100644
830802
831803 test("SPARK-35884: Explain Formatted") {
832804diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
833- index 95e86fe4311..0f7ed3271d4 100644
805+ index 95e86fe4311..fb2b6363af6 100644
834806--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
835807+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
836- @@ -33,6 +33,8 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterTha
808+ @@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterTha
837809 import org.apache.spark.sql.catalyst.expressions.IntegralLiteralTestUtils.{negativeInt, positiveInt}
838810 import org.apache.spark.sql.catalyst.plans.logical.Filter
839811 import org.apache.spark.sql.catalyst.types.DataTypeUtils
840- + import org.apache.spark.sql.catalyst.util.quietly
841812+ import org.apache.spark.sql.comet.{CometBatchScanExec, CometNativeScanExec, CometScanExec, CometSortMergeJoinExec}
842813 import org.apache.spark.sql.execution.{FileSourceScanLike, SimpleMode}
843814 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
844815 import org.apache.spark.sql.execution.datasources.FilePartition
845- @@ -204,7 +206,11 @@ class FileBasedDataSourceSuite extends QueryTest
846- }
847-
848- allFileBasedDataSources.foreach { format =>
849- - testQuietly(s"Enabling/disabling ignoreMissingFiles using $format") {
850- + val ignoreMissingTags: Seq[org.scalatest.Tag] = if (format == "parquet") {
851- + Seq(IgnoreCometNativeDataFusion(
852- + "https://github.com/apache/datafusion-comet/issues/3314"))
853- + } else Seq.empty
854- + test(s"Enabling/disabling ignoreMissingFiles using $format", ignoreMissingTags: _*) { quietly {
855- def testIgnoreMissingFiles(options: Map[String, String]): Unit = {
856- withTempDir { dir =>
857- val basePath = dir.getCanonicalPath
858- @@ -264,7 +270,7 @@ class FileBasedDataSourceSuite extends QueryTest
859- }
860- }
861- }
862- - }
863- + }}
864- }
865-
866- Seq("json", "orc").foreach { format =>
867- @@ -655,18 +661,25 @@ class FileBasedDataSourceSuite extends QueryTest
816+ @@ -655,18 +656,25 @@ class FileBasedDataSourceSuite extends QueryTest
868817 checkAnswer(sql(s"select A from $tableName"), data.select("A"))
869818
870819 // RuntimeException is triggered at executor side, which is then wrapped as
@@ -897,31 +846,31 @@ index 95e86fe4311..0f7ed3271d4 100644
897846 condition = "_LEGACY_ERROR_TEMP_2093",
898847 parameters = Map("requiredFieldName" -> "b", "matchedOrcFields" -> "[b, B]")
899848 )
900- @@ -954,6 +967 ,7 @@ class FileBasedDataSourceSuite extends QueryTest
849+ @@ -954,6 +962 ,7 @@ class FileBasedDataSourceSuite extends QueryTest
901850 assert(bJoinExec.isEmpty)
902851 val smJoinExec = collect(joinedDF.queryExecution.executedPlan) {
903852 case smJoin: SortMergeJoinExec => smJoin
904853+ case smJoin: CometSortMergeJoinExec => smJoin
905854 }
906855 assert(smJoinExec.nonEmpty)
907856 }
908- @@ -1014,6 +1028 ,7 @@ class FileBasedDataSourceSuite extends QueryTest
857+ @@ -1014,6 +1023 ,7 @@ class FileBasedDataSourceSuite extends QueryTest
909858
910859 val fileScan = df.queryExecution.executedPlan collectFirst {
911860 case BatchScanExec(_, f: FileScan, _, _, _, _) => f
912861+ case CometBatchScanExec(BatchScanExec(_, f: FileScan, _, _, _, _), _, _) => f
913862 }
914863 assert(fileScan.nonEmpty)
915864 assert(fileScan.get.partitionFilters.nonEmpty)
916- @@ -1055,6 +1070 ,7 @@ class FileBasedDataSourceSuite extends QueryTest
865+ @@ -1055,6 +1065 ,7 @@ class FileBasedDataSourceSuite extends QueryTest
917866
918867 val fileScan = df.queryExecution.executedPlan collectFirst {
919868 case BatchScanExec(_, f: FileScan, _, _, _, _) => f
920869+ case CometBatchScanExec(BatchScanExec(_, f: FileScan, _, _, _, _), _, _) => f
921870 }
922871 assert(fileScan.nonEmpty)
923872 assert(fileScan.get.partitionFilters.isEmpty)
924- @@ -1239,6 +1255 ,9 @@ class FileBasedDataSourceSuite extends QueryTest
873+ @@ -1239,6 +1250 ,9 @@ class FileBasedDataSourceSuite extends QueryTest
925874 val filters = df.queryExecution.executedPlan.collect {
926875 case f: FileSourceScanLike => f.dataFilters
927876 case b: BatchScanExec => b.scan.asInstanceOf[FileScan].dataFilters
@@ -2020,20 +1969,6 @@ index 47679ed7865..9ffbaecb98e 100644
20201969 }.length == hashAggCount)
20211970 assert(collectWithSubqueries(plan) { case s: SortAggregateExec => s }.length == sortAggCount)
20221971 }
2023- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
2024- index 050a004a935..96d982f2829 100644
2025- --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
2026- +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
2027- @@ -1054,7 +1054,8 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils {
2028- }
2029- }
2030-
2031- - test("alter temporary view should follow current storeAnalyzedPlanForView config") {
2032- + test("alter temporary view should follow current storeAnalyzedPlanForView config",
2033- + IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3314")) {
2034- withTable("t") {
2035- Seq(2, 3, 1).toDF("c1").write.format("parquet").saveAsTable("t")
2036- withView("v1") {
20371972diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
20381973index aed11badb71..1a365b5aacf 100644
20391974--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
@@ -3089,7 +3024,7 @@ index 3072657a095..b2293ccab17 100644
30893024 checkAnswer(
30903025 // "fruit" column in this file is encoded using DELTA_LENGTH_BYTE_ARRAY.
30913026diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
3092- index c530dc0d3df..abf36a7ab09 100644
3027+ index c530dc0d3df..418d5ea4b4d 100644
30933028--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
30943029+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
30953030@@ -27,6 +27,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat
@@ -3110,17 +3045,7 @@ index c530dc0d3df..abf36a7ab09 100644
31103045 val providedSchema = StructType(Seq(StructField("time", TimestampNTZType, false)))
31113046
31123047 Seq("INT96", "TIMESTAMP_MICROS", "TIMESTAMP_MILLIS").foreach { tsType =>
3113- @@ -318,7 +320,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
3114- }
3115- }
3116-
3117- - test("Enabling/disabling ignoreCorruptFiles") {
3118- + test("Enabling/disabling ignoreCorruptFiles",
3119- + IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3314")) {
3120- def testIgnoreCorruptFiles(options: Map[String, String]): Unit = {
3121- withTempDir { dir =>
3122- val basePath = dir.getCanonicalPath
3123- @@ -996,7 +999,11 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
3048+ @@ -996,7 +998,11 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
31243049 Seq(Some("A"), Some("A"), None).toDF().repartition(1)
31253050 .write.parquet(path.getAbsolutePath)
31263051 val df = spark.read.parquet(path.getAbsolutePath)
@@ -3133,7 +3058,7 @@ index c530dc0d3df..abf36a7ab09 100644
31333058 }
31343059 }
31353060 }
3136- @@ -1042,7 +1049 ,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
3061+ @@ -1042,7 +1048 ,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
31373062 testMigration(fromTsType = "TIMESTAMP_MICROS", toTsType = "INT96")
31383063 }
31393064
@@ -3143,7 +3068,7 @@ index c530dc0d3df..abf36a7ab09 100644
31433068 def readParquet(schema: String, path: File): DataFrame = {
31443069 spark.read.schema(schema).parquet(path.toString)
31453070 }
3146- @@ -1060,7 +1068 ,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
3071+ @@ -1060,7 +1067 ,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
31473072 checkAnswer(readParquet(schema2, path), df)
31483073 }
31493074
@@ -3153,7 +3078,7 @@ index c530dc0d3df..abf36a7ab09 100644
31533078 val schema1 = "a DECIMAL(3, 2), b DECIMAL(18, 3), c DECIMAL(37, 3)"
31543079 checkAnswer(readParquet(schema1, path), df)
31553080 val schema2 = "a DECIMAL(3, 0), b DECIMAL(18, 1), c DECIMAL(37, 1)"
3156- @@ -1084,7 +1093 ,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
3081+ @@ -1084,7 +1092 ,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
31573082 val df = sql(s"SELECT 1 a, 123456 b, ${Int.MaxValue.toLong * 10} c, CAST('1.2' AS BINARY) d")
31583083 df.write.parquet(path.toString)
31593084
@@ -3163,7 +3088,7 @@ index c530dc0d3df..abf36a7ab09 100644
31633088 checkAnswer(readParquet("a DECIMAL(3, 2)", path), sql("SELECT 1.00"))
31643089 checkAnswer(readParquet("a DECIMAL(11, 2)", path), sql("SELECT 1.00"))
31653090 checkAnswer(readParquet("b DECIMAL(3, 2)", path), Row(null))
3166- @@ -1131,7 +1141 ,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
3091+ @@ -1131,7 +1140 ,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
31673092 }
31683093 }
31693094
0 commit comments