author    Marat Dukhan <maratek@google.com>  2020-03-26 11:02:12 -0700
committer Marat Dukhan <maratek@google.com>  2020-03-26 11:02:12 -0700
commit    6469659dd404768fd80f1989dfd66930a6587bf8 (patch)
tree      3c2d9013c8434be37294c112f8d696cfb96a7041 /src
parent    31b939ccfec1347dbd4e8fbfd7bb9bde467ae0f0 (diff)
download  pthreadpool-6469659dd404768fd80f1989dfd66930a6587bf8.tar.gz
Refactor multi-threaded case of parallelization functions
- Extract multi-threaded setup logic into a generalized pthreadpool_parallelize function
- Call into pthreadpool_parallelize directly from tiled and 2+-dimensional functions
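For orientation before reading the diff: the commit funnels every multi-threaded parallelization call through one generalized routine. Its shape, copied from the diff below in declaration form:

    /* New per-thread entry point type; thread_parallelize_1d is one instance */
    typedef void (*thread_function_t)(struct pthreadpool* threadpool, struct thread_info* thread);

    /* Generalized setup/dispatch: publishes thread_function/task/context/flags,
       splits linear_range across the pool, wakes the workers, and joins. */
    static void pthreadpool_parallelize(
        struct pthreadpool* threadpool,
        thread_function_t thread_function,
        void* task,
        void* context,
        size_t linear_range,
        uint32_t flags);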
Diffstat (limited to 'src')
-rw-r--r--  src/threadpool-pthreads.c  |  247
1 file changed, 142 insertions(+), 105 deletions(-)
diff --git a/src/threadpool-pthreads.c b/src/threadpool-pthreads.c
index 934e5e7..6ebd521 100644
--- a/src/threadpool-pthreads.c
+++ b/src/threadpool-pthreads.c
@@ -1,4 +1,5 @@
/* Standard C headers */
+#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>
@@ -124,7 +125,7 @@ static inline size_t min(size_t a, size_t b) {
enum threadpool_command {
threadpool_command_init,
- threadpool_command_compute_1d,
+ threadpool_command_parallelize,
threadpool_command_shutdown,
};
@@ -182,6 +183,10 @@ struct PTHREADPOOL_CACHELINE_ALIGNED pthreadpool {
*/
pthreadpool_atomic_uint32_t command;
/**
+ * The entry point function to call for each thread in the thread pool for parallelization tasks.
+ */
+ pthreadpool_atomic_void_p thread_function;
+ /**
* The function to call for each item.
*/
pthreadpool_atomic_void_p task;
@@ -190,7 +195,7 @@ struct PTHREADPOOL_CACHELINE_ALIGNED pthreadpool {
*/
pthreadpool_atomic_void_p argument;
/**
- * Copy of the flags passed to parallelization function.
+ * Copy of the flags passed to a parallelization function.
*/
pthreadpool_atomic_uint32_t flags;
/**
@@ -322,6 +327,8 @@ inline static size_t modulo_decrement(uint32_t i, uint32_t n) {
return i - 1;
}
+typedef void (*thread_function_t)(struct pthreadpool* threadpool, struct thread_info* thread);
+
static void thread_parallelize_1d(struct pthreadpool* threadpool, struct thread_info* thread) {
const pthreadpool_task_1d_t task = (pthreadpool_task_1d_t) pthreadpool_load_relaxed_void_p(&threadpool->task);
void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument);
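The typedef above is what makes the dispatch generic: any function with this signature can be published to the pool. As a minimal illustrative sketch of a conforming thread function (not pthreadpool's code: it walks only the thread's own subrange, omits the work stealing done by the real thread_parallelize_1d, and assumes a pthreadpool_load_relaxed_size_t counterpart to the relaxed stores used elsewhere in this file):

    static void thread_parallelize_1d_sketch(struct pthreadpool* threadpool, struct thread_info* thread) {
        const pthreadpool_task_1d_t task =
            (pthreadpool_task_1d_t) pthreadpool_load_relaxed_void_p(&threadpool->task);
        void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument);
        const size_t range_end = pthreadpool_load_relaxed_size_t(&thread->range_end);
        /* Process the [range_start, range_end) items assigned to this thread */
        for (size_t i = pthreadpool_load_relaxed_size_t(&thread->range_start); i < range_end; i++) {
            task(argument, i);
        }
    }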
@@ -411,13 +418,16 @@ static void* thread_main(void* arg) {
/* Process command */
switch (command & THREADPOOL_COMMAND_MASK) {
- case threadpool_command_compute_1d:
+ case threadpool_command_parallelize:
{
+ const thread_function_t thread_function =
+ (thread_function_t) pthreadpool_load_relaxed_void_p(&threadpool->thread_function);
if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
saved_fpu_state = get_fpu_state();
disable_fpu_denormals();
}
- thread_parallelize_1d(threadpool, thread);
+
+ thread_function(threadpool, thread);
if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
set_fpu_state(saved_fpu_state);
}
@@ -526,6 +536,107 @@ size_t pthreadpool_get_threads_count(struct pthreadpool* threadpool) {
}
}
+static void pthreadpool_parallelize(
+ struct pthreadpool* threadpool,
+ thread_function_t thread_function,
+ void* task,
+ void* context,
+ size_t linear_range,
+ uint32_t flags)
+{
+ assert(threadpool != NULL);
+ assert(thread_function != NULL);
+ assert(task != NULL);
+ assert(linear_range > 1);
+
+ /* Protect the global threadpool structures */
+ pthread_mutex_lock(&threadpool->execution_mutex);
+
+ #if !PTHREADPOOL_USE_FUTEX
+ /* Lock the command variables to ensure that threads don't start processing before they observe the complete command with all arguments */
+ pthread_mutex_lock(&threadpool->command_mutex);
+ #endif
+
+ /* Setup global arguments */
+ pthreadpool_store_relaxed_void_p(&threadpool->thread_function, (void*) thread_function);
+ pthreadpool_store_relaxed_void_p(&threadpool->task, task);
+ pthreadpool_store_relaxed_void_p(&threadpool->argument, context);
+ pthreadpool_store_relaxed_uint32_t(&threadpool->flags, flags);
+
+ /* Locking of completion_mutex not needed: readers are sleeping on command_condvar */
+ const size_t threads_count = threadpool->threads_count;
+ pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */);
+ #if PTHREADPOOL_USE_FUTEX
+ pthreadpool_store_relaxed_uint32_t(&threadpool->has_active_threads, 1);
+ #endif
+
+ /* Spread the work between threads */
+ size_t range_start = 0;
+ for (size_t tid = 0; tid < threads_count; tid++) {
+ struct thread_info* thread = &threadpool->threads[tid];
+ const size_t range_end = multiply_divide(linear_range, tid + 1, threads_count);
+ pthreadpool_store_relaxed_size_t(&thread->range_start, range_start);
+ pthreadpool_store_relaxed_size_t(&thread->range_end, range_end);
+ pthreadpool_store_relaxed_size_t(&thread->range_length, range_end - range_start);
+
+ /* The next subrange starts where the previous ended */
+ range_start = range_end;
+ }
+
+ /*
+ * Update the threadpool command.
+ * Importantly, do it after initializing command parameters (range, task, argument, flags)
+ * ~(threadpool->command | THREADPOOL_COMMAND_MASK) flips the bits not in command mask
+ * to ensure the unmasked command is different than the last command, because worker threads
+ * monitor for change in the unmasked command.
+ */
+ const uint32_t old_command = pthreadpool_load_relaxed_uint32_t(&threadpool->command);
+ const uint32_t new_command = ~(old_command | THREADPOOL_COMMAND_MASK) | threadpool_command_parallelize;
+
+ /*
+ * Store the command with release semantics to guarantee that if a worker thread observes
+ * the new command value, it also observes the updated command parameters.
+ *
+ * Note: release semantics is necessary even with a condition variable, because the workers might
+ * be waiting in a spin-loop rather than on the condition variable.
+ */
+ pthreadpool_store_release_uint32_t(&threadpool->command, new_command);
+ #if PTHREADPOOL_USE_FUTEX
+ /* Wake up the threads */
+ futex_wake_all(&threadpool->command);
+ #else
+ /* Unlock the command variables before waking up the threads for better performance */
+ pthread_mutex_unlock(&threadpool->command_mutex);
+
+ /* Wake up the threads */
+ pthread_cond_broadcast(&threadpool->command_condvar);
+ #endif
+
+ /* Save and modify FPU denormals control, if needed */
+ struct fpu_state saved_fpu_state = { 0 };
+ if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
+ saved_fpu_state = get_fpu_state();
+ disable_fpu_denormals();
+ }
+
+ /* Do computations as worker #0 */
+ thread_function(threadpool, &threadpool->threads[0]);
+
+ /* Restore FPU denormals control, if needed */
+ if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
+ set_fpu_state(saved_fpu_state);
+ }
+
+ /* Wait until the threads finish computation */
+ wait_worker_threads(threadpool);
+
+ /* Make changes by other threads visible to this thread */
+ pthreadpool_fence_acquire();
+
+ /* Unprotect the global threadpool structures */
+ pthread_mutex_unlock(&threadpool->execution_mutex);
+}
+
void pthreadpool_parallelize_1d(
struct pthreadpool* threadpool,
pthreadpool_task_1d_t task,
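Two details of pthreadpool_parallelize above are easy to miss. First, the work split: multiply_divide(linear_range, tid + 1, threads_count) computes floor(linear_range * (tid + 1) / threads_count), so for example linear_range = 10 on 3 threads yields subrange ends 3, 6, 10, i.e. [0, 3), [3, 6), [6, 10): every item is covered exactly once and subrange sizes differ by at most one. Second, the command word: ~(old_command | THREADPOOL_COMMAND_MASK) | threadpool_command_parallelize flips every bit outside the command mask, so the stored word always differs from the previous one even when the command code itself repeats, which is what lets spinning workers detect back-to-back parallelize calls.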
@@ -547,91 +658,9 @@ void pthreadpool_parallelize_1d(
set_fpu_state(saved_fpu_state);
}
} else {
- /* Protect the global threadpool structures */
- pthread_mutex_lock(&threadpool->execution_mutex);
-
- #if !PTHREADPOOL_USE_FUTEX
- /* Lock the command variables to ensure that threads don't start processing before they observe complete command with all arguments */
- pthread_mutex_lock(&threadpool->command_mutex);
- #endif
-
- /* Setup global arguments */
- pthreadpool_store_relaxed_void_p(&threadpool->task, task);
- pthreadpool_store_relaxed_void_p(&threadpool->argument, argument);
- pthreadpool_store_relaxed_uint32_t(&threadpool->flags, flags);
-
- /* Locking of completion_mutex not needed: readers are sleeping on command_condvar */
- const size_t threads_count = threadpool->threads_count;
- pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */);
- #if PTHREADPOOL_USE_FUTEX
- pthreadpool_store_relaxed_uint32_t(&threadpool->has_active_threads, 1);
- #endif
-
- /* Spread the work between threads */
- size_t range_start = 0;
- for (size_t tid = 0; tid < threads_count; tid++) {
- struct thread_info* thread = &threadpool->threads[tid];
- const size_t range_end = multiply_divide(range, tid + 1, threads_count);
- pthreadpool_store_relaxed_size_t(&thread->range_start, range_start);
- pthreadpool_store_relaxed_size_t(&thread->range_end, range_end);
- pthreadpool_store_relaxed_size_t(&thread->range_length, range_end - range_start);
-
- /* The next range starts where the previous ended */
- range_start = range_end;
- }
-
- /*
- * Update the threadpool command.
- * Imporantly, do it after initializing command parameters (range, task, argument)
- * ~(threadpool->command | THREADPOOL_COMMAND_MASK) flips the bits not in command mask
- * to ensure the unmasked command is different then the last command, because worker threads
- * monitor for change in the unmasked command.
- */
- const uint32_t old_command = pthreadpool_load_relaxed_uint32_t(&threadpool->command);
- const uint32_t new_command = ~(old_command | THREADPOOL_COMMAND_MASK) | threadpool_command_compute_1d;
-
- /*
- * Store the command with release semantics to guarantee that if a worker thread observes
- * the new command value, it also observes the updated command parameters.
- *
- * Note: release semantics is necessary even with a conditional variable, because the workers might
- * be waiting in a spin-loop rather than the conditional variable.
- */
- pthreadpool_store_release_uint32_t(&threadpool->command, new_command);
- #if PTHREADPOOL_USE_FUTEX
- /* Wake up the threads */
- futex_wake_all(&threadpool->command);
- #else
- /* Unlock the command variables before waking up the threads for better performance */
- pthread_mutex_unlock(&threadpool->command_mutex);
-
- /* Wake up the threads */
- pthread_cond_broadcast(&threadpool->command_condvar);
- #endif
-
- /* Save and modify FPU denormals control, if needed */
- struct fpu_state saved_fpu_state = { 0 };
- if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
- saved_fpu_state = get_fpu_state();
- disable_fpu_denormals();
- }
-
- /* Do computations as worker #0 */
- thread_parallelize_1d(threadpool, &threadpool->threads[0]);
-
- /* Restore FPU denormals control, if needed */
- if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
- set_fpu_state(saved_fpu_state);
- }
-
- /* Wait until the threads finish computation */
- wait_worker_threads(threadpool);
-
- /* Make changes by other threads visible to this thread */
- pthreadpool_fence_acquire();
-
- /* Unprotect the global threadpool structures */
- pthread_mutex_unlock(&threadpool->execution_mutex);
+ pthreadpool_parallelize(
+ threadpool, &thread_parallelize_1d,
+ (void*) task, argument, range, flags);
}
}
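Note the (void*) task cast above: storing a function pointer in a void* is not strictly conforming ISO C, but it matches this file's existing convention, since threadpool->task was already declared as pthreadpool_atomic_void_p.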
@@ -679,7 +708,9 @@ void pthreadpool_parallelize_1d_tile_1d(
.range = range,
.tile = tile
};
- pthreadpool_parallelize_1d(threadpool, (pthreadpool_task_1d_t) compute_1d_tile_1d, &context, tile_range, flags);
+ pthreadpool_parallelize(
+ threadpool, &thread_parallelize_1d,
+ (void*) compute_1d_tile_1d, &context, tile_range, flags);
}
}
@@ -725,7 +756,9 @@ void pthreadpool_parallelize_2d(
.argument = argument,
.range_j = fxdiv_init_size_t(range_j)
};
- pthreadpool_parallelize_1d(threadpool, (pthreadpool_task_1d_t) compute_2d, &context, range_i * range_j, flags);
+ pthreadpool_parallelize(
+ threadpool, &thread_parallelize_1d,
+ (void*) compute_2d, &context, range_i * range_j, flags);
}
}
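The 2D wrapper above flattens (i, j) into the single linear range range_i * range_j; the compute_2d helper referenced in the hunk (its body is not shown here) recovers the pair per item. A sketch of that de-linearization using FXdiv's public API, where fxdiv_divide_size_t returns quotient and remainder in one step:

    static void compute_2d_sketch(struct compute_2d_context* context, size_t linear_index) {
        /* context->range_j was precomputed with fxdiv_init_size_t(range_j) */
        const struct fxdiv_divisor_size_t range_j = context->range_j;
        const struct fxdiv_result_size_t index = fxdiv_divide_size_t(linear_index, range_j);
        /* quotient = i, remainder = j */
        context->task(context->argument, index.quotient, index.remainder);
    }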
@@ -783,7 +816,9 @@ void pthreadpool_parallelize_2d_tile_1d(
.range_j = range_j,
.tile_j = tile_j
};
- pthreadpool_parallelize_1d(threadpool, (pthreadpool_task_1d_t) compute_2d_tile_1d, &context, range_i * tile_range_j, flags);
+ pthreadpool_parallelize(
+ threadpool, &thread_parallelize_1d,
+ (void*) compute_2d_tile_1d, &context, range_i * tile_range_j, flags);
}
}
@@ -847,7 +882,9 @@ void pthreadpool_parallelize_2d_tile_2d(
.tile_i = tile_i,
.tile_j = tile_j
};
- pthreadpool_parallelize_1d(threadpool, (pthreadpool_task_1d_t) compute_2d_tile_2d, &context, tile_range_i * tile_range_j, flags);
+ pthreadpool_parallelize(
+ threadpool, &thread_parallelize_1d,
+ (void*) compute_2d_tile_2d, &context, tile_range_i * tile_range_j, flags);
}
}
@@ -919,9 +956,9 @@ void pthreadpool_parallelize_3d_tile_2d(
.tile_j = tile_j,
.tile_k = tile_k
};
- pthreadpool_parallelize_1d(threadpool,
- (pthreadpool_task_1d_t) compute_3d_tile_2d, &context,
- range_i * tile_range_j * tile_range_k, flags);
+ pthreadpool_parallelize(
+ threadpool, &thread_parallelize_1d,
+ (void*) compute_3d_tile_2d, &context, range_i * tile_range_j * tile_range_k, flags);
}
}
@@ -1002,9 +1039,9 @@ void pthreadpool_parallelize_4d_tile_2d(
.tile_k = tile_k,
.tile_l = tile_l
};
- pthreadpool_parallelize_1d(threadpool,
- (pthreadpool_task_1d_t) compute_4d_tile_2d, &context,
- range_i * range_j * tile_range_k * tile_range_l, flags);
+ pthreadpool_parallelize(
+ threadpool, &thread_parallelize_1d,
+ (void*) compute_4d_tile_2d, &context, range_i * range_j * tile_range_k * tile_range_l, flags);
}
}
@@ -1094,9 +1131,9 @@ void pthreadpool_parallelize_5d_tile_2d(
.tile_l = tile_l,
.tile_m = tile_m,
};
- pthreadpool_parallelize_1d(threadpool,
- (pthreadpool_task_1d_t) compute_5d_tile_2d, &context,
- range_i * range_j * range_k * tile_range_l * tile_range_m, flags);
+ pthreadpool_parallelize(
+ threadpool, &thread_parallelize_1d,
+ (void*) compute_5d_tile_2d, &context, range_i * range_j * range_k * tile_range_l * tile_range_m, flags);
}
}
@@ -1194,9 +1231,9 @@ void pthreadpool_parallelize_6d_tile_2d(
.tile_m = tile_m,
.tile_n = tile_n,
};
- pthreadpool_parallelize_1d(threadpool,
- (pthreadpool_task_1d_t) compute_6d_tile_2d, &context,
- range_i * range_j * range_k * range_l * tile_range_m * tile_range_n, flags);
+ pthreadpool_parallelize(
+ threadpool, &thread_parallelize_1d,
+ (void*) compute_6d_tile_2d, &context, range_i * range_j * range_k * range_l * tile_range_m * tile_range_n, flags);
}
}
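For context, none of this changes the public API; a minimal caller of the 1D entry point still looks like:

    #include <pthreadpool.h>

    static void square(void* context, size_t i) {
        double* data = (double*) context;
        data[i] *= data[i];
    }

    int main(void) {
        double data[1000];
        for (size_t i = 0; i < 1000; i++)
            data[i] = (double) i;
        pthreadpool_t threadpool = pthreadpool_create(0 /* 0 = one thread per core */);
        /* Runs square(data, i) for every i in [0, 1000), spread across the pool */
        pthreadpool_parallelize_1d(threadpool, square, data, 1000, 0 /* flags */);
        pthreadpool_destroy(threadpool);
        return 0;
    }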