Skip to content

Commit 4215bd3

Browse files
authored
fix(datafusion): return single row with count 0 for empty inserts (#2712)
Which issue does this PR close? ------------------------------- * Closes #2713. What changes are included in this PR? ------------------------------------- Fix empty inserts in the DataFusion integration. When an `INSERT` produces no data files (e.g. `INSERT INTO ... SELECT ... WHERE false`), `IcebergCommitExec` previously returned an empty `RecordBatch`. DataFusion expects a single-row count result for DML statements, so this PR changes the empty-data path to return a batch with count `0`. for empty inserts, the code returns early when there are no data files, before it ever starts a transaction, so it will not create snapshot. Are these changes tested? ------------------------- Yes. Added tests verifying that: * Empty inserts return a single-row `UInt64` count batch with value `0`. * Existing insert tests continue to pass. Local verification: * `cargo test -p iceberg-datafusion` * `cargo clippy -p iceberg-datafusion --all-targets -- -D warnings` * `cargo fmt --all -- --check` * `git diff --check`
1 parent 6c3dec8 commit 4215bd3

2 files changed

Lines changed: 109 additions & 3 deletions

File tree

crates/integrations/datafusion/src/physical_plan/commit.rs

Lines changed: 79 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,6 @@ impl ExecutionPlan for IcebergCommitExec {
182182

183183
let table = self.table.clone();
184184
let input_plan = self.input.clone();
185-
let count_schema = Arc::clone(&self.count_schema);
186185

187186
// todo revisit this
188187
let spec_id = self.table.metadata().default_partition_spec_id();
@@ -240,9 +239,9 @@ impl ExecutionPlan for IcebergCommitExec {
240239
data_files.extend(batch_files);
241240
}
242241

243-
// If no data files were collected, return an empty result
242+
// If no data files were collected, return a single row with count = 0
244243
if data_files.is_empty() {
245-
return Ok(RecordBatch::new_empty(count_schema));
244+
return Self::make_count_batch(0);
246245
}
247246

248247
// Create a transaction and commit the data files
@@ -530,6 +529,83 @@ mod tests {
530529
Ok(())
531530
}
532531

532+
#[tokio::test]
533+
async fn test_iceberg_commit_exec_empty_insert() -> Result<(), Box<dyn std::error::Error>> {
534+
let catalog = Arc::new(
535+
MemoryCatalogBuilder::default()
536+
.load(
537+
"memory",
538+
HashMap::from([(
539+
MEMORY_CATALOG_WAREHOUSE.to_string(),
540+
"memory://root".to_string(),
541+
)]),
542+
)
543+
.await
544+
.unwrap(),
545+
);
546+
547+
let namespace = NamespaceIdent::new("test_empty_insert".to_string());
548+
catalog.create_namespace(&namespace, HashMap::new()).await?;
549+
550+
let schema = Schema::builder()
551+
.with_schema_id(1)
552+
.with_fields(vec![
553+
NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
554+
])
555+
.build()?;
556+
557+
let table_creation = TableCreation::builder()
558+
.name("empty_insert_table".to_string())
559+
.schema(schema)
560+
.location("memory://root/empty_insert_table".to_string())
561+
.properties(HashMap::new())
562+
.build();
563+
564+
let table = catalog.create_table(&namespace, table_creation).await?;
565+
let snapshot_count_before = table.metadata().snapshots().len();
566+
567+
// Mock write plan produces no data files
568+
let input_exec = Arc::new(MockWriteExec::new(vec![]));
569+
let arrow_schema = Arc::new(ArrowSchema::new(vec![Field::new(
570+
DATA_FILES_COL_NAME,
571+
DataType::Utf8,
572+
false,
573+
)]));
574+
let commit_exec =
575+
IcebergCommitExec::new(table.clone(), catalog.clone(), input_exec, arrow_schema);
576+
577+
let task_ctx = Arc::new(TaskContext::default());
578+
let stream = commit_exec.execute(0, task_ctx)?;
579+
let batches = collect(stream).await?;
580+
581+
// Must return exactly one batch with one row and one column
582+
assert_eq!(batches.len(), 1);
583+
let batch = &batches[0];
584+
assert_eq!(batch.num_rows(), 1);
585+
assert_eq!(batch.num_columns(), 1);
586+
587+
// The count column must be UInt64 with value 0
588+
let count_array = batch.column(0);
589+
assert_eq!(count_array.data_type(), &DataType::UInt64);
590+
let count = count_array.as_any().downcast_ref::<UInt64Array>().unwrap();
591+
assert_eq!(count.value(0), 0);
592+
593+
// No new snapshot should be created for an empty insert
594+
let updated_table = catalog
595+
.load_table(
596+
&TableIdent::from_strs(["test_empty_insert", "empty_insert_table"]).unwrap(),
597+
)
598+
.await?;
599+
let snapshot_count_after = updated_table.metadata().snapshots().len();
600+
assert_eq!(
601+
snapshot_count_after, snapshot_count_before,
602+
"Empty insert must not create a new snapshot"
603+
);
604+
assert!(updated_table.metadata().current_snapshot().is_none());
605+
606+
Ok(())
607+
}
608+
533609
#[tokio::test]
534610
async fn test_datafusion_execution_partitioned_source() -> Result<(), Box<dyn std::error::Error>>
535611
{

crates/sqllogictest/testdata/slts/df_test/insert_into.slt

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,21 @@ query IT rowsort
2424
SELECT * FROM default.default.test_unpartitioned_table
2525
----
2626

27+
# Insert zero rows and verify it returns count 0 without creating a snapshot
28+
query I
29+
INSERT INTO default.default.test_unpartitioned_table SELECT * FROM (VALUES (1, 'Nobody')) AS t(id, name) WHERE false
30+
----
31+
0
32+
33+
query I
34+
SELECT COUNT(*) FROM default.default.test_unpartitioned_table
35+
----
36+
0
37+
38+
query ??????
39+
SELECT * FROM default.default.test_unpartitioned_table$snapshots
40+
----
41+
2742
# Insert a single row and verify the count
2843
query I
2944
INSERT INTO default.default.test_unpartitioned_table VALUES (1, 'Alice')
@@ -70,6 +85,21 @@ query ITT rowsort
7085
SELECT * FROM default.default.test_partitioned_table
7186
----
7287

88+
# Insert zero rows into a partitioned table and verify it returns count 0 without creating a snapshot
89+
query I
90+
INSERT INTO default.default.test_partitioned_table SELECT * FROM (VALUES (1, 'electronics', 'laptop')) AS t(id, category, value) WHERE false
91+
----
92+
0
93+
94+
query I
95+
SELECT COUNT(*) FROM default.default.test_partitioned_table
96+
----
97+
0
98+
99+
query ??????
100+
SELECT * FROM default.default.test_partitioned_table$snapshots
101+
----
102+
73103
# Insert single row into partitioned table
74104
query I
75105
INSERT INTO default.default.test_partitioned_table VALUES (1, 'electronics', 'laptop')

0 commit comments

Comments
 (0)