Skip to content

Commit cfdce5a

Browse files
Merge branch 'main' into parquet-row-filter-struct-access-tree
2 parents 49b018b + d58e0c6 commit cfdce5a

6 files changed

Lines changed: 156 additions & 21 deletions

File tree

datafusion/physical-plan/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ required-features = ["test_utils"]
123123
[[bench]]
124124
harness = false
125125
name = "aggregate_vectorized"
126+
required-features = ["test_utils"]
126127

127128
[[bench]]
128129
harness = false

datafusion/physical-plan/src/aggregates/aggregate_hash_table/common.rs

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,9 @@ impl<AggrMode> AggregateHashTable<AggrMode> {
182182
acc + state.group_values.size()
183183
+ state.batch_group_indices.allocated_size()
184184
}
185+
AggregateHashTableState::OutputtingMaterializedFinal(output) => {
186+
output.memory_size()
187+
}
185188
AggregateHashTableState::Done => 0,
186189
}
187190
}
@@ -297,11 +300,53 @@ pub(super) struct AggregateHashTableBuffer {
297300
}
298301

299302
pub(super) enum AggregateHashTableState {
303+
/// Accumulating input rows into group keys and aggregate state.
300304
Building(AggregateHashTableBuffer),
305+
/// Emitting results directly from group keys and aggregate state.
301306
Outputting(AggregateHashTableBuffer),
307+
/// Materialize all the output results, and then incrementally output in the `OutputtingMaterializedFinal` state.
308+
///
309+
/// Note this is a temporary solution until the `GroupValues` issue is solved:
310+
/// Issue: <https://github.com/apache/datafusion/issues/23178>
311+
OutputtingMaterializedFinal(MaterializedFinalOutput),
302312
Done,
303313
}
304314

315+
/// Fully evaluated final aggregate output and the next row offset to emit.
316+
///
317+
/// Final aggregate evaluation consumes accumulator state, so final output is
318+
/// materialized once and then sliced to honor `batch_size` across output polls.
319+
pub(super) struct MaterializedFinalOutput {
320+
batch: RecordBatch,
321+
offset: usize,
322+
}
323+
324+
impl MaterializedFinalOutput {
325+
pub(super) fn new(batch: RecordBatch) -> Self {
326+
Self { batch, offset: 0 }
327+
}
328+
329+
pub(super) fn next_batch(&mut self, batch_size: usize) -> Option<RecordBatch> {
330+
debug_assert!(batch_size > 0);
331+
if self.is_exhausted() {
332+
return None;
333+
}
334+
335+
let length = batch_size.min(self.batch.num_rows() - self.offset);
336+
let batch = self.batch.slice(self.offset, length);
337+
self.offset += length;
338+
Some(batch)
339+
}
340+
341+
pub(super) fn is_exhausted(&self) -> bool {
342+
self.offset >= self.batch.num_rows()
343+
}
344+
345+
pub(super) fn memory_size(&self) -> usize {
346+
self.batch.get_array_memory_size()
347+
}
348+
}
349+
305350
impl HashAggregateAccumulator {
306351
fn new(
307352
aggregate_expr: Arc<AggregateFunctionExpr>,
@@ -440,3 +485,44 @@ impl AggregateHashTableState {
440485
state
441486
}
442487
}
488+
489+
#[cfg(test)]
490+
mod tests {
491+
use std::sync::Arc;
492+
493+
use arrow::array::{Array, Int32Array};
494+
use arrow::datatypes::{DataType, Field, Schema};
495+
496+
use super::*;
497+
498+
#[test]
499+
fn materialized_final_output_slices_batches_until_exhausted() -> Result<()> {
500+
let schema = Arc::new(Schema::new(vec![Field::new(
501+
"group_col",
502+
DataType::Int32,
503+
false,
504+
)]));
505+
let batch = RecordBatch::try_new(
506+
schema,
507+
vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]))],
508+
)?;
509+
let mut output = MaterializedFinalOutput::new(batch);
510+
511+
assert_eq!(int32_values(&output.next_batch(2).unwrap(), 0), vec![1, 2]);
512+
assert_eq!(int32_values(&output.next_batch(2).unwrap(), 0), vec![3, 4]);
513+
assert_eq!(int32_values(&output.next_batch(2).unwrap(), 0), vec![5]);
514+
assert!(output.next_batch(2).is_none());
515+
assert!(output.is_exhausted());
516+
517+
Ok(())
518+
}
519+
520+
fn int32_values(batch: &RecordBatch, column: usize) -> Vec<i32> {
521+
let array = batch
522+
.column(column)
523+
.as_any()
524+
.downcast_ref::<Int32Array>()
525+
.unwrap();
526+
(0..array.len()).map(|idx| array.value(idx)).collect()
527+
}
528+
}

datafusion/physical-plan/src/aggregates/aggregate_hash_table/final_table.rs

Lines changed: 46 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,13 @@ use std::sync::Arc;
2020
use arrow::datatypes::SchemaRef;
2121
use arrow::record_batch::RecordBatch;
2222
use datafusion_common::{Result, internal_err};
23+
use datafusion_expr::EmitTo;
2324

2425
use crate::aggregates::AggregateExec;
2526

2627
use super::common::{
27-
AggregateHashTable, AggregateHashTableState, FinalMarker, emit_to_for_batch_size,
28+
AggregateHashTable, AggregateHashTableBuffer, AggregateHashTableState, FinalMarker,
29+
MaterializedFinalOutput,
2830
};
2931

3032
/// Methods specific to the aggregate hash table used in the final aggregation stage.
@@ -55,30 +57,19 @@ impl AggregateHashTable<FinalMarker> {
5557
) -> Result<Option<RecordBatch>> {
5658
let output_schema = Arc::clone(&self.output_schema);
5759
let batch_size = self.batch_size;
58-
match &mut self.state {
60+
// Take ownership of the output state. Note `emit_next_materialized_batch`
61+
// updates state after it emits a materialized slice.
62+
match std::mem::replace(&mut self.state, AggregateHashTableState::Done) {
5963
AggregateHashTableState::Outputting(state) => {
6064
if state.group_values.is_empty() {
61-
self.state = AggregateHashTableState::Done;
6265
return Ok(None);
6366
}
6467

65-
let emit_to =
66-
emit_to_for_batch_size(batch_size, state.group_values.len());
67-
let timer = self.group_by_metrics.emitting_time.timer();
68-
let mut output = state.group_values.emit(emit_to)?;
69-
70-
for acc in state.accumulators.iter_mut() {
71-
output.push(acc.evaluate(emit_to)?);
72-
}
73-
let done = state.group_values.is_empty();
74-
drop(timer);
75-
76-
let batch = RecordBatch::try_new(output_schema, output)?;
77-
debug_assert!(batch.num_rows() > 0);
78-
if done {
79-
self.state = AggregateHashTableState::Done;
80-
}
81-
Ok(Some(batch))
68+
let output = self.materialize_final_output(state, output_schema)?;
69+
Ok(self.emit_next_materialized_batch(output, batch_size))
70+
}
71+
AggregateHashTableState::OutputtingMaterializedFinal(output) => {
72+
Ok(self.emit_next_materialized_batch(output, batch_size))
8273
}
8374
AggregateHashTableState::Done => Ok(None),
8475
AggregateHashTableState::Building(_) => {
@@ -87,6 +78,41 @@ impl AggregateHashTable<FinalMarker> {
8778
}
8879
}
8980

81+
fn materialize_final_output(
82+
&self,
83+
mut state: AggregateHashTableBuffer,
84+
output_schema: SchemaRef,
85+
) -> Result<MaterializedFinalOutput> {
86+
// Final aggregate evaluation consumes accumulator state. Evaluate all
87+
// groups once, then slice the materialized batch on subsequent polls.
88+
let emit_to = EmitTo::All;
89+
let timer = self.group_by_metrics.emitting_time.timer();
90+
let mut output = state.group_values.emit(emit_to)?;
91+
92+
for acc in state.accumulators.iter_mut() {
93+
output.push(acc.evaluate(emit_to)?);
94+
}
95+
drop(timer);
96+
97+
let batch = RecordBatch::try_new(output_schema, output)?;
98+
debug_assert!(batch.num_rows() > 0);
99+
Ok(MaterializedFinalOutput::new(batch))
100+
}
101+
102+
fn emit_next_materialized_batch(
103+
&mut self,
104+
mut output: MaterializedFinalOutput,
105+
batch_size: usize,
106+
) -> Option<RecordBatch> {
107+
let batch = output.next_batch(batch_size);
108+
if output.is_exhausted() {
109+
self.state = AggregateHashTableState::Done;
110+
} else {
111+
self.state = AggregateHashTableState::OutputtingMaterializedFinal(output);
112+
}
113+
batch
114+
}
115+
90116
pub(in crate::aggregates) fn aggregate_batch(
91117
&mut self,
92118
batch: &RecordBatch,

datafusion/physical-plan/src/aggregates/aggregate_hash_table/partial_table.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,11 @@ impl AggregateHashTable<PartialMarker> {
9191
AggregateHashTableState::Building(_) => {
9292
internal_err!("next_output_batch must be called in the outputting state")
9393
}
94+
AggregateHashTableState::OutputtingMaterializedFinal(_) => {
95+
internal_err!(
96+
"partial aggregate output should not materialize final output"
97+
)
98+
}
9499
}
95100
}
96101

datafusion/physical-plan/src/aggregates/mod.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1009,6 +1009,13 @@ impl AggregateExec {
10091009
));
10101010
}
10111011

1012+
// `GroupedHashAggregateStream` is being incrementally refactored. See the
1013+
// tracking issue for details.
1014+
//
1015+
// New features and improvements should go directly into the new implementation.
1016+
// Please coordinate through the tracking issue.
1017+
//
1018+
// Issue: <https://github.com/apache/datafusion/issues/22710>
10121019
if context
10131020
.session_config()
10141021
.options()
@@ -1028,7 +1035,7 @@ impl AggregateExec {
10281035
}
10291036
}
10301037

1031-
// grouping by something else and we need to just materialize all results
1038+
// Execution paths that have not been migrated use the fallback implementation
10321039
Ok(StreamType::GroupedHash(GroupedHashAggregateStream::new(
10331040
self, context, partition,
10341041
)?))

datafusion/physical-plan/src/aggregates/row_hash.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,16 @@ enum OutOfMemoryMode {
132132

133133
/// HashTable based Grouping Aggregator
134134
///
135+
/// # Development Note
136+
///
137+
/// This implementation is being incrementally refactored. See the tracking issue
138+
/// for details.
139+
///
140+
/// New features and improvements should go directly into the new implementation.
141+
/// Please coordinate through the tracking issue.
142+
///
143+
/// Issue: <https://github.com/apache/datafusion/issues/22710>
144+
///
135145
/// # Design Goals
136146
///
137147
/// This structure is designed so that updating the aggregates can be

0 commit comments

Comments
 (0)