diff --git a/src/query/expression/src/kernels/stream_partition.rs b/src/query/expression/src/kernels/stream_partition.rs index 7b9af52186240..77c574d4f5020 100644 --- a/src/query/expression/src/kernels/stream_partition.rs +++ b/src/query/expression/src/kernels/stream_partition.rs @@ -46,6 +46,77 @@ use crate::with_number_mapped_type; struct PartitionBlockBuilder { num_rows: usize, columns_builder: Vec, + // Fully built chunks sealed during append, so callers can consume them + // without building a large block first and slicing it afterward. + ready_blocks: Vec, +} + +impl PartitionBlockBuilder { + fn create(block: &DataBlock) -> Self { + let mut columns_builder = Vec::with_capacity(block.num_columns()); + + for column in block.columns() { + let data_type = column.data_type(); + columns_builder.push(ColumnBuilder::with_capacity(&data_type, 0)); + } + + Self { + num_rows: 0, + columns_builder, + ready_blocks: Vec::new(), + } + } + + fn memory_size(&self) -> usize { + self.columns_builder.iter().map(|x| x.memory_size()).sum() + } + + fn has_pending_data(&self) -> bool { + self.num_rows != 0 || !self.ready_blocks.is_empty() + } + + fn take_active_block(&mut self, reserve_by_len: bool) -> Option { + if self.num_rows == 0 { + return None; + } + + let mut columns = Vec::with_capacity(self.columns_builder.len()); + let columns_builder = std::mem::take(&mut self.columns_builder); + self.columns_builder.reserve(columns_builder.len()); + + for column_builder in columns_builder { + let data_type = column_builder.data_type(); + let capacity = if reserve_by_len { + column_builder.len() + } else { + 0 + }; + let new_builder = ColumnBuilder::with_capacity(&data_type, capacity); + self.columns_builder.push(new_builder); + columns.push(BlockEntry::from(column_builder.build())); + } + + let num_rows = std::mem::take(&mut self.num_rows); + Some(DataBlock::new(columns, num_rows)) + } + + fn seal_active_block(&mut self) { + if let Some(block) = self.take_active_block(true) { + self.ready_blocks.push(block); + } + } + + fn take_ready_blocks(&mut self) -> Vec { + std::mem::take(&mut self.ready_blocks) + } + + fn take_all_blocks(&mut self) -> Vec { + let mut blocks = self.take_ready_blocks(); + if let Some(block) = self.take_active_block(false) { + blocks.push(block); + } + blocks + } } pub struct BlockPartitionStream { @@ -94,21 +165,12 @@ impl BlockPartitionStream { self.partitions.reserve(self.scatter_size); for _ in 0..self.scatter_size { - let mut columns_builder = Vec::with_capacity(block.num_columns()); - - for column in block.columns() { - let data_type = column.data_type(); - columns_builder.push(ColumnBuilder::with_capacity(&data_type, 0)); - } - - let block_builder = PartitionBlockBuilder { - num_rows: 0, - columns_builder, - }; - self.partitions.push(block_builder); + self.partitions.push(PartitionBlockBuilder::create(&block)); } } + let avg_row_bytes = self.estimate_avg_row_bytes(&block); + let columns = block .take_columns() .into_iter() @@ -119,21 +181,11 @@ impl BlockPartitionStream { DataBlock::divide_indices_by_scatter_size(&indices, self.scatter_size); for (partition_id, indices) in scatter_indices.iter().enumerate() { - self.partitions[partition_id].num_rows += indices.len(); - } - - for (column_idx, column) in columns.into_iter().enumerate() { - for (partition_id, indices) in scatter_indices.iter().enumerate() { - if indices.is_empty() { - continue; - } - - let partition = &mut self.partitions[partition_id]; - let column_builder = &mut partition.columns_builder[column_idx]; - copy_column(indices, &column, column_builder); + if indices.is_empty() { + continue; } - drop(column); + self.append_partition_rows(partition_id, indices, &columns, avg_row_bytes); } if !out_ready { @@ -142,29 +194,14 @@ impl BlockPartitionStream { let mut ready_blocks = Vec::with_capacity(self.partitions.len()); for (id, partition) in self.partitions.iter_mut().enumerate() { - let memory_size = partition - .columns_builder - .iter() - .map(|x| x.memory_size()) - .sum::(); - - let rows = partition.num_rows; - - if memory_size >= self.bytes_threshold || rows >= self.rows_threshold { - let mut columns = Vec::with_capacity(partition.columns_builder.len()); - let columns_builder = std::mem::take(&mut partition.columns_builder); - partition.columns_builder.reserve(columns_builder.len()); - - for column_builder in columns_builder { - let historical_size = column_builder.len(); - let data_type = column_builder.data_type(); - let new_builder = ColumnBuilder::with_capacity(&data_type, historical_size); - partition.columns_builder.push(new_builder); - columns.push(BlockEntry::from(column_builder.build())); - } + // Once this partition has produced any ready chunk, flush the + // current active tail together in this out_ready round. + if !partition.ready_blocks.is_empty() { + partition.seal_active_block(); + } - partition.num_rows = 0; - ready_blocks.push((id, DataBlock::new(columns, rows))); + for block in partition.take_ready_blocks() { + ready_blocks.push((id, block)); } } @@ -179,7 +216,7 @@ impl BlockPartitionStream { } for (partition_id, data) in self.partitions.iter().enumerate() { - if data.num_rows != 0 { + if data.has_pending_data() { partition_ids.push(partition_id); } } @@ -200,55 +237,118 @@ impl BlockPartitionStream { continue; } - let mut columns = Vec::with_capacity(partition.columns_builder.len()); - let columns_builder = std::mem::take(&mut partition.columns_builder); - partition.columns_builder.reserve(columns_builder.len()); - - for column_builder in columns_builder { - let historical_size = column_builder.len(); - let data_type = column_builder.data_type(); - let new_builder = ColumnBuilder::with_capacity(&data_type, historical_size); - partition.columns_builder.push(new_builder); - columns.push(BlockEntry::from(column_builder.build())); + for block in partition.take_all_blocks() { + take_blocks.push((id, block)); } - - let num_rows = partition.num_rows; - partition.num_rows = 0; - take_blocks.push((id, DataBlock::new(columns, num_rows))); } take_blocks } - pub fn finalize_partition(&mut self, partition_id: usize) -> Option { + pub fn finalize_partition(&mut self, partition_id: usize) -> Vec { if !self.initialize { - return None; + return vec![]; } let partition = &mut self.partitions[partition_id]; - let num_rows = partition.num_rows; - - if num_rows == 0 { - return None; + if partition.num_rows == 0 { + return partition.take_ready_blocks(); } - let mut columns = Vec::with_capacity(partition.columns_builder.len()); - let columns_builder = std::mem::take(&mut partition.columns_builder); - partition.columns_builder.reserve(columns_builder.len()); + partition.take_all_blocks() + } - for column_builder in columns_builder { - let data_type = column_builder.data_type(); - let new_builder = ColumnBuilder::with_capacity(&data_type, 0); - partition.columns_builder.push(new_builder); - columns.push(BlockEntry::from(column_builder.build())); + fn append_partition_rows( + &mut self, + partition_id: usize, + indices: &[u32], + columns: &[Column], + avg_row_bytes: usize, + ) { + let mut offset = 0; + let rows_threshold = self.rows_threshold; + let bytes_threshold = self.bytes_threshold; + + while offset < indices.len() { + let chunk_limit = { + let partition = &mut self.partitions[partition_id]; + let limit = chunk_limit(partition, avg_row_bytes, rows_threshold, bytes_threshold); + if limit == 0 && partition.num_rows != 0 { + partition.seal_active_block(); + continue; + } + + limit.max(1) + }; + + let chunk_len = (indices.len() - offset).min(chunk_limit); + let chunk = &indices[offset..offset + chunk_len]; + + { + let partition = &mut self.partitions[partition_id]; + partition.num_rows += chunk_len; + + for (column_idx, column) in columns.iter().enumerate() { + let column_builder = &mut partition.columns_builder[column_idx]; + copy_column(chunk, column, column_builder); + } + + if should_seal_active_block( + partition, + chunk_len, + chunk_limit, + rows_threshold, + bytes_threshold, + ) { + partition.seal_active_block(); + } + } + + offset += chunk_len; } + } - partition.num_rows = 0; - Some(DataBlock::new(columns, num_rows)) + fn estimate_avg_row_bytes(&self, block: &DataBlock) -> usize { + block.memory_size().div_ceil(block.num_rows()).max(1) } } +fn chunk_limit( + partition: &PartitionBlockBuilder, + avg_row_bytes: usize, + rows_threshold: usize, + bytes_threshold: usize, +) -> usize { + let rows_limit = rows_threshold.saturating_sub(partition.num_rows); + + // How many more rows the current active chunk can still accept under the + // bytes rule, based on the input block's avg_row_bytes estimate. + let bytes_limit = if bytes_threshold == usize::MAX { + usize::MAX + } else { + let memory_size = partition.memory_size(); + bytes_threshold.saturating_sub(memory_size) / avg_row_bytes + }; + + rows_limit.min(bytes_limit) +} + +fn should_seal_active_block( + partition: &PartitionBlockBuilder, + chunk_len: usize, + chunk_limit: usize, + rows_threshold: usize, + bytes_threshold: usize, +) -> bool { + let threshold_reached = + partition.num_rows >= rows_threshold || partition.memory_size() >= bytes_threshold; + + let chunk_budget_exhausted = chunk_limit != usize::MAX && chunk_len == chunk_limit; + + threshold_reached || chunk_budget_exhausted +} + pub fn copy_column(indices: &[I], from: &Column, to: &mut ColumnBuilder) { match to { ColumnBuilder::EmptyArray { len } => match from { @@ -532,8 +632,205 @@ fn copy_array( from: &ArrayColumn, indices: &[I], ) { - // TODO: for index in indices { unsafe { to.push(from.index_unchecked(index.to_usize())) } } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::FromData; + use crate::Scalar; + use crate::types::DataType; + use crate::types::Int32Type; + + fn make_block(values: Vec) -> DataBlock { + DataBlock::new_from_columns(vec![Int32Type::from_data(values)]) + } + + use crate::types::NumberColumn; + + fn collect_column_values(block: &DataBlock) -> Vec { + let col = block.columns()[0].to_column(); + match col.as_number().unwrap() { + NumberColumn::Int32(buf) => buf.to_vec(), + _ => panic!("expected Int32 column"), + } + } + + #[test] + fn test_partition_no_split_under_threshold() { + let mut stream = BlockPartitionStream::create(100, 0, 2); + // All indices go to partition 0 + let indices = vec![0u64; 50]; + let block = make_block((0..50).collect()); + let result = stream.partition(indices, block, true); + // 50 rows < 100 threshold, nothing emitted + assert!(result.is_empty()); + } + + #[test] + fn test_partition_emit_at_threshold() { + let mut stream = BlockPartitionStream::create(10, 0, 1); + let indices = vec![0u64; 10]; + let block = make_block((0..10).collect()); + let result = stream.partition(indices, block, true); + assert_eq!(result.len(), 1); + assert_eq!(result[0].0, 0); + assert_eq!(result[0].1.num_rows(), 10); + } + + #[test] + fn test_partition_splits_large_block() { + let mut stream = BlockPartitionStream::create(10, 0, 1); + // Push 25 rows into partition 0 + let indices = vec![0u64; 25]; + let block = make_block((0..25).collect()); + let result = stream.partition(indices, block, true); + // Should be split into blocks of 10, 10, 5 + assert_eq!(result.len(), 3); + assert_eq!(result[0].1.num_rows(), 10); + assert_eq!(result[1].1.num_rows(), 10); + assert_eq!(result[2].1.num_rows(), 5); + // All should have partition_id 0 + assert!(result.iter().all(|(id, _)| *id == 0)); + // Verify data integrity + let all_values: Vec = result + .iter() + .flat_map(|(_, b)| collect_column_values(b)) + .collect(); + assert_eq!(all_values, (0..25).collect::>()); + + let indices = vec![0u64; 20]; + let block = make_block((0..20).collect()); + let result = stream.partition(indices, block, true); + // Should be split into blocks of 10, 10 + assert_eq!(result.len(), 2); + assert_eq!(result[0].1.num_rows(), 10); + assert_eq!(result[1].1.num_rows(), 10); + } + + #[test] + fn test_partition_multiple_partitions_split() { + let mut stream = BlockPartitionStream::create(5, 0, 2); + // 8 rows to partition 0, 7 rows to partition 1 + let mut indices = vec![0u64; 8]; + indices.extend(vec![1u64; 7]); + let block = make_block((0..15).collect()); + let result = stream.partition(indices, block, true); + let p0: Vec<_> = result.iter().filter(|(id, _)| *id == 0).collect(); + let p1: Vec<_> = result.iter().filter(|(id, _)| *id == 1).collect(); + // partition 0: 8 rows -> split into 5 + 3 + assert_eq!(p0.len(), 2); + assert_eq!(p0[0].1.num_rows(), 5); + assert_eq!(p0[1].1.num_rows(), 3); + // partition 1: 7 rows -> split into 5 + 2 + assert_eq!(p1.len(), 2); + assert_eq!(p1[0].1.num_rows(), 5); + assert_eq!(p1[1].1.num_rows(), 2); + } + + #[test] + fn test_partition_prefers_bytes_threshold() { + let mut stream = BlockPartitionStream::create(10, 17, 1); + let indices = vec![0u64; 9]; + let block = make_block((0..9).collect()); + + let result = stream.partition(indices, block, true); + + assert_eq!(result.len(), 3); + assert_eq!(result[0].1.num_rows(), 4); + assert_eq!(result[1].1.num_rows(), 4); + assert_eq!(result[2].1.num_rows(), 1); + + let all_values: Vec = result + .iter() + .flat_map(|(_, b)| collect_column_values(b)) + .collect(); + assert_eq!(all_values, (0..9).collect::>()); + } + + #[test] + fn test_finalize_partition_splits() { + let mut stream = BlockPartitionStream::create(10, 0, 1); + // Push 25 rows but don't emit (out_ready=false) + let indices = vec![0u64; 25]; + let block = make_block((0..25).collect()); + let result = stream.partition(indices, block, false); + assert!(result.is_empty()); + assert_eq!(stream.partition_ids(), vec![0]); + // Finalize should split + let blocks = stream.finalize_partition(0); + assert_eq!(blocks.len(), 3); + assert_eq!(blocks[0].num_rows(), 10); + assert_eq!(blocks[1].num_rows(), 10); + assert_eq!(blocks[2].num_rows(), 5); + } + + #[test] + fn test_finalize_empty_partition() { + let mut stream = BlockPartitionStream::create(10, 0, 2); + // Initialize by pushing some data to partition 0 + let indices = vec![0u64; 5]; + let block = make_block(vec![1, 2, 3, 4, 5]); + stream.partition(indices, block, false); + // Partition 1 has no data + let blocks = stream.finalize_partition(1); + assert!(blocks.is_empty()); + } + + #[test] + fn test_take_partitions_splits() { + let mut stream = BlockPartitionStream::create(5, 0, 3); + // Push 12 rows to partition 0, 8 to partition 1, 3 to partition 2 + let mut indices = vec![0u64; 12]; + indices.extend(vec![1u64; 8]); + indices.extend(vec![2u64; 3]); + let block = make_block((0..23).collect()); + stream.partition(indices, block, false); + + // Take all except partition 2 + let excluded: HashSet = [2].into_iter().collect(); + let result = stream.take_partitions(&excluded); + + let p0: Vec<_> = result.iter().filter(|(id, _)| *id == 0).collect(); + let p1: Vec<_> = result.iter().filter(|(id, _)| *id == 1).collect(); + let p2: Vec<_> = result.iter().filter(|(id, _)| *id == 2).collect(); + // partition 0: 12 rows -> 5 + 5 + 2 + assert_eq!(p0.len(), 3); + assert_eq!(p0[0].1.num_rows(), 5); + assert_eq!(p0[1].1.num_rows(), 5); + assert_eq!(p0[2].1.num_rows(), 2); + // partition 1: 8 rows -> 5 + 3 + assert_eq!(p1.len(), 2); + assert_eq!(p1[0].1.num_rows(), 5); + assert_eq!(p1[1].1.num_rows(), 3); + // partition 2 excluded + assert!(p2.is_empty()); + } + + #[test] + fn test_split_by_bytes_when_no_row_threshold() { + // rows_threshold=0 means usize::MAX, so splitting is driven only by + // bytes_threshold. Int32 avg row bytes is 4, thus bytes_threshold=9 + // should split into chunks of 2, 2, 2, 1. + let mut stream = BlockPartitionStream::create(0, 9, 1); + let indices = vec![0u64; 7]; + let block = make_block((0..7).collect()); + + let result = stream.partition(indices, block, false); + assert!(result.is_empty()); + assert_eq!(stream.partition_ids(), vec![0]); + + let blocks = stream.finalize_partition(0); + assert_eq!(blocks.len(), 4); + assert_eq!(blocks[0].num_rows(), 2); + assert_eq!(blocks[1].num_rows(), 2); + assert_eq!(blocks[2].num_rows(), 2); + assert_eq!(blocks[3].num_rows(), 1); + + let all_values: Vec = blocks.iter().flat_map(collect_column_values).collect(); + assert_eq!(all_values, (0..7).collect::>()); + } +} diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_aggregate_spiller.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_aggregate_spiller.rs index 38ccdb437a465..bdc9ca039c0c1 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_aggregate_spiller.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_aggregate_spiller.rs @@ -239,7 +239,7 @@ impl PartitionStream for LocalPartitionStream { let ids = self.partition_stream.partition_ids(); let mut pending_blocks = Vec::with_capacity(ids.len()); for id in ids { - if let Some(block) = self.partition_stream.finalize_partition(id) { + for block in self.partition_stream.finalize_partition(id) { pending_blocks.push((id, block)); } } @@ -291,7 +291,7 @@ impl SharedPartitionStream { let mut pending_blocks = Vec::with_capacity(ids.len()); for id in ids { - if let Some(block) = inner.partition_stream.finalize_partition(id) { + for block in inner.partition_stream.finalize_partition(id) { pending_blocks.push((id, block)); } } diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_spiller.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_spiller.rs index b37ab57a9c389..8010d2792729a 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_spiller.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_spiller.rs @@ -291,8 +291,8 @@ impl HashJoinSpiller { if let Some(buffer_blocks) = self.restore_cross_buffer(partition_id)? { data_blocks.extend(buffer_blocks); } - } else if let Some(buffer_block) = self.restore_buffer(partition_id) { - data_blocks.push(buffer_block); + } else { + data_blocks.extend(self.restore_buffer(partition_id)); } } @@ -324,7 +324,7 @@ impl HashJoinSpiller { Ok(data_blocks) } - fn restore_buffer(&mut self, partition_id: usize) -> Option { + fn restore_buffer(&mut self, partition_id: usize) -> Vec { self.block_partition_stream.finalize_partition(partition_id) } diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/grace/grace_join.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/grace/grace_join.rs index e60310020408d..4b81076417a01 100644 --- a/src/query/service/src/pipelines/processors/transforms/new_hash_join/grace/grace_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/grace/grace_join.rs @@ -334,7 +334,7 @@ impl GraceHashJoin { let ready_partitions_id = self.build_partition_stream.partition_ids(); let mut ready_partitions = Vec::with_capacity(ready_partitions_id.len()); for id in ready_partitions_id { - if let Some(data) = self.build_partition_stream.finalize_partition(id) { + for data in self.build_partition_stream.finalize_partition(id) { ready_partitions.push((id, data)); } } @@ -345,7 +345,7 @@ impl GraceHashJoin { let ready_partitions_id = self.probe_partition_stream.partition_ids(); for id in ready_partitions_id { - if let Some(data_block) = self.probe_partition_stream.finalize_partition(id) { + for data_block in self.probe_partition_stream.finalize_partition(id) { self.partitions[id].writer.write(data_block)?; self.partitions[id].writer.flush()?; } diff --git a/src/query/service/src/servers/flight/v1/exchange/hash_send_sink.rs b/src/query/service/src/servers/flight/v1/exchange/hash_send_sink.rs index 2881e8c9afb76..41007a8787229 100644 --- a/src/query/service/src/servers/flight/v1/exchange/hash_send_sink.rs +++ b/src/query/service/src/servers/flight/v1/exchange/hash_send_sink.rs @@ -129,7 +129,7 @@ impl Processor for HashSendSink { let mut futures = Vec::new(); for partition_id in 0..self.channels.len() { - if let Some(block) = self.partition_stream.finalize_partition(partition_id) { + for block in self.partition_stream.finalize_partition(partition_id) { if block.is_empty() { continue; } diff --git a/src/query/service/src/servers/flight/v1/exchange/hash_send_transform.rs b/src/query/service/src/servers/flight/v1/exchange/hash_send_transform.rs index 5cf4a674eb972..b5d360f529abf 100644 --- a/src/query/service/src/servers/flight/v1/exchange/hash_send_transform.rs +++ b/src/query/service/src/servers/flight/v1/exchange/hash_send_transform.rs @@ -162,7 +162,7 @@ impl Processor for HashSendTransform { let mut futures = Vec::new(); for partition_id in 0..self.channels.len() { - if let Some(block) = self.partition_stream.finalize_partition(partition_id) { + for block in self.partition_stream.finalize_partition(partition_id) { if block.is_empty() { continue; }