Commit d5b2f3c

add config to control if we enable blocked groups optimization.
1 parent 3479de7 commit d5b2f3c

2 files changed

Lines changed: 19 additions & 3 deletions

datafusion/common/src/config.rs

Lines changed: 12 additions & 0 deletions
@@ -405,6 +405,18 @@ config_namespace! {
         /// in joins can reduce memory usage when joining large
         /// tables with a highly-selective join filter, but is also slightly slower.
         pub enforce_batch_size_in_joins: bool, default = false
+
+        /// Should DataFusion use the blocked approach to manage group
+        /// values and their related states in accumulators.
+        /// By default, the blocked approach is used. It first allocates
+        /// capacity for a block based on a predefined block size.
+        /// When the block reaches its limit, a new block is allocated (with
+        /// the same predefined block-size-based capacity) instead of expanding
+        /// the current one and copying the data.
+        /// If this flag is set to `false`, DataFusion falls back to the single approach:
+        /// values are managed within a single large block (think of it as a `Vec`).
+        /// As this block grows, it often triggers numerous copies, resulting in poor performance.
+        pub enable_aggregation_blocked_groups: bool, default = true
     }
 }

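The doc comment above contrasts a blocked layout with one growing `Vec`. The following is a minimal sketch of that idea only, not DataFusion's actual data structure: the `Blocks` type, its methods, and the block size of 4 are all hypothetical names and values chosen for illustration.

```rust
/// Hypothetical illustration of block-based storage; not DataFusion code.
struct Blocks<T> {
    block_size: usize,
    blocks: Vec<Vec<T>>,
}

impl<T> Blocks<T> {
    fn new(block_size: usize) -> Self {
        assert!(block_size > 0, "block_size must be non-zero");
        Self { block_size, blocks: Vec::new() }
    }

    /// Append a value and return its (block index, offset within block).
    fn push(&mut self, value: T) -> (usize, usize) {
        let needs_new_block = self
            .blocks
            .last()
            .map_or(true, |block| block.len() == self.block_size);
        if needs_new_block {
            // Allocate a fresh block with the full block capacity up front.
            // The element buffers of earlier blocks are left in place.
            self.blocks.push(Vec::with_capacity(self.block_size));
        }
        let block_idx = self.blocks.len() - 1;
        let block = &mut self.blocks[block_idx];
        block.push(value);
        (block_idx, block.len() - 1)
    }

    /// Look up a value by its (block index, offset) position.
    fn get(&self, block_idx: usize, offset: usize) -> Option<&T> {
        self.blocks.get(block_idx)?.get(offset)
    }

    /// Total number of stored values across all blocks.
    fn len(&self) -> usize {
        self.blocks.iter().map(Vec::len).sum()
    }
}

fn main() {
    let mut groups = Blocks::new(4);
    for i in 0..10u64 {
        groups.push(i);
    }
    // 10 values with a block size of 4 occupy blocks of 4, 4, and 2 values.
    println!("{} values in {} blocks", groups.len(), groups.blocks.len());
    println!("value at (1, 0): {:?}", groups.get(1, 0));
}
```

In this sketch a full block is never resized, so growth costs one new allocation rather than a copy of everything stored so far. A single `Vec` reallocates and moves its contents whenever it outgrows its capacity, which is the copying the doc comment refers to.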

datafusion/physical-plan/src/aggregates/row_hash.rs

Lines changed: 7 additions & 3 deletions
@@ -659,7 +659,7 @@ pub(crate) fn create_group_accumulator(

 /// Check if we can enable the blocked optimization for `GroupValues` and `GroupsAccumulator`s.
 /// The blocked optimization will be enabled when:
-///   - When `enable_aggregation_intermediate_states_blocked_approach` is true
+///   - When `enable_aggregation_blocked_groups` is true (the default)
 ///   - It is not streaming aggregation(because blocked mode can't support Emit::first(exact n))
 ///   - The spilling is disabled(still need to consider more to support it efficiently)
 ///   - The accumulator is not empty(I am still not sure about logic in this case)
@@ -676,8 +676,12 @@ fn maybe_enable_blocked_groups(
     block_size: usize,
     group_ordering: &GroupOrdering,
 ) -> Result<bool> {
-    // if !context.session_config().options().execution
-    if !matches!(group_ordering, GroupOrdering::None)
+    if !context
+        .session_config()
+        .options()
+        .execution
+        .enable_aggregation_blocked_groups
+        || !matches!(group_ordering, GroupOrdering::None)
         || accumulators.is_empty()
         || context.runtime_env().disk_manager.tmp_files_enabled()
     {
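The guard in the hunk above can be read as a single boolean predicate. The sketch below models only the four conditions visible in the diff, assuming that a failing guard means blocked mode stays off; the body of the `if` and any checks that follow it are not shown in the diff and are not modelled. `blocked_groups_allowed`, its parameters, and the simplified `GroupOrdering` enum are hypothetical stand-ins, not DataFusion's types.

```rust
/// Hypothetical, simplified stand-in for DataFusion's `GroupOrdering`.
/// Only the distinction between `None` and "anything else" matters here.
#[derive(Debug)]
enum GroupOrdering {
    None,
    Partial,
    Full,
}

/// Returns true when none of the guard conditions from the diff rules out
/// blocked groups. This negates the `if` condition shown in the hunk.
fn blocked_groups_allowed(
    config_enabled: bool,          // `enable_aggregation_blocked_groups`
    group_ordering: &GroupOrdering,
    num_accumulators: usize,       // stands in for `accumulators.is_empty()`
    spilling_enabled: bool,        // stands in for `tmp_files_enabled()`
) -> bool {
    config_enabled
        && matches!(group_ordering, GroupOrdering::None)
        && num_accumulators > 0
        && !spilling_enabled
}

fn main() {
    let cases = [
        (true, GroupOrdering::None, 2, false),
        (false, GroupOrdering::None, 2, false),
        (true, GroupOrdering::Partial, 2, false),
        (true, GroupOrdering::Full, 2, false),
        (true, GroupOrdering::None, 0, false),
        (true, GroupOrdering::None, 2, true),
    ];
    for (enabled, ordering, n, spilling) in &cases {
        println!(
            "config={enabled} ordering={ordering:?} accumulators={n} spilling={spilling} -> {}",
            blocked_groups_allowed(*enabled, ordering, *n, *spilling)
        );
    }
}
```

Because the new config check comes first in the `||` chain, setting the option to `false` short-circuits the remaining conditions.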
