diff options
author | Marat Dukhan <maratek@google.com> | 2020-04-01 17:08:30 -0700 |
---|---|---|
committer | Marat Dukhan <maratek@google.com> | 2020-04-01 17:08:30 -0700 |
commit | fc793bc6d7eab64756df79971556594bf4ab145b (patch) | |
tree | 669f93c06ca276279aa3acb6b91edb8029e5f5a7 | |
parent | 5b41aa6060588e26fd25ace6dc4afccfd4793997 (diff) | |
download | pthreadpool-fc793bc6d7eab64756df79971556594bf4ab145b.tar.gz |
Refactor pthreadpool implementation
Split implementation into two types of components:
- Components dependent on threading API
- Portable components
-rw-r--r-- | CMakeLists.txt | 55 | ||||
-rw-r--r-- | src/legacy-api.c (renamed from src/threadpool-legacy.c) | 0 | ||||
-rw-r--r-- | src/memory.c | 67 | ||||
-rw-r--r-- | src/portable-api.c (renamed from src/threadpool-pthreads.c) | 747 | ||||
-rw-r--r-- | src/pthreads.c | 462 | ||||
-rw-r--r-- | src/shim.c (renamed from src/threadpool-shim.c) | 0 | ||||
-rw-r--r-- | src/threadpool-atomics.h | 27 | ||||
-rw-r--r-- | src/threadpool-common.h | 44 | ||||
-rw-r--r-- | src/threadpool-object.h | 164 | ||||
-rw-r--r-- | src/threadpool-utils.h | 32 |
10 files changed, 840 insertions, 758 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index 79a17a1..3282f8d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -9,6 +9,8 @@ PROJECT(pthreadpool C CXX) SET(PTHREADPOOL_LIBRARY_TYPE "default" CACHE STRING "Type of library (shared, static, or default) to build") SET_PROPERTY(CACHE PTHREADPOOL_LIBRARY_TYPE PROPERTY STRINGS default static shared) OPTION(PTHREADPOOL_ALLOW_DEPRECATED_API "Enable deprecated API functions" ON) +SET(PTHREADPOOL_SYNC_PRIMITIVE "default" CACHE STRING "Synchronization primitive (condvar, futex, or default) for worker threads") +SET_PROPERTY(CACHE PTHREADPOOL_SYNC_PRIMITIVE PROPERTY STRINGS default condvar futex) IF("${CMAKE_SOURCE_DIR}" STREQUAL "${PROJECT_SOURCE_DIR}") OPTION(PTHREADPOOL_BUILD_TESTS "Build pthreadpool unit tests" ON) OPTION(PTHREADPOOL_BUILD_BENCHMARKS "Build pthreadpool micro-benchmarks" ON) @@ -39,21 +41,6 @@ IF(NOT DEFINED FXDIV_SOURCE_DIR) SET(FXDIV_SOURCE_DIR "${CMAKE_BINARY_DIR}/FXdiv-source" CACHE STRING "FXdiv source directory") ENDIF() -IF(CMAKE_SYSTEM_NAME MATCHES "^(Linux|Android)$" AND CMAKE_SYSTEM_PROCESSOR MATCHES "^(armv[5-8].*|aarch64)$") - IF(NOT DEFINED CPUINFO_SOURCE_DIR) - MESSAGE(STATUS "Downloading cpuinfo to ${CMAKE_BINARY_DIR}/cpuinfo-source (define CPUINFO_SOURCE_DIR to avoid it)") - CONFIGURE_FILE(cmake/DownloadCpuinfo.cmake "${CMAKE_BINARY_DIR}/cpuinfo-download/CMakeLists.txt") - EXECUTE_PROCESS(COMMAND "${CMAKE_COMMAND}" -G "${CMAKE_GENERATOR}" . - WORKING_DIRECTORY "${CMAKE_BINARY_DIR}/cpuinfo-download") - EXECUTE_PROCESS(COMMAND "${CMAKE_COMMAND}" --build . - WORKING_DIRECTORY "${CMAKE_BINARY_DIR}/cpuinfo-download") - SET(CPUINFO_SOURCE_DIR "${CMAKE_BINARY_DIR}/cpuinfo-source" CACHE STRING "cpuinfo source directory") - ENDIF() - SET(PTHREADPOOL_USE_CPUINFO ON) -ELSE() - SET(PTHREADPOOL_USE_CPUINFO OFF) -ENDIF() - IF(PTHREADPOOL_BUILD_TESTS AND NOT DEFINED GOOGLETEST_SOURCE_DIR) MESSAGE(STATUS "Downloading Google Test to ${CMAKE_BINARY_DIR}/googletest-source (define GOOGLETEST_SOURCE_DIR to avoid it)") CONFIGURE_FILE(cmake/DownloadGoogleTest.cmake "${CMAKE_BINARY_DIR}/googletest-download/CMakeLists.txt") @@ -76,20 +63,15 @@ ENDIF() # ---[ pthreadpool library IF(PTHREADPOOL_ALLOW_DEPRECATED_API) - SET(PTHREADPOOL_SRCS src/threadpool-legacy.c) + SET(PTHREADPOOL_SRCS src/legacy-api.c) ENDIF() IF(CMAKE_SYSTEM_NAME STREQUAL "Emscripten") - LIST(APPEND PTHREADPOOL_SRCS src/threadpool-shim.c) + LIST(APPEND PTHREADPOOL_SRCS src/shim.c) ELSE() - LIST(APPEND PTHREADPOOL_SRCS src/threadpool-pthreads.c) + LIST(APPEND PTHREADPOOL_SRCS src/portable-api.c src/memory.c src/pthreads.c) ENDIF() -IF(${CMAKE_VERSION} VERSION_LESS "3.0") - ADD_LIBRARY(pthreadpool_interface STATIC include/pthreadpool.h) - SET_TARGET_PROPERTIES(pthreadpool_interface PROPERTIES LINKER_LANGUAGE C) -ELSE() - ADD_LIBRARY(pthreadpool_interface INTERFACE) -ENDIF() +ADD_LIBRARY(pthreadpool_interface INTERFACE) TARGET_INCLUDE_DIRECTORIES(pthreadpool_interface INTERFACE include) IF(NOT PTHREADPOOL_ALLOW_DEPRECATED_API) TARGET_COMPILE_DEFINITIONS(pthreadpool_interface INTERFACE PTHREADPOOL_NO_DEPRECATED_API=1) @@ -106,6 +88,14 @@ ELSE() MESSAGE(FATAL_ERROR "Unsupported library type ${PTHREADPOOL_LIBRARY_TYPE}") ENDIF() +IF(PTHREADPOOL_SYNC_PRIMITIVE STREQUAL "condvar") + TARGET_COMPILE_DEFINITIONS(pthreadpool PRIVATE PTHREADPOOL_USE_FUTEX=0) +ELSEIF(PTHREADPOOL_SYNC_PRIMITIVE STREQUAL "futex") + TARGET_COMPILE_DEFINITIONS(pthreadpool PRIVATE PTHREADPOOL_USE_FUTEX=1) +ELSEIF(NOT PTHREADPOOL_SYNC_PRIMITIVE STREQUAL "default") + MESSAGE(FATAL_ERROR "Unsupported synchronization primitive ${PTHREADPOOL_SYNC_PRIMITIVE}") +ENDIF() + SET_TARGET_PROPERTIES(pthreadpool PROPERTIES C_STANDARD 11 C_EXTENSIONS NO) @@ -137,23 +127,6 @@ IF(NOT TARGET fxdiv) ENDIF() TARGET_LINK_LIBRARIES(pthreadpool PRIVATE fxdiv) -# ---[ Configure cpuinfo -IF(PTHREADPOOL_USE_CPUINFO) - IF(NOT TARGET cpuinfo) - SET(CPUINFO_BUILD_TOOLS OFF CACHE BOOL "") - SET(CPUINFO_BUILD_UNIT_TESTS OFF CACHE BOOL "") - SET(CPUINFO_BUILD_MOCK_TESTS OFF CACHE BOOL "") - SET(CPUINFO_BUILD_BENCHMARKS OFF CACHE BOOL "") - ADD_SUBDIRECTORY( - "${CPUINFO_SOURCE_DIR}" - "${CMAKE_BINARY_DIR}/cpuinfo") - ENDIF() - TARGET_LINK_LIBRARIES(pthreadpool PRIVATE cpuinfo) - TARGET_COMPILE_DEFINITIONS(pthreadpool PRIVATE PTHREADPOOL_USE_CPUINFO=1) -ELSE() - TARGET_COMPILE_DEFINITIONS(pthreadpool PRIVATE PTHREADPOOL_USE_CPUINFO=0) -ENDIF() - INSTALL(TARGETS pthreadpool LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR} ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR}) diff --git a/src/threadpool-legacy.c b/src/legacy-api.c index 43fb798..43fb798 100644 --- a/src/threadpool-legacy.c +++ b/src/legacy-api.c diff --git a/src/memory.c b/src/memory.c new file mode 100644 index 0000000..020cb6d --- /dev/null +++ b/src/memory.c @@ -0,0 +1,67 @@ +/* Standard C headers */ +#include <assert.h> +#include <stddef.h> +#include <stdlib.h> +#include <string.h> + +/* POSIX headers */ +#ifdef __ANDROID__ + #include <malloc.h> +#endif + +/* Windows headers */ +#ifdef _WIN32 + #define NOMINMAX + #include <malloc.h> +#endif + +/* Internal headers */ +#include "threadpool-common.h" +#include "threadpool-object.h" + + +PTHREADPOOL_INTERNAL struct pthreadpool* pthreadpool_allocate( + size_t threads_count) +{ + assert(threads_count >= 1); + + const size_t threadpool_size = sizeof(struct pthreadpool) + threads_count * sizeof(struct thread_info); + struct pthreadpool* threadpool = NULL; + #if defined(__ANDROID__) + /* + * Android didn't get posix_memalign until API level 17 (Android 4.2). + * Use (otherwise obsolete) memalign function on Android platform. + */ + threadpool = memalign(PTHREADPOOL_CACHELINE_SIZE, threadpool_size); + if (threadpool == NULL) { + return NULL; + } + #elif defined(_WIN32) + threadpool = _aligned_malloc(threadpool_size, PTHREADPOOL_CACHELINE_SIZE); + if (threadpool == NULL) { + return NULL; + } + #else + if (posix_memalign((void**) &threadpool, PTHREADPOOL_CACHELINE_SIZE, threadpool_size) != 0) { + return NULL; + } + #endif + memset(threadpool, 0, threadpool_size); + return threadpool; +} + + +PTHREADPOOL_INTERNAL void pthreadpool_deallocate( + struct pthreadpool* threadpool) +{ + assert(threadpool != NULL); + + const size_t threadpool_size = sizeof(struct pthreadpool) + threadpool->threads_count * sizeof(struct thread_info); + memset(threadpool, 0, threadpool_size); + + #ifdef _WIN32 + _aligned_free(threadpool); + #else + free(threadpool); + #endif +} diff --git a/src/threadpool-pthreads.c b/src/portable-api.c index 0e5b44e..5cf09c0 100644 --- a/src/threadpool-pthreads.c +++ b/src/portable-api.c @@ -5,57 +5,10 @@ #include <stdlib.h> #include <string.h> -/* POSIX headers */ -#include <pthread.h> -#include <unistd.h> - -#ifndef PTHREADPOOL_USE_CPUINFO - #define PTHREADPOOL_USE_CPUINFO 0 -#endif - -#ifndef PTHREADPOOL_USE_FUTEX - #if defined(__linux__) - #define PTHREADPOOL_USE_FUTEX 1 - #elif defined(__EMSCRIPTEN__) - #define PTHREADPOOL_USE_FUTEX 1 - #else - #define PTHREADPOOL_USE_FUTEX 0 - #endif -#endif - #if PTHREADPOOL_USE_CPUINFO #include <cpuinfo.h> #endif -/* Futex-specific headers */ -#if PTHREADPOOL_USE_FUTEX - #if defined(__linux__) - #include <sys/syscall.h> - #include <linux/futex.h> - - /* Old Android NDKs do not define SYS_futex and FUTEX_PRIVATE_FLAG */ - #ifndef SYS_futex - #define SYS_futex __NR_futex - #endif - #ifndef FUTEX_PRIVATE_FLAG - #define FUTEX_PRIVATE_FLAG 128 - #endif - #elif defined(__EMSCRIPTEN__) - /* math.h for INFINITY constant */ - #include <math.h> - - #include <emscripten/threading.h> - #else - #error "Platform-specific implementation of futex_wait and futex_wake_all required" - #endif -#endif - -#ifdef _WIN32 - #define NOMINMAX - #include <malloc.h> - #include <sysinfoapi.h> -#endif - /* Dependencies */ #include <fxdiv.h> @@ -65,303 +18,26 @@ /* Internal headers */ #include "threadpool-utils.h" #include "threadpool-atomics.h" +#include "threadpool-object.h" -/* Number of iterations in spin-wait loop before going into futex/mutex wait */ -#define PTHREADPOOL_SPIN_WAIT_ITERATIONS 1000000 - -#define PTHREADPOOL_CACHELINE_SIZE 64 -#define PTHREADPOOL_CACHELINE_ALIGNED __attribute__((__aligned__(PTHREADPOOL_CACHELINE_SIZE))) - -#if defined(__clang__) - #if __has_extension(c_static_assert) || __has_feature(c_static_assert) - #define PTHREADPOOL_STATIC_ASSERT(predicate, message) _Static_assert((predicate), message) - #else - #define PTHREADPOOL_STATIC_ASSERT(predicate, message) - #endif -#elif defined(__GNUC__) && ((__GNUC__ > 4) || (__GNUC__ == 4) && (__GNUC_MINOR__ >= 6)) - /* Static assert is supported by gcc >= 4.6 */ - #define PTHREADPOOL_STATIC_ASSERT(predicate, message) _Static_assert((predicate), message) -#else - #define PTHREADPOOL_STATIC_ASSERT(predicate, message) -#endif - -static inline size_t multiply_divide(size_t a, size_t b, size_t d) { - #if defined(__SIZEOF_SIZE_T__) && (__SIZEOF_SIZE_T__ == 4) - return (size_t) (((uint64_t) a) * ((uint64_t) b)) / ((uint64_t) d); - #elif defined(__SIZEOF_SIZE_T__) && (__SIZEOF_SIZE_T__ == 8) - return (size_t) (((__uint128_t) a) * ((__uint128_t) b)) / ((__uint128_t) d); - #else - #error "Unsupported platform" - #endif -} - -static inline size_t divide_round_up(size_t dividend, size_t divisor) { - if (dividend % divisor == 0) { - return dividend / divisor; - } else { - return dividend / divisor + 1; - } -} - -static inline size_t min(size_t a, size_t b) { - return a < b ? a : b; -} - -#if PTHREADPOOL_USE_FUTEX - #if defined(__linux__) - static int futex_wait(pthreadpool_atomic_uint32_t* address, uint32_t value) { - return syscall(SYS_futex, address, FUTEX_WAIT | FUTEX_PRIVATE_FLAG, value, NULL); - } - - static int futex_wake_all(pthreadpool_atomic_uint32_t* address) { - return syscall(SYS_futex, address, FUTEX_WAKE | FUTEX_PRIVATE_FLAG, INT_MAX); - } - #elif defined(__EMSCRIPTEN__) - static int futex_wait(pthreadpool_atomic_uint32_t* address, uint32_t value) { - return emscripten_futex_wait((volatile void*) address, value, INFINITY); - } - - static int futex_wake_all(pthreadpool_atomic_uint32_t* address) { - return emscripten_futex_wake((volatile void*) address, INT_MAX); - } - #else - #error "Platform-specific implementation of futex_wait and futex_wake_all required" - #endif -#endif - -#define THREADPOOL_COMMAND_MASK UINT32_C(0x7FFFFFFF) - -enum threadpool_command { - threadpool_command_init, - threadpool_command_parallelize, - threadpool_command_shutdown, -}; - -struct PTHREADPOOL_CACHELINE_ALIGNED thread_info { - /** - * Index of the first element in the work range. - * Before processing a new element the owning worker thread increments this value. - */ - pthreadpool_atomic_size_t range_start; - /** - * Index of the element after the last element of the work range. - * Before processing a new element the stealing worker thread decrements this value. - */ - pthreadpool_atomic_size_t range_end; - /** - * The number of elements in the work range. - * Due to race conditions range_length <= range_end - range_start. - * The owning worker thread must decrement this value before incrementing @a range_start. - * The stealing worker thread must decrement this value before decrementing @a range_end. - */ - pthreadpool_atomic_size_t range_length; - /** - * Thread number in the 0..threads_count-1 range. - */ - size_t thread_number; - /** - * The pthread object corresponding to the thread. - */ - pthread_t thread_object; -}; - -PTHREADPOOL_STATIC_ASSERT(sizeof(struct thread_info) % PTHREADPOOL_CACHELINE_SIZE == 0, "thread_info structure must occupy an integer number of cache lines (64 bytes)"); - -struct pthreadpool_1d_with_uarch_params { - /** - * Copy of the default uarch index argument passed to a microarchitecture-aware parallelization function. - */ - uint32_t default_uarch_index; - /** - * Copy of the max uarch index argument passed to a microarchitecture-aware parallelization function. - */ - uint32_t max_uarch_index; -}; - -struct PTHREADPOOL_CACHELINE_ALIGNED pthreadpool { - /** - * The number of threads that are processing an operation. - */ - pthreadpool_atomic_size_t active_threads; -#if PTHREADPOOL_USE_FUTEX - /** - * Indicates if there are active threads. - * Only two values are possible: - * - has_active_threads == 0 if active_threads == 0 - * - has_active_threads == 1 if active_threads != 0 - */ - pthreadpool_atomic_uint32_t has_active_threads; -#endif - /** - * The last command submitted to the thread pool. - */ - pthreadpool_atomic_uint32_t command; - /** - * The entry point function to call for each thread in the thread pool for parallelization tasks. - */ - pthreadpool_atomic_void_p thread_function; - /** - * The function to call for each item. - */ - pthreadpool_atomic_void_p task; - /** - * The first argument to the item processing function. - */ - pthreadpool_atomic_void_p argument; - /** - * Additional parallelization parameters. - * These parameters are specific for each thread_function. - */ - union { - struct pthreadpool_1d_with_uarch_params parallelize_1d_with_uarch; - } params; - /** - * Copy of the flags passed to a parallelization function. - */ - pthreadpool_atomic_uint32_t flags; - /** - * Serializes concurrent calls to @a pthreadpool_parallelize_* from different threads. - */ - pthread_mutex_t execution_mutex; -#if !PTHREADPOOL_USE_FUTEX - /** - * Guards access to the @a active_threads variable. - */ - pthread_mutex_t completion_mutex; - /** - * Condition variable to wait until all threads complete an operation (until @a active_threads is zero). - */ - pthread_cond_t completion_condvar; - /** - * Guards access to the @a command variable. - */ - pthread_mutex_t command_mutex; - /** - * Condition variable to wait for change of the @a command variable. - */ - pthread_cond_t command_condvar; -#endif -#if PTHREADPOOL_USE_CPUINFO - /** - * Indication whether cpuinfo library initialized successfully. Never changes after pthreadpool_create. - */ - bool cpuinfo_is_initialized; -#endif - /** - * The number of threads in the thread pool. Never changes after pthreadpool_create. - */ - size_t threads_count; - /** - * Thread information structures that immediately follow this structure. - */ - struct thread_info threads[]; -}; - -PTHREADPOOL_STATIC_ASSERT(sizeof(struct pthreadpool) % PTHREADPOOL_CACHELINE_SIZE == 0, "pthreadpool structure must occupy an integer number of cache lines (64 bytes)"); -static void checkin_worker_thread(struct pthreadpool* threadpool) { - #if PTHREADPOOL_USE_FUTEX - if (pthreadpool_fetch_sub_relaxed_size_t(&threadpool->active_threads, 1) == 1) { - pthreadpool_store_relaxed_uint32_t(&threadpool->has_active_threads, 0); - futex_wake_all(&threadpool->has_active_threads); - } - #else - pthread_mutex_lock(&threadpool->completion_mutex); - if (pthreadpool_fetch_sub_relaxed_size_t(&threadpool->active_threads, 1) == 1) { - pthread_cond_signal(&threadpool->completion_condvar); - } - pthread_mutex_unlock(&threadpool->completion_mutex); - #endif -} - -static void wait_worker_threads(struct pthreadpool* threadpool) { - /* Initial check */ - #if PTHREADPOOL_USE_FUTEX - uint32_t has_active_threads = pthreadpool_load_relaxed_uint32_t(&threadpool->has_active_threads); - if (has_active_threads == 0) { - return; - } - #else - size_t active_threads = pthreadpool_load_relaxed_size_t(&threadpool->active_threads); - if (active_threads == 0) { - return; - } - #endif - - /* Spin-wait */ - for (uint32_t i = PTHREADPOOL_SPIN_WAIT_ITERATIONS; i != 0; i--) { - /* This fence serves as a sleep instruction */ - pthreadpool_fence_acquire(); - - #if PTHREADPOOL_USE_FUTEX - has_active_threads = pthreadpool_load_relaxed_uint32_t(&threadpool->has_active_threads); - if (has_active_threads == 0) { - return; - } - #else - active_threads = pthreadpool_load_relaxed_size_t(&threadpool->active_threads); - if (active_threads == 0) { - return; - } - #endif +size_t pthreadpool_get_threads_count(struct pthreadpool* threadpool) { + if (threadpool == NULL) { + return 1; } - /* Fall-back to mutex/futex wait */ - #if PTHREADPOOL_USE_FUTEX - while ((has_active_threads = pthreadpool_load_relaxed_uint32_t(&threadpool->has_active_threads)) != 0) { - futex_wait(&threadpool->has_active_threads, 1); - } - #else - pthread_mutex_lock(&threadpool->completion_mutex); - while (pthreadpool_load_relaxed_size_t(&threadpool->active_threads) != 0) { - pthread_cond_wait(&threadpool->completion_condvar, &threadpool->completion_mutex); - }; - pthread_mutex_unlock(&threadpool->completion_mutex); - #endif + return threadpool->threads_count; } -inline static bool atomic_decrement(pthreadpool_atomic_size_t* value) { - #if defined(__clang__) && (defined(__arm__) || defined(__aarch64__)) - size_t actual_value; - do { - actual_value = __builtin_arm_ldrex((const volatile size_t*) value); - if (actual_value == 0) { - __builtin_arm_clrex(); - return false; - } - } while (__builtin_arm_strex(actual_value - 1, (volatile size_t*) value) != 0); - return true; - #else - size_t actual_value = pthreadpool_load_relaxed_size_t(value); - if (actual_value == 0) { - return false; - } - while (!pthreadpool_compare_exchange_weak_relaxed_size_t(value, &actual_value, actual_value - 1)) { - if (actual_value == 0) { - return false; - } - } - return true; - #endif -} - -inline static size_t modulo_decrement(uint32_t i, uint32_t n) { - /* Wrap modulo n, if needed */ - if (i == 0) { - i = n; - } - /* Decrement input variable */ - return i - 1; -} - -typedef void (*thread_function_t)(struct pthreadpool* threadpool, struct thread_info* thread); - static void thread_parallelize_1d(struct pthreadpool* threadpool, struct thread_info* thread) { + assert(threadpool != NULL); + assert(thread != NULL); + const pthreadpool_task_1d_t task = (pthreadpool_task_1d_t) pthreadpool_load_relaxed_void_p(&threadpool->task); void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); /* Process thread's own range of items */ size_t range_start = pthreadpool_load_relaxed_size_t(&thread->range_start); - while (atomic_decrement(&thread->range_length)) { + while (pthreadpool_try_decrement_relaxed_size_t(&thread->range_length)) { task(argument, range_start++); } @@ -373,7 +49,7 @@ static void thread_parallelize_1d(struct pthreadpool* threadpool, struct thread_ tid = modulo_decrement(tid, threads_count)) { struct thread_info* other_thread = &threadpool->threads[tid]; - while (atomic_decrement(&other_thread->range_length)) { + while (pthreadpool_try_decrement_relaxed_size_t(&other_thread->range_length)) { const size_t item_id = pthreadpool_fetch_sub_relaxed_size_t(&other_thread->range_end, 1) - 1; task(argument, item_id); } @@ -384,23 +60,24 @@ static void thread_parallelize_1d(struct pthreadpool* threadpool, struct thread_ } static void thread_parallelize_1d_with_uarch(struct pthreadpool* threadpool, struct thread_info* thread) { + assert(threadpool != NULL); + assert(thread != NULL); + const pthreadpool_task_1d_with_id_t task = (pthreadpool_task_1d_with_id_t) pthreadpool_load_relaxed_void_p(&threadpool->task); void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); const uint32_t default_uarch_index = threadpool->params.parallelize_1d_with_uarch.default_uarch_index; uint32_t uarch_index = default_uarch_index; #if PTHREADPOOL_USE_CPUINFO - if (threadpool && threadpool->cpuinfo_is_initialized) { - uarch_index = cpuinfo_get_current_uarch_index(); - if (uarch_index > threadpool->params.parallelize_1d_with_uarch.max_uarch_index) { - uarch_index = default_uarch_index; - } + uarch_index = cpuinfo_get_current_uarch_index(); + if (uarch_index > threadpool->params.parallelize_1d_with_uarch.max_uarch_index) { + uarch_index = default_uarch_index; } #endif /* Process thread's own range of items */ size_t range_start = pthreadpool_load_relaxed_size_t(&thread->range_start); - while (atomic_decrement(&thread->range_length)) { + while (pthreadpool_try_decrement_relaxed_size_t(&thread->range_length)) { task(argument, uarch_index, range_start++); } @@ -412,7 +89,7 @@ static void thread_parallelize_1d_with_uarch(struct pthreadpool* threadpool, str tid = modulo_decrement(tid, threads_count)) { struct thread_info* other_thread = &threadpool->threads[tid]; - while (atomic_decrement(&other_thread->range_length)) { + while (pthreadpool_try_decrement_relaxed_size_t(&other_thread->range_length)) { const size_t item_id = pthreadpool_fetch_sub_relaxed_size_t(&other_thread->range_end, 1) - 1; task(argument, uarch_index, item_id); } @@ -422,297 +99,6 @@ static void thread_parallelize_1d_with_uarch(struct pthreadpool* threadpool, str pthreadpool_fence_release(); } -static uint32_t wait_for_new_command( - struct pthreadpool* threadpool, - uint32_t last_command, - uint32_t last_flags) -{ - uint32_t command = pthreadpool_load_relaxed_uint32_t(&threadpool->command); - if (command != last_command) { - return command; - } - - if ((last_flags & PTHREADPOOL_FLAG_YIELD_WORKERS) == 0) { - /* Spin-wait loop */ - for (uint32_t i = PTHREADPOOL_SPIN_WAIT_ITERATIONS; i != 0; i--) { - /* This fence serves as a sleep instruction */ - pthreadpool_fence_acquire(); - - command = pthreadpool_load_relaxed_uint32_t(&threadpool->command); - if (command != last_command) { - return command; - } - } - } - - /* Spin-wait disabled or timed out, fall back to mutex/futex wait */ - #if PTHREADPOOL_USE_FUTEX - do { - futex_wait(&threadpool->command, last_command); - command = pthreadpool_load_relaxed_uint32_t(&threadpool->command); - } while (command == last_command); - #else - /* Lock the command mutex */ - pthread_mutex_lock(&threadpool->command_mutex); - /* Read the command */ - while ((command = pthreadpool_load_relaxed_uint32_t(&threadpool->command)) == last_command) { - /* Wait for new command */ - pthread_cond_wait(&threadpool->command_condvar, &threadpool->command_mutex); - } - /* Read a new command */ - pthread_mutex_unlock(&threadpool->command_mutex); - #endif - return command; -} - -static void* thread_main(void* arg) { - struct thread_info* thread = (struct thread_info*) arg; - struct pthreadpool* threadpool = ((struct pthreadpool*) (thread - thread->thread_number)) - 1; - uint32_t last_command = threadpool_command_init; - struct fpu_state saved_fpu_state = { 0 }; - uint32_t flags = 0; - - /* Check in */ - checkin_worker_thread(threadpool); - - /* Monitor new commands and act accordingly */ - for (;;) { - uint32_t command = wait_for_new_command(threadpool, last_command, flags); - pthreadpool_fence_acquire(); - - flags = pthreadpool_load_relaxed_uint32_t(&threadpool->flags); - - /* Process command */ - switch (command & THREADPOOL_COMMAND_MASK) { - case threadpool_command_parallelize: - { - const thread_function_t thread_function = - (thread_function_t) pthreadpool_load_relaxed_void_p(&threadpool->thread_function); - if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) { - saved_fpu_state = get_fpu_state(); - disable_fpu_denormals(); - } - - thread_function(threadpool, thread); - if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) { - set_fpu_state(saved_fpu_state); - } - break; - } - case threadpool_command_shutdown: - /* Exit immediately: the master thread is waiting on pthread_join */ - return NULL; - case threadpool_command_init: - /* To inhibit compiler warning */ - break; - } - /* Notify the master thread that we finished processing */ - checkin_worker_thread(threadpool); - /* Update last command */ - last_command = command; - }; -} - -static struct pthreadpool* pthreadpool_allocate(size_t threads_count) { - const size_t threadpool_size = sizeof(struct pthreadpool) + threads_count * sizeof(struct thread_info); - struct pthreadpool* threadpool = NULL; - #if defined(__ANDROID__) - /* - * Android didn't get posix_memalign until API level 17 (Android 4.2). - * Use (otherwise obsolete) memalign function on Android platform. - */ - threadpool = memalign(PTHREADPOOL_CACHELINE_SIZE, threadpool_size); - if (threadpool == NULL) { - return NULL; - } - #elif defined(_WIN32) - threadpool = _aligned_malloc(threadpool_size, PTHREADPOOL_CACHELINE_SIZE); - if (threadpool == NULL) { - return NULL; - } - #else - if (posix_memalign((void**) &threadpool, PTHREADPOOL_CACHELINE_SIZE, threadpool_size) != 0) { - return NULL; - } - #endif - memset(threadpool, 0, threadpool_size); - return threadpool; -} - -struct pthreadpool* pthreadpool_create(size_t threads_count) { - if (threads_count == 0) { - #if defined(_SC_NPROCESSORS_ONLN) - threads_count = (size_t) sysconf(_SC_NPROCESSORS_ONLN); - #if defined(__EMSCRIPTEN_PTHREADS__) - /* Limit the number of threads to 8 to match link-time PTHREAD_POOL_SIZE option */ - if (threads_count >= 8) { - threads_count = 8; - } - #endif - #elif defined(_WIN32) - SYSTEM_INFO system_info; - ZeroMemory(&system_info, sizeof(system_info)); - GetSystemInfo(&system_info); - threads_count = (size_t) system_info.dwNumberOfProcessors; - #else - #error "Unsupported platform" - #endif - } - - struct pthreadpool* threadpool = pthreadpool_allocate(threads_count); - if (threadpool == NULL) { - return NULL; - } - threadpool->threads_count = threads_count; - for (size_t tid = 0; tid < threads_count; tid++) { - threadpool->threads[tid].thread_number = tid; - } - #if PTHREADPOOL_USE_CPUINFO - threadpool->cpuinfo_is_initialized = cpuinfo_initialize(); - #endif - - /* Thread pool with a single thread computes everything on the caller thread. */ - if (threads_count > 1) { - pthread_mutex_init(&threadpool->execution_mutex, NULL); - #if !PTHREADPOOL_USE_FUTEX - pthread_mutex_init(&threadpool->completion_mutex, NULL); - pthread_cond_init(&threadpool->completion_condvar, NULL); - pthread_mutex_init(&threadpool->command_mutex, NULL); - pthread_cond_init(&threadpool->command_condvar, NULL); - #endif - - #if PTHREADPOOL_USE_FUTEX - pthreadpool_store_relaxed_uint32_t(&threadpool->has_active_threads, 1); - #endif - pthreadpool_store_release_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */); - - /* Caller thread serves as worker #0. Thus, we create system threads starting with worker #1. */ - for (size_t tid = 1; tid < threads_count; tid++) { - pthread_create(&threadpool->threads[tid].thread_object, NULL, &thread_main, &threadpool->threads[tid]); - } - - /* Wait until all threads initialize */ - wait_worker_threads(threadpool); - } - return threadpool; -} - -size_t pthreadpool_get_threads_count(struct pthreadpool* threadpool) { - if (threadpool == NULL) { - return 1; - } else { - return threadpool->threads_count; - } -} - -static void pthreadpool_parallelize( - struct pthreadpool* threadpool, - thread_function_t thread_function, - const void* params, - size_t params_size, - void* task, - void* context, - size_t linear_range, - uint32_t flags) -{ - assert(threadpool != NULL); - assert(thread_function != NULL); - assert(task != NULL); - assert(linear_range > 1); - - /* Protect the global threadpool structures */ - pthread_mutex_lock(&threadpool->execution_mutex); - - #if !PTHREADPOOL_USE_FUTEX - /* Lock the command variables to ensure that threads don't start processing before they observe complete command with all arguments */ - pthread_mutex_lock(&threadpool->command_mutex); - #endif - - /* Setup global arguments */ - pthreadpool_store_relaxed_void_p(&threadpool->thread_function, (void*) thread_function); - pthreadpool_store_relaxed_void_p(&threadpool->task, task); - pthreadpool_store_relaxed_void_p(&threadpool->argument, context); - pthreadpool_store_relaxed_uint32_t(&threadpool->flags, flags); - - /* Locking of completion_mutex not needed: readers are sleeping on command_condvar */ - const size_t threads_count = threadpool->threads_count; - pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */); - #if PTHREADPOOL_USE_FUTEX - pthreadpool_store_relaxed_uint32_t(&threadpool->has_active_threads, 1); - #endif - - if (params_size != 0) { - memcpy(&threadpool->params, params, params_size); - pthreadpool_fence_release(); - } - - /* Spread the work between threads */ - size_t range_start = 0; - for (size_t tid = 0; tid < threads_count; tid++) { - struct thread_info* thread = &threadpool->threads[tid]; - const size_t range_end = multiply_divide(linear_range, tid + 1, threads_count); - pthreadpool_store_relaxed_size_t(&thread->range_start, range_start); - pthreadpool_store_relaxed_size_t(&thread->range_end, range_end); - pthreadpool_store_relaxed_size_t(&thread->range_length, range_end - range_start); - - /* The next subrange starts where the previous ended */ - range_start = range_end; - } - - /* - * Update the threadpool command. - * Imporantly, do it after initializing command parameters (range, task, argument, flags) - * ~(threadpool->command | THREADPOOL_COMMAND_MASK) flips the bits not in command mask - * to ensure the unmasked command is different then the last command, because worker threads - * monitor for change in the unmasked command. - */ - const uint32_t old_command = pthreadpool_load_relaxed_uint32_t(&threadpool->command); - const uint32_t new_command = ~(old_command | THREADPOOL_COMMAND_MASK) | threadpool_command_parallelize; - - /* - * Store the command with release semantics to guarantee that if a worker thread observes - * the new command value, it also observes the updated command parameters. - * - * Note: release semantics is necessary even with a conditional variable, because the workers might - * be waiting in a spin-loop rather than the conditional variable. - */ - pthreadpool_store_release_uint32_t(&threadpool->command, new_command); - #if PTHREADPOOL_USE_FUTEX - /* Wake up the threads */ - futex_wake_all(&threadpool->command); - #else - /* Unlock the command variables before waking up the threads for better performance */ - pthread_mutex_unlock(&threadpool->command_mutex); - - /* Wake up the threads */ - pthread_cond_broadcast(&threadpool->command_condvar); - #endif - - /* Save and modify FPU denormals control, if needed */ - struct fpu_state saved_fpu_state = { 0 }; - if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) { - saved_fpu_state = get_fpu_state(); - disable_fpu_denormals(); - } - - /* Do computations as worker #0 */ - thread_function(threadpool, &threadpool->threads[0]); - - /* Restore FPU denormals control, if needed */ - if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) { - set_fpu_state(saved_fpu_state); - } - - /* Wait until the threads finish computation */ - wait_worker_threads(threadpool); - - /* Make changes by other threads visible to this thread */ - pthreadpool_fence_acquire(); - - /* Unprotect the global threadpool structures */ - pthread_mutex_unlock(&threadpool->execution_mutex); -} - void pthreadpool_parallelize_1d( struct pthreadpool* threadpool, pthreadpool_task_1d_t task, @@ -754,11 +140,9 @@ void pthreadpool_parallelize_1d_with_uarch( uint32_t uarch_index = default_uarch_index; #if PTHREADPOOL_USE_CPUINFO - if (threadpool && threadpool->cpuinfo_is_initialized) { - uarch_index = cpuinfo_get_current_uarch_index(); - if (uarch_index > max_uarch_index) { - uarch_index = default_uarch_index; - } + uarch_index = cpuinfo_get_current_uarch_index(); + if (uarch_index > max_uarch_index) { + uarch_index = default_uarch_index; } #endif @@ -1047,11 +431,9 @@ void pthreadpool_parallelize_2d_tile_2d_with_uarch( uint32_t uarch_index = default_uarch_index; #if PTHREADPOOL_USE_CPUINFO - if (threadpool && threadpool->cpuinfo_is_initialized) { - uarch_index = cpuinfo_get_current_uarch_index(); - if (uarch_index > max_uarch_index) { - uarch_index = default_uarch_index; - } + uarch_index = cpuinfo_get_current_uarch_index(); + if (uarch_index > max_uarch_index) { + uarch_index = default_uarch_index; } #endif @@ -1209,11 +591,9 @@ void pthreadpool_parallelize_3d_tile_2d_with_uarch( uint32_t uarch_index = default_uarch_index; #if PTHREADPOOL_USE_CPUINFO - if (threadpool && threadpool->cpuinfo_is_initialized) { - uarch_index = cpuinfo_get_current_uarch_index(); - if (uarch_index > max_uarch_index) { - uarch_index = default_uarch_index; - } + uarch_index = cpuinfo_get_current_uarch_index(); + if (uarch_index > max_uarch_index) { + uarch_index = default_uarch_index; } #endif @@ -1388,11 +768,9 @@ void pthreadpool_parallelize_4d_tile_2d_with_uarch( uint32_t uarch_index = default_uarch_index; #if PTHREADPOOL_USE_CPUINFO - if (threadpool && threadpool->cpuinfo_is_initialized) { - uarch_index = cpuinfo_get_current_uarch_index(); - if (uarch_index > max_uarch_index) { - uarch_index = default_uarch_index; - } + uarch_index = cpuinfo_get_current_uarch_index(); + if (uarch_index > max_uarch_index) { + uarch_index = default_uarch_index; } #endif @@ -1630,68 +1008,3 @@ void pthreadpool_parallelize_6d_tile_2d( (void*) compute_6d_tile_2d, &context, range_i * range_j * range_k * range_l * tile_range_m * tile_range_n, flags); } } - -void pthreadpool_destroy(struct pthreadpool* threadpool) { - if (threadpool != NULL) { - const size_t threads_count = threadpool->threads_count; - if (threads_count > 1) { - #if PTHREADPOOL_USE_FUTEX - pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */); - pthreadpool_store_relaxed_uint32_t(&threadpool->has_active_threads, 1); - - /* - * Store the command with release semantics to guarantee that if a worker thread observes - * the new command value, it also observes the updated active_threads/has_active_threads values. - */ - pthreadpool_store_release_uint32_t(&threadpool->command, threadpool_command_shutdown); - - /* Wake up worker threads */ - futex_wake_all(&threadpool->command); - #else - /* Lock the command variable to ensure that threads don't shutdown until both command and active_threads are updated */ - pthread_mutex_lock(&threadpool->command_mutex); - - pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */); - - /* - * Store the command with release semantics to guarantee that if a worker thread observes - * the new command value, it also observes the updated active_threads value. - * - * Note: the release fence inside pthread_mutex_unlock is insufficient, - * because the workers might be waiting in a spin-loop rather than the conditional variable. - */ - pthreadpool_store_release_uint32_t(&threadpool->command, threadpool_command_shutdown); - - /* Wake up worker threads */ - pthread_cond_broadcast(&threadpool->command_condvar); - - /* Commit the state changes and let workers start processing */ - pthread_mutex_unlock(&threadpool->command_mutex); - #endif - - /* Wait until all threads return */ - for (size_t thread = 1; thread < threads_count; thread++) { - pthread_join(threadpool->threads[thread].thread_object, NULL); - } - - /* Release resources */ - pthread_mutex_destroy(&threadpool->execution_mutex); - #if !PTHREADPOOL_USE_FUTEX - pthread_mutex_destroy(&threadpool->completion_mutex); - pthread_cond_destroy(&threadpool->completion_condvar); - pthread_mutex_destroy(&threadpool->command_mutex); - pthread_cond_destroy(&threadpool->command_condvar); - #endif - } - #if PTHREADPOOL_USE_CPUINFO - if (threadpool->cpuinfo_is_initialized) { - cpuinfo_deinitialize(); - } - #endif - #ifdef _WIN32 - _aligned_free(threadpool); - #else - free(threadpool); - #endif - } -} diff --git a/src/pthreads.c b/src/pthreads.c new file mode 100644 index 0000000..05c6aac --- /dev/null +++ b/src/pthreads.c @@ -0,0 +1,462 @@ +/* Standard C headers */ +#include <assert.h> +#include <limits.h> +#include <stdbool.h> +#include <stdint.h> +#include <stdlib.h> +#include <string.h> + +/* Configuration header */ +#include "threadpool-common.h" + +/* POSIX headers */ +#include <pthread.h> +#include <unistd.h> + +/* Futex-specific headers */ +#if PTHREADPOOL_USE_FUTEX + #if defined(__linux__) + #include <sys/syscall.h> + #include <linux/futex.h> + + /* Old Android NDKs do not define SYS_futex and FUTEX_PRIVATE_FLAG */ + #ifndef SYS_futex + #define SYS_futex __NR_futex + #endif + #ifndef FUTEX_PRIVATE_FLAG + #define FUTEX_PRIVATE_FLAG 128 + #endif + #elif defined(__EMSCRIPTEN__) + /* math.h for INFINITY constant */ + #include <math.h> + + #include <emscripten/threading.h> + #else + #error "Platform-specific implementation of futex_wait and futex_wake_all required" + #endif +#endif + +/* Windows-specific headers */ +#ifdef _WIN32 + #define NOMINMAX + #include <sysinfoapi.h> +#endif + +/* Dependencies */ +#if PTHREADPOOL_USE_CPUINFO + #include <cpuinfo.h> +#endif + +/* Public library header */ +#include <pthreadpool.h> + +/* Internal library headers */ +#include "threadpool-atomics.h" +#include "threadpool-object.h" +#include "threadpool-utils.h" + + +#if PTHREADPOOL_USE_FUTEX + #if defined(__linux__) + static int futex_wait(pthreadpool_atomic_uint32_t* address, uint32_t value) { + return syscall(SYS_futex, address, FUTEX_WAIT | FUTEX_PRIVATE_FLAG, value, NULL); + } + + static int futex_wake_all(pthreadpool_atomic_uint32_t* address) { + return syscall(SYS_futex, address, FUTEX_WAKE | FUTEX_PRIVATE_FLAG, INT_MAX); + } + #elif defined(__EMSCRIPTEN__) + static int futex_wait(pthreadpool_atomic_uint32_t* address, uint32_t value) { + return emscripten_futex_wait((volatile void*) address, value, INFINITY); + } + + static int futex_wake_all(pthreadpool_atomic_uint32_t* address) { + return emscripten_futex_wake((volatile void*) address, INT_MAX); + } + #else + #error "Platform-specific implementation of futex_wait and futex_wake_all required" + #endif +#endif + +static void checkin_worker_thread(struct pthreadpool* threadpool) { + #if PTHREADPOOL_USE_FUTEX + if (pthreadpool_fetch_sub_relaxed_size_t(&threadpool->active_threads, 1) == 1) { + pthreadpool_store_relaxed_uint32_t(&threadpool->has_active_threads, 0); + futex_wake_all(&threadpool->has_active_threads); + } + #else + pthread_mutex_lock(&threadpool->completion_mutex); + if (pthreadpool_fetch_sub_relaxed_size_t(&threadpool->active_threads, 1) == 1) { + pthread_cond_signal(&threadpool->completion_condvar); + } + pthread_mutex_unlock(&threadpool->completion_mutex); + #endif +} + +static void wait_worker_threads(struct pthreadpool* threadpool) { + /* Initial check */ + #if PTHREADPOOL_USE_FUTEX + uint32_t has_active_threads = pthreadpool_load_relaxed_uint32_t(&threadpool->has_active_threads); + if (has_active_threads == 0) { + return; + } + #else + size_t active_threads = pthreadpool_load_relaxed_size_t(&threadpool->active_threads); + if (active_threads == 0) { + return; + } + #endif + + /* Spin-wait */ + for (uint32_t i = PTHREADPOOL_SPIN_WAIT_ITERATIONS; i != 0; i--) { + /* This fence serves as a sleep instruction */ + pthreadpool_fence_acquire(); + + #if PTHREADPOOL_USE_FUTEX + has_active_threads = pthreadpool_load_relaxed_uint32_t(&threadpool->has_active_threads); + if (has_active_threads == 0) { + return; + } + #else + active_threads = pthreadpool_load_relaxed_size_t(&threadpool->active_threads); + if (active_threads == 0) { + return; + } + #endif + } + + /* Fall-back to mutex/futex wait */ + #if PTHREADPOOL_USE_FUTEX + while ((has_active_threads = pthreadpool_load_relaxed_uint32_t(&threadpool->has_active_threads)) != 0) { + futex_wait(&threadpool->has_active_threads, 1); + } + #else + pthread_mutex_lock(&threadpool->completion_mutex); + while (pthreadpool_load_relaxed_size_t(&threadpool->active_threads) != 0) { + pthread_cond_wait(&threadpool->completion_condvar, &threadpool->completion_mutex); + }; + pthread_mutex_unlock(&threadpool->completion_mutex); + #endif +} + +static uint32_t wait_for_new_command( + struct pthreadpool* threadpool, + uint32_t last_command, + uint32_t last_flags) +{ + uint32_t command = pthreadpool_load_relaxed_uint32_t(&threadpool->command); + if (command != last_command) { + return command; + } + + if ((last_flags & PTHREADPOOL_FLAG_YIELD_WORKERS) == 0) { + /* Spin-wait loop */ + for (uint32_t i = PTHREADPOOL_SPIN_WAIT_ITERATIONS; i != 0; i--) { + /* This fence serves as a sleep instruction */ + pthreadpool_fence_acquire(); + + command = pthreadpool_load_relaxed_uint32_t(&threadpool->command); + if (command != last_command) { + return command; + } + } + } + + /* Spin-wait disabled or timed out, fall back to mutex/futex wait */ + #if PTHREADPOOL_USE_FUTEX + do { + futex_wait(&threadpool->command, last_command); + command = pthreadpool_load_relaxed_uint32_t(&threadpool->command); + } while (command == last_command); + #else + /* Lock the command mutex */ + pthread_mutex_lock(&threadpool->command_mutex); + /* Read the command */ + while ((command = pthreadpool_load_relaxed_uint32_t(&threadpool->command)) == last_command) { + /* Wait for new command */ + pthread_cond_wait(&threadpool->command_condvar, &threadpool->command_mutex); + } + /* Read a new command */ + pthread_mutex_unlock(&threadpool->command_mutex); + #endif + return command; +} + +static void* thread_main(void* arg) { + struct thread_info* thread = (struct thread_info*) arg; + struct pthreadpool* threadpool = thread->threadpool; + uint32_t last_command = threadpool_command_init; + struct fpu_state saved_fpu_state = { 0 }; + uint32_t flags = 0; + + /* Check in */ + checkin_worker_thread(threadpool); + + /* Monitor new commands and act accordingly */ + for (;;) { + uint32_t command = wait_for_new_command(threadpool, last_command, flags); + pthreadpool_fence_acquire(); + + flags = pthreadpool_load_relaxed_uint32_t(&threadpool->flags); + + /* Process command */ + switch (command & THREADPOOL_COMMAND_MASK) { + case threadpool_command_parallelize: + { + const thread_function_t thread_function = + (thread_function_t) pthreadpool_load_relaxed_void_p(&threadpool->thread_function); + if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) { + saved_fpu_state = get_fpu_state(); + disable_fpu_denormals(); + } + + thread_function(threadpool, thread); + if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) { + set_fpu_state(saved_fpu_state); + } + break; + } + case threadpool_command_shutdown: + /* Exit immediately: the master thread is waiting on pthread_join */ + return NULL; + case threadpool_command_init: + /* To inhibit compiler warning */ + break; + } + /* Notify the master thread that we finished processing */ + checkin_worker_thread(threadpool); + /* Update last command */ + last_command = command; + }; +} + +struct pthreadpool* pthreadpool_create(size_t threads_count) { + #if PTHREADPOOL_USE_CPUINFO + if (!cpuinfo_initialize()) { + return NULL; + } + #endif + + if (threads_count == 0) { + #if PTHREADPOOL_USE_CPUINFO + threads_count = cpuinfo_get_processors_count(); + #elif defined(_SC_NPROCESSORS_ONLN) + threads_count = (size_t) sysconf(_SC_NPROCESSORS_ONLN); + #if defined(__EMSCRIPTEN_PTHREADS__) + /* Limit the number of threads to 8 to match link-time PTHREAD_POOL_SIZE option */ + if (threads_count >= 8) { + threads_count = 8; + } + #endif + #elif defined(_WIN32) + SYSTEM_INFO system_info; + ZeroMemory(&system_info, sizeof(system_info)); + GetSystemInfo(&system_info); + threads_count = (size_t) system_info.dwNumberOfProcessors; + #else + #error "Platform-specific implementation of sysconf(_SC_NPROCESSORS_ONLN) required" + #endif + } + + struct pthreadpool* threadpool = pthreadpool_allocate(threads_count); + if (threadpool == NULL) { + return NULL; + } + threadpool->threads_count = threads_count; + for (size_t tid = 0; tid < threads_count; tid++) { + threadpool->threads[tid].thread_number = tid; + threadpool->threads[tid].threadpool = threadpool; + } + + /* Thread pool with a single thread computes everything on the caller thread. */ + if (threads_count > 1) { + pthread_mutex_init(&threadpool->execution_mutex, NULL); + #if !PTHREADPOOL_USE_FUTEX + pthread_mutex_init(&threadpool->completion_mutex, NULL); + pthread_cond_init(&threadpool->completion_condvar, NULL); + pthread_mutex_init(&threadpool->command_mutex, NULL); + pthread_cond_init(&threadpool->command_condvar, NULL); + #endif + + #if PTHREADPOOL_USE_FUTEX + pthreadpool_store_relaxed_uint32_t(&threadpool->has_active_threads, 1); + #endif + pthreadpool_store_release_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */); + + /* Caller thread serves as worker #0. Thus, we create system threads starting with worker #1. */ + for (size_t tid = 1; tid < threads_count; tid++) { + pthread_create(&threadpool->threads[tid].thread_object, NULL, &thread_main, &threadpool->threads[tid]); + } + + /* Wait until all threads initialize */ + wait_worker_threads(threadpool); + } + return threadpool; +} + +PTHREADPOOL_INTERNAL void pthreadpool_parallelize( + struct pthreadpool* threadpool, + thread_function_t thread_function, + const void* params, + size_t params_size, + void* task, + void* context, + size_t linear_range, + uint32_t flags) +{ + assert(threadpool != NULL); + assert(thread_function != NULL); + assert(task != NULL); + assert(linear_range > 1); + + /* Protect the global threadpool structures */ + pthread_mutex_lock(&threadpool->execution_mutex); + + #if !PTHREADPOOL_USE_FUTEX + /* Lock the command variables to ensure that threads don't start processing before they observe complete command with all arguments */ + pthread_mutex_lock(&threadpool->command_mutex); + #endif + + /* Setup global arguments */ + pthreadpool_store_relaxed_void_p(&threadpool->thread_function, (void*) thread_function); + pthreadpool_store_relaxed_void_p(&threadpool->task, task); + pthreadpool_store_relaxed_void_p(&threadpool->argument, context); + pthreadpool_store_relaxed_uint32_t(&threadpool->flags, flags); + + /* Locking of completion_mutex not needed: readers are sleeping on command_condvar */ + const size_t threads_count = threadpool->threads_count; + pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */); + #if PTHREADPOOL_USE_FUTEX + pthreadpool_store_relaxed_uint32_t(&threadpool->has_active_threads, 1); + #endif + + if (params_size != 0) { + memcpy(&threadpool->params, params, params_size); + pthreadpool_fence_release(); + } + + /* Spread the work between threads */ + size_t range_start = 0; + for (size_t tid = 0; tid < threads_count; tid++) { + struct thread_info* thread = &threadpool->threads[tid]; + const size_t range_end = multiply_divide(linear_range, tid + 1, threads_count); + pthreadpool_store_relaxed_size_t(&thread->range_start, range_start); + pthreadpool_store_relaxed_size_t(&thread->range_end, range_end); + pthreadpool_store_relaxed_size_t(&thread->range_length, range_end - range_start); + + /* The next subrange starts where the previous ended */ + range_start = range_end; + } + + /* + * Update the threadpool command. + * Imporantly, do it after initializing command parameters (range, task, argument, flags) + * ~(threadpool->command | THREADPOOL_COMMAND_MASK) flips the bits not in command mask + * to ensure the unmasked command is different then the last command, because worker threads + * monitor for change in the unmasked command. + */ + const uint32_t old_command = pthreadpool_load_relaxed_uint32_t(&threadpool->command); + const uint32_t new_command = ~(old_command | THREADPOOL_COMMAND_MASK) | threadpool_command_parallelize; + + /* + * Store the command with release semantics to guarantee that if a worker thread observes + * the new command value, it also observes the updated command parameters. + * + * Note: release semantics is necessary even with a conditional variable, because the workers might + * be waiting in a spin-loop rather than the conditional variable. + */ + pthreadpool_store_release_uint32_t(&threadpool->command, new_command); + #if PTHREADPOOL_USE_FUTEX + /* Wake up the threads */ + futex_wake_all(&threadpool->command); + #else + /* Unlock the command variables before waking up the threads for better performance */ + pthread_mutex_unlock(&threadpool->command_mutex); + + /* Wake up the threads */ + pthread_cond_broadcast(&threadpool->command_condvar); + #endif + + /* Save and modify FPU denormals control, if needed */ + struct fpu_state saved_fpu_state = { 0 }; + if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) { + saved_fpu_state = get_fpu_state(); + disable_fpu_denormals(); + } + + /* Do computations as worker #0 */ + thread_function(threadpool, &threadpool->threads[0]); + + /* Restore FPU denormals control, if needed */ + if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) { + set_fpu_state(saved_fpu_state); + } + + /* Wait until the threads finish computation */ + wait_worker_threads(threadpool); + + /* Make changes by other threads visible to this thread */ + pthreadpool_fence_acquire(); + + /* Unprotect the global threadpool structures */ + pthread_mutex_unlock(&threadpool->execution_mutex); +} + +void pthreadpool_destroy(struct pthreadpool* threadpool) { + if (threadpool != NULL) { + const size_t threads_count = threadpool->threads_count; + if (threads_count > 1) { + #if PTHREADPOOL_USE_FUTEX + pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */); + pthreadpool_store_relaxed_uint32_t(&threadpool->has_active_threads, 1); + + /* + * Store the command with release semantics to guarantee that if a worker thread observes + * the new command value, it also observes the updated active_threads/has_active_threads values. + */ + pthreadpool_store_release_uint32_t(&threadpool->command, threadpool_command_shutdown); + + /* Wake up worker threads */ + futex_wake_all(&threadpool->command); + #else + /* Lock the command variable to ensure that threads don't shutdown until both command and active_threads are updated */ + pthread_mutex_lock(&threadpool->command_mutex); + + pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */); + + /* + * Store the command with release semantics to guarantee that if a worker thread observes + * the new command value, it also observes the updated active_threads value. + * + * Note: the release fence inside pthread_mutex_unlock is insufficient, + * because the workers might be waiting in a spin-loop rather than the conditional variable. + */ + pthreadpool_store_release_uint32_t(&threadpool->command, threadpool_command_shutdown); + + /* Wake up worker threads */ + pthread_cond_broadcast(&threadpool->command_condvar); + + /* Commit the state changes and let workers start processing */ + pthread_mutex_unlock(&threadpool->command_mutex); + #endif + + /* Wait until all threads return */ + for (size_t thread = 1; thread < threads_count; thread++) { + pthread_join(threadpool->threads[thread].thread_object, NULL); + } + + /* Release resources */ + pthread_mutex_destroy(&threadpool->execution_mutex); + #if !PTHREADPOOL_USE_FUTEX + pthread_mutex_destroy(&threadpool->completion_mutex); + pthread_cond_destroy(&threadpool->completion_condvar); + pthread_mutex_destroy(&threadpool->command_mutex); + pthread_cond_destroy(&threadpool->command_condvar); + #endif + } + #if PTHREADPOOL_USE_CPUINFO + cpuinfo_deinitialize(); + #endif + pthreadpool_deallocate(threadpool); + } +} diff --git a/src/threadpool-shim.c b/src/shim.c index b5670ea..b5670ea 100644 --- a/src/threadpool-shim.c +++ b/src/shim.c diff --git a/src/threadpool-atomics.h b/src/threadpool-atomics.h index 92fcd8d..28c10e3 100644 --- a/src/threadpool-atomics.h +++ b/src/threadpool-atomics.h @@ -176,3 +176,30 @@ atomic_thread_fence(memory_order_release); } #endif + +static inline bool pthreadpool_try_decrement_relaxed_size_t( + pthreadpool_atomic_size_t* value) +{ + #if defined(__clang__) && (defined(__arm__) || defined(__aarch64__)) + size_t actual_value; + do { + actual_value = __builtin_arm_ldrex((const volatile size_t*) value); + if (actual_value == 0) { + __builtin_arm_clrex(); + return false; + } + } while (__builtin_arm_strex(actual_value - 1, (volatile size_t*) value) != 0); + return true; + #else + size_t actual_value = pthreadpool_load_relaxed_size_t(value); + if (actual_value == 0) { + return false; + } + while (!pthreadpool_compare_exchange_weak_relaxed_size_t(value, &actual_value, actual_value - 1)) { + if (actual_value == 0) { + return false; + } + } + return true; + #endif +} diff --git a/src/threadpool-common.h b/src/threadpool-common.h new file mode 100644 index 0000000..1a18c60 --- /dev/null +++ b/src/threadpool-common.h @@ -0,0 +1,44 @@ +#pragma once + +#ifndef PTHREADPOOL_USE_CPUINFO + #define PTHREADPOOL_USE_CPUINFO 0 +#endif + +#ifndef PTHREADPOOL_USE_FUTEX + #if defined(__linux__) + #define PTHREADPOOL_USE_FUTEX 1 + #elif defined(__EMSCRIPTEN__) + #define PTHREADPOOL_USE_FUTEX 1 + #else + #define PTHREADPOOL_USE_FUTEX 0 + #endif +#endif + +/* Number of iterations in spin-wait loop before going into futex/condvar wait */ +#define PTHREADPOOL_SPIN_WAIT_ITERATIONS 1000000 + +#define PTHREADPOOL_CACHELINE_SIZE 64 +#define PTHREADPOOL_CACHELINE_ALIGNED __attribute__((__aligned__(PTHREADPOOL_CACHELINE_SIZE))) + +#if defined(__clang__) + #if __has_extension(c_static_assert) || __has_feature(c_static_assert) + #define PTHREADPOOL_STATIC_ASSERT(predicate, message) _Static_assert((predicate), message) + #else + #define PTHREADPOOL_STATIC_ASSERT(predicate, message) + #endif +#elif defined(__GNUC__) && ((__GNUC__ > 4) || (__GNUC__ == 4) && (__GNUC_MINOR__ >= 6)) + /* Static assert is supported by gcc >= 4.6 */ + #define PTHREADPOOL_STATIC_ASSERT(predicate, message) _Static_assert((predicate), message) +#else + #define PTHREADPOOL_STATIC_ASSERT(predicate, message) +#endif + +#ifndef PTHREADPOOL_INTERNAL + #if defined(__ELF__) + #define PTHREADPOOL_INTERNAL __attribute__((__visibility__("internal"))) + #elif defined(__MACH__) + #define PTHREADPOOL_INTERNAL __attribute__((__visibility__("hidden"))) + #else + #define PTHREADPOOL_INTERNAL + #endif +#endif diff --git a/src/threadpool-object.h b/src/threadpool-object.h new file mode 100644 index 0000000..95ccd6b --- /dev/null +++ b/src/threadpool-object.h @@ -0,0 +1,164 @@ +#pragma once + +/* Standard C headers */ +#include <stddef.h> +#include <stdint.h> + +/* POSIX headers */ +#include <pthread.h> + +/* Library header */ +#include <pthreadpool.h> + +/* Internal headers */ +#include "threadpool-common.h" +#include "threadpool-atomics.h" + + +#define THREADPOOL_COMMAND_MASK UINT32_C(0x7FFFFFFF) + +enum threadpool_command { + threadpool_command_init, + threadpool_command_parallelize, + threadpool_command_shutdown, +}; + +struct PTHREADPOOL_CACHELINE_ALIGNED thread_info { + /** + * Index of the first element in the work range. + * Before processing a new element the owning worker thread increments this value. + */ + pthreadpool_atomic_size_t range_start; + /** + * Index of the element after the last element of the work range. + * Before processing a new element the stealing worker thread decrements this value. + */ + pthreadpool_atomic_size_t range_end; + /** + * The number of elements in the work range. + * Due to race conditions range_length <= range_end - range_start. + * The owning worker thread must decrement this value before incrementing @a range_start. + * The stealing worker thread must decrement this value before decrementing @a range_end. + */ + pthreadpool_atomic_size_t range_length; + /** + * Thread number in the 0..threads_count-1 range. + */ + size_t thread_number; + /** + * Thread pool which owns the thread. + */ + struct pthreadpool* threadpool; + /** + * The pthread object corresponding to the thread. + */ + pthread_t thread_object; +}; + +PTHREADPOOL_STATIC_ASSERT(sizeof(struct thread_info) % PTHREADPOOL_CACHELINE_SIZE == 0, + "thread_info structure must occupy an integer number of cache lines (64 bytes)"); + +struct pthreadpool_1d_with_uarch_params { + /** + * Copy of the default uarch index argument passed to a microarchitecture-aware parallelization function. + */ + uint32_t default_uarch_index; + /** + * Copy of the max uarch index argument passed to a microarchitecture-aware parallelization function. + */ + uint32_t max_uarch_index; +}; + +struct PTHREADPOOL_CACHELINE_ALIGNED pthreadpool { + /** + * The number of threads that are processing an operation. + */ + pthreadpool_atomic_size_t active_threads; +#if PTHREADPOOL_USE_FUTEX + /** + * Indicates if there are active threads. + * Only two values are possible: + * - has_active_threads == 0 if active_threads == 0 + * - has_active_threads == 1 if active_threads != 0 + */ + pthreadpool_atomic_uint32_t has_active_threads; +#endif + /** + * The last command submitted to the thread pool. + */ + pthreadpool_atomic_uint32_t command; + /** + * The entry point function to call for each thread in the thread pool for parallelization tasks. + */ + pthreadpool_atomic_void_p thread_function; + /** + * The function to call for each item. + */ + pthreadpool_atomic_void_p task; + /** + * The first argument to the item processing function. + */ + pthreadpool_atomic_void_p argument; + /** + * Additional parallelization parameters. + * These parameters are specific for each thread_function. + */ + union { + struct pthreadpool_1d_with_uarch_params parallelize_1d_with_uarch; + } params; + /** + * Copy of the flags passed to a parallelization function. + */ + pthreadpool_atomic_uint32_t flags; + /** + * Serializes concurrent calls to @a pthreadpool_parallelize_* from different threads. + */ + pthread_mutex_t execution_mutex; +#if !PTHREADPOOL_USE_FUTEX + /** + * Guards access to the @a active_threads variable. + */ + pthread_mutex_t completion_mutex; + /** + * Condition variable to wait until all threads complete an operation (until @a active_threads is zero). + */ + pthread_cond_t completion_condvar; + /** + * Guards access to the @a command variable. + */ + pthread_mutex_t command_mutex; + /** + * Condition variable to wait for change of the @a command variable. + */ + pthread_cond_t command_condvar; +#endif + /** + * The number of threads in the thread pool. Never changes after pthreadpool_create. + */ + size_t threads_count; + /** + * Thread information structures that immediately follow this structure. + */ + struct thread_info threads[]; +}; + +PTHREADPOOL_STATIC_ASSERT(sizeof(struct pthreadpool) % PTHREADPOOL_CACHELINE_SIZE == 0, + "pthreadpool structure must occupy an integer number of cache lines (64 bytes)"); + +PTHREADPOOL_INTERNAL struct pthreadpool* pthreadpool_allocate( + size_t threads_count); + +PTHREADPOOL_INTERNAL void pthreadpool_deallocate( + struct pthreadpool* threadpool); + +typedef void (*thread_function_t)(struct pthreadpool* threadpool, struct thread_info* thread); + +PTHREADPOOL_INTERNAL void pthreadpool_parallelize( + struct pthreadpool* threadpool, + thread_function_t thread_function, + const void* params, + size_t params_size, + void* task, + void* context, + size_t linear_range, + uint32_t flags); diff --git a/src/threadpool-utils.h b/src/threadpool-utils.h index 65c7fb0..1d147e0 100644 --- a/src/threadpool-utils.h +++ b/src/threadpool-utils.h @@ -1,6 +1,7 @@ #pragma once #include <stdint.h> +#include <stddef.h> #if defined(__SSE__) || defined(__x86_64__) #include <xmmintrin.h> @@ -60,3 +61,34 @@ static inline void disable_fpu_denormals() { : [fpcr] "=r" (fpcr)); #endif } + +static inline size_t multiply_divide(size_t a, size_t b, size_t d) { + #if defined(__SIZEOF_SIZE_T__) && (__SIZEOF_SIZE_T__ == 4) + return (size_t) (((uint64_t) a) * ((uint64_t) b)) / ((uint64_t) d); + #elif defined(__SIZEOF_SIZE_T__) && (__SIZEOF_SIZE_T__ == 8) + return (size_t) (((__uint128_t) a) * ((__uint128_t) b)) / ((__uint128_t) d); + #else + #error "Platform-specific implementation of multiply_divide required" + #endif +} + +static inline size_t modulo_decrement(uint32_t i, uint32_t n) { + /* Wrap modulo n, if needed */ + if (i == 0) { + i = n; + } + /* Decrement input variable */ + return i - 1; +} + +static inline size_t divide_round_up(size_t dividend, size_t divisor) { + if (dividend % divisor == 0) { + return dividend / divisor; + } else { + return dividend / divisor + 1; + } +} + +static inline size_t min(size_t a, size_t b) { + return a < b ? a : b; +} |