Skip to content

Commit b72ca68

Browse files
committed
big insertion
1 parent bc469c3 commit b72ca68

2 files changed

Lines changed: 110 additions & 2 deletions

File tree

crates/integrations/datafusion/src/table/mod.rs

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,17 +24,22 @@ use std::sync::Arc;
2424
use async_trait::async_trait;
2525
use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef;
2626
use datafusion::catalog::Session;
27+
use datafusion::common::DataFusionError;
2728
use datafusion::datasource::{TableProvider, TableType};
2829
use datafusion::error::Result as DFResult;
30+
use datafusion::logical_expr::dml::InsertOp;
2931
use datafusion::logical_expr::{Expr, TableProviderFilterPushDown};
3032
use datafusion::physical_plan::ExecutionPlan;
33+
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
3134
use iceberg::arrow::schema_to_arrow_schema;
3235
use iceberg::inspect::MetadataTableType;
3336
use iceberg::table::Table;
3437
use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, TableIdent};
3538
use metadata_table::IcebergMetadataTableProvider;
3639

40+
use crate::physical_plan::commit::IcebergCommitExec;
3741
use crate::physical_plan::scan::IcebergTableScan;
42+
use crate::physical_plan::write::IcebergWriteExec;
3843

3944
/// Represents a [`TableProvider`] for the Iceberg [`Catalog`],
4045
/// managing access to a [`Table`].
@@ -46,6 +51,8 @@ pub struct IcebergTableProvider {
4651
snapshot_id: Option<i64>,
4752
/// A reference-counted arrow `Schema`.
4853
schema: ArrowSchemaRef,
54+
/// The catalog that the table belongs to.
55+
catalog: Option<Arc<dyn Catalog>>,
4956
}
5057

5158
impl IcebergTableProvider {
@@ -54,6 +61,7 @@ impl IcebergTableProvider {
5461
table,
5562
snapshot_id: None,
5663
schema,
64+
catalog: None,
5765
}
5866
}
5967
/// Asynchronously tries to construct a new [`IcebergTableProvider`]
@@ -73,6 +81,7 @@ impl IcebergTableProvider {
7381
table,
7482
snapshot_id: None,
7583
schema,
84+
catalog: Some(client),
7685
})
7786
}
7887

@@ -84,6 +93,7 @@ impl IcebergTableProvider {
8493
table,
8594
snapshot_id: None,
8695
schema,
96+
catalog: None,
8797
})
8898
}
8999

@@ -108,6 +118,7 @@ impl IcebergTableProvider {
108118
table,
109119
snapshot_id: Some(snapshot_id),
110120
schema,
121+
catalog: None,
111122
})
112123
}
113124

@@ -152,11 +163,52 @@ impl TableProvider for IcebergTableProvider {
152163
fn supports_filters_pushdown(
153164
&self,
154165
filters: &[&Expr],
155-
) -> std::result::Result<Vec<TableProviderFilterPushDown>, datafusion::error::DataFusionError>
156-
{
166+
) -> std::result::Result<Vec<TableProviderFilterPushDown>, DataFusionError> {
157167
// Push down all filters; as the single source of truth, the scanner will drop any filters that cannot be pushed down
158168
Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()])
159169
}
170+
171+
async fn insert_into(
172+
&self,
173+
_state: &dyn Session,
174+
input: Arc<dyn ExecutionPlan>,
175+
_insert_op: InsertOp,
176+
) -> DFResult<Arc<dyn ExecutionPlan>> {
177+
if !self
178+
.table
179+
.metadata()
180+
.default_partition_spec()
181+
.is_unpartitioned()
182+
{
183+
// TODO add insert into support for partitioned tables
184+
return Err(DataFusionError::NotImplemented(
185+
"IcebergTableProvider::insert_into does not support partitioned tables yet"
186+
.to_string(),
187+
));
188+
}
189+
190+
let Some(catalog) = self.catalog.clone() else {
191+
return Err(DataFusionError::Execution(
192+
"Catalog cannot be none for insert_into".to_string(),
193+
));
194+
};
195+
196+
let write_plan = Arc::new(IcebergWriteExec::new(
197+
self.table.clone(),
198+
input,
199+
self.schema.clone(),
200+
));
201+
202+
// Merge the outputs of write_plan into one so we can commit all files together
203+
let coalesce_partitions = Arc::new(CoalescePartitionsExec::new(write_plan));
204+
205+
Ok(Arc::new(IcebergCommitExec::new(
206+
self.table.clone(),
207+
catalog,
208+
coalesce_partitions,
209+
self.schema.clone(),
210+
)))
211+
}
160212
}
161213

162214
#[cfg(test)]

crates/integrations/datafusion/tests/integration_datafusion_test.rs

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use datafusion::arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
2626
use datafusion::execution::context::SessionContext;
2727
use datafusion::parquet::arrow::PARQUET_FIELD_ID_META_KEY;
2828
use expect_test::expect;
29+
use futures::StreamExt;
2930
use iceberg::io::FileIOBuilder;
3031
use iceberg::spec::{NestedField, PrimitiveType, Schema, StructType, Type};
3132
use iceberg::test_utils::check_record_batches;
@@ -432,3 +433,58 @@ async fn test_metadata_table() -> Result<()> {
432433

433434
Ok(())
434435
}
436+
437+
#[tokio::test]
438+
async fn test_insert_into() -> Result<()> {
439+
let iceberg_catalog = get_iceberg_catalog();
440+
let namespace = NamespaceIdent::new("test_provider_get_table_schema".to_string());
441+
set_test_namespace(&iceberg_catalog, &namespace).await?;
442+
443+
let creation = get_table_creation(temp_path(), "my_table", None)?;
444+
iceberg_catalog.create_table(&namespace, creation).await?;
445+
446+
let client = Arc::new(iceberg_catalog);
447+
let catalog = Arc::new(IcebergCatalogProvider::try_new(client).await?);
448+
449+
let ctx = SessionContext::new();
450+
ctx.register_catalog("catalog", catalog);
451+
452+
let provider = ctx.catalog("catalog").unwrap();
453+
let schema = provider.schema("test_provider_get_table_schema").unwrap();
454+
455+
let table = schema.table("my_table").await.unwrap().unwrap();
456+
let table_schema = table.schema();
457+
458+
let expected = [("foo1", &DataType::Int32), ("foo2", &DataType::Utf8)];
459+
460+
for (field, exp) in table_schema.fields().iter().zip(expected.iter()) {
461+
assert_eq!(field.name(), exp.0);
462+
assert_eq!(field.data_type(), exp.1);
463+
assert!(!field.is_nullable())
464+
}
465+
466+
let df = ctx
467+
.sql("insert into catalog.test_provider_get_table_schema.my_table values (1, 'alan'),(2, 'turing')")
468+
.await
469+
.unwrap();
470+
471+
let task_ctx = Arc::new(df.task_ctx());
472+
let plan = df.create_physical_plan().await.unwrap();
473+
let mut stream = plan.execute(0, task_ctx).unwrap();
474+
475+
while let Some(batch_result) = stream.next().await {
476+
match batch_result {
477+
Ok(batch) => {
478+
println!("Got RecordBatch with {} rows", batch.num_rows());
479+
for column in batch.columns() {
480+
println!("{:?}", column);
481+
}
482+
}
483+
Err(e) => {
484+
eprintln!("Error reading batch: {:?}", e);
485+
}
486+
}
487+
}
488+
489+
Ok(())
490+
}

0 commit comments

Comments
 (0)