Skip to content

Commit acf0bbe

Browse files
authored
Refactor error handling to use boxed errors for DataFusionError variants (apache#16672)
- Refactored the `DataFusionError` enum to use `Box<T>` for:
  - `ArrowError`
  - `ParquetError`
  - `AvroError`
  - `object_store::Error`
  - `ParserError`
  - `SchemaError`
  - `JoinError`
- Updated all relevant match arms and constructors to handle boxed errors.
- Refactored error-related macros (`arrow_datafusion_err!`, `sql_datafusion_err!`, etc.) to use `Box<T>`.
- Adjusted test cases and error assertions for boxed variants.
- Documentation update to the upgrade guide to explain the required changes and rationale.
1 parent 3118b81 commit acf0bbe

19 files changed

Lines changed: 184 additions & 108 deletions

File tree

datafusion-cli/src/exec.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -330,8 +330,8 @@ impl StatementExecutor {
330330

331331
let df = match ctx.execute_logical_plan(plan).await {
332332
Ok(df) => Ok(df),
333-
Err(DataFusionError::ObjectStore(Generic { store, source: _ }))
334-
if "S3".eq_ignore_ascii_case(store)
333+
Err(DataFusionError::ObjectStore(err))
334+
if matches!(err.as_ref(), Generic { store, source: _ } if "S3".eq_ignore_ascii_case(store))
335335
&& self.statement_for_retry.is_some() =>
336336
{
337337
warn!("S3 region is incorrect, auto-detecting the correct region (this may be slow). Consider updating your region configuration.");

datafusion-cli/src/object_storage.rs

Lines changed: 38 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -15,27 +15,35 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use std::any::Any;
19-
use std::error::Error;
20-
use std::fmt::{Debug, Display};
21-
use std::sync::Arc;
22-
23-
use datafusion::common::config::{
24-
ConfigEntry, ConfigExtension, ConfigField, ExtensionOptions, TableOptions, Visit,
25-
};
26-
use datafusion::common::{config_err, exec_datafusion_err, exec_err};
27-
use datafusion::error::{DataFusionError, Result};
28-
use datafusion::execution::context::SessionState;
29-
3018
use async_trait::async_trait;
3119
use aws_config::BehaviorVersion;
32-
use aws_credential_types::provider::error::CredentialsError;
33-
use aws_credential_types::provider::{ProvideCredentials, SharedCredentialsProvider};
20+
use aws_credential_types::provider::{
21+
error::CredentialsError, ProvideCredentials, SharedCredentialsProvider,
22+
};
23+
use datafusion::{
24+
common::{
25+
config::ConfigEntry, config::ConfigExtension, config::ConfigField,
26+
config::ExtensionOptions, config::TableOptions, config::Visit, config_err,
27+
exec_datafusion_err, exec_err,
28+
},
29+
error::{DataFusionError, Result},
30+
execution::context::SessionState,
31+
};
3432
use log::debug;
35-
use object_store::aws::{AmazonS3Builder, AmazonS3ConfigKey, AwsCredential};
36-
use object_store::gcp::GoogleCloudStorageBuilder;
37-
use object_store::http::HttpBuilder;
38-
use object_store::{ClientOptions, CredentialProvider, ObjectStore};
33+
use object_store::{
34+
aws::{AmazonS3Builder, AmazonS3ConfigKey, AwsCredential},
35+
gcp::GoogleCloudStorageBuilder,
36+
http::HttpBuilder,
37+
ClientOptions, CredentialProvider,
38+
Error::Generic,
39+
ObjectStore,
40+
};
41+
use std::{
42+
any::Any,
43+
error::Error,
44+
fmt::{Debug, Display},
45+
sync::Arc,
46+
};
3947
use url::Url;
4048

4149
#[cfg(not(test))]
@@ -153,10 +161,10 @@ impl CredentialsFromConfig {
153161
let credentials = config
154162
.credentials_provider()
155163
.ok_or_else(|| {
156-
DataFusionError::ObjectStore(object_store::Error::Generic {
164+
DataFusionError::ObjectStore(Box::new(Generic {
157165
store: "S3",
158166
source: "Failed to get S3 credentials aws_config".into(),
159-
})
167+
}))
160168
})?
161169
.clone();
162170

@@ -183,10 +191,10 @@ impl CredentialsFromConfig {
183191
"Error getting credentials from provider: {e}{source_message}",
184192
);
185193

186-
return Err(DataFusionError::ObjectStore(object_store::Error::Generic {
194+
return Err(DataFusionError::ObjectStore(Box::new(Generic {
187195
store: "S3",
188196
source: message.into(),
189-
}));
197+
})));
190198
}
191199
};
192200
Ok(Self {
@@ -206,12 +214,14 @@ impl CredentialProvider for S3CredentialProvider {
206214
type Credential = AwsCredential;
207215

208216
async fn get_credential(&self) -> object_store::Result<Arc<Self::Credential>> {
209-
let creds = self.credentials.provide_credentials().await.map_err(|e| {
210-
object_store::Error::Generic {
211-
store: "S3",
212-
source: Box::new(e),
213-
}
214-
})?;
217+
let creds =
218+
self.credentials
219+
.provide_credentials()
220+
.await
221+
.map_err(|e| Generic {
222+
store: "S3",
223+
source: Box::new(e),
224+
})?;
215225
Ok(Arc::new(AwsCredential {
216226
key_id: creds.access_key_id().to_string(),
217227
secret_key: creds.secret_access_key().to_string(),

datafusion/catalog/src/stream.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -435,6 +435,6 @@ impl DataSink for StreamWrite {
435435
write_task
436436
.join_unwind()
437437
.await
438-
.map_err(DataFusionError::ExecutionJoin)?
438+
.map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))?
439439
}
440440
}

datafusion/common/src/error.rs

Lines changed: 36 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -53,22 +53,22 @@ pub enum DataFusionError {
5353
/// Error returned by arrow.
5454
///
5555
/// 2nd argument is for optional backtrace
56-
ArrowError(ArrowError, Option<String>),
56+
ArrowError(Box<ArrowError>, Option<String>),
5757
/// Error when reading / writing Parquet data.
5858
#[cfg(feature = "parquet")]
59-
ParquetError(ParquetError),
59+
ParquetError(Box<ParquetError>),
6060
/// Error when reading Avro data.
6161
#[cfg(feature = "avro")]
6262
AvroError(Box<AvroError>),
6363
/// Error when reading / writing to / from an object_store (e.g. S3 or LocalFile)
6464
#[cfg(feature = "object_store")]
65-
ObjectStore(object_store::Error),
65+
ObjectStore(Box<object_store::Error>),
6666
/// Error when an I/O operation fails
6767
IoError(io::Error),
6868
/// Error when SQL is syntactically incorrect.
6969
///
7070
/// 2nd argument is for optional backtrace
71-
SQL(ParserError, Option<String>),
71+
SQL(Box<ParserError>, Option<String>),
7272
/// Error when a feature is not yet implemented.
7373
///
7474
/// These errors are sometimes returned for features that are still in
@@ -107,7 +107,7 @@ pub enum DataFusionError {
107107
///
108108
/// 2nd argument is for optional backtrace
109109
/// Boxing the optional backtrace to prevent <https://rust-lang.github.io/rust-clippy/master/index.html#/result_large_err>
110-
SchemaError(SchemaError, Box<Option<String>>),
110+
SchemaError(Box<SchemaError>, Box<Option<String>>),
111111
/// Error during execution of the query.
112112
///
113113
/// This error is returned when an error happens during execution due to a
@@ -118,7 +118,7 @@ pub enum DataFusionError {
118118
/// [`JoinError`] during execution of the query.
119119
///
120120
/// This error can't occur for unjoined tasks, such as execution shutdown.
121-
ExecutionJoin(JoinError),
121+
ExecutionJoin(Box<JoinError>),
122122
/// Error when resources (such as memory or scratch disk space) are exhausted.
123123
///
124124
/// This error is thrown when a consumer cannot acquire additional memory
@@ -276,14 +276,14 @@ impl From<io::Error> for DataFusionError {
276276

277277
impl From<ArrowError> for DataFusionError {
278278
fn from(e: ArrowError) -> Self {
279-
DataFusionError::ArrowError(e, None)
279+
DataFusionError::ArrowError(Box::new(e), None)
280280
}
281281
}
282282

283283
impl From<DataFusionError> for ArrowError {
284284
fn from(e: DataFusionError) -> Self {
285285
match e {
286-
DataFusionError::ArrowError(e, _) => e,
286+
DataFusionError::ArrowError(e, _) => *e,
287287
DataFusionError::External(e) => ArrowError::ExternalError(e),
288288
other => ArrowError::ExternalError(Box::new(other)),
289289
}
@@ -304,7 +304,7 @@ impl From<&Arc<DataFusionError>> for DataFusionError {
304304
#[cfg(feature = "parquet")]
305305
impl From<ParquetError> for DataFusionError {
306306
fn from(e: ParquetError) -> Self {
307-
DataFusionError::ParquetError(e)
307+
DataFusionError::ParquetError(Box::new(e))
308308
}
309309
}
310310

@@ -318,20 +318,20 @@ impl From<AvroError> for DataFusionError {
318318
#[cfg(feature = "object_store")]
319319
impl From<object_store::Error> for DataFusionError {
320320
fn from(e: object_store::Error) -> Self {
321-
DataFusionError::ObjectStore(e)
321+
DataFusionError::ObjectStore(Box::new(e))
322322
}
323323
}
324324

325325
#[cfg(feature = "object_store")]
326326
impl From<object_store::path::Error> for DataFusionError {
327327
fn from(e: object_store::path::Error) -> Self {
328-
DataFusionError::ObjectStore(e.into())
328+
DataFusionError::ObjectStore(Box::new(e.into()))
329329
}
330330
}
331331

332332
impl From<ParserError> for DataFusionError {
333333
fn from(e: ParserError) -> Self {
334-
DataFusionError::SQL(e, None)
334+
DataFusionError::SQL(Box::new(e), None)
335335
}
336336
}
337337

@@ -361,22 +361,22 @@ impl Display for DataFusionError {
361361
impl Error for DataFusionError {
362362
fn source(&self) -> Option<&(dyn Error + 'static)> {
363363
match self {
364-
DataFusionError::ArrowError(e, _) => Some(e),
364+
DataFusionError::ArrowError(e, _) => Some(e.as_ref()),
365365
#[cfg(feature = "parquet")]
366-
DataFusionError::ParquetError(e) => Some(e),
366+
DataFusionError::ParquetError(e) => Some(e.as_ref()),
367367
#[cfg(feature = "avro")]
368-
DataFusionError::AvroError(e) => Some(e),
368+
DataFusionError::AvroError(e) => Some(e.as_ref()),
369369
#[cfg(feature = "object_store")]
370-
DataFusionError::ObjectStore(e) => Some(e),
370+
DataFusionError::ObjectStore(e) => Some(e.as_ref()),
371371
DataFusionError::IoError(e) => Some(e),
372-
DataFusionError::SQL(e, _) => Some(e),
372+
DataFusionError::SQL(e, _) => Some(e.as_ref()),
373373
DataFusionError::NotImplemented(_) => None,
374374
DataFusionError::Internal(_) => None,
375375
DataFusionError::Configuration(_) => None,
376376
DataFusionError::Plan(_) => None,
377-
DataFusionError::SchemaError(e, _) => Some(e),
377+
DataFusionError::SchemaError(e, _) => Some(e.as_ref()),
378378
DataFusionError::Execution(_) => None,
379-
DataFusionError::ExecutionJoin(e) => Some(e),
379+
DataFusionError::ExecutionJoin(e) => Some(e.as_ref()),
380380
DataFusionError::ResourcesExhausted(_) => None,
381381
DataFusionError::External(e) => Some(e.as_ref()),
382382
DataFusionError::Context(_, e) => Some(e.as_ref()),
@@ -828,7 +828,7 @@ make_error!(resources_err, resources_datafusion_err, ResourcesExhausted);
828828
#[macro_export]
829829
macro_rules! sql_datafusion_err {
830830
($ERR:expr $(; diagnostic = $DIAG:expr)?) => {{
831-
let err = DataFusionError::SQL($ERR, Some(DataFusionError::get_back_trace()));
831+
let err = DataFusionError::SQL(Box::new($ERR), Some(DataFusionError::get_back_trace()));
832832
$(
833833
let err = err.with_diagnostic($DIAG);
834834
)?
@@ -852,7 +852,7 @@ macro_rules! sql_err {
852852
#[macro_export]
853853
macro_rules! arrow_datafusion_err {
854854
($ERR:expr $(; diagnostic = $DIAG:expr)?) => {{
855-
let err = DataFusionError::ArrowError($ERR, Some(DataFusionError::get_back_trace()));
855+
let err = DataFusionError::ArrowError(Box::new($ERR), Some(DataFusionError::get_back_trace()));
856856
$(
857857
let err = err.with_diagnostic($DIAG);
858858
)?
@@ -878,7 +878,7 @@ macro_rules! arrow_err {
878878
macro_rules! schema_datafusion_err {
879879
($ERR:expr $(; diagnostic = $DIAG:expr)?) => {{
880880
let err = $crate::error::DataFusionError::SchemaError(
881-
$ERR,
881+
Box::new($ERR),
882882
Box::new(Some($crate::error::DataFusionError::get_back_trace())),
883883
);
884884
$(
@@ -893,7 +893,7 @@ macro_rules! schema_datafusion_err {
893893
macro_rules! schema_err {
894894
($ERR:expr $(; diagnostic = $DIAG:expr)?) => {{
895895
let err = $crate::error::DataFusionError::SchemaError(
896-
$ERR,
896+
Box::new($ERR),
897897
Box::new(Some($crate::error::DataFusionError::get_back_trace())),
898898
);
899899
$(
@@ -951,11 +951,17 @@ pub fn add_possible_columns_to_diag(
951951

952952
#[cfg(test)]
953953
mod test {
954+
use std::mem::size_of;
954955
use std::sync::Arc;
955956

956957
use crate::error::{DataFusionError, GenericError};
957958
use arrow::error::ArrowError;
958959

960+
#[test]
961+
fn test_datafusion_error_size() {
962+
assert_eq!(size_of::<DataFusionError>(), 40);
963+
}
964+
959965
#[test]
960966
fn datafusion_error_to_arrow() {
961967
let res = return_arrow_error().unwrap_err();
@@ -1020,8 +1026,8 @@ mod test {
10201026

10211027
do_root_test(
10221028
DataFusionError::ArrowError(
1023-
ArrowError::ExternalError(Box::new(DataFusionError::ResourcesExhausted(
1024-
"foo".to_string(),
1029+
Box::new(ArrowError::ExternalError(Box::new(
1030+
DataFusionError::ResourcesExhausted("foo".to_string()),
10251031
))),
10261032
None,
10271033
),
@@ -1044,9 +1050,11 @@ mod test {
10441050

10451051
do_root_test(
10461052
DataFusionError::ArrowError(
1047-
ArrowError::ExternalError(Box::new(ArrowError::ExternalError(Box::new(
1048-
DataFusionError::ResourcesExhausted("foo".to_string()),
1049-
)))),
1053+
Box::new(ArrowError::ExternalError(Box::new(
1054+
ArrowError::ExternalError(Box::new(
1055+
DataFusionError::ResourcesExhausted("foo".to_string()),
1056+
)),
1057+
))),
10501058
None,
10511059
),
10521060
DataFusionError::ResourcesExhausted("foo".to_string()),

datafusion/common/src/scalar/mod.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2640,7 +2640,7 @@ impl ScalarValue {
26402640
value_offsets,
26412641
child_arrays,
26422642
)
2643-
.map_err(|e| DataFusionError::ArrowError(e, None))?;
2643+
.map_err(|e| DataFusionError::ArrowError(Box::new(e), None))?;
26442644
Arc::new(ar)
26452645
}
26462646
None => {
@@ -6679,10 +6679,7 @@ mod tests {
66796679
let err = value.arithmetic_negate().expect_err("Should receive overflow error on negating {value:?}");
66806680
let root_err = err.find_root();
66816681
match root_err{
6682-
DataFusionError::ArrowError(
6683-
ArrowError::ArithmeticOverflow(_),
6684-
_,
6685-
) => {}
6682+
DataFusionError::ArrowError(err, _) if matches!(err.as_ref(), ArrowError::ArithmeticOverflow(_)) => {}
66866683
_ => return Err(err),
66876684
};
66886685
}

datafusion/core/src/dataframe/mod.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2036,10 +2036,11 @@ impl DataFrame {
20362036
match self.plan.schema().qualified_field_from_column(&old_column) {
20372037
Ok(qualifier_and_field) => qualifier_and_field,
20382038
// no-op if field not found
2039-
Err(DataFusionError::SchemaError(
2040-
SchemaError::FieldNotFound { .. },
2041-
_,
2042-
)) => return Ok(self),
2039+
Err(DataFusionError::SchemaError(e, _))
2040+
if matches!(*e, SchemaError::FieldNotFound { .. }) =>
2041+
{
2042+
return Ok(self);
2043+
}
20432044
Err(err) => return Err(err),
20442045
};
20452046
let projection = self

datafusion/core/src/datasource/file_format/arrow.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -294,7 +294,7 @@ impl FileSink for ArrowFileSink {
294294
demux_task
295295
.join_unwind()
296296
.await
297-
.map_err(DataFusionError::ExecutionJoin)??;
297+
.map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??;
298298
Ok(row_count as u64)
299299
}
300300
}

datafusion/core/src/datasource/memory_test.rs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -130,12 +130,15 @@ mod tests {
130130
.scan(&session_ctx.state(), Some(&projection), &[], None)
131131
.await
132132
{
133-
Err(DataFusionError::ArrowError(ArrowError::SchemaError(e), _)) => {
134-
assert_eq!(
135-
"\"project index 4 out of bounds, max field 3\"",
136-
format!("{e:?}")
137-
)
138-
}
133+
Err(DataFusionError::ArrowError(err, _)) => match err.as_ref() {
134+
ArrowError::SchemaError(e) => {
135+
assert_eq!(
136+
"\"project index 4 out of bounds, max field 3\"",
137+
format!("{e:?}")
138+
)
139+
}
140+
_ => panic!("unexpected error"),
141+
},
139142
res => panic!("Scan should failed on invalid projection, got {res:?}"),
140143
};
141144

0 commit comments

Comments
 (0)