File tree Expand file tree Collapse file tree
Expand file tree Collapse file tree Original file line number Diff line number Diff line change @@ -1962,4 +1962,36 @@ TEST(DefaultIfEmptyGenerator, Basics) {
19621962 actual, CollectAsyncGenerator (MakeDefaultIfEmptyGenerator (gen, TestInt (42 ))));
19631963 EXPECT_EQ (std::vector<TestInt>{42 }, actual);
19641964}
1965+
1966+ class MonitorBackpressureControl : public acero ::BackpressureControl {
1967+ public:
1968+ explicit MonitorBackpressureControl (std::atomic<bool >& paused) : paused(paused) {}
1969+ virtual void Pause () { paused = true ; }
1970+ virtual void Resume () { paused = false ; }
1971+ std::atomic<bool >& paused;
1972+ };
1973+
1974+ TEST (TestAsyncUtil, PushGeneratorBackpressure) {
1975+ std::atomic<bool > paused;
1976+ ASSERT_OK_AND_ASSIGN (auto handler,
1977+ acero::BackpressureHandler::Make (
1978+ 4 , 8 , std::make_unique<MonitorBackpressureControl>(paused)));
1979+
1980+ auto gen = PushGenerator<TestInt>(std::move (handler));
1981+ auto producer = gen.producer ();
1982+ ASSERT_FALSE (paused);
1983+ for (int i = 1 ; i < 8 ; ++i) {
1984+ producer.Push (TestInt (i));
1985+ ASSERT_FALSE (paused);
1986+ }
1987+ producer.Push (TestInt (9 ));
1988+ ASSERT_TRUE (paused);
1989+ for (int i = 1 ; i < 8 - 4 ; ++i) {
1990+ ASSERT_OK (gen ().result ());
1991+ ASSERT_TRUE (paused);
1992+ }
1993+ ASSERT_OK (gen ().result ());
1994+ ASSERT_FALSE (paused);
1995+ }
1996+
19651997} // namespace arrow
You can’t perform that action at this time.
0 commit comments