aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoel Galenson <jgalenson@google.com>2021-06-09 20:22:04 +0000
committerAutomerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>2021-06-09 20:22:04 +0000
commit489816d97874aea123d4c83daf76be3a91174b3c (patch)
treea915bf2398b238dbd70ae338a6bb643a7372de84
parenta09246c27bd61f6f5b6009217c51aa39e98bf65a (diff)
parentdd2305def6fff1d9f1149ecb73b02e37eb8a18b0 (diff)
downloadrayon-489816d97874aea123d4c83daf76be3a91174b3c.tar.gz
Upgrade rust/crates/rayon to 1.5.1 am: dd2305def6android-s-beta-4android-s-beta-3android-s-beta-4
Original change: https://android-review.googlesource.com/c/platform/external/rust/crates/rayon/+/1712353 Change-Id: I4d7221e9f9e2c32bc6cea6dade537833719f1397
-rw-r--r--.cargo_vcs_info.json2
-rw-r--r--Android.bp18
-rw-r--r--Cargo.toml8
-rw-r--r--Cargo.toml.orig8
-rw-r--r--FAQ.md2
-rw-r--r--METADATA10
-rw-r--r--README.md4
-rw-r--r--RELEASES.md33
-rw-r--r--build.rs3
-rw-r--r--src/array.rs90
-rw-r--r--src/iter/collect/consumer.rs72
-rw-r--r--src/iter/collect/mod.rs78
-rw-r--r--src/iter/collect/test.rs2
-rw-r--r--src/iter/mod.rs81
-rw-r--r--src/iter/par_bridge.rs38
-rw-r--r--src/iter/unzip.rs61
-rw-r--r--src/lib.rs5
-rw-r--r--src/range.rs156
-rw-r--r--src/range_inclusive.rs142
-rw-r--r--src/slice/mergesort.rs10
-rw-r--r--src/slice/mod.rs18
-rw-r--r--src/slice/quicksort.rs4
-rw-r--r--src/slice/test.rs4
-rw-r--r--src/vec.rs39
-rw-r--r--tests/clones.rs6
-rw-r--r--tests/debug.rs6
-rw-r--r--tests/producer_split_at.rs6
-rw-r--r--tests/sort-panic-safe.rs2
28 files changed, 700 insertions, 208 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json
index c28c857..c8aa4e4 100644
--- a/.cargo_vcs_info.json
+++ b/.cargo_vcs_info.json
@@ -1,5 +1,5 @@
{
"git": {
- "sha1": "dc13cb7875ad43c7d1ea8b1e504b09c031f7ed5a"
+ "sha1": "ebcb09b1dc53211c6b5abdf4dc5b40e4bcd0a965"
}
}
diff --git a/Android.bp b/Android.bp
index b05b5b2..101bf04 100644
--- a/Android.bp
+++ b/Android.bp
@@ -39,13 +39,13 @@ license {
rust_library {
name: "librayon",
- // has rustc warnings
host_supported: true,
crate_name: "rayon",
srcs: ["src/lib.rs"],
edition: "2018",
- flags: [
- "--cfg step_by",
+ cfgs: [
+ "min_const_generics",
+ "step_by",
],
rustlibs: [
"libcrossbeam_deque",
@@ -57,14 +57,14 @@ rust_library {
// dependent_library ["feature_list"]
// autocfg-1.0.1
// cfg-if-1.0.0
-// crossbeam-channel-0.5.0 "crossbeam-utils,default,std"
+// crossbeam-channel-0.5.1 "crossbeam-utils,default,std"
// crossbeam-deque-0.8.0 "crossbeam-epoch,crossbeam-utils,default,std"
-// crossbeam-epoch-0.9.3 "alloc,lazy_static,std"
-// crossbeam-utils-0.8.3 "default,lazy_static,std"
+// crossbeam-epoch-0.9.4 "alloc,lazy_static,std"
+// crossbeam-utils-0.8.4 "default,lazy_static,std"
// either-1.6.1
// lazy_static-1.4.0
-// libc-0.2.88 "default,std"
-// memoffset-0.6.1 "default"
+// libc-0.2.94 "default,std"
+// memoffset-0.6.3 "default"
// num_cpus-1.13.0
-// rayon-core-1.9.0
+// rayon-core-1.9.1
// scopeguard-1.1.0
diff --git a/Cargo.toml b/Cargo.toml
index 2129b6f..3b8921d 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -13,7 +13,7 @@
[package]
edition = "2018"
name = "rayon"
-version = "1.5.0"
+version = "1.5.1"
authors = ["Niko Matsakis <niko@alum.mit.edu>", "Josh Stone <cuviper@gmail.com>"]
exclude = ["/ci/*", "/scripts/*", "/.github/*", "/bors.toml"]
description = "Simple work-stealing parallelism for Rust"
@@ -31,7 +31,7 @@ version = "1.0"
default-features = false
[dependencies.rayon-core]
-version = "1.9.0"
+version = "1.9.1"
[dev-dependencies.docopt]
version = "1"
@@ -39,10 +39,10 @@ version = "1"
version = "1"
[dev-dependencies.rand]
-version = "0.7"
+version = "0.8"
[dev-dependencies.rand_xorshift]
-version = "0.2"
+version = "0.3"
[dev-dependencies.serde]
version = "1.0.85"
diff --git a/Cargo.toml.orig b/Cargo.toml.orig
index a7600e1..6ab9dda 100644
--- a/Cargo.toml.orig
+++ b/Cargo.toml.orig
@@ -1,7 +1,7 @@
[package]
name = "rayon"
# Reminder to update html_rool_url in lib.rs when updating version
-version = "1.5.0"
+version = "1.5.1"
authors = ["Niko Matsakis <niko@alum.mit.edu>",
"Josh Stone <cuviper@gmail.com>"]
description = "Simple work-stealing parallelism for Rust"
@@ -19,7 +19,7 @@ members = ["rayon-demo", "rayon-core"]
exclude = ["ci"]
[dependencies]
-rayon-core = { version = "1.9.0", path = "rayon-core" }
+rayon-core = { version = "1.9.1", path = "rayon-core" }
crossbeam-deque = "0.8.0"
# This is a public dependency!
@@ -30,8 +30,8 @@ default-features = false
[dev-dependencies]
docopt = "1"
lazy_static = "1"
-rand = "0.7"
-rand_xorshift = "0.2"
+rand = "0.8"
+rand_xorshift = "0.3"
[dev-dependencies.serde]
version = "1.0.85"
diff --git a/FAQ.md b/FAQ.md
index 11117d3..745f033 100644
--- a/FAQ.md
+++ b/FAQ.md
@@ -86,7 +86,7 @@ tscounter.store(value + 1, Ordering::SeqCst);
You can already see that the `AtomicUsize` API is a bit more complex,
as it requires you to specify an
-[ordering](http://doc.rust-lang.org/std/sync/atomic/enum.Ordering.html). (I
+[ordering](https://doc.rust-lang.org/std/sync/atomic/enum.Ordering.html). (I
won't go into the details on ordering here, but suffice to say that if
you don't know what an ordering is, and probably even if you do, you
should use `Ordering::SeqCst`.) The danger in this parallel version of
diff --git a/METADATA b/METADATA
index 7d115ed..ddf9f80 100644
--- a/METADATA
+++ b/METADATA
@@ -7,13 +7,13 @@ third_party {
}
url {
type: ARCHIVE
- value: "https://static.crates.io/crates/rayon/rayon-1.5.0.crate"
+ value: "https://static.crates.io/crates/rayon/rayon-1.5.1.crate"
}
- version: "1.5.0"
+ version: "1.5.1"
license_type: NOTICE
last_upgrade_date {
- year: 2020
- month: 12
- day: 21
+ year: 2021
+ month: 5
+ day: 19
}
}
diff --git a/README.md b/README.md
index 869c537..4c4dff6 100644
--- a/README.md
+++ b/README.md
@@ -3,7 +3,7 @@
[![Rayon crate](https://img.shields.io/crates/v/rayon.svg)](https://crates.io/crates/rayon)
[![Rayon documentation](https://docs.rs/rayon/badge.svg)](https://docs.rs/rayon)
![minimum rustc 1.36](https://img.shields.io/badge/rustc-1.36+-red.svg)
-![build status](https://github.com/rayon-rs/rayon/workflows/master/badge.svg)
+[![build status](https://github.com/rayon-rs/rayon/workflows/master/badge.svg)](https://github.com/rayon-rs/rayon/actions)
[![Join the chat at https://gitter.im/rayon-rs/Lobby](https://badges.gitter.im/rayon-rs/Lobby.svg)](https://gitter.im/rayon-rs/Lobby?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
Rayon is a data-parallelism library for Rust. It is extremely
@@ -15,7 +15,7 @@ Belt Rust conference.) Rayon is
[available on crates.io](https://crates.io/crates/rayon), and
[API Documentation is available on docs.rs](https://docs.rs/rayon/).
-[blog]: http://smallcultfollowing.com/babysteps/blog/2015/12/18/rayon-data-parallelism-in-rust/
+[blog]: https://smallcultfollowing.com/babysteps/blog/2015/12/18/rayon-data-parallelism-in-rust/
[video]: https://www.youtube.com/watch?v=gof_OEv71Aw
## Parallel iterators and more
diff --git a/RELEASES.md b/RELEASES.md
index fa452c3..0299436 100644
--- a/RELEASES.md
+++ b/RELEASES.md
@@ -1,3 +1,30 @@
+# Release rayon 1.5.1 / rayon-core 1.9.1 (2021-05-18)
+
+- The new `in_place_scope` and `in_place_scope_fifo` are variations of `scope`
+ and `scope_fifo`, running the initial non-`Send` callback directly on the
+ current thread, rather than moving execution to the thread pool.
+- With Rust 1.51 or later, arrays now implement `IntoParallelIterator`.
+- New implementations of `FromParallelIterator` make it possible to `collect`
+ complicated nestings of items.
+ - `FromParallelIterator<(A, B)> for (FromA, FromB)` works like `unzip`.
+ - `FromParallelIterator<Either<L, R>> for (A, B)` works like `partition_map`.
+- Type inference now works better with parallel `Range` and `RangeInclusive`.
+- The implementation of `FromParallelIterator` and `ParallelExtend` for
+ `Vec<T>` now uses `MaybeUninit<T>` internally to avoid creating any
+ references to uninitialized data.
+- `ParallelBridge` fixed a bug with threads missing available work.
+
+## Contributors
+
+Thanks to all of the contributors for this release!
+
+- @atouchet
+- @cuviper
+- @Hywan
+- @iRaiko
+- @Qwaz
+- @rocallahan
+
# Release rayon 1.5.0 / rayon-core 1.9.0 (2020-10-21)
- Update crossbeam dependencies.
@@ -606,7 +633,7 @@ Thanks to the following people for their contributions to this release:
This release includes a lot of progress towards the goal of parity
with the sequential iterator API, though there are still a few methods
that are not yet complete. If you'd like to help with that effort,
-[check out the milestone](https://github.com/nikomatsakis/rayon/issues?q=is%3Aopen+is%3Aissue+milestone%3A%22Parity+with+the+%60Iterator%60+trait%22)
+[check out the milestone](https://github.com/rayon-rs/rayon/issues?q=is%3Aopen+is%3Aissue+milestone%3A%22Parity+with+the+%60Iterator%60+trait%22)
to see the remaining issues.
**Announcement:** @cuviper has been added as a collaborator to the
@@ -672,7 +699,7 @@ API. Thanks @cuviper! Keep it up.
- We are considering removing weights or changing the weight mechanism
before 1.0. Examples of scenarios where you still need weights even
with this adaptive mechanism would be great. Join the discussion
- at <https://github.com/nikomatsakis/rayon/issues/111>.
+ at <https://github.com/rayon-rs/rayon/issues/111>.
- New (unstable) scoped threads API, see `rayon::scope` for details.
- You will need to supply the [cargo feature] `unstable`.
- The various demos and benchmarks have been consolidated into one
@@ -682,7 +709,7 @@ API. Thanks @cuviper! Keep it up.
- Various internal cleanup in the implementation and typo fixes.
Thanks @cuviper, @Eh2406, and @spacejam!
-[cargo feature]: http://doc.crates.io/manifest.html#the-features-section
+[cargo feature]: https://doc.rust-lang.org/cargo/reference/features.html#the-features-section
# Release 0.4.2 (2016-09-15)
diff --git a/build.rs b/build.rs
index 70e301b..e52e326 100644
--- a/build.rs
+++ b/build.rs
@@ -3,4 +3,7 @@ fn main() {
if ac.probe_expression("(0..10).step_by(2).rev()") {
autocfg::emit("step_by");
}
+ if ac.probe_expression("{ fn foo<const N: usize>() {} }") {
+ autocfg::emit("min_const_generics");
+ }
}
diff --git a/src/array.rs b/src/array.rs
new file mode 100644
index 0000000..7922cd6
--- /dev/null
+++ b/src/array.rs
@@ -0,0 +1,90 @@
+#![cfg(min_const_generics)]
+//! Parallel iterator types for [arrays] (`[T; N]`)
+//!
+//! You will rarely need to interact with this module directly unless you need
+//! to name one of the iterator types.
+//!
+//! Everything in this module requires const generics, stabilized in Rust 1.51.
+//!
+//! [arrays]: https://doc.rust-lang.org/std/primitive.array.html
+
+use crate::iter::plumbing::*;
+use crate::iter::*;
+use crate::slice::{Iter, IterMut};
+use crate::vec::DrainProducer;
+use std::mem::ManuallyDrop;
+
+/// This implementation requires const generics, stabilized in Rust 1.51.
+impl<'data, T: Sync + 'data, const N: usize> IntoParallelIterator for &'data [T; N] {
+ type Item = &'data T;
+ type Iter = Iter<'data, T>;
+
+ fn into_par_iter(self) -> Self::Iter {
+ <&[T]>::into_par_iter(self)
+ }
+}
+
+/// This implementation requires const generics, stabilized in Rust 1.51.
+impl<'data, T: Send + 'data, const N: usize> IntoParallelIterator for &'data mut [T; N] {
+ type Item = &'data mut T;
+ type Iter = IterMut<'data, T>;
+
+ fn into_par_iter(self) -> Self::Iter {
+ <&mut [T]>::into_par_iter(self)
+ }
+}
+
+/// This implementation requires const generics, stabilized in Rust 1.51.
+impl<T: Send, const N: usize> IntoParallelIterator for [T; N] {
+ type Item = T;
+ type Iter = IntoIter<T, N>;
+
+ fn into_par_iter(self) -> Self::Iter {
+ IntoIter { array: self }
+ }
+}
+
+/// Parallel iterator that moves out of an array.
+#[derive(Debug, Clone)]
+pub struct IntoIter<T: Send, const N: usize> {
+ array: [T; N],
+}
+
+impl<T: Send, const N: usize> ParallelIterator for IntoIter<T, N> {
+ type Item = T;
+
+ fn drive_unindexed<C>(self, consumer: C) -> C::Result
+ where
+ C: UnindexedConsumer<Self::Item>,
+ {
+ bridge(self, consumer)
+ }
+
+ fn opt_len(&self) -> Option<usize> {
+ Some(N)
+ }
+}
+
+impl<T: Send, const N: usize> IndexedParallelIterator for IntoIter<T, N> {
+ fn drive<C>(self, consumer: C) -> C::Result
+ where
+ C: Consumer<Self::Item>,
+ {
+ bridge(self, consumer)
+ }
+
+ fn len(&self) -> usize {
+ N
+ }
+
+ fn with_producer<CB>(self, callback: CB) -> CB::Output
+ where
+ CB: ProducerCallback<Self::Item>,
+ {
+ unsafe {
+ // Drain every item, and then the local array can just fall out of scope.
+ let mut array = ManuallyDrop::new(self.array);
+ callback.callback(DrainProducer::new(&mut *array))
+ }
+ }
+}
diff --git a/src/iter/collect/consumer.rs b/src/iter/collect/consumer.rs
index 689f29c..3a8eea0 100644
--- a/src/iter/collect/consumer.rs
+++ b/src/iter/collect/consumer.rs
@@ -1,26 +1,18 @@
use super::super::plumbing::*;
use std::marker::PhantomData;
+use std::mem::MaybeUninit;
use std::ptr;
use std::slice;
pub(super) struct CollectConsumer<'c, T: Send> {
/// A slice covering the target memory, not yet initialized!
- target: &'c mut [T],
-}
-
-pub(super) struct CollectFolder<'c, T: Send> {
- /// The folder writes into `result` and must extend the result
- /// up to exactly this number of elements.
- final_len: usize,
-
- /// The current written-to part of our slice of the target
- result: CollectResult<'c, T>,
+ target: &'c mut [MaybeUninit<T>],
}
impl<'c, T: Send + 'c> CollectConsumer<'c, T> {
/// The target memory is considered uninitialized, and will be
/// overwritten without reading or dropping existing values.
- pub(super) fn new(target: &'c mut [T]) -> Self {
+ pub(super) fn new(target: &'c mut [MaybeUninit<T>]) -> Self {
CollectConsumer { target }
}
}
@@ -31,8 +23,12 @@ impl<'c, T: Send + 'c> CollectConsumer<'c, T> {
/// the elements will be dropped, unless its ownership is released before then.
#[must_use]
pub(super) struct CollectResult<'c, T> {
- start: *mut T,
+ /// A slice covering the target memory, initialized up to our separate `len`.
+ target: &'c mut [MaybeUninit<T>],
+ /// The current initialized length in `target`
len: usize,
+ /// Lifetime invariance guarantees that the data flows from consumer to result,
+ /// especially for the `scope_fn` callback in `Collect::with_consumer`.
invariant_lifetime: PhantomData<&'c mut &'c mut [T]>,
}
@@ -57,13 +53,15 @@ impl<'c, T> Drop for CollectResult<'c, T> {
// Drop the first `self.len` elements, which have been recorded
// to be initialized by the folder.
unsafe {
- ptr::drop_in_place(slice::from_raw_parts_mut(self.start, self.len));
+ // TODO: use `MaybeUninit::slice_as_mut_ptr`
+ let start = self.target.as_mut_ptr() as *mut T;
+ ptr::drop_in_place(slice::from_raw_parts_mut(start, self.len));
}
}
}
impl<'c, T: Send + 'c> Consumer<T> for CollectConsumer<'c, T> {
- type Folder = CollectFolder<'c, T>;
+ type Folder = CollectResult<'c, T>;
type Reducer = CollectReducer;
type Result = CollectResult<'c, T>;
@@ -80,16 +78,13 @@ impl<'c, T: Send + 'c> Consumer<T> for CollectConsumer<'c, T> {
)
}
- fn into_folder(self) -> CollectFolder<'c, T> {
- // Create a folder that consumes values and writes them
+ fn into_folder(self) -> Self::Folder {
+ // Create a result/folder that consumes values and writes them
// into target. The initial result has length 0.
- CollectFolder {
- final_len: self.target.len(),
- result: CollectResult {
- start: self.target.as_mut_ptr(),
- len: 0,
- invariant_lifetime: PhantomData,
- },
+ CollectResult {
+ target: self.target,
+ len: 0,
+ invariant_lifetime: PhantomData,
}
}
@@ -98,19 +93,19 @@ impl<'c, T: Send + 'c> Consumer<T> for CollectConsumer<'c, T> {
}
}
-impl<'c, T: Send + 'c> Folder<T> for CollectFolder<'c, T> {
- type Result = CollectResult<'c, T>;
+impl<'c, T: Send + 'c> Folder<T> for CollectResult<'c, T> {
+ type Result = Self;
- fn consume(mut self, item: T) -> CollectFolder<'c, T> {
- if self.result.len >= self.final_len {
- panic!("too many values pushed to consumer");
- }
+ fn consume(mut self, item: T) -> Self {
+ let dest = self
+ .target
+ .get_mut(self.len)
+ .expect("too many values pushed to consumer");
- // Compute target pointer and write to it, and
- // extend the current result by one element
+ // Write item and increase the initialized length
unsafe {
- self.result.start.add(self.result.len).write(item);
- self.result.len += 1;
+ dest.as_mut_ptr().write(item);
+ self.len += 1;
}
self
@@ -119,7 +114,7 @@ impl<'c, T: Send + 'c> Folder<T> for CollectFolder<'c, T> {
fn complete(self) -> Self::Result {
// NB: We don't explicitly check that the local writes were complete,
// but Collect will assert the total result length in the end.
- self.result
+ self
}
fn full(&self) -> bool {
@@ -151,8 +146,13 @@ impl<'c, T> Reducer<CollectResult<'c, T>> for CollectReducer {
// Merge if the CollectResults are adjacent and in left to right order
// else: drop the right piece now and total length will end up short in the end,
// when the correctness of the collected result is asserted.
- if left.start.wrapping_add(left.len) == right.start {
- left.len += right.release_ownership();
+ let left_end = left.target[left.len..].as_ptr();
+ if left_end == right.target.as_ptr() {
+ let len = left.len + right.release_ownership();
+ unsafe {
+ left.target = slice::from_raw_parts_mut(left.target.as_mut_ptr(), len);
+ }
+ left.len = len;
}
left
}
diff --git a/src/iter/collect/mod.rs b/src/iter/collect/mod.rs
index e18298e..7cbf215 100644
--- a/src/iter/collect/mod.rs
+++ b/src/iter/collect/mod.rs
@@ -1,4 +1,5 @@
use super::{IndexedParallelIterator, IntoParallelIterator, ParallelExtend, ParallelIterator};
+use std::mem::MaybeUninit;
use std::slice;
mod consumer;
@@ -88,55 +89,56 @@ impl<'c, T: Send + 'c> Collect<'c, T> {
where
F: FnOnce(CollectConsumer<'_, T>) -> CollectResult<'_, T>,
{
+ let slice = Self::reserve_get_tail_slice(&mut self.vec, self.len);
+ let result = scope_fn(CollectConsumer::new(slice));
+
+ // The CollectResult represents a contiguous part of the
+ // slice, that has been written to.
+ // On unwind here, the CollectResult will be dropped.
+ // If some producers on the way did not produce enough elements,
+ // partial CollectResults may have been dropped without
+ // being reduced to the final result, and we will see
+ // that as the length coming up short.
+ //
+ // Here, we assert that `slice` is fully initialized. This is
+ // checked by the following assert, which verifies if a
+ // complete CollectResult was produced; if the length is
+ // correct, it is necessarily covering the target slice.
+ // Since we know that the consumer cannot have escaped from
+ // `drive` (by parametricity, essentially), we know that any
+ // stores that will happen, have happened. Unless some code is buggy,
+ // that means we should have seen `len` total writes.
+ let actual_writes = result.len();
+ assert!(
+ actual_writes == self.len,
+ "expected {} total writes, but got {}",
+ self.len,
+ actual_writes
+ );
+
+ // Release the result's mutable borrow and "proxy ownership"
+ // of the elements, before the vector takes it over.
+ result.release_ownership();
+
+ let new_len = self.vec.len() + self.len;
+
unsafe {
- let slice = Self::reserve_get_tail_slice(&mut self.vec, self.len);
- let result = scope_fn(CollectConsumer::new(slice));
-
- // The CollectResult represents a contiguous part of the
- // slice, that has been written to.
- // On unwind here, the CollectResult will be dropped.
- // If some producers on the way did not produce enough elements,
- // partial CollectResults may have been dropped without
- // being reduced to the final result, and we will see
- // that as the length coming up short.
- //
- // Here, we assert that `slice` is fully initialized. This is
- // checked by the following assert, which verifies if a
- // complete CollectResult was produced; if the length is
- // correct, it is necessarily covering the target slice.
- // Since we know that the consumer cannot have escaped from
- // `drive` (by parametricity, essentially), we know that any
- // stores that will happen, have happened. Unless some code is buggy,
- // that means we should have seen `len` total writes.
- let actual_writes = result.len();
- assert!(
- actual_writes == self.len,
- "expected {} total writes, but got {}",
- self.len,
- actual_writes
- );
-
- // Release the result's mutable borrow and "proxy ownership"
- // of the elements, before the vector takes it over.
- result.release_ownership();
-
- let new_len = self.vec.len() + self.len;
self.vec.set_len(new_len);
}
}
/// Reserve space for `len` more elements in the vector,
/// and return a slice to the uninitialized tail of the vector
- ///
- /// Safety: The tail slice is uninitialized
- unsafe fn reserve_get_tail_slice(vec: &mut Vec<T>, len: usize) -> &mut [T] {
+ fn reserve_get_tail_slice(vec: &mut Vec<T>, len: usize) -> &mut [MaybeUninit<T>] {
// Reserve the new space.
vec.reserve(len);
- // Get a correct borrow, then extend it for the newly added length.
+ // TODO: use `Vec::spare_capacity_mut` instead
+ // SAFETY: `MaybeUninit<T>` is guaranteed to have the same layout
+ // as `T`, and we already made sure to have the additional space.
let start = vec.len();
- let slice = &mut vec[start..];
- slice::from_raw_parts_mut(slice.as_mut_ptr(), len)
+ let tail_ptr = vec[start..].as_mut_ptr() as *mut MaybeUninit<T>;
+ unsafe { slice::from_raw_parts_mut(tail_ptr, len) }
}
}
diff --git a/src/iter/collect/test.rs b/src/iter/collect/test.rs
index 00c16c4..ddf7757 100644
--- a/src/iter/collect/test.rs
+++ b/src/iter/collect/test.rs
@@ -24,7 +24,7 @@ fn produce_too_many_items() {
let mut folder = consumer.into_folder();
folder = folder.consume(22);
folder = folder.consume(23);
- folder.consume(24);
+ folder = folder.consume(24);
unreachable!("folder does not complete")
});
}
diff --git a/src/iter/mod.rs b/src/iter/mod.rs
index 0c82933..edff1a6 100644
--- a/src/iter/mod.rs
+++ b/src/iter/mod.rs
@@ -61,7 +61,7 @@
//! If you'd like to build a custom parallel iterator, or to write your own
//! combinator, then check out the [split] function and the [plumbing] module.
//!
-//! [regular iterator]: http://doc.rust-lang.org/std/iter/trait.Iterator.html
+//! [regular iterator]: https://doc.rust-lang.org/std/iter/trait.Iterator.html
//! [`ParallelIterator`]: trait.ParallelIterator.html
//! [`IndexedParallelIterator`]: trait.IndexedParallelIterator.html
//! [split]: fn.split.html
@@ -1966,6 +1966,79 @@ pub trait ParallelIterator: Sized + Send {
///
/// assert_eq!(sync_vec, async_vec);
/// ```
+ ///
+ /// You can collect a pair of collections like [`unzip`](#method.unzip)
+ /// for paired items:
+ ///
+ /// ```
+ /// use rayon::prelude::*;
+ ///
+ /// let a = [(0, 1), (1, 2), (2, 3), (3, 4)];
+ /// let (first, second): (Vec<_>, Vec<_>) = a.into_par_iter().collect();
+ ///
+ /// assert_eq!(first, [0, 1, 2, 3]);
+ /// assert_eq!(second, [1, 2, 3, 4]);
+ /// ```
+ ///
+ /// Or like [`partition_map`](#method.partition_map) for `Either` items:
+ ///
+ /// ```
+ /// use rayon::prelude::*;
+ /// use rayon::iter::Either;
+ ///
+ /// let (left, right): (Vec<_>, Vec<_>) = (0..8).into_par_iter().map(|x| {
+ /// if x % 2 == 0 {
+ /// Either::Left(x * 4)
+ /// } else {
+ /// Either::Right(x * 3)
+ /// }
+ /// }).collect();
+ ///
+ /// assert_eq!(left, [0, 8, 16, 24]);
+ /// assert_eq!(right, [3, 9, 15, 21]);
+ /// ```
+ ///
+ /// You can even collect an arbitrarily-nested combination of pairs and `Either`:
+ ///
+ /// ```
+ /// use rayon::prelude::*;
+ /// use rayon::iter::Either;
+ ///
+ /// let (first, (left, right)): (Vec<_>, (Vec<_>, Vec<_>))
+ /// = (0..8).into_par_iter().map(|x| {
+ /// if x % 2 == 0 {
+ /// (x, Either::Left(x * 4))
+ /// } else {
+ /// (-x, Either::Right(x * 3))
+ /// }
+ /// }).collect();
+ ///
+ /// assert_eq!(first, [0, -1, 2, -3, 4, -5, 6, -7]);
+ /// assert_eq!(left, [0, 8, 16, 24]);
+ /// assert_eq!(right, [3, 9, 15, 21]);
+ /// ```
+ ///
+ /// All of that can _also_ be combined with short-circuiting collection of
+ /// `Result` or `Option` types:
+ ///
+ /// ```
+ /// use rayon::prelude::*;
+ /// use rayon::iter::Either;
+ ///
+ /// let result: Result<(Vec<_>, (Vec<_>, Vec<_>)), _>
+ /// = (0..8).into_par_iter().map(|x| {
+ /// if x > 5 {
+ /// Err(x)
+ /// } else if x % 2 == 0 {
+ /// Ok((x, Either::Left(x * 4)))
+ /// } else {
+ /// Ok((-x, Either::Right(x * 3)))
+ /// }
+ /// }).collect();
+ ///
+ /// let error = result.unwrap_err();
+ /// assert!(error == 6 || error == 7);
+ /// ```
fn collect<C>(self) -> C
where
C: FromParallelIterator<Self::Item>,
@@ -2130,7 +2203,7 @@ pub trait ParallelIterator: Sized + Send {
/// See the [README] for more details on the internals of parallel
/// iterators.
///
- /// [README]: README.md
+ /// [README]: https://github.com/rayon-rs/rayon/blob/master/src/iter/plumbing/README.md
fn drive_unindexed<C>(self, consumer: C) -> C::Result
where
C: UnindexedConsumer<Self::Item>;
@@ -2817,7 +2890,7 @@ pub trait IndexedParallelIterator: ParallelIterator {
/// See the [README] for more details on the internals of parallel
/// iterators.
///
- /// [README]: README.md
+ /// [README]: https://github.com/rayon-rs/rayon/blob/master/src/iter/plumbing/README.md
fn drive<C: Consumer<Self::Item>>(self, consumer: C) -> C::Result;
/// Internal method used to define the behavior of this parallel
@@ -2834,7 +2907,7 @@ pub trait IndexedParallelIterator: ParallelIterator {
/// See the [README] for more details on the internals of parallel
/// iterators.
///
- /// [README]: README.md
+ /// [README]: https://github.com/rayon-rs/rayon/blob/master/src/iter/plumbing/README.md
fn with_producer<CB: ProducerCallback<Self::Item>>(self, callback: CB) -> CB::Output;
}
diff --git a/src/iter/par_bridge.rs b/src/iter/par_bridge.rs
index 4c2b96e..339ac1a 100644
--- a/src/iter/par_bridge.rs
+++ b/src/iter/par_bridge.rs
@@ -125,16 +125,19 @@ where
let mut count = self.split_count.load(Ordering::SeqCst);
loop {
- let done = self.done.load(Ordering::SeqCst);
+ // Check if the iterator is exhausted *and* we've consumed every item from it.
+ let done = self.done.load(Ordering::SeqCst) && self.items.is_empty();
+
match count.checked_sub(1) {
Some(new_count) if !done => {
- let last_count =
- self.split_count
- .compare_and_swap(count, new_count, Ordering::SeqCst);
- if last_count == count {
- return (self.clone(), Some(self));
- } else {
- count = last_count;
+ match self.split_count.compare_exchange_weak(
+ count,
+ new_count,
+ Ordering::SeqCst,
+ Ordering::SeqCst,
+ ) {
+ Ok(_) => return (self.clone(), Some(self)),
+ Err(last_count) => count = last_count,
}
}
_ => {
@@ -157,13 +160,26 @@ where
}
}
Steal::Empty => {
+ // Don't storm the mutex if we're already done.
if self.done.load(Ordering::SeqCst) {
- // the iterator is out of items, no use in continuing
- return folder;
+ // Someone might have pushed more between our `steal()` and `done.load()`
+ if self.items.is_empty() {
+ // The iterator is out of items, no use in continuing
+ return folder;
+ }
} else {
// our cache is out of items, time to load more from the iterator
match self.iter.try_lock() {
Ok(mut guard) => {
+ // Check `done` again in case we raced with the previous lock
+ // holder on its way out.
+ if self.done.load(Ordering::SeqCst) {
+ if self.items.is_empty() {
+ return folder;
+ }
+ continue;
+ }
+
let count = current_num_threads();
let count = (count * count) * 2;
@@ -184,7 +200,7 @@ where
}
Err(TryLockError::WouldBlock) => {
// someone else has the mutex, just sit tight until it's ready
- yield_now(); //TODO: use a thread=pool-aware yield? (#548)
+ yield_now(); //TODO: use a thread-pool-aware yield? (#548)
}
Err(TryLockError::Poisoned(_)) => {
// any panics from other threads will have been caught by the pool,
diff --git a/src/iter/unzip.rs b/src/iter/unzip.rs
index 219b909..0b7074e 100644
--- a/src/iter/unzip.rs
+++ b/src/iter/unzip.rs
@@ -462,3 +462,64 @@ where
}
}
}
+
+impl<A, B, FromA, FromB> FromParallelIterator<(A, B)> for (FromA, FromB)
+where
+ A: Send,
+ B: Send,
+ FromA: Send + FromParallelIterator<A>,
+ FromB: Send + FromParallelIterator<B>,
+{
+ fn from_par_iter<I>(pi: I) -> Self
+ where
+ I: IntoParallelIterator<Item = (A, B)>,
+ {
+ let (a, b): (Collector<FromA>, Collector<FromB>) = pi.into_par_iter().unzip();
+ (a.result.unwrap(), b.result.unwrap())
+ }
+}
+
+impl<L, R, A, B> FromParallelIterator<Either<L, R>> for (A, B)
+where
+ L: Send,
+ R: Send,
+ A: Send + FromParallelIterator<L>,
+ B: Send + FromParallelIterator<R>,
+{
+ fn from_par_iter<I>(pi: I) -> Self
+ where
+ I: IntoParallelIterator<Item = Either<L, R>>,
+ {
+ fn identity<T>(x: T) -> T {
+ x
+ }
+
+ let (a, b): (Collector<A>, Collector<B>) = pi.into_par_iter().partition_map(identity);
+ (a.result.unwrap(), b.result.unwrap())
+ }
+}
+
+/// Shim to implement a one-time `ParallelExtend` using `FromParallelIterator`.
+struct Collector<FromT> {
+ result: Option<FromT>,
+}
+
+impl<FromT> Default for Collector<FromT> {
+ fn default() -> Self {
+ Collector { result: None }
+ }
+}
+
+impl<T, FromT> ParallelExtend<T> for Collector<FromT>
+where
+ T: Send,
+ FromT: Send + FromParallelIterator<T>,
+{
+ fn par_extend<I>(&mut self, pi: I)
+ where
+ I: IntoParallelIterator<Item = T>,
+ {
+ debug_assert!(self.result.is_none());
+ self.result = Some(pi.into_par_iter().collect());
+ }
+}
diff --git a/src/lib.rs b/src/lib.rs
index d5d0314..b1e6fab 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -91,6 +91,7 @@ mod private;
mod split_producer;
+pub mod array;
pub mod collections;
pub mod iter;
pub mod option;
@@ -114,7 +115,7 @@ pub use rayon_core::ThreadPool;
pub use rayon_core::ThreadPoolBuildError;
pub use rayon_core::ThreadPoolBuilder;
pub use rayon_core::{current_num_threads, current_thread_index};
+pub use rayon_core::{in_place_scope, scope, Scope};
+pub use rayon_core::{in_place_scope_fifo, scope_fifo, ScopeFifo};
pub use rayon_core::{join, join_context};
-pub use rayon_core::{scope, Scope};
-pub use rayon_core::{scope_fifo, ScopeFifo};
pub use rayon_core::{spawn, spawn_fifo};
diff --git a/src/range.rs b/src/range.rs
index 09ba25e..57b613e 100644
--- a/src/range.rs
+++ b/src/range.rs
@@ -19,10 +19,11 @@
use crate::iter::plumbing::*;
use crate::iter::*;
use std::char;
+use std::convert::TryFrom;
use std::ops::Range;
use std::usize;
-/// Parallel iterator over a range, implemented for all integer types.
+/// Parallel iterator over a range, implemented for all integer types and `char`.
///
/// **Note:** The `zip` operation requires `IndexedParallelIterator`
/// which is not implemented for `u64`, `i64`, `u128`, or `i128`.
@@ -48,6 +49,7 @@ pub struct Iter<T> {
range: Range<T>,
}
+/// Implemented for ranges of all primitive integer types and `char`.
impl<T> IntoParallelIterator for Range<T>
where
Iter<T>: ParallelIterator,
@@ -76,40 +78,117 @@ where
}
}
+/// These traits help drive integer type inference. Without them, an unknown `{integer}` type only
+/// has constraints on `Iter<{integer}>`, which will probably give up and use `i32`. By adding
+/// these traits on the item type, the compiler can see a more direct constraint to infer like
+/// `{integer}: RangeInteger`, which works better. See `test_issue_833` for an example.
+///
+/// They have to be `pub` since they're seen in the public `impl ParallelIterator` constraints, but
+/// we put them in a private modules so they're not actually reachable in our public API.
+mod private {
+ use super::*;
+
+ /// Implementation details of `ParallelIterator for Iter<Self>`
+ pub trait RangeInteger: Sized + Send {
+ private_decl! {}
+
+ fn drive_unindexed<C>(iter: Iter<Self>, consumer: C) -> C::Result
+ where
+ C: UnindexedConsumer<Self>;
+
+ fn opt_len(iter: &Iter<Self>) -> Option<usize>;
+ }
+
+ /// Implementation details of `IndexedParallelIterator for Iter<Self>`
+ pub trait IndexedRangeInteger: RangeInteger {
+ private_decl! {}
+
+ fn drive<C>(iter: Iter<Self>, consumer: C) -> C::Result
+ where
+ C: Consumer<Self>;
+
+ fn len(iter: &Iter<Self>) -> usize;
+
+ fn with_producer<CB>(iter: Iter<Self>, callback: CB) -> CB::Output
+ where
+ CB: ProducerCallback<Self>;
+ }
+}
+use private::{IndexedRangeInteger, RangeInteger};
+
+impl<T: RangeInteger> ParallelIterator for Iter<T> {
+ type Item = T;
+
+ fn drive_unindexed<C>(self, consumer: C) -> C::Result
+ where
+ C: UnindexedConsumer<T>,
+ {
+ T::drive_unindexed(self, consumer)
+ }
+
+ #[inline]
+ fn opt_len(&self) -> Option<usize> {
+ T::opt_len(self)
+ }
+}
+
+impl<T: IndexedRangeInteger> IndexedParallelIterator for Iter<T> {
+ fn drive<C>(self, consumer: C) -> C::Result
+ where
+ C: Consumer<T>,
+ {
+ T::drive(self, consumer)
+ }
+
+ #[inline]
+ fn len(&self) -> usize {
+ T::len(self)
+ }
+
+ fn with_producer<CB>(self, callback: CB) -> CB::Output
+ where
+ CB: ProducerCallback<T>,
+ {
+ T::with_producer(self, callback)
+ }
+}
+
macro_rules! indexed_range_impl {
( $t:ty ) => {
- impl ParallelIterator for Iter<$t> {
- type Item = $t;
+ impl RangeInteger for $t {
+ private_impl! {}
- fn drive_unindexed<C>(self, consumer: C) -> C::Result
+ fn drive_unindexed<C>(iter: Iter<$t>, consumer: C) -> C::Result
where
- C: UnindexedConsumer<Self::Item>,
+ C: UnindexedConsumer<$t>,
{
- bridge(self, consumer)
+ bridge(iter, consumer)
}
- fn opt_len(&self) -> Option<usize> {
- Some(self.len())
+ fn opt_len(iter: &Iter<$t>) -> Option<usize> {
+ Some(iter.range.len())
}
}
- impl IndexedParallelIterator for Iter<$t> {
- fn drive<C>(self, consumer: C) -> C::Result
+ impl IndexedRangeInteger for $t {
+ private_impl! {}
+
+ fn drive<C>(iter: Iter<$t>, consumer: C) -> C::Result
where
- C: Consumer<Self::Item>,
+ C: Consumer<$t>,
{
- bridge(self, consumer)
+ bridge(iter, consumer)
}
- fn len(&self) -> usize {
- self.range.len()
+ fn len(iter: &Iter<$t>) -> usize {
+ iter.range.len()
}
- fn with_producer<CB>(self, callback: CB) -> CB::Output
+ fn with_producer<CB>(iter: Iter<$t>, callback: CB) -> CB::Output
where
- CB: ProducerCallback<Self::Item>,
+ CB: ProducerCallback<$t>,
{
- callback.callback(IterProducer { range: self.range })
+ callback.callback(IterProducer { range: iter.range })
}
}
@@ -150,36 +229,31 @@ macro_rules! unindexed_range_impl {
}
}
- impl ParallelIterator for Iter<$t> {
- type Item = $t;
+ impl RangeInteger for $t {
+ private_impl! {}
- fn drive_unindexed<C>(self, consumer: C) -> C::Result
+ fn drive_unindexed<C>(iter: Iter<$t>, consumer: C) -> C::Result
where
- C: UnindexedConsumer<Self::Item>,
+ C: UnindexedConsumer<$t>,
{
#[inline]
fn offset(start: $t) -> impl Fn(usize) -> $t {
move |i| start.wrapping_add(i as $t)
}
- if let Some(len) = self.opt_len() {
+ if let Some(len) = iter.opt_len() {
// Drive this in indexed mode for better `collect`.
(0..len)
.into_par_iter()
- .map(offset(self.range.start))
+ .map(offset(iter.range.start))
.drive(consumer)
} else {
- bridge_unindexed(IterProducer { range: self.range }, consumer)
+ bridge_unindexed(IterProducer { range: iter.range }, consumer)
}
}
- fn opt_len(&self) -> Option<usize> {
- let len = self.range.len();
- if len <= usize::MAX as $len_t {
- Some(len as usize)
- } else {
- None
- }
+ fn opt_len(iter: &Iter<$t>) -> Option<usize> {
+ usize::try_from(iter.range.len()).ok()
}
}
@@ -366,3 +440,23 @@ fn test_usize_i64_overflow() {
let pool = ThreadPoolBuilder::new().num_threads(8).build().unwrap();
pool.install(|| assert_eq!(iter.find_last(|_| true), Some(i64::MAX - 1)));
}
+
+#[test]
+fn test_issue_833() {
+ fn is_even(n: i64) -> bool {
+ n % 2 == 0
+ }
+
+ // The integer type should be inferred from `is_even`
+ let v: Vec<_> = (1..100).into_par_iter().filter(|&x| is_even(x)).collect();
+ assert!(v.into_iter().eq((2..100).step_by(2)));
+
+ // Try examples with indexed iterators too
+ let pos = (0..100).into_par_iter().position_any(|x| x == 50i16);
+ assert_eq!(pos, Some(50usize));
+
+ assert!((0..100)
+ .into_par_iter()
+ .zip(0..100)
+ .all(|(a, b)| i16::eq(&a, &b)));
+}
diff --git a/src/range_inclusive.rs b/src/range_inclusive.rs
index c802b6c..b7bb0ca 100644
--- a/src/range_inclusive.rs
+++ b/src/range_inclusive.rs
@@ -21,10 +21,10 @@ use crate::iter::*;
use std::char;
use std::ops::RangeInclusive;
-/// Parallel iterator over an inclusive range, implemented for all integer types.
+/// Parallel iterator over an inclusive range, implemented for all integer types and `char`.
///
/// **Note:** The `zip` operation requires `IndexedParallelIterator`
-/// which is only implemented for `u8`, `i8`, `u16`, and `i16`.
+/// which is only implemented for `u8`, `i8`, `u16`, `i16`, and `char`.
///
/// ```
/// use rayon::prelude::*;
@@ -71,6 +71,7 @@ where
}
}
+/// Implemented for ranges of all primitive integer types and `char`.
impl<T> IntoParallelIterator for RangeInclusive<T>
where
Iter<T>: ParallelIterator,
@@ -83,34 +84,109 @@ where
}
}
+/// These traits help drive integer type inference. Without them, an unknown `{integer}` type only
+/// has constraints on `Iter<{integer}>`, which will probably give up and use `i32`. By adding
+/// these traits on the item type, the compiler can see a more direct constraint to infer like
+/// `{integer}: RangeInteger`, which works better. See `test_issue_833` for an example.
+///
+/// They have to be `pub` since they're seen in the public `impl ParallelIterator` constraints, but
+/// we put them in a private modules so they're not actually reachable in our public API.
+mod private {
+ use super::*;
+
+ /// Implementation details of `ParallelIterator for Iter<Self>`
+ pub trait RangeInteger: Sized + Send {
+ private_decl! {}
+
+ fn drive_unindexed<C>(iter: Iter<Self>, consumer: C) -> C::Result
+ where
+ C: UnindexedConsumer<Self>;
+
+ fn opt_len(iter: &Iter<Self>) -> Option<usize>;
+ }
+
+ /// Implementation details of `IndexedParallelIterator for Iter<Self>`
+ pub trait IndexedRangeInteger: RangeInteger {
+ private_decl! {}
+
+ fn drive<C>(iter: Iter<Self>, consumer: C) -> C::Result
+ where
+ C: Consumer<Self>;
+
+ fn len(iter: &Iter<Self>) -> usize;
+
+ fn with_producer<CB>(iter: Iter<Self>, callback: CB) -> CB::Output
+ where
+ CB: ProducerCallback<Self>;
+ }
+}
+use private::{IndexedRangeInteger, RangeInteger};
+
+impl<T: RangeInteger> ParallelIterator for Iter<T> {
+ type Item = T;
+
+ fn drive_unindexed<C>(self, consumer: C) -> C::Result
+ where
+ C: UnindexedConsumer<T>,
+ {
+ T::drive_unindexed(self, consumer)
+ }
+
+ #[inline]
+ fn opt_len(&self) -> Option<usize> {
+ T::opt_len(self)
+ }
+}
+
+impl<T: IndexedRangeInteger> IndexedParallelIterator for Iter<T> {
+ fn drive<C>(self, consumer: C) -> C::Result
+ where
+ C: Consumer<T>,
+ {
+ T::drive(self, consumer)
+ }
+
+ #[inline]
+ fn len(&self) -> usize {
+ T::len(self)
+ }
+
+ fn with_producer<CB>(self, callback: CB) -> CB::Output
+ where
+ CB: ProducerCallback<T>,
+ {
+ T::with_producer(self, callback)
+ }
+}
+
macro_rules! convert {
- ( $self:ident . $method:ident ( $( $arg:expr ),* ) ) => {
- if let Some((start, end)) = $self.bounds() {
+ ( $iter:ident . $method:ident ( $( $arg:expr ),* ) ) => {
+ if let Some((start, end)) = $iter.bounds() {
if let Some(end) = end.checked_add(1) {
(start..end).into_par_iter().$method($( $arg ),*)
} else {
(start..end).into_par_iter().chain(once(end)).$method($( $arg ),*)
}
} else {
- empty::<Self::Item>().$method($( $arg ),*)
+ empty::<Self>().$method($( $arg ),*)
}
};
}
macro_rules! parallel_range_impl {
( $t:ty ) => {
- impl ParallelIterator for Iter<$t> {
- type Item = $t;
+ impl RangeInteger for $t {
+ private_impl! {}
- fn drive_unindexed<C>(self, consumer: C) -> C::Result
+ fn drive_unindexed<C>(iter: Iter<$t>, consumer: C) -> C::Result
where
- C: UnindexedConsumer<Self::Item>,
+ C: UnindexedConsumer<$t>,
{
- convert!(self.drive_unindexed(consumer))
+ convert!(iter.drive_unindexed(consumer))
}
- fn opt_len(&self) -> Option<usize> {
- convert!(self.opt_len())
+ fn opt_len(iter: &Iter<$t>) -> Option<usize> {
+ convert!(iter.opt_len())
}
}
};
@@ -120,23 +196,25 @@ macro_rules! indexed_range_impl {
( $t:ty ) => {
parallel_range_impl! { $t }
- impl IndexedParallelIterator for Iter<$t> {
- fn drive<C>(self, consumer: C) -> C::Result
+ impl IndexedRangeInteger for $t {
+ private_impl! {}
+
+ fn drive<C>(iter: Iter<$t>, consumer: C) -> C::Result
where
- C: Consumer<Self::Item>,
+ C: Consumer<$t>,
{
- convert!(self.drive(consumer))
+ convert!(iter.drive(consumer))
}
- fn len(&self) -> usize {
- self.range.len()
+ fn len(iter: &Iter<$t>) -> usize {
+ iter.range.len()
}
- fn with_producer<CB>(self, callback: CB) -> CB::Output
+ fn with_producer<CB>(iter: Iter<$t>, callback: CB) -> CB::Output
where
- CB: ProducerCallback<Self::Item>,
+ CB: ProducerCallback<$t>,
{
- convert!(self.with_producer(callback))
+ convert!(iter.with_producer(callback))
}
}
};
@@ -179,7 +257,7 @@ macro_rules! convert_char {
.$method($( $arg ),*)
}
} else {
- empty().into_par_iter().$method($( $arg ),*)
+ empty::<char>().$method($( $arg ),*)
}
};
}
@@ -286,3 +364,23 @@ fn test_usize_i64_overflow() {
let pool = ThreadPoolBuilder::new().num_threads(8).build().unwrap();
pool.install(|| assert_eq!(iter.find_last(|_| true), Some(i64::MAX)));
}
+
+#[test]
+fn test_issue_833() {
+ fn is_even(n: i64) -> bool {
+ n % 2 == 0
+ }
+
+ // The integer type should be inferred from `is_even`
+ let v: Vec<_> = (1..=100).into_par_iter().filter(|&x| is_even(x)).collect();
+ assert!(v.into_iter().eq((2..=100).step_by(2)));
+
+ // Try examples with indexed iterators too
+ let pos = (0..=100).into_par_iter().position_any(|x| x == 50i16);
+ assert_eq!(pos, Some(50usize));
+
+ assert!((0..=100)
+ .into_par_iter()
+ .zip(0..=100)
+ .all(|(a, b)| i16::eq(&a, &b)));
+}
diff --git a/src/slice/mergesort.rs b/src/slice/mergesort.rs
index a007cae..e9a5d43 100644
--- a/src/slice/mergesort.rs
+++ b/src/slice/mergesort.rs
@@ -284,7 +284,7 @@ fn collapse(runs: &[Run]) -> Option<usize> {
/// Otherwise, it sorts the slice into non-descending order.
///
/// This merge sort borrows some (but not all) ideas from TimSort, which is described in detail
-/// [here](http://svn.python.org/projects/python/trunk/Objects/listsort.txt).
+/// [here](https://svn.python.org/projects/python/trunk/Objects/listsort.txt).
///
/// The algorithm identifies strictly descending and non-descending subsequences, which are called
/// natural runs. There is a stack of pending runs yet to be merged. Each newly found run is pushed
@@ -739,12 +739,12 @@ mod tests {
check(&[1, 2, 2, 2, 2, 3], &[]);
check(&[], &[1, 2, 2, 2, 2, 3]);
- let mut rng = thread_rng();
+ let ref mut rng = thread_rng();
for _ in 0..100 {
- let limit: u32 = rng.gen_range(1, 21);
- let left_len: usize = rng.gen_range(0, 20);
- let right_len: usize = rng.gen_range(0, 20);
+ let limit: u32 = rng.gen_range(1..21);
+ let left_len: usize = rng.gen_range(0..20);
+ let right_len: usize = rng.gen_range(0..20);
let mut left = rng
.sample_iter(&Uniform::new(0, limit))
diff --git a/src/slice/mod.rs b/src/slice/mod.rs
index b80125f..d47b0d3 100644
--- a/src/slice/mod.rs
+++ b/src/slice/mod.rs
@@ -470,15 +470,6 @@ impl<'data, T: Sync + 'data> IntoParallelIterator for &'data [T] {
}
}
-impl<'data, T: Sync + 'data> IntoParallelIterator for &'data Vec<T> {
- type Item = &'data T;
- type Iter = Iter<'data, T>;
-
- fn into_par_iter(self) -> Self::Iter {
- Iter { slice: self }
- }
-}
-
impl<'data, T: Send + 'data> IntoParallelIterator for &'data mut [T] {
type Item = &'data mut T;
type Iter = IterMut<'data, T>;
@@ -488,15 +479,6 @@ impl<'data, T: Send + 'data> IntoParallelIterator for &'data mut [T] {
}
}
-impl<'data, T: Send + 'data> IntoParallelIterator for &'data mut Vec<T> {
- type Item = &'data mut T;
- type Iter = IterMut<'data, T>;
-
- fn into_par_iter(self) -> Self::Iter {
- IterMut { slice: self }
- }
-}
-
/// Parallel iterator over immutable items in a slice
#[derive(Debug)]
pub struct Iter<'data, T: Sync> {
diff --git a/src/slice/quicksort.rs b/src/slice/quicksort.rs
index b985073..17c6f33 100644
--- a/src/slice/quicksort.rs
+++ b/src/slice/quicksort.rs
@@ -229,7 +229,7 @@ where
/// Partitioning is performed block-by-block in order to minimize the cost of branching operations.
/// This idea is presented in the [BlockQuicksort][pdf] paper.
///
-/// [pdf]: http://drops.dagstuhl.de/opus/volltexte/2016/6389/pdf/LIPIcs-ESA-2016-38.pdf
+/// [pdf]: https://drops.dagstuhl.de/opus/volltexte/2016/6389/pdf/LIPIcs-ESA-2016-38.pdf
fn partition_in_blocks<T, F>(v: &mut [T], pivot: &T, is_less: &F) -> usize
where
F: Fn(&T, &T) -> bool,
@@ -766,7 +766,7 @@ mod tests {
#[test]
fn test_heapsort() {
- let rng = thread_rng();
+ let ref mut rng = thread_rng();
for len in (0..25).chain(500..501) {
for &modulus in &[5, 10, 100] {
diff --git a/src/slice/test.rs b/src/slice/test.rs
index 97de7d8..71743e2 100644
--- a/src/slice/test.rs
+++ b/src/slice/test.rs
@@ -10,7 +10,7 @@ macro_rules! sort {
($f:ident, $name:ident) => {
#[test]
fn $name() {
- let mut rng = thread_rng();
+ let ref mut rng = thread_rng();
for len in (0..25).chain(500..501) {
for &modulus in &[5, 10, 100] {
@@ -105,7 +105,7 @@ fn test_par_sort_stability() {
let mut rng = thread_rng();
let mut v: Vec<_> = (0..len)
.map(|_| {
- let n: usize = rng.gen_range(0, 10);
+ let n: usize = rng.gen_range(0..10);
counts[n] += 1;
(n, counts[n])
})
diff --git a/src/vec.rs b/src/vec.rs
index 686673b..1e68aa0 100644
--- a/src/vec.rs
+++ b/src/vec.rs
@@ -8,12 +8,31 @@
use crate::iter::plumbing::*;
use crate::iter::*;
use crate::math::simplify_range;
+use crate::slice::{Iter, IterMut};
use std::iter;
use std::mem;
use std::ops::{Range, RangeBounds};
use std::ptr;
use std::slice;
+impl<'data, T: Sync + 'data> IntoParallelIterator for &'data Vec<T> {
+ type Item = &'data T;
+ type Iter = Iter<'data, T>;
+
+ fn into_par_iter(self) -> Self::Iter {
+ <&[T]>::into_par_iter(self)
+ }
+}
+
+impl<'data, T: Send + 'data> IntoParallelIterator for &'data mut Vec<T> {
+ type Item = &'data mut T;
+ type Iter = IterMut<'data, T>;
+
+ fn into_par_iter(self) -> Self::Iter {
+ <&mut [T]>::into_par_iter(self)
+ }
+}
+
/// Parallel iterator that moves out of a vector.
#[derive(Debug, Clone)]
pub struct IntoIter<T: Send> {
@@ -122,12 +141,16 @@ impl<'data, T: Send> IndexedParallelIterator for Drain<'data, T> {
let start = self.range.start;
self.vec.set_len(start);
- // Get a correct borrow lifetime, then extend it to the original length.
- let mut slice = &mut self.vec[start..];
- slice = slice::from_raw_parts_mut(slice.as_mut_ptr(), self.range.len());
+ // Create the producer as the exclusive "owner" of the slice.
+ let producer = {
+ // Get a correct borrow lifetime, then extend it to the original length.
+ let mut slice = &mut self.vec[start..];
+ slice = slice::from_raw_parts_mut(slice.as_mut_ptr(), self.range.len());
+ DrainProducer::new(slice)
+ };
// The producer will move or drop each item from the drained range.
- callback.callback(DrainProducer::new(slice))
+ callback.callback(producer)
}
}
}
@@ -208,7 +231,9 @@ impl<'data, T: 'data> Iterator for SliceDrain<'data, T> {
type Item = T;
fn next(&mut self) -> Option<T> {
- let ptr = self.iter.next()?;
+ // Coerce the pointer early, so we don't keep the
+ // reference that's about to be invalidated.
+ let ptr: *const T = self.iter.next()?;
Some(unsafe { ptr::read(ptr) })
}
@@ -223,7 +248,9 @@ impl<'data, T: 'data> Iterator for SliceDrain<'data, T> {
impl<'data, T: 'data> DoubleEndedIterator for SliceDrain<'data, T> {
fn next_back(&mut self) -> Option<Self::Item> {
- let ptr = self.iter.next_back()?;
+ // Coerce the pointer early, so we don't keep the
+ // reference that's about to be invalidated.
+ let ptr: *const T = self.iter.next_back()?;
Some(unsafe { ptr::read(ptr) })
}
}
diff --git a/tests/clones.rs b/tests/clones.rs
index fa93f8a..9add775 100644
--- a/tests/clones.rs
+++ b/tests/clones.rs
@@ -108,6 +108,12 @@ fn clone_vec() {
}
#[test]
+fn clone_array() {
+ let a = [0i32; 100];
+ check(a.into_par_iter());
+}
+
+#[test]
fn clone_adaptors() {
let v: Vec<_> = (0..1000).map(Some).collect();
check(v.par_iter().chain(&v));
diff --git a/tests/debug.rs b/tests/debug.rs
index fb11110..3584ba0 100644
--- a/tests/debug.rs
+++ b/tests/debug.rs
@@ -131,6 +131,12 @@ fn debug_vec() {
}
#[test]
+fn debug_array() {
+ let a = [0i32; 10];
+ check(a.into_par_iter());
+}
+
+#[test]
fn debug_adaptors() {
let v: Vec<_> = (0..10).collect();
check(v.par_iter().chain(&v));
diff --git a/tests/producer_split_at.rs b/tests/producer_split_at.rs
index 752bc3e..3a49059 100644
--- a/tests/producer_split_at.rs
+++ b/tests/producer_split_at.rs
@@ -109,6 +109,12 @@ fn check_len<I: ExactSizeIterator>(iter: &I, len: usize) {
// **** Base Producers ****
#[test]
+fn array() {
+ let a = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
+ check(&a, || a);
+}
+
+#[test]
fn empty() {
let v = vec![42];
check(&v[..0], rayon::iter::empty);
diff --git a/tests/sort-panic-safe.rs b/tests/sort-panic-safe.rs
index 7c50505..00a9731 100644
--- a/tests/sort-panic-safe.rs
+++ b/tests/sort-panic-safe.rs
@@ -132,7 +132,7 @@ fn sort_panic_safe() {
let mut rng = thread_rng();
let mut input = (0..len)
.map(|id| DropCounter {
- x: rng.gen_range(0, modulus),
+ x: rng.gen_range(0..modulus),
id,
version: Cell::new(0),
})