Skip to content

Commit 8826f6e

Browse files
author
Rafał Hibner
committed
Merge branch 'asof_join_pause' into combined2
2 parents fb07302 + 5109d5b commit 8826f6e

2 files changed

Lines changed: 163 additions & 2 deletions

File tree

cpp/src/arrow/acero/asof_join_node.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1067,6 +1067,7 @@ class AsofJoinNode : public ExecNode {
10671067

10681068
// Process batches while we have data
10691069
for (;;) {
1070+
backpressure_future_.Wait();
10701071
Result<std::shared_ptr<RecordBatch>> result = ProcessInner();
10711072

10721073
if (result.ok()) {
@@ -1106,7 +1107,6 @@ class AsofJoinNode : public ExecNode {
11061107
EndFromProcessThread();
11071108
return;
11081109
}
1109-
backpressure_future_.Wait();
11101110
if (!Process()) {
11111111
return;
11121112
}

cpp/src/arrow/acero/asof_join_node_test.cc

Lines changed: 162 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@
4444
#include "arrow/compute/cast.h"
4545
#include "arrow/compute/row/row_encoder_internal.h"
4646
#include "arrow/compute/test_util_internal.h"
47-
#include "arrow/testing/generator.h"
47+
#include "arrow/io/util_internal.h"
4848
#include "arrow/testing/gtest_util.h"
4949
#include "arrow/testing/matchers.h"
5050
#include "arrow/testing/random.h"
@@ -1545,6 +1545,167 @@ TEST(AsofJoinTest, BackpressureWithBatches) {
15451545
/*num_r0_batches=*/50, /*num_r1_batches=*/20, /*slow_r0=*/true);
15461546
}
15471547

1548+
TEST(AsofJoinTest, PauseProducingAsofJoinSource) {
1549+
int batch_size = 1;
1550+
auto make_shift = [batch_size](int num_batches, const std::shared_ptr<Schema>& schema,
1551+
int shift) {
1552+
return MakeIntegerBatches(
1553+
{[](int row) -> int64_t { return row; },
1554+
[num_batches](int row) -> int64_t { return row / num_batches; },
1555+
[shift](int row) -> int64_t { return row * 10 + shift; }},
1556+
schema, num_batches, batch_size);
1557+
};
1558+
auto l_schema =
1559+
schema({field("time", int64()), field("key", int64()), field("l_value", int64())});
1560+
auto r_schema =
1561+
schema({field("time", int64()), field("key", int64()), field("r0_value", int64())});
1562+
1563+
auto output_schema =
1564+
schema({field("time", int64()), field("key", int64()), field("l_value", int64()),
1565+
field("key", int64()), field("r0_value", int64())});
1566+
1567+
ASSERT_OK_AND_ASSIGN(auto out_batch,
1568+
MakeIntegerBatches({[](int row) -> int64_t { return row; },
1569+
[](int row) -> int64_t { return row; },
1570+
[](int row) -> int64_t { return row / 20; },
1571+
[](int row) -> int64_t { return row / 20; },
1572+
[](int row) -> int64_t { return row * 10; }},
1573+
output_schema, 20, batch_size))
1574+
1575+
ASSERT_OK_AND_ASSIGN(auto l_batches, make_shift(50, l_schema, 2));
1576+
ASSERT_OK_AND_ASSIGN(auto r0_batches, make_shift(50, r_schema, 1));
1577+
std::optional<ExecBatch> out = out_batch.batches[0];
1578+
1579+
constexpr uint32_t thresholdOfBackpressureAsof = 8;
1580+
constexpr uint32_t thresholdOfBackpressureAsofLow = 4;
1581+
1582+
EXPECT_OK_AND_ASSIGN(std::shared_ptr<ExecPlan> plan, ExecPlan::Make());
1583+
PushGenerator<std::optional<ExecBatch>> batch_producer_left;
1584+
PushGenerator<std::optional<ExecBatch>> batch_producer_right;
1585+
1586+
AsyncGenerator<std::optional<ExecBatch>> sink_gen;
1587+
BackpressureMonitor* backpressure_monitor;
1588+
BackpressureOptions backpressure_options(1, 2);
1589+
std::shared_ptr<Schema> schema_ = schema({field("data", uint32())});
1590+
1591+
BackpressureCountingNode::Register();
1592+
1593+
Declaration left{"source", SourceNodeOptions(l_schema, batch_producer_left)};
1594+
Declaration right{"source", SourceNodeOptions(r_schema, batch_producer_right)};
1595+
AsofJoinNodeOptions asof_join_opts({{{"time"}, {}}, {{"time"}, {}}}, 1);
1596+
1597+
BackpressureCounters bp_countersl, bp_countersr;
1598+
BackpressureCountingNode::Register();
1599+
1600+
Declaration left_count{"backpressure_count",
1601+
{std::move(left)},
1602+
BackpressureCountingNodeOptions(&bp_countersl)};
1603+
1604+
Declaration right_count{"backpressure_count",
1605+
{std::move(right)},
1606+
BackpressureCountingNodeOptions(&bp_countersr)};
1607+
1608+
Declaration asof_join{"asofjoin",
1609+
{std::move(left_count), std::move(right_count)},
1610+
std::move(asof_join_opts)};
1611+
1612+
ARROW_EXPECT_OK(
1613+
acero::Declaration::Sequence(
1614+
{
1615+
std::move(asof_join),
1616+
{"sink", SinkNodeOptions{&sink_gen, /*schema=*/nullptr,
1617+
backpressure_options, &backpressure_monitor}},
1618+
})
1619+
.AddToPlan(plan.get()));
1620+
1621+
ASSERT_TRUE(backpressure_monitor);
1622+
plan->StartProducing();
1623+
auto fut = plan->finished();
1624+
1625+
EXPECT_FALSE(backpressure_monitor->is_paused());
1626+
1627+
auto is_l_paused = [&]() {
1628+
return bp_countersl.pause_count != bp_countersl.resume_count;
1629+
};
1630+
auto is_r_paused = [&]() {
1631+
return bp_countersr.pause_count != bp_countersr.resume_count;
1632+
};
1633+
1634+
// Should be able to push kPauseIfAbove batches without triggering back pressure
1635+
uint32_t l_cnt = 0;
1636+
uint32_t r_cnt = 0;
1637+
// for (uint32_t i = 0; i < 1; i++) {
1638+
EXPECT_FALSE(is_l_paused());
1639+
EXPECT_FALSE(is_r_paused());
1640+
EXPECT_FALSE(backpressure_monitor->is_paused());
1641+
batch_producer_left.producer().Push(l_batches.batches[l_cnt++]);
1642+
batch_producer_right.producer().Push(r0_batches.batches[r_cnt++]);
1643+
// }
1644+
1645+
// One more batch should trigger back pressure
1646+
1647+
BusyWait(60.0, [&]() { return backpressure_monitor->is_paused(); });
1648+
arrow::io::internal::GetIOThreadPool()->WaitForIdle();
1649+
arrow::internal::GetCpuThreadPool()->WaitForIdle();
1650+
1651+
EXPECT_FALSE(is_l_paused());
1652+
EXPECT_FALSE(is_r_paused());
1653+
EXPECT_TRUE(backpressure_monitor->is_paused());
1654+
batch_producer_left.producer().Push(l_batches.batches[l_cnt++]);
1655+
arrow::internal::GetCpuThreadPool()->WaitForIdle();
1656+
batch_producer_right.producer().Push(r0_batches.batches[r_cnt++]);
1657+
arrow::internal::GetCpuThreadPool()->WaitForIdle();
1658+
1659+
EXPECT_FALSE(is_l_paused());
1660+
EXPECT_FALSE(is_r_paused());
1661+
EXPECT_TRUE(backpressure_monitor->is_paused());
1662+
1663+
// Fill up the inputs of the asof join node
1664+
for (uint32_t i = 1; i < thresholdOfBackpressureAsof; i++) {
1665+
SleepABit();
1666+
EXPECT_FALSE(is_l_paused()) << i;
1667+
EXPECT_FALSE(is_r_paused());
1668+
batch_producer_left.producer().Push(l_batches.batches[l_cnt++]);
1669+
batch_producer_right.producer().Push(r0_batches.batches[r_cnt++]);
1670+
}
1671+
1672+
std::optional<ExecBatch> opt_batch;
1673+
arrow::internal::GetCpuThreadPool()->WaitForIdle();
1674+
// Read the batches from the sink to open up input of the asof join node
1675+
for (uint32_t i = 0; i < thresholdOfBackpressureAsof - thresholdOfBackpressureAsofLow;
1676+
i++) {
1677+
SleepABit();
1678+
EXPECT_TRUE(is_l_paused());
1679+
EXPECT_TRUE(is_r_paused());
1680+
EXPECT_TRUE(backpressure_monitor->is_paused());
1681+
1682+
ASSERT_FINISHES_OK_AND_ASSIGN(opt_batch, sink_gen());
1683+
EXPECT_TRUE(opt_batch);
1684+
}
1685+
1686+
arrow::internal::GetCpuThreadPool()->WaitForIdle();
1687+
1688+
// Finish the batches in the left and right producers
1689+
for (uint32_t i = 0; i < thresholdOfBackpressureAsofLow + 1; i++) {
1690+
SleepABit();
1691+
EXPECT_FALSE(is_l_paused());
1692+
EXPECT_FALSE(is_r_paused());
1693+
ASSERT_FINISHES_OK_AND_ASSIGN(opt_batch, sink_gen());
1694+
EXPECT_TRUE(opt_batch);
1695+
}
1696+
1697+
EXPECT_FALSE(is_l_paused());
1698+
EXPECT_FALSE(is_r_paused());
1699+
EXPECT_FALSE(backpressure_monitor->is_paused());
1700+
1701+
batch_producer_left.producer().Push(IterationEnd<std::optional<ExecBatch>>());
1702+
batch_producer_right.producer().Push(IterationEnd<std::optional<ExecBatch>>());
1703+
1704+
ASSERT_FINISHES_OK_AND_ASSIGN(opt_batch, sink_gen());
1705+
EXPECT_FALSE(opt_batch);
1706+
1707+
ASSERT_THAT(fut, Finishes(Ok()));
1708+
}
15481709
template <typename BatchesMaker>
15491710
void TestSequencing(BatchesMaker maker, int num_batches, int batch_size) {
15501711
auto l_schema =

0 commit comments

Comments
 (0)