aboutsummaryrefslogtreecommitdiff
path: root/tests/mpsc.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tests/mpsc.rs')
-rw-r--r--tests/mpsc.rs58
1 files changed, 38 insertions, 20 deletions
diff --git a/tests/mpsc.rs b/tests/mpsc.rs
index 61c5a50..da0899d 100644
--- a/tests/mpsc.rs
+++ b/tests/mpsc.rs
@@ -1,13 +1,13 @@
use futures::channel::{mpsc, oneshot};
use futures::executor::{block_on, block_on_stream};
-use futures::future::{FutureExt, poll_fn};
-use futures::stream::{Stream, StreamExt};
+use futures::future::{poll_fn, FutureExt};
+use futures::pin_mut;
use futures::sink::{Sink, SinkExt};
+use futures::stream::{Stream, StreamExt};
use futures::task::{Context, Poll};
-use futures::pin_mut;
use futures_test::task::{new_count_waker, noop_context};
-use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::{Arc, Mutex};
use std::thread;
trait AssertSend: Send {}
@@ -77,7 +77,7 @@ fn send_shared_recv() {
fn send_recv_threads() {
let (mut tx, rx) = mpsc::channel::<i32>(16);
- let t = thread::spawn(move|| {
+ let t = thread::spawn(move || {
block_on(tx.send(1)).unwrap();
});
@@ -200,11 +200,14 @@ fn tx_close_gets_none() {
#[test]
fn stress_shared_unbounded() {
+ #[cfg(miri)]
+ const AMT: u32 = 100;
+ #[cfg(not(miri))]
const AMT: u32 = 10000;
const NTHREADS: u32 = 8;
let (tx, rx) = mpsc::unbounded::<i32>();
- let t = thread::spawn(move|| {
+ let t = thread::spawn(move || {
let result: Vec<_> = block_on(rx.collect());
assert_eq!(result.len(), (AMT * NTHREADS) as usize);
for item in result {
@@ -215,7 +218,7 @@ fn stress_shared_unbounded() {
for _ in 0..NTHREADS {
let tx = tx.clone();
- thread::spawn(move|| {
+ thread::spawn(move || {
for _ in 0..AMT {
tx.unbounded_send(1).unwrap();
}
@@ -229,11 +232,14 @@ fn stress_shared_unbounded() {
#[test]
fn stress_shared_bounded_hard() {
+ #[cfg(miri)]
+ const AMT: u32 = 100;
+ #[cfg(not(miri))]
const AMT: u32 = 10000;
const NTHREADS: u32 = 8;
let (tx, rx) = mpsc::channel::<i32>(0);
- let t = thread::spawn(move|| {
+ let t = thread::spawn(move || {
let result: Vec<_> = block_on(rx.collect());
assert_eq!(result.len(), (AMT * NTHREADS) as usize);
for item in result {
@@ -259,6 +265,9 @@ fn stress_shared_bounded_hard() {
#[allow(clippy::same_item_push)]
#[test]
fn stress_receiver_multi_task_bounded_hard() {
+ #[cfg(miri)]
+ const AMT: usize = 100;
+ #[cfg(not(miri))]
const AMT: usize = 10_000;
const NTHREADS: u32 = 2;
@@ -297,9 +306,9 @@ fn stress_receiver_multi_task_bounded_hard() {
}
Poll::Ready(None) => {
*rx_opt = None;
- break
- },
- Poll::Pending => {},
+ break;
+ }
+ Poll::Pending => {}
}
}
} else {
@@ -311,7 +320,6 @@ fn stress_receiver_multi_task_bounded_hard() {
th.push(t);
}
-
for i in 0..AMT {
block_on(tx.send(i)).unwrap();
}
@@ -328,7 +336,12 @@ fn stress_receiver_multi_task_bounded_hard() {
/// after sender dropped.
#[test]
fn stress_drop_sender() {
- fn list() -> impl Stream<Item=i32> {
+ #[cfg(miri)]
+ const ITER: usize = 100;
+ #[cfg(not(miri))]
+ const ITER: usize = 10000;
+
+ fn list() -> impl Stream<Item = i32> {
let (tx, rx) = mpsc::channel(1);
thread::spawn(move || {
block_on(send_one_two_three(tx));
@@ -336,7 +349,7 @@ fn stress_drop_sender() {
rx
}
- for _ in 0..10000 {
+ for _ in 0..ITER {
let v: Vec<_> = block_on(list().collect());
assert_eq!(v, vec![1, 2, 3]);
}
@@ -381,9 +394,12 @@ fn stress_close_receiver_iter() {
}
}
+#[cfg_attr(miri, ignore)] // Miri is too slow
#[test]
fn stress_close_receiver() {
- for _ in 0..10000 {
+ const ITER: usize = 10000;
+
+ for _ in 0..ITER {
stress_close_receiver_iter();
}
}
@@ -398,6 +414,9 @@ async fn stress_poll_ready_sender(mut sender: mpsc::Sender<u32>, count: u32) {
#[allow(clippy::same_item_push)]
#[test]
fn stress_poll_ready() {
+ #[cfg(miri)]
+ const AMT: u32 = 100;
+ #[cfg(not(miri))]
const AMT: u32 = 1000;
const NTHREADS: u32 = 8;
@@ -407,9 +426,7 @@ fn stress_poll_ready() {
let mut threads = Vec::new();
for _ in 0..NTHREADS {
let sender = tx.clone();
- threads.push(thread::spawn(move || {
- block_on(stress_poll_ready_sender(sender, AMT))
- }));
+ threads.push(thread::spawn(move || block_on(stress_poll_ready_sender(sender, AMT))));
}
drop(tx);
@@ -427,6 +444,7 @@ fn stress_poll_ready() {
stress(16);
}
+#[cfg_attr(miri, ignore)] // Miri is too slow
#[test]
fn try_send_1() {
const N: usize = 3000;
@@ -436,7 +454,7 @@ fn try_send_1() {
for i in 0..N {
loop {
if tx.try_send(i).is_ok() {
- break
+ break;
}
}
}
@@ -542,8 +560,8 @@ fn is_connected_to() {
#[test]
fn hash_receiver() {
- use std::hash::Hasher;
use std::collections::hash_map::DefaultHasher;
+ use std::hash::Hasher;
let mut hasher_a1 = DefaultHasher::new();
let mut hasher_a2 = DefaultHasher::new();