1818#pragma once
1919
2020#include " ../stdexec/__detail/__atomic.hpp"
21+ #include " ../stdexec/__detail/__bulk.hpp"
22+ #include " ../stdexec/__detail/__completion_signatures.hpp"
23+ #include " ../stdexec/__detail/__concepts.hpp"
2124#include " ../stdexec/__detail/__config.hpp"
25+ #include " ../stdexec/__detail/__domain.hpp"
26+ #include " ../stdexec/__detail/__execution_fwd.hpp"
27+ #include " ../stdexec/__detail/__execution_legacy.hpp"
28+ #include " ../stdexec/__detail/__get_completion_signatures.hpp"
2229#include " ../stdexec/__detail/__intrusive_queue.hpp"
23- #include " ../stdexec/__detail/__manual_lifetime.hpp" // IWYU pragma: keep
24- #include " ../stdexec/__detail/__meta.hpp" // IWYU pragma: keep
25- #include " ../stdexec/execution.hpp"
30+ #include " ../stdexec/__detail/__manual_lifetime.hpp"
31+ #include " ../stdexec/__detail/__meta.hpp"
32+ #include " ../stdexec/__detail/__optional.hpp"
33+ #include " ../stdexec/__detail/__receivers.hpp"
34+ #include " ../stdexec/__detail/__transform_completion_signatures.hpp"
35+ #include " ../stdexec/__detail/__tuple.hpp"
36+ #include " ../stdexec/__detail/__type_traits.hpp"
37+ #include " ../stdexec/__detail/__variant.hpp"
38+
2639#include " detail/atomic_intrusive_queue.hpp"
2740#include " detail/bwos_lifo_queue.hpp"
2841#include " detail/numa.hpp"
3750#include < condition_variable>
3851#include < cstdint>
3952#include < exception>
53+ #include < limits>
4054#include < mutex>
55+ #include < random>
4156#include < span>
4257#include < thread>
4358#include < type_traits>
@@ -690,12 +705,12 @@ namespace experimental::execution
690705
691706 alignas (64 ) __std::atomic<std::uint32_t> num_active_{};
692707 alignas (64 ) remote_queue_list remotes_;
693- std::uint32_t thread_count_;
694- std::uint32_t max_steals_{thread_count_ + 1 };
695- bwos_params params_;
696- std::vector<std::thread> threads_;
697- std::vector<std::optional <thread_state>> thread_states_;
698- numa_policy numa_;
708+ std::uint32_t thread_count_;
709+ std::uint32_t max_steals_{thread_count_ + 1 };
710+ bwos_params params_;
711+ std::vector<std::thread> threads_;
712+ std::vector<__optional <thread_state>> thread_states_;
713+ numa_policy numa_;
699714
700715 struct thread_index_by_numa_node
701716 {
@@ -1416,12 +1431,10 @@ namespace experimental::execution
14161431 }
14171432 };
14181433
1419- using variant_t = __value_types_of_t <CvSender,
1420- env_of_t <Receiver>,
1421- __q<__decayed_std_tuple>,
1422- __q<__nullable_std_variant>>;
1434+ using variant_t =
1435+ __value_types_of_t <CvSender, env_of_t <Receiver>, __q<__decayed_tuple>, __q<__variant>>;
14231436
1424- variant_t data_;
1437+ variant_t data_{STDEXEC::__no_init} ;
14251438 _static_thread_pool& pool_;
14261439 Receiver rcvr_;
14271440 Shape shape_;
@@ -1440,7 +1453,7 @@ namespace experimental::execution
14401453 if constexpr (Parallelize)
14411454 {
14421455 return static_cast <std::uint32_t >(
1443- ( std::min) (shape_, static_cast <Shape> (pool_.available_parallelism ())));
1456+ __umin ({ std::size_t (shape_), std::size_t (pool_.available_parallelism ())} ));
14441457 }
14451458 else
14461459 {
@@ -1451,19 +1464,8 @@ namespace experimental::execution
14511464 template <class F >
14521465 void apply (F f)
14531466 {
1454- std::visit (
1455- [&]<class Tuple >(Tuple& tupl) -> void
1456- {
1457- if constexpr (__std::same_as<Tuple, std::monostate>)
1458- {
1459- STDEXEC_TERMINATE ();
1460- }
1461- else
1462- {
1463- std::apply ([&](auto &... args) -> void { f (args...); }, tupl);
1464- }
1465- },
1466- data_);
1467+ STDEXEC_ASSERT (!data_.__is_valueless ());
1468+ __visit ([&](auto & tupl) -> void { __apply (std::move (f), tupl); }, data_);
14671469 }
14681470
14691471 // ! Construct from a pool, receiver, shape, and function.
@@ -1501,7 +1503,7 @@ namespace experimental::execution
15011503 template <class ... As>
15021504 void set_value (As&&... as) noexcept
15031505 {
1504- using tuple_t = __decayed_std_tuple <As...>;
1506+ using tuple_t = __decayed_tuple <As...>;
15051507
15061508 shared_state& state = shared_state_;
15071509
@@ -1514,6 +1516,7 @@ namespace experimental::execution
15141516 if constexpr (MayThrow)
15151517 {
15161518 STDEXEC::set_error (std::move (state.rcvr_ ), std::current_exception ());
1519+ return ;
15171520 }
15181521 }
15191522
@@ -1523,7 +1526,7 @@ namespace experimental::execution
15231526 }
15241527 else
15251528 {
1526- state.apply ([&](auto &... args)
1529+ state.apply ([&](auto &... args) noexcept -> void
15271530 { STDEXEC::set_value (std::move (state.rcvr_ ), std::move (args)...); });
15281531 }
15291532 }
@@ -1761,7 +1764,7 @@ namespace experimental::execution
17611764 std::size_t nthreads = this ->pool_ .available_parallelism ();
17621765 bwos_params params = this ->pool_ .params ();
17631766 std::size_t local_size = params.blockSize * params.numBlocks ;
1764- std::size_t chunk_size = (std::min) ( size / nthreads, local_size * nthreads);
1767+ std::size_t chunk_size = __umin ({ size / nthreads, local_size * nthreads} );
17651768 auto & remote_queue = *this ->pool_ .get_remote_queue ();
17661769 auto it = std::ranges::begin (this ->range_ );
17671770 std::size_t i0 = 0 ;
0 commit comments