Skip to content

Commit 8e81368

Browse files
committed
refactor: extract spark-errors crate to replace callback pattern
Move SparkError, SparkErrorWithContext, QueryContext, and QueryContextMap into a new spark-errors crate that both jvm-bridge and spark-expr depend on. This allows jvm-bridge to directly downcast SparkError variants in throw_exception, eliminating the ExternalErrorHandler callback, OnceCell, and init-time registration that were previously needed.
1 parent 8c0e115 commit 8e81368

12 files changed

Lines changed: 1354 additions & 1290 deletions

File tree

native/Cargo.lock

Lines changed: 13 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

native/Cargo.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@
1616
# under the License.
1717

1818
[workspace]
19-
default-members = ["core", "spark-expr", "proto", "jvm-bridge"]
20-
members = ["core", "spark-expr", "proto", "jvm-bridge", "hdfs", "fs-hdfs"]
19+
default-members = ["core", "spark-expr", "spark-errors", "proto", "jvm-bridge"]
20+
members = ["core", "spark-expr", "spark-errors", "proto", "jvm-bridge", "hdfs", "fs-hdfs"]
2121
resolver = "2"
2222

2323
[workspace.package]
@@ -43,6 +43,7 @@ datafusion-datasource = { version = "52.2.0" }
4343
datafusion-physical-expr-adapter = { version = "52.2.0" }
4444
datafusion-spark = { version = "52.2.0" }
4545
datafusion-comet-spark-expr = { path = "spark-expr" }
46+
datafusion-comet-spark-errors = { path = "spark-errors" }
4647
datafusion-comet-jvm-bridge = { path = "jvm-bridge" }
4748
datafusion-comet-proto = { path = "proto" }
4849
chrono = { version = "0.4", default-features = false, features = ["clock"] }

native/core/src/lib.rs

Lines changed: 0 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -91,9 +91,6 @@ pub extern "system" fn Java_org_apache_comet_NativeBase_init(
9191
// Initialize the error handling to capture panic backtraces
9292
errors::init();
9393

94-
// Register SparkError handler for JNI exception throwing
95-
errors::register_external_error_handler(handle_spark_error);
96-
9794
try_unwrap_or_throw(&e, |mut env| {
9895
let path: String = env.get_string(&log_conf_path)?.into();
9996

@@ -122,33 +119,6 @@ pub extern "system" fn Java_org_apache_comet_NativeBase_init(
122119
})
123120
}
124121

125-
/// Handle SparkError variants in DataFusionError::External for JNI exception throwing.
126-
/// Returns true if the error was handled.
127-
fn handle_spark_error(
128-
error: &(dyn std::error::Error + Send + Sync + 'static),
129-
env: &mut JNIEnv,
130-
) -> bool {
131-
use datafusion_comet_spark_expr::{SparkError, SparkErrorWithContext};
132-
133-
if let Some(spark_error_with_ctx) = error.downcast_ref::<SparkErrorWithContext>() {
134-
let json_message = spark_error_with_ctx.to_json();
135-
let _ = env.throw_new(
136-
"org/apache/comet/exceptions/CometQueryExecutionException",
137-
json_message,
138-
);
139-
return true;
140-
}
141-
if let Some(spark_error) = error.downcast_ref::<SparkError>() {
142-
let json_message = spark_error.to_json();
143-
let _ = env.throw_new(
144-
"org/apache/comet/exceptions/CometQueryExecutionException",
145-
json_message,
146-
);
147-
return true;
148-
}
149-
false
150-
}
151-
152122
const LOG_PATTERN: &str = "{d(%y/%m/%d %H:%M:%S)} {l} {f}: {m}{n}";
153123

154124
/// JNI method to check if a specific feature is enabled in the native Rust code.

native/jvm-bridge/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ lazy_static = "1.4.0"
3939
once_cell = "1.18.0"
4040
paste = "1.0.14"
4141
prost = "0.14.3"
42+
datafusion-comet-spark-errors = { workspace = true }
4243

4344
[dev-dependencies]
4445
jni = { version = "0.21", features = ["invocation"] }

native/jvm-bridge/src/errors.rs

Lines changed: 23 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
2020
use arrow::error::ArrowError;
2121
use datafusion::common::DataFusionError;
22+
use datafusion_comet_spark_errors::{SparkError, SparkErrorWithContext};
2223
use jni::errors::{Exception, ToException};
2324
use regex::Regex;
2425

@@ -40,25 +41,9 @@ use jni::sys::{jboolean, jbyte, jchar, jdouble, jfloat, jint, jlong, jobject, js
4041
use jni::objects::{GlobalRef, JThrowable};
4142
use jni::JNIEnv;
4243
use lazy_static::lazy_static;
43-
use once_cell::sync::OnceCell;
4444
use parquet::errors::ParquetError;
4545
use thiserror::Error;
4646

47-
/// Handler for DataFusionError::External errors during JNI exception throwing.
48-
/// Returns true if the error was handled (exception thrown), false to fall through
49-
/// to the default handler.
50-
pub type ExternalErrorHandler =
51-
fn(error: &(dyn std::error::Error + Send + Sync + 'static), env: &mut JNIEnv) -> bool;
52-
53-
static EXTERNAL_ERROR_HANDLER: OnceCell<ExternalErrorHandler> = OnceCell::new();
54-
55-
/// Register a handler for DataFusionError::External errors.
56-
/// This allows the core crate to register SparkError-specific handling
57-
/// without the jvm-bridge crate needing to know about SparkError.
58-
pub fn register_external_error_handler(handler: ExternalErrorHandler) {
59-
let _ = EXTERNAL_ERROR_HANDLER.set(handler);
60-
}
61-
6247
lazy_static! {
6348
static ref PANIC_BACKTRACE: Arc<Mutex<Option<String>>> = Arc::new(Mutex::new(None));
6449
}
@@ -489,26 +474,34 @@ fn throw_exception(env: &mut JNIEnv, error: &CometError, backtrace: Option<Strin
489474
throwable,
490475
},
491476
} => env.throw(<&JThrowable>::from(throwable.as_obj())),
492-
// Handle DataFusion errors containing external errors (e.g. SparkError)
477+
// Handle DataFusion errors containing SparkError or SparkErrorWithContext
493478
CometError::DataFusion {
494479
msg: _,
495480
source: DataFusionError::External(e),
496481
} => {
497-
// Try registered handler first (e.g. SparkError handling from core)
498-
if let Some(handler) = EXTERNAL_ERROR_HANDLER.get() {
499-
if handler(e.as_ref(), env) {
500-
return;
482+
if let Some(spark_error_with_ctx) = e.downcast_ref::<SparkErrorWithContext>() {
483+
let json_message = spark_error_with_ctx.to_json();
484+
env.throw_new(
485+
"org/apache/comet/exceptions/CometQueryExecutionException",
486+
json_message,
487+
)
488+
} else if let Some(spark_error) = e.downcast_ref::<SparkError>() {
489+
let json_message = spark_error.to_json();
490+
env.throw_new(
491+
"org/apache/comet/exceptions/CometQueryExecutionException",
492+
json_message,
493+
)
494+
} else {
495+
// Fall through to generic exception
496+
let exception = error.to_exception();
497+
match backtrace {
498+
Some(backtrace_string) => env.throw_new(
499+
exception.class,
500+
to_stacktrace_string(exception.msg, backtrace_string).unwrap(),
501+
),
502+
_ => env.throw_new(exception.class, exception.msg),
501503
}
502504
}
503-
// Fall through to generic exception
504-
let exception = error.to_exception();
505-
match backtrace {
506-
Some(backtrace_string) => env.throw_new(
507-
exception.class,
508-
to_stacktrace_string(exception.msg, backtrace_string).unwrap(),
509-
),
510-
_ => env.throw_new(exception.class, exception.msg),
511-
}
512505
}
513506
_ => {
514507
let exception = error.to_exception();

native/spark-errors/Cargo.toml

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
[package]
19+
name = "datafusion-comet-spark-errors"
20+
description = "Apache DataFusion Comet: Spark error types"
21+
version = { workspace = true }
22+
homepage = { workspace = true }
23+
repository = { workspace = true }
24+
authors = { workspace = true }
25+
readme = { workspace = true }
26+
license = { workspace = true }
27+
edition = { workspace = true }
28+
29+
publish = false
30+
31+
[dependencies]
32+
arrow = { workspace = true }
33+
datafusion = { workspace = true }
34+
serde = { version = "1.0", features = ["derive"] }
35+
serde_json = "1.0"
36+
thiserror = { workspace = true }
37+
38+
[lib]
39+
name = "datafusion_comet_spark_errors"
40+
path = "src/lib.rs"

0 commit comments

Comments
 (0)