Skip to content

Commit 3f47cfc

Browse files
committed
Add sender-bridge example
1 parent 5d9fa36 commit 3f47cfc

File tree

4 files changed

+348
-0
lines changed

4 files changed

+348
-0
lines changed

example/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ add_subdirectory(stream-pipeline)
2222
add_subdirectory(timeout-cancellation)
2323
add_subdirectory(type-erased-echo)
2424
add_subdirectory(when-any-cancellation)
25+
add_subdirectory(sender-bridge)
2526

2627
if(TARGET Boost::asio)
2728
add_subdirectory(asio)
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
#
2+
# Copyright (c) 2026 Vinnie Falco (vinnie.falco@gmail.com)
3+
#
4+
# Distributed under the Boost Software License, Version 1.0. (See accompanying
5+
# file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6+
#
7+
# Official repository: https://github.com/cppalliance/capy
8+
#
9+
10+
include(FetchContent)
11+
12+
FetchContent_Declare(
13+
beman-execution
14+
GIT_REPOSITORY https://github.com/bemanproject/execution.git
15+
GIT_TAG main
16+
SYSTEM
17+
)
18+
FetchContent_MakeAvailable(beman-execution)
19+
20+
file(GLOB_RECURSE PFILES CONFIGURE_DEPENDS *.cpp *.hpp
21+
CMakeLists.txt
22+
Jamfile)
23+
24+
source_group(TREE ${CMAKE_CURRENT_SOURCE_DIR} PREFIX "" FILES ${PFILES})
25+
26+
add_executable(capy_example_sender_bridge ${PFILES})
27+
28+
set_property(TARGET capy_example_sender_bridge
29+
PROPERTY FOLDER "examples")
30+
31+
target_compile_features(capy_example_sender_bridge
32+
PRIVATE cxx_std_23)
33+
34+
target_link_libraries(capy_example_sender_bridge
35+
Boost::capy
36+
beman::execution_headers)
Lines changed: 229 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,229 @@
1+
//
2+
// Copyright (c) 2026 Vinnie Falco (vinnie.falco@gmail.com)
3+
//
4+
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5+
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6+
//
7+
// Official repository: https://github.com/cppalliance/capy
8+
//
9+
10+
#ifndef BOOST_CAPY_EXAMPLE_SENDER_AWAITABLE_HPP
11+
#define BOOST_CAPY_EXAMPLE_SENDER_AWAITABLE_HPP
12+
13+
#include <boost/capy/ex/io_env.hpp>
14+
15+
#include <beman/execution/execution.hpp>
16+
17+
#include <coroutine>
18+
#include <cstring>
19+
#include <exception>
20+
#include <new>
21+
#include <stop_token>
22+
#include <tuple>
23+
#include <type_traits>
24+
#include <variant>
25+
26+
namespace boost::capy {
27+
28+
namespace detail {
29+
30+
struct stopped_t {};
31+
32+
// The receiver's environment exposes only the stop token.
33+
struct bridge_env
34+
{
35+
std::stop_token st_;
36+
37+
auto query(
38+
beman::execution::get_stop_token_t const&) const noexcept
39+
{
40+
return st_;
41+
}
42+
};
43+
44+
// Deduce the single value tuple type from a sender's completion
45+
// signatures using beman::execution::value_types_of_t.
46+
template<class Sender>
47+
using sender_single_value_t =
48+
beman::execution::value_types_of_t<
49+
Sender,
50+
bridge_env,
51+
std::tuple,
52+
std::type_identity_t>;
53+
54+
// Bridge receiver that stores the sender's completion result
55+
// and posts the coroutine handle back through the Capy executor.
56+
template<class ValueTuple>
57+
struct bridge_receiver
58+
{
59+
using receiver_concept = beman::execution::receiver_t;
60+
61+
std::variant<
62+
std::monostate,
63+
ValueTuple,
64+
std::exception_ptr,
65+
stopped_t>* result_;
66+
std::coroutine_handle<> cont_;
67+
io_env const* env_;
68+
69+
auto get_env() const noexcept -> bridge_env
70+
{
71+
return {env_->stop_token};
72+
}
73+
74+
template<class... Args>
75+
void set_value(Args&&... args) && noexcept
76+
{
77+
result_->template emplace<1>(
78+
std::forward<Args>(args)...);
79+
env_->executor.post(cont_);
80+
}
81+
82+
template<class E>
83+
void set_error(E&& e) && noexcept
84+
{
85+
if constexpr (
86+
std::is_same_v<
87+
std::decay_t<E>, std::exception_ptr>)
88+
result_->template emplace<2>(
89+
std::forward<E>(e));
90+
else
91+
result_->template emplace<2>(
92+
std::make_exception_ptr(
93+
std::forward<E>(e)));
94+
env_->executor.post(cont_);
95+
}
96+
97+
void set_stopped() && noexcept
98+
{
99+
result_->template emplace<3>(stopped_t{});
100+
env_->executor.post(cont_);
101+
}
102+
};
103+
104+
} // namespace detail
105+
106+
/** Awaitable that bridges a beman::execution sender into a Capy coroutine.
107+
108+
Satisfies IoAwaitable. When co_awaited inside a capy::task,
109+
connects the sender to a bridge receiver, starts the operation,
110+
and resumes the coroutine on the caller's executor when the
111+
sender completes.
112+
113+
Stop token propagation: the Capy coroutine's stop_token is
114+
forwarded to the sender through the bridge receiver's
115+
environment.
116+
117+
@tparam Sender The beman::execution sender type.
118+
*/
119+
template<class Sender>
120+
struct [[nodiscard]] sender_awaitable
121+
{
122+
using value_tuple = detail::sender_single_value_t<Sender>;
123+
using receiver_type = detail::bridge_receiver<value_tuple>;
124+
using op_state_type = decltype(
125+
beman::execution::connect(
126+
std::declval<Sender>(),
127+
std::declval<receiver_type>()));
128+
129+
Sender sndr_;
130+
131+
std::variant<
132+
std::monostate,
133+
value_tuple,
134+
std::exception_ptr,
135+
detail::stopped_t> result_{};
136+
137+
alignas(op_state_type)
138+
unsigned char op_buf_[sizeof(op_state_type)];
139+
bool op_constructed_ = false;
140+
141+
explicit sender_awaitable(Sender sndr)
142+
: sndr_(std::move(sndr))
143+
{
144+
}
145+
146+
// Movable only before await_suspend (op_state not yet constructed)
147+
sender_awaitable(sender_awaitable&& o) noexcept(
148+
std::is_nothrow_move_constructible_v<Sender>)
149+
: sndr_(std::move(o.sndr_))
150+
{
151+
}
152+
153+
sender_awaitable(sender_awaitable const&) = delete;
154+
sender_awaitable& operator=(sender_awaitable const&) = delete;
155+
sender_awaitable& operator=(sender_awaitable&&) = delete;
156+
157+
~sender_awaitable()
158+
{
159+
if(op_constructed_)
160+
std::launder(
161+
reinterpret_cast<op_state_type*>(
162+
op_buf_))->~op_state_type();
163+
}
164+
165+
bool await_ready() const noexcept { return false; }
166+
167+
std::coroutine_handle<>
168+
await_suspend(
169+
std::coroutine_handle<> h,
170+
io_env const* env)
171+
{
172+
::new(op_buf_) op_state_type(
173+
beman::execution::connect(
174+
std::move(sndr_),
175+
receiver_type{&result_, h, env}));
176+
op_constructed_ = true;
177+
beman::execution::start(
178+
*std::launder(
179+
reinterpret_cast<op_state_type*>(
180+
op_buf_)));
181+
return std::noop_coroutine();
182+
}
183+
184+
auto await_resume()
185+
{
186+
if(result_.index() == 2)
187+
std::rethrow_exception(
188+
std::get<2>(result_));
189+
if(result_.index() == 3)
190+
throw std::runtime_error(
191+
"sender completed with set_stopped");
192+
193+
if constexpr (std::tuple_size_v<value_tuple> == 0)
194+
return;
195+
else if constexpr (std::tuple_size_v<value_tuple> == 1)
196+
return std::get<0>(
197+
std::get<1>(std::move(result_)));
198+
else
199+
return std::get<1>(std::move(result_));
200+
}
201+
};
202+
203+
/** Create an IoAwaitable from a beman::execution sender.
204+
205+
@par Example
206+
@code
207+
capy::task<int> compute(auto sched)
208+
{
209+
auto result = co_await await_sender(
210+
beman::execution::schedule(sched)
211+
| beman::execution::then(
212+
[] { return 42; }));
213+
co_return result;
214+
}
215+
@endcode
216+
217+
@param sndr The sender to bridge.
218+
@return An IoAwaitable that can be co_awaited in a capy::task.
219+
*/
220+
template<class Sender>
221+
auto await_sender(Sender&& sndr)
222+
{
223+
return sender_awaitable<std::decay_t<Sender>>(
224+
std::forward<Sender>(sndr));
225+
}
226+
227+
} // namespace boost::capy
228+
229+
#endif
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
//
2+
// Copyright (c) 2026 Vinnie Falco (vinnie.falco@gmail.com)
3+
//
4+
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5+
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6+
//
7+
// Official repository: https://github.com/cppalliance/capy
8+
//
9+
10+
#include "sender_awaitable.hpp"
11+
12+
#include <boost/capy.hpp>
13+
14+
#include <beman/execution/execution.hpp>
15+
16+
#include <iostream>
17+
#include <latch>
18+
#include <thread>
19+
20+
namespace capy = boost::capy;
21+
namespace ex = beman::execution;
22+
23+
capy::task<int> compute(auto sched)
24+
{
25+
int result = co_await capy::await_sender(
26+
ex::schedule(sched)
27+
| ex::then([] {
28+
std::cout
29+
<< " sender running on thread "
30+
<< std::this_thread::get_id() << "\n";
31+
return 42 * 42;
32+
}));
33+
34+
std::cout
35+
<< " coroutine resumed on thread "
36+
<< std::this_thread::get_id() << "\n";
37+
38+
co_return result;
39+
}
40+
41+
int main()
42+
{
43+
std::cout
44+
<< "main thread: "
45+
<< std::this_thread::get_id() << "\n";
46+
47+
// Capy execution context
48+
capy::thread_pool pool;
49+
50+
// Beman execution context (run_loop on a dedicated thread)
51+
ex::run_loop loop;
52+
std::jthread loop_thread([&loop] {
53+
loop.run();
54+
});
55+
auto sched = loop.get_scheduler();
56+
57+
std::latch done(1);
58+
int answer = 0;
59+
60+
auto on_complete = [&](int v) {
61+
answer = v;
62+
done.count_down();
63+
};
64+
65+
auto on_error = [&](std::exception_ptr ep) {
66+
try { std::rethrow_exception(ep); }
67+
catch (std::exception const& e) {
68+
std::cerr << "error: " << e.what() << "\n";
69+
}
70+
done.count_down();
71+
};
72+
73+
capy::run_async(
74+
pool.get_executor(),
75+
on_complete,
76+
on_error)(compute(sched));
77+
78+
done.wait();
79+
loop.finish();
80+
81+
std::cout << "result: " << answer << "\n";
82+
}

0 commit comments

Comments
 (0)