aboutsummaryrefslogtreecommitdiff
path: root/src/iter/collect
diff options
context:
space:
mode:
Diffstat (limited to 'src/iter/collect')
-rw-r--r--src/iter/collect/consumer.rs72
-rw-r--r--src/iter/collect/mod.rs78
-rw-r--r--src/iter/collect/test.rs2
3 files changed, 77 insertions, 75 deletions
diff --git a/src/iter/collect/consumer.rs b/src/iter/collect/consumer.rs
index 689f29c..3a8eea0 100644
--- a/src/iter/collect/consumer.rs
+++ b/src/iter/collect/consumer.rs
@@ -1,26 +1,18 @@
use super::super::plumbing::*;
use std::marker::PhantomData;
+use std::mem::MaybeUninit;
use std::ptr;
use std::slice;
pub(super) struct CollectConsumer<'c, T: Send> {
/// A slice covering the target memory, not yet initialized!
- target: &'c mut [T],
-}
-
-pub(super) struct CollectFolder<'c, T: Send> {
- /// The folder writes into `result` and must extend the result
- /// up to exactly this number of elements.
- final_len: usize,
-
- /// The current written-to part of our slice of the target
- result: CollectResult<'c, T>,
+ target: &'c mut [MaybeUninit<T>],
}
impl<'c, T: Send + 'c> CollectConsumer<'c, T> {
/// The target memory is considered uninitialized, and will be
/// overwritten without reading or dropping existing values.
- pub(super) fn new(target: &'c mut [T]) -> Self {
+ pub(super) fn new(target: &'c mut [MaybeUninit<T>]) -> Self {
CollectConsumer { target }
}
}
@@ -31,8 +23,12 @@ impl<'c, T: Send + 'c> CollectConsumer<'c, T> {
/// the elements will be dropped, unless its ownership is released before then.
#[must_use]
pub(super) struct CollectResult<'c, T> {
- start: *mut T,
+ /// A slice covering the target memory, initialized up to our separate `len`.
+ target: &'c mut [MaybeUninit<T>],
+ /// The current initialized length in `target`
len: usize,
+ /// Lifetime invariance guarantees that the data flows from consumer to result,
+ /// especially for the `scope_fn` callback in `Collect::with_consumer`.
invariant_lifetime: PhantomData<&'c mut &'c mut [T]>,
}
@@ -57,13 +53,15 @@ impl<'c, T> Drop for CollectResult<'c, T> {
// Drop the first `self.len` elements, which have been recorded
// to be initialized by the folder.
unsafe {
- ptr::drop_in_place(slice::from_raw_parts_mut(self.start, self.len));
+ // TODO: use `MaybeUninit::slice_as_mut_ptr`
+ let start = self.target.as_mut_ptr() as *mut T;
+ ptr::drop_in_place(slice::from_raw_parts_mut(start, self.len));
}
}
}
impl<'c, T: Send + 'c> Consumer<T> for CollectConsumer<'c, T> {
- type Folder = CollectFolder<'c, T>;
+ type Folder = CollectResult<'c, T>;
type Reducer = CollectReducer;
type Result = CollectResult<'c, T>;
@@ -80,16 +78,13 @@ impl<'c, T: Send + 'c> Consumer<T> for CollectConsumer<'c, T> {
)
}
- fn into_folder(self) -> CollectFolder<'c, T> {
- // Create a folder that consumes values and writes them
+ fn into_folder(self) -> Self::Folder {
+ // Create a result/folder that consumes values and writes them
// into target. The initial result has length 0.
- CollectFolder {
- final_len: self.target.len(),
- result: CollectResult {
- start: self.target.as_mut_ptr(),
- len: 0,
- invariant_lifetime: PhantomData,
- },
+ CollectResult {
+ target: self.target,
+ len: 0,
+ invariant_lifetime: PhantomData,
}
}
@@ -98,19 +93,19 @@ impl<'c, T: Send + 'c> Consumer<T> for CollectConsumer<'c, T> {
}
}
-impl<'c, T: Send + 'c> Folder<T> for CollectFolder<'c, T> {
- type Result = CollectResult<'c, T>;
+impl<'c, T: Send + 'c> Folder<T> for CollectResult<'c, T> {
+ type Result = Self;
- fn consume(mut self, item: T) -> CollectFolder<'c, T> {
- if self.result.len >= self.final_len {
- panic!("too many values pushed to consumer");
- }
+ fn consume(mut self, item: T) -> Self {
+ let dest = self
+ .target
+ .get_mut(self.len)
+ .expect("too many values pushed to consumer");
- // Compute target pointer and write to it, and
- // extend the current result by one element
+ // Write item and increase the initialized length
unsafe {
- self.result.start.add(self.result.len).write(item);
- self.result.len += 1;
+ dest.as_mut_ptr().write(item);
+ self.len += 1;
}
self
@@ -119,7 +114,7 @@ impl<'c, T: Send + 'c> Folder<T> for CollectFolder<'c, T> {
fn complete(self) -> Self::Result {
// NB: We don't explicitly check that the local writes were complete,
// but Collect will assert the total result length in the end.
- self.result
+ self
}
fn full(&self) -> bool {
@@ -151,8 +146,13 @@ impl<'c, T> Reducer<CollectResult<'c, T>> for CollectReducer {
// Merge if the CollectResults are adjacent and in left to right order
// else: drop the right piece now and total length will end up short in the end,
// when the correctness of the collected result is asserted.
- if left.start.wrapping_add(left.len) == right.start {
- left.len += right.release_ownership();
+ let left_end = left.target[left.len..].as_ptr();
+ if left_end == right.target.as_ptr() {
+ let len = left.len + right.release_ownership();
+ unsafe {
+ left.target = slice::from_raw_parts_mut(left.target.as_mut_ptr(), len);
+ }
+ left.len = len;
}
left
}
diff --git a/src/iter/collect/mod.rs b/src/iter/collect/mod.rs
index e18298e..7cbf215 100644
--- a/src/iter/collect/mod.rs
+++ b/src/iter/collect/mod.rs
@@ -1,4 +1,5 @@
use super::{IndexedParallelIterator, IntoParallelIterator, ParallelExtend, ParallelIterator};
+use std::mem::MaybeUninit;
use std::slice;
mod consumer;
@@ -88,55 +89,56 @@ impl<'c, T: Send + 'c> Collect<'c, T> {
where
F: FnOnce(CollectConsumer<'_, T>) -> CollectResult<'_, T>,
{
+ let slice = Self::reserve_get_tail_slice(&mut self.vec, self.len);
+ let result = scope_fn(CollectConsumer::new(slice));
+
+ // The CollectResult represents a contiguous part of the
+ // slice, that has been written to.
+ // On unwind here, the CollectResult will be dropped.
+ // If some producers on the way did not produce enough elements,
+ // partial CollectResults may have been dropped without
+ // being reduced to the final result, and we will see
+ // that as the length coming up short.
+ //
+ // Here, we assert that `slice` is fully initialized. This is
+ // checked by the following assert, which verifies if a
+ // complete CollectResult was produced; if the length is
+ // correct, it is necessarily covering the target slice.
+ // Since we know that the consumer cannot have escaped from
+ // `drive` (by parametricity, essentially), we know that any
+ // stores that will happen, have happened. Unless some code is buggy,
+ // that means we should have seen `len` total writes.
+ let actual_writes = result.len();
+ assert!(
+ actual_writes == self.len,
+ "expected {} total writes, but got {}",
+ self.len,
+ actual_writes
+ );
+
+ // Release the result's mutable borrow and "proxy ownership"
+ // of the elements, before the vector takes it over.
+ result.release_ownership();
+
+ let new_len = self.vec.len() + self.len;
+
unsafe {
- let slice = Self::reserve_get_tail_slice(&mut self.vec, self.len);
- let result = scope_fn(CollectConsumer::new(slice));
-
- // The CollectResult represents a contiguous part of the
- // slice, that has been written to.
- // On unwind here, the CollectResult will be dropped.
- // If some producers on the way did not produce enough elements,
- // partial CollectResults may have been dropped without
- // being reduced to the final result, and we will see
- // that as the length coming up short.
- //
- // Here, we assert that `slice` is fully initialized. This is
- // checked by the following assert, which verifies if a
- // complete CollectResult was produced; if the length is
- // correct, it is necessarily covering the target slice.
- // Since we know that the consumer cannot have escaped from
- // `drive` (by parametricity, essentially), we know that any
- // stores that will happen, have happened. Unless some code is buggy,
- // that means we should have seen `len` total writes.
- let actual_writes = result.len();
- assert!(
- actual_writes == self.len,
- "expected {} total writes, but got {}",
- self.len,
- actual_writes
- );
-
- // Release the result's mutable borrow and "proxy ownership"
- // of the elements, before the vector takes it over.
- result.release_ownership();
-
- let new_len = self.vec.len() + self.len;
self.vec.set_len(new_len);
}
}
/// Reserve space for `len` more elements in the vector,
/// and return a slice to the uninitialized tail of the vector
- ///
- /// Safety: The tail slice is uninitialized
- unsafe fn reserve_get_tail_slice(vec: &mut Vec<T>, len: usize) -> &mut [T] {
+ fn reserve_get_tail_slice(vec: &mut Vec<T>, len: usize) -> &mut [MaybeUninit<T>] {
// Reserve the new space.
vec.reserve(len);
- // Get a correct borrow, then extend it for the newly added length.
+ // TODO: use `Vec::spare_capacity_mut` instead
+ // SAFETY: `MaybeUninit<T>` is guaranteed to have the same layout
+ // as `T`, and we already made sure to have the additional space.
let start = vec.len();
- let slice = &mut vec[start..];
- slice::from_raw_parts_mut(slice.as_mut_ptr(), len)
+ let tail_ptr = vec[start..].as_mut_ptr() as *mut MaybeUninit<T>;
+ unsafe { slice::from_raw_parts_mut(tail_ptr, len) }
}
}
diff --git a/src/iter/collect/test.rs b/src/iter/collect/test.rs
index 00c16c4..ddf7757 100644
--- a/src/iter/collect/test.rs
+++ b/src/iter/collect/test.rs
@@ -24,7 +24,7 @@ fn produce_too_many_items() {
let mut folder = consumer.into_folder();
folder = folder.consume(22);
folder = folder.consume(23);
- folder.consume(24);
+ folder = folder.consume(24);
unreachable!("folder does not complete")
});
}