diff options
author | Marat Dukhan <maratek@gmail.com> | 2020-04-07 19:14:45 -0700 |
---|---|---|
committer | Marat Dukhan <maratek@gmail.com> | 2020-04-07 19:15:27 -0700 |
commit | be1bd8ed45f30ccdc23e5dcbf3896c1ae85f1ef3 (patch) | |
tree | 8ac2f511c4d3cc540cab4a0a0b716b7595f50ea1 | |
parent | fa67ff531c0f9999c742d500a4fa061b96937297 (diff) | |
download | pthreadpool-be1bd8ed45f30ccdc23e5dcbf3896c1ae85f1ef3.tar.gz |
Windows implementation using Events
-rw-r--r-- | CMakeLists.txt | 13 | ||||
-rw-r--r-- | README.md | 2 | ||||
-rw-r--r-- | src/legacy-api.c | 15 | ||||
-rw-r--r-- | src/memory.c | 3 | ||||
-rw-r--r-- | src/portable-api.c | 6 | ||||
-rw-r--r-- | src/pthreads.c | 1 | ||||
-rw-r--r-- | src/shim.c | 8 | ||||
-rw-r--r-- | src/threadpool-atomics.h | 179 | ||||
-rw-r--r-- | src/threadpool-common.h | 18 | ||||
-rw-r--r-- | src/threadpool-object.h | 31 | ||||
-rw-r--r-- | src/threadpool-utils.h | 38 | ||||
-rw-r--r-- | src/windows.c | 363 |
12 files changed, 642 insertions, 35 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index da42de5..c84d915 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -7,8 +7,8 @@ PROJECT(pthreadpool C CXX) SET(PTHREADPOOL_LIBRARY_TYPE "default" CACHE STRING "Type of library (shared, static, or default) to build") SET_PROPERTY(CACHE PTHREADPOOL_LIBRARY_TYPE PROPERTY STRINGS default static shared) OPTION(PTHREADPOOL_ALLOW_DEPRECATED_API "Enable deprecated API functions" ON) -SET(PTHREADPOOL_SYNC_PRIMITIVE "default" CACHE STRING "Synchronization primitive (condvar, futex, gcd, or default) for worker threads") -SET_PROPERTY(CACHE PTHREADPOOL_SYNC_PRIMITIVE PROPERTY STRINGS default condvar futex gcd) +SET(PTHREADPOOL_SYNC_PRIMITIVE "default" CACHE STRING "Synchronization primitive (condvar, futex, gcd, event, or default) for worker threads") +SET_PROPERTY(CACHE PTHREADPOOL_SYNC_PRIMITIVE PROPERTY STRINGS default condvar futex gcd event) IF("${CMAKE_SOURCE_DIR}" STREQUAL "${PROJECT_SOURCE_DIR}") OPTION(PTHREADPOOL_BUILD_TESTS "Build pthreadpool unit tests" ON) OPTION(PTHREADPOOL_BUILD_BENCHMARKS "Build pthreadpool micro-benchmarks" ON) @@ -71,6 +71,8 @@ ELSE() LIST(APPEND PTHREADPOOL_SRCS src/portable-api.c src/memory.c) IF(APPLE AND (PTHREADPOOL_SYNC_PRIMITIVE STREQUAL "default" OR PTHREADPOOL_SYNC_PRIMITIVE STREQUAL "gcd")) LIST(APPEND PTHREADPOOL_SRCS src/gcd.c) + ELSEIF(WIN32 AND (PTHREADPOOL_SYNC_PRIMITIVE STREQUAL "default" OR PTHREADPOOL_SYNC_PRIMITIVE STREQUAL "event")) + LIST(APPEND PTHREADPOOL_SRCS src/windows.c) ELSE() LIST(APPEND PTHREADPOOL_SRCS src/pthreads.c) ENDIF() @@ -96,12 +98,19 @@ ENDIF() IF(PTHREADPOOL_SYNC_PRIMITIVE STREQUAL "condvar") TARGET_COMPILE_DEFINITIONS(pthreadpool PRIVATE PTHREADPOOL_USE_FUTEX=0) TARGET_COMPILE_DEFINITIONS(pthreadpool PRIVATE PTHREADPOOL_USE_GCD=0) + TARGET_COMPILE_DEFINITIONS(pthreadpool PRIVATE PTHREADPOOL_USE_EVENT=0) ELSEIF(PTHREADPOOL_SYNC_PRIMITIVE STREQUAL "futex") TARGET_COMPILE_DEFINITIONS(pthreadpool PRIVATE PTHREADPOOL_USE_FUTEX=1) 
TARGET_COMPILE_DEFINITIONS(pthreadpool PRIVATE PTHREADPOOL_USE_GCD=0) + TARGET_COMPILE_DEFINITIONS(pthreadpool PRIVATE PTHREADPOOL_USE_EVENT=0) ELSEIF(PTHREADPOOL_SYNC_PRIMITIVE STREQUAL "gcd") TARGET_COMPILE_DEFINITIONS(pthreadpool PRIVATE PTHREADPOOL_USE_FUTEX=0) TARGET_COMPILE_DEFINITIONS(pthreadpool PRIVATE PTHREADPOOL_USE_GCD=1) + TARGET_COMPILE_DEFINITIONS(pthreadpool PRIVATE PTHREADPOOL_USE_EVENT=0) +ELSEIF(PTHREADPOOL_SYNC_PRIMITIVE STREQUAL "event") + TARGET_COMPILE_DEFINITIONS(pthreadpool PRIVATE PTHREADPOOL_USE_FUTEX=0) + TARGET_COMPILE_DEFINITIONS(pthreadpool PRIVATE PTHREADPOOL_USE_GCD=0) + TARGET_COMPILE_DEFINITIONS(pthreadpool PRIVATE PTHREADPOOL_USE_EVENT=1) ELSEIF(NOT PTHREADPOOL_SYNC_PRIMITIVE STREQUAL "default") MESSAGE(FATAL_ERROR "Unsupported synchronization primitive ${PTHREADPOOL_SYNC_PRIMITIVE}") ENDIF() @@ -13,7 +13,7 @@ It provides similar functionality to `#pragma omp parallel for`, but with additi * Run on user-specified or auto-detected number of threads. * Work-stealing scheduling for efficient work balancing. * Wait-free synchronization of work items. -* Compatible with Linux (including Android), macOS, iOS, MinGW, Emscripten environments. +* Compatible with Linux (including Android), macOS, iOS, Windows, Emscripten environments. * 100% unit tests coverage. * Throughput and latency microbenchmarks. diff --git a/src/legacy-api.c b/src/legacy-api.c index 43fb798..8d5a6fd 100644 --- a/src/legacy-api.c +++ b/src/legacy-api.c @@ -4,21 +4,12 @@ /* Dependencies */ #include <fxdiv.h> -/* Library header */ +/* Public library header */ #include <pthreadpool.h> +/* Internal library headers */ +#include "threadpool-utils.h" -static inline size_t divide_round_up(size_t dividend, size_t divisor) { - if (dividend % divisor == 0) { - return dividend / divisor; - } else { - return dividend / divisor + 1; - } -} - -static inline size_t min(size_t a, size_t b) { - return a < b ? 
a : b; -} void pthreadpool_compute_1d( pthreadpool_t threadpool, diff --git a/src/memory.c b/src/memory.c index 020cb6d..c2ac38c 100644 --- a/src/memory.c +++ b/src/memory.c @@ -11,11 +11,10 @@ /* Windows headers */ #ifdef _WIN32 - #define NOMINMAX #include <malloc.h> #endif -/* Internal headers */ +/* Internal library headers */ #include "threadpool-common.h" #include "threadpool-object.h" diff --git a/src/portable-api.c b/src/portable-api.c index c2c33a8..32df0d6 100644 --- a/src/portable-api.c +++ b/src/portable-api.c @@ -12,13 +12,13 @@ /* Dependencies */ #include <fxdiv.h> -/* Library header */ +/* Public library header */ #include <pthreadpool.h> -/* Internal headers */ -#include "threadpool-utils.h" +/* Internal library headers */ #include "threadpool-atomics.h" #include "threadpool-object.h" +#include "threadpool-utils.h" size_t pthreadpool_get_threads_count(struct pthreadpool* threadpool) { diff --git a/src/pthreads.c b/src/pthreads.c index 05c6aac..8e4e234 100644 --- a/src/pthreads.c +++ b/src/pthreads.c @@ -38,7 +38,6 @@ /* Windows-specific headers */ #ifdef _WIN32 - #define NOMINMAX #include <sysinfoapi.h> #endif @@ -1,12 +1,12 @@ /* Standard C headers */ #include <stddef.h> -/* Library header */ +/* Public library header */ #include <pthreadpool.h> -static inline size_t min(size_t a, size_t b) { - return a < b ? 
a : b; -} +/* Internal library headers */ +#include "threadpool-utils.h" + struct pthreadpool* pthreadpool_create(size_t threads_count) { return NULL; diff --git a/src/threadpool-atomics.h b/src/threadpool-atomics.h index 4e48da7..9bfeea0 100644 --- a/src/threadpool-atomics.h +++ b/src/threadpool-atomics.h @@ -4,6 +4,13 @@ #include <stddef.h> #include <stdint.h> +/* MSVC-specific headers */ +#ifdef _MSC_VER + #include <intrin.h> + #include <immintrin.h> +#endif + + #if defined(__wasm__) && defined(__EMSCRIPTEN_PTHREADS__) && defined(__clang__) /* * Clang for WebAssembly target lacks stdatomic.h header, @@ -92,6 +99,178 @@ static inline void pthreadpool_fence_release() { __c11_atomic_thread_fence(__ATOMIC_RELEASE); } +#elif defined(_MSC_VER) && (defined(_M_X64) || defined(_M_AMD64)) + typedef volatile uint32_t pthreadpool_atomic_uint32_t; + typedef volatile size_t pthreadpool_atomic_size_t; + typedef void *volatile pthreadpool_atomic_void_p; + + static inline uint32_t pthreadpool_load_relaxed_uint32_t( + pthreadpool_atomic_uint32_t* address) + { + return *address; + } + + static inline size_t pthreadpool_load_relaxed_size_t( + pthreadpool_atomic_size_t* address) + { + return *address; + } + + static inline void* pthreadpool_load_relaxed_void_p( + pthreadpool_atomic_void_p* address) + { + return *address; + } + + static inline void pthreadpool_store_relaxed_uint32_t( + pthreadpool_atomic_uint32_t* address, + uint32_t value) + { + *address = value; + } + + static inline void pthreadpool_store_relaxed_size_t( + pthreadpool_atomic_size_t* address, + size_t value) + { + *address = value; + } + + static inline void pthreadpool_store_relaxed_void_p( + pthreadpool_atomic_void_p* address, + void* value) + { + *address = value; + } + + static inline void pthreadpool_store_release_uint32_t( + pthreadpool_atomic_uint32_t* address, + uint32_t value) + { + /* x86-64 stores always have release semantics */ + *address = value; + } + + static inline void 
pthreadpool_store_release_size_t( + pthreadpool_atomic_size_t* address, + size_t value) + { + /* x86-64 stores always have release semantics */ + *address = value; + } + + static inline size_t pthreadpool_fetch_sub_relaxed_size_t( + pthreadpool_atomic_size_t* address, + size_t decrement) + { + return (size_t) _InterlockedExchangeAdd64((__int64 volatile*) address, (__int64) -decrement); + } + + static inline bool pthreadpool_compare_exchange_weak_relaxed_size_t( + pthreadpool_atomic_size_t* address, + size_t* expected_value, + size_t new_value) + { + const __int64 expected_old_value = *expected_value; + const __int64 actual_old_value = _InterlockedCompareExchange64( + (__int64 volatile*) address, (__int64) new_value, expected_old_value); + *expected_value = (size_t) actual_old_value; + return actual_old_value == expected_old_value; + } + + static inline void pthreadpool_fence_acquire() { + _mm_lfence(); + } + + static inline void pthreadpool_fence_release() { + _mm_sfence(); + } +#elif defined(_MSC_VER) && defined(_M_IX86) + typedef volatile uint32_t pthreadpool_atomic_uint32_t; + typedef volatile size_t pthreadpool_atomic_size_t; + typedef void *volatile pthreadpool_atomic_void_p; + + static inline uint32_t pthreadpool_load_relaxed_uint32_t( + pthreadpool_atomic_uint32_t* address) + { + return *address; + } + + static inline size_t pthreadpool_load_relaxed_size_t( + pthreadpool_atomic_size_t* address) + { + return *address; + } + + static inline void* pthreadpool_load_relaxed_void_p( + pthreadpool_atomic_void_p* address) + { + return *address; + } + + static inline void pthreadpool_store_relaxed_uint32_t( + pthreadpool_atomic_uint32_t* address, + uint32_t value) + { + *address = value; + } + + static inline void pthreadpool_store_relaxed_size_t( + pthreadpool_atomic_size_t* address, + size_t value) + { + *address = value; + } + + static inline void pthreadpool_store_relaxed_void_p( + pthreadpool_atomic_void_p* address, + void* value) + { + *address = value; + } + + 
static inline void pthreadpool_store_release_uint32_t( + pthreadpool_atomic_uint32_t* address, + uint32_t value) + { + /* x86 stores always have release semantics */ + *address = value; + } + + static inline void pthreadpool_store_release_size_t( + pthreadpool_atomic_size_t* address, + size_t value) + { + /* x86 stores always have release semantics */ + *address = value; + } + + static inline size_t pthreadpool_fetch_sub_relaxed_size_t( + pthreadpool_atomic_size_t* address, + size_t decrement) + { + return (size_t) _InterlockedExchangeAdd((long volatile*) address, (long) -decrement); + } + + static inline bool pthreadpool_compare_exchange_weak_relaxed_size_t( + pthreadpool_atomic_size_t* address, + size_t* expected_value, + size_t new_value) + { + const long expected_old_value = *expected_value; + const long actual_old_value = _InterlockedCompareExchange( + (long volatile*) address, (long) new_value, expected_old_value); + *expected_value = (size_t) actual_old_value; + return actual_old_value == expected_old_value; + } + + static inline void pthreadpool_fence_acquire() { + _mm_lfence(); + } + + static inline void pthreadpool_fence_release() { + _mm_sfence(); + } #else #include <stdatomic.h> diff --git a/src/threadpool-common.h b/src/threadpool-common.h index ba5587e..8576517 100644 --- a/src/threadpool-common.h +++ b/src/threadpool-common.h @@ -22,8 +22,16 @@ #endif #endif +#ifndef PTHREADPOOL_USE_EVENT + #if defined(_WIN32) + #define PTHREADPOOL_USE_EVENT 1 + #else + #define PTHREADPOOL_USE_EVENT 0 + #endif +#endif + #ifndef PTHREADPOOL_USE_CONDVAR - #if PTHREADPOOL_USE_GCD || PTHREADPOOL_USE_FUTEX + #if PTHREADPOOL_USE_GCD || PTHREADPOOL_USE_FUTEX || PTHREADPOOL_USE_EVENT #define PTHREADPOOL_USE_CONDVAR 0 #else #define PTHREADPOOL_USE_CONDVAR 1 @@ -35,7 +43,13 @@ #define PTHREADPOOL_SPIN_WAIT_ITERATIONS 1000000 #define PTHREADPOOL_CACHELINE_SIZE 64 -#define PTHREADPOOL_CACHELINE_ALIGNED __attribute__((__aligned__(PTHREADPOOL_CACHELINE_SIZE))) +#if 
defined(__GNUC__) + #define PTHREADPOOL_CACHELINE_ALIGNED __attribute__((__aligned__(PTHREADPOOL_CACHELINE_SIZE))) +#elif defined(_MSC_VER) + #define PTHREADPOOL_CACHELINE_ALIGNED __declspec(align(PTHREADPOOL_CACHELINE_SIZE)) +#else + #error "Platform-specific implementation of PTHREADPOOL_CACHELINE_ALIGNED required" +#endif #if defined(__clang__) #if __has_extension(c_static_assert) || __has_feature(c_static_assert) diff --git a/src/threadpool-object.h b/src/threadpool-object.h index 81e3515..0b4e56a 100644 --- a/src/threadpool-object.h +++ b/src/threadpool-object.h @@ -18,6 +18,11 @@ #include <dispatch/dispatch.h> #endif +/* Windows headers */ +#if PTHREADPOOL_USE_EVENT +#include <windows.h> +#endif + /* Dependencies */ #include <fxdiv.h> @@ -65,6 +70,12 @@ struct PTHREADPOOL_CACHELINE_ALIGNED thread_info { */ pthread_t thread_object; #endif +#if PTHREADPOOL_USE_EVENT + /** + * The Windows thread handle corresponding to the thread. + */ + HANDLE thread_handle; +#endif }; PTHREADPOOL_STATIC_ASSERT(sizeof(struct thread_info) % PTHREADPOOL_CACHELINE_SIZE == 0, @@ -446,6 +457,12 @@ struct PTHREADPOOL_CACHELINE_ALIGNED pthreadpool { */ dispatch_semaphore_t execution_semaphore; #endif +#if PTHREADPOOL_USE_EVENT + /** + * Serializes concurrent calls to @a pthreadpool_parallelize_* from different threads. + */ + HANDLE execution_mutex; +#endif #if PTHREADPOOL_USE_CONDVAR /** * Guards access to the @a active_threads variable. @@ -464,6 +481,20 @@ struct PTHREADPOOL_CACHELINE_ALIGNED pthreadpool { */ pthread_cond_t command_condvar; #endif +#if PTHREADPOOL_USE_EVENT + /** + * Events to wait on until all threads complete an operation (until @a active_threads is zero). + * To avoid race conditions due to spin-lock synchronization, we use two events and switch event in use after every + * submitted command according to the high bit of the command word. + */ + HANDLE completion_event[2]; + /** + * Events to wait on for change of the @a command variable. 
+ * To avoid race conditions due to spin-lock synchronization, we use two events and switch event in use after every + * submitted command according to the high bit of the command word. + */ + HANDLE command_event[2]; +#endif /** * The number of threads in the thread pool. Never changes after pthreadpool_create. */ diff --git a/src/threadpool-utils.h b/src/threadpool-utils.h index 1d147e0..a86392b 100644 --- a/src/threadpool-utils.h +++ b/src/threadpool-utils.h @@ -3,12 +3,20 @@ #include <stdint.h> #include <stddef.h> -#if defined(__SSE__) || defined(__x86_64__) -#include <xmmintrin.h> +/* SSE-specific headers */ +#if defined(__SSE__) || defined(__x86_64__) || defined(_M_X64) || defined(_M_AMD64) || (defined(_M_IX86_FP) && _M_IX86_FP >= 1) + #include <xmmintrin.h> #endif +/* MSVC-specific headers */ +#if defined(_MSC_VER) && _MSC_VER >= 1920 + #include <intrin.h> + #include <immintrin.h> +#endif + + struct fpu_state { -#if defined(__SSE__) || defined(__x86_64__) +#if defined(__SSE__) || defined(__x86_64__) || defined(_M_X64) || defined(_M_AMD64) || (defined(_M_IX86_FP) && _M_IX86_FP >= 1) uint32_t mxcsr; #elif defined(__arm__) && defined(__ARM_FP) && (__ARM_FP != 0) uint32_t fpscr; @@ -21,7 +29,7 @@ struct fpu_state { static inline struct fpu_state get_fpu_state() { struct fpu_state state = { 0 }; -#if defined(__SSE__) || defined(__x86_64__) +#if defined(__SSE__) || defined(__x86_64__) || defined(_M_X64) || defined(_M_AMD64) || (defined(_M_IX86_FP) && _M_IX86_FP >= 1) state.mxcsr = (uint32_t) _mm_getcsr(); #elif defined(__arm__) && defined(__ARM_FP) && (__ARM_FP != 0) __asm__ __volatile__("VMRS %[fpscr], fpscr" : [fpscr] "=r" (state.fpscr)); @@ -32,7 +40,7 @@ static inline struct fpu_state get_fpu_state() { } static inline void set_fpu_state(const struct fpu_state state) { -#if defined(__SSE__) || defined(__x86_64__) +#if defined(__SSE__) || defined(__x86_64__) || defined(_M_X64) || defined(_M_AMD64) || (defined(_M_IX86_FP) && _M_IX86_FP >= 1) _mm_setcsr((unsigned 
int) state.mxcsr); #elif defined(__arm__) && defined(__ARM_FP) && (__ARM_FP != 0) __asm__ __volatile__("VMSR fpscr, %[fpscr]" : : [fpscr] "r" (state.fpscr)); @@ -42,7 +50,7 @@ static inline void set_fpu_state(const struct fpu_state state) { } static inline void disable_fpu_denormals() { -#if defined(__SSE__) || defined(__x86_64__) +#if defined(__SSE__) || defined(__x86_64__) || defined(_M_X64) || defined(_M_AMD64) || (defined(_M_IX86_FP) && _M_IX86_FP >= 1) _mm_setcsr(_mm_getcsr() | 0x8040); #elif defined(__arm__) && defined(__ARM_FP) && (__ARM_FP != 0) uint32_t fpscr; @@ -65,14 +73,23 @@ static inline void disable_fpu_denormals() { static inline size_t multiply_divide(size_t a, size_t b, size_t d) { #if defined(__SIZEOF_SIZE_T__) && (__SIZEOF_SIZE_T__ == 4) return (size_t) (((uint64_t) a) * ((uint64_t) b)) / ((uint64_t) d); - #elif defined(__SIZEOF_SIZE_T__) && (__SIZEOF_SIZE_T__ == 8) + #elif defined(__SIZEOF_SIZE_T__) && (__SIZEOF_SIZE_T__ == 8) && defined(__SIZEOF_INT128__) return (size_t) (((__uint128_t) a) * ((__uint128_t) b)) / ((__uint128_t) d); + #elif (defined(_MSC_VER) && _MSC_VER >= 1920) && (defined(_M_AMD64) || defined(_M_X64)) + uint64_t product_hi; + const uint64_t product_lo = _umul128(a, b, &product_hi); + uint64_t remainder; + return (size_t) _udiv128(product_hi, product_lo, d, &remainder); + #elif (defined(_MSC_VER) && _MSC_VER >= 1920) && defined(_M_IX86) + const unsigned __int64 product_full = __emulu((unsigned int) a, (unsigned int) b); + unsigned int remainder; + return (size_t) _udiv64(product_full, (unsigned int) d, &remainder); #else #error "Platform-specific implementation of multiply_divide required" #endif } -static inline size_t modulo_decrement(uint32_t i, uint32_t n) { +static inline size_t modulo_decrement(size_t i, size_t n) { /* Wrap modulo n, if needed */ if (i == 0) { i = n; @@ -89,6 +106,11 @@ static inline size_t divide_round_up(size_t dividend, size_t divisor) { } } +/* Windows headers define min and max macros; undefine it 
here */ +#ifdef min + #undef min +#endif + static inline size_t min(size_t a, size_t b) { return a < b ? a : b; } diff --git a/src/windows.c b/src/windows.c new file mode 100644 index 0000000..0139441 --- /dev/null +++ b/src/windows.c @@ -0,0 +1,363 @@ +/* Standard C headers */ +#include <assert.h> +#include <limits.h> +#include <stdbool.h> +#include <stdint.h> +#include <stdlib.h> +#include <string.h> + +/* Configuration header */ +#include "threadpool-common.h" + +/* Windows headers */ +#include <windows.h> + +/* MSVC-specific headers */ +#ifdef _MSC_VER + #include <intrin.h> + #include <immintrin.h> +#endif + +/* Public library header */ +#include <pthreadpool.h> + +/* Internal library headers */ +#include "threadpool-atomics.h" +#include "threadpool-object.h" +#include "threadpool-utils.h" + + +static void checkin_worker_thread(struct pthreadpool* threadpool, uint32_t event_index) { + if (pthreadpool_fetch_sub_relaxed_size_t(&threadpool->active_threads, 1) == 1) { + SetEvent(threadpool->completion_event[event_index]); + } +} + +static void wait_worker_threads(struct pthreadpool* threadpool, uint32_t event_index) { + /* Initial check */ + size_t active_threads = pthreadpool_load_relaxed_size_t(&threadpool->active_threads); + if (active_threads == 0) { + return; + } + + /* Spin-wait */ + for (uint32_t i = PTHREADPOOL_SPIN_WAIT_ITERATIONS; i != 0; i--) { + /* This fence serves as a sleep instruction */ + pthreadpool_fence_acquire(); + + active_threads = pthreadpool_load_relaxed_size_t(&threadpool->active_threads); + if (active_threads == 0) { + return; + } + } + + /* Fall-back to event wait */ + const DWORD wait_status = WaitForSingleObject(threadpool->completion_event[event_index], INFINITE); + assert(wait_status == WAIT_OBJECT_0); + assert(pthreadpool_load_relaxed_size_t(&threadpool->active_threads) == 0); +} + +static uint32_t wait_for_new_command( + struct pthreadpool* threadpool, + uint32_t last_command, + uint32_t last_flags) +{ + uint32_t command = 
pthreadpool_load_relaxed_uint32_t(&threadpool->command); + if (command != last_command) { + return command; + } + + if ((last_flags & PTHREADPOOL_FLAG_YIELD_WORKERS) == 0) { + /* Spin-wait loop */ + for (uint32_t i = PTHREADPOOL_SPIN_WAIT_ITERATIONS; i != 0; i--) { + /* This fence serves as a sleep instruction */ + pthreadpool_fence_acquire(); + + command = pthreadpool_load_relaxed_uint32_t(&threadpool->command); + if (command != last_command) { + return command; + } + } + } + + /* Spin-wait disabled or timed out, fall back to event wait */ + const uint32_t event_index = (last_command >> 31); + const DWORD wait_status = WaitForSingleObject(threadpool->command_event[event_index], INFINITE); + assert(wait_status == WAIT_OBJECT_0); + + command = pthreadpool_load_relaxed_uint32_t(&threadpool->command); + assert(command != last_command); + return command; +} + +static DWORD WINAPI thread_main(LPVOID arg) { + struct thread_info* thread = (struct thread_info*) arg; + struct pthreadpool* threadpool = thread->threadpool; + uint32_t last_command = threadpool_command_init; + struct fpu_state saved_fpu_state = { 0 }; + uint32_t flags = 0; + + /* Check in */ + checkin_worker_thread(threadpool, 0); + + /* Monitor new commands and act accordingly */ + for (;;) { + uint32_t command = wait_for_new_command(threadpool, last_command, flags); + pthreadpool_fence_acquire(); + + flags = pthreadpool_load_relaxed_uint32_t(&threadpool->flags); + + /* Process command */ + switch (command & THREADPOOL_COMMAND_MASK) { + case threadpool_command_parallelize: + { + const thread_function_t thread_function = + (thread_function_t) pthreadpool_load_relaxed_void_p(&threadpool->thread_function); + if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) { + saved_fpu_state = get_fpu_state(); + disable_fpu_denormals(); + } + + thread_function(threadpool, thread); + if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) { + set_fpu_state(saved_fpu_state); + } + break; + } + case threadpool_command_shutdown: + /* Exit 
immediately: the master thread is waiting on pthread_join */ + return 0; + case threadpool_command_init: + /* To inhibit compiler warning */ + break; + } + /* Notify the master thread that we finished processing */ + const uint32_t event_index = command >> 31; + checkin_worker_thread(threadpool, event_index); + /* Update last command */ + last_command = command; + }; + return 0; +} + +struct pthreadpool* pthreadpool_create(size_t threads_count) { + if (threads_count == 0) { + SYSTEM_INFO system_info; + ZeroMemory(&system_info, sizeof(system_info)); + GetSystemInfo(&system_info); + threads_count = (size_t) system_info.dwNumberOfProcessors; + } + + struct pthreadpool* threadpool = pthreadpool_allocate(threads_count); + if (threadpool == NULL) { + return NULL; + } + threadpool->threads_count = threads_count; + for (size_t tid = 0; tid < threads_count; tid++) { + threadpool->threads[tid].thread_number = tid; + threadpool->threads[tid].threadpool = threadpool; + } + + /* Thread pool with a single thread computes everything on the caller thread. */ + if (threads_count > 1) { + threadpool->execution_mutex = CreateMutexW( + NULL /* mutex attributes */, + FALSE /* initially owned */, + NULL /* name */); + for (size_t i = 0; i < 2; i++) { + threadpool->completion_event[i] = CreateEventW( + NULL /* event attributes */, + TRUE /* manual-reset event: yes */, + FALSE /* initial state: nonsignaled */, + NULL /* name */); + threadpool->command_event[i] = CreateEventW( + NULL /* event attributes */, + TRUE /* manual-reset event: yes */, + FALSE /* initial state: nonsignaled */, + NULL /* name */); + } + + pthreadpool_store_release_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */); + + /* Caller thread serves as worker #0. Thus, we create system threads starting with worker #1. 
*/ + for (size_t tid = 1; tid < threads_count; tid++) { + threadpool->threads[tid].thread_handle = CreateThread( + NULL /* thread attributes */, + 0 /* stack size: default */, + &thread_main, + &threadpool->threads[tid], + 0 /* creation flags */, + NULL /* thread id */); + } + + /* Wait until all threads initialize */ + wait_worker_threads(threadpool, 0); + } + return threadpool; +} + +PTHREADPOOL_INTERNAL void pthreadpool_parallelize( + struct pthreadpool* threadpool, + thread_function_t thread_function, + const void* params, + size_t params_size, + void* task, + void* context, + size_t linear_range, + uint32_t flags) +{ + assert(threadpool != NULL); + assert(thread_function != NULL); + assert(task != NULL); + assert(linear_range > 1); + + /* Protect the global threadpool structures */ + const DWORD wait_status = WaitForSingleObject(threadpool->execution_mutex, INFINITE); + assert(wait_status == WAIT_OBJECT_0); + + /* Setup global arguments */ + pthreadpool_store_relaxed_void_p(&threadpool->thread_function, (void*) thread_function); + pthreadpool_store_relaxed_void_p(&threadpool->task, task); + pthreadpool_store_relaxed_void_p(&threadpool->argument, context); + pthreadpool_store_relaxed_uint32_t(&threadpool->flags, flags); + + const size_t threads_count = threadpool->threads_count; + pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */); + + if (params_size != 0) { + CopyMemory(&threadpool->params, params, params_size); + pthreadpool_fence_release(); + } + + /* Spread the work between threads */ + size_t range_start = 0; + for (size_t tid = 0; tid < threads_count; tid++) { + struct thread_info* thread = &threadpool->threads[tid]; + const size_t range_end = multiply_divide(linear_range, tid + 1, threads_count); + pthreadpool_store_relaxed_size_t(&thread->range_start, range_start); + pthreadpool_store_relaxed_size_t(&thread->range_end, range_end); + pthreadpool_store_relaxed_size_t(&thread->range_length, range_end - 
 range_start); + + /* The next subrange starts where the previous ended */ + range_start = range_end; + } + + /* + * Update the threadpool command. + * Importantly, do it after initializing command parameters (range, task, argument, flags) + * ~(threadpool->command | THREADPOOL_COMMAND_MASK) flips the bits not in command mask + * to ensure the unmasked command is different from the last command, because worker threads + * monitor for change in the unmasked command. + */ + const uint32_t old_command = pthreadpool_load_relaxed_uint32_t(&threadpool->command); + const uint32_t new_command = ~(old_command | THREADPOOL_COMMAND_MASK) | threadpool_command_parallelize; + + /* + * Store the command with release semantics to guarantee that if a worker thread observes + * the new command value, it also observes the updated command parameters. + * + * Note: release semantics is necessary, because the workers might be waiting in a spin-loop + * rather than on the event object. + */ + pthreadpool_store_release_uint32_t(&threadpool->command, new_command); + + /* + * Signal the event to wake up the threads. + * Event in use must be switched after every submitted command to avoid race conditions. + * Choose the event based on the high bit of the command, which is flipped on every update. 
+ */ + const uint32_t event_index = (old_command >> 31); + const BOOL set_event_status = SetEvent(threadpool->command_event[event_index]); + assert(set_event_status != FALSE); + + /* Save and modify FPU denormals control, if needed */ + struct fpu_state saved_fpu_state = { 0 }; + if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) { + saved_fpu_state = get_fpu_state(); + disable_fpu_denormals(); + } + + /* Do computations as worker #0 */ + thread_function(threadpool, &threadpool->threads[0]); + + /* Restore FPU denormals control, if needed */ + if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) { + set_fpu_state(saved_fpu_state); + } + + /* + * Wait until the threads finish computation + * Use the complementary event because it corresponds to the new command. + */ + wait_worker_threads(threadpool, event_index ^ 1); + + /* + * Reset events for the next command. + * Note: the reset events are different from the events used for synchronization in this update. + */ + BOOL reset_event_status = ResetEvent(threadpool->command_event[event_index ^ 1]); + assert(reset_event_status != FALSE); + reset_event_status = ResetEvent(threadpool->completion_event[event_index]); + assert(reset_event_status != FALSE); + + /* Make changes by other threads visible to this thread */ + pthreadpool_fence_acquire(); + + /* Unprotect the global threadpool structures */ + const BOOL release_mutex_status = ReleaseMutex(threadpool->execution_mutex); + assert(release_mutex_status != FALSE); +} + +void pthreadpool_destroy(struct pthreadpool* threadpool) { + if (threadpool != NULL) { + const size_t threads_count = threadpool->threads_count; + if (threads_count > 1) { + pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */); + + /* + * Store the command with release semantics to guarantee that if a worker thread observes + * the new command value, it also observes the updated active_threads values. 
+ */ + const uint32_t old_command = pthreadpool_load_relaxed_uint32_t(&threadpool->command); + pthreadpool_store_release_uint32_t(&threadpool->command, threadpool_command_shutdown); + + /* + * Signal the event to wake up the threads. + * Event in use must be switched after every submitted command to avoid race conditions. + * Choose the event based on the high bit of the command, which is flipped on every update. + */ + const uint32_t event_index = (old_command >> 31); + const BOOL set_event_status = SetEvent(threadpool->command_event[event_index]); + assert(set_event_status != FALSE); + + /* Wait until all threads return */ + for (size_t tid = 1; tid < threads_count; tid++) { + const HANDLE thread_handle = threadpool->threads[tid].thread_handle; + if (thread_handle != NULL) { + const DWORD wait_status = WaitForSingleObject(thread_handle, INFINITE); + assert(wait_status == WAIT_OBJECT_0); + + const BOOL close_status = CloseHandle(thread_handle); + assert(close_status != FALSE); + } + } + + /* Release resources */ + if (threadpool->execution_mutex != NULL) { + const BOOL close_status = CloseHandle(threadpool->execution_mutex); + assert(close_status != FALSE); + } + for (size_t i = 0; i < 2; i++) { + if (threadpool->command_event[i] != NULL) { + const BOOL close_status = CloseHandle(threadpool->command_event[i]); + assert(close_status != FALSE); + } + if (threadpool->completion_event != NULL) { + const BOOL close_status = CloseHandle(threadpool->completion_event[i]); + assert(close_status != FALSE); + } + } + } + pthreadpool_deallocate(threadpool); + } +} |