Skip to content

Commit d25e1ad

Browse files
committed
Remove compute_statistics free function, let callers manage StatisticsArgs
Callers now create StatisticsArgs directly and call plan.statistics_with_args(). The cache is created in StatisticsArgs::new() and shared through compute_child_statistics calls.
1 parent 7eb0484 commit d25e1ad

29 files changed

Lines changed: 357 additions & 257 deletions

File tree

datafusion/core/src/datasource/file_format/csv.rs

Lines changed: 6 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -45,7 +45,8 @@ mod tests {
4545
use datafusion_datasource::file_format::FileFormat;
4646
use datafusion_datasource::write::BatchSerializer;
4747
use datafusion_expr::{col, lit};
48-
use datafusion_physical_plan::{ExecutionPlan, collect, compute_statistics};
48+
use datafusion_physical_plan::statistics::StatisticsArgs;
49+
use datafusion_physical_plan::{ExecutionPlan, collect};
4950

5051
use arrow::array::{
5152
Array, BooleanArray, Float64Array, Int32Array, RecordBatch, StringArray,
@@ -216,11 +217,13 @@ mod tests {
216217

217218
// test metadata
218219
assert_eq!(
219-
compute_statistics(exec.as_ref(), None)?.num_rows,
220+
exec.statistics_with_args(&StatisticsArgs::new(None))?
221+
.num_rows,
220222
Precision::Absent
221223
);
222224
assert_eq!(
223-
compute_statistics(exec.as_ref(), None)?.total_byte_size,
225+
exec.statistics_with_args(&StatisticsArgs::new(None))?
226+
.total_byte_size,
224227
Precision::Absent
225228
);
226229

datafusion/core/src/datasource/file_format/json.rs

Lines changed: 6 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -36,7 +36,8 @@ mod tests {
3636
BatchDeserializer, DecoderDeserializer, DeserializerOutput,
3737
};
3838
use datafusion_datasource::file_format::FileFormat;
39-
use datafusion_physical_plan::{ExecutionPlan, collect, compute_statistics};
39+
use datafusion_physical_plan::statistics::StatisticsArgs;
40+
use datafusion_physical_plan::{ExecutionPlan, collect};
4041

4142
use arrow::compute::concat_batches;
4243
use arrow::datatypes::{DataType, Field};
@@ -118,11 +119,13 @@ mod tests {
118119

119120
// test metadata
120121
assert_eq!(
121-
compute_statistics(exec.as_ref(), None)?.num_rows,
122+
exec.statistics_with_args(&StatisticsArgs::new(None))?
123+
.num_rows,
122124
Precision::Absent
123125
);
124126
assert_eq!(
125-
compute_statistics(exec.as_ref(), None)?.total_byte_size,
127+
exec.statistics_with_args(&StatisticsArgs::new(None))?
128+
.total_byte_size,
126129
Precision::Absent
127130
);
128131

datafusion/core/src/datasource/file_format/parquet.rs

Lines changed: 10 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -141,8 +141,9 @@ mod tests {
141141
use datafusion_execution::object_store::ObjectStoreUrl;
142142
use datafusion_execution::runtime_env::RuntimeEnv;
143143
use datafusion_expr::dml::InsertOp;
144+
use datafusion_physical_plan::statistics::StatisticsArgs;
144145
use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
145-
use datafusion_physical_plan::{ExecutionPlan, collect, compute_statistics};
146+
use datafusion_physical_plan::{ExecutionPlan, collect};
146147

147148
use crate::test_util::bounded_stream;
148149
use arrow::array::{
@@ -715,12 +716,14 @@ mod tests {
715716

716717
// test metadata
717718
assert_eq!(
718-
compute_statistics(exec.as_ref(), None)?.num_rows,
719+
exec.statistics_with_args(&StatisticsArgs::new(None))?
720+
.num_rows,
719721
Precision::Exact(8)
720722
);
721723
// TODO correct byte size: https://github.com/apache/datafusion/issues/14936
722724
assert_eq!(
723-
compute_statistics(exec.as_ref(), None)?.total_byte_size,
725+
exec.statistics_with_args(&StatisticsArgs::new(None))?
726+
.total_byte_size,
724727
Precision::Absent,
725728
);
726729

@@ -764,11 +767,13 @@ mod tests {
764767

765768
// note: even if the limit is set, the executor rounds up to the batch size
766769
assert_eq!(
767-
compute_statistics(exec.as_ref(), None)?.num_rows,
770+
exec.statistics_with_args(&StatisticsArgs::new(None))?
771+
.num_rows,
768772
Precision::Exact(8)
769773
);
770774
assert_eq!(
771-
compute_statistics(exec.as_ref(), None)?.total_byte_size,
775+
exec.statistics_with_args(&StatisticsArgs::new(None))?
776+
.total_byte_size,
772777
Precision::Absent,
773778
);
774779
let batches = collect(exec, task_ctx).await?;

datafusion/core/src/datasource/listing/table.rs

Lines changed: 24 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -144,9 +144,8 @@ mod tests {
144144
use datafusion_physical_expr::expressions::binary;
145145
use datafusion_physical_expr_common::sort_expr::LexOrdering;
146146
use datafusion_physical_plan::empty::EmptyExec;
147-
use datafusion_physical_plan::{
148-
ExecutionPlanProperties, collect, compute_statistics,
149-
};
147+
use datafusion_physical_plan::statistics::StatisticsArgs;
148+
use datafusion_physical_plan::{ExecutionPlanProperties, collect};
150149
use std::collections::HashMap;
151150
use std::io::Write;
152151
use std::sync::Arc;
@@ -249,11 +248,13 @@ mod tests {
249248

250249
// test metadata
251250
assert_eq!(
252-
compute_statistics(exec.as_ref(), None)?.num_rows,
251+
exec.statistics_with_args(&StatisticsArgs::new(None))?
252+
.num_rows,
253253
Precision::Exact(8)
254254
);
255255
assert_eq!(
256-
compute_statistics(exec.as_ref(), None)?.total_byte_size,
256+
exec.statistics_with_args(&StatisticsArgs::new(None))?
257+
.total_byte_size,
257258
Precision::Absent,
258259
);
259260

@@ -1357,13 +1358,17 @@ mod tests {
13571358

13581359
let exec_default = table_default.scan(&state, None, &[], None).await?;
13591360
assert_eq!(
1360-
compute_statistics(exec_default.as_ref(), None)?.num_rows,
1361+
exec_default
1362+
.statistics_with_args(&StatisticsArgs::new(None))?
1363+
.num_rows,
13611364
Precision::Absent
13621365
);
13631366

13641367
// TODO correct byte size: https://github.com/apache/datafusion/issues/14936
13651368
assert_eq!(
1366-
compute_statistics(exec_default.as_ref(), None)?.total_byte_size,
1369+
exec_default
1370+
.statistics_with_args(&StatisticsArgs::new(None))?
1371+
.total_byte_size,
13671372
Precision::Absent
13681373
);
13691374

@@ -1378,11 +1383,15 @@ mod tests {
13781383

13791384
let exec_disabled = table_disabled.scan(&state, None, &[], None).await?;
13801385
assert_eq!(
1381-
compute_statistics(exec_disabled.as_ref(), None)?.num_rows,
1386+
exec_disabled
1387+
.statistics_with_args(&StatisticsArgs::new(None))?
1388+
.num_rows,
13821389
Precision::Absent
13831390
);
13841391
assert_eq!(
1385-
compute_statistics(exec_disabled.as_ref(), None)?.total_byte_size,
1392+
exec_disabled
1393+
.statistics_with_args(&StatisticsArgs::new(None))?
1394+
.total_byte_size,
13861395
Precision::Absent
13871396
);
13881397

@@ -1397,12 +1406,16 @@ mod tests {
13971406

13981407
let exec_enabled = table_enabled.scan(&state, None, &[], None).await?;
13991408
assert_eq!(
1400-
compute_statistics(exec_enabled.as_ref(), None)?.num_rows,
1409+
exec_enabled
1410+
.statistics_with_args(&StatisticsArgs::new(None))?
1411+
.num_rows,
14011412
Precision::Exact(8)
14021413
);
14031414
// TODO correct byte size: https://github.com/apache/datafusion/issues/14936
14041415
assert_eq!(
1405-
compute_statistics(exec_enabled.as_ref(), None)?.total_byte_size,
1416+
exec_enabled
1417+
.statistics_with_args(&StatisticsArgs::new(None))?
1418+
.total_byte_size,
14061419
Precision::Absent,
14071420
);
14081421

datafusion/core/tests/custom_sources_cases/statistics.rs

Lines changed: 11 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -36,7 +36,6 @@ use datafusion_catalog::Session;
3636
use datafusion_common::{project_schema, stats::Precision};
3737
use datafusion_physical_expr::EquivalenceProperties;
3838
use datafusion_physical_plan::StatisticsArgs;
39-
use datafusion_physical_plan::compute_statistics;
4039
use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
4140

4241
use async_trait::async_trait;
@@ -232,7 +231,10 @@ async fn sql_basic() -> Result<()> {
232231
let physical_plan = df.create_physical_plan().await.unwrap();
233232

234233
// the statistics should be those of the source
235-
assert_eq!(stats, *compute_statistics(physical_plan.as_ref(), None)?);
234+
assert_eq!(
235+
stats,
236+
*physical_plan.statistics_with_args(&StatisticsArgs::new(None))?
237+
);
236238

237239
Ok(())
238240
}
@@ -248,7 +250,7 @@ async fn sql_filter() -> Result<()> {
248250
.unwrap();
249251

250252
let physical_plan = df.create_physical_plan().await.unwrap();
251-
let stats = compute_statistics(physical_plan.as_ref(), None)?;
253+
let stats = physical_plan.statistics_with_args(&StatisticsArgs::new(None))?;
252254
assert_eq!(stats.num_rows, Precision::Inexact(7));
253255

254256
Ok(())
@@ -263,7 +265,7 @@ async fn sql_limit() -> Result<()> {
263265
let physical_plan = df.create_physical_plan().await.unwrap();
264266
// when the limit is smaller than the original number of lines we mark the statistics as inexact
265267
// and cap NDV at the new row count
266-
let limit_stats = compute_statistics(physical_plan.as_ref(), None)?;
268+
let limit_stats = physical_plan.statistics_with_args(&StatisticsArgs::new(None))?;
267269
assert_eq!(limit_stats.num_rows, Precision::Exact(5));
268270
// c1: NDV=2 stays at 2 (already below limit of 5)
269271
assert_eq!(
@@ -282,7 +284,10 @@ async fn sql_limit() -> Result<()> {
282284
.unwrap();
283285
let physical_plan = df.create_physical_plan().await.unwrap();
284286
// when the limit is larger than the original number of lines, statistics remain unchanged
285-
assert_eq!(stats, *compute_statistics(physical_plan.as_ref(), None)?);
287+
assert_eq!(
288+
stats,
289+
*physical_plan.statistics_with_args(&StatisticsArgs::new(None))?
290+
);
286291

287292
Ok(())
288293
}
@@ -299,7 +304,7 @@ async fn sql_window() -> Result<()> {
299304

300305
let physical_plan = df.create_physical_plan().await.unwrap();
301306

302-
let result = compute_statistics(physical_plan.as_ref(), None)?;
307+
let result = physical_plan.statistics_with_args(&StatisticsArgs::new(None))?;
303308

304309
assert_eq!(stats.num_rows, result.num_rows);
305310
let col_stats = &result.column_statistics;

datafusion/core/tests/parquet/file_statistics.rs

Lines changed: 24 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -43,8 +43,8 @@ use datafusion_common::config::ConfigOptions;
4343
use datafusion_physical_optimizer::PhysicalOptimizerRule;
4444
use datafusion_physical_optimizer::filter_pushdown::FilterPushdown;
4545
use datafusion_physical_plan::ExecutionPlan;
46-
use datafusion_physical_plan::compute_statistics;
4746
use datafusion_physical_plan::filter::FilterExec;
47+
use datafusion_physical_plan::statistics::StatisticsArgs;
4848
use tempfile::tempdir;
4949

5050
#[tokio::test]
@@ -64,7 +64,9 @@ async fn check_stats_precision_with_filter_pushdown() {
6464
// Scan without filter, stats are exact
6565
let exec = table.scan(&state, None, &[], None).await.unwrap();
6666
assert_eq!(
67-
compute_statistics(exec.as_ref(), None).unwrap().num_rows,
67+
exec.statistics_with_args(&StatisticsArgs::new(None))
68+
.unwrap()
69+
.num_rows,
6870
Precision::Exact(8),
6971
"Stats without filter should be exact"
7072
);
@@ -96,7 +98,8 @@ async fn check_stats_precision_with_filter_pushdown() {
9698
);
9799
// Scan with filter pushdown, stats are inexact
98100
assert_eq!(
99-
compute_statistics(optimized_exec.as_ref(), None)
101+
optimized_exec
102+
.statistics_with_args(&StatisticsArgs::new(None))
100103
.unwrap()
101104
.num_rows,
102105
Precision::Inexact(8),
@@ -126,11 +129,15 @@ async fn load_table_stats_with_session_level_cache() {
126129
let exec1 = table1.scan(&state1, None, &[], None).await.unwrap();
127130

128131
assert_eq!(
129-
compute_statistics(exec1.as_ref(), None).unwrap().num_rows,
132+
exec1
133+
.statistics_with_args(&StatisticsArgs::new(None))
134+
.unwrap()
135+
.num_rows,
130136
Precision::Exact(8)
131137
);
132138
assert_eq!(
133-
compute_statistics(exec1.as_ref(), None)
139+
exec1
140+
.statistics_with_args(&StatisticsArgs::new(None))
134141
.unwrap()
135142
.total_byte_size,
136143
// Byte size is absent because we cannot estimate the output size
@@ -144,11 +151,15 @@ async fn load_table_stats_with_session_level_cache() {
144151
assert_eq!(get_static_cache_size(&state2), 0);
145152
let exec2 = table2.scan(&state2, None, &[], None).await.unwrap();
146153
assert_eq!(
147-
compute_statistics(exec2.as_ref(), None).unwrap().num_rows,
154+
exec2
155+
.statistics_with_args(&StatisticsArgs::new(None))
156+
.unwrap()
157+
.num_rows,
148158
Precision::Exact(8)
149159
);
150160
assert_eq!(
151-
compute_statistics(exec2.as_ref(), None)
161+
exec2
162+
.statistics_with_args(&StatisticsArgs::new(None))
152163
.unwrap()
153164
.total_byte_size,
154165
// Absent because the data contains variable length columns
@@ -161,11 +172,15 @@ async fn load_table_stats_with_session_level_cache() {
161172
assert_eq!(get_static_cache_size(&state1), 1);
162173
let exec3 = table1.scan(&state1, None, &[], None).await.unwrap();
163174
assert_eq!(
164-
compute_statistics(exec3.as_ref(), None).unwrap().num_rows,
175+
exec3
176+
.statistics_with_args(&StatisticsArgs::new(None))
177+
.unwrap()
178+
.num_rows,
165179
Precision::Exact(8)
166180
);
167181
assert_eq!(
168-
compute_statistics(exec3.as_ref(), None)
182+
exec3
183+
.statistics_with_args(&StatisticsArgs::new(None))
169184
.unwrap()
170185
.total_byte_size,
171186
// Absent because the data contains variable length columns

0 commit comments

Comments
 (0)