Skip to content

Commit ecaea3d

Browse files
Dandandan and claude committed
fix: use emit+reset strategy for distinct aggregates
Distinct aggregates (COUNT(DISTINCT)) benefit from periodic emit+reset (1.45x faster on Q9) but are catastrophically slow with overflow passthrough (12x slower) because convert_to_state produces per-row state objects.

Now uses two strategies based on aggregate type:
- Non-distinct: overflow passthrough (convert_to_state, no reset)
- Distinct: emit+reset (state() serializes HashSets compactly)

Both strategies keep the hash table within the configured size limit for better cache performance.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 1390b5e commit ecaea3d

1 file changed

Lines changed: 50 additions & 7 deletions

File tree

datafusion/physical-plan/src/aggregates/row_hash.rs

Lines changed: 50 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -461,6 +461,10 @@ pub(crate) struct GroupedHashAggregateStream {
461461
/// is emitted at end-of-input.
462462
overflow_passthrough: bool,
463463

464+
/// Maximum table size for emit+reset strategy (used for distinct
465+
/// aggregates where overflow passthrough is not applicable).
466+
emit_reset_max_table_size: usize,
467+
464468
// ========================================================================
465469
// EXECUTION RESOURCES:
466470
// Fields related to managing execution resources and monitoring performance.
@@ -673,15 +677,19 @@ impl GroupedHashAggregateStream {
673677
None
674678
};
675679

676-
// Overflow passthrough requires convert_to_state support
677-
// (same requirement as skip_aggregation_probe) and no distinct
678-
// aggregates (convert_to_state for distinct produces per-row
679-
// state objects that are catastrophically expensive to merge).
680+
// Two strategies for keeping the hash table cache-friendly:
681+
//
682+
// 1. Overflow passthrough (non-distinct): convert overflow
683+
// batches to state via convert_to_state. Fast, no
684+
// serialization of accumulated groups.
685+
//
686+
// 2. Emit+reset (distinct): emit accumulated state via state()
687+
// and reset the hash table. Works for distinct aggregates
688+
// where convert_to_state produces catastrophically expensive
689+
// per-row state.
680690
let has_distinct = aggregate_exprs.iter().any(|e| e.is_distinct());
681-
let overflow_passthrough_max_table_size = if agg.mode == AggregateMode::Partial
691+
let max_table_size = if agg.mode == AggregateMode::Partial
682692
&& matches!(group_ordering, GroupOrdering::None)
683-
&& skip_aggregation_probe.is_some()
684-
&& !has_distinct
685693
{
686694
context
687695
.session_config()
@@ -691,6 +699,17 @@ impl GroupedHashAggregateStream {
691699
} else {
692700
0
693701
};
702+
let overflow_passthrough_max_table_size =
703+
if max_table_size > 0 && skip_aggregation_probe.is_some() && !has_distinct {
704+
max_table_size
705+
} else {
706+
0
707+
};
708+
let emit_reset_max_table_size = if max_table_size > 0 && has_distinct {
709+
max_table_size
710+
} else {
711+
0
712+
};
694713

695714
let reduction_factor = if agg.mode == AggregateMode::Partial {
696715
Some(
@@ -725,6 +744,7 @@ impl GroupedHashAggregateStream {
725744
skip_aggregation_probe,
726745
overflow_passthrough_max_table_size,
727746
overflow_passthrough: false,
747+
emit_reset_max_table_size,
728748
reduction_factor,
729749
})
730750
}
@@ -872,6 +892,29 @@ impl Stream for GroupedHashAggregateStream {
872892
}
873893
}
874894

895+
// Emit+reset: for distinct aggregates, emit all
896+
// accumulated state and reset the hash table.
897+
// Uses state() which compactly serializes distinct
898+
// HashSets, unlike convert_to_state which explodes.
899+
if self.emit_reset_max_table_size > 0 {
900+
let table_size = self.group_values.size()
901+
+ self
902+
.accumulators
903+
.iter()
904+
.map(|x| x.size())
905+
.sum::<usize>();
906+
if table_size >= self.emit_reset_max_table_size {
907+
let batch_size = self.batch_size;
908+
if let Some(batch) = self.emit(EmitTo::All, false)? {
909+
self.clear_shrink(batch_size);
910+
timer.done();
911+
self.exec_state =
912+
ExecutionState::ProducingOutput(batch);
913+
break 'reading_input;
914+
}
915+
}
916+
}
917+
875918
// If we reach this point, try to update the memory reservation
876919
// handling out-of-memory conditions as determined by the OOM mode.
877920
if let Some(new_state) =

0 commit comments

Comments
 (0)