diff options
Diffstat (limited to 'tests/mpsc.rs')
-rw-r--r-- | tests/mpsc.rs | 58 |
1 files changed, 38 insertions, 20 deletions
diff --git a/tests/mpsc.rs b/tests/mpsc.rs index 61c5a50..da0899d 100644 --- a/tests/mpsc.rs +++ b/tests/mpsc.rs @@ -1,13 +1,13 @@ use futures::channel::{mpsc, oneshot}; use futures::executor::{block_on, block_on_stream}; -use futures::future::{FutureExt, poll_fn}; -use futures::stream::{Stream, StreamExt}; +use futures::future::{poll_fn, FutureExt}; +use futures::pin_mut; use futures::sink::{Sink, SinkExt}; +use futures::stream::{Stream, StreamExt}; use futures::task::{Context, Poll}; -use futures::pin_mut; use futures_test::task::{new_count_waker, noop_context}; -use std::sync::{Arc, Mutex}; use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, Mutex}; use std::thread; trait AssertSend: Send {} @@ -77,7 +77,7 @@ fn send_shared_recv() { fn send_recv_threads() { let (mut tx, rx) = mpsc::channel::<i32>(16); - let t = thread::spawn(move|| { + let t = thread::spawn(move || { block_on(tx.send(1)).unwrap(); }); @@ -200,11 +200,14 @@ fn tx_close_gets_none() { #[test] fn stress_shared_unbounded() { + #[cfg(miri)] + const AMT: u32 = 100; + #[cfg(not(miri))] const AMT: u32 = 10000; const NTHREADS: u32 = 8; let (tx, rx) = mpsc::unbounded::<i32>(); - let t = thread::spawn(move|| { + let t = thread::spawn(move || { let result: Vec<_> = block_on(rx.collect()); assert_eq!(result.len(), (AMT * NTHREADS) as usize); for item in result { @@ -215,7 +218,7 @@ fn stress_shared_unbounded() { for _ in 0..NTHREADS { let tx = tx.clone(); - thread::spawn(move|| { + thread::spawn(move || { for _ in 0..AMT { tx.unbounded_send(1).unwrap(); } @@ -229,11 +232,14 @@ fn stress_shared_unbounded() { #[test] fn stress_shared_bounded_hard() { + #[cfg(miri)] + const AMT: u32 = 100; + #[cfg(not(miri))] const AMT: u32 = 10000; const NTHREADS: u32 = 8; let (tx, rx) = mpsc::channel::<i32>(0); - let t = thread::spawn(move|| { + let t = thread::spawn(move || { let result: Vec<_> = block_on(rx.collect()); assert_eq!(result.len(), (AMT * NTHREADS) as usize); for item in result { @@ -259,6 +265,9 @@ fn stress_shared_bounded_hard() { #[allow(clippy::same_item_push)] #[test] fn stress_receiver_multi_task_bounded_hard() { + #[cfg(miri)] + const AMT: usize = 100; + #[cfg(not(miri))] const AMT: usize = 10_000; const NTHREADS: u32 = 2; @@ -297,9 +306,9 @@ fn stress_receiver_multi_task_bounded_hard() { } Poll::Ready(None) => { *rx_opt = None; - break - }, - Poll::Pending => {}, + break; + } + Poll::Pending => {} } } } else { @@ -311,7 +320,6 @@ fn stress_receiver_multi_task_bounded_hard() { th.push(t); } - for i in 0..AMT { block_on(tx.send(i)).unwrap(); } @@ -328,7 +336,12 @@ fn stress_receiver_multi_task_bounded_hard() { /// after sender dropped. #[test] fn stress_drop_sender() { - fn list() -> impl Stream<Item=i32> { + #[cfg(miri)] + const ITER: usize = 100; + #[cfg(not(miri))] + const ITER: usize = 10000; + + fn list() -> impl Stream<Item = i32> { let (tx, rx) = mpsc::channel(1); thread::spawn(move || { block_on(send_one_two_three(tx)); @@ -336,7 +349,7 @@ fn stress_drop_sender() { rx } - for _ in 0..10000 { + for _ in 0..ITER { let v: Vec<_> = block_on(list().collect()); assert_eq!(v, vec![1, 2, 3]); } @@ -381,9 +394,12 @@ fn stress_close_receiver_iter() { } } +#[cfg_attr(miri, ignore)] // Miri is too slow #[test] fn stress_close_receiver() { - for _ in 0..10000 { + const ITER: usize = 10000; + + for _ in 0..ITER { stress_close_receiver_iter(); } } @@ -398,6 +414,9 @@ async fn stress_poll_ready_sender(mut sender: mpsc::Sender<u32>, count: u32) { #[allow(clippy::same_item_push)] #[test] fn stress_poll_ready() { + #[cfg(miri)] + const AMT: u32 = 100; + #[cfg(not(miri))] const AMT: u32 = 1000; const NTHREADS: u32 = 8; @@ -407,9 +426,7 @@ fn stress_poll_ready() { let mut threads = Vec::new(); for _ in 0..NTHREADS { let sender = tx.clone(); - threads.push(thread::spawn(move || { - block_on(stress_poll_ready_sender(sender, AMT)) - })); + threads.push(thread::spawn(move || block_on(stress_poll_ready_sender(sender, AMT)))); } drop(tx); @@ -427,6 +444,7 @@ fn stress_poll_ready() { stress(16); } +#[cfg_attr(miri, ignore)] // Miri is too slow #[test] fn try_send_1() { const N: usize = 3000; @@ -436,7 +454,7 @@ fn try_send_1() { for i in 0..N { loop { if tx.try_send(i).is_ok() { - break + break; } } } @@ -542,8 +560,8 @@ fn is_connected_to() { #[test] fn hash_receiver() { - use std::hash::Hasher; use std::collections::hash_map::DefaultHasher; + use std::hash::Hasher; let mut hasher_a1 = DefaultHasher::new(); let mut hasher_a2 = DefaultHasher::new(); |