Skip to content

Commit d751490

Browse files
author
Rafał Hibner
committed
Use BackpressureCombiner
1 parent dfcd2c7 commit d751490

2 files changed

Lines changed: 23 additions & 59 deletions

File tree

cpp/src/arrow/acero/pipe_node.cc

Lines changed: 18 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -236,11 +236,24 @@ PipeSource::PipeSource() {}
236236
Status PipeSource::Initialize(Pipe* pipe) {
237237
if (pipe_) return Status::Invalid("Pipe:" + pipe->PipeName() + " has multiple sinks");
238238
pipe_ = pipe;
239+
backpressure_source_.AddController(pipe);
239240
return Status::OK();
240241
}
241242

242-
void PipeSource::Pause(int32_t counter) { pipe_->Pause(this, counter); }
243-
void PipeSource::Resume(int32_t counter) { pipe_->Resume(this, counter); }
243+
void PipeSource::Pause(int32_t counter) {
244+
auto lock = mutex_.Lock();
245+
if (backpressure_counter < counter) {
246+
backpressure_counter = counter;
247+
backpressure_source_.Pause();
248+
}
249+
}
250+
void PipeSource::Resume(int32_t counter) {
251+
auto lock = mutex_.Lock();
252+
if (backpressure_counter < counter) {
253+
backpressure_counter = counter;
254+
backpressure_source_.Resume();
255+
}
256+
}
244257
Status PipeSource::StopProducing() {
245258
if (pipe_) return pipe_->StopProducing(this);
246259
// stopped before initialization
@@ -264,67 +277,20 @@ Pipe::Pipe(ExecPlan* plan, std::string pipe_name,
264277
std::unique_ptr<BackpressureControl> ctrl,
265278
std::function<Status()> stopProducing, Ordering ordering, bool pause_on_any,
266279
bool stop_on_any)
267-
: plan_(plan),
280+
: BackpressureCombiner(std::move(ctrl), pause_on_any),
281+
plan_(plan),
268282
ordering_(ordering),
269283
pipe_name_(pipe_name),
270-
ctrl_(std::move(ctrl)),
271284
stopProducing_(stopProducing),
272-
pause_on_any_(pause_on_any),
273285
stop_on_any_(stop_on_any) {}
274286

275287
const Ordering& Pipe::ordering() const { return ordering_; }
276288

277-
void Pipe::Pause(PipeSource* output, int counter) {
278-
auto lock = mutex_.Lock();
279-
auto& state = state_[output];
280-
if (state.backpressure_counter < counter) {
281-
state.backpressure_counter = counter;
282-
if (!state.paused && !state.stopped) {
283-
state.paused = true;
284-
size_t paused_count = ++paused_count_;
285-
if (pause_on_any_) {
286-
if (paused_count == 1) {
287-
ctrl_->Pause();
288-
}
289-
} else {
290-
if (paused_count == CountSources() - stopped_count_) {
291-
ctrl_->Pause();
292-
}
293-
}
294-
}
295-
}
296-
}
297-
298-
void Pipe::Resume(PipeSource* output, int counter) {
299-
auto lock = mutex_.Lock();
300-
auto& state = state_[output];
301-
if (state.backpressure_counter < counter) {
302-
state.backpressure_counter = counter;
303-
DoResume(state);
304-
}
305-
}
306-
307-
void Pipe::DoResume(SourceState& state) {
308-
if (state.paused && !state.stopped) {
309-
state.paused = false;
310-
size_t paused_count = --paused_count_;
311-
if (pause_on_any_) {
312-
if (paused_count == 0) {
313-
ctrl_->Resume();
314-
}
315-
} else {
316-
if (paused_count == CountSources() - stopped_count_ - 1) {
317-
ctrl_->Resume();
318-
}
319-
}
320-
}
321-
}
322-
323289
Status Pipe::StopProducing(PipeSource* output) {
324290
auto lock = mutex_.Lock();
325291
auto& state = state_[output];
326292
DCHECK(!state.stopped);
327-
DoResume(state);
293+
BackpressureCombiner::Stop();
328294
state.stopped = true;
329295
size_t stopped_count = ++stopped_count_;
330296
if (stop_on_any_) {

cpp/src/arrow/acero/pipe_node.h

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
#pragma once
1919
#include <string>
20+
#include "arrow/acero/backpressure.h"
2021
#include "arrow/acero/exec_plan.h"
2122
#include "arrow/acero/options.h"
2223
#include "arrow/acero/visibility.h"
@@ -47,12 +48,15 @@ class PipeSource {
4748
virtual Status HandleInputFinished(int total_batches) = 0;
4849

4950
Pipe* pipe_{nullptr};
51+
BackpressureCombiner::Source backpressure_source_;
52+
Mutex mutex_;
53+
int backpressure_counter{0};
5054
};
5155

5256
/// @brief Provides pipe like infastructure for Acero. It isa center element for
5357
/// pipe_sink/pipe_tee and pipe_source infrastructure. Can also be used to create
5458
/// auxiliarty outputs(pipes) for ExecNodes.
55-
class ARROW_ACERO_EXPORT Pipe {
59+
class ARROW_ACERO_EXPORT Pipe : public BackpressureCombiner {
5660
public:
5761
Pipe(ExecPlan* plan, std::string pipe_name, std::unique_ptr<BackpressureControl> ctrl,
5862
std::function<Status()> stopProducing, Ordering ordering = Ordering::Unordered(),
@@ -96,16 +100,13 @@ class ARROW_ACERO_EXPORT Pipe {
96100
friend class PipeSource;
97101

98102
struct SourceState {
99-
bool paused{false};
100103
bool stopped{false};
101-
int backpressure_counter{0};
102104
};
103105

104106
// Backpresurre interface for PipeSource
105107
void Pause(PipeSource* output, int counter);
106108
// Backpresurre interface for PipeSource
107109
void Resume(PipeSource* output, int counter);
108-
void DoResume(SourceState& state);
109110
//
110111
Status StopProducing(PipeSource* output);
111112

@@ -118,13 +119,10 @@ class ARROW_ACERO_EXPORT Pipe {
118119
// backpressure
119120
std::unordered_map<PipeSource*, SourceState> state_;
120121
Mutex mutex_;
121-
std::atomic_size_t paused_count_{0};
122-
std::unique_ptr<BackpressureControl> ctrl_;
123122
// stopProducing
124123
std::atomic_size_t stopped_count_{0};
125124
std::function<Status()> stopProducing_;
126125

127-
const bool pause_on_any_;
128126
const bool stop_on_any_;
129127
};
130128

0 commit comments

Comments
 (0)