Skip to content

Commit 738694a

Browse files
committed
fix: detect and fall back for native_datafusion incompatibilities
- Add detection for case-insensitive duplicate field names in CometScanRule, falling back to native_iceberg_compat when native_datafusion would produce different error messages than Spark. - Fix schema evolution test to account for auto mode now preferring native_datafusion, which always handles type promotion. The metrics test failures (output_rows=0, filter pushdown=0) are pre-existing on main and not caused by this change.
1 parent 627bb85 commit 738694a

2 files changed

Lines changed: 14 additions & 1 deletion

File tree

spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,19 @@ case class CometScanRule(session: SparkSession)
222222
withInfo(scanExec, "Native DataFusion scan does not support Parquet field ID matching")
223223
return None
224224
}
225+
// Case-insensitive mode with duplicate field names produces different errors
226+
// in DataFusion vs Spark, so fall back to avoid incompatible error messages
227+
if (!session.sessionState.conf.caseSensitiveAnalysis) {
228+
val fieldNames =
229+
scanExec.requiredSchema.fieldNames.map(_.toLowerCase(java.util.Locale.ROOT))
230+
if (fieldNames.length != fieldNames.distinct.length) {
231+
withInfo(
232+
scanExec,
233+
"Native DataFusion scan does not support " +
234+
"duplicate field names in case-insensitive mode")
235+
return None
236+
}
237+
}
225238
if (!isSchemaSupported(scanExec, SCAN_NATIVE_DATAFUSION, r)) {
226239
return None
227240
}

spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -984,7 +984,7 @@ abstract class ParquetReadSuite extends CometTestBase {
984984
withParquetDataFrame(data, schema = Some(readSchema)) { df =>
985985
// TODO: validate with Spark 3.x and 'usingDataFusionParquetExec=true'
986986
if (enableSchemaEvolution || CometConf.COMET_NATIVE_SCAN_IMPL
987-
.get(conf) == CometConf.SCAN_NATIVE_DATAFUSION) {
987+
.get(conf) != CometConf.SCAN_NATIVE_ICEBERG_COMPAT) {
988988
checkAnswer(df, data.map(Row.fromTuple))
989989
} else {
990990
assertThrows[SparkException](df.collect())

0 commit comments

Comments
 (0)