Skip to content

Commit 815ee8a

Browse files
committed
merge the __submit implementation into start_detached for nvexec/
1 parent a4e0899 commit 815ee8a

3 files changed

Lines changed: 83 additions & 104 deletions

File tree

include/nvexec/stream/start_detached.cuh

Lines changed: 83 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -18,43 +18,107 @@
1818

1919
#pragma once
2020

21+
#include "../../stdexec/execution.hpp"
22+
#include "../../exec/scope.hpp"
23+
2124
#include <exception>
25+
#include <memory>
2226

2327
#include "common.cuh"
24-
#include "submit.cuh"
2528

2629
namespace nvexec::_strm {
2730
namespace _start_detached {
28-
template <class Env>
29-
struct detached_receiver_t : stream_receiver_base {
30-
STDEXEC_ATTRIBUTE((no_unique_address)) Env env_;
31+
template <class SenderId, class EnvId>
32+
struct operation {
33+
using Sender = __cvref_t<SenderId>;
34+
using Env = __t<EnvId>;
3135

32-
template <class... _Args>
33-
void set_value(_Args&&...) noexcept {
36+
explicit operation(Sender&& sndr, Env env)
37+
: env_(static_cast<Env&&>(env))
38+
, op_state_(connect(static_cast<Sender&&>(sndr), receiver{{}, this})) {
3439
}
3540

36-
template <class _Error>
37-
[[noreturn]]
38-
void set_error(_Error&&) noexcept {
39-
std::terminate();
41+
// If the operation state was allocated with a user-provided allocator, then we must
42+
// use the allocator stored within the operation state to destroy the operation
43+
// state. This is a good time to use C++20's destroying delete operation.
44+
static void operator delete(operation* self, std::destroying_delete_t) noexcept {
45+
if constexpr (__callable<get_allocator_t, Env>) {
46+
auto alloc = stdexec::get_allocator(self->env_);
47+
using Alloc = decltype(alloc);
48+
using OpAlloc = typename std::allocator_traits<Alloc>::template rebind_alloc<operation>;
49+
OpAlloc op_alloc{alloc};
50+
std::allocator_traits<OpAlloc>::destroy(op_alloc, self);
51+
std::allocator_traits<OpAlloc>::deallocate(op_alloc, self, 1);
52+
} else {
53+
std::destroy_at(self);
54+
::operator delete(self);
55+
}
4056
}
4157

42-
void set_stopped() noexcept {
43-
}
58+
// The start_detached receiver deletes the operation state.
59+
struct receiver : stream_receiver_base {
60+
using receiver_concept = receiver_t;
61+
using t = receiver;
62+
using id = receiver;
63+
operation* op_;
4464

45-
auto get_env() const noexcept -> const Env& {
46-
return env_;
47-
}
48-
};
65+
template <class... As>
66+
void set_value(As&&...) noexcept {
67+
delete op_; // NB: invalidates *this
68+
}
69+
70+
template <class Error>
71+
[[noreturn]]
72+
void set_error(Error&&) noexcept {
73+
// A detached operation failed. There is noplace for the error to go.
74+
// This is unrecoverable, so we terminate.
75+
std::terminate();
76+
}
4977

78+
void set_stopped() noexcept {
79+
delete op_; // NB: invalidates *this
80+
}
81+
82+
auto get_env() const noexcept -> const Env& {
83+
return op_->env_;
84+
}
85+
};
86+
87+
STDEXEC_ATTRIBUTE((no_unique_address)) Env env_;
88+
connect_result_t<Sender, receiver> op_state_;
89+
};
5090
} // namespace _start_detached
5191

5292
template <>
5393
struct apply_sender_for<start_detached_t> {
5494
template <class Sender, class Env = __root_env>
55-
void operator()(Sender&& sndr, Env&& env = {}) const {
56-
using _receiver_t = _start_detached::detached_receiver_t<__decay_t<Env>>;
57-
_submit::submit_t{}(static_cast<Sender&&>(sndr), _receiver_t{{}, static_cast<Env&&>(env)});
95+
void operator()(Sender&& sndr, Env&& env = {}) const noexcept(false) {
96+
using Op = _start_detached::operation<__cvref_id<Sender>, __id<__decay_t<Env>>>;
97+
// Use the provided allocator, if any, to allocate the operation state.
98+
if constexpr (__callable<get_allocator_t, Env>) {
99+
auto alloc = get_allocator(env);
100+
using Alloc = decltype(alloc);
101+
using OpAlloc = typename std::allocator_traits<Alloc>::template rebind_alloc<Op>;
102+
// We use the allocator to allocate the operation state and also to construct it.
103+
OpAlloc op_alloc{alloc};
104+
Op* op = std::allocator_traits<OpAlloc>::allocate(op_alloc, 1);
105+
exec::scope_guard g{[op, &op_alloc]() noexcept {
106+
std::allocator_traits<OpAlloc>::deallocate(op_alloc, op, 1);
107+
}};
108+
// This can potentially throw. If it does, the scope guard will deallocate the
109+
// storage automatically.
110+
std::allocator_traits<OpAlloc>::construct(
111+
op_alloc, op, static_cast<Sender&&>(sndr), static_cast<Env&&>(env));
112+
// The operation state is now constructed, dismiss the scope guard.
113+
g.dismiss();
114+
// This cannot throw:
115+
stdexec::start(op->op_state_);
116+
} else {
117+
// The caller did not provide an allocator, so we use the default allocator.
118+
auto* op = new _start_detached::operation<__id<Sender>, __id<__decay_t<Env>>>{
119+
static_cast<Sender&&>(sndr), static_cast<Env&&>(env)};
120+
start(op->op_state_);
121+
}
58122
}
59123
};
60124
} // namespace nvexec::_strm

include/nvexec/stream/submit.cuh

Lines changed: 0 additions & 84 deletions
This file was deleted.

include/nvexec/stream_context.cuh

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
#include "stream/let_xxx.cuh" // IWYU pragma: export
2828
#include "stream/schedule_from.cuh" // IWYU pragma: export
2929
#include "stream/start_detached.cuh" // IWYU pragma: export
30-
#include "stream/submit.cuh" // IWYU pragma: export
3130
#include "stream/split.cuh" // IWYU pragma: export
3231
#include "stream/then.cuh" // IWYU pragma: export
3332
#include "stream/continues_on.cuh" // IWYU pragma: export

0 commit comments

Comments
 (0)