From dd2305def6fff1d9f1149ecb73b02e37eb8a18b0 Mon Sep 17 00:00:00 2001 From: Joel Galenson Date: Wed, 19 May 2021 16:31:56 -0700 Subject: Upgrade rust/crates/rayon to 1.5.1 Test: make Change-Id: I40c1a4538832871d1f4cd09daf6904d094b5615e --- .cargo_vcs_info.json | 2 +- Android.bp | 18 ++--- Cargo.toml | 8 +-- Cargo.toml.orig | 8 +-- FAQ.md | 2 +- METADATA | 10 +-- README.md | 4 +- RELEASES.md | 33 ++++++++- build.rs | 3 + src/array.rs | 90 +++++++++++++++++++++++++ src/iter/collect/consumer.rs | 72 ++++++++++---------- src/iter/collect/mod.rs | 78 +++++++++++----------- src/iter/collect/test.rs | 2 +- src/iter/mod.rs | 81 ++++++++++++++++++++-- src/iter/par_bridge.rs | 38 ++++++++--- src/iter/unzip.rs | 61 +++++++++++++++++ src/lib.rs | 5 +- src/range.rs | 156 ++++++++++++++++++++++++++++++++++--------- src/range_inclusive.rs | 142 +++++++++++++++++++++++++++++++++------ src/slice/mergesort.rs | 10 +-- src/slice/mod.rs | 18 ----- src/slice/quicksort.rs | 4 +- src/slice/test.rs | 4 +- src/vec.rs | 39 +++++++++-- tests/clones.rs | 6 ++ tests/debug.rs | 6 ++ tests/producer_split_at.rs | 6 ++ tests/sort-panic-safe.rs | 2 +- 28 files changed, 700 insertions(+), 208 deletions(-) create mode 100644 src/array.rs diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json index c28c857..c8aa4e4 100644 --- a/.cargo_vcs_info.json +++ b/.cargo_vcs_info.json @@ -1,5 +1,5 @@ { "git": { - "sha1": "dc13cb7875ad43c7d1ea8b1e504b09c031f7ed5a" + "sha1": "ebcb09b1dc53211c6b5abdf4dc5b40e4bcd0a965" } } diff --git a/Android.bp b/Android.bp index b05b5b2..101bf04 100644 --- a/Android.bp +++ b/Android.bp @@ -39,13 +39,13 @@ license { rust_library { name: "librayon", - // has rustc warnings host_supported: true, crate_name: "rayon", srcs: ["src/lib.rs"], edition: "2018", - flags: [ - "--cfg step_by", + cfgs: [ + "min_const_generics", + "step_by", ], rustlibs: [ "libcrossbeam_deque", @@ -57,14 +57,14 @@ rust_library { // dependent_library ["feature_list"] // autocfg-1.0.1 // cfg-if-1.0.0 -// crossbeam-channel-0.5.0 "crossbeam-utils,default,std" +// crossbeam-channel-0.5.1 "crossbeam-utils,default,std" // crossbeam-deque-0.8.0 "crossbeam-epoch,crossbeam-utils,default,std" -// crossbeam-epoch-0.9.3 "alloc,lazy_static,std" -// crossbeam-utils-0.8.3 "default,lazy_static,std" +// crossbeam-epoch-0.9.4 "alloc,lazy_static,std" +// crossbeam-utils-0.8.4 "default,lazy_static,std" // either-1.6.1 // lazy_static-1.4.0 -// libc-0.2.88 "default,std" -// memoffset-0.6.1 "default" +// libc-0.2.94 "default,std" +// memoffset-0.6.3 "default" // num_cpus-1.13.0 -// rayon-core-1.9.0 +// rayon-core-1.9.1 // scopeguard-1.1.0 diff --git a/Cargo.toml b/Cargo.toml index 2129b6f..3b8921d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,7 +13,7 @@ [package] edition = "2018" name = "rayon" -version = "1.5.0" +version = "1.5.1" authors = ["Niko Matsakis ", "Josh Stone "] exclude = ["/ci/*", "/scripts/*", "/.github/*", "/bors.toml"] description = "Simple work-stealing parallelism for Rust" @@ -31,7 +31,7 @@ version = "1.0" default-features = false [dependencies.rayon-core] -version = "1.9.0" +version = "1.9.1" [dev-dependencies.docopt] version = "1" @@ -39,10 +39,10 @@ version = "1" version = "1" [dev-dependencies.rand] -version = "0.7" +version = "0.8" [dev-dependencies.rand_xorshift] -version = "0.2" +version = "0.3" [dev-dependencies.serde] version = "1.0.85" diff --git a/Cargo.toml.orig b/Cargo.toml.orig index a7600e1..6ab9dda 100644 --- a/Cargo.toml.orig +++ b/Cargo.toml.orig @@ -1,7 +1,7 @@ [package] name = "rayon" # Reminder to update 
html_root_url in lib.rs when updating version
-version = "1.5.0"
+version = "1.5.1"
 authors = ["Niko Matsakis <niko@alum.mit.edu>",
            "Josh Stone <cuviper@gmail.com>"]
 description = "Simple work-stealing parallelism for Rust"
@@ -19,7 +19,7 @@ members = ["rayon-demo", "rayon-core"]
 exclude = ["ci"]
 
 [dependencies]
-rayon-core = { version = "1.9.0", path = "rayon-core" }
+rayon-core = { version = "1.9.1", path = "rayon-core" }
 crossbeam-deque = "0.8.0"
 
 # This is a public dependency!
@@ -30,8 +30,8 @@ default-features = false
 [dev-dependencies]
 docopt = "1"
 lazy_static = "1"
-rand = "0.7"
-rand_xorshift = "0.2"
+rand = "0.8"
+rand_xorshift = "0.3"
 
 [dev-dependencies.serde]
 version = "1.0.85"
diff --git a/FAQ.md b/FAQ.md
index 11117d3..745f033 100644
--- a/FAQ.md
+++ b/FAQ.md
@@ -86,7 +86,7 @@ tscounter.store(value + 1, Ordering::SeqCst);
 
 You can already see that the `AtomicUsize` API is a bit more complex,
 as it requires you to specify an
-[ordering](http://doc.rust-lang.org/std/sync/atomic/enum.Ordering.html). (I
+[ordering](https://doc.rust-lang.org/std/sync/atomic/enum.Ordering.html). (I
 won't go into the details on ordering here, but suffice to say that
 if you don't know what an ordering is, and probably even if you do,
 you should use `Ordering::SeqCst`.) The danger in this parallel version of
diff --git a/METADATA b/METADATA
index 7d115ed..ddf9f80 100644
--- a/METADATA
+++ b/METADATA
@@ -7,13 +7,13 @@ third_party {
   }
   url {
     type: ARCHIVE
-    value: "https://static.crates.io/crates/rayon/rayon-1.5.0.crate"
+    value: "https://static.crates.io/crates/rayon/rayon-1.5.1.crate"
   }
-  version: "1.5.0"
+  version: "1.5.1"
   license_type: NOTICE
   last_upgrade_date {
-    year: 2020
-    month: 12
-    day: 21
+    year: 2021
+    month: 5
+    day: 19
   }
 }
diff --git a/README.md b/README.md
index 869c537..4c4dff6 100644
--- a/README.md
+++ b/README.md
@@ -3,7 +3,7 @@
 [![Rayon crate](https://img.shields.io/crates/v/rayon.svg)](https://crates.io/crates/rayon)
 [![Rayon documentation](https://docs.rs/rayon/badge.svg)](https://docs.rs/rayon)
 ![minimum rustc 1.36](https://img.shields.io/badge/rustc-1.36+-red.svg)
-![build status](https://github.com/rayon-rs/rayon/workflows/master/badge.svg)
+[![build status](https://github.com/rayon-rs/rayon/workflows/master/badge.svg)](https://github.com/rayon-rs/rayon/actions)
 [![Join the chat at https://gitter.im/rayon-rs/Lobby](https://badges.gitter.im/rayon-rs/Lobby.svg)](https://gitter.im/rayon-rs/Lobby?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
 
 Rayon is a data-parallelism library for Rust. It is extremely
@@ -15,7 +15,7 @@ Belt Rust conference.) Rayon is
 [available on crates.io](https://crates.io/crates/rayon), and
 [API Documentation is available on docs.rs](https://docs.rs/rayon/).
 
-[blog]: http://smallcultfollowing.com/babysteps/blog/2015/12/18/rayon-data-parallelism-in-rust/
+[blog]: https://smallcultfollowing.com/babysteps/blog/2015/12/18/rayon-data-parallelism-in-rust/
 [video]: https://www.youtube.com/watch?v=gof_OEv71Aw
 
 ## Parallel iterators and more
diff --git a/RELEASES.md b/RELEASES.md
index fa452c3..0299436 100644
--- a/RELEASES.md
+++ b/RELEASES.md
@@ -1,3 +1,30 @@
+# Release rayon 1.5.1 / rayon-core 1.9.1 (2021-05-18)
+
+- The new `in_place_scope` and `in_place_scope_fifo` are variations of `scope`
+  and `scope_fifo`, running the initial non-`Send` callback directly on the
+  current thread, rather than moving execution to the thread pool.
+- With Rust 1.51 or later, arrays now implement `IntoParallelIterator`.
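+
+  A minimal sketch of what that enables (assumes a Rust 1.51+ toolchain):
+
+  ```rust
+  use rayon::prelude::*;
+
+  let squares: Vec<i32> = [1, 2, 3, 4].into_par_iter().map(|x| x * x).collect();
+  assert_eq!(squares, [1, 4, 9, 16]);
+  ```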
+- New implementations of `FromParallelIterator` make it possible to `collect`
+  complicated nestings of items.
+  - `FromParallelIterator<(A, B)> for (FromA, FromB)` works like `unzip`.
+  - `FromParallelIterator<Either<L, R>> for (A, B)` works like `partition_map`.
+- Type inference now works better with parallel `Range` and `RangeInclusive`.
+- The implementation of `FromParallelIterator<T>` and `ParallelExtend<T>` for
+  `Vec<T>` now uses `MaybeUninit<T>` internally to avoid creating any
+  references to uninitialized data.
+- `ParallelBridge` fixed a bug with threads missing available work.
+
+## Contributors
+
+Thanks to all of the contributors for this release!
+
+- @atouchet
+- @cuviper
+- @Hywan
+- @iRaiko
+- @Qwaz
+- @rocallahan
+
 # Release rayon 1.5.0 / rayon-core 1.9.0 (2020-10-21)
 
 - Update crossbeam dependencies.
@@ -606,7 +633,7 @@ Thanks to the following people for their contributions to this release:
 This release includes a lot of progress towards the goal of parity
 with the sequential iterator API, though there are still a few methods
 that are not yet complete. If you'd like to help with that effort,
-[check out the milestone](https://github.com/nikomatsakis/rayon/issues?q=is%3Aopen+is%3Aissue+milestone%3A%22Parity+with+the+%60Iterator%60+trait%22)
+[check out the milestone](https://github.com/rayon-rs/rayon/issues?q=is%3Aopen+is%3Aissue+milestone%3A%22Parity+with+the+%60Iterator%60+trait%22)
 to see the remaining issues.
 
 **Announcement:** @cuviper has been added as a collaborator to the
@@ -672,7 +699,7 @@ API. Thanks @cuviper! Keep it up.
 - We are considering removing weights or changing the weight mechanism
   before 1.0. Examples of scenarios where you still need weights even
   with this adaptive mechanism would be great. Join the discussion
-  at .
+  at .
 - New (unstable) scoped threads API, see `rayon::scope` for details.
   - You will need to supply the [cargo feature] `unstable`.
 - The various demos and benchmarks have been consolidated into one
@@ -682,7 +709,7 @@ API. Thanks @cuviper! Keep it up.
 - Various internal cleanup in the implementation and typo fixes.
   Thanks @cuviper, @Eh2406, and @spacejam!
 
-[cargo feature]: http://doc.crates.io/manifest.html#the-features-section
+[cargo feature]: https://doc.rust-lang.org/cargo/reference/features.html#the-features-section
 
 # Release 0.4.2 (2016-09-15)
diff --git a/build.rs b/build.rs
index 70e301b..e52e326 100644
--- a/build.rs
+++ b/build.rs
@@ -3,4 +3,7 @@ fn main() {
     if ac.probe_expression("(0..10).step_by(2).rev()") {
         autocfg::emit("step_by");
     }
+    if ac.probe_expression("{ fn foo<const N: usize>() {} }") {
+        autocfg::emit("min_const_generics");
+    }
 }
diff --git a/src/array.rs b/src/array.rs
new file mode 100644
index 0000000..7922cd6
--- /dev/null
+++ b/src/array.rs
@@ -0,0 +1,90 @@
+#![cfg(min_const_generics)]
+//! Parallel iterator types for [arrays] (`[T; N]`)
+//!
+//! You will rarely need to interact with this module directly unless you need
+//! to name one of the iterator types.
+//!
+//! Everything in this module requires const generics, stabilized in Rust 1.51.
+//!
+//! [arrays]: https://doc.rust-lang.org/std/primitive.array.html
+
+use crate::iter::plumbing::*;
+use crate::iter::*;
+use crate::slice::{Iter, IterMut};
+use crate::vec::DrainProducer;
+use std::mem::ManuallyDrop;
+
+/// This implementation requires const generics, stabilized in Rust 1.51.
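+///
+/// A sketch of usage with a borrowed array, assuming Rust 1.51 or later:
+///
+/// ```
+/// use rayon::prelude::*;
+///
+/// let a = [10, 20, 30];
+/// let sum: i32 = (&a).into_par_iter().copied().sum();
+/// assert_eq!(sum, 60);
+/// ```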
+impl<'data, T: Sync + 'data, const N: usize> IntoParallelIterator for &'data [T; N] {
+    type Item = &'data T;
+    type Iter = Iter<'data, T>;
+
+    fn into_par_iter(self) -> Self::Iter {
+        <&[T]>::into_par_iter(self)
+    }
+}
+
+/// This implementation requires const generics, stabilized in Rust 1.51.
+impl<'data, T: Send + 'data, const N: usize> IntoParallelIterator for &'data mut [T; N] {
+    type Item = &'data mut T;
+    type Iter = IterMut<'data, T>;
+
+    fn into_par_iter(self) -> Self::Iter {
+        <&mut [T]>::into_par_iter(self)
+    }
+}
+
+/// This implementation requires const generics, stabilized in Rust 1.51.
+impl<T: Send, const N: usize> IntoParallelIterator for [T; N] {
+    type Item = T;
+    type Iter = IntoIter<T, N>;
+
+    fn into_par_iter(self) -> Self::Iter {
+        IntoIter { array: self }
+    }
+}
+
+/// Parallel iterator that moves out of an array.
+#[derive(Debug, Clone)]
+pub struct IntoIter<T: Send, const N: usize> {
+    array: [T; N],
+}
+
+impl<T: Send, const N: usize> ParallelIterator for IntoIter<T, N> {
+    type Item = T;
+
+    fn drive_unindexed<C>(self, consumer: C) -> C::Result
+    where
+        C: UnindexedConsumer<Self::Item>,
+    {
+        bridge(self, consumer)
+    }
+
+    fn opt_len(&self) -> Option<usize> {
+        Some(N)
+    }
+}
+
+impl<T: Send, const N: usize> IndexedParallelIterator for IntoIter<T, N> {
+    fn drive<C>(self, consumer: C) -> C::Result
+    where
+        C: Consumer<Self::Item>,
+    {
+        bridge(self, consumer)
+    }
+
+    fn len(&self) -> usize {
+        N
+    }
+
+    fn with_producer<CB>(self, callback: CB) -> CB::Output
+    where
+        CB: ProducerCallback<Self::Item>,
+    {
+        unsafe {
+            // Drain every item, and then the local array can just fall out of scope.
+            let mut array = ManuallyDrop::new(self.array);
+            callback.callback(DrainProducer::new(&mut *array))
+        }
+    }
+}
diff --git a/src/iter/collect/consumer.rs b/src/iter/collect/consumer.rs
index 689f29c..3a8eea0 100644
--- a/src/iter/collect/consumer.rs
+++ b/src/iter/collect/consumer.rs
@@ -1,26 +1,18 @@
 use super::super::plumbing::*;
 use std::marker::PhantomData;
+use std::mem::MaybeUninit;
 use std::ptr;
 use std::slice;
 
 pub(super) struct CollectConsumer<'c, T: Send> {
     /// A slice covering the target memory, not yet initialized!
-    target: &'c mut [T],
-}
-
-pub(super) struct CollectFolder<'c, T: Send> {
-    /// The folder writes into `result` and must extend the result
-    /// up to exactly this number of elements.
-    final_len: usize,
-
-    /// The current written-to part of our slice of the target
-    result: CollectResult<'c, T>,
+    target: &'c mut [MaybeUninit<T>],
 }
 
 impl<'c, T: Send + 'c> CollectConsumer<'c, T> {
     /// The target memory is considered uninitialized, and will be
     /// overwritten without reading or dropping existing values.
-    pub(super) fn new(target: &'c mut [T]) -> Self {
+    pub(super) fn new(target: &'c mut [MaybeUninit<T>]) -> Self {
         CollectConsumer { target }
     }
 }
@@ -31,8 +23,12 @@ impl<'c, T: Send + 'c> CollectConsumer<'c, T> {
 /// the elements will be dropped, unless its ownership is released before then.
 #[must_use]
 pub(super) struct CollectResult<'c, T> {
-    start: *mut T,
+    /// A slice covering the target memory, initialized up to our separate `len`.
+    target: &'c mut [MaybeUninit<T>],
+    /// The current initialized length in `target`
     len: usize,
+    /// Lifetime invariance guarantees that the data flows from consumer to result,
+    /// especially for the `scope_fn` callback in `Collect::with_consumer`.
     invariant_lifetime: PhantomData<&'c mut &'c mut [T]>,
 }
 
@@ -57,13 +53,15 @@ impl<'c, T> Drop for CollectResult<'c, T> {
     fn drop(&mut self) {
         // Drop the first `self.len` elements, which have been recorded
         // to be initialized by the folder.
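+        // SAFETY sketch: `MaybeUninit<T>` is layout-compatible with `T`, and
+        // `consume` initialized exactly the first `self.len` elements, so the
+        // cast-and-drop below touches only initialized values.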
         unsafe {
-            ptr::drop_in_place(slice::from_raw_parts_mut(self.start, self.len));
+            // TODO: use `MaybeUninit::slice_as_mut_ptr`
+            let start = self.target.as_mut_ptr() as *mut T;
+            ptr::drop_in_place(slice::from_raw_parts_mut(start, self.len));
         }
     }
 }
 
 impl<'c, T: Send + 'c> Consumer<T> for CollectConsumer<'c, T> {
-    type Folder = CollectFolder<'c, T>;
+    type Folder = CollectResult<'c, T>;
     type Reducer = CollectReducer;
     type Result = CollectResult<'c, T>;
 
@@ -80,16 +78,13 @@ impl<'c, T: Send + 'c> Consumer<T> for CollectConsumer<'c, T> {
         )
     }
 
-    fn into_folder(self) -> CollectFolder<'c, T> {
-        // Create a folder that consumes values and writes them
+    fn into_folder(self) -> Self::Folder {
+        // Create a result/folder that consumes values and writes them
         // into target. The initial result has length 0.
-        CollectFolder {
-            final_len: self.target.len(),
-            result: CollectResult {
-                start: self.target.as_mut_ptr(),
-                len: 0,
-                invariant_lifetime: PhantomData,
-            },
+        CollectResult {
+            target: self.target,
+            len: 0,
+            invariant_lifetime: PhantomData,
         }
     }
 
@@ -98,19 +93,19 @@ impl<'c, T: Send + 'c> Consumer<T> for CollectConsumer<'c, T> {
     }
 }
 
-impl<'c, T: Send + 'c> Folder<T> for CollectFolder<'c, T> {
-    type Result = CollectResult<'c, T>;
+impl<'c, T: Send + 'c> Folder<T> for CollectResult<'c, T> {
+    type Result = Self;
 
-    fn consume(mut self, item: T) -> CollectFolder<'c, T> {
-        if self.result.len >= self.final_len {
-            panic!("too many values pushed to consumer");
-        }
+    fn consume(mut self, item: T) -> Self {
+        let dest = self
+            .target
+            .get_mut(self.len)
+            .expect("too many values pushed to consumer");
 
-        // Compute target pointer and write to it, and
-        // extend the current result by one element
+        // Write item and increase the initialized length
         unsafe {
-            self.result.start.add(self.result.len).write(item);
-            self.result.len += 1;
+            dest.as_mut_ptr().write(item);
+            self.len += 1;
         }
 
         self
@@ -119,7 +114,7 @@ impl<'c, T: Send + 'c> Folder<T> for CollectFolder<'c, T> {
     fn complete(self) -> Self::Result {
         // NB: We don't explicitly check that the local writes were complete,
         // but Collect will assert the total result length in the end.
-        self.result
+        self
     }
 
     fn full(&self) -> bool {
@@ -151,8 +146,13 @@ impl<'c, T> Reducer<CollectResult<'c, T>> for CollectReducer {
         // Merge if the CollectResults are adjacent and in left to right order
         // else: drop the right piece now and total length will end up short in the end,
         // when the correctness of the collected result is asserted.
-        if left.start.wrapping_add(left.len) == right.start {
-            left.len += right.release_ownership();
+        let left_end = left.target[left.len..].as_ptr();
+        if left_end == right.target.as_ptr() {
+            let len = left.len + right.release_ownership();
+            unsafe {
+                left.target = slice::from_raw_parts_mut(left.target.as_mut_ptr(), len);
+            }
+            left.len = len;
         }
         left
     }
diff --git a/src/iter/collect/mod.rs b/src/iter/collect/mod.rs
index e18298e..7cbf215 100644
--- a/src/iter/collect/mod.rs
+++ b/src/iter/collect/mod.rs
@@ -1,4 +1,5 @@
 use super::{IndexedParallelIterator, IntoParallelIterator, ParallelExtend, ParallelIterator};
+use std::mem::MaybeUninit;
 use std::slice;
 
 mod consumer;
@@ -88,55 +89,56 @@ impl<'c, T: Send + 'c> Collect<'c, T> {
     where
         F: FnOnce(CollectConsumer<'_, T>) -> CollectResult<'_, T>,
     {
+        let slice = Self::reserve_get_tail_slice(&mut self.vec, self.len);
+        let result = scope_fn(CollectConsumer::new(slice));
+
+        // The CollectResult represents a contiguous part of the
+        // slice, that has been written to.
+        // On unwind here, the CollectResult will be dropped.
+        // If some producers on the way did not produce enough elements,
+        // partial CollectResults may have been dropped without
+        // being reduced to the final result, and we will see
+        // that as the length coming up short.
+        //
+        // Here, we assert that `slice` is fully initialized. This is
+        // checked by the following assert, which verifies if a
+        // complete CollectResult was produced; if the length is
+        // correct, it is necessarily covering the target slice.
+        // Since we know that the consumer cannot have escaped from
+        // `drive` (by parametricity, essentially), we know that any
+        // stores that will happen, have happened. Unless some code is buggy,
+        // that means we should have seen `len` total writes.
+        let actual_writes = result.len();
+        assert!(
+            actual_writes == self.len,
+            "expected {} total writes, but got {}",
+            self.len,
+            actual_writes
+        );
+
+        // Release the result's mutable borrow and "proxy ownership"
+        // of the elements, before the vector takes it over.
+        result.release_ownership();
+
+        let new_len = self.vec.len() + self.len;
+
         unsafe {
-            let slice = Self::reserve_get_tail_slice(&mut self.vec, self.len);
-            let result = scope_fn(CollectConsumer::new(slice));
-
-            // The CollectResult represents a contiguous part of the
-            // slice, that has been written to.
-            // On unwind here, the CollectResult will be dropped.
-            // If some producers on the way did not produce enough elements,
-            // partial CollectResults may have been dropped without
-            // being reduced to the final result, and we will see
-            // that as the length coming up short.
-            //
-            // Here, we assert that `slice` is fully initialized. This is
-            // checked by the following assert, which verifies if a
-            // complete CollectResult was produced; if the length is
-            // correct, it is necessarily covering the target slice.
-            // Since we know that the consumer cannot have escaped from
-            // `drive` (by parametricity, essentially), we know that any
-            // stores that will happen, have happened. Unless some code is buggy,
-            // that means we should have seen `len` total writes.
-            let actual_writes = result.len();
-            assert!(
-                actual_writes == self.len,
-                "expected {} total writes, but got {}",
-                self.len,
-                actual_writes
-            );
-
-            // Release the result's mutable borrow and "proxy ownership"
-            // of the elements, before the vector takes it over.
-            result.release_ownership();
-
-            let new_len = self.vec.len() + self.len;
             self.vec.set_len(new_len);
         }
     }
 
     /// Reserve space for `len` more elements in the vector,
     /// and return a slice to the uninitialized tail of the vector
-    ///
-    /// Safety: The tail slice is uninitialized
-    unsafe fn reserve_get_tail_slice(vec: &mut Vec<T>, len: usize) -> &mut [T] {
+    fn reserve_get_tail_slice(vec: &mut Vec<T>, len: usize) -> &mut [MaybeUninit<T>] {
         // Reserve the new space.
         vec.reserve(len);
 
-        // Get a correct borrow, then extend it for the newly added length.
+        // TODO: use `Vec::spare_capacity_mut` instead
+        // SAFETY: `MaybeUninit<T>` is guaranteed to have the same layout
+        // as `T`, and we already made sure to have the additional space.
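+        // Note: the pointer is taken only after `reserve` returns, since
+        // reserving may reallocate and invalidate earlier pointers.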
let start = vec.len(); - let slice = &mut vec[start..]; - slice::from_raw_parts_mut(slice.as_mut_ptr(), len) + let tail_ptr = vec[start..].as_mut_ptr() as *mut MaybeUninit; + unsafe { slice::from_raw_parts_mut(tail_ptr, len) } } } diff --git a/src/iter/collect/test.rs b/src/iter/collect/test.rs index 00c16c4..ddf7757 100644 --- a/src/iter/collect/test.rs +++ b/src/iter/collect/test.rs @@ -24,7 +24,7 @@ fn produce_too_many_items() { let mut folder = consumer.into_folder(); folder = folder.consume(22); folder = folder.consume(23); - folder.consume(24); + folder = folder.consume(24); unreachable!("folder does not complete") }); } diff --git a/src/iter/mod.rs b/src/iter/mod.rs index 0c82933..edff1a6 100644 --- a/src/iter/mod.rs +++ b/src/iter/mod.rs @@ -61,7 +61,7 @@ //! If you'd like to build a custom parallel iterator, or to write your own //! combinator, then check out the [split] function and the [plumbing] module. //! -//! [regular iterator]: http://doc.rust-lang.org/std/iter/trait.Iterator.html +//! [regular iterator]: https://doc.rust-lang.org/std/iter/trait.Iterator.html //! [`ParallelIterator`]: trait.ParallelIterator.html //! [`IndexedParallelIterator`]: trait.IndexedParallelIterator.html //! [split]: fn.split.html @@ -1966,6 +1966,79 @@ pub trait ParallelIterator: Sized + Send { /// /// assert_eq!(sync_vec, async_vec); /// ``` + /// + /// You can collect a pair of collections like [`unzip`](#method.unzip) + /// for paired items: + /// + /// ``` + /// use rayon::prelude::*; + /// + /// let a = [(0, 1), (1, 2), (2, 3), (3, 4)]; + /// let (first, second): (Vec<_>, Vec<_>) = a.into_par_iter().collect(); + /// + /// assert_eq!(first, [0, 1, 2, 3]); + /// assert_eq!(second, [1, 2, 3, 4]); + /// ``` + /// + /// Or like [`partition_map`](#method.partition_map) for `Either` items: + /// + /// ``` + /// use rayon::prelude::*; + /// use rayon::iter::Either; + /// + /// let (left, right): (Vec<_>, Vec<_>) = (0..8).into_par_iter().map(|x| { + /// if x % 2 == 0 { + /// Either::Left(x * 4) + /// } else { + /// Either::Right(x * 3) + /// } + /// }).collect(); + /// + /// assert_eq!(left, [0, 8, 16, 24]); + /// assert_eq!(right, [3, 9, 15, 21]); + /// ``` + /// + /// You can even collect an arbitrarily-nested combination of pairs and `Either`: + /// + /// ``` + /// use rayon::prelude::*; + /// use rayon::iter::Either; + /// + /// let (first, (left, right)): (Vec<_>, (Vec<_>, Vec<_>)) + /// = (0..8).into_par_iter().map(|x| { + /// if x % 2 == 0 { + /// (x, Either::Left(x * 4)) + /// } else { + /// (-x, Either::Right(x * 3)) + /// } + /// }).collect(); + /// + /// assert_eq!(first, [0, -1, 2, -3, 4, -5, 6, -7]); + /// assert_eq!(left, [0, 8, 16, 24]); + /// assert_eq!(right, [3, 9, 15, 21]); + /// ``` + /// + /// All of that can _also_ be combined with short-circuiting collection of + /// `Result` or `Option` types: + /// + /// ``` + /// use rayon::prelude::*; + /// use rayon::iter::Either; + /// + /// let result: Result<(Vec<_>, (Vec<_>, Vec<_>)), _> + /// = (0..8).into_par_iter().map(|x| { + /// if x > 5 { + /// Err(x) + /// } else if x % 2 == 0 { + /// Ok((x, Either::Left(x * 4))) + /// } else { + /// Ok((-x, Either::Right(x * 3))) + /// } + /// }).collect(); + /// + /// let error = result.unwrap_err(); + /// assert!(error == 6 || error == 7); + /// ``` fn collect(self) -> C where C: FromParallelIterator, @@ -2130,7 +2203,7 @@ pub trait ParallelIterator: Sized + Send { /// See the [README] for more details on the internals of parallel /// iterators. 
/// - /// [README]: README.md + /// [README]: https://github.com/rayon-rs/rayon/blob/master/src/iter/plumbing/README.md fn drive_unindexed(self, consumer: C) -> C::Result where C: UnindexedConsumer; @@ -2817,7 +2890,7 @@ pub trait IndexedParallelIterator: ParallelIterator { /// See the [README] for more details on the internals of parallel /// iterators. /// - /// [README]: README.md + /// [README]: https://github.com/rayon-rs/rayon/blob/master/src/iter/plumbing/README.md fn drive>(self, consumer: C) -> C::Result; /// Internal method used to define the behavior of this parallel @@ -2834,7 +2907,7 @@ pub trait IndexedParallelIterator: ParallelIterator { /// See the [README] for more details on the internals of parallel /// iterators. /// - /// [README]: README.md + /// [README]: https://github.com/rayon-rs/rayon/blob/master/src/iter/plumbing/README.md fn with_producer>(self, callback: CB) -> CB::Output; } diff --git a/src/iter/par_bridge.rs b/src/iter/par_bridge.rs index 4c2b96e..339ac1a 100644 --- a/src/iter/par_bridge.rs +++ b/src/iter/par_bridge.rs @@ -125,16 +125,19 @@ where let mut count = self.split_count.load(Ordering::SeqCst); loop { - let done = self.done.load(Ordering::SeqCst); + // Check if the iterator is exhausted *and* we've consumed every item from it. + let done = self.done.load(Ordering::SeqCst) && self.items.is_empty(); + match count.checked_sub(1) { Some(new_count) if !done => { - let last_count = - self.split_count - .compare_and_swap(count, new_count, Ordering::SeqCst); - if last_count == count { - return (self.clone(), Some(self)); - } else { - count = last_count; + match self.split_count.compare_exchange_weak( + count, + new_count, + Ordering::SeqCst, + Ordering::SeqCst, + ) { + Ok(_) => return (self.clone(), Some(self)), + Err(last_count) => count = last_count, } } _ => { @@ -157,13 +160,26 @@ where } } Steal::Empty => { + // Don't storm the mutex if we're already done. if self.done.load(Ordering::SeqCst) { - // the iterator is out of items, no use in continuing - return folder; + // Someone might have pushed more between our `steal()` and `done.load()` + if self.items.is_empty() { + // The iterator is out of items, no use in continuing + return folder; + } } else { // our cache is out of items, time to load more from the iterator match self.iter.try_lock() { Ok(mut guard) => { + // Check `done` again in case we raced with the previous lock + // holder on its way out. + if self.done.load(Ordering::SeqCst) { + if self.items.is_empty() { + return folder; + } + continue; + } + let count = current_num_threads(); let count = (count * count) * 2; @@ -184,7 +200,7 @@ where } Err(TryLockError::WouldBlock) => { // someone else has the mutex, just sit tight until it's ready - yield_now(); //TODO: use a thread=pool-aware yield? (#548) + yield_now(); //TODO: use a thread-pool-aware yield? 
(#548)
                         }
                         Err(TryLockError::Poisoned(_)) => {
                             // any panics from other threads will have been caught by the pool,
diff --git a/src/iter/unzip.rs b/src/iter/unzip.rs
index 219b909..0b7074e 100644
--- a/src/iter/unzip.rs
+++ b/src/iter/unzip.rs
@@ -462,3 +462,64 @@ where
         }
     }
 }
+
+impl<A, B, FromA, FromB> FromParallelIterator<(A, B)> for (FromA, FromB)
+where
+    A: Send,
+    B: Send,
+    FromA: Send + FromParallelIterator<A>,
+    FromB: Send + FromParallelIterator<B>,
+{
+    fn from_par_iter<I>(pi: I) -> Self
+    where
+        I: IntoParallelIterator<Item = (A, B)>,
+    {
+        let (a, b): (Collector<FromA>, Collector<FromB>) = pi.into_par_iter().unzip();
+        (a.result.unwrap(), b.result.unwrap())
+    }
+}
+
+impl<L, R, A, B> FromParallelIterator<Either<L, R>> for (A, B)
+where
+    L: Send,
+    R: Send,
+    A: Send + FromParallelIterator<L>,
+    B: Send + FromParallelIterator<R>,
+{
+    fn from_par_iter<I>(pi: I) -> Self
+    where
+        I: IntoParallelIterator<Item = Either<L, R>>,
+    {
+        fn identity<T>(x: T) -> T {
+            x
+        }
+
+        let (a, b): (Collector<A>, Collector<B>) = pi.into_par_iter().partition_map(identity);
+        (a.result.unwrap(), b.result.unwrap())
+    }
+}
+
+/// Shim to implement a one-time `ParallelExtend` using `FromParallelIterator`.
+struct Collector<FromT> {
+    result: Option<FromT>,
+}
+
+impl<FromT> Default for Collector<FromT> {
+    fn default() -> Self {
+        Collector { result: None }
+    }
+}
+
+impl<T, FromT> ParallelExtend<T> for Collector<FromT>
+where
+    T: Send,
+    FromT: Send + FromParallelIterator<T>,
+{
+    fn par_extend<I>(&mut self, pi: I)
+    where
+        I: IntoParallelIterator<Item = T>,
+    {
+        debug_assert!(self.result.is_none());
+        self.result = Some(pi.into_par_iter().collect());
+    }
+}
diff --git a/src/lib.rs b/src/lib.rs
index d5d0314..b1e6fab 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -91,6 +91,7 @@ mod private;
 
 mod split_producer;
 
+pub mod array;
 pub mod collections;
 pub mod iter;
 pub mod option;
@@ -114,7 +115,7 @@ pub use rayon_core::ThreadPool;
 pub use rayon_core::ThreadPoolBuildError;
 pub use rayon_core::ThreadPoolBuilder;
 pub use rayon_core::{current_num_threads, current_thread_index};
+pub use rayon_core::{in_place_scope, scope, Scope};
+pub use rayon_core::{in_place_scope_fifo, scope_fifo, ScopeFifo};
 pub use rayon_core::{join, join_context};
-pub use rayon_core::{scope, Scope};
-pub use rayon_core::{scope_fifo, ScopeFifo};
 pub use rayon_core::{spawn, spawn_fifo};
diff --git a/src/range.rs b/src/range.rs
index 09ba25e..57b613e 100644
--- a/src/range.rs
+++ b/src/range.rs
@@ -19,10 +19,11 @@
 use crate::iter::plumbing::*;
 use crate::iter::*;
 use std::char;
+use std::convert::TryFrom;
 use std::ops::Range;
 use std::usize;
 
-/// Parallel iterator over a range, implemented for all integer types.
+/// Parallel iterator over a range, implemented for all integer types and `char`.
 ///
 /// **Note:** The `zip` operation requires `IndexedParallelIterator`
 /// which is not implemented for `u64`, `i64`, `u128`, or `i128`.
@@ -48,6 +49,7 @@ pub struct Iter<T> {
     range: Range<T>,
 }
 
+/// Implemented for ranges of all primitive integer types and `char`.
 impl<T> IntoParallelIterator for Range<T>
 where
     Iter<T>: ParallelIterator,
@@ -76,40 +78,117 @@ where
     }
 }
 
+/// These traits help drive integer type inference. Without them, an unknown `{integer}` type only
+/// has constraints on `Iter<{integer}>`, which will probably give up and use `i32`. By adding
+/// these traits on the item type, the compiler can see a more direct constraint to infer like
+/// `{integer}: RangeInteger`, which works better. See `test_issue_833` for an example.
+///
+/// They have to be `pub` since they're seen in the public `impl ParallelIterator` constraints, but
+/// we put them in a private module so they're not actually reachable in our public API.
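+///
+/// A sketch of the inference this enables (assuming a caller-side helper such
+/// as `fn is_even(n: i64) -> bool`):
+///
+/// ```ignore
+/// let v: Vec<i64> = (1..100).into_par_iter().filter(|&x| is_even(x)).collect();
+/// ```
+///
+/// Here the literal range's `{integer}` unifies with `i64` through these
+/// traits instead of falling back to `i32`; `test_issue_833` below exercises
+/// exactly this pattern.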
+mod private { + use super::*; + + /// Implementation details of `ParallelIterator for Iter` + pub trait RangeInteger: Sized + Send { + private_decl! {} + + fn drive_unindexed(iter: Iter, consumer: C) -> C::Result + where + C: UnindexedConsumer; + + fn opt_len(iter: &Iter) -> Option; + } + + /// Implementation details of `IndexedParallelIterator for Iter` + pub trait IndexedRangeInteger: RangeInteger { + private_decl! {} + + fn drive(iter: Iter, consumer: C) -> C::Result + where + C: Consumer; + + fn len(iter: &Iter) -> usize; + + fn with_producer(iter: Iter, callback: CB) -> CB::Output + where + CB: ProducerCallback; + } +} +use private::{IndexedRangeInteger, RangeInteger}; + +impl ParallelIterator for Iter { + type Item = T; + + fn drive_unindexed(self, consumer: C) -> C::Result + where + C: UnindexedConsumer, + { + T::drive_unindexed(self, consumer) + } + + #[inline] + fn opt_len(&self) -> Option { + T::opt_len(self) + } +} + +impl IndexedParallelIterator for Iter { + fn drive(self, consumer: C) -> C::Result + where + C: Consumer, + { + T::drive(self, consumer) + } + + #[inline] + fn len(&self) -> usize { + T::len(self) + } + + fn with_producer(self, callback: CB) -> CB::Output + where + CB: ProducerCallback, + { + T::with_producer(self, callback) + } +} + macro_rules! indexed_range_impl { ( $t:ty ) => { - impl ParallelIterator for Iter<$t> { - type Item = $t; + impl RangeInteger for $t { + private_impl! {} - fn drive_unindexed(self, consumer: C) -> C::Result + fn drive_unindexed(iter: Iter<$t>, consumer: C) -> C::Result where - C: UnindexedConsumer, + C: UnindexedConsumer<$t>, { - bridge(self, consumer) + bridge(iter, consumer) } - fn opt_len(&self) -> Option { - Some(self.len()) + fn opt_len(iter: &Iter<$t>) -> Option { + Some(iter.range.len()) } } - impl IndexedParallelIterator for Iter<$t> { - fn drive(self, consumer: C) -> C::Result + impl IndexedRangeInteger for $t { + private_impl! {} + + fn drive(iter: Iter<$t>, consumer: C) -> C::Result where - C: Consumer, + C: Consumer<$t>, { - bridge(self, consumer) + bridge(iter, consumer) } - fn len(&self) -> usize { - self.range.len() + fn len(iter: &Iter<$t>) -> usize { + iter.range.len() } - fn with_producer(self, callback: CB) -> CB::Output + fn with_producer(iter: Iter<$t>, callback: CB) -> CB::Output where - CB: ProducerCallback, + CB: ProducerCallback<$t>, { - callback.callback(IterProducer { range: self.range }) + callback.callback(IterProducer { range: iter.range }) } } @@ -150,36 +229,31 @@ macro_rules! unindexed_range_impl { } } - impl ParallelIterator for Iter<$t> { - type Item = $t; + impl RangeInteger for $t { + private_impl! {} - fn drive_unindexed(self, consumer: C) -> C::Result + fn drive_unindexed(iter: Iter<$t>, consumer: C) -> C::Result where - C: UnindexedConsumer, + C: UnindexedConsumer<$t>, { #[inline] fn offset(start: $t) -> impl Fn(usize) -> $t { move |i| start.wrapping_add(i as $t) } - if let Some(len) = self.opt_len() { + if let Some(len) = iter.opt_len() { // Drive this in indexed mode for better `collect`. 
(0..len) .into_par_iter() - .map(offset(self.range.start)) + .map(offset(iter.range.start)) .drive(consumer) } else { - bridge_unindexed(IterProducer { range: self.range }, consumer) + bridge_unindexed(IterProducer { range: iter.range }, consumer) } } - fn opt_len(&self) -> Option { - let len = self.range.len(); - if len <= usize::MAX as $len_t { - Some(len as usize) - } else { - None - } + fn opt_len(iter: &Iter<$t>) -> Option { + usize::try_from(iter.range.len()).ok() } } @@ -366,3 +440,23 @@ fn test_usize_i64_overflow() { let pool = ThreadPoolBuilder::new().num_threads(8).build().unwrap(); pool.install(|| assert_eq!(iter.find_last(|_| true), Some(i64::MAX - 1))); } + +#[test] +fn test_issue_833() { + fn is_even(n: i64) -> bool { + n % 2 == 0 + } + + // The integer type should be inferred from `is_even` + let v: Vec<_> = (1..100).into_par_iter().filter(|&x| is_even(x)).collect(); + assert!(v.into_iter().eq((2..100).step_by(2))); + + // Try examples with indexed iterators too + let pos = (0..100).into_par_iter().position_any(|x| x == 50i16); + assert_eq!(pos, Some(50usize)); + + assert!((0..100) + .into_par_iter() + .zip(0..100) + .all(|(a, b)| i16::eq(&a, &b))); +} diff --git a/src/range_inclusive.rs b/src/range_inclusive.rs index c802b6c..b7bb0ca 100644 --- a/src/range_inclusive.rs +++ b/src/range_inclusive.rs @@ -21,10 +21,10 @@ use crate::iter::*; use std::char; use std::ops::RangeInclusive; -/// Parallel iterator over an inclusive range, implemented for all integer types. +/// Parallel iterator over an inclusive range, implemented for all integer types and `char`. /// /// **Note:** The `zip` operation requires `IndexedParallelIterator` -/// which is only implemented for `u8`, `i8`, `u16`, and `i16`. +/// which is only implemented for `u8`, `i8`, `u16`, `i16`, and `char`. /// /// ``` /// use rayon::prelude::*; @@ -71,6 +71,7 @@ where } } +/// Implemented for ranges of all primitive integer types and `char`. impl IntoParallelIterator for RangeInclusive where Iter: ParallelIterator, @@ -83,34 +84,109 @@ where } } +/// These traits help drive integer type inference. Without them, an unknown `{integer}` type only +/// has constraints on `Iter<{integer}>`, which will probably give up and use `i32`. By adding +/// these traits on the item type, the compiler can see a more direct constraint to infer like +/// `{integer}: RangeInteger`, which works better. See `test_issue_833` for an example. +/// +/// They have to be `pub` since they're seen in the public `impl ParallelIterator` constraints, but +/// we put them in a private modules so they're not actually reachable in our public API. +mod private { + use super::*; + + /// Implementation details of `ParallelIterator for Iter` + pub trait RangeInteger: Sized + Send { + private_decl! {} + + fn drive_unindexed(iter: Iter, consumer: C) -> C::Result + where + C: UnindexedConsumer; + + fn opt_len(iter: &Iter) -> Option; + } + + /// Implementation details of `IndexedParallelIterator for Iter` + pub trait IndexedRangeInteger: RangeInteger { + private_decl! 
{} + + fn drive(iter: Iter, consumer: C) -> C::Result + where + C: Consumer; + + fn len(iter: &Iter) -> usize; + + fn with_producer(iter: Iter, callback: CB) -> CB::Output + where + CB: ProducerCallback; + } +} +use private::{IndexedRangeInteger, RangeInteger}; + +impl ParallelIterator for Iter { + type Item = T; + + fn drive_unindexed(self, consumer: C) -> C::Result + where + C: UnindexedConsumer, + { + T::drive_unindexed(self, consumer) + } + + #[inline] + fn opt_len(&self) -> Option { + T::opt_len(self) + } +} + +impl IndexedParallelIterator for Iter { + fn drive(self, consumer: C) -> C::Result + where + C: Consumer, + { + T::drive(self, consumer) + } + + #[inline] + fn len(&self) -> usize { + T::len(self) + } + + fn with_producer(self, callback: CB) -> CB::Output + where + CB: ProducerCallback, + { + T::with_producer(self, callback) + } +} + macro_rules! convert { - ( $self:ident . $method:ident ( $( $arg:expr ),* ) ) => { - if let Some((start, end)) = $self.bounds() { + ( $iter:ident . $method:ident ( $( $arg:expr ),* ) ) => { + if let Some((start, end)) = $iter.bounds() { if let Some(end) = end.checked_add(1) { (start..end).into_par_iter().$method($( $arg ),*) } else { (start..end).into_par_iter().chain(once(end)).$method($( $arg ),*) } } else { - empty::().$method($( $arg ),*) + empty::().$method($( $arg ),*) } }; } macro_rules! parallel_range_impl { ( $t:ty ) => { - impl ParallelIterator for Iter<$t> { - type Item = $t; + impl RangeInteger for $t { + private_impl! {} - fn drive_unindexed(self, consumer: C) -> C::Result + fn drive_unindexed(iter: Iter<$t>, consumer: C) -> C::Result where - C: UnindexedConsumer, + C: UnindexedConsumer<$t>, { - convert!(self.drive_unindexed(consumer)) + convert!(iter.drive_unindexed(consumer)) } - fn opt_len(&self) -> Option { - convert!(self.opt_len()) + fn opt_len(iter: &Iter<$t>) -> Option { + convert!(iter.opt_len()) } } }; @@ -120,23 +196,25 @@ macro_rules! indexed_range_impl { ( $t:ty ) => { parallel_range_impl! { $t } - impl IndexedParallelIterator for Iter<$t> { - fn drive(self, consumer: C) -> C::Result + impl IndexedRangeInteger for $t { + private_impl! {} + + fn drive(iter: Iter<$t>, consumer: C) -> C::Result where - C: Consumer, + C: Consumer<$t>, { - convert!(self.drive(consumer)) + convert!(iter.drive(consumer)) } - fn len(&self) -> usize { - self.range.len() + fn len(iter: &Iter<$t>) -> usize { + iter.range.len() } - fn with_producer(self, callback: CB) -> CB::Output + fn with_producer(iter: Iter<$t>, callback: CB) -> CB::Output where - CB: ProducerCallback, + CB: ProducerCallback<$t>, { - convert!(self.with_producer(callback)) + convert!(iter.with_producer(callback)) } } }; @@ -179,7 +257,7 @@ macro_rules! 
convert_char { .$method($( $arg ),*) } } else { - empty().into_par_iter().$method($( $arg ),*) + empty::().$method($( $arg ),*) } }; } @@ -286,3 +364,23 @@ fn test_usize_i64_overflow() { let pool = ThreadPoolBuilder::new().num_threads(8).build().unwrap(); pool.install(|| assert_eq!(iter.find_last(|_| true), Some(i64::MAX))); } + +#[test] +fn test_issue_833() { + fn is_even(n: i64) -> bool { + n % 2 == 0 + } + + // The integer type should be inferred from `is_even` + let v: Vec<_> = (1..=100).into_par_iter().filter(|&x| is_even(x)).collect(); + assert!(v.into_iter().eq((2..=100).step_by(2))); + + // Try examples with indexed iterators too + let pos = (0..=100).into_par_iter().position_any(|x| x == 50i16); + assert_eq!(pos, Some(50usize)); + + assert!((0..=100) + .into_par_iter() + .zip(0..=100) + .all(|(a, b)| i16::eq(&a, &b))); +} diff --git a/src/slice/mergesort.rs b/src/slice/mergesort.rs index a007cae..e9a5d43 100644 --- a/src/slice/mergesort.rs +++ b/src/slice/mergesort.rs @@ -284,7 +284,7 @@ fn collapse(runs: &[Run]) -> Option { /// Otherwise, it sorts the slice into non-descending order. /// /// This merge sort borrows some (but not all) ideas from TimSort, which is described in detail -/// [here](http://svn.python.org/projects/python/trunk/Objects/listsort.txt). +/// [here](https://svn.python.org/projects/python/trunk/Objects/listsort.txt). /// /// The algorithm identifies strictly descending and non-descending subsequences, which are called /// natural runs. There is a stack of pending runs yet to be merged. Each newly found run is pushed @@ -739,12 +739,12 @@ mod tests { check(&[1, 2, 2, 2, 2, 3], &[]); check(&[], &[1, 2, 2, 2, 2, 3]); - let mut rng = thread_rng(); + let ref mut rng = thread_rng(); for _ in 0..100 { - let limit: u32 = rng.gen_range(1, 21); - let left_len: usize = rng.gen_range(0, 20); - let right_len: usize = rng.gen_range(0, 20); + let limit: u32 = rng.gen_range(1..21); + let left_len: usize = rng.gen_range(0..20); + let right_len: usize = rng.gen_range(0..20); let mut left = rng .sample_iter(&Uniform::new(0, limit)) diff --git a/src/slice/mod.rs b/src/slice/mod.rs index b80125f..d47b0d3 100644 --- a/src/slice/mod.rs +++ b/src/slice/mod.rs @@ -470,15 +470,6 @@ impl<'data, T: Sync + 'data> IntoParallelIterator for &'data [T] { } } -impl<'data, T: Sync + 'data> IntoParallelIterator for &'data Vec { - type Item = &'data T; - type Iter = Iter<'data, T>; - - fn into_par_iter(self) -> Self::Iter { - Iter { slice: self } - } -} - impl<'data, T: Send + 'data> IntoParallelIterator for &'data mut [T] { type Item = &'data mut T; type Iter = IterMut<'data, T>; @@ -488,15 +479,6 @@ impl<'data, T: Send + 'data> IntoParallelIterator for &'data mut [T] { } } -impl<'data, T: Send + 'data> IntoParallelIterator for &'data mut Vec { - type Item = &'data mut T; - type Iter = IterMut<'data, T>; - - fn into_par_iter(self) -> Self::Iter { - IterMut { slice: self } - } -} - /// Parallel iterator over immutable items in a slice #[derive(Debug)] pub struct Iter<'data, T: Sync> { diff --git a/src/slice/quicksort.rs b/src/slice/quicksort.rs index b985073..17c6f33 100644 --- a/src/slice/quicksort.rs +++ b/src/slice/quicksort.rs @@ -229,7 +229,7 @@ where /// Partitioning is performed block-by-block in order to minimize the cost of branching operations. /// This idea is presented in the [BlockQuicksort][pdf] paper. 
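+/// (In rough terms: each side of the partition scans a fixed-size block,
+/// records the offsets of elements that belong on the other side, and then
+/// swaps those candidates in bulk, keeping the hot loop free of
+/// hard-to-predict branches.)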
/// -/// [pdf]: http://drops.dagstuhl.de/opus/volltexte/2016/6389/pdf/LIPIcs-ESA-2016-38.pdf +/// [pdf]: https://drops.dagstuhl.de/opus/volltexte/2016/6389/pdf/LIPIcs-ESA-2016-38.pdf fn partition_in_blocks(v: &mut [T], pivot: &T, is_less: &F) -> usize where F: Fn(&T, &T) -> bool, @@ -766,7 +766,7 @@ mod tests { #[test] fn test_heapsort() { - let rng = thread_rng(); + let ref mut rng = thread_rng(); for len in (0..25).chain(500..501) { for &modulus in &[5, 10, 100] { diff --git a/src/slice/test.rs b/src/slice/test.rs index 97de7d8..71743e2 100644 --- a/src/slice/test.rs +++ b/src/slice/test.rs @@ -10,7 +10,7 @@ macro_rules! sort { ($f:ident, $name:ident) => { #[test] fn $name() { - let mut rng = thread_rng(); + let ref mut rng = thread_rng(); for len in (0..25).chain(500..501) { for &modulus in &[5, 10, 100] { @@ -105,7 +105,7 @@ fn test_par_sort_stability() { let mut rng = thread_rng(); let mut v: Vec<_> = (0..len) .map(|_| { - let n: usize = rng.gen_range(0, 10); + let n: usize = rng.gen_range(0..10); counts[n] += 1; (n, counts[n]) }) diff --git a/src/vec.rs b/src/vec.rs index 686673b..1e68aa0 100644 --- a/src/vec.rs +++ b/src/vec.rs @@ -8,12 +8,31 @@ use crate::iter::plumbing::*; use crate::iter::*; use crate::math::simplify_range; +use crate::slice::{Iter, IterMut}; use std::iter; use std::mem; use std::ops::{Range, RangeBounds}; use std::ptr; use std::slice; +impl<'data, T: Sync + 'data> IntoParallelIterator for &'data Vec { + type Item = &'data T; + type Iter = Iter<'data, T>; + + fn into_par_iter(self) -> Self::Iter { + <&[T]>::into_par_iter(self) + } +} + +impl<'data, T: Send + 'data> IntoParallelIterator for &'data mut Vec { + type Item = &'data mut T; + type Iter = IterMut<'data, T>; + + fn into_par_iter(self) -> Self::Iter { + <&mut [T]>::into_par_iter(self) + } +} + /// Parallel iterator that moves out of a vector. #[derive(Debug, Clone)] pub struct IntoIter { @@ -122,12 +141,16 @@ impl<'data, T: Send> IndexedParallelIterator for Drain<'data, T> { let start = self.range.start; self.vec.set_len(start); - // Get a correct borrow lifetime, then extend it to the original length. - let mut slice = &mut self.vec[start..]; - slice = slice::from_raw_parts_mut(slice.as_mut_ptr(), self.range.len()); + // Create the producer as the exclusive "owner" of the slice. + let producer = { + // Get a correct borrow lifetime, then extend it to the original length. + let mut slice = &mut self.vec[start..]; + slice = slice::from_raw_parts_mut(slice.as_mut_ptr(), self.range.len()); + DrainProducer::new(slice) + }; // The producer will move or drop each item from the drained range. - callback.callback(DrainProducer::new(slice)) + callback.callback(producer) } } } @@ -208,7 +231,9 @@ impl<'data, T: 'data> Iterator for SliceDrain<'data, T> { type Item = T; fn next(&mut self) -> Option { - let ptr = self.iter.next()?; + // Coerce the pointer early, so we don't keep the + // reference that's about to be invalidated. + let ptr: *const T = self.iter.next()?; Some(unsafe { ptr::read(ptr) }) } @@ -223,7 +248,9 @@ impl<'data, T: 'data> Iterator for SliceDrain<'data, T> { impl<'data, T: 'data> DoubleEndedIterator for SliceDrain<'data, T> { fn next_back(&mut self) -> Option { - let ptr = self.iter.next_back()?; + // Coerce the pointer early, so we don't keep the + // reference that's about to be invalidated. 
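+        // (`ptr::read` then moves the value out bitwise; the iterator has
+        // already stepped past this slot, so it is not dropped again.)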
+ let ptr: *const T = self.iter.next_back()?; Some(unsafe { ptr::read(ptr) }) } } diff --git a/tests/clones.rs b/tests/clones.rs index fa93f8a..9add775 100644 --- a/tests/clones.rs +++ b/tests/clones.rs @@ -107,6 +107,12 @@ fn clone_vec() { check(v.into_par_iter()); } +#[test] +fn clone_array() { + let a = [0i32; 100]; + check(a.into_par_iter()); +} + #[test] fn clone_adaptors() { let v: Vec<_> = (0..1000).map(Some).collect(); diff --git a/tests/debug.rs b/tests/debug.rs index fb11110..3584ba0 100644 --- a/tests/debug.rs +++ b/tests/debug.rs @@ -130,6 +130,12 @@ fn debug_vec() { check(v.into_par_iter()); } +#[test] +fn debug_array() { + let a = [0i32; 10]; + check(a.into_par_iter()); +} + #[test] fn debug_adaptors() { let v: Vec<_> = (0..10).collect(); diff --git a/tests/producer_split_at.rs b/tests/producer_split_at.rs index 752bc3e..3a49059 100644 --- a/tests/producer_split_at.rs +++ b/tests/producer_split_at.rs @@ -108,6 +108,12 @@ fn check_len(iter: &I, len: usize) { // **** Base Producers **** +#[test] +fn array() { + let a = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]; + check(&a, || a); +} + #[test] fn empty() { let v = vec![42]; diff --git a/tests/sort-panic-safe.rs b/tests/sort-panic-safe.rs index 7c50505..00a9731 100644 --- a/tests/sort-panic-safe.rs +++ b/tests/sort-panic-safe.rs @@ -132,7 +132,7 @@ fn sort_panic_safe() { let mut rng = thread_rng(); let mut input = (0..len) .map(|id| DropCounter { - x: rng.gen_range(0, modulus), + x: rng.gen_range(0..modulus), id, version: Cell::new(0), }) -- cgit v1.2.3