Skip to content

Commit ba03224

Browse files
committed
feat: [ANSI] Ansi sql error messages
1 parent bd42649 commit ba03224

33 files changed

Lines changed: 3594 additions & 156 deletions

File tree

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.comet.exceptions;
21+
22+
import org.apache.comet.CometNativeException;
23+
24+
/**
25+
* Exception thrown from Comet native execution containing JSON-encoded error information. The
26+
* message contains a JSON object with the following structure:
27+
*
28+
* <pre>
29+
* {
30+
* "errorType": "DivideByZero",
31+
* "errorClass": "DIVIDE_BY_ZERO",
32+
* "params": { ... },
33+
* "context": { "sqlText": "...", "startOffset": 0, "stopOffset": 10 },
34+
* "hint": "Use `try_divide` to tolerate divisor being 0"
35+
* }
36+
* </pre>
37+
*
38+
* CometExecIterator parses this JSON and converts it to the appropriate Spark exception by calling
39+
* the corresponding QueryExecutionErrors.* method.
40+
*/
41+
public final class CometQueryExecutionException extends CometNativeException {
42+
43+
/**
44+
* Creates a new CometQueryExecutionException with a JSON-encoded error message.
45+
*
46+
* @param jsonMessage JSON string containing error information
47+
*/
48+
public CometQueryExecutionException(String jsonMessage) {
49+
super(jsonMessage);
50+
}
51+
52+
/**
53+
* Returns true if the message appears to be JSON-formatted. This is used to distinguish between
54+
* JSON-encoded errors and legacy error messages.
55+
*
56+
* @return true if message starts with '{' and ends with '}'
57+
*/
58+
public boolean isJsonMessage() {
59+
String msg = getMessage();
60+
if (msg == null) return false;
61+
String trimmed = msg.trim();
62+
return trimmed.startsWith("{") && trimmed.endsWith("}");
63+
}
64+
}

native/Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

native/core/src/errors.rs

Lines changed: 44 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ use jni::sys::{jboolean, jbyte, jchar, jdouble, jfloat, jint, jlong, jobject, js
3939

4040
use crate::execution::operators::ExecutionError;
4141
use datafusion_comet_spark_expr::SparkError;
42-
use jni::objects::{GlobalRef, JThrowable, JValue};
42+
use jni::objects::{GlobalRef, JThrowable};
4343
use jni::JNIEnv;
4444
use lazy_static::lazy_static;
4545
use parquet::errors::ParquetError;
@@ -223,9 +223,9 @@ impl jni::errors::ToException for CometError {
223223
class: "java/lang/NullPointerException".to_string(),
224224
msg: self.to_string(),
225225
},
226-
CometError::Spark { .. } => Exception {
227-
class: "org/apache/spark/SparkException".to_string(),
228-
msg: self.to_string(),
226+
CometError::Spark(spark_err) => Exception {
227+
class: spark_err.exception_class().to_string(),
228+
msg: spark_err.to_string(),
229229
},
230230
CometError::NumberIntFormat { source: s } => Exception {
231231
class: "java/lang/NumberFormatException".to_string(),
@@ -392,33 +392,37 @@ fn throw_exception(env: &mut JNIEnv, error: &CometError, backtrace: Option<Strin
392392
throwable,
393393
},
394394
} => env.throw(<&JThrowable>::from(throwable.as_obj())),
395+
// Handle DataFusion errors containing SparkError - serialize to JSON
395396
CometError::DataFusion {
396397
msg: _,
397398
source: DataFusionError::External(e),
398-
} if matches!(e.downcast_ref(), Some(SparkError::CastOverFlow { .. })) => {
399-
match e.downcast_ref() {
400-
Some(SparkError::CastOverFlow {
401-
value,
402-
from_type,
403-
to_type,
404-
}) => {
405-
let throwable: JThrowable = env
406-
.new_object(
407-
"org/apache/spark/sql/comet/CastOverflowException",
408-
"(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;)V",
409-
&[
410-
JValue::Object(&env.new_string(value).unwrap()),
411-
JValue::Object(&env.new_string(from_type).unwrap()),
412-
JValue::Object(&env.new_string(to_type).unwrap()),
413-
],
414-
)
415-
.unwrap()
416-
.into();
417-
env.throw(throwable)
399+
} => {
400+
// Try SparkErrorWithContext first (includes context)
401+
if let Some(spark_error_with_ctx) =
402+
e.downcast_ref::<datafusion_comet_spark_expr::SparkErrorWithContext>()
403+
{
404+
let json_message = spark_error_with_ctx.to_json();
405+
env.throw_new(
406+
"org/apache/comet/exceptions/CometQueryExecutionException",
407+
json_message,
408+
)
409+
} else if let Some(spark_error) = e.downcast_ref::<SparkError>() {
410+
// Fall back to plain SparkError (no context)
411+
throw_spark_error_as_json(env, spark_error)
412+
} else {
413+
// Not a SparkError, use generic exception
414+
let exception = error.to_exception();
415+
match backtrace {
416+
Some(backtrace_string) => env.throw_new(
417+
exception.class,
418+
to_stacktrace_string(exception.msg, backtrace_string).unwrap(),
419+
),
420+
_ => env.throw_new(exception.class, exception.msg),
418421
}
419-
_ => unreachable!(),
420422
}
421423
}
424+
// Handle direct SparkError - serialize to JSON
425+
CometError::Spark(spark_error) => throw_spark_error_as_json(env, spark_error),
422426
_ => {
423427
let exception = error.to_exception();
424428
match backtrace {
@@ -434,6 +438,21 @@ fn throw_exception(env: &mut JNIEnv, error: &CometError, backtrace: Option<Strin
434438
}
435439
}
436440

441+
/// Throws a CometQueryExecutionException with JSON-encoded SparkError
442+
fn throw_spark_error_as_json(
443+
env: &mut JNIEnv,
444+
spark_error: &SparkError,
445+
) -> jni::errors::Result<()> {
446+
// Serialize error to JSON
447+
let json_message = spark_error.to_json();
448+
449+
// Throw CometQueryExecutionException with JSON message
450+
env.throw_new(
451+
"org/apache/comet/exceptions/CometQueryExecutionException",
452+
json_message,
453+
)
454+
}
455+
437456
#[derive(Debug, Error)]
438457
enum StacktraceError {
439458
#[error("Unable to initialize message: {0}")]

native/core/src/execution/expressions/arithmetic.rs

Lines changed: 118 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,122 @@
1717

1818
//! Arithmetic expression builders
1919
20+
use std::any::Any;
21+
use std::fmt::{Display, Formatter};
22+
use std::hash::{Hash, Hasher};
23+
24+
use arrow::datatypes::{DataType, Schema};
25+
use arrow::record_batch::RecordBatch;
26+
use datafusion::common::DataFusionError;
27+
use datafusion::logical_expr::ColumnarValue;
28+
use datafusion::physical_expr::PhysicalExpr;
29+
use datafusion_comet_spark_expr::{QueryContext, SparkError, SparkErrorWithContext};
30+
31+
/// Wrapper expression that catches and wraps SparkError with QueryContext
32+
/// for binary arithmetic operations.
33+
#[derive(Debug)]
34+
pub struct CheckedBinaryExpr {
35+
/// The underlying physical expression (typically a ScalarFunctionExpr)
36+
child: Arc<dyn PhysicalExpr>,
37+
/// Optional query context to attach to errors
38+
query_context: Option<Arc<QueryContext>>,
39+
}
40+
41+
impl CheckedBinaryExpr {
42+
pub fn new(child: Arc<dyn PhysicalExpr>, query_context: Option<Arc<QueryContext>>) -> Self {
43+
Self {
44+
child,
45+
query_context,
46+
}
47+
}
48+
}
49+
50+
impl Display for CheckedBinaryExpr {
51+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
52+
write!(f, "CheckedBinaryExpr({})", self.child)
53+
}
54+
}
55+
56+
impl PartialEq for CheckedBinaryExpr {
57+
fn eq(&self, other: &Self) -> bool {
58+
self.child.eq(&other.child)
59+
}
60+
}
61+
62+
impl Eq for CheckedBinaryExpr {}
63+
64+
impl PartialEq<dyn Any> for CheckedBinaryExpr {
65+
fn eq(&self, other: &dyn Any) -> bool {
66+
other
67+
.downcast_ref::<Self>()
68+
.map(|x| self.eq(x))
69+
.unwrap_or(false)
70+
}
71+
}
72+
73+
impl Hash for CheckedBinaryExpr {
74+
fn hash<H: Hasher>(&self, state: &mut H) {
75+
self.child.hash(state);
76+
}
77+
}
78+
79+
impl PhysicalExpr for CheckedBinaryExpr {
80+
fn as_any(&self) -> &dyn Any {
81+
self
82+
}
83+
84+
fn fmt_sql(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
85+
self.child.fmt_sql(f)
86+
}
87+
88+
fn data_type(&self, input_schema: &Schema) -> datafusion::common::Result<DataType> {
89+
self.child.data_type(input_schema)
90+
}
91+
92+
fn nullable(&self, input_schema: &Schema) -> datafusion::common::Result<bool> {
93+
self.child.nullable(input_schema)
94+
}
95+
96+
fn evaluate(&self, batch: &RecordBatch) -> datafusion::common::Result<ColumnarValue> {
97+
let result = self.child.evaluate(batch);
98+
99+
// If there's an error and we have query_context, wrap it
100+
match result {
101+
Err(DataFusionError::External(e)) if self.query_context.is_some() => {
102+
if let Some(spark_err) = e.downcast_ref::<SparkError>() {
103+
let wrapped = SparkErrorWithContext::with_context(
104+
spark_err.clone(),
105+
Arc::clone(self.query_context.as_ref().unwrap()),
106+
);
107+
Err(DataFusionError::External(Box::new(wrapped)))
108+
} else {
109+
Err(DataFusionError::External(e))
110+
}
111+
}
112+
other => other,
113+
}
114+
}
115+
116+
fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
117+
vec![&self.child]
118+
}
119+
120+
fn with_new_children(
121+
self: Arc<Self>,
122+
children: Vec<Arc<dyn PhysicalExpr>>,
123+
) -> datafusion::common::Result<Arc<dyn PhysicalExpr>> {
124+
match children.len() {
125+
1 => Ok(Arc::new(CheckedBinaryExpr::new(
126+
Arc::clone(&children[0]),
127+
self.query_context.clone(),
128+
))),
129+
_ => Err(DataFusionError::Internal(
130+
"CheckedBinaryExpr should have exactly one child".to_string(),
131+
)),
132+
}
133+
}
134+
}
135+
20136
/// Macro to generate arithmetic expression builders that need eval_mode handling
21137
#[macro_export]
22138
macro_rules! arithmetic_expr_builder {
@@ -37,6 +153,7 @@ macro_rules! arithmetic_expr_builder {
37153
let eval_mode =
38154
$crate::execution::planner::from_protobuf_eval_mode(expr.eval_mode)?;
39155
planner.create_binary_expr(
156+
spark_expr, // Pass the full spark_expr for query_context lookup
40157
expr.left.as_ref().unwrap(),
41158
expr.right.as_ref().unwrap(),
42159
expr.return_type.as_ref(),
@@ -53,7 +170,6 @@ use std::sync::Arc;
53170

54171
use arrow::datatypes::SchemaRef;
55172
use datafusion::logical_expr::Operator as DataFusionOperator;
56-
use datafusion::physical_expr::PhysicalExpr;
57173
use datafusion_comet_proto::spark_expression::Expr;
58174
use datafusion_comet_spark_expr::{create_modulo_expr, create_negate_expr, EvalMode};
59175

@@ -95,6 +211,7 @@ impl ExpressionBuilder for IntegralDivideBuilder {
95211
let expr = extract_expr!(spark_expr, IntegralDivide);
96212
let eval_mode = from_protobuf_eval_mode(expr.eval_mode)?;
97213
planner.create_binary_expr_with_options(
214+
spark_expr, // Pass the full spark_expr for query_context lookup
98215
expr.left.as_ref().unwrap(),
99216
expr.right.as_ref().unwrap(),
100217
expr.return_type.as_ref(),

0 commit comments

Comments
 (0)