//! Traits and functions used to implement parallel iteration. These are //! low-level details -- users of parallel iterators should not need to //! interact with them directly. See [the `plumbing` README][r] for a general overview. //! //! [r]: https://github.com/rayon-rs/rayon/blob/master/src/iter/plumbing/README.md use crate::join_context; use super::IndexedParallelIterator; use std::cmp; use std::usize; /// The `ProducerCallback` trait is a kind of generic closure, /// [analogous to `FnOnce`][FnOnce]. See [the corresponding section in /// the plumbing README][r] for more details. /// /// [r]: https://github.com/rayon-rs/rayon/blob/master/src/iter/plumbing/README.md#producer-callback /// [FnOnce]: https://doc.rust-lang.org/std/ops/trait.FnOnce.html pub trait ProducerCallback { /// The type of value returned by this callback. Analogous to /// [`Output` from the `FnOnce` trait][Output]. /// /// [Output]: https://doc.rust-lang.org/std/ops/trait.FnOnce.html#associatedtype.Output type Output; /// Invokes the callback with the given producer as argument. The /// key point of this trait is that this method is generic over /// `P`, and hence implementors must be defined for any producer. fn callback

(self, producer: P) -> Self::Output where P: Producer; } /// A `Producer` is effectively a "splittable `IntoIterator`". That /// is, a producer is a value which can be converted into an iterator /// at any time: at that point, it simply produces items on demand, /// like any iterator. But what makes a `Producer` special is that, /// *before* we convert to an iterator, we can also **split** it at a /// particular point using the `split_at` method. This will yield up /// two producers, one producing the items before that point, and one /// producing the items after that point (these two producers can then /// independently be split further, or be converted into iterators). /// In Rayon, this splitting is used to divide between threads. /// See [the `plumbing` README][r] for further details. /// /// Note that each producer will always produce a fixed number of /// items N. However, this number N is not queryable through the API; /// the consumer is expected to track it. /// /// NB. You might expect `Producer` to extend the `IntoIterator` /// trait. However, [rust-lang/rust#20671][20671] prevents us from /// declaring the DoubleEndedIterator and ExactSizeIterator /// constraints on a required IntoIterator trait, so we inline /// IntoIterator here until that issue is fixed. /// /// [r]: https://github.com/rayon-rs/rayon/blob/master/src/iter/plumbing/README.md /// [20671]: https://github.com/rust-lang/rust/issues/20671 pub trait Producer: Send + Sized { /// The type of item that will be produced by this producer once /// it is converted into an iterator. type Item; /// The type of iterator we will become. type IntoIter: Iterator + DoubleEndedIterator + ExactSizeIterator; /// Convert `self` into an iterator; at this point, no more parallel splits /// are possible. fn into_iter(self) -> Self::IntoIter; /// The minimum number of items that we will process /// sequentially. Defaults to 1, which means that we will split /// all the way down to a single item. 
This can be raised higher /// using the [`with_min_len`] method, which will force us to /// create sequential tasks at a larger granularity. Note that /// Rayon automatically normally attempts to adjust the size of /// parallel splits to reduce overhead, so this should not be /// needed. /// /// [`with_min_len`]: ../trait.IndexedParallelIterator.html#method.with_min_len fn min_len(&self) -> usize { 1 } /// The maximum number of items that we will process /// sequentially. Defaults to MAX, which means that we can choose /// not to split at all. This can be lowered using the /// [`with_max_len`] method, which will force us to create more /// parallel tasks. Note that Rayon automatically normally /// attempts to adjust the size of parallel splits to reduce /// overhead, so this should not be needed. /// /// [`with_max_len`]: ../trait.IndexedParallelIterator.html#method.with_max_len fn max_len(&self) -> usize { usize::MAX } /// Split into two producers; one produces items `0..index`, the /// other `index..N`. Index must be less than or equal to `N`. fn split_at(self, index: usize) -> (Self, Self); /// Iterate the producer, feeding each element to `folder`, and /// stop when the folder is full (or all elements have been consumed). /// /// The provided implementation is sufficient for most iterables. fn fold_with(self, folder: F) -> F where F: Folder, { folder.consume_iter(self.into_iter()) } } /// A consumer is effectively a [generalized "fold" operation][fold], /// and in fact each consumer will eventually be converted into a /// [`Folder`]. What makes a consumer special is that, like a /// [`Producer`], it can be **split** into multiple consumers using /// the `split_at` method. When a consumer is split, it produces two /// consumers, as well as a **reducer**. The two consumers can be fed /// items independently, and when they are done the reducer is used to /// combine their two results into one. See [the `plumbing` /// README][r] for further details. 
///
/// [r]: https://github.com/rayon-rs/rayon/blob/master/src/iter/plumbing/README.md
/// [fold]: https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.fold
/// [`Folder`]: trait.Folder.html
/// [`Producer`]: trait.Producer.html
pub trait Consumer<Item>: Send + Sized {
    /// The type of folder that this consumer can be converted into.
    /// Its `Result` must be the same as this consumer's `Result`.
    type Folder: Folder<Item, Result = Self::Result>;

    /// The type of reducer that is produced if this consumer is split.
    type Reducer: Reducer<Self::Result>;

    /// The type of result that this consumer will ultimately produce.
    type Result: Send;

    /// Divide the consumer into two consumers, one processing items
    /// `0..index` and one processing items from `index..`. Also
    /// produces a reducer that can be used to reduce the results at
    /// the end.
    fn split_at(self, index: usize) -> (Self, Self, Self::Reducer);

    /// Convert the consumer into a folder that can consume items
    /// sequentially, eventually producing a final result.
    fn into_folder(self) -> Self::Folder;

    /// Hint whether this `Consumer` would like to stop processing
    /// further items, e.g. if a search has been completed.
    fn full(&self) -> bool;
}

/// The `Folder` trait encapsulates [the standard fold
/// operation][fold]. It can be fed many items using the `consume`
/// method. At the end, once all items have been consumed, it can then
/// be converted (using `complete`) into a final value.
///
/// [fold]: https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.fold
pub trait Folder<Item>: Sized {
    /// The type of result that will ultimately be produced by the folder.
    type Result;

    /// Consume next item and return new sequential state.
    fn consume(self, item: Item) -> Self;

    /// Consume items from the iterator until full, and return new sequential state.
    ///
    /// This method is **optional**. The default simply iterates over
    /// `iter`, invoking `consume` on each item and checking after each
    /// one whether `full` returns true; as soon as it does, iteration
    /// stops and the remaining items are left unconsumed. Note that
    /// `full` is only checked *after* a `consume`, so a folder that is
    /// already full will still receive the first item.
    ///
    /// The main reason to override it is if you can provide a more
    /// specialized, efficient implementation.
    fn consume_iter<I>(mut self, iter: I) -> Self
    where
        I: IntoIterator<Item = Item>,
    {
        for item in iter {
            self = self.consume(item);
            if self.full() {
                break;
            }
        }
        self
    }

    /// Finish consuming items, produce final result.
    fn complete(self) -> Self::Result;

    /// Hint whether this `Folder` would like to stop processing
    /// further items, e.g. if a search has been completed.
    fn full(&self) -> bool;
}

/// The reducer is the final step of a `Consumer` -- after a consumer
/// has been split into two parts, and each of those parts has been
/// fully processed, we are left with two results. The reducer is then
/// used to combine those two results into one. See [the `plumbing`
/// README][r] for further details.
///
/// [r]: https://github.com/rayon-rs/rayon/blob/master/src/iter/plumbing/README.md
pub trait Reducer<Result> {
    /// Reduce two final results into one; this is executed after a
    /// split.
    fn reduce(self, left: Result, right: Result) -> Result;
}

/// A stateless consumer can be freely copied. These consumers can be
/// used like regular consumers, but they also support a
/// `split_off_left` method that does not take an index to split, but
/// simply splits at some arbitrary point (`for_each`, for example,
/// produces an unindexed consumer).
pub trait UnindexedConsumer<I>: Consumer<I> {
    /// Splits off a "left" consumer and returns it. The `self`
    /// consumer should then be used to consume the "right" portion of
    /// the data. (The ordering matters for methods like find_first --
    /// values produced by the returned value are given precedence
    /// over values produced by `self`.) Once the left and right
    /// halves have been fully consumed, you should reduce the results
    /// with the result of `to_reducer`.
    fn split_off_left(&self) -> Self;

    /// Creates a reducer that can be used to combine the results from
    /// a split consumer.
    fn to_reducer(&self) -> Self::Reducer;
}

/// A variant on `Producer` which does not know its exact length or
/// cannot represent it in a `usize`.
These producers act like /// ordinary producers except that they cannot be told to split at a /// particular point. Instead, you just ask them to split 'somewhere'. /// /// (In principle, `Producer` could extend this trait; however, it /// does not because to do so would require producers to carry their /// own length with them.) pub trait UnindexedProducer: Send + Sized { /// The type of item returned by this producer. type Item; /// Split midway into a new producer if possible, otherwise return `None`. fn split(self) -> (Self, Option); /// Iterate the producer, feeding each element to `folder`, and /// stop when the folder is full (or all elements have been consumed). fn fold_with(self, folder: F) -> F where F: Folder; } /// A splitter controls the policy for splitting into smaller work items. /// /// Thief-splitting is an adaptive policy that starts by splitting into /// enough jobs for every worker thread, and then resets itself whenever a /// job is actually stolen into a different thread. #[derive(Clone, Copy)] struct Splitter { /// The `splits` tell us approximately how many remaining times we'd /// like to split this job. We always just divide it by two though, so /// the effective number of pieces will be `next_power_of_two()`. splits: usize, } impl Splitter { #[inline] fn new() -> Splitter { Splitter { splits: crate::current_num_threads(), } } #[inline] fn try_split(&mut self, stolen: bool) -> bool { let Splitter { splits } = *self; if stolen { // This job was stolen! Reset the number of desired splits to the // thread count, if that's more than we had remaining anyway. self.splits = cmp::max(crate::current_num_threads(), self.splits / 2); true } else if splits > 0 { // We have splits remaining, make it so. self.splits /= 2; true } else { // Not stolen, and no more splits -- we're done! false } } } /// The length splitter is built on thief-splitting, but additionally takes /// into account the remaining length of the iterator. 
#[derive(Clone, Copy)] struct LengthSplitter { inner: Splitter, /// The smallest we're willing to divide into. Usually this is just 1, /// but you can choose a larger working size with `with_min_len()`. min: usize, } impl LengthSplitter { /// Creates a new splitter based on lengths. /// /// The `min` is a hard lower bound. We'll never split below that, but /// of course an iterator might start out smaller already. /// /// The `max` is an upper bound on the working size, used to determine /// the minimum number of times we need to split to get under that limit. /// The adaptive algorithm may very well split even further, but never /// smaller than the `min`. #[inline] fn new(min: usize, max: usize, len: usize) -> LengthSplitter { let mut splitter = LengthSplitter { inner: Splitter::new(), min: cmp::max(min, 1), }; // Divide the given length by the max working length to get the minimum // number of splits we need to get under that max. This rounds down, // but the splitter actually gives `next_power_of_two()` pieces anyway. // e.g. len 12345 / max 100 = 123 min_splits -> 128 pieces. let min_splits = len / cmp::max(max, 1); // Only update the value if it's not splitting enough already. if min_splits > splitter.inner.splits { splitter.inner.splits = min_splits; } splitter } #[inline] fn try_split(&mut self, len: usize, stolen: bool) -> bool { // If splitting wouldn't make us too small, try the inner splitter. len / 2 >= self.min && self.inner.try_split(stolen) } } /// This helper function is used to "connect" a parallel iterator to a /// consumer. It will convert the `par_iter` into a producer P and /// then pull items from P and feed them to `consumer`, splitting and /// creating parallel threads as needed. /// /// This is useful when you are implementing your own parallel /// iterators: it is often used as the definition of the /// [`drive_unindexed`] or [`drive`] methods. 
/// /// [`drive_unindexed`]: ../trait.ParallelIterator.html#tymethod.drive_unindexed /// [`drive`]: ../trait.IndexedParallelIterator.html#tymethod.drive pub fn bridge(par_iter: I, consumer: C) -> C::Result where I: IndexedParallelIterator, C: Consumer, { let len = par_iter.len(); return par_iter.with_producer(Callback { len, consumer }); struct Callback { len: usize, consumer: C, } impl ProducerCallback for Callback where C: Consumer, { type Output = C::Result; fn callback

(self, producer: P) -> C::Result where P: Producer, { bridge_producer_consumer(self.len, producer, self.consumer) } } } /// This helper function is used to "connect" a producer and a /// consumer. You may prefer to call [`bridge`], which wraps this /// function. This function will draw items from `producer` and feed /// them to `consumer`, splitting and creating parallel tasks when /// needed. /// /// This is useful when you are implementing your own parallel /// iterators: it is often used as the definition of the /// [`drive_unindexed`] or [`drive`] methods. /// /// [`bridge`]: fn.bridge.html /// [`drive_unindexed`]: ../trait.ParallelIterator.html#tymethod.drive_unindexed /// [`drive`]: ../trait.IndexedParallelIterator.html#tymethod.drive pub fn bridge_producer_consumer(len: usize, producer: P, consumer: C) -> C::Result where P: Producer, C: Consumer, { let splitter = LengthSplitter::new(producer.min_len(), producer.max_len(), len); return helper(len, false, splitter, producer, consumer); fn helper( len: usize, migrated: bool, mut splitter: LengthSplitter, producer: P, consumer: C, ) -> C::Result where P: Producer, C: Consumer, { if consumer.full() { consumer.into_folder().complete() } else if splitter.try_split(len, migrated) { let mid = len / 2; let (left_producer, right_producer) = producer.split_at(mid); let (left_consumer, right_consumer, reducer) = consumer.split_at(mid); let (left_result, right_result) = join_context( |context| { helper( mid, context.migrated(), splitter, left_producer, left_consumer, ) }, |context| { helper( len - mid, context.migrated(), splitter, right_producer, right_consumer, ) }, ); reducer.reduce(left_result, right_result) } else { producer.fold_with(consumer.into_folder()).complete() } } } /// A variant of [`bridge_producer_consumer`] where the producer is an unindexed producer. 
/// /// [`bridge_producer_consumer`]: fn.bridge_producer_consumer.html pub fn bridge_unindexed(producer: P, consumer: C) -> C::Result where P: UnindexedProducer, C: UnindexedConsumer, { let splitter = Splitter::new(); bridge_unindexed_producer_consumer(false, splitter, producer, consumer) } fn bridge_unindexed_producer_consumer( migrated: bool, mut splitter: Splitter, producer: P, consumer: C, ) -> C::Result where P: UnindexedProducer, C: UnindexedConsumer, { if consumer.full() { consumer.into_folder().complete() } else if splitter.try_split(migrated) { match producer.split() { (left_producer, Some(right_producer)) => { let (reducer, left_consumer, right_consumer) = (consumer.to_reducer(), consumer.split_off_left(), consumer); let bridge = bridge_unindexed_producer_consumer; let (left_result, right_result) = join_context( |context| bridge(context.migrated(), splitter, left_producer, left_consumer), |context| bridge(context.migrated(), splitter, right_producer, right_consumer), ); reducer.reduce(left_result, right_result) } (producer, None) => producer.fold_with(consumer.into_folder()).complete(), } } else { producer.fold_with(consumer.into_folder()).complete() } }