Diffstat (limited to 'unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h')
-rw-r--r-- | unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h | 398
1 file changed, 305 insertions(+), 93 deletions(-)
diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h b/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h
index 354bce52a..23a2b5467 100644
--- a/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h
+++ b/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h
@@ -10,79 +10,116 @@
 #ifndef EIGEN_CXX11_THREADPOOL_NONBLOCKING_THREAD_POOL_H
 #define EIGEN_CXX11_THREADPOOL_NONBLOCKING_THREAD_POOL_H
-
 namespace Eigen {
 
 template <typename Environment>
-class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface {
+class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
  public:
   typedef typename Environment::Task Task;
   typedef RunQueue<Task, 1024> Queue;
 
-  NonBlockingThreadPoolTempl(int num_threads, Environment env = Environment())
+  ThreadPoolTempl(int num_threads, Environment env = Environment())
+      : ThreadPoolTempl(num_threads, true, env) {}
+
+  ThreadPoolTempl(int num_threads, bool allow_spinning,
+                  Environment env = Environment())
       : env_(env),
-        threads_(num_threads),
-        queues_(num_threads),
-        coprimes_(num_threads),
+        num_threads_(num_threads),
+        allow_spinning_(allow_spinning),
+        thread_data_(num_threads),
+        all_coprimes_(num_threads),
         waiters_(num_threads),
+        global_steal_partition_(EncodePartition(0, num_threads_)),
         blocked_(0),
         spinning_(0),
         done_(false),
+        cancelled_(false),
         ec_(waiters_) {
-    waiters_.resize(num_threads);
-
-    // Calculate coprimes of num_threads.
-    // Coprimes are used for a random walk over all threads in Steal
+    waiters_.resize(num_threads_);
+    // Calculate coprimes of all numbers [1, num_threads].
+    // Coprimes are used for random walks over all threads in Steal
     // and NonEmptyQueueIndex. Iteration is based on the fact that if we take
-    // a walk starting thread index t and calculate num_threads - 1 subsequent
+    // a random starting thread index t and calculate num_threads - 1 subsequent
     // indices as (t + coprime) % num_threads, we will cover all threads without
     // repetitions (effectively getting a pseudo-random permutation of thread
    // indices).
-    for (int i = 1; i <= num_threads; i++) {
-      unsigned a = i;
-      unsigned b = num_threads;
-      // If GCD(a, b) == 1, then a and b are coprimes.
-      while (b != 0) {
-        unsigned tmp = a;
-        a = b;
-        b = tmp % b;
-      }
-      if (a == 1) {
-        coprimes_.push_back(i);
-      }
-    }
-    for (int i = 0; i < num_threads; i++) {
-      queues_.push_back(new Queue());
+    eigen_plain_assert(num_threads_ < kMaxThreads);
+    for (int i = 1; i <= num_threads_; ++i) {
+      all_coprimes_.emplace_back(i);
+      ComputeCoprimes(i, &all_coprimes_.back());
     }
-    for (int i = 0; i < num_threads; i++) {
-      threads_.push_back(env_.CreateThread([this, i]() { WorkerLoop(i); }));
+#ifndef EIGEN_THREAD_LOCAL
+    init_barrier_.reset(new Barrier(num_threads_));
+#endif
+    thread_data_.resize(num_threads_);
+    for (int i = 0; i < num_threads_; i++) {
+      SetStealPartition(i, EncodePartition(0, num_threads_));
+      thread_data_[i].thread.reset(
+          env_.CreateThread([this, i]() { WorkerLoop(i); }));
     }
+#ifndef EIGEN_THREAD_LOCAL
+    // Wait for workers to initialize per_thread_map_. Otherwise we might race
+    // with them in Schedule or CurrentThreadId.
+    init_barrier_->Wait();
+#endif
   }
 
-  ~NonBlockingThreadPoolTempl() {
+  ~ThreadPoolTempl() {
     done_ = true;
+
     // Now if all threads block without work, they will start exiting.
     // But note that threads can continue to work arbitrarily long,
     // block, submit new work, unblock and otherwise live a full life.
-    ec_.Notify(true);
+    if (!cancelled_) {
+      ec_.Notify(true);
+    } else {
+      // Since we were cancelled, there might be entries in the queues.
+      // Empty them to prevent their destructor from asserting.
+      for (size_t i = 0; i < thread_data_.size(); i++) {
+        thread_data_[i].queue.Flush();
+      }
+    }
+    // Join threads explicitly (by destroying) to avoid destruction order within
+    // this class.
+    for (size_t i = 0; i < thread_data_.size(); ++i)
+      thread_data_[i].thread.reset();
+  }
+
+  void SetStealPartitions(const std::vector<std::pair<unsigned, unsigned>>& partitions) {
+    eigen_plain_assert(partitions.size() == static_cast<std::size_t>(num_threads_));
 
-    // Join threads explicitly to avoid destruction order issues.
-    for (size_t i = 0; i < threads_.size(); i++) delete threads_[i];
-    for (size_t i = 0; i < threads_.size(); i++) delete queues_[i];
+    // Pass this information to each thread queue.
+    for (int i = 0; i < num_threads_; i++) {
+      const auto& pair = partitions[i];
+      unsigned start = pair.first, end = pair.second;
+      AssertBounds(start, end);
+      unsigned val = EncodePartition(start, end);
+      SetStealPartition(i, val);
+    }
+  }
+
+  void Schedule(std::function<void()> fn) EIGEN_OVERRIDE {
+    ScheduleWithHint(std::move(fn), 0, num_threads_);
   }
 
-  void Schedule(std::function<void()> fn) {
+  void ScheduleWithHint(std::function<void()> fn, int start,
+                        int limit) override {
     Task t = env_.CreateTask(std::move(fn));
     PerThread* pt = GetPerThread();
     if (pt->pool == this) {
       // Worker thread of this pool, push onto the thread's queue.
-      Queue* q = queues_[pt->thread_id];
-      t = q->PushFront(std::move(t));
+      Queue& q = thread_data_[pt->thread_id].queue;
+      t = q.PushFront(std::move(t));
     } else {
       // A free-standing thread (or worker of another pool), push onto a random
       // queue.
-      Queue* q = queues_[Rand(&pt->rand) % queues_.size()];
-      t = q->PushBack(std::move(t));
+      eigen_plain_assert(start < limit);
+      eigen_plain_assert(limit <= num_threads_);
+      int num_queues = limit - start;
+      int rnd = Rand(&pt->rand) % num_queues;
+      eigen_plain_assert(start + rnd < limit);
+      Queue& q = thread_data_[start + rnd].queue;
+      t = q.PushBack(std::move(t));
     }
     // Note: below we touch this after making w available to worker threads.
     // Strictly speaking, this can lead to a racy-use-after-free. Consider that
@@ -91,19 +128,32 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface {
     // completes overall computations, which in turn leads to destruction of
     // this. We expect that such a scenario is prevented by the program, that is,
     // this is kept alive while any threads can potentially be in Schedule.
-    if (!t.f)
+    if (!t.f) {
       ec_.Notify(false);
-    else
+    } else {
       env_.ExecuteTask(t);  // Push failed, execute directly.
+    }
   }
 
-  int NumThreads() const final {
-    return static_cast<int>(threads_.size());
+  void Cancel() EIGEN_OVERRIDE {
+    cancelled_ = true;
+    done_ = true;
+
+    // Let each thread know it's been cancelled.
+#ifdef EIGEN_THREAD_ENV_SUPPORTS_CANCELLATION
+    for (size_t i = 0; i < thread_data_.size(); i++) {
+      thread_data_[i].thread->OnCancel();
+    }
+#endif
+
+    // Wake up the threads without work to let them exit on their own.
+    ec_.Notify(true);
   }
 
-  int CurrentThreadId() const final {
-    const PerThread* pt =
-        const_cast<NonBlockingThreadPoolTempl*>(this)->GetPerThread();
+  int NumThreads() const EIGEN_FINAL { return num_threads_; }
+
+  int CurrentThreadId() const EIGEN_FINAL {
+    const PerThread* pt = const_cast<ThreadPoolTempl*>(this)->GetPerThread();
     if (pt->pool == this) {
       return pt->thread_id;
     } else {
@@ -112,72 +162,191 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface {
   }
 
  private:
+  // Create a single atomic<int> that encodes start and limit information for
+  // each thread.
+  // We expect num_threads_ < 65536, so we can store them in a single
+  // std::atomic<unsigned>.
+  // Exposed publicly as static functions so that external callers can reuse
+  // this encode/decode logic for maintaining their own thread-safe copies of
+  // scheduling and steal domain(s).
+  static const int kMaxPartitionBits = 16;
+  static const int kMaxThreads = 1 << kMaxPartitionBits;
+
+  inline unsigned EncodePartition(unsigned start, unsigned limit) {
+    return (start << kMaxPartitionBits) | limit;
+  }
+
+  inline void DecodePartition(unsigned val, unsigned* start, unsigned* limit) {
+    *limit = val & (kMaxThreads - 1);
+    val >>= kMaxPartitionBits;
+    *start = val;
+  }
+
+  void AssertBounds(int start, int end) {
+    eigen_plain_assert(start >= 0);
+    eigen_plain_assert(start < end);  // non-zero sized partition
+    eigen_plain_assert(end <= num_threads_);
+  }
+
+  inline void SetStealPartition(size_t i, unsigned val) {
+    thread_data_[i].steal_partition.store(val, std::memory_order_relaxed);
+  }
+
+  inline unsigned GetStealPartition(int i) {
+    return thread_data_[i].steal_partition.load(std::memory_order_relaxed);
+  }
+
+  void ComputeCoprimes(int N, MaxSizeVector<unsigned>* coprimes) {
+    for (int i = 1; i <= N; i++) {
+      unsigned a = i;
+      unsigned b = N;
+      // If GCD(a, b) == 1, then a and b are coprimes.
+      while (b != 0) {
+        unsigned tmp = a;
+        a = b;
+        b = tmp % b;
+      }
+      if (a == 1) {
+        coprimes->push_back(i);
+      }
+    }
+  }
+
   typedef typename Environment::EnvThread Thread;
 
   struct PerThread {
-    constexpr PerThread() : pool(NULL), rand(0), thread_id(-1) { }
-    NonBlockingThreadPoolTempl* pool;  // Parent pool, or null for normal threads.
-    uint64_t rand;  // Random generator state.
-    int thread_id;  // Worker thread index in pool.
+    constexpr PerThread() : pool(NULL), rand(0), thread_id(-1) {}
+    ThreadPoolTempl* pool;  // Parent pool, or null for normal threads.
+    uint64_t rand;          // Random generator state.
+    int thread_id;          // Worker thread index in pool.
+#ifndef EIGEN_THREAD_LOCAL
+    // Prevent false sharing.
+    char pad_[128];
+#endif
+  };
+
+  struct ThreadData {
+    constexpr ThreadData() : thread(), steal_partition(0), queue() {}
+    std::unique_ptr<Thread> thread;
+    std::atomic<unsigned> steal_partition;
+    Queue queue;
   };
 
   Environment env_;
-  MaxSizeVector<Thread*> threads_;
-  MaxSizeVector<Queue*> queues_;
-  MaxSizeVector<unsigned> coprimes_;
+  const int num_threads_;
+  const bool allow_spinning_;
+  MaxSizeVector<ThreadData> thread_data_;
+  MaxSizeVector<MaxSizeVector<unsigned>> all_coprimes_;
   MaxSizeVector<EventCount::Waiter> waiters_;
+  unsigned global_steal_partition_;
   std::atomic<unsigned> blocked_;
   std::atomic<bool> spinning_;
   std::atomic<bool> done_;
+  std::atomic<bool> cancelled_;
   EventCount ec_;
+#ifndef EIGEN_THREAD_LOCAL
+  std::unique_ptr<Barrier> init_barrier_;
+  std::mutex per_thread_map_mutex_;  // Protects per_thread_map_.
+  std::unordered_map<uint64_t, std::unique_ptr<PerThread>> per_thread_map_;
+#endif
 
   // Main worker thread loop.
   void WorkerLoop(int thread_id) {
+#ifndef EIGEN_THREAD_LOCAL
+    std::unique_ptr<PerThread> new_pt(new PerThread());
+    per_thread_map_mutex_.lock();
+    bool insertOK = per_thread_map_.emplace(GlobalThreadIdHash(), std::move(new_pt)).second;
+    eigen_plain_assert(insertOK);
+    EIGEN_UNUSED_VARIABLE(insertOK);
+    per_thread_map_mutex_.unlock();
+    init_barrier_->Notify();
+    init_barrier_->Wait();
+#endif
     PerThread* pt = GetPerThread();
     pt->pool = this;
-    pt->rand = std::hash<std::thread::id>()(std::this_thread::get_id());
+    pt->rand = GlobalThreadIdHash();
    pt->thread_id = thread_id;
-    Queue* q = queues_[thread_id];
+    Queue& q = thread_data_[thread_id].queue;
     EventCount::Waiter* waiter = &waiters_[thread_id];
-    for (;;) {
-      Task t = q->PopFront();
-      if (!t.f) {
-        t = Steal();
+    // TODO(dvyukov,rmlarsen): The time spent in NonEmptyQueueIndex() is
+    // proportional to num_threads_ and we assume that new work is scheduled at
+    // a constant rate, so we set spin_count to 5000 / num_threads_. The
+    // constant was picked based on a fair dice roll, tune it.
+    const int spin_count =
+        allow_spinning_ && num_threads_ > 0 ? 5000 / num_threads_ : 0;
+    if (num_threads_ == 1) {
+      // For num_threads_ == 1 there is no point in going through the expensive
+      // steal loop. Moreover, since NonEmptyQueueIndex() calls PopBack() on the
+      // victim queues it might reverse the order in which ops are executed
+      // compared to the order in which they are scheduled, which tends to be
+      // counter-productive for the types of I/O workloads the single thread
+      // pools tend to be used for.
+      while (!cancelled_) {
+        Task t = q.PopFront();
+        for (int i = 0; i < spin_count && !t.f; i++) {
+          if (!cancelled_.load(std::memory_order_relaxed)) {
+            t = q.PopFront();
+          }
+        }
         if (!t.f) {
-          // Leave one thread spinning. This reduces latency.
-          // TODO(dvyukov): 1000 iterations is based on fair dice roll, tune it.
-          // Also, the time it takes to attempt to steal work 1000 times depends
-          // on the size of the thread pool. However the speed at which the user
-          // of the thread pool submits tasks is independent of the size of the
-          // pool. Consider a time based limit instead.
-          if (!spinning_ && !spinning_.exchange(true)) {
-            for (int i = 0; i < 1000 && !t.f; i++) {
-              t = Steal();
-            }
-            spinning_ = false;
+          if (!WaitForWork(waiter, &t)) {
+            return;
           }
+        }
+        if (t.f) {
+          env_.ExecuteTask(t);
+        }
+      }
+    } else {
+      while (!cancelled_) {
+        Task t = q.PopFront();
+        if (!t.f) {
+          t = LocalSteal();
           if (!t.f) {
-            if (!WaitForWork(waiter, &t)) {
-              return;
+            t = GlobalSteal();
+            if (!t.f) {
+              // Leave one thread spinning. This reduces latency.
+              if (allow_spinning_ && !spinning_ && !spinning_.exchange(true)) {
+                for (int i = 0; i < spin_count && !t.f; i++) {
+                  if (!cancelled_.load(std::memory_order_relaxed)) {
+                    t = GlobalSteal();
+                  } else {
+                    return;
+                  }
+                }
+                spinning_ = false;
+              }
+              if (!t.f) {
+                if (!WaitForWork(waiter, &t)) {
+                  return;
+                }
+              }
             }
           }
         }
-      }
-      if (t.f) {
-        env_.ExecuteTask(t);
+        if (t.f) {
+          env_.ExecuteTask(t);
+        }
       }
     }
   }
 
-  // Steal tries to steal work from other worker threads in best-effort manner.
-  Task Steal() {
+  // Steal tries to steal work from other worker threads in the range [start,
+  // limit) in best-effort manner.
+  Task Steal(unsigned start, unsigned limit) {
     PerThread* pt = GetPerThread();
-    const size_t size = queues_.size();
+    const size_t size = limit - start;
     unsigned r = Rand(&pt->rand);
-    unsigned inc = coprimes_[r % coprimes_.size()];
-    unsigned victim = r % size;
+    // Reduce r into [0, size) range, this utilizes trick from
+    // https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/
+    eigen_plain_assert(all_coprimes_[size - 1].size() < (1<<30));
+    unsigned victim = ((uint64_t)r * (uint64_t)size) >> 32;
+    unsigned index = ((uint64_t)all_coprimes_[size - 1].size() * (uint64_t)r) >> 32;
+    unsigned inc = all_coprimes_[size - 1][index];
+
     for (unsigned i = 0; i < size; i++) {
-      Task t = queues_[victim]->PopBack();
+      eigen_plain_assert(start + victim < limit);
+      Task t = thread_data_[start + victim].queue.PopBack();
       if (t.f) {
         return t;
       }
@@ -189,27 +358,52 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface {
     return Task();
   }
 
+  // Steals work within threads belonging to the partition.
+  Task LocalSteal() {
+    PerThread* pt = GetPerThread();
+    unsigned partition = GetStealPartition(pt->thread_id);
+    // If thread steal partition is the same as global partition, there is no
+    // need to go through the steal loop twice.
+    if (global_steal_partition_ == partition) return Task();
+    unsigned start, limit;
+    DecodePartition(partition, &start, &limit);
+    AssertBounds(start, limit);
+
+    return Steal(start, limit);
+  }
+
+  // Steals work from any other thread in the pool.
+  Task GlobalSteal() {
+    return Steal(0, num_threads_);
+  }
+
   // WaitForWork blocks until new work is available (returns true), or if it is
   // time to exit (returns false). Can optionally return a task to execute in t
   // (in such case t.f != nullptr on return).
   bool WaitForWork(EventCount::Waiter* waiter, Task* t) {
-    eigen_assert(!t->f);
+    eigen_plain_assert(!t->f);
     // We already did best-effort emptiness check in Steal, so prepare for
     // blocking.
-    ec_.Prewait(waiter);
+    ec_.Prewait();
     // Now do a reliable emptiness check.
     int victim = NonEmptyQueueIndex();
     if (victim != -1) {
-      ec_.CancelWait(waiter);
-      *t = queues_[victim]->PopBack();
-      return true;
+      ec_.CancelWait();
+      if (cancelled_) {
+        return false;
+      } else {
+        *t = thread_data_[victim].queue.PopBack();
+        return true;
+      }
     }
     // Number of blocked threads is used as termination condition.
     // If we are shutting down and all worker threads blocked without work,
     // that means we are done.
     blocked_++;
-    if (done_ && blocked_ == threads_.size()) {
-      ec_.CancelWait(waiter);
+    // TODO is blocked_ required to be unsigned?
+    if (done_ && blocked_ == static_cast<unsigned>(num_threads_)) {
+      ec_.CancelWait();
       // Almost done, but need to re-check queues.
       // Consider that all queues are empty and all worker threads are preempted
      // right after incrementing blocked_ above. Now a free-standing thread
@@ -236,12 +430,15 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface {
 
   int NonEmptyQueueIndex() {
     PerThread* pt = GetPerThread();
-    const size_t size = queues_.size();
+    // We intentionally design NonEmptyQueueIndex to steal work from
+    // anywhere in the queue so threads don't block in WaitForWork() forever
+    // when all threads in their partition go to sleep. Steal is still local.
+    const size_t size = thread_data_.size();
     unsigned r = Rand(&pt->rand);
-    unsigned inc = coprimes_[r % coprimes_.size()];
+    unsigned inc = all_coprimes_[size - 1][r % all_coprimes_[size - 1].size()];
     unsigned victim = r % size;
     for (unsigned i = 0; i < size; i++) {
-      if (!queues_[victim]->Empty()) {
+      if (!thread_data_[victim].queue.Empty()) {
         return victim;
       }
       victim += inc;
@@ -252,10 +449,24 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface {
     return -1;
   }
 
-  static EIGEN_STRONG_INLINE PerThread* GetPerThread() {
+  static EIGEN_STRONG_INLINE uint64_t GlobalThreadIdHash() {
+    return std::hash<std::thread::id>()(std::this_thread::get_id());
+  }
+
+  EIGEN_STRONG_INLINE PerThread* GetPerThread() {
+#ifndef EIGEN_THREAD_LOCAL
+    static PerThread dummy;
+    auto it = per_thread_map_.find(GlobalThreadIdHash());
+    if (it == per_thread_map_.end()) {
+      return &dummy;
+    } else {
+      return it->second.get();
+    }
+#else
     EIGEN_THREAD_LOCAL PerThread per_thread_;
     PerThread* pt = &per_thread_;
     return pt;
+#endif
   }
 
   static EIGEN_STRONG_INLINE unsigned Rand(uint64_t* state) {
@@ -263,11 +474,12 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface {
     // Update the internal state
     *state = current * 6364136223846793005ULL + 0xda3e39cb94b95bdbULL;
     // Generate the random output (using the PCG-XSH-RS scheme)
-    return static_cast<unsigned>((current ^ (current >> 22)) >> (22 + (current >> 61)));
+    return static_cast<unsigned>((current ^ (current >> 22)) >>
+                                 (22 + (current >> 61)));
   }
 };
 
-typedef NonBlockingThreadPoolTempl<StlThreadEnvironment> NonBlockingThreadPool;
+typedef ThreadPoolTempl<StlThreadEnvironment> ThreadPool;
 
 }  // namespace Eigen
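For context, the renamed class is what the `Eigen::ThreadPool` typedef at the end of the patch now aliases. A minimal usage sketch, assuming the unsupported ThreadPool module header is on the include path (the `Eigen::Barrier` helper comes from the same module; everything else is standard library):

    #include <unsupported/Eigen/CXX11/ThreadPool>
    #include <atomic>
    #include <cstdio>

    int main() {
      // Second constructor argument (allow_spinning) defaults to true.
      Eigen::ThreadPool pool(/*num_threads=*/4);
      std::atomic<int> sum(0);
      Eigen::Barrier barrier(8);
      for (int i = 1; i <= 8; i++) {
        pool.Schedule([i, &sum, &barrier]() {
          sum += i;          // each task contributes its index
          barrier.Notify();  // count down once per completed task
        });
      }
      barrier.Wait();  // block until all 8 tasks have run
      std::printf("sum = %d\n", sum.load());  // prints 36
    }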
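The new SetStealPartitions and ScheduleWithHint entry points let a caller confine groups of tasks to disjoint groups of workers. A hypothetical sketch splitting an 8-thread pool into two 4-thread steal domains (the API calls are the ones added in this patch; the header path is an assumption as above):

    #include <unsupported/Eigen/CXX11/ThreadPool>
    #include <utility>
    #include <vector>

    int main() {
      Eigen::ThreadPool pool(8);
      // Threads 0-3 steal only within [0, 4); threads 4-7 only within [4, 8).
      std::vector<std::pair<unsigned, unsigned>> partitions;
      for (unsigned i = 0; i < 8; i++) {
        partitions.push_back(i < 4 ? std::make_pair(0u, 4u)
                                   : std::make_pair(4u, 8u));
      }
      pool.SetStealPartitions(partitions);
      // Hint the scheduler to place each task on a queue in its domain.
      pool.ScheduleWithHint([] { /* work for domain 0 */ }, 0, 4);
      pool.ScheduleWithHint([] { /* work for domain 1 */ }, 4, 8);
    }

Note that, per the comment added in NonEmptyQueueIndex(), the blocking path still scans all queues, so a partitioned worker cannot sleep forever while work exists elsewhere; only the fast steal path is local.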
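On the coprime trick used by the constructor and ComputeCoprimes: stepping through indices with a stride coprime to the number of threads visits every index exactly once before repeating, which gives a cheap pseudo-random victim permutation. A standalone sketch that verifies this (names here are illustrative, not part of Eigen):

    #include <cassert>
    #include <cstdio>
    #include <vector>

    // Collect all values in [1, n] coprime to n (GCD == 1), mirroring
    // the ComputeCoprimes logic in the patch.
    static std::vector<unsigned> ComputeCoprimes(unsigned n) {
      std::vector<unsigned> coprimes;
      for (unsigned i = 1; i <= n; i++) {
        unsigned a = i, b = n;
        while (b != 0) {  // Euclid's algorithm.
          unsigned tmp = a;
          a = b;
          b = tmp % b;
        }
        if (a == 1) coprimes.push_back(i);
      }
      return coprimes;
    }

    int main() {
      const unsigned num_threads = 6;
      for (unsigned inc : ComputeCoprimes(num_threads)) {
        std::vector<bool> seen(num_threads, false);
        unsigned victim = 3;  // arbitrary starting thread index
        for (unsigned i = 0; i < num_threads; i++) {
          assert(!seen[victim]);  // no repeats before a full cycle
          seen[victim] = true;
          victim = (victim + inc) % num_threads;
        }
        std::printf("stride %u visits all %u queues\n", inc, num_threads);
      }
    }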
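The steal-partition encoding packs (start, limit) into one word so it fits a single std::atomic<unsigned> per thread. A self-contained sketch of the same 16-bit scheme used by EncodePartition/DecodePartition:

    #include <cassert>
    #include <cstdio>

    constexpr int kMaxPartitionBits = 16;
    constexpr unsigned kMaxThreads = 1u << kMaxPartitionBits;

    // Pack [start, limit): start in the high 16 bits, limit in the low 16.
    unsigned EncodePartition(unsigned start, unsigned limit) {
      assert(start < limit && limit < kMaxThreads);
      return (start << kMaxPartitionBits) | limit;
    }

    void DecodePartition(unsigned val, unsigned* start, unsigned* limit) {
      *limit = val & (kMaxThreads - 1);  // low 16 bits
      *start = val >> kMaxPartitionBits; // high 16 bits
    }

    int main() {
      unsigned val = EncodePartition(4, 12);
      unsigned start, limit;
      DecodePartition(val, &start, &limit);
      std::printf("0x%08x -> [%u, %u)\n", val, start, limit);
      assert(start == 4 && limit == 12);
    }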
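Steal() also replaces `r % size` with Lemire's multiply-shift reduction: for a 32-bit value r, `(uint64_t(r) * size) >> 32` lands in [0, size) and avoids an integer division on the hot path. A quick sketch of the reduction with a crude range check:

    #include <cassert>
    #include <cstdint>
    #include <cstdio>

    // Map a 32-bit value into [0, size) without a division. See
    // https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/
    uint32_t Reduce(uint32_t r, uint32_t size) {
      return (uint64_t)r * (uint64_t)size >> 32;
    }

    int main() {
      const uint32_t size = 7;
      uint32_t counts[7] = {0};
      // Sweep a sample of the 32-bit input space; results must stay in range
      // and spread roughly evenly across the buckets.
      for (uint64_t r = 0; r < (1ull << 32); r += 65537) {
        uint32_t v = Reduce((uint32_t)r, size);
        assert(v < size);
        counts[v]++;
      }
      for (uint32_t i = 0; i < size; i++)
        std::printf("bucket %u: %u\n", i, counts[i]);
    }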
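Finally, Rand() is a small PCG-style generator: a 64-bit LCG state update followed by an XSH-RS output permutation. Extracted as a standalone function with the same constants as the patch (the seed below is arbitrary):

    #include <cstdint>
    #include <cstdio>

    // PCG-XSH-RS: advance a 64-bit LCG, then xor-shift the old state and
    // apply a random shift (picked by its top 3 bits) to produce 32 bits.
    static unsigned Rand(uint64_t* state) {
      uint64_t current = *state;
      // LCG state update.
      *state = current * 6364136223846793005ULL + 0xda3e39cb94b95bdbULL;
      // Output permutation: (current >> 61) selects a shift in [22, 29].
      return (unsigned)((current ^ (current >> 22)) >> (22 + (current >> 61)));
    }

    int main() {
      uint64_t state = 0x853c49e6748fea9bULL;  // arbitrary seed
      for (int i = 0; i < 4; i++) std::printf("%u\n", Rand(&state));
    }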