1818#pragma once
1919
2020#include " ../stdexec/__detail/__atomic.hpp"
21+ #include " ../stdexec/__detail/__bulk.hpp"
22+ #include " ../stdexec/__detail/__completion_signatures.hpp"
23+ #include " ../stdexec/__detail/__concepts.hpp"
2124#include " ../stdexec/__detail/__config.hpp"
25+ #include " ../stdexec/__detail/__domain.hpp"
26+ #include " ../stdexec/__detail/__execution_fwd.hpp"
27+ #include " ../stdexec/__detail/__execution_legacy.hpp"
28+ #include " ../stdexec/__detail/__get_completion_signatures.hpp"
2229#include " ../stdexec/__detail/__intrusive_queue.hpp"
2330#include " ../stdexec/__detail/__manual_lifetime.hpp" // IWYU pragma: keep
2431#include " ../stdexec/__detail/__meta.hpp" // IWYU pragma: keep
25- #include " ../stdexec/execution.hpp"
32+ #include " ../stdexec/__detail/__receivers.hpp"
33+ #include " ../stdexec/__detail/__transform_completion_signatures.hpp"
34+ #include " ../stdexec/__detail/__tuple.hpp"
35+ #include " ../stdexec/__detail/__type_traits.hpp"
36+ #include " ../stdexec/__detail/__variant.hpp"
37+
2638#include " detail/atomic_intrusive_queue.hpp"
2739#include " detail/bwos_lifo_queue.hpp"
2840#include " detail/numa.hpp"
3749#include < condition_variable>
3850#include < cstdint>
3951#include < exception>
52+ #include < limits>
4053#include < mutex>
54+ #include < random>
4155#include < span>
4256#include < thread>
4357#include < type_traits>
@@ -1416,12 +1430,10 @@ namespace experimental::execution
14161430 }
14171431 };
14181432
1419- using variant_t = __value_types_of_t <CvSender,
1420- env_of_t <Receiver>,
1421- __q<__decayed_std_tuple>,
1422- __q<__nullable_std_variant>>;
1433+ using variant_t =
1434+ __value_types_of_t <CvSender, env_of_t <Receiver>, __q<__decayed_tuple>, __q<__variant>>;
14231435
1424- variant_t data_;
1436+ variant_t data_{STDEXEC::__no_init} ;
14251437 _static_thread_pool& pool_;
14261438 Receiver rcvr_;
14271439 Shape shape_;
@@ -1440,7 +1452,7 @@ namespace experimental::execution
14401452 if constexpr (Parallelize)
14411453 {
14421454 return static_cast <std::uint32_t >(
1443- (std::min) ( shape_, static_cast <Shape>(pool_.available_parallelism ())));
1455+ __umin ({ shape_, static_cast <Shape>(pool_.available_parallelism ())} ));
14441456 }
14451457 else
14461458 {
@@ -1451,19 +1463,8 @@ namespace experimental::execution
14511463 template <class F >
14521464 void apply (F f)
14531465 {
1454- std::visit (
1455- [&]<class Tuple >(Tuple& tupl) -> void
1456- {
1457- if constexpr (__std::same_as<Tuple, std::monostate>)
1458- {
1459- STDEXEC_TERMINATE ();
1460- }
1461- else
1462- {
1463- std::apply ([&](auto &... args) -> void { f (args...); }, tupl);
1464- }
1465- },
1466- data_);
1466+ STDEXEC_ASSERT (!data_.__is_valueless ());
1467+ __visit ([&](auto & tupl) -> void { __apply (std::move (f), tupl); }, data_);
14671468 }
14681469
14691470 // ! Construct from a pool, receiver, shape, and function.
@@ -1501,7 +1502,7 @@ namespace experimental::execution
15011502 template <class ... As>
15021503 void set_value (As&&... as) noexcept
15031504 {
1504- using tuple_t = __decayed_std_tuple <As...>;
1505+ using tuple_t = __decayed_tuple <As...>;
15051506
15061507 shared_state& state = shared_state_;
15071508
@@ -1514,6 +1515,7 @@ namespace experimental::execution
15141515 if constexpr (MayThrow)
15151516 {
15161517 STDEXEC::set_error (std::move (state.rcvr_ ), std::current_exception ());
1518+ return ;
15171519 }
15181520 }
15191521
@@ -1523,7 +1525,7 @@ namespace experimental::execution
15231525 }
15241526 else
15251527 {
1526- state.apply ([&](auto &... args)
1528+ state.apply ([&](auto &... args) noexcept -> void
15271529 { STDEXEC::set_value (std::move (state.rcvr_ ), std::move (args)...); });
15281530 }
15291531 }
@@ -1761,7 +1763,7 @@ namespace experimental::execution
17611763 std::size_t nthreads = this ->pool_ .available_parallelism ();
17621764 bwos_params params = this ->pool_ .params ();
17631765 std::size_t local_size = params.blockSize * params.numBlocks ;
1764- std::size_t chunk_size = (std::min) ( size / nthreads, local_size * nthreads);
1766+ std::size_t chunk_size = __umin ({ size / nthreads, local_size * nthreads} );
17651767 auto & remote_queue = *this ->pool_ .get_remote_queue ();
17661768 auto it = std::ranges::begin (this ->range_ );
17671769 std::size_t i0 = 0 ;
0 commit comments