use super::plumbing::*; use super::*; /// This trait abstracts the different ways we can "unzip" one parallel /// iterator into two distinct consumers, which we can handle almost /// identically apart from how to process the individual items. trait UnzipOp: Sync + Send { /// The type of item expected by the left consumer. type Left: Send; /// The type of item expected by the right consumer. type Right: Send; /// Consumes one item and feeds it to one or both of the underlying folders. fn consume(&self, item: T, left: FA, right: FB) -> (FA, FB) where FA: Folder, FB: Folder; /// Reports whether this op may support indexed consumers. /// - e.g. true for `unzip` where the item count passed through directly. /// - e.g. false for `partition` where the sorting is not yet known. fn indexable() -> bool { false } } /// Runs an unzip-like operation into default `ParallelExtend` collections. fn execute(pi: I, op: OP) -> (FromA, FromB) where I: ParallelIterator, OP: UnzipOp, FromA: Default + Send + ParallelExtend, FromB: Default + Send + ParallelExtend, { let mut a = FromA::default(); let mut b = FromB::default(); execute_into(&mut a, &mut b, pi, op); (a, b) } /// Runs an unzip-like operation into `ParallelExtend` collections. fn execute_into(a: &mut FromA, b: &mut FromB, pi: I, op: OP) where I: ParallelIterator, OP: UnzipOp, FromA: Send + ParallelExtend, FromB: Send + ParallelExtend, { // We have no idea what the consumers will look like for these // collections' `par_extend`, but we can intercept them in our own // `drive_unindexed`. Start with the left side, type `A`: let iter = UnzipA { base: pi, op, b }; a.par_extend(iter); } /// Unzips the items of a parallel iterator into a pair of arbitrary /// `ParallelExtend` containers. /// /// This is called by `ParallelIterator::unzip`. pub(super) fn unzip(pi: I) -> (FromA, FromB) where I: ParallelIterator, FromA: Default + Send + ParallelExtend, FromB: Default + Send + ParallelExtend, A: Send, B: Send, { execute(pi, Unzip) } /// Unzips an `IndexedParallelIterator` into two arbitrary `Consumer`s. /// /// This is called by `super::collect::unzip_into_vecs`. pub(super) fn unzip_indexed(pi: I, left: CA, right: CB) -> (CA::Result, CB::Result) where I: IndexedParallelIterator, CA: Consumer, CB: Consumer, A: Send, B: Send, { let consumer = UnzipConsumer { op: &Unzip, left, right, }; pi.drive(consumer) } /// An `UnzipOp` that splits a tuple directly into the two consumers. struct Unzip; impl UnzipOp<(A, B)> for Unzip { type Left = A; type Right = B; fn consume(&self, item: (A, B), left: FA, right: FB) -> (FA, FB) where FA: Folder, FB: Folder, { (left.consume(item.0), right.consume(item.1)) } fn indexable() -> bool { true } } /// Partitions the items of a parallel iterator into a pair of arbitrary /// `ParallelExtend` containers. /// /// This is called by `ParallelIterator::partition`. pub(super) fn partition(pi: I, predicate: P) -> (A, B) where I: ParallelIterator, A: Default + Send + ParallelExtend, B: Default + Send + ParallelExtend, P: Fn(&I::Item) -> bool + Sync + Send, { execute(pi, Partition { predicate }) } /// An `UnzipOp` that routes items depending on a predicate function. struct Partition

{ predicate: P, } impl UnzipOp for Partition

where P: Fn(&T) -> bool + Sync + Send, T: Send, { type Left = T; type Right = T; fn consume(&self, item: T, left: FA, right: FB) -> (FA, FB) where FA: Folder, FB: Folder, { if (self.predicate)(&item) { (left.consume(item), right) } else { (left, right.consume(item)) } } } /// Partitions and maps the items of a parallel iterator into a pair of /// arbitrary `ParallelExtend` containers. /// /// This called by `ParallelIterator::partition_map`. pub(super) fn partition_map(pi: I, predicate: P) -> (A, B) where I: ParallelIterator, A: Default + Send + ParallelExtend, B: Default + Send + ParallelExtend, P: Fn(I::Item) -> Either + Sync + Send, L: Send, R: Send, { execute(pi, PartitionMap { predicate }) } /// An `UnzipOp` that routes items depending on how they are mapped `Either`. struct PartitionMap

{ predicate: P, } impl UnzipOp for PartitionMap

where P: Fn(T) -> Either + Sync + Send, L: Send, R: Send, { type Left = L; type Right = R; fn consume(&self, item: T, left: FA, right: FB) -> (FA, FB) where FA: Folder, FB: Folder, { match (self.predicate)(item) { Either::Left(item) => (left.consume(item), right), Either::Right(item) => (left, right.consume(item)), } } } /// A fake iterator to intercept the `Consumer` for type `A`. struct UnzipA<'b, I, OP, FromB> { base: I, op: OP, b: &'b mut FromB, } impl<'b, I, OP, FromB> ParallelIterator for UnzipA<'b, I, OP, FromB> where I: ParallelIterator, OP: UnzipOp, FromB: Send + ParallelExtend, { type Item = OP::Left; fn drive_unindexed(self, consumer: C) -> C::Result where C: UnindexedConsumer, { let mut result = None; { // Now it's time to find the consumer for type `B` let iter = UnzipB { base: self.base, op: self.op, left_consumer: consumer, left_result: &mut result, }; self.b.par_extend(iter); } // NB: If for some reason `b.par_extend` doesn't actually drive the // iterator, then we won't have a result for the left side to return // at all. We can't fake an arbitrary consumer's result, so panic. result.expect("unzip consumers didn't execute!") } fn opt_len(&self) -> Option { if OP::indexable() { self.base.opt_len() } else { None } } } /// A fake iterator to intercept the `Consumer` for type `B`. struct UnzipB<'r, I, OP, CA> where I: ParallelIterator, OP: UnzipOp, CA: UnindexedConsumer, CA::Result: 'r, { base: I, op: OP, left_consumer: CA, left_result: &'r mut Option, } impl<'r, I, OP, CA> ParallelIterator for UnzipB<'r, I, OP, CA> where I: ParallelIterator, OP: UnzipOp, CA: UnindexedConsumer, { type Item = OP::Right; fn drive_unindexed(self, consumer: C) -> C::Result where C: UnindexedConsumer, { // Now that we have two consumers, we can unzip the real iterator. let consumer = UnzipConsumer { op: &self.op, left: self.left_consumer, right: consumer, }; let result = self.base.drive_unindexed(consumer); *self.left_result = Some(result.0); result.1 } fn opt_len(&self) -> Option { if OP::indexable() { self.base.opt_len() } else { None } } } /// `Consumer` that unzips into two other `Consumer`s struct UnzipConsumer<'a, OP, CA, CB> { op: &'a OP, left: CA, right: CB, } impl<'a, T, OP, CA, CB> Consumer for UnzipConsumer<'a, OP, CA, CB> where OP: UnzipOp, CA: Consumer, CB: Consumer, { type Folder = UnzipFolder<'a, OP, CA::Folder, CB::Folder>; type Reducer = UnzipReducer; type Result = (CA::Result, CB::Result); fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) { let (left1, left2, left_reducer) = self.left.split_at(index); let (right1, right2, right_reducer) = self.right.split_at(index); ( UnzipConsumer { op: self.op, left: left1, right: right1, }, UnzipConsumer { op: self.op, left: left2, right: right2, }, UnzipReducer { left: left_reducer, right: right_reducer, }, ) } fn into_folder(self) -> Self::Folder { UnzipFolder { op: self.op, left: self.left.into_folder(), right: self.right.into_folder(), } } fn full(&self) -> bool { // don't stop until everyone is full self.left.full() && self.right.full() } } impl<'a, T, OP, CA, CB> UnindexedConsumer for UnzipConsumer<'a, OP, CA, CB> where OP: UnzipOp, CA: UnindexedConsumer, CB: UnindexedConsumer, { fn split_off_left(&self) -> Self { UnzipConsumer { op: self.op, left: self.left.split_off_left(), right: self.right.split_off_left(), } } fn to_reducer(&self) -> Self::Reducer { UnzipReducer { left: self.left.to_reducer(), right: self.right.to_reducer(), } } } /// `Folder` that unzips into two other `Folder`s struct UnzipFolder<'a, OP, FA, FB> { op: &'a OP, left: FA, right: FB, } impl<'a, T, OP, FA, FB> Folder for UnzipFolder<'a, OP, FA, FB> where OP: UnzipOp, FA: Folder, FB: Folder, { type Result = (FA::Result, FB::Result); fn consume(self, item: T) -> Self { let (left, right) = self.op.consume(item, self.left, self.right); UnzipFolder { op: self.op, left, right, } } fn complete(self) -> Self::Result { (self.left.complete(), self.right.complete()) } fn full(&self) -> bool { // don't stop until everyone is full self.left.full() && self.right.full() } } /// `Reducer` that unzips into two other `Reducer`s struct UnzipReducer { left: RA, right: RB, } impl Reducer<(A, B)> for UnzipReducer where RA: Reducer, RB: Reducer, { fn reduce(self, left: (A, B), right: (A, B)) -> (A, B) { ( self.left.reduce(left.0, right.0), self.right.reduce(left.1, right.1), ) } } impl ParallelExtend<(A, B)> for (FromA, FromB) where A: Send, B: Send, FromA: Send + ParallelExtend, FromB: Send + ParallelExtend, { fn par_extend(&mut self, pi: I) where I: IntoParallelIterator, { execute_into(&mut self.0, &mut self.1, pi.into_par_iter(), Unzip); } } impl ParallelExtend> for (A, B) where L: Send, R: Send, A: Send + ParallelExtend, B: Send + ParallelExtend, { fn par_extend(&mut self, pi: I) where I: IntoParallelIterator>, { execute_into(&mut self.0, &mut self.1, pi.into_par_iter(), UnEither); } } /// An `UnzipOp` that routes items depending on their `Either` variant. struct UnEither; impl UnzipOp> for UnEither where L: Send, R: Send, { type Left = L; type Right = R; fn consume(&self, item: Either, left: FL, right: FR) -> (FL, FR) where FL: Folder, FR: Folder, { match item { Either::Left(item) => (left.consume(item), right), Either::Right(item) => (left, right.consume(item)), } } } impl FromParallelIterator<(A, B)> for (FromA, FromB) where A: Send, B: Send, FromA: Send + FromParallelIterator, FromB: Send + FromParallelIterator, { fn from_par_iter(pi: I) -> Self where I: IntoParallelIterator, { let (a, b): (Collector, Collector) = pi.into_par_iter().unzip(); (a.result.unwrap(), b.result.unwrap()) } } impl FromParallelIterator> for (A, B) where L: Send, R: Send, A: Send + FromParallelIterator, B: Send + FromParallelIterator, { fn from_par_iter(pi: I) -> Self where I: IntoParallelIterator>, { fn identity(x: T) -> T { x } let (a, b): (Collector, Collector) = pi.into_par_iter().partition_map(identity); (a.result.unwrap(), b.result.unwrap()) } } /// Shim to implement a one-time `ParallelExtend` using `FromParallelIterator`. struct Collector { result: Option, } impl Default for Collector { fn default() -> Self { Collector { result: None } } } impl ParallelExtend for Collector where T: Send, FromT: Send + FromParallelIterator, { fn par_extend(&mut self, pi: I) where I: IntoParallelIterator, { debug_assert!(self.result.is_none()); self.result = Some(pi.into_par_iter().collect()); } }