From 93718fbc2529c4b13e1380d1436b98b811d89013 Mon Sep 17 00:00:00 2001 From: dqhl76 Date: Mon, 23 Mar 2026 13:56:59 +0800 Subject: [PATCH 1/6] fix: use spill depth partition bit in new final agg --- .../src/aggregate/aggregate_hashtable.rs | 6 +- src/query/expression/src/aggregate/mod.rs | 7 ++ .../src/aggregate/partitioned_payload.rs | 51 +++++++++-- .../aggregator/build_partition_bucket.rs | 2 + .../new_transform_final_aggregate.rs | 90 ++++++++++++------- 5 files changed, 116 insertions(+), 40 deletions(-) diff --git a/src/query/expression/src/aggregate/aggregate_hashtable.rs b/src/query/expression/src/aggregate/aggregate_hashtable.rs index 35c65f6af30ea..0f8670ee90d8d 100644 --- a/src/query/expression/src/aggregate/aggregate_hashtable.rs +++ b/src/query/expression/src/aggregate/aggregate_hashtable.rs @@ -74,10 +74,11 @@ impl AggregateHashTable { Self { direct_append: false, current_radix_bits: config.initial_radix_bits, - payload: PartitionedPayload::new( + payload: PartitionedPayload::new_with_start_bit( group_types, aggrs, 1 << config.initial_radix_bits, + config.partition_start_bit, vec![arena], ), hash_index: HashIndex::new(&config, capacity), @@ -105,10 +106,11 @@ impl AggregateHashTable { Self { direct_append: !need_init_entry, current_radix_bits: config.initial_radix_bits, - payload: PartitionedPayload::new( + payload: PartitionedPayload::new_with_start_bit( group_types, aggrs, 1 << config.initial_radix_bits, + config.partition_start_bit, vec![arena], ), hash_index, diff --git a/src/query/expression/src/aggregate/mod.rs b/src/query/expression/src/aggregate/mod.rs index b6c54cb39f0cd..a3f715caba094 100644 --- a/src/query/expression/src/aggregate/mod.rs +++ b/src/query/expression/src/aggregate/mod.rs @@ -154,6 +154,7 @@ pub struct HashTableConfig { // Max radix bits across all threads, this is a hint to repartition pub current_max_radix_bits: Arc, pub initial_radix_bits: u64, + pub partition_start_bit: u64, pub max_radix_bits: u64, pub 
repartition_radix_bits_incr: u64, pub block_fill_factor: f64, @@ -167,6 +168,7 @@ impl Default for HashTableConfig { Self { current_max_radix_bits: Arc::new(AtomicU64::new(3)), initial_radix_bits: 3, + partition_start_bit: 0, max_radix_bits: MAX_RADIX_BITS, repartition_radix_bits_incr: 2, block_fill_factor: 1.8, @@ -211,6 +213,11 @@ impl HashTableConfig { self } + pub fn with_partition_start_bit(mut self, partition_start_bit: u64) -> Self { + self.partition_start_bit = partition_start_bit; + self + } + pub fn with_experiment_hash_index(mut self, enable: bool) -> Self { self.enable_experiment_hash_index = enable; self diff --git a/src/query/expression/src/aggregate/partitioned_payload.rs b/src/query/expression/src/aggregate/partitioned_payload.rs index da498edece3e5..10ceeeadd61d7 100644 --- a/src/query/expression/src/aggregate/partitioned_payload.rs +++ b/src/query/expression/src/aggregate/partitioned_payload.rs @@ -36,10 +36,15 @@ struct PartitionMask { impl PartitionMask { fn new(partition_count: u64) -> Self { + Self::with_start_bit(partition_count, 0) + } + + fn with_start_bit(partition_count: u64, start_bit: u64) -> Self { let radix_bits = partition_count.trailing_zeros() as u64; debug_assert_eq!(1 << radix_bits, partition_count); + debug_assert!(start_bit + radix_bits <= 48); - let shift = 48 - radix_bits; + let shift = 48 - start_bit - radix_bits; let mask = ((1 << radix_bits) - 1) << shift; Self { mask, shift } @@ -59,6 +64,7 @@ pub struct PartitionedPayload { pub arenas: Vec>, + partition_start_bit: u64, partition_mask: PartitionMask, } @@ -71,6 +77,16 @@ impl PartitionedPayload { aggrs: Vec, partition_count: u64, arenas: Vec>, + ) -> Self { + Self::new_with_start_bit(group_types, aggrs, partition_count, 0, arenas) + } + + pub fn new_with_start_bit( + group_types: Vec, + aggrs: Vec, + partition_count: u64, + partition_start_bit: u64, + arenas: Vec>, ) -> Self { let states_layout = if !aggrs.is_empty() { Some(get_states_layout(&aggrs).unwrap()) @@ -101,7 
+117,8 @@ impl PartitionedPayload { row_layout, arenas, - partition_mask: PartitionMask::new(partition_count), + partition_start_bit, + partition_mask: PartitionMask::with_start_bit(partition_count, partition_start_bit), } } @@ -169,11 +186,17 @@ impl PartitionedPayload { group_types, aggrs, arenas, + partition_start_bit, .. } = self; - let mut new_partition_payload = - PartitionedPayload::new(group_types, aggrs, new_partition_count as u64, arenas); + let mut new_partition_payload = PartitionedPayload::new_with_start_bit( + group_types, + aggrs, + new_partition_count as u64, + partition_start_bit, + arenas, + ); state.clear(); for payload in payloads.into_iter() { @@ -184,7 +207,9 @@ impl PartitionedPayload { } pub fn combine(&mut self, other: PartitionedPayload, state: &mut PayloadFlushState) { - if other.partition_count() == self.partition_count() { + if other.partition_count() == self.partition_count() + && other.partition_start_bit == self.partition_start_bit + { for (l, r) in self.payloads.iter_mut().zip(other.payloads.into_iter()) { l.combine(r); } @@ -293,3 +318,19 @@ impl PartitionedPayload { self.payloads.iter().map(|x| x.memory_size()).sum() } } + +#[cfg(test)] +mod tests { + use super::PartitionMask; + + #[test] + fn test_partition_mask_with_start_bit() { + let top_bit_mask = PartitionMask::new(2); + assert_eq!(top_bit_mask.index(1_u64 << 47), 1); + assert_eq!(top_bit_mask.index(1_u64 << 44), 0); + + let shifted_mask = PartitionMask::with_start_bit(2, 3); + assert_eq!(shifted_mask.index(1_u64 << 47), 0); + assert_eq!(shifted_mask.index(1_u64 << 44), 1); + } +} diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/build_partition_bucket.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/build_partition_bucket.rs index 1b94a7cba986f..4abb38308b55d 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/build_partition_bucket.rs +++ 
b/src/query/service/src/pipelines/processors/transforms/aggregator/build_partition_bucket.rs @@ -49,6 +49,7 @@ fn build_partition_bucket_experimental( shuffle_mode: AggregateShuffleMode, ) -> Result<()> { let mut final_parallelism = ctx.get_settings().get_max_threads()? as usize; + let base_consumed_bits = shuffle_mode.determine_radix_bits(); match shuffle_mode { AggregateShuffleMode::Row => { let schema = params.spill_schema(); @@ -115,6 +116,7 @@ fn build_partition_bucket_experimental( output_port.clone(), params.clone(), id, + base_consumed_bits, ctx.clone(), tx.clone(), rx.clone(), diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_final_aggregate.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_final_aggregate.rs index c6b7031882b07..387578ba460f4 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_final_aggregate.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_final_aggregate.rs @@ -74,6 +74,9 @@ pub struct NewTransformFinalAggregate { flush_state: PayloadFlushState, statistics: AggregationStatistics, _id: usize, + base_consumed_bits: u64, + max_partition_depth: usize, + current_partition_depth: usize, spiller: NewAggregateSpiller, settings: MemorySettings, max_aggregate_spill_level: usize, @@ -85,21 +88,18 @@ impl NewTransformFinalAggregate { output: Arc, params: Arc, _id: usize, + base_consumed_bits: u64, ctx: Arc, tx: Sender, rx: Receiver, ) -> Result> { let settings = ctx.get_settings(); - let max_aggregate_spill_level = settings.get_max_aggregate_spill_level()?; + let available_partition_depths = 48_u64.saturating_sub(base_consumed_bits) as usize; + let max_partition_depth = available_partition_depths.saturating_sub(1); + let max_aggregate_spill_level = + (settings.get_max_aggregate_spill_level()? 
as usize).min(max_partition_depth); - let hashtable = AggregateHashTable::new( - params.group_data_types.clone(), - params.aggregate_functions.clone(), - HashTableConfig::default() - .with_initial_radix_bits(SPILL_BUCKET_NUM.trailing_zeros() as u64) - .with_experiment_hash_index(params.enable_experiment_hash_index), - Arc::new(Bump::new()), - ); + let hashtable = Self::create_hashtable(¶ms, base_consumed_bits, 0); let flush_state = PayloadFlushState::default(); let spiller = NewAggregateSpiller::try_create( @@ -128,14 +128,55 @@ impl NewTransformFinalAggregate { flush_state, statistics: AggregationStatistics::new("NewFinalAggregate"), _id, + base_consumed_bits, + max_partition_depth, + current_partition_depth: 0, spiller, settings: MemorySettings::from_aggregate_settings(&ctx)?, - max_aggregate_spill_level: max_aggregate_spill_level as usize, + max_aggregate_spill_level, })) } } impl NewTransformFinalAggregate { + fn create_hashtable( + params: &Arc, + base_consumed_bits: u64, + spilled_depth: usize, + ) -> AggregateHashTable { + AggregateHashTable::new( + params.group_data_types.clone(), + params.aggregate_functions.clone(), + HashTableConfig::default() + .with_initial_radix_bits(SPILL_BUCKET_NUM.trailing_zeros() as u64) + .with_partition_start_bit(base_consumed_bits + spilled_depth as u64) + .with_experiment_hash_index(params.enable_experiment_hash_index), + Arc::new(Bump::new()), + ) + } + + fn reset_hashtable(&mut self, spilled_depth: usize) { + let partition_depth = spilled_depth.min(self.max_partition_depth); + self.hashtable = HashTable::AggregateHashTable(Self::create_hashtable( + &self.params, + self.base_consumed_bits, + partition_depth, + )); + self.current_partition_depth = partition_depth; + } + + // One final-aggregate processor handles both the original input stream and + // recursively spilled tasks received from the channel. 
Those tasks may + // belong to different spill depths, and each depth must consume a different + // hash bit (`base_consumed_bits + spilled_depth`) when building the internal + // 2-way partitions. + fn ensure_spill_depth(&mut self, spilled_depth: usize) { + let partition_depth = spilled_depth.min(self.max_partition_depth); + if self.current_partition_depth != partition_depth { + self.reset_hashtable(spilled_depth); + } + } + fn handle_serialized(&mut self, payload: SerializedPayload) -> Result<()> { if payload.data_block.is_empty() { return Ok(()); @@ -219,10 +260,6 @@ impl NewTransformFinalAggregate { fn spill_out(&mut self) -> Result<()> { self.spilled_occurred = true; if let HashTable::AggregateHashTable(v) = mem::take(&mut self.hashtable) { - let group_types = v.payload.group_types.clone(); - let aggrs = v.payload.aggrs.clone(); - let config = v.config.clone(); - for (bucket, payload) in v.payload.payloads.into_iter().enumerate() { if payload.len() == 0 { continue; @@ -231,17 +268,10 @@ impl NewTransformFinalAggregate { let data_block = payload.aggregate_flush_all()?.consume_convert_to_full(); self.spiller.spill(bucket, data_block)?; } - - let arena = Arc::new(Bump::new()); - self.hashtable = HashTable::AggregateHashTable(AggregateHashTable::new( - group_types, - aggrs, - config, - arena, - )); } else { unreachable!("[TRANSFORM-AGGREGATOR] Invalid hash table state during spill check") } + self.reset_hashtable(self.current_partition_depth); Ok(()) } @@ -254,10 +284,6 @@ impl NewTransformFinalAggregate { } if let HashTable::AggregateHashTable(mut ht) = mem::take(&mut self.hashtable) { - let group_types = ht.payload.group_types.clone(); - let aggrs = ht.payload.aggrs.clone(); - let config = ht.config.clone(); - self.statistics.log_finish_statistics(&ht); let mut blocks = vec![]; self.flush_state.clear(); @@ -280,12 +306,7 @@ impl NewTransformFinalAggregate { self.output.push_data(Ok(concat)); } } - self.hashtable = 
HashTable::AggregateHashTable(AggregateHashTable::new( - group_types, - aggrs, - config, - Arc::new(Bump::new()), - )); + self.reset_hashtable(self.current_partition_depth); } Ok(()) @@ -381,9 +402,11 @@ impl Processor for NewTransformFinalAggregate { fn process(&mut self) -> Result<()> { let input_data = self.input_data.take(); if let Some(meta) = input_data { - self.handle_meta(meta, true)?; + self.ensure_spill_depth(0); + self.handle_meta(meta, self.max_aggregate_spill_level > 0)?; return Ok(()); } else if let Some(mut task) = self.channel_data.take() { + self.ensure_spill_depth(task.spilled_depth); let meta = AggregateMeta::NewSpilled(mem::take(&mut task.spilled_payload)); if task.spilled_depth >= self.max_aggregate_spill_level { self.handle_meta(meta, false)?; @@ -394,6 +417,7 @@ impl Processor for NewTransformFinalAggregate { return Ok(()); } else { + self.ensure_spill_depth(0); let sender = mem::take(&mut self.tx) .expect("logic error: called finished for input data more than once"); self.finish(0, sender)?; From cf56e34e8c062e5ac9beead990578304e10b78c4 Mon Sep 17 00:00:00 2001 From: dqhl76 Date: Tue, 24 Mar 2026 10:07:25 +0800 Subject: [PATCH 2/6] feat: add final agg task statistics --- .../aggregator/build_partition_bucket.rs | 3 + .../new_transform_final_aggregate.rs | 92 ++++++++++++++++--- .../transforms/aggregator/statistics.rs | 87 ++++++++++++++---- 3 files changed, 155 insertions(+), 27 deletions(-) diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/build_partition_bucket.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/build_partition_bucket.rs index 4abb38308b55d..47fafc6ed3c1c 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/build_partition_bucket.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/build_partition_bucket.rs @@ -13,6 +13,7 @@ // limitations under the License. 
use std::sync::Arc; +use std::sync::atomic::AtomicU64; use databend_common_catalog::table_context::TableContext; use databend_common_exception::Result; @@ -108,6 +109,7 @@ fn build_partition_bucket_experimental( let mut builder = TransformPipeBuilder::create(); let (tx, rx) = async_channel::unbounded(); + let next_task_id = Arc::new(AtomicU64::new(1)); for id in 0..final_parallelism { let input_port = InputPort::create(); let output_port = OutputPort::create(); @@ -120,6 +122,7 @@ fn build_partition_bucket_experimental( ctx.clone(), tx.clone(), rx.clone(), + next_task_id.clone(), )?; builder.add_transform(input_port, output_port, ProcessorPtr::create(processor)); } diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_final_aggregate.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_final_aggregate.rs index 387578ba460f4..8831f9abb340f 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_final_aggregate.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_final_aggregate.rs @@ -15,6 +15,8 @@ use std::any::Any; use std::mem; use std::sync::Arc; +use std::sync::atomic::AtomicU64; +use std::sync::atomic::Ordering; use async_channel::Receiver; use async_channel::Sender; @@ -46,6 +48,7 @@ use crate::pipelines::processors::transforms::aggregator::transform_aggregate_pa use crate::sessions::QueryContext; const SPILL_BUCKET_NUM: usize = 2; +const SPILL_BUCKET_BITS: u64 = SPILL_BUCKET_NUM.trailing_zeros() as u64; enum Stage { Input, @@ -53,6 +56,7 @@ enum Stage { } pub struct FinalAggregateTask { + task_id: u64, spilled_depth: usize, spilled_payload: Vec, tx: Sender, @@ -80,6 +84,7 @@ pub struct NewTransformFinalAggregate { spiller: NewAggregateSpiller, settings: MemorySettings, max_aggregate_spill_level: usize, + next_task_id: Arc, } impl NewTransformFinalAggregate { @@ 
-92,9 +97,11 @@ impl NewTransformFinalAggregate { ctx: Arc, tx: Sender, rx: Receiver, + next_task_id: Arc, ) -> Result> { let settings = ctx.get_settings(); - let available_partition_depths = 48_u64.saturating_sub(base_consumed_bits) as usize; + let available_partition_depths = + (48_u64.saturating_sub(base_consumed_bits) / SPILL_BUCKET_BITS) as usize; let max_partition_depth = available_partition_depths.saturating_sub(1); let max_aggregate_spill_level = (settings.get_max_aggregate_spill_level()? as usize).min(max_partition_depth); @@ -134,11 +141,20 @@ impl NewTransformFinalAggregate { spiller, settings: MemorySettings::from_aggregate_settings(&ctx)?, max_aggregate_spill_level, + next_task_id, })) } } impl NewTransformFinalAggregate { + fn next_task_id(&self) -> u64 { + self.next_task_id.fetch_add(1, Ordering::Relaxed) + } + + fn partition_start_bit(base_consumed_bits: u64, spilled_depth: usize) -> u64 { + base_consumed_bits + spilled_depth as u64 * SPILL_BUCKET_BITS + } + fn create_hashtable( params: &Arc, base_consumed_bits: u64, @@ -148,8 +164,11 @@ impl NewTransformFinalAggregate { params.group_data_types.clone(), params.aggregate_functions.clone(), HashTableConfig::default() - .with_initial_radix_bits(SPILL_BUCKET_NUM.trailing_zeros() as u64) - .with_partition_start_bit(base_consumed_bits + spilled_depth as u64) + .with_initial_radix_bits(SPILL_BUCKET_BITS) + .with_partition_start_bit(Self::partition_start_bit( + base_consumed_bits, + spilled_depth, + )) .with_experiment_hash_index(params.enable_experiment_hash_index), Arc::new(Bump::new()), ) @@ -168,8 +187,8 @@ impl NewTransformFinalAggregate { // One final-aggregate processor handles both the original input stream and // recursively spilled tasks received from the channel. Those tasks may // belong to different spill depths, and each depth must consume a different - // hash bit (`base_consumed_bits + spilled_depth`) when building the internal - // 2-way partitions. 
+ // hash-bit window when building the internal partitions. The window start is + // `base_consumed_bits + spilled_depth * SPILL_BUCKET_BITS`. fn ensure_spill_depth(&mut self, spilled_depth: usize) { let partition_depth = spilled_depth.min(self.max_partition_depth); if self.current_partition_depth != partition_depth { @@ -275,16 +294,38 @@ impl NewTransformFinalAggregate { Ok(()) } - fn finish(&mut self, spilled_depth: usize, tx: Sender) -> Result<()> { + fn finish( + &mut self, + task_id: Option, + spilled_depth: usize, + tx: Sender, + ) -> Result<()> { if self.spilled_occurred { + let (output_rows, hash_index_resizes) = match &self.hashtable { + HashTable::AggregateHashTable(ht) => { + (ht.payload.len(), ht.hash_index_resize_count()) + } + _ => unreachable!("[TRANSFORM-AGGREGATOR] Invalid hash table state before spill"), + }; self.spill_finish(spilled_depth, tx)?; + if let Some(task_id) = task_id { + self.statistics.log_task_finish_statistics( + task_id, + self._id, + spilled_depth, + output_rows, + hash_index_resizes, + true, + ); + } else { + self.statistics.reset(); + } self.spilled_occurred = false; return Ok(()); } if let HashTable::AggregateHashTable(mut ht) = mem::take(&mut self.hashtable) { - self.statistics.log_finish_statistics(&ht); let mut blocks = vec![]; self.flush_state.clear(); @@ -306,6 +347,18 @@ impl NewTransformFinalAggregate { self.output.push_data(Ok(concat)); } } + if let Some(task_id) = task_id { + self.statistics.log_task_finish_statistics( + task_id, + self._id, + spilled_depth, + ht.payload.len(), + ht.hash_index_resize_count(), + false, + ); + } else { + self.statistics.log_finish_statistics(&ht); + } self.reset_hashtable(self.current_partition_depth); } @@ -321,9 +374,25 @@ impl NewTransformFinalAggregate { chunks[payload.bucket as usize].push(payload); } - for chunk in chunks.into_iter() { + let next_spill_depth = spilled_depth + 1; + for (bucket, chunk) in chunks.into_iter().enumerate() { + let task_id = self.next_task_id(); + let 
rows = chunk + .iter() + .map(|payload| payload.row_group.num_rows() as usize) + .sum::(); + log::info!( + "Spill finish emitted task: task_id={}, processor={}, spill_depth={}, bucket={}, payloads={}, rows={}", + task_id, + self._id, + spilled_depth, + bucket, + chunk.len(), + rows, + ); let spilled = FinalAggregateTask { - spilled_depth: spilled_depth + 1, + task_id, + spilled_depth: next_spill_depth, spilled_payload: chunk, tx: tx.clone(), }; @@ -407,20 +476,21 @@ impl Processor for NewTransformFinalAggregate { return Ok(()); } else if let Some(mut task) = self.channel_data.take() { self.ensure_spill_depth(task.spilled_depth); + self.statistics.reset(); let meta = AggregateMeta::NewSpilled(mem::take(&mut task.spilled_payload)); if task.spilled_depth >= self.max_aggregate_spill_level { self.handle_meta(meta, false)?; } else { self.handle_meta(meta, true)?; } - self.finish(task.spilled_depth, task.tx)?; + self.finish(Some(task.task_id), task.spilled_depth, task.tx)?; return Ok(()); } else { self.ensure_spill_depth(0); let sender = mem::take(&mut self.tx) .expect("logic error: called finished for input data more than once"); - self.finish(0, sender)?; + self.finish(None, 0, sender)?; } Ok(()) diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/statistics.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/statistics.rs index ebf538a9c1f19..52dbaea523e2e 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/statistics.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/statistics.rs @@ -45,7 +45,43 @@ impl AggregationStatistics { } } + pub fn reset(&mut self) { + self.processed_rows = 0; + self.processed_bytes = 0; + self.first_block_start = None; + self.start = Instant::now(); + } + pub fn log_finish_statistics(&mut self, hashtable: &AggregateHashTable) { + self.log_finish( + hashtable.payload.len(), + hashtable.hash_index_resize_count(), + None, + ); + } + + pub fn 
log_task_finish_statistics( + &mut self, + task_id: u64, + processor_id: usize, + spill_depth: usize, + output_rows: usize, + hash_index_resizes: usize, + spilled: bool, + ) { + self.log_finish( + output_rows, + hash_index_resizes, + Some((task_id, processor_id, spill_depth, spilled)), + ); + } + + fn log_finish( + &mut self, + output_rows: usize, + hash_index_resizes: usize, + task: Option<(u64, usize, usize, bool)>, + ) { let elapsed = self.start.elapsed().as_secs_f64(); let real_elapsed = self .first_block_start @@ -53,22 +89,41 @@ impl AggregationStatistics { .map(|t| t.elapsed().as_secs_f64()) .unwrap_or(elapsed); - log::info!( - "[TRANSFORM-AGGREGATOR][{}] Aggregation completed: {} → {} rows in {:.2}s (real: {:.2}s), throughput: {} rows/sec, {}/sec, total: {}, hash index resizes: {}", - self.stage, - self.processed_rows, - hashtable.payload.len(), - elapsed, - real_elapsed, - convert_number_size(self.processed_rows as f64 / elapsed), - convert_byte_size(self.processed_bytes as f64 / elapsed), - convert_byte_size(self.processed_bytes as f64), - hashtable.hash_index_resize_count(), - ); + match task { + Some((task_id, processor_id, spill_depth, spilled)) => { + log::info!( + "[{}] Task completed: task_id={}, processor={}, spill_depth={}, spilled={}, {} → {} rows in {:.2}s (real: {:.2}s), throughput: {} rows/sec, {}/sec, total: {}, hash index resizes: {}", + self.stage, + task_id, + processor_id, + spill_depth, + spilled, + self.processed_rows, + output_rows, + elapsed, + real_elapsed, + convert_number_size(self.processed_rows as f64 / elapsed), + convert_byte_size(self.processed_bytes as f64 / elapsed), + convert_byte_size(self.processed_bytes as f64), + hash_index_resizes, + ); + } + None => { + log::info!( + "[{}] Aggregation completed: {} → {} rows in {:.2}s (real: {:.2}s), throughput: {} rows/sec, {}/sec, total: {}, hash index resizes: {}", + self.stage, + self.processed_rows, + output_rows, + elapsed, + real_elapsed, + 
convert_number_size(self.processed_rows as f64 / elapsed), + convert_byte_size(self.processed_bytes as f64 / elapsed), + convert_byte_size(self.processed_bytes as f64), + hash_index_resizes, + ); + } + } - self.processed_rows = 0; - self.processed_bytes = 0; - self.first_block_start = None; - self.start = Instant::now(); + self.reset(); } } From 0591ca404eeba30592fd8e9eeabcf1709bef340a Mon Sep 17 00:00:00 2001 From: dqhl76 Date: Fri, 27 Mar 2026 11:07:16 +0800 Subject: [PATCH 3/6] fix: need check_spill for every payload --- .../new_transform_final_aggregate.rs | 23 ++++++++----------- 1 file changed, 9 insertions(+), 14 deletions(-) diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_final_aggregate.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_final_aggregate.rs index 8831f9abb340f..5f2f529d0c8ce 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_final_aggregate.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_final_aggregate.rs @@ -233,18 +233,6 @@ impl NewTransformFinalAggregate { Ok(()) } - fn handle_new_spilled(&mut self, payloads: Vec) -> Result<()> { - for payload in payloads { - let restored = self.spiller.restore(payload)?; - let AggregateMeta::Serialized(restored) = restored else { - unreachable!("unexpected aggregate meta, found type: {:?}", restored) - }; - self.handle_serialized(restored)?; - } - - Ok(()) - } - fn handle_meta(&mut self, meta: AggregateMeta, need_check_spill: bool) -> Result<()> { match meta { AggregateMeta::Serialized(payload) => { @@ -254,15 +242,22 @@ impl NewTransformFinalAggregate { self.handle_aggregate_payload(payload)?; } AggregateMeta::NewSpilled(payloads) => { - self.handle_new_spilled(payloads)?; + for payload in payloads { + let restored = self.spiller.restore(payload)?; + self.handle_meta(restored, 
need_check_spill)?; + } + return Ok(()); } AggregateMeta::NewBucketSpilled(payload) => { - self.handle_new_spilled(vec![payload])?; + let restored = self.spiller.restore(payload)?; + self.handle_meta(restored, need_check_spill)?; + return Ok(()); } AggregateMeta::Partitioned { bucket: _, data } => { for meta in data { self.handle_meta(meta, need_check_spill)?; } + return Ok(()); } _ => { unreachable!("unexpected aggregate meta, found type: {:?}", meta); From 85958186dc76bf21d9a09a997d4d81b36f8d8011 Mon Sep 17 00:00:00 2001 From: dqhl76 Date: Fri, 27 Mar 2026 13:20:21 +0800 Subject: [PATCH 4/6] test: add test --- tests/sqllogictests/suites/debug_local/fail.test | 0 tests/sqllogictests/suites/debug_local/success.test | 0 tests/sqllogictests/suites/tpch/aggregate_shuffle.test | 3 +++ 3 files changed, 3 insertions(+) delete mode 100644 tests/sqllogictests/suites/debug_local/fail.test delete mode 100644 tests/sqllogictests/suites/debug_local/success.test diff --git a/tests/sqllogictests/suites/debug_local/fail.test b/tests/sqllogictests/suites/debug_local/fail.test deleted file mode 100644 index e69de29bb2d1d..0000000000000 diff --git a/tests/sqllogictests/suites/debug_local/success.test b/tests/sqllogictests/suites/debug_local/success.test deleted file mode 100644 index e69de29bb2d1d..0000000000000 diff --git a/tests/sqllogictests/suites/tpch/aggregate_shuffle.test b/tests/sqllogictests/suites/tpch/aggregate_shuffle.test index 35329823c3fbd..940606f7157e9 100644 --- a/tests/sqllogictests/suites/tpch/aggregate_shuffle.test +++ b/tests/sqllogictests/suites/tpch/aggregate_shuffle.test @@ -30,5 +30,8 @@ include ./queries.test statement ok set force_aggregate_data_spill = 1; +statement ok +set max_aggregate_spill_level = 2; + # TPC-H TEST include ./queries.test \ No newline at end of file From 799bb740b97cb7ad53a76ab4034e637a3cdb03da Mon Sep 17 00:00:00 2001 From: dqhl76 Date: Wed, 25 Mar 2026 13:57:49 +0800 Subject: [PATCH 5/6] polish statistics.rs --- 
.../pipelines/processors/transforms/aggregator/statistics.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/statistics.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/statistics.rs index 52dbaea523e2e..90a183a06d87f 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/statistics.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/statistics.rs @@ -92,10 +92,10 @@ impl AggregationStatistics { match task { Some((task_id, processor_id, spill_depth, spilled)) => { log::info!( - "[{}] Task completed: task_id={}, processor={}, spill_depth={}, spilled={}, {} → {} rows in {:.2}s (real: {:.2}s), throughput: {} rows/sec, {}/sec, total: {}, hash index resizes: {}", + "{}[{}] Task completed: task_id={}, spill_depth={}, spilled={}, {} → {} rows in {:.2}s (real: {:.2}s), throughput: {} rows/sec, {}/sec, total: {}, hash index resizes: {}", self.stage, - task_id, processor_id, + task_id, spill_depth, spilled, self.processed_rows, From 9ce4bc517e01848b4257316f6584e0f71d614f64 Mon Sep 17 00:00:00 2001 From: dqhl76 Date: Fri, 27 Mar 2026 15:20:47 +0800 Subject: [PATCH 6/6] cargo fmt Revert "cargo fmt" This reverts commit c9cccc35fc072b62dbbdae35e990cf584dd5f5ff. 
--- .../aggregator/new_aggregate/new_transform_final_aggregate.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_final_aggregate.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_final_aggregate.rs index 5f2f529d0c8ce..1a4944a1b1165 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_final_aggregate.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_final_aggregate.rs @@ -264,7 +264,8 @@ impl NewTransformFinalAggregate { } } - if need_check_spill && self.settings.check_spill() { + // If spilling has already been triggered for this task, continue spilling the remaining part + if self.spilled_occurred || (need_check_spill && self.settings.check_spill()) { self.spill_out()?; }