Skip to content

Commit 06d565a

Browse files
authored
fix: [Spark 4.1.1] preserve stored allowDecimalPrecisionLoss in DecimalPrecision rule (#4179)
1 parent 6746f47 commit 06d565a

6 files changed

Lines changed: 110 additions & 172 deletions

File tree

.github/workflows/pr_build_linux.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -370,6 +370,7 @@ jobs:
370370
org.apache.spark.sql.comet.CometTaskMetricsSuite
371371
org.apache.spark.sql.comet.CometDppFallbackRepro3949Suite
372372
org.apache.spark.sql.comet.CometShuffleFallbackStickinessSuite
373+
org.apache.spark.sql.comet.CometDecimalArithmeticViewSuite
373374
org.apache.comet.objectstore.NativeConfigSuite
374375
- name: "expressions"
375376
value: |

.github/workflows/pr_build_macos.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,7 @@ jobs:
209209
org.apache.spark.sql.comet.CometTaskMetricsSuite
210210
org.apache.spark.sql.comet.CometDppFallbackRepro3949Suite
211211
org.apache.spark.sql.comet.CometShuffleFallbackStickinessSuite
212+
org.apache.spark.sql.comet.CometDecimalArithmeticViewSuite
212213
org.apache.comet.objectstore.NativeConfigSuite
213214
- name: "expressions"
214215
value: |

dev/diffs/4.1.1.diff

Lines changed: 16 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -721,7 +721,7 @@ index 6df8d66ee7f..35e270c7241 100644
721721
assert(exchanges.size == 2)
722722
}
723723
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
724-
index e1a2fd33c7c..9a93daa8f5a 100644
724+
index e1a2fd33c7c..632f4b695df 100644
725725
--- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
726726
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
727727
@@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen
@@ -781,43 +781,15 @@ index e1a2fd33c7c..9a93daa8f5a 100644
781781

782782
assert(countSubqueryBroadcasts == 1)
783783
assert(countReusedSubqueryBroadcasts == 1)
784-
@@ -1215,7 +1231,8 @@ abstract class DynamicPartitionPruningSuiteBase
785-
}
786-
787-
test("SPARK-32509: Unused Dynamic Pruning filter shouldn't affect " +
788-
- "canonicalization and exchange reuse") {
789-
+ "canonicalization and exchange reuse",
790-
+ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/4045")) {
791-
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
792-
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
793-
SQLConf.V2_BUCKETING_ENABLED.key -> "false") {
794-
@@ -1331,6 +1348,7 @@ abstract class DynamicPartitionPruningSuiteBase
795-
}
796-
797-
test("Subquery reuse across the whole plan",
798-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3313"),
799-
DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) {
800-
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
801-
SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
802-
@@ -1425,7 +1443,8 @@ abstract class DynamicPartitionPruningSuiteBase
803-
}
804-
}
805-
806-
- test("SPARK-34637: DPP side broadcast query stage is created firstly") {
807-
+ test("SPARK-34637: DPP side broadcast query stage is created firstly",
808-
+ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/4045")) {
809-
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
810-
val df = sql(
811-
""" WITH v as (
812-
@@ -1579,6 +1598,7 @@ abstract class DynamicPartitionPruningSuiteBase
784+
@@ -1579,6 +1595,7 @@ abstract class DynamicPartitionPruningSuiteBase
813785

814786
val subqueryBroadcastExecs = collectWithSubqueries(df.queryExecution.executedPlan) {
815787
case s: SubqueryBroadcastExec => s
816788
+ case s: CometSubqueryBroadcastExec => s
817789
}
818790
assert(subqueryBroadcastExecs.size === 1)
819791
subqueryBroadcastExecs.foreach { subqueryBroadcastExec =>
820-
@@ -1731,6 +1751,10 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat
792+
@@ -1731,6 +1748,10 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat
821793
case s: BatchScanExec =>
822794
// we use f1 col for v2 tables due to schema pruning
823795
s.output.exists(_.exists(_.argString(maxFields = 100).contains("f1")))
@@ -854,41 +826,18 @@ index b27122a8de2..a4c5aac8212 100644
854826

855827
test("SPARK-35884: Explain Formatted") {
856828
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
857-
index 95e86fe4311..0f7ed3271d4 100644
829+
index 95e86fe4311..fb2b6363af6 100644
858830
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
859831
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
860-
@@ -33,6 +33,8 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterTha
832+
@@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterTha
861833
import org.apache.spark.sql.catalyst.expressions.IntegralLiteralTestUtils.{negativeInt, positiveInt}
862834
import org.apache.spark.sql.catalyst.plans.logical.Filter
863835
import org.apache.spark.sql.catalyst.types.DataTypeUtils
864-
+import org.apache.spark.sql.catalyst.util.quietly
865836
+import org.apache.spark.sql.comet.{CometBatchScanExec, CometNativeScanExec, CometScanExec, CometSortMergeJoinExec}
866837
import org.apache.spark.sql.execution.{FileSourceScanLike, SimpleMode}
867838
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
868839
import org.apache.spark.sql.execution.datasources.FilePartition
869-
@@ -204,7 +206,11 @@ class FileBasedDataSourceSuite extends QueryTest
870-
}
871-
872-
allFileBasedDataSources.foreach { format =>
873-
- testQuietly(s"Enabling/disabling ignoreMissingFiles using $format") {
874-
+ val ignoreMissingTags: Seq[org.scalatest.Tag] = if (format == "parquet") {
875-
+ Seq(IgnoreCometNativeDataFusion(
876-
+ "https://github.com/apache/datafusion-comet/issues/3314"))
877-
+ } else Seq.empty
878-
+ test(s"Enabling/disabling ignoreMissingFiles using $format", ignoreMissingTags: _*) { quietly {
879-
def testIgnoreMissingFiles(options: Map[String, String]): Unit = {
880-
withTempDir { dir =>
881-
val basePath = dir.getCanonicalPath
882-
@@ -264,7 +270,7 @@ class FileBasedDataSourceSuite extends QueryTest
883-
}
884-
}
885-
}
886-
- }
887-
+ }}
888-
}
889-
890-
Seq("json", "orc").foreach { format =>
891-
@@ -655,18 +661,25 @@ class FileBasedDataSourceSuite extends QueryTest
840+
@@ -655,18 +656,25 @@ class FileBasedDataSourceSuite extends QueryTest
892841
checkAnswer(sql(s"select A from $tableName"), data.select("A"))
893842

894843
// RuntimeException is triggered at executor side, which is then wrapped as
@@ -921,31 +870,31 @@ index 95e86fe4311..0f7ed3271d4 100644
921870
condition = "_LEGACY_ERROR_TEMP_2093",
922871
parameters = Map("requiredFieldName" -> "b", "matchedOrcFields" -> "[b, B]")
923872
)
924-
@@ -954,6 +967,7 @@ class FileBasedDataSourceSuite extends QueryTest
873+
@@ -954,6 +962,7 @@ class FileBasedDataSourceSuite extends QueryTest
925874
assert(bJoinExec.isEmpty)
926875
val smJoinExec = collect(joinedDF.queryExecution.executedPlan) {
927876
case smJoin: SortMergeJoinExec => smJoin
928877
+ case smJoin: CometSortMergeJoinExec => smJoin
929878
}
930879
assert(smJoinExec.nonEmpty)
931880
}
932-
@@ -1014,6 +1028,7 @@ class FileBasedDataSourceSuite extends QueryTest
881+
@@ -1014,6 +1023,7 @@ class FileBasedDataSourceSuite extends QueryTest
933882

934883
val fileScan = df.queryExecution.executedPlan collectFirst {
935884
case BatchScanExec(_, f: FileScan, _, _, _, _) => f
936885
+ case CometBatchScanExec(BatchScanExec(_, f: FileScan, _, _, _, _), _, _) => f
937886
}
938887
assert(fileScan.nonEmpty)
939888
assert(fileScan.get.partitionFilters.nonEmpty)
940-
@@ -1055,6 +1070,7 @@ class FileBasedDataSourceSuite extends QueryTest
889+
@@ -1055,6 +1065,7 @@ class FileBasedDataSourceSuite extends QueryTest
941890

942891
val fileScan = df.queryExecution.executedPlan collectFirst {
943892
case BatchScanExec(_, f: FileScan, _, _, _, _) => f
944893
+ case CometBatchScanExec(BatchScanExec(_, f: FileScan, _, _, _, _), _, _) => f
945894
}
946895
assert(fileScan.nonEmpty)
947896
assert(fileScan.get.partitionFilters.isEmpty)
948-
@@ -1239,6 +1255,9 @@ class FileBasedDataSourceSuite extends QueryTest
897+
@@ -1239,6 +1250,9 @@ class FileBasedDataSourceSuite extends QueryTest
949898
val filters = df.queryExecution.executedPlan.collect {
950899
case f: FileSourceScanLike => f.dataFilters
951900
case b: BatchScanExec => b.scan.asInstanceOf[FileScan].dataFilters
@@ -2020,30 +1969,6 @@ index 47679ed7865..9ffbaecb98e 100644
20201969
}.length == hashAggCount)
20211970
assert(collectWithSubqueries(plan) { case s: SortAggregateExec => s }.length == sortAggCount)
20221971
}
2023-
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
2024-
index 050a004a935..96d982f2829 100644
2025-
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
2026-
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
2027-
@@ -1054,7 +1054,8 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils {
2028-
}
2029-
}
2030-
2031-
- test("alter temporary view should follow current storeAnalyzedPlanForView config") {
2032-
+ test("alter temporary view should follow current storeAnalyzedPlanForView config",
2033-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3314")) {
2034-
withTable("t") {
2035-
Seq(2, 3, 1).toDF("c1").write.format("parquet").saveAsTable("t")
2036-
withView("v1") {
2037-
@@ -1334,7 +1335,8 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils {
2038-
}
2039-
}
2040-
2041-
- test("SPARK-53968 reading the view after allowPrecisionLoss is changed") {
2042-
+ test("SPARK-53968 reading the view after allowPrecisionLoss is changed",
2043-
+ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/4124")) {
2044-
import org.apache.spark.sql.internal.SQLConf
2045-
val partsTableName = "parts_tbl"
2046-
val ordersTableName = "orders_tbl"
20471972
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
20481973
index aed11badb71..1a365b5aacf 100644
20491974
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
@@ -3101,7 +3026,7 @@ index 3072657a095..b2293ccab17 100644
31013026
checkAnswer(
31023027
// "fruit" column in this file is encoded using DELTA_LENGTH_BYTE_ARRAY.
31033028
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
3104-
index c530dc0d3df..abf36a7ab09 100644
3029+
index c530dc0d3df..418d5ea4b4d 100644
31053030
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
31063031
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
31073032
@@ -27,6 +27,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat
@@ -3122,17 +3047,7 @@ index c530dc0d3df..abf36a7ab09 100644
31223047
val providedSchema = StructType(Seq(StructField("time", TimestampNTZType, false)))
31233048

31243049
Seq("INT96", "TIMESTAMP_MICROS", "TIMESTAMP_MILLIS").foreach { tsType =>
3125-
@@ -318,7 +320,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
3126-
}
3127-
}
3128-
3129-
- test("Enabling/disabling ignoreCorruptFiles") {
3130-
+ test("Enabling/disabling ignoreCorruptFiles",
3131-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3314")) {
3132-
def testIgnoreCorruptFiles(options: Map[String, String]): Unit = {
3133-
withTempDir { dir =>
3134-
val basePath = dir.getCanonicalPath
3135-
@@ -996,7 +999,11 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
3050+
@@ -996,7 +998,11 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
31363051
Seq(Some("A"), Some("A"), None).toDF().repartition(1)
31373052
.write.parquet(path.getAbsolutePath)
31383053
val df = spark.read.parquet(path.getAbsolutePath)
@@ -3145,7 +3060,7 @@ index c530dc0d3df..abf36a7ab09 100644
31453060
}
31463061
}
31473062
}
3148-
@@ -1042,7 +1049,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
3063+
@@ -1042,7 +1048,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
31493064
testMigration(fromTsType = "TIMESTAMP_MICROS", toTsType = "INT96")
31503065
}
31513066

@@ -3155,7 +3070,7 @@ index c530dc0d3df..abf36a7ab09 100644
31553070
def readParquet(schema: String, path: File): DataFrame = {
31563071
spark.read.schema(schema).parquet(path.toString)
31573072
}
3158-
@@ -1060,7 +1068,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
3073+
@@ -1060,7 +1067,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
31593074
checkAnswer(readParquet(schema2, path), df)
31603075
}
31613076

@@ -3165,7 +3080,7 @@ index c530dc0d3df..abf36a7ab09 100644
31653080
val schema1 = "a DECIMAL(3, 2), b DECIMAL(18, 3), c DECIMAL(37, 3)"
31663081
checkAnswer(readParquet(schema1, path), df)
31673082
val schema2 = "a DECIMAL(3, 0), b DECIMAL(18, 1), c DECIMAL(37, 1)"
3168-
@@ -1084,7 +1093,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
3083+
@@ -1084,7 +1092,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
31693084
val df = sql(s"SELECT 1 a, 123456 b, ${Int.MaxValue.toLong * 10} c, CAST('1.2' AS BINARY) d")
31703085
df.write.parquet(path.toString)
31713086

@@ -3175,7 +3090,7 @@ index c530dc0d3df..abf36a7ab09 100644
31753090
checkAnswer(readParquet("a DECIMAL(3, 2)", path), sql("SELECT 1.00"))
31763091
checkAnswer(readParquet("a DECIMAL(11, 2)", path), sql("SELECT 1.00"))
31773092
checkAnswer(readParquet("b DECIMAL(3, 2)", path), Row(null))
3178-
@@ -1131,7 +1141,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
3093+
@@ -1131,7 +1140,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
31793094
}
31803095
}
31813096

spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -585,9 +585,7 @@ object QueryPlanSerde extends Logging with CometExprShim with CometTypeShim {
585585
inputs: Seq[Attribute],
586586
binding: Boolean = true): Option[Expr] = {
587587

588-
val conf = SQLConf.get
589-
val newExpr =
590-
DecimalPrecision.promote(conf.decimalOperationsAllowPrecisionLoss, expr, !conf.ansiEnabled)
588+
val newExpr = DecimalPrecision.promote(expr, !SQLConf.get.ansiEnabled)
591589
exprToProtoInternal(newExpr, inputs, binding)
592590
}
593591

0 commit comments

Comments
 (0)