Skip to content

Commit 9fba24b

Browse files
committed
add a customizable algorithm submit that combines connect and start
1 parent c08b654 commit 9fba24b

10 files changed

Lines changed: 262 additions & 47 deletions

File tree

include/exec/async_scope.hpp

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -593,12 +593,13 @@ namespace exec {
593593
__future_state(_Sender __sndr, _Env __env, const __impl* __scope)
594594
: __future_state_base<_Completions, _Env>(static_cast<_Env&&>(__env), __scope)
595595
, __op_(
596-
stdexec::connect(
596+
stdexec::submit(
597597
static_cast<_Sender&&>(__sndr),
598-
__future_receiver_t<_Sender, _Env>{this, __scope})) {
598+
__future_receiver_t<_Sender, _Env>{this, __scope},
599+
__ignore{})) {
599600
}
600601

601-
connect_result_t<_Sender, __future_receiver_t<_Sender, _Env>> __op_;
602+
STDEXEC_ATTRIBUTE((no_unique_address)) submit_result_t<_Sender, __future_receiver_t<_Sender, _Env>, __ignore> __op_;
602603
};
603604

604605
template <class _SenderId, class _EnvId>
@@ -736,14 +737,10 @@ namespace exec {
736737
[](__spawn_op_base<_EnvId>* __op) {
737738
delete static_cast<__t*>(__op);
738739
}}
739-
, __op_(stdexec::connect(static_cast<_Sndr&&>(__sndr), __spawn_receiver_t<_Env>{this})) {
740+
, __data_(stdexec::submit(static_cast<_Sndr&&>(__sndr), __spawn_receiver_t<_Env>{this}, __ignore{})) {
740741
}
741742

742-
void start() & noexcept {
743-
stdexec::start(__op_);
744-
}
745-
746-
connect_result_t<_Sender, __spawn_receiver_t<_Env>> __op_;
743+
STDEXEC_ATTRIBUTE((no_unique_address)) submit_result_t<_Sender, __spawn_receiver_t<_Env>, __ignore> __data_;
747744
};
748745
};
749746

@@ -779,19 +776,18 @@ namespace exec {
779776
requires sender_to<nest_result_t<_Sender>, __spawn_receiver_t<_Env>>
780777
void spawn(_Sender&& __sndr, _Env __env = {}) {
781778
using __op_t = __spawn_operation_t<nest_result_t<_Sender>, _Env>;
782-
// start is noexcept so we can assume that the operation will complete
783-
// after this, which means we can rely on its self-ownership to ensure
784-
// that it is eventually deleted
785-
stdexec::start(*(
786-
new __op_t{nest(static_cast<_Sender&&>(__sndr)), static_cast<_Env&&>(__env), &__impl_}));
779+
// this will connect and start the operation, after which the operation state is
780+
// responsible for deleting itself after it completes.
781+
[[maybe_unused]]
782+
auto* __op =
783+
new __op_t{nest(static_cast<_Sender&&>(__sndr)), static_cast<_Env&&>(__env), &__impl_};
787784
}
788785

789786
template <__movable_value _Env = env<>, sender_in<__env_t<_Env>> _Sender>
790787
auto spawn_future(_Sender&& __sndr, _Env __env = {}) -> __future_t<_Sender, _Env> {
791788
using __state_t = __future_state<nest_result_t<_Sender>, _Env>;
792789
auto __state = std::make_unique<__state_t>(
793790
nest(static_cast<_Sender&&>(__sndr)), static_cast<_Env&&>(__env), &__impl_);
794-
stdexec::start(__state->__op_);
795791
return __future_t<_Sender, _Env>{std::move(__state)};
796792
}
797793

include/nvexec/stream/start_detached.cuh

Lines changed: 41 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,40 @@
2828

2929
namespace nvexec::_strm {
3030
namespace _start_detached {
31+
struct submit_receiver {
32+
using receiver_concept = receiver_t;
33+
using __t = submit_receiver;
34+
using __id = submit_receiver;
35+
36+
template <class... _As>
37+
void set_value(_As&&...) noexcept {
38+
}
39+
40+
template <class _Error>
41+
[[noreturn]]
42+
void set_error(_Error&&) noexcept {
43+
// A detached operation failed. There is noplace for the error to go.
44+
// This is unrecoverable, so we terminate.
45+
std::terminate();
46+
}
47+
48+
void set_stopped() noexcept {
49+
}
50+
51+
[[nodiscard]]
52+
auto get_env() const noexcept -> __root_env {
53+
return {};
54+
}
55+
};
56+
3157
template <class SenderId, class EnvId>
3258
struct operation {
3359
using Sender = __cvref_t<SenderId>;
3460
using Env = __t<EnvId>;
3561

3662
explicit operation(Sender&& sndr, Env env)
3763
: env_(static_cast<Env&&>(env))
38-
, op_state_(connect(static_cast<Sender&&>(sndr), receiver{{}, this})) {
64+
, op_data_(submit(static_cast<Sender&&>(sndr), receiver{{}, this}, __ignore{})) {
3965
}
4066

4167
// If the operation state was allocated with a user-provided allocator, then we must
@@ -85,7 +111,7 @@ namespace nvexec::_strm {
85111
};
86112

87113
STDEXEC_ATTRIBUTE((no_unique_address)) Env env_;
88-
connect_result_t<Sender, receiver> op_state_;
114+
STDEXEC_ATTRIBUTE((no_unique_address)) submit_result_t<Sender, receiver> op_data_;
89115
};
90116
} // namespace _start_detached
91117

@@ -94,8 +120,16 @@ namespace nvexec::_strm {
94120
template <class Sender, class Env = __root_env>
95121
void operator()(Sender&& sndr, Env&& env = {}) const noexcept(false) {
96122
using Op = _start_detached::operation<__cvref_id<Sender>, __id<__decay_t<Env>>>;
97-
// Use the provided allocator, if any, to allocate the operation state.
98-
if constexpr (__callable<get_allocator_t, Env>) {
123+
124+
// NON-STANDARD
125+
if constexpr (
126+
__same_as<Env, __root_env>
127+
&& __same_as<void, submit_result_t<Sender, _start_detached::submit_receiver>>) {
128+
// If submit(sndr, rcvr) returns void, then no state needs to be kept alive
129+
// for the operation. We can just call submit and return.
130+
stdexec::submit(static_cast<Sender&&>(sndr), _start_detached::submit_receiver{});
131+
} else if constexpr (__callable<get_allocator_t, Env>) {
132+
// Use the provided allocator to allocate the operation state.
99133
auto alloc = get_allocator(env);
100134
using Alloc = decltype(alloc);
101135
using OpAlloc = typename std::allocator_traits<Alloc>::template rebind_alloc<Op>;
@@ -111,12 +145,12 @@ namespace nvexec::_strm {
111145
op_alloc, op, static_cast<Sender&&>(sndr), static_cast<Env&&>(env));
112146
// The operation state is now constructed, dismiss the scope guard.
113147
g.dismiss();
114-
// This cannot throw:
115-
stdexec::start(op->op_state_);
116148
} else {
117149
// The caller did not provide an allocator, so we use the default allocator.
150+
[[maybe_unused]]
118151
Op* op = new Op{static_cast<Sender&&>(sndr), static_cast<Env&&>(env)};
119-
start(op->op_state_);
152+
// The operation has now started and is responsible for deleting itself when it
153+
// completes.
120154
}
121155
}
122156
};

include/stdexec/__detail/__basic_sender.hpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -509,6 +509,15 @@ namespace stdexec {
509509
static_cast<_Self&&>(__self), static_cast<_Receiver&&>(__rcvr));
510510
}
511511

512+
template <__decays_to<__sexpr> _Self, /*receiver*/ class _Receiver>
513+
STDEXEC_ATTRIBUTE((always_inline)) static auto submit(_Self&& __self, _Receiver&& __rcvr) //
514+
noexcept(__noexcept_of<__impl<_Self>::submit, _Self, _Receiver>) //
515+
-> __msecond<
516+
__if_c<__decays_to<_Self, __sexpr>>,
517+
__result_of<__impl<_Self>::submit, _Self, _Receiver>> {
518+
return __impl<_Self>::submit(static_cast<_Self&&>(__self), static_cast<_Receiver&&>(__rcvr));
519+
}
520+
512521
template <class _Sender, class _ApplyFn>
513522
STDEXEC_ATTRIBUTE((always_inline)) static auto apply(_Sender&& __sndr, _ApplyFn&& __fun) //
514523
noexcept(

include/stdexec/__detail/__just.hpp

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,17 @@ namespace stdexec {
5252
},
5353
__state);
5454
};
55+
56+
static constexpr auto submit =
57+
[]<class _Sender, class _Receiver>(_Sender&& __sndr, _Receiver __rcvr) noexcept -> void {
58+
static_assert(sender_expr_for<_Sender, _JustTag>);
59+
auto&& __state = get_state(static_cast<_Sender&&>(__sndr), __rcvr);
60+
__state.apply(
61+
[&]<class... _Ts>(_Ts&&... __ts) noexcept {
62+
__tag_t()(static_cast<_Receiver&&>(__rcvr), static_cast<_Ts&&>(__ts)...);
63+
},
64+
static_cast<decltype(__state)>(__state));
65+
};
5566
};
5667

5768
struct just_t {

include/stdexec/__detail/__let.hpp

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
#include "__schedulers.hpp"
2828
#include "__sender_adaptor_closure.hpp"
2929
#include "__senders.hpp"
30+
#include "__submit.hpp"
3031
#include "__transform_sender.hpp"
3132
#include "__transform_completion_signatures.hpp"
3233
#include "__variant.hpp"
@@ -224,9 +225,10 @@ namespace stdexec {
224225
__result_receiver_t<_Receiver, _Scheduler>>;
225226

226227
template <class _ResultSender, class _Scheduler, class _Receiver>
227-
using __op_state_t = connect_result_t<
228+
using __submit_result = submit_result_t<
228229
_ResultSender,
229-
__checked_result_receiver_t<_ResultSender, _Scheduler, _Receiver>>;
230+
__checked_result_receiver_t<_ResultSender, _Scheduler, _Receiver>,
231+
__ignore>;
230232

231233
template <class _SetTag, class _Fun, class _Sched, class... _Env>
232234
struct __transform_signal_fn {
@@ -353,10 +355,11 @@ namespace stdexec {
353355
//! Metafunction creating the operation state needed to connect the result of calling
354356
//! the sender factory function, `_Fun`, and passing its result to a receiver.
355357
template <class _Receiver, class _Fun, class _Set, class _Sched>
356-
struct __op_state_for {
357-
static_assert(!std::is_reference_v<_Sched>);
358+
struct __submit_datum_for {
359+
// compute the result of calling submit with the result of executing _Fun
360+
// with _Args. if the result is void, substitute with __ignore.
358361
template <class... _Args>
359-
using __f = __op_state_t<
362+
using __f = __submit_result<
360363
__mcall<__result_sender_fn<_Set, _Fun, _Sched, env_of_t<_Receiver>>, _Args...>,
361364
_Sched,
362365
_Receiver>;
@@ -371,10 +374,10 @@ namespace stdexec {
371374
using __env_t = __result_env_t<_Sched, env_of_t<_Receiver>>;
372375
using __rcvr_t = __receiver_with_sched_t<_Receiver, _Sched>;
373376
using __result_variant = __variant_for<__monostate, _Tuples...>;
374-
using __op_state_variant = //
377+
using __submit_variant = //
375378
__variant_for<
376379
__monostate,
377-
__mapply<__op_state_for<_Receiver, _Fun, _Set, _Sched>, _Tuples>...>;
380+
__mapply<__submit_datum_for<_Receiver, _Fun, _Set, _Sched>, _Tuples>...>;
378381

379382
template <class _ResultSender, class _OpState>
380383
auto __get_result_receiver(const _ResultSender&, _OpState& __op_state) -> decltype(auto) {
@@ -407,7 +410,7 @@ namespace stdexec {
407410
__result_variant __args_{};
408411
//! Variant type for holding the operation state from connecting
409412
//! the function result to the downstream receiver:
410-
__op_state_variant __op_state3_{};
413+
__submit_variant __storage_{};
411414
};
412415

413416
//! Implementation of the `let_*_t` types, where `_Set` is, e.g., `set_value_t` for `let_value`.
@@ -493,8 +496,8 @@ namespace stdexec {
493496
};
494497

495498
//! Helper function to actually invoke the function to produce `let_*`'s sender,
496-
//! connect it to the downstream receiver, and start it.
497-
//! This is the heart of `let_*`.
499+
//! connect it to the downstream receiver, and start it. This is the heart of
500+
//! `let_*`.
498501
template <class _State, class _OpState, class... _As>
499502
static void __bind_(_State& __state, _OpState& __op_state, _As&&... __as) {
500503
// Store the passed-in (received) args:
@@ -504,9 +507,8 @@ namespace stdexec {
504507
// Create a receiver based on the state, the computed sender, and the operation state:
505508
auto __rcvr2 = __state.__get_result_receiver(__sndr2, __op_state);
506509
// Connect the sender to the receiver and start it:
507-
auto& __op2 = __state.__op_state3_.emplace_from(
508-
stdexec::connect, std::move(__sndr2), std::move(__rcvr2));
509-
stdexec::start(__op2);
510+
__state.__storage_.emplace_from(
511+
stdexec::submit, std::move(__sndr2), std::move(__rcvr2), __ignore{});
510512
}
511513

512514
template <class _OpState, class... _As>

include/stdexec/__detail/__read_env.hpp

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include "__optional.hpp"
2727
#include "__meta.hpp"
2828
#include "__receivers.hpp"
29+
#include "__submit.hpp"
2930

3031
#include <exception>
3132

@@ -119,6 +120,24 @@ namespace stdexec {
119120
stdexec::__set_value_invoke(static_cast<_Receiver&&>(__rcvr), __query_fn);
120121
}
121122
};
123+
124+
static constexpr auto submit = //
125+
[]<class _Sender, class _Receiver>(const _Sender& __sndr, _Receiver __rcvr) noexcept {
126+
static_assert(sender_expr_for<_Sender, __read_env_t>);
127+
using __query = __data_of<_Sender>;
128+
using __result = __call_result_t<__query, env_of_t<_Receiver>>;
129+
// When the query completes with a reference type, we can complete the receiver
130+
// immediately; there is no need to return an operation state.
131+
if constexpr (__same_as<__result, __result&&>) {
132+
stdexec::__set_value_invoke(
133+
static_cast<_Receiver&&>(__rcvr), __query(), stdexec::get_env(__rcvr));
134+
} else {
135+
// This will create a __state object to cache the query result and start the
136+
// operation.
137+
return __submit::__op_data<const _Sender&, _Receiver>(
138+
__sndr, static_cast<_Receiver&&>(__rcvr));
139+
}
140+
};
122141
};
123142
} // namespace __read
124143

include/stdexec/__detail/__start_detached.hpp

Lines changed: 43 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,20 +24,47 @@
2424
#include "__receivers.hpp"
2525
#include "__env.hpp"
2626
#include "__scope.hpp"
27+
#include "__submit.hpp"
2728
#include "__transform_sender.hpp"
2829

2930
namespace stdexec {
3031
/////////////////////////////////////////////////////////////////////////////
3132
// [execution.senders.consumer.start_detached]
3233
namespace __start_detached {
34+
struct __submit_receiver {
35+
using receiver_concept = receiver_t;
36+
using __t = __submit_receiver;
37+
using __id = __submit_receiver;
38+
39+
template <class... _As>
40+
void set_value(_As&&...) noexcept {
41+
}
42+
43+
template <class _Error>
44+
[[noreturn]]
45+
void set_error(_Error&&) noexcept {
46+
// A detached operation failed. There is noplace for the error to go.
47+
// This is unrecoverable, so we terminate.
48+
std::terminate();
49+
}
50+
51+
void set_stopped() noexcept {
52+
}
53+
54+
[[nodiscard]]
55+
auto get_env() const noexcept -> __root_env {
56+
return {};
57+
}
58+
};
59+
3360
template <class _SenderId, class _EnvId>
3461
struct __operation {
3562
using _Sender = __cvref_t<_SenderId>;
3663
using _Env = __t<_EnvId>;
3764

3865
explicit __operation(_Sender&& __sndr, _Env __env)
3966
: __env_(static_cast<_Env&&>(__env))
40-
, __op_state_(connect(static_cast<_Sender&&>(__sndr), __receiver{this})) {
67+
, __op_data_(submit(static_cast<_Sender&&>(__sndr), __receiver{this}, __ignore{})) {
4168
}
4269

4370
static void __destroy_delete(__operation* __self) noexcept {
@@ -85,7 +112,7 @@ namespace stdexec {
85112
};
86113

87114
STDEXEC_ATTRIBUTE((no_unique_address)) _Env __env_;
88-
connect_result_t<_Sender, __receiver> __op_state_;
115+
STDEXEC_ATTRIBUTE((no_unique_address)) submit_result_t<_Sender, __receiver, __ignore> __op_data_;
89116
};
90117

91118
struct start_detached_t {
@@ -117,13 +144,20 @@ namespace stdexec {
117144
requires sender_in<_Sender, __as_root_env_t<_Env>>
118145
void apply_sender(_Sender&& __sndr, _Env&& __env = {}) const noexcept(false) {
119146
using _Op = __operation<__cvref_id<_Sender>, __id<__decay_t<_Env>>>;
120-
// Use the provided allocator, if any, to allocate the operation state.
121-
if constexpr (__callable<get_allocator_t, _Env>) {
147+
148+
// NON-STANDARD
149+
if constexpr (
150+
__same_as<_Env, __root_env>
151+
&& __same_as<void, submit_result_t<_Sender, __submit_receiver>>) {
152+
// If submit(sndr, rcvr) returns void, then no state needs to be kept alive
153+
// for the operation. We can just call submit and return.
154+
stdexec::submit(static_cast<_Sender&&>(__sndr), __submit_receiver{});
155+
} else if constexpr (__callable<get_allocator_t, _Env>) {
156+
// Use the provided allocator if any to allocate the operation state.
122157
auto __alloc = get_allocator(__env);
123158
using _Alloc = decltype(__alloc);
124159
using _OpAlloc = typename std::allocator_traits<_Alloc>::template rebind_alloc<_Op>;
125-
// We use the allocator to allocate the operation state and also to construct
126-
// it.
160+
// We use the allocator to allocate the op state and also to construct it.
127161
_OpAlloc __op_alloc{__alloc};
128162
_Op* __op = std::allocator_traits<_OpAlloc>::allocate(__op_alloc, 1);
129163
__scope_guard __g{[__op, &__op_alloc]() noexcept {
@@ -135,12 +169,12 @@ namespace stdexec {
135169
__op_alloc, __op, static_cast<_Sender&&>(__sndr), static_cast<_Env&&>(__env));
136170
// The operation state is now constructed, dismiss the scope guard.
137171
__g.__dismiss();
138-
// This cannot throw:
139-
stdexec::start(__op->__op_state_);
140172
} else {
141173
// The caller did not provide an allocator, so we use the default allocator.
174+
[[maybe_unused]]
142175
_Op* __op = new _Op(static_cast<_Sender&&>(__sndr), static_cast<_Env&&>(__env));
143-
start(__op->__op_state_);
176+
// The operation has now started and is responsible for deleting itself when it
177+
// completes.
144178
}
145179
}
146180
};

0 commit comments

Comments
 (0)