diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/pthreads.c           |  26
-rw-r--r-- | src/threadpool-atomics.h | 132
-rw-r--r-- | src/windows.c            |  12
3 files changed, 149 insertions, 21 deletions
diff --git a/src/pthreads.c b/src/pthreads.c index a7e4619..2d945a0 100644 --- a/src/pthreads.c +++ b/src/pthreads.c @@ -80,12 +80,12 @@ static void checkin_worker_thread(struct pthreadpool* threadpool) { #if PTHREADPOOL_USE_FUTEX if (pthreadpool_decrement_fetch_relaxed_size_t(&threadpool->active_threads) == 0) { - pthreadpool_store_relaxed_uint32_t(&threadpool->has_active_threads, 0); + pthreadpool_store_release_uint32_t(&threadpool->has_active_threads, 0); futex_wake_all(&threadpool->has_active_threads); } #else pthread_mutex_lock(&threadpool->completion_mutex); - if (pthreadpool_decrement_fetch_relaxed_size_t(&threadpool->active_threads) == 0) { + if (pthreadpool_decrement_fetch_release_size_t(&threadpool->active_threads) == 0) { pthread_cond_signal(&threadpool->completion_condvar); } pthread_mutex_unlock(&threadpool->completion_mutex); @@ -95,12 +95,12 @@ static void checkin_worker_thread(struct pthreadpool* threadpool) { static void wait_worker_threads(struct pthreadpool* threadpool) { /* Initial check */ #if PTHREADPOOL_USE_FUTEX - uint32_t has_active_threads = pthreadpool_load_relaxed_uint32_t(&threadpool->has_active_threads); + uint32_t has_active_threads = pthreadpool_load_acquire_uint32_t(&threadpool->has_active_threads); if (has_active_threads == 0) { return; } #else - size_t active_threads = pthreadpool_load_relaxed_size_t(&threadpool->active_threads); + size_t active_threads = pthreadpool_load_acquire_size_t(&threadpool->active_threads); if (active_threads == 0) { return; } @@ -112,12 +112,12 @@ static void wait_worker_threads(struct pthreadpool* threadpool) { pthreadpool_fence_acquire(); #if PTHREADPOOL_USE_FUTEX - has_active_threads = pthreadpool_load_relaxed_uint32_t(&threadpool->has_active_threads); + has_active_threads = pthreadpool_load_acquire_uint32_t(&threadpool->has_active_threads); if (has_active_threads == 0) { return; } #else - active_threads = pthreadpool_load_relaxed_size_t(&threadpool->active_threads); + active_threads = 
pthreadpool_load_acquire_size_t(&threadpool->active_threads); if (active_threads == 0) { return; } @@ -126,12 +126,12 @@ static void wait_worker_threads(struct pthreadpool* threadpool) { /* Fall-back to mutex/futex wait */ #if PTHREADPOOL_USE_FUTEX - while ((has_active_threads = pthreadpool_load_relaxed_uint32_t(&threadpool->has_active_threads)) != 0) { + while ((has_active_threads = pthreadpool_load_acquire_uint32_t(&threadpool->has_active_threads)) != 0) { futex_wait(&threadpool->has_active_threads, 1); } #else pthread_mutex_lock(&threadpool->completion_mutex); - while (pthreadpool_load_relaxed_size_t(&threadpool->active_threads) != 0) { + while (pthreadpool_load_acquire_size_t(&threadpool->active_threads) != 0) { pthread_cond_wait(&threadpool->completion_condvar, &threadpool->completion_mutex); }; pthread_mutex_unlock(&threadpool->completion_mutex); @@ -143,7 +143,7 @@ static uint32_t wait_for_new_command( uint32_t last_command, uint32_t last_flags) { - uint32_t command = pthreadpool_load_relaxed_uint32_t(&threadpool->command); + uint32_t command = pthreadpool_load_acquire_uint32_t(&threadpool->command); if (command != last_command) { return command; } @@ -154,7 +154,7 @@ static uint32_t wait_for_new_command( /* This fence serves as a sleep instruction */ pthreadpool_fence_acquire(); - command = pthreadpool_load_relaxed_uint32_t(&threadpool->command); + command = pthreadpool_load_acquire_uint32_t(&threadpool->command); if (command != last_command) { return command; } @@ -165,13 +165,13 @@ static uint32_t wait_for_new_command( #if PTHREADPOOL_USE_FUTEX do { futex_wait(&threadpool->command, last_command); - command = pthreadpool_load_relaxed_uint32_t(&threadpool->command); + command = pthreadpool_load_acquire_uint32_t(&threadpool->command); } while (command == last_command); #else /* Lock the command mutex */ pthread_mutex_lock(&threadpool->command_mutex); /* Read the command */ - while ((command = pthreadpool_load_relaxed_uint32_t(&threadpool->command)) == 
last_command) { + while ((command = pthreadpool_load_acquire_uint32_t(&threadpool->command)) == last_command) { /* Wait for new command */ pthread_cond_wait(&threadpool->command_condvar, &threadpool->command_mutex); } @@ -280,7 +280,7 @@ struct pthreadpool* pthreadpool_create(size_t threads_count) { #if PTHREADPOOL_USE_FUTEX pthreadpool_store_relaxed_uint32_t(&threadpool->has_active_threads, 1); #endif - pthreadpool_store_release_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */); + pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */); /* Caller thread serves as worker #0. Thus, we create system threads starting with worker #1. */ for (size_t tid = 1; tid < threads_count; tid++) { diff --git a/src/threadpool-atomics.h b/src/threadpool-atomics.h index e2fbe6a..2b46c9a 100644 --- a/src/threadpool-atomics.h +++ b/src/threadpool-atomics.h @@ -43,6 +43,18 @@ return __c11_atomic_load(address, __ATOMIC_RELAXED); } + static inline uint32_t pthreadpool_load_acquire_uint32_t( + pthreadpool_atomic_uint32_t* address) + { + return __c11_atomic_load(address, __ATOMIC_ACQUIRE); + } + + static inline size_t pthreadpool_load_acquire_size_t( + pthreadpool_atomic_size_t* address) + { + return __c11_atomic_load(address, __ATOMIC_ACQUIRE); + } + static inline void pthreadpool_store_relaxed_uint32_t( pthreadpool_atomic_uint32_t* address, uint32_t value) @@ -84,6 +96,12 @@ return __c11_atomic_fetch_sub(address, 1, __ATOMIC_RELAXED) - 1; } + static inline size_t pthreadpool_decrement_fetch_release_size_t( + pthreadpool_atomic_size_t* address) + { + return __c11_atomic_fetch_sub(address, 1, __ATOMIC_RELEASE) - 1; + } + static inline bool pthreadpool_try_decrement_relaxed_size_t( pthreadpool_atomic_size_t* value) { @@ -128,6 +146,24 @@ return *address; } + static inline uint32_t pthreadpool_load_acquire_uint32_t( + pthreadpool_atomic_uint32_t* address) + { + /* x86-64 loads always have acquire semantics; use only a 
compiler barrier */ + const uint32_t value = *address; + _ReadBarrier(); + return value; + } + + static inline size_t pthreadpool_load_acquire_size_t( + pthreadpool_atomic_size_t* address) + { + /* x86-64 loads always have acquire semantics; use only a compiler barrier */ + const size_t value = *address; + _ReadBarrier(); + return value; + } + static inline void pthreadpool_store_relaxed_uint32_t( pthreadpool_atomic_uint32_t* address, uint32_t value) @@ -173,6 +209,12 @@ return (size_t) _InterlockedDecrement64((volatile __int64*) address); } + static inline size_t pthreadpool_decrement_fetch_release_size_t( + pthreadpool_atomic_size_t* address) + { + return (size_t) _InterlockedDecrement64((volatile __int64*) address); + } + static inline bool pthreadpool_try_decrement_relaxed_size_t( pthreadpool_atomic_size_t* value) { @@ -221,6 +263,24 @@ return *address; } + static inline uint32_t pthreadpool_load_acquire_uint32_t( + pthreadpool_atomic_uint32_t* address) + { + /* x86 loads always have acquire semantics; use only a compiler barrier */ + const uint32_t value = *address; + _ReadBarrier(); + return value; + } + + static inline size_t pthreadpool_load_acquire_size_t( + pthreadpool_atomic_size_t* address) + { + /* x86 loads always have acquire semantics; use only a compiler barrier */ + const size_t value = *address; + _ReadBarrier(); + return value; + } + static inline void pthreadpool_store_relaxed_uint32_t( pthreadpool_atomic_uint32_t* address, uint32_t value) @@ -246,7 +306,8 @@ pthreadpool_atomic_uint32_t* address, uint32_t value) { - /* x86 stores always have release semantics */ + /* x86 stores always have release semantics; use only a compiler barrier */ + _WriteBarrier(); *address = value; } @@ -254,7 +315,8 @@ pthreadpool_atomic_size_t* address, size_t value) { - /* x86 stores always have release semantics */ + /* x86 stores always have release semantics; use only a compiler barrier */ + _WriteBarrier(); *address = value; } @@ -264,6 +326,12 @@ return 
(size_t) _InterlockedDecrement((volatile long*) address); } + static inline size_t pthreadpool_decrement_fetch_release_size_t( + pthreadpool_atomic_size_t* address) + { + return (size_t) _InterlockedDecrement((volatile long*) address); + } + static inline bool pthreadpool_try_decrement_relaxed_size_t( pthreadpool_atomic_size_t* value) { @@ -310,6 +378,18 @@ return (void*) __iso_volatile_load64((const volatile __int64*) address); } + static inline uint32_t pthreadpool_load_acquire_uint32_t( + pthreadpool_atomic_uint32_t* address) + { + return (uint32_t) __ldar32((volatile unsigned __int32*) address); + } + + static inline size_t pthreadpool_load_acquire_size_t( + pthreadpool_atomic_size_t* address) + { + return (size_t) __ldar64((volatile unsigned __int64*) address); + } + static inline void pthreadpool_store_relaxed_uint32_t( pthreadpool_atomic_uint32_t* address, uint32_t value) @@ -353,6 +433,12 @@ return (size_t) _InterlockedDecrement64_nf((volatile __int64*) address); } + static inline size_t pthreadpool_decrement_fetch_release_size_t( + pthreadpool_atomic_size_t* address) + { + return (size_t) _InterlockedDecrement64_rel((volatile __int64*) address); + } + static inline bool pthreadpool_try_decrement_relaxed_size_t( pthreadpool_atomic_size_t* value) { @@ -401,6 +487,24 @@ return (void*) __iso_volatile_load32((const volatile __int32*) address); } + static inline uint32_t pthreadpool_load_acquire_uint32_t( + pthreadpool_atomic_uint32_t* address) + { + const uint32_t value = (uint32_t) __iso_volatile_load32((const volatile __int32*) address); + __dmb(_ARM_BARRIER_ISH); + _ReadBarrier(); + return value; + } + + static inline size_t pthreadpool_load_acquire_size_t( + pthreadpool_atomic_size_t* address) + { + const size_t value = (size_t) __iso_volatile_load32((const volatile __int32*) address); + __dmb(_ARM_BARRIER_ISH); + _ReadBarrier(); + return value; + } + static inline void pthreadpool_store_relaxed_uint32_t( pthreadpool_atomic_uint32_t* address, uint32_t 
value) @@ -446,6 +550,12 @@ return (size_t) _InterlockedDecrement_nf((volatile long*) address); } + static inline size_t pthreadpool_decrement_fetch_release_size_t( + pthreadpool_atomic_size_t* address) + { + return (size_t) _InterlockedDecrement_rel((volatile long*) address); + } + static inline bool pthreadpool_try_decrement_relaxed_size_t( pthreadpool_atomic_size_t* value) { @@ -496,6 +606,18 @@ return atomic_load_explicit(address, memory_order_relaxed); } + static inline uint32_t pthreadpool_load_acquire_uint32_t( + pthreadpool_atomic_uint32_t* address) + { + return atomic_load_explicit(address, memory_order_acquire); + } + + static inline size_t pthreadpool_load_acquire_size_t( + pthreadpool_atomic_size_t* address) + { + return atomic_load_explicit(address, memory_order_acquire); + } + static inline void pthreadpool_store_relaxed_uint32_t( pthreadpool_atomic_uint32_t* address, uint32_t value) @@ -537,6 +659,12 @@ return atomic_fetch_sub_explicit(address, 1, memory_order_relaxed) - 1; } + static inline size_t pthreadpool_decrement_fetch_release_size_t( + pthreadpool_atomic_size_t* address) + { + return atomic_fetch_sub_explicit(address, 1, memory_order_release) - 1; + } + static inline bool pthreadpool_try_decrement_relaxed_size_t( pthreadpool_atomic_size_t* value) { diff --git a/src/windows.c b/src/windows.c index e68b250..19e534f 100644 --- a/src/windows.c +++ b/src/windows.c @@ -21,14 +21,14 @@ static void checkin_worker_thread(struct pthreadpool* threadpool, uint32_t event_index) { - if (pthreadpool_decrement_fetch_relaxed_size_t(&threadpool->active_threads) == 0) { + if (pthreadpool_decrement_fetch_release_size_t(&threadpool->active_threads) == 0) { SetEvent(threadpool->completion_event[event_index]); } } static void wait_worker_threads(struct pthreadpool* threadpool, uint32_t event_index) { /* Initial check */ - size_t active_threads = pthreadpool_load_relaxed_size_t(&threadpool->active_threads); + size_t active_threads = 
pthreadpool_load_acquire_size_t(&threadpool->active_threads); if (active_threads == 0) { return; } @@ -38,7 +38,7 @@ static void wait_worker_threads(struct pthreadpool* threadpool, uint32_t event_i /* This fence serves as a sleep instruction */ pthreadpool_fence_acquire(); - active_threads = pthreadpool_load_relaxed_size_t(&threadpool->active_threads); + active_threads = pthreadpool_load_acquire_size_t(&threadpool->active_threads); if (active_threads == 0) { return; } @@ -55,7 +55,7 @@ static uint32_t wait_for_new_command( uint32_t last_command, uint32_t last_flags) { - uint32_t command = pthreadpool_load_relaxed_uint32_t(&threadpool->command); + uint32_t command = pthreadpool_load_acquire_uint32_t(&threadpool->command); if (command != last_command) { return command; } @@ -66,7 +66,7 @@ static uint32_t wait_for_new_command( /* This fence serves as a sleep instruction */ pthreadpool_fence_acquire(); - command = pthreadpool_load_relaxed_uint32_t(&threadpool->command); + command = pthreadpool_load_acquire_uint32_t(&threadpool->command); if (command != last_command) { return command; } @@ -170,7 +170,7 @@ struct pthreadpool* pthreadpool_create(size_t threads_count) { NULL /* name */); } - pthreadpool_store_release_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */); + pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */); /* Caller thread serves as worker #0. Thus, we create system threads starting with worker #1. */ for (size_t tid = 1; tid < threads_count; tid++) { |