aboutsummaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorHaibo Huang <hhb@google.com>2021-01-14 17:23:22 -0800
committerJeff Vander Stoep <jeffv@google.com>2021-01-15 20:44:08 +0100
commit290fc4903cd00fc31d93e0ecd49c402e6833c569 (patch)
tree4a9646d2ab712bae1ead875992160c7248588daf /tests
parent84cad6596f48e471881980dcba7df9cb5b4b0139 (diff)
downloadtokio-290fc4903cd00fc31d93e0ecd49c402e6833c569.tar.gz
Upgrade rust/crates/tokio to 1.0.2platform-tools-31.0.0
Test: make Change-Id: Ic48ff709bade266749eac8c146856901ce78da7f
Diffstat (limited to 'tests')
-rw-r--r--tests/async_send_sync.rs28
-rw-r--r--tests/buffered.rs3
-rw-r--r--tests/fs_dir.rs32
-rw-r--r--tests/fs_file.rs2
-rw-r--r--tests/fs_file_mocked.rs2
-rw-r--r--tests/fs_link.rs4
-rw-r--r--tests/io_async_fd.rs23
-rw-r--r--tests/io_lines.rs16
-rw-r--r--tests/macros_select.rs8
-rw-r--r--tests/process_issue_2174.rs2
-rw-r--r--tests/process_kill_on_drop.rs2
-rw-r--r--tests/rt_basic.rs15
-rw-r--r--tests/rt_common.rs17
-rw-r--r--tests/rt_threaded.rs29
-rw-r--r--tests/stream_chain.rs95
-rw-r--r--tests/stream_collect.rs137
-rw-r--r--tests/stream_empty.rs11
-rw-r--r--tests/stream_fuse.rs50
-rw-r--r--tests/stream_iter.rs18
-rw-r--r--tests/stream_merge.rs78
-rw-r--r--tests/stream_once.rs12
-rw-r--r--tests/stream_pending.rs14
-rw-r--r--tests/stream_stream_map.rs372
-rw-r--r--tests/stream_timeout.rs109
-rw-r--r--tests/support/mpsc_stream.rs42
-rw-r--r--tests/sync_broadcast.rs40
-rw-r--r--tests/sync_mpsc.rs74
-rw-r--r--tests/sync_mutex.rs7
-rw-r--r--tests/sync_mutex_owned.rs7
-rw-r--r--tests/sync_semaphore.rs14
-rw-r--r--tests/task_abort.rs26
-rw-r--r--tests/task_blocking.rs14
-rw-r--r--tests/tcp_accept.rs4
-rw-r--r--tests/tcp_connect.rs2
-rw-r--r--tests/tcp_echo.rs2
-rw-r--r--tests/tcp_into_std.rs44
-rw-r--r--tests/tcp_shutdown.rs5
-rw-r--r--tests/time_interval.rs14
-rw-r--r--tests/time_pause.rs33
-rw-r--r--tests/time_rt.rs2
-rw-r--r--tests/time_sleep.rs221
-rw-r--r--tests/time_throttle.rs28
-rw-r--r--tests/udp.rs10
-rw-r--r--tests/uds_datagram.rs96
-rw-r--r--tests/uds_split.rs2
-rw-r--r--tests/uds_stream.rs200
46 files changed, 810 insertions, 1156 deletions
diff --git a/tests/async_send_sync.rs b/tests/async_send_sync.rs
index 2ee3857..671fa4a 100644
--- a/tests/async_send_sync.rs
+++ b/tests/async_send_sync.rs
@@ -14,8 +14,7 @@ type BoxFutureSync<T> = std::pin::Pin<Box<dyn std::future::Future<Output = T> +
type BoxFutureSend<T> = std::pin::Pin<Box<dyn std::future::Future<Output = T> + Send>>;
#[allow(dead_code)]
type BoxFuture<T> = std::pin::Pin<Box<dyn std::future::Future<Output = T>>>;
-#[allow(dead_code)]
-type BoxStream<T> = std::pin::Pin<Box<dyn tokio::stream::Stream<Item = T>>>;
+
#[allow(dead_code)]
type BoxAsyncRead = std::pin::Pin<Box<dyn tokio::io::AsyncBufRead>>;
#[allow(dead_code)]
@@ -94,6 +93,14 @@ macro_rules! assert_value {
AmbiguousIfSync::some_item(&f);
};
};
+ ($type:ty: Unpin) => {
+ #[allow(unreachable_code)]
+ #[allow(unused_variables)]
+ const _: fn() = || {
+ let f: $type = todo!();
+ require_unpin(&f);
+ };
+ };
}
macro_rules! async_assert_fn {
($($f:ident $(< $($generic:ty),* > )? )::+($($arg:ty),*): Send & Sync) => {
@@ -222,10 +229,6 @@ async_assert_fn!(tokio::signal::ctrl_c(): Send & Sync);
#[cfg(unix)]
async_assert_fn!(tokio::signal::unix::Signal::recv(_): Send & Sync);
-async_assert_fn!(tokio::stream::empty<Rc<u8>>(): Send & Sync);
-async_assert_fn!(tokio::stream::pending<Rc<u8>>(): Send & Sync);
-async_assert_fn!(tokio::stream::iter(std::vec::IntoIter<u8>): Send & Sync);
-
async_assert_fn!(tokio::sync::Barrier::wait(_): Send & Sync);
async_assert_fn!(tokio::sync::Mutex<u8>::lock(_): Send & Sync);
async_assert_fn!(tokio::sync::Mutex<Cell<u8>>::lock(_): Send & Sync);
@@ -285,13 +288,12 @@ async_assert_fn!(tokio::time::timeout_at(Instant, BoxFutureSend<()>): Send & !Sy
async_assert_fn!(tokio::time::timeout_at(Instant, BoxFuture<()>): !Send & !Sync);
async_assert_fn!(tokio::time::Interval::tick(_): Send & Sync);
-async_assert_fn!(tokio::stream::StreamExt::next(&mut BoxStream<()>): !Unpin);
-async_assert_fn!(tokio::stream::StreamExt::try_next(&mut BoxStream<Result<(), ()>>): !Unpin);
-async_assert_fn!(tokio::stream::StreamExt::all(&mut BoxStream<()>, fn(())->bool): !Unpin);
-async_assert_fn!(tokio::stream::StreamExt::any(&mut BoxStream<()>, fn(())->bool): !Unpin);
-async_assert_fn!(tokio::stream::StreamExt::fold(&mut BoxStream<()>, (), fn((), ())->()): !Unpin);
-async_assert_fn!(tokio::stream::StreamExt::collect<Vec<()>>(&mut BoxStream<()>): !Unpin);
-
+assert_value!(tokio::time::Interval: Unpin);
+async_assert_fn!(tokio::time::sleep(Duration): !Unpin);
+async_assert_fn!(tokio::time::sleep_until(Instant): !Unpin);
+async_assert_fn!(tokio::time::timeout(Duration, BoxFuture<()>): !Unpin);
+async_assert_fn!(tokio::time::timeout_at(Instant, BoxFuture<()>): !Unpin);
+async_assert_fn!(tokio::time::Interval::tick(_): !Unpin);
async_assert_fn!(tokio::io::AsyncBufReadExt::read_until(&mut BoxAsyncRead, u8, &mut Vec<u8>): !Unpin);
async_assert_fn!(tokio::io::AsyncBufReadExt::read_line(&mut BoxAsyncRead, &mut String): !Unpin);
async_assert_fn!(tokio::io::AsyncReadExt::read(&mut BoxAsyncRead, &mut [u8]): !Unpin);
diff --git a/tests/buffered.rs b/tests/buffered.rs
index 97ba00c..98b6d5f 100644
--- a/tests/buffered.rs
+++ b/tests/buffered.rs
@@ -2,7 +2,6 @@
#![cfg(feature = "full")]
use tokio::net::TcpListener;
-use tokio::prelude::*;
use tokio_test::assert_ok;
use std::io::prelude::*;
@@ -41,7 +40,7 @@ async fn echo_server() {
let (mut a, _) = assert_ok!(srv.accept().await);
let (mut b, _) = assert_ok!(srv.accept().await);
- let n = assert_ok!(io::copy(&mut a, &mut b).await);
+ let n = assert_ok!(tokio::io::copy(&mut a, &mut b).await);
let (expected, t2) = t.join().unwrap();
let actual = t2.join().unwrap();
diff --git a/tests/fs_dir.rs b/tests/fs_dir.rs
index 6355ef0..21efe8c 100644
--- a/tests/fs_dir.rs
+++ b/tests/fs_dir.rs
@@ -85,35 +85,3 @@ async fn read_inherent() {
vec!["aa".to_string(), "bb".to_string(), "cc".to_string()]
);
}
-
-#[tokio::test]
-async fn read_stream() {
- use tokio::stream::StreamExt;
-
- let base_dir = tempdir().unwrap();
-
- let p = base_dir.path();
- std::fs::create_dir(p.join("aa")).unwrap();
- std::fs::create_dir(p.join("bb")).unwrap();
- std::fs::create_dir(p.join("cc")).unwrap();
-
- let files = Arc::new(Mutex::new(Vec::new()));
-
- let f = files.clone();
- let p = p.to_path_buf();
-
- let mut entries = fs::read_dir(p).await.unwrap();
-
- while let Some(res) = entries.next().await {
- let e = assert_ok!(res);
- let s = e.file_name().to_str().unwrap().to_string();
- f.lock().unwrap().push(s);
- }
-
- let mut files = files.lock().unwrap();
- files.sort(); // because the order is not guaranteed
- assert_eq!(
- *files,
- vec!["aa".to_string(), "bb".to_string(), "cc".to_string()]
- );
-}
diff --git a/tests/fs_file.rs b/tests/fs_file.rs
index d5b56e6..bf2f1d7 100644
--- a/tests/fs_file.rs
+++ b/tests/fs_file.rs
@@ -2,7 +2,7 @@
#![cfg(feature = "full")]
use tokio::fs::File;
-use tokio::prelude::*;
+use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
use tokio_test::task;
use std::io::prelude::*;
diff --git a/tests/fs_file_mocked.rs b/tests/fs_file_mocked.rs
index edb74a7..7771532 100644
--- a/tests/fs_file_mocked.rs
+++ b/tests/fs_file_mocked.rs
@@ -62,7 +62,7 @@ pub(crate) mod sync {
}
use fs::sys;
-use tokio::prelude::*;
+use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
use tokio_test::{assert_pending, assert_ready, assert_ready_err, assert_ready_ok, task};
use std::io::SeekFrom;
diff --git a/tests/fs_link.rs b/tests/fs_link.rs
index cbbe27e..2ef666f 100644
--- a/tests/fs_link.rs
+++ b/tests/fs_link.rs
@@ -48,9 +48,7 @@ async fn test_symlink() {
let src_2 = src.clone();
let dst_2 = dst.clone();
- assert!(fs::os::unix::symlink(src_2.clone(), dst_2.clone())
- .await
- .is_ok());
+ assert!(fs::symlink(src_2.clone(), dst_2.clone()).await.is_ok());
let mut content = String::new();
diff --git a/tests/io_async_fd.rs b/tests/io_async_fd.rs
index f8dc65f..d1586bb 100644
--- a/tests/io_async_fd.rs
+++ b/tests/io_async_fd.rs
@@ -201,7 +201,10 @@ async fn reset_readable() {
let mut guard = readable.await.unwrap();
- guard.with_io(|| afd_a.get_ref().read(&mut [0])).unwrap();
+ guard
+ .try_io(|_| afd_a.get_ref().read(&mut [0]))
+ .unwrap()
+ .unwrap();
// `a` is not readable, but the reactor still thinks it is
// (because we have not observed a not-ready error yet)
@@ -233,12 +236,10 @@ async fn reset_writable() {
let mut guard = afd_a.writable().await.unwrap();
// Write until we get a WouldBlock. This also clears the ready state.
- loop {
- if let Err(e) = guard.with_io(|| afd_a.get_ref().write(&[0; 512][..])) {
- assert_eq!(ErrorKind::WouldBlock, e.kind());
- break;
- }
- }
+ while guard
+ .try_io(|_| afd_a.get_ref().write(&[0; 512][..]))
+ .is_ok()
+ {}
// Writable state should be cleared now.
let writable = afd_a.writable();
@@ -313,9 +314,7 @@ async fn reregister() {
}
#[tokio::test]
-async fn with_poll() {
- use std::task::Poll;
-
+async fn try_io() {
let (a, mut b) = socketpair();
b.write_all(b"0").unwrap();
@@ -327,13 +326,13 @@ async fn with_poll() {
afd_a.get_ref().read_exact(&mut [0]).unwrap();
// Should not clear the readable state
- let _ = guard.with_poll(|| Poll::Ready(()));
+ let _ = guard.try_io(|_| Ok(()));
// Still readable...
let _ = afd_a.readable().await.unwrap();
// Should clear the readable state
- let _ = guard.with_poll(|| Poll::Pending::<()>);
+ let _ = guard.try_io(|_| io::Result::<()>::Err(ErrorKind::WouldBlock.into()));
// Assert not readable
let readable = afd_a.readable();
diff --git a/tests/io_lines.rs b/tests/io_lines.rs
index 2f6b339..9996d81 100644
--- a/tests/io_lines.rs
+++ b/tests/io_lines.rs
@@ -17,19 +17,3 @@ async fn lines_inherent() {
assert_eq!(b, "");
assert!(assert_ok!(st.next_line().await).is_none());
}
-
-#[tokio::test]
-async fn lines_stream() {
- use tokio::stream::StreamExt;
-
- let rd: &[u8] = b"hello\r\nworld\n\n";
- let mut st = rd.lines();
-
- let b = assert_ok!(st.next().await.unwrap());
- assert_eq!(b, "hello");
- let b = assert_ok!(st.next().await.unwrap());
- assert_eq!(b, "world");
- let b = assert_ok!(st.next().await.unwrap());
- assert_eq!(b, "");
- assert!(st.next().await.is_none());
-}
diff --git a/tests/macros_select.rs b/tests/macros_select.rs
index cc214bb..3359849 100644
--- a/tests/macros_select.rs
+++ b/tests/macros_select.rs
@@ -359,12 +359,14 @@ async fn join_with_select() {
async fn use_future_in_if_condition() {
use tokio::time::{self, Duration};
- let mut sleep = time::sleep(Duration::from_millis(50));
+ let sleep = time::sleep(Duration::from_millis(50));
+ tokio::pin!(sleep);
tokio::select! {
- _ = &mut sleep, if !sleep.is_elapsed() => {
+ _ = time::sleep(Duration::from_millis(50)), if false => {
+ panic!("if condition ignored")
}
- _ = async { 1 } => {
+ _ = async { 1u32 } => {
}
}
}
diff --git a/tests/process_issue_2174.rs b/tests/process_issue_2174.rs
index 6ee7d1a..5ee9dc0 100644
--- a/tests/process_issue_2174.rs
+++ b/tests/process_issue_2174.rs
@@ -11,7 +11,7 @@
use std::process::Stdio;
use std::time::Duration;
-use tokio::prelude::*;
+use tokio::io::AsyncWriteExt;
use tokio::process::Command;
use tokio::time;
use tokio_test::assert_err;
diff --git a/tests/process_kill_on_drop.rs b/tests/process_kill_on_drop.rs
index f67bb23..00f5c6d 100644
--- a/tests/process_kill_on_drop.rs
+++ b/tests/process_kill_on_drop.rs
@@ -10,7 +10,7 @@ use tokio_test::assert_ok;
#[tokio::test]
async fn kill_on_drop() {
- let mut cmd = Command::new("sh");
+ let mut cmd = Command::new("bash");
cmd.args(&[
"-c",
"
diff --git a/tests/rt_basic.rs b/tests/rt_basic.rs
index 7b5b622..977a838 100644
--- a/tests/rt_basic.rs
+++ b/tests/rt_basic.rs
@@ -2,12 +2,16 @@
#![cfg(feature = "full")]
use tokio::runtime::Runtime;
-use tokio::sync::{mpsc, oneshot};
+use tokio::sync::oneshot;
use tokio_test::{assert_err, assert_ok};
use std::thread;
use std::time::Duration;
+mod support {
+ pub(crate) mod mpsc_stream;
+}
+
#[test]
fn spawned_task_does_not_progress_without_block_on() {
let (tx, mut rx) = oneshot::channel();
@@ -36,7 +40,7 @@ fn no_extra_poll() {
Arc,
};
use std::task::{Context, Poll};
- use tokio::stream::{Stream, StreamExt};
+ use tokio_stream::{Stream, StreamExt};
pin_project! {
struct TrackPolls<S> {
@@ -58,8 +62,8 @@ fn no_extra_poll() {
}
}
- let (tx, rx) = mpsc::unbounded_channel();
- let mut rx = TrackPolls {
+ let (tx, rx) = support::mpsc_stream::unbounded_channel_stream::<()>();
+ let rx = TrackPolls {
npolls: Arc::new(AtomicUsize::new(0)),
s: rx,
};
@@ -67,6 +71,9 @@ fn no_extra_poll() {
let rt = rt();
+ // TODO: could probably avoid this, but why not.
+ let mut rx = Box::pin(rx);
+
rt.spawn(async move { while rx.next().await.is_some() {} });
rt.block_on(async {
tokio::task::yield_now().await;
diff --git a/tests/rt_common.rs b/tests/rt_common.rs
index 74a94d5..66e6f2c 100644
--- a/tests/rt_common.rs
+++ b/tests/rt_common.rs
@@ -56,7 +56,7 @@ fn send_sync_bound() {
rt_test! {
use tokio::net::{TcpListener, TcpStream, UdpSocket};
- use tokio::prelude::*;
+ use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::runtime::Runtime;
use tokio::sync::oneshot;
use tokio::{task, time};
@@ -858,6 +858,21 @@ rt_test! {
}
#[test]
+ fn shutdown_timeout_0() {
+ let runtime = rt();
+
+ runtime.block_on(async move {
+ task::spawn_blocking(move || {
+ thread::sleep(Duration::from_secs(10_000));
+ });
+ });
+
+ let now = Instant::now();
+ Arc::try_unwrap(runtime).unwrap().shutdown_timeout(Duration::from_nanos(0));
+ assert!(now.elapsed().as_secs() < 1);
+ }
+
+ #[test]
fn shutdown_wakeup_time() {
let runtime = rt();
diff --git a/tests/rt_threaded.rs b/tests/rt_threaded.rs
index 90ebf6a..19b381c 100644
--- a/tests/rt_threaded.rs
+++ b/tests/rt_threaded.rs
@@ -331,7 +331,7 @@ fn coop_and_block_in_place() {
// runtime worker yielded as part of `block_in_place` and guarantees the
// same thread will reclaim the worker at the end of the
// `block_in_place` call.
- .max_threads(1)
+ .max_blocking_threads(1)
.build()
.unwrap();
@@ -375,13 +375,36 @@ fn coop_and_block_in_place() {
// Testing this does not panic
#[test]
-fn max_threads() {
+fn max_blocking_threads() {
let _rt = tokio::runtime::Builder::new_multi_thread()
- .max_threads(1)
+ .max_blocking_threads(1)
.build()
.unwrap();
}
+#[test]
+#[should_panic]
+fn max_blocking_threads_set_to_zero() {
+ let _rt = tokio::runtime::Builder::new_multi_thread()
+ .max_blocking_threads(0)
+ .build()
+ .unwrap();
+}
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn hang_on_shutdown() {
+ let (sync_tx, sync_rx) = std::sync::mpsc::channel::<()>();
+ tokio::spawn(async move {
+ tokio::task::block_in_place(|| sync_rx.recv().ok());
+ });
+
+ tokio::spawn(async {
+ tokio::time::sleep(std::time::Duration::from_secs(2)).await;
+ drop(sync_tx);
+ });
+ tokio::time::sleep(std::time::Duration::from_secs(1)).await;
+}
+
fn rt() -> Runtime {
Runtime::new().unwrap()
}
diff --git a/tests/stream_chain.rs b/tests/stream_chain.rs
deleted file mode 100644
index 98461a8..0000000
--- a/tests/stream_chain.rs
+++ /dev/null
@@ -1,95 +0,0 @@
-use tokio::stream::{self, Stream, StreamExt};
-use tokio::sync::mpsc;
-use tokio_test::{assert_pending, assert_ready, task};
-
-#[tokio::test]
-async fn basic_usage() {
- let one = stream::iter(vec![1, 2, 3]);
- let two = stream::iter(vec![4, 5, 6]);
-
- let mut stream = one.chain(two);
-
- assert_eq!(stream.size_hint(), (6, Some(6)));
- assert_eq!(stream.next().await, Some(1));
-
- assert_eq!(stream.size_hint(), (5, Some(5)));
- assert_eq!(stream.next().await, Some(2));
-
- assert_eq!(stream.size_hint(), (4, Some(4)));
- assert_eq!(stream.next().await, Some(3));
-
- assert_eq!(stream.size_hint(), (3, Some(3)));
- assert_eq!(stream.next().await, Some(4));
-
- assert_eq!(stream.size_hint(), (2, Some(2)));
- assert_eq!(stream.next().await, Some(5));
-
- assert_eq!(stream.size_hint(), (1, Some(1)));
- assert_eq!(stream.next().await, Some(6));
-
- assert_eq!(stream.size_hint(), (0, Some(0)));
- assert_eq!(stream.next().await, None);
-
- assert_eq!(stream.size_hint(), (0, Some(0)));
- assert_eq!(stream.next().await, None);
-}
-
-#[tokio::test]
-async fn pending_first() {
- let (tx1, rx1) = mpsc::unbounded_channel();
- let (tx2, rx2) = mpsc::unbounded_channel();
-
- let mut stream = task::spawn(rx1.chain(rx2));
- assert_eq!(stream.size_hint(), (0, None));
-
- assert_pending!(stream.poll_next());
-
- tx2.send(2).unwrap();
- assert!(!stream.is_woken());
-
- assert_pending!(stream.poll_next());
-
- tx1.send(1).unwrap();
- assert!(stream.is_woken());
- assert_eq!(Some(1), assert_ready!(stream.poll_next()));
-
- assert_pending!(stream.poll_next());
-
- drop(tx1);
-
- assert_eq!(stream.size_hint(), (0, None));
-
- assert!(stream.is_woken());
- assert_eq!(Some(2), assert_ready!(stream.poll_next()));
-
- assert_eq!(stream.size_hint(), (0, None));
-
- drop(tx2);
-
- assert_eq!(stream.size_hint(), (0, None));
- assert_eq!(None, assert_ready!(stream.poll_next()));
-}
-
-#[test]
-fn size_overflow() {
- struct Monster;
-
- impl tokio::stream::Stream for Monster {
- type Item = ();
- fn poll_next(
- self: std::pin::Pin<&mut Self>,
- _cx: &mut std::task::Context<'_>,
- ) -> std::task::Poll<Option<()>> {
- panic!()
- }
-
- fn size_hint(&self) -> (usize, Option<usize>) {
- (usize::max_value(), Some(usize::max_value()))
- }
- }
-
- let m1 = Monster;
- let m2 = Monster;
- let m = m1.chain(m2);
- assert_eq!(m.size_hint(), (usize::max_value(), None));
-}
diff --git a/tests/stream_collect.rs b/tests/stream_collect.rs
deleted file mode 100644
index 7ab1a34..0000000
--- a/tests/stream_collect.rs
+++ /dev/null
@@ -1,137 +0,0 @@
-use tokio::stream::{self, StreamExt};
-use tokio::sync::mpsc;
-use tokio_test::{assert_pending, assert_ready, assert_ready_err, assert_ready_ok, task};
-
-#[allow(clippy::let_unit_value)]
-#[tokio::test]
-async fn empty_unit() {
- // Drains the stream.
- let mut iter = vec![(), (), ()].into_iter();
- let _: () = stream::iter(&mut iter).collect().await;
- assert!(iter.next().is_none());
-}
-
-#[tokio::test]
-async fn empty_vec() {
- let coll: Vec<u32> = stream::empty().collect().await;
- assert!(coll.is_empty());
-}
-
-#[tokio::test]
-async fn empty_box_slice() {
- let coll: Box<[u32]> = stream::empty().collect().await;
- assert!(coll.is_empty());
-}
-
-#[tokio::test]
-async fn empty_string() {
- let coll: String = stream::empty::<&str>().collect().await;
- assert!(coll.is_empty());
-}
-
-#[tokio::test]
-async fn empty_result() {
- let coll: Result<Vec<u32>, &str> = stream::empty().collect().await;
- assert_eq!(Ok(vec![]), coll);
-}
-
-#[tokio::test]
-async fn collect_vec_items() {
- let (tx, rx) = mpsc::unbounded_channel();
- let mut fut = task::spawn(rx.collect::<Vec<i32>>());
-
- assert_pending!(fut.poll());
-
- tx.send(1).unwrap();
- assert!(fut.is_woken());
- assert_pending!(fut.poll());
-
- tx.send(2).unwrap();
- assert!(fut.is_woken());
- assert_pending!(fut.poll());
-
- drop(tx);
- assert!(fut.is_woken());
- let coll = assert_ready!(fut.poll());
- assert_eq!(vec![1, 2], coll);
-}
-
-#[tokio::test]
-async fn collect_string_items() {
- let (tx, rx) = mpsc::unbounded_channel();
- let mut fut = task::spawn(rx.collect::<String>());
-
- assert_pending!(fut.poll());
-
- tx.send("hello ".to_string()).unwrap();
- assert!(fut.is_woken());
- assert_pending!(fut.poll());
-
- tx.send("world".to_string()).unwrap();
- assert!(fut.is_woken());
- assert_pending!(fut.poll());
-
- drop(tx);
- assert!(fut.is_woken());
- let coll = assert_ready!(fut.poll());
- assert_eq!("hello world", coll);
-}
-
-#[tokio::test]
-async fn collect_str_items() {
- let (tx, rx) = mpsc::unbounded_channel();
- let mut fut = task::spawn(rx.collect::<String>());
-
- assert_pending!(fut.poll());
-
- tx.send("hello ").unwrap();
- assert!(fut.is_woken());
- assert_pending!(fut.poll());
-
- tx.send("world").unwrap();
- assert!(fut.is_woken());
- assert_pending!(fut.poll());
-
- drop(tx);
- assert!(fut.is_woken());
- let coll = assert_ready!(fut.poll());
- assert_eq!("hello world", coll);
-}
-
-#[tokio::test]
-async fn collect_results_ok() {
- let (tx, rx) = mpsc::unbounded_channel();
- let mut fut = task::spawn(rx.collect::<Result<String, &str>>());
-
- assert_pending!(fut.poll());
-
- tx.send(Ok("hello ")).unwrap();
- assert!(fut.is_woken());
- assert_pending!(fut.poll());
-
- tx.send(Ok("world")).unwrap();
- assert!(fut.is_woken());
- assert_pending!(fut.poll());
-
- drop(tx);
- assert!(fut.is_woken());
- let coll = assert_ready_ok!(fut.poll());
- assert_eq!("hello world", coll);
-}
-
-#[tokio::test]
-async fn collect_results_err() {
- let (tx, rx) = mpsc::unbounded_channel();
- let mut fut = task::spawn(rx.collect::<Result<String, &str>>());
-
- assert_pending!(fut.poll());
-
- tx.send(Ok("hello ")).unwrap();
- assert!(fut.is_woken());
- assert_pending!(fut.poll());
-
- tx.send(Err("oh no")).unwrap();
- assert!(fut.is_woken());
- let err = assert_ready_err!(fut.poll());
- assert_eq!("oh no", err);
-}
diff --git a/tests/stream_empty.rs b/tests/stream_empty.rs
deleted file mode 100644
index f278076..0000000
--- a/tests/stream_empty.rs
+++ /dev/null
@@ -1,11 +0,0 @@
-use tokio::stream::{self, Stream, StreamExt};
-
-#[tokio::test]
-async fn basic_usage() {
- let mut stream = stream::empty::<i32>();
-
- for _ in 0..2 {
- assert_eq!(stream.size_hint(), (0, Some(0)));
- assert_eq!(None, stream.next().await);
- }
-}
diff --git a/tests/stream_fuse.rs b/tests/stream_fuse.rs
deleted file mode 100644
index 9d7d969..0000000
--- a/tests/stream_fuse.rs
+++ /dev/null
@@ -1,50 +0,0 @@
-use tokio::stream::{Stream, StreamExt};
-
-use std::pin::Pin;
-use std::task::{Context, Poll};
-
-// a stream which alternates between Some and None
-struct Alternate {
- state: i32,
-}
-
-impl Stream for Alternate {
- type Item = i32;
-
- fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<i32>> {
- let val = self.state;
- self.state += 1;
-
- // if it's even, Some(i32), else None
- if val % 2 == 0 {
- Poll::Ready(Some(val))
- } else {
- Poll::Ready(None)
- }
- }
-}
-
-#[tokio::test]
-async fn basic_usage() {
- let mut stream = Alternate { state: 0 };
-
- // the stream goes back and forth
- assert_eq!(stream.next().await, Some(0));
- assert_eq!(stream.next().await, None);
- assert_eq!(stream.next().await, Some(2));
- assert_eq!(stream.next().await, None);
-
- // however, once it is fused
- let mut stream = stream.fuse();
-
- assert_eq!(stream.size_hint(), (0, None));
- assert_eq!(stream.next().await, Some(4));
-
- assert_eq!(stream.size_hint(), (0, None));
- assert_eq!(stream.next().await, None);
-
- // it will always return `None` after the first time.
- assert_eq!(stream.size_hint(), (0, Some(0)));
- assert_eq!(stream.next().await, None);
- assert_eq!(stream.size_hint(), (0, Some(0)));
-}
diff --git a/tests/stream_iter.rs b/tests/stream_iter.rs
deleted file mode 100644
index 45148a7..0000000
--- a/tests/stream_iter.rs
+++ /dev/null
@@ -1,18 +0,0 @@
-use tokio::stream;
-use tokio_test::task;
-
-use std::iter;
-
-#[tokio::test]
-async fn coop() {
- let mut stream = task::spawn(stream::iter(iter::repeat(1)));
-
- for _ in 0..10_000 {
- if stream.poll_next().is_pending() {
- assert!(stream.is_woken());
- return;
- }
- }
-
- panic!("did not yield");
-}
diff --git a/tests/stream_merge.rs b/tests/stream_merge.rs
deleted file mode 100644
index 45ecdcb..0000000
--- a/tests/stream_merge.rs
+++ /dev/null
@@ -1,78 +0,0 @@
-use tokio::stream::{self, Stream, StreamExt};
-use tokio::sync::mpsc;
-use tokio_test::task;
-use tokio_test::{assert_pending, assert_ready};
-
-#[tokio::test]
-async fn merge_sync_streams() {
- let mut s = stream::iter(vec![0, 2, 4, 6]).merge(stream::iter(vec![1, 3, 5]));
-
- for i in 0..7 {
- let rem = 7 - i;
- assert_eq!(s.size_hint(), (rem, Some(rem)));
- assert_eq!(Some(i), s.next().await);
- }
-
- assert!(s.next().await.is_none());
-}
-
-#[tokio::test]
-async fn merge_async_streams() {
- let (tx1, rx1) = mpsc::unbounded_channel();
- let (tx2, rx2) = mpsc::unbounded_channel();
-
- let mut rx = task::spawn(rx1.merge(rx2));
-
- assert_eq!(rx.size_hint(), (0, None));
-
- assert_pending!(rx.poll_next());
-
- tx1.send(1).unwrap();
-
- assert!(rx.is_woken());
- assert_eq!(Some(1), assert_ready!(rx.poll_next()));
-
- assert_pending!(rx.poll_next());
- tx2.send(2).unwrap();
-
- assert!(rx.is_woken());
- assert_eq!(Some(2), assert_ready!(rx.poll_next()));
- assert_pending!(rx.poll_next());
-
- drop(tx1);
- assert!(rx.is_woken());
- assert_pending!(rx.poll_next());
-
- tx2.send(3).unwrap();
- assert!(rx.is_woken());
- assert_eq!(Some(3), assert_ready!(rx.poll_next()));
- assert_pending!(rx.poll_next());
-
- drop(tx2);
- assert!(rx.is_woken());
- assert_eq!(None, assert_ready!(rx.poll_next()));
-}
-
-#[test]
-fn size_overflow() {
- struct Monster;
-
- impl tokio::stream::Stream for Monster {
- type Item = ();
- fn poll_next(
- self: std::pin::Pin<&mut Self>,
- _cx: &mut std::task::Context<'_>,
- ) -> std::task::Poll<Option<()>> {
- panic!()
- }
-
- fn size_hint(&self) -> (usize, Option<usize>) {
- (usize::max_value(), Some(usize::max_value()))
- }
- }
-
- let m1 = Monster;
- let m2 = Monster;
- let m = m1.merge(m2);
- assert_eq!(m.size_hint(), (usize::max_value(), None));
-}
diff --git a/tests/stream_once.rs b/tests/stream_once.rs
deleted file mode 100644
index bb4635a..0000000
--- a/tests/stream_once.rs
+++ /dev/null
@@ -1,12 +0,0 @@
-use tokio::stream::{self, Stream, StreamExt};
-
-#[tokio::test]
-async fn basic_usage() {
- let mut one = stream::once(1);
-
- assert_eq!(one.size_hint(), (1, Some(1)));
- assert_eq!(Some(1), one.next().await);
-
- assert_eq!(one.size_hint(), (0, Some(0)));
- assert_eq!(None, one.next().await);
-}
diff --git a/tests/stream_pending.rs b/tests/stream_pending.rs
deleted file mode 100644
index f4d3080..0000000
--- a/tests/stream_pending.rs
+++ /dev/null
@@ -1,14 +0,0 @@
-use tokio::stream::{self, Stream, StreamExt};
-use tokio_test::{assert_pending, task};
-
-#[tokio::test]
-async fn basic_usage() {
- let mut stream = stream::pending::<i32>();
-
- for _ in 0..2 {
- assert_eq!(stream.size_hint(), (0, None));
-
- let mut next = task::spawn(async { stream.next().await });
- assert_pending!(next.poll());
- }
-}
diff --git a/tests/stream_stream_map.rs b/tests/stream_stream_map.rs
deleted file mode 100644
index 38bb0c5..0000000
--- a/tests/stream_stream_map.rs
+++ /dev/null
@@ -1,372 +0,0 @@
-use tokio::stream::{self, pending, Stream, StreamExt, StreamMap};
-use tokio::sync::mpsc;
-use tokio_test::{assert_ok, assert_pending, assert_ready, task};
-
-use std::pin::Pin;
-
-macro_rules! assert_ready_some {
- ($($t:tt)*) => {
- match assert_ready!($($t)*) {
- Some(v) => v,
- None => panic!("expected `Some`, got `None`"),
- }
- };
-}
-
-macro_rules! assert_ready_none {
- ($($t:tt)*) => {
- match assert_ready!($($t)*) {
- None => {}
- Some(v) => panic!("expected `None`, got `Some({:?})`", v),
- }
- };
-}
-
-#[tokio::test]
-async fn empty() {
- let mut map = StreamMap::<&str, stream::Pending<()>>::new();
-
- assert_eq!(map.len(), 0);
- assert!(map.is_empty());
-
- assert!(map.next().await.is_none());
- assert!(map.next().await.is_none());
-
- assert!(map.remove("foo").is_none());
-}
-
-#[tokio::test]
-async fn single_entry() {
- let mut map = task::spawn(StreamMap::new());
- let (tx, rx) = mpsc::unbounded_channel();
-
- assert_ready_none!(map.poll_next());
-
- assert!(map.insert("foo", rx).is_none());
- assert!(map.contains_key("foo"));
- assert!(!map.contains_key("bar"));
-
- assert_eq!(map.len(), 1);
- assert!(!map.is_empty());
-
- assert_pending!(map.poll_next());
-
- assert_ok!(tx.send(1));
-
- assert!(map.is_woken());
- let (k, v) = assert_ready_some!(map.poll_next());
- assert_eq!(k, "foo");
- assert_eq!(v, 1);
-
- assert_pending!(map.poll_next());
-
- assert_ok!(tx.send(2));
-
- assert!(map.is_woken());
- let (k, v) = assert_ready_some!(map.poll_next());
- assert_eq!(k, "foo");
- assert_eq!(v, 2);
-
- assert_pending!(map.poll_next());
- drop(tx);
- assert!(map.is_woken());
- assert_ready_none!(map.poll_next());
-}
-
-#[tokio::test]
-async fn multiple_entries() {
- let mut map = task::spawn(StreamMap::new());
- let (tx1, rx1) = mpsc::unbounded_channel();
- let (tx2, rx2) = mpsc::unbounded_channel();
-
- map.insert("foo", rx1);
- map.insert("bar", rx2);
-
- assert_pending!(map.poll_next());
-
- assert_ok!(tx1.send(1));
-
- assert!(map.is_woken());
- let (k, v) = assert_ready_some!(map.poll_next());
- assert_eq!(k, "foo");
- assert_eq!(v, 1);
-
- assert_pending!(map.poll_next());
-
- assert_ok!(tx2.send(2));
-
- assert!(map.is_woken());
- let (k, v) = assert_ready_some!(map.poll_next());
- assert_eq!(k, "bar");
- assert_eq!(v, 2);
-
- assert_pending!(map.poll_next());
-
- assert_ok!(tx1.send(3));
- assert_ok!(tx2.send(4));
-
- assert!(map.is_woken());
-
- // Given the randomization, there is no guarantee what order the values will
- // be received in.
- let mut v = (0..2)
- .map(|_| assert_ready_some!(map.poll_next()))
- .collect::<Vec<_>>();
-
- assert_pending!(map.poll_next());
-
- v.sort_unstable();
- assert_eq!(v[0].0, "bar");
- assert_eq!(v[0].1, 4);
- assert_eq!(v[1].0, "foo");
- assert_eq!(v[1].1, 3);
-
- drop(tx1);
- assert!(map.is_woken());
- assert_pending!(map.poll_next());
- drop(tx2);
-
- assert_ready_none!(map.poll_next());
-}
-
-#[tokio::test]
-async fn insert_remove() {
- let mut map = task::spawn(StreamMap::new());
- let (tx, rx) = mpsc::unbounded_channel();
-
- assert_ready_none!(map.poll_next());
-
- assert!(map.insert("foo", rx).is_none());
- let rx = map.remove("foo").unwrap();
-
- assert_ok!(tx.send(1));
-
- assert!(!map.is_woken());
- assert_ready_none!(map.poll_next());
-
- assert!(map.insert("bar", rx).is_none());
-
- let v = assert_ready_some!(map.poll_next());
- assert_eq!(v.0, "bar");
- assert_eq!(v.1, 1);
-
- assert!(map.remove("bar").is_some());
- assert_ready_none!(map.poll_next());
-
- assert!(map.is_empty());
- assert_eq!(0, map.len());
-}
-
-#[tokio::test]
-async fn replace() {
- let mut map = task::spawn(StreamMap::new());
- let (tx1, rx1) = mpsc::unbounded_channel();
- let (tx2, rx2) = mpsc::unbounded_channel();
-
- assert!(map.insert("foo", rx1).is_none());
-
- assert_pending!(map.poll_next());
-
- let _rx1 = map.insert("foo", rx2).unwrap();
-
- assert_pending!(map.poll_next());
-
- tx1.send(1).unwrap();
- assert_pending!(map.poll_next());
-
- tx2.send(2).unwrap();
- assert!(map.is_woken());
- let v = assert_ready_some!(map.poll_next());
- assert_eq!(v.0, "foo");
- assert_eq!(v.1, 2);
-}
-
-#[test]
-fn size_hint_with_upper() {
- let mut map = StreamMap::new();
-
- map.insert("a", stream::iter(vec![1]));
- map.insert("b", stream::iter(vec![1, 2]));
- map.insert("c", stream::iter(vec![1, 2, 3]));
-
- assert_eq!(3, map.len());
- assert!(!map.is_empty());
-
- let size_hint = map.size_hint();
- assert_eq!(size_hint, (6, Some(6)));
-}
-
-#[test]
-fn size_hint_without_upper() {
- let mut map = StreamMap::new();
-
- map.insert("a", pin_box(stream::iter(vec![1])));
- map.insert("b", pin_box(stream::iter(vec![1, 2])));
- map.insert("c", pin_box(pending()));
-
- let size_hint = map.size_hint();
- assert_eq!(size_hint, (3, None));
-}
-
-#[test]
-fn new_capacity_zero() {
- let map = StreamMap::<&str, stream::Pending<()>>::new();
- assert_eq!(0, map.capacity());
-
- assert!(map.keys().next().is_none());
-}
-
-#[test]
-fn with_capacity() {
- let map = StreamMap::<&str, stream::Pending<()>>::with_capacity(10);
- assert!(10 <= map.capacity());
-
- assert!(map.keys().next().is_none());
-}
-
-#[test]
-fn iter_keys() {
- let mut map = StreamMap::new();
-
- map.insert("a", pending::<i32>());
- map.insert("b", pending());
- map.insert("c", pending());
-
- let mut keys = map.keys().collect::<Vec<_>>();
- keys.sort_unstable();
-
- assert_eq!(&keys[..], &[&"a", &"b", &"c"]);
-}
-
-#[test]
-fn iter_values() {
- let mut map = StreamMap::new();
-
- map.insert("a", stream::iter(vec![1]));
- map.insert("b", stream::iter(vec![1, 2]));
- map.insert("c", stream::iter(vec![1, 2, 3]));
-
- let mut size_hints = map.values().map(|s| s.size_hint().0).collect::<Vec<_>>();
-
- size_hints.sort_unstable();
-
- assert_eq!(&size_hints[..], &[1, 2, 3]);
-}
-
-#[test]
-fn iter_values_mut() {
- let mut map = StreamMap::new();
-
- map.insert("a", stream::iter(vec![1]));
- map.insert("b", stream::iter(vec![1, 2]));
- map.insert("c", stream::iter(vec![1, 2, 3]));
-
- let mut size_hints = map
- .values_mut()
- .map(|s: &mut _| s.size_hint().0)
- .collect::<Vec<_>>();
-
- size_hints.sort_unstable();
-
- assert_eq!(&size_hints[..], &[1, 2, 3]);
-}
-
-#[test]
-fn clear() {
- let mut map = task::spawn(StreamMap::new());
-
- map.insert("a", stream::iter(vec![1]));
- map.insert("b", stream::iter(vec![1, 2]));
- map.insert("c", stream::iter(vec![1, 2, 3]));
-
- assert_ready_some!(map.poll_next());
-
- map.clear();
-
- assert_ready_none!(map.poll_next());
- assert!(map.is_empty());
-}
-
-#[test]
-fn contains_key_borrow() {
- let mut map = StreamMap::new();
- map.insert("foo".to_string(), pending::<()>());
-
- assert!(map.contains_key("foo"));
-}
-
-#[test]
-fn one_ready_many_none() {
- // Run a few times because of randomness
- for _ in 0..100 {
- let mut map = task::spawn(StreamMap::new());
-
- map.insert(0, pin_box(stream::empty()));
- map.insert(1, pin_box(stream::empty()));
- map.insert(2, pin_box(stream::once("hello")));
- map.insert(3, pin_box(stream::pending()));
-
- let v = assert_ready_some!(map.poll_next());
- assert_eq!(v, (2, "hello"));
- }
-}
-
-proptest::proptest! {
- #[test]
- fn fuzz_pending_complete_mix(kinds: Vec<bool>) {
- use std::task::{Context, Poll};
-
- struct DidPoll<T> {
- did_poll: bool,
- inner: T,
- }
-
- impl<T: Stream + Unpin> Stream for DidPoll<T> {
- type Item = T::Item;
-
- fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
- -> Poll<Option<T::Item>>
- {
- self.did_poll = true;
- Pin::new(&mut self.inner).poll_next(cx)
- }
- }
-
- for _ in 0..10 {
- let mut map = task::spawn(StreamMap::new());
- let mut expect = 0;
-
- for (i, &is_empty) in kinds.iter().enumerate() {
- let inner = if is_empty {
- pin_box(stream::empty::<()>())
- } else {
- expect += 1;
- pin_box(stream::pending::<()>())
- };
-
- let stream = DidPoll {
- did_poll: false,
- inner,
- };
-
- map.insert(i, stream);
- }
-
- if expect == 0 {
- assert_ready_none!(map.poll_next());
- } else {
- assert_pending!(map.poll_next());
-
- assert_eq!(expect, map.values().count());
-
- for stream in map.values() {
- assert!(stream.did_poll);
- }
- }
- }
- }
-}
-
-fn pin_box<T: Stream<Item = U> + 'static, U>(s: T) -> Pin<Box<dyn Stream<Item = U>>> {
- Box::pin(s)
-}
diff --git a/tests/stream_timeout.rs b/tests/stream_timeout.rs
deleted file mode 100644
index a787bba..0000000
--- a/tests/stream_timeout.rs
+++ /dev/null
@@ -1,109 +0,0 @@
-#![cfg(feature = "full")]
-
-use tokio::stream::{self, StreamExt};
-use tokio::time::{self, sleep, Duration};
-use tokio_test::*;
-
-use futures::StreamExt as _;
-
-async fn maybe_sleep(idx: i32) -> i32 {
- if idx % 2 == 0 {
- sleep(ms(200)).await;
- }
- idx
-}
-
-fn ms(n: u64) -> Duration {
- Duration::from_millis(n)
-}
-
-#[tokio::test]
-async fn basic_usage() {
- time::pause();
-
- // Items 2 and 4 time out. If we run the stream until it completes,
- // we end up with the following items:
- //
- // [Ok(1), Err(Elapsed), Ok(2), Ok(3), Err(Elapsed), Ok(4)]
-
- let stream = stream::iter(1..=4).then(maybe_sleep).timeout(ms(100));
- let mut stream = task::spawn(stream);
-
- // First item completes immediately
- assert_ready_eq!(stream.poll_next(), Some(Ok(1)));
-
- // Second item is delayed 200ms, times out after 100ms
- assert_pending!(stream.poll_next());
-
- time::advance(ms(150)).await;
- let v = assert_ready!(stream.poll_next());
- assert!(v.unwrap().is_err());
-
- assert_pending!(stream.poll_next());
-
- time::advance(ms(100)).await;
- assert_ready_eq!(stream.poll_next(), Some(Ok(2)));
-
- // Third item is ready immediately
- assert_ready_eq!(stream.poll_next(), Some(Ok(3)));
-
- // Fourth item is delayed 200ms, times out after 100ms
- assert_pending!(stream.poll_next());
-
- time::advance(ms(60)).await;
- assert_pending!(stream.poll_next()); // nothing ready yet
-
- time::advance(ms(60)).await;
- let v = assert_ready!(stream.poll_next());
- assert!(v.unwrap().is_err()); // timeout!
-
- time::advance(ms(120)).await;
- assert_ready_eq!(stream.poll_next(), Some(Ok(4)));
-
- // Done.
- assert_ready_eq!(stream.poll_next(), None);
-}
-
-#[tokio::test]
-async fn return_elapsed_errors_only_once() {
- time::pause();
-
- let stream = stream::iter(1..=3).then(maybe_sleep).timeout(ms(50));
- let mut stream = task::spawn(stream);
-
- // First item completes immediately
- assert_ready_eq!(stream.poll_next(), Some(Ok(1)));
-
- // Second item is delayed 200ms, times out after 50ms. Only one `Elapsed`
- // error is returned.
- assert_pending!(stream.poll_next());
- //
- time::advance(ms(50)).await;
- let v = assert_ready!(stream.poll_next());
- assert!(v.unwrap().is_err()); // timeout!
-
- // deadline elapses again, but no error is returned
- time::advance(ms(50)).await;
- assert_pending!(stream.poll_next());
-
- time::advance(ms(100)).await;
- assert_ready_eq!(stream.poll_next(), Some(Ok(2)));
- assert_ready_eq!(stream.poll_next(), Some(Ok(3)));
-
- // Done
- assert_ready_eq!(stream.poll_next(), None);
-}
-
-#[tokio::test]
-async fn no_timeouts() {
- let stream = stream::iter(vec![1, 3, 5])
- .then(maybe_sleep)
- .timeout(ms(100));
-
- let mut stream = task::spawn(stream);
-
- assert_ready_eq!(stream.poll_next(), Some(Ok(1)));
- assert_ready_eq!(stream.poll_next(), Some(Ok(3)));
- assert_ready_eq!(stream.poll_next(), Some(Ok(5)));
- assert_ready_eq!(stream.poll_next(), None);
-}
diff --git a/tests/support/mpsc_stream.rs b/tests/support/mpsc_stream.rs
new file mode 100644
index 0000000..aa385a3
--- /dev/null
+++ b/tests/support/mpsc_stream.rs
@@ -0,0 +1,42 @@
+#![allow(dead_code)]
+
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use tokio::sync::mpsc::{self, Receiver, Sender, UnboundedReceiver, UnboundedSender};
+use tokio_stream::Stream;
+
+struct UnboundedStream<T> {
+ recv: UnboundedReceiver<T>,
+}
+impl<T> Stream for UnboundedStream<T> {
+ type Item = T;
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
+ Pin::into_inner(self).recv.poll_recv(cx)
+ }
+}
+
+pub fn unbounded_channel_stream<T: Unpin>() -> (UnboundedSender<T>, impl Stream<Item = T>) {
+ let (tx, rx) = mpsc::unbounded_channel();
+
+ let stream = UnboundedStream { recv: rx };
+
+ (tx, stream)
+}
+
+struct BoundedStream<T> {
+ recv: Receiver<T>,
+}
+impl<T> Stream for BoundedStream<T> {
+ type Item = T;
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
+ Pin::into_inner(self).recv.poll_recv(cx)
+ }
+}
+
+pub fn channel_stream<T: Unpin>(size: usize) -> (Sender<T>, impl Stream<Item = T>) {
+ let (tx, rx) = mpsc::channel(size);
+
+ let stream = BoundedStream { recv: rx };
+
+ (tx, stream)
+}
diff --git a/tests/sync_broadcast.rs b/tests/sync_broadcast.rs
index 84c77a7..5f79800 100644
--- a/tests/sync_broadcast.rs
+++ b/tests/sync_broadcast.rs
@@ -89,46 +89,6 @@ fn send_two_recv() {
assert_empty!(rx2);
}
-#[tokio::test]
-async fn send_recv_into_stream_ready() {
- use tokio::stream::StreamExt;
-
- let (tx, rx) = broadcast::channel::<i32>(8);
- tokio::pin! {
- let rx = rx.into_stream();
- }
-
- assert_ok!(tx.send(1));
- assert_ok!(tx.send(2));
-
- assert_eq!(Some(Ok(1)), rx.next().await);
- assert_eq!(Some(Ok(2)), rx.next().await);
-
- drop(tx);
-
- assert_eq!(None, rx.next().await);
-}
-
-#[tokio::test]
-async fn send_recv_into_stream_pending() {
- use tokio::stream::StreamExt;
-
- let (tx, rx) = broadcast::channel::<i32>(8);
-
- tokio::pin! {
- let rx = rx.into_stream();
- }
-
- let mut recv = task::spawn(rx.next());
- assert_pending!(recv.poll());
-
- assert_ok!(tx.send(1));
-
- assert!(recv.is_woken());
- let val = assert_ready!(recv.poll());
- assert_eq!(val, Some(Ok(1)));
-}
-
#[test]
fn send_recv_bounded() {
let (tx, mut rx) = broadcast::channel(16);
diff --git a/tests/sync_mpsc.rs b/tests/sync_mpsc.rs
index adefcb1..b378e6b 100644
--- a/tests/sync_mpsc.rs
+++ b/tests/sync_mpsc.rs
@@ -5,7 +5,7 @@
use std::thread;
use tokio::runtime::Runtime;
use tokio::sync::mpsc;
-use tokio::sync::mpsc::error::{TryRecvError, TrySendError};
+use tokio::sync::mpsc::error::TrySendError;
use tokio_test::task;
use tokio_test::{
assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok,
@@ -13,6 +13,10 @@ use tokio_test::{
use std::sync::Arc;
+mod support {
+ pub(crate) mod mpsc_stream;
+}
+
trait AssertSend: Send {}
impl AssertSend for mpsc::Sender<i32> {}
impl AssertSend for mpsc::Receiver<i32> {}
@@ -80,9 +84,10 @@ async fn reserve_disarm() {
#[tokio::test]
async fn send_recv_stream_with_buffer() {
- use tokio::stream::StreamExt;
+ use tokio_stream::StreamExt;
- let (tx, mut rx) = mpsc::channel::<i32>(16);
+ let (tx, rx) = support::mpsc_stream::channel_stream::<i32>(16);
+ let mut rx = Box::pin(rx);
tokio::spawn(async move {
assert_ok!(tx.send(1).await);
@@ -178,9 +183,11 @@ async fn async_send_recv_unbounded() {
#[tokio::test]
async fn send_recv_stream_unbounded() {
- use tokio::stream::StreamExt;
+ use tokio_stream::StreamExt;
- let (tx, mut rx) = mpsc::unbounded_channel::<i32>();
+ let (tx, rx) = support::mpsc_stream::unbounded_channel_stream::<i32>();
+
+ let mut rx = Box::pin(rx);
tokio::spawn(async move {
assert_ok!(tx.send(1));
@@ -386,44 +393,6 @@ fn unconsumed_messages_are_dropped() {
}
#[test]
-fn try_recv() {
- let (tx, mut rx) = mpsc::channel(1);
- match rx.try_recv() {
- Err(TryRecvError::Empty) => {}
- _ => panic!(),
- }
- tx.try_send(42).unwrap();
- match rx.try_recv() {
- Ok(42) => {}
- _ => panic!(),
- }
- drop(tx);
- match rx.try_recv() {
- Err(TryRecvError::Closed) => {}
- _ => panic!(),
- }
-}
-
-#[test]
-fn try_recv_unbounded() {
- let (tx, mut rx) = mpsc::unbounded_channel();
- match rx.try_recv() {
- Err(TryRecvError::Empty) => {}
- _ => panic!(),
- }
- tx.send(42).unwrap();
- match rx.try_recv() {
- Ok(42) => {}
- _ => panic!(),
- }
- drop(tx);
- match rx.try_recv() {
- Err(TryRecvError::Closed) => {}
- _ => panic!(),
- }
-}
-
-#[test]
fn blocking_recv() {
let (tx, mut rx) = mpsc::channel::<u8>(1);
@@ -483,3 +452,22 @@ async fn ready_close_cancel_bounded() {
let val = assert_ready!(recv.poll());
assert!(val.is_none());
}
+
+#[tokio::test]
+async fn permit_available_not_acquired_close() {
+ let (tx1, mut rx) = mpsc::channel::<()>(1);
+ let tx2 = tx1.clone();
+
+ let permit1 = assert_ok!(tx1.reserve().await);
+
+ let mut permit2 = task::spawn(tx2.reserve());
+ assert_pending!(permit2.poll());
+
+ rx.close();
+
+ drop(permit1);
+ assert!(permit2.is_woken());
+
+ drop(permit2);
+ assert!(rx.recv().await.is_none());
+}
diff --git a/tests/sync_mutex.rs b/tests/sync_mutex.rs
index 96194b3..0ddb203 100644
--- a/tests/sync_mutex.rs
+++ b/tests/sync_mutex.rs
@@ -91,10 +91,11 @@ async fn aborted_future_1() {
let m2 = m1.clone();
// Try to lock mutex in a future that is aborted prematurely
timeout(Duration::from_millis(1u64), async move {
- let mut iv = interval(Duration::from_millis(1000));
+ let iv = interval(Duration::from_millis(1000));
+ tokio::pin!(iv);
m2.lock().await;
- iv.tick().await;
- iv.tick().await;
+ iv.as_mut().tick().await;
+ iv.as_mut().tick().await;
})
.await
.unwrap_err();
diff --git a/tests/sync_mutex_owned.rs b/tests/sync_mutex_owned.rs
index 394a670..0f1399c 100644
--- a/tests/sync_mutex_owned.rs
+++ b/tests/sync_mutex_owned.rs
@@ -58,10 +58,11 @@ async fn aborted_future_1() {
let m2 = m1.clone();
// Try to lock mutex in a future that is aborted prematurely
timeout(Duration::from_millis(1u64), async move {
- let mut iv = interval(Duration::from_millis(1000));
+ let iv = interval(Duration::from_millis(1000));
+ tokio::pin!(iv);
m2.lock_owned().await;
- iv.tick().await;
- iv.tick().await;
+ iv.as_mut().tick().await;
+ iv.as_mut().tick().await;
})
.await
.unwrap_err();
diff --git a/tests/sync_semaphore.rs b/tests/sync_semaphore.rs
index 1cb0c74..a33b878 100644
--- a/tests/sync_semaphore.rs
+++ b/tests/sync_semaphore.rs
@@ -79,3 +79,17 @@ async fn stresstest() {
let _p5 = sem.try_acquire().unwrap();
assert!(sem.try_acquire().is_err());
}
+
+#[test]
+fn add_max_amount_permits() {
+ let s = tokio::sync::Semaphore::new(0);
+ s.add_permits(usize::MAX >> 3);
+ assert_eq!(s.available_permits(), usize::MAX >> 3);
+}
+
+#[test]
+#[should_panic]
+fn add_more_than_max_amount_permits() {
+ let s = tokio::sync::Semaphore::new(1);
+ s.add_permits(usize::MAX >> 3);
+}
diff --git a/tests/task_abort.rs b/tests/task_abort.rs
new file mode 100644
index 0000000..e84f19c
--- /dev/null
+++ b/tests/task_abort.rs
@@ -0,0 +1,26 @@
+#![warn(rust_2018_idioms)]
+#![cfg(feature = "full")]
+
+/// Checks that a suspended task can be aborted without panicking as reported in
+/// issue #3157: <https://github.com/tokio-rs/tokio/issues/3157>.
+#[test]
+fn test_abort_without_panic_3157() {
+ let rt = tokio::runtime::Builder::new_multi_thread()
+ .enable_time()
+ .worker_threads(1)
+ .build()
+ .unwrap();
+
+ rt.block_on(async move {
+ let handle = tokio::spawn(async move {
+ println!("task started");
+ tokio::time::sleep(std::time::Duration::new(100, 0)).await
+ });
+
+ // wait for task to sleep.
+ tokio::time::sleep(std::time::Duration::new(1, 0)).await;
+
+ handle.abort();
+ let _ = handle.await;
+ });
+}
diff --git a/tests/task_blocking.rs b/tests/task_blocking.rs
index eec19cc..82bef8a 100644
--- a/tests/task_blocking.rs
+++ b/tests/task_blocking.rs
@@ -7,6 +7,10 @@ use tokio_test::assert_ok;
use std::thread;
use std::time::Duration;
+mod support {
+ pub(crate) mod mpsc_stream;
+}
+
#[tokio::test]
async fn basic_blocking() {
// Run a few times
@@ -165,7 +169,8 @@ fn coop_disabled_in_block_in_place() {
.build()
.unwrap();
- let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
+ let (tx, rx) = support::mpsc_stream::unbounded_channel_stream();
+
for i in 0..200 {
tx.send(i).unwrap();
}
@@ -175,7 +180,7 @@ fn coop_disabled_in_block_in_place() {
let jh = tokio::spawn(async move {
tokio::task::block_in_place(move || {
futures::executor::block_on(async move {
- use tokio::stream::StreamExt;
+ use tokio_stream::StreamExt;
assert_eq!(rx.fold(0, |n, _| n + 1).await, 200);
})
})
@@ -195,7 +200,8 @@ fn coop_disabled_in_block_in_place_in_block_on() {
thread::spawn(move || {
let outer = tokio::runtime::Runtime::new().unwrap();
- let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
+ let (tx, rx) = support::mpsc_stream::unbounded_channel_stream();
+
for i in 0..200 {
tx.send(i).unwrap();
}
@@ -204,7 +210,7 @@ fn coop_disabled_in_block_in_place_in_block_on() {
outer.block_on(async move {
tokio::task::block_in_place(move || {
futures::executor::block_on(async move {
- use tokio::stream::StreamExt;
+ use tokio_stream::StreamExt;
assert_eq!(rx.fold(0, |n, _| n + 1).await, 200);
})
})
diff --git a/tests/tcp_accept.rs b/tests/tcp_accept.rs
index 4c0d682..5ffb946 100644
--- a/tests/tcp_accept.rs
+++ b/tests/tcp_accept.rs
@@ -46,7 +46,7 @@ use std::sync::{
Arc,
};
use std::task::{Context, Poll};
-use tokio::stream::{Stream, StreamExt};
+use tokio_stream::{Stream, StreamExt};
struct TrackPolls<'a> {
npolls: Arc<AtomicUsize>,
@@ -88,7 +88,7 @@ async fn no_extra_poll() {
assert_eq!(npolls.load(SeqCst), 1);
let _ = assert_ok!(TcpStream::connect(&addr).await);
- accepted_rx.next().await.unwrap();
+ accepted_rx.recv().await.unwrap();
// should have been polled twice more: once to yield Some(), then once to yield Pending
assert_eq!(npolls.load(SeqCst), 1 + 2);
diff --git a/tests/tcp_connect.rs b/tests/tcp_connect.rs
index 44942c4..cbe68fa 100644
--- a/tests/tcp_connect.rs
+++ b/tests/tcp_connect.rs
@@ -169,7 +169,7 @@ async fn connect_addr_host_str_port_tuple() {
#[cfg(target_os = "linux")]
mod linux {
use tokio::net::{TcpListener, TcpStream};
- use tokio::prelude::*;
+ use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio_test::assert_ok;
use mio::unix::UnixReady;
diff --git a/tests/tcp_echo.rs b/tests/tcp_echo.rs
index d9cb456..5bb7ff0 100644
--- a/tests/tcp_echo.rs
+++ b/tests/tcp_echo.rs
@@ -1,8 +1,8 @@
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]
+use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
-use tokio::prelude::*;
use tokio::sync::oneshot;
use tokio_test::assert_ok;
diff --git a/tests/tcp_into_std.rs b/tests/tcp_into_std.rs
new file mode 100644
index 0000000..a46aace
--- /dev/null
+++ b/tests/tcp_into_std.rs
@@ -0,0 +1,44 @@
+#![warn(rust_2018_idioms)]
+#![cfg(feature = "full")]
+
+use std::io::Read;
+use std::io::Result;
+use tokio::io::{AsyncReadExt, AsyncWriteExt};
+use tokio::net::TcpListener;
+use tokio::net::TcpStream;
+
+#[tokio::test]
+async fn tcp_into_std() -> Result<()> {
+ let mut data = [0u8; 12];
+ let listener = TcpListener::bind("127.0.0.1:34254").await?;
+
+ let handle = tokio::spawn(async {
+ let stream: TcpStream = TcpStream::connect("127.0.0.1:34254").await.unwrap();
+ stream
+ });
+
+ let (tokio_tcp_stream, _) = listener.accept().await?;
+ let mut std_tcp_stream = tokio_tcp_stream.into_std()?;
+ std_tcp_stream
+ .set_nonblocking(false)
+ .expect("set_nonblocking call failed");
+
+ let mut client = handle.await.expect("The task being joined has panicked");
+ client.write_all(b"Hello world!").await?;
+
+ std_tcp_stream
+ .read_exact(&mut data)
+ .expect("std TcpStream read failed!");
+ assert_eq!(b"Hello world!", &data);
+
+ // test back to tokio stream
+ std_tcp_stream
+ .set_nonblocking(true)
+ .expect("set_nonblocking call failed");
+ let mut tokio_tcp_stream = TcpStream::from_std(std_tcp_stream)?;
+ client.write_all(b"Hello tokio!").await?;
+ let _size = tokio_tcp_stream.read_exact(&mut data).await?;
+ assert_eq!(b"Hello tokio!", &data);
+
+ Ok(())
+}
diff --git a/tests/tcp_shutdown.rs b/tests/tcp_shutdown.rs
index 615855f..536a161 100644
--- a/tests/tcp_shutdown.rs
+++ b/tests/tcp_shutdown.rs
@@ -1,9 +1,8 @@
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]
-use tokio::io::{self, AsyncWriteExt};
+use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
-use tokio::prelude::*;
use tokio_test::assert_ok;
#[tokio::test]
@@ -16,7 +15,7 @@ async fn shutdown() {
assert_ok!(AsyncWriteExt::shutdown(&mut stream).await);
- let mut buf = [0; 1];
+ let mut buf = [0u8; 1];
let n = assert_ok!(stream.read(&mut buf).await);
assert_eq!(n, 0);
});
diff --git a/tests/time_interval.rs b/tests/time_interval.rs
index 5ac6ae6..a3c7f08 100644
--- a/tests/time_interval.rs
+++ b/tests/time_interval.rs
@@ -44,20 +44,6 @@ async fn usage() {
assert_pending!(poll_next(&mut i));
}
-#[tokio::test]
-async fn usage_stream() {
- use tokio::stream::StreamExt;
-
- let start = Instant::now();
- let mut interval = time::interval(ms(10));
-
- for _ in 0..3 {
- interval.next().await.unwrap();
- }
-
- assert!(start.elapsed() > ms(20));
-}
-
fn poll_next(interval: &mut task::Spawn<time::Interval>) -> Poll<Instant> {
interval.enter(|cx, mut interval| {
tokio::pin! {
diff --git a/tests/time_pause.rs b/tests/time_pause.rs
new file mode 100644
index 0000000..49a7677
--- /dev/null
+++ b/tests/time_pause.rs
@@ -0,0 +1,33 @@
+#![warn(rust_2018_idioms)]
+#![cfg(feature = "full")]
+
+use tokio_test::assert_err;
+
+#[tokio::test]
+async fn pause_time_in_main() {
+ tokio::time::pause();
+}
+
+#[tokio::test]
+async fn pause_time_in_task() {
+ let t = tokio::spawn(async {
+ tokio::time::pause();
+ });
+
+ t.await.unwrap();
+}
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
+#[should_panic]
+async fn pause_time_in_main_threads() {
+ tokio::time::pause();
+}
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
+async fn pause_time_in_spawn_threads() {
+ let t = tokio::spawn(async {
+ tokio::time::pause();
+ });
+
+ assert_err!(t.await);
+}
diff --git a/tests/time_rt.rs b/tests/time_rt.rs
index 85db78d..0775343 100644
--- a/tests/time_rt.rs
+++ b/tests/time_rt.rs
@@ -68,7 +68,7 @@ async fn starving() {
}
let when = Instant::now() + Duration::from_millis(20);
- let starve = Starve(sleep_until(when), 0);
+ let starve = Starve(Box::pin(sleep_until(when)), 0);
starve.await;
assert!(Instant::now() >= when);
diff --git a/tests/time_sleep.rs b/tests/time_sleep.rs
index 955d833..2736258 100644
--- a/tests/time_sleep.rs
+++ b/tests/time_sleep.rs
@@ -1,6 +1,11 @@
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]
+use std::future::Future;
+use std::task::Context;
+
+use futures::task::noop_waker_ref;
+
use tokio::time::{self, Duration, Instant};
use tokio_test::{assert_pending, assert_ready, task};
@@ -31,6 +36,25 @@ async fn immediate_sleep() {
}
#[tokio::test]
+async fn is_elapsed() {
+ time::pause();
+
+ let sleep = time::sleep(Duration::from_millis(50));
+
+ tokio::pin!(sleep);
+
+ assert!(!sleep.is_elapsed());
+
+ assert!(futures::poll!(sleep.as_mut()).is_pending());
+
+ assert!(!sleep.is_elapsed());
+
+ sleep.as_mut().await;
+
+ assert!(sleep.is_elapsed());
+}
+
+#[tokio::test]
async fn delayed_sleep_level_0() {
time::pause();
@@ -75,12 +99,12 @@ async fn reset_future_sleep_before_fire() {
let now = Instant::now();
- let mut sleep = task::spawn(time::sleep_until(now + ms(100)));
+ let mut sleep = task::spawn(Box::pin(time::sleep_until(now + ms(100))));
assert_pending!(sleep.poll());
let mut sleep = sleep.into_inner();
- sleep.reset(Instant::now() + ms(200));
+ sleep.as_mut().reset(Instant::now() + ms(200));
sleep.await;
assert_elapsed!(now, 200);
@@ -92,12 +116,12 @@ async fn reset_past_sleep_before_turn() {
let now = Instant::now();
- let mut sleep = task::spawn(time::sleep_until(now + ms(100)));
+ let mut sleep = task::spawn(Box::pin(time::sleep_until(now + ms(100))));
assert_pending!(sleep.poll());
let mut sleep = sleep.into_inner();
- sleep.reset(now + ms(80));
+ sleep.as_mut().reset(now + ms(80));
sleep.await;
assert_elapsed!(now, 80);
@@ -109,14 +133,14 @@ async fn reset_past_sleep_before_fire() {
let now = Instant::now();
- let mut sleep = task::spawn(time::sleep_until(now + ms(100)));
+ let mut sleep = task::spawn(Box::pin(time::sleep_until(now + ms(100))));
assert_pending!(sleep.poll());
let mut sleep = sleep.into_inner();
time::sleep(ms(10)).await;
- sleep.reset(now + ms(80));
+ sleep.as_mut().reset(now + ms(80));
sleep.await;
assert_elapsed!(now, 80);
@@ -127,12 +151,12 @@ async fn reset_future_sleep_after_fire() {
time::pause();
let now = Instant::now();
- let mut sleep = time::sleep_until(now + ms(100));
+ let mut sleep = Box::pin(time::sleep_until(now + ms(100)));
- (&mut sleep).await;
+ sleep.as_mut().await;
assert_elapsed!(now, 100);
- sleep.reset(now + ms(110));
+ sleep.as_mut().reset(now + ms(110));
sleep.await;
assert_elapsed!(now, 110);
}
@@ -143,16 +167,17 @@ async fn reset_sleep_to_past() {
let now = Instant::now();
- let mut sleep = task::spawn(time::sleep_until(now + ms(100)));
+ let mut sleep = task::spawn(Box::pin(time::sleep_until(now + ms(100))));
assert_pending!(sleep.poll());
time::sleep(ms(50)).await;
assert!(!sleep.is_woken());
- sleep.reset(now + ms(40));
+ sleep.as_mut().reset(now + ms(40));
- assert!(sleep.is_woken());
+ // TODO: is this required?
+ //assert!(sleep.is_woken());
assert_ready!(sleep.poll());
}
@@ -167,22 +192,110 @@ fn creating_sleep_outside_of_context() {
let _fut = time::sleep_until(now + ms(500));
}
-#[should_panic]
#[tokio::test]
async fn greater_than_max() {
const YR_5: u64 = 5 * 365 * 24 * 60 * 60 * 1000;
+ time::pause();
time::sleep_until(Instant::now() + ms(YR_5)).await;
}
+#[tokio::test]
+async fn short_sleeps() {
+ for i in 0..10000 {
+ if (i % 10) == 0 {
+ eprintln!("=== {}", i);
+ }
+ tokio::time::sleep(std::time::Duration::from_millis(0)).await;
+ }
+}
+
+#[tokio::test]
+async fn multi_long_sleeps() {
+ tokio::time::pause();
+
+ for _ in 0..5u32 {
+ tokio::time::sleep(Duration::from_secs(
+ // about a year
+ 365 * 24 * 3600,
+ ))
+ .await;
+ }
+
+ let deadline = tokio::time::Instant::now()
+ + Duration::from_secs(
+ // about 10 years
+ 10 * 365 * 24 * 3600,
+ );
+
+ tokio::time::sleep_until(deadline).await;
+
+ assert!(tokio::time::Instant::now() >= deadline);
+}
+
+#[tokio::test]
+async fn long_sleeps() {
+ tokio::time::pause();
+
+ let deadline = tokio::time::Instant::now()
+ + Duration::from_secs(
+ // about 10 years
+ 10 * 365 * 24 * 3600,
+ );
+
+ tokio::time::sleep_until(deadline).await;
+
+ assert!(tokio::time::Instant::now() >= deadline);
+ assert!(tokio::time::Instant::now() <= deadline + Duration::from_millis(1));
+}
+
+#[tokio::test]
+#[should_panic(expected = "Duration too far into the future")]
+async fn very_long_sleeps() {
+ tokio::time::pause();
+
+ // Some platforms (eg macos) can't represent times this far in the future
+ if let Some(deadline) = tokio::time::Instant::now().checked_add(Duration::from_secs(1u64 << 62))
+ {
+ tokio::time::sleep_until(deadline).await;
+ } else {
+ // make it pass anyway (we can't skip/ignore the test based on the
+ // result of checked_add)
+ panic!("Duration too far into the future (test ignored)")
+ }
+}
+
+#[tokio::test]
+async fn reset_after_firing() {
+ let timer = tokio::time::sleep(std::time::Duration::from_millis(1));
+ tokio::pin!(timer);
+
+ let deadline = timer.deadline();
+
+ timer.as_mut().await;
+ assert_ready!(timer
+ .as_mut()
+ .poll(&mut Context::from_waker(noop_waker_ref())));
+ timer
+ .as_mut()
+ .reset(tokio::time::Instant::now() + std::time::Duration::from_secs(600));
+
+ assert_ne!(deadline, timer.deadline());
+
+ assert_pending!(timer
+ .as_mut()
+ .poll(&mut Context::from_waker(noop_waker_ref())));
+ assert_pending!(timer
+ .as_mut()
+ .poll(&mut Context::from_waker(noop_waker_ref())));
+}
+
const NUM_LEVELS: usize = 6;
const MAX_DURATION: u64 = (1 << (6 * NUM_LEVELS)) - 1;
-#[should_panic]
#[tokio::test]
async fn exactly_max() {
- // TODO: this should not panic but `time::ms()` is acting up
- // If fixed, make sure to update documentation on `time::sleep` too.
+ time::pause();
time::sleep(ms(MAX_DURATION)).await;
}
@@ -195,3 +308,79 @@ async fn no_out_of_bounds_close_to_max() {
fn ms(n: u64) -> Duration {
Duration::from_millis(n)
}
+
+#[tokio::test]
+async fn drop_after_reschedule_at_new_scheduled_time() {
+ use futures::poll;
+
+ tokio::time::pause();
+
+ let start = tokio::time::Instant::now();
+
+ let mut a = Box::pin(tokio::time::sleep(Duration::from_millis(5)));
+ let mut b = Box::pin(tokio::time::sleep(Duration::from_millis(5)));
+ let mut c = Box::pin(tokio::time::sleep(Duration::from_millis(10)));
+
+ let _ = poll!(&mut a);
+ let _ = poll!(&mut b);
+ let _ = poll!(&mut c);
+
+ b.as_mut().reset(start + Duration::from_millis(10));
+ a.await;
+
+ drop(b);
+}
+
+#[tokio::test]
+async fn drop_from_wake() {
+ use std::future::Future;
+ use std::pin::Pin;
+ use std::sync::atomic::{AtomicBool, Ordering};
+ use std::sync::{Arc, Mutex};
+ use std::task::Context;
+
+ let panicked = Arc::new(AtomicBool::new(false));
+ let list: Arc<Mutex<Vec<Pin<Box<tokio::time::Sleep>>>>> = Arc::new(Mutex::new(Vec::new()));
+
+ let arc_wake = Arc::new(DropWaker(panicked.clone(), list.clone()));
+ let arc_wake = futures::task::waker(arc_wake);
+
+ tokio::time::pause();
+
+ let mut lock = list.lock().unwrap();
+
+ for _ in 0..100 {
+ let mut timer = Box::pin(tokio::time::sleep(Duration::from_millis(10)));
+
+ let _ = timer.as_mut().poll(&mut Context::from_waker(&arc_wake));
+
+ lock.push(timer);
+ }
+
+ drop(lock);
+
+ tokio::time::sleep(Duration::from_millis(11)).await;
+
+ assert!(
+ !panicked.load(Ordering::SeqCst),
+ "paniced when dropping timers"
+ );
+
+ #[derive(Clone)]
+ struct DropWaker(
+ Arc<AtomicBool>,
+ Arc<Mutex<Vec<Pin<Box<tokio::time::Sleep>>>>>,
+ );
+
+ impl futures::task::ArcWake for DropWaker {
+ fn wake_by_ref(arc_self: &Arc<Self>) {
+ let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
+ *arc_self.1.lock().expect("panic in lock") = Vec::new()
+ }));
+
+ if result.is_err() {
+ arc_self.0.store(true, Ordering::SeqCst);
+ }
+ }
+ }
+}
diff --git a/tests/time_throttle.rs b/tests/time_throttle.rs
deleted file mode 100644
index c886319..0000000
--- a/tests/time_throttle.rs
+++ /dev/null
@@ -1,28 +0,0 @@
-#![warn(rust_2018_idioms)]
-#![cfg(feature = "full")]
-
-use tokio::stream::StreamExt;
-use tokio::time;
-use tokio_test::*;
-
-use std::time::Duration;
-
-#[tokio::test]
-async fn usage() {
- time::pause();
-
- let mut stream = task::spawn(futures::stream::repeat(()).throttle(Duration::from_millis(100)));
-
- assert_ready!(stream.poll_next());
- assert_pending!(stream.poll_next());
-
- time::advance(Duration::from_millis(90)).await;
-
- assert_pending!(stream.poll_next());
-
- time::advance(Duration::from_millis(101)).await;
-
- assert!(stream.is_woken());
-
- assert_ready!(stream.poll_next());
-}
diff --git a/tests/udp.rs b/tests/udp.rs
index 291267e..7cbba1b 100644
--- a/tests/udp.rs
+++ b/tests/udp.rs
@@ -66,7 +66,7 @@ async fn send_to_recv_from_poll() -> std::io::Result<()> {
let receiver = UdpSocket::bind("127.0.0.1:0").await?;
let receiver_addr = receiver.local_addr()?;
- poll_fn(|cx| sender.poll_send_to(cx, MSG, &receiver_addr)).await?;
+ poll_fn(|cx| sender.poll_send_to(cx, MSG, receiver_addr)).await?;
let mut recv_buf = [0u8; 32];
let mut read = ReadBuf::new(&mut recv_buf);
@@ -83,7 +83,7 @@ async fn send_to_peek_from() -> std::io::Result<()> {
let receiver = UdpSocket::bind("127.0.0.1:0").await?;
let receiver_addr = receiver.local_addr()?;
- poll_fn(|cx| sender.poll_send_to(cx, MSG, &receiver_addr)).await?;
+ poll_fn(|cx| sender.poll_send_to(cx, MSG, receiver_addr)).await?;
// peek
let mut recv_buf = [0u8; 32];
@@ -111,7 +111,7 @@ async fn send_to_peek_from_poll() -> std::io::Result<()> {
let receiver = UdpSocket::bind("127.0.0.1:0").await?;
let receiver_addr = receiver.local_addr()?;
- poll_fn(|cx| sender.poll_send_to(cx, MSG, &receiver_addr)).await?;
+ poll_fn(|cx| sender.poll_send_to(cx, MSG, receiver_addr)).await?;
let mut recv_buf = [0u8; 32];
let mut read = ReadBuf::new(&mut recv_buf);
@@ -192,7 +192,7 @@ async fn split_chan_poll() -> std::io::Result<()> {
let (tx, mut rx) = tokio::sync::mpsc::channel::<(Vec<u8>, std::net::SocketAddr)>(1_000);
tokio::spawn(async move {
while let Some((bytes, addr)) = rx.recv().await {
- poll_fn(|cx| s.poll_send_to(cx, &bytes, &addr))
+ poll_fn(|cx| s.poll_send_to(cx, &bytes, addr))
.await
.unwrap();
}
@@ -209,7 +209,7 @@ async fn split_chan_poll() -> std::io::Result<()> {
// test that we can send a value and get back some response
let sender = UdpSocket::bind("127.0.0.1:0").await?;
- poll_fn(|cx| sender.poll_send_to(cx, MSG, &addr)).await?;
+ poll_fn(|cx| sender.poll_send_to(cx, MSG, addr)).await?;
let mut recv_buf = [0u8; 32];
let mut read = ReadBuf::new(&mut recv_buf);
diff --git a/tests/uds_datagram.rs b/tests/uds_datagram.rs
index ec2f6f8..cdabd7b 100644
--- a/tests/uds_datagram.rs
+++ b/tests/uds_datagram.rs
@@ -2,6 +2,8 @@
#![cfg(feature = "full")]
#![cfg(unix)]
+use futures::future::poll_fn;
+use tokio::io::ReadBuf;
use tokio::net::UnixDatagram;
use tokio::try_join;
@@ -82,6 +84,8 @@ async fn try_send_recv_never_block() -> io::Result<()> {
// Send until we hit the OS `net.unix.max_dgram_qlen`.
loop {
+ dgram1.writable().await.unwrap();
+
match dgram1.try_send(payload) {
Err(err) => match err.kind() {
io::ErrorKind::WouldBlock | io::ErrorKind::Other => break,
@@ -96,6 +100,7 @@ async fn try_send_recv_never_block() -> io::Result<()> {
// Read every dgram we sent.
while count > 0 {
+ dgram2.readable().await.unwrap();
let len = dgram2.try_recv(&mut recv_buf[..])?;
assert_eq!(len, payload.len());
assert_eq!(payload, &recv_buf[..len]);
@@ -134,3 +139,94 @@ async fn split() -> std::io::Result<()> {
Ok(())
}
+
+#[tokio::test]
+async fn send_to_recv_from_poll() -> std::io::Result<()> {
+ let dir = tempfile::tempdir().unwrap();
+ let sender_path = dir.path().join("sender.sock");
+ let receiver_path = dir.path().join("receiver.sock");
+
+ let sender = UnixDatagram::bind(&sender_path)?;
+ let receiver = UnixDatagram::bind(&receiver_path)?;
+
+ let msg = b"hello";
+ poll_fn(|cx| sender.poll_send_to(cx, msg, &receiver_path)).await?;
+
+ let mut recv_buf = [0u8; 32];
+ let mut read = ReadBuf::new(&mut recv_buf);
+ let addr = poll_fn(|cx| receiver.poll_recv_from(cx, &mut read)).await?;
+
+ assert_eq!(read.filled(), msg);
+ assert_eq!(addr.as_pathname(), Some(sender_path.as_ref()));
+ Ok(())
+}
+
+#[tokio::test]
+async fn send_recv_poll() -> std::io::Result<()> {
+ let dir = tempfile::tempdir().unwrap();
+ let sender_path = dir.path().join("sender.sock");
+ let receiver_path = dir.path().join("receiver.sock");
+
+ let sender = UnixDatagram::bind(&sender_path)?;
+ let receiver = UnixDatagram::bind(&receiver_path)?;
+
+ sender.connect(&receiver_path)?;
+ receiver.connect(&sender_path)?;
+
+ let msg = b"hello";
+ poll_fn(|cx| sender.poll_send(cx, msg)).await?;
+
+ let mut recv_buf = [0u8; 32];
+ let mut read = ReadBuf::new(&mut recv_buf);
+ let _len = poll_fn(|cx| receiver.poll_recv(cx, &mut read)).await?;
+
+ assert_eq!(read.filled(), msg);
+ Ok(())
+}
+
+#[tokio::test]
+async fn try_send_to_recv_from() -> std::io::Result<()> {
+ let dir = tempfile::tempdir().unwrap();
+ let server_path = dir.path().join("server.sock");
+ let client_path = dir.path().join("client.sock");
+
+ // Create listener
+ let server = UnixDatagram::bind(&server_path)?;
+
+ // Create socket pair
+ let client = UnixDatagram::bind(&client_path)?;
+
+ for _ in 0..5 {
+ loop {
+ client.writable().await?;
+
+ match client.try_send_to(b"hello world", &server_path) {
+ Ok(n) => {
+ assert_eq!(n, 11);
+ break;
+ }
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
+ Err(e) => panic!("{:?}", e),
+ }
+ }
+
+ loop {
+ server.readable().await?;
+
+ let mut buf = [0; 512];
+
+ match server.try_recv_from(&mut buf) {
+ Ok((n, addr)) => {
+ assert_eq!(n, 11);
+ assert_eq!(addr.as_pathname(), Some(client_path.as_ref()));
+ assert_eq!(&buf[0..11], &b"hello world"[..]);
+ break;
+ }
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
+ Err(e) => panic!("{:?}", e),
+ }
+ }
+ }
+
+ Ok(())
+}
diff --git a/tests/uds_split.rs b/tests/uds_split.rs
index 76ff461..8161423 100644
--- a/tests/uds_split.rs
+++ b/tests/uds_split.rs
@@ -2,8 +2,8 @@
#![cfg(feature = "full")]
#![cfg(unix)]
+use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio::net::UnixStream;
-use tokio::prelude::*;
/// Checks that `UnixStream` can be split into a read half and a write half using
/// `UnixStream::split` and `UnixStream::split_mut`.
diff --git a/tests/uds_stream.rs b/tests/uds_stream.rs
index cd557e5..5160f17 100644
--- a/tests/uds_stream.rs
+++ b/tests/uds_stream.rs
@@ -2,10 +2,14 @@
#![warn(rust_2018_idioms)]
#![cfg(unix)]
-use tokio::io::{AsyncReadExt, AsyncWriteExt};
+use std::io;
+use std::task::Poll;
+
+use tokio::io::{AsyncReadExt, AsyncWriteExt, Interest};
use tokio::net::{UnixListener, UnixStream};
+use tokio_test::{assert_ok, assert_pending, assert_ready_ok, task};
-use futures::future::try_join;
+use futures::future::{poll_fn, try_join};
#[tokio::test]
async fn accept_read_write() -> std::io::Result<()> {
@@ -56,3 +60,195 @@ async fn shutdown() -> std::io::Result<()> {
assert_eq!(n, 0);
Ok(())
}
+
+#[tokio::test]
+async fn try_read_write() -> std::io::Result<()> {
+ let msg = b"hello world";
+
+ let dir = tempfile::tempdir()?;
+ let bind_path = dir.path().join("bind.sock");
+
+ // Create listener
+ let listener = UnixListener::bind(&bind_path)?;
+
+ // Create socket pair
+ let client = UnixStream::connect(&bind_path).await?;
+
+ let (server, _) = listener.accept().await?;
+ let mut written = msg.to_vec();
+
+ // Track the server receiving data
+ let mut readable = task::spawn(server.readable());
+ assert_pending!(readable.poll());
+
+ // Write data.
+ client.writable().await?;
+ assert_eq!(msg.len(), client.try_write(msg)?);
+
+ // The task should be notified
+ while !readable.is_woken() {
+ tokio::task::yield_now().await;
+ }
+
+ // Fill the write buffer
+ loop {
+ // Still ready
+ let mut writable = task::spawn(client.writable());
+ assert_ready_ok!(writable.poll());
+
+ match client.try_write(msg) {
+ Ok(n) => written.extend(&msg[..n]),
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ break;
+ }
+ Err(e) => panic!("error = {:?}", e),
+ }
+ }
+
+ {
+ // Write buffer full
+ let mut writable = task::spawn(client.writable());
+ assert_pending!(writable.poll());
+
+ // Drain the socket from the server end
+ let mut read = vec![0; written.len()];
+ let mut i = 0;
+
+ while i < read.len() {
+ server.readable().await?;
+
+ match server.try_read(&mut read[i..]) {
+ Ok(n) => i += n,
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
+ Err(e) => panic!("error = {:?}", e),
+ }
+ }
+
+ assert_eq!(read, written);
+ }
+
+ // Now, we listen for shutdown
+ drop(client);
+
+ loop {
+ let ready = server.ready(Interest::READABLE).await?;
+
+ if ready.is_read_closed() {
+ break;
+ } else {
+ tokio::task::yield_now().await;
+ }
+ }
+
+ Ok(())
+}
+
+async fn create_pair() -> (UnixStream, UnixStream) {
+ let dir = assert_ok!(tempfile::tempdir());
+ let bind_path = dir.path().join("bind.sock");
+
+ let listener = assert_ok!(UnixListener::bind(&bind_path));
+
+ let accept = listener.accept();
+ let connect = UnixStream::connect(&bind_path);
+ let ((server, _), client) = assert_ok!(try_join(accept, connect).await);
+
+ (client, server)
+}
+
+macro_rules! assert_readable_by_polling {
+ ($stream:expr) => {
+ assert_ok!(poll_fn(|cx| $stream.poll_read_ready(cx)).await);
+ };
+}
+
+macro_rules! assert_not_readable_by_polling {
+ ($stream:expr) => {
+ poll_fn(|cx| {
+ assert_pending!($stream.poll_read_ready(cx));
+ Poll::Ready(())
+ })
+ .await;
+ };
+}
+
+macro_rules! assert_writable_by_polling {
+ ($stream:expr) => {
+ assert_ok!(poll_fn(|cx| $stream.poll_write_ready(cx)).await);
+ };
+}
+
+macro_rules! assert_not_writable_by_polling {
+ ($stream:expr) => {
+ poll_fn(|cx| {
+ assert_pending!($stream.poll_write_ready(cx));
+ Poll::Ready(())
+ })
+ .await;
+ };
+}
+
+#[tokio::test]
+async fn poll_read_ready() {
+ let (mut client, mut server) = create_pair().await;
+
+ // Initial state - not readable.
+ assert_not_readable_by_polling!(server);
+
+ // There is data in the buffer - readable.
+ assert_ok!(client.write_all(b"ping").await);
+ assert_readable_by_polling!(server);
+
+ // Readable until calls to `poll_read` return `Poll::Pending`.
+ let mut buf = [0u8; 4];
+ assert_ok!(server.read_exact(&mut buf).await);
+ assert_readable_by_polling!(server);
+ read_until_pending(&mut server);
+ assert_not_readable_by_polling!(server);
+
+ // Detect the client disconnect.
+ drop(client);
+ assert_readable_by_polling!(server);
+}
+
+#[tokio::test]
+async fn poll_write_ready() {
+ let (mut client, server) = create_pair().await;
+
+ // Initial state - writable.
+ assert_writable_by_polling!(client);
+
+ // No space to write - not writable.
+ write_until_pending(&mut client);
+ assert_not_writable_by_polling!(client);
+
+ // Detect the server disconnect.
+ drop(server);
+ assert_writable_by_polling!(client);
+}
+
+fn read_until_pending(stream: &mut UnixStream) {
+ let mut buf = vec![0u8; 1024 * 1024];
+ loop {
+ match stream.try_read(&mut buf) {
+ Ok(_) => (),
+ Err(err) => {
+ assert_eq!(err.kind(), io::ErrorKind::WouldBlock);
+ break;
+ }
+ }
+ }
+}
+
+fn write_until_pending(stream: &mut UnixStream) {
+ let buf = vec![0u8; 1024 * 1024];
+ loop {
+ match stream.try_write(&buf) {
+ Ok(_) => (),
+ Err(err) => {
+ assert_eq!(err.kind(), io::ErrorKind::WouldBlock);
+ break;
+ }
+ }
+ }
+}