diff options
author | Joel Galenson <jgalenson@google.com> | 2021-05-19 16:31:56 -0700 |
---|---|---|
committer | Joel Galenson <jgalenson@google.com> | 2021-05-19 16:31:56 -0700 |
commit | dd2305def6fff1d9f1149ecb73b02e37eb8a18b0 (patch) | |
tree | a915bf2398b238dbd70ae338a6bb643a7372de84 /src/iter/collect | |
parent | 698313e1f73ce48237108f63ba85b3000f9d0ff1 (diff) | |
download | rayon-dd2305def6fff1d9f1149ecb73b02e37eb8a18b0.tar.gz |
Upgrade rust/crates/rayon to 1.5.1
Test: make
Change-Id: I40c1a4538832871d1f4cd09daf6904d094b5615e
Diffstat (limited to 'src/iter/collect')
-rw-r--r-- | src/iter/collect/consumer.rs | 72 | ||||
-rw-r--r-- | src/iter/collect/mod.rs | 78 | ||||
-rw-r--r-- | src/iter/collect/test.rs | 2 |
3 files changed, 77 insertions, 75 deletions
diff --git a/src/iter/collect/consumer.rs b/src/iter/collect/consumer.rs index 689f29c..3a8eea0 100644 --- a/src/iter/collect/consumer.rs +++ b/src/iter/collect/consumer.rs @@ -1,26 +1,18 @@ use super::super::plumbing::*; use std::marker::PhantomData; +use std::mem::MaybeUninit; use std::ptr; use std::slice; pub(super) struct CollectConsumer<'c, T: Send> { /// A slice covering the target memory, not yet initialized! - target: &'c mut [T], -} - -pub(super) struct CollectFolder<'c, T: Send> { - /// The folder writes into `result` and must extend the result - /// up to exactly this number of elements. - final_len: usize, - - /// The current written-to part of our slice of the target - result: CollectResult<'c, T>, + target: &'c mut [MaybeUninit<T>], } impl<'c, T: Send + 'c> CollectConsumer<'c, T> { /// The target memory is considered uninitialized, and will be /// overwritten without reading or dropping existing values. - pub(super) fn new(target: &'c mut [T]) -> Self { + pub(super) fn new(target: &'c mut [MaybeUninit<T>]) -> Self { CollectConsumer { target } } } @@ -31,8 +23,12 @@ impl<'c, T: Send + 'c> CollectConsumer<'c, T> { /// the elements will be dropped, unless its ownership is released before then. #[must_use] pub(super) struct CollectResult<'c, T> { - start: *mut T, + /// A slice covering the target memory, initialized up to our separate `len`. + target: &'c mut [MaybeUninit<T>], + /// The current initialized length in `target` len: usize, + /// Lifetime invariance guarantees that the data flows from consumer to result, + /// especially for the `scope_fn` callback in `Collect::with_consumer`. invariant_lifetime: PhantomData<&'c mut &'c mut [T]>, } @@ -57,13 +53,15 @@ impl<'c, T> Drop for CollectResult<'c, T> { // Drop the first `self.len` elements, which have been recorded // to be initialized by the folder. unsafe { - ptr::drop_in_place(slice::from_raw_parts_mut(self.start, self.len)); + // TODO: use `MaybeUninit::slice_as_mut_ptr` + let start = self.target.as_mut_ptr() as *mut T; + ptr::drop_in_place(slice::from_raw_parts_mut(start, self.len)); } } } impl<'c, T: Send + 'c> Consumer<T> for CollectConsumer<'c, T> { - type Folder = CollectFolder<'c, T>; + type Folder = CollectResult<'c, T>; type Reducer = CollectReducer; type Result = CollectResult<'c, T>; @@ -80,16 +78,13 @@ impl<'c, T: Send + 'c> Consumer<T> for CollectConsumer<'c, T> { ) } - fn into_folder(self) -> CollectFolder<'c, T> { - // Create a folder that consumes values and writes them + fn into_folder(self) -> Self::Folder { + // Create a result/folder that consumes values and writes them // into target. The initial result has length 0. - CollectFolder { - final_len: self.target.len(), - result: CollectResult { - start: self.target.as_mut_ptr(), - len: 0, - invariant_lifetime: PhantomData, - }, + CollectResult { + target: self.target, + len: 0, + invariant_lifetime: PhantomData, } } @@ -98,19 +93,19 @@ impl<'c, T: Send + 'c> Consumer<T> for CollectConsumer<'c, T> { } } -impl<'c, T: Send + 'c> Folder<T> for CollectFolder<'c, T> { - type Result = CollectResult<'c, T>; +impl<'c, T: Send + 'c> Folder<T> for CollectResult<'c, T> { + type Result = Self; - fn consume(mut self, item: T) -> CollectFolder<'c, T> { - if self.result.len >= self.final_len { - panic!("too many values pushed to consumer"); - } + fn consume(mut self, item: T) -> Self { + let dest = self + .target + .get_mut(self.len) + .expect("too many values pushed to consumer"); - // Compute target pointer and write to it, and - // extend the current result by one element + // Write item and increase the initialized length unsafe { - self.result.start.add(self.result.len).write(item); - self.result.len += 1; + dest.as_mut_ptr().write(item); + self.len += 1; } self @@ -119,7 +114,7 @@ impl<'c, T: Send + 'c> Folder<T> for CollectFolder<'c, T> { fn complete(self) -> Self::Result { // NB: We don't explicitly check that the local writes were complete, // but Collect will assert the total result length in the end. - self.result + self } fn full(&self) -> bool { @@ -151,8 +146,13 @@ impl<'c, T> Reducer<CollectResult<'c, T>> for CollectReducer { // Merge if the CollectResults are adjacent and in left to right order // else: drop the right piece now and total length will end up short in the end, // when the correctness of the collected result is asserted. - if left.start.wrapping_add(left.len) == right.start { - left.len += right.release_ownership(); + let left_end = left.target[left.len..].as_ptr(); + if left_end == right.target.as_ptr() { + let len = left.len + right.release_ownership(); + unsafe { + left.target = slice::from_raw_parts_mut(left.target.as_mut_ptr(), len); + } + left.len = len; } left } diff --git a/src/iter/collect/mod.rs b/src/iter/collect/mod.rs index e18298e..7cbf215 100644 --- a/src/iter/collect/mod.rs +++ b/src/iter/collect/mod.rs @@ -1,4 +1,5 @@ use super::{IndexedParallelIterator, IntoParallelIterator, ParallelExtend, ParallelIterator}; +use std::mem::MaybeUninit; use std::slice; mod consumer; @@ -88,55 +89,56 @@ impl<'c, T: Send + 'c> Collect<'c, T> { where F: FnOnce(CollectConsumer<'_, T>) -> CollectResult<'_, T>, { + let slice = Self::reserve_get_tail_slice(&mut self.vec, self.len); + let result = scope_fn(CollectConsumer::new(slice)); + + // The CollectResult represents a contiguous part of the + // slice, that has been written to. + // On unwind here, the CollectResult will be dropped. + // If some producers on the way did not produce enough elements, + // partial CollectResults may have been dropped without + // being reduced to the final result, and we will see + // that as the length coming up short. + // + // Here, we assert that `slice` is fully initialized. This is + // checked by the following assert, which verifies if a + // complete CollectResult was produced; if the length is + // correct, it is necessarily covering the target slice. + // Since we know that the consumer cannot have escaped from + // `drive` (by parametricity, essentially), we know that any + // stores that will happen, have happened. Unless some code is buggy, + // that means we should have seen `len` total writes. + let actual_writes = result.len(); + assert!( + actual_writes == self.len, + "expected {} total writes, but got {}", + self.len, + actual_writes + ); + + // Release the result's mutable borrow and "proxy ownership" + // of the elements, before the vector takes it over. + result.release_ownership(); + + let new_len = self.vec.len() + self.len; + unsafe { - let slice = Self::reserve_get_tail_slice(&mut self.vec, self.len); - let result = scope_fn(CollectConsumer::new(slice)); - - // The CollectResult represents a contiguous part of the - // slice, that has been written to. - // On unwind here, the CollectResult will be dropped. - // If some producers on the way did not produce enough elements, - // partial CollectResults may have been dropped without - // being reduced to the final result, and we will see - // that as the length coming up short. - // - // Here, we assert that `slice` is fully initialized. This is - // checked by the following assert, which verifies if a - // complete CollectResult was produced; if the length is - // correct, it is necessarily covering the target slice. - // Since we know that the consumer cannot have escaped from - // `drive` (by parametricity, essentially), we know that any - // stores that will happen, have happened. Unless some code is buggy, - // that means we should have seen `len` total writes. - let actual_writes = result.len(); - assert!( - actual_writes == self.len, - "expected {} total writes, but got {}", - self.len, - actual_writes - ); - - // Release the result's mutable borrow and "proxy ownership" - // of the elements, before the vector takes it over. - result.release_ownership(); - - let new_len = self.vec.len() + self.len; self.vec.set_len(new_len); } } /// Reserve space for `len` more elements in the vector, /// and return a slice to the uninitialized tail of the vector - /// - /// Safety: The tail slice is uninitialized - unsafe fn reserve_get_tail_slice(vec: &mut Vec<T>, len: usize) -> &mut [T] { + fn reserve_get_tail_slice(vec: &mut Vec<T>, len: usize) -> &mut [MaybeUninit<T>] { // Reserve the new space. vec.reserve(len); - // Get a correct borrow, then extend it for the newly added length. + // TODO: use `Vec::spare_capacity_mut` instead + // SAFETY: `MaybeUninit<T>` is guaranteed to have the same layout + // as `T`, and we already made sure to have the additional space. let start = vec.len(); - let slice = &mut vec[start..]; - slice::from_raw_parts_mut(slice.as_mut_ptr(), len) + let tail_ptr = vec[start..].as_mut_ptr() as *mut MaybeUninit<T>; + unsafe { slice::from_raw_parts_mut(tail_ptr, len) } } } diff --git a/src/iter/collect/test.rs b/src/iter/collect/test.rs index 00c16c4..ddf7757 100644 --- a/src/iter/collect/test.rs +++ b/src/iter/collect/test.rs @@ -24,7 +24,7 @@ fn produce_too_many_items() { let mut folder = consumer.into_folder(); folder = folder.consume(22); folder = folder.consume(23); - folder.consume(24); + folder = folder.consume(24); unreachable!("folder does not complete") }); } |