aboutsummaryrefslogtreecommitdiff
path: root/tests/rt_common.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tests/rt_common.rs')
-rw-r--r--tests/rt_common.rs187
1 file changed, 138 insertions, 49 deletions
diff --git a/tests/rt_common.rs b/tests/rt_common.rs
index 3892998..abca8dd 100644
--- a/tests/rt_common.rs
+++ b/tests/rt_common.rs
@@ -21,7 +21,7 @@ macro_rules! rt_test {
}
}
- #[cfg(not(tokio_wasi))] // Wasi doesn't support threads
+ #[cfg(not(target_os = "wasi"))] // Wasi doesn't support threads
mod threaded_scheduler_4_threads {
$($t)*
@@ -37,7 +37,7 @@ macro_rules! rt_test {
}
}
- #[cfg(not(tokio_wasi))] // Wasi doesn't support threads
+ #[cfg(not(target_os = "wasi"))] // Wasi doesn't support threads
mod threaded_scheduler_1_thread {
$($t)*
@@ -52,6 +52,40 @@ macro_rules! rt_test {
.into()
}
}
+
+ #[cfg(not(target_os = "wasi"))] // Wasi doesn't support threads
+ #[cfg(tokio_unstable)]
+ mod alt_threaded_scheduler_4_threads {
+ $($t)*
+
+ const NUM_WORKERS: usize = 4;
+
+ fn rt() -> Arc<Runtime> {
+ tokio::runtime::Builder::new_multi_thread()
+ .worker_threads(4)
+ .enable_all()
+ .build()
+ .unwrap()
+ .into()
+ }
+ }
+
+ #[cfg(not(target_os = "wasi"))] // Wasi doesn't support threads
+ #[cfg(tokio_unstable)]
+ mod alt_threaded_scheduler_1_thread {
+ $($t)*
+
+ const NUM_WORKERS: usize = 1;
+
+ fn rt() -> Arc<Runtime> {
+ tokio::runtime::Builder::new_multi_thread()
+ .worker_threads(1)
+ .enable_all()
+ .build()
+ .unwrap()
+ .into()
+ }
+ }
}
}
@@ -695,12 +729,34 @@ rt_test! {
/// Tests that yielded tasks are not scheduled until **after** resource
/// drivers are polled.
///
- /// Note: we may have to delete this test as it is not necessarily reliable.
/// The OS does not guarantee when I/O events are delivered, so there may be
- /// more yields than anticipated.
+ /// more yields than anticipated. This makes the test slightly flaky. To
+ /// help avoid flakiness, we run the test 10 times and only fail it after
+ /// 10 failures in a row.
+ ///
+ /// Note that if the test fails by panicking rather than by returning false,
+ /// then we fail it immediately. That kind of failure should not happen
+ /// spuriously.
#[test]
#[cfg(not(target_os="wasi"))]
fn yield_defers_until_park() {
+ for _ in 0..10 {
+ if yield_defers_until_park_inner() {
+ // test passed
+ return;
+ }
+
+ // Wait a bit and run the test again.
+ std::thread::sleep(std::time::Duration::from_secs(2));
+ }
+
+ panic!("yield_defers_until_park is failing consistently");
+ }
+
+ /// Implementation of `yield_defers_until_park` test. Returns `true` if the
+ /// test passed.
+ #[cfg(not(target_os="wasi"))]
+ fn yield_defers_until_park_inner() -> bool {
use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
use std::sync::Barrier;
@@ -727,14 +783,16 @@ rt_test! {
barrier.wait();
- tokio::spawn(async move {
+ let (fail_test, fail_test_recv) = oneshot::channel::<()>();
+
+ let jh = tokio::spawn(async move {
// Create a TCP listener
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
tokio::join!(
async {
- // Done blocking intentionally
+ // Done in a blocking manner intentionally.
let _socket = std::net::TcpStream::connect(addr).unwrap();
// Yield until connected
@@ -744,7 +802,12 @@ rt_test! {
cnt += 1;
if cnt >= 10 {
- panic!("yielded too many times; TODO: delete this test?");
+ // yielded too many times; report failure and
+ // sleep forever so that the `fail_test` branch
+ // of the `select!` below triggers.
+ let _ = fail_test.send(());
+ futures::future::pending::<()>().await;
+ break;
}
}
},
@@ -753,8 +816,20 @@ rt_test! {
flag.store(true, SeqCst);
}
);
- }).await.unwrap();
- });
+ });
+
+ // Wait until the spawned task completes or fails. If no message is
+ // sent on `fail_test`, then the test succeeds. Otherwise, it fails.
+ let success = fail_test_recv.await.is_err();
+
+ if success {
+ // Check for panics in spawned task.
+ jh.abort();
+ jh.await.unwrap();
+ }
+
+ success
+ })
}
#[cfg(not(target_os="wasi"))] // Wasi does not support threads
@@ -769,9 +844,9 @@ rt_test! {
assert_err!(rx.try_recv());
}
- #[cfg_attr(tokio_wasi, ignore = "Wasi does not support threads or panic recovery")]
+ #[cfg_attr(target_os = "wasi", ignore = "Wasi does not support threads or panic recovery")]
+ #[cfg(panic = "unwind")]
#[test]
- #[cfg(not(target_os = "android"))]
fn panic_in_task() {
let rt = rt();
let (tx, rx) = oneshot::channel();
@@ -799,7 +874,7 @@ rt_test! {
#[test]
#[should_panic]
- #[cfg_attr(tokio_wasi, ignore = "Wasi does not support panic recovery")]
+ #[cfg_attr(target_os = "wasi", ignore = "Wasi does not support panic recovery")]
fn panic_in_block_on() {
let rt = rt();
rt.block_on(async { panic!() });
@@ -876,10 +951,6 @@ rt_test! {
#[test]
fn wake_while_rt_is_dropping() {
use tokio::sync::Barrier;
- use core::sync::atomic::{AtomicBool, Ordering};
-
- let drop_triggered = Arc::new(AtomicBool::new(false));
- let set_drop_triggered = drop_triggered.clone();
struct OnDrop<F: FnMut()>(F);
@@ -891,56 +962,39 @@ rt_test! {
let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
- let (tx3, rx3) = oneshot::channel();
- let barrier = Arc::new(Barrier::new(4));
+ let barrier = Arc::new(Barrier::new(3));
let barrier1 = barrier.clone();
let barrier2 = barrier.clone();
- let barrier3 = barrier.clone();
let rt = rt();
rt.spawn(async move {
+ let mut tx2 = Some(tx2);
+ let _d = OnDrop(move || {
+ let _ = tx2.take().unwrap().send(());
+ });
+
// Ensure a waker gets stored in oneshot 1.
let _ = tokio::join!(rx1, barrier1.wait());
- tx3.send(()).unwrap();
});
rt.spawn(async move {
- let h1 = tokio::runtime::Handle::current();
- // When this task is dropped, we'll be "closing remotes".
- // We spawn a new task that owns the `tx1`, to move its Drop
- // out of here.
- //
- // Importantly, the oneshot 1 has a waker already stored, so
- // the eventual drop here will try to re-schedule again.
- let mut opt_tx1 = Some(tx1);
+ let mut tx1 = Some(tx1);
let _d = OnDrop(move || {
- let tx1 = opt_tx1.take().unwrap();
- h1.spawn(async move {
- tx1.send(()).unwrap();
- });
- // Just a sanity check that this entire thing actually happened
- set_drop_triggered.store(true, Ordering::Relaxed);
+ let _ = tx1.take().unwrap().send(());
});
- let _ = tokio::join!(rx2, barrier2.wait());
- });
- rt.spawn(async move {
- let _ = tokio::join!(rx3, barrier3.wait());
- // We'll never get here, but once task 3 drops, this will
- // force task 2 to re-schedule since it's waiting on oneshot 2.
- tx2.send(()).unwrap();
+ // Ensure a waker gets stored in oneshot 2.
+ let _ = tokio::join!(rx2, barrier2.wait());
});
// Wait until every oneshot channel has been polled.
rt.block_on(barrier.wait());
- // Drop the rt
+ // Drop the rt. Regardless of which task is dropped first, its destructor will wake the
+ // other task.
drop(rt);
-
- // Make sure that the spawn actually happened
- assert!(drop_triggered.load(Ordering::Relaxed));
}
#[cfg(not(target_os="wasi"))] // Wasi doesn't support UDP or bind()
@@ -1029,7 +1083,7 @@ rt_test! {
// See https://github.com/rust-lang/rust/issues/74875
#[test]
#[cfg(not(windows))]
- #[cfg_attr(tokio_wasi, ignore = "Wasi does not support threads")]
+ #[cfg_attr(target_os = "wasi", ignore = "Wasi does not support threads")]
fn runtime_in_thread_local() {
use std::cell::RefCell;
use std::thread;
@@ -1074,7 +1128,7 @@ rt_test! {
tx.send(()).unwrap();
}
- #[cfg(not(tokio_wasi))] // Wasi does not support bind
+ #[cfg(not(target_os = "wasi"))] // Wasi does not support bind
#[test]
fn local_set_block_on_socket() {
let rt = rt();
@@ -1096,7 +1150,7 @@ rt_test! {
});
}
- #[cfg(not(tokio_wasi))] // Wasi does not support bind
+ #[cfg(not(target_os = "wasi"))] // Wasi does not support bind
#[test]
fn local_set_client_server_block_on() {
let rt = rt();
@@ -1110,7 +1164,7 @@ rt_test! {
assert_err!(rx.try_recv());
}
- #[cfg(not(tokio_wasi))] // Wasi does not support bind
+ #[cfg(not(target_os = "wasi"))] // Wasi does not support bind
async fn client_server_local(tx: mpsc::Sender<()>) {
let server = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
@@ -1277,4 +1331,39 @@ rt_test! {
}
});
}
+
+ #[test]
+ #[cfg(not(target_os="wasi"))]
+ fn shutdown_concurrent_spawn() {
+ const NUM_TASKS: usize = 10_000;
+ for _ in 0..5 {
+ let (tx, rx) = std::sync::mpsc::channel();
+ let rt = rt();
+
+ let mut txs = vec![];
+
+ for _ in 0..NUM_TASKS {
+ let (tx, rx) = tokio::sync::oneshot::channel();
+ txs.push(tx);
+ rt.spawn(async move {
+ rx.await.unwrap();
+ });
+ }
+
+ // Prime the tasks
+ rt.block_on(async { tokio::task::yield_now().await });
+
+ let th = std::thread::spawn(move || {
+ tx.send(()).unwrap();
+ for tx in txs.drain(..) {
+ let _ = tx.send(());
+ }
+ });
+
+ rx.recv().unwrap();
+ drop(rt);
+
+ th.join().unwrap();
+ }
+ }
}