Skip to content

Commit 58d8f88

Browse files
committed
refactor(query): align profiles with physical plan scope
1 parent 6f74e46 commit 58d8f88

46 files changed

Lines changed: 479 additions & 91 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

src/query/catalog/src/table_context/query.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ pub trait TableContextQueryInfo: Send + Sync {
6868
pub trait TableContextQueryProfile: Send + Sync {
6969
fn get_queries_profile(&self) -> HashMap<String, Vec<PlanProfile>>;
7070

71-
fn add_query_profiles(&self, profiles: &HashMap<u32, PlanProfile>);
71+
fn add_query_profiles(&self, profile_scope_id: &str, profiles: &HashMap<u32, PlanProfile>);
7272

7373
fn get_query_profiles(&self) -> Vec<PlanProfile>;
7474
}

src/query/ee/src/storages/fuse/operations/virtual_columns.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -702,7 +702,8 @@ async fn execute_complete_pipeline(
702702
if build_res.main_pipeline.is_complete_pipeline()? {
703703
let mut pipelines = build_res.sources_pipelines;
704704
pipelines.push(build_res.main_pipeline);
705-
let executor_settings = ExecutorSettings::try_create(ctx)?;
705+
let executor_settings =
706+
ExecutorSettings::try_create(ctx, build_res.profile_scope_id.clone())?;
706707
let complete_executor =
707708
PipelineCompleteExecutor::from_pipelines(pipelines, executor_settings)?;
708709
complete_executor.execute().await?;

src/query/ee/tests/it/storages/fuse/operations/virtual_columns.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ async fn setup_table_with_virtual_columns(num_blocks: usize) -> anyhow::Result<T
6565
.await?;
6666
let settings = ctx.get_settings();
6767
build_res.set_max_threads(settings.get_max_threads()? as usize);
68-
let settings = ExecutorSettings::try_create(ctx.clone())?;
68+
let settings = ExecutorSettings::try_create_with_new_profile_scope(ctx.clone())?;
6969
if build_res.main_pipeline.is_complete_pipeline()? {
7070
let mut pipelines = build_res.sources_pipelines;
7171
pipelines.push(build_res.main_pipeline);
@@ -131,7 +131,7 @@ async fn test_fuse_do_refresh_virtual_column() -> anyhow::Result<()> {
131131

132132
let settings = table_ctx.get_settings();
133133
build_res.set_max_threads(settings.get_max_threads()? as usize);
134-
let settings = ExecutorSettings::try_create(table_ctx.clone())?;
134+
let settings = ExecutorSettings::try_create_with_new_profile_scope(table_ctx.clone())?;
135135

136136
if build_res.main_pipeline.is_complete_pipeline()? {
137137
let mut pipelines = build_res.sources_pipelines;

src/query/pipeline/src/core/finished_chain.rs

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,22 @@ pub enum CallbackType {
3737
pub struct ExecutionInfo {
3838
pub res: Result<()>,
3939
pub profiling: HashMap<u32, PlanProfile>,
40+
// Profiles produced by implementation-only child executors can share the same scope
41+
// as their parent physical plan.
42+
pub profile_scope_id: String,
4043
}
4144

4245
impl ExecutionInfo {
43-
pub fn create(res: Result<()>, profiling: HashMap<u32, PlanProfile>) -> ExecutionInfo {
44-
ExecutionInfo { res, profiling }
46+
pub fn create(
47+
res: Result<()>,
48+
profiling: HashMap<u32, PlanProfile>,
49+
profile_scope_id: impl Into<String>,
50+
) -> ExecutionInfo {
51+
ExecutionInfo {
52+
res,
53+
profiling,
54+
profile_scope_id: profile_scope_id.into(),
55+
}
4556
}
4657
}
4758

@@ -314,7 +325,7 @@ mod tests {
314325
);
315326
}
316327

317-
chain.apply(ExecutionInfo::create(Ok(()), HashMap::new()))?;
328+
chain.apply(ExecutionInfo::create(Ok(()), HashMap::new(), String::new()))?;
318329

319330
assert_eq!(seq.load(Ordering::SeqCst), 10);
320331

@@ -378,7 +389,7 @@ mod tests {
378389
}),
379390
);
380391

381-
chain.apply(ExecutionInfo::create(Ok(()), HashMap::new()))?;
392+
chain.apply(ExecutionInfo::create(Ok(()), HashMap::new(), String::new()))?;
382393

383394
assert_eq!(seq.load(Ordering::SeqCst), 13);
384395

@@ -443,7 +454,7 @@ mod tests {
443454
}),
444455
);
445456

446-
chain.apply(ExecutionInfo::create(Ok(()), HashMap::new()))?;
457+
chain.apply(ExecutionInfo::create(Ok(()), HashMap::new(), String::new()))?;
447458

448459
assert_eq!(seq.load(Ordering::SeqCst), 13);
449460

@@ -522,7 +533,7 @@ mod tests {
522533

523534
assert!(
524535
chain
525-
.apply(ExecutionInfo::create(Ok(()), HashMap::new()))
536+
.apply(ExecutionInfo::create(Ok(()), HashMap::new(), String::new(),))
526537
.is_err()
527538
);
528539

src/query/pipeline/src/core/pipeline.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -646,9 +646,11 @@ impl Drop for Pipeline {
646646
"Pipeline illegal state: not successfully shutdown.",
647647
));
648648

649-
let _ = self
650-
.on_finished_chain
651-
.apply(ExecutionInfo::create(cause, HashMap::new()));
649+
let _ = self.on_finished_chain.apply(ExecutionInfo::create(
650+
cause,
651+
HashMap::new(),
652+
String::new(),
653+
));
652654
})
653655
}
654656
}

src/query/pipeline/src/core/profile.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,8 +213,9 @@ impl PlanScope {
213213
name: String,
214214
title: Arc<String>,
215215
labels: Arc<Vec<ProfileLabel>>,
216+
parent_id: Option<u32>,
216217
) -> Arc<PlanScope> {
217-
let parent_id = PlanScope::get_plan_scope().map(|x| x.id);
218+
let parent_id = parent_id.or_else(|| PlanScope::get_plan_scope().map(|x| x.id));
218219
Arc::new(PlanScope {
219220
id,
220221
labels,

src/query/service/src/interpreters/common/finish_hook.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ impl QueryFinishHooks {
9393
ctx.collect_local_perf_counters(node_id);
9494
}
9595
if self.collect_profiles {
96-
ctx.add_query_profiles(&info.profiling);
96+
ctx.add_query_profiles(&info.profile_scope_id, &info.profiling);
9797
}
9898
let hooks_res = if self.run_hooks {
9999
run_hooks(ctx.clone())

src/query/service/src/interpreters/hook/analyze_hook.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ async fn do_analyze(ctx: Arc<QueryContext>, desc: AnalyzeDesc) -> Result<()> {
8989
false,
9090
)?;
9191
pipeline.set_max_threads(ctx.get_settings().get_max_threads()? as usize);
92-
let executor_settings = ExecutorSettings::try_create(ctx.clone())?;
92+
let executor_settings = ExecutorSettings::try_create_with_new_profile_scope(ctx.clone())?;
9393
let pipelines = vec![pipeline];
9494
let complete_executor = PipelineCompleteExecutor::from_pipelines(pipelines, executor_settings)?;
9595
ctx.set_executor(complete_executor.get_inner())?;

src/query/service/src/interpreters/hook/compact_hook.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,8 @@ async fn compact_table(
199199
));
200200

201201
build_res.set_max_threads(settings.get_max_threads()? as usize);
202-
let executor_settings = ExecutorSettings::try_create(ctx.clone())?;
202+
let executor_settings =
203+
ExecutorSettings::try_create(ctx.clone(), build_res.profile_scope_id.clone())?;
203204

204205
let mut pipelines = build_res.sources_pipelines;
205206
pipelines.push(build_res.main_pipeline);

src/query/service/src/interpreters/hook/refresh_hook.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,10 @@ async fn do_refresh(ctx: Arc<QueryContext>, desc: RefreshDesc) -> Result<()> {
125125

126126
let settings = ctx_cloned.get_settings();
127127
build_res.set_max_threads(settings.get_max_threads()? as usize);
128-
let settings = ExecutorSettings::try_create(ctx_cloned.clone())?;
128+
let settings = ExecutorSettings::try_create(
129+
ctx_cloned.clone(),
130+
build_res.profile_scope_id.clone(),
131+
)?;
129132

130133
if build_res.main_pipeline.is_complete_pipeline()? {
131134
let query_ctx = ctx_cloned.clone();
@@ -162,7 +165,10 @@ async fn do_refresh(ctx: Arc<QueryContext>, desc: RefreshDesc) -> Result<()> {
162165

163166
let settings = ctx_cloned.get_settings();
164167
build_res.set_max_threads(settings.get_max_threads()? as usize);
165-
let settings = ExecutorSettings::try_create(ctx_cloned.clone())?;
168+
let settings = ExecutorSettings::try_create(
169+
ctx_cloned.clone(),
170+
build_res.profile_scope_id.clone(),
171+
)?;
166172

167173
if build_res.main_pipeline.is_complete_pipeline()? {
168174
let query_ctx = ctx_cloned.clone();

0 commit comments

Comments
 (0)