about | summary | refs | log | tree | commit | diff
path: root/src/iter/collect
diff options
context:
space:
mode:
author: Jeff Vander Stoep <jeffv@google.com> 2022-12-13 14:15:06 +0100
committer: Jeff Vander Stoep <jeffv@google.com> 2022-12-13 14:16:39 +0100
commit: 6229992cbe96fbbf719aa4fdbede3888ffea67ba (patch)
tree: 756e01e069080219bb1bb7487156c9f88919b97a /src/iter/collect
parent: 2bfe0b856493f125b6182750a099a577c7835d07 (diff)
download: rayon-6229992cbe96fbbf719aa4fdbede3888ffea67ba.tar.gz
Upgrade rayon to 1.6.1 [main-16k-with-phones]
This project was upgraded with external_updater.
Usage: tools/external_updater/updater.sh update rust/crates/rayon
For more info, check https://cs.android.com/android/platform/superproject/+/master:tools/external_updater/README.md

Test: TreeHugger
Change-Id: I55672395408c0a770cc19a909a94f46b6fccfd38
Diffstat (limited to 'src/iter/collect')
-rw-r--r-- src/iter/collect/consumer.rs 113
-rw-r--r-- src/iter/collect/mod.rs 159
-rw-r--r-- src/iter/collect/test.rs 50
3 files changed, 139 insertions, 183 deletions
diff --git a/src/iter/collect/consumer.rs b/src/iter/collect/consumer.rs
index 3a8eea0..acd67df 100644
--- a/src/iter/collect/consumer.rs
+++ b/src/iter/collect/consumer.rs
@@ -1,19 +1,38 @@
use super::super::plumbing::*;
+use crate::SendPtr;
use std::marker::PhantomData;
-use std::mem::MaybeUninit;
use std::ptr;
use std::slice;
pub(super) struct CollectConsumer<'c, T: Send> {
- /// A slice covering the target memory, not yet initialized!
- target: &'c mut [MaybeUninit<T>],
+ /// See `CollectResult` for explanation of why this is not a slice
+ start: SendPtr<T>,
+ len: usize,
+ marker: PhantomData<&'c mut T>,
+}
+
+impl<T: Send> CollectConsumer<'_, T> {
+ /// Create a collector for `len` items in the unused capacity of the vector.
+ pub(super) fn appender(vec: &mut Vec<T>, len: usize) -> CollectConsumer<'_, T> {
+ let start = vec.len();
+ assert!(vec.capacity() - start >= len);
+
+ // SAFETY: We already made sure to have the additional space allocated.
+ // The pointer is derived from `Vec` directly, not through a `Deref`,
+ // so it has provenance over the whole allocation.
+ unsafe { CollectConsumer::new(vec.as_mut_ptr().add(start), len) }
+ }
}
impl<'c, T: Send + 'c> CollectConsumer<'c, T> {
/// The target memory is considered uninitialized, and will be
/// overwritten without reading or dropping existing values.
- pub(super) fn new(target: &'c mut [MaybeUninit<T>]) -> Self {
- CollectConsumer { target }
+ unsafe fn new(start: *mut T, len: usize) -> Self {
+ CollectConsumer {
+ start: SendPtr(start),
+ len,
+ marker: PhantomData,
+ }
}
}
@@ -23,10 +42,13 @@ impl<'c, T: Send + 'c> CollectConsumer<'c, T> {
/// the elements will be dropped, unless its ownership is released before then.
#[must_use]
pub(super) struct CollectResult<'c, T> {
- /// A slice covering the target memory, initialized up to our separate `len`.
- target: &'c mut [MaybeUninit<T>],
- /// The current initialized length in `target`
- len: usize,
+ /// This pointer and length has the same representation as a slice,
+ /// but retains the provenance of the entire array so that we can merge
+ /// these regions together in `CollectReducer`.
+ start: SendPtr<T>,
+ total_len: usize,
+ /// The current initialized length after `start`
+ initialized_len: usize,
/// Lifetime invariance guarantees that the data flows from consumer to result,
/// especially for the `scope_fn` callback in `Collect::with_consumer`.
invariant_lifetime: PhantomData<&'c mut &'c mut [T]>,
@@ -37,25 +59,26 @@ unsafe impl<'c, T> Send for CollectResult<'c, T> where T: Send {}
impl<'c, T> CollectResult<'c, T> {
/// The current length of the collect result
pub(super) fn len(&self) -> usize {
- self.len
+ self.initialized_len
}
/// Release ownership of the slice of elements, and return the length
pub(super) fn release_ownership(mut self) -> usize {
- let ret = self.len;
- self.len = 0;
+ let ret = self.initialized_len;
+ self.initialized_len = 0;
ret
}
}
impl<'c, T> Drop for CollectResult<'c, T> {
fn drop(&mut self) {
- // Drop the first `self.len` elements, which have been recorded
+ // Drop the first `self.initialized_len` elements, which have been recorded
// to be initialized by the folder.
unsafe {
- // TODO: use `MaybeUninit::slice_as_mut_ptr`
- let start = self.target.as_mut_ptr() as *mut T;
- ptr::drop_in_place(slice::from_raw_parts_mut(start, self.len));
+ ptr::drop_in_place(slice::from_raw_parts_mut(
+ self.start.0,
+ self.initialized_len,
+ ));
}
}
}
@@ -66,24 +89,27 @@ impl<'c, T: Send + 'c> Consumer<T> for CollectConsumer<'c, T> {
type Result = CollectResult<'c, T>;
fn split_at(self, index: usize) -> (Self, Self, CollectReducer) {
- let CollectConsumer { target } = self;
-
- // Produce new consumers. Normal slicing ensures that the
- // memory range given to each consumer is disjoint.
- let (left, right) = target.split_at_mut(index);
- (
- CollectConsumer::new(left),
- CollectConsumer::new(right),
- CollectReducer,
- )
+ let CollectConsumer { start, len, .. } = self;
+
+ // Produce new consumers.
+ // SAFETY: This assert checks that `index` is a valid offset for `start`
+ unsafe {
+ assert!(index <= len);
+ (
+ CollectConsumer::new(start.0, index),
+ CollectConsumer::new(start.0.add(index), len - index),
+ CollectReducer,
+ )
+ }
}
fn into_folder(self) -> Self::Folder {
// Create a result/folder that consumes values and writes them
- // into target. The initial result has length 0.
+ // into the region after start. The initial result has length 0.
CollectResult {
- target: self.target,
- len: 0,
+ start: self.start,
+ total_len: self.len,
+ initialized_len: 0,
invariant_lifetime: PhantomData,
}
}
@@ -97,15 +123,17 @@ impl<'c, T: Send + 'c> Folder<T> for CollectResult<'c, T> {
type Result = Self;
fn consume(mut self, item: T) -> Self {
- let dest = self
- .target
- .get_mut(self.len)
- .expect("too many values pushed to consumer");
+ assert!(
+ self.initialized_len < self.total_len,
+ "too many values pushed to consumer"
+ );
- // Write item and increase the initialized length
+ // SAFETY: The assert above is a bounds check for this write, and we
+ // avoid assignment here so we do not drop an uninitialized T.
unsafe {
- dest.as_mut_ptr().write(item);
- self.len += 1;
+ // Write item and increase the initialized length
+ self.start.0.add(self.initialized_len).write(item);
+ self.initialized_len += 1;
}
self
@@ -146,14 +174,13 @@ impl<'c, T> Reducer<CollectResult<'c, T>> for CollectReducer {
// Merge if the CollectResults are adjacent and in left to right order
// else: drop the right piece now and total length will end up short in the end,
// when the correctness of the collected result is asserted.
- let left_end = left.target[left.len..].as_ptr();
- if left_end == right.target.as_ptr() {
- let len = left.len + right.release_ownership();
- unsafe {
- left.target = slice::from_raw_parts_mut(left.target.as_mut_ptr(), len);
+ unsafe {
+ let left_end = left.start.0.add(left.initialized_len);
+ if left_end == right.start.0 {
+ left.total_len += right.total_len;
+ left.initialized_len += right.release_ownership();
}
- left.len = len;
+ left
}
- left
}
}
diff --git a/src/iter/collect/mod.rs b/src/iter/collect/mod.rs
index 7cbf215..4044a68 100644
--- a/src/iter/collect/mod.rs
+++ b/src/iter/collect/mod.rs
@@ -1,6 +1,4 @@
-use super::{IndexedParallelIterator, IntoParallelIterator, ParallelExtend, ParallelIterator};
-use std::mem::MaybeUninit;
-use std::slice;
+use super::{IndexedParallelIterator, ParallelIterator};
mod consumer;
use self::consumer::CollectConsumer;
@@ -19,7 +17,7 @@ where
{
v.truncate(0); // clear any old data
let len = pi.len();
- Collect::new(v, len).with_consumer(|consumer| pi.drive(consumer));
+ collect_with_consumer(v, len, |consumer| pi.drive(consumer));
}
/// Collects the results of the iterator into the specified vector.
@@ -33,12 +31,12 @@ where
/// *any* `ParallelIterator` here, and `CollectConsumer` has to also implement
/// `UnindexedConsumer`. That implementation panics `unreachable!` in case
/// there's a bug where we actually do try to use this unindexed.
-fn special_extend<I, T>(pi: I, len: usize, v: &mut Vec<T>)
+pub(super) fn special_extend<I, T>(pi: I, len: usize, v: &mut Vec<T>)
where
I: ParallelIterator<Item = T>,
T: Send,
{
- Collect::new(v, len).with_consumer(|consumer| pi.drive_unindexed(consumer));
+ collect_with_consumer(v, len, |consumer| pi.drive_unindexed(consumer));
}
/// Unzips the results of the exact iterator into the specified vectors.
@@ -55,9 +53,9 @@ where
right.truncate(0);
let len = pi.len();
- Collect::new(right, len).with_consumer(|right_consumer| {
+ collect_with_consumer(right, len, |right_consumer| {
let mut right_result = None;
- Collect::new(left, len).with_consumer(|left_consumer| {
+ collect_with_consumer(left, len, |left_consumer| {
let (left_r, right_r) = unzip_indexed(pi, left_consumer, right_consumer);
right_result = Some(right_r);
left_r
@@ -66,108 +64,53 @@ where
});
}
-/// Manage the collection vector.
-struct Collect<'c, T: Send> {
- vec: &'c mut Vec<T>,
- len: usize,
-}
-
-impl<'c, T: Send + 'c> Collect<'c, T> {
- fn new(vec: &'c mut Vec<T>, len: usize) -> Self {
- Collect { vec, len }
- }
-
- /// Create a consumer on the slice of memory we are collecting into.
- ///
- /// The consumer needs to be used inside the scope function, and the
- /// complete collect result passed back.
- ///
- /// This method will verify the collect result, and panic if the slice
- /// was not fully written into. Otherwise, in the successful case,
- /// the vector is complete with the collected result.
- fn with_consumer<F>(mut self, scope_fn: F)
- where
- F: FnOnce(CollectConsumer<'_, T>) -> CollectResult<'_, T>,
- {
- let slice = Self::reserve_get_tail_slice(&mut self.vec, self.len);
- let result = scope_fn(CollectConsumer::new(slice));
-
- // The CollectResult represents a contiguous part of the
- // slice, that has been written to.
- // On unwind here, the CollectResult will be dropped.
- // If some producers on the way did not produce enough elements,
- // partial CollectResults may have been dropped without
- // being reduced to the final result, and we will see
- // that as the length coming up short.
- //
- // Here, we assert that `slice` is fully initialized. This is
- // checked by the following assert, which verifies if a
- // complete CollectResult was produced; if the length is
- // correct, it is necessarily covering the target slice.
- // Since we know that the consumer cannot have escaped from
- // `drive` (by parametricity, essentially), we know that any
- // stores that will happen, have happened. Unless some code is buggy,
- // that means we should have seen `len` total writes.
- let actual_writes = result.len();
- assert!(
- actual_writes == self.len,
- "expected {} total writes, but got {}",
- self.len,
- actual_writes
- );
-
- // Release the result's mutable borrow and "proxy ownership"
- // of the elements, before the vector takes it over.
- result.release_ownership();
-
- let new_len = self.vec.len() + self.len;
-
- unsafe {
- self.vec.set_len(new_len);
- }
- }
-
- /// Reserve space for `len` more elements in the vector,
- /// and return a slice to the uninitialized tail of the vector
- fn reserve_get_tail_slice(vec: &mut Vec<T>, len: usize) -> &mut [MaybeUninit<T>] {
- // Reserve the new space.
- vec.reserve(len);
-
- // TODO: use `Vec::spare_capacity_mut` instead
- // SAFETY: `MaybeUninit<T>` is guaranteed to have the same layout
- // as `T`, and we already made sure to have the additional space.
- let start = vec.len();
- let tail_ptr = vec[start..].as_mut_ptr() as *mut MaybeUninit<T>;
- unsafe { slice::from_raw_parts_mut(tail_ptr, len) }
- }
-}
-
-/// Extends a vector with items from a parallel iterator.
-impl<T> ParallelExtend<T> for Vec<T>
+/// Create a consumer on the slice of memory we are collecting into.
+///
+/// The consumer needs to be used inside the scope function, and the
+/// complete collect result passed back.
+///
+/// This method will verify the collect result, and panic if the slice
+/// was not fully written into. Otherwise, in the successful case,
+/// the vector is complete with the collected result.
+fn collect_with_consumer<T, F>(vec: &mut Vec<T>, len: usize, scope_fn: F)
where
T: Send,
+ F: FnOnce(CollectConsumer<'_, T>) -> CollectResult<'_, T>,
{
- fn par_extend<I>(&mut self, par_iter: I)
- where
- I: IntoParallelIterator<Item = T>,
- {
- // See the vec_collect benchmarks in rayon-demo for different strategies.
- let par_iter = par_iter.into_par_iter();
- match par_iter.opt_len() {
- Some(len) => {
- // When Rust gets specialization, we can get here for indexed iterators
- // without relying on `opt_len`. Until then, `special_extend()` fakes
- // an unindexed mode on the promise that `opt_len()` is accurate.
- special_extend(par_iter, len, self);
- }
- None => {
- // This works like `extend`, but `Vec::append` is more efficient.
- let list = super::extend::collect(par_iter);
- self.reserve(super::extend::len(&list));
- for mut vec in list {
- self.append(&mut vec);
- }
- }
- }
+ // Reserve space for `len` more elements in the vector,
+ vec.reserve(len);
+
+ // Create the consumer and run the callback for collection.
+ let result = scope_fn(CollectConsumer::appender(vec, len));
+
+ // The `CollectResult` represents a contiguous part of the slice, that has
+ // been written to. On unwind here, the `CollectResult` will be dropped. If
+ // some producers on the way did not produce enough elements, partial
+ // `CollectResult`s may have been dropped without being reduced to the final
+ // result, and we will see that as the length coming up short.
+ //
+ // Here, we assert that added length is fully initialized. This is checked
+ // by the following assert, which verifies if a complete `CollectResult`
+ // was produced; if the length is correct, it is necessarily covering the
+ // target slice. Since we know that the consumer cannot have escaped from
+ // `drive` (by parametricity, essentially), we know that any stores that
+ // will happen, have happened. Unless some code is buggy, that means we
+ // should have seen `len` total writes.
+ let actual_writes = result.len();
+ assert!(
+ actual_writes == len,
+ "expected {} total writes, but got {}",
+ len,
+ actual_writes
+ );
+
+ // Release the result's mutable borrow and "proxy ownership"
+ // of the elements, before the vector takes it over.
+ result.release_ownership();
+
+ let new_len = vec.len() + len;
+
+ unsafe {
+ vec.set_len(new_len);
}
}
diff --git a/src/iter/collect/test.rs b/src/iter/collect/test.rs
index ddf7757..b5f676f 100644
--- a/src/iter/collect/test.rs
+++ b/src/iter/collect/test.rs
@@ -5,7 +5,7 @@
// try to drive the "collect consumer" incorrectly. These should
// result in panics.
-use super::Collect;
+use super::collect_with_consumer;
use crate::iter::plumbing::*;
use rayon_core::join;
@@ -20,7 +20,7 @@ use std::thread::Result as ThreadResult;
#[should_panic(expected = "too many values")]
fn produce_too_many_items() {
let mut v = vec![];
- Collect::new(&mut v, 2).with_consumer(|consumer| {
+ collect_with_consumer(&mut v, 2, |consumer| {
let mut folder = consumer.into_folder();
folder = folder.consume(22);
folder = folder.consume(23);
@@ -35,8 +35,7 @@ fn produce_too_many_items() {
#[should_panic(expected = "expected 5 total writes, but got 2")]
fn produce_fewer_items() {
let mut v = vec![];
- let collect = Collect::new(&mut v, 5);
- collect.with_consumer(|consumer| {
+ collect_with_consumer(&mut v, 5, |consumer| {
let mut folder = consumer.into_folder();
folder = folder.consume(22);
folder = folder.consume(23);
@@ -49,8 +48,7 @@ fn produce_fewer_items() {
#[should_panic(expected = "expected 4 total writes, but got 2")]
fn left_produces_items_with_no_complete() {
let mut v = vec![];
- let collect = Collect::new(&mut v, 4);
- collect.with_consumer(|consumer| {
+ collect_with_consumer(&mut v, 4, |consumer| {
let (left_consumer, right_consumer, _) = consumer.split_at(2);
let mut left_folder = left_consumer.into_folder();
let mut right_folder = right_consumer.into_folder();
@@ -66,8 +64,7 @@ fn left_produces_items_with_no_complete() {
#[should_panic(expected = "expected 4 total writes, but got 2")]
fn right_produces_items_with_no_complete() {
let mut v = vec![];
- let collect = Collect::new(&mut v, 4);
- collect.with_consumer(|consumer| {
+ collect_with_consumer(&mut v, 4, |consumer| {
let (left_consumer, right_consumer, _) = consumer.split_at(2);
let mut left_folder = left_consumer.into_folder();
let mut right_folder = right_consumer.into_folder();
@@ -83,8 +80,7 @@ fn produces_items_with_no_complete() {
let counter = DropCounter::default();
let mut v = vec![];
let panic_result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
- let collect = Collect::new(&mut v, 2);
- collect.with_consumer(|consumer| {
+ collect_with_consumer(&mut v, 2, |consumer| {
let mut folder = consumer.into_folder();
folder = folder.consume(counter.element());
folder = folder.consume(counter.element());
@@ -102,8 +98,7 @@ fn produces_items_with_no_complete() {
#[should_panic(expected = "too many values")]
fn left_produces_too_many_items() {
let mut v = vec![];
- let collect = Collect::new(&mut v, 4);
- collect.with_consumer(|consumer| {
+ collect_with_consumer(&mut v, 4, |consumer| {
let (left_consumer, right_consumer, _) = consumer.split_at(2);
let mut left_folder = left_consumer.into_folder();
let mut right_folder = right_consumer.into_folder();
@@ -120,8 +115,7 @@ fn left_produces_too_many_items() {
#[should_panic(expected = "too many values")]
fn right_produces_too_many_items() {
let mut v = vec![];
- let collect = Collect::new(&mut v, 4);
- collect.with_consumer(|consumer| {
+ collect_with_consumer(&mut v, 4, |consumer| {
let (left_consumer, right_consumer, _) = consumer.split_at(2);
let mut left_folder = left_consumer.into_folder();
let mut right_folder = right_consumer.into_folder();
@@ -138,8 +132,7 @@ fn right_produces_too_many_items() {
#[should_panic(expected = "expected 4 total writes, but got 1")]
fn left_produces_fewer_items() {
let mut v = vec![];
- let collect = Collect::new(&mut v, 4);
- collect.with_consumer(|consumer| {
+ collect_with_consumer(&mut v, 4, |consumer| {
let reducer = consumer.to_reducer();
let (left_consumer, right_consumer, _) = consumer.split_at(2);
let mut left_folder = left_consumer.into_folder();
@@ -158,8 +151,7 @@ fn left_produces_fewer_items() {
#[should_panic(expected = "expected 4 total writes, but got 2")]
fn only_left_result() {
let mut v = vec![];
- let collect = Collect::new(&mut v, 4);
- collect.with_consumer(|consumer| {
+ collect_with_consumer(&mut v, 4, |consumer| {
let (left_consumer, right_consumer, _) = consumer.split_at(2);
let mut left_folder = left_consumer.into_folder();
let mut right_folder = right_consumer.into_folder();
@@ -177,8 +169,7 @@ fn only_left_result() {
#[should_panic(expected = "expected 4 total writes, but got 2")]
fn only_right_result() {
let mut v = vec![];
- let collect = Collect::new(&mut v, 4);
- collect.with_consumer(|consumer| {
+ collect_with_consumer(&mut v, 4, |consumer| {
let (left_consumer, right_consumer, _) = consumer.split_at(2);
let mut left_folder = left_consumer.into_folder();
let mut right_folder = right_consumer.into_folder();
@@ -195,8 +186,7 @@ fn only_right_result() {
#[should_panic(expected = "expected 4 total writes, but got 2")]
fn reducer_does_not_preserve_order() {
let mut v = vec![];
- let collect = Collect::new(&mut v, 4);
- collect.with_consumer(|consumer| {
+ collect_with_consumer(&mut v, 4, |consumer| {
let reducer = consumer.to_reducer();
let (left_consumer, right_consumer, _) = consumer.split_at(2);
let mut left_folder = left_consumer.into_folder();
@@ -215,8 +205,7 @@ fn reducer_does_not_preserve_order() {
#[should_panic(expected = "expected 4 total writes, but got 3")]
fn right_produces_fewer_items() {
let mut v = vec![];
- let collect = Collect::new(&mut v, 4);
- collect.with_consumer(|consumer| {
+ collect_with_consumer(&mut v, 4, |consumer| {
let reducer = consumer.to_reducer();
let (left_consumer, right_consumer, _) = consumer.split_at(2);
let mut left_folder = left_consumer.into_folder();
@@ -230,13 +219,12 @@ fn right_produces_fewer_items() {
}
// The left consumer panics and the right stops short, like `panic_fuse()`.
-// We should get the left panic without finishing `Collect::with_consumer`.
+// We should get the left panic without finishing `collect_with_consumer`.
#[test]
#[should_panic(expected = "left consumer panic")]
fn left_panics() {
let mut v = vec![];
- let collect = Collect::new(&mut v, 4);
- collect.with_consumer(|consumer| {
+ collect_with_consumer(&mut v, 4, |consumer| {
let reducer = consumer.to_reducer();
let (left_consumer, right_consumer, _) = consumer.split_at(2);
let (left_result, right_result) = join(
@@ -257,13 +245,12 @@ fn left_panics() {
}
// The right consumer panics and the left stops short, like `panic_fuse()`.
-// We should get the right panic without finishing `Collect::with_consumer`.
+// We should get the right panic without finishing `collect_with_consumer`.
#[test]
#[should_panic(expected = "right consumer panic")]
fn right_panics() {
let mut v = vec![];
- let collect = Collect::new(&mut v, 4);
- collect.with_consumer(|consumer| {
+ collect_with_consumer(&mut v, 4, |consumer| {
let reducer = consumer.to_reducer();
let (left_consumer, right_consumer, _) = consumer.split_at(2);
let (left_result, right_result) = join(
@@ -290,8 +277,7 @@ fn left_produces_fewer_items_drops() {
let counter = DropCounter::default();
let mut v = vec![];
let panic_result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
- let collect = Collect::new(&mut v, 4);
- collect.with_consumer(|consumer| {
+ collect_with_consumer(&mut v, 4, |consumer| {
let reducer = consumer.to_reducer();
let (left_consumer, right_consumer, _) = consumer.split_at(2);
let mut left_folder = left_consumer.into_folder();