#![cfg(test)] use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::mpsc::channel; use std::sync::{Arc, Mutex}; use crate::{join, Scope, ScopeFifo, ThreadPool, ThreadPoolBuilder}; #[test] #[should_panic(expected = "Hello, world!")] fn panic_propagate() { let thread_pool = ThreadPoolBuilder::new().build().unwrap(); thread_pool.install(|| { panic!("Hello, world!"); }); } #[test] #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn workers_stop() { let registry; { // once we exit this block, thread-pool will be dropped let thread_pool = ThreadPoolBuilder::new().num_threads(22).build().unwrap(); registry = thread_pool.install(|| { // do some work on these threads join_a_lot(22); Arc::clone(&thread_pool.registry) }); assert_eq!(registry.num_threads(), 22); } // once thread-pool is dropped, registry should terminate, which // should lead to worker threads stopping registry.wait_until_stopped(); } fn join_a_lot(n: usize) { if n > 0 { join(|| join_a_lot(n - 1), || join_a_lot(n - 1)); } } #[test] #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn sleeper_stop() { use std::{thread, time}; let registry; { // once we exit this block, thread-pool will be dropped let thread_pool = ThreadPoolBuilder::new().num_threads(22).build().unwrap(); registry = Arc::clone(&thread_pool.registry); // Give time for at least some of the thread pool to fall asleep. thread::sleep(time::Duration::from_secs(1)); } // once thread-pool is dropped, registry should terminate, which // should lead to worker threads stopping registry.wait_until_stopped(); } /// Creates a start/exit handler that increments an atomic counter. fn count_handler() -> (Arc, impl Fn(usize)) { let count = Arc::new(AtomicUsize::new(0)); (Arc::clone(&count), move |_| { count.fetch_add(1, Ordering::SeqCst); }) } /// Wait until a counter is no longer shared, then return its value. fn wait_for_counter(mut counter: Arc) -> usize { use std::{thread, time}; for _ in 0..60 { counter = match Arc::try_unwrap(counter) { Ok(counter) => return counter.into_inner(), Err(counter) => { thread::sleep(time::Duration::from_secs(1)); counter } }; } // That's too long! panic!("Counter is still shared!"); } #[test] #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn failed_thread_stack() { // Note: we first tried to force failure with a `usize::MAX` stack, but // macOS and Windows weren't fazed, or at least didn't fail the way we want. // They work with `isize::MAX`, but 32-bit platforms may feasibly allocate a // 2GB stack, so it might not fail until the second thread. let stack_size = ::std::isize::MAX as usize; let (start_count, start_handler) = count_handler(); let (exit_count, exit_handler) = count_handler(); let builder = ThreadPoolBuilder::new() .num_threads(10) .stack_size(stack_size) .start_handler(start_handler) .exit_handler(exit_handler); let pool = builder.build(); assert!(pool.is_err(), "thread stack should have failed!"); // With such a huge stack, 64-bit will probably fail on the first thread; // 32-bit might manage the first 2GB, but certainly fail the second. let start_count = wait_for_counter(start_count); assert!(start_count <= 1); assert_eq!(start_count, wait_for_counter(exit_count)); } #[test] #[cfg_attr(not(panic = "unwind"), ignore)] fn panic_thread_name() { let (start_count, start_handler) = count_handler(); let (exit_count, exit_handler) = count_handler(); let builder = ThreadPoolBuilder::new() .num_threads(10) .start_handler(start_handler) .exit_handler(exit_handler) .thread_name(|i| { if i >= 5 { panic!(); } format!("panic_thread_name#{}", i) }); let pool = crate::unwind::halt_unwinding(|| builder.build()); assert!(pool.is_err(), "thread-name panic should propagate!"); // Assuming they're created in order, threads 0 through 4 should have // been started already, and then terminated by the panic. assert_eq!(5, wait_for_counter(start_count)); assert_eq!(5, wait_for_counter(exit_count)); } #[test] #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn self_install() { let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); // If the inner `install` blocks, then nothing will actually run it! assert!(pool.install(|| pool.install(|| true))); } #[test] #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn mutual_install() { let pool1 = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); let pool2 = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); let ok = pool1.install(|| { // This creates a dependency from `pool1` -> `pool2` pool2.install(|| { // This creates a dependency from `pool2` -> `pool1` pool1.install(|| { // If they blocked on inter-pool installs, there would be no // threads left to run this! true }) }) }); assert!(ok); } #[test] #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn mutual_install_sleepy() { use std::{thread, time}; let pool1 = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); let pool2 = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); let ok = pool1.install(|| { // This creates a dependency from `pool1` -> `pool2` pool2.install(|| { // Give `pool1` time to fall asleep. thread::sleep(time::Duration::from_secs(1)); // This creates a dependency from `pool2` -> `pool1` pool1.install(|| { // Give `pool2` time to fall asleep. thread::sleep(time::Duration::from_secs(1)); // If they blocked on inter-pool installs, there would be no // threads left to run this! true }) }) }); assert!(ok); } #[test] #[allow(deprecated)] #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn check_thread_pool_new() { let pool = ThreadPool::new(crate::Configuration::new().num_threads(22)).unwrap(); assert_eq!(pool.current_num_threads(), 22); } macro_rules! test_scope_order { ($scope:ident => $spawn:ident) => {{ let builder = ThreadPoolBuilder::new().num_threads(1); let pool = builder.build().unwrap(); pool.install(|| { let vec = Mutex::new(vec![]); pool.$scope(|scope| { let vec = &vec; for i in 0..10 { scope.$spawn(move |_| { vec.lock().unwrap().push(i); }); } }); vec.into_inner().unwrap() }) }}; } #[test] #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn scope_lifo_order() { let vec = test_scope_order!(scope => spawn); let expected: Vec = (0..10).rev().collect(); // LIFO -> reversed assert_eq!(vec, expected); } #[test] #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn scope_fifo_order() { let vec = test_scope_order!(scope_fifo => spawn_fifo); let expected: Vec = (0..10).collect(); // FIFO -> natural order assert_eq!(vec, expected); } macro_rules! test_spawn_order { ($spawn:ident) => {{ let builder = ThreadPoolBuilder::new().num_threads(1); let pool = &builder.build().unwrap(); let (tx, rx) = channel(); pool.install(move || { for i in 0..10 { let tx = tx.clone(); pool.$spawn(move || { tx.send(i).unwrap(); }); } }); rx.iter().collect::>() }}; } #[test] #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn spawn_lifo_order() { let vec = test_spawn_order!(spawn); let expected: Vec = (0..10).rev().collect(); // LIFO -> reversed assert_eq!(vec, expected); } #[test] #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn spawn_fifo_order() { let vec = test_spawn_order!(spawn_fifo); let expected: Vec = (0..10).collect(); // FIFO -> natural order assert_eq!(vec, expected); } #[test] #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn nested_scopes() { // Create matching scopes for every thread pool. fn nest<'scope, OP>(pools: &[ThreadPool], scopes: Vec<&Scope<'scope>>, op: OP) where OP: FnOnce(&[&Scope<'scope>]) + Send, { if let Some((pool, tail)) = pools.split_first() { pool.scope(move |s| { // This move reduces the reference lifetimes by variance to match s, // but the actual scopes are still tied to the invariant 'scope. let mut scopes = scopes; scopes.push(s); nest(tail, scopes, op) }) } else { (op)(&scopes) } } let pools: Vec<_> = (0..10) .map(|_| ThreadPoolBuilder::new().num_threads(1).build().unwrap()) .collect(); let counter = AtomicUsize::new(0); nest(&pools, vec![], |scopes| { for &s in scopes { s.spawn(|_| { // Our 'scope lets us borrow the counter in every pool. counter.fetch_add(1, Ordering::Relaxed); }); } }); assert_eq!(counter.into_inner(), pools.len()); } #[test] #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn nested_fifo_scopes() { // Create matching fifo scopes for every thread pool. fn nest<'scope, OP>(pools: &[ThreadPool], scopes: Vec<&ScopeFifo<'scope>>, op: OP) where OP: FnOnce(&[&ScopeFifo<'scope>]) + Send, { if let Some((pool, tail)) = pools.split_first() { pool.scope_fifo(move |s| { // This move reduces the reference lifetimes by variance to match s, // but the actual scopes are still tied to the invariant 'scope. let mut scopes = scopes; scopes.push(s); nest(tail, scopes, op) }) } else { (op)(&scopes) } } let pools: Vec<_> = (0..10) .map(|_| ThreadPoolBuilder::new().num_threads(1).build().unwrap()) .collect(); let counter = AtomicUsize::new(0); nest(&pools, vec![], |scopes| { for &s in scopes { s.spawn_fifo(|_| { // Our 'scope lets us borrow the counter in every pool. counter.fetch_add(1, Ordering::Relaxed); }); } }); assert_eq!(counter.into_inner(), pools.len()); } #[test] #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn in_place_scope_no_deadlock() { let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); let (tx, rx) = channel(); let rx_ref = ℞ pool.in_place_scope(move |s| { // With regular scopes this closure would never run because this scope op // itself would block the only worker thread. s.spawn(move |_| { tx.send(()).unwrap(); }); rx_ref.recv().unwrap(); }); } #[test] #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn in_place_scope_fifo_no_deadlock() { let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); let (tx, rx) = channel(); let rx_ref = ℞ pool.in_place_scope_fifo(move |s| { // With regular scopes this closure would never run because this scope op // itself would block the only worker thread. s.spawn_fifo(move |_| { tx.send(()).unwrap(); }); rx_ref.recv().unwrap(); }); } #[test] fn yield_now_to_spawn() { let (tx, rx) = crossbeam_channel::bounded(1); // Queue a regular spawn. crate::spawn(move || tx.send(22).unwrap()); // The single-threaded fallback mode (for wasm etc.) won't // get a chance to run the spawn if we never yield to it. crate::registry::in_worker(move |_, _| { crate::yield_now(); }); // The spawn **must** have started by now, but we still might have to wait // for it to finish if a different thread stole it first. assert_eq!(22, rx.recv().unwrap()); } #[test] fn yield_local_to_spawn() { let (tx, rx) = crossbeam_channel::bounded(1); // Queue a regular spawn. crate::spawn(move || tx.send(22).unwrap()); // The single-threaded fallback mode (for wasm etc.) won't // get a chance to run the spawn if we never yield to it. crate::registry::in_worker(move |_, _| { crate::yield_local(); }); // The spawn **must** have started by now, but we still might have to wait // for it to finish if a different thread stole it first. assert_eq!(22, rx.recv().unwrap()); }