summaryrefslogtreecommitdiff
path: root/tests/chunks_timeout.rs
diff options
context:
space:
mode:
authorChris Wailes <chriswailes@google.com>2023-01-18 22:05:31 +0000
committerAutomerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>2023-01-18 22:05:31 +0000
commitee83363fd0bd779825ad2ccb16e94e94b42a0f24 (patch)
tree32543bf48b06816c5b45896873d8b18c39047702 /tests/chunks_timeout.rs
parentfc78584e33e40cee146fc5b742057120a544bf48 (diff)
parent3b22d38b59cc15157c73f680b70ca054d03b217b (diff)
downloadtokio-stream-ee83363fd0bd779825ad2ccb16e94e94b42a0f24.tar.gz
Merge "Upgrade tokio-stream to 0.1.11" am: 3b22d38b59
Original change: https://android-review.googlesource.com/c/platform/external/rust/crates/tokio-stream/+/2346810 Change-Id: I4338187e8249a61108a400003217c23a09dff9e5 Signed-off-by: Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>
Diffstat (limited to 'tests/chunks_timeout.rs')
-rw-r--r--tests/chunks_timeout.rs84
1 files changed, 84 insertions, 0 deletions
diff --git a/tests/chunks_timeout.rs b/tests/chunks_timeout.rs
new file mode 100644
index 0000000..ffc7dea
--- /dev/null
+++ b/tests/chunks_timeout.rs
@@ -0,0 +1,84 @@
+#![warn(rust_2018_idioms)]
+#![cfg(all(feature = "time", feature = "sync", feature = "io-util"))]
+
+use tokio::time;
+use tokio_stream::{self as stream, StreamExt};
+use tokio_test::assert_pending;
+use tokio_test::task;
+
+use futures::FutureExt;
+use std::time::Duration;
+
+#[tokio::test(start_paused = true)]
+async fn usage() {
+ let iter = vec![1, 2, 3].into_iter();
+ let stream0 = stream::iter(iter);
+
+ let iter = vec![4].into_iter();
+ let stream1 =
+ stream::iter(iter).then(move |n| time::sleep(Duration::from_secs(3)).map(move |_| n));
+
+ let chunk_stream = stream0
+ .chain(stream1)
+ .chunks_timeout(4, Duration::from_secs(2));
+
+ let mut chunk_stream = task::spawn(chunk_stream);
+
+ assert_pending!(chunk_stream.poll_next());
+ time::advance(Duration::from_secs(2)).await;
+ assert_eq!(chunk_stream.next().await, Some(vec![1, 2, 3]));
+
+ assert_pending!(chunk_stream.poll_next());
+ time::advance(Duration::from_secs(2)).await;
+ assert_eq!(chunk_stream.next().await, Some(vec![4]));
+}
+
+#[tokio::test(start_paused = true)]
+async fn full_chunk_with_timeout() {
+ let iter = vec![1, 2].into_iter();
+ let stream0 = stream::iter(iter);
+
+ let iter = vec![3].into_iter();
+ let stream1 =
+ stream::iter(iter).then(move |n| time::sleep(Duration::from_secs(1)).map(move |_| n));
+
+ let iter = vec![4].into_iter();
+ let stream2 =
+ stream::iter(iter).then(move |n| time::sleep(Duration::from_secs(3)).map(move |_| n));
+
+ let chunk_stream = stream0
+ .chain(stream1)
+ .chain(stream2)
+ .chunks_timeout(3, Duration::from_secs(2));
+
+ let mut chunk_stream = task::spawn(chunk_stream);
+
+ assert_pending!(chunk_stream.poll_next());
+ time::advance(Duration::from_secs(2)).await;
+ assert_eq!(chunk_stream.next().await, Some(vec![1, 2, 3]));
+
+ assert_pending!(chunk_stream.poll_next());
+ time::advance(Duration::from_secs(2)).await;
+ assert_eq!(chunk_stream.next().await, Some(vec![4]));
+}
+
+#[tokio::test]
+#[ignore]
+async fn real_time() {
+ let iter = vec![1, 2, 3, 4].into_iter();
+ let stream0 = stream::iter(iter);
+
+ let iter = vec![5].into_iter();
+ let stream1 =
+ stream::iter(iter).then(move |n| time::sleep(Duration::from_secs(5)).map(move |_| n));
+
+ let chunk_stream = stream0
+ .chain(stream1)
+ .chunks_timeout(3, Duration::from_secs(2));
+
+ let mut chunk_stream = task::spawn(chunk_stream);
+
+ assert_eq!(chunk_stream.next().await, Some(vec![1, 2, 3]));
+ assert_eq!(chunk_stream.next().await, Some(vec![4]));
+ assert_eq!(chunk_stream.next().await, Some(vec![5]));
+}