Skip to content

Commit f6e1a21

Browse files
committed
add IcebergPartitionedTableProvider
1 parent 7ba6295 commit f6e1a21

3 files changed

Lines changed: 150 additions & 0 deletions

File tree

crates/integrations/datafusion/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ pub mod physical_plan;
2525
mod schema;
2626
pub mod table;
2727
pub use physical_plan::IcebergPartitionedScan;
28+
pub use table::partitioned::IcebergPartitionedTableProvider;
2829
pub use table::table_provider_factory::IcebergTableProviderFactory;
2930
pub use table::*;
3031

crates/integrations/datafusion/src/table/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
//! table snapshot. Use for consistent analytical queries or time-travel scenarios.
2727
2828
pub mod metadata_table;
29+
pub mod partitioned;
2930
pub mod table_provider_factory;
3031

3132
use std::any::Any;
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
use std::any::Any;
2+
use std::sync::Arc;
3+
4+
use async_trait::async_trait;
5+
use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef;
6+
use datafusion::catalog::Session;
7+
use datafusion::common::DataFusionError;
8+
use datafusion::datasource::{TableProvider, TableType};
9+
use datafusion::error::Result as DFResult;
10+
use datafusion::logical_expr::{Expr, TableProviderFilterPushDown};
11+
use datafusion::physical_plan::ExecutionPlan;
12+
use futures::TryStreamExt;
13+
use iceberg::arrow::schema_to_arrow_schema;
14+
use iceberg::scan::FileScanTask;
15+
use iceberg::{Catalog, NamespaceIdent, Result, TableIdent};
16+
17+
use crate::error::to_datafusion_error;
18+
use crate::physical_plan::expr_to_predicate::convert_filters_to_predicate;
19+
use crate::physical_plan::partitioned_scan::IcebergPartitionedScan;
20+
21+
#[derive(Debug, Clone)]
22+
pub struct IcebergPartitionedTableProvider {
23+
catalog: Arc<dyn Catalog>,
24+
table_ident: TableIdent,
25+
schema: ArrowSchemaRef,
26+
}
27+
28+
impl IcebergPartitionedTableProvider {
29+
pub async fn try_new(
30+
catalog: Arc<dyn Catalog>,
31+
namespace: NamespaceIdent,
32+
name: impl Into<String>,
33+
) -> Result<Self> {
34+
let table_ident = TableIdent::new(namespace, name.into());
35+
let table = catalog.load_table(&table_ident).await?;
36+
let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
37+
Ok(Self {
38+
catalog,
39+
table_ident,
40+
schema,
41+
})
42+
}
43+
44+
pub async fn scan_without_session(
45+
&self,
46+
projection: Option<Vec<usize>>,
47+
filters: Vec<Expr>,
48+
limit: Option<usize>,
49+
) -> DFResult<IcebergPartitionedScan> {
50+
let table = self
51+
.catalog
52+
.load_table(&self.table_ident)
53+
.await
54+
.map_err(to_datafusion_error)?;
55+
56+
let col_names = projection.as_ref().map(|indices| {
57+
indices
58+
.iter()
59+
.map(|&i| self.schema.field(i).name().clone())
60+
.collect::<Vec<_>>()
61+
});
62+
63+
let predicate = convert_filters_to_predicate(&filters);
64+
65+
let mut builder = table.scan();
66+
builder = match col_names {
67+
Some(names) => builder.select(names),
68+
None => builder.select_all(),
69+
};
70+
if let Some(pred) = predicate {
71+
builder = builder.with_filter(pred);
72+
}
73+
74+
let tasks: Vec<FileScanTask> = builder
75+
.build()
76+
.map_err(to_datafusion_error)?
77+
.plan_files()
78+
.await
79+
.map_err(to_datafusion_error)?
80+
.try_collect()
81+
.await
82+
.map_err(to_datafusion_error)?;
83+
84+
let output_schema = match &projection {
85+
None => self.schema.clone(),
86+
Some(indices) => Arc::new(self.schema.project(indices).map_err(|e| {
87+
DataFusionError::Internal(format!("schema projection failed: {e}"))
88+
})?),
89+
};
90+
91+
let file_io = table.file_io().clone();
92+
93+
Ok(IcebergPartitionedScan::new(
94+
tasks,
95+
file_io,
96+
output_schema,
97+
limit,
98+
))
99+
}
100+
}
101+
102+
#[async_trait]
103+
impl TableProvider for IcebergPartitionedTableProvider {
104+
fn as_any(&self) -> &dyn Any {
105+
self
106+
}
107+
108+
fn schema(&self) -> ArrowSchemaRef {
109+
self.schema.clone()
110+
}
111+
112+
fn table_type(&self) -> TableType {
113+
TableType::Base
114+
}
115+
116+
async fn scan(
117+
&self,
118+
_state: &dyn Session,
119+
projection: Option<&Vec<usize>>,
120+
filters: &[Expr],
121+
limit: Option<usize>,
122+
) -> DFResult<Arc<dyn ExecutionPlan>> {
123+
let scan = self
124+
.scan_without_session(projection.cloned(), filters.to_vec(), limit)
125+
.await?;
126+
Ok(Arc::new(scan))
127+
}
128+
129+
fn supports_filters_pushdown(
130+
&self,
131+
filters: &[&Expr],
132+
) -> DFResult<Vec<TableProviderFilterPushDown>> {
133+
Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()])
134+
}
135+
136+
async fn insert_into(
137+
&self,
138+
_state: &dyn Session,
139+
_input: Arc<dyn ExecutionPlan>,
140+
_insert_op: datafusion::logical_expr::dml::InsertOp,
141+
) -> DFResult<Arc<dyn ExecutionPlan>> {
142+
Err(DataFusionError::NotImplemented(
143+
"IcebergPartitionedTableProvider does not support writes; \
144+
use IcebergTableProvider instead"
145+
.to_string(),
146+
))
147+
}
148+
}

0 commit comments

Comments
 (0)