diff --git a/src/query/expression/src/aggregate/aggregate_hashtable.rs b/src/query/expression/src/aggregate/aggregate_hashtable.rs index 35c65f6af30ea..0f8670ee90d8d 100644 --- a/src/query/expression/src/aggregate/aggregate_hashtable.rs +++ b/src/query/expression/src/aggregate/aggregate_hashtable.rs @@ -74,10 +74,11 @@ impl AggregateHashTable { Self { direct_append: false, current_radix_bits: config.initial_radix_bits, - payload: PartitionedPayload::new( + payload: PartitionedPayload::new_with_start_bit( group_types, aggrs, 1 << config.initial_radix_bits, + config.partition_start_bit, vec![arena], ), hash_index: HashIndex::new(&config, capacity), @@ -105,10 +106,11 @@ impl AggregateHashTable { Self { direct_append: !need_init_entry, current_radix_bits: config.initial_radix_bits, - payload: PartitionedPayload::new( + payload: PartitionedPayload::new_with_start_bit( group_types, aggrs, 1 << config.initial_radix_bits, + config.partition_start_bit, vec![arena], ), hash_index, diff --git a/src/query/expression/src/aggregate/mod.rs b/src/query/expression/src/aggregate/mod.rs index b6c54cb39f0cd..a3f715caba094 100644 --- a/src/query/expression/src/aggregate/mod.rs +++ b/src/query/expression/src/aggregate/mod.rs @@ -154,6 +154,7 @@ pub struct HashTableConfig { // Max radix bits across all threads, this is a hint to repartition pub current_max_radix_bits: Arc, pub initial_radix_bits: u64, + pub partition_start_bit: u64, pub max_radix_bits: u64, pub repartition_radix_bits_incr: u64, pub block_fill_factor: f64, @@ -167,6 +168,7 @@ impl Default for HashTableConfig { Self { current_max_radix_bits: Arc::new(AtomicU64::new(3)), initial_radix_bits: 3, + partition_start_bit: 0, max_radix_bits: MAX_RADIX_BITS, repartition_radix_bits_incr: 2, block_fill_factor: 1.8, @@ -211,6 +213,11 @@ impl HashTableConfig { self } + pub fn with_partition_start_bit(mut self, partition_start_bit: u64) -> Self { + self.partition_start_bit = partition_start_bit; + self + } + pub fn with_experiment_hash_index(mut self, enable: bool) -> Self { self.enable_experiment_hash_index = enable; self diff --git a/src/query/expression/src/aggregate/partitioned_payload.rs b/src/query/expression/src/aggregate/partitioned_payload.rs index da498edece3e5..10ceeeadd61d7 100644 --- a/src/query/expression/src/aggregate/partitioned_payload.rs +++ b/src/query/expression/src/aggregate/partitioned_payload.rs @@ -36,10 +36,15 @@ struct PartitionMask { impl PartitionMask { fn new(partition_count: u64) -> Self { + Self::with_start_bit(partition_count, 0) + } + + fn with_start_bit(partition_count: u64, start_bit: u64) -> Self { let radix_bits = partition_count.trailing_zeros() as u64; debug_assert_eq!(1 << radix_bits, partition_count); + debug_assert!(start_bit + radix_bits <= 48); - let shift = 48 - radix_bits; + let shift = 48 - start_bit - radix_bits; let mask = ((1 << radix_bits) - 1) << shift; Self { mask, shift } @@ -59,6 +64,7 @@ pub struct PartitionedPayload { pub arenas: Vec>, + partition_start_bit: u64, partition_mask: PartitionMask, } @@ -71,6 +77,16 @@ impl PartitionedPayload { aggrs: Vec, partition_count: u64, arenas: Vec>, + ) -> Self { + Self::new_with_start_bit(group_types, aggrs, partition_count, 0, arenas) + } + + pub fn new_with_start_bit( + group_types: Vec, + aggrs: Vec, + partition_count: u64, + partition_start_bit: u64, + arenas: Vec>, ) -> Self { let states_layout = if !aggrs.is_empty() { Some(get_states_layout(&aggrs).unwrap()) @@ -101,7 +117,8 @@ impl PartitionedPayload { row_layout, arenas, - partition_mask: PartitionMask::new(partition_count), + partition_start_bit, + partition_mask: PartitionMask::with_start_bit(partition_count, partition_start_bit), } } @@ -169,11 +186,17 @@ impl PartitionedPayload { group_types, aggrs, arenas, + partition_start_bit, .. } = self; - let mut new_partition_payload = - PartitionedPayload::new(group_types, aggrs, new_partition_count as u64, arenas); + let mut new_partition_payload = PartitionedPayload::new_with_start_bit( + group_types, + aggrs, + new_partition_count as u64, + partition_start_bit, + arenas, + ); state.clear(); for payload in payloads.into_iter() { @@ -184,7 +207,9 @@ impl PartitionedPayload { } pub fn combine(&mut self, other: PartitionedPayload, state: &mut PayloadFlushState) { - if other.partition_count() == self.partition_count() { + if other.partition_count() == self.partition_count() + && other.partition_start_bit == self.partition_start_bit + { for (l, r) in self.payloads.iter_mut().zip(other.payloads.into_iter()) { l.combine(r); } @@ -293,3 +318,19 @@ impl PartitionedPayload { self.payloads.iter().map(|x| x.memory_size()).sum() } } + +#[cfg(test)] +mod tests { + use super::PartitionMask; + + #[test] + fn test_partition_mask_with_start_bit() { + let top_bit_mask = PartitionMask::new(2); + assert_eq!(top_bit_mask.index(1_u64 << 47), 1); + assert_eq!(top_bit_mask.index(1_u64 << 44), 0); + + let shifted_mask = PartitionMask::with_start_bit(2, 3); + assert_eq!(shifted_mask.index(1_u64 << 47), 0); + assert_eq!(shifted_mask.index(1_u64 << 44), 1); + } +} diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/build_partition_bucket.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/build_partition_bucket.rs index 1b94a7cba986f..47fafc6ed3c1c 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/build_partition_bucket.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/build_partition_bucket.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::sync::Arc; +use std::sync::atomic::AtomicU64; use databend_common_catalog::table_context::TableContext; use databend_common_exception::Result; @@ -49,6 +50,7 @@ fn build_partition_bucket_experimental( shuffle_mode: AggregateShuffleMode, ) -> Result<()> { let mut final_parallelism = ctx.get_settings().get_max_threads()? as usize; + let base_consumed_bits = shuffle_mode.determine_radix_bits(); match shuffle_mode { AggregateShuffleMode::Row => { let schema = params.spill_schema(); @@ -107,6 +109,7 @@ fn build_partition_bucket_experimental( let mut builder = TransformPipeBuilder::create(); let (tx, rx) = async_channel::unbounded(); + let next_task_id = Arc::new(AtomicU64::new(1)); for id in 0..final_parallelism { let input_port = InputPort::create(); let output_port = OutputPort::create(); @@ -115,9 +118,11 @@ fn build_partition_bucket_experimental( output_port.clone(), params.clone(), id, + base_consumed_bits, ctx.clone(), tx.clone(), rx.clone(), + next_task_id.clone(), )?; builder.add_transform(input_port, output_port, ProcessorPtr::create(processor)); } diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_final_aggregate.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_final_aggregate.rs index c6b7031882b07..1a4944a1b1165 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_final_aggregate.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_final_aggregate.rs @@ -15,6 +15,8 @@ use std::any::Any; use std::mem; use std::sync::Arc; +use std::sync::atomic::AtomicU64; +use std::sync::atomic::Ordering; use async_channel::Receiver; use async_channel::Sender; @@ -46,6 +48,7 @@ use crate::pipelines::processors::transforms::aggregator::transform_aggregate_pa use crate::sessions::QueryContext; const SPILL_BUCKET_NUM: usize = 2; +const SPILL_BUCKET_BITS: u64 = SPILL_BUCKET_NUM.trailing_zeros() as u64; enum Stage { Input, @@ -53,6 +56,7 @@ enum Stage { } pub struct FinalAggregateTask { + task_id: u64, spilled_depth: usize, spilled_payload: Vec, tx: Sender, @@ -74,9 +78,13 @@ pub struct NewTransformFinalAggregate { flush_state: PayloadFlushState, statistics: AggregationStatistics, _id: usize, + base_consumed_bits: u64, + max_partition_depth: usize, + current_partition_depth: usize, spiller: NewAggregateSpiller, settings: MemorySettings, max_aggregate_spill_level: usize, + next_task_id: Arc, } impl NewTransformFinalAggregate { @@ -85,21 +93,20 @@ impl NewTransformFinalAggregate { output: Arc, params: Arc, _id: usize, + base_consumed_bits: u64, ctx: Arc, tx: Sender, rx: Receiver, + next_task_id: Arc, ) -> Result> { let settings = ctx.get_settings(); - let max_aggregate_spill_level = settings.get_max_aggregate_spill_level()?; + let available_partition_depths = + (48_u64.saturating_sub(base_consumed_bits) / SPILL_BUCKET_BITS) as usize; + let max_partition_depth = available_partition_depths.saturating_sub(1); + let max_aggregate_spill_level = + (settings.get_max_aggregate_spill_level()? as usize).min(max_partition_depth); - let hashtable = AggregateHashTable::new( - params.group_data_types.clone(), - params.aggregate_functions.clone(), - HashTableConfig::default() - .with_initial_radix_bits(SPILL_BUCKET_NUM.trailing_zeros() as u64) - .with_experiment_hash_index(params.enable_experiment_hash_index), - Arc::new(Bump::new()), - ); + let hashtable = Self::create_hashtable(¶ms, base_consumed_bits, 0); let flush_state = PayloadFlushState::default(); let spiller = NewAggregateSpiller::try_create( @@ -128,14 +135,67 @@ impl NewTransformFinalAggregate { flush_state, statistics: AggregationStatistics::new("NewFinalAggregate"), _id, + base_consumed_bits, + max_partition_depth, + current_partition_depth: 0, spiller, settings: MemorySettings::from_aggregate_settings(&ctx)?, - max_aggregate_spill_level: max_aggregate_spill_level as usize, + max_aggregate_spill_level, + next_task_id, })) } } impl NewTransformFinalAggregate { + fn next_task_id(&self) -> u64 { + self.next_task_id.fetch_add(1, Ordering::Relaxed) + } + + fn partition_start_bit(base_consumed_bits: u64, spilled_depth: usize) -> u64 { + base_consumed_bits + spilled_depth as u64 * SPILL_BUCKET_BITS + } + + fn create_hashtable( + params: &Arc, + base_consumed_bits: u64, + spilled_depth: usize, + ) -> AggregateHashTable { + AggregateHashTable::new( + params.group_data_types.clone(), + params.aggregate_functions.clone(), + HashTableConfig::default() + .with_initial_radix_bits(SPILL_BUCKET_BITS) + .with_partition_start_bit(Self::partition_start_bit( + base_consumed_bits, + spilled_depth, + )) + .with_experiment_hash_index(params.enable_experiment_hash_index), + Arc::new(Bump::new()), + ) + } + + fn reset_hashtable(&mut self, spilled_depth: usize) { + let partition_depth = spilled_depth.min(self.max_partition_depth); + self.hashtable = HashTable::AggregateHashTable(Self::create_hashtable( + &self.params, + self.base_consumed_bits, + partition_depth, + )); + self.current_partition_depth = partition_depth; + } + + // One final-aggregate processor handles both the original input stream and + // recursively spilled tasks received from the channel. Those tasks may + // belong to different spill depths, and each depth must consume a different + // hash-bit window when building the internal partitions. The window start is + // `base_consumed_bits + spilled_depth * SPILL_BUCKET_BITS`. + fn ensure_spill_depth(&mut self, spilled_depth: usize) { + let partition_depth = spilled_depth.min(self.max_partition_depth); + if self.current_partition_depth != partition_depth { + self.reset_hashtable(spilled_depth); + } + } + fn handle_serialized(&mut self, payload: SerializedPayload) -> Result<()> { if payload.data_block.is_empty() { return Ok(()); @@ -173,18 +233,6 @@ impl NewTransformFinalAggregate { Ok(()) } - fn handle_new_spilled(&mut self, payloads: Vec) -> Result<()> { - for payload in payloads { - let restored = self.spiller.restore(payload)?; - let AggregateMeta::Serialized(restored) = restored else { - unreachable!("unexpected aggregate meta, found type: {:?}", restored) - }; - self.handle_serialized(restored)?; - } - - Ok(()) - } - fn handle_meta(&mut self, meta: AggregateMeta, need_check_spill: bool) -> Result<()> { match meta { AggregateMeta::Serialized(payload) => { @@ -194,22 +242,30 @@ impl NewTransformFinalAggregate { self.handle_aggregate_payload(payload)?; } AggregateMeta::NewSpilled(payloads) => { - self.handle_new_spilled(payloads)?; + for payload in payloads { + let restored = self.spiller.restore(payload)?; + self.handle_meta(restored, need_check_spill)?; + } + return Ok(()); } AggregateMeta::NewBucketSpilled(payload) => { - self.handle_new_spilled(vec![payload])?; + let restored = self.spiller.restore(payload)?; + self.handle_meta(restored, need_check_spill)?; + return Ok(()); } AggregateMeta::Partitioned { bucket: _, data } => { for meta in data { self.handle_meta(meta, need_check_spill)?; } + return Ok(()); } _ => { unreachable!("unexpected aggregate meta, found type: {:?}", meta); } } - if need_check_spill && self.settings.check_spill() { + // If already trigger spilled for this task, we continue to spill the remaining part + if self.spilled_occurred || (need_check_spill && self.settings.check_spill()) { self.spill_out()?; } @@ -219,10 +275,6 @@ impl NewTransformFinalAggregate { fn spill_out(&mut self) -> Result<()> { self.spilled_occurred = true; if let HashTable::AggregateHashTable(v) = mem::take(&mut self.hashtable) { - let group_types = v.payload.group_types.clone(); - let aggrs = v.payload.aggrs.clone(); - let config = v.config.clone(); - for (bucket, payload) in v.payload.payloads.into_iter().enumerate() { if payload.len() == 0 { continue; @@ -231,34 +283,45 @@ impl NewTransformFinalAggregate { let data_block = payload.aggregate_flush_all()?.consume_convert_to_full(); self.spiller.spill(bucket, data_block)?; } - - let arena = Arc::new(Bump::new()); - self.hashtable = HashTable::AggregateHashTable(AggregateHashTable::new( - group_types, - aggrs, - config, - arena, - )); } else { unreachable!("[TRANSFORM-AGGREGATOR] Invalid hash table state during spill check") } + self.reset_hashtable(self.current_partition_depth); Ok(()) } - fn finish(&mut self, spilled_depth: usize, tx: Sender) -> Result<()> { + fn finish( + &mut self, + task_id: Option, + spilled_depth: usize, + tx: Sender, + ) -> Result<()> { if self.spilled_occurred { + let (output_rows, hash_index_resizes) = match &self.hashtable { + HashTable::AggregateHashTable(ht) => { + (ht.payload.len(), ht.hash_index_resize_count()) + } + _ => unreachable!("[TRANSFORM-AGGREGATOR] Invalid hash table state before spill"), + }; self.spill_finish(spilled_depth, tx)?; + if let Some(task_id) = task_id { + self.statistics.log_task_finish_statistics( + task_id, + self._id, + spilled_depth, + output_rows, + hash_index_resizes, + true, + ); + } else { + self.statistics.reset(); + } self.spilled_occurred = false; return Ok(()); } if let HashTable::AggregateHashTable(mut ht) = mem::take(&mut self.hashtable) { - let group_types = ht.payload.group_types.clone(); - let aggrs = ht.payload.aggrs.clone(); - let config = ht.config.clone(); - - self.statistics.log_finish_statistics(&ht); let mut blocks = vec![]; self.flush_state.clear(); @@ -280,12 +343,19 @@ impl NewTransformFinalAggregate { self.output.push_data(Ok(concat)); } } - self.hashtable = HashTable::AggregateHashTable(AggregateHashTable::new( - group_types, - aggrs, - config, - Arc::new(Bump::new()), - )); + if let Some(task_id) = task_id { + self.statistics.log_task_finish_statistics( + task_id, + self._id, + spilled_depth, + ht.payload.len(), + ht.hash_index_resize_count(), + false, + ); + } else { + self.statistics.log_finish_statistics(&ht); + } + self.reset_hashtable(self.current_partition_depth); } Ok(()) @@ -300,9 +370,25 @@ impl NewTransformFinalAggregate { chunks[payload.bucket as usize].push(payload); } - for chunk in chunks.into_iter() { + let next_spill_depth = spilled_depth + 1; + for (bucket, chunk) in chunks.into_iter().enumerate() { + let task_id = self.next_task_id(); + let rows = chunk + .iter() + .map(|payload| payload.row_group.num_rows() as usize) + .sum::(); + log::info!( + "Spill finish emitted task: task_id={}, processor={}, spill_depth={}, bucket={}, payloads={}, rows={}", + task_id, + self._id, + spilled_depth, + bucket, + chunk.len(), + rows, + ); let spilled = FinalAggregateTask { - spilled_depth: spilled_depth + 1, + task_id, + spilled_depth: next_spill_depth, spilled_payload: chunk, tx: tx.clone(), }; @@ -381,22 +467,26 @@ impl Processor for NewTransformFinalAggregate { fn process(&mut self) -> Result<()> { let input_data = self.input_data.take(); if let Some(meta) = input_data { - self.handle_meta(meta, true)?; + self.ensure_spill_depth(0); + self.handle_meta(meta, self.max_aggregate_spill_level > 0)?; return Ok(()); } else if let Some(mut task) = self.channel_data.take() { + self.ensure_spill_depth(task.spilled_depth); + self.statistics.reset(); let meta = AggregateMeta::NewSpilled(mem::take(&mut task.spilled_payload)); if task.spilled_depth >= self.max_aggregate_spill_level { self.handle_meta(meta, false)?; } else { self.handle_meta(meta, true)?; } - self.finish(task.spilled_depth, task.tx)?; + self.finish(Some(task.task_id), task.spilled_depth, task.tx)?; return Ok(()); } else { + self.ensure_spill_depth(0); let sender = mem::take(&mut self.tx) .expect("logic error: called finished for input data more than once"); - self.finish(0, sender)?; + self.finish(None, 0, sender)?; } Ok(()) diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/statistics.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/statistics.rs index ebf538a9c1f19..90a183a06d87f 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/statistics.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/statistics.rs @@ -45,7 +45,43 @@ impl AggregationStatistics { } } + pub fn reset(&mut self) { + self.processed_rows = 0; + self.processed_bytes = 0; + self.first_block_start = None; + self.start = Instant::now(); + } + pub fn log_finish_statistics(&mut self, hashtable: &AggregateHashTable) { + self.log_finish( + hashtable.payload.len(), + hashtable.hash_index_resize_count(), + None, + ); + } + + pub fn log_task_finish_statistics( + &mut self, + task_id: u64, + processor_id: usize, + spill_depth: usize, + output_rows: usize, + hash_index_resizes: usize, + spilled: bool, + ) { + self.log_finish( + output_rows, + hash_index_resizes, + Some((task_id, processor_id, spill_depth, spilled)), + ); + } + + fn log_finish( + &mut self, + output_rows: usize, + hash_index_resizes: usize, + task: Option<(u64, usize, usize, bool)>, + ) { let elapsed = self.start.elapsed().as_secs_f64(); let real_elapsed = self .first_block_start @@ -53,22 +89,41 @@ impl AggregationStatistics { .map(|t| t.elapsed().as_secs_f64()) .unwrap_or(elapsed); - log::info!( - "[TRANSFORM-AGGREGATOR][{}] Aggregation completed: {} → {} rows in {:.2}s (real: {:.2}s), throughput: {} rows/sec, {}/sec, total: {}, hash index resizes: {}", - self.stage, - self.processed_rows, - hashtable.payload.len(), - elapsed, - real_elapsed, - convert_number_size(self.processed_rows as f64 / elapsed), - convert_byte_size(self.processed_bytes as f64 / elapsed), - convert_byte_size(self.processed_bytes as f64), - hashtable.hash_index_resize_count(), - ); + match task { + Some((task_id, processor_id, spill_depth, spilled)) => { + log::info!( + "{}[{}] Task completed: task_id={}, spill_depth={}, spilled={}, {} → {} rows in {:.2}s (real: {:.2}s), throughput: {} rows/sec, {}/sec, total: {}, hash index resizes: {}", + self.stage, + processor_id, + task_id, + spill_depth, + spilled, + self.processed_rows, + output_rows, + elapsed, + real_elapsed, + convert_number_size(self.processed_rows as f64 / elapsed), + convert_byte_size(self.processed_bytes as f64 / elapsed), + convert_byte_size(self.processed_bytes as f64), + hash_index_resizes, + ); + } + None => { + log::info!( + "[{}] Aggregation completed: {} → {} rows in {:.2}s (real: {:.2}s), throughput: {} rows/sec, {}/sec, total: {}, hash index resizes: {}", + self.stage, + self.processed_rows, + output_rows, + elapsed, + real_elapsed, + convert_number_size(self.processed_rows as f64 / elapsed), + convert_byte_size(self.processed_bytes as f64 / elapsed), + convert_byte_size(self.processed_bytes as f64), + hash_index_resizes, + ); + } + } - self.processed_rows = 0; - self.processed_bytes = 0; - self.first_block_start = None; - self.start = Instant::now(); + self.reset(); } } diff --git a/tests/sqllogictests/suites/debug_local/fail.test b/tests/sqllogictests/suites/debug_local/fail.test deleted file mode 100644 index e69de29bb2d1d..0000000000000 diff --git a/tests/sqllogictests/suites/debug_local/success.test b/tests/sqllogictests/suites/debug_local/success.test deleted file mode 100644 index e69de29bb2d1d..0000000000000 diff --git a/tests/sqllogictests/suites/tpch/aggregate_shuffle.test b/tests/sqllogictests/suites/tpch/aggregate_shuffle.test index 35329823c3fbd..940606f7157e9 100644 --- a/tests/sqllogictests/suites/tpch/aggregate_shuffle.test +++ b/tests/sqllogictests/suites/tpch/aggregate_shuffle.test @@ -30,5 +30,8 @@ include ./queries.test statement ok set force_aggregate_data_spill = 1; +statement ok +set max_aggregate_spill_level = 2; + # TPC-H TEST include ./queries.test \ No newline at end of file