diff options
Diffstat (limited to 'tests/rt_common.rs')
-rw-r--r-- | tests/rt_common.rs | 187 |
1 files changed, 138 insertions, 49 deletions
diff --git a/tests/rt_common.rs b/tests/rt_common.rs index 3892998..abca8dd 100644 --- a/tests/rt_common.rs +++ b/tests/rt_common.rs @@ -21,7 +21,7 @@ macro_rules! rt_test { } } - #[cfg(not(tokio_wasi))] // Wasi doesn't support threads + #[cfg(not(target_os = "wasi"))] // Wasi doesn't support threads mod threaded_scheduler_4_threads { $($t)* @@ -37,7 +37,7 @@ macro_rules! rt_test { } } - #[cfg(not(tokio_wasi))] // Wasi doesn't support threads + #[cfg(not(target_os = "wasi"))] // Wasi doesn't support threads mod threaded_scheduler_1_thread { $($t)* @@ -52,6 +52,40 @@ macro_rules! rt_test { .into() } } + + #[cfg(not(target_os = "wasi"))] // Wasi doesn't support threads + #[cfg(tokio_unstable)] + mod alt_threaded_scheduler_4_threads { + $($t)* + + const NUM_WORKERS: usize = 4; + + fn rt() -> Arc<Runtime> { + tokio::runtime::Builder::new_multi_thread() + .worker_threads(4) + .enable_all() + .build() + .unwrap() + .into() + } + } + + #[cfg(not(target_os = "wasi"))] // Wasi doesn't support threads + #[cfg(tokio_unstable)] + mod alt_threaded_scheduler_1_thread { + $($t)* + + const NUM_WORKERS: usize = 1; + + fn rt() -> Arc<Runtime> { + tokio::runtime::Builder::new_multi_thread() + .worker_threads(1) + .enable_all() + .build() + .unwrap() + .into() + } + } } } @@ -695,12 +729,34 @@ rt_test! { /// Tests that yielded tasks are not scheduled until **after** resource /// drivers are polled. /// - /// Note: we may have to delete this test as it is not necessarily reliable. /// The OS does not guarantee when I/O events are delivered, so there may be - /// more yields than anticipated. + /// more yields than anticipated. This makes the test slightly flaky. To + /// help avoid flakiness, we run the test 10 times and only fail it after + /// 10 failures in a row. + /// + /// Note that if the test fails by panicking rather than by returning false, + /// then we fail it immediately. That kind of failure should not happen + /// spuriously. #[test] #[cfg(not(target_os="wasi"))] fn yield_defers_until_park() { + for _ in 0..10 { + if yield_defers_until_park_inner() { + // test passed + return; + } + + // Wait a bit and run the test again. + std::thread::sleep(std::time::Duration::from_secs(2)); + } + + panic!("yield_defers_until_park is failing consistently"); + } + + /// Implementation of `yield_defers_until_park` test. Returns `true` if the + /// test passed. + #[cfg(not(target_os="wasi"))] + fn yield_defers_until_park_inner() -> bool { use std::sync::atomic::{AtomicBool, Ordering::SeqCst}; use std::sync::Barrier; @@ -727,14 +783,16 @@ rt_test! { barrier.wait(); - tokio::spawn(async move { + let (fail_test, fail_test_recv) = oneshot::channel::<()>(); + + let jh = tokio::spawn(async move { // Create a TCP litener let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); let addr = listener.local_addr().unwrap(); tokio::join!( async { - // Done blocking intentionally + // Done in a blocking manner intentionally. let _socket = std::net::TcpStream::connect(addr).unwrap(); // Yield until connected @@ -744,7 +802,12 @@ rt_test! { cnt += 1; if cnt >= 10 { - panic!("yielded too many times; TODO: delete this test?"); + // yielded too many times; report failure and + // sleep forever so that the `fail_test` branch + // of the `select!` below triggers. + let _ = fail_test.send(()); + futures::future::pending::<()>().await; + break; } } }, @@ -753,8 +816,20 @@ rt_test! { flag.store(true, SeqCst); } ); - }).await.unwrap(); - }); + }); + + // Wait until the spawned task completes or fails. If no message is + // sent on `fail_test`, then the test succeeds. Otherwise, it fails. + let success = fail_test_recv.await.is_err(); + + if success { + // Check for panics in spawned task. + jh.abort(); + jh.await.unwrap(); + } + + success + }) } #[cfg(not(target_os="wasi"))] // Wasi does not support threads @@ -769,9 +844,9 @@ rt_test! { assert_err!(rx.try_recv()); } - #[cfg_attr(tokio_wasi, ignore = "Wasi does not support threads or panic recovery")] + #[cfg_attr(target_os = "wasi", ignore = "Wasi does not support threads or panic recovery")] + #[cfg(panic = "unwind")] #[test] - #[cfg(not(target_os = "android"))] fn panic_in_task() { let rt = rt(); let (tx, rx) = oneshot::channel(); @@ -799,7 +874,7 @@ rt_test! { #[test] #[should_panic] - #[cfg_attr(tokio_wasi, ignore = "Wasi does not support panic recovery")] + #[cfg_attr(target_os = "wasi", ignore = "Wasi does not support panic recovery")] fn panic_in_block_on() { let rt = rt(); rt.block_on(async { panic!() }); @@ -876,10 +951,6 @@ rt_test! { #[test] fn wake_while_rt_is_dropping() { use tokio::sync::Barrier; - use core::sync::atomic::{AtomicBool, Ordering}; - - let drop_triggered = Arc::new(AtomicBool::new(false)); - let set_drop_triggered = drop_triggered.clone(); struct OnDrop<F: FnMut()>(F); @@ -891,56 +962,39 @@ rt_test! { let (tx1, rx1) = oneshot::channel(); let (tx2, rx2) = oneshot::channel(); - let (tx3, rx3) = oneshot::channel(); - let barrier = Arc::new(Barrier::new(4)); + let barrier = Arc::new(Barrier::new(3)); let barrier1 = barrier.clone(); let barrier2 = barrier.clone(); - let barrier3 = barrier.clone(); let rt = rt(); rt.spawn(async move { + let mut tx2 = Some(tx2); + let _d = OnDrop(move || { + let _ = tx2.take().unwrap().send(()); + }); + // Ensure a waker gets stored in oneshot 1. let _ = tokio::join!(rx1, barrier1.wait()); - tx3.send(()).unwrap(); }); rt.spawn(async move { - let h1 = tokio::runtime::Handle::current(); - // When this task is dropped, we'll be "closing remotes". - // We spawn a new task that owns the `tx1`, to move its Drop - // out of here. - // - // Importantly, the oneshot 1 has a waker already stored, so - // the eventual drop here will try to re-schedule again. - let mut opt_tx1 = Some(tx1); + let mut tx1 = Some(tx1); let _d = OnDrop(move || { - let tx1 = opt_tx1.take().unwrap(); - h1.spawn(async move { - tx1.send(()).unwrap(); - }); - // Just a sanity check that this entire thing actually happened - set_drop_triggered.store(true, Ordering::Relaxed); + let _ = tx1.take().unwrap().send(()); }); - let _ = tokio::join!(rx2, barrier2.wait()); - }); - rt.spawn(async move { - let _ = tokio::join!(rx3, barrier3.wait()); - // We'll never get here, but once task 3 drops, this will - // force task 2 to re-schedule since it's waiting on oneshot 2. - tx2.send(()).unwrap(); + // Ensure a waker gets stored in oneshot 2. + let _ = tokio::join!(rx2, barrier2.wait()); }); // Wait until every oneshot channel has been polled. rt.block_on(barrier.wait()); - // Drop the rt + // Drop the rt. Regardless of which task is dropped first, its destructor will wake the + // other task. drop(rt); - - // Make sure that the spawn actually happened - assert!(drop_triggered.load(Ordering::Relaxed)); } #[cfg(not(target_os="wasi"))] // Wasi doesn't support UDP or bind() @@ -1029,7 +1083,7 @@ rt_test! { // See https://github.com/rust-lang/rust/issues/74875 #[test] #[cfg(not(windows))] - #[cfg_attr(tokio_wasi, ignore = "Wasi does not support threads")] + #[cfg_attr(target_os = "wasi", ignore = "Wasi does not support threads")] fn runtime_in_thread_local() { use std::cell::RefCell; use std::thread; @@ -1074,7 +1128,7 @@ rt_test! { tx.send(()).unwrap(); } - #[cfg(not(tokio_wasi))] // Wasi does not support bind + #[cfg(not(target_os = "wasi"))] // Wasi does not support bind #[test] fn local_set_block_on_socket() { let rt = rt(); @@ -1096,7 +1150,7 @@ rt_test! { }); } - #[cfg(not(tokio_wasi))] // Wasi does not support bind + #[cfg(not(target_os = "wasi"))] // Wasi does not support bind #[test] fn local_set_client_server_block_on() { let rt = rt(); @@ -1110,7 +1164,7 @@ rt_test! { assert_err!(rx.try_recv()); } - #[cfg(not(tokio_wasi))] // Wasi does not support bind + #[cfg(not(target_os = "wasi"))] // Wasi does not support bind async fn client_server_local(tx: mpsc::Sender<()>) { let server = assert_ok!(TcpListener::bind("127.0.0.1:0").await); @@ -1277,4 +1331,39 @@ rt_test! { } }); } + + #[test] + #[cfg(not(target_os="wasi"))] + fn shutdown_concurrent_spawn() { + const NUM_TASKS: usize = 10_000; + for _ in 0..5 { + let (tx, rx) = std::sync::mpsc::channel(); + let rt = rt(); + + let mut txs = vec![]; + + for _ in 0..NUM_TASKS { + let (tx, rx) = tokio::sync::oneshot::channel(); + txs.push(tx); + rt.spawn(async move { + rx.await.unwrap(); + }); + } + + // Prime the tasks + rt.block_on(async { tokio::task::yield_now().await }); + + let th = std::thread::spawn(move || { + tx.send(()).unwrap(); + for tx in txs.drain(..) { + let _ = tx.send(()); + } + }); + + rx.recv().unwrap(); + drop(rt); + + th.join().unwrap(); + } + } } |