@@ -1610,12 +1610,6 @@ TEST(AsofJoinTest, PauseProducingAsofJoinSource) {
16101610 BackpressureCounters bp_countersl, bp_countersr;
16111611 BackpressureCountingNode::Register ();
16121612
1613- auto wait = [](uint32_t miliseconds) {
1614- for (size_t i = 0 ; i < miliseconds; i++) {
1615- SleepABit ();
1616- }
1617- };
1618-
16191613 Declaration left_count{" backpressure_count" ,
16201614 {std::move (left)},
16211615 BackpressureCountingNodeOptions (&bp_countersl)};
@@ -1654,24 +1648,24 @@ TEST(AsofJoinTest, PauseProducingAsofJoinSource) {
16541648 uint32_t l_cnt = 0 ;
16551649 uint32_t r_cnt = 0 ;
16561650 for (uint32_t i = 0 ; i < kPauseIfAbove ; i++) {
1657- wait ( 10 );
1651+ SleepABit ( );
16581652 EXPECT_FALSE (backpressure_monitor->is_paused ());
16591653 batch_producer_left.producer ().Push (l_batches.batches [l_cnt++]);
16601654 batch_producer_right.producer ().Push (r0_batches.batches [r_cnt++]);
16611655 }
16621656
1663- wait ( 10 );
1657+ SleepABit ( );
16641658 EXPECT_FALSE (backpressure_monitor->is_paused ());
16651659 // One more batch should trigger back pressure
16661660 batch_producer_right.producer ().Push (r0_batches.batches [r_cnt++]);
16671661 batch_producer_left.producer ().Push (l_batches.batches [l_cnt++]);
16681662
1669- wait ( 10 );
1663+ SleepABit ( );
16701664 EXPECT_TRUE (backpressure_monitor->is_paused ());
16711665
16721666 // Fill up the inputs of the asof join node
16731667 for (uint32_t i = 0 ; i < thresholdOfBackpressureAsof; i++) {
1674- wait ( 10 );
1668+ SleepABit ( );
16751669 EXPECT_FALSE (is_l_paused ());
16761670 EXPECT_FALSE (is_r_paused ());
16771671 batch_producer_left.producer ().Push (l_batches.batches [l_cnt++]);
@@ -1683,7 +1677,7 @@ TEST(AsofJoinTest, PauseProducingAsofJoinSource) {
16831677 // Read the batches from the sink to open up input of the asof join node
16841678 for (uint32_t i = 0 ; i < thresholdOfBackpressureAsof - thresholdOfBackpressureAsofLow;
16851679 i++) {
1686- wait ( 10 );
1680+ SleepABit ( );
16871681 EXPECT_TRUE (is_l_paused ());
16881682 EXPECT_TRUE (is_r_paused ());
16891683 EXPECT_TRUE (backpressure_monitor->is_paused ());
@@ -1692,17 +1686,22 @@ TEST(AsofJoinTest, PauseProducingAsofJoinSource) {
16921686 EXPECT_TRUE (opt_batch);
16931687 }
16941688
1689+ BusyWait (5.0 , [&]() { return !is_l_paused (); });
1690+ BusyWait (5.0 , [&]() { return !is_r_paused (); });
1691+
16951692 // Finish the batches in the left and right producers
16961693 for (uint32_t i = 0 ; i < thresholdOfBackpressureAsofLow + kResumeIfBelow + 2 ; i++) {
1697- wait ( 10 );
1694+ SleepABit ( );
16981695 EXPECT_FALSE (is_l_paused ());
16991696 EXPECT_FALSE (is_r_paused ());
17001697 EXPECT_TRUE (backpressure_monitor->is_paused ());
17011698
17021699 ASSERT_FINISHES_OK_AND_ASSIGN (opt_batch, sink_gen ());
17031700 EXPECT_TRUE (opt_batch);
17041701 }
1705- wait (10 );
1702+
1703+ BusyWait (5.0 , [&]() { return !backpressure_monitor->is_paused (); });
1704+
17061705 EXPECT_FALSE (is_l_paused ());
17071706 EXPECT_FALSE (is_r_paused ());
17081707 EXPECT_FALSE (backpressure_monitor->is_paused ());
0 commit comments