Diffstat (limited to 'unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h')
-rw-r--r--  unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h  100
1 file changed, 63 insertions, 37 deletions
diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h b/unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h
index 05ed76cbe..b572ebcdf 100644
--- a/unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h
+++ b/unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h
@@ -10,7 +10,6 @@
#ifndef EIGEN_CXX11_THREADPOOL_RUNQUEUE_H_
#define EIGEN_CXX11_THREADPOOL_RUNQUEUE_H_
-
namespace Eigen {
// RunQueue is a fixed-size, partially non-blocking deque of Work items.
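For orientation, a minimal usage sketch of the queue this header declares (not part of the patch; the Work type, the capacity, and the umbrella include are assumptions, exercising only members that appear elsewhere in this diff):

// Hypothetical usage sketch: a queue of callable work items.
#include <functional>
#include <unsupported/Eigen/CXX11/ThreadPool>  // pulls in RunQueue.h

int main() {
  using Work = std::function<void()>;
  Eigen::RunQueue<Work, 1024> q;        // capacity is asserted to be a power of two
  q.PushFront([] { /* some task */ });  // returns the item back if the queue is full
  while (!q.Empty()) {
    Work w = q.PopBack();               // default-constructed Work when nothing was taken
    if (w) w();
  }
  return 0;
}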
@@ -40,14 +39,14 @@ class RunQueue {
public:
RunQueue() : front_(0), back_(0) {
// require power-of-two for fast masking
- eigen_assert((kSize & (kSize - 1)) == 0);
- eigen_assert(kSize > 2); // why would you do this?
- eigen_assert(kSize <= (64 << 10)); // leave enough space for counter
+ eigen_plain_assert((kSize & (kSize - 1)) == 0);
+ eigen_plain_assert(kSize > 2); // why would you do this?
+ eigen_plain_assert(kSize <= (64 << 10)); // leave enough space for counter
for (unsigned i = 0; i < kSize; i++)
array_[i].state.store(kEmpty, std::memory_order_relaxed);
}
- ~RunQueue() { eigen_assert(Size() == 0); }
+ ~RunQueue() { eigen_plain_assert(Size() == 0); }
// PushFront inserts w at the beginning of the queue.
// If queue is full returns w, otherwise returns default-constructed Work.
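The switch to eigen_plain_assert in the constructor above keeps the same invariants as before; in particular kSize must be a power of two so that slot indices can wrap with a bitmask instead of a modulo. A standalone sketch of that equivalence (not part of the patch):

// For a power-of-two kSize, masking with (kSize - 1) equals taking the index
// modulo kSize, which is what the constructor's first assertion guarantees.
#include <cassert>

int main() {
  constexpr unsigned kSize = 1024;  // any power of two
  static_assert((kSize & (kSize - 1)) == 0, "kSize must be a power of two");
  for (unsigned index = 0; index < 10 * kSize; ++index)
    assert((index & (kSize - 1)) == index % kSize);
  return 0;
}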
@@ -98,11 +97,9 @@ class RunQueue {
}
// PopBack removes and returns the last element in the queue.
- // Can fail spuriously.
Work PopBack() {
if (Empty()) return Work();
- std::unique_lock<std::mutex> lock(mutex_, std::try_to_lock);
- if (!lock) return Work();
+ std::unique_lock<std::mutex> lock(mutex_);
unsigned back = back_.load(std::memory_order_relaxed);
Elem* e = &array_[back & kMask];
uint8_t s = e->state.load(std::memory_order_relaxed);
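The functional change in PopBack is the lock acquisition: the old code used std::try_to_lock and gave up when the mutex was contended, which is what the removed "Can fail spuriously" comment referred to; the new code blocks until it owns the mutex. A reduced sketch of the two styles (the mutex and function names are made up):

#include <mutex>

std::mutex m;  // stand-in for RunQueue::mutex_

bool pop_with_trylock() {
  // Old behavior: if another thread holds the mutex, bail out, so the caller
  // sees a spurious "nothing popped" result even on a non-empty queue.
  std::unique_lock<std::mutex> lock(m, std::try_to_lock);
  if (!lock) return false;
  // ... remove an element under the lock ...
  return true;
}

bool pop_with_blocking_lock() {
  // New behavior: wait for the mutex, so contention alone can no longer make
  // the pop fail.
  std::unique_lock<std::mutex> lock(m);
  // ... remove an element under the lock ...
  return true;
}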
@@ -116,11 +113,10 @@ class RunQueue {
}
// PopBackHalf removes and returns half of the last elements in the queue.
- // Returns number of elements removed. But can also fail spuriously.
+ // Returns the number of elements removed.
unsigned PopBackHalf(std::vector<Work>* result) {
if (Empty()) return 0;
- std::unique_lock<std::mutex> lock(mutex_, std::try_to_lock);
- if (!lock) return 0;
+ std::unique_lock<std::mutex> lock(mutex_);
unsigned back = back_.load(std::memory_order_relaxed);
unsigned size = Size();
unsigned mid = back;
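PopBackHalf gets the same blocking-lock treatment, so its contract simplifies to "returns the number of elements removed". A hypothetical caller (the names are made up) that steals roughly half of another queue's pending work in one locked operation:

#include <functional>
#include <vector>

using Work = std::function<void()>;

// Take roughly half of a victim queue's pending work in a single call instead
// of popping items one at a time; Queue is assumed to be a RunQueue<Work, N>.
template <typename Queue>
void RunStolenWork(Queue& victim) {
  std::vector<Work> stolen;
  unsigned n = victim.PopBackHalf(&stolen);  // number of elements removed
  for (unsigned i = 0; i < n; ++i)
    if (stolen[i]) stolen[i]();              // execute each stolen task
}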
@@ -131,15 +127,14 @@ class RunQueue {
Elem* e = &array_[mid & kMask];
uint8_t s = e->state.load(std::memory_order_relaxed);
if (n == 0) {
- if (s != kReady ||
- !e->state.compare_exchange_strong(s, kBusy,
- std::memory_order_acquire))
+ if (s != kReady || !e->state.compare_exchange_strong(
+ s, kBusy, std::memory_order_acquire))
continue;
start = mid;
} else {
// Note: no need to store temporal kBusy, we exclusively own these
// elements.
- eigen_assert(s == kReady);
+ eigen_plain_assert(s == kReady);
}
result->push_back(std::move(e->w));
e->state.store(kEmpty, std::memory_order_release);
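The reformatted compare_exchange_strong is where a caller claims the first element of the batch: only the thread that moves a slot's state from kReady to kBusy owns that element, while the rest of the batch is already exclusively owned and is only asserted to be kReady. A reduced sketch of that claim step (the enum values mirror the state names used in this file; their numeric values are an assumption):

#include <atomic>
#include <cstdint>

// Element lifecycle states as named in RunQueue.h (numeric values assumed).
enum : uint8_t { kEmpty = 0, kBusy = 1, kReady = 2 };

// Returns true only for the thread that wins the kReady -> kBusy transition
// and therefore may move the element's Work payload out.
bool TryClaim(std::atomic<uint8_t>& state) {
  uint8_t s = state.load(std::memory_order_relaxed);
  return s == kReady &&
         state.compare_exchange_strong(s, kBusy, std::memory_order_acquire);
}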
@@ -152,30 +147,18 @@ class RunQueue {
// Size returns current queue size.
// Can be called by any thread at any time.
- unsigned Size() const {
- // Emptiness plays a critical role in thread pool blocking, so we go to great
- // effort not to produce false positives (claim a non-empty queue as empty).
- for (;;) {
- // Capture a consistent snapshot of front/tail.
- unsigned front = front_.load(std::memory_order_acquire);
- unsigned back = back_.load(std::memory_order_acquire);
- unsigned front1 = front_.load(std::memory_order_relaxed);
- if (front != front1) continue;
- int size = (front & kMask2) - (back & kMask2);
- // Fix overflow.
- if (size < 0) size += 2 * kSize;
- // Order of modification in push/pop is crafted to make the queue look
- // larger than it is during concurrent modifications. E.g. pop can
- // decrement size before the corresponding push has incremented it.
- // So the computed size can be up to kSize + 1, fix it.
- if (size > static_cast<int>(kSize)) size = kSize;
- return size;
- }
- }
+ unsigned Size() const { return SizeOrNotEmpty<true>(); }
// Empty tests whether container is empty.
// Can be called by any thread at any time.
- bool Empty() const { return Size() == 0; }
+ bool Empty() const { return SizeOrNotEmpty<false>() == 0; }
+
+ // Delete all the elements from the queue.
+ void Flush() {
+ while (!Empty()) {
+ PopFront();
+ }
+ }
private:
static const unsigned kMask = kSize - 1;
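The new Flush() added above just pops from the front until Empty() reports true. One place it is useful is teardown, since the destructor asserts Size() == 0; a hypothetical example (the include, the capacity, and main are assumptions):

#include <functional>
#include <unsupported/Eigen/CXX11/ThreadPool>  // pulls in RunQueue.h

int main() {
  using Work = std::function<void()>;
  Eigen::RunQueue<Work, 64> q;
  q.PushFront([] { /* a task that will never run */ });
  q.Flush();  // without this, ~RunQueue() would trip its Size() == 0 assert
  return 0;
}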
@@ -191,7 +174,7 @@ class RunQueue {
};
std::mutex mutex_;
// Low log(kSize) + 1 bits in front_ and back_ contain rolling index of
- // front/back, repsectively. The remaining bits contain modification counters
+ // front/back, respectively. The remaining bits contain modification counters
// that are incremented on Push operations. This allows us to (1) distinguish
// between empty and full conditions (if we would use log(kSize) bits for
// position, these conditions would be indistinguishable); (2) obtain
@@ -201,6 +184,49 @@ class RunQueue {
std::atomic<unsigned> back_;
Elem array_[kSize];
+ // SizeOrNotEmpty returns current queue size; if NeedSizeEstimate is false,
+ // only whether the size is 0 is guaranteed to be correct.
+ // Can be called by any thread at any time.
+ template<bool NeedSizeEstimate>
+ unsigned SizeOrNotEmpty() const {
+ // Emptiness plays a critical role in thread pool blocking, so we go to great
+ // effort not to produce false positives (claim a non-empty queue as empty).
+ unsigned front = front_.load(std::memory_order_acquire);
+ for (;;) {
+ // Capture a consistent snapshot of front/tail.
+ unsigned back = back_.load(std::memory_order_acquire);
+ unsigned front1 = front_.load(std::memory_order_relaxed);
+ if (front != front1) {
+ front = front1;
+ std::atomic_thread_fence(std::memory_order_acquire);
+ continue;
+ }
+ if (NeedSizeEstimate) {
+ return CalculateSize(front, back);
+ } else {
+ // This value will be 0 if the queue is empty, and nonzero (but otherwise meaningless) if it is not.
+ unsigned maybe_zero = ((front ^ back) & kMask2);
+ // The size estimate must agree with the maybe_zero check on whether the
+ // queue is empty or not.
+ eigen_assert((CalculateSize(front, back) == 0) == (maybe_zero == 0));
+ return maybe_zero;
+ }
+ }
+ }
+
+ EIGEN_ALWAYS_INLINE
+ unsigned CalculateSize(unsigned front, unsigned back) const {
+ int size = (front & kMask2) - (back & kMask2);
+ // Fix overflow.
+ if (size < 0) size += 2 * kSize;
+ // Order of modification in push/pop is crafted to make the queue look
+ // larger than it is during concurrent modifications. E.g. push can
+ // increment size before the corresponding pop has decremented it.
+ // So the computed size can be up to kSize + 1, fix it.
+ if (size > static_cast<int>(kSize)) size = kSize;
+ return static_cast<unsigned>(size);
+ }
+
RunQueue(const RunQueue&) = delete;
void operator=(const RunQueue&) = delete;
};
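As a sanity check on the new fast path: SizeOrNotEmpty<false>() returns (front ^ back) & kMask2 and relies on that value being zero exactly when CalculateSize() would return zero, which is what its eigen_assert states. The toy program below verifies that relation exhaustively for a small capacity (not part of the patch; kMask2 = (kSize << 1) - 1 is an assumption about a member not shown in this diff, and the clamp to kSize is omitted because it cannot affect a zero result):

#include <cassert>

int main() {
  constexpr unsigned kSize = 8;                  // toy power-of-two capacity
  constexpr unsigned kMask2 = (kSize << 1) - 1;  // position + wrap bit (assumed definition)
  for (unsigned front = 0; front < 4 * kSize; ++front) {
    for (unsigned back = 0; back < 4 * kSize; ++back) {
      // Mirrors CalculateSize(front, back) without the final clamp.
      int size = static_cast<int>(front & kMask2) - static_cast<int>(back & kMask2);
      if (size < 0) size += 2 * kSize;           // fix wrap-around
      assert((size == 0) == (((front ^ back) & kMask2) == 0));
    }
  }
  return 0;
}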