Skip to content

Commit b8279c3

Browse files
gonnet authored and xnnpack-bot committed
Change the approach to recruiting threads from a pthreadpool_executor.
Since calling the `pthreadpool_executor`'s `schedule` function can be expensive, the calling thread currently packs an array of data for the threads needing to be scheduled and, in turn, schedules a single call to the `wake_up_threads` function that schedules them. This approach, although more efficient than letting the calling thread do the scheduling, has high latency due to the additional level of indirection.

This change removes the `wake_up_threads` function and modifies `ensure_num_threads` to check if any threads with an ID higher than the current thread need to be scheduled, and schedules only the first two it finds. A call to `ensure_num_threads` is added at the top of the `thread_main` function for each scheduled thread, thus ensuring that threads are scheduled in a cascading fashion.

If no threads are scheduled:

* The first call to `pthreadpool_parallelize` will call `ensure_num_threads(threadpool, 0)`, and the `pthreadpool_executor`'s `schedule` function once to schedule the first and second threads, then go on to do work.
* The first thread, at the top of the `thread_main` function, will then call `ensure_num_threads(threadpool, 1)` and schedule the third and fourth threads, then go on to do work.
* The second thread, etc...

One advantage of this approach is that `ensure_num_threads` will stop scheduling threads if the work has already been completed, thus avoiding overheads where we have a large number of threads, but very little work.

Since each thread schedules two more threads, the latency until the last thread is scheduled is at most `O(log(n))`.

PiperOrigin-RevId: 814693873
1 parent d5952a0 commit b8279c3

1 file changed

Lines changed: 48 additions & 65 deletions

File tree

src/pthreads.c

Lines changed: 48 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -432,6 +432,50 @@ static uint32_t thread_wrap_up(struct pthreadpool* threadpool,
432432
return curr_active_threads;
433433
}
434434

435+
static void* thread_main(void* arg);
436+
437+
static void ensure_num_threads(struct pthreadpool* threadpool,
438+
uint32_t thread_id) {
439+
struct pthreadpool_executor* executor = &threadpool->executor;
440+
441+
/* If we're not using an executor, do nothing. */
442+
if (!executor->num_threads) {
443+
return;
444+
}
445+
446+
// Get the number of required threads.
447+
const uint32_t max_num_threads = pthreadpool_load_acquire_size_t(
448+
(pthreadpool_atomic_size_t*)&threadpool->threads_count.value);
449+
450+
// Start up to two other threads so that we fan out exponentially.
451+
uint32_t num_threads_to_start = 2; // thread_id > 0 ? 2 : 1;
452+
453+
/* Schedule any missing threads for this threadpool. */
454+
for (uint32_t tid = thread_id + 1;
455+
num_threads_to_start && tid < max_num_threads; tid++) {
456+
// Check whether this thread was active, and if not, schedule it.
457+
struct thread_info* thread = &threadpool->threads[tid];
458+
if (!pthreadpool_load_relaxed_uint32_t(&thread->is_active) &&
459+
!pthreadpool_exchange_sequentially_consistent_uint32_t(
460+
&thread->is_active, 1)) {
461+
// Make sure there is still ongoing work.
462+
if (thread_id) {
463+
const int32_t curr_active_threads =
464+
pthreadpool_load_consume_int32_t(&threadpool->num_active_threads);
465+
if (curr_active_threads < 0 ||
466+
curr_active_threads == PTHREADPOOL_NUM_ACTIVE_THREADS_DONE) {
467+
return;
468+
}
469+
}
470+
471+
pthreadpool_register_threads(threadpool, 1);
472+
executor->schedule(threadpool->executor_context, thread,
473+
(void (*)(void*))thread_main);
474+
num_threads_to_start--;
475+
}
476+
}
477+
}
478+
435479
static void* thread_main(void* arg) {
436480
// Unpack the argument, i.e. extract the pointer to the `pthreadpool` from the
437481
// provided pointer to this thread's `thread_info`.
@@ -443,6 +487,9 @@ static void* thread_main(void* arg) {
443487
offsetof(struct pthreadpool, threads));
444488
uint32_t last_job_id = 0;
445489

490+
// Check whether we have to wake up any other threads.
491+
ensure_num_threads(threadpool, thread_id);
492+
446493
// Get the current threadpool state.
447494
int32_t curr_active_threads =
448495
pthreadpool_load_consume_int32_t(&threadpool->num_active_threads);
@@ -603,70 +650,6 @@ struct pthreadpool* pthreadpool_create_v2(struct pthreadpool_executor* executor,
603650
return threadpool;
604651
}
605652

606-
static void wake_up_threads(void** contexts) {
607-
struct pthreadpool* threadpool = (struct pthreadpool*)contexts[0];
608-
struct pthreadpool_executor* executor = &threadpool->executor;
609-
for (uint32_t k = 1; contexts[k] != NULL; k++) {
610-
if (contexts[k + 1] != NULL) {
611-
/* Fly, my pretties! Fly, fly, fly! */
612-
executor->schedule(threadpool->executor_context, contexts[k],
613-
(void (*)(void*))thread_main);
614-
} else {
615-
void* context = contexts[k];
616-
free(contexts);
617-
thread_main(context);
618-
return;
619-
}
620-
}
621-
}
622-
623-
static void ensure_num_threads(struct pthreadpool* threadpool,
624-
uint32_t num_threads) {
625-
assert(num_threads >= 1);
626-
assert(num_threads <= threadpool->max_num_threads);
627-
struct pthreadpool_executor* executor = &threadpool->executor;
628-
629-
/* If we're not using an executor, do nothing. */
630-
if (!executor->num_threads) {
631-
return;
632-
}
633-
634-
void** thread_contexts = alloca(sizeof(void*) * num_threads);
635-
int32_t num_threads_to_wake = 0;
636-
637-
/* Create any missing threads for this threadpool. */
638-
for (uint32_t tid = 1;
639-
tid < num_threads &&
640-
pthreadpool_load_consume_int32_t(&threadpool->num_active_threads) > 0;
641-
tid++) {
642-
struct thread_info* thread = &threadpool->threads[tid];
643-
644-
// Check whether this thread was active, and if not, add it to the list of
645-
// threads that need starting.
646-
if (!pthreadpool_exchange_sequentially_consistent_uint32_t(
647-
&thread->is_active, 1)) {
648-
pthreadpool_register_threads(threadpool, 1);
649-
thread_contexts[num_threads_to_wake++] = thread;
650-
}
651-
}
652-
653-
if (num_threads_to_wake > 1) {
654-
void** contexts = malloc(sizeof(void*) * (num_threads_to_wake + 2));
655-
contexts[0] = threadpool;
656-
memcpy(contexts + 1, thread_contexts, sizeof(void*) * num_threads_to_wake);
657-
contexts[num_threads_to_wake + 1] = NULL;
658-
/* Fly, my pretties! Fly, fly, fly! */
659-
executor->schedule(threadpool->executor_context, contexts,
660-
(void (*)(void*))wake_up_threads);
661-
} else if (num_threads_to_wake == 1) {
662-
for (int k = 0; k < num_threads_to_wake; k++) {
663-
/* Fly, my pretties! Fly, fly, fly! */
664-
executor->schedule(threadpool->executor_context, thread_contexts[k],
665-
(void (*)(void*))thread_main);
666-
}
667-
}
668-
}
669-
670653
PTHREADPOOL_INTERNAL void pthreadpool_parallelize(
671654
struct pthreadpool* threadpool, thread_function_t thread_function,
672655
const void* params, size_t params_size, void* task, void* context,
@@ -737,7 +720,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_parallelize(
737720
threadpool->max_num_threads - num_threads);
738721

739722
/* Make sure we have enough threads running. */
740-
ensure_num_threads(threadpool, num_threads);
723+
ensure_num_threads(threadpool, /*thread_id=*/0);
741724

742725
/* Do a bit of work ourselves, as thread zero. */
743726
run_thread_function(threadpool, /*thread_id=*/0);

0 commit comments

Comments
 (0)