Skip to content

Commit 34c5b62

Browse files
authored
Receiver lifetime (#241)
* added a sender to avoid environments getting stale too early * clang format * applied review feedback (partly from AI) * fixed a few issues - remove fatal error when homebrew isn't used - added a simple build procedure to the Makefile - added an example using suspend_never - added completion signatures to inline_scheduler - removed some gibberish from store_receiver.hpp * use modules in suspend_never example when enabled * formatting changes * another attempt at fixing suspend_never * disabling the suspend_never example for now * removing trailing spaces
1 parent cdcdf66 commit 34c5b62

13 files changed

Lines changed: 278 additions & 32 deletions

File tree

Makefile

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,23 @@ endif
107107
# TODO: beman.execution.examples.modules
108108
# FIXME: beman.execution.execution-module.test beman.execution.stop-token-module.test
109109

110-
default: module
110+
default: simple
111+
112+
SIMPLE_BUILD = build/simple-$(shell uname -s)
113+
114+
.PHONY: simple simple-configure simple-build simple-test
115+
116+
simple: simple-test
117+
118+
simple-config:
119+
cmake -G Ninja -S . -B $(SIMPLE_BUILD) -DBEMAN_USE_MODULES=OFF -DCXXFLAGS=
120+
121+
simple-build: simple-config
122+
CXXFLAGS= cmake --build $(SIMPLE_BUILD)
123+
124+
simple-test: simple-build
125+
ctest --test-dir $(SIMPLE_BUILD)
126+
111127

112128
all: $(SANITIZERS)
113129

cmake/prelude.cmake

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -123,9 +123,6 @@ if(
123123
)
124124
# gersemi: on
125125
else()
126-
message(
127-
FATAL_ERROR
128-
"File does NOT EXISTS! ${CMAKE_CXX_STDLIB_MODULES_JSON}"
129-
)
126+
message(STATUS "File does NOT EXISTS! ${CMAKE_CXX_STDLIB_MODULES_JSON}")
130127
endif()
131128
endif()

examples/CMakeLists.txt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ if(PROJECT_IS_TOP_LEVEL)
1717
enable_testing()
1818
endif()
1919

20+
set(TODO stop_token) #-dk:TODO including that causes a linker error
21+
set(TODO suspend_never) #-dk:TODO including that causes ASAN errors
22+
2023
set(EXAMPLES
2124
allocator
2225
doc-just
@@ -29,7 +32,6 @@ set(EXAMPLES
2932
playground
3033
sender-demo
3134
stackoverflow
32-
stop_token
3335
stopping
3436
when_all-cancel
3537
)

examples/suspend_never.cpp

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
#ifdef BEMAN_HAS_IMPORT_STD
2+
import std;
3+
#else
4+
#include <coroutine>
5+
#include <iostream>
6+
#include <new>
7+
#include <memory>
8+
#endif
9+
#include <beman/execution/execution.hpp>
10+
11+
namespace ex = beman::execution;
12+
13+
void* operator new(std::size_t n) {
14+
auto p = std::malloc(n);
15+
std::cout << "global new(" << n << ")->" << p << "\n";
16+
return p;
17+
}
18+
void operator delete(void* ptr) noexcept {
19+
std::cout << "global operator delete()" << ptr << "\n";
20+
std::free(ptr);
21+
}
22+
23+
int main() {
24+
struct resource : std::pmr::memory_resource {
25+
void* do_allocate(std::size_t n, std::size_t a) override {
26+
auto p{std::malloc(n)};
27+
std::cout << "resource::allocate(" << n << ", " << a << ") -> " << p << "\n";
28+
return p;
29+
}
30+
void do_deallocate(void* p, std::size_t n, std::size_t a) override {
31+
std::cout << "resource::deallocate(" << p << ", " << n << ", " << a << ")\n";
32+
std::free(p);
33+
}
34+
bool do_is_equal(const std::pmr::memory_resource& other) const noexcept override { return this == &other; }
35+
} res{};
36+
37+
ex::sync_wait(ex::write_env(
38+
std::suspend_never(), ex::env{ex::prop{ex::get_allocator, std::pmr::polymorphic_allocator<std::byte>(&res)}}));
39+
}

include/beman/execution/detail/affine_on.hpp

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ import beman.execution.detail.get_completion_signatures;
2424
import beman.execution.detail.get_domain_early;
2525
import beman.execution.detail.get_scheduler;
2626
import beman.execution.detail.get_stop_token;
27-
import beman.execution.detail.join_env;
2827
import beman.execution.detail.make_sender;
2928
import beman.execution.detail.never_stop_token;
3029
import beman.execution.detail.nested_sender_has_affine_on;
@@ -37,8 +36,10 @@ import beman.execution.detail.sender_adaptor_closure;
3736
import beman.execution.detail.sender_for;
3837
import beman.execution.detail.sender_has_affine_on;
3938
import beman.execution.detail.set_value;
39+
import beman.execution.detail.store_receiver;
4040
import beman.execution.detail.tag_of_t;
4141
import beman.execution.detail.transform_sender;
42+
import beman.execution.detail.unstoppable;
4243
import beman.execution.detail.write_env;
4344
#else
4445
#include <beman/execution/detail/env.hpp>
@@ -47,7 +48,6 @@ import beman.execution.detail.write_env;
4748
#include <beman/execution/detail/get_domain_early.hpp>
4849
#include <beman/execution/detail/get_scheduler.hpp>
4950
#include <beman/execution/detail/get_stop_token.hpp>
50-
#include <beman/execution/detail/join_env.hpp>
5151
#include <beman/execution/detail/make_sender.hpp>
5252
#include <beman/execution/detail/never_stop_token.hpp>
5353
#include <beman/execution/detail/prop.hpp>
@@ -57,8 +57,11 @@ import beman.execution.detail.write_env;
5757
#include <beman/execution/detail/sender_adaptor_closure.hpp>
5858
#include <beman/execution/detail/sender_for.hpp>
5959
#include <beman/execution/detail/sender_has_affine_on.hpp>
60+
#include <beman/execution/detail/set_value.hpp>
61+
#include <beman/execution/detail/store_receiver.hpp>
6062
#include <beman/execution/detail/tag_of_t.hpp>
6163
#include <beman/execution/detail/transform_sender.hpp>
64+
#include <beman/execution/detail/unstoppable.hpp>
6265
#include <beman/execution/detail/write_env.hpp>
6366
#endif
6467

@@ -136,13 +139,13 @@ struct affine_on_t : ::beman::execution::sender_adaptor_closure<affine_on_t> {
136139
static auto transform_sender(Sender&& sender, const Env& ev) {
137140
static_assert(requires {
138141
{
139-
::beman::execution::get_completion_signatures<
140-
decltype(::beman::execution::schedule(::beman::execution::get_scheduler(ev))),
141-
decltype(::beman::execution::detail::join_env(
142-
::beman::execution::env{::beman::execution::prop{
143-
::beman::execution::get_stop_token, ::beman::execution::never_stop_token{}, {}}},
144-
ev))>()
145-
} -> ::std::same_as<::beman::execution::completion_signatures<::beman::execution::set_value_t()>>;
142+
::beman::execution::get_completion_signatures<decltype(::beman::execution::unstoppable(
143+
::beman::execution::schedule(
144+
::beman::execution::get_scheduler(ev))),
145+
ev)>()
146+
} //-dk:TODO ->
147+
//::std::same_as<::beman::execution::completion_signatures<::beman::execution::set_value_t()>>
148+
;
146149
});
147150
//[[maybe_unused]] auto& [tag, data, child] = sender;
148151
auto& child = sender.template get<2>();
@@ -152,11 +155,13 @@ struct affine_on_t : ::beman::execution::sender_adaptor_closure<affine_on_t> {
152155
constexpr child_tag_t t{};
153156
return t.affine_on(::beman::execution::detail::forward_like<Sender>(child), ev);
154157
} else {
155-
return ::beman::execution::write_env(
156-
::beman::execution::schedule_from(
157-
::beman::execution::get_scheduler(ev),
158-
::beman::execution::write_env(::beman::execution::detail::forward_like<Sender>(child), ev)),
159-
beman::execution::detail::affine_on_env(ev));
158+
return ::beman::execution::detail::store_receiver(
159+
::beman::execution::detail::forward_like<Sender>(child),
160+
[]<typename Child>(Child&& child, const auto& ev) {
161+
return ::beman::execution::unstoppable(::beman::execution::schedule_from(
162+
::beman::execution::get_scheduler(ev),
163+
::beman::execution::write_env(::std::forward<Child>(child), ev)));
164+
});
160165
}
161166
}
162167
template <typename, typename...>

include/beman/execution/detail/inline_scheduler.hpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,10 @@ struct inline_scheduler {
5353
struct sender {
5454
using sender_concept = ::beman::execution::sender_t;
5555
using completion_signatures = ::beman::execution::completion_signatures<::beman::execution::set_value_t()>;
56+
template <typename...>
57+
static consteval auto get_completion_signatures() noexcept -> completion_signatures {
58+
return {};
59+
}
5660

5761
static constexpr auto get_env() noexcept -> env { return {}; }
5862

include/beman/execution/detail/sender_awaitable.hpp

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import std;
1212
#include <concepts>
1313
#include <coroutine>
1414
#include <exception>
15-
#include <thread>
1615
#include <tuple>
1716
#include <type_traits>
1817
#include <utility>
@@ -49,20 +48,19 @@ import beman.execution.detail.unspecified_promise;
4948
namespace beman::execution::detail {
5049
template <class Sndr, class Promise>
5150
class sender_awaitable {
51+
inline static constexpr bool enable_defence{true};
5252
struct unit {};
5353
using value_type =
5454
::beman::execution::detail::single_sender_value_type<Sndr, ::beman::execution::env_of_t<Promise>>;
5555
using result_type = ::std::conditional_t<::std::is_void_v<value_type>, unit, value_type>;
5656
using variant_type = ::std::variant<::std::monostate, result_type, ::std::exception_ptr>;
57-
using data_type = ::std::tuple<variant_type, ::std::atomic<::std::thread::id>, ::std::coroutine_handle<Promise>>;
57+
using data_type = ::std::tuple<variant_type, ::std::atomic<bool>, ::std::coroutine_handle<Promise>>;
5858

5959
struct awaitable_receiver {
6060
using receiver_concept = ::beman::execution::receiver_t;
6161

6262
void resume() {
63-
std::thread::id id(::std::this_thread::get_id());
64-
if (not ::std::get<1>(*result_ptr_)
65-
.compare_exchange_strong(id, ::std::thread::id{}, std::memory_order_acq_rel)) {
63+
if (not enable_defence || ::std::get<1>(*result_ptr_).exchange(true, std::memory_order_acq_rel)) {
6664
::std::get<2>(*result_ptr_).resume();
6765
}
6866
}
@@ -85,9 +83,7 @@ class sender_awaitable {
8583
}
8684

8785
void set_stopped() && noexcept {
88-
std::thread::id id(::std::this_thread::get_id());
89-
if (not ::std::get<1>(*result_ptr_)
90-
.compare_exchange_strong(id, ::std::thread::id{}, ::std::memory_order_acq_rel)) {
86+
if (not enable_defence || ::std::get<1>(*result_ptr_).exchange(true, ::std::memory_order_acq_rel)) {
9187
static_cast<::std::coroutine_handle<>>(::std::get<2>(*result_ptr_).promise().unhandled_stopped())
9288
.resume();
9389
}
@@ -107,16 +103,14 @@ class sender_awaitable {
107103

108104
public:
109105
sender_awaitable(Sndr&& sndr, Promise& p)
110-
: result{::std::monostate{}, ::std::this_thread::get_id(), ::std::coroutine_handle<Promise>::from_promise(p)},
106+
: result{::std::monostate{}, false, ::std::coroutine_handle<Promise>::from_promise(p)},
111107
state{::beman::execution::connect(::std::forward<Sndr>(sndr),
112108
sender_awaitable::awaitable_receiver{::std::addressof(result)})} {}
113109

114110
static constexpr bool await_ready() noexcept { return false; }
115111
::std::coroutine_handle<> await_suspend(::std::coroutine_handle<Promise> handle) noexcept {
116112
::beman::execution::start(state);
117-
::std::thread::id id(::std::this_thread::get_id());
118-
if (not ::std::get<1>(this->result)
119-
.compare_exchange_strong(id, ::std::thread::id{}, ::std::memory_order_acq_rel)) {
113+
if (enable_defence && ::std::get<1>(this->result).exchange(true, std::memory_order_acq_rel)) {
120114
if (::std::holds_alternative<::std::monostate>(::std::get<0>(this->result))) {
121115
return ::std::get<2>(this->result).promise().unhandled_stopped();
122116
}
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
// include/beman/execution/detail/store_receiver.hpp -*-C++-*-
2+
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
3+
4+
#ifndef INCLUDED_INCLUDE_BEMAN_EXECUTION_DETAIL_STORE_RECEIVER
5+
#define INCLUDED_INCLUDE_BEMAN_EXECUTION_DETAIL_STORE_RECEIVER
6+
7+
#include <beman/execution/detail/common.hpp>
8+
#ifdef BEMAN_HAS_IMPORT_STD
9+
import std;
10+
#else
11+
#include <memory>
12+
#include <utility>
13+
#include <type_traits>
14+
#endif
15+
#ifdef BEMAN_HAS_MODULES
16+
import beman.execution.detail.connect;
17+
import beman.execution.detail.connect_result_t;
18+
import beman.execution.detail.env_of_t;
19+
import beman.execution.detail.get_completion_signatures;
20+
import beman.execution.detail.get_env;
21+
import beman.execution.detail.operation_state;
22+
import beman.execution.detail.receiver;
23+
import beman.execution.detail.sender;
24+
import beman.execution.detail.set_error;
25+
import beman.execution.detail.set_stopped;
26+
import beman.execution.detail.set_value;
27+
import beman.execution.detail.start;
28+
#else
29+
#include <beman/execution/detail/connect.hpp>
30+
#include <beman/execution/detail/connect_result_t.hpp>
31+
#include <beman/execution/detail/env_of_t.hpp>
32+
#include <beman/execution/detail/get_completion_signatures.hpp>
33+
#include <beman/execution/detail/get_env.hpp>
34+
#include <beman/execution/detail/sender.hpp>
35+
#include <beman/execution/detail/receiver.hpp>
36+
#include <beman/execution/detail/set_error.hpp>
37+
#include <beman/execution/detail/set_stopped.hpp>
38+
#include <beman/execution/detail/set_value.hpp>
39+
#include <beman/execution/detail/operation_state.hpp>
40+
#include <beman/execution/detail/start.hpp>
41+
#endif
42+
43+
// ----------------------------------------------------------------------------
44+
45+
namespace beman::execution::detail {
46+
struct store_receiver_t {
47+
template <::beman::execution::receiver Rcvr>
48+
struct receiver {
49+
using receiver_concept = ::beman::execution::receiver_t;
50+
Rcvr* rcvr;
51+
template <typename... Args>
52+
auto set_value(Args&&... args) && noexcept -> void {
53+
::beman::execution::set_value(::std::move(*this->rcvr), ::std::forward<Args>(args)...);
54+
}
55+
template <typename Error>
56+
auto set_error(Error&& error) && noexcept -> void {
57+
::beman::execution::set_error(::std::move(*this->rcvr), ::std::forward<Error>(error));
58+
}
59+
auto set_stopped() && noexcept -> void { ::beman::execution::set_stopped(::std::move(*this->rcvr)); }
60+
auto get_env() const noexcept { return ::beman::execution::get_env(*this->rcvr); }
61+
};
62+
template <::beman::execution::sender Sndr, typename Trans, ::beman::execution::receiver Rcvr>
63+
struct state {
64+
using operation_state_concept = ::beman::execution::operation_state_t;
65+
using env_t = ::beman::execution::env_of_t<Rcvr>;
66+
using state_t = ::beman::execution::connect_result_t<decltype(::std::declval<Trans>()(
67+
::std::declval<Sndr>(), ::std::declval<env_t>())),
68+
receiver<Rcvr>>;
69+
Rcvr rcvr;
70+
state_t op_state;
71+
template <::beman::execution::sender S, typename T, ::beman::execution::receiver R>
72+
state(S&& sndr, T&& trans, R&& r)
73+
: rcvr(::std::forward<R>(r)),
74+
op_state(::beman::execution::connect(
75+
::std::forward<T>(trans)(::std::forward<S>(sndr), ::beman::execution::get_env(this->rcvr)),
76+
receiver<Rcvr>{::std::addressof(this->rcvr)})) {}
77+
auto start() & noexcept { ::beman::execution::start(this->op_state); }
78+
};
79+
template <::beman::execution::sender Sndr, typename Trans>
80+
struct sender {
81+
using sender_concept = ::beman::execution::sender_t;
82+
template <typename... Env>
83+
static consteval auto get_completion_signatures(Env&&... env) noexcept {
84+
return ::beman::execution::
85+
get_completion_signatures<decltype(::std::declval<Trans>()(::std::declval<Sndr>())), Env...>();
86+
}
87+
::std::remove_cvref_t<Sndr> sndr;
88+
::std::remove_cvref_t<Trans> trans;
89+
90+
template <::beman::execution::receiver Receiver>
91+
auto connect(Receiver&& r) && {
92+
static_assert(::beman::execution::operation_state<state<Sndr, Trans, ::std::remove_cvref_t<Receiver>>>);
93+
return state<Sndr, Trans, ::std::remove_cvref_t<Receiver>>(
94+
::std::move(this->sndr), ::std::move(this->trans), ::std::forward<Receiver>(r));
95+
}
96+
template <::beman::execution::receiver Receiver>
97+
auto connect(Receiver&& r) const& {
98+
static_assert(::beman::execution::operation_state<state<Sndr, Trans, ::std::remove_cvref_t<Receiver>>>);
99+
return state<Sndr, Trans, ::std::remove_cvref_t<Receiver>>(
100+
this->sndr, this->trans, ::std::forward<Receiver>(r));
101+
}
102+
};
103+
template <::beman::execution::sender Sndr, typename Trans>
104+
auto operator()(Sndr&& sndr, Trans&& trans) const {
105+
static_assert(::beman::execution::sender<sender<Sndr, Trans>>);
106+
return sender<Sndr, Trans>{::std::forward<Sndr>(sndr), ::std::forward<Trans>(trans)};
107+
}
108+
};
109+
110+
inline constexpr store_receiver_t store_receiver{};
111+
} // namespace beman::execution::detail
112+
113+
// ----------------------------------------------------------------------------
114+
115+
#endif

0 commit comments

Comments
 (0)