diff options
author | Joel Galenson <jgalenson@google.com> | 2021-05-25 14:35:44 +0000 |
---|---|---|
committer | Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com> | 2021-05-25 14:35:44 +0000 |
commit | f8ee0f58266dc150af2b07766eef612908f0c885 (patch) | |
tree | f49654bde67a81be71689ee632d27bcd373dd536 | |
parent | 714e5c0e39012ec93e1e1323aa4fe4b9efef8c58 (diff) | |
parent | 8d095576f9e768d797d86f6b55daf4b19770e8ed (diff) | |
download | futures-f8ee0f58266dc150af2b07766eef612908f0c885.tar.gz |
Upgrade rust/crates/futures to 0.3.15 am: 2259736903 am: 188829b8d1 am: 8d095576f9
Original change: https://android-review.googlesource.com/c/platform/external/rust/crates/futures/+/1712420
Change-Id: I7588542b9fb72be6e648036e0166e545e468cedf
65 files changed, 1646 insertions, 1887 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json index f3ad3ab..ec6442e 100644 --- a/.cargo_vcs_info.json +++ b/.cargo_vcs_info.json @@ -1,5 +1,5 @@ { "git": { - "sha1": "c91f8691672c7401b1923ab00bf138975c99391a" + "sha1": "fc080d153bc7bf00429ec5e2b91e2f21f2243846" } } @@ -67,15 +67,16 @@ rust_library { } // dependent_library ["feature_list"] -// futures-channel-0.3.14 "alloc,futures-sink,sink,std" -// futures-core-0.3.14 "alloc,std" -// futures-executor-0.3.14 "std" -// futures-io-0.3.14 "std" -// futures-macro-0.3.14 -// futures-sink-0.3.14 "alloc,std" -// futures-task-0.3.14 "alloc,std" -// futures-util-0.3.14 "alloc,async-await,async-await-macro,channel,futures-channel,futures-io,futures-macro,futures-sink,io,memchr,proc-macro-hack,proc-macro-nested,sink,slab,std" -// memchr-2.3.4 "default,std" +// autocfg-1.0.1 +// futures-channel-0.3.15 "alloc,futures-sink,sink,std" +// futures-core-0.3.15 "alloc,std" +// futures-executor-0.3.15 "std" +// futures-io-0.3.15 "std" +// futures-macro-0.3.15 +// futures-sink-0.3.15 "alloc,std" +// futures-task-0.3.15 "alloc,std" +// futures-util-0.3.15 "alloc,async-await,async-await-macro,channel,futures-channel,futures-io,futures-macro,futures-sink,io,memchr,proc-macro-hack,proc-macro-nested,sink,slab,std" +// memchr-2.4.0 "default,std" // pin-project-lite-0.2.6 // pin-utils-0.1.0 // proc-macro-hack-0.5.19 @@ -83,5 +84,5 @@ rust_library { // proc-macro2-1.0.26 "default,proc-macro" // quote-1.0.9 "default,proc-macro" // slab-0.4.3 "default,std" -// syn-1.0.70 "clone-impls,default,derive,full,parsing,printing,proc-macro,quote,visit-mut" -// unicode-xid-0.2.1 "default" +// syn-1.0.72 "clone-impls,default,derive,full,parsing,printing,proc-macro,quote,visit-mut" +// unicode-xid-0.2.2 "default" @@ -13,7 +13,7 @@ [package] edition = "2018" name = "futures" -version = "0.3.13" +version = "0.3.15" authors = ["Alex Crichton <alex@alexcrichton.com>"] description = "An implementation of futures and streams featuring zero allocations,\ncomposability, and iterator-like interfaces.\n" homepage = "https://rust-lang.github.io/futures-rs" @@ -30,33 +30,33 @@ rustdoc-args = ["--cfg", "docsrs"] [package.metadata.playground] features = ["std", "async-await", "compat", "io-compat", "executor", "thread-pool"] [dependencies.futures-channel] -version = "0.3.13" +version = "0.3.15" features = ["sink"] default-features = false [dependencies.futures-core] -version = "0.3.13" +version = "0.3.15" default-features = false [dependencies.futures-executor] -version = "0.3.13" +version = "0.3.15" optional = true default-features = false [dependencies.futures-io] -version = "0.3.13" +version = "0.3.15" default-features = false [dependencies.futures-sink] -version = "0.3.13" +version = "0.3.15" default-features = false [dependencies.futures-task] -version = "0.3.13" +version = "0.3.15" default-features = false [dependencies.futures-util] -version = "0.3.13" +version = "0.3.15" features = ["sink"] default-features = false [dev-dependencies.assert_matches] @@ -78,7 +78,7 @@ version = "0.1.11" alloc = ["futures-core/alloc", "futures-task/alloc", "futures-sink/alloc", "futures-channel/alloc", "futures-util/alloc"] async-await = ["futures-util/async-await", "futures-util/async-await-macro"] bilock = ["futures-util/bilock"] -cfg-target-has-atomic = ["futures-core/cfg-target-has-atomic", "futures-task/cfg-target-has-atomic", "futures-channel/cfg-target-has-atomic", "futures-util/cfg-target-has-atomic"] +cfg-target-has-atomic = [] compat = ["std", "futures-util/compat"] default = ["std", "async-await", "executor"] executor = ["std", "futures-executor/std"] diff --git a/Cargo.toml.orig b/Cargo.toml.orig index d046c54..c62e99e 100644 --- a/Cargo.toml.orig +++ b/Cargo.toml.orig @@ -1,7 +1,7 @@ [package] name = "futures" edition = "2018" -version = "0.3.13" +version = "0.3.15" authors = ["Alex Crichton <alex@alexcrichton.com>"] license = "MIT OR Apache-2.0" readme = "../README.md" @@ -16,13 +16,13 @@ composability, and iterator-like interfaces. categories = ["asynchronous"] [dependencies] -futures-core = { path = "../futures-core", version = "0.3.13", default-features = false } -futures-task = { path = "../futures-task", version = "0.3.13", default-features = false } -futures-channel = { path = "../futures-channel", version = "0.3.13", default-features = false, features = ["sink"] } -futures-executor = { path = "../futures-executor", version = "0.3.13", default-features = false, optional = true } -futures-io = { path = "../futures-io", version = "0.3.13", default-features = false } -futures-sink = { path = "../futures-sink", version = "0.3.13", default-features = false } -futures-util = { path = "../futures-util", version = "0.3.13", default-features = false, features = ["sink"] } +futures-core = { path = "../futures-core", version = "0.3.15", default-features = false } +futures-task = { path = "../futures-task", version = "0.3.15", default-features = false } +futures-channel = { path = "../futures-channel", version = "0.3.15", default-features = false, features = ["sink"] } +futures-executor = { path = "../futures-executor", version = "0.3.15", default-features = false, optional = true } +futures-io = { path = "../futures-io", version = "0.3.15", default-features = false } +futures-sink = { path = "../futures-sink", version = "0.3.15", default-features = false } +futures-util = { path = "../futures-util", version = "0.3.15", default-features = false, features = ["sink"] } [dev-dependencies] futures-executor = { path = "../futures-executor", features = ["thread-pool"] } @@ -47,11 +47,14 @@ thread-pool = ["executor", "futures-executor/thread-pool"] # These features are outside of the normal semver guarantees and require the # `unstable` feature as an explicit opt-in to unstable API. unstable = ["futures-core/unstable", "futures-task/unstable", "futures-channel/unstable", "futures-io/unstable", "futures-util/unstable"] -cfg-target-has-atomic = ["futures-core/cfg-target-has-atomic", "futures-task/cfg-target-has-atomic", "futures-channel/cfg-target-has-atomic", "futures-util/cfg-target-has-atomic"] bilock = ["futures-util/bilock"] read-initializer = ["futures-io/read-initializer", "futures-util/read-initializer"] write-all-vectored = ["futures-util/write-all-vectored"] +# These features are no longer used. +# TODO: remove in the next major version. +cfg-target-has-atomic = [] + [package.metadata.docs.rs] all-features = true rustdoc-args = ["--cfg", "docsrs"] @@ -7,13 +7,13 @@ third_party { } url { type: ARCHIVE - value: "https://static.crates.io/crates/futures/futures-0.3.13.crate" + value: "https://static.crates.io/crates/futures/futures-0.3.15.crate" } - version: "0.3.13" + version: "0.3.15" license_type: NOTICE last_upgrade_date { year: 2021 - month: 4 - day: 1 + month: 5 + day: 19 } } diff --git a/TEST_MAPPING b/TEST_MAPPING index ec169ff..d940add 100644 --- a/TEST_MAPPING +++ b/TEST_MAPPING @@ -2,40 +2,142 @@ { "presubmit": [ { - "name": "anyhow_device_test_tests_test_ffi" + "name": "ZipFuseTest" }, { - "name": "anyhow_device_test_tests_test_context" + "name": "anyhow_device_test_src_lib" }, { - "name": "anyhow_device_test_tests_test_repr" + "name": "anyhow_device_test_tests_test_autotrait" + }, + { + "name": "anyhow_device_test_tests_test_boxed" + }, + { + "name": "anyhow_device_test_tests_test_chain" + }, + { + "name": "anyhow_device_test_tests_test_context" }, { "name": "anyhow_device_test_tests_test_convert" }, { + "name": "anyhow_device_test_tests_test_downcast" + }, + { + "name": "anyhow_device_test_tests_test_ffi" + }, + { "name": "anyhow_device_test_tests_test_fmt" }, { - "name": "anyhow_device_test_tests_test_boxed" + "name": "anyhow_device_test_tests_test_macros" }, { - "name": "anyhow_device_test_tests_test_downcast" + "name": "anyhow_device_test_tests_test_repr" }, { "name": "anyhow_device_test_tests_test_source" }, { - "name": "anyhow_device_test_tests_test_macros" + "name": "authfs_device_test_src_lib" }, { - "name": "anyhow_device_test_src_lib" + "name": "tokio_device_test_tests_buffered" }, { - "name": "anyhow_device_test_tests_test_autotrait" + "name": "tokio_device_test_tests_io_async_read" }, { - "name": "anyhow_device_test_tests_test_chain" + "name": "tokio_device_test_tests_io_copy_bidirectional" + }, + { + "name": "tokio_device_test_tests_io_lines" + }, + { + "name": "tokio_device_test_tests_io_mem_stream" + }, + { + "name": "tokio_device_test_tests_io_read" + }, + { + "name": "tokio_device_test_tests_io_read_buf" + }, + { + "name": "tokio_device_test_tests_io_read_to_end" + }, + { + "name": "tokio_device_test_tests_io_take" + }, + { + "name": "tokio_device_test_tests_io_write" + }, + { + "name": "tokio_device_test_tests_io_write_all" + }, + { + "name": "tokio_device_test_tests_io_write_buf" + }, + { + "name": "tokio_device_test_tests_io_write_int" + }, + { + "name": "tokio_device_test_tests_macros_join" + }, + { + "name": "tokio_device_test_tests_no_rt" + }, + { + "name": "tokio_device_test_tests_rt_basic" + }, + { + "name": "tokio_device_test_tests_rt_threaded" + }, + { + "name": "tokio_device_test_tests_sync_barrier" + }, + { + "name": "tokio_device_test_tests_sync_broadcast" + }, + { + "name": "tokio_device_test_tests_sync_errors" + }, + { + "name": "tokio_device_test_tests_sync_mpsc" + }, + { + "name": "tokio_device_test_tests_sync_mutex_owned" + }, + { + "name": "tokio_device_test_tests_sync_rwlock" + }, + { + "name": "tokio_device_test_tests_sync_watch" + }, + { + "name": "tokio_device_test_tests_task_local" + }, + { + "name": "tokio_device_test_tests_task_local_set" + }, + { + "name": "tokio_device_test_tests_tcp_accept" + }, + { + "name": "tokio_device_test_tests_tcp_echo" + }, + { + "name": "tokio_device_test_tests_tcp_into_std" + }, + { + "name": "tokio_device_test_tests_tcp_shutdown" + }, + { + "name": "tokio_device_test_tests_time_rt" + }, + { + "name": "tokio_device_test_tests_uds_split" } ] } @@ -3,12 +3,12 @@ //! This crate provides a number of core abstractions for writing asynchronous //! code: //! -//! - [Futures](crate::future::Future) are single eventual values produced by +//! - [Futures](crate::future) are single eventual values produced by //! asynchronous computations. Some programming languages (e.g. JavaScript) //! call this concept "promise". -//! - [Streams](crate::stream::Stream) represent a series of values +//! - [Streams](crate::stream) represent a series of values //! produced asynchronously. -//! - [Sinks](crate::sink::Sink) provide support for asynchronous writing of +//! - [Sinks](crate::sink) provide support for asynchronous writing of //! data. //! - [Executors](crate::executor) are responsible for running asynchronous //! tasks. @@ -29,7 +29,7 @@ //! # use futures::executor; ///standard executors to provide a context for futures and streams //! # use futures::executor::ThreadPool; //! # use futures::StreamExt; -//! +//! # //! fn main() { //! let pool = ThreadPool::new().expect("Failed to build pool"); //! let (tx, rx) = mpsc::unbounded::<i32>(); @@ -78,22 +78,15 @@ //! The majority of examples and code snippets in this crate assume that they are //! inside an async block as written above. -#![cfg_attr(feature = "cfg-target-has-atomic", feature(cfg_target_has_atomic))] #![cfg_attr(feature = "read-initializer", feature(read_initializer))] - #![cfg_attr(not(feature = "std"), no_std)] - #![warn(missing_docs, missing_debug_implementations, rust_2018_idioms, unreachable_pub)] // It cannot be included in the published code because this lints have false positives in the minimum required version. #![cfg_attr(test, warn(single_use_lifetimes))] #![warn(clippy::all)] #![doc(test(attr(deny(warnings), allow(dead_code, unused_assignments, unused_variables))))] - #![cfg_attr(docsrs, feature(doc_cfg))] -#[cfg(all(feature = "cfg-target-has-atomic", not(feature = "unstable")))] -compile_error!("The `cfg-target-has-atomic` feature requires the `unstable` feature as an explicit opt-in to unstable features"); - #[cfg(all(feature = "bilock", not(feature = "unstable")))] compile_error!("The `bilock` feature requires the `unstable` feature as an explicit opt-in to unstable features"); diff --git a/tests/_require_features.rs b/tests/_require_features.rs index da76dcd..8046cc9 100644 --- a/tests/_require_features.rs +++ b/tests/_require_features.rs @@ -1,8 +1,13 @@ #[cfg(not(all( - feature = "std", feature = "alloc", feature = "async-await", - feature = "compat", feature = "io-compat", - feature = "executor", feature = "thread-pool", + feature = "std", + feature = "alloc", + feature = "async-await", + feature = "compat", + feature = "io-compat", + feature = "executor", + feature = "thread-pool", )))] -compile_error!("`futures` tests must have all stable features activated: \ +compile_error!( + "`futures` tests must have all stable features activated: \ use `--all-features` or `--features default,thread-pool,io-compat`" ); diff --git a/tests/arc_wake.rs b/tests/arc_wake.rs deleted file mode 100644 index d19a83d..0000000 --- a/tests/arc_wake.rs +++ /dev/null @@ -1,82 +0,0 @@ -mod countingwaker { - use futures::task::{self, ArcWake, Waker}; - use std::sync::{Arc, Mutex}; - - struct CountingWaker { - nr_wake: Mutex<i32>, - } - - impl CountingWaker { - fn new() -> Self { - Self { - nr_wake: Mutex::new(0), - } - } - - fn wakes(&self) -> i32 { - *self.nr_wake.lock().unwrap() - } - } - - impl ArcWake for CountingWaker { - fn wake_by_ref(arc_self: &Arc<Self>) { - let mut lock = arc_self.nr_wake.lock().unwrap(); - *lock += 1; - } - } - - #[test] - fn create_from_arc() { - let some_w = Arc::new(CountingWaker::new()); - - let w1: Waker = task::waker(some_w.clone()); - assert_eq!(2, Arc::strong_count(&some_w)); - w1.wake_by_ref(); - assert_eq!(1, some_w.wakes()); - - let w2 = w1.clone(); - assert_eq!(3, Arc::strong_count(&some_w)); - - w2.wake_by_ref(); - assert_eq!(2, some_w.wakes()); - - drop(w2); - assert_eq!(2, Arc::strong_count(&some_w)); - drop(w1); - assert_eq!(1, Arc::strong_count(&some_w)); - } - - #[test] - fn ref_wake_same() { - let some_w = Arc::new(CountingWaker::new()); - - let w1: Waker = task::waker(some_w.clone()); - let w2 = task::waker_ref(&some_w); - let w3 = w2.clone(); - - assert!(w1.will_wake(&w2)); - assert!(w2.will_wake(&w3)); - } -} - -#[test] -fn proper_refcount_on_wake_panic() { - use futures::task::{self, ArcWake, Waker}; - use std::sync::Arc; - - struct PanicWaker; - - impl ArcWake for PanicWaker { - fn wake_by_ref(_arc_self: &Arc<Self>) { - panic!("WAKE UP"); - } - } - - let some_w = Arc::new(PanicWaker); - - let w1: Waker = task::waker(some_w.clone()); - assert_eq!("WAKE UP", *std::panic::catch_unwind(|| w1.wake_by_ref()).unwrap_err().downcast::<&str>().unwrap()); - assert_eq!(2, Arc::strong_count(&some_w)); // some_w + w1 - drop(w1); - assert_eq!(1, Arc::strong_count(&some_w)); // some_w -} diff --git a/tests/async_await_macros.rs b/tests/async_await_macros.rs index bd586d6..3e461b5 100644 --- a/tests/async_await_macros.rs +++ b/tests/async_await_macros.rs @@ -1,9 +1,14 @@ +use futures::channel::{mpsc, oneshot}; +use futures::executor::block_on; +use futures::future::{self, poll_fn, FutureExt}; +use futures::sink::SinkExt; +use futures::stream::StreamExt; +use futures::task::{Context, Poll}; +use futures::{join, pending, pin_mut, poll, select, select_biased, try_join}; +use std::mem; + #[test] fn poll_and_pending() { - use futures::{pending, pin_mut, poll}; - use futures::executor::block_on; - use futures::task::Poll; - let pending_once = async { pending!() }; block_on(async { pin_mut!(pending_once); @@ -14,11 +19,6 @@ fn poll_and_pending() { #[test] fn join() { - use futures::{pin_mut, poll, join}; - use futures::channel::oneshot; - use futures::executor::block_on; - use futures::task::Poll; - let (tx1, rx1) = oneshot::channel::<i32>(); let (tx2, rx2) = oneshot::channel::<i32>(); @@ -39,11 +39,6 @@ fn join() { #[test] fn select() { - use futures::select; - use futures::channel::oneshot; - use futures::executor::block_on; - use futures::future::FutureExt; - let (tx1, rx1) = oneshot::channel::<i32>(); let (_tx2, rx2) = oneshot::channel::<i32>(); tx1.send(1).unwrap(); @@ -62,11 +57,6 @@ fn select() { #[test] fn select_biased() { - use futures::channel::oneshot; - use futures::executor::block_on; - use futures::future::FutureExt; - use futures::select_biased; - let (tx1, rx1) = oneshot::channel::<i32>(); let (_tx2, rx2) = oneshot::channel::<i32>(); tx1.send(1).unwrap(); @@ -85,12 +75,6 @@ fn select_biased() { #[test] fn select_streams() { - use futures::select; - use futures::channel::mpsc; - use futures::executor::block_on; - use futures::sink::SinkExt; - use futures::stream::StreamExt; - let (mut tx1, rx1) = mpsc::channel::<i32>(1); let (mut tx2, rx2) = mpsc::channel::<i32>(1); let mut rx1 = rx1.fuse(); @@ -134,11 +118,6 @@ fn select_streams() { #[test] fn select_can_move_uncompleted_futures() { - use futures::select; - use futures::channel::oneshot; - use futures::executor::block_on; - use futures::future::FutureExt; - let (tx1, rx1) = oneshot::channel::<i32>(); let (tx2, rx2) = oneshot::channel::<i32>(); tx1.send(1).unwrap(); @@ -165,10 +144,6 @@ fn select_can_move_uncompleted_futures() { #[test] fn select_nested() { - use futures::select; - use futures::executor::block_on; - use futures::future; - let mut outer_fut = future::ready(1); let mut inner_fut = future::ready(2); let res = block_on(async { @@ -185,16 +160,13 @@ fn select_nested() { #[test] fn select_size() { - use futures::select; - use futures::future; - let fut = async { let mut ready = future::ready(0i32); select! { _ = ready => {}, } }; - assert_eq!(::std::mem::size_of_val(&fut), 24); + assert_eq!(mem::size_of_val(&fut), 24); let fut = async { let mut ready1 = future::ready(0i32); @@ -204,19 +176,13 @@ fn select_size() { _ = ready2 => {}, } }; - assert_eq!(::std::mem::size_of_val(&fut), 40); + assert_eq!(mem::size_of_val(&fut), 40); } #[test] fn select_on_non_unpin_expressions() { - use futures::select; - use futures::executor::block_on; - use futures::future::FutureExt; - // The returned Future is !Unpin - let make_non_unpin_fut = || { async { - 5 - }}; + let make_non_unpin_fut = || async { 5 }; let res = block_on(async { let select_res; @@ -231,14 +197,8 @@ fn select_on_non_unpin_expressions() { #[test] fn select_on_non_unpin_expressions_with_default() { - use futures::select; - use futures::executor::block_on; - use futures::future::FutureExt; - // The returned Future is !Unpin - let make_non_unpin_fut = || { async { - 5 - }}; + let make_non_unpin_fut = || async { 5 }; let res = block_on(async { let select_res; @@ -254,13 +214,8 @@ fn select_on_non_unpin_expressions_with_default() { #[test] fn select_on_non_unpin_size() { - use futures::select; - use futures::future::FutureExt; - // The returned Future is !Unpin - let make_non_unpin_fut = || { async { - 5 - }}; + let make_non_unpin_fut = || async { 5 }; let fut = async { let select_res; @@ -271,15 +226,11 @@ fn select_on_non_unpin_size() { select_res }; - assert_eq!(32, std::mem::size_of_val(&fut)); + assert_eq!(32, mem::size_of_val(&fut)); } #[test] fn select_can_be_used_as_expression() { - use futures::select; - use futures::executor::block_on; - use futures::future; - block_on(async { let res = select! { x = future::ready(7) => x, @@ -291,11 +242,6 @@ fn select_can_be_used_as_expression() { #[test] fn select_with_default_can_be_used_as_expression() { - use futures::select; - use futures::executor::block_on; - use futures::future::{FutureExt, poll_fn}; - use futures::task::{Context, Poll}; - fn poll_always_pending<T>(_cx: &mut Context<'_>) -> Poll<T> { Poll::Pending } @@ -312,10 +258,6 @@ fn select_with_default_can_be_used_as_expression() { #[test] fn select_with_complete_can_be_used_as_expression() { - use futures::select; - use futures::executor::block_on; - use futures::future; - block_on(async { let res = select! { x = future::pending::<i32>() => x, @@ -330,10 +272,6 @@ fn select_with_complete_can_be_used_as_expression() { #[test] #[allow(unused_assignments)] fn select_on_mutable_borrowing_future_with_same_borrow_in_block() { - use futures::select; - use futures::executor::block_on; - use futures::future::FutureExt; - async fn require_mutable(_: &mut i32) {} async fn async_noop() {} @@ -351,10 +289,6 @@ fn select_on_mutable_borrowing_future_with_same_borrow_in_block() { #[test] #[allow(unused_assignments)] fn select_on_mutable_borrowing_future_with_same_borrow_in_block_and_default() { - use futures::select; - use futures::executor::block_on; - use futures::future::FutureExt; - async fn require_mutable(_: &mut i32) {} async fn async_noop() {} @@ -374,59 +308,42 @@ fn select_on_mutable_borrowing_future_with_same_borrow_in_block_and_default() { #[test] fn join_size() { - use futures::join; - use futures::future; - let fut = async { let ready = future::ready(0i32); join!(ready) }; - assert_eq!(::std::mem::size_of_val(&fut), 16); + assert_eq!(mem::size_of_val(&fut), 16); let fut = async { let ready1 = future::ready(0i32); let ready2 = future::ready(0i32); join!(ready1, ready2) }; - assert_eq!(::std::mem::size_of_val(&fut), 28); + assert_eq!(mem::size_of_val(&fut), 28); } #[test] fn try_join_size() { - use futures::try_join; - use futures::future; - let fut = async { let ready = future::ready(Ok::<i32, i32>(0)); try_join!(ready) }; - assert_eq!(::std::mem::size_of_val(&fut), 16); + assert_eq!(mem::size_of_val(&fut), 16); let fut = async { let ready1 = future::ready(Ok::<i32, i32>(0)); let ready2 = future::ready(Ok::<i32, i32>(0)); try_join!(ready1, ready2) }; - assert_eq!(::std::mem::size_of_val(&fut), 28); + assert_eq!(mem::size_of_val(&fut), 28); } #[test] fn join_doesnt_require_unpin() { - use futures::join; - - let _ = async { - join!(async {}, async {}) - }; + let _ = async { join!(async {}, async {}) }; } #[test] fn try_join_doesnt_require_unpin() { - use futures::try_join; - - let _ = async { - try_join!( - async { Ok::<(), ()>(()) }, - async { Ok::<(), ()>(()) }, - ) - }; + let _ = async { try_join!(async { Ok::<(), ()>(()) }, async { Ok::<(), ()>(()) },) }; } diff --git a/tests/auto_traits.rs b/tests/auto_traits.rs index 111fdf6..e0192a1 100644 --- a/tests/auto_traits.rs +++ b/tests/auto_traits.rs @@ -1383,6 +1383,26 @@ pub mod stream { assert_impl!(Next<'_, ()>: Unpin); assert_not_impl!(Next<'_, PhantomPinned>: Unpin); + assert_impl!(NextIf<'_, SendStream<()>, ()>: Send); + assert_not_impl!(NextIf<'_, SendStream<()>, *const ()>: Send); + assert_not_impl!(NextIf<'_, SendStream, ()>: Send); + assert_not_impl!(NextIf<'_, LocalStream<()>, ()>: Send); + assert_impl!(NextIf<'_, SyncStream<()>, ()>: Sync); + assert_not_impl!(NextIf<'_, SyncStream<()>, *const ()>: Sync); + assert_not_impl!(NextIf<'_, SyncStream, ()>: Sync); + assert_not_impl!(NextIf<'_, LocalStream<()>, ()>: Send); + assert_impl!(NextIf<'_, PinnedStream, PhantomPinned>: Unpin); + + assert_impl!(NextIfEq<'_, SendStream<()>, ()>: Send); + assert_not_impl!(NextIfEq<'_, SendStream<()>, *const ()>: Send); + assert_not_impl!(NextIfEq<'_, SendStream, ()>: Send); + assert_not_impl!(NextIfEq<'_, LocalStream<()>, ()>: Send); + assert_impl!(NextIfEq<'_, SyncStream<()>, ()>: Sync); + assert_not_impl!(NextIfEq<'_, SyncStream<()>, *const ()>: Sync); + assert_not_impl!(NextIfEq<'_, SyncStream, ()>: Sync); + assert_not_impl!(NextIfEq<'_, LocalStream<()>, ()>: Send); + assert_impl!(NextIfEq<'_, PinnedStream, PhantomPinned>: Unpin); + assert_impl!(Once<()>: Send); assert_not_impl!(Once<*const ()>: Send); assert_impl!(Once<()>: Sync); @@ -1780,25 +1800,40 @@ pub mod stream { assert_not_impl!(Zip<UnpinStream, PinnedStream>: Unpin); assert_not_impl!(Zip<PinnedStream, UnpinStream>: Unpin); - assert_not_impl!(futures_unordered::Iter<()>: Send); - assert_not_impl!(futures_unordered::Iter<()>: Sync); + assert_impl!(futures_unordered::Iter<()>: Send); + assert_not_impl!(futures_unordered::Iter<*const ()>: Send); + assert_impl!(futures_unordered::Iter<()>: Sync); + assert_not_impl!(futures_unordered::Iter<*const ()>: Sync); assert_impl!(futures_unordered::Iter<()>: Unpin); - // futures_unordered::Iter requires `Fut: Unpin` + // The definition of futures_unordered::Iter has `Fut: Unpin` bounds. // assert_not_impl!(futures_unordered::Iter<PhantomPinned>: Unpin); - assert_not_impl!(futures_unordered::IterMut<()>: Send); - assert_not_impl!(futures_unordered::IterMut<()>: Sync); + assert_impl!(futures_unordered::IterMut<()>: Send); + assert_not_impl!(futures_unordered::IterMut<*const ()>: Send); + assert_impl!(futures_unordered::IterMut<()>: Sync); + assert_not_impl!(futures_unordered::IterMut<*const ()>: Sync); assert_impl!(futures_unordered::IterMut<()>: Unpin); - // futures_unordered::IterMut requires `Fut: Unpin` + // The definition of futures_unordered::IterMut has `Fut: Unpin` bounds. // assert_not_impl!(futures_unordered::IterMut<PhantomPinned>: Unpin); - assert_not_impl!(futures_unordered::IterPinMut<()>: Send); - assert_not_impl!(futures_unordered::IterPinMut<()>: Sync); + assert_impl!(futures_unordered::IterPinMut<()>: Send); + assert_not_impl!(futures_unordered::IterPinMut<*const ()>: Send); + assert_impl!(futures_unordered::IterPinMut<()>: Sync); + assert_not_impl!(futures_unordered::IterPinMut<*const ()>: Sync); assert_impl!(futures_unordered::IterPinMut<PhantomPinned>: Unpin); - assert_not_impl!(futures_unordered::IterPinRef<()>: Send); - assert_not_impl!(futures_unordered::IterPinRef<()>: Sync); + assert_impl!(futures_unordered::IterPinRef<()>: Send); + assert_not_impl!(futures_unordered::IterPinRef<*const ()>: Send); + assert_impl!(futures_unordered::IterPinRef<()>: Sync); + assert_not_impl!(futures_unordered::IterPinRef<*const ()>: Sync); assert_impl!(futures_unordered::IterPinRef<PhantomPinned>: Unpin); + + assert_impl!(futures_unordered::IntoIter<()>: Send); + assert_not_impl!(futures_unordered::IntoIter<*const ()>: Send); + assert_impl!(futures_unordered::IntoIter<()>: Sync); + assert_not_impl!(futures_unordered::IntoIter<*const ()>: Sync); + // The definition of futures_unordered::IntoIter has `Fut: Unpin` bounds. + // assert_not_impl!(futures_unordered::IntoIter<PhantomPinned>: Unpin); } /// Assert Send/Sync/Unpin for all public types in `futures::task`. diff --git a/tests/compat.rs b/tests/compat.rs index 39adc7c..c4125d8 100644 --- a/tests/compat.rs +++ b/tests/compat.rs @@ -1,16 +1,14 @@ #![cfg(feature = "compat")] -use tokio::timer::Delay; -use tokio::runtime::Runtime; -use std::time::Instant; -use futures::prelude::*; use futures::compat::Future01CompatExt; +use futures::prelude::*; +use std::time::Instant; +use tokio::runtime::Runtime; +use tokio::timer::Delay; #[test] fn can_use_01_futures_in_a_03_future_running_on_a_01_executor() { - let f = async { - Delay::new(Instant::now()).compat().await - }; + let f = async { Delay::new(Instant::now()).compat().await }; let mut runtime = Runtime::new().unwrap(); runtime.block_on(f.boxed().compat()).unwrap(); diff --git a/tests/eager_drop.rs b/tests/eager_drop.rs index 11edb1b..9925077 100644 --- a/tests/eager_drop.rs +++ b/tests/eager_drop.rs @@ -1,16 +1,23 @@ +use futures::channel::oneshot; +use futures::future::{self, Future, FutureExt, TryFutureExt}; +use futures::task::{Context, Poll}; +use futures_test::future::FutureTestExt; +use pin_project::pin_project; +use std::pin::Pin; +use std::sync::mpsc; + #[test] fn map_ok() { - use futures::future::{self, FutureExt, TryFutureExt}; - use futures_test::future::FutureTestExt; - use std::sync::mpsc; - // The closure given to `map_ok` should have been dropped by the time `map` // runs. let (tx1, rx1) = mpsc::channel::<()>(); let (tx2, rx2) = mpsc::channel::<()>(); future::ready::<Result<i32, i32>>(Err(1)) - .map_ok(move |_| { let _tx1 = tx1; panic!("should not run"); }) + .map_ok(move |_| { + let _tx1 = tx1; + panic!("should not run"); + }) .map(move |_| { assert!(rx1.recv().is_err()); tx2.send(()).unwrap() @@ -22,17 +29,16 @@ fn map_ok() { #[test] fn map_err() { - use futures::future::{self, FutureExt, TryFutureExt}; - use futures_test::future::FutureTestExt; - use std::sync::mpsc; - // The closure given to `map_err` should have been dropped by the time `map` // runs. let (tx1, rx1) = mpsc::channel::<()>(); let (tx2, rx2) = mpsc::channel::<()>(); future::ready::<Result<i32, i32>>(Ok(1)) - .map_err(move |_| { let _tx1 = tx1; panic!("should not run"); }) + .map_err(move |_| { + let _tx1 = tx1; + panic!("should not run"); + }) .map(move |_| { assert!(rx1.recv().is_err()); tx2.send(()).unwrap() @@ -42,96 +48,74 @@ fn map_err() { rx2.recv().unwrap(); } -mod channelled { - use futures::future::Future; - use futures::task::{Context,Poll}; - use pin_project::pin_project; - use std::pin::Pin; - - #[pin_project] - struct FutureData<F, T> { - _data: T, - #[pin] - future: F, - } +#[pin_project] +struct FutureData<F, T> { + _data: T, + #[pin] + future: F, +} - impl<F: Future, T: Send + 'static> Future for FutureData<F, T> { - type Output = F::Output; +impl<F: Future, T: Send + 'static> Future for FutureData<F, T> { + type Output = F::Output; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F::Output> { - self.project().future.poll(cx) - } + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F::Output> { + self.project().future.poll(cx) } +} - #[test] - fn then_drops_eagerly() { - use futures::channel::oneshot; - use futures::future::{self, FutureExt, TryFutureExt}; - use futures_test::future::FutureTestExt; - use std::sync::mpsc; - - let (tx0, rx0) = oneshot::channel::<()>(); - let (tx1, rx1) = mpsc::channel::<()>(); - let (tx2, rx2) = mpsc::channel::<()>(); - - FutureData { _data: tx1, future: rx0.unwrap_or_else(|_| { panic!() }) } - .then(move |_| { - assert!(rx1.recv().is_err()); // tx1 should have been dropped - tx2.send(()).unwrap(); - future::ready(()) - }) - .run_in_background(); - - assert_eq!(Err(mpsc::TryRecvError::Empty), rx2.try_recv()); - tx0.send(()).unwrap(); - rx2.recv().unwrap(); - } +#[test] +fn then_drops_eagerly() { + let (tx0, rx0) = oneshot::channel::<()>(); + let (tx1, rx1) = mpsc::channel::<()>(); + let (tx2, rx2) = mpsc::channel::<()>(); - #[test] - fn and_then_drops_eagerly() { - use futures::channel::oneshot; - use futures::future::{self, TryFutureExt}; - use futures_test::future::FutureTestExt; - use std::sync::mpsc; - - let (tx0, rx0) = oneshot::channel::<Result<(), ()>>(); - let (tx1, rx1) = mpsc::channel::<()>(); - let (tx2, rx2) = mpsc::channel::<()>(); - - FutureData { _data: tx1, future: rx0.unwrap_or_else(|_| { panic!() }) } - .and_then(move |_| { - assert!(rx1.recv().is_err()); // tx1 should have been dropped - tx2.send(()).unwrap(); - future::ready(Ok(())) - }) - .run_in_background(); - - assert_eq!(Err(mpsc::TryRecvError::Empty), rx2.try_recv()); - tx0.send(Ok(())).unwrap(); - rx2.recv().unwrap(); - } + FutureData { _data: tx1, future: rx0.unwrap_or_else(|_| panic!()) } + .then(move |_| { + assert!(rx1.recv().is_err()); // tx1 should have been dropped + tx2.send(()).unwrap(); + future::ready(()) + }) + .run_in_background(); - #[test] - fn or_else_drops_eagerly() { - use futures::channel::oneshot; - use futures::future::{self, TryFutureExt}; - use futures_test::future::FutureTestExt; - use std::sync::mpsc; - - let (tx0, rx0) = oneshot::channel::<Result<(), ()>>(); - let (tx1, rx1) = mpsc::channel::<()>(); - let (tx2, rx2) = mpsc::channel::<()>(); - - FutureData { _data: tx1, future: rx0.unwrap_or_else(|_| { panic!() }) } - .or_else(move |_| { - assert!(rx1.recv().is_err()); // tx1 should have been dropped - tx2.send(()).unwrap(); - future::ready::<Result<(), ()>>(Ok(())) - }) - .run_in_background(); - - assert_eq!(Err(mpsc::TryRecvError::Empty), rx2.try_recv()); - tx0.send(Err(())).unwrap(); - rx2.recv().unwrap(); - } + assert_eq!(Err(mpsc::TryRecvError::Empty), rx2.try_recv()); + tx0.send(()).unwrap(); + rx2.recv().unwrap(); +} + +#[test] +fn and_then_drops_eagerly() { + let (tx0, rx0) = oneshot::channel::<Result<(), ()>>(); + let (tx1, rx1) = mpsc::channel::<()>(); + let (tx2, rx2) = mpsc::channel::<()>(); + + FutureData { _data: tx1, future: rx0.unwrap_or_else(|_| panic!()) } + .and_then(move |_| { + assert!(rx1.recv().is_err()); // tx1 should have been dropped + tx2.send(()).unwrap(); + future::ready(Ok(())) + }) + .run_in_background(); + + assert_eq!(Err(mpsc::TryRecvError::Empty), rx2.try_recv()); + tx0.send(Ok(())).unwrap(); + rx2.recv().unwrap(); +} + +#[test] +fn or_else_drops_eagerly() { + let (tx0, rx0) = oneshot::channel::<Result<(), ()>>(); + let (tx1, rx1) = mpsc::channel::<()>(); + let (tx2, rx2) = mpsc::channel::<()>(); + + FutureData { _data: tx1, future: rx0.unwrap_or_else(|_| panic!()) } + .or_else(move |_| { + assert!(rx1.recv().is_err()); // tx1 should have been dropped + tx2.send(()).unwrap(); + future::ready::<Result<(), ()>>(Ok(())) + }) + .run_in_background(); + + assert_eq!(Err(mpsc::TryRecvError::Empty), rx2.try_recv()); + tx0.send(Err(())).unwrap(); + rx2.recv().unwrap(); } diff --git a/tests/abortable.rs b/tests/future_abortable.rs index 6b5a25c..e119f0b 100644 --- a/tests/abortable.rs +++ b/tests/future_abortable.rs @@ -1,45 +1,44 @@ +use futures::channel::oneshot; +use futures::executor::block_on; +use futures::future::{abortable, Aborted, FutureExt}; +use futures::task::{Context, Poll}; +use futures_test::task::new_count_waker; + #[test] fn abortable_works() { - use futures::channel::oneshot; - use futures::future::{abortable, Aborted}; - use futures::executor::block_on; - let (_tx, a_rx) = oneshot::channel::<()>(); let (abortable_rx, abort_handle) = abortable(a_rx); abort_handle.abort(); + assert!(abortable_rx.is_aborted()); assert_eq!(Err(Aborted), block_on(abortable_rx)); } #[test] fn abortable_awakens() { - use futures::channel::oneshot; - use futures::future::{abortable, Aborted, FutureExt}; - use futures::task::{Context, Poll}; - use futures_test::task::new_count_waker; - let (_tx, a_rx) = oneshot::channel::<()>(); let (mut abortable_rx, abort_handle) = abortable(a_rx); let (waker, counter) = new_count_waker(); let mut cx = Context::from_waker(&waker); + assert_eq!(counter, 0); assert_eq!(Poll::Pending, abortable_rx.poll_unpin(&mut cx)); assert_eq!(counter, 0); + abort_handle.abort(); assert_eq!(counter, 1); + assert!(abortable_rx.is_aborted()); assert_eq!(Poll::Ready(Err(Aborted)), abortable_rx.poll_unpin(&mut cx)); } #[test] fn abortable_resolves() { - use futures::channel::oneshot; - use futures::future::abortable; - use futures::executor::block_on; let (tx, a_rx) = oneshot::channel::<()>(); let (abortable_rx, _abort_handle) = abortable(a_rx); tx.send(()).unwrap(); + assert!(!abortable_rx.is_aborted()); assert_eq!(Ok(Ok(())), block_on(abortable_rx)); } diff --git a/tests/basic_combinators.rs b/tests/future_basic_combinators.rs index fa65b6f..372ab48 100644 --- a/tests/basic_combinators.rs +++ b/tests/future_basic_combinators.rs @@ -13,17 +13,21 @@ fn basic_future_combinators() { tx1.send(x).unwrap(); // Send 1 tx1.send(2).unwrap(); // Send 2 future::ready(3) - }).map(move |x| { + }) + .map(move |x| { tx2.send(x).unwrap(); // Send 3 tx2.send(4).unwrap(); // Send 4 5 - }).map(move |x| { + }) + .map(move |x| { tx3.send(x).unwrap(); // Send 5 }); assert!(rx.try_recv().is_err()); // Not started yet fut.run_in_background(); // Start it - for i in 1..=5 { assert_eq!(rx.recv(), Ok(i)); } // Check it + for i in 1..=5 { + assert_eq!(rx.recv(), Ok(i)); + } // Check it assert!(rx.recv().is_err()); // Should be done } @@ -93,6 +97,8 @@ fn basic_try_future_combinators() { assert!(rx.try_recv().is_err()); // Not started yet fut.run_in_background(); // Start it - for i in 1..=12 { assert_eq!(rx.recv(), Ok(i)); } // Check it + for i in 1..=12 { + assert_eq!(rx.recv(), Ok(i)); + } // Check it assert!(rx.recv().is_err()); // Should be done } diff --git a/tests/fuse.rs b/tests/future_fuse.rs index 83f2c1c..83f2c1c 100644 --- a/tests/fuse.rs +++ b/tests/future_fuse.rs diff --git a/tests/future_inspect.rs b/tests/future_inspect.rs new file mode 100644 index 0000000..eacd1f7 --- /dev/null +++ b/tests/future_inspect.rs @@ -0,0 +1,16 @@ +use futures::executor::block_on; +use futures::future::{self, FutureExt}; + +#[test] +fn smoke() { + let mut counter = 0; + + { + let work = future::ready::<i32>(40).inspect(|val| { + counter += *val; + }); + assert_eq!(block_on(work), 40); + } + + assert_eq!(counter, 40); +} diff --git a/tests/future_join_all.rs b/tests/future_join_all.rs new file mode 100644 index 0000000..ae05a21 --- /dev/null +++ b/tests/future_join_all.rs @@ -0,0 +1,42 @@ +use futures::executor::block_on; +use futures::future::{join_all, ready, Future, JoinAll}; +use std::fmt::Debug; + +fn assert_done<T, F>(actual_fut: F, expected: T) +where + T: PartialEq + Debug, + F: FnOnce() -> Box<dyn Future<Output = T> + Unpin>, +{ + let output = block_on(actual_fut()); + assert_eq!(output, expected); +} + +#[test] +fn collect_collects() { + assert_done(|| Box::new(join_all(vec![ready(1), ready(2)])), vec![1, 2]); + assert_done(|| Box::new(join_all(vec![ready(1)])), vec![1]); + // REVIEW: should this be implemented? + // assert_done(|| Box::new(join_all(Vec::<i32>::new())), vec![]); + + // TODO: needs more tests +} + +#[test] +fn join_all_iter_lifetime() { + // In futures-rs version 0.1, this function would fail to typecheck due to an overly + // conservative type parameterization of `JoinAll`. + fn sizes(bufs: Vec<&[u8]>) -> Box<dyn Future<Output = Vec<usize>> + Unpin> { + let iter = bufs.into_iter().map(|b| ready::<usize>(b.len())); + Box::new(join_all(iter)) + } + + assert_done(|| sizes(vec![&[1, 2, 3], &[], &[0]]), vec![3_usize, 0, 1]); +} + +#[test] +fn join_all_from_iter() { + assert_done( + || Box::new(vec![ready(1), ready(2)].into_iter().collect::<JoinAll<_>>()), + vec![1, 2], + ) +} diff --git a/tests/future_obj.rs b/tests/future_obj.rs index c6b18fc..0e52534 100644 --- a/tests/future_obj.rs +++ b/tests/future_obj.rs @@ -1,6 +1,6 @@ -use futures::future::{Future, FutureObj, FutureExt}; -use std::pin::Pin; +use futures::future::{Future, FutureExt, FutureObj}; use futures::task::{Context, Poll}; +use std::pin::Pin; #[test] fn dropping_does_not_segfault() { diff --git a/tests/select_all.rs b/tests/future_select_all.rs index 540db2c..299b479 100644 --- a/tests/select_all.rs +++ b/tests/future_select_all.rs @@ -1,14 +1,10 @@ +use futures::executor::block_on; +use futures::future::{ready, select_all}; +use std::collections::HashSet; + #[test] fn smoke() { - use futures::executor::block_on; - use futures::future::{ready, select_all}; - use std::collections::HashSet; - - let v = vec![ - ready(1), - ready(2), - ready(3), - ]; + let v = vec![ready(1), ready(2), ready(3)]; let mut c = vec![1, 2, 3].into_iter().collect::<HashSet<_>>(); diff --git a/tests/select_ok.rs b/tests/future_select_ok.rs index 81cadb7..8aec003 100644 --- a/tests/select_ok.rs +++ b/tests/future_select_ok.rs @@ -1,14 +1,9 @@ +use futures::executor::block_on; +use futures::future::{err, ok, select_ok}; + #[test] fn ignore_err() { - use futures::executor::block_on; - use futures::future::{err, ok, select_ok}; - - let v = vec![ - err(1), - err(2), - ok(3), - ok(4), - ]; + let v = vec![err(1), err(2), ok(3), ok(4)]; let (i, v) = block_on(select_ok(v)).ok().unwrap(); assert_eq!(i, 3); @@ -23,14 +18,7 @@ fn ignore_err() { #[test] fn last_err() { - use futures::executor::block_on; - use futures::future::{err, ok, select_ok}; - - let v = vec![ - ok(1), - err(2), - err(3), - ]; + let v = vec![ok(1), err(2), err(3)]; let (i, v) = block_on(select_ok(v)).ok().unwrap(); assert_eq!(i, 1); diff --git a/tests/shared.rs b/tests/future_shared.rs index cc0c6a2..718d6c4 100644 --- a/tests/shared.rs +++ b/tests/future_shared.rs @@ -1,22 +1,22 @@ -mod count_clone { - use std::cell::Cell; - use std::rc::Rc; - - pub struct CountClone(pub Rc<Cell<i32>>); - - impl Clone for CountClone { - fn clone(&self) -> Self { - self.0.set(self.0.get() + 1); - Self(self.0.clone()) - } +use futures::channel::oneshot; +use futures::executor::{block_on, LocalPool}; +use futures::future::{self, FutureExt, LocalFutureObj, TryFutureExt}; +use futures::task::LocalSpawn; +use std::cell::{Cell, RefCell}; +use std::rc::Rc; +use std::task::Poll; +use std::thread; + +struct CountClone(Rc<Cell<i32>>); + +impl Clone for CountClone { + fn clone(&self) -> Self { + self.0.set(self.0.get() + 1); + Self(self.0.clone()) } } fn send_shared_oneshot_and_wait_on_multiple_threads(threads_number: u32) { - use futures::channel::oneshot; - use futures::executor::block_on; - use futures::future::FutureExt; - use std::thread; let (tx, rx) = oneshot::channel::<i32>(); let f = rx.shared(); let join_handles = (0..threads_number) @@ -53,11 +53,6 @@ fn many_threads() { #[test] fn drop_on_one_task_ok() { - use futures::channel::oneshot; - use futures::executor::block_on; - use futures::future::{self, FutureExt, TryFutureExt}; - use std::thread; - let (tx, rx) = oneshot::channel::<u32>(); let f1 = rx.shared(); let f2 = f1.clone(); @@ -86,11 +81,6 @@ fn drop_on_one_task_ok() { #[test] fn drop_in_poll() { - use futures::executor::block_on; - use futures::future::{self, FutureExt, LocalFutureObj}; - use std::cell::RefCell; - use std::rc::Rc; - let slot1 = Rc::new(RefCell::new(None)); let slot2 = slot1.clone(); @@ -108,11 +98,6 @@ fn drop_in_poll() { #[test] fn peek() { - use futures::channel::oneshot; - use futures::executor::LocalPool; - use futures::future::{FutureExt, LocalFutureObj}; - use futures::task::LocalSpawn; - let mut local_pool = LocalPool::new(); let spawn = &mut local_pool.spawner(); @@ -134,9 +119,7 @@ fn peek() { } // Once the Shared has been polled, the value is peekable on the clone. - spawn - .spawn_local_obj(LocalFutureObj::new(Box::new(f1.map(|_| ())))) - .unwrap(); + spawn.spawn_local_obj(LocalFutureObj::new(Box::new(f1.map(|_| ())))).unwrap(); local_pool.run(); for _ in 0..2 { assert_eq!(*f2.peek().unwrap(), Ok(42)); @@ -145,10 +128,6 @@ fn peek() { #[test] fn downgrade() { - use futures::channel::oneshot; - use futures::executor::block_on; - use futures::future::FutureExt; - let (tx, rx) = oneshot::channel::<i32>(); let shared = rx.shared(); // Since there are outstanding `Shared`s, we can get a `WeakShared`. @@ -173,14 +152,6 @@ fn downgrade() { #[test] fn dont_clone_in_single_owner_shared_future() { - use futures::channel::oneshot; - use futures::executor::block_on; - use futures::future::FutureExt; - use std::cell::Cell; - use std::rc::Rc; - - use count_clone::CountClone; - let counter = CountClone(Rc::new(Cell::new(0))); let (tx, rx) = oneshot::channel(); @@ -193,14 +164,6 @@ fn dont_clone_in_single_owner_shared_future() { #[test] fn dont_do_unnecessary_clones_on_output() { - use futures::channel::oneshot; - use futures::executor::block_on; - use futures::future::FutureExt; - use std::cell::Cell; - use std::rc::Rc; - - use count_clone::CountClone; - let counter = CountClone(Rc::new(Cell::new(0))); let (tx, rx) = oneshot::channel(); @@ -215,11 +178,6 @@ fn dont_do_unnecessary_clones_on_output() { #[test] fn shared_future_that_wakes_itself_until_pending_is_returned() { - use futures::executor::block_on; - use futures::future::FutureExt; - use std::cell::Cell; - use std::task::Poll; - let proceed = Cell::new(false); let fut = futures::future::poll_fn(|cx| { if proceed.get() { @@ -233,8 +191,5 @@ fn shared_future_that_wakes_itself_until_pending_is_returned() { // The join future can only complete if the second future gets a chance to run after the first // has returned pending - assert_eq!( - block_on(futures::future::join(fut, async { proceed.set(true) })), - ((), ()) - ); + assert_eq!(block_on(futures::future::join(fut, async { proceed.set(true) })), ((), ())); } diff --git a/tests/future_try_flatten_stream.rs b/tests/future_try_flatten_stream.rs index 4a614f9..82ae1ba 100644 --- a/tests/future_try_flatten_stream.rs +++ b/tests/future_try_flatten_stream.rs @@ -1,9 +1,14 @@ +use futures::executor::block_on_stream; +use futures::future::{err, ok, TryFutureExt}; +use futures::sink::Sink; +use futures::stream::Stream; +use futures::stream::{self, StreamExt}; +use futures::task::{Context, Poll}; +use std::marker::PhantomData; +use std::pin::Pin; + #[test] fn successful_future() { - use futures::executor::block_on_stream; - use futures::future::{ok, TryFutureExt}; - use futures::stream::{self, StreamExt}; - let stream_items = vec![17, 19]; let future_of_a_stream = ok::<_, bool>(stream::iter(stream_items).map(Ok)); @@ -17,15 +22,8 @@ fn successful_future() { #[test] fn failed_future() { - use core::marker::PhantomData; - use core::pin::Pin; - use futures::executor::block_on_stream; - use futures::future::{err, TryFutureExt}; - use futures::stream::Stream; - use futures::task::{Context, Poll}; - struct PanickingStream<T, E> { - _marker: PhantomData<(T, E)> + _marker: PhantomData<(T, E)>, } impl<T, E> Stream for PanickingStream<T, E> { @@ -45,13 +43,6 @@ fn failed_future() { #[test] fn assert_impls() { - use core::marker::PhantomData; - use core::pin::Pin; - use futures::sink::Sink; - use futures::stream::Stream; - use futures::task::{Context, Poll}; - use futures::future::{ok, TryFutureExt}; - struct StreamSink<T, E, Item>(PhantomData<(T, E, Item)>); impl<T, E, Item> Stream for StreamSink<T, E, Item> { diff --git a/tests/try_join_all.rs b/tests/future_try_join_all.rs index 8e579a2..a4b3bb7 100644 --- a/tests/try_join_all.rs +++ b/tests/future_try_join_all.rs @@ -1,24 +1,19 @@ -mod util { - use std::future::Future; - use futures::executor::block_on; - use std::fmt::Debug; - - pub fn assert_done<T, F>(actual_fut: F, expected: T) - where - T: PartialEq + Debug, - F: FnOnce() -> Box<dyn Future<Output = T> + Unpin>, - { - let output = block_on(actual_fut()); - assert_eq!(output, expected); - } +use futures::executor::block_on; +use futures_util::future::{err, ok, try_join_all, TryJoinAll}; +use std::fmt::Debug; +use std::future::Future; + +fn assert_done<T, F>(actual_fut: F, expected: T) +where + T: PartialEq + Debug, + F: FnOnce() -> Box<dyn Future<Output = T> + Unpin>, +{ + let output = block_on(actual_fut()); + assert_eq!(output, expected); } #[test] fn collect_collects() { - use futures_util::future::{err, ok, try_join_all}; - - use util::assert_done; - assert_done(|| Box::new(try_join_all(vec![ok(1), ok(2)])), Ok::<_, usize>(vec![1, 2])); assert_done(|| Box::new(try_join_all(vec![ok(1), err(2)])), Err(2)); assert_done(|| Box::new(try_join_all(vec![ok(1)])), Ok::<_, usize>(vec![1])); @@ -30,11 +25,6 @@ fn collect_collects() { #[test] fn try_join_all_iter_lifetime() { - use futures_util::future::{ok, try_join_all}; - use std::future::Future; - - use util::assert_done; - // In futures-rs version 0.1, this function would fail to typecheck due to an overly // conservative type parameterization of `TryJoinAll`. fn sizes(bufs: Vec<&[u8]>) -> Box<dyn Future<Output = Result<Vec<usize>, ()>> + Unpin> { @@ -42,15 +32,11 @@ fn try_join_all_iter_lifetime() { Box::new(try_join_all(iter)) } - assert_done(|| sizes(vec![&[1,2,3], &[], &[0]]), Ok(vec![3_usize, 0, 1])); + assert_done(|| sizes(vec![&[1, 2, 3], &[], &[0]]), Ok(vec![3_usize, 0, 1])); } #[test] fn try_join_all_from_iter() { - use futures_util::future::{ok, TryJoinAll}; - - use util::assert_done; - assert_done( || Box::new(vec![ok(1), ok(2)].into_iter().collect::<TryJoinAll<_>>()), Ok::<_, usize>(vec![1, 2]), diff --git a/tests/inspect.rs b/tests/inspect.rs deleted file mode 100644 index 375778b..0000000 --- a/tests/inspect.rs +++ /dev/null @@ -1,14 +0,0 @@ -#[test] -fn smoke() { - use futures::executor::block_on; - use futures::future::{self, FutureExt}; - - let mut counter = 0; - - { - let work = future::ready::<i32>(40).inspect(|val| { counter += *val; }); - assert_eq!(block_on(work), 40); - } - - assert_eq!(counter, 40); -} diff --git a/tests/io_buf_reader.rs b/tests/io_buf_reader.rs index f8f9d14..d60df87 100644 --- a/tests/io_buf_reader.rs +++ b/tests/io_buf_reader.rs @@ -1,9 +1,17 @@ +use futures::executor::block_on; +use futures::future::{Future, FutureExt}; +use futures::io::{ + AllowStdIo, AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, + BufReader, Cursor, SeekFrom, +}; +use futures::task::{Context, Poll}; +use futures_test::task::noop_context; +use std::cmp; +use std::io; +use std::pin::Pin; + macro_rules! run_fill_buf { ($reader:expr) => {{ - use futures_test::task::noop_context; - use futures::task::Poll; - use std::pin::Pin; - let mut cx = noop_context(); loop { if let Poll::Ready(x) = Pin::new(&mut $reader).poll_fill_buf(&mut cx) { @@ -13,80 +21,65 @@ macro_rules! run_fill_buf { }}; } -mod util { - use futures::future::Future; - pub fn run<F: Future + Unpin>(mut f: F) -> F::Output { - use futures_test::task::noop_context; - use futures::task::Poll; - use futures::future::FutureExt; - - let mut cx = noop_context(); - loop { - if let Poll::Ready(x) = f.poll_unpin(&mut cx) { - return x; - } +fn run<F: Future + Unpin>(mut f: F) -> F::Output { + let mut cx = noop_context(); + loop { + if let Poll::Ready(x) = f.poll_unpin(&mut cx) { + return x; } } } -mod maybe_pending { - use futures::task::{Context,Poll}; - use std::{cmp,io}; - use std::pin::Pin; - use futures::io::{AsyncRead,AsyncBufRead}; +struct MaybePending<'a> { + inner: &'a [u8], + ready_read: bool, + ready_fill_buf: bool, +} - pub struct MaybePending<'a> { - inner: &'a [u8], - ready_read: bool, - ready_fill_buf: bool, +impl<'a> MaybePending<'a> { + fn new(inner: &'a [u8]) -> Self { + Self { inner, ready_read: false, ready_fill_buf: false } } +} - impl<'a> MaybePending<'a> { - pub fn new(inner: &'a [u8]) -> Self { - Self { inner, ready_read: false, ready_fill_buf: false } +impl AsyncRead for MaybePending<'_> { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll<io::Result<usize>> { + if self.ready_read { + self.ready_read = false; + Pin::new(&mut self.inner).poll_read(cx, buf) + } else { + self.ready_read = true; + Poll::Pending } } +} - impl AsyncRead for MaybePending<'_> { - fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) - -> Poll<io::Result<usize>> - { - if self.ready_read { - self.ready_read = false; - Pin::new(&mut self.inner).poll_read(cx, buf) - } else { - self.ready_read = true; - Poll::Pending +impl AsyncBufRead for MaybePending<'_> { + fn poll_fill_buf(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<&[u8]>> { + if self.ready_fill_buf { + self.ready_fill_buf = false; + if self.inner.is_empty() { + return Poll::Ready(Ok(&[])); } + let len = cmp::min(2, self.inner.len()); + Poll::Ready(Ok(&self.inner[0..len])) + } else { + self.ready_fill_buf = true; + Poll::Pending } } - impl AsyncBufRead for MaybePending<'_> { - fn poll_fill_buf(mut self: Pin<&mut Self>, _: &mut Context<'_>) - -> Poll<io::Result<&[u8]>> - { - if self.ready_fill_buf { - self.ready_fill_buf = false; - if self.inner.is_empty() { return Poll::Ready(Ok(&[])) } - let len = cmp::min(2, self.inner.len()); - Poll::Ready(Ok(&self.inner[0..len])) - } else { - self.ready_fill_buf = true; - Poll::Pending - } - } - - fn consume(mut self: Pin<&mut Self>, amt: usize) { - self.inner = &self.inner[amt..]; - } + fn consume(mut self: Pin<&mut Self>, amt: usize) { + self.inner = &self.inner[amt..]; } } #[test] fn test_buffered_reader() { - use futures::executor::block_on; - use futures::io::{AsyncReadExt, BufReader}; - let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4]; let mut reader = BufReader::with_capacity(2, inner); @@ -124,11 +117,6 @@ fn test_buffered_reader() { #[test] fn test_buffered_reader_seek() { - use futures::executor::block_on; - use futures::io::{AsyncSeekExt, AsyncBufRead, BufReader, Cursor, SeekFrom}; - use std::pin::Pin; - use util::run; - let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4]; let mut reader = BufReader::with_capacity(2, Cursor::new(inner)); @@ -144,13 +132,9 @@ fn test_buffered_reader_seek() { #[test] fn test_buffered_reader_seek_underflow() { - use futures::executor::block_on; - use futures::io::{AsyncSeekExt, AsyncBufRead, AllowStdIo, BufReader, SeekFrom}; - use std::io; - // gimmick reader that yields its position modulo 256 for each byte struct PositionReader { - pos: u64 + pos: u64, } impl io::Read for PositionReader { fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { @@ -181,7 +165,7 @@ fn test_buffered_reader_seek_underflow() { let mut reader = BufReader::with_capacity(5, AllowStdIo::new(PositionReader { pos: 0 })); assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1, 2, 3, 4][..])); - assert_eq!(block_on(reader.seek(SeekFrom::End(-5))).ok(), Some(u64::max_value()-5)); + assert_eq!(block_on(reader.seek(SeekFrom::End(-5))).ok(), Some(u64::max_value() - 5)); assert_eq!(run_fill_buf!(reader).ok().map(|s| s.len()), Some(5)); // the following seek will require two underlying seeks let expected = 9_223_372_036_854_775_802; @@ -194,10 +178,6 @@ fn test_buffered_reader_seek_underflow() { #[test] fn test_short_reads() { - use futures::executor::block_on; - use futures::io::{AsyncReadExt, AllowStdIo, BufReader}; - use std::io; - /// A dummy reader intended at testing short-reads propagation. struct ShortReader { lengths: Vec<usize>, @@ -227,10 +207,6 @@ fn test_short_reads() { #[test] fn maybe_pending() { - use futures::io::{AsyncReadExt, BufReader}; - use util::run; - use maybe_pending::MaybePending; - let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4]; let mut reader = BufReader::with_capacity(2, MaybePending::new(inner)); @@ -268,10 +244,6 @@ fn maybe_pending() { #[test] fn maybe_pending_buf_read() { - use futures::io::{AsyncBufReadExt, BufReader}; - use util::run; - use maybe_pending::MaybePending; - let inner = MaybePending::new(&[0, 1, 2, 3, 1, 0]); let mut reader = BufReader::with_capacity(2, inner); let mut v = Vec::new(); @@ -291,36 +263,32 @@ fn maybe_pending_buf_read() { // https://github.com/rust-lang/futures-rs/pull/1573#discussion_r281162309 #[test] fn maybe_pending_seek() { - use futures::io::{AsyncBufRead, AsyncSeek, AsyncSeekExt, AsyncRead, BufReader, - Cursor, SeekFrom - }; - use futures::task::{Context,Poll}; - use std::io; - use std::pin::Pin; - use util::run; - pub struct MaybePendingSeek<'a> { + struct MaybePendingSeek<'a> { inner: Cursor<&'a [u8]>, ready: bool, } impl<'a> MaybePendingSeek<'a> { - pub fn new(inner: &'a [u8]) -> Self { + fn new(inner: &'a [u8]) -> Self { Self { inner: Cursor::new(inner), ready: true } } } impl AsyncRead for MaybePendingSeek<'_> { - fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) - -> Poll<io::Result<usize>> - { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll<io::Result<usize>> { Pin::new(&mut self.inner).poll_read(cx, buf) } } impl AsyncBufRead for MaybePendingSeek<'_> { - fn poll_fill_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>) - -> Poll<io::Result<&[u8]>> - { + fn poll_fill_buf( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<io::Result<&[u8]>> { let this: *mut Self = &mut *self as *mut _; Pin::new(&mut unsafe { &mut *this }.inner).poll_fill_buf(cx) } @@ -331,9 +299,11 @@ fn maybe_pending_seek() { } impl AsyncSeek for MaybePendingSeek<'_> { - fn poll_seek(mut self: Pin<&mut Self>, cx: &mut Context<'_>, pos: SeekFrom) - -> Poll<io::Result<u64>> - { + fn poll_seek( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + pos: SeekFrom, + ) -> Poll<io::Result<u64>> { if self.ready { self.ready = false; Pin::new(&mut self.inner).poll_seek(cx, pos) diff --git a/tests/io_buf_writer.rs b/tests/io_buf_writer.rs index d58a6d8..b264cd5 100644 --- a/tests/io_buf_writer.rs +++ b/tests/io_buf_writer.rs @@ -1,67 +1,59 @@ -mod maybe_pending { - use futures::io::AsyncWrite; - use futures::task::{Context, Poll}; - use std::io; - use std::pin::Pin; - - pub struct MaybePending { - pub inner: Vec<u8>, - ready: bool, - } +use futures::executor::block_on; +use futures::future::{Future, FutureExt}; +use futures::io::{ + AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt, BufWriter, Cursor, SeekFrom, +}; +use futures::task::{Context, Poll}; +use futures_test::task::noop_context; +use std::io; +use std::pin::Pin; + +struct MaybePending { + inner: Vec<u8>, + ready: bool, +} - impl MaybePending { - pub fn new(inner: Vec<u8>) -> Self { - Self { inner, ready: false } - } +impl MaybePending { + fn new(inner: Vec<u8>) -> Self { + Self { inner, ready: false } } +} - impl AsyncWrite for MaybePending { - fn poll_write( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll<io::Result<usize>> { - if self.ready { - self.ready = false; - Pin::new(&mut self.inner).poll_write(cx, buf) - } else { - self.ready = true; - Poll::Pending - } +impl AsyncWrite for MaybePending { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll<io::Result<usize>> { + if self.ready { + self.ready = false; + Pin::new(&mut self.inner).poll_write(cx, buf) + } else { + self.ready = true; + Poll::Pending } + } - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { - Pin::new(&mut self.inner).poll_flush(cx) - } + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { + Pin::new(&mut self.inner).poll_flush(cx) + } - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { - Pin::new(&mut self.inner).poll_close(cx) - } + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { + Pin::new(&mut self.inner).poll_close(cx) } } -mod util { - use futures::future::Future; - - pub fn run<F: Future + Unpin>(mut f: F) -> F::Output { - use futures::future::FutureExt; - use futures::task::Poll; - use futures_test::task::noop_context; - - let mut cx = noop_context(); - loop { - if let Poll::Ready(x) = f.poll_unpin(&mut cx) { - return x; - } +fn run<F: Future + Unpin>(mut f: F) -> F::Output { + let mut cx = noop_context(); + loop { + if let Poll::Ready(x) = f.poll_unpin(&mut cx) { + return x; } } } #[test] fn buf_writer() { - use futures::executor::block_on; - use futures::io::{AsyncWriteExt, BufWriter}; - let mut writer = BufWriter::with_capacity(2, Vec::new()); block_on(writer.write(&[0, 1])).unwrap(); @@ -104,9 +96,6 @@ fn buf_writer() { #[test] fn buf_writer_inner_flushes() { - use futures::executor::block_on; - use futures::io::{AsyncWriteExt, BufWriter}; - let mut w = BufWriter::with_capacity(3, Vec::new()); block_on(w.write(&[0, 1])).unwrap(); assert_eq!(*w.get_ref(), []); @@ -117,9 +106,6 @@ fn buf_writer_inner_flushes() { #[test] fn buf_writer_seek() { - use futures::executor::block_on; - use futures::io::{AsyncSeekExt, AsyncWriteExt, BufWriter, Cursor, SeekFrom}; - // FIXME: when https://github.com/rust-lang/futures-rs/issues/1510 fixed, // use `Vec::new` instead of `vec![0; 8]`. let mut w = BufWriter::with_capacity(3, Cursor::new(vec![0; 8])); @@ -135,11 +121,6 @@ fn buf_writer_seek() { #[test] fn maybe_pending_buf_writer() { - use futures::io::{AsyncWriteExt, BufWriter}; - - use maybe_pending::MaybePending; - use util::run; - let mut writer = BufWriter::with_capacity(2, MaybePending::new(Vec::new())); run(writer.write(&[0, 1])).unwrap(); @@ -182,11 +163,6 @@ fn maybe_pending_buf_writer() { #[test] fn maybe_pending_buf_writer_inner_flushes() { - use futures::io::{AsyncWriteExt, BufWriter}; - - use maybe_pending::MaybePending; - use util::run; - let mut w = BufWriter::with_capacity(3, MaybePending::new(Vec::new())); run(w.write(&[0, 1])).unwrap(); assert_eq!(&w.get_ref().inner, &[]); @@ -197,13 +173,6 @@ fn maybe_pending_buf_writer_inner_flushes() { #[test] fn maybe_pending_buf_writer_seek() { - use futures::io::{AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt, BufWriter, Cursor, SeekFrom}; - use futures::task::{Context, Poll}; - use std::io; - use std::pin::Pin; - - use util::run; - struct MaybePendingSeek { inner: Cursor<Vec<u8>>, ready_write: bool, @@ -241,9 +210,11 @@ fn maybe_pending_buf_writer_seek() { } impl AsyncSeek for MaybePendingSeek { - fn poll_seek(mut self: Pin<&mut Self>, cx: &mut Context<'_>, pos: SeekFrom) - -> Poll<io::Result<u64>> - { + fn poll_seek( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + pos: SeekFrom, + ) -> Poll<io::Result<u64>> { if self.ready_seek { self.ready_seek = false; Pin::new(&mut self.inner).poll_seek(cx, pos) diff --git a/tests/io_cursor.rs b/tests/io_cursor.rs index 4ba6342..435ea5a 100644 --- a/tests/io_cursor.rs +++ b/tests/io_cursor.rs @@ -1,13 +1,14 @@ +use assert_matches::assert_matches; +use futures::executor::block_on; +use futures::future::lazy; +use futures::io::{AsyncWrite, Cursor}; +use futures::task::Poll; +use std::pin::Pin; + #[test] fn cursor_asyncwrite_vec() { - use assert_matches::assert_matches; - use futures::future::lazy; - use futures::io::{AsyncWrite, Cursor}; - use futures::task::Poll; - use std::pin::Pin; - let mut cursor = Cursor::new(vec![0; 5]); - futures::executor::block_on(lazy(|cx| { + block_on(lazy(|cx| { assert_matches!(Pin::new(&mut cursor).poll_write(cx, &[1, 2]), Poll::Ready(Ok(2))); assert_matches!(Pin::new(&mut cursor).poll_write(cx, &[3, 4]), Poll::Ready(Ok(2))); assert_matches!(Pin::new(&mut cursor).poll_write(cx, &[5, 6]), Poll::Ready(Ok(2))); @@ -18,14 +19,8 @@ fn cursor_asyncwrite_vec() { #[test] fn cursor_asyncwrite_box() { - use assert_matches::assert_matches; - use futures::future::lazy; - use futures::io::{AsyncWrite, Cursor}; - use futures::task::Poll; - use std::pin::Pin; - let mut cursor = Cursor::new(vec![0; 5].into_boxed_slice()); - futures::executor::block_on(lazy(|cx| { + block_on(lazy(|cx| { assert_matches!(Pin::new(&mut cursor).poll_write(cx, &[1, 2]), Poll::Ready(Ok(2))); assert_matches!(Pin::new(&mut cursor).poll_write(cx, &[3, 4]), Poll::Ready(Ok(2))); assert_matches!(Pin::new(&mut cursor).poll_write(cx, &[5, 6]), Poll::Ready(Ok(1))); diff --git a/tests/io_lines.rs b/tests/io_lines.rs index 2552c7c..5ce01a6 100644 --- a/tests/io_lines.rs +++ b/tests/io_lines.rs @@ -1,32 +1,34 @@ -mod util { - use futures::future::Future; - - pub fn run<F: Future + Unpin>(mut f: F) -> F::Output { - use futures_test::task::noop_context; - use futures::task::Poll; - use futures::future::FutureExt; - - let mut cx = noop_context(); - loop { - if let Poll::Ready(x) = f.poll_unpin(&mut cx) { - return x; - } +use futures::executor::block_on; +use futures::future::{Future, FutureExt}; +use futures::io::{AsyncBufReadExt, Cursor}; +use futures::stream::{self, StreamExt, TryStreamExt}; +use futures::task::Poll; +use futures_test::io::AsyncReadTestExt; +use futures_test::task::noop_context; + +fn run<F: Future + Unpin>(mut f: F) -> F::Output { + let mut cx = noop_context(); + loop { + if let Poll::Ready(x) = f.poll_unpin(&mut cx) { + return x; } } } -#[test] -fn lines() { - use futures::executor::block_on; - use futures::stream::StreamExt; - use futures::io::{AsyncBufReadExt, Cursor}; +macro_rules! block_on_next { + ($expr:expr) => { + block_on($expr.next()).unwrap().unwrap() + }; +} - macro_rules! block_on_next { - ($expr:expr) => { - block_on($expr.next()).unwrap().unwrap() - }; - } +macro_rules! run_next { + ($expr:expr) => { + run($expr.next()).unwrap().unwrap() + }; +} +#[test] +fn lines() { let buf = Cursor::new(&b"12\r"[..]); let mut s = buf.lines(); assert_eq!(block_on_next!(s), "12\r".to_string()); @@ -41,22 +43,8 @@ fn lines() { #[test] fn maybe_pending() { - use futures::stream::{self, StreamExt, TryStreamExt}; - use futures::io::AsyncBufReadExt; - use futures_test::io::AsyncReadTestExt; - - use util::run; - - macro_rules! run_next { - ($expr:expr) => { - run($expr.next()).unwrap().unwrap() - }; - } - - let buf = stream::iter(vec![&b"12"[..], &b"\r"[..]]) - .map(Ok) - .into_async_read() - .interleave_pending(); + let buf = + stream::iter(vec![&b"12"[..], &b"\r"[..]]).map(Ok).into_async_read().interleave_pending(); let mut s = buf.lines(); assert_eq!(run_next!(s), "12\r".to_string()); assert!(run(s.next()).is_none()); diff --git a/tests/io_read.rs b/tests/io_read.rs index 5902ad0..d39a6ea 100644 --- a/tests/io_read.rs +++ b/tests/io_read.rs @@ -1,27 +1,26 @@ -mod mock_reader { - use futures::io::AsyncRead; - use std::io; - use std::pin::Pin; - use std::task::{Context, Poll}; +use futures::io::AsyncRead; +use futures_test::task::panic_context; +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; - pub struct MockReader { - fun: Box<dyn FnMut(&mut [u8]) -> Poll<io::Result<usize>>>, - } +struct MockReader { + fun: Box<dyn FnMut(&mut [u8]) -> Poll<io::Result<usize>>>, +} - impl MockReader { - pub fn new(fun: impl FnMut(&mut [u8]) -> Poll<io::Result<usize>> + 'static) -> Self { - Self { fun: Box::new(fun) } - } +impl MockReader { + fn new(fun: impl FnMut(&mut [u8]) -> Poll<io::Result<usize>> + 'static) -> Self { + Self { fun: Box::new(fun) } } +} - impl AsyncRead for MockReader { - fn poll_read( - self: Pin<&mut Self>, - _cx: &mut Context<'_>, - buf: &mut [u8] - ) -> Poll<io::Result<usize>> { - (self.get_mut().fun)(buf) - } +impl AsyncRead for MockReader { + fn poll_read( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll<io::Result<usize>> { + (self.get_mut().fun)(buf) } } @@ -29,14 +28,6 @@ mod mock_reader { /// calls `poll_read` with an empty slice if no buffers are provided. #[test] fn read_vectored_no_buffers() { - use futures::io::AsyncRead; - use futures_test::task::panic_context; - use std::io; - use std::pin::Pin; - use std::task::Poll; - - use mock_reader::MockReader; - let mut reader = MockReader::new(|buf| { assert_eq!(buf, b""); Err(io::ErrorKind::BrokenPipe.into()).into() @@ -53,14 +44,6 @@ fn read_vectored_no_buffers() { /// calls `poll_read` with the first non-empty buffer. #[test] fn read_vectored_first_non_empty() { - use futures::io::AsyncRead; - use futures_test::task::panic_context; - use std::io; - use std::pin::Pin; - use std::task::Poll; - - use mock_reader::MockReader; - let mut reader = MockReader::new(|buf| { assert_eq!(buf.len(), 4); buf.copy_from_slice(b"four"); diff --git a/tests/io_read_exact.rs b/tests/io_read_exact.rs index bd4b36d..6582e50 100644 --- a/tests/io_read_exact.rs +++ b/tests/io_read_exact.rs @@ -1,14 +1,14 @@ +use futures::executor::block_on; +use futures::io::AsyncReadExt; + #[test] fn read_exact() { - use futures::executor::block_on; - use futures::io::AsyncReadExt; - let mut reader: &[u8] = &[1, 2, 3, 4, 5]; let mut out = [0u8; 3]; let res = block_on(reader.read_exact(&mut out)); // read 3 bytes out assert!(res.is_ok()); - assert_eq!(out, [1,2,3]); + assert_eq!(out, [1, 2, 3]); assert_eq!(reader.len(), 2); let res = block_on(reader.read_exact(&mut out)); // read another 3 bytes, but only 2 bytes left diff --git a/tests/io_read_line.rs b/tests/io_read_line.rs index 51e8126..88a8779 100644 --- a/tests/io_read_line.rs +++ b/tests/io_read_line.rs @@ -1,8 +1,22 @@ +use futures::executor::block_on; +use futures::future::{Future, FutureExt}; +use futures::io::{AsyncBufReadExt, Cursor}; +use futures::stream::{self, StreamExt, TryStreamExt}; +use futures::task::Poll; +use futures_test::io::AsyncReadTestExt; +use futures_test::task::noop_context; + +fn run<F: Future + Unpin>(mut f: F) -> F::Output { + let mut cx = noop_context(); + loop { + if let Poll::Ready(x) = f.poll_unpin(&mut cx) { + return x; + } + } +} + #[test] fn read_line() { - use futures::executor::block_on; - use futures::io::{AsyncBufReadExt, Cursor}; - let mut buf = Cursor::new(b"12"); let mut v = String::new(); assert_eq!(block_on(buf.read_line(&mut v)).unwrap(), 2); @@ -22,34 +36,13 @@ fn read_line() { #[test] fn maybe_pending() { - use futures::future::Future; - - fn run<F: Future + Unpin>(mut f: F) -> F::Output { - use futures::future::FutureExt; - use futures::task::Poll; - use futures_test::task::noop_context; - - let mut cx = noop_context(); - loop { - if let Poll::Ready(x) = f.poll_unpin(&mut cx) { - return x; - } - } - } - - use futures::stream::{self, StreamExt, TryStreamExt}; - use futures::io::AsyncBufReadExt; - use futures_test::io::AsyncReadTestExt; - let mut buf = b"12".interleave_pending(); let mut v = String::new(); assert_eq!(run(buf.read_line(&mut v)).unwrap(), 2); assert_eq!(v, "12"); - let mut buf = stream::iter(vec![&b"12"[..], &b"\n\n"[..]]) - .map(Ok) - .into_async_read() - .interleave_pending(); + let mut buf = + stream::iter(vec![&b"12"[..], &b"\n\n"[..]]).map(Ok).into_async_read().interleave_pending(); let mut v = String::new(); assert_eq!(run(buf.read_line(&mut v)).unwrap(), 3); assert_eq!(v, "12\n"); diff --git a/tests/io_read_to_end.rs b/tests/io_read_to_end.rs index 892d463..7122511 100644 --- a/tests/io_read_to_end.rs +++ b/tests/io_read_to_end.rs @@ -1,4 +1,5 @@ use futures::{ + executor::block_on, io::{self, AsyncRead, AsyncReadExt}, task::{Context, Poll}, }; @@ -12,7 +13,7 @@ fn issue2310() { } impl MyRead { - pub fn new() -> Self { + fn new() -> Self { MyRead { first: false } } } @@ -39,7 +40,7 @@ fn issue2310() { } impl VecWrapper { - pub fn new() -> Self { + fn new() -> Self { VecWrapper { inner: Vec::new() } } } @@ -55,7 +56,7 @@ fn issue2310() { } } - futures::executor::block_on(async { + block_on(async { let mut vec = VecWrapper::new(); let mut read = MyRead::new(); diff --git a/tests/io_read_to_string.rs b/tests/io_read_to_string.rs index 2e9c00a..ae6aaa2 100644 --- a/tests/io_read_to_string.rs +++ b/tests/io_read_to_string.rs @@ -1,8 +1,13 @@ +use futures::executor::block_on; +use futures::future::{Future, FutureExt}; +use futures::io::{AsyncReadExt, Cursor}; +use futures::stream::{self, StreamExt, TryStreamExt}; +use futures::task::Poll; +use futures_test::io::AsyncReadTestExt; +use futures_test::task::noop_context; + #[test] fn read_to_string() { - use futures::executor::block_on; - use futures::io::{AsyncReadExt, Cursor}; - let mut c = Cursor::new(&b""[..]); let mut v = String::new(); assert_eq!(block_on(c.read_to_string(&mut v)).unwrap(), 0); @@ -20,16 +25,7 @@ fn read_to_string() { #[test] fn interleave_pending() { - use futures::future::Future; - use futures::stream::{self, StreamExt, TryStreamExt}; - use futures::io::AsyncReadExt; - use futures_test::io::AsyncReadTestExt; - fn run<F: Future + Unpin>(mut f: F) -> F::Output { - use futures::future::FutureExt; - use futures_test::task::noop_context; - use futures::task::Poll; - let mut cx = noop_context(); loop { if let Poll::Ready(x) = f.poll_unpin(&mut cx) { diff --git a/tests/io_read_until.rs b/tests/io_read_until.rs index 6fa22ee..71f857f 100644 --- a/tests/io_read_until.rs +++ b/tests/io_read_until.rs @@ -1,8 +1,22 @@ +use futures::executor::block_on; +use futures::future::{Future, FutureExt}; +use futures::io::{AsyncBufReadExt, Cursor}; +use futures::stream::{self, StreamExt, TryStreamExt}; +use futures::task::Poll; +use futures_test::io::AsyncReadTestExt; +use futures_test::task::noop_context; + +fn run<F: Future + Unpin>(mut f: F) -> F::Output { + let mut cx = noop_context(); + loop { + if let Poll::Ready(x) = f.poll_unpin(&mut cx) { + return x; + } + } +} + #[test] fn read_until() { - use futures::executor::block_on; - use futures::io::{AsyncBufReadExt, Cursor}; - let mut buf = Cursor::new(b"12"); let mut v = Vec::new(); assert_eq!(block_on(buf.read_until(b'3', &mut v)).unwrap(), 2); @@ -22,25 +36,6 @@ fn read_until() { #[test] fn maybe_pending() { - use futures::future::Future; - - fn run<F: Future + Unpin>(mut f: F) -> F::Output { - use futures::future::FutureExt; - use futures_test::task::noop_context; - use futures::task::Poll; - - let mut cx = noop_context(); - loop { - if let Poll::Ready(x) = f.poll_unpin(&mut cx) { - return x; - } - } - } - - use futures::stream::{self, StreamExt, TryStreamExt}; - use futures::io::AsyncBufReadExt; - use futures_test::io::AsyncReadTestExt; - let mut buf = b"12".interleave_pending(); let mut v = Vec::new(); assert_eq!(run(buf.read_until(b'3', &mut v)).unwrap(), 2); diff --git a/tests/io_write.rs b/tests/io_write.rs index 363f32b..6af2755 100644 --- a/tests/io_write.rs +++ b/tests/io_write.rs @@ -1,35 +1,34 @@ -mod mock_writer { - use futures::io::AsyncWrite; - use std::io; - use std::pin::Pin; - use std::task::{Context, Poll}; +use futures::io::AsyncWrite; +use futures_test::task::panic_context; +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; - pub struct MockWriter { - fun: Box<dyn FnMut(&[u8]) -> Poll<io::Result<usize>>>, - } +struct MockWriter { + fun: Box<dyn FnMut(&[u8]) -> Poll<io::Result<usize>>>, +} - impl MockWriter { - pub fn new(fun: impl FnMut(&[u8]) -> Poll<io::Result<usize>> + 'static) -> Self { - Self { fun: Box::new(fun) } - } +impl MockWriter { + fn new(fun: impl FnMut(&[u8]) -> Poll<io::Result<usize>> + 'static) -> Self { + Self { fun: Box::new(fun) } } +} - impl AsyncWrite for MockWriter { - fn poll_write( - self: Pin<&mut Self>, - _cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll<io::Result<usize>> { - (self.get_mut().fun)(buf) - } +impl AsyncWrite for MockWriter { + fn poll_write( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll<io::Result<usize>> { + (self.get_mut().fun)(buf) + } - fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> { - panic!() - } + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> { + panic!() + } - fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> { - panic!() - } + fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> { + panic!() } } @@ -37,14 +36,6 @@ mod mock_writer { /// calls `poll_write` with an empty slice if no buffers are provided. #[test] fn write_vectored_no_buffers() { - use futures::io::AsyncWrite; - use futures_test::task::panic_context; - use std::io; - use std::pin::Pin; - use std::task::Poll; - - use mock_writer::MockWriter; - let mut writer = MockWriter::new(|buf| { assert_eq!(buf, b""); Err(io::ErrorKind::BrokenPipe.into()).into() @@ -61,24 +52,12 @@ fn write_vectored_no_buffers() { /// calls `poll_write` with the first non-empty buffer. #[test] fn write_vectored_first_non_empty() { - use futures::io::AsyncWrite; - use futures_test::task::panic_context; - use std::io; - use std::pin::Pin; - use std::task::Poll; - - use mock_writer::MockWriter; - let mut writer = MockWriter::new(|buf| { assert_eq!(buf, b"four"); Poll::Ready(Ok(4)) }); let cx = &mut panic_context(); - let bufs = &mut [ - io::IoSlice::new(&[]), - io::IoSlice::new(&[]), - io::IoSlice::new(b"four") - ]; + let bufs = &mut [io::IoSlice::new(&[]), io::IoSlice::new(&[]), io::IoSlice::new(b"four")]; let res = Pin::new(&mut writer).poll_write_vectored(cx, bufs); let res = res.map_err(|e| e.kind()); diff --git a/tests/join_all.rs b/tests/join_all.rs deleted file mode 100644 index c322e58..0000000 --- a/tests/join_all.rs +++ /dev/null @@ -1,51 +0,0 @@ -mod util { - use std::future::Future; - use std::fmt::Debug; - - pub fn assert_done<T, F>(actual_fut: F, expected: T) - where - T: PartialEq + Debug, - F: FnOnce() -> Box<dyn Future<Output = T> + Unpin>, - { - use futures::executor::block_on; - - let output = block_on(actual_fut()); - assert_eq!(output, expected); - } -} - -#[test] -fn collect_collects() { - use futures_util::future::{join_all,ready}; - - util::assert_done(|| Box::new(join_all(vec![ready(1), ready(2)])), vec![1, 2]); - util::assert_done(|| Box::new(join_all(vec![ready(1)])), vec![1]); - // REVIEW: should this be implemented? - // assert_done(|| Box::new(join_all(Vec::<i32>::new())), vec![]); - - // TODO: needs more tests -} - -#[test] -fn join_all_iter_lifetime() { - use futures_util::future::{join_all,ready}; - use std::future::Future; - // In futures-rs version 0.1, this function would fail to typecheck due to an overly - // conservative type parameterization of `JoinAll`. - fn sizes(bufs: Vec<&[u8]>) -> Box<dyn Future<Output = Vec<usize>> + Unpin> { - let iter = bufs.into_iter().map(|b| ready::<usize>(b.len())); - Box::new(join_all(iter)) - } - - util::assert_done(|| sizes(vec![&[1,2,3], &[], &[0]]), vec![3_usize, 0, 1]); -} - -#[test] -fn join_all_from_iter() { - use futures_util::future::{JoinAll,ready}; - - util::assert_done( - || Box::new(vec![ready(1), ready(2)].into_iter().collect::<JoinAll<_>>()), - vec![1, 2], - ) -} diff --git a/tests/mutex.rs b/tests/lock_mutex.rs index 68e0301..7c33864 100644 --- a/tests/mutex.rs +++ b/tests/lock_mutex.rs @@ -1,9 +1,15 @@ +use futures::channel::mpsc; +use futures::executor::{block_on, ThreadPool}; +use futures::future::{ready, FutureExt}; +use futures::lock::Mutex; +use futures::stream::StreamExt; +use futures::task::{Context, SpawnExt}; +use futures_test::future::FutureTestExt; +use futures_test::task::{new_count_waker, panic_context}; +use std::sync::Arc; + #[test] fn mutex_acquire_uncontested() { - use futures::future::FutureExt; - use futures::lock::Mutex; - use futures_test::task::panic_context; - let mutex = Mutex::new(()); for _ in 0..10 { assert!(mutex.lock().poll_unpin(&mut panic_context()).is_ready()); @@ -12,11 +18,6 @@ fn mutex_acquire_uncontested() { #[test] fn mutex_wakes_waiters() { - use futures::future::FutureExt; - use futures::lock::Mutex; - use futures::task::Context; - use futures_test::task::{new_count_waker, panic_context}; - let mutex = Mutex::new(()); let (waker, counter) = new_count_waker(); let lock = mutex.lock().poll_unpin(&mut panic_context()); @@ -35,20 +36,8 @@ fn mutex_wakes_waiters() { #[test] fn mutex_contested() { - use futures::channel::mpsc; - use futures::executor::block_on; - use futures::future::ready; - use futures::lock::Mutex; - use futures::stream::StreamExt; - use futures::task::SpawnExt; - use futures_test::future::FutureTestExt; - use std::sync::Arc; - let (tx, mut rx) = mpsc::unbounded(); - let pool = futures::executor::ThreadPool::builder() - .pool_size(16) - .create() - .unwrap(); + let pool = ThreadPool::builder().pool_size(16).create().unwrap(); let tx = Arc::new(tx); let mutex = Arc::new(Mutex::new(0)); diff --git a/tests/macro_comma_support.rs b/tests/macro_comma_support.rs index ca13163..85871e9 100644 --- a/tests/macro_comma_support.rs +++ b/tests/macro_comma_support.rs @@ -1,12 +1,13 @@ +use futures::{ + executor::block_on, + future::{self, FutureExt}, + join, ready, + task::Poll, + try_join, +}; + #[test] fn ready() { - use futures::{ - executor::block_on, - future, - task::Poll, - ready, - }; - block_on(future::poll_fn(|_| { ready!(Poll::Ready(()),); Poll::Ready(()) @@ -15,11 +16,7 @@ fn ready() { #[test] fn poll() { - use futures::{ - executor::block_on, - future::FutureExt, - poll, - }; + use futures::poll; block_on(async { let _ = poll!(async {}.boxed(),); @@ -28,11 +25,6 @@ fn poll() { #[test] fn join() { - use futures::{ - executor::block_on, - join - }; - block_on(async { let future1 = async { 1 }; let future2 = async { 2 }; @@ -42,12 +34,6 @@ fn join() { #[test] fn try_join() { - use futures::{ - executor::block_on, - future::FutureExt, - try_join, - }; - block_on(async { let future1 = async { 1 }.never_error(); let future2 = async { 2 }.never_error(); diff --git a/tests/oneshot.rs b/tests/oneshot.rs index 2494306..34b78a3 100644 --- a/tests/oneshot.rs +++ b/tests/oneshot.rs @@ -1,11 +1,11 @@ +use futures::channel::oneshot; +use futures::future::{FutureExt, TryFutureExt}; +use futures_test::future::FutureTestExt; +use std::sync::mpsc; +use std::thread; + #[test] fn oneshot_send1() { - use futures::channel::oneshot; - use futures::future::TryFutureExt; - use futures_test::future::FutureTestExt; - use std::sync::mpsc; - use std::thread; - let (tx1, rx1) = oneshot::channel::<i32>(); let (tx2, rx2) = mpsc::channel(); @@ -17,12 +17,6 @@ fn oneshot_send1() { #[test] fn oneshot_send2() { - use futures::channel::oneshot; - use futures::future::TryFutureExt; - use futures_test::future::FutureTestExt; - use std::sync::mpsc; - use std::thread; - let (tx1, rx1) = oneshot::channel::<i32>(); let (tx2, rx2) = mpsc::channel(); @@ -33,12 +27,6 @@ fn oneshot_send2() { #[test] fn oneshot_send3() { - use futures::channel::oneshot; - use futures::future::TryFutureExt; - use futures_test::future::FutureTestExt; - use std::sync::mpsc; - use std::thread; - let (tx1, rx1) = oneshot::channel::<i32>(); let (tx2, rx2) = mpsc::channel(); @@ -49,11 +37,6 @@ fn oneshot_send3() { #[test] fn oneshot_drop_tx1() { - use futures::channel::oneshot; - use futures::future::FutureExt; - use futures_test::future::FutureTestExt; - use std::sync::mpsc; - let (tx1, rx1) = oneshot::channel::<i32>(); let (tx2, rx2) = mpsc::channel(); @@ -65,12 +48,6 @@ fn oneshot_drop_tx1() { #[test] fn oneshot_drop_tx2() { - use futures::channel::oneshot; - use futures::future::FutureExt; - use futures_test::future::FutureTestExt; - use std::sync::mpsc; - use std::thread; - let (tx1, rx1) = oneshot::channel::<i32>(); let (tx2, rx2) = mpsc::channel(); @@ -83,9 +60,19 @@ fn oneshot_drop_tx2() { #[test] fn oneshot_drop_rx() { - use futures::channel::oneshot; - let (tx, rx) = oneshot::channel::<i32>(); drop(rx); assert_eq!(Err(2), tx.send(2)); } + +#[test] +fn oneshot_debug() { + let (tx, rx) = oneshot::channel::<i32>(); + assert_eq!(format!("{:?}", tx), "Sender { complete: false }"); + assert_eq!(format!("{:?}", rx), "Receiver { complete: false }"); + drop(rx); + assert_eq!(format!("{:?}", tx), "Sender { complete: true }"); + let (tx, rx) = oneshot::channel::<i32>(); + drop(tx); + assert_eq!(format!("{:?}", rx), "Receiver { complete: true }"); +} diff --git a/tests/ready_queue.rs b/tests/ready_queue.rs index 9aa3636..8290132 100644 --- a/tests/ready_queue.rs +++ b/tests/ready_queue.rs @@ -1,18 +1,15 @@ -mod assert_send_sync { - use futures::stream::FuturesUnordered; - - pub trait AssertSendSync: Send + Sync {} - impl AssertSendSync for FuturesUnordered<()> {} -} +use futures::channel::oneshot; +use futures::executor::{block_on, block_on_stream}; +use futures::future; +use futures::stream::{FuturesUnordered, StreamExt}; +use futures::task::Poll; +use futures_test::task::noop_context; +use std::panic::{self, AssertUnwindSafe}; +use std::sync::{Arc, Barrier}; +use std::thread; #[test] fn basic_usage() { - use futures::channel::oneshot; - use futures::executor::block_on; - use futures::future; - use futures::stream::{FuturesUnordered, StreamExt}; - use futures::task::Poll; - block_on(future::lazy(move |cx| { let mut queue = FuturesUnordered::new(); let (tx1, rx1) = oneshot::channel(); @@ -41,12 +38,6 @@ fn basic_usage() { #[test] fn resolving_errors() { - use futures::channel::oneshot; - use futures::executor::block_on; - use futures::future; - use futures::stream::{FuturesUnordered, StreamExt}; - use futures::task::Poll; - block_on(future::lazy(move |cx| { let mut queue = FuturesUnordered::new(); let (tx1, rx1) = oneshot::channel(); @@ -75,12 +66,6 @@ fn resolving_errors() { #[test] fn dropping_ready_queue() { - use futures::channel::oneshot; - use futures::executor::block_on; - use futures::future; - use futures::stream::FuturesUnordered; - use futures_test::task::noop_context; - block_on(future::lazy(move |_| { let queue = FuturesUnordered::new(); let (mut tx1, rx1) = oneshot::channel::<()>(); @@ -108,12 +93,6 @@ fn dropping_ready_queue() { #[test] fn stress() { - use futures::channel::oneshot; - use futures::executor::block_on_stream; - use futures::stream::FuturesUnordered; - use std::sync::{Arc, Barrier}; - use std::thread; - const ITER: usize = 300; for i in 0..ITER { @@ -157,12 +136,6 @@ fn stress() { #[test] fn panicking_future_dropped() { - use futures::executor::block_on; - use futures::future; - use futures::stream::{FuturesUnordered, StreamExt}; - use futures::task::Poll; - use std::panic::{self, AssertUnwindSafe}; - block_on(future::lazy(move |cx| { let mut queue = FuturesUnordered::new(); queue.push(future::poll_fn(|_| -> Poll<Result<i32, i32>> { panic!() })); diff --git a/tests/recurse.rs b/tests/recurse.rs index a151f1b..d81753c 100644 --- a/tests/recurse.rs +++ b/tests/recurse.rs @@ -1,10 +1,10 @@ +use futures::executor::block_on; +use futures::future::{self, BoxFuture, FutureExt}; +use std::sync::mpsc; +use std::thread; + #[test] fn lots() { - use futures::executor::block_on; - use futures::future::{self, FutureExt, BoxFuture}; - use std::sync::mpsc; - use std::thread; - #[cfg(not(futures_sanitizer))] const N: i32 = 1_000; #[cfg(futures_sanitizer)] // If N is many, asan reports stack-overflow: https://gist.github.com/taiki-e/099446d21cbec69d4acbacf7a9646136 @@ -20,8 +20,6 @@ fn lots() { } let (tx, rx) = mpsc::channel(); - thread::spawn(|| { - block_on(do_it((N, 0)).map(move |x| tx.send(x).unwrap())) - }); + thread::spawn(|| block_on(do_it((N, 0)).map(move |x| tx.send(x).unwrap()))); assert_eq!((0..=N).sum::<i32>(), rx.recv().unwrap()); } diff --git a/tests/sink.rs b/tests/sink.rs index 597ed34..f3cf11b 100644 --- a/tests/sink.rs +++ b/tests/sink.rs @@ -1,264 +1,221 @@ -mod sassert_next { - use futures::stream::{Stream, StreamExt}; - use futures::task::Poll; - use futures_test::task::panic_context; - use std::fmt; - - pub fn sassert_next<S>(s: &mut S, item: S::Item) - where - S: Stream + Unpin, - S::Item: Eq + fmt::Debug, - { - match s.poll_next_unpin(&mut panic_context()) { - Poll::Ready(None) => panic!("stream is at its end"), - Poll::Ready(Some(e)) => assert_eq!(e, item), - Poll::Pending => panic!("stream wasn't ready"), - } +use futures::channel::{mpsc, oneshot}; +use futures::executor::block_on; +use futures::future::{self, poll_fn, Future, FutureExt, TryFutureExt}; +use futures::never::Never; +use futures::ready; +use futures::sink::{self, Sink, SinkErrInto, SinkExt}; +use futures::stream::{self, Stream, StreamExt}; +use futures::task::{self, ArcWake, Context, Poll, Waker}; +use futures_test::task::panic_context; +use std::cell::{Cell, RefCell}; +use std::collections::VecDeque; +use std::fmt; +use std::mem; +use std::pin::Pin; +use std::rc::Rc; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; + +fn sassert_next<S>(s: &mut S, item: S::Item) +where + S: Stream + Unpin, + S::Item: Eq + fmt::Debug, +{ + match s.poll_next_unpin(&mut panic_context()) { + Poll::Ready(None) => panic!("stream is at its end"), + Poll::Ready(Some(e)) => assert_eq!(e, item), + Poll::Pending => panic!("stream wasn't ready"), } } -mod unwrap { - use futures::task::Poll; - use std::fmt; - - pub fn unwrap<T, E: fmt::Debug>(x: Poll<Result<T, E>>) -> T { - match x { - Poll::Ready(Ok(x)) => x, - Poll::Ready(Err(_)) => panic!("Poll::Ready(Err(_))"), - Poll::Pending => panic!("Poll::Pending"), - } +fn unwrap<T, E: fmt::Debug>(x: Poll<Result<T, E>>) -> T { + match x { + Poll::Ready(Ok(x)) => x, + Poll::Ready(Err(_)) => panic!("Poll::Ready(Err(_))"), + Poll::Pending => panic!("Poll::Pending"), } } -mod flag_cx { - use futures::task::{self, ArcWake, Context}; - use std::sync::Arc; - use std::sync::atomic::{AtomicBool, Ordering}; - - // An Unpark struct that records unpark events for inspection - pub struct Flag(AtomicBool); +// An Unpark struct that records unpark events for inspection +struct Flag(AtomicBool); - impl Flag { - pub fn new() -> Arc<Self> { - Arc::new(Self(AtomicBool::new(false))) - } - - pub fn take(&self) -> bool { - self.0.swap(false, Ordering::SeqCst) - } +impl Flag { + fn new() -> Arc<Self> { + Arc::new(Self(AtomicBool::new(false))) + } - pub fn set(&self, v: bool) { - self.0.store(v, Ordering::SeqCst) - } + fn take(&self) -> bool { + self.0.swap(false, Ordering::SeqCst) } - impl ArcWake for Flag { - fn wake_by_ref(arc_self: &Arc<Self>) { - arc_self.set(true) - } + fn set(&self, v: bool) { + self.0.store(v, Ordering::SeqCst) } +} - pub fn flag_cx<F, R>(f: F) -> R - where - F: FnOnce(Arc<Flag>, &mut Context<'_>) -> R, - { - let flag = Flag::new(); - let waker = task::waker_ref(&flag); - let cx = &mut Context::from_waker(&waker); - f(flag.clone(), cx) +impl ArcWake for Flag { + fn wake_by_ref(arc_self: &Arc<Self>) { + arc_self.set(true) } } -mod start_send_fut { - use futures::future::Future; - use futures::ready; - use futures::sink::Sink; - use futures::task::{Context, Poll}; - use std::pin::Pin; +fn flag_cx<F, R>(f: F) -> R +where + F: FnOnce(Arc<Flag>, &mut Context<'_>) -> R, +{ + let flag = Flag::new(); + let waker = task::waker_ref(&flag); + let cx = &mut Context::from_waker(&waker); + f(flag.clone(), cx) +} - // Sends a value on an i32 channel sink - pub struct StartSendFut<S: Sink<Item> + Unpin, Item: Unpin>(Option<S>, Option<Item>); +// Sends a value on an i32 channel sink +struct StartSendFut<S: Sink<Item> + Unpin, Item: Unpin>(Option<S>, Option<Item>); - impl<S: Sink<Item> + Unpin, Item: Unpin> StartSendFut<S, Item> { - pub fn new(sink: S, item: Item) -> Self { - Self(Some(sink), Some(item)) - } +impl<S: Sink<Item> + Unpin, Item: Unpin> StartSendFut<S, Item> { + fn new(sink: S, item: Item) -> Self { + Self(Some(sink), Some(item)) } +} - impl<S: Sink<Item> + Unpin, Item: Unpin> Future for StartSendFut<S, Item> { - type Output = Result<S, S::Error>; +impl<S: Sink<Item> + Unpin, Item: Unpin> Future for StartSendFut<S, Item> { + type Output = Result<S, S::Error>; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - let Self(inner, item) = self.get_mut(); - { - let mut inner = inner.as_mut().unwrap(); - ready!(Pin::new(&mut inner).poll_ready(cx))?; - Pin::new(&mut inner).start_send(item.take().unwrap())?; - } - Poll::Ready(Ok(inner.take().unwrap())) + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let Self(inner, item) = self.get_mut(); + { + let mut inner = inner.as_mut().unwrap(); + ready!(Pin::new(&mut inner).poll_ready(cx))?; + Pin::new(&mut inner).start_send(item.take().unwrap())?; } + Poll::Ready(Ok(inner.take().unwrap())) } } -mod manual_flush { - use futures::sink::Sink; - use futures::task::{Context, Poll, Waker}; - use std::mem; - use std::pin::Pin; - - // Immediately accepts all requests to start pushing, but completion is managed - // by manually flushing - pub struct ManualFlush<T: Unpin> { - data: Vec<T>, - waiting_tasks: Vec<Waker>, - } +// Immediately accepts all requests to start pushing, but completion is managed +// by manually flushing +struct ManualFlush<T: Unpin> { + data: Vec<T>, + waiting_tasks: Vec<Waker>, +} - impl<T: Unpin> Sink<Option<T>> for ManualFlush<T> { - type Error = (); +impl<T: Unpin> Sink<Option<T>> for ManualFlush<T> { + type Error = (); - fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - Poll::Ready(Ok(())) - } + fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + Poll::Ready(Ok(())) + } - fn start_send(mut self: Pin<&mut Self>, item: Option<T>) -> Result<(), Self::Error> { - if let Some(item) = item { - self.data.push(item); - } else { - self.force_flush(); - } - Ok(()) + fn start_send(mut self: Pin<&mut Self>, item: Option<T>) -> Result<(), Self::Error> { + if let Some(item) = item { + self.data.push(item); + } else { + self.force_flush(); } + Ok(()) + } - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - if self.data.is_empty() { - Poll::Ready(Ok(())) - } else { - self.waiting_tasks.push(cx.waker().clone()); - Poll::Pending - } + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + if self.data.is_empty() { + Poll::Ready(Ok(())) + } else { + self.waiting_tasks.push(cx.waker().clone()); + Poll::Pending } + } - fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - self.poll_flush(cx) - } + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + self.poll_flush(cx) } +} - impl<T: Unpin> ManualFlush<T> { - pub fn new() -> Self { - Self { - data: Vec::new(), - waiting_tasks: Vec::new(), - } - } +impl<T: Unpin> ManualFlush<T> { + fn new() -> Self { + Self { data: Vec::new(), waiting_tasks: Vec::new() } + } - pub fn force_flush(&mut self) -> Vec<T> { - for task in self.waiting_tasks.drain(..) { - task.wake() - } - mem::replace(&mut self.data, Vec::new()) + fn force_flush(&mut self) -> Vec<T> { + for task in self.waiting_tasks.drain(..) { + task.wake() } + mem::replace(&mut self.data, Vec::new()) } } -mod allowance { - use futures::sink::Sink; - use futures::task::{Context, Poll, Waker}; - use std::cell::{Cell, RefCell}; - use std::pin::Pin; - use std::rc::Rc; +struct ManualAllow<T: Unpin> { + data: Vec<T>, + allow: Rc<Allow>, +} - pub struct ManualAllow<T: Unpin> { - pub data: Vec<T>, - allow: Rc<Allow>, - } +struct Allow { + flag: Cell<bool>, + tasks: RefCell<Vec<Waker>>, +} - pub struct Allow { - flag: Cell<bool>, - tasks: RefCell<Vec<Waker>>, +impl Allow { + fn new() -> Self { + Self { flag: Cell::new(false), tasks: RefCell::new(Vec::new()) } } - impl Allow { - pub fn new() -> Self { - Self { - flag: Cell::new(false), - tasks: RefCell::new(Vec::new()), - } - } - - pub fn check(&self, cx: &mut Context<'_>) -> bool { - if self.flag.get() { - true - } else { - self.tasks.borrow_mut().push(cx.waker().clone()); - false - } - } - - pub fn start(&self) { - self.flag.set(true); - let mut tasks = self.tasks.borrow_mut(); - for task in tasks.drain(..) { - task.wake(); - } + fn check(&self, cx: &mut Context<'_>) -> bool { + if self.flag.get() { + true + } else { + self.tasks.borrow_mut().push(cx.waker().clone()); + false } } - impl<T: Unpin> Sink<T> for ManualAllow<T> { - type Error = (); - - fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - if self.allow.check(cx) { - Poll::Ready(Ok(())) - } else { - Poll::Pending - } + fn start(&self) { + self.flag.set(true); + let mut tasks = self.tasks.borrow_mut(); + for task in tasks.drain(..) { + task.wake(); } + } +} - fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> { - self.data.push(item); - Ok(()) - } +impl<T: Unpin> Sink<T> for ManualAllow<T> { + type Error = (); - fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + if self.allow.check(cx) { Poll::Ready(Ok(())) + } else { + Poll::Pending } + } - fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - Poll::Ready(Ok(())) - } + fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> { + self.data.push(item); + Ok(()) } - pub fn manual_allow<T: Unpin>() -> (ManualAllow<T>, Rc<Allow>) { - let allow = Rc::new(Allow::new()); - let manual_allow = ManualAllow { - data: Vec::new(), - allow: allow.clone(), - }; - (manual_allow, allow) + fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + Poll::Ready(Ok(())) } + + fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + Poll::Ready(Ok(())) + } +} + +fn manual_allow<T: Unpin>() -> (ManualAllow<T>, Rc<Allow>) { + let allow = Rc::new(Allow::new()); + let manual_allow = ManualAllow { data: Vec::new(), allow: allow.clone() }; + (manual_allow, allow) } #[test] fn either_sink() { - use futures::sink::{Sink, SinkExt}; - use std::collections::VecDeque; - use std::pin::Pin; - - let mut s = if true { - Vec::<i32>::new().left_sink() - } else { - VecDeque::<i32>::new().right_sink() - }; + let mut s = + if true { Vec::<i32>::new().left_sink() } else { VecDeque::<i32>::new().right_sink() }; Pin::new(&mut s).start_send(0).unwrap(); } #[test] fn vec_sink() { - use futures::executor::block_on; - use futures::sink::{Sink, SinkExt}; - use std::pin::Pin; - let mut v = Vec::new(); Pin::new(&mut v).start_send(0).unwrap(); Pin::new(&mut v).start_send(1).unwrap(); @@ -269,10 +226,6 @@ fn vec_sink() { #[test] fn vecdeque_sink() { - use futures::sink::Sink; - use std::collections::VecDeque; - use std::pin::Pin; - let mut deque = VecDeque::new(); Pin::new(&mut deque).start_send(2).unwrap(); Pin::new(&mut deque).start_send(3).unwrap(); @@ -284,9 +237,6 @@ fn vecdeque_sink() { #[test] fn send() { - use futures::executor::block_on; - use futures::sink::SinkExt; - let mut v = Vec::new(); block_on(v.send(0)).unwrap(); @@ -301,10 +251,6 @@ fn send() { #[test] fn send_all() { - use futures::executor::block_on; - use futures::sink::SinkExt; - use futures::stream::{self, StreamExt}; - let mut v = Vec::new(); block_on(v.send_all(&mut stream::iter(vec![0, 1]).map(Ok))).unwrap(); @@ -321,15 +267,6 @@ fn send_all() { // channel is full #[test] fn mpsc_blocking_start_send() { - use futures::channel::mpsc; - use futures::executor::block_on; - use futures::future::{self, FutureExt}; - - use start_send_fut::StartSendFut; - use flag_cx::flag_cx; - use sassert_next::sassert_next; - use unwrap::unwrap; - let (mut tx, mut rx) = mpsc::channel::<i32>(0); block_on(future::lazy(|_| { @@ -353,17 +290,6 @@ fn mpsc_blocking_start_send() { // until a oneshot is completed #[test] fn with_flush() { - use futures::channel::oneshot; - use futures::executor::block_on; - use futures::future::{self, FutureExt, TryFutureExt}; - use futures::never::Never; - use futures::sink::{Sink, SinkExt}; - use std::mem; - use std::pin::Pin; - - use flag_cx::flag_cx; - use unwrap::unwrap; - let (tx, rx) = oneshot::channel(); let mut block = rx.boxed(); let mut sink = Vec::new().with(|elem| { @@ -390,11 +316,6 @@ fn with_flush() { // test simple use of with to change data #[test] fn with_as_map() { - use futures::executor::block_on; - use futures::future; - use futures::never::Never; - use futures::sink::SinkExt; - let mut sink = Vec::new().with(|item| future::ok::<i32, Never>(item * 2)); block_on(sink.send(0)).unwrap(); block_on(sink.send(1)).unwrap(); @@ -405,10 +326,6 @@ fn with_as_map() { // test simple use of with_flat_map #[test] fn with_flat_map() { - use futures::executor::block_on; - use futures::sink::SinkExt; - use futures::stream::{self, StreamExt}; - let mut sink = Vec::new().with_flat_map(|item| stream::iter(vec![item; item]).map(Ok)); block_on(sink.send(0)).unwrap(); block_on(sink.send(1)).unwrap(); @@ -421,16 +338,6 @@ fn with_flat_map() { // Regression test for the issue #1834. #[test] fn with_propagates_poll_ready() { - use futures::channel::mpsc; - use futures::executor::block_on; - use futures::future; - use futures::sink::{Sink, SinkExt}; - use futures::task::Poll; - use std::pin::Pin; - - use flag_cx::flag_cx; - use sassert_next::sassert_next; - let (tx, mut rx) = mpsc::channel::<i32>(0); let mut tx = tx.with(|item: i32| future::ok::<i32, mpsc::SendError>(item + 10)); @@ -457,14 +364,6 @@ fn with_propagates_poll_ready() { // but doesn't claim to be flushed until the underlying sink is #[test] fn with_flush_propagate() { - use futures::future::{self, FutureExt}; - use futures::sink::{Sink, SinkExt}; - use std::pin::Pin; - - use manual_flush::ManualFlush; - use flag_cx::flag_cx; - use unwrap::unwrap; - let mut sink = ManualFlush::new().with(future::ok::<Option<i32>, ()>); flag_cx(|flag, cx| { unwrap(Pin::new(&mut sink).poll_ready(cx)); @@ -486,21 +385,13 @@ fn with_flush_propagate() { // test that `Clone` is implemented on `with` sinks #[test] fn with_implements_clone() { - use futures::channel::mpsc; - use futures::executor::block_on; - use futures::future; - use futures::{SinkExt, StreamExt}; - let (mut tx, rx) = mpsc::channel(5); { - let mut is_positive = tx - .clone() - .with(|item| future::ok::<bool, mpsc::SendError>(item > 0)); + let mut is_positive = tx.clone().with(|item| future::ok::<bool, mpsc::SendError>(item > 0)); - let mut is_long = tx - .clone() - .with(|item: &str| future::ok::<bool, mpsc::SendError>(item.len() > 5)); + let mut is_long = + tx.clone().with(|item: &str| future::ok::<bool, mpsc::SendError>(item.len() > 5)); block_on(is_positive.clone().send(-1)).unwrap(); block_on(is_long.clone().send("123456")).unwrap(); @@ -512,18 +403,12 @@ fn with_implements_clone() { block_on(tx.close()).unwrap(); - assert_eq!( - block_on(rx.collect::<Vec<_>>()), - vec![false, true, false, true, false] - ); + assert_eq!(block_on(rx.collect::<Vec<_>>()), vec![false, true, false, true, false]); } // test that a buffer is a no-nop around a sink that always accepts sends #[test] fn buffer_noop() { - use futures::executor::block_on; - use futures::sink::SinkExt; - let mut sink = Vec::new().buffer(0); block_on(sink.send(0)).unwrap(); block_on(sink.send(1)).unwrap(); @@ -539,15 +424,6 @@ fn buffer_noop() { // and writing out when the underlying sink is ready #[test] fn buffer() { - use futures::executor::block_on; - use futures::future::FutureExt; - use futures::sink::SinkExt; - - use start_send_fut::StartSendFut; - use flag_cx::flag_cx; - use unwrap::unwrap; - use allowance::manual_allow; - let (sink, allow) = manual_allow::<i32>(); let sink = sink.buffer(2); @@ -567,10 +443,6 @@ fn buffer() { #[test] fn fanout_smoke() { - use futures::executor::block_on; - use futures::sink::SinkExt; - use futures::stream::{self, StreamExt}; - let sink1 = Vec::new(); let sink2 = Vec::new(); let mut sink = sink1.fanout(sink2); @@ -582,16 +454,6 @@ fn fanout_smoke() { #[test] fn fanout_backpressure() { - use futures::channel::mpsc; - use futures::executor::block_on; - use futures::future::FutureExt; - use futures::sink::SinkExt; - use futures::stream::StreamExt; - - use start_send_fut::StartSendFut; - use flag_cx::flag_cx; - use unwrap::unwrap; - let (left_send, mut left_recv) = mpsc::channel(0); let (right_send, mut right_recv) = mpsc::channel(0); let sink = left_send.fanout(right_send); @@ -624,12 +486,6 @@ fn fanout_backpressure() { #[test] fn sink_map_err() { - use futures::channel::mpsc; - use futures::sink::{Sink, SinkExt}; - use futures::task::Poll; - use futures_test::task::panic_context; - use std::pin::Pin; - { let cx = &mut panic_context(); let (tx, _rx) = mpsc::channel(1); @@ -639,20 +495,11 @@ fn sink_map_err() { } let tx = mpsc::channel(0).0; - assert_eq!( - Pin::new(&mut tx.sink_map_err(|_| ())).start_send(()), - Err(()) - ); + assert_eq!(Pin::new(&mut tx.sink_map_err(|_| ())).start_send(()), Err(())); } #[test] fn sink_unfold() { - use futures::channel::mpsc; - use futures::executor::block_on; - use futures::future::poll_fn; - use futures::sink::{self, Sink, SinkExt}; - use futures::task::Poll; - block_on(poll_fn(|cx| { let (tx, mut rx) = mpsc::channel(1); let unfold = sink::unfold((), |(), i: i32| { @@ -685,14 +532,8 @@ fn sink_unfold() { #[test] fn err_into() { - use futures::channel::mpsc; - use futures::sink::{Sink, SinkErrInto, SinkExt}; - use futures::task::Poll; - use futures_test::task::panic_context; - use std::pin::Pin; - #[derive(Copy, Clone, Debug, PartialEq, Eq)] - pub struct ErrIntoTest; + struct ErrIntoTest; impl From<mpsc::SendError> for ErrIntoTest { fn from(_: mpsc::SendError) -> Self { @@ -709,8 +550,5 @@ fn err_into() { } let tx = mpsc::channel(0).0; - assert_eq!( - Pin::new(&mut tx.sink_err_into()).start_send(()), - Err(ErrIntoTest) - ); + assert_eq!(Pin::new(&mut tx.sink_err_into()).start_send(()), Err(ErrIntoTest)); } diff --git a/tests/sink_fanout.rs b/tests/sink_fanout.rs index 7d1fa43..e57b2d8 100644 --- a/tests/sink_fanout.rs +++ b/tests/sink_fanout.rs @@ -1,11 +1,11 @@ +use futures::channel::mpsc; +use futures::executor::block_on; +use futures::future::join3; +use futures::sink::SinkExt; +use futures::stream::{self, StreamExt}; + #[test] fn it_works() { - use futures::channel::mpsc; - use futures::executor::block_on; - use futures::future::join3; - use futures::sink::SinkExt; - use futures::stream::{self, StreamExt}; - let (tx1, rx1) = mpsc::channel(1); let (tx2, rx2) = mpsc::channel(2); let tx = tx1.fanout(tx2).sink_map_err(|_| ()); diff --git a/tests/split.rs b/tests/split.rs deleted file mode 100644 index 86c2fc6..0000000 --- a/tests/split.rs +++ /dev/null @@ -1,75 +0,0 @@ -#[test] -fn test_split() { - use futures::executor::block_on; - use futures::sink::{Sink, SinkExt}; - use futures::stream::{self, Stream, StreamExt}; - use futures::task::{Context, Poll}; - use pin_project::pin_project; - use std::pin::Pin; - - #[pin_project] - struct Join<T, U> { - #[pin] - stream: T, - #[pin] - sink: U, - } - - impl<T: Stream, U> Stream for Join<T, U> { - type Item = T::Item; - - fn poll_next( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Option<T::Item>> { - self.project().stream.poll_next(cx) - } - } - - impl<T, U: Sink<Item>, Item> Sink<Item> for Join<T, U> { - type Error = U::Error; - - fn poll_ready( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Result<(), Self::Error>> { - self.project().sink.poll_ready(cx) - } - - fn start_send( - self: Pin<&mut Self>, - item: Item, - ) -> Result<(), Self::Error> { - self.project().sink.start_send(item) - } - - fn poll_flush( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Result<(), Self::Error>> { - self.project().sink.poll_flush(cx) - } - - fn poll_close( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Result<(), Self::Error>> { - self.project().sink.poll_close(cx) - } - } - - let mut dest: Vec<i32> = Vec::new(); - { - let join = Join { - stream: stream::iter(vec![10, 20, 30]), - sink: &mut dest - }; - - let (sink, stream) = join.split(); - let join = sink.reunite(stream).expect("test_split: reunite error"); - let (mut sink, stream) = join.split(); - let mut stream = stream.map(Ok); - block_on(sink.send_all(&mut stream)).unwrap(); - } - assert_eq!(dest, vec![10, 20, 30]); -} diff --git a/tests/stream.rs b/tests/stream.rs index 14b283d..0d453d1 100644 --- a/tests/stream.rs +++ b/tests/stream.rs @@ -1,8 +1,14 @@ +use futures::channel::mpsc; +use futures::executor::block_on; +use futures::future::{self, Future}; +use futures::sink::SinkExt; +use futures::stream::{self, StreamExt}; +use futures::task::Poll; +use futures::FutureExt; +use futures_test::task::noop_context; + #[test] fn select() { - use futures::executor::block_on; - use futures::stream::{self, StreamExt}; - fn select_and_compare(a: Vec<u32>, b: Vec<u32>, expected: Vec<u32>) { let a = stream::iter(a); let b = stream::iter(b); @@ -17,19 +23,12 @@ fn select() { #[test] fn flat_map() { - use futures::stream::{self, StreamExt}; - - futures::executor::block_on(async { - let st = stream::iter(vec![ - stream::iter(0..=4u8), - stream::iter(6..=10), - stream::iter(0..=2), - ]); - - let values: Vec<_> = st - .flat_map(|s| s.filter(|v| futures::future::ready(v % 2 == 0))) - .collect() - .await; + block_on(async { + let st = + stream::iter(vec![stream::iter(0..=4u8), stream::iter(6..=10), stream::iter(0..=2)]); + + let values: Vec<_> = + st.flat_map(|s| s.filter(|v| futures::future::ready(v % 2 == 0))).collect().await; assert_eq!(values, vec![0, 2, 4, 6, 8, 10, 0, 2]); }); @@ -37,28 +36,21 @@ fn flat_map() { #[test] fn scan() { - use futures::stream::{self, StreamExt}; - - futures::executor::block_on(async { - assert_eq!( - stream::iter(vec![1u8, 2, 3, 4, 6, 8, 2]) - .scan(1, |state, e| { - *state += 1; - futures::future::ready(if e < *state { Some(e) } else { None }) - }) - .collect::<Vec<_>>() - .await, - vec![1u8, 2, 3, 4] - ); + block_on(async { + let values = stream::iter(vec![1u8, 2, 3, 4, 6, 8, 2]) + .scan(1, |state, e| { + *state += 1; + futures::future::ready(if e < *state { Some(e) } else { None }) + }) + .collect::<Vec<_>>() + .await; + + assert_eq!(values, vec![1u8, 2, 3, 4]); }); } #[test] fn take_until() { - use futures::future::{self, Future}; - use futures::stream::{self, StreamExt}; - use futures::task::Poll; - fn make_stop_fut(stop_on: u32) -> impl Future<Output = ()> { let mut i = 0; future::poll_fn(move |_cx| { @@ -71,7 +63,7 @@ fn take_until() { }) } - futures::executor::block_on(async { + block_on(async { // Verify stopping works: let stream = stream::iter(1u32..=10); let stop_fut = make_stop_fut(5); @@ -123,10 +115,15 @@ fn take_until() { #[test] #[should_panic] -fn ready_chunks_panic_on_cap_zero() { - use futures::channel::mpsc; - use futures::stream::StreamExt; +fn chunks_panic_on_cap_zero() { + let (_, rx1) = mpsc::channel::<()>(1); + + let _ = rx1.chunks(0); +} +#[test] +#[should_panic] +fn ready_chunks_panic_on_cap_zero() { let (_, rx1) = mpsc::channel::<()>(1); let _ = rx1.ready_chunks(0); @@ -134,12 +131,6 @@ fn ready_chunks_panic_on_cap_zero() { #[test] fn ready_chunks() { - use futures::channel::mpsc; - use futures::stream::StreamExt; - use futures::sink::SinkExt; - use futures::FutureExt; - use futures_test::task::noop_context; - let (mut tx, rx1) = mpsc::channel::<i32>(16); let mut s = rx1.ready_chunks(2); @@ -147,14 +138,14 @@ fn ready_chunks() { let mut cx = noop_context(); assert!(s.next().poll_unpin(&mut cx).is_pending()); - futures::executor::block_on(async { + block_on(async { tx.send(1).await.unwrap(); assert_eq!(s.next().await.unwrap(), vec![1]); tx.send(2).await.unwrap(); tx.send(3).await.unwrap(); tx.send(4).await.unwrap(); - assert_eq!(s.next().await.unwrap(), vec![2,3]); + assert_eq!(s.next().await.unwrap(), vec![2, 3]); assert_eq!(s.next().await.unwrap(), vec![4]); }); } diff --git a/tests/stream_abortable.rs b/tests/stream_abortable.rs new file mode 100644 index 0000000..2339dd0 --- /dev/null +++ b/tests/stream_abortable.rs @@ -0,0 +1,46 @@ +use futures::channel::mpsc; +use futures::executor::block_on; +use futures::stream::{abortable, Stream, StreamExt}; +use futures::task::{Context, Poll}; +use futures::SinkExt; +use futures_test::task::new_count_waker; +use std::pin::Pin; + +#[test] +fn abortable_works() { + let (_tx, a_rx) = mpsc::channel::<()>(1); + let (mut abortable_rx, abort_handle) = abortable(a_rx); + + abort_handle.abort(); + assert!(abortable_rx.is_aborted()); + assert_eq!(None, block_on(abortable_rx.next())); +} + +#[test] +fn abortable_awakens() { + let (_tx, a_rx) = mpsc::channel::<()>(1); + let (mut abortable_rx, abort_handle) = abortable(a_rx); + + let (waker, counter) = new_count_waker(); + let mut cx = Context::from_waker(&waker); + + assert_eq!(counter, 0); + assert_eq!(Poll::Pending, Pin::new(&mut abortable_rx).poll_next(&mut cx)); + assert_eq!(counter, 0); + + abort_handle.abort(); + assert_eq!(counter, 1); + assert!(abortable_rx.is_aborted()); + assert_eq!(Poll::Ready(None), Pin::new(&mut abortable_rx).poll_next(&mut cx)); +} + +#[test] +fn abortable_resolves() { + let (mut tx, a_rx) = mpsc::channel::<()>(1); + let (mut abortable_rx, _abort_handle) = abortable(a_rx); + + block_on(tx.send(())).unwrap(); + + assert!(!abortable_rx.is_aborted()); + assert_eq!(Some(()), block_on(abortable_rx.next())); +} diff --git a/tests/buffer_unordered.rs b/tests/stream_buffer_unordered.rs index 5c8b8bf..9a2ee17 100644 --- a/tests/buffer_unordered.rs +++ b/tests/stream_buffer_unordered.rs @@ -1,13 +1,13 @@ +use futures::channel::{mpsc, oneshot}; +use futures::executor::{block_on, block_on_stream}; +use futures::sink::SinkExt; +use futures::stream::StreamExt; +use std::sync::mpsc as std_mpsc; +use std::thread; + #[test] #[ignore] // FIXME: https://github.com/rust-lang/futures-rs/issues/1790 fn works() { - use futures::channel::{oneshot, mpsc}; - use futures::executor::{block_on, block_on_stream}; - use futures::sink::SinkExt; - use futures::stream::StreamExt; - use std::sync::mpsc as std_mpsc; - use std::thread; - const N: usize = 4; let (mut tx, rx) = mpsc::channel(1); diff --git a/tests/stream_catch_unwind.rs b/tests/stream_catch_unwind.rs index 272558c..8b23a0a 100644 --- a/tests/stream_catch_unwind.rs +++ b/tests/stream_catch_unwind.rs @@ -1,8 +1,8 @@ +use futures::executor::block_on_stream; +use futures::stream::{self, StreamExt}; + #[test] fn panic_in_the_middle_of_the_stream() { - use futures::executor::block_on_stream; - use futures::stream::{self, StreamExt}; - let stream = stream::iter(vec![Some(10), None, Some(11)]); // panic on second element @@ -16,9 +16,6 @@ fn panic_in_the_middle_of_the_stream() { #[test] fn no_panic() { - use futures::executor::block_on_stream; - use futures::stream::{self, StreamExt}; - let stream = stream::iter(vec![10, 11, 12]); let mut iter = block_on_stream(stream.catch_unwind()); diff --git a/tests/futures_ordered.rs b/tests/stream_futures_ordered.rs index 7f21c82..7506c65 100644 --- a/tests/futures_ordered.rs +++ b/tests/stream_futures_ordered.rs @@ -1,10 +1,12 @@ +use futures::channel::oneshot; +use futures::executor::{block_on, block_on_stream}; +use futures::future::{self, join, Future, FutureExt, TryFutureExt}; +use futures::stream::{FuturesOrdered, StreamExt}; +use futures_test::task::noop_context; +use std::any::Any; + #[test] fn works_1() { - use futures::channel::oneshot; - use futures::executor::block_on_stream; - use futures::stream::{StreamExt, FuturesOrdered}; - use futures_test::task::noop_context; - let (a_tx, a_rx) = oneshot::channel::<i32>(); let (b_tx, b_rx) = oneshot::channel::<i32>(); let (c_tx, c_rx) = oneshot::channel::<i32>(); @@ -26,19 +28,13 @@ fn works_1() { #[test] fn works_2() { - use futures::channel::oneshot; - use futures::future::{join, FutureExt}; - use futures::stream::{StreamExt, FuturesOrdered}; - use futures_test::task::noop_context; - let (a_tx, a_rx) = oneshot::channel::<i32>(); let (b_tx, b_rx) = oneshot::channel::<i32>(); let (c_tx, c_rx) = oneshot::channel::<i32>(); - let mut stream = vec![ - a_rx.boxed(), - join(b_rx, c_rx).map(|(a, b)| Ok(a? + b?)).boxed(), - ].into_iter().collect::<FuturesOrdered<_>>(); + let mut stream = vec![a_rx.boxed(), join(b_rx, c_rx).map(|(a, b)| Ok(a? + b?)).boxed()] + .into_iter() + .collect::<FuturesOrdered<_>>(); let mut cx = noop_context(); a_tx.send(33).unwrap(); @@ -51,37 +47,29 @@ fn works_2() { #[test] fn from_iterator() { - use futures::executor::block_on; - use futures::future; - use futures::stream::{StreamExt, FuturesOrdered}; - - let stream = vec![ - future::ready::<i32>(1), - future::ready::<i32>(2), - future::ready::<i32>(3) - ].into_iter().collect::<FuturesOrdered<_>>(); + let stream = vec![future::ready::<i32>(1), future::ready::<i32>(2), future::ready::<i32>(3)] + .into_iter() + .collect::<FuturesOrdered<_>>(); assert_eq!(stream.len(), 3); - assert_eq!(block_on(stream.collect::<Vec<_>>()), vec![1,2,3]); + assert_eq!(block_on(stream.collect::<Vec<_>>()), vec![1, 2, 3]); } #[test] fn queue_never_unblocked() { - use futures::channel::oneshot; - use futures::future::{self, Future, TryFutureExt}; - use futures::stream::{StreamExt, FuturesOrdered}; - use futures_test::task::noop_context; - use std::any::Any; - let (_a_tx, a_rx) = oneshot::channel::<Box<dyn Any + Send>>(); let (b_tx, b_rx) = oneshot::channel::<Box<dyn Any + Send>>(); let (c_tx, c_rx) = oneshot::channel::<Box<dyn Any + Send>>(); let mut stream = vec![ Box::new(a_rx) as Box<dyn Future<Output = _> + Unpin>, - Box::new(future::try_select(b_rx, c_rx) - .map_err(|e| e.factor_first().0) - .and_then(|e| future::ok(Box::new(e) as Box<dyn Any + Send>))) as _, - ].into_iter().collect::<FuturesOrdered<_>>(); + Box::new( + future::try_select(b_rx, c_rx) + .map_err(|e| e.factor_first().0) + .and_then(|e| future::ok(Box::new(e) as Box<dyn Any + Send>)), + ) as _, + ] + .into_iter() + .collect::<FuturesOrdered<_>>(); let cx = &mut noop_context(); for _ in 0..10 { diff --git a/tests/futures_unordered.rs b/tests/stream_futures_unordered.rs index ac0817e..4b9afcc 100644 --- a/tests/futures_unordered.rs +++ b/tests/stream_futures_unordered.rs @@ -1,17 +1,17 @@ -use futures::future::Future; -use futures::stream::{FuturesUnordered, StreamExt}; +use futures::channel::oneshot; +use futures::executor::{block_on, block_on_stream}; +use futures::future::{self, join, Future, FutureExt}; +use futures::stream::{FusedStream, FuturesUnordered, StreamExt}; use futures::task::{Context, Poll}; +use futures_test::future::FutureTestExt; use futures_test::task::noop_context; +use futures_test::{assert_stream_done, assert_stream_next, assert_stream_pending}; use std::iter::FromIterator; use std::pin::Pin; +use std::sync::atomic::{AtomicBool, Ordering}; #[test] fn is_terminated() { - use futures::future; - use futures::stream::{FusedStream, FuturesUnordered, StreamExt}; - use futures::task::Poll; - use futures_test::task::noop_context; - let mut cx = noop_context(); let mut tasks = FuturesUnordered::new(); @@ -39,19 +39,12 @@ fn is_terminated() { #[test] fn works_1() { - use futures::channel::oneshot; - use futures::executor::block_on_stream; - use futures::stream::FuturesUnordered; - let (a_tx, a_rx) = oneshot::channel::<i32>(); let (b_tx, b_rx) = oneshot::channel::<i32>(); let (c_tx, c_rx) = oneshot::channel::<i32>(); - let mut iter = block_on_stream( - vec![a_rx, b_rx, c_rx] - .into_iter() - .collect::<FuturesUnordered<_>>(), - ); + let mut iter = + block_on_stream(vec![a_rx, b_rx, c_rx].into_iter().collect::<FuturesUnordered<_>>()); b_tx.send(99).unwrap(); assert_eq!(Some(Ok(99)), iter.next()); @@ -65,22 +58,13 @@ fn works_1() { #[test] fn works_2() { - use futures::channel::oneshot; - use futures::future::{join, FutureExt}; - use futures::stream::{FuturesUnordered, StreamExt}; - use futures::task::Poll; - use futures_test::task::noop_context; - let (a_tx, a_rx) = oneshot::channel::<i32>(); let (b_tx, b_rx) = oneshot::channel::<i32>(); let (c_tx, c_rx) = oneshot::channel::<i32>(); - let mut stream = vec![ - a_rx.boxed(), - join(b_rx, c_rx).map(|(a, b)| Ok(a? + b?)).boxed(), - ] - .into_iter() - .collect::<FuturesUnordered<_>>(); + let mut stream = vec![a_rx.boxed(), join(b_rx, c_rx).map(|(a, b)| Ok(a? + b?)).boxed()] + .into_iter() + .collect::<FuturesUnordered<_>>(); a_tx.send(9).unwrap(); b_tx.send(10).unwrap(); @@ -94,29 +78,15 @@ fn works_2() { #[test] fn from_iterator() { - use futures::executor::block_on; - use futures::future; - use futures::stream::{FuturesUnordered, StreamExt}; - - let stream = vec![ - future::ready::<i32>(1), - future::ready::<i32>(2), - future::ready::<i32>(3), - ] - .into_iter() - .collect::<FuturesUnordered<_>>(); + let stream = vec![future::ready::<i32>(1), future::ready::<i32>(2), future::ready::<i32>(3)] + .into_iter() + .collect::<FuturesUnordered<_>>(); assert_eq!(stream.len(), 3); assert_eq!(block_on(stream.collect::<Vec<_>>()), vec![1, 2, 3]); } #[test] fn finished_future() { - use std::marker::Unpin; - use futures::channel::oneshot; - use futures::future::{self, Future, FutureExt}; - use futures::stream::{FuturesUnordered, StreamExt}; - use futures_test::task::noop_context; - let (_a_tx, a_rx) = oneshot::channel::<i32>(); let (b_tx, b_rx) = oneshot::channel::<i32>(); let (c_tx, c_rx) = oneshot::channel::<i32>(); @@ -142,17 +112,11 @@ fn finished_future() { #[test] fn iter_mut_cancel() { - use futures::channel::oneshot; - use futures::executor::block_on_stream; - use futures::stream::FuturesUnordered; - let (a_tx, a_rx) = oneshot::channel::<i32>(); let (b_tx, b_rx) = oneshot::channel::<i32>(); let (c_tx, c_rx) = oneshot::channel::<i32>(); - let mut stream = vec![a_rx, b_rx, c_rx] - .into_iter() - .collect::<FuturesUnordered<_>>(); + let mut stream = vec![a_rx, b_rx, c_rx].into_iter().collect::<FuturesUnordered<_>>(); for rx in stream.iter_mut() { rx.close(); @@ -172,16 +136,10 @@ fn iter_mut_cancel() { #[test] fn iter_mut_len() { - use futures::future; - use futures::stream::FuturesUnordered; - - let mut stream = vec![ - future::pending::<()>(), - future::pending::<()>(), - future::pending::<()>(), - ] - .into_iter() - .collect::<FuturesUnordered<_>>(); + let mut stream = + vec![future::pending::<()>(), future::pending::<()>(), future::pending::<()>()] + .into_iter() + .collect::<FuturesUnordered<_>>(); let mut iter_mut = stream.iter_mut(); assert_eq!(iter_mut.len(), 3); @@ -196,15 +154,6 @@ fn iter_mut_len() { #[test] fn iter_cancel() { - use std::marker::Unpin; - use std::pin::Pin; - use std::sync::atomic::{AtomicBool, Ordering}; - - use futures::executor::block_on_stream; - use futures::future::{self, Future, FutureExt}; - use futures::stream::FuturesUnordered; - use futures::task::{Context, Poll}; - struct AtomicCancel<F> { future: F, cancel: AtomicBool, @@ -250,16 +199,9 @@ fn iter_cancel() { #[test] fn iter_len() { - use futures::future; - use futures::stream::FuturesUnordered; - - let stream = vec![ - future::pending::<()>(), - future::pending::<()>(), - future::pending::<()>(), - ] - .into_iter() - .collect::<FuturesUnordered<_>>(); + let stream = vec![future::pending::<()>(), future::pending::<()>(), future::pending::<()>()] + .into_iter() + .collect::<FuturesUnordered<_>>(); let mut iter = stream.iter(); assert_eq!(iter.len(), 3); @@ -273,12 +215,52 @@ fn iter_len() { } #[test] -fn futures_not_moved_after_poll() { - use futures::future; - use futures::stream::FuturesUnordered; - use futures_test::future::FutureTestExt; - use futures_test::{assert_stream_done, assert_stream_next, assert_stream_pending}; +fn into_iter_cancel() { + let (a_tx, a_rx) = oneshot::channel::<i32>(); + let (b_tx, b_rx) = oneshot::channel::<i32>(); + let (c_tx, c_rx) = oneshot::channel::<i32>(); + + let stream = vec![a_rx, b_rx, c_rx].into_iter().collect::<FuturesUnordered<_>>(); + + let stream = stream + .into_iter() + .map(|mut rx| { + rx.close(); + rx + }) + .collect::<FuturesUnordered<_>>(); + + let mut iter = block_on_stream(stream); + + assert!(a_tx.is_canceled()); + assert!(b_tx.is_canceled()); + assert!(c_tx.is_canceled()); + + assert_eq!(iter.next(), Some(Err(futures::channel::oneshot::Canceled))); + assert_eq!(iter.next(), Some(Err(futures::channel::oneshot::Canceled))); + assert_eq!(iter.next(), Some(Err(futures::channel::oneshot::Canceled))); + assert_eq!(iter.next(), None); +} + +#[test] +fn into_iter_len() { + let stream = vec![future::pending::<()>(), future::pending::<()>(), future::pending::<()>()] + .into_iter() + .collect::<FuturesUnordered<_>>(); + let mut into_iter = stream.into_iter(); + assert_eq!(into_iter.len(), 3); + assert!(into_iter.next().is_some()); + assert_eq!(into_iter.len(), 2); + assert!(into_iter.next().is_some()); + assert_eq!(into_iter.len(), 1); + assert!(into_iter.next().is_some()); + assert_eq!(into_iter.len(), 0); + assert!(into_iter.next().is_none()); +} + +#[test] +fn futures_not_moved_after_poll() { // Future that will be ready after being polled twice, // asserting that it does not move. let fut = future::ready(()).pending_once().assert_unmoved(); @@ -292,11 +274,6 @@ fn futures_not_moved_after_poll() { #[test] fn len_valid_during_out_of_order_completion() { - use futures::channel::oneshot; - use futures::stream::{FuturesUnordered, StreamExt}; - use futures::task::Poll; - use futures_test::task::noop_context; - // Complete futures out-of-order and add new futures afterwards to ensure // length values remain correct. let (a_tx, a_rx) = oneshot::channel::<i32>(); @@ -368,3 +345,25 @@ fn polled_only_once_at_most_per_iteration() { let mut tasks = FuturesUnordered::<F>::new(); assert_eq!(Poll::Ready(None), tasks.poll_next_unpin(cx)); } + +#[test] +fn clear() { + let mut tasks = FuturesUnordered::from_iter(vec![future::ready(1), future::ready(2)]); + + assert_eq!(block_on(tasks.next()), Some(1)); + assert!(!tasks.is_empty()); + + tasks.clear(); + assert!(tasks.is_empty()); + + tasks.push(future::ready(3)); + assert!(!tasks.is_empty()); + + tasks.clear(); + assert!(tasks.is_empty()); + + assert_eq!(block_on(tasks.next()), None); + assert!(tasks.is_terminated()); + tasks.clear(); + assert!(!tasks.is_terminated()); +} diff --git a/tests/stream_into_async_read.rs b/tests/stream_into_async_read.rs index 222c985..60188d3 100644 --- a/tests/stream_into_async_read.rs +++ b/tests/stream_into_async_read.rs @@ -1,31 +1,51 @@ -#[test] -fn test_into_async_read() { - use core::pin::Pin; - use futures::io::AsyncRead; - use futures::stream::{self, TryStreamExt}; - use futures::task::Poll; - use futures_test::{task::noop_context, stream::StreamTestExt}; - - macro_rules! assert_read { - ($reader:expr, $buf:expr, $item:expr) => { - let mut cx = noop_context(); - loop { - match Pin::new(&mut $reader).poll_read(&mut cx, $buf) { - Poll::Ready(Ok(x)) => { - assert_eq!(x, $item); - break; - } - Poll::Ready(Err(err)) => { - panic!("assertion failed: expected value but got {}", err); - } - Poll::Pending => { - continue; - } +use core::pin::Pin; +use futures::io::{AsyncBufRead, AsyncRead}; +use futures::stream::{self, TryStreamExt}; +use futures::task::Poll; +use futures_test::{stream::StreamTestExt, task::noop_context}; + +macro_rules! assert_read { + ($reader:expr, $buf:expr, $item:expr) => { + let mut cx = noop_context(); + loop { + match Pin::new(&mut $reader).poll_read(&mut cx, $buf) { + Poll::Ready(Ok(x)) => { + assert_eq!(x, $item); + break; + } + Poll::Ready(Err(err)) => { + panic!("assertion failed: expected value but got {}", err); + } + Poll::Pending => { + continue; + } + } + } + }; +} + +macro_rules! assert_fill_buf { + ($reader:expr, $buf:expr) => { + let mut cx = noop_context(); + loop { + match Pin::new(&mut $reader).poll_fill_buf(&mut cx) { + Poll::Ready(Ok(x)) => { + assert_eq!(x, $buf); + break; + } + Poll::Ready(Err(err)) => { + panic!("assertion failed: expected value but got {}", err); + } + Poll::Pending => { + continue; } } - }; - } + } + }; +} +#[test] +fn test_into_async_read() { let stream = stream::iter((1..=3).flat_map(|_| vec![Ok(vec![]), Ok(vec![1, 2, 3, 4, 5])])); let mut reader = stream.interleave_pending().into_async_read(); let mut buf = vec![0; 3]; @@ -53,32 +73,6 @@ fn test_into_async_read() { #[test] fn test_into_async_bufread() { - use core::pin::Pin; - use futures::io::AsyncBufRead; - use futures::stream::{self, TryStreamExt}; - use futures::task::Poll; - use futures_test::{task::noop_context, stream::StreamTestExt}; - - macro_rules! assert_fill_buf { - ($reader:expr, $buf:expr) => { - let mut cx = noop_context(); - loop { - match Pin::new(&mut $reader).poll_fill_buf(&mut cx) { - Poll::Ready(Ok(x)) => { - assert_eq!(x, $buf); - break; - } - Poll::Ready(Err(err)) => { - panic!("assertion failed: expected value but got {}", err); - } - Poll::Pending => { - continue; - } - } - } - }; - } - let stream = stream::iter((1..=2).flat_map(|_| vec![Ok(vec![]), Ok(vec![1, 2, 3, 4, 5])])); let mut reader = stream.interleave_pending().into_async_read(); diff --git a/tests/stream_peekable.rs b/tests/stream_peekable.rs index 66a7385..2fa7f3a 100644 --- a/tests/stream_peekable.rs +++ b/tests/stream_peekable.rs @@ -1,9 +1,9 @@ +use futures::executor::block_on; +use futures::pin_mut; +use futures::stream::{self, Peekable, StreamExt}; + #[test] fn peekable() { - use futures::executor::block_on; - use futures::pin_mut; - use futures::stream::{self, Peekable, StreamExt}; - block_on(async { let peekable: Peekable<_> = stream::iter(vec![1u8, 2, 3]).peekable(); pin_mut!(peekable); @@ -11,3 +11,29 @@ fn peekable() { assert_eq!(peekable.collect::<Vec<u8>>().await, vec![1, 2, 3]); }); } + +#[test] +fn peekable_next_if_eq() { + block_on(async { + // first, try on references + let s = stream::iter(vec!["Heart", "of", "Gold"]).peekable(); + pin_mut!(s); + // try before `peek()` + assert_eq!(s.as_mut().next_if_eq(&"trillian").await, None); + assert_eq!(s.as_mut().next_if_eq(&"Heart").await, Some("Heart")); + // try after peek() + assert_eq!(s.as_mut().peek().await, Some(&"of")); + assert_eq!(s.as_mut().next_if_eq(&"of").await, Some("of")); + assert_eq!(s.as_mut().next_if_eq(&"zaphod").await, None); + // make sure `next()` still behaves + assert_eq!(s.next().await, Some("Gold")); + + // make sure comparison works for owned values + let s = stream::iter(vec![String::from("Ludicrous"), "speed".into()]).peekable(); + pin_mut!(s); + // make sure basic functionality works + assert_eq!(s.as_mut().next_if_eq("Ludicrous").await, Some("Ludicrous".into())); + assert_eq!(s.as_mut().next_if_eq("speed").await, Some("speed".into())); + assert_eq!(s.as_mut().next_if_eq("").await, None); + }); +} diff --git a/tests/stream_select_all.rs b/tests/stream_select_all.rs index 6178412..4ae0735 100644 --- a/tests/stream_select_all.rs +++ b/tests/stream_select_all.rs @@ -1,10 +1,12 @@ +use futures::channel::mpsc; +use futures::executor::{block_on, block_on_stream}; +use futures::future::{self, FutureExt}; +use futures::stream::{self, select_all, FusedStream, SelectAll, StreamExt}; +use futures::task::Poll; +use futures_test::task::noop_context; + #[test] fn is_terminated() { - use futures::future::{self, FutureExt}; - use futures::stream::{FusedStream, SelectAll, StreamExt}; - use futures::task::Poll; - use futures_test::task::noop_context; - let mut cx = noop_context(); let mut tasks = SelectAll::new(); @@ -30,9 +32,6 @@ fn is_terminated() { #[test] fn issue_1626() { - use futures::executor::block_on_stream; - use futures::stream; - let a = stream::iter(0..=2); let b = stream::iter(10..=14); @@ -51,10 +50,6 @@ fn issue_1626() { #[test] fn works_1() { - use futures::channel::mpsc; - use futures::executor::block_on_stream; - use futures::stream::select_all; - let (a_tx, a_rx) = mpsc::unbounded::<u32>(); let (b_tx, b_rx) = mpsc::unbounded::<u32>(); let (c_tx, c_rx) = mpsc::unbounded::<u32>(); @@ -81,3 +76,122 @@ fn works_1() { drop((a_tx, b_tx, c_tx)); assert_eq!(None, stream.next()); } + +#[test] +fn clear() { + let mut tasks = + select_all(vec![stream::iter(vec![1].into_iter()), stream::iter(vec![2].into_iter())]); + + assert_eq!(block_on(tasks.next()), Some(1)); + assert!(!tasks.is_empty()); + + tasks.clear(); + assert!(tasks.is_empty()); + + tasks.push(stream::iter(vec![3].into_iter())); + assert!(!tasks.is_empty()); + + tasks.clear(); + assert!(tasks.is_empty()); + + assert_eq!(block_on(tasks.next()), None); + assert!(tasks.is_terminated()); + tasks.clear(); + assert!(!tasks.is_terminated()); +} + +#[test] +fn iter_mut() { + let mut stream = + vec![stream::pending::<()>(), stream::pending::<()>(), stream::pending::<()>()] + .into_iter() + .collect::<SelectAll<_>>(); + + let mut iter = stream.iter_mut(); + assert_eq!(iter.len(), 3); + assert!(iter.next().is_some()); + assert_eq!(iter.len(), 2); + assert!(iter.next().is_some()); + assert_eq!(iter.len(), 1); + assert!(iter.next().is_some()); + assert_eq!(iter.len(), 0); + assert!(iter.next().is_none()); + + let mut stream = vec![stream::iter(vec![]), stream::iter(vec![1]), stream::iter(vec![2])] + .into_iter() + .collect::<SelectAll<_>>(); + + assert_eq!(stream.len(), 3); + assert_eq!(block_on(stream.next()), Some(1)); + assert_eq!(stream.len(), 2); + let mut iter = stream.iter_mut(); + assert_eq!(iter.len(), 2); + assert!(iter.next().is_some()); + assert_eq!(iter.len(), 1); + assert!(iter.next().is_some()); + assert_eq!(iter.len(), 0); + assert!(iter.next().is_none()); + + assert_eq!(block_on(stream.next()), Some(2)); + assert_eq!(stream.len(), 2); + assert_eq!(block_on(stream.next()), None); + let mut iter = stream.iter_mut(); + assert_eq!(iter.len(), 0); + assert!(iter.next().is_none()); +} + +#[test] +fn iter() { + let stream = vec![stream::pending::<()>(), stream::pending::<()>(), stream::pending::<()>()] + .into_iter() + .collect::<SelectAll<_>>(); + + let mut iter = stream.iter(); + assert_eq!(iter.len(), 3); + assert!(iter.next().is_some()); + assert_eq!(iter.len(), 2); + assert!(iter.next().is_some()); + assert_eq!(iter.len(), 1); + assert!(iter.next().is_some()); + assert_eq!(iter.len(), 0); + assert!(iter.next().is_none()); + + let mut stream = vec![stream::iter(vec![]), stream::iter(vec![1]), stream::iter(vec![2])] + .into_iter() + .collect::<SelectAll<_>>(); + + assert_eq!(stream.len(), 3); + assert_eq!(block_on(stream.next()), Some(1)); + assert_eq!(stream.len(), 2); + let mut iter = stream.iter(); + assert_eq!(iter.len(), 2); + assert!(iter.next().is_some()); + assert_eq!(iter.len(), 1); + assert!(iter.next().is_some()); + assert_eq!(iter.len(), 0); + assert!(iter.next().is_none()); + + assert_eq!(block_on(stream.next()), Some(2)); + assert_eq!(stream.len(), 2); + assert_eq!(block_on(stream.next()), None); + let mut iter = stream.iter(); + assert_eq!(iter.len(), 0); + assert!(iter.next().is_none()); +} + +#[test] +fn into_iter() { + let stream = vec![stream::pending::<()>(), stream::pending::<()>(), stream::pending::<()>()] + .into_iter() + .collect::<SelectAll<_>>(); + + let mut iter = stream.into_iter(); + assert_eq!(iter.len(), 3); + assert!(iter.next().is_some()); + assert_eq!(iter.len(), 2); + assert!(iter.next().is_some()); + assert_eq!(iter.len(), 1); + assert!(iter.next().is_some()); + assert_eq!(iter.len(), 0); + assert!(iter.next().is_none()); +} diff --git a/tests/stream_select_next_some.rs b/tests/stream_select_next_some.rs index bec5262..8252ad7 100644 --- a/tests/stream_select_next_some.rs +++ b/tests/stream_select_next_some.rs @@ -1,11 +1,13 @@ +use futures::executor::block_on; +use futures::future::{self, FusedFuture, FutureExt}; +use futures::select; +use futures::stream::{FuturesUnordered, StreamExt}; +use futures::task::{Context, Poll}; +use futures_test::future::FutureTestExt; +use futures_test::task::new_count_waker; + #[test] fn is_terminated() { - use futures::future; - use futures::future::{FusedFuture, FutureExt}; - use futures::stream::{FuturesUnordered, StreamExt}; - use futures::task::{Context, Poll}; - use futures_test::task::new_count_waker; - let (waker, counter) = new_count_waker(); let mut cx = Context::from_waker(&waker); @@ -30,15 +32,11 @@ fn is_terminated() { #[test] fn select() { - use futures::{future, select}; - use futures::stream::{FuturesUnordered, StreamExt}; - use futures_test::future::FutureTestExt; - // Checks that even though `async_tasks` will yield a `None` and return // `is_terminated() == true` during the first poll, it manages to toggle // back to having items after a future is pushed into it during the second // poll (after pending_once completes). - futures::executor::block_on(async { + block_on(async { let mut fut = future::ready(1).pending_once(); let mut async_tasks = FuturesUnordered::new(); let mut total = 0; @@ -61,17 +59,13 @@ fn select() { // Check that `select!` macro does not fail when importing from `futures_util`. #[test] fn futures_util_select() { - use futures::future; - use futures::stream::{FuturesUnordered, StreamExt}; - use futures_test::future::FutureTestExt; - use futures_util::select; // Checks that even though `async_tasks` will yield a `None` and return // `is_terminated() == true` during the first poll, it manages to toggle // back to having items after a future is pushed into it during the second // poll (after pending_once completes). - futures::executor::block_on(async { + block_on(async { let mut fut = future::ready(1).pending_once(); let mut async_tasks = FuturesUnordered::new(); let mut total = 0; diff --git a/tests/stream_split.rs b/tests/stream_split.rs new file mode 100644 index 0000000..694c151 --- /dev/null +++ b/tests/stream_split.rs @@ -0,0 +1,57 @@ +use futures::executor::block_on; +use futures::sink::{Sink, SinkExt}; +use futures::stream::{self, Stream, StreamExt}; +use futures::task::{Context, Poll}; +use pin_project::pin_project; +use std::pin::Pin; + +#[test] +fn test_split() { + #[pin_project] + struct Join<T, U> { + #[pin] + stream: T, + #[pin] + sink: U, + } + + impl<T: Stream, U> Stream for Join<T, U> { + type Item = T::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T::Item>> { + self.project().stream.poll_next(cx) + } + } + + impl<T, U: Sink<Item>, Item> Sink<Item> for Join<T, U> { + type Error = U::Error; + + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + self.project().sink.poll_ready(cx) + } + + fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> { + self.project().sink.start_send(item) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + self.project().sink.poll_flush(cx) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + self.project().sink.poll_close(cx) + } + } + + let mut dest: Vec<i32> = Vec::new(); + { + let join = Join { stream: stream::iter(vec![10, 20, 30]), sink: &mut dest }; + + let (sink, stream) = join.split(); + let join = sink.reunite(stream).expect("test_split: reunite error"); + let (mut sink, stream) = join.split(); + let mut stream = stream.map(Ok); + block_on(sink.send_all(&mut stream)).unwrap(); + } + assert_eq!(dest, vec![10, 20, 30]); +} diff --git a/tests/try_stream.rs b/tests/stream_try_stream.rs index 194e74d..194e74d 100644 --- a/tests/try_stream.rs +++ b/tests/stream_try_stream.rs diff --git a/tests/unfold.rs b/tests/stream_unfold.rs index 95722cf..16b1081 100644 --- a/tests/unfold.rs +++ b/tests/stream_unfold.rs @@ -1,10 +1,7 @@ use futures::future; use futures::stream; - use futures_test::future::FutureTestExt; -use futures_test::{ - assert_stream_done, assert_stream_next, assert_stream_pending, -}; +use futures_test::{assert_stream_done, assert_stream_next, assert_stream_pending}; #[test] fn unfold1() { diff --git a/tests/task_arc_wake.rs b/tests/task_arc_wake.rs new file mode 100644 index 0000000..aedc15b --- /dev/null +++ b/tests/task_arc_wake.rs @@ -0,0 +1,79 @@ +use futures::task::{self, ArcWake, Waker}; +use std::panic; +use std::sync::{Arc, Mutex}; + +struct CountingWaker { + nr_wake: Mutex<i32>, +} + +impl CountingWaker { + fn new() -> Self { + Self { nr_wake: Mutex::new(0) } + } + + fn wakes(&self) -> i32 { + *self.nr_wake.lock().unwrap() + } +} + +impl ArcWake for CountingWaker { + fn wake_by_ref(arc_self: &Arc<Self>) { + let mut lock = arc_self.nr_wake.lock().unwrap(); + *lock += 1; + } +} + +#[test] +fn create_from_arc() { + let some_w = Arc::new(CountingWaker::new()); + + let w1: Waker = task::waker(some_w.clone()); + assert_eq!(2, Arc::strong_count(&some_w)); + w1.wake_by_ref(); + assert_eq!(1, some_w.wakes()); + + let w2 = w1.clone(); + assert_eq!(3, Arc::strong_count(&some_w)); + + w2.wake_by_ref(); + assert_eq!(2, some_w.wakes()); + + drop(w2); + assert_eq!(2, Arc::strong_count(&some_w)); + drop(w1); + assert_eq!(1, Arc::strong_count(&some_w)); +} + +#[test] +fn ref_wake_same() { + let some_w = Arc::new(CountingWaker::new()); + + let w1: Waker = task::waker(some_w.clone()); + let w2 = task::waker_ref(&some_w); + let w3 = w2.clone(); + + assert!(w1.will_wake(&w2)); + assert!(w2.will_wake(&w3)); +} + +#[test] +fn proper_refcount_on_wake_panic() { + struct PanicWaker; + + impl ArcWake for PanicWaker { + fn wake_by_ref(_arc_self: &Arc<Self>) { + panic!("WAKE UP"); + } + } + + let some_w = Arc::new(PanicWaker); + + let w1: Waker = task::waker(some_w.clone()); + assert_eq!( + "WAKE UP", + *panic::catch_unwind(|| w1.wake_by_ref()).unwrap_err().downcast::<&str>().unwrap() + ); + assert_eq!(2, Arc::strong_count(&some_w)); // some_w + w1 + drop(w1); + assert_eq!(1, Arc::strong_count(&some_w)); // some_w +} diff --git a/tests/atomic_waker.rs b/tests/task_atomic_waker.rs index bf15d0f..cec3db2 100644 --- a/tests/atomic_waker.rs +++ b/tests/task_atomic_waker.rs @@ -1,14 +1,13 @@ +use futures::executor::block_on; +use futures::future::poll_fn; +use futures::task::{AtomicWaker, Poll}; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering; +use std::sync::Arc; +use std::thread; + #[test] fn basic() { - use std::sync::atomic::AtomicUsize; - use std::sync::atomic::Ordering; - use std::sync::Arc; - use std::thread; - - use futures::executor::block_on; - use futures::future::poll_fn; - use futures::task::{AtomicWaker, Poll}; - let atomic_waker = Arc::new(AtomicWaker::new()); let atomic_waker_copy = atomic_waker.clone(); diff --git a/tests/test_macro.rs b/tests/test_macro.rs new file mode 100644 index 0000000..6adf51d --- /dev/null +++ b/tests/test_macro.rs @@ -0,0 +1,20 @@ +#[futures_test::test] +async fn it_works() { + let fut = async { true }; + assert!(fut.await); + + let fut = async { false }; + assert!(!fut.await); +} + +#[should_panic] +#[futures_test::test] +async fn it_is_being_run() { + let fut = async { false }; + assert!(fut.await); +} + +#[futures_test::test] +async fn return_ty() -> Result<(), ()> { + Ok(()) +} diff --git a/tests/try_join.rs b/tests/try_join.rs index 6c6d084..8b0b38c 100644 --- a/tests/try_join.rs +++ b/tests/try_join.rs @@ -1,6 +1,6 @@ #![deny(unreachable_code)] -use futures::{try_join, executor::block_on}; +use futures::{executor::block_on, try_join}; // TODO: This abuses https://github.com/rust-lang/rust/issues/58733 in order to // test behaviour of the `try_join!` macro with the never type before it is @@ -14,7 +14,6 @@ impl<T> MyTrait for fn() -> T { } type Never = <fn() -> ! as MyTrait>::Output; - #[test] fn try_join_never_error() { block_on(async { diff --git a/tests_disabled/all.rs b/tests_disabled/all.rs index 6c7e11c..a7a5710 100644 --- a/tests_disabled/all.rs +++ b/tests_disabled/all.rs @@ -1,27 +1,27 @@ -use futures::future; -use futures::executor::block_on; use futures::channel::oneshot::{self, Canceled}; +use futures::executor::block_on; +use futures::future; use std::sync::mpsc::{channel, TryRecvError}; -mod support; -use support::*; +// mod support; +// use support::*; fn unselect<T, E, A, B>(r: Result<Either<(T, B), (T, A)>, Either<(E, B), (E, A)>>) -> Result<T, E> { match r { - Ok(Either::Left((t, _))) | - Ok(Either::Right((t, _))) => Ok(t), - Err(Either::Left((e, _))) | - Err(Either::Right((e, _))) => Err(e), + Ok(Either::Left((t, _))) | Ok(Either::Right((t, _))) => Ok(t), + Err(Either::Left((e, _))) | Err(Either::Right((e, _))) => Err(e), } } #[test] fn result_smoke() { fn is_future_v<A, B, C>(_: C) - where A: Send + 'static, - B: Send + 'static, - C: Future<Item=A, Error=B> - {} + where + A: Send + 'static, + B: Send + 'static, + C: Future<Item = A, Error = B>, + { + } is_future_v::<i32, u32, _>(f_ok(1).map(|a| a + 1)); is_future_v::<i32, u32, _>(f_ok(1).map_err(|a| a + 1)); @@ -64,7 +64,9 @@ fn result_smoke() { #[test] fn test_empty() { - fn empty() -> Empty<i32, u32> { future::empty() } + fn empty() -> Empty<i32, u32> { + future::empty() + } assert_empty(|| empty()); assert_empty(|| empty().select(empty())); @@ -105,16 +107,22 @@ fn flatten() { #[test] fn smoke_oneshot() { - assert_done(|| { - let (c, p) = oneshot::channel(); - c.send(1).unwrap(); - p - }, Ok(1)); - assert_done(|| { - let (c, p) = oneshot::channel::<i32>(); - drop(c); - p - }, Err(Canceled)); + assert_done( + || { + let (c, p) = oneshot::channel(); + c.send(1).unwrap(); + p + }, + Ok(1), + ); + assert_done( + || { + let (c, p) = oneshot::channel::<i32>(); + drop(c); + p + }, + Err(Canceled), + ); let mut completes = Vec::new(); assert_empty(|| { let (a, b) = oneshot::channel::<i32>(); @@ -129,9 +137,7 @@ fn smoke_oneshot() { let (c, p) = oneshot::channel::<i32>(); drop(c); let (tx, rx) = channel(); - p.then(move |_| { - tx.send(()) - }).forget(); + p.then(move |_| tx.send(())).forget(); rx.recv().unwrap(); } @@ -139,8 +145,14 @@ fn smoke_oneshot() { fn select_cancels() { let ((a, b), (c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>()); let ((btx, brx), (dtx, drx)) = (channel(), channel()); - let b = b.map(move |b| { btx.send(b).unwrap(); b }); - let d = d.map(move |d| { dtx.send(d).unwrap(); d }); + let b = b.map(move |b| { + btx.send(b).unwrap(); + b + }); + let d = d.map(move |d| { + dtx.send(d).unwrap(); + d + }); let mut f = b.select(d).then(unselect); // assert!(f.poll(&mut Task::new()).is_pending()); @@ -156,8 +168,14 @@ fn select_cancels() { let ((a, b), (c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>()); let ((btx, _brx), (dtx, drx)) = (channel(), channel()); - let b = b.map(move |b| { btx.send(b).unwrap(); b }); - let d = d.map(move |d| { dtx.send(d).unwrap(); d }); + let b = b.map(move |b| { + btx.send(b).unwrap(); + b + }); + let d = d.map(move |d| { + dtx.send(d).unwrap(); + d + }); let mut f = b.select(d).then(unselect); assert!(f.poll(lw).ok().unwrap().is_pending()); @@ -173,8 +191,14 @@ fn select_cancels() { fn join_cancels() { let ((a, b), (c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>()); let ((btx, _brx), (dtx, drx)) = (channel(), channel()); - let b = b.map(move |b| { btx.send(b).unwrap(); b }); - let d = d.map(move |d| { dtx.send(d).unwrap(); d }); + let b = b.map(move |b| { + btx.send(b).unwrap(); + b + }); + let d = d.map(move |d| { + dtx.send(d).unwrap(); + d + }); let mut f = b.join(d); drop(a); @@ -185,8 +209,14 @@ fn join_cancels() { let ((a, b), (c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>()); let ((btx, _brx), (dtx, drx)) = (channel(), channel()); - let b = b.map(move |b| { btx.send(b).unwrap(); b }); - let d = d.map(move |d| { dtx.send(d).unwrap(); d }); + let b = b.map(move |b| { + btx.send(b).unwrap(); + b + }); + let d = d.map(move |d| { + dtx.send(d).unwrap(); + d + }); let (tx, rx) = channel(); let f = b.join(d); @@ -194,7 +224,8 @@ fn join_cancels() { tx.send(()).unwrap(); let res: Result<(), ()> = Ok(()); res - }).forget(); + }) + .forget(); assert!(rx.try_recv().is_err()); drop(a); rx.recv().unwrap(); @@ -243,7 +274,6 @@ fn join_incomplete() { }) } - #[test] fn select2() { assert_done(|| f_ok(2).select(empty()).then(unselect), Ok(2)); @@ -251,14 +281,15 @@ fn select2() { assert_done(|| f_err(2).select(empty()).then(unselect), Err(2)); assert_done(|| empty().select(f_err(2)).then(unselect), Err(2)); - assert_done(|| { - f_ok(1).select(f_ok(2)) - .map_err(|_| 0) - .and_then(|either_tup| { - let (a, b) = either_tup.into_inner(); - b.map(move |b| a + b) - }) - }, Ok(3)); + assert_done( + || { + f_ok(1).select(f_ok(2)).map_err(|_| 0).and_then(|either_tup| { + let (a, b) = either_tup.into_inner(); + b.map(move |b| a + b) + }) + }, + Ok(3), + ); // Finish one half of a select and then fail the second, ensuring that we // get the notification of the second one. @@ -297,8 +328,14 @@ fn select2() { { let ((_a, b), (_c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>()); let ((btx, brx), (dtx, drx)) = (channel(), channel()); - let b = b.map(move |v| { btx.send(v).unwrap(); v }); - let d = d.map(move |v| { dtx.send(v).unwrap(); v }); + let b = b.map(move |v| { + btx.send(v).unwrap(); + v + }); + let d = d.map(move |v| { + dtx.send(v).unwrap(); + v + }); let f = b.select(d); drop(f); assert!(drx.recv().is_err()); @@ -309,8 +346,14 @@ fn select2() { { let ((_a, b), (_c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>()); let ((btx, brx), (dtx, drx)) = (channel(), channel()); - let b = b.map(move |v| { btx.send(v).unwrap(); v }); - let d = d.map(move |v| { dtx.send(v).unwrap(); v }); + let b = b.map(move |v| { + btx.send(v).unwrap(); + v + }); + let d = d.map(move |v| { + dtx.send(v).unwrap(); + v + }); let mut f = b.select(d); let _res = noop_waker_lw(|lw| f.poll(lw)); drop(f); @@ -322,8 +365,14 @@ fn select2() { { let ((a, b), (_c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>()); let ((btx, brx), (dtx, drx)) = (channel(), channel()); - let b = b.map(move |v| { btx.send(v).unwrap(); v }); - let d = d.map(move |v| { dtx.send(v).unwrap(); v }); + let b = b.map(move |v| { + btx.send(v).unwrap(); + v + }); + let d = d.map(move |v| { + dtx.send(v).unwrap(); + v + }); let (tx, rx) = channel(); b.select(d).map(move |_| tx.send(()).unwrap()).forget(); drop(a); diff --git a/tests_disabled/bilock.rs b/tests_disabled/bilock.rs index c1bc33f..0166ca4 100644 --- a/tests_disabled/bilock.rs +++ b/tests_disabled/bilock.rs @@ -1,11 +1,11 @@ -use futures::task; -use futures::stream; use futures::future; +use futures::stream; +use futures::task; use futures_util::lock::BiLock; use std::thread; -mod support; -use support::*; +// mod support; +// use support::*; #[test] fn smoke() { @@ -41,9 +41,9 @@ fn smoke() { }); assert!(task::spawn(future) - .poll_future_notify(¬ify_noop(), 0) - .expect("failure in poll") - .is_ready()); + .poll_future_notify(¬ify_noop(), 0) + .expect("failure in poll") + .is_ready()); } #[test] @@ -51,10 +51,7 @@ fn concurrent() { const N: usize = 10000; let (a, b) = BiLock::new(0); - let a = Increment { - a: Some(a), - remaining: N, - }; + let a = Increment { a: Some(a), remaining: N }; let b = stream::iter_ok(0..N).fold(b, |b, _n| { b.lock().map(|mut b| { *b += 1; @@ -89,7 +86,7 @@ fn concurrent() { fn poll(&mut self) -> Poll<BiLock<usize>, ()> { loop { if self.remaining == 0 { - return Ok(self.a.take().unwrap().into()) + return Ok(self.a.take().unwrap().into()); } let a = self.a.as_ref().unwrap(); diff --git a/tests_disabled/stream.rs b/tests_disabled/stream.rs index ef0676d..854dbad 100644 --- a/tests_disabled/stream.rs +++ b/tests_disabled/stream.rs @@ -1,26 +1,26 @@ +use futures::channel::mpsc; +use futures::channel::oneshot; use futures::executor::{block_on, block_on_stream}; use futures::future::{err, ok}; use futures::stream::{empty, iter_ok, poll_fn, Peekable}; -use futures::channel::oneshot; -use futures::channel::mpsc; -mod support; -use support::*; +// mod support; +// use support::*; pub struct Iter<I> { iter: I, } pub fn iter<J, T, E>(i: J) -> Iter<J::IntoIter> - where J: IntoIterator<Item=Result<T, E>>, +where + J: IntoIterator<Item = Result<T, E>>, { - Iter { - iter: i.into_iter(), - } + Iter { iter: i.into_iter() } } impl<I, T, E> Stream for Iter<I> - where I: Iterator<Item=Result<T, E>>, +where + I: Iterator<Item = Result<T, E>>, { type Item = T; type Error = E; @@ -34,21 +34,15 @@ impl<I, T, E> Stream for Iter<I> } } -fn list() -> Box<Stream<Item=i32, Error=u32> + Send> { +fn list() -> Box<Stream<Item = i32, Error = u32> + Send> { let (tx, rx) = mpsc::channel(1); - tx.send(Ok(1)) - .and_then(|tx| tx.send(Ok(2))) - .and_then(|tx| tx.send(Ok(3))) - .forget(); + tx.send(Ok(1)).and_then(|tx| tx.send(Ok(2))).and_then(|tx| tx.send(Ok(3))).forget(); Box::new(rx.then(|r| r.unwrap())) } -fn err_list() -> Box<Stream<Item=i32, Error=u32> + Send> { +fn err_list() -> Box<Stream<Item = i32, Error = u32> + Send> { let (tx, rx) = mpsc::channel(1); - tx.send(Ok(1)) - .and_then(|tx| tx.send(Ok(2))) - .and_then(|tx| tx.send(Err(3))) - .forget(); + tx.send(Ok(1)).and_then(|tx| tx.send(Ok(2))).and_then(|tx| tx.send(Err(3))).forget(); Box::new(rx.then(|r| r.unwrap())) } @@ -89,40 +83,31 @@ fn filter() { #[test] fn filter_map() { - assert_done(|| list().filter_map(|x| { - ok(if x % 2 == 0 { - Some(x + 10) - } else { - None - }) - }).collect(), Ok(vec![12])); + assert_done( + || list().filter_map(|x| ok(if x % 2 == 0 { Some(x + 10) } else { None })).collect(), + Ok(vec![12]), + ); } #[test] fn and_then() { assert_done(|| list().and_then(|a| Ok(a + 1)).collect(), Ok(vec![2, 3, 4])); - assert_done(|| list().and_then(|a| err::<i32, u32>(a as u32)).collect::<Vec<_>>(), - Err(1)); + assert_done(|| list().and_then(|a| err::<i32, u32>(a as u32)).collect::<Vec<_>>(), Err(1)); } #[test] fn then() { assert_done(|| list().then(|a| a.map(|e| e + 1)).collect(), Ok(vec![2, 3, 4])); - } #[test] fn or_else() { - assert_done(|| err_list().or_else(|a| { - ok::<i32, u32>(a as i32) - }).collect(), Ok(vec![1, 2, 3])); + assert_done(|| err_list().or_else(|a| ok::<i32, u32>(a as i32)).collect(), Ok(vec![1, 2, 3])); } #[test] fn flatten() { - assert_done(|| list().map(|_| list()).flatten().collect(), - Ok(vec![1, 2, 3, 1, 2, 3, 1, 2, 3])); - + assert_done(|| list().map(|_| list()).flatten().collect(), Ok(vec![1, 2, 3, 1, 2, 3, 1, 2, 3])); } #[test] @@ -132,9 +117,7 @@ fn skip() { #[test] fn skip_passes_errors_through() { - let mut s = block_on_stream( - iter(vec![Err(1), Err(2), Ok(3), Ok(4), Ok(5)]).skip(1) - ); + let mut s = block_on_stream(iter(vec![Err(1), Err(2), Ok(3), Ok(4), Ok(5)]).skip(1)); assert_eq!(s.next(), Some(Err(1))); assert_eq!(s.next(), Some(Err(2))); assert_eq!(s.next(), Some(Ok(4))); @@ -144,8 +127,7 @@ fn skip_passes_errors_through() { #[test] fn skip_while() { - assert_done(|| list().skip_while(|e| Ok(*e % 2 == 1)).collect(), - Ok(vec![2, 3])); + assert_done(|| list().skip_while(|e| Ok(*e % 2 == 1)).collect(), Ok(vec![2, 3])); } #[test] fn take() { @@ -154,8 +136,7 @@ fn take() { #[test] fn take_while() { - assert_done(|| list().take_while(|e| Ok(*e < 3)).collect(), - Ok(vec![1, 2])); + assert_done(|| list().take_while(|e| Ok(*e < 3)).collect(), Ok(vec![1, 2])); } #[test] @@ -193,9 +174,9 @@ fn buffered() { let (a, b) = oneshot::channel::<u32>(); let (c, d) = oneshot::channel::<u32>(); - tx.send(Box::new(b.recover(|_| panic!())) as Box<Future<Item=_, Error=_> + Send>) - .and_then(|tx| tx.send(Box::new(d.map_err(|_| panic!())))) - .forget(); + tx.send(Box::new(b.recover(|_| panic!())) as Box<Future<Item = _, Error = _> + Send>) + .and_then(|tx| tx.send(Box::new(d.map_err(|_| panic!())))) + .forget(); let mut rx = rx.buffered(2); sassert_empty(&mut rx); @@ -211,9 +192,9 @@ fn buffered() { let (a, b) = oneshot::channel::<u32>(); let (c, d) = oneshot::channel::<u32>(); - tx.send(Box::new(b.recover(|_| panic!())) as Box<Future<Item=_, Error=_> + Send>) - .and_then(|tx| tx.send(Box::new(d.map_err(|_| panic!())))) - .forget(); + tx.send(Box::new(b.recover(|_| panic!())) as Box<Future<Item = _, Error = _> + Send>) + .and_then(|tx| tx.send(Box::new(d.map_err(|_| panic!())))) + .forget(); let mut rx = rx.buffered(1); sassert_empty(&mut rx); @@ -233,8 +214,8 @@ fn unordered() { let (c, d) = oneshot::channel::<u32>(); tx.send(Box::new(b.recover(|_| panic!())) as Box<Future<Item = _, Error = _> + Send>) - .and_then(|tx| tx.send(Box::new(d.recover(|_| panic!())))) - .forget(); + .and_then(|tx| tx.send(Box::new(d.recover(|_| panic!())))) + .forget(); let mut rx = rx.buffer_unordered(2); sassert_empty(&mut rx); @@ -250,8 +231,8 @@ fn unordered() { let (c, d) = oneshot::channel::<u32>(); tx.send(Box::new(b.recover(|_| panic!())) as Box<Future<Item = _, Error = _> + Send>) - .and_then(|tx| tx.send(Box::new(d.recover(|_| panic!())))) - .forget(); + .and_then(|tx| tx.send(Box::new(d.recover(|_| panic!())))) + .forget(); // We don't even get to see `c` until `a` completes. let mut rx = rx.buffer_unordered(1); @@ -267,21 +248,17 @@ fn unordered() { #[test] fn zip() { - assert_done(|| list().zip(list()).collect(), - Ok(vec![(1, 1), (2, 2), (3, 3)])); - assert_done(|| list().zip(list().take(2)).collect(), - Ok(vec![(1, 1), (2, 2)])); - assert_done(|| list().take(2).zip(list()).collect(), - Ok(vec![(1, 1), (2, 2)])); + assert_done(|| list().zip(list()).collect(), Ok(vec![(1, 1), (2, 2), (3, 3)])); + assert_done(|| list().zip(list().take(2)).collect(), Ok(vec![(1, 1), (2, 2)])); + assert_done(|| list().take(2).zip(list()).collect(), Ok(vec![(1, 1), (2, 2)])); assert_done(|| err_list().zip(list()).collect::<Vec<_>>(), Err(3)); - assert_done(|| list().zip(list().map(|x| x + 1)).collect(), - Ok(vec![(1, 2), (2, 3), (3, 4)])); + assert_done(|| list().zip(list().map(|x| x + 1)).collect(), Ok(vec![(1, 2), (2, 3), (3, 4)])); } #[test] fn peek() { struct Peek { - inner: Peekable<Box<Stream<Item = i32, Error =u32> + Send>> + inner: Peekable<Box<Stream<Item = i32, Error = u32> + Send>>, } impl Future for Peek { @@ -299,15 +276,12 @@ fn peek() { } } - block_on(Peek { - inner: list().peekable(), - }).unwrap() + block_on(Peek { inner: list().peekable() }).unwrap() } #[test] fn wait() { - assert_eq!(block_on_stream(list()).collect::<Result<Vec<_>, _>>(), - Ok(vec![1, 2, 3])); + assert_eq!(block_on_stream(list()).collect::<Result<Vec<_>, _>>(), Ok(vec![1, 2, 3])); } #[test] @@ -337,8 +311,10 @@ fn forward() { let v = block_on(iter_ok::<_, Never>(vec![2, 3]).forward(v)).unwrap().1; assert_eq!(v, vec![0, 1, 2, 3]); - assert_done(move || iter_ok::<_, Never>(vec![4, 5]).forward(v).map(|(_, s)| s), - Ok(vec![0, 1, 2, 3, 4, 5])); + assert_done( + move || iter_ok::<_, Never>(vec![4, 5]).forward(v).map(|(_, s)| s), + Ok(vec![0, 1, 2, 3, 4, 5]), + ); } #[test] |