author    Marat Dukhan <maratek@google.com>    2020-04-14 14:33:30 -0700
committer Marat Dukhan <maratek@google.com>    2020-04-14 14:33:30 -0700
commit    a61ed1ab70389c62f6f699ca1a30a2421d3ea594
tree      878ae2bc4ef19eab2d68f5209de60ff8acc734d7 /src
parent    bfa3b9ce6cb71dc8b792e39d24717320a4f92572
download  pthreadpool-a61ed1ab70389c62f6f699ca1a30a2421d3ea594.tar.gz
Use load-acquire + store-release on synchronization variables
Synchronization using relaxed atomics + fences instead of load-acquire/store-release violates the C11/C++11 memory model and causes failures under Thread Sanitizer.
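
For context, here is a minimal C11 sketch of the load-acquire/store-release pairing this commit adopts. It is illustrative only and not part of the diff; the names `ready` and `payload` are hypothetical:

#include <stdatomic.h>
#include <stdint.h>

static atomic_uint ready;   /* synchronization variable */
static uint32_t payload;    /* plain data it protects */

/* Producer: write the payload, then release-store the flag.
 * The release store orders the payload write before the flag write. */
void publish(uint32_t value) {
	payload = value;
	atomic_store_explicit(&ready, 1, memory_order_release);
}

/* Consumer: acquire-load the flag. Once it reads 1, the acquire load
 * synchronizes-with the release store, so reading payload afterwards is
 * data-race-free. A relaxed load here would leave the payload read
 * unordered, which is what Thread Sanitizer flags. */
uint32_t consume(void) {
	while (atomic_load_explicit(&ready, memory_order_acquire) == 0) {
		/* spin until published */
	}
	return payload;
}

The diff below applies the same pairing: checkin_worker_thread release-stores (or release-decrements) the synchronization variable, and wait_worker_threads / wait_for_new_command acquire-load it.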
Diffstat (limited to 'src')
-rw-r--r--  src/pthreads.c            |  26
-rw-r--r--  src/threadpool-atomics.h  | 132
-rw-r--r--  src/windows.c             |  12
3 files changed, 149 insertions(+), 21 deletions(-)
diff --git a/src/pthreads.c b/src/pthreads.c
index a7e4619..2d945a0 100644
--- a/src/pthreads.c
+++ b/src/pthreads.c
@@ -80,12 +80,12 @@
static void checkin_worker_thread(struct pthreadpool* threadpool) {
#if PTHREADPOOL_USE_FUTEX
if (pthreadpool_decrement_fetch_relaxed_size_t(&threadpool->active_threads) == 0) {
- pthreadpool_store_relaxed_uint32_t(&threadpool->has_active_threads, 0);
+ pthreadpool_store_release_uint32_t(&threadpool->has_active_threads, 0);
futex_wake_all(&threadpool->has_active_threads);
}
#else
pthread_mutex_lock(&threadpool->completion_mutex);
- if (pthreadpool_decrement_fetch_relaxed_size_t(&threadpool->active_threads) == 0) {
+ if (pthreadpool_decrement_fetch_release_size_t(&threadpool->active_threads) == 0) {
pthread_cond_signal(&threadpool->completion_condvar);
}
pthread_mutex_unlock(&threadpool->completion_mutex);
@@ -95,12 +95,12 @@ static void checkin_worker_thread(struct pthreadpool* threadpool) {
static void wait_worker_threads(struct pthreadpool* threadpool) {
/* Initial check */
#if PTHREADPOOL_USE_FUTEX
- uint32_t has_active_threads = pthreadpool_load_relaxed_uint32_t(&threadpool->has_active_threads);
+ uint32_t has_active_threads = pthreadpool_load_acquire_uint32_t(&threadpool->has_active_threads);
if (has_active_threads == 0) {
return;
}
#else
- size_t active_threads = pthreadpool_load_relaxed_size_t(&threadpool->active_threads);
+ size_t active_threads = pthreadpool_load_acquire_size_t(&threadpool->active_threads);
if (active_threads == 0) {
return;
}
@@ -112,12 +112,12 @@ static void wait_worker_threads(struct pthreadpool* threadpool) {
pthreadpool_fence_acquire();
#if PTHREADPOOL_USE_FUTEX
- has_active_threads = pthreadpool_load_relaxed_uint32_t(&threadpool->has_active_threads);
+ has_active_threads = pthreadpool_load_acquire_uint32_t(&threadpool->has_active_threads);
if (has_active_threads == 0) {
return;
}
#else
- active_threads = pthreadpool_load_relaxed_size_t(&threadpool->active_threads);
+ active_threads = pthreadpool_load_acquire_size_t(&threadpool->active_threads);
if (active_threads == 0) {
return;
}
@@ -126,12 +126,12 @@ static void wait_worker_threads(struct pthreadpool* threadpool) {
/* Fall-back to mutex/futex wait */
#if PTHREADPOOL_USE_FUTEX
- while ((has_active_threads = pthreadpool_load_relaxed_uint32_t(&threadpool->has_active_threads)) != 0) {
+ while ((has_active_threads = pthreadpool_load_acquire_uint32_t(&threadpool->has_active_threads)) != 0) {
futex_wait(&threadpool->has_active_threads, 1);
}
#else
pthread_mutex_lock(&threadpool->completion_mutex);
- while (pthreadpool_load_relaxed_size_t(&threadpool->active_threads) != 0) {
+ while (pthreadpool_load_acquire_size_t(&threadpool->active_threads) != 0) {
pthread_cond_wait(&threadpool->completion_condvar, &threadpool->completion_mutex);
};
pthread_mutex_unlock(&threadpool->completion_mutex);
@@ -143,7 +143,7 @@ static uint32_t wait_for_new_command(
uint32_t last_command,
uint32_t last_flags)
{
- uint32_t command = pthreadpool_load_relaxed_uint32_t(&threadpool->command);
+ uint32_t command = pthreadpool_load_acquire_uint32_t(&threadpool->command);
if (command != last_command) {
return command;
}
@@ -154,7 +154,7 @@ static uint32_t wait_for_new_command(
/* This fence serves as a sleep instruction */
pthreadpool_fence_acquire();
- command = pthreadpool_load_relaxed_uint32_t(&threadpool->command);
+ command = pthreadpool_load_acquire_uint32_t(&threadpool->command);
if (command != last_command) {
return command;
}
@@ -165,13 +165,13 @@ static uint32_t wait_for_new_command(
#if PTHREADPOOL_USE_FUTEX
do {
futex_wait(&threadpool->command, last_command);
- command = pthreadpool_load_relaxed_uint32_t(&threadpool->command);
+ command = pthreadpool_load_acquire_uint32_t(&threadpool->command);
} while (command == last_command);
#else
/* Lock the command mutex */
pthread_mutex_lock(&threadpool->command_mutex);
/* Read the command */
- while ((command = pthreadpool_load_relaxed_uint32_t(&threadpool->command)) == last_command) {
+ while ((command = pthreadpool_load_acquire_uint32_t(&threadpool->command)) == last_command) {
/* Wait for new command */
pthread_cond_wait(&threadpool->command_condvar, &threadpool->command_mutex);
}
@@ -280,7 +280,7 @@ struct pthreadpool* pthreadpool_create(size_t threads_count) {
#if PTHREADPOOL_USE_FUTEX
pthreadpool_store_relaxed_uint32_t(&threadpool->has_active_threads, 1);
#endif
- pthreadpool_store_release_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */);
+ pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */);
/* Caller thread serves as worker #0. Thus, we create system threads starting with worker #1. */
for (size_t tid = 1; tid < threads_count; tid++) {
diff --git a/src/threadpool-atomics.h b/src/threadpool-atomics.h
index e2fbe6a..2b46c9a 100644
--- a/src/threadpool-atomics.h
+++ b/src/threadpool-atomics.h
@@ -43,6 +43,18 @@
return __c11_atomic_load(address, __ATOMIC_RELAXED);
}
+ static inline uint32_t pthreadpool_load_acquire_uint32_t(
+ pthreadpool_atomic_uint32_t* address)
+ {
+ return __c11_atomic_load(address, __ATOMIC_ACQUIRE);
+ }
+
+ static inline size_t pthreadpool_load_acquire_size_t(
+ pthreadpool_atomic_size_t* address)
+ {
+ return __c11_atomic_load(address, __ATOMIC_ACQUIRE);
+ }
+
static inline void pthreadpool_store_relaxed_uint32_t(
pthreadpool_atomic_uint32_t* address,
uint32_t value)
@@ -84,6 +96,12 @@
return __c11_atomic_fetch_sub(address, 1, __ATOMIC_RELAXED) - 1;
}
+ static inline size_t pthreadpool_decrement_fetch_release_size_t(
+ pthreadpool_atomic_size_t* address)
+ {
+ return __c11_atomic_fetch_sub(address, 1, __ATOMIC_RELEASE) - 1;
+ }
+
static inline bool pthreadpool_try_decrement_relaxed_size_t(
pthreadpool_atomic_size_t* value)
{
@@ -128,6 +146,24 @@
return *address;
}
+ static inline uint32_t pthreadpool_load_acquire_uint32_t(
+ pthreadpool_atomic_uint32_t* address)
+ {
+ /* x86-64 loads always have acquire semantics; use only a compiler barrier */
+ const uint32_t value = *address;
+ _ReadBarrier();
+ return value;
+ }
+
+ static inline size_t pthreadpool_load_acquire_size_t(
+ pthreadpool_atomic_size_t* address)
+ {
+ /* x86-64 loads always have acquire semantics; use only a compiler barrier */
+ const size_t value = *address;
+ _ReadBarrier();
+ return value;
+ }
+
static inline void pthreadpool_store_relaxed_uint32_t(
pthreadpool_atomic_uint32_t* address,
uint32_t value)
@@ -173,6 +209,12 @@
return (size_t) _InterlockedDecrement64((volatile __int64*) address);
}
+ static inline size_t pthreadpool_decrement_fetch_release_size_t(
+ pthreadpool_atomic_size_t* address)
+ {
+ return (size_t) _InterlockedDecrement64((volatile __int64*) address);
+ }
+
static inline bool pthreadpool_try_decrement_relaxed_size_t(
pthreadpool_atomic_size_t* value)
{
@@ -221,6 +263,24 @@
return *address;
}
+ static inline uint32_t pthreadpool_load_acquire_uint32_t(
+ pthreadpool_atomic_uint32_t* address)
+ {
+ /* x86 loads always have acquire semantics; use only a compiler barrier */
+ const uint32_t value = *address;
+ _ReadBarrier();
+ return value;
+ }
+
+ static inline size_t pthreadpool_load_acquire_size_t(
+ pthreadpool_atomic_size_t* address)
+ {
+ /* x86 loads always have acquire semantics; use only a compiler barrier */
+ const size_t value = *address;
+ _ReadBarrier();
+ return value;
+ }
+
static inline void pthreadpool_store_relaxed_uint32_t(
pthreadpool_atomic_uint32_t* address,
uint32_t value)
@@ -246,7 +306,8 @@
pthreadpool_atomic_uint32_t* address,
uint32_t value)
{
- /* x86 stores always have release semantics */
+ /* x86 stores always have release semantics; use only a compiler barrier */
+ _WriteBarrier();
*address = value;
}
@@ -254,7 +315,8 @@
pthreadpool_atomic_size_t* address,
size_t value)
{
- /* x86 stores always have release semantics */
+ /* x86 stores always have release semantics; use only a compiler barrier */
+ _WriteBarrier();
*address = value;
}
@@ -264,6 +326,12 @@
return (size_t) _InterlockedDecrement((volatile long*) address);
}
+ static inline size_t pthreadpool_decrement_fetch_release_size_t(
+ pthreadpool_atomic_size_t* address)
+ {
+ return (size_t) _InterlockedDecrement((volatile long*) address);
+ }
+
static inline bool pthreadpool_try_decrement_relaxed_size_t(
pthreadpool_atomic_size_t* value)
{
@@ -310,6 +378,18 @@
return (void*) __iso_volatile_load64((const volatile __int64*) address);
}
+ static inline uint32_t pthreadpool_load_acquire_uint32_t(
+ pthreadpool_atomic_uint32_t* address)
+ {
+ return (uint32_t) __ldar32((volatile unsigned __int32*) address);
+ }
+
+ static inline size_t pthreadpool_load_acquire_size_t(
+ pthreadpool_atomic_size_t* address)
+ {
+ return (size_t) __ldar64((volatile unsigned __int64*) address);
+ }
+
static inline void pthreadpool_store_relaxed_uint32_t(
pthreadpool_atomic_uint32_t* address,
uint32_t value)
@@ -353,6 +433,12 @@
return (size_t) _InterlockedDecrement64_nf((volatile __int64*) address);
}
+ static inline size_t pthreadpool_decrement_fetch_release_size_t(
+ pthreadpool_atomic_size_t* address)
+ {
+ return (size_t) _InterlockedDecrement64_rel((volatile __int64*) address);
+ }
+
static inline bool pthreadpool_try_decrement_relaxed_size_t(
pthreadpool_atomic_size_t* value)
{
@@ -401,6 +487,24 @@
return (void*) __iso_volatile_load32((const volatile __int32*) address);
}
+ static inline uint32_t pthreadpool_load_acquire_uint32_t(
+ pthreadpool_atomic_uint32_t* address)
+ {
+ const uint32_t value = (uint32_t) __iso_volatile_load32((const volatile __int32*) address);
+ __dmb(_ARM_BARRIER_ISH);
+ _ReadBarrier();
+ return value;
+ }
+
+ static inline size_t pthreadpool_load_acquire_size_t(
+ pthreadpool_atomic_size_t* address)
+ {
+ const size_t value = (size_t) __iso_volatile_load32((const volatile __int32*) address);
+ __dmb(_ARM_BARRIER_ISH);
+ _ReadBarrier();
+ return value;
+ }
+
static inline void pthreadpool_store_relaxed_uint32_t(
pthreadpool_atomic_uint32_t* address,
uint32_t value)
@@ -446,6 +550,12 @@
return (size_t) _InterlockedDecrement_nf((volatile long*) address);
}
+ static inline size_t pthreadpool_decrement_fetch_release_size_t(
+ pthreadpool_atomic_size_t* address)
+ {
+ return (size_t) _InterlockedDecrement_rel((volatile long*) address);
+ }
+
static inline bool pthreadpool_try_decrement_relaxed_size_t(
pthreadpool_atomic_size_t* value)
{
@@ -496,6 +606,18 @@
return atomic_load_explicit(address, memory_order_relaxed);
}
+ static inline uint32_t pthreadpool_load_acquire_uint32_t(
+ pthreadpool_atomic_uint32_t* address)
+ {
+ return atomic_load_explicit(address, memory_order_acquire);
+ }
+
+ static inline size_t pthreadpool_load_acquire_size_t(
+ pthreadpool_atomic_size_t* address)
+ {
+ return atomic_load_explicit(address, memory_order_acquire);
+ }
+
static inline void pthreadpool_store_relaxed_uint32_t(
pthreadpool_atomic_uint32_t* address,
uint32_t value)
@@ -537,6 +659,12 @@
return atomic_fetch_sub_explicit(address, 1, memory_order_relaxed) - 1;
}
+ static inline size_t pthreadpool_decrement_fetch_release_size_t(
+ pthreadpool_atomic_size_t* address)
+ {
+ return atomic_fetch_sub_explicit(address, 1, memory_order_release) - 1;
+ }
+
static inline bool pthreadpool_try_decrement_relaxed_size_t(
pthreadpool_atomic_size_t* value)
{
diff --git a/src/windows.c b/src/windows.c
index e68b250..19e534f 100644
--- a/src/windows.c
+++ b/src/windows.c
@@ -21,14 +21,14 @@
static void checkin_worker_thread(struct pthreadpool* threadpool, uint32_t event_index) {
- if (pthreadpool_decrement_fetch_relaxed_size_t(&threadpool->active_threads) == 0) {
+ if (pthreadpool_decrement_fetch_release_size_t(&threadpool->active_threads) == 0) {
SetEvent(threadpool->completion_event[event_index]);
}
}
static void wait_worker_threads(struct pthreadpool* threadpool, uint32_t event_index) {
/* Initial check */
- size_t active_threads = pthreadpool_load_relaxed_size_t(&threadpool->active_threads);
+ size_t active_threads = pthreadpool_load_acquire_size_t(&threadpool->active_threads);
if (active_threads == 0) {
return;
}
@@ -38,7 +38,7 @@ static void wait_worker_threads(struct pthreadpool* threadpool, uint32_t event_i
/* This fence serves as a sleep instruction */
pthreadpool_fence_acquire();
- active_threads = pthreadpool_load_relaxed_size_t(&threadpool->active_threads);
+ active_threads = pthreadpool_load_acquire_size_t(&threadpool->active_threads);
if (active_threads == 0) {
return;
}
@@ -55,7 +55,7 @@ static uint32_t wait_for_new_command(
uint32_t last_command,
uint32_t last_flags)
{
- uint32_t command = pthreadpool_load_relaxed_uint32_t(&threadpool->command);
+ uint32_t command = pthreadpool_load_acquire_uint32_t(&threadpool->command);
if (command != last_command) {
return command;
}
@@ -66,7 +66,7 @@ static uint32_t wait_for_new_command(
/* This fence serves as a sleep instruction */
pthreadpool_fence_acquire();
- command = pthreadpool_load_relaxed_uint32_t(&threadpool->command);
+ command = pthreadpool_load_acquire_uint32_t(&threadpool->command);
if (command != last_command) {
return command;
}
@@ -170,7 +170,7 @@ struct pthreadpool* pthreadpool_create(size_t threads_count) {
NULL /* name */);
}
- pthreadpool_store_release_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */);
+ pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */);
/* Caller thread serves as worker #0. Thus, we create system threads starting with worker #1. */
for (size_t tid = 1; tid < threads_count; tid++) {