diff options
author | Joel Galenson <jgalenson@google.com> | 2021-05-19 16:31:56 -0700 |
---|---|---|
committer | Joel Galenson <jgalenson@google.com> | 2021-05-19 16:31:56 -0700 |
commit | dd2305def6fff1d9f1149ecb73b02e37eb8a18b0 (patch) | |
tree | a915bf2398b238dbd70ae338a6bb643a7372de84 /src/iter/par_bridge.rs | |
parent | 698313e1f73ce48237108f63ba85b3000f9d0ff1 (diff) | |
download | rayon-dd2305def6fff1d9f1149ecb73b02e37eb8a18b0.tar.gz |
Upgrade rust/crates/rayon to 1.5.1
Test: make
Change-Id: I40c1a4538832871d1f4cd09daf6904d094b5615e
Diffstat (limited to 'src/iter/par_bridge.rs')
-rw-r--r-- | src/iter/par_bridge.rs | 38 |
1 files changed, 27 insertions, 11 deletions
diff --git a/src/iter/par_bridge.rs b/src/iter/par_bridge.rs index 4c2b96e..339ac1a 100644 --- a/src/iter/par_bridge.rs +++ b/src/iter/par_bridge.rs @@ -125,16 +125,19 @@ where let mut count = self.split_count.load(Ordering::SeqCst); loop { - let done = self.done.load(Ordering::SeqCst); + // Check if the iterator is exhausted *and* we've consumed every item from it. + let done = self.done.load(Ordering::SeqCst) && self.items.is_empty(); + match count.checked_sub(1) { Some(new_count) if !done => { - let last_count = - self.split_count - .compare_and_swap(count, new_count, Ordering::SeqCst); - if last_count == count { - return (self.clone(), Some(self)); - } else { - count = last_count; + match self.split_count.compare_exchange_weak( + count, + new_count, + Ordering::SeqCst, + Ordering::SeqCst, + ) { + Ok(_) => return (self.clone(), Some(self)), + Err(last_count) => count = last_count, } } _ => { @@ -157,13 +160,26 @@ where } } Steal::Empty => { + // Don't storm the mutex if we're already done. if self.done.load(Ordering::SeqCst) { - // the iterator is out of items, no use in continuing - return folder; + // Someone might have pushed more between our `steal()` and `done.load()` + if self.items.is_empty() { + // The iterator is out of items, no use in continuing + return folder; + } } else { // our cache is out of items, time to load more from the iterator match self.iter.try_lock() { Ok(mut guard) => { + // Check `done` again in case we raced with the previous lock + // holder on its way out. + if self.done.load(Ordering::SeqCst) { + if self.items.is_empty() { + return folder; + } + continue; + } + let count = current_num_threads(); let count = (count * count) * 2; @@ -184,7 +200,7 @@ where } Err(TryLockError::WouldBlock) => { // someone else has the mutex, just sit tight until it's ready - yield_now(); //TODO: use a thread=pool-aware yield? (#548) + yield_now(); //TODO: use a thread-pool-aware yield? (#548) } Err(TryLockError::Poisoned(_)) => { // any panics from other threads will have been caught by the pool, |