Skip to content

Commit efe3909

Browse files
committed
Add failing tests for executor-hopping bug in run()
Pins the required semantics of capy::run at a cross-executor boundary: the forward trip must enqueue the target task, and the return trip must enqueue the caller's continuation. Covers five scenarios: - run(pe)(inner) from a handler on pe must not let inner cut the queue ahead of other pending work. - When the target runs synchronously, the return trip must still tick the caller's executor, so higher-priority work there runs before the caller resumes. - run(inner)(work) from inside a strand must release the strand while work runs. - A handler that does co_await run(strand)(task) must be outside the strand after the await returns. - An io loop that does co_await run(compute_pool)(task) must resume on the io thread, not on a compute worker. All five fail against the current dispatch-based run() and will pass once run() posts on both trips. Adds a test-only priority_executor support header used by the first three.
1 parent cead800 commit efe3909

4 files changed

Lines changed: 544 additions & 0 deletions

File tree

test/unit/ex/priority_executor.hpp

Lines changed: 241 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,241 @@
1+
//
2+
// Copyright (c) 2026 Steve Gerbino
3+
//
4+
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5+
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6+
//
7+
// Official repository: https://github.com/cppalliance/capy
8+
//
9+
10+
#ifndef BOOST_CAPY_TEST_UNIT_EX_PRIORITY_EXECUTOR_HPP
11+
#define BOOST_CAPY_TEST_UNIT_EX_PRIORITY_EXECUTOR_HPP
12+
13+
#include <boost/capy/concept/executor.hpp>
14+
#include <boost/capy/continuation.hpp>
15+
#include <boost/capy/ex/execution_context.hpp>
16+
#include <boost/capy/ex/executor_ref.hpp>
17+
#include <boost/capy/ex/frame_allocator.hpp>
18+
19+
#include <atomic>
20+
#include <coroutine>
21+
#include <exception>
22+
#include <mutex>
23+
#include <thread>
24+
#include <type_traits>
25+
#include <utility>
26+
27+
namespace boost {
28+
namespace capy {
29+
namespace test {
30+
31+
/** Test-only strand-shaped executor that drains high before low.
32+
*/
33+
struct priority_executor_state
34+
{
35+
std::mutex mutex;
36+
continuation* high_head = nullptr;
37+
continuation* high_tail = nullptr;
38+
continuation* low_head = nullptr;
39+
continuation* low_tail = nullptr;
40+
bool locked = false;
41+
std::atomic<std::thread::id> dispatch_thread{};
42+
};
43+
44+
namespace detail {
45+
46+
struct priority_invoker
47+
{
48+
struct promise_type
49+
{
50+
continuation self;
51+
52+
priority_invoker get_return_object() noexcept
53+
{
54+
return {std::coroutine_handle<promise_type>::from_promise(*this)};
55+
}
56+
57+
std::suspend_always initial_suspend() noexcept { return {}; }
58+
std::suspend_never final_suspend() noexcept { return {}; }
59+
void return_void() noexcept {}
60+
void unhandled_exception() { std::terminate(); }
61+
};
62+
63+
std::coroutine_handle<promise_type> h_;
64+
};
65+
66+
inline void
67+
drain_list(continuation* head) noexcept
68+
{
69+
while(head)
70+
{
71+
continuation* c = head;
72+
head = c->next;
73+
c->next = nullptr;
74+
::boost::capy::safe_resume(c->h);
75+
}
76+
}
77+
78+
inline priority_invoker
79+
make_priority_invoker(priority_executor_state* s)
80+
{
81+
for(;;)
82+
{
83+
s->dispatch_thread.store(
84+
std::this_thread::get_id(),
85+
std::memory_order_release);
86+
87+
continuation* high_head;
88+
continuation* low_head;
89+
{
90+
std::lock_guard<std::mutex> lk(s->mutex);
91+
high_head = s->high_head;
92+
low_head = s->low_head;
93+
s->high_head = nullptr;
94+
s->high_tail = nullptr;
95+
s->low_head = nullptr;
96+
s->low_tail = nullptr;
97+
}
98+
99+
drain_list(high_head);
100+
drain_list(low_head);
101+
102+
{
103+
std::lock_guard<std::mutex> lk(s->mutex);
104+
if(!s->high_head && !s->low_head)
105+
{
106+
s->locked = false;
107+
s->dispatch_thread.store(
108+
std::thread::id{},
109+
std::memory_order_release);
110+
co_return;
111+
}
112+
}
113+
}
114+
}
115+
116+
} // namespace detail
117+
118+
/** Executor view over priority_executor_state. Dispatch has the same
119+
thread-check fast path as strand; post defaults to the low queue.
120+
*/
121+
template<class Ex>
122+
class priority_executor
123+
{
124+
priority_executor_state* state_;
125+
Ex inner_ex_;
126+
127+
enum class priority { high, low };
128+
129+
void
130+
enqueue_under_lock(continuation& c, priority p) const noexcept
131+
{
132+
c.next = nullptr;
133+
if(p == priority::high)
134+
{
135+
if(state_->high_tail) state_->high_tail->next = &c;
136+
else state_->high_head = &c;
137+
state_->high_tail = &c;
138+
}
139+
else
140+
{
141+
if(state_->low_tail) state_->low_tail->next = &c;
142+
else state_->low_head = &c;
143+
state_->low_tail = &c;
144+
}
145+
}
146+
147+
void
148+
post_with_priority(continuation& c, priority p) const
149+
{
150+
bool first;
151+
{
152+
std::lock_guard<std::mutex> lk(state_->mutex);
153+
enqueue_under_lock(c, p);
154+
first = !state_->locked;
155+
if(first) state_->locked = true;
156+
}
157+
if(first)
158+
post_invoker();
159+
}
160+
161+
void
162+
post_invoker() const
163+
{
164+
auto inv = detail::make_priority_invoker(state_);
165+
auto& self = inv.h_.promise().self;
166+
self.h = inv.h_;
167+
self.next = nullptr;
168+
inner_ex_.post(self);
169+
}
170+
171+
public:
172+
priority_executor(priority_executor_state& state, Ex inner) noexcept(
173+
std::is_nothrow_move_constructible_v<Ex>)
174+
: state_(&state)
175+
, inner_ex_(std::move(inner))
176+
{
177+
}
178+
179+
priority_executor(priority_executor const&) noexcept(
180+
std::is_nothrow_copy_constructible_v<Ex>) = default;
181+
priority_executor(priority_executor&&) noexcept(
182+
std::is_nothrow_move_constructible_v<Ex>) = default;
183+
priority_executor& operator=(priority_executor const&) = default;
184+
priority_executor& operator=(priority_executor&&) noexcept(
185+
std::is_nothrow_move_assignable_v<Ex>) = default;
186+
187+
bool
188+
operator==(priority_executor const& other) const noexcept
189+
{
190+
return state_ == other.state_;
191+
}
192+
193+
auto&
194+
context() const noexcept
195+
{
196+
return inner_ex_.context();
197+
}
198+
199+
void on_work_started() const noexcept { inner_ex_.on_work_started(); }
200+
void on_work_finished() const noexcept { inner_ex_.on_work_finished(); }
201+
202+
bool
203+
running_in_this_thread() const noexcept
204+
{
205+
return state_->dispatch_thread.load(std::memory_order_acquire)
206+
== std::this_thread::get_id();
207+
}
208+
209+
std::coroutine_handle<>
210+
dispatch(continuation& c) const
211+
{
212+
if(running_in_this_thread())
213+
return c.h;
214+
post_with_priority(c, priority::low);
215+
return std::noop_coroutine();
216+
}
217+
218+
void
219+
post(continuation& c) const
220+
{
221+
post_with_priority(c, priority::low);
222+
}
223+
224+
void
225+
post_high(continuation& c) const
226+
{
227+
post_with_priority(c, priority::high);
228+
}
229+
230+
void
231+
post_low(continuation& c) const
232+
{
233+
post_with_priority(c, priority::low);
234+
}
235+
};
236+
237+
} // namespace test
238+
} // namespace capy
239+
} // namespace boost
240+
241+
#endif

test/unit/ex/run.cpp

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,11 @@
2121

2222
#include <boost/capy/ex/strand.hpp>
2323
#include <boost/capy/ex/thread_pool.hpp>
24+
#include <boost/capy/test/run_blocking.hpp>
2425

2526
#include <latch>
2627
#include <memory>
28+
#include <thread>
2729

2830
namespace boost {
2931
namespace capy {
@@ -500,6 +502,35 @@ struct run_test
500502
pool.join();
501503
}
502504

505+
// co_await run(compute_exec)(...) from an io loop must return
506+
// the caller to the io thread, not leave it on a compute worker.
507+
void
508+
testHopsBackToIoThread()
509+
{
510+
thread_pool compute_pool(2, "compute-");
511+
512+
std::thread::id io_tid = std::this_thread::get_id();
513+
std::thread::id compute_tid{};
514+
std::thread::id parent_tid_after_run{};
515+
516+
test::run_blocking()([&]() -> task<void> {
517+
auto compute_exec = compute_pool.get_executor();
518+
519+
co_await capy::run(compute_exec)([&]() -> task<void> {
520+
compute_tid = std::this_thread::get_id();
521+
co_return;
522+
}());
523+
524+
parent_tid_after_run = std::this_thread::get_id();
525+
}());
526+
527+
BOOST_TEST(compute_tid != std::thread::id{});
528+
BOOST_TEST(compute_tid != io_tid);
529+
BOOST_TEST_EQ(parent_tid_after_run, io_tid);
530+
531+
compute_pool.join();
532+
}
533+
503534
void
504535
run()
505536
{
@@ -523,6 +554,7 @@ struct run_test
523554
testAllocatorPropagation();
524555
testAllocatorPropagationThroughRun();
525556
testRunExStrandFirstInstruction();
557+
testHopsBackToIoThread();
526558
}
527559
};
528560

0 commit comments

Comments
 (0)