diff options
author | Jeff Vander Stoep <jeffv@google.com> | 2022-12-13 14:15:06 +0100 |
---|---|---|
committer | Jeff Vander Stoep <jeffv@google.com> | 2022-12-13 14:16:39 +0100 |
commit | 6229992cbe96fbbf719aa4fdbede3888ffea67ba (patch) | |
tree | 756e01e069080219bb1bb7487156c9f88919b97a /src/iter/collect | |
parent | 2bfe0b856493f125b6182750a099a577c7835d07 (diff) | |
download | rayon-6229992cbe96fbbf719aa4fdbede3888ffea67ba.tar.gz |
Upgrade rayon to 1.6.1main-16k-with-phones
This project was upgraded with external_updater.
Usage: tools/external_updater/updater.sh update rust/crates/rayon
For more info, check https://cs.android.com/android/platform/superproject/+/master:tools/external_updater/README.md
Test: TreeHugger
Change-Id: I55672395408c0a770cc19a909a94f46b6fccfd38
Diffstat (limited to 'src/iter/collect')
-rw-r--r-- | src/iter/collect/consumer.rs | 113 | ||||
-rw-r--r-- | src/iter/collect/mod.rs | 159 | ||||
-rw-r--r-- | src/iter/collect/test.rs | 50 |
3 files changed, 139 insertions, 183 deletions
diff --git a/src/iter/collect/consumer.rs b/src/iter/collect/consumer.rs index 3a8eea0..acd67df 100644 --- a/src/iter/collect/consumer.rs +++ b/src/iter/collect/consumer.rs @@ -1,19 +1,38 @@ use super::super::plumbing::*; +use crate::SendPtr; use std::marker::PhantomData; -use std::mem::MaybeUninit; use std::ptr; use std::slice; pub(super) struct CollectConsumer<'c, T: Send> { - /// A slice covering the target memory, not yet initialized! - target: &'c mut [MaybeUninit<T>], + /// See `CollectResult` for explanation of why this is not a slice + start: SendPtr<T>, + len: usize, + marker: PhantomData<&'c mut T>, +} + +impl<T: Send> CollectConsumer<'_, T> { + /// Create a collector for `len` items in the unused capacity of the vector. + pub(super) fn appender(vec: &mut Vec<T>, len: usize) -> CollectConsumer<'_, T> { + let start = vec.len(); + assert!(vec.capacity() - start >= len); + + // SAFETY: We already made sure to have the additional space allocated. + // The pointer is derived from `Vec` directly, not through a `Deref`, + // so it has provenance over the whole allocation. + unsafe { CollectConsumer::new(vec.as_mut_ptr().add(start), len) } + } } impl<'c, T: Send + 'c> CollectConsumer<'c, T> { /// The target memory is considered uninitialized, and will be /// overwritten without reading or dropping existing values. - pub(super) fn new(target: &'c mut [MaybeUninit<T>]) -> Self { - CollectConsumer { target } + unsafe fn new(start: *mut T, len: usize) -> Self { + CollectConsumer { + start: SendPtr(start), + len, + marker: PhantomData, + } } } @@ -23,10 +42,13 @@ impl<'c, T: Send + 'c> CollectConsumer<'c, T> { /// the elements will be dropped, unless its ownership is released before then. #[must_use] pub(super) struct CollectResult<'c, T> { - /// A slice covering the target memory, initialized up to our separate `len`. - target: &'c mut [MaybeUninit<T>], - /// The current initialized length in `target` - len: usize, + /// This pointer and length has the same representation as a slice, + /// but retains the provenance of the entire array so that we can merge + /// these regions together in `CollectReducer`. + start: SendPtr<T>, + total_len: usize, + /// The current initialized length after `start` + initialized_len: usize, /// Lifetime invariance guarantees that the data flows from consumer to result, /// especially for the `scope_fn` callback in `Collect::with_consumer`. invariant_lifetime: PhantomData<&'c mut &'c mut [T]>, @@ -37,25 +59,26 @@ unsafe impl<'c, T> Send for CollectResult<'c, T> where T: Send {} impl<'c, T> CollectResult<'c, T> { /// The current length of the collect result pub(super) fn len(&self) -> usize { - self.len + self.initialized_len } /// Release ownership of the slice of elements, and return the length pub(super) fn release_ownership(mut self) -> usize { - let ret = self.len; - self.len = 0; + let ret = self.initialized_len; + self.initialized_len = 0; ret } } impl<'c, T> Drop for CollectResult<'c, T> { fn drop(&mut self) { - // Drop the first `self.len` elements, which have been recorded + // Drop the first `self.initialized_len` elements, which have been recorded // to be initialized by the folder. unsafe { - // TODO: use `MaybeUninit::slice_as_mut_ptr` - let start = self.target.as_mut_ptr() as *mut T; - ptr::drop_in_place(slice::from_raw_parts_mut(start, self.len)); + ptr::drop_in_place(slice::from_raw_parts_mut( + self.start.0, + self.initialized_len, + )); } } } @@ -66,24 +89,27 @@ impl<'c, T: Send + 'c> Consumer<T> for CollectConsumer<'c, T> { type Result = CollectResult<'c, T>; fn split_at(self, index: usize) -> (Self, Self, CollectReducer) { - let CollectConsumer { target } = self; - - // Produce new consumers. Normal slicing ensures that the - // memory range given to each consumer is disjoint. - let (left, right) = target.split_at_mut(index); - ( - CollectConsumer::new(left), - CollectConsumer::new(right), - CollectReducer, - ) + let CollectConsumer { start, len, .. } = self; + + // Produce new consumers. + // SAFETY: This assert checks that `index` is a valid offset for `start` + unsafe { + assert!(index <= len); + ( + CollectConsumer::new(start.0, index), + CollectConsumer::new(start.0.add(index), len - index), + CollectReducer, + ) + } } fn into_folder(self) -> Self::Folder { // Create a result/folder that consumes values and writes them - // into target. The initial result has length 0. + // into the region after start. The initial result has length 0. CollectResult { - target: self.target, - len: 0, + start: self.start, + total_len: self.len, + initialized_len: 0, invariant_lifetime: PhantomData, } } @@ -97,15 +123,17 @@ impl<'c, T: Send + 'c> Folder<T> for CollectResult<'c, T> { type Result = Self; fn consume(mut self, item: T) -> Self { - let dest = self - .target - .get_mut(self.len) - .expect("too many values pushed to consumer"); + assert!( + self.initialized_len < self.total_len, + "too many values pushed to consumer" + ); - // Write item and increase the initialized length + // SAFETY: The assert above is a bounds check for this write, and we + // avoid assignment here so we do not drop an uninitialized T. unsafe { - dest.as_mut_ptr().write(item); - self.len += 1; + // Write item and increase the initialized length + self.start.0.add(self.initialized_len).write(item); + self.initialized_len += 1; } self @@ -146,14 +174,13 @@ impl<'c, T> Reducer<CollectResult<'c, T>> for CollectReducer { // Merge if the CollectResults are adjacent and in left to right order // else: drop the right piece now and total length will end up short in the end, // when the correctness of the collected result is asserted. - let left_end = left.target[left.len..].as_ptr(); - if left_end == right.target.as_ptr() { - let len = left.len + right.release_ownership(); - unsafe { - left.target = slice::from_raw_parts_mut(left.target.as_mut_ptr(), len); + unsafe { + let left_end = left.start.0.add(left.initialized_len); + if left_end == right.start.0 { + left.total_len += right.total_len; + left.initialized_len += right.release_ownership(); } - left.len = len; + left } - left } } diff --git a/src/iter/collect/mod.rs b/src/iter/collect/mod.rs index 7cbf215..4044a68 100644 --- a/src/iter/collect/mod.rs +++ b/src/iter/collect/mod.rs @@ -1,6 +1,4 @@ -use super::{IndexedParallelIterator, IntoParallelIterator, ParallelExtend, ParallelIterator}; -use std::mem::MaybeUninit; -use std::slice; +use super::{IndexedParallelIterator, ParallelIterator}; mod consumer; use self::consumer::CollectConsumer; @@ -19,7 +17,7 @@ where { v.truncate(0); // clear any old data let len = pi.len(); - Collect::new(v, len).with_consumer(|consumer| pi.drive(consumer)); + collect_with_consumer(v, len, |consumer| pi.drive(consumer)); } /// Collects the results of the iterator into the specified vector. @@ -33,12 +31,12 @@ where /// *any* `ParallelIterator` here, and `CollectConsumer` has to also implement /// `UnindexedConsumer`. That implementation panics `unreachable!` in case /// there's a bug where we actually do try to use this unindexed. -fn special_extend<I, T>(pi: I, len: usize, v: &mut Vec<T>) +pub(super) fn special_extend<I, T>(pi: I, len: usize, v: &mut Vec<T>) where I: ParallelIterator<Item = T>, T: Send, { - Collect::new(v, len).with_consumer(|consumer| pi.drive_unindexed(consumer)); + collect_with_consumer(v, len, |consumer| pi.drive_unindexed(consumer)); } /// Unzips the results of the exact iterator into the specified vectors. @@ -55,9 +53,9 @@ where right.truncate(0); let len = pi.len(); - Collect::new(right, len).with_consumer(|right_consumer| { + collect_with_consumer(right, len, |right_consumer| { let mut right_result = None; - Collect::new(left, len).with_consumer(|left_consumer| { + collect_with_consumer(left, len, |left_consumer| { let (left_r, right_r) = unzip_indexed(pi, left_consumer, right_consumer); right_result = Some(right_r); left_r @@ -66,108 +64,53 @@ where }); } -/// Manage the collection vector. -struct Collect<'c, T: Send> { - vec: &'c mut Vec<T>, - len: usize, -} - -impl<'c, T: Send + 'c> Collect<'c, T> { - fn new(vec: &'c mut Vec<T>, len: usize) -> Self { - Collect { vec, len } - } - - /// Create a consumer on the slice of memory we are collecting into. - /// - /// The consumer needs to be used inside the scope function, and the - /// complete collect result passed back. - /// - /// This method will verify the collect result, and panic if the slice - /// was not fully written into. Otherwise, in the successful case, - /// the vector is complete with the collected result. - fn with_consumer<F>(mut self, scope_fn: F) - where - F: FnOnce(CollectConsumer<'_, T>) -> CollectResult<'_, T>, - { - let slice = Self::reserve_get_tail_slice(&mut self.vec, self.len); - let result = scope_fn(CollectConsumer::new(slice)); - - // The CollectResult represents a contiguous part of the - // slice, that has been written to. - // On unwind here, the CollectResult will be dropped. - // If some producers on the way did not produce enough elements, - // partial CollectResults may have been dropped without - // being reduced to the final result, and we will see - // that as the length coming up short. - // - // Here, we assert that `slice` is fully initialized. This is - // checked by the following assert, which verifies if a - // complete CollectResult was produced; if the length is - // correct, it is necessarily covering the target slice. - // Since we know that the consumer cannot have escaped from - // `drive` (by parametricity, essentially), we know that any - // stores that will happen, have happened. Unless some code is buggy, - // that means we should have seen `len` total writes. - let actual_writes = result.len(); - assert!( - actual_writes == self.len, - "expected {} total writes, but got {}", - self.len, - actual_writes - ); - - // Release the result's mutable borrow and "proxy ownership" - // of the elements, before the vector takes it over. - result.release_ownership(); - - let new_len = self.vec.len() + self.len; - - unsafe { - self.vec.set_len(new_len); - } - } - - /// Reserve space for `len` more elements in the vector, - /// and return a slice to the uninitialized tail of the vector - fn reserve_get_tail_slice(vec: &mut Vec<T>, len: usize) -> &mut [MaybeUninit<T>] { - // Reserve the new space. - vec.reserve(len); - - // TODO: use `Vec::spare_capacity_mut` instead - // SAFETY: `MaybeUninit<T>` is guaranteed to have the same layout - // as `T`, and we already made sure to have the additional space. - let start = vec.len(); - let tail_ptr = vec[start..].as_mut_ptr() as *mut MaybeUninit<T>; - unsafe { slice::from_raw_parts_mut(tail_ptr, len) } - } -} - -/// Extends a vector with items from a parallel iterator. -impl<T> ParallelExtend<T> for Vec<T> +/// Create a consumer on the slice of memory we are collecting into. +/// +/// The consumer needs to be used inside the scope function, and the +/// complete collect result passed back. +/// +/// This method will verify the collect result, and panic if the slice +/// was not fully written into. Otherwise, in the successful case, +/// the vector is complete with the collected result. +fn collect_with_consumer<T, F>(vec: &mut Vec<T>, len: usize, scope_fn: F) where T: Send, + F: FnOnce(CollectConsumer<'_, T>) -> CollectResult<'_, T>, { - fn par_extend<I>(&mut self, par_iter: I) - where - I: IntoParallelIterator<Item = T>, - { - // See the vec_collect benchmarks in rayon-demo for different strategies. - let par_iter = par_iter.into_par_iter(); - match par_iter.opt_len() { - Some(len) => { - // When Rust gets specialization, we can get here for indexed iterators - // without relying on `opt_len`. Until then, `special_extend()` fakes - // an unindexed mode on the promise that `opt_len()` is accurate. - special_extend(par_iter, len, self); - } - None => { - // This works like `extend`, but `Vec::append` is more efficient. - let list = super::extend::collect(par_iter); - self.reserve(super::extend::len(&list)); - for mut vec in list { - self.append(&mut vec); - } - } - } + // Reserve space for `len` more elements in the vector, + vec.reserve(len); + + // Create the consumer and run the callback for collection. + let result = scope_fn(CollectConsumer::appender(vec, len)); + + // The `CollectResult` represents a contiguous part of the slice, that has + // been written to. On unwind here, the `CollectResult` will be dropped. If + // some producers on the way did not produce enough elements, partial + // `CollectResult`s may have been dropped without being reduced to the final + // result, and we will see that as the length coming up short. + // + // Here, we assert that added length is fully initialized. This is checked + // by the following assert, which verifies if a complete `CollectResult` + // was produced; if the length is correct, it is necessarily covering the + // target slice. Since we know that the consumer cannot have escaped from + // `drive` (by parametricity, essentially), we know that any stores that + // will happen, have happened. Unless some code is buggy, that means we + // should have seen `len` total writes. + let actual_writes = result.len(); + assert!( + actual_writes == len, + "expected {} total writes, but got {}", + len, + actual_writes + ); + + // Release the result's mutable borrow and "proxy ownership" + // of the elements, before the vector takes it over. + result.release_ownership(); + + let new_len = vec.len() + len; + + unsafe { + vec.set_len(new_len); } } diff --git a/src/iter/collect/test.rs b/src/iter/collect/test.rs index ddf7757..b5f676f 100644 --- a/src/iter/collect/test.rs +++ b/src/iter/collect/test.rs @@ -5,7 +5,7 @@ // try to drive the "collect consumer" incorrectly. These should // result in panics. -use super::Collect; +use super::collect_with_consumer; use crate::iter::plumbing::*; use rayon_core::join; @@ -20,7 +20,7 @@ use std::thread::Result as ThreadResult; #[should_panic(expected = "too many values")] fn produce_too_many_items() { let mut v = vec![]; - Collect::new(&mut v, 2).with_consumer(|consumer| { + collect_with_consumer(&mut v, 2, |consumer| { let mut folder = consumer.into_folder(); folder = folder.consume(22); folder = folder.consume(23); @@ -35,8 +35,7 @@ fn produce_too_many_items() { #[should_panic(expected = "expected 5 total writes, but got 2")] fn produce_fewer_items() { let mut v = vec![]; - let collect = Collect::new(&mut v, 5); - collect.with_consumer(|consumer| { + collect_with_consumer(&mut v, 5, |consumer| { let mut folder = consumer.into_folder(); folder = folder.consume(22); folder = folder.consume(23); @@ -49,8 +48,7 @@ fn produce_fewer_items() { #[should_panic(expected = "expected 4 total writes, but got 2")] fn left_produces_items_with_no_complete() { let mut v = vec![]; - let collect = Collect::new(&mut v, 4); - collect.with_consumer(|consumer| { + collect_with_consumer(&mut v, 4, |consumer| { let (left_consumer, right_consumer, _) = consumer.split_at(2); let mut left_folder = left_consumer.into_folder(); let mut right_folder = right_consumer.into_folder(); @@ -66,8 +64,7 @@ fn left_produces_items_with_no_complete() { #[should_panic(expected = "expected 4 total writes, but got 2")] fn right_produces_items_with_no_complete() { let mut v = vec![]; - let collect = Collect::new(&mut v, 4); - collect.with_consumer(|consumer| { + collect_with_consumer(&mut v, 4, |consumer| { let (left_consumer, right_consumer, _) = consumer.split_at(2); let mut left_folder = left_consumer.into_folder(); let mut right_folder = right_consumer.into_folder(); @@ -83,8 +80,7 @@ fn produces_items_with_no_complete() { let counter = DropCounter::default(); let mut v = vec![]; let panic_result = panic::catch_unwind(panic::AssertUnwindSafe(|| { - let collect = Collect::new(&mut v, 2); - collect.with_consumer(|consumer| { + collect_with_consumer(&mut v, 2, |consumer| { let mut folder = consumer.into_folder(); folder = folder.consume(counter.element()); folder = folder.consume(counter.element()); @@ -102,8 +98,7 @@ fn produces_items_with_no_complete() { #[should_panic(expected = "too many values")] fn left_produces_too_many_items() { let mut v = vec![]; - let collect = Collect::new(&mut v, 4); - collect.with_consumer(|consumer| { + collect_with_consumer(&mut v, 4, |consumer| { let (left_consumer, right_consumer, _) = consumer.split_at(2); let mut left_folder = left_consumer.into_folder(); let mut right_folder = right_consumer.into_folder(); @@ -120,8 +115,7 @@ fn left_produces_too_many_items() { #[should_panic(expected = "too many values")] fn right_produces_too_many_items() { let mut v = vec![]; - let collect = Collect::new(&mut v, 4); - collect.with_consumer(|consumer| { + collect_with_consumer(&mut v, 4, |consumer| { let (left_consumer, right_consumer, _) = consumer.split_at(2); let mut left_folder = left_consumer.into_folder(); let mut right_folder = right_consumer.into_folder(); @@ -138,8 +132,7 @@ fn right_produces_too_many_items() { #[should_panic(expected = "expected 4 total writes, but got 1")] fn left_produces_fewer_items() { let mut v = vec![]; - let collect = Collect::new(&mut v, 4); - collect.with_consumer(|consumer| { + collect_with_consumer(&mut v, 4, |consumer| { let reducer = consumer.to_reducer(); let (left_consumer, right_consumer, _) = consumer.split_at(2); let mut left_folder = left_consumer.into_folder(); @@ -158,8 +151,7 @@ fn left_produces_fewer_items() { #[should_panic(expected = "expected 4 total writes, but got 2")] fn only_left_result() { let mut v = vec![]; - let collect = Collect::new(&mut v, 4); - collect.with_consumer(|consumer| { + collect_with_consumer(&mut v, 4, |consumer| { let (left_consumer, right_consumer, _) = consumer.split_at(2); let mut left_folder = left_consumer.into_folder(); let mut right_folder = right_consumer.into_folder(); @@ -177,8 +169,7 @@ fn only_left_result() { #[should_panic(expected = "expected 4 total writes, but got 2")] fn only_right_result() { let mut v = vec![]; - let collect = Collect::new(&mut v, 4); - collect.with_consumer(|consumer| { + collect_with_consumer(&mut v, 4, |consumer| { let (left_consumer, right_consumer, _) = consumer.split_at(2); let mut left_folder = left_consumer.into_folder(); let mut right_folder = right_consumer.into_folder(); @@ -195,8 +186,7 @@ fn only_right_result() { #[should_panic(expected = "expected 4 total writes, but got 2")] fn reducer_does_not_preserve_order() { let mut v = vec![]; - let collect = Collect::new(&mut v, 4); - collect.with_consumer(|consumer| { + collect_with_consumer(&mut v, 4, |consumer| { let reducer = consumer.to_reducer(); let (left_consumer, right_consumer, _) = consumer.split_at(2); let mut left_folder = left_consumer.into_folder(); @@ -215,8 +205,7 @@ fn reducer_does_not_preserve_order() { #[should_panic(expected = "expected 4 total writes, but got 3")] fn right_produces_fewer_items() { let mut v = vec![]; - let collect = Collect::new(&mut v, 4); - collect.with_consumer(|consumer| { + collect_with_consumer(&mut v, 4, |consumer| { let reducer = consumer.to_reducer(); let (left_consumer, right_consumer, _) = consumer.split_at(2); let mut left_folder = left_consumer.into_folder(); @@ -230,13 +219,12 @@ fn right_produces_fewer_items() { } // The left consumer panics and the right stops short, like `panic_fuse()`. -// We should get the left panic without finishing `Collect::with_consumer`. +// We should get the left panic without finishing `collect_with_consumer`. #[test] #[should_panic(expected = "left consumer panic")] fn left_panics() { let mut v = vec![]; - let collect = Collect::new(&mut v, 4); - collect.with_consumer(|consumer| { + collect_with_consumer(&mut v, 4, |consumer| { let reducer = consumer.to_reducer(); let (left_consumer, right_consumer, _) = consumer.split_at(2); let (left_result, right_result) = join( @@ -257,13 +245,12 @@ fn left_panics() { } // The right consumer panics and the left stops short, like `panic_fuse()`. -// We should get the right panic without finishing `Collect::with_consumer`. +// We should get the right panic without finishing `collect_with_consumer`. #[test] #[should_panic(expected = "right consumer panic")] fn right_panics() { let mut v = vec![]; - let collect = Collect::new(&mut v, 4); - collect.with_consumer(|consumer| { + collect_with_consumer(&mut v, 4, |consumer| { let reducer = consumer.to_reducer(); let (left_consumer, right_consumer, _) = consumer.split_at(2); let (left_result, right_result) = join( @@ -290,8 +277,7 @@ fn left_produces_fewer_items_drops() { let counter = DropCounter::default(); let mut v = vec![]; let panic_result = panic::catch_unwind(panic::AssertUnwindSafe(|| { - let collect = Collect::new(&mut v, 4); - collect.with_consumer(|consumer| { + collect_with_consumer(&mut v, 4, |consumer| { let reducer = consumer.to_reducer(); let (left_consumer, right_consumer, _) = consumer.split_at(2); let mut left_folder = left_consumer.into_folder(); |