aboutsummaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorJeff Vander Stoep <jeffv@google.com>2021-04-21 15:58:31 +0200
committerJeff Vander Stoep <jeffv@google.com>2021-04-21 15:59:26 +0200
commit28f5548b7bc0734d62d4624bb87b2bc60c78d49a (patch)
treec39809fba43deb9ee717b07b2afaeb71ef36ca59 /tests
parent714f4488dbef671e82a443b31cc99ea7216dcebd (diff)
downloadtokio-28f5548b7bc0734d62d4624bb87b2bc60c78d49a.tar.gz
Update to 1.50.0
Test: atest Change-Id: I94e6acadea178b0b957fbf853a590f155d1bd973
Diffstat (limited to 'tests')
-rw-r--r--tests/async_send_sync.rs25
-rw-r--r--tests/io_copy_bidirectional.rs128
-rw-r--r--tests/macros_select.rs59
-rw-r--r--tests/rt_common.rs26
-rw-r--r--tests/rt_handle_block_on.rs511
-rw-r--r--tests/sync_notify.rs17
-rw-r--r--tests/sync_once_cell.rs268
-rw-r--r--tests/sync_oneshot.rs22
-rw-r--r--tests/sync_rwlock.rs2
-rw-r--r--tests/sync_semaphore_owned.rs31
-rw-r--r--tests/task_abort.rs67
-rw-r--r--tests/time_timeout.rs27
12 files changed, 1182 insertions, 1 deletions
diff --git a/tests/async_send_sync.rs b/tests/async_send_sync.rs
index 671fa4a..211c572 100644
--- a/tests/async_send_sync.rs
+++ b/tests/async_send_sync.rs
@@ -1,9 +1,12 @@
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]
+#![allow(clippy::type_complexity)]
use std::cell::Cell;
+use std::future::Future;
use std::io::{Cursor, SeekFrom};
use std::net::SocketAddr;
+use std::pin::Pin;
use std::rc::Rc;
use tokio::net::TcpStream;
use tokio::time::{Duration, Instant};
@@ -265,6 +268,28 @@ async_assert_fn!(tokio::sync::watch::Sender<u8>::closed(_): Send & Sync);
async_assert_fn!(tokio::sync::watch::Sender<Cell<u8>>::closed(_): !Send & !Sync);
async_assert_fn!(tokio::sync::watch::Sender<Rc<u8>>::closed(_): !Send & !Sync);
+async_assert_fn!(tokio::sync::OnceCell<u8>::get_or_init(
+ _, fn() -> Pin<Box<dyn Future<Output = u8> + Send + Sync>>): Send & Sync);
+async_assert_fn!(tokio::sync::OnceCell<u8>::get_or_init(
+ _, fn() -> Pin<Box<dyn Future<Output = u8> + Send>>): Send & !Sync);
+async_assert_fn!(tokio::sync::OnceCell<u8>::get_or_init(
+ _, fn() -> Pin<Box<dyn Future<Output = u8>>>): !Send & !Sync);
+async_assert_fn!(tokio::sync::OnceCell<Cell<u8>>::get_or_init(
+ _, fn() -> Pin<Box<dyn Future<Output = Cell<u8>> + Send + Sync>>): !Send & !Sync);
+async_assert_fn!(tokio::sync::OnceCell<Cell<u8>>::get_or_init(
+ _, fn() -> Pin<Box<dyn Future<Output = Cell<u8>> + Send>>): !Send & !Sync);
+async_assert_fn!(tokio::sync::OnceCell<Cell<u8>>::get_or_init(
+ _, fn() -> Pin<Box<dyn Future<Output = Cell<u8>>>>): !Send & !Sync);
+async_assert_fn!(tokio::sync::OnceCell<Rc<u8>>::get_or_init(
+ _, fn() -> Pin<Box<dyn Future<Output = Rc<u8>> + Send + Sync>>): !Send & !Sync);
+async_assert_fn!(tokio::sync::OnceCell<Rc<u8>>::get_or_init(
+ _, fn() -> Pin<Box<dyn Future<Output = Rc<u8>> + Send>>): !Send & !Sync);
+async_assert_fn!(tokio::sync::OnceCell<Rc<u8>>::get_or_init(
+ _, fn() -> Pin<Box<dyn Future<Output = Rc<u8>>>>): !Send & !Sync);
+assert_value!(tokio::sync::OnceCell<u8>: Send & Sync);
+assert_value!(tokio::sync::OnceCell<Cell<u8>>: Send & !Sync);
+assert_value!(tokio::sync::OnceCell<Rc<u8>>: !Send & !Sync);
+
async_assert_fn!(tokio::task::LocalKey<u32>::scope(_, u32, BoxFutureSync<()>): Send & Sync);
async_assert_fn!(tokio::task::LocalKey<u32>::scope(_, u32, BoxFutureSend<()>): Send & !Sync);
async_assert_fn!(tokio::task::LocalKey<u32>::scope(_, u32, BoxFuture<()>): !Send & !Sync);
diff --git a/tests/io_copy_bidirectional.rs b/tests/io_copy_bidirectional.rs
new file mode 100644
index 0000000..17c0597
--- /dev/null
+++ b/tests/io_copy_bidirectional.rs
@@ -0,0 +1,128 @@
+#![warn(rust_2018_idioms)]
+#![cfg(feature = "full")]
+
+use std::time::Duration;
+use tokio::io::{self, copy_bidirectional, AsyncReadExt, AsyncWriteExt};
+use tokio::net::TcpStream;
+use tokio::task::JoinHandle;
+
+async fn make_socketpair() -> (TcpStream, TcpStream) {
+ let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
+ let addr = listener.local_addr().unwrap();
+ let connector = TcpStream::connect(addr);
+ let acceptor = listener.accept();
+
+ let (c1, c2) = tokio::join!(connector, acceptor);
+
+ (c1.unwrap(), c2.unwrap().0)
+}
+
+async fn block_write(s: &mut TcpStream) -> usize {
+ static BUF: [u8; 2048] = [0; 2048];
+
+ let mut copied = 0;
+ loop {
+ tokio::select! {
+ result = s.write(&BUF) => {
+ copied += result.expect("write error")
+ },
+ _ = tokio::time::sleep(Duration::from_millis(100)) => {
+ break;
+ }
+ }
+ }
+
+ copied
+}
+
+async fn symmetric<F, Fut>(mut cb: F)
+where
+ F: FnMut(JoinHandle<io::Result<(u64, u64)>>, TcpStream, TcpStream) -> Fut,
+ Fut: std::future::Future<Output = ()>,
+{
+ // We run the test twice, with streams passed to copy_bidirectional in
+ // different orders, in order to ensure that the two arguments are
+ // interchangable.
+
+ let (a, mut a1) = make_socketpair().await;
+ let (b, mut b1) = make_socketpair().await;
+
+ let handle = tokio::spawn(async move { copy_bidirectional(&mut a1, &mut b1).await });
+ cb(handle, a, b).await;
+
+ let (a, mut a1) = make_socketpair().await;
+ let (b, mut b1) = make_socketpair().await;
+
+ let handle = tokio::spawn(async move { copy_bidirectional(&mut b1, &mut a1).await });
+
+ cb(handle, b, a).await;
+}
+
+#[tokio::test]
+async fn test_basic_transfer() {
+ symmetric(|_handle, mut a, mut b| async move {
+ a.write_all(b"test").await.unwrap();
+ let mut tmp = [0; 4];
+ b.read_exact(&mut tmp).await.unwrap();
+ assert_eq!(&tmp[..], b"test");
+ })
+ .await
+}
+
+#[tokio::test]
+async fn test_transfer_after_close() {
+ symmetric(|handle, mut a, mut b| async move {
+ AsyncWriteExt::shutdown(&mut a).await.unwrap();
+ b.read_to_end(&mut Vec::new()).await.unwrap();
+
+ b.write_all(b"quux").await.unwrap();
+ let mut tmp = [0; 4];
+ a.read_exact(&mut tmp).await.unwrap();
+ assert_eq!(&tmp[..], b"quux");
+
+ // Once both are closed, we should have our handle back
+ drop(b);
+
+ assert_eq!(handle.await.unwrap().unwrap(), (0, 4));
+ })
+ .await
+}
+
+#[tokio::test]
+async fn blocking_one_side_does_not_block_other() {
+ symmetric(|handle, mut a, mut b| async move {
+ block_write(&mut a).await;
+
+ b.write_all(b"quux").await.unwrap();
+ let mut tmp = [0; 4];
+ a.read_exact(&mut tmp).await.unwrap();
+ assert_eq!(&tmp[..], b"quux");
+
+ AsyncWriteExt::shutdown(&mut a).await.unwrap();
+
+ let mut buf = Vec::new();
+ b.read_to_end(&mut buf).await.unwrap();
+
+ drop(b);
+
+ assert_eq!(handle.await.unwrap().unwrap(), (buf.len() as u64, 4));
+ })
+ .await
+}
+
+#[tokio::test]
+async fn immediate_exit_on_error() {
+ symmetric(|handle, mut a, mut b| async move {
+ block_write(&mut a).await;
+
+ // Fill up the b->copy->a path. We expect that this will _not_ drain
+ // before we exit the copy task.
+ let _bytes_written = block_write(&mut b).await;
+
+ // Drop b. We should not wait for a to consume the data buffered in the
+ // copy loop, since b will be failing writes.
+ drop(b);
+ assert!(handle.await.unwrap().is_err());
+ })
+ .await
+}
diff --git a/tests/macros_select.rs b/tests/macros_select.rs
index 3359849..ea06d51 100644
--- a/tests/macros_select.rs
+++ b/tests/macros_select.rs
@@ -481,3 +481,62 @@ async fn mut_on_left_hand_side() {
.await;
assert_eq!(v, 2);
}
+
+#[tokio::test]
+async fn biased_one_not_ready() {
+ let (_tx1, rx1) = oneshot::channel::<i32>();
+ let (tx2, rx2) = oneshot::channel::<i32>();
+ let (tx3, rx3) = oneshot::channel::<i32>();
+
+ tx2.send(2).unwrap();
+ tx3.send(3).unwrap();
+
+ let v = tokio::select! {
+ biased;
+
+ _ = rx1 => unreachable!(),
+ res = rx2 => {
+ assert_ok!(res)
+ },
+ _ = rx3 => {
+ panic!("This branch should never be activated because `rx2` should be polled before `rx3` due to `biased;`.")
+ }
+ };
+
+ assert_eq!(2, v);
+}
+
+#[tokio::test]
+async fn biased_eventually_ready() {
+ use tokio::task::yield_now;
+
+ let one = async {};
+ let two = async { yield_now().await };
+ let three = async { yield_now().await };
+
+ let mut count = 0u8;
+
+ tokio::pin!(one, two, three);
+
+ loop {
+ tokio::select! {
+ biased;
+
+ _ = &mut two, if count < 2 => {
+ count += 1;
+ assert_eq!(count, 2);
+ }
+ _ = &mut three, if count < 3 => {
+ count += 1;
+ assert_eq!(count, 3);
+ }
+ _ = &mut one, if count < 1 => {
+ count += 1;
+ assert_eq!(count, 1);
+ }
+ else => break,
+ }
+ }
+
+ assert_eq!(count, 3);
+}
diff --git a/tests/rt_common.rs b/tests/rt_common.rs
index 9aef4b9..cb1d0f6 100644
--- a/tests/rt_common.rs
+++ b/tests/rt_common.rs
@@ -1017,6 +1017,32 @@ rt_test! {
});
}
+ #[test]
+ fn coop_unconstrained() {
+ use std::task::Poll::Ready;
+
+ let rt = rt();
+
+ rt.block_on(async {
+ // Create a bunch of tasks
+ let mut tasks = (0..1_000).map(|_| {
+ tokio::spawn(async { })
+ }).collect::<Vec<_>>();
+
+ // Hope that all the tasks complete...
+ time::sleep(Duration::from_millis(100)).await;
+
+ tokio::task::unconstrained(poll_fn(|cx| {
+ // All the tasks should be ready
+ for task in &mut tasks {
+ assert!(Pin::new(task).poll(cx).is_ready());
+ }
+
+ Ready(())
+ })).await;
+ });
+ }
+
// Tests that the "next task" scheduler optimization is not able to starve
// other tasks.
#[test]
diff --git a/tests/rt_handle_block_on.rs b/tests/rt_handle_block_on.rs
new file mode 100644
index 0000000..5234258
--- /dev/null
+++ b/tests/rt_handle_block_on.rs
@@ -0,0 +1,511 @@
+#![warn(rust_2018_idioms)]
+#![cfg(feature = "full")]
+
+// All io tests that deal with shutdown is currently ignored because there are known bugs in with
+// shutting down the io driver while concurrently registering new resources. See
+// https://github.com/tokio-rs/tokio/pull/3569#pullrequestreview-612703467 fo more details.
+//
+// When this has been fixed we want to re-enable these tests.
+
+use std::time::Duration;
+use tokio::runtime::{Handle, Runtime};
+use tokio::sync::mpsc;
+use tokio::task::spawn_blocking;
+use tokio::{fs, net, time};
+
+macro_rules! multi_threaded_rt_test {
+ ($($t:tt)*) => {
+ mod threaded_scheduler_4_threads_only {
+ use super::*;
+
+ $($t)*
+
+ fn rt() -> Runtime {
+ tokio::runtime::Builder::new_multi_thread()
+ .worker_threads(4)
+ .enable_all()
+ .build()
+ .unwrap()
+ }
+ }
+
+ mod threaded_scheduler_1_thread_only {
+ use super::*;
+
+ $($t)*
+
+ fn rt() -> Runtime {
+ tokio::runtime::Builder::new_multi_thread()
+ .worker_threads(1)
+ .enable_all()
+ .build()
+ .unwrap()
+ }
+ }
+ }
+}
+
+macro_rules! rt_test {
+ ($($t:tt)*) => {
+ mod current_thread_scheduler {
+ use super::*;
+
+ $($t)*
+
+ fn rt() -> Runtime {
+ tokio::runtime::Builder::new_current_thread()
+ .enable_all()
+ .build()
+ .unwrap()
+ }
+ }
+
+ mod threaded_scheduler_4_threads {
+ use super::*;
+
+ $($t)*
+
+ fn rt() -> Runtime {
+ tokio::runtime::Builder::new_multi_thread()
+ .worker_threads(4)
+ .enable_all()
+ .build()
+ .unwrap()
+ }
+ }
+
+ mod threaded_scheduler_1_thread {
+ use super::*;
+
+ $($t)*
+
+ fn rt() -> Runtime {
+ tokio::runtime::Builder::new_multi_thread()
+ .worker_threads(1)
+ .enable_all()
+ .build()
+ .unwrap()
+ }
+ }
+ }
+}
+
+// ==== runtime independent futures ======
+
+#[test]
+fn basic() {
+ test_with_runtimes(|| {
+ let one = Handle::current().block_on(async { 1 });
+ assert_eq!(1, one);
+ });
+}
+
+#[test]
+fn bounded_mpsc_channel() {
+ test_with_runtimes(|| {
+ let (tx, mut rx) = mpsc::channel(1024);
+
+ Handle::current().block_on(tx.send(42)).unwrap();
+
+ let value = Handle::current().block_on(rx.recv()).unwrap();
+ assert_eq!(value, 42);
+ });
+}
+
+#[test]
+fn unbounded_mpsc_channel() {
+ test_with_runtimes(|| {
+ let (tx, mut rx) = mpsc::unbounded_channel();
+
+ let _ = tx.send(42);
+
+ let value = Handle::current().block_on(rx.recv()).unwrap();
+ assert_eq!(value, 42);
+ })
+}
+
+rt_test! {
+ // ==== spawn blocking futures ======
+
+ #[test]
+ fn basic_fs() {
+ let rt = rt();
+ let _enter = rt.enter();
+
+ let contents = Handle::current()
+ .block_on(fs::read_to_string("Cargo.toml"))
+ .unwrap();
+ assert!(contents.contains("Cargo.toml"));
+ }
+
+ #[test]
+ fn fs_shutdown_before_started() {
+ let rt = rt();
+ let _enter = rt.enter();
+ rt.shutdown_timeout(Duration::from_secs(1000));
+
+ let err: std::io::Error = Handle::current()
+ .block_on(fs::read_to_string("Cargo.toml"))
+ .unwrap_err();
+
+ assert_eq!(err.kind(), std::io::ErrorKind::Other);
+
+ let inner_err = err.get_ref().expect("no inner error");
+ assert_eq!(inner_err.to_string(), "background task failed");
+ }
+
+ #[test]
+ fn basic_spawn_blocking() {
+ let rt = rt();
+ let _enter = rt.enter();
+
+ let answer = Handle::current()
+ .block_on(spawn_blocking(|| {
+ std::thread::sleep(Duration::from_millis(100));
+ 42
+ }))
+ .unwrap();
+
+ assert_eq!(answer, 42);
+ }
+
+ #[test]
+ fn spawn_blocking_after_shutdown_fails() {
+ let rt = rt();
+ let _enter = rt.enter();
+ rt.shutdown_timeout(Duration::from_secs(1000));
+
+ let join_err = Handle::current()
+ .block_on(spawn_blocking(|| {
+ std::thread::sleep(Duration::from_millis(100));
+ 42
+ }))
+ .unwrap_err();
+
+ assert!(join_err.is_cancelled());
+ }
+
+ #[test]
+ fn spawn_blocking_started_before_shutdown_continues() {
+ let rt = rt();
+ let _enter = rt.enter();
+
+ let handle = spawn_blocking(|| {
+ std::thread::sleep(Duration::from_secs(1));
+ 42
+ });
+
+ rt.shutdown_timeout(Duration::from_secs(1000));
+
+ let answer = Handle::current().block_on(handle).unwrap();
+
+ assert_eq!(answer, 42);
+ }
+
+ // ==== net ======
+
+ #[test]
+ fn tcp_listener_bind() {
+ let rt = rt();
+ let _enter = rt.enter();
+
+ Handle::current()
+ .block_on(net::TcpListener::bind("127.0.0.1:0"))
+ .unwrap();
+ }
+
+ // All io tests are ignored for now. See above why that is.
+ #[ignore]
+ #[test]
+ fn tcp_listener_connect_after_shutdown() {
+ let rt = rt();
+ let _enter = rt.enter();
+
+ rt.shutdown_timeout(Duration::from_secs(1000));
+
+ let err = Handle::current()
+ .block_on(net::TcpListener::bind("127.0.0.1:0"))
+ .unwrap_err();
+
+ assert_eq!(err.kind(), std::io::ErrorKind::Other);
+ assert_eq!(
+ err.get_ref().unwrap().to_string(),
+ "A Tokio 1.x context was found, but it is being shutdown.",
+ );
+ }
+
+ // All io tests are ignored for now. See above why that is.
+ #[ignore]
+ #[test]
+ fn tcp_listener_connect_before_shutdown() {
+ let rt = rt();
+ let _enter = rt.enter();
+
+ let bind_future = net::TcpListener::bind("127.0.0.1:0");
+
+ rt.shutdown_timeout(Duration::from_secs(1000));
+
+ let err = Handle::current().block_on(bind_future).unwrap_err();
+
+ assert_eq!(err.kind(), std::io::ErrorKind::Other);
+ assert_eq!(
+ err.get_ref().unwrap().to_string(),
+ "A Tokio 1.x context was found, but it is being shutdown.",
+ );
+ }
+
+ #[test]
+ fn udp_socket_bind() {
+ let rt = rt();
+ let _enter = rt.enter();
+
+ Handle::current()
+ .block_on(net::UdpSocket::bind("127.0.0.1:0"))
+ .unwrap();
+ }
+
+ // All io tests are ignored for now. See above why that is.
+ #[ignore]
+ #[test]
+ fn udp_stream_bind_after_shutdown() {
+ let rt = rt();
+ let _enter = rt.enter();
+
+ rt.shutdown_timeout(Duration::from_secs(1000));
+
+ let err = Handle::current()
+ .block_on(net::UdpSocket::bind("127.0.0.1:0"))
+ .unwrap_err();
+
+ assert_eq!(err.kind(), std::io::ErrorKind::Other);
+ assert_eq!(
+ err.get_ref().unwrap().to_string(),
+ "A Tokio 1.x context was found, but it is being shutdown.",
+ );
+ }
+
+ // All io tests are ignored for now. See above why that is.
+ #[ignore]
+ #[test]
+ fn udp_stream_bind_before_shutdown() {
+ let rt = rt();
+ let _enter = rt.enter();
+
+ let bind_future = net::UdpSocket::bind("127.0.0.1:0");
+
+ rt.shutdown_timeout(Duration::from_secs(1000));
+
+ let err = Handle::current().block_on(bind_future).unwrap_err();
+
+ assert_eq!(err.kind(), std::io::ErrorKind::Other);
+ assert_eq!(
+ err.get_ref().unwrap().to_string(),
+ "A Tokio 1.x context was found, but it is being shutdown.",
+ );
+ }
+
+ // All io tests are ignored for now. See above why that is.
+ #[ignore]
+ #[cfg(unix)]
+ #[test]
+ fn unix_listener_bind_after_shutdown() {
+ let rt = rt();
+ let _enter = rt.enter();
+
+ let dir = tempfile::tempdir().unwrap();
+ let path = dir.path().join("socket");
+
+ rt.shutdown_timeout(Duration::from_secs(1000));
+
+ let err = net::UnixListener::bind(path).unwrap_err();
+
+ assert_eq!(err.kind(), std::io::ErrorKind::Other);
+ assert_eq!(
+ err.get_ref().unwrap().to_string(),
+ "A Tokio 1.x context was found, but it is being shutdown.",
+ );
+ }
+
+ // All io tests are ignored for now. See above why that is.
+ #[ignore]
+ #[cfg(unix)]
+ #[test]
+ fn unix_listener_shutdown_after_bind() {
+ let rt = rt();
+ let _enter = rt.enter();
+
+ let dir = tempfile::tempdir().unwrap();
+ let path = dir.path().join("socket");
+
+ let listener = net::UnixListener::bind(path).unwrap();
+
+ rt.shutdown_timeout(Duration::from_secs(1000));
+
+ // this should not timeout but fail immediately since the runtime has been shutdown
+ let err = Handle::current().block_on(listener.accept()).unwrap_err();
+
+ assert_eq!(err.kind(), std::io::ErrorKind::Other);
+ assert_eq!(err.get_ref().unwrap().to_string(), "reactor gone");
+ }
+
+ // All io tests are ignored for now. See above why that is.
+ #[ignore]
+ #[cfg(unix)]
+ #[test]
+ fn unix_listener_shutdown_after_accept() {
+ let rt = rt();
+ let _enter = rt.enter();
+
+ let dir = tempfile::tempdir().unwrap();
+ let path = dir.path().join("socket");
+
+ let listener = net::UnixListener::bind(path).unwrap();
+
+ let accept_future = listener.accept();
+
+ rt.shutdown_timeout(Duration::from_secs(1000));
+
+ // this should not timeout but fail immediately since the runtime has been shutdown
+ let err = Handle::current().block_on(accept_future).unwrap_err();
+
+ assert_eq!(err.kind(), std::io::ErrorKind::Other);
+ assert_eq!(err.get_ref().unwrap().to_string(), "reactor gone");
+ }
+
+ // ==== nesting ======
+
+ #[test]
+ #[should_panic(
+ expected = "Cannot start a runtime from within a runtime. This happens because a function (like `block_on`) attempted to block the current thread while the thread is being used to drive asynchronous tasks."
+ )]
+ fn nesting() {
+ fn some_non_async_function() -> i32 {
+ Handle::current().block_on(time::sleep(Duration::from_millis(10)));
+ 1
+ }
+
+ let rt = rt();
+
+ rt.block_on(async { some_non_async_function() });
+ }
+}
+
+multi_threaded_rt_test! {
+ #[cfg(unix)]
+ #[test]
+ fn unix_listener_bind() {
+ let rt = rt();
+ let _enter = rt.enter();
+
+ let dir = tempfile::tempdir().unwrap();
+ let path = dir.path().join("socket");
+
+ let listener = net::UnixListener::bind(path).unwrap();
+
+ // this should timeout and not fail immediately since the runtime has not been shutdown
+ let _: tokio::time::error::Elapsed = Handle::current()
+ .block_on(tokio::time::timeout(
+ Duration::from_millis(10),
+ listener.accept(),
+ ))
+ .unwrap_err();
+ }
+
+ // ==== timers ======
+
+ // `Handle::block_on` doesn't work with timer futures on a current thread runtime as there is no
+ // one to drive the timers so they will just hang forever. Therefore they are not tested.
+
+ #[test]
+ fn sleep() {
+ let rt = rt();
+ let _enter = rt.enter();
+
+ Handle::current().block_on(time::sleep(Duration::from_millis(100)));
+ }
+
+ #[test]
+ #[should_panic(expected = "A Tokio 1.x context was found, but it is being shutdown.")]
+ fn sleep_before_shutdown_panics() {
+ let rt = rt();
+ let _enter = rt.enter();
+
+ let f = time::sleep(Duration::from_millis(100));
+
+ rt.shutdown_timeout(Duration::from_secs(1000));
+
+ Handle::current().block_on(f);
+ }
+
+ #[test]
+ #[should_panic(expected = "A Tokio 1.x context was found, but it is being shutdown.")]
+ fn sleep_after_shutdown_panics() {
+ let rt = rt();
+ let _enter = rt.enter();
+
+ rt.shutdown_timeout(Duration::from_secs(1000));
+
+ Handle::current().block_on(time::sleep(Duration::from_millis(100)));
+ }
+}
+
+// ==== utils ======
+
+/// Create a new multi threaded runtime
+fn new_multi_thread(n: usize) -> Runtime {
+ tokio::runtime::Builder::new_multi_thread()
+ .worker_threads(n)
+ .enable_all()
+ .build()
+ .unwrap()
+}
+
+/// Create a new single threaded runtime
+fn new_current_thread() -> Runtime {
+ tokio::runtime::Builder::new_current_thread()
+ .enable_all()
+ .build()
+ .unwrap()
+}
+
+/// Utility to test things on both kinds of runtimes both before and after shutting it down.
+fn test_with_runtimes<F>(f: F)
+where
+ F: Fn(),
+{
+ {
+ println!("current thread runtime");
+
+ let rt = new_current_thread();
+ let _enter = rt.enter();
+ f();
+
+ println!("current thread runtime after shutdown");
+ rt.shutdown_timeout(Duration::from_secs(1000));
+ f();
+ }
+
+ {
+ println!("multi thread (1 thread) runtime");
+
+ let rt = new_multi_thread(1);
+ let _enter = rt.enter();
+ f();
+
+ println!("multi thread runtime after shutdown");
+ rt.shutdown_timeout(Duration::from_secs(1000));
+ f();
+ }
+
+ {
+ println!("multi thread (4 threads) runtime");
+
+ let rt = new_multi_thread(4);
+ let _enter = rt.enter();
+ f();
+
+ println!("multi thread runtime after shutdown");
+ rt.shutdown_timeout(Duration::from_secs(1000));
+ f();
+ }
+}
diff --git a/tests/sync_notify.rs b/tests/sync_notify.rs
index 8ffe020..6c6620b 100644
--- a/tests/sync_notify.rs
+++ b/tests/sync_notify.rs
@@ -134,3 +134,20 @@ fn notify_in_drop_after_wake() {
// Now, notifying **should not** deadlock
notify.notify_waiters();
}
+
+#[test]
+fn notify_one_after_dropped_all() {
+ let notify = Notify::new();
+ let mut notified1 = spawn(async { notify.notified().await });
+
+ assert_pending!(notified1.poll());
+
+ notify.notify_waiters();
+ notify.notify_one();
+
+ drop(notified1);
+
+ let mut notified2 = spawn(async { notify.notified().await });
+
+ assert_ready!(notified2.poll());
+}
diff --git a/tests/sync_once_cell.rs b/tests/sync_once_cell.rs
new file mode 100644
index 0000000..60f50d2
--- /dev/null
+++ b/tests/sync_once_cell.rs
@@ -0,0 +1,268 @@
+#![warn(rust_2018_idioms)]
+#![cfg(feature = "full")]
+
+use std::mem;
+use std::ops::Drop;
+use std::sync::atomic::{AtomicU32, Ordering};
+use std::time::Duration;
+use tokio::runtime;
+use tokio::sync::{OnceCell, SetError};
+use tokio::time;
+
+async fn func1() -> u32 {
+ 5
+}
+
+async fn func2() -> u32 {
+ time::sleep(Duration::from_millis(1)).await;
+ 10
+}
+
+async fn func_err() -> Result<u32, ()> {
+ Err(())
+}
+
+async fn func_ok() -> Result<u32, ()> {
+ Ok(10)
+}
+
+async fn func_panic() -> u32 {
+ time::sleep(Duration::from_millis(1)).await;
+ panic!();
+}
+
+async fn sleep_and_set() -> u32 {
+ // Simulate sleep by pausing time and waiting for another thread to
+ // resume clock when calling `set`, then finding the cell being initialized
+ // by this call
+ time::sleep(Duration::from_millis(2)).await;
+ 5
+}
+
+async fn advance_time_and_set(cell: &'static OnceCell<u32>, v: u32) -> Result<(), SetError<u32>> {
+ time::advance(Duration::from_millis(1)).await;
+ cell.set(v)
+}
+
+#[test]
+fn get_or_init() {
+ let rt = runtime::Builder::new_current_thread()
+ .enable_time()
+ .start_paused(true)
+ .build()
+ .unwrap();
+
+ static ONCE: OnceCell<u32> = OnceCell::const_new();
+
+ rt.block_on(async {
+ let handle1 = rt.spawn(async { ONCE.get_or_init(func1).await });
+ let handle2 = rt.spawn(async { ONCE.get_or_init(func2).await });
+
+ time::advance(Duration::from_millis(1)).await;
+ time::resume();
+
+ let result1 = handle1.await.unwrap();
+ let result2 = handle2.await.unwrap();
+
+ assert_eq!(*result1, 5);
+ assert_eq!(*result2, 5);
+ });
+}
+
+#[test]
+fn get_or_init_panic() {
+ let rt = runtime::Builder::new_current_thread()
+ .enable_time()
+ .build()
+ .unwrap();
+
+ static ONCE: OnceCell<u32> = OnceCell::const_new();
+
+ rt.block_on(async {
+ time::pause();
+
+ let handle1 = rt.spawn(async { ONCE.get_or_init(func1).await });
+ let handle2 = rt.spawn(async { ONCE.get_or_init(func_panic).await });
+
+ time::advance(Duration::from_millis(1)).await;
+
+ let result1 = handle1.await.unwrap();
+ let result2 = handle2.await.unwrap();
+
+ assert_eq!(*result1, 5);
+ assert_eq!(*result2, 5);
+ });
+}
+
+#[test]
+fn set_and_get() {
+ let rt = runtime::Builder::new_current_thread()
+ .enable_time()
+ .build()
+ .unwrap();
+
+ static ONCE: OnceCell<u32> = OnceCell::const_new();
+
+ rt.block_on(async {
+ let _ = rt.spawn(async { ONCE.set(5) }).await;
+ let value = ONCE.get().unwrap();
+ assert_eq!(*value, 5);
+ });
+}
+
+#[test]
+fn get_uninit() {
+ static ONCE: OnceCell<u32> = OnceCell::const_new();
+ let uninit = ONCE.get();
+ assert!(uninit.is_none());
+}
+
+#[test]
+fn set_twice() {
+ static ONCE: OnceCell<u32> = OnceCell::const_new();
+
+ let first = ONCE.set(5);
+ assert_eq!(first, Ok(()));
+ let second = ONCE.set(6);
+ assert!(second.err().unwrap().is_already_init_err());
+}
+
+#[test]
+fn set_while_initializing() {
+ let rt = runtime::Builder::new_current_thread()
+ .enable_time()
+ .build()
+ .unwrap();
+
+ static ONCE: OnceCell<u32> = OnceCell::const_new();
+
+ rt.block_on(async {
+ time::pause();
+
+ let handle1 = rt.spawn(async { ONCE.get_or_init(sleep_and_set).await });
+ let handle2 = rt.spawn(async { advance_time_and_set(&ONCE, 10).await });
+
+ time::advance(Duration::from_millis(2)).await;
+
+ let result1 = handle1.await.unwrap();
+ let result2 = handle2.await.unwrap();
+
+ assert_eq!(*result1, 5);
+ assert!(result2.err().unwrap().is_initializing_err());
+ });
+}
+
+#[test]
+fn get_or_try_init() {
+ let rt = runtime::Builder::new_current_thread()
+ .enable_time()
+ .start_paused(true)
+ .build()
+ .unwrap();
+
+ static ONCE: OnceCell<u32> = OnceCell::const_new();
+
+ rt.block_on(async {
+ let handle1 = rt.spawn(async { ONCE.get_or_try_init(func_err).await });
+ let handle2 = rt.spawn(async { ONCE.get_or_try_init(func_ok).await });
+
+ time::advance(Duration::from_millis(1)).await;
+ time::resume();
+
+ let result1 = handle1.await.unwrap();
+ assert!(result1.is_err());
+
+ let result2 = handle2.await.unwrap();
+ assert_eq!(*result2.unwrap(), 10);
+ });
+}
+
+#[test]
+fn drop_cell() {
+ static NUM_DROPS: AtomicU32 = AtomicU32::new(0);
+
+ struct Foo {}
+
+ let fooer = Foo {};
+
+ impl Drop for Foo {
+ fn drop(&mut self) {
+ NUM_DROPS.fetch_add(1, Ordering::Release);
+ }
+ }
+
+ {
+ let once_cell = OnceCell::new();
+ let prev = once_cell.set(fooer);
+ assert!(prev.is_ok())
+ }
+ assert!(NUM_DROPS.load(Ordering::Acquire) == 1);
+}
+
+#[test]
+fn drop_cell_new_with() {
+ static NUM_DROPS: AtomicU32 = AtomicU32::new(0);
+
+ struct Foo {}
+
+ let fooer = Foo {};
+
+ impl Drop for Foo {
+ fn drop(&mut self) {
+ NUM_DROPS.fetch_add(1, Ordering::Release);
+ }
+ }
+
+ {
+ let once_cell = OnceCell::new_with(Some(fooer));
+ assert!(once_cell.initialized());
+ }
+ assert!(NUM_DROPS.load(Ordering::Acquire) == 1);
+}
+
+#[test]
+fn drop_into_inner() {
+ static NUM_DROPS: AtomicU32 = AtomicU32::new(0);
+
+ struct Foo {}
+
+ let fooer = Foo {};
+
+ impl Drop for Foo {
+ fn drop(&mut self) {
+ NUM_DROPS.fetch_add(1, Ordering::Release);
+ }
+ }
+
+ let once_cell = OnceCell::new();
+ assert!(once_cell.set(fooer).is_ok());
+ let fooer = once_cell.into_inner();
+ let count = NUM_DROPS.load(Ordering::Acquire);
+ assert!(count == 0);
+ drop(fooer);
+ let count = NUM_DROPS.load(Ordering::Acquire);
+ assert!(count == 1);
+}
+
+#[test]
+fn drop_into_inner_new_with() {
+ static NUM_DROPS: AtomicU32 = AtomicU32::new(0);
+
+ struct Foo {}
+
+ let fooer = Foo {};
+
+ impl Drop for Foo {
+ fn drop(&mut self) {
+ NUM_DROPS.fetch_add(1, Ordering::Release);
+ }
+ }
+
+ let once_cell = OnceCell::new_with(Some(fooer));
+ let fooer = once_cell.into_inner();
+ let count = NUM_DROPS.load(Ordering::Acquire);
+ assert!(count == 0);
+ mem::drop(fooer);
+ let count = NUM_DROPS.load(Ordering::Acquire);
+ assert!(count == 1);
+}
diff --git a/tests/sync_oneshot.rs b/tests/sync_oneshot.rs
index 195c255..1aab810 100644
--- a/tests/sync_oneshot.rs
+++ b/tests/sync_oneshot.rs
@@ -2,6 +2,7 @@
#![cfg(feature = "full")]
use tokio::sync::oneshot;
+use tokio::sync::oneshot::error::TryRecvError;
use tokio_test::*;
use std::future::Future;
@@ -181,6 +182,27 @@ fn close_try_recv_poll() {
}
#[test]
+fn close_after_recv() {
+ let (tx, mut rx) = oneshot::channel::<i32>();
+
+ tx.send(17).unwrap();
+
+ assert_eq!(17, rx.try_recv().unwrap());
+ rx.close();
+}
+
+#[test]
+fn try_recv_after_completion() {
+ let (tx, mut rx) = oneshot::channel::<i32>();
+
+ tx.send(17).unwrap();
+
+ assert_eq!(17, rx.try_recv().unwrap());
+ assert_eq!(Err(TryRecvError::Closed), rx.try_recv());
+ rx.close();
+}
+
+#[test]
fn drops_tasks() {
let (mut tx, mut rx) = oneshot::channel::<i32>();
let mut tx_task = task::spawn(());
diff --git a/tests/sync_rwlock.rs b/tests/sync_rwlock.rs
index 872b845..e12052b 100644
--- a/tests/sync_rwlock.rs
+++ b/tests/sync_rwlock.rs
@@ -54,7 +54,7 @@ fn read_exclusive_pending() {
// should be made available when one of the shared acesses is dropped
#[test]
fn exhaust_reading() {
- let rwlock = RwLock::new(100);
+ let rwlock = RwLock::with_max_readers(100, 1024);
let mut reads = Vec::new();
loop {
let mut t = spawn(rwlock.read());
diff --git a/tests/sync_semaphore_owned.rs b/tests/sync_semaphore_owned.rs
index 8ed6209..478c3a3 100644
--- a/tests/sync_semaphore_owned.rs
+++ b/tests/sync_semaphore_owned.rs
@@ -16,6 +16,22 @@ fn try_acquire() {
assert!(p3.is_ok());
}
+#[test]
+fn try_acquire_many() {
+ let sem = Arc::new(Semaphore::new(42));
+ {
+ let p1 = sem.clone().try_acquire_many_owned(42);
+ assert!(p1.is_ok());
+ let p2 = sem.clone().try_acquire_owned();
+ assert!(p2.is_err());
+ }
+ let p3 = sem.clone().try_acquire_many_owned(32);
+ assert!(p3.is_ok());
+ let p4 = sem.clone().try_acquire_many_owned(10);
+ assert!(p4.is_ok());
+ assert!(sem.try_acquire_owned().is_err());
+}
+
#[tokio::test]
async fn acquire() {
let sem = Arc::new(Semaphore::new(1));
@@ -29,6 +45,21 @@ async fn acquire() {
}
#[tokio::test]
+async fn acquire_many() {
+ let semaphore = Arc::new(Semaphore::new(42));
+ let permit32 = semaphore.clone().try_acquire_many_owned(32).unwrap();
+ let (sender, receiver) = tokio::sync::oneshot::channel();
+ let join_handle = tokio::spawn(async move {
+ let _permit10 = semaphore.clone().acquire_many_owned(10).await.unwrap();
+ sender.send(()).unwrap();
+ let _permit32 = semaphore.acquire_many_owned(32).await.unwrap();
+ });
+ receiver.await.unwrap();
+ drop(permit32);
+ join_handle.await.unwrap();
+}
+
+#[tokio::test]
async fn add_permits() {
let sem = Arc::new(Semaphore::new(0));
let sem_clone = sem.clone();
diff --git a/tests/task_abort.rs b/tests/task_abort.rs
index e84f19c..1d72ac3 100644
--- a/tests/task_abort.rs
+++ b/tests/task_abort.rs
@@ -24,3 +24,70 @@ fn test_abort_without_panic_3157() {
let _ = handle.await;
});
}
+
+/// Checks that a suspended task can be aborted inside of a current_thread
+/// executor without panicking as reported in issue #3662:
+/// <https://github.com/tokio-rs/tokio/issues/3662>.
+#[test]
+fn test_abort_without_panic_3662() {
+ use std::sync::atomic::{AtomicBool, Ordering};
+ use std::sync::Arc;
+
+ struct DropCheck(Arc<AtomicBool>);
+
+ impl Drop for DropCheck {
+ fn drop(&mut self) {
+ self.0.store(true, Ordering::SeqCst);
+ }
+ }
+
+ let rt = tokio::runtime::Builder::new_current_thread()
+ .build()
+ .unwrap();
+
+ rt.block_on(async move {
+ let drop_flag = Arc::new(AtomicBool::new(false));
+ let drop_check = DropCheck(drop_flag.clone());
+
+ let j = tokio::spawn(async move {
+ // NB: just grab the drop check here so that it becomes part of the
+ // task.
+ let _drop_check = drop_check;
+ futures::future::pending::<()>().await;
+ });
+
+ let drop_flag2 = drop_flag.clone();
+
+ let task = std::thread::spawn(move || {
+ // This runs in a separate thread so it doesn't have immediate
+ // thread-local access to the executor. It does however transition
+ // the underlying task to be completed, which will cause it to be
+ // dropped (in this thread no less).
+ assert!(!drop_flag2.load(Ordering::SeqCst));
+ j.abort();
+ // TODO: is this guaranteed at this point?
+ // assert!(drop_flag2.load(Ordering::SeqCst));
+ j
+ })
+ .join()
+ .unwrap();
+
+ assert!(drop_flag.load(Ordering::SeqCst));
+ let result = task.await;
+ assert!(result.unwrap_err().is_cancelled());
+
+ // Note: We do the following to trigger a deferred task cleanup.
+ //
+ // The relevant piece of code you want to look at is in:
+ // `Inner::block_on` of `basic_scheduler.rs`.
+ //
+ // We cause the cleanup to happen by having a poll return Pending once
+ // so that the scheduler can go into the "auxilliary tasks" mode, at
+ // which point the task is removed from the scheduler.
+ let i = tokio::spawn(async move {
+ tokio::task::yield_now().await;
+ });
+
+ i.await.unwrap();
+ });
+}
diff --git a/tests/time_timeout.rs b/tests/time_timeout.rs
index 4efcd8c..dbd80eb 100644
--- a/tests/time_timeout.rs
+++ b/tests/time_timeout.rs
@@ -75,6 +75,33 @@ async fn future_and_timeout_in_future() {
}
#[tokio::test]
+async fn very_large_timeout() {
+ time::pause();
+
+ // Not yet complete
+ let (tx, rx) = oneshot::channel();
+
+ // copy-paste unstable `Duration::MAX`
+ let duration_max = Duration::from_secs(u64::MAX) + Duration::from_nanos(999_999_999);
+
+ // Wrap it with a deadline
+ let mut fut = task::spawn(timeout(duration_max, rx));
+
+ // Ready!
+ assert_pending!(fut.poll());
+
+ // Turn the timer, it runs for the elapsed time
+ time::advance(Duration::from_secs(86400 * 365 * 10)).await;
+
+ assert_pending!(fut.poll());
+
+ // Complete the future
+ tx.send(()).unwrap();
+
+ assert_ready_ok!(fut.poll()).unwrap();
+}
+
+#[tokio::test]
async fn deadline_now_elapses() {
use futures::future::pending;