aboutsummaryrefslogtreecommitdiff
path: root/tests/stream_futures_ordered.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tests/stream_futures_ordered.rs')
-rw-r--r--tests/stream_futures_ordered.rs90
1 files changed, 88 insertions, 2 deletions
diff --git a/tests/stream_futures_ordered.rs b/tests/stream_futures_ordered.rs
index 84e0bcc..5a4a3e2 100644
--- a/tests/stream_futures_ordered.rs
+++ b/tests/stream_futures_ordered.rs
@@ -2,6 +2,7 @@ use futures::channel::oneshot;
use futures::executor::{block_on, block_on_stream};
use futures::future::{self, join, Future, FutureExt, TryFutureExt};
use futures::stream::{FuturesOrdered, StreamExt};
+use futures::task::Poll;
use futures_test::task::noop_context;
use std::any::Any;
@@ -26,7 +27,6 @@ fn works_1() {
assert_eq!(None, iter.next());
}
-#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038
#[test]
fn works_2() {
let (a_tx, a_rx) = oneshot::channel::<i32>();
@@ -47,6 +47,69 @@ fn works_2() {
}
#[test]
+fn test_push_front() {
+ let (a_tx, a_rx) = oneshot::channel::<i32>();
+ let (b_tx, b_rx) = oneshot::channel::<i32>();
+ let (c_tx, c_rx) = oneshot::channel::<i32>();
+ let (d_tx, d_rx) = oneshot::channel::<i32>();
+
+ let mut stream = FuturesOrdered::new();
+
+ let mut cx = noop_context();
+
+ stream.push_back(a_rx);
+ stream.push_back(b_rx);
+ stream.push_back(c_rx);
+
+ a_tx.send(1).unwrap();
+ b_tx.send(2).unwrap();
+ c_tx.send(3).unwrap();
+
+ // 1 and 2 should be received in order
+ assert_eq!(Poll::Ready(Some(Ok(1))), stream.poll_next_unpin(&mut cx));
+ assert_eq!(Poll::Ready(Some(Ok(2))), stream.poll_next_unpin(&mut cx));
+
+ stream.push_front(d_rx);
+ d_tx.send(4).unwrap();
+
+ // we pushed `d_rx` to the front and sent 4, so we should recieve 4 next
+ // and then 3 after it
+ assert_eq!(Poll::Ready(Some(Ok(4))), stream.poll_next_unpin(&mut cx));
+ assert_eq!(Poll::Ready(Some(Ok(3))), stream.poll_next_unpin(&mut cx));
+}
+
+#[test]
+fn test_push_back() {
+ let (a_tx, a_rx) = oneshot::channel::<i32>();
+ let (b_tx, b_rx) = oneshot::channel::<i32>();
+ let (c_tx, c_rx) = oneshot::channel::<i32>();
+ let (d_tx, d_rx) = oneshot::channel::<i32>();
+
+ let mut stream = FuturesOrdered::new();
+
+ let mut cx = noop_context();
+
+ stream.push_back(a_rx);
+ stream.push_back(b_rx);
+ stream.push_back(c_rx);
+
+ a_tx.send(1).unwrap();
+ b_tx.send(2).unwrap();
+ c_tx.send(3).unwrap();
+
+ // All results should be received in order
+
+ assert_eq!(Poll::Ready(Some(Ok(1))), stream.poll_next_unpin(&mut cx));
+ assert_eq!(Poll::Ready(Some(Ok(2))), stream.poll_next_unpin(&mut cx));
+
+ stream.push_back(d_rx);
+ d_tx.send(4).unwrap();
+
+ assert_eq!(Poll::Ready(Some(Ok(3))), stream.poll_next_unpin(&mut cx));
+ assert_eq!(Poll::Ready(Some(Ok(4))), stream.poll_next_unpin(&mut cx));
+}
+
+#[test]
fn from_iterator() {
let stream = vec![future::ready::<i32>(1), future::ready::<i32>(2), future::ready::<i32>(3)]
.into_iter()
@@ -55,7 +118,6 @@ fn from_iterator() {
assert_eq!(block_on(stream.collect::<Vec<_>>()), vec![1, 2, 3]);
}
-#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038
#[test]
fn queue_never_unblocked() {
let (_a_tx, a_rx) = oneshot::channel::<Box<dyn Any + Send>>();
@@ -84,3 +146,27 @@ fn queue_never_unblocked() {
assert!(stream.poll_next_unpin(cx).is_pending());
assert!(stream.poll_next_unpin(cx).is_pending());
}
+
+#[test]
+fn test_push_front_negative() {
+ let (a_tx, a_rx) = oneshot::channel::<i32>();
+ let (b_tx, b_rx) = oneshot::channel::<i32>();
+ let (c_tx, c_rx) = oneshot::channel::<i32>();
+
+ let mut stream = FuturesOrdered::new();
+
+ let mut cx = noop_context();
+
+ stream.push_front(a_rx);
+ stream.push_front(b_rx);
+ stream.push_front(c_rx);
+
+ a_tx.send(1).unwrap();
+ b_tx.send(2).unwrap();
+ c_tx.send(3).unwrap();
+
+ // These should all be recieved in reverse order
+ assert_eq!(Poll::Ready(Some(Ok(3))), stream.poll_next_unpin(&mut cx));
+ assert_eq!(Poll::Ready(Some(Ok(2))), stream.poll_next_unpin(&mut cx));
+ assert_eq!(Poll::Ready(Some(Ok(1))), stream.poll_next_unpin(&mut cx));
+}