author		Marat Dukhan <maratek@google.com>	2020-03-23 01:35:09 -0700
committer	Marat Dukhan <maratek@google.com>	2020-03-23 01:35:09 -0700
commit		cb207d8b505e495d578bf6b9fe538b7f291a33a2 (patch)
tree		5e110b296f6e85ffe2b4b8524769f65000bb687e /src
parent		0f57821e68a56b4ba78fd7f4c7d1a494286497aa (diff)
download	pthreadpool-cb207d8b505e495d578bf6b9fe538b7f291a33a2.tar.gz
Support WebAssembly+Threads build
- Abstract away atomic operations and data type from the source file
- Polyfill atomic operations for Clang targeting WAsm+Threads
- Set Emscripten link options for WebAssembly+Threads builds
Diffstat (limited to 'src')
-rw-r--r--  src/threadpool-atomics.h   178
-rw-r--r--  src/threadpool-pthreads.c  125
2 files changed, 240 insertions(+), 63 deletions(-)
diff --git a/src/threadpool-atomics.h b/src/threadpool-atomics.h
new file mode 100644
index 0000000..92fcd8d
--- /dev/null
+++ b/src/threadpool-atomics.h
@@ -0,0 +1,178 @@
+#pragma once
+
+#include <stdbool.h>
+#include <stddef.h>
+#include <stdint.h>
+
+#if defined(__wasm__) && defined(__EMSCRIPTEN_PTHREADS__) && defined(__clang__)
+ /*
+ * Clang for WebAssembly target lacks stdatomic.h header,
+ * even though it supports the necessary low-level intrinsics.
+ * Thus, we implement pthreadpool atomic functions on top of
+ * low-level Clang-specific interfaces for this target.
+ */
+
+ typedef _Atomic(uint32_t) pthreadpool_atomic_uint32_t;
+ typedef _Atomic(size_t) pthreadpool_atomic_size_t;
+ typedef _Atomic(void*) pthreadpool_atomic_void_p;
+
+ static inline uint32_t pthreadpool_load_relaxed_uint32_t(
+ pthreadpool_atomic_uint32_t* address)
+ {
+ return __c11_atomic_load(address, __ATOMIC_RELAXED);
+ }
+
+ static inline size_t pthreadpool_load_relaxed_size_t(
+ pthreadpool_atomic_size_t* address)
+ {
+ return __c11_atomic_load(address, __ATOMIC_RELAXED);
+ }
+
+ static inline void* pthreadpool_load_relaxed_void_p(
+ pthreadpool_atomic_void_p* address)
+ {
+ return __c11_atomic_load(address, __ATOMIC_RELAXED);
+ }
+
+ static inline void pthreadpool_store_relaxed_uint32_t(
+ pthreadpool_atomic_uint32_t* address,
+ uint32_t value)
+ {
+ __c11_atomic_store(address, value, __ATOMIC_RELAXED);
+ }
+
+ static inline void pthreadpool_store_relaxed_size_t(
+ pthreadpool_atomic_size_t* address,
+ size_t value)
+ {
+ __c11_atomic_store(address, value, __ATOMIC_RELAXED);
+ }
+
+ static inline void pthreadpool_store_relaxed_void_p(
+ pthreadpool_atomic_void_p* address,
+ void* value)
+ {
+ __c11_atomic_store(address, value, __ATOMIC_RELAXED);
+ }
+
+ static inline void pthreadpool_store_release_uint32_t(
+ pthreadpool_atomic_uint32_t* address,
+ uint32_t value)
+ {
+ __c11_atomic_store(address, value, __ATOMIC_RELEASE);
+ }
+
+ static inline void pthreadpool_store_release_size_t(
+ pthreadpool_atomic_size_t* address,
+ size_t value)
+ {
+ __c11_atomic_store(address, value, __ATOMIC_RELEASE);
+ }
+
+ static inline size_t pthreadpool_fetch_sub_relaxed_size_t(
+ pthreadpool_atomic_size_t* address,
+ size_t decrement)
+ {
+ return __c11_atomic_fetch_sub(address, decrement, __ATOMIC_RELAXED);
+ }
+
+ static inline bool pthreadpool_compare_exchange_weak_relaxed_size_t(
+ pthreadpool_atomic_size_t* address,
+ size_t* expected_value,
+ size_t new_value)
+ {
+ return __c11_atomic_compare_exchange_weak(
+ address, expected_value, new_value, __ATOMIC_RELAXED, __ATOMIC_RELAXED);
+ }
+
+ static inline void pthreadpool_fence_acquire() {
+ __c11_atomic_thread_fence(__ATOMIC_ACQUIRE);
+ }
+
+ static inline void pthreadpool_fence_release() {
+ __c11_atomic_thread_fence(__ATOMIC_RELEASE);
+ }
+#else
+ #include <stdatomic.h>
+
+ typedef _Atomic(uint32_t) pthreadpool_atomic_uint32_t;
+ typedef _Atomic(size_t) pthreadpool_atomic_size_t;
+ typedef _Atomic(void*) pthreadpool_atomic_void_p;
+
+ static inline uint32_t pthreadpool_load_relaxed_uint32_t(
+ pthreadpool_atomic_uint32_t* address)
+ {
+ return atomic_load_explicit(address, memory_order_relaxed);
+ }
+
+ static inline size_t pthreadpool_load_relaxed_size_t(
+ pthreadpool_atomic_size_t* address)
+ {
+ return atomic_load_explicit(address, memory_order_relaxed);
+ }
+
+ static inline void* pthreadpool_load_relaxed_void_p(
+ pthreadpool_atomic_void_p* address)
+ {
+ return atomic_load_explicit(address, memory_order_relaxed);
+ }
+
+ static inline void pthreadpool_store_relaxed_uint32_t(
+ pthreadpool_atomic_uint32_t* address,
+ uint32_t value)
+ {
+ atomic_store_explicit(address, value, memory_order_relaxed);
+ }
+
+ static inline void pthreadpool_store_relaxed_size_t(
+ pthreadpool_atomic_size_t* address,
+ size_t value)
+ {
+ atomic_store_explicit(address, value, memory_order_relaxed);
+ }
+
+ static inline void pthreadpool_store_relaxed_void_p(
+ pthreadpool_atomic_void_p* address,
+ void* value)
+ {
+ atomic_store_explicit(address, value, memory_order_relaxed);
+ }
+
+ static inline void pthreadpool_store_release_uint32_t(
+ pthreadpool_atomic_uint32_t* address,
+ uint32_t value)
+ {
+ atomic_store_explicit(address, value, memory_order_release);
+ }
+
+ static inline void pthreadpool_store_release_size_t(
+ pthreadpool_atomic_size_t* address,
+ size_t value)
+ {
+ atomic_store_explicit(address, value, memory_order_release);
+ }
+
+ static inline size_t pthreadpool_fetch_sub_relaxed_size_t(
+ pthreadpool_atomic_size_t* address,
+ size_t decrement)
+ {
+ return atomic_fetch_sub_explicit(address, decrement, memory_order_relaxed);
+ }
+
+ static inline bool pthreadpool_compare_exchange_weak_relaxed_size_t(
+ pthreadpool_atomic_size_t* address,
+ size_t* expected_value,
+ size_t new_value)
+ {
+ return atomic_compare_exchange_weak_explicit(
+ address, expected_value, new_value, memory_order_relaxed, memory_order_relaxed);
+ }
+
+ static inline void pthreadpool_fence_acquire() {
+ atomic_thread_fence(memory_order_acquire);
+ }
+
+ static inline void pthreadpool_fence_release() {
+ atomic_thread_fence(memory_order_release);
+ }
+#endif
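A brief usage note, not part of the commit: the functions above present a single API to callers no matter which preprocessor branch is compiled. As a minimal sketch, assuming it is built inside this source tree with threadpool-atomics.h on the include path, a publish/poll pair on top of the new abstraction could look like the following (the function names are illustrative, not library API):

#include <stdint.h>
#include <stdio.h>

#include "threadpool-atomics.h"

static pthreadpool_atomic_uint32_t command;

/* Publisher: release store so a reader observing the new value also observes earlier writes. */
static void publish_command(uint32_t new_command) {
	pthreadpool_store_release_uint32_t(&command, new_command);
}

/* Consumer: relaxed polling loop, followed by an acquire fence before touching shared parameters. */
static uint32_t poll_command(uint32_t last_command) {
	uint32_t current;
	while ((current = pthreadpool_load_relaxed_uint32_t(&command)) == last_command) {
		/* spin */
	}
	pthreadpool_fence_acquire();
	return current;
}

int main(void) {
	publish_command(1);
	printf("observed command: %u\n", poll_command(0));
	return 0;
}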
diff --git a/src/threadpool-pthreads.c b/src/threadpool-pthreads.c
index 96aa72a..7cc190d 100644
--- a/src/threadpool-pthreads.c
+++ b/src/threadpool-pthreads.c
@@ -1,5 +1,4 @@
/* Standard C headers */
-#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>
@@ -42,6 +41,7 @@
/* Internal headers */
#include "threadpool-utils.h"
+#include "threadpool-atomics.h"
/* Number of iterations in spin-wait loop before going into futex/mutex wait */
#define PTHREADPOOL_SPIN_WAIT_ITERATIONS 1000000
@@ -86,11 +86,11 @@ static inline size_t min(size_t a, size_t b) {
#if PTHREADPOOL_USE_FUTEX
#if defined(__linux__)
- static int futex_wait(_Atomic uint32_t* address, uint32_t value) {
+ static int futex_wait(pthreadpool_atomic_uint32_t* address, uint32_t value) {
return syscall(SYS_futex, address, FUTEX_WAIT | FUTEX_PRIVATE_FLAG, value, NULL);
}
- static int futex_wake_all(_Atomic uint32_t* address) {
+ static int futex_wake_all(pthreadpool_atomic_uint32_t* address) {
return syscall(SYS_futex, address, FUTEX_WAKE | FUTEX_PRIVATE_FLAG, INT_MAX);
}
#else
@@ -111,19 +111,19 @@ struct PTHREADPOOL_CACHELINE_ALIGNED thread_info {
* Index of the first element in the work range.
* Before processing a new element the owning worker thread increments this value.
*/
- atomic_size_t range_start;
+ pthreadpool_atomic_size_t range_start;
/**
* Index of the element after the last element of the work range.
* Before processing a new element the stealing worker thread decrements this value.
*/
- atomic_size_t range_end;
+ pthreadpool_atomic_size_t range_end;
/**
* The number of elements in the work range.
* Due to race conditions range_length <= range_end - range_start.
* The owning worker thread must decrement this value before incrementing @a range_start.
* The stealing worker thread must decrement this value before decrementing @a range_end.
*/
- atomic_size_t range_length;
+ pthreadpool_atomic_size_t range_length;
/**
* Thread number in the 0..threads_count-1 range.
*/
@@ -145,7 +145,7 @@ struct PTHREADPOOL_CACHELINE_ALIGNED pthreadpool {
/**
* The number of threads that are processing an operation.
*/
- atomic_size_t active_threads;
+ pthreadpool_atomic_size_t active_threads;
#if PTHREADPOOL_USE_FUTEX
/**
* Indicates if there are active threads.
@@ -153,24 +153,24 @@ struct PTHREADPOOL_CACHELINE_ALIGNED pthreadpool {
* - has_active_threads == 0 if active_threads == 0
* - has_active_threads == 1 if active_threads != 0
*/
- _Atomic uint32_t has_active_threads;
+ pthreadpool_atomic_uint32_t has_active_threads;
#endif
/**
* The last command submitted to the thread pool.
*/
- _Atomic uint32_t command;
+ pthreadpool_atomic_uint32_t command;
/**
* The function to call for each item.
*/
- void *_Atomic task;
+ pthreadpool_atomic_void_p task;
/**
* The first argument to the item processing function.
*/
- void *_Atomic argument;
+ pthreadpool_atomic_void_p argument;
/**
* Copy of the flags passed to parallelization function.
*/
- _Atomic uint32_t flags;
+ pthreadpool_atomic_uint32_t flags;
/**
* Serializes concurrent calls to @a pthreadpool_parallelize_* from different threads.
*/
@@ -207,13 +207,13 @@ PTHREADPOOL_STATIC_ASSERT(sizeof(struct pthreadpool) % PTHREADPOOL_CACHELINE_SIZ
static void checkin_worker_thread(struct pthreadpool* threadpool) {
#if PTHREADPOOL_USE_FUTEX
- if (atomic_fetch_sub_explicit(&threadpool->active_threads, 1, memory_order_relaxed) == 1) {
- atomic_store_explicit(&threadpool->has_active_threads, 0, memory_order_relaxed);
+ if (pthreadpool_fetch_sub_relaxed_size_t(&threadpool->active_threads, 1) == 1) {
+ pthreadpool_store_relaxed_uint32_t(&threadpool->has_active_threads, 0);
futex_wake_all(&threadpool->has_active_threads);
}
#else
pthread_mutex_lock(&threadpool->completion_mutex);
- if (atomic_fetch_sub_explicit(&threadpool->active_threads, 1, memory_order_relaxed) == 1) {
+ if (pthreadpool_fetch_sub_relaxed_size_t(&threadpool->active_threads, 1) == 1) {
pthread_cond_signal(&threadpool->completion_condvar);
}
pthread_mutex_unlock(&threadpool->completion_mutex);
@@ -223,12 +223,12 @@ static void checkin_worker_thread(struct pthreadpool* threadpool) {
static void wait_worker_threads(struct pthreadpool* threadpool) {
/* Initial check */
#if PTHREADPOOL_USE_FUTEX
- uint32_t has_active_threads = atomic_load_explicit(&threadpool->has_active_threads, memory_order_relaxed);
+ uint32_t has_active_threads = pthreadpool_load_relaxed_uint32_t(&threadpool->has_active_threads);
if (has_active_threads == 0) {
return;
}
#else
- size_t active_threads = atomic_load_explicit(&threadpool->active_threads, memory_order_relaxed);
+ size_t active_threads = pthreadpool_load_relaxed_size_t(&threadpool->active_threads);
if (active_threads == 0) {
return;
}
@@ -237,15 +237,15 @@ static void wait_worker_threads(struct pthreadpool* threadpool) {
/* Spin-wait */
for (uint32_t i = PTHREADPOOL_SPIN_WAIT_ITERATIONS; i != 0; i--) {
/* This fence serves as a sleep instruction */
- atomic_thread_fence(memory_order_acquire);
+ pthreadpool_fence_acquire();
#if PTHREADPOOL_USE_FUTEX
- has_active_threads = atomic_load_explicit(&threadpool->has_active_threads, memory_order_relaxed);
+ has_active_threads = pthreadpool_load_relaxed_uint32_t(&threadpool->has_active_threads);
if (has_active_threads == 0) {
return;
}
#else
- active_threads = atomic_load_explicit(&threadpool->active_threads, memory_order_relaxed);
+ active_threads = pthreadpool_load_relaxed_size_t(&threadpool->active_threads);
if (active_threads == 0) {
return;
}
@@ -254,26 +254,24 @@ static void wait_worker_threads(struct pthreadpool* threadpool) {
/* Fall-back to mutex/futex wait */
#if PTHREADPOOL_USE_FUTEX
- while ((has_active_threads = atomic_load(&threadpool->has_active_threads)) != 0) {
+ while ((has_active_threads = pthreadpool_load_relaxed_uint32_t(&threadpool->has_active_threads)) != 0) {
futex_wait(&threadpool->has_active_threads, 1);
}
#else
pthread_mutex_lock(&threadpool->completion_mutex);
- while (atomic_load_explicit(&threadpool->active_threads, memory_order_relaxed) != 0) {
+ while (pthreadpool_load_relaxed_size_t(&threadpool->active_threads) != 0) {
pthread_cond_wait(&threadpool->completion_condvar, &threadpool->completion_mutex);
};
pthread_mutex_unlock(&threadpool->completion_mutex);
#endif
}
-inline static bool atomic_decrement(atomic_size_t* value) {
- size_t actual_value = atomic_load_explicit(value, memory_order_relaxed);
+inline static bool atomic_decrement(pthreadpool_atomic_size_t* value) {
+ size_t actual_value = pthreadpool_load_relaxed_size_t(value);
if (actual_value == 0) {
return false;
}
- while (!atomic_compare_exchange_weak_explicit(
- value, &actual_value, actual_value - 1, memory_order_relaxed, memory_order_relaxed))
- {
+ while (!pthreadpool_compare_exchange_weak_relaxed_size_t(value, &actual_value, actual_value - 1)) {
if (actual_value == 0) {
return false;
}
@@ -291,10 +289,10 @@ inline static size_t modulo_decrement(uint32_t i, uint32_t n) {
}
static void thread_parallelize_1d(struct pthreadpool* threadpool, struct thread_info* thread) {
- const pthreadpool_task_1d_t task = (pthreadpool_task_1d_t) atomic_load_explicit(&threadpool->task, memory_order_relaxed);
- void *const argument = atomic_load_explicit(&threadpool->argument, memory_order_relaxed);
+ const pthreadpool_task_1d_t task = (pthreadpool_task_1d_t) pthreadpool_load_relaxed_void_p(&threadpool->task);
+ void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument);
/* Process thread's own range of items */
- size_t range_start = atomic_load_explicit(&thread->range_start, memory_order_relaxed);
+ size_t range_start = pthreadpool_load_relaxed_size_t(&thread->range_start);
while (atomic_decrement(&thread->range_length)) {
task(argument, range_start++);
}
@@ -308,13 +306,13 @@ static void thread_parallelize_1d(struct pthreadpool* threadpool, struct thread_
{
struct thread_info* other_thread = &threadpool->threads[tid];
while (atomic_decrement(&other_thread->range_length)) {
- const size_t item_id = atomic_fetch_sub_explicit(&other_thread->range_end, 1, memory_order_relaxed) - 1;
+ const size_t item_id = pthreadpool_fetch_sub_relaxed_size_t(&other_thread->range_end, 1) - 1;
task(argument, item_id);
}
}
/* Make changes by this thread visible to other threads */
- atomic_thread_fence(memory_order_release);
+ pthreadpool_fence_release();
}
static uint32_t wait_for_new_command(
@@ -322,7 +320,7 @@ static uint32_t wait_for_new_command(
uint32_t last_command,
uint32_t last_flags)
{
- uint32_t command = atomic_load_explicit(&threadpool->command, memory_order_relaxed);
+ uint32_t command = pthreadpool_load_relaxed_uint32_t(&threadpool->command);
if (command != last_command) {
return command;
}
@@ -331,9 +329,9 @@ static uint32_t wait_for_new_command(
/* Spin-wait loop */
for (uint32_t i = PTHREADPOOL_SPIN_WAIT_ITERATIONS; i != 0; i--) {
/* This fence serves as a sleep instruction */
- atomic_thread_fence(memory_order_acquire);
+ pthreadpool_fence_acquire();
- command = atomic_load_explicit(&threadpool->command, memory_order_relaxed);
+ command = pthreadpool_load_relaxed_uint32_t(&threadpool->command);
if (command != last_command) {
return command;
}
@@ -344,13 +342,13 @@ static uint32_t wait_for_new_command(
#if PTHREADPOOL_USE_FUTEX
do {
futex_wait(&threadpool->command, last_command);
- command = atomic_load_explicit(&threadpool->command, memory_order_relaxed);
+ command = pthreadpool_load_relaxed_uint32_t(&threadpool->command);
} while (command == last_command);
#else
/* Lock the command mutex */
pthread_mutex_lock(&threadpool->command_mutex);
/* Read the command */
- while ((command = atomic_load_explicit(&threadpool->command, memory_order_relaxed)) == last_command) {
+ while ((command = pthreadpool_load_relaxed_uint32_t(&threadpool->command)) == last_command) {
/* Wait for new command */
pthread_cond_wait(&threadpool->command_condvar, &threadpool->command_mutex);
}
@@ -373,9 +371,9 @@ static void* thread_main(void* arg) {
/* Monitor new commands and act accordingly */
for (;;) {
uint32_t command = wait_for_new_command(threadpool, last_command, flags);
- atomic_thread_fence(memory_order_acquire);
+ pthreadpool_fence_acquire();
- flags = atomic_load_explicit(&threadpool->flags, memory_order_relaxed);
+ flags = pthreadpool_load_relaxed_uint32_t(&threadpool->flags);
/* Process command */
switch (command & THREADPOOL_COMMAND_MASK) {
@@ -435,6 +433,12 @@ struct pthreadpool* pthreadpool_create(size_t threads_count) {
if (threads_count == 0) {
#if defined(_SC_NPROCESSORS_ONLN)
threads_count = (size_t) sysconf(_SC_NPROCESSORS_ONLN);
+ #if defined(__EMSCRIPTEN_PTHREADS__)
+ /* Limit the number of threads to 8 to match the link-time PTHREAD_POOL_SIZE option */
+ if (threads_count >= 8) {
+ threads_count = 8;
+ }
+ #endif
#elif defined(_WIN32)
SYSTEM_INFO system_info;
ZeroMemory(&system_info, sizeof(system_info));
@@ -465,10 +469,9 @@ struct pthreadpool* pthreadpool_create(size_t threads_count) {
#endif
#if PTHREADPOOL_USE_FUTEX
- atomic_store_explicit(&threadpool->has_active_threads, 1, memory_order_relaxed);
+ pthreadpool_store_relaxed_uint32_t(&threadpool->has_active_threads, 1);
#endif
- atomic_store_explicit(
- &threadpool->active_threads, threadpool->threads_count - 1 /* caller thread */, memory_order_release);
+ pthreadpool_store_release_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */);
/* Caller thread serves as worker #0. Thus, we create system threads starting with worker #1. */
for (size_t tid = 1; tid < threads_count; tid++) {
@@ -519,16 +522,15 @@ void pthreadpool_parallelize_1d(
#endif
/* Setup global arguments */
- atomic_store_explicit(&threadpool->task, task, memory_order_relaxed);
- atomic_store_explicit(&threadpool->argument, argument, memory_order_relaxed);
- atomic_store_explicit(&threadpool->flags, flags, memory_order_relaxed);
+ pthreadpool_store_relaxed_void_p(&threadpool->task, task);
+ pthreadpool_store_relaxed_void_p(&threadpool->argument, argument);
+ pthreadpool_store_relaxed_uint32_t(&threadpool->flags, flags);
/* Locking of completion_mutex not needed: readers are sleeping on command_condvar */
const size_t threads_count = threadpool->threads_count;
- atomic_store_explicit(
- &threadpool->active_threads, threads_count - 1 /* caller thread */, memory_order_relaxed);
+ pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */);
#if PTHREADPOOL_USE_FUTEX
- atomic_store_explicit(&threadpool->has_active_threads, 1, memory_order_relaxed);
+ pthreadpool_store_relaxed_uint32_t(&threadpool->has_active_threads, 1);
#endif
/* Spread the work between threads */
@@ -536,9 +538,9 @@ void pthreadpool_parallelize_1d(
for (size_t tid = 0; tid < threads_count; tid++) {
struct thread_info* thread = &threadpool->threads[tid];
const size_t range_end = multiply_divide(range, tid + 1, threads_count);
- atomic_store_explicit(&thread->range_start, range_start, memory_order_relaxed);
- atomic_store_explicit(&thread->range_end, range_end, memory_order_relaxed);
- atomic_store_explicit(&thread->range_length, range_end - range_start, memory_order_relaxed);
+ pthreadpool_store_relaxed_size_t(&thread->range_start, range_start);
+ pthreadpool_store_relaxed_size_t(&thread->range_end, range_end);
+ pthreadpool_store_relaxed_size_t(&thread->range_length, range_end - range_start);
/* The next range starts where the previous ended */
range_start = range_end;
@@ -551,7 +553,7 @@ void pthreadpool_parallelize_1d(
* to ensure the unmasked command is different than the last command, because worker threads
* monitor for change in the unmasked command.
*/
- const uint32_t old_command = atomic_load_explicit(&threadpool->command, memory_order_relaxed);
+ const uint32_t old_command = pthreadpool_load_relaxed_uint32_t(&threadpool->command);
const uint32_t new_command = ~(old_command | THREADPOOL_COMMAND_MASK) | threadpool_command_compute_1d;
#if PTHREADPOOL_USE_FUTEX
@@ -559,13 +561,13 @@ void pthreadpool_parallelize_1d(
* Store the command with release semantics to guarantee that if a worker thread observes
* the new command value, it also observes the updated command parameters.
*/
- atomic_store_explicit(&threadpool->command, new_command, memory_order_release);
+ pthreadpool_store_release_uint32_t(&threadpool->command, new_command);
/* Wake up the threads */
futex_wake_all(&threadpool->command);
#else
/* Relaxed semantics because pthread_mutex_unlock acts as a release fence */
- atomic_store_explicit(&threadpool->command, new_command, memory_order_relaxed);
+ pthreadpool_store_relaxed_uint32_t(&threadpool->command, new_command);
/* Unlock the command variables before waking up the threads for better performance */
pthread_mutex_unlock(&threadpool->command_mutex);
@@ -593,7 +595,7 @@ void pthreadpool_parallelize_1d(
wait_worker_threads(threadpool);
/* Make changes by other threads visible to this thread */
- atomic_thread_fence(memory_order_acquire);
+ pthreadpool_fence_acquire();
/* Unprotect the global threadpool structures */
pthread_mutex_unlock(&threadpool->execution_mutex);
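For illustration only: the new_command computation in the hunk above, ~(old_command | THREADPOOL_COMMAND_MASK) | threadpool_command_compute_1d, flips every bit outside the command mask, so the stored word always differs from the previous one even when the command code itself repeats. The mask and command values below are hypothetical, chosen only to show the bit trick; the real constants are defined elsewhere in pthreadpool.

#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>

/* Hypothetical values for illustration; the real constants live in pthreadpool's sources. */
#define EXAMPLE_COMMAND_MASK       UINT32_C(0x000000FF)
#define EXAMPLE_COMMAND_COMPUTE_1D UINT32_C(0x00000002)

int main(void) {
	const uint32_t old_command = UINT32_C(0x00000102);
	/* Bits outside the mask are inverted, so new_command != old_command is guaranteed */
	/* even if the low command bits are identical to the previous command. */
	const uint32_t new_command =
		~(old_command | EXAMPLE_COMMAND_MASK) | EXAMPLE_COMMAND_COMPUTE_1D;
	printf("old = 0x%08" PRIx32 ", new = 0x%08" PRIx32 "\n", old_command, new_command);
	return 0;
}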
@@ -1169,11 +1171,10 @@ void pthreadpool_destroy(struct pthreadpool* threadpool) {
if (threadpool != NULL) {
if (threadpool->threads_count > 1) {
#if PTHREADPOOL_USE_FUTEX
- atomic_store_explicit(
- &threadpool->active_threads, threadpool->threads_count - 1 /* caller thread */, memory_order_relaxed);
- atomic_store_explicit(&threadpool->has_active_threads, 1, memory_order_relaxed);
+ pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threadpool->threads_count - 1 /* caller thread */);
+ pthreadpool_store_relaxed_uint32_t(&threadpool->has_active_threads, 1);
- atomic_store_explicit(&threadpool->command, threadpool_command_shutdown, memory_order_release);
+ pthreadpool_store_release_uint32_t(&threadpool->command, threadpool_command_shutdown);
/* Wake up worker threads */
futex_wake_all(&threadpool->command);
@@ -1181,12 +1182,10 @@ void pthreadpool_destroy(struct pthreadpool* threadpool) {
/* Lock the command variable to ensure that threads don't shut down until both command and active_threads are updated */
pthread_mutex_lock(&threadpool->command_mutex);
- /* Locking of completion_mutex not needed: readers are sleeping on command_condvar */
- atomic_store_explicit(
- &threadpool->active_threads, threadpool->threads_count - 1 /* caller thread */, memory_order_relaxed);
+ pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threadpool->threads_count - 1 /* caller thread */);
/* Update the threadpool command. */
- atomic_store_explicit(&threadpool->command, threadpool_command_shutdown, memory_order_relaxed);
+ pthreadpool_store_relaxed_uint32_t(&threadpool->command, threadpool_command_shutdown);
/* Wake up worker threads */
pthread_cond_broadcast(&threadpool->command_condvar);
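Finally, as background for the range_start / range_end / range_length fields converted above, here is a minimal single-threaded sketch of the owner/thief protocol those comments describe, written against the new atomics header; the struct and helper names are illustrative, not pthreadpool API.

#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

#include "threadpool-atomics.h"

struct range {
	pthreadpool_atomic_size_t start;   /* owner consumes items from the front */
	pthreadpool_atomic_size_t end;     /* thieves consume items from the back */
	pthreadpool_atomic_size_t length;  /* unclaimed items; decremented before start/end move */
};

/* Owning thread: claim one item by decrementing length, then advance start. */
static bool owner_claim(struct range* r, size_t* item) {
	size_t length = pthreadpool_load_relaxed_size_t(&r->length);
	while (length != 0) {
		if (pthreadpool_compare_exchange_weak_relaxed_size_t(&r->length, &length, length - 1)) {
			*item = pthreadpool_load_relaxed_size_t(&r->start);
			pthreadpool_store_relaxed_size_t(&r->start, *item + 1);
			return true;
		}
	}
	return false;
}

/* Stealing thread: claim one item by decrementing length, then decrement end. */
static bool thief_claim(struct range* r, size_t* item) {
	size_t length = pthreadpool_load_relaxed_size_t(&r->length);
	while (length != 0) {
		if (pthreadpool_compare_exchange_weak_relaxed_size_t(&r->length, &length, length - 1)) {
			*item = pthreadpool_fetch_sub_relaxed_size_t(&r->end, 1) - 1;
			return true;
		}
	}
	return false;
}

int main(void) {
	struct range r = { 0, 4, 4 };
	size_t item;
	if (owner_claim(&r, &item)) printf("owner took item %zu\n", item);
	while (thief_claim(&r, &item)) printf("thief took item %zu\n", item);
	return 0;
}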