Skip to content

Commit f2576ce

Browse files
authored
Customize bulk_chunked and bulk_unchunked for parallel_scheduler (#1532)
Doing this, we will align with the intent of P2079R7. - the backend supports `schedule_bulk_chunked` and `schedule_bulk_unchunked` instead of `bulk_schedule` - update replaceability API to match P2079R7 - add chunking logic - if the execution policy of bulk_chunked doesn't require parallelization, call only one chunk - add more tests
1 parent 35a3e31 commit f2576ce

4 files changed

Lines changed: 297 additions & 61 deletions

File tree

include/exec/__detail/__system_context_default_impl.hpp

Lines changed: 89 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ namespace exec::__system_context_default_impl {
5252
- __recv::__op_ (__operation*) -- 8
5353
- __operation::__inner_op_ (stdexec::connect_result_t<_Sender, __recv<_Sender>>) -- 128 (when connected with an empty receiver & fun)
5454
- __operation::__on_heap_ (bool) -- optimized away
55-
- __bulk_functor::__r_ (bulk_item_receiver*) - 8
55+
- __bulk_unchunked_functor::__r_ (bulk_item_receiver*) - 8
5656
---------------------
5757
Total: 152; extra 24 bytes compared to internal operation state.
5858
@@ -159,35 +159,81 @@ namespace exec::__system_context_default_impl {
159159
}
160160
};
161161

162+
template <typename _T>
163+
concept __has_available_paralellism = requires(_T __pool) {
164+
{ __pool.available_parallelism() } -> std::integral;
165+
};
166+
162167
template <typename _BaseSchedulerContext>
163168
struct __generic_impl : parallel_scheduler_backend {
164169
__generic_impl()
165-
: __pool_scheduler_(__pool_.get_scheduler()) {
170+
: __pool_scheduler_(__pool_.get_scheduler())
171+
, __available_parallelism_(0) {
172+
// If the pool exposes the available parallelism, use it to determine the chunk size.
173+
if constexpr (__has_available_paralellism<_BaseSchedulerContext>) {
174+
__available_parallelism_ = static_cast<uint32_t>(__pool_.available_parallelism());
175+
} else {
176+
__available_parallelism_ = std::thread::hardware_concurrency();
177+
}
166178
}
167179
private:
168180
using __pool_scheduler_t = decltype(std::declval<_BaseSchedulerContext>().get_scheduler());
169181

170-
/// The underlying thread pool.
182+
//! The underlying thread pool.
171183
_BaseSchedulerContext __pool_;
184+
//! The scheduler to use for starting work in our pool.
172185
__pool_scheduler_t __pool_scheduler_;
186+
//! The available parallelism of the pool, used to determine the chunk size.
187+
//! Use a value of 0 to disable chunking.
188+
uint32_t __available_parallelism_;
189+
190+
//! Helper class that maps from a chunk index to the start and end of the chunk.
191+
struct __chunker {
192+
uint32_t __chunk_size_;
193+
uint32_t __max_size_;
194+
195+
uint32_t __begin(uint32_t __chunk_index) const noexcept {
196+
return __chunk_index * __chunk_size_;
197+
}
198+
199+
uint32_t __end(uint32_t __chunk_index) const noexcept {
200+
return std::min(__begin(__chunk_index + 1), __max_size_);
201+
}
202+
};
203+
204+
//! Functor called by the `bulk_chunked` operation; sends an `execute` signal to the frontend.
205+
struct __bulk_chunked_functor {
206+
bulk_item_receiver* __r_;
207+
__chunker __chunker_;
208+
209+
void operator()(unsigned long __idx) const noexcept {
210+
auto __chunk_index = static_cast<uint32_t>(__idx);
211+
__r_->execute(__chunker_.__begin(__chunk_index), __chunker_.__end(__chunk_index));
212+
}
213+
};
173214

174-
//! Functor called by the `bulk` operation; sends a `start` signal to the frontend.
175-
struct __bulk_functor {
215+
//! Functor called by the `bulk_unchunked` operation; sends an `execute` signal to the frontend.
216+
struct __bulk_unchunked_functor {
176217
bulk_item_receiver* __r_;
177218

178219
void operator()(unsigned long __idx) const noexcept {
179-
__r_->execute(static_cast<uint32_t>(__idx));
220+
__r_->execute(static_cast<uint32_t>(__idx), static_cast<uint32_t>(__idx + 1));
180221
}
181222
};
182223

183224
using __schedule_operation_t =
184225
__operation<decltype(stdexec::schedule(std::declval<__pool_scheduler_t>()))>;
185226

186-
using __bulk_schedule_operation_t = __operation<decltype(stdexec::bulk(
227+
using __schedule_bulk_chunked_operation_t = __operation<decltype(stdexec::bulk(
187228
stdexec::schedule(std::declval<__pool_scheduler_t>()),
188229
stdexec::par,
189230
std::declval<uint32_t>(),
190-
std::declval<__bulk_functor>()))>;
231+
std::declval<__bulk_chunked_functor>()))>;
232+
using __schedule_bulk_unchunked_operation_t = __operation<decltype(stdexec::bulk(
233+
stdexec::schedule(std::declval<__pool_scheduler_t>()),
234+
stdexec::par,
235+
std::declval<uint32_t>(),
236+
std::declval<__bulk_unchunked_functor>()))>;
191237

192238
public:
193239
void schedule(std::span<std::byte> __storage, receiver& __r) noexcept override {
@@ -201,15 +247,46 @@ namespace exec::__system_context_default_impl {
201247
}
202248
}
203249

204-
void bulk_schedule(
250+
void schedule_bulk_chunked(
205251
uint32_t __size,
206252
std::span<std::byte> __storage,
207253
bulk_item_receiver& __r) noexcept override {
208254
try {
255+
// Determine the chunking size based on the ratio between the given size and the number of workers in our pool.
256+
// Aim at having 2 chunks per worker.
257+
uint32_t __chunk_size =
258+
(__available_parallelism_ > 0 && __size > 3 * __available_parallelism_)
259+
? __size / __available_parallelism_ / 2
260+
: 1;
261+
uint32_t __num_chunks = (__size + __chunk_size - 1) / __chunk_size;
262+
209263
auto __sndr = stdexec::bulk(
210-
stdexec::schedule(__pool_scheduler_), stdexec::par, __size, __bulk_functor{&__r});
211-
auto __os =
212-
__bulk_schedule_operation_t::__construct_maybe_alloc(__storage, &__r, std::move(__sndr));
264+
stdexec::schedule(__pool_scheduler_),
265+
stdexec::par,
266+
__num_chunks,
267+
__bulk_chunked_functor{
268+
&__r, __chunker{__chunk_size, __size}
269+
});
270+
auto __os = __schedule_bulk_chunked_operation_t::__construct_maybe_alloc(
271+
__storage, &__r, std::move(__sndr));
272+
__os->start();
273+
} catch (std::exception& __e) {
274+
__r.set_error(std::current_exception());
275+
}
276+
}
277+
278+
void schedule_bulk_unchunked(
279+
uint32_t __size,
280+
std::span<std::byte> __storage,
281+
bulk_item_receiver& __r) noexcept override {
282+
try {
283+
auto __sndr = stdexec::bulk(
284+
stdexec::schedule(__pool_scheduler_),
285+
stdexec::par,
286+
__size,
287+
__bulk_unchunked_functor{&__r});
288+
auto __os = __schedule_bulk_unchunked_operation_t::__construct_maybe_alloc(
289+
__storage, &__r, std::move(__sndr));
213290
__os->start();
214291
} catch (std::exception& __e) {
215292
__r.set_error(std::current_exception());

include/exec/__detail/__system_context_replaceability_api.hpp

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ namespace exec::system_context_replaceability {
113113
/// Receiver for bulk scheduling operations.
114114
struct bulk_item_receiver : receiver {
115115
/// Called for each item of a bulk operation, possibly on different threads.
116-
virtual void execute(std::uint32_t) noexcept = 0;
116+
virtual void execute(std::uint32_t, std::uint32_t) noexcept = 0;
117117
};
118118

119119
/// Interface for the parallel scheduler backend.
@@ -125,9 +125,15 @@ namespace exec::system_context_replaceability {
125125
/// Schedule work on parallel scheduler, calling `__r` when done and using `__s` for preallocated
126126
/// memory.
127127
virtual void schedule(std::span<std::byte> __s, receiver& __r) noexcept = 0;
128-
/// Schedule bulk work of size `__n` on parallel scheduler, calling `__r` for each item and then
129-
/// when done, and using `__s` for preallocated memory.
130-
virtual void bulk_schedule(
128+
/// Schedule bulk work of size `__n` on parallel scheduler, calling `__r` for different
129+
/// subranges of [0, __n), and using `__s` for preallocated memory.
130+
virtual void schedule_bulk_chunked(
131+
std::uint32_t __n,
132+
std::span<std::byte> __s,
133+
bulk_item_receiver& __r) noexcept = 0;
134+
/// Schedule bulk work of size `__n` on parallel scheduler, calling `__r` for each item, and
135+
/// using `__s` for preallocated memory.
136+
virtual void schedule_bulk_unchunked(
131137
std::uint32_t __n,
132138
std::span<std::byte> __s,
133139
bulk_item_receiver& __r) noexcept = 0;

0 commit comments

Comments
 (0)