Skip to content

Commit 1466cd1

Browse files
authored
Fix deadlocks (#34)
1 parent 3bc4bd9 commit 1466cd1

2 files changed

Lines changed: 41 additions & 10 deletions

File tree

crates/catalog/src/lib.rs

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
use error::Result;
2+
use futures::TryFutureExt;
3+
use futures::executor::block_on;
24
use snafu::ResultExt;
3-
use tokio::runtime::Builder;
5+
use tokio::runtime::{Builder, Handle, RuntimeFlavor};
6+
use tokio::task;
47

58
#[allow(clippy::module_inception)]
69
pub mod catalog;
@@ -34,6 +37,23 @@ where
3437
.unwrap_or_else(|_| error::ThreadPanickedWhileExecutingFutureSnafu.fail()?)
3538
}
3639

40+
fn block_on_without_deadlock<F>(future: F) -> F::Output
41+
where
42+
F: Future + Send + 'static,
43+
F::Output: Send + 'static,
44+
{
45+
match Handle::try_current() {
46+
Ok(handle) => match handle.runtime_flavor() {
47+
RuntimeFlavor::CurrentThread => block_on(
48+
task::spawn_blocking(|| block_on(future))
49+
.unwrap_or_else(|err| std::panic::resume_unwind(err.into_panic())),
50+
),
51+
_ => task::block_in_place(|| handle.block_on(future)),
52+
},
53+
Err(_) => block_on(future),
54+
}
55+
}
56+
3757
pub mod test_utils {
3858
use datafusion::arrow::array::{ArrayRef, RecordBatch};
3959
use datafusion::arrow::compute::{

crates/catalog/src/schema.rs

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,16 @@
1-
use crate::df_error;
21
use crate::snowflake_table::CaseInsensitiveTable;
32
use crate::table::{CachingTable, IcebergTableBuilder};
3+
use crate::{block_on_without_deadlock, df_error};
44
use async_trait::async_trait;
55
use dashmap::DashMap;
66
use datafusion::catalog::{SchemaProvider, TableProvider};
77
use datafusion_common::DataFusionError;
88
use datafusion_expr::TableType;
99
use datafusion_iceberg::DataFusionTable;
10-
use futures::executor::block_on;
1110
use iceberg_rust::catalog::Catalog;
1211
use iceberg_rust::catalog::tabular::Tabular as IcebergTabular;
1312
use iceberg_rust_spec::identifier::Identifier;
1413
use snafu::ResultExt;
15-
use snafu::futures::TryFutureExt;
1614
use std::any::Any;
1715
use std::sync::Arc;
1816

@@ -89,11 +87,15 @@ impl SchemaProvider for CachingSchema {
8987
&& let Some(iceberg_builder) = table.as_any().downcast_ref::<IcebergTableBuilder>()
9088
&& table.table_type() != TableType::View
9189
{
92-
let ident = Identifier::new(std::slice::from_ref(&self.name), &name);
93-
block_on(async move {
94-
let mut builder = iceberg_builder.builder.clone();
90+
let catalog = Arc::clone(catalog);
91+
let mut builder = iceberg_builder.builder.clone();
92+
let namespace = vec![self.name.clone()];
93+
let table_name = name.clone();
94+
95+
block_on_without_deadlock(async move {
96+
let ident = Identifier::new(&namespace, &table_name);
9597
let iceberg_table = builder
96-
.build(ident.namespace(), catalog.clone())
98+
.build(ident.namespace(), catalog)
9799
.await
98100
.context(df_error::IcebergSnafu)?;
99101
let tabular = IcebergTabular::Table(iceberg_table);
@@ -121,8 +123,17 @@ impl SchemaProvider for CachingSchema {
121123
if let Some((_, caching_table)) = table {
122124
if caching_table.table_type() != TableType::View {
123125
if let Some(catalog) = &self.iceberg_catalog {
124-
let ident = Identifier::new(std::slice::from_ref(&self.name), name);
125-
block_on(catalog.drop_table(&ident).context(df_error::IcebergSnafu))?;
126+
let catalog = Arc::clone(catalog);
127+
let namespace = vec![self.name.clone()];
128+
let table_name = name.to_string();
129+
130+
block_on_without_deadlock(async move {
131+
let ident = Identifier::new(&namespace, &table_name);
132+
catalog
133+
.drop_table(&ident)
134+
.await
135+
.context(df_error::IcebergSnafu)
136+
})?;
126137
} else {
127138
return self.schema.deregister_table(name);
128139
}

0 commit comments

Comments
 (0)