/// Full runtime loom tests. These are heavy tests and take significant time to /// run on CI. /// /// Use `LOOM_MAX_PREEMPTIONS=1` to do a "quick" run as a smoke test. /// /// In order to speed up the C use crate::future::poll_fn; use crate::runtime::tests::loom_oneshot as oneshot; use crate::runtime::{self, Runtime}; use crate::{spawn, task}; use tokio_test::assert_ok; use loom::sync::atomic::{AtomicBool, AtomicUsize}; use loom::sync::Arc; use pin_project_lite::pin_project; use std::future::Future; use std::pin::Pin; use std::sync::atomic::Ordering::{Relaxed, SeqCst}; use std::task::{Context, Poll}; mod atomic_take { use loom::sync::atomic::AtomicBool; use std::mem::MaybeUninit; use std::sync::atomic::Ordering::SeqCst; pub(super) struct AtomicTake { inner: MaybeUninit, taken: AtomicBool, } impl AtomicTake { pub(super) fn new(value: T) -> Self { Self { inner: MaybeUninit::new(value), taken: AtomicBool::new(false), } } pub(super) fn take(&self) -> Option { // safety: Only one thread will see the boolean change from false // to true, so that thread is able to take the value. match self.taken.fetch_or(true, SeqCst) { false => unsafe { Some(std::ptr::read(self.inner.as_ptr())) }, true => None, } } } impl Drop for AtomicTake { fn drop(&mut self) { drop(self.take()); } } } #[derive(Clone)] struct AtomicOneshot { value: std::sync::Arc>>, } impl AtomicOneshot { fn new(sender: oneshot::Sender) -> Self { Self { value: std::sync::Arc::new(atomic_take::AtomicTake::new(sender)), } } fn assert_send(&self, value: T) { self.value.take().unwrap().send(value); } } /// Tests are divided into groups to make the runs faster on CI. mod group_a { use super::*; #[test] fn racy_shutdown() { loom::model(|| { let pool = mk_pool(1); // here's the case we want to exercise: // // a worker that still has tasks in its local queue gets sent to the blocking pool (due to // block_in_place). the blocking pool is shut down, so drops the worker. the worker's // shutdown method never gets run. // // we do this by spawning two tasks on one worker, the first of which does block_in_place, // and then immediately drop the pool. pool.spawn(track(async { crate::task::block_in_place(|| {}); })); pool.spawn(track(async {})); drop(pool); }); } #[test] fn pool_multi_spawn() { loom::model(|| { let pool = mk_pool(2); let c1 = Arc::new(AtomicUsize::new(0)); let (tx, rx) = oneshot::channel(); let tx1 = AtomicOneshot::new(tx); // Spawn a task let c2 = c1.clone(); let tx2 = tx1.clone(); pool.spawn(track(async move { spawn(track(async move { if 1 == c1.fetch_add(1, Relaxed) { tx1.assert_send(()); } })); })); // Spawn a second task pool.spawn(track(async move { spawn(track(async move { if 1 == c2.fetch_add(1, Relaxed) { tx2.assert_send(()); } })); })); rx.recv(); }); } fn only_blocking_inner(first_pending: bool) { loom::model(move || { let pool = mk_pool(1); let (block_tx, block_rx) = oneshot::channel(); pool.spawn(track(async move { crate::task::block_in_place(move || { block_tx.send(()); }); if first_pending { task::yield_now().await } })); block_rx.recv(); drop(pool); }); } #[test] fn only_blocking_without_pending() { only_blocking_inner(false) } #[test] fn only_blocking_with_pending() { only_blocking_inner(true) } } mod group_b { use super::*; fn blocking_and_regular_inner(first_pending: bool) { const NUM: usize = 3; loom::model(move || { let pool = mk_pool(1); let cnt = Arc::new(AtomicUsize::new(0)); let (block_tx, block_rx) = oneshot::channel(); let (done_tx, done_rx) = oneshot::channel(); let done_tx = AtomicOneshot::new(done_tx); pool.spawn(track(async move { crate::task::block_in_place(move || { block_tx.send(()); }); if first_pending { task::yield_now().await } })); for _ in 0..NUM { let cnt = cnt.clone(); let done_tx = done_tx.clone(); pool.spawn(track(async move { if NUM == cnt.fetch_add(1, Relaxed) + 1 { done_tx.assert_send(()); } })); } done_rx.recv(); block_rx.recv(); drop(pool); }); } #[test] fn blocking_and_regular() { blocking_and_regular_inner(false); } #[test] fn blocking_and_regular_with_pending() { blocking_and_regular_inner(true); } #[test] fn join_output() { loom::model(|| { let rt = mk_pool(1); rt.block_on(async { let t = crate::spawn(track(async { "hello" })); let out = assert_ok!(t.await); assert_eq!("hello", out.into_inner()); }); }); } #[test] fn poll_drop_handle_then_drop() { loom::model(|| { let rt = mk_pool(1); rt.block_on(async move { let mut t = crate::spawn(track(async { "hello" })); poll_fn(|cx| { let _ = Pin::new(&mut t).poll(cx); Poll::Ready(()) }) .await; }); }) } #[test] fn complete_block_on_under_load() { loom::model(|| { let pool = mk_pool(1); pool.block_on(async { // Trigger a re-schedule crate::spawn(track(async { for _ in 0..2 { task::yield_now().await; } })); gated2(true).await }); }); } #[test] fn shutdown_with_notification() { use crate::sync::oneshot; loom::model(|| { let rt = mk_pool(2); let (done_tx, done_rx) = oneshot::channel::<()>(); rt.spawn(track(async move { let (tx, rx) = oneshot::channel::<()>(); crate::spawn(async move { crate::task::spawn_blocking(move || { let _ = tx.send(()); }); let _ = done_rx.await; }); let _ = rx.await; let _ = done_tx.send(()); })); }); } } mod group_c { use super::*; #[test] fn pool_shutdown() { loom::model(|| { let pool = mk_pool(2); pool.spawn(track(async move { gated2(true).await; })); pool.spawn(track(async move { gated2(false).await; })); drop(pool); }); } } mod group_d { use super::*; #[test] fn pool_multi_notify() { loom::model(|| { let pool = mk_pool(2); let c1 = Arc::new(AtomicUsize::new(0)); let (done_tx, done_rx) = oneshot::channel(); let done_tx1 = AtomicOneshot::new(done_tx); let done_tx2 = done_tx1.clone(); // Spawn a task let c2 = c1.clone(); pool.spawn(track(async move { gated().await; gated().await; if 1 == c1.fetch_add(1, Relaxed) { done_tx1.assert_send(()); } })); // Spawn a second task pool.spawn(track(async move { gated().await; gated().await; if 1 == c2.fetch_add(1, Relaxed) { done_tx2.assert_send(()); } })); done_rx.recv(); }); } } fn mk_pool(num_threads: usize) -> Runtime { runtime::Builder::new_multi_thread() .worker_threads(num_threads) .build() .unwrap() } fn gated() -> impl Future { gated2(false) } fn gated2(thread: bool) -> impl Future { use loom::thread; use std::sync::Arc; let gate = Arc::new(AtomicBool::new(false)); let mut fired = false; poll_fn(move |cx| { if !fired { let gate = gate.clone(); let waker = cx.waker().clone(); if thread { thread::spawn(move || { gate.store(true, SeqCst); waker.wake_by_ref(); }); } else { spawn(track(async move { gate.store(true, SeqCst); waker.wake_by_ref(); })); } fired = true; return Poll::Pending; } if gate.load(SeqCst) { Poll::Ready("hello world") } else { Poll::Pending } }) } fn track(f: T) -> Track { Track { inner: f, arc: Arc::new(()), } } pin_project! { struct Track { #[pin] inner: T, // Arc is used to hook into loom's leak tracking. arc: Arc<()>, } } impl Track { fn into_inner(self) -> T { self.inner } } impl Future for Track { type Output = Track; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let me = self.project(); Poll::Ready(Track { inner: ready!(me.inner.poll(cx)), arc: me.arc.clone(), }) } }