Skip to content

Commit 21ca572

Browse files
committed
Rename repeat_effect_until to repeat_until with trampoline and sync row
Align with stdexec's naming: repeat_effect_until → repeat_until. Add a depth-bounded trampoline (max_depth = 19) that converts deep recursion into an iterative drain loop for synchronous completions. Enable the synchronous benchmark row for Table 1 (sender pipeline), which was previously N/A because it overflowed the stack without a trampoline. Fix the awaitable_sender bridge to handle synchronous completion, where await_suspend returns the caller's handle: call complete() instead of discarding the returned handle.
1 parent 70442c2 commit 21ca572

4 files changed

Lines changed: 130 additions & 43 deletions

File tree

bench/beman/awaitable_sender.hpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -314,7 +314,10 @@ struct awaitable_sender
314314
auto h = std::coroutine_handle<>::from_address(
315315
static_cast<void*>(&cb_));
316316

317-
detail::call_await_suspend(&aw_, h, &env_);
317+
auto resumed = detail::call_await_suspend(
318+
&aw_, h, &env_);
319+
if(resumed == h)
320+
complete();
318321
}
319322
};
320323

bench/beman/main.cpp

Lines changed: 70 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
// I/O Read Stream Benchmark
1212
//
1313
// Compares three execution models across three stream abstraction
14-
// levels. 100M read_some calls per cell, single thread.
14+
// levels. 20M read_some calls per cell, single thread.
1515
//
1616
// Table 1: sender pipeline (connect/start)
1717
// Table 2: capy::task (capy::thread_pool)
@@ -27,7 +27,7 @@
2727
#include "awaitable_sender.hpp"
2828
#include "ioaw_read_stream.hpp"
2929
#include "ioaw_io_read_stream.hpp"
30-
#include "repeat_effect_until.hpp"
30+
#include "repeat_until.hpp"
3131
#include "sender_awaitable.hpp"
3232
#include "sndr_any_read_stream.hpp"
3333
#include "sndr_io_read_stream.hpp"
@@ -248,12 +248,14 @@ int main()
248248
for (int run = 0; run <= NUM_RUNS; ++run)
249249
{
250250

251+
251252
// ---------------------------------------------------------------
252-
// Table 1: sender/receiver pipeline (repeat_effect_until)
253+
// Table 1: sender/receiver pipeline (repeat_until)
253254
// ---------------------------------------------------------------
254255

255256
// Col A: Sender (native)
256257

258+
257259
// Native — sndr_read_stream
258260
{
259261
sender_thread_pool pool(1);
@@ -265,7 +267,7 @@ int main()
265267
std::memory_order_relaxed);
266268
auto start = std::chrono::steady_clock::now();
267269
bex::sync_wait(bex::starts_on(sched,
268-
repeat_effect_until(
270+
repeat_until(
269271
bex::let_value(bex::just(), [&]() {
270272
return stream.read_some(
271273
capy::mutable_buffer(buf, sizeof(buf)));
@@ -293,7 +295,7 @@ int main()
293295
std::memory_order_relaxed);
294296
auto start = std::chrono::steady_clock::now();
295297
bex::sync_wait(bex::starts_on(sched,
296-
repeat_effect_until(
298+
repeat_until(
297299
bex::let_value(bex::just(), [&]() {
298300
return static_cast<sndr_io_read_stream&>(
299301
stream).read_some(
@@ -322,7 +324,7 @@ int main()
322324
std::memory_order_relaxed);
323325
auto start = std::chrono::steady_clock::now();
324326
bex::sync_wait(bex::starts_on(sched,
325-
repeat_effect_until(
327+
repeat_until(
326328
bex::let_value(bex::just(), [&]() {
327329
return stream.read_some(
328330
capy::mutable_buffer(buf, sizeof(buf)));
@@ -341,6 +343,7 @@ int main()
341343

342344
// Col B: Awaitable (via as_sender bridge)
343345

346+
344347
// Native — ioaw_read_stream
345348
{
346349
sender_thread_pool pool(1);
@@ -352,7 +355,7 @@ int main()
352355
std::memory_order_relaxed);
353356
auto start = std::chrono::steady_clock::now();
354357
bex::sync_wait(bex::starts_on(sched,
355-
repeat_effect_until(
358+
repeat_until(
356359
bex::let_value(bex::just(), [&]() {
357360
return capy::as_sender(stream.read_some(
358361
capy::mutable_buffer(buf, sizeof(buf))));
@@ -380,7 +383,7 @@ int main()
380383
std::memory_order_relaxed);
381384
auto start = std::chrono::steady_clock::now();
382385
bex::sync_wait(bex::starts_on(sched,
383-
repeat_effect_until(
386+
repeat_until(
384387
bex::let_value(bex::just(), [&]() {
385388
return capy::as_sender(
386389
static_cast<ioaw_io_read_stream&>(
@@ -412,7 +415,7 @@ int main()
412415
std::memory_order_relaxed);
413416
auto start = std::chrono::steady_clock::now();
414417
bex::sync_wait(bex::starts_on(sched,
415-
repeat_effect_until(
418+
repeat_until(
416419
bex::let_value(bex::just(), [&]() {
417420
return capy::as_sender(stream.read_some(
418421
capy::mutable_buffer(buf, sizeof(buf))));
@@ -429,16 +432,68 @@ int main()
429432
after - before};
430433
}
431434

432-
// Synchronous — sender pipeline cannot run synchronous
433-
// senders. repeat_effect_until requires a trampoline for
434-
// synchronous completions, and the trampoline interacts
435-
// poorly with the let_value/starts_on sender layering.
436-
// Table 1 SYNC_STREAM cells are left at zero.
435+
436+
// Synchronous — sndr_sync_read_stream (Col A)
437+
{
438+
sender_thread_pool pool(1);
439+
sndr_sync_read_stream stream;
440+
auto sched = pool.get_scheduler();
441+
int count = OPS_PER_CELL;
442+
char buf[64];
443+
auto before = g_alloc_count.load(
444+
std::memory_order_relaxed);
445+
auto start = std::chrono::steady_clock::now();
446+
bex::sync_wait(bex::starts_on(sched,
447+
repeat_until(
448+
bex::let_value(bex::just(), [&]() {
449+
return stream.read_some(
450+
capy::mutable_buffer(buf, sizeof(buf)));
451+
}),
452+
[&count]() { return --count == 0; })));
453+
pool.join();
454+
auto elapsed =
455+
std::chrono::steady_clock::now() - start;
456+
auto after = g_alloc_count.load(
457+
std::memory_order_relaxed);
458+
grid[run][SENDER_RECEIVER][SYNC_STREAM][NATIVE_EXEC_MODEL] = {
459+
std::chrono::duration_cast<
460+
std::chrono::nanoseconds>(elapsed).count(),
461+
after - before};
462+
}
463+
464+
// Synchronous — ioaw_sync_read_stream (Col B)
465+
{
466+
sender_thread_pool pool(1);
467+
ioaw_sync_read_stream stream;
468+
auto sched = pool.get_scheduler();
469+
int count = OPS_PER_CELL;
470+
char buf[64];
471+
auto before = g_alloc_count.load(
472+
std::memory_order_relaxed);
473+
auto start = std::chrono::steady_clock::now();
474+
bex::sync_wait(bex::starts_on(sched,
475+
repeat_until(
476+
bex::let_value(bex::just(), [&]() {
477+
return capy::as_sender(stream.read_some(
478+
capy::mutable_buffer(buf, sizeof(buf))));
479+
}),
480+
[&count]() { return --count == 0; })));
481+
pool.join();
482+
auto elapsed =
483+
std::chrono::steady_clock::now() - start;
484+
auto after = g_alloc_count.load(
485+
std::memory_order_relaxed);
486+
grid[run][SENDER_RECEIVER][SYNC_STREAM][BRIDGED_EXEC_MODEL] = {
487+
std::chrono::duration_cast<
488+
std::chrono::nanoseconds>(elapsed).count(),
489+
after - before};
490+
}
437491

438492
// ---------------------------------------------------------------
439493
// Table 2: capy::task (capy::thread_pool)
440494
// ---------------------------------------------------------------
441495

496+
442497
// Native — ioaw_read_stream
443498
{
444499
capy::thread_pool pool(1);
@@ -525,6 +580,7 @@ int main()
525580
// Table 3: beman::execution::task (bex::task<void, io_env>)
526581
// ---------------------------------------------------------------
527582

583+
528584
// Native — sndr_read_stream
529585
{
530586
sender_thread_pool pool(1);
@@ -676,18 +732,6 @@ int main()
676732

677733
for (int s = 0; s < NUM_STREAMS; ++s)
678734
{
679-
if (s == SYNC_STREAM &&
680-
table == SENDER_RECEIVER)
681-
{
682-
std::printf(
683-
" %-18s"
684-
" %-30s %-30s\n",
685-
row_labels[s],
686-
" N/A",
687-
" N/A");
688-
continue;
689-
}
690-
691735
double sum[NUM_COLUMNS]{};
692736
double sum2[NUM_COLUMNS]{};
693737
double al[NUM_COLUMNS]{};

bench/beman/repeat_effect_until.hpp → bench/beman/repeat_until.hpp

Lines changed: 53 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
//
2-
// Adapted from beman execution examples (Apache-2.0 WITH LLVM-exception)
2+
// Adapted from stdexec (Apache-2.0 WITH LLVM-exception)
33
// for benchmark use.
44
//
55

6-
#ifndef BOOST_CAPY_BENCH_REPEAT_EFFECT_UNTIL_HPP
7-
#define BOOST_CAPY_BENCH_REPEAT_EFFECT_UNTIL_HPP
6+
#ifndef BOOST_CAPY_BENCH_REPEAT_UNTIL_HPP
7+
#define BOOST_CAPY_BENCH_REPEAT_UNTIL_HPP
88

99
#include <beman/execution/execution.hpp>
1010

@@ -29,7 +29,13 @@ struct repeat_connector
2929
auto start() & noexcept -> void { bex::start(op); }
3030
};
3131

32-
inline constexpr struct repeat_effect_until_t
32+
/// Sender algorithm that repeats a child sender until
33+
/// a predicate returns true. Predicate is called with
34+
/// no arguments (child values are discarded).
35+
///
36+
/// Includes a trampoline that bounds recursion depth
37+
/// for synchronous completions (max_depth = 19).
38+
inline constexpr struct repeat_until_t
3339
{
3440
template <bex::sender Child, typename Pred>
3541
struct sender
@@ -47,6 +53,8 @@ inline constexpr struct repeat_effect_until_t
4753
using operation_state_concept =
4854
bex::operation_state_t;
4955

56+
static constexpr std::size_t max_depth = 19;
57+
5058
struct own_receiver
5159
{
5260
using receiver_concept = bex::receiver_t;
@@ -97,24 +105,58 @@ inline constexpr struct repeat_effect_until_t
97105
std::optional<repeat_connector<
98106
std::remove_cvref_t<Child>,
99107
own_receiver>> child_op;
108+
std::size_t depth_ = 0;
109+
bool draining_ = false;
110+
bool again_ = false;
100111

101112
auto start() & noexcept -> void
102113
{
103-
run_one();
114+
drain();
104115
}
105116

106-
auto run_one() & noexcept -> void
117+
// Iterative trampoline that bounds stack
118+
// depth for synchronous completions
119+
auto drain() & noexcept -> void
107120
{
108-
child_op.emplace(child, own_receiver{this});
109-
child_op->start();
121+
draining_ = true;
122+
do
123+
{
124+
again_ = false;
125+
depth_ = 0;
126+
child_op.emplace(
127+
child, own_receiver{this});
128+
child_op->start();
129+
}
130+
while (again_);
131+
draining_ = false;
110132
}
111133

112134
auto next() & noexcept -> void
113135
{
114136
if (pred())
137+
{
115138
bex::set_value(std::move(receiver));
116-
else
117-
run_one();
139+
return;
140+
}
141+
142+
if (!draining_)
143+
{
144+
// Async completion — enter drain loop
145+
drain();
146+
return;
147+
}
148+
149+
if (++depth_ >= max_depth)
150+
{
151+
// Hit depth limit — trampoline
152+
again_ = true;
153+
return;
154+
}
155+
156+
// Within limit — recurse inline
157+
child_op.emplace(
158+
child, own_receiver{this});
159+
child_op->start();
118160
}
119161
};
120162

@@ -137,6 +179,6 @@ inline constexpr struct repeat_effect_until_t
137179
return {std::forward<Child>(child),
138180
std::forward<Pred>(pred)};
139181
}
140-
} repeat_effect_until{};
182+
} repeat_until{};
141183

142184
#endif

bench/beman/sndr_sync_read_stream.hpp

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,9 @@
1515
// as_awaitable path returns the coroutine handle
1616
// for symmetric transfer.
1717
//
18-
// WARNING: Using this sender in a loop algorithm
19-
// like repeat_effect_until will stack overflow —
20-
// there is no trampoline for synchronous
21-
// completions. Coroutines handle this via symmetric
22-
// transfer; sender pipelines cannot.
18+
// repeat_until's trampoline bounds stack depth for
19+
// sender pipelines. Coroutines handle this via
20+
// symmetric transfer.
2321
//
2422

2523
#ifndef BOOST_CAPY_BENCH_SNDR_SYNC_READ_STREAM_HPP

0 commit comments

Comments
 (0)