aboutsummaryrefslogtreecommitdiff
path: root/tests/sync_mpsc.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tests/sync_mpsc.rs')
-rw-r--r--tests/sync_mpsc.rs164
1 files changed, 122 insertions, 42 deletions
diff --git a/tests/sync_mpsc.rs b/tests/sync_mpsc.rs
index 1947d26..6e87096 100644
--- a/tests/sync_mpsc.rs
+++ b/tests/sync_mpsc.rs
@@ -1,18 +1,21 @@
#![allow(clippy::redundant_clone)]
#![warn(rust_2018_idioms)]
-#![cfg(feature = "full")]
+#![cfg(feature = "sync")]
-use std::thread;
-use tokio::runtime::Runtime;
-use tokio::sync::mpsc;
-use tokio::sync::mpsc::error::{TryRecvError, TrySendError};
-use tokio_test::task;
-use tokio_test::{
- assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok,
-};
+#[cfg(tokio_wasm_not_wasi)]
+use wasm_bindgen_test::wasm_bindgen_test as test;
+#[cfg(tokio_wasm_not_wasi)]
+use wasm_bindgen_test::wasm_bindgen_test as maybe_tokio_test;
+use std::fmt;
use std::sync::Arc;
+use tokio::sync::mpsc;
+use tokio::sync::mpsc::error::{TryRecvError, TrySendError};
+#[cfg(not(tokio_wasm_not_wasi))]
+use tokio::test as maybe_tokio_test;
+use tokio_test::*;
+#[cfg(not(tokio_wasm))]
mod support {
pub(crate) mod mpsc_stream;
}
@@ -21,7 +24,7 @@ trait AssertSend: Send {}
impl AssertSend for mpsc::Sender<i32> {}
impl AssertSend for mpsc::Receiver<i32> {}
-#[tokio::test]
+#[maybe_tokio_test]
async fn send_recv_with_buffer() {
let (tx, mut rx) = mpsc::channel::<i32>(16);
@@ -46,6 +49,7 @@ async fn send_recv_with_buffer() {
}
#[tokio::test]
+#[cfg(feature = "full")]
async fn reserve_disarm() {
let (tx, mut rx) = mpsc::channel::<i32>(2);
let tx1 = tx.clone();
@@ -58,10 +62,10 @@ async fn reserve_disarm() {
let permit2 = assert_ok!(tx2.reserve().await);
// But a third should not be ready
- let mut r3 = task::spawn(tx3.reserve());
+ let mut r3 = tokio_test::task::spawn(tx3.reserve());
assert_pending!(r3.poll());
- let mut r4 = task::spawn(tx4.reserve());
+ let mut r4 = tokio_test::task::spawn(tx4.reserve());
assert_pending!(r4.poll());
// Using one of the reserved slots should allow a new handle to become ready
@@ -78,11 +82,12 @@ async fn reserve_disarm() {
drop(permit2);
assert!(r4.is_woken());
- let mut r1 = task::spawn(tx1.reserve());
+ let mut r1 = tokio_test::task::spawn(tx1.reserve());
assert_pending!(r1.poll());
}
#[tokio::test]
+#[cfg(all(feature = "full", not(tokio_wasi)))] // Wasi doesn't support threads
async fn send_recv_stream_with_buffer() {
use tokio_stream::StreamExt;
@@ -100,6 +105,7 @@ async fn send_recv_stream_with_buffer() {
}
#[tokio::test]
+#[cfg(feature = "full")]
async fn async_send_recv_with_buffer() {
let (tx, mut rx) = mpsc::channel(16);
@@ -114,10 +120,11 @@ async fn async_send_recv_with_buffer() {
}
#[tokio::test]
+#[cfg(feature = "full")]
async fn start_send_past_cap() {
use std::future::Future;
- let mut t1 = task::spawn(());
+ let mut t1 = tokio_test::task::spawn(());
let (tx1, mut rx) = mpsc::channel(1);
let tx2 = tx1.clone();
@@ -128,7 +135,7 @@ async fn start_send_past_cap() {
t1.enter(|cx, _| assert_pending!(r1.as_mut().poll(cx)));
{
- let mut r2 = task::spawn(tx2.reserve());
+ let mut r2 = tokio_test::task::spawn(tx2.reserve());
assert_pending!(r2.poll());
drop(r1);
@@ -147,11 +154,12 @@ async fn start_send_past_cap() {
#[test]
#[should_panic]
+#[cfg(not(tokio_wasm))] // wasm currently doesn't support unwinding
fn buffer_gteq_one() {
mpsc::channel::<i32>(0);
}
-#[tokio::test]
+#[maybe_tokio_test]
async fn send_recv_unbounded() {
let (tx, mut rx) = mpsc::unbounded_channel::<i32>();
@@ -168,6 +176,7 @@ async fn send_recv_unbounded() {
}
#[tokio::test]
+#[cfg(feature = "full")]
async fn async_send_recv_unbounded() {
let (tx, mut rx) = mpsc::unbounded_channel();
@@ -182,6 +191,7 @@ async fn async_send_recv_unbounded() {
}
#[tokio::test]
+#[cfg(all(feature = "full", not(tokio_wasi)))] // Wasi doesn't support threads
async fn send_recv_stream_unbounded() {
use tokio_stream::StreamExt;
@@ -199,32 +209,32 @@ async fn send_recv_stream_unbounded() {
assert_eq!(None, rx.next().await);
}
-#[tokio::test]
+#[maybe_tokio_test]
async fn no_t_bounds_buffer() {
struct NoImpls;
let (tx, mut rx) = mpsc::channel(100);
// sender should be Debug even though T isn't Debug
- println!("{:?}", tx);
+ is_debug(&tx);
// same with Receiver
- println!("{:?}", rx);
+ is_debug(&rx);
// and sender should be Clone even though T isn't Clone
assert!(tx.clone().try_send(NoImpls).is_ok());
assert!(rx.recv().await.is_some());
}
-#[tokio::test]
+#[maybe_tokio_test]
async fn no_t_bounds_unbounded() {
struct NoImpls;
let (tx, mut rx) = mpsc::unbounded_channel();
// sender should be Debug even though T isn't Debug
- println!("{:?}", tx);
+ is_debug(&tx);
// same with Receiver
- println!("{:?}", rx);
+ is_debug(&rx);
// and sender should be Clone even though T isn't Clone
assert!(tx.clone().send(NoImpls).is_ok());
@@ -232,6 +242,7 @@ async fn no_t_bounds_unbounded() {
}
#[tokio::test]
+#[cfg(feature = "full")]
async fn send_recv_buffer_limited() {
let (tx, mut rx) = mpsc::channel::<i32>(1);
@@ -242,7 +253,7 @@ async fn send_recv_buffer_limited() {
p1.send(1);
// Not ready
- let mut p2 = task::spawn(tx.reserve());
+ let mut p2 = tokio_test::task::spawn(tx.reserve());
assert_pending!(p2.poll());
// Take the value
@@ -261,7 +272,7 @@ async fn send_recv_buffer_limited() {
assert!(rx.recv().await.is_some());
}
-#[tokio::test]
+#[maybe_tokio_test]
async fn recv_close_gets_none_idle() {
let (tx, mut rx) = mpsc::channel::<i32>(10);
@@ -273,12 +284,13 @@ async fn recv_close_gets_none_idle() {
}
#[tokio::test]
+#[cfg(feature = "full")]
async fn recv_close_gets_none_reserved() {
let (tx1, mut rx) = mpsc::channel::<i32>(1);
let tx2 = tx1.clone();
let permit1 = assert_ok!(tx1.reserve().await);
- let mut permit2 = task::spawn(tx2.reserve());
+ let mut permit2 = tokio_test::task::spawn(tx2.reserve());
assert_pending!(permit2.poll());
rx.close();
@@ -287,7 +299,7 @@ async fn recv_close_gets_none_reserved() {
assert_ready_err!(permit2.poll());
{
- let mut recv = task::spawn(rx.recv());
+ let mut recv = tokio_test::task::spawn(rx.recv());
assert_pending!(recv.poll());
permit1.send(123);
@@ -300,13 +312,13 @@ async fn recv_close_gets_none_reserved() {
assert!(rx.recv().await.is_none());
}
-#[tokio::test]
+#[maybe_tokio_test]
async fn tx_close_gets_none() {
let (_, mut rx) = mpsc::channel::<i32>(10);
assert!(rx.recv().await.is_none());
}
-#[tokio::test]
+#[maybe_tokio_test]
async fn try_send_fail() {
let (tx, mut rx) = mpsc::channel(1);
@@ -327,7 +339,7 @@ async fn try_send_fail() {
assert!(rx.recv().await.is_none());
}
-#[tokio::test]
+#[maybe_tokio_test]
async fn try_send_fail_with_try_recv() {
let (tx, mut rx) = mpsc::channel(1);
@@ -348,7 +360,7 @@ async fn try_send_fail_with_try_recv() {
assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
}
-#[tokio::test]
+#[maybe_tokio_test]
async fn try_reserve_fails() {
let (tx, mut rx) = mpsc::channel(1);
@@ -372,6 +384,7 @@ async fn try_reserve_fails() {
}
#[tokio::test]
+#[cfg(feature = "full")]
async fn drop_permit_releases_permit() {
// poll_ready reserves capacity, ensure that the capacity is released if tx
// is dropped w/o sending a value.
@@ -380,7 +393,7 @@ async fn drop_permit_releases_permit() {
let permit = assert_ok!(tx1.reserve().await);
- let mut reserve2 = task::spawn(tx2.reserve());
+ let mut reserve2 = tokio_test::task::spawn(tx2.reserve());
assert_pending!(reserve2.poll());
drop(permit);
@@ -389,7 +402,7 @@ async fn drop_permit_releases_permit() {
assert_ready_ok!(reserve2.poll());
}
-#[tokio::test]
+#[maybe_tokio_test]
async fn dropping_rx_closes_channel() {
let (tx, rx) = mpsc::channel(100);
@@ -439,48 +452,57 @@ fn unconsumed_messages_are_dropped() {
}
#[test]
+#[cfg(all(feature = "full", not(tokio_wasi)))] // Wasi doesn't support threads
fn blocking_recv() {
let (tx, mut rx) = mpsc::channel::<u8>(1);
- let sync_code = thread::spawn(move || {
+ let sync_code = std::thread::spawn(move || {
assert_eq!(Some(10), rx.blocking_recv());
});
- Runtime::new().unwrap().block_on(async move {
- let _ = tx.send(10).await;
- });
+ tokio::runtime::Runtime::new()
+ .unwrap()
+ .block_on(async move {
+ let _ = tx.send(10).await;
+ });
sync_code.join().unwrap()
}
#[tokio::test]
#[should_panic]
+#[cfg(not(tokio_wasm))] // wasm currently doesn't support unwinding
async fn blocking_recv_async() {
let (_tx, mut rx) = mpsc::channel::<()>(1);
let _ = rx.blocking_recv();
}
#[test]
+#[cfg(all(feature = "full", not(tokio_wasi)))] // Wasi doesn't support threads
fn blocking_send() {
let (tx, mut rx) = mpsc::channel::<u8>(1);
- let sync_code = thread::spawn(move || {
+ let sync_code = std::thread::spawn(move || {
tx.blocking_send(10).unwrap();
});
- Runtime::new().unwrap().block_on(async move {
- assert_eq!(Some(10), rx.recv().await);
- });
+ tokio::runtime::Runtime::new()
+ .unwrap()
+ .block_on(async move {
+ assert_eq!(Some(10), rx.recv().await);
+ });
sync_code.join().unwrap()
}
#[tokio::test]
#[should_panic]
+#[cfg(not(tokio_wasm))] // wasm currently doesn't support unwinding
async fn blocking_send_async() {
let (tx, _rx) = mpsc::channel::<()>(1);
let _ = tx.blocking_send(());
}
#[tokio::test]
+#[cfg(feature = "full")]
async fn ready_close_cancel_bounded() {
let (tx, mut rx) = mpsc::channel::<()>(100);
let _tx2 = tx.clone();
@@ -489,7 +511,7 @@ async fn ready_close_cancel_bounded() {
rx.close();
- let mut recv = task::spawn(rx.recv());
+ let mut recv = tokio_test::task::spawn(rx.recv());
assert_pending!(recv.poll());
drop(permit);
@@ -500,13 +522,14 @@ async fn ready_close_cancel_bounded() {
}
#[tokio::test]
+#[cfg(feature = "full")]
async fn permit_available_not_acquired_close() {
let (tx1, mut rx) = mpsc::channel::<()>(1);
let tx2 = tx1.clone();
let permit1 = assert_ok!(tx1.reserve().await);
- let mut permit2 = task::spawn(tx2.reserve());
+ let mut permit2 = tokio_test::task::spawn(tx2.reserve());
assert_pending!(permit2.poll());
rx.close();
@@ -597,3 +620,60 @@ fn try_recv_close_while_empty_unbounded() {
drop(tx);
assert_eq!(Err(TryRecvError::Disconnected), rx.try_recv());
}
+
+#[tokio::test(start_paused = true)]
+#[cfg(feature = "full")]
+async fn recv_timeout() {
+ use tokio::sync::mpsc::error::SendTimeoutError::{Closed, Timeout};
+ use tokio::time::Duration;
+
+ let (tx, rx) = mpsc::channel(5);
+
+ assert_eq!(tx.send_timeout(10, Duration::from_secs(1)).await, Ok(()));
+ assert_eq!(tx.send_timeout(20, Duration::from_secs(1)).await, Ok(()));
+ assert_eq!(tx.send_timeout(30, Duration::from_secs(1)).await, Ok(()));
+ assert_eq!(tx.send_timeout(40, Duration::from_secs(1)).await, Ok(()));
+ assert_eq!(tx.send_timeout(50, Duration::from_secs(1)).await, Ok(()));
+ assert_eq!(
+ tx.send_timeout(60, Duration::from_secs(1)).await,
+ Err(Timeout(60))
+ );
+
+ drop(rx);
+ assert_eq!(
+ tx.send_timeout(70, Duration::from_secs(1)).await,
+ Err(Closed(70))
+ );
+}
+
+#[test]
+#[should_panic = "there is no reactor running, must be called from the context of a Tokio 1.x runtime"]
+#[cfg(not(tokio_wasm))] // wasm currently doesn't support unwinding
+fn recv_timeout_panic() {
+ use futures::future::FutureExt;
+ use tokio::time::Duration;
+
+ let (tx, _rx) = mpsc::channel(5);
+ tx.send_timeout(10, Duration::from_secs(1)).now_or_never();
+}
+
+// Tests that channel `capacity` changes and `max_capacity` stays the same
+#[tokio::test]
+async fn test_tx_capacity() {
+ let (tx, _rx) = mpsc::channel::<()>(10);
+ // both capacities are same before
+ assert_eq!(tx.capacity(), 10);
+ assert_eq!(tx.max_capacity(), 10);
+
+ let _permit = tx.reserve().await.unwrap();
+ // after reserve, only capacity should drop by one
+ assert_eq!(tx.capacity(), 9);
+ assert_eq!(tx.max_capacity(), 10);
+
+ tx.send(()).await.unwrap();
+ // after send, capacity should drop by one again
+ assert_eq!(tx.capacity(), 8);
+ assert_eq!(tx.max_capacity(), 10);
+}
+
+fn is_debug<T: fmt::Debug>(_: &T) {}