diff options
Diffstat (limited to 'tests/stream_futures_ordered.rs')
-rw-r--r-- | tests/stream_futures_ordered.rs | 90 |
1 files changed, 88 insertions, 2 deletions
diff --git a/tests/stream_futures_ordered.rs b/tests/stream_futures_ordered.rs index 84e0bcc..5a4a3e2 100644 --- a/tests/stream_futures_ordered.rs +++ b/tests/stream_futures_ordered.rs @@ -2,6 +2,7 @@ use futures::channel::oneshot; use futures::executor::{block_on, block_on_stream}; use futures::future::{self, join, Future, FutureExt, TryFutureExt}; use futures::stream::{FuturesOrdered, StreamExt}; +use futures::task::Poll; use futures_test::task::noop_context; use std::any::Any; @@ -26,7 +27,6 @@ fn works_1() { assert_eq!(None, iter.next()); } -#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038 #[test] fn works_2() { let (a_tx, a_rx) = oneshot::channel::<i32>(); @@ -47,6 +47,69 @@ fn works_2() { } #[test] +fn test_push_front() { + let (a_tx, a_rx) = oneshot::channel::<i32>(); + let (b_tx, b_rx) = oneshot::channel::<i32>(); + let (c_tx, c_rx) = oneshot::channel::<i32>(); + let (d_tx, d_rx) = oneshot::channel::<i32>(); + + let mut stream = FuturesOrdered::new(); + + let mut cx = noop_context(); + + stream.push_back(a_rx); + stream.push_back(b_rx); + stream.push_back(c_rx); + + a_tx.send(1).unwrap(); + b_tx.send(2).unwrap(); + c_tx.send(3).unwrap(); + + // 1 and 2 should be received in order + assert_eq!(Poll::Ready(Some(Ok(1))), stream.poll_next_unpin(&mut cx)); + assert_eq!(Poll::Ready(Some(Ok(2))), stream.poll_next_unpin(&mut cx)); + + stream.push_front(d_rx); + d_tx.send(4).unwrap(); + + // we pushed `d_rx` to the front and sent 4, so we should recieve 4 next + // and then 3 after it + assert_eq!(Poll::Ready(Some(Ok(4))), stream.poll_next_unpin(&mut cx)); + assert_eq!(Poll::Ready(Some(Ok(3))), stream.poll_next_unpin(&mut cx)); +} + +#[test] +fn test_push_back() { + let (a_tx, a_rx) = oneshot::channel::<i32>(); + let (b_tx, b_rx) = oneshot::channel::<i32>(); + let (c_tx, c_rx) = oneshot::channel::<i32>(); + let (d_tx, d_rx) = oneshot::channel::<i32>(); + + let mut stream = FuturesOrdered::new(); + + let mut cx = noop_context(); + + stream.push_back(a_rx); + stream.push_back(b_rx); + stream.push_back(c_rx); + + a_tx.send(1).unwrap(); + b_tx.send(2).unwrap(); + c_tx.send(3).unwrap(); + + // All results should be received in order + + assert_eq!(Poll::Ready(Some(Ok(1))), stream.poll_next_unpin(&mut cx)); + assert_eq!(Poll::Ready(Some(Ok(2))), stream.poll_next_unpin(&mut cx)); + + stream.push_back(d_rx); + d_tx.send(4).unwrap(); + + assert_eq!(Poll::Ready(Some(Ok(3))), stream.poll_next_unpin(&mut cx)); + assert_eq!(Poll::Ready(Some(Ok(4))), stream.poll_next_unpin(&mut cx)); +} + +#[test] fn from_iterator() { let stream = vec![future::ready::<i32>(1), future::ready::<i32>(2), future::ready::<i32>(3)] .into_iter() @@ -55,7 +118,6 @@ fn from_iterator() { assert_eq!(block_on(stream.collect::<Vec<_>>()), vec![1, 2, 3]); } -#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038 #[test] fn queue_never_unblocked() { let (_a_tx, a_rx) = oneshot::channel::<Box<dyn Any + Send>>(); @@ -84,3 +146,27 @@ fn queue_never_unblocked() { assert!(stream.poll_next_unpin(cx).is_pending()); assert!(stream.poll_next_unpin(cx).is_pending()); } + +#[test] +fn test_push_front_negative() { + let (a_tx, a_rx) = oneshot::channel::<i32>(); + let (b_tx, b_rx) = oneshot::channel::<i32>(); + let (c_tx, c_rx) = oneshot::channel::<i32>(); + + let mut stream = FuturesOrdered::new(); + + let mut cx = noop_context(); + + stream.push_front(a_rx); + stream.push_front(b_rx); + stream.push_front(c_rx); + + a_tx.send(1).unwrap(); + b_tx.send(2).unwrap(); + c_tx.send(3).unwrap(); + + // These should all be recieved in reverse order + assert_eq!(Poll::Ready(Some(Ok(3))), stream.poll_next_unpin(&mut cx)); + assert_eq!(Poll::Ready(Some(Ok(2))), stream.poll_next_unpin(&mut cx)); + assert_eq!(Poll::Ready(Some(Ok(1))), stream.poll_next_unpin(&mut cx)); +} |