Skip to content

Commit c08b654

Browse files
authored
Merge pull request #1518 from NVIDIA/remove-ye-olde-submit-algorithm
Remove ye olde `submit` algorithm
2 parents 3720eec + f3fed52 commit c08b654

8 files changed

Lines changed: 155 additions & 288 deletions

File tree

include/nvexec/stream/start_detached.cuh

Lines changed: 82 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -18,43 +18,106 @@
1818

1919
#pragma once
2020

21+
#include "../../stdexec/execution.hpp"
22+
#include "../../exec/scope.hpp"
23+
2124
#include <exception>
25+
#include <memory>
2226

2327
#include "common.cuh"
24-
#include "submit.cuh"
2528

2629
namespace nvexec::_strm {
2730
namespace _start_detached {
28-
template <class Env>
29-
struct detached_receiver_t : stream_receiver_base {
30-
STDEXEC_ATTRIBUTE((no_unique_address)) Env env_;
31+
template <class SenderId, class EnvId>
32+
struct operation {
33+
using Sender = __cvref_t<SenderId>;
34+
using Env = __t<EnvId>;
3135

32-
template <class... _Args>
33-
void set_value(_Args&&...) noexcept {
36+
explicit operation(Sender&& sndr, Env env)
37+
: env_(static_cast<Env&&>(env))
38+
, op_state_(connect(static_cast<Sender&&>(sndr), receiver{{}, this})) {
3439
}
3540

36-
template <class _Error>
37-
[[noreturn]]
38-
void set_error(_Error&&) noexcept {
39-
std::terminate();
41+
// If the operation state was allocated with a user-provided allocator, then we must
42+
// use the allocator stored within the operation state to destroy the operation
43+
// state. This is a good time to use C++20's destroying delete operation.
44+
static void destroy_delete(operation* self) noexcept {
45+
if constexpr (__callable<get_allocator_t, Env>) {
46+
auto alloc = stdexec::get_allocator(self->env_);
47+
using Alloc = decltype(alloc);
48+
using OpAlloc = typename std::allocator_traits<Alloc>::template rebind_alloc<operation>;
49+
OpAlloc op_alloc{alloc};
50+
std::allocator_traits<OpAlloc>::destroy(op_alloc, self);
51+
std::allocator_traits<OpAlloc>::deallocate(op_alloc, self, 1);
52+
} else {
53+
delete self;
54+
}
4055
}
4156

42-
void set_stopped() noexcept {
43-
}
57+
// The start_detached receiver deletes the operation state.
58+
struct receiver : stream_receiver_base {
59+
using receiver_concept = receiver_t;
60+
using t = receiver;
61+
using id = receiver;
4462

45-
auto get_env() const noexcept -> const Env& {
46-
return env_;
47-
}
48-
};
63+
template <class... As>
64+
void set_value(As&&...) noexcept {
65+
operation::destroy_delete(op_); // NB: invalidates *this
66+
}
67+
68+
template <class Error>
69+
[[noreturn]]
70+
void set_error(Error&&) noexcept {
71+
// A detached operation failed. There is no place for the error to go.
72+
// This is unrecoverable, so we terminate.
73+
std::terminate();
74+
}
4975

76+
void set_stopped() noexcept {
77+
operation::destroy_delete(op_); // NB: invalidates *this
78+
}
79+
80+
auto get_env() const noexcept -> const Env& {
81+
return op_->env_;
82+
}
83+
84+
operation* op_;
85+
};
86+
87+
STDEXEC_ATTRIBUTE((no_unique_address)) Env env_;
88+
connect_result_t<Sender, receiver> op_state_;
89+
};
5090
} // namespace _start_detached
5191

5292
template <>
5393
struct apply_sender_for<start_detached_t> {
5494
template <class Sender, class Env = __root_env>
55-
void operator()(Sender&& sndr, Env&& env = {}) const {
56-
using _receiver_t = _start_detached::detached_receiver_t<__decay_t<Env>>;
57-
_submit::submit_t{}(static_cast<Sender&&>(sndr), _receiver_t{{}, static_cast<Env&&>(env)});
95+
void operator()(Sender&& sndr, Env&& env = {}) const noexcept(false) {
96+
using Op = _start_detached::operation<__cvref_id<Sender>, __id<__decay_t<Env>>>;
97+
// Use the provided allocator, if any, to allocate the operation state.
98+
if constexpr (__callable<get_allocator_t, Env>) {
99+
auto alloc = get_allocator(env);
100+
using Alloc = decltype(alloc);
101+
using OpAlloc = typename std::allocator_traits<Alloc>::template rebind_alloc<Op>;
102+
// We use the allocator to allocate the operation state and also to construct it.
103+
OpAlloc op_alloc{alloc};
104+
Op* op = std::allocator_traits<OpAlloc>::allocate(op_alloc, 1);
105+
exec::scope_guard g{[op, &op_alloc]() noexcept {
106+
std::allocator_traits<OpAlloc>::deallocate(op_alloc, op, 1);
107+
}};
108+
// This can potentially throw. If it does, the scope guard will deallocate the
109+
// storage automatically.
110+
std::allocator_traits<OpAlloc>::construct(
111+
op_alloc, op, static_cast<Sender&&>(sndr), static_cast<Env&&>(env));
112+
// The operation state is now constructed, dismiss the scope guard.
113+
g.dismiss();
114+
// This cannot throw:
115+
stdexec::start(op->op_state_);
116+
} else {
117+
// The caller did not provide an allocator, so we use the default allocator.
118+
Op* op = new Op{static_cast<Sender&&>(sndr), static_cast<Env&&>(env)};
119+
start(op->op_state_);
120+
}
58121
}
59122
};
60123
} // namespace nvexec::_strm

include/nvexec/stream/submit.cuh

Lines changed: 0 additions & 84 deletions
This file was deleted.

include/nvexec/stream_context.cuh

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
#include "stream/let_xxx.cuh" // IWYU pragma: export
2828
#include "stream/schedule_from.cuh" // IWYU pragma: export
2929
#include "stream/start_detached.cuh" // IWYU pragma: export
30-
#include "stream/submit.cuh" // IWYU pragma: export
3130
#include "stream/split.cuh" // IWYU pragma: export
3231
#include "stream/then.cuh" // IWYU pragma: export
3332
#include "stream/continues_on.cuh" // IWYU pragma: export

include/stdexec/__detail/__execute.hpp

Lines changed: 3 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -21,34 +21,14 @@
2121
#include "__receivers.hpp"
2222
#include "__senders.hpp"
2323
#include "__schedulers.hpp"
24-
#include "__submit.hpp"
24+
#include "__start_detached.hpp"
25+
#include "__then.hpp"
2526
#include "__transform_sender.hpp"
2627

27-
#include <exception>
28-
2928
namespace stdexec {
3029
/////////////////////////////////////////////////////////////////////////////
3130
// [execution.execute]
3231
namespace __execute_ {
33-
template <class _Fun>
34-
struct __as_receiver {
35-
using receiver_concept = receiver_t;
36-
_Fun __fun_;
37-
38-
void set_value() noexcept {
39-
// terminates on exception:
40-
__fun_();
41-
}
42-
43-
[[noreturn]]
44-
void set_error(std::exception_ptr) noexcept {
45-
std::terminate();
46-
}
47-
48-
void set_stopped() noexcept {
49-
}
50-
};
51-
5232
struct execute_t {
5333
template <scheduler _Scheduler, class _Fun>
5434
requires __callable<_Fun&> && move_constructible<_Fun>
@@ -61,7 +41,7 @@ namespace stdexec {
6141
template <sender_of<set_value_t()> _Sender, class _Fun>
6242
requires __callable<_Fun&> && move_constructible<_Fun>
6343
void apply_sender(_Sender&& __sndr, _Fun __fun) const noexcept(false) {
64-
__submit(static_cast<_Sender&&>(__sndr), __as_receiver<_Fun>{static_cast<_Fun&&>(__fun)});
44+
start_detached(then(static_cast<_Sender&&>(__sndr), static_cast<_Fun&&>(__fun)));
6545
}
6646
};
6747
} // namespace __execute_

include/stdexec/__detail/__receiver_ref.hpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
#include "__execution_fwd.hpp"
1919

2020
#include "__completion_signatures.hpp"
21-
#include "__env.hpp"
2221
#include "__receivers.hpp"
2322

2423
#include <functional>

0 commit comments

Comments
 (0)