Skip to content

Commit 8eb25f8

Browse files
committed
fix(exchange): precreate inbound channel sets
Register and lazily create inbound channel sets for do_exchange receivers so remote-only exchange sources can build pipelines before network senders connect. - pre-register incoming exchange fragments during query env init - add get-or-create helpers on DataExchangeManager and QueryCoordinator - switch broadcast/global-shuffle receive paths to use get-or-create Validation: - cargo check -p databend-query --lib - db-slt --cluster --run-dir cluster --run-file subquery.test
1 parent 26b0101 commit 8eb25f8

4 files changed

Lines changed: 125 additions & 25 deletions

File tree

src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs

Lines changed: 76 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,7 @@ impl DataExchangeManager {
249249

250250
let mut request_exchanges = HashMap::new();
251251
let mut targets_exchanges = HashMap::<String, Vec<FlightExchange>>::new();
252+
let mut inbound_channel_sizes = HashMap::<String, usize>::new();
252253

253254
for index in env.dataflow_diagram.node_indices() {
254255
if env.dataflow_diagram[index].id == config.query.node_id {
@@ -297,9 +298,25 @@ impl DataExchangeManager {
297298
})
298299
}));
299300
}
300-
Edge::ExchangeFragment { .. } => {
301-
// Skip: remote sender will call do_exchange on us,
302-
// handled by handle_do_exchange → NetworkInboundSender
301+
Edge::ExchangeFragment {
302+
exchange_id,
303+
channels,
304+
} => {
305+
// Pre-register inbound channel sets so source-only exchange
306+
// receivers can build their pipelines before the first
307+
// do_exchange connection arrives.
308+
match inbound_channel_sizes.entry(exchange_id) {
309+
Entry::Occupied(v) => {
310+
if *v.get() != channels.len() {
311+
return Err(ErrorCode::Internal(
312+
"Mismatched inbound exchange parallelism",
313+
));
314+
}
315+
}
316+
Entry::Vacant(v) => {
317+
v.insert(channels.len());
318+
}
319+
}
303320
}
304321
}
305322
}
@@ -419,6 +436,7 @@ impl DataExchangeManager {
419436
query_coordinator.info = query_info;
420437
query_coordinator.is_request_server =
421438
GlobalConfig::instance().query.node_id == env.request_server_id;
439+
query_coordinator.register_inbound_channel_sets(&inbound_channel_sizes)?;
422440
query_coordinator.register_flight_channel_receiver(targets_exchanges)?;
423441
query_coordinator.register_ping_pong_exchanges(ping_pong_exchanges);
424442
query_coordinator.add_statistics_exchanges(request_exchanges)?;
@@ -428,6 +446,7 @@ impl DataExchangeManager {
428446
query_coordinator.info = query_info;
429447
query_coordinator.is_request_server =
430448
GlobalConfig::instance().query.node_id == env.request_server_id;
449+
query_coordinator.register_inbound_channel_sets(&inbound_channel_sizes)?;
431450
query_coordinator.register_flight_channel_receiver(targets_exchanges)?;
432451
query_coordinator.register_ping_pong_exchanges(ping_pong_exchanges);
433452
query_coordinator.add_statistics_exchanges(request_exchanges)?;
@@ -672,6 +691,25 @@ impl DataExchangeManager {
672691
}
673692
}
674693

694+
pub fn get_or_create_exchange_channel_set(
695+
&self,
696+
query_id: &str,
697+
channel_id: &str,
698+
num_threads: usize,
699+
) -> Result<Arc<NetworkInboundChannelSet>> {
700+
let queries_coordinator_guard = self.queries_coordinator.lock();
701+
let queries_coordinator = unsafe { &mut *queries_coordinator_guard.deref().get() };
702+
703+
match queries_coordinator.entry(query_id.to_string()) {
704+
Entry::Occupied(mut v) => v
705+
.get_mut()
706+
.get_or_create_inbound_channel_set(channel_id, num_threads),
707+
Entry::Vacant(v) => v
708+
.insert(QueryCoordinator::create())
709+
.get_or_create_inbound_channel_set(channel_id, num_threads),
710+
}
711+
}
712+
675713
/// Take the PingPongExchanges for a given query and channel.
676714
///
677715
/// Returns the exchanges that were created during init_query_env.
@@ -1034,6 +1072,41 @@ impl QueryCoordinator {
10341072
}
10351073
}
10361074

1075+
pub fn register_inbound_channel_sets(
1076+
&mut self,
1077+
channel_sizes: &HashMap<String, usize>,
1078+
) -> Result<()> {
1079+
for (channel_id, num_threads) in channel_sizes {
1080+
self.get_or_create_inbound_channel_set(channel_id, *num_threads)?;
1081+
}
1082+
1083+
Ok(())
1084+
}
1085+
1086+
pub fn get_or_create_inbound_channel_set(
1087+
&mut self,
1088+
channel_id: &str,
1089+
num_threads: usize,
1090+
) -> Result<Arc<NetworkInboundChannelSet>> {
1091+
match self.inbound_channel_sets.entry(channel_id.to_string()) {
1092+
Entry::Occupied(v) => {
1093+
if v.get().channels.len() != num_threads {
1094+
return Err(ErrorCode::Internal(format!(
1095+
"Mismatched inbound channel set parallelism, channel_id: {}, expected: {}, actual: {}",
1096+
channel_id,
1097+
num_threads,
1098+
v.get().channels.len()
1099+
)));
1100+
}
1101+
1102+
Ok(v.get().clone())
1103+
}
1104+
Entry::Vacant(v) => Ok(v
1105+
.insert(Arc::new(NetworkInboundChannelSet::new(num_threads)))
1106+
.clone()),
1107+
}
1108+
}
1109+
10371110
/// Create a NetworkInboundSender for a new do_exchange connection.
10381111
///
10391112
/// The `num_threads` value is provided by the coordinator via DoExchangeParams.

src/query/service/src/servers/flight/v1/exchange/exchange_sink.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,11 @@ impl ExchangeSink {
156156
let exchange_id = &params.exchange_id;
157157
let exchange_manager = DataExchangeManager::instance();
158158

159-
let channel_set = exchange_manager.get_exchange_channel_set(query_id, exchange_id)?;
159+
let channel_set = exchange_manager.get_or_create_exchange_channel_set(
160+
query_id,
161+
exchange_id,
162+
local_threads,
163+
)?;
160164
assert_eq!(channel_set.channels.len(), local_threads);
161165

162166
let local_outbound = create_local_channels(&channel_set);

src/query/service/src/servers/flight/v1/exchange/exchange_source.rs

Lines changed: 34 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,22 @@ fn add_source_pipe(pipeline: &mut Pipeline, source_items: Vec<PipeItem>) {
6262
pipeline.add_pipe(Pipe::create(last_output_len, items.len(), items));
6363
}
6464

65+
fn local_exchange_threads(
66+
executor_id: &str,
67+
destination_channels: &[(String, Vec<String>)],
68+
) -> Result<usize> {
69+
destination_channels
70+
.iter()
71+
.find_map(|(destination, channels)| (destination == executor_id).then_some(channels.len()))
72+
.filter(|threads| *threads > 0)
73+
.ok_or_else(|| {
74+
ErrorCode::Internal(format!(
75+
"Executor {} is not a destination of exchange",
76+
executor_id
77+
))
78+
})
79+
}
80+
6581
/// Add Exchange Source to the pipeline.
6682
pub fn via_exchange_source(
6783
ctx: Arc<QueryContext>,
@@ -117,8 +133,13 @@ pub fn via_hash_exchange_source(
117133
let query_id = &params.query_id;
118134
let exchange_id = &params.exchange_id;
119135
let exchange_manager = DataExchangeManager::instance();
136+
let local_threads = local_exchange_threads(&params.executor_id, &params.destination_channels)?;
120137

121-
let channel_set = exchange_manager.get_exchange_channel_set(query_id, exchange_id)?;
138+
let channel_set = exchange_manager.get_or_create_exchange_channel_set(
139+
query_id,
140+
exchange_id,
141+
local_threads,
142+
)?;
122143
let waker = pipeline.get_waker();
123144

124145
let num_receivers = channel_set.channels.len();
@@ -137,12 +158,17 @@ pub fn via_hash_exchange_source(
137158
}
138159

139160
pub fn via_broadcast_exchange_source(
161+
_ctx: &Arc<QueryContext>,
140162
params: &BroadcastExchangeParams,
141163
pipeline: &mut Pipeline,
142164
) -> Result<()> {
143165
let exchange_manager = DataExchangeManager::instance();
144-
let channel_set =
145-
exchange_manager.get_exchange_channel_set(&params.query_id, &params.exchange_id)?;
166+
let local_threads = local_exchange_threads(&params.executor_id, &params.destination_channels)?;
167+
let channel_set = exchange_manager.get_or_create_exchange_channel_set(
168+
&params.query_id,
169+
&params.exchange_id,
170+
local_threads,
171+
)?;
146172
let waker = pipeline.get_waker();
147173
let mut source_items = Vec::with_capacity(channel_set.channels.len());
148174

@@ -159,27 +185,15 @@ pub fn via_broadcast_exchange_source(
159185
}
160186

161187
pub fn via_shuffle_exchange_source(
162-
ctx: &Arc<QueryContext>,
188+
_ctx: &Arc<QueryContext>,
163189
params: &ShuffleExchangeParams,
164190
injector: Arc<dyn ExchangeInjector>,
165191
pipeline: &mut Pipeline,
166192
) -> Result<()> {
167-
let local_channels = params
168-
.destination_channels
169-
.iter()
170-
.find_map(|(destination, channels)| {
171-
(destination == &params.executor_id).then_some(channels.len())
172-
})
173-
.unwrap_or_default();
174-
175-
if local_channels == 0 {
176-
return Err(ErrorCode::Internal(format!(
177-
"Executor {} is not a destination of fragment {} exchange",
178-
params.executor_id, params.fragment_id
179-
)));
180-
}
193+
let _local_channels =
194+
local_exchange_threads(&params.executor_id, &params.destination_channels)?;
181195

182-
let exchange_manager = ctx.get_exchange_manager();
196+
let exchange_manager = DataExchangeManager::instance();
183197
let exchange_params = ExchangeParams::NodeShuffleExchange(params.clone());
184198
let flight_receivers = exchange_manager.get_flight_receiver(&exchange_params)?;
185199
let mut source_items = Vec::with_capacity(flight_receivers.len());
@@ -255,6 +269,7 @@ pub fn via_remote_exchange_source(
255269
pipeline,
256270
),
257271
DataExchange::Broadcast(exchange) => via_broadcast_exchange_source(
272+
&ctx,
258273
&create_broadcast_source_params(&ctx, query_id, schema, exchange),
259274
pipeline,
260275
),

src/query/service/src/servers/flight/v1/exchange/exchange_transform.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,11 @@ impl ExchangeTransform {
144144
let exchange_id = &params.exchange_id;
145145
let exchange_manager = DataExchangeManager::instance();
146146

147-
let channel_set = exchange_manager.get_exchange_channel_set(query_id, exchange_id)?;
147+
let channel_set = exchange_manager.get_or_create_exchange_channel_set(
148+
query_id,
149+
exchange_id,
150+
local_threads,
151+
)?;
148152

149153
assert_eq!(channel_set.channels.len(), local_threads);
150154

@@ -205,7 +209,11 @@ impl ExchangeTransform {
205209
let exchange_id = &params.exchange_id;
206210
let exchange_manager = DataExchangeManager::instance();
207211

208-
let channel_set = exchange_manager.get_exchange_channel_set(query_id, exchange_id)?;
212+
let channel_set = exchange_manager.get_or_create_exchange_channel_set(
213+
query_id,
214+
exchange_id,
215+
local_threads,
216+
)?;
209217
assert_eq!(channel_set.channels.len(), local_threads);
210218

211219
let local_outbound = create_local_channels(&channel_set);

0 commit comments

Comments
 (0)