Skip to content

Commit aa9520e

Browse files
authored
feat: Add partition_stats() for EmptyExec (apache#20203)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes apache#123` indicates that this PR will close issue apache#123. --> - Part of apache#15873 . ## Rationale for this change Add partition stats for `EmptyExec`. <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> ## What changes are included in this PR? <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> ## Are these changes tested? Added integration test.
1 parent a544b8f commit aa9520e

File tree

2 files changed

+82
-10
lines changed

2 files changed

+82
-10
lines changed

datafusion/core/tests/physical_optimizer/partition_statistics.rs

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1294,4 +1294,64 @@ mod test {
12941294

12951295
Ok(())
12961296
}
1297+
1298+
#[tokio::test]
1299+
async fn test_statistics_by_partition_of_empty_exec() -> Result<()> {
1300+
let schema = Arc::new(Schema::new(vec![
1301+
Field::new("id", DataType::Int32, false),
1302+
Field::new("name", DataType::Utf8, true),
1303+
]));
1304+
1305+
// Try to test with single partition
1306+
let empty_single = Arc::new(EmptyExec::new(Arc::clone(&schema)));
1307+
1308+
let stats = empty_single.partition_statistics(Some(0))?;
1309+
assert_eq!(stats.num_rows, Precision::Exact(0));
1310+
assert_eq!(stats.total_byte_size, Precision::Exact(0));
1311+
assert_eq!(stats.column_statistics.len(), 2);
1312+
1313+
for col_stat in &stats.column_statistics {
1314+
assert_eq!(col_stat.null_count, Precision::Exact(0));
1315+
assert_eq!(col_stat.distinct_count, Precision::Exact(0));
1316+
assert_eq!(col_stat.byte_size, Precision::Exact(0));
1317+
assert_eq!(col_stat.min_value, Precision::<ScalarValue>::Absent);
1318+
assert_eq!(col_stat.max_value, Precision::<ScalarValue>::Absent);
1319+
assert_eq!(col_stat.sum_value, Precision::<ScalarValue>::Absent);
1320+
assert_eq!(col_stat.byte_size, Precision::Exact(0));
1321+
}
1322+
1323+
let overall_stats = empty_single.partition_statistics(None)?;
1324+
assert_eq!(stats, overall_stats);
1325+
1326+
validate_statistics_with_data(empty_single, vec![ExpectedStatistics::Empty], 0)
1327+
.await?;
1328+
1329+
// Test with multiple partitions
1330+
let empty_multi: Arc<dyn ExecutionPlan> =
1331+
Arc::new(EmptyExec::new(Arc::clone(&schema)).with_partitions(3));
1332+
1333+
let statistics = (0..empty_multi.output_partitioning().partition_count())
1334+
.map(|idx| empty_multi.partition_statistics(Some(idx)))
1335+
.collect::<Result<Vec<_>>>()?;
1336+
1337+
assert_eq!(statistics.len(), 3);
1338+
1339+
for stat in &statistics {
1340+
assert_eq!(stat.num_rows, Precision::Exact(0));
1341+
assert_eq!(stat.total_byte_size, Precision::Exact(0));
1342+
}
1343+
1344+
validate_statistics_with_data(
1345+
empty_multi,
1346+
vec![
1347+
ExpectedStatistics::Empty,
1348+
ExpectedStatistics::Empty,
1349+
ExpectedStatistics::Empty,
1350+
],
1351+
0,
1352+
)
1353+
.await?;
1354+
1355+
Ok(())
1356+
}
12971357
}

datafusion/physical-plan/src/empty.rs

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,16 @@ use std::any::Any;
2121
use std::sync::Arc;
2222

2323
use crate::memory::MemoryStream;
24-
use crate::{DisplayAs, PlanProperties, SendableRecordBatchStream, Statistics, common};
24+
use crate::{DisplayAs, PlanProperties, SendableRecordBatchStream, Statistics};
2525
use crate::{
2626
DisplayFormatType, ExecutionPlan, Partitioning,
2727
execution_plan::{Boundedness, EmissionType},
2828
};
2929

3030
use arrow::datatypes::SchemaRef;
3131
use arrow::record_batch::RecordBatch;
32-
use datafusion_common::{Result, assert_or_internal_err};
32+
use datafusion_common::stats::Precision;
33+
use datafusion_common::{ColumnStatistics, Result, ScalarValue, assert_or_internal_err};
3334
use datafusion_execution::TaskContext;
3435
use datafusion_physical_expr::EquivalenceProperties;
3536

@@ -169,20 +170,31 @@ impl ExecutionPlan for EmptyExec {
169170
);
170171
}
171172

172-
let batch = self
173-
.data()
174-
.expect("Create empty RecordBatch should not fail");
175-
Ok(common::compute_record_batch_statistics(
176-
&[batch],
177-
&self.schema,
178-
None,
179-
))
173+
// Build explicit stats: exact zero rows and bytes, with explicit known column stats
174+
let mut stats = Statistics::default()
175+
.with_num_rows(Precision::Exact(0))
176+
.with_total_byte_size(Precision::Exact(0));
177+
178+
// Add explicit column stats for each field in schema
179+
for _ in self.schema.fields() {
180+
stats = stats.add_column_statistics(ColumnStatistics {
181+
null_count: Precision::Exact(0),
182+
distinct_count: Precision::Exact(0),
183+
min_value: Precision::<ScalarValue>::Absent,
184+
max_value: Precision::<ScalarValue>::Absent,
185+
sum_value: Precision::<ScalarValue>::Absent,
186+
byte_size: Precision::Exact(0),
187+
});
188+
}
189+
190+
Ok(stats)
180191
}
181192
}
182193

183194
#[cfg(test)]
184195
mod tests {
185196
use super::*;
197+
use crate::common;
186198
use crate::test;
187199
use crate::with_new_children_if_necessary;
188200

0 commit comments

Comments
 (0)