Skip to content

Commit 5b0d17c

Browse files
committed
update
1 parent e73b65a commit 5b0d17c

File tree

5 files changed

+292
-23
lines changed

5 files changed

+292
-23
lines changed

src/query/service/src/schedulers/fragments/plan_fragment.rs

Lines changed: 121 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,53 @@ pub struct PlanFragment {
8282
}
8383

8484
impl PlanFragment {
85+
fn requires_coordinator_source(&self) -> bool {
86+
self.source_fragments
87+
.iter()
88+
.any(|fragment| !fragment.can_subscribe_on_all_executors())
89+
}
90+
91+
fn can_subscribe_on_all_executors(&self) -> bool {
92+
match &self.exchange {
93+
Some(DataExchange::Merge(_)) => false,
94+
Some(DataExchange::Broadcast(_))
95+
| Some(DataExchange::NodeToNodeExchange(_))
96+
| Some(DataExchange::GlobalShuffleExchange(_)) => true,
97+
None => !self.requires_coordinator_source(),
98+
}
99+
}
100+
101+
fn add_source_only_actions(
102+
&self,
103+
ctx: Arc<QueryContext>,
104+
fragment_actions: &mut QueryFragmentActions,
105+
) {
106+
let Some(exchange) = &self.exchange else {
107+
return;
108+
};
109+
110+
if !matches!(
111+
exchange,
112+
DataExchange::Broadcast(_)
113+
| DataExchange::NodeToNodeExchange(_)
114+
| DataExchange::GlobalShuffleExchange(_)
115+
) {
116+
return;
117+
}
118+
119+
let local_executor = Fragmenter::get_local_executor(ctx);
120+
for destination in exchange.get_destinations() {
121+
if destination == local_executor {
122+
continue;
123+
}
124+
125+
fragment_actions.add_action(QueryFragmentAction::create_source_only(
126+
destination,
127+
self.plan.clone(),
128+
));
129+
}
130+
}
131+
85132
pub fn get_actions(
86133
&self,
87134
ctx: Arc<QueryContext>,
@@ -92,6 +139,7 @@ impl PlanFragment {
92139
// }
93140

94141
let mut fragment_actions = QueryFragmentActions::create(self.fragment_id);
142+
let requires_coordinator_source = self.requires_coordinator_source();
95143

96144
match &self.fragment_type {
97145
FragmentType::Root => {
@@ -101,25 +149,19 @@ impl PlanFragment {
101149
);
102150
fragment_actions.add_action(action);
103151
}
152+
_ if requires_coordinator_source => {
153+
let action = QueryFragmentAction::create(
154+
Fragmenter::get_local_executor(ctx.clone()),
155+
self.plan.clone(),
156+
);
157+
fragment_actions.add_action(action);
158+
self.add_source_only_actions(ctx, &mut fragment_actions);
159+
}
104160
FragmentType::Intermediate => {
105-
if self
106-
.source_fragments
107-
.iter()
108-
.any(|fragment| matches!(&fragment.exchange, Some(DataExchange::Merge(_))))
109-
{
110-
// If this is a intermediate fragment with merge input,
111-
// we will only send it to coordinator node.
112-
let action = QueryFragmentAction::create(
113-
Fragmenter::get_local_executor(ctx),
114-
self.plan.clone(),
115-
);
161+
// Otherwise distribute the fragment to all the executors.
162+
for executor in Fragmenter::get_executors(ctx) {
163+
let action = QueryFragmentAction::create(executor, self.plan.clone());
116164
fragment_actions.add_action(action);
117-
} else {
118-
// Otherwise distribute the fragment to all the executors.
119-
for executor in Fragmenter::get_executors(ctx) {
120-
let action = QueryFragmentAction::create(executor, self.plan.clone());
121-
fragment_actions.add_action(action);
122-
}
123165
}
124166
}
125167
FragmentType::Source => {
@@ -553,6 +595,68 @@ impl PlanFragment {
553595
}
554596
}
555597

598+
#[cfg(test)]
599+
mod tests {
600+
use databend_common_expression::DataSchemaRefExt;
601+
602+
use super::*;
603+
use crate::physical_plans::ConstantTableScan;
604+
use crate::physical_plans::PhysicalPlanMeta;
605+
use crate::servers::flight::v1::exchange::BroadcastExchange;
606+
use crate::servers::flight::v1::exchange::MergeExchange;
607+
608+
fn mock_plan() -> PhysicalPlan {
609+
PhysicalPlan::new(ConstantTableScan {
610+
meta: PhysicalPlanMeta::new("ConstantTableScan"),
611+
values: vec![],
612+
num_rows: 0,
613+
output_schema: DataSchemaRefExt::create(vec![]),
614+
})
615+
}
616+
617+
#[test]
618+
fn test_merge_dependent_fragment_can_use_source_only_exchange() {
619+
let merge_source = PlanFragment {
620+
plan: mock_plan(),
621+
fragment_type: FragmentType::Source,
622+
fragment_id: 0,
623+
exchange: Some(MergeExchange::create(
624+
"coordinator".to_string(),
625+
false,
626+
true,
627+
)),
628+
query_id: "q".to_string(),
629+
source_fragments: vec![],
630+
};
631+
assert!(!merge_source.requires_coordinator_source());
632+
assert!(!merge_source.can_subscribe_on_all_executors());
633+
634+
let broadcast_fragment = PlanFragment {
635+
plan: mock_plan(),
636+
fragment_type: FragmentType::Intermediate,
637+
fragment_id: 1,
638+
exchange: Some(BroadcastExchange::create(
639+
vec!["coordinator".to_string(), "worker".to_string()],
640+
1,
641+
)),
642+
query_id: "q".to_string(),
643+
source_fragments: vec![merge_source],
644+
};
645+
assert!(broadcast_fragment.requires_coordinator_source());
646+
assert!(broadcast_fragment.can_subscribe_on_all_executors());
647+
648+
let downstream_fragment = PlanFragment {
649+
plan: mock_plan(),
650+
fragment_type: FragmentType::Intermediate,
651+
fragment_id: 2,
652+
exchange: None,
653+
query_id: "q".to_string(),
654+
source_fragments: vec![broadcast_fragment],
655+
};
656+
assert!(!downstream_fragment.requires_coordinator_source());
657+
}
658+
}
659+
556660
struct ConstTableColumn {
557661
columns: Vec<Column>,
558662
num_rows: usize,

src/query/service/src/schedulers/fragments/query_fragment_actions.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,13 +41,26 @@ use crate::sessions::TableContext;
4141
pub struct QueryFragmentAction {
4242
pub executor: String,
4343
pub physical_plan: PhysicalPlan,
44+
pub source_only: bool,
4445
}
4546

4647
impl QueryFragmentAction {
4748
pub fn create(executor: String, physical_plan: PhysicalPlan) -> QueryFragmentAction {
4849
QueryFragmentAction {
4950
executor,
5051
physical_plan,
52+
source_only: false,
53+
}
54+
}
55+
56+
pub fn create_source_only(
57+
executor: String,
58+
physical_plan: PhysicalPlan,
59+
) -> QueryFragmentAction {
60+
QueryFragmentAction {
61+
executor,
62+
physical_plan,
63+
source_only: true,
5164
}
5265
}
5366
}
@@ -231,6 +244,10 @@ impl QueryFragmentsActions {
231244
let use_do_exchange = exchange.use_do_exchange();
232245

233246
for fragment_action in &fragment_actions.fragment_actions {
247+
if fragment_action.source_only {
248+
continue;
249+
}
250+
234251
let source = fragment_action.executor.to_string();
235252

236253
for destination in &destinations {
@@ -284,6 +301,7 @@ impl QueryFragmentsActions {
284301
fragment_actions.fragment_id,
285302
fragment_actions.data_exchange.clone(),
286303
fragment_action.physical_plan.clone(),
304+
fragment_action.source_only,
287305
);
288306

289307
match fragments_packets.entry(fragment_action.executor.clone()) {

src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs

Lines changed: 72 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,9 @@ use super::exchange_params::GlobalExchangeParams;
5353
use super::exchange_params::MergeExchangeParams;
5454
use super::exchange_params::ShuffleExchangeParams;
5555
use super::exchange_sink::ExchangeSink;
56+
use super::exchange_source::via_broadcast_exchange_source;
57+
use super::exchange_source::via_hash_exchange_source;
58+
use super::exchange_source::via_node_shuffle_exchange_source;
5659
use super::exchange_transform::ExchangeTransform;
5760
use super::statistics_receiver::StatisticsReceiver;
5861
use super::statistics_sender::StatisticsSender;
@@ -1087,6 +1090,11 @@ impl QueryCoordinator {
10871090
// Merge pipelines if exist locally pipeline
10881091
if let Some(mut fragment_coordinator) = self.fragments_coordinator.remove(&fragment_id) {
10891092
let info = self.info.as_ref().expect("QueryInfo is none");
1093+
1094+
if fragment_coordinator.source_only {
1095+
return fragment_coordinator.build_source_only_pipeline(ctx, info, injector);
1096+
}
1097+
10901098
fragment_coordinator.prepare_pipeline(ctx.clone())?;
10911099

10921100
if fragment_coordinator.pipeline_build_res.is_none() {
@@ -1163,16 +1171,28 @@ impl QueryCoordinator {
11631171
}
11641172
}
11651173

1166-
if self.fragments_coordinator.is_empty() {
1174+
let active_fragment_ids = self
1175+
.fragments_coordinator
1176+
.iter()
1177+
.filter_map(|(fragment_id, coordinator)| {
1178+
(!coordinator.source_only).then_some(*fragment_id)
1179+
})
1180+
.collect::<Vec<_>>();
1181+
1182+
if active_fragment_ids.is_empty() {
11671183
// Empty fragments if it is a request server, because the pipelines may have been linked.
11681184
return Ok(());
11691185
}
11701186

11711187
let max_threads = info.query_ctx.get_settings().get_max_threads()?;
1172-
let mut pipelines = Vec::with_capacity(self.fragments_coordinator.len());
1173-
1174-
let mut params = Vec::with_capacity(self.fragments_coordinator.len());
1175-
for coordinator in self.fragments_coordinator.values() {
1188+
let mut pipelines = Vec::with_capacity(active_fragment_ids.len());
1189+
1190+
let mut params = Vec::with_capacity(active_fragment_ids.len());
1191+
for fragment_id in &active_fragment_ids {
1192+
let coordinator = self
1193+
.fragments_coordinator
1194+
.get(fragment_id)
1195+
.expect("active fragment must exist");
11761196
params.push(
11771197
coordinator.create_exchange_params(
11781198
info,
@@ -1187,7 +1207,11 @@ impl QueryCoordinator {
11871207
);
11881208
}
11891209

1190-
for ((_, coordinator), params) in self.fragments_coordinator.iter_mut().zip(params) {
1210+
for (fragment_id, params) in active_fragment_ids.into_iter().zip(params) {
1211+
let coordinator = self
1212+
.fragments_coordinator
1213+
.get_mut(&fragment_id)
1214+
.expect("active fragment must exist");
11911215
if let Some(mut build_res) = coordinator.pipeline_build_res.take() {
11921216
build_res.set_max_threads(max_threads as usize);
11931217

@@ -1272,6 +1296,7 @@ impl QueryCoordinator {
12721296

12731297
struct FragmentCoordinator {
12741298
initialized: bool,
1299+
source_only: bool,
12751300
fragment_id: usize,
12761301
physical_plan: PhysicalPlan,
12771302
data_exchange: Option<DataExchange>,
@@ -1282,6 +1307,7 @@ impl FragmentCoordinator {
12821307
pub fn create(packet: &QueryFragment) -> Box<FragmentCoordinator> {
12831308
Box::new(FragmentCoordinator {
12841309
initialized: false,
1310+
source_only: packet.source_only,
12851311
physical_plan: packet.physical_plan.clone(),
12861312
fragment_id: packet.fragment_id,
12871313
data_exchange: packet.data_exchange.clone(),
@@ -1346,7 +1372,47 @@ impl FragmentCoordinator {
13461372
}
13471373
}
13481374

1375+
fn build_source_only_pipeline(
1376+
&self,
1377+
ctx: &Arc<QueryContext>,
1378+
info: &QueryInfo,
1379+
injector: Arc<dyn ExchangeInjector>,
1380+
) -> Result<PipelineBuildResult> {
1381+
let Some(exchange_params) = self.create_exchange_params(info, injector.clone())? else {
1382+
return Err(ErrorCode::Internal(
1383+
"source-only fragment must have data exchange",
1384+
));
1385+
};
1386+
1387+
let mut build_res = PipelineBuildResult::create();
1388+
match &exchange_params {
1389+
ExchangeParams::BroadcastExchange(params) => {
1390+
via_broadcast_exchange_source(ctx, params, &mut build_res.main_pipeline)?
1391+
}
1392+
ExchangeParams::NodeShuffleExchange(params) => via_node_shuffle_exchange_source(
1393+
ctx,
1394+
params,
1395+
injector,
1396+
&mut build_res.main_pipeline,
1397+
)?,
1398+
ExchangeParams::GlobalShuffleExchange(params) => {
1399+
via_hash_exchange_source(ctx, params, &mut build_res.main_pipeline)?
1400+
}
1401+
ExchangeParams::MergeExchange(_) => {
1402+
return Err(ErrorCode::Internal(
1403+
"merge exchange does not support source-only fragments",
1404+
));
1405+
}
1406+
}
1407+
1408+
Ok(build_res)
1409+
}
1410+
13491411
pub fn prepare_pipeline(&mut self, ctx: Arc<QueryContext>) -> Result<()> {
1412+
if self.source_only {
1413+
return Ok(());
1414+
}
1415+
13501416
if !self.initialized {
13511417
self.initialized = true;
13521418

0 commit comments

Comments
 (0)