Skip to content

Commit deeb6b8

Browse files
author
Альберт Скальт
committed
feat(datafusion): add table provider builder
At the moment, the provider is only available through the Iceberg implementation of the DF catalog, which can be inconvenient when using a custom catalog implementation. This patch adds a public builder, allowing external users to create the provider.
1 parent 50b3740 commit deeb6b8

1 file changed

Lines changed: 52 additions & 11 deletions

File tree

  • crates/integrations/datafusion/src/table

crates/integrations/datafusion/src/table/mod.rs

Lines changed: 52 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,55 @@ use crate::physical_plan::scan::IcebergTableScan;
5757
use crate::physical_plan::sort::sort_by_partition;
5858
use crate::physical_plan::write::IcebergWriteExec;
5959

60+
/// Helps to build [`IcebergTableProvider`].
61+
pub struct IcebergTableProviderBuilder {
62+
catalog: Arc<dyn Catalog>,
63+
table_ident: TableIdent,
64+
schema: Option<ArrowSchemaRef>,
65+
}
66+
67+
impl IcebergTableProviderBuilder {
68+
/// Make a new [`IcebergTableProviderBuilder`].
69+
pub fn new(catalog: Arc<dyn Catalog>, table_ident: TableIdent) -> Self {
70+
Self {
71+
catalog,
72+
table_ident,
73+
schema: None,
74+
}
75+
}
76+
77+
/// Explicitly set a table schema if it is known.
78+
/// If the schema is not set explicitly, then the builder will automatically
79+
/// load the schema when [`IcebergTableProviderBuilder::build`] is called.
80+
pub fn with_schema(mut self, schema: Option<ArrowSchemaRef>) -> Self {
81+
self.schema = schema;
82+
self
83+
}
84+
85+
/// Build table provider.
86+
pub async fn build(self) -> Result<IcebergTableProvider> {
87+
let Self {
88+
catalog,
89+
table_ident,
90+
schema,
91+
} = self;
92+
93+
let schema = if let Some(schema) = schema {
94+
schema
95+
} else {
96+
// Load table once to get initial schema.
97+
let table = catalog.load_table(&table_ident).await?;
98+
Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?)
99+
};
100+
101+
Ok(IcebergTableProvider {
102+
catalog,
103+
table_ident,
104+
schema,
105+
})
106+
}
107+
}
108+
60109
/// Catalog-backed table provider with automatic metadata refresh.
61110
///
62111
/// This provider loads fresh table metadata from the catalog on every scan and write
@@ -85,17 +134,9 @@ impl IcebergTableProvider {
85134
namespace: NamespaceIdent,
86135
name: impl Into<String>,
87136
) -> Result<Self> {
88-
let table_ident = TableIdent::new(namespace, name.into());
89-
90-
// Load table once to get initial schema
91-
let table = catalog.load_table(&table_ident).await?;
92-
let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
93-
94-
Ok(IcebergTableProvider {
95-
catalog,
96-
table_ident,
97-
schema,
98-
})
137+
IcebergTableProviderBuilder::new(catalog, TableIdent::new(namespace, name.into()))
138+
.build()
139+
.await
99140
}
100141

101142
pub(crate) async fn metadata_table(

0 commit comments

Comments
 (0)