aboutsummaryrefslogtreecommitdiff
path: root/src/threadpool-object.h
diff options
context:
space:
mode:
authorMarat Dukhan <maratek@google.com>2020-04-01 17:08:30 -0700
committerMarat Dukhan <maratek@google.com>2020-04-01 17:08:30 -0700
commitfc793bc6d7eab64756df79971556594bf4ab145b (patch)
tree669f93c06ca276279aa3acb6b91edb8029e5f5a7 /src/threadpool-object.h
parent5b41aa6060588e26fd25ace6dc4afccfd4793997 (diff)
downloadpthreadpool-fc793bc6d7eab64756df79971556594bf4ab145b.tar.gz
Refactor pthreadpool implementation
Split implementation into two types of components: - Components dependent on threading API - Portable components
Diffstat (limited to 'src/threadpool-object.h')
-rw-r--r--src/threadpool-object.h164
1 files changed, 164 insertions, 0 deletions
diff --git a/src/threadpool-object.h b/src/threadpool-object.h
new file mode 100644
index 0000000..95ccd6b
--- /dev/null
+++ b/src/threadpool-object.h
@@ -0,0 +1,164 @@
+#pragma once
+
+/* Standard C headers */
+#include <stddef.h>
+#include <stdint.h>
+
+/* POSIX headers */
+#include <pthread.h>
+
+/* Library header */
+#include <pthreadpool.h>
+
+/* Internal headers */
+#include "threadpool-common.h"
+#include "threadpool-atomics.h"
+
+
+#define THREADPOOL_COMMAND_MASK UINT32_C(0x7FFFFFFF)
+
+enum threadpool_command {
+ threadpool_command_init,
+ threadpool_command_parallelize,
+ threadpool_command_shutdown,
+};
+
+struct PTHREADPOOL_CACHELINE_ALIGNED thread_info {
+ /**
+ * Index of the first element in the work range.
+ * Before processing a new element the owning worker thread increments this value.
+ */
+ pthreadpool_atomic_size_t range_start;
+ /**
+ * Index of the element after the last element of the work range.
+ * Before processing a new element the stealing worker thread decrements this value.
+ */
+ pthreadpool_atomic_size_t range_end;
+ /**
+ * The number of elements in the work range.
+ * Due to race conditions range_length <= range_end - range_start.
+ * The owning worker thread must decrement this value before incrementing @a range_start.
+ * The stealing worker thread must decrement this value before decrementing @a range_end.
+ */
+ pthreadpool_atomic_size_t range_length;
+ /**
+ * Thread number in the 0..threads_count-1 range.
+ */
+ size_t thread_number;
+ /**
+ * Thread pool which owns the thread.
+ */
+ struct pthreadpool* threadpool;
+ /**
+ * The pthread object corresponding to the thread.
+ */
+ pthread_t thread_object;
+};
+
+PTHREADPOOL_STATIC_ASSERT(sizeof(struct thread_info) % PTHREADPOOL_CACHELINE_SIZE == 0,
+ "thread_info structure must occupy an integer number of cache lines (64 bytes)");
+
+struct pthreadpool_1d_with_uarch_params {
+ /**
+ * Copy of the default uarch index argument passed to a microarchitecture-aware parallelization function.
+ */
+ uint32_t default_uarch_index;
+ /**
+ * Copy of the max uarch index argument passed to a microarchitecture-aware parallelization function.
+ */
+ uint32_t max_uarch_index;
+};
+
+struct PTHREADPOOL_CACHELINE_ALIGNED pthreadpool {
+ /**
+ * The number of threads that are processing an operation.
+ */
+ pthreadpool_atomic_size_t active_threads;
+#if PTHREADPOOL_USE_FUTEX
+ /**
+ * Indicates if there are active threads.
+ * Only two values are possible:
+ * - has_active_threads == 0 if active_threads == 0
+ * - has_active_threads == 1 if active_threads != 0
+ */
+ pthreadpool_atomic_uint32_t has_active_threads;
+#endif
+ /**
+ * The last command submitted to the thread pool.
+ */
+ pthreadpool_atomic_uint32_t command;
+ /**
+ * The entry point function to call for each thread in the thread pool for parallelization tasks.
+ */
+ pthreadpool_atomic_void_p thread_function;
+ /**
+ * The function to call for each item.
+ */
+ pthreadpool_atomic_void_p task;
+ /**
+ * The first argument to the item processing function.
+ */
+ pthreadpool_atomic_void_p argument;
+ /**
+ * Additional parallelization parameters.
+ * These parameters are specific for each thread_function.
+ */
+ union {
+ struct pthreadpool_1d_with_uarch_params parallelize_1d_with_uarch;
+ } params;
+ /**
+ * Copy of the flags passed to a parallelization function.
+ */
+ pthreadpool_atomic_uint32_t flags;
+ /**
+ * Serializes concurrent calls to @a pthreadpool_parallelize_* from different threads.
+ */
+ pthread_mutex_t execution_mutex;
+#if !PTHREADPOOL_USE_FUTEX
+ /**
+ * Guards access to the @a active_threads variable.
+ */
+ pthread_mutex_t completion_mutex;
+ /**
+ * Condition variable to wait until all threads complete an operation (until @a active_threads is zero).
+ */
+ pthread_cond_t completion_condvar;
+ /**
+ * Guards access to the @a command variable.
+ */
+ pthread_mutex_t command_mutex;
+ /**
+ * Condition variable to wait for change of the @a command variable.
+ */
+ pthread_cond_t command_condvar;
+#endif
+ /**
+ * The number of threads in the thread pool. Never changes after pthreadpool_create.
+ */
+ size_t threads_count;
+ /**
+ * Thread information structures that immediately follow this structure.
+ */
+ struct thread_info threads[];
+};
+
+PTHREADPOOL_STATIC_ASSERT(sizeof(struct pthreadpool) % PTHREADPOOL_CACHELINE_SIZE == 0,
+ "pthreadpool structure must occupy an integer number of cache lines (64 bytes)");
+
+PTHREADPOOL_INTERNAL struct pthreadpool* pthreadpool_allocate(
+ size_t threads_count);
+
+PTHREADPOOL_INTERNAL void pthreadpool_deallocate(
+ struct pthreadpool* threadpool);
+
+typedef void (*thread_function_t)(struct pthreadpool* threadpool, struct thread_info* thread);
+
+PTHREADPOOL_INTERNAL void pthreadpool_parallelize(
+ struct pthreadpool* threadpool,
+ thread_function_t thread_function,
+ const void* params,
+ size_t params_size,
+ void* task,
+ void* context,
+ size_t linear_range,
+ uint32_t flags);