diff options
Diffstat (limited to 'tests/sync_mpsc.rs')
-rw-r--r-- | tests/sync_mpsc.rs | 401 |
1 files changed, 197 insertions, 204 deletions
diff --git a/tests/sync_mpsc.rs b/tests/sync_mpsc.rs index f02d90a..adefcb1 100644 --- a/tests/sync_mpsc.rs +++ b/tests/sync_mpsc.rs @@ -2,6 +2,8 @@ #![warn(rust_2018_idioms)] #![cfg(feature = "full")] +use std::thread; +use tokio::runtime::Runtime; use tokio::sync::mpsc; use tokio::sync::mpsc::error::{TryRecvError, TrySendError}; use tokio_test::task; @@ -15,74 +17,72 @@ trait AssertSend: Send {} impl AssertSend for mpsc::Sender<i32> {} impl AssertSend for mpsc::Receiver<i32> {} -#[test] -fn send_recv_with_buffer() { - let (tx, rx) = mpsc::channel::<i32>(16); - let mut tx = task::spawn(tx); - let mut rx = task::spawn(rx); +#[tokio::test] +async fn send_recv_with_buffer() { + let (tx, mut rx) = mpsc::channel::<i32>(16); // Using poll_ready / try_send - assert_ready_ok!(tx.enter(|cx, mut tx| tx.poll_ready(cx))); - tx.try_send(1).unwrap(); + // let permit assert_ready_ok!(tx.reserve()); + let permit = tx.reserve().await.unwrap(); + permit.send(1); // Without poll_ready tx.try_send(2).unwrap(); drop(tx); - let val = assert_ready!(rx.enter(|cx, mut rx| rx.poll_recv(cx))); + let val = rx.recv().await; assert_eq!(val, Some(1)); - let val = assert_ready!(rx.enter(|cx, mut rx| rx.poll_recv(cx))); + let val = rx.recv().await; assert_eq!(val, Some(2)); - let val = assert_ready!(rx.enter(|cx, mut rx| rx.poll_recv(cx))); + let val = rx.recv().await; assert!(val.is_none()); } -#[test] -fn disarm() { - let (tx, rx) = mpsc::channel::<i32>(2); - let mut tx1 = task::spawn(tx.clone()); - let mut tx2 = task::spawn(tx.clone()); - let mut tx3 = task::spawn(tx.clone()); - let mut tx4 = task::spawn(tx); - let mut rx = task::spawn(rx); +#[tokio::test] +async fn reserve_disarm() { + let (tx, mut rx) = mpsc::channel::<i32>(2); + let tx1 = tx.clone(); + let tx2 = tx.clone(); + let tx3 = tx.clone(); + let tx4 = tx; // We should be able to `poll_ready` two handles without problem - assert_ready_ok!(tx1.enter(|cx, mut tx| tx.poll_ready(cx))); - assert_ready_ok!(tx2.enter(|cx, mut tx| tx.poll_ready(cx))); + let permit1 = assert_ok!(tx1.reserve().await); + let permit2 = assert_ok!(tx2.reserve().await); // But a third should not be ready - assert_pending!(tx3.enter(|cx, mut tx| tx.poll_ready(cx))); + let mut r3 = task::spawn(tx3.reserve()); + assert_pending!(r3.poll()); + + let mut r4 = task::spawn(tx4.reserve()); + assert_pending!(r4.poll()); // Using one of the reserved slots should allow a new handle to become ready - tx1.try_send(1).unwrap(); + permit1.send(1); + // We also need to receive for the slot to be free - let _ = assert_ready!(rx.enter(|cx, mut rx| rx.poll_recv(cx))).unwrap(); + assert!(!r3.is_woken()); + rx.recv().await.unwrap(); // Now there's a free slot! - assert_ready_ok!(tx3.enter(|cx, mut tx| tx.poll_ready(cx))); - assert_pending!(tx4.enter(|cx, mut tx| tx.poll_ready(cx))); - - // Dropping a ready handle should also open up a slot - drop(tx2); - assert_ready_ok!(tx4.enter(|cx, mut tx| tx.poll_ready(cx))); - assert_pending!(tx1.enter(|cx, mut tx| tx.poll_ready(cx))); + assert!(r3.is_woken()); + assert!(!r4.is_woken()); - // Explicitly disarming a handle should also open a slot - assert!(tx3.disarm()); - assert_ready_ok!(tx1.enter(|cx, mut tx| tx.poll_ready(cx))); + // Dropping a permit should also open up a slot + drop(permit2); + assert!(r4.is_woken()); - // Disarming a non-armed sender does not free up a slot - assert!(!tx3.disarm()); - assert_pending!(tx3.enter(|cx, mut tx| tx.poll_ready(cx))); + let mut r1 = task::spawn(tx1.reserve()); + assert_pending!(r1.poll()); } #[tokio::test] async fn send_recv_stream_with_buffer() { use tokio::stream::StreamExt; - let (mut tx, mut rx) = mpsc::channel::<i32>(16); + let (tx, mut rx) = mpsc::channel::<i32>(16); tokio::spawn(async move { assert_ok!(tx.send(1).await); @@ -96,7 +96,7 @@ async fn send_recv_stream_with_buffer() { #[tokio::test] async fn async_send_recv_with_buffer() { - let (mut tx, mut rx) = mpsc::channel(16); + let (tx, mut rx) = mpsc::channel(16); tokio::spawn(async move { assert_ok!(tx.send(1).await); @@ -108,37 +108,36 @@ async fn async_send_recv_with_buffer() { assert_eq!(None, rx.recv().await); } -#[test] -fn start_send_past_cap() { +#[tokio::test] +async fn start_send_past_cap() { + use std::future::Future; + let mut t1 = task::spawn(()); - let mut t2 = task::spawn(()); - let mut t3 = task::spawn(()); - let (mut tx1, mut rx) = mpsc::channel(1); - let mut tx2 = tx1.clone(); + let (tx1, mut rx) = mpsc::channel(1); + let tx2 = tx1.clone(); assert_ok!(tx1.try_send(())); - t1.enter(|cx, _| { - assert_pending!(tx1.poll_ready(cx)); - }); + let mut r1 = Box::pin(tx1.reserve()); + t1.enter(|cx, _| assert_pending!(r1.as_mut().poll(cx))); - t2.enter(|cx, _| { - assert_pending!(tx2.poll_ready(cx)); - }); + { + let mut r2 = task::spawn(tx2.reserve()); + assert_pending!(r2.poll()); - drop(tx1); + drop(r1); - let val = t3.enter(|cx, _| assert_ready!(rx.poll_recv(cx))); - assert!(val.is_some()); + assert!(rx.recv().await.is_some()); - assert!(t2.is_woken()); - assert!(!t1.is_woken()); + assert!(r2.is_woken()); + assert!(!t1.is_woken()); + } + drop(tx1); drop(tx2); - let val = t3.enter(|cx, _| assert_ready!(rx.poll_recv(cx))); - assert!(val.is_none()); + assert!(rx.recv().await.is_none()); } #[test] @@ -147,26 +146,20 @@ fn buffer_gteq_one() { mpsc::channel::<i32>(0); } -#[test] -fn send_recv_unbounded() { - let mut t1 = task::spawn(()); - +#[tokio::test] +async fn send_recv_unbounded() { let (tx, mut rx) = mpsc::unbounded_channel::<i32>(); // Using `try_send` assert_ok!(tx.send(1)); assert_ok!(tx.send(2)); - let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx))); - assert_eq!(val, Some(1)); - - let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx))); - assert_eq!(val, Some(2)); + assert_eq!(rx.recv().await, Some(1)); + assert_eq!(rx.recv().await, Some(2)); drop(tx); - let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx))); - assert!(val.is_none()); + assert!(rx.recv().await.is_none()); } #[tokio::test] @@ -199,11 +192,10 @@ async fn send_recv_stream_unbounded() { assert_eq!(None, rx.next().await); } -#[test] -fn no_t_bounds_buffer() { +#[tokio::test] +async fn no_t_bounds_buffer() { struct NoImpls; - let mut t1 = task::spawn(()); let (tx, mut rx) = mpsc::channel(100); // sender should be Debug even though T isn't Debug @@ -213,15 +205,13 @@ fn no_t_bounds_buffer() { // and sender should be Clone even though T isn't Clone assert!(tx.clone().try_send(NoImpls).is_ok()); - let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx))); - assert!(val.is_some()); + assert!(rx.recv().await.is_some()); } -#[test] -fn no_t_bounds_unbounded() { +#[tokio::test] +async fn no_t_bounds_unbounded() { struct NoImpls; - let mut t1 = task::spawn(()); let (tx, mut rx) = mpsc::unbounded_channel(); // sender should be Debug even though T isn't Debug @@ -231,133 +221,87 @@ fn no_t_bounds_unbounded() { // and sender should be Clone even though T isn't Clone assert!(tx.clone().send(NoImpls).is_ok()); - let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx))); - assert!(val.is_some()); + assert!(rx.recv().await.is_some()); } -#[test] -fn send_recv_buffer_limited() { - let mut t1 = task::spawn(()); - let mut t2 = task::spawn(()); - - let (mut tx, mut rx) = mpsc::channel::<i32>(1); - - // Run on a task context - t1.enter(|cx, _| { - assert_ready_ok!(tx.poll_ready(cx)); - - // Send first message - assert_ok!(tx.try_send(1)); - - // Not ready - assert_pending!(tx.poll_ready(cx)); +#[tokio::test] +async fn send_recv_buffer_limited() { + let (tx, mut rx) = mpsc::channel::<i32>(1); - // Send second message - assert_err!(tx.try_send(1337)); - }); + // Reserve capacity + let p1 = assert_ok!(tx.reserve().await); - t2.enter(|cx, _| { - // Take the value - let val = assert_ready!(rx.poll_recv(cx)); - assert_eq!(Some(1), val); - }); + // Send first message + p1.send(1); - assert!(t1.is_woken()); + // Not ready + let mut p2 = task::spawn(tx.reserve()); + assert_pending!(p2.poll()); - t1.enter(|cx, _| { - assert_ready_ok!(tx.poll_ready(cx)); + // Take the value + assert!(rx.recv().await.is_some()); - assert_ok!(tx.try_send(2)); + // Notified + assert!(p2.is_woken()); - // Not ready - assert_pending!(tx.poll_ready(cx)); - }); + // Trying to send fails + assert_err!(tx.try_send(1337)); - t2.enter(|cx, _| { - // Take the value - let val = assert_ready!(rx.poll_recv(cx)); - assert_eq!(Some(2), val); - }); + // Send second + let permit = assert_ready_ok!(p2.poll()); + permit.send(2); - t1.enter(|cx, _| { - assert_ready_ok!(tx.poll_ready(cx)); - }); + assert!(rx.recv().await.is_some()); } -#[test] -fn recv_close_gets_none_idle() { - let mut t1 = task::spawn(()); - - let (mut tx, mut rx) = mpsc::channel::<i32>(10); +#[tokio::test] +async fn recv_close_gets_none_idle() { + let (tx, mut rx) = mpsc::channel::<i32>(10); rx.close(); - t1.enter(|cx, _| { - let val = assert_ready!(rx.poll_recv(cx)); - assert!(val.is_none()); - assert_ready_err!(tx.poll_ready(cx)); - }); -} - -#[test] -fn recv_close_gets_none_reserved() { - let mut t1 = task::spawn(()); - let mut t2 = task::spawn(()); - let mut t3 = task::spawn(()); + assert!(rx.recv().await.is_none()); - let (mut tx1, mut rx) = mpsc::channel::<i32>(1); - let mut tx2 = tx1.clone(); + assert_err!(tx.send(1).await); +} - assert_ready_ok!(t1.enter(|cx, _| tx1.poll_ready(cx))); +#[tokio::test] +async fn recv_close_gets_none_reserved() { + let (tx1, mut rx) = mpsc::channel::<i32>(1); + let tx2 = tx1.clone(); - t2.enter(|cx, _| { - assert_pending!(tx2.poll_ready(cx)); - }); + let permit1 = assert_ok!(tx1.reserve().await); + let mut permit2 = task::spawn(tx2.reserve()); + assert_pending!(permit2.poll()); rx.close(); - assert!(t2.is_woken()); - - t2.enter(|cx, _| { - assert_ready_err!(tx2.poll_ready(cx)); - }); - - t3.enter(|cx, _| assert_pending!(rx.poll_recv(cx))); - - assert!(!t1.is_woken()); - assert!(!t2.is_woken()); + assert!(permit2.is_woken()); + assert_ready_err!(permit2.poll()); - assert_ok!(tx1.try_send(123)); + { + let mut recv = task::spawn(rx.recv()); + assert_pending!(recv.poll()); - assert!(t3.is_woken()); + permit1.send(123); + assert!(recv.is_woken()); - t3.enter(|cx, _| { - let v = assert_ready!(rx.poll_recv(cx)); + let v = assert_ready!(recv.poll()); assert_eq!(v, Some(123)); + } - let v = assert_ready!(rx.poll_recv(cx)); - assert!(v.is_none()); - }); + assert!(rx.recv().await.is_none()); } -#[test] -fn tx_close_gets_none() { - let mut t1 = task::spawn(()); - +#[tokio::test] +async fn tx_close_gets_none() { let (_, mut rx) = mpsc::channel::<i32>(10); - - // Run on a task context - t1.enter(|cx, _| { - let v = assert_ready!(rx.poll_recv(cx)); - assert!(v.is_none()); - }); + assert!(rx.recv().await.is_none()); } -#[test] -fn try_send_fail() { - let mut t1 = task::spawn(()); - - let (mut tx, mut rx) = mpsc::channel(1); +#[tokio::test] +async fn try_send_fail() { + let (tx, mut rx) = mpsc::channel(1); tx.try_send("hello").unwrap(); @@ -367,60 +311,48 @@ fn try_send_fail() { _ => panic!(), } - let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx))); - assert_eq!(val, Some("hello")); + assert_eq!(rx.recv().await, Some("hello")); assert_ok!(tx.try_send("goodbye")); drop(tx); - let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx))); - assert_eq!(val, Some("goodbye")); - - let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx))); - assert!(val.is_none()); + assert_eq!(rx.recv().await, Some("goodbye")); + assert!(rx.recv().await.is_none()); } -#[test] -fn drop_tx_with_permit_releases_permit() { - let mut t1 = task::spawn(()); - let mut t2 = task::spawn(()); - +#[tokio::test] +async fn drop_permit_releases_permit() { // poll_ready reserves capacity, ensure that the capacity is released if tx // is dropped w/o sending a value. - let (mut tx1, _rx) = mpsc::channel::<i32>(1); - let mut tx2 = tx1.clone(); + let (tx1, _rx) = mpsc::channel::<i32>(1); + let tx2 = tx1.clone(); - assert_ready_ok!(t1.enter(|cx, _| tx1.poll_ready(cx))); + let permit = assert_ok!(tx1.reserve().await); - t2.enter(|cx, _| { - assert_pending!(tx2.poll_ready(cx)); - }); - - drop(tx1); + let mut reserve2 = task::spawn(tx2.reserve()); + assert_pending!(reserve2.poll()); - assert!(t2.is_woken()); + drop(permit); - assert_ready_ok!(t2.enter(|cx, _| tx2.poll_ready(cx))); + assert!(reserve2.is_woken()); + assert_ready_ok!(reserve2.poll()); } -#[test] -fn dropping_rx_closes_channel() { - let mut t1 = task::spawn(()); - - let (mut tx, rx) = mpsc::channel(100); +#[tokio::test] +async fn dropping_rx_closes_channel() { + let (tx, rx) = mpsc::channel(100); let msg = Arc::new(()); assert_ok!(tx.try_send(msg.clone())); drop(rx); - assert_ready_err!(t1.enter(|cx, _| tx.poll_ready(cx))); - + assert_err!(tx.reserve().await); assert_eq!(1, Arc::strong_count(&msg)); } #[test] fn dropping_rx_closes_channel_for_try() { - let (mut tx, rx) = mpsc::channel(100); + let (tx, rx) = mpsc::channel(100); let msg = Arc::new(()); tx.try_send(msg.clone()).unwrap(); @@ -442,7 +374,7 @@ fn dropping_rx_closes_channel_for_try() { fn unconsumed_messages_are_dropped() { let msg = Arc::new(()); - let (mut tx, rx) = mpsc::channel(100); + let (tx, rx) = mpsc::channel(100); tx.try_send(msg.clone()).unwrap(); @@ -455,7 +387,7 @@ fn unconsumed_messages_are_dropped() { #[test] fn try_recv() { - let (mut tx, mut rx) = mpsc::channel(1); + let (tx, mut rx) = mpsc::channel(1); match rx.try_recv() { Err(TryRecvError::Empty) => {} _ => panic!(), @@ -490,3 +422,64 @@ fn try_recv_unbounded() { _ => panic!(), } } + +#[test] +fn blocking_recv() { + let (tx, mut rx) = mpsc::channel::<u8>(1); + + let sync_code = thread::spawn(move || { + assert_eq!(Some(10), rx.blocking_recv()); + }); + + Runtime::new().unwrap().block_on(async move { + let _ = tx.send(10).await; + }); + sync_code.join().unwrap() +} + +#[tokio::test] +#[should_panic] +async fn blocking_recv_async() { + let (_tx, mut rx) = mpsc::channel::<()>(1); + let _ = rx.blocking_recv(); +} + +#[test] +fn blocking_send() { + let (tx, mut rx) = mpsc::channel::<u8>(1); + + let sync_code = thread::spawn(move || { + tx.blocking_send(10).unwrap(); + }); + + Runtime::new().unwrap().block_on(async move { + assert_eq!(Some(10), rx.recv().await); + }); + sync_code.join().unwrap() +} + +#[tokio::test] +#[should_panic] +async fn blocking_send_async() { + let (tx, _rx) = mpsc::channel::<()>(1); + let _ = tx.blocking_send(()); +} + +#[tokio::test] +async fn ready_close_cancel_bounded() { + let (tx, mut rx) = mpsc::channel::<()>(100); + let _tx2 = tx.clone(); + + let permit = assert_ok!(tx.reserve().await); + + rx.close(); + + let mut recv = task::spawn(rx.recv()); + assert_pending!(recv.poll()); + + drop(permit); + + assert!(recv.is_woken()); + let val = assert_ready!(recv.poll()); + assert!(val.is_none()); +} |