Skip to content

Commit 0f093f4

Browse files
authored
Implement cardinality_effect for window execs and UnionExec (#20321)
## Which issue does this PR close?

- Closes #20291.

## Rationale for this change

`WindowAggExec` and `BoundedWindowAggExec` did not implement `cardinality_effect`, which left this property as `Unknown`.

Both operators preserve row cardinality:

- They evaluate window expressions per input row and append result columns.
- They do not filter out rows.
- They do not duplicate rows.

So their cardinality effect is `Equal`.

This PR also updates `UnionExec`, which combines rows from multiple children. Because its output has at least as many rows as any single child, its cardinality effect should be `GreaterEqual` instead of defaulting to `Unknown`.

## What changes are included in this PR?

- Implement `cardinality_effect` for `WindowAggExec` as `CardinalityEffect::Equal`.
- Implement `cardinality_effect` for `BoundedWindowAggExec` as `CardinalityEffect::Equal`.
- Implement `cardinality_effect` for `UnionExec` as `CardinalityEffect::GreaterEqual`.

## Are these changes tested?

Yes. Each of the three operators has a unit test asserting the value returned by `cardinality_effect`.

## Are there any user-facing changes?

No.

## Additional note

I used a coding agent for implementation/PR drafting and reviewed the changes myself. If this conflicts with project policy, please let me know.
1 parent b0349ff commit 0f093f4

File tree

3 files changed

+102
-3
lines changed

3 files changed

+102
-3
lines changed

datafusion/physical-plan/src/union.rs

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@ use super::{
3434
};
3535
use crate::check_if_same_properties;
3636
use crate::execution_plan::{
37-
InvariantLevel, boundedness_from_children, check_default_invariants,
38-
emission_type_from_children,
37+
CardinalityEffect, InvariantLevel, boundedness_from_children,
38+
check_default_invariants, emission_type_from_children,
3939
};
4040
use crate::filter::FilterExec;
4141
use crate::filter_pushdown::{
@@ -360,6 +360,12 @@ impl ExecutionPlan for UnionExec {
360360
}
361361
}
362362

363+
fn cardinality_effect(&self) -> CardinalityEffect {
364+
// Union combines rows from multiple inputs, so output rows are not tied
365+
// to any single input and can only be constrained as greater-or-equal.
366+
CardinalityEffect::GreaterEqual
367+
}
368+
363369
fn supports_limit_pushdown(&self) -> bool {
364370
true
365371
}
@@ -1210,4 +1216,25 @@ mod tests {
12101216
)
12111217
);
12121218
}
1219+
1220+
#[test]
1221+
fn test_union_cardinality_effect() -> Result<()> {
1222+
let schema = create_test_schema()?;
1223+
let input1: Arc<dyn ExecutionPlan> =
1224+
Arc::new(TestMemoryExec::try_new(&[], Arc::clone(&schema), None)?);
1225+
let input2: Arc<dyn ExecutionPlan> =
1226+
Arc::new(TestMemoryExec::try_new(&[], Arc::clone(&schema), None)?);
1227+
1228+
let union = UnionExec::try_new(vec![input1, input2])?;
1229+
let union = union
1230+
.as_any()
1231+
.downcast_ref::<UnionExec>()
1232+
.expect("expected UnionExec for multiple inputs");
1233+
1234+
assert!(matches!(
1235+
union.cardinality_effect(),
1236+
CardinalityEffect::GreaterEqual
1237+
));
1238+
Ok(())
1239+
}
12131240
}

datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ use datafusion_physical_expr_common::sort_expr::{
6666
OrderingRequirements, PhysicalSortExpr,
6767
};
6868

69+
use crate::execution_plan::CardinalityEffect;
6970
use ahash::RandomState;
7071
use futures::stream::Stream;
7172
use futures::{StreamExt, ready};
@@ -398,6 +399,10 @@ impl ExecutionPlan for BoundedWindowAggExec {
398399
let input_stat = self.input.partition_statistics(partition)?;
399400
self.statistics_helper(input_stat)
400401
}
402+
403+
fn cardinality_effect(&self) -> CardinalityEffect {
404+
CardinalityEffect::Equal
405+
}
401406
}
402407

403408
/// Trait that specifies how we search for (or calculate) partitions. It has two
@@ -1266,6 +1271,7 @@ mod tests {
12661271
use std::time::Duration;
12671272

12681273
use crate::common::collect;
1274+
use crate::execution_plan::CardinalityEffect;
12691275
use crate::expressions::PhysicalSortExpr;
12701276
use crate::projection::{ProjectionExec, ProjectionExpr};
12711277
use crate::streaming::{PartitionStream, StreamingTableExec};
@@ -1850,4 +1856,22 @@ mod tests {
18501856

18511857
Ok(())
18521858
}
1859+
1860+
#[test]
1861+
fn test_bounded_window_agg_cardinality_effect() -> Result<()> {
1862+
let schema = test_schema();
1863+
let input: Arc<dyn ExecutionPlan> =
1864+
Arc::new(TestMemoryExec::try_new(&[], Arc::clone(&schema), None)?);
1865+
let plan = bounded_window_exec_pb_latent_range(input, 1, "hash", "sn")?;
1866+
let plan = plan
1867+
.as_any()
1868+
.downcast_ref::<BoundedWindowAggExec>()
1869+
.expect("expected BoundedWindowAggExec");
1870+
1871+
assert!(matches!(
1872+
plan.cardinality_effect(),
1873+
CardinalityEffect::Equal
1874+
));
1875+
Ok(())
1876+
}
18531877
}

datafusion/physical-plan/src/windows/window_agg_exec.rs

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use std::sync::Arc;
2323
use std::task::{Context, Poll};
2424

2525
use super::utils::create_schema;
26-
use crate::execution_plan::EmissionType;
26+
use crate::execution_plan::{CardinalityEffect, EmissionType};
2727
use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
2828
use crate::windows::{
2929
calc_requirements, get_ordered_partition_by_indices, get_partition_by_sort_exprs,
@@ -315,6 +315,10 @@ impl ExecutionPlan for WindowAggExec {
315315
total_byte_size: Precision::Absent,
316316
})
317317
}
318+
319+
fn cardinality_effect(&self) -> CardinalityEffect {
320+
CardinalityEffect::Equal
321+
}
318322
}
319323

320324
/// Compute the window aggregate columns
@@ -464,3 +468,47 @@ impl RecordBatchStream for WindowAggStream {
464468
Arc::clone(&self.schema)
465469
}
466470
}
471+
472+
#[cfg(test)]
473+
mod tests {
474+
use super::*;
475+
use crate::test::TestMemoryExec;
476+
use crate::windows::create_window_expr;
477+
use arrow::datatypes::{DataType, Field, Schema};
478+
use datafusion_common::ScalarValue;
479+
use datafusion_expr::{
480+
WindowFrame, WindowFrameBound, WindowFrameUnits, WindowFunctionDefinition,
481+
};
482+
use datafusion_functions_aggregate::count::count_udaf;
483+
484+
#[test]
485+
fn test_window_agg_cardinality_effect() -> Result<()> {
486+
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));
487+
let input: Arc<dyn ExecutionPlan> =
488+
Arc::new(TestMemoryExec::try_new(&[], Arc::clone(&schema), None)?);
489+
let args = vec![crate::expressions::col("a", &schema)?];
490+
let window_expr = create_window_expr(
491+
&WindowFunctionDefinition::AggregateUDF(count_udaf()),
492+
"count(a)".to_string(),
493+
&args,
494+
&[],
495+
&[],
496+
Arc::new(WindowFrame::new_bounds(
497+
WindowFrameUnits::Rows,
498+
WindowFrameBound::Preceding(ScalarValue::UInt64(None)),
499+
WindowFrameBound::CurrentRow,
500+
)),
501+
Arc::clone(&schema),
502+
false,
503+
false,
504+
None,
505+
)?;
506+
507+
let window = WindowAggExec::try_new(vec![window_expr], input, true)?;
508+
assert!(matches!(
509+
window.cardinality_effect(),
510+
CardinalityEffect::Equal
511+
));
512+
Ok(())
513+
}
514+
}

0 commit comments

Comments
 (0)