diff options
author | Jakub Kotur <qtr@google.com> | 2021-03-16 20:22:30 +0000 |
---|---|---|
committer | Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com> | 2021-03-16 20:22:30 +0000 |
commit | 4c0b8e0ef78bcd80a4facc34c6a2158d83f96802 (patch) | |
tree | 13641ab7afc7aa43b586606c18d53084dedf7ace /src/iter/par_bridge.rs | |
parent | a679e9b8b7e4ae27a19b81f216e375ea8a9cdb8e (diff) | |
parent | 7522a9ba008ac88224e0990932bdd298a43a2336 (diff) | |
download | rayon-4c0b8e0ef78bcd80a4facc34c6a2158d83f96802.tar.gz |
Initial import of rayon-1.5.0. am: 041839ceab am: d836dd6404 am: 7522a9ba00
Original change: https://android-review.googlesource.com/c/platform/external/rust/crates/rayon/+/1622436
Change-Id: I94b0e06d9da131d6f98db436ad1a178dfff2ec90
Diffstat (limited to 'src/iter/par_bridge.rs')
-rw-r--r-- | src/iter/par_bridge.rs | 201 |
1 files changed, 201 insertions, 0 deletions
diff --git a/src/iter/par_bridge.rs b/src/iter/par_bridge.rs new file mode 100644 index 0000000..4c2b96e --- /dev/null +++ b/src/iter/par_bridge.rs @@ -0,0 +1,201 @@ +use crossbeam_deque::{Steal, Stealer, Worker}; + +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::sync::{Mutex, TryLockError}; +use std::thread::yield_now; + +use crate::current_num_threads; +use crate::iter::plumbing::{bridge_unindexed, Folder, UnindexedConsumer, UnindexedProducer}; +use crate::iter::ParallelIterator; + +/// Conversion trait to convert an `Iterator` to a `ParallelIterator`. +/// +/// This creates a "bridge" from a sequential iterator to a parallel one, by distributing its items +/// across the Rayon thread pool. This has the advantage of being able to parallelize just about +/// anything, but the resulting `ParallelIterator` can be less efficient than if you started with +/// `par_iter` instead. However, it can still be useful for iterators that are difficult to +/// parallelize by other means, like channels or file or network I/O. +/// +/// The resulting iterator is not guaranteed to keep the order of the original iterator. +/// +/// # Examples +/// +/// To use this trait, take an existing `Iterator` and call `par_bridge` on it. After that, you can +/// use any of the `ParallelIterator` methods: +/// +/// ``` +/// use rayon::iter::ParallelBridge; +/// use rayon::prelude::ParallelIterator; +/// use std::sync::mpsc::channel; +/// +/// let rx = { +/// let (tx, rx) = channel(); +/// +/// tx.send("one!"); +/// tx.send("two!"); +/// tx.send("three!"); +/// +/// rx +/// }; +/// +/// let mut output: Vec<&'static str> = rx.into_iter().par_bridge().collect(); +/// output.sort_unstable(); +/// +/// assert_eq!(&*output, &["one!", "three!", "two!"]); +/// ``` +pub trait ParallelBridge: Sized { + /// Creates a bridge from this type to a `ParallelIterator`. + fn par_bridge(self) -> IterBridge<Self>; +} + +impl<T: Iterator + Send> ParallelBridge for T +where + T::Item: Send, +{ + fn par_bridge(self) -> IterBridge<Self> { + IterBridge { iter: self } + } +} + +/// `IterBridge` is a parallel iterator that wraps a sequential iterator. +/// +/// This type is created when using the `par_bridge` method on `ParallelBridge`. See the +/// [`ParallelBridge`] documentation for details. +/// +/// [`ParallelBridge`]: trait.ParallelBridge.html +#[derive(Debug, Clone)] +pub struct IterBridge<Iter> { + iter: Iter, +} + +impl<Iter: Iterator + Send> ParallelIterator for IterBridge<Iter> +where + Iter::Item: Send, +{ + type Item = Iter::Item; + + fn drive_unindexed<C>(self, consumer: C) -> C::Result + where + C: UnindexedConsumer<Self::Item>, + { + let split_count = AtomicUsize::new(current_num_threads()); + let worker = Worker::new_fifo(); + let stealer = worker.stealer(); + let done = AtomicBool::new(false); + let iter = Mutex::new((self.iter, worker)); + + bridge_unindexed( + IterParallelProducer { + split_count: &split_count, + done: &done, + iter: &iter, + items: stealer, + }, + consumer, + ) + } +} + +struct IterParallelProducer<'a, Iter: Iterator> { + split_count: &'a AtomicUsize, + done: &'a AtomicBool, + iter: &'a Mutex<(Iter, Worker<Iter::Item>)>, + items: Stealer<Iter::Item>, +} + +// manual clone because T doesn't need to be Clone, but the derive assumes it should be +impl<'a, Iter: Iterator + 'a> Clone for IterParallelProducer<'a, Iter> { + fn clone(&self) -> Self { + IterParallelProducer { + split_count: self.split_count, + done: self.done, + iter: self.iter, + items: self.items.clone(), + } + } +} + +impl<'a, Iter: Iterator + Send + 'a> UnindexedProducer for IterParallelProducer<'a, Iter> +where + Iter::Item: Send, +{ + type Item = Iter::Item; + + fn split(self) -> (Self, Option<Self>) { + let mut count = self.split_count.load(Ordering::SeqCst); + + loop { + let done = self.done.load(Ordering::SeqCst); + match count.checked_sub(1) { + Some(new_count) if !done => { + let last_count = + self.split_count + .compare_and_swap(count, new_count, Ordering::SeqCst); + if last_count == count { + return (self.clone(), Some(self)); + } else { + count = last_count; + } + } + _ => { + return (self, None); + } + } + } + } + + fn fold_with<F>(self, mut folder: F) -> F + where + F: Folder<Self::Item>, + { + loop { + match self.items.steal() { + Steal::Success(it) => { + folder = folder.consume(it); + if folder.full() { + return folder; + } + } + Steal::Empty => { + if self.done.load(Ordering::SeqCst) { + // the iterator is out of items, no use in continuing + return folder; + } else { + // our cache is out of items, time to load more from the iterator + match self.iter.try_lock() { + Ok(mut guard) => { + let count = current_num_threads(); + let count = (count * count) * 2; + + let (ref mut iter, ref worker) = *guard; + + // while worker.len() < count { + // FIXME the new deque doesn't let us count items. We can just + // push a number of items, but that doesn't consider active + // stealers elsewhere. + for _ in 0..count { + if let Some(it) = iter.next() { + worker.push(it); + } else { + self.done.store(true, Ordering::SeqCst); + break; + } + } + } + Err(TryLockError::WouldBlock) => { + // someone else has the mutex, just sit tight until it's ready + yield_now(); //TODO: use a thread=pool-aware yield? (#548) + } + Err(TryLockError::Poisoned(_)) => { + // any panics from other threads will have been caught by the pool, + // and will be re-thrown when joined - just exit + return folder; + } + } + } + } + Steal::Retry => (), + } + } + } +} |