Diffstat (limited to 'src/windows.c')
-rw-r--r--  src/windows.c  364
1 file changed, 364 insertions, 0 deletions
diff --git a/src/windows.c b/src/windows.c
new file mode 100644
index 0000000..c9b88f7
--- /dev/null
+++ b/src/windows.c
@@ -0,0 +1,364 @@
+/* Standard C headers */
+#include <assert.h>
+#include <stdbool.h>
+#include <stdint.h>
+#include <stdlib.h>
+#include <string.h>
+
+/* Configuration header */
+#include "threadpool-common.h"
+
+/* Windows headers */
+#include <windows.h>
+
+/* Public library header */
+#include <pthreadpool.h>
+
+/* Internal library headers */
+#include "threadpool-atomics.h"
+#include "threadpool-object.h"
+#include "threadpool-utils.h"
+
+
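+/*
+ * Check in a worker thread: the last thread to decrement active_threads to
+ * zero signals the completion event identified by event_index, waking the
+ * master thread waiting in wait_worker_threads.
+ */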
+static void checkin_worker_thread(struct pthreadpool* threadpool, uint32_t event_index) {
+ if (pthreadpool_decrement_fetch_release_size_t(&threadpool->active_threads) == 0) {
+ SetEvent(threadpool->completion_event[event_index]);
+ }
+}
+
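+/*
+ * Wait until all worker threads check in (active_threads reaches zero):
+ * first poll in a bounded spin-wait loop with yields, then fall back to
+ * blocking on the completion event with the given index.
+ */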
+static void wait_worker_threads(struct pthreadpool* threadpool, uint32_t event_index) {
+ /* Initial check */
+ size_t active_threads = pthreadpool_load_acquire_size_t(&threadpool->active_threads);
+ if (active_threads == 0) {
+ return;
+ }
+
+ /* Spin-wait */
+ for (uint32_t i = PTHREADPOOL_SPIN_WAIT_ITERATIONS; i != 0; i--) {
+ pthreadpool_yield();
+
+ active_threads = pthreadpool_load_acquire_size_t(&threadpool->active_threads);
+ if (active_threads == 0) {
+ return;
+ }
+ }
+
+ /* Fall back to event wait */
+ const DWORD wait_status = WaitForSingleObject(threadpool->completion_event[event_index], INFINITE);
+ assert(wait_status == WAIT_OBJECT_0);
+ assert(pthreadpool_load_relaxed_size_t(&threadpool->active_threads) == 0);
+}
+
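+/*
+ * Wait until the threadpool command changes from last_command and return the
+ * new command. Spin-waits first, unless the previous command requested that
+ * workers yield, then blocks on the command event selected by the high bit
+ * of last_command.
+ */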
+static uint32_t wait_for_new_command(
+ struct pthreadpool* threadpool,
+ uint32_t last_command,
+ uint32_t last_flags)
+{
+ uint32_t command = pthreadpool_load_acquire_uint32_t(&threadpool->command);
+ if (command != last_command) {
+ return command;
+ }
+
+ if ((last_flags & PTHREADPOOL_FLAG_YIELD_WORKERS) == 0) {
+ /* Spin-wait loop */
+ for (uint32_t i = PTHREADPOOL_SPIN_WAIT_ITERATIONS; i != 0; i--) {
+ pthreadpool_yield();
+
+ command = pthreadpool_load_acquire_uint32_t(&threadpool->command);
+ if (command != last_command) {
+ return command;
+ }
+ }
+ }
+
+ /* Spin-wait disabled or timed out, fall back to event wait */
+ const uint32_t event_index = (last_command >> 31);
+ const DWORD wait_status = WaitForSingleObject(threadpool->command_event[event_index], INFINITE);
+ assert(wait_status == WAIT_OBJECT_0);
+
+ command = pthreadpool_load_relaxed_uint32_t(&threadpool->command);
+ assert(command != last_command);
+ return command;
+}
+
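+/*
+ * Worker thread entry point: check in once on startup, then loop waiting for
+ * commands (parallelize or shutdown) and check in after processing each one.
+ */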
+static DWORD WINAPI thread_main(LPVOID arg) {
+ struct thread_info* thread = (struct thread_info*) arg;
+ struct pthreadpool* threadpool = thread->threadpool;
+ uint32_t last_command = threadpool_command_init;
+ struct fpu_state saved_fpu_state = { 0 };
+ uint32_t flags = 0;
+
+ /* Check in */
+ checkin_worker_thread(threadpool, 0);
+
+ /* Monitor new commands and act accordingly */
+ for (;;) {
+ uint32_t command = wait_for_new_command(threadpool, last_command, flags);
+ pthreadpool_fence_acquire();
+
+ flags = pthreadpool_load_relaxed_uint32_t(&threadpool->flags);
+
+ /* Process command */
+ switch (command & THREADPOOL_COMMAND_MASK) {
+ case threadpool_command_parallelize:
+ {
+ const thread_function_t thread_function =
+ (thread_function_t) pthreadpool_load_relaxed_void_p(&threadpool->thread_function);
+ if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
+ saved_fpu_state = get_fpu_state();
+ disable_fpu_denormals();
+ }
+
+ thread_function(threadpool, thread);
+ if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
+ set_fpu_state(saved_fpu_state);
+ }
+ break;
+ }
+ case threadpool_command_shutdown:
+ /* Exit immediately: the master thread is waiting on WaitForSingleObject for this thread's handle */
+ return 0;
+ case threadpool_command_init:
+ /* To inhibit compiler warning */
+ break;
+ }
+ /* Notify the master thread that we finished processing */
+ const uint32_t event_index = command >> 31;
+ checkin_worker_thread(threadpool, event_index);
+ /* Update last command */
+ last_command = command;
+ }
+ return 0;
+}
+
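+/*
+ * Create a pool with threads_count threads (0 = one per logical processor).
+ * The caller thread doubles as worker #0, so only threads_count - 1 system
+ * threads are created.
+ */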
+struct pthreadpool* pthreadpool_create(size_t threads_count) {
+ if (threads_count == 0) {
+ SYSTEM_INFO system_info;
+ ZeroMemory(&system_info, sizeof(system_info));
+ GetSystemInfo(&system_info);
+ threads_count = (size_t) system_info.dwNumberOfProcessors;
+ }
+
+ struct pthreadpool* threadpool = pthreadpool_allocate(threads_count);
+ if (threadpool == NULL) {
+ return NULL;
+ }
+ threadpool->threads_count = fxdiv_init_size_t(threads_count);
+ for (size_t tid = 0; tid < threads_count; tid++) {
+ threadpool->threads[tid].thread_number = tid;
+ threadpool->threads[tid].threadpool = threadpool;
+ }
+
+ /* A thread pool with a single thread computes everything on the caller thread. */
+ if (threads_count > 1) {
+ threadpool->execution_mutex = CreateMutexW(
+ NULL /* mutex attributes */,
+ FALSE /* initially owned */,
+ NULL /* name */);
+ for (size_t i = 0; i < 2; i++) {
+ threadpool->completion_event[i] = CreateEventW(
+ NULL /* event attributes */,
+ TRUE /* manual-reset event: yes */,
+ FALSE /* initial state: nonsignaled */,
+ NULL /* name */);
+ threadpool->command_event[i] = CreateEventW(
+ NULL /* event attributes */,
+ TRUE /* manual-reset event: yes */,
+ FALSE /* initial state: nonsignaled */,
+ NULL /* name */);
+ }
+
+ pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */);
+
+ /* Caller thread serves as worker #0. Thus, we create system threads starting with worker #1. */
+ for (size_t tid = 1; tid < threads_count; tid++) {
+ threadpool->threads[tid].thread_handle = CreateThread(
+ NULL /* thread attributes */,
+ 0 /* stack size: default */,
+ &thread_main,
+ &threadpool->threads[tid],
+ 0 /* creation flags */,
+ NULL /* thread id */);
+ }
+
+ /* Wait until all threads initialize */
+ wait_worker_threads(threadpool, 0);
+ }
+ return threadpool;
+}
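+
+/*
+ * Usage sketch (illustrative only, via the public API declared in
+ * pthreadpool.h; my_task and my_context are hypothetical names):
+ *
+ *   pthreadpool_t pool = pthreadpool_create(0); // 0 = one thread per processor
+ *   pthreadpool_parallelize_1d(pool, my_task, my_context, range, 0);
+ *   pthreadpool_destroy(pool);
+ */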
+
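+/*
+ * Dispatch thread_function across all workers: publish the task parameters,
+ * flip the command word to wake the workers, run the first subrange on the
+ * caller thread (worker #0), then wait for the remaining workers to check in.
+ */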
+PTHREADPOOL_INTERNAL void pthreadpool_parallelize(
+ struct pthreadpool* threadpool,
+ thread_function_t thread_function,
+ const void* params,
+ size_t params_size,
+ void* task,
+ void* context,
+ size_t linear_range,
+ uint32_t flags)
+{
+ assert(threadpool != NULL);
+ assert(thread_function != NULL);
+ assert(task != NULL);
+ assert(linear_range > 1);
+
+ /* Protect the global threadpool structures */
+ const DWORD wait_status = WaitForSingleObject(threadpool->execution_mutex, INFINITE);
+ assert(wait_status == WAIT_OBJECT_0);
+
+ /* Setup global arguments */
+ pthreadpool_store_relaxed_void_p(&threadpool->thread_function, (void*) thread_function);
+ pthreadpool_store_relaxed_void_p(&threadpool->task, task);
+ pthreadpool_store_relaxed_void_p(&threadpool->argument, context);
+ pthreadpool_store_relaxed_uint32_t(&threadpool->flags, flags);
+
+ const struct fxdiv_divisor_size_t threads_count = threadpool->threads_count;
+ pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count.value - 1 /* caller thread */);
+
+ if (params_size != 0) {
+ CopyMemory(&threadpool->params, params, params_size);
+ pthreadpool_fence_release();
+ }
+
+ /* Spread the work between threads */
+ const struct fxdiv_result_size_t range_params = fxdiv_divide_size_t(linear_range, threads_count);
+ size_t range_start = 0;
+ for (size_t tid = 0; tid < threads_count.value; tid++) {
+ struct thread_info* thread = &threadpool->threads[tid];
+ const size_t range_length = range_params.quotient + (size_t) (tid < range_params.remainder);
+ const size_t range_end = range_start + range_length;
+ pthreadpool_store_relaxed_size_t(&thread->range_start, range_start);
+ pthreadpool_store_relaxed_size_t(&thread->range_end, range_end);
+ pthreadpool_store_relaxed_size_t(&thread->range_length, range_length);
+
+ /* The next subrange starts where the previous ended */
+ range_start = range_end;
+ }
+
+ /*
+ * Update the threadpool command.
+ * Importantly, do it after initializing the command parameters (range, task, argument, flags):
+ * ~(threadpool->command | THREADPOOL_COMMAND_MASK) flips the bits not in the command mask
+ * to ensure the unmasked command is different from the last command, because worker threads
+ * monitor for a change in the unmasked command.
+ */
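+ /*
+ * Worked example (assuming the two high bits lie outside THREADPOOL_COMMAND_MASK):
+ * if old_command is 0x00000001, ~(old_command | THREADPOOL_COMMAND_MASK) sets the
+ * high bits, so new_command is 0xC0000000 | threadpool_command_parallelize; on the
+ * following update the high bits flip back to zero. Bit 31 thus alternates on
+ * every submitted command, and selects which event pair is in use.
+ */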
+ const uint32_t old_command = pthreadpool_load_relaxed_uint32_t(&threadpool->command);
+ const uint32_t new_command = ~(old_command | THREADPOOL_COMMAND_MASK) | threadpool_command_parallelize;
+
+ /*
+ * Reset the command event for the next command.
+ * It is important to reset the event before writing out the new command, because as soon as the worker threads
+ * observe the new command, they may process it and switch to waiting on the next command event.
+ *
+ * Note: the event is different from the command event signalled in this update.
+ */
+ const uint32_t event_index = (old_command >> 31);
+ BOOL reset_event_status = ResetEvent(threadpool->command_event[event_index ^ 1]);
+ assert(reset_event_status != FALSE);
+
+ /*
+ * Store the command with release semantics to guarantee that if a worker thread observes
+ * the new command value, it also observes the updated command parameters.
+ *
+ * Note: release semantics is necessary, because the workers might be waiting in a spin-loop
+ * rather than on the event object.
+ */
+ pthreadpool_store_release_uint32_t(&threadpool->command, new_command);
+
+ /*
+ * Signal the event to wake up the threads.
+ * Event in use must be switched after every submitted command to avoid race conditions.
+ * Choose the event based on the high bit of the command, which is flipped on every update.
+ */
+ const BOOL set_event_status = SetEvent(threadpool->command_event[event_index]);
+ assert(set_event_status != FALSE);
+
+ /* Save and modify FPU denormals control, if needed */
+ struct fpu_state saved_fpu_state = { 0 };
+ if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
+ saved_fpu_state = get_fpu_state();
+ disable_fpu_denormals();
+ }
+
+ /* Do computations as worker #0 */
+ thread_function(threadpool, &threadpool->threads[0]);
+
+ /* Restore FPU denormals control, if needed */
+ if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
+ set_fpu_state(saved_fpu_state);
+ }
+
+ /*
+ * Wait until the threads finish computation.
+ * Use the complementary event, because it corresponds to the new command.
+ */
+ wait_worker_threads(threadpool, event_index ^ 1);
+
+ /*
+ * Reset the completion event for the next command.
+ * Note: the event is different from the one used for waiting in this update.
+ */
+ reset_event_status = ResetEvent(threadpool->completion_event[event_index]);
+ assert(reset_event_status != FALSE);
+
+ /* Make changes by other threads visible to this thread */
+ pthreadpool_fence_acquire();
+
+ /* Unprotect the global threadpool structures */
+ const BOOL release_mutex_status = ReleaseMutex(threadpool->execution_mutex);
+ assert(release_mutex_status != FALSE);
+}
+
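+/*
+ * Destroy the pool: publish the shutdown command and wake the workers, join
+ * every thread handle, close the mutex and event handles, and free the pool.
+ */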
+void pthreadpool_destroy(struct pthreadpool* threadpool) {
+ if (threadpool != NULL) {
+ const size_t threads_count = threadpool->threads_count.value;
+ if (threads_count > 1) {
+ pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */);
+
+ /*
+ * Store the command with release semantics to guarantee that if a worker thread observes
+ * the new command value, it also observes the updated active_threads values.
+ */
+ const uint32_t old_command = pthreadpool_load_relaxed_uint32_t(&threadpool->command);
+ pthreadpool_store_release_uint32_t(&threadpool->command, threadpool_command_shutdown);
+
+ /*
+ * Signal the event to wake up the threads.
+ * Event in use must be switched after every submitted command to avoid race conditions.
+ * Choose the event based on the high bit of the command, which is flipped on every update.
+ */
+ const uint32_t event_index = (old_command >> 31);
+ const BOOL set_event_status = SetEvent(threadpool->command_event[event_index]);
+ assert(set_event_status != FALSE);
+
+ /* Wait until all threads return */
+ for (size_t tid = 1; tid < threads_count; tid++) {
+ const HANDLE thread_handle = threadpool->threads[tid].thread_handle;
+ if (thread_handle != NULL) {
+ const DWORD wait_status = WaitForSingleObject(thread_handle, INFINITE);
+ assert(wait_status == WAIT_OBJECT_0);
+
+ const BOOL close_status = CloseHandle(thread_handle);
+ assert(close_status != FALSE);
+ }
+ }
+
+ /* Release resources */
+ if (threadpool->execution_mutex != NULL) {
+ const BOOL close_status = CloseHandle(threadpool->execution_mutex);
+ assert(close_status != FALSE);
+ }
+ for (size_t i = 0; i < 2; i++) {
+ if (threadpool->command_event[i] != NULL) {
+ const BOOL close_status = CloseHandle(threadpool->command_event[i]);
+ assert(close_status != FALSE);
+ }
+ if (threadpool->completion_event[i] != NULL) {
+ const BOOL close_status = CloseHandle(threadpool->completion_event[i]);
+ assert(close_status != FALSE);
+ }
+ }
+ }
+ pthreadpool_deallocate(threadpool);
+ }
+}