diff options
author | Jakub Kotur <qtr@google.com> | 2020-12-21 17:28:15 +0100 |
---|---|---|
committer | Jakub Kotur <qtr@google.com> | 2021-03-05 16:39:23 +0100 |
commit | 041839ceabbc67165512fde0d33c91347b758487 (patch) | |
tree | 13641ab7afc7aa43b586606c18d53084dedf7ace /src/iter/collect | |
parent | a679e9b8b7e4ae27a19b81f216e375ea8a9cdb8e (diff) | |
download | rayon-041839ceabbc67165512fde0d33c91347b758487.tar.gz |
Initial import of rayon-1.5.0.
Bug: 155309706
Change-Id: I6ff7de1cb89d093d7938abf78d586ed76da85b0d
Diffstat (limited to 'src/iter/collect')
-rw-r--r-- | src/iter/collect/consumer.rs | 159 | ||||
-rw-r--r-- | src/iter/collect/mod.rs | 171 | ||||
-rw-r--r-- | src/iter/collect/test.rs | 385 |
3 files changed, 715 insertions, 0 deletions
diff --git a/src/iter/collect/consumer.rs b/src/iter/collect/consumer.rs new file mode 100644 index 0000000..689f29c --- /dev/null +++ b/src/iter/collect/consumer.rs @@ -0,0 +1,159 @@ +use super::super::plumbing::*; +use std::marker::PhantomData; +use std::ptr; +use std::slice; + +pub(super) struct CollectConsumer<'c, T: Send> { + /// A slice covering the target memory, not yet initialized! + target: &'c mut [T], +} + +pub(super) struct CollectFolder<'c, T: Send> { + /// The folder writes into `result` and must extend the result + /// up to exactly this number of elements. + final_len: usize, + + /// The current written-to part of our slice of the target + result: CollectResult<'c, T>, +} + +impl<'c, T: Send + 'c> CollectConsumer<'c, T> { + /// The target memory is considered uninitialized, and will be + /// overwritten without reading or dropping existing values. + pub(super) fn new(target: &'c mut [T]) -> Self { + CollectConsumer { target } + } +} + +/// CollectResult represents an initialized part of the target slice. +/// +/// This is a proxy owner of the elements in the slice; when it drops, +/// the elements will be dropped, unless its ownership is released before then. +#[must_use] +pub(super) struct CollectResult<'c, T> { + start: *mut T, + len: usize, + invariant_lifetime: PhantomData<&'c mut &'c mut [T]>, +} + +unsafe impl<'c, T> Send for CollectResult<'c, T> where T: Send {} + +impl<'c, T> CollectResult<'c, T> { + /// The current length of the collect result + pub(super) fn len(&self) -> usize { + self.len + } + + /// Release ownership of the slice of elements, and return the length + pub(super) fn release_ownership(mut self) -> usize { + let ret = self.len; + self.len = 0; + ret + } +} + +impl<'c, T> Drop for CollectResult<'c, T> { + fn drop(&mut self) { + // Drop the first `self.len` elements, which have been recorded + // to be initialized by the folder. 
+ unsafe { + ptr::drop_in_place(slice::from_raw_parts_mut(self.start, self.len)); + } + } +} + +impl<'c, T: Send + 'c> Consumer<T> for CollectConsumer<'c, T> { + type Folder = CollectFolder<'c, T>; + type Reducer = CollectReducer; + type Result = CollectResult<'c, T>; + + fn split_at(self, index: usize) -> (Self, Self, CollectReducer) { + let CollectConsumer { target } = self; + + // Produce new consumers. Normal slicing ensures that the + // memory range given to each consumer is disjoint. + let (left, right) = target.split_at_mut(index); + ( + CollectConsumer::new(left), + CollectConsumer::new(right), + CollectReducer, + ) + } + + fn into_folder(self) -> CollectFolder<'c, T> { + // Create a folder that consumes values and writes them + // into target. The initial result has length 0. + CollectFolder { + final_len: self.target.len(), + result: CollectResult { + start: self.target.as_mut_ptr(), + len: 0, + invariant_lifetime: PhantomData, + }, + } + } + + fn full(&self) -> bool { + false + } +} + +impl<'c, T: Send + 'c> Folder<T> for CollectFolder<'c, T> { + type Result = CollectResult<'c, T>; + + fn consume(mut self, item: T) -> CollectFolder<'c, T> { + if self.result.len >= self.final_len { + panic!("too many values pushed to consumer"); + } + + // Compute target pointer and write to it, and + // extend the current result by one element + unsafe { + self.result.start.add(self.result.len).write(item); + self.result.len += 1; + } + + self + } + + fn complete(self) -> Self::Result { + // NB: We don't explicitly check that the local writes were complete, + // but Collect will assert the total result length in the end. + self.result + } + + fn full(&self) -> bool { + false + } +} + +/// Pretend to be unindexed for `special_collect_into_vec`, +/// but we should never actually get used that way... 
+impl<'c, T: Send + 'c> UnindexedConsumer<T> for CollectConsumer<'c, T> { + fn split_off_left(&self) -> Self { + unreachable!("CollectConsumer must be indexed!") + } + fn to_reducer(&self) -> Self::Reducer { + CollectReducer + } +} + +/// CollectReducer combines adjacent chunks; the result must always +/// be contiguous so that it is one combined slice. +pub(super) struct CollectReducer; + +impl<'c, T> Reducer<CollectResult<'c, T>> for CollectReducer { + fn reduce( + self, + mut left: CollectResult<'c, T>, + right: CollectResult<'c, T>, + ) -> CollectResult<'c, T> { + // Merge if the CollectResults are adjacent and in left to right order + // else: drop the right piece now and total length will end up short in the end, + // when the correctness of the collected result is asserted. + if left.start.wrapping_add(left.len) == right.start { + left.len += right.release_ownership(); + } + left + } +} diff --git a/src/iter/collect/mod.rs b/src/iter/collect/mod.rs new file mode 100644 index 0000000..e18298e --- /dev/null +++ b/src/iter/collect/mod.rs @@ -0,0 +1,171 @@ +use super::{IndexedParallelIterator, IntoParallelIterator, ParallelExtend, ParallelIterator}; +use std::slice; + +mod consumer; +use self::consumer::CollectConsumer; +use self::consumer::CollectResult; +use super::unzip::unzip_indexed; + +mod test; + +/// Collects the results of the exact iterator into the specified vector. +/// +/// This is called by `IndexedParallelIterator::collect_into_vec`. +pub(super) fn collect_into_vec<I, T>(pi: I, v: &mut Vec<T>) +where + I: IndexedParallelIterator<Item = T>, + T: Send, +{ + v.truncate(0); // clear any old data + let len = pi.len(); + Collect::new(v, len).with_consumer(|consumer| pi.drive(consumer)); +} + +/// Collects the results of the iterator into the specified vector. +/// +/// Technically, this only works for `IndexedParallelIterator`, but we're faking a +/// bit of specialization here until Rust can do that natively. 
Callers are +/// using `opt_len` to find the length before calling this, and only exact +/// iterators will return anything but `None` there. +/// +/// Since the type system doesn't understand that contract, we have to allow +/// *any* `ParallelIterator` here, and `CollectConsumer` has to also implement +/// `UnindexedConsumer`. That implementation panics `unreachable!` in case +/// there's a bug where we actually do try to use this unindexed. +fn special_extend<I, T>(pi: I, len: usize, v: &mut Vec<T>) +where + I: ParallelIterator<Item = T>, + T: Send, +{ + Collect::new(v, len).with_consumer(|consumer| pi.drive_unindexed(consumer)); +} + +/// Unzips the results of the exact iterator into the specified vectors. +/// +/// This is called by `IndexedParallelIterator::unzip_into_vecs`. +pub(super) fn unzip_into_vecs<I, A, B>(pi: I, left: &mut Vec<A>, right: &mut Vec<B>) +where + I: IndexedParallelIterator<Item = (A, B)>, + A: Send, + B: Send, +{ + // clear any old data + left.truncate(0); + right.truncate(0); + + let len = pi.len(); + Collect::new(right, len).with_consumer(|right_consumer| { + let mut right_result = None; + Collect::new(left, len).with_consumer(|left_consumer| { + let (left_r, right_r) = unzip_indexed(pi, left_consumer, right_consumer); + right_result = Some(right_r); + left_r + }); + right_result.unwrap() + }); +} + +/// Manage the collection vector. +struct Collect<'c, T: Send> { + vec: &'c mut Vec<T>, + len: usize, +} + +impl<'c, T: Send + 'c> Collect<'c, T> { + fn new(vec: &'c mut Vec<T>, len: usize) -> Self { + Collect { vec, len } + } + + /// Create a consumer on the slice of memory we are collecting into. + /// + /// The consumer needs to be used inside the scope function, and the + /// complete collect result passed back. + /// + /// This method will verify the collect result, and panic if the slice + /// was not fully written into. Otherwise, in the successful case, + /// the vector is complete with the collected result. 
+ fn with_consumer<F>(mut self, scope_fn: F) + where + F: FnOnce(CollectConsumer<'_, T>) -> CollectResult<'_, T>, + { + unsafe { + let slice = Self::reserve_get_tail_slice(&mut self.vec, self.len); + let result = scope_fn(CollectConsumer::new(slice)); + + // The CollectResult represents a contiguous part of the + // slice, that has been written to. + // On unwind here, the CollectResult will be dropped. + // If some producers on the way did not produce enough elements, + // partial CollectResults may have been dropped without + // being reduced to the final result, and we will see + // that as the length coming up short. + // + // Here, we assert that `slice` is fully initialized. This is + // checked by the following assert, which verifies if a + // complete CollectResult was produced; if the length is + // correct, it is necessarily covering the target slice. + // Since we know that the consumer cannot have escaped from + // `drive` (by parametricity, essentially), we know that any + // stores that will happen, have happened. Unless some code is buggy, + // that means we should have seen `len` total writes. + let actual_writes = result.len(); + assert!( + actual_writes == self.len, + "expected {} total writes, but got {}", + self.len, + actual_writes + ); + + // Release the result's mutable borrow and "proxy ownership" + // of the elements, before the vector takes it over. + result.release_ownership(); + + let new_len = self.vec.len() + self.len; + self.vec.set_len(new_len); + } + } + + /// Reserve space for `len` more elements in the vector, + /// and return a slice to the uninitialized tail of the vector + /// + /// Safety: The tail slice is uninitialized + unsafe fn reserve_get_tail_slice(vec: &mut Vec<T>, len: usize) -> &mut [T] { + // Reserve the new space. + vec.reserve(len); + + // Get a correct borrow, then extend it for the newly added length. 
+ let start = vec.len(); + let slice = &mut vec[start..]; + slice::from_raw_parts_mut(slice.as_mut_ptr(), len) + } +} + +/// Extends a vector with items from a parallel iterator. +impl<T> ParallelExtend<T> for Vec<T> +where + T: Send, +{ + fn par_extend<I>(&mut self, par_iter: I) + where + I: IntoParallelIterator<Item = T>, + { + // See the vec_collect benchmarks in rayon-demo for different strategies. + let par_iter = par_iter.into_par_iter(); + match par_iter.opt_len() { + Some(len) => { + // When Rust gets specialization, we can get here for indexed iterators + // without relying on `opt_len`. Until then, `special_extend()` fakes + // an unindexed mode on the promise that `opt_len()` is accurate. + special_extend(par_iter, len, self); + } + None => { + // This works like `extend`, but `Vec::append` is more efficient. + let list = super::extend::collect(par_iter); + self.reserve(super::extend::len(&list)); + for mut vec in list { + self.append(&mut vec); + } + } + } + } +} diff --git a/src/iter/collect/test.rs b/src/iter/collect/test.rs new file mode 100644 index 0000000..00c16c4 --- /dev/null +++ b/src/iter/collect/test.rs @@ -0,0 +1,385 @@ +#![cfg(test)] +#![allow(unused_assignments)] + +// These tests are primarily targeting "abusive" producers that will +// try to drive the "collect consumer" incorrectly. These should +// result in panics. + +use super::Collect; +use crate::iter::plumbing::*; +use rayon_core::join; + +use std::fmt; +use std::panic; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::thread::Result as ThreadResult; + +/// Promises to produce 2 items, but then produces 3. Does not do any +/// splits at all. 
#[test]
#[should_panic(expected = "too many values")]
fn produce_too_many_items() {
    let mut out = vec![];
    Collect::new(&mut out, 2).with_consumer(|consumer| {
        // Two writes use up the promised capacity ...
        let filled = consumer.into_folder().consume(22).consume(23);
        // ... so the third write has to panic.
        filled.consume(24);
        unreachable!("folder does not complete")
    });
}

/// Writes only 2 of the 5 promised items, without splitting, and then
/// completes; the final length check must reject the result.
#[test]
#[should_panic(expected = "expected 5 total writes, but got 2")]
fn produce_fewer_items() {
    let mut out = vec![];
    Collect::new(&mut out, 5)
        .with_consumer(|consumer| consumer.into_folder().consume(22).consume(23).complete());
}

// Both halves are written, but `complete` is only called on the right
// folder. The left half never reaches the final result, so the total
// comes up short.
#[test]
#[should_panic(expected = "expected 4 total writes, but got 2")]
fn left_produces_items_with_no_complete() {
    let mut out = vec![];
    Collect::new(&mut out, 4).with_consumer(|consumer| {
        let (left, right, _) = consumer.split_at(2);
        // Kept alive until the end of the closure, but never completed.
        let _unfinished_left = left.into_folder().consume(0).consume(1);
        right.into_folder().consume(2).consume(3).complete()
    });
}

// Complete is not called by the right consumer. Hence, the
// collection vector is not fully initialized.
#[test]
#[should_panic(expected = "expected 4 total writes, but got 2")]
fn right_produces_items_with_no_complete() {
    let mut out = vec![];
    Collect::new(&mut out, 4).with_consumer(|consumer| {
        let (left, right, _) = consumer.split_at(2);
        let left_folder = left.into_folder().consume(0).consume(1);
        // Kept alive until the end of the closure, but never completed.
        let _unfinished_right = right.into_folder().consume(2).consume(3);
        left_folder.complete()
    });
}

// The scope function panics before the folder is completed. The vector
// must stay empty and every element created so far must be dropped.
#[test]
fn produces_items_with_no_complete() {
    let counter = DropCounter::default();
    let mut out = vec![];
    let outcome = panic::catch_unwind(panic::AssertUnwindSafe(|| {
        Collect::new(&mut out, 2).with_consumer(|consumer| {
            // The folder is still alive when the panic unwinds through here.
            let _folder = consumer
                .into_folder()
                .consume(counter.element())
                .consume(counter.element());
            panic!("folder does not complete");
        });
    }));
    assert!(out.is_empty());
    assert_is_panic_with_message(&outcome, "folder does not complete");
    counter.assert_drop_count();
}

// The left consumer is given one item more than its half can hold,
// while the right consumer would receive the correct number.
#[test]
#[should_panic(expected = "too many values")]
fn left_produces_too_many_items() {
    let mut out = vec![];
    Collect::new(&mut out, 4).with_consumer(|consumer| {
        let (left, right, _) = consumer.split_at(2);
        // The third `consume` overflows the left half and panics.
        let _left_folder = left.into_folder().consume(0).consume(1).consume(2);
        let _ = right.into_folder().consume(2).consume(3).complete();
        unreachable!("folder does not complete");
    });
}

// The right consumer produces too many items while the left
// consumer produces correct number.
#[test]
#[should_panic(expected = "too many values")]
fn right_produces_too_many_items() {
    let mut out = vec![];
    Collect::new(&mut out, 4).with_consumer(|consumer| {
        let (left, right, _) = consumer.split_at(2);
        let left_folder = left.into_folder().consume(0).consume(1);
        // The third `consume` overflows the right half and panics.
        let _right_folder = right.into_folder().consume(2).consume(3).consume(4);
        let _ = left_folder.complete();
        unreachable!("folder does not complete");
    });
}

// The left half receives one item instead of two, so the two results
// are not adjacent and the reducer keeps only the left one.
#[test]
#[should_panic(expected = "expected 4 total writes, but got 1")]
fn left_produces_fewer_items() {
    let mut out = vec![];
    Collect::new(&mut out, 4).with_consumer(|consumer| {
        let reducer = consumer.to_reducer();
        let (left, right, _) = consumer.split_at(2);
        let left_result = left.into_folder().consume(0).complete();
        let right_result = right.into_folder().consume(2).consume(3).complete();
        reducer.reduce(left_result, right_result)
    });
}

// Both halves are filled correctly, but the right result is thrown
// away and only the left result is handed back.
#[test]
#[should_panic(expected = "expected 4 total writes, but got 2")]
fn only_left_result() {
    let mut out = vec![];
    Collect::new(&mut out, 4).with_consumer(|consumer| {
        let (left, right, _) = consumer.split_at(2);
        let left_result = left.into_folder().consume(0).consume(1).complete();
        let _ = right.into_folder().consume(2).consume(3).complete();
        left_result
    });
}

// Both halves are filled correctly, but the left result is thrown
// away and only the right result is handed back.
#[test]
#[should_panic(expected = "expected 4 total writes, but got 2")]
fn only_right_result() {
    let mut out = vec![];
    Collect::new(&mut out, 4).with_consumer(|consumer| {
        let (left, right, _) = consumer.split_at(2);
        let _ = left.into_folder().consume(0).consume(1).complete();
        right.into_folder().consume(2).consume(3).complete()
    });
}

// Both halves are filled correctly, but the results are passed to the
// reducer in swapped order, so they are not merged.
#[test]
#[should_panic(expected = "expected 4 total writes, but got 2")]
fn reducer_does_not_preserve_order() {
    let mut out = vec![];
    Collect::new(&mut out, 4).with_consumer(|consumer| {
        let reducer = consumer.to_reducer();
        let (left, right, _) = consumer.split_at(2);
        let left_result = left.into_folder().consume(0).consume(1).complete();
        let right_result = right.into_folder().consume(2).consume(3).complete();
        reducer.reduce(right_result, left_result)
    });
}

// The right consumer produces fewer items while the left
// consumer produces correct number.
#[test]
#[should_panic(expected = "expected 4 total writes, but got 3")]
fn right_produces_fewer_items() {
    let mut out = vec![];
    Collect::new(&mut out, 4).with_consumer(|consumer| {
        let reducer = consumer.to_reducer();
        let (left, right, _) = consumer.split_at(2);
        let left_result = left.into_folder().consume(0).consume(1).complete();
        // Only one of the two promised items on the right side.
        let right_result = right.into_folder().consume(2).complete();
        reducer.reduce(left_result, right_result)
    });
}

// The left side panics while the right side stops early, similar to
// `panic_fuse()`. The left panic must propagate, and
// `Collect::with_consumer` must not run to completion.
#[test]
#[should_panic(expected = "left consumer panic")]
fn left_panics() {
    let mut out = vec![];
    Collect::new(&mut out, 4).with_consumer(|consumer| {
        let reducer = consumer.to_reducer();
        let (left, right, _) = consumer.split_at(2);
        let (left_result, right_result) = join(
            || {
                let _folder = left.into_folder().consume(0);
                panic!("left consumer panic");
            },
            // early return
            || right.into_folder().consume(2).complete(),
        );
        reducer.reduce(left_result, right_result)
    });
    unreachable!();
}

// The right consumer panics and the left stops short, like `panic_fuse()`.
// We should get the right panic without finishing `Collect::with_consumer`.
+#[test] +#[should_panic(expected = "right consumer panic")] +fn right_panics() { + let mut v = vec![]; + let collect = Collect::new(&mut v, 4); + collect.with_consumer(|consumer| { + let reducer = consumer.to_reducer(); + let (left_consumer, right_consumer, _) = consumer.split_at(2); + let (left_result, right_result) = join( + || { + let mut left_folder = left_consumer.into_folder(); + left_folder = left_folder.consume(0); + left_folder.complete() // early return + }, + || { + let mut right_folder = right_consumer.into_folder(); + right_folder = right_folder.consume(2); + panic!("right consumer panic"); + }, + ); + reducer.reduce(left_result, right_result) + }); + unreachable!(); +} + +// The left consumer produces fewer items while the right +// consumer produces correct number; check that created elements are dropped +#[test] +fn left_produces_fewer_items_drops() { + let counter = DropCounter::default(); + let mut v = vec![]; + let panic_result = panic::catch_unwind(panic::AssertUnwindSafe(|| { + let collect = Collect::new(&mut v, 4); + collect.with_consumer(|consumer| { + let reducer = consumer.to_reducer(); + let (left_consumer, right_consumer, _) = consumer.split_at(2); + let mut left_folder = left_consumer.into_folder(); + let mut right_folder = right_consumer.into_folder(); + left_folder = left_folder.consume(counter.element()); + right_folder = right_folder + .consume(counter.element()) + .consume(counter.element()); + let left_result = left_folder.complete(); + let right_result = right_folder.complete(); + reducer.reduce(left_result, right_result) + }); + })); + assert!(v.is_empty()); + assert_is_panic_with_message(&panic_result, "expected 4 total writes, but got 1"); + counter.assert_drop_count(); +} + +/// This counter can create elements, and then count and verify +/// the number of which have actually been dropped again. 
#[derive(Default)]
struct DropCounter {
    /// Number of elements handed out by `element`.
    created: AtomicUsize,
    /// Number of handed-out elements that have been dropped so far.
    dropped: AtomicUsize,
}

/// Token whose `Drop` impl increments the counter it points at.
struct Element<'a>(&'a AtomicUsize);

impl DropCounter {
    /// Number of elements created so far.
    fn created(&self) -> usize {
        self.created.load(Ordering::SeqCst)
    }

    /// Number of elements dropped so far.
    fn dropped(&self) -> usize {
        self.dropped.load(Ordering::SeqCst)
    }

    /// Record one more created element and return a token for it.
    fn element(&self) -> Element<'_> {
        self.created.fetch_add(1, Ordering::SeqCst);
        Element(&self.dropped)
    }

    /// Panic unless every created element has been dropped.
    fn assert_drop_count(&self) {
        let created = self.created();
        let dropped = self.dropped();
        assert_eq!(
            created, dropped,
            "Expected {} dropped elements, but found {}",
            created, dropped
        );
    }
}

impl<'a> Drop for Element<'a> {
    fn drop(&mut self) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Check that `result` (as returned by `catch_unwind`) is a panic whose
/// message contains `expected`; panics with a description otherwise.
fn assert_is_panic_with_message<T>(result: &ThreadResult<T>, expected: &str)
where
    T: fmt::Debug,
{
    let payload = match result {
        Ok(value) => panic!(
            "assertion failure: Expected panic, got successful {:?}",
            value
        ),
        Err(payload) => payload,
    };

    // A panic payload is a `&'static str` or a `String`, depending on
    // how the panic was raised; accept either.
    let as_str = payload.downcast_ref::<&'static str>().copied();
    let as_string = payload.downcast_ref::<String>().map(String::as_str);

    match as_str.or(as_string) {
        // assertion passes
        Some(message) if message.contains(expected) => {}
        Some(message) => panic!(
            "assertion failure: Expected {:?}, but found panic with {:?}",
            expected, message
        ),
        None => panic!(
            "assertion failure: Expected {:?}, but found panic with unknown value",
            expected
        ),
    }
}