Skip to content

Commit 7bf6b30

Browse files
author
Rafał Hibner
committed
Move BackpressureController from Asofasof_join_node and sorted_merge_node to common backpressure
1 parent 8b53421 commit 7bf6b30

4 files changed

Lines changed: 29 additions & 30 deletions

File tree

cpp/src/arrow/acero/asof_join_node.cc

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
#ifndef NDEBUG
4747
# include "arrow/compute/function_internal.h"
4848
#endif
49+
#include "arrow/acero/backpressure.h"
4950
#include "arrow/acero/time_series_util.h"
5051
#include "arrow/compute/key_hash_internal.h"
5152
#include "arrow/compute/light_array_internal.h"
@@ -459,21 +460,6 @@ class KeyHasher {
459460
arrow::util::TempVectorStack stack_;
460461
};
461462

462-
class BackpressureController : public BackpressureControl {
463-
public:
464-
BackpressureController(ExecNode* node, ExecNode* output,
465-
std::atomic<int32_t>& backpressure_counter)
466-
: node_(node), output_(output), backpressure_counter_(backpressure_counter) {}
467-
468-
void Pause() override { node_->PauseProducing(output_, ++backpressure_counter_); }
469-
void Resume() override { node_->ResumeProducing(output_, ++backpressure_counter_); }
470-
471-
private:
472-
ExecNode* node_;
473-
ExecNode* output_;
474-
std::atomic<int32_t>& backpressure_counter_;
475-
};
476-
477463
class InputState : public util::SerialSequencingQueue::Processor {
478464
// InputState corresponds to an input
479465
// Input record batches are queued up in InputState until processed and

cpp/src/arrow/acero/backpressure.cc

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,19 @@
1616
// under the License.
1717

1818
#include "arrow/acero/backpressure.h"
19+
#include "arrow/acero/exec_plan.h"
20+
1921
namespace arrow::acero {
22+
BackpressureController::BackpressureController(ExecNode* node, ExecNode* output,
23+
std::atomic<int32_t>& backpressure_counter)
24+
: node_(node), output_(output), backpressure_counter_(backpressure_counter) {}
25+
void BackpressureController::Pause() {
26+
node_->PauseProducing(output_, ++backpressure_counter_);
27+
}
28+
void BackpressureController::Resume() {
29+
node_->ResumeProducing(output_, ++backpressure_counter_);
30+
}
31+
2032
BackpressureCombiner::BackpressureCombiner(
2133
std::unique_ptr<BackpressureControl> backpressure_control)
2234
: backpressure_control_(std::move(backpressure_control)) {}

cpp/src/arrow/acero/backpressure.h

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,21 @@
2121
#include <mutex>
2222
namespace arrow::acero {
2323

24+
// Generic backpressure controller for ExecNode
25+
class BackpressureController : public BackpressureControl {
26+
public:
27+
BackpressureController(ExecNode* node, ExecNode* output,
28+
std::atomic<int32_t>& backpressure_counter);
29+
30+
void Pause() override;
31+
void Resume() override;
32+
33+
private:
34+
ExecNode* node_;
35+
ExecNode* output_;
36+
std::atomic<int32_t>& backpressure_counter_;
37+
};
38+
2439
// Provides infrastructure of combining multiple backpressure sources and propagate the
2540
// result into BackpressureControl There are two types of Source: strong - pause on any
2641
// strong Source within controller

cpp/src/arrow/acero/sorted_merge_node.cc

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
#include <unordered_map>
2525
#include <vector>
2626

27+
#include "arrow/acero/backpressure.h"
2728
#include "arrow/acero/concurrent_queue_internal.h"
2829
#include "arrow/acero/exec_plan.h"
2930
#include "arrow/acero/exec_plan_internal.h"
@@ -82,21 +83,6 @@ using col_index_t = int;
8283
constexpr bool kNewTask = true;
8384
constexpr bool kPoisonPill = false;
8485

85-
class BackpressureController : public BackpressureControl {
86-
public:
87-
BackpressureController(ExecNode* node, ExecNode* output,
88-
std::atomic<int32_t>& backpressure_counter)
89-
: node_(node), output_(output), backpressure_counter_(backpressure_counter) {}
90-
91-
void Pause() override { node_->PauseProducing(output_, ++backpressure_counter_); }
92-
void Resume() override { node_->ResumeProducing(output_, ++backpressure_counter_); }
93-
94-
private:
95-
ExecNode* node_;
96-
ExecNode* output_;
97-
std::atomic<int32_t>& backpressure_counter_;
98-
};
99-
10086
/// InputState corresponds to an input. Input record batches are queued up in InputState
10187
/// until processed and turned into output record batches.
10288
class InputState {

0 commit comments

Comments
 (0)