Commit 7d9f6ea

hhhizzz, Qiwei Huang, and Rachelint authored
Avoid repeated EmitTo::First in partial hash aggregate output (apache#23250)
## Which issue does this PR close?

- Closes apache#23249.

## Rationale for this change

The migrated partial hash aggregate output path still used `EmitTo::First(batch_size)` when draining grouped aggregate state in batches.

For terminal output this is unnecessary and can be expensive: `EmitTo::First` is not just slicing the first N rows, it also shifts remaining group indexes and maintains `GroupValues` lookup state. For high-cardinality partial aggregate output, this can cause repeated work during output draining.

The final hash aggregate path already avoids this by materializing output once with `EmitTo::All` and then slicing the resulting `RecordBatch`. This PR applies the same approach to partial hash aggregate output.

## What changes are included in this PR?

- Remove the helper that selected `EmitTo::First(batch_size)` for hash aggregate terminal output.
- Change migrated partial hash aggregate output to:
  - materialize grouped keys and aggregate state once with `EmitTo::All`
  - slice the materialized `RecordBatch` into `batch_size` chunks across output polls
- Rename the shared materialized-output state/type to mode-neutral names because it is now used by both final and partial output paths.
- Add a regression test with a custom `GroupsAccumulator` that fails if partial terminal output calls `EmitTo::First(_)`.
- Strengthen the regression test to verify both batch slicing and emitted key/state values.

## Are these changes tested?

Yes.

Local targeted tests:

```bash
cargo test -p datafusion-physical-plan partial_grouped_aggregate_materializes_before_slicing -- --nocapture
cargo test -p datafusion-physical-plan materialized_aggregate_output_slices_batches_until_exhausted -- --nocapture
git diff --check
```

Additional local verification run during development:

```bash
cargo test -p datafusion-physical-plan materialized_final_output_slices_batches_until_exhausted -- --nocapture
cargo test -p datafusion-physical-plan partial_grouped_aggregate_uses_raw_partial_stream -- --nocapture
```

The new regression test was also applied to the pre-fix baseline and failed with the expected internal error when the partial output path used `EmitTo::First`.

Local benchmark evidence was collected against the implementation commit before the final test/naming polish commit.

ClickBench full 43-query run, 5 iterations, 24 cores, skip partial aggregation probe ratio `0.8`:

| mode | total warm time | geomean warm time |
| -- | -- | -- |
| baseline migrated aggregate | 128509.47 ms | 352.79 ms |
| patched migrated aggregate | 19652.37 ms | 180.65 ms |
| baseline old aggregate path | 19774.70 ms | 181.25 ms |

Largest patched/current wins included:

- q33: 32961.02ms -> 1642.08ms
- q34: 32739.34ms -> 1635.07ms
- q18: 25673.25ms -> 1767.25ms
- q16: 5949.82ms -> 810.17ms
- q17: 5906.51ms -> 807.10ms

TPC-DS SF10 full99, 10 rounds:

- Failures: 0
- Aggregate geomean current/main: 0.982817
- Aggregate current speedup: 1.748%

## Are there any user-facing changes?

No. This is an internal physical execution change for hash aggregate output draining. There are no public API or documented behavior changes.

---------

Co-authored-by: Qiwei Huang <qiwei.huang@jsessh.com>
Co-authored-by: kamille <kamille@apache.org>
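
The cost argument in the rationale can be illustrated with a toy model. This is only a sketch, not DataFusion code: `ToyGroups`, `drain_with_first`, and `drain_materialized` are hypothetical names, and a plain `Vec<i64>` stands in for grouped state. The `shifted` counter approximates the renumbering work done on the groups that remain after each first-N emission.

```rust
/// Toy stand-in for grouped aggregate state (hypothetical, not `GroupValues`).
pub struct ToyGroups {
    values: Vec<i64>,
    /// Number of remaining entries that had to be renumbered while draining.
    pub shifted: usize,
}

impl ToyGroups {
    pub fn new(values: Vec<i64>) -> Self {
        Self { values, shifted: 0 }
    }

    /// Models first-N emission: remove the first `n` groups, shift the rest down.
    pub fn emit_first(&mut self, n: usize) -> Vec<i64> {
        let n = n.min(self.values.len());
        let head: Vec<i64> = self.values.drain(..n).collect();
        // Every group still in the table gets a new index.
        self.shifted += self.values.len();
        head
    }

    /// Models emit-all: hand over everything at once, nothing left to renumber.
    pub fn emit_all(&mut self) -> Vec<i64> {
        std::mem::take(&mut self.values)
    }
}

/// Drain by repeatedly emitting the first `batch_size` groups.
pub fn drain_with_first(values: Vec<i64>, batch_size: usize) -> (Vec<Vec<i64>>, usize) {
    assert!(batch_size > 0);
    let mut groups = ToyGroups::new(values);
    let mut batches = Vec::new();
    while !groups.values.is_empty() {
        batches.push(groups.emit_first(batch_size));
    }
    (batches, groups.shifted)
}

/// Drain by materializing once, then slicing into `batch_size` chunks.
pub fn drain_materialized(values: Vec<i64>, batch_size: usize) -> (Vec<Vec<i64>>, usize) {
    assert!(batch_size > 0);
    let mut groups = ToyGroups::new(values);
    let all = groups.emit_all();
    let batches = all.chunks(batch_size).map(|c| c.to_vec()).collect();
    (batches, groups.shifted)
}
```

Both functions return the same batches. In this model the first-N variant touches a number of remaining entries that grows roughly quadratically with the group count for a fixed `batch_size`, while the materialized variant touches none.
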
1 parent 4dadbbd commit 7d9f6ea

4 files changed

Lines changed: 273 additions & 56 deletions


datafusion/physical-plan/src/aggregates/aggregate_hash_table/common.rs

Lines changed: 11 additions & 19 deletions
@@ -182,7 +182,7 @@ impl<AggrMode> AggregateHashTable<AggrMode> {
                 acc + state.group_values.size()
                     + state.batch_group_indices.allocated_size()
             }
-            AggregateHashTableState::OutputtingMaterializedFinal(output) => {
+            AggregateHashTableState::OutputtingMaterialized(output) => {
                 output.memory_size()
             }
             AggregateHashTableState::Done => 0,
@@ -214,15 +214,6 @@ impl<AggrMode> AggregateHashTable<AggrMode> {
     }
 }

-pub(super) fn emit_to_for_batch_size(batch_size: usize, group_count: usize) -> EmitTo {
-    debug_assert!(batch_size > 0);
-    if group_count <= batch_size {
-        EmitTo::All
-    } else {
-        EmitTo::First(batch_size)
-    }
-}
-
 /// State and argument information for a single Aggregate
 ///
 /// For example, for `SELECT COUNT(x), SUM(y WHERE z > 10) ...` there would be two
@@ -304,24 +295,25 @@ pub(super) enum AggregateHashTableState {
     Building(AggregateHashTableBuffer),
     /// Emitting results directly from group keys and aggregate state.
     Outputting(AggregateHashTableBuffer),
-    /// Materialize all the output results, and then incrementally output in the `OutputtingMaterializedFinal` state.
+    /// Materialize all the output results, and then incrementally output in the `OutputtingMaterialized` state.
     ///
     /// Note this is a temporary solution until the `GroupValues` issue is solved:
     /// Issue: <https://github.com/apache/datafusion/issues/23178>
-    OutputtingMaterializedFinal(MaterializedFinalOutput),
+    OutputtingMaterialized(MaterializedAggregateOutput),
     Done,
 }

-/// Fully evaluated final aggregate output and the next row offset to emit.
+/// Fully evaluated aggregate output and the next row offset to emit.
 ///
-/// Final aggregate evaluation consumes accumulator state, so final output is
-/// materialized once and then sliced to honor `batch_size` across output polls.
-pub(super) struct MaterializedFinalOutput {
+/// Final aggregate evaluation consumes accumulator state, and partial terminal
+/// output should not repeatedly renumber group values with `EmitTo::First`.
+/// Materialize once and then slice to honor `batch_size` across output polls.
+pub(super) struct MaterializedAggregateOutput {
     batch: RecordBatch,
     offset: usize,
 }

-impl MaterializedFinalOutput {
+impl MaterializedAggregateOutput {
     pub(super) fn new(batch: RecordBatch) -> Self {
         Self { batch, offset: 0 }
     }
@@ -496,7 +488,7 @@ mod tests {
     use super::*;

     #[test]
-    fn materialized_final_output_slices_batches_until_exhausted() -> Result<()> {
+    fn materialized_aggregate_output_slices_batches_until_exhausted() -> Result<()> {
         let schema = Arc::new(Schema::new(vec![Field::new(
             "group_col",
             DataType::Int32,
@@ -506,7 +498,7 @@ mod tests {
             schema,
             vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]))],
         )?;
-        let mut output = MaterializedFinalOutput::new(batch);
+        let mut output = MaterializedAggregateOutput::new(batch);

         assert_eq!(int32_values(&output.next_batch(2).unwrap(), 0), vec![1, 2]);
         assert_eq!(int32_values(&output.next_batch(2).unwrap(), 0), vec![3, 4]);
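
The diff shows the `batch` and `offset` fields of `MaterializedAggregateOutput` and calls to `next_batch` and `is_exhausted`, but not their bodies. The following is a minimal sketch of that cursor pattern under stated assumptions: `MaterializedRows` is a hypothetical name, a `Vec<i32>` replaces the `RecordBatch`, and the slice is copied here where the real type would presumably call `RecordBatch::slice`.

```rust
/// Hypothetical Vec-backed analogue of a materialized output cursor.
pub struct MaterializedRows {
    rows: Vec<i32>,
    /// Index of the next row to emit.
    offset: usize,
}

impl MaterializedRows {
    pub fn new(rows: Vec<i32>) -> Self {
        Self { rows, offset: 0 }
    }

    /// True once every materialized row has been emitted.
    pub fn is_exhausted(&self) -> bool {
        self.offset >= self.rows.len()
    }

    /// Returns up to `batch_size` rows starting at the current offset,
    /// or `None` when nothing is left (or `batch_size` is zero).
    pub fn next_batch(&mut self, batch_size: usize) -> Option<Vec<i32>> {
        if batch_size == 0 || self.is_exhausted() {
            return None;
        }
        let end = (self.offset + batch_size).min(self.rows.len());
        let batch = self.rows[self.offset..end].to_vec();
        self.offset = end;
        Some(batch)
    }
}
```

With five rows and a batch size of 2, as in the unit test above, this sketch yields `[1, 2]`, `[3, 4]`, and `[5]`, then reports exhaustion.
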

datafusion/physical-plan/src/aggregates/aggregate_hash_table/final_table.rs

Lines changed: 8 additions & 8 deletions
@@ -26,7 +26,7 @@ use crate::aggregates::AggregateExec;

 use super::common::{
     AggregateHashTable, AggregateHashTableBuffer, AggregateHashTableState, FinalMarker,
-    MaterializedFinalOutput,
+    MaterializedAggregateOutput,
 };

 /// Methods specific to the aggregate hash table used in the final aggregation stage.
@@ -57,8 +57,8 @@ impl AggregateHashTable<FinalMarker> {
     ) -> Result<Option<RecordBatch>> {
         let output_schema = Arc::clone(&self.output_schema);
         let batch_size = self.batch_size;
-        // Take ownership of the output state. Note `emit_next_materialized_batch`
-        // updates state after it emits a materialized slice.
+        // Take ownership of the output state. `emit_next_materialized_batch`
+        // restores `self.state` to `OutputtingMaterialized` or `Done`.
         match std::mem::replace(&mut self.state, AggregateHashTableState::Done) {
             AggregateHashTableState::Outputting(state) => {
                 if state.group_values.is_empty() {
@@ -68,7 +68,7 @@ impl AggregateHashTable<FinalMarker> {
                 let output = self.materialize_final_output(state, output_schema)?;
                 Ok(self.emit_next_materialized_batch(output, batch_size))
             }
-            AggregateHashTableState::OutputtingMaterializedFinal(output) => {
+            AggregateHashTableState::OutputtingMaterialized(output) => {
                 Ok(self.emit_next_materialized_batch(output, batch_size))
             }
             AggregateHashTableState::Done => Ok(None),
@@ -82,7 +82,7 @@ impl AggregateHashTable<FinalMarker> {
         &self,
         mut state: AggregateHashTableBuffer,
         output_schema: SchemaRef,
-    ) -> Result<MaterializedFinalOutput> {
+    ) -> Result<MaterializedAggregateOutput> {
         // Final aggregate evaluation consumes accumulator state. Evaluate all
         // groups once, then slice the materialized batch on subsequent polls.
         let emit_to = EmitTo::All;
@@ -96,19 +96,19 @@ impl AggregateHashTable<FinalMarker> {

         let batch = RecordBatch::try_new(output_schema, output)?;
         debug_assert!(batch.num_rows() > 0);
-        Ok(MaterializedFinalOutput::new(batch))
+        Ok(MaterializedAggregateOutput::new(batch))
     }

     fn emit_next_materialized_batch(
         &mut self,
-        mut output: MaterializedFinalOutput,
+        mut output: MaterializedAggregateOutput,
         batch_size: usize,
     ) -> Option<RecordBatch> {
         let batch = output.next_batch(batch_size);
         if output.is_exhausted() {
             self.state = AggregateHashTableState::Done;
         } else {
-            self.state = AggregateHashTableState::OutputtingMaterializedFinal(output);
+            self.state = AggregateHashTableState::OutputtingMaterialized(output);
         }
         batch
     }
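
The `std::mem::replace(&mut self.state, ...Done)` pattern used in `next_output_batch` can be sketched in isolation. This is an illustrative model only: `OutputState` and `ToyTable` are hypothetical names, and a `Vec<i32>` stands in for both the hash table buffer and the materialized batch. The point it shows is that taking the state by value leaves `Done` behind, so every early return ends in `Done` unless the helper explicitly restores an outputting state.

```rust
/// Hypothetical, simplified output states.
pub enum OutputState {
    /// Rows not yet materialized.
    Outputting(Vec<i32>),
    /// Materialized rows plus the next offset to emit.
    OutputtingMaterialized { rows: Vec<i32>, offset: usize },
    Done,
}

pub struct ToyTable {
    pub state: OutputState,
    pub batch_size: usize,
}

impl ToyTable {
    pub fn next_output_batch(&mut self) -> Option<Vec<i32>> {
        // Take ownership of the current state; `Done` is left in its place.
        match std::mem::replace(&mut self.state, OutputState::Done) {
            OutputState::Outputting(rows) => {
                if rows.is_empty() {
                    // State is already `Done`, no explicit assignment needed.
                    return None;
                }
                self.emit_next(rows, 0)
            }
            OutputState::OutputtingMaterialized { rows, offset } => {
                self.emit_next(rows, offset)
            }
            OutputState::Done => None,
        }
    }

    fn emit_next(&mut self, rows: Vec<i32>, offset: usize) -> Option<Vec<i32>> {
        assert!(self.batch_size > 0);
        let end = (offset + self.batch_size).min(rows.len());
        let batch = rows[offset..end].to_vec();
        if end < rows.len() {
            // More rows remain: restore the materialized state for the next poll.
            self.state = OutputState::OutputtingMaterialized { rows, offset: end };
        }
        // Otherwise the state stays `Done`.
        Some(batch)
    }
}
```

Owning the state by value is what lets the helper move the materialized rows back into `self.state` without cloning them.
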

datafusion/physical-plan/src/aggregates/aggregate_hash_table/partial_table.rs

Lines changed: 44 additions & 26 deletions
@@ -23,15 +23,16 @@ use arrow::array::{ArrayRef, BooleanArray, new_null_array};
 use arrow::datatypes::SchemaRef;
 use arrow::record_batch::RecordBatch;
 use datafusion_common::{Result, assert_eq_or_internal_err, internal_err};
+use datafusion_expr::EmitTo;

 use crate::aggregates::group_values::new_group_values;
 use crate::aggregates::order::GroupOrdering;
 use crate::aggregates::{AggregateExec, group_id_array, max_duplicate_ordinal};

 use super::common::{
     AggregateHashTable, AggregateHashTableBuffer, AggregateHashTableState,
-    EvaluatedAccumulatorArgs, HashAggregateAccumulator, PartialMarker, PartialSkipMarker,
-    emit_to_for_batch_size,
+    EvaluatedAccumulatorArgs, HashAggregateAccumulator, MaterializedAggregateOutput,
+    PartialMarker, PartialSkipMarker,
 };

 /// Methods specific to the aggregate hash table used in the partial aggregation stage.
@@ -62,43 +63,60 @@ impl AggregateHashTable<PartialMarker> {
     ) -> Result<Option<RecordBatch>> {
         let output_schema = Arc::clone(&self.output_schema);
         let batch_size = self.batch_size;
-        match &mut self.state {
+        // Take ownership of the output state. `emit_next_materialized_batch`
+        // restores `self.state` to `OutputtingMaterialized` or `Done`.
+        match std::mem::replace(&mut self.state, AggregateHashTableState::Done) {
             AggregateHashTableState::Outputting(state) => {
                 if state.group_values.is_empty() {
-                    self.state = AggregateHashTableState::Done;
                     return Ok(None);
                 }

-                let emit_to =
-                    emit_to_for_batch_size(batch_size, state.group_values.len());
-                let timer = self.group_by_metrics.emitting_time.timer();
-                let mut output = state.group_values.emit(emit_to)?;
-
-                for acc in state.accumulators.iter_mut() {
-                    output.extend(acc.state(emit_to)?);
-                }
-                let done = state.group_values.is_empty();
-                drop(timer);
-
-                let batch = RecordBatch::try_new(output_schema, output)?;
-                debug_assert!(batch.num_rows() > 0);
-                if done {
-                    self.state = AggregateHashTableState::Done;
-                }
-                Ok(Some(batch))
+                let output = self.materialize_partial_output(state, output_schema)?;
+                Ok(self.emit_next_materialized_batch(output, batch_size))
+            }
+            AggregateHashTableState::OutputtingMaterialized(output) => {
+                Ok(self.emit_next_materialized_batch(output, batch_size))
             }
             AggregateHashTableState::Done => Ok(None),
             AggregateHashTableState::Building(_) => {
                 internal_err!("next_output_batch must be called in the outputting state")
             }
-            AggregateHashTableState::OutputtingMaterializedFinal(_) => {
-                internal_err!(
-                    "partial aggregate output should not materialize final output"
-                )
-            }
         }
     }

+    fn materialize_partial_output(
+        &self,
+        mut state: AggregateHashTableBuffer,
+        output_schema: SchemaRef,
+    ) -> Result<MaterializedAggregateOutput> {
+        let emit_to = EmitTo::All;
+        let timer = self.group_by_metrics.emitting_time.timer();
+        let mut output = state.group_values.emit(emit_to)?;
+
+        for acc in state.accumulators.iter_mut() {
+            output.extend(acc.state(emit_to)?);
+        }
+        drop(timer);
+
+        let batch = RecordBatch::try_new(output_schema, output)?;
+        debug_assert!(batch.num_rows() > 0);
+        Ok(MaterializedAggregateOutput::new(batch))
+    }
+
+    fn emit_next_materialized_batch(
+        &mut self,
+        mut output: MaterializedAggregateOutput,
+        batch_size: usize,
+    ) -> Option<RecordBatch> {
+        let batch = output.next_batch(batch_size);
+        if output.is_exhausted() {
+            self.state = AggregateHashTableState::Done;
+        } else {
+            self.state = AggregateHashTableState::OutputtingMaterialized(output);
+        }
+        batch
+    }
+
     pub(in crate::aggregates) fn can_skip_aggregation(&self) -> bool {
         self.state
             .building()
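
The commit message mentions a regression test built on a custom `GroupsAccumulator` that fails if partial terminal output calls `EmitTo::First(_)`. That test is not part of the hunks shown here, so the following is only a guess at the idea, not the actual test: the local `EmitTo` enum is a stand-in for `datafusion_expr::EmitTo`, and `GuardedCounts` with its `state` method is a hypothetical simplification of an accumulator.

```rust
/// Local stand-in for `datafusion_expr::EmitTo` (assumption: only these two variants matter here).
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum EmitTo {
    All,
    First(usize),
}

/// Hypothetical guard accumulator: one count per group.
pub struct GuardedCounts {
    counts: Vec<u64>,
}

impl GuardedCounts {
    pub fn new(counts: Vec<u64>) -> Self {
        Self { counts }
    }

    /// Hands over all per-group state, and rejects first-N emission so a
    /// test would fail if the output path ever drained incrementally.
    pub fn state(&mut self, emit_to: EmitTo) -> Result<Vec<u64>, String> {
        match emit_to {
            EmitTo::All => Ok(std::mem::take(&mut self.counts)),
            EmitTo::First(n) => Err(format!(
                "unexpected EmitTo::First({n}) during terminal output"
            )),
        }
    }
}
```

A test that drives the output path with such an accumulator passes only when every state request uses emit-all, which is the property this change is meant to guarantee for partial output.
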
