Skip to content

Commit cdb64fe

Browse files
committed
fix: fall back from native_datafusion when schema evolution is disabled
DataFusion's native Parquet reader always enables type promotion regardless of the Comet schema evolution config. When schema evolution is disabled, fall back to native_iceberg_compat in auto mode to enforce strict type matching.
1 parent 0342137 commit cdb64fe

3 files changed

Lines changed: 18 additions & 4 deletions

File tree

docs/source/contributor-guide/parquet_scans.md

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,12 @@ Comet currently has two distinct implementations of the Parquet scan operator.
2828

2929
The configuration property
3030
`spark.comet.scan.impl` is used to select an implementation. The default setting is `spark.comet.scan.impl=auto`, which
31-
currently always uses the `native_iceberg_compat` implementation. Most users should not need to change this setting.
32-
However, it is possible to force Comet to use a particular implementation for all scan operations by setting
33-
this configuration property to one of the following implementations. For example: `--conf spark.comet.scan.impl=native_datafusion`.
31+
selects the best implementation based on query characteristics. In auto mode, Comet prefers `native_datafusion` when
32+
possible and falls back to `native_iceberg_compat` when it detects incompatibilities (such as row indexes, metadata
33+
columns, `input_file_name()` usage, or `spark.comet.schemaEvolution.enabled` being set to `false`). Most users should
34+
not need to change this setting. However, it is possible to force Comet to use a particular implementation for all
35+
scan operations by setting this configuration property to one of the following implementations. For example:
36+
`--conf spark.comet.scan.impl=native_datafusion`.
3437

3538
The following features are not supported by either scan implementation, and Comet will fall back to Spark in these scenarios:
3639

spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,17 @@ case class CometScanRule(session: SparkSession)
240240
return None
241241
}
242242
}
243+
// DataFusion's native Parquet reader always enables type promotion
244+
// (e.g., int->long, float->double) regardless of the Comet schema
245+
// evolution config. When schema evolution is disabled, fall back so
246+
// that native_iceberg_compat can enforce strict type matching.
247+
if (!COMET_SCHEMA_EVOLUTION_ENABLED.get()) {
248+
withInfo(
249+
scanExec,
250+
"Native DataFusion scan always enables schema evolution " +
251+
s"but ${COMET_SCHEMA_EVOLUTION_ENABLED.key} is disabled")
252+
return None
253+
}
243254
if (!isSchemaSupported(scanExec, SCAN_NATIVE_DATAFUSION, r)) {
244255
return None
245256
}

spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -984,7 +984,7 @@ abstract class ParquetReadSuite extends CometTestBase {
984984
withParquetDataFrame(data, schema = Some(readSchema)) { df =>
985985
// TODO: validate with Spark 3.x and 'usingDataFusionParquetExec=true'
986986
if (enableSchemaEvolution || CometConf.COMET_NATIVE_SCAN_IMPL
987-
.get(conf) != CometConf.SCAN_NATIVE_ICEBERG_COMPAT) {
987+
.get(conf) == CometConf.SCAN_NATIVE_DATAFUSION) {
988988
checkAnswer(df, data.map(Row.fromTuple))
989989
} else {
990990
assertThrows[SparkException](df.collect())

0 commit comments

Comments
 (0)