From 290fc4903cd00fc31d93e0ecd49c402e6833c569 Mon Sep 17 00:00:00 2001 From: Haibo Huang Date: Thu, 14 Jan 2021 17:23:22 -0800 Subject: Upgrade rust/crates/tokio to 1.0.2 Test: make Change-Id: Ic48ff709bade266749eac8c146856901ce78da7f --- tests/async_send_sync.rs | 28 ++-- tests/buffered.rs | 3 +- tests/fs_dir.rs | 32 ---- tests/fs_file.rs | 2 +- tests/fs_file_mocked.rs | 2 +- tests/fs_link.rs | 4 +- tests/io_async_fd.rs | 23 ++- tests/io_lines.rs | 16 -- tests/macros_select.rs | 8 +- tests/process_issue_2174.rs | 2 +- tests/process_kill_on_drop.rs | 2 +- tests/rt_basic.rs | 15 +- tests/rt_common.rs | 17 +- tests/rt_threaded.rs | 29 +++- tests/stream_chain.rs | 95 ----------- tests/stream_collect.rs | 137 ---------------- tests/stream_empty.rs | 11 -- tests/stream_fuse.rs | 50 ------ tests/stream_iter.rs | 18 -- tests/stream_merge.rs | 78 --------- tests/stream_once.rs | 12 -- tests/stream_pending.rs | 14 -- tests/stream_stream_map.rs | 372 ------------------------------------------ tests/stream_timeout.rs | 109 ------------- tests/support/mpsc_stream.rs | 42 +++++ tests/sync_broadcast.rs | 40 ----- tests/sync_mpsc.rs | 74 ++++----- tests/sync_mutex.rs | 7 +- tests/sync_mutex_owned.rs | 7 +- tests/sync_semaphore.rs | 14 ++ tests/task_abort.rs | 26 +++ tests/task_blocking.rs | 14 +- tests/tcp_accept.rs | 4 +- tests/tcp_connect.rs | 2 +- tests/tcp_echo.rs | 2 +- tests/tcp_into_std.rs | 44 +++++ tests/tcp_shutdown.rs | 5 +- tests/time_interval.rs | 14 -- tests/time_pause.rs | 33 ++++ tests/time_rt.rs | 2 +- tests/time_sleep.rs | 221 +++++++++++++++++++++++-- tests/time_throttle.rs | 28 ---- tests/udp.rs | 10 +- tests/uds_datagram.rs | 96 +++++++++++ tests/uds_split.rs | 2 +- tests/uds_stream.rs | 200 ++++++++++++++++++++++- 46 files changed, 810 insertions(+), 1156 deletions(-) delete mode 100644 tests/stream_chain.rs delete mode 100644 tests/stream_collect.rs delete mode 100644 tests/stream_empty.rs delete mode 100644 tests/stream_fuse.rs delete mode 100644 tests/stream_iter.rs delete mode 100644 tests/stream_merge.rs delete mode 100644 tests/stream_once.rs delete mode 100644 tests/stream_pending.rs delete mode 100644 tests/stream_stream_map.rs delete mode 100644 tests/stream_timeout.rs create mode 100644 tests/support/mpsc_stream.rs create mode 100644 tests/task_abort.rs create mode 100644 tests/tcp_into_std.rs create mode 100644 tests/time_pause.rs delete mode 100644 tests/time_throttle.rs (limited to 'tests') diff --git a/tests/async_send_sync.rs b/tests/async_send_sync.rs index 2ee3857..671fa4a 100644 --- a/tests/async_send_sync.rs +++ b/tests/async_send_sync.rs @@ -14,8 +14,7 @@ type BoxFutureSync = std::pin::Pin + type BoxFutureSend = std::pin::Pin + Send>>; #[allow(dead_code)] type BoxFuture = std::pin::Pin>>; -#[allow(dead_code)] -type BoxStream = std::pin::Pin>>; + #[allow(dead_code)] type BoxAsyncRead = std::pin::Pin>; #[allow(dead_code)] @@ -94,6 +93,14 @@ macro_rules! assert_value { AmbiguousIfSync::some_item(&f); }; }; + ($type:ty: Unpin) => { + #[allow(unreachable_code)] + #[allow(unused_variables)] + const _: fn() = || { + let f: $type = todo!(); + require_unpin(&f); + }; + }; } macro_rules! async_assert_fn { ($($f:ident $(< $($generic:ty),* > )? )::+($($arg:ty),*): Send & Sync) => { @@ -222,10 +229,6 @@ async_assert_fn!(tokio::signal::ctrl_c(): Send & Sync); #[cfg(unix)] async_assert_fn!(tokio::signal::unix::Signal::recv(_): Send & Sync); -async_assert_fn!(tokio::stream::empty>(): Send & Sync); -async_assert_fn!(tokio::stream::pending>(): Send & Sync); -async_assert_fn!(tokio::stream::iter(std::vec::IntoIter): Send & Sync); - async_assert_fn!(tokio::sync::Barrier::wait(_): Send & Sync); async_assert_fn!(tokio::sync::Mutex::lock(_): Send & Sync); async_assert_fn!(tokio::sync::Mutex>::lock(_): Send & Sync); @@ -285,13 +288,12 @@ async_assert_fn!(tokio::time::timeout_at(Instant, BoxFutureSend<()>): Send & !Sy async_assert_fn!(tokio::time::timeout_at(Instant, BoxFuture<()>): !Send & !Sync); async_assert_fn!(tokio::time::Interval::tick(_): Send & Sync); -async_assert_fn!(tokio::stream::StreamExt::next(&mut BoxStream<()>): !Unpin); -async_assert_fn!(tokio::stream::StreamExt::try_next(&mut BoxStream>): !Unpin); -async_assert_fn!(tokio::stream::StreamExt::all(&mut BoxStream<()>, fn(())->bool): !Unpin); -async_assert_fn!(tokio::stream::StreamExt::any(&mut BoxStream<()>, fn(())->bool): !Unpin); -async_assert_fn!(tokio::stream::StreamExt::fold(&mut BoxStream<()>, (), fn((), ())->()): !Unpin); -async_assert_fn!(tokio::stream::StreamExt::collect>(&mut BoxStream<()>): !Unpin); - +assert_value!(tokio::time::Interval: Unpin); +async_assert_fn!(tokio::time::sleep(Duration): !Unpin); +async_assert_fn!(tokio::time::sleep_until(Instant): !Unpin); +async_assert_fn!(tokio::time::timeout(Duration, BoxFuture<()>): !Unpin); +async_assert_fn!(tokio::time::timeout_at(Instant, BoxFuture<()>): !Unpin); +async_assert_fn!(tokio::time::Interval::tick(_): !Unpin); async_assert_fn!(tokio::io::AsyncBufReadExt::read_until(&mut BoxAsyncRead, u8, &mut Vec): !Unpin); async_assert_fn!(tokio::io::AsyncBufReadExt::read_line(&mut BoxAsyncRead, &mut String): !Unpin); async_assert_fn!(tokio::io::AsyncReadExt::read(&mut BoxAsyncRead, &mut [u8]): !Unpin); diff --git a/tests/buffered.rs b/tests/buffered.rs index 97ba00c..98b6d5f 100644 --- a/tests/buffered.rs +++ b/tests/buffered.rs @@ -2,7 +2,6 @@ #![cfg(feature = "full")] use tokio::net::TcpListener; -use tokio::prelude::*; use tokio_test::assert_ok; use std::io::prelude::*; @@ -41,7 +40,7 @@ async fn echo_server() { let (mut a, _) = assert_ok!(srv.accept().await); let (mut b, _) = assert_ok!(srv.accept().await); - let n = assert_ok!(io::copy(&mut a, &mut b).await); + let n = assert_ok!(tokio::io::copy(&mut a, &mut b).await); let (expected, t2) = t.join().unwrap(); let actual = t2.join().unwrap(); diff --git a/tests/fs_dir.rs b/tests/fs_dir.rs index 6355ef0..21efe8c 100644 --- a/tests/fs_dir.rs +++ b/tests/fs_dir.rs @@ -85,35 +85,3 @@ async fn read_inherent() { vec!["aa".to_string(), "bb".to_string(), "cc".to_string()] ); } - -#[tokio::test] -async fn read_stream() { - use tokio::stream::StreamExt; - - let base_dir = tempdir().unwrap(); - - let p = base_dir.path(); - std::fs::create_dir(p.join("aa")).unwrap(); - std::fs::create_dir(p.join("bb")).unwrap(); - std::fs::create_dir(p.join("cc")).unwrap(); - - let files = Arc::new(Mutex::new(Vec::new())); - - let f = files.clone(); - let p = p.to_path_buf(); - - let mut entries = fs::read_dir(p).await.unwrap(); - - while let Some(res) = entries.next().await { - let e = assert_ok!(res); - let s = e.file_name().to_str().unwrap().to_string(); - f.lock().unwrap().push(s); - } - - let mut files = files.lock().unwrap(); - files.sort(); // because the order is not guaranteed - assert_eq!( - *files, - vec!["aa".to_string(), "bb".to_string(), "cc".to_string()] - ); -} diff --git a/tests/fs_file.rs b/tests/fs_file.rs index d5b56e6..bf2f1d7 100644 --- a/tests/fs_file.rs +++ b/tests/fs_file.rs @@ -2,7 +2,7 @@ #![cfg(feature = "full")] use tokio::fs::File; -use tokio::prelude::*; +use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt}; use tokio_test::task; use std::io::prelude::*; diff --git a/tests/fs_file_mocked.rs b/tests/fs_file_mocked.rs index edb74a7..7771532 100644 --- a/tests/fs_file_mocked.rs +++ b/tests/fs_file_mocked.rs @@ -62,7 +62,7 @@ pub(crate) mod sync { } use fs::sys; -use tokio::prelude::*; +use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt}; use tokio_test::{assert_pending, assert_ready, assert_ready_err, assert_ready_ok, task}; use std::io::SeekFrom; diff --git a/tests/fs_link.rs b/tests/fs_link.rs index cbbe27e..2ef666f 100644 --- a/tests/fs_link.rs +++ b/tests/fs_link.rs @@ -48,9 +48,7 @@ async fn test_symlink() { let src_2 = src.clone(); let dst_2 = dst.clone(); - assert!(fs::os::unix::symlink(src_2.clone(), dst_2.clone()) - .await - .is_ok()); + assert!(fs::symlink(src_2.clone(), dst_2.clone()).await.is_ok()); let mut content = String::new(); diff --git a/tests/io_async_fd.rs b/tests/io_async_fd.rs index f8dc65f..d1586bb 100644 --- a/tests/io_async_fd.rs +++ b/tests/io_async_fd.rs @@ -201,7 +201,10 @@ async fn reset_readable() { let mut guard = readable.await.unwrap(); - guard.with_io(|| afd_a.get_ref().read(&mut [0])).unwrap(); + guard + .try_io(|_| afd_a.get_ref().read(&mut [0])) + .unwrap() + .unwrap(); // `a` is not readable, but the reactor still thinks it is // (because we have not observed a not-ready error yet) @@ -233,12 +236,10 @@ async fn reset_writable() { let mut guard = afd_a.writable().await.unwrap(); // Write until we get a WouldBlock. This also clears the ready state. - loop { - if let Err(e) = guard.with_io(|| afd_a.get_ref().write(&[0; 512][..])) { - assert_eq!(ErrorKind::WouldBlock, e.kind()); - break; - } - } + while guard + .try_io(|_| afd_a.get_ref().write(&[0; 512][..])) + .is_ok() + {} // Writable state should be cleared now. let writable = afd_a.writable(); @@ -313,9 +314,7 @@ async fn reregister() { } #[tokio::test] -async fn with_poll() { - use std::task::Poll; - +async fn try_io() { let (a, mut b) = socketpair(); b.write_all(b"0").unwrap(); @@ -327,13 +326,13 @@ async fn with_poll() { afd_a.get_ref().read_exact(&mut [0]).unwrap(); // Should not clear the readable state - let _ = guard.with_poll(|| Poll::Ready(())); + let _ = guard.try_io(|_| Ok(())); // Still readable... let _ = afd_a.readable().await.unwrap(); // Should clear the readable state - let _ = guard.with_poll(|| Poll::Pending::<()>); + let _ = guard.try_io(|_| io::Result::<()>::Err(ErrorKind::WouldBlock.into())); // Assert not readable let readable = afd_a.readable(); diff --git a/tests/io_lines.rs b/tests/io_lines.rs index 2f6b339..9996d81 100644 --- a/tests/io_lines.rs +++ b/tests/io_lines.rs @@ -17,19 +17,3 @@ async fn lines_inherent() { assert_eq!(b, ""); assert!(assert_ok!(st.next_line().await).is_none()); } - -#[tokio::test] -async fn lines_stream() { - use tokio::stream::StreamExt; - - let rd: &[u8] = b"hello\r\nworld\n\n"; - let mut st = rd.lines(); - - let b = assert_ok!(st.next().await.unwrap()); - assert_eq!(b, "hello"); - let b = assert_ok!(st.next().await.unwrap()); - assert_eq!(b, "world"); - let b = assert_ok!(st.next().await.unwrap()); - assert_eq!(b, ""); - assert!(st.next().await.is_none()); -} diff --git a/tests/macros_select.rs b/tests/macros_select.rs index cc214bb..3359849 100644 --- a/tests/macros_select.rs +++ b/tests/macros_select.rs @@ -359,12 +359,14 @@ async fn join_with_select() { async fn use_future_in_if_condition() { use tokio::time::{self, Duration}; - let mut sleep = time::sleep(Duration::from_millis(50)); + let sleep = time::sleep(Duration::from_millis(50)); + tokio::pin!(sleep); tokio::select! { - _ = &mut sleep, if !sleep.is_elapsed() => { + _ = time::sleep(Duration::from_millis(50)), if false => { + panic!("if condition ignored") } - _ = async { 1 } => { + _ = async { 1u32 } => { } } } diff --git a/tests/process_issue_2174.rs b/tests/process_issue_2174.rs index 6ee7d1a..5ee9dc0 100644 --- a/tests/process_issue_2174.rs +++ b/tests/process_issue_2174.rs @@ -11,7 +11,7 @@ use std::process::Stdio; use std::time::Duration; -use tokio::prelude::*; +use tokio::io::AsyncWriteExt; use tokio::process::Command; use tokio::time; use tokio_test::assert_err; diff --git a/tests/process_kill_on_drop.rs b/tests/process_kill_on_drop.rs index f67bb23..00f5c6d 100644 --- a/tests/process_kill_on_drop.rs +++ b/tests/process_kill_on_drop.rs @@ -10,7 +10,7 @@ use tokio_test::assert_ok; #[tokio::test] async fn kill_on_drop() { - let mut cmd = Command::new("sh"); + let mut cmd = Command::new("bash"); cmd.args(&[ "-c", " diff --git a/tests/rt_basic.rs b/tests/rt_basic.rs index 7b5b622..977a838 100644 --- a/tests/rt_basic.rs +++ b/tests/rt_basic.rs @@ -2,12 +2,16 @@ #![cfg(feature = "full")] use tokio::runtime::Runtime; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::oneshot; use tokio_test::{assert_err, assert_ok}; use std::thread; use std::time::Duration; +mod support { + pub(crate) mod mpsc_stream; +} + #[test] fn spawned_task_does_not_progress_without_block_on() { let (tx, mut rx) = oneshot::channel(); @@ -36,7 +40,7 @@ fn no_extra_poll() { Arc, }; use std::task::{Context, Poll}; - use tokio::stream::{Stream, StreamExt}; + use tokio_stream::{Stream, StreamExt}; pin_project! { struct TrackPolls { @@ -58,8 +62,8 @@ fn no_extra_poll() { } } - let (tx, rx) = mpsc::unbounded_channel(); - let mut rx = TrackPolls { + let (tx, rx) = support::mpsc_stream::unbounded_channel_stream::<()>(); + let rx = TrackPolls { npolls: Arc::new(AtomicUsize::new(0)), s: rx, }; @@ -67,6 +71,9 @@ fn no_extra_poll() { let rt = rt(); + // TODO: could probably avoid this, but why not. + let mut rx = Box::pin(rx); + rt.spawn(async move { while rx.next().await.is_some() {} }); rt.block_on(async { tokio::task::yield_now().await; diff --git a/tests/rt_common.rs b/tests/rt_common.rs index 74a94d5..66e6f2c 100644 --- a/tests/rt_common.rs +++ b/tests/rt_common.rs @@ -56,7 +56,7 @@ fn send_sync_bound() { rt_test! { use tokio::net::{TcpListener, TcpStream, UdpSocket}; - use tokio::prelude::*; + use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::runtime::Runtime; use tokio::sync::oneshot; use tokio::{task, time}; @@ -857,6 +857,21 @@ rt_test! { Arc::try_unwrap(runtime).unwrap().shutdown_timeout(Duration::from_millis(100)); } + #[test] + fn shutdown_timeout_0() { + let runtime = rt(); + + runtime.block_on(async move { + task::spawn_blocking(move || { + thread::sleep(Duration::from_secs(10_000)); + }); + }); + + let now = Instant::now(); + Arc::try_unwrap(runtime).unwrap().shutdown_timeout(Duration::from_nanos(0)); + assert!(now.elapsed().as_secs() < 1); + } + #[test] fn shutdown_wakeup_time() { let runtime = rt(); diff --git a/tests/rt_threaded.rs b/tests/rt_threaded.rs index 90ebf6a..19b381c 100644 --- a/tests/rt_threaded.rs +++ b/tests/rt_threaded.rs @@ -331,7 +331,7 @@ fn coop_and_block_in_place() { // runtime worker yielded as part of `block_in_place` and guarantees the // same thread will reclaim the worker at the end of the // `block_in_place` call. - .max_threads(1) + .max_blocking_threads(1) .build() .unwrap(); @@ -375,13 +375,36 @@ fn coop_and_block_in_place() { // Testing this does not panic #[test] -fn max_threads() { +fn max_blocking_threads() { let _rt = tokio::runtime::Builder::new_multi_thread() - .max_threads(1) + .max_blocking_threads(1) .build() .unwrap(); } +#[test] +#[should_panic] +fn max_blocking_threads_set_to_zero() { + let _rt = tokio::runtime::Builder::new_multi_thread() + .max_blocking_threads(0) + .build() + .unwrap(); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn hang_on_shutdown() { + let (sync_tx, sync_rx) = std::sync::mpsc::channel::<()>(); + tokio::spawn(async move { + tokio::task::block_in_place(|| sync_rx.recv().ok()); + }); + + tokio::spawn(async { + tokio::time::sleep(std::time::Duration::from_secs(2)).await; + drop(sync_tx); + }); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; +} + fn rt() -> Runtime { Runtime::new().unwrap() } diff --git a/tests/stream_chain.rs b/tests/stream_chain.rs deleted file mode 100644 index 98461a8..0000000 --- a/tests/stream_chain.rs +++ /dev/null @@ -1,95 +0,0 @@ -use tokio::stream::{self, Stream, StreamExt}; -use tokio::sync::mpsc; -use tokio_test::{assert_pending, assert_ready, task}; - -#[tokio::test] -async fn basic_usage() { - let one = stream::iter(vec![1, 2, 3]); - let two = stream::iter(vec![4, 5, 6]); - - let mut stream = one.chain(two); - - assert_eq!(stream.size_hint(), (6, Some(6))); - assert_eq!(stream.next().await, Some(1)); - - assert_eq!(stream.size_hint(), (5, Some(5))); - assert_eq!(stream.next().await, Some(2)); - - assert_eq!(stream.size_hint(), (4, Some(4))); - assert_eq!(stream.next().await, Some(3)); - - assert_eq!(stream.size_hint(), (3, Some(3))); - assert_eq!(stream.next().await, Some(4)); - - assert_eq!(stream.size_hint(), (2, Some(2))); - assert_eq!(stream.next().await, Some(5)); - - assert_eq!(stream.size_hint(), (1, Some(1))); - assert_eq!(stream.next().await, Some(6)); - - assert_eq!(stream.size_hint(), (0, Some(0))); - assert_eq!(stream.next().await, None); - - assert_eq!(stream.size_hint(), (0, Some(0))); - assert_eq!(stream.next().await, None); -} - -#[tokio::test] -async fn pending_first() { - let (tx1, rx1) = mpsc::unbounded_channel(); - let (tx2, rx2) = mpsc::unbounded_channel(); - - let mut stream = task::spawn(rx1.chain(rx2)); - assert_eq!(stream.size_hint(), (0, None)); - - assert_pending!(stream.poll_next()); - - tx2.send(2).unwrap(); - assert!(!stream.is_woken()); - - assert_pending!(stream.poll_next()); - - tx1.send(1).unwrap(); - assert!(stream.is_woken()); - assert_eq!(Some(1), assert_ready!(stream.poll_next())); - - assert_pending!(stream.poll_next()); - - drop(tx1); - - assert_eq!(stream.size_hint(), (0, None)); - - assert!(stream.is_woken()); - assert_eq!(Some(2), assert_ready!(stream.poll_next())); - - assert_eq!(stream.size_hint(), (0, None)); - - drop(tx2); - - assert_eq!(stream.size_hint(), (0, None)); - assert_eq!(None, assert_ready!(stream.poll_next())); -} - -#[test] -fn size_overflow() { - struct Monster; - - impl tokio::stream::Stream for Monster { - type Item = (); - fn poll_next( - self: std::pin::Pin<&mut Self>, - _cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - panic!() - } - - fn size_hint(&self) -> (usize, Option) { - (usize::max_value(), Some(usize::max_value())) - } - } - - let m1 = Monster; - let m2 = Monster; - let m = m1.chain(m2); - assert_eq!(m.size_hint(), (usize::max_value(), None)); -} diff --git a/tests/stream_collect.rs b/tests/stream_collect.rs deleted file mode 100644 index 7ab1a34..0000000 --- a/tests/stream_collect.rs +++ /dev/null @@ -1,137 +0,0 @@ -use tokio::stream::{self, StreamExt}; -use tokio::sync::mpsc; -use tokio_test::{assert_pending, assert_ready, assert_ready_err, assert_ready_ok, task}; - -#[allow(clippy::let_unit_value)] -#[tokio::test] -async fn empty_unit() { - // Drains the stream. - let mut iter = vec![(), (), ()].into_iter(); - let _: () = stream::iter(&mut iter).collect().await; - assert!(iter.next().is_none()); -} - -#[tokio::test] -async fn empty_vec() { - let coll: Vec = stream::empty().collect().await; - assert!(coll.is_empty()); -} - -#[tokio::test] -async fn empty_box_slice() { - let coll: Box<[u32]> = stream::empty().collect().await; - assert!(coll.is_empty()); -} - -#[tokio::test] -async fn empty_string() { - let coll: String = stream::empty::<&str>().collect().await; - assert!(coll.is_empty()); -} - -#[tokio::test] -async fn empty_result() { - let coll: Result, &str> = stream::empty().collect().await; - assert_eq!(Ok(vec![]), coll); -} - -#[tokio::test] -async fn collect_vec_items() { - let (tx, rx) = mpsc::unbounded_channel(); - let mut fut = task::spawn(rx.collect::>()); - - assert_pending!(fut.poll()); - - tx.send(1).unwrap(); - assert!(fut.is_woken()); - assert_pending!(fut.poll()); - - tx.send(2).unwrap(); - assert!(fut.is_woken()); - assert_pending!(fut.poll()); - - drop(tx); - assert!(fut.is_woken()); - let coll = assert_ready!(fut.poll()); - assert_eq!(vec![1, 2], coll); -} - -#[tokio::test] -async fn collect_string_items() { - let (tx, rx) = mpsc::unbounded_channel(); - let mut fut = task::spawn(rx.collect::()); - - assert_pending!(fut.poll()); - - tx.send("hello ".to_string()).unwrap(); - assert!(fut.is_woken()); - assert_pending!(fut.poll()); - - tx.send("world".to_string()).unwrap(); - assert!(fut.is_woken()); - assert_pending!(fut.poll()); - - drop(tx); - assert!(fut.is_woken()); - let coll = assert_ready!(fut.poll()); - assert_eq!("hello world", coll); -} - -#[tokio::test] -async fn collect_str_items() { - let (tx, rx) = mpsc::unbounded_channel(); - let mut fut = task::spawn(rx.collect::()); - - assert_pending!(fut.poll()); - - tx.send("hello ").unwrap(); - assert!(fut.is_woken()); - assert_pending!(fut.poll()); - - tx.send("world").unwrap(); - assert!(fut.is_woken()); - assert_pending!(fut.poll()); - - drop(tx); - assert!(fut.is_woken()); - let coll = assert_ready!(fut.poll()); - assert_eq!("hello world", coll); -} - -#[tokio::test] -async fn collect_results_ok() { - let (tx, rx) = mpsc::unbounded_channel(); - let mut fut = task::spawn(rx.collect::>()); - - assert_pending!(fut.poll()); - - tx.send(Ok("hello ")).unwrap(); - assert!(fut.is_woken()); - assert_pending!(fut.poll()); - - tx.send(Ok("world")).unwrap(); - assert!(fut.is_woken()); - assert_pending!(fut.poll()); - - drop(tx); - assert!(fut.is_woken()); - let coll = assert_ready_ok!(fut.poll()); - assert_eq!("hello world", coll); -} - -#[tokio::test] -async fn collect_results_err() { - let (tx, rx) = mpsc::unbounded_channel(); - let mut fut = task::spawn(rx.collect::>()); - - assert_pending!(fut.poll()); - - tx.send(Ok("hello ")).unwrap(); - assert!(fut.is_woken()); - assert_pending!(fut.poll()); - - tx.send(Err("oh no")).unwrap(); - assert!(fut.is_woken()); - let err = assert_ready_err!(fut.poll()); - assert_eq!("oh no", err); -} diff --git a/tests/stream_empty.rs b/tests/stream_empty.rs deleted file mode 100644 index f278076..0000000 --- a/tests/stream_empty.rs +++ /dev/null @@ -1,11 +0,0 @@ -use tokio::stream::{self, Stream, StreamExt}; - -#[tokio::test] -async fn basic_usage() { - let mut stream = stream::empty::(); - - for _ in 0..2 { - assert_eq!(stream.size_hint(), (0, Some(0))); - assert_eq!(None, stream.next().await); - } -} diff --git a/tests/stream_fuse.rs b/tests/stream_fuse.rs deleted file mode 100644 index 9d7d969..0000000 --- a/tests/stream_fuse.rs +++ /dev/null @@ -1,50 +0,0 @@ -use tokio::stream::{Stream, StreamExt}; - -use std::pin::Pin; -use std::task::{Context, Poll}; - -// a stream which alternates between Some and None -struct Alternate { - state: i32, -} - -impl Stream for Alternate { - type Item = i32; - - fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - let val = self.state; - self.state += 1; - - // if it's even, Some(i32), else None - if val % 2 == 0 { - Poll::Ready(Some(val)) - } else { - Poll::Ready(None) - } - } -} - -#[tokio::test] -async fn basic_usage() { - let mut stream = Alternate { state: 0 }; - - // the stream goes back and forth - assert_eq!(stream.next().await, Some(0)); - assert_eq!(stream.next().await, None); - assert_eq!(stream.next().await, Some(2)); - assert_eq!(stream.next().await, None); - - // however, once it is fused - let mut stream = stream.fuse(); - - assert_eq!(stream.size_hint(), (0, None)); - assert_eq!(stream.next().await, Some(4)); - - assert_eq!(stream.size_hint(), (0, None)); - assert_eq!(stream.next().await, None); - - // it will always return `None` after the first time. - assert_eq!(stream.size_hint(), (0, Some(0))); - assert_eq!(stream.next().await, None); - assert_eq!(stream.size_hint(), (0, Some(0))); -} diff --git a/tests/stream_iter.rs b/tests/stream_iter.rs deleted file mode 100644 index 45148a7..0000000 --- a/tests/stream_iter.rs +++ /dev/null @@ -1,18 +0,0 @@ -use tokio::stream; -use tokio_test::task; - -use std::iter; - -#[tokio::test] -async fn coop() { - let mut stream = task::spawn(stream::iter(iter::repeat(1))); - - for _ in 0..10_000 { - if stream.poll_next().is_pending() { - assert!(stream.is_woken()); - return; - } - } - - panic!("did not yield"); -} diff --git a/tests/stream_merge.rs b/tests/stream_merge.rs deleted file mode 100644 index 45ecdcb..0000000 --- a/tests/stream_merge.rs +++ /dev/null @@ -1,78 +0,0 @@ -use tokio::stream::{self, Stream, StreamExt}; -use tokio::sync::mpsc; -use tokio_test::task; -use tokio_test::{assert_pending, assert_ready}; - -#[tokio::test] -async fn merge_sync_streams() { - let mut s = stream::iter(vec![0, 2, 4, 6]).merge(stream::iter(vec![1, 3, 5])); - - for i in 0..7 { - let rem = 7 - i; - assert_eq!(s.size_hint(), (rem, Some(rem))); - assert_eq!(Some(i), s.next().await); - } - - assert!(s.next().await.is_none()); -} - -#[tokio::test] -async fn merge_async_streams() { - let (tx1, rx1) = mpsc::unbounded_channel(); - let (tx2, rx2) = mpsc::unbounded_channel(); - - let mut rx = task::spawn(rx1.merge(rx2)); - - assert_eq!(rx.size_hint(), (0, None)); - - assert_pending!(rx.poll_next()); - - tx1.send(1).unwrap(); - - assert!(rx.is_woken()); - assert_eq!(Some(1), assert_ready!(rx.poll_next())); - - assert_pending!(rx.poll_next()); - tx2.send(2).unwrap(); - - assert!(rx.is_woken()); - assert_eq!(Some(2), assert_ready!(rx.poll_next())); - assert_pending!(rx.poll_next()); - - drop(tx1); - assert!(rx.is_woken()); - assert_pending!(rx.poll_next()); - - tx2.send(3).unwrap(); - assert!(rx.is_woken()); - assert_eq!(Some(3), assert_ready!(rx.poll_next())); - assert_pending!(rx.poll_next()); - - drop(tx2); - assert!(rx.is_woken()); - assert_eq!(None, assert_ready!(rx.poll_next())); -} - -#[test] -fn size_overflow() { - struct Monster; - - impl tokio::stream::Stream for Monster { - type Item = (); - fn poll_next( - self: std::pin::Pin<&mut Self>, - _cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - panic!() - } - - fn size_hint(&self) -> (usize, Option) { - (usize::max_value(), Some(usize::max_value())) - } - } - - let m1 = Monster; - let m2 = Monster; - let m = m1.merge(m2); - assert_eq!(m.size_hint(), (usize::max_value(), None)); -} diff --git a/tests/stream_once.rs b/tests/stream_once.rs deleted file mode 100644 index bb4635a..0000000 --- a/tests/stream_once.rs +++ /dev/null @@ -1,12 +0,0 @@ -use tokio::stream::{self, Stream, StreamExt}; - -#[tokio::test] -async fn basic_usage() { - let mut one = stream::once(1); - - assert_eq!(one.size_hint(), (1, Some(1))); - assert_eq!(Some(1), one.next().await); - - assert_eq!(one.size_hint(), (0, Some(0))); - assert_eq!(None, one.next().await); -} diff --git a/tests/stream_pending.rs b/tests/stream_pending.rs deleted file mode 100644 index f4d3080..0000000 --- a/tests/stream_pending.rs +++ /dev/null @@ -1,14 +0,0 @@ -use tokio::stream::{self, Stream, StreamExt}; -use tokio_test::{assert_pending, task}; - -#[tokio::test] -async fn basic_usage() { - let mut stream = stream::pending::(); - - for _ in 0..2 { - assert_eq!(stream.size_hint(), (0, None)); - - let mut next = task::spawn(async { stream.next().await }); - assert_pending!(next.poll()); - } -} diff --git a/tests/stream_stream_map.rs b/tests/stream_stream_map.rs deleted file mode 100644 index 38bb0c5..0000000 --- a/tests/stream_stream_map.rs +++ /dev/null @@ -1,372 +0,0 @@ -use tokio::stream::{self, pending, Stream, StreamExt, StreamMap}; -use tokio::sync::mpsc; -use tokio_test::{assert_ok, assert_pending, assert_ready, task}; - -use std::pin::Pin; - -macro_rules! assert_ready_some { - ($($t:tt)*) => { - match assert_ready!($($t)*) { - Some(v) => v, - None => panic!("expected `Some`, got `None`"), - } - }; -} - -macro_rules! assert_ready_none { - ($($t:tt)*) => { - match assert_ready!($($t)*) { - None => {} - Some(v) => panic!("expected `None`, got `Some({:?})`", v), - } - }; -} - -#[tokio::test] -async fn empty() { - let mut map = StreamMap::<&str, stream::Pending<()>>::new(); - - assert_eq!(map.len(), 0); - assert!(map.is_empty()); - - assert!(map.next().await.is_none()); - assert!(map.next().await.is_none()); - - assert!(map.remove("foo").is_none()); -} - -#[tokio::test] -async fn single_entry() { - let mut map = task::spawn(StreamMap::new()); - let (tx, rx) = mpsc::unbounded_channel(); - - assert_ready_none!(map.poll_next()); - - assert!(map.insert("foo", rx).is_none()); - assert!(map.contains_key("foo")); - assert!(!map.contains_key("bar")); - - assert_eq!(map.len(), 1); - assert!(!map.is_empty()); - - assert_pending!(map.poll_next()); - - assert_ok!(tx.send(1)); - - assert!(map.is_woken()); - let (k, v) = assert_ready_some!(map.poll_next()); - assert_eq!(k, "foo"); - assert_eq!(v, 1); - - assert_pending!(map.poll_next()); - - assert_ok!(tx.send(2)); - - assert!(map.is_woken()); - let (k, v) = assert_ready_some!(map.poll_next()); - assert_eq!(k, "foo"); - assert_eq!(v, 2); - - assert_pending!(map.poll_next()); - drop(tx); - assert!(map.is_woken()); - assert_ready_none!(map.poll_next()); -} - -#[tokio::test] -async fn multiple_entries() { - let mut map = task::spawn(StreamMap::new()); - let (tx1, rx1) = mpsc::unbounded_channel(); - let (tx2, rx2) = mpsc::unbounded_channel(); - - map.insert("foo", rx1); - map.insert("bar", rx2); - - assert_pending!(map.poll_next()); - - assert_ok!(tx1.send(1)); - - assert!(map.is_woken()); - let (k, v) = assert_ready_some!(map.poll_next()); - assert_eq!(k, "foo"); - assert_eq!(v, 1); - - assert_pending!(map.poll_next()); - - assert_ok!(tx2.send(2)); - - assert!(map.is_woken()); - let (k, v) = assert_ready_some!(map.poll_next()); - assert_eq!(k, "bar"); - assert_eq!(v, 2); - - assert_pending!(map.poll_next()); - - assert_ok!(tx1.send(3)); - assert_ok!(tx2.send(4)); - - assert!(map.is_woken()); - - // Given the randomization, there is no guarantee what order the values will - // be received in. - let mut v = (0..2) - .map(|_| assert_ready_some!(map.poll_next())) - .collect::>(); - - assert_pending!(map.poll_next()); - - v.sort_unstable(); - assert_eq!(v[0].0, "bar"); - assert_eq!(v[0].1, 4); - assert_eq!(v[1].0, "foo"); - assert_eq!(v[1].1, 3); - - drop(tx1); - assert!(map.is_woken()); - assert_pending!(map.poll_next()); - drop(tx2); - - assert_ready_none!(map.poll_next()); -} - -#[tokio::test] -async fn insert_remove() { - let mut map = task::spawn(StreamMap::new()); - let (tx, rx) = mpsc::unbounded_channel(); - - assert_ready_none!(map.poll_next()); - - assert!(map.insert("foo", rx).is_none()); - let rx = map.remove("foo").unwrap(); - - assert_ok!(tx.send(1)); - - assert!(!map.is_woken()); - assert_ready_none!(map.poll_next()); - - assert!(map.insert("bar", rx).is_none()); - - let v = assert_ready_some!(map.poll_next()); - assert_eq!(v.0, "bar"); - assert_eq!(v.1, 1); - - assert!(map.remove("bar").is_some()); - assert_ready_none!(map.poll_next()); - - assert!(map.is_empty()); - assert_eq!(0, map.len()); -} - -#[tokio::test] -async fn replace() { - let mut map = task::spawn(StreamMap::new()); - let (tx1, rx1) = mpsc::unbounded_channel(); - let (tx2, rx2) = mpsc::unbounded_channel(); - - assert!(map.insert("foo", rx1).is_none()); - - assert_pending!(map.poll_next()); - - let _rx1 = map.insert("foo", rx2).unwrap(); - - assert_pending!(map.poll_next()); - - tx1.send(1).unwrap(); - assert_pending!(map.poll_next()); - - tx2.send(2).unwrap(); - assert!(map.is_woken()); - let v = assert_ready_some!(map.poll_next()); - assert_eq!(v.0, "foo"); - assert_eq!(v.1, 2); -} - -#[test] -fn size_hint_with_upper() { - let mut map = StreamMap::new(); - - map.insert("a", stream::iter(vec![1])); - map.insert("b", stream::iter(vec![1, 2])); - map.insert("c", stream::iter(vec![1, 2, 3])); - - assert_eq!(3, map.len()); - assert!(!map.is_empty()); - - let size_hint = map.size_hint(); - assert_eq!(size_hint, (6, Some(6))); -} - -#[test] -fn size_hint_without_upper() { - let mut map = StreamMap::new(); - - map.insert("a", pin_box(stream::iter(vec![1]))); - map.insert("b", pin_box(stream::iter(vec![1, 2]))); - map.insert("c", pin_box(pending())); - - let size_hint = map.size_hint(); - assert_eq!(size_hint, (3, None)); -} - -#[test] -fn new_capacity_zero() { - let map = StreamMap::<&str, stream::Pending<()>>::new(); - assert_eq!(0, map.capacity()); - - assert!(map.keys().next().is_none()); -} - -#[test] -fn with_capacity() { - let map = StreamMap::<&str, stream::Pending<()>>::with_capacity(10); - assert!(10 <= map.capacity()); - - assert!(map.keys().next().is_none()); -} - -#[test] -fn iter_keys() { - let mut map = StreamMap::new(); - - map.insert("a", pending::()); - map.insert("b", pending()); - map.insert("c", pending()); - - let mut keys = map.keys().collect::>(); - keys.sort_unstable(); - - assert_eq!(&keys[..], &[&"a", &"b", &"c"]); -} - -#[test] -fn iter_values() { - let mut map = StreamMap::new(); - - map.insert("a", stream::iter(vec![1])); - map.insert("b", stream::iter(vec![1, 2])); - map.insert("c", stream::iter(vec![1, 2, 3])); - - let mut size_hints = map.values().map(|s| s.size_hint().0).collect::>(); - - size_hints.sort_unstable(); - - assert_eq!(&size_hints[..], &[1, 2, 3]); -} - -#[test] -fn iter_values_mut() { - let mut map = StreamMap::new(); - - map.insert("a", stream::iter(vec![1])); - map.insert("b", stream::iter(vec![1, 2])); - map.insert("c", stream::iter(vec![1, 2, 3])); - - let mut size_hints = map - .values_mut() - .map(|s: &mut _| s.size_hint().0) - .collect::>(); - - size_hints.sort_unstable(); - - assert_eq!(&size_hints[..], &[1, 2, 3]); -} - -#[test] -fn clear() { - let mut map = task::spawn(StreamMap::new()); - - map.insert("a", stream::iter(vec![1])); - map.insert("b", stream::iter(vec![1, 2])); - map.insert("c", stream::iter(vec![1, 2, 3])); - - assert_ready_some!(map.poll_next()); - - map.clear(); - - assert_ready_none!(map.poll_next()); - assert!(map.is_empty()); -} - -#[test] -fn contains_key_borrow() { - let mut map = StreamMap::new(); - map.insert("foo".to_string(), pending::<()>()); - - assert!(map.contains_key("foo")); -} - -#[test] -fn one_ready_many_none() { - // Run a few times because of randomness - for _ in 0..100 { - let mut map = task::spawn(StreamMap::new()); - - map.insert(0, pin_box(stream::empty())); - map.insert(1, pin_box(stream::empty())); - map.insert(2, pin_box(stream::once("hello"))); - map.insert(3, pin_box(stream::pending())); - - let v = assert_ready_some!(map.poll_next()); - assert_eq!(v, (2, "hello")); - } -} - -proptest::proptest! { - #[test] - fn fuzz_pending_complete_mix(kinds: Vec) { - use std::task::{Context, Poll}; - - struct DidPoll { - did_poll: bool, - inner: T, - } - - impl Stream for DidPoll { - type Item = T::Item; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) - -> Poll> - { - self.did_poll = true; - Pin::new(&mut self.inner).poll_next(cx) - } - } - - for _ in 0..10 { - let mut map = task::spawn(StreamMap::new()); - let mut expect = 0; - - for (i, &is_empty) in kinds.iter().enumerate() { - let inner = if is_empty { - pin_box(stream::empty::<()>()) - } else { - expect += 1; - pin_box(stream::pending::<()>()) - }; - - let stream = DidPoll { - did_poll: false, - inner, - }; - - map.insert(i, stream); - } - - if expect == 0 { - assert_ready_none!(map.poll_next()); - } else { - assert_pending!(map.poll_next()); - - assert_eq!(expect, map.values().count()); - - for stream in map.values() { - assert!(stream.did_poll); - } - } - } - } -} - -fn pin_box + 'static, U>(s: T) -> Pin>> { - Box::pin(s) -} diff --git a/tests/stream_timeout.rs b/tests/stream_timeout.rs deleted file mode 100644 index a787bba..0000000 --- a/tests/stream_timeout.rs +++ /dev/null @@ -1,109 +0,0 @@ -#![cfg(feature = "full")] - -use tokio::stream::{self, StreamExt}; -use tokio::time::{self, sleep, Duration}; -use tokio_test::*; - -use futures::StreamExt as _; - -async fn maybe_sleep(idx: i32) -> i32 { - if idx % 2 == 0 { - sleep(ms(200)).await; - } - idx -} - -fn ms(n: u64) -> Duration { - Duration::from_millis(n) -} - -#[tokio::test] -async fn basic_usage() { - time::pause(); - - // Items 2 and 4 time out. If we run the stream until it completes, - // we end up with the following items: - // - // [Ok(1), Err(Elapsed), Ok(2), Ok(3), Err(Elapsed), Ok(4)] - - let stream = stream::iter(1..=4).then(maybe_sleep).timeout(ms(100)); - let mut stream = task::spawn(stream); - - // First item completes immediately - assert_ready_eq!(stream.poll_next(), Some(Ok(1))); - - // Second item is delayed 200ms, times out after 100ms - assert_pending!(stream.poll_next()); - - time::advance(ms(150)).await; - let v = assert_ready!(stream.poll_next()); - assert!(v.unwrap().is_err()); - - assert_pending!(stream.poll_next()); - - time::advance(ms(100)).await; - assert_ready_eq!(stream.poll_next(), Some(Ok(2))); - - // Third item is ready immediately - assert_ready_eq!(stream.poll_next(), Some(Ok(3))); - - // Fourth item is delayed 200ms, times out after 100ms - assert_pending!(stream.poll_next()); - - time::advance(ms(60)).await; - assert_pending!(stream.poll_next()); // nothing ready yet - - time::advance(ms(60)).await; - let v = assert_ready!(stream.poll_next()); - assert!(v.unwrap().is_err()); // timeout! - - time::advance(ms(120)).await; - assert_ready_eq!(stream.poll_next(), Some(Ok(4))); - - // Done. - assert_ready_eq!(stream.poll_next(), None); -} - -#[tokio::test] -async fn return_elapsed_errors_only_once() { - time::pause(); - - let stream = stream::iter(1..=3).then(maybe_sleep).timeout(ms(50)); - let mut stream = task::spawn(stream); - - // First item completes immediately - assert_ready_eq!(stream.poll_next(), Some(Ok(1))); - - // Second item is delayed 200ms, times out after 50ms. Only one `Elapsed` - // error is returned. - assert_pending!(stream.poll_next()); - // - time::advance(ms(50)).await; - let v = assert_ready!(stream.poll_next()); - assert!(v.unwrap().is_err()); // timeout! - - // deadline elapses again, but no error is returned - time::advance(ms(50)).await; - assert_pending!(stream.poll_next()); - - time::advance(ms(100)).await; - assert_ready_eq!(stream.poll_next(), Some(Ok(2))); - assert_ready_eq!(stream.poll_next(), Some(Ok(3))); - - // Done - assert_ready_eq!(stream.poll_next(), None); -} - -#[tokio::test] -async fn no_timeouts() { - let stream = stream::iter(vec![1, 3, 5]) - .then(maybe_sleep) - .timeout(ms(100)); - - let mut stream = task::spawn(stream); - - assert_ready_eq!(stream.poll_next(), Some(Ok(1))); - assert_ready_eq!(stream.poll_next(), Some(Ok(3))); - assert_ready_eq!(stream.poll_next(), Some(Ok(5))); - assert_ready_eq!(stream.poll_next(), None); -} diff --git a/tests/support/mpsc_stream.rs b/tests/support/mpsc_stream.rs new file mode 100644 index 0000000..aa385a3 --- /dev/null +++ b/tests/support/mpsc_stream.rs @@ -0,0 +1,42 @@ +#![allow(dead_code)] + +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::sync::mpsc::{self, Receiver, Sender, UnboundedReceiver, UnboundedSender}; +use tokio_stream::Stream; + +struct UnboundedStream { + recv: UnboundedReceiver, +} +impl Stream for UnboundedStream { + type Item = T; + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::into_inner(self).recv.poll_recv(cx) + } +} + +pub fn unbounded_channel_stream() -> (UnboundedSender, impl Stream) { + let (tx, rx) = mpsc::unbounded_channel(); + + let stream = UnboundedStream { recv: rx }; + + (tx, stream) +} + +struct BoundedStream { + recv: Receiver, +} +impl Stream for BoundedStream { + type Item = T; + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::into_inner(self).recv.poll_recv(cx) + } +} + +pub fn channel_stream(size: usize) -> (Sender, impl Stream) { + let (tx, rx) = mpsc::channel(size); + + let stream = BoundedStream { recv: rx }; + + (tx, stream) +} diff --git a/tests/sync_broadcast.rs b/tests/sync_broadcast.rs index 84c77a7..5f79800 100644 --- a/tests/sync_broadcast.rs +++ b/tests/sync_broadcast.rs @@ -89,46 +89,6 @@ fn send_two_recv() { assert_empty!(rx2); } -#[tokio::test] -async fn send_recv_into_stream_ready() { - use tokio::stream::StreamExt; - - let (tx, rx) = broadcast::channel::(8); - tokio::pin! { - let rx = rx.into_stream(); - } - - assert_ok!(tx.send(1)); - assert_ok!(tx.send(2)); - - assert_eq!(Some(Ok(1)), rx.next().await); - assert_eq!(Some(Ok(2)), rx.next().await); - - drop(tx); - - assert_eq!(None, rx.next().await); -} - -#[tokio::test] -async fn send_recv_into_stream_pending() { - use tokio::stream::StreamExt; - - let (tx, rx) = broadcast::channel::(8); - - tokio::pin! { - let rx = rx.into_stream(); - } - - let mut recv = task::spawn(rx.next()); - assert_pending!(recv.poll()); - - assert_ok!(tx.send(1)); - - assert!(recv.is_woken()); - let val = assert_ready!(recv.poll()); - assert_eq!(val, Some(Ok(1))); -} - #[test] fn send_recv_bounded() { let (tx, mut rx) = broadcast::channel(16); diff --git a/tests/sync_mpsc.rs b/tests/sync_mpsc.rs index adefcb1..b378e6b 100644 --- a/tests/sync_mpsc.rs +++ b/tests/sync_mpsc.rs @@ -5,7 +5,7 @@ use std::thread; use tokio::runtime::Runtime; use tokio::sync::mpsc; -use tokio::sync::mpsc::error::{TryRecvError, TrySendError}; +use tokio::sync::mpsc::error::TrySendError; use tokio_test::task; use tokio_test::{ assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok, @@ -13,6 +13,10 @@ use tokio_test::{ use std::sync::Arc; +mod support { + pub(crate) mod mpsc_stream; +} + trait AssertSend: Send {} impl AssertSend for mpsc::Sender {} impl AssertSend for mpsc::Receiver {} @@ -80,9 +84,10 @@ async fn reserve_disarm() { #[tokio::test] async fn send_recv_stream_with_buffer() { - use tokio::stream::StreamExt; + use tokio_stream::StreamExt; - let (tx, mut rx) = mpsc::channel::(16); + let (tx, rx) = support::mpsc_stream::channel_stream::(16); + let mut rx = Box::pin(rx); tokio::spawn(async move { assert_ok!(tx.send(1).await); @@ -178,9 +183,11 @@ async fn async_send_recv_unbounded() { #[tokio::test] async fn send_recv_stream_unbounded() { - use tokio::stream::StreamExt; + use tokio_stream::StreamExt; - let (tx, mut rx) = mpsc::unbounded_channel::(); + let (tx, rx) = support::mpsc_stream::unbounded_channel_stream::(); + + let mut rx = Box::pin(rx); tokio::spawn(async move { assert_ok!(tx.send(1)); @@ -385,44 +392,6 @@ fn unconsumed_messages_are_dropped() { assert_eq!(1, Arc::strong_count(&msg)); } -#[test] -fn try_recv() { - let (tx, mut rx) = mpsc::channel(1); - match rx.try_recv() { - Err(TryRecvError::Empty) => {} - _ => panic!(), - } - tx.try_send(42).unwrap(); - match rx.try_recv() { - Ok(42) => {} - _ => panic!(), - } - drop(tx); - match rx.try_recv() { - Err(TryRecvError::Closed) => {} - _ => panic!(), - } -} - -#[test] -fn try_recv_unbounded() { - let (tx, mut rx) = mpsc::unbounded_channel(); - match rx.try_recv() { - Err(TryRecvError::Empty) => {} - _ => panic!(), - } - tx.send(42).unwrap(); - match rx.try_recv() { - Ok(42) => {} - _ => panic!(), - } - drop(tx); - match rx.try_recv() { - Err(TryRecvError::Closed) => {} - _ => panic!(), - } -} - #[test] fn blocking_recv() { let (tx, mut rx) = mpsc::channel::(1); @@ -483,3 +452,22 @@ async fn ready_close_cancel_bounded() { let val = assert_ready!(recv.poll()); assert!(val.is_none()); } + +#[tokio::test] +async fn permit_available_not_acquired_close() { + let (tx1, mut rx) = mpsc::channel::<()>(1); + let tx2 = tx1.clone(); + + let permit1 = assert_ok!(tx1.reserve().await); + + let mut permit2 = task::spawn(tx2.reserve()); + assert_pending!(permit2.poll()); + + rx.close(); + + drop(permit1); + assert!(permit2.is_woken()); + + drop(permit2); + assert!(rx.recv().await.is_none()); +} diff --git a/tests/sync_mutex.rs b/tests/sync_mutex.rs index 96194b3..0ddb203 100644 --- a/tests/sync_mutex.rs +++ b/tests/sync_mutex.rs @@ -91,10 +91,11 @@ async fn aborted_future_1() { let m2 = m1.clone(); // Try to lock mutex in a future that is aborted prematurely timeout(Duration::from_millis(1u64), async move { - let mut iv = interval(Duration::from_millis(1000)); + let iv = interval(Duration::from_millis(1000)); + tokio::pin!(iv); m2.lock().await; - iv.tick().await; - iv.tick().await; + iv.as_mut().tick().await; + iv.as_mut().tick().await; }) .await .unwrap_err(); diff --git a/tests/sync_mutex_owned.rs b/tests/sync_mutex_owned.rs index 394a670..0f1399c 100644 --- a/tests/sync_mutex_owned.rs +++ b/tests/sync_mutex_owned.rs @@ -58,10 +58,11 @@ async fn aborted_future_1() { let m2 = m1.clone(); // Try to lock mutex in a future that is aborted prematurely timeout(Duration::from_millis(1u64), async move { - let mut iv = interval(Duration::from_millis(1000)); + let iv = interval(Duration::from_millis(1000)); + tokio::pin!(iv); m2.lock_owned().await; - iv.tick().await; - iv.tick().await; + iv.as_mut().tick().await; + iv.as_mut().tick().await; }) .await .unwrap_err(); diff --git a/tests/sync_semaphore.rs b/tests/sync_semaphore.rs index 1cb0c74..a33b878 100644 --- a/tests/sync_semaphore.rs +++ b/tests/sync_semaphore.rs @@ -79,3 +79,17 @@ async fn stresstest() { let _p5 = sem.try_acquire().unwrap(); assert!(sem.try_acquire().is_err()); } + +#[test] +fn add_max_amount_permits() { + let s = tokio::sync::Semaphore::new(0); + s.add_permits(usize::MAX >> 3); + assert_eq!(s.available_permits(), usize::MAX >> 3); +} + +#[test] +#[should_panic] +fn add_more_than_max_amount_permits() { + let s = tokio::sync::Semaphore::new(1); + s.add_permits(usize::MAX >> 3); +} diff --git a/tests/task_abort.rs b/tests/task_abort.rs new file mode 100644 index 0000000..e84f19c --- /dev/null +++ b/tests/task_abort.rs @@ -0,0 +1,26 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +/// Checks that a suspended task can be aborted without panicking as reported in +/// issue #3157: . +#[test] +fn test_abort_without_panic_3157() { + let rt = tokio::runtime::Builder::new_multi_thread() + .enable_time() + .worker_threads(1) + .build() + .unwrap(); + + rt.block_on(async move { + let handle = tokio::spawn(async move { + println!("task started"); + tokio::time::sleep(std::time::Duration::new(100, 0)).await + }); + + // wait for task to sleep. + tokio::time::sleep(std::time::Duration::new(1, 0)).await; + + handle.abort(); + let _ = handle.await; + }); +} diff --git a/tests/task_blocking.rs b/tests/task_blocking.rs index eec19cc..82bef8a 100644 --- a/tests/task_blocking.rs +++ b/tests/task_blocking.rs @@ -7,6 +7,10 @@ use tokio_test::assert_ok; use std::thread; use std::time::Duration; +mod support { + pub(crate) mod mpsc_stream; +} + #[tokio::test] async fn basic_blocking() { // Run a few times @@ -165,7 +169,8 @@ fn coop_disabled_in_block_in_place() { .build() .unwrap(); - let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); + let (tx, rx) = support::mpsc_stream::unbounded_channel_stream(); + for i in 0..200 { tx.send(i).unwrap(); } @@ -175,7 +180,7 @@ fn coop_disabled_in_block_in_place() { let jh = tokio::spawn(async move { tokio::task::block_in_place(move || { futures::executor::block_on(async move { - use tokio::stream::StreamExt; + use tokio_stream::StreamExt; assert_eq!(rx.fold(0, |n, _| n + 1).await, 200); }) }) @@ -195,7 +200,8 @@ fn coop_disabled_in_block_in_place_in_block_on() { thread::spawn(move || { let outer = tokio::runtime::Runtime::new().unwrap(); - let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); + let (tx, rx) = support::mpsc_stream::unbounded_channel_stream(); + for i in 0..200 { tx.send(i).unwrap(); } @@ -204,7 +210,7 @@ fn coop_disabled_in_block_in_place_in_block_on() { outer.block_on(async move { tokio::task::block_in_place(move || { futures::executor::block_on(async move { - use tokio::stream::StreamExt; + use tokio_stream::StreamExt; assert_eq!(rx.fold(0, |n, _| n + 1).await, 200); }) }) diff --git a/tests/tcp_accept.rs b/tests/tcp_accept.rs index 4c0d682..5ffb946 100644 --- a/tests/tcp_accept.rs +++ b/tests/tcp_accept.rs @@ -46,7 +46,7 @@ use std::sync::{ Arc, }; use std::task::{Context, Poll}; -use tokio::stream::{Stream, StreamExt}; +use tokio_stream::{Stream, StreamExt}; struct TrackPolls<'a> { npolls: Arc, @@ -88,7 +88,7 @@ async fn no_extra_poll() { assert_eq!(npolls.load(SeqCst), 1); let _ = assert_ok!(TcpStream::connect(&addr).await); - accepted_rx.next().await.unwrap(); + accepted_rx.recv().await.unwrap(); // should have been polled twice more: once to yield Some(), then once to yield Pending assert_eq!(npolls.load(SeqCst), 1 + 2); diff --git a/tests/tcp_connect.rs b/tests/tcp_connect.rs index 44942c4..cbe68fa 100644 --- a/tests/tcp_connect.rs +++ b/tests/tcp_connect.rs @@ -169,7 +169,7 @@ async fn connect_addr_host_str_port_tuple() { #[cfg(target_os = "linux")] mod linux { use tokio::net::{TcpListener, TcpStream}; - use tokio::prelude::*; + use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio_test::assert_ok; use mio::unix::UnixReady; diff --git a/tests/tcp_echo.rs b/tests/tcp_echo.rs index d9cb456..5bb7ff0 100644 --- a/tests/tcp_echo.rs +++ b/tests/tcp_echo.rs @@ -1,8 +1,8 @@ #![warn(rust_2018_idioms)] #![cfg(feature = "full")] +use tokio::io::{self, AsyncReadExt, AsyncWriteExt}; use tokio::net::{TcpListener, TcpStream}; -use tokio::prelude::*; use tokio::sync::oneshot; use tokio_test::assert_ok; diff --git a/tests/tcp_into_std.rs b/tests/tcp_into_std.rs new file mode 100644 index 0000000..a46aace --- /dev/null +++ b/tests/tcp_into_std.rs @@ -0,0 +1,44 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use std::io::Read; +use std::io::Result; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::net::TcpListener; +use tokio::net::TcpStream; + +#[tokio::test] +async fn tcp_into_std() -> Result<()> { + let mut data = [0u8; 12]; + let listener = TcpListener::bind("127.0.0.1:34254").await?; + + let handle = tokio::spawn(async { + let stream: TcpStream = TcpStream::connect("127.0.0.1:34254").await.unwrap(); + stream + }); + + let (tokio_tcp_stream, _) = listener.accept().await?; + let mut std_tcp_stream = tokio_tcp_stream.into_std()?; + std_tcp_stream + .set_nonblocking(false) + .expect("set_nonblocking call failed"); + + let mut client = handle.await.expect("The task being joined has panicked"); + client.write_all(b"Hello world!").await?; + + std_tcp_stream + .read_exact(&mut data) + .expect("std TcpStream read failed!"); + assert_eq!(b"Hello world!", &data); + + // test back to tokio stream + std_tcp_stream + .set_nonblocking(true) + .expect("set_nonblocking call failed"); + let mut tokio_tcp_stream = TcpStream::from_std(std_tcp_stream)?; + client.write_all(b"Hello tokio!").await?; + let _size = tokio_tcp_stream.read_exact(&mut data).await?; + assert_eq!(b"Hello tokio!", &data); + + Ok(()) +} diff --git a/tests/tcp_shutdown.rs b/tests/tcp_shutdown.rs index 615855f..536a161 100644 --- a/tests/tcp_shutdown.rs +++ b/tests/tcp_shutdown.rs @@ -1,9 +1,8 @@ #![warn(rust_2018_idioms)] #![cfg(feature = "full")] -use tokio::io::{self, AsyncWriteExt}; +use tokio::io::{self, AsyncReadExt, AsyncWriteExt}; use tokio::net::{TcpListener, TcpStream}; -use tokio::prelude::*; use tokio_test::assert_ok; #[tokio::test] @@ -16,7 +15,7 @@ async fn shutdown() { assert_ok!(AsyncWriteExt::shutdown(&mut stream).await); - let mut buf = [0; 1]; + let mut buf = [0u8; 1]; let n = assert_ok!(stream.read(&mut buf).await); assert_eq!(n, 0); }); diff --git a/tests/time_interval.rs b/tests/time_interval.rs index 5ac6ae6..a3c7f08 100644 --- a/tests/time_interval.rs +++ b/tests/time_interval.rs @@ -44,20 +44,6 @@ async fn usage() { assert_pending!(poll_next(&mut i)); } -#[tokio::test] -async fn usage_stream() { - use tokio::stream::StreamExt; - - let start = Instant::now(); - let mut interval = time::interval(ms(10)); - - for _ in 0..3 { - interval.next().await.unwrap(); - } - - assert!(start.elapsed() > ms(20)); -} - fn poll_next(interval: &mut task::Spawn) -> Poll { interval.enter(|cx, mut interval| { tokio::pin! { diff --git a/tests/time_pause.rs b/tests/time_pause.rs new file mode 100644 index 0000000..49a7677 --- /dev/null +++ b/tests/time_pause.rs @@ -0,0 +1,33 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use tokio_test::assert_err; + +#[tokio::test] +async fn pause_time_in_main() { + tokio::time::pause(); +} + +#[tokio::test] +async fn pause_time_in_task() { + let t = tokio::spawn(async { + tokio::time::pause(); + }); + + t.await.unwrap(); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +#[should_panic] +async fn pause_time_in_main_threads() { + tokio::time::pause(); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn pause_time_in_spawn_threads() { + let t = tokio::spawn(async { + tokio::time::pause(); + }); + + assert_err!(t.await); +} diff --git a/tests/time_rt.rs b/tests/time_rt.rs index 85db78d..0775343 100644 --- a/tests/time_rt.rs +++ b/tests/time_rt.rs @@ -68,7 +68,7 @@ async fn starving() { } let when = Instant::now() + Duration::from_millis(20); - let starve = Starve(sleep_until(when), 0); + let starve = Starve(Box::pin(sleep_until(when)), 0); starve.await; assert!(Instant::now() >= when); diff --git a/tests/time_sleep.rs b/tests/time_sleep.rs index 955d833..2736258 100644 --- a/tests/time_sleep.rs +++ b/tests/time_sleep.rs @@ -1,6 +1,11 @@ #![warn(rust_2018_idioms)] #![cfg(feature = "full")] +use std::future::Future; +use std::task::Context; + +use futures::task::noop_waker_ref; + use tokio::time::{self, Duration, Instant}; use tokio_test::{assert_pending, assert_ready, task}; @@ -30,6 +35,25 @@ async fn immediate_sleep() { assert_elapsed!(now, 0); } +#[tokio::test] +async fn is_elapsed() { + time::pause(); + + let sleep = time::sleep(Duration::from_millis(50)); + + tokio::pin!(sleep); + + assert!(!sleep.is_elapsed()); + + assert!(futures::poll!(sleep.as_mut()).is_pending()); + + assert!(!sleep.is_elapsed()); + + sleep.as_mut().await; + + assert!(sleep.is_elapsed()); +} + #[tokio::test] async fn delayed_sleep_level_0() { time::pause(); @@ -75,12 +99,12 @@ async fn reset_future_sleep_before_fire() { let now = Instant::now(); - let mut sleep = task::spawn(time::sleep_until(now + ms(100))); + let mut sleep = task::spawn(Box::pin(time::sleep_until(now + ms(100)))); assert_pending!(sleep.poll()); let mut sleep = sleep.into_inner(); - sleep.reset(Instant::now() + ms(200)); + sleep.as_mut().reset(Instant::now() + ms(200)); sleep.await; assert_elapsed!(now, 200); @@ -92,12 +116,12 @@ async fn reset_past_sleep_before_turn() { let now = Instant::now(); - let mut sleep = task::spawn(time::sleep_until(now + ms(100))); + let mut sleep = task::spawn(Box::pin(time::sleep_until(now + ms(100)))); assert_pending!(sleep.poll()); let mut sleep = sleep.into_inner(); - sleep.reset(now + ms(80)); + sleep.as_mut().reset(now + ms(80)); sleep.await; assert_elapsed!(now, 80); @@ -109,14 +133,14 @@ async fn reset_past_sleep_before_fire() { let now = Instant::now(); - let mut sleep = task::spawn(time::sleep_until(now + ms(100))); + let mut sleep = task::spawn(Box::pin(time::sleep_until(now + ms(100)))); assert_pending!(sleep.poll()); let mut sleep = sleep.into_inner(); time::sleep(ms(10)).await; - sleep.reset(now + ms(80)); + sleep.as_mut().reset(now + ms(80)); sleep.await; assert_elapsed!(now, 80); @@ -127,12 +151,12 @@ async fn reset_future_sleep_after_fire() { time::pause(); let now = Instant::now(); - let mut sleep = time::sleep_until(now + ms(100)); + let mut sleep = Box::pin(time::sleep_until(now + ms(100))); - (&mut sleep).await; + sleep.as_mut().await; assert_elapsed!(now, 100); - sleep.reset(now + ms(110)); + sleep.as_mut().reset(now + ms(110)); sleep.await; assert_elapsed!(now, 110); } @@ -143,16 +167,17 @@ async fn reset_sleep_to_past() { let now = Instant::now(); - let mut sleep = task::spawn(time::sleep_until(now + ms(100))); + let mut sleep = task::spawn(Box::pin(time::sleep_until(now + ms(100)))); assert_pending!(sleep.poll()); time::sleep(ms(50)).await; assert!(!sleep.is_woken()); - sleep.reset(now + ms(40)); + sleep.as_mut().reset(now + ms(40)); - assert!(sleep.is_woken()); + // TODO: is this required? + //assert!(sleep.is_woken()); assert_ready!(sleep.poll()); } @@ -167,22 +192,110 @@ fn creating_sleep_outside_of_context() { let _fut = time::sleep_until(now + ms(500)); } -#[should_panic] #[tokio::test] async fn greater_than_max() { const YR_5: u64 = 5 * 365 * 24 * 60 * 60 * 1000; + time::pause(); time::sleep_until(Instant::now() + ms(YR_5)).await; } +#[tokio::test] +async fn short_sleeps() { + for i in 0..10000 { + if (i % 10) == 0 { + eprintln!("=== {}", i); + } + tokio::time::sleep(std::time::Duration::from_millis(0)).await; + } +} + +#[tokio::test] +async fn multi_long_sleeps() { + tokio::time::pause(); + + for _ in 0..5u32 { + tokio::time::sleep(Duration::from_secs( + // about a year + 365 * 24 * 3600, + )) + .await; + } + + let deadline = tokio::time::Instant::now() + + Duration::from_secs( + // about 10 years + 10 * 365 * 24 * 3600, + ); + + tokio::time::sleep_until(deadline).await; + + assert!(tokio::time::Instant::now() >= deadline); +} + +#[tokio::test] +async fn long_sleeps() { + tokio::time::pause(); + + let deadline = tokio::time::Instant::now() + + Duration::from_secs( + // about 10 years + 10 * 365 * 24 * 3600, + ); + + tokio::time::sleep_until(deadline).await; + + assert!(tokio::time::Instant::now() >= deadline); + assert!(tokio::time::Instant::now() <= deadline + Duration::from_millis(1)); +} + +#[tokio::test] +#[should_panic(expected = "Duration too far into the future")] +async fn very_long_sleeps() { + tokio::time::pause(); + + // Some platforms (eg macos) can't represent times this far in the future + if let Some(deadline) = tokio::time::Instant::now().checked_add(Duration::from_secs(1u64 << 62)) + { + tokio::time::sleep_until(deadline).await; + } else { + // make it pass anyway (we can't skip/ignore the test based on the + // result of checked_add) + panic!("Duration too far into the future (test ignored)") + } +} + +#[tokio::test] +async fn reset_after_firing() { + let timer = tokio::time::sleep(std::time::Duration::from_millis(1)); + tokio::pin!(timer); + + let deadline = timer.deadline(); + + timer.as_mut().await; + assert_ready!(timer + .as_mut() + .poll(&mut Context::from_waker(noop_waker_ref()))); + timer + .as_mut() + .reset(tokio::time::Instant::now() + std::time::Duration::from_secs(600)); + + assert_ne!(deadline, timer.deadline()); + + assert_pending!(timer + .as_mut() + .poll(&mut Context::from_waker(noop_waker_ref()))); + assert_pending!(timer + .as_mut() + .poll(&mut Context::from_waker(noop_waker_ref()))); +} + const NUM_LEVELS: usize = 6; const MAX_DURATION: u64 = (1 << (6 * NUM_LEVELS)) - 1; -#[should_panic] #[tokio::test] async fn exactly_max() { - // TODO: this should not panic but `time::ms()` is acting up - // If fixed, make sure to update documentation on `time::sleep` too. + time::pause(); time::sleep(ms(MAX_DURATION)).await; } @@ -195,3 +308,79 @@ async fn no_out_of_bounds_close_to_max() { fn ms(n: u64) -> Duration { Duration::from_millis(n) } + +#[tokio::test] +async fn drop_after_reschedule_at_new_scheduled_time() { + use futures::poll; + + tokio::time::pause(); + + let start = tokio::time::Instant::now(); + + let mut a = Box::pin(tokio::time::sleep(Duration::from_millis(5))); + let mut b = Box::pin(tokio::time::sleep(Duration::from_millis(5))); + let mut c = Box::pin(tokio::time::sleep(Duration::from_millis(10))); + + let _ = poll!(&mut a); + let _ = poll!(&mut b); + let _ = poll!(&mut c); + + b.as_mut().reset(start + Duration::from_millis(10)); + a.await; + + drop(b); +} + +#[tokio::test] +async fn drop_from_wake() { + use std::future::Future; + use std::pin::Pin; + use std::sync::atomic::{AtomicBool, Ordering}; + use std::sync::{Arc, Mutex}; + use std::task::Context; + + let panicked = Arc::new(AtomicBool::new(false)); + let list: Arc>>>> = Arc::new(Mutex::new(Vec::new())); + + let arc_wake = Arc::new(DropWaker(panicked.clone(), list.clone())); + let arc_wake = futures::task::waker(arc_wake); + + tokio::time::pause(); + + let mut lock = list.lock().unwrap(); + + for _ in 0..100 { + let mut timer = Box::pin(tokio::time::sleep(Duration::from_millis(10))); + + let _ = timer.as_mut().poll(&mut Context::from_waker(&arc_wake)); + + lock.push(timer); + } + + drop(lock); + + tokio::time::sleep(Duration::from_millis(11)).await; + + assert!( + !panicked.load(Ordering::SeqCst), + "paniced when dropping timers" + ); + + #[derive(Clone)] + struct DropWaker( + Arc, + Arc>>>>, + ); + + impl futures::task::ArcWake for DropWaker { + fn wake_by_ref(arc_self: &Arc) { + let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { + *arc_self.1.lock().expect("panic in lock") = Vec::new() + })); + + if result.is_err() { + arc_self.0.store(true, Ordering::SeqCst); + } + } + } +} diff --git a/tests/time_throttle.rs b/tests/time_throttle.rs deleted file mode 100644 index c886319..0000000 --- a/tests/time_throttle.rs +++ /dev/null @@ -1,28 +0,0 @@ -#![warn(rust_2018_idioms)] -#![cfg(feature = "full")] - -use tokio::stream::StreamExt; -use tokio::time; -use tokio_test::*; - -use std::time::Duration; - -#[tokio::test] -async fn usage() { - time::pause(); - - let mut stream = task::spawn(futures::stream::repeat(()).throttle(Duration::from_millis(100))); - - assert_ready!(stream.poll_next()); - assert_pending!(stream.poll_next()); - - time::advance(Duration::from_millis(90)).await; - - assert_pending!(stream.poll_next()); - - time::advance(Duration::from_millis(101)).await; - - assert!(stream.is_woken()); - - assert_ready!(stream.poll_next()); -} diff --git a/tests/udp.rs b/tests/udp.rs index 291267e..7cbba1b 100644 --- a/tests/udp.rs +++ b/tests/udp.rs @@ -66,7 +66,7 @@ async fn send_to_recv_from_poll() -> std::io::Result<()> { let receiver = UdpSocket::bind("127.0.0.1:0").await?; let receiver_addr = receiver.local_addr()?; - poll_fn(|cx| sender.poll_send_to(cx, MSG, &receiver_addr)).await?; + poll_fn(|cx| sender.poll_send_to(cx, MSG, receiver_addr)).await?; let mut recv_buf = [0u8; 32]; let mut read = ReadBuf::new(&mut recv_buf); @@ -83,7 +83,7 @@ async fn send_to_peek_from() -> std::io::Result<()> { let receiver = UdpSocket::bind("127.0.0.1:0").await?; let receiver_addr = receiver.local_addr()?; - poll_fn(|cx| sender.poll_send_to(cx, MSG, &receiver_addr)).await?; + poll_fn(|cx| sender.poll_send_to(cx, MSG, receiver_addr)).await?; // peek let mut recv_buf = [0u8; 32]; @@ -111,7 +111,7 @@ async fn send_to_peek_from_poll() -> std::io::Result<()> { let receiver = UdpSocket::bind("127.0.0.1:0").await?; let receiver_addr = receiver.local_addr()?; - poll_fn(|cx| sender.poll_send_to(cx, MSG, &receiver_addr)).await?; + poll_fn(|cx| sender.poll_send_to(cx, MSG, receiver_addr)).await?; let mut recv_buf = [0u8; 32]; let mut read = ReadBuf::new(&mut recv_buf); @@ -192,7 +192,7 @@ async fn split_chan_poll() -> std::io::Result<()> { let (tx, mut rx) = tokio::sync::mpsc::channel::<(Vec, std::net::SocketAddr)>(1_000); tokio::spawn(async move { while let Some((bytes, addr)) = rx.recv().await { - poll_fn(|cx| s.poll_send_to(cx, &bytes, &addr)) + poll_fn(|cx| s.poll_send_to(cx, &bytes, addr)) .await .unwrap(); } @@ -209,7 +209,7 @@ async fn split_chan_poll() -> std::io::Result<()> { // test that we can send a value and get back some response let sender = UdpSocket::bind("127.0.0.1:0").await?; - poll_fn(|cx| sender.poll_send_to(cx, MSG, &addr)).await?; + poll_fn(|cx| sender.poll_send_to(cx, MSG, addr)).await?; let mut recv_buf = [0u8; 32]; let mut read = ReadBuf::new(&mut recv_buf); diff --git a/tests/uds_datagram.rs b/tests/uds_datagram.rs index ec2f6f8..cdabd7b 100644 --- a/tests/uds_datagram.rs +++ b/tests/uds_datagram.rs @@ -2,6 +2,8 @@ #![cfg(feature = "full")] #![cfg(unix)] +use futures::future::poll_fn; +use tokio::io::ReadBuf; use tokio::net::UnixDatagram; use tokio::try_join; @@ -82,6 +84,8 @@ async fn try_send_recv_never_block() -> io::Result<()> { // Send until we hit the OS `net.unix.max_dgram_qlen`. loop { + dgram1.writable().await.unwrap(); + match dgram1.try_send(payload) { Err(err) => match err.kind() { io::ErrorKind::WouldBlock | io::ErrorKind::Other => break, @@ -96,6 +100,7 @@ async fn try_send_recv_never_block() -> io::Result<()> { // Read every dgram we sent. while count > 0 { + dgram2.readable().await.unwrap(); let len = dgram2.try_recv(&mut recv_buf[..])?; assert_eq!(len, payload.len()); assert_eq!(payload, &recv_buf[..len]); @@ -134,3 +139,94 @@ async fn split() -> std::io::Result<()> { Ok(()) } + +#[tokio::test] +async fn send_to_recv_from_poll() -> std::io::Result<()> { + let dir = tempfile::tempdir().unwrap(); + let sender_path = dir.path().join("sender.sock"); + let receiver_path = dir.path().join("receiver.sock"); + + let sender = UnixDatagram::bind(&sender_path)?; + let receiver = UnixDatagram::bind(&receiver_path)?; + + let msg = b"hello"; + poll_fn(|cx| sender.poll_send_to(cx, msg, &receiver_path)).await?; + + let mut recv_buf = [0u8; 32]; + let mut read = ReadBuf::new(&mut recv_buf); + let addr = poll_fn(|cx| receiver.poll_recv_from(cx, &mut read)).await?; + + assert_eq!(read.filled(), msg); + assert_eq!(addr.as_pathname(), Some(sender_path.as_ref())); + Ok(()) +} + +#[tokio::test] +async fn send_recv_poll() -> std::io::Result<()> { + let dir = tempfile::tempdir().unwrap(); + let sender_path = dir.path().join("sender.sock"); + let receiver_path = dir.path().join("receiver.sock"); + + let sender = UnixDatagram::bind(&sender_path)?; + let receiver = UnixDatagram::bind(&receiver_path)?; + + sender.connect(&receiver_path)?; + receiver.connect(&sender_path)?; + + let msg = b"hello"; + poll_fn(|cx| sender.poll_send(cx, msg)).await?; + + let mut recv_buf = [0u8; 32]; + let mut read = ReadBuf::new(&mut recv_buf); + let _len = poll_fn(|cx| receiver.poll_recv(cx, &mut read)).await?; + + assert_eq!(read.filled(), msg); + Ok(()) +} + +#[tokio::test] +async fn try_send_to_recv_from() -> std::io::Result<()> { + let dir = tempfile::tempdir().unwrap(); + let server_path = dir.path().join("server.sock"); + let client_path = dir.path().join("client.sock"); + + // Create listener + let server = UnixDatagram::bind(&server_path)?; + + // Create socket pair + let client = UnixDatagram::bind(&client_path)?; + + for _ in 0..5 { + loop { + client.writable().await?; + + match client.try_send_to(b"hello world", &server_path) { + Ok(n) => { + assert_eq!(n, 11); + break; + } + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue, + Err(e) => panic!("{:?}", e), + } + } + + loop { + server.readable().await?; + + let mut buf = [0; 512]; + + match server.try_recv_from(&mut buf) { + Ok((n, addr)) => { + assert_eq!(n, 11); + assert_eq!(addr.as_pathname(), Some(client_path.as_ref())); + assert_eq!(&buf[0..11], &b"hello world"[..]); + break; + } + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue, + Err(e) => panic!("{:?}", e), + } + } + } + + Ok(()) +} diff --git a/tests/uds_split.rs b/tests/uds_split.rs index 76ff461..8161423 100644 --- a/tests/uds_split.rs +++ b/tests/uds_split.rs @@ -2,8 +2,8 @@ #![cfg(feature = "full")] #![cfg(unix)] +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; use tokio::net::UnixStream; -use tokio::prelude::*; /// Checks that `UnixStream` can be split into a read half and a write half using /// `UnixStream::split` and `UnixStream::split_mut`. diff --git a/tests/uds_stream.rs b/tests/uds_stream.rs index cd557e5..5160f17 100644 --- a/tests/uds_stream.rs +++ b/tests/uds_stream.rs @@ -2,10 +2,14 @@ #![warn(rust_2018_idioms)] #![cfg(unix)] -use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use std::io; +use std::task::Poll; + +use tokio::io::{AsyncReadExt, AsyncWriteExt, Interest}; use tokio::net::{UnixListener, UnixStream}; +use tokio_test::{assert_ok, assert_pending, assert_ready_ok, task}; -use futures::future::try_join; +use futures::future::{poll_fn, try_join}; #[tokio::test] async fn accept_read_write() -> std::io::Result<()> { @@ -56,3 +60,195 @@ async fn shutdown() -> std::io::Result<()> { assert_eq!(n, 0); Ok(()) } + +#[tokio::test] +async fn try_read_write() -> std::io::Result<()> { + let msg = b"hello world"; + + let dir = tempfile::tempdir()?; + let bind_path = dir.path().join("bind.sock"); + + // Create listener + let listener = UnixListener::bind(&bind_path)?; + + // Create socket pair + let client = UnixStream::connect(&bind_path).await?; + + let (server, _) = listener.accept().await?; + let mut written = msg.to_vec(); + + // Track the server receiving data + let mut readable = task::spawn(server.readable()); + assert_pending!(readable.poll()); + + // Write data. + client.writable().await?; + assert_eq!(msg.len(), client.try_write(msg)?); + + // The task should be notified + while !readable.is_woken() { + tokio::task::yield_now().await; + } + + // Fill the write buffer + loop { + // Still ready + let mut writable = task::spawn(client.writable()); + assert_ready_ok!(writable.poll()); + + match client.try_write(msg) { + Ok(n) => written.extend(&msg[..n]), + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + break; + } + Err(e) => panic!("error = {:?}", e), + } + } + + { + // Write buffer full + let mut writable = task::spawn(client.writable()); + assert_pending!(writable.poll()); + + // Drain the socket from the server end + let mut read = vec![0; written.len()]; + let mut i = 0; + + while i < read.len() { + server.readable().await?; + + match server.try_read(&mut read[i..]) { + Ok(n) => i += n, + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue, + Err(e) => panic!("error = {:?}", e), + } + } + + assert_eq!(read, written); + } + + // Now, we listen for shutdown + drop(client); + + loop { + let ready = server.ready(Interest::READABLE).await?; + + if ready.is_read_closed() { + break; + } else { + tokio::task::yield_now().await; + } + } + + Ok(()) +} + +async fn create_pair() -> (UnixStream, UnixStream) { + let dir = assert_ok!(tempfile::tempdir()); + let bind_path = dir.path().join("bind.sock"); + + let listener = assert_ok!(UnixListener::bind(&bind_path)); + + let accept = listener.accept(); + let connect = UnixStream::connect(&bind_path); + let ((server, _), client) = assert_ok!(try_join(accept, connect).await); + + (client, server) +} + +macro_rules! assert_readable_by_polling { + ($stream:expr) => { + assert_ok!(poll_fn(|cx| $stream.poll_read_ready(cx)).await); + }; +} + +macro_rules! assert_not_readable_by_polling { + ($stream:expr) => { + poll_fn(|cx| { + assert_pending!($stream.poll_read_ready(cx)); + Poll::Ready(()) + }) + .await; + }; +} + +macro_rules! assert_writable_by_polling { + ($stream:expr) => { + assert_ok!(poll_fn(|cx| $stream.poll_write_ready(cx)).await); + }; +} + +macro_rules! assert_not_writable_by_polling { + ($stream:expr) => { + poll_fn(|cx| { + assert_pending!($stream.poll_write_ready(cx)); + Poll::Ready(()) + }) + .await; + }; +} + +#[tokio::test] +async fn poll_read_ready() { + let (mut client, mut server) = create_pair().await; + + // Initial state - not readable. + assert_not_readable_by_polling!(server); + + // There is data in the buffer - readable. + assert_ok!(client.write_all(b"ping").await); + assert_readable_by_polling!(server); + + // Readable until calls to `poll_read` return `Poll::Pending`. + let mut buf = [0u8; 4]; + assert_ok!(server.read_exact(&mut buf).await); + assert_readable_by_polling!(server); + read_until_pending(&mut server); + assert_not_readable_by_polling!(server); + + // Detect the client disconnect. + drop(client); + assert_readable_by_polling!(server); +} + +#[tokio::test] +async fn poll_write_ready() { + let (mut client, server) = create_pair().await; + + // Initial state - writable. + assert_writable_by_polling!(client); + + // No space to write - not writable. + write_until_pending(&mut client); + assert_not_writable_by_polling!(client); + + // Detect the server disconnect. + drop(server); + assert_writable_by_polling!(client); +} + +fn read_until_pending(stream: &mut UnixStream) { + let mut buf = vec![0u8; 1024 * 1024]; + loop { + match stream.try_read(&mut buf) { + Ok(_) => (), + Err(err) => { + assert_eq!(err.kind(), io::ErrorKind::WouldBlock); + break; + } + } + } +} + +fn write_until_pending(stream: &mut UnixStream) { + let buf = vec![0u8; 1024 * 1024]; + loop { + match stream.try_write(&buf) { + Ok(_) => (), + Err(err) => { + assert_eq!(err.kind(), io::ErrorKind::WouldBlock); + break; + } + } + } +} -- cgit v1.2.3