Skip to content

Commit 7c4e63d

Browse files
authored
feat(datafusion): route catalog IO off the caller's runtime in IcebergTableProvider (#20)
Adds `try_new_with_runtime` to `IcebergTableProvider`, `IcebergSchemaProvider`, and `IcebergCatalogProvider`. When a `Runtime` is supplied, the IO-bound work in `scan` (catalog reload, `plan_files`, manifest fetches) and `insert_into` (catalog reload) is spawned on `runtime.io()` via a new `run_on_io` helper, so the calling runtime — typically DataFusion's CPU runtime in a split setup — only awaits a join handle. The existing `try_new` signatures are unchanged.
1 parent f16475e commit 7c4e63d

3 files changed

Lines changed: 125 additions & 50 deletions

File tree

crates/integrations/datafusion/src/catalog.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use std::sync::Arc;
2121

2222
use datafusion::catalog::{CatalogProvider, SchemaProvider};
2323
use futures::future::try_join_all;
24-
use iceberg::{Catalog, NamespaceIdent, Result};
24+
use iceberg::{Catalog, NamespaceIdent, Result, Runtime};
2525

2626
use crate::schema::IcebergSchemaProvider;
2727

@@ -47,6 +47,14 @@ impl IcebergCatalogProvider {
4747
/// attempts to create a schema provider for each namespace, and
4848
/// collects these providers into a `HashMap`.
4949
pub async fn try_new(client: Arc<dyn Catalog>) -> Result<Self> {
50+
Self::try_new_with_runtime(client, None).await
51+
}
52+
53+
/// Like [`Self::try_new`], propagating `runtime` to all child providers.
54+
pub async fn try_new_with_runtime(
55+
client: Arc<dyn Catalog>,
56+
runtime: Option<Runtime>,
57+
) -> Result<Self> {
5058
// TODO:
5159
// Schemas and providers should be cached and evicted based on time
5260
// As of right now; schemas might become stale.
@@ -61,9 +69,10 @@ impl IcebergCatalogProvider {
6169
schema_names
6270
.iter()
6371
.map(|name| {
64-
IcebergSchemaProvider::try_new(
72+
IcebergSchemaProvider::try_new_with_runtime(
6573
client.clone(),
6674
NamespaceIdent::new(name.clone()),
75+
runtime.clone(),
6776
)
6877
})
6978
.collect::<Vec<_>>(),

crates/integrations/datafusion/src/schema.rs

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,9 @@ use futures::StreamExt;
2929
use futures::future::try_join_all;
3030
use iceberg::arrow::arrow_schema_to_schema_auto_assign_ids;
3131
use iceberg::inspect::MetadataTableType;
32-
use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, TableCreation, TableIdent};
32+
use iceberg::{
33+
Catalog, Error, ErrorKind, NamespaceIdent, Result, Runtime, TableCreation, TableIdent,
34+
};
3335

3436
use crate::table::IcebergTableProvider;
3537
use crate::to_datafusion_error;
@@ -47,19 +49,21 @@ pub(crate) struct IcebergSchemaProvider {
4749
/// [`TableProvider`] trait.
4850
/// Wrapped in Arc to allow sharing across async boundaries in register_table.
4951
tables: Arc<DashMap<String, Arc<IcebergTableProvider>>>,
52+
/// Propagated to every [`IcebergTableProvider`] created by this provider.
53+
runtime: Option<Runtime>,
5054
}
5155

5256
impl IcebergSchemaProvider {
5357
/// Asynchronously tries to construct a new [`IcebergSchemaProvider`]
5458
/// using the given client to fetch and initialize table providers for
5559
/// the provided namespace in the Iceberg [`Catalog`].
5660
///
57-
/// This method retrieves a list of table names
58-
/// attempts to create a table provider for each table name, and
59-
/// collects these providers into a `HashMap`.
60-
pub(crate) async fn try_new(
61+
/// `runtime` is propagated to every [`IcebergTableProvider`] created by
62+
/// this schema provider.
63+
pub(crate) async fn try_new_with_runtime(
6164
client: Arc<dyn Catalog>,
6265
namespace: NamespaceIdent,
66+
runtime: Option<Runtime>,
6367
) -> Result<Self> {
6468
// TODO:
6569
// Tables and providers should be cached based on table_name
@@ -75,7 +79,14 @@ impl IcebergSchemaProvider {
7579
let providers = try_join_all(
7680
table_names
7781
.iter()
78-
.map(|name| IcebergTableProvider::try_new(client.clone(), namespace.clone(), name))
82+
.map(|name| {
83+
IcebergTableProvider::try_new_with_runtime(
84+
client.clone(),
85+
namespace.clone(),
86+
name,
87+
runtime.clone(),
88+
)
89+
})
7990
.collect::<Vec<_>>(),
8091
)
8192
.await?;
@@ -89,6 +100,7 @@ impl IcebergSchemaProvider {
89100
catalog: client,
90101
namespace,
91102
tables,
103+
runtime,
92104
})
93105
}
94106
}
@@ -173,6 +185,7 @@ impl SchemaProvider for IcebergSchemaProvider {
173185
let namespace = self.namespace.clone();
174186
let tables = self.tables.clone();
175187
let name_clone = name.clone();
188+
let runtime = self.runtime.clone();
176189

177190
// Use tokio's spawn_blocking to handle the async work on a blocking thread pool
178191
let result = tokio::task::spawn_blocking(move || {
@@ -190,10 +203,11 @@ impl SchemaProvider for IcebergSchemaProvider {
190203
.map_err(to_datafusion_error)?;
191204

192205
// Create a new table provider using the catalog reference
193-
let table_provider = IcebergTableProvider::try_new(
206+
let table_provider = IcebergTableProvider::try_new_with_runtime(
194207
catalog.clone(),
195208
namespace.clone(),
196209
name_clone.clone(),
210+
runtime,
197211
)
198212
.await
199213
.map_err(to_datafusion_error)?;
@@ -315,7 +329,7 @@ mod tests {
315329
.await
316330
.unwrap();
317331

318-
let provider = IcebergSchemaProvider::try_new(Arc::new(catalog), namespace)
332+
let provider = IcebergSchemaProvider::try_new_with_runtime(Arc::new(catalog), namespace, None)
319333
.await
320334
.unwrap();
321335

crates/integrations/datafusion/src/table/mod.rs

Lines changed: 92 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ use iceberg::inspect::MetadataTableType;
5151
use iceberg::scan::FileScanTask;
5252
use iceberg::spec::TableProperties;
5353
use iceberg::table::Table;
54-
use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, TableIdent};
54+
use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, Runtime, TableIdent};
5555
use metadata_table::IcebergMetadataTableProvider;
5656

5757
use crate::error::to_datafusion_error;
@@ -71,6 +71,10 @@ use crate::physical_plan::write::IcebergWriteExec;
7171
///
7272
/// For read-only access to a specific snapshot without catalog overhead, use
7373
/// [`IcebergStaticTableProvider`] instead.
74+
///
75+
/// When using a CPU/IO split runtime, pass a [`Runtime`] via
76+
/// [`Self::try_new_with_runtime`] so that catalog IO is dispatched to the IO
77+
/// runtime rather than running on the caller's runtime.
7478
#[derive(Debug, Clone)]
7579
pub struct IcebergTableProvider {
7680
/// The catalog that manages this table
@@ -79,6 +83,8 @@ pub struct IcebergTableProvider {
7983
table_ident: TableIdent,
8084
/// A reference-counted arrow `Schema` (cached at construction)
8185
schema: ArrowSchemaRef,
86+
/// When `Some`, IO in `scan` and `insert_into` is spawned on `runtime.io()`.
87+
runtime: Option<Runtime>,
8288
}
8389

8490
impl IcebergTableProvider {
@@ -90,6 +96,17 @@ impl IcebergTableProvider {
9096
catalog: Arc<dyn Catalog>,
9197
namespace: NamespaceIdent,
9298
name: impl Into<String>,
99+
) -> Result<Self> {
100+
Self::try_new_with_runtime(catalog, namespace, name, None).await
101+
}
102+
103+
/// Like [`Self::try_new`], but routes catalog IO in `scan` and `insert_into`
104+
/// through `runtime.io()` instead of running inline on the caller's runtime.
105+
pub async fn try_new_with_runtime(
106+
catalog: Arc<dyn Catalog>,
107+
namespace: NamespaceIdent,
108+
name: impl Into<String>,
109+
runtime: Option<Runtime>,
93110
) -> Result<Self> {
94111
let table_ident = TableIdent::new(namespace, name.into());
95112

@@ -100,9 +117,25 @@ impl IcebergTableProvider {
100117
catalog,
101118
table_ident,
102119
schema,
120+
runtime,
103121
})
104122
}
105123

124+
/// Runs `fut` on `self.runtime.io()` if set, otherwise runs it inline.
125+
async fn run_on_io<F, T>(&self, fut: F) -> DFResult<T>
126+
where
127+
F: std::future::Future<Output = DFResult<T>> + Send + 'static,
128+
T: Send + 'static,
129+
{
130+
match &self.runtime {
131+
Some(rt) => match rt.io().spawn(fut).await {
132+
Ok(inner) => inner,
133+
Err(e) => Err(to_datafusion_error(e)),
134+
},
135+
None => fut.await,
136+
}
137+
}
138+
106139
pub(crate) async fn metadata_table(
107140
&self,
108141
r#type: MetadataTableType,
@@ -133,43 +166,58 @@ impl TableProvider for IcebergTableProvider {
133166
filters: &[Expr],
134167
limit: Option<usize>,
135168
) -> DFResult<Arc<dyn ExecutionPlan>> {
136-
// Second load: fetch the latest snapshot so scans always reflect current table state.
137-
let table = self
138-
.catalog
139-
.load_table(&self.table_ident)
140-
.await
141-
.map_err(to_datafusion_error)?;
142-
143-
// Build a TableScan mirroring the inputs we'll hand to IcebergTableScan,
144-
// so plan_files() uses the same projection/filters the scan will replay in execute().
145-
let col_names = projection.map(|indices| {
146-
indices
147-
.iter()
148-
.map(|&i| self.schema.field(i).name().clone())
149-
.collect::<Vec<_>>()
150-
});
151-
169+
// Compute the predicate on the caller's runtime (pure CPU, no IO).
152170
let predicate = convert_filters_to_predicate(filters);
153171

154-
let mut builder = table.scan();
155-
builder = match col_names {
156-
Some(names) => builder.select(names),
157-
None => builder.select_all(),
158-
};
159-
if let Some(pred) = predicate {
160-
builder = builder.with_filter(pred);
161-
}
162-
163-
let tasks: Vec<FileScanTask> = builder
164-
.build()
165-
.map_err(to_datafusion_error)?
166-
.plan_files()
167-
.await
168-
.map_err(to_datafusion_error)?
169-
.try_collect::<Vec<_>>()
170-
.await
171-
.map_err(to_datafusion_error)?;
172+
// Capture everything the IO closure needs; Session is not Send.
173+
let target_partitions = state.config().target_partitions();
174+
let projection_owned: Option<Vec<usize>> = projection.cloned();
175+
let catalog = self.catalog.clone();
176+
let table_ident = self.table_ident.clone();
177+
let arrow_schema = self.schema.clone();
178+
179+
// ── IO-bound: reload table + plan files ──────────────────────────────
180+
// Spawned on `self.runtime.io()` when configured, otherwise runs inline.
181+
let (table, tasks) = self
182+
.run_on_io(async move {
183+
// Second load: fetch the latest snapshot so scans always reflect
184+
// current table state.
185+
let table = catalog
186+
.load_table(&table_ident)
187+
.await
188+
.map_err(to_datafusion_error)?;
189+
190+
let col_names = projection_owned.as_ref().map(|indices| {
191+
indices
192+
.iter()
193+
.map(|&i| arrow_schema.field(i).name().clone())
194+
.collect::<Vec<_>>()
195+
});
196+
197+
let mut builder = table.scan();
198+
builder = match col_names {
199+
Some(names) => builder.select(names),
200+
None => builder.select_all(),
201+
};
202+
if let Some(pred) = predicate {
203+
builder = builder.with_filter(pred);
204+
}
205+
206+
let tasks: Vec<FileScanTask> = builder
207+
.build()
208+
.map_err(to_datafusion_error)?
209+
.plan_files()
210+
.await
211+
.map_err(to_datafusion_error)?
212+
.try_collect::<Vec<_>>()
213+
.await
214+
.map_err(to_datafusion_error)?;
215+
216+
DFResult::Ok((table, tasks))
217+
})
218+
.await?;
172219

220+
// ── CPU-bound: schema projection + bucketing ──────────────────────────
173221
// Output schema after projection: column indices in `Hash` exprs and any
174222
// Arrow array we hash must reference this schema, not the full table schema.
175223
let output_schema = match projection {
@@ -179,7 +227,6 @@ impl TableProvider for IcebergTableProvider {
179227
})?),
180228
};
181229

182-
let target_partitions = state.config().target_partitions();
183230
// Always produce at least 1 partition so that DataFusion can schedule
184231
// the plan normally and callers can safely call execute(0). An empty
185232
// bucket simply yields an empty record-batch stream.
@@ -232,11 +279,16 @@ impl TableProvider for IcebergTableProvider {
232279
input: Arc<dyn ExecutionPlan>,
233280
_insert_op: InsertOp,
234281
) -> DFResult<Arc<dyn ExecutionPlan>> {
282+
let catalog = self.catalog.clone();
283+
let table_ident = self.table_ident.clone();
235284
let table = self
236-
.catalog
237-
.load_table(&self.table_ident)
238-
.await
239-
.map_err(to_datafusion_error)?;
285+
.run_on_io(async move {
286+
catalog
287+
.load_table(&table_ident)
288+
.await
289+
.map_err(to_datafusion_error)
290+
})
291+
.await?;
240292

241293
let partition_spec = table.metadata().default_partition_spec();
242294

0 commit comments

Comments
 (0)