Diffstat (limited to 'unsupported/Eigen/CXX11/src/ThreadPool')
9 files changed, 865 insertions, 379 deletions
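Before the per-file hunks, a quick orientation: this change renames NonBlockingThreadPoolTempl to ThreadPoolTempl (the ThreadPool typedef now points at it), deletes SimpleThreadPool, and adds the Barrier/Notification primitives plus ScheduleWithHint, SetStealPartitions and Cancel on the pool. The sketch below is not part of the diff; it only exercises the resulting public API, and the include path, pool size and task count are illustrative assumptions.

// Sketch only: drives the public API touched by this change (Schedule,
// ScheduleWithHint, Barrier). Assumes the Eigen source root is on the
// include path and that the CXX11/ThreadPool module header pulls in the
// new Barrier.h.
#include <atomic>
#include <cstdio>
#include <unsupported/Eigen/CXX11/ThreadPool>

int main() {
  Eigen::ThreadPool pool(4);   // ThreadPool == ThreadPoolTempl<StlThreadEnvironment>
  std::atomic<int> done(0);
  Eigen::Barrier barrier(8);   // waits until Notify() has been called 8 times

  for (int i = 0; i < 8; ++i) {
    // Hint that the task should land on queues [0, 2); pools that don't
    // override ScheduleWithHint fall back to plain Schedule().
    pool.ScheduleWithHint([&] { done++; barrier.Notify(); },
                          /*start=*/0, /*limit=*/2);
  }
  barrier.Wait();  // blocks the caller until all 8 tasks have notified
  std::printf("completed %d tasks\n", done.load());
  // pool.Cancel();  // optional: stop draining queued closures early
  return 0;
}

Cancel() sets cancelled_ and done_, calls OnCancel() on each environment thread when the environment supports it, and wakes all blocked workers so they can exit; already-running closures are still allowed to finish.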
diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/Barrier.h b/unsupported/Eigen/CXX11/src/ThreadPool/Barrier.h new file mode 100644 index 000000000..e4c59dc3d --- /dev/null +++ b/unsupported/Eigen/CXX11/src/ThreadPool/Barrier.h @@ -0,0 +1,67 @@ +// This file is part of Eigen, a lightweight C++ template library +// for linear algebra. +// +// Copyright (C) 2018 Rasmus Munk Larsen <rmlarsen@google.com> +// +// This Source Code Form is subject to the terms of the Mozilla +// Public License v. 2.0. If a copy of the MPL was not distributed +// with this file, You can obtain one at http://mozilla.org/MPL/2.0/. + +// Barrier is an object that allows one or more threads to wait until +// Notify has been called a specified number of times. + +#ifndef EIGEN_CXX11_THREADPOOL_BARRIER_H +#define EIGEN_CXX11_THREADPOOL_BARRIER_H + +namespace Eigen { + +class Barrier { + public: + Barrier(unsigned int count) : state_(count << 1), notified_(false) { + eigen_plain_assert(((count << 1) >> 1) == count); + } + ~Barrier() { eigen_plain_assert((state_ >> 1) == 0); } + + void Notify() { + unsigned int v = state_.fetch_sub(2, std::memory_order_acq_rel) - 2; + if (v != 1) { + // Clear the lowest bit (waiter flag) and check that the original state + // value was not zero. If it was zero, it means that notify was called + // more times than the original count. + eigen_plain_assert(((v + 2) & ~1) != 0); + return; // either count has not dropped to 0, or waiter is not waiting + } + std::unique_lock<std::mutex> l(mu_); + eigen_plain_assert(!notified_); + notified_ = true; + cv_.notify_all(); + } + + void Wait() { + unsigned int v = state_.fetch_or(1, std::memory_order_acq_rel); + if ((v >> 1) == 0) return; + std::unique_lock<std::mutex> l(mu_); + while (!notified_) { + cv_.wait(l); + } + } + + private: + std::mutex mu_; + std::condition_variable cv_; + std::atomic<unsigned int> state_; // low bit is waiter flag + bool notified_; +}; + +// Notification is an object that allows a user to to wait for another +// thread to signal a notification that an event has occurred. +// +// Multiple threads can wait on the same Notification object, +// but only one caller must call Notify() on the object. +struct Notification : Barrier { + Notification() : Barrier(1){}; +}; + +} // namespace Eigen + +#endif // EIGEN_CXX11_THREADPOOL_BARRIER_H diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/EventCount.h b/unsupported/Eigen/CXX11/src/ThreadPool/EventCount.h index 71d55552d..4549aa069 100644 --- a/unsupported/Eigen/CXX11/src/ThreadPool/EventCount.h +++ b/unsupported/Eigen/CXX11/src/ThreadPool/EventCount.h @@ -33,10 +33,10 @@ namespace Eigen { // ec.Notify(true); // // Notify is cheap if there are no waiting threads. Prewait/CommitWait are not -// cheap, but they are executed only if the preceeding predicate check has +// cheap, but they are executed only if the preceding predicate check has // failed. // -// Algorihtm outline: +// Algorithm outline: // There are two main variables: predicate (managed by user) and state_. // Operation closely resembles Dekker mutual algorithm: // https://en.wikipedia.org/wiki/Dekker%27s_algorithm @@ -50,117 +50,114 @@ class EventCount { public: class Waiter; - EventCount(MaxSizeVector<Waiter>& waiters) : waiters_(waiters) { - eigen_assert(waiters.size() < (1 << kWaiterBits) - 1); - // Initialize epoch to something close to overflow to test overflow. 
- state_ = kStackMask | (kEpochMask - kEpochInc * waiters.size() * 2); + EventCount(MaxSizeVector<Waiter>& waiters) + : state_(kStackMask), waiters_(waiters) { + eigen_plain_assert(waiters.size() < (1 << kWaiterBits) - 1); } ~EventCount() { // Ensure there are no waiters. - eigen_assert((state_.load() & (kStackMask | kWaiterMask)) == kStackMask); + eigen_plain_assert(state_.load() == kStackMask); } // Prewait prepares for waiting. - // After calling this function the thread must re-check the wait predicate - // and call either CancelWait or CommitWait passing the same Waiter object. - void Prewait(Waiter* w) { - w->epoch = state_.fetch_add(kWaiterInc, std::memory_order_relaxed); - std::atomic_thread_fence(std::memory_order_seq_cst); + // After calling Prewait, the thread must re-check the wait predicate + // and then call either CancelWait or CommitWait. + void Prewait() { + uint64_t state = state_.load(std::memory_order_relaxed); + for (;;) { + CheckState(state); + uint64_t newstate = state + kWaiterInc; + CheckState(newstate); + if (state_.compare_exchange_weak(state, newstate, + std::memory_order_seq_cst)) + return; + } } - // CommitWait commits waiting. + // CommitWait commits waiting after Prewait. void CommitWait(Waiter* w) { + eigen_plain_assert((w->epoch & ~kEpochMask) == 0); w->state = Waiter::kNotSignaled; - // Modification epoch of this waiter. - uint64_t epoch = - (w->epoch & kEpochMask) + - (((w->epoch & kWaiterMask) >> kWaiterShift) << kEpochShift); + const uint64_t me = (w - &waiters_[0]) | w->epoch; uint64_t state = state_.load(std::memory_order_seq_cst); for (;;) { - if (int64_t((state & kEpochMask) - epoch) < 0) { - // The preceeding waiter has not decided on its fate. Wait until it - // calls either CancelWait or CommitWait, or is notified. - EIGEN_THREAD_YIELD(); - state = state_.load(std::memory_order_seq_cst); - continue; + CheckState(state, true); + uint64_t newstate; + if ((state & kSignalMask) != 0) { + // Consume the signal and return immidiately. + newstate = state - kWaiterInc - kSignalInc; + } else { + // Remove this thread from pre-wait counter and add to the waiter stack. + newstate = ((state & kWaiterMask) - kWaiterInc) | me; + w->next.store(state & (kStackMask | kEpochMask), + std::memory_order_relaxed); } - // We've already been notified. - if (int64_t((state & kEpochMask) - epoch) > 0) return; - // Remove this thread from prewait counter and add it to the waiter list. - eigen_assert((state & kWaiterMask) != 0); - uint64_t newstate = state - kWaiterInc + kEpochInc; - newstate = (newstate & ~kStackMask) | (w - &waiters_[0]); - if ((state & kStackMask) == kStackMask) - w->next.store(nullptr, std::memory_order_relaxed); - else - w->next.store(&waiters_[state & kStackMask], std::memory_order_relaxed); + CheckState(newstate); if (state_.compare_exchange_weak(state, newstate, - std::memory_order_release)) - break; + std::memory_order_acq_rel)) { + if ((state & kSignalMask) == 0) { + w->epoch += kEpochInc; + Park(w); + } + return; + } } - Park(w); } // CancelWait cancels effects of the previous Prewait call. - void CancelWait(Waiter* w) { - uint64_t epoch = - (w->epoch & kEpochMask) + - (((w->epoch & kWaiterMask) >> kWaiterShift) << kEpochShift); + void CancelWait() { uint64_t state = state_.load(std::memory_order_relaxed); for (;;) { - if (int64_t((state & kEpochMask) - epoch) < 0) { - // The preceeding waiter has not decided on its fate. Wait until it - // calls either CancelWait or CommitWait, or is notified. 
- EIGEN_THREAD_YIELD(); - state = state_.load(std::memory_order_relaxed); - continue; - } - // We've already been notified. - if (int64_t((state & kEpochMask) - epoch) > 0) return; - // Remove this thread from prewait counter. - eigen_assert((state & kWaiterMask) != 0); - if (state_.compare_exchange_weak(state, state - kWaiterInc + kEpochInc, - std::memory_order_relaxed)) + CheckState(state, true); + uint64_t newstate = state - kWaiterInc; + // We don't know if the thread was also notified or not, + // so we should not consume a signal unconditionaly. + // Only if number of waiters is equal to number of signals, + // we know that the thread was notified and we must take away the signal. + if (((state & kWaiterMask) >> kWaiterShift) == + ((state & kSignalMask) >> kSignalShift)) + newstate -= kSignalInc; + CheckState(newstate); + if (state_.compare_exchange_weak(state, newstate, + std::memory_order_acq_rel)) return; } } // Notify wakes one or all waiting threads. // Must be called after changing the associated wait predicate. - void Notify(bool all) { + void Notify(bool notifyAll) { std::atomic_thread_fence(std::memory_order_seq_cst); uint64_t state = state_.load(std::memory_order_acquire); for (;;) { + CheckState(state); + const uint64_t waiters = (state & kWaiterMask) >> kWaiterShift; + const uint64_t signals = (state & kSignalMask) >> kSignalShift; // Easy case: no waiters. - if ((state & kStackMask) == kStackMask && (state & kWaiterMask) == 0) - return; - uint64_t waiters = (state & kWaiterMask) >> kWaiterShift; + if ((state & kStackMask) == kStackMask && waiters == signals) return; uint64_t newstate; - if (all) { - // Reset prewait counter and empty wait list. - newstate = (state & kEpochMask) + (kEpochInc * waiters) + kStackMask; - } else if (waiters) { + if (notifyAll) { + // Empty wait stack and set signal to number of pre-wait threads. + newstate = + (state & kWaiterMask) | (waiters << kSignalShift) | kStackMask; + } else if (signals < waiters) { // There is a thread in pre-wait state, unblock it. - newstate = state + kEpochInc - kWaiterInc; + newstate = state + kSignalInc; } else { // Pop a waiter from list and unpark it. Waiter* w = &waiters_[state & kStackMask]; - Waiter* wnext = w->next.load(std::memory_order_relaxed); - uint64_t next = kStackMask; - if (wnext != nullptr) next = wnext - &waiters_[0]; - // Note: we don't add kEpochInc here. ABA problem on the lock-free stack - // can't happen because a waiter is re-pushed onto the stack only after - // it was in the pre-wait state which inevitably leads to epoch - // increment. - newstate = (state & kEpochMask) + next; + uint64_t next = w->next.load(std::memory_order_relaxed); + newstate = (state & (kWaiterMask | kSignalMask)) | next; } + CheckState(newstate); if (state_.compare_exchange_weak(state, newstate, - std::memory_order_acquire)) { - if (!all && waiters) return; // unblocked pre-wait thread + std::memory_order_acq_rel)) { + if (!notifyAll && (signals < waiters)) + return; // unblocked pre-wait thread if ((state & kStackMask) == kStackMask) return; Waiter* w = &waiters_[state & kStackMask]; - if (!all) w->next.store(nullptr, std::memory_order_relaxed); + if (!notifyAll) w->next.store(kStackMask, std::memory_order_relaxed); Unpark(w); return; } @@ -169,12 +166,13 @@ class EventCount { class Waiter { friend class EventCount; - // Align to 128 byte boundary to prevent false sharing with other Waiter objects in the same vector. 
- EIGEN_ALIGN_TO_BOUNDARY(128) std::atomic<Waiter*> next; + // Align to 128 byte boundary to prevent false sharing with other Waiter + // objects in the same vector. + EIGEN_ALIGN_TO_BOUNDARY(128) std::atomic<uint64_t> next; std::mutex mu; std::condition_variable cv; - uint64_t epoch; - unsigned state; + uint64_t epoch = 0; + unsigned state = kNotSignaled; enum { kNotSignaled, kWaiting, @@ -184,23 +182,41 @@ class EventCount { private: // State_ layout: - // - low kStackBits is a stack of waiters committed wait. + // - low kWaiterBits is a stack of waiters committed wait + // (indexes in waiters_ array are used as stack elements, + // kStackMask means empty stack). // - next kWaiterBits is count of waiters in prewait state. - // - next kEpochBits is modification counter. - static const uint64_t kStackBits = 16; - static const uint64_t kStackMask = (1ull << kStackBits) - 1; - static const uint64_t kWaiterBits = 16; - static const uint64_t kWaiterShift = 16; + // - next kWaiterBits is count of pending signals. + // - remaining bits are ABA counter for the stack. + // (stored in Waiter node and incremented on push). + static const uint64_t kWaiterBits = 14; + static const uint64_t kStackMask = (1ull << kWaiterBits) - 1; + static const uint64_t kWaiterShift = kWaiterBits; static const uint64_t kWaiterMask = ((1ull << kWaiterBits) - 1) << kWaiterShift; - static const uint64_t kWaiterInc = 1ull << kWaiterBits; - static const uint64_t kEpochBits = 32; - static const uint64_t kEpochShift = 32; + static const uint64_t kWaiterInc = 1ull << kWaiterShift; + static const uint64_t kSignalShift = 2 * kWaiterBits; + static const uint64_t kSignalMask = ((1ull << kWaiterBits) - 1) + << kSignalShift; + static const uint64_t kSignalInc = 1ull << kSignalShift; + static const uint64_t kEpochShift = 3 * kWaiterBits; + static const uint64_t kEpochBits = 64 - kEpochShift; static const uint64_t kEpochMask = ((1ull << kEpochBits) - 1) << kEpochShift; static const uint64_t kEpochInc = 1ull << kEpochShift; std::atomic<uint64_t> state_; MaxSizeVector<Waiter>& waiters_; + static void CheckState(uint64_t state, bool waiter = false) { + static_assert(kEpochBits >= 20, "not enough bits to prevent ABA problem"); + const uint64_t waiters = (state & kWaiterMask) >> kWaiterShift; + const uint64_t signals = (state & kSignalMask) >> kSignalShift; + eigen_plain_assert(waiters >= signals); + eigen_plain_assert(waiters < (1 << kWaiterBits) - 1); + eigen_plain_assert(!waiter || waiters > 0); + (void)waiters; + (void)signals; + } + void Park(Waiter* w) { std::unique_lock<std::mutex> lock(w->mu); while (w->state != Waiter::kSignaled) { @@ -209,10 +225,10 @@ class EventCount { } } - void Unpark(Waiter* waiters) { - Waiter* next = nullptr; - for (Waiter* w = waiters; w; w = next) { - next = w->next.load(std::memory_order_relaxed); + void Unpark(Waiter* w) { + for (Waiter* next; w; w = next) { + uint64_t wnext = w->next.load(std::memory_order_relaxed) & kStackMask; + next = wnext == kStackMask ? 
nullptr : &waiters_[wnext]; unsigned state; { std::unique_lock<std::mutex> lock(w->mu); diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h b/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h index 354bce52a..23a2b5467 100644 --- a/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h +++ b/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h @@ -10,79 +10,116 @@ #ifndef EIGEN_CXX11_THREADPOOL_NONBLOCKING_THREAD_POOL_H #define EIGEN_CXX11_THREADPOOL_NONBLOCKING_THREAD_POOL_H - namespace Eigen { template <typename Environment> -class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface { +class ThreadPoolTempl : public Eigen::ThreadPoolInterface { public: typedef typename Environment::Task Task; typedef RunQueue<Task, 1024> Queue; - NonBlockingThreadPoolTempl(int num_threads, Environment env = Environment()) + ThreadPoolTempl(int num_threads, Environment env = Environment()) + : ThreadPoolTempl(num_threads, true, env) {} + + ThreadPoolTempl(int num_threads, bool allow_spinning, + Environment env = Environment()) : env_(env), - threads_(num_threads), - queues_(num_threads), - coprimes_(num_threads), + num_threads_(num_threads), + allow_spinning_(allow_spinning), + thread_data_(num_threads), + all_coprimes_(num_threads), waiters_(num_threads), + global_steal_partition_(EncodePartition(0, num_threads_)), blocked_(0), spinning_(0), done_(false), + cancelled_(false), ec_(waiters_) { - waiters_.resize(num_threads); - - // Calculate coprimes of num_threads. - // Coprimes are used for a random walk over all threads in Steal + waiters_.resize(num_threads_); + // Calculate coprimes of all numbers [1, num_threads]. + // Coprimes are used for random walks over all threads in Steal // and NonEmptyQueueIndex. Iteration is based on the fact that if we take - // a walk starting thread index t and calculate num_threads - 1 subsequent + // a random starting thread index t and calculate num_threads - 1 subsequent // indices as (t + coprime) % num_threads, we will cover all threads without // repetitions (effectively getting a presudo-random permutation of thread // indices). - for (int i = 1; i <= num_threads; i++) { - unsigned a = i; - unsigned b = num_threads; - // If GCD(a, b) == 1, then a and b are coprimes. - while (b != 0) { - unsigned tmp = a; - a = b; - b = tmp % b; - } - if (a == 1) { - coprimes_.push_back(i); - } - } - for (int i = 0; i < num_threads; i++) { - queues_.push_back(new Queue()); + eigen_plain_assert(num_threads_ < kMaxThreads); + for (int i = 1; i <= num_threads_; ++i) { + all_coprimes_.emplace_back(i); + ComputeCoprimes(i, &all_coprimes_.back()); } - for (int i = 0; i < num_threads; i++) { - threads_.push_back(env_.CreateThread([this, i]() { WorkerLoop(i); })); +#ifndef EIGEN_THREAD_LOCAL + init_barrier_.reset(new Barrier(num_threads_)); +#endif + thread_data_.resize(num_threads_); + for (int i = 0; i < num_threads_; i++) { + SetStealPartition(i, EncodePartition(0, num_threads_)); + thread_data_[i].thread.reset( + env_.CreateThread([this, i]() { WorkerLoop(i); })); } +#ifndef EIGEN_THREAD_LOCAL + // Wait for workers to initialize per_thread_map_. Otherwise we might race + // with them in Schedule or CurrentThreadId. + init_barrier_->Wait(); +#endif } - ~NonBlockingThreadPoolTempl() { + ~ThreadPoolTempl() { done_ = true; + // Now if all threads block without work, they will start exiting. // But note that threads can continue to work arbitrary long, // block, submit new work, unblock and otherwise live full life. 
- ec_.Notify(true); + if (!cancelled_) { + ec_.Notify(true); + } else { + // Since we were cancelled, there might be entries in the queues. + // Empty them to prevent their destructor from asserting. + for (size_t i = 0; i < thread_data_.size(); i++) { + thread_data_[i].queue.Flush(); + } + } + // Join threads explicitly (by destroying) to avoid destruction order within + // this class. + for (size_t i = 0; i < thread_data_.size(); ++i) + thread_data_[i].thread.reset(); + } + + void SetStealPartitions(const std::vector<std::pair<unsigned, unsigned>>& partitions) { + eigen_plain_assert(partitions.size() == static_cast<std::size_t>(num_threads_)); - // Join threads explicitly to avoid destruction order issues. - for (size_t i = 0; i < threads_.size(); i++) delete threads_[i]; - for (size_t i = 0; i < threads_.size(); i++) delete queues_[i]; + // Pass this information to each thread queue. + for (int i = 0; i < num_threads_; i++) { + const auto& pair = partitions[i]; + unsigned start = pair.first, end = pair.second; + AssertBounds(start, end); + unsigned val = EncodePartition(start, end); + SetStealPartition(i, val); + } + } + + void Schedule(std::function<void()> fn) EIGEN_OVERRIDE { + ScheduleWithHint(std::move(fn), 0, num_threads_); } - void Schedule(std::function<void()> fn) { + void ScheduleWithHint(std::function<void()> fn, int start, + int limit) override { Task t = env_.CreateTask(std::move(fn)); PerThread* pt = GetPerThread(); if (pt->pool == this) { // Worker thread of this pool, push onto the thread's queue. - Queue* q = queues_[pt->thread_id]; - t = q->PushFront(std::move(t)); + Queue& q = thread_data_[pt->thread_id].queue; + t = q.PushFront(std::move(t)); } else { // A free-standing thread (or worker of another pool), push onto a random // queue. - Queue* q = queues_[Rand(&pt->rand) % queues_.size()]; - t = q->PushBack(std::move(t)); + eigen_plain_assert(start < limit); + eigen_plain_assert(limit <= num_threads_); + int num_queues = limit - start; + int rnd = Rand(&pt->rand) % num_queues; + eigen_plain_assert(start + rnd < limit); + Queue& q = thread_data_[start + rnd].queue; + t = q.PushBack(std::move(t)); } // Note: below we touch this after making w available to worker threads. // Strictly speaking, this can lead to a racy-use-after-free. Consider that @@ -91,19 +128,32 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface { // completes overall computations, which in turn leads to destruction of // this. We expect that such scenario is prevented by program, that is, // this is kept alive while any threads can potentially be in Schedule. - if (!t.f) + if (!t.f) { ec_.Notify(false); - else + } else { env_.ExecuteTask(t); // Push failed, execute directly. + } } - int NumThreads() const final { - return static_cast<int>(threads_.size()); + void Cancel() EIGEN_OVERRIDE { + cancelled_ = true; + done_ = true; + + // Let each thread know it's been cancelled. +#ifdef EIGEN_THREAD_ENV_SUPPORTS_CANCELLATION + for (size_t i = 0; i < thread_data_.size(); i++) { + thread_data_[i].thread->OnCancel(); + } +#endif + + // Wake up the threads without work to let them exit on their own. 
+ ec_.Notify(true); } - int CurrentThreadId() const final { - const PerThread* pt = - const_cast<NonBlockingThreadPoolTempl*>(this)->GetPerThread(); + int NumThreads() const EIGEN_FINAL { return num_threads_; } + + int CurrentThreadId() const EIGEN_FINAL { + const PerThread* pt = const_cast<ThreadPoolTempl*>(this)->GetPerThread(); if (pt->pool == this) { return pt->thread_id; } else { @@ -112,72 +162,191 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface { } private: + // Create a single atomic<int> that encodes start and limit information for + // each thread. + // We expect num_threads_ < 65536, so we can store them in a single + // std::atomic<unsigned>. + // Exposed publicly as static functions so that external callers can reuse + // this encode/decode logic for maintaining their own thread-safe copies of + // scheduling and steal domain(s). + static const int kMaxPartitionBits = 16; + static const int kMaxThreads = 1 << kMaxPartitionBits; + + inline unsigned EncodePartition(unsigned start, unsigned limit) { + return (start << kMaxPartitionBits) | limit; + } + + inline void DecodePartition(unsigned val, unsigned* start, unsigned* limit) { + *limit = val & (kMaxThreads - 1); + val >>= kMaxPartitionBits; + *start = val; + } + + void AssertBounds(int start, int end) { + eigen_plain_assert(start >= 0); + eigen_plain_assert(start < end); // non-zero sized partition + eigen_plain_assert(end <= num_threads_); + } + + inline void SetStealPartition(size_t i, unsigned val) { + thread_data_[i].steal_partition.store(val, std::memory_order_relaxed); + } + + inline unsigned GetStealPartition(int i) { + return thread_data_[i].steal_partition.load(std::memory_order_relaxed); + } + + void ComputeCoprimes(int N, MaxSizeVector<unsigned>* coprimes) { + for (int i = 1; i <= N; i++) { + unsigned a = i; + unsigned b = N; + // If GCD(a, b) == 1, then a and b are coprimes. + while (b != 0) { + unsigned tmp = a; + a = b; + b = tmp % b; + } + if (a == 1) { + coprimes->push_back(i); + } + } + } + typedef typename Environment::EnvThread Thread; struct PerThread { - constexpr PerThread() : pool(NULL), rand(0), thread_id(-1) { } - NonBlockingThreadPoolTempl* pool; // Parent pool, or null for normal threads. - uint64_t rand; // Random generator state. - int thread_id; // Worker thread index in pool. + constexpr PerThread() : pool(NULL), rand(0), thread_id(-1) {} + ThreadPoolTempl* pool; // Parent pool, or null for normal threads. + uint64_t rand; // Random generator state. + int thread_id; // Worker thread index in pool. +#ifndef EIGEN_THREAD_LOCAL + // Prevent false sharing. + char pad_[128]; +#endif + }; + + struct ThreadData { + constexpr ThreadData() : thread(), steal_partition(0), queue() {} + std::unique_ptr<Thread> thread; + std::atomic<unsigned> steal_partition; + Queue queue; }; Environment env_; - MaxSizeVector<Thread*> threads_; - MaxSizeVector<Queue*> queues_; - MaxSizeVector<unsigned> coprimes_; + const int num_threads_; + const bool allow_spinning_; + MaxSizeVector<ThreadData> thread_data_; + MaxSizeVector<MaxSizeVector<unsigned>> all_coprimes_; MaxSizeVector<EventCount::Waiter> waiters_; + unsigned global_steal_partition_; std::atomic<unsigned> blocked_; std::atomic<bool> spinning_; std::atomic<bool> done_; + std::atomic<bool> cancelled_; EventCount ec_; +#ifndef EIGEN_THREAD_LOCAL + std::unique_ptr<Barrier> init_barrier_; + std::mutex per_thread_map_mutex_; // Protects per_thread_map_. 
+ std::unordered_map<uint64_t, std::unique_ptr<PerThread>> per_thread_map_; +#endif // Main worker thread loop. void WorkerLoop(int thread_id) { +#ifndef EIGEN_THREAD_LOCAL + std::unique_ptr<PerThread> new_pt(new PerThread()); + per_thread_map_mutex_.lock(); + bool insertOK = per_thread_map_.emplace(GlobalThreadIdHash(), std::move(new_pt)).second; + eigen_plain_assert(insertOK); + EIGEN_UNUSED_VARIABLE(insertOK); + per_thread_map_mutex_.unlock(); + init_barrier_->Notify(); + init_barrier_->Wait(); +#endif PerThread* pt = GetPerThread(); pt->pool = this; - pt->rand = std::hash<std::thread::id>()(std::this_thread::get_id()); + pt->rand = GlobalThreadIdHash(); pt->thread_id = thread_id; - Queue* q = queues_[thread_id]; + Queue& q = thread_data_[thread_id].queue; EventCount::Waiter* waiter = &waiters_[thread_id]; - for (;;) { - Task t = q->PopFront(); - if (!t.f) { - t = Steal(); + // TODO(dvyukov,rmlarsen): The time spent in NonEmptyQueueIndex() is + // proportional to num_threads_ and we assume that new work is scheduled at + // a constant rate, so we set spin_count to 5000 / num_threads_. The + // constant was picked based on a fair dice roll, tune it. + const int spin_count = + allow_spinning_ && num_threads_ > 0 ? 5000 / num_threads_ : 0; + if (num_threads_ == 1) { + // For num_threads_ == 1 there is no point in going through the expensive + // steal loop. Moreover, since NonEmptyQueueIndex() calls PopBack() on the + // victim queues it might reverse the order in which ops are executed + // compared to the order in which they are scheduled, which tends to be + // counter-productive for the types of I/O workloads the single thread + // pools tend to be used for. + while (!cancelled_) { + Task t = q.PopFront(); + for (int i = 0; i < spin_count && !t.f; i++) { + if (!cancelled_.load(std::memory_order_relaxed)) { + t = q.PopFront(); + } + } if (!t.f) { - // Leave one thread spinning. This reduces latency. - // TODO(dvyukov): 1000 iterations is based on fair dice roll, tune it. - // Also, the time it takes to attempt to steal work 1000 times depends - // on the size of the thread pool. However the speed at which the user - // of the thread pool submit tasks is independent of the size of the - // pool. Consider a time based limit instead. - if (!spinning_ && !spinning_.exchange(true)) { - for (int i = 0; i < 1000 && !t.f; i++) { - t = Steal(); - } - spinning_ = false; + if (!WaitForWork(waiter, &t)) { + return; } + } + if (t.f) { + env_.ExecuteTask(t); + } + } + } else { + while (!cancelled_) { + Task t = q.PopFront(); + if (!t.f) { + t = LocalSteal(); if (!t.f) { - if (!WaitForWork(waiter, &t)) { - return; + t = GlobalSteal(); + if (!t.f) { + // Leave one thread spinning. This reduces latency. + if (allow_spinning_ && !spinning_ && !spinning_.exchange(true)) { + for (int i = 0; i < spin_count && !t.f; i++) { + if (!cancelled_.load(std::memory_order_relaxed)) { + t = GlobalSteal(); + } else { + return; + } + } + spinning_ = false; + } + if (!t.f) { + if (!WaitForWork(waiter, &t)) { + return; + } + } } } } - } - if (t.f) { - env_.ExecuteTask(t); + if (t.f) { + env_.ExecuteTask(t); + } } } } - // Steal tries to steal work from other worker threads in best-effort manner. - Task Steal() { + // Steal tries to steal work from other worker threads in the range [start, + // limit) in best-effort manner. 
+ Task Steal(unsigned start, unsigned limit) { PerThread* pt = GetPerThread(); - const size_t size = queues_.size(); + const size_t size = limit - start; unsigned r = Rand(&pt->rand); - unsigned inc = coprimes_[r % coprimes_.size()]; - unsigned victim = r % size; + // Reduce r into [0, size) range, this utilizes trick from + // https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/ + eigen_plain_assert(all_coprimes_[size - 1].size() < (1<<30)); + unsigned victim = ((uint64_t)r * (uint64_t)size) >> 32; + unsigned index = ((uint64_t) all_coprimes_[size - 1].size() * (uint64_t)r) >> 32; + unsigned inc = all_coprimes_[size - 1][index]; + for (unsigned i = 0; i < size; i++) { - Task t = queues_[victim]->PopBack(); + eigen_plain_assert(start + victim < limit); + Task t = thread_data_[start + victim].queue.PopBack(); if (t.f) { return t; } @@ -189,27 +358,52 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface { return Task(); } + // Steals work within threads belonging to the partition. + Task LocalSteal() { + PerThread* pt = GetPerThread(); + unsigned partition = GetStealPartition(pt->thread_id); + // If thread steal partition is the same as global partition, there is no + // need to go through the steal loop twice. + if (global_steal_partition_ == partition) return Task(); + unsigned start, limit; + DecodePartition(partition, &start, &limit); + AssertBounds(start, limit); + + return Steal(start, limit); + } + + // Steals work from any other thread in the pool. + Task GlobalSteal() { + return Steal(0, num_threads_); + } + + // WaitForWork blocks until new work is available (returns true), or if it is // time to exit (returns false). Can optionally return a task to execute in t // (in such case t.f != nullptr on return). bool WaitForWork(EventCount::Waiter* waiter, Task* t) { - eigen_assert(!t->f); + eigen_plain_assert(!t->f); // We already did best-effort emptiness check in Steal, so prepare for // blocking. - ec_.Prewait(waiter); + ec_.Prewait(); // Now do a reliable emptiness check. int victim = NonEmptyQueueIndex(); if (victim != -1) { - ec_.CancelWait(waiter); - *t = queues_[victim]->PopBack(); - return true; + ec_.CancelWait(); + if (cancelled_) { + return false; + } else { + *t = thread_data_[victim].queue.PopBack(); + return true; + } } // Number of blocked threads is used as termination condition. // If we are shutting down and all worker threads blocked without work, // that's we are done. blocked_++; - if (done_ && blocked_ == threads_.size()) { - ec_.CancelWait(waiter); + // TODO is blocked_ required to be unsigned? + if (done_ && blocked_ == static_cast<unsigned>(num_threads_)) { + ec_.CancelWait(); // Almost done, but need to re-check queues. // Consider that all queues are empty and all worker threads are preempted // right after incrementing blocked_ above. Now a free-standing thread @@ -236,12 +430,15 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface { int NonEmptyQueueIndex() { PerThread* pt = GetPerThread(); - const size_t size = queues_.size(); + // We intentionally design NonEmptyQueueIndex to steal work from + // anywhere in the queue so threads don't block in WaitForWork() forever + // when all threads in their partition go to sleep. Steal is still local. 
+ const size_t size = thread_data_.size(); unsigned r = Rand(&pt->rand); - unsigned inc = coprimes_[r % coprimes_.size()]; + unsigned inc = all_coprimes_[size - 1][r % all_coprimes_[size - 1].size()]; unsigned victim = r % size; for (unsigned i = 0; i < size; i++) { - if (!queues_[victim]->Empty()) { + if (!thread_data_[victim].queue.Empty()) { return victim; } victim += inc; @@ -252,10 +449,24 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface { return -1; } - static EIGEN_STRONG_INLINE PerThread* GetPerThread() { + static EIGEN_STRONG_INLINE uint64_t GlobalThreadIdHash() { + return std::hash<std::thread::id>()(std::this_thread::get_id()); + } + + EIGEN_STRONG_INLINE PerThread* GetPerThread() { +#ifndef EIGEN_THREAD_LOCAL + static PerThread dummy; + auto it = per_thread_map_.find(GlobalThreadIdHash()); + if (it == per_thread_map_.end()) { + return &dummy; + } else { + return it->second.get(); + } +#else EIGEN_THREAD_LOCAL PerThread per_thread_; PerThread* pt = &per_thread_; return pt; +#endif } static EIGEN_STRONG_INLINE unsigned Rand(uint64_t* state) { @@ -263,11 +474,12 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface { // Update the internal state *state = current * 6364136223846793005ULL + 0xda3e39cb94b95bdbULL; // Generate the random output (using the PCG-XSH-RS scheme) - return static_cast<unsigned>((current ^ (current >> 22)) >> (22 + (current >> 61))); + return static_cast<unsigned>((current ^ (current >> 22)) >> + (22 + (current >> 61))); } }; -typedef NonBlockingThreadPoolTempl<StlThreadEnvironment> NonBlockingThreadPool; +typedef ThreadPoolTempl<StlThreadEnvironment> ThreadPool; } // namespace Eigen diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h b/unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h index 05ed76cbe..b572ebcdf 100644 --- a/unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h +++ b/unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h @@ -10,7 +10,6 @@ #ifndef EIGEN_CXX11_THREADPOOL_RUNQUEUE_H_ #define EIGEN_CXX11_THREADPOOL_RUNQUEUE_H_ - namespace Eigen { // RunQueue is a fixed-size, partially non-blocking deque or Work items. @@ -40,14 +39,14 @@ class RunQueue { public: RunQueue() : front_(0), back_(0) { // require power-of-two for fast masking - eigen_assert((kSize & (kSize - 1)) == 0); - eigen_assert(kSize > 2); // why would you do this? - eigen_assert(kSize <= (64 << 10)); // leave enough space for counter + eigen_plain_assert((kSize & (kSize - 1)) == 0); + eigen_plain_assert(kSize > 2); // why would you do this? + eigen_plain_assert(kSize <= (64 << 10)); // leave enough space for counter for (unsigned i = 0; i < kSize; i++) array_[i].state.store(kEmpty, std::memory_order_relaxed); } - ~RunQueue() { eigen_assert(Size() == 0); } + ~RunQueue() { eigen_plain_assert(Size() == 0); } // PushFront inserts w at the beginning of the queue. // If queue is full returns w, otherwise returns default-constructed Work. @@ -98,11 +97,9 @@ class RunQueue { } // PopBack removes and returns the last elements in the queue. - // Can fail spuriously. Work PopBack() { if (Empty()) return Work(); - std::unique_lock<std::mutex> lock(mutex_, std::try_to_lock); - if (!lock) return Work(); + std::unique_lock<std::mutex> lock(mutex_); unsigned back = back_.load(std::memory_order_relaxed); Elem* e = &array_[back & kMask]; uint8_t s = e->state.load(std::memory_order_relaxed); @@ -116,11 +113,10 @@ class RunQueue { } // PopBackHalf removes and returns half last elements in the queue. - // Returns number of elements removed. 
But can also fail spuriously. + // Returns number of elements removed. unsigned PopBackHalf(std::vector<Work>* result) { if (Empty()) return 0; - std::unique_lock<std::mutex> lock(mutex_, std::try_to_lock); - if (!lock) return 0; + std::unique_lock<std::mutex> lock(mutex_); unsigned back = back_.load(std::memory_order_relaxed); unsigned size = Size(); unsigned mid = back; @@ -131,15 +127,14 @@ class RunQueue { Elem* e = &array_[mid & kMask]; uint8_t s = e->state.load(std::memory_order_relaxed); if (n == 0) { - if (s != kReady || - !e->state.compare_exchange_strong(s, kBusy, - std::memory_order_acquire)) + if (s != kReady || !e->state.compare_exchange_strong( + s, kBusy, std::memory_order_acquire)) continue; start = mid; } else { // Note: no need to store temporal kBusy, we exclusively own these // elements. - eigen_assert(s == kReady); + eigen_plain_assert(s == kReady); } result->push_back(std::move(e->w)); e->state.store(kEmpty, std::memory_order_release); @@ -152,30 +147,18 @@ class RunQueue { // Size returns current queue size. // Can be called by any thread at any time. - unsigned Size() const { - // Emptiness plays critical role in thread pool blocking. So we go to great - // effort to not produce false positives (claim non-empty queue as empty). - for (;;) { - // Capture a consistent snapshot of front/tail. - unsigned front = front_.load(std::memory_order_acquire); - unsigned back = back_.load(std::memory_order_acquire); - unsigned front1 = front_.load(std::memory_order_relaxed); - if (front != front1) continue; - int size = (front & kMask2) - (back & kMask2); - // Fix overflow. - if (size < 0) size += 2 * kSize; - // Order of modification in push/pop is crafted to make the queue look - // larger than it is during concurrent modifications. E.g. pop can - // decrement size before the corresponding push has incremented it. - // So the computed size can be up to kSize + 1, fix it. - if (size > static_cast<int>(kSize)) size = kSize; - return size; - } - } + unsigned Size() const { return SizeOrNotEmpty<true>(); } // Empty tests whether container is empty. // Can be called by any thread at any time. - bool Empty() const { return Size() == 0; } + bool Empty() const { return SizeOrNotEmpty<false>() == 0; } + + // Delete all the elements from the queue. + void Flush() { + while (!Empty()) { + PopFront(); + } + } private: static const unsigned kMask = kSize - 1; @@ -191,7 +174,7 @@ class RunQueue { }; std::mutex mutex_; // Low log(kSize) + 1 bits in front_ and back_ contain rolling index of - // front/back, repsectively. The remaining bits contain modification counters + // front/back, respectively. The remaining bits contain modification counters // that are incremented on Push operations. This allows us to (1) distinguish // between empty and full conditions (if we would use log(kSize) bits for // position, these conditions would be indistinguishable); (2) obtain @@ -201,6 +184,49 @@ class RunQueue { std::atomic<unsigned> back_; Elem array_[kSize]; + // SizeOrNotEmpty returns current queue size; if NeedSizeEstimate is false, + // only whether the size is 0 is guaranteed to be correct. + // Can be called by any thread at any time. + template<bool NeedSizeEstimate> + unsigned SizeOrNotEmpty() const { + // Emptiness plays critical role in thread pool blocking. So we go to great + // effort to not produce false positives (claim non-empty queue as empty). + unsigned front = front_.load(std::memory_order_acquire); + for (;;) { + // Capture a consistent snapshot of front/tail. 
+ unsigned back = back_.load(std::memory_order_acquire); + unsigned front1 = front_.load(std::memory_order_relaxed); + if (front != front1) { + front = front1; + std::atomic_thread_fence(std::memory_order_acquire); + continue; + } + if (NeedSizeEstimate) { + return CalculateSize(front, back); + } else { + // This value will be 0 if the queue is empty, and undefined otherwise. + unsigned maybe_zero = ((front ^ back) & kMask2); + // Queue size estimate must agree with maybe zero check on the queue + // empty/non-empty state. + eigen_assert((CalculateSize(front, back) == 0) == (maybe_zero == 0)); + return maybe_zero; + } + } + } + + EIGEN_ALWAYS_INLINE + unsigned CalculateSize(unsigned front, unsigned back) const { + int size = (front & kMask2) - (back & kMask2); + // Fix overflow. + if (size < 0) size += 2 * kSize; + // Order of modification in push/pop is crafted to make the queue look + // larger than it is during concurrent modifications. E.g. push can + // increment size before the corresponding pop has decremented it. + // So the computed size can be up to kSize + 1, fix it. + if (size > static_cast<int>(kSize)) size = kSize; + return static_cast<unsigned>(size); + } + RunQueue(const RunQueue&) = delete; void operator=(const RunQueue&) = delete; }; diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/SimpleThreadPool.h b/unsupported/Eigen/CXX11/src/ThreadPool/SimpleThreadPool.h deleted file mode 100644 index e75d0f467..000000000 --- a/unsupported/Eigen/CXX11/src/ThreadPool/SimpleThreadPool.h +++ /dev/null @@ -1,154 +0,0 @@ -// This file is part of Eigen, a lightweight C++ template library -// for linear algebra. -// -// Copyright (C) 2014 Benoit Steiner <benoit.steiner.goog@gmail.com> -// -// This Source Code Form is subject to the terms of the Mozilla -// Public License v. 2.0. If a copy of the MPL was not distributed -// with this file, You can obtain one at http://mozilla.org/MPL/2.0/. - -#ifndef EIGEN_CXX11_THREADPOOL_SIMPLE_THREAD_POOL_H -#define EIGEN_CXX11_THREADPOOL_SIMPLE_THREAD_POOL_H - -namespace Eigen { - -// The implementation of the ThreadPool type ensures that the Schedule method -// runs the functions it is provided in FIFO order when the scheduling is done -// by a single thread. -// Environment provides a way to create threads and also allows to intercept -// task submission and execution. -template <typename Environment> -class SimpleThreadPoolTempl : public ThreadPoolInterface { - public: - // Construct a pool that contains "num_threads" threads. - explicit SimpleThreadPoolTempl(int num_threads, Environment env = Environment()) - : env_(env), threads_(num_threads), waiters_(num_threads) { - for (int i = 0; i < num_threads; i++) { - threads_.push_back(env.CreateThread([this, i]() { WorkerLoop(i); })); - } - } - - // Wait until all scheduled work has finished and then destroy the - // set of threads. - ~SimpleThreadPoolTempl() { - { - // Wait for all work to get done. - std::unique_lock<std::mutex> l(mu_); - while (!pending_.empty()) { - empty_.wait(l); - } - exiting_ = true; - - // Wakeup all waiters. - for (auto w : waiters_) { - w->ready = true; - w->task.f = nullptr; - w->cv.notify_one(); - } - } - - // Wait for threads to finish. - for (auto t : threads_) { - delete t; - } - } - - // Schedule fn() for execution in the pool of threads. The functions are - // executed in the order in which they are scheduled. 
- void Schedule(std::function<void()> fn) final { - Task t = env_.CreateTask(std::move(fn)); - std::unique_lock<std::mutex> l(mu_); - if (waiters_.empty()) { - pending_.push_back(std::move(t)); - } else { - Waiter* w = waiters_.back(); - waiters_.pop_back(); - w->ready = true; - w->task = std::move(t); - w->cv.notify_one(); - } - } - - int NumThreads() const final { - return static_cast<int>(threads_.size()); - } - - int CurrentThreadId() const final { - const PerThread* pt = this->GetPerThread(); - if (pt->pool == this) { - return pt->thread_id; - } else { - return -1; - } - } - - protected: - void WorkerLoop(int thread_id) { - std::unique_lock<std::mutex> l(mu_); - PerThread* pt = GetPerThread(); - pt->pool = this; - pt->thread_id = thread_id; - Waiter w; - Task t; - while (!exiting_) { - if (pending_.empty()) { - // Wait for work to be assigned to me - w.ready = false; - waiters_.push_back(&w); - while (!w.ready) { - w.cv.wait(l); - } - t = w.task; - w.task.f = nullptr; - } else { - // Pick up pending work - t = std::move(pending_.front()); - pending_.pop_front(); - if (pending_.empty()) { - empty_.notify_all(); - } - } - if (t.f) { - mu_.unlock(); - env_.ExecuteTask(t); - t.f = nullptr; - mu_.lock(); - } - } - } - - private: - typedef typename Environment::Task Task; - typedef typename Environment::EnvThread Thread; - - struct Waiter { - std::condition_variable cv; - Task task; - bool ready; - }; - - struct PerThread { - constexpr PerThread() : pool(NULL), thread_id(-1) { } - SimpleThreadPoolTempl* pool; // Parent pool, or null for normal threads. - int thread_id; // Worker thread index in pool. - }; - - Environment env_; - std::mutex mu_; - MaxSizeVector<Thread*> threads_; // All threads - MaxSizeVector<Waiter*> waiters_; // Stack of waiting threads. - std::deque<Task> pending_; // Queue of pending work - std::condition_variable empty_; // Signaled on pending_.empty() - bool exiting_ = false; - - PerThread* GetPerThread() const { - EIGEN_THREAD_LOCAL PerThread per_thread; - return &per_thread; - } -}; - -typedef SimpleThreadPoolTempl<StlThreadEnvironment> SimpleThreadPool; - -} // namespace Eigen - -#endif // EIGEN_CXX11_THREADPOOL_SIMPLE_THREAD_POOL_H diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/ThreadCancel.h b/unsupported/Eigen/CXX11/src/ThreadPool/ThreadCancel.h new file mode 100644 index 000000000..a05685f11 --- /dev/null +++ b/unsupported/Eigen/CXX11/src/ThreadPool/ThreadCancel.h @@ -0,0 +1,23 @@ +// This file is part of Eigen, a lightweight C++ template library +// for linear algebra. +// +// Copyright (C) 2016 Benoit Steiner <benoit.steiner.goog@gmail.com> +// +// This Source Code Form is subject to the terms of the Mozilla +// Public License v. 2.0. If a copy of the MPL was not distributed +// with this file, You can obtain one at http://mozilla.org/MPL/2.0/. 
+ +#ifndef EIGEN_CXX11_THREADPOOL_THREAD_CANCEL_H +#define EIGEN_CXX11_THREADPOOL_THREAD_CANCEL_H + +// Try to come up with a portable way to cancel a thread +#if EIGEN_OS_GNULINUX + #define EIGEN_THREAD_CANCEL(t) \ + pthread_cancel(t.native_handle()); + #define EIGEN_SUPPORTS_THREAD_CANCELLATION 1 +#else +#define EIGEN_THREAD_CANCEL(t) +#endif + + +#endif // EIGEN_CXX11_THREADPOOL_THREAD_CANCEL_H diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/ThreadEnvironment.h b/unsupported/Eigen/CXX11/src/ThreadPool/ThreadEnvironment.h index 399f95cc1..d94a06416 100644 --- a/unsupported/Eigen/CXX11/src/ThreadPool/ThreadEnvironment.h +++ b/unsupported/Eigen/CXX11/src/ThreadPool/ThreadEnvironment.h @@ -23,6 +23,8 @@ struct StlThreadEnvironment { public: EnvThread(std::function<void()> f) : thr_(std::move(f)) {} ~EnvThread() { thr_.join(); } + // This function is called when the threadpool is cancelled. + void OnCancel() { } private: std::thread thr_; diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/ThreadLocal.h b/unsupported/Eigen/CXX11/src/ThreadPool/ThreadLocal.h index cfa221732..4e6847404 100644 --- a/unsupported/Eigen/CXX11/src/ThreadPool/ThreadLocal.h +++ b/unsupported/Eigen/CXX11/src/ThreadPool/ThreadLocal.h @@ -10,13 +10,292 @@ #ifndef EIGEN_CXX11_THREADPOOL_THREAD_LOCAL_H #define EIGEN_CXX11_THREADPOOL_THREAD_LOCAL_H -// Try to come up with a portable implementation of thread local variables -#if EIGEN_COMP_GNUC && EIGEN_GNUC_AT_MOST(4, 7) -#define EIGEN_THREAD_LOCAL static __thread -#elif EIGEN_COMP_CLANG -#define EIGEN_THREAD_LOCAL static __thread +#ifdef EIGEN_AVOID_THREAD_LOCAL + +#ifdef EIGEN_THREAD_LOCAL +#undef EIGEN_THREAD_LOCAL +#endif + #else + +#if EIGEN_MAX_CPP_VER >= 11 && \ + ((EIGEN_COMP_GNUC && EIGEN_GNUC_AT_LEAST(4, 8)) || \ + __has_feature(cxx_thread_local) || \ + (EIGEN_COMP_MSVC >= 1900) ) #define EIGEN_THREAD_LOCAL static thread_local #endif +// Disable TLS for Apple and Android builds with older toolchains. +#if defined(__APPLE__) +// Included for TARGET_OS_IPHONE, __IPHONE_OS_VERSION_MIN_REQUIRED, +// __IPHONE_8_0. +#include <Availability.h> +#include <TargetConditionals.h> +#endif +// Checks whether C++11's `thread_local` storage duration specifier is +// supported. +#if defined(__apple_build_version__) && \ + ((__apple_build_version__ < 8000042) || \ + (TARGET_OS_IPHONE && __IPHONE_OS_VERSION_MIN_REQUIRED < __IPHONE_9_0)) +// Notes: Xcode's clang did not support `thread_local` until version +// 8, and even then not for all iOS < 9.0. +#undef EIGEN_THREAD_LOCAL + +#elif defined(__ANDROID__) && EIGEN_COMP_CLANG +// There are platforms for which TLS should not be used even though the compiler +// makes it seem like it's supported (Android NDK < r12b for example). +// This is primarily because of linker problems and toolchain misconfiguration: +// TLS isn't supported until NDK r12b per +// https://developer.android.com/ndk/downloads/revision_history.html +// Since NDK r16, `__NDK_MAJOR__` and `__NDK_MINOR__` are defined in +// <android/ndk-version.h>. For NDK < r16, users should define these macros, +// e.g. `-D__NDK_MAJOR__=11 -D__NKD_MINOR__=0` for NDK r11. 
+#if __has_include(<android/ndk-version.h>) +#include <android/ndk-version.h> +#endif // __has_include(<android/ndk-version.h>) +#if defined(__ANDROID__) && defined(__clang__) && defined(__NDK_MAJOR__) && \ + defined(__NDK_MINOR__) && \ + ((__NDK_MAJOR__ < 12) || ((__NDK_MAJOR__ == 12) && (__NDK_MINOR__ < 1))) +#undef EIGEN_THREAD_LOCAL +#endif +#endif // defined(__ANDROID__) && defined(__clang__) + +#endif // EIGEN_AVOID_THREAD_LOCAL + +namespace Eigen { + +namespace internal { +template <typename T> +struct ThreadLocalNoOpInitialize { + void operator()(T&) const {} +}; + +template <typename T> +struct ThreadLocalNoOpRelease { + void operator()(T&) const {} +}; + +} // namespace internal + +// Thread local container for elements of type T, that does not use thread local +// storage. As long as the number of unique threads accessing this storage +// is smaller than `capacity_`, it is lock-free and wait-free. Otherwise it will +// use a mutex for synchronization. +// +// Type `T` has to be default constructible, and by default each thread will get +// a default constructed value. It is possible to specify custom `initialize` +// callable, that will be called lazily from each thread accessing this object, +// and will be passed a default initialized object of type `T`. Also it's +// possible to pass a custom `release` callable, that will be invoked before +// calling ~T(). +// +// Example: +// +// struct Counter { +// int value = 0; +// } +// +// Eigen::ThreadLocal<Counter> counter(10); +// +// // Each thread will have access to it's own counter object. +// Counter& cnt = counter.local(); +// cnt++; +// +// WARNING: Eigen::ThreadLocal uses the OS-specific value returned by +// std::this_thread::get_id() to identify threads. This value is not guaranteed +// to be unique except for the life of the thread. A newly created thread may +// get an OS-specific ID equal to that of an already destroyed thread. +// +// Somewhat similar to TBB thread local storage, with similar restrictions: +// https://www.threadingbuildingblocks.org/docs/help/reference/thread_local_storage/enumerable_thread_specific_cls.html +// +template <typename T, + typename Initialize = internal::ThreadLocalNoOpInitialize<T>, + typename Release = internal::ThreadLocalNoOpRelease<T>> +class ThreadLocal { + // We preallocate default constructed elements in MaxSizedVector. 
+ static_assert(std::is_default_constructible<T>::value, + "ThreadLocal data type must be default constructible"); + + public: + explicit ThreadLocal(int capacity) + : ThreadLocal(capacity, internal::ThreadLocalNoOpInitialize<T>(), + internal::ThreadLocalNoOpRelease<T>()) {} + + ThreadLocal(int capacity, Initialize initialize) + : ThreadLocal(capacity, std::move(initialize), + internal::ThreadLocalNoOpRelease<T>()) {} + + ThreadLocal(int capacity, Initialize initialize, Release release) + : initialize_(std::move(initialize)), + release_(std::move(release)), + capacity_(capacity), + data_(capacity_), + ptr_(capacity_), + filled_records_(0) { + eigen_assert(capacity_ >= 0); + data_.resize(capacity_); + for (int i = 0; i < capacity_; ++i) { + ptr_.emplace_back(nullptr); + } + } + + T& local() { + std::thread::id this_thread = std::this_thread::get_id(); + if (capacity_ == 0) return SpilledLocal(this_thread); + + std::size_t h = std::hash<std::thread::id>()(this_thread); + const int start_idx = h % capacity_; + + // NOTE: From the definition of `std::this_thread::get_id()` it is + // guaranteed that we never can have concurrent insertions with the same key + // to our hash-map like data structure. If we didn't find an element during + // the initial traversal, it's guaranteed that no one else could have + // inserted it while we are in this function. This allows to massively + // simplify out lock-free insert-only hash map. + + // Check if we already have an element for `this_thread`. + int idx = start_idx; + while (ptr_[idx].load() != nullptr) { + ThreadIdAndValue& record = *(ptr_[idx].load()); + if (record.thread_id == this_thread) return record.value; + + idx += 1; + if (idx >= capacity_) idx -= capacity_; + if (idx == start_idx) break; + } + + // If we are here, it means that we found an insertion point in lookup + // table at `idx`, or we did a full traversal and table is full. + + // If lock-free storage is full, fallback on mutex. + if (filled_records_.load() >= capacity_) return SpilledLocal(this_thread); + + // We double check that we still have space to insert an element into a lock + // free storage. If old value in `filled_records_` is larger than the + // records capacity, it means that some other thread added an element while + // we were traversing lookup table. + int insertion_index = + filled_records_.fetch_add(1, std::memory_order_relaxed); + if (insertion_index >= capacity_) return SpilledLocal(this_thread); + + // At this point it's guaranteed that we can access to + // data_[insertion_index_] without a data race. + data_[insertion_index].thread_id = this_thread; + initialize_(data_[insertion_index].value); + + // That's the pointer we'll put into the lookup table. + ThreadIdAndValue* inserted = &data_[insertion_index]; + + // We'll use nullptr pointer to ThreadIdAndValue in a compare-and-swap loop. + ThreadIdAndValue* empty = nullptr; + + // Now we have to find an insertion point into the lookup table. We start + // from the `idx` that was identified as an insertion point above, it's + // guaranteed that we will have an empty record somewhere in a lookup table + // (because we created a record in the `data_`). + const int insertion_idx = idx; + + do { + // Always start search from the original insertion candidate. 
+ idx = insertion_idx; + while (ptr_[idx].load() != nullptr) { + idx += 1; + if (idx >= capacity_) idx -= capacity_; + // If we did a full loop, it means that we don't have any free entries + // in the lookup table, and this means that something is terribly wrong. + eigen_assert(idx != insertion_idx); + } + // Atomic CAS of the pointer guarantees that any other thread, that will + // follow this pointer will see all the mutations in the `data_`. + } while (!ptr_[idx].compare_exchange_weak(empty, inserted)); + + return inserted->value; + } + + // WARN: It's not thread safe to call it concurrently with `local()`. + void ForEach(std::function<void(std::thread::id, T&)> f) { + // Reading directly from `data_` is unsafe, because only CAS to the + // record in `ptr_` makes all changes visible to other threads. + for (auto& ptr : ptr_) { + ThreadIdAndValue* record = ptr.load(); + if (record == nullptr) continue; + f(record->thread_id, record->value); + } + + // We did not spill into the map based storage. + if (filled_records_.load(std::memory_order_relaxed) < capacity_) return; + + // Adds a happens before edge from the last call to SpilledLocal(). + std::unique_lock<std::mutex> lock(mu_); + for (auto& kv : per_thread_map_) { + f(kv.first, kv.second); + } + } + + // WARN: It's not thread safe to call it concurrently with `local()`. + ~ThreadLocal() { + // Reading directly from `data_` is unsafe, because only CAS to the record + // in `ptr_` makes all changes visible to other threads. + for (auto& ptr : ptr_) { + ThreadIdAndValue* record = ptr.load(); + if (record == nullptr) continue; + release_(record->value); + } + + // We did not spill into the map based storage. + if (filled_records_.load(std::memory_order_relaxed) < capacity_) return; + + // Adds a happens before edge from the last call to SpilledLocal(). + std::unique_lock<std::mutex> lock(mu_); + for (auto& kv : per_thread_map_) { + release_(kv.second); + } + } + + private: + struct ThreadIdAndValue { + std::thread::id thread_id; + T value; + }; + + // Use unordered map guarded by a mutex when lock free storage is full. + T& SpilledLocal(std::thread::id this_thread) { + std::unique_lock<std::mutex> lock(mu_); + + auto it = per_thread_map_.find(this_thread); + if (it == per_thread_map_.end()) { + auto result = per_thread_map_.emplace(this_thread, T()); + eigen_assert(result.second); + initialize_((*result.first).second); + return (*result.first).second; + } else { + return it->second; + } + } + + Initialize initialize_; + Release release_; + const int capacity_; + + // Storage that backs lock-free lookup table `ptr_`. Records stored in this + // storage contiguously starting from index 0. + MaxSizeVector<ThreadIdAndValue> data_; + + // Atomic pointers to the data stored in `data_`. Used as a lookup table for + // linear probing hash map (https://en.wikipedia.org/wiki/Linear_probing). + MaxSizeVector<std::atomic<ThreadIdAndValue*>> ptr_; + + // Number of records stored in the `data_`. + std::atomic<int> filled_records_; + + // We fallback on per thread map if lock-free storage is full. In practice + // this should never happen, if `capacity_` is a reasonable estimate of the + // number of threads running in a system. + std::mutex mu_; // Protects per_thread_map_. 
+ std::unordered_map<std::thread::id, T> per_thread_map_; +}; + +} // namespace Eigen + #endif // EIGEN_CXX11_THREADPOOL_THREAD_LOCAL_H diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/ThreadPoolInterface.h b/unsupported/Eigen/CXX11/src/ThreadPool/ThreadPoolInterface.h index a65ee97c9..25030dc0b 100644 --- a/unsupported/Eigen/CXX11/src/ThreadPool/ThreadPoolInterface.h +++ b/unsupported/Eigen/CXX11/src/ThreadPool/ThreadPoolInterface.h @@ -16,8 +16,23 @@ namespace Eigen { // custom thread pools underneath. class ThreadPoolInterface { public: + // Submits a closure to be run by a thread in the pool. virtual void Schedule(std::function<void()> fn) = 0; + // Submits a closure to be run by threads in the range [start, end) in the + // pool. + virtual void ScheduleWithHint(std::function<void()> fn, int /*start*/, + int /*end*/) { + // Just defer to Schedule in case sub-classes aren't interested in + // overriding this functionality. + Schedule(fn); + } + + // If implemented, stop processing the closures that have been enqueued. + // Currently running closures may still be processed. + // If not implemented, does nothing. + virtual void Cancel() {} + // Returns the number of threads in the pool. virtual int NumThreads() const = 0; |
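The EventCount rework replaces the epoch-based prewait handshake with separate waiter and signal counters packed into state_, and Prewait()/CancelWait() no longer take a Waiter*. The sketch below follows the waiting protocol described in the header's comments; EventCount and MaxSizeVector are internal Eigen types, and the WorkSource wrapper, the has_work predicate and the thread-id plumbing are illustrative assumptions, not part of the change.

// Minimal sketch of the updated EventCount protocol, assuming the Eigen
// source root is on the include path.
#include <atomic>
#include <unsupported/Eigen/CXX11/ThreadPool>

struct WorkSource {
  Eigen::MaxSizeVector<Eigen::EventCount::Waiter> waiters;
  Eigen::EventCount ec;
  std::atomic<bool> has_work;

  explicit WorkSource(int num_threads)
      : waiters(num_threads), ec(waiters), has_work(false) {
    waiters.resize(num_threads);  // same pattern the pool uses for waiters_
  }

  // Called by a worker that found its queues empty.
  void WaitForWork(int thread_id) {
    ec.Prewait();                          // announce intent to block
    if (has_work.load()) {                 // reliable re-check of the predicate
      ec.CancelWait();                     // undo Prewait and go grab the work
      return;
    }
    ec.CommitWait(&waiters[thread_id]);    // park until a Notify()
  }

  // Called by a producer after publishing work.
  void Publish() {
    has_work.store(true);
    ec.Notify(false);                      // wake one waiter; true wakes all
  }
};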
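ThreadLocal.h now also provides Eigen::ThreadLocal<T>, a capacity-bounded, mostly lock-free substitute for thread_local storage used when EIGEN_THREAD_LOCAL is unavailable or EIGEN_AVOID_THREAD_LOCAL is defined. The sketch below expands the Counter example from the header's own comment (bumping the value field rather than the Counter object itself); the worker count, iteration count and the ForEach summation are illustrative assumptions.

// Sketch of Eigen::ThreadLocal usage, assuming the Eigen source root is on
// the include path. Capacity should upper-bound the number of distinct
// threads; past it the container falls back to a mutex-guarded map.
#include <cstdio>
#include <thread>
#include <vector>
#include <unsupported/Eigen/CXX11/ThreadPool>

struct Counter { int value = 0; };

int main() {
  Eigen::ThreadLocal<Counter> counter(/*capacity=*/8);

  std::vector<std::thread> workers;
  for (int t = 0; t < 4; ++t) {
    workers.emplace_back([&] {
      for (int i = 0; i < 1000; ++i) counter.local().value++;  // per-thread slot
    });
  }
  for (auto& w : workers) w.join();

  // ForEach is documented as not thread safe against local(); after join()
  // all workers are done, so iterating here is fine.
  int total = 0;
  counter.ForEach([&](std::thread::id, Counter& c) { total += c.value; });
  std::printf("total = %d\n", total);  // expect 4000
  return 0;
}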
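The new steal partitions pack a [start, limit) thread range into a single unsigned (kMaxPartitionBits = 16, hence the num_threads_ < 65536 requirement) so that each thread's partition fits in one std::atomic<unsigned>. SetStealPartitions takes one (start, end) pair per thread and stores the encoded word. The standalone snippet below mirrors the Encode/DecodePartition arithmetic from the diff purely for illustration; the function and constant names are local to the example, since the pool keeps these helpers private.

// Mirrors the packing used by ThreadPoolTempl's steal partitions.
#include <cassert>
#include <cstdio>

static const int kMaxPartitionBits = 16;
static const int kMaxThreads = 1 << kMaxPartitionBits;

unsigned EncodePartition(unsigned start, unsigned limit) {
  return (start << kMaxPartitionBits) | limit;  // high half: start, low half: limit
}

void DecodePartition(unsigned val, unsigned* start, unsigned* limit) {
  *limit = val & (kMaxThreads - 1);
  *start = val >> kMaxPartitionBits;
}

int main() {
  // E.g. a thread that is only allowed to steal from queues [4, 8).
  unsigned word = EncodePartition(4, 8);
  unsigned start, limit;
  DecodePartition(word, &start, &limit);
  assert(start == 4 && limit == 8);
  std::printf("steal range: [%u, %u)\n", start, limit);
  return 0;
}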
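Steal() now maps the 32-bit random value into [0, size) with the multiply-shift reduction referenced in the diff (the lemire.me link) instead of a modulo, and picks the coprime stride index the same way. The snippet below is a minimal self-contained check that the reduction stays in range and is roughly uniform; the LCG is only a stand-in for the pool's PCG-XSH-RS based Rand().

// Multiply-shift reduction: (r * size) >> 32 is always in [0, size) for
// 32-bit r, with no division.
#include <cstdint>
#include <cstdio>

unsigned FastRange(uint32_t r, uint32_t size) {
  return (unsigned)(((uint64_t)r * (uint64_t)size) >> 32);
}

int main() {
  const uint32_t size = 6;                 // e.g. six candidate victim queues
  uint32_t histogram[6] = {0};
  uint64_t state = 0x853c49e6748fea9bULL;  // crude LCG seed, stand-in only
  for (int i = 0; i < 60000; ++i) {
    state = state * 6364136223846793005ULL + 0xda3e39cb94b95bdbULL;
    histogram[FastRange((uint32_t)(state >> 32), size)]++;
  }
  for (uint32_t v = 0; v < size; ++v)
    std::printf("victim %u: %u picks\n", v, histogram[v]);  // roughly 10000 each
  return 0;
}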