From 7b9be769a7ea764bc68d350f6e0e0d9c28bef71e Mon Sep 17 00:00:00 2001 From: 0lai0 Date: Sun, 14 Jun 2026 01:13:49 +0800 Subject: [PATCH 1/3] surface SparkArithmeticException(DIVIDE_BY_ZERO) for divide-by-zero in dispatched ScalaUDF path --- .../src/execution/expressions/arithmetic.rs | 29 +++- native/jni-bridge/src/errors.rs | 137 ++++++++++++------ .../org/apache/comet/CometCodegenSuite.scala | 36 +++++ 3 files changed, 149 insertions(+), 53 deletions(-) diff --git a/native/core/src/execution/expressions/arithmetic.rs b/native/core/src/execution/expressions/arithmetic.rs index 320532d773..1d2ce3875d 100644 --- a/native/core/src/execution/expressions/arithmetic.rs +++ b/native/core/src/execution/expressions/arithmetic.rs @@ -94,19 +94,16 @@ impl PhysicalExpr for CheckedBinaryExpr { } fn evaluate(&self, batch: &RecordBatch) -> datafusion::common::Result { - let result = self.child.evaluate(batch); - - // If there's an error and we have query_context, wrap it - match result { - Err(DataFusionError::External(e)) if self.query_context.is_some() => { - if let Some(spark_err) = e.downcast_ref::() { + match self.child.evaluate(batch) { + Err(e) if self.query_context.is_some() => { + if let Some(spark_err) = extract_spark_error(&e) { let wrapped = SparkErrorWithContext::with_context( spark_err.clone(), Arc::clone(self.query_context.as_ref().unwrap()), ); Err(DataFusionError::External(Box::new(wrapped))) } else { - Err(DataFusionError::External(e)) + Err(e) } } other => other, @@ -173,6 +170,24 @@ use datafusion::logical_expr::Operator as DataFusionOperator; use datafusion_comet_proto::spark_expression::Expr; use datafusion_comet_spark_expr::{create_modulo_expr, create_negate_expr, EvalMode}; +/// Recursively unwrap `DataFusionError::Context` / nested `External` layers to find +/// a bare `SparkError`. Mirrors `extract_spark_payload` in `jni-bridge/src/errors.rs`. +fn extract_spark_error(err: &DataFusionError) -> Option<&SparkError> { + match err { + DataFusionError::External(e) => { + if let Some(spark) = e.downcast_ref::() { + return Some(spark); + } + if let Some(inner_df) = e.downcast_ref::() { + return extract_spark_error(inner_df); + } + None + } + DataFusionError::Context(_, inner) => extract_spark_error(inner), + _ => None, + } +} + use crate::execution::{ expressions::extract_expr, operators::ExecutionError, diff --git a/native/jni-bridge/src/errors.rs b/native/jni-bridge/src/errors.rs index 5bc1aad8cd..80aa37eedd 100644 --- a/native/jni-bridge/src/errors.rs +++ b/native/jni-bridge/src/errors.rs @@ -470,6 +470,47 @@ pub fn unwrap_or_throw_default( } } +/// Payload recovered from a DataFusionError chain. +enum SparkPayload<'a> { + JavaException(&'a Global>), + WithContext(&'a SparkErrorWithContext), + Bare(&'a SparkError), +} + +/// Recursively unwrap `DataFusionError::Context` and `DataFusionError::External` layers +/// until a Spark-typed payload is found, or return `None`. +/// +/// DataFusion 53+ wraps errors with `.context(...)` which produces +/// `DataFusionError::Context(description, Box)`. The JNI bridge must look +/// through this extra layer — and through any doubly-nested `External` — to reach +/// the `SparkError` / `SparkErrorWithContext` that carries the structured exception. +fn extract_spark_payload(err: &DataFusionError) -> Option> { + match err { + DataFusionError::External(e) => { + if let Some(comet_err) = e.downcast_ref::() { + if let CometError::JavaException { throwable, .. } = comet_err { + return Some(SparkPayload::JavaException(throwable)); + } + } + if let Some(ctx) = e.downcast_ref::() { + return Some(SparkPayload::WithContext(ctx)); + } + if let Some(spark) = e.downcast_ref::() { + return Some(SparkPayload::Bare(spark)); + } + // Recurse: External may wrap another DataFusionError (double-wrapping). + if let Some(inner_df) = e.downcast_ref::() { + return extract_spark_payload(inner_df); + } + None + } + // DataFusion 53 adds context via `.context(description)` which wraps the error in + // Context(description, Box). Strip the context wrapper and recurse. + DataFusionError::Context(_, inner) => extract_spark_payload(inner), + _ => None, + } +} + fn throw_exception(env: &mut Env, error: &CometError, backtrace: Option) { // If there isn't already an exception? if !env.exception_check() { @@ -492,53 +533,57 @@ fn throw_exception(env: &mut Env, error: &CometError, backtrace: Option) throwable, }, } => env.throw(throwable), - // Handle DataFusion errors containing SparkError or SparkErrorWithContext - CometError::DataFusion { - msg: _, - source: DataFusionError::External(e), - } => { - if let Some(CometError::JavaException { throwable, .. }) = - e.downcast_ref::() - { - // A Java exception captured inside a JVM UDF kernel (e.g. Spark codegen - // raising INVALID_REGEXP_REPLACE). Re-throw the original throwable so callers - // see the exact Spark exception type rather than a wrapped CometNativeException. - env.throw(throwable) - } else if let Some(spark_error_with_ctx) = e.downcast_ref::() - { - let json_message = spark_error_with_ctx.to_json(); - env.throw_new( - jni::jni_str!("org/apache/comet/exceptions/CometQueryExecutionException"), - JNIString::new(json_message), - ) - } else if let Some(spark_error) = e.downcast_ref::() { - let json_message = spark_error.to_json(); - env.throw_new( - jni::jni_str!("org/apache/comet/exceptions/CometQueryExecutionException"), - JNIString::new(json_message), - ) - } else { - // Check for file-not-found errors from object store - let error_msg = e.to_string(); - if error_msg.contains("not found") - && error_msg.contains("No such file or directory") - { - let spark_error = SparkError::FileNotFound { message: error_msg }; - throw_spark_error_as_json(env, &spark_error) - } else { - // Not a SparkError, use generic exception - let exception = error.to_exception(); - match backtrace { - Some(backtrace_string) => env.throw_new( - JNIString::new(exception.class), - JNIString::new( - to_stacktrace_string(exception.msg, backtrace_string).unwrap(), - ), - ), - _ => env.throw_new( - JNIString::new(exception.class), - JNIString::new(exception.msg), + // Handle all DataFusion errors, including Context-wrapped chains. + // `extract_spark_payload` recurses through Context / nested External layers to + // find the Spark-typed payload, so this arm covers: + // - DataFusionError::External(SparkErrorWithContext) (normal path) + // - DataFusionError::External(SparkError) (no query context) + // - DataFusionError::Context(_, External(SparkError)) (DF53 context wrapping) + // - DataFusionError::External(External(SparkError)) (double wrapping) + CometError::DataFusion { msg: _, source } => { + match extract_spark_payload(source) { + Some(SparkPayload::JavaException(throwable)) => { + // A Java exception captured inside a JVM UDF kernel (e.g. Spark codegen + // raising INVALID_REGEXP_REPLACE). Re-throw the original throwable so + // callers see the exact Spark exception type. + env.throw(throwable) + } + Some(SparkPayload::WithContext(ctx)) => { + let json_message = ctx.to_json(); + env.throw_new( + jni::jni_str!( + "org/apache/comet/exceptions/CometQueryExecutionException" ), + JNIString::new(json_message), + ) + } + Some(SparkPayload::Bare(spark_error)) => { + throw_spark_error_as_json(env, spark_error) + } + None => { + // Check for file-not-found errors from object store + let error_msg = source.to_string(); + if error_msg.contains("not found") + && error_msg.contains("No such file or directory") + { + let spark_error = SparkError::FileNotFound { message: error_msg }; + throw_spark_error_as_json(env, &spark_error) + } else { + // Not a SparkError, use generic exception + let exception = error.to_exception(); + match backtrace { + Some(backtrace_string) => env.throw_new( + JNIString::new(exception.class), + JNIString::new( + to_stacktrace_string(exception.msg, backtrace_string) + .unwrap(), + ), + ), + _ => env.throw_new( + JNIString::new(exception.class), + JNIString::new(exception.msg), + ), + } } } } diff --git a/spark/src/test/scala/org/apache/comet/CometCodegenSuite.scala b/spark/src/test/scala/org/apache/comet/CometCodegenSuite.scala index 76ffbc8c99..1bca4f4130 100644 --- a/spark/src/test/scala/org/apache/comet/CometCodegenSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometCodegenSuite.scala @@ -27,6 +27,7 @@ import org.apache.spark.sql.CometTestBase import org.apache.spark.sql.api.java.UDF1 import org.apache.spark.sql.catalyst.expressions.{CreateArray, CreateMap, CreateNamedStruct, Expression, Literal, MapConcat} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String @@ -1396,6 +1397,41 @@ class CometCodegenSuite } } } + + test("divide-by-zero through dispatched ScalaUDF surfaces SparkArithmeticException (#4517)") { + // When a ScalaUDF is dispatched into the native plan and the outer expression divides + // by the UDF result, the error must surface as SparkArithmeticException(DIVIDE_BY_ZERO), + // not as CometNativeException. Verifies the JNI bridge correctly unwraps the + // DataFusion error chain even when DataFusion 53+ wraps it in Context/External layers. + spark.udf.register("identity_int", (i: java.lang.Integer) => i) + withSQLConf(SQLConf.ANSI_ENABLED.key -> "true") { + withTable("t") { + sql("CREATE TABLE t (a INT) USING parquet") + sql("INSERT INTO t VALUES (0), (1)") + CometScalaUDFCodegen.resetStats() + val e = intercept[Throwable](sql("SELECT 1 / identity_int(a) FROM t").collect()) + val after = CometScalaUDFCodegen.stats() + assert( + after.compileCount + after.cacheHitCount >= 1, + s"expected codegen dispatcher activity, got $after") + val chain = Iterator.iterate(e)(_.getCause).takeWhile(_ != null).toList + val names = chain.map(_.getClass.getName) + assert( + names.exists(_.contains("SparkArithmeticException")), + s"expected SparkArithmeticException in cause chain, got: $names\n${e.getMessage}") + assert( + !names.exists(_.contains("CometNativeException")), + s"CometNativeException leaked across the JNI boundary: $names\n${e.getMessage}") + // Verify the DIVIDE_BY_ZERO error class is preserved end-to-end through the + // SparkErrorWithContext → CometQueryExecutionException → SparkErrorConverter pipeline. + val sparkEx = chain.find(_.getClass.getName.contains("SparkArithmeticException")).get + assert( + sparkEx.getMessage.contains("DIVIDE_BY_ZERO"), + s"expected DIVIDE_BY_ZERO error class in exception message, got: ${sparkEx.getMessage}") + } + } + } + } /** From d96ea14eea64ac141a44e93f628a0abee0c13caa Mon Sep 17 00:00:00 2001 From: 0lai0 Date: Sun, 14 Jun 2026 14:48:06 +0800 Subject: [PATCH 2/3] fix clippy in error.rs --- native/jni-bridge/src/errors.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/native/jni-bridge/src/errors.rs b/native/jni-bridge/src/errors.rs index 80aa37eedd..33896f7f6d 100644 --- a/native/jni-bridge/src/errors.rs +++ b/native/jni-bridge/src/errors.rs @@ -487,10 +487,10 @@ enum SparkPayload<'a> { fn extract_spark_payload(err: &DataFusionError) -> Option> { match err { DataFusionError::External(e) => { - if let Some(comet_err) = e.downcast_ref::() { - if let CometError::JavaException { throwable, .. } = comet_err { - return Some(SparkPayload::JavaException(throwable)); - } + if let Some(CometError::JavaException { throwable, .. }) = + e.downcast_ref::() + { + return Some(SparkPayload::JavaException(throwable)); } if let Some(ctx) = e.downcast_ref::() { return Some(SparkPayload::WithContext(ctx)); From 5ebe171d89a662a72b4e88cb78b404d01519454b Mon Sep 17 00:00:00 2001 From: 0lai0 Date: Mon, 15 Jun 2026 14:56:27 +0800 Subject: [PATCH 3/3] revert the Spark diff changes made in #4514 --- dev/diffs/3.4.3.diff | 30 ------------------------------ dev/diffs/3.5.8.diff | 31 ------------------------------- dev/diffs/4.0.2.diff | 31 ------------------------------- dev/diffs/4.1.2.diff | 27 --------------------------- 4 files changed, 119 deletions(-) diff --git a/dev/diffs/3.4.3.diff b/dev/diffs/3.4.3.diff index 937a57b79b..987594da17 100644 --- a/dev/diffs/3.4.3.diff +++ b/dev/diffs/3.4.3.diff @@ -982,36 +982,6 @@ index 525d97e4998..aded8906d75 100644 val sc = spark.sparkContext val hiveVersion = "2.3.9" // transitive=false, only download specified jar -diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala -index 2dabcf01be7..8fcec0d1ce4 100644 ---- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala -+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala -@@ -491,8 +491,23 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession with SQLHelper - s"Schema did not match for query #$i\n${expected.sql}: $output") { - output.schema - } -- assertResult(expected.output, s"Result did not match" + -- s" for query #$i\n${expected.sql}") { output.output } -+ // Comet may surface errors as `CometNativeException` instead of the matching Spark -+ // exception class when a `ScalaUDF` dispatched into the native plan evaluates a -+ // divide-by-zero (DataFusion wraps the typed error so the JNI bridge cannot downcast it). -+ // Same category, different surface. Collapse both sides to a placeholder when this happens -+ // so the literal compare passes. -+ // Tracking issue: https://github.com/apache/datafusion-comet/issues/4517 -+ val (expectedOut, actualOut) = if (isCometEnabled && -+ expected.output.startsWith("org.apache.spark.SparkArithmeticException") && -+ expected.output.contains("\"DIVIDE_BY_ZERO\"") && -+ output.output.startsWith("org.apache.comet.CometNativeException") && -+ output.output.contains("DivideByZero")) { -+ ("[DIVIDE_BY_ZERO]", "[DIVIDE_BY_ZERO]") -+ } else { -+ (expected.output, output.output) -+ } -+ assertResult(expectedOut, s"Result did not match" + -+ s" for query #$i\n${expected.sql}") { actualOut } - } - } - } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala index 48ad10992c5..51d1ee65422 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala diff --git a/dev/diffs/3.5.8.diff b/dev/diffs/3.5.8.diff index 62ed8922db..511fc336a2 100644 --- a/dev/diffs/3.5.8.diff +++ b/dev/diffs/3.5.8.diff @@ -989,37 +989,6 @@ index 3cf2bfd17ab..5bcf9478e9b 100644 val sc = spark.sparkContext val hiveVersion = "2.3.9" // transitive=false, only download specified jar -diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala -index 71af1fd69c3..81a04c93c9c 100644 ---- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala -+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala -@@ -872,9 +872,24 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession with SQLHelper - s"Schema did not match for query #$i\n${expected.sql}: $output") { - output.schema - } -- assertResult(expected.output, s"Result did not match" + -+ // Comet may surface errors as `CometNativeException` instead of the matching Spark -+ // exception class when a `ScalaUDF` dispatched into the native plan evaluates a -+ // divide-by-zero (DataFusion wraps the typed error so the JNI bridge cannot downcast it). -+ // Same category, different surface. Collapse both sides to a placeholder when this happens -+ // so the literal compare passes. -+ // Tracking issue: https://github.com/apache/datafusion-comet/issues/4517 -+ val (expectedOut, actualOut) = if (isCometEnabled && -+ expected.output.startsWith("org.apache.spark.SparkArithmeticException") && -+ expected.output.contains("\"DIVIDE_BY_ZERO\"") && -+ output.output.startsWith("org.apache.comet.CometNativeException") && -+ output.output.contains("DivideByZero")) { -+ ("[DIVIDE_BY_ZERO]", "[DIVIDE_BY_ZERO]") -+ } else { -+ (expected.output, output.output) -+ } -+ assertResult(expectedOut, s"Result did not match" + - s" for query #$i\n${expected.sql}") { -- output.output -+ actualOut - } - } - } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala index 8b4ac474f87..3f79f20822f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala diff --git a/dev/diffs/4.0.2.diff b/dev/diffs/4.0.2.diff index 037f15398f..b4fbb815d9 100644 --- a/dev/diffs/4.0.2.diff +++ b/dev/diffs/4.0.2.diff @@ -1126,37 +1126,6 @@ index f294ff81021..02d72be8d29 100644 verifyCallCount( df.selectExpr("testUdf(a + 1) + testUdf(1 + a)", "testUdf(a + 1)"), Row(4, 2), 1) -diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala -index 575a4ae69d1..129d9f27232 100644 ---- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala -+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala -@@ -679,9 +679,24 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession with SQLHelper - s"Schema did not match for query #$i\n${expected.sql}: $output") { - output.schema - } -- assertResult(expected.output, s"Result did not match" + -+ // Comet may surface errors as `CometNativeException` instead of the matching Spark -+ // exception class when a `ScalaUDF` dispatched into the native plan evaluates a -+ // divide-by-zero (DataFusion wraps the typed error so the JNI bridge cannot downcast it). -+ // Same category, different surface. Collapse both sides to a placeholder when this happens -+ // so the literal compare passes. -+ // Tracking issue: https://github.com/apache/datafusion-comet/issues/4517 -+ val (expectedOut, actualOut) = if (isCometEnabled && -+ expected.output.startsWith("org.apache.spark.SparkArithmeticException") && -+ expected.output.contains("\"DIVIDE_BY_ZERO\"") && -+ output.output.startsWith("org.apache.comet.CometNativeException") && -+ output.output.contains("DivideByZero")) { -+ ("[DIVIDE_BY_ZERO]", "[DIVIDE_BY_ZERO]") -+ } else { -+ (expected.output, output.output) -+ } -+ assertResult(expectedOut, s"Result did not match" + - s" for query #$i\n${expected.sql}") { -- output.output -+ actualOut - } - } - } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala index c1c041509c3..7d463e4b85e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala diff --git a/dev/diffs/4.1.2.diff b/dev/diffs/4.1.2.diff index c68af22744..2242368e94 100644 --- a/dev/diffs/4.1.2.diff +++ b/dev/diffs/4.1.2.diff @@ -1218,33 +1218,6 @@ index 23f0144dcec..2586d93d630 100644 ) ++ otherIgnoreList /** List of test cases that require TPCDS table schemas to be loaded. */ private def requireTPCDSCases: Seq[String] = Seq("pipe-operators.sql") -@@ -682,9 +691,24 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession with SQLHelper - s"Schema did not match for query #$i\n${expected.sql}: $output") { - output.schema - } -- assertResult(expected.output, s"Result did not match" + -+ // Comet may surface errors as `CometNativeException` instead of the matching Spark -+ // exception class when a `ScalaUDF` dispatched into the native plan evaluates a -+ // divide-by-zero (DataFusion wraps the typed error so the JNI bridge cannot downcast it). -+ // Same category, different surface. Collapse both sides to a placeholder when this happens -+ // so the literal compare passes. -+ // Tracking issue: https://github.com/apache/datafusion-comet/issues/4517 -+ val (expectedOut, actualOut) = if (isCometEnabled && -+ expected.output.startsWith("org.apache.spark.SparkArithmeticException") && -+ expected.output.contains("\"DIVIDE_BY_ZERO\"") && -+ output.output.startsWith("org.apache.comet.CometNativeException") && -+ output.output.contains("DivideByZero")) { -+ ("[DIVIDE_BY_ZERO]", "[DIVIDE_BY_ZERO]") -+ } else { -+ (expected.output, output.output) -+ } -+ assertResult(expectedOut, s"Result did not match" + - s" for query #$i\n${expected.sql}") { -- output.output -+ actualOut - } - } - } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala index 66826a9ca76..ab4265a5fb9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala