@@ -1660,7 +1660,7 @@ TEST(AsofJoinTest, PauseProducingAsofJoinSource) {
16601660 batch_producer_right.producer ().Push (r0_batches.batches [r_cnt++]);
16611661 batch_producer_left.producer ().Push (l_batches.batches [l_cnt++]);
16621662
1663- SleepABit ( );
1663+ BusyWait ( 5 , [&] { return backpressure_monitor-> is_paused (); } );
16641664 EXPECT_TRUE (backpressure_monitor->is_paused ());
16651665
16661666 // Fill up the inputs of the asof join node
@@ -1673,7 +1673,8 @@ TEST(AsofJoinTest, PauseProducingAsofJoinSource) {
16731673 }
16741674
16751675 std::optional<ExecBatch> opt_batch;
1676-
1676+ BusyWait (5.0 , [&]() { return is_l_paused (); });
1677+ BusyWait (5.0 , [&]() { return is_r_paused (); });
16771678 // Read the batches from the sink to open up input of the asof join node
16781679 for (uint32_t i = 0 ; i < thresholdOfBackpressureAsof - thresholdOfBackpressureAsofLow;
16791680 i++) {
@@ -1690,25 +1691,18 @@ TEST(AsofJoinTest, PauseProducingAsofJoinSource) {
16901691 BusyWait (5.0 , [&]() { return !is_r_paused (); });
16911692
16921693 // Finish the batches in the left and right producers
1693- for (uint32_t i = 0 ; i < thresholdOfBackpressureAsofLow + kResumeIfBelow + 2 ; i++) {
1694+ for (uint32_t i = 0 ; i < thresholdOfBackpressureAsofLow + kResumeIfBelow + 3 ; i++) {
16941695 SleepABit ();
16951696 EXPECT_FALSE (is_l_paused ());
16961697 EXPECT_FALSE (is_r_paused ());
1697- EXPECT_TRUE (backpressure_monitor->is_paused ());
1698-
16991698 ASSERT_FINISHES_OK_AND_ASSIGN (opt_batch, sink_gen ());
17001699 EXPECT_TRUE (opt_batch);
17011700 }
17021701
1703- BusyWait (5.0 , [&]() { return !backpressure_monitor->is_paused (); });
1704-
17051702 EXPECT_FALSE (is_l_paused ());
17061703 EXPECT_FALSE (is_r_paused ());
17071704 EXPECT_FALSE (backpressure_monitor->is_paused ());
17081705
1709- ASSERT_FINISHES_OK_AND_ASSIGN (opt_batch, sink_gen ());
1710- EXPECT_TRUE (opt_batch);
1711-
17121706 batch_producer_left.producer ().Push (IterationEnd<std::optional<ExecBatch>>());
17131707 batch_producer_right.producer ().Push (IterationEnd<std::optional<ExecBatch>>());
17141708
0 commit comments