Skip to content

Commit 22b95b4

Browse files
committed
df
1 parent 53a835b commit 22b95b4

2 files changed

Lines changed: 124 additions & 37 deletions

File tree

  • crates/integrations/datafusion/src

crates/integrations/datafusion/src/physical_plan/scan.rs

Lines changed: 114 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
use std::any::Any;
1919
use std::pin::Pin;
2020
use std::sync::Arc;
21-
use std::vec;
2221

2322
use datafusion::arrow::array::RecordBatch;
2423
use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef;
@@ -30,7 +29,10 @@ use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
3029
use datafusion::physical_plan::{DisplayAs, ExecutionPlan, Partitioning, PlanProperties};
3130
use datafusion::prelude::Expr;
3231
use futures::{Stream, TryStreamExt};
32+
use iceberg::arrow::ArrowReaderBuilder;
3333
use iceberg::expr::Predicate;
34+
use iceberg::io::FileIO;
35+
use iceberg::scan::CombinedScanTask;
3436
use iceberg::table::Table;
3537

3638
use super::expr_to_predicate::convert_filters_to_predicate;
@@ -53,10 +55,18 @@ pub struct IcebergTableScan {
5355
predicates: Option<Predicate>,
5456
/// Optional limit on the number of rows to return
5557
limit: Option<usize>,
58+
/// Pre-planned combined scan tasks, one per DataFusion partition.
59+
/// When `None`, falls back to single-partition planning via `plan_files()`.
60+
combined_tasks: Option<Arc<Vec<CombinedScanTask>>>,
61+
/// FileIO for reading data files.
62+
file_io: FileIO,
5663
}
5764

5865
impl IcebergTableScan {
59-
/// Creates a new [`IcebergTableScan`] object.
66+
/// Creates a new [`IcebergTableScan`] without eagerly planning tasks.
67+
///
68+
/// This produces a single-partition scan. Call [`Self::plan`] after construction
69+
/// to enable multi-partition execution via `plan_tasks()`.
6070
pub(crate) fn new(
6171
table: Table,
6272
snapshot_id: Option<i64>,
@@ -69,9 +79,10 @@ impl IcebergTableScan {
6979
None => schema.clone(),
7080
Some(projection) => Arc::new(schema.project(projection).unwrap()),
7181
};
72-
let plan_properties = Self::compute_properties(output_schema.clone());
82+
let plan_properties = Self::compute_properties(output_schema, 1);
7383
let projection = get_column_names(schema.clone(), projection);
7484
let predicates = convert_filters_to_predicate(filters);
85+
let file_io = table.file_io().clone();
7586

7687
Self {
7788
table,
@@ -80,9 +91,47 @@ impl IcebergTableScan {
8091
projection,
8192
predicates,
8293
limit,
94+
combined_tasks: None,
95+
file_io,
8396
}
8497
}
8598

99+
/// Eagerly plans scan tasks via `plan_tasks()`, enabling multi-partition
100+
/// parallel execution in DataFusion.
101+
///
102+
/// If planning fails (e.g. manifests are unreachable), returns `self`
103+
/// unchanged in single-partition mode. Errors will surface at `execute()` time.
104+
pub(crate) async fn plan(mut self) -> Self {
105+
let combined_tasks = self.try_plan_tasks().await;
106+
if let Ok(tasks) = combined_tasks {
107+
let num_partitions = tasks.len().max(1);
108+
self.plan_properties = Self::compute_properties(self.schema(), num_partitions);
109+
self.combined_tasks = Some(Arc::new(tasks));
110+
}
111+
self
112+
}
113+
114+
/// Builds a table scan from this node's snapshot, projection and predicate
/// settings and collects every [`CombinedScanTask`] it plans.
///
/// # Errors
///
/// Returns the `iceberg::Error` produced while building the scan, starting
/// task planning, or draining the planned task stream.
async fn try_plan_tasks(&self) -> Result<Vec<CombinedScanTask>, iceberg::Error> {
    // Pin the scan to an explicit snapshot only when one was requested.
    let base = self.table.scan();
    let base = if let Some(id) = self.snapshot_id {
        base.snapshot_id(id)
    } else {
        base
    };

    // Restrict to the projected columns, or read everything.
    let mut builder = if let Some(columns) = &self.projection {
        base.select(columns.clone())
    } else {
        base.select_all()
    };

    // Push the converted filter predicate down into the scan, if any.
    if let Some(filter) = self.predicates.as_ref() {
        builder = builder.with_filter(filter.clone());
    }

    let scan = builder.build()?;
    let task_stream = scan.plan_tasks().await?;
    let planned: Vec<CombinedScanTask> = task_stream.try_collect().await?;
    Ok(planned)
}
134+
86135
pub fn table(&self) -> &Table {
87136
&self.table
88137
}
@@ -103,14 +152,10 @@ impl IcebergTableScan {
103152
self.limit
104153
}
105154

106-
/// Computes [`PlanProperties`] used in query optimization.
107-
fn compute_properties(schema: ArrowSchemaRef) -> Arc<PlanProperties> {
108-
// TODO:
109-
// This is more or less a placeholder, to be replaced
110-
// once we support output-partitioning
155+
fn compute_properties(schema: ArrowSchemaRef, num_partitions: usize) -> Arc<PlanProperties> {
111156
Arc::new(PlanProperties::new(
112157
EquivalenceProperties::new(schema),
113-
Partitioning::UnknownPartitioning(1),
158+
Partitioning::UnknownPartitioning(num_partitions),
114159
EmissionType::Incremental,
115160
Boundedness::Bounded,
116161
))
@@ -143,9 +188,41 @@ impl ExecutionPlan for IcebergTableScan {
143188

144189
fn execute(
145190
&self,
146-
_partition: usize,
191+
partition: usize,
147192
_context: Arc<TaskContext>,
148193
) -> DFResult<SendableRecordBatchStream> {
194+
let schema = self.schema();
195+
196+
// If we have pre-planned tasks, use them for partition-aware execution
197+
if let Some(ref combined_tasks) = self.combined_tasks {
198+
let Some(combined_task) = combined_tasks.get(partition) else {
199+
return Ok(Box::pin(RecordBatchStreamAdapter::new(
200+
schema,
201+
futures::stream::empty(),
202+
)));
203+
};
204+
205+
let tasks = combined_task.tasks().to_vec();
206+
let file_io = self.file_io.clone();
207+
208+
let fut = async move {
209+
let task_stream = Box::pin(futures::stream::iter(tasks.into_iter().map(Ok)));
210+
let reader = ArrowReaderBuilder::new(file_io).build();
211+
let stream = reader
212+
.read(task_stream)
213+
.map_err(to_datafusion_error)?
214+
.map_err(to_datafusion_error);
215+
Ok::<_, datafusion::error::DataFusionError>(stream)
216+
};
217+
218+
let stream = futures::stream::once(fut).try_flatten();
219+
return Ok(Box::pin(RecordBatchStreamAdapter::new(
220+
schema,
221+
apply_limit(stream, self.limit),
222+
)));
223+
}
224+
225+
// Fallback: single-partition mode using plan_files() + to_arrow()
149226
let fut = get_batch_stream(
150227
self.table.clone(),
151228
self.snapshot_id,
@@ -154,33 +231,37 @@ impl ExecutionPlan for IcebergTableScan {
154231
);
155232
let stream = futures::stream::once(fut).try_flatten();
156233

157-
// Apply limit if specified
158-
let limited_stream: Pin<Box<dyn Stream<Item = DFResult<RecordBatch>> + Send>> =
159-
if let Some(limit) = self.limit {
160-
let mut remaining = limit;
161-
Box::pin(stream.try_filter_map(move |batch| {
162-
futures::future::ready(if remaining == 0 {
163-
Ok(None)
164-
} else if batch.num_rows() <= remaining {
165-
remaining -= batch.num_rows();
166-
Ok(Some(batch))
167-
} else {
168-
let limited_batch = batch.slice(0, remaining);
169-
remaining = 0;
170-
Ok(Some(limited_batch))
171-
})
172-
}))
173-
} else {
174-
Box::pin(stream)
175-
};
176-
177234
Ok(Box::pin(RecordBatchStreamAdapter::new(
178-
self.schema(),
179-
limited_stream,
235+
schema,
236+
apply_limit(stream, self.limit),
180237
)))
181238
}
182239
}
183240

241+
/// Apply an optional row limit to a stream of record batches.
242+
fn apply_limit(
243+
stream: impl Stream<Item = DFResult<RecordBatch>> + Send + 'static,
244+
limit: Option<usize>,
245+
) -> Pin<Box<dyn Stream<Item = DFResult<RecordBatch>> + Send>> {
246+
if let Some(limit) = limit {
247+
let mut remaining = limit;
248+
Box::pin(stream.try_filter_map(move |batch| {
249+
futures::future::ready(if remaining == 0 {
250+
Ok(None)
251+
} else if batch.num_rows() <= remaining {
252+
remaining -= batch.num_rows();
253+
Ok(Some(batch))
254+
} else {
255+
let limited_batch = batch.slice(0, remaining);
256+
remaining = 0;
257+
Ok(Some(limited_batch))
258+
})
259+
}))
260+
} else {
261+
Box::pin(stream)
262+
}
263+
}
264+
184265
impl DisplayAs for IcebergTableScan {
185266
fn fmt_as(
186267
&self,
@@ -195,7 +276,7 @@ impl DisplayAs for IcebergTableScan {
195276
.map_or(String::new(), |v| v.join(",")),
196277
self.predicates
197278
.clone()
198-
.map_or(String::from(""), |p| format!("{p}"))
279+
.map_or(String::from(""), |p| format!("{p}")),
199280
)
200281
}
201282
}

crates/integrations/datafusion/src/table/mod.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -137,14 +137,17 @@ impl TableProvider for IcebergTableProvider {
137137
.map_err(to_datafusion_error)?;
138138

139139
// Create scan with fresh metadata (always use current snapshot)
140-
Ok(Arc::new(IcebergTableScan::new(
140+
let scan = IcebergTableScan::new(
141141
table,
142142
None, // Always use current snapshot for catalog-backed provider
143143
self.schema.clone(),
144144
projection,
145145
filters,
146146
limit,
147-
)))
147+
)
148+
.plan()
149+
.await;
150+
Ok(Arc::new(scan))
148151
}
149152

150153
fn supports_filters_pushdown(
@@ -315,14 +318,17 @@ impl TableProvider for IcebergStaticTableProvider {
315318
limit: Option<usize>,
316319
) -> DFResult<Arc<dyn ExecutionPlan>> {
317320
// Use cached table (no refresh)
318-
Ok(Arc::new(IcebergTableScan::new(
321+
let scan = IcebergTableScan::new(
319322
self.table.clone(),
320323
self.snapshot_id,
321324
self.schema.clone(),
322325
projection,
323326
filters,
324327
limit,
325-
)))
328+
)
329+
.plan()
330+
.await;
331+
Ok(Arc::new(scan))
326332
}
327333

328334
fn supports_filters_pushdown(

0 commit comments

Comments
 (0)