Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 0 additions & 30 deletions dev/diffs/3.4.3.diff
Original file line number Diff line number Diff line change
Expand Up @@ -982,36 +982,6 @@ index 525d97e4998..aded8906d75 100644
val sc = spark.sparkContext
val hiveVersion = "2.3.9"
// transitive=false, only download specified jar
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
index 2dabcf01be7..8fcec0d1ce4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
@@ -491,8 +491,23 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession with SQLHelper
s"Schema did not match for query #$i\n${expected.sql}: $output") {
output.schema
}
- assertResult(expected.output, s"Result did not match" +
- s" for query #$i\n${expected.sql}") { output.output }
+ // Comet may surface errors as `CometNativeException` instead of the matching Spark
+ // exception class when a `ScalaUDF` dispatched into the native plan evaluates a
+ // divide-by-zero (DataFusion wraps the typed error so the JNI bridge cannot downcast it).
+ // Same category, different surface. Collapse both sides to a placeholder when this happens
+ // so the literal compare passes.
+ // Tracking issue: https://github.com/apache/datafusion-comet/issues/4517
+ val (expectedOut, actualOut) = if (isCometEnabled &&
+ expected.output.startsWith("org.apache.spark.SparkArithmeticException") &&
+ expected.output.contains("\"DIVIDE_BY_ZERO\"") &&
+ output.output.startsWith("org.apache.comet.CometNativeException") &&
+ output.output.contains("DivideByZero")) {
+ ("[DIVIDE_BY_ZERO]", "[DIVIDE_BY_ZERO]")
+ } else {
+ (expected.output, output.output)
+ }
+ assertResult(expectedOut, s"Result did not match" +
+ s" for query #$i\n${expected.sql}") { actualOut }
}
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
index 48ad10992c5..51d1ee65422 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
Expand Down
31 changes: 0 additions & 31 deletions dev/diffs/3.5.8.diff
Original file line number Diff line number Diff line change
Expand Up @@ -989,37 +989,6 @@ index 3cf2bfd17ab..5bcf9478e9b 100644
val sc = spark.sparkContext
val hiveVersion = "2.3.9"
// transitive=false, only download specified jar
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
index 71af1fd69c3..81a04c93c9c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
@@ -872,9 +872,24 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession with SQLHelper
s"Schema did not match for query #$i\n${expected.sql}: $output") {
output.schema
}
- assertResult(expected.output, s"Result did not match" +
+ // Comet may surface errors as `CometNativeException` instead of the matching Spark
+ // exception class when a `ScalaUDF` dispatched into the native plan evaluates a
+ // divide-by-zero (DataFusion wraps the typed error so the JNI bridge cannot downcast it).
+ // Same category, different surface. Collapse both sides to a placeholder when this happens
+ // so the literal compare passes.
+ // Tracking issue: https://github.com/apache/datafusion-comet/issues/4517
+ val (expectedOut, actualOut) = if (isCometEnabled &&
+ expected.output.startsWith("org.apache.spark.SparkArithmeticException") &&
+ expected.output.contains("\"DIVIDE_BY_ZERO\"") &&
+ output.output.startsWith("org.apache.comet.CometNativeException") &&
+ output.output.contains("DivideByZero")) {
+ ("[DIVIDE_BY_ZERO]", "[DIVIDE_BY_ZERO]")
+ } else {
+ (expected.output, output.output)
+ }
+ assertResult(expectedOut, s"Result did not match" +
s" for query #$i\n${expected.sql}") {
- output.output
+ actualOut
}
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
index 8b4ac474f87..3f79f20822f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
Expand Down
31 changes: 0 additions & 31 deletions dev/diffs/4.0.2.diff
Original file line number Diff line number Diff line change
Expand Up @@ -1126,37 +1126,6 @@ index f294ff81021..02d72be8d29 100644

verifyCallCount(
df.selectExpr("testUdf(a + 1) + testUdf(1 + a)", "testUdf(a + 1)"), Row(4, 2), 1)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
index 575a4ae69d1..129d9f27232 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
@@ -679,9 +679,24 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession with SQLHelper
s"Schema did not match for query #$i\n${expected.sql}: $output") {
output.schema
}
- assertResult(expected.output, s"Result did not match" +
+ // Comet may surface errors as `CometNativeException` instead of the matching Spark
+ // exception class when a `ScalaUDF` dispatched into the native plan evaluates a
+ // divide-by-zero (DataFusion wraps the typed error so the JNI bridge cannot downcast it).
+ // Same category, different surface. Collapse both sides to a placeholder when this happens
+ // so the literal compare passes.
+ // Tracking issue: https://github.com/apache/datafusion-comet/issues/4517
+ val (expectedOut, actualOut) = if (isCometEnabled &&
+ expected.output.startsWith("org.apache.spark.SparkArithmeticException") &&
+ expected.output.contains("\"DIVIDE_BY_ZERO\"") &&
+ output.output.startsWith("org.apache.comet.CometNativeException") &&
+ output.output.contains("DivideByZero")) {
+ ("[DIVIDE_BY_ZERO]", "[DIVIDE_BY_ZERO]")
+ } else {
+ (expected.output, output.output)
+ }
+ assertResult(expectedOut, s"Result did not match" +
s" for query #$i\n${expected.sql}") {
- output.output
+ actualOut
}
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
index c1c041509c3..7d463e4b85e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
Expand Down
27 changes: 0 additions & 27 deletions dev/diffs/4.1.2.diff
Original file line number Diff line number Diff line change
Expand Up @@ -1218,33 +1218,6 @@ index 23f0144dcec..2586d93d630 100644
) ++ otherIgnoreList
/** List of test cases that require TPCDS table schemas to be loaded. */
private def requireTPCDSCases: Seq[String] = Seq("pipe-operators.sql")
@@ -682,9 +691,24 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession with SQLHelper
s"Schema did not match for query #$i\n${expected.sql}: $output") {
output.schema
}
- assertResult(expected.output, s"Result did not match" +
+ // Comet may surface errors as `CometNativeException` instead of the matching Spark
+ // exception class when a `ScalaUDF` dispatched into the native plan evaluates a
+ // divide-by-zero (DataFusion wraps the typed error so the JNI bridge cannot downcast it).
+ // Same category, different surface. Collapse both sides to a placeholder when this happens
+ // so the literal compare passes.
+ // Tracking issue: https://github.com/apache/datafusion-comet/issues/4517
+ val (expectedOut, actualOut) = if (isCometEnabled &&
+ expected.output.startsWith("org.apache.spark.SparkArithmeticException") &&
+ expected.output.contains("\"DIVIDE_BY_ZERO\"") &&
+ output.output.startsWith("org.apache.comet.CometNativeException") &&
+ output.output.contains("DivideByZero")) {
+ ("[DIVIDE_BY_ZERO]", "[DIVIDE_BY_ZERO]")
+ } else {
+ (expected.output, output.output)
+ }
+ assertResult(expectedOut, s"Result did not match" +
s" for query #$i\n${expected.sql}") {
- output.output
+ actualOut
}
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
index 66826a9ca76..ab4265a5fb9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
Expand Down
29 changes: 22 additions & 7 deletions native/core/src/execution/expressions/arithmetic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,19 +94,16 @@ impl PhysicalExpr for CheckedBinaryExpr {
}

fn evaluate(&self, batch: &RecordBatch) -> datafusion::common::Result<ColumnarValue> {
let result = self.child.evaluate(batch);

// If there's an error and we have query_context, wrap it
match result {
Err(DataFusionError::External(e)) if self.query_context.is_some() => {
if let Some(spark_err) = e.downcast_ref::<SparkError>() {
match self.child.evaluate(batch) {
Err(e) if self.query_context.is_some() => {
if let Some(spark_err) = extract_spark_error(&e) {
let wrapped = SparkErrorWithContext::with_context(
spark_err.clone(),
Arc::clone(self.query_context.as_ref().unwrap()),
);
Err(DataFusionError::External(Box::new(wrapped)))
} else {
Err(DataFusionError::External(e))
Err(e)
}
}
other => other,
Expand Down Expand Up @@ -173,6 +170,24 @@ use datafusion::logical_expr::Operator as DataFusionOperator;
use datafusion_comet_proto::spark_expression::Expr;
use datafusion_comet_spark_expr::{create_modulo_expr, create_negate_expr, EvalMode};

/// Walk a `DataFusionError` chain looking for the `SparkError` buried inside it.
///
/// Two kinds of wrapper are peeled off, recursively:
/// - `DataFusionError::Context(_, inner)`: the description is ignored and the search
///   continues in `inner`;
/// - `DataFusionError::External(boxed)`: `boxed` is either the `SparkError` itself or
///   another `DataFusionError`, which is searched in turn.
///
/// Returns `None` for every other variant, and for an `External` whose payload is
/// neither of those two types. Mirrors `extract_spark_payload` in
/// `jni-bridge/src/errors.rs`.
fn extract_spark_error(err: &DataFusionError) -> Option<&SparkError> {
    match err {
        DataFusionError::Context(_, inner) => extract_spark_error(inner),
        DataFusionError::External(boxed) => {
            // Prefer a direct hit; only when the payload is not a `SparkError` do we
            // test whether it is a nested `DataFusionError` worth descending into.
            boxed.downcast_ref::<SparkError>().or_else(|| {
                boxed
                    .downcast_ref::<DataFusionError>()
                    .and_then(|nested| extract_spark_error(nested))
            })
        }
        _ => None,
    }
}

use crate::execution::{
expressions::extract_expr,
operators::ExecutionError,
Expand Down
137 changes: 91 additions & 46 deletions native/jni-bridge/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,47 @@ pub fn unwrap_or_throw_default<T: JNIDefault>(
}
}

/// Payload recovered from a DataFusionError chain.
///
/// Every variant only borrows (lifetime `'a`) the value it refers to; nothing is
/// cloned or owned here.
enum SparkPayload<'a> {
/// The `throwable` held by a `CometError::JavaException`.
JavaException(&'a Global<JThrowable<'static>>),
/// A `SparkErrorWithContext` found as the payload of an `External` error.
WithContext(&'a SparkErrorWithContext),
/// A plain `SparkError` found as the payload of an `External` error.
Bare(&'a SparkError),
}

/// Search a `DataFusionError` chain for a Spark-typed payload.
///
/// DataFusion 53+ attaches context with `.context(...)`, producing
/// `DataFusionError::Context(description, Box<inner>)`, and errors may also arrive as an
/// `External` wrapping another `DataFusionError`. Both wrappers are peeled off
/// recursively so the JNI bridge can still reach the structured error underneath.
///
/// For an `External` payload the candidates are tried in this order:
/// 1. `CometError::JavaException` -> `SparkPayload::JavaException`
/// 2. `SparkErrorWithContext`     -> `SparkPayload::WithContext`
/// 3. `SparkError`                -> `SparkPayload::Bare`
/// 4. a nested `DataFusionError`  -> recurse
///
/// Returns `None` when nothing matches, including for every `DataFusionError` variant
/// other than `External` and `Context`.
fn extract_spark_payload(err: &DataFusionError) -> Option<SparkPayload<'_>> {
    match err {
        // Context(description, inner): the description carries no payload, look inside.
        DataFusionError::Context(_, inner) => extract_spark_payload(inner),
        DataFusionError::External(boxed) => {
            if let Some(CometError::JavaException { throwable, .. }) =
                boxed.downcast_ref::<CometError>()
            {
                Some(SparkPayload::JavaException(throwable))
            } else if let Some(with_ctx) = boxed.downcast_ref::<SparkErrorWithContext>() {
                Some(SparkPayload::WithContext(with_ctx))
            } else if let Some(bare) = boxed.downcast_ref::<SparkError>() {
                Some(SparkPayload::Bare(bare))
            } else {
                // Double wrapping: External may itself hold another DataFusionError.
                boxed
                    .downcast_ref::<DataFusionError>()
                    .and_then(|nested| extract_spark_payload(nested))
            }
        }
        _ => None,
    }
}

fn throw_exception(env: &mut Env, error: &CometError, backtrace: Option<String>) {
// If there isn't already an exception?
if !env.exception_check() {
Expand All @@ -492,53 +533,57 @@ fn throw_exception(env: &mut Env, error: &CometError, backtrace: Option<String>)
throwable,
},
} => env.throw(throwable),
// Handle DataFusion errors containing SparkError or SparkErrorWithContext
CometError::DataFusion {
msg: _,
source: DataFusionError::External(e),
} => {
if let Some(CometError::JavaException { throwable, .. }) =
e.downcast_ref::<CometError>()
{
// A Java exception captured inside a JVM UDF kernel (e.g. Spark codegen
// raising INVALID_REGEXP_REPLACE). Re-throw the original throwable so callers
// see the exact Spark exception type rather than a wrapped CometNativeException.
env.throw(throwable)
} else if let Some(spark_error_with_ctx) = e.downcast_ref::<SparkErrorWithContext>()
{
let json_message = spark_error_with_ctx.to_json();
env.throw_new(
jni::jni_str!("org/apache/comet/exceptions/CometQueryExecutionException"),
JNIString::new(json_message),
)
} else if let Some(spark_error) = e.downcast_ref::<SparkError>() {
let json_message = spark_error.to_json();
env.throw_new(
jni::jni_str!("org/apache/comet/exceptions/CometQueryExecutionException"),
JNIString::new(json_message),
)
} else {
// Check for file-not-found errors from object store
let error_msg = e.to_string();
if error_msg.contains("not found")
&& error_msg.contains("No such file or directory")
{
let spark_error = SparkError::FileNotFound { message: error_msg };
throw_spark_error_as_json(env, &spark_error)
} else {
// Not a SparkError, use generic exception
let exception = error.to_exception();
match backtrace {
Some(backtrace_string) => env.throw_new(
JNIString::new(exception.class),
JNIString::new(
to_stacktrace_string(exception.msg, backtrace_string).unwrap(),
),
),
_ => env.throw_new(
JNIString::new(exception.class),
JNIString::new(exception.msg),
// Handle all DataFusion errors, including Context-wrapped chains.
// `extract_spark_payload` recurses through Context / nested External layers to
// find the Spark-typed payload, so this arm covers:
// - DataFusionError::External(SparkErrorWithContext) (normal path)
// - DataFusionError::External(SparkError) (no query context)
// - DataFusionError::Context(_, External(SparkError)) (DF53 context wrapping)
// - DataFusionError::External(External(SparkError)) (double wrapping)
CometError::DataFusion { msg: _, source } => {
match extract_spark_payload(source) {
Some(SparkPayload::JavaException(throwable)) => {
// A Java exception captured inside a JVM UDF kernel (e.g. Spark codegen
// raising INVALID_REGEXP_REPLACE). Re-throw the original throwable so
// callers see the exact Spark exception type.
env.throw(throwable)
}
Some(SparkPayload::WithContext(ctx)) => {
let json_message = ctx.to_json();
env.throw_new(
jni::jni_str!(
"org/apache/comet/exceptions/CometQueryExecutionException"
),
JNIString::new(json_message),
)
}
Some(SparkPayload::Bare(spark_error)) => {
throw_spark_error_as_json(env, spark_error)
}
None => {
// Check for file-not-found errors from object store
let error_msg = source.to_string();
if error_msg.contains("not found")
&& error_msg.contains("No such file or directory")
{
let spark_error = SparkError::FileNotFound { message: error_msg };
throw_spark_error_as_json(env, &spark_error)
} else {
// Not a SparkError, use generic exception
let exception = error.to_exception();
match backtrace {
Some(backtrace_string) => env.throw_new(
JNIString::new(exception.class),
JNIString::new(
to_stacktrace_string(exception.msg, backtrace_string)
.unwrap(),
),
),
_ => env.throw_new(
JNIString::new(exception.class),
JNIString::new(exception.msg),
),
}
}
}
}
Expand Down
Loading
Loading