aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Marat Dukhan <maratek@gmail.com> 2020-04-01 22:40:00 -0700
committer Marat Dukhan <maratek@gmail.com> 2020-04-01 22:40:00 -0700
commit be9c89379384a261026c8bf517ec3ed651bb171c (patch)
tree aad41dc2cd010ac96e053b940a7b3bf31ecd4be5
parent 23bc8d1e42d73e5df79bba61048b8da9bf14c194 (diff)
download pthreadpool-be9c89379384a261026c8bf517ec3ed651bb171c.tar.gz
Implementation using Grand Central Dispatch
-rw-r--r-- CMakeLists.txt 18
-rw-r--r-- src/gcd.c 135
-rw-r--r-- src/threadpool-common.h 17
-rw-r--r-- src/threadpool-object.h 29
4 files changed, 190 insertions, 9 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 6043fe4..da42de5 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -7,8 +7,8 @@ PROJECT(pthreadpool C CXX)
SET(PTHREADPOOL_LIBRARY_TYPE "default" CACHE STRING "Type of library (shared, static, or default) to build")
SET_PROPERTY(CACHE PTHREADPOOL_LIBRARY_TYPE PROPERTY STRINGS default static shared)
OPTION(PTHREADPOOL_ALLOW_DEPRECATED_API "Enable deprecated API functions" ON)
-SET(PTHREADPOOL_SYNC_PRIMITIVE "default" CACHE STRING "Synchronization primitive (condvar, futex, or default) for worker threads")
-SET_PROPERTY(CACHE PTHREADPOOL_SYNC_PRIMITIVE PROPERTY STRINGS default condvar futex)
+SET(PTHREADPOOL_SYNC_PRIMITIVE "default" CACHE STRING "Synchronization primitive (condvar, futex, gcd, or default) for worker threads")
+SET_PROPERTY(CACHE PTHREADPOOL_SYNC_PRIMITIVE PROPERTY STRINGS default condvar futex gcd)
IF("${CMAKE_SOURCE_DIR}" STREQUAL "${PROJECT_SOURCE_DIR}")
OPTION(PTHREADPOOL_BUILD_TESTS "Build pthreadpool unit tests" ON)
OPTION(PTHREADPOOL_BUILD_BENCHMARKS "Build pthreadpool micro-benchmarks" ON)
@@ -65,10 +65,15 @@ ENDIF()
IF(PTHREADPOOL_ALLOW_DEPRECATED_API)
SET(PTHREADPOOL_SRCS src/legacy-api.c)
ENDIF()
-IF(CMAKE_SYSTEM_NAME STREQUAL "Emscripten")
+IF(EMSCRIPTEN)
LIST(APPEND PTHREADPOOL_SRCS src/shim.c)
ELSE()
- LIST(APPEND PTHREADPOOL_SRCS src/portable-api.c src/memory.c src/pthreads.c)
+ LIST(APPEND PTHREADPOOL_SRCS src/portable-api.c src/memory.c)
+ IF(APPLE AND (PTHREADPOOL_SYNC_PRIMITIVE STREQUAL "default" OR PTHREADPOOL_SYNC_PRIMITIVE STREQUAL "gcd"))
+ LIST(APPEND PTHREADPOOL_SRCS src/gcd.c)
+ ELSE()
+ LIST(APPEND PTHREADPOOL_SRCS src/pthreads.c)
+ ENDIF()
ENDIF()
ADD_LIBRARY(pthreadpool_interface INTERFACE)
@@ -90,8 +95,13 @@ ENDIF()
IF(PTHREADPOOL_SYNC_PRIMITIVE STREQUAL "condvar")
TARGET_COMPILE_DEFINITIONS(pthreadpool PRIVATE PTHREADPOOL_USE_FUTEX=0)
+ TARGET_COMPILE_DEFINITIONS(pthreadpool PRIVATE PTHREADPOOL_USE_GCD=0)
ELSEIF(PTHREADPOOL_SYNC_PRIMITIVE STREQUAL "futex")
TARGET_COMPILE_DEFINITIONS(pthreadpool PRIVATE PTHREADPOOL_USE_FUTEX=1)
+ TARGET_COMPILE_DEFINITIONS(pthreadpool PRIVATE PTHREADPOOL_USE_GCD=0)
+ELSEIF(PTHREADPOOL_SYNC_PRIMITIVE STREQUAL "gcd")
+ TARGET_COMPILE_DEFINITIONS(pthreadpool PRIVATE PTHREADPOOL_USE_FUTEX=0)
+ TARGET_COMPILE_DEFINITIONS(pthreadpool PRIVATE PTHREADPOOL_USE_GCD=1)
ELSEIF(NOT PTHREADPOOL_SYNC_PRIMITIVE STREQUAL "default")
MESSAGE(FATAL_ERROR "Unsupported synchronization primitive ${PTHREADPOOL_SYNC_PRIMITIVE}")
ENDIF()
diff --git a/src/gcd.c b/src/gcd.c
new file mode 100644
index 0000000..5179f4c
--- /dev/null
+++ b/src/gcd.c
@@ -0,0 +1,135 @@
+/* Standard C headers */
+#include <assert.h>
+#include <stdbool.h>
+#include <stdint.h>
+#include <stdlib.h>
+#include <string.h>
+
+/* Configuration header */
+#include "threadpool-common.h"
+
+/* Mach headers */
+#include <dispatch/dispatch.h>
+#include <sys/types.h>
+#include <sys/sysctl.h>
+
+/* Public library header */
+#include <pthreadpool.h>
+
+/* Internal library headers */
+#include "threadpool-atomics.h"
+#include "threadpool-object.h"
+#include "threadpool-utils.h"
+
+
+/*
+ * Work function handed to dispatch_apply_f by pthreadpool_parallelize.
+ *
+ * arg          - the thread pool, passed through as an opaque pointer.
+ * thread_index - index into the pool's threads array selecting the
+ *                thread_info this invocation operates on.
+ *
+ * Loads the flags and thread function stored in the pool and calls the
+ * thread function. When PTHREADPOOL_FLAG_DISABLE_DENORMALS is set, denormals
+ * are disabled around the call and the saved FPU state is restored afterwards.
+ */
+static void thread_main(void* arg, size_t thread_index) {
+	struct pthreadpool* const pool = (struct pthreadpool*) arg;
+	struct thread_info* const worker = pool->threads + thread_index;
+
+	/* Snapshot the operation description stored in the pool. */
+	const uint32_t op_flags = pthreadpool_load_relaxed_uint32_t(&pool->flags);
+	const thread_function_t run =
+		(thread_function_t) pthreadpool_load_relaxed_void_p(&pool->thread_function);
+	const bool denormals_off = (op_flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) != 0;
+
+	/* Only meaningful when denormals_off is set; zeroed otherwise. */
+	struct fpu_state original_fpu_state = { 0 };
+	if (denormals_off) {
+		original_fpu_state = get_fpu_state();
+		disable_fpu_denormals();
+	}
+
+	run(pool, worker);
+
+	if (denormals_off) {
+		/* Put the FPU state back the way we found it. */
+		set_fpu_state(original_fpu_state);
+	}
+}
+
+/*
+ * Creates a thread pool backed by Grand Central Dispatch.
+ *
+ * threads_count - requested number of threads; 0 means "use the value of the
+ *                 hw.logicalcpu_max sysctl".
+ *
+ * Returns the new pool, or NULL if the CPU count cannot be queried (or is not
+ * positive), if the pool cannot be allocated, or if the execution semaphore
+ * cannot be created. Release the pool with pthreadpool_destroy.
+ */
+struct pthreadpool* pthreadpool_create(size_t threads_count) {
+	if (threads_count == 0) {
+		int threads = 1;
+		size_t sizeof_threads = sizeof(threads);
+		if (sysctlbyname("hw.logicalcpu_max", &threads, &sizeof_threads, NULL, 0) != 0) {
+			return NULL;
+		}
+
+		if (threads <= 0) {
+			return NULL;
+		}
+
+		threads_count = (size_t) threads;
+	}
+
+	struct pthreadpool* threadpool = pthreadpool_allocate(threads_count);
+	if (threadpool == NULL) {
+		return NULL;
+	}
+	threadpool->threads_count = threads_count;
+	for (size_t tid = 0; tid < threads_count; tid++) {
+		threadpool->threads[tid].thread_number = tid;
+	}
+
+	/*
+	 * pthreadpool_destroy inspects execution_semaphore for every pool, so give
+	 * it a defined value even when no semaphore is created below.
+	 */
+	threadpool->execution_semaphore = NULL;
+
+	/* Thread pool with a single thread computes everything on the caller thread. */
+	if (threads_count > 1) {
+		threadpool->execution_semaphore = dispatch_semaphore_create(1);
+		if (threadpool->execution_semaphore == NULL) {
+			/*
+			 * Semaphore creation failed: without it pthreadpool_parallelize
+			 * would wait on a NULL semaphore, so fail the whole creation.
+			 */
+			pthreadpool_deallocate(threadpool);
+			return NULL;
+		}
+	}
+	return threadpool;
+}
+
+/*
+ * Runs one parallel operation on the pool using dispatch_apply_f.
+ *
+ * threadpool      - pool to run on; its execution_semaphore is waited on
+ *                   for the duration of the call.
+ * thread_function - function each thread_main invocation calls.
+ * params          - optional parameter blob copied into threadpool->params.
+ * params_size     - size of the blob in bytes; 0 skips the copy.
+ * task, context   - stored in the pool's task and argument fields.
+ * linear_range    - total number of items, split between the pool's threads.
+ * flags           - stored in the pool's flags field.
+ *
+ * The execution semaphore is signalled only after dispatch_apply_f returns.
+ */
+PTHREADPOOL_INTERNAL void pthreadpool_parallelize(
+	struct pthreadpool* threadpool,
+	thread_function_t thread_function,
+	const void* params,
+	size_t params_size,
+	void* task,
+	void* context,
+	size_t linear_range,
+	uint32_t flags)
+{
+	assert(threadpool != NULL);
+	assert(thread_function != NULL);
+	assert(task != NULL);
+	assert(linear_range > 1);
+
+	/* Only one operation at a time may use the shared pool state */
+	dispatch_semaphore_wait(threadpool->execution_semaphore, DISPATCH_TIME_FOREVER);
+
+	/* Store the description of this operation in the pool for thread_main */
+	pthreadpool_store_relaxed_void_p(&threadpool->thread_function, (void*) thread_function);
+	pthreadpool_store_relaxed_void_p(&threadpool->task, task);
+	pthreadpool_store_relaxed_void_p(&threadpool->argument, context);
+	pthreadpool_store_relaxed_uint32_t(&threadpool->flags, flags);
+
+	const size_t workers = threadpool->threads_count;
+
+	/* An empty parameter blob needs no copy */
+	if (params_size != 0) {
+		memcpy(&threadpool->params, params, params_size);
+	}
+
+	/*
+	 * Hand every thread a contiguous slice of the range: each slice begins
+	 * where the previous one ended and ends at the value returned by
+	 * multiply_divide(linear_range, index + 1, workers).
+	 */
+	size_t slice_begin = 0;
+	for (size_t index = 0; index < workers; index++) {
+		struct thread_info* const worker = &threadpool->threads[index];
+		const size_t slice_end = multiply_divide(linear_range, index + 1, workers);
+		const size_t slice_length = slice_end - slice_begin;
+
+		pthreadpool_store_relaxed_size_t(&worker->range_start, slice_begin);
+		pthreadpool_store_relaxed_size_t(&worker->range_end, slice_end);
+		pthreadpool_store_relaxed_size_t(&worker->range_length, slice_length);
+
+		slice_begin = slice_end;
+	}
+
+	/* Invoke thread_main once for every thread index in [0, workers) */
+	dispatch_apply_f(workers, DISPATCH_APPLY_AUTO, threadpool, thread_main);
+
+	/* Let the next operation in */
+	dispatch_semaphore_signal(threadpool->execution_semaphore);
+}
+
+/*
+ * Destroys a thread pool created by pthreadpool_create.
+ *
+ * threadpool - pool to destroy; NULL is accepted and ignored.
+ *
+ * Releases the execution semaphore if one is set, then deallocates the pool.
+ */
+void pthreadpool_destroy(struct pthreadpool* threadpool) {
+	/* Nothing to do for a NULL pool */
+	if (threadpool == NULL) {
+		return;
+	}
+
+	/* The semaphore field may be NULL, in which case there is nothing to release */
+	if (threadpool->execution_semaphore != NULL) {
+		dispatch_release(threadpool->execution_semaphore);
+	}
+
+	pthreadpool_deallocate(threadpool);
+}
diff --git a/src/threadpool-common.h b/src/threadpool-common.h
index 1a18c60..ba5587e 100644
--- a/src/threadpool-common.h
+++ b/src/threadpool-common.h
@@ -14,6 +14,23 @@
#endif
#endif
+#ifndef PTHREADPOOL_USE_GCD /* keep any value already supplied by the build, e.g. a compile definition */
+ #if defined(__APPLE__)
+ #define PTHREADPOOL_USE_GCD 1 /* __APPLE__ is defined: default to Grand Central Dispatch */
+ #else
+ #define PTHREADPOOL_USE_GCD 0 /* every other target: GCD is off by default */
+ #endif
+#endif /* PTHREADPOOL_USE_GCD */
+
+#ifndef PTHREADPOOL_USE_CONDVAR /* keep any value already supplied by the build */
+ #if PTHREADPOOL_USE_GCD || PTHREADPOOL_USE_FUTEX
+ #define PTHREADPOOL_USE_CONDVAR 0 /* GCD or futex is selected, so condvar is not */
+ #else
+ #define PTHREADPOOL_USE_CONDVAR 1 /* neither GCD nor futex is selected: condvar is the remaining option */
+ #endif
+#endif /* PTHREADPOOL_USE_CONDVAR */
+
+
/* Number of iterations in spin-wait loop before going into futex/condvar wait */
#define PTHREADPOOL_SPIN_WAIT_ITERATIONS 1000000
diff --git a/src/threadpool-object.h b/src/threadpool-object.h
index 95ccd6b..0b52964 100644
--- a/src/threadpool-object.h
+++ b/src/threadpool-object.h
@@ -4,16 +4,23 @@
#include <stddef.h>
#include <stdint.h>
+/* Internal headers */
+#include "threadpool-common.h"
+#include "threadpool-atomics.h"
+
/* POSIX headers */
+#if PTHREADPOOL_USE_CONDVAR || PTHREADPOOL_USE_FUTEX
#include <pthread.h>
+#endif
+
+/* Mach headers */
+#if PTHREADPOOL_USE_GCD
+#include <dispatch/dispatch.h>
+#endif
/* Library header */
#include <pthreadpool.h>
-/* Internal headers */
-#include "threadpool-common.h"
-#include "threadpool-atomics.h"
-
#define THREADPOOL_COMMAND_MASK UINT32_C(0x7FFFFFFF)
@@ -49,10 +56,12 @@ struct PTHREADPOOL_CACHELINE_ALIGNED thread_info {
* Thread pool which owns the thread.
*/
struct pthreadpool* threadpool;
+#if PTHREADPOOL_USE_CONDVAR || PTHREADPOOL_USE_FUTEX
/**
* The pthread object corresponding to the thread.
*/
pthread_t thread_object;
+#endif
};
PTHREADPOOL_STATIC_ASSERT(sizeof(struct thread_info) % PTHREADPOOL_CACHELINE_SIZE == 0,
@@ -70,10 +79,12 @@ struct pthreadpool_1d_with_uarch_params {
};
struct PTHREADPOOL_CACHELINE_ALIGNED pthreadpool {
+#if !PTHREADPOOL_USE_GCD
/**
* The number of threads that are processing an operation.
*/
pthreadpool_atomic_size_t active_threads;
+#endif
#if PTHREADPOOL_USE_FUTEX
/**
* Indicates if there are active threads.
@@ -110,11 +121,19 @@ struct PTHREADPOOL_CACHELINE_ALIGNED pthreadpool {
* Copy of the flags passed to a parallelization function.
*/
pthreadpool_atomic_uint32_t flags;
+#if PTHREADPOOL_USE_CONDVAR || PTHREADPOOL_USE_FUTEX
/**
* Serializes concurrent calls to @a pthreadpool_parallelize_* from different threads.
*/
pthread_mutex_t execution_mutex;
-#if !PTHREADPOOL_USE_FUTEX
+#endif
+#if PTHREADPOOL_USE_GCD
+ /**
+ * Serializes concurrent calls to @a pthreadpool_parallelize_* from different threads.
+ */
+ dispatch_semaphore_t execution_semaphore;
+#endif
+#if PTHREADPOOL_USE_CONDVAR
/**
* Guards access to the @a active_threads variable.
*/