diff options
Diffstat (limited to 'tests/rt_common.rs')
-rw-r--r-- | tests/rt_common.rs | 242 |
1 files changed, 124 insertions, 118 deletions
diff --git a/tests/rt_common.rs b/tests/rt_common.rs index 71101d4..74a94d5 100644 --- a/tests/rt_common.rs +++ b/tests/rt_common.rs @@ -2,45 +2,45 @@ #![warn(rust_2018_idioms)] #![cfg(feature = "full")] -// Tests to run on both current-thread & therad-pool runtime variants. +// Tests to run on both current-thread & thread-pool runtime variants. macro_rules! rt_test { ($($t:tt)*) => { - mod basic_scheduler { + mod current_thread_scheduler { $($t)* - fn rt() -> Runtime { - tokio::runtime::Builder::new() - .basic_scheduler() + fn rt() -> Arc<Runtime> { + tokio::runtime::Builder::new_current_thread() .enable_all() .build() .unwrap() + .into() } } mod threaded_scheduler_4_threads { $($t)* - fn rt() -> Runtime { - tokio::runtime::Builder::new() - .threaded_scheduler() - .core_threads(4) + fn rt() -> Arc<Runtime> { + tokio::runtime::Builder::new_multi_thread() + .worker_threads(4) .enable_all() .build() .unwrap() + .into() } } mod threaded_scheduler_1_thread { $($t)* - fn rt() -> Runtime { - tokio::runtime::Builder::new() - .threaded_scheduler() - .core_threads(1) + fn rt() -> Arc<Runtime> { + tokio::runtime::Builder::new_multi_thread() + .worker_threads(1) .enable_all() .build() .unwrap() + .into() } } } @@ -72,7 +72,7 @@ rt_test! { #[test] fn block_on_sync() { - let mut rt = rt(); + let rt = rt(); let mut win = false; rt.block_on(async { @@ -82,41 +82,12 @@ rt_test! { assert!(win); } - #[test] - fn block_on_handle_sync() { - let rt = rt(); - - let mut win = false; - rt.handle().block_on(async { - win = true; - }); - - assert!(win); - } #[test] fn block_on_async() { - let mut rt = rt(); - - let out = rt.block_on(async { - let (tx, rx) = oneshot::channel(); - - thread::spawn(move || { - thread::sleep(Duration::from_millis(50)); - tx.send("ZOMG").unwrap(); - }); - - assert_ok!(rx.await) - }); - - assert_eq!(out, "ZOMG"); - } - - #[test] - fn block_on_handle_async() { let rt = rt(); - let out = rt.handle().block_on(async { + let out = rt.block_on(async { let (tx, rx) = oneshot::channel(); thread::spawn(move || { @@ -132,7 +103,7 @@ rt_test! { #[test] fn spawn_one_bg() { - let mut rt = rt(); + let rt = rt(); let out = rt.block_on(async { let (tx, rx) = oneshot::channel(); @@ -149,7 +120,7 @@ rt_test! { #[test] fn spawn_one_join() { - let mut rt = rt(); + let rt = rt(); let out = rt.block_on(async { let (tx, rx) = oneshot::channel(); @@ -172,7 +143,7 @@ rt_test! { #[test] fn spawn_two() { - let mut rt = rt(); + let rt = rt(); let out = rt.block_on(async { let (tx1, rx1) = oneshot::channel(); @@ -199,7 +170,7 @@ rt_test! { const ITER: usize = 200; - let mut rt = rt(); + let rt = rt(); let out = rt.block_on(async { let (done_tx, mut done_rx) = mpsc::unbounded_channel(); @@ -232,7 +203,7 @@ rt_test! { out.push(i); } - out.sort(); + out.sort_unstable(); out }); @@ -249,7 +220,7 @@ rt_test! { const ITER: usize = 500; - let mut rt = rt(); + let rt = rt(); let out = rt.block_on(async { tokio::spawn(async move { @@ -291,7 +262,7 @@ rt_test! { out.push(i); } - out.sort(); + out.sort_unstable(); out }).await.unwrap() }); @@ -305,7 +276,7 @@ rt_test! { #[test] fn spawn_await_chain() { - let mut rt = rt(); + let rt = rt(); let out = rt.block_on(async { assert_ok!(tokio::spawn(async { @@ -320,7 +291,7 @@ rt_test! { #[test] fn outstanding_tasks_dropped() { - let mut rt = rt(); + let rt = rt(); let cnt = Arc::new(()); @@ -343,16 +314,16 @@ rt_test! { #[test] #[should_panic] fn nested_rt() { - let mut rt1 = rt(); - let mut rt2 = rt(); + let rt1 = rt(); + let rt2 = rt(); rt1.block_on(async { rt2.block_on(async { "hello" }) }); } #[test] fn create_rt_in_block_on() { - let mut rt1 = rt(); - let mut rt2 = rt1.block_on(async { rt() }); + let rt1 = rt(); + let rt2 = rt1.block_on(async { rt() }); let out = rt2.block_on(async { "ZOMG" }); assert_eq!(out, "ZOMG"); @@ -360,7 +331,7 @@ rt_test! { #[test] fn complete_block_on_under_load() { - let mut rt = rt(); + let rt = rt(); rt.block_on(async { let (tx, rx) = oneshot::channel(); @@ -383,7 +354,7 @@ rt_test! { #[test] fn complete_task_under_load() { - let mut rt = rt(); + let rt = rt(); rt.block_on(async { let (tx1, rx1) = oneshot::channel(); @@ -412,8 +383,8 @@ rt_test! { #[test] fn spawn_from_other_thread_idle() { - let mut rt = rt(); - let handle = rt.handle().clone(); + let rt = rt(); + let handle = rt.clone(); let (tx, rx) = oneshot::channel(); @@ -432,8 +403,8 @@ rt_test! { #[test] fn spawn_from_other_thread_under_load() { - let mut rt = rt(); - let handle = rt.handle().clone(); + let rt = rt(); + let handle = rt.clone(); let (tx, rx) = oneshot::channel(); @@ -456,22 +427,22 @@ rt_test! { } #[test] - fn delay_at_root() { - let mut rt = rt(); + fn sleep_at_root() { + let rt = rt(); let now = Instant::now(); let dur = Duration::from_millis(50); rt.block_on(async move { - time::delay_for(dur).await; + time::sleep(dur).await; }); assert!(now.elapsed() >= dur); } #[test] - fn delay_in_spawn() { - let mut rt = rt(); + fn sleep_in_spawn() { + let rt = rt(); let now = Instant::now(); let dur = Duration::from_millis(50); @@ -480,7 +451,7 @@ rt_test! { let (tx, rx) = oneshot::channel(); tokio::spawn(async move { - time::delay_for(dur).await; + time::sleep(dur).await; assert_ok!(tx.send(())); }); @@ -492,12 +463,12 @@ rt_test! { #[test] fn block_on_socket() { - let mut rt = rt(); + let rt = rt(); rt.block_on(async move { let (tx, rx) = oneshot::channel(); - let mut listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); let addr = listener.local_addr().unwrap(); tokio::spawn(async move { @@ -512,7 +483,7 @@ rt_test! { #[test] fn spawn_from_blocking() { - let mut rt = rt(); + let rt = rt(); let out = rt.block_on(async move { let inner = assert_ok!(tokio::task::spawn_blocking(|| { @@ -527,7 +498,7 @@ rt_test! { #[test] fn spawn_blocking_from_blocking() { - let mut rt = rt(); + let rt = rt(); let out = rt.block_on(async move { let inner = assert_ok!(tokio::task::spawn_blocking(|| { @@ -541,8 +512,8 @@ rt_test! { } #[test] - fn delay_from_blocking() { - let mut rt = rt(); + fn sleep_from_blocking() { + let rt = rt(); rt.block_on(async move { assert_ok!(tokio::task::spawn_blocking(|| { @@ -552,7 +523,7 @@ rt_test! { // use the futures' block_on fn to make sure we aren't setting // any Tokio context futures::executor::block_on(async { - tokio::time::delay_for(dur).await; + tokio::time::sleep(dur).await; }); assert!(now.elapsed() >= dur); @@ -562,10 +533,10 @@ rt_test! { #[test] fn socket_from_blocking() { - let mut rt = rt(); + let rt = rt(); rt.block_on(async move { - let mut listener = assert_ok!(TcpListener::bind("127.0.0.1:0").await); + let listener = assert_ok!(TcpListener::bind("127.0.0.1:0").await); let addr = assert_ok!(listener.local_addr()); let peer = tokio::task::spawn_blocking(move || { @@ -584,20 +555,35 @@ rt_test! { } #[test] - fn spawn_blocking_after_shutdown() { + fn always_active_parker() { + // This test it to show that we will always have + // an active parker even if we call block_on concurrently + let rt = rt(); - let handle = rt.handle().clone(); + let rt2 = rt.clone(); - // Shutdown - drop(rt); + let (tx1, rx1) = oneshot::channel(); + let (tx2, rx2) = oneshot::channel(); - handle.enter(|| { - let res = task::spawn_blocking(|| unreachable!()); + let jh1 = thread::spawn(move || { + rt.block_on(async move { + rx2.await.unwrap(); + time::sleep(Duration::from_millis(5)).await; + tx1.send(()).unwrap(); + }); + }); - // Avoid using a tokio runtime - let out = futures::executor::block_on(res); - assert!(out.is_err()); + let jh2 = thread::spawn(move || { + rt2.block_on(async move { + tx2.send(()).unwrap(); + time::sleep(Duration::from_millis(5)).await; + rx1.await.unwrap(); + time::sleep(Duration::from_millis(5)).await; + }); }); + + jh1.join().unwrap(); + jh2.join().unwrap(); } #[test] @@ -615,7 +601,7 @@ rt_test! { // test is disabled. #[cfg(not(windows))] fn io_driver_called_when_under_load() { - let mut rt = rt(); + let rt = rt(); // Create a lot of constant load. The scheduler will always be busy. for _ in 0..100 { @@ -628,7 +614,7 @@ rt_test! { // Do some I/O work rt.block_on(async { - let mut listener = assert_ok!(TcpListener::bind("127.0.0.1:0").await); + let listener = assert_ok!(TcpListener::bind("127.0.0.1:0").await); let addr = assert_ok!(listener.local_addr()); let srv = tokio::spawn(async move { @@ -651,7 +637,7 @@ rt_test! { #[test] fn client_server_block_on() { - let mut rt = rt(); + let rt = rt(); let (tx, rx) = mpsc::channel(); rt.block_on(async move { client_server(tx).await }); @@ -662,7 +648,7 @@ rt_test! { #[test] fn panic_in_task() { - let mut rt = rt(); + let rt = rt(); let (tx, rx) = oneshot::channel(); struct Boom(Option<oneshot::Sender<()>>); @@ -689,7 +675,7 @@ rt_test! { #[test] #[should_panic] fn panic_in_block_on() { - let mut rt = rt(); + let rt = rt(); rt.block_on(async { panic!() }); } @@ -709,10 +695,11 @@ rt_test! { #[test] fn enter_and_spawn() { - let mut rt = rt(); - let handle = rt.enter(|| { + let rt = rt(); + let handle = { + let _enter = rt.enter(); tokio::spawn(async {}) - }); + }; assert_ok!(rt.block_on(handle)); } @@ -739,7 +726,7 @@ rt_test! { } } - let mut rt = rt(); + let rt = rt(); let (drop_tx, drop_rx) = mpsc::channel(); let (run_tx, run_rx) = oneshot::channel(); @@ -775,17 +762,17 @@ rt_test! { let (tx2, rx2) = oneshot::channel(); let (tx3, rx3) = oneshot::channel(); - let mut rt = rt(); + let rt = rt(); - let h1 = rt.handle().clone(); + let h1 = rt.clone(); - rt.handle().spawn(async move { + rt.spawn(async move { // Ensure a waker gets stored in oneshot 1. let _ = rx1.await; tx3.send(()).unwrap(); }); - rt.handle().spawn(async move { + rt.spawn(async move { // When this task is dropped, we'll be "closing remotes". // We spawn a new task that owns the `tx1`, to move its Drop // out of here. @@ -802,7 +789,7 @@ rt_test! { let _ = rx2.await; }); - rt.handle().spawn(async move { + rt.spawn(async move { let _ = rx3.await; // We'll never get here, but once task 3 drops, this will // force task 2 to re-schedule since it's waiting on oneshot 2. @@ -821,14 +808,16 @@ rt_test! { #[test] fn io_notify_while_shutting_down() { use std::net::Ipv6Addr; + use std::sync::Arc; for _ in 1..10 { - let mut runtime = rt(); + let runtime = rt(); runtime.block_on(async { let socket = UdpSocket::bind((Ipv6Addr::LOCALHOST, 0)).await.unwrap(); let addr = socket.local_addr().unwrap(); - let (mut recv_half, mut send_half) = socket.split(); + let send_half = Arc::new(socket); + let recv_half = send_half.clone(); tokio::spawn(async move { let mut buf = [0]; @@ -842,11 +831,11 @@ rt_test! { let buf = [0]; loop { send_half.send_to(&buf, &addr).await.unwrap(); - tokio::time::delay_for(Duration::from_millis(1)).await; + tokio::time::sleep(Duration::from_millis(1)).await; } }); - tokio::time::delay_for(Duration::from_millis(5)).await; + tokio::time::sleep(Duration::from_millis(5)).await; }); } } @@ -854,7 +843,7 @@ rt_test! { #[test] fn shutdown_timeout() { let (tx, rx) = oneshot::channel(); - let mut runtime = rt(); + let runtime = rt(); runtime.block_on(async move { task::spawn_blocking(move || { @@ -865,10 +854,25 @@ rt_test! { rx.await.unwrap(); }); - runtime.shutdown_timeout(Duration::from_millis(100)); + Arc::try_unwrap(runtime).unwrap().shutdown_timeout(Duration::from_millis(100)); + } + + #[test] + fn shutdown_wakeup_time() { + let runtime = rt(); + + runtime.block_on(async move { + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + }); + + Arc::try_unwrap(runtime).unwrap().shutdown_timeout(Duration::from_secs(10_000)); } + // This test is currently ignored on Windows because of a + // rust-lang issue in thread local storage destructors. + // See https://github.com/rust-lang/rust/issues/74875 #[test] + #[cfg(not(windows))] fn runtime_in_thread_local() { use std::cell::RefCell; use std::thread; @@ -879,7 +883,9 @@ rt_test! { thread::spawn(|| { R.with(|cell| { - *cell.borrow_mut() = Some(rt()); + let rt = rt(); + let rt = Arc::try_unwrap(rt).unwrap(); + *cell.borrow_mut() = Some(rt); }); let _rt = rt(); @@ -887,7 +893,7 @@ rt_test! { } async fn client_server(tx: mpsc::Sender<()>) { - let mut server = assert_ok!(TcpListener::bind("127.0.0.1:0").await); + let server = assert_ok!(TcpListener::bind("127.0.0.1:0").await); // Get the assigned address let addr = assert_ok!(server.local_addr()); @@ -912,13 +918,13 @@ rt_test! { #[test] fn local_set_block_on_socket() { - let mut rt = rt(); + let rt = rt(); let local = task::LocalSet::new(); - local.block_on(&mut rt, async move { + local.block_on(&rt, async move { let (tx, rx) = oneshot::channel(); - let mut listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); let addr = listener.local_addr().unwrap(); task::spawn_local(async move { @@ -933,19 +939,19 @@ rt_test! { #[test] fn local_set_client_server_block_on() { - let mut rt = rt(); + let rt = rt(); let (tx, rx) = mpsc::channel(); let local = task::LocalSet::new(); - local.block_on(&mut rt, async move { client_server_local(tx).await }); + local.block_on(&rt, async move { client_server_local(tx).await }); assert_ok!(rx.try_recv()); assert_err!(rx.try_recv()); } async fn client_server_local(tx: mpsc::Sender<()>) { - let mut server = assert_ok!(TcpListener::bind("127.0.0.1:0").await); + let server = assert_ok!(TcpListener::bind("127.0.0.1:0").await); // Get the assigned address let addr = assert_ok!(server.local_addr()); @@ -972,7 +978,7 @@ rt_test! { fn coop() { use std::task::Poll::Ready; - let mut rt = rt(); + let rt = rt(); rt.block_on(async { // Create a bunch of tasks @@ -981,7 +987,7 @@ rt_test! { }).collect::<Vec<_>>(); // Hope that all the tasks complete... - time::delay_for(Duration::from_millis(100)).await; + time::sleep(Duration::from_millis(100)).await; poll_fn(|cx| { // At least one task should not be ready @@ -1004,7 +1010,7 @@ rt_test! { const NUM: usize = 100; - let mut rt = rt(); + let rt = rt(); rt.block_on(async { let (spawned_tx, mut spawned_rx) = mpsc::unbounded_channel(); |