From 28f5548b7bc0734d62d4624bb87b2bc60c78d49a Mon Sep 17 00:00:00 2001
From: Jeff Vander Stoep
Date: Wed, 21 Apr 2021 15:58:31 +0200
Subject: Update to 1.50.0

Test: atest
Change-Id: I94e6acadea178b0b957fbf853a590f155d1bd973
---
 tests/async_send_sync.rs       |  25 ++
 tests/io_copy_bidirectional.rs | 128 +++++++++++
 tests/macros_select.rs         |  59 +++++
 tests/rt_common.rs             |  26 +++
 tests/rt_handle_block_on.rs    | 511 +++++++++++++++++++++++++++++++++++++++++
 tests/sync_notify.rs           |  17 ++
 tests/sync_once_cell.rs        | 268 +++++++++++++++++++++
 tests/sync_oneshot.rs          |  22 ++
 tests/sync_rwlock.rs           |   2 +-
 tests/sync_semaphore_owned.rs  |  31 +++
 tests/task_abort.rs            |  67 ++++++
 tests/time_timeout.rs          |  27 +++
 12 files changed, 1182 insertions(+), 1 deletion(-)
 create mode 100644 tests/io_copy_bidirectional.rs
 create mode 100644 tests/rt_handle_block_on.rs
 create mode 100644 tests/sync_once_cell.rs
(limited to 'tests')

diff --git a/tests/async_send_sync.rs b/tests/async_send_sync.rs
index 671fa4a..211c572 100644
--- a/tests/async_send_sync.rs
+++ b/tests/async_send_sync.rs
@@ -1,9 +1,12 @@
 #![warn(rust_2018_idioms)]
 #![cfg(feature = "full")]
+#![allow(clippy::type_complexity)]
 
 use std::cell::Cell;
+use std::future::Future;
 use std::io::{Cursor, SeekFrom};
 use std::net::SocketAddr;
+use std::pin::Pin;
 use std::rc::Rc;
 use tokio::net::TcpStream;
 use tokio::time::{Duration, Instant};
@@ -265,6 +268,28 @@
 async_assert_fn!(tokio::sync::watch::Sender<u8>::closed(_): Send & Sync);
 async_assert_fn!(tokio::sync::watch::Sender<Cell<u8>>::closed(_): !Send & !Sync);
 async_assert_fn!(tokio::sync::watch::Sender<Rc<u8>>::closed(_): !Send & !Sync);
+async_assert_fn!(tokio::sync::OnceCell<u32>::get_or_init(
+    _, fn() -> Pin<Box<dyn Future<Output = u32> + Send + Sync>>): Send & Sync);
+async_assert_fn!(tokio::sync::OnceCell<u32>::get_or_init(
+    _, fn() -> Pin<Box<dyn Future<Output = u32> + Send>>): Send & !Sync);
+async_assert_fn!(tokio::sync::OnceCell<u32>::get_or_init(
+    _, fn() -> Pin<Box<dyn Future<Output = u32>>>): !Send & !Sync);
+async_assert_fn!(tokio::sync::OnceCell<Cell<u32>>::get_or_init(
+    _, fn() -> Pin<Box<dyn Future<Output = Cell<u32>> + Send + Sync>>): !Send & !Sync);
+async_assert_fn!(tokio::sync::OnceCell<Cell<u32>>::get_or_init(
+    _, fn() -> Pin<Box<dyn Future<Output = Cell<u32>> + Send>>): !Send & !Sync);
+async_assert_fn!(tokio::sync::OnceCell<Cell<u32>>::get_or_init(
+    _, fn() -> Pin<Box<dyn Future<Output = Cell<u32>>>>): !Send & !Sync);
+async_assert_fn!(tokio::sync::OnceCell<Rc<u32>>::get_or_init(
+    _, fn() -> Pin<Box<dyn Future<Output = Rc<u32>> + Send + Sync>>): !Send & !Sync);
+async_assert_fn!(tokio::sync::OnceCell<Rc<u32>>::get_or_init(
+    _, fn() -> Pin<Box<dyn Future<Output = Rc<u32>> + Send>>): !Send & !Sync);
+async_assert_fn!(tokio::sync::OnceCell<Rc<u32>>::get_or_init(
+    _, fn() -> Pin<Box<dyn Future<Output = Rc<u32>>>>): !Send & !Sync);
+assert_value!(tokio::sync::OnceCell<u8>: Send & Sync);
+assert_value!(tokio::sync::OnceCell<Cell<u8>>: Send & !Sync);
+assert_value!(tokio::sync::OnceCell<Rc<u8>>: !Send & !Sync);
+
 async_assert_fn!(tokio::task::LocalKey<u32>::scope(_, u32, BoxFutureSync<()>): Send & Sync);
 async_assert_fn!(tokio::task::LocalKey<u32>::scope(_, u32, BoxFutureSend<()>): Send & !Sync);
 async_assert_fn!(tokio::task::LocalKey<u32>::scope(_, u32, BoxFuture<()>): !Send & !Sync);
diff --git a/tests/io_copy_bidirectional.rs b/tests/io_copy_bidirectional.rs
new file mode 100644
index 0000000..17c0597
--- /dev/null
+++ b/tests/io_copy_bidirectional.rs
@@ -0,0 +1,128 @@
+#![warn(rust_2018_idioms)]
+#![cfg(feature = "full")]
+
+use std::time::Duration;
+use tokio::io::{self, copy_bidirectional, AsyncReadExt, AsyncWriteExt};
+use tokio::net::TcpStream;
+use tokio::task::JoinHandle;
+
+async fn make_socketpair() -> (TcpStream, TcpStream) {
+    let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
+    let addr = listener.local_addr().unwrap();
+    let connector = 
TcpStream::connect(addr);
+    let acceptor = listener.accept();
+
+    let (c1, c2) = tokio::join!(connector, acceptor);
+
+    (c1.unwrap(), c2.unwrap().0)
+}
+
+async fn block_write(s: &mut TcpStream) -> usize {
+    static BUF: [u8; 2048] = [0; 2048];
+
+    let mut copied = 0;
+    loop {
+        tokio::select! {
+            result = s.write(&BUF) => {
+                copied += result.expect("write error")
+            },
+            _ = tokio::time::sleep(Duration::from_millis(100)) => {
+                break;
+            }
+        }
+    }
+
+    copied
+}
+
+async fn symmetric<F, Fut>(mut cb: F)
+where
+    F: FnMut(JoinHandle<io::Result<(u64, u64)>>, TcpStream, TcpStream) -> Fut,
+    Fut: std::future::Future<Output = ()>,
+{
+    // We run the test twice, with streams passed to copy_bidirectional in
+    // different orders, in order to ensure that the two arguments are
+    // interchangeable.
+
+    let (a, mut a1) = make_socketpair().await;
+    let (b, mut b1) = make_socketpair().await;
+
+    let handle = tokio::spawn(async move { copy_bidirectional(&mut a1, &mut b1).await });
+    cb(handle, a, b).await;
+
+    let (a, mut a1) = make_socketpair().await;
+    let (b, mut b1) = make_socketpair().await;
+
+    let handle = tokio::spawn(async move { copy_bidirectional(&mut b1, &mut a1).await });
+
+    cb(handle, b, a).await;
+}
+
+#[tokio::test]
+async fn test_basic_transfer() {
+    symmetric(|_handle, mut a, mut b| async move {
+        a.write_all(b"test").await.unwrap();
+        let mut tmp = [0; 4];
+        b.read_exact(&mut tmp).await.unwrap();
+        assert_eq!(&tmp[..], b"test");
+    })
+    .await
+}
+
+#[tokio::test]
+async fn test_transfer_after_close() {
+    symmetric(|handle, mut a, mut b| async move {
+        AsyncWriteExt::shutdown(&mut a).await.unwrap();
+        b.read_to_end(&mut Vec::new()).await.unwrap();
+
+        b.write_all(b"quux").await.unwrap();
+        let mut tmp = [0; 4];
+        a.read_exact(&mut tmp).await.unwrap();
+        assert_eq!(&tmp[..], b"quux");
+
+        // Once both are closed, we should have our handle back
+        drop(b);
+
+        assert_eq!(handle.await.unwrap().unwrap(), (0, 4));
+    })
+    .await
+}
+
+#[tokio::test]
+async fn blocking_one_side_does_not_block_other() {
+    symmetric(|handle, mut a, mut b| async move {
+        block_write(&mut a).await;
+
+        b.write_all(b"quux").await.unwrap();
+        let mut tmp = [0; 4];
+        a.read_exact(&mut tmp).await.unwrap();
+        assert_eq!(&tmp[..], b"quux");
+
+        AsyncWriteExt::shutdown(&mut a).await.unwrap();
+
+        let mut buf = Vec::new();
+        b.read_to_end(&mut buf).await.unwrap();
+
+        drop(b);
+
+        assert_eq!(handle.await.unwrap().unwrap(), (buf.len() as u64, 4));
+    })
+    .await
+}
+
+#[tokio::test]
+async fn immediate_exit_on_error() {
+    symmetric(|handle, mut a, mut b| async move {
+        block_write(&mut a).await;
+
+        // Fill up the b->copy->a path. We expect that this will _not_ drain
+        // before we exit the copy task.
+        let _bytes_written = block_write(&mut b).await;
+
+        // Drop b. We should not wait for a to consume the data buffered in the
+        // copy loop, since b will be failing writes.
+        drop(b);
+        assert!(handle.await.unwrap().is_err());
+    })
+    .await
+}
diff --git a/tests/macros_select.rs b/tests/macros_select.rs
index 3359849..ea06d51 100644
--- a/tests/macros_select.rs
+++ b/tests/macros_select.rs
@@ -481,3 +481,62 @@ async fn mut_on_left_hand_side() {
         .await;
     assert_eq!(v, 2);
 }
+
+#[tokio::test]
+async fn biased_one_not_ready() {
+    let (_tx1, rx1) = oneshot::channel::<i32>();
+    let (tx2, rx2) = oneshot::channel::<i32>();
+    let (tx3, rx3) = oneshot::channel::<i32>();
+
+    tx2.send(2).unwrap();
+    tx3.send(3).unwrap();
+
+    let v = tokio::select! 
{
+        biased;
+
+        _ = rx1 => unreachable!(),
+        res = rx2 => {
+            assert_ok!(res)
+        },
+        _ = rx3 => {
+            panic!("This branch should never be activated because `rx2` should be polled before `rx3` due to `biased;`.")
+        }
+    };
+
+    assert_eq!(2, v);
+}
+
+#[tokio::test]
+async fn biased_eventually_ready() {
+    use tokio::task::yield_now;
+
+    let one = async {};
+    let two = async { yield_now().await };
+    let three = async { yield_now().await };
+
+    let mut count = 0u8;
+
+    tokio::pin!(one, two, three);
+
+    loop {
+        tokio::select! {
+            biased;
+
+            _ = &mut two, if count < 2 => {
+                count += 1;
+                assert_eq!(count, 2);
+            }
+            _ = &mut three, if count < 3 => {
+                count += 1;
+                assert_eq!(count, 3);
+            }
+            _ = &mut one, if count < 1 => {
+                count += 1;
+                assert_eq!(count, 1);
+            }
+            else => break,
+        }
+    }
+
+    assert_eq!(count, 3);
+}
diff --git a/tests/rt_common.rs b/tests/rt_common.rs
index 9aef4b9..cb1d0f6 100644
--- a/tests/rt_common.rs
+++ b/tests/rt_common.rs
@@ -1017,6 +1017,32 @@ rt_test! {
         });
     }
+
+    #[test]
+    fn coop_unconstrained() {
+        use std::task::Poll::Ready;
+
+        let rt = rt();
+
+        rt.block_on(async {
+            // Create a bunch of tasks
+            let mut tasks = (0..1_000).map(|_| {
+                tokio::spawn(async { })
+            }).collect::<Vec<_>>();
+
+            // Hope that all the tasks complete...
+            time::sleep(Duration::from_millis(100)).await;
+
+            tokio::task::unconstrained(poll_fn(|cx| {
+                // All the tasks should be ready
+                for task in &mut tasks {
+                    assert!(Pin::new(task).poll(cx).is_ready());
+                }
+
+                Ready(())
+            })).await;
+        });
+    }
 
     // Tests that the "next task" scheduler optimization is not able to starve
     // other tasks.
     #[test]
diff --git a/tests/rt_handle_block_on.rs b/tests/rt_handle_block_on.rs
new file mode 100644
index 0000000..5234258
--- /dev/null
+++ b/tests/rt_handle_block_on.rs
@@ -0,0 +1,511 @@
+#![warn(rust_2018_idioms)]
+#![cfg(feature = "full")]
+
+// All io tests that deal with shutdown are currently ignored because there are known bugs
+// with shutting down the io driver while concurrently registering new resources. See
+// https://github.com/tokio-rs/tokio/pull/3569#pullrequestreview-612703467 for more details.
+//
+// When this has been fixed we want to re-enable these tests.
+
+use std::time::Duration;
+use tokio::runtime::{Handle, Runtime};
+use tokio::sync::mpsc;
+use tokio::task::spawn_blocking;
+use tokio::{fs, net, time};
+
+macro_rules! multi_threaded_rt_test {
+    ($($t:tt)*) => {
+        mod threaded_scheduler_4_threads_only {
+            use super::*;
+
+            $($t)*
+
+            fn rt() -> Runtime {
+                tokio::runtime::Builder::new_multi_thread()
+                    .worker_threads(4)
+                    .enable_all()
+                    .build()
+                    .unwrap()
+            }
+        }
+
+        mod threaded_scheduler_1_thread_only {
+            use super::*;
+
+            $($t)*
+
+            fn rt() -> Runtime {
+                tokio::runtime::Builder::new_multi_thread()
+                    .worker_threads(1)
+                    .enable_all()
+                    .build()
+                    .unwrap()
+            }
+        }
+    }
+}
+
+macro_rules! 
rt_test { + ($($t:tt)*) => { + mod current_thread_scheduler { + use super::*; + + $($t)* + + fn rt() -> Runtime { + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap() + } + } + + mod threaded_scheduler_4_threads { + use super::*; + + $($t)* + + fn rt() -> Runtime { + tokio::runtime::Builder::new_multi_thread() + .worker_threads(4) + .enable_all() + .build() + .unwrap() + } + } + + mod threaded_scheduler_1_thread { + use super::*; + + $($t)* + + fn rt() -> Runtime { + tokio::runtime::Builder::new_multi_thread() + .worker_threads(1) + .enable_all() + .build() + .unwrap() + } + } + } +} + +// ==== runtime independent futures ====== + +#[test] +fn basic() { + test_with_runtimes(|| { + let one = Handle::current().block_on(async { 1 }); + assert_eq!(1, one); + }); +} + +#[test] +fn bounded_mpsc_channel() { + test_with_runtimes(|| { + let (tx, mut rx) = mpsc::channel(1024); + + Handle::current().block_on(tx.send(42)).unwrap(); + + let value = Handle::current().block_on(rx.recv()).unwrap(); + assert_eq!(value, 42); + }); +} + +#[test] +fn unbounded_mpsc_channel() { + test_with_runtimes(|| { + let (tx, mut rx) = mpsc::unbounded_channel(); + + let _ = tx.send(42); + + let value = Handle::current().block_on(rx.recv()).unwrap(); + assert_eq!(value, 42); + }) +} + +rt_test! { + // ==== spawn blocking futures ====== + + #[test] + fn basic_fs() { + let rt = rt(); + let _enter = rt.enter(); + + let contents = Handle::current() + .block_on(fs::read_to_string("Cargo.toml")) + .unwrap(); + assert!(contents.contains("Cargo.toml")); + } + + #[test] + fn fs_shutdown_before_started() { + let rt = rt(); + let _enter = rt.enter(); + rt.shutdown_timeout(Duration::from_secs(1000)); + + let err: std::io::Error = Handle::current() + .block_on(fs::read_to_string("Cargo.toml")) + .unwrap_err(); + + assert_eq!(err.kind(), std::io::ErrorKind::Other); + + let inner_err = err.get_ref().expect("no inner error"); + assert_eq!(inner_err.to_string(), "background task failed"); + } + + #[test] + fn basic_spawn_blocking() { + let rt = rt(); + let _enter = rt.enter(); + + let answer = Handle::current() + .block_on(spawn_blocking(|| { + std::thread::sleep(Duration::from_millis(100)); + 42 + })) + .unwrap(); + + assert_eq!(answer, 42); + } + + #[test] + fn spawn_blocking_after_shutdown_fails() { + let rt = rt(); + let _enter = rt.enter(); + rt.shutdown_timeout(Duration::from_secs(1000)); + + let join_err = Handle::current() + .block_on(spawn_blocking(|| { + std::thread::sleep(Duration::from_millis(100)); + 42 + })) + .unwrap_err(); + + assert!(join_err.is_cancelled()); + } + + #[test] + fn spawn_blocking_started_before_shutdown_continues() { + let rt = rt(); + let _enter = rt.enter(); + + let handle = spawn_blocking(|| { + std::thread::sleep(Duration::from_secs(1)); + 42 + }); + + rt.shutdown_timeout(Duration::from_secs(1000)); + + let answer = Handle::current().block_on(handle).unwrap(); + + assert_eq!(answer, 42); + } + + // ==== net ====== + + #[test] + fn tcp_listener_bind() { + let rt = rt(); + let _enter = rt.enter(); + + Handle::current() + .block_on(net::TcpListener::bind("127.0.0.1:0")) + .unwrap(); + } + + // All io tests are ignored for now. See above why that is. 
+ #[ignore] + #[test] + fn tcp_listener_connect_after_shutdown() { + let rt = rt(); + let _enter = rt.enter(); + + rt.shutdown_timeout(Duration::from_secs(1000)); + + let err = Handle::current() + .block_on(net::TcpListener::bind("127.0.0.1:0")) + .unwrap_err(); + + assert_eq!(err.kind(), std::io::ErrorKind::Other); + assert_eq!( + err.get_ref().unwrap().to_string(), + "A Tokio 1.x context was found, but it is being shutdown.", + ); + } + + // All io tests are ignored for now. See above why that is. + #[ignore] + #[test] + fn tcp_listener_connect_before_shutdown() { + let rt = rt(); + let _enter = rt.enter(); + + let bind_future = net::TcpListener::bind("127.0.0.1:0"); + + rt.shutdown_timeout(Duration::from_secs(1000)); + + let err = Handle::current().block_on(bind_future).unwrap_err(); + + assert_eq!(err.kind(), std::io::ErrorKind::Other); + assert_eq!( + err.get_ref().unwrap().to_string(), + "A Tokio 1.x context was found, but it is being shutdown.", + ); + } + + #[test] + fn udp_socket_bind() { + let rt = rt(); + let _enter = rt.enter(); + + Handle::current() + .block_on(net::UdpSocket::bind("127.0.0.1:0")) + .unwrap(); + } + + // All io tests are ignored for now. See above why that is. + #[ignore] + #[test] + fn udp_stream_bind_after_shutdown() { + let rt = rt(); + let _enter = rt.enter(); + + rt.shutdown_timeout(Duration::from_secs(1000)); + + let err = Handle::current() + .block_on(net::UdpSocket::bind("127.0.0.1:0")) + .unwrap_err(); + + assert_eq!(err.kind(), std::io::ErrorKind::Other); + assert_eq!( + err.get_ref().unwrap().to_string(), + "A Tokio 1.x context was found, but it is being shutdown.", + ); + } + + // All io tests are ignored for now. See above why that is. + #[ignore] + #[test] + fn udp_stream_bind_before_shutdown() { + let rt = rt(); + let _enter = rt.enter(); + + let bind_future = net::UdpSocket::bind("127.0.0.1:0"); + + rt.shutdown_timeout(Duration::from_secs(1000)); + + let err = Handle::current().block_on(bind_future).unwrap_err(); + + assert_eq!(err.kind(), std::io::ErrorKind::Other); + assert_eq!( + err.get_ref().unwrap().to_string(), + "A Tokio 1.x context was found, but it is being shutdown.", + ); + } + + // All io tests are ignored for now. See above why that is. + #[ignore] + #[cfg(unix)] + #[test] + fn unix_listener_bind_after_shutdown() { + let rt = rt(); + let _enter = rt.enter(); + + let dir = tempfile::tempdir().unwrap(); + let path = dir.path().join("socket"); + + rt.shutdown_timeout(Duration::from_secs(1000)); + + let err = net::UnixListener::bind(path).unwrap_err(); + + assert_eq!(err.kind(), std::io::ErrorKind::Other); + assert_eq!( + err.get_ref().unwrap().to_string(), + "A Tokio 1.x context was found, but it is being shutdown.", + ); + } + + // All io tests are ignored for now. See above why that is. + #[ignore] + #[cfg(unix)] + #[test] + fn unix_listener_shutdown_after_bind() { + let rt = rt(); + let _enter = rt.enter(); + + let dir = tempfile::tempdir().unwrap(); + let path = dir.path().join("socket"); + + let listener = net::UnixListener::bind(path).unwrap(); + + rt.shutdown_timeout(Duration::from_secs(1000)); + + // this should not timeout but fail immediately since the runtime has been shutdown + let err = Handle::current().block_on(listener.accept()).unwrap_err(); + + assert_eq!(err.kind(), std::io::ErrorKind::Other); + assert_eq!(err.get_ref().unwrap().to_string(), "reactor gone"); + } + + // All io tests are ignored for now. See above why that is. 
+ #[ignore] + #[cfg(unix)] + #[test] + fn unix_listener_shutdown_after_accept() { + let rt = rt(); + let _enter = rt.enter(); + + let dir = tempfile::tempdir().unwrap(); + let path = dir.path().join("socket"); + + let listener = net::UnixListener::bind(path).unwrap(); + + let accept_future = listener.accept(); + + rt.shutdown_timeout(Duration::from_secs(1000)); + + // this should not timeout but fail immediately since the runtime has been shutdown + let err = Handle::current().block_on(accept_future).unwrap_err(); + + assert_eq!(err.kind(), std::io::ErrorKind::Other); + assert_eq!(err.get_ref().unwrap().to_string(), "reactor gone"); + } + + // ==== nesting ====== + + #[test] + #[should_panic( + expected = "Cannot start a runtime from within a runtime. This happens because a function (like `block_on`) attempted to block the current thread while the thread is being used to drive asynchronous tasks." + )] + fn nesting() { + fn some_non_async_function() -> i32 { + Handle::current().block_on(time::sleep(Duration::from_millis(10))); + 1 + } + + let rt = rt(); + + rt.block_on(async { some_non_async_function() }); + } +} + +multi_threaded_rt_test! { + #[cfg(unix)] + #[test] + fn unix_listener_bind() { + let rt = rt(); + let _enter = rt.enter(); + + let dir = tempfile::tempdir().unwrap(); + let path = dir.path().join("socket"); + + let listener = net::UnixListener::bind(path).unwrap(); + + // this should timeout and not fail immediately since the runtime has not been shutdown + let _: tokio::time::error::Elapsed = Handle::current() + .block_on(tokio::time::timeout( + Duration::from_millis(10), + listener.accept(), + )) + .unwrap_err(); + } + + // ==== timers ====== + + // `Handle::block_on` doesn't work with timer futures on a current thread runtime as there is no + // one to drive the timers so they will just hang forever. Therefore they are not tested. + + #[test] + fn sleep() { + let rt = rt(); + let _enter = rt.enter(); + + Handle::current().block_on(time::sleep(Duration::from_millis(100))); + } + + #[test] + #[should_panic(expected = "A Tokio 1.x context was found, but it is being shutdown.")] + fn sleep_before_shutdown_panics() { + let rt = rt(); + let _enter = rt.enter(); + + let f = time::sleep(Duration::from_millis(100)); + + rt.shutdown_timeout(Duration::from_secs(1000)); + + Handle::current().block_on(f); + } + + #[test] + #[should_panic(expected = "A Tokio 1.x context was found, but it is being shutdown.")] + fn sleep_after_shutdown_panics() { + let rt = rt(); + let _enter = rt.enter(); + + rt.shutdown_timeout(Duration::from_secs(1000)); + + Handle::current().block_on(time::sleep(Duration::from_millis(100))); + } +} + +// ==== utils ====== + +/// Create a new multi threaded runtime +fn new_multi_thread(n: usize) -> Runtime { + tokio::runtime::Builder::new_multi_thread() + .worker_threads(n) + .enable_all() + .build() + .unwrap() +} + +/// Create a new single threaded runtime +fn new_current_thread() -> Runtime { + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap() +} + +/// Utility to test things on both kinds of runtimes both before and after shutting it down. 
fn test_with_runtimes<F>(f: F)
+where
+    F: Fn(),
+{
+    {
+        println!("current thread runtime");
+
+        let rt = new_current_thread();
+        let _enter = rt.enter();
+        f();
+
+        println!("current thread runtime after shutdown");
+        rt.shutdown_timeout(Duration::from_secs(1000));
+        f();
+    }
+
+    {
+        println!("multi thread (1 thread) runtime");
+
+        let rt = new_multi_thread(1);
+        let _enter = rt.enter();
+        f();
+
+        println!("multi thread runtime after shutdown");
+        rt.shutdown_timeout(Duration::from_secs(1000));
+        f();
+    }
+
+    {
+        println!("multi thread (4 threads) runtime");
+
+        let rt = new_multi_thread(4);
+        let _enter = rt.enter();
+        f();
+
+        println!("multi thread runtime after shutdown");
+        rt.shutdown_timeout(Duration::from_secs(1000));
+        f();
+    }
+}
diff --git a/tests/sync_notify.rs b/tests/sync_notify.rs
index 8ffe020..6c6620b 100644
--- a/tests/sync_notify.rs
+++ b/tests/sync_notify.rs
@@ -134,3 +134,20 @@ fn notify_in_drop_after_wake() {
     // Now, notifying **should not** deadlock
     notify.notify_waiters();
 }
+
+#[test]
+fn notify_one_after_dropped_all() {
+    let notify = Notify::new();
+    let mut notified1 = spawn(async { notify.notified().await });
+
+    assert_pending!(notified1.poll());
+
+    notify.notify_waiters();
+    notify.notify_one();
+
+    drop(notified1);
+
+    let mut notified2 = spawn(async { notify.notified().await });
+
+    assert_ready!(notified2.poll());
+}
diff --git a/tests/sync_once_cell.rs b/tests/sync_once_cell.rs
new file mode 100644
index 0000000..60f50d2
--- /dev/null
+++ b/tests/sync_once_cell.rs
@@ -0,0 +1,268 @@
+#![warn(rust_2018_idioms)]
+#![cfg(feature = "full")]
+
+use std::mem;
+use std::ops::Drop;
+use std::sync::atomic::{AtomicU32, Ordering};
+use std::time::Duration;
+use tokio::runtime;
+use tokio::sync::{OnceCell, SetError};
+use tokio::time;
+
+async fn func1() -> u32 {
+    5
+}
+
+async fn func2() -> u32 {
+    time::sleep(Duration::from_millis(1)).await;
+    10
+}
+
+async fn func_err() -> Result<u32, ()> {
+    Err(())
+}
+
+async fn func_ok() -> Result<u32, ()> {
+    Ok(10)
+}
+
+async fn func_panic() -> u32 {
+    time::sleep(Duration::from_millis(1)).await;
+    panic!();
+}
+
+async fn sleep_and_set() -> u32 {
+    // Simulate sleep by pausing time and waiting for another thread to
+    // resume clock when calling `set`, then finding the cell being initialized
+    // by this call
+    time::sleep(Duration::from_millis(2)).await;
+    5
+}
+
+async fn advance_time_and_set(cell: &'static OnceCell<u32>, v: u32) -> Result<(), SetError<u32>> {
+    time::advance(Duration::from_millis(1)).await;
+    cell.set(v)
+}
+
+#[test]
+fn get_or_init() {
+    let rt = runtime::Builder::new_current_thread()
+        .enable_time()
+        .start_paused(true)
+        .build()
+        .unwrap();
+
+    static ONCE: OnceCell<u32> = OnceCell::const_new();
+
+    rt.block_on(async {
+        let handle1 = rt.spawn(async { ONCE.get_or_init(func1).await });
+        let handle2 = rt.spawn(async { ONCE.get_or_init(func2).await });
+
+        time::advance(Duration::from_millis(1)).await;
+        time::resume();
+
+        let result1 = handle1.await.unwrap();
+        let result2 = handle2.await.unwrap();
+
+        assert_eq!(*result1, 5);
+        assert_eq!(*result2, 5);
+    });
+}
+
+#[test]
+fn get_or_init_panic() {
+    let rt = runtime::Builder::new_current_thread()
+        .enable_time()
+        .build()
+        .unwrap();
+
+    static ONCE: OnceCell<u32> = OnceCell::const_new();
+
+    rt.block_on(async {
+        time::pause();
+
+        let handle1 = rt.spawn(async { ONCE.get_or_init(func1).await });
+        let handle2 = rt.spawn(async { ONCE.get_or_init(func_panic).await });
+
+        time::advance(Duration::from_millis(1)).await;
+
+        let result1 = handle1.await.unwrap();
+        let result2 = 
handle2.await.unwrap();
+
+        assert_eq!(*result1, 5);
+        assert_eq!(*result2, 5);
+    });
+}
+
+#[test]
+fn set_and_get() {
+    let rt = runtime::Builder::new_current_thread()
+        .enable_time()
+        .build()
+        .unwrap();
+
+    static ONCE: OnceCell<u32> = OnceCell::const_new();
+
+    rt.block_on(async {
+        let _ = rt.spawn(async { ONCE.set(5) }).await;
+        let value = ONCE.get().unwrap();
+        assert_eq!(*value, 5);
+    });
+}
+
+#[test]
+fn get_uninit() {
+    static ONCE: OnceCell<u32> = OnceCell::const_new();
+    let uninit = ONCE.get();
+    assert!(uninit.is_none());
+}
+
+#[test]
+fn set_twice() {
+    static ONCE: OnceCell<u32> = OnceCell::const_new();
+
+    let first = ONCE.set(5);
+    assert_eq!(first, Ok(()));
+    let second = ONCE.set(6);
+    assert!(second.err().unwrap().is_already_init_err());
+}
+
+#[test]
+fn set_while_initializing() {
+    let rt = runtime::Builder::new_current_thread()
+        .enable_time()
+        .build()
+        .unwrap();
+
+    static ONCE: OnceCell<u32> = OnceCell::const_new();
+
+    rt.block_on(async {
+        time::pause();
+
+        let handle1 = rt.spawn(async { ONCE.get_or_init(sleep_and_set).await });
+        let handle2 = rt.spawn(async { advance_time_and_set(&ONCE, 10).await });
+
+        time::advance(Duration::from_millis(2)).await;
+
+        let result1 = handle1.await.unwrap();
+        let result2 = handle2.await.unwrap();
+
+        assert_eq!(*result1, 5);
+        assert!(result2.err().unwrap().is_initializing_err());
+    });
+}
+
+#[test]
+fn get_or_try_init() {
+    let rt = runtime::Builder::new_current_thread()
+        .enable_time()
+        .start_paused(true)
+        .build()
+        .unwrap();
+
+    static ONCE: OnceCell<u32> = OnceCell::const_new();
+
+    rt.block_on(async {
+        let handle1 = rt.spawn(async { ONCE.get_or_try_init(func_err).await });
+        let handle2 = rt.spawn(async { ONCE.get_or_try_init(func_ok).await });
+
+        time::advance(Duration::from_millis(1)).await;
+        time::resume();
+
+        let result1 = handle1.await.unwrap();
+        assert!(result1.is_err());
+
+        let result2 = handle2.await.unwrap();
+        assert_eq!(*result2.unwrap(), 10);
+    });
+}
+
+#[test]
+fn drop_cell() {
+    static NUM_DROPS: AtomicU32 = AtomicU32::new(0);
+
+    struct Foo {}
+
+    let fooer = Foo {};
+
+    impl Drop for Foo {
+        fn drop(&mut self) {
+            NUM_DROPS.fetch_add(1, Ordering::Release);
+        }
+    }
+
+    {
+        let once_cell = OnceCell::new();
+        let prev = once_cell.set(fooer);
+        assert!(prev.is_ok())
+    }
+    assert!(NUM_DROPS.load(Ordering::Acquire) == 1);
+}
+
+#[test]
+fn drop_cell_new_with() {
+    static NUM_DROPS: AtomicU32 = AtomicU32::new(0);
+
+    struct Foo {}
+
+    let fooer = Foo {};
+
+    impl Drop for Foo {
+        fn drop(&mut self) {
+            NUM_DROPS.fetch_add(1, Ordering::Release);
+        }
+    }
+
+    {
+        let once_cell = OnceCell::new_with(Some(fooer));
+        assert!(once_cell.initialized());
+    }
+    assert!(NUM_DROPS.load(Ordering::Acquire) == 1);
+}
+
+#[test]
+fn drop_into_inner() {
+    static NUM_DROPS: AtomicU32 = AtomicU32::new(0);
+
+    struct Foo {}
+
+    let fooer = Foo {};
+
+    impl Drop for Foo {
+        fn drop(&mut self) {
+            NUM_DROPS.fetch_add(1, Ordering::Release);
+        }
+    }
+
+    let once_cell = OnceCell::new();
+    assert!(once_cell.set(fooer).is_ok());
+    let fooer = once_cell.into_inner();
+    let count = NUM_DROPS.load(Ordering::Acquire);
+    assert!(count == 0);
+    drop(fooer);
+    let count = NUM_DROPS.load(Ordering::Acquire);
+    assert!(count == 1);
+}
+
+#[test]
+fn drop_into_inner_new_with() {
+    static NUM_DROPS: AtomicU32 = AtomicU32::new(0);
+
+    struct Foo {}
+
+    let fooer = Foo {};
+
+    impl Drop for Foo {
+        fn drop(&mut self) {
+            NUM_DROPS.fetch_add(1, Ordering::Release);
+        }
+    }
+
+    let once_cell = OnceCell::new_with(Some(fooer));
+    let fooer = 
once_cell.into_inner();
+    let count = NUM_DROPS.load(Ordering::Acquire);
+    assert!(count == 0);
+    mem::drop(fooer);
+    let count = NUM_DROPS.load(Ordering::Acquire);
+    assert!(count == 1);
+}
diff --git a/tests/sync_oneshot.rs b/tests/sync_oneshot.rs
index 195c255..1aab810 100644
--- a/tests/sync_oneshot.rs
+++ b/tests/sync_oneshot.rs
@@ -2,6 +2,7 @@
 #![cfg(feature = "full")]
 
 use tokio::sync::oneshot;
+use tokio::sync::oneshot::error::TryRecvError;
 use tokio_test::*;
 
 use std::future::Future;
@@ -180,6 +181,27 @@ fn close_try_recv_poll() {
     let _ = rx.poll();
 }
 
+#[test]
+fn close_after_recv() {
+    let (tx, mut rx) = oneshot::channel::<i32>();
+
+    tx.send(17).unwrap();
+
+    assert_eq!(17, rx.try_recv().unwrap());
+    rx.close();
+}
+
+#[test]
+fn try_recv_after_completion() {
+    let (tx, mut rx) = oneshot::channel::<i32>();
+
+    tx.send(17).unwrap();
+
+    assert_eq!(17, rx.try_recv().unwrap());
+    assert_eq!(Err(TryRecvError::Closed), rx.try_recv());
+    rx.close();
+}
+
 #[test]
 fn drops_tasks() {
     let (mut tx, mut rx) = oneshot::channel::<i32>();
diff --git a/tests/sync_rwlock.rs b/tests/sync_rwlock.rs
index 872b845..e12052b 100644
--- a/tests/sync_rwlock.rs
+++ b/tests/sync_rwlock.rs
@@ -54,7 +54,7 @@ fn read_exclusive_pending() {
 // should be made available when one of the shared accesses is dropped
 #[test]
 fn exhaust_reading() {
-    let rwlock = RwLock::new(100);
+    let rwlock = RwLock::with_max_readers(100, 1024);
     let mut reads = Vec::new();
     loop {
         let mut t = spawn(rwlock.read());
diff --git a/tests/sync_semaphore_owned.rs b/tests/sync_semaphore_owned.rs
index 8ed6209..478c3a3 100644
--- a/tests/sync_semaphore_owned.rs
+++ b/tests/sync_semaphore_owned.rs
@@ -16,6 +16,22 @@ fn try_acquire() {
     assert!(p3.is_ok());
 }
 
+#[test]
+fn try_acquire_many() {
+    let sem = Arc::new(Semaphore::new(42));
+    {
+        let p1 = sem.clone().try_acquire_many_owned(42);
+        assert!(p1.is_ok());
+        let p2 = sem.clone().try_acquire_owned();
+        assert!(p2.is_err());
+    }
+    let p3 = sem.clone().try_acquire_many_owned(32);
+    assert!(p3.is_ok());
+    let p4 = sem.clone().try_acquire_many_owned(10);
+    assert!(p4.is_ok());
+    assert!(sem.try_acquire_owned().is_err());
+}
+
 #[tokio::test]
 async fn acquire() {
     let sem = Arc::new(Semaphore::new(1));
@@ -28,6 +44,21 @@ async fn acquire() {
     j.await.unwrap();
 }
 
+#[tokio::test]
+async fn acquire_many() {
+    let semaphore = Arc::new(Semaphore::new(42));
+    let permit32 = semaphore.clone().try_acquire_many_owned(32).unwrap();
+    let (sender, receiver) = tokio::sync::oneshot::channel();
+    let join_handle = tokio::spawn(async move {
+        let _permit10 = semaphore.clone().acquire_many_owned(10).await.unwrap();
+        sender.send(()).unwrap();
+        let _permit32 = semaphore.acquire_many_owned(32).await.unwrap();
+    });
+    receiver.await.unwrap();
+    drop(permit32);
+    join_handle.await.unwrap();
+}
+
 #[tokio::test]
 async fn add_permits() {
     let sem = Arc::new(Semaphore::new(0));
diff --git a/tests/task_abort.rs b/tests/task_abort.rs
index e84f19c..1d72ac3 100644
--- a/tests/task_abort.rs
+++ b/tests/task_abort.rs
@@ -24,3 +24,70 @@ fn test_abort_without_panic_3157() {
         let _ = handle.await;
     });
 }
+
+/// Checks that a suspended task can be aborted inside of a current_thread
+/// executor without panicking as reported in issue #3662:
+/// <https://github.com/tokio-rs/tokio/issues/3662>.
+#[test] +fn test_abort_without_panic_3662() { + use std::sync::atomic::{AtomicBool, Ordering}; + use std::sync::Arc; + + struct DropCheck(Arc); + + impl Drop for DropCheck { + fn drop(&mut self) { + self.0.store(true, Ordering::SeqCst); + } + } + + let rt = tokio::runtime::Builder::new_current_thread() + .build() + .unwrap(); + + rt.block_on(async move { + let drop_flag = Arc::new(AtomicBool::new(false)); + let drop_check = DropCheck(drop_flag.clone()); + + let j = tokio::spawn(async move { + // NB: just grab the drop check here so that it becomes part of the + // task. + let _drop_check = drop_check; + futures::future::pending::<()>().await; + }); + + let drop_flag2 = drop_flag.clone(); + + let task = std::thread::spawn(move || { + // This runs in a separate thread so it doesn't have immediate + // thread-local access to the executor. It does however transition + // the underlying task to be completed, which will cause it to be + // dropped (in this thread no less). + assert!(!drop_flag2.load(Ordering::SeqCst)); + j.abort(); + // TODO: is this guaranteed at this point? + // assert!(drop_flag2.load(Ordering::SeqCst)); + j + }) + .join() + .unwrap(); + + assert!(drop_flag.load(Ordering::SeqCst)); + let result = task.await; + assert!(result.unwrap_err().is_cancelled()); + + // Note: We do the following to trigger a deferred task cleanup. + // + // The relevant piece of code you want to look at is in: + // `Inner::block_on` of `basic_scheduler.rs`. + // + // We cause the cleanup to happen by having a poll return Pending once + // so that the scheduler can go into the "auxilliary tasks" mode, at + // which point the task is removed from the scheduler. + let i = tokio::spawn(async move { + tokio::task::yield_now().await; + }); + + i.await.unwrap(); + }); +} diff --git a/tests/time_timeout.rs b/tests/time_timeout.rs index 4efcd8c..dbd80eb 100644 --- a/tests/time_timeout.rs +++ b/tests/time_timeout.rs @@ -74,6 +74,33 @@ async fn future_and_timeout_in_future() { assert_ready_ok!(fut.poll()).unwrap(); } +#[tokio::test] +async fn very_large_timeout() { + time::pause(); + + // Not yet complete + let (tx, rx) = oneshot::channel(); + + // copy-paste unstable `Duration::MAX` + let duration_max = Duration::from_secs(u64::MAX) + Duration::from_nanos(999_999_999); + + // Wrap it with a deadline + let mut fut = task::spawn(timeout(duration_max, rx)); + + // Ready! + assert_pending!(fut.poll()); + + // Turn the timer, it runs for the elapsed time + time::advance(Duration::from_secs(86400 * 365 * 10)).await; + + assert_pending!(fut.poll()); + + // Complete the future + tx.send(()).unwrap(); + + assert_ready_ok!(fut.poll()).unwrap(); +} + #[tokio::test] async fn deadline_now_elapses() { use futures::future::pending; -- cgit v1.2.3
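
Editor's note: for readers unfamiliar with the `tokio::sync::OnceCell` API exercised by the new `tests/sync_once_cell.rs` above, the following is a minimal usage sketch. It is not part of the patch; the `load_config` initializer, the `CONFIG` static, and the string values are illustrative assumptions, not taken from the tokio sources.

use tokio::sync::OnceCell;

// A global, lazily-initialized value, mirroring the `static ONCE: OnceCell<u32>`
// pattern used by the tests in the patch.
static CONFIG: OnceCell<String> = OnceCell::const_new();

// Hypothetical async initializer, used only for illustration.
async fn load_config() -> String {
    "example-config".to_string()
}

#[tokio::main]
async fn main() {
    // `get_or_init` runs the initializer at most once; concurrent callers wait
    // for the first initialization to finish and then observe the same value.
    let first = CONFIG.get_or_init(load_config).await;
    let second = CONFIG.get_or_init(load_config).await;
    assert!(std::ptr::eq(first, second));

    // Once the cell is initialized, `set` returns an error, which is what the
    // `set_twice` test asserts on via `is_already_init_err()`.
    assert!(CONFIG.set("other".to_string()).is_err());
}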