Skip to content

Commit f102fe9

Browse files
committed
Customize bulk_chunked instead of bulk for static thread pool.
1 parent 3984ae5 commit f102fe9

2 files changed

Lines changed: 64 additions & 17 deletions

File tree

include/exec/static_thread_pool.hpp

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -195,11 +195,11 @@ namespace exec {
195195
#endif
196196

197197
template <class Fun, class Shape, class... Args>
198-
requires __callable<Fun, Shape, Args&...>
198+
requires __callable<Fun, Shape, Shape, Args&...>
199199
using bulk_non_throwing = //
200200
__mbool<
201201
// If function invocation doesn't throw
202-
__nothrow_callable<Fun, Shape, Args&...> &&
202+
__nothrow_callable<Fun, Shape, Shape, Args&...> &&
203203
// and emplacing a tuple doesn't throw
204204
#if STDEXEC_MSVC()
205205
__bulk_non_throwing<Args...>::__v
@@ -236,7 +236,7 @@ namespace exec {
236236

237237
struct transform_bulk {
238238
template <class Data, class Sender>
239-
auto operator()(bulk_t, Data&& data, Sender&& sndr) {
239+
auto operator()(bulk_chunked_t, Data&& data, Sender&& sndr) {
240240
auto [pol, shape, fun] = static_cast<Data&&>(data);
241241
// TODO: handle non-par execution policies
242242
return bulk_sender_t<Sender, decltype(shape), decltype(fun)>{
@@ -265,7 +265,7 @@ namespace exec {
265265
public:
266266
struct domain : stdexec::default_domain {
267267
// For eager customization
268-
template <sender_expr_for<bulk_t> Sender>
268+
template <sender_expr_for<bulk_chunked_t> Sender>
269269
auto transform_sender(Sender&& sndr) const noexcept {
270270
if constexpr (__completes_on<Sender, static_thread_pool_::scheduler>) {
271271
auto sched = get_completion_scheduler<set_value_t>(get_env(sndr));
@@ -279,8 +279,8 @@ namespace exec {
279279
}
280280
}
281281

282-
// transform the generic bulk sender into a parallel thread-pool bulk sender
283-
template <sender_expr_for<bulk_t> Sender, class Env>
282+
// transform the generic bulk_chunked sender into a parallel thread-pool bulk sender
283+
template <sender_expr_for<bulk_chunked_t> Sender, class Env>
284284
auto transform_sender(Sender&& sndr, const Env& env) const noexcept {
285285
if constexpr (__completes_on<Sender, static_thread_pool_::scheduler>) {
286286
auto sched = get_completion_scheduler<set_value_t>(get_env(sndr));
@@ -679,9 +679,8 @@ namespace exec {
679679

680680
for (std::uint32_t index = 0; index < threadCount; ++index) {
681681
threadStates_[index].emplace(this, index, params, numa_);
682-
threadIndexByNumaNode_.push_back(
683-
thread_index_by_numa_node{
684-
.numa_node = threadStates_[index]->numa_node(), .thread_index = index});
682+
threadIndexByNumaNode_.push_back(thread_index_by_numa_node{
683+
.numa_node = threadStates_[index]->numa_node(), .thread_index = index});
685684
}
686685

687686
// NOLINTNEXTLINE(modernize-use-ranges) we still support platforms without the std::ranges algorithms
@@ -1158,9 +1157,7 @@ namespace exec {
11581157
// In the case that the shape is much larger than the total number of threads,
11591158
// then each call to computation will call the function many times.
11601159
auto [begin, end] = even_share(sh_state.shape_, tid, total_threads);
1161-
for (Shape i = begin; i < end; ++i) {
1162-
sh_state.fun_(i, args...);
1163-
}
1160+
sh_state.fun_(begin, end, args...);
11641161
};
11651162

11661163
auto completion = [&](auto&... args) {

test/stdexec/algos/adaptors/test_bulk.cpp

Lines changed: 55 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -892,7 +892,6 @@ namespace {
892892
}
893893
}
894894

895-
// TODO: also add similar tests for bulk_chunked and bulk_unchunked
896895
TEST_CASE("eager customization of bulk works with static thread pool", "[adaptors][bulk]") {
897896
exec::static_thread_pool pool{4};
898897
ex::scheduler auto sch = pool.get_scheduler();
@@ -907,7 +906,6 @@ namespace {
907906

908907
auto snd = ex::just() //
909908
| ex::continues_on(sch) | ex::bulk(ex::par, tids.size(), fun);
910-
CHECK(std::equal_to<void*>()(&snd.pool_, &pool));
911909
stdexec::sync_wait(std::move(snd));
912910

913911
// All the work should not have run on the same thread
@@ -918,7 +916,34 @@ namespace {
918916
}
919917
}
920918

921-
// TODO: also add similar tests for bulk_chunked and bulk_unchunked
919+
TEST_CASE(
920+
"eager customization of bulk_chunked works with static thread pool",
921+
"[adaptors][bulk]") {
922+
exec::static_thread_pool pool{4};
923+
ex::scheduler auto sch = pool.get_scheduler();
924+
925+
SECTION("Without values in the set_value channel") {
926+
std::vector<std::thread::id> tids(42);
927+
928+
auto fun = [&tids](std::size_t b, std::size_t e) {
929+
while (b < e) {
930+
tids[b++] = std::this_thread::get_id();
931+
}
932+
std::this_thread::sleep_for(std::chrono::milliseconds{10});
933+
};
934+
935+
auto snd = ex::just() //
936+
| ex::continues_on(sch) | ex::bulk_chunked(ex::par, tids.size(), fun);
937+
stdexec::sync_wait(std::move(snd));
938+
939+
// All the work should not have run on the same thread
940+
const auto actual = static_cast<std::size_t>(std::count(tids.begin(), tids.end(), tids[0]));
941+
const std::size_t wrong = tids.size();
942+
943+
CHECK(actual != wrong);
944+
}
945+
}
946+
922947
TEST_CASE("lazy customization of bulk works with static thread pool", "[adaptors][bulk]") {
923948
exec::static_thread_pool pool{4};
924949
ex::scheduler auto sch = pool.get_scheduler();
@@ -943,6 +968,32 @@ namespace {
943968
}
944969
}
945970

971+
TEST_CASE("lazy customization of bulk_chunked works with static thread pool", "[adaptors][bulk]") {
972+
exec::static_thread_pool pool{4};
973+
ex::scheduler auto sch = pool.get_scheduler();
974+
975+
SECTION("Without values in the set_value channel") {
976+
std::vector<std::thread::id> tids(42);
977+
978+
auto fun = [&tids](std::size_t b, std::size_t e) {
979+
while (b < e) {
980+
tids[b++] = std::this_thread::get_id();
981+
}
982+
std::this_thread::sleep_for(std::chrono::milliseconds{10});
983+
};
984+
985+
auto snd = ex::just() //
986+
| ex::bulk_chunked(ex::par, tids.size(), fun);
987+
stdexec::sync_wait(stdexec::starts_on(sch, std::move(snd)));
988+
989+
// All the work should not have run on the same thread
990+
const auto actual = static_cast<std::size_t>(std::count(tids.begin(), tids.end(), tids[0]));
991+
const std::size_t wrong = tids.size();
992+
993+
CHECK(actual != wrong);
994+
}
995+
}
996+
946997
TEST_CASE("default bulk works with non-default constructible types", "[adaptors][bulk]") {
947998
ex::sender auto s = ex::just(non_default_constructible{42})
948999
| ex::bulk(ex::par, 1, [](int, auto&) { });
@@ -986,8 +1037,7 @@ namespace {
9861037
auto snd = ex::just(std::string{"hello"})
9871038
| exec::on(
9881039
sched, //
989-
ex::bulk(ex::par, 1, [&called](int, std::string x) { called = true; }))
990-
| exec::write(stdexec::prop{ex::get_scheduler, inline_scheduler()});
1040+
ex::bulk(ex::par, 1, [&called](int, std::string x) { called = true; }));
9911041
wait_for_value(std::move(snd), std::string{"hijacked"});
9921042
REQUIRE_FALSE(called);
9931043
}

0 commit comments

Comments
 (0)