Skip to content

Commit ce3e726

Browse files
committed
address reviews
1 parent 4471eda commit ce3e726

1 file changed

Lines changed: 61 additions & 42 deletions

File tree

  • datafusion/physical-plan/src/aggregates

datafusion/physical-plan/src/aggregates/mod.rs

Lines changed: 61 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1049,6 +1049,17 @@ impl AggregateExec {
10491049
/// output_rows = min(output_rows, limit) // if TopK active
10501050
/// ```
10511051
///
1052+
/// **Example 1 — single group key:**
1053+
/// `GROUP BY city` where input_rows = 10,000, NDV(city) = 200
1054+
/// → output_rows = min(10_000, 200) = 200
1055+
///
1056+
/// **Example 2 — two group keys with TopK:**
1057+
/// `GROUP BY city, category` where input_rows = 10,000, NDV(city) = 200,
1058+
/// NDV(category) = 5, limit = 100
1059+
/// → ndv = 200 × 5 = 1,000
1060+
/// → output_rows = min(10_000, 1_000) = 1,000
1061+
/// → output_rows = min(1_000, 100) = 100
1062+
///
10521063
/// When `input_rows` is absent but NDV is available, falls back to:
10531064
///
10541065
/// ```text
@@ -1103,47 +1114,7 @@ impl AggregateExec {
11031114
})
11041115
}
11051116
_ => {
1106-
// When the input row count is 1, we can adopt that statistic keeping its reliability.
1107-
// When it is larger than 1, we degrade the precision since it may decrease after aggregation.
1108-
let num_rows = if let Some(value) = child_statistics.num_rows.get_value()
1109-
{
1110-
if *value > 1 {
1111-
let mut num_rows = child_statistics.num_rows.to_inexact();
1112-
1113-
if !self.group_by.expr.is_empty()
1114-
&& let Some(ndv) = self.compute_group_ndv(child_statistics)
1115-
{
1116-
num_rows = num_rows.map(|n| n.min(ndv));
1117-
}
1118-
1119-
// If TopK mode is active, cap output rows by the limit
1120-
if let Some(limit_opts) = &self.limit_options {
1121-
num_rows = num_rows.map(|n| n.min(limit_opts.limit));
1122-
}
1123-
1124-
num_rows
1125-
} else if *value == 0 {
1126-
child_statistics.num_rows
1127-
} else {
1128-
// num_rows = 1 case
1129-
let grouping_set_num = self.group_by.groups.len();
1130-
child_statistics.num_rows.map(|x| x * grouping_set_num)
1131-
}
1132-
} else {
1133-
let ndv = if !self.group_by.expr.is_empty() {
1134-
self.compute_group_ndv(child_statistics)
1135-
} else {
1136-
None
1137-
};
1138-
match (ndv, &self.limit_options) {
1139-
(Some(n), Some(limit_opts)) => {
1140-
Precision::Inexact(n.min(limit_opts.limit))
1141-
}
1142-
(Some(n), None) => Precision::Inexact(n),
1143-
(None, Some(limit_opts)) => Precision::Inexact(limit_opts.limit),
1144-
(None, None) => Precision::Absent,
1145-
}
1146-
};
1117+
let num_rows = self.estimate_num_rows(child_statistics);
11471118

11481119
let total_byte_size = num_rows
11491120
.get_value()
@@ -1163,11 +1134,59 @@ impl AggregateExec {
11631134
}
11641135
}
11651136

1137+
/// Estimates the output row count for grouped aggregations, combining NDV,
1138+
/// input row count, and TopK limit into a single [`Precision<usize>`].
1139+
fn estimate_num_rows(&self, child_statistics: &Statistics) -> Precision<usize> {
1140+
let ndv = if !self.group_by.expr.is_empty() {
1141+
self.compute_group_ndv(child_statistics)
1142+
} else {
1143+
None
1144+
};
1145+
let limit = self.limit_options.as_ref().map(|lo| lo.limit);
1146+
1147+
if let Some(&value) = child_statistics.num_rows.get_value() {
1148+
if value > 1 {
1149+
let mut num_rows = child_statistics.num_rows.to_inexact();
1150+
if let Some(ndv) = ndv {
1151+
num_rows = num_rows.map(|n| n.min(ndv));
1152+
}
1153+
if let Some(limit) = limit {
1154+
num_rows = num_rows.map(|n| n.min(limit));
1155+
}
1156+
num_rows
1157+
} else if value == 0 {
1158+
child_statistics.num_rows
1159+
} else {
1160+
let grouping_set_num = self.group_by.groups.len();
1161+
child_statistics.num_rows.map(|x| x * grouping_set_num)
1162+
}
1163+
} else {
1164+
match (ndv, limit) {
1165+
(Some(n), Some(l)) => Precision::Inexact(n.min(l)),
1166+
(Some(n), None) => Precision::Inexact(n),
1167+
(None, Some(l)) => Precision::Inexact(l),
1168+
(None, None) => Precision::Absent,
1169+
}
1170+
}
1171+
}
1172+
11661173
/// Computes the estimated number of distinct groups across all grouping sets.
11671174
/// For each grouping set, computes `product(NDV_i + null_adj_i)` for active columns,
11681175
/// then sums across all sets. Returns `None` if any active column is not a direct
1169-
/// column reference or lacks `distinct_count` stats.
1176+
/// column reference or lacks `distinct_count` stats. Non-column expressions
1177+
/// (e.g. `abs(a)`) are not yet supported because expression-level statistics
1178+
/// propagation is still in progress (see <https://github.com/apache/datafusion/pull/21122>).
11701179
/// When `null_count` is absent or unknown, null_adjustment defaults to 0.
1180+
///
1181+
/// **Single key:** `GROUP BY a` where NDV(a) = 100, null_count(a) = 5
1182+
/// → product = max(100 + 1, 1) = 101, total = 101
1183+
///
1184+
/// **Two keys:** `GROUP BY a, b` where NDV(a) = 100, NDV(b) = 50, no nulls
1185+
/// → product = 100 × 50 = 5,000, total = 5,000
1186+
///
1187+
/// **Grouping sets:** `GROUPING SETS ((a), (b), (a, b))` with NDV(a) = 100, NDV(b) = 50
1188+
/// → set(a) = 100, set(b) = 50, set(a, b) = 100 × 50 = 5,000
1189+
/// → total = 100 + 50 + 5,000 = 5,150
11711190
fn compute_group_ndv(&self, child_statistics: &Statistics) -> Option<usize> {
11721191
let mut total: usize = 0;
11731192
for group_mask in &self.group_by.groups {

0 commit comments

Comments
 (0)