Skip to content

Commit b1f8ead

Browse files
committed
Replace sender_executor with P2300-realistic scheduler model
Remove sender_executor (no P2300 equivalent) and replace with pool_scheduler obtained via sender_thread_pool::get_scheduler(), matching beman::net's io_context pattern. Streams hold sender_thread_pool* directly. Bridge posting now goes through schedule → connect → start via heap-allocated scheduled_resume, since P2300 has no post(coroutine_handle<>).
1 parent 0b54d7f commit b1f8ead

File tree

5 files changed

+136
-148
lines changed

5 files changed

+136
-148
lines changed

bench/beman/main.cpp

Lines changed: 24 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -246,9 +246,8 @@ int main()
246246
// Native — sndr_read_stream
247247
{
248248
sender_thread_pool pool(1);
249-
auto ex = pool.get_executor();
250-
sndr_read_stream stream{ex};
251-
pool_scheduler sched{ex};
249+
sndr_read_stream stream{&pool};
250+
auto sched = pool.get_scheduler();
252251
int count = OPS_PER_CELL;
253252
char buf[64];
254253
auto before = g_alloc_count.load(
@@ -275,9 +274,8 @@ int main()
275274
// Abstract — sndr_io_read_stream
276275
{
277276
sender_thread_pool pool(1);
278-
auto ex = pool.get_executor();
279-
sndr_io_read_stream_impl stream{ex};
280-
pool_scheduler sched{ex};
277+
sndr_io_read_stream_impl stream{&pool};
278+
auto sched = pool.get_scheduler();
281279
int count = OPS_PER_CELL;
282280
char buf[64];
283281
auto before = g_alloc_count.load(
@@ -305,9 +303,8 @@ int main()
305303
// Type erased — sndr_any_read_stream
306304
{
307305
sender_thread_pool pool(1);
308-
auto ex = pool.get_executor();
309-
sndr_any_read_stream stream(sndr_read_stream{ex});
310-
pool_scheduler sched{ex};
306+
sndr_any_read_stream stream(sndr_read_stream{&pool});
307+
auto sched = pool.get_scheduler();
311308
int count = OPS_PER_CELL;
312309
char buf[64];
313310
auto before = g_alloc_count.load(
@@ -337,7 +334,7 @@ int main()
337334
{
338335
sender_thread_pool pool(1);
339336
ioaw_read_stream stream;
340-
pool_scheduler sched{pool.get_executor()};
337+
auto sched = pool.get_scheduler();
341338
int count = OPS_PER_CELL;
342339
char buf[64];
343340
auto before = g_alloc_count.load(
@@ -365,7 +362,7 @@ int main()
365362
{
366363
sender_thread_pool pool(1);
367364
ioaw_io_read_stream_impl stream;
368-
pool_scheduler sched{pool.get_executor()};
365+
auto sched = pool.get_scheduler();
369366
int count = OPS_PER_CELL;
370367
char buf[64];
371368
auto before = g_alloc_count.load(
@@ -397,7 +394,7 @@ int main()
397394
sender_thread_pool pool(1);
398395
ioaw_read_stream concrete;
399396
capy::any_read_stream stream(&concrete);
400-
pool_scheduler sched{pool.get_executor()};
397+
auto sched = pool.get_scheduler();
401398
int count = OPS_PER_CELL;
402399
char buf[64];
403400
auto before = g_alloc_count.load(
@@ -459,9 +456,8 @@ int main()
459456
// Native — sndr_read_stream
460457
{
461458
sender_thread_pool pool(1);
462-
auto ex = pool.get_executor();
463-
sender_as_capy_executor adapter{ex};
464-
sndr_read_stream stream{ex};
459+
sender_as_capy_executor adapter{&pool};
460+
sndr_read_stream stream{&pool};
465461
capy::run_async(adapter)(
466462
capy_accept_sndr(stream, grid[run][CAPY_TASK][NATIVE_STREAM][BRIDGED_EXEC_MODEL]));
467463
pool.join();
@@ -470,9 +466,8 @@ int main()
470466
// Abstract — sndr_io_read_stream
471467
{
472468
sender_thread_pool pool(1);
473-
auto ex = pool.get_executor();
474-
sender_as_capy_executor adapter{ex};
475-
sndr_io_read_stream_impl stream{ex};
469+
sender_as_capy_executor adapter{&pool};
470+
sndr_io_read_stream_impl stream{&pool};
476471
capy::run_async(adapter)(
477472
capy_accept_sndr(
478473
static_cast<sndr_io_read_stream&>(stream),
@@ -483,9 +478,8 @@ int main()
483478
// Type erased — sndr_any_read_stream
484479
{
485480
sender_thread_pool pool(1);
486-
auto ex = pool.get_executor();
487-
sender_as_capy_executor adapter{ex};
488-
sndr_any_read_stream stream(sndr_read_stream{ex});
481+
sender_as_capy_executor adapter{&pool};
482+
sndr_any_read_stream stream(sndr_read_stream{&pool});
489483
capy::run_async(adapter)(
490484
capy_accept_sndr(stream, grid[run][CAPY_TASK][TYPE_ERASED_STREAM][BRIDGED_EXEC_MODEL]));
491485
pool.join();
@@ -498,9 +492,8 @@ int main()
498492
// Native — sndr_read_stream
499493
{
500494
sender_thread_pool pool(1);
501-
auto ex = pool.get_executor();
502-
sndr_read_stream stream{ex};
503-
pool_scheduler sched{ex};
495+
sndr_read_stream stream{&pool};
496+
auto sched = pool.get_scheduler();
504497
auto* mr = capy::get_recycling_memory_resource();
505498
bex::sync_wait(bex::starts_on(sched,
506499
bex_accept(
@@ -513,9 +506,8 @@ int main()
513506
// Abstract — sndr_io_read_stream
514507
{
515508
sender_thread_pool pool(1);
516-
auto ex = pool.get_executor();
517-
sndr_io_read_stream_impl stream{ex};
518-
pool_scheduler sched{ex};
509+
sndr_io_read_stream_impl stream{&pool};
510+
auto sched = pool.get_scheduler();
519511
auto* mr = capy::get_recycling_memory_resource();
520512
bex::sync_wait(bex::starts_on(sched,
521513
bex_accept(
@@ -529,9 +521,8 @@ int main()
529521
// Type erased — sndr_any_read_stream
530522
{
531523
sender_thread_pool pool(1);
532-
auto ex = pool.get_executor();
533-
sndr_any_read_stream stream(sndr_read_stream{ex});
534-
pool_scheduler sched{ex};
524+
sndr_any_read_stream stream(sndr_read_stream{&pool});
525+
auto sched = pool.get_scheduler();
535526
auto* mr = capy::get_recycling_memory_resource();
536527
bex::sync_wait(bex::starts_on(sched,
537528
bex_accept(
@@ -546,9 +537,8 @@ int main()
546537
// Native — ioaw_read_stream
547538
{
548539
sender_thread_pool pool(1);
549-
auto ex = pool.get_executor();
550540
ioaw_read_stream stream;
551-
pool_scheduler sched{ex};
541+
auto sched = pool.get_scheduler();
552542
auto* mr = capy::get_recycling_memory_resource();
553543
bex::sync_wait(bex::starts_on(sched,
554544
bex_accept_ioaw(
@@ -561,9 +551,8 @@ int main()
561551
// Abstract — ioaw_io_read_stream
562552
{
563553
sender_thread_pool pool(1);
564-
auto ex = pool.get_executor();
565554
ioaw_io_read_stream_impl stream;
566-
pool_scheduler sched{ex};
555+
auto sched = pool.get_scheduler();
567556
auto* mr = capy::get_recycling_memory_resource();
568557
bex::sync_wait(bex::starts_on(sched,
569558
bex_accept_ioaw(
@@ -577,10 +566,9 @@ int main()
577566
// Type erased — capy::any_read_stream
578567
{
579568
sender_thread_pool pool(1);
580-
auto ex = pool.get_executor();
581569
ioaw_read_stream concrete;
582570
capy::any_read_stream stream(&concrete);
583-
pool_scheduler sched{ex};
571+
auto sched = pool.get_scheduler();
584572
auto* mr = capy::get_recycling_memory_resource();
585573
bex::sync_wait(bex::starts_on(sched,
586574
bex_accept_ioaw(

bench/beman/sender_io_env.hpp

Lines changed: 85 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,9 @@
1010
//
1111
// Beman execution environment for benchmarks.
1212
//
13-
// Provides a minimal scheduler, executor, and io_env that bridge
14-
// sender_thread_pool to beman::execution::task. Used by any
15-
// benchmark that co_awaits senders from an io_task.
13+
// Provides pool_scheduler (the P2300 scheduler for
14+
// sender_thread_pool), the capy executor adapter, and
15+
// the io_env for beman::execution::task.
1616
//
1717

1818
#ifndef BOOST_CAPY_BENCH_SENDER_IO_ENV_HPP
@@ -32,31 +32,28 @@
3232
#include <type_traits>
3333
#include <utility>
3434

35-
// Adapter making sender_executor satisfy capy's Executor
36-
// concept so capy::task can run on sender_thread_pool.
35+
// Adapter making sender_thread_pool satisfy capy's
36+
// Executor concept so capy::task can run on it.
3737
struct sender_as_capy_executor
3838
{
39-
sender_executor ex_;
39+
sender_thread_pool* pool_;
4040

4141
boost::capy::execution_context& context() const noexcept
4242
{
43-
return *ex_.pool_;
43+
return *pool_;
4444
}
4545

4646
void on_work_started() const noexcept
4747
{
48-
ex_.pool_->on_work_started();
48+
pool_->on_work_started();
4949
}
5050

5151
void on_work_finished() const noexcept
5252
{
53-
ex_.pool_->on_work_finished();
53+
pool_->on_work_finished();
5454
}
5555

56-
void post(boost::capy::continuation& c) const
57-
{
58-
ex_.post(c.h);
59-
}
56+
void post(boost::capy::continuation& c) const;
6057

6158
// Return the handle for symmetric transfer so the
6259
// caller resumes the coroutine inline. Posting would
@@ -78,17 +75,17 @@ struct pool_scheduler
7875
{
7976
using scheduler_concept = ex::scheduler_t;
8077

81-
sender_executor ex_;
78+
sender_thread_pool* pool_;
8279

8380
struct env
8481
{
85-
sender_executor ex_;
82+
sender_thread_pool* pool_;
8683

8784
auto query(
8885
ex::get_completion_scheduler_t<ex::set_value_t> const&
8986
) const noexcept
9087
{
91-
return pool_scheduler{ex_};
88+
return pool_scheduler{pool_};
9289
}
9390
};
9491

@@ -98,11 +95,11 @@ struct pool_scheduler
9895
using operation_state_concept = ex::operation_state_t;
9996

10097
std::remove_cvref_t<Receiver> rcvr_;
101-
sender_executor ex_;
98+
sender_thread_pool* pool_;
10299

103-
op_state(Receiver rcvr, sender_executor ex)
100+
op_state(Receiver rcvr, sender_thread_pool* pool)
104101
: rcvr_(std::move(rcvr))
105-
, ex_(ex)
102+
, pool_(pool)
106103
{}
107104

108105
op_state(op_state const&) = delete;
@@ -117,7 +114,7 @@ struct pool_scheduler
117114

118115
void start() & noexcept
119116
{
120-
ex_.enqueue(this);
117+
pool_->enqueue(this);
121118
}
122119
};
123120

@@ -127,35 +124,96 @@ struct pool_scheduler
127124
using completion_signatures =
128125
ex::completion_signatures<ex::set_value_t()>;
129126

130-
sender_executor ex_;
127+
sender_thread_pool* pool_;
131128

132-
auto get_env() const noexcept { return env{ex_}; }
129+
auto get_env() const noexcept { return env{pool_}; }
133130

134131
template <ex::receiver Receiver>
135132
auto connect(Receiver&& rcvr)
136133
-> op_state<std::remove_cvref_t<Receiver>>
137134
{
138-
return {std::forward<Receiver>(rcvr), ex_};
135+
return {std::forward<Receiver>(rcvr), pool_};
139136
}
140137
};
141138

142139
auto query(
143140
boost::capy::get_io_executor_t const&
144141
) const noexcept -> sender_as_capy_executor
145142
{
146-
return sender_as_capy_executor{ex_};
143+
return sender_as_capy_executor{pool_};
147144
}
148145

149-
auto schedule() -> sender { return {ex_}; }
146+
auto schedule() -> sender { return {pool_}; }
150147
bool operator==(pool_scheduler const&) const = default;
151148
};
152149

150+
inline pool_scheduler
151+
sender_thread_pool::get_scheduler() noexcept
152+
{
153+
return pool_scheduler{this};
154+
}
155+
156+
// P2300 has no post(coroutine_handle<>). To resume a
157+
// coroutine on a scheduler you must go through
158+
// schedule → connect → start. The operation state
159+
// must be heap-allocated because the coroutine is
160+
// suspended and cannot host it.
161+
struct scheduled_resume
162+
{
163+
struct receiver
164+
{
165+
using receiver_concept = ex::receiver_t;
166+
167+
scheduled_resume* self_;
168+
169+
void set_value() && noexcept
170+
{
171+
auto h = self_->h_;
172+
delete self_;
173+
h.resume();
174+
}
175+
176+
void set_error(auto&&) && noexcept
177+
{
178+
std::terminate();
179+
}
180+
181+
void set_stopped() && noexcept
182+
{
183+
std::terminate();
184+
}
185+
};
186+
187+
using op_state_t =
188+
pool_scheduler::op_state<receiver>;
189+
190+
std::coroutine_handle<> h_;
191+
op_state_t op_;
192+
193+
scheduled_resume(
194+
pool_scheduler sched,
195+
std::coroutine_handle<> h)
196+
: h_(h)
197+
, op_(ex::connect(
198+
sched.schedule(),
199+
receiver{this}))
200+
{}
201+
};
202+
203+
inline void sender_as_capy_executor::post(
204+
boost::capy::continuation& c) const
205+
{
206+
auto* p = new scheduled_resume(
207+
pool_scheduler{pool_}, c.h);
208+
ex::start(p->op_);
209+
}
210+
153211
struct io_env
154212
{
155213
using scheduler_type = pool_scheduler;
156214
using allocator_type = std::pmr::polymorphic_allocator<std::byte>;
157215

158-
sender_executor executor;
216+
sender_thread_pool* pool_ = nullptr;
159217

160218
io_env() = default;
161219

@@ -164,7 +222,7 @@ struct io_env
164222
pool_scheduler{ex::get_scheduler(e)};
165223
}
166224
io_env(Env const& e)
167-
: executor(pool_scheduler{ex::get_scheduler(e)}.ex_)
225+
: pool_(pool_scheduler{ex::get_scheduler(e)}.pool_)
168226
{}
169227
};
170228

0 commit comments

Comments
 (0)