Skip to content

Commit 83f1064

Browse files
committed
Add wrap_executor for capy::executor integration
- Add wrap_executor() to adapt Asio executors to capy::executor - Integrate capy::executor into route_params and spawn for executor affinity - Refactor http_stream, plain_worker, and worker_ssl for new executor model - Add comprehensive wrap_executor unit tests - Remove obsolete route_handler_asio.hpp and post_work files
1 parent 2707ced commit 83f1064

13 files changed

Lines changed: 927 additions & 224 deletions

File tree

CLAUDE.md

Lines changed: 0 additions & 64 deletions
This file was deleted.

example/server/main.cpp

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
//
99

1010
#include "certificate.hpp"
11-
#include "post_work.hpp"
1211
#include "serve_detached.hpp"
1312
#include "serve_log_admin.hpp"
1413
#include <boost/beast2/asio_io_context.hpp>
@@ -17,9 +16,12 @@
1716
#include <boost/beast2/server/serve_static.hpp>
1817
#include <boost/beast2/error.hpp>
1918
#include <boost/capy/application.hpp>
19+
#include <boost/capy/async_result.hpp>
20+
#include <boost/capy/thread_pool.hpp>
2021
#include <boost/http_proto/request_parser.hpp>
2122
#include <boost/http_proto/serializer.hpp>
2223
#include <boost/http_proto/server/cors.hpp>
24+
#include <boost/capy/bcrypt.hpp>
2325
#include <boost/capy/brotli/decode.hpp>
2426
#include <boost/capy/brotli/encode.hpp>
2527
#include <boost/capy/zlib/deflate.hpp>
@@ -30,6 +32,8 @@
3032
namespace boost {
3133
namespace beast2 {
3234

35+
capy::thread_pool g_tp;
36+
3337
void install_services(capy::application& app)
3438
{
3539
#ifdef BOOST_CAPY_HAS_BROTLI
@@ -102,7 +106,7 @@ struct do_json_rpc
102106
return http::route::next;
103107
return rp.read_body(
104108
json_sink(),
105-
[this, &rp](
109+
[this](
106110
json::value jv) ->
107111
http::route_result
108112
{
@@ -121,6 +125,46 @@ struct do_json_rpc
121125

122126
};
123127

128+
#ifdef BOOST_BEAST2_HAS_CORO
129+
auto
130+
my_coro(
131+
http::route_params& rp) ->
132+
capy::task<http::route_result>
133+
{
134+
(void)rp;
135+
asio::thread_pool tp(1);
136+
co_await capy::make_async_result<void>(
137+
[&tp](auto&& handler)
138+
{
139+
asio::post(tp.get_executor(),
140+
[handler = std::move(handler)]() mutable
141+
{
142+
// Simulate some asynchronous work
143+
std::this_thread::sleep_for(std::chrono::seconds(1));
144+
handler();
145+
});
146+
});
147+
co_return http::route::next;
148+
}
149+
150+
auto
151+
do_bcrypt(
152+
http::route_params& rp) ->
153+
capy::task<http::route_result>
154+
{
155+
std::string password = "boost";
156+
//auto rv = co_await capy::bcrypt::async_hash(password);
157+
co_await g_tp.get_executor().submit(
158+
[]()
159+
{
160+
std::this_thread::sleep_for(std::chrono::seconds(1));
161+
});
162+
co_return http::route::next;
163+
}
164+
165+
166+
#endif
167+
124168
int server_main( int argc, char* argv[] )
125169
{
126170
try
@@ -150,6 +194,14 @@ int server_main( int argc, char* argv[] )
150194
http::cors(opts),
151195
do_json_rpc()
152196
);
197+
#ifdef BOOST_BEAST2_HAS_CORO
198+
srv.wwwroot.use(
199+
"/spawn",
200+
http::co_route(my_coro));
201+
srv.wwwroot.use(
202+
"/bcrypt",
203+
http::co_route(do_bcrypt));
204+
#endif
153205
srv.wwwroot.use("/", serve_static( argv[3] ));
154206

155207
app.start();

example/server/post_work.cpp

Lines changed: 0 additions & 43 deletions
This file was deleted.

example/server/post_work.hpp

Lines changed: 0 additions & 29 deletions
This file was deleted.

include/boost/beast2.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
#include <boost/beast2/client.hpp>
3535
#include <boost/beast2/endpoint.hpp>
3636
#include <boost/beast2/error.hpp>
37+
#include <boost/beast2/wrap_executor.hpp>
3738
#include <boost/beast2/format.hpp>
3839
#include <boost/beast2/log_service.hpp>
3940
#include <boost/beast2/logger.hpp>

include/boost/beast2/detail/config.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,9 @@ namespace beast2 {
3939
//------------------------------------------------
4040

4141
#if defined(__cpp_lib_coroutine) && __cpp_lib_coroutine >= 201902L
42-
# define BOOST_BEAST_HAS_CORO 1
42+
# define BOOST_BEAST2_HAS_CORO 1
4343
#elif defined(__cpp_impl_coroutine) && __cpp_impl_coroutines >= 201902L
44-
# define BOOST_BEAST_HAS_CORO 1
44+
# define BOOST_BEAST2_HAS_CORO 1
4545
#endif
4646

4747
//------------------------------------------------

include/boost/beast2/server/http_stream.hpp

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,14 @@
1212

1313
#include <boost/beast2/detail/config.hpp>
1414
#include <boost/beast2/log_service.hpp>
15+
#include <boost/beast2/error.hpp>
1516
#include <boost/beast2/format.hpp>
1617
#include <boost/beast2/read.hpp>
17-
#include <boost/beast2/write.hpp>
1818
#include <boost/beast2/server/any_lambda.hpp>
1919
#include <boost/beast2/server/route_handler_asio.hpp>
2020
#include <boost/beast2/server/router_asio.hpp>
21-
#include <boost/beast2/error.hpp>
21+
#include <boost/beast2/wrap_executor.hpp>
22+
#include <boost/beast2/write.hpp>
2223
#include <boost/beast2/detail/except.hpp>
2324
#include <boost/capy/application.hpp>
2425
#include <boost/http_proto/request_parser.hpp>
@@ -101,7 +102,8 @@ class http_stream
101102
std::size_t bytes_transferred);
102103
void on_complete();
103104
http::resumer do_suspend() override;
104-
void do_resume(http::route_result const& ec) override;
105+
void do_resume(http::route_result const& rv) override;
106+
void do_resume(std::exception_ptr ep) override;
105107
void do_close();
106108
void do_fail(core::string_view s,
107109
system::error_code const& ec);
@@ -182,6 +184,7 @@ http_stream(
182184

183185
rp_.serializer = http::serializer(app);
184186
rp_.suspend = http::suspender(*this);
187+
rp_.ex = wrap_executor(stream_.get_executor());
185188
}
186189

187190
// called to start a new HTTP session.
@@ -444,6 +447,40 @@ do_resume(
444447
});
445448
}
446449

450+
// called by resume(ep)
451+
template<class AsyncStream>
452+
void
453+
http_stream<AsyncStream>::
454+
do_resume(
455+
std::exception_ptr ep)
456+
{
457+
asio::dispatch(
458+
stream_.get_executor(),
459+
[this, ep]
460+
{
461+
BOOST_ASSERT(pwg_.get() != nullptr);
462+
pwg_.reset();
463+
464+
rp_.status(http::status::internal_server_error);
465+
try
466+
{
467+
std::rethrow_exception(ep);
468+
}
469+
catch(std::exception const& e)
470+
{
471+
std::string s;
472+
format_to(s, "An internal server error occurred: {}", e.what());
473+
rp_.set_body(s);
474+
}
475+
catch(...)
476+
{
477+
rp_.set_body("An internal server error occurred");
478+
}
479+
rp_.res.set_keep_alive(false);
480+
do_write();
481+
});
482+
}
483+
447484
// called when a non-recoverable error occurs
448485
template<class AsyncStream>
449486
void

0 commit comments

Comments
 (0)