Skip to content

Commit 43ef57a

Browse files
committed
feat(parquet): initial support prune parts at runtime when do topk query
1 parent b61cd1f commit 43ef57a

9 files changed

Lines changed: 399 additions & 9 deletions

File tree

src/common/base/src/runtime/profile/profiles.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ pub enum ProfileStatisticsName {
6363
RuntimeFilterInlistMinMaxTime,
6464
RuntimeFilterSpatialTime,
6565
RuntimeFilterBuildTime,
66+
ProgressiveTopKPruneParts,
6667
MemoryUsage,
6768
ExternalServerRetryCount,
6869
ExternalServerRequestCount,
@@ -355,6 +356,13 @@ pub fn get_statistics_desc() -> Arc<BTreeMap<ProfileStatisticsName, ProfileDesc>
355356
unit: StatisticsUnit::NanoSeconds,
356357
plain_statistics: true,
357358
}),
359+
(ProfileStatisticsName::ProgressiveTopKPruneParts, ProfileDesc {
360+
display_name: "progressive topk pruned parts",
361+
desc: "The number of parts pruned by progressive TopK scan",
362+
index: ProfileStatisticsName::ProgressiveTopKPruneParts as usize,
363+
unit: StatisticsUnit::Count,
364+
plain_statistics: true,
365+
}),
358366
(ProfileStatisticsName::MemoryUsage, ProfileDesc {
359367
display_name: "memory usage",
360368
desc: "The real time memory usage",

src/query/storages/fuse/src/operations/read/fuse_source.rs

Lines changed: 66 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use databend_common_catalog::plan::StealablePartitions;
2323
use databend_common_catalog::plan::TopK;
2424
use databend_common_catalog::table_context::TableContext;
2525
use databend_common_exception::Result;
26+
use databend_common_expression::DataSchema;
2627
use databend_common_expression::TableSchema;
2728
use databend_common_pipeline::core::OutputPort;
2829
use databend_common_pipeline::core::Pipeline;
@@ -42,8 +43,10 @@ use crate::operations::read::DeserializeDataTransform;
4243
use crate::operations::read::NativeDeserializeDataTransform;
4344
use crate::operations::read::partition_stream::PartitionStream;
4445
use crate::operations::read::partition_stream::PartitionStreamSource;
46+
use crate::operations::read::partition_stream::ProgressiveTopKPartitionStream;
4547
use crate::operations::read::partition_stream::ReceiverPartitionStream;
4648
use crate::operations::read::partition_stream::StealPartitionStream;
49+
use crate::operations::read::progressive_topk::ProgressiveTopKState;
4750

4851
#[allow(clippy::too_many_arguments)]
4952
pub fn build_fuse_source_pipeline(
@@ -72,19 +75,39 @@ pub fn build_fuse_source_pipeline(
7275
max_io_requests = max_io_requests.min(16);
7376
}
7477

78+
let progressive_topk = create_progressive_topk_state(
79+
storage_format,
80+
plan,
81+
topk.as_ref(),
82+
&block_reader.data_schema(),
83+
receiver.is_none(),
84+
ctx.get_runtime_filters(plan.scan_id).is_empty(),
85+
);
86+
if progressive_topk.is_some() {
87+
max_threads = 1;
88+
max_io_requests = 1;
89+
}
90+
7591
let waker = pipeline.get_waker();
7692
let batch_size = ctx.get_settings().get_storage_fetch_part_num()? as usize;
7793
let stream: Arc<dyn PartitionStream> = match receiver {
7894
Some(rx) => Arc::new(ReceiverPartitionStream::new(rx)),
7995
None => {
80-
let partitions = dispatch_partitions(ctx.clone(), plan, max_io_requests);
81-
let mut partitions = StealablePartitions::new(partitions, ctx.clone());
96+
if let Some(progressive_topk) = progressive_topk.clone() {
97+
Arc::new(ProgressiveTopKPartitionStream::new(
98+
plan.parts.partitions.clone(),
99+
progressive_topk,
100+
))
101+
} else {
102+
let partitions = dispatch_partitions(ctx.clone(), plan, max_io_requests);
103+
let mut partitions = StealablePartitions::new(partitions, ctx.clone());
82104

83-
if matches!(storage_format, FuseStorageFormat::Native) && topk.is_some() {
84-
partitions.disable_steal();
85-
}
105+
if matches!(storage_format, FuseStorageFormat::Native) && topk.is_some() {
106+
partitions.disable_steal();
107+
}
86108

87-
Arc::new(StealPartitionStream::new(partitions.clone(), batch_size))
109+
Arc::new(StealPartitionStream::new(partitions.clone(), batch_size))
110+
}
88111
}
89112
};
90113

@@ -168,6 +191,7 @@ pub fn build_fuse_source_pipeline(
168191
transform_output,
169192
index_reader.clone(),
170193
virtual_reader.clone(),
194+
progressive_topk.clone(),
171195
)
172196
})?;
173197
}
@@ -176,6 +200,42 @@ pub fn build_fuse_source_pipeline(
176200
Ok(())
177201
}
178202

203+
fn create_progressive_topk_state(
204+
storage_format: FuseStorageFormat,
205+
plan: &DataSourcePlan,
206+
topk: Option<&TopK>,
207+
read_schema: &DataSchema,
208+
use_local_partitions: bool,
209+
no_runtime_filters: bool,
210+
) -> Option<Arc<ProgressiveTopKState>> {
211+
if !matches!(storage_format, FuseStorageFormat::Parquet)
212+
|| !use_local_partitions
213+
|| !no_runtime_filters
214+
|| !progressive_topk_push_downs_supported(plan)
215+
|| plan.parts.partitions_type() != PartInfoType::BlockLevel
216+
|| plan.parts.partitions.is_empty()
217+
|| !plan.parts.partitions.iter().all(|part| {
218+
FuseBlockPartInfo::from_part(part).is_ok_and(|part| part.sort_min_max.is_some())
219+
})
220+
{
221+
return None;
222+
}
223+
224+
ProgressiveTopKState::try_create(topk, read_schema)
225+
}
226+
227+
fn progressive_topk_push_downs_supported(plan: &DataSourcePlan) -> bool {
228+
plan.push_downs.as_ref().is_none_or(|push_downs| {
229+
!push_downs.lazy_materialization
230+
&& push_downs.virtual_column.is_none()
231+
&& push_downs.agg_index.is_none()
232+
&& push_downs.change_type.is_none()
233+
&& push_downs.sample.is_none()
234+
&& push_downs.secure_filters.is_none()
235+
&& (push_downs.filters.is_none() || push_downs.prewhere.is_some())
236+
})
237+
}
238+
179239
pub fn dispatch_partitions(
180240
ctx: Arc<dyn TableContext>,
181241
plan: &DataSourcePlan,

src/query/storages/fuse/src/operations/read/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ mod native_data_source_deserializer;
2020
mod parquet_data_source;
2121
mod parquet_data_source_deserializer;
2222
mod parquet_rows_fetcher;
23+
mod progressive_topk;
2324
mod raw_data_source;
2425
mod read_block_context;
2526
mod read_data_source;

src/query/storages/fuse/src/operations/read/parquet_data_source_deserializer.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ use databend_common_pipeline::core::ProcessorPtr;
4141
use roaring::RoaringTreemap;
4242

4343
use super::parquet_data_source::ParquetDataSource;
44+
use super::progressive_topk::ProgressiveTopKState;
4445
use super::read_data_source::ReadDataSource;
4546
use super::read_state::ReadState;
4647
use super::util::add_data_block_meta;
@@ -72,6 +73,7 @@ pub struct DeserializeDataTransform {
7273

7374
prewhere_info: Option<PrewhereInfo>,
7475
read_state: Option<ReadState>,
76+
progressive_topk: Option<Arc<ProgressiveTopKState>>,
7577
}
7678

7779
unsafe impl Send for DeserializeDataTransform {}
@@ -85,6 +87,7 @@ impl DeserializeDataTransform {
8587
output: Arc<OutputPort>,
8688
index_reader: Arc<Option<AggIndexReader>>,
8789
virtual_reader: Arc<Option<VirtualColumnReader>>,
90+
progressive_topk: Option<Arc<ProgressiveTopKState>>,
8891
) -> Result<ProcessorPtr> {
8992
let scan_progress = ctx.get_scan_progress();
9093

@@ -129,6 +132,7 @@ impl DeserializeDataTransform {
129132
block_meta_options: plan.block_meta_options.clone(),
130133
prewhere_info,
131134
read_state: None,
135+
progressive_topk,
132136
})))
133137
}
134138
}
@@ -244,6 +248,10 @@ impl Processor for DeserializeDataTransform {
244248
)?;
245249
}
246250

251+
if let Some(progressive_topk) = &self.progressive_topk {
252+
progressive_topk.complete_part(&data_block);
253+
}
254+
247255
// Perf.
248256
{
249257
metrics_inc_remote_io_deserialize_milliseconds(

src/query/storages/fuse/src/operations/read/partition_stream.rs

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,13 @@
1313
// limitations under the License.
1414

1515
use std::any::Any;
16+
use std::collections::VecDeque;
1617
use std::sync::Arc;
1718
use std::task::Poll;
1819

1920
use async_channel::Receiver;
21+
use databend_common_base::runtime::profile::Profile;
22+
use databend_common_base::runtime::profile::ProfileStatisticsName;
2023
use databend_common_catalog::plan::PartInfoPtr;
2124
use databend_common_catalog::plan::StealablePartitions;
2225
use databend_common_catalog::runtime_filter_info::RuntimeFilterReady;
@@ -33,8 +36,11 @@ use databend_common_pipeline::core::ProcessorPtr;
3336
use databend_common_pipeline::core::SyncTaskHandle;
3437
use databend_common_pipeline::core::SyncTaskSet;
3538
use databend_common_sql::IndexType;
39+
use parking_lot::Mutex;
3640

41+
use crate::FuseBlockPartInfo;
3742
use crate::operations::read::block_partition_meta::BlockPartitionMeta;
43+
use crate::operations::read::progressive_topk::ProgressiveTopKState;
3844
use crate::operations::read::runtime_filter_wait::wait_runtime_filters;
3945

4046
#[async_trait::async_trait]
@@ -63,6 +69,47 @@ impl PartitionStream for StealPartitionStream {
6369
}
6470
}
6571

72+
pub struct ProgressiveTopKPartitionStream {
73+
partitions: Mutex<VecDeque<PartInfoPtr>>,
74+
state: Arc<ProgressiveTopKState>,
75+
}
76+
77+
impl ProgressiveTopKPartitionStream {
78+
pub fn new(partitions: Vec<PartInfoPtr>, state: Arc<ProgressiveTopKState>) -> Self {
79+
Self {
80+
partitions: Mutex::new(VecDeque::from(partitions)),
81+
state,
82+
}
83+
}
84+
}
85+
86+
#[async_trait::async_trait]
87+
impl PartitionStream for ProgressiveTopKPartitionStream {
88+
async fn fetch(&self, _id: usize) -> Result<Option<Vec<PartInfoPtr>>> {
89+
self.state.wait_for_previous_part().await;
90+
91+
let mut partitions = self.partitions.lock();
92+
let Some(part) = partitions.front() else {
93+
return Ok(None);
94+
};
95+
96+
let fuse_part = FuseBlockPartInfo::from_part(part)?;
97+
if let Some(sort_min_max) = &fuse_part.sort_min_max
98+
&& self.state.never_match(sort_min_max)
99+
{
100+
let skipped_blocks = partitions.len();
101+
partitions.clear();
102+
self.state.record_skipped_blocks(skipped_blocks);
103+
return Ok(None);
104+
}
105+
106+
let part = partitions.pop_front().unwrap();
107+
self.state.record_scheduled_part();
108+
Profile::record_usize_profile(ProfileStatisticsName::ScanPartitions, 1);
109+
Ok(Some(vec![part]))
110+
}
111+
}
112+
66113
pub struct ReceiverPartitionStream {
67114
receiver: Receiver<Result<PartInfoPtr>>,
68115
}
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::sync::Arc;
16+
use std::sync::atomic::AtomicUsize;
17+
use std::sync::atomic::Ordering;
18+
19+
use databend_common_base::runtime::profile::Profile;
20+
use databend_common_base::runtime::profile::ProfileStatisticsName;
21+
use databend_common_catalog::plan::TopK;
22+
use databend_common_expression::DataBlock;
23+
use databend_common_expression::DataSchema;
24+
use databend_common_expression::Scalar;
25+
use databend_common_expression::TopKSorter;
26+
use databend_common_expression::types::DataType;
27+
use databend_common_expression::types::MutableBitmap;
28+
use parking_lot::Mutex;
29+
use tokio::sync::Notify;
30+
31+
const MAX_PROGRESSIVE_TOPK_LIMIT: usize = 1000;
32+
33+
pub struct ProgressiveTopKState {
34+
sorter: Mutex<TopKSorter>,
35+
sort_column_offset: usize,
36+
scheduled_parts: AtomicUsize,
37+
completed_parts: AtomicUsize,
38+
skipped_blocks: AtomicUsize,
39+
notify: Notify,
40+
}
41+
42+
impl ProgressiveTopKState {
43+
pub(crate) fn try_create(
44+
top_k: Option<&TopK>,
45+
output_schema: &DataSchema,
46+
) -> Option<Arc<Self>> {
47+
let top_k = top_k?;
48+
if top_k.limit > MAX_PROGRESSIVE_TOPK_LIMIT {
49+
return None;
50+
}
51+
52+
let data_type = DataType::from(top_k.field.data_type());
53+
if !supports_progressive_topk(&data_type) {
54+
return None;
55+
}
56+
57+
let sort_column_offset = output_schema.index_of(top_k.field.name()).ok()?;
58+
Some(Arc::new(Self {
59+
sorter: Mutex::new(TopKSorter::new(top_k.limit, top_k.asc)),
60+
sort_column_offset,
61+
scheduled_parts: AtomicUsize::new(0),
62+
completed_parts: AtomicUsize::new(0),
63+
skipped_blocks: AtomicUsize::new(0),
64+
notify: Notify::new(),
65+
}))
66+
}
67+
68+
pub(crate) async fn wait_for_previous_part(&self) {
69+
loop {
70+
let scheduled = self.scheduled_parts.load(Ordering::Acquire);
71+
let completed = self.completed_parts.load(Ordering::Acquire);
72+
if scheduled == completed {
73+
return;
74+
}
75+
self.notify.notified().await;
76+
}
77+
}
78+
79+
pub(crate) fn record_scheduled_part(&self) {
80+
self.scheduled_parts.fetch_add(1, Ordering::AcqRel);
81+
}
82+
83+
pub(crate) fn complete_part(&self, data_block: &DataBlock) {
84+
self.update(data_block);
85+
self.completed_parts.fetch_add(1, Ordering::AcqRel);
86+
self.notify.notify_waiters();
87+
}
88+
89+
fn update(&self, data_block: &DataBlock) {
90+
let rows = data_block.num_rows();
91+
if rows == 0 {
92+
return;
93+
}
94+
95+
let column = data_block
96+
.get_by_offset(self.sort_column_offset)
97+
.to_column();
98+
let mut bitmap = MutableBitmap::from_len_set(rows);
99+
self.sorter.lock().push_column(&column, &mut bitmap);
100+
}
101+
102+
pub(crate) fn never_match(&self, sort_min_max: &(Scalar, Scalar)) -> bool {
103+
self.sorter.lock().never_match(sort_min_max)
104+
}
105+
106+
pub(crate) fn record_skipped_blocks(&self, skipped_blocks: usize) {
107+
if skipped_blocks == 0 {
108+
return;
109+
}
110+
111+
self.skipped_blocks
112+
.fetch_add(skipped_blocks, Ordering::Relaxed);
113+
Profile::record_usize_profile(
114+
ProfileStatisticsName::ProgressiveTopKPruneParts,
115+
skipped_blocks,
116+
);
117+
}
118+
}
119+
120+
fn supports_progressive_topk(data_type: &DataType) -> bool {
121+
matches!(
122+
data_type,
123+
DataType::Number(_) | DataType::Date | DataType::Timestamp
124+
)
125+
}

src/query/storages/fuse/src/operations/read_data.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,6 @@ impl FuseTable {
100100
let topk = plan
101101
.push_downs
102102
.as_ref()
103-
.filter(|_| self.is_native()) // Only native format supports topk push down.
104103
.and_then(|x| x.top_k(plan.schema().as_ref()));
105104

106105
let index_reader = Arc::new(

0 commit comments

Comments
 (0)