diff options
author | Marat Dukhan <maratek@google.com> | 2020-03-26 11:02:12 -0700 |
---|---|---|
committer | Marat Dukhan <maratek@google.com> | 2020-03-26 11:02:12 -0700 |
commit | 6469659dd404768fd80f1989dfd66930a6587bf8 (patch) | |
tree | 3c2d9013c8434be37294c112f8d696cfb96a7041 /src | |
parent | 31b939ccfec1347dbd4e8fbfd7bb9bde467ae0f0 (diff) | |
download | pthreadpool-6469659dd404768fd80f1989dfd66930a6587bf8.tar.gz |
Refactor multi-threaded case of parallelization functions
- Extract multi-threaded setup logic into a generalized pthreadpool_parallelize function
- Call into pthreadpool_parallelize directly from tiled and 2+-dimensional functions
Diffstat (limited to 'src')
-rw-r--r-- | src/threadpool-pthreads.c | 247 |
1 file changed, 142 insertions, 105 deletions
diff --git a/src/threadpool-pthreads.c b/src/threadpool-pthreads.c index 934e5e7..6ebd521 100644 --- a/src/threadpool-pthreads.c +++ b/src/threadpool-pthreads.c @@ -1,4 +1,5 @@ /* Standard C headers */ +#include <assert.h> #include <stdbool.h> #include <stdint.h> #include <stdlib.h> @@ -124,7 +125,7 @@ static inline size_t min(size_t a, size_t b) { enum threadpool_command { threadpool_command_init, - threadpool_command_compute_1d, + threadpool_command_parallelize, threadpool_command_shutdown, }; @@ -182,6 +183,10 @@ struct PTHREADPOOL_CACHELINE_ALIGNED pthreadpool { */ pthreadpool_atomic_uint32_t command; /** + * The entry point function to call for each thread in the thread pool for parallelization tasks. + */ + pthreadpool_atomic_void_p thread_function; + /** * The function to call for each item. */ pthreadpool_atomic_void_p task; @@ -190,7 +195,7 @@ struct PTHREADPOOL_CACHELINE_ALIGNED pthreadpool { */ pthreadpool_atomic_void_p argument; /** - * Copy of the flags passed to parallelization function. + * Copy of the flags passed to a parallelization function. 
*/ pthreadpool_atomic_uint32_t flags; /** @@ -322,6 +327,8 @@ inline static size_t modulo_decrement(uint32_t i, uint32_t n) { return i - 1; } +typedef void (*thread_function_t)(struct pthreadpool* threadpool, struct thread_info* thread); + static void thread_parallelize_1d(struct pthreadpool* threadpool, struct thread_info* thread) { const pthreadpool_task_1d_t task = (pthreadpool_task_1d_t) pthreadpool_load_relaxed_void_p(&threadpool->task); void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); @@ -411,13 +418,16 @@ static void* thread_main(void* arg) { /* Process command */ switch (command & THREADPOOL_COMMAND_MASK) { - case threadpool_command_compute_1d: + case threadpool_command_parallelize: { + const thread_function_t thread_function = + (thread_function_t) pthreadpool_load_relaxed_void_p(&threadpool->thread_function); if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) { saved_fpu_state = get_fpu_state(); disable_fpu_denormals(); } - thread_parallelize_1d(threadpool, thread); + + thread_function(threadpool, thread); if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) { set_fpu_state(saved_fpu_state); } @@ -526,6 +536,107 @@ size_t pthreadpool_get_threads_count(struct pthreadpool* threadpool) { } } +static void pthreadpool_parallelize( + struct pthreadpool* threadpool, + thread_function_t thread_function, + void* task, + void* context, + size_t linear_range, + uint32_t flags) +{ + assert(threadpool != NULL); + assert(thread_function != NULL); + assert(task != NULL); + assert(linear_range > 1); + + /* Protect the global threadpool structures */ + pthread_mutex_lock(&threadpool->execution_mutex); + + #if !PTHREADPOOL_USE_FUTEX + /* Lock the command variables to ensure that threads don't start processing before they observe complete command with all arguments */ + pthread_mutex_lock(&threadpool->command_mutex); + #endif + + /* Setup global arguments */ + pthreadpool_store_relaxed_void_p(&threadpool->thread_function, (void*) thread_function); + 
pthreadpool_store_relaxed_void_p(&threadpool->task, task); + pthreadpool_store_relaxed_void_p(&threadpool->argument, context); + pthreadpool_store_relaxed_uint32_t(&threadpool->flags, flags); + + /* Locking of completion_mutex not needed: readers are sleeping on command_condvar */ + const size_t threads_count = threadpool->threads_count; + pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */); + #if PTHREADPOOL_USE_FUTEX + pthreadpool_store_relaxed_uint32_t(&threadpool->has_active_threads, 1); + #endif + + /* Spread the work between threads */ + size_t range_start = 0; + for (size_t tid = 0; tid < threads_count; tid++) { + struct thread_info* thread = &threadpool->threads[tid]; + const size_t range_end = multiply_divide(linear_range, tid + 1, threads_count); + pthreadpool_store_relaxed_size_t(&thread->range_start, range_start); + pthreadpool_store_relaxed_size_t(&thread->range_end, range_end); + pthreadpool_store_relaxed_size_t(&thread->range_length, range_end - range_start); + + /* The next subrange starts where the previous ended */ + range_start = range_end; + } + + /* + * Update the threadpool command. + * Imporantly, do it after initializing command parameters (range, task, argument, flags) + * ~(threadpool->command | THREADPOOL_COMMAND_MASK) flips the bits not in command mask + * to ensure the unmasked command is different then the last command, because worker threads + * monitor for change in the unmasked command. + */ + const uint32_t old_command = pthreadpool_load_relaxed_uint32_t(&threadpool->command); + const uint32_t new_command = ~(old_command | THREADPOOL_COMMAND_MASK) | threadpool_command_parallelize; + + /* + * Store the command with release semantics to guarantee that if a worker thread observes + * the new command value, it also observes the updated command parameters. 
+ * + * Note: release semantics is necessary even with a conditional variable, because the workers might + * be waiting in a spin-loop rather than the conditional variable. + */ + pthreadpool_store_release_uint32_t(&threadpool->command, new_command); + #if PTHREADPOOL_USE_FUTEX + /* Wake up the threads */ + futex_wake_all(&threadpool->command); + #else + /* Unlock the command variables before waking up the threads for better performance */ + pthread_mutex_unlock(&threadpool->command_mutex); + + /* Wake up the threads */ + pthread_cond_broadcast(&threadpool->command_condvar); + #endif + + /* Save and modify FPU denormals control, if needed */ + struct fpu_state saved_fpu_state = { 0 }; + if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) { + saved_fpu_state = get_fpu_state(); + disable_fpu_denormals(); + } + + /* Do computations as worker #0 */ + thread_function(threadpool, &threadpool->threads[0]); + + /* Restore FPU denormals control, if needed */ + if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) { + set_fpu_state(saved_fpu_state); + } + + /* Wait until the threads finish computation */ + wait_worker_threads(threadpool); + + /* Make changes by other threads visible to this thread */ + pthreadpool_fence_acquire(); + + /* Unprotect the global threadpool structures */ + pthread_mutex_unlock(&threadpool->execution_mutex); +} + void pthreadpool_parallelize_1d( struct pthreadpool* threadpool, pthreadpool_task_1d_t task, @@ -547,91 +658,9 @@ void pthreadpool_parallelize_1d( set_fpu_state(saved_fpu_state); } } else { - /* Protect the global threadpool structures */ - pthread_mutex_lock(&threadpool->execution_mutex); - - #if !PTHREADPOOL_USE_FUTEX - /* Lock the command variables to ensure that threads don't start processing before they observe complete command with all arguments */ - pthread_mutex_lock(&threadpool->command_mutex); - #endif - - /* Setup global arguments */ - pthreadpool_store_relaxed_void_p(&threadpool->task, task); - 
pthreadpool_store_relaxed_void_p(&threadpool->argument, argument); - pthreadpool_store_relaxed_uint32_t(&threadpool->flags, flags); - - /* Locking of completion_mutex not needed: readers are sleeping on command_condvar */ - const size_t threads_count = threadpool->threads_count; - pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */); - #if PTHREADPOOL_USE_FUTEX - pthreadpool_store_relaxed_uint32_t(&threadpool->has_active_threads, 1); - #endif - - /* Spread the work between threads */ - size_t range_start = 0; - for (size_t tid = 0; tid < threads_count; tid++) { - struct thread_info* thread = &threadpool->threads[tid]; - const size_t range_end = multiply_divide(range, tid + 1, threads_count); - pthreadpool_store_relaxed_size_t(&thread->range_start, range_start); - pthreadpool_store_relaxed_size_t(&thread->range_end, range_end); - pthreadpool_store_relaxed_size_t(&thread->range_length, range_end - range_start); - - /* The next range starts where the previous ended */ - range_start = range_end; - } - - /* - * Update the threadpool command. - * Imporantly, do it after initializing command parameters (range, task, argument) - * ~(threadpool->command | THREADPOOL_COMMAND_MASK) flips the bits not in command mask - * to ensure the unmasked command is different then the last command, because worker threads - * monitor for change in the unmasked command. - */ - const uint32_t old_command = pthreadpool_load_relaxed_uint32_t(&threadpool->command); - const uint32_t new_command = ~(old_command | THREADPOOL_COMMAND_MASK) | threadpool_command_compute_1d; - - /* - * Store the command with release semantics to guarantee that if a worker thread observes - * the new command value, it also observes the updated command parameters. - * - * Note: release semantics is necessary even with a conditional variable, because the workers might - * be waiting in a spin-loop rather than the conditional variable. 
- */ - pthreadpool_store_release_uint32_t(&threadpool->command, new_command); - #if PTHREADPOOL_USE_FUTEX - /* Wake up the threads */ - futex_wake_all(&threadpool->command); - #else - /* Unlock the command variables before waking up the threads for better performance */ - pthread_mutex_unlock(&threadpool->command_mutex); - - /* Wake up the threads */ - pthread_cond_broadcast(&threadpool->command_condvar); - #endif - - /* Save and modify FPU denormals control, if needed */ - struct fpu_state saved_fpu_state = { 0 }; - if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) { - saved_fpu_state = get_fpu_state(); - disable_fpu_denormals(); - } - - /* Do computations as worker #0 */ - thread_parallelize_1d(threadpool, &threadpool->threads[0]); - - /* Restore FPU denormals control, if needed */ - if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) { - set_fpu_state(saved_fpu_state); - } - - /* Wait until the threads finish computation */ - wait_worker_threads(threadpool); - - /* Make changes by other threads visible to this thread */ - pthreadpool_fence_acquire(); - - /* Unprotect the global threadpool structures */ - pthread_mutex_unlock(&threadpool->execution_mutex); + pthreadpool_parallelize( + threadpool, &thread_parallelize_1d, + (void*) task, argument, range, flags); } } @@ -679,7 +708,9 @@ void pthreadpool_parallelize_1d_tile_1d( .range = range, .tile = tile }; - pthreadpool_parallelize_1d(threadpool, (pthreadpool_task_1d_t) compute_1d_tile_1d, &context, tile_range, flags); + pthreadpool_parallelize( + threadpool, &thread_parallelize_1d, + (void*) compute_1d_tile_1d, &context, tile_range, flags); } } @@ -725,7 +756,9 @@ void pthreadpool_parallelize_2d( .argument = argument, .range_j = fxdiv_init_size_t(range_j) }; - pthreadpool_parallelize_1d(threadpool, (pthreadpool_task_1d_t) compute_2d, &context, range_i * range_j, flags); + pthreadpool_parallelize( + threadpool, &thread_parallelize_1d, + (void*) compute_2d, &context, range_i * range_j, flags); } } @@ -783,7 +816,9 @@ 
void pthreadpool_parallelize_2d_tile_1d( .range_j = range_j, .tile_j = tile_j }; - pthreadpool_parallelize_1d(threadpool, (pthreadpool_task_1d_t) compute_2d_tile_1d, &context, range_i * tile_range_j, flags); + pthreadpool_parallelize( + threadpool, &thread_parallelize_1d, + (void*) compute_2d_tile_1d, &context, range_i * tile_range_j, flags); } } @@ -847,7 +882,9 @@ void pthreadpool_parallelize_2d_tile_2d( .tile_i = tile_i, .tile_j = tile_j }; - pthreadpool_parallelize_1d(threadpool, (pthreadpool_task_1d_t) compute_2d_tile_2d, &context, tile_range_i * tile_range_j, flags); + pthreadpool_parallelize( + threadpool, &thread_parallelize_1d, + (void*) compute_2d_tile_2d, &context, tile_range_i * tile_range_j, flags); } } @@ -919,9 +956,9 @@ void pthreadpool_parallelize_3d_tile_2d( .tile_j = tile_j, .tile_k = tile_k }; - pthreadpool_parallelize_1d(threadpool, - (pthreadpool_task_1d_t) compute_3d_tile_2d, &context, - range_i * tile_range_j * tile_range_k, flags); + pthreadpool_parallelize( + threadpool, &thread_parallelize_1d, + (void*) compute_3d_tile_2d, &context, range_i * tile_range_j * tile_range_k, flags); } } @@ -1002,9 +1039,9 @@ void pthreadpool_parallelize_4d_tile_2d( .tile_k = tile_k, .tile_l = tile_l }; - pthreadpool_parallelize_1d(threadpool, - (pthreadpool_task_1d_t) compute_4d_tile_2d, &context, - range_i * range_j * tile_range_k * tile_range_l, flags); + pthreadpool_parallelize( + threadpool, &thread_parallelize_1d, + (void*) compute_4d_tile_2d, &context, range_i * range_j * tile_range_k * tile_range_l, flags); } } @@ -1094,9 +1131,9 @@ void pthreadpool_parallelize_5d_tile_2d( .tile_l = tile_l, .tile_m = tile_m, }; - pthreadpool_parallelize_1d(threadpool, - (pthreadpool_task_1d_t) compute_5d_tile_2d, &context, - range_i * range_j * range_k * tile_range_l * tile_range_m, flags); + pthreadpool_parallelize( + threadpool, &thread_parallelize_1d, + (void*) compute_5d_tile_2d, &context, range_i * range_j * range_k * tile_range_l * tile_range_m, flags); } } @@ 
-1194,9 +1231,9 @@ void pthreadpool_parallelize_6d_tile_2d( .tile_m = tile_m, .tile_n = tile_n, }; - pthreadpool_parallelize_1d(threadpool, - (pthreadpool_task_1d_t) compute_6d_tile_2d, &context, - range_i * range_j * range_k * range_l * tile_range_m * tile_range_n, flags); + pthreadpool_parallelize( + threadpool, &thread_parallelize_1d, + (void*) compute_6d_tile_2d, &context, range_i * range_j * range_k * range_l * tile_range_m * tile_range_n, flags); } } |