use super::plumbing::*; use super::*; use std::cell::Cell; use std::sync::atomic::{AtomicUsize, Ordering}; #[cfg(test)] mod test; // The key optimization for find_first is that a consumer can stop its search if // some consumer to its left already found a match (and similarly for consumers // to the right for find_last). To make this work, all consumers need some // notion of their position in the data relative to other consumers, including // unindexed consumers that have no built-in notion of position. // // To solve this, we assign each consumer a lower and upper bound for an // imaginary "range" of data that it consumes. The initial consumer starts with // the range 0..usize::max_value(). The split divides this range in half so that // one resulting consumer has the range 0..(usize::max_value() / 2), and the // other has (usize::max_value() / 2)..usize::max_value(). Every subsequent // split divides the range in half again until it cannot be split anymore // (i.e. its length is 1), in which case the split returns two consumers with // the same range. In that case both consumers will continue to consume all // their data regardless of whether a better match is found, but the reducer // will still return the correct answer. #[derive(Copy, Clone)] enum MatchPosition { Leftmost, Rightmost, } /// Returns true if pos1 is a better match than pos2 according to MatchPosition #[inline] fn better_position(pos1: usize, pos2: usize, mp: MatchPosition) -> bool { match mp { MatchPosition::Leftmost => pos1 < pos2, MatchPosition::Rightmost => pos1 > pos2, } } pub(super) fn find_first(pi: I, find_op: P) -> Option where I: ParallelIterator, P: Fn(&I::Item) -> bool + Sync, { let best_found = AtomicUsize::new(usize::max_value()); let consumer = FindConsumer::new(&find_op, MatchPosition::Leftmost, &best_found); pi.drive_unindexed(consumer) } pub(super) fn find_last(pi: I, find_op: P) -> Option where I: ParallelIterator, P: Fn(&I::Item) -> bool + Sync, { let best_found = AtomicUsize::new(0); let consumer = FindConsumer::new(&find_op, MatchPosition::Rightmost, &best_found); pi.drive_unindexed(consumer) } struct FindConsumer<'p, P> { find_op: &'p P, lower_bound: Cell, upper_bound: usize, match_position: MatchPosition, best_found: &'p AtomicUsize, } impl<'p, P> FindConsumer<'p, P> { fn new(find_op: &'p P, match_position: MatchPosition, best_found: &'p AtomicUsize) -> Self { FindConsumer { find_op, lower_bound: Cell::new(0), upper_bound: usize::max_value(), match_position, best_found, } } fn current_index(&self) -> usize { match self.match_position { MatchPosition::Leftmost => self.lower_bound.get(), MatchPosition::Rightmost => self.upper_bound, } } } impl<'p, T, P> Consumer for FindConsumer<'p, P> where T: Send, P: Fn(&T) -> bool + Sync, { type Folder = FindFolder<'p, T, P>; type Reducer = FindReducer; type Result = Option; fn split_at(self, _index: usize) -> (Self, Self, Self::Reducer) { let dir = self.match_position; ( self.split_off_left(), self, FindReducer { match_position: dir, }, ) } fn into_folder(self) -> Self::Folder { FindFolder { find_op: self.find_op, boundary: self.current_index(), match_position: self.match_position, best_found: self.best_found, item: None, } } fn full(&self) -> bool { // can stop consuming if the best found index so far is *strictly* // better than anything this consumer will find better_position( self.best_found.load(Ordering::Relaxed), self.current_index(), self.match_position, ) } } impl<'p, T, P> UnindexedConsumer for FindConsumer<'p, P> where T: Send, P: Fn(&T) -> bool + Sync, { fn split_off_left(&self) -> Self { // Upper bound for one consumer will be lower bound for the other. This // overlap is okay, because only one of the bounds will be used for // comparing against best_found; the other is kept only to be able to // divide the range in half. // // When the resolution of usize has been exhausted (i.e. when // upper_bound = lower_bound), both results of this split will have the // same range. When that happens, we lose the ability to tell one // consumer to stop working when the other finds a better match, but the // reducer ensures that the best answer is still returned (see the test // above). let old_lower_bound = self.lower_bound.get(); let median = old_lower_bound + ((self.upper_bound - old_lower_bound) / 2); self.lower_bound.set(median); FindConsumer { find_op: self.find_op, lower_bound: Cell::new(old_lower_bound), upper_bound: median, match_position: self.match_position, best_found: self.best_found, } } fn to_reducer(&self) -> Self::Reducer { FindReducer { match_position: self.match_position, } } } struct FindFolder<'p, T, P> { find_op: &'p P, boundary: usize, match_position: MatchPosition, best_found: &'p AtomicUsize, item: Option, } impl<'p, P: 'p + Fn(&T) -> bool, T> Folder for FindFolder<'p, T, P> { type Result = Option; fn consume(mut self, item: T) -> Self { let found_best_in_range = match self.match_position { MatchPosition::Leftmost => self.item.is_some(), MatchPosition::Rightmost => false, }; if !found_best_in_range && (self.find_op)(&item) { // Continuously try to set best_found until we succeed or we // discover a better match was already found. let mut current = self.best_found.load(Ordering::Relaxed); loop { if better_position(current, self.boundary, self.match_position) { break; } match self.best_found.compare_exchange_weak( current, self.boundary, Ordering::Relaxed, Ordering::Relaxed, ) { Ok(_) => { self.item = Some(item); break; } Err(v) => current = v, } } } self } fn complete(self) -> Self::Result { self.item } fn full(&self) -> bool { let found_best_in_range = match self.match_position { MatchPosition::Leftmost => self.item.is_some(), MatchPosition::Rightmost => false, }; found_best_in_range || better_position( self.best_found.load(Ordering::Relaxed), self.boundary, self.match_position, ) } } struct FindReducer { match_position: MatchPosition, } impl Reducer> for FindReducer { fn reduce(self, left: Option, right: Option) -> Option { match self.match_position { MatchPosition::Leftmost => left.or(right), MatchPosition::Rightmost => right.or(left), } } }