aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJeff Vander Stoep <jeffv@google.com>2024-02-06 13:01:42 +0100
committerJeff Vander Stoep <jeffv@google.com>2024-02-06 13:01:43 +0100
commit96905156f39d29938fabf7cb278c4f8c52955af5 (patch)
treec23f768a86f6dec3105f8f8420f4b742e9cf3044
parent76cd97482d403c63d42bbd4e31bfba39442db1d1 (diff)
downloadrayon-core-96905156f39d29938fabf7cb278c4f8c52955af5.tar.gz
Upgrade rayon-core to 1.12.1emu-34-3-release
This project was upgraded with external_updater. Usage: tools/external_updater/updater.sh update external/rust/crates/rayon-core For more info, check https://cs.android.com/android/platform/superproject/+/main:tools/external_updater/README.md Test: TreeHugger Change-Id: I4844dc36a655701157d377fbbbe8619abcce504c
-rw-r--r--.cargo_vcs_info.json2
-rw-r--r--Android.bp4
-rw-r--r--Cargo.toml17
-rw-r--r--Cargo.toml.orig19
-rw-r--r--METADATA25
-rw-r--r--README.md2
-rw-r--r--src/broadcast/mod.rs5
-rw-r--r--src/broadcast/test.rs23
-rw-r--r--src/latch.rs171
-rw-r--r--src/lib.rs84
-rw-r--r--src/log.rs413
-rw-r--r--src/registry.rs177
-rw-r--r--src/scope/mod.rs116
-rw-r--r--src/sleep/counters.rs4
-rw-r--r--src/sleep/mod.rs97
-rw-r--r--src/spawn/mod.rs4
-rw-r--r--src/thread_pool/mod.rs37
-rw-r--r--src/thread_pool/test.rs4
-rw-r--r--tests/scoped_threadpool.rs2
-rw-r--r--tests/use_current_thread.rs57
20 files changed, 419 insertions, 844 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json
index 2a30505..f0dc383 100644
--- a/.cargo_vcs_info.json
+++ b/.cargo_vcs_info.json
@@ -1,6 +1,6 @@
{
"git": {
- "sha1": "6236214d717694917e77aa1c16d91176b9bc2fff"
+ "sha1": "7df6d5b8493a5fdb257565cab26dacdfe08ec8aa"
},
"path_in_vcs": "rayon-core"
} \ No newline at end of file
diff --git a/Android.bp b/Android.bp
index 64d1452..9b8ff99 100644
--- a/Android.bp
+++ b/Android.bp
@@ -42,14 +42,12 @@ rust_library {
host_supported: true,
crate_name: "rayon_core",
cargo_env_compat: true,
- cargo_pkg_version: "1.11.0",
+ cargo_pkg_version: "1.12.1",
srcs: ["src/lib.rs"],
edition: "2021",
rustlibs: [
- "libcrossbeam_channel",
"libcrossbeam_deque",
"libcrossbeam_utils",
- "libnum_cpus",
],
apex_available: [
"//apex_available:platform",
diff --git a/Cargo.toml b/Cargo.toml
index d41715e..a3a11c2 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -11,9 +11,9 @@
[package]
edition = "2021"
-rust-version = "1.59"
+rust-version = "1.63"
name = "rayon-core"
-version = "1.11.0"
+version = "1.12.1"
authors = [
"Niko Matsakis <niko@alum.mit.edu>",
"Josh Stone <cuviper@gmail.com>",
@@ -58,8 +58,9 @@ path = "tests/simple_panic.rs"
name = "scoped_threadpool"
path = "tests/scoped_threadpool.rs"
-[dependencies.crossbeam-channel]
-version = "0.5.0"
+[[test]]
+name = "use_current_thread"
+path = "tests/use_current_thread.rs"
[dependencies.crossbeam-deque]
version = "0.8.1"
@@ -67,8 +68,9 @@ version = "0.8.1"
[dependencies.crossbeam-utils]
version = "0.8.0"
-[dependencies.num_cpus]
-version = "1.2"
+[dependencies.wasm_sync]
+version = "0.1.0"
+optional = true
[dev-dependencies.rand]
version = "0.8"
@@ -79,5 +81,8 @@ version = "0.3"
[dev-dependencies.scoped-tls]
version = "1.0"
+[features]
+web_spin_lock = ["dep:wasm_sync"]
+
[target."cfg(unix)".dev-dependencies.libc]
version = "0.2"
diff --git a/Cargo.toml.orig b/Cargo.toml.orig
index 920ffe5..09f8bf6 100644
--- a/Cargo.toml.orig
+++ b/Cargo.toml.orig
@@ -1,13 +1,13 @@
[package]
name = "rayon-core"
-version = "1.11.0"
+version = "1.12.1"
authors = ["Niko Matsakis <niko@alum.mit.edu>",
"Josh Stone <cuviper@gmail.com>"]
description = "Core APIs for Rayon"
license = "MIT OR Apache-2.0"
repository = "https://github.com/rayon-rs/rayon"
documentation = "https://docs.rs/rayon/"
-rust-version = "1.59"
+rust-version = "1.63"
edition = "2021"
links = "rayon-core"
build = "build.rs"
@@ -17,10 +17,17 @@ categories = ["concurrency"]
# Some dependencies may not be their latest version, in order to support older rustc.
[dependencies]
-num_cpus = "1.2"
-crossbeam-channel = "0.5.0"
crossbeam-deque = "0.8.1"
crossbeam-utils = "0.8.0"
+wasm_sync = { version = "0.1.0", optional = true }
+
+[features]
+
+# This feature switches to a spin-lock implementation on the browser's
+# main thread to avoid the forbidden `atomics.wait`.
+#
+# Only useful on the `wasm32-unknown-unknown` target.
+web_spin_lock = ["dep:wasm_sync"]
[dev-dependencies]
rand = "0.8"
@@ -55,3 +62,7 @@ path = "tests/simple_panic.rs"
[[test]]
name = "scoped_threadpool"
path = "tests/scoped_threadpool.rs"
+
+[[test]]
+name = "use_current_thread"
+path = "tests/use_current_thread.rs"
diff --git a/METADATA b/METADATA
index e9917f9..1eb7dcd 100644
--- a/METADATA
+++ b/METADATA
@@ -1,23 +1,20 @@
# This project was upgraded with external_updater.
-# Usage: tools/external_updater/updater.sh update rust/crates/rayon-core
-# For more info, check https://cs.android.com/android/platform/superproject/+/master:tools/external_updater/README.md
+# Usage: tools/external_updater/updater.sh update external/rust/crates/rayon-core
+# For more info, check https://cs.android.com/android/platform/superproject/+/main:tools/external_updater/README.md
name: "rayon-core"
description: "Core APIs for Rayon"
third_party {
- url {
- type: HOMEPAGE
- value: "https://crates.io/crates/rayon-core"
- }
- url {
- type: ARCHIVE
- value: "https://static.crates.io/crates/rayon-core/rayon-core-1.11.0.crate"
- }
- version: "1.11.0"
license_type: NOTICE
last_upgrade_date {
- year: 2023
- month: 4
- day: 3
+ year: 2024
+ month: 2
+ day: 6
+ }
+ homepage: "https://crates.io/crates/rayon-core"
+ identifier {
+ type: "Archive"
+ value: "https://static.crates.io/crates/rayon-core/rayon-core-1.12.1.crate"
+ version: "1.12.1"
}
}
diff --git a/README.md b/README.md
index 448901b..6e2ebe2 100644
--- a/README.md
+++ b/README.md
@@ -8,4 +8,4 @@ Please see [Rayon Docs] for details about using Rayon.
[Rayon Docs]: https://docs.rs/rayon/
-Rayon-core currently requires `rustc 1.59.0` or greater.
+Rayon-core currently requires `rustc 1.63.0` or greater.
diff --git a/src/broadcast/mod.rs b/src/broadcast/mod.rs
index d991c54..96611e4 100644
--- a/src/broadcast/mod.rs
+++ b/src/broadcast/mod.rs
@@ -1,7 +1,6 @@
use crate::job::{ArcJob, StackJob};
-use crate::latch::LatchRef;
+use crate::latch::{CountLatch, LatchRef};
use crate::registry::{Registry, WorkerThread};
-use crate::scope::ScopeLatch;
use std::fmt;
use std::marker::PhantomData;
use std::sync::Arc;
@@ -107,7 +106,7 @@ where
let n_threads = registry.num_threads();
let current_thread = WorkerThread::current().as_ref();
- let latch = ScopeLatch::with_count(n_threads, current_thread);
+ let latch = CountLatch::with_count(n_threads, current_thread);
let jobs: Vec<_> = (0..n_threads)
.map(|_| StackJob::new(&f, LatchRef::new(&latch)))
.collect();
diff --git a/src/broadcast/test.rs b/src/broadcast/test.rs
index 3ae11f7..00ab4ad 100644
--- a/src/broadcast/test.rs
+++ b/src/broadcast/test.rs
@@ -2,6 +2,7 @@
use crate::ThreadPoolBuilder;
use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::mpsc::channel;
use std::sync::Arc;
use std::{thread, time};
@@ -14,7 +15,7 @@ fn broadcast_global() {
#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_broadcast_global() {
- let (tx, rx) = crossbeam_channel::unbounded();
+ let (tx, rx) = channel();
crate::spawn_broadcast(move |ctx| tx.send(ctx.index()).unwrap());
let mut v: Vec<_> = rx.into_iter().collect();
@@ -33,7 +34,7 @@ fn broadcast_pool() {
#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_broadcast_pool() {
- let (tx, rx) = crossbeam_channel::unbounded();
+ let (tx, rx) = channel();
let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
pool.spawn_broadcast(move |ctx| tx.send(ctx.index()).unwrap());
@@ -53,7 +54,7 @@ fn broadcast_self() {
#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_broadcast_self() {
- let (tx, rx) = crossbeam_channel::unbounded();
+ let (tx, rx) = channel();
let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
pool.spawn(|| crate::spawn_broadcast(move |ctx| tx.send(ctx.index()).unwrap()));
@@ -81,7 +82,7 @@ fn broadcast_mutual() {
#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_broadcast_mutual() {
- let (tx, rx) = crossbeam_channel::unbounded();
+ let (tx, rx) = channel();
let pool1 = Arc::new(ThreadPoolBuilder::new().num_threads(3).build().unwrap());
let pool2 = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
pool1.spawn({
@@ -118,7 +119,7 @@ fn broadcast_mutual_sleepy() {
#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_broadcast_mutual_sleepy() {
- let (tx, rx) = crossbeam_channel::unbounded();
+ let (tx, rx) = channel();
let pool1 = Arc::new(ThreadPoolBuilder::new().num_threads(3).build().unwrap());
let pool2 = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
pool1.spawn({
@@ -158,8 +159,8 @@ fn broadcast_panic_one() {
#[test]
#[cfg_attr(not(panic = "unwind"), ignore)]
fn spawn_broadcast_panic_one() {
- let (tx, rx) = crossbeam_channel::unbounded();
- let (panic_tx, panic_rx) = crossbeam_channel::unbounded();
+ let (tx, rx) = channel();
+ let (panic_tx, panic_rx) = channel();
let pool = ThreadPoolBuilder::new()
.num_threads(7)
.panic_handler(move |e| panic_tx.send(e).unwrap())
@@ -196,8 +197,8 @@ fn broadcast_panic_many() {
#[test]
#[cfg_attr(not(panic = "unwind"), ignore)]
fn spawn_broadcast_panic_many() {
- let (tx, rx) = crossbeam_channel::unbounded();
- let (panic_tx, panic_rx) = crossbeam_channel::unbounded();
+ let (tx, rx) = channel();
+ let (panic_tx, panic_rx) = channel();
let pool = ThreadPoolBuilder::new()
.num_threads(7)
.panic_handler(move |e| panic_tx.send(e).unwrap())
@@ -231,7 +232,7 @@ fn broadcast_sleep_race() {
#[test]
fn broadcast_after_spawn_broadcast() {
- let (tx, rx) = crossbeam_channel::unbounded();
+ let (tx, rx) = channel();
// Queue a non-blocking spawn_broadcast.
crate::spawn_broadcast(move |ctx| tx.send(ctx.index()).unwrap());
@@ -247,7 +248,7 @@ fn broadcast_after_spawn_broadcast() {
#[test]
fn broadcast_after_spawn() {
- let (tx, rx) = crossbeam_channel::bounded(1);
+ let (tx, rx) = channel();
// Queue a regular spawn on a thread-local deque.
crate::registry::in_worker(move |_, _| {
diff --git a/src/latch.rs b/src/latch.rs
index de43272..6c2e4fe 100644
--- a/src/latch.rs
+++ b/src/latch.rs
@@ -1,10 +1,11 @@
use std::marker::PhantomData;
use std::ops::Deref;
use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::{Arc, Condvar, Mutex};
+use std::sync::Arc;
use std::usize;
use crate::registry::{Registry, WorkerThread};
+use crate::sync::{Condvar, Mutex};
/// We define various kinds of latches, which are all a primitive signaling
/// mechanism. A latch starts as false. Eventually someone calls `set()` and
@@ -84,13 +85,6 @@ impl CoreLatch {
}
}
- /// Returns the address of this core latch as an integer. Used
- /// for logging.
- #[inline]
- pub(super) fn addr(&self) -> usize {
- self as *const CoreLatch as usize
- }
-
/// Invoked by owning thread as it prepares to sleep. Returns true
/// if the owning thread may proceed to fall asleep, false if the
/// latch was set in the meantime.
@@ -142,6 +136,13 @@ impl CoreLatch {
}
}
+impl AsCoreLatch for CoreLatch {
+ #[inline]
+ fn as_core_latch(&self) -> &CoreLatch {
+ self
+ }
+}
+
/// Spin latches are the simplest, most efficient kind, but they do
/// not support a `wait()` operation. They just have a boolean flag
/// that becomes true when `set()` is called.
@@ -269,62 +270,32 @@ impl Latch for LockLatch {
}
}
-/// Counting latches are used to implement scopes. They track a
-/// counter. Unlike other latches, calling `set()` does not
-/// necessarily make the latch be considered `set()`; instead, it just
-/// decrements the counter. The latch is only "set" (in the sense that
-/// `probe()` returns true) once the counter reaches zero.
+/// Once latches are used to implement one-time blocking, primarily
+/// for the termination flag of the threads in the pool.
///
-/// Note: like a `SpinLatch`, count laches are always associated with
+/// Note: like a `SpinLatch`, once-latches are always associated with
/// some registry that is probing them, which must be tickled when
/// they are set. *Unlike* a `SpinLatch`, they don't themselves hold a
/// reference to that registry. This is because in some cases the
-/// registry owns the count-latch, and that would create a cycle. So a
-/// `CountLatch` must be given a reference to its owning registry when
+/// registry owns the once-latch, and that would create a cycle. So a
+/// `OnceLatch` must be given a reference to its owning registry when
/// it is set. For this reason, it does not implement the `Latch`
/// trait (but it doesn't have to, as it is not used in those generic
/// contexts).
#[derive(Debug)]
-pub(super) struct CountLatch {
+pub(super) struct OnceLatch {
core_latch: CoreLatch,
- counter: AtomicUsize,
}
-impl CountLatch {
- #[inline]
- pub(super) fn new() -> CountLatch {
- Self::with_count(1)
- }
-
+impl OnceLatch {
#[inline]
- pub(super) fn with_count(n: usize) -> CountLatch {
- CountLatch {
+ pub(super) fn new() -> OnceLatch {
+ Self {
core_latch: CoreLatch::new(),
- counter: AtomicUsize::new(n),
- }
- }
-
- #[inline]
- pub(super) fn increment(&self) {
- debug_assert!(!self.core_latch.probe());
- self.counter.fetch_add(1, Ordering::Relaxed);
- }
-
- /// Decrements the latch counter by one. If this is the final
- /// count, then the latch is **set**, and calls to `probe()` will
- /// return true. Returns whether the latch was set.
- #[inline]
- pub(super) unsafe fn set(this: *const Self) -> bool {
- if (*this).counter.fetch_sub(1, Ordering::SeqCst) == 1 {
- CoreLatch::set(&(*this).core_latch);
- true
- } else {
- false
}
}
- /// Decrements the latch counter by one and possibly set it. If
- /// the latch is set, then the specific worker thread is tickled,
+ /// Set the latch, then tickle the specific worker thread,
/// which should be the one that owns this latch.
#[inline]
pub(super) unsafe fn set_and_tickle_one(
@@ -332,31 +303,81 @@ impl CountLatch {
registry: &Registry,
target_worker_index: usize,
) {
- if Self::set(this) {
+ if CoreLatch::set(&(*this).core_latch) {
registry.notify_worker_latch_is_set(target_worker_index);
}
}
}
-impl AsCoreLatch for CountLatch {
+impl AsCoreLatch for OnceLatch {
#[inline]
fn as_core_latch(&self) -> &CoreLatch {
&self.core_latch
}
}
+/// Counting latches are used to implement scopes. They track a
+/// counter. Unlike other latches, calling `set()` does not
+/// necessarily make the latch be considered `set()`; instead, it just
+/// decrements the counter. The latch is only "set" (in the sense that
+/// `probe()` returns true) once the counter reaches zero.
#[derive(Debug)]
-pub(super) struct CountLockLatch {
- lock_latch: LockLatch,
+pub(super) struct CountLatch {
counter: AtomicUsize,
+ kind: CountLatchKind,
}
-impl CountLockLatch {
- #[inline]
- pub(super) fn with_count(n: usize) -> CountLockLatch {
- CountLockLatch {
- lock_latch: LockLatch::new(),
- counter: AtomicUsize::new(n),
+enum CountLatchKind {
+ /// A latch for scopes created on a rayon thread which will participate in work-
+ /// stealing while it waits for completion. This thread is not necessarily part
+ /// of the same registry as the scope itself!
+ Stealing {
+ latch: CoreLatch,
+ /// If a worker thread in registry A calls `in_place_scope` on a ThreadPool
+ /// with registry B, when a job completes in a thread of registry B, we may
+ /// need to call `notify_worker_latch_is_set()` to wake the thread in registry A.
+ /// That means we need a reference to registry A (since at that point we will
+ /// only have a reference to registry B), so we stash it here.
+ registry: Arc<Registry>,
+ /// The index of the worker to wake in `registry`
+ worker_index: usize,
+ },
+
+ /// A latch for scopes created on a non-rayon thread which will block to wait.
+ Blocking { latch: LockLatch },
+}
+
+impl std::fmt::Debug for CountLatchKind {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ match self {
+ CountLatchKind::Stealing { latch, .. } => {
+ f.debug_tuple("Stealing").field(latch).finish()
+ }
+ CountLatchKind::Blocking { latch, .. } => {
+ f.debug_tuple("Blocking").field(latch).finish()
+ }
+ }
+ }
+}
+
+impl CountLatch {
+ pub(super) fn new(owner: Option<&WorkerThread>) -> Self {
+ Self::with_count(1, owner)
+ }
+
+ pub(super) fn with_count(count: usize, owner: Option<&WorkerThread>) -> Self {
+ Self {
+ counter: AtomicUsize::new(count),
+ kind: match owner {
+ Some(owner) => CountLatchKind::Stealing {
+ latch: CoreLatch::new(),
+ registry: Arc::clone(owner.registry()),
+ worker_index: owner.index(),
+ },
+ None => CountLatchKind::Blocking {
+ latch: LockLatch::new(),
+ },
+ },
}
}
@@ -366,16 +387,42 @@ impl CountLockLatch {
debug_assert!(old_counter != 0);
}
- pub(super) fn wait(&self) {
- self.lock_latch.wait();
+ pub(super) fn wait(&self, owner: Option<&WorkerThread>) {
+ match &self.kind {
+ CountLatchKind::Stealing {
+ latch,
+ registry,
+ worker_index,
+ } => unsafe {
+ let owner = owner.expect("owner thread");
+ debug_assert_eq!(registry.id(), owner.registry().id());
+ debug_assert_eq!(*worker_index, owner.index());
+ owner.wait_until(latch);
+ },
+ CountLatchKind::Blocking { latch } => latch.wait(),
+ }
}
}
-impl Latch for CountLockLatch {
+impl Latch for CountLatch {
#[inline]
unsafe fn set(this: *const Self) {
if (*this).counter.fetch_sub(1, Ordering::SeqCst) == 1 {
- LockLatch::set(&(*this).lock_latch);
+ // NOTE: Once we call `set` on the internal `latch`,
+ // the target may proceed and invalidate `this`!
+ match (*this).kind {
+ CountLatchKind::Stealing {
+ ref latch,
+ ref registry,
+ worker_index,
+ } => {
+ let registry = Arc::clone(registry);
+ if CoreLatch::set(latch) {
+ registry.notify_worker_latch_is_set(worker_index);
+ }
+ }
+ CountLatchKind::Blocking { ref latch } => LockLatch::set(latch),
+ }
}
}
}
diff --git a/src/lib.rs b/src/lib.rs
index c9694ee..39df8a2 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -73,10 +73,9 @@ use std::fmt;
use std::io;
use std::marker::PhantomData;
use std::str::FromStr;
+use std::thread;
#[macro_use]
-mod log;
-#[macro_use]
mod private;
mod broadcast;
@@ -104,6 +103,12 @@ pub use self::thread_pool::current_thread_index;
pub use self::thread_pool::ThreadPool;
pub use self::thread_pool::{yield_local, yield_now, Yield};
+#[cfg(not(feature = "web_spin_lock"))]
+use std::sync;
+
+#[cfg(feature = "web_spin_lock")]
+use wasm_sync as sync;
+
use self::registry::{CustomSpawn, DefaultSpawn, ThreadSpawn};
/// Returns the maximum number of threads that Rayon supports in a single thread-pool.
@@ -148,6 +153,7 @@ pub struct ThreadPoolBuildError {
#[derive(Debug)]
enum ErrorKind {
GlobalPoolAlreadyInitialized,
+ CurrentThreadAlreadyInPool,
IOError(io::Error),
}
@@ -175,6 +181,9 @@ pub struct ThreadPoolBuilder<S = DefaultSpawn> {
/// If RAYON_NUM_THREADS is invalid or zero will use the default.
num_threads: usize,
+ /// The thread we're building *from* will also be part of the pool.
+ use_current_thread: bool,
+
/// Custom closure, if any, to handle a panic that we cannot propagate
/// anywhere else.
panic_handler: Option<Box<PanicHandler>>,
@@ -228,6 +237,7 @@ impl Default for ThreadPoolBuilder {
fn default() -> Self {
ThreadPoolBuilder {
num_threads: 0,
+ use_current_thread: false,
panic_handler: None,
get_thread_name: None,
stack_size: None,
@@ -284,12 +294,12 @@ where
impl ThreadPoolBuilder {
/// Creates a scoped `ThreadPool` initialized using this configuration.
///
- /// This is a convenience function for building a pool using [`crossbeam::scope`]
+ /// This is a convenience function for building a pool using [`std::thread::scope`]
/// to spawn threads in a [`spawn_handler`](#method.spawn_handler).
/// The threads in this pool will start by calling `wrapper`, which should
/// do initialization and continue by calling `ThreadBuilder::run()`.
///
- /// [`crossbeam::scope`]: https://docs.rs/crossbeam/0.8/crossbeam/fn.scope.html
+ /// [`std::thread::scope`]: https://doc.rust-lang.org/std/thread/fn.scope.html
///
/// # Examples
///
@@ -324,28 +334,22 @@ impl ThreadPoolBuilder {
W: Fn(ThreadBuilder) + Sync, // expected to call `run()`
F: FnOnce(&ThreadPool) -> R,
{
- let result = crossbeam_utils::thread::scope(|scope| {
- let wrapper = &wrapper;
+ std::thread::scope(|scope| {
let pool = self
.spawn_handler(|thread| {
- let mut builder = scope.builder();
+ let mut builder = std::thread::Builder::new();
if let Some(name) = thread.name() {
builder = builder.name(name.to_string());
}
if let Some(size) = thread.stack_size() {
builder = builder.stack_size(size);
}
- builder.spawn(move |_| wrapper(thread))?;
+ builder.spawn_scoped(scope, || wrapper(thread))?;
Ok(())
})
.build()?;
Ok(with_pool(&pool))
- });
-
- match result {
- Ok(result) => result,
- Err(err) => unwind::resume_unwinding(err),
- }
+ })
}
}
@@ -354,13 +358,11 @@ impl<S> ThreadPoolBuilder<S> {
///
/// Note that the threads will not exit until after the pool is dropped. It
/// is up to the caller to wait for thread termination if that is important
- /// for any invariants. For instance, threads created in [`crossbeam::scope`]
+ /// for any invariants. For instance, threads created in [`std::thread::scope`]
/// will be joined before that scope returns, and this will block indefinitely
/// if the pool is leaked. Furthermore, the global thread pool doesn't terminate
/// until the entire process exits!
///
- /// [`crossbeam::scope`]: https://docs.rs/crossbeam/0.8/crossbeam/fn.scope.html
- ///
/// # Examples
///
/// A minimal spawn handler just needs to call `run()` from an independent thread.
@@ -409,6 +411,7 @@ impl<S> ThreadPoolBuilder<S> {
/// or [`std::thread::scope`] introduced in Rust 1.63, which is encapsulated in
/// [`build_scoped`](#method.build_scoped).
///
+ /// [`crossbeam::scope`]: https://docs.rs/crossbeam/0.8/crossbeam/fn.scope.html
/// [`std::thread::scope`]: https://doc.rust-lang.org/std/thread/fn.scope.html
///
/// ```
@@ -445,6 +448,7 @@ impl<S> ThreadPoolBuilder<S> {
spawn_handler: CustomSpawn::new(spawn),
// ..self
num_threads: self.num_threads,
+ use_current_thread: self.use_current_thread,
panic_handler: self.panic_handler,
get_thread_name: self.get_thread_name,
stack_size: self.stack_size,
@@ -465,12 +469,18 @@ impl<S> ThreadPoolBuilder<S> {
if self.num_threads > 0 {
self.num_threads
} else {
+ let default = || {
+ thread::available_parallelism()
+ .map(|n| n.get())
+ .unwrap_or(1)
+ };
+
match env::var("RAYON_NUM_THREADS")
.ok()
.and_then(|s| usize::from_str(&s).ok())
{
- Some(x) if x > 0 => return x,
- Some(x) if x == 0 => return num_cpus::get(),
+ Some(x @ 1..) => return x,
+ Some(0) => return default(),
_ => {}
}
@@ -479,8 +489,8 @@ impl<S> ThreadPoolBuilder<S> {
.ok()
.and_then(|s| usize::from_str(&s).ok())
{
- Some(x) if x > 0 => x,
- _ => num_cpus::get(),
+ Some(x @ 1..) => x,
+ _ => default(),
}
}
}
@@ -519,9 +529,8 @@ impl<S> ThreadPoolBuilder<S> {
/// may change in the future, if you wish to rely on a fixed
/// number of threads, you should use this function to specify
/// that number. To reproduce the current default behavior, you
- /// may wish to use the [`num_cpus`
- /// crate](https://crates.io/crates/num_cpus) to query the number
- /// of CPUs dynamically.
+ /// may wish to use [`std::thread::available_parallelism`]
+ /// to query the number of CPUs dynamically.
///
/// **Old environment variable:** `RAYON_NUM_THREADS` is a one-to-one
/// replacement of the now deprecated `RAYON_RS_NUM_CPUS` environment
@@ -532,6 +541,24 @@ impl<S> ThreadPoolBuilder<S> {
self
}
+ /// Use the current thread as one of the threads in the pool.
+ ///
+ /// The current thread is guaranteed to be at index 0, and since the thread is not managed by
+ /// rayon, the spawn and exit handlers do not run for that thread.
+ ///
+ /// Note that the current thread won't run the main work-stealing loop, so jobs spawned into
+ /// the thread-pool will generally not be picked up automatically by this thread unless you
+ /// yield to rayon in some way, like via [`yield_now()`], [`yield_local()`], or [`scope()`].
+ ///
+ /// # Local thread-pools
+ ///
+ /// Using this in a local thread-pool means the registry will be leaked. In future versions
+ /// there might be a way of cleaning up the current-thread state.
+ pub fn use_current_thread(mut self) -> Self {
+ self.use_current_thread = true;
+ self
+ }
+
/// Returns a copy of the current panic handler.
fn take_panic_handler(&mut self) -> Option<Box<PanicHandler>> {
self.panic_handler.take()
@@ -734,18 +761,22 @@ impl ThreadPoolBuildError {
const GLOBAL_POOL_ALREADY_INITIALIZED: &str =
"The global thread pool has already been initialized.";
+const CURRENT_THREAD_ALREADY_IN_POOL: &str =
+ "The current thread is already part of another thread pool.";
+
impl Error for ThreadPoolBuildError {
#[allow(deprecated)]
fn description(&self) -> &str {
match self.kind {
ErrorKind::GlobalPoolAlreadyInitialized => GLOBAL_POOL_ALREADY_INITIALIZED,
+ ErrorKind::CurrentThreadAlreadyInPool => CURRENT_THREAD_ALREADY_IN_POOL,
ErrorKind::IOError(ref e) => e.description(),
}
}
fn source(&self) -> Option<&(dyn Error + 'static)> {
match &self.kind {
- ErrorKind::GlobalPoolAlreadyInitialized => None,
+ ErrorKind::GlobalPoolAlreadyInitialized | ErrorKind::CurrentThreadAlreadyInPool => None,
ErrorKind::IOError(e) => Some(e),
}
}
@@ -754,6 +785,7 @@ impl Error for ThreadPoolBuildError {
impl fmt::Display for ThreadPoolBuildError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match &self.kind {
+ ErrorKind::CurrentThreadAlreadyInPool => CURRENT_THREAD_ALREADY_IN_POOL.fmt(f),
ErrorKind::GlobalPoolAlreadyInitialized => GLOBAL_POOL_ALREADY_INITIALIZED.fmt(f),
ErrorKind::IOError(e) => e.fmt(f),
}
@@ -771,6 +803,7 @@ impl<S> fmt::Debug for ThreadPoolBuilder<S> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let ThreadPoolBuilder {
ref num_threads,
+ ref use_current_thread,
ref get_thread_name,
ref panic_handler,
ref stack_size,
@@ -795,6 +828,7 @@ impl<S> fmt::Debug for ThreadPoolBuilder<S> {
f.debug_struct("ThreadPoolBuilder")
.field("num_threads", num_threads)
+ .field("use_current_thread", use_current_thread)
.field("get_thread_name", &get_thread_name)
.field("panic_handler", &panic_handler)
.field("stack_size", &stack_size)
diff --git a/src/log.rs b/src/log.rs
deleted file mode 100644
index 7b6daf0..0000000
--- a/src/log.rs
+++ /dev/null
@@ -1,413 +0,0 @@
-//! Debug Logging
-//!
-//! To use in a debug build, set the env var `RAYON_LOG` as
-//! described below. In a release build, logs are compiled out by
-//! default unless Rayon is built with `--cfg rayon_rs_log` (try
-//! `RUSTFLAGS="--cfg rayon_rs_log"`).
-//!
-//! Note that logs are an internally debugging tool and their format
-//! is considered unstable, as are the details of how to enable them.
-//!
-//! # Valid values for RAYON_LOG
-//!
-//! The `RAYON_LOG` variable can take on the following values:
-//!
-//! * `tail:<file>` -- dumps the last 10,000 events into the given file;
-//! useful for tracking down deadlocks
-//! * `profile:<file>` -- dumps only those events needed to reconstruct how
-//! many workers are active at a given time
-//! * `all:<file>` -- dumps every event to the file; useful for debugging
-
-use crossbeam_channel::{self, Receiver, Sender};
-use std::collections::VecDeque;
-use std::env;
-use std::fs::File;
-use std::io::{self, BufWriter, Write};
-
-/// True if logs are compiled in.
-pub(super) const LOG_ENABLED: bool = cfg!(any(rayon_rs_log, debug_assertions));
-
-#[derive(Copy, Clone, PartialOrd, Ord, PartialEq, Eq, Debug)]
-pub(super) enum Event {
- /// Flushes events to disk, used to terminate benchmarking.
- Flush,
-
- /// Indicates that a worker thread started execution.
- ThreadStart {
- worker: usize,
- terminate_addr: usize,
- },
-
- /// Indicates that a worker thread started execution.
- ThreadTerminate { worker: usize },
-
- /// Indicates that a worker thread became idle, blocked on `latch_addr`.
- ThreadIdle { worker: usize, latch_addr: usize },
-
- /// Indicates that an idle worker thread found work to do, after
- /// yield rounds. It should no longer be considered idle.
- ThreadFoundWork { worker: usize, yields: u32 },
-
- /// Indicates that a worker blocked on a latch observed that it was set.
- ///
- /// Internal debugging event that does not affect the state
- /// machine.
- ThreadSawLatchSet { worker: usize, latch_addr: usize },
-
- /// Indicates that an idle worker is getting sleepy. `sleepy_counter` is the internal
- /// sleep state that we saw at the time.
- ThreadSleepy { worker: usize, jobs_counter: usize },
-
- /// Indicates that the thread's attempt to fall asleep was
- /// interrupted because the latch was set. (This is not, in and of
- /// itself, a change to the thread state.)
- ThreadSleepInterruptedByLatch { worker: usize, latch_addr: usize },
-
- /// Indicates that the thread's attempt to fall asleep was
- /// interrupted because a job was posted. (This is not, in and of
- /// itself, a change to the thread state.)
- ThreadSleepInterruptedByJob { worker: usize },
-
- /// Indicates that an idle worker has gone to sleep.
- ThreadSleeping { worker: usize, latch_addr: usize },
-
- /// Indicates that a sleeping worker has awoken.
- ThreadAwoken { worker: usize, latch_addr: usize },
-
- /// Indicates that the given worker thread was notified it should
- /// awaken.
- ThreadNotify { worker: usize },
-
- /// The given worker has pushed a job to its local deque.
- JobPushed { worker: usize },
-
- /// The given worker has popped a job from its local deque.
- JobPopped { worker: usize },
-
- /// The given worker has stolen a job from the deque of another.
- JobStolen { worker: usize, victim: usize },
-
- /// N jobs were injected into the global queue.
- JobsInjected { count: usize },
-
- /// A job was removed from the global queue.
- JobUninjected { worker: usize },
-
- /// A job was broadcasted to N threads.
- JobBroadcast { count: usize },
-
- /// When announcing a job, this was the value of the counters we observed.
- ///
- /// No effect on thread state, just a debugging event.
- JobThreadCounts {
- worker: usize,
- num_idle: u16,
- num_sleepers: u16,
- },
-}
-
-/// Handle to the logging thread, if any. You can use this to deliver
-/// logs. You can also clone it freely.
-#[derive(Clone)]
-pub(super) struct Logger {
- sender: Option<Sender<Event>>,
-}
-
-impl Logger {
- pub(super) fn new(num_workers: usize) -> Logger {
- if !LOG_ENABLED {
- return Self::disabled();
- }
-
- // see the doc comment for the format
- let env_log = match env::var("RAYON_LOG") {
- Ok(s) => s,
- Err(_) => return Self::disabled(),
- };
-
- let (sender, receiver) = crossbeam_channel::unbounded();
-
- if let Some(filename) = env_log.strip_prefix("tail:") {
- let filename = filename.to_string();
- ::std::thread::spawn(move || {
- Self::tail_logger_thread(num_workers, filename, 10_000, receiver)
- });
- } else if env_log == "all" {
- ::std::thread::spawn(move || Self::all_logger_thread(num_workers, receiver));
- } else if let Some(filename) = env_log.strip_prefix("profile:") {
- let filename = filename.to_string();
- ::std::thread::spawn(move || {
- Self::profile_logger_thread(num_workers, filename, 10_000, receiver)
- });
- } else {
- panic!("RAYON_LOG should be 'tail:<file>' or 'profile:<file>'");
- }
-
- Logger {
- sender: Some(sender),
- }
- }
-
- fn disabled() -> Logger {
- Logger { sender: None }
- }
-
- #[inline]
- pub(super) fn log(&self, event: impl FnOnce() -> Event) {
- if !LOG_ENABLED {
- return;
- }
-
- if let Some(sender) = &self.sender {
- sender.send(event()).unwrap();
- }
- }
-
- fn profile_logger_thread(
- num_workers: usize,
- log_filename: String,
- capacity: usize,
- receiver: Receiver<Event>,
- ) {
- let file = File::create(&log_filename)
- .unwrap_or_else(|err| panic!("failed to open `{}`: {}", log_filename, err));
-
- let mut writer = BufWriter::new(file);
- let mut events = Vec::with_capacity(capacity);
- let mut state = SimulatorState::new(num_workers);
- let timeout = std::time::Duration::from_secs(30);
-
- loop {
- while let Ok(event) = receiver.recv_timeout(timeout) {
- if let Event::Flush = event {
- break;
- }
-
- events.push(event);
- if events.len() == capacity {
- break;
- }
- }
-
- for event in events.drain(..) {
- if state.simulate(&event) {
- state.dump(&mut writer, &event).unwrap();
- }
- }
-
- writer.flush().unwrap();
- }
- }
-
- fn tail_logger_thread(
- num_workers: usize,
- log_filename: String,
- capacity: usize,
- receiver: Receiver<Event>,
- ) {
- let file = File::create(&log_filename)
- .unwrap_or_else(|err| panic!("failed to open `{}`: {}", log_filename, err));
-
- let mut writer = BufWriter::new(file);
- let mut events: VecDeque<Event> = VecDeque::with_capacity(capacity);
- let mut state = SimulatorState::new(num_workers);
- let timeout = std::time::Duration::from_secs(30);
- let mut skipped = false;
-
- loop {
- while let Ok(event) = receiver.recv_timeout(timeout) {
- if let Event::Flush = event {
- // We ignore Flush events in tail mode --
- // we're really just looking for
- // deadlocks.
- continue;
- } else {
- if events.len() == capacity {
- let event = events.pop_front().unwrap();
- state.simulate(&event);
- skipped = true;
- }
-
- events.push_back(event);
- }
- }
-
- if skipped {
- writeln!(writer, "...").unwrap();
- skipped = false;
- }
-
- for event in events.drain(..) {
- // In tail mode, we dump *all* events out, whether or
- // not they were 'interesting' to the state machine.
- state.simulate(&event);
- state.dump(&mut writer, &event).unwrap();
- }
-
- writer.flush().unwrap();
- }
- }
-
- fn all_logger_thread(num_workers: usize, receiver: Receiver<Event>) {
- let stderr = std::io::stderr();
- let mut state = SimulatorState::new(num_workers);
-
- for event in receiver {
- let mut writer = BufWriter::new(stderr.lock());
- state.simulate(&event);
- state.dump(&mut writer, &event).unwrap();
- writer.flush().unwrap();
- }
- }
-}
-
-#[derive(Copy, Clone, PartialOrd, Ord, PartialEq, Eq, Debug)]
-enum State {
- Working,
- Idle,
- Notified,
- Sleeping,
- Terminated,
-}
-
-impl State {
- fn letter(&self) -> char {
- match self {
- State::Working => 'W',
- State::Idle => 'I',
- State::Notified => 'N',
- State::Sleeping => 'S',
- State::Terminated => 'T',
- }
- }
-}
-
-struct SimulatorState {
- local_queue_size: Vec<usize>,
- thread_states: Vec<State>,
- injector_size: usize,
-}
-
-impl SimulatorState {
- fn new(num_workers: usize) -> Self {
- Self {
- local_queue_size: (0..num_workers).map(|_| 0).collect(),
- thread_states: (0..num_workers).map(|_| State::Working).collect(),
- injector_size: 0,
- }
- }
-
- fn simulate(&mut self, event: &Event) -> bool {
- match *event {
- Event::ThreadIdle { worker, .. } => {
- assert_eq!(self.thread_states[worker], State::Working);
- self.thread_states[worker] = State::Idle;
- true
- }
-
- Event::ThreadStart { worker, .. } | Event::ThreadFoundWork { worker, .. } => {
- self.thread_states[worker] = State::Working;
- true
- }
-
- Event::ThreadTerminate { worker, .. } => {
- self.thread_states[worker] = State::Terminated;
- true
- }
-
- Event::ThreadSleeping { worker, .. } => {
- assert_eq!(self.thread_states[worker], State::Idle);
- self.thread_states[worker] = State::Sleeping;
- true
- }
-
- Event::ThreadAwoken { worker, .. } => {
- assert_eq!(self.thread_states[worker], State::Notified);
- self.thread_states[worker] = State::Idle;
- true
- }
-
- Event::JobPushed { worker } => {
- self.local_queue_size[worker] += 1;
- true
- }
-
- Event::JobPopped { worker } => {
- self.local_queue_size[worker] -= 1;
- true
- }
-
- Event::JobStolen { victim, .. } => {
- self.local_queue_size[victim] -= 1;
- true
- }
-
- Event::JobsInjected { count } => {
- self.injector_size += count;
- true
- }
-
- Event::JobUninjected { .. } => {
- self.injector_size -= 1;
- true
- }
-
- Event::ThreadNotify { worker } => {
- // Currently, this log event occurs while holding the
- // thread lock, so we should *always* see it before
- // the worker awakens.
- assert_eq!(self.thread_states[worker], State::Sleeping);
- self.thread_states[worker] = State::Notified;
- true
- }
-
- // remaining events are no-ops from pov of simulating the
- // thread state
- _ => false,
- }
- }
-
- fn dump(&mut self, w: &mut impl Write, event: &Event) -> io::Result<()> {
- let num_idle_threads = self
- .thread_states
- .iter()
- .filter(|s| **s == State::Idle)
- .count();
-
- let num_sleeping_threads = self
- .thread_states
- .iter()
- .filter(|s| **s == State::Sleeping)
- .count();
-
- let num_notified_threads = self
- .thread_states
- .iter()
- .filter(|s| **s == State::Notified)
- .count();
-
- let num_pending_jobs: usize = self.local_queue_size.iter().sum();
-
- write!(w, "{:2},", num_idle_threads)?;
- write!(w, "{:2},", num_sleeping_threads)?;
- write!(w, "{:2},", num_notified_threads)?;
- write!(w, "{:4},", num_pending_jobs)?;
- write!(w, "{:4},", self.injector_size)?;
-
- let event_str = format!("{:?}", event);
- write!(w, r#""{:60}","#, event_str)?;
-
- for ((i, state), queue_size) in (0..).zip(&self.thread_states).zip(&self.local_queue_size) {
- write!(w, " T{:02},{}", i, state.letter(),)?;
-
- if *queue_size > 0 {
- write!(w, ",{:03},", queue_size)?;
- } else {
- write!(w, ", ,")?;
- }
- }
-
- writeln!(w)?;
- Ok(())
- }
-}
diff --git a/src/registry.rs b/src/registry.rs
index 5d56ac9..46cd22b 100644
--- a/src/registry.rs
+++ b/src/registry.rs
@@ -1,8 +1,7 @@
use crate::job::{JobFifo, JobRef, StackJob};
-use crate::latch::{AsCoreLatch, CoreLatch, CountLatch, Latch, LatchRef, LockLatch, SpinLatch};
-use crate::log::Event::*;
-use crate::log::Logger;
+use crate::latch::{AsCoreLatch, CoreLatch, Latch, LatchRef, LockLatch, OnceLatch, SpinLatch};
use crate::sleep::Sleep;
+use crate::sync::Mutex;
use crate::unwind;
use crate::{
ErrorKind, ExitHandler, PanicHandler, StartHandler, ThreadPoolBuildError, ThreadPoolBuilder,
@@ -17,7 +16,7 @@ use std::io;
use std::mem;
use std::ptr;
use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::{Arc, Mutex, Once};
+use std::sync::{Arc, Once};
use std::thread;
use std::usize;
@@ -130,7 +129,6 @@ where
}
pub(super) struct Registry {
- logger: Logger,
thread_infos: Vec<ThreadInfo>,
sleep: Sleep,
injected_jobs: Injector<JobRef>,
@@ -210,26 +208,7 @@ fn default_global_registry() -> Result<Arc<Registry>, ThreadPoolBuildError> {
// is stubbed out, and we won't have to change anything if they do add real threading.
let unsupported = matches!(&result, Err(e) if e.is_unsupported());
if unsupported && WorkerThread::current().is_null() {
- let builder = ThreadPoolBuilder::new()
- .num_threads(1)
- .spawn_handler(|thread| {
- // Rather than starting a new thread, we're just taking over the current thread
- // *without* running the main loop, so we can still return from here.
- // The WorkerThread is leaked, but we never shutdown the global pool anyway.
- let worker_thread = Box::leak(Box::new(WorkerThread::from(thread)));
- let registry = &*worker_thread.registry;
- let index = worker_thread.index;
-
- unsafe {
- WorkerThread::set_current(worker_thread);
-
- // let registry know we are ready to do work
- Latch::set(&registry.thread_infos[index].primed);
- }
-
- Ok(())
- });
-
+ let builder = ThreadPoolBuilder::new().num_threads(1).use_current_thread();
let fallback_result = Registry::new(builder);
if fallback_result.is_ok() {
return fallback_result;
@@ -280,11 +259,9 @@ impl Registry {
})
.unzip();
- let logger = Logger::new(n_threads);
let registry = Arc::new(Registry {
- logger: logger.clone(),
thread_infos: stealers.into_iter().map(ThreadInfo::new).collect(),
- sleep: Sleep::new(logger, n_threads),
+ sleep: Sleep::new(n_threads),
injected_jobs: Injector::new(),
broadcasts: Mutex::new(broadcasts),
terminate_count: AtomicUsize::new(1),
@@ -305,6 +282,25 @@ impl Registry {
stealer,
index,
};
+
+ if index == 0 && builder.use_current_thread {
+ if !WorkerThread::current().is_null() {
+ return Err(ThreadPoolBuildError::new(
+ ErrorKind::CurrentThreadAlreadyInPool,
+ ));
+ }
+ // Rather than starting a new thread, we're just taking over the current thread
+ // *without* running the main loop, so we can still return from here.
+ // The WorkerThread is leaked, but we never shutdown the global pool anyway.
+ let worker_thread = Box::into_raw(Box::new(WorkerThread::from(thread)));
+
+ unsafe {
+ WorkerThread::set_current(worker_thread);
+ Latch::set(&registry.thread_infos[index].primed);
+ }
+ continue;
+ }
+
if let Err(e) = builder.get_spawn_handler().spawn(thread) {
return Err(ThreadPoolBuildError::new(ErrorKind::IOError(e)));
}
@@ -363,11 +359,6 @@ impl Registry {
}
}
- #[inline]
- pub(super) fn log(&self, event: impl FnOnce() -> crate::log::Event) {
- self.logger.log(event)
- }
-
pub(super) fn num_threads(&self) -> usize {
self.thread_infos.len()
}
@@ -426,8 +417,6 @@ impl Registry {
/// whatever worker has nothing to do. Use this if you know that
/// you are not on a worker of this registry.
pub(super) fn inject(&self, injected_job: JobRef) {
- self.log(|| JobsInjected { count: 1 });
-
// It should not be possible for `state.terminate` to be true
// here. It is only set to true when the user creates (and
// drops) a `ThreadPool`; and, in that case, they cannot be
@@ -442,22 +431,17 @@ impl Registry {
let queue_was_empty = self.injected_jobs.is_empty();
self.injected_jobs.push(injected_job);
- self.sleep.new_injected_jobs(usize::MAX, 1, queue_was_empty);
+ self.sleep.new_injected_jobs(1, queue_was_empty);
}
fn has_injected_job(&self) -> bool {
!self.injected_jobs.is_empty()
}
- fn pop_injected_job(&self, worker_index: usize) -> Option<JobRef> {
+ fn pop_injected_job(&self) -> Option<JobRef> {
loop {
match self.injected_jobs.steal() {
- Steal::Success(job) => {
- self.log(|| JobUninjected {
- worker: worker_index,
- });
- return Some(job);
- }
+ Steal::Success(job) => return Some(job),
Steal::Empty => return None,
Steal::Retry => {}
}
@@ -471,9 +455,6 @@ impl Registry {
/// **Panics** if not given exactly as many jobs as there are threads.
pub(super) fn inject_broadcast(&self, injected_jobs: impl ExactSizeIterator<Item = JobRef>) {
assert_eq!(self.num_threads(), injected_jobs.len());
- self.log(|| JobBroadcast {
- count: self.num_threads(),
- });
{
let broadcasts = self.broadcasts.lock().unwrap();
@@ -545,9 +526,6 @@ impl Registry {
self.inject(job.as_job_ref());
job.latch.wait_and_reset(); // Make sure we can use the same latch again next time.
- // flush accumulated logs as we exit the thread
- self.logger.log(|| Flush);
-
job.into_result()
})
}
@@ -610,7 +588,7 @@ impl Registry {
pub(super) fn terminate(&self) {
if self.terminate_count.fetch_sub(1, Ordering::AcqRel) == 1 {
for (i, thread_info) in self.thread_infos.iter().enumerate() {
- unsafe { CountLatch::set_and_tickle_one(&thread_info.terminate, self, i) };
+ unsafe { OnceLatch::set_and_tickle_one(&thread_info.terminate, self, i) };
}
}
}
@@ -640,10 +618,7 @@ struct ThreadInfo {
/// This latch is *set* by the `terminate` method on the
/// `Registry`, once the registry's main "terminate" counter
/// reaches zero.
- ///
- /// NB. We use a `CountLatch` here because it has no lifetimes and is
- /// meant for async use, but the count never gets higher than one.
- terminate: CountLatch,
+ terminate: OnceLatch,
/// the "stealer" half of the worker's deque
stealer: Stealer<JobRef>,
@@ -654,7 +629,7 @@ impl ThreadInfo {
ThreadInfo {
primed: LockLatch::new(),
stopped: LockLatch::new(),
- terminate: CountLatch::new(),
+ terminate: OnceLatch::new(),
stealer,
}
}
@@ -737,11 +712,6 @@ impl WorkerThread {
&self.registry
}
- #[inline]
- pub(super) fn log(&self, event: impl FnOnce() -> crate::log::Event) {
- self.registry.logger.log(event)
- }
-
/// Our index amongst the worker threads (ranges from `0..self.num_threads()`).
#[inline]
pub(super) fn index(&self) -> usize {
@@ -750,12 +720,9 @@ impl WorkerThread {
#[inline]
pub(super) unsafe fn push(&self, job: JobRef) {
- self.log(|| JobPushed { worker: self.index });
let queue_was_empty = self.worker.is_empty();
self.worker.push(job);
- self.registry
- .sleep
- .new_internal_jobs(self.index, 1, queue_was_empty);
+ self.registry.sleep.new_internal_jobs(1, queue_was_empty);
}
#[inline]
@@ -777,7 +744,6 @@ impl WorkerThread {
let popped_job = self.worker.pop();
if popped_job.is_some() {
- self.log(|| JobPopped { worker: self.index });
return popped_job;
}
@@ -813,31 +779,51 @@ impl WorkerThread {
// accesses, which would be *very bad*
let abort_guard = unwind::AbortIfPanic;
- let mut idle_state = self.registry.sleep.start_looking(self.index, latch);
- while !latch.probe() {
- if let Some(job) = self.find_work() {
- self.registry.sleep.work_found(idle_state);
+ 'outer: while !latch.probe() {
+ // Check for local work *before* we start marking ourself idle,
+ // especially to avoid modifying shared sleep state.
+ if let Some(job) = self.take_local_job() {
self.execute(job);
- idle_state = self.registry.sleep.start_looking(self.index, latch);
- } else {
- self.registry
- .sleep
- .no_work_found(&mut idle_state, latch, || self.has_injected_job())
+ continue;
+ }
+
+ let mut idle_state = self.registry.sleep.start_looking(self.index);
+ while !latch.probe() {
+ if let Some(job) = self.find_work() {
+ self.registry.sleep.work_found();
+ self.execute(job);
+ // The job might have injected local work, so go back to the outer loop.
+ continue 'outer;
+ } else {
+ self.registry
+ .sleep
+ .no_work_found(&mut idle_state, latch, || self.has_injected_job())
+ }
}
- }
- // If we were sleepy, we are not anymore. We "found work" --
- // whatever the surrounding thread was doing before it had to
- // wait.
- self.registry.sleep.work_found(idle_state);
+ // If we were sleepy, we are not anymore. We "found work" --
+ // whatever the surrounding thread was doing before it had to wait.
+ self.registry.sleep.work_found();
+ break;
+ }
- self.log(|| ThreadSawLatchSet {
- worker: self.index,
- latch_addr: latch.addr(),
- });
mem::forget(abort_guard); // successful execution, do not abort
}
+ unsafe fn wait_until_out_of_work(&self) {
+ debug_assert_eq!(self as *const _, WorkerThread::current());
+ let registry = &*self.registry;
+ let index = self.index;
+
+ self.wait_until(&registry.thread_infos[index].terminate);
+
+ // Should not be any work left in our queue.
+ debug_assert!(self.take_local_job().is_none());
+
+ // Let registry know we are done
+ Latch::set(&registry.thread_infos[index].stopped);
+ }
+
fn find_work(&self) -> Option<JobRef> {
// Try to find some work to do. We give preference first
// to things in our local deque, then in other workers
@@ -846,7 +832,7 @@ impl WorkerThread {
// we take on something new.
self.take_local_job()
.or_else(|| self.steal())
- .or_else(|| self.registry.pop_injected_job(self.index))
+ .or_else(|| self.registry.pop_injected_job())
}
pub(super) fn yield_now(&self) -> Yield {
@@ -898,13 +884,7 @@ impl WorkerThread {
.find_map(|victim_index| {
let victim = &thread_infos[victim_index];
match victim.stealer.steal() {
- Steal::Success(job) => {
- self.log(|| JobStolen {
- worker: self.index,
- victim: victim_index,
- });
- Some(job)
- }
+ Steal::Success(job) => Some(job),
Steal::Empty => None,
Steal::Retry => {
retry = true;
@@ -940,24 +920,11 @@ unsafe fn main_loop(thread: ThreadBuilder) {
registry.catch_unwind(|| handler(index));
}
- let my_terminate_latch = &registry.thread_infos[index].terminate;
- worker_thread.log(|| ThreadStart {
- worker: index,
- terminate_addr: my_terminate_latch.as_core_latch().addr(),
- });
- worker_thread.wait_until(my_terminate_latch);
-
- // Should not be any work left in our queue.
- debug_assert!(worker_thread.take_local_job().is_none());
-
- // let registry know we are done
- Latch::set(&registry.thread_infos[index].stopped);
+ worker_thread.wait_until_out_of_work();
// Normal termination, do not abort.
mem::forget(abort_guard);
- worker_thread.log(|| ThreadTerminate { worker: index });
-
// Inform a user callback that we exited a thread.
if let Some(ref handler) = registry.exit_handler {
registry.catch_unwind(|| handler(index));
diff --git a/src/scope/mod.rs b/src/scope/mod.rs
index f460dd7..b7163d1 100644
--- a/src/scope/mod.rs
+++ b/src/scope/mod.rs
@@ -7,7 +7,7 @@
use crate::broadcast::BroadcastContext;
use crate::job::{ArcJob, HeapJob, JobFifo, JobRef};
-use crate::latch::{CountLatch, CountLockLatch, Latch};
+use crate::latch::{CountLatch, Latch};
use crate::registry::{global_registry, in_worker, Registry, WorkerThread};
use crate::unwind;
use std::any::Any;
@@ -39,26 +39,6 @@ pub struct ScopeFifo<'scope> {
fifos: Vec<JobFifo>,
}
-pub(super) enum ScopeLatch {
- /// A latch for scopes created on a rayon thread which will participate in work-
- /// stealing while it waits for completion. This thread is not necessarily part
- /// of the same registry as the scope itself!
- Stealing {
- latch: CountLatch,
- /// If a worker thread in registry A calls `in_place_scope` on a ThreadPool
- /// with registry B, when a job completes in a thread of registry B, we may
- /// need to call `latch.set_and_tickle_one()` to wake the thread in registry A.
- /// That means we need a reference to registry A (since at that point we will
- /// only have a reference to registry B), so we stash it here.
- registry: Arc<Registry>,
- /// The index of the worker to wake in `registry`
- worker_index: usize,
- },
-
- /// A latch for scopes created on a non-rayon thread which will block to wait.
- Blocking { latch: CountLockLatch },
-}
-
struct ScopeBase<'scope> {
/// thread registry where `scope()` was executed or where `in_place_scope()`
/// should spawn jobs.
@@ -69,7 +49,7 @@ struct ScopeBase<'scope> {
panic: AtomicPtr<Box<dyn Any + Send + 'static>>,
/// latch to track job counts
- job_completed_latch: ScopeLatch,
+ job_completed_latch: CountLatch,
/// You can think of a scope as containing a list of closures to execute,
/// all of which outlive `'scope`. They're not actually required to be
@@ -650,21 +630,17 @@ impl<'scope> ScopeBase<'scope> {
ScopeBase {
registry: Arc::clone(registry),
panic: AtomicPtr::new(ptr::null_mut()),
- job_completed_latch: ScopeLatch::new(owner),
+ job_completed_latch: CountLatch::new(owner),
marker: PhantomData,
}
}
- fn increment(&self) {
- self.job_completed_latch.increment();
- }
-
fn heap_job_ref<FUNC>(&self, job: Box<HeapJob<FUNC>>) -> JobRef
where
FUNC: FnOnce() + Send + 'scope,
{
unsafe {
- self.increment();
+ self.job_completed_latch.increment();
job.into_job_ref()
}
}
@@ -675,7 +651,7 @@ impl<'scope> ScopeBase<'scope> {
{
let n_threads = self.registry.num_threads();
let job_refs = (0..n_threads).map(|_| unsafe {
- self.increment();
+ self.job_completed_latch.increment();
ArcJob::as_job_ref(&job)
});
@@ -710,17 +686,15 @@ impl<'scope> ScopeBase<'scope> {
where
FUNC: FnOnce() -> R,
{
- match unwind::halt_unwinding(func) {
- Ok(r) => {
- Latch::set(&(*this).job_completed_latch);
- Some(r)
- }
+ let result = match unwind::halt_unwinding(func) {
+ Ok(r) => Some(r),
Err(err) => {
(*this).job_panicked(err);
- Latch::set(&(*this).job_completed_latch);
None
}
- }
+ };
+ Latch::set(&(*this).job_completed_latch);
+ result
}
fn job_panicked(&self, err: Box<dyn Any + Send + 'static>) {
@@ -754,61 +728,6 @@ impl<'scope> ScopeBase<'scope> {
}
}
-impl ScopeLatch {
- fn new(owner: Option<&WorkerThread>) -> Self {
- Self::with_count(1, owner)
- }
-
- pub(super) fn with_count(count: usize, owner: Option<&WorkerThread>) -> Self {
- match owner {
- Some(owner) => ScopeLatch::Stealing {
- latch: CountLatch::with_count(count),
- registry: Arc::clone(owner.registry()),
- worker_index: owner.index(),
- },
- None => ScopeLatch::Blocking {
- latch: CountLockLatch::with_count(count),
- },
- }
- }
-
- fn increment(&self) {
- match self {
- ScopeLatch::Stealing { latch, .. } => latch.increment(),
- ScopeLatch::Blocking { latch } => latch.increment(),
- }
- }
-
- pub(super) fn wait(&self, owner: Option<&WorkerThread>) {
- match self {
- ScopeLatch::Stealing {
- latch,
- registry,
- worker_index,
- } => unsafe {
- let owner = owner.expect("owner thread");
- debug_assert_eq!(registry.id(), owner.registry().id());
- debug_assert_eq!(*worker_index, owner.index());
- owner.wait_until(latch);
- },
- ScopeLatch::Blocking { latch } => latch.wait(),
- }
- }
-}
-
-impl Latch for ScopeLatch {
- unsafe fn set(this: *const Self) {
- match &*this {
- ScopeLatch::Stealing {
- latch,
- registry,
- worker_index,
- } => CountLatch::set_and_tickle_one(latch, registry, *worker_index),
- ScopeLatch::Blocking { latch } => Latch::set(latch),
- }
- }
-}
-
impl<'scope> fmt::Debug for Scope<'scope> {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("Scope")
@@ -830,21 +749,6 @@ impl<'scope> fmt::Debug for ScopeFifo<'scope> {
}
}
-impl fmt::Debug for ScopeLatch {
- fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
- match self {
- ScopeLatch::Stealing { latch, .. } => fmt
- .debug_tuple("ScopeLatch::Stealing")
- .field(latch)
- .finish(),
- ScopeLatch::Blocking { latch } => fmt
- .debug_tuple("ScopeLatch::Blocking")
- .field(latch)
- .finish(),
- }
- }
-}
-
/// Used to capture a scope `&Self` pointer in jobs, without faking a lifetime.
///
/// Unsafe code is still required to dereference the pointer, but that's fine in
diff --git a/src/sleep/counters.rs b/src/sleep/counters.rs
index f2a3de3..53d2c55 100644
--- a/src/sleep/counters.rs
+++ b/src/sleep/counters.rs
@@ -212,12 +212,12 @@ impl AtomicCounters {
#[inline]
fn select_thread(word: usize, shift: usize) -> usize {
- ((word >> shift) as usize) & THREADS_MAX
+ (word >> shift) & THREADS_MAX
}
#[inline]
fn select_jec(word: usize) -> usize {
- (word >> JEC_SHIFT) as usize
+ word >> JEC_SHIFT
}
impl Counters {
diff --git a/src/sleep/mod.rs b/src/sleep/mod.rs
index af7225a..fa1f7be 100644
--- a/src/sleep/mod.rs
+++ b/src/sleep/mod.rs
@@ -2,11 +2,9 @@
//! for an overview.
use crate::latch::CoreLatch;
-use crate::log::Event::*;
-use crate::log::Logger;
+use crate::sync::{Condvar, Mutex};
use crossbeam_utils::CachePadded;
use std::sync::atomic::Ordering;
-use std::sync::{Condvar, Mutex};
use std::thread;
use std::usize;
@@ -22,8 +20,6 @@ use self::counters::{AtomicCounters, JobsEventCounter};
///
/// [`README.md`] README.md
pub(super) struct Sleep {
- logger: Logger,
-
/// One "sleep state" per worker. Used to track if a worker is sleeping and to have
/// them block.
worker_sleep_states: Vec<CachePadded<WorkerSleepState>>,
@@ -62,22 +58,16 @@ const ROUNDS_UNTIL_SLEEPY: u32 = 32;
const ROUNDS_UNTIL_SLEEPING: u32 = ROUNDS_UNTIL_SLEEPY + 1;
impl Sleep {
- pub(super) fn new(logger: Logger, n_threads: usize) -> Sleep {
+ pub(super) fn new(n_threads: usize) -> Sleep {
assert!(n_threads <= THREADS_MAX);
Sleep {
- logger,
worker_sleep_states: (0..n_threads).map(|_| Default::default()).collect(),
counters: AtomicCounters::new(),
}
}
#[inline]
- pub(super) fn start_looking(&self, worker_index: usize, latch: &CoreLatch) -> IdleState {
- self.logger.log(|| ThreadIdle {
- worker: worker_index,
- latch_addr: latch.addr(),
- });
-
+ pub(super) fn start_looking(&self, worker_index: usize) -> IdleState {
self.counters.add_inactive_thread();
IdleState {
@@ -88,12 +78,7 @@ impl Sleep {
}
#[inline]
- pub(super) fn work_found(&self, idle_state: IdleState) {
- self.logger.log(|| ThreadFoundWork {
- worker: idle_state.worker_index,
- yields: idle_state.rounds,
- });
-
+ pub(super) fn work_found(&self) {
// If we were the last idle thread and other threads are still sleeping,
// then we should wake up another thread.
let threads_to_wake = self.counters.sub_inactive_thread();
@@ -111,7 +96,7 @@ impl Sleep {
thread::yield_now();
idle_state.rounds += 1;
} else if idle_state.rounds == ROUNDS_UNTIL_SLEEPY {
- idle_state.jobs_counter = self.announce_sleepy(idle_state.worker_index);
+ idle_state.jobs_counter = self.announce_sleepy();
idle_state.rounds += 1;
thread::yield_now();
} else if idle_state.rounds < ROUNDS_UNTIL_SLEEPING {
@@ -124,16 +109,10 @@ impl Sleep {
}
#[cold]
- fn announce_sleepy(&self, worker_index: usize) -> JobsEventCounter {
- let counters = self
- .counters
- .increment_jobs_event_counter_if(JobsEventCounter::is_active);
- let jobs_counter = counters.jobs_counter();
- self.logger.log(|| ThreadSleepy {
- worker: worker_index,
- jobs_counter: jobs_counter.as_usize(),
- });
- jobs_counter
+ fn announce_sleepy(&self) -> JobsEventCounter {
+ self.counters
+ .increment_jobs_event_counter_if(JobsEventCounter::is_active)
+ .jobs_counter()
}
#[cold]
@@ -146,11 +125,6 @@ impl Sleep {
let worker_index = idle_state.worker_index;
if !latch.get_sleepy() {
- self.logger.log(|| ThreadSleepInterruptedByLatch {
- worker: worker_index,
- latch_addr: latch.addr(),
- });
-
return;
}
@@ -161,11 +135,6 @@ impl Sleep {
// Our latch was signalled. We should wake back up fully as we
// will have some stuff to do.
if !latch.fall_asleep() {
- self.logger.log(|| ThreadSleepInterruptedByLatch {
- worker: worker_index,
- latch_addr: latch.addr(),
- });
-
idle_state.wake_fully();
return;
}
@@ -180,10 +149,6 @@ impl Sleep {
// we didn't see it. We should return to just before the SLEEPY
// state so we can do another search and (if we fail to find
// work) go back to sleep.
- self.logger.log(|| ThreadSleepInterruptedByJob {
- worker: worker_index,
- });
-
idle_state.wake_partly();
latch.wake_up();
return;
@@ -197,11 +162,6 @@ impl Sleep {
// Successfully registered as asleep.
- self.logger.log(|| ThreadSleeping {
- worker: worker_index,
- latch_addr: latch.addr(),
- });
-
// We have one last check for injected jobs to do. This protects against
// deadlock in the very unlikely event that
//
@@ -232,11 +192,6 @@ impl Sleep {
// Update other state:
idle_state.wake_fully();
latch.wake_up();
-
- self.logger.log(|| ThreadAwoken {
- worker: worker_index,
- latch_addr: latch.addr(),
- });
}
/// Notify the given thread that it should wake up (if it is
@@ -254,24 +209,16 @@ impl Sleep {
///
/// # Parameters
///
- /// - `source_worker_index` -- index of the thread that did the
- /// push, or `usize::MAX` if this came from outside the thread
- /// pool -- it is used only for logging.
/// - `num_jobs` -- lower bound on number of jobs available for stealing.
/// We'll try to get at least one thread per job.
#[inline]
- pub(super) fn new_injected_jobs(
- &self,
- source_worker_index: usize,
- num_jobs: u32,
- queue_was_empty: bool,
- ) {
+ pub(super) fn new_injected_jobs(&self, num_jobs: u32, queue_was_empty: bool) {
// This fence is needed to guarantee that threads
// as they are about to fall asleep, observe any
// new jobs that may have been injected.
std::sync::atomic::fence(Ordering::SeqCst);
- self.new_jobs(source_worker_index, num_jobs, queue_was_empty)
+ self.new_jobs(num_jobs, queue_was_empty)
}
/// Signals that `num_jobs` new jobs were pushed onto a thread's
@@ -284,24 +231,16 @@ impl Sleep {
///
/// # Parameters
///
- /// - `source_worker_index` -- index of the thread that did the
- /// push, or `usize::MAX` if this came from outside the thread
- /// pool -- it is used only for logging.
/// - `num_jobs` -- lower bound on number of jobs available for stealing.
/// We'll try to get at least one thread per job.
#[inline]
- pub(super) fn new_internal_jobs(
- &self,
- source_worker_index: usize,
- num_jobs: u32,
- queue_was_empty: bool,
- ) {
- self.new_jobs(source_worker_index, num_jobs, queue_was_empty)
+ pub(super) fn new_internal_jobs(&self, num_jobs: u32, queue_was_empty: bool) {
+ self.new_jobs(num_jobs, queue_was_empty)
}
/// Common helper for `new_injected_jobs` and `new_internal_jobs`.
#[inline]
- fn new_jobs(&self, source_worker_index: usize, num_jobs: u32, queue_was_empty: bool) {
+ fn new_jobs(&self, num_jobs: u32, queue_was_empty: bool) {
// Read the counters and -- if sleepy workers have announced themselves
// -- announce that there is now work available. The final value of `counters`
// with which we exit the loop thus corresponds to a state when
@@ -311,12 +250,6 @@ impl Sleep {
let num_awake_but_idle = counters.awake_but_idle_threads();
let num_sleepers = counters.sleeping_threads();
- self.logger.log(|| JobThreadCounts {
- worker: source_worker_index,
- num_idle: num_awake_but_idle as u16,
- num_sleepers: num_sleepers as u16,
- });
-
if num_sleepers == 0 {
// nobody to wake
return;
@@ -372,8 +305,6 @@ impl Sleep {
// do.
self.counters.sub_sleeping_thread();
- self.logger.log(|| ThreadNotify { worker: index });
-
true
} else {
false
diff --git a/src/spawn/mod.rs b/src/spawn/mod.rs
index 1aa9edb..22c5898 100644
--- a/src/spawn/mod.rs
+++ b/src/spawn/mod.rs
@@ -4,8 +4,8 @@ use crate::unwind;
use std::mem;
use std::sync::Arc;
-/// Fires off a task into the Rayon threadpool in the "static" or
-/// "global" scope. Just like a standard thread, this task is not
+/// Puts the task into the Rayon threadpool's job queue in the "static"
+/// or "global" scope. Just like a standard thread, this task is not
/// tied to the current stack frame, and hence it cannot hold any
/// references other than those with `'static` lifetime. If you want
/// to spawn a task that references stack data, use [the `scope()`
diff --git a/src/thread_pool/mod.rs b/src/thread_pool/mod.rs
index c37826e..5ae6e0f 100644
--- a/src/thread_pool/mod.rs
+++ b/src/thread_pool/mod.rs
@@ -80,6 +80,43 @@ impl ThreadPool {
/// thread-local data from the current thread will not be
/// accessible.
///
+ /// # Warning: execution order
+ ///
+ /// If the current thread is part of a different thread pool, it will try to
+ /// keep busy while the `op` completes in its target pool, similar to
+ /// calling [`ThreadPool::yield_now()`] in a loop. Therefore, it may
+ /// potentially schedule other tasks to run on the current thread in the
+ /// meantime. For example
+ ///
+ /// ```rust
+ /// # use rayon_core as rayon;
+ /// fn main() {
+ /// rayon::ThreadPoolBuilder::new().num_threads(1).build_global().unwrap();
+ /// let pool = rayon_core::ThreadPoolBuilder::default().build().unwrap();
+ /// let do_it = || {
+ /// print!("one ");
+ /// pool.install(||{});
+ /// print!("two ");
+ /// };
+ /// rayon::join(|| do_it(), || do_it());
+ /// }
+ /// ```
+ ///
+ /// Since we configured just one thread in the global pool, one might
+ /// expect `do_it()` to run sequentially, producing:
+ ///
+ /// ```ascii
+ /// one two one two
+ /// ```
+ ///
+ /// However each call to `install()` yields implicitly, allowing rayon to
+ /// run multiple instances of `do_it()` concurrently on the single, global
+ /// thread. The following output would be equally valid:
+ ///
+ /// ```ascii
+ /// one one two two
+ /// ```
+ ///
/// # Panics
///
/// If `op` should panic, that panic will be propagated.
diff --git a/src/thread_pool/test.rs b/src/thread_pool/test.rs
index 6143e57..88b3628 100644
--- a/src/thread_pool/test.rs
+++ b/src/thread_pool/test.rs
@@ -383,7 +383,7 @@ fn in_place_scope_fifo_no_deadlock() {
#[test]
fn yield_now_to_spawn() {
- let (tx, rx) = crossbeam_channel::bounded(1);
+ let (tx, rx) = channel();
// Queue a regular spawn.
crate::spawn(move || tx.send(22).unwrap());
@@ -401,7 +401,7 @@ fn yield_now_to_spawn() {
#[test]
fn yield_local_to_spawn() {
- let (tx, rx) = crossbeam_channel::bounded(1);
+ let (tx, rx) = channel();
// Queue a regular spawn.
crate::spawn(move || tx.send(22).unwrap());
diff --git a/tests/scoped_threadpool.rs b/tests/scoped_threadpool.rs
index 534e8bb..9321471 100644
--- a/tests/scoped_threadpool.rs
+++ b/tests/scoped_threadpool.rs
@@ -93,7 +93,7 @@ fn build_scoped_tls_threadpool() {
},
)
.expect("thread pool created");
- // Internally, `crossbeam::scope` will wait for the threads to exit before returning.
+ // Internally, `std::thread::scope` will wait for the threads to exit before returning.
});
});
}
diff --git a/tests/use_current_thread.rs b/tests/use_current_thread.rs
new file mode 100644
index 0000000..ec801c9
--- /dev/null
+++ b/tests/use_current_thread.rs
@@ -0,0 +1,57 @@
+use rayon_core::ThreadPoolBuilder;
+use std::sync::{Arc, Condvar, Mutex};
+use std::thread::{self, JoinHandle};
+
+#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
+fn use_current_thread_basic() {
+ static JOIN_HANDLES: Mutex<Vec<JoinHandle<()>>> = Mutex::new(Vec::new());
+ let pool = ThreadPoolBuilder::new()
+ .num_threads(2)
+ .use_current_thread()
+ .spawn_handler(|builder| {
+ let handle = thread::Builder::new().spawn(|| builder.run())?;
+ JOIN_HANDLES.lock().unwrap().push(handle);
+ Ok(())
+ })
+ .build()
+ .unwrap();
+ assert_eq!(rayon_core::current_thread_index(), Some(0));
+ assert_eq!(
+ JOIN_HANDLES.lock().unwrap().len(),
+ 1,
+ "Should only spawn one extra thread"
+ );
+
+ let another_pool = ThreadPoolBuilder::new()
+ .num_threads(2)
+ .use_current_thread()
+ .build();
+ assert!(
+ another_pool.is_err(),
+ "Should error if the thread is already part of a pool"
+ );
+
+ let pair = Arc::new((Mutex::new(false), Condvar::new()));
+ let pair2 = Arc::clone(&pair);
+ pool.spawn(move || {
+ assert_ne!(rayon_core::current_thread_index(), Some(0));
+ // This should execute even if the current thread is blocked, since we have two threads in
+ // the pool.
+ let &(ref started, ref condvar) = &*pair2;
+ *started.lock().unwrap() = true;
+ condvar.notify_one();
+ });
+
+ let _guard = pair
+ .1
+ .wait_while(pair.0.lock().unwrap(), |ran| !*ran)
+ .unwrap();
+ std::mem::drop(pool); // Drop the pool.
+
+ // Wait until all threads have actually exited. This is not really needed, other than to
+ // reduce noise of leak-checking tools.
+ for handle in std::mem::take(&mut *JOIN_HANDLES.lock().unwrap()) {
+ let _ = handle.join();
+ }
+}