Skip to content

Commit 4d471e1

Browse files
committed
Merge branch 'main' into codegen_scala_udf
2 parents 2be5f73 + 0cec8f8 commit 4d471e1

6 files changed

Lines changed: 35 additions & 140 deletions

File tree

common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -794,6 +794,19 @@ object CometConf extends ShimCometConf {
794794
.booleanConf
795795
.createWithDefault(true)
796796

797+
val COMET_SCAN_ALLOW_DISABLED_PARQUET_VECTORIZED_READER: ConfigEntry[Boolean] =
798+
conf("spark.comet.scan.allowDisabledParquetVectorizedReader")
799+
.category(CATEGORY_SCAN)
800+
.doc(
801+
"Whether to allow Comet's native scan to replace the Parquet scan when Spark's " +
802+
s"${SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key} is set to false. By default " +
803+
"(false), Comet falls back to Spark in that case, because Comet's native readers " +
804+
"mirror Spark's vectorized reader semantics rather than Spark's parquet-mr " +
805+
"(non-vectorized) semantics, which permit silent overflow / null-on-narrowing " +
806+
s"that Comet has no equivalent for. $COMPAT_GUIDE.")
807+
.booleanConf
808+
.createWithDefault(false)
809+
797810
val COMET_EXEC_STRICT_FLOATING_POINT: ConfigEntry[Boolean] =
798811
conf("spark.comet.exec.strictFloatingPoint")
799812
.category(CATEGORY_EXEC)

dev/diffs/4.0.2.diff

Lines changed: 1 addition & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -2926,18 +2926,9 @@ index 0acb21f3e6f..e7c65429119 100644
29262926
val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = true)
29272927
assert(e.getCause.isInstanceOf[SchemaColumnConvertNotSupportedException])
29282928
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala
2929-
index 09ed6955a51..82924c83eb5 100644
2929+
index 09ed6955a51..236a4e99824 100644
29302930
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala
29312931
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala
2932-
@@ -24,7 +24,7 @@ import org.apache.parquet.format.converter.ParquetMetadataConverter
2933-
import org.apache.parquet.hadoop.{ParquetFileReader, ParquetOutputFormat}
2934-
2935-
import org.apache.spark.SparkException
2936-
-import org.apache.spark.sql.{DataFrame, QueryTest, Row}
2937-
+import org.apache.spark.sql.{DataFrame, IgnoreCometNativeDataFusion, QueryTest, Row}
2938-
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
2939-
import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
2940-
import org.apache.spark.sql.functions.col
29412932
@@ -65,7 +65,9 @@ class ParquetTypeWideningSuite
29422933
withClue(
29432934
s"with dictionary encoding '$dictionaryEnabled' with timestamp rebase mode " +
@@ -2968,66 +2959,6 @@ index 09ed6955a51..82924c83eb5 100644
29682959
)
29692960
}
29702961
test(s"parquet widening conversion $fromType -> $toType") {
2971-
@@ -231,7 +234,8 @@ class ParquetTypeWideningSuite
2972-
(Seq("2020-01-01", "2020-01-02", "1312-02-27"), DateType, TimestampType)
2973-
)
2974-
}
2975-
- test(s"unsupported parquet conversion $fromType -> $toType") {
2976-
+ test(s"unsupported parquet conversion $fromType -> $toType",
2977-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")) {
2978-
checkAllParquetReaders(values, fromType, toType, expectError = true)
2979-
}
2980-
2981-
@@ -257,7 +261,8 @@ class ParquetTypeWideningSuite
2982-
(Seq("1", "2"), LongType, DecimalType(LongDecimal.precision, 1))
2983-
)
2984-
}
2985-
- test(s"unsupported parquet conversion $fromType -> $toType") {
2986-
+ test(s"unsupported parquet conversion $fromType -> $toType",
2987-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")) {
2988-
checkAllParquetReaders(values, fromType, toType,
2989-
expectError =
2990-
// parquet-mr allows reading decimals into a smaller precision decimal type without
2991-
@@ -271,7 +276,8 @@ class ParquetTypeWideningSuite
2992-
(Seq("2020-01-01", "2020-01-02", "1312-02-27"), TimestampNTZType, DateType))
2993-
outputTimestampType <- ParquetOutputTimestampType.values
2994-
}
2995-
- test(s"unsupported parquet timestamp conversion $fromType ($outputTimestampType) -> $toType") {
2996-
+ test(s"unsupported parquet timestamp conversion $fromType ($outputTimestampType) -> $toType",
2997-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")) {
2998-
withSQLConf(
2999-
SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> outputTimestampType.toString,
3000-
SQLConf.PARQUET_INT96_REBASE_MODE_IN_WRITE.key -> LegacyBehaviorPolicy.CORRECTED.toString
3001-
@@ -291,7 +297,8 @@ class ParquetTypeWideningSuite
3002-
Seq(7 -> 5, 10 -> 5, 20 -> 5, 12 -> 10, 20 -> 10, 22 -> 20)
3003-
}
3004-
test(
3005-
- s"parquet decimal precision change Decimal($fromPrecision, 2) -> Decimal($toPrecision, 2)") {
3006-
+ s"parquet decimal precision change Decimal($fromPrecision, 2) -> Decimal($toPrecision, 2)",
3007-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")) {
3008-
checkAllParquetReaders(
3009-
values = Seq("1.23", "10.34"),
3010-
fromType = DecimalType(fromPrecision, 2),
3011-
@@ -322,7 +329,8 @@ class ParquetTypeWideningSuite
3012-
Seq((5, 2) -> (6, 4), (10, 4) -> (12, 7), (20, 5) -> (22, 8))
3013-
}
3014-
test(s"parquet decimal precision and scale change Decimal($fromPrecision, $fromScale) -> " +
3015-
- s"Decimal($toPrecision, $toScale)"
3016-
+ s"Decimal($toPrecision, $toScale)",
3017-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")
3018-
) {
3019-
checkAllParquetReaders(
3020-
values = Seq("1.23", "10.34"),
3021-
@@ -336,7 +344,8 @@ class ParquetTypeWideningSuite
3022-
)
3023-
}
3024-
3025-
- test("parquet decimal type change Decimal(5, 2) -> Decimal(3, 2) overflows with parquet-mr") {
3026-
+ test("parquet decimal type change Decimal(5, 2) -> Decimal(3, 2) overflows with parquet-mr",
3027-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")) {
3028-
withTempDir { dir =>
3029-
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
3030-
writeParquetFiles(
30312962
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
30322963
index b8f3ea3c6f3..bbd44221288 100644
30332964
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala

dev/diffs/4.1.1.diff

Lines changed: 1 addition & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -3081,18 +3081,9 @@ index 56076175d60..5872d9962cc 100644
30813081
val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = true)
30823082
assert(e.getCause.isInstanceOf[SchemaColumnConvertNotSupportedException])
30833083
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala
3084-
index 09ed6955a51..82924c83eb5 100644
3084+
index 09ed6955a51..236a4e99824 100644
30853085
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala
30863086
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala
3087-
@@ -24,7 +24,7 @@ import org.apache.parquet.format.converter.ParquetMetadataConverter
3088-
import org.apache.parquet.hadoop.{ParquetFileReader, ParquetOutputFormat}
3089-
3090-
import org.apache.spark.SparkException
3091-
-import org.apache.spark.sql.{DataFrame, QueryTest, Row}
3092-
+import org.apache.spark.sql.{DataFrame, IgnoreCometNativeDataFusion, QueryTest, Row}
3093-
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
3094-
import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
3095-
import org.apache.spark.sql.functions.col
30963087
@@ -65,7 +65,9 @@ class ParquetTypeWideningSuite
30973088
withClue(
30983089
s"with dictionary encoding '$dictionaryEnabled' with timestamp rebase mode " +
@@ -3123,66 +3114,6 @@ index 09ed6955a51..82924c83eb5 100644
31233114
)
31243115
}
31253116
test(s"parquet widening conversion $fromType -> $toType") {
3126-
@@ -231,7 +234,8 @@ class ParquetTypeWideningSuite
3127-
(Seq("2020-01-01", "2020-01-02", "1312-02-27"), DateType, TimestampType)
3128-
)
3129-
}
3130-
- test(s"unsupported parquet conversion $fromType -> $toType") {
3131-
+ test(s"unsupported parquet conversion $fromType -> $toType",
3132-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")) {
3133-
checkAllParquetReaders(values, fromType, toType, expectError = true)
3134-
}
3135-
3136-
@@ -257,7 +261,8 @@ class ParquetTypeWideningSuite
3137-
(Seq("1", "2"), LongType, DecimalType(LongDecimal.precision, 1))
3138-
)
3139-
}
3140-
- test(s"unsupported parquet conversion $fromType -> $toType") {
3141-
+ test(s"unsupported parquet conversion $fromType -> $toType",
3142-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")) {
3143-
checkAllParquetReaders(values, fromType, toType,
3144-
expectError =
3145-
// parquet-mr allows reading decimals into a smaller precision decimal type without
3146-
@@ -271,7 +276,8 @@ class ParquetTypeWideningSuite
3147-
(Seq("2020-01-01", "2020-01-02", "1312-02-27"), TimestampNTZType, DateType))
3148-
outputTimestampType <- ParquetOutputTimestampType.values
3149-
}
3150-
- test(s"unsupported parquet timestamp conversion $fromType ($outputTimestampType) -> $toType") {
3151-
+ test(s"unsupported parquet timestamp conversion $fromType ($outputTimestampType) -> $toType",
3152-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")) {
3153-
withSQLConf(
3154-
SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> outputTimestampType.toString,
3155-
SQLConf.PARQUET_INT96_REBASE_MODE_IN_WRITE.key -> LegacyBehaviorPolicy.CORRECTED.toString
3156-
@@ -291,7 +297,8 @@ class ParquetTypeWideningSuite
3157-
Seq(7 -> 5, 10 -> 5, 20 -> 5, 12 -> 10, 20 -> 10, 22 -> 20)
3158-
}
3159-
test(
3160-
- s"parquet decimal precision change Decimal($fromPrecision, 2) -> Decimal($toPrecision, 2)") {
3161-
+ s"parquet decimal precision change Decimal($fromPrecision, 2) -> Decimal($toPrecision, 2)",
3162-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")) {
3163-
checkAllParquetReaders(
3164-
values = Seq("1.23", "10.34"),
3165-
fromType = DecimalType(fromPrecision, 2),
3166-
@@ -322,7 +329,8 @@ class ParquetTypeWideningSuite
3167-
Seq((5, 2) -> (6, 4), (10, 4) -> (12, 7), (20, 5) -> (22, 8))
3168-
}
3169-
test(s"parquet decimal precision and scale change Decimal($fromPrecision, $fromScale) -> " +
3170-
- s"Decimal($toPrecision, $toScale)"
3171-
+ s"Decimal($toPrecision, $toScale)",
3172-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")
3173-
) {
3174-
checkAllParquetReaders(
3175-
values = Seq("1.23", "10.34"),
3176-
@@ -336,7 +344,8 @@ class ParquetTypeWideningSuite
3177-
)
3178-
}
3179-
3180-
- test("parquet decimal type change Decimal(5, 2) -> Decimal(3, 2) overflows with parquet-mr") {
3181-
+ test("parquet decimal type change Decimal(5, 2) -> Decimal(3, 2) overflows with parquet-mr",
3182-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4352")) {
3183-
withTempDir { dir =>
3184-
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
3185-
writeParquetFiles(
31863117
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala
31873118
index 1cc6d3afbee..8275727fbb4 100644
31883119
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala

docs/source/user-guide/latest/compatibility/scans.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,12 @@ requires `spark.comet.exec.enabled=true` because the scan node must be wrapped b
7979
- Duplicate field names in case-insensitive mode (e.g., a Parquet file with both `B` and `b` columns)
8080
are detected at read time and raise a `SparkRuntimeException` with error class `_LEGACY_ERROR_TEMP_2093`,
8181
matching Spark's behavior.
82+
- `spark.sql.parquet.enableVectorizedReader=false`. Disabling the vectorized reader opts into
83+
Spark's parquet-mr semantics (silent overflow, null-on-narrowing), which Comet's native reader
84+
does not replicate. By default Comet falls back to Spark in this case. Set
85+
`spark.comet.scan.allowDisabledParquetVectorizedReader=true` to opt in to running the
86+
`native_datafusion` scan regardless. See
87+
[#4352](https://github.com/apache/datafusion-comet/issues/4352).
8288

8389
The following `native_datafusion` limitations may produce incorrect results on Spark versions prior to 4.0
8490
without falling back to Spark:

spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,19 @@ case class CometScanRule(session: SparkSession)
207207
s"$SCAN_NATIVE_DATAFUSION scan requires ${COMET_EXEC_ENABLED.key} to be enabled")
208208
return None
209209
}
210+
// Disabling the vectorized reader opts into parquet-mr's permissive behavior
211+
// (silent overflow / null-on-narrowing). Comet has no parquet-mr-equivalent
212+
// backend, so by default fall back to Spark. Users can opt in to letting Comet
213+
// replace the scan via COMET_SCAN_ALLOW_DISABLED_PARQUET_VECTORIZED_READER.
214+
if (!conf.parquetVectorizedReaderEnabled &&
215+
!COMET_SCAN_ALLOW_DISABLED_PARQUET_VECTORIZED_READER.get()) {
216+
withInfo(
217+
scanExec,
218+
s"$SCAN_NATIVE_DATAFUSION scan is incompatible with " +
219+
s"${SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key}=false; set " +
220+
s"${COMET_SCAN_ALLOW_DISABLED_PARQUET_VECTORIZED_READER.key}=true to opt in")
221+
return None
222+
}
210223
if (!CometNativeScan.isSupported(scanExec)) {
211224
return None
212225
}

spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ abstract class CometTestBase
8686
conf.set(CometConf.COMET_SPARK_TO_ARROW_ENABLED.key, "true")
8787
conf.set(CometConf.COMET_NATIVE_SCAN_ENABLED.key, "true")
8888
conf.set(CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.key, "false")
89+
conf.set(CometConf.COMET_SCAN_ALLOW_DISABLED_PARQUET_VECTORIZED_READER.key, "true")
8990
conf.set(CometConf.COMET_ONHEAP_MEMORY_OVERHEAD.key, "2g")
9091
conf.set(CometConf.COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED.key, "true")
9192
// SortOrder is incompatible for mixed zero and negative zero floating point values, but

0 commit comments

Comments
 (0)