diff options
Diffstat (limited to 'tests/stream_timeout.rs')
-rw-r--r-- | tests/stream_timeout.rs | 109 |
1 files changed, 0 insertions, 109 deletions
diff --git a/tests/stream_timeout.rs b/tests/stream_timeout.rs deleted file mode 100644 index a787bba..0000000 --- a/tests/stream_timeout.rs +++ /dev/null @@ -1,109 +0,0 @@ -#![cfg(feature = "full")] - -use tokio::stream::{self, StreamExt}; -use tokio::time::{self, sleep, Duration}; -use tokio_test::*; - -use futures::StreamExt as _; - -async fn maybe_sleep(idx: i32) -> i32 { - if idx % 2 == 0 { - sleep(ms(200)).await; - } - idx -} - -fn ms(n: u64) -> Duration { - Duration::from_millis(n) -} - -#[tokio::test] -async fn basic_usage() { - time::pause(); - - // Items 2 and 4 time out. If we run the stream until it completes, - // we end up with the following items: - // - // [Ok(1), Err(Elapsed), Ok(2), Ok(3), Err(Elapsed), Ok(4)] - - let stream = stream::iter(1..=4).then(maybe_sleep).timeout(ms(100)); - let mut stream = task::spawn(stream); - - // First item completes immediately - assert_ready_eq!(stream.poll_next(), Some(Ok(1))); - - // Second item is delayed 200ms, times out after 100ms - assert_pending!(stream.poll_next()); - - time::advance(ms(150)).await; - let v = assert_ready!(stream.poll_next()); - assert!(v.unwrap().is_err()); - - assert_pending!(stream.poll_next()); - - time::advance(ms(100)).await; - assert_ready_eq!(stream.poll_next(), Some(Ok(2))); - - // Third item is ready immediately - assert_ready_eq!(stream.poll_next(), Some(Ok(3))); - - // Fourth item is delayed 200ms, times out after 100ms - assert_pending!(stream.poll_next()); - - time::advance(ms(60)).await; - assert_pending!(stream.poll_next()); // nothing ready yet - - time::advance(ms(60)).await; - let v = assert_ready!(stream.poll_next()); - assert!(v.unwrap().is_err()); // timeout! - - time::advance(ms(120)).await; - assert_ready_eq!(stream.poll_next(), Some(Ok(4))); - - // Done. - assert_ready_eq!(stream.poll_next(), None); -} - -#[tokio::test] -async fn return_elapsed_errors_only_once() { - time::pause(); - - let stream = stream::iter(1..=3).then(maybe_sleep).timeout(ms(50)); - let mut stream = task::spawn(stream); - - // First item completes immediately - assert_ready_eq!(stream.poll_next(), Some(Ok(1))); - - // Second item is delayed 200ms, times out after 50ms. Only one `Elapsed` - // error is returned. - assert_pending!(stream.poll_next()); - // - time::advance(ms(50)).await; - let v = assert_ready!(stream.poll_next()); - assert!(v.unwrap().is_err()); // timeout! - - // deadline elapses again, but no error is returned - time::advance(ms(50)).await; - assert_pending!(stream.poll_next()); - - time::advance(ms(100)).await; - assert_ready_eq!(stream.poll_next(), Some(Ok(2))); - assert_ready_eq!(stream.poll_next(), Some(Ok(3))); - - // Done - assert_ready_eq!(stream.poll_next(), None); -} - -#[tokio::test] -async fn no_timeouts() { - let stream = stream::iter(vec![1, 3, 5]) - .then(maybe_sleep) - .timeout(ms(100)); - - let mut stream = task::spawn(stream); - - assert_ready_eq!(stream.poll_next(), Some(Ok(1))); - assert_ready_eq!(stream.poll_next(), Some(Ok(3))); - assert_ready_eq!(stream.poll_next(), Some(Ok(5))); - assert_ready_eq!(stream.poll_next(), None); -} |