Skip to content

Commit 43e5f1e

Browse files
dqhl76zhang2014
andauthored
fix(cluster): close exchange buffer on send error (#19857)
* fix(cluster): close exchange buffer on send error * fix(query): address exchange close clippy issues * fix: special handle local channel --------- Co-authored-by: Winter Zhang <coswde@gmail.com>
1 parent 3bee295 commit 43e5f1e

8 files changed

Lines changed: 568 additions & 124 deletions

File tree

src/query/service/src/servers/flight/v1/exchange/broadcast_send_transform.rs

Lines changed: 39 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,9 @@ use databend_common_pipeline::core::Processor;
2727
use databend_common_pipeline::core::ProcessorPtr;
2828
use petgraph::prelude::NodeIndex;
2929

30-
use crate::servers::flight::v1::network::DummyOutboundChannel;
30+
use super::outbound_send_channels::OutboundSendChannels;
31+
use super::outbound_send_channels::OutboundSendHandle;
3132
use crate::servers::flight::v1::network::OutboundChannel;
32-
use crate::servers::flight::v1::network::SyncTaskHandle;
3333
use crate::servers::flight::v1::network::SyncTaskSet;
3434

3535
pub struct BroadcastSendTransform {
@@ -39,8 +39,8 @@ pub struct BroadcastSendTransform {
3939

4040
local_pos: usize,
4141
tasks: SyncTaskSet,
42-
channels: Vec<Arc<dyn OutboundChannel>>,
43-
handle: Option<SyncTaskHandle<'static, Result<Vec<()>>>>,
42+
channels: OutboundSendChannels,
43+
handle: Option<OutboundSendHandle>,
4444
}
4545

4646
impl BroadcastSendTransform {
@@ -54,7 +54,7 @@ impl BroadcastSendTransform {
5454
let output = OutputPort::create();
5555

5656
let processor = ProcessorPtr::create(Box::new(Self {
57-
channels,
57+
channels: OutboundSendChannels::create(channels),
5858
local_pos,
5959
input: input.clone(),
6060
output: output.clone(),
@@ -66,6 +66,10 @@ impl BroadcastSendTransform {
6666

6767
PipeItem::create(processor, vec![input], vec![output])
6868
}
69+
70+
fn no_active_downstream(&self) -> bool {
71+
self.output.is_finished() && self.channels.all_closed_except(self.local_pos)
72+
}
6973
}
7074

7175
impl Processor for BroadcastSendTransform {
@@ -81,9 +85,12 @@ impl Processor for BroadcastSendTransform {
8185
// Poll existing handle
8286
if let Some(mut handle) = self.handle.take() {
8387
match handle.poll(matches!(cause, EventCause::Other)) {
84-
Poll::Ready(Ok(_)) => {}
85-
Poll::Ready(Err(cause)) => {
86-
return Err(cause);
88+
Poll::Ready(results) => {
89+
self.channels.handle_send_results(results)?;
90+
if self.no_active_downstream() {
91+
self.input.finish();
92+
return Ok(Event::Finished);
93+
}
8794
}
8895
Poll::Pending => {
8996
self.handle = Some(handle);
@@ -92,19 +99,21 @@ impl Processor for BroadcastSendTransform {
9299
};
93100
}
94101

95-
if self.output.is_finished() {
96-
if self.channels.iter().all(|ch| ch.is_closed()) {
97-
self.input.finish();
98-
return Ok(Event::Finished);
99-
}
102+
if self.no_active_downstream() {
103+
self.input.finish();
104+
return Ok(Event::Finished);
100105
}
101106

102107
if self.input.has_data() {
103108
let data_block = self.input.pull_data().unwrap()?;
104109

105110
let mut futures = Vec::new();
106111

107-
for (idx, output_channel) in self.channels.iter().enumerate() {
112+
for (idx, output_channel) in self.channels.iter() {
113+
if output_channel.is_closed() {
114+
continue;
115+
}
116+
108117
if idx == self.local_pos {
109118
if self.output.is_finished() {
110119
continue;
@@ -119,20 +128,26 @@ impl Processor for BroadcastSendTransform {
119128
futures.push({
120129
let data_block = data_block.clone();
121130
let output_channel = output_channel.clone();
122-
async move { output_channel.add_block(data_block).await }
131+
async move { (idx, output_channel.add_block(data_block).await) }
123132
});
124133
}
125134

126135
if !futures.is_empty() {
127-
let joined = Box::pin(futures::future::try_join_all(futures));
136+
let joined = Box::pin(futures::future::join_all(futures));
128137
let mut handle = self.tasks.spawn(self.id, joined);
129138

130-
if matches!(
131-
handle.poll(matches!(cause, EventCause::Other)),
132-
Poll::Pending
133-
) {
134-
self.handle = Some(handle);
135-
return Ok(Event::NeedConsume);
139+
match handle.poll(matches!(cause, EventCause::Other)) {
140+
Poll::Ready(results) => {
141+
self.channels.handle_send_results(results)?;
142+
if self.no_active_downstream() {
143+
self.input.finish();
144+
return Ok(Event::Finished);
145+
}
146+
}
147+
Poll::Pending => {
148+
self.handle = Some(handle);
149+
return Ok(Event::NeedConsume);
150+
}
136151
}
137152
}
138153

@@ -145,12 +160,7 @@ impl Processor for BroadcastSendTransform {
145160
if self.input.is_finished() {
146161
self.output.finish();
147162

148-
for idx in 0..self.channels.len() {
149-
let mut closed_channel = DummyOutboundChannel::create();
150-
std::mem::swap(&mut self.channels[idx], &mut closed_channel);
151-
closed_channel.close();
152-
}
153-
163+
self.channels.close_all();
154164
return Ok(Event::Finished);
155165
}
156166

@@ -164,7 +174,7 @@ impl Processor for BroadcastSendTransform {
164174
self.handle.is_some(),
165175
self.channels
166176
.iter()
167-
.map(|x| x.is_closed())
177+
.map(|(_, x)| x.is_closed())
168178
.collect::<Vec<_>>()
169179
))
170180
}

src/query/service/src/servers/flight/v1/exchange/hash_send_sink.rs

Lines changed: 45 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,9 @@ use databend_common_pipeline::core::Processor;
2727
use databend_common_pipeline::core::ProcessorPtr;
2828
use petgraph::graph::NodeIndex;
2929

30-
use crate::servers::flight::v1::network::DummyOutboundChannel;
30+
use super::outbound_send_channels::OutboundSendChannels;
31+
use super::outbound_send_channels::OutboundSendHandle;
3132
use crate::servers::flight::v1::network::OutboundChannel;
32-
use crate::servers::flight::v1::network::SyncTaskHandle;
3333
use crate::servers::flight::v1::network::SyncTaskSet;
3434
use crate::servers::flight::v1::scatter::FlightScatter;
3535

@@ -39,8 +39,8 @@ pub struct HashSendSink {
3939
scatter: Arc<Box<dyn FlightScatter>>,
4040
partition_stream: BlockPartitionStream,
4141
tasks: SyncTaskSet,
42-
channels: Vec<Arc<dyn OutboundChannel>>,
43-
handle: Option<SyncTaskHandle<'static, Result<Vec<()>>>>,
42+
channels: OutboundSendChannels,
43+
handle: Option<OutboundSendHandle>,
4444
}
4545

4646
impl HashSendSink {
@@ -54,6 +54,7 @@ impl HashSendSink {
5454
) -> PipeItem {
5555
let input = InputPort::create();
5656
let scatter_size = channels.len();
57+
let channels = OutboundSendChannels::create(channels);
5758
let processor = ProcessorPtr::create(Box::new(Self {
5859
scatter,
5960
channels,
@@ -85,8 +86,13 @@ impl Processor for HashSendSink {
8586
// Poll existing handle
8687
if let Some(mut handle) = self.handle.take() {
8788
match handle.poll(matches!(cause, EventCause::Other)) {
88-
Poll::Ready(Ok(_)) => {}
89-
Poll::Ready(Err(cause)) => return Err(cause),
89+
Poll::Ready(results) => {
90+
self.channels.handle_send_results(results)?;
91+
if self.channels.all_closed() {
92+
self.input.finish();
93+
return Ok(Event::Finished);
94+
}
95+
}
9096
Poll::Pending => {
9197
self.handle = Some(handle);
9298
return Ok(Event::NeedConsume);
@@ -106,20 +112,32 @@ impl Processor for HashSendSink {
106112
if block.is_empty() {
107113
continue;
108114
}
115+
if self.channels.is_closed(partition_id) {
116+
continue;
117+
}
109118

110119
futures.push({
111-
let channel = self.channels[partition_id].clone();
112-
async move { channel.add_block(block).await }
120+
let channel = self.channels.channel(partition_id).clone();
121+
async move { (partition_id, channel.add_block(block).await) }
113122
});
114123
}
115124

116125
if !futures.is_empty() {
117-
let joined = Box::pin(futures::future::try_join_all(futures));
126+
let joined = Box::pin(futures::future::join_all(futures));
118127
let mut handle = self.tasks.spawn(self.id, joined);
119128

120-
if matches!(handle.poll(true), Poll::Pending) {
121-
self.handle = Some(handle);
122-
return Ok(Event::NeedConsume);
129+
match handle.poll(true) {
130+
Poll::Ready(results) => {
131+
self.channels.handle_send_results(results)?;
132+
if self.channels.all_closed() {
133+
self.input.finish();
134+
return Ok(Event::Finished);
135+
}
136+
}
137+
Poll::Pending => {
138+
self.handle = Some(handle);
139+
return Ok(Event::NeedConsume);
140+
}
123141
}
124142
}
125143
}
@@ -129,42 +147,39 @@ impl Processor for HashSendSink {
129147
let mut futures = Vec::new();
130148

131149
for partition_id in 0..self.channels.len() {
150+
if self.channels.is_closed(partition_id) {
151+
continue;
152+
}
153+
132154
if let Some(block) = self.partition_stream.finalize_partition(partition_id) {
133155
if block.is_empty() {
134156
continue;
135157
}
136158

137159
futures.push({
138-
let channel = self.channels[partition_id].clone();
139-
async move { channel.add_block(block).await }
160+
let channel = self.channels.channel(partition_id).clone();
161+
async move { (partition_id, channel.add_block(block).await) }
140162
});
141163
}
142164
}
143165

144166
if futures.is_empty() {
145-
for idx in 0..self.channels.len() {
146-
let mut closed = DummyOutboundChannel::create();
147-
std::mem::swap(&mut self.channels[idx], &mut closed);
148-
closed.close();
149-
}
150-
167+
self.channels.close_all();
151168
return Ok(Event::Finished);
152169
}
153170

154-
let joined = Box::pin(futures::future::try_join_all(futures));
171+
let joined = Box::pin(futures::future::join_all(futures));
155172
let mut handle = self.tasks.spawn(self.id, joined);
156173

157-
if matches!(handle.poll(true), Poll::Pending) {
158-
self.handle = Some(handle);
159-
return Ok(Event::NeedConsume);
160-
}
161-
162-
for idx in 0..self.channels.len() {
163-
let mut closed = DummyOutboundChannel::create();
164-
std::mem::swap(&mut self.channels[idx], &mut closed);
165-
closed.close();
174+
match handle.poll(true) {
175+
Poll::Ready(results) => self.channels.handle_send_results(results)?,
176+
Poll::Pending => {
177+
self.handle = Some(handle);
178+
return Ok(Event::NeedConsume);
179+
}
166180
}
167181

182+
self.channels.close_all();
168183
return Ok(Event::Finished);
169184
}
170185

0 commit comments

Comments
 (0)