From be9c89379384a261026c8bf517ec3ed651bb171c Mon Sep 17 00:00:00 2001
From: Marat Dukhan
Date: Wed, 1 Apr 2020 22:40:00 -0700
Subject: Implementation using Grand Central Dispatch

---
 CMakeLists.txt          |  18 +++++--
 src/gcd.c               | 135 ++++++++++++++++++++++++++++++++++++++++++++++++
 src/threadpool-common.h |  17 ++++++
 src/threadpool-object.h |  29 +++++++++--
 4 files changed, 190 insertions(+), 9 deletions(-)
 create mode 100644 src/gcd.c

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 6043fe4..da42de5 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -7,8 +7,8 @@ PROJECT(pthreadpool C CXX)
 SET(PTHREADPOOL_LIBRARY_TYPE "default" CACHE STRING "Type of library (shared, static, or default) to build")
 SET_PROPERTY(CACHE PTHREADPOOL_LIBRARY_TYPE PROPERTY STRINGS default static shared)
 OPTION(PTHREADPOOL_ALLOW_DEPRECATED_API "Enable deprecated API functions" ON)
-SET(PTHREADPOOL_SYNC_PRIMITIVE "default" CACHE STRING "Synchronization primitive (condvar, futex, or default) for worker threads")
-SET_PROPERTY(CACHE PTHREADPOOL_SYNC_PRIMITIVE PROPERTY STRINGS default condvar futex)
+SET(PTHREADPOOL_SYNC_PRIMITIVE "default" CACHE STRING "Synchronization primitive (condvar, futex, gcd, or default) for worker threads")
+SET_PROPERTY(CACHE PTHREADPOOL_SYNC_PRIMITIVE PROPERTY STRINGS default condvar futex gcd)
 IF("${CMAKE_SOURCE_DIR}" STREQUAL "${PROJECT_SOURCE_DIR}")
   OPTION(PTHREADPOOL_BUILD_TESTS "Build pthreadpool unit tests" ON)
   OPTION(PTHREADPOOL_BUILD_BENCHMARKS "Build pthreadpool micro-benchmarks" ON)
@@ -65,10 +65,15 @@ ENDIF()
 IF(PTHREADPOOL_ALLOW_DEPRECATED_API)
   SET(PTHREADPOOL_SRCS src/legacy-api.c)
 ENDIF()
-IF(CMAKE_SYSTEM_NAME STREQUAL "Emscripten")
+IF(EMSCRIPTEN)
   LIST(APPEND PTHREADPOOL_SRCS src/shim.c)
 ELSE()
-  LIST(APPEND PTHREADPOOL_SRCS src/portable-api.c src/memory.c src/pthreads.c)
+  LIST(APPEND PTHREADPOOL_SRCS src/portable-api.c src/memory.c)
+  IF(APPLE AND (PTHREADPOOL_SYNC_PRIMITIVE STREQUAL "default" OR PTHREADPOOL_SYNC_PRIMITIVE STREQUAL "gcd"))
+    LIST(APPEND PTHREADPOOL_SRCS src/gcd.c)
+  ELSE()
+    LIST(APPEND PTHREADPOOL_SRCS src/pthreads.c)
+  ENDIF()
 ENDIF()
 
 ADD_LIBRARY(pthreadpool_interface INTERFACE)
@@ -90,8 +95,13 @@ ENDIF()
 
 IF(PTHREADPOOL_SYNC_PRIMITIVE STREQUAL "condvar")
   TARGET_COMPILE_DEFINITIONS(pthreadpool PRIVATE PTHREADPOOL_USE_FUTEX=0)
+  TARGET_COMPILE_DEFINITIONS(pthreadpool PRIVATE PTHREADPOOL_USE_GCD=0)
 ELSEIF(PTHREADPOOL_SYNC_PRIMITIVE STREQUAL "futex")
   TARGET_COMPILE_DEFINITIONS(pthreadpool PRIVATE PTHREADPOOL_USE_FUTEX=1)
+  TARGET_COMPILE_DEFINITIONS(pthreadpool PRIVATE PTHREADPOOL_USE_GCD=0)
+ELSEIF(PTHREADPOOL_SYNC_PRIMITIVE STREQUAL "gcd")
+  TARGET_COMPILE_DEFINITIONS(pthreadpool PRIVATE PTHREADPOOL_USE_FUTEX=0)
+  TARGET_COMPILE_DEFINITIONS(pthreadpool PRIVATE PTHREADPOOL_USE_GCD=1)
 ELSEIF(NOT PTHREADPOOL_SYNC_PRIMITIVE STREQUAL "default")
   MESSAGE(FATAL_ERROR "Unsupported synchronization primitive ${PTHREADPOOL_SYNC_PRIMITIVE}")
 ENDIF()
diff --git a/src/gcd.c b/src/gcd.c
new file mode 100644
index 0000000..5179f4c
--- /dev/null
+++ b/src/gcd.c
@@ -0,0 +1,135 @@
+/* Standard C headers. NOTE(review): the header names after each bare #include in this patch were lost in extraction; the code below uses assert(), memcpy() and uint32_t, so assert.h, string.h and stdint.h are presumably among these five, confirm against upstream before applying. */
+#include
+#include
+#include
+#include
+#include
+
+/* Configuration header */
+#include "threadpool-common.h"
+
+/* Mach headers. NOTE(review): names lost in extraction as well; sysctlbyname() and the dispatch_* calls below presumably need sys/sysctl.h and dispatch/dispatch.h, confirm against upstream. */
+#include
+#include
+#include
+
+/* Public library header */
+#include
+
+/* Internal library headers */
+#include "threadpool-atomics.h"
+#include "threadpool-object.h"
+#include "threadpool-utils.h"
+
+/* Work function handed to dispatch_apply_f: runs the current thread_function of the pool for slot thread_index, with FPU denormals disabled around the call when PTHREADPOOL_FLAG_DISABLE_DENORMALS is set. */
+static void thread_main(void* arg, size_t thread_index) {
+	struct pthreadpool* threadpool = (struct pthreadpool*) arg;
+	struct thread_info* thread = &threadpool->threads[thread_index];
+
+	const uint32_t flags = pthreadpool_load_relaxed_uint32_t(&threadpool->flags);
+	const thread_function_t thread_function =
+		(thread_function_t) pthreadpool_load_relaxed_void_p(&threadpool->thread_function);
+
+	struct fpu_state saved_fpu_state = { 0 }; /* only overwritten and restored when the denormals flag is set */
+	if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
+		saved_fpu_state = get_fpu_state();
+		disable_fpu_denormals();
+	}
+
+	thread_function(threadpool, thread);
+
+	if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
+		set_fpu_state(saved_fpu_state);
+	}
+}
+/* Creates a pool with threads_count slots; a count of 0 means use the hw.logicalcpu_max sysctl value. Returns NULL if that query fails or reports a non-positive count, or if pthreadpool_allocate fails. */
+struct pthreadpool* pthreadpool_create(size_t threads_count) {
+	if (threads_count == 0) {
+		int threads = 1;
+		size_t sizeof_threads = sizeof(threads);
+		if (sysctlbyname("hw.logicalcpu_max", &threads, &sizeof_threads, NULL, 0) != 0) {
+			return NULL;
+		}
+
+		if (threads <= 0) {
+			return NULL;
+		}
+
+		threads_count = (size_t) threads;
+	}
+
+	struct pthreadpool* threadpool = pthreadpool_allocate(threads_count);
+	if (threadpool == NULL) {
+		return NULL;
+	}
+	threadpool->threads_count = threads_count;
+	for (size_t tid = 0; tid < threads_count; tid++) {
+		threadpool->threads[tid].thread_number = tid;
+	}
+
+	/* Thread pool with a single thread computes everything on the caller thread, so only multi-thread pools get a semaphore. NOTE(review): the dispatch_semaphore_create() result is not checked for NULL before the pool is returned, yet pthreadpool_parallelize waits on it unconditionally. */
+	if (threads_count > 1) {
+		threadpool->execution_semaphore = dispatch_semaphore_create(1);
+	}
+	return threadpool;
+}
+/* Publishes the task state on the pool, splits linear_range into threads_count contiguous sub-ranges, and runs thread_function once per sub-range through dispatch_apply_f; concurrent callers are serialized by execution_semaphore. */
+PTHREADPOOL_INTERNAL void pthreadpool_parallelize(
+	struct pthreadpool* threadpool,
+	thread_function_t thread_function,
+	const void* params,
+	size_t params_size,
+	void* task,
+	void* context,
+	size_t linear_range,
+	uint32_t flags)
+{
+	assert(threadpool != NULL);
+	assert(thread_function != NULL);
+	assert(task != NULL);
+	assert(linear_range > 1);
+
+	/* Protect the global threadpool structures: the semaphore is created with count 1, so this wait/signal pair acts as a mutex. NOTE(review): single-thread pools never create the semaphore, so callers presumably never reach this function for them, verify. */
+	dispatch_semaphore_wait(threadpool->execution_semaphore, DISPATCH_TIME_FOREVER);
+
+	/* Setup global arguments */
+	pthreadpool_store_relaxed_void_p(&threadpool->thread_function, (void*) thread_function);
+	pthreadpool_store_relaxed_void_p(&threadpool->task, task);
+	pthreadpool_store_relaxed_void_p(&threadpool->argument, context);
+	pthreadpool_store_relaxed_uint32_t(&threadpool->flags, flags);
+
+	/* No further locking is needed here: execution_semaphore is held and no work items have been submitted yet */
+	const size_t threads_count = threadpool->threads_count;
+
+	if (params_size != 0) {
+		memcpy(&threadpool->params, params, params_size);
+	}
+
+	/* Spread the work between threads: slot tid gets [range_start, range_end), with range_end taken from multiply_divide (presumably linear_range * (tid + 1) / threads_count, confirm in threadpool-utils.h) */
+	size_t range_start = 0;
+	for (size_t tid = 0; tid < threads_count; tid++) {
+		struct thread_info* thread = &threadpool->threads[tid];
+		const size_t range_end = multiply_divide(linear_range, tid + 1, threads_count);
+		pthreadpool_store_relaxed_size_t(&thread->range_start, range_start);
+		pthreadpool_store_relaxed_size_t(&thread->range_end, range_end);
+		pthreadpool_store_relaxed_size_t(&thread->range_length, range_end - range_start);
+
+		/* The next subrange starts where the previous ended */
+		range_start = range_end;
+	}
+	/* One thread_main invocation per slot. NOTE(review): releasing the semaphore right after this call relies on dispatch_apply_f returning only once every iteration has finished, confirm against the GCD documentation. */
+	dispatch_apply_f(threads_count, DISPATCH_APPLY_AUTO, threadpool, thread_main);
+
+	/* Unprotect the global threadpool structures */
+	dispatch_semaphore_signal(threadpool->execution_semaphore);
+}
+/* Releases the execution semaphore if one was created, then deallocates the pool; a NULL threadpool is ignored. */
+void pthreadpool_destroy(struct pthreadpool* threadpool) {
+	if (threadpool != NULL) {
+		if (threadpool->execution_semaphore != NULL) {
+			/* Release resources. NOTE(review): single-thread pools never assign this field in pthreadpool_create, so the NULL test assumes pthreadpool_allocate zero-fills the struct, verify. */
+			dispatch_release(threadpool->execution_semaphore);
+		}
+		pthreadpool_deallocate(threadpool);
+	}
+}
diff --git a/src/threadpool-common.h b/src/threadpool-common.h
index 1a18c60..ba5587e 100644
--- a/src/threadpool-common.h
+++ b/src/threadpool-common.h
@@ -14,6 +14,23 @@
 	#endif
 #endif
 
+#ifndef PTHREADPOOL_USE_GCD /* build-system value wins; otherwise 1 on Apple targets and 0 elsewhere */
+	#if defined(__APPLE__)
+		#define PTHREADPOOL_USE_GCD 1
+	#else
+		#define PTHREADPOOL_USE_GCD 0
+	#endif
+#endif
+/* Condvar is the fallback: it defaults to 1 only when neither GCD nor futex is selected. */
+#ifndef PTHREADPOOL_USE_CONDVAR
+	#if PTHREADPOOL_USE_GCD || PTHREADPOOL_USE_FUTEX
+		#define PTHREADPOOL_USE_CONDVAR 0
+	#else
+		#define PTHREADPOOL_USE_CONDVAR 1
+	#endif
+#endif
+/* NOTE(review): the CMake hunk in this patch compiles src/gcd.c only when APPLE is true, yet its gcd option defines PTHREADPOOL_USE_GCD=1 on any platform; confirm that a non-Apple gcd build (pthreads.c with the GCD struct layout) is rejected somewhere. */
+
 /* Number of iterations in spin-wait loop before going into futex/condvar wait */
 #define PTHREADPOOL_SPIN_WAIT_ITERATIONS 1000000
 
diff --git a/src/threadpool-object.h b/src/threadpool-object.h
index 95ccd6b..0b52964 100644
--- a/src/threadpool-object.h
+++ b/src/threadpool-object.h
@@ -4,16 +4,23 @@
 #include
 #include
 
+/* Internal headers: included first because threadpool-common.h defines the PTHREADPOOL_USE_* macros tested by the conditional includes below */
+#include "threadpool-common.h"
+#include "threadpool-atomics.h"
+
 /* POSIX headers */
+#if PTHREADPOOL_USE_CONDVAR || PTHREADPOOL_USE_FUTEX
 #include
+#endif
+
+/* Mach headers (presumably the dispatch header that declares dispatch_semaphore_t, used by the pthreadpool struct below; the name was lost in extraction) */
+#if PTHREADPOOL_USE_GCD
+#include
+#endif
 
 /* Library header */
 #include
 
-/* Internal headers */
-#include "threadpool-common.h"
-#include "threadpool-atomics.h"
-
 
 #define THREADPOOL_COMMAND_MASK UINT32_C(0x7FFFFFFF)
 
@@ -49,10 +56,12 @@ struct PTHREADPOOL_CACHELINE_ALIGNED thread_info {
 	 * Thread pool which owns the thread.
 	 */
 	struct pthreadpool* threadpool;
+#if PTHREADPOOL_USE_CONDVAR || PTHREADPOOL_USE_FUTEX
 	/**
 	 * The pthread object corresponding to the thread.
 	 */
 	pthread_t thread_object;
+#endif
 };
 
 PTHREADPOOL_STATIC_ASSERT(sizeof(struct thread_info) % PTHREADPOOL_CACHELINE_SIZE == 0,
@@ -70,10 +79,12 @@ struct pthreadpool_1d_with_uarch_params {
 };
 
 struct PTHREADPOOL_CACHELINE_ALIGNED pthreadpool {
+#if !PTHREADPOOL_USE_GCD
 	/**
 	 * The number of threads that are processing an operation.
 	 */
 	pthreadpool_atomic_size_t active_threads;
+#endif
 #if PTHREADPOOL_USE_FUTEX
 	/**
 	 * Indicates if there are active threads.
@@ -110,11 +121,19 @@ struct PTHREADPOOL_CACHELINE_ALIGNED pthreadpool {
 	 * Copy of the flags passed to a parallelization function.
 	 */
 	pthreadpool_atomic_uint32_t flags;
+#if PTHREADPOOL_USE_CONDVAR || PTHREADPOOL_USE_FUTEX
 	/**
 	 * Serializes concurrent calls to @a pthreadpool_parallelize_* from different threads.
 	 */
 	pthread_mutex_t execution_mutex;
-#if !PTHREADPOOL_USE_FUTEX
+#endif
+#if PTHREADPOOL_USE_GCD
+	/**
+	 * Serializes concurrent calls to @a pthreadpool_parallelize_* from different threads.
+	 */
+	dispatch_semaphore_t execution_semaphore;
+#endif
+#if PTHREADPOOL_USE_CONDVAR
 	/**
 	 * Guards access to the @a active_threads variable.
 	 */
-- 
cgit v1.2.3