Skip to content

Commit 1852b43

Browse files
committed
implement proposed resolution of P3718
1 parent 47185ef commit 1852b43

45 files changed

Lines changed: 664 additions & 531 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.clang-format

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ LambdaBodyIndentation: Signature
8484
LineEnding: LF
8585
Macros: [
8686
'STDEXEC_MEMFN_DECL(...)=__VA_ARGS__',
87-
'STDEXEC_ATTRIBUTE(X)=[[]]',
87+
'STDEXEC_ATTRIBUTE(...)=',
8888
'STDEXEC_IMMOVABLE_NO_UNIQUE_ADDRESS=[[no_unique_address]]',
8989
'STDEXEC_MISSING_MEMBER(X,Y)=true',
9090
'STDEXEC_DEFINE_MEMBER(X)=void foo() {}',

include/exec/fork.hpp

Lines changed: 41 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -30,20 +30,23 @@ namespace exec {
3030
struct _dematerialize_fn {
3131
struct _impl_fn {
3232
template <class Rcvr, class Tag, class... Args>
33-
STDEXEC_ATTRIBUTE((always_inline, host, device)) void operator()(Rcvr& rcvr, Tag, const Args&... args) const noexcept {
33+
STDEXEC_ATTRIBUTE((always_inline, host, device))
34+
void operator()(Rcvr& rcvr, Tag, const Args&... args) const noexcept {
3435
Tag{}(static_cast<Rcvr&&>(rcvr), args...);
3536
}
3637
};
3738

3839
template <class Rcvr, class Tuple>
39-
STDEXEC_ATTRIBUTE((always_inline, host, device)) void operator()(Rcvr& rcvr, const Tuple& tupl) const noexcept {
40+
STDEXEC_ATTRIBUTE((always_inline, host, device))
41+
void operator()(Rcvr& rcvr, const Tuple& tupl) const noexcept {
4042
tupl.apply(_impl_fn{}, tupl, rcvr);
4143
}
4244
};
4345

4446
struct _mk_when_all_fn {
4547
template <class CacheSndr, class... Closures>
46-
STDEXEC_ATTRIBUTE((always_inline, host, device)) auto operator()(CacheSndr sndr, Closures&&... closures) const {
48+
STDEXEC_ATTRIBUTE((always_inline, host, device))
49+
auto operator()(CacheSndr sndr, Closures&&... closures) const {
4750
return stdexec::when_all(static_cast<Closures&&>(closures)(sndr)...);
4851
}
4952
};
@@ -61,7 +64,8 @@ namespace exec {
6164

6265
template <class Domain>
6366
struct _env_t {
64-
STDEXEC_ATTRIBUTE((always_inline, host, device)) static constexpr auto query(stdexec::get_domain_t) noexcept -> Domain {
67+
STDEXEC_ATTRIBUTE((always_inline, host, device))
68+
static constexpr auto query(stdexec::get_domain_t) noexcept -> Domain {
6569
return {};
6670
}
6771
};
@@ -93,12 +97,14 @@ namespace exec {
9397
};
9498

9599
template <class _Self, class... _Env>
96-
STDEXEC_ATTRIBUTE((host, device)) static auto get_completion_signatures(_Self&&, _Env&&...) noexcept {
100+
STDEXEC_ATTRIBUTE((host, device))
101+
static auto get_completion_signatures(_Self&&, _Env&&...) noexcept {
97102
return stdexec::__mapply<stdexec::__qq<_cache_sndr_completions_t>, Variant>{};
98103
}
99104

100105
template <class Rcvr>
101-
STDEXEC_ATTRIBUTE((host, device)) auto connect(Rcvr rcvr) const -> _opstate_t<Rcvr> {
106+
STDEXEC_ATTRIBUTE((host, device))
107+
auto connect(Rcvr rcvr) const -> _opstate_t<Rcvr> {
102108
return _opstate_t<Rcvr>{static_cast<Rcvr&&>(rcvr), _results_};
103109
}
104110

@@ -117,7 +123,7 @@ namespace exec {
117123
struct _opstate_t {
118124
using operation_state_concept = stdexec::operation_state_t;
119125
using _env_t = stdexec::__call_result_t<stdexec::__env::__fwd_fn, stdexec::env_of_t<Rcvr>>;
120-
using _child_completions_t = stdexec::completion_signatures_of_t<Sndr, _env_t>;
126+
using _child_completions_t = stdexec::__completion_signatures_of_t<Sndr, _env_t>;
121127
using _domain_t = stdexec::__early_domain_of_t<Sndr, stdexec::__none_such>;
122128
using _when_all_sndr_t = fork_t::_when_all_sndr_t<_child_completions_t, Closures, _domain_t>;
123129
using _child_opstate_t =
@@ -126,7 +132,8 @@ namespace exec {
126132
stdexec::connect_result_t<_when_all_sndr_t, stdexec::__rcvr_ref_t<Rcvr>>;
127133
using _cache_sndr_t = fork_t::_cache_sndr_t<_variant_t<_child_completions_t>, _domain_t>;
128134

129-
STDEXEC_ATTRIBUTE((host, device)) explicit _opstate_t(Sndr&& sndr, Closures&& closures, Rcvr rcvr) noexcept
135+
STDEXEC_ATTRIBUTE((host, device))
136+
explicit _opstate_t(Sndr&& sndr, Closures&& closures, Rcvr rcvr) noexcept
130137
: _rcvr_(static_cast<Rcvr&&>(rcvr))
131138
, _fork_opstate_(
132139
stdexec::connect(
@@ -153,7 +160,8 @@ namespace exec {
153160
}
154161

155162
template <class Tag, class... Args>
156-
STDEXEC_ATTRIBUTE((host, device)) void _complete(Tag, Args&&... args) noexcept {
163+
STDEXEC_ATTRIBUTE((host, device))
164+
void _complete(Tag, Args&&... args) noexcept {
157165
try {
158166
using _tuple_t = stdexec::__decayed_tuple<Tag, Args...>;
159167
_cache_.template emplace<_tuple_t>(Tag{}, static_cast<Args&&>(args)...);
@@ -169,20 +177,23 @@ namespace exec {
169177
}
170178

171179
template <class... Values>
172-
STDEXEC_ATTRIBUTE((always_inline, host, device)) void set_value(Values&&... values) noexcept {
180+
STDEXEC_ATTRIBUTE((always_inline, host, device))
181+
void set_value(Values&&... values) noexcept {
173182
this->_complete(stdexec::set_value, static_cast<Values&&>(values)...);
174183
}
175184

176185
template <class Error>
177-
STDEXEC_ATTRIBUTE((always_inline, host, device)) void set_error(Error&& err) noexcept {
186+
STDEXEC_ATTRIBUTE((always_inline, host, device))
187+
void set_error(Error&& err) noexcept {
178188
this->_complete(stdexec::set_error, static_cast<Error&&>(err));
179189
}
180190

181191
STDEXEC_ATTRIBUTE((always_inline, host, device)) void set_stopped() noexcept {
182192
this->_complete(stdexec::set_stopped);
183193
}
184194

185-
STDEXEC_ATTRIBUTE((nodiscard, host, device)) constexpr auto get_env() const noexcept //
195+
STDEXEC_ATTRIBUTE((nodiscard, host, device))
196+
constexpr auto get_env() const noexcept //
186197
-> stdexec::__fwd_env_t<stdexec::env_of_t<Rcvr>> {
187198
return stdexec::__env::__fwd_fn{}(stdexec::get_env(_rcvr_));
188199
}
@@ -198,7 +209,8 @@ namespace exec {
198209
using _closures_t = stdexec::__tuple_for<Closures...>;
199210

200211
template <class Sndr>
201-
STDEXEC_ATTRIBUTE((host, device)) friend constexpr auto operator|(Sndr sndr, _closure_t self) noexcept //
212+
STDEXEC_ATTRIBUTE((host, device))
213+
friend constexpr auto operator|(Sndr sndr, _closure_t self) noexcept //
202214
-> _sndr_t<Sndr, Closures...> {
203215
return _sndr_t<Sndr, Closures...>{
204216
{}, static_cast<_closures_t&&>(self._closures_), static_cast<Sndr&&>(sndr)};
@@ -209,13 +221,15 @@ namespace exec {
209221

210222
template <class Sndr, class... Closures>
211223
requires stdexec::sender<Sndr>
212-
STDEXEC_ATTRIBUTE((host, device)) auto operator()(Sndr sndr, Closures... closures) const -> _sndr_t<Sndr, Closures...> {
224+
STDEXEC_ATTRIBUTE((host, device))
225+
auto operator()(Sndr sndr, Closures... closures) const -> _sndr_t<Sndr, Closures...> {
213226
return {{}, {static_cast<Closures&&>(closures)...}, static_cast<Sndr&&>(sndr)};
214227
}
215228

216229
template <class... Closures>
217230
requires((!stdexec::sender<Closures>) && ...)
218-
STDEXEC_ATTRIBUTE((host, device)) auto operator()(Closures... closures) const -> _closure_t<Closures...> {
231+
STDEXEC_ATTRIBUTE((host, device))
232+
auto operator()(Closures... closures) const -> _closure_t<Closures...> {
219233
return {{static_cast<Closures&&>(closures)...}};
220234
}
221235
};
@@ -229,14 +243,17 @@ namespace exec {
229243
using _closures_t = stdexec::__tuple_for<Closures...>;
230244

231245
template <class Self, class... Env>
232-
STDEXEC_ATTRIBUTE((host, device)) static auto get_completion_signatures(Self&&, Env&&...) noexcept {
246+
STDEXEC_ATTRIBUTE((host, device))
247+
static auto get_completion_signatures(Self&&, Env&&...) noexcept {
233248
using namespace stdexec;
234249
using _domain_t = __early_domain_of_t<Sndr, __none_such>;
235250
using _child_t = __copy_cvref_t<Self, Sndr>;
236-
using _child_completions_t = completion_signatures_of_t<_child_t, __fwd_env_t<Env>...>;
251+
using _child_completions_t = __completion_signatures_of_t<_child_t, __fwd_env_t<Env>...>;
237252
using __decay_copyable_results_t = stdexec::__decay_copyable_results_t<_child_completions_t>;
238253

239-
if constexpr (!__decay_copyable_results_t::value) {
254+
if constexpr (!stdexec::__valid_completion_signatures<_child_completions_t>) {
255+
return _child_completions_t{};
256+
} else if constexpr (!__decay_copyable_results_t::value) {
240257
return _ERROR_<
241258
_WHAT_<>(PREDECESSOR_RESULTS_ARE_NOT_DECAY_COPYABLE),
242259
_IN_ALGORITHM_(exec::fork_t)>();
@@ -247,20 +264,23 @@ namespace exec {
247264
}
248265

249266
template <class Rcvr>
250-
STDEXEC_ATTRIBUTE((host, device)) auto connect(Rcvr rcvr) && -> _opstate_t<Sndr, _closures_t, Rcvr> {
267+
STDEXEC_ATTRIBUTE((host, device))
268+
auto connect(Rcvr rcvr) && -> _opstate_t<Sndr, _closures_t, Rcvr> {
251269
return _opstate_t<Sndr, _closures_t, Rcvr>{
252270
static_cast<Sndr&&>(sndr_),
253271
static_cast<_closures_t&&>(_closures_),
254272
static_cast<Rcvr&&>(rcvr)};
255273
}
256274

257275
template <class Rcvr>
258-
STDEXEC_ATTRIBUTE((host, device)) auto connect(Rcvr rcvr) const & -> _opstate_t<Sndr const &, _closures_t const &, Rcvr> {
276+
STDEXEC_ATTRIBUTE((host, device))
277+
auto connect(Rcvr rcvr) const & -> _opstate_t<Sndr const &, _closures_t const &, Rcvr> {
259278
return _opstate_t<Sndr const &, _closures_t const &, Rcvr>{
260279
sndr_, _closures_, static_cast<Rcvr&&>(rcvr)};
261280
}
262281

263-
STDEXEC_ATTRIBUTE((host, device)) constexpr auto get_env() const noexcept -> stdexec::__fwd_env_t<stdexec::env_of_t<Sndr>> {
282+
STDEXEC_ATTRIBUTE((host, device))
283+
constexpr auto get_env() const noexcept -> stdexec::__fwd_env_t<stdexec::env_of_t<Sndr>> {
264284
return stdexec::__env::__fwd_fn{}(stdexec::get_env(sndr_));
265285
}
266286

include/nvexec/nvtx.cuh

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -121,8 +121,8 @@ namespace nvexec {
121121
return {};
122122
}
123123

124-
auto get_env() const noexcept -> env_of_t<const Sender&> {
125-
return stdexec::get_env(sndr_);
124+
auto get_env() const noexcept -> stream_sender_attrs<Sender> {
125+
return {&sndr_};
126126
}
127127
};
128128
};
@@ -138,7 +138,8 @@ namespace nvexec {
138138
return nvtx_sender_th<kind::push, Sender>{{}, static_cast<Sender&&>(sndr), std::move(name)};
139139
}
140140

141-
STDEXEC_ATTRIBUTE((always_inline)) auto operator()(std::string name) const -> stdexec::__binder_back<push_t, std::string> {
141+
STDEXEC_ATTRIBUTE((always_inline))
142+
auto operator()(std::string name) const -> stdexec::__binder_back<push_t, std::string> {
142143
return {{std::move(name)}, {}, {}};
143144
}
144145
};
@@ -149,7 +150,8 @@ namespace nvexec {
149150
return nvtx_sender_th<kind::pop, Sender>{{}, static_cast<Sender&&>(sndr), {}};
150151
}
151152

152-
STDEXEC_ATTRIBUTE((always_inline)) auto operator()() const noexcept -> stdexec::__binder_back<pop_t> {
153+
STDEXEC_ATTRIBUTE((always_inline))
154+
auto operator()() const noexcept -> stdexec::__binder_back<pop_t> {
153155
return {{}, {}, {}};
154156
}
155157
};
@@ -164,7 +166,8 @@ namespace nvexec {
164166
}
165167

166168
template <stdexec::__sender_adaptor_closure Closure>
167-
STDEXEC_ATTRIBUTE((always_inline)) auto operator()(std::string name, Closure closure) const
169+
STDEXEC_ATTRIBUTE((always_inline))
170+
auto operator()(std::string name, Closure closure) const
168171
-> stdexec::__binder_back<scoped_t, std::string, Closure> {
169172
return {
170173
{std::move(name), static_cast<Closure&&>(closure)},

include/nvexec/stream/bulk.cuh

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -147,8 +147,8 @@ namespace nvexec::_strm {
147147
return {};
148148
}
149149

150-
auto get_env() const noexcept -> env_of_t<const Sender&> {
151-
return stdexec::get_env(sndr_);
150+
auto get_env() const noexcept -> stream_sender_attrs<Sender> {
151+
return {&sndr_};
152152
}
153153
};
154154
};
@@ -383,8 +383,8 @@ namespace nvexec::_strm {
383383
return {};
384384
}
385385

386-
auto get_env() const noexcept -> env_of_t<const Sender&> {
387-
return stdexec::get_env(sndr_);
386+
auto get_env() const noexcept -> stream_sender_attrs<Sender> {
387+
return {&sndr_};
388388
}
389389
};
390390
};

include/nvexec/stream/common.cuh

Lines changed: 60 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -68,12 +68,39 @@ namespace nvexec {
6868
inline STDEXEC_ATTRIBUTE((host, device)) auto is_on_gpu() noexcept -> bool {
6969
return get_device_type() == device_type::device;
7070
}
71+
72+
namespace _strm {
73+
// Used by stream_domain to late-customize senders for execution
74+
// on the stream_scheduler.
75+
template <class Tag, class... Env>
76+
struct transform_sender_for;
77+
78+
template <class Tag>
79+
struct apply_sender_for;
80+
} // namespace _strm
7181
} // namespace nvexec
7282

7383
namespace nvexec {
7484
struct stream_context;
7585

76-
struct stream_domain;
86+
// The stream_domain is how the stream scheduler customizes the sender algorithms. All of the
87+
// algorithms use the current scheduler's domain to transform senders before starting them.
88+
struct stream_domain : stdexec::default_domain {
89+
template <stdexec::sender_expr Sender, class Tag = stdexec::tag_of_t<Sender>, class... Env>
90+
requires stdexec::
91+
__callable<stdexec::__sexpr_apply_t, Sender, _strm::transform_sender_for<Tag, Env...>>
92+
static auto transform_sender(Sender&& sndr, const Env&... env) {
93+
return stdexec::__sexpr_apply(
94+
static_cast<Sender&&>(sndr), _strm::transform_sender_for<Tag, Env...>{env...});
95+
}
96+
97+
template <class Tag, stdexec::sender Sender, class... Args>
98+
requires stdexec::__callable<_strm::apply_sender_for<Tag>, Sender, Args...>
99+
static auto apply_sender(Tag, Sender&& sndr, Args&&... args) {
100+
return _strm::apply_sender_for<Tag>{}(
101+
static_cast<Sender&&>(sndr), static_cast<Args&&>(args)...);
102+
}
103+
};
77104

78105
namespace _strm {
79106

@@ -86,14 +113,6 @@ namespace nvexec {
86113
((STDEXEC_IS_TRIVIALLY_COPYABLE(Ts) || std::is_reference_v<Ts>) && ...);
87114
#endif
88115

89-
// Used by stream_domain to late-customize senders for execution
90-
// on the stream_scheduler.
91-
template <class Tag, class... Env>
92-
struct transform_sender_for;
93-
94-
template <class Tag>
95-
struct apply_sender_for;
96-
97116
inline auto get_stream_priority(stream_priority priority) -> std::pair<int, cudaError_t> {
98117
int least{};
99118
int greatest{};
@@ -292,14 +311,16 @@ namespace nvexec {
292311
return tag_invoke(get_stream_provider_t{}, env);
293312
}
294313

295-
STDEXEC_ATTRIBUTE((host, device)) static constexpr auto query(stdexec::forwarding_query_t) noexcept -> bool {
314+
STDEXEC_ATTRIBUTE((host, device))
315+
static constexpr auto query(stdexec::forwarding_query_t) noexcept -> bool {
296316
return true;
297317
}
298318
};
299319

300320
struct set_noop {
301321
template <class... Ts>
302-
STDEXEC_ATTRIBUTE((host, device)) void operator()(Ts&&...) const noexcept {
322+
STDEXEC_ATTRIBUTE((host, device))
323+
void operator()(Ts&&...) const noexcept {
303324
// TODO TRAP
304325
std::printf("ERROR: use of empty variant.");
305326
}
@@ -331,11 +352,34 @@ namespace nvexec {
331352
return stdexec::read_env(*this);
332353
}
333354

334-
STDEXEC_ATTRIBUTE((host, device)) static constexpr auto query(stdexec::forwarding_query_t) noexcept -> bool {
355+
STDEXEC_ATTRIBUTE((host, device))
356+
static constexpr auto query(stdexec::forwarding_query_t) noexcept -> bool {
335357
return true;
336358
}
337359
};
338360

361+
template <class Sender>
362+
struct stream_sender_attrs {
363+
using __t = stream_sender_attrs;
364+
using __id = stream_sender_attrs;
365+
366+
STDEXEC_ATTRIBUTE((nodiscard))
367+
constexpr auto query(get_domain_late_t) const noexcept -> stream_domain {
368+
return {};
369+
}
370+
371+
template <__forwarding_query Query>
372+
requires __env::__queryable<env_of_t<Sender>, Query>
373+
STDEXEC_ATTRIBUTE((nodiscard))
374+
constexpr auto query(Query) const //
375+
noexcept(__env::__nothrow_queryable<env_of_t<Sender>, Query>)
376+
-> __env::__query_result_t<env_of_t<Sender>, Query> {
377+
return stdexec::get_env(*child_).query(Query{});
378+
}
379+
380+
const Sender* child_{};
381+
};
382+
339383
template <class BaseEnv>
340384
auto make_stream_env(BaseEnv&& base_env, stream_provider_t* stream_provider) noexcept {
341385
return __env::__join(
@@ -398,7 +442,8 @@ namespace nvexec {
398442
using __id = stream_enqueue_receiver;
399443

400444
template <class... As>
401-
STDEXEC_ATTRIBUTE((host, device)) void set_value(As&&... as) noexcept {
445+
STDEXEC_ATTRIBUTE((host, device))
446+
void set_value(As&&... as) noexcept {
402447
variant_->template emplace<decayed_tuple<set_value_t, As...>>(
403448
set_value_t(), static_cast<As&&>(as)...);
404449
producer_(task_);
@@ -410,7 +455,8 @@ namespace nvexec {
410455
}
411456

412457
template <class Error>
413-
STDEXEC_ATTRIBUTE((host, device)) void set_error(Error&& err) noexcept {
458+
STDEXEC_ATTRIBUTE((host, device))
459+
void set_error(Error&& err) noexcept {
414460
if constexpr (__decays_to<Error, std::exception_ptr>) {
415461
// What is `exception_ptr` but death pending
416462
variant_->template emplace<decayed_tuple<set_error_t, cudaError_t>>(

0 commit comments

Comments
 (0)