aboutsummaryrefslogtreecommitdiff
path: root/tests/rt_common.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tests/rt_common.rs')
-rw-r--r--tests/rt_common.rs242
1 files changed, 124 insertions, 118 deletions
diff --git a/tests/rt_common.rs b/tests/rt_common.rs
index 71101d4..74a94d5 100644
--- a/tests/rt_common.rs
+++ b/tests/rt_common.rs
@@ -2,45 +2,45 @@
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]
-// Tests to run on both current-thread & therad-pool runtime variants.
+// Tests to run on both current-thread & thread-pool runtime variants.
macro_rules! rt_test {
($($t:tt)*) => {
- mod basic_scheduler {
+ mod current_thread_scheduler {
$($t)*
- fn rt() -> Runtime {
- tokio::runtime::Builder::new()
- .basic_scheduler()
+ fn rt() -> Arc<Runtime> {
+ tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
+ .into()
}
}
mod threaded_scheduler_4_threads {
$($t)*
- fn rt() -> Runtime {
- tokio::runtime::Builder::new()
- .threaded_scheduler()
- .core_threads(4)
+ fn rt() -> Arc<Runtime> {
+ tokio::runtime::Builder::new_multi_thread()
+ .worker_threads(4)
.enable_all()
.build()
.unwrap()
+ .into()
}
}
mod threaded_scheduler_1_thread {
$($t)*
- fn rt() -> Runtime {
- tokio::runtime::Builder::new()
- .threaded_scheduler()
- .core_threads(1)
+ fn rt() -> Arc<Runtime> {
+ tokio::runtime::Builder::new_multi_thread()
+ .worker_threads(1)
.enable_all()
.build()
.unwrap()
+ .into()
}
}
}
@@ -72,7 +72,7 @@ rt_test! {
#[test]
fn block_on_sync() {
- let mut rt = rt();
+ let rt = rt();
let mut win = false;
rt.block_on(async {
@@ -82,41 +82,12 @@ rt_test! {
assert!(win);
}
- #[test]
- fn block_on_handle_sync() {
- let rt = rt();
-
- let mut win = false;
- rt.handle().block_on(async {
- win = true;
- });
-
- assert!(win);
- }
#[test]
fn block_on_async() {
- let mut rt = rt();
-
- let out = rt.block_on(async {
- let (tx, rx) = oneshot::channel();
-
- thread::spawn(move || {
- thread::sleep(Duration::from_millis(50));
- tx.send("ZOMG").unwrap();
- });
-
- assert_ok!(rx.await)
- });
-
- assert_eq!(out, "ZOMG");
- }
-
- #[test]
- fn block_on_handle_async() {
let rt = rt();
- let out = rt.handle().block_on(async {
+ let out = rt.block_on(async {
let (tx, rx) = oneshot::channel();
thread::spawn(move || {
@@ -132,7 +103,7 @@ rt_test! {
#[test]
fn spawn_one_bg() {
- let mut rt = rt();
+ let rt = rt();
let out = rt.block_on(async {
let (tx, rx) = oneshot::channel();
@@ -149,7 +120,7 @@ rt_test! {
#[test]
fn spawn_one_join() {
- let mut rt = rt();
+ let rt = rt();
let out = rt.block_on(async {
let (tx, rx) = oneshot::channel();
@@ -172,7 +143,7 @@ rt_test! {
#[test]
fn spawn_two() {
- let mut rt = rt();
+ let rt = rt();
let out = rt.block_on(async {
let (tx1, rx1) = oneshot::channel();
@@ -199,7 +170,7 @@ rt_test! {
const ITER: usize = 200;
- let mut rt = rt();
+ let rt = rt();
let out = rt.block_on(async {
let (done_tx, mut done_rx) = mpsc::unbounded_channel();
@@ -232,7 +203,7 @@ rt_test! {
out.push(i);
}
- out.sort();
+ out.sort_unstable();
out
});
@@ -249,7 +220,7 @@ rt_test! {
const ITER: usize = 500;
- let mut rt = rt();
+ let rt = rt();
let out = rt.block_on(async {
tokio::spawn(async move {
@@ -291,7 +262,7 @@ rt_test! {
out.push(i);
}
- out.sort();
+ out.sort_unstable();
out
}).await.unwrap()
});
@@ -305,7 +276,7 @@ rt_test! {
#[test]
fn spawn_await_chain() {
- let mut rt = rt();
+ let rt = rt();
let out = rt.block_on(async {
assert_ok!(tokio::spawn(async {
@@ -320,7 +291,7 @@ rt_test! {
#[test]
fn outstanding_tasks_dropped() {
- let mut rt = rt();
+ let rt = rt();
let cnt = Arc::new(());
@@ -343,16 +314,16 @@ rt_test! {
#[test]
#[should_panic]
fn nested_rt() {
- let mut rt1 = rt();
- let mut rt2 = rt();
+ let rt1 = rt();
+ let rt2 = rt();
rt1.block_on(async { rt2.block_on(async { "hello" }) });
}
#[test]
fn create_rt_in_block_on() {
- let mut rt1 = rt();
- let mut rt2 = rt1.block_on(async { rt() });
+ let rt1 = rt();
+ let rt2 = rt1.block_on(async { rt() });
let out = rt2.block_on(async { "ZOMG" });
assert_eq!(out, "ZOMG");
@@ -360,7 +331,7 @@ rt_test! {
#[test]
fn complete_block_on_under_load() {
- let mut rt = rt();
+ let rt = rt();
rt.block_on(async {
let (tx, rx) = oneshot::channel();
@@ -383,7 +354,7 @@ rt_test! {
#[test]
fn complete_task_under_load() {
- let mut rt = rt();
+ let rt = rt();
rt.block_on(async {
let (tx1, rx1) = oneshot::channel();
@@ -412,8 +383,8 @@ rt_test! {
#[test]
fn spawn_from_other_thread_idle() {
- let mut rt = rt();
- let handle = rt.handle().clone();
+ let rt = rt();
+ let handle = rt.clone();
let (tx, rx) = oneshot::channel();
@@ -432,8 +403,8 @@ rt_test! {
#[test]
fn spawn_from_other_thread_under_load() {
- let mut rt = rt();
- let handle = rt.handle().clone();
+ let rt = rt();
+ let handle = rt.clone();
let (tx, rx) = oneshot::channel();
@@ -456,22 +427,22 @@ rt_test! {
}
#[test]
- fn delay_at_root() {
- let mut rt = rt();
+ fn sleep_at_root() {
+ let rt = rt();
let now = Instant::now();
let dur = Duration::from_millis(50);
rt.block_on(async move {
- time::delay_for(dur).await;
+ time::sleep(dur).await;
});
assert!(now.elapsed() >= dur);
}
#[test]
- fn delay_in_spawn() {
- let mut rt = rt();
+ fn sleep_in_spawn() {
+ let rt = rt();
let now = Instant::now();
let dur = Duration::from_millis(50);
@@ -480,7 +451,7 @@ rt_test! {
let (tx, rx) = oneshot::channel();
tokio::spawn(async move {
- time::delay_for(dur).await;
+ time::sleep(dur).await;
assert_ok!(tx.send(()));
});
@@ -492,12 +463,12 @@ rt_test! {
#[test]
fn block_on_socket() {
- let mut rt = rt();
+ let rt = rt();
rt.block_on(async move {
let (tx, rx) = oneshot::channel();
- let mut listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
+ let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
tokio::spawn(async move {
@@ -512,7 +483,7 @@ rt_test! {
#[test]
fn spawn_from_blocking() {
- let mut rt = rt();
+ let rt = rt();
let out = rt.block_on(async move {
let inner = assert_ok!(tokio::task::spawn_blocking(|| {
@@ -527,7 +498,7 @@ rt_test! {
#[test]
fn spawn_blocking_from_blocking() {
- let mut rt = rt();
+ let rt = rt();
let out = rt.block_on(async move {
let inner = assert_ok!(tokio::task::spawn_blocking(|| {
@@ -541,8 +512,8 @@ rt_test! {
}
#[test]
- fn delay_from_blocking() {
- let mut rt = rt();
+ fn sleep_from_blocking() {
+ let rt = rt();
rt.block_on(async move {
assert_ok!(tokio::task::spawn_blocking(|| {
@@ -552,7 +523,7 @@ rt_test! {
// use the futures' block_on fn to make sure we aren't setting
// any Tokio context
futures::executor::block_on(async {
- tokio::time::delay_for(dur).await;
+ tokio::time::sleep(dur).await;
});
assert!(now.elapsed() >= dur);
@@ -562,10 +533,10 @@ rt_test! {
#[test]
fn socket_from_blocking() {
- let mut rt = rt();
+ let rt = rt();
rt.block_on(async move {
- let mut listener = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
+ let listener = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
let addr = assert_ok!(listener.local_addr());
let peer = tokio::task::spawn_blocking(move || {
@@ -584,20 +555,35 @@ rt_test! {
}
#[test]
- fn spawn_blocking_after_shutdown() {
+ fn always_active_parker() {
+ // This test it to show that we will always have
+ // an active parker even if we call block_on concurrently
+
let rt = rt();
- let handle = rt.handle().clone();
+ let rt2 = rt.clone();
- // Shutdown
- drop(rt);
+ let (tx1, rx1) = oneshot::channel();
+ let (tx2, rx2) = oneshot::channel();
- handle.enter(|| {
- let res = task::spawn_blocking(|| unreachable!());
+ let jh1 = thread::spawn(move || {
+ rt.block_on(async move {
+ rx2.await.unwrap();
+ time::sleep(Duration::from_millis(5)).await;
+ tx1.send(()).unwrap();
+ });
+ });
- // Avoid using a tokio runtime
- let out = futures::executor::block_on(res);
- assert!(out.is_err());
+ let jh2 = thread::spawn(move || {
+ rt2.block_on(async move {
+ tx2.send(()).unwrap();
+ time::sleep(Duration::from_millis(5)).await;
+ rx1.await.unwrap();
+ time::sleep(Duration::from_millis(5)).await;
+ });
});
+
+ jh1.join().unwrap();
+ jh2.join().unwrap();
}
#[test]
@@ -615,7 +601,7 @@ rt_test! {
// test is disabled.
#[cfg(not(windows))]
fn io_driver_called_when_under_load() {
- let mut rt = rt();
+ let rt = rt();
// Create a lot of constant load. The scheduler will always be busy.
for _ in 0..100 {
@@ -628,7 +614,7 @@ rt_test! {
// Do some I/O work
rt.block_on(async {
- let mut listener = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
+ let listener = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
let addr = assert_ok!(listener.local_addr());
let srv = tokio::spawn(async move {
@@ -651,7 +637,7 @@ rt_test! {
#[test]
fn client_server_block_on() {
- let mut rt = rt();
+ let rt = rt();
let (tx, rx) = mpsc::channel();
rt.block_on(async move { client_server(tx).await });
@@ -662,7 +648,7 @@ rt_test! {
#[test]
fn panic_in_task() {
- let mut rt = rt();
+ let rt = rt();
let (tx, rx) = oneshot::channel();
struct Boom(Option<oneshot::Sender<()>>);
@@ -689,7 +675,7 @@ rt_test! {
#[test]
#[should_panic]
fn panic_in_block_on() {
- let mut rt = rt();
+ let rt = rt();
rt.block_on(async { panic!() });
}
@@ -709,10 +695,11 @@ rt_test! {
#[test]
fn enter_and_spawn() {
- let mut rt = rt();
- let handle = rt.enter(|| {
+ let rt = rt();
+ let handle = {
+ let _enter = rt.enter();
tokio::spawn(async {})
- });
+ };
assert_ok!(rt.block_on(handle));
}
@@ -739,7 +726,7 @@ rt_test! {
}
}
- let mut rt = rt();
+ let rt = rt();
let (drop_tx, drop_rx) = mpsc::channel();
let (run_tx, run_rx) = oneshot::channel();
@@ -775,17 +762,17 @@ rt_test! {
let (tx2, rx2) = oneshot::channel();
let (tx3, rx3) = oneshot::channel();
- let mut rt = rt();
+ let rt = rt();
- let h1 = rt.handle().clone();
+ let h1 = rt.clone();
- rt.handle().spawn(async move {
+ rt.spawn(async move {
// Ensure a waker gets stored in oneshot 1.
let _ = rx1.await;
tx3.send(()).unwrap();
});
- rt.handle().spawn(async move {
+ rt.spawn(async move {
// When this task is dropped, we'll be "closing remotes".
// We spawn a new task that owns the `tx1`, to move its Drop
// out of here.
@@ -802,7 +789,7 @@ rt_test! {
let _ = rx2.await;
});
- rt.handle().spawn(async move {
+ rt.spawn(async move {
let _ = rx3.await;
// We'll never get here, but once task 3 drops, this will
// force task 2 to re-schedule since it's waiting on oneshot 2.
@@ -821,14 +808,16 @@ rt_test! {
#[test]
fn io_notify_while_shutting_down() {
use std::net::Ipv6Addr;
+ use std::sync::Arc;
for _ in 1..10 {
- let mut runtime = rt();
+ let runtime = rt();
runtime.block_on(async {
let socket = UdpSocket::bind((Ipv6Addr::LOCALHOST, 0)).await.unwrap();
let addr = socket.local_addr().unwrap();
- let (mut recv_half, mut send_half) = socket.split();
+ let send_half = Arc::new(socket);
+ let recv_half = send_half.clone();
tokio::spawn(async move {
let mut buf = [0];
@@ -842,11 +831,11 @@ rt_test! {
let buf = [0];
loop {
send_half.send_to(&buf, &addr).await.unwrap();
- tokio::time::delay_for(Duration::from_millis(1)).await;
+ tokio::time::sleep(Duration::from_millis(1)).await;
}
});
- tokio::time::delay_for(Duration::from_millis(5)).await;
+ tokio::time::sleep(Duration::from_millis(5)).await;
});
}
}
@@ -854,7 +843,7 @@ rt_test! {
#[test]
fn shutdown_timeout() {
let (tx, rx) = oneshot::channel();
- let mut runtime = rt();
+ let runtime = rt();
runtime.block_on(async move {
task::spawn_blocking(move || {
@@ -865,10 +854,25 @@ rt_test! {
rx.await.unwrap();
});
- runtime.shutdown_timeout(Duration::from_millis(100));
+ Arc::try_unwrap(runtime).unwrap().shutdown_timeout(Duration::from_millis(100));
+ }
+
+ #[test]
+ fn shutdown_wakeup_time() {
+ let runtime = rt();
+
+ runtime.block_on(async move {
+ tokio::time::sleep(std::time::Duration::from_millis(100)).await;
+ });
+
+ Arc::try_unwrap(runtime).unwrap().shutdown_timeout(Duration::from_secs(10_000));
}
+ // This test is currently ignored on Windows because of a
+ // rust-lang issue in thread local storage destructors.
+ // See https://github.com/rust-lang/rust/issues/74875
#[test]
+ #[cfg(not(windows))]
fn runtime_in_thread_local() {
use std::cell::RefCell;
use std::thread;
@@ -879,7 +883,9 @@ rt_test! {
thread::spawn(|| {
R.with(|cell| {
- *cell.borrow_mut() = Some(rt());
+ let rt = rt();
+ let rt = Arc::try_unwrap(rt).unwrap();
+ *cell.borrow_mut() = Some(rt);
});
let _rt = rt();
@@ -887,7 +893,7 @@ rt_test! {
}
async fn client_server(tx: mpsc::Sender<()>) {
- let mut server = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
+ let server = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
// Get the assigned address
let addr = assert_ok!(server.local_addr());
@@ -912,13 +918,13 @@ rt_test! {
#[test]
fn local_set_block_on_socket() {
- let mut rt = rt();
+ let rt = rt();
let local = task::LocalSet::new();
- local.block_on(&mut rt, async move {
+ local.block_on(&rt, async move {
let (tx, rx) = oneshot::channel();
- let mut listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
+ let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
task::spawn_local(async move {
@@ -933,19 +939,19 @@ rt_test! {
#[test]
fn local_set_client_server_block_on() {
- let mut rt = rt();
+ let rt = rt();
let (tx, rx) = mpsc::channel();
let local = task::LocalSet::new();
- local.block_on(&mut rt, async move { client_server_local(tx).await });
+ local.block_on(&rt, async move { client_server_local(tx).await });
assert_ok!(rx.try_recv());
assert_err!(rx.try_recv());
}
async fn client_server_local(tx: mpsc::Sender<()>) {
- let mut server = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
+ let server = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
// Get the assigned address
let addr = assert_ok!(server.local_addr());
@@ -972,7 +978,7 @@ rt_test! {
fn coop() {
use std::task::Poll::Ready;
- let mut rt = rt();
+ let rt = rt();
rt.block_on(async {
// Create a bunch of tasks
@@ -981,7 +987,7 @@ rt_test! {
}).collect::<Vec<_>>();
// Hope that all the tasks complete...
- time::delay_for(Duration::from_millis(100)).await;
+ time::sleep(Duration::from_millis(100)).await;
poll_fn(|cx| {
// At least one task should not be ready
@@ -1004,7 +1010,7 @@ rt_test! {
const NUM: usize = 100;
- let mut rt = rt();
+ let rt = rt();
rt.block_on(async {
let (spawned_tx, mut spawned_rx) = mpsc::unbounded_channel();