Skip to content

Commit 057ad5b

Browse files
author
Rafał Hibner
committed
Merge branch 'PauseProducingAsof' of https://github.com/mroz45/arrow into asof_join_pause
2 parents 639af09 + 5a7ae6c commit 057ad5b

1 file changed

Lines changed: 62 additions & 45 deletions

File tree

cpp/src/arrow/acero/asof_join_node_test.cc

Lines changed: 62 additions & 45 deletions
Original file line number | Diff line number | Diff line change
@@ -1583,7 +1583,9 @@ TEST(AsofJoinTest, PauseProducingAsofJoinSource) {
15831583
ASSERT_OK_AND_ASSIGN(auto r0_batches, make_shift(50, r_schema, 1));
15841584
std::optional<ExecBatch> out = out_batch.batches[0];
15851585

1586-
constexpr uint32_t thresholdOfBackpressure = 8;
1586+
constexpr uint32_t thresholdOfBackpressureAsof = 8;
1587+
constexpr uint32_t thresholdOfBackpressureAsofLow = 4;
1588+
15871589
constexpr uint32_t kPauseIfAbove = 4;
15881590
constexpr uint32_t kResumeIfBelow = 2;
15891591
uint32_t pause_if_above_bytes =
@@ -1608,6 +1610,12 @@ TEST(AsofJoinTest, PauseProducingAsofJoinSource) {
16081610
BackpressureCounters bp_countersl, bp_countersr;
16091611
BackpressureCountingNode::Register();
16101612

1613+
auto wait = [](uint32_t miliseconds) {
1614+
for (size_t i = 0; i < miliseconds; i++) {
1615+
SleepABit();
1616+
}
1617+
};
1618+
16111619
Declaration left_count{"backpressure_count",
16121620
{std::move(left)},
16131621
BackpressureCountingNodeOptions(&bp_countersl)};
@@ -1633,73 +1641,82 @@ TEST(AsofJoinTest, PauseProducingAsofJoinSource) {
16331641
plan->StartProducing();
16341642
auto fut = plan->finished();
16351643

1636-
ASSERT_FALSE(backpressure_monitor->is_paused());
1644+
EXPECT_FALSE(backpressure_monitor->is_paused());
16371645

1638-
auto has_bp_been_applied = [&] {
1639-
for (size_t i = 0; i < 2; i++) {
1640-
const auto& counters = (i == 0) ? bp_countersl : bp_countersr;
1641-
if (counters.pause_count > 0) return true;
1642-
}
1643-
return false;
1646+
auto is_l_paused = [&]() {
1647+
return bp_countersl.pause_count != bp_countersl.resume_count;
1648+
};
1649+
auto is_r_paused = [&]() {
1650+
return bp_countersr.pause_count != bp_countersr.resume_count;
16441651
};
16451652

16461653
// Should be able to push kPauseIfAbove batches without triggering back pressure
1647-
uint32_t cnt = 0;
1654+
uint32_t l_cnt = 0;
1655+
uint32_t r_cnt = 0;
16481656
for (uint32_t i = 0; i < kPauseIfAbove; i++) {
1649-
batch_producer_left.producer().Push(l_batches.batches[i]);
1650-
batch_producer_right.producer().Push(r0_batches.batches[i]);
1651-
cnt = i;
1657+
wait(10);
1658+
EXPECT_FALSE(backpressure_monitor->is_paused());
1659+
batch_producer_left.producer().Push(l_batches.batches[l_cnt++]);
1660+
batch_producer_right.producer().Push(r0_batches.batches[r_cnt++]);
16521661
}
1653-
cnt++;
1654-
1655-
SleepABit();
1656-
ASSERT_FALSE(backpressure_monitor->is_paused());
16571662

1663+
wait(10);
1664+
EXPECT_FALSE(backpressure_monitor->is_paused());
16581665
// One more batch should trigger back pressure
1659-
batch_producer_right.producer().Push(r0_batches.batches[cnt]);
1660-
batch_producer_left.producer().Push(l_batches.batches[cnt]);
1666+
batch_producer_right.producer().Push(r0_batches.batches[r_cnt++]);
1667+
batch_producer_left.producer().Push(l_batches.batches[l_cnt++]);
16611668

1662-
BusyWait(10, [&] { return backpressure_monitor->is_paused(); });
1663-
ASSERT_TRUE(backpressure_monitor->is_paused());
1669+
wait(10);
1670+
EXPECT_TRUE(backpressure_monitor->is_paused());
16641671

16651672
// Fill up the inputs of the asof join node
1666-
cnt++;
1667-
for (uint32_t i = cnt; i < thresholdOfBackpressure + cnt; i++) {
1668-
batch_producer_left.producer().Push(l_batches.batches[i]);
1669-
batch_producer_right.producer().Push(r0_batches.batches[i]);
1673+
for (uint32_t i = 0; i < thresholdOfBackpressureAsof; i++) {
1674+
wait(10);
1675+
EXPECT_FALSE(is_l_paused());
1676+
EXPECT_FALSE(is_r_paused());
1677+
batch_producer_left.producer().Push(l_batches.batches[l_cnt++]);
1678+
batch_producer_right.producer().Push(r0_batches.batches[r_cnt++]);
16701679
}
16711680

1672-
BusyWait(20.0, has_bp_been_applied);
1673-
ASSERT_TRUE(has_bp_been_applied());
1681+
std::optional<ExecBatch> opt_batch;
16741682

1675-
// Reading as much as we can while keeping it paused
1676-
for (uint32_t i = kPauseIfAbove; i >= kResumeIfBelow; i--) {
1677-
ASSERT_FINISHES_OK(sink_gen());
1683+
// Read the batches from the sink to open up input of the asof join node
1684+
for (uint32_t i = 0; i < thresholdOfBackpressureAsof - thresholdOfBackpressureAsofLow;
1685+
i++) {
1686+
wait(10);
1687+
EXPECT_TRUE(is_l_paused());
1688+
EXPECT_TRUE(is_r_paused());
1689+
EXPECT_TRUE(backpressure_monitor->is_paused());
1690+
1691+
ASSERT_FINISHES_OK_AND_ASSIGN(opt_batch, sink_gen());
1692+
EXPECT_TRUE(opt_batch);
16781693
}
1679-
SleepABit();
1680-
ASSERT_TRUE(backpressure_monitor->is_paused());
16811694

1682-
// Reading one more item should open up backpressure
1683-
ASSERT_FINISHES_OK(sink_gen());
1684-
BusyWait(10, [&] { return !backpressure_monitor->is_paused(); });
1685-
ASSERT_FALSE(backpressure_monitor->is_paused());
1695+
// Finish the batches in the left and right producers
1696+
for (uint32_t i = 0; i < thresholdOfBackpressureAsofLow + kResumeIfBelow + 2; i++) {
1697+
wait(10);
1698+
EXPECT_FALSE(is_l_paused());
1699+
EXPECT_FALSE(is_r_paused());
1700+
EXPECT_TRUE(backpressure_monitor->is_paused());
16861701

1687-
// Cleanup
1688-
for (size_t i = 0; i < cnt - 3; i++) {
1689-
ASSERT_FINISHES_OK(sink_gen());
1702+
ASSERT_FINISHES_OK_AND_ASSIGN(opt_batch, sink_gen());
1703+
EXPECT_TRUE(opt_batch);
16901704
}
1705+
wait(10);
1706+
EXPECT_FALSE(is_l_paused());
1707+
EXPECT_FALSE(is_r_paused());
1708+
EXPECT_FALSE(backpressure_monitor->is_paused());
1709+
1710+
ASSERT_FINISHES_OK_AND_ASSIGN(opt_batch, sink_gen());
1711+
EXPECT_TRUE(opt_batch);
16911712

16921713
batch_producer_left.producer().Push(IterationEnd<std::optional<ExecBatch>>());
16931714
batch_producer_right.producer().Push(IterationEnd<std::optional<ExecBatch>>());
16941715

1695-
//finish gracefully
1696-
ASSERT_TRUE(batch_producer_left.producer().Close());
1697-
ASSERT_TRUE(batch_producer_right.producer().Close());
1716+
ASSERT_FINISHES_OK_AND_ASSIGN(opt_batch, sink_gen());
1717+
EXPECT_FALSE(opt_batch);
16981718

1699-
ASSERT_TRUE(fut.Wait(kDefaultAssertFinishesWaitSeconds));
1700-
if (!fut.status().ok()) {
1701-
ASSERT_TRUE(fut.status().IsCancelled());
1702-
}
1719+
ASSERT_THAT(fut, Finishes(Ok()));
17031720
}
17041721
template <typename BatchesMaker>
17051722
void TestSequencing(BatchesMaker maker, int num_batches, int batch_size) {

0 commit comments

Comments
 (0)