Skip to content

Commit b18f599

Browse files
committed
Remove the thread pool size limitation
The thread pool is now a dynamic vector, allowing one thread per core on CPUs with many cores. Since the maximum number of threads is not known at compile time, the `signals` array is now a vector.
1 parent ddf0cb4 commit b18f599

3 files changed

Lines changed: 13 additions & 19 deletions

File tree

modules/mux/targets/host/include/host/thread_pool.h

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include <map>
2727
#include <mutex>
2828
#include <new>
29+
#include <vector>
2930

3031
#ifdef CA_HOST_ENABLE_PAPI_COUNTERS
3132
#include <papi.h>
@@ -95,17 +96,14 @@ struct thread_pool_s final {
9596
/// @param[in] user_data User data to pass to the function.
9697
/// @param[in] user_data2 A second user data to pass to the function.
9798
/// @param[in] user_data3 A third user data to pass to the function.
98-
/// @param[in,out] signals A list of bools that will be signalled when each
99+
/// @param[in,out] signals A vector of bools that will be signalled when each
99100
/// slice of the enqueue range has completed.
100101
/// @param[in,out] count A number that is incremented immediately, and
101102
/// decremented when the enqueued function has completed.
102103
/// @param[in] slices The number of pieces that the work is to be divided into
103104
/// when it is enqueued on the thread pool.
104-
///
105-
/// @tparam N Length of `signals`.
106-
template <size_t N>
107105
void enqueue_range(function_t function, void *user_data, void *user_data2,
108-
std::array<std::atomic<bool>, N> &signals,
106+
std::vector<std::atomic<bool>> &signals,
109107
std::atomic<uint32_t> *count, size_t slices) {
110108
const tracer::TraceGuard<tracer::Impl> traceGuard(__func__);
111109

@@ -173,20 +171,15 @@ struct thread_pool_s final {
173171
/// enqueue, wait() will wait for the counter to reach zero.
174172
void wait(std::atomic<uint32_t> *count);
175173

176-
/// The maximum number of threads our thread pool supports. Useful for
177-
/// allocating memory (you know the max size of allocations required).
178-
static const size_t max_num_threads = 32;
179-
180174
/// The maximum number of work items that can be enqueued.
181175
static const size_t queue_max = 4096;
182176

183-
/// The number of threads actually initialized in the thread pool. General
184-
/// the lower of the number of cores or max_num_threads, but could be lower in
185-
/// the presence of debug settings.
177+
/// The number of threads actually initialized in the thread pool. In general
178+
/// the number of cores, but could be lower in the presence of debug settings.
186179
size_t initialized_threads;
187180

188181
/// The pool of threads to use for execution.
189-
std::array<cargo::thread, max_num_threads> pool;
182+
std::vector<cargo::thread> pool;
190183

191184
/// The buffer to hold the queue of work.
192185
std::array<thread_pool_work_item_s, queue_max> queue;

modules/mux/targets/host/source/queue.cpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -249,9 +249,10 @@ void commandNDRange(host::queue_s *queue, host::command_info_s *info) {
249249
return;
250250
}
251251

252-
constexpr size_t signal_count =
253-
host::thread_pool_s::max_num_threads * slice_multiplier;
254-
std::array<std::atomic<bool>, signal_count> signals;
252+
const size_t signal_count =
253+
host_device->thread_pool.num_threads() * slice_multiplier;
254+
255+
std::vector<std::atomic<bool>> signals(signal_count);
255256
std::atomic<uint32_t> queued(0);
256257
host_device->thread_pool.enqueue_range(
257258
[](void *const in, void *const info, void *, size_t index) {

modules/mux/targets/host/source/thread_pool.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,8 +87,7 @@ thread_pool_s::thread_pool_s() : stayAlive(true) {
8787
const size_t hw_threads = cargo::thread::hardware_concurrency();
8888
const size_t desired_threads =
8989
clamp(hw_threads - ca_free_hw_threads, 2, hw_threads);
90-
const size_t max_threads = thread_pool_s::max_num_threads;
91-
size_t debug_threads = thread_pool_s::max_num_threads;
90+
size_t debug_threads = hw_threads;
9291

9392
// Register the value of the CA_HOST_NUM_THREADS environment variable.
9493
// If the programmer has provided an override to the number of threads that
@@ -102,7 +101,8 @@ thread_pool_s::thread_pool_s() : stayAlive(true) {
102101
}
103102

104103
// Must be set before num_threads() is called.
105-
initialized_threads = std::min({desired_threads, max_threads, debug_threads});
104+
initialized_threads = std::min({desired_threads, debug_threads});
105+
pool.resize(num_threads());
106106
for (size_t i = 0, e = num_threads(); i < e; i++) {
107107
pool[i] = cargo::thread(threadFunc, this);
108108
pool[i].set_name("host:pool:" + std::to_string(i));

0 commit comments

Comments
 (0)