aboutsummaryrefslogtreecommitdiff
path: root/src/iter/par_bridge.rs
diff options
context:
space:
mode:
authorJoel Galenson <jgalenson@google.com>2021-05-19 16:31:56 -0700
committerJoel Galenson <jgalenson@google.com>2021-05-19 16:31:56 -0700
commitdd2305def6fff1d9f1149ecb73b02e37eb8a18b0 (patch)
treea915bf2398b238dbd70ae338a6bb643a7372de84 /src/iter/par_bridge.rs
parent698313e1f73ce48237108f63ba85b3000f9d0ff1 (diff)
downloadrayon-dd2305def6fff1d9f1149ecb73b02e37eb8a18b0.tar.gz
Upgrade rust/crates/rayon to 1.5.1
Test: make Change-Id: I40c1a4538832871d1f4cd09daf6904d094b5615e
Diffstat (limited to 'src/iter/par_bridge.rs')
-rw-r--r--src/iter/par_bridge.rs38
1 files changed, 27 insertions, 11 deletions
diff --git a/src/iter/par_bridge.rs b/src/iter/par_bridge.rs
index 4c2b96e..339ac1a 100644
--- a/src/iter/par_bridge.rs
+++ b/src/iter/par_bridge.rs
@@ -125,16 +125,19 @@ where
let mut count = self.split_count.load(Ordering::SeqCst);
loop {
- let done = self.done.load(Ordering::SeqCst);
+ // Check if the iterator is exhausted *and* we've consumed every item from it.
+ let done = self.done.load(Ordering::SeqCst) && self.items.is_empty();
+
match count.checked_sub(1) {
Some(new_count) if !done => {
- let last_count =
- self.split_count
- .compare_and_swap(count, new_count, Ordering::SeqCst);
- if last_count == count {
- return (self.clone(), Some(self));
- } else {
- count = last_count;
+ match self.split_count.compare_exchange_weak(
+ count,
+ new_count,
+ Ordering::SeqCst,
+ Ordering::SeqCst,
+ ) {
+ Ok(_) => return (self.clone(), Some(self)),
+ Err(last_count) => count = last_count,
}
}
_ => {
@@ -157,13 +160,26 @@ where
}
}
Steal::Empty => {
+ // Don't storm the mutex if we're already done.
if self.done.load(Ordering::SeqCst) {
- // the iterator is out of items, no use in continuing
- return folder;
+ // Someone might have pushed more between our `steal()` and `done.load()`
+ if self.items.is_empty() {
+ // The iterator is out of items, no use in continuing
+ return folder;
+ }
} else {
// our cache is out of items, time to load more from the iterator
match self.iter.try_lock() {
Ok(mut guard) => {
+ // Check `done` again in case we raced with the previous lock
+ // holder on its way out.
+ if self.done.load(Ordering::SeqCst) {
+ if self.items.is_empty() {
+ return folder;
+ }
+ continue;
+ }
+
let count = current_num_threads();
let count = (count * count) * 2;
@@ -184,7 +200,7 @@ where
}
Err(TryLockError::WouldBlock) => {
// someone else has the mutex, just sit tight until it's ready
- yield_now(); //TODO: use a thread=pool-aware yield? (#548)
+ yield_now(); //TODO: use a thread-pool-aware yield? (#548)
}
Err(TryLockError::Poisoned(_)) => {
// any panics from other threads will have been caught by the pool,