Skip to content

Commit c9de809

Browse files
author
Rafał Hibner
committed
Merge branch 'BackpressureCombiner' into combined2
2 parents ee714bb + 935aaee commit c9de809

4 files changed

Lines changed: 56 additions & 2 deletions

File tree

cpp/src/arrow/acero/backpressure.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,8 @@ void BackpressureCombiner::Resume(Source* output, bool strong_connection) {
6464

6565
void BackpressureCombiner::UpdatePauseStateUnlocked() {
6666
bool should_be_paused =
67-
strong_paused_count_ > 0 || weak_paused_count_ == weak_paused_.size();
67+
strong_paused_count_ > 0 ||
68+
(weak_paused_count_ > 0 && weak_paused_count_ == weak_paused_.size());
6869
if (should_be_paused) {
6970
if (!paused) {
7071
backpressure_control_->Pause();

cpp/src/arrow/acero/backpressure.h

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
namespace arrow::acero {
2323

2424
// Generic backpressure controller for ExecNode
25-
class BackpressureController : public BackpressureControl {
25+
class ARROW_ACERO_EXPORT BackpressureController : public BackpressureControl {
2626
public:
2727
BackpressureController(ExecNode* node, ExecNode* output,
2828
std::atomic<int32_t>& backpressure_counter);
@@ -36,6 +36,18 @@ class BackpressureController : public BackpressureControl {
3636
std::atomic<int32_t>& backpressure_counter_;
3737
};
3838

39+
template <typename T>
40+
class ARROW_ACERO_EXPORT BackpressureControlWrapper : public BackpressureControl {
41+
public:
42+
explicit BackpressureControlWrapper(T* obj) : obj_(obj) {}
43+
44+
void Pause() override { obj_->Pause(); }
45+
void Resume() override { obj_->Resume(); }
46+
47+
private:
48+
T* obj_;
49+
};
50+
3951
// Provides infrastructure of combining multiple backpressure sources and propagate the
4052
// result into BackpressureControl There are two types of Source: strong - pause on any
4153
// strong Source within controller

cpp/src/arrow/acero/backpressure_test.cc

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,5 +81,44 @@ TEST(BackpressureCombiner, Basic) {
8181
ASSERT_FALSE(paused);
8282
}
8383

84+
TEST(BackpressureCombiner, OnlyStrong) {
85+
std::atomic<bool> paused{false};
86+
BackpressureCombiner combiner(std::make_unique<MonitorBackpressureControl>(paused));
87+
BackpressureCombiner::Source strong_source1(&combiner);
88+
BackpressureCombiner::Source strong_source2;
89+
strong_source2.AddController(&combiner);
90+
91+
// Any strong causes pause
92+
ASSERT_FALSE(paused);
93+
strong_source1.Pause();
94+
ASSERT_TRUE(paused);
95+
strong_source2.Pause();
96+
ASSERT_TRUE(paused);
97+
strong_source1.Resume();
98+
ASSERT_TRUE(paused);
99+
strong_source2.Resume();
100+
ASSERT_FALSE(paused);
101+
}
102+
103+
TEST(BackpressureCombiner, OnlyWeak) {
104+
std::atomic<bool> paused{false};
105+
BackpressureCombiner combiner(std::make_unique<MonitorBackpressureControl>(paused));
106+
107+
BackpressureCombiner::Source weak_source1(&combiner, false);
108+
BackpressureCombiner::Source weak_source2;
109+
weak_source2.AddController(&combiner, false);
110+
111+
// All weak cause pause
112+
ASSERT_FALSE(paused);
113+
weak_source1.Pause();
114+
ASSERT_FALSE(paused);
115+
weak_source2.Pause();
116+
ASSERT_TRUE(paused);
117+
weak_source1.Resume();
118+
ASSERT_FALSE(paused);
119+
weak_source2.Resume();
120+
ASSERT_FALSE(paused);
121+
}
122+
84123
} // namespace acero
85124
} // namespace arrow

cpp/src/arrow/acero/meson.build

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ arrow_acero_srcs = [
5151
'groupby_aggregate_node.cc',
5252
'aggregate_internal.cc',
5353
'asof_join_node.cc',
54+
'backpressure.cc',
5455
'bloom_filter.cc',
5556
'exec_plan.cc',
5657
'fetch_node.cc',
@@ -90,6 +91,7 @@ arrow_acero_dep = declare_dependency(link_with: [arrow_acero_lib])
9091
arrow_acero_testing_sources = ['test_nodes.cc', 'test_util_internal.cc']
9192

9293
arrow_acero_tests = {
94+
'backpressure-test': {'sources': ['backpressure_test.cc']},
9395
'plan-test': {'sources': ['plan_test.cc', 'test_nodes_test.cc']},
9496
'source-node-test': {'sources': ['source_node_test.cc']},
9597
'fetch-node-test': {'sources': ['fetch_node_test.cc']},

0 commit comments

Comments
 (0)