aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMarat Dukhan <maratek@gmail.com>2020-04-07 19:14:45 -0700
committerMarat Dukhan <maratek@gmail.com>2020-04-07 19:15:27 -0700
commitbe1bd8ed45f30ccdc23e5dcbf3896c1ae85f1ef3 (patch)
tree8ac2f511c4d3cc540cab4a0a0b716b7595f50ea1
parentfa67ff531c0f9999c742d500a4fa061b96937297 (diff)
downloadpthreadpool-be1bd8ed45f30ccdc23e5dcbf3896c1ae85f1ef3.tar.gz
Windows implementation using Events
-rw-r--r--CMakeLists.txt13
-rw-r--r--README.md2
-rw-r--r--src/legacy-api.c15
-rw-r--r--src/memory.c3
-rw-r--r--src/portable-api.c6
-rw-r--r--src/pthreads.c1
-rw-r--r--src/shim.c8
-rw-r--r--src/threadpool-atomics.h179
-rw-r--r--src/threadpool-common.h18
-rw-r--r--src/threadpool-object.h31
-rw-r--r--src/threadpool-utils.h38
-rw-r--r--src/windows.c363
12 files changed, 642 insertions, 35 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index da42de5..c84d915 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -7,8 +7,8 @@ PROJECT(pthreadpool C CXX)
SET(PTHREADPOOL_LIBRARY_TYPE "default" CACHE STRING "Type of library (shared, static, or default) to build")
SET_PROPERTY(CACHE PTHREADPOOL_LIBRARY_TYPE PROPERTY STRINGS default static shared)
OPTION(PTHREADPOOL_ALLOW_DEPRECATED_API "Enable deprecated API functions" ON)
-SET(PTHREADPOOL_SYNC_PRIMITIVE "default" CACHE STRING "Synchronization primitive (condvar, futex, gcd, or default) for worker threads")
-SET_PROPERTY(CACHE PTHREADPOOL_SYNC_PRIMITIVE PROPERTY STRINGS default condvar futex gcd)
+SET(PTHREADPOOL_SYNC_PRIMITIVE "default" CACHE STRING "Synchronization primitive (condvar, futex, gcd, event, or default) for worker threads")
+SET_PROPERTY(CACHE PTHREADPOOL_SYNC_PRIMITIVE PROPERTY STRINGS default condvar futex gcd event)
IF("${CMAKE_SOURCE_DIR}" STREQUAL "${PROJECT_SOURCE_DIR}")
OPTION(PTHREADPOOL_BUILD_TESTS "Build pthreadpool unit tests" ON)
OPTION(PTHREADPOOL_BUILD_BENCHMARKS "Build pthreadpool micro-benchmarks" ON)
@@ -71,6 +71,8 @@ ELSE()
LIST(APPEND PTHREADPOOL_SRCS src/portable-api.c src/memory.c)
IF(APPLE AND (PTHREADPOOL_SYNC_PRIMITIVE STREQUAL "default" OR PTHREADPOOL_SYNC_PRIMITIVE STREQUAL "gcd"))
LIST(APPEND PTHREADPOOL_SRCS src/gcd.c)
+ ELSEIF(WIN32 AND (PTHREADPOOL_SYNC_PRIMITIVE STREQUAL "default" OR PTHREADPOOL_SYNC_PRIMITIVE STREQUAL "event"))
+ LIST(APPEND PTHREADPOOL_SRCS src/windows.c)
ELSE()
LIST(APPEND PTHREADPOOL_SRCS src/pthreads.c)
ENDIF()
@@ -96,12 +98,19 @@ ENDIF()
IF(PTHREADPOOL_SYNC_PRIMITIVE STREQUAL "condvar")
TARGET_COMPILE_DEFINITIONS(pthreadpool PRIVATE PTHREADPOOL_USE_FUTEX=0)
TARGET_COMPILE_DEFINITIONS(pthreadpool PRIVATE PTHREADPOOL_USE_GCD=0)
+ TARGET_COMPILE_DEFINITIONS(pthreadpool PRIVATE PTHREADPOOL_USE_EVENT=0)
ELSEIF(PTHREADPOOL_SYNC_PRIMITIVE STREQUAL "futex")
TARGET_COMPILE_DEFINITIONS(pthreadpool PRIVATE PTHREADPOOL_USE_FUTEX=1)
TARGET_COMPILE_DEFINITIONS(pthreadpool PRIVATE PTHREADPOOL_USE_GCD=0)
+ TARGET_COMPILE_DEFINITIONS(pthreadpool PRIVATE PTHREADPOOL_USE_EVENT=0)
ELSEIF(PTHREADPOOL_SYNC_PRIMITIVE STREQUAL "gcd")
TARGET_COMPILE_DEFINITIONS(pthreadpool PRIVATE PTHREADPOOL_USE_FUTEX=0)
TARGET_COMPILE_DEFINITIONS(pthreadpool PRIVATE PTHREADPOOL_USE_GCD=1)
+ TARGET_COMPILE_DEFINITIONS(pthreadpool PRIVATE PTHREADPOOL_USE_EVENT=0)
+ELSEIF(PTHREADPOOL_SYNC_PRIMITIVE STREQUAL "event")
+ TARGET_COMPILE_DEFINITIONS(pthreadpool PRIVATE PTHREADPOOL_USE_FUTEX=0)
+ TARGET_COMPILE_DEFINITIONS(pthreadpool PRIVATE PTHREADPOOL_USE_GCD=0)
+ TARGET_COMPILE_DEFINITIONS(pthreadpool PRIVATE PTHREADPOOL_USE_EVENT=1)
ELSEIF(NOT PTHREADPOOL_SYNC_PRIMITIVE STREQUAL "default")
MESSAGE(FATAL_ERROR "Unsupported synchronization primitive ${PTHREADPOOL_SYNC_PRIMITIVE}")
ENDIF()
diff --git a/README.md b/README.md
index 164aab5..57ed3d4 100644
--- a/README.md
+++ b/README.md
@@ -13,7 +13,7 @@ It provides similar functionality to `#pragma omp parallel for`, but with additi
* Run on user-specified or auto-detected number of threads.
* Work-stealing scheduling for efficient work balancing.
* Wait-free synchronization of work items.
-* Compatible with Linux (including Android), macOS, iOS, MinGW, Emscripten environments.
+* Compatible with Linux (including Android), macOS, iOS, Windows, Emscripten environments.
* 100% unit tests coverage.
* Throughput and latency microbenchmarks.
diff --git a/src/legacy-api.c b/src/legacy-api.c
index 43fb798..8d5a6fd 100644
--- a/src/legacy-api.c
+++ b/src/legacy-api.c
@@ -4,21 +4,12 @@
/* Dependencies */
#include <fxdiv.h>
-/* Library header */
+/* Public library header */
#include <pthreadpool.h>
+/* Internal library headers */
+#include "threadpool-utils.h"
-static inline size_t divide_round_up(size_t dividend, size_t divisor) {
- if (dividend % divisor == 0) {
- return dividend / divisor;
- } else {
- return dividend / divisor + 1;
- }
-}
-
-static inline size_t min(size_t a, size_t b) {
- return a < b ? a : b;
-}
void pthreadpool_compute_1d(
pthreadpool_t threadpool,
diff --git a/src/memory.c b/src/memory.c
index 020cb6d..c2ac38c 100644
--- a/src/memory.c
+++ b/src/memory.c
@@ -11,11 +11,10 @@
/* Windows headers */
#ifdef _WIN32
- #define NOMINMAX
#include <malloc.h>
#endif
-/* Internal headers */
+/* Internal library headers */
#include "threadpool-common.h"
#include "threadpool-object.h"
diff --git a/src/portable-api.c b/src/portable-api.c
index c2c33a8..32df0d6 100644
--- a/src/portable-api.c
+++ b/src/portable-api.c
@@ -12,13 +12,13 @@
/* Dependencies */
#include <fxdiv.h>
-/* Library header */
+/* Public library header */
#include <pthreadpool.h>
-/* Internal headers */
-#include "threadpool-utils.h"
+/* Internal library headers */
#include "threadpool-atomics.h"
#include "threadpool-object.h"
+#include "threadpool-utils.h"
size_t pthreadpool_get_threads_count(struct pthreadpool* threadpool) {
diff --git a/src/pthreads.c b/src/pthreads.c
index 05c6aac..8e4e234 100644
--- a/src/pthreads.c
+++ b/src/pthreads.c
@@ -38,7 +38,6 @@
/* Windows-specific headers */
#ifdef _WIN32
- #define NOMINMAX
#include <sysinfoapi.h>
#endif
diff --git a/src/shim.c b/src/shim.c
index b5670ea..b376a7a 100644
--- a/src/shim.c
+++ b/src/shim.c
@@ -1,12 +1,12 @@
/* Standard C headers */
#include <stddef.h>
-/* Library header */
+/* Public library header */
#include <pthreadpool.h>
-static inline size_t min(size_t a, size_t b) {
- return a < b ? a : b;
-}
+/* Internal library headers */
+#include "threadpool-utils.h"
+
struct pthreadpool* pthreadpool_create(size_t threads_count) {
return NULL;
diff --git a/src/threadpool-atomics.h b/src/threadpool-atomics.h
index 4e48da7..9bfeea0 100644
--- a/src/threadpool-atomics.h
+++ b/src/threadpool-atomics.h
@@ -4,6 +4,13 @@
#include <stddef.h>
#include <stdint.h>
+/* MSVC-specific headers */
+#ifdef _MSC_VER
+ #include <intrin.h>
+ #include <immintrin.h>
+#endif
+
+
#if defined(__wasm__) && defined(__EMSCRIPTEN_PTHREADS__) && defined(__clang__)
/*
* Clang for WebAssembly target lacks stdatomic.h header,
@@ -92,6 +99,178 @@
static inline void pthreadpool_fence_release() {
__c11_atomic_thread_fence(__ATOMIC_RELEASE);
}
+#elif defined(_MSC_VER) && (defined(_M_X64) || defined(_M_AMD64))
+ typedef volatile uint32_t pthreadpool_atomic_uint32_t;
+ typedef volatile size_t pthreadpool_atomic_size_t;
+ typedef void *volatile pthreadpool_atomic_void_p;
+
+ static inline uint32_t pthreadpool_load_relaxed_uint32_t(
+ pthreadpool_atomic_uint32_t* address)
+ {
+ return *address;
+ }
+
+ static inline size_t pthreadpool_load_relaxed_size_t(
+ pthreadpool_atomic_size_t* address)
+ {
+ return *address;
+ }
+
+ static inline void* pthreadpool_load_relaxed_void_p(
+ pthreadpool_atomic_void_p* address)
+ {
+ return *address;
+ }
+
+ static inline void pthreadpool_store_relaxed_uint32_t(
+ pthreadpool_atomic_uint32_t* address,
+ uint32_t value)
+ {
+ *address = value;
+ }
+
+ static inline void pthreadpool_store_relaxed_size_t(
+ pthreadpool_atomic_size_t* address,
+ size_t value)
+ {
+ *address = value;
+ }
+
+ static inline void pthreadpool_store_relaxed_void_p(
+ pthreadpool_atomic_void_p* address,
+ void* value)
+ {
+ *address = value;
+ }
+
+ static inline void pthreadpool_store_release_uint32_t(
+ pthreadpool_atomic_uint32_t* address,
+ uint32_t value)
+ {
+ /* x86-64 stores always have release semantics */
+ *address = value;
+ }
+
+ static inline void pthreadpool_store_release_size_t(
+ pthreadpool_atomic_size_t* address,
+ size_t value)
+ {
+ /* x86-64 stores always have release semantics */
+ *address = value;
+ }
+
+ static inline size_t pthreadpool_fetch_sub_relaxed_size_t(
+ pthreadpool_atomic_size_t* address,
+ size_t decrement)
+ {
+ return (size_t) _InterlockedExchangeAdd64((__int64 volatile*) address, (__int64) -decrement);
+ }
+
+ static inline bool pthreadpool_compare_exchange_weak_relaxed_size_t(
+ pthreadpool_atomic_size_t* address,
+ size_t* expected_value,
+ size_t new_value)
+ {
+ const __int64 expected_old_value = *expected_value;
+ const __int64 actual_old_value = _InterlockedCompareExchange64(
+ (__int64 volatile*) address, (__int64) new_value, expected_old_value);
+ *expected_value = (size_t) actual_old_value;
+ return actual_old_value == expected_old_value;
+ }
+
+ static inline void pthreadpool_fence_acquire() {
+ _mm_lfence();
+ }
+
+ static inline void pthreadpool_fence_release() {
+ _mm_sfence();
+ }
+#elif defined(_MSC_VER) && defined(_M_IX86)
+ typedef volatile uint32_t pthreadpool_atomic_uint32_t;
+ typedef volatile size_t pthreadpool_atomic_size_t;
+ typedef void *volatile pthreadpool_atomic_void_p;
+
+ static inline uint32_t pthreadpool_load_relaxed_uint32_t(
+ pthreadpool_atomic_uint32_t* address)
+ {
+ return *address;
+ }
+
+ static inline size_t pthreadpool_load_relaxed_size_t(
+ pthreadpool_atomic_size_t* address)
+ {
+ return *address;
+ }
+
+ static inline void* pthreadpool_load_relaxed_void_p(
+ pthreadpool_atomic_void_p* address)
+ {
+ return *address;
+ }
+
+ static inline void pthreadpool_store_relaxed_uint32_t(
+ pthreadpool_atomic_uint32_t* address,
+ uint32_t value)
+ {
+ *address = value;
+ }
+
+ static inline void pthreadpool_store_relaxed_size_t(
+ pthreadpool_atomic_size_t* address,
+ size_t value)
+ {
+ *address = value;
+ }
+
+ static inline void pthreadpool_store_relaxed_void_p(
+ pthreadpool_atomic_void_p* address,
+ void* value)
+ {
+ *address = value;
+ }
+
+ static inline void pthreadpool_store_release_uint32_t(
+ pthreadpool_atomic_uint32_t* address,
+ uint32_t value)
+ {
+ /* x86 stores always have release semantics */
+ *address = value;
+ }
+
+ static inline void pthreadpool_store_release_size_t(
+ pthreadpool_atomic_size_t* address,
+ size_t value)
+ {
+ /* x86 stores always have release semantics */
+ *address = value;
+ }
+
+ static inline size_t pthreadpool_fetch_sub_relaxed_size_t(
+ pthreadpool_atomic_size_t* address,
+ size_t decrement)
+ {
+ return (size_t) _InterlockedExchangeAdd((long volatile*) address, (long) -decrement);
+ }
+
+ static inline bool pthreadpool_compare_exchange_weak_relaxed_size_t(
+ pthreadpool_atomic_size_t* address,
+ size_t* expected_value,
+ size_t new_value)
+ {
+ const long expected_old_value = *expected_value;
+ const long actual_old_value = _InterlockedCompareExchange(
+ (long volatile*) address, (long) new_value, expected_old_value);
+ *expected_value = (size_t) actual_old_value;
+ return actual_old_value == expected_old_value;
+ }
+
+ static inline void pthreadpool_fence_acquire() {
+ _mm_lfence();
+ }
+
+ static inline void pthreadpool_fence_release() {
+ _mm_sfence();
+ }
#else
#include <stdatomic.h>
diff --git a/src/threadpool-common.h b/src/threadpool-common.h
index ba5587e..8576517 100644
--- a/src/threadpool-common.h
+++ b/src/threadpool-common.h
@@ -22,8 +22,16 @@
#endif
#endif
+#ifndef PTHREADPOOL_USE_EVENT
+ #if defined(_WIN32)
+ #define PTHREADPOOL_USE_EVENT 1
+ #else
+ #define PTHREADPOOL_USE_EVENT 0
+ #endif
+#endif
+
#ifndef PTHREADPOOL_USE_CONDVAR
- #if PTHREADPOOL_USE_GCD || PTHREADPOOL_USE_FUTEX
+ #if PTHREADPOOL_USE_GCD || PTHREADPOOL_USE_FUTEX || PTHREADPOOL_USE_EVENT
#define PTHREADPOOL_USE_CONDVAR 0
#else
#define PTHREADPOOL_USE_CONDVAR 1
@@ -35,7 +43,13 @@
#define PTHREADPOOL_SPIN_WAIT_ITERATIONS 1000000
#define PTHREADPOOL_CACHELINE_SIZE 64
-#define PTHREADPOOL_CACHELINE_ALIGNED __attribute__((__aligned__(PTHREADPOOL_CACHELINE_SIZE)))
+#if defined(__GNUC__)
+ #define PTHREADPOOL_CACHELINE_ALIGNED __attribute__((__aligned__(PTHREADPOOL_CACHELINE_SIZE)))
+#elif defined(_MSC_VER)
+ #define PTHREADPOOL_CACHELINE_ALIGNED __declspec(align(PTHREADPOOL_CACHELINE_SIZE))
+#else
+ #error "Platform-specific implementation of PTHREADPOOL_CACHELINE_ALIGNED required"
+#endif
#if defined(__clang__)
#if __has_extension(c_static_assert) || __has_feature(c_static_assert)
diff --git a/src/threadpool-object.h b/src/threadpool-object.h
index 81e3515..0b4e56a 100644
--- a/src/threadpool-object.h
+++ b/src/threadpool-object.h
@@ -18,6 +18,11 @@
#include <dispatch/dispatch.h>
#endif
+/* Windows headers */
+#if PTHREADPOOL_USE_EVENT
+#include <windows.h>
+#endif
+
/* Dependencies */
#include <fxdiv.h>
@@ -65,6 +70,12 @@ struct PTHREADPOOL_CACHELINE_ALIGNED thread_info {
*/
pthread_t thread_object;
#endif
+#if PTHREADPOOL_USE_EVENT
+ /**
+ * The Windows thread handle corresponding to the thread.
+ */
+ HANDLE thread_handle;
+#endif
};
PTHREADPOOL_STATIC_ASSERT(sizeof(struct thread_info) % PTHREADPOOL_CACHELINE_SIZE == 0,
@@ -446,6 +457,12 @@ struct PTHREADPOOL_CACHELINE_ALIGNED pthreadpool {
*/
dispatch_semaphore_t execution_semaphore;
#endif
+#if PTHREADPOOL_USE_EVENT
+ /**
+ * Serializes concurrent calls to @a pthreadpool_parallelize_* from different threads.
+ */
+ HANDLE execution_mutex;
+#endif
#if PTHREADPOOL_USE_CONDVAR
/**
* Guards access to the @a active_threads variable.
@@ -464,6 +481,20 @@ struct PTHREADPOOL_CACHELINE_ALIGNED pthreadpool {
*/
pthread_cond_t command_condvar;
#endif
+#if PTHREADPOOL_USE_EVENT
+ /**
+ * Events to wait on until all threads complete an operation (until @a active_threads is zero).
+ * To avoid race conditions due to spin-lock synchronization, we use two events and switch event in use after every
+ * submitted command according to the high bit of the command word.
+ */
+ HANDLE completion_event[2];
+ /**
+ * Events to wait on for change of the @a command variable.
+ * To avoid race conditions due to spin-lock synchronization, we use two events and switch event in use after every
+ * submitted command according to the high bit of the command word.
+ */
+ HANDLE command_event[2];
+#endif
/**
* The number of threads in the thread pool. Never changes after pthreadpool_create.
*/
diff --git a/src/threadpool-utils.h b/src/threadpool-utils.h
index 1d147e0..a86392b 100644
--- a/src/threadpool-utils.h
+++ b/src/threadpool-utils.h
@@ -3,12 +3,20 @@
#include <stdint.h>
#include <stddef.h>
-#if defined(__SSE__) || defined(__x86_64__)
-#include <xmmintrin.h>
+/* SSE-specific headers */
+#if defined(__SSE__) || defined(__x86_64__) || defined(_M_X64) || defined(_M_AMD64) || (defined(_M_IX86_FP) && _M_IX86_FP >= 1)
+ #include <xmmintrin.h>
#endif
+/* MSVC-specific headers */
+#if defined(_MSC_VER) && _MSC_VER >= 1920
+ #include <intrin.h>
+ #include <immintrin.h>
+#endif
+
+
struct fpu_state {
-#if defined(__SSE__) || defined(__x86_64__)
+#if defined(__SSE__) || defined(__x86_64__) || defined(_M_X64) || defined(_M_AMD64) || (defined(_M_IX86_FP) && _M_IX86_FP >= 1)
uint32_t mxcsr;
#elif defined(__arm__) && defined(__ARM_FP) && (__ARM_FP != 0)
uint32_t fpscr;
@@ -21,7 +29,7 @@ struct fpu_state {
static inline struct fpu_state get_fpu_state() {
struct fpu_state state = { 0 };
-#if defined(__SSE__) || defined(__x86_64__)
+#if defined(__SSE__) || defined(__x86_64__) || defined(_M_X64) || defined(_M_AMD64) || (defined(_M_IX86_FP) && _M_IX86_FP >= 1)
state.mxcsr = (uint32_t) _mm_getcsr();
#elif defined(__arm__) && defined(__ARM_FP) && (__ARM_FP != 0)
__asm__ __volatile__("VMRS %[fpscr], fpscr" : [fpscr] "=r" (state.fpscr));
@@ -32,7 +40,7 @@ static inline struct fpu_state get_fpu_state() {
}
static inline void set_fpu_state(const struct fpu_state state) {
-#if defined(__SSE__) || defined(__x86_64__)
+#if defined(__SSE__) || defined(__x86_64__) || defined(_M_X64) || defined(_M_AMD64) || (defined(_M_IX86_FP) && _M_IX86_FP >= 1)
_mm_setcsr((unsigned int) state.mxcsr);
#elif defined(__arm__) && defined(__ARM_FP) && (__ARM_FP != 0)
__asm__ __volatile__("VMSR fpscr, %[fpscr]" : : [fpscr] "r" (state.fpscr));
@@ -42,7 +50,7 @@ static inline void set_fpu_state(const struct fpu_state state) {
}
static inline void disable_fpu_denormals() {
-#if defined(__SSE__) || defined(__x86_64__)
+#if defined(__SSE__) || defined(__x86_64__) || defined(_M_X64) || defined(_M_AMD64) || (defined(_M_IX86_FP) && _M_IX86_FP >= 1)
_mm_setcsr(_mm_getcsr() | 0x8040);
#elif defined(__arm__) && defined(__ARM_FP) && (__ARM_FP != 0)
uint32_t fpscr;
@@ -65,14 +73,23 @@ static inline void disable_fpu_denormals() {
static inline size_t multiply_divide(size_t a, size_t b, size_t d) {
#if defined(__SIZEOF_SIZE_T__) && (__SIZEOF_SIZE_T__ == 4)
return (size_t) (((uint64_t) a) * ((uint64_t) b)) / ((uint64_t) d);
- #elif defined(__SIZEOF_SIZE_T__) && (__SIZEOF_SIZE_T__ == 8)
+ #elif defined(__SIZEOF_SIZE_T__) && (__SIZEOF_SIZE_T__ == 8) && defined(__SIZEOF_INT128__)
return (size_t) (((__uint128_t) a) * ((__uint128_t) b)) / ((__uint128_t) d);
+ #elif (defined(_MSC_VER) && _MSC_VER >= 1920) && (defined(_M_AMD64) || defined(_M_X64))
+ uint64_t product_hi;
+ const uint64_t product_lo = _umul128(a, b, &product_hi);
+ uint64_t remainder;
+ return (size_t) _udiv128(product_hi, product_lo, d, &remainder);
+ #elif (defined(_MSC_VER) && _MSC_VER >= 1920) && defined(_M_IX86)
+ const unsigned __int64 product_full = __emulu((unsigned int) a, (unsigned int) b);
+ unsigned int remainder;
+ return (size_t) _udiv64(product_full, (unsigned int) d, &remainder);
#else
#error "Platform-specific implementation of multiply_divide required"
#endif
}
-static inline size_t modulo_decrement(uint32_t i, uint32_t n) {
+static inline size_t modulo_decrement(size_t i, size_t n) {
/* Wrap modulo n, if needed */
if (i == 0) {
i = n;
@@ -89,6 +106,11 @@ static inline size_t divide_round_up(size_t dividend, size_t divisor) {
}
}
+/* Windows headers define min and max as macros; undefine min here so it does not clash with the inline function below */
+#ifdef min
+ #undef min
+#endif
+
static inline size_t min(size_t a, size_t b) {
return a < b ? a : b;
}
diff --git a/src/windows.c b/src/windows.c
new file mode 100644
index 0000000..0139441
--- /dev/null
+++ b/src/windows.c
@@ -0,0 +1,363 @@
+/* Standard C headers */
+#include <assert.h>
+#include <limits.h>
+#include <stdbool.h>
+#include <stdint.h>
+#include <stdlib.h>
+#include <string.h>
+
+/* Configuration header */
+#include "threadpool-common.h"
+
+/* Windows headers */
+#include <windows.h>
+
+/* MSVC-specific headers */
+#ifdef _MSC_VER
+ #include <intrin.h>
+ #include <immintrin.h>
+#endif
+
+/* Public library header */
+#include <pthreadpool.h>
+
+/* Internal library headers */
+#include "threadpool-atomics.h"
+#include "threadpool-object.h"
+#include "threadpool-utils.h"
+
+
+static void checkin_worker_thread(struct pthreadpool* threadpool, uint32_t event_index) {
+ if (pthreadpool_fetch_sub_relaxed_size_t(&threadpool->active_threads, 1) == 1) {
+ SetEvent(threadpool->completion_event[event_index]);
+ }
+}
+
+static void wait_worker_threads(struct pthreadpool* threadpool, uint32_t event_index) {
+ /* Initial check */
+ size_t active_threads = pthreadpool_load_relaxed_size_t(&threadpool->active_threads);
+ if (active_threads == 0) {
+ return;
+ }
+
+ /* Spin-wait */
+ for (uint32_t i = PTHREADPOOL_SPIN_WAIT_ITERATIONS; i != 0; i--) {
+ /* This fence serves as a sleep instruction */
+ pthreadpool_fence_acquire();
+
+ active_threads = pthreadpool_load_relaxed_size_t(&threadpool->active_threads);
+ if (active_threads == 0) {
+ return;
+ }
+ }
+
+ /* Fall-back to event wait */
+ const DWORD wait_status = WaitForSingleObject(threadpool->completion_event[event_index], INFINITE);
+ assert(wait_status == WAIT_OBJECT_0);
+ assert(pthreadpool_load_relaxed_size_t(&threadpool->active_threads) == 0);
+}
+
+static uint32_t wait_for_new_command(
+ struct pthreadpool* threadpool,
+ uint32_t last_command,
+ uint32_t last_flags)
+{
+ uint32_t command = pthreadpool_load_relaxed_uint32_t(&threadpool->command);
+ if (command != last_command) {
+ return command;
+ }
+
+ if ((last_flags & PTHREADPOOL_FLAG_YIELD_WORKERS) == 0) {
+ /* Spin-wait loop */
+ for (uint32_t i = PTHREADPOOL_SPIN_WAIT_ITERATIONS; i != 0; i--) {
+ /* This fence serves as a sleep instruction */
+ pthreadpool_fence_acquire();
+
+ command = pthreadpool_load_relaxed_uint32_t(&threadpool->command);
+ if (command != last_command) {
+ return command;
+ }
+ }
+ }
+
+ /* Spin-wait disabled or timed out, fall back to event wait */
+ const uint32_t event_index = (last_command >> 31);
+ const DWORD wait_status = WaitForSingleObject(threadpool->command_event[event_index], INFINITE);
+ assert(wait_status == WAIT_OBJECT_0);
+
+ command = pthreadpool_load_relaxed_uint32_t(&threadpool->command);
+ assert(command != last_command);
+ return command;
+}
+
+static DWORD WINAPI thread_main(LPVOID arg) {
+ struct thread_info* thread = (struct thread_info*) arg;
+ struct pthreadpool* threadpool = thread->threadpool;
+ uint32_t last_command = threadpool_command_init;
+ struct fpu_state saved_fpu_state = { 0 };
+ uint32_t flags = 0;
+
+ /* Check in */
+ checkin_worker_thread(threadpool, 0);
+
+ /* Monitor new commands and act accordingly */
+ for (;;) {
+ uint32_t command = wait_for_new_command(threadpool, last_command, flags);
+ pthreadpool_fence_acquire();
+
+ flags = pthreadpool_load_relaxed_uint32_t(&threadpool->flags);
+
+ /* Process command */
+ switch (command & THREADPOOL_COMMAND_MASK) {
+ case threadpool_command_parallelize:
+ {
+ const thread_function_t thread_function =
+ (thread_function_t) pthreadpool_load_relaxed_void_p(&threadpool->thread_function);
+ if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
+ saved_fpu_state = get_fpu_state();
+ disable_fpu_denormals();
+ }
+
+ thread_function(threadpool, thread);
+ if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
+ set_fpu_state(saved_fpu_state);
+ }
+ break;
+ }
+ case threadpool_command_shutdown:
+			/* Exit immediately: the master thread is waiting on WaitForSingleObject for the thread handle */
+ return 0;
+ case threadpool_command_init:
+ /* To inhibit compiler warning */
+ break;
+ }
+ /* Notify the master thread that we finished processing */
+ const uint32_t event_index = command >> 31;
+ checkin_worker_thread(threadpool, event_index);
+ /* Update last command */
+ last_command = command;
+ };
+ return 0;
+}
+
+struct pthreadpool* pthreadpool_create(size_t threads_count) {
+ if (threads_count == 0) {
+ SYSTEM_INFO system_info;
+ ZeroMemory(&system_info, sizeof(system_info));
+ GetSystemInfo(&system_info);
+ threads_count = (size_t) system_info.dwNumberOfProcessors;
+ }
+
+ struct pthreadpool* threadpool = pthreadpool_allocate(threads_count);
+ if (threadpool == NULL) {
+ return NULL;
+ }
+ threadpool->threads_count = threads_count;
+ for (size_t tid = 0; tid < threads_count; tid++) {
+ threadpool->threads[tid].thread_number = tid;
+ threadpool->threads[tid].threadpool = threadpool;
+ }
+
+ /* Thread pool with a single thread computes everything on the caller thread. */
+ if (threads_count > 1) {
+ threadpool->execution_mutex = CreateMutexW(
+ NULL /* mutex attributes */,
+ FALSE /* initially owned */,
+ NULL /* name */);
+ for (size_t i = 0; i < 2; i++) {
+ threadpool->completion_event[i] = CreateEventW(
+ NULL /* event attributes */,
+ TRUE /* manual-reset event: yes */,
+ FALSE /* initial state: nonsignaled */,
+ NULL /* name */);
+ threadpool->command_event[i] = CreateEventW(
+ NULL /* event attributes */,
+ TRUE /* manual-reset event: yes */,
+ FALSE /* initial state: nonsignaled */,
+ NULL /* name */);
+ }
+
+ pthreadpool_store_release_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */);
+
+ /* Caller thread serves as worker #0. Thus, we create system threads starting with worker #1. */
+ for (size_t tid = 1; tid < threads_count; tid++) {
+ threadpool->threads[tid].thread_handle = CreateThread(
+ NULL /* thread attributes */,
+ 0 /* stack size: default */,
+ &thread_main,
+ &threadpool->threads[tid],
+ 0 /* creation flags */,
+ NULL /* thread id */);
+ }
+
+ /* Wait until all threads initialize */
+ wait_worker_threads(threadpool, 0);
+ }
+ return threadpool;
+}
+
+PTHREADPOOL_INTERNAL void pthreadpool_parallelize(
+ struct pthreadpool* threadpool,
+ thread_function_t thread_function,
+ const void* params,
+ size_t params_size,
+ void* task,
+ void* context,
+ size_t linear_range,
+ uint32_t flags)
+{
+ assert(threadpool != NULL);
+ assert(thread_function != NULL);
+ assert(task != NULL);
+ assert(linear_range > 1);
+
+ /* Protect the global threadpool structures */
+ const DWORD wait_status = WaitForSingleObject(threadpool->execution_mutex, INFINITE);
+ assert(wait_status == WAIT_OBJECT_0);
+
+ /* Setup global arguments */
+ pthreadpool_store_relaxed_void_p(&threadpool->thread_function, (void*) thread_function);
+ pthreadpool_store_relaxed_void_p(&threadpool->task, task);
+ pthreadpool_store_relaxed_void_p(&threadpool->argument, context);
+ pthreadpool_store_relaxed_uint32_t(&threadpool->flags, flags);
+
+ const size_t threads_count = threadpool->threads_count;
+ pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */);
+
+ if (params_size != 0) {
+ CopyMemory(&threadpool->params, params, params_size);
+ pthreadpool_fence_release();
+ }
+
+ /* Spread the work between threads */
+ size_t range_start = 0;
+ for (size_t tid = 0; tid < threads_count; tid++) {
+ struct thread_info* thread = &threadpool->threads[tid];
+ const size_t range_end = multiply_divide(linear_range, tid + 1, threads_count);
+ pthreadpool_store_relaxed_size_t(&thread->range_start, range_start);
+ pthreadpool_store_relaxed_size_t(&thread->range_end, range_end);
+ pthreadpool_store_relaxed_size_t(&thread->range_length, range_end - range_start);
+
+ /* The next subrange starts where the previous ended */
+ range_start = range_end;
+ }
+
+ /*
+ * Update the threadpool command.
+	 * Importantly, do it after initializing command parameters (range, task, argument, flags)
+	 * ~(threadpool->command | THREADPOOL_COMMAND_MASK) flips the bits not in command mask
+	 * to ensure the unmasked command is different from the last command, because worker threads
+	 * monitor for change in the unmasked command.
+ */
+ const uint32_t old_command = pthreadpool_load_relaxed_uint32_t(&threadpool->command);
+ const uint32_t new_command = ~(old_command | THREADPOOL_COMMAND_MASK) | threadpool_command_parallelize;
+
+ /*
+ * Store the command with release semantics to guarantee that if a worker thread observes
+ * the new command value, it also observes the updated command parameters.
+ *
+ * Note: release semantics is necessary, because the workers might be waiting in a spin-loop
+ * rather than on the event object.
+ */
+ pthreadpool_store_release_uint32_t(&threadpool->command, new_command);
+
+ /*
+ * Signal the event to wake up the threads.
+ * Event in use must be switched after every submitted command to avoid race conditions.
+ * Choose the event based on the high bit of the command, which is flipped on every update.
+ */
+ const uint32_t event_index = (old_command >> 31);
+ const BOOL set_event_status = SetEvent(threadpool->command_event[event_index]);
+ assert(set_event_status != FALSE);
+
+ /* Save and modify FPU denormals control, if needed */
+ struct fpu_state saved_fpu_state = { 0 };
+ if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
+ saved_fpu_state = get_fpu_state();
+ disable_fpu_denormals();
+ }
+
+ /* Do computations as worker #0 */
+ thread_function(threadpool, &threadpool->threads[0]);
+
+ /* Restore FPU denormals control, if needed */
+ if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
+ set_fpu_state(saved_fpu_state);
+ }
+
+ /*
+ * Wait until the threads finish computation
+ * Use the complementary event because it corresponds to the new command.
+ */
+ wait_worker_threads(threadpool, event_index ^ 1);
+
+ /*
+ * Reset events for the next command.
+ * Note: the reset events are different from the events used for synchronization in this update.
+ */
+ BOOL reset_event_status = ResetEvent(threadpool->command_event[event_index ^ 1]);
+ assert(reset_event_status != FALSE);
+ reset_event_status = ResetEvent(threadpool->completion_event[event_index]);
+ assert(reset_event_status != FALSE);
+
+ /* Make changes by other threads visible to this thread */
+ pthreadpool_fence_acquire();
+
+ /* Unprotect the global threadpool structures */
+ const BOOL release_mutex_status = ReleaseMutex(threadpool->execution_mutex);
+ assert(release_mutex_status != FALSE);
+}
+
+void pthreadpool_destroy(struct pthreadpool* threadpool) {
+	if (threadpool != NULL) {
+		const size_t threads_count = threadpool->threads_count;
+		if (threads_count > 1) {
+			pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */);
+
+			/*
+			 * Store the command with release semantics to guarantee that if a worker thread observes
+			 * the new command value, it also observes the updated active_threads values.
+			 */
+			const uint32_t old_command = pthreadpool_load_relaxed_uint32_t(&threadpool->command);
+			pthreadpool_store_release_uint32_t(&threadpool->command, threadpool_command_shutdown);
+
+			/*
+			 * Signal the event to wake up the threads.
+			 * Event in use must be switched after every submitted command to avoid race conditions.
+			 * Choose the event based on the high bit of the command, which is flipped on every update.
+			 */
+			const uint32_t event_index = (old_command >> 31);
+			const BOOL set_event_status = SetEvent(threadpool->command_event[event_index]);
+			assert(set_event_status != FALSE);
+
+			/* Wait until all threads return */
+			for (size_t tid = 1; tid < threads_count; tid++) {
+				const HANDLE thread_handle = threadpool->threads[tid].thread_handle;
+				if (thread_handle != NULL) {
+					const DWORD wait_status = WaitForSingleObject(thread_handle, INFINITE);
+					assert(wait_status == WAIT_OBJECT_0);
+
+					const BOOL close_status = CloseHandle(thread_handle);
+					assert(close_status != FALSE);
+				}
+			}
+
+			/* Release resources */
+			if (threadpool->execution_mutex != NULL) {
+				const BOOL close_status = CloseHandle(threadpool->execution_mutex);
+				assert(close_status != FALSE);
+			}
+			for (size_t i = 0; i < 2; i++) {
+				if (threadpool->command_event[i] != NULL) {
+					const BOOL close_status = CloseHandle(threadpool->command_event[i]);
+					assert(close_status != FALSE);
+				}
+				if (threadpool->completion_event[i] != NULL) {
+					const BOOL close_status = CloseHandle(threadpool->completion_event[i]);
+					assert(close_status != FALSE);
+				}
+			}
+		}
+		pthreadpool_deallocate(threadpool);
+	}
+}