From 96905156f39d29938fabf7cb278c4f8c52955af5 Mon Sep 17 00:00:00 2001 From: Jeff Vander Stoep Date: Tue, 6 Feb 2024 13:01:42 +0100 Subject: Upgrade rayon-core to 1.12.1 This project was upgraded with external_updater. Usage: tools/external_updater/updater.sh update external/rust/crates/rayon-core For more info, check https://cs.android.com/android/platform/superproject/+/main:tools/external_updater/README.md Test: TreeHugger Change-Id: I4844dc36a655701157d377fbbbe8619abcce504c --- .cargo_vcs_info.json | 2 +- Android.bp | 4 +- Cargo.toml | 17 +- Cargo.toml.orig | 19 +- METADATA | 25 ++- README.md | 2 +- src/broadcast/mod.rs | 5 +- src/broadcast/test.rs | 23 +-- src/latch.rs | 171 +++++++++++------- src/lib.rs | 84 ++++++--- src/log.rs | 413 -------------------------------------------- src/registry.rs | 177 ++++++++----------- src/scope/mod.rs | 116 ++----------- src/sleep/counters.rs | 4 +- src/sleep/mod.rs | 97 ++--------- src/spawn/mod.rs | 4 +- src/thread_pool/mod.rs | 37 ++++ src/thread_pool/test.rs | 4 +- tests/scoped_threadpool.rs | 2 +- tests/use_current_thread.rs | 57 ++++++ 20 files changed, 419 insertions(+), 844 deletions(-) delete mode 100644 src/log.rs create mode 100644 tests/use_current_thread.rs diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json index 2a30505..f0dc383 100644 --- a/.cargo_vcs_info.json +++ b/.cargo_vcs_info.json @@ -1,6 +1,6 @@ { "git": { - "sha1": "6236214d717694917e77aa1c16d91176b9bc2fff" + "sha1": "7df6d5b8493a5fdb257565cab26dacdfe08ec8aa" }, "path_in_vcs": "rayon-core" } \ No newline at end of file diff --git a/Android.bp b/Android.bp index 64d1452..9b8ff99 100644 --- a/Android.bp +++ b/Android.bp @@ -42,14 +42,12 @@ rust_library { host_supported: true, crate_name: "rayon_core", cargo_env_compat: true, - cargo_pkg_version: "1.11.0", + cargo_pkg_version: "1.12.1", srcs: ["src/lib.rs"], edition: "2021", rustlibs: [ - "libcrossbeam_channel", "libcrossbeam_deque", "libcrossbeam_utils", - "libnum_cpus", ], apex_available: [ "//apex_available:platform", diff --git a/Cargo.toml b/Cargo.toml index d41715e..a3a11c2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,9 +11,9 @@ [package] edition = "2021" -rust-version = "1.59" +rust-version = "1.63" name = "rayon-core" -version = "1.11.0" +version = "1.12.1" authors = [ "Niko Matsakis ", "Josh Stone ", @@ -58,8 +58,9 @@ path = "tests/simple_panic.rs" name = "scoped_threadpool" path = "tests/scoped_threadpool.rs" -[dependencies.crossbeam-channel] -version = "0.5.0" +[[test]] +name = "use_current_thread" +path = "tests/use_current_thread.rs" [dependencies.crossbeam-deque] version = "0.8.1" @@ -67,8 +68,9 @@ version = "0.8.1" [dependencies.crossbeam-utils] version = "0.8.0" -[dependencies.num_cpus] -version = "1.2" +[dependencies.wasm_sync] +version = "0.1.0" +optional = true [dev-dependencies.rand] version = "0.8" @@ -79,5 +81,8 @@ version = "0.3" [dev-dependencies.scoped-tls] version = "1.0" +[features] +web_spin_lock = ["dep:wasm_sync"] + [target."cfg(unix)".dev-dependencies.libc] version = "0.2" diff --git a/Cargo.toml.orig b/Cargo.toml.orig index 920ffe5..09f8bf6 100644 --- a/Cargo.toml.orig +++ b/Cargo.toml.orig @@ -1,13 +1,13 @@ [package] name = "rayon-core" -version = "1.11.0" +version = "1.12.1" authors = ["Niko Matsakis ", "Josh Stone "] description = "Core APIs for Rayon" license = "MIT OR Apache-2.0" repository = "https://github.com/rayon-rs/rayon" documentation = "https://docs.rs/rayon/" -rust-version = "1.59" +rust-version = "1.63" edition = "2021" links = "rayon-core" build = "build.rs" @@ -17,10 +17,17 @@ categories = ["concurrency"] # Some dependencies may not be their latest version, in order to support older rustc. [dependencies] -num_cpus = "1.2" -crossbeam-channel = "0.5.0" crossbeam-deque = "0.8.1" crossbeam-utils = "0.8.0" +wasm_sync = { version = "0.1.0", optional = true } + +[features] + +# This feature switches to a spin-lock implementation on the browser's +# main thread to avoid the forbidden `atomics.wait`. +# +# Only useful on the `wasm32-unknown-unknown` target. +web_spin_lock = ["dep:wasm_sync"] [dev-dependencies] rand = "0.8" @@ -55,3 +62,7 @@ path = "tests/simple_panic.rs" [[test]] name = "scoped_threadpool" path = "tests/scoped_threadpool.rs" + +[[test]] +name = "use_current_thread" +path = "tests/use_current_thread.rs" diff --git a/METADATA b/METADATA index e9917f9..1eb7dcd 100644 --- a/METADATA +++ b/METADATA @@ -1,23 +1,20 @@ # This project was upgraded with external_updater. -# Usage: tools/external_updater/updater.sh update rust/crates/rayon-core -# For more info, check https://cs.android.com/android/platform/superproject/+/master:tools/external_updater/README.md +# Usage: tools/external_updater/updater.sh update external/rust/crates/rayon-core +# For more info, check https://cs.android.com/android/platform/superproject/+/main:tools/external_updater/README.md name: "rayon-core" description: "Core APIs for Rayon" third_party { - url { - type: HOMEPAGE - value: "https://crates.io/crates/rayon-core" - } - url { - type: ARCHIVE - value: "https://static.crates.io/crates/rayon-core/rayon-core-1.11.0.crate" - } - version: "1.11.0" license_type: NOTICE last_upgrade_date { - year: 2023 - month: 4 - day: 3 + year: 2024 + month: 2 + day: 6 + } + homepage: "https://crates.io/crates/rayon-core" + identifier { + type: "Archive" + value: "https://static.crates.io/crates/rayon-core/rayon-core-1.12.1.crate" + version: "1.12.1" } } diff --git a/README.md b/README.md index 448901b..6e2ebe2 100644 --- a/README.md +++ b/README.md @@ -8,4 +8,4 @@ Please see [Rayon Docs] for details about using Rayon. [Rayon Docs]: https://docs.rs/rayon/ -Rayon-core currently requires `rustc 1.59.0` or greater. +Rayon-core currently requires `rustc 1.63.0` or greater. diff --git a/src/broadcast/mod.rs b/src/broadcast/mod.rs index d991c54..96611e4 100644 --- a/src/broadcast/mod.rs +++ b/src/broadcast/mod.rs @@ -1,7 +1,6 @@ use crate::job::{ArcJob, StackJob}; -use crate::latch::LatchRef; +use crate::latch::{CountLatch, LatchRef}; use crate::registry::{Registry, WorkerThread}; -use crate::scope::ScopeLatch; use std::fmt; use std::marker::PhantomData; use std::sync::Arc; @@ -107,7 +106,7 @@ where let n_threads = registry.num_threads(); let current_thread = WorkerThread::current().as_ref(); - let latch = ScopeLatch::with_count(n_threads, current_thread); + let latch = CountLatch::with_count(n_threads, current_thread); let jobs: Vec<_> = (0..n_threads) .map(|_| StackJob::new(&f, LatchRef::new(&latch))) .collect(); diff --git a/src/broadcast/test.rs b/src/broadcast/test.rs index 3ae11f7..00ab4ad 100644 --- a/src/broadcast/test.rs +++ b/src/broadcast/test.rs @@ -2,6 +2,7 @@ use crate::ThreadPoolBuilder; use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::mpsc::channel; use std::sync::Arc; use std::{thread, time}; @@ -14,7 +15,7 @@ fn broadcast_global() { #[test] #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn spawn_broadcast_global() { - let (tx, rx) = crossbeam_channel::unbounded(); + let (tx, rx) = channel(); crate::spawn_broadcast(move |ctx| tx.send(ctx.index()).unwrap()); let mut v: Vec<_> = rx.into_iter().collect(); @@ -33,7 +34,7 @@ fn broadcast_pool() { #[test] #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn spawn_broadcast_pool() { - let (tx, rx) = crossbeam_channel::unbounded(); + let (tx, rx) = channel(); let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap(); pool.spawn_broadcast(move |ctx| tx.send(ctx.index()).unwrap()); @@ -53,7 +54,7 @@ fn broadcast_self() { #[test] #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn spawn_broadcast_self() { - let (tx, rx) = crossbeam_channel::unbounded(); + let (tx, rx) = channel(); let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap(); pool.spawn(|| crate::spawn_broadcast(move |ctx| tx.send(ctx.index()).unwrap())); @@ -81,7 +82,7 @@ fn broadcast_mutual() { #[test] #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn spawn_broadcast_mutual() { - let (tx, rx) = crossbeam_channel::unbounded(); + let (tx, rx) = channel(); let pool1 = Arc::new(ThreadPoolBuilder::new().num_threads(3).build().unwrap()); let pool2 = ThreadPoolBuilder::new().num_threads(7).build().unwrap(); pool1.spawn({ @@ -118,7 +119,7 @@ fn broadcast_mutual_sleepy() { #[test] #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn spawn_broadcast_mutual_sleepy() { - let (tx, rx) = crossbeam_channel::unbounded(); + let (tx, rx) = channel(); let pool1 = Arc::new(ThreadPoolBuilder::new().num_threads(3).build().unwrap()); let pool2 = ThreadPoolBuilder::new().num_threads(7).build().unwrap(); pool1.spawn({ @@ -158,8 +159,8 @@ fn broadcast_panic_one() { #[test] #[cfg_attr(not(panic = "unwind"), ignore)] fn spawn_broadcast_panic_one() { - let (tx, rx) = crossbeam_channel::unbounded(); - let (panic_tx, panic_rx) = crossbeam_channel::unbounded(); + let (tx, rx) = channel(); + let (panic_tx, panic_rx) = channel(); let pool = ThreadPoolBuilder::new() .num_threads(7) .panic_handler(move |e| panic_tx.send(e).unwrap()) @@ -196,8 +197,8 @@ fn broadcast_panic_many() { #[test] #[cfg_attr(not(panic = "unwind"), ignore)] fn spawn_broadcast_panic_many() { - let (tx, rx) = crossbeam_channel::unbounded(); - let (panic_tx, panic_rx) = crossbeam_channel::unbounded(); + let (tx, rx) = channel(); + let (panic_tx, panic_rx) = channel(); let pool = ThreadPoolBuilder::new() .num_threads(7) .panic_handler(move |e| panic_tx.send(e).unwrap()) @@ -231,7 +232,7 @@ fn broadcast_sleep_race() { #[test] fn broadcast_after_spawn_broadcast() { - let (tx, rx) = crossbeam_channel::unbounded(); + let (tx, rx) = channel(); // Queue a non-blocking spawn_broadcast. crate::spawn_broadcast(move |ctx| tx.send(ctx.index()).unwrap()); @@ -247,7 +248,7 @@ fn broadcast_after_spawn_broadcast() { #[test] fn broadcast_after_spawn() { - let (tx, rx) = crossbeam_channel::bounded(1); + let (tx, rx) = channel(); // Queue a regular spawn on a thread-local deque. crate::registry::in_worker(move |_, _| { diff --git a/src/latch.rs b/src/latch.rs index de43272..6c2e4fe 100644 --- a/src/latch.rs +++ b/src/latch.rs @@ -1,10 +1,11 @@ use std::marker::PhantomData; use std::ops::Deref; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::{Arc, Condvar, Mutex}; +use std::sync::Arc; use std::usize; use crate::registry::{Registry, WorkerThread}; +use crate::sync::{Condvar, Mutex}; /// We define various kinds of latches, which are all a primitive signaling /// mechanism. A latch starts as false. Eventually someone calls `set()` and @@ -84,13 +85,6 @@ impl CoreLatch { } } - /// Returns the address of this core latch as an integer. Used - /// for logging. - #[inline] - pub(super) fn addr(&self) -> usize { - self as *const CoreLatch as usize - } - /// Invoked by owning thread as it prepares to sleep. Returns true /// if the owning thread may proceed to fall asleep, false if the /// latch was set in the meantime. @@ -142,6 +136,13 @@ impl CoreLatch { } } +impl AsCoreLatch for CoreLatch { + #[inline] + fn as_core_latch(&self) -> &CoreLatch { + self + } +} + /// Spin latches are the simplest, most efficient kind, but they do /// not support a `wait()` operation. They just have a boolean flag /// that becomes true when `set()` is called. @@ -269,62 +270,32 @@ impl Latch for LockLatch { } } -/// Counting latches are used to implement scopes. They track a -/// counter. Unlike other latches, calling `set()` does not -/// necessarily make the latch be considered `set()`; instead, it just -/// decrements the counter. The latch is only "set" (in the sense that -/// `probe()` returns true) once the counter reaches zero. +/// Once latches are used to implement one-time blocking, primarily +/// for the termination flag of the threads in the pool. /// -/// Note: like a `SpinLatch`, count laches are always associated with +/// Note: like a `SpinLatch`, once-latches are always associated with /// some registry that is probing them, which must be tickled when /// they are set. *Unlike* a `SpinLatch`, they don't themselves hold a /// reference to that registry. This is because in some cases the -/// registry owns the count-latch, and that would create a cycle. So a -/// `CountLatch` must be given a reference to its owning registry when +/// registry owns the once-latch, and that would create a cycle. So a +/// `OnceLatch` must be given a reference to its owning registry when /// it is set. For this reason, it does not implement the `Latch` /// trait (but it doesn't have to, as it is not used in those generic /// contexts). #[derive(Debug)] -pub(super) struct CountLatch { +pub(super) struct OnceLatch { core_latch: CoreLatch, - counter: AtomicUsize, } -impl CountLatch { - #[inline] - pub(super) fn new() -> CountLatch { - Self::with_count(1) - } - +impl OnceLatch { #[inline] - pub(super) fn with_count(n: usize) -> CountLatch { - CountLatch { + pub(super) fn new() -> OnceLatch { + Self { core_latch: CoreLatch::new(), - counter: AtomicUsize::new(n), - } - } - - #[inline] - pub(super) fn increment(&self) { - debug_assert!(!self.core_latch.probe()); - self.counter.fetch_add(1, Ordering::Relaxed); - } - - /// Decrements the latch counter by one. If this is the final - /// count, then the latch is **set**, and calls to `probe()` will - /// return true. Returns whether the latch was set. - #[inline] - pub(super) unsafe fn set(this: *const Self) -> bool { - if (*this).counter.fetch_sub(1, Ordering::SeqCst) == 1 { - CoreLatch::set(&(*this).core_latch); - true - } else { - false } } - /// Decrements the latch counter by one and possibly set it. If - /// the latch is set, then the specific worker thread is tickled, + /// Set the latch, then tickle the specific worker thread, /// which should be the one that owns this latch. #[inline] pub(super) unsafe fn set_and_tickle_one( @@ -332,31 +303,81 @@ impl CountLatch { registry: &Registry, target_worker_index: usize, ) { - if Self::set(this) { + if CoreLatch::set(&(*this).core_latch) { registry.notify_worker_latch_is_set(target_worker_index); } } } -impl AsCoreLatch for CountLatch { +impl AsCoreLatch for OnceLatch { #[inline] fn as_core_latch(&self) -> &CoreLatch { &self.core_latch } } +/// Counting latches are used to implement scopes. They track a +/// counter. Unlike other latches, calling `set()` does not +/// necessarily make the latch be considered `set()`; instead, it just +/// decrements the counter. The latch is only "set" (in the sense that +/// `probe()` returns true) once the counter reaches zero. #[derive(Debug)] -pub(super) struct CountLockLatch { - lock_latch: LockLatch, +pub(super) struct CountLatch { counter: AtomicUsize, + kind: CountLatchKind, } -impl CountLockLatch { - #[inline] - pub(super) fn with_count(n: usize) -> CountLockLatch { - CountLockLatch { - lock_latch: LockLatch::new(), - counter: AtomicUsize::new(n), +enum CountLatchKind { + /// A latch for scopes created on a rayon thread which will participate in work- + /// stealing while it waits for completion. This thread is not necessarily part + /// of the same registry as the scope itself! + Stealing { + latch: CoreLatch, + /// If a worker thread in registry A calls `in_place_scope` on a ThreadPool + /// with registry B, when a job completes in a thread of registry B, we may + /// need to call `notify_worker_latch_is_set()` to wake the thread in registry A. + /// That means we need a reference to registry A (since at that point we will + /// only have a reference to registry B), so we stash it here. + registry: Arc, + /// The index of the worker to wake in `registry` + worker_index: usize, + }, + + /// A latch for scopes created on a non-rayon thread which will block to wait. + Blocking { latch: LockLatch }, +} + +impl std::fmt::Debug for CountLatchKind { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + CountLatchKind::Stealing { latch, .. } => { + f.debug_tuple("Stealing").field(latch).finish() + } + CountLatchKind::Blocking { latch, .. } => { + f.debug_tuple("Blocking").field(latch).finish() + } + } + } +} + +impl CountLatch { + pub(super) fn new(owner: Option<&WorkerThread>) -> Self { + Self::with_count(1, owner) + } + + pub(super) fn with_count(count: usize, owner: Option<&WorkerThread>) -> Self { + Self { + counter: AtomicUsize::new(count), + kind: match owner { + Some(owner) => CountLatchKind::Stealing { + latch: CoreLatch::new(), + registry: Arc::clone(owner.registry()), + worker_index: owner.index(), + }, + None => CountLatchKind::Blocking { + latch: LockLatch::new(), + }, + }, } } @@ -366,16 +387,42 @@ impl CountLockLatch { debug_assert!(old_counter != 0); } - pub(super) fn wait(&self) { - self.lock_latch.wait(); + pub(super) fn wait(&self, owner: Option<&WorkerThread>) { + match &self.kind { + CountLatchKind::Stealing { + latch, + registry, + worker_index, + } => unsafe { + let owner = owner.expect("owner thread"); + debug_assert_eq!(registry.id(), owner.registry().id()); + debug_assert_eq!(*worker_index, owner.index()); + owner.wait_until(latch); + }, + CountLatchKind::Blocking { latch } => latch.wait(), + } } } -impl Latch for CountLockLatch { +impl Latch for CountLatch { #[inline] unsafe fn set(this: *const Self) { if (*this).counter.fetch_sub(1, Ordering::SeqCst) == 1 { - LockLatch::set(&(*this).lock_latch); + // NOTE: Once we call `set` on the internal `latch`, + // the target may proceed and invalidate `this`! + match (*this).kind { + CountLatchKind::Stealing { + ref latch, + ref registry, + worker_index, + } => { + let registry = Arc::clone(registry); + if CoreLatch::set(latch) { + registry.notify_worker_latch_is_set(worker_index); + } + } + CountLatchKind::Blocking { ref latch } => LockLatch::set(latch), + } } } } diff --git a/src/lib.rs b/src/lib.rs index c9694ee..39df8a2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -73,9 +73,8 @@ use std::fmt; use std::io; use std::marker::PhantomData; use std::str::FromStr; +use std::thread; -#[macro_use] -mod log; #[macro_use] mod private; @@ -104,6 +103,12 @@ pub use self::thread_pool::current_thread_index; pub use self::thread_pool::ThreadPool; pub use self::thread_pool::{yield_local, yield_now, Yield}; +#[cfg(not(feature = "web_spin_lock"))] +use std::sync; + +#[cfg(feature = "web_spin_lock")] +use wasm_sync as sync; + use self::registry::{CustomSpawn, DefaultSpawn, ThreadSpawn}; /// Returns the maximum number of threads that Rayon supports in a single thread-pool. @@ -148,6 +153,7 @@ pub struct ThreadPoolBuildError { #[derive(Debug)] enum ErrorKind { GlobalPoolAlreadyInitialized, + CurrentThreadAlreadyInPool, IOError(io::Error), } @@ -175,6 +181,9 @@ pub struct ThreadPoolBuilder { /// If RAYON_NUM_THREADS is invalid or zero will use the default. num_threads: usize, + /// The thread we're building *from* will also be part of the pool. + use_current_thread: bool, + /// Custom closure, if any, to handle a panic that we cannot propagate /// anywhere else. panic_handler: Option>, @@ -228,6 +237,7 @@ impl Default for ThreadPoolBuilder { fn default() -> Self { ThreadPoolBuilder { num_threads: 0, + use_current_thread: false, panic_handler: None, get_thread_name: None, stack_size: None, @@ -284,12 +294,12 @@ where impl ThreadPoolBuilder { /// Creates a scoped `ThreadPool` initialized using this configuration. /// - /// This is a convenience function for building a pool using [`crossbeam::scope`] + /// This is a convenience function for building a pool using [`std::thread::scope`] /// to spawn threads in a [`spawn_handler`](#method.spawn_handler). /// The threads in this pool will start by calling `wrapper`, which should /// do initialization and continue by calling `ThreadBuilder::run()`. /// - /// [`crossbeam::scope`]: https://docs.rs/crossbeam/0.8/crossbeam/fn.scope.html + /// [`std::thread::scope`]: https://doc.rust-lang.org/std/thread/fn.scope.html /// /// # Examples /// @@ -324,28 +334,22 @@ impl ThreadPoolBuilder { W: Fn(ThreadBuilder) + Sync, // expected to call `run()` F: FnOnce(&ThreadPool) -> R, { - let result = crossbeam_utils::thread::scope(|scope| { - let wrapper = &wrapper; + std::thread::scope(|scope| { let pool = self .spawn_handler(|thread| { - let mut builder = scope.builder(); + let mut builder = std::thread::Builder::new(); if let Some(name) = thread.name() { builder = builder.name(name.to_string()); } if let Some(size) = thread.stack_size() { builder = builder.stack_size(size); } - builder.spawn(move |_| wrapper(thread))?; + builder.spawn_scoped(scope, || wrapper(thread))?; Ok(()) }) .build()?; Ok(with_pool(&pool)) - }); - - match result { - Ok(result) => result, - Err(err) => unwind::resume_unwinding(err), - } + }) } } @@ -354,13 +358,11 @@ impl ThreadPoolBuilder { /// /// Note that the threads will not exit until after the pool is dropped. It /// is up to the caller to wait for thread termination if that is important - /// for any invariants. For instance, threads created in [`crossbeam::scope`] + /// for any invariants. For instance, threads created in [`std::thread::scope`] /// will be joined before that scope returns, and this will block indefinitely /// if the pool is leaked. Furthermore, the global thread pool doesn't terminate /// until the entire process exits! /// - /// [`crossbeam::scope`]: https://docs.rs/crossbeam/0.8/crossbeam/fn.scope.html - /// /// # Examples /// /// A minimal spawn handler just needs to call `run()` from an independent thread. @@ -409,6 +411,7 @@ impl ThreadPoolBuilder { /// or [`std::thread::scope`] introduced in Rust 1.63, which is encapsulated in /// [`build_scoped`](#method.build_scoped). /// + /// [`crossbeam::scope`]: https://docs.rs/crossbeam/0.8/crossbeam/fn.scope.html /// [`std::thread::scope`]: https://doc.rust-lang.org/std/thread/fn.scope.html /// /// ``` @@ -445,6 +448,7 @@ impl ThreadPoolBuilder { spawn_handler: CustomSpawn::new(spawn), // ..self num_threads: self.num_threads, + use_current_thread: self.use_current_thread, panic_handler: self.panic_handler, get_thread_name: self.get_thread_name, stack_size: self.stack_size, @@ -465,12 +469,18 @@ impl ThreadPoolBuilder { if self.num_threads > 0 { self.num_threads } else { + let default = || { + thread::available_parallelism() + .map(|n| n.get()) + .unwrap_or(1) + }; + match env::var("RAYON_NUM_THREADS") .ok() .and_then(|s| usize::from_str(&s).ok()) { - Some(x) if x > 0 => return x, - Some(x) if x == 0 => return num_cpus::get(), + Some(x @ 1..) => return x, + Some(0) => return default(), _ => {} } @@ -479,8 +489,8 @@ impl ThreadPoolBuilder { .ok() .and_then(|s| usize::from_str(&s).ok()) { - Some(x) if x > 0 => x, - _ => num_cpus::get(), + Some(x @ 1..) => x, + _ => default(), } } } @@ -519,9 +529,8 @@ impl ThreadPoolBuilder { /// may change in the future, if you wish to rely on a fixed /// number of threads, you should use this function to specify /// that number. To reproduce the current default behavior, you - /// may wish to use the [`num_cpus` - /// crate](https://crates.io/crates/num_cpus) to query the number - /// of CPUs dynamically. + /// may wish to use [`std::thread::available_parallelism`] + /// to query the number of CPUs dynamically. /// /// **Old environment variable:** `RAYON_NUM_THREADS` is a one-to-one /// replacement of the now deprecated `RAYON_RS_NUM_CPUS` environment @@ -532,6 +541,24 @@ impl ThreadPoolBuilder { self } + /// Use the current thread as one of the threads in the pool. + /// + /// The current thread is guaranteed to be at index 0, and since the thread is not managed by + /// rayon, the spawn and exit handlers do not run for that thread. + /// + /// Note that the current thread won't run the main work-stealing loop, so jobs spawned into + /// the thread-pool will generally not be picked up automatically by this thread unless you + /// yield to rayon in some way, like via [`yield_now()`], [`yield_local()`], or [`scope()`]. + /// + /// # Local thread-pools + /// + /// Using this in a local thread-pool means the registry will be leaked. In future versions + /// there might be a way of cleaning up the current-thread state. + pub fn use_current_thread(mut self) -> Self { + self.use_current_thread = true; + self + } + /// Returns a copy of the current panic handler. fn take_panic_handler(&mut self) -> Option> { self.panic_handler.take() @@ -734,18 +761,22 @@ impl ThreadPoolBuildError { const GLOBAL_POOL_ALREADY_INITIALIZED: &str = "The global thread pool has already been initialized."; +const CURRENT_THREAD_ALREADY_IN_POOL: &str = + "The current thread is already part of another thread pool."; + impl Error for ThreadPoolBuildError { #[allow(deprecated)] fn description(&self) -> &str { match self.kind { ErrorKind::GlobalPoolAlreadyInitialized => GLOBAL_POOL_ALREADY_INITIALIZED, + ErrorKind::CurrentThreadAlreadyInPool => CURRENT_THREAD_ALREADY_IN_POOL, ErrorKind::IOError(ref e) => e.description(), } } fn source(&self) -> Option<&(dyn Error + 'static)> { match &self.kind { - ErrorKind::GlobalPoolAlreadyInitialized => None, + ErrorKind::GlobalPoolAlreadyInitialized | ErrorKind::CurrentThreadAlreadyInPool => None, ErrorKind::IOError(e) => Some(e), } } @@ -754,6 +785,7 @@ impl Error for ThreadPoolBuildError { impl fmt::Display for ThreadPoolBuildError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match &self.kind { + ErrorKind::CurrentThreadAlreadyInPool => CURRENT_THREAD_ALREADY_IN_POOL.fmt(f), ErrorKind::GlobalPoolAlreadyInitialized => GLOBAL_POOL_ALREADY_INITIALIZED.fmt(f), ErrorKind::IOError(e) => e.fmt(f), } @@ -771,6 +803,7 @@ impl fmt::Debug for ThreadPoolBuilder { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let ThreadPoolBuilder { ref num_threads, + ref use_current_thread, ref get_thread_name, ref panic_handler, ref stack_size, @@ -795,6 +828,7 @@ impl fmt::Debug for ThreadPoolBuilder { f.debug_struct("ThreadPoolBuilder") .field("num_threads", num_threads) + .field("use_current_thread", use_current_thread) .field("get_thread_name", &get_thread_name) .field("panic_handler", &panic_handler) .field("stack_size", &stack_size) diff --git a/src/log.rs b/src/log.rs deleted file mode 100644 index 7b6daf0..0000000 --- a/src/log.rs +++ /dev/null @@ -1,413 +0,0 @@ -//! Debug Logging -//! -//! To use in a debug build, set the env var `RAYON_LOG` as -//! described below. In a release build, logs are compiled out by -//! default unless Rayon is built with `--cfg rayon_rs_log` (try -//! `RUSTFLAGS="--cfg rayon_rs_log"`). -//! -//! Note that logs are an internally debugging tool and their format -//! is considered unstable, as are the details of how to enable them. -//! -//! # Valid values for RAYON_LOG -//! -//! The `RAYON_LOG` variable can take on the following values: -//! -//! * `tail:` -- dumps the last 10,000 events into the given file; -//! useful for tracking down deadlocks -//! * `profile:` -- dumps only those events needed to reconstruct how -//! many workers are active at a given time -//! * `all:` -- dumps every event to the file; useful for debugging - -use crossbeam_channel::{self, Receiver, Sender}; -use std::collections::VecDeque; -use std::env; -use std::fs::File; -use std::io::{self, BufWriter, Write}; - -/// True if logs are compiled in. -pub(super) const LOG_ENABLED: bool = cfg!(any(rayon_rs_log, debug_assertions)); - -#[derive(Copy, Clone, PartialOrd, Ord, PartialEq, Eq, Debug)] -pub(super) enum Event { - /// Flushes events to disk, used to terminate benchmarking. - Flush, - - /// Indicates that a worker thread started execution. - ThreadStart { - worker: usize, - terminate_addr: usize, - }, - - /// Indicates that a worker thread started execution. - ThreadTerminate { worker: usize }, - - /// Indicates that a worker thread became idle, blocked on `latch_addr`. - ThreadIdle { worker: usize, latch_addr: usize }, - - /// Indicates that an idle worker thread found work to do, after - /// yield rounds. It should no longer be considered idle. - ThreadFoundWork { worker: usize, yields: u32 }, - - /// Indicates that a worker blocked on a latch observed that it was set. - /// - /// Internal debugging event that does not affect the state - /// machine. - ThreadSawLatchSet { worker: usize, latch_addr: usize }, - - /// Indicates that an idle worker is getting sleepy. `sleepy_counter` is the internal - /// sleep state that we saw at the time. - ThreadSleepy { worker: usize, jobs_counter: usize }, - - /// Indicates that the thread's attempt to fall asleep was - /// interrupted because the latch was set. (This is not, in and of - /// itself, a change to the thread state.) - ThreadSleepInterruptedByLatch { worker: usize, latch_addr: usize }, - - /// Indicates that the thread's attempt to fall asleep was - /// interrupted because a job was posted. (This is not, in and of - /// itself, a change to the thread state.) - ThreadSleepInterruptedByJob { worker: usize }, - - /// Indicates that an idle worker has gone to sleep. - ThreadSleeping { worker: usize, latch_addr: usize }, - - /// Indicates that a sleeping worker has awoken. - ThreadAwoken { worker: usize, latch_addr: usize }, - - /// Indicates that the given worker thread was notified it should - /// awaken. - ThreadNotify { worker: usize }, - - /// The given worker has pushed a job to its local deque. - JobPushed { worker: usize }, - - /// The given worker has popped a job from its local deque. - JobPopped { worker: usize }, - - /// The given worker has stolen a job from the deque of another. - JobStolen { worker: usize, victim: usize }, - - /// N jobs were injected into the global queue. - JobsInjected { count: usize }, - - /// A job was removed from the global queue. - JobUninjected { worker: usize }, - - /// A job was broadcasted to N threads. - JobBroadcast { count: usize }, - - /// When announcing a job, this was the value of the counters we observed. - /// - /// No effect on thread state, just a debugging event. - JobThreadCounts { - worker: usize, - num_idle: u16, - num_sleepers: u16, - }, -} - -/// Handle to the logging thread, if any. You can use this to deliver -/// logs. You can also clone it freely. -#[derive(Clone)] -pub(super) struct Logger { - sender: Option>, -} - -impl Logger { - pub(super) fn new(num_workers: usize) -> Logger { - if !LOG_ENABLED { - return Self::disabled(); - } - - // see the doc comment for the format - let env_log = match env::var("RAYON_LOG") { - Ok(s) => s, - Err(_) => return Self::disabled(), - }; - - let (sender, receiver) = crossbeam_channel::unbounded(); - - if let Some(filename) = env_log.strip_prefix("tail:") { - let filename = filename.to_string(); - ::std::thread::spawn(move || { - Self::tail_logger_thread(num_workers, filename, 10_000, receiver) - }); - } else if env_log == "all" { - ::std::thread::spawn(move || Self::all_logger_thread(num_workers, receiver)); - } else if let Some(filename) = env_log.strip_prefix("profile:") { - let filename = filename.to_string(); - ::std::thread::spawn(move || { - Self::profile_logger_thread(num_workers, filename, 10_000, receiver) - }); - } else { - panic!("RAYON_LOG should be 'tail:' or 'profile:'"); - } - - Logger { - sender: Some(sender), - } - } - - fn disabled() -> Logger { - Logger { sender: None } - } - - #[inline] - pub(super) fn log(&self, event: impl FnOnce() -> Event) { - if !LOG_ENABLED { - return; - } - - if let Some(sender) = &self.sender { - sender.send(event()).unwrap(); - } - } - - fn profile_logger_thread( - num_workers: usize, - log_filename: String, - capacity: usize, - receiver: Receiver, - ) { - let file = File::create(&log_filename) - .unwrap_or_else(|err| panic!("failed to open `{}`: {}", log_filename, err)); - - let mut writer = BufWriter::new(file); - let mut events = Vec::with_capacity(capacity); - let mut state = SimulatorState::new(num_workers); - let timeout = std::time::Duration::from_secs(30); - - loop { - while let Ok(event) = receiver.recv_timeout(timeout) { - if let Event::Flush = event { - break; - } - - events.push(event); - if events.len() == capacity { - break; - } - } - - for event in events.drain(..) { - if state.simulate(&event) { - state.dump(&mut writer, &event).unwrap(); - } - } - - writer.flush().unwrap(); - } - } - - fn tail_logger_thread( - num_workers: usize, - log_filename: String, - capacity: usize, - receiver: Receiver, - ) { - let file = File::create(&log_filename) - .unwrap_or_else(|err| panic!("failed to open `{}`: {}", log_filename, err)); - - let mut writer = BufWriter::new(file); - let mut events: VecDeque = VecDeque::with_capacity(capacity); - let mut state = SimulatorState::new(num_workers); - let timeout = std::time::Duration::from_secs(30); - let mut skipped = false; - - loop { - while let Ok(event) = receiver.recv_timeout(timeout) { - if let Event::Flush = event { - // We ignore Flush events in tail mode -- - // we're really just looking for - // deadlocks. - continue; - } else { - if events.len() == capacity { - let event = events.pop_front().unwrap(); - state.simulate(&event); - skipped = true; - } - - events.push_back(event); - } - } - - if skipped { - writeln!(writer, "...").unwrap(); - skipped = false; - } - - for event in events.drain(..) { - // In tail mode, we dump *all* events out, whether or - // not they were 'interesting' to the state machine. - state.simulate(&event); - state.dump(&mut writer, &event).unwrap(); - } - - writer.flush().unwrap(); - } - } - - fn all_logger_thread(num_workers: usize, receiver: Receiver) { - let stderr = std::io::stderr(); - let mut state = SimulatorState::new(num_workers); - - for event in receiver { - let mut writer = BufWriter::new(stderr.lock()); - state.simulate(&event); - state.dump(&mut writer, &event).unwrap(); - writer.flush().unwrap(); - } - } -} - -#[derive(Copy, Clone, PartialOrd, Ord, PartialEq, Eq, Debug)] -enum State { - Working, - Idle, - Notified, - Sleeping, - Terminated, -} - -impl State { - fn letter(&self) -> char { - match self { - State::Working => 'W', - State::Idle => 'I', - State::Notified => 'N', - State::Sleeping => 'S', - State::Terminated => 'T', - } - } -} - -struct SimulatorState { - local_queue_size: Vec, - thread_states: Vec, - injector_size: usize, -} - -impl SimulatorState { - fn new(num_workers: usize) -> Self { - Self { - local_queue_size: (0..num_workers).map(|_| 0).collect(), - thread_states: (0..num_workers).map(|_| State::Working).collect(), - injector_size: 0, - } - } - - fn simulate(&mut self, event: &Event) -> bool { - match *event { - Event::ThreadIdle { worker, .. } => { - assert_eq!(self.thread_states[worker], State::Working); - self.thread_states[worker] = State::Idle; - true - } - - Event::ThreadStart { worker, .. } | Event::ThreadFoundWork { worker, .. } => { - self.thread_states[worker] = State::Working; - true - } - - Event::ThreadTerminate { worker, .. } => { - self.thread_states[worker] = State::Terminated; - true - } - - Event::ThreadSleeping { worker, .. } => { - assert_eq!(self.thread_states[worker], State::Idle); - self.thread_states[worker] = State::Sleeping; - true - } - - Event::ThreadAwoken { worker, .. } => { - assert_eq!(self.thread_states[worker], State::Notified); - self.thread_states[worker] = State::Idle; - true - } - - Event::JobPushed { worker } => { - self.local_queue_size[worker] += 1; - true - } - - Event::JobPopped { worker } => { - self.local_queue_size[worker] -= 1; - true - } - - Event::JobStolen { victim, .. } => { - self.local_queue_size[victim] -= 1; - true - } - - Event::JobsInjected { count } => { - self.injector_size += count; - true - } - - Event::JobUninjected { .. } => { - self.injector_size -= 1; - true - } - - Event::ThreadNotify { worker } => { - // Currently, this log event occurs while holding the - // thread lock, so we should *always* see it before - // the worker awakens. - assert_eq!(self.thread_states[worker], State::Sleeping); - self.thread_states[worker] = State::Notified; - true - } - - // remaining events are no-ops from pov of simulating the - // thread state - _ => false, - } - } - - fn dump(&mut self, w: &mut impl Write, event: &Event) -> io::Result<()> { - let num_idle_threads = self - .thread_states - .iter() - .filter(|s| **s == State::Idle) - .count(); - - let num_sleeping_threads = self - .thread_states - .iter() - .filter(|s| **s == State::Sleeping) - .count(); - - let num_notified_threads = self - .thread_states - .iter() - .filter(|s| **s == State::Notified) - .count(); - - let num_pending_jobs: usize = self.local_queue_size.iter().sum(); - - write!(w, "{:2},", num_idle_threads)?; - write!(w, "{:2},", num_sleeping_threads)?; - write!(w, "{:2},", num_notified_threads)?; - write!(w, "{:4},", num_pending_jobs)?; - write!(w, "{:4},", self.injector_size)?; - - let event_str = format!("{:?}", event); - write!(w, r#""{:60}","#, event_str)?; - - for ((i, state), queue_size) in (0..).zip(&self.thread_states).zip(&self.local_queue_size) { - write!(w, " T{:02},{}", i, state.letter(),)?; - - if *queue_size > 0 { - write!(w, ",{:03},", queue_size)?; - } else { - write!(w, ", ,")?; - } - } - - writeln!(w)?; - Ok(()) - } -} diff --git a/src/registry.rs b/src/registry.rs index 5d56ac9..46cd22b 100644 --- a/src/registry.rs +++ b/src/registry.rs @@ -1,8 +1,7 @@ use crate::job::{JobFifo, JobRef, StackJob}; -use crate::latch::{AsCoreLatch, CoreLatch, CountLatch, Latch, LatchRef, LockLatch, SpinLatch}; -use crate::log::Event::*; -use crate::log::Logger; +use crate::latch::{AsCoreLatch, CoreLatch, Latch, LatchRef, LockLatch, OnceLatch, SpinLatch}; use crate::sleep::Sleep; +use crate::sync::Mutex; use crate::unwind; use crate::{ ErrorKind, ExitHandler, PanicHandler, StartHandler, ThreadPoolBuildError, ThreadPoolBuilder, @@ -17,7 +16,7 @@ use std::io; use std::mem; use std::ptr; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::{Arc, Mutex, Once}; +use std::sync::{Arc, Once}; use std::thread; use std::usize; @@ -130,7 +129,6 @@ where } pub(super) struct Registry { - logger: Logger, thread_infos: Vec, sleep: Sleep, injected_jobs: Injector, @@ -210,26 +208,7 @@ fn default_global_registry() -> Result, ThreadPoolBuildError> { // is stubbed out, and we won't have to change anything if they do add real threading. let unsupported = matches!(&result, Err(e) if e.is_unsupported()); if unsupported && WorkerThread::current().is_null() { - let builder = ThreadPoolBuilder::new() - .num_threads(1) - .spawn_handler(|thread| { - // Rather than starting a new thread, we're just taking over the current thread - // *without* running the main loop, so we can still return from here. - // The WorkerThread is leaked, but we never shutdown the global pool anyway. - let worker_thread = Box::leak(Box::new(WorkerThread::from(thread))); - let registry = &*worker_thread.registry; - let index = worker_thread.index; - - unsafe { - WorkerThread::set_current(worker_thread); - - // let registry know we are ready to do work - Latch::set(®istry.thread_infos[index].primed); - } - - Ok(()) - }); - + let builder = ThreadPoolBuilder::new().num_threads(1).use_current_thread(); let fallback_result = Registry::new(builder); if fallback_result.is_ok() { return fallback_result; @@ -280,11 +259,9 @@ impl Registry { }) .unzip(); - let logger = Logger::new(n_threads); let registry = Arc::new(Registry { - logger: logger.clone(), thread_infos: stealers.into_iter().map(ThreadInfo::new).collect(), - sleep: Sleep::new(logger, n_threads), + sleep: Sleep::new(n_threads), injected_jobs: Injector::new(), broadcasts: Mutex::new(broadcasts), terminate_count: AtomicUsize::new(1), @@ -305,6 +282,25 @@ impl Registry { stealer, index, }; + + if index == 0 && builder.use_current_thread { + if !WorkerThread::current().is_null() { + return Err(ThreadPoolBuildError::new( + ErrorKind::CurrentThreadAlreadyInPool, + )); + } + // Rather than starting a new thread, we're just taking over the current thread + // *without* running the main loop, so we can still return from here. + // The WorkerThread is leaked, but we never shutdown the global pool anyway. + let worker_thread = Box::into_raw(Box::new(WorkerThread::from(thread))); + + unsafe { + WorkerThread::set_current(worker_thread); + Latch::set(®istry.thread_infos[index].primed); + } + continue; + } + if let Err(e) = builder.get_spawn_handler().spawn(thread) { return Err(ThreadPoolBuildError::new(ErrorKind::IOError(e))); } @@ -363,11 +359,6 @@ impl Registry { } } - #[inline] - pub(super) fn log(&self, event: impl FnOnce() -> crate::log::Event) { - self.logger.log(event) - } - pub(super) fn num_threads(&self) -> usize { self.thread_infos.len() } @@ -426,8 +417,6 @@ impl Registry { /// whatever worker has nothing to do. Use this if you know that /// you are not on a worker of this registry. pub(super) fn inject(&self, injected_job: JobRef) { - self.log(|| JobsInjected { count: 1 }); - // It should not be possible for `state.terminate` to be true // here. It is only set to true when the user creates (and // drops) a `ThreadPool`; and, in that case, they cannot be @@ -442,22 +431,17 @@ impl Registry { let queue_was_empty = self.injected_jobs.is_empty(); self.injected_jobs.push(injected_job); - self.sleep.new_injected_jobs(usize::MAX, 1, queue_was_empty); + self.sleep.new_injected_jobs(1, queue_was_empty); } fn has_injected_job(&self) -> bool { !self.injected_jobs.is_empty() } - fn pop_injected_job(&self, worker_index: usize) -> Option { + fn pop_injected_job(&self) -> Option { loop { match self.injected_jobs.steal() { - Steal::Success(job) => { - self.log(|| JobUninjected { - worker: worker_index, - }); - return Some(job); - } + Steal::Success(job) => return Some(job), Steal::Empty => return None, Steal::Retry => {} } @@ -471,9 +455,6 @@ impl Registry { /// **Panics** if not given exactly as many jobs as there are threads. pub(super) fn inject_broadcast(&self, injected_jobs: impl ExactSizeIterator) { assert_eq!(self.num_threads(), injected_jobs.len()); - self.log(|| JobBroadcast { - count: self.num_threads(), - }); { let broadcasts = self.broadcasts.lock().unwrap(); @@ -545,9 +526,6 @@ impl Registry { self.inject(job.as_job_ref()); job.latch.wait_and_reset(); // Make sure we can use the same latch again next time. - // flush accumulated logs as we exit the thread - self.logger.log(|| Flush); - job.into_result() }) } @@ -610,7 +588,7 @@ impl Registry { pub(super) fn terminate(&self) { if self.terminate_count.fetch_sub(1, Ordering::AcqRel) == 1 { for (i, thread_info) in self.thread_infos.iter().enumerate() { - unsafe { CountLatch::set_and_tickle_one(&thread_info.terminate, self, i) }; + unsafe { OnceLatch::set_and_tickle_one(&thread_info.terminate, self, i) }; } } } @@ -640,10 +618,7 @@ struct ThreadInfo { /// This latch is *set* by the `terminate` method on the /// `Registry`, once the registry's main "terminate" counter /// reaches zero. - /// - /// NB. We use a `CountLatch` here because it has no lifetimes and is - /// meant for async use, but the count never gets higher than one. - terminate: CountLatch, + terminate: OnceLatch, /// the "stealer" half of the worker's deque stealer: Stealer, @@ -654,7 +629,7 @@ impl ThreadInfo { ThreadInfo { primed: LockLatch::new(), stopped: LockLatch::new(), - terminate: CountLatch::new(), + terminate: OnceLatch::new(), stealer, } } @@ -737,11 +712,6 @@ impl WorkerThread { &self.registry } - #[inline] - pub(super) fn log(&self, event: impl FnOnce() -> crate::log::Event) { - self.registry.logger.log(event) - } - /// Our index amongst the worker threads (ranges from `0..self.num_threads()`). #[inline] pub(super) fn index(&self) -> usize { @@ -750,12 +720,9 @@ impl WorkerThread { #[inline] pub(super) unsafe fn push(&self, job: JobRef) { - self.log(|| JobPushed { worker: self.index }); let queue_was_empty = self.worker.is_empty(); self.worker.push(job); - self.registry - .sleep - .new_internal_jobs(self.index, 1, queue_was_empty); + self.registry.sleep.new_internal_jobs(1, queue_was_empty); } #[inline] @@ -777,7 +744,6 @@ impl WorkerThread { let popped_job = self.worker.pop(); if popped_job.is_some() { - self.log(|| JobPopped { worker: self.index }); return popped_job; } @@ -813,31 +779,51 @@ impl WorkerThread { // accesses, which would be *very bad* let abort_guard = unwind::AbortIfPanic; - let mut idle_state = self.registry.sleep.start_looking(self.index, latch); - while !latch.probe() { - if let Some(job) = self.find_work() { - self.registry.sleep.work_found(idle_state); + 'outer: while !latch.probe() { + // Check for local work *before* we start marking ourself idle, + // especially to avoid modifying shared sleep state. + if let Some(job) = self.take_local_job() { self.execute(job); - idle_state = self.registry.sleep.start_looking(self.index, latch); - } else { - self.registry - .sleep - .no_work_found(&mut idle_state, latch, || self.has_injected_job()) + continue; + } + + let mut idle_state = self.registry.sleep.start_looking(self.index); + while !latch.probe() { + if let Some(job) = self.find_work() { + self.registry.sleep.work_found(); + self.execute(job); + // The job might have injected local work, so go back to the outer loop. + continue 'outer; + } else { + self.registry + .sleep + .no_work_found(&mut idle_state, latch, || self.has_injected_job()) + } } - } - // If we were sleepy, we are not anymore. We "found work" -- - // whatever the surrounding thread was doing before it had to - // wait. - self.registry.sleep.work_found(idle_state); + // If we were sleepy, we are not anymore. We "found work" -- + // whatever the surrounding thread was doing before it had to wait. + self.registry.sleep.work_found(); + break; + } - self.log(|| ThreadSawLatchSet { - worker: self.index, - latch_addr: latch.addr(), - }); mem::forget(abort_guard); // successful execution, do not abort } + unsafe fn wait_until_out_of_work(&self) { + debug_assert_eq!(self as *const _, WorkerThread::current()); + let registry = &*self.registry; + let index = self.index; + + self.wait_until(®istry.thread_infos[index].terminate); + + // Should not be any work left in our queue. + debug_assert!(self.take_local_job().is_none()); + + // Let registry know we are done + Latch::set(®istry.thread_infos[index].stopped); + } + fn find_work(&self) -> Option { // Try to find some work to do. We give preference first // to things in our local deque, then in other workers @@ -846,7 +832,7 @@ impl WorkerThread { // we take on something new. self.take_local_job() .or_else(|| self.steal()) - .or_else(|| self.registry.pop_injected_job(self.index)) + .or_else(|| self.registry.pop_injected_job()) } pub(super) fn yield_now(&self) -> Yield { @@ -898,13 +884,7 @@ impl WorkerThread { .find_map(|victim_index| { let victim = &thread_infos[victim_index]; match victim.stealer.steal() { - Steal::Success(job) => { - self.log(|| JobStolen { - worker: self.index, - victim: victim_index, - }); - Some(job) - } + Steal::Success(job) => Some(job), Steal::Empty => None, Steal::Retry => { retry = true; @@ -940,24 +920,11 @@ unsafe fn main_loop(thread: ThreadBuilder) { registry.catch_unwind(|| handler(index)); } - let my_terminate_latch = ®istry.thread_infos[index].terminate; - worker_thread.log(|| ThreadStart { - worker: index, - terminate_addr: my_terminate_latch.as_core_latch().addr(), - }); - worker_thread.wait_until(my_terminate_latch); - - // Should not be any work left in our queue. - debug_assert!(worker_thread.take_local_job().is_none()); - - // let registry know we are done - Latch::set(®istry.thread_infos[index].stopped); + worker_thread.wait_until_out_of_work(); // Normal termination, do not abort. mem::forget(abort_guard); - worker_thread.log(|| ThreadTerminate { worker: index }); - // Inform a user callback that we exited a thread. if let Some(ref handler) = registry.exit_handler { registry.catch_unwind(|| handler(index)); diff --git a/src/scope/mod.rs b/src/scope/mod.rs index f460dd7..b7163d1 100644 --- a/src/scope/mod.rs +++ b/src/scope/mod.rs @@ -7,7 +7,7 @@ use crate::broadcast::BroadcastContext; use crate::job::{ArcJob, HeapJob, JobFifo, JobRef}; -use crate::latch::{CountLatch, CountLockLatch, Latch}; +use crate::latch::{CountLatch, Latch}; use crate::registry::{global_registry, in_worker, Registry, WorkerThread}; use crate::unwind; use std::any::Any; @@ -39,26 +39,6 @@ pub struct ScopeFifo<'scope> { fifos: Vec, } -pub(super) enum ScopeLatch { - /// A latch for scopes created on a rayon thread which will participate in work- - /// stealing while it waits for completion. This thread is not necessarily part - /// of the same registry as the scope itself! - Stealing { - latch: CountLatch, - /// If a worker thread in registry A calls `in_place_scope` on a ThreadPool - /// with registry B, when a job completes in a thread of registry B, we may - /// need to call `latch.set_and_tickle_one()` to wake the thread in registry A. - /// That means we need a reference to registry A (since at that point we will - /// only have a reference to registry B), so we stash it here. - registry: Arc, - /// The index of the worker to wake in `registry` - worker_index: usize, - }, - - /// A latch for scopes created on a non-rayon thread which will block to wait. - Blocking { latch: CountLockLatch }, -} - struct ScopeBase<'scope> { /// thread registry where `scope()` was executed or where `in_place_scope()` /// should spawn jobs. @@ -69,7 +49,7 @@ struct ScopeBase<'scope> { panic: AtomicPtr>, /// latch to track job counts - job_completed_latch: ScopeLatch, + job_completed_latch: CountLatch, /// You can think of a scope as containing a list of closures to execute, /// all of which outlive `'scope`. They're not actually required to be @@ -650,21 +630,17 @@ impl<'scope> ScopeBase<'scope> { ScopeBase { registry: Arc::clone(registry), panic: AtomicPtr::new(ptr::null_mut()), - job_completed_latch: ScopeLatch::new(owner), + job_completed_latch: CountLatch::new(owner), marker: PhantomData, } } - fn increment(&self) { - self.job_completed_latch.increment(); - } - fn heap_job_ref(&self, job: Box>) -> JobRef where FUNC: FnOnce() + Send + 'scope, { unsafe { - self.increment(); + self.job_completed_latch.increment(); job.into_job_ref() } } @@ -675,7 +651,7 @@ impl<'scope> ScopeBase<'scope> { { let n_threads = self.registry.num_threads(); let job_refs = (0..n_threads).map(|_| unsafe { - self.increment(); + self.job_completed_latch.increment(); ArcJob::as_job_ref(&job) }); @@ -710,17 +686,15 @@ impl<'scope> ScopeBase<'scope> { where FUNC: FnOnce() -> R, { - match unwind::halt_unwinding(func) { - Ok(r) => { - Latch::set(&(*this).job_completed_latch); - Some(r) - } + let result = match unwind::halt_unwinding(func) { + Ok(r) => Some(r), Err(err) => { (*this).job_panicked(err); - Latch::set(&(*this).job_completed_latch); None } - } + }; + Latch::set(&(*this).job_completed_latch); + result } fn job_panicked(&self, err: Box) { @@ -754,61 +728,6 @@ impl<'scope> ScopeBase<'scope> { } } -impl ScopeLatch { - fn new(owner: Option<&WorkerThread>) -> Self { - Self::with_count(1, owner) - } - - pub(super) fn with_count(count: usize, owner: Option<&WorkerThread>) -> Self { - match owner { - Some(owner) => ScopeLatch::Stealing { - latch: CountLatch::with_count(count), - registry: Arc::clone(owner.registry()), - worker_index: owner.index(), - }, - None => ScopeLatch::Blocking { - latch: CountLockLatch::with_count(count), - }, - } - } - - fn increment(&self) { - match self { - ScopeLatch::Stealing { latch, .. } => latch.increment(), - ScopeLatch::Blocking { latch } => latch.increment(), - } - } - - pub(super) fn wait(&self, owner: Option<&WorkerThread>) { - match self { - ScopeLatch::Stealing { - latch, - registry, - worker_index, - } => unsafe { - let owner = owner.expect("owner thread"); - debug_assert_eq!(registry.id(), owner.registry().id()); - debug_assert_eq!(*worker_index, owner.index()); - owner.wait_until(latch); - }, - ScopeLatch::Blocking { latch } => latch.wait(), - } - } -} - -impl Latch for ScopeLatch { - unsafe fn set(this: *const Self) { - match &*this { - ScopeLatch::Stealing { - latch, - registry, - worker_index, - } => CountLatch::set_and_tickle_one(latch, registry, *worker_index), - ScopeLatch::Blocking { latch } => Latch::set(latch), - } - } -} - impl<'scope> fmt::Debug for Scope<'scope> { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fmt.debug_struct("Scope") @@ -830,21 +749,6 @@ impl<'scope> fmt::Debug for ScopeFifo<'scope> { } } -impl fmt::Debug for ScopeLatch { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - ScopeLatch::Stealing { latch, .. } => fmt - .debug_tuple("ScopeLatch::Stealing") - .field(latch) - .finish(), - ScopeLatch::Blocking { latch } => fmt - .debug_tuple("ScopeLatch::Blocking") - .field(latch) - .finish(), - } - } -} - /// Used to capture a scope `&Self` pointer in jobs, without faking a lifetime. /// /// Unsafe code is still required to dereference the pointer, but that's fine in diff --git a/src/sleep/counters.rs b/src/sleep/counters.rs index f2a3de3..53d2c55 100644 --- a/src/sleep/counters.rs +++ b/src/sleep/counters.rs @@ -212,12 +212,12 @@ impl AtomicCounters { #[inline] fn select_thread(word: usize, shift: usize) -> usize { - ((word >> shift) as usize) & THREADS_MAX + (word >> shift) & THREADS_MAX } #[inline] fn select_jec(word: usize) -> usize { - (word >> JEC_SHIFT) as usize + word >> JEC_SHIFT } impl Counters { diff --git a/src/sleep/mod.rs b/src/sleep/mod.rs index af7225a..fa1f7be 100644 --- a/src/sleep/mod.rs +++ b/src/sleep/mod.rs @@ -2,11 +2,9 @@ //! for an overview. use crate::latch::CoreLatch; -use crate::log::Event::*; -use crate::log::Logger; +use crate::sync::{Condvar, Mutex}; use crossbeam_utils::CachePadded; use std::sync::atomic::Ordering; -use std::sync::{Condvar, Mutex}; use std::thread; use std::usize; @@ -22,8 +20,6 @@ use self::counters::{AtomicCounters, JobsEventCounter}; /// /// [`README.md`] README.md pub(super) struct Sleep { - logger: Logger, - /// One "sleep state" per worker. Used to track if a worker is sleeping and to have /// them block. worker_sleep_states: Vec>, @@ -62,22 +58,16 @@ const ROUNDS_UNTIL_SLEEPY: u32 = 32; const ROUNDS_UNTIL_SLEEPING: u32 = ROUNDS_UNTIL_SLEEPY + 1; impl Sleep { - pub(super) fn new(logger: Logger, n_threads: usize) -> Sleep { + pub(super) fn new(n_threads: usize) -> Sleep { assert!(n_threads <= THREADS_MAX); Sleep { - logger, worker_sleep_states: (0..n_threads).map(|_| Default::default()).collect(), counters: AtomicCounters::new(), } } #[inline] - pub(super) fn start_looking(&self, worker_index: usize, latch: &CoreLatch) -> IdleState { - self.logger.log(|| ThreadIdle { - worker: worker_index, - latch_addr: latch.addr(), - }); - + pub(super) fn start_looking(&self, worker_index: usize) -> IdleState { self.counters.add_inactive_thread(); IdleState { @@ -88,12 +78,7 @@ impl Sleep { } #[inline] - pub(super) fn work_found(&self, idle_state: IdleState) { - self.logger.log(|| ThreadFoundWork { - worker: idle_state.worker_index, - yields: idle_state.rounds, - }); - + pub(super) fn work_found(&self) { // If we were the last idle thread and other threads are still sleeping, // then we should wake up another thread. let threads_to_wake = self.counters.sub_inactive_thread(); @@ -111,7 +96,7 @@ impl Sleep { thread::yield_now(); idle_state.rounds += 1; } else if idle_state.rounds == ROUNDS_UNTIL_SLEEPY { - idle_state.jobs_counter = self.announce_sleepy(idle_state.worker_index); + idle_state.jobs_counter = self.announce_sleepy(); idle_state.rounds += 1; thread::yield_now(); } else if idle_state.rounds < ROUNDS_UNTIL_SLEEPING { @@ -124,16 +109,10 @@ impl Sleep { } #[cold] - fn announce_sleepy(&self, worker_index: usize) -> JobsEventCounter { - let counters = self - .counters - .increment_jobs_event_counter_if(JobsEventCounter::is_active); - let jobs_counter = counters.jobs_counter(); - self.logger.log(|| ThreadSleepy { - worker: worker_index, - jobs_counter: jobs_counter.as_usize(), - }); - jobs_counter + fn announce_sleepy(&self) -> JobsEventCounter { + self.counters + .increment_jobs_event_counter_if(JobsEventCounter::is_active) + .jobs_counter() } #[cold] @@ -146,11 +125,6 @@ impl Sleep { let worker_index = idle_state.worker_index; if !latch.get_sleepy() { - self.logger.log(|| ThreadSleepInterruptedByLatch { - worker: worker_index, - latch_addr: latch.addr(), - }); - return; } @@ -161,11 +135,6 @@ impl Sleep { // Our latch was signalled. We should wake back up fully as we // will have some stuff to do. if !latch.fall_asleep() { - self.logger.log(|| ThreadSleepInterruptedByLatch { - worker: worker_index, - latch_addr: latch.addr(), - }); - idle_state.wake_fully(); return; } @@ -180,10 +149,6 @@ impl Sleep { // we didn't see it. We should return to just before the SLEEPY // state so we can do another search and (if we fail to find // work) go back to sleep. - self.logger.log(|| ThreadSleepInterruptedByJob { - worker: worker_index, - }); - idle_state.wake_partly(); latch.wake_up(); return; @@ -197,11 +162,6 @@ impl Sleep { // Successfully registered as asleep. - self.logger.log(|| ThreadSleeping { - worker: worker_index, - latch_addr: latch.addr(), - }); - // We have one last check for injected jobs to do. This protects against // deadlock in the very unlikely event that // @@ -232,11 +192,6 @@ impl Sleep { // Update other state: idle_state.wake_fully(); latch.wake_up(); - - self.logger.log(|| ThreadAwoken { - worker: worker_index, - latch_addr: latch.addr(), - }); } /// Notify the given thread that it should wake up (if it is @@ -254,24 +209,16 @@ impl Sleep { /// /// # Parameters /// - /// - `source_worker_index` -- index of the thread that did the - /// push, or `usize::MAX` if this came from outside the thread - /// pool -- it is used only for logging. /// - `num_jobs` -- lower bound on number of jobs available for stealing. /// We'll try to get at least one thread per job. #[inline] - pub(super) fn new_injected_jobs( - &self, - source_worker_index: usize, - num_jobs: u32, - queue_was_empty: bool, - ) { + pub(super) fn new_injected_jobs(&self, num_jobs: u32, queue_was_empty: bool) { // This fence is needed to guarantee that threads // as they are about to fall asleep, observe any // new jobs that may have been injected. std::sync::atomic::fence(Ordering::SeqCst); - self.new_jobs(source_worker_index, num_jobs, queue_was_empty) + self.new_jobs(num_jobs, queue_was_empty) } /// Signals that `num_jobs` new jobs were pushed onto a thread's @@ -284,24 +231,16 @@ impl Sleep { /// /// # Parameters /// - /// - `source_worker_index` -- index of the thread that did the - /// push, or `usize::MAX` if this came from outside the thread - /// pool -- it is used only for logging. /// - `num_jobs` -- lower bound on number of jobs available for stealing. /// We'll try to get at least one thread per job. #[inline] - pub(super) fn new_internal_jobs( - &self, - source_worker_index: usize, - num_jobs: u32, - queue_was_empty: bool, - ) { - self.new_jobs(source_worker_index, num_jobs, queue_was_empty) + pub(super) fn new_internal_jobs(&self, num_jobs: u32, queue_was_empty: bool) { + self.new_jobs(num_jobs, queue_was_empty) } /// Common helper for `new_injected_jobs` and `new_internal_jobs`. #[inline] - fn new_jobs(&self, source_worker_index: usize, num_jobs: u32, queue_was_empty: bool) { + fn new_jobs(&self, num_jobs: u32, queue_was_empty: bool) { // Read the counters and -- if sleepy workers have announced themselves // -- announce that there is now work available. The final value of `counters` // with which we exit the loop thus corresponds to a state when @@ -311,12 +250,6 @@ impl Sleep { let num_awake_but_idle = counters.awake_but_idle_threads(); let num_sleepers = counters.sleeping_threads(); - self.logger.log(|| JobThreadCounts { - worker: source_worker_index, - num_idle: num_awake_but_idle as u16, - num_sleepers: num_sleepers as u16, - }); - if num_sleepers == 0 { // nobody to wake return; @@ -372,8 +305,6 @@ impl Sleep { // do. self.counters.sub_sleeping_thread(); - self.logger.log(|| ThreadNotify { worker: index }); - true } else { false diff --git a/src/spawn/mod.rs b/src/spawn/mod.rs index 1aa9edb..22c5898 100644 --- a/src/spawn/mod.rs +++ b/src/spawn/mod.rs @@ -4,8 +4,8 @@ use crate::unwind; use std::mem; use std::sync::Arc; -/// Fires off a task into the Rayon threadpool in the "static" or -/// "global" scope. Just like a standard thread, this task is not +/// Puts the task into the Rayon threadpool's job queue in the "static" +/// or "global" scope. Just like a standard thread, this task is not /// tied to the current stack frame, and hence it cannot hold any /// references other than those with `'static` lifetime. If you want /// to spawn a task that references stack data, use [the `scope()` diff --git a/src/thread_pool/mod.rs b/src/thread_pool/mod.rs index c37826e..5ae6e0f 100644 --- a/src/thread_pool/mod.rs +++ b/src/thread_pool/mod.rs @@ -80,6 +80,43 @@ impl ThreadPool { /// thread-local data from the current thread will not be /// accessible. /// + /// # Warning: execution order + /// + /// If the current thread is part of a different thread pool, it will try to + /// keep busy while the `op` completes in its target pool, similar to + /// calling [`ThreadPool::yield_now()`] in a loop. Therefore, it may + /// potentially schedule other tasks to run on the current thread in the + /// meantime. For example + /// + /// ```rust + /// # use rayon_core as rayon; + /// fn main() { + /// rayon::ThreadPoolBuilder::new().num_threads(1).build_global().unwrap(); + /// let pool = rayon_core::ThreadPoolBuilder::default().build().unwrap(); + /// let do_it = || { + /// print!("one "); + /// pool.install(||{}); + /// print!("two "); + /// }; + /// rayon::join(|| do_it(), || do_it()); + /// } + /// ``` + /// + /// Since we configured just one thread in the global pool, one might + /// expect `do_it()` to run sequentially, producing: + /// + /// ```ascii + /// one two one two + /// ``` + /// + /// However each call to `install()` yields implicitly, allowing rayon to + /// run multiple instances of `do_it()` concurrently on the single, global + /// thread. The following output would be equally valid: + /// + /// ```ascii + /// one one two two + /// ``` + /// /// # Panics /// /// If `op` should panic, that panic will be propagated. diff --git a/src/thread_pool/test.rs b/src/thread_pool/test.rs index 6143e57..88b3628 100644 --- a/src/thread_pool/test.rs +++ b/src/thread_pool/test.rs @@ -383,7 +383,7 @@ fn in_place_scope_fifo_no_deadlock() { #[test] fn yield_now_to_spawn() { - let (tx, rx) = crossbeam_channel::bounded(1); + let (tx, rx) = channel(); // Queue a regular spawn. crate::spawn(move || tx.send(22).unwrap()); @@ -401,7 +401,7 @@ fn yield_now_to_spawn() { #[test] fn yield_local_to_spawn() { - let (tx, rx) = crossbeam_channel::bounded(1); + let (tx, rx) = channel(); // Queue a regular spawn. crate::spawn(move || tx.send(22).unwrap()); diff --git a/tests/scoped_threadpool.rs b/tests/scoped_threadpool.rs index 534e8bb..9321471 100644 --- a/tests/scoped_threadpool.rs +++ b/tests/scoped_threadpool.rs @@ -93,7 +93,7 @@ fn build_scoped_tls_threadpool() { }, ) .expect("thread pool created"); - // Internally, `crossbeam::scope` will wait for the threads to exit before returning. + // Internally, `std::thread::scope` will wait for the threads to exit before returning. }); }); } diff --git a/tests/use_current_thread.rs b/tests/use_current_thread.rs new file mode 100644 index 0000000..ec801c9 --- /dev/null +++ b/tests/use_current_thread.rs @@ -0,0 +1,57 @@ +use rayon_core::ThreadPoolBuilder; +use std::sync::{Arc, Condvar, Mutex}; +use std::thread::{self, JoinHandle}; + +#[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] +fn use_current_thread_basic() { + static JOIN_HANDLES: Mutex>> = Mutex::new(Vec::new()); + let pool = ThreadPoolBuilder::new() + .num_threads(2) + .use_current_thread() + .spawn_handler(|builder| { + let handle = thread::Builder::new().spawn(|| builder.run())?; + JOIN_HANDLES.lock().unwrap().push(handle); + Ok(()) + }) + .build() + .unwrap(); + assert_eq!(rayon_core::current_thread_index(), Some(0)); + assert_eq!( + JOIN_HANDLES.lock().unwrap().len(), + 1, + "Should only spawn one extra thread" + ); + + let another_pool = ThreadPoolBuilder::new() + .num_threads(2) + .use_current_thread() + .build(); + assert!( + another_pool.is_err(), + "Should error if the thread is already part of a pool" + ); + + let pair = Arc::new((Mutex::new(false), Condvar::new())); + let pair2 = Arc::clone(&pair); + pool.spawn(move || { + assert_ne!(rayon_core::current_thread_index(), Some(0)); + // This should execute even if the current thread is blocked, since we have two threads in + // the pool. + let &(ref started, ref condvar) = &*pair2; + *started.lock().unwrap() = true; + condvar.notify_one(); + }); + + let _guard = pair + .1 + .wait_while(pair.0.lock().unwrap(), |ran| !*ran) + .unwrap(); + std::mem::drop(pool); // Drop the pool. + + // Wait until all threads have actually exited. This is not really needed, other than to + // reduce noise of leak-checking tools. + for handle in std::mem::take(&mut *JOIN_HANDLES.lock().unwrap()) { + let _ = handle.join(); + } +} -- cgit v1.2.3