aboutsummaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/_require_full.rs8
-rw-r--r--tests/buffered.rs4
-rw-r--r--tests/io_driver.rs2
-rw-r--r--tests/macros_join.rs2
-rw-r--r--tests/macros_select.rs2
-rw-r--r--tests/macros_try_join.rs2
-rw-r--r--tests/rt_common.rs2
-rw-r--r--tests/rt_metrics.rs19
-rw-r--r--tests/support/leaked_buffers.rs6
-rw-r--r--tests/support/panic.rs8
-rw-r--r--tests/sync_broadcast.rs60
-rw-r--r--tests/sync_once_cell.rs355
-rw-r--r--tests/task_blocking.rs83
-rw-r--r--tests/task_join_set.rs95
-rw-r--r--tests/tcp_peek.rs2
15 files changed, 413 insertions, 237 deletions
diff --git a/tests/_require_full.rs b/tests/_require_full.rs
index a339374..4b9698a 100644
--- a/tests/_require_full.rs
+++ b/tests/_require_full.rs
@@ -1,2 +1,8 @@
-#![cfg(not(any(feature = "full", tokio_wasm)))]
+#[cfg(not(any(feature = "full", tokio_wasm)))]
compile_error!("run main Tokio tests with `--features full`");
+
+// CI sets `--cfg tokio_no_parking_lot` when trying to run tests with
+// `parking_lot` disabled. This check prevents "silent failure" if `parking_lot`
+// accidentally gets enabled.
+#[cfg(all(tokio_no_parking_lot, feature = "parking_lot"))]
+compile_error!("parking_lot feature enabled when it should not be");
diff --git a/tests/buffered.rs b/tests/buffered.rs
index 19afebd..4251c3f 100644
--- a/tests/buffered.rs
+++ b/tests/buffered.rs
@@ -18,10 +18,10 @@ async fn echo_server() {
let msg = "foo bar baz";
let t = thread::spawn(move || {
- let mut s = assert_ok!(TcpStream::connect(&addr));
+ let mut s = assert_ok!(TcpStream::connect(addr));
let t2 = thread::spawn(move || {
- let mut s = assert_ok!(TcpStream::connect(&addr));
+ let mut s = assert_ok!(TcpStream::connect(addr));
let mut b = vec![0; msg.len() * N];
assert_ok!(s.read_exact(&mut b));
b
diff --git a/tests/io_driver.rs b/tests/io_driver.rs
index 2ca5630..97018e0 100644
--- a/tests/io_driver.rs
+++ b/tests/io_driver.rs
@@ -80,7 +80,7 @@ fn test_drop_on_notify() {
drop(task);
// Establish a connection to the acceptor
- let _s = TcpStream::connect(&addr).unwrap();
+ let _s = TcpStream::connect(addr).unwrap();
// Force the reactor to turn
rt.block_on(async {});
diff --git a/tests/macros_join.rs b/tests/macros_join.rs
index 4441582..9e4d234 100644
--- a/tests/macros_join.rs
+++ b/tests/macros_join.rs
@@ -1,5 +1,5 @@
#![cfg(feature = "macros")]
-#![allow(clippy::blacklisted_name)]
+#![allow(clippy::disallowed_names)]
use std::sync::Arc;
#[cfg(tokio_wasm_not_wasi)]
diff --git a/tests/macros_select.rs b/tests/macros_select.rs
index 60f3738..26d6fec 100644
--- a/tests/macros_select.rs
+++ b/tests/macros_select.rs
@@ -1,5 +1,5 @@
#![cfg(feature = "macros")]
-#![allow(clippy::blacklisted_name)]
+#![allow(clippy::disallowed_names)]
#[cfg(tokio_wasm_not_wasi)]
use wasm_bindgen_test::wasm_bindgen_test as maybe_tokio_test;
diff --git a/tests/macros_try_join.rs b/tests/macros_try_join.rs
index 209516b..6c43222 100644
--- a/tests/macros_try_join.rs
+++ b/tests/macros_try_join.rs
@@ -1,5 +1,5 @@
#![cfg(feature = "macros")]
-#![allow(clippy::blacklisted_name)]
+#![allow(clippy::disallowed_names)]
use std::sync::Arc;
diff --git a/tests/rt_common.rs b/tests/rt_common.rs
index 53248b2..3892998 100644
--- a/tests/rt_common.rs
+++ b/tests/rt_common.rs
@@ -661,7 +661,7 @@ rt_test! {
loop {
// Don't use Tokio's `yield_now()` to avoid special defer
// logic.
- let _: () = futures::future::poll_fn(|cx| {
+ futures::future::poll_fn::<(), _>(|cx| {
cx.waker().wake_by_ref();
std::task::Poll::Pending
}).await;
diff --git a/tests/rt_metrics.rs b/tests/rt_metrics.rs
index 2a9f998..fdb2fb5 100644
--- a/tests/rt_metrics.rs
+++ b/tests/rt_metrics.rs
@@ -31,6 +31,19 @@ fn num_idle_blocking_threads() {
rt.block_on(async {
time::sleep(Duration::from_millis(5)).await;
});
+
+ // We need to wait until the blocking thread has become idle. Usually 5ms is
+ // enough for this to happen, but not always. When it isn't enough, sleep
+ // for another second. We don't always wait for a whole second since we want
+ // the test suite to finish quickly.
+ //
+ // Note that the timeout for idle threads to be killed is 10 seconds.
+ if 0 == rt.metrics().num_idle_blocking_threads() {
+ rt.block_on(async {
+ time::sleep(Duration::from_secs(1)).await;
+ });
+ }
+
assert_eq!(1, rt.metrics().num_idle_blocking_threads());
}
@@ -128,7 +141,7 @@ fn worker_noop_count() {
time::sleep(Duration::from_millis(1)).await;
});
drop(rt);
- assert!(2 <= metrics.worker_noop_count(0));
+ assert!(0 < metrics.worker_noop_count(0));
let rt = threaded();
let metrics = rt.metrics();
@@ -136,8 +149,8 @@ fn worker_noop_count() {
time::sleep(Duration::from_millis(1)).await;
});
drop(rt);
- assert!(1 <= metrics.worker_noop_count(0));
- assert!(1 <= metrics.worker_noop_count(1));
+ assert!(0 < metrics.worker_noop_count(0));
+ assert!(0 < metrics.worker_noop_count(1));
}
#[test]
diff --git a/tests/support/leaked_buffers.rs b/tests/support/leaked_buffers.rs
index 3ee8a18..a6079fb 100644
--- a/tests/support/leaked_buffers.rs
+++ b/tests/support/leaked_buffers.rs
@@ -18,9 +18,9 @@ impl LeakedBuffers {
}
}
pub unsafe fn create<'a>(&mut self, size: usize) -> &'a mut [u8] {
- let mut new_mem = vec![0u8; size].into_boxed_slice();
- let slice = std::slice::from_raw_parts_mut(new_mem.as_mut_ptr(), new_mem.len());
+ let new_mem = vec![0u8; size].into_boxed_slice();
self.leaked_vecs.push(new_mem);
- slice
+ let new_mem = self.leaked_vecs.last_mut().unwrap();
+ std::slice::from_raw_parts_mut(new_mem.as_mut_ptr(), new_mem.len())
}
}
diff --git a/tests/support/panic.rs b/tests/support/panic.rs
index 7f60c76..df2f59d 100644
--- a/tests/support/panic.rs
+++ b/tests/support/panic.rs
@@ -1,9 +1,8 @@
-use parking_lot::{const_mutex, Mutex};
use std::panic;
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
pub fn test_panic<Func: FnOnce() + panic::UnwindSafe>(func: Func) -> Option<String> {
- static PANIC_MUTEX: Mutex<()> = const_mutex(());
+ static PANIC_MUTEX: Mutex<()> = Mutex::new(());
{
let _guard = PANIC_MUTEX.lock();
@@ -16,6 +15,7 @@ pub fn test_panic<Func: FnOnce() + panic::UnwindSafe>(func: Func) -> Option<Stri
let panic_location = panic_info.location().unwrap();
panic_file
.lock()
+ .unwrap()
.clone_from(&Some(panic_location.file().to_string()));
}));
}
@@ -26,7 +26,7 @@ pub fn test_panic<Func: FnOnce() + panic::UnwindSafe>(func: Func) -> Option<Stri
panic::set_hook(prev_hook);
if result.is_err() {
- panic_file.lock().clone()
+ panic_file.lock().unwrap().clone()
} else {
None
}
diff --git a/tests/sync_broadcast.rs b/tests/sync_broadcast.rs
index 2221fe3..cd66924 100644
--- a/tests/sync_broadcast.rs
+++ b/tests/sync_broadcast.rs
@@ -527,3 +527,63 @@ fn resubscribe_to_closed_channel() {
let mut rx_resub = rx.resubscribe();
assert_closed!(rx_resub.try_recv());
}
+
+#[test]
+fn sender_len() {
+ let (tx, mut rx1) = broadcast::channel(4);
+ let mut rx2 = tx.subscribe();
+
+ assert_eq!(tx.len(), 0);
+ assert!(tx.is_empty());
+
+ tx.send(1).unwrap();
+ tx.send(2).unwrap();
+ tx.send(3).unwrap();
+
+ assert_eq!(tx.len(), 3);
+ assert!(!tx.is_empty());
+
+ assert_recv!(rx1);
+ assert_recv!(rx1);
+
+ assert_eq!(tx.len(), 3);
+ assert!(!tx.is_empty());
+
+ assert_recv!(rx2);
+
+ assert_eq!(tx.len(), 2);
+ assert!(!tx.is_empty());
+
+ tx.send(4).unwrap();
+ tx.send(5).unwrap();
+ tx.send(6).unwrap();
+
+ assert_eq!(tx.len(), 4);
+ assert!(!tx.is_empty());
+}
+
+#[test]
+#[cfg(not(tokio_wasm_not_wasi))]
+fn sender_len_random() {
+ use rand::Rng;
+
+ let (tx, mut rx1) = broadcast::channel(16);
+ let mut rx2 = tx.subscribe();
+
+ for _ in 0..1000 {
+ match rand::thread_rng().gen_range(0..4) {
+ 0 => {
+ let _ = rx1.try_recv();
+ }
+ 1 => {
+ let _ = rx2.try_recv();
+ }
+ _ => {
+ tx.send(0).unwrap();
+ }
+ }
+
+ let expected_len = usize::min(usize::max(rx1.len(), rx2.len()), 16);
+ assert_eq!(tx.len(), expected_len);
+ }
+}
diff --git a/tests/sync_once_cell.rs b/tests/sync_once_cell.rs
index 18eaf93..38dfa7c 100644
--- a/tests/sync_once_cell.rs
+++ b/tests/sync_once_cell.rs
@@ -4,178 +4,7 @@
use std::mem;
use std::ops::Drop;
use std::sync::atomic::{AtomicU32, Ordering};
-use std::time::Duration;
-use tokio::runtime;
-use tokio::sync::{OnceCell, SetError};
-use tokio::time;
-
-async fn func1() -> u32 {
- 5
-}
-
-async fn func2() -> u32 {
- time::sleep(Duration::from_millis(1)).await;
- 10
-}
-
-async fn func_err() -> Result<u32, ()> {
- Err(())
-}
-
-async fn func_ok() -> Result<u32, ()> {
- Ok(10)
-}
-
-async fn func_panic() -> u32 {
- time::sleep(Duration::from_millis(1)).await;
- panic!();
-}
-
-async fn sleep_and_set() -> u32 {
- // Simulate sleep by pausing time and waiting for another thread to
- // resume clock when calling `set`, then finding the cell being initialized
- // by this call
- time::sleep(Duration::from_millis(2)).await;
- 5
-}
-
-async fn advance_time_and_set(cell: &'static OnceCell<u32>, v: u32) -> Result<(), SetError<u32>> {
- time::advance(Duration::from_millis(1)).await;
- cell.set(v)
-}
-
-#[test]
-fn get_or_init() {
- let rt = runtime::Builder::new_current_thread()
- .enable_time()
- .start_paused(true)
- .build()
- .unwrap();
-
- static ONCE: OnceCell<u32> = OnceCell::const_new();
-
- rt.block_on(async {
- let handle1 = rt.spawn(async { ONCE.get_or_init(func1).await });
- let handle2 = rt.spawn(async { ONCE.get_or_init(func2).await });
-
- time::advance(Duration::from_millis(1)).await;
- time::resume();
-
- let result1 = handle1.await.unwrap();
- let result2 = handle2.await.unwrap();
-
- assert_eq!(*result1, 5);
- assert_eq!(*result2, 5);
- });
-}
-
-#[test]
-fn get_or_init_panic() {
- let rt = runtime::Builder::new_current_thread()
- .enable_time()
- .build()
- .unwrap();
-
- static ONCE: OnceCell<u32> = OnceCell::const_new();
-
- rt.block_on(async {
- time::pause();
-
- let handle1 = rt.spawn(async { ONCE.get_or_init(func1).await });
- let handle2 = rt.spawn(async { ONCE.get_or_init(func_panic).await });
-
- time::advance(Duration::from_millis(1)).await;
-
- let result1 = handle1.await.unwrap();
- let result2 = handle2.await.unwrap();
-
- assert_eq!(*result1, 5);
- assert_eq!(*result2, 5);
- });
-}
-
-#[test]
-fn set_and_get() {
- let rt = runtime::Builder::new_current_thread()
- .enable_time()
- .build()
- .unwrap();
-
- static ONCE: OnceCell<u32> = OnceCell::const_new();
-
- rt.block_on(async {
- let _ = rt.spawn(async { ONCE.set(5) }).await;
- let value = ONCE.get().unwrap();
- assert_eq!(*value, 5);
- });
-}
-
-#[test]
-fn get_uninit() {
- static ONCE: OnceCell<u32> = OnceCell::const_new();
- let uninit = ONCE.get();
- assert!(uninit.is_none());
-}
-
-#[test]
-fn set_twice() {
- static ONCE: OnceCell<u32> = OnceCell::const_new();
-
- let first = ONCE.set(5);
- assert_eq!(first, Ok(()));
- let second = ONCE.set(6);
- assert!(second.err().unwrap().is_already_init_err());
-}
-
-#[test]
-fn set_while_initializing() {
- let rt = runtime::Builder::new_current_thread()
- .enable_time()
- .build()
- .unwrap();
-
- static ONCE: OnceCell<u32> = OnceCell::const_new();
-
- rt.block_on(async {
- time::pause();
-
- let handle1 = rt.spawn(async { ONCE.get_or_init(sleep_and_set).await });
- let handle2 = rt.spawn(async { advance_time_and_set(&ONCE, 10).await });
-
- time::advance(Duration::from_millis(2)).await;
-
- let result1 = handle1.await.unwrap();
- let result2 = handle2.await.unwrap();
-
- assert_eq!(*result1, 5);
- assert!(result2.err().unwrap().is_initializing_err());
- });
-}
-
-#[test]
-fn get_or_try_init() {
- let rt = runtime::Builder::new_current_thread()
- .enable_time()
- .start_paused(true)
- .build()
- .unwrap();
-
- static ONCE: OnceCell<u32> = OnceCell::const_new();
-
- rt.block_on(async {
- let handle1 = rt.spawn(async { ONCE.get_or_try_init(func_err).await });
- let handle2 = rt.spawn(async { ONCE.get_or_try_init(func_ok).await });
-
- time::advance(Duration::from_millis(1)).await;
- time::resume();
-
- let result1 = handle1.await.unwrap();
- assert!(result1.is_err());
-
- let result2 = handle2.await.unwrap();
- assert_eq!(*result2.unwrap(), 10);
- });
-}
+use tokio::sync::OnceCell;
#[test]
fn drop_cell() {
@@ -272,3 +101,185 @@ fn from() {
let cell = OnceCell::from(2);
assert_eq!(*cell.get().unwrap(), 2);
}
+
+#[cfg(feature = "parking_lot")]
+mod parking_lot {
+ use super::*;
+
+ use tokio::runtime;
+ use tokio::sync::SetError;
+ use tokio::time;
+
+ use std::time::Duration;
+
+ async fn func1() -> u32 {
+ 5
+ }
+
+ async fn func2() -> u32 {
+ time::sleep(Duration::from_millis(1)).await;
+ 10
+ }
+
+ async fn func_err() -> Result<u32, ()> {
+ Err(())
+ }
+
+ async fn func_ok() -> Result<u32, ()> {
+ Ok(10)
+ }
+
+ async fn func_panic() -> u32 {
+ time::sleep(Duration::from_millis(1)).await;
+ panic!();
+ }
+
+ async fn sleep_and_set() -> u32 {
+ // Simulate sleep by pausing time and waiting for another thread to
+ // resume clock when calling `set`, then finding the cell being initialized
+ // by this call
+ time::sleep(Duration::from_millis(2)).await;
+ 5
+ }
+
+ async fn advance_time_and_set(
+ cell: &'static OnceCell<u32>,
+ v: u32,
+ ) -> Result<(), SetError<u32>> {
+ time::advance(Duration::from_millis(1)).await;
+ cell.set(v)
+ }
+
+ #[test]
+ fn get_or_init() {
+ let rt = runtime::Builder::new_current_thread()
+ .enable_time()
+ .start_paused(true)
+ .build()
+ .unwrap();
+
+ static ONCE: OnceCell<u32> = OnceCell::const_new();
+
+ rt.block_on(async {
+ let handle1 = rt.spawn(async { ONCE.get_or_init(func1).await });
+ let handle2 = rt.spawn(async { ONCE.get_or_init(func2).await });
+
+ time::advance(Duration::from_millis(1)).await;
+ time::resume();
+
+ let result1 = handle1.await.unwrap();
+ let result2 = handle2.await.unwrap();
+
+ assert_eq!(*result1, 5);
+ assert_eq!(*result2, 5);
+ });
+ }
+
+ #[test]
+ fn get_or_init_panic() {
+ let rt = runtime::Builder::new_current_thread()
+ .enable_time()
+ .build()
+ .unwrap();
+
+ static ONCE: OnceCell<u32> = OnceCell::const_new();
+
+ rt.block_on(async {
+ time::pause();
+
+ let handle1 = rt.spawn(async { ONCE.get_or_init(func1).await });
+ let handle2 = rt.spawn(async { ONCE.get_or_init(func_panic).await });
+
+ time::advance(Duration::from_millis(1)).await;
+
+ let result1 = handle1.await.unwrap();
+ let result2 = handle2.await.unwrap();
+
+ assert_eq!(*result1, 5);
+ assert_eq!(*result2, 5);
+ });
+ }
+
+ #[test]
+ fn set_and_get() {
+ let rt = runtime::Builder::new_current_thread()
+ .enable_time()
+ .build()
+ .unwrap();
+
+ static ONCE: OnceCell<u32> = OnceCell::const_new();
+
+ rt.block_on(async {
+ let _ = rt.spawn(async { ONCE.set(5) }).await;
+ let value = ONCE.get().unwrap();
+ assert_eq!(*value, 5);
+ });
+ }
+
+ #[test]
+ fn get_uninit() {
+ static ONCE: OnceCell<u32> = OnceCell::const_new();
+ let uninit = ONCE.get();
+ assert!(uninit.is_none());
+ }
+
+ #[test]
+ fn set_twice() {
+ static ONCE: OnceCell<u32> = OnceCell::const_new();
+
+ let first = ONCE.set(5);
+ assert_eq!(first, Ok(()));
+ let second = ONCE.set(6);
+ assert!(second.err().unwrap().is_already_init_err());
+ }
+
+ #[test]
+ fn set_while_initializing() {
+ let rt = runtime::Builder::new_current_thread()
+ .enable_time()
+ .build()
+ .unwrap();
+
+ static ONCE: OnceCell<u32> = OnceCell::const_new();
+
+ rt.block_on(async {
+ time::pause();
+
+ let handle1 = rt.spawn(async { ONCE.get_or_init(sleep_and_set).await });
+ let handle2 = rt.spawn(async { advance_time_and_set(&ONCE, 10).await });
+
+ time::advance(Duration::from_millis(2)).await;
+
+ let result1 = handle1.await.unwrap();
+ let result2 = handle2.await.unwrap();
+
+ assert_eq!(*result1, 5);
+ assert!(result2.err().unwrap().is_initializing_err());
+ });
+ }
+
+ #[test]
+ fn get_or_try_init() {
+ let rt = runtime::Builder::new_current_thread()
+ .enable_time()
+ .start_paused(true)
+ .build()
+ .unwrap();
+
+ static ONCE: OnceCell<u32> = OnceCell::const_new();
+
+ rt.block_on(async {
+ let handle1 = rt.spawn(async { ONCE.get_or_try_init(func_err).await });
+ let handle2 = rt.spawn(async { ONCE.get_or_try_init(func_ok).await });
+
+ time::advance(Duration::from_millis(1)).await;
+ time::resume();
+
+ let result1 = handle1.await.unwrap();
+ assert!(result1.is_err());
+
+ let result2 = handle2.await.unwrap();
+ assert_eq!(*result2.unwrap(), 10);
+ });
+ }
+}
diff --git a/tests/task_blocking.rs b/tests/task_blocking.rs
index 2e0881c..d82a0e0 100644
--- a/tests/task_blocking.rs
+++ b/tests/task_blocking.rs
@@ -1,7 +1,7 @@
#![warn(rust_2018_idioms)]
#![cfg(all(feature = "full", not(tokio_wasi)))] // Wasi doesn't support threads
-use tokio::{runtime, task};
+use tokio::{runtime, task, time};
use tokio_test::assert_ok;
use std::thread;
@@ -227,3 +227,84 @@ fn coop_disabled_in_block_in_place_in_block_on() {
done_rx.recv().unwrap().unwrap();
}
+
+#[cfg(feature = "test-util")]
+#[tokio::test(start_paused = true)]
+async fn blocking_when_paused() {
+ // Do not auto-advance time when we have started a blocking task that has
+ // not yet finished.
+ time::timeout(
+ Duration::from_secs(3),
+ task::spawn_blocking(|| thread::sleep(Duration::from_millis(1))),
+ )
+ .await
+ .expect("timeout should not trigger")
+ .expect("blocking task should finish");
+
+ // Really: Do not auto-advance time, even if the timeout is short and the
+ // blocking task runs for longer than that. It doesn't matter: Tokio time
+ // is paused; system time is not.
+ time::timeout(
+ Duration::from_millis(1),
+ task::spawn_blocking(|| thread::sleep(Duration::from_millis(50))),
+ )
+ .await
+ .expect("timeout should not trigger")
+ .expect("blocking task should finish");
+}
+
+#[cfg(feature = "test-util")]
+#[tokio::test(start_paused = true)]
+async fn blocking_task_wakes_paused_runtime() {
+ let t0 = std::time::Instant::now();
+ time::timeout(
+ Duration::from_secs(15),
+ task::spawn_blocking(|| thread::sleep(Duration::from_millis(1))),
+ )
+ .await
+ .expect("timeout should not trigger")
+ .expect("blocking task should finish");
+ assert!(
+ t0.elapsed() < Duration::from_secs(10),
+ "completing a spawn_blocking should wake the scheduler if it's parked while time is paused"
+ );
+}
+
+#[cfg(feature = "test-util")]
+#[tokio::test(start_paused = true)]
+async fn unawaited_blocking_task_wakes_paused_runtime() {
+ let t0 = std::time::Instant::now();
+
+ // When this task finishes, time should auto-advance, even though the
+ // JoinHandle has not been awaited yet.
+ let a = task::spawn_blocking(|| {
+ thread::sleep(Duration::from_millis(1));
+ });
+
+ crate::time::sleep(Duration::from_secs(15)).await;
+ a.await.expect("blocking task should finish");
+ assert!(
+ t0.elapsed() < Duration::from_secs(10),
+ "completing a spawn_blocking should wake the scheduler if it's parked while time is paused"
+ );
+}
+
+#[cfg(feature = "test-util")]
+#[tokio::test(start_paused = true)]
+async fn panicking_blocking_task_wakes_paused_runtime() {
+ let t0 = std::time::Instant::now();
+ let result = time::timeout(
+ Duration::from_secs(15),
+ task::spawn_blocking(|| {
+ thread::sleep(Duration::from_millis(1));
+ panic!("blocking task panicked");
+ }),
+ )
+ .await
+ .expect("timeout should not trigger");
+ assert!(result.is_err(), "blocking task should have panicked");
+ assert!(
+ t0.elapsed() < Duration::from_secs(10),
+ "completing a spawn_blocking should wake the scheduler if it's parked while time is paused"
+ );
+}
diff --git a/tests/task_join_set.rs b/tests/task_join_set.rs
index 20d4927..b1b6cf9 100644
--- a/tests/task_join_set.rs
+++ b/tests/task_join_set.rs
@@ -5,8 +5,6 @@ use tokio::sync::oneshot;
use tokio::task::JoinSet;
use tokio::time::Duration;
-use futures::future::FutureExt;
-
fn rt() -> tokio::runtime::Runtime {
tokio::runtime::Builder::new_current_thread()
.build()
@@ -156,49 +154,6 @@ fn runtime_gone() {
.is_cancelled());
}
-// This ensures that `join_next` works correctly when the coop budget is
-// exhausted.
-#[tokio::test(flavor = "current_thread")]
-async fn join_set_coop() {
- // Large enough to trigger coop.
- const TASK_NUM: u32 = 1000;
-
- static SEM: tokio::sync::Semaphore = tokio::sync::Semaphore::const_new(0);
-
- let mut set = JoinSet::new();
-
- for _ in 0..TASK_NUM {
- set.spawn(async {
- SEM.add_permits(1);
- });
- }
-
- // Wait for all tasks to complete.
- //
- // Since this is a `current_thread` runtime, there's no race condition
- // between the last permit being added and the task completing.
- let _ = SEM.acquire_many(TASK_NUM).await.unwrap();
-
- let mut count = 0;
- let mut coop_count = 0;
- loop {
- match set.join_next().now_or_never() {
- Some(Some(Ok(()))) => {}
- Some(Some(Err(err))) => panic!("failed: {}", err),
- None => {
- coop_count += 1;
- tokio::task::yield_now().await;
- continue;
- }
- Some(None) => break,
- }
-
- count += 1;
- }
- assert!(coop_count >= 1);
- assert_eq!(count, TASK_NUM);
-}
-
#[tokio::test(start_paused = true)]
async fn abort_all() {
let mut set: JoinSet<()> = JoinSet::new();
@@ -228,3 +183,53 @@ async fn abort_all() {
assert_eq!(count, 10);
assert_eq!(set.len(), 0);
}
+
+#[cfg(feature = "parking_lot")]
+mod parking_lot {
+ use super::*;
+
+ use futures::future::FutureExt;
+
+ // This ensures that `join_next` works correctly when the coop budget is
+ // exhausted.
+ #[tokio::test(flavor = "current_thread")]
+ async fn join_set_coop() {
+ // Large enough to trigger coop.
+ const TASK_NUM: u32 = 1000;
+
+ static SEM: tokio::sync::Semaphore = tokio::sync::Semaphore::const_new(0);
+
+ let mut set = JoinSet::new();
+
+ for _ in 0..TASK_NUM {
+ set.spawn(async {
+ SEM.add_permits(1);
+ });
+ }
+
+ // Wait for all tasks to complete.
+ //
+ // Since this is a `current_thread` runtime, there's no race condition
+ // between the last permit being added and the task completing.
+ let _ = SEM.acquire_many(TASK_NUM).await.unwrap();
+
+ let mut count = 0;
+ let mut coop_count = 0;
+ loop {
+ match set.join_next().now_or_never() {
+ Some(Some(Ok(()))) => {}
+ Some(Some(Err(err))) => panic!("failed: {}", err),
+ None => {
+ coop_count += 1;
+ tokio::task::yield_now().await;
+ continue;
+ }
+ Some(None) => break,
+ }
+
+ count += 1;
+ }
+ assert!(coop_count >= 1);
+ assert_eq!(count, TASK_NUM);
+ }
+}
diff --git a/tests/tcp_peek.rs b/tests/tcp_peek.rs
index 03813c2..b712023 100644
--- a/tests/tcp_peek.rs
+++ b/tests/tcp_peek.rs
@@ -15,7 +15,7 @@ async fn peek() {
let addr = listener.local_addr().unwrap();
let t = thread::spawn(move || assert_ok!(listener.accept()).0);
- let left = net::TcpStream::connect(&addr).unwrap();
+ let left = net::TcpStream::connect(addr).unwrap();
let mut right = t.join().unwrap();
let _ = right.write(&[1, 2, 3, 4]).unwrap();