@@ -412,7 +412,7 @@ async fn utf8_grouping_min_max_limit_fallbacks() -> Result<()> {
412412
413413fn mock_data_with_distinct_count (
414414 distinct_count : Precision < usize > ,
415- ) -> Arc < DataSourceExec > {
415+ ) -> Arc < dyn ExecutionPlan > {
416416 let schema = Arc :: new ( Schema :: new ( vec ! [
417417 Field :: new( "a" , DataType :: Int32 , true ) ,
418418 Field :: new( "b" , DataType :: Int32 , true ) ,
@@ -442,9 +442,10 @@ fn mock_data_with_distinct_count(
442442 DataSourceExec :: from_data_source ( config)
443443}
444444
445- #[ tokio:: test]
446- async fn test_count_distinct_with_exact_statistics ( ) -> Result < ( ) > {
447- let source = mock_data_with_distinct_count ( Precision :: Exact ( 42 ) ) ;
445+ fn optimize_count_distinct (
446+ distinct_count : Precision < usize > ,
447+ ) -> Result < Arc < dyn ExecutionPlan > > {
448+ let source = mock_data_with_distinct_count ( distinct_count) ;
448449 let schema = source. schema ( ) ;
449450
450451 let count_distinct_expr =
@@ -473,7 +474,12 @@ async fn test_count_distinct_with_exact_statistics() -> Result<()> {
473474 ) ?;
474475
475476 let conf = ConfigOptions :: new ( ) ;
476- let optimized = AggregateStatistics :: new ( ) . optimize ( Arc :: new ( final_agg) , & conf) ?;
477+ AggregateStatistics :: new ( ) . optimize ( Arc :: new ( final_agg) , & conf)
478+ }
479+
480+ #[ tokio:: test]
481+ async fn test_count_distinct_with_exact_statistics ( ) -> Result < ( ) > {
482+ let optimized = optimize_count_distinct ( Precision :: Exact ( 42 ) ) ?;
477483
478484 assert ! ( optimized. as_any( ) . is:: <ProjectionExec >( ) ) ;
479485
@@ -487,53 +493,35 @@ async fn test_count_distinct_with_exact_statistics() -> Result<()> {
487493
488494#[ tokio:: test]
489495async fn test_count_distinct_with_absent_statistics ( ) -> Result < ( ) > {
490- let source = mock_data_with_distinct_count ( Precision :: Absent ) ;
491- let schema = source. schema ( ) ;
492-
493- let count_distinct_expr =
494- AggregateExprBuilder :: new ( count_udaf ( ) , vec ! [ expressions:: col( "a" , & schema) ?] )
495- . schema ( Arc :: clone ( & schema) )
496- . alias ( "COUNT(DISTINCT a)" )
497- . distinct ( )
498- . build ( ) ?;
499-
500- let partial_agg = AggregateExec :: try_new (
501- AggregateMode :: Partial ,
502- PhysicalGroupBy :: default ( ) ,
503- vec ! [ Arc :: new( count_distinct_expr. clone( ) ) ] ,
504- vec ! [ None ] ,
505- source,
506- Arc :: clone ( & schema) ,
507- ) ?;
508-
509- let final_agg = AggregateExec :: try_new (
510- AggregateMode :: Final ,
511- PhysicalGroupBy :: default ( ) ,
512- vec ! [ Arc :: new( count_distinct_expr) ] ,
513- vec ! [ None ] ,
514- Arc :: new ( partial_agg) ,
515- Arc :: clone ( & schema) ,
516- ) ?;
517-
518- let conf = ConfigOptions :: new ( ) ;
519- let optimized = AggregateStatistics :: new ( ) . optimize ( Arc :: new ( final_agg) , & conf) ?;
520-
496+ let optimized = optimize_count_distinct ( Precision :: Absent ) ?;
521497 assert ! ( optimized. as_any( ) . is:: <AggregateExec >( ) ) ;
522-
523498 Ok ( ( ) )
524499}
525500
526501#[ tokio:: test]
527502async fn test_count_distinct_with_inexact_statistics ( ) -> Result < ( ) > {
528- let source = mock_data_with_distinct_count ( Precision :: Inexact ( 42 ) ) ;
503+ let optimized = optimize_count_distinct ( Precision :: Inexact ( 42 ) ) ?;
504+ assert ! ( optimized. as_any( ) . is:: <AggregateExec >( ) ) ;
505+ Ok ( ( ) )
506+ }
507+
508+ #[ tokio:: test]
509+ async fn test_count_distinct_with_non_column_expr ( ) -> Result < ( ) > {
510+ let source = mock_data_with_distinct_count ( Precision :: Exact ( 42 ) ) ;
529511 let schema = source. schema ( ) ;
530512
531- let count_distinct_expr =
532- AggregateExprBuilder :: new ( count_udaf ( ) , vec ! [ expressions:: col( "a" , & schema) ?] )
533- . schema ( Arc :: clone ( & schema) )
534- . alias ( "COUNT(DISTINCT a)" )
535- . distinct ( )
536- . build ( ) ?;
513+ let expr = expressions:: binary (
514+ expressions:: col ( "a" , & schema) ?,
515+ Operator :: Plus ,
516+ expressions:: col ( "b" , & schema) ?,
517+ & schema,
518+ ) ?;
519+
520+ let count_distinct_expr = AggregateExprBuilder :: new ( count_udaf ( ) , vec ! [ expr] )
521+ . schema ( Arc :: clone ( & schema) )
522+ . alias ( "COUNT(DISTINCT a + b)" )
523+ . distinct ( )
524+ . build ( ) ?;
537525
538526 let partial_agg = AggregateExec :: try_new (
539527 AggregateMode :: Partial ,
0 commit comments