Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/common/base/src/runtime/profile/profiles.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ pub enum ProfileStatisticsName {
RuntimeFilterInlistMinMaxTime,
RuntimeFilterSpatialTime,
RuntimeFilterBuildTime,
ProgressiveTopKPruneParts,
MemoryUsage,
ExternalServerRetryCount,
ExternalServerRequestCount,
Expand Down Expand Up @@ -355,6 +356,13 @@ pub fn get_statistics_desc() -> Arc<BTreeMap<ProfileStatisticsName, ProfileDesc>
unit: StatisticsUnit::NanoSeconds,
plain_statistics: true,
}),
(ProfileStatisticsName::ProgressiveTopKPruneParts, ProfileDesc {
display_name: "progressive topk pruned parts",
desc: "The number of parts pruned by progressive TopK scan",
index: ProfileStatisticsName::ProgressiveTopKPruneParts as usize,
unit: StatisticsUnit::Count,
plain_statistics: true,
}),
(ProfileStatisticsName::MemoryUsage, ProfileDesc {
display_name: "memory usage",
desc: "The real time memory usage",
Expand Down
72 changes: 66 additions & 6 deletions src/query/storages/fuse/src/operations/read/fuse_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use databend_common_catalog::plan::StealablePartitions;
use databend_common_catalog::plan::TopK;
use databend_common_catalog::table_context::TableContext;
use databend_common_exception::Result;
use databend_common_expression::DataSchema;
use databend_common_expression::TableSchema;
use databend_common_pipeline::core::OutputPort;
use databend_common_pipeline::core::Pipeline;
Expand All @@ -42,8 +43,10 @@ use crate::operations::read::DeserializeDataTransform;
use crate::operations::read::NativeDeserializeDataTransform;
use crate::operations::read::partition_stream::PartitionStream;
use crate::operations::read::partition_stream::PartitionStreamSource;
use crate::operations::read::partition_stream::ProgressiveTopKPartitionStream;
use crate::operations::read::partition_stream::ReceiverPartitionStream;
use crate::operations::read::partition_stream::StealPartitionStream;
use crate::operations::read::progressive_topk::ProgressiveTopKState;

#[allow(clippy::too_many_arguments)]
pub fn build_fuse_source_pipeline(
Expand Down Expand Up @@ -72,19 +75,39 @@ pub fn build_fuse_source_pipeline(
max_io_requests = max_io_requests.min(16);
}

let progressive_topk = create_progressive_topk_state(
storage_format,
plan,
topk.as_ref(),
&block_reader.data_schema(),
receiver.is_none(),
ctx.get_runtime_filters(plan.scan_id).is_empty(),
);
if progressive_topk.is_some() {
max_threads = 1;
max_io_requests = 1;
}

let waker = pipeline.get_waker();
let batch_size = ctx.get_settings().get_storage_fetch_part_num()? as usize;
let stream: Arc<dyn PartitionStream> = match receiver {
Some(rx) => Arc::new(ReceiverPartitionStream::new(rx)),
None => {
let partitions = dispatch_partitions(ctx.clone(), plan, max_io_requests);
let mut partitions = StealablePartitions::new(partitions, ctx.clone());
if let Some(progressive_topk) = progressive_topk.clone() {
Arc::new(ProgressiveTopKPartitionStream::new(
plan.parts.partitions.clone(),
progressive_topk,
))
} else {
let partitions = dispatch_partitions(ctx.clone(), plan, max_io_requests);
let mut partitions = StealablePartitions::new(partitions, ctx.clone());

if matches!(storage_format, FuseStorageFormat::Native) && topk.is_some() {
partitions.disable_steal();
}
if matches!(storage_format, FuseStorageFormat::Native) && topk.is_some() {
partitions.disable_steal();
}

Arc::new(StealPartitionStream::new(partitions.clone(), batch_size))
Arc::new(StealPartitionStream::new(partitions.clone(), batch_size))
}
}
};

Expand Down Expand Up @@ -168,6 +191,7 @@ pub fn build_fuse_source_pipeline(
transform_output,
index_reader.clone(),
virtual_reader.clone(),
progressive_topk.clone(),
)
})?;
}
Expand All @@ -176,6 +200,42 @@ pub fn build_fuse_source_pipeline(
Ok(())
}

fn create_progressive_topk_state(
storage_format: FuseStorageFormat,
plan: &DataSourcePlan,
topk: Option<&TopK>,
read_schema: &DataSchema,
use_local_partitions: bool,
no_runtime_filters: bool,
) -> Option<Arc<ProgressiveTopKState>> {
if !matches!(storage_format, FuseStorageFormat::Parquet)
|| !use_local_partitions
|| !no_runtime_filters
|| !progressive_topk_push_downs_supported(plan)
|| plan.parts.partitions_type() != PartInfoType::BlockLevel
|| plan.parts.partitions.is_empty()
|| !plan.parts.partitions.iter().all(|part| {
FuseBlockPartInfo::from_part(part).is_ok_and(|part| part.sort_min_max.is_some())
})
{
return None;
}

ProgressiveTopKState::try_create(topk, read_schema)
}

fn progressive_topk_push_downs_supported(plan: &DataSourcePlan) -> bool {
plan.push_downs.as_ref().is_none_or(|push_downs| {
!push_downs.lazy_materialization
&& push_downs.virtual_column.is_none()
&& push_downs.agg_index.is_none()
&& push_downs.change_type.is_none()
&& push_downs.sample.is_none()
&& push_downs.secure_filters.is_none()
&& (push_downs.filters.is_none() || push_downs.prewhere.is_some())
})
}

pub fn dispatch_partitions(
ctx: Arc<dyn TableContext>,
plan: &DataSourcePlan,
Expand Down
1 change: 1 addition & 0 deletions src/query/storages/fuse/src/operations/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ mod native_data_source_deserializer;
mod parquet_data_source;
mod parquet_data_source_deserializer;
mod parquet_rows_fetcher;
mod progressive_topk;
mod raw_data_source;
mod read_block_context;
mod read_data_source;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use databend_common_pipeline::core::ProcessorPtr;
use roaring::RoaringTreemap;

use super::parquet_data_source::ParquetDataSource;
use super::progressive_topk::ProgressiveTopKState;
use super::read_data_source::ReadDataSource;
use super::read_state::ReadState;
use super::util::add_data_block_meta;
Expand Down Expand Up @@ -72,6 +73,7 @@ pub struct DeserializeDataTransform {

prewhere_info: Option<PrewhereInfo>,
read_state: Option<ReadState>,
progressive_topk: Option<Arc<ProgressiveTopKState>>,
}

unsafe impl Send for DeserializeDataTransform {}
Expand All @@ -85,6 +87,7 @@ impl DeserializeDataTransform {
output: Arc<OutputPort>,
index_reader: Arc<Option<AggIndexReader>>,
virtual_reader: Arc<Option<VirtualColumnReader>>,
progressive_topk: Option<Arc<ProgressiveTopKState>>,
) -> Result<ProcessorPtr> {
let scan_progress = ctx.get_scan_progress();

Expand Down Expand Up @@ -129,6 +132,7 @@ impl DeserializeDataTransform {
block_meta_options: plan.block_meta_options.clone(),
prewhere_info,
read_state: None,
progressive_topk,
})))
}
}
Expand Down Expand Up @@ -244,6 +248,10 @@ impl Processor for DeserializeDataTransform {
)?;
}

if let Some(progressive_topk) = &self.progressive_topk {
progressive_topk.complete_part(&data_block);
}

// Perf.
{
metrics_inc_remote_io_deserialize_milliseconds(
Expand Down
47 changes: 47 additions & 0 deletions src/query/storages/fuse/src/operations/read/partition_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@
// limitations under the License.

use std::any::Any;
use std::collections::VecDeque;
use std::sync::Arc;
use std::task::Poll;

use async_channel::Receiver;
use databend_common_base::runtime::profile::Profile;
use databend_common_base::runtime::profile::ProfileStatisticsName;
use databend_common_catalog::plan::PartInfoPtr;
use databend_common_catalog::plan::StealablePartitions;
use databend_common_catalog::runtime_filter_info::RuntimeFilterReady;
Expand All @@ -33,8 +36,11 @@ use databend_common_pipeline::core::ProcessorPtr;
use databend_common_pipeline::core::SyncTaskHandle;
use databend_common_pipeline::core::SyncTaskSet;
use databend_common_sql::IndexType;
use parking_lot::Mutex;

use crate::FuseBlockPartInfo;
use crate::operations::read::block_partition_meta::BlockPartitionMeta;
use crate::operations::read::progressive_topk::ProgressiveTopKState;
use crate::operations::read::runtime_filter_wait::wait_runtime_filters;

#[async_trait::async_trait]
Expand Down Expand Up @@ -63,6 +69,47 @@ impl PartitionStream for StealPartitionStream {
}
}

pub struct ProgressiveTopKPartitionStream {
partitions: Mutex<VecDeque<PartInfoPtr>>,
state: Arc<ProgressiveTopKState>,
}

impl ProgressiveTopKPartitionStream {
pub fn new(partitions: Vec<PartInfoPtr>, state: Arc<ProgressiveTopKState>) -> Self {
Self {
partitions: Mutex::new(VecDeque::from(partitions)),
state,
}
}
}

#[async_trait::async_trait]
impl PartitionStream for ProgressiveTopKPartitionStream {
async fn fetch(&self, _id: usize) -> Result<Option<Vec<PartInfoPtr>>> {
self.state.wait_for_previous_part().await;

let mut partitions = self.partitions.lock();
let Some(part) = partitions.front() else {
return Ok(None);
};

let fuse_part = FuseBlockPartInfo::from_part(part)?;
if let Some(sort_min_max) = &fuse_part.sort_min_max
&& self.state.never_match(sort_min_max)
{
let skipped_blocks = partitions.len();
partitions.clear();
self.state.record_skipped_blocks(skipped_blocks);
return Ok(None);
}

let part = partitions.pop_front().unwrap();
self.state.record_scheduled_part();
Profile::record_usize_profile(ProfileStatisticsName::ScanPartitions, 1);
Ok(Some(vec![part]))
}
}

pub struct ReceiverPartitionStream {
receiver: Receiver<Result<PartInfoPtr>>,
}
Expand Down
125 changes: 125 additions & 0 deletions src/query/storages/fuse/src/operations/read/progressive_topk.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;

use databend_common_base::runtime::profile::Profile;
use databend_common_base::runtime::profile::ProfileStatisticsName;
use databend_common_catalog::plan::TopK;
use databend_common_expression::DataBlock;
use databend_common_expression::DataSchema;
use databend_common_expression::Scalar;
use databend_common_expression::TopKSorter;
use databend_common_expression::types::DataType;
use databend_common_expression::types::MutableBitmap;
use parking_lot::Mutex;
use tokio::sync::Notify;

const MAX_PROGRESSIVE_TOPK_LIMIT: usize = 1000;

pub struct ProgressiveTopKState {
sorter: Mutex<TopKSorter>,
sort_column_offset: usize,
scheduled_parts: AtomicUsize,
completed_parts: AtomicUsize,
skipped_blocks: AtomicUsize,
notify: Notify,
}

impl ProgressiveTopKState {
pub(crate) fn try_create(
top_k: Option<&TopK>,
output_schema: &DataSchema,
) -> Option<Arc<Self>> {
let top_k = top_k?;
if top_k.limit > MAX_PROGRESSIVE_TOPK_LIMIT {
return None;
}

let data_type = DataType::from(top_k.field.data_type());
if !supports_progressive_topk(&data_type) {
return None;
}

let sort_column_offset = output_schema.index_of(top_k.field.name()).ok()?;
Some(Arc::new(Self {
sorter: Mutex::new(TopKSorter::new(top_k.limit, top_k.asc)),
sort_column_offset,
scheduled_parts: AtomicUsize::new(0),
completed_parts: AtomicUsize::new(0),
skipped_blocks: AtomicUsize::new(0),
notify: Notify::new(),
}))
}

pub(crate) async fn wait_for_previous_part(&self) {
loop {
let scheduled = self.scheduled_parts.load(Ordering::Acquire);
let completed = self.completed_parts.load(Ordering::Acquire);
if scheduled == completed {
return;
}
self.notify.notified().await;
}
}

pub(crate) fn record_scheduled_part(&self) {
self.scheduled_parts.fetch_add(1, Ordering::AcqRel);
}

pub(crate) fn complete_part(&self, data_block: &DataBlock) {
self.update(data_block);
self.completed_parts.fetch_add(1, Ordering::AcqRel);
self.notify.notify_waiters();
}

fn update(&self, data_block: &DataBlock) {
let rows = data_block.num_rows();
if rows == 0 {
return;
}

let column = data_block
.get_by_offset(self.sort_column_offset)
.to_column();
let mut bitmap = MutableBitmap::from_len_set(rows);
self.sorter.lock().push_column(&column, &mut bitmap);
}

pub(crate) fn never_match(&self, sort_min_max: &(Scalar, Scalar)) -> bool {
self.sorter.lock().never_match(sort_min_max)
}

pub(crate) fn record_skipped_blocks(&self, skipped_blocks: usize) {
if skipped_blocks == 0 {
return;
}

self.skipped_blocks
.fetch_add(skipped_blocks, Ordering::Relaxed);
Profile::record_usize_profile(
ProfileStatisticsName::ProgressiveTopKPruneParts,
skipped_blocks,
);
}
}

fn supports_progressive_topk(data_type: &DataType) -> bool {
matches!(
data_type,
DataType::Number(_) | DataType::Date | DataType::Timestamp
)
}
1 change: 0 additions & 1 deletion src/query/storages/fuse/src/operations/read_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ impl FuseTable {
let topk = plan
.push_downs
.as_ref()
.filter(|_| self.is_native()) // Only native format supports topk push down.
.and_then(|x| x.top_k(plan.schema().as_ref()));

let index_reader = Arc::new(
Expand Down
Loading
Loading