diff options
author | Marat Dukhan <maratek@google.com> | 2020-03-23 01:35:09 -0700 |
---|---|---|
committer | Marat Dukhan <maratek@google.com> | 2020-03-23 01:35:09 -0700 |
commit | cb207d8b505e495d578bf6b9fe538b7f291a33a2 (patch) | |
tree | 5e110b296f6e85ffe2b4b8524769f65000bb687e /src | |
parent | 0f57821e68a56b4ba78fd7f4c7d1a494286497aa (diff) | |
download | pthreadpool-cb207d8b505e495d578bf6b9fe538b7f291a33a2.tar.gz |
Support WebAssembly+Threads build
- Abstract away atomic operations and data type from the source file
- Polyfill atomic operations for Clang targeting WAsm+Threads
- Set Emscripten link options for WebAssembly+Threads builds
Diffstat (limited to 'src')
-rw-r--r-- | src/threadpool-atomics.h | 178 | ||||
-rw-r--r-- | src/threadpool-pthreads.c | 125 |
2 files changed, 240 insertions(+), 63 deletions(-)
diff --git a/src/threadpool-atomics.h b/src/threadpool-atomics.h new file mode 100644 index 0000000..92fcd8d --- /dev/null +++ b/src/threadpool-atomics.h @@ -0,0 +1,178 @@ +#pragma once + +#include <stdbool.h> +#include <stddef.h> +#include <stdint.h> + +#if defined(__wasm__) && defined(__EMSCRIPTEN_PTHREADS__) && defined(__clang__) + /* + * Clang for WebAssembly target lacks stdatomic.h header, + * even though it supports the necessary low-level intrinsics. + * Thus, we implement pthreadpool atomic functions on top of + * low-level Clang-specific interfaces for this target. + */ + + typedef _Atomic(uint32_t) pthreadpool_atomic_uint32_t; + typedef _Atomic(size_t) pthreadpool_atomic_size_t; + typedef _Atomic(void*) pthreadpool_atomic_void_p; + + static inline uint32_t pthreadpool_load_relaxed_uint32_t( + pthreadpool_atomic_uint32_t* address) + { + return __c11_atomic_load(address, __ATOMIC_RELAXED); + } + + static inline size_t pthreadpool_load_relaxed_size_t( + pthreadpool_atomic_size_t* address) + { + return __c11_atomic_load(address, __ATOMIC_RELAXED); + } + + static inline void* pthreadpool_load_relaxed_void_p( + pthreadpool_atomic_void_p* address) + { + return __c11_atomic_load(address, __ATOMIC_RELAXED); + } + + static inline void pthreadpool_store_relaxed_uint32_t( + pthreadpool_atomic_uint32_t* address, + uint32_t value) + { + __c11_atomic_store(address, value, __ATOMIC_RELAXED); + } + + static inline void pthreadpool_store_relaxed_size_t( + pthreadpool_atomic_size_t* address, + size_t value) + { + __c11_atomic_store(address, value, __ATOMIC_RELAXED); + } + + static inline void pthreadpool_store_relaxed_void_p( + pthreadpool_atomic_void_p* address, + void* value) + { + __c11_atomic_store(address, value, __ATOMIC_RELAXED); + } + + static inline void pthreadpool_store_release_uint32_t( + pthreadpool_atomic_uint32_t* address, + uint32_t value) + { + __c11_atomic_store(address, value, __ATOMIC_RELEASE); + } + + static inline void 
pthreadpool_store_release_size_t( + pthreadpool_atomic_size_t* address, + size_t value) + { + __c11_atomic_store(address, value, __ATOMIC_RELEASE); + } + + static inline uint32_t pthreadpool_fetch_sub_relaxed_size_t( + pthreadpool_atomic_size_t* address, + uint32_t decrement) + { + return __c11_atomic_fetch_sub(address, decrement, __ATOMIC_RELAXED); + } + + static inline bool pthreadpool_compare_exchange_weak_relaxed_size_t( + pthreadpool_atomic_size_t* address, + size_t* expected_value, + size_t new_value) + { + return __c11_atomic_compare_exchange_weak( + address, expected_value, new_value, __ATOMIC_RELAXED, __ATOMIC_RELAXED); + } + + static inline void pthreadpool_fence_acquire() { + __c11_atomic_thread_fence(__ATOMIC_ACQUIRE); + } + + static inline void pthreadpool_fence_release() { + __c11_atomic_thread_fence(__ATOMIC_RELEASE); + } +#else + #include <stdatomic.h> + + typedef _Atomic(uint32_t) pthreadpool_atomic_uint32_t; + typedef _Atomic(size_t) pthreadpool_atomic_size_t; + typedef _Atomic(void*) pthreadpool_atomic_void_p; + + static inline uint32_t pthreadpool_load_relaxed_uint32_t( + pthreadpool_atomic_uint32_t* address) + { + return atomic_load_explicit(address, memory_order_relaxed); + } + + static inline size_t pthreadpool_load_relaxed_size_t( + pthreadpool_atomic_size_t* address) + { + return atomic_load_explicit(address, memory_order_relaxed); + } + + static inline void* pthreadpool_load_relaxed_void_p( + pthreadpool_atomic_void_p* address) + { + return atomic_load_explicit(address, memory_order_relaxed); + } + + static inline void pthreadpool_store_relaxed_uint32_t( + pthreadpool_atomic_uint32_t* address, + uint32_t value) + { + atomic_store_explicit(address, value, memory_order_relaxed); + } + + static inline void pthreadpool_store_relaxed_size_t( + pthreadpool_atomic_size_t* address, + size_t value) + { + atomic_store_explicit(address, value, memory_order_relaxed); + } + + static inline void pthreadpool_store_relaxed_void_p( + 
pthreadpool_atomic_void_p* address, + void* value) + { + atomic_store_explicit(address, value, memory_order_relaxed); + } + + static inline void pthreadpool_store_release_uint32_t( + pthreadpool_atomic_uint32_t* address, + uint32_t value) + { + atomic_store_explicit(address, value, memory_order_release); + } + + static inline void pthreadpool_store_release_size_t( + pthreadpool_atomic_size_t* address, + size_t value) + { + atomic_store_explicit(address, value, memory_order_release); + } + + static inline uint32_t pthreadpool_fetch_sub_relaxed_size_t( + pthreadpool_atomic_size_t* address, + uint32_t decrement) + { + return atomic_fetch_sub_explicit(address, decrement, memory_order_relaxed); + } + + static inline bool pthreadpool_compare_exchange_weak_relaxed_size_t( + pthreadpool_atomic_size_t* address, + size_t* expected_value, + size_t new_value) + { + return atomic_compare_exchange_weak_explicit( + address, expected_value, new_value, memory_order_relaxed, memory_order_relaxed); + } + + static inline void pthreadpool_fence_acquire() { + atomic_thread_fence(memory_order_acquire); + } + + static inline void pthreadpool_fence_release() { + atomic_thread_fence(memory_order_release); + } +#endif diff --git a/src/threadpool-pthreads.c b/src/threadpool-pthreads.c index 96aa72a..7cc190d 100644 --- a/src/threadpool-pthreads.c +++ b/src/threadpool-pthreads.c @@ -1,5 +1,4 @@ /* Standard C headers */ -#include <stdatomic.h> #include <stdbool.h> #include <stdint.h> #include <stdlib.h> @@ -42,6 +41,7 @@ /* Internal headers */ #include "threadpool-utils.h" +#include "threadpool-atomics.h" /* Number of iterations in spin-wait loop before going into futex/mutex wait */ #define PTHREADPOOL_SPIN_WAIT_ITERATIONS 1000000 @@ -86,11 +86,11 @@ static inline size_t min(size_t a, size_t b) { #if PTHREADPOOL_USE_FUTEX #if defined(__linux__) - static int futex_wait(_Atomic uint32_t* address, uint32_t value) { + static int futex_wait(pthreadpool_atomic_uint32_t* address, uint32_t value) { 
return syscall(SYS_futex, address, FUTEX_WAIT | FUTEX_PRIVATE_FLAG, value, NULL); } - static int futex_wake_all(_Atomic uint32_t* address) { + static int futex_wake_all(pthreadpool_atomic_uint32_t* address) { return syscall(SYS_futex, address, FUTEX_WAKE | FUTEX_PRIVATE_FLAG, INT_MAX); } #else @@ -111,19 +111,19 @@ struct PTHREADPOOL_CACHELINE_ALIGNED thread_info { * Index of the first element in the work range. * Before processing a new element the owning worker thread increments this value. */ - atomic_size_t range_start; + pthreadpool_atomic_size_t range_start; /** * Index of the element after the last element of the work range. * Before processing a new element the stealing worker thread decrements this value. */ - atomic_size_t range_end; + pthreadpool_atomic_size_t range_end; /** * The number of elements in the work range. * Due to race conditions range_length <= range_end - range_start. * The owning worker thread must decrement this value before incrementing @a range_start. * The stealing worker thread must decrement this value before decrementing @a range_end. */ - atomic_size_t range_length; + pthreadpool_atomic_size_t range_length; /** * Thread number in the 0..threads_count-1 range. */ @@ -145,7 +145,7 @@ struct PTHREADPOOL_CACHELINE_ALIGNED pthreadpool { /** * The number of threads that are processing an operation. */ - atomic_size_t active_threads; + pthreadpool_atomic_size_t active_threads; #if PTHREADPOOL_USE_FUTEX /** * Indicates if there are active threads. @@ -153,24 +153,24 @@ struct PTHREADPOOL_CACHELINE_ALIGNED pthreadpool { * - has_active_threads == 0 if active_threads == 0 * - has_active_threads == 1 if active_threads != 0 */ - _Atomic uint32_t has_active_threads; + pthreadpool_atomic_uint32_t has_active_threads; #endif /** * The last command submitted to the thread pool. */ - _Atomic uint32_t command; + pthreadpool_atomic_uint32_t command; /** * The function to call for each item. 
*/ - void *_Atomic task; + pthreadpool_atomic_void_p task; /** * The first argument to the item processing function. */ - void *_Atomic argument; + pthreadpool_atomic_void_p argument; /** * Copy of the flags passed to parallelization function. */ - _Atomic uint32_t flags; + pthreadpool_atomic_uint32_t flags; /** * Serializes concurrent calls to @a pthreadpool_parallelize_* from different threads. */ @@ -207,13 +207,13 @@ PTHREADPOOL_STATIC_ASSERT(sizeof(struct pthreadpool) % PTHREADPOOL_CACHELINE_SIZ static void checkin_worker_thread(struct pthreadpool* threadpool) { #if PTHREADPOOL_USE_FUTEX - if (atomic_fetch_sub_explicit(&threadpool->active_threads, 1, memory_order_relaxed) == 1) { - atomic_store_explicit(&threadpool->has_active_threads, 0, memory_order_relaxed); + if (pthreadpool_fetch_sub_relaxed_size_t(&threadpool->active_threads, 1) == 1) { + pthreadpool_store_relaxed_uint32_t(&threadpool->has_active_threads, 0); futex_wake_all(&threadpool->has_active_threads); } #else pthread_mutex_lock(&threadpool->completion_mutex); - if (atomic_fetch_sub_explicit(&threadpool->active_threads, 1, memory_order_relaxed) == 1) { + if (pthreadpool_fetch_sub_relaxed_size_t(&threadpool->active_threads, 1) == 1) { pthread_cond_signal(&threadpool->completion_condvar); } pthread_mutex_unlock(&threadpool->completion_mutex); @@ -223,12 +223,12 @@ static void checkin_worker_thread(struct pthreadpool* threadpool) { static void wait_worker_threads(struct pthreadpool* threadpool) { /* Initial check */ #if PTHREADPOOL_USE_FUTEX - uint32_t has_active_threads = atomic_load_explicit(&threadpool->has_active_threads, memory_order_relaxed); + uint32_t has_active_threads = pthreadpool_load_relaxed_uint32_t(&threadpool->has_active_threads); if (has_active_threads == 0) { return; } #else - size_t active_threads = atomic_load_explicit(&threadpool->active_threads, memory_order_relaxed); + size_t active_threads = pthreadpool_load_relaxed_size_t(&threadpool->active_threads); if (active_threads == 0) { 
return; } @@ -237,15 +237,15 @@ static void wait_worker_threads(struct pthreadpool* threadpool) { /* Spin-wait */ for (uint32_t i = PTHREADPOOL_SPIN_WAIT_ITERATIONS; i != 0; i--) { /* This fence serves as a sleep instruction */ - atomic_thread_fence(memory_order_acquire); + pthreadpool_fence_acquire(); #if PTHREADPOOL_USE_FUTEX - has_active_threads = atomic_load_explicit(&threadpool->has_active_threads, memory_order_relaxed); + has_active_threads = pthreadpool_load_relaxed_uint32_t(&threadpool->has_active_threads); if (has_active_threads == 0) { return; } #else - active_threads = atomic_load_explicit(&threadpool->active_threads, memory_order_relaxed); + active_threads = pthreadpool_load_relaxed_size_t(&threadpool->active_threads); if (active_threads == 0) { return; } @@ -254,26 +254,24 @@ static void wait_worker_threads(struct pthreadpool* threadpool) { /* Fall-back to mutex/futex wait */ #if PTHREADPOOL_USE_FUTEX - while ((has_active_threads = atomic_load(&threadpool->has_active_threads)) != 0) { + while ((has_active_threads = pthreadpool_load_relaxed_uint32_t(&threadpool->has_active_threads)) != 0) { futex_wait(&threadpool->has_active_threads, 1); } #else pthread_mutex_lock(&threadpool->completion_mutex); - while (atomic_load_explicit(&threadpool->active_threads, memory_order_relaxed) != 0) { + while (pthreadpool_load_relaxed_size_t(&threadpool->active_threads) != 0) { pthread_cond_wait(&threadpool->completion_condvar, &threadpool->completion_mutex); }; pthread_mutex_unlock(&threadpool->completion_mutex); #endif } -inline static bool atomic_decrement(atomic_size_t* value) { - size_t actual_value = atomic_load_explicit(value, memory_order_relaxed); +inline static bool atomic_decrement(pthreadpool_atomic_size_t* value) { + size_t actual_value = pthreadpool_load_relaxed_size_t(value); if (actual_value == 0) { return false; } - while (!atomic_compare_exchange_weak_explicit( - value, &actual_value, actual_value - 1, memory_order_relaxed, memory_order_relaxed)) - { + 
while (!pthreadpool_compare_exchange_weak_relaxed_size_t(value, &actual_value, actual_value - 1)) { if (actual_value == 0) { return false; } @@ -291,10 +289,10 @@ inline static size_t modulo_decrement(uint32_t i, uint32_t n) { } static void thread_parallelize_1d(struct pthreadpool* threadpool, struct thread_info* thread) { - const pthreadpool_task_1d_t task = (pthreadpool_task_1d_t) atomic_load_explicit(&threadpool->task, memory_order_relaxed); - void *const argument = atomic_load_explicit(&threadpool->argument, memory_order_relaxed); + const pthreadpool_task_1d_t task = (pthreadpool_task_1d_t) pthreadpool_load_relaxed_void_p(&threadpool->task); + void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); /* Process thread's own range of items */ - size_t range_start = atomic_load_explicit(&thread->range_start, memory_order_relaxed); + size_t range_start = pthreadpool_load_relaxed_size_t(&thread->range_start); while (atomic_decrement(&thread->range_length)) { task(argument, range_start++); } @@ -308,13 +306,13 @@ static void thread_parallelize_1d(struct pthreadpool* threadpool, struct thread_ { struct thread_info* other_thread = &threadpool->threads[tid]; while (atomic_decrement(&other_thread->range_length)) { - const size_t item_id = atomic_fetch_sub_explicit(&other_thread->range_end, 1, memory_order_relaxed) - 1; + const size_t item_id = pthreadpool_fetch_sub_relaxed_size_t(&other_thread->range_end, 1) - 1; task(argument, item_id); } } /* Make changes by this thread visible to other threads */ - atomic_thread_fence(memory_order_release); + pthreadpool_fence_release(); } static uint32_t wait_for_new_command( @@ -322,7 +320,7 @@ static uint32_t wait_for_new_command( uint32_t last_command, uint32_t last_flags) { - uint32_t command = atomic_load_explicit(&threadpool->command, memory_order_relaxed); + uint32_t command = pthreadpool_load_relaxed_uint32_t(&threadpool->command); if (command != last_command) { return command; } @@ -331,9 +329,9 @@ 
static uint32_t wait_for_new_command( /* Spin-wait loop */ for (uint32_t i = PTHREADPOOL_SPIN_WAIT_ITERATIONS; i != 0; i--) { /* This fence serves as a sleep instruction */ - atomic_thread_fence(memory_order_acquire); + pthreadpool_fence_acquire(); - command = atomic_load_explicit(&threadpool->command, memory_order_relaxed); + command = pthreadpool_load_relaxed_uint32_t(&threadpool->command); if (command != last_command) { return command; } @@ -344,13 +342,13 @@ static uint32_t wait_for_new_command( #if PTHREADPOOL_USE_FUTEX do { futex_wait(&threadpool->command, last_command); - command = atomic_load_explicit(&threadpool->command, memory_order_relaxed); + command = pthreadpool_load_relaxed_uint32_t(&threadpool->command); } while (command == last_command); #else /* Lock the command mutex */ pthread_mutex_lock(&threadpool->command_mutex); /* Read the command */ - while ((command = atomic_load_explicit(&threadpool->command, memory_order_relaxed)) == last_command) { + while ((command = pthreadpool_load_relaxed_uint32_t(&threadpool->command)) == last_command) { /* Wait for new command */ pthread_cond_wait(&threadpool->command_condvar, &threadpool->command_mutex); } @@ -373,9 +371,9 @@ static void* thread_main(void* arg) { /* Monitor new commands and act accordingly */ for (;;) { uint32_t command = wait_for_new_command(threadpool, last_command, flags); - atomic_thread_fence(memory_order_acquire); + pthreadpool_fence_acquire(); - flags = atomic_load_explicit(&threadpool->flags, memory_order_relaxed); + flags = pthreadpool_load_relaxed_uint32_t(&threadpool->flags); /* Process command */ switch (command & THREADPOOL_COMMAND_MASK) { @@ -435,6 +433,12 @@ struct pthreadpool* pthreadpool_create(size_t threads_count) { if (threads_count == 0) { #if defined(_SC_NPROCESSORS_ONLN) threads_count = (size_t) sysconf(_SC_NPROCESSORS_ONLN); + #if defined(__EMSCRIPTEN_PTHREADS__) + /* Limit the number of threads to 8 to match link-time PTHREAD_POOL_SIZE option */ + if (threads_count >= 
8) { + threads_count = 8; + } + #endif #elif defined(_WIN32) SYSTEM_INFO system_info; ZeroMemory(&system_info, sizeof(system_info)); @@ -465,10 +469,9 @@ struct pthreadpool* pthreadpool_create(size_t threads_count) { #endif #if PTHREADPOOL_USE_FUTEX - atomic_store_explicit(&threadpool->has_active_threads, 1, memory_order_relaxed); + pthreadpool_store_relaxed_uint32_t(&threadpool->has_active_threads, 1); #endif - atomic_store_explicit( - &threadpool->active_threads, threadpool->threads_count - 1 /* caller thread */, memory_order_release); + pthreadpool_store_release_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */); /* Caller thread serves as worker #0. Thus, we create system threads starting with worker #1. */ for (size_t tid = 1; tid < threads_count; tid++) { @@ -519,16 +522,15 @@ void pthreadpool_parallelize_1d( #endif /* Setup global arguments */ - atomic_store_explicit(&threadpool->task, task, memory_order_relaxed); - atomic_store_explicit(&threadpool->argument, argument, memory_order_relaxed); - atomic_store_explicit(&threadpool->flags, flags, memory_order_relaxed); + pthreadpool_store_relaxed_void_p(&threadpool->task, task); + pthreadpool_store_relaxed_void_p(&threadpool->argument, argument); + pthreadpool_store_relaxed_uint32_t(&threadpool->flags, flags); /* Locking of completion_mutex not needed: readers are sleeping on command_condvar */ const size_t threads_count = threadpool->threads_count; - atomic_store_explicit( - &threadpool->active_threads, threads_count - 1 /* caller thread */, memory_order_relaxed); + pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */); #if PTHREADPOOL_USE_FUTEX - atomic_store_explicit(&threadpool->has_active_threads, 1, memory_order_relaxed); + pthreadpool_store_relaxed_uint32_t(&threadpool->has_active_threads, 1); #endif /* Spread the work between threads */ @@ -536,9 +538,9 @@ void pthreadpool_parallelize_1d( for (size_t tid = 0; tid < threads_count; 
tid++) { struct thread_info* thread = &threadpool->threads[tid]; const size_t range_end = multiply_divide(range, tid + 1, threads_count); - atomic_store_explicit(&thread->range_start, range_start, memory_order_relaxed); - atomic_store_explicit(&thread->range_end, range_end, memory_order_relaxed); - atomic_store_explicit(&thread->range_length, range_end - range_start, memory_order_relaxed); + pthreadpool_store_relaxed_size_t(&thread->range_start, range_start); + pthreadpool_store_relaxed_size_t(&thread->range_end, range_end); + pthreadpool_store_relaxed_size_t(&thread->range_length, range_end - range_start); /* The next range starts where the previous ended */ range_start = range_end; @@ -551,7 +553,7 @@ void pthreadpool_parallelize_1d( * to ensure the unmasked command is different then the last command, because worker threads * monitor for change in the unmasked command. */ - const uint32_t old_command = atomic_load_explicit(&threadpool->command, memory_order_relaxed); + const uint32_t old_command = pthreadpool_load_relaxed_uint32_t(&threadpool->command); const uint32_t new_command = ~(old_command | THREADPOOL_COMMAND_MASK) | threadpool_command_compute_1d; #if PTHREADPOOL_USE_FUTEX @@ -559,13 +561,13 @@ void pthreadpool_parallelize_1d( * Store the command with release semantics to guarantee that if a worker thread observes * the new command value, it also observes the updated command parameters. 
*/ - atomic_store_explicit(&threadpool->command, new_command, memory_order_release); + pthreadpool_store_release_uint32_t(&threadpool->command, new_command); /* Wake up the threads */ futex_wake_all(&threadpool->command); #else /* Relaxed semantics because pthread_mutex_unlock acts as a release fence */ - atomic_store_explicit(&threadpool->command, new_command, memory_order_relaxed); + pthreadpool_store_relaxed_uint32_t(&threadpool->command, new_command); /* Unlock the command variables before waking up the threads for better performance */ pthread_mutex_unlock(&threadpool->command_mutex); @@ -593,7 +595,7 @@ void pthreadpool_parallelize_1d( wait_worker_threads(threadpool); /* Make changes by other threads visible to this thread */ - atomic_thread_fence(memory_order_acquire); + pthreadpool_fence_acquire(); /* Unprotect the global threadpool structures */ pthread_mutex_unlock(&threadpool->execution_mutex); @@ -1169,11 +1171,10 @@ void pthreadpool_destroy(struct pthreadpool* threadpool) { if (threadpool != NULL) { if (threadpool->threads_count > 1) { #if PTHREADPOOL_USE_FUTEX - atomic_store_explicit( - &threadpool->active_threads, threadpool->threads_count - 1 /* caller thread */, memory_order_relaxed); - atomic_store_explicit(&threadpool->has_active_threads, 1, memory_order_relaxed); + pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threadpool->threads_count - 1 /* caller thread */); + pthreadpool_store_relaxed_uint32_t(&threadpool->has_active_threads, 1); - atomic_store_explicit(&threadpool->command, threadpool_command_shutdown, memory_order_release); + pthreadpool_store_release_uint32_t(&threadpool->command, threadpool_command_shutdown); /* Wake up worker threads */ futex_wake_all(&threadpool->command); @@ -1181,12 +1182,10 @@ void pthreadpool_destroy(struct pthreadpool* threadpool) { /* Lock the command variable to ensure that threads don't shutdown until both command and active_threads are updated */ pthread_mutex_lock(&threadpool->command_mutex); 
- /* Locking of completion_mutex not needed: readers are sleeping on command_condvar */ - atomic_store_explicit( - &threadpool->active_threads, threadpool->threads_count - 1 /* caller thread */, memory_order_relaxed); + pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threadpool->threads_count - 1 /* caller thread */); /* Update the threadpool command. */ - atomic_store_explicit(&threadpool->command, threadpool_command_shutdown, memory_order_relaxed); + pthreadpool_store_relaxed_uint32_t(&threadpool->command, threadpool_command_shutdown); /* Wake up worker threads */ pthread_cond_broadcast(&threadpool->command_condvar); |