diff options
author | Marat Dukhan <maratek@google.com> | 2020-04-14 14:33:30 -0700 |
---|---|---|
committer | Marat Dukhan <maratek@google.com> | 2020-04-14 14:33:30 -0700 |
commit | a61ed1ab70389c62f6f699ca1a30a2421d3ea594 (patch) | |
tree | 878ae2bc4ef19eab2d68f5209de60ff8acc734d7 /src/pthreads.c | |
parent | bfa3b9ce6cb71dc8b792e39d24717320a4f92572 (diff) | |
download | pthreadpool-a61ed1ab70389c62f6f699ca1a30a2421d3ea594.tar.gz |
Use load-acquire + store-release on synchronization variables
Synchronization using relaxed atomics + fences instead of LA/SR violates
C11/C++11 memory model and cause failures under thread sanitizer
Diffstat (limited to 'src/pthreads.c')
-rw-r--r-- | src/pthreads.c | 26 |
1 files changed, 13 insertions, 13 deletions
diff --git a/src/pthreads.c b/src/pthreads.c index a7e4619..2d945a0 100644 --- a/src/pthreads.c +++ b/src/pthreads.c @@ -80,12 +80,12 @@ static void checkin_worker_thread(struct pthreadpool* threadpool) { #if PTHREADPOOL_USE_FUTEX if (pthreadpool_decrement_fetch_relaxed_size_t(&threadpool->active_threads) == 0) { - pthreadpool_store_relaxed_uint32_t(&threadpool->has_active_threads, 0); + pthreadpool_store_release_uint32_t(&threadpool->has_active_threads, 0); futex_wake_all(&threadpool->has_active_threads); } #else pthread_mutex_lock(&threadpool->completion_mutex); - if (pthreadpool_decrement_fetch_relaxed_size_t(&threadpool->active_threads) == 0) { + if (pthreadpool_decrement_fetch_release_size_t(&threadpool->active_threads) == 0) { pthread_cond_signal(&threadpool->completion_condvar); } pthread_mutex_unlock(&threadpool->completion_mutex); @@ -95,12 +95,12 @@ static void checkin_worker_thread(struct pthreadpool* threadpool) { static void wait_worker_threads(struct pthreadpool* threadpool) { /* Initial check */ #if PTHREADPOOL_USE_FUTEX - uint32_t has_active_threads = pthreadpool_load_relaxed_uint32_t(&threadpool->has_active_threads); + uint32_t has_active_threads = pthreadpool_load_acquire_uint32_t(&threadpool->has_active_threads); if (has_active_threads == 0) { return; } #else - size_t active_threads = pthreadpool_load_relaxed_size_t(&threadpool->active_threads); + size_t active_threads = pthreadpool_load_acquire_size_t(&threadpool->active_threads); if (active_threads == 0) { return; } @@ -112,12 +112,12 @@ static void wait_worker_threads(struct pthreadpool* threadpool) { pthreadpool_fence_acquire(); #if PTHREADPOOL_USE_FUTEX - has_active_threads = pthreadpool_load_relaxed_uint32_t(&threadpool->has_active_threads); + has_active_threads = pthreadpool_load_acquire_uint32_t(&threadpool->has_active_threads); if (has_active_threads == 0) { return; } #else - active_threads = pthreadpool_load_relaxed_size_t(&threadpool->active_threads); + active_threads = pthreadpool_load_acquire_size_t(&threadpool->active_threads); if (active_threads == 0) { return; } @@ -126,12 +126,12 @@ static void wait_worker_threads(struct pthreadpool* threadpool) { /* Fall-back to mutex/futex wait */ #if PTHREADPOOL_USE_FUTEX - while ((has_active_threads = pthreadpool_load_relaxed_uint32_t(&threadpool->has_active_threads)) != 0) { + while ((has_active_threads = pthreadpool_load_acquire_uint32_t(&threadpool->has_active_threads)) != 0) { futex_wait(&threadpool->has_active_threads, 1); } #else pthread_mutex_lock(&threadpool->completion_mutex); - while (pthreadpool_load_relaxed_size_t(&threadpool->active_threads) != 0) { + while (pthreadpool_load_acquire_size_t(&threadpool->active_threads) != 0) { pthread_cond_wait(&threadpool->completion_condvar, &threadpool->completion_mutex); }; pthread_mutex_unlock(&threadpool->completion_mutex); @@ -143,7 +143,7 @@ static uint32_t wait_for_new_command( uint32_t last_command, uint32_t last_flags) { - uint32_t command = pthreadpool_load_relaxed_uint32_t(&threadpool->command); + uint32_t command = pthreadpool_load_acquire_uint32_t(&threadpool->command); if (command != last_command) { return command; } @@ -154,7 +154,7 @@ static uint32_t wait_for_new_command( /* This fence serves as a sleep instruction */ pthreadpool_fence_acquire(); - command = pthreadpool_load_relaxed_uint32_t(&threadpool->command); + command = pthreadpool_load_acquire_uint32_t(&threadpool->command); if (command != last_command) { return command; } @@ -165,13 +165,13 @@ static uint32_t wait_for_new_command( #if PTHREADPOOL_USE_FUTEX do { futex_wait(&threadpool->command, last_command); - command = pthreadpool_load_relaxed_uint32_t(&threadpool->command); + command = pthreadpool_load_acquire_uint32_t(&threadpool->command); } while (command == last_command); #else /* Lock the command mutex */ pthread_mutex_lock(&threadpool->command_mutex); /* Read the command */ - while ((command = pthreadpool_load_relaxed_uint32_t(&threadpool->command)) == last_command) { + while ((command = pthreadpool_load_acquire_uint32_t(&threadpool->command)) == last_command) { /* Wait for new command */ pthread_cond_wait(&threadpool->command_condvar, &threadpool->command_mutex); } @@ -280,7 +280,7 @@ struct pthreadpool* pthreadpool_create(size_t threads_count) { #if PTHREADPOOL_USE_FUTEX pthreadpool_store_relaxed_uint32_t(&threadpool->has_active_threads, 1); #endif - pthreadpool_store_release_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */); + pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */); /* Caller thread serves as worker #0. Thus, we create system threads starting with worker #1. */ for (size_t tid = 1; tid < threads_count; tid++) { |