@@ -1413,15 +1413,28 @@ struct BackpressureCountingNode : public MapNode {
14131413 Result<ExecBatch> ProcessBatch (ExecBatch batch) override { return batch; }
14141414
14151415 void PauseProducing (ExecNode* output, int32_t counter) override {
1416- ++counters->pause_count ;
1416+ std::lock_guard<std::mutex> lg (mutex_);
1417+ if (counter > backpressure_counter_) {
1418+ backpressure_counter_ = counter;
1419+ if (!paused) ++counters->pause_count ;
1420+ paused = true ;
1421+ }
14171422 inputs ()[0 ]->PauseProducing (this , counter);
14181423 }
14191424 void ResumeProducing (ExecNode* output, int32_t counter) override {
1420- ++counters->resume_count ;
1425+ std::lock_guard<std::mutex> lg (mutex_);
1426+ if (counter > backpressure_counter_) {
1427+ backpressure_counter_ = counter;
1428+ if (paused) ++counters->resume_count ;
1429+ paused = false ;
1430+ }
14211431 inputs ()[0 ]->ResumeProducing (this , counter);
14221432 }
14231433
14241434 BackpressureCounters* counters;
1435+ std::mutex mutex_;
1436+ std::atomic<int32_t > backpressure_counter_{0 };
1437+ bool paused{false };
14251438};
14261439
14271440AsyncGenerator<std::optional<ExecBatch>> GetGen (
@@ -1586,7 +1599,6 @@ TEST(AsofJoinTest, PauseProducingAsofJoinSource) {
15861599 std::optional<ExecBatch> out = out_batch.batches [0 ];
15871600
15881601 constexpr uint32_t thresholdOfBackpressureAsof = 8 ;
1589- constexpr uint32_t thresholdOfBackpressureAsofLow = 4 ;
15901602
15911603 EXPECT_OK_AND_ASSIGN (std::shared_ptr<ExecPlan> plan, ExecPlan::Make ());
15921604 PushGenerator<std::optional<ExecBatch>> batch_producer_left;
@@ -1601,7 +1613,7 @@ TEST(AsofJoinTest, PauseProducingAsofJoinSource) {
16011613
16021614 Declaration left{" source" , SourceNodeOptions (l_schema, batch_producer_left)};
16031615 Declaration right{" source" , SourceNodeOptions (r_schema, batch_producer_right)};
1604- AsofJoinNodeOptions asof_join_opts ({{{" time" }, {}}, {{" time" }, {}}}, 1 );
1616+ AsofJoinNodeOptions asof_join_opts ({{{" time" }, {}}, {{" time" }, {}}}, 0 );
16051617
16061618 BackpressureCounters bp_countersl, bp_countersr;
16071619 BackpressureCountingNode::Register ();
@@ -1643,22 +1655,22 @@ TEST(AsofJoinTest, PauseProducingAsofJoinSource) {
16431655 // Should be able to push kPauseIfAbove batches without triggering back pressure
16441656 uint32_t l_cnt = 0 ;
16451657 uint32_t r_cnt = 0 ;
1646- // for (uint32_t i = 0; i < 1; i++) {
1658+
16471659 EXPECT_FALSE (is_l_paused ());
16481660 EXPECT_FALSE (is_r_paused ());
16491661 EXPECT_FALSE (backpressure_monitor->is_paused ());
16501662 batch_producer_left.producer ().Push (l_batches.batches [l_cnt++]);
16511663 batch_producer_right.producer ().Push (r0_batches.batches [r_cnt++]);
16521664
16531665 // this should trigger pause on sink
1654-
16551666 BusyWait (3.0 , [&]() { return backpressure_monitor->is_paused (); });
16561667 arrow::io::internal::GetIOThreadPool ()->WaitForIdle ();
16571668 arrow::internal::GetCpuThreadPool ()->WaitForIdle ();
16581669
16591670 EXPECT_FALSE (is_l_paused ());
16601671 EXPECT_FALSE (is_r_paused ());
16611672 EXPECT_TRUE (backpressure_monitor->is_paused ());
1673+
16621674 batch_producer_left.producer ().Push (l_batches.batches [l_cnt++]);
16631675 arrow::internal::GetCpuThreadPool ()->WaitForIdle ();
16641676 batch_producer_right.producer ().Push (r0_batches.batches [r_cnt++]);
@@ -1685,27 +1697,24 @@ TEST(AsofJoinTest, PauseProducingAsofJoinSource) {
16851697 EXPECT_TRUE (is_l_paused ());
16861698 EXPECT_TRUE (is_r_paused ());
16871699
1688- batch_producer_left.producer ().Push (IterationEnd<std::optional<ExecBatch>>());
1689- batch_producer_right.producer ().Push (IterationEnd<std::optional<ExecBatch>>());
1690-
16911700 std::optional<ExecBatch> opt_batch;
16921701
1693- for (uint32_t i = 1 ; i < thresholdOfBackpressureAsof - thresholdOfBackpressureAsofLow;
1694- i++) {
1702+ do {
16951703 ASSERT_FINISHES_OK_AND_ASSIGN (opt_batch, sink_gen ());
1696- EXPECT_TRUE (opt_batch);
1697- }
1704+ ASSERT_TRUE (opt_batch);
1705+ l_cnt -= opt_batch->length ;
1706+ } while (l_cnt);
1707+
16981708 BusyWait (3.0 , [&]() { return !is_l_paused (); });
16991709 BusyWait (3.0 , [&]() { return !is_r_paused (); });
17001710 arrow::io::internal::GetIOThreadPool ()->WaitForIdle ();
17011711 arrow::internal::GetCpuThreadPool ()->WaitForIdle ();
1702-
17031712 EXPECT_FALSE (is_l_paused ());
17041713 EXPECT_FALSE (is_r_paused ());
17051714 EXPECT_FALSE (backpressure_monitor->is_paused ());
17061715
1707- ASSERT_FINISHES_OK_AND_ASSIGN (opt_batch, sink_gen ());
1708- EXPECT_FALSE (opt_batch );
1716+ batch_producer_left. producer (). Push (IterationEnd<std::optional<ExecBatch>> ());
1717+ batch_producer_right. producer (). Push (IterationEnd<std::optional<ExecBatch>>() );
17091718
17101719 ASSERT_THAT (fut, Finishes (Ok ()));
17111720}
0 commit comments