Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 147 additions & 1 deletion datafusion/core/tests/physical_optimizer/aggregate_statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,23 @@ use arrow::array::Int32Array;
use arrow::array::{Int64Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::memory::MemTable;
use datafusion::datasource::memory::MemorySourceConfig;
use datafusion::datasource::physical_plan::ParquetSource;
use datafusion::datasource::source::DataSourceExec;
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_common::Result;
use datafusion_common::assert_batches_eq;
use datafusion_common::cast::as_int64_array;
use datafusion_common::config::ConfigOptions;
use datafusion_common::stats::Precision;
use datafusion_common::{ColumnStatistics, Result, Statistics};
use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
use datafusion_execution::TaskContext;
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_expr::Operator;
use datafusion_functions_aggregate::count::count_udaf;
use datafusion_physical_expr::aggregate::AggregateExprBuilder;
use datafusion_physical_expr::expressions::{self, cast};
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_optimizer::aggregate_statistics::AggregateStatistics;
Expand Down Expand Up @@ -402,3 +409,142 @@ async fn utf8_grouping_min_max_limit_fallbacks() -> Result<()> {

Ok(())
}

fn mock_data_with_distinct_count(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we able to combine these two functions

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did not exactly understand but changed it to one table driven test

distinct_count: Precision<usize>,
) -> Arc<dyn ExecutionPlan> {
let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, true),
Field::new("b", DataType::Int32, true),
]));

let statistics = Statistics {
num_rows: Precision::Exact(100),
total_byte_size: Precision::Absent,
column_statistics: vec![
ColumnStatistics {
distinct_count,
null_count: Precision::Exact(10),
..Default::default()
},
ColumnStatistics::default(),
],
};

let config = FileScanConfigBuilder::new(
ObjectStoreUrl::parse("test:///").unwrap(),
Arc::new(ParquetSource::new(Arc::clone(&schema))),
)
.with_file(PartitionedFile::new("x".to_string(), 100))
.with_statistics(statistics)
.build();

DataSourceExec::from_data_source(config)
}

fn optimize_count_distinct(
distinct_count: Precision<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
let source = mock_data_with_distinct_count(distinct_count);
let schema = source.schema();

let count_distinct_expr =
AggregateExprBuilder::new(count_udaf(), vec![expressions::col("a", &schema)?])
.schema(Arc::clone(&schema))
.alias("COUNT(DISTINCT a)")
.distinct()
.build()?;

let partial_agg = AggregateExec::try_new(
AggregateMode::Partial,
PhysicalGroupBy::default(),
vec![Arc::new(count_distinct_expr.clone())],
vec![None],
source,
Arc::clone(&schema),
)?;

let final_agg = AggregateExec::try_new(
AggregateMode::Final,
PhysicalGroupBy::default(),
vec![Arc::new(count_distinct_expr)],
vec![None],
Arc::new(partial_agg),
Arc::clone(&schema),
)?;

let conf = ConfigOptions::new();
AggregateStatistics::new().optimize(Arc::new(final_agg), &conf)
}

#[tokio::test]
async fn test_count_distinct_with_exact_statistics() -> Result<()> {
let optimized = optimize_count_distinct(Precision::Exact(42))?;

assert!(optimized.as_any().is::<ProjectionExec>());

let task_ctx = Arc::new(TaskContext::default());
let result = common::collect(optimized.execute(0, task_ctx)?).await?;
assert_eq!(result.len(), 1);
assert_eq!(as_int64_array(result[0].column(0)).unwrap().values(), &[42]);

Ok(())
}

#[tokio::test]
async fn test_count_distinct_with_absent_statistics() -> Result<()> {
let optimized = optimize_count_distinct(Precision::Absent)?;
assert!(optimized.as_any().is::<AggregateExec>());
Ok(())
}

#[tokio::test]
async fn test_count_distinct_with_inexact_statistics() -> Result<()> {
let optimized = optimize_count_distinct(Precision::Inexact(42))?;
assert!(optimized.as_any().is::<AggregateExec>());
Ok(())
}

#[tokio::test]
async fn test_count_distinct_with_non_column_expr() -> Result<()> {
let source = mock_data_with_distinct_count(Precision::Exact(42));
let schema = source.schema();

let expr = expressions::binary(
expressions::col("a", &schema)?,
Operator::Plus,
expressions::col("b", &schema)?,
&schema,
)?;

let count_distinct_expr = AggregateExprBuilder::new(count_udaf(), vec![expr])
.schema(Arc::clone(&schema))
.alias("COUNT(DISTINCT a + b)")
.distinct()
.build()?;

let partial_agg = AggregateExec::try_new(
AggregateMode::Partial,
PhysicalGroupBy::default(),
vec![Arc::new(count_distinct_expr.clone())],
vec![None],
source,
Arc::clone(&schema),
)?;

let final_agg = AggregateExec::try_new(
AggregateMode::Final,
PhysicalGroupBy::default(),
vec![Arc::new(count_distinct_expr)],
vec![None],
Arc::new(partial_agg),
Arc::clone(&schema),
)?;

let conf = ConfigOptions::new();
let optimized = AggregateStatistics::new().optimize(Arc::new(final_agg), &conf)?;

assert!(optimized.as_any().is::<AggregateExec>());

Ok(())
}
46 changes: 26 additions & 20 deletions datafusion/functions-aggregate/src/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,31 +365,37 @@ impl AggregateUDFImpl for Count {
}

fn value_from_stats(&self, statistics_args: &StatisticsArgs) -> Option<ScalarValue> {
let [expr] = statistics_args.exprs else {
return None;
};
let col_stats = &statistics_args.statistics.column_statistics;

if statistics_args.is_distinct {
// Only column references can be resolved from statistics;
// expressions like casts or literals are not supported.
let col_expr = expr.as_any().downcast_ref::<expressions::Column>()?;
if let Precision::Exact(dc) = col_stats[col_expr.index()].distinct_count {
return Some(ScalarValue::Int64(Some(dc as i64)));
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This assumes NULL is not counted in distinct_count column. AFAIS from the repository it is not counted so this should be ok but wanted to put a comment here

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Distinct count is usize, we should deal with the case where it could overflow

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since other paths were casting as well I did not do it actually but let me do it for all paths

}
return None;
}
if let Precision::Exact(num_rows) = statistics_args.statistics.num_rows
&& statistics_args.exprs.len() == 1
{
// TODO optimize with exprs other than Column
if let Some(col_expr) = statistics_args.exprs[0]
.as_any()
.downcast_ref::<expressions::Column>()
{
let current_val = &statistics_args.statistics.column_statistics
[col_expr.index()]
.null_count;
if let &Precision::Exact(val) = current_val {
return Some(ScalarValue::Int64(Some((num_rows - val) as i64)));
}
} else if let Some(lit_expr) = statistics_args.exprs[0]
.as_any()
.downcast_ref::<expressions::Literal>()
&& lit_expr.value() == &COUNT_STAR_EXPANSION
{
return Some(ScalarValue::Int64(Some(num_rows as i64)));

let Precision::Exact(num_rows) = statistics_args.statistics.num_rows else {
return None;
};

// TODO optimize with exprs other than Column
if let Some(col_expr) = expr.as_any().downcast_ref::<expressions::Column>() {
if let Precision::Exact(val) = col_stats[col_expr.index()].null_count {
return Some(ScalarValue::Int64(Some((num_rows - val) as i64)));
}
} else if let Some(lit_expr) =
expr.as_any().downcast_ref::<expressions::Literal>()
&& lit_expr.value() == &COUNT_STAR_EXPANSION
{
return Some(ScalarValue::Int64(Some(num_rows as i64)));
}

None
}

Expand Down