Skip to content

Commit 4138e90

Browse files
committed
Improved awaitable-sender
1 parent 902bb19 commit 4138e90

2 files changed

Lines changed: 243 additions & 17 deletions

File tree

example/awaitable-sender/awaitable_sender.cpp

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#include <chrono>
1717
#include <iostream>
1818
#include <latch>
19+
#include <stop_token>
1920
#include <system_error>
2021
#include <thread>
2122

@@ -95,4 +96,67 @@ int main()
9596

9697
done.wait();
9798
std::cout << " delay completed\n";
99+
100+
// Test cancellation via stop token
101+
std::cout << "\n--- cancellation test ---\n";
102+
std::stop_source ss;
103+
std::latch done2(1);
104+
105+
auto sndr2 = capy::as_sender(capy::delay(5s));
106+
auto op2 = ex::connect(
107+
std::move(sndr2),
108+
demo_receiver{
109+
{pool.get_executor(), ss.get_token()},
110+
&done2});
111+
112+
std::cout << " starting 5s delay...\n";
113+
ex::start(op2);
114+
115+
std::this_thread::sleep_for(100ms);
116+
std::cout << " requesting stop...\n";
117+
ss.request_stop();
118+
119+
done2.wait();
120+
std::cout << " cancellation test done\n";
121+
122+
// Test split_ec with success (error_code == 0)
123+
std::cout << "\n--- split_ec success test ---\n";
124+
std::latch done3(1);
125+
126+
auto sndr3 = capy::split_ec(
127+
capy::as_sender(capy::delay(100ms)));
128+
auto op3 = ex::connect(
129+
std::move(sndr3),
130+
demo_receiver{
131+
{pool.get_executor(), std::stop_token{}},
132+
&done3});
133+
134+
ex::start(op3);
135+
done3.wait();
136+
std::cout << " split_ec success test done\n";
137+
138+
// Test split_ec with error (error_code != 0)
139+
std::cout << "\n--- split_ec error test ---\n";
140+
std::latch done4(1);
141+
142+
auto make_ec_sender = [&pool]() {
143+
auto task = [](capy::executor_ref)
144+
-> capy::task<std::error_code>
145+
{
146+
co_return std::make_error_code(
147+
std::errc::connection_reset);
148+
}(pool.get_executor());
149+
return capy::as_sender(std::move(task));
150+
};
151+
152+
auto sndr4 = capy::split_ec(make_ec_sender());
153+
auto op4 = ex::connect(
154+
std::move(sndr4),
155+
demo_receiver{
156+
{pool.get_executor(), std::stop_token{}},
157+
&done4});
158+
159+
ex::start(op4);
160+
done4.wait();
161+
std::cout << " split_ec error test done\n";
98162
}

example/awaitable-sender/awaitable_sender.hpp

Lines changed: 179 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -279,40 +279,59 @@ struct awaitable_sender
279279

280280
env_ = io_env{ex, st, nullptr};
281281

282-
bridge_ = [](IoAw aw, Receiver rcvr)
282+
bridge_ = [](
283+
IoAw aw,
284+
Receiver rcvr,
285+
std::stop_token const* st)
283286
-> detail::bridge_task<IoAw, Receiver>
284287
{
285288
try
286289
{
287290
if constexpr (std::is_void_v<result_type>)
288291
{
289292
co_await std::move(aw);
290-
beman::execution::set_value(
291-
std::move(rcvr));
293+
if (st->stop_requested())
294+
beman::execution::set_stopped(
295+
std::move(rcvr));
296+
else
297+
beman::execution::set_value(
298+
std::move(rcvr));
292299
}
293300
else if constexpr (
294301
detail::is_ec_outcome_v<result_type>)
295302
{
296303
auto result = co_await std::move(aw);
297-
std::error_code ec;
298-
if constexpr (std::is_same_v<
299-
result_type, std::error_code>)
300-
ec = result;
301-
else
302-
ec = get<0>(result);
303-
if (!ec)
304-
beman::execution::set_value(
304+
if (st->stop_requested())
305+
{
306+
beman::execution::set_stopped(
305307
std::move(rcvr));
308+
}
306309
else
307-
beman::execution::set_error(
308-
std::move(rcvr), ec);
310+
{
311+
std::error_code ec;
312+
if constexpr (std::is_same_v<
313+
result_type, std::error_code>)
314+
ec = result;
315+
else
316+
ec = get<0>(result);
317+
if (!ec)
318+
beman::execution::set_value(
319+
std::move(rcvr));
320+
else
321+
beman::execution::set_error(
322+
std::move(rcvr), ec);
323+
}
309324
}
310325
else
311326
{
312327
auto result = co_await std::move(aw);
313-
beman::execution::set_value(
314-
std::move(rcvr),
315-
std::move(result));
328+
if (st->stop_requested())
329+
beman::execution::set_stopped(
330+
std::move(rcvr));
331+
else
332+
beman::execution::set_value(
333+
std::move(rcvr),
334+
std::move(result));
316335
}
317336
}
318337
catch(...)
@@ -321,7 +340,8 @@ struct awaitable_sender
321340
std::move(rcvr),
322341
std::current_exception());
323342
}
324-
}(std::move(aw_), std::move(rcvr_));
343+
}(std::move(aw_), std::move(rcvr_),
344+
&env_.stop_token);
325345

326346
bridge_.h_.promise().env_ = &env_;
327347
bridge_.h_.resume();
@@ -385,6 +405,148 @@ auto as_sender(IoAw&& aw)
385405
std::forward<IoAw>(aw)};
386406
}
387407

408+
// -------------------------------------------------------
409+
// split_ec: sender adapter that routes error_code to
410+
// set_value() or set_error(ec) at runtime.
411+
// -------------------------------------------------------
412+
413+
namespace detail {
414+
415+
template<class Sender>
416+
struct split_ec_sender
417+
{
418+
using sender_concept = beman::execution::sender_t;
419+
420+
using completion_signatures =
421+
beman::execution::completion_signatures<
422+
beman::execution::set_value_t(),
423+
beman::execution::set_error_t(std::error_code),
424+
beman::execution::set_error_t(std::exception_ptr),
425+
beman::execution::set_stopped_t()>;
426+
427+
Sender sndr_;
428+
429+
template<class Receiver>
430+
struct ec_receiver
431+
{
432+
using receiver_concept = beman::execution::receiver_t;
433+
434+
Receiver rcvr_;
435+
436+
auto get_env() const noexcept
437+
{
438+
return beman::execution::get_env(rcvr_);
439+
}
440+
441+
void set_value(std::error_code ec) && noexcept
442+
{
443+
if (!ec)
444+
beman::execution::set_value(
445+
std::move(rcvr_));
446+
else
447+
beman::execution::set_error(
448+
std::move(rcvr_), ec);
449+
}
450+
451+
void set_value() && noexcept
452+
{
453+
beman::execution::set_value(
454+
std::move(rcvr_));
455+
}
456+
457+
template<class E>
458+
void set_error(E&& e) && noexcept
459+
{
460+
beman::execution::set_error(
461+
std::move(rcvr_),
462+
std::forward<E>(e));
463+
}
464+
465+
void set_stopped() && noexcept
466+
{
467+
beman::execution::set_stopped(
468+
std::move(rcvr_));
469+
}
470+
};
471+
472+
template<class Receiver>
473+
struct op_state
474+
{
475+
using operation_state_concept =
476+
beman::execution::operation_state_t;
477+
478+
using inner_op_t = decltype(
479+
beman::execution::connect(
480+
std::declval<Sender>(),
481+
std::declval<ec_receiver<Receiver>>()));
482+
483+
inner_op_t op_;
484+
485+
op_state(Sender sndr, Receiver rcvr)
486+
: op_(beman::execution::connect(
487+
std::move(sndr),
488+
ec_receiver<Receiver>{std::move(rcvr)}))
489+
{
490+
}
491+
492+
op_state(op_state const&) = delete;
493+
op_state(op_state&&) = delete;
494+
op_state& operator=(op_state const&) = delete;
495+
op_state& operator=(op_state&&) = delete;
496+
497+
void start() noexcept
498+
{
499+
beman::execution::start(op_);
500+
}
501+
};
502+
503+
template<class Receiver>
504+
auto connect(Receiver rcvr) &&
505+
-> op_state<Receiver>
506+
{
507+
return op_state<Receiver>(
508+
std::move(sndr_), std::move(rcvr));
509+
}
510+
511+
template<class Receiver>
512+
auto connect(Receiver rcvr) const&
513+
-> op_state<Receiver>
514+
{
515+
return op_state<Receiver>(
516+
sndr_, std::move(rcvr));
517+
}
518+
};
519+
520+
} // namespace detail
521+
522+
/** Split an `error_code` value channel into success and error channels.
523+
524+
Takes a sender that completes with `set_value(error_code)` and
525+
routes it at runtime: `set_value()` when the code is zero,
526+
`set_error(ec)` otherwise. No exceptions.
527+
528+
@par Example
529+
@code
530+
do_read(sock, buf)
531+
| split_ec()
532+
| ex::upon_error(
533+
[](std::error_code ec) {
534+
// reachable, no exceptions
535+
});
536+
@endcode
537+
538+
@param sndr The predecessor sender.
539+
@return A sender completing with `set_value()`,
540+
`set_error(error_code)`, or `set_stopped()`.
541+
*/
542+
template<class Sender>
543+
auto split_ec(Sender&& sndr)
544+
{
545+
return detail::split_ec_sender<
546+
std::decay_t<Sender>>{
547+
std::forward<Sender>(sndr)};
548+
}
549+
388550
} // namespace boost::capy
389551

390552
#endif

0 commit comments

Comments
 (0)