aboutsummaryrefslogtreecommitdiff
path: root/src/pthreads.c
diff options
context:
space:
mode:
authorMarat Dukhan <maratek@google.com>2020-04-14 14:33:30 -0700
committerMarat Dukhan <maratek@google.com>2020-04-14 14:33:30 -0700
commita61ed1ab70389c62f6f699ca1a30a2421d3ea594 (patch)
tree878ae2bc4ef19eab2d68f5209de60ff8acc734d7 /src/pthreads.c
parentbfa3b9ce6cb71dc8b792e39d24717320a4f92572 (diff)
downloadpthreadpool-a61ed1ab70389c62f6f699ca1a30a2421d3ea594.tar.gz
Use load-acquire + store-release on synchronization variables
Synchronization using relaxed atomics + fences instead of LA/SR violates C11/C++11 memory model and cause failures under thread sanitizer
Diffstat (limited to 'src/pthreads.c')
-rw-r--r--src/pthreads.c26
1 files changed, 13 insertions, 13 deletions
diff --git a/src/pthreads.c b/src/pthreads.c
index a7e4619..2d945a0 100644
--- a/src/pthreads.c
+++ b/src/pthreads.c
@@ -80,12 +80,12 @@
static void checkin_worker_thread(struct pthreadpool* threadpool) {
#if PTHREADPOOL_USE_FUTEX
if (pthreadpool_decrement_fetch_relaxed_size_t(&threadpool->active_threads) == 0) {
- pthreadpool_store_relaxed_uint32_t(&threadpool->has_active_threads, 0);
+ pthreadpool_store_release_uint32_t(&threadpool->has_active_threads, 0);
futex_wake_all(&threadpool->has_active_threads);
}
#else
pthread_mutex_lock(&threadpool->completion_mutex);
- if (pthreadpool_decrement_fetch_relaxed_size_t(&threadpool->active_threads) == 0) {
+ if (pthreadpool_decrement_fetch_release_size_t(&threadpool->active_threads) == 0) {
pthread_cond_signal(&threadpool->completion_condvar);
}
pthread_mutex_unlock(&threadpool->completion_mutex);
@@ -95,12 +95,12 @@ static void checkin_worker_thread(struct pthreadpool* threadpool) {
static void wait_worker_threads(struct pthreadpool* threadpool) {
/* Initial check */
#if PTHREADPOOL_USE_FUTEX
- uint32_t has_active_threads = pthreadpool_load_relaxed_uint32_t(&threadpool->has_active_threads);
+ uint32_t has_active_threads = pthreadpool_load_acquire_uint32_t(&threadpool->has_active_threads);
if (has_active_threads == 0) {
return;
}
#else
- size_t active_threads = pthreadpool_load_relaxed_size_t(&threadpool->active_threads);
+ size_t active_threads = pthreadpool_load_acquire_size_t(&threadpool->active_threads);
if (active_threads == 0) {
return;
}
@@ -112,12 +112,12 @@ static void wait_worker_threads(struct pthreadpool* threadpool) {
pthreadpool_fence_acquire();
#if PTHREADPOOL_USE_FUTEX
- has_active_threads = pthreadpool_load_relaxed_uint32_t(&threadpool->has_active_threads);
+ has_active_threads = pthreadpool_load_acquire_uint32_t(&threadpool->has_active_threads);
if (has_active_threads == 0) {
return;
}
#else
- active_threads = pthreadpool_load_relaxed_size_t(&threadpool->active_threads);
+ active_threads = pthreadpool_load_acquire_size_t(&threadpool->active_threads);
if (active_threads == 0) {
return;
}
@@ -126,12 +126,12 @@ static void wait_worker_threads(struct pthreadpool* threadpool) {
/* Fall-back to mutex/futex wait */
#if PTHREADPOOL_USE_FUTEX
- while ((has_active_threads = pthreadpool_load_relaxed_uint32_t(&threadpool->has_active_threads)) != 0) {
+ while ((has_active_threads = pthreadpool_load_acquire_uint32_t(&threadpool->has_active_threads)) != 0) {
futex_wait(&threadpool->has_active_threads, 1);
}
#else
pthread_mutex_lock(&threadpool->completion_mutex);
- while (pthreadpool_load_relaxed_size_t(&threadpool->active_threads) != 0) {
+ while (pthreadpool_load_acquire_size_t(&threadpool->active_threads) != 0) {
pthread_cond_wait(&threadpool->completion_condvar, &threadpool->completion_mutex);
};
pthread_mutex_unlock(&threadpool->completion_mutex);
@@ -143,7 +143,7 @@ static uint32_t wait_for_new_command(
uint32_t last_command,
uint32_t last_flags)
{
- uint32_t command = pthreadpool_load_relaxed_uint32_t(&threadpool->command);
+ uint32_t command = pthreadpool_load_acquire_uint32_t(&threadpool->command);
if (command != last_command) {
return command;
}
@@ -154,7 +154,7 @@ static uint32_t wait_for_new_command(
/* This fence serves as a sleep instruction */
pthreadpool_fence_acquire();
- command = pthreadpool_load_relaxed_uint32_t(&threadpool->command);
+ command = pthreadpool_load_acquire_uint32_t(&threadpool->command);
if (command != last_command) {
return command;
}
@@ -165,13 +165,13 @@ static uint32_t wait_for_new_command(
#if PTHREADPOOL_USE_FUTEX
do {
futex_wait(&threadpool->command, last_command);
- command = pthreadpool_load_relaxed_uint32_t(&threadpool->command);
+ command = pthreadpool_load_acquire_uint32_t(&threadpool->command);
} while (command == last_command);
#else
/* Lock the command mutex */
pthread_mutex_lock(&threadpool->command_mutex);
/* Read the command */
- while ((command = pthreadpool_load_relaxed_uint32_t(&threadpool->command)) == last_command) {
+ while ((command = pthreadpool_load_acquire_uint32_t(&threadpool->command)) == last_command) {
/* Wait for new command */
pthread_cond_wait(&threadpool->command_condvar, &threadpool->command_mutex);
}
@@ -280,7 +280,7 @@ struct pthreadpool* pthreadpool_create(size_t threads_count) {
#if PTHREADPOOL_USE_FUTEX
pthreadpool_store_relaxed_uint32_t(&threadpool->has_active_threads, 1);
#endif
- pthreadpool_store_release_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */);
+ pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */);
/* Caller thread serves as worker #0. Thus, we create system threads starting with worker #1. */
for (size_t tid = 1; tid < threads_count; tid++) {