Skip to content

Commit f34a1ba

Browse files
committed
Fix run_for blocking indefinitely with outstanding work
run_task() had no timeout parameter — the reactor always blocked forever or polled instantly, ignoring the timeout from do_one(). Propagate the timeout to each backend's I/O syscall (epoll_wait, select, kevent) and return after a timed reactor poll so run_one_until can recheck the deadline.
1 parent adca257 commit f34a1ba

File tree

5 files changed

+74
-12
lines changed

5 files changed

+74
-12
lines changed

include/boost/corosio/native/detail/epoll/epoll_scheduler.hpp

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,8 @@ class BOOST_COROSIO_DECL epoll_scheduler final : public reactor_scheduler_base
117117

118118
private:
119119
void
120-
run_task(std::unique_lock<std::mutex>& lock, context_type* ctx) override;
120+
run_task(std::unique_lock<std::mutex>& lock, context_type* ctx,
121+
long timeout_us) override;
121122
void interrupt_reactor() const override;
122123
void update_timerfd() const;
123124

@@ -294,9 +295,16 @@ epoll_scheduler::update_timerfd() const
294295
}
295296

296297
inline void
297-
epoll_scheduler::run_task(std::unique_lock<std::mutex>& lock, context_type* ctx)
298+
epoll_scheduler::run_task(
299+
std::unique_lock<std::mutex>& lock, context_type* ctx, long timeout_us)
298300
{
299-
int timeout_ms = task_interrupted_ ? 0 : -1;
301+
int timeout_ms;
302+
if (task_interrupted_)
303+
timeout_ms = 0;
304+
else if (timeout_us < 0)
305+
timeout_ms = -1;
306+
else
307+
timeout_ms = static_cast<int>((timeout_us + 999) / 1000);
300308

301309
if (lock.owns_lock())
302310
lock.unlock();

include/boost/corosio/native/detail/kqueue/kqueue_scheduler.hpp

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,8 @@ class BOOST_COROSIO_DECL kqueue_scheduler final : public reactor_scheduler_base
139139

140140
private:
141141
void
142-
run_task(std::unique_lock<std::mutex>& lock, context_type* ctx) override;
142+
run_task(std::unique_lock<std::mutex>& lock, context_type* ctx,
143+
long timeout_us) override;
143144
void interrupt_reactor() const override;
144145
long calculate_timeout(long requested_timeout_us) const;
145146

@@ -285,9 +286,10 @@ kqueue_scheduler::calculate_timeout(long requested_timeout_us) const
285286

286287
inline void
287288
kqueue_scheduler::run_task(
288-
std::unique_lock<std::mutex>& lock, context_type* ctx)
289+
std::unique_lock<std::mutex>& lock, context_type* ctx, long timeout_us)
289290
{
290-
long effective_timeout_us = task_interrupted_ ? 0 : calculate_timeout(-1);
291+
long effective_timeout_us =
292+
task_interrupted_ ? 0 : calculate_timeout(timeout_us);
291293

292294
if (lock.owns_lock())
293295
lock.unlock();

include/boost/corosio/native/detail/reactor/reactor_scheduler.hpp

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,8 @@ class reactor_scheduler_base
279279

280280
/// Run the platform-specific reactor poll.
281281
virtual void
282-
run_task(std::unique_lock<std::mutex>& lock, context_type* ctx) = 0;
282+
run_task(std::unique_lock<std::mutex>& lock, context_type* ctx,
283+
long timeout_us) = 0;
283284

284285
/// Wake a blocked reactor (e.g. write to eventfd or pipe).
285286
virtual void interrupt_reactor() const = 0;
@@ -775,15 +776,16 @@ reactor_scheduler_base::do_one(
775776
return 0;
776777
}
777778

778-
task_interrupted_ = more_handlers || timeout_us == 0;
779+
long task_timeout_us = more_handlers ? 0 : timeout_us;
780+
task_interrupted_ = task_timeout_us == 0;
779781
task_running_.store(true, std::memory_order_release);
780782

781783
if (more_handlers)
782784
unlock_and_signal_one(lock);
783785

784786
try
785787
{
786-
run_task(lock, ctx);
788+
run_task(lock, ctx, task_timeout_us);
787789
}
788790
catch (...)
789791
{
@@ -793,6 +795,8 @@ reactor_scheduler_base::do_one(
793795

794796
task_running_.store(false, std::memory_order_relaxed);
795797
completed_ops_.push(&task_op_);
798+
if (timeout_us > 0)
799+
return 0;
796800
continue;
797801
}
798802

include/boost/corosio/native/detail/select/select_scheduler.hpp

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,8 @@ class BOOST_COROSIO_DECL select_scheduler final : public reactor_scheduler_base
127127

128128
private:
129129
void
130-
run_task(std::unique_lock<std::mutex>& lock, context_type* ctx) override;
130+
run_task(std::unique_lock<std::mutex>& lock, context_type* ctx,
131+
long timeout_us) override;
131132
void interrupt_reactor() const override;
132133
long calculate_timeout(long requested_timeout_us) const;
133134

@@ -302,9 +303,10 @@ select_scheduler::calculate_timeout(long requested_timeout_us) const
302303

303304
inline void
304305
select_scheduler::run_task(
305-
std::unique_lock<std::mutex>& lock, context_type* ctx)
306+
std::unique_lock<std::mutex>& lock, context_type* ctx, long timeout_us)
306307
{
307-
long effective_timeout_us = task_interrupted_ ? 0 : calculate_timeout(-1);
308+
long effective_timeout_us =
309+
task_interrupted_ ? 0 : calculate_timeout(timeout_us);
308310

309311
// Snapshot registered descriptors while holding lock.
310312
// Record which fds need write monitoring to avoid a hot loop:

test/unit/io_context.cpp

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -463,6 +463,50 @@ struct io_context_test
463463
BOOST_TEST(counter == 1);
464464
}
465465

466+
void testRunForWithOutstandingWork()
467+
{
468+
io_context ioc;
469+
auto ex = ioc.get_executor();
470+
471+
// Simulate persistent outstanding work (like a listening acceptor)
472+
ex.on_work_started();
473+
474+
auto start = std::chrono::steady_clock::now();
475+
std::size_t n = ioc.run_for(std::chrono::milliseconds(200));
476+
auto elapsed = std::chrono::steady_clock::now() - start;
477+
478+
auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(elapsed)
479+
.count();
480+
481+
// Must return after ~200ms, not block forever
482+
BOOST_TEST(n == 0);
483+
BOOST_TEST(ms >= 150);
484+
BOOST_TEST(ms < 1000);
485+
486+
ex.on_work_finished();
487+
}
488+
489+
void testRunOneForWithOutstandingWork()
490+
{
491+
io_context ioc;
492+
auto ex = ioc.get_executor();
493+
494+
ex.on_work_started();
495+
496+
auto start = std::chrono::steady_clock::now();
497+
std::size_t n = ioc.run_one_for(std::chrono::milliseconds(200));
498+
auto elapsed = std::chrono::steady_clock::now() - start;
499+
500+
auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(elapsed)
501+
.count();
502+
503+
BOOST_TEST(n == 0);
504+
BOOST_TEST(ms >= 150);
505+
BOOST_TEST(ms < 1000);
506+
507+
ex.on_work_finished();
508+
}
509+
466510
void testExecutorRunningInThisThread()
467511
{
468512
io_context ioc;
@@ -610,6 +654,8 @@ struct io_context_test
610654
testRunOneFor();
611655
testRunOneUntil();
612656
testRunFor();
657+
testRunForWithOutstandingWork();
658+
testRunOneForWithOutstandingWork();
613659
testExecutorRunningInThisThread();
614660
testMultithreaded();
615661
testMultithreadedStress();

0 commit comments

Comments
 (0)