Skip to content

Commit 954159a

Browse files
authored
add a customizable algorithm submit that combines connect and start (#1519)
1 parent c08b654 commit 954159a

13 files changed

Lines changed: 499 additions & 98 deletions

include/exec/async_scope.hpp

Lines changed: 34 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -590,15 +590,27 @@ namespace exec {
590590
struct __future_state : __future_state_base<__future_completions_t<_Sender, _Env>, _Env> {
591591
using _Completions = __future_completions_t<_Sender, _Env>;
592592

593-
__future_state(_Sender __sndr, _Env __env, const __impl* __scope)
593+
__future_state(connect_t, _Sender&& __sndr, _Env __env, const __impl* __scope)
594594
: __future_state_base<_Completions, _Env>(static_cast<_Env&&>(__env), __scope)
595-
, __op_(
596-
stdexec::connect(
597-
static_cast<_Sender&&>(__sndr),
598-
__future_receiver_t<_Sender, _Env>{this, __scope})) {
595+
, __op_(static_cast<_Sender&&>(__sndr), __future_receiver_t<_Sender, _Env>{this, __scope}) {
599596
}
600597

601-
connect_result_t<_Sender, __future_receiver_t<_Sender, _Env>> __op_;
598+
__future_state(_Sender __sndr, _Env __env, const __impl* __scope)
599+
: __future_state(
600+
connect,
601+
static_cast<_Sender&&>(__sndr),
602+
static_cast<_Env&&>(__env),
603+
__scope) {
604+
// If the operation completes synchronously, then the following line will cause
605+
// the destruction of *this, which is not a problem because we used a delegating
606+
// constructor, so *this is considered fully constructed.
607+
__op_.submit(
608+
static_cast<_Sender&&>(__sndr), __future_receiver_t<_Sender, _Env>{this, __scope});
609+
}
610+
611+
STDEXEC_ATTRIBUTE((no_unique_address)) [[]] //
612+
submit_result<_Sender, __future_receiver_t<_Sender, _Env>>
613+
__op_{};
602614
};
603615

604616
template <class _SenderId, class _EnvId>
@@ -729,21 +741,26 @@ namespace exec {
729741
using _Sender = stdexec::__t<_SenderId>;
730742

731743
struct __t : __spawn_op_base<_EnvId> {
732-
template <__decays_to<_Sender> _Sndr>
733-
__t(_Sndr&& __sndr, _Env __env, const __impl* __scope)
744+
__t(connect_t, _Sender&& __sndr, _Env __env, const __impl* __scope)
734745
: __spawn_op_base<_EnvId>{__env::__join(static_cast<_Env&&>(__env),
735746
__spawn_env_{__scope->__stop_source_.get_token()}),
736747
[](__spawn_op_base<_EnvId>* __op) {
737748
delete static_cast<__t*>(__op);
738749
}}
739-
, __op_(stdexec::connect(static_cast<_Sndr&&>(__sndr), __spawn_receiver_t<_Env>{this})) {
750+
, __data_(static_cast<_Sender&&>(__sndr), __spawn_receiver_t<_Env>{this}) {
740751
}
741752

742-
void start() & noexcept {
743-
stdexec::start(__op_);
753+
__t(_Sender __sndr, _Env __env, const __impl* __scope)
754+
: __t(connect, static_cast<_Sender&&>(__sndr), static_cast<_Env&&>(__env), __scope) {
755+
// If the operation completes synchronously, then the following line will cause
756+
// the destruction of *this, which is not a problem because we used a delegating
757+
// constructor, so *this is considered fully constructed.
758+
__data_.submit(static_cast<_Sender&&>(__sndr), __spawn_receiver_t<_Env>{this});
744759
}
745760

746-
connect_result_t<_Sender, __spawn_receiver_t<_Env>> __op_;
761+
STDEXEC_ATTRIBUTE((no_unique_address)) [[]] //
762+
submit_result<_Sender, __spawn_receiver_t<_Env>>
763+
__data_;
747764
};
748765
};
749766

@@ -779,19 +796,18 @@ namespace exec {
779796
requires sender_to<nest_result_t<_Sender>, __spawn_receiver_t<_Env>>
780797
void spawn(_Sender&& __sndr, _Env __env = {}) {
781798
using __op_t = __spawn_operation_t<nest_result_t<_Sender>, _Env>;
782-
// start is noexcept so we can assume that the operation will complete
783-
// after this, which means we can rely on its self-ownership to ensure
784-
// that it is eventually deleted
785-
stdexec::start(*(
786-
new __op_t{nest(static_cast<_Sender&&>(__sndr)), static_cast<_Env&&>(__env), &__impl_}));
799+
// This will connect and start the operation, after which the operation state is
800+
// responsible for deleting itself after it completes.
801+
[[maybe_unused]]
802+
auto* __op =
803+
new __op_t{nest(static_cast<_Sender&&>(__sndr)), static_cast<_Env&&>(__env), &__impl_};
787804
}
788805

789806
template <__movable_value _Env = env<>, sender_in<__env_t<_Env>> _Sender>
790807
auto spawn_future(_Sender&& __sndr, _Env __env = {}) -> __future_t<_Sender, _Env> {
791808
using __state_t = __future_state<nest_result_t<_Sender>, _Env>;
792809
auto __state = std::make_unique<__state_t>(
793810
nest(static_cast<_Sender&&>(__sndr)), static_cast<_Env&&>(__env), &__impl_);
794-
stdexec::start(__state->__op_);
795811
return __future_t<_Sender, _Env>{std::move(__state)};
796812
}
797813

include/exec/just_from.hpp

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,18 @@ namespace exec {
134134
noexcept(stdexec::__nothrow_decay_copyable<Rcvr, Fn const &>) -> _opstate<Rcvr, Fn> {
135135
return _opstate<Rcvr, Fn>{static_cast<Rcvr&&>(rcvr), _fn};
136136
}
137+
138+
template <class Rcvr>
139+
STDEXEC_ATTRIBUTE((host, device)) auto submit(Rcvr rcvr) && noexcept -> void {
140+
auto op = static_cast<_sndr_base&&>(*this).connect(static_cast<Rcvr&&>(rcvr));
141+
stdexec::start(op);
142+
}
143+
144+
template <class Rcvr>
145+
STDEXEC_ATTRIBUTE((host, device)) auto submit(Rcvr rcvr) const & noexcept -> void {
146+
auto op = this->connect(static_cast<Rcvr&&>(rcvr));
147+
stdexec::start(op);
148+
}
137149
};
138150

139151
template <class Fn, class Tag = JustTag>

include/nvexec/stream/start_detached.cuh

Lines changed: 58 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,48 @@
2828

2929
namespace nvexec::_strm {
3030
namespace _start_detached {
31+
struct submit_receiver {
32+
using receiver_concept = receiver_t;
33+
using __t = submit_receiver;
34+
using __id = submit_receiver;
35+
36+
template <class... _As>
37+
void set_value(_As&&...) noexcept {
38+
}
39+
40+
template <class _Error>
41+
[[noreturn]]
42+
void set_error(_Error&&) noexcept {
43+
// A detached operation failed. There is no place for the error to go.
44+
// This is unrecoverable, so we terminate.
45+
std::terminate();
46+
}
47+
48+
void set_stopped() noexcept {
49+
}
50+
51+
[[nodiscard]]
52+
auto get_env() const noexcept -> __root_env {
53+
return {};
54+
}
55+
};
56+
3157
template <class SenderId, class EnvId>
3258
struct operation {
3359
using Sender = __cvref_t<SenderId>;
3460
using Env = __t<EnvId>;
3561

36-
explicit operation(Sender&& sndr, Env env)
62+
explicit operation(connect_t, Sender&& sndr, Env env)
3763
: env_(static_cast<Env&&>(env))
38-
, op_state_(connect(static_cast<Sender&&>(sndr), receiver{{}, this})) {
64+
, op_data_(static_cast<Sender&&>(sndr), receiver{{}, this}) {
65+
}
66+
67+
explicit operation(Sender&& sndr, Env env)
68+
: operation(connect, static_cast<Sender&&>(sndr), static_cast<Env&&>(env)) {
69+
// If the operation completes synchronously, then the following line will cause
70+
// the destruction of *this, which is not a problem because we used a delegating
71+
// constructor, so *this is considered fully constructed.
72+
op_data_.submit(static_cast<Sender&&>(sndr), receiver{{}, this});
3973
}
4074

4175
// If the operation state was allocated with a user-provided allocator, then we must
@@ -85,17 +119,33 @@ namespace nvexec::_strm {
85119
};
86120

87121
STDEXEC_ATTRIBUTE((no_unique_address)) Env env_;
88-
connect_result_t<Sender, receiver> op_state_;
122+
STDEXEC_ATTRIBUTE((no_unique_address)) submit_result<Sender, receiver> op_data_;
89123
};
124+
125+
template <class Sender, class Env>
126+
concept _use_submit = __submittable<Sender, submit_receiver> && __same_as<Env, __root_env>
127+
&& __same_as<void, __submit_result_t<Sender, submit_receiver>>;
90128
} // namespace _start_detached
91129

92130
template <>
93131
struct apply_sender_for<start_detached_t> {
94132
template <class Sender, class Env = __root_env>
95133
void operator()(Sender&& sndr, Env&& env = {}) const noexcept(false) {
96134
using Op = _start_detached::operation<__cvref_id<Sender>, __id<__decay_t<Env>>>;
97-
// Use the provided allocator, if any, to allocate the operation state.
98-
if constexpr (__callable<get_allocator_t, Env>) {
135+
136+
#if !STDEXEC_APPLE_CLANG() // There seems to be a codegen bug in apple clang that causes
137+
// `start_detached` to segfault when the code path below is
138+
// taken.
139+
// BUGBUG NOT TO SPEC: the use of the non-standard `submit` algorithm here is a
140+
// conforming extension.
141+
if constexpr (_start_detached::_use_submit<Sender, Env>) {
142+
// If submit(sndr, rcvr) returns void, then no state needs to be kept alive
143+
// for the operation. We can just call submit and return.
144+
stdexec::__submit::__submit(static_cast<Sender&&>(sndr), _start_detached::submit_receiver{});
145+
} else
146+
#endif
147+
if constexpr (__callable<get_allocator_t, Env>) {
148+
// Use the provided allocator to allocate the operation state.
99149
auto alloc = get_allocator(env);
100150
using Alloc = decltype(alloc);
101151
using OpAlloc = typename std::allocator_traits<Alloc>::template rebind_alloc<Op>;
@@ -111,12 +161,12 @@ namespace nvexec::_strm {
111161
op_alloc, op, static_cast<Sender&&>(sndr), static_cast<Env&&>(env));
112162
// The operation state is now constructed, dismiss the scope guard.
113163
g.dismiss();
114-
// This cannot throw:
115-
stdexec::start(op->op_state_);
116164
} else {
117165
// The caller did not provide an allocator, so we use the default allocator.
166+
[[maybe_unused]]
118167
Op* op = new Op{static_cast<Sender&&>(sndr), static_cast<Env&&>(env)};
119-
start(op->op_state_);
168+
// The operation has now started and is responsible for deleting itself when it
169+
// completes.
120170
}
121171
}
122172
};

include/stdexec/__detail/__basic_sender.hpp

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ namespace stdexec {
168168
};
169169

170170
template <class _Sexpr, class _Receiver>
171-
struct __state_box {
171+
struct __state_box : __immovable {
172172
using __tag_t = typename __decay_t<_Sexpr>::__tag_t;
173173
using __state_t = __state_type_t<__tag_t, _Sexpr, _Receiver>;
174174

@@ -509,6 +509,15 @@ namespace stdexec {
509509
static_cast<_Self&&>(__self), static_cast<_Receiver&&>(__rcvr));
510510
}
511511

512+
template <__decays_to<__sexpr> _Self, /*receiver*/ class _Receiver>
513+
STDEXEC_ATTRIBUTE((always_inline)) static auto submit(_Self&& __self, _Receiver&& __rcvr) //
514+
noexcept(__noexcept_of<__impl<_Self>::submit, _Self, _Receiver>) //
515+
-> __msecond<
516+
__if_c<__decays_to<_Self, __sexpr>>,
517+
__result_of<__impl<_Self>::submit, _Self, _Receiver>> {
518+
return __impl<_Self>::submit(static_cast<_Self&&>(__self), static_cast<_Receiver&&>(__rcvr));
519+
}
520+
512521
template <class _Sender, class _ApplyFn>
513522
STDEXEC_ATTRIBUTE((always_inline)) static auto apply(_Sender&& __sndr, _ApplyFn&& __fun) //
514523
noexcept(

include/stdexec/__detail/__just.hpp

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,17 @@ namespace stdexec {
5252
},
5353
__state);
5454
};
55+
56+
static constexpr auto submit =
57+
[]<class _Sender, class _Receiver>(_Sender&& __sndr, _Receiver __rcvr) noexcept -> void {
58+
static_assert(sender_expr_for<_Sender, _JustTag>);
59+
auto&& __state = get_state(static_cast<_Sender&&>(__sndr), __rcvr);
60+
__state.apply(
61+
[&]<class... _Ts>(_Ts&&... __ts) noexcept {
62+
__tag_t()(static_cast<_Receiver&&>(__rcvr), static_cast<_Ts&&>(__ts)...);
63+
},
64+
static_cast<decltype(__state)>(__state));
65+
};
5566
};
5667

5768
struct just_t {

include/stdexec/__detail/__let.hpp

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
#include "__schedulers.hpp"
2828
#include "__sender_adaptor_closure.hpp"
2929
#include "__senders.hpp"
30+
#include "__submit.hpp"
3031
#include "__transform_sender.hpp"
3132
#include "__transform_completion_signatures.hpp"
3233
#include "__variant.hpp"
@@ -224,9 +225,8 @@ namespace stdexec {
224225
__result_receiver_t<_Receiver, _Scheduler>>;
225226

226227
template <class _ResultSender, class _Scheduler, class _Receiver>
227-
using __op_state_t = connect_result_t<
228-
_ResultSender,
229-
__checked_result_receiver_t<_ResultSender, _Scheduler, _Receiver>>;
228+
using __submit_result =
229+
submit_result<_ResultSender, __checked_result_receiver_t<_ResultSender, _Scheduler, _Receiver>>;
230230

231231
template <class _SetTag, class _Fun, class _Sched, class... _Env>
232232
struct __transform_signal_fn {
@@ -353,10 +353,11 @@ namespace stdexec {
353353
//! Metafunction creating the operation state needed to connect the result of calling
354354
//! the sender factory function, `_Fun`, and passing its result to a receiver.
355355
template <class _Receiver, class _Fun, class _Set, class _Sched>
356-
struct __op_state_for {
357-
static_assert(!std::is_reference_v<_Sched>);
356+
struct __submit_datum_for {
357+
// Compute the result of calling submit with the result of executing _Fun
358+
// with _Args. If the result is void, substitute with __ignore.
358359
template <class... _Args>
359-
using __f = __op_state_t<
360+
using __f = __submit_result<
360361
__mcall<__result_sender_fn<_Set, _Fun, _Sched, env_of_t<_Receiver>>, _Args...>,
361362
_Sched,
362363
_Receiver>;
@@ -371,10 +372,10 @@ namespace stdexec {
371372
using __env_t = __result_env_t<_Sched, env_of_t<_Receiver>>;
372373
using __rcvr_t = __receiver_with_sched_t<_Receiver, _Sched>;
373374
using __result_variant = __variant_for<__monostate, _Tuples...>;
374-
using __op_state_variant = //
375+
using __submit_variant = //
375376
__variant_for<
376377
__monostate,
377-
__mapply<__op_state_for<_Receiver, _Fun, _Set, _Sched>, _Tuples>...>;
378+
__mapply<__submit_datum_for<_Receiver, _Fun, _Set, _Sched>, _Tuples>...>;
378379

379380
template <class _ResultSender, class _OpState>
380381
auto __get_result_receiver(const _ResultSender&, _OpState& __op_state) -> decltype(auto) {
@@ -407,7 +408,7 @@ namespace stdexec {
407408
__result_variant __args_{};
408409
//! Variant type for holding the operation state from connecting
409410
//! the function result to the downstream receiver:
410-
__op_state_variant __op_state3_{};
411+
__submit_variant __storage_{};
411412
};
412413

413414
//! Implementation of the `let_*_t` types, where `_Set` is, e.g., `set_value_t` for `let_value`.
@@ -493,8 +494,8 @@ namespace stdexec {
493494
};
494495

495496
//! Helper function to actually invoke the function to produce `let_*`'s sender,
496-
//! connect it to the downstream receiver, and start it.
497-
//! This is the heart of `let_*`.
497+
//! connect it to the downstream receiver, and start it. This is the heart of
498+
//! `let_*`.
498499
template <class _State, class _OpState, class... _As>
499500
static void __bind_(_State& __state, _OpState& __op_state, _As&&... __as) {
500501
// Store the passed-in (received) args:
@@ -504,9 +505,10 @@ namespace stdexec {
504505
// Create a receiver based on the state, the computed sender, and the operation state:
505506
auto __rcvr2 = __state.__get_result_receiver(__sndr2, __op_state);
506507
// Connect the sender to the receiver and start it:
507-
auto& __op2 = __state.__op_state3_.emplace_from(
508-
stdexec::connect, std::move(__sndr2), std::move(__rcvr2));
509-
stdexec::start(__op2);
508+
using __result_t = decltype(submit_result{std::move(__sndr2), std::move(__rcvr2)});
509+
auto& __op =
510+
__state.__storage_.template emplace<__result_t>(std::move(__sndr2), std::move(__rcvr2));
511+
__op.submit(std::move(__sndr2), std::move(__rcvr2));
510512
}
511513

512514
template <class _OpState, class... _As>

0 commit comments

Comments
 (0)