diff options
Diffstat (limited to 'tests/sync_mpsc.rs')
-rw-r--r-- | tests/sync_mpsc.rs | 164 |
1 files changed, 122 insertions, 42 deletions
diff --git a/tests/sync_mpsc.rs b/tests/sync_mpsc.rs index 1947d26..6e87096 100644 --- a/tests/sync_mpsc.rs +++ b/tests/sync_mpsc.rs @@ -1,18 +1,21 @@ #![allow(clippy::redundant_clone)] #![warn(rust_2018_idioms)] -#![cfg(feature = "full")] +#![cfg(feature = "sync")] -use std::thread; -use tokio::runtime::Runtime; -use tokio::sync::mpsc; -use tokio::sync::mpsc::error::{TryRecvError, TrySendError}; -use tokio_test::task; -use tokio_test::{ - assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok, -}; +#[cfg(tokio_wasm_not_wasi)] +use wasm_bindgen_test::wasm_bindgen_test as test; +#[cfg(tokio_wasm_not_wasi)] +use wasm_bindgen_test::wasm_bindgen_test as maybe_tokio_test; +use std::fmt; use std::sync::Arc; +use tokio::sync::mpsc; +use tokio::sync::mpsc::error::{TryRecvError, TrySendError}; +#[cfg(not(tokio_wasm_not_wasi))] +use tokio::test as maybe_tokio_test; +use tokio_test::*; +#[cfg(not(tokio_wasm))] mod support { pub(crate) mod mpsc_stream; } @@ -21,7 +24,7 @@ trait AssertSend: Send {} impl AssertSend for mpsc::Sender<i32> {} impl AssertSend for mpsc::Receiver<i32> {} -#[tokio::test] +#[maybe_tokio_test] async fn send_recv_with_buffer() { let (tx, mut rx) = mpsc::channel::<i32>(16); @@ -46,6 +49,7 @@ async fn send_recv_with_buffer() { } #[tokio::test] +#[cfg(feature = "full")] async fn reserve_disarm() { let (tx, mut rx) = mpsc::channel::<i32>(2); let tx1 = tx.clone(); @@ -58,10 +62,10 @@ async fn reserve_disarm() { let permit2 = assert_ok!(tx2.reserve().await); // But a third should not be ready - let mut r3 = task::spawn(tx3.reserve()); + let mut r3 = tokio_test::task::spawn(tx3.reserve()); assert_pending!(r3.poll()); - let mut r4 = task::spawn(tx4.reserve()); + let mut r4 = tokio_test::task::spawn(tx4.reserve()); assert_pending!(r4.poll()); // Using one of the reserved slots should allow a new handle to become ready @@ -78,11 +82,12 @@ async fn reserve_disarm() { drop(permit2); assert!(r4.is_woken()); - let mut r1 = task::spawn(tx1.reserve()); + let mut r1 = tokio_test::task::spawn(tx1.reserve()); assert_pending!(r1.poll()); } #[tokio::test] +#[cfg(all(feature = "full", not(tokio_wasi)))] // Wasi doesn't support threads async fn send_recv_stream_with_buffer() { use tokio_stream::StreamExt; @@ -100,6 +105,7 @@ async fn send_recv_stream_with_buffer() { } #[tokio::test] +#[cfg(feature = "full")] async fn async_send_recv_with_buffer() { let (tx, mut rx) = mpsc::channel(16); @@ -114,10 +120,11 @@ async fn async_send_recv_with_buffer() { } #[tokio::test] +#[cfg(feature = "full")] async fn start_send_past_cap() { use std::future::Future; - let mut t1 = task::spawn(()); + let mut t1 = tokio_test::task::spawn(()); let (tx1, mut rx) = mpsc::channel(1); let tx2 = tx1.clone(); @@ -128,7 +135,7 @@ async fn start_send_past_cap() { t1.enter(|cx, _| assert_pending!(r1.as_mut().poll(cx))); { - let mut r2 = task::spawn(tx2.reserve()); + let mut r2 = tokio_test::task::spawn(tx2.reserve()); assert_pending!(r2.poll()); drop(r1); @@ -147,11 +154,12 @@ async fn start_send_past_cap() { #[test] #[should_panic] +#[cfg(not(tokio_wasm))] // wasm currently doesn't support unwinding fn buffer_gteq_one() { mpsc::channel::<i32>(0); } -#[tokio::test] +#[maybe_tokio_test] async fn send_recv_unbounded() { let (tx, mut rx) = mpsc::unbounded_channel::<i32>(); @@ -168,6 +176,7 @@ async fn send_recv_unbounded() { } #[tokio::test] +#[cfg(feature = "full")] async fn async_send_recv_unbounded() { let (tx, mut rx) = mpsc::unbounded_channel(); @@ -182,6 +191,7 @@ async fn async_send_recv_unbounded() { } #[tokio::test] +#[cfg(all(feature = "full", not(tokio_wasi)))] // Wasi doesn't support threads async fn send_recv_stream_unbounded() { use tokio_stream::StreamExt; @@ -199,32 +209,32 @@ async fn send_recv_stream_unbounded() { assert_eq!(None, rx.next().await); } -#[tokio::test] +#[maybe_tokio_test] async fn no_t_bounds_buffer() { struct NoImpls; let (tx, mut rx) = mpsc::channel(100); // sender should be Debug even though T isn't Debug - println!("{:?}", tx); + is_debug(&tx); // same with Receiver - println!("{:?}", rx); + is_debug(&rx); // and sender should be Clone even though T isn't Clone assert!(tx.clone().try_send(NoImpls).is_ok()); assert!(rx.recv().await.is_some()); } -#[tokio::test] +#[maybe_tokio_test] async fn no_t_bounds_unbounded() { struct NoImpls; let (tx, mut rx) = mpsc::unbounded_channel(); // sender should be Debug even though T isn't Debug - println!("{:?}", tx); + is_debug(&tx); // same with Receiver - println!("{:?}", rx); + is_debug(&rx); // and sender should be Clone even though T isn't Clone assert!(tx.clone().send(NoImpls).is_ok()); @@ -232,6 +242,7 @@ async fn no_t_bounds_unbounded() { } #[tokio::test] +#[cfg(feature = "full")] async fn send_recv_buffer_limited() { let (tx, mut rx) = mpsc::channel::<i32>(1); @@ -242,7 +253,7 @@ async fn send_recv_buffer_limited() { p1.send(1); // Not ready - let mut p2 = task::spawn(tx.reserve()); + let mut p2 = tokio_test::task::spawn(tx.reserve()); assert_pending!(p2.poll()); // Take the value @@ -261,7 +272,7 @@ async fn send_recv_buffer_limited() { assert!(rx.recv().await.is_some()); } -#[tokio::test] +#[maybe_tokio_test] async fn recv_close_gets_none_idle() { let (tx, mut rx) = mpsc::channel::<i32>(10); @@ -273,12 +284,13 @@ async fn recv_close_gets_none_idle() { } #[tokio::test] +#[cfg(feature = "full")] async fn recv_close_gets_none_reserved() { let (tx1, mut rx) = mpsc::channel::<i32>(1); let tx2 = tx1.clone(); let permit1 = assert_ok!(tx1.reserve().await); - let mut permit2 = task::spawn(tx2.reserve()); + let mut permit2 = tokio_test::task::spawn(tx2.reserve()); assert_pending!(permit2.poll()); rx.close(); @@ -287,7 +299,7 @@ async fn recv_close_gets_none_reserved() { assert_ready_err!(permit2.poll()); { - let mut recv = task::spawn(rx.recv()); + let mut recv = tokio_test::task::spawn(rx.recv()); assert_pending!(recv.poll()); permit1.send(123); @@ -300,13 +312,13 @@ async fn recv_close_gets_none_reserved() { assert!(rx.recv().await.is_none()); } -#[tokio::test] +#[maybe_tokio_test] async fn tx_close_gets_none() { let (_, mut rx) = mpsc::channel::<i32>(10); assert!(rx.recv().await.is_none()); } -#[tokio::test] +#[maybe_tokio_test] async fn try_send_fail() { let (tx, mut rx) = mpsc::channel(1); @@ -327,7 +339,7 @@ async fn try_send_fail() { assert!(rx.recv().await.is_none()); } -#[tokio::test] +#[maybe_tokio_test] async fn try_send_fail_with_try_recv() { let (tx, mut rx) = mpsc::channel(1); @@ -348,7 +360,7 @@ async fn try_send_fail_with_try_recv() { assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); } -#[tokio::test] +#[maybe_tokio_test] async fn try_reserve_fails() { let (tx, mut rx) = mpsc::channel(1); @@ -372,6 +384,7 @@ async fn try_reserve_fails() { } #[tokio::test] +#[cfg(feature = "full")] async fn drop_permit_releases_permit() { // poll_ready reserves capacity, ensure that the capacity is released if tx // is dropped w/o sending a value. @@ -380,7 +393,7 @@ async fn drop_permit_releases_permit() { let permit = assert_ok!(tx1.reserve().await); - let mut reserve2 = task::spawn(tx2.reserve()); + let mut reserve2 = tokio_test::task::spawn(tx2.reserve()); assert_pending!(reserve2.poll()); drop(permit); @@ -389,7 +402,7 @@ async fn drop_permit_releases_permit() { assert_ready_ok!(reserve2.poll()); } -#[tokio::test] +#[maybe_tokio_test] async fn dropping_rx_closes_channel() { let (tx, rx) = mpsc::channel(100); @@ -439,48 +452,57 @@ fn unconsumed_messages_are_dropped() { } #[test] +#[cfg(all(feature = "full", not(tokio_wasi)))] // Wasi doesn't support threads fn blocking_recv() { let (tx, mut rx) = mpsc::channel::<u8>(1); - let sync_code = thread::spawn(move || { + let sync_code = std::thread::spawn(move || { assert_eq!(Some(10), rx.blocking_recv()); }); - Runtime::new().unwrap().block_on(async move { - let _ = tx.send(10).await; - }); + tokio::runtime::Runtime::new() + .unwrap() + .block_on(async move { + let _ = tx.send(10).await; + }); sync_code.join().unwrap() } #[tokio::test] #[should_panic] +#[cfg(not(tokio_wasm))] // wasm currently doesn't support unwinding async fn blocking_recv_async() { let (_tx, mut rx) = mpsc::channel::<()>(1); let _ = rx.blocking_recv(); } #[test] +#[cfg(all(feature = "full", not(tokio_wasi)))] // Wasi doesn't support threads fn blocking_send() { let (tx, mut rx) = mpsc::channel::<u8>(1); - let sync_code = thread::spawn(move || { + let sync_code = std::thread::spawn(move || { tx.blocking_send(10).unwrap(); }); - Runtime::new().unwrap().block_on(async move { - assert_eq!(Some(10), rx.recv().await); - }); + tokio::runtime::Runtime::new() + .unwrap() + .block_on(async move { + assert_eq!(Some(10), rx.recv().await); + }); sync_code.join().unwrap() } #[tokio::test] #[should_panic] +#[cfg(not(tokio_wasm))] // wasm currently doesn't support unwinding async fn blocking_send_async() { let (tx, _rx) = mpsc::channel::<()>(1); let _ = tx.blocking_send(()); } #[tokio::test] +#[cfg(feature = "full")] async fn ready_close_cancel_bounded() { let (tx, mut rx) = mpsc::channel::<()>(100); let _tx2 = tx.clone(); @@ -489,7 +511,7 @@ async fn ready_close_cancel_bounded() { rx.close(); - let mut recv = task::spawn(rx.recv()); + let mut recv = tokio_test::task::spawn(rx.recv()); assert_pending!(recv.poll()); drop(permit); @@ -500,13 +522,14 @@ async fn ready_close_cancel_bounded() { } #[tokio::test] +#[cfg(feature = "full")] async fn permit_available_not_acquired_close() { let (tx1, mut rx) = mpsc::channel::<()>(1); let tx2 = tx1.clone(); let permit1 = assert_ok!(tx1.reserve().await); - let mut permit2 = task::spawn(tx2.reserve()); + let mut permit2 = tokio_test::task::spawn(tx2.reserve()); assert_pending!(permit2.poll()); rx.close(); @@ -597,3 +620,60 @@ fn try_recv_close_while_empty_unbounded() { drop(tx); assert_eq!(Err(TryRecvError::Disconnected), rx.try_recv()); } + +#[tokio::test(start_paused = true)] +#[cfg(feature = "full")] +async fn recv_timeout() { + use tokio::sync::mpsc::error::SendTimeoutError::{Closed, Timeout}; + use tokio::time::Duration; + + let (tx, rx) = mpsc::channel(5); + + assert_eq!(tx.send_timeout(10, Duration::from_secs(1)).await, Ok(())); + assert_eq!(tx.send_timeout(20, Duration::from_secs(1)).await, Ok(())); + assert_eq!(tx.send_timeout(30, Duration::from_secs(1)).await, Ok(())); + assert_eq!(tx.send_timeout(40, Duration::from_secs(1)).await, Ok(())); + assert_eq!(tx.send_timeout(50, Duration::from_secs(1)).await, Ok(())); + assert_eq!( + tx.send_timeout(60, Duration::from_secs(1)).await, + Err(Timeout(60)) + ); + + drop(rx); + assert_eq!( + tx.send_timeout(70, Duration::from_secs(1)).await, + Err(Closed(70)) + ); +} + +#[test] +#[should_panic = "there is no reactor running, must be called from the context of a Tokio 1.x runtime"] +#[cfg(not(tokio_wasm))] // wasm currently doesn't support unwinding +fn recv_timeout_panic() { + use futures::future::FutureExt; + use tokio::time::Duration; + + let (tx, _rx) = mpsc::channel(5); + tx.send_timeout(10, Duration::from_secs(1)).now_or_never(); +} + +// Tests that channel `capacity` changes and `max_capacity` stays the same +#[tokio::test] +async fn test_tx_capacity() { + let (tx, _rx) = mpsc::channel::<()>(10); + // both capacities are same before + assert_eq!(tx.capacity(), 10); + assert_eq!(tx.max_capacity(), 10); + + let _permit = tx.reserve().await.unwrap(); + // after reserve, only capacity should drop by one + assert_eq!(tx.capacity(), 9); + assert_eq!(tx.max_capacity(), 10); + + tx.send(()).await.unwrap(); + // after send, capacity should drop by one again + assert_eq!(tx.capacity(), 8); + assert_eq!(tx.max_capacity(), 10); +} + +fn is_debug<T: fmt::Debug>(_: &T) {} |