Skip to content

Commit def7758

Browse files
committed
fix(query): avoid waiting idle fragment nodes
1 parent 0bca004 commit def7758

7 files changed

Lines changed: 152 additions & 63 deletions

File tree

src/query/service/src/schedulers/fragments/plan_fragment.rs

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ impl PlanFragment {
161161

162162
let data_sources = self.collect_data_sources()?;
163163

164-
let executors = Fragmenter::get_executors_nodes(ctx);
164+
let executors = Fragmenter::get_executors_nodes(ctx.clone());
165165

166166
let mut executor_partitions: HashMap<String, HashMap<u32, DataSource>> = HashMap::new();
167167

@@ -213,12 +213,38 @@ impl PlanFragment {
213213
}
214214
}
215215

216+
let local_executor = Fragmenter::get_local_executor(ctx);
217+
let mut empty_action_sources = None;
218+
let mut non_empty_actions = Vec::new();
219+
216220
for (executor, sources) in executor_partitions {
221+
let is_empty = sources.values().all(DataSource::is_empty);
222+
223+
if is_empty {
224+
if executor == local_executor {
225+
empty_action_sources = Some((executor, sources));
226+
}
227+
continue;
228+
}
229+
230+
non_empty_actions.push((executor, sources));
231+
}
232+
233+
if non_empty_actions.is_empty() {
234+
if let Some((executor, sources)) = empty_action_sources {
235+
let mut handle = ReadSourceDeriveHandle::create(sources);
236+
let plan = self.plan.derive_with(&mut handle);
237+
fragment_actions.add_action(QueryFragmentAction::create(executor, plan));
238+
}
239+
return Ok(());
240+
}
241+
242+
for (executor, sources) in non_empty_actions {
217243
// Replace `ReadDataSourcePlan` with rewritten one and generate new fragment for it.
218244
let mut handle = ReadSourceDeriveHandle::create(sources);
219245
let plan = self.plan.derive_with(&mut handle);
220246

221-
fragment_actions.add_action(QueryFragmentAction::create(executor.clone(), plan));
247+
fragment_actions.add_action(QueryFragmentAction::create(executor, plan));
222248
}
223249

224250
Ok(())
@@ -564,6 +590,15 @@ enum DataSource {
564590
ConstTable(ConstTableColumn),
565591
}
566592

593+
impl DataSource {
594+
fn is_empty(&self) -> bool {
595+
match self {
596+
DataSource::Table(plan) => plan.parts.is_empty(),
597+
DataSource::ConstTable(columns) => columns.num_rows == 0,
598+
}
599+
}
600+
}
601+
567602
impl TryFrom<DataSource> for DataSourcePlan {
568603
type Error = ErrorCode;
569604

src/query/service/src/schedulers/fragments/query_fragment_actions.rs

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414

1515
use std::collections::HashMap;
16+
use std::collections::HashSet;
1617
use std::collections::hash_map::Entry;
1718
use std::fmt::Debug;
1819
use std::fmt::Formatter;
@@ -189,7 +190,8 @@ impl QueryFragmentsActions {
189190

190191
pub fn get_query_env(&self) -> Result<QueryEnv> {
191192
let workload_group = self.ctx.get_current_session().get_current_workload_group();
192-
let mut builder = DataflowDiagramBuilder::create(self.ctx.get_cluster().nodes.clone());
193+
let mut builder =
194+
DataflowDiagramBuilder::create(self.query_env_nodes().into_values().collect());
193195

194196
self.fragments_connections(&mut builder)?;
195197
self.statistics_connections(&mut builder)?;
@@ -213,7 +215,7 @@ impl QueryFragmentsActions {
213215
}
214216

215217
pub fn prepared_query(&self) -> Result<HashMap<String, String>> {
216-
let nodes_info = Self::nodes_info(&self.ctx);
218+
let nodes_info = self.execution_nodes();
217219
let mut execute_partial_query_packets = HashMap::with_capacity(nodes_info.len());
218220

219221
for node_id in nodes_info.keys() {
@@ -258,13 +260,49 @@ impl QueryFragmentsActions {
258260
fn statistics_connections(&self, builder: &mut DataflowDiagramBuilder) -> Result<()> {
259261
let local_id = self.ctx.get_cluster().local_id.clone();
260262

261-
for (_id, node_info) in Self::nodes_info(&self.ctx) {
263+
for (_id, node_info) in self.execution_nodes() {
262264
builder.add_statistics_edge(&node_info.id, &local_id)?;
263265
}
264266

265267
Ok(())
266268
}
267269

270+
fn query_env_nodes(&self) -> HashMap<String, Arc<NodeInfo>> {
271+
let mut node_ids = self.execution_node_ids();
272+
node_ids.insert(self.get_local_executor());
273+
274+
for fragment_actions in &self.fragments_actions {
275+
if let Some(exchange) = &fragment_actions.data_exchange {
276+
node_ids.extend(exchange.get_destinations());
277+
}
278+
}
279+
280+
Self::nodes_info(&self.ctx)
281+
.into_iter()
282+
.filter(|(node_id, _)| node_ids.contains(node_id))
283+
.collect()
284+
}
285+
286+
fn execution_nodes(&self) -> HashMap<String, Arc<NodeInfo>> {
287+
let node_ids = self.execution_node_ids();
288+
Self::nodes_info(&self.ctx)
289+
.into_iter()
290+
.filter(|(node_id, _)| node_ids.contains(node_id))
291+
.collect()
292+
}
293+
294+
fn execution_node_ids(&self) -> HashSet<String> {
295+
let mut node_ids = HashSet::new();
296+
297+
for fragment_actions in &self.fragments_actions {
298+
for fragment_action in &fragment_actions.fragment_actions {
299+
node_ids.insert(fragment_action.executor.clone());
300+
}
301+
}
302+
303+
node_ids
304+
}
305+
268306
fn nodes_info(ctx: &Arc<QueryContext>) -> HashMap<String, Arc<NodeInfo>> {
269307
let nodes = ctx.get_cluster().get_nodes();
270308
let mut nodes_info = HashMap::with_capacity(nodes.len());

src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -899,6 +899,36 @@ impl DataExchangeManager {
899899
}
900900
}
901901

902+
pub fn get_merge_flight_receiver_if_exists(
903+
&self,
904+
params: &MergeExchangeParams,
905+
) -> Result<Vec<FlightReceiver>> {
906+
let queries_coordinator_guard = self.queries_coordinator.lock();
907+
let queries_coordinator = unsafe { &mut *queries_coordinator_guard.deref().get() };
908+
909+
match queries_coordinator.get_mut(&params.query_id) {
910+
None => Err(ErrorCode::Internal("Query not exists.")),
911+
Some(coordinator) => {
912+
Ok(params.take_flight_receiver_if_exists(&mut coordinator.flight_data_receivers))
913+
}
914+
}
915+
}
916+
917+
pub fn get_flight_receiver_if_exists(
918+
&self,
919+
params: &ShuffleExchangeParams,
920+
) -> Result<Vec<FlightReceiver>> {
921+
let queries_coordinator_guard = self.queries_coordinator.lock();
922+
let queries_coordinator = unsafe { &mut *queries_coordinator_guard.deref().get() };
923+
924+
match queries_coordinator.get_mut(&params.query_id) {
925+
None => Err(ErrorCode::Internal("Query not exists.")),
926+
Some(coordinator) => {
927+
Ok(params.take_flight_receiver_if_exists(&mut coordinator.flight_data_receivers))
928+
}
929+
}
930+
}
931+
902932
pub fn get_fragment_source(
903933
&self,
904934
query_id: &str,

src/query/service/src/servers/flight/v1/exchange/exchange_params.rs

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,13 @@ impl MergeExchangeParams {
152152

153153
Ok(receivers)
154154
}
155+
156+
pub fn take_flight_receiver_if_exists(
157+
&self,
158+
receivers: &mut HashMap<String, Vec<FlightReceiver>>,
159+
) -> Vec<FlightReceiver> {
160+
receivers.remove(&self.channel_id).unwrap_or_default()
161+
}
155162
}
156163

157164
impl BroadcastExchangeParams {
@@ -251,15 +258,36 @@ impl ShuffleExchangeParams {
251258
for (destination, channels) in &self.destination_channels {
252259
if destination == &self.executor_id {
253260
for channel in channels {
254-
// Local shuffle data stays in-process. Missing receivers here only
255-
// means no remote executor targets this local destination.
261+
let Some(receivers) = receivers.remove(channel) else {
262+
return Err(ErrorCode::UnknownFragmentExchange(format!(
263+
"Unknown fragment flight receiver, {}, {}",
264+
self.executor_id, self.fragment_id
265+
)));
266+
};
267+
exchanges.extend(receivers.into_iter());
268+
}
269+
}
270+
}
271+
272+
Ok(exchanges)
273+
}
274+
275+
pub fn take_flight_receiver_if_exists(
276+
&self,
277+
receivers: &mut HashMap<String, Vec<FlightReceiver>>,
278+
) -> Vec<FlightReceiver> {
279+
let mut exchanges = Vec::with_capacity(self.destination_channels.len());
280+
281+
for (destination, channels) in &self.destination_channels {
282+
if destination == &self.executor_id {
283+
for channel in channels {
256284
if let Some(receivers) = receivers.remove(channel) {
257285
exchanges.extend(receivers.into_iter());
258286
}
259287
}
260288
}
261289
}
262290

263-
Ok(exchanges)
291+
exchanges
264292
}
265293
}

src/query/service/src/servers/flight/v1/exchange/exchange_sink.rs

Lines changed: 0 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -17,23 +17,16 @@ use std::sync::Arc;
1717
use databend_common_base::runtime::GlobalIORuntime;
1818
use databend_common_exception::ErrorCode;
1919
use databend_common_exception::Result;
20-
use databend_common_expression::BlockMetaInfoDowncast;
21-
use databend_common_expression::DataBlock;
2220
use databend_common_pipeline::core::Pipe;
23-
use databend_common_pipeline::core::PipeItem;
2421
use databend_common_pipeline::core::Pipeline;
25-
use databend_common_pipeline::core::ProcessorPtr;
2622

2723
use super::broadcast_send_sink::BroadcastSendSink;
2824
use super::exchange_params::BroadcastExchangeParams;
2925
use super::exchange_params::ExchangeParams;
3026
use super::exchange_params::GlobalExchangeParams;
3127
use super::exchange_sink_writer::create_writer_item;
32-
use super::exchange_sorting::ExchangeSorting;
33-
use super::exchange_sorting::TransformExchangeSorting;
3428
use super::exchange_transform_shuffle::exchange_shuffle;
3529
use super::hash_send_sink::HashSendSink;
36-
use super::serde::ExchangeSerializeMeta;
3730
use crate::clusters::ClusterHelper;
3831
use crate::servers::flight::v1::exchange::DataExchangeManager;
3932
use crate::servers::flight::v1::network::OutboundChannel;
@@ -65,28 +58,6 @@ impl ExchangeSink {
6558
)));
6659
}
6760

68-
let exchange_injector = &params.exchange_injector;
69-
70-
if !params.ignore_exchange {
71-
let settings = ctx.get_settings();
72-
let compression = settings.get_query_flight_compression()?;
73-
exchange_injector.apply_merge_serializer(params, compression, pipeline)?;
74-
}
75-
76-
if !params.ignore_exchange && exchange_injector.exchange_sorting().is_some() {
77-
let output_len = pipeline.output_len();
78-
let sorting = SinkExchangeSorting::create();
79-
let transform = TransformExchangeSorting::create(output_len, sorting);
80-
81-
let output = transform.get_output();
82-
let inputs = transform.get_inputs();
83-
pipeline.add_pipe(Pipe::create(output_len, 1, vec![PipeItem::create(
84-
ProcessorPtr::create(Box::new(transform)),
85-
inputs,
86-
vec![output],
87-
)]));
88-
}
89-
9061
let exchange_manager = ctx.get_exchange_manager();
9162
let senders = exchange_manager
9263
.get_flight_sender(&ExchangeParams::MergeExchange(params.clone()))?;
@@ -230,30 +201,6 @@ impl ExchangeSink {
230201
}
231202
}
232203

233-
struct SinkExchangeSorting;
234-
235-
impl SinkExchangeSorting {
236-
pub fn create() -> Arc<dyn ExchangeSorting> {
237-
Arc::new(SinkExchangeSorting {})
238-
}
239-
}
240-
241-
impl ExchangeSorting for SinkExchangeSorting {
242-
fn block_number(&self, data_block: &DataBlock) -> Result<isize> {
243-
let block_meta = data_block.get_meta();
244-
let shuffle_meta = block_meta
245-
.and_then(ExchangeSerializeMeta::downcast_ref_from)
246-
.ok_or_else(|| {
247-
ErrorCode::Internal(format!(
248-
"Failed to downcast ExchangeSerializeMeta from BlockMeta: {:?}",
249-
block_meta
250-
))
251-
})?;
252-
253-
Ok(shuffle_meta.block_number)
254-
}
255-
}
256-
257204
/// Build OutboundChannels for broadcast exchange using PingPongExchange.
258205
pub(super) fn build_broadcast_outbound_channels(
259206
params: &BroadcastExchangeParams,

src/query/service/src/servers/flight/v1/exchange/exchange_source.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,17 @@ pub fn via_exchange_source(
6262

6363
let exchange_params = ExchangeParams::MergeExchange(params.clone());
6464
let exchange_manager = ctx.get_exchange_manager();
65-
let flight_receivers = exchange_manager.get_flight_receiver(&exchange_params)?;
65+
let had_local_outputs = pipeline.output_len() > 0;
6666

6767
let last_output_len = pipeline.output_len();
68+
let flight_receivers = if had_local_outputs {
69+
// A locally subscribed merge fragment can legitimately have no remote
70+
// senders; in that case the local in-process output is the entire
71+
// fragment input for the root.
72+
exchange_manager.get_merge_flight_receiver_if_exists(params)?
73+
} else {
74+
exchange_manager.get_flight_receiver(&exchange_params)?
75+
};
6876
let mut items = Vec::with_capacity(last_output_len + flight_receivers.len());
6977

7078
for _index in 0..last_output_len {

src/query/service/src/servers/flight/v1/exchange/exchange_transform.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,10 @@ impl ExchangeTransform {
103103
}
104104

105105
let mut nodes_source = 0;
106-
let receivers = exchange_manager.get_flight_receiver(&exchange_params)?;
106+
// When the shuffle source fragment also runs locally, the local bucket
107+
// stays in-process via the dummy/resize branch above. In that case it is
108+
// valid for this executor to have no remote receiver registered.
109+
let receivers = exchange_manager.get_flight_receiver_if_exists(params)?;
107110
for receiver in receivers {
108111
nodes_source += 1;
109112
items.push(create_reader_item(receiver));

0 commit comments

Comments
 (0)