Skip to content

Commit 43a821f

Browse files
author
Rafał Hibner
committed
Merge branch 'asof_join_pause' into combined2
2 parents 81d1b24 + 51596f5 commit 43a821f

2 files changed

Lines changed: 19 additions & 31 deletions

File tree

cpp/src/arrow/acero/asof_join_node.cc

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1063,28 +1063,28 @@ class AsofJoinNode : public ExecNode {
10631063
// Process batches while we have data
10641064
for (;;) {
10651065
backpressure_future_.Wait();
1066+
Result<std::shared_ptr<RecordBatch>> result;
10661067
{
10671068
std::lock_guard<std::mutex> guard(gate_);
10681069
if (!CheckEnded()) {
10691070
return false;
10701071
}
1071-
Result<std::shared_ptr<RecordBatch>> result = ProcessInner();
1072-
1073-
if (result.ok()) {
1074-
auto out_rb = *result;
1075-
if (!out_rb) break;
1076-
ExecBatch out_b(*out_rb);
1077-
out_b.index = batches_produced_++;
1078-
DEBUG_SYNC(this, "produce batch ", out_b.index, ":", DEBUG_MANIP(std::endl),
1079-
out_rb->ToString(), DEBUG_MANIP(std::endl));
1080-
Status st = output_->InputReceived(this, std::move(out_b));
1081-
if (!st.ok()) {
1082-
EndFromProcessThread(std::move(st));
1083-
}
1084-
} else {
1085-
EndFromProcessThread(result.status());
1086-
return false;
1072+
result = ProcessInner();
1073+
}
1074+
if (result.ok()) {
1075+
auto out_rb = *result;
1076+
if (!out_rb) break;
1077+
ExecBatch out_b(*out_rb);
1078+
out_b.index = batches_produced_++;
1079+
DEBUG_SYNC(this, "produce batch ", out_b.index, ":", DEBUG_MANIP(std::endl),
1080+
out_rb->ToString(), DEBUG_MANIP(std::endl));
1081+
Status st = output_->InputReceived(this, std::move(out_b));
1082+
if (!st.ok()) {
1083+
EndFromProcessThread(std::move(st));
10871084
}
1085+
} else {
1086+
EndFromProcessThread(result.status());
1087+
return false;
10881088
}
10891089
}
10901090

cpp/src/arrow/acero/asof_join_node_test.cc

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1659,24 +1659,12 @@ TEST(AsofJoinTest, PauseProducingAsofJoinSource) {
16591659
arrow::io::internal::GetIOThreadPool()->WaitForIdle();
16601660
arrow::internal::GetCpuThreadPool()->WaitForIdle();
16611661

1662-
EXPECT_FALSE(is_l_paused());
1663-
EXPECT_FALSE(is_r_paused());
1664-
EXPECT_TRUE(backpressure_monitor->is_paused());
1665-
1666-
batch_producer_left.producer().Push(l_batches.batches[l_cnt++]);
1667-
arrow::internal::GetCpuThreadPool()->WaitForIdle();
1668-
batch_producer_right.producer().Push(r0_batches.batches[r_cnt++]);
1669-
arrow::internal::GetCpuThreadPool()->WaitForIdle();
1670-
1671-
EXPECT_FALSE(is_l_paused());
1672-
EXPECT_FALSE(is_r_paused());
1673-
EXPECT_TRUE(backpressure_monitor->is_paused());
1674-
16751662
// Fill up the inputs of the asof join node
1676-
for (uint32_t i = 1; i < thresholdOfBackpressureAsof; i++) {
1663+
for (uint32_t i = 0; i < thresholdOfBackpressureAsof; i++) {
16771664
SleepABit();
1678-
EXPECT_FALSE(is_l_paused()) << i;
1665+
EXPECT_FALSE(is_l_paused());
16791666
EXPECT_FALSE(is_r_paused());
1667+
EXPECT_TRUE(backpressure_monitor->is_paused());
16801668
batch_producer_left.producer().Push(l_batches.batches[l_cnt++]);
16811669
batch_producer_right.producer().Push(r0_batches.batches[r_cnt++]);
16821670
}

0 commit comments

Comments
 (0)