Skip to content

Commit 81d1b24

Browse files
author
Rafał Hibner
committed
Merge branch 'asof_join_pause' into combined2
2 parents 2eba454 + 208f61d commit 81d1b24

4 files changed

Lines changed: 64 additions & 58 deletions

File tree

cpp/src/arrow/acero/asof_join_node.cc

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1060,30 +1060,31 @@ class AsofJoinNode : public ExecNode {
10601060
}
10611061

10621062
bool Process() {
1063-
std::lock_guard<std::mutex> guard(gate_);
1064-
if (!CheckEnded()) {
1065-
return false;
1066-
}
1067-
10681063
// Process batches while we have data
10691064
for (;;) {
10701065
backpressure_future_.Wait();
1071-
Result<std::shared_ptr<RecordBatch>> result = ProcessInner();
1072-
1073-
if (result.ok()) {
1074-
auto out_rb = *result;
1075-
if (!out_rb) break;
1076-
ExecBatch out_b(*out_rb);
1077-
out_b.index = batches_produced_++;
1078-
DEBUG_SYNC(this, "produce batch ", out_b.index, ":", DEBUG_MANIP(std::endl),
1079-
out_rb->ToString(), DEBUG_MANIP(std::endl));
1080-
Status st = output_->InputReceived(this, std::move(out_b));
1081-
if (!st.ok()) {
1082-
EndFromProcessThread(std::move(st));
1066+
{
1067+
std::lock_guard<std::mutex> guard(gate_);
1068+
if (!CheckEnded()) {
1069+
return false;
1070+
}
1071+
Result<std::shared_ptr<RecordBatch>> result = ProcessInner();
1072+
1073+
if (result.ok()) {
1074+
auto out_rb = *result;
1075+
if (!out_rb) break;
1076+
ExecBatch out_b(*out_rb);
1077+
out_b.index = batches_produced_++;
1078+
DEBUG_SYNC(this, "produce batch ", out_b.index, ":", DEBUG_MANIP(std::endl),
1079+
out_rb->ToString(), DEBUG_MANIP(std::endl));
1080+
Status st = output_->InputReceived(this, std::move(out_b));
1081+
if (!st.ok()) {
1082+
EndFromProcessThread(std::move(st));
1083+
}
1084+
} else {
1085+
EndFromProcessThread(result.status());
1086+
return false;
10831087
}
1084-
} else {
1085-
EndFromProcessThread(result.status());
1086-
return false;
10871088
}
10881089
}
10891090

cpp/src/arrow/acero/asof_join_node_test.cc

Lines changed: 36 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
#include "arrow/compute/row/row_encoder_internal.h"
4646
#include "arrow/compute/test_util_internal.h"
4747
#include "arrow/io/util_internal.h"
48+
#include "arrow/testing/generator.h"
4849
#include "arrow/testing/gtest_util.h"
4950
#include "arrow/testing/matchers.h"
5051
#include "arrow/testing/random.h"
@@ -1404,15 +1405,28 @@ struct BackpressureCountingNode : public MapNode {
14041405
Result<ExecBatch> ProcessBatch(ExecBatch batch) override { return batch; }
14051406

14061407
void PauseProducing(ExecNode* output, int32_t counter) override {
1407-
++counters->pause_count;
1408+
std::lock_guard<std::mutex> lg(mutex_);
1409+
if (counter > backpressure_counter_) {
1410+
backpressure_counter_ = counter;
1411+
if (!paused) ++counters->pause_count;
1412+
paused = true;
1413+
}
14081414
inputs()[0]->PauseProducing(this, counter);
14091415
}
14101416
void ResumeProducing(ExecNode* output, int32_t counter) override {
1411-
++counters->resume_count;
1417+
std::lock_guard<std::mutex> lg(mutex_);
1418+
if (counter > backpressure_counter_) {
1419+
backpressure_counter_ = counter;
1420+
if (paused) ++counters->resume_count;
1421+
paused = false;
1422+
}
14121423
inputs()[0]->ResumeProducing(this, counter);
14131424
}
14141425

14151426
BackpressureCounters* counters;
1427+
std::mutex mutex_;
1428+
std::atomic<int32_t> backpressure_counter_{0};
1429+
bool paused{false};
14161430
};
14171431

14181432
AsyncGenerator<std::optional<ExecBatch>> GetGen(
@@ -1577,7 +1591,6 @@ TEST(AsofJoinTest, PauseProducingAsofJoinSource) {
15771591
std::optional<ExecBatch> out = out_batch.batches[0];
15781592

15791593
constexpr uint32_t thresholdOfBackpressureAsof = 8;
1580-
constexpr uint32_t thresholdOfBackpressureAsofLow = 4;
15811594

15821595
EXPECT_OK_AND_ASSIGN(std::shared_ptr<ExecPlan> plan, ExecPlan::Make());
15831596
PushGenerator<std::optional<ExecBatch>> batch_producer_left;
@@ -1592,7 +1605,7 @@ TEST(AsofJoinTest, PauseProducingAsofJoinSource) {
15921605

15931606
Declaration left{"source", SourceNodeOptions(l_schema, batch_producer_left)};
15941607
Declaration right{"source", SourceNodeOptions(r_schema, batch_producer_right)};
1595-
AsofJoinNodeOptions asof_join_opts({{{"time"}, {}}, {{"time"}, {}}}, 1);
1608+
AsofJoinNodeOptions asof_join_opts({{{"time"}, {}}, {{"time"}, {}}}, 0);
15961609

15971610
BackpressureCounters bp_countersl, bp_countersr;
15981611
BackpressureCountingNode::Register();
@@ -1634,23 +1647,22 @@ TEST(AsofJoinTest, PauseProducingAsofJoinSource) {
16341647
// Should be able to push kPauseIfAbove batches without triggering back pressure
16351648
uint32_t l_cnt = 0;
16361649
uint32_t r_cnt = 0;
1637-
// for (uint32_t i = 0; i < 1; i++) {
1650+
16381651
EXPECT_FALSE(is_l_paused());
16391652
EXPECT_FALSE(is_r_paused());
16401653
EXPECT_FALSE(backpressure_monitor->is_paused());
16411654
batch_producer_left.producer().Push(l_batches.batches[l_cnt++]);
16421655
batch_producer_right.producer().Push(r0_batches.batches[r_cnt++]);
1643-
// }
16441656

1645-
// One more batch should trigger back pressure
1646-
1647-
BusyWait(60.0, [&]() { return backpressure_monitor->is_paused(); });
1657+
// this should trigger pause on sink
1658+
BusyWait(3.0, [&]() { return backpressure_monitor->is_paused(); });
16481659
arrow::io::internal::GetIOThreadPool()->WaitForIdle();
16491660
arrow::internal::GetCpuThreadPool()->WaitForIdle();
16501661

16511662
EXPECT_FALSE(is_l_paused());
16521663
EXPECT_FALSE(is_r_paused());
16531664
EXPECT_TRUE(backpressure_monitor->is_paused());
1665+
16541666
batch_producer_left.producer().Push(l_batches.batches[l_cnt++]);
16551667
arrow::internal::GetCpuThreadPool()->WaitForIdle();
16561668
batch_producer_right.producer().Push(r0_batches.batches[r_cnt++]);
@@ -1669,41 +1681,33 @@ TEST(AsofJoinTest, PauseProducingAsofJoinSource) {
16691681
batch_producer_right.producer().Push(r0_batches.batches[r_cnt++]);
16701682
}
16711683

1672-
std::optional<ExecBatch> opt_batch;
1684+
BusyWait(3.0, is_l_paused);
1685+
BusyWait(3.0, is_r_paused);
1686+
arrow::io::internal::GetIOThreadPool()->WaitForIdle();
16731687
arrow::internal::GetCpuThreadPool()->WaitForIdle();
1674-
// Read the batches from the sink to open up input of the asof join node
1675-
for (uint32_t i = 0; i < thresholdOfBackpressureAsof - thresholdOfBackpressureAsofLow;
1676-
i++) {
1677-
SleepABit();
1678-
EXPECT_TRUE(is_l_paused());
1679-
EXPECT_TRUE(is_r_paused());
1680-
EXPECT_TRUE(backpressure_monitor->is_paused());
1688+
// Verify pause propagates
1689+
EXPECT_TRUE(is_l_paused());
1690+
EXPECT_TRUE(is_r_paused());
16811691

1682-
ASSERT_FINISHES_OK_AND_ASSIGN(opt_batch, sink_gen());
1683-
EXPECT_TRUE(opt_batch);
1684-
}
1685-
1686-
arrow::internal::GetCpuThreadPool()->WaitForIdle();
1692+
std::optional<ExecBatch> opt_batch;
16871693

1688-
// Finish the batches in the left and right producers
1689-
for (uint32_t i = 0; i < thresholdOfBackpressureAsofLow + 1; i++) {
1690-
SleepABit();
1691-
EXPECT_FALSE(is_l_paused());
1692-
EXPECT_FALSE(is_r_paused());
1694+
do {
16931695
ASSERT_FINISHES_OK_AND_ASSIGN(opt_batch, sink_gen());
1694-
EXPECT_TRUE(opt_batch);
1695-
}
1696+
ASSERT_TRUE(opt_batch);
1697+
l_cnt -= opt_batch->length;
1698+
} while (l_cnt);
16961699

1700+
BusyWait(3.0, [&]() { return !is_l_paused(); });
1701+
BusyWait(3.0, [&]() { return !is_r_paused(); });
1702+
arrow::io::internal::GetIOThreadPool()->WaitForIdle();
1703+
arrow::internal::GetCpuThreadPool()->WaitForIdle();
16971704
EXPECT_FALSE(is_l_paused());
16981705
EXPECT_FALSE(is_r_paused());
16991706
EXPECT_FALSE(backpressure_monitor->is_paused());
17001707

17011708
batch_producer_left.producer().Push(IterationEnd<std::optional<ExecBatch>>());
17021709
batch_producer_right.producer().Push(IterationEnd<std::optional<ExecBatch>>());
17031710

1704-
ASSERT_FINISHES_OK_AND_ASSIGN(opt_batch, sink_gen());
1705-
EXPECT_FALSE(opt_batch);
1706-
17071711
ASSERT_THAT(fut, Finishes(Ok()));
17081712
}
17091713
template <typename BatchesMaker>

cpp/src/arrow/acero/options.h

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -200,20 +200,20 @@ class ARROW_ACERO_EXPORT RecordBatchReaderSourceNodeOptions : public ExecNodeOpt
200200
/// Create an instance from values
201201
RecordBatchReaderSourceNodeOptions(std::shared_ptr<RecordBatchReader> reader,
202202
arrow::internal::Executor* io_executor = NULLPTR,
203-
bool implicit_ordering=false)
204-
: reader(std::move(reader)), io_executor(io_executor),implicit_ordering(implicit_ordering) {}
203+
bool implicit_ordering = false)
204+
: reader(std::move(reader)),
205+
io_executor(io_executor),
206+
implicit_ordering(implicit_ordering) {}
205207

206208
/// \brief The RecordBatchReader which acts as the data source
207209
std::shared_ptr<RecordBatchReader> reader;
208210

209-
210211
/// \brief The executor to use for the reader
211212
///
212213
/// Defaults to the default I/O executor.
213214
arrow::internal::Executor* io_executor;
214215

215-
216-
bool implicit_ordering{false};
216+
bool implicit_ordering{false};
217217
};
218218

219219
/// a source node that reads from an iterator of array vectors

cpp/src/arrow/acero/source_node.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -416,7 +416,8 @@ struct RecordBatchReaderSourceNode : public SourceNode {
416416
const auto& cast_options =
417417
checked_cast<const RecordBatchReaderSourceNodeOptions&>(options);
418418
auto& reader = cast_options.reader;
419-
Ordering ordering = cast_options.implicit_ordering?Ordering::Implicit():Ordering::Unordered();
419+
Ordering ordering =
420+
cast_options.implicit_ordering ? Ordering::Implicit() : Ordering::Unordered();
420421
auto io_executor = cast_options.io_executor;
421422

422423
if (reader == nullptr) {

0 commit comments

Comments
 (0)