aboutsummaryrefslogtreecommitdiff
path: root/tests/rt_common.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tests/rt_common.rs')
-rw-r--r--tests/rt_common.rs258
1 files changed, 214 insertions, 44 deletions
diff --git a/tests/rt_common.rs b/tests/rt_common.rs
index e5fc7a9..3892998 100644
--- a/tests/rt_common.rs
+++ b/tests/rt_common.rs
@@ -2,13 +2,16 @@
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]
-// Tests to run on both current-thread & thread-pool runtime variants.
+// Tests to run on both current-thread & multi-thread runtime variants.
macro_rules! rt_test {
($($t:tt)*) => {
mod current_thread_scheduler {
$($t)*
+ #[cfg(not(target_os="wasi"))]
+ const NUM_WORKERS: usize = 1;
+
fn rt() -> Arc<Runtime> {
tokio::runtime::Builder::new_current_thread()
.enable_all()
@@ -18,9 +21,12 @@ macro_rules! rt_test {
}
}
+ #[cfg(not(tokio_wasi))] // Wasi doesn't support threads
mod threaded_scheduler_4_threads {
$($t)*
+ const NUM_WORKERS: usize = 4;
+
fn rt() -> Arc<Runtime> {
tokio::runtime::Builder::new_multi_thread()
.worker_threads(4)
@@ -31,9 +37,12 @@ macro_rules! rt_test {
}
}
+ #[cfg(not(tokio_wasi))] // Wasi doesn't support threads
mod threaded_scheduler_1_thread {
$($t)*
+ const NUM_WORKERS: usize = 1;
+
fn rt() -> Arc<Runtime> {
tokio::runtime::Builder::new_multi_thread()
.worker_threads(1)
@@ -55,18 +64,30 @@ fn send_sync_bound() {
}
rt_test! {
- use tokio::net::{TcpListener, TcpStream, UdpSocket};
+ #[cfg(not(target_os="wasi"))]
+ use tokio::net::{TcpListener, TcpStream};
+ #[cfg(not(target_os="wasi"))]
use tokio::io::{AsyncReadExt, AsyncWriteExt};
+
use tokio::runtime::Runtime;
use tokio::sync::oneshot;
use tokio::{task, time};
- use tokio_test::{assert_err, assert_ok};
+
+ #[cfg(not(target_os="wasi"))]
+ use tokio_test::assert_err;
+ use tokio_test::assert_ok;
use futures::future::poll_fn;
use std::future::Future;
use std::pin::Pin;
- use std::sync::{mpsc, Arc};
+
+ #[cfg(not(target_os="wasi"))]
+ use std::sync::mpsc;
+
+ use std::sync::Arc;
use std::task::{Context, Poll};
+
+ #[cfg(not(target_os="wasi"))]
use std::thread;
use std::time::{Duration, Instant};
@@ -83,6 +104,7 @@ rt_test! {
}
+ #[cfg(not(target_os="wasi"))]
#[test]
fn block_on_async() {
let rt = rt();
@@ -164,6 +186,7 @@ rt_test! {
assert_eq!(out, "ZOMG");
}
+ #[cfg(not(target_os="wasi"))] // Wasi does not support threads
#[test]
fn spawn_many_from_block_on() {
use tokio::sync::mpsc;
@@ -214,6 +237,7 @@ rt_test! {
}
}
+ #[cfg(not(target_os="wasi"))] // Wasi does not support threads
#[test]
fn spawn_many_from_task() {
use tokio::sync::mpsc;
@@ -226,14 +250,6 @@ rt_test! {
tokio::spawn(async move {
let (done_tx, mut done_rx) = mpsc::unbounded_channel();
- /*
- for _ in 0..100 {
- tokio::spawn(async move { });
- }
-
- tokio::task::yield_now().await;
- */
-
let mut txs = (0..ITER)
.map(|i| {
let (tx, rx) = oneshot::channel();
@@ -275,6 +291,31 @@ rt_test! {
}
#[test]
+ fn spawn_one_from_block_on_called_on_handle() {
+ let rt = rt();
+ let (tx, rx) = oneshot::channel();
+
+ #[allow(clippy::async_yields_async)]
+ let handle = rt.handle().block_on(async {
+ tokio::spawn(async move {
+ tx.send("ZOMG").unwrap();
+ "DONE"
+ })
+ });
+
+ let out = rt.block_on(async {
+ let msg = assert_ok!(rx.await);
+
+ let out = assert_ok!(handle.await);
+ assert_eq!(out, "DONE");
+
+ msg
+ });
+
+ assert_eq!(out, "ZOMG");
+ }
+
+ #[test]
fn spawn_await_chain() {
let rt = rt();
@@ -329,6 +370,7 @@ rt_test! {
assert_eq!(out, "ZOMG");
}
+ #[cfg(not(target_os="wasi"))] // Wasi does not support threads
#[test]
fn complete_block_on_under_load() {
let rt = rt();
@@ -352,6 +394,7 @@ rt_test! {
});
}
+ #[cfg(not(target_os="wasi"))] // Wasi does not support threads
#[test]
fn complete_task_under_load() {
let rt = rt();
@@ -381,6 +424,7 @@ rt_test! {
});
}
+ #[cfg(not(target_os="wasi"))] // Wasi does not support threads
#[test]
fn spawn_from_other_thread_idle() {
let rt = rt();
@@ -401,6 +445,7 @@ rt_test! {
});
}
+ #[cfg(not(target_os="wasi"))] // Wasi does not support threads
#[test]
fn spawn_from_other_thread_under_load() {
let rt = rt();
@@ -461,6 +506,7 @@ rt_test! {
assert!(now.elapsed() >= dur);
}
+ #[cfg(not(target_os="wasi"))] // Wasi does not support bind
#[test]
fn block_on_socket() {
let rt = rt();
@@ -481,6 +527,7 @@ rt_test! {
});
}
+ #[cfg(not(target_os="wasi"))] // Wasi does not support threads
#[test]
fn spawn_from_blocking() {
let rt = rt();
@@ -496,6 +543,7 @@ rt_test! {
assert_eq!(out, "hello")
}
+ #[cfg(not(target_os="wasi"))] // Wasi does not support threads
#[test]
fn spawn_blocking_from_blocking() {
let rt = rt();
@@ -511,6 +559,7 @@ rt_test! {
assert_eq!(out, "hello")
}
+ #[cfg(not(target_os="wasi"))] // Wasi does not support threads
#[test]
fn sleep_from_blocking() {
let rt = rt();
@@ -531,6 +580,7 @@ rt_test! {
});
}
+ #[cfg(not(target_os="wasi"))] // Wasi does not support bind
#[test]
fn socket_from_blocking() {
let rt = rt();
@@ -554,6 +604,7 @@ rt_test! {
});
}
+ #[cfg(not(target_os="wasi"))] // Wasi does not support threads
#[test]
fn always_active_parker() {
// This test it to show that we will always have
@@ -600,6 +651,7 @@ rt_test! {
// concern. There also isn't a great/obvious solution to take. For now, the
// test is disabled.
#[cfg(not(windows))]
+ #[cfg(not(target_os="wasi"))] // Wasi does not support bind or threads
fn io_driver_called_when_under_load() {
let rt = rt();
@@ -607,7 +659,12 @@ rt_test! {
for _ in 0..100 {
rt.spawn(async {
loop {
- tokio::task::yield_now().await;
+ // Don't use Tokio's `yield_now()` to avoid special defer
+ // logic.
+ futures::future::poll_fn::<(), _>(|cx| {
+ cx.waker().wake_by_ref();
+ std::task::Poll::Pending
+ }).await;
}
});
}
@@ -635,6 +692,72 @@ rt_test! {
});
}
+ /// Tests that yielded tasks are not scheduled until **after** resource
+ /// drivers are polled.
+ ///
+ /// Note: we may have to delete this test as it is not necessarily reliable.
+ /// The OS does not guarantee when I/O events are delivered, so there may be
+ /// more yields than anticipated.
+ #[test]
+ #[cfg(not(target_os="wasi"))]
+ fn yield_defers_until_park() {
+ use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
+ use std::sync::Barrier;
+
+ let rt = rt();
+
+ let flag = Arc::new(AtomicBool::new(false));
+ let barrier = Arc::new(Barrier::new(NUM_WORKERS));
+
+ rt.block_on(async {
+ // Make sure other workers cannot steal tasks
+ #[allow(clippy::reversed_empty_ranges)]
+ for _ in 0..(NUM_WORKERS-1) {
+ let flag = flag.clone();
+ let barrier = barrier.clone();
+
+ tokio::spawn(async move {
+ barrier.wait();
+
+ while !flag.load(SeqCst) {
+ std::thread::sleep(std::time::Duration::from_millis(1));
+ }
+ });
+ }
+
+ barrier.wait();
+
+ tokio::spawn(async move {
+ // Create a TCP litener
+ let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
+ let addr = listener.local_addr().unwrap();
+
+ tokio::join!(
+ async {
+ // Done blocking intentionally
+ let _socket = std::net::TcpStream::connect(addr).unwrap();
+
+ // Yield until connected
+ let mut cnt = 0;
+ while !flag.load(SeqCst){
+ tokio::task::yield_now().await;
+ cnt += 1;
+
+ if cnt >= 10 {
+ panic!("yielded too many times; TODO: delete this test?");
+ }
+ }
+ },
+ async {
+ let _ = listener.accept().await.unwrap();
+ flag.store(true, SeqCst);
+ }
+ );
+ }).await.unwrap();
+ });
+ }
+
+ #[cfg(not(target_os="wasi"))] // Wasi does not support threads
#[test]
fn client_server_block_on() {
let rt = rt();
@@ -646,6 +769,7 @@ rt_test! {
assert_err!(rx.try_recv());
}
+ #[cfg_attr(tokio_wasi, ignore = "Wasi does not support threads or panic recovery")]
#[test]
#[cfg(not(target_os = "android"))]
fn panic_in_task() {
@@ -675,11 +799,13 @@ rt_test! {
#[test]
#[should_panic]
+ #[cfg_attr(tokio_wasi, ignore = "Wasi does not support panic recovery")]
fn panic_in_block_on() {
let rt = rt();
rt.block_on(async { panic!() });
}
+ #[cfg(not(target_os="wasi"))] // Wasi does not support threads
async fn yield_once() {
let mut yielded = false;
poll_fn(|cx| {
@@ -749,7 +875,11 @@ rt_test! {
#[test]
fn wake_while_rt_is_dropping() {
- use tokio::task;
+ use tokio::sync::Barrier;
+ use core::sync::atomic::{AtomicBool, Ordering};
+
+ let drop_triggered = Arc::new(AtomicBool::new(false));
+ let set_drop_triggered = drop_triggered.clone();
struct OnDrop<F: FnMut()>(F);
@@ -763,17 +893,21 @@ rt_test! {
let (tx2, rx2) = oneshot::channel();
let (tx3, rx3) = oneshot::channel();
- let rt = rt();
+ let barrier = Arc::new(Barrier::new(4));
+ let barrier1 = barrier.clone();
+ let barrier2 = barrier.clone();
+ let barrier3 = barrier.clone();
- let h1 = rt.clone();
+ let rt = rt();
rt.spawn(async move {
// Ensure a waker gets stored in oneshot 1.
- let _ = rx1.await;
+ let _ = tokio::join!(rx1, barrier1.wait());
tx3.send(()).unwrap();
});
rt.spawn(async move {
+ let h1 = tokio::runtime::Handle::current();
// When this task is dropped, we'll be "closing remotes".
// We spawn a new task that owns the `tx1`, to move its Drop
// out of here.
@@ -786,36 +920,40 @@ rt_test! {
h1.spawn(async move {
tx1.send(()).unwrap();
});
+ // Just a sanity check that this entire thing actually happened
+ set_drop_triggered.store(true, Ordering::Relaxed);
});
- let _ = rx2.await;
+ let _ = tokio::join!(rx2, barrier2.wait());
});
rt.spawn(async move {
- let _ = rx3.await;
+ let _ = tokio::join!(rx3, barrier3.wait());
// We'll never get here, but once task 3 drops, this will
// force task 2 to re-schedule since it's waiting on oneshot 2.
tx2.send(()).unwrap();
});
- // Tick the loop
- rt.block_on(async {
- task::yield_now().await;
- });
+ // Wait until every oneshot channel has been polled.
+ rt.block_on(barrier.wait());
// Drop the rt
drop(rt);
+
+ // Make sure that the spawn actually happened
+ assert!(drop_triggered.load(Ordering::Relaxed));
}
+ #[cfg(not(target_os="wasi"))] // Wasi doesn't support UDP or bind()
#[test]
fn io_notify_while_shutting_down() {
- use std::net::Ipv6Addr;
+ use tokio::net::UdpSocket;
use std::sync::Arc;
for _ in 1..10 {
let runtime = rt();
runtime.block_on(async {
- let socket = UdpSocket::bind((Ipv6Addr::LOCALHOST, 0)).await.unwrap();
+ let socket = UdpSocket::bind("127.0.0.1:0").await.unwrap();
let addr = socket.local_addr().unwrap();
let send_half = Arc::new(socket);
let recv_half = send_half.clone();
@@ -841,6 +979,7 @@ rt_test! {
}
}
+ #[cfg(not(target_os="wasi"))] // Wasi does not support threads
#[test]
fn shutdown_timeout() {
let (tx, rx) = oneshot::channel();
@@ -858,6 +997,7 @@ rt_test! {
Arc::try_unwrap(runtime).unwrap().shutdown_timeout(Duration::from_millis(100));
}
+ #[cfg(not(target_os="wasi"))] // Wasi does not support threads
#[test]
fn shutdown_timeout_0() {
let runtime = rt();
@@ -889,6 +1029,7 @@ rt_test! {
// See https://github.com/rust-lang/rust/issues/74875
#[test]
#[cfg(not(windows))]
+ #[cfg_attr(tokio_wasi, ignore = "Wasi does not support threads")]
fn runtime_in_thread_local() {
use std::cell::RefCell;
use std::thread;
@@ -908,6 +1049,7 @@ rt_test! {
}).join().unwrap();
}
+ #[cfg(not(target_os="wasi"))] // Wasi does not support bind
async fn client_server(tx: mpsc::Sender<()>) {
let server = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
@@ -932,6 +1074,7 @@ rt_test! {
tx.send(()).unwrap();
}
+ #[cfg(not(tokio_wasi))] // Wasi does not support bind
#[test]
fn local_set_block_on_socket() {
let rt = rt();
@@ -953,6 +1096,7 @@ rt_test! {
});
}
+ #[cfg(not(tokio_wasi))] // Wasi does not support bind
#[test]
fn local_set_client_server_block_on() {
let rt = rt();
@@ -966,6 +1110,7 @@ rt_test! {
assert_err!(rx.try_recv());
}
+ #[cfg(not(tokio_wasi))] // Wasi does not support bind
async fn client_server_local(tx: mpsc::Sender<()>) {
let server = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
@@ -993,22 +1138,22 @@ rt_test! {
#[test]
fn coop() {
use std::task::Poll::Ready;
+ use tokio::sync::mpsc;
let rt = rt();
rt.block_on(async {
- // Create a bunch of tasks
- let mut tasks = (0..1_000).map(|_| {
- tokio::spawn(async { })
- }).collect::<Vec<_>>();
+ let (send, mut recv) = mpsc::unbounded_channel();
- // Hope that all the tasks complete...
- time::sleep(Duration::from_millis(100)).await;
+ // Send a bunch of messages.
+ for _ in 0..1_000 {
+ send.send(()).unwrap();
+ }
poll_fn(|cx| {
- // At least one task should not be ready
- for task in &mut tasks {
- if Pin::new(task).poll(cx).is_pending() {
+ // At least one response should return pending.
+ for _ in 0..1_000 {
+ if recv.poll_recv(cx).is_pending() {
return Ready(());
}
}
@@ -1021,22 +1166,22 @@ rt_test! {
#[test]
fn coop_unconstrained() {
use std::task::Poll::Ready;
+ use tokio::sync::mpsc;
let rt = rt();
rt.block_on(async {
- // Create a bunch of tasks
- let mut tasks = (0..1_000).map(|_| {
- tokio::spawn(async { })
- }).collect::<Vec<_>>();
+ let (send, mut recv) = mpsc::unbounded_channel();
- // Hope that all the tasks complete...
- time::sleep(Duration::from_millis(100)).await;
+ // Send a bunch of messages.
+ for _ in 0..1_000 {
+ send.send(()).unwrap();
+ }
tokio::task::unconstrained(poll_fn(|cx| {
- // All the tasks should be ready
- for task in &mut tasks {
- assert!(Pin::new(task).poll(cx).is_ready());
+ // All the responses should be ready.
+ for _ in 0..1_000 {
+ assert_eq!(recv.poll_recv(cx), Poll::Ready(Some(())));
}
Ready(())
@@ -1044,6 +1189,31 @@ rt_test! {
});
}
+ #[cfg(tokio_unstable)]
+ #[test]
+ fn coop_consume_budget() {
+ let rt = rt();
+
+ rt.block_on(async {
+ poll_fn(|cx| {
+ let counter = Arc::new(std::sync::Mutex::new(0));
+ let counter_clone = Arc::clone(&counter);
+ let mut worker = Box::pin(async move {
+ // Consume the budget until a yield happens
+ for _ in 0..1000 {
+ *counter.lock().unwrap() += 1;
+ task::consume_budget().await
+ }
+ });
+ // Assert that the worker was yielded and it didn't manage
+ // to finish the whole work (assuming the total budget of 128)
+ assert!(Pin::new(&mut worker).poll(cx).is_pending());
+ assert!(*counter_clone.lock().unwrap() < 1000);
+ std::task::Poll::Ready(())
+ }).await;
+ });
+ }
+
// Tests that the "next task" scheduler optimization is not able to starve
// other tasks.
#[test]
@@ -1061,7 +1231,7 @@ rt_test! {
let (spawned_tx, mut spawned_rx) = mpsc::unbounded_channel();
let mut tasks = vec![];
- // Spawn a bunch of tasks that ping ping between each other to
+ // Spawn a bunch of tasks that ping-pong between each other to
// saturate the runtime.
for _ in 0..NUM {
let (tx1, mut rx1) = mpsc::unbounded_channel();