path: root/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h
Diffstat (limited to 'unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h')
-rw-r--r--  unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h  398
1 file changed, 305 insertions, 93 deletions
diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h b/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h
index 354bce52a..23a2b5467 100644
--- a/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h
+++ b/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h
@@ -10,79 +10,116 @@
#ifndef EIGEN_CXX11_THREADPOOL_NONBLOCKING_THREAD_POOL_H
#define EIGEN_CXX11_THREADPOOL_NONBLOCKING_THREAD_POOL_H
-
namespace Eigen {
template <typename Environment>
-class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface {
+class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
public:
typedef typename Environment::Task Task;
typedef RunQueue<Task, 1024> Queue;
- NonBlockingThreadPoolTempl(int num_threads, Environment env = Environment())
+ ThreadPoolTempl(int num_threads, Environment env = Environment())
+ : ThreadPoolTempl(num_threads, true, env) {}
+
+ ThreadPoolTempl(int num_threads, bool allow_spinning,
+ Environment env = Environment())
: env_(env),
- threads_(num_threads),
- queues_(num_threads),
- coprimes_(num_threads),
+ num_threads_(num_threads),
+ allow_spinning_(allow_spinning),
+ thread_data_(num_threads),
+ all_coprimes_(num_threads),
waiters_(num_threads),
+ global_steal_partition_(EncodePartition(0, num_threads_)),
blocked_(0),
spinning_(0),
done_(false),
+ cancelled_(false),
ec_(waiters_) {
- waiters_.resize(num_threads);
-
- // Calculate coprimes of num_threads.
- // Coprimes are used for a random walk over all threads in Steal
+ waiters_.resize(num_threads_);
+ // Calculate coprimes of all numbers [1, num_threads].
+ // Coprimes are used for random walks over all threads in Steal
// and NonEmptyQueueIndex. Iteration is based on the fact that if we take
- // a walk starting thread index t and calculate num_threads - 1 subsequent
+ // a random starting thread index t and calculate num_threads - 1 subsequent
// indices as (t + coprime) % num_threads, we will cover all threads without
 // repetitions (effectively getting a pseudo-random permutation of thread
// indices).
- for (int i = 1; i <= num_threads; i++) {
- unsigned a = i;
- unsigned b = num_threads;
- // If GCD(a, b) == 1, then a and b are coprimes.
- while (b != 0) {
- unsigned tmp = a;
- a = b;
- b = tmp % b;
- }
- if (a == 1) {
- coprimes_.push_back(i);
- }
- }
- for (int i = 0; i < num_threads; i++) {
- queues_.push_back(new Queue());
+ eigen_plain_assert(num_threads_ < kMaxThreads);
+ for (int i = 1; i <= num_threads_; ++i) {
+ all_coprimes_.emplace_back(i);
+ ComputeCoprimes(i, &all_coprimes_.back());
}
- for (int i = 0; i < num_threads; i++) {
- threads_.push_back(env_.CreateThread([this, i]() { WorkerLoop(i); }));
+#ifndef EIGEN_THREAD_LOCAL
+ init_barrier_.reset(new Barrier(num_threads_));
+#endif
+ thread_data_.resize(num_threads_);
+ for (int i = 0; i < num_threads_; i++) {
+ SetStealPartition(i, EncodePartition(0, num_threads_));
+ thread_data_[i].thread.reset(
+ env_.CreateThread([this, i]() { WorkerLoop(i); }));
}
+#ifndef EIGEN_THREAD_LOCAL
+ // Wait for workers to initialize per_thread_map_. Otherwise we might race
+ // with them in Schedule or CurrentThreadId.
+ init_barrier_->Wait();
+#endif
}
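
For readers unfamiliar with the EIGEN_THREAD_LOCAL fallback above, here is a minimal standalone sketch of the same startup handshake: the constructing thread and the workers all wait on one barrier so that every worker has registered its per-thread entry before anyone may look it up. The Barrier class below is a stand-in built on standard C++ primitives, not Eigen's Barrier, and the map keyed by std::thread::id is only illustrative.

    #include <condition_variable>
    #include <cstdio>
    #include <mutex>
    #include <thread>
    #include <unordered_map>
    #include <vector>

    // Stand-in barrier: Notify() counts down once per worker, Wait() blocks
    // until the count reaches zero.
    class Barrier {
     public:
      explicit Barrier(int count) : count_(count) {}
      void Notify() {
        std::unique_lock<std::mutex> l(mu_);
        if (--count_ == 0) cv_.notify_all();
      }
      void Wait() {
        std::unique_lock<std::mutex> l(mu_);
        cv_.wait(l, [this] { return count_ == 0; });
      }
     private:
      std::mutex mu_;
      std::condition_variable cv_;
      int count_;
    };

    int main() {
      const int kThreads = 4;
      Barrier init_barrier(kThreads);
      std::mutex map_mu;
      std::unordered_map<std::thread::id, int> per_thread;  // id -> worker index
      std::vector<std::thread> workers;
      for (int i = 0; i < kThreads; i++) {
        workers.emplace_back([&, i] {
          {  // Register per-thread state before anyone may look it up.
            std::lock_guard<std::mutex> l(map_mu);
            per_thread[std::this_thread::get_id()] = i;
          }
          init_barrier.Notify();
          init_barrier.Wait();  // do not start work until all workers registered
        });
      }
      init_barrier.Wait();  // constructor-side wait: the map is now fully populated
      std::printf("registered %zu workers\n", per_thread.size());
      for (auto& t : workers) t.join();
    }
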
- ~NonBlockingThreadPoolTempl() {
+ ~ThreadPoolTempl() {
done_ = true;
+
// Now if all threads block without work, they will start exiting.
 // But note that threads can continue to work arbitrarily long,
 // block, submit new work, unblock and otherwise live a full life.
- ec_.Notify(true);
+ if (!cancelled_) {
+ ec_.Notify(true);
+ } else {
+ // Since we were cancelled, there might be entries in the queues.
+ // Empty them to prevent their destructor from asserting.
+ for (size_t i = 0; i < thread_data_.size(); i++) {
+ thread_data_[i].queue.Flush();
+ }
+ }
+ // Join threads explicitly (by destroying) to avoid destruction order within
+ // this class.
+ for (size_t i = 0; i < thread_data_.size(); ++i)
+ thread_data_[i].thread.reset();
+ }
+
+ void SetStealPartitions(const std::vector<std::pair<unsigned, unsigned>>& partitions) {
+ eigen_plain_assert(partitions.size() == static_cast<std::size_t>(num_threads_));
- // Join threads explicitly to avoid destruction order issues.
- for (size_t i = 0; i < threads_.size(); i++) delete threads_[i];
- for (size_t i = 0; i < threads_.size(); i++) delete queues_[i];
+ // Pass this information to each thread queue.
+ for (int i = 0; i < num_threads_; i++) {
+ const auto& pair = partitions[i];
+ unsigned start = pair.first, end = pair.second;
+ AssertBounds(start, end);
+ unsigned val = EncodePartition(start, end);
+ SetStealPartition(i, val);
+ }
+ }
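
As a usage illustration of the new API (a sketch only; it assumes the in-tree header path unsupported/Eigen/CXX11/ThreadPool and the Eigen::ThreadPool typedef introduced at the end of this patch): an eight-thread pool split into two steal domains, [0, 4) and [4, 8), with hinted scheduling routed into the matching half.

    #include <unsupported/Eigen/CXX11/ThreadPool>
    #include <utility>
    #include <vector>

    int main() {
      Eigen::ThreadPool pool(8);
      // Two non-overlapping steal domains: threads 0-3 and threads 4-7.
      std::vector<std::pair<unsigned, unsigned>> partitions = {
          {0, 4}, {0, 4}, {0, 4}, {0, 4}, {4, 8}, {4, 8}, {4, 8}, {4, 8}};
      pool.SetStealPartitions(partitions);
      // Hints route the initial push into the matching half of the pool.
      pool.ScheduleWithHint([] { /* work biased toward threads 0-3 */ }, 0, 4);
      pool.ScheduleWithHint([] { /* work biased toward threads 4-7 */ }, 4, 8);
      // ~ThreadPool() drains the remaining work and joins the workers.
      return 0;
    }
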
+
+ void Schedule(std::function<void()> fn) EIGEN_OVERRIDE {
+ ScheduleWithHint(std::move(fn), 0, num_threads_);
}
- void Schedule(std::function<void()> fn) {
+ void ScheduleWithHint(std::function<void()> fn, int start,
+ int limit) override {
Task t = env_.CreateTask(std::move(fn));
PerThread* pt = GetPerThread();
if (pt->pool == this) {
// Worker thread of this pool, push onto the thread's queue.
- Queue* q = queues_[pt->thread_id];
- t = q->PushFront(std::move(t));
+ Queue& q = thread_data_[pt->thread_id].queue;
+ t = q.PushFront(std::move(t));
} else {
// A free-standing thread (or worker of another pool), push onto a random
// queue.
- Queue* q = queues_[Rand(&pt->rand) % queues_.size()];
- t = q->PushBack(std::move(t));
+ eigen_plain_assert(start < limit);
+ eigen_plain_assert(limit <= num_threads_);
+ int num_queues = limit - start;
+ int rnd = Rand(&pt->rand) % num_queues;
+ eigen_plain_assert(start + rnd < limit);
+ Queue& q = thread_data_[start + rnd].queue;
+ t = q.PushBack(std::move(t));
}
// Note: below we touch this after making w available to worker threads.
// Strictly speaking, this can lead to a racy-use-after-free. Consider that
@@ -91,19 +128,32 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface {
// completes overall computations, which in turn leads to destruction of
// this. We expect that such scenario is prevented by program, that is,
// this is kept alive while any threads can potentially be in Schedule.
- if (!t.f)
+ if (!t.f) {
ec_.Notify(false);
- else
+ } else {
env_.ExecuteTask(t); // Push failed, execute directly.
+ }
}
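
And a minimal schedule-and-wait sketch for the plain Schedule() path; the waiting uses standard C++ synchronization rather than anything from Eigen, and the header path is again an assumption about how the tree is included.

    #include <unsupported/Eigen/CXX11/ThreadPool>
    #include <atomic>
    #include <condition_variable>
    #include <cstdio>
    #include <mutex>

    int main() {
      Eigen::ThreadPool pool(4);
      const int kTasks = 100;
      std::atomic<int> done(0);
      std::mutex mu;
      std::condition_variable cv;
      for (int i = 0; i < kTasks; i++) {
        pool.Schedule([&] {
          // The last task to finish wakes up the waiter in main().
          if (done.fetch_add(1) + 1 == kTasks) {
            std::lock_guard<std::mutex> l(mu);
            cv.notify_one();
          }
        });
      }
      std::unique_lock<std::mutex> l(mu);
      cv.wait(l, [&] { return done.load() == kTasks; });
      std::printf("all %d tasks finished\n", kTasks);
    }
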
- int NumThreads() const final {
- return static_cast<int>(threads_.size());
+ void Cancel() EIGEN_OVERRIDE {
+ cancelled_ = true;
+ done_ = true;
+
+ // Let each thread know it's been cancelled.
+#ifdef EIGEN_THREAD_ENV_SUPPORTS_CANCELLATION
+ for (size_t i = 0; i < thread_data_.size(); i++) {
+ thread_data_[i].thread->OnCancel();
+ }
+#endif
+
+ // Wake up the threads without work to let them exit on their own.
+ ec_.Notify(true);
}
- int CurrentThreadId() const final {
- const PerThread* pt =
- const_cast<NonBlockingThreadPoolTempl*>(this)->GetPerThread();
+ int NumThreads() const EIGEN_FINAL { return num_threads_; }
+
+ int CurrentThreadId() const EIGEN_FINAL {
+ const PerThread* pt = const_cast<ThreadPoolTempl*>(this)->GetPerThread();
if (pt->pool == this) {
return pt->thread_id;
} else {
@@ -112,72 +162,191 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface {
}
private:
+  // Create a single atomic<unsigned> that encodes start and limit information for
+ // each thread.
+ // We expect num_threads_ < 65536, so we can store them in a single
+ // std::atomic<unsigned>.
+  // The same encode/decode logic can be reused by external callers that
+  // maintain their own thread-safe copies of scheduling and steal domain(s).
+ static const int kMaxPartitionBits = 16;
+ static const int kMaxThreads = 1 << kMaxPartitionBits;
+
+ inline unsigned EncodePartition(unsigned start, unsigned limit) {
+ return (start << kMaxPartitionBits) | limit;
+ }
+
+ inline void DecodePartition(unsigned val, unsigned* start, unsigned* limit) {
+ *limit = val & (kMaxThreads - 1);
+ val >>= kMaxPartitionBits;
+ *start = val;
+ }
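
The 16-bit packing is easy to sanity-check in isolation. A standalone round-trip sketch using the same constants (no Eigen dependency):

    #include <cassert>
    #include <cstdio>

    static const int kMaxPartitionBits = 16;
    static const int kMaxThreads = 1 << kMaxPartitionBits;

    // Pack [start, limit) into one unsigned: start in the high 16 bits,
    // limit in the low 16 bits. Requires start, limit < kMaxThreads.
    unsigned EncodePartition(unsigned start, unsigned limit) {
      return (start << kMaxPartitionBits) | limit;
    }

    void DecodePartition(unsigned val, unsigned* start, unsigned* limit) {
      *limit = val & (kMaxThreads - 1);
      *start = val >> kMaxPartitionBits;
    }

    int main() {
      unsigned start, limit;
      DecodePartition(EncodePartition(3, 12), &start, &limit);
      assert(start == 3 && limit == 12);
      std::printf("partition [%u, %u)\n", start, limit);
    }
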
+
+ void AssertBounds(int start, int end) {
+ eigen_plain_assert(start >= 0);
+ eigen_plain_assert(start < end); // non-zero sized partition
+ eigen_plain_assert(end <= num_threads_);
+ }
+
+ inline void SetStealPartition(size_t i, unsigned val) {
+ thread_data_[i].steal_partition.store(val, std::memory_order_relaxed);
+ }
+
+ inline unsigned GetStealPartition(int i) {
+ return thread_data_[i].steal_partition.load(std::memory_order_relaxed);
+ }
+
+ void ComputeCoprimes(int N, MaxSizeVector<unsigned>* coprimes) {
+ for (int i = 1; i <= N; i++) {
+ unsigned a = i;
+ unsigned b = N;
+ // If GCD(a, b) == 1, then a and b are coprimes.
+ while (b != 0) {
+ unsigned tmp = a;
+ a = b;
+ b = tmp % b;
+ }
+ if (a == 1) {
+ coprimes->push_back(i);
+ }
+ }
+ }
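
To make the constructor comment concrete: for N threads and any increment coprime to N, the walk (t + k*inc) % N visits every index exactly once. A small standalone demonstration (hypothetical N and starting index, Euclid's GCD as in ComputeCoprimes above):

    #include <cstdio>
    #include <vector>

    // GCD by Euclid's algorithm.
    unsigned Gcd(unsigned a, unsigned b) {
      while (b != 0) {
        unsigned tmp = a % b;
        a = b;
        b = tmp;
      }
      return a;
    }

    int main() {
      const unsigned N = 6;      // pretend pool size
      const unsigned start = 2;  // arbitrary starting thread index
      for (unsigned inc = 1; inc <= N; inc++) {
        if (Gcd(inc, N) != 1) continue;  // only coprime increments cover all indices
        std::vector<bool> visited(N, false);
        unsigned t = start;
        std::printf("inc=%u:", inc);
        for (unsigned k = 0; k < N; k++) {
          std::printf(" %u", t);
          visited[t] = true;
          t = (t + inc) % N;
        }
        for (unsigned i = 0; i < N; i++) {
          if (!visited[i]) std::printf("  (missed %u!)", i);
        }
        std::printf("\n");
      }
    }
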
+
typedef typename Environment::EnvThread Thread;
struct PerThread {
- constexpr PerThread() : pool(NULL), rand(0), thread_id(-1) { }
- NonBlockingThreadPoolTempl* pool; // Parent pool, or null for normal threads.
- uint64_t rand; // Random generator state.
- int thread_id; // Worker thread index in pool.
+ constexpr PerThread() : pool(NULL), rand(0), thread_id(-1) {}
+ ThreadPoolTempl* pool; // Parent pool, or null for normal threads.
+ uint64_t rand; // Random generator state.
+ int thread_id; // Worker thread index in pool.
+#ifndef EIGEN_THREAD_LOCAL
+ // Prevent false sharing.
+ char pad_[128];
+#endif
+ };
+
+ struct ThreadData {
+ constexpr ThreadData() : thread(), steal_partition(0), queue() {}
+ std::unique_ptr<Thread> thread;
+ std::atomic<unsigned> steal_partition;
+ Queue queue;
};
Environment env_;
- MaxSizeVector<Thread*> threads_;
- MaxSizeVector<Queue*> queues_;
- MaxSizeVector<unsigned> coprimes_;
+ const int num_threads_;
+ const bool allow_spinning_;
+ MaxSizeVector<ThreadData> thread_data_;
+ MaxSizeVector<MaxSizeVector<unsigned>> all_coprimes_;
MaxSizeVector<EventCount::Waiter> waiters_;
+ unsigned global_steal_partition_;
std::atomic<unsigned> blocked_;
std::atomic<bool> spinning_;
std::atomic<bool> done_;
+ std::atomic<bool> cancelled_;
EventCount ec_;
+#ifndef EIGEN_THREAD_LOCAL
+ std::unique_ptr<Barrier> init_barrier_;
+ std::mutex per_thread_map_mutex_; // Protects per_thread_map_.
+ std::unordered_map<uint64_t, std::unique_ptr<PerThread>> per_thread_map_;
+#endif
// Main worker thread loop.
void WorkerLoop(int thread_id) {
+#ifndef EIGEN_THREAD_LOCAL
+ std::unique_ptr<PerThread> new_pt(new PerThread());
+ per_thread_map_mutex_.lock();
+ bool insertOK = per_thread_map_.emplace(GlobalThreadIdHash(), std::move(new_pt)).second;
+ eigen_plain_assert(insertOK);
+ EIGEN_UNUSED_VARIABLE(insertOK);
+ per_thread_map_mutex_.unlock();
+ init_barrier_->Notify();
+ init_barrier_->Wait();
+#endif
PerThread* pt = GetPerThread();
pt->pool = this;
- pt->rand = std::hash<std::thread::id>()(std::this_thread::get_id());
+ pt->rand = GlobalThreadIdHash();
pt->thread_id = thread_id;
- Queue* q = queues_[thread_id];
+ Queue& q = thread_data_[thread_id].queue;
EventCount::Waiter* waiter = &waiters_[thread_id];
- for (;;) {
- Task t = q->PopFront();
- if (!t.f) {
- t = Steal();
+ // TODO(dvyukov,rmlarsen): The time spent in NonEmptyQueueIndex() is
+ // proportional to num_threads_ and we assume that new work is scheduled at
+ // a constant rate, so we set spin_count to 5000 / num_threads_. The
+ // constant was picked based on a fair dice roll, tune it.
+ const int spin_count =
+ allow_spinning_ && num_threads_ > 0 ? 5000 / num_threads_ : 0;
+ if (num_threads_ == 1) {
+ // For num_threads_ == 1 there is no point in going through the expensive
+ // steal loop. Moreover, since NonEmptyQueueIndex() calls PopBack() on the
+ // victim queues it might reverse the order in which ops are executed
+ // compared to the order in which they are scheduled, which tends to be
+ // counter-productive for the types of I/O workloads the single thread
+ // pools tend to be used for.
+ while (!cancelled_) {
+ Task t = q.PopFront();
+ for (int i = 0; i < spin_count && !t.f; i++) {
+ if (!cancelled_.load(std::memory_order_relaxed)) {
+ t = q.PopFront();
+ }
+ }
if (!t.f) {
- // Leave one thread spinning. This reduces latency.
- // TODO(dvyukov): 1000 iterations is based on fair dice roll, tune it.
- // Also, the time it takes to attempt to steal work 1000 times depends
- // on the size of the thread pool. However the speed at which the user
- // of the thread pool submit tasks is independent of the size of the
- // pool. Consider a time based limit instead.
- if (!spinning_ && !spinning_.exchange(true)) {
- for (int i = 0; i < 1000 && !t.f; i++) {
- t = Steal();
- }
- spinning_ = false;
+ if (!WaitForWork(waiter, &t)) {
+ return;
}
+ }
+ if (t.f) {
+ env_.ExecuteTask(t);
+ }
+ }
+ } else {
+ while (!cancelled_) {
+ Task t = q.PopFront();
+ if (!t.f) {
+ t = LocalSteal();
if (!t.f) {
- if (!WaitForWork(waiter, &t)) {
- return;
+ t = GlobalSteal();
+ if (!t.f) {
+ // Leave one thread spinning. This reduces latency.
+ if (allow_spinning_ && !spinning_ && !spinning_.exchange(true)) {
+ for (int i = 0; i < spin_count && !t.f; i++) {
+ if (!cancelled_.load(std::memory_order_relaxed)) {
+ t = GlobalSteal();
+ } else {
+ return;
+ }
+ }
+ spinning_ = false;
+ }
+ if (!t.f) {
+ if (!WaitForWork(waiter, &t)) {
+ return;
+ }
+ }
}
}
}
- }
- if (t.f) {
- env_.ExecuteTask(t);
+ if (t.f) {
+ env_.ExecuteTask(t);
+ }
}
}
}
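
The `!spinning_ && !spinning_.exchange(true)` guard above is a small test-and-test-and-set idiom that elects at most one spinner at a time. A standalone sketch of just that guard (the globals and counter here are illustrative, not part of the pool):

    #include <atomic>
    #include <cstdio>
    #include <thread>
    #include <vector>

    // Mirrors the "leave one thread spinning" guard: the cheap pre-check skips
    // the atomic exchange when someone already spins, and the exchange ensures
    // at most one thread holds the spinner slot at any moment.
    std::atomic<bool> spinning(false);

    void MaybeSpin(int id, std::atomic<int>* spinners) {
      if (!spinning && !spinning.exchange(true)) {
        spinners->fetch_add(1);  // this thread is currently the sole spinner
        std::printf("thread %d took the spinner slot\n", id);
        // ... spin looking for work here ...
        spinning = false;        // give up the spinner slot
      }
    }

    int main() {
      std::atomic<int> spinners(0);
      std::vector<std::thread> threads;
      for (int i = 0; i < 8; i++)
        threads.emplace_back([i, &spinners] { MaybeSpin(i, &spinners); });
      for (auto& t : threads) t.join();
      std::printf("%d thread(s) took the spin path (at most one concurrently)\n",
                  spinners.load());
    }
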
- // Steal tries to steal work from other worker threads in best-effort manner.
- Task Steal() {
+ // Steal tries to steal work from other worker threads in the range [start,
+ // limit) in best-effort manner.
+ Task Steal(unsigned start, unsigned limit) {
PerThread* pt = GetPerThread();
- const size_t size = queues_.size();
+ const size_t size = limit - start;
unsigned r = Rand(&pt->rand);
- unsigned inc = coprimes_[r % coprimes_.size()];
- unsigned victim = r % size;
+ // Reduce r into [0, size) range, this utilizes trick from
+ // https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/
+ eigen_plain_assert(all_coprimes_[size - 1].size() < (1<<30));
+ unsigned victim = ((uint64_t)r * (uint64_t)size) >> 32;
+ unsigned index = ((uint64_t) all_coprimes_[size - 1].size() * (uint64_t)r) >> 32;
+ unsigned inc = all_coprimes_[size - 1][index];
+
for (unsigned i = 0; i < size; i++) {
- Task t = queues_[victim]->PopBack();
+ eigen_plain_assert(start + victim < limit);
+ Task t = thread_data_[start + victim].queue.PopBack();
if (t.f) {
return t;
}
@@ -189,27 +358,52 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface {
return Task();
}
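
The victim selection above uses Lemire's multiply-shift reduction in place of a modulo. A standalone check of the mapping (sample values chosen arbitrarily):

    #include <cstdint>
    #include <cstdio>

    // Map a 32-bit random value r into [0, size) without a division:
    // (r * size) >> 32 is a near-uniform reduction when r is uniform over
    // [0, 2^32). See the Lemire blog post cited above.
    unsigned Reduce(uint32_t r, uint32_t size) {
      return (unsigned)(((uint64_t)r * (uint64_t)size) >> 32);
    }

    int main() {
      const uint32_t size = 12;
      uint32_t samples[] = {0u, 1u, 0x40000000u, 0x80000000u, 0xFFFFFFFFu};
      for (uint32_t r : samples) {
        unsigned v = Reduce(r, size);
        std::printf("r=%10u -> %2u (always < %u)\n", (unsigned)r, v, (unsigned)size);
      }
    }
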
+ // Steals work within threads belonging to the partition.
+ Task LocalSteal() {
+ PerThread* pt = GetPerThread();
+ unsigned partition = GetStealPartition(pt->thread_id);
+ // If thread steal partition is the same as global partition, there is no
+ // need to go through the steal loop twice.
+ if (global_steal_partition_ == partition) return Task();
+ unsigned start, limit;
+ DecodePartition(partition, &start, &limit);
+ AssertBounds(start, limit);
+
+ return Steal(start, limit);
+ }
+
+ // Steals work from any other thread in the pool.
+ Task GlobalSteal() {
+ return Steal(0, num_threads_);
+ }
+
+
// WaitForWork blocks until new work is available (returns true), or if it is
// time to exit (returns false). Can optionally return a task to execute in t
// (in such case t.f != nullptr on return).
bool WaitForWork(EventCount::Waiter* waiter, Task* t) {
- eigen_assert(!t->f);
+ eigen_plain_assert(!t->f);
// We already did best-effort emptiness check in Steal, so prepare for
// blocking.
- ec_.Prewait(waiter);
+ ec_.Prewait();
// Now do a reliable emptiness check.
int victim = NonEmptyQueueIndex();
if (victim != -1) {
- ec_.CancelWait(waiter);
- *t = queues_[victim]->PopBack();
- return true;
+ ec_.CancelWait();
+ if (cancelled_) {
+ return false;
+ } else {
+ *t = thread_data_[victim].queue.PopBack();
+ return true;
+ }
}
// Number of blocked threads is used as termination condition.
// If we are shutting down and all worker threads blocked without work,
 // that means we are done.
blocked_++;
- if (done_ && blocked_ == threads_.size()) {
- ec_.CancelWait(waiter);
+    // TODO: is blocked_ required to be unsigned?
+ if (done_ && blocked_ == static_cast<unsigned>(num_threads_)) {
+ ec_.CancelWait();
// Almost done, but need to re-check queues.
// Consider that all queues are empty and all worker threads are preempted
// right after incrementing blocked_ above. Now a free-standing thread
@@ -236,12 +430,15 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface {
int NonEmptyQueueIndex() {
PerThread* pt = GetPerThread();
- const size_t size = queues_.size();
+    // We intentionally design NonEmptyQueueIndex to look for work in any
+    // queue in the pool, so threads don't block in WaitForWork() forever
+    // when all threads in their partition go to sleep. Steal is still local.
+ const size_t size = thread_data_.size();
unsigned r = Rand(&pt->rand);
- unsigned inc = coprimes_[r % coprimes_.size()];
+ unsigned inc = all_coprimes_[size - 1][r % all_coprimes_[size - 1].size()];
unsigned victim = r % size;
for (unsigned i = 0; i < size; i++) {
- if (!queues_[victim]->Empty()) {
+ if (!thread_data_[victim].queue.Empty()) {
return victim;
}
victim += inc;
@@ -252,10 +449,24 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface {
return -1;
}
- static EIGEN_STRONG_INLINE PerThread* GetPerThread() {
+ static EIGEN_STRONG_INLINE uint64_t GlobalThreadIdHash() {
+ return std::hash<std::thread::id>()(std::this_thread::get_id());
+ }
+
+ EIGEN_STRONG_INLINE PerThread* GetPerThread() {
+#ifndef EIGEN_THREAD_LOCAL
+ static PerThread dummy;
+ auto it = per_thread_map_.find(GlobalThreadIdHash());
+ if (it == per_thread_map_.end()) {
+ return &dummy;
+ } else {
+ return it->second.get();
+ }
+#else
EIGEN_THREAD_LOCAL PerThread per_thread_;
PerThread* pt = &per_thread_;
return pt;
+#endif
}
static EIGEN_STRONG_INLINE unsigned Rand(uint64_t* state) {
@@ -263,11 +474,12 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface {
// Update the internal state
*state = current * 6364136223846793005ULL + 0xda3e39cb94b95bdbULL;
// Generate the random output (using the PCG-XSH-RS scheme)
- return static_cast<unsigned>((current ^ (current >> 22)) >> (22 + (current >> 61)));
+ return static_cast<unsigned>((current ^ (current >> 22)) >>
+ (22 + (current >> 61)));
}
};
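
For experimentation, the same PCG-XSH-RS step as Rand() above, lifted out of the class and seeded the way WorkerLoop seeds it (via a hash of the thread id); this is a convenience sketch, not part of the patch:

    #include <cstdint>
    #include <cstdio>
    #include <functional>
    #include <thread>

    // PCG-XSH-RS output function over a 64-bit LCG state, matching Rand() above.
    unsigned Rand(uint64_t* state) {
      uint64_t current = *state;
      *state = current * 6364136223846793005ULL + 0xda3e39cb94b95bdbULL;
      return (unsigned)((current ^ (current >> 22)) >> (22 + (current >> 61)));
    }

    int main() {
      // Seed from the current thread id, as WorkerLoop does via GlobalThreadIdHash.
      uint64_t state = std::hash<std::thread::id>()(std::this_thread::get_id());
      for (int i = 0; i < 4; i++) std::printf("%u\n", Rand(&state));
    }
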
-typedef NonBlockingThreadPoolTempl<StlThreadEnvironment> NonBlockingThreadPool;
+typedef ThreadPoolTempl<StlThreadEnvironment> ThreadPool;
} // namespace Eigen