Skip to content

Commit 82acd9c

Browse files
committed
fix: enable native_datafusion Spark SQL tests for #3320, #3401, #3719
- Remove `IgnoreCometNativeDataFusion` tags from 5 tests that now pass:
  - ParquetFilterSuite: SPARK-31026 and row group level filter pushdown
  - StreamingSelfUnionSuite: DSv1 self-union tests
  - FileBasedDataSourceSuite: caseSensitive test
- Add `SparkError::DuplicateFieldCaseInsensitive` to convert DataFusion's "Unable to get field named" schema error to `SparkRuntimeException` with error class `_LEGACY_ERROR_TEMP_2093`, matching Spark's behavior
- Re-link remaining #3311 tests to specific issues #3719, #3720
1 parent cb6d5b6 commit 82acd9c

7 files changed

Lines changed: 146 additions & 90 deletions

File tree

dev/diffs/3.5.8.diff

Lines changed: 55 additions & 86 deletions
Large diffs are not rendered by default.

docs/source/contributor-guide/parquet_scans.md

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,10 @@ cause Comet to fall back to Spark.
6363
The `native_datafusion` scan does not use Spark's `FileScanRDD`, so these functions cannot populate their values.
6464
- No support for `ignoreMissingFiles` or `ignoreCorruptFiles` being set to `true`
6565
- No support for duplicate field names in case-insensitive mode. When the required or data schema contains
66-
field names that differ only by case (e.g., `B` and `b`), Comet falls back to Spark. Note that duplicates
67-
in the physical Parquet file that are not reflected in the table schema cannot be detected at plan time,
68-
so DataFusion may produce a different error message than Spark in that case.
66+
field names that differ only by case (e.g., `B` and `b`), Comet falls back to Spark. Duplicates
67+
in the physical Parquet file that are not reflected in the table schema cannot be detected at plan time;
68+
in that case Comet converts DataFusion's schema error into a `SparkRuntimeException` with error class `_LEGACY_ERROR_TEMP_2093`,
69+
matching Spark's behavior.
6970

7071
The `native_iceberg_compat` scan has the following additional limitation that may produce incorrect results
7172
without falling back to Spark:

native/core/src/errors.rs

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -436,13 +436,17 @@ fn throw_exception(env: &mut JNIEnv, error: &CometError, backtrace: Option<Strin
436436
// Handle direct SparkError - serialize to JSON
437437
CometError::Spark(spark_error) => throw_spark_error_as_json(env, spark_error),
438438
_ => {
439-
// Check for file-not-found errors that may arrive through other wrapping paths
440439
let error_msg = error.to_string();
440+
// Check for file-not-found errors that may arrive through other wrapping paths
441441
if error_msg.contains("not found")
442442
&& error_msg.contains("No such file or directory")
443443
{
444444
let spark_error = SparkError::FileNotFound { message: error_msg };
445445
throw_spark_error_as_json(env, &spark_error)
446+
} else if let Some(spark_error) =
447+
try_convert_duplicate_field_error(&error_msg)
448+
{
449+
throw_spark_error_as_json(env, &spark_error)
446450
} else {
447451
let exception = error.to_exception();
448452
match backtrace {
@@ -474,6 +478,42 @@ fn throw_spark_error_as_json(
474478
)
475479
}
476480

481+
/// Try to convert a DataFusion "Unable to get field named" error into a SparkError.
482+
/// DataFusion produces this error when reading Parquet files with duplicate field names
483+
/// in case-insensitive mode (e.g., file has columns "b" and "B", query requests "b").
484+
fn try_convert_duplicate_field_error(error_msg: &str) -> Option<SparkError> {
485+
// Match: Schema error: Unable to get field named "X". Valid fields: [...]
486+
lazy_static! {
487+
static ref FIELD_RE: Regex =
488+
Regex::new(r#"Unable to get field named "([^"]+)"\. Valid fields: \[(.+)\]"#)
489+
.unwrap();
490+
}
491+
if let Some(caps) = FIELD_RE.captures(error_msg) {
492+
let requested_field = caps.get(1)?.as_str();
493+
// Parse field names from the Valid fields list: ["b"] or ["b", "B"]
494+
let valid_fields_raw = caps.get(2)?.as_str();
495+
let mut fields: Vec<String> = valid_fields_raw
496+
.split(',')
497+
.map(|s| s.trim().trim_matches('"').to_string())
498+
.collect();
499+
// DataFusion only reports fields it found; add the requested name if not present
500+
// to match Spark's behavior of listing all ambiguous fields
501+
if !fields.iter().any(|f| f == requested_field) {
502+
fields.push(requested_field.to_string());
503+
}
504+
// Spark uses lowercase required field name
505+
let required_field_name = requested_field.to_lowercase();
506+
// Format as Spark expects: [b, B]
507+
let matched_fields = format!("[{}]", fields.join(", "));
508+
Some(SparkError::DuplicateFieldCaseInsensitive {
509+
required_field_name,
510+
matched_fields,
511+
})
512+
} else {
513+
None
514+
}
515+
}
516+
477517
#[derive(Debug, Error)]
478518
enum StacktraceError {
479519
#[error("Unable to initialize message: {0}")]

native/spark-expr/src/error.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,12 @@ pub enum SparkError {
169169
#[error("{message}")]
170170
FileNotFound { message: String },
171171

172+
#[error("[_LEGACY_ERROR_TEMP_2093] Found duplicate field(s) \"{required_field_name}\": [{matched_fields}] in case-insensitive mode")]
173+
DuplicateFieldCaseInsensitive {
174+
required_field_name: String,
175+
matched_fields: String,
176+
},
177+
172178
#[error("ArrowError: {0}.")]
173179
Arrow(Arc<ArrowError>),
174180

@@ -240,6 +246,9 @@ impl SparkError {
240246
SparkError::DatatypeCannotOrder { .. } => "DatatypeCannotOrder",
241247
SparkError::ScalarSubqueryTooManyRows => "ScalarSubqueryTooManyRows",
242248
SparkError::FileNotFound { .. } => "FileNotFound",
249+
SparkError::DuplicateFieldCaseInsensitive { .. } => {
250+
"DuplicateFieldCaseInsensitive"
251+
}
243252
SparkError::Arrow(_) => "Arrow",
244253
SparkError::Internal(_) => "Internal",
245254
}
@@ -430,6 +439,15 @@ impl SparkError {
430439
"message": message,
431440
})
432441
}
442+
SparkError::DuplicateFieldCaseInsensitive {
443+
required_field_name,
444+
matched_fields,
445+
} => {
446+
serde_json::json!({
447+
"requiredFieldName": required_field_name,
448+
"matchedOrcFields": matched_fields,
449+
})
450+
}
433451
SparkError::Arrow(e) => {
434452
serde_json::json!({
435453
"message": e.to_string(),
@@ -499,6 +517,11 @@ impl SparkError {
499517
// FileNotFound - will be converted to SparkFileNotFoundException by the shim
500518
SparkError::FileNotFound { .. } => "org/apache/spark/SparkException",
501519

520+
// DuplicateFieldCaseInsensitive - converted to SparkRuntimeException by the shim
521+
SparkError::DuplicateFieldCaseInsensitive { .. } => {
522+
"org/apache/spark/SparkRuntimeException"
523+
}
524+
502525
// Generic errors
503526
SparkError::Arrow(_) | SparkError::Internal(_) => "org/apache/spark/SparkException",
504527
}
@@ -574,6 +597,11 @@ impl SparkError {
574597
// File not found
575598
SparkError::FileNotFound { .. } => Some("_LEGACY_ERROR_TEMP_2055"),
576599

600+
// Duplicate field in case-insensitive mode
601+
SparkError::DuplicateFieldCaseInsensitive { .. } => {
602+
Some("_LEGACY_ERROR_TEMP_2093")
603+
}
604+
577605
// Generic errors (no error class)
578606
SparkError::Arrow(_) | SparkError::Internal(_) => None,
579607
}

spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,12 @@ trait ShimSparkErrorConverter {
251251
QueryExecutionErrors
252252
.intervalArithmeticOverflowError("Interval arithmetic overflow", "", sqlCtx(context)))
253253

254+
case "DuplicateFieldCaseInsensitive" =>
255+
Some(
256+
QueryExecutionErrors.foundDuplicateFieldInCaseInsensitiveModeError(
257+
params("requiredFieldName").toString,
258+
params("matchedOrcFields").toString))
259+
254260
case "FileNotFound" =>
255261
val msg = params("message").toString
256262
// Extract file path from native error message and format like Hadoop's

spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,12 @@ trait ShimSparkErrorConverter {
247247
QueryExecutionErrors
248248
.intervalArithmeticOverflowError("Interval arithmetic overflow", "", sqlCtx(context)))
249249

250+
case "DuplicateFieldCaseInsensitive" =>
251+
Some(
252+
QueryExecutionErrors.foundDuplicateFieldInCaseInsensitiveModeError(
253+
params("requiredFieldName").toString,
254+
params("matchedOrcFields").toString))
255+
250256
case "FileNotFound" =>
251257
val msg = params("message").toString
252258
// Extract file path from native error message and format like Hadoop's

spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,12 @@ trait ShimSparkErrorConverter {
258258
QueryExecutionErrors.withoutSuggestionIntervalArithmeticOverflowError(
259259
context.headOption.orNull))
260260

261+
case "DuplicateFieldCaseInsensitive" =>
262+
Some(
263+
QueryExecutionErrors.foundDuplicateFieldInCaseInsensitiveModeError(
264+
params("requiredFieldName").toString,
265+
params("matchedOrcFields").toString))
266+
261267
case "FileNotFound" =>
262268
val msg = params("message").toString
263269
// Extract file path from native error message and format like Hadoop's

0 commit comments

Comments
 (0)