Diffstat (limited to 'unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h')
-rw-r--r--  unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h | 100
1 file changed, 63 insertions(+), 37 deletions(-)
diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h b/unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h
index 05ed76cbe..b572ebcdf 100644
--- a/unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h
+++ b/unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h
@@ -10,7 +10,6 @@
 #ifndef EIGEN_CXX11_THREADPOOL_RUNQUEUE_H_
 #define EIGEN_CXX11_THREADPOOL_RUNQUEUE_H_
 
-
 namespace Eigen {
 
 // RunQueue is a fixed-size, partially non-blocking deque or Work items.
@@ -40,14 +39,14 @@ class RunQueue {
  public:
   RunQueue() : front_(0), back_(0) {
     // require power-of-two for fast masking
-    eigen_assert((kSize & (kSize - 1)) == 0);
-    eigen_assert(kSize > 2);            // why would you do this?
-    eigen_assert(kSize <= (64 << 10));  // leave enough space for counter
+    eigen_plain_assert((kSize & (kSize - 1)) == 0);
+    eigen_plain_assert(kSize > 2);            // why would you do this?
+    eigen_plain_assert(kSize <= (64 << 10));  // leave enough space for counter
     for (unsigned i = 0; i < kSize; i++)
       array_[i].state.store(kEmpty, std::memory_order_relaxed);
   }
 
-  ~RunQueue() { eigen_assert(Size() == 0); }
+  ~RunQueue() { eigen_plain_assert(Size() == 0); }
 
   // PushFront inserts w at the beginning of the queue.
   // If queue is full returns w, otherwise returns default-constructed Work.
@@ -98,11 +97,9 @@ class RunQueue {
   }
 
   // PopBack removes and returns the last elements in the queue.
-  // Can fail spuriously.
   Work PopBack() {
     if (Empty()) return Work();
-    std::unique_lock<std::mutex> lock(mutex_, std::try_to_lock);
-    if (!lock) return Work();
+    std::unique_lock<std::mutex> lock(mutex_);
     unsigned back = back_.load(std::memory_order_relaxed);
     Elem* e = &array_[back & kMask];
     uint8_t s = e->state.load(std::memory_order_relaxed);
@@ -116,11 +113,10 @@ class RunQueue {
   }
 
   // PopBackHalf removes and returns half last elements in the queue.
-  // Returns number of elements removed. But can also fail spuriously.
+  // Returns number of elements removed.
   unsigned PopBackHalf(std::vector<Work>* result) {
     if (Empty()) return 0;
-    std::unique_lock<std::mutex> lock(mutex_, std::try_to_lock);
-    if (!lock) return 0;
+    std::unique_lock<std::mutex> lock(mutex_);
     unsigned back = back_.load(std::memory_order_relaxed);
     unsigned size = Size();
     unsigned mid = back;
@@ -131,15 +127,14 @@ class RunQueue {
       Elem* e = &array_[mid & kMask];
       uint8_t s = e->state.load(std::memory_order_relaxed);
       if (n == 0) {
-        if (s != kReady ||
-            !e->state.compare_exchange_strong(s, kBusy,
-                                              std::memory_order_acquire))
+        if (s != kReady || !e->state.compare_exchange_strong(
+                               s, kBusy, std::memory_order_acquire))
           continue;
         start = mid;
       } else {
         // Note: no need to store temporal kBusy, we exclusively own these
         // elements.
-        eigen_assert(s == kReady);
+        eigen_plain_assert(s == kReady);
       }
       result->push_back(std::move(e->w));
       e->state.store(kEmpty, std::memory_order_release);
@@ -152,30 +147,18 @@ class RunQueue {
 
   // Size returns current queue size.
   // Can be called by any thread at any time.
-  unsigned Size() const {
-    // Emptiness plays critical role in thread pool blocking. So we go to great
-    // effort to not produce false positives (claim non-empty queue as empty).
-    for (;;) {
-      // Capture a consistent snapshot of front/tail.
-      unsigned front = front_.load(std::memory_order_acquire);
-      unsigned back = back_.load(std::memory_order_acquire);
-      unsigned front1 = front_.load(std::memory_order_relaxed);
-      if (front != front1) continue;
-      int size = (front & kMask2) - (back & kMask2);
-      // Fix overflow.
-      if (size < 0) size += 2 * kSize;
-      // Order of modification in push/pop is crafted to make the queue look
-      // larger than it is during concurrent modifications. E.g. pop can
-      // decrement size before the corresponding push has incremented it.
-      // So the computed size can be up to kSize + 1, fix it.
-      if (size > static_cast<int>(kSize)) size = kSize;
-      return size;
-    }
-  }
+  unsigned Size() const { return SizeOrNotEmpty<true>(); }
 
   // Empty tests whether container is empty.
   // Can be called by any thread at any time.
-  bool Empty() const { return Size() == 0; }
+  bool Empty() const { return SizeOrNotEmpty<false>() == 0; }
+
+  // Delete all the elements from the queue.
+  void Flush() {
+    while (!Empty()) {
+      PopFront();
+    }
+  }
 
  private:
   static const unsigned kMask = kSize - 1;
@@ -191,7 +174,7 @@ class RunQueue {
   };
   std::mutex mutex_;
   // Low log(kSize) + 1 bits in front_ and back_ contain rolling index of
-  // front/back, repsectively. The remaining bits contain modification counters
+  // front/back, respectively. The remaining bits contain modification counters
   // that are incremented on Push operations. This allows us to (1) distinguish
   // between empty and full conditions (if we would use log(kSize) bits for
   // position, these conditions would be indistinguishable); (2) obtain
@@ -201,6 +184,49 @@ class RunQueue {
   std::atomic<unsigned> back_;
   Elem array_[kSize];
 
+  // SizeOrNotEmpty returns current queue size; if NeedSizeEstimate is false,
+  // only whether the size is 0 is guaranteed to be correct.
+  // Can be called by any thread at any time.
+  template<bool NeedSizeEstimate>
+  unsigned SizeOrNotEmpty() const {
+    // Emptiness plays critical role in thread pool blocking. So we go to great
+    // effort to not produce false positives (claim non-empty queue as empty).
+    unsigned front = front_.load(std::memory_order_acquire);
+    for (;;) {
+      // Capture a consistent snapshot of front/tail.
+      unsigned back = back_.load(std::memory_order_acquire);
+      unsigned front1 = front_.load(std::memory_order_relaxed);
+      if (front != front1) {
+        front = front1;
+        std::atomic_thread_fence(std::memory_order_acquire);
+        continue;
+      }
+      if (NeedSizeEstimate) {
+        return CalculateSize(front, back);
+      } else {
+        // This value will be 0 if the queue is empty, and undefined otherwise.
+        unsigned maybe_zero = ((front ^ back) & kMask2);
+        // Queue size estimate must agree with maybe zero check on the queue
+        // empty/non-empty state.
+        eigen_assert((CalculateSize(front, back) == 0) == (maybe_zero == 0));
+        return maybe_zero;
+      }
+    }
+  }
+
+  EIGEN_ALWAYS_INLINE
+  unsigned CalculateSize(unsigned front, unsigned back) const {
+    int size = (front & kMask2) - (back & kMask2);
+    // Fix overflow.
+    if (size < 0) size += 2 * kSize;
+    // Order of modification in push/pop is crafted to make the queue look
+    // larger than it is during concurrent modifications. E.g. push can
+    // increment size before the corresponding pop has decremented it.
+    // So the computed size can be up to kSize + 1, fix it.
+    if (size > static_cast<int>(kSize)) size = kSize;
+    return static_cast<unsigned>(size);
+  }
+
   RunQueue(const RunQueue&) = delete;
   void operator=(const RunQueue&) = delete;
 };
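A note on the arithmetic this patch centralizes in CalculateSize: front_ and back_ pack a queue position in the low log2(kSize)+1 bits and a modification counter in the bits above, so the size is recovered by masking with kMask2, fixing wrap-around, and clamping the transient overshoot. The following standalone sketch (illustrative kSize and variable names, not part of the patch) traces that arithmetic:

// Standalone sketch of CalculateSize's arithmetic; kSize is illustrative.
#include <cassert>
#include <cstdio>

int main() {
  const unsigned kSize = 4;                  // power of two, as the ctor asserts
  const unsigned kMask2 = (kSize << 1) - 1;  // low log2(kSize)+1 position bits

  // Simulate three PushFronts and one PopBack. Each operation advances its
  // index by 1 in the position bits and by (kSize << 1) in the counter bits.
  unsigned front = 0, back = 0;
  for (int i = 0; i < 3; i++) front += 1 + (kSize << 1);
  back += 1 + (kSize << 1);

  // CalculateSize's steps: mask, fix wrap-around, clamp transient overshoot.
  int size = static_cast<int>(front & kMask2) - static_cast<int>(back & kMask2);
  if (size < 0) size += 2 * kSize;
  if (size > static_cast<int>(kSize)) size = kSize;
  std::printf("size = %d\n", size);          // prints 2 (3 pushes - 1 pop)
  assert(size == 2);

  // Empty() fast path: masked positions coincide exactly when size == 0.
  assert(((front ^ back) & kMask2) != 0);
  return 0;
}

The final assert is why Empty() can take the cheaper (front ^ back) & kMask2 route: the masked positions agree exactly when the queue is empty, so no full size estimate is needed.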
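The refactored SizeOrNotEmpty also keeps the seqlock-style snapshot loop from the old Size(): back_ is read between two reads of front_, and the pair is used only once front_ is stable. A minimal sketch of that pattern, with hypothetical names (Snapshot, ConsistentSnapshot) that are not in the patch:

#include <atomic>
#include <cstdio>

struct Snapshot { unsigned front, back; };

// Take a consistent (front, back) pair by re-reading front_ until it is
// unchanged across the read of back_.
Snapshot ConsistentSnapshot(const std::atomic<unsigned>& front_,
                            const std::atomic<unsigned>& back_) {
  unsigned front = front_.load(std::memory_order_acquire);
  for (;;) {
    unsigned back = back_.load(std::memory_order_acquire);
    unsigned front1 = front_.load(std::memory_order_relaxed);
    if (front == front1) return {front, back};
    // front_ moved while back_ was read: adopt the new value and upgrade the
    // relaxed re-read with an acquire fence before retrying, as the patch does.
    front = front1;
    std::atomic_thread_fence(std::memory_order_acquire);
  }
}

int main() {
  std::atomic<unsigned> front_{3}, back_{1};
  Snapshot s = ConsistentSnapshot(front_, back_);
  std::printf("front=%u back=%u\n", s.front, s.back);
  return 0;
}

Hoisting the first front_ load out of the loop and retrying on the fence path is what lets the patch keep acquire ordering while doing only one acquire load of front_ on the common, uncontended iteration.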