diff options
Diffstat (limited to 'tests')
-rw-r--r-- | tests/_require_full.rs | 8 | ||||
-rw-r--r-- | tests/buffered.rs | 4 | ||||
-rw-r--r-- | tests/io_driver.rs | 2 | ||||
-rw-r--r-- | tests/macros_join.rs | 2 | ||||
-rw-r--r-- | tests/macros_select.rs | 2 | ||||
-rw-r--r-- | tests/macros_try_join.rs | 2 | ||||
-rw-r--r-- | tests/rt_common.rs | 2 | ||||
-rw-r--r-- | tests/rt_metrics.rs | 19 | ||||
-rw-r--r-- | tests/support/leaked_buffers.rs | 6 | ||||
-rw-r--r-- | tests/support/panic.rs | 8 | ||||
-rw-r--r-- | tests/sync_broadcast.rs | 60 | ||||
-rw-r--r-- | tests/sync_once_cell.rs | 355 | ||||
-rw-r--r-- | tests/task_blocking.rs | 83 | ||||
-rw-r--r-- | tests/task_join_set.rs | 95 | ||||
-rw-r--r-- | tests/tcp_peek.rs | 2 |
15 files changed, 413 insertions, 237 deletions
diff --git a/tests/_require_full.rs b/tests/_require_full.rs index a339374..4b9698a 100644 --- a/tests/_require_full.rs +++ b/tests/_require_full.rs @@ -1,2 +1,8 @@ -#![cfg(not(any(feature = "full", tokio_wasm)))] +#[cfg(not(any(feature = "full", tokio_wasm)))] compile_error!("run main Tokio tests with `--features full`"); + +// CI sets `--cfg tokio_no_parking_lot` when trying to run tests with +// `parking_lot` disabled. This check prevents "silent failure" if `parking_lot` +// accidentally gets enabled. +#[cfg(all(tokio_no_parking_lot, feature = "parking_lot"))] +compile_error!("parking_lot feature enabled when it should not be"); diff --git a/tests/buffered.rs b/tests/buffered.rs index 19afebd..4251c3f 100644 --- a/tests/buffered.rs +++ b/tests/buffered.rs @@ -18,10 +18,10 @@ async fn echo_server() { let msg = "foo bar baz"; let t = thread::spawn(move || { - let mut s = assert_ok!(TcpStream::connect(&addr)); + let mut s = assert_ok!(TcpStream::connect(addr)); let t2 = thread::spawn(move || { - let mut s = assert_ok!(TcpStream::connect(&addr)); + let mut s = assert_ok!(TcpStream::connect(addr)); let mut b = vec![0; msg.len() * N]; assert_ok!(s.read_exact(&mut b)); b diff --git a/tests/io_driver.rs b/tests/io_driver.rs index 2ca5630..97018e0 100644 --- a/tests/io_driver.rs +++ b/tests/io_driver.rs @@ -80,7 +80,7 @@ fn test_drop_on_notify() { drop(task); // Establish a connection to the acceptor - let _s = TcpStream::connect(&addr).unwrap(); + let _s = TcpStream::connect(addr).unwrap(); // Force the reactor to turn rt.block_on(async {}); diff --git a/tests/macros_join.rs b/tests/macros_join.rs index 4441582..9e4d234 100644 --- a/tests/macros_join.rs +++ b/tests/macros_join.rs @@ -1,5 +1,5 @@ #![cfg(feature = "macros")] -#![allow(clippy::blacklisted_name)] +#![allow(clippy::disallowed_names)] use std::sync::Arc; #[cfg(tokio_wasm_not_wasi)] diff --git a/tests/macros_select.rs b/tests/macros_select.rs index 60f3738..26d6fec 100644 --- a/tests/macros_select.rs +++ b/tests/macros_select.rs @@ -1,5 +1,5 @@ #![cfg(feature = "macros")] -#![allow(clippy::blacklisted_name)] +#![allow(clippy::disallowed_names)] #[cfg(tokio_wasm_not_wasi)] use wasm_bindgen_test::wasm_bindgen_test as maybe_tokio_test; diff --git a/tests/macros_try_join.rs b/tests/macros_try_join.rs index 209516b..6c43222 100644 --- a/tests/macros_try_join.rs +++ b/tests/macros_try_join.rs @@ -1,5 +1,5 @@ #![cfg(feature = "macros")] -#![allow(clippy::blacklisted_name)] +#![allow(clippy::disallowed_names)] use std::sync::Arc; diff --git a/tests/rt_common.rs b/tests/rt_common.rs index 53248b2..3892998 100644 --- a/tests/rt_common.rs +++ b/tests/rt_common.rs @@ -661,7 +661,7 @@ rt_test! { loop { // Don't use Tokio's `yield_now()` to avoid special defer // logic. - let _: () = futures::future::poll_fn(|cx| { + futures::future::poll_fn::<(), _>(|cx| { cx.waker().wake_by_ref(); std::task::Poll::Pending }).await; diff --git a/tests/rt_metrics.rs b/tests/rt_metrics.rs index 2a9f998..fdb2fb5 100644 --- a/tests/rt_metrics.rs +++ b/tests/rt_metrics.rs @@ -31,6 +31,19 @@ fn num_idle_blocking_threads() { rt.block_on(async { time::sleep(Duration::from_millis(5)).await; }); + + // We need to wait until the blocking thread has become idle. Usually 5ms is + // enough for this to happen, but not always. When it isn't enough, sleep + // for another second. We don't always wait for a whole second since we want + // the test suite to finish quickly. + // + // Note that the timeout for idle threads to be killed is 10 seconds. + if 0 == rt.metrics().num_idle_blocking_threads() { + rt.block_on(async { + time::sleep(Duration::from_secs(1)).await; + }); + } + assert_eq!(1, rt.metrics().num_idle_blocking_threads()); } @@ -128,7 +141,7 @@ fn worker_noop_count() { time::sleep(Duration::from_millis(1)).await; }); drop(rt); - assert!(2 <= metrics.worker_noop_count(0)); + assert!(0 < metrics.worker_noop_count(0)); let rt = threaded(); let metrics = rt.metrics(); @@ -136,8 +149,8 @@ fn worker_noop_count() { time::sleep(Duration::from_millis(1)).await; }); drop(rt); - assert!(1 <= metrics.worker_noop_count(0)); - assert!(1 <= metrics.worker_noop_count(1)); + assert!(0 < metrics.worker_noop_count(0)); + assert!(0 < metrics.worker_noop_count(1)); } #[test] diff --git a/tests/support/leaked_buffers.rs b/tests/support/leaked_buffers.rs index 3ee8a18..a6079fb 100644 --- a/tests/support/leaked_buffers.rs +++ b/tests/support/leaked_buffers.rs @@ -18,9 +18,9 @@ impl LeakedBuffers { } } pub unsafe fn create<'a>(&mut self, size: usize) -> &'a mut [u8] { - let mut new_mem = vec![0u8; size].into_boxed_slice(); - let slice = std::slice::from_raw_parts_mut(new_mem.as_mut_ptr(), new_mem.len()); + let new_mem = vec![0u8; size].into_boxed_slice(); self.leaked_vecs.push(new_mem); - slice + let new_mem = self.leaked_vecs.last_mut().unwrap(); + std::slice::from_raw_parts_mut(new_mem.as_mut_ptr(), new_mem.len()) } } diff --git a/tests/support/panic.rs b/tests/support/panic.rs index 7f60c76..df2f59d 100644 --- a/tests/support/panic.rs +++ b/tests/support/panic.rs @@ -1,9 +1,8 @@ -use parking_lot::{const_mutex, Mutex}; use std::panic; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; pub fn test_panic<Func: FnOnce() + panic::UnwindSafe>(func: Func) -> Option<String> { - static PANIC_MUTEX: Mutex<()> = const_mutex(()); + static PANIC_MUTEX: Mutex<()> = Mutex::new(()); { let _guard = PANIC_MUTEX.lock(); @@ -16,6 +15,7 @@ pub fn test_panic<Func: FnOnce() + panic::UnwindSafe>(func: Func) -> Option<Stri let panic_location = panic_info.location().unwrap(); panic_file .lock() + .unwrap() .clone_from(&Some(panic_location.file().to_string())); })); } @@ -26,7 +26,7 @@ pub fn test_panic<Func: FnOnce() + panic::UnwindSafe>(func: Func) -> Option<Stri panic::set_hook(prev_hook); if result.is_err() { - panic_file.lock().clone() + panic_file.lock().unwrap().clone() } else { None } diff --git a/tests/sync_broadcast.rs b/tests/sync_broadcast.rs index 2221fe3..cd66924 100644 --- a/tests/sync_broadcast.rs +++ b/tests/sync_broadcast.rs @@ -527,3 +527,63 @@ fn resubscribe_to_closed_channel() { let mut rx_resub = rx.resubscribe(); assert_closed!(rx_resub.try_recv()); } + +#[test] +fn sender_len() { + let (tx, mut rx1) = broadcast::channel(4); + let mut rx2 = tx.subscribe(); + + assert_eq!(tx.len(), 0); + assert!(tx.is_empty()); + + tx.send(1).unwrap(); + tx.send(2).unwrap(); + tx.send(3).unwrap(); + + assert_eq!(tx.len(), 3); + assert!(!tx.is_empty()); + + assert_recv!(rx1); + assert_recv!(rx1); + + assert_eq!(tx.len(), 3); + assert!(!tx.is_empty()); + + assert_recv!(rx2); + + assert_eq!(tx.len(), 2); + assert!(!tx.is_empty()); + + tx.send(4).unwrap(); + tx.send(5).unwrap(); + tx.send(6).unwrap(); + + assert_eq!(tx.len(), 4); + assert!(!tx.is_empty()); +} + +#[test] +#[cfg(not(tokio_wasm_not_wasi))] +fn sender_len_random() { + use rand::Rng; + + let (tx, mut rx1) = broadcast::channel(16); + let mut rx2 = tx.subscribe(); + + for _ in 0..1000 { + match rand::thread_rng().gen_range(0..4) { + 0 => { + let _ = rx1.try_recv(); + } + 1 => { + let _ = rx2.try_recv(); + } + _ => { + tx.send(0).unwrap(); + } + } + + let expected_len = usize::min(usize::max(rx1.len(), rx2.len()), 16); + assert_eq!(tx.len(), expected_len); + } +} diff --git a/tests/sync_once_cell.rs b/tests/sync_once_cell.rs index 18eaf93..38dfa7c 100644 --- a/tests/sync_once_cell.rs +++ b/tests/sync_once_cell.rs @@ -4,178 +4,7 @@ use std::mem; use std::ops::Drop; use std::sync::atomic::{AtomicU32, Ordering}; -use std::time::Duration; -use tokio::runtime; -use tokio::sync::{OnceCell, SetError}; -use tokio::time; - -async fn func1() -> u32 { - 5 -} - -async fn func2() -> u32 { - time::sleep(Duration::from_millis(1)).await; - 10 -} - -async fn func_err() -> Result<u32, ()> { - Err(()) -} - -async fn func_ok() -> Result<u32, ()> { - Ok(10) -} - -async fn func_panic() -> u32 { - time::sleep(Duration::from_millis(1)).await; - panic!(); -} - -async fn sleep_and_set() -> u32 { - // Simulate sleep by pausing time and waiting for another thread to - // resume clock when calling `set`, then finding the cell being initialized - // by this call - time::sleep(Duration::from_millis(2)).await; - 5 -} - -async fn advance_time_and_set(cell: &'static OnceCell<u32>, v: u32) -> Result<(), SetError<u32>> { - time::advance(Duration::from_millis(1)).await; - cell.set(v) -} - -#[test] -fn get_or_init() { - let rt = runtime::Builder::new_current_thread() - .enable_time() - .start_paused(true) - .build() - .unwrap(); - - static ONCE: OnceCell<u32> = OnceCell::const_new(); - - rt.block_on(async { - let handle1 = rt.spawn(async { ONCE.get_or_init(func1).await }); - let handle2 = rt.spawn(async { ONCE.get_or_init(func2).await }); - - time::advance(Duration::from_millis(1)).await; - time::resume(); - - let result1 = handle1.await.unwrap(); - let result2 = handle2.await.unwrap(); - - assert_eq!(*result1, 5); - assert_eq!(*result2, 5); - }); -} - -#[test] -fn get_or_init_panic() { - let rt = runtime::Builder::new_current_thread() - .enable_time() - .build() - .unwrap(); - - static ONCE: OnceCell<u32> = OnceCell::const_new(); - - rt.block_on(async { - time::pause(); - - let handle1 = rt.spawn(async { ONCE.get_or_init(func1).await }); - let handle2 = rt.spawn(async { ONCE.get_or_init(func_panic).await }); - - time::advance(Duration::from_millis(1)).await; - - let result1 = handle1.await.unwrap(); - let result2 = handle2.await.unwrap(); - - assert_eq!(*result1, 5); - assert_eq!(*result2, 5); - }); -} - -#[test] -fn set_and_get() { - let rt = runtime::Builder::new_current_thread() - .enable_time() - .build() - .unwrap(); - - static ONCE: OnceCell<u32> = OnceCell::const_new(); - - rt.block_on(async { - let _ = rt.spawn(async { ONCE.set(5) }).await; - let value = ONCE.get().unwrap(); - assert_eq!(*value, 5); - }); -} - -#[test] -fn get_uninit() { - static ONCE: OnceCell<u32> = OnceCell::const_new(); - let uninit = ONCE.get(); - assert!(uninit.is_none()); -} - -#[test] -fn set_twice() { - static ONCE: OnceCell<u32> = OnceCell::const_new(); - - let first = ONCE.set(5); - assert_eq!(first, Ok(())); - let second = ONCE.set(6); - assert!(second.err().unwrap().is_already_init_err()); -} - -#[test] -fn set_while_initializing() { - let rt = runtime::Builder::new_current_thread() - .enable_time() - .build() - .unwrap(); - - static ONCE: OnceCell<u32> = OnceCell::const_new(); - - rt.block_on(async { - time::pause(); - - let handle1 = rt.spawn(async { ONCE.get_or_init(sleep_and_set).await }); - let handle2 = rt.spawn(async { advance_time_and_set(&ONCE, 10).await }); - - time::advance(Duration::from_millis(2)).await; - - let result1 = handle1.await.unwrap(); - let result2 = handle2.await.unwrap(); - - assert_eq!(*result1, 5); - assert!(result2.err().unwrap().is_initializing_err()); - }); -} - -#[test] -fn get_or_try_init() { - let rt = runtime::Builder::new_current_thread() - .enable_time() - .start_paused(true) - .build() - .unwrap(); - - static ONCE: OnceCell<u32> = OnceCell::const_new(); - - rt.block_on(async { - let handle1 = rt.spawn(async { ONCE.get_or_try_init(func_err).await }); - let handle2 = rt.spawn(async { ONCE.get_or_try_init(func_ok).await }); - - time::advance(Duration::from_millis(1)).await; - time::resume(); - - let result1 = handle1.await.unwrap(); - assert!(result1.is_err()); - - let result2 = handle2.await.unwrap(); - assert_eq!(*result2.unwrap(), 10); - }); -} +use tokio::sync::OnceCell; #[test] fn drop_cell() { @@ -272,3 +101,185 @@ fn from() { let cell = OnceCell::from(2); assert_eq!(*cell.get().unwrap(), 2); } + +#[cfg(feature = "parking_lot")] +mod parking_lot { + use super::*; + + use tokio::runtime; + use tokio::sync::SetError; + use tokio::time; + + use std::time::Duration; + + async fn func1() -> u32 { + 5 + } + + async fn func2() -> u32 { + time::sleep(Duration::from_millis(1)).await; + 10 + } + + async fn func_err() -> Result<u32, ()> { + Err(()) + } + + async fn func_ok() -> Result<u32, ()> { + Ok(10) + } + + async fn func_panic() -> u32 { + time::sleep(Duration::from_millis(1)).await; + panic!(); + } + + async fn sleep_and_set() -> u32 { + // Simulate sleep by pausing time and waiting for another thread to + // resume clock when calling `set`, then finding the cell being initialized + // by this call + time::sleep(Duration::from_millis(2)).await; + 5 + } + + async fn advance_time_and_set( + cell: &'static OnceCell<u32>, + v: u32, + ) -> Result<(), SetError<u32>> { + time::advance(Duration::from_millis(1)).await; + cell.set(v) + } + + #[test] + fn get_or_init() { + let rt = runtime::Builder::new_current_thread() + .enable_time() + .start_paused(true) + .build() + .unwrap(); + + static ONCE: OnceCell<u32> = OnceCell::const_new(); + + rt.block_on(async { + let handle1 = rt.spawn(async { ONCE.get_or_init(func1).await }); + let handle2 = rt.spawn(async { ONCE.get_or_init(func2).await }); + + time::advance(Duration::from_millis(1)).await; + time::resume(); + + let result1 = handle1.await.unwrap(); + let result2 = handle2.await.unwrap(); + + assert_eq!(*result1, 5); + assert_eq!(*result2, 5); + }); + } + + #[test] + fn get_or_init_panic() { + let rt = runtime::Builder::new_current_thread() + .enable_time() + .build() + .unwrap(); + + static ONCE: OnceCell<u32> = OnceCell::const_new(); + + rt.block_on(async { + time::pause(); + + let handle1 = rt.spawn(async { ONCE.get_or_init(func1).await }); + let handle2 = rt.spawn(async { ONCE.get_or_init(func_panic).await }); + + time::advance(Duration::from_millis(1)).await; + + let result1 = handle1.await.unwrap(); + let result2 = handle2.await.unwrap(); + + assert_eq!(*result1, 5); + assert_eq!(*result2, 5); + }); + } + + #[test] + fn set_and_get() { + let rt = runtime::Builder::new_current_thread() + .enable_time() + .build() + .unwrap(); + + static ONCE: OnceCell<u32> = OnceCell::const_new(); + + rt.block_on(async { + let _ = rt.spawn(async { ONCE.set(5) }).await; + let value = ONCE.get().unwrap(); + assert_eq!(*value, 5); + }); + } + + #[test] + fn get_uninit() { + static ONCE: OnceCell<u32> = OnceCell::const_new(); + let uninit = ONCE.get(); + assert!(uninit.is_none()); + } + + #[test] + fn set_twice() { + static ONCE: OnceCell<u32> = OnceCell::const_new(); + + let first = ONCE.set(5); + assert_eq!(first, Ok(())); + let second = ONCE.set(6); + assert!(second.err().unwrap().is_already_init_err()); + } + + #[test] + fn set_while_initializing() { + let rt = runtime::Builder::new_current_thread() + .enable_time() + .build() + .unwrap(); + + static ONCE: OnceCell<u32> = OnceCell::const_new(); + + rt.block_on(async { + time::pause(); + + let handle1 = rt.spawn(async { ONCE.get_or_init(sleep_and_set).await }); + let handle2 = rt.spawn(async { advance_time_and_set(&ONCE, 10).await }); + + time::advance(Duration::from_millis(2)).await; + + let result1 = handle1.await.unwrap(); + let result2 = handle2.await.unwrap(); + + assert_eq!(*result1, 5); + assert!(result2.err().unwrap().is_initializing_err()); + }); + } + + #[test] + fn get_or_try_init() { + let rt = runtime::Builder::new_current_thread() + .enable_time() + .start_paused(true) + .build() + .unwrap(); + + static ONCE: OnceCell<u32> = OnceCell::const_new(); + + rt.block_on(async { + let handle1 = rt.spawn(async { ONCE.get_or_try_init(func_err).await }); + let handle2 = rt.spawn(async { ONCE.get_or_try_init(func_ok).await }); + + time::advance(Duration::from_millis(1)).await; + time::resume(); + + let result1 = handle1.await.unwrap(); + assert!(result1.is_err()); + + let result2 = handle2.await.unwrap(); + assert_eq!(*result2.unwrap(), 10); + }); + } +} diff --git a/tests/task_blocking.rs b/tests/task_blocking.rs index 2e0881c..d82a0e0 100644 --- a/tests/task_blocking.rs +++ b/tests/task_blocking.rs @@ -1,7 +1,7 @@ #![warn(rust_2018_idioms)] #![cfg(all(feature = "full", not(tokio_wasi)))] // Wasi doesn't support threads -use tokio::{runtime, task}; +use tokio::{runtime, task, time}; use tokio_test::assert_ok; use std::thread; @@ -227,3 +227,84 @@ fn coop_disabled_in_block_in_place_in_block_on() { done_rx.recv().unwrap().unwrap(); } + +#[cfg(feature = "test-util")] +#[tokio::test(start_paused = true)] +async fn blocking_when_paused() { + // Do not auto-advance time when we have started a blocking task that has + // not yet finished. + time::timeout( + Duration::from_secs(3), + task::spawn_blocking(|| thread::sleep(Duration::from_millis(1))), + ) + .await + .expect("timeout should not trigger") + .expect("blocking task should finish"); + + // Really: Do not auto-advance time, even if the timeout is short and the + // blocking task runs for longer than that. It doesn't matter: Tokio time + // is paused; system time is not. + time::timeout( + Duration::from_millis(1), + task::spawn_blocking(|| thread::sleep(Duration::from_millis(50))), + ) + .await + .expect("timeout should not trigger") + .expect("blocking task should finish"); +} + +#[cfg(feature = "test-util")] +#[tokio::test(start_paused = true)] +async fn blocking_task_wakes_paused_runtime() { + let t0 = std::time::Instant::now(); + time::timeout( + Duration::from_secs(15), + task::spawn_blocking(|| thread::sleep(Duration::from_millis(1))), + ) + .await + .expect("timeout should not trigger") + .expect("blocking task should finish"); + assert!( + t0.elapsed() < Duration::from_secs(10), + "completing a spawn_blocking should wake the scheduler if it's parked while time is paused" + ); +} + +#[cfg(feature = "test-util")] +#[tokio::test(start_paused = true)] +async fn unawaited_blocking_task_wakes_paused_runtime() { + let t0 = std::time::Instant::now(); + + // When this task finishes, time should auto-advance, even though the + // JoinHandle has not been awaited yet. + let a = task::spawn_blocking(|| { + thread::sleep(Duration::from_millis(1)); + }); + + crate::time::sleep(Duration::from_secs(15)).await; + a.await.expect("blocking task should finish"); + assert!( + t0.elapsed() < Duration::from_secs(10), + "completing a spawn_blocking should wake the scheduler if it's parked while time is paused" + ); +} + +#[cfg(feature = "test-util")] +#[tokio::test(start_paused = true)] +async fn panicking_blocking_task_wakes_paused_runtime() { + let t0 = std::time::Instant::now(); + let result = time::timeout( + Duration::from_secs(15), + task::spawn_blocking(|| { + thread::sleep(Duration::from_millis(1)); + panic!("blocking task panicked"); + }), + ) + .await + .expect("timeout should not trigger"); + assert!(result.is_err(), "blocking task should have panicked"); + assert!( + t0.elapsed() < Duration::from_secs(10), + "completing a spawn_blocking should wake the scheduler if it's parked while time is paused" + ); +} diff --git a/tests/task_join_set.rs b/tests/task_join_set.rs index 20d4927..b1b6cf9 100644 --- a/tests/task_join_set.rs +++ b/tests/task_join_set.rs @@ -5,8 +5,6 @@ use tokio::sync::oneshot; use tokio::task::JoinSet; use tokio::time::Duration; -use futures::future::FutureExt; - fn rt() -> tokio::runtime::Runtime { tokio::runtime::Builder::new_current_thread() .build() @@ -156,49 +154,6 @@ fn runtime_gone() { .is_cancelled()); } -// This ensures that `join_next` works correctly when the coop budget is -// exhausted. -#[tokio::test(flavor = "current_thread")] -async fn join_set_coop() { - // Large enough to trigger coop. - const TASK_NUM: u32 = 1000; - - static SEM: tokio::sync::Semaphore = tokio::sync::Semaphore::const_new(0); - - let mut set = JoinSet::new(); - - for _ in 0..TASK_NUM { - set.spawn(async { - SEM.add_permits(1); - }); - } - - // Wait for all tasks to complete. - // - // Since this is a `current_thread` runtime, there's no race condition - // between the last permit being added and the task completing. - let _ = SEM.acquire_many(TASK_NUM).await.unwrap(); - - let mut count = 0; - let mut coop_count = 0; - loop { - match set.join_next().now_or_never() { - Some(Some(Ok(()))) => {} - Some(Some(Err(err))) => panic!("failed: {}", err), - None => { - coop_count += 1; - tokio::task::yield_now().await; - continue; - } - Some(None) => break, - } - - count += 1; - } - assert!(coop_count >= 1); - assert_eq!(count, TASK_NUM); -} - #[tokio::test(start_paused = true)] async fn abort_all() { let mut set: JoinSet<()> = JoinSet::new(); @@ -228,3 +183,53 @@ async fn abort_all() { assert_eq!(count, 10); assert_eq!(set.len(), 0); } + +#[cfg(feature = "parking_lot")] +mod parking_lot { + use super::*; + + use futures::future::FutureExt; + + // This ensures that `join_next` works correctly when the coop budget is + // exhausted. + #[tokio::test(flavor = "current_thread")] + async fn join_set_coop() { + // Large enough to trigger coop. + const TASK_NUM: u32 = 1000; + + static SEM: tokio::sync::Semaphore = tokio::sync::Semaphore::const_new(0); + + let mut set = JoinSet::new(); + + for _ in 0..TASK_NUM { + set.spawn(async { + SEM.add_permits(1); + }); + } + + // Wait for all tasks to complete. + // + // Since this is a `current_thread` runtime, there's no race condition + // between the last permit being added and the task completing. + let _ = SEM.acquire_many(TASK_NUM).await.unwrap(); + + let mut count = 0; + let mut coop_count = 0; + loop { + match set.join_next().now_or_never() { + Some(Some(Ok(()))) => {} + Some(Some(Err(err))) => panic!("failed: {}", err), + None => { + coop_count += 1; + tokio::task::yield_now().await; + continue; + } + Some(None) => break, + } + + count += 1; + } + assert!(coop_count >= 1); + assert_eq!(count, TASK_NUM); + } +} diff --git a/tests/tcp_peek.rs b/tests/tcp_peek.rs index 03813c2..b712023 100644 --- a/tests/tcp_peek.rs +++ b/tests/tcp_peek.rs @@ -15,7 +15,7 @@ async fn peek() { let addr = listener.local_addr().unwrap(); let t = thread::spawn(move || assert_ok!(listener.accept()).0); - let left = net::TcpStream::connect(&addr).unwrap(); + let left = net::TcpStream::connect(addr).unwrap(); let mut right = t.join().unwrap(); let _ = right.write(&[1, 2, 3, 4]).unwrap(); |