aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndroid Build Coastguard Worker <android-build-coastguard-worker@google.com>2023-07-07 05:16:38 +0000
committerAndroid Build Coastguard Worker <android-build-coastguard-worker@google.com>2023-07-07 05:16:38 +0000
commit2c26b7c98274f6358eb4781ff217069b72b728c9 (patch)
treeded15626a7744fe21c3f07d1e6d9230e28909490
parent30628deab525306a799417fc9c0a96a3764428db (diff)
parentf110d982be1a009b2da88e6feeeb261762daa4dc (diff)
downloadrayon-core-aml_uwb_341513070.tar.gz
Change-Id: Ia1cf23ffd5de4a04f7e034d2217d670d3e52509f
-rw-r--r--.cargo_vcs_info.json7
-rw-r--r--Android.bp11
-rw-r--r--Cargo.toml38
-rw-r--r--Cargo.toml.orig11
-rw-r--r--METADATA14
-rw-r--r--README.md2
-rw-r--r--TEST_MAPPING3
-rw-r--r--src/broadcast/mod.rs151
-rw-r--r--src/broadcast/test.rs262
-rw-r--r--src/compile_fail/rc_return.rs8
-rw-r--r--src/compile_fail/rc_upvar.rs8
-rw-r--r--src/job.rs115
-rw-r--r--src/join/mod.rs3
-rw-r--r--src/join/test.rs6
-rw-r--r--src/latch.rs105
-rw-r--r--src/lib.rs89
-rw-r--r--src/log.rs64
-rw-r--r--src/registry.rs276
-rw-r--r--src/scope/mod.rs218
-rw-r--r--src/scope/test.rs120
-rw-r--r--src/sleep/README.md4
-rw-r--r--src/sleep/counters.rs10
-rw-r--r--src/sleep/mod.rs4
-rw-r--r--src/spawn/mod.rs21
-rw-r--r--src/spawn/test.rs16
-rw-r--r--src/test.rs27
-rw-r--r--src/thread_pool/mod.rs137
-rw-r--r--src/thread_pool/test.rs62
-rw-r--r--tests/double_init_fail.rs3
-rw-r--r--tests/init_zero_threads.rs1
-rw-r--r--tests/scoped_threadpool.rs3
-rw-r--r--tests/stack_overflow_crash.rs95
32 files changed, 1499 insertions, 395 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json
index c8aa4e4..2a30505 100644
--- a/.cargo_vcs_info.json
+++ b/.cargo_vcs_info.json
@@ -1,5 +1,6 @@
{
"git": {
- "sha1": "ebcb09b1dc53211c6b5abdf4dc5b40e4bcd0a965"
- }
-}
+ "sha1": "6236214d717694917e77aa1c16d91176b9bc2fff"
+ },
+ "path_in_vcs": "rayon-core"
+} \ No newline at end of file
diff --git a/Android.bp b/Android.bp
index 65b0527..54886c4 100644
--- a/Android.bp
+++ b/Android.bp
@@ -42,14 +42,19 @@ rust_library {
host_supported: true,
crate_name: "rayon_core",
cargo_env_compat: true,
- cargo_pkg_version: "1.9.1",
+ cargo_pkg_version: "1.11.0",
srcs: ["src/lib.rs"],
- edition: "2018",
+ edition: "2021",
rustlibs: [
"libcrossbeam_channel",
"libcrossbeam_deque",
"libcrossbeam_utils",
- "liblazy_static",
"libnum_cpus",
],
+ apex_available: [
+ "//apex_available:platform",
+ "//apex_available:anyapex",
+ ],
+ product_available: true,
+ vendor_available: true,
}
diff --git a/Cargo.toml b/Cargo.toml
index 8b110fd..d41715e 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -3,32 +3,40 @@
# When uploading crates to the registry Cargo will automatically
# "normalize" Cargo.toml files for maximal compatibility
# with all versions of Cargo and also rewrite `path` dependencies
-# to registry (e.g., crates.io) dependencies
+# to registry (e.g., crates.io) dependencies.
#
-# If you believe there's an error in this file please file an
-# issue against the rust-lang/cargo repository. If you're
-# editing this file be aware that the upstream Cargo.toml
-# will likely look very different (and much more reasonable)
+# If you are reading this file be aware that the original Cargo.toml
+# will likely look very different (and much more reasonable).
+# See Cargo.toml.orig for the original contents.
[package]
-edition = "2018"
+edition = "2021"
+rust-version = "1.59"
name = "rayon-core"
-version = "1.9.1"
-authors = ["Niko Matsakis <niko@alum.mit.edu>", "Josh Stone <cuviper@gmail.com>"]
+version = "1.11.0"
+authors = [
+ "Niko Matsakis <niko@alum.mit.edu>",
+ "Josh Stone <cuviper@gmail.com>",
+]
build = "build.rs"
links = "rayon-core"
description = "Core APIs for Rayon"
documentation = "https://docs.rs/rayon/"
readme = "README.md"
-keywords = ["parallel", "thread", "concurrency", "join", "performance"]
+keywords = [
+ "parallel",
+ "thread",
+ "concurrency",
+ "join",
+ "performance",
+]
categories = ["concurrency"]
-license = "Apache-2.0/MIT"
+license = "MIT OR Apache-2.0"
repository = "https://github.com/rayon-rs/rayon"
[[test]]
name = "stack_overflow_crash"
path = "tests/stack_overflow_crash.rs"
-harness = false
[[test]]
name = "double_init_fail"
@@ -49,20 +57,19 @@ path = "tests/simple_panic.rs"
[[test]]
name = "scoped_threadpool"
path = "tests/scoped_threadpool.rs"
+
[dependencies.crossbeam-channel]
version = "0.5.0"
[dependencies.crossbeam-deque]
-version = "0.8.0"
+version = "0.8.1"
[dependencies.crossbeam-utils]
version = "0.8.0"
-[dependencies.lazy_static]
-version = "1"
-
[dependencies.num_cpus]
version = "1.2"
+
[dev-dependencies.rand]
version = "0.8"
@@ -71,5 +78,6 @@ version = "0.3"
[dev-dependencies.scoped-tls]
version = "1.0"
+
[target."cfg(unix)".dev-dependencies.libc]
version = "0.2"
diff --git a/Cargo.toml.orig b/Cargo.toml.orig
index d31ed22..920ffe5 100644
--- a/Cargo.toml.orig
+++ b/Cargo.toml.orig
@@ -1,13 +1,14 @@
[package]
name = "rayon-core"
-version = "1.9.1" # reminder to update html_root_url attribute
+version = "1.11.0"
authors = ["Niko Matsakis <niko@alum.mit.edu>",
"Josh Stone <cuviper@gmail.com>"]
description = "Core APIs for Rayon"
-license = "Apache-2.0/MIT"
+license = "MIT OR Apache-2.0"
repository = "https://github.com/rayon-rs/rayon"
documentation = "https://docs.rs/rayon/"
-edition = "2018"
+rust-version = "1.59"
+edition = "2021"
links = "rayon-core"
build = "build.rs"
readme = "README.md"
@@ -17,9 +18,8 @@ categories = ["concurrency"]
# Some dependencies may not be their latest version, in order to support older rustc.
[dependencies]
num_cpus = "1.2"
-lazy_static = "1"
crossbeam-channel = "0.5.0"
-crossbeam-deque = "0.8.0"
+crossbeam-deque = "0.8.1"
crossbeam-utils = "0.8.0"
[dev-dependencies]
@@ -33,7 +33,6 @@ libc = "0.2"
[[test]]
name = "stack_overflow_crash"
path = "tests/stack_overflow_crash.rs"
-harness = false
# NB: having one [[test]] manually defined means we need to declare them all
diff --git a/METADATA b/METADATA
index 7814725..e9917f9 100644
--- a/METADATA
+++ b/METADATA
@@ -1,3 +1,7 @@
+# This project was upgraded with external_updater.
+# Usage: tools/external_updater/updater.sh update rust/crates/rayon-core
+# For more info, check https://cs.android.com/android/platform/superproject/+/master:tools/external_updater/README.md
+
name: "rayon-core"
description: "Core APIs for Rayon"
third_party {
@@ -7,13 +11,13 @@ third_party {
}
url {
type: ARCHIVE
- value: "https://static.crates.io/crates/rayon-core/rayon-core-1.9.1.crate"
+ value: "https://static.crates.io/crates/rayon-core/rayon-core-1.11.0.crate"
}
- version: "1.9.1"
+ version: "1.11.0"
license_type: NOTICE
last_upgrade_date {
- year: 2021
- month: 5
- day: 19
+ year: 2023
+ month: 4
+ day: 3
}
}
diff --git a/README.md b/README.md
index 0c49362..448901b 100644
--- a/README.md
+++ b/README.md
@@ -8,4 +8,4 @@ Please see [Rayon Docs] for details about using Rayon.
[Rayon Docs]: https://docs.rs/rayon/
-Rayon-core currently requires `rustc 1.36.0` or greater.
+Rayon-core currently requires `rustc 1.59.0` or greater.
diff --git a/TEST_MAPPING b/TEST_MAPPING
index 3cbd48d..55384f0 100644
--- a/TEST_MAPPING
+++ b/TEST_MAPPING
@@ -5,6 +5,9 @@
"path": "external/rust/crates/base64"
},
{
+ "path": "external/rust/crates/hashbrown"
+ },
+ {
"path": "external/rust/crates/tinytemplate"
},
{
diff --git a/src/broadcast/mod.rs b/src/broadcast/mod.rs
new file mode 100644
index 0000000..d991c54
--- /dev/null
+++ b/src/broadcast/mod.rs
@@ -0,0 +1,151 @@
+use crate::job::{ArcJob, StackJob};
+use crate::latch::LatchRef;
+use crate::registry::{Registry, WorkerThread};
+use crate::scope::ScopeLatch;
+use std::fmt;
+use std::marker::PhantomData;
+use std::sync::Arc;
+
+mod test;
+
+/// Executes `op` within every thread in the current threadpool. If this is
+/// called from a non-Rayon thread, it will execute in the global threadpool.
+/// Any attempts to use `join`, `scope`, or parallel iterators will then operate
+/// within that threadpool. When the call has completed on each thread, returns
+/// a vector containing all of their return values.
+///
+/// For more information, see the [`ThreadPool::broadcast()`][m] method.
+///
+/// [m]: struct.ThreadPool.html#method.broadcast
+pub fn broadcast<OP, R>(op: OP) -> Vec<R>
+where
+ OP: Fn(BroadcastContext<'_>) -> R + Sync,
+ R: Send,
+{
+ // We assert that current registry has not terminated.
+ unsafe { broadcast_in(op, &Registry::current()) }
+}
+
+/// Spawns an asynchronous task on every thread in this thread-pool. This task
+/// will run in the implicit, global scope, which means that it may outlast the
+/// current stack frame -- therefore, it cannot capture any references onto the
+/// stack (you will likely need a `move` closure).
+///
+/// For more information, see the [`ThreadPool::spawn_broadcast()`][m] method.
+///
+/// [m]: struct.ThreadPool.html#method.spawn_broadcast
+pub fn spawn_broadcast<OP>(op: OP)
+where
+ OP: Fn(BroadcastContext<'_>) + Send + Sync + 'static,
+{
+ // We assert that current registry has not terminated.
+ unsafe { spawn_broadcast_in(op, &Registry::current()) }
+}
+
+/// Provides context to a closure called by `broadcast`.
+pub struct BroadcastContext<'a> {
+ worker: &'a WorkerThread,
+
+ /// Make sure to prevent auto-traits like `Send` and `Sync`.
+ _marker: PhantomData<&'a mut dyn Fn()>,
+}
+
+impl<'a> BroadcastContext<'a> {
+ pub(super) fn with<R>(f: impl FnOnce(BroadcastContext<'_>) -> R) -> R {
+ let worker_thread = WorkerThread::current();
+ assert!(!worker_thread.is_null());
+ f(BroadcastContext {
+ worker: unsafe { &*worker_thread },
+ _marker: PhantomData,
+ })
+ }
+
+ /// Our index amongst the broadcast threads (ranges from `0..self.num_threads()`).
+ #[inline]
+ pub fn index(&self) -> usize {
+ self.worker.index()
+ }
+
+ /// The number of threads receiving the broadcast in the thread pool.
+ ///
+ /// # Future compatibility note
+ ///
+ /// Future versions of Rayon might vary the number of threads over time, but
+ /// this method will always return the number of threads which are actually
+ /// receiving your particular `broadcast` call.
+ #[inline]
+ pub fn num_threads(&self) -> usize {
+ self.worker.registry().num_threads()
+ }
+}
+
+impl<'a> fmt::Debug for BroadcastContext<'a> {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("BroadcastContext")
+ .field("index", &self.index())
+ .field("num_threads", &self.num_threads())
+ .field("pool_id", &self.worker.registry().id())
+ .finish()
+ }
+}
+
+/// Execute `op` on every thread in the pool. It will be executed on each
+/// thread when they have nothing else to do locally, before they try to
+/// steal work from other threads. This function will not return until all
+/// threads have completed the `op`.
+///
+/// Unsafe because `registry` must not yet have terminated.
+pub(super) unsafe fn broadcast_in<OP, R>(op: OP, registry: &Arc<Registry>) -> Vec<R>
+where
+ OP: Fn(BroadcastContext<'_>) -> R + Sync,
+ R: Send,
+{
+ let f = move |injected: bool| {
+ debug_assert!(injected);
+ BroadcastContext::with(&op)
+ };
+
+ let n_threads = registry.num_threads();
+ let current_thread = WorkerThread::current().as_ref();
+ let latch = ScopeLatch::with_count(n_threads, current_thread);
+ let jobs: Vec<_> = (0..n_threads)
+ .map(|_| StackJob::new(&f, LatchRef::new(&latch)))
+ .collect();
+ let job_refs = jobs.iter().map(|job| job.as_job_ref());
+
+ registry.inject_broadcast(job_refs);
+
+ // Wait for all jobs to complete, then collect the results, maybe propagating a panic.
+ latch.wait(current_thread);
+ jobs.into_iter().map(|job| job.into_result()).collect()
+}
+
+/// Execute `op` on every thread in the pool. It will be executed on each
+/// thread when they have nothing else to do locally, before they try to
+/// steal work from other threads. This function returns immediately after
+/// injecting the jobs.
+///
+/// Unsafe because `registry` must not yet have terminated.
+pub(super) unsafe fn spawn_broadcast_in<OP>(op: OP, registry: &Arc<Registry>)
+where
+ OP: Fn(BroadcastContext<'_>) + Send + Sync + 'static,
+{
+ let job = ArcJob::new({
+ let registry = Arc::clone(registry);
+ move || {
+ registry.catch_unwind(|| BroadcastContext::with(&op));
+ registry.terminate(); // (*) permit registry to terminate now
+ }
+ });
+
+ let n_threads = registry.num_threads();
+ let job_refs = (0..n_threads).map(|_| {
+ // Ensure that registry cannot terminate until this job has executed
+ // on each thread. This ref is decremented at the (*) above.
+ registry.increment_terminate_count();
+
+ ArcJob::as_static_job_ref(&job)
+ });
+
+ registry.inject_broadcast(job_refs);
+}
diff --git a/src/broadcast/test.rs b/src/broadcast/test.rs
new file mode 100644
index 0000000..3ae11f7
--- /dev/null
+++ b/src/broadcast/test.rs
@@ -0,0 +1,262 @@
+#![cfg(test)]
+
+use crate::ThreadPoolBuilder;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::Arc;
+use std::{thread, time};
+
+#[test]
+fn broadcast_global() {
+ let v = crate::broadcast(|ctx| ctx.index());
+ assert!(v.into_iter().eq(0..crate::current_num_threads()));
+}
+
+#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
+fn spawn_broadcast_global() {
+ let (tx, rx) = crossbeam_channel::unbounded();
+ crate::spawn_broadcast(move |ctx| tx.send(ctx.index()).unwrap());
+
+ let mut v: Vec<_> = rx.into_iter().collect();
+ v.sort_unstable();
+ assert!(v.into_iter().eq(0..crate::current_num_threads()));
+}
+
+#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
+fn broadcast_pool() {
+ let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
+ let v = pool.broadcast(|ctx| ctx.index());
+ assert!(v.into_iter().eq(0..7));
+}
+
+#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
+fn spawn_broadcast_pool() {
+ let (tx, rx) = crossbeam_channel::unbounded();
+ let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
+ pool.spawn_broadcast(move |ctx| tx.send(ctx.index()).unwrap());
+
+ let mut v: Vec<_> = rx.into_iter().collect();
+ v.sort_unstable();
+ assert!(v.into_iter().eq(0..7));
+}
+
+#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
+fn broadcast_self() {
+ let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
+ let v = pool.install(|| crate::broadcast(|ctx| ctx.index()));
+ assert!(v.into_iter().eq(0..7));
+}
+
+#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
+fn spawn_broadcast_self() {
+ let (tx, rx) = crossbeam_channel::unbounded();
+ let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
+ pool.spawn(|| crate::spawn_broadcast(move |ctx| tx.send(ctx.index()).unwrap()));
+
+ let mut v: Vec<_> = rx.into_iter().collect();
+ v.sort_unstable();
+ assert!(v.into_iter().eq(0..7));
+}
+
+#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
+fn broadcast_mutual() {
+ let count = AtomicUsize::new(0);
+ let pool1 = ThreadPoolBuilder::new().num_threads(3).build().unwrap();
+ let pool2 = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
+ pool1.install(|| {
+ pool2.broadcast(|_| {
+ pool1.broadcast(|_| {
+ count.fetch_add(1, Ordering::Relaxed);
+ })
+ })
+ });
+ assert_eq!(count.into_inner(), 3 * 7);
+}
+
+#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
+fn spawn_broadcast_mutual() {
+ let (tx, rx) = crossbeam_channel::unbounded();
+ let pool1 = Arc::new(ThreadPoolBuilder::new().num_threads(3).build().unwrap());
+ let pool2 = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
+ pool1.spawn({
+ let pool1 = Arc::clone(&pool1);
+ move || {
+ pool2.spawn_broadcast(move |_| {
+ let tx = tx.clone();
+ pool1.spawn_broadcast(move |_| tx.send(()).unwrap())
+ })
+ }
+ });
+ assert_eq!(rx.into_iter().count(), 3 * 7);
+}
+
+#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
+fn broadcast_mutual_sleepy() {
+ let count = AtomicUsize::new(0);
+ let pool1 = ThreadPoolBuilder::new().num_threads(3).build().unwrap();
+ let pool2 = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
+ pool1.install(|| {
+ thread::sleep(time::Duration::from_secs(1));
+ pool2.broadcast(|_| {
+ thread::sleep(time::Duration::from_secs(1));
+ pool1.broadcast(|_| {
+ thread::sleep(time::Duration::from_millis(100));
+ count.fetch_add(1, Ordering::Relaxed);
+ })
+ })
+ });
+ assert_eq!(count.into_inner(), 3 * 7);
+}
+
+#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
+fn spawn_broadcast_mutual_sleepy() {
+ let (tx, rx) = crossbeam_channel::unbounded();
+ let pool1 = Arc::new(ThreadPoolBuilder::new().num_threads(3).build().unwrap());
+ let pool2 = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
+ pool1.spawn({
+ let pool1 = Arc::clone(&pool1);
+ move || {
+ thread::sleep(time::Duration::from_secs(1));
+ pool2.spawn_broadcast(move |_| {
+ let tx = tx.clone();
+ thread::sleep(time::Duration::from_secs(1));
+ pool1.spawn_broadcast(move |_| {
+ thread::sleep(time::Duration::from_millis(100));
+ tx.send(()).unwrap();
+ })
+ })
+ }
+ });
+ assert_eq!(rx.into_iter().count(), 3 * 7);
+}
+
+#[test]
+#[cfg_attr(not(panic = "unwind"), ignore)]
+fn broadcast_panic_one() {
+ let count = AtomicUsize::new(0);
+ let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
+ let result = crate::unwind::halt_unwinding(|| {
+ pool.broadcast(|ctx| {
+ count.fetch_add(1, Ordering::Relaxed);
+ if ctx.index() == 3 {
+ panic!("Hello, world!");
+ }
+ })
+ });
+ assert_eq!(count.into_inner(), 7);
+ assert!(result.is_err(), "broadcast panic should propagate!");
+}
+
+#[test]
+#[cfg_attr(not(panic = "unwind"), ignore)]
+fn spawn_broadcast_panic_one() {
+ let (tx, rx) = crossbeam_channel::unbounded();
+ let (panic_tx, panic_rx) = crossbeam_channel::unbounded();
+ let pool = ThreadPoolBuilder::new()
+ .num_threads(7)
+ .panic_handler(move |e| panic_tx.send(e).unwrap())
+ .build()
+ .unwrap();
+ pool.spawn_broadcast(move |ctx| {
+ tx.send(()).unwrap();
+ if ctx.index() == 3 {
+ panic!("Hello, world!");
+ }
+ });
+ drop(pool); // including panic_tx
+ assert_eq!(rx.into_iter().count(), 7);
+ assert_eq!(panic_rx.into_iter().count(), 1);
+}
+
+#[test]
+#[cfg_attr(not(panic = "unwind"), ignore)]
+fn broadcast_panic_many() {
+ let count = AtomicUsize::new(0);
+ let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
+ let result = crate::unwind::halt_unwinding(|| {
+ pool.broadcast(|ctx| {
+ count.fetch_add(1, Ordering::Relaxed);
+ if ctx.index() % 2 == 0 {
+ panic!("Hello, world!");
+ }
+ })
+ });
+ assert_eq!(count.into_inner(), 7);
+ assert!(result.is_err(), "broadcast panic should propagate!");
+}
+
+#[test]
+#[cfg_attr(not(panic = "unwind"), ignore)]
+fn spawn_broadcast_panic_many() {
+ let (tx, rx) = crossbeam_channel::unbounded();
+ let (panic_tx, panic_rx) = crossbeam_channel::unbounded();
+ let pool = ThreadPoolBuilder::new()
+ .num_threads(7)
+ .panic_handler(move |e| panic_tx.send(e).unwrap())
+ .build()
+ .unwrap();
+ pool.spawn_broadcast(move |ctx| {
+ tx.send(()).unwrap();
+ if ctx.index() % 2 == 0 {
+ panic!("Hello, world!");
+ }
+ });
+ drop(pool); // including panic_tx
+ assert_eq!(rx.into_iter().count(), 7);
+ assert_eq!(panic_rx.into_iter().count(), 4);
+}
+
+#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
+fn broadcast_sleep_race() {
+ let test_duration = time::Duration::from_secs(1);
+ let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
+ let start = time::Instant::now();
+ while start.elapsed() < test_duration {
+ pool.broadcast(|ctx| {
+ // A slight spread of sleep duration increases the chance that one
+ // of the threads will race in the pool's idle sleep afterward.
+ thread::sleep(time::Duration::from_micros(ctx.index() as u64));
+ });
+ }
+}
+
+#[test]
+fn broadcast_after_spawn_broadcast() {
+ let (tx, rx) = crossbeam_channel::unbounded();
+
+ // Queue a non-blocking spawn_broadcast.
+ crate::spawn_broadcast(move |ctx| tx.send(ctx.index()).unwrap());
+
+ // This blocking broadcast runs after all prior broadcasts.
+ crate::broadcast(|_| {});
+
+ // The spawn_broadcast **must** have run by now on all threads.
+ let mut v: Vec<_> = rx.try_iter().collect();
+ v.sort_unstable();
+ assert!(v.into_iter().eq(0..crate::current_num_threads()));
+}
+
+#[test]
+fn broadcast_after_spawn() {
+ let (tx, rx) = crossbeam_channel::bounded(1);
+
+ // Queue a regular spawn on a thread-local deque.
+ crate::registry::in_worker(move |_, _| {
+ crate::spawn(move || tx.send(22).unwrap());
+ });
+
+ // Broadcast runs after the local deque is empty.
+ crate::broadcast(|_| {});
+
+ // The spawn **must** have run by now.
+ assert_eq!(22, rx.try_recv().unwrap());
+}
diff --git a/src/compile_fail/rc_return.rs b/src/compile_fail/rc_return.rs
index 164f8ce..93e3a60 100644
--- a/src/compile_fail/rc_return.rs
+++ b/src/compile_fail/rc_return.rs
@@ -2,9 +2,7 @@
use std::rc::Rc;
-fn main() {
- rayon_core::join(|| Rc::new(22), || ()); //~ ERROR
-}
+rayon_core::join(|| Rc::new(22), || ()); //~ ERROR
``` */
mod left {}
@@ -13,9 +11,7 @@ mod left {}
use std::rc::Rc;
-fn main() {
- rayon_core::join(|| (), || Rc::new(23)); //~ ERROR
-}
+rayon_core::join(|| (), || Rc::new(23)); //~ ERROR
``` */
mod right {}
diff --git a/src/compile_fail/rc_upvar.rs b/src/compile_fail/rc_upvar.rs
index 62895bf..d8aebcf 100644
--- a/src/compile_fail/rc_upvar.rs
+++ b/src/compile_fail/rc_upvar.rs
@@ -2,10 +2,8 @@
use std::rc::Rc;
-fn main() {
- let r = Rc::new(22);
- rayon_core::join(|| r.clone(), || r.clone());
- //~^ ERROR
-}
+let r = Rc::new(22);
+rayon_core::join(|| r.clone(), || r.clone());
+//~^ ERROR
``` */
diff --git a/src/job.rs b/src/job.rs
index a71f1b0..5664bb3 100644
--- a/src/job.rs
+++ b/src/job.rs
@@ -4,6 +4,7 @@ use crossbeam_deque::{Injector, Steal};
use std::any::Any;
use std::cell::UnsafeCell;
use std::mem;
+use std::sync::Arc;
pub(super) enum JobResult<T> {
None,
@@ -20,7 +21,7 @@ pub(super) trait Job {
/// Unsafe: this may be called from a different thread than the one
/// which scheduled the job, so the implementer must ensure the
/// appropriate traits are met, whether `Send`, `Sync`, or both.
- unsafe fn execute(this: *const Self);
+ unsafe fn execute(this: *const ());
}
/// Effectively a Job trait object. Each JobRef **must** be executed
@@ -29,7 +30,6 @@ pub(super) trait Job {
/// Internally, we store the job's data in a `*const ()` pointer. The
/// true type is something like `*const StackJob<...>`, but we hide
/// it. We also carry the "execute fn" from the `Job` trait.
-#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub(super) struct JobRef {
pointer: *const (),
execute_fn: unsafe fn(*const ()),
@@ -45,17 +45,22 @@ impl JobRef {
where
T: Job,
{
- let fn_ptr: unsafe fn(*const T) = <T as Job>::execute;
-
// erase types:
JobRef {
pointer: data as *const (),
- execute_fn: mem::transmute(fn_ptr),
+ execute_fn: <T as Job>::execute,
}
}
+ /// Returns an opaque handle that can be saved and compared,
+ /// without making `JobRef` itself `Copy + Eq`.
+ #[inline]
+ pub(super) fn id(&self) -> impl Eq {
+ (self.pointer, self.execute_fn)
+ }
+
#[inline]
- pub(super) unsafe fn execute(&self) {
+ pub(super) unsafe fn execute(self) {
(self.execute_fn)(self.pointer)
}
}
@@ -108,19 +113,12 @@ where
F: FnOnce(bool) -> R + Send,
R: Send,
{
- unsafe fn execute(this: *const Self) {
- fn call<R>(func: impl FnOnce(bool) -> R) -> impl FnOnce() -> R {
- move || func(true)
- }
-
- let this = &*this;
+ unsafe fn execute(this: *const ()) {
+ let this = &*(this as *const Self);
let abort = unwind::AbortIfPanic;
let func = (*this.func.get()).take().unwrap();
- (*this.result.get()) = match unwind::halt_unwinding(call(func)) {
- Ok(x) => JobResult::Ok(x),
- Err(x) => JobResult::Panic(x),
- };
- this.latch.set();
+ (*this.result.get()) = JobResult::call(func);
+ Latch::set(&this.latch);
mem::forget(abort);
}
}
@@ -135,25 +133,30 @@ pub(super) struct HeapJob<BODY>
where
BODY: FnOnce() + Send,
{
- job: UnsafeCell<Option<BODY>>,
+ job: BODY,
}
impl<BODY> HeapJob<BODY>
where
BODY: FnOnce() + Send,
{
- pub(super) fn new(func: BODY) -> Self {
- HeapJob {
- job: UnsafeCell::new(Some(func)),
- }
+ pub(super) fn new(job: BODY) -> Box<Self> {
+ Box::new(HeapJob { job })
}
/// Creates a `JobRef` from this job -- note that this hides all
/// lifetimes, so it is up to you to ensure that this JobRef
/// doesn't outlive any data that it closes over.
- pub(super) unsafe fn as_job_ref(self: Box<Self>) -> JobRef {
- let this: *const Self = mem::transmute(self);
- JobRef::new(this)
+ pub(super) unsafe fn into_job_ref(self: Box<Self>) -> JobRef {
+ JobRef::new(Box::into_raw(self))
+ }
+
+ /// Creates a static `JobRef` from this job.
+ pub(super) fn into_static_job_ref(self: Box<Self>) -> JobRef
+ where
+ BODY: 'static,
+ {
+ unsafe { self.into_job_ref() }
}
}
@@ -161,14 +164,63 @@ impl<BODY> Job for HeapJob<BODY>
where
BODY: FnOnce() + Send,
{
- unsafe fn execute(this: *const Self) {
- let this: Box<Self> = mem::transmute(this);
- let job = (*this.job.get()).take().unwrap();
- job();
+ unsafe fn execute(this: *const ()) {
+ let this = Box::from_raw(this as *mut Self);
+ (this.job)();
+ }
+}
+
+/// Represents a job stored in an `Arc` -- like `HeapJob`, but may
+/// be turned into multiple `JobRef`s and called multiple times.
+pub(super) struct ArcJob<BODY>
+where
+ BODY: Fn() + Send + Sync,
+{
+ job: BODY,
+}
+
+impl<BODY> ArcJob<BODY>
+where
+ BODY: Fn() + Send + Sync,
+{
+ pub(super) fn new(job: BODY) -> Arc<Self> {
+ Arc::new(ArcJob { job })
+ }
+
+ /// Creates a `JobRef` from this job -- note that this hides all
+ /// lifetimes, so it is up to you to ensure that this JobRef
+ /// doesn't outlive any data that it closes over.
+ pub(super) unsafe fn as_job_ref(this: &Arc<Self>) -> JobRef {
+ JobRef::new(Arc::into_raw(Arc::clone(this)))
+ }
+
+ /// Creates a static `JobRef` from this job.
+ pub(super) fn as_static_job_ref(this: &Arc<Self>) -> JobRef
+ where
+ BODY: 'static,
+ {
+ unsafe { Self::as_job_ref(this) }
+ }
+}
+
+impl<BODY> Job for ArcJob<BODY>
+where
+ BODY: Fn() + Send + Sync,
+{
+ unsafe fn execute(this: *const ()) {
+ let this = Arc::from_raw(this as *mut Self);
+ (this.job)();
}
}
impl<T> JobResult<T> {
+ fn call(func: impl FnOnce(bool) -> T) -> Self {
+ match unwind::halt_unwinding(|| func(true)) {
+ Ok(x) => JobResult::Ok(x),
+ Err(x) => JobResult::Panic(x),
+ }
+ }
+
/// Convert the `JobResult` for a job that has finished (and hence
/// its JobResult is populated) into its return value.
///
@@ -204,10 +256,11 @@ impl JobFifo {
}
impl Job for JobFifo {
- unsafe fn execute(this: *const Self) {
+ unsafe fn execute(this: *const ()) {
// We "execute" a queue by executing its first job, FIFO.
+ let this = &*(this as *const Self);
loop {
- match (*this).inner.steal() {
+ match this.inner.steal() {
Steal::Success(job_ref) => break job_ref.execute(),
Steal::Empty => panic!("FIFO is empty"),
Steal::Retry => {}
diff --git a/src/join/mod.rs b/src/join/mod.rs
index d72c7e6..5ab9f6b 100644
--- a/src/join/mod.rs
+++ b/src/join/mod.rs
@@ -135,6 +135,7 @@ where
// long enough.
let job_b = StackJob::new(call_b(oper_b), SpinLatch::new(worker_thread));
let job_b_ref = job_b.as_job_ref();
+ let job_b_id = job_b_ref.id();
worker_thread.push(job_b_ref);
// Execute task a; hopefully b gets stolen in the meantime.
@@ -151,7 +152,7 @@ where
// those off to get to it.
while !job_b.latch.probe() {
if let Some(job) = worker_thread.take_local_job() {
- if job == job_b_ref {
+ if job_b_id == job.id() {
// Found it! Let's run it.
//
// Note that this could panic, but it's ok if we unwind here.
diff --git a/src/join/test.rs b/src/join/test.rs
index e7f287f..b303dbc 100644
--- a/src/join/test.rs
+++ b/src/join/test.rs
@@ -47,6 +47,7 @@ fn sort() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn sort_in_pool() {
let rng = seeded_rng();
let mut data: Vec<u32> = rng.sample_iter(&Standard).take(12 * 1024).collect();
@@ -77,6 +78,7 @@ fn panic_propagate_both() {
}
#[test]
+#[cfg_attr(not(panic = "unwind"), ignore)]
fn panic_b_still_executes() {
let mut x = false;
match unwind::halt_unwinding(|| join(|| panic!("Hello, world!"), || x = true)) {
@@ -86,6 +88,7 @@ fn panic_b_still_executes() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn join_context_both() {
// If we're not in a pool, both should be marked stolen as they're injected.
let (a_migrated, b_migrated) = join_context(|a| a.migrated(), |b| b.migrated());
@@ -94,6 +97,7 @@ fn join_context_both() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn join_context_neither() {
// If we're already in a 1-thread pool, neither job should be stolen.
let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
@@ -104,6 +108,7 @@ fn join_context_neither() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn join_context_second() {
use std::sync::Barrier;
@@ -127,6 +132,7 @@ fn join_context_second() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn join_counter_overflow() {
const MAX: u32 = 500_000;
diff --git a/src/latch.rs b/src/latch.rs
index 1d573b7..de43272 100644
--- a/src/latch.rs
+++ b/src/latch.rs
@@ -1,3 +1,5 @@
+use std::marker::PhantomData;
+use std::ops::Deref;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::usize;
@@ -37,10 +39,15 @@ pub(super) trait Latch {
///
/// Setting a latch triggers other threads to wake up and (in some
/// cases) complete. This may, in turn, cause memory to be
- /// allocated and so forth. One must be very careful about this,
+ /// deallocated and so forth. One must be very careful about this,
/// and it's typically better to read all the fields you will need
/// to access *before* a latch is set!
- fn set(&self);
+ ///
+ /// This function operates on `*const Self` instead of `&self` to allow it
+ /// to become dangling during this call. The caller must ensure that the
+ /// pointer is valid upon entry, and not invalidated during the call by any
+ /// actions other than `set` itself.
+ unsafe fn set(this: *const Self);
}
pub(super) trait AsCoreLatch {
@@ -123,8 +130,8 @@ impl CoreLatch {
/// doing some wakeups; those are encapsulated in the surrounding
/// latch code.
#[inline]
- fn set(&self) -> bool {
- let old_state = self.state.swap(SET, Ordering::AcqRel);
+ unsafe fn set(this: *const Self) -> bool {
+ let old_state = (*this).state.swap(SET, Ordering::AcqRel);
old_state == SLEEPING
}
@@ -186,27 +193,29 @@ impl<'r> AsCoreLatch for SpinLatch<'r> {
impl<'r> Latch for SpinLatch<'r> {
#[inline]
- fn set(&self) {
+ unsafe fn set(this: *const Self) {
let cross_registry;
- let registry = if self.cross {
+ let registry: &Registry = if (*this).cross {
// Ensure the registry stays alive while we notify it.
// Otherwise, it would be possible that we set the spin
// latch and the other thread sees it and exits, causing
// the registry to be deallocated, all before we get a
// chance to invoke `registry.notify_worker_latch_is_set`.
- cross_registry = Arc::clone(self.registry);
+ cross_registry = Arc::clone((*this).registry);
&cross_registry
} else {
// If this is not a "cross-registry" spin-latch, then the
// thread which is performing `set` is itself ensuring
- // that the registry stays alive.
- self.registry
+ // that the registry stays alive. However, that doesn't
+ // include this *particular* `Arc` handle if the waiting
+ // thread then exits, so we must completely dereference it.
+ (*this).registry
};
- let target_worker_index = self.target_worker_index;
+ let target_worker_index = (*this).target_worker_index;
- // NOTE: Once we `set`, the target may proceed and invalidate `&self`!
- if self.core_latch.set() {
+ // NOTE: Once we `set`, the target may proceed and invalidate `this`!
+ if CoreLatch::set(&(*this).core_latch) {
// Subtle: at this point, we can no longer read from
// `self`, because the thread owning this spin latch may
// have awoken and deallocated the latch. Therefore, we
@@ -253,10 +262,10 @@ impl LockLatch {
impl Latch for LockLatch {
#[inline]
- fn set(&self) {
- let mut guard = self.m.lock().unwrap();
+ unsafe fn set(this: *const Self) {
+ let mut guard = (*this).m.lock().unwrap();
*guard = true;
- self.v.notify_all();
+ (*this).v.notify_all();
}
}
@@ -284,9 +293,14 @@ pub(super) struct CountLatch {
impl CountLatch {
#[inline]
pub(super) fn new() -> CountLatch {
+ Self::with_count(1)
+ }
+
+ #[inline]
+ pub(super) fn with_count(n: usize) -> CountLatch {
CountLatch {
core_latch: CoreLatch::new(),
- counter: AtomicUsize::new(1),
+ counter: AtomicUsize::new(n),
}
}
@@ -300,9 +314,9 @@ impl CountLatch {
/// count, then the latch is **set**, and calls to `probe()` will
/// return true. Returns whether the latch was set.
#[inline]
- pub(super) fn set(&self) -> bool {
- if self.counter.fetch_sub(1, Ordering::SeqCst) == 1 {
- self.core_latch.set();
+ pub(super) unsafe fn set(this: *const Self) -> bool {
+ if (*this).counter.fetch_sub(1, Ordering::SeqCst) == 1 {
+ CoreLatch::set(&(*this).core_latch);
true
} else {
false
@@ -313,8 +327,12 @@ impl CountLatch {
/// the latch is set, then the specific worker thread is tickled,
/// which should be the one that owns this latch.
#[inline]
- pub(super) fn set_and_tickle_one(&self, registry: &Registry, target_worker_index: usize) {
- if self.set() {
+ pub(super) unsafe fn set_and_tickle_one(
+ this: *const Self,
+ registry: &Registry,
+ target_worker_index: usize,
+ ) {
+ if Self::set(this) {
registry.notify_worker_latch_is_set(target_worker_index);
}
}
@@ -335,10 +353,10 @@ pub(super) struct CountLockLatch {
impl CountLockLatch {
#[inline]
- pub(super) fn new() -> CountLockLatch {
+ pub(super) fn with_count(n: usize) -> CountLockLatch {
CountLockLatch {
lock_latch: LockLatch::new(),
- counter: AtomicUsize::new(1),
+ counter: AtomicUsize::new(n),
}
}
@@ -355,19 +373,42 @@ impl CountLockLatch {
impl Latch for CountLockLatch {
#[inline]
- fn set(&self) {
- if self.counter.fetch_sub(1, Ordering::SeqCst) == 1 {
- self.lock_latch.set();
+ unsafe fn set(this: *const Self) {
+ if (*this).counter.fetch_sub(1, Ordering::SeqCst) == 1 {
+ LockLatch::set(&(*this).lock_latch);
+ }
+ }
+}
+
+/// `&L` without any implication of `dereferenceable` for `Latch::set`
+pub(super) struct LatchRef<'a, L> {
+ inner: *const L,
+ marker: PhantomData<&'a L>,
+}
+
+impl<L> LatchRef<'_, L> {
+ pub(super) fn new(inner: &L) -> LatchRef<'_, L> {
+ LatchRef {
+ inner,
+ marker: PhantomData,
}
}
}
-impl<'a, L> Latch for &'a L
-where
- L: Latch,
-{
+unsafe impl<L: Sync> Sync for LatchRef<'_, L> {}
+
+impl<L> Deref for LatchRef<'_, L> {
+ type Target = L;
+
+ fn deref(&self) -> &L {
+ // SAFETY: if we have &self, the inner latch is still alive
+ unsafe { &*self.inner }
+ }
+}
+
+impl<L: Latch> Latch for LatchRef<'_, L> {
#[inline]
- fn set(&self) {
- L::set(self);
+ unsafe fn set(this: *const Self) {
+ L::set((*this).inner);
}
}
diff --git a/src/lib.rs b/src/lib.rs
index 2df24b2..c9694ee 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -26,7 +26,24 @@
//! [`join()`]: struct.ThreadPool.html#method.join
//! [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html
//!
-//! ## Restricting multiple versions
+//! # Global fallback when threading is unsupported
+//!
+//! Rayon uses `std` APIs for threading, but some targets have incomplete implementations that
+//! always return `Unsupported` errors. The WebAssembly `wasm32-unknown-unknown` and `wasm32-wasi`
+//! targets are notable examples of this. Rather than panicking on the unsupported error when
+//! creating the implicit global threadpool, Rayon configures a fallback mode instead.
+//!
+//! This fallback mode mostly functions as if it were using a single-threaded "pool", like setting
+//! `RAYON_NUM_THREADS=1`. For example, `join` will execute its two closures sequentially, since
+//! there is no other thread to share the work. However, since the pool is not running independent
+//! of the main thread, non-blocking calls like `spawn` may not execute at all, unless a lower-
+//! priority call like `broadcast` gives them an opening. The fallback mode does not try to emulate
+//! anything like thread preemption or `async` task switching, but `yield_now` or `yield_local`
+//! can also volunteer execution time.
+//!
+//! Explicit `ThreadPoolBuilder` methods always report their error without any fallback.
+//!
+//! # Restricting multiple versions
//!
//! In order to ensure proper coordination between threadpools, and especially
//! to make sure there's only one global threadpool, `rayon-core` is actively
@@ -44,7 +61,6 @@
//! conflicting requirements will need to be resolved before the build will
//! succeed.
-#![doc(html_root_url = "https://docs.rs/rayon-core/1.9")]
#![deny(missing_debug_implementations)]
#![deny(missing_docs)]
#![deny(unreachable_pub)]
@@ -63,6 +79,7 @@ mod log;
#[macro_use]
mod private;
+mod broadcast;
mod job;
mod join;
mod latch;
@@ -76,6 +93,7 @@ mod unwind;
mod compile_fail;
mod test;
+pub use self::broadcast::{broadcast, spawn_broadcast, BroadcastContext};
pub use self::join::{join, join_context};
pub use self::registry::ThreadBuilder;
pub use self::scope::{in_place_scope, scope, Scope};
@@ -84,9 +102,21 @@ pub use self::spawn::{spawn, spawn_fifo};
pub use self::thread_pool::current_thread_has_pending_tasks;
pub use self::thread_pool::current_thread_index;
pub use self::thread_pool::ThreadPool;
+pub use self::thread_pool::{yield_local, yield_now, Yield};
use self::registry::{CustomSpawn, DefaultSpawn, ThreadSpawn};
+/// Returns the maximum number of threads that Rayon supports in a single thread-pool.
+///
+/// If a higher thread count is requested by calling `ThreadPoolBuilder::num_threads` or by setting
+/// the `RAYON_NUM_THREADS` environment variable, then it will be reduced to this maximum.
+///
+/// The value may vary between different targets, and is subject to change in new Rayon versions.
+pub fn max_num_threads() -> usize {
+ // We are limited by the bits available in the sleep counter's `AtomicUsize`.
+ crate::sleep::THREADS_MAX
+}
+
/// Returns the number of threads in the current registry. If this
/// code is executing within a Rayon thread-pool, then this will be
/// the number of threads for the thread-pool of the current
@@ -174,6 +204,7 @@ pub struct ThreadPoolBuilder<S = DefaultSpawn> {
///
/// [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html
#[deprecated(note = "Use `ThreadPoolBuilder`")]
+#[derive(Default)]
pub struct Configuration {
builder: ThreadPoolBuilder,
}
@@ -258,7 +289,7 @@ impl ThreadPoolBuilder {
/// The threads in this pool will start by calling `wrapper`, which should
/// do initialization and continue by calling `ThreadBuilder::run()`.
///
- /// [`crossbeam::scope`]: https://docs.rs/crossbeam/0.7/crossbeam/fn.scope.html
+ /// [`crossbeam::scope`]: https://docs.rs/crossbeam/0.8/crossbeam/fn.scope.html
///
/// # Examples
///
@@ -328,7 +359,7 @@ impl<S> ThreadPoolBuilder<S> {
/// if the pool is leaked. Furthermore, the global thread pool doesn't terminate
/// until the entire process exits!
///
- /// [`crossbeam::scope`]: https://docs.rs/crossbeam/0.7/crossbeam/fn.scope.html
+ /// [`crossbeam::scope`]: https://docs.rs/crossbeam/0.8/crossbeam/fn.scope.html
///
/// # Examples
///
@@ -373,6 +404,39 @@ impl<S> ThreadPoolBuilder<S> {
/// Ok(())
/// }
/// ```
+ ///
+ /// This can also be used for a pool of scoped threads like [`crossbeam::scope`],
+ /// or [`std::thread::scope`] introduced in Rust 1.63, which is encapsulated in
+ /// [`build_scoped`](#method.build_scoped).
+ ///
+ /// [`std::thread::scope`]: https://doc.rust-lang.org/std/thread/fn.scope.html
+ ///
+ /// ```
+ /// # use rayon_core as rayon;
+ /// fn main() -> Result<(), rayon::ThreadPoolBuildError> {
+ /// std::thread::scope(|scope| {
+ /// let pool = rayon::ThreadPoolBuilder::new()
+ /// .spawn_handler(|thread| {
+ /// let mut builder = std::thread::Builder::new();
+ /// if let Some(name) = thread.name() {
+ /// builder = builder.name(name.to_string());
+ /// }
+ /// if let Some(size) = thread.stack_size() {
+ /// builder = builder.stack_size(size);
+ /// }
+ /// builder.spawn_scoped(scope, || {
+ /// // Add any scoped initialization here, then run!
+ /// thread.run()
+ /// })?;
+ /// Ok(())
+ /// })
+ /// .build()?;
+ ///
+ /// pool.install(|| println!("Hello from my custom scoped thread!"));
+ /// Ok(())
+ /// })
+ /// }
+ /// ```
pub fn spawn_handler<F>(self, spawn: F) -> ThreadPoolBuilder<CustomSpawn<F>>
where
F: FnMut(ThreadBuilder) -> io::Result<()>,
@@ -462,7 +526,7 @@ impl<S> ThreadPoolBuilder<S> {
/// **Old environment variable:** `RAYON_NUM_THREADS` is a one-to-one
/// replacement of the now deprecated `RAYON_RS_NUM_CPUS` environment
/// variable. If both variables are specified, `RAYON_NUM_THREADS` will
- /// be prefered.
+ /// be preferred.
pub fn num_threads(mut self, num_threads: usize) -> Self {
self.num_threads = num_threads;
self
@@ -515,7 +579,7 @@ impl<S> ThreadPoolBuilder<S> {
/// to true, however, workers will prefer to execute in a
/// *breadth-first* fashion -- that is, they will search for jobs at
/// the *bottom* of their local deque. (At present, workers *always*
- /// steal from the bottom of other worker's deques, regardless of
+ /// steal from the bottom of other workers' deques, regardless of
/// the setting of this flag.)
///
/// If you think of the tasks as a tree, where a parent task
@@ -661,6 +725,10 @@ impl ThreadPoolBuildError {
fn new(kind: ErrorKind) -> ThreadPoolBuildError {
ThreadPoolBuildError { kind }
}
+
+ fn is_unsupported(&self) -> bool {
+ matches!(&self.kind, ErrorKind::IOError(e) if e.kind() == io::ErrorKind::Unsupported)
+ }
}
const GLOBAL_POOL_ALREADY_INITIALIZED: &str =
@@ -738,15 +806,6 @@ impl<S> fmt::Debug for ThreadPoolBuilder<S> {
}
#[allow(deprecated)]
-impl Default for Configuration {
- fn default() -> Self {
- Configuration {
- builder: Default::default(),
- }
- }
-}
-
-#[allow(deprecated)]
impl fmt::Debug for Configuration {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.builder.fmt(f)
diff --git a/src/log.rs b/src/log.rs
index e1ff827..7b6daf0 100644
--- a/src/log.rs
+++ b/src/log.rs
@@ -93,6 +93,9 @@ pub(super) enum Event {
/// A job was removed from the global queue.
JobUninjected { worker: usize },
+ /// A job was broadcasted to N threads.
+ JobBroadcast { count: usize },
+
/// When announcing a job, this was the value of the counters we observed.
///
/// No effect on thread state, just a debugging event.
@@ -124,15 +127,15 @@ impl Logger {
let (sender, receiver) = crossbeam_channel::unbounded();
- if env_log.starts_with("tail:") {
- let filename = env_log["tail:".len()..].to_string();
+ if let Some(filename) = env_log.strip_prefix("tail:") {
+ let filename = filename.to_string();
::std::thread::spawn(move || {
Self::tail_logger_thread(num_workers, filename, 10_000, receiver)
});
} else if env_log == "all" {
::std::thread::spawn(move || Self::all_logger_thread(num_workers, receiver));
- } else if env_log.starts_with("profile:") {
- let filename = env_log["profile:".len()..].to_string();
+ } else if let Some(filename) = env_log.strip_prefix("profile:") {
+ let filename = filename.to_string();
::std::thread::spawn(move || {
Self::profile_logger_thread(num_workers, filename, 10_000, receiver)
});
@@ -140,9 +143,9 @@ impl Logger {
panic!("RAYON_LOG should be 'tail:<file>' or 'profile:<file>'");
}
- return Logger {
+ Logger {
sender: Some(sender),
- };
+ }
}
fn disabled() -> Logger {
@@ -175,19 +178,12 @@ impl Logger {
let timeout = std::time::Duration::from_secs(30);
loop {
- loop {
- match receiver.recv_timeout(timeout) {
- Ok(event) => {
- if let Event::Flush = event {
- break;
- } else {
- events.push(event);
- }
- }
-
- Err(_) => break,
+ while let Ok(event) = receiver.recv_timeout(timeout) {
+ if let Event::Flush = event {
+ break;
}
+ events.push(event);
if events.len() == capacity {
break;
}
@@ -219,31 +215,25 @@ impl Logger {
let mut skipped = false;
loop {
- loop {
- match receiver.recv_timeout(timeout) {
- Ok(event) => {
- if let Event::Flush = event {
- // We ignore Flush events in tail mode --
- // we're really just looking for
- // deadlocks.
- continue;
- } else {
- if events.len() == capacity {
- let event = events.pop_front().unwrap();
- state.simulate(&event);
- skipped = true;
- }
-
- events.push_back(event);
- }
+ while let Ok(event) = receiver.recv_timeout(timeout) {
+ if let Event::Flush = event {
+ // We ignore Flush events in tail mode --
+ // we're really just looking for
+ // deadlocks.
+ continue;
+ } else {
+ if events.len() == capacity {
+ let event = events.pop_front().unwrap();
+ state.simulate(&event);
+ skipped = true;
}
- Err(_) => break,
+ events.push_back(event);
}
}
if skipped {
- write!(writer, "...\n").unwrap();
+ writeln!(writer, "...").unwrap();
skipped = false;
}
@@ -417,7 +407,7 @@ impl SimulatorState {
}
}
- write!(w, "\n")?;
+ writeln!(w)?;
Ok(())
}
}
diff --git a/src/registry.rs b/src/registry.rs
index 4156b90..5d56ac9 100644
--- a/src/registry.rs
+++ b/src/registry.rs
@@ -1,14 +1,14 @@
use crate::job::{JobFifo, JobRef, StackJob};
-use crate::latch::{AsCoreLatch, CoreLatch, CountLatch, Latch, LockLatch, SpinLatch};
+use crate::latch::{AsCoreLatch, CoreLatch, CountLatch, Latch, LatchRef, LockLatch, SpinLatch};
use crate::log::Event::*;
use crate::log::Logger;
use crate::sleep::Sleep;
use crate::unwind;
use crate::{
ErrorKind, ExitHandler, PanicHandler, StartHandler, ThreadPoolBuildError, ThreadPoolBuilder,
+ Yield,
};
use crossbeam_deque::{Injector, Steal, Stealer, Worker};
-use std::any::Any;
use std::cell::Cell;
use std::collections::hash_map::DefaultHasher;
use std::fmt;
@@ -16,10 +16,8 @@ use std::hash::Hasher;
use std::io;
use std::mem;
use std::ptr;
-#[allow(deprecated)]
-use std::sync::atomic::ATOMIC_USIZE_INIT;
use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::{Arc, Once};
+use std::sync::{Arc, Mutex, Once};
use std::thread;
use std::usize;
@@ -29,6 +27,7 @@ pub struct ThreadBuilder {
name: Option<String>,
stack_size: Option<usize>,
worker: Worker<JobRef>,
+ stealer: Stealer<JobRef>,
registry: Arc<Registry>,
index: usize,
}
@@ -41,7 +40,7 @@ impl ThreadBuilder {
/// Gets the string that was specified by `ThreadPoolBuilder::name()`.
pub fn name(&self) -> Option<&str> {
- self.name.as_ref().map(String::as_str)
+ self.name.as_deref()
}
/// Gets the value that was specified by `ThreadPoolBuilder::stack_size()`.
@@ -52,7 +51,7 @@ impl ThreadBuilder {
/// Executes the main loop for this thread. This will not return until the
/// thread pool is dropped.
pub fn run(self) {
- unsafe { main_loop(self.worker, self.registry, self.index) }
+ unsafe { main_loop(self) }
}
}
@@ -135,6 +134,7 @@ pub(super) struct Registry {
thread_infos: Vec<ThreadInfo>,
sleep: Sleep,
injected_jobs: Injector<JobRef>,
+ broadcasts: Mutex<Vec<Worker<JobRef>>>,
panic_handler: Option<Box<PanicHandler>>,
start_handler: Option<Box<StartHandler>>,
exit_handler: Option<Box<ExitHandler>>,
@@ -165,7 +165,7 @@ static THE_REGISTRY_SET: Once = Once::new();
/// initialization has not already occurred, use the default
/// configuration.
pub(super) fn global_registry() -> &'static Arc<Registry> {
- set_global_registry(|| Registry::new(ThreadPoolBuilder::new()))
+ set_global_registry(default_global_registry)
.or_else(|err| unsafe { THE_REGISTRY.as_ref().ok_or(err) })
.expect("The global thread pool has not been initialized.")
}
@@ -199,6 +199,46 @@ where
result
}
+fn default_global_registry() -> Result<Arc<Registry>, ThreadPoolBuildError> {
+ let result = Registry::new(ThreadPoolBuilder::new());
+
+ // If we're running in an environment that doesn't support threads at all, we can fall back to
+ // using the current thread alone. This is crude, and probably won't work for non-blocking
+ // calls like `spawn` or `broadcast_spawn`, but a lot of stuff does work fine.
+ //
+ // Notably, this allows current WebAssembly targets to work even though their threading support
+ // is stubbed out, and we won't have to change anything if they do add real threading.
+ let unsupported = matches!(&result, Err(e) if e.is_unsupported());
+ if unsupported && WorkerThread::current().is_null() {
+ let builder = ThreadPoolBuilder::new()
+ .num_threads(1)
+ .spawn_handler(|thread| {
+ // Rather than starting a new thread, we're just taking over the current thread
+ // *without* running the main loop, so we can still return from here.
+ // The WorkerThread is leaked, but we never shutdown the global pool anyway.
+ let worker_thread = Box::leak(Box::new(WorkerThread::from(thread)));
+ let registry = &*worker_thread.registry;
+ let index = worker_thread.index;
+
+ unsafe {
+ WorkerThread::set_current(worker_thread);
+
+ // let registry know we are ready to do work
+ Latch::set(&registry.thread_infos[index].primed);
+ }
+
+ Ok(())
+ });
+
+ let fallback_result = Registry::new(builder);
+ if fallback_result.is_ok() {
+ return fallback_result;
+ }
+ }
+
+ result
+}
+
struct Terminator<'a>(&'a Arc<Registry>);
impl<'a> Drop for Terminator<'a> {
@@ -214,7 +254,9 @@ impl Registry {
where
S: ThreadSpawn,
{
- let n_threads = builder.get_num_threads();
+ // Soft-limit the number of threads that we can actually support.
+ let n_threads = Ord::min(builder.get_num_threads(), crate::max_num_threads());
+
let breadth_first = builder.get_breadth_first();
let (workers, stealers): (Vec<_>, Vec<_>) = (0..n_threads)
@@ -230,12 +272,21 @@ impl Registry {
})
.unzip();
+ let (broadcasts, broadcast_stealers): (Vec<_>, Vec<_>) = (0..n_threads)
+ .map(|_| {
+ let worker = Worker::new_fifo();
+ let stealer = worker.stealer();
+ (worker, stealer)
+ })
+ .unzip();
+
let logger = Logger::new(n_threads);
let registry = Arc::new(Registry {
logger: logger.clone(),
thread_infos: stealers.into_iter().map(ThreadInfo::new).collect(),
sleep: Sleep::new(logger, n_threads),
injected_jobs: Injector::new(),
+ broadcasts: Mutex::new(broadcasts),
terminate_count: AtomicUsize::new(1),
panic_handler: builder.take_panic_handler(),
start_handler: builder.take_start_handler(),
@@ -245,12 +296,13 @@ impl Registry {
// If we return early or panic, make sure to terminate existing threads.
let t1000 = Terminator(&registry);
- for (index, worker) in workers.into_iter().enumerate() {
+ for (index, (worker, stealer)) in workers.into_iter().zip(broadcast_stealers).enumerate() {
let thread = ThreadBuilder {
name: builder.get_thread_name(index),
stack_size: builder.get_stack_size(),
- registry: registry.clone(),
+ registry: Arc::clone(&registry),
worker,
+ stealer,
index,
};
if let Err(e) = builder.get_spawn_handler().spawn(thread) {
@@ -261,17 +313,18 @@ impl Registry {
// Returning normally now, without termination.
mem::forget(t1000);
- Ok(registry.clone())
+ Ok(registry)
}
pub(super) fn current() -> Arc<Registry> {
unsafe {
let worker_thread = WorkerThread::current();
- if worker_thread.is_null() {
- global_registry().clone()
+ let registry = if worker_thread.is_null() {
+ global_registry()
} else {
- (*worker_thread).registry.clone()
- }
+ &(*worker_thread).registry
+ };
+ Arc::clone(registry)
}
}
@@ -319,19 +372,14 @@ impl Registry {
self.thread_infos.len()
}
- pub(super) fn handle_panic(&self, err: Box<dyn Any + Send>) {
- match self.panic_handler {
- Some(ref handler) => {
- // If the customizable panic handler itself panics,
- // then we abort.
- let abort_guard = unwind::AbortIfPanic;
+ pub(super) fn catch_unwind(&self, f: impl FnOnce()) {
+ if let Err(err) = unwind::halt_unwinding(f) {
+ // If there is no handler, or if that handler itself panics, then we abort.
+ let abort_guard = unwind::AbortIfPanic;
+ if let Some(ref handler) = self.panic_handler {
handler(err);
mem::forget(abort_guard);
}
- None => {
- // Default panic handler aborts.
- let _ = unwind::AbortIfPanic; // let this drop.
- }
}
}
@@ -369,18 +417,16 @@ impl Registry {
if !worker_thread.is_null() && (*worker_thread).registry().id() == self.id() {
(*worker_thread).push(job_ref);
} else {
- self.inject(&[job_ref]);
+ self.inject(job_ref);
}
}
}
/// Push a job into the "external jobs" queue; it will be taken by
- /// whatever worker has nothing to do. Use this is you know that
+ /// whatever worker has nothing to do. Use this if you know that
/// you are not on a worker of this registry.
- pub(super) fn inject(&self, injected_jobs: &[JobRef]) {
- self.log(|| JobsInjected {
- count: injected_jobs.len(),
- });
+ pub(super) fn inject(&self, injected_job: JobRef) {
+ self.log(|| JobsInjected { count: 1 });
// It should not be possible for `state.terminate` to be true
// here. It is only set to true when the user creates (and
@@ -395,12 +441,8 @@ impl Registry {
let queue_was_empty = self.injected_jobs.is_empty();
- for &job_ref in injected_jobs {
- self.injected_jobs.push(job_ref);
- }
-
- self.sleep
- .new_injected_jobs(usize::MAX, injected_jobs.len() as u32, queue_was_empty);
+ self.injected_jobs.push(injected_job);
+ self.sleep.new_injected_jobs(usize::MAX, 1, queue_was_empty);
}
fn has_injected_job(&self) -> bool {
@@ -422,6 +464,40 @@ impl Registry {
}
}
+ /// Push a job into each thread's own "external jobs" queue; it will be
+ /// executed only on that thread, when it has nothing else to do locally,
+ /// before it tries to steal other work.
+ ///
+ /// **Panics** if not given exactly as many jobs as there are threads.
+ pub(super) fn inject_broadcast(&self, injected_jobs: impl ExactSizeIterator<Item = JobRef>) {
+ assert_eq!(self.num_threads(), injected_jobs.len());
+ self.log(|| JobBroadcast {
+ count: self.num_threads(),
+ });
+ {
+ let broadcasts = self.broadcasts.lock().unwrap();
+
+ // It should not be possible for `state.terminate` to be true
+ // here. It is only set to true when the user creates (and
+ // drops) a `ThreadPool`; and, in that case, they cannot be
+ // calling `inject_broadcast()` later, since they dropped their
+ // `ThreadPool`.
+ debug_assert_ne!(
+ self.terminate_count.load(Ordering::Acquire),
+ 0,
+ "inject_broadcast() sees state.terminate as true"
+ );
+
+ assert_eq!(broadcasts.len(), injected_jobs.len());
+ for (worker, job_ref) in broadcasts.iter().zip(injected_jobs) {
+ worker.push(job_ref);
+ }
+ }
+ for i in 0..self.num_threads() {
+ self.sleep.notify_worker_latch_is_set(i);
+ }
+ }
+
/// If already in a worker-thread of this registry, just execute `op`.
/// Otherwise, inject `op` in this thread-pool. Either way, block until `op`
/// completes and return its return value. If `op` panics, that panic will
@@ -464,9 +540,9 @@ impl Registry {
assert!(injected && !worker_thread.is_null());
op(&*worker_thread, true)
},
- l,
+ LatchRef::new(l),
);
- self.inject(&[job.as_job_ref()]);
+ self.inject(job.as_job_ref());
job.latch.wait_and_reset(); // Make sure we can use the same latch again next time.
// flush accumulated logs as we exit the thread
@@ -494,7 +570,7 @@ impl Registry {
},
latch,
);
- self.inject(&[job.as_job_ref()]);
+ self.inject(job.as_job_ref());
current_thread.wait_until(&job.latch);
job.into_result()
}
@@ -534,7 +610,7 @@ impl Registry {
pub(super) fn terminate(&self) {
if self.terminate_count.fetch_sub(1, Ordering::AcqRel) == 1 {
for (i, thread_info) in self.thread_infos.iter().enumerate() {
- thread_info.terminate.set_and_tickle_one(self, i);
+ unsafe { CountLatch::set_and_tickle_one(&thread_info.terminate, self, i) };
}
}
}
@@ -591,6 +667,9 @@ pub(super) struct WorkerThread {
/// the "worker" half of our local deque
worker: Worker<JobRef>,
+ /// the "stealer" half of the worker's broadcast deque
+ stealer: Stealer<JobRef>,
+
/// local queue used for `spawn_fifo` indirection
fifo: JobFifo,
@@ -608,7 +687,20 @@ pub(super) struct WorkerThread {
// worker is fully unwound. Using an unsafe pointer avoids the need
// for a RefCell<T> etc.
thread_local! {
- static WORKER_THREAD_STATE: Cell<*const WorkerThread> = Cell::new(ptr::null());
+ static WORKER_THREAD_STATE: Cell<*const WorkerThread> = const { Cell::new(ptr::null()) };
+}
+
+impl From<ThreadBuilder> for WorkerThread {
+ fn from(thread: ThreadBuilder) -> Self {
+ Self {
+ worker: thread.worker,
+ stealer: thread.stealer,
+ fifo: JobFifo::new(),
+ index: thread.index,
+ rng: XorShift64Star::new(),
+ registry: thread.registry,
+ }
+ }
}
impl Drop for WorkerThread {
@@ -681,14 +773,25 @@ impl WorkerThread {
/// for breadth-first execution, it would mean dequeuing from the
/// bottom.
#[inline]
- pub(super) unsafe fn take_local_job(&self) -> Option<JobRef> {
+ pub(super) fn take_local_job(&self) -> Option<JobRef> {
let popped_job = self.worker.pop();
if popped_job.is_some() {
self.log(|| JobPopped { worker: self.index });
+ return popped_job;
}
- popped_job
+ loop {
+ match self.stealer.steal() {
+ Steal::Success(job) => return Some(job),
+ Steal::Empty => return None,
+ Steal::Retry => {}
+ }
+ }
+ }
+
+ fn has_injected_job(&self) -> bool {
+ !self.stealer.is_empty() || self.registry.has_injected_job()
}
/// Wait until the latch is set. Try to keep busy by popping and
@@ -712,23 +815,14 @@ impl WorkerThread {
let mut idle_state = self.registry.sleep.start_looking(self.index, latch);
while !latch.probe() {
- // Try to find some work to do. We give preference first
- // to things in our local deque, then in other workers
- // deques, and finally to injected jobs from the
- // outside. The idea is to finish what we started before
- // we take on something new.
- if let Some(job) = self
- .take_local_job()
- .or_else(|| self.steal())
- .or_else(|| self.registry.pop_injected_job(self.index))
- {
+ if let Some(job) = self.find_work() {
self.registry.sleep.work_found(idle_state);
self.execute(job);
idle_state = self.registry.sleep.start_looking(self.index, latch);
} else {
self.registry
.sleep
- .no_work_found(&mut idle_state, latch, || self.registry.has_injected_job())
+ .no_work_found(&mut idle_state, latch, || self.has_injected_job())
}
}
@@ -744,6 +838,37 @@ impl WorkerThread {
mem::forget(abort_guard); // successful execution, do not abort
}
+ fn find_work(&self) -> Option<JobRef> {
+ // Try to find some work to do. We give preference first
+ // to things in our local deque, then in other workers
+ // deques, and finally to injected jobs from the
+ // outside. The idea is to finish what we started before
+ // we take on something new.
+ self.take_local_job()
+ .or_else(|| self.steal())
+ .or_else(|| self.registry.pop_injected_job(self.index))
+ }
+
+ pub(super) fn yield_now(&self) -> Yield {
+ match self.find_work() {
+ Some(job) => unsafe {
+ self.execute(job);
+ Yield::Executed
+ },
+ None => Yield::Idle,
+ }
+ }
+
+ pub(super) fn yield_local(&self) -> Yield {
+ match self.take_local_job() {
+ Some(job) => unsafe {
+ self.execute(job);
+ Yield::Executed
+ },
+ None => Yield::Idle,
+ }
+ }
+
#[inline]
pub(super) unsafe fn execute(&self, job: JobRef) {
job.execute();
@@ -753,7 +878,7 @@ impl WorkerThread {
///
/// This should only be done as a last resort, when there is no
/// local work to do.
- unsafe fn steal(&self) -> Option<JobRef> {
+ fn steal(&self) -> Option<JobRef> {
// we only steal when we don't have any work to do locally
debug_assert!(self.local_deque_is_empty());
@@ -796,18 +921,14 @@ impl WorkerThread {
/// ////////////////////////////////////////////////////////////////////////
-unsafe fn main_loop(worker: Worker<JobRef>, registry: Arc<Registry>, index: usize) {
- let worker_thread = &WorkerThread {
- worker,
- fifo: JobFifo::new(),
- index,
- rng: XorShift64Star::new(),
- registry: registry.clone(),
- };
+unsafe fn main_loop(thread: ThreadBuilder) {
+ let worker_thread = &WorkerThread::from(thread);
WorkerThread::set_current(worker_thread);
+ let registry = &*worker_thread.registry;
+ let index = worker_thread.index;
// let registry know we are ready to do work
- registry.thread_infos[index].primed.set();
+ Latch::set(&registry.thread_infos[index].primed);
// Worker threads should not panic. If they do, just abort, as the
// internal state of the threadpool is corrupted. Note that if
@@ -816,13 +937,7 @@ unsafe fn main_loop(worker: Worker<JobRef>, registry: Arc<Registry>, index: usiz
// Inform a user callback that we started a thread.
if let Some(ref handler) = registry.start_handler {
- let registry = registry.clone();
- match unwind::halt_unwinding(|| handler(index)) {
- Ok(()) => {}
- Err(err) => {
- registry.handle_panic(err);
- }
- }
+ registry.catch_unwind(|| handler(index));
}
let my_terminate_latch = &registry.thread_infos[index].terminate;
@@ -836,7 +951,7 @@ unsafe fn main_loop(worker: Worker<JobRef>, registry: Arc<Registry>, index: usiz
debug_assert!(worker_thread.take_local_job().is_none());
// let registry know we are done
- registry.thread_infos[index].stopped.set();
+ Latch::set(&registry.thread_infos[index].stopped);
// Normal termination, do not abort.
mem::forget(abort_guard);
@@ -845,13 +960,7 @@ unsafe fn main_loop(worker: Worker<JobRef>, registry: Arc<Registry>, index: usiz
// Inform a user callback that we exited a thread.
if let Some(ref handler) = registry.exit_handler {
- let registry = registry.clone();
- match unwind::halt_unwinding(|| handler(index)) {
- Ok(()) => {}
- Err(err) => {
- registry.handle_panic(err);
- }
- }
+ registry.catch_unwind(|| handler(index));
// We're already exiting the thread, there's nothing else to do.
}
}
@@ -874,7 +983,7 @@ where
// invalidated until we return.
op(&*owner_thread, false)
} else {
- global_registry().in_worker_cold(op)
+ global_registry().in_worker(op)
}
}
}
@@ -893,8 +1002,7 @@ impl XorShift64Star {
let mut seed = 0;
while seed == 0 {
let mut hasher = DefaultHasher::new();
- #[allow(deprecated)]
- static COUNTER: AtomicUsize = ATOMIC_USIZE_INIT;
+ static COUNTER: AtomicUsize = AtomicUsize::new(0);
hasher.write_usize(COUNTER.fetch_add(1, Ordering::Relaxed));
seed = hasher.finish();
}
diff --git a/src/scope/mod.rs b/src/scope/mod.rs
index 543aa26..f460dd7 100644
--- a/src/scope/mod.rs
+++ b/src/scope/mod.rs
@@ -5,14 +5,15 @@
//! [`in_place_scope()`]: fn.in_place_scope.html
//! [`join()`]: ../join/join.fn.html
-use crate::job::{HeapJob, JobFifo};
+use crate::broadcast::BroadcastContext;
+use crate::job::{ArcJob, HeapJob, JobFifo, JobRef};
use crate::latch::{CountLatch, CountLockLatch, Latch};
use crate::registry::{global_registry, in_worker, Registry, WorkerThread};
use crate::unwind;
use std::any::Any;
use std::fmt;
use std::marker::PhantomData;
-use std::mem;
+use std::mem::ManuallyDrop;
use std::ptr;
use std::sync::atomic::{AtomicPtr, Ordering};
use std::sync::Arc;
@@ -38,7 +39,7 @@ pub struct ScopeFifo<'scope> {
fifos: Vec<JobFifo>,
}
-enum ScopeLatch {
+pub(super) enum ScopeLatch {
/// A latch for scopes created on a rayon thread which will participate in work-
/// stealing while it waits for completion. This thread is not necessarily part
/// of the same registry as the scope itself!
@@ -127,7 +128,7 @@ struct ScopeBase<'scope> {
/// Task execution potentially starts as soon as `spawn()` is called.
/// The task will end sometime before `scope()` returns. Note that the
/// *closure* given to scope may return much earlier. In general
-/// the lifetime of a scope created like `scope(body) goes something like this:
+/// the lifetime of a scope created like `scope(body)` goes something like this:
///
/// - Scope begins when `scope(body)` is called
/// - Scope body `body()` is invoked
@@ -241,7 +242,7 @@ struct ScopeBase<'scope> {
/// });
///
/// // That closure is fine, but now we can't use `ok` anywhere else,
-/// // since it is owend by the previous task:
+/// // since it is owned by the previous task:
/// // s.spawn(|_| println!("ok: {:?}", ok));
/// });
/// ```
@@ -495,7 +496,7 @@ impl<'scope> Scope<'scope> {
/// the stack (if those variables outlive the scope) or
/// communicate through shared channels.
///
- /// (The intention is to eventualy integrate with Rust futures to
+ /// (The intention is to eventually integrate with Rust futures to
/// support spawns of functions that compute a value.)
///
/// # Examples
@@ -538,18 +539,37 @@ impl<'scope> Scope<'scope> {
where
BODY: FnOnce(&Scope<'scope>) + Send + 'scope,
{
- self.base.increment();
- unsafe {
- let job_ref = Box::new(HeapJob::new(move || {
- self.base.execute_job(move || body(self))
- }))
- .as_job_ref();
-
- // Since `Scope` implements `Sync`, we can't be sure that we're still in a
- // thread of this pool, so we can't just push to the local worker thread.
- // Also, this might be an in-place scope.
- self.base.registry.inject_or_push(job_ref);
- }
+ let scope_ptr = ScopePtr(self);
+ let job = HeapJob::new(move || unsafe {
+ // SAFETY: this job will execute before the scope ends.
+ let scope = scope_ptr.as_ref();
+ ScopeBase::execute_job(&scope.base, move || body(scope))
+ });
+ let job_ref = self.base.heap_job_ref(job);
+
+ // Since `Scope` implements `Sync`, we can't be sure that we're still in a
+ // thread of this pool, so we can't just push to the local worker thread.
+ // Also, this might be an in-place scope.
+ self.base.registry.inject_or_push(job_ref);
+ }
+
+ /// Spawns a job into every thread of the fork-join scope `self`. This job will
+ /// execute on each thread sometime before the fork-join scope completes. The
+ /// job is specified as a closure, and this closure receives its own reference
+ /// to the scope `self` as argument, as well as a `BroadcastContext`.
+ pub fn spawn_broadcast<BODY>(&self, body: BODY)
+ where
+ BODY: Fn(&Scope<'scope>, BroadcastContext<'_>) + Send + Sync + 'scope,
+ {
+ let scope_ptr = ScopePtr(self);
+ let job = ArcJob::new(move || unsafe {
+ // SAFETY: this job will execute before the scope ends.
+ let scope = scope_ptr.as_ref();
+ let body = &body;
+ let func = move || BroadcastContext::with(move |ctx| body(scope, ctx));
+ ScopeBase::execute_job(&scope.base, func)
+ });
+ self.base.inject_broadcast(job)
}
}
@@ -579,24 +599,44 @@ impl<'scope> ScopeFifo<'scope> {
where
BODY: FnOnce(&ScopeFifo<'scope>) + Send + 'scope,
{
- self.base.increment();
- unsafe {
- let job_ref = Box::new(HeapJob::new(move || {
- self.base.execute_job(move || body(self))
- }))
- .as_job_ref();
-
- // If we're in the pool, use our scope's private fifo for this thread to execute
- // in a locally-FIFO order. Otherwise, just use the pool's global injector.
- match self.base.registry.current_thread() {
- Some(worker) => {
- let fifo = &self.fifos[worker.index()];
- worker.push(fifo.push(job_ref));
- }
- None => self.base.registry.inject(&[job_ref]),
+ let scope_ptr = ScopePtr(self);
+ let job = HeapJob::new(move || unsafe {
+ // SAFETY: this job will execute before the scope ends.
+ let scope = scope_ptr.as_ref();
+ ScopeBase::execute_job(&scope.base, move || body(scope))
+ });
+ let job_ref = self.base.heap_job_ref(job);
+
+ // If we're in the pool, use our scope's private fifo for this thread to execute
+ // in a locally-FIFO order. Otherwise, just use the pool's global injector.
+ match self.base.registry.current_thread() {
+ Some(worker) => {
+ let fifo = &self.fifos[worker.index()];
+ // SAFETY: this job will execute before the scope ends.
+ unsafe { worker.push(fifo.push(job_ref)) };
}
+ None => self.base.registry.inject(job_ref),
}
}
+
+ /// Spawns a job into every thread of the fork-join scope `self`. This job will
+ /// execute on each thread sometime before the fork-join scope completes. The
+ /// job is specified as a closure, and this closure receives its own reference
+ /// to the scope `self` as argument, as well as a `BroadcastContext`.
+ pub fn spawn_broadcast<BODY>(&self, body: BODY)
+ where
+ BODY: Fn(&ScopeFifo<'scope>, BroadcastContext<'_>) + Send + Sync + 'scope,
+ {
+ let scope_ptr = ScopePtr(self);
+ let job = ArcJob::new(move || unsafe {
+ // SAFETY: this job will execute before the scope ends.
+ let scope = scope_ptr.as_ref();
+ let body = &body;
+ let func = move || BroadcastContext::with(move |ctx| body(scope, ctx));
+ ScopeBase::execute_job(&scope.base, func)
+ });
+ self.base.inject_broadcast(job)
+ }
}
impl<'scope> ScopeBase<'scope> {
@@ -619,13 +659,36 @@ impl<'scope> ScopeBase<'scope> {
self.job_completed_latch.increment();
}
+ fn heap_job_ref<FUNC>(&self, job: Box<HeapJob<FUNC>>) -> JobRef
+ where
+ FUNC: FnOnce() + Send + 'scope,
+ {
+ unsafe {
+ self.increment();
+ job.into_job_ref()
+ }
+ }
+
+ fn inject_broadcast<FUNC>(&self, job: Arc<ArcJob<FUNC>>)
+ where
+ FUNC: Fn() + Send + Sync + 'scope,
+ {
+ let n_threads = self.registry.num_threads();
+ let job_refs = (0..n_threads).map(|_| unsafe {
+ self.increment();
+ ArcJob::as_job_ref(&job)
+ });
+
+ self.registry.inject_broadcast(job_refs);
+ }
+
/// Executes `func` as a job, either aborting or executing as
/// appropriate.
fn complete<FUNC, R>(&self, owner: Option<&WorkerThread>, func: FUNC) -> R
where
FUNC: FnOnce() -> R,
{
- let result = self.execute_job_closure(func);
+ let result = unsafe { Self::execute_job_closure(self, func) };
self.job_completed_latch.wait(owner);
self.maybe_propagate_panic();
result.unwrap() // only None if `op` panicked, and that would have been propagated
@@ -633,28 +696,28 @@ impl<'scope> ScopeBase<'scope> {
/// Executes `func` as a job, either aborting or executing as
/// appropriate.
- fn execute_job<FUNC>(&self, func: FUNC)
+ unsafe fn execute_job<FUNC>(this: *const Self, func: FUNC)
where
FUNC: FnOnce(),
{
- let _: Option<()> = self.execute_job_closure(func);
+ let _: Option<()> = Self::execute_job_closure(this, func);
}
/// Executes `func` as a job in scope. Adjusts the "job completed"
/// counters and also catches any panic and stores it into
/// `scope`.
- fn execute_job_closure<FUNC, R>(&self, func: FUNC) -> Option<R>
+ unsafe fn execute_job_closure<FUNC, R>(this: *const Self, func: FUNC) -> Option<R>
where
FUNC: FnOnce() -> R,
{
match unwind::halt_unwinding(func) {
Ok(r) => {
- self.job_completed_latch.set();
+ Latch::set(&(*this).job_completed_latch);
Some(r)
}
Err(err) => {
- self.job_panicked(err);
- self.job_completed_latch.set();
+ (*this).job_panicked(err);
+ Latch::set(&(*this).job_completed_latch);
None
}
}
@@ -662,14 +725,20 @@ impl<'scope> ScopeBase<'scope> {
fn job_panicked(&self, err: Box<dyn Any + Send + 'static>) {
// capture the first error we see, free the rest
- let nil = ptr::null_mut();
- let mut err = Box::new(err); // box up the fat ptr
- if self
- .panic
- .compare_exchange(nil, &mut *err, Ordering::Release, Ordering::Relaxed)
- .is_ok()
- {
- mem::forget(err); // ownership now transferred into self.panic
+ if self.panic.load(Ordering::Relaxed).is_null() {
+ let nil = ptr::null_mut();
+ let mut err = ManuallyDrop::new(Box::new(err)); // box up the fat ptr
+ let err_ptr: *mut Box<dyn Any + Send + 'static> = &mut **err;
+ if self
+ .panic
+ .compare_exchange(nil, err_ptr, Ordering::Release, Ordering::Relaxed)
+ .is_ok()
+ {
+ // ownership now transferred into self.panic
+ } else {
+ // another panic raced in ahead of us, so drop ours
+ let _: Box<Box<_>> = ManuallyDrop::into_inner(err);
+ }
}
}
@@ -687,14 +756,18 @@ impl<'scope> ScopeBase<'scope> {
impl ScopeLatch {
fn new(owner: Option<&WorkerThread>) -> Self {
+ Self::with_count(1, owner)
+ }
+
+ pub(super) fn with_count(count: usize, owner: Option<&WorkerThread>) -> Self {
match owner {
Some(owner) => ScopeLatch::Stealing {
- latch: CountLatch::new(),
+ latch: CountLatch::with_count(count),
registry: Arc::clone(owner.registry()),
worker_index: owner.index(),
},
None => ScopeLatch::Blocking {
- latch: CountLockLatch::new(),
+ latch: CountLockLatch::with_count(count),
},
}
}
@@ -706,18 +779,7 @@ impl ScopeLatch {
}
}
- fn set(&self) {
- match self {
- ScopeLatch::Stealing {
- latch,
- registry,
- worker_index,
- } => latch.set_and_tickle_one(registry, *worker_index),
- ScopeLatch::Blocking { latch } => latch.set(),
- }
- }
-
- fn wait(&self, owner: Option<&WorkerThread>) {
+ pub(super) fn wait(&self, owner: Option<&WorkerThread>) {
match self {
ScopeLatch::Stealing {
latch,
@@ -734,6 +796,19 @@ impl ScopeLatch {
}
}
+impl Latch for ScopeLatch {
+ unsafe fn set(this: *const Self) {
+ match &*this {
+ ScopeLatch::Stealing {
+ latch,
+ registry,
+ worker_index,
+ } => CountLatch::set_and_tickle_one(latch, registry, *worker_index),
+ ScopeLatch::Blocking { latch } => Latch::set(latch),
+ }
+ }
+}
+
impl<'scope> fmt::Debug for Scope<'scope> {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("Scope")
@@ -769,3 +844,22 @@ impl fmt::Debug for ScopeLatch {
}
}
}
+
+/// Used to capture a scope `&Self` pointer in jobs, without faking a lifetime.
+///
+/// Unsafe code is still required to dereference the pointer, but that's fine in
+/// scope jobs that are guaranteed to execute before the scope ends.
+struct ScopePtr<T>(*const T);
+
+// SAFETY: !Send for raw pointers is not for safety, just as a lint
+unsafe impl<T: Sync> Send for ScopePtr<T> {}
+
+// SAFETY: !Sync for raw pointers is not for safety, just as a lint
+unsafe impl<T: Sync> Sync for ScopePtr<T> {}
+
+impl<T> ScopePtr<T> {
+ // Helper to avoid disjoint captures of `scope_ptr.0`
+ unsafe fn as_ref(&self) -> &T {
+ &*self.0
+ }
+}
diff --git a/src/scope/test.rs b/src/scope/test.rs
index 471d78e..ad8c4af 100644
--- a/src/scope/test.rs
+++ b/src/scope/test.rs
@@ -6,7 +6,7 @@ use rand_xorshift::XorShiftRng;
use std::cmp;
use std::iter::once;
use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::Mutex;
+use std::sync::{Barrier, Mutex};
use std::vec;
#[test]
@@ -42,7 +42,7 @@ fn scope_divide_and_conquer() {
scope(|s| s.spawn(move |s| divide_and_conquer(s, counter_p, 1024)));
let counter_s = &AtomicUsize::new(0);
- divide_and_conquer_seq(&counter_s, 1024);
+ divide_and_conquer_seq(counter_s, 1024);
let p = counter_p.load(Ordering::SeqCst);
let s = counter_s.load(Ordering::SeqCst);
@@ -75,7 +75,7 @@ struct Tree<T: Send> {
}
impl<T: Send> Tree<T> {
- fn iter<'s>(&'s self) -> vec::IntoIter<&'s T> {
+ fn iter(&self) -> vec::IntoIter<&T> {
once(&self.value)
.chain(self.children.iter().flat_map(Tree::iter))
.collect::<Vec<_>>() // seems like it shouldn't be needed... but prevents overflow
@@ -148,6 +148,7 @@ fn update_tree() {
/// linearly with N. We test this by some unsafe hackery and
/// permitting an approx 10% change with a 10x input change.
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn linear_stack_growth() {
let builder = ThreadPoolBuilder::new().num_threads(1);
let pool = builder.build().unwrap();
@@ -213,6 +214,7 @@ fn panic_propagate_nested_scope_spawn() {
}
#[test]
+#[cfg_attr(not(panic = "unwind"), ignore)]
fn panic_propagate_still_execute_1() {
let mut x = false;
match unwind::halt_unwinding(|| {
@@ -227,6 +229,7 @@ fn panic_propagate_still_execute_1() {
}
#[test]
+#[cfg_attr(not(panic = "unwind"), ignore)]
fn panic_propagate_still_execute_2() {
let mut x = false;
match unwind::halt_unwinding(|| {
@@ -241,11 +244,12 @@ fn panic_propagate_still_execute_2() {
}
#[test]
+#[cfg_attr(not(panic = "unwind"), ignore)]
fn panic_propagate_still_execute_3() {
let mut x = false;
match unwind::halt_unwinding(|| {
scope(|s| {
- s.spawn(|_| x = true); // spanwed job should still execute despite later panic
+ s.spawn(|_| x = true); // spawned job should still execute despite later panic
panic!("Hello, world!");
});
}) {
@@ -255,6 +259,7 @@ fn panic_propagate_still_execute_3() {
}
#[test]
+#[cfg_attr(not(panic = "unwind"), ignore)]
fn panic_propagate_still_execute_4() {
let mut x = false;
match unwind::halt_unwinding(|| {
@@ -292,16 +297,18 @@ macro_rules! test_order {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn lifo_order() {
- // In the absense of stealing, `scope()` runs its `spawn()` jobs in LIFO order.
+ // In the absence of stealing, `scope()` runs its `spawn()` jobs in LIFO order.
let vec = test_order!(scope => spawn);
let expected: Vec<i32> = (0..100).rev().collect(); // LIFO -> reversed
assert_eq!(vec, expected);
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn fifo_order() {
- // In the absense of stealing, `scope_fifo()` runs its `spawn_fifo()` jobs in FIFO order.
+ // In the absence of stealing, `scope_fifo()` runs its `spawn_fifo()` jobs in FIFO order.
let vec = test_order!(scope_fifo => spawn_fifo);
let expected: Vec<i32> = (0..100).collect(); // FIFO -> natural order
assert_eq!(vec, expected);
@@ -334,22 +341,25 @@ macro_rules! test_nested_order {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn nested_lifo_order() {
- // In the absense of stealing, `scope()` runs its `spawn()` jobs in LIFO order.
+ // In the absence of stealing, `scope()` runs its `spawn()` jobs in LIFO order.
let vec = test_nested_order!(scope => spawn, scope => spawn);
let expected: Vec<i32> = (0..100).rev().collect(); // LIFO -> reversed
assert_eq!(vec, expected);
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn nested_fifo_order() {
- // In the absense of stealing, `scope_fifo()` runs its `spawn_fifo()` jobs in FIFO order.
+ // In the absence of stealing, `scope_fifo()` runs its `spawn_fifo()` jobs in FIFO order.
let vec = test_nested_order!(scope_fifo => spawn_fifo, scope_fifo => spawn_fifo);
let expected: Vec<i32> = (0..100).collect(); // FIFO -> natural order
assert_eq!(vec, expected);
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn nested_lifo_fifo_order() {
// LIFO on the outside, FIFO on the inside
let vec = test_nested_order!(scope => spawn, scope_fifo => spawn_fifo);
@@ -361,6 +371,7 @@ fn nested_lifo_fifo_order() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn nested_fifo_lifo_order() {
// FIFO on the outside, LIFO on the inside
let vec = test_nested_order!(scope_fifo => spawn_fifo, scope => spawn);
@@ -403,6 +414,7 @@ macro_rules! test_mixed_order {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn mixed_lifo_order() {
// NB: the end of the inner scope makes us execute some of the outer scope
// before they've all been spawned, so they're not perfectly LIFO.
@@ -412,6 +424,7 @@ fn mixed_lifo_order() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn mixed_fifo_order() {
let vec = test_mixed_order!(scope_fifo => spawn_fifo, scope_fifo => spawn_fifo);
let expected = vec![-1, 0, -2, 1, -3, 2, 3];
@@ -419,6 +432,7 @@ fn mixed_fifo_order() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn mixed_lifo_fifo_order() {
// NB: the end of the inner scope makes us execute some of the outer scope
// before they've all been spawned, so they're not perfectly LIFO.
@@ -428,6 +442,7 @@ fn mixed_lifo_fifo_order() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn mixed_fifo_lifo_order() {
let vec = test_mixed_order!(scope_fifo => spawn_fifo, scope => spawn);
let expected = vec![-3, 0, -2, 1, -1, 2, 3];
@@ -513,3 +528,92 @@ fn mixed_lifetime_scope_fifo() {
increment(&[&counter; 100]);
assert_eq!(counter.into_inner(), 100);
}
+
+#[test]
+fn scope_spawn_broadcast() {
+ let sum = AtomicUsize::new(0);
+ let n = scope(|s| {
+ s.spawn_broadcast(|_, ctx| {
+ sum.fetch_add(ctx.index(), Ordering::Relaxed);
+ });
+ crate::current_num_threads()
+ });
+ assert_eq!(sum.into_inner(), n * (n - 1) / 2);
+}
+
+#[test]
+fn scope_fifo_spawn_broadcast() {
+ let sum = AtomicUsize::new(0);
+ let n = scope_fifo(|s| {
+ s.spawn_broadcast(|_, ctx| {
+ sum.fetch_add(ctx.index(), Ordering::Relaxed);
+ });
+ crate::current_num_threads()
+ });
+ assert_eq!(sum.into_inner(), n * (n - 1) / 2);
+}
+
+#[test]
+fn scope_spawn_broadcast_nested() {
+ let sum = AtomicUsize::new(0);
+ let n = scope(|s| {
+ s.spawn_broadcast(|s, _| {
+ s.spawn_broadcast(|_, ctx| {
+ sum.fetch_add(ctx.index(), Ordering::Relaxed);
+ });
+ });
+ crate::current_num_threads()
+ });
+ assert_eq!(sum.into_inner(), n * n * (n - 1) / 2);
+}
+
+#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
+fn scope_spawn_broadcast_barrier() {
+ let barrier = Barrier::new(8);
+ let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
+ pool.in_place_scope(|s| {
+ s.spawn_broadcast(|_, _| {
+ barrier.wait();
+ });
+ barrier.wait();
+ });
+}
+
+#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
+fn scope_spawn_broadcast_panic_one() {
+ let count = AtomicUsize::new(0);
+ let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
+ let result = crate::unwind::halt_unwinding(|| {
+ pool.scope(|s| {
+ s.spawn_broadcast(|_, ctx| {
+ count.fetch_add(1, Ordering::Relaxed);
+ if ctx.index() == 3 {
+ panic!("Hello, world!");
+ }
+ });
+ });
+ });
+ assert_eq!(count.into_inner(), 7);
+ assert!(result.is_err(), "broadcast panic should propagate!");
+}
+
+#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
+fn scope_spawn_broadcast_panic_many() {
+ let count = AtomicUsize::new(0);
+ let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
+ let result = crate::unwind::halt_unwinding(|| {
+ pool.scope(|s| {
+ s.spawn_broadcast(|_, ctx| {
+ count.fetch_add(1, Ordering::Relaxed);
+ if ctx.index() % 2 == 0 {
+ panic!("Hello, world!");
+ }
+ });
+ });
+ });
+ assert_eq!(count.into_inner(), 7);
+ assert!(result.is_err(), "broadcast panic should propagate!");
+}
diff --git a/src/sleep/README.md b/src/sleep/README.md
index c62c397..55426c8 100644
--- a/src/sleep/README.md
+++ b/src/sleep/README.md
@@ -75,7 +75,7 @@ These counters are adjusted as follows:
* When a thread awakens a sleeping thread: decrement the sleeping thread counter.
* Subtle point: the thread that *awakens* the sleeping thread decrements the
counter, not the thread that is *sleeping*. This is because there is a delay
- between siganling a thread to wake and the thread actually waking:
+ between signaling a thread to wake and the thread actually waking:
decrementing the counter when awakening the thread means that other threads
that may be posting work will see the up-to-date value that much faster.
* When a thread finds work, exiting the idle state: decrement the inactive
@@ -137,7 +137,7 @@ The full protocol for a thread to fall asleep is as follows:
above, remembering the `final_value` of the JEC. It does one more search for work.
* If no work is found, the thread atomically:
* Checks the JEC to see that it has not changed from `final_value`.
- * If it has, then the thread goes back to searchin for work. We reset to
+ * If it has, then the thread goes back to searching for work. We reset to
just before we got sleepy, so that we will do one more search
before attending to sleep again (rather than searching for many rounds).
* Increments the number of sleeping threads by 1.
diff --git a/src/sleep/counters.rs b/src/sleep/counters.rs
index 879d32e..f2a3de3 100644
--- a/src/sleep/counters.rs
+++ b/src/sleep/counters.rs
@@ -53,14 +53,20 @@ impl JobsEventCounter {
}
/// Number of bits used for the thread counters.
-const THREADS_BITS: usize = 10;
+#[cfg(target_pointer_width = "64")]
+const THREADS_BITS: usize = 16;
+
+#[cfg(target_pointer_width = "32")]
+const THREADS_BITS: usize = 8;
/// Bits to shift to select the sleeping threads
/// (used with `select_bits`).
+#[allow(clippy::erasing_op)]
const SLEEPING_SHIFT: usize = 0 * THREADS_BITS;
/// Bits to shift to select the inactive threads
/// (used with `select_bits`).
+#[allow(clippy::identity_op)]
const INACTIVE_SHIFT: usize = 1 * THREADS_BITS;
/// Bits to shift to select the JEC
@@ -68,7 +74,7 @@ const INACTIVE_SHIFT: usize = 1 * THREADS_BITS;
const JEC_SHIFT: usize = 2 * THREADS_BITS;
/// Max value for the thread counters.
-const THREADS_MAX: usize = (1 << THREADS_BITS) - 1;
+pub(crate) const THREADS_MAX: usize = (1 << THREADS_BITS) - 1;
/// Constant that can be added to add one sleeping thread.
const ONE_SLEEPING: usize = 1;
diff --git a/src/sleep/mod.rs b/src/sleep/mod.rs
index 2aa262c..af7225a 100644
--- a/src/sleep/mod.rs
+++ b/src/sleep/mod.rs
@@ -11,6 +11,7 @@ use std::thread;
use std::usize;
mod counters;
+pub(crate) use self::counters::THREADS_MAX;
use self::counters::{AtomicCounters, JobsEventCounter};
/// The `Sleep` struct is embedded into each registry. It governs the waking and sleeping
@@ -62,6 +63,7 @@ const ROUNDS_UNTIL_SLEEPING: u32 = ROUNDS_UNTIL_SLEEPY + 1;
impl Sleep {
pub(super) fn new(logger: Logger, n_threads: usize) -> Sleep {
+ assert!(n_threads <= THREADS_MAX);
Sleep {
logger,
worker_sleep_states: (0..n_threads).map(|_| Default::default()).collect(),
@@ -157,7 +159,7 @@ impl Sleep {
debug_assert!(!*is_blocked);
// Our latch was signalled. We should wake back up fully as we
- // wil have some stuff to do.
+ // will have some stuff to do.
if !latch.fall_asleep() {
self.logger.log(|| ThreadSleepInterruptedByLatch {
worker: worker_index,
diff --git a/src/spawn/mod.rs b/src/spawn/mod.rs
index 0006103..1aa9edb 100644
--- a/src/spawn/mod.rs
+++ b/src/spawn/mod.rs
@@ -73,7 +73,7 @@ where
F: FnOnce() + Send + 'static,
{
// We assert that this does not hold any references (we know
- // this because of the `'static` bound in the inferface);
+ // this because of the `'static` bound in the interface);
// moreover, we assert that the code below is not supposed to
// be able to panic, and hence the data won't leak but will be
// enqueued into some deque for later execution.
@@ -91,19 +91,14 @@ where
// executed. This ref is decremented at the (*) below.
registry.increment_terminate_count();
- Box::new(HeapJob::new({
- let registry = registry.clone();
+ HeapJob::new({
+ let registry = Arc::clone(registry);
move || {
- match unwind::halt_unwinding(func) {
- Ok(()) => {}
- Err(err) => {
- registry.handle_panic(err);
- }
- }
+ registry.catch_unwind(func);
registry.terminate(); // (*) permit registry to terminate now
}
- }))
- .as_job_ref()
+ })
+ .into_static_job_ref()
}
/// Fires off a task into the Rayon threadpool in the "static" or
@@ -148,7 +143,7 @@ where
F: FnOnce() + Send + 'static,
{
// We assert that this does not hold any references (we know
- // this because of the `'static` bound in the inferface);
+ // this because of the `'static` bound in the interface);
// moreover, we assert that the code below is not supposed to
// be able to panic, and hence the data won't leak but will be
// enqueued into some deque for later execution.
@@ -159,7 +154,7 @@ where
// in a locally-FIFO order. Otherwise, just use the pool's global injector.
match registry.current_thread() {
Some(worker) => worker.push_fifo(job_ref),
- None => registry.inject(&[job_ref]),
+ None => registry.inject(job_ref),
}
mem::forget(abort_guard);
}
diff --git a/src/spawn/test.rs b/src/spawn/test.rs
index 9c59754..b7a0535 100644
--- a/src/spawn/test.rs
+++ b/src/spawn/test.rs
@@ -7,6 +7,7 @@ use super::{spawn, spawn_fifo};
use crate::ThreadPoolBuilder;
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_then_join_in_worker() {
let (tx, rx) = channel();
scope(move |_| {
@@ -16,6 +17,7 @@ fn spawn_then_join_in_worker() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_then_join_outside_worker() {
let (tx, rx) = channel();
spawn(move || tx.send(22).unwrap());
@@ -23,6 +25,7 @@ fn spawn_then_join_outside_worker() {
}
#[test]
+#[cfg_attr(not(panic = "unwind"), ignore)]
fn panic_fwd() {
let (tx, rx) = channel();
@@ -54,6 +57,7 @@ fn panic_fwd() {
/// still active asynchronous tasks. We expect the thread-pool to stay
/// alive and executing until those threads are complete.
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn termination_while_things_are_executing() {
let (tx0, rx0) = channel();
let (tx1, rx1) = channel();
@@ -80,6 +84,7 @@ fn termination_while_things_are_executing() {
}
#[test]
+#[cfg_attr(not(panic = "unwind"), ignore)]
fn custom_panic_handler_and_spawn() {
let (tx, rx) = channel();
@@ -107,6 +112,7 @@ fn custom_panic_handler_and_spawn() {
}
#[test]
+#[cfg_attr(not(panic = "unwind"), ignore)]
fn custom_panic_handler_and_nested_spawn() {
let (tx, rx) = channel();
@@ -165,22 +171,25 @@ macro_rules! test_order {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn lifo_order() {
- // In the absense of stealing, `spawn()` jobs on a thread will run in LIFO order.
+ // In the absence of stealing, `spawn()` jobs on a thread will run in LIFO order.
let vec = test_order!(spawn, spawn);
let expected: Vec<i32> = (0..100).rev().collect(); // LIFO -> reversed
assert_eq!(vec, expected);
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn fifo_order() {
- // In the absense of stealing, `spawn_fifo()` jobs on a thread will run in FIFO order.
+ // In the absence of stealing, `spawn_fifo()` jobs on a thread will run in FIFO order.
let vec = test_order!(spawn_fifo, spawn_fifo);
let expected: Vec<i32> = (0..100).collect(); // FIFO -> natural order
assert_eq!(vec, expected);
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn lifo_fifo_order() {
// LIFO on the outside, FIFO on the inside
let vec = test_order!(spawn, spawn_fifo);
@@ -192,6 +201,7 @@ fn lifo_fifo_order() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn fifo_lifo_order() {
// FIFO on the outside, LIFO on the inside
let vec = test_order!(spawn_fifo, spawn);
@@ -229,6 +239,7 @@ macro_rules! test_mixed_order {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn mixed_lifo_fifo_order() {
let vec = test_mixed_order!(spawn, spawn_fifo);
let expected = vec![3, -1, 2, -2, 1, -3, 0];
@@ -236,6 +247,7 @@ fn mixed_lifo_fifo_order() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn mixed_fifo_lifo_order() {
let vec = test_mixed_order!(spawn_fifo, spawn);
let expected = vec![0, -3, 1, -2, 2, -1, 3];
diff --git a/src/test.rs b/src/test.rs
index 015d3ec..25b8487 100644
--- a/src/test.rs
+++ b/src/test.rs
@@ -1,12 +1,11 @@
#![cfg(test)]
-#[allow(deprecated)]
-use crate::Configuration;
use crate::{ThreadPoolBuildError, ThreadPoolBuilder};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Barrier};
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn worker_thread_index() {
let pool = ThreadPoolBuilder::new().num_threads(22).build().unwrap();
assert_eq!(pool.current_num_threads(), 22);
@@ -16,14 +15,15 @@ fn worker_thread_index() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn start_callback_called() {
let n_threads = 16;
let n_called = Arc::new(AtomicUsize::new(0));
// Wait for all the threads in the pool plus the one running tests.
let barrier = Arc::new(Barrier::new(n_threads + 1));
- let b = barrier.clone();
- let nc = n_called.clone();
+ let b = Arc::clone(&barrier);
+ let nc = Arc::clone(&n_called);
let start_handler = move |_| {
nc.fetch_add(1, Ordering::SeqCst);
b.wait();
@@ -42,14 +42,15 @@ fn start_callback_called() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn exit_callback_called() {
let n_threads = 16;
let n_called = Arc::new(AtomicUsize::new(0));
// Wait for all the threads in the pool plus the one running tests.
let barrier = Arc::new(Barrier::new(n_threads + 1));
- let b = barrier.clone();
- let nc = n_called.clone();
+ let b = Arc::clone(&barrier);
+ let nc = Arc::clone(&n_called);
let exit_handler = move |_| {
nc.fetch_add(1, Ordering::SeqCst);
b.wait();
@@ -71,6 +72,7 @@ fn exit_callback_called() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn handler_panics_handled_correctly() {
let n_threads = 16;
let n_called = Arc::new(AtomicUsize::new(0));
@@ -85,9 +87,9 @@ fn handler_panics_handled_correctly() {
panic!("ensure panic handler is called when exiting");
};
- let sb = start_barrier.clone();
- let eb = exit_barrier.clone();
- let nc = n_called.clone();
+ let sb = Arc::clone(&start_barrier);
+ let eb = Arc::clone(&exit_barrier);
+ let nc = Arc::clone(&n_called);
let panic_handler = move |_| {
let val = nc.fetch_add(1, Ordering::SeqCst);
if val < n_threads {
@@ -121,7 +123,7 @@ fn handler_panics_handled_correctly() {
}
#[test]
-#[allow(deprecated)]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn check_config_build() {
let pool = ThreadPoolBuilder::new().num_threads(22).build().unwrap();
assert_eq!(pool.current_num_threads(), 22);
@@ -137,6 +139,7 @@ fn check_error_send_sync() {
#[allow(deprecated)]
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn configuration() {
let start_handler = move |_| {};
let exit_handler = move |_| {};
@@ -144,7 +147,7 @@ fn configuration() {
let thread_name = move |i| format!("thread_name_{}", i);
// Ensure we can call all public methods on Configuration
- Configuration::new()
+ crate::Configuration::new()
.thread_name(thread_name)
.num_threads(5)
.panic_handler(panic_handler)
@@ -157,6 +160,7 @@ fn configuration() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn default_pool() {
ThreadPoolBuilder::default().build().unwrap();
}
@@ -165,6 +169,7 @@ fn default_pool() {
/// the pool is done with them, allowing them to be used with rayon again
/// later. e.g. WebAssembly want to have their own pool of available threads.
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn cleared_current_thread() -> Result<(), ThreadPoolBuildError> {
let n_threads = 5;
let mut handles = vec![];
diff --git a/src/thread_pool/mod.rs b/src/thread_pool/mod.rs
index 5edaedc..c37826e 100644
--- a/src/thread_pool/mod.rs
+++ b/src/thread_pool/mod.rs
@@ -3,12 +3,11 @@
//!
//! [`ThreadPool`]: struct.ThreadPool.html
+use crate::broadcast::{self, BroadcastContext};
use crate::join;
use crate::registry::{Registry, ThreadSpawn, WorkerThread};
use crate::scope::{do_in_place_scope, do_in_place_scope_fifo};
use crate::spawn;
-#[allow(deprecated)]
-use crate::Configuration;
use crate::{scope, Scope};
use crate::{scope_fifo, ScopeFifo};
use crate::{ThreadPoolBuildError, ThreadPoolBuilder};
@@ -57,7 +56,7 @@ impl ThreadPool {
#[deprecated(note = "Use `ThreadPoolBuilder::build`")]
#[allow(deprecated)]
/// Deprecated in favor of `ThreadPoolBuilder::build`.
- pub fn new(configuration: Configuration) -> Result<ThreadPool, Box<dyn Error>> {
+ pub fn new(configuration: crate::Configuration) -> Result<ThreadPool, Box<dyn Error>> {
Self::build(configuration.into_builder()).map_err(Box::from)
}
@@ -111,6 +110,57 @@ impl ThreadPool {
self.registry.in_worker(|_, _| op())
}
+ /// Executes `op` within every thread in the threadpool. Any attempts to use
+ /// `join`, `scope`, or parallel iterators will then operate within that
+ /// threadpool.
+ ///
+ /// Broadcasts are executed on each thread after they have exhausted their
+ /// local work queue, before they attempt work-stealing from other threads.
+ /// The goal of that strategy is to run everywhere in a timely manner
+ /// *without* being too disruptive to current work. There may be alternative
+ /// broadcast styles added in the future for more or less aggressive
+ /// injection, if the need arises.
+ ///
+ /// # Warning: thread-local data
+ ///
+ /// Because `op` is executing within the Rayon thread-pool,
+ /// thread-local data from the current thread will not be
+ /// accessible.
+ ///
+ /// # Panics
+ ///
+ /// If `op` should panic on one or more threads, exactly one panic
+ /// will be propagated, only after all threads have completed
+ /// (or panicked) their own `op`.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # use rayon_core as rayon;
+ /// use std::sync::atomic::{AtomicUsize, Ordering};
+ ///
+ /// fn main() {
+ /// let pool = rayon::ThreadPoolBuilder::new().num_threads(5).build().unwrap();
+ ///
+ /// // The argument gives context, including the index of each thread.
+ /// let v: Vec<usize> = pool.broadcast(|ctx| ctx.index() * ctx.index());
+ /// assert_eq!(v, &[0, 1, 4, 9, 16]);
+ ///
+ /// // The closure can reference the local stack
+ /// let count = AtomicUsize::new(0);
+ /// pool.broadcast(|_| count.fetch_add(1, Ordering::Relaxed));
+ /// assert_eq!(count.into_inner(), 5);
+ /// }
+ /// ```
+ pub fn broadcast<OP, R>(&self, op: OP) -> Vec<R>
+ where
+ OP: Fn(BroadcastContext<'_>) -> R + Sync,
+ R: Send,
+ {
+ // We assert that `self.registry` has not terminated.
+ unsafe { broadcast::broadcast_in(op, &self.registry) }
+ }
+
/// Returns the (current) number of threads in the thread pool.
///
/// # Future compatibility note
@@ -277,6 +327,42 @@ impl ThreadPool {
// We assert that `self.registry` has not terminated.
unsafe { spawn::spawn_fifo_in(op, &self.registry) }
}
+
+ /// Spawns an asynchronous task on every thread in this thread-pool. This task
+ /// will run in the implicit, global scope, which means that it may outlast the
+ /// current stack frame -- therefore, it cannot capture any references onto the
+ /// stack (you will likely need a `move` closure).
+ pub fn spawn_broadcast<OP>(&self, op: OP)
+ where
+ OP: Fn(BroadcastContext<'_>) + Send + Sync + 'static,
+ {
+ // We assert that `self.registry` has not terminated.
+ unsafe { broadcast::spawn_broadcast_in(op, &self.registry) }
+ }
+
+ /// Cooperatively yields execution to Rayon.
+ ///
+ /// This is similar to the general [`yield_now()`], but only if the current
+ /// thread is part of *this* thread pool.
+ ///
+ /// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if
+ /// nothing was available, or `None` if the current thread is not part this pool.
+ pub fn yield_now(&self) -> Option<Yield> {
+ let curr = self.registry.current_thread()?;
+ Some(curr.yield_now())
+ }
+
+ /// Cooperatively yields execution to local Rayon work.
+ ///
+ /// This is similar to the general [`yield_local()`], but only if the current
+ /// thread is part of *this* thread pool.
+ ///
+ /// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if
+ /// nothing was available, or `None` if the current thread is not part this pool.
+ pub fn yield_local(&self) -> Option<Yield> {
+ let curr = self.registry.current_thread()?;
+ Some(curr.yield_local())
+ }
}
impl Drop for ThreadPool {
@@ -338,3 +424,48 @@ pub fn current_thread_has_pending_tasks() -> Option<bool> {
Some(!curr.local_deque_is_empty())
}
}
+
+/// Cooperatively yields execution to Rayon.
+///
+/// If the current thread is part of a rayon thread pool, this looks for a
+/// single unit of pending work in the pool, then executes it. Completion of
+/// that work might include nested work or further work stealing.
+///
+/// This is similar to [`std::thread::yield_now()`], but does not literally make
+/// that call. If you are implementing a polling loop, you may want to also
+/// yield to the OS scheduler yourself if no Rayon work was found.
+///
+/// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if
+/// nothing was available, or `None` if this thread is not part of any pool at all.
+pub fn yield_now() -> Option<Yield> {
+ unsafe {
+ let thread = WorkerThread::current().as_ref()?;
+ Some(thread.yield_now())
+ }
+}
+
+/// Cooperatively yields execution to local Rayon work.
+///
+/// If the current thread is part of a rayon thread pool, this looks for a
+/// single unit of pending work in this thread's queue, then executes it.
+/// Completion of that work might include nested work or further work stealing.
+///
+/// This is similar to [`yield_now()`], but does not steal from other threads.
+///
+/// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if
+/// nothing was available, or `None` if this thread is not part of any pool at all.
+pub fn yield_local() -> Option<Yield> {
+ unsafe {
+ let thread = WorkerThread::current().as_ref()?;
+ Some(thread.yield_local())
+ }
+}
+
+/// Result of [`yield_now()`] or [`yield_local()`].
+#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+pub enum Yield {
+ /// Work was found and executed.
+ Executed,
+ /// No available work was found.
+ Idle,
+}
diff --git a/src/thread_pool/test.rs b/src/thread_pool/test.rs
index 1510e37..6143e57 100644
--- a/src/thread_pool/test.rs
+++ b/src/thread_pool/test.rs
@@ -4,8 +4,6 @@ use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::channel;
use std::sync::{Arc, Mutex};
-#[allow(deprecated)]
-use crate::Configuration;
use crate::{join, Scope, ScopeFifo, ThreadPool, ThreadPoolBuilder};
#[test]
@@ -18,6 +16,7 @@ fn panic_propagate() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn workers_stop() {
let registry;
@@ -28,7 +27,7 @@ fn workers_stop() {
// do some work on these threads
join_a_lot(22);
- thread_pool.registry.clone()
+ Arc::clone(&thread_pool.registry)
});
assert_eq!(registry.num_threads(), 22);
}
@@ -45,6 +44,7 @@ fn join_a_lot(n: usize) {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn sleeper_stop() {
use std::{thread, time};
@@ -53,7 +53,7 @@ fn sleeper_stop() {
{
// once we exit this block, thread-pool will be dropped
let thread_pool = ThreadPoolBuilder::new().num_threads(22).build().unwrap();
- registry = thread_pool.registry.clone();
+ registry = Arc::clone(&thread_pool.registry);
// Give time for at least some of the thread pool to fall asleep.
thread::sleep(time::Duration::from_secs(1));
@@ -67,7 +67,7 @@ fn sleeper_stop() {
/// Creates a start/exit handler that increments an atomic counter.
fn count_handler() -> (Arc<AtomicUsize>, impl Fn(usize)) {
let count = Arc::new(AtomicUsize::new(0));
- (count.clone(), move |_| {
+ (Arc::clone(&count), move |_| {
count.fetch_add(1, Ordering::SeqCst);
})
}
@@ -91,6 +91,7 @@ fn wait_for_counter(mut counter: Arc<AtomicUsize>) -> usize {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn failed_thread_stack() {
// Note: we first tried to force failure with a `usize::MAX` stack, but
// macOS and Windows weren't fazed, or at least didn't fail the way we want.
@@ -117,6 +118,7 @@ fn failed_thread_stack() {
}
#[test]
+#[cfg_attr(not(panic = "unwind"), ignore)]
fn panic_thread_name() {
let (start_count, start_handler) = count_handler();
let (exit_count, exit_handler) = count_handler();
@@ -141,6 +143,7 @@ fn panic_thread_name() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn self_install() {
let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
@@ -149,6 +152,7 @@ fn self_install() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn mutual_install() {
let pool1 = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
let pool2 = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
@@ -168,6 +172,7 @@ fn mutual_install() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn mutual_install_sleepy() {
use std::{thread, time};
@@ -196,8 +201,9 @@ fn mutual_install_sleepy() {
#[test]
#[allow(deprecated)]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn check_thread_pool_new() {
- let pool = ThreadPool::new(Configuration::new().num_threads(22)).unwrap();
+ let pool = ThreadPool::new(crate::Configuration::new().num_threads(22)).unwrap();
assert_eq!(pool.current_num_threads(), 22);
}
@@ -221,6 +227,7 @@ macro_rules! test_scope_order {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn scope_lifo_order() {
let vec = test_scope_order!(scope => spawn);
let expected: Vec<i32> = (0..10).rev().collect(); // LIFO -> reversed
@@ -228,6 +235,7 @@ fn scope_lifo_order() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn scope_fifo_order() {
let vec = test_scope_order!(scope_fifo => spawn_fifo);
let expected: Vec<i32> = (0..10).collect(); // FIFO -> natural order
@@ -252,6 +260,7 @@ macro_rules! test_spawn_order {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_lifo_order() {
let vec = test_spawn_order!(spawn);
let expected: Vec<i32> = (0..10).rev().collect(); // LIFO -> reversed
@@ -259,6 +268,7 @@ fn spawn_lifo_order() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_fifo_order() {
let vec = test_spawn_order!(spawn_fifo);
let expected: Vec<i32> = (0..10).collect(); // FIFO -> natural order
@@ -266,6 +276,7 @@ fn spawn_fifo_order() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn nested_scopes() {
// Create matching scopes for every thread pool.
fn nest<'scope, OP>(pools: &[ThreadPool], scopes: Vec<&Scope<'scope>>, op: OP)
@@ -302,6 +313,7 @@ fn nested_scopes() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn nested_fifo_scopes() {
// Create matching fifo scopes for every thread pool.
fn nest<'scope, OP>(pools: &[ThreadPool], scopes: Vec<&ScopeFifo<'scope>>, op: OP)
@@ -338,6 +350,7 @@ fn nested_fifo_scopes() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn in_place_scope_no_deadlock() {
let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
let (tx, rx) = channel();
@@ -353,6 +366,7 @@ fn in_place_scope_no_deadlock() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn in_place_scope_fifo_no_deadlock() {
let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
let (tx, rx) = channel();
@@ -366,3 +380,39 @@ fn in_place_scope_fifo_no_deadlock() {
rx_ref.recv().unwrap();
});
}
+
+#[test]
+fn yield_now_to_spawn() {
+ let (tx, rx) = crossbeam_channel::bounded(1);
+
+ // Queue a regular spawn.
+ crate::spawn(move || tx.send(22).unwrap());
+
+ // The single-threaded fallback mode (for wasm etc.) won't
+ // get a chance to run the spawn if we never yield to it.
+ crate::registry::in_worker(move |_, _| {
+ crate::yield_now();
+ });
+
+ // The spawn **must** have started by now, but we still might have to wait
+ // for it to finish if a different thread stole it first.
+ assert_eq!(22, rx.recv().unwrap());
+}
+
+#[test]
+fn yield_local_to_spawn() {
+ let (tx, rx) = crossbeam_channel::bounded(1);
+
+ // Queue a regular spawn.
+ crate::spawn(move || tx.send(22).unwrap());
+
+ // The single-threaded fallback mode (for wasm etc.) won't
+ // get a chance to run the spawn if we never yield to it.
+ crate::registry::in_worker(move |_, _| {
+ crate::yield_local();
+ });
+
+ // The spawn **must** have started by now, but we still might have to wait
+ // for it to finish if a different thread stole it first.
+ assert_eq!(22, rx.recv().unwrap());
+}
diff --git a/tests/double_init_fail.rs b/tests/double_init_fail.rs
index ea06bf0..1591530 100644
--- a/tests/double_init_fail.rs
+++ b/tests/double_init_fail.rs
@@ -2,9 +2,10 @@ use rayon_core::ThreadPoolBuilder;
use std::error::Error;
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn double_init_fail() {
let result1 = ThreadPoolBuilder::new().build_global();
- assert_eq!(result1.unwrap(), ());
+ assert!(result1.is_ok());
let err = ThreadPoolBuilder::new().build_global().unwrap_err();
assert!(err.source().is_none());
assert_eq!(
diff --git a/tests/init_zero_threads.rs b/tests/init_zero_threads.rs
index ebd73c5..3c1ad25 100644
--- a/tests/init_zero_threads.rs
+++ b/tests/init_zero_threads.rs
@@ -1,6 +1,7 @@
use rayon_core::ThreadPoolBuilder;
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn init_zero_threads() {
ThreadPoolBuilder::new()
.num_threads(0)
diff --git a/tests/scoped_threadpool.rs b/tests/scoped_threadpool.rs
index db3d0b8..534e8bb 100644
--- a/tests/scoped_threadpool.rs
+++ b/tests/scoped_threadpool.rs
@@ -7,6 +7,7 @@ struct Local(i32);
scoped_tls::scoped_thread_local!(static LOCAL: Local);
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn missing_scoped_tls() {
LOCAL.set(&Local(42), || {
let pool = ThreadPoolBuilder::new()
@@ -21,6 +22,7 @@ fn missing_scoped_tls() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_scoped_tls_threadpool() {
LOCAL.set(&Local(42), || {
LOCAL.with(|x| {
@@ -63,6 +65,7 @@ fn spawn_scoped_tls_threadpool() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn build_scoped_tls_threadpool() {
LOCAL.set(&Local(42), || {
LOCAL.with(|x| {
diff --git a/tests/stack_overflow_crash.rs b/tests/stack_overflow_crash.rs
index 6128898..7dcde43 100644
--- a/tests/stack_overflow_crash.rs
+++ b/tests/stack_overflow_crash.rs
@@ -1,13 +1,14 @@
use rayon_core::ThreadPoolBuilder;
use std::env;
-use std::process::Command;
+use std::process::{Command, ExitStatus, Stdio};
#[cfg(target_os = "linux")]
use std::os::unix::process::ExitStatusExt;
fn force_stack_overflow(depth: u32) {
- let _buffer = [0u8; 1024 * 1024];
+ let mut buffer = [0u8; 1024 * 1024];
+ std::hint::black_box(&mut buffer);
if depth > 0 {
force_stack_overflow(depth - 1);
}
@@ -34,49 +35,63 @@ fn overflow_code() -> Option<i32> {
#[cfg(windows)]
fn overflow_code() -> Option<i32> {
use std::os::windows::process::ExitStatusExt;
- use std::process::ExitStatus;
ExitStatus::from_raw(0xc00000fd /*STATUS_STACK_OVERFLOW*/).code()
}
-fn main() {
- if env::args().len() == 1 {
- // first check that the recursivecall actually causes a stack overflow, and does not get optimized away
- {
- let status = Command::new(env::current_exe().unwrap())
- .arg("8")
- .status()
- .unwrap();
+#[test]
+#[cfg_attr(not(any(unix, windows)), ignore)]
+fn stack_overflow_crash() {
+ // First check that the recursive call actually causes a stack overflow,
+ // and does not get optimized away.
+ let status = run_ignored("run_with_small_stack");
+ assert!(!status.success());
+ #[cfg(any(unix, windows))]
+ assert_eq!(status.code(), overflow_code());
+ #[cfg(target_os = "linux")]
+ assert!(matches!(
+ status.signal(),
+ Some(libc::SIGABRT | libc::SIGSEGV)
+ ));
- #[cfg(any(unix, windows))]
- assert_eq!(status.code(), overflow_code());
+ // Now run with a larger stack and verify correct operation.
+ let status = run_ignored("run_with_large_stack");
+ assert_eq!(status.code(), Some(0));
+ #[cfg(target_os = "linux")]
+ assert_eq!(status.signal(), None);
+}
- #[cfg(target_os = "linux")]
- assert!(
- status.signal() == Some(11 /*SIGABRT*/) || status.signal() == Some(6 /*SIGSEGV*/)
- );
- }
+fn run_ignored(test: &str) -> ExitStatus {
+ Command::new(env::current_exe().unwrap())
+ .arg("--ignored")
+ .arg("--exact")
+ .arg(test)
+ .stdout(Stdio::null())
+ .stderr(Stdio::null())
+ .status()
+ .unwrap()
+}
- // now run with a larger stack and verify correct operation
- {
- let status = Command::new(env::current_exe().unwrap())
- .arg("48")
- .status()
- .unwrap();
- assert_eq!(status.code(), Some(0));
- #[cfg(target_os = "linux")]
- assert_eq!(status.signal(), None);
- }
- } else {
- let stack_size_in_mb: usize = env::args().nth(1).unwrap().parse().unwrap();
- let pool = ThreadPoolBuilder::new()
- .stack_size(stack_size_in_mb * 1024 * 1024)
- .build()
- .unwrap();
- pool.install(|| {
- #[cfg(unix)]
- disable_core();
- force_stack_overflow(32);
- });
- }
+#[test]
+#[ignore]
+fn run_with_small_stack() {
+ run_with_stack(8);
+}
+
+#[test]
+#[ignore]
+fn run_with_large_stack() {
+ run_with_stack(48);
+}
+
+fn run_with_stack(stack_size_in_mb: usize) {
+ let pool = ThreadPoolBuilder::new()
+ .stack_size(stack_size_in_mb * 1024 * 1024)
+ .build()
+ .unwrap();
+ pool.install(|| {
+ #[cfg(unix)]
+ disable_core();
+ force_stack_overflow(32);
+ });
}