diff options
author | Android Build Coastguard Worker <android-build-coastguard-worker@google.com> | 2022-05-10 07:02:15 +0000 |
---|---|---|
committer | Android Build Coastguard Worker <android-build-coastguard-worker@google.com> | 2022-05-10 07:02:15 +0000 |
commit | 1de9c728aa47509b5675e6a8e79f076bb86236ec (patch) | |
tree | e2532a639f92776d56f172cf560f5ba2f62642bc | |
parent | 71ac4c321fa28139119502996c08650fb2167eb7 (diff) | |
parent | 6661c46def7a4e79589003500e402827370bd285 (diff) | |
download | futures-android13-mainline-extservices-release.tar.gz |
Snap for 8564071 from 6661c46def7a4e79589003500e402827370bd285 to mainline-extservices-releaseaml_ext_331814220aml_ext_331412000aml_ext_331312000aml_ext_331112010aml_ext_331012020android13-mainline-extservices-release
Change-Id: I85765edb256f7f15cc233a5107f3099b584d06cf
69 files changed, 2391 insertions, 2071 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json index f3ad3ab..e483977 100644 --- a/.cargo_vcs_info.json +++ b/.cargo_vcs_info.json @@ -1,5 +1,6 @@ { "git": { - "sha1": "c91f8691672c7401b1923ab00bf138975c99391a" - } -} + "sha1": "fc1e3250219170e31cddb8857a276cba7dd08d44" + }, + "path_in_vcs": "futures" +}
\ No newline at end of file @@ -41,6 +41,8 @@ rust_library { name: "libfutures", host_supported: true, crate_name: "futures", + cargo_env_compat: true, + cargo_pkg_version: "0.3.21", srcs: ["src/lib.rs"], edition: "2018", features: [ @@ -62,28 +64,9 @@ rust_library { ], apex_available: [ "//apex_available:platform", + "com.android.bluetooth", "com.android.resolv", "com.android.virt", ], min_sdk_version: "29", } - -// dependent_library ["feature_list"] -// futures-channel-0.3.14 "alloc,futures-sink,sink,std" -// futures-core-0.3.14 "alloc,std" -// futures-executor-0.3.14 "std" -// futures-io-0.3.14 "std" -// futures-macro-0.3.14 -// futures-sink-0.3.14 "alloc,std" -// futures-task-0.3.14 "alloc,std" -// futures-util-0.3.14 "alloc,async-await,async-await-macro,channel,futures-channel,futures-io,futures-macro,futures-sink,io,memchr,proc-macro-hack,proc-macro-nested,sink,slab,std" -// memchr-2.3.4 "default,std" -// pin-project-lite-0.2.6 -// pin-utils-0.1.0 -// proc-macro-hack-0.5.19 -// proc-macro-nested-0.1.7 -// proc-macro2-1.0.26 "default,proc-macro" -// quote-1.0.9 "default,proc-macro" -// slab-0.4.3 "default,std" -// syn-1.0.70 "clone-impls,default,derive,full,parsing,printing,proc-macro,quote,visit-mut" -// unicode-xid-0.2.1 "default" @@ -3,62 +3,80 @@ # When uploading crates to the registry Cargo will automatically # "normalize" Cargo.toml files for maximal compatibility # with all versions of Cargo and also rewrite `path` dependencies -# to registry (e.g., crates.io) dependencies +# to registry (e.g., crates.io) dependencies. # -# If you believe there's an error in this file please file an -# issue against the rust-lang/cargo repository. If you're -# editing this file be aware that the upstream Cargo.toml -# will likely look very different (and much more reasonable) +# If you are reading this file be aware that the original Cargo.toml +# will likely look very different (and much more reasonable). +# See Cargo.toml.orig for the original contents. [package] edition = "2018" +rust-version = "1.45" name = "futures" -version = "0.3.13" -authors = ["Alex Crichton <alex@alexcrichton.com>"] -description = "An implementation of futures and streams featuring zero allocations,\ncomposability, and iterator-like interfaces.\n" +version = "0.3.21" +description = """ +An implementation of futures and streams featuring zero allocations, +composability, and iterator-like interfaces. +""" homepage = "https://rust-lang.github.io/futures-rs" -documentation = "https://docs.rs/futures/0.3" readme = "../README.md" -keywords = ["futures", "async", "future"] +keywords = [ + "futures", + "async", + "future", +] categories = ["asynchronous"] license = "MIT OR Apache-2.0" repository = "https://github.com/rust-lang/futures-rs" + [package.metadata.docs.rs] all-features = true -rustdoc-args = ["--cfg", "docsrs"] +rustdoc-args = [ + "--cfg", + "docsrs", +] [package.metadata.playground] -features = ["std", "async-await", "compat", "io-compat", "executor", "thread-pool"] +features = [ + "std", + "async-await", + "compat", + "io-compat", + "executor", + "thread-pool", +] + [dependencies.futures-channel] -version = "0.3.13" +version = "0.3.21" features = ["sink"] default-features = false [dependencies.futures-core] -version = "0.3.13" +version = "0.3.21" default-features = false [dependencies.futures-executor] -version = "0.3.13" +version = "0.3.21" optional = true default-features = false [dependencies.futures-io] -version = "0.3.13" +version = "0.3.21" default-features = false [dependencies.futures-sink] -version = "0.3.13" +version = "0.3.21" default-features = false [dependencies.futures-task] -version = "0.3.13" +version = "0.3.21" default-features = false [dependencies.futures-util] -version = "0.3.13" +version = "0.3.21" features = ["sink"] default-features = false + [dev-dependencies.assert_matches] version = "1.3.0" @@ -75,16 +93,55 @@ version = "1" version = "0.1.11" [features] -alloc = ["futures-core/alloc", "futures-task/alloc", "futures-sink/alloc", "futures-channel/alloc", "futures-util/alloc"] -async-await = ["futures-util/async-await", "futures-util/async-await-macro"] +alloc = [ + "futures-core/alloc", + "futures-task/alloc", + "futures-sink/alloc", + "futures-channel/alloc", + "futures-util/alloc", +] +async-await = [ + "futures-util/async-await", + "futures-util/async-await-macro", +] bilock = ["futures-util/bilock"] -cfg-target-has-atomic = ["futures-core/cfg-target-has-atomic", "futures-task/cfg-target-has-atomic", "futures-channel/cfg-target-has-atomic", "futures-util/cfg-target-has-atomic"] -compat = ["std", "futures-util/compat"] -default = ["std", "async-await", "executor"] -executor = ["std", "futures-executor/std"] -io-compat = ["compat", "futures-util/io-compat"] -read-initializer = ["futures-io/read-initializer", "futures-util/read-initializer"] -std = ["alloc", "futures-core/std", "futures-task/std", "futures-io/std", "futures-sink/std", "futures-util/std", "futures-util/io", "futures-util/channel"] -thread-pool = ["executor", "futures-executor/thread-pool"] -unstable = ["futures-core/unstable", "futures-task/unstable", "futures-channel/unstable", "futures-io/unstable", "futures-util/unstable"] +cfg-target-has-atomic = [] +compat = [ + "std", + "futures-util/compat", +] +default = [ + "std", + "async-await", + "executor", +] +executor = [ + "std", + "futures-executor/std", +] +io-compat = [ + "compat", + "futures-util/io-compat", +] +std = [ + "alloc", + "futures-core/std", + "futures-task/std", + "futures-io/std", + "futures-sink/std", + "futures-util/std", + "futures-util/io", + "futures-util/channel", +] +thread-pool = [ + "executor", + "futures-executor/thread-pool", +] +unstable = [ + "futures-core/unstable", + "futures-task/unstable", + "futures-channel/unstable", + "futures-io/unstable", + "futures-util/unstable", +] write-all-vectored = ["futures-util/write-all-vectored"] diff --git a/Cargo.toml.orig b/Cargo.toml.orig index d046c54..6871f47 100644 --- a/Cargo.toml.orig +++ b/Cargo.toml.orig @@ -1,14 +1,13 @@ [package] name = "futures" +version = "0.3.21" edition = "2018" -version = "0.3.13" -authors = ["Alex Crichton <alex@alexcrichton.com>"] +rust-version = "1.45" license = "MIT OR Apache-2.0" readme = "../README.md" keywords = ["futures", "async", "future"] repository = "https://github.com/rust-lang/futures-rs" homepage = "https://rust-lang.github.io/futures-rs" -documentation = "https://docs.rs/futures/0.3" description = """ An implementation of futures and streams featuring zero allocations, composability, and iterator-like interfaces. @@ -16,13 +15,13 @@ composability, and iterator-like interfaces. categories = ["asynchronous"] [dependencies] -futures-core = { path = "../futures-core", version = "0.3.13", default-features = false } -futures-task = { path = "../futures-task", version = "0.3.13", default-features = false } -futures-channel = { path = "../futures-channel", version = "0.3.13", default-features = false, features = ["sink"] } -futures-executor = { path = "../futures-executor", version = "0.3.13", default-features = false, optional = true } -futures-io = { path = "../futures-io", version = "0.3.13", default-features = false } -futures-sink = { path = "../futures-sink", version = "0.3.13", default-features = false } -futures-util = { path = "../futures-util", version = "0.3.13", default-features = false, features = ["sink"] } +futures-core = { path = "../futures-core", version = "0.3.21", default-features = false } +futures-task = { path = "../futures-task", version = "0.3.21", default-features = false } +futures-channel = { path = "../futures-channel", version = "0.3.21", default-features = false, features = ["sink"] } +futures-executor = { path = "../futures-executor", version = "0.3.21", default-features = false, optional = true } +futures-io = { path = "../futures-io", version = "0.3.21", default-features = false } +futures-sink = { path = "../futures-sink", version = "0.3.21", default-features = false } +futures-util = { path = "../futures-util", version = "0.3.21", default-features = false, features = ["sink"] } [dev-dependencies] futures-executor = { path = "../futures-executor", features = ["thread-pool"] } @@ -47,11 +46,13 @@ thread-pool = ["executor", "futures-executor/thread-pool"] # These features are outside of the normal semver guarantees and require the # `unstable` feature as an explicit opt-in to unstable API. unstable = ["futures-core/unstable", "futures-task/unstable", "futures-channel/unstable", "futures-io/unstable", "futures-util/unstable"] -cfg-target-has-atomic = ["futures-core/cfg-target-has-atomic", "futures-task/cfg-target-has-atomic", "futures-channel/cfg-target-has-atomic", "futures-util/cfg-target-has-atomic"] bilock = ["futures-util/bilock"] -read-initializer = ["futures-io/read-initializer", "futures-util/read-initializer"] write-all-vectored = ["futures-util/write-all-vectored"] +# These features are no longer used. +# TODO: remove in the next major version. +cfg-target-has-atomic = [] + [package.metadata.docs.rs] all-features = true rustdoc-args = ["--cfg", "docsrs"] @@ -7,13 +7,13 @@ third_party { } url { type: ARCHIVE - value: "https://static.crates.io/crates/futures/futures-0.3.13.crate" + value: "https://static.crates.io/crates/futures/futures-0.3.21.crate" } - version: "0.3.13" + version: "0.3.21" license_type: NOTICE last_upgrade_date { - year: 2021 - month: 4 + year: 2022 + month: 3 day: 1 } } diff --git a/TEST_MAPPING b/TEST_MAPPING index ec169ff..e2df61d 100644 --- a/TEST_MAPPING +++ b/TEST_MAPPING @@ -1,41 +1,39 @@ // Generated by update_crate_tests.py for tests that depend on this crate. { - "presubmit": [ - { - "name": "anyhow_device_test_tests_test_ffi" - }, - { - "name": "anyhow_device_test_tests_test_context" - }, + "imports": [ { - "name": "anyhow_device_test_tests_test_repr" + "path": "external/rust/crates/anyhow" }, { - "name": "anyhow_device_test_tests_test_convert" - }, + "path": "external/rust/crates/tokio" + } + ], + "presubmit": [ { - "name": "anyhow_device_test_tests_test_fmt" + "name": "ZipFuseTest" }, { - "name": "anyhow_device_test_tests_test_boxed" + "name": "authfs_device_test_src_lib" }, { - "name": "anyhow_device_test_tests_test_downcast" + "name": "doh_unit_test" }, { - "name": "anyhow_device_test_tests_test_source" - }, + "name": "virtualizationservice_device_test" + } + ], + "presubmit-rust": [ { - "name": "anyhow_device_test_tests_test_macros" + "name": "ZipFuseTest" }, { - "name": "anyhow_device_test_src_lib" + "name": "authfs_device_test_src_lib" }, { - "name": "anyhow_device_test_tests_test_autotrait" + "name": "doh_unit_test" }, { - "name": "anyhow_device_test_tests_test_chain" + "name": "virtualizationservice_device_test" } ] } diff --git a/cargo2android.json b/cargo2android.json index 01465d0..a7e2a4b 100644 --- a/cargo2android.json +++ b/cargo2android.json @@ -1,11 +1,12 @@ { "apex-available": [ "//apex_available:platform", + "com.android.bluetooth", "com.android.resolv", "com.android.virt" ], - "min_sdk_version": "29", "dependencies": true, "device": true, + "min-sdk-version": "29", "run": true -}
\ No newline at end of file +} @@ -3,12 +3,12 @@ //! This crate provides a number of core abstractions for writing asynchronous //! code: //! -//! - [Futures](crate::future::Future) are single eventual values produced by +//! - [Futures](crate::future) are single eventual values produced by //! asynchronous computations. Some programming languages (e.g. JavaScript) //! call this concept "promise". -//! - [Streams](crate::stream::Stream) represent a series of values +//! - [Streams](crate::stream) represent a series of values //! produced asynchronously. -//! - [Sinks](crate::sink::Sink) provide support for asynchronous writing of +//! - [Sinks](crate::sink) provide support for asynchronous writing of //! data. //! - [Executors](crate::executor) are responsible for running asynchronous //! tasks. @@ -25,11 +25,12 @@ //! within macros and keywords such as async and await!. //! //! ```rust +//! # if cfg!(miri) { return; } // https://github.com/rust-lang/miri/issues/1038 //! # use futures::channel::mpsc; //! # use futures::executor; ///standard executors to provide a context for futures and streams //! # use futures::executor::ThreadPool; //! # use futures::StreamExt; -//! +//! # //! fn main() { //! let pool = ThreadPool::new().expect("Failed to build pool"); //! let (tx, rx) = mpsc::unbounded::<i32>(); @@ -67,7 +68,7 @@ //! }; //! //! // Actually execute the above future, which will invoke Future::poll and -//! // subsequenty chain appropriate Future::poll and methods needing executors +//! // subsequently chain appropriate Future::poll and methods needing executors //! // to drive all futures. Eventually fut_values will be driven to completion. //! let values: Vec<i32> = executor::block_on(fut_values); //! @@ -78,48 +79,46 @@ //! The majority of examples and code snippets in this crate assume that they are //! inside an async block as written above. -#![cfg_attr(feature = "cfg-target-has-atomic", feature(cfg_target_has_atomic))] -#![cfg_attr(feature = "read-initializer", feature(read_initializer))] - #![cfg_attr(not(feature = "std"), no_std)] - -#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms, unreachable_pub)] -// It cannot be included in the published code because this lints have false positives in the minimum required version. -#![cfg_attr(test, warn(single_use_lifetimes))] -#![warn(clippy::all)] -#![doc(test(attr(deny(warnings), allow(dead_code, unused_assignments, unused_variables))))] - +#![warn( + missing_debug_implementations, + missing_docs, + rust_2018_idioms, + single_use_lifetimes, + unreachable_pub +)] +#![doc(test( + no_crate_inject, + attr( + deny(warnings, rust_2018_idioms, single_use_lifetimes), + allow(dead_code, unused_assignments, unused_variables) + ) +))] #![cfg_attr(docsrs, feature(doc_cfg))] -#[cfg(all(feature = "cfg-target-has-atomic", not(feature = "unstable")))] -compile_error!("The `cfg-target-has-atomic` feature requires the `unstable` feature as an explicit opt-in to unstable features"); - #[cfg(all(feature = "bilock", not(feature = "unstable")))] compile_error!("The `bilock` feature requires the `unstable` feature as an explicit opt-in to unstable features"); -#[cfg(all(feature = "read-initializer", not(feature = "unstable")))] -compile_error!("The `read-initializer` feature requires the `unstable` feature as an explicit opt-in to unstable features"); - -#[doc(hidden)] +#[doc(no_inline)] pub use futures_core::future::{Future, TryFuture}; -#[doc(hidden)] +#[doc(no_inline)] pub use futures_util::future::{FutureExt, TryFutureExt}; -#[doc(hidden)] +#[doc(no_inline)] pub use futures_core::stream::{Stream, TryStream}; -#[doc(hidden)] +#[doc(no_inline)] pub use futures_util::stream::{StreamExt, TryStreamExt}; -#[doc(hidden)] +#[doc(no_inline)] pub use futures_sink::Sink; -#[doc(hidden)] +#[doc(no_inline)] pub use futures_util::sink::SinkExt; #[cfg(feature = "std")] -#[doc(hidden)] +#[doc(no_inline)] pub use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite}; #[cfg(feature = "std")] -#[doc(hidden)] +#[doc(no_inline)] pub use futures_util::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt}; // Macro reexports @@ -135,6 +134,10 @@ pub use futures_util::{join, pending, poll, select_biased, try_join}; // Async-a #[doc(inline)] pub use futures_util::{future, never, sink, stream, task}; +#[cfg(feature = "std")] +#[cfg(feature = "async-await")] +pub use futures_util::stream_select; + #[cfg(feature = "alloc")] #[doc(inline)] pub use futures_channel as channel; @@ -148,13 +151,73 @@ pub use futures_util::io; #[cfg(feature = "executor")] #[cfg_attr(docsrs, doc(cfg(feature = "executor")))] -#[doc(inline)] -pub use futures_executor as executor; +pub mod executor { + //! Built-in executors and related tools. + //! + //! All asynchronous computation occurs within an executor, which is + //! capable of spawning futures as tasks. This module provides several + //! built-in executors, as well as tools for building your own. + //! + //! + //! This module is only available when the `executor` feature of this + //! library is activated. + //! + //! # Using a thread pool (M:N task scheduling) + //! + //! Most of the time tasks should be executed on a [thread pool](ThreadPool). + //! A small set of worker threads can handle a very large set of spawned tasks + //! (which are much lighter weight than threads). Tasks spawned onto the pool + //! with the [`spawn_ok`](ThreadPool::spawn_ok) function will run ambiently on + //! the created threads. + //! + //! # Spawning additional tasks + //! + //! Tasks can be spawned onto a spawner by calling its [`spawn_obj`] method + //! directly. In the case of `!Send` futures, [`spawn_local_obj`] can be used + //! instead. + //! + //! # Single-threaded execution + //! + //! In addition to thread pools, it's possible to run a task (and the tasks + //! it spawns) entirely within a single thread via the [`LocalPool`] executor. + //! Aside from cutting down on synchronization costs, this executor also makes + //! it possible to spawn non-`Send` tasks, via [`spawn_local_obj`]. The + //! [`LocalPool`] is best suited for running I/O-bound tasks that do relatively + //! little work between I/O operations. + //! + //! There is also a convenience function [`block_on`] for simply running a + //! future to completion on the current thread. + //! + //! [`spawn_obj`]: https://docs.rs/futures/0.3/futures/task/trait.Spawn.html#tymethod.spawn_obj + //! [`spawn_local_obj`]: https://docs.rs/futures/0.3/futures/task/trait.LocalSpawn.html#tymethod.spawn_local_obj + + pub use futures_executor::{ + block_on, block_on_stream, enter, BlockingStream, Enter, EnterError, LocalPool, + LocalSpawner, + }; + + #[cfg(feature = "thread-pool")] + #[cfg_attr(docsrs, doc(cfg(feature = "thread-pool")))] + pub use futures_executor::{ThreadPool, ThreadPoolBuilder}; +} #[cfg(feature = "compat")] #[cfg_attr(docsrs, doc(cfg(feature = "compat")))] -#[doc(inline)] -pub use futures_util::compat; +pub mod compat { + //! Interop between `futures` 0.1 and 0.3. + //! + //! This module is only available when the `compat` feature of this + //! library is activated. + + pub use futures_util::compat::{ + Compat, Compat01As03, Compat01As03Sink, CompatSink, Executor01As03, Executor01CompatExt, + Executor01Future, Future01CompatExt, Sink01CompatExt, Stream01CompatExt, + }; + + #[cfg(feature = "io-compat")] + #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))] + pub use futures_util::compat::{AsyncRead01CompatExt, AsyncWrite01CompatExt}; +} pub mod prelude { //! A "prelude" for crates using the `futures` crate. @@ -175,10 +238,12 @@ pub mod prelude { pub use crate::stream::{self, Stream, TryStream}; #[doc(no_inline)] + #[allow(unreachable_pub)] pub use crate::future::{FutureExt as _, TryFutureExt as _}; #[doc(no_inline)] pub use crate::sink::SinkExt as _; #[doc(no_inline)] + #[allow(unreachable_pub)] pub use crate::stream::{StreamExt as _, TryStreamExt as _}; #[cfg(feature = "std")] @@ -186,6 +251,7 @@ pub mod prelude { #[cfg(feature = "std")] #[doc(no_inline)] + #[allow(unreachable_pub)] pub use crate::io::{ AsyncBufReadExt as _, AsyncReadExt as _, AsyncSeekExt as _, AsyncWriteExt as _, }; diff --git a/tests/_require_features.rs b/tests/_require_features.rs index da76dcd..8046cc9 100644 --- a/tests/_require_features.rs +++ b/tests/_require_features.rs @@ -1,8 +1,13 @@ #[cfg(not(all( - feature = "std", feature = "alloc", feature = "async-await", - feature = "compat", feature = "io-compat", - feature = "executor", feature = "thread-pool", + feature = "std", + feature = "alloc", + feature = "async-await", + feature = "compat", + feature = "io-compat", + feature = "executor", + feature = "thread-pool", )))] -compile_error!("`futures` tests must have all stable features activated: \ +compile_error!( + "`futures` tests must have all stable features activated: \ use `--all-features` or `--features default,thread-pool,io-compat`" ); diff --git a/tests/arc_wake.rs b/tests/arc_wake.rs deleted file mode 100644 index d19a83d..0000000 --- a/tests/arc_wake.rs +++ /dev/null @@ -1,82 +0,0 @@ -mod countingwaker { - use futures::task::{self, ArcWake, Waker}; - use std::sync::{Arc, Mutex}; - - struct CountingWaker { - nr_wake: Mutex<i32>, - } - - impl CountingWaker { - fn new() -> Self { - Self { - nr_wake: Mutex::new(0), - } - } - - fn wakes(&self) -> i32 { - *self.nr_wake.lock().unwrap() - } - } - - impl ArcWake for CountingWaker { - fn wake_by_ref(arc_self: &Arc<Self>) { - let mut lock = arc_self.nr_wake.lock().unwrap(); - *lock += 1; - } - } - - #[test] - fn create_from_arc() { - let some_w = Arc::new(CountingWaker::new()); - - let w1: Waker = task::waker(some_w.clone()); - assert_eq!(2, Arc::strong_count(&some_w)); - w1.wake_by_ref(); - assert_eq!(1, some_w.wakes()); - - let w2 = w1.clone(); - assert_eq!(3, Arc::strong_count(&some_w)); - - w2.wake_by_ref(); - assert_eq!(2, some_w.wakes()); - - drop(w2); - assert_eq!(2, Arc::strong_count(&some_w)); - drop(w1); - assert_eq!(1, Arc::strong_count(&some_w)); - } - - #[test] - fn ref_wake_same() { - let some_w = Arc::new(CountingWaker::new()); - - let w1: Waker = task::waker(some_w.clone()); - let w2 = task::waker_ref(&some_w); - let w3 = w2.clone(); - - assert!(w1.will_wake(&w2)); - assert!(w2.will_wake(&w3)); - } -} - -#[test] -fn proper_refcount_on_wake_panic() { - use futures::task::{self, ArcWake, Waker}; - use std::sync::Arc; - - struct PanicWaker; - - impl ArcWake for PanicWaker { - fn wake_by_ref(_arc_self: &Arc<Self>) { - panic!("WAKE UP"); - } - } - - let some_w = Arc::new(PanicWaker); - - let w1: Waker = task::waker(some_w.clone()); - assert_eq!("WAKE UP", *std::panic::catch_unwind(|| w1.wake_by_ref()).unwrap_err().downcast::<&str>().unwrap()); - assert_eq!(2, Arc::strong_count(&some_w)); // some_w + w1 - drop(w1); - assert_eq!(1, Arc::strong_count(&some_w)); // some_w -} diff --git a/tests/async_await_macros.rs b/tests/async_await_macros.rs index bd586d6..ce1f3a3 100644 --- a/tests/async_await_macros.rs +++ b/tests/async_await_macros.rs @@ -1,9 +1,16 @@ +use futures::channel::{mpsc, oneshot}; +use futures::executor::block_on; +use futures::future::{self, poll_fn, FutureExt}; +use futures::sink::SinkExt; +use futures::stream::StreamExt; +use futures::task::{Context, Poll}; +use futures::{ + join, pending, pin_mut, poll, select, select_biased, stream, stream_select, try_join, +}; +use std::mem; + #[test] fn poll_and_pending() { - use futures::{pending, pin_mut, poll}; - use futures::executor::block_on; - use futures::task::Poll; - let pending_once = async { pending!() }; block_on(async { pin_mut!(pending_once); @@ -14,11 +21,6 @@ fn poll_and_pending() { #[test] fn join() { - use futures::{pin_mut, poll, join}; - use futures::channel::oneshot; - use futures::executor::block_on; - use futures::task::Poll; - let (tx1, rx1) = oneshot::channel::<i32>(); let (tx2, rx2) = oneshot::channel::<i32>(); @@ -39,11 +41,6 @@ fn join() { #[test] fn select() { - use futures::select; - use futures::channel::oneshot; - use futures::executor::block_on; - use futures::future::FutureExt; - let (tx1, rx1) = oneshot::channel::<i32>(); let (_tx2, rx2) = oneshot::channel::<i32>(); tx1.send(1).unwrap(); @@ -62,11 +59,6 @@ fn select() { #[test] fn select_biased() { - use futures::channel::oneshot; - use futures::executor::block_on; - use futures::future::FutureExt; - use futures::select_biased; - let (tx1, rx1) = oneshot::channel::<i32>(); let (_tx2, rx2) = oneshot::channel::<i32>(); tx1.send(1).unwrap(); @@ -85,12 +77,6 @@ fn select_biased() { #[test] fn select_streams() { - use futures::select; - use futures::channel::mpsc; - use futures::executor::block_on; - use futures::sink::SinkExt; - use futures::stream::StreamExt; - let (mut tx1, rx1) = mpsc::channel::<i32>(1); let (mut tx2, rx2) = mpsc::channel::<i32>(1); let mut rx1 = rx1.fuse(); @@ -134,11 +120,6 @@ fn select_streams() { #[test] fn select_can_move_uncompleted_futures() { - use futures::select; - use futures::channel::oneshot; - use futures::executor::block_on; - use futures::future::FutureExt; - let (tx1, rx1) = oneshot::channel::<i32>(); let (tx2, rx2) = oneshot::channel::<i32>(); tx1.send(1).unwrap(); @@ -165,10 +146,6 @@ fn select_can_move_uncompleted_futures() { #[test] fn select_nested() { - use futures::select; - use futures::executor::block_on; - use futures::future; - let mut outer_fut = future::ready(1); let mut inner_fut = future::ready(2); let res = block_on(async { @@ -183,18 +160,16 @@ fn select_nested() { assert_eq!(res, 3); } +#[cfg_attr(not(target_pointer_width = "64"), ignore)] #[test] fn select_size() { - use futures::select; - use futures::future; - let fut = async { let mut ready = future::ready(0i32); select! { _ = ready => {}, } }; - assert_eq!(::std::mem::size_of_val(&fut), 24); + assert_eq!(mem::size_of_val(&fut), 24); let fut = async { let mut ready1 = future::ready(0i32); @@ -204,19 +179,13 @@ fn select_size() { _ = ready2 => {}, } }; - assert_eq!(::std::mem::size_of_val(&fut), 40); + assert_eq!(mem::size_of_val(&fut), 40); } #[test] fn select_on_non_unpin_expressions() { - use futures::select; - use futures::executor::block_on; - use futures::future::FutureExt; - // The returned Future is !Unpin - let make_non_unpin_fut = || { async { - 5 - }}; + let make_non_unpin_fut = || async { 5 }; let res = block_on(async { let select_res; @@ -231,14 +200,8 @@ fn select_on_non_unpin_expressions() { #[test] fn select_on_non_unpin_expressions_with_default() { - use futures::select; - use futures::executor::block_on; - use futures::future::FutureExt; - // The returned Future is !Unpin - let make_non_unpin_fut = || { async { - 5 - }}; + let make_non_unpin_fut = || async { 5 }; let res = block_on(async { let select_res; @@ -252,15 +215,11 @@ fn select_on_non_unpin_expressions_with_default() { assert_eq!(res, 5); } +#[cfg_attr(not(target_pointer_width = "64"), ignore)] #[test] fn select_on_non_unpin_size() { - use futures::select; - use futures::future::FutureExt; - // The returned Future is !Unpin - let make_non_unpin_fut = || { async { - 5 - }}; + let make_non_unpin_fut = || async { 5 }; let fut = async { let select_res; @@ -271,15 +230,11 @@ fn select_on_non_unpin_size() { select_res }; - assert_eq!(32, std::mem::size_of_val(&fut)); + assert_eq!(32, mem::size_of_val(&fut)); } #[test] fn select_can_be_used_as_expression() { - use futures::select; - use futures::executor::block_on; - use futures::future; - block_on(async { let res = select! { x = future::ready(7) => x, @@ -291,11 +246,6 @@ fn select_can_be_used_as_expression() { #[test] fn select_with_default_can_be_used_as_expression() { - use futures::select; - use futures::executor::block_on; - use futures::future::{FutureExt, poll_fn}; - use futures::task::{Context, Poll}; - fn poll_always_pending<T>(_cx: &mut Context<'_>) -> Poll<T> { Poll::Pending } @@ -312,10 +262,6 @@ fn select_with_default_can_be_used_as_expression() { #[test] fn select_with_complete_can_be_used_as_expression() { - use futures::select; - use futures::executor::block_on; - use futures::future; - block_on(async { let res = select! { x = future::pending::<i32>() => x, @@ -330,10 +276,6 @@ fn select_with_complete_can_be_used_as_expression() { #[test] #[allow(unused_assignments)] fn select_on_mutable_borrowing_future_with_same_borrow_in_block() { - use futures::select; - use futures::executor::block_on; - use futures::future::FutureExt; - async fn require_mutable(_: &mut i32) {} async fn async_noop() {} @@ -351,10 +293,6 @@ fn select_on_mutable_borrowing_future_with_same_borrow_in_block() { #[test] #[allow(unused_assignments)] fn select_on_mutable_borrowing_future_with_same_borrow_in_block_and_default() { - use futures::select; - use futures::executor::block_on; - use futures::future::FutureExt; - async fn require_mutable(_: &mut i32) {} async fn async_noop() {} @@ -373,60 +311,79 @@ fn select_on_mutable_borrowing_future_with_same_borrow_in_block_and_default() { } #[test] -fn join_size() { - use futures::join; - use futures::future; +#[allow(unused_assignments)] +fn stream_select() { + // stream_select! macro + block_on(async { + let endless_ints = |i| stream::iter(vec![i].into_iter().cycle()); + + let mut endless_ones = stream_select!(endless_ints(1i32), stream::pending()); + assert_eq!(endless_ones.next().await, Some(1)); + assert_eq!(endless_ones.next().await, Some(1)); + + let mut finite_list = + stream_select!(stream::iter(vec![1].into_iter()), stream::iter(vec![1].into_iter())); + assert_eq!(finite_list.next().await, Some(1)); + assert_eq!(finite_list.next().await, Some(1)); + assert_eq!(finite_list.next().await, None); + + let endless_mixed = stream_select!(endless_ints(1i32), endless_ints(2), endless_ints(3)); + // Take 1000, and assert a somewhat even distribution of values. + // The fairness is randomized, but over 1000 samples we should be pretty close to even. + // This test may be a bit flaky. Feel free to adjust the margins as you see fit. + let mut count = 0; + let results = endless_mixed + .take_while(move |_| { + count += 1; + let ret = count < 1000; + async move { ret } + }) + .collect::<Vec<_>>() + .await; + assert!(results.iter().filter(|x| **x == 1).count() >= 299); + assert!(results.iter().filter(|x| **x == 2).count() >= 299); + assert!(results.iter().filter(|x| **x == 3).count() >= 299); + }); +} +#[test] +fn join_size() { let fut = async { let ready = future::ready(0i32); join!(ready) }; - assert_eq!(::std::mem::size_of_val(&fut), 16); + assert_eq!(mem::size_of_val(&fut), 16); let fut = async { let ready1 = future::ready(0i32); let ready2 = future::ready(0i32); join!(ready1, ready2) }; - assert_eq!(::std::mem::size_of_val(&fut), 28); + assert_eq!(mem::size_of_val(&fut), 28); } #[test] fn try_join_size() { - use futures::try_join; - use futures::future; - let fut = async { let ready = future::ready(Ok::<i32, i32>(0)); try_join!(ready) }; - assert_eq!(::std::mem::size_of_val(&fut), 16); + assert_eq!(mem::size_of_val(&fut), 16); let fut = async { let ready1 = future::ready(Ok::<i32, i32>(0)); let ready2 = future::ready(Ok::<i32, i32>(0)); try_join!(ready1, ready2) }; - assert_eq!(::std::mem::size_of_val(&fut), 28); + assert_eq!(mem::size_of_val(&fut), 28); } #[test] fn join_doesnt_require_unpin() { - use futures::join; - - let _ = async { - join!(async {}, async {}) - }; + let _ = async { join!(async {}, async {}) }; } #[test] fn try_join_doesnt_require_unpin() { - use futures::try_join; - - let _ = async { - try_join!( - async { Ok::<(), ()>(()) }, - async { Ok::<(), ()>(()) }, - ) - }; + let _ = async { try_join!(async { Ok::<(), ()>(()) }, async { Ok::<(), ()>(()) },) }; } diff --git a/tests/auto_traits.rs b/tests/auto_traits.rs index 111fdf6..b3d8b00 100644 --- a/tests/auto_traits.rs +++ b/tests/auto_traits.rs @@ -470,6 +470,13 @@ pub mod future { assert_not_impl!(PollFn<*const ()>: Sync); assert_impl!(PollFn<PhantomPinned>: Unpin); + assert_impl!(PollImmediate<SendStream>: Send); + assert_not_impl!(PollImmediate<LocalStream<()>>: Send); + assert_impl!(PollImmediate<SyncStream>: Sync); + assert_not_impl!(PollImmediate<LocalStream<()>>: Sync); + assert_impl!(PollImmediate<UnpinStream>: Unpin); + assert_not_impl!(PollImmediate<PinnedStream>: Unpin); + assert_impl!(Ready<()>: Send); assert_not_impl!(Ready<*const ()>: Send); assert_impl!(Ready<()>: Sync); @@ -810,6 +817,12 @@ pub mod io { assert_impl!(Seek<'_, ()>: Unpin); assert_not_impl!(Seek<'_, PhantomPinned>: Unpin); + assert_impl!(SeeKRelative<'_, ()>: Send); + assert_not_impl!(SeeKRelative<'_, *const ()>: Send); + assert_impl!(SeeKRelative<'_, ()>: Sync); + assert_not_impl!(SeeKRelative<'_, *const ()>: Sync); + assert_impl!(SeeKRelative<'_, PhantomPinned>: Unpin); + assert_impl!(Sink: Send); assert_impl!(Sink: Sync); assert_impl!(Sink: Unpin); @@ -1383,6 +1396,26 @@ pub mod stream { assert_impl!(Next<'_, ()>: Unpin); assert_not_impl!(Next<'_, PhantomPinned>: Unpin); + assert_impl!(NextIf<'_, SendStream<()>, ()>: Send); + assert_not_impl!(NextIf<'_, SendStream<()>, *const ()>: Send); + assert_not_impl!(NextIf<'_, SendStream, ()>: Send); + assert_not_impl!(NextIf<'_, LocalStream<()>, ()>: Send); + assert_impl!(NextIf<'_, SyncStream<()>, ()>: Sync); + assert_not_impl!(NextIf<'_, SyncStream<()>, *const ()>: Sync); + assert_not_impl!(NextIf<'_, SyncStream, ()>: Sync); + assert_not_impl!(NextIf<'_, LocalStream<()>, ()>: Send); + assert_impl!(NextIf<'_, PinnedStream, PhantomPinned>: Unpin); + + assert_impl!(NextIfEq<'_, SendStream<()>, ()>: Send); + assert_not_impl!(NextIfEq<'_, SendStream<()>, *const ()>: Send); + assert_not_impl!(NextIfEq<'_, SendStream, ()>: Send); + assert_not_impl!(NextIfEq<'_, LocalStream<()>, ()>: Send); + assert_impl!(NextIfEq<'_, SyncStream<()>, ()>: Sync); + assert_not_impl!(NextIfEq<'_, SyncStream<()>, *const ()>: Sync); + assert_not_impl!(NextIfEq<'_, SyncStream, ()>: Sync); + assert_not_impl!(NextIfEq<'_, LocalStream<()>, ()>: Send); + assert_impl!(NextIfEq<'_, PinnedStream, PhantomPinned>: Unpin); + assert_impl!(Once<()>: Send); assert_not_impl!(Once<*const ()>: Send); assert_impl!(Once<()>: Sync); @@ -1410,6 +1443,14 @@ pub mod stream { assert_not_impl!(Peek<'_, LocalStream<()>>: Sync); assert_impl!(Peek<'_, PinnedStream>: Unpin); + assert_impl!(PeekMut<'_, SendStream<()>>: Send); + assert_not_impl!(PeekMut<'_, SendStream>: Send); + assert_not_impl!(PeekMut<'_, LocalStream<()>>: Send); + assert_impl!(PeekMut<'_, SyncStream<()>>: Sync); + assert_not_impl!(PeekMut<'_, SyncStream>: Sync); + assert_not_impl!(PeekMut<'_, LocalStream<()>>: Sync); + assert_impl!(PeekMut<'_, PinnedStream>: Unpin); + assert_impl!(Peekable<SendStream<()>>: Send); assert_not_impl!(Peekable<SendStream>: Send); assert_not_impl!(Peekable<LocalStream>: Send); @@ -1431,6 +1472,13 @@ pub mod stream { assert_not_impl!(PollFn<*const ()>: Sync); assert_impl!(PollFn<PhantomPinned>: Unpin); + assert_impl!(PollImmediate<SendStream>: Send); + assert_not_impl!(PollImmediate<LocalStream<()>>: Send); + assert_impl!(PollImmediate<SyncStream>: Sync); + assert_not_impl!(PollImmediate<LocalStream<()>>: Sync); + assert_impl!(PollImmediate<UnpinStream>: Unpin); + assert_not_impl!(PollImmediate<PinnedStream>: Unpin); + assert_impl!(ReadyChunks<SendStream<()>>: Send); assert_not_impl!(ReadyChunks<SendStream>: Send); assert_not_impl!(ReadyChunks<LocalStream>: Send); @@ -1780,25 +1828,40 @@ pub mod stream { assert_not_impl!(Zip<UnpinStream, PinnedStream>: Unpin); assert_not_impl!(Zip<PinnedStream, UnpinStream>: Unpin); - assert_not_impl!(futures_unordered::Iter<()>: Send); - assert_not_impl!(futures_unordered::Iter<()>: Sync); + assert_impl!(futures_unordered::Iter<()>: Send); + assert_not_impl!(futures_unordered::Iter<*const ()>: Send); + assert_impl!(futures_unordered::Iter<()>: Sync); + assert_not_impl!(futures_unordered::Iter<*const ()>: Sync); assert_impl!(futures_unordered::Iter<()>: Unpin); - // futures_unordered::Iter requires `Fut: Unpin` + // The definition of futures_unordered::Iter has `Fut: Unpin` bounds. // assert_not_impl!(futures_unordered::Iter<PhantomPinned>: Unpin); - assert_not_impl!(futures_unordered::IterMut<()>: Send); - assert_not_impl!(futures_unordered::IterMut<()>: Sync); + assert_impl!(futures_unordered::IterMut<()>: Send); + assert_not_impl!(futures_unordered::IterMut<*const ()>: Send); + assert_impl!(futures_unordered::IterMut<()>: Sync); + assert_not_impl!(futures_unordered::IterMut<*const ()>: Sync); assert_impl!(futures_unordered::IterMut<()>: Unpin); - // futures_unordered::IterMut requires `Fut: Unpin` + // The definition of futures_unordered::IterMut has `Fut: Unpin` bounds. // assert_not_impl!(futures_unordered::IterMut<PhantomPinned>: Unpin); - assert_not_impl!(futures_unordered::IterPinMut<()>: Send); - assert_not_impl!(futures_unordered::IterPinMut<()>: Sync); + assert_impl!(futures_unordered::IterPinMut<()>: Send); + assert_not_impl!(futures_unordered::IterPinMut<*const ()>: Send); + assert_impl!(futures_unordered::IterPinMut<()>: Sync); + assert_not_impl!(futures_unordered::IterPinMut<*const ()>: Sync); assert_impl!(futures_unordered::IterPinMut<PhantomPinned>: Unpin); - assert_not_impl!(futures_unordered::IterPinRef<()>: Send); - assert_not_impl!(futures_unordered::IterPinRef<()>: Sync); + assert_impl!(futures_unordered::IterPinRef<()>: Send); + assert_not_impl!(futures_unordered::IterPinRef<*const ()>: Send); + assert_impl!(futures_unordered::IterPinRef<()>: Sync); + assert_not_impl!(futures_unordered::IterPinRef<*const ()>: Sync); assert_impl!(futures_unordered::IterPinRef<PhantomPinned>: Unpin); + + assert_impl!(futures_unordered::IntoIter<()>: Send); + assert_not_impl!(futures_unordered::IntoIter<*const ()>: Send); + assert_impl!(futures_unordered::IntoIter<()>: Sync); + assert_not_impl!(futures_unordered::IntoIter<*const ()>: Sync); + // The definition of futures_unordered::IntoIter has `Fut: Unpin` bounds. + // assert_not_impl!(futures_unordered::IntoIter<PhantomPinned>: Unpin); } /// Assert Send/Sync/Unpin for all public types in `futures::task`. diff --git a/tests/compat.rs b/tests/compat.rs index 39adc7c..ac04a95 100644 --- a/tests/compat.rs +++ b/tests/compat.rs @@ -1,16 +1,15 @@ #![cfg(feature = "compat")] +#![cfg(not(miri))] // Miri does not support epoll -use tokio::timer::Delay; -use tokio::runtime::Runtime; -use std::time::Instant; -use futures::prelude::*; use futures::compat::Future01CompatExt; +use futures::prelude::*; +use std::time::Instant; +use tokio::runtime::Runtime; +use tokio::timer::Delay; #[test] fn can_use_01_futures_in_a_03_future_running_on_a_01_executor() { - let f = async { - Delay::new(Instant::now()).compat().await - }; + let f = async { Delay::new(Instant::now()).compat().await }; let mut runtime = Runtime::new().unwrap(); runtime.block_on(f.boxed().compat()).unwrap(); diff --git a/tests/eager_drop.rs b/tests/eager_drop.rs index 11edb1b..9925077 100644 --- a/tests/eager_drop.rs +++ b/tests/eager_drop.rs @@ -1,16 +1,23 @@ +use futures::channel::oneshot; +use futures::future::{self, Future, FutureExt, TryFutureExt}; +use futures::task::{Context, Poll}; +use futures_test::future::FutureTestExt; +use pin_project::pin_project; +use std::pin::Pin; +use std::sync::mpsc; + #[test] fn map_ok() { - use futures::future::{self, FutureExt, TryFutureExt}; - use futures_test::future::FutureTestExt; - use std::sync::mpsc; - // The closure given to `map_ok` should have been dropped by the time `map` // runs. let (tx1, rx1) = mpsc::channel::<()>(); let (tx2, rx2) = mpsc::channel::<()>(); future::ready::<Result<i32, i32>>(Err(1)) - .map_ok(move |_| { let _tx1 = tx1; panic!("should not run"); }) + .map_ok(move |_| { + let _tx1 = tx1; + panic!("should not run"); + }) .map(move |_| { assert!(rx1.recv().is_err()); tx2.send(()).unwrap() @@ -22,17 +29,16 @@ fn map_ok() { #[test] fn map_err() { - use futures::future::{self, FutureExt, TryFutureExt}; - use futures_test::future::FutureTestExt; - use std::sync::mpsc; - // The closure given to `map_err` should have been dropped by the time `map` // runs. let (tx1, rx1) = mpsc::channel::<()>(); let (tx2, rx2) = mpsc::channel::<()>(); future::ready::<Result<i32, i32>>(Ok(1)) - .map_err(move |_| { let _tx1 = tx1; panic!("should not run"); }) + .map_err(move |_| { + let _tx1 = tx1; + panic!("should not run"); + }) .map(move |_| { assert!(rx1.recv().is_err()); tx2.send(()).unwrap() @@ -42,96 +48,74 @@ fn map_err() { rx2.recv().unwrap(); } -mod channelled { - use futures::future::Future; - use futures::task::{Context,Poll}; - use pin_project::pin_project; - use std::pin::Pin; - - #[pin_project] - struct FutureData<F, T> { - _data: T, - #[pin] - future: F, - } +#[pin_project] +struct FutureData<F, T> { + _data: T, + #[pin] + future: F, +} - impl<F: Future, T: Send + 'static> Future for FutureData<F, T> { - type Output = F::Output; +impl<F: Future, T: Send + 'static> Future for FutureData<F, T> { + type Output = F::Output; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F::Output> { - self.project().future.poll(cx) - } + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F::Output> { + self.project().future.poll(cx) } +} - #[test] - fn then_drops_eagerly() { - use futures::channel::oneshot; - use futures::future::{self, FutureExt, TryFutureExt}; - use futures_test::future::FutureTestExt; - use std::sync::mpsc; - - let (tx0, rx0) = oneshot::channel::<()>(); - let (tx1, rx1) = mpsc::channel::<()>(); - let (tx2, rx2) = mpsc::channel::<()>(); - - FutureData { _data: tx1, future: rx0.unwrap_or_else(|_| { panic!() }) } - .then(move |_| { - assert!(rx1.recv().is_err()); // tx1 should have been dropped - tx2.send(()).unwrap(); - future::ready(()) - }) - .run_in_background(); - - assert_eq!(Err(mpsc::TryRecvError::Empty), rx2.try_recv()); - tx0.send(()).unwrap(); - rx2.recv().unwrap(); - } +#[test] +fn then_drops_eagerly() { + let (tx0, rx0) = oneshot::channel::<()>(); + let (tx1, rx1) = mpsc::channel::<()>(); + let (tx2, rx2) = mpsc::channel::<()>(); - #[test] - fn and_then_drops_eagerly() { - use futures::channel::oneshot; - use futures::future::{self, TryFutureExt}; - use futures_test::future::FutureTestExt; - use std::sync::mpsc; - - let (tx0, rx0) = oneshot::channel::<Result<(), ()>>(); - let (tx1, rx1) = mpsc::channel::<()>(); - let (tx2, rx2) = mpsc::channel::<()>(); - - FutureData { _data: tx1, future: rx0.unwrap_or_else(|_| { panic!() }) } - .and_then(move |_| { - assert!(rx1.recv().is_err()); // tx1 should have been dropped - tx2.send(()).unwrap(); - future::ready(Ok(())) - }) - .run_in_background(); - - assert_eq!(Err(mpsc::TryRecvError::Empty), rx2.try_recv()); - tx0.send(Ok(())).unwrap(); - rx2.recv().unwrap(); - } + FutureData { _data: tx1, future: rx0.unwrap_or_else(|_| panic!()) } + .then(move |_| { + assert!(rx1.recv().is_err()); // tx1 should have been dropped + tx2.send(()).unwrap(); + future::ready(()) + }) + .run_in_background(); - #[test] - fn or_else_drops_eagerly() { - use futures::channel::oneshot; - use futures::future::{self, TryFutureExt}; - use futures_test::future::FutureTestExt; - use std::sync::mpsc; - - let (tx0, rx0) = oneshot::channel::<Result<(), ()>>(); - let (tx1, rx1) = mpsc::channel::<()>(); - let (tx2, rx2) = mpsc::channel::<()>(); - - FutureData { _data: tx1, future: rx0.unwrap_or_else(|_| { panic!() }) } - .or_else(move |_| { - assert!(rx1.recv().is_err()); // tx1 should have been dropped - tx2.send(()).unwrap(); - future::ready::<Result<(), ()>>(Ok(())) - }) - .run_in_background(); - - assert_eq!(Err(mpsc::TryRecvError::Empty), rx2.try_recv()); - tx0.send(Err(())).unwrap(); - rx2.recv().unwrap(); - } + assert_eq!(Err(mpsc::TryRecvError::Empty), rx2.try_recv()); + tx0.send(()).unwrap(); + rx2.recv().unwrap(); +} + +#[test] +fn and_then_drops_eagerly() { + let (tx0, rx0) = oneshot::channel::<Result<(), ()>>(); + let (tx1, rx1) = mpsc::channel::<()>(); + let (tx2, rx2) = mpsc::channel::<()>(); + + FutureData { _data: tx1, future: rx0.unwrap_or_else(|_| panic!()) } + .and_then(move |_| { + assert!(rx1.recv().is_err()); // tx1 should have been dropped + tx2.send(()).unwrap(); + future::ready(Ok(())) + }) + .run_in_background(); + + assert_eq!(Err(mpsc::TryRecvError::Empty), rx2.try_recv()); + tx0.send(Ok(())).unwrap(); + rx2.recv().unwrap(); +} + +#[test] +fn or_else_drops_eagerly() { + let (tx0, rx0) = oneshot::channel::<Result<(), ()>>(); + let (tx1, rx1) = mpsc::channel::<()>(); + let (tx2, rx2) = mpsc::channel::<()>(); + + FutureData { _data: tx1, future: rx0.unwrap_or_else(|_| panic!()) } + .or_else(move |_| { + assert!(rx1.recv().is_err()); // tx1 should have been dropped + tx2.send(()).unwrap(); + future::ready::<Result<(), ()>>(Ok(())) + }) + .run_in_background(); + + assert_eq!(Err(mpsc::TryRecvError::Empty), rx2.try_recv()); + tx0.send(Err(())).unwrap(); + rx2.recv().unwrap(); } diff --git a/tests/eventual.rs b/tests/eventual.rs index bff000d..3461380 100644 --- a/tests/eventual.rs +++ b/tests/eventual.rs @@ -1,3 +1,5 @@ +#![cfg(not(miri))] // https://github.com/rust-lang/miri/issues/1038 + use futures::channel::oneshot; use futures::executor::ThreadPool; use futures::future::{self, ok, Future, FutureExt, TryFutureExt}; diff --git a/tests/abortable.rs b/tests/future_abortable.rs index 6b5a25c..e119f0b 100644 --- a/tests/abortable.rs +++ b/tests/future_abortable.rs @@ -1,45 +1,44 @@ +use futures::channel::oneshot; +use futures::executor::block_on; +use futures::future::{abortable, Aborted, FutureExt}; +use futures::task::{Context, Poll}; +use futures_test::task::new_count_waker; + #[test] fn abortable_works() { - use futures::channel::oneshot; - use futures::future::{abortable, Aborted}; - use futures::executor::block_on; - let (_tx, a_rx) = oneshot::channel::<()>(); let (abortable_rx, abort_handle) = abortable(a_rx); abort_handle.abort(); + assert!(abortable_rx.is_aborted()); assert_eq!(Err(Aborted), block_on(abortable_rx)); } #[test] fn abortable_awakens() { - use futures::channel::oneshot; - use futures::future::{abortable, Aborted, FutureExt}; - use futures::task::{Context, Poll}; - use futures_test::task::new_count_waker; - let (_tx, a_rx) = oneshot::channel::<()>(); let (mut abortable_rx, abort_handle) = abortable(a_rx); let (waker, counter) = new_count_waker(); let mut cx = Context::from_waker(&waker); + assert_eq!(counter, 0); assert_eq!(Poll::Pending, abortable_rx.poll_unpin(&mut cx)); assert_eq!(counter, 0); + abort_handle.abort(); assert_eq!(counter, 1); + assert!(abortable_rx.is_aborted()); assert_eq!(Poll::Ready(Err(Aborted)), abortable_rx.poll_unpin(&mut cx)); } #[test] fn abortable_resolves() { - use futures::channel::oneshot; - use futures::future::abortable; - use futures::executor::block_on; let (tx, a_rx) = oneshot::channel::<()>(); let (abortable_rx, _abort_handle) = abortable(a_rx); tx.send(()).unwrap(); + assert!(!abortable_rx.is_aborted()); assert_eq!(Ok(Ok(())), block_on(abortable_rx)); } diff --git a/tests/basic_combinators.rs b/tests/future_basic_combinators.rs index fa65b6f..372ab48 100644 --- a/tests/basic_combinators.rs +++ b/tests/future_basic_combinators.rs @@ -13,17 +13,21 @@ fn basic_future_combinators() { tx1.send(x).unwrap(); // Send 1 tx1.send(2).unwrap(); // Send 2 future::ready(3) - }).map(move |x| { + }) + .map(move |x| { tx2.send(x).unwrap(); // Send 3 tx2.send(4).unwrap(); // Send 4 5 - }).map(move |x| { + }) + .map(move |x| { tx3.send(x).unwrap(); // Send 5 }); assert!(rx.try_recv().is_err()); // Not started yet fut.run_in_background(); // Start it - for i in 1..=5 { assert_eq!(rx.recv(), Ok(i)); } // Check it + for i in 1..=5 { + assert_eq!(rx.recv(), Ok(i)); + } // Check it assert!(rx.recv().is_err()); // Should be done } @@ -93,6 +97,8 @@ fn basic_try_future_combinators() { assert!(rx.try_recv().is_err()); // Not started yet fut.run_in_background(); // Start it - for i in 1..=12 { assert_eq!(rx.recv(), Ok(i)); } // Check it + for i in 1..=12 { + assert_eq!(rx.recv(), Ok(i)); + } // Check it assert!(rx.recv().is_err()); // Should be done } diff --git a/tests/fuse.rs b/tests/future_fuse.rs index 83f2c1c..83f2c1c 100644 --- a/tests/fuse.rs +++ b/tests/future_fuse.rs diff --git a/tests/future_inspect.rs b/tests/future_inspect.rs new file mode 100644 index 0000000..eacd1f7 --- /dev/null +++ b/tests/future_inspect.rs @@ -0,0 +1,16 @@ +use futures::executor::block_on; +use futures::future::{self, FutureExt}; + +#[test] +fn smoke() { + let mut counter = 0; + + { + let work = future::ready::<i32>(40).inspect(|val| { + counter += *val; + }); + assert_eq!(block_on(work), 40); + } + + assert_eq!(counter, 40); +} diff --git a/tests/future_join_all.rs b/tests/future_join_all.rs new file mode 100644 index 0000000..44486e1 --- /dev/null +++ b/tests/future_join_all.rs @@ -0,0 +1,41 @@ +use futures::executor::block_on; +use futures::future::{join_all, ready, Future, JoinAll}; +use futures::pin_mut; +use std::fmt::Debug; + +#[track_caller] +fn assert_done<T>(actual_fut: impl Future<Output = T>, expected: T) +where + T: PartialEq + Debug, +{ + pin_mut!(actual_fut); + let output = block_on(actual_fut); + assert_eq!(output, expected); +} + +#[test] +fn collect_collects() { + assert_done(join_all(vec![ready(1), ready(2)]), vec![1, 2]); + assert_done(join_all(vec![ready(1)]), vec![1]); + // REVIEW: should this be implemented? + // assert_done(join_all(Vec::<i32>::new()), vec![]); + + // TODO: needs more tests +} + +#[test] +fn join_all_iter_lifetime() { + // In futures-rs version 0.1, this function would fail to typecheck due to an overly + // conservative type parameterization of `JoinAll`. + fn sizes(bufs: Vec<&[u8]>) -> impl Future<Output = Vec<usize>> { + let iter = bufs.into_iter().map(|b| ready::<usize>(b.len())); + join_all(iter) + } + + assert_done(sizes(vec![&[1, 2, 3], &[], &[0]]), vec![3_usize, 0, 1]); +} + +#[test] +fn join_all_from_iter() { + assert_done(vec![ready(1), ready(2)].into_iter().collect::<JoinAll<_>>(), vec![1, 2]) +} diff --git a/tests/future_obj.rs b/tests/future_obj.rs index c6b18fc..0e52534 100644 --- a/tests/future_obj.rs +++ b/tests/future_obj.rs @@ -1,6 +1,6 @@ -use futures::future::{Future, FutureObj, FutureExt}; -use std::pin::Pin; +use futures::future::{Future, FutureExt, FutureObj}; use futures::task::{Context, Poll}; +use std::pin::Pin; #[test] fn dropping_does_not_segfault() { diff --git a/tests/select_all.rs b/tests/future_select_all.rs index 540db2c..299b479 100644 --- a/tests/select_all.rs +++ b/tests/future_select_all.rs @@ -1,14 +1,10 @@ +use futures::executor::block_on; +use futures::future::{ready, select_all}; +use std::collections::HashSet; + #[test] fn smoke() { - use futures::executor::block_on; - use futures::future::{ready, select_all}; - use std::collections::HashSet; - - let v = vec![ - ready(1), - ready(2), - ready(3), - ]; + let v = vec![ready(1), ready(2), ready(3)]; let mut c = vec![1, 2, 3].into_iter().collect::<HashSet<_>>(); diff --git a/tests/select_ok.rs b/tests/future_select_ok.rs index 81cadb7..8aec003 100644 --- a/tests/select_ok.rs +++ b/tests/future_select_ok.rs @@ -1,14 +1,9 @@ +use futures::executor::block_on; +use futures::future::{err, ok, select_ok}; + #[test] fn ignore_err() { - use futures::executor::block_on; - use futures::future::{err, ok, select_ok}; - - let v = vec![ - err(1), - err(2), - ok(3), - ok(4), - ]; + let v = vec![err(1), err(2), ok(3), ok(4)]; let (i, v) = block_on(select_ok(v)).ok().unwrap(); assert_eq!(i, 3); @@ -23,14 +18,7 @@ fn ignore_err() { #[test] fn last_err() { - use futures::executor::block_on; - use futures::future::{err, ok, select_ok}; - - let v = vec![ - ok(1), - err(2), - err(3), - ]; + let v = vec![ok(1), err(2), err(3)]; let (i, v) = block_on(select_ok(v)).ok().unwrap(); assert_eq!(i, 1); diff --git a/tests/shared.rs b/tests/future_shared.rs index cc0c6a2..3ceaebb 100644 --- a/tests/shared.rs +++ b/tests/future_shared.rs @@ -1,22 +1,22 @@ -mod count_clone { - use std::cell::Cell; - use std::rc::Rc; - - pub struct CountClone(pub Rc<Cell<i32>>); - - impl Clone for CountClone { - fn clone(&self) -> Self { - self.0.set(self.0.get() + 1); - Self(self.0.clone()) - } +use futures::channel::oneshot; +use futures::executor::{block_on, LocalPool}; +use futures::future::{self, FutureExt, LocalFutureObj, TryFutureExt}; +use futures::task::LocalSpawn; +use std::cell::{Cell, RefCell}; +use std::rc::Rc; +use std::task::Poll; +use std::thread; + +struct CountClone(Rc<Cell<i32>>); + +impl Clone for CountClone { + fn clone(&self) -> Self { + self.0.set(self.0.get() + 1); + Self(self.0.clone()) } } fn send_shared_oneshot_and_wait_on_multiple_threads(threads_number: u32) { - use futures::channel::oneshot; - use futures::executor::block_on; - use futures::future::FutureExt; - use std::thread; let (tx, rx) = oneshot::channel::<i32>(); let f = rx.shared(); let join_handles = (0..threads_number) @@ -53,11 +53,6 @@ fn many_threads() { #[test] fn drop_on_one_task_ok() { - use futures::channel::oneshot; - use futures::executor::block_on; - use futures::future::{self, FutureExt, TryFutureExt}; - use std::thread; - let (tx, rx) = oneshot::channel::<u32>(); let f1 = rx.shared(); let f2 = f1.clone(); @@ -86,11 +81,6 @@ fn drop_on_one_task_ok() { #[test] fn drop_in_poll() { - use futures::executor::block_on; - use futures::future::{self, FutureExt, LocalFutureObj}; - use std::cell::RefCell; - use std::rc::Rc; - let slot1 = Rc::new(RefCell::new(None)); let slot2 = slot1.clone(); @@ -106,13 +96,9 @@ fn drop_in_poll() { assert_eq!(block_on(future1), 1); } +#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038 #[test] fn peek() { - use futures::channel::oneshot; - use futures::executor::LocalPool; - use futures::future::{FutureExt, LocalFutureObj}; - use futures::task::LocalSpawn; - let mut local_pool = LocalPool::new(); let spawn = &mut local_pool.spawner(); @@ -134,9 +120,7 @@ fn peek() { } // Once the Shared has been polled, the value is peekable on the clone. - spawn - .spawn_local_obj(LocalFutureObj::new(Box::new(f1.map(|_| ())))) - .unwrap(); + spawn.spawn_local_obj(LocalFutureObj::new(Box::new(f1.map(|_| ())))).unwrap(); local_pool.run(); for _ in 0..2 { assert_eq!(*f2.peek().unwrap(), Ok(42)); @@ -145,10 +129,6 @@ fn peek() { #[test] fn downgrade() { - use futures::channel::oneshot; - use futures::executor::block_on; - use futures::future::FutureExt; - let (tx, rx) = oneshot::channel::<i32>(); let shared = rx.shared(); // Since there are outstanding `Shared`s, we can get a `WeakShared`. @@ -173,14 +153,6 @@ fn downgrade() { #[test] fn dont_clone_in_single_owner_shared_future() { - use futures::channel::oneshot; - use futures::executor::block_on; - use futures::future::FutureExt; - use std::cell::Cell; - use std::rc::Rc; - - use count_clone::CountClone; - let counter = CountClone(Rc::new(Cell::new(0))); let (tx, rx) = oneshot::channel(); @@ -193,14 +165,6 @@ fn dont_clone_in_single_owner_shared_future() { #[test] fn dont_do_unnecessary_clones_on_output() { - use futures::channel::oneshot; - use futures::executor::block_on; - use futures::future::FutureExt; - use std::cell::Cell; - use std::rc::Rc; - - use count_clone::CountClone; - let counter = CountClone(Rc::new(Cell::new(0))); let (tx, rx) = oneshot::channel(); @@ -215,11 +179,6 @@ fn dont_do_unnecessary_clones_on_output() { #[test] fn shared_future_that_wakes_itself_until_pending_is_returned() { - use futures::executor::block_on; - use futures::future::FutureExt; - use std::cell::Cell; - use std::task::Poll; - let proceed = Cell::new(false); let fut = futures::future::poll_fn(|cx| { if proceed.get() { @@ -233,8 +192,5 @@ fn shared_future_that_wakes_itself_until_pending_is_returned() { // The join future can only complete if the second future gets a chance to run after the first // has returned pending - assert_eq!( - block_on(futures::future::join(fut, async { proceed.set(true) })), - ((), ()) - ); + assert_eq!(block_on(futures::future::join(fut, async { proceed.set(true) })), ((), ())); } diff --git a/tests/future_try_flatten_stream.rs b/tests/future_try_flatten_stream.rs index 4a614f9..82ae1ba 100644 --- a/tests/future_try_flatten_stream.rs +++ b/tests/future_try_flatten_stream.rs @@ -1,9 +1,14 @@ +use futures::executor::block_on_stream; +use futures::future::{err, ok, TryFutureExt}; +use futures::sink::Sink; +use futures::stream::Stream; +use futures::stream::{self, StreamExt}; +use futures::task::{Context, Poll}; +use std::marker::PhantomData; +use std::pin::Pin; + #[test] fn successful_future() { - use futures::executor::block_on_stream; - use futures::future::{ok, TryFutureExt}; - use futures::stream::{self, StreamExt}; - let stream_items = vec![17, 19]; let future_of_a_stream = ok::<_, bool>(stream::iter(stream_items).map(Ok)); @@ -17,15 +22,8 @@ fn successful_future() { #[test] fn failed_future() { - use core::marker::PhantomData; - use core::pin::Pin; - use futures::executor::block_on_stream; - use futures::future::{err, TryFutureExt}; - use futures::stream::Stream; - use futures::task::{Context, Poll}; - struct PanickingStream<T, E> { - _marker: PhantomData<(T, E)> + _marker: PhantomData<(T, E)>, } impl<T, E> Stream for PanickingStream<T, E> { @@ -45,13 +43,6 @@ fn failed_future() { #[test] fn assert_impls() { - use core::marker::PhantomData; - use core::pin::Pin; - use futures::sink::Sink; - use futures::stream::Stream; - use futures::task::{Context, Poll}; - use futures::future::{ok, TryFutureExt}; - struct StreamSink<T, E, Item>(PhantomData<(T, E, Item)>); impl<T, E, Item> Stream for StreamSink<T, E, Item> { diff --git a/tests/future_try_join_all.rs b/tests/future_try_join_all.rs new file mode 100644 index 0000000..9a82487 --- /dev/null +++ b/tests/future_try_join_all.rs @@ -0,0 +1,46 @@ +use futures::executor::block_on; +use futures::pin_mut; +use futures_util::future::{err, ok, try_join_all, TryJoinAll}; +use std::fmt::Debug; +use std::future::Future; + +#[track_caller] +fn assert_done<T>(actual_fut: impl Future<Output = T>, expected: T) +where + T: PartialEq + Debug, +{ + pin_mut!(actual_fut); + let output = block_on(actual_fut); + assert_eq!(output, expected); +} + +#[test] +fn collect_collects() { + assert_done(try_join_all(vec![ok(1), ok(2)]), Ok::<_, usize>(vec![1, 2])); + assert_done(try_join_all(vec![ok(1), err(2)]), Err(2)); + assert_done(try_join_all(vec![ok(1)]), Ok::<_, usize>(vec![1])); + // REVIEW: should this be implemented? + // assert_done(try_join_all(Vec::<i32>::new()), Ok(vec![])); + + // TODO: needs more tests +} + +#[test] +fn try_join_all_iter_lifetime() { + // In futures-rs version 0.1, this function would fail to typecheck due to an overly + // conservative type parameterization of `TryJoinAll`. + fn sizes(bufs: Vec<&[u8]>) -> impl Future<Output = Result<Vec<usize>, ()>> { + let iter = bufs.into_iter().map(|b| ok::<usize, ()>(b.len())); + try_join_all(iter) + } + + assert_done(sizes(vec![&[1, 2, 3], &[], &[0]]), Ok(vec![3_usize, 0, 1])); +} + +#[test] +fn try_join_all_from_iter() { + assert_done( + vec![ok(1), ok(2)].into_iter().collect::<TryJoinAll<_>>(), + Ok::<_, usize>(vec![1, 2]), + ) +} diff --git a/tests/inspect.rs b/tests/inspect.rs deleted file mode 100644 index 375778b..0000000 --- a/tests/inspect.rs +++ /dev/null @@ -1,14 +0,0 @@ -#[test] -fn smoke() { - use futures::executor::block_on; - use futures::future::{self, FutureExt}; - - let mut counter = 0; - - { - let work = future::ready::<i32>(40).inspect(|val| { counter += *val; }); - assert_eq!(block_on(work), 40); - } - - assert_eq!(counter, 40); -} diff --git a/tests/io_buf_reader.rs b/tests/io_buf_reader.rs index f8f9d14..717297c 100644 --- a/tests/io_buf_reader.rs +++ b/tests/io_buf_reader.rs @@ -1,156 +1,240 @@ -macro_rules! run_fill_buf { - ($reader:expr) => {{ - use futures_test::task::noop_context; - use futures::task::Poll; - use std::pin::Pin; - - let mut cx = noop_context(); - loop { - if let Poll::Ready(x) = Pin::new(&mut $reader).poll_fill_buf(&mut cx) { - break x; - } +use futures::executor::block_on; +use futures::future::{Future, FutureExt}; +use futures::io::{ + AllowStdIo, AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, + BufReader, SeekFrom, +}; +use futures::pin_mut; +use futures::task::{Context, Poll}; +use futures_test::task::noop_context; +use pin_project::pin_project; +use std::cmp; +use std::io; +use std::pin::Pin; + +// helper for maybe_pending_* tests +fn run<F: Future + Unpin>(mut f: F) -> F::Output { + let mut cx = noop_context(); + loop { + if let Poll::Ready(x) = f.poll_unpin(&mut cx) { + return x; } - }}; + } } -mod util { - use futures::future::Future; - pub fn run<F: Future + Unpin>(mut f: F) -> F::Output { - use futures_test::task::noop_context; - use futures::task::Poll; - use futures::future::FutureExt; - - let mut cx = noop_context(); - loop { - if let Poll::Ready(x) = f.poll_unpin(&mut cx) { - return x; - } - } +// https://github.com/rust-lang/futures-rs/pull/2489#discussion_r697865719 +#[pin_project(!Unpin)] +struct Cursor<T> { + #[pin] + inner: futures::io::Cursor<T>, +} + +impl<T> Cursor<T> { + fn new(inner: T) -> Self { + Self { inner: futures::io::Cursor::new(inner) } } } -mod maybe_pending { - use futures::task::{Context,Poll}; - use std::{cmp,io}; - use std::pin::Pin; - use futures::io::{AsyncRead,AsyncBufRead}; +impl AsyncRead for Cursor<&[u8]> { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll<io::Result<usize>> { + self.project().inner.poll_read(cx, buf) + } +} - pub struct MaybePending<'a> { - inner: &'a [u8], - ready_read: bool, - ready_fill_buf: bool, +impl AsyncBufRead for Cursor<&[u8]> { + fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> { + self.project().inner.poll_fill_buf(cx) } - impl<'a> MaybePending<'a> { - pub fn new(inner: &'a [u8]) -> Self { - Self { inner, ready_read: false, ready_fill_buf: false } - } + fn consume(self: Pin<&mut Self>, amt: usize) { + self.project().inner.consume(amt) } +} - impl AsyncRead for MaybePending<'_> { - fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) - -> Poll<io::Result<usize>> - { - if self.ready_read { - self.ready_read = false; - Pin::new(&mut self.inner).poll_read(cx, buf) - } else { - self.ready_read = true; - Poll::Pending - } +impl AsyncSeek for Cursor<&[u8]> { + fn poll_seek( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + pos: SeekFrom, + ) -> Poll<io::Result<u64>> { + self.project().inner.poll_seek(cx, pos) + } +} + +struct MaybePending<'a> { + inner: &'a [u8], + ready_read: bool, + ready_fill_buf: bool, +} + +impl<'a> MaybePending<'a> { + fn new(inner: &'a [u8]) -> Self { + Self { inner, ready_read: false, ready_fill_buf: false } + } +} + +impl AsyncRead for MaybePending<'_> { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll<io::Result<usize>> { + if self.ready_read { + self.ready_read = false; + Pin::new(&mut self.inner).poll_read(cx, buf) + } else { + self.ready_read = true; + Poll::Pending } } +} - impl AsyncBufRead for MaybePending<'_> { - fn poll_fill_buf(mut self: Pin<&mut Self>, _: &mut Context<'_>) - -> Poll<io::Result<&[u8]>> - { - if self.ready_fill_buf { - self.ready_fill_buf = false; - if self.inner.is_empty() { return Poll::Ready(Ok(&[])) } - let len = cmp::min(2, self.inner.len()); - Poll::Ready(Ok(&self.inner[0..len])) - } else { - self.ready_fill_buf = true; - Poll::Pending +impl AsyncBufRead for MaybePending<'_> { + fn poll_fill_buf(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<&[u8]>> { + if self.ready_fill_buf { + self.ready_fill_buf = false; + if self.inner.is_empty() { + return Poll::Ready(Ok(&[])); } + let len = cmp::min(2, self.inner.len()); + Poll::Ready(Ok(&self.inner[0..len])) + } else { + self.ready_fill_buf = true; + Poll::Pending } + } - fn consume(mut self: Pin<&mut Self>, amt: usize) { - self.inner = &self.inner[amt..]; - } + fn consume(mut self: Pin<&mut Self>, amt: usize) { + self.inner = &self.inner[amt..]; } } #[test] fn test_buffered_reader() { - use futures::executor::block_on; - use futures::io::{AsyncReadExt, BufReader}; - - let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4]; - let mut reader = BufReader::with_capacity(2, inner); - - let mut buf = [0, 0, 0]; - let nread = block_on(reader.read(&mut buf)); - assert_eq!(nread.unwrap(), 3); - assert_eq!(buf, [5, 6, 7]); - assert_eq!(reader.buffer(), []); - - let mut buf = [0, 0]; - let nread = block_on(reader.read(&mut buf)); - assert_eq!(nread.unwrap(), 2); - assert_eq!(buf, [0, 1]); - assert_eq!(reader.buffer(), []); - - let mut buf = [0]; - let nread = block_on(reader.read(&mut buf)); - assert_eq!(nread.unwrap(), 1); - assert_eq!(buf, [2]); - assert_eq!(reader.buffer(), [3]); - - let mut buf = [0, 0, 0]; - let nread = block_on(reader.read(&mut buf)); - assert_eq!(nread.unwrap(), 1); - assert_eq!(buf, [3, 0, 0]); - assert_eq!(reader.buffer(), []); + block_on(async { + let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4]; + let mut reader = BufReader::with_capacity(2, inner); + + let mut buf = [0, 0, 0]; + let nread = reader.read(&mut buf).await.unwrap(); + assert_eq!(nread, 3); + assert_eq!(buf, [5, 6, 7]); + assert_eq!(reader.buffer(), []); + + let mut buf = [0, 0]; + let nread = reader.read(&mut buf).await.unwrap(); + assert_eq!(nread, 2); + assert_eq!(buf, [0, 1]); + assert_eq!(reader.buffer(), []); + + let mut buf = [0]; + let nread = reader.read(&mut buf).await.unwrap(); + assert_eq!(nread, 1); + assert_eq!(buf, [2]); + assert_eq!(reader.buffer(), [3]); + + let mut buf = [0, 0, 0]; + let nread = reader.read(&mut buf).await.unwrap(); + assert_eq!(nread, 1); + assert_eq!(buf, [3, 0, 0]); + assert_eq!(reader.buffer(), []); + + let nread = reader.read(&mut buf).await.unwrap(); + assert_eq!(nread, 1); + assert_eq!(buf, [4, 0, 0]); + assert_eq!(reader.buffer(), []); + + assert_eq!(reader.read(&mut buf).await.unwrap(), 0); + }); +} - let nread = block_on(reader.read(&mut buf)); - assert_eq!(nread.unwrap(), 1); - assert_eq!(buf, [4, 0, 0]); - assert_eq!(reader.buffer(), []); +#[test] +fn test_buffered_reader_seek() { + block_on(async { + let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4]; + let reader = BufReader::with_capacity(2, Cursor::new(inner)); + pin_mut!(reader); + + assert_eq!(reader.seek(SeekFrom::Start(3)).await.unwrap(), 3); + assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[0, 1][..]); + assert!(reader.seek(SeekFrom::Current(i64::MIN)).await.is_err()); + assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[0, 1][..]); + assert_eq!(reader.seek(SeekFrom::Current(1)).await.unwrap(), 4); + assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[1, 2][..]); + reader.as_mut().consume(1); + assert_eq!(reader.seek(SeekFrom::Current(-2)).await.unwrap(), 3); + }); +} - assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 0); +#[test] +fn test_buffered_reader_seek_relative() { + block_on(async { + let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4]; + let reader = BufReader::with_capacity(2, Cursor::new(inner)); + pin_mut!(reader); + + assert!(reader.as_mut().seek_relative(3).await.is_ok()); + assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[0, 1][..]); + assert!(reader.as_mut().seek_relative(0).await.is_ok()); + assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[0, 1][..]); + assert!(reader.as_mut().seek_relative(1).await.is_ok()); + assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[1][..]); + assert!(reader.as_mut().seek_relative(-1).await.is_ok()); + assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[0, 1][..]); + assert!(reader.as_mut().seek_relative(2).await.is_ok()); + assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[2, 3][..]); + }); } #[test] -fn test_buffered_reader_seek() { - use futures::executor::block_on; - use futures::io::{AsyncSeekExt, AsyncBufRead, BufReader, Cursor, SeekFrom}; - use std::pin::Pin; - use util::run; +fn test_buffered_reader_invalidated_after_read() { + block_on(async { + let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4]; + let reader = BufReader::with_capacity(3, Cursor::new(inner)); + pin_mut!(reader); + + assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[5, 6, 7][..]); + reader.as_mut().consume(3); + + let mut buffer = [0, 0, 0, 0, 0]; + assert_eq!(reader.read(&mut buffer).await.unwrap(), 5); + assert_eq!(buffer, [0, 1, 2, 3, 4]); + + assert!(reader.as_mut().seek_relative(-2).await.is_ok()); + let mut buffer = [0, 0]; + assert_eq!(reader.read(&mut buffer).await.unwrap(), 2); + assert_eq!(buffer, [3, 4]); + }); +} - let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4]; - let mut reader = BufReader::with_capacity(2, Cursor::new(inner)); - - assert_eq!(block_on(reader.seek(SeekFrom::Start(3))).ok(), Some(3)); - assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1][..])); - assert_eq!(run(reader.seek(SeekFrom::Current(i64::min_value()))).ok(), None); - assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1][..])); - assert_eq!(block_on(reader.seek(SeekFrom::Current(1))).ok(), Some(4)); - assert_eq!(run_fill_buf!(reader).ok(), Some(&[1, 2][..])); - Pin::new(&mut reader).consume(1); - assert_eq!(block_on(reader.seek(SeekFrom::Current(-2))).ok(), Some(3)); +#[test] +fn test_buffered_reader_invalidated_after_seek() { + block_on(async { + let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4]; + let reader = BufReader::with_capacity(3, Cursor::new(inner)); + pin_mut!(reader); + + assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[5, 6, 7][..]); + reader.as_mut().consume(3); + + assert!(reader.seek(SeekFrom::Current(5)).await.is_ok()); + + assert!(reader.as_mut().seek_relative(-2).await.is_ok()); + let mut buffer = [0, 0]; + assert_eq!(reader.read(&mut buffer).await.unwrap(), 2); + assert_eq!(buffer, [3, 4]); + }); } #[test] fn test_buffered_reader_seek_underflow() { - use futures::executor::block_on; - use futures::io::{AsyncSeekExt, AsyncBufRead, AllowStdIo, BufReader, SeekFrom}; - use std::io; - // gimmick reader that yields its position modulo 256 for each byte struct PositionReader { - pos: u64 + pos: u64, } impl io::Read for PositionReader { fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { @@ -172,32 +256,31 @@ fn test_buffered_reader_seek_underflow() { self.pos = self.pos.wrapping_add(n as u64); } SeekFrom::End(n) => { - self.pos = u64::max_value().wrapping_add(n as u64); + self.pos = u64::MAX.wrapping_add(n as u64); } } Ok(self.pos) } } - let mut reader = BufReader::with_capacity(5, AllowStdIo::new(PositionReader { pos: 0 })); - assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1, 2, 3, 4][..])); - assert_eq!(block_on(reader.seek(SeekFrom::End(-5))).ok(), Some(u64::max_value()-5)); - assert_eq!(run_fill_buf!(reader).ok().map(|s| s.len()), Some(5)); - // the following seek will require two underlying seeks - let expected = 9_223_372_036_854_775_802; - assert_eq!(block_on(reader.seek(SeekFrom::Current(i64::min_value()))).ok(), Some(expected)); - assert_eq!(run_fill_buf!(reader).ok().map(|s| s.len()), Some(5)); - // seeking to 0 should empty the buffer. - assert_eq!(block_on(reader.seek(SeekFrom::Current(0))).ok(), Some(expected)); - assert_eq!(reader.get_ref().get_ref().pos, expected); + block_on(async { + let reader = BufReader::with_capacity(5, AllowStdIo::new(PositionReader { pos: 0 })); + pin_mut!(reader); + assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[0, 1, 2, 3, 4][..]); + assert_eq!(reader.seek(SeekFrom::End(-5)).await.unwrap(), u64::MAX - 5); + assert_eq!(reader.as_mut().fill_buf().await.unwrap().len(), 5); + // the following seek will require two underlying seeks + let expected = 9_223_372_036_854_775_802; + assert_eq!(reader.seek(SeekFrom::Current(i64::MIN)).await.unwrap(), expected); + assert_eq!(reader.as_mut().fill_buf().await.unwrap().len(), 5); + // seeking to 0 should empty the buffer. + assert_eq!(reader.seek(SeekFrom::Current(0)).await.unwrap(), expected); + assert_eq!(reader.get_ref().get_ref().pos, expected); + }); } #[test] fn test_short_reads() { - use futures::executor::block_on; - use futures::io::{AsyncReadExt, AllowStdIo, BufReader}; - use std::io; - /// A dummy reader intended at testing short-reads propagation. struct ShortReader { lengths: Vec<usize>, @@ -213,24 +296,22 @@ fn test_short_reads() { } } - let inner = ShortReader { lengths: vec![0, 1, 2, 0, 1, 0] }; - let mut reader = BufReader::new(AllowStdIo::new(inner)); - let mut buf = [0, 0]; - assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 0); - assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 1); - assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 2); - assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 0); - assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 1); - assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 0); - assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 0); + block_on(async { + let inner = ShortReader { lengths: vec![0, 1, 2, 0, 1, 0] }; + let mut reader = BufReader::new(AllowStdIo::new(inner)); + let mut buf = [0, 0]; + assert_eq!(reader.read(&mut buf).await.unwrap(), 0); + assert_eq!(reader.read(&mut buf).await.unwrap(), 1); + assert_eq!(reader.read(&mut buf).await.unwrap(), 2); + assert_eq!(reader.read(&mut buf).await.unwrap(), 0); + assert_eq!(reader.read(&mut buf).await.unwrap(), 1); + assert_eq!(reader.read(&mut buf).await.unwrap(), 0); + assert_eq!(reader.read(&mut buf).await.unwrap(), 0); + }); } #[test] fn maybe_pending() { - use futures::io::{AsyncReadExt, BufReader}; - use util::run; - use maybe_pending::MaybePending; - let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4]; let mut reader = BufReader::with_capacity(2, MaybePending::new(inner)); @@ -268,10 +349,6 @@ fn maybe_pending() { #[test] fn maybe_pending_buf_read() { - use futures::io::{AsyncBufReadExt, BufReader}; - use util::run; - use maybe_pending::MaybePending; - let inner = MaybePending::new(&[0, 1, 2, 3, 1, 0]); let mut reader = BufReader::with_capacity(2, inner); let mut v = Vec::new(); @@ -291,68 +368,65 @@ fn maybe_pending_buf_read() { // https://github.com/rust-lang/futures-rs/pull/1573#discussion_r281162309 #[test] fn maybe_pending_seek() { - use futures::io::{AsyncBufRead, AsyncSeek, AsyncSeekExt, AsyncRead, BufReader, - Cursor, SeekFrom - }; - use futures::task::{Context,Poll}; - use std::io; - use std::pin::Pin; - use util::run; - pub struct MaybePendingSeek<'a> { + #[pin_project] + struct MaybePendingSeek<'a> { + #[pin] inner: Cursor<&'a [u8]>, ready: bool, } impl<'a> MaybePendingSeek<'a> { - pub fn new(inner: &'a [u8]) -> Self { + fn new(inner: &'a [u8]) -> Self { Self { inner: Cursor::new(inner), ready: true } } } impl AsyncRead for MaybePendingSeek<'_> { - fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) - -> Poll<io::Result<usize>> - { - Pin::new(&mut self.inner).poll_read(cx, buf) + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll<io::Result<usize>> { + self.project().inner.poll_read(cx, buf) } } impl AsyncBufRead for MaybePendingSeek<'_> { - fn poll_fill_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>) - -> Poll<io::Result<&[u8]>> - { - let this: *mut Self = &mut *self as *mut _; - Pin::new(&mut unsafe { &mut *this }.inner).poll_fill_buf(cx) + fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> { + self.project().inner.poll_fill_buf(cx) } - fn consume(mut self: Pin<&mut Self>, amt: usize) { - Pin::new(&mut self.inner).consume(amt) + fn consume(self: Pin<&mut Self>, amt: usize) { + self.project().inner.consume(amt) } } impl AsyncSeek for MaybePendingSeek<'_> { - fn poll_seek(mut self: Pin<&mut Self>, cx: &mut Context<'_>, pos: SeekFrom) - -> Poll<io::Result<u64>> - { + fn poll_seek( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + pos: SeekFrom, + ) -> Poll<io::Result<u64>> { if self.ready { - self.ready = false; - Pin::new(&mut self.inner).poll_seek(cx, pos) + *self.as_mut().project().ready = false; + self.project().inner.poll_seek(cx, pos) } else { - self.ready = true; + *self.project().ready = true; Poll::Pending } } } let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4]; - let mut reader = BufReader::with_capacity(2, MaybePendingSeek::new(inner)); + let reader = BufReader::with_capacity(2, MaybePendingSeek::new(inner)); + pin_mut!(reader); assert_eq!(run(reader.seek(SeekFrom::Current(3))).ok(), Some(3)); - assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1][..])); - assert_eq!(run(reader.seek(SeekFrom::Current(i64::min_value()))).ok(), None); - assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1][..])); + assert_eq!(run(reader.as_mut().fill_buf()).ok(), Some(&[0, 1][..])); + assert_eq!(run(reader.seek(SeekFrom::Current(i64::MIN))).ok(), None); + assert_eq!(run(reader.as_mut().fill_buf()).ok(), Some(&[0, 1][..])); assert_eq!(run(reader.seek(SeekFrom::Current(1))).ok(), Some(4)); - assert_eq!(run_fill_buf!(reader).ok(), Some(&[1, 2][..])); + assert_eq!(run(reader.as_mut().fill_buf()).ok(), Some(&[1, 2][..])); Pin::new(&mut reader).consume(1); assert_eq!(run(reader.seek(SeekFrom::Current(-2))).ok(), Some(3)); } diff --git a/tests/io_buf_writer.rs b/tests/io_buf_writer.rs index d58a6d8..b264cd5 100644 --- a/tests/io_buf_writer.rs +++ b/tests/io_buf_writer.rs @@ -1,67 +1,59 @@ -mod maybe_pending { - use futures::io::AsyncWrite; - use futures::task::{Context, Poll}; - use std::io; - use std::pin::Pin; - - pub struct MaybePending { - pub inner: Vec<u8>, - ready: bool, - } +use futures::executor::block_on; +use futures::future::{Future, FutureExt}; +use futures::io::{ + AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt, BufWriter, Cursor, SeekFrom, +}; +use futures::task::{Context, Poll}; +use futures_test::task::noop_context; +use std::io; +use std::pin::Pin; + +struct MaybePending { + inner: Vec<u8>, + ready: bool, +} - impl MaybePending { - pub fn new(inner: Vec<u8>) -> Self { - Self { inner, ready: false } - } +impl MaybePending { + fn new(inner: Vec<u8>) -> Self { + Self { inner, ready: false } } +} - impl AsyncWrite for MaybePending { - fn poll_write( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll<io::Result<usize>> { - if self.ready { - self.ready = false; - Pin::new(&mut self.inner).poll_write(cx, buf) - } else { - self.ready = true; - Poll::Pending - } +impl AsyncWrite for MaybePending { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll<io::Result<usize>> { + if self.ready { + self.ready = false; + Pin::new(&mut self.inner).poll_write(cx, buf) + } else { + self.ready = true; + Poll::Pending } + } - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { - Pin::new(&mut self.inner).poll_flush(cx) - } + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { + Pin::new(&mut self.inner).poll_flush(cx) + } - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { - Pin::new(&mut self.inner).poll_close(cx) - } + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { + Pin::new(&mut self.inner).poll_close(cx) } } -mod util { - use futures::future::Future; - - pub fn run<F: Future + Unpin>(mut f: F) -> F::Output { - use futures::future::FutureExt; - use futures::task::Poll; - use futures_test::task::noop_context; - - let mut cx = noop_context(); - loop { - if let Poll::Ready(x) = f.poll_unpin(&mut cx) { - return x; - } +fn run<F: Future + Unpin>(mut f: F) -> F::Output { + let mut cx = noop_context(); + loop { + if let Poll::Ready(x) = f.poll_unpin(&mut cx) { + return x; } } } #[test] fn buf_writer() { - use futures::executor::block_on; - use futures::io::{AsyncWriteExt, BufWriter}; - let mut writer = BufWriter::with_capacity(2, Vec::new()); block_on(writer.write(&[0, 1])).unwrap(); @@ -104,9 +96,6 @@ fn buf_writer() { #[test] fn buf_writer_inner_flushes() { - use futures::executor::block_on; - use futures::io::{AsyncWriteExt, BufWriter}; - let mut w = BufWriter::with_capacity(3, Vec::new()); block_on(w.write(&[0, 1])).unwrap(); assert_eq!(*w.get_ref(), []); @@ -117,9 +106,6 @@ fn buf_writer_inner_flushes() { #[test] fn buf_writer_seek() { - use futures::executor::block_on; - use futures::io::{AsyncSeekExt, AsyncWriteExt, BufWriter, Cursor, SeekFrom}; - // FIXME: when https://github.com/rust-lang/futures-rs/issues/1510 fixed, // use `Vec::new` instead of `vec![0; 8]`. let mut w = BufWriter::with_capacity(3, Cursor::new(vec![0; 8])); @@ -135,11 +121,6 @@ fn buf_writer_seek() { #[test] fn maybe_pending_buf_writer() { - use futures::io::{AsyncWriteExt, BufWriter}; - - use maybe_pending::MaybePending; - use util::run; - let mut writer = BufWriter::with_capacity(2, MaybePending::new(Vec::new())); run(writer.write(&[0, 1])).unwrap(); @@ -182,11 +163,6 @@ fn maybe_pending_buf_writer() { #[test] fn maybe_pending_buf_writer_inner_flushes() { - use futures::io::{AsyncWriteExt, BufWriter}; - - use maybe_pending::MaybePending; - use util::run; - let mut w = BufWriter::with_capacity(3, MaybePending::new(Vec::new())); run(w.write(&[0, 1])).unwrap(); assert_eq!(&w.get_ref().inner, &[]); @@ -197,13 +173,6 @@ fn maybe_pending_buf_writer_inner_flushes() { #[test] fn maybe_pending_buf_writer_seek() { - use futures::io::{AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt, BufWriter, Cursor, SeekFrom}; - use futures::task::{Context, Poll}; - use std::io; - use std::pin::Pin; - - use util::run; - struct MaybePendingSeek { inner: Cursor<Vec<u8>>, ready_write: bool, @@ -241,9 +210,11 @@ fn maybe_pending_buf_writer_seek() { } impl AsyncSeek for MaybePendingSeek { - fn poll_seek(mut self: Pin<&mut Self>, cx: &mut Context<'_>, pos: SeekFrom) - -> Poll<io::Result<u64>> - { + fn poll_seek( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + pos: SeekFrom, + ) -> Poll<io::Result<u64>> { if self.ready_seek { self.ready_seek = false; Pin::new(&mut self.inner).poll_seek(cx, pos) diff --git a/tests/io_cursor.rs b/tests/io_cursor.rs index 4ba6342..435ea5a 100644 --- a/tests/io_cursor.rs +++ b/tests/io_cursor.rs @@ -1,13 +1,14 @@ +use assert_matches::assert_matches; +use futures::executor::block_on; +use futures::future::lazy; +use futures::io::{AsyncWrite, Cursor}; +use futures::task::Poll; +use std::pin::Pin; + #[test] fn cursor_asyncwrite_vec() { - use assert_matches::assert_matches; - use futures::future::lazy; - use futures::io::{AsyncWrite, Cursor}; - use futures::task::Poll; - use std::pin::Pin; - let mut cursor = Cursor::new(vec![0; 5]); - futures::executor::block_on(lazy(|cx| { + block_on(lazy(|cx| { assert_matches!(Pin::new(&mut cursor).poll_write(cx, &[1, 2]), Poll::Ready(Ok(2))); assert_matches!(Pin::new(&mut cursor).poll_write(cx, &[3, 4]), Poll::Ready(Ok(2))); assert_matches!(Pin::new(&mut cursor).poll_write(cx, &[5, 6]), Poll::Ready(Ok(2))); @@ -18,14 +19,8 @@ fn cursor_asyncwrite_vec() { #[test] fn cursor_asyncwrite_box() { - use assert_matches::assert_matches; - use futures::future::lazy; - use futures::io::{AsyncWrite, Cursor}; - use futures::task::Poll; - use std::pin::Pin; - let mut cursor = Cursor::new(vec![0; 5].into_boxed_slice()); - futures::executor::block_on(lazy(|cx| { + block_on(lazy(|cx| { assert_matches!(Pin::new(&mut cursor).poll_write(cx, &[1, 2]), Poll::Ready(Ok(2))); assert_matches!(Pin::new(&mut cursor).poll_write(cx, &[3, 4]), Poll::Ready(Ok(2))); assert_matches!(Pin::new(&mut cursor).poll_write(cx, &[5, 6]), Poll::Ready(Ok(1))); diff --git a/tests/io_line_writer.rs b/tests/io_line_writer.rs new file mode 100644 index 0000000..b483e0f --- /dev/null +++ b/tests/io_line_writer.rs @@ -0,0 +1,73 @@ +use futures::executor::block_on; +use futures::io::{AsyncWriteExt, LineWriter}; +use std::io; + +#[test] +fn line_writer() { + let mut writer = LineWriter::new(Vec::new()); + + block_on(writer.write(&[0])).unwrap(); + assert_eq!(*writer.get_ref(), []); + + block_on(writer.write(&[1])).unwrap(); + assert_eq!(*writer.get_ref(), []); + + block_on(writer.flush()).unwrap(); + assert_eq!(*writer.get_ref(), [0, 1]); + + block_on(writer.write(&[0, b'\n', 1, b'\n', 2])).unwrap(); + assert_eq!(*writer.get_ref(), [0, 1, 0, b'\n', 1, b'\n']); + + block_on(writer.flush()).unwrap(); + assert_eq!(*writer.get_ref(), [0, 1, 0, b'\n', 1, b'\n', 2]); + + block_on(writer.write(&[3, b'\n'])).unwrap(); + assert_eq!(*writer.get_ref(), [0, 1, 0, b'\n', 1, b'\n', 2, 3, b'\n']); +} + +#[test] +fn line_vectored() { + let mut line_writer = LineWriter::new(Vec::new()); + assert_eq!( + block_on(line_writer.write_vectored(&[ + io::IoSlice::new(&[]), + io::IoSlice::new(b"\n"), + io::IoSlice::new(&[]), + io::IoSlice::new(b"a"), + ])) + .unwrap(), + 2 + ); + assert_eq!(line_writer.get_ref(), b"\n"); + + assert_eq!( + block_on(line_writer.write_vectored(&[ + io::IoSlice::new(&[]), + io::IoSlice::new(b"b"), + io::IoSlice::new(&[]), + io::IoSlice::new(b"a"), + io::IoSlice::new(&[]), + io::IoSlice::new(b"c"), + ])) + .unwrap(), + 3 + ); + assert_eq!(line_writer.get_ref(), b"\n"); + block_on(line_writer.flush()).unwrap(); + assert_eq!(line_writer.get_ref(), b"\nabac"); + assert_eq!(block_on(line_writer.write_vectored(&[])).unwrap(), 0); + + assert_eq!( + block_on(line_writer.write_vectored(&[ + io::IoSlice::new(&[]), + io::IoSlice::new(&[]), + io::IoSlice::new(&[]), + io::IoSlice::new(&[]), + ])) + .unwrap(), + 0 + ); + + assert_eq!(block_on(line_writer.write_vectored(&[io::IoSlice::new(b"a\nb")])).unwrap(), 3); + assert_eq!(line_writer.get_ref(), b"\nabaca\nb"); +} diff --git a/tests/io_lines.rs b/tests/io_lines.rs index 2552c7c..5ce01a6 100644 --- a/tests/io_lines.rs +++ b/tests/io_lines.rs @@ -1,32 +1,34 @@ -mod util { - use futures::future::Future; - - pub fn run<F: Future + Unpin>(mut f: F) -> F::Output { - use futures_test::task::noop_context; - use futures::task::Poll; - use futures::future::FutureExt; - - let mut cx = noop_context(); - loop { - if let Poll::Ready(x) = f.poll_unpin(&mut cx) { - return x; - } +use futures::executor::block_on; +use futures::future::{Future, FutureExt}; +use futures::io::{AsyncBufReadExt, Cursor}; +use futures::stream::{self, StreamExt, TryStreamExt}; +use futures::task::Poll; +use futures_test::io::AsyncReadTestExt; +use futures_test::task::noop_context; + +fn run<F: Future + Unpin>(mut f: F) -> F::Output { + let mut cx = noop_context(); + loop { + if let Poll::Ready(x) = f.poll_unpin(&mut cx) { + return x; } } } -#[test] -fn lines() { - use futures::executor::block_on; - use futures::stream::StreamExt; - use futures::io::{AsyncBufReadExt, Cursor}; +macro_rules! block_on_next { + ($expr:expr) => { + block_on($expr.next()).unwrap().unwrap() + }; +} - macro_rules! block_on_next { - ($expr:expr) => { - block_on($expr.next()).unwrap().unwrap() - }; - } +macro_rules! run_next { + ($expr:expr) => { + run($expr.next()).unwrap().unwrap() + }; +} +#[test] +fn lines() { let buf = Cursor::new(&b"12\r"[..]); let mut s = buf.lines(); assert_eq!(block_on_next!(s), "12\r".to_string()); @@ -41,22 +43,8 @@ fn lines() { #[test] fn maybe_pending() { - use futures::stream::{self, StreamExt, TryStreamExt}; - use futures::io::AsyncBufReadExt; - use futures_test::io::AsyncReadTestExt; - - use util::run; - - macro_rules! run_next { - ($expr:expr) => { - run($expr.next()).unwrap().unwrap() - }; - } - - let buf = stream::iter(vec![&b"12"[..], &b"\r"[..]]) - .map(Ok) - .into_async_read() - .interleave_pending(); + let buf = + stream::iter(vec![&b"12"[..], &b"\r"[..]]).map(Ok).into_async_read().interleave_pending(); let mut s = buf.lines(); assert_eq!(run_next!(s), "12\r".to_string()); assert!(run(s.next()).is_none()); diff --git a/tests/io_read.rs b/tests/io_read.rs index 5902ad0..d39a6ea 100644 --- a/tests/io_read.rs +++ b/tests/io_read.rs @@ -1,27 +1,26 @@ -mod mock_reader { - use futures::io::AsyncRead; - use std::io; - use std::pin::Pin; - use std::task::{Context, Poll}; +use futures::io::AsyncRead; +use futures_test::task::panic_context; +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; - pub struct MockReader { - fun: Box<dyn FnMut(&mut [u8]) -> Poll<io::Result<usize>>>, - } +struct MockReader { + fun: Box<dyn FnMut(&mut [u8]) -> Poll<io::Result<usize>>>, +} - impl MockReader { - pub fn new(fun: impl FnMut(&mut [u8]) -> Poll<io::Result<usize>> + 'static) -> Self { - Self { fun: Box::new(fun) } - } +impl MockReader { + fn new(fun: impl FnMut(&mut [u8]) -> Poll<io::Result<usize>> + 'static) -> Self { + Self { fun: Box::new(fun) } } +} - impl AsyncRead for MockReader { - fn poll_read( - self: Pin<&mut Self>, - _cx: &mut Context<'_>, - buf: &mut [u8] - ) -> Poll<io::Result<usize>> { - (self.get_mut().fun)(buf) - } +impl AsyncRead for MockReader { + fn poll_read( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll<io::Result<usize>> { + (self.get_mut().fun)(buf) } } @@ -29,14 +28,6 @@ mod mock_reader { /// calls `poll_read` with an empty slice if no buffers are provided. #[test] fn read_vectored_no_buffers() { - use futures::io::AsyncRead; - use futures_test::task::panic_context; - use std::io; - use std::pin::Pin; - use std::task::Poll; - - use mock_reader::MockReader; - let mut reader = MockReader::new(|buf| { assert_eq!(buf, b""); Err(io::ErrorKind::BrokenPipe.into()).into() @@ -53,14 +44,6 @@ fn read_vectored_no_buffers() { /// calls `poll_read` with the first non-empty buffer. #[test] fn read_vectored_first_non_empty() { - use futures::io::AsyncRead; - use futures_test::task::panic_context; - use std::io; - use std::pin::Pin; - use std::task::Poll; - - use mock_reader::MockReader; - let mut reader = MockReader::new(|buf| { assert_eq!(buf.len(), 4); buf.copy_from_slice(b"four"); diff --git a/tests/io_read_exact.rs b/tests/io_read_exact.rs index bd4b36d..6582e50 100644 --- a/tests/io_read_exact.rs +++ b/tests/io_read_exact.rs @@ -1,14 +1,14 @@ +use futures::executor::block_on; +use futures::io::AsyncReadExt; + #[test] fn read_exact() { - use futures::executor::block_on; - use futures::io::AsyncReadExt; - let mut reader: &[u8] = &[1, 2, 3, 4, 5]; let mut out = [0u8; 3]; let res = block_on(reader.read_exact(&mut out)); // read 3 bytes out assert!(res.is_ok()); - assert_eq!(out, [1,2,3]); + assert_eq!(out, [1, 2, 3]); assert_eq!(reader.len(), 2); let res = block_on(reader.read_exact(&mut out)); // read another 3 bytes, but only 2 bytes left diff --git a/tests/io_read_line.rs b/tests/io_read_line.rs index 51e8126..88a8779 100644 --- a/tests/io_read_line.rs +++ b/tests/io_read_line.rs @@ -1,8 +1,22 @@ +use futures::executor::block_on; +use futures::future::{Future, FutureExt}; +use futures::io::{AsyncBufReadExt, Cursor}; +use futures::stream::{self, StreamExt, TryStreamExt}; +use futures::task::Poll; +use futures_test::io::AsyncReadTestExt; +use futures_test::task::noop_context; + +fn run<F: Future + Unpin>(mut f: F) -> F::Output { + let mut cx = noop_context(); + loop { + if let Poll::Ready(x) = f.poll_unpin(&mut cx) { + return x; + } + } +} + #[test] fn read_line() { - use futures::executor::block_on; - use futures::io::{AsyncBufReadExt, Cursor}; - let mut buf = Cursor::new(b"12"); let mut v = String::new(); assert_eq!(block_on(buf.read_line(&mut v)).unwrap(), 2); @@ -22,34 +36,13 @@ fn read_line() { #[test] fn maybe_pending() { - use futures::future::Future; - - fn run<F: Future + Unpin>(mut f: F) -> F::Output { - use futures::future::FutureExt; - use futures::task::Poll; - use futures_test::task::noop_context; - - let mut cx = noop_context(); - loop { - if let Poll::Ready(x) = f.poll_unpin(&mut cx) { - return x; - } - } - } - - use futures::stream::{self, StreamExt, TryStreamExt}; - use futures::io::AsyncBufReadExt; - use futures_test::io::AsyncReadTestExt; - let mut buf = b"12".interleave_pending(); let mut v = String::new(); assert_eq!(run(buf.read_line(&mut v)).unwrap(), 2); assert_eq!(v, "12"); - let mut buf = stream::iter(vec![&b"12"[..], &b"\n\n"[..]]) - .map(Ok) - .into_async_read() - .interleave_pending(); + let mut buf = + stream::iter(vec![&b"12"[..], &b"\n\n"[..]]).map(Ok).into_async_read().interleave_pending(); let mut v = String::new(); assert_eq!(run(buf.read_line(&mut v)).unwrap(), 3); assert_eq!(v, "12\n"); diff --git a/tests/io_read_to_end.rs b/tests/io_read_to_end.rs index 892d463..7122511 100644 --- a/tests/io_read_to_end.rs +++ b/tests/io_read_to_end.rs @@ -1,4 +1,5 @@ use futures::{ + executor::block_on, io::{self, AsyncRead, AsyncReadExt}, task::{Context, Poll}, }; @@ -12,7 +13,7 @@ fn issue2310() { } impl MyRead { - pub fn new() -> Self { + fn new() -> Self { MyRead { first: false } } } @@ -39,7 +40,7 @@ fn issue2310() { } impl VecWrapper { - pub fn new() -> Self { + fn new() -> Self { VecWrapper { inner: Vec::new() } } } @@ -55,7 +56,7 @@ fn issue2310() { } } - futures::executor::block_on(async { + block_on(async { let mut vec = VecWrapper::new(); let mut read = MyRead::new(); diff --git a/tests/io_read_to_string.rs b/tests/io_read_to_string.rs index 2e9c00a..ae6aaa2 100644 --- a/tests/io_read_to_string.rs +++ b/tests/io_read_to_string.rs @@ -1,8 +1,13 @@ +use futures::executor::block_on; +use futures::future::{Future, FutureExt}; +use futures::io::{AsyncReadExt, Cursor}; +use futures::stream::{self, StreamExt, TryStreamExt}; +use futures::task::Poll; +use futures_test::io::AsyncReadTestExt; +use futures_test::task::noop_context; + #[test] fn read_to_string() { - use futures::executor::block_on; - use futures::io::{AsyncReadExt, Cursor}; - let mut c = Cursor::new(&b""[..]); let mut v = String::new(); assert_eq!(block_on(c.read_to_string(&mut v)).unwrap(), 0); @@ -20,16 +25,7 @@ fn read_to_string() { #[test] fn interleave_pending() { - use futures::future::Future; - use futures::stream::{self, StreamExt, TryStreamExt}; - use futures::io::AsyncReadExt; - use futures_test::io::AsyncReadTestExt; - fn run<F: Future + Unpin>(mut f: F) -> F::Output { - use futures::future::FutureExt; - use futures_test::task::noop_context; - use futures::task::Poll; - let mut cx = noop_context(); loop { if let Poll::Ready(x) = f.poll_unpin(&mut cx) { diff --git a/tests/io_read_until.rs b/tests/io_read_until.rs index 6fa22ee..71f857f 100644 --- a/tests/io_read_until.rs +++ b/tests/io_read_until.rs @@ -1,8 +1,22 @@ +use futures::executor::block_on; +use futures::future::{Future, FutureExt}; +use futures::io::{AsyncBufReadExt, Cursor}; +use futures::stream::{self, StreamExt, TryStreamExt}; +use futures::task::Poll; +use futures_test::io::AsyncReadTestExt; +use futures_test::task::noop_context; + +fn run<F: Future + Unpin>(mut f: F) -> F::Output { + let mut cx = noop_context(); + loop { + if let Poll::Ready(x) = f.poll_unpin(&mut cx) { + return x; + } + } +} + #[test] fn read_until() { - use futures::executor::block_on; - use futures::io::{AsyncBufReadExt, Cursor}; - let mut buf = Cursor::new(b"12"); let mut v = Vec::new(); assert_eq!(block_on(buf.read_until(b'3', &mut v)).unwrap(), 2); @@ -22,25 +36,6 @@ fn read_until() { #[test] fn maybe_pending() { - use futures::future::Future; - - fn run<F: Future + Unpin>(mut f: F) -> F::Output { - use futures::future::FutureExt; - use futures_test::task::noop_context; - use futures::task::Poll; - - let mut cx = noop_context(); - loop { - if let Poll::Ready(x) = f.poll_unpin(&mut cx) { - return x; - } - } - } - - use futures::stream::{self, StreamExt, TryStreamExt}; - use futures::io::AsyncBufReadExt; - use futures_test::io::AsyncReadTestExt; - let mut buf = b"12".interleave_pending(); let mut v = Vec::new(); assert_eq!(run(buf.read_until(b'3', &mut v)).unwrap(), 2); diff --git a/tests/io_write.rs b/tests/io_write.rs index 363f32b..6af2755 100644 --- a/tests/io_write.rs +++ b/tests/io_write.rs @@ -1,35 +1,34 @@ -mod mock_writer { - use futures::io::AsyncWrite; - use std::io; - use std::pin::Pin; - use std::task::{Context, Poll}; +use futures::io::AsyncWrite; +use futures_test::task::panic_context; +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; - pub struct MockWriter { - fun: Box<dyn FnMut(&[u8]) -> Poll<io::Result<usize>>>, - } +struct MockWriter { + fun: Box<dyn FnMut(&[u8]) -> Poll<io::Result<usize>>>, +} - impl MockWriter { - pub fn new(fun: impl FnMut(&[u8]) -> Poll<io::Result<usize>> + 'static) -> Self { - Self { fun: Box::new(fun) } - } +impl MockWriter { + fn new(fun: impl FnMut(&[u8]) -> Poll<io::Result<usize>> + 'static) -> Self { + Self { fun: Box::new(fun) } } +} - impl AsyncWrite for MockWriter { - fn poll_write( - self: Pin<&mut Self>, - _cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll<io::Result<usize>> { - (self.get_mut().fun)(buf) - } +impl AsyncWrite for MockWriter { + fn poll_write( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll<io::Result<usize>> { + (self.get_mut().fun)(buf) + } - fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> { - panic!() - } + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> { + panic!() + } - fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> { - panic!() - } + fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> { + panic!() } } @@ -37,14 +36,6 @@ mod mock_writer { /// calls `poll_write` with an empty slice if no buffers are provided. #[test] fn write_vectored_no_buffers() { - use futures::io::AsyncWrite; - use futures_test::task::panic_context; - use std::io; - use std::pin::Pin; - use std::task::Poll; - - use mock_writer::MockWriter; - let mut writer = MockWriter::new(|buf| { assert_eq!(buf, b""); Err(io::ErrorKind::BrokenPipe.into()).into() @@ -61,24 +52,12 @@ fn write_vectored_no_buffers() { /// calls `poll_write` with the first non-empty buffer. #[test] fn write_vectored_first_non_empty() { - use futures::io::AsyncWrite; - use futures_test::task::panic_context; - use std::io; - use std::pin::Pin; - use std::task::Poll; - - use mock_writer::MockWriter; - let mut writer = MockWriter::new(|buf| { assert_eq!(buf, b"four"); Poll::Ready(Ok(4)) }); let cx = &mut panic_context(); - let bufs = &mut [ - io::IoSlice::new(&[]), - io::IoSlice::new(&[]), - io::IoSlice::new(b"four") - ]; + let bufs = &mut [io::IoSlice::new(&[]), io::IoSlice::new(&[]), io::IoSlice::new(b"four")]; let res = Pin::new(&mut writer).poll_write_vectored(cx, bufs); let res = res.map_err(|e| e.kind()); diff --git a/tests/join_all.rs b/tests/join_all.rs deleted file mode 100644 index c322e58..0000000 --- a/tests/join_all.rs +++ /dev/null @@ -1,51 +0,0 @@ -mod util { - use std::future::Future; - use std::fmt::Debug; - - pub fn assert_done<T, F>(actual_fut: F, expected: T) - where - T: PartialEq + Debug, - F: FnOnce() -> Box<dyn Future<Output = T> + Unpin>, - { - use futures::executor::block_on; - - let output = block_on(actual_fut()); - assert_eq!(output, expected); - } -} - -#[test] -fn collect_collects() { - use futures_util::future::{join_all,ready}; - - util::assert_done(|| Box::new(join_all(vec![ready(1), ready(2)])), vec![1, 2]); - util::assert_done(|| Box::new(join_all(vec![ready(1)])), vec![1]); - // REVIEW: should this be implemented? - // assert_done(|| Box::new(join_all(Vec::<i32>::new())), vec![]); - - // TODO: needs more tests -} - -#[test] -fn join_all_iter_lifetime() { - use futures_util::future::{join_all,ready}; - use std::future::Future; - // In futures-rs version 0.1, this function would fail to typecheck due to an overly - // conservative type parameterization of `JoinAll`. - fn sizes(bufs: Vec<&[u8]>) -> Box<dyn Future<Output = Vec<usize>> + Unpin> { - let iter = bufs.into_iter().map(|b| ready::<usize>(b.len())); - Box::new(join_all(iter)) - } - - util::assert_done(|| sizes(vec![&[1,2,3], &[], &[0]]), vec![3_usize, 0, 1]); -} - -#[test] -fn join_all_from_iter() { - use futures_util::future::{JoinAll,ready}; - - util::assert_done( - || Box::new(vec![ready(1), ready(2)].into_iter().collect::<JoinAll<_>>()), - vec![1, 2], - ) -} diff --git a/tests/mutex.rs b/tests/lock_mutex.rs index 68e0301..c92ef50 100644 --- a/tests/mutex.rs +++ b/tests/lock_mutex.rs @@ -1,9 +1,15 @@ +use futures::channel::mpsc; +use futures::executor::{block_on, ThreadPool}; +use futures::future::{ready, FutureExt}; +use futures::lock::Mutex; +use futures::stream::StreamExt; +use futures::task::{Context, SpawnExt}; +use futures_test::future::FutureTestExt; +use futures_test::task::{new_count_waker, panic_context}; +use std::sync::Arc; + #[test] fn mutex_acquire_uncontested() { - use futures::future::FutureExt; - use futures::lock::Mutex; - use futures_test::task::panic_context; - let mutex = Mutex::new(()); for _ in 0..10 { assert!(mutex.lock().poll_unpin(&mut panic_context()).is_ready()); @@ -12,11 +18,6 @@ fn mutex_acquire_uncontested() { #[test] fn mutex_wakes_waiters() { - use futures::future::FutureExt; - use futures::lock::Mutex; - use futures::task::Context; - use futures_test::task::{new_count_waker, panic_context}; - let mutex = Mutex::new(()); let (waker, counter) = new_count_waker(); let lock = mutex.lock().poll_unpin(&mut panic_context()); @@ -33,22 +34,11 @@ fn mutex_wakes_waiters() { assert!(waiter.poll_unpin(&mut panic_context()).is_ready()); } +#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038 #[test] fn mutex_contested() { - use futures::channel::mpsc; - use futures::executor::block_on; - use futures::future::ready; - use futures::lock::Mutex; - use futures::stream::StreamExt; - use futures::task::SpawnExt; - use futures_test::future::FutureTestExt; - use std::sync::Arc; - let (tx, mut rx) = mpsc::unbounded(); - let pool = futures::executor::ThreadPool::builder() - .pool_size(16) - .create() - .unwrap(); + let pool = ThreadPool::builder().pool_size(16).create().unwrap(); let tx = Arc::new(tx); let mutex = Arc::new(Mutex::new(0)); diff --git a/tests/macro_comma_support.rs b/tests/macro_comma_support.rs index ca13163..3b082d2 100644 --- a/tests/macro_comma_support.rs +++ b/tests/macro_comma_support.rs @@ -1,25 +1,23 @@ +use futures::{ + executor::block_on, + future::{self, FutureExt}, + join, ready, + task::Poll, + try_join, +}; + #[test] fn ready() { - use futures::{ - executor::block_on, - future, - task::Poll, - ready, - }; - block_on(future::poll_fn(|_| { ready!(Poll::Ready(()),); Poll::Ready(()) })) } +#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038 #[test] fn poll() { - use futures::{ - executor::block_on, - future::FutureExt, - poll, - }; + use futures::poll; block_on(async { let _ = poll!(async {}.boxed(),); @@ -28,11 +26,6 @@ fn poll() { #[test] fn join() { - use futures::{ - executor::block_on, - join - }; - block_on(async { let future1 = async { 1 }; let future2 = async { 2 }; @@ -42,12 +35,6 @@ fn join() { #[test] fn try_join() { - use futures::{ - executor::block_on, - future::FutureExt, - try_join, - }; - block_on(async { let future1 = async { 1 }.never_error(); let future2 = async { 2 }.never_error(); diff --git a/tests/oneshot.rs b/tests/oneshot.rs index 2494306..34b78a3 100644 --- a/tests/oneshot.rs +++ b/tests/oneshot.rs @@ -1,11 +1,11 @@ +use futures::channel::oneshot; +use futures::future::{FutureExt, TryFutureExt}; +use futures_test::future::FutureTestExt; +use std::sync::mpsc; +use std::thread; + #[test] fn oneshot_send1() { - use futures::channel::oneshot; - use futures::future::TryFutureExt; - use futures_test::future::FutureTestExt; - use std::sync::mpsc; - use std::thread; - let (tx1, rx1) = oneshot::channel::<i32>(); let (tx2, rx2) = mpsc::channel(); @@ -17,12 +17,6 @@ fn oneshot_send1() { #[test] fn oneshot_send2() { - use futures::channel::oneshot; - use futures::future::TryFutureExt; - use futures_test::future::FutureTestExt; - use std::sync::mpsc; - use std::thread; - let (tx1, rx1) = oneshot::channel::<i32>(); let (tx2, rx2) = mpsc::channel(); @@ -33,12 +27,6 @@ fn oneshot_send2() { #[test] fn oneshot_send3() { - use futures::channel::oneshot; - use futures::future::TryFutureExt; - use futures_test::future::FutureTestExt; - use std::sync::mpsc; - use std::thread; - let (tx1, rx1) = oneshot::channel::<i32>(); let (tx2, rx2) = mpsc::channel(); @@ -49,11 +37,6 @@ fn oneshot_send3() { #[test] fn oneshot_drop_tx1() { - use futures::channel::oneshot; - use futures::future::FutureExt; - use futures_test::future::FutureTestExt; - use std::sync::mpsc; - let (tx1, rx1) = oneshot::channel::<i32>(); let (tx2, rx2) = mpsc::channel(); @@ -65,12 +48,6 @@ fn oneshot_drop_tx1() { #[test] fn oneshot_drop_tx2() { - use futures::channel::oneshot; - use futures::future::FutureExt; - use futures_test::future::FutureTestExt; - use std::sync::mpsc; - use std::thread; - let (tx1, rx1) = oneshot::channel::<i32>(); let (tx2, rx2) = mpsc::channel(); @@ -83,9 +60,19 @@ fn oneshot_drop_tx2() { #[test] fn oneshot_drop_rx() { - use futures::channel::oneshot; - let (tx, rx) = oneshot::channel::<i32>(); drop(rx); assert_eq!(Err(2), tx.send(2)); } + +#[test] +fn oneshot_debug() { + let (tx, rx) = oneshot::channel::<i32>(); + assert_eq!(format!("{:?}", tx), "Sender { complete: false }"); + assert_eq!(format!("{:?}", rx), "Receiver { complete: false }"); + drop(rx); + assert_eq!(format!("{:?}", tx), "Sender { complete: true }"); + let (tx, rx) = oneshot::channel::<i32>(); + drop(tx); + assert_eq!(format!("{:?}", rx), "Receiver { complete: true }"); +} diff --git a/tests/ready_queue.rs b/tests/ready_queue.rs index 9aa3636..afba8f2 100644 --- a/tests/ready_queue.rs +++ b/tests/ready_queue.rs @@ -1,18 +1,15 @@ -mod assert_send_sync { - use futures::stream::FuturesUnordered; - - pub trait AssertSendSync: Send + Sync {} - impl AssertSendSync for FuturesUnordered<()> {} -} +use futures::channel::oneshot; +use futures::executor::{block_on, block_on_stream}; +use futures::future; +use futures::stream::{FuturesUnordered, StreamExt}; +use futures::task::Poll; +use futures_test::task::noop_context; +use std::panic::{self, AssertUnwindSafe}; +use std::sync::{Arc, Barrier}; +use std::thread; #[test] fn basic_usage() { - use futures::channel::oneshot; - use futures::executor::block_on; - use futures::future; - use futures::stream::{FuturesUnordered, StreamExt}; - use futures::task::Poll; - block_on(future::lazy(move |cx| { let mut queue = FuturesUnordered::new(); let (tx1, rx1) = oneshot::channel(); @@ -41,12 +38,6 @@ fn basic_usage() { #[test] fn resolving_errors() { - use futures::channel::oneshot; - use futures::executor::block_on; - use futures::future; - use futures::stream::{FuturesUnordered, StreamExt}; - use futures::task::Poll; - block_on(future::lazy(move |cx| { let mut queue = FuturesUnordered::new(); let (tx1, rx1) = oneshot::channel(); @@ -75,12 +66,6 @@ fn resolving_errors() { #[test] fn dropping_ready_queue() { - use futures::channel::oneshot; - use futures::executor::block_on; - use futures::future; - use futures::stream::FuturesUnordered; - use futures_test::task::noop_context; - block_on(future::lazy(move |_| { let queue = FuturesUnordered::new(); let (mut tx1, rx1) = oneshot::channel::<()>(); @@ -108,12 +93,9 @@ fn dropping_ready_queue() { #[test] fn stress() { - use futures::channel::oneshot; - use futures::executor::block_on_stream; - use futures::stream::FuturesUnordered; - use std::sync::{Arc, Barrier}; - use std::thread; - + #[cfg(miri)] + const ITER: usize = 30; + #[cfg(not(miri))] const ITER: usize = 300; for i in 0..ITER { @@ -157,12 +139,6 @@ fn stress() { #[test] fn panicking_future_dropped() { - use futures::executor::block_on; - use futures::future; - use futures::stream::{FuturesUnordered, StreamExt}; - use futures::task::Poll; - use std::panic::{self, AssertUnwindSafe}; - block_on(future::lazy(move |cx| { let mut queue = FuturesUnordered::new(); queue.push(future::poll_fn(|_| -> Poll<Result<i32, i32>> { panic!() })); diff --git a/tests/recurse.rs b/tests/recurse.rs index a151f1b..f06524f 100644 --- a/tests/recurse.rs +++ b/tests/recurse.rs @@ -1,10 +1,11 @@ +use futures::executor::block_on; +use futures::future::{self, BoxFuture, FutureExt}; +use std::sync::mpsc; +use std::thread; + +#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038 #[test] fn lots() { - use futures::executor::block_on; - use futures::future::{self, FutureExt, BoxFuture}; - use std::sync::mpsc; - use std::thread; - #[cfg(not(futures_sanitizer))] const N: i32 = 1_000; #[cfg(futures_sanitizer)] // If N is many, asan reports stack-overflow: https://gist.github.com/taiki-e/099446d21cbec69d4acbacf7a9646136 @@ -20,8 +21,6 @@ fn lots() { } let (tx, rx) = mpsc::channel(); - thread::spawn(|| { - block_on(do_it((N, 0)).map(move |x| tx.send(x).unwrap())) - }); + thread::spawn(|| block_on(do_it((N, 0)).map(move |x| tx.send(x).unwrap()))); assert_eq!((0..=N).sum::<i32>(), rx.recv().unwrap()); } diff --git a/tests/sink.rs b/tests/sink.rs index 597ed34..dc826bd 100644 --- a/tests/sink.rs +++ b/tests/sink.rs @@ -1,264 +1,221 @@ -mod sassert_next { - use futures::stream::{Stream, StreamExt}; - use futures::task::Poll; - use futures_test::task::panic_context; - use std::fmt; - - pub fn sassert_next<S>(s: &mut S, item: S::Item) - where - S: Stream + Unpin, - S::Item: Eq + fmt::Debug, - { - match s.poll_next_unpin(&mut panic_context()) { - Poll::Ready(None) => panic!("stream is at its end"), - Poll::Ready(Some(e)) => assert_eq!(e, item), - Poll::Pending => panic!("stream wasn't ready"), - } +use futures::channel::{mpsc, oneshot}; +use futures::executor::block_on; +use futures::future::{self, poll_fn, Future, FutureExt, TryFutureExt}; +use futures::never::Never; +use futures::ready; +use futures::sink::{self, Sink, SinkErrInto, SinkExt}; +use futures::stream::{self, Stream, StreamExt}; +use futures::task::{self, ArcWake, Context, Poll, Waker}; +use futures_test::task::panic_context; +use std::cell::{Cell, RefCell}; +use std::collections::VecDeque; +use std::fmt; +use std::mem; +use std::pin::Pin; +use std::rc::Rc; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; + +fn sassert_next<S>(s: &mut S, item: S::Item) +where + S: Stream + Unpin, + S::Item: Eq + fmt::Debug, +{ + match s.poll_next_unpin(&mut panic_context()) { + Poll::Ready(None) => panic!("stream is at its end"), + Poll::Ready(Some(e)) => assert_eq!(e, item), + Poll::Pending => panic!("stream wasn't ready"), } } -mod unwrap { - use futures::task::Poll; - use std::fmt; - - pub fn unwrap<T, E: fmt::Debug>(x: Poll<Result<T, E>>) -> T { - match x { - Poll::Ready(Ok(x)) => x, - Poll::Ready(Err(_)) => panic!("Poll::Ready(Err(_))"), - Poll::Pending => panic!("Poll::Pending"), - } +fn unwrap<T, E: fmt::Debug>(x: Poll<Result<T, E>>) -> T { + match x { + Poll::Ready(Ok(x)) => x, + Poll::Ready(Err(_)) => panic!("Poll::Ready(Err(_))"), + Poll::Pending => panic!("Poll::Pending"), } } -mod flag_cx { - use futures::task::{self, ArcWake, Context}; - use std::sync::Arc; - use std::sync::atomic::{AtomicBool, Ordering}; - - // An Unpark struct that records unpark events for inspection - pub struct Flag(AtomicBool); +// An Unpark struct that records unpark events for inspection +struct Flag(AtomicBool); - impl Flag { - pub fn new() -> Arc<Self> { - Arc::new(Self(AtomicBool::new(false))) - } - - pub fn take(&self) -> bool { - self.0.swap(false, Ordering::SeqCst) - } +impl Flag { + fn new() -> Arc<Self> { + Arc::new(Self(AtomicBool::new(false))) + } - pub fn set(&self, v: bool) { - self.0.store(v, Ordering::SeqCst) - } + fn take(&self) -> bool { + self.0.swap(false, Ordering::SeqCst) } - impl ArcWake for Flag { - fn wake_by_ref(arc_self: &Arc<Self>) { - arc_self.set(true) - } + fn set(&self, v: bool) { + self.0.store(v, Ordering::SeqCst) } +} - pub fn flag_cx<F, R>(f: F) -> R - where - F: FnOnce(Arc<Flag>, &mut Context<'_>) -> R, - { - let flag = Flag::new(); - let waker = task::waker_ref(&flag); - let cx = &mut Context::from_waker(&waker); - f(flag.clone(), cx) +impl ArcWake for Flag { + fn wake_by_ref(arc_self: &Arc<Self>) { + arc_self.set(true) } } -mod start_send_fut { - use futures::future::Future; - use futures::ready; - use futures::sink::Sink; - use futures::task::{Context, Poll}; - use std::pin::Pin; +fn flag_cx<F, R>(f: F) -> R +where + F: FnOnce(Arc<Flag>, &mut Context<'_>) -> R, +{ + let flag = Flag::new(); + let waker = task::waker_ref(&flag); + let cx = &mut Context::from_waker(&waker); + f(flag.clone(), cx) +} - // Sends a value on an i32 channel sink - pub struct StartSendFut<S: Sink<Item> + Unpin, Item: Unpin>(Option<S>, Option<Item>); +// Sends a value on an i32 channel sink +struct StartSendFut<S: Sink<Item> + Unpin, Item: Unpin>(Option<S>, Option<Item>); - impl<S: Sink<Item> + Unpin, Item: Unpin> StartSendFut<S, Item> { - pub fn new(sink: S, item: Item) -> Self { - Self(Some(sink), Some(item)) - } +impl<S: Sink<Item> + Unpin, Item: Unpin> StartSendFut<S, Item> { + fn new(sink: S, item: Item) -> Self { + Self(Some(sink), Some(item)) } +} - impl<S: Sink<Item> + Unpin, Item: Unpin> Future for StartSendFut<S, Item> { - type Output = Result<S, S::Error>; +impl<S: Sink<Item> + Unpin, Item: Unpin> Future for StartSendFut<S, Item> { + type Output = Result<S, S::Error>; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - let Self(inner, item) = self.get_mut(); - { - let mut inner = inner.as_mut().unwrap(); - ready!(Pin::new(&mut inner).poll_ready(cx))?; - Pin::new(&mut inner).start_send(item.take().unwrap())?; - } - Poll::Ready(Ok(inner.take().unwrap())) + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let Self(inner, item) = self.get_mut(); + { + let mut inner = inner.as_mut().unwrap(); + ready!(Pin::new(&mut inner).poll_ready(cx))?; + Pin::new(&mut inner).start_send(item.take().unwrap())?; } + Poll::Ready(Ok(inner.take().unwrap())) } } -mod manual_flush { - use futures::sink::Sink; - use futures::task::{Context, Poll, Waker}; - use std::mem; - use std::pin::Pin; - - // Immediately accepts all requests to start pushing, but completion is managed - // by manually flushing - pub struct ManualFlush<T: Unpin> { - data: Vec<T>, - waiting_tasks: Vec<Waker>, - } +// Immediately accepts all requests to start pushing, but completion is managed +// by manually flushing +struct ManualFlush<T: Unpin> { + data: Vec<T>, + waiting_tasks: Vec<Waker>, +} - impl<T: Unpin> Sink<Option<T>> for ManualFlush<T> { - type Error = (); +impl<T: Unpin> Sink<Option<T>> for ManualFlush<T> { + type Error = (); - fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - Poll::Ready(Ok(())) - } + fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + Poll::Ready(Ok(())) + } - fn start_send(mut self: Pin<&mut Self>, item: Option<T>) -> Result<(), Self::Error> { - if let Some(item) = item { - self.data.push(item); - } else { - self.force_flush(); - } - Ok(()) + fn start_send(mut self: Pin<&mut Self>, item: Option<T>) -> Result<(), Self::Error> { + if let Some(item) = item { + self.data.push(item); + } else { + self.force_flush(); } + Ok(()) + } - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - if self.data.is_empty() { - Poll::Ready(Ok(())) - } else { - self.waiting_tasks.push(cx.waker().clone()); - Poll::Pending - } + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + if self.data.is_empty() { + Poll::Ready(Ok(())) + } else { + self.waiting_tasks.push(cx.waker().clone()); + Poll::Pending } + } - fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - self.poll_flush(cx) - } + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + self.poll_flush(cx) } +} - impl<T: Unpin> ManualFlush<T> { - pub fn new() -> Self { - Self { - data: Vec::new(), - waiting_tasks: Vec::new(), - } - } +impl<T: Unpin> ManualFlush<T> { + fn new() -> Self { + Self { data: Vec::new(), waiting_tasks: Vec::new() } + } - pub fn force_flush(&mut self) -> Vec<T> { - for task in self.waiting_tasks.drain(..) { - task.wake() - } - mem::replace(&mut self.data, Vec::new()) + fn force_flush(&mut self) -> Vec<T> { + for task in self.waiting_tasks.drain(..) { + task.wake() } + mem::replace(&mut self.data, Vec::new()) } } -mod allowance { - use futures::sink::Sink; - use futures::task::{Context, Poll, Waker}; - use std::cell::{Cell, RefCell}; - use std::pin::Pin; - use std::rc::Rc; +struct ManualAllow<T: Unpin> { + data: Vec<T>, + allow: Rc<Allow>, +} - pub struct ManualAllow<T: Unpin> { - pub data: Vec<T>, - allow: Rc<Allow>, - } +struct Allow { + flag: Cell<bool>, + tasks: RefCell<Vec<Waker>>, +} - pub struct Allow { - flag: Cell<bool>, - tasks: RefCell<Vec<Waker>>, +impl Allow { + fn new() -> Self { + Self { flag: Cell::new(false), tasks: RefCell::new(Vec::new()) } } - impl Allow { - pub fn new() -> Self { - Self { - flag: Cell::new(false), - tasks: RefCell::new(Vec::new()), - } - } - - pub fn check(&self, cx: &mut Context<'_>) -> bool { - if self.flag.get() { - true - } else { - self.tasks.borrow_mut().push(cx.waker().clone()); - false - } - } - - pub fn start(&self) { - self.flag.set(true); - let mut tasks = self.tasks.borrow_mut(); - for task in tasks.drain(..) { - task.wake(); - } + fn check(&self, cx: &mut Context<'_>) -> bool { + if self.flag.get() { + true + } else { + self.tasks.borrow_mut().push(cx.waker().clone()); + false } } - impl<T: Unpin> Sink<T> for ManualAllow<T> { - type Error = (); - - fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - if self.allow.check(cx) { - Poll::Ready(Ok(())) - } else { - Poll::Pending - } + fn start(&self) { + self.flag.set(true); + let mut tasks = self.tasks.borrow_mut(); + for task in tasks.drain(..) { + task.wake(); } + } +} - fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> { - self.data.push(item); - Ok(()) - } +impl<T: Unpin> Sink<T> for ManualAllow<T> { + type Error = (); - fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + if self.allow.check(cx) { Poll::Ready(Ok(())) + } else { + Poll::Pending } + } - fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - Poll::Ready(Ok(())) - } + fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> { + self.data.push(item); + Ok(()) } - pub fn manual_allow<T: Unpin>() -> (ManualAllow<T>, Rc<Allow>) { - let allow = Rc::new(Allow::new()); - let manual_allow = ManualAllow { - data: Vec::new(), - allow: allow.clone(), - }; - (manual_allow, allow) + fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + Poll::Ready(Ok(())) } + + fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + Poll::Ready(Ok(())) + } +} + +fn manual_allow<T: Unpin>() -> (ManualAllow<T>, Rc<Allow>) { + let allow = Rc::new(Allow::new()); + let manual_allow = ManualAllow { data: Vec::new(), allow: allow.clone() }; + (manual_allow, allow) } #[test] fn either_sink() { - use futures::sink::{Sink, SinkExt}; - use std::collections::VecDeque; - use std::pin::Pin; - - let mut s = if true { - Vec::<i32>::new().left_sink() - } else { - VecDeque::<i32>::new().right_sink() - }; + let mut s = + if true { Vec::<i32>::new().left_sink() } else { VecDeque::<i32>::new().right_sink() }; Pin::new(&mut s).start_send(0).unwrap(); } #[test] fn vec_sink() { - use futures::executor::block_on; - use futures::sink::{Sink, SinkExt}; - use std::pin::Pin; - let mut v = Vec::new(); Pin::new(&mut v).start_send(0).unwrap(); Pin::new(&mut v).start_send(1).unwrap(); @@ -269,10 +226,6 @@ fn vec_sink() { #[test] fn vecdeque_sink() { - use futures::sink::Sink; - use std::collections::VecDeque; - use std::pin::Pin; - let mut deque = VecDeque::new(); Pin::new(&mut deque).start_send(2).unwrap(); Pin::new(&mut deque).start_send(3).unwrap(); @@ -284,9 +237,6 @@ fn vecdeque_sink() { #[test] fn send() { - use futures::executor::block_on; - use futures::sink::SinkExt; - let mut v = Vec::new(); block_on(v.send(0)).unwrap(); @@ -301,10 +251,6 @@ fn send() { #[test] fn send_all() { - use futures::executor::block_on; - use futures::sink::SinkExt; - use futures::stream::{self, StreamExt}; - let mut v = Vec::new(); block_on(v.send_all(&mut stream::iter(vec![0, 1]).map(Ok))).unwrap(); @@ -321,15 +267,6 @@ fn send_all() { // channel is full #[test] fn mpsc_blocking_start_send() { - use futures::channel::mpsc; - use futures::executor::block_on; - use futures::future::{self, FutureExt}; - - use start_send_fut::StartSendFut; - use flag_cx::flag_cx; - use sassert_next::sassert_next; - use unwrap::unwrap; - let (mut tx, mut rx) = mpsc::channel::<i32>(0); block_on(future::lazy(|_| { @@ -351,19 +288,9 @@ fn mpsc_blocking_start_send() { // test `flush` by using `with` to make the first insertion into a sink block // until a oneshot is completed +#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038 #[test] fn with_flush() { - use futures::channel::oneshot; - use futures::executor::block_on; - use futures::future::{self, FutureExt, TryFutureExt}; - use futures::never::Never; - use futures::sink::{Sink, SinkExt}; - use std::mem; - use std::pin::Pin; - - use flag_cx::flag_cx; - use unwrap::unwrap; - let (tx, rx) = oneshot::channel(); let mut block = rx.boxed(); let mut sink = Vec::new().with(|elem| { @@ -390,11 +317,6 @@ fn with_flush() { // test simple use of with to change data #[test] fn with_as_map() { - use futures::executor::block_on; - use futures::future; - use futures::never::Never; - use futures::sink::SinkExt; - let mut sink = Vec::new().with(|item| future::ok::<i32, Never>(item * 2)); block_on(sink.send(0)).unwrap(); block_on(sink.send(1)).unwrap(); @@ -405,10 +327,6 @@ fn with_as_map() { // test simple use of with_flat_map #[test] fn with_flat_map() { - use futures::executor::block_on; - use futures::sink::SinkExt; - use futures::stream::{self, StreamExt}; - let mut sink = Vec::new().with_flat_map(|item| stream::iter(vec![item; item]).map(Ok)); block_on(sink.send(0)).unwrap(); block_on(sink.send(1)).unwrap(); @@ -421,16 +339,6 @@ fn with_flat_map() { // Regression test for the issue #1834. #[test] fn with_propagates_poll_ready() { - use futures::channel::mpsc; - use futures::executor::block_on; - use futures::future; - use futures::sink::{Sink, SinkExt}; - use futures::task::Poll; - use std::pin::Pin; - - use flag_cx::flag_cx; - use sassert_next::sassert_next; - let (tx, mut rx) = mpsc::channel::<i32>(0); let mut tx = tx.with(|item: i32| future::ok::<i32, mpsc::SendError>(item + 10)); @@ -457,14 +365,6 @@ fn with_propagates_poll_ready() { // but doesn't claim to be flushed until the underlying sink is #[test] fn with_flush_propagate() { - use futures::future::{self, FutureExt}; - use futures::sink::{Sink, SinkExt}; - use std::pin::Pin; - - use manual_flush::ManualFlush; - use flag_cx::flag_cx; - use unwrap::unwrap; - let mut sink = ManualFlush::new().with(future::ok::<Option<i32>, ()>); flag_cx(|flag, cx| { unwrap(Pin::new(&mut sink).poll_ready(cx)); @@ -486,21 +386,13 @@ fn with_flush_propagate() { // test that `Clone` is implemented on `with` sinks #[test] fn with_implements_clone() { - use futures::channel::mpsc; - use futures::executor::block_on; - use futures::future; - use futures::{SinkExt, StreamExt}; - let (mut tx, rx) = mpsc::channel(5); { - let mut is_positive = tx - .clone() - .with(|item| future::ok::<bool, mpsc::SendError>(item > 0)); + let mut is_positive = tx.clone().with(|item| future::ok::<bool, mpsc::SendError>(item > 0)); - let mut is_long = tx - .clone() - .with(|item: &str| future::ok::<bool, mpsc::SendError>(item.len() > 5)); + let mut is_long = + tx.clone().with(|item: &str| future::ok::<bool, mpsc::SendError>(item.len() > 5)); block_on(is_positive.clone().send(-1)).unwrap(); block_on(is_long.clone().send("123456")).unwrap(); @@ -512,18 +404,12 @@ fn with_implements_clone() { block_on(tx.close()).unwrap(); - assert_eq!( - block_on(rx.collect::<Vec<_>>()), - vec![false, true, false, true, false] - ); + assert_eq!(block_on(rx.collect::<Vec<_>>()), vec![false, true, false, true, false]); } // test that a buffer is a no-nop around a sink that always accepts sends #[test] fn buffer_noop() { - use futures::executor::block_on; - use futures::sink::SinkExt; - let mut sink = Vec::new().buffer(0); block_on(sink.send(0)).unwrap(); block_on(sink.send(1)).unwrap(); @@ -539,15 +425,6 @@ fn buffer_noop() { // and writing out when the underlying sink is ready #[test] fn buffer() { - use futures::executor::block_on; - use futures::future::FutureExt; - use futures::sink::SinkExt; - - use start_send_fut::StartSendFut; - use flag_cx::flag_cx; - use unwrap::unwrap; - use allowance::manual_allow; - let (sink, allow) = manual_allow::<i32>(); let sink = sink.buffer(2); @@ -567,10 +444,6 @@ fn buffer() { #[test] fn fanout_smoke() { - use futures::executor::block_on; - use futures::sink::SinkExt; - use futures::stream::{self, StreamExt}; - let sink1 = Vec::new(); let sink2 = Vec::new(); let mut sink = sink1.fanout(sink2); @@ -582,16 +455,6 @@ fn fanout_smoke() { #[test] fn fanout_backpressure() { - use futures::channel::mpsc; - use futures::executor::block_on; - use futures::future::FutureExt; - use futures::sink::SinkExt; - use futures::stream::StreamExt; - - use start_send_fut::StartSendFut; - use flag_cx::flag_cx; - use unwrap::unwrap; - let (left_send, mut left_recv) = mpsc::channel(0); let (right_send, mut right_recv) = mpsc::channel(0); let sink = left_send.fanout(right_send); @@ -624,12 +487,6 @@ fn fanout_backpressure() { #[test] fn sink_map_err() { - use futures::channel::mpsc; - use futures::sink::{Sink, SinkExt}; - use futures::task::Poll; - use futures_test::task::panic_context; - use std::pin::Pin; - { let cx = &mut panic_context(); let (tx, _rx) = mpsc::channel(1); @@ -639,20 +496,11 @@ fn sink_map_err() { } let tx = mpsc::channel(0).0; - assert_eq!( - Pin::new(&mut tx.sink_map_err(|_| ())).start_send(()), - Err(()) - ); + assert_eq!(Pin::new(&mut tx.sink_map_err(|_| ())).start_send(()), Err(())); } #[test] fn sink_unfold() { - use futures::channel::mpsc; - use futures::executor::block_on; - use futures::future::poll_fn; - use futures::sink::{self, Sink, SinkExt}; - use futures::task::Poll; - block_on(poll_fn(|cx| { let (tx, mut rx) = mpsc::channel(1); let unfold = sink::unfold((), |(), i: i32| { @@ -685,14 +533,8 @@ fn sink_unfold() { #[test] fn err_into() { - use futures::channel::mpsc; - use futures::sink::{Sink, SinkErrInto, SinkExt}; - use futures::task::Poll; - use futures_test::task::panic_context; - use std::pin::Pin; - #[derive(Copy, Clone, Debug, PartialEq, Eq)] - pub struct ErrIntoTest; + struct ErrIntoTest; impl From<mpsc::SendError> for ErrIntoTest { fn from(_: mpsc::SendError) -> Self { @@ -709,8 +551,5 @@ fn err_into() { } let tx = mpsc::channel(0).0; - assert_eq!( - Pin::new(&mut tx.sink_err_into()).start_send(()), - Err(ErrIntoTest) - ); + assert_eq!(Pin::new(&mut tx.sink_err_into()).start_send(()), Err(ErrIntoTest)); } diff --git a/tests/sink_fanout.rs b/tests/sink_fanout.rs index 7d1fa43..e57b2d8 100644 --- a/tests/sink_fanout.rs +++ b/tests/sink_fanout.rs @@ -1,11 +1,11 @@ +use futures::channel::mpsc; +use futures::executor::block_on; +use futures::future::join3; +use futures::sink::SinkExt; +use futures::stream::{self, StreamExt}; + #[test] fn it_works() { - use futures::channel::mpsc; - use futures::executor::block_on; - use futures::future::join3; - use futures::sink::SinkExt; - use futures::stream::{self, StreamExt}; - let (tx1, rx1) = mpsc::channel(1); let (tx2, rx2) = mpsc::channel(2); let tx = tx1.fanout(tx2).sink_map_err(|_| ()); diff --git a/tests/split.rs b/tests/split.rs deleted file mode 100644 index 86c2fc6..0000000 --- a/tests/split.rs +++ /dev/null @@ -1,75 +0,0 @@ -#[test] -fn test_split() { - use futures::executor::block_on; - use futures::sink::{Sink, SinkExt}; - use futures::stream::{self, Stream, StreamExt}; - use futures::task::{Context, Poll}; - use pin_project::pin_project; - use std::pin::Pin; - - #[pin_project] - struct Join<T, U> { - #[pin] - stream: T, - #[pin] - sink: U, - } - - impl<T: Stream, U> Stream for Join<T, U> { - type Item = T::Item; - - fn poll_next( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Option<T::Item>> { - self.project().stream.poll_next(cx) - } - } - - impl<T, U: Sink<Item>, Item> Sink<Item> for Join<T, U> { - type Error = U::Error; - - fn poll_ready( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Result<(), Self::Error>> { - self.project().sink.poll_ready(cx) - } - - fn start_send( - self: Pin<&mut Self>, - item: Item, - ) -> Result<(), Self::Error> { - self.project().sink.start_send(item) - } - - fn poll_flush( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Result<(), Self::Error>> { - self.project().sink.poll_flush(cx) - } - - fn poll_close( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Result<(), Self::Error>> { - self.project().sink.poll_close(cx) - } - } - - let mut dest: Vec<i32> = Vec::new(); - { - let join = Join { - stream: stream::iter(vec![10, 20, 30]), - sink: &mut dest - }; - - let (sink, stream) = join.split(); - let join = sink.reunite(stream).expect("test_split: reunite error"); - let (mut sink, stream) = join.split(); - let mut stream = stream.map(Ok); - block_on(sink.send_all(&mut stream)).unwrap(); - } - assert_eq!(dest, vec![10, 20, 30]); -} diff --git a/tests/stream.rs b/tests/stream.rs index 14b283d..71ec654 100644 --- a/tests/stream.rs +++ b/tests/stream.rs @@ -1,8 +1,18 @@ +use std::iter; +use std::sync::Arc; + +use futures::channel::mpsc; +use futures::executor::block_on; +use futures::future::{self, Future}; +use futures::lock::Mutex; +use futures::sink::SinkExt; +use futures::stream::{self, StreamExt}; +use futures::task::Poll; +use futures::{ready, FutureExt}; +use futures_test::task::noop_context; + #[test] fn select() { - use futures::executor::block_on; - use futures::stream::{self, StreamExt}; - fn select_and_compare(a: Vec<u32>, b: Vec<u32>, expected: Vec<u32>) { let a = stream::iter(a); let b = stream::iter(b); @@ -17,19 +27,12 @@ fn select() { #[test] fn flat_map() { - use futures::stream::{self, StreamExt}; - - futures::executor::block_on(async { - let st = stream::iter(vec![ - stream::iter(0..=4u8), - stream::iter(6..=10), - stream::iter(0..=2), - ]); - - let values: Vec<_> = st - .flat_map(|s| s.filter(|v| futures::future::ready(v % 2 == 0))) - .collect() - .await; + block_on(async { + let st = + stream::iter(vec![stream::iter(0..=4u8), stream::iter(6..=10), stream::iter(0..=2)]); + + let values: Vec<_> = + st.flat_map(|s| s.filter(|v| futures::future::ready(v % 2 == 0))).collect().await; assert_eq!(values, vec![0, 2, 4, 6, 8, 10, 0, 2]); }); @@ -37,28 +40,287 @@ fn flat_map() { #[test] fn scan() { - use futures::stream::{self, StreamExt}; - - futures::executor::block_on(async { - assert_eq!( - stream::iter(vec![1u8, 2, 3, 4, 6, 8, 2]) - .scan(1, |state, e| { - *state += 1; - futures::future::ready(if e < *state { Some(e) } else { None }) - }) + block_on(async { + let values = stream::iter(vec![1u8, 2, 3, 4, 6, 8, 2]) + .scan(1, |state, e| { + *state += 1; + futures::future::ready(if e < *state { Some(e) } else { None }) + }) + .collect::<Vec<_>>() + .await; + + assert_eq!(values, vec![1u8, 2, 3, 4]); + }); +} + +#[test] +fn flatten_unordered() { + use futures::executor::block_on; + use futures::stream::*; + use futures::task::*; + use std::convert::identity; + use std::pin::Pin; + use std::thread; + use std::time::Duration; + + struct DataStream { + data: Vec<u8>, + polled: bool, + wake_immediately: bool, + } + + impl Stream for DataStream { + type Item = u8; + + fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> { + if !self.polled { + if !self.wake_immediately { + let waker = ctx.waker().clone(); + let sleep_time = + Duration::from_millis(*self.data.first().unwrap_or(&0) as u64 / 10); + thread::spawn(move || { + thread::sleep(sleep_time); + waker.wake_by_ref(); + }); + } else { + ctx.waker().wake_by_ref(); + } + self.polled = true; + Poll::Pending + } else { + self.polled = false; + Poll::Ready(self.data.pop()) + } + } + } + + struct Interchanger { + polled: bool, + base: u8, + wake_immediately: bool, + } + + impl Stream for Interchanger { + type Item = DataStream; + + fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> { + if !self.polled { + self.polled = true; + if !self.wake_immediately { + let waker = ctx.waker().clone(); + let sleep_time = Duration::from_millis(self.base as u64); + thread::spawn(move || { + thread::sleep(sleep_time); + waker.wake_by_ref(); + }); + } else { + ctx.waker().wake_by_ref(); + } + Poll::Pending + } else { + let data: Vec<_> = (0..6).rev().map(|v| v + self.base * 6).collect(); + self.base += 1; + self.polled = false; + Poll::Ready(Some(DataStream { + polled: false, + data, + wake_immediately: self.wake_immediately && self.base % 2 == 0, + })) + } + } + } + + // basic behaviour + { + block_on(async { + let st = stream::iter(vec![ + stream::iter(0..=4u8), + stream::iter(6..=10), + stream::iter(10..=12), + ]); + + let fl_unordered = st.flatten_unordered(3).collect::<Vec<_>>().await; + + assert_eq!(fl_unordered, vec![0, 6, 10, 1, 7, 11, 2, 8, 12, 3, 9, 4, 10]); + }); + + block_on(async { + let st = stream::iter(vec![ + stream::iter(0..=4u8), + stream::iter(6..=10), + stream::iter(0..=2), + ]); + + let mut fm_unordered = st + .flat_map_unordered(1, |s| s.filter(|v| futures::future::ready(v % 2 == 0))) + .collect::<Vec<_>>() + .await; + + fm_unordered.sort_unstable(); + + assert_eq!(fm_unordered, vec![0, 0, 2, 2, 4, 6, 8, 10]); + }); + } + + // wake up immediately + { + block_on(async { + let mut fl_unordered = Interchanger { polled: false, base: 0, wake_immediately: true } + .take(10) + .map(|s| s.map(identity)) + .flatten_unordered(10) + .collect::<Vec<_>>() + .await; + + fl_unordered.sort_unstable(); + + assert_eq!(fl_unordered, (0..60).collect::<Vec<u8>>()); + }); + + block_on(async { + let mut fm_unordered = Interchanger { polled: false, base: 0, wake_immediately: true } + .take(10) + .flat_map_unordered(10, |s| s.map(identity)) + .collect::<Vec<_>>() + .await; + + fm_unordered.sort_unstable(); + + assert_eq!(fm_unordered, (0..60).collect::<Vec<u8>>()); + }); + } + + // wake up after delay + { + block_on(async { + let mut fl_unordered = Interchanger { polled: false, base: 0, wake_immediately: false } + .take(10) + .map(|s| s.map(identity)) + .flatten_unordered(10) + .collect::<Vec<_>>() + .await; + + fl_unordered.sort_unstable(); + + assert_eq!(fl_unordered, (0..60).collect::<Vec<u8>>()); + }); + + block_on(async { + let mut fm_unordered = Interchanger { polled: false, base: 0, wake_immediately: false } + .take(10) + .flat_map_unordered(10, |s| s.map(identity)) .collect::<Vec<_>>() - .await, - vec![1u8, 2, 3, 4] + .await; + + fm_unordered.sort_unstable(); + + assert_eq!(fm_unordered, (0..60).collect::<Vec<u8>>()); + }); + + block_on(async { + let (mut fm_unordered, mut fl_unordered) = futures_util::join!( + Interchanger { polled: false, base: 0, wake_immediately: false } + .take(10) + .flat_map_unordered(10, |s| s.map(identity)) + .collect::<Vec<_>>(), + Interchanger { polled: false, base: 0, wake_immediately: false } + .take(10) + .map(|s| s.map(identity)) + .flatten_unordered(10) + .collect::<Vec<_>>() + ); + + fm_unordered.sort_unstable(); + fl_unordered.sort_unstable(); + + assert_eq!(fm_unordered, fl_unordered); + assert_eq!(fm_unordered, (0..60).collect::<Vec<u8>>()); + }); + } + + // waker panics + { + let stream = Arc::new(Mutex::new( + Interchanger { polled: false, base: 0, wake_immediately: true } + .take(10) + .flat_map_unordered(10, |s| s.map(identity)), + )); + + struct PanicWaker; + + impl ArcWake for PanicWaker { + fn wake_by_ref(_arc_self: &Arc<Self>) { + panic!("WAKE UP"); + } + } + + std::thread::spawn({ + let stream = stream.clone(); + move || { + let mut st = poll_fn(|cx| { + let mut lock = ready!(stream.lock().poll_unpin(cx)); + + let panic_waker = waker(Arc::new(PanicWaker)); + let mut panic_cx = Context::from_waker(&panic_waker); + let _ = ready!(lock.poll_next_unpin(&mut panic_cx)); + + Poll::Ready(Some(())) + }); + + block_on(st.next()) + } + }) + .join() + .unwrap_err(); + + block_on(async move { + let mut values: Vec<_> = stream.lock().await.by_ref().collect().await; + values.sort_unstable(); + + assert_eq!(values, (0..60).collect::<Vec<u8>>()); + }); + } + + // stream panics + { + let st = stream::iter(iter::once( + once(Box::pin(async { panic!("Polled") })).left_stream::<DataStream>(), + )) + .chain( + Interchanger { polled: false, base: 0, wake_immediately: true } + .map(|stream| stream.right_stream()) + .take(10), ); - }); + + let stream = Arc::new(Mutex::new(st.flatten_unordered(10))); + + std::thread::spawn({ + let stream = stream.clone(); + move || { + let mut st = poll_fn(|cx| { + let mut lock = ready!(stream.lock().poll_unpin(cx)); + let data = ready!(lock.poll_next_unpin(cx)); + + Poll::Ready(data) + }); + + block_on(st.next()) + } + }) + .join() + .unwrap_err(); + + block_on(async move { + let mut values: Vec<_> = stream.lock().await.by_ref().collect().await; + values.sort_unstable(); + + assert_eq!(values, (0..60).collect::<Vec<u8>>()); + }); + } } #[test] fn take_until() { - use futures::future::{self, Future}; - use futures::stream::{self, StreamExt}; - use futures::task::Poll; - fn make_stop_fut(stop_on: u32) -> impl Future<Output = ()> { let mut i = 0; future::poll_fn(move |_cx| { @@ -71,7 +333,7 @@ fn take_until() { }) } - futures::executor::block_on(async { + block_on(async { // Verify stopping works: let stream = stream::iter(1u32..=10); let stop_fut = make_stop_fut(5); @@ -123,10 +385,15 @@ fn take_until() { #[test] #[should_panic] -fn ready_chunks_panic_on_cap_zero() { - use futures::channel::mpsc; - use futures::stream::StreamExt; +fn chunks_panic_on_cap_zero() { + let (_, rx1) = mpsc::channel::<()>(1); + let _ = rx1.chunks(0); +} + +#[test] +#[should_panic] +fn ready_chunks_panic_on_cap_zero() { let (_, rx1) = mpsc::channel::<()>(1); let _ = rx1.ready_chunks(0); @@ -134,12 +401,6 @@ fn ready_chunks_panic_on_cap_zero() { #[test] fn ready_chunks() { - use futures::channel::mpsc; - use futures::stream::StreamExt; - use futures::sink::SinkExt; - use futures::FutureExt; - use futures_test::task::noop_context; - let (mut tx, rx1) = mpsc::channel::<i32>(16); let mut s = rx1.ready_chunks(2); @@ -147,14 +408,14 @@ fn ready_chunks() { let mut cx = noop_context(); assert!(s.next().poll_unpin(&mut cx).is_pending()); - futures::executor::block_on(async { + block_on(async { tx.send(1).await.unwrap(); assert_eq!(s.next().await.unwrap(), vec![1]); tx.send(2).await.unwrap(); tx.send(3).await.unwrap(); tx.send(4).await.unwrap(); - assert_eq!(s.next().await.unwrap(), vec![2,3]); + assert_eq!(s.next().await.unwrap(), vec![2, 3]); assert_eq!(s.next().await.unwrap(), vec![4]); }); } diff --git a/tests/stream_abortable.rs b/tests/stream_abortable.rs new file mode 100644 index 0000000..2339dd0 --- /dev/null +++ b/tests/stream_abortable.rs @@ -0,0 +1,46 @@ +use futures::channel::mpsc; +use futures::executor::block_on; +use futures::stream::{abortable, Stream, StreamExt}; +use futures::task::{Context, Poll}; +use futures::SinkExt; +use futures_test::task::new_count_waker; +use std::pin::Pin; + +#[test] +fn abortable_works() { + let (_tx, a_rx) = mpsc::channel::<()>(1); + let (mut abortable_rx, abort_handle) = abortable(a_rx); + + abort_handle.abort(); + assert!(abortable_rx.is_aborted()); + assert_eq!(None, block_on(abortable_rx.next())); +} + +#[test] +fn abortable_awakens() { + let (_tx, a_rx) = mpsc::channel::<()>(1); + let (mut abortable_rx, abort_handle) = abortable(a_rx); + + let (waker, counter) = new_count_waker(); + let mut cx = Context::from_waker(&waker); + + assert_eq!(counter, 0); + assert_eq!(Poll::Pending, Pin::new(&mut abortable_rx).poll_next(&mut cx)); + assert_eq!(counter, 0); + + abort_handle.abort(); + assert_eq!(counter, 1); + assert!(abortable_rx.is_aborted()); + assert_eq!(Poll::Ready(None), Pin::new(&mut abortable_rx).poll_next(&mut cx)); +} + +#[test] +fn abortable_resolves() { + let (mut tx, a_rx) = mpsc::channel::<()>(1); + let (mut abortable_rx, _abort_handle) = abortable(a_rx); + + block_on(tx.send(())).unwrap(); + + assert!(!abortable_rx.is_aborted()); + assert_eq!(Some(()), block_on(abortable_rx.next())); +} diff --git a/tests/buffer_unordered.rs b/tests/stream_buffer_unordered.rs index 5c8b8bf..9a2ee17 100644 --- a/tests/buffer_unordered.rs +++ b/tests/stream_buffer_unordered.rs @@ -1,13 +1,13 @@ +use futures::channel::{mpsc, oneshot}; +use futures::executor::{block_on, block_on_stream}; +use futures::sink::SinkExt; +use futures::stream::StreamExt; +use std::sync::mpsc as std_mpsc; +use std::thread; + #[test] #[ignore] // FIXME: https://github.com/rust-lang/futures-rs/issues/1790 fn works() { - use futures::channel::{oneshot, mpsc}; - use futures::executor::{block_on, block_on_stream}; - use futures::sink::SinkExt; - use futures::stream::StreamExt; - use std::sync::mpsc as std_mpsc; - use std::thread; - const N: usize = 4; let (mut tx, rx) = mpsc::channel(1); diff --git a/tests/stream_catch_unwind.rs b/tests/stream_catch_unwind.rs index 272558c..8b23a0a 100644 --- a/tests/stream_catch_unwind.rs +++ b/tests/stream_catch_unwind.rs @@ -1,8 +1,8 @@ +use futures::executor::block_on_stream; +use futures::stream::{self, StreamExt}; + #[test] fn panic_in_the_middle_of_the_stream() { - use futures::executor::block_on_stream; - use futures::stream::{self, StreamExt}; - let stream = stream::iter(vec![Some(10), None, Some(11)]); // panic on second element @@ -16,9 +16,6 @@ fn panic_in_the_middle_of_the_stream() { #[test] fn no_panic() { - use futures::executor::block_on_stream; - use futures::stream::{self, StreamExt}; - let stream = stream::iter(vec![10, 11, 12]); let mut iter = block_on_stream(stream.catch_unwind()); diff --git a/tests/futures_ordered.rs b/tests/stream_futures_ordered.rs index 7f21c82..84e0bcc 100644 --- a/tests/futures_ordered.rs +++ b/tests/stream_futures_ordered.rs @@ -1,10 +1,12 @@ +use futures::channel::oneshot; +use futures::executor::{block_on, block_on_stream}; +use futures::future::{self, join, Future, FutureExt, TryFutureExt}; +use futures::stream::{FuturesOrdered, StreamExt}; +use futures_test::task::noop_context; +use std::any::Any; + #[test] fn works_1() { - use futures::channel::oneshot; - use futures::executor::block_on_stream; - use futures::stream::{StreamExt, FuturesOrdered}; - use futures_test::task::noop_context; - let (a_tx, a_rx) = oneshot::channel::<i32>(); let (b_tx, b_rx) = oneshot::channel::<i32>(); let (c_tx, c_rx) = oneshot::channel::<i32>(); @@ -24,21 +26,16 @@ fn works_1() { assert_eq!(None, iter.next()); } +#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038 #[test] fn works_2() { - use futures::channel::oneshot; - use futures::future::{join, FutureExt}; - use futures::stream::{StreamExt, FuturesOrdered}; - use futures_test::task::noop_context; - let (a_tx, a_rx) = oneshot::channel::<i32>(); let (b_tx, b_rx) = oneshot::channel::<i32>(); let (c_tx, c_rx) = oneshot::channel::<i32>(); - let mut stream = vec![ - a_rx.boxed(), - join(b_rx, c_rx).map(|(a, b)| Ok(a? + b?)).boxed(), - ].into_iter().collect::<FuturesOrdered<_>>(); + let mut stream = vec![a_rx.boxed(), join(b_rx, c_rx).map(|(a, b)| Ok(a? + b?)).boxed()] + .into_iter() + .collect::<FuturesOrdered<_>>(); let mut cx = noop_context(); a_tx.send(33).unwrap(); @@ -51,37 +48,30 @@ fn works_2() { #[test] fn from_iterator() { - use futures::executor::block_on; - use futures::future; - use futures::stream::{StreamExt, FuturesOrdered}; - - let stream = vec![ - future::ready::<i32>(1), - future::ready::<i32>(2), - future::ready::<i32>(3) - ].into_iter().collect::<FuturesOrdered<_>>(); + let stream = vec![future::ready::<i32>(1), future::ready::<i32>(2), future::ready::<i32>(3)] + .into_iter() + .collect::<FuturesOrdered<_>>(); assert_eq!(stream.len(), 3); - assert_eq!(block_on(stream.collect::<Vec<_>>()), vec![1,2,3]); + assert_eq!(block_on(stream.collect::<Vec<_>>()), vec![1, 2, 3]); } +#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038 #[test] fn queue_never_unblocked() { - use futures::channel::oneshot; - use futures::future::{self, Future, TryFutureExt}; - use futures::stream::{StreamExt, FuturesOrdered}; - use futures_test::task::noop_context; - use std::any::Any; - let (_a_tx, a_rx) = oneshot::channel::<Box<dyn Any + Send>>(); let (b_tx, b_rx) = oneshot::channel::<Box<dyn Any + Send>>(); let (c_tx, c_rx) = oneshot::channel::<Box<dyn Any + Send>>(); let mut stream = vec![ Box::new(a_rx) as Box<dyn Future<Output = _> + Unpin>, - Box::new(future::try_select(b_rx, c_rx) - .map_err(|e| e.factor_first().0) - .and_then(|e| future::ok(Box::new(e) as Box<dyn Any + Send>))) as _, - ].into_iter().collect::<FuturesOrdered<_>>(); + Box::new( + future::try_select(b_rx, c_rx) + .map_err(|e| e.factor_first().0) + .and_then(|e| future::ok(Box::new(e) as Box<dyn Any + Send>)), + ) as _, + ] + .into_iter() + .collect::<FuturesOrdered<_>>(); let cx = &mut noop_context(); for _ in 0..10 { diff --git a/tests/futures_unordered.rs b/tests/stream_futures_unordered.rs index ac0817e..f62f733 100644 --- a/tests/futures_unordered.rs +++ b/tests/stream_futures_unordered.rs @@ -1,17 +1,17 @@ -use futures::future::Future; -use futures::stream::{FuturesUnordered, StreamExt}; +use futures::channel::oneshot; +use futures::executor::{block_on, block_on_stream}; +use futures::future::{self, join, Future, FutureExt}; +use futures::stream::{FusedStream, FuturesUnordered, StreamExt}; use futures::task::{Context, Poll}; +use futures_test::future::FutureTestExt; use futures_test::task::noop_context; +use futures_test::{assert_stream_done, assert_stream_next, assert_stream_pending}; use std::iter::FromIterator; use std::pin::Pin; +use std::sync::atomic::{AtomicBool, Ordering}; #[test] fn is_terminated() { - use futures::future; - use futures::stream::{FusedStream, FuturesUnordered, StreamExt}; - use futures::task::Poll; - use futures_test::task::noop_context; - let mut cx = noop_context(); let mut tasks = FuturesUnordered::new(); @@ -39,19 +39,12 @@ fn is_terminated() { #[test] fn works_1() { - use futures::channel::oneshot; - use futures::executor::block_on_stream; - use futures::stream::FuturesUnordered; - let (a_tx, a_rx) = oneshot::channel::<i32>(); let (b_tx, b_rx) = oneshot::channel::<i32>(); let (c_tx, c_rx) = oneshot::channel::<i32>(); - let mut iter = block_on_stream( - vec![a_rx, b_rx, c_rx] - .into_iter() - .collect::<FuturesUnordered<_>>(), - ); + let mut iter = + block_on_stream(vec![a_rx, b_rx, c_rx].into_iter().collect::<FuturesUnordered<_>>()); b_tx.send(99).unwrap(); assert_eq!(Some(Ok(99)), iter.next()); @@ -63,24 +56,16 @@ fn works_1() { assert_eq!(None, iter.next()); } +#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038 #[test] fn works_2() { - use futures::channel::oneshot; - use futures::future::{join, FutureExt}; - use futures::stream::{FuturesUnordered, StreamExt}; - use futures::task::Poll; - use futures_test::task::noop_context; - let (a_tx, a_rx) = oneshot::channel::<i32>(); let (b_tx, b_rx) = oneshot::channel::<i32>(); let (c_tx, c_rx) = oneshot::channel::<i32>(); - let mut stream = vec![ - a_rx.boxed(), - join(b_rx, c_rx).map(|(a, b)| Ok(a? + b?)).boxed(), - ] - .into_iter() - .collect::<FuturesUnordered<_>>(); + let mut stream = vec![a_rx.boxed(), join(b_rx, c_rx).map(|(a, b)| Ok(a? + b?)).boxed()] + .into_iter() + .collect::<FuturesUnordered<_>>(); a_tx.send(9).unwrap(); b_tx.send(10).unwrap(); @@ -94,29 +79,16 @@ fn works_2() { #[test] fn from_iterator() { - use futures::executor::block_on; - use futures::future; - use futures::stream::{FuturesUnordered, StreamExt}; - - let stream = vec![ - future::ready::<i32>(1), - future::ready::<i32>(2), - future::ready::<i32>(3), - ] - .into_iter() - .collect::<FuturesUnordered<_>>(); + let stream = vec![future::ready::<i32>(1), future::ready::<i32>(2), future::ready::<i32>(3)] + .into_iter() + .collect::<FuturesUnordered<_>>(); assert_eq!(stream.len(), 3); assert_eq!(block_on(stream.collect::<Vec<_>>()), vec![1, 2, 3]); } +#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038 #[test] fn finished_future() { - use std::marker::Unpin; - use futures::channel::oneshot; - use futures::future::{self, Future, FutureExt}; - use futures::stream::{FuturesUnordered, StreamExt}; - use futures_test::task::noop_context; - let (_a_tx, a_rx) = oneshot::channel::<i32>(); let (b_tx, b_rx) = oneshot::channel::<i32>(); let (c_tx, c_rx) = oneshot::channel::<i32>(); @@ -142,17 +114,11 @@ fn finished_future() { #[test] fn iter_mut_cancel() { - use futures::channel::oneshot; - use futures::executor::block_on_stream; - use futures::stream::FuturesUnordered; - let (a_tx, a_rx) = oneshot::channel::<i32>(); let (b_tx, b_rx) = oneshot::channel::<i32>(); let (c_tx, c_rx) = oneshot::channel::<i32>(); - let mut stream = vec![a_rx, b_rx, c_rx] - .into_iter() - .collect::<FuturesUnordered<_>>(); + let mut stream = vec![a_rx, b_rx, c_rx].into_iter().collect::<FuturesUnordered<_>>(); for rx in stream.iter_mut() { rx.close(); @@ -172,16 +138,10 @@ fn iter_mut_cancel() { #[test] fn iter_mut_len() { - use futures::future; - use futures::stream::FuturesUnordered; - - let mut stream = vec![ - future::pending::<()>(), - future::pending::<()>(), - future::pending::<()>(), - ] - .into_iter() - .collect::<FuturesUnordered<_>>(); + let mut stream = + vec![future::pending::<()>(), future::pending::<()>(), future::pending::<()>()] + .into_iter() + .collect::<FuturesUnordered<_>>(); let mut iter_mut = stream.iter_mut(); assert_eq!(iter_mut.len(), 3); @@ -196,15 +156,6 @@ fn iter_mut_len() { #[test] fn iter_cancel() { - use std::marker::Unpin; - use std::pin::Pin; - use std::sync::atomic::{AtomicBool, Ordering}; - - use futures::executor::block_on_stream; - use futures::future::{self, Future, FutureExt}; - use futures::stream::FuturesUnordered; - use futures::task::{Context, Poll}; - struct AtomicCancel<F> { future: F, cancel: AtomicBool, @@ -250,16 +201,9 @@ fn iter_cancel() { #[test] fn iter_len() { - use futures::future; - use futures::stream::FuturesUnordered; - - let stream = vec![ - future::pending::<()>(), - future::pending::<()>(), - future::pending::<()>(), - ] - .into_iter() - .collect::<FuturesUnordered<_>>(); + let stream = vec![future::pending::<()>(), future::pending::<()>(), future::pending::<()>()] + .into_iter() + .collect::<FuturesUnordered<_>>(); let mut iter = stream.iter(); assert_eq!(iter.len(), 3); @@ -273,12 +217,52 @@ fn iter_len() { } #[test] -fn futures_not_moved_after_poll() { - use futures::future; - use futures::stream::FuturesUnordered; - use futures_test::future::FutureTestExt; - use futures_test::{assert_stream_done, assert_stream_next, assert_stream_pending}; +fn into_iter_cancel() { + let (a_tx, a_rx) = oneshot::channel::<i32>(); + let (b_tx, b_rx) = oneshot::channel::<i32>(); + let (c_tx, c_rx) = oneshot::channel::<i32>(); + + let stream = vec![a_rx, b_rx, c_rx].into_iter().collect::<FuturesUnordered<_>>(); + + let stream = stream + .into_iter() + .map(|mut rx| { + rx.close(); + rx + }) + .collect::<FuturesUnordered<_>>(); + + let mut iter = block_on_stream(stream); + + assert!(a_tx.is_canceled()); + assert!(b_tx.is_canceled()); + assert!(c_tx.is_canceled()); + + assert_eq!(iter.next(), Some(Err(futures::channel::oneshot::Canceled))); + assert_eq!(iter.next(), Some(Err(futures::channel::oneshot::Canceled))); + assert_eq!(iter.next(), Some(Err(futures::channel::oneshot::Canceled))); + assert_eq!(iter.next(), None); +} + +#[test] +fn into_iter_len() { + let stream = vec![future::pending::<()>(), future::pending::<()>(), future::pending::<()>()] + .into_iter() + .collect::<FuturesUnordered<_>>(); + let mut into_iter = stream.into_iter(); + assert_eq!(into_iter.len(), 3); + assert!(into_iter.next().is_some()); + assert_eq!(into_iter.len(), 2); + assert!(into_iter.next().is_some()); + assert_eq!(into_iter.len(), 1); + assert!(into_iter.next().is_some()); + assert_eq!(into_iter.len(), 0); + assert!(into_iter.next().is_none()); +} + +#[test] +fn futures_not_moved_after_poll() { // Future that will be ready after being polled twice, // asserting that it does not move. let fut = future::ready(()).pending_once().assert_unmoved(); @@ -292,11 +276,6 @@ fn futures_not_moved_after_poll() { #[test] fn len_valid_during_out_of_order_completion() { - use futures::channel::oneshot; - use futures::stream::{FuturesUnordered, StreamExt}; - use futures::task::Poll; - use futures_test::task::noop_context; - // Complete futures out-of-order and add new futures afterwards to ensure // length values remain correct. let (a_tx, a_rx) = oneshot::channel::<i32>(); @@ -368,3 +347,25 @@ fn polled_only_once_at_most_per_iteration() { let mut tasks = FuturesUnordered::<F>::new(); assert_eq!(Poll::Ready(None), tasks.poll_next_unpin(cx)); } + +#[test] +fn clear() { + let mut tasks = FuturesUnordered::from_iter(vec![future::ready(1), future::ready(2)]); + + assert_eq!(block_on(tasks.next()), Some(1)); + assert!(!tasks.is_empty()); + + tasks.clear(); + assert!(tasks.is_empty()); + + tasks.push(future::ready(3)); + assert!(!tasks.is_empty()); + + tasks.clear(); + assert!(tasks.is_empty()); + + assert_eq!(block_on(tasks.next()), None); + assert!(tasks.is_terminated()); + tasks.clear(); + assert!(!tasks.is_terminated()); +} diff --git a/tests/stream_into_async_read.rs b/tests/stream_into_async_read.rs index 222c985..60188d3 100644 --- a/tests/stream_into_async_read.rs +++ b/tests/stream_into_async_read.rs @@ -1,31 +1,51 @@ -#[test] -fn test_into_async_read() { - use core::pin::Pin; - use futures::io::AsyncRead; - use futures::stream::{self, TryStreamExt}; - use futures::task::Poll; - use futures_test::{task::noop_context, stream::StreamTestExt}; - - macro_rules! assert_read { - ($reader:expr, $buf:expr, $item:expr) => { - let mut cx = noop_context(); - loop { - match Pin::new(&mut $reader).poll_read(&mut cx, $buf) { - Poll::Ready(Ok(x)) => { - assert_eq!(x, $item); - break; - } - Poll::Ready(Err(err)) => { - panic!("assertion failed: expected value but got {}", err); - } - Poll::Pending => { - continue; - } +use core::pin::Pin; +use futures::io::{AsyncBufRead, AsyncRead}; +use futures::stream::{self, TryStreamExt}; +use futures::task::Poll; +use futures_test::{stream::StreamTestExt, task::noop_context}; + +macro_rules! assert_read { + ($reader:expr, $buf:expr, $item:expr) => { + let mut cx = noop_context(); + loop { + match Pin::new(&mut $reader).poll_read(&mut cx, $buf) { + Poll::Ready(Ok(x)) => { + assert_eq!(x, $item); + break; + } + Poll::Ready(Err(err)) => { + panic!("assertion failed: expected value but got {}", err); + } + Poll::Pending => { + continue; + } + } + } + }; +} + +macro_rules! assert_fill_buf { + ($reader:expr, $buf:expr) => { + let mut cx = noop_context(); + loop { + match Pin::new(&mut $reader).poll_fill_buf(&mut cx) { + Poll::Ready(Ok(x)) => { + assert_eq!(x, $buf); + break; + } + Poll::Ready(Err(err)) => { + panic!("assertion failed: expected value but got {}", err); + } + Poll::Pending => { + continue; } } - }; - } + } + }; +} +#[test] +fn test_into_async_read() { let stream = stream::iter((1..=3).flat_map(|_| vec![Ok(vec![]), Ok(vec![1, 2, 3, 4, 5])])); let mut reader = stream.interleave_pending().into_async_read(); let mut buf = vec![0; 3]; @@ -53,32 +73,6 @@ fn test_into_async_read() { #[test] fn test_into_async_bufread() { - use core::pin::Pin; - use futures::io::AsyncBufRead; - use futures::stream::{self, TryStreamExt}; - use futures::task::Poll; - use futures_test::{task::noop_context, stream::StreamTestExt}; - - macro_rules! assert_fill_buf { - ($reader:expr, $buf:expr) => { - let mut cx = noop_context(); - loop { - match Pin::new(&mut $reader).poll_fill_buf(&mut cx) { - Poll::Ready(Ok(x)) => { - assert_eq!(x, $buf); - break; - } - Poll::Ready(Err(err)) => { - panic!("assertion failed: expected value but got {}", err); - } - Poll::Pending => { - continue; - } - } - } - }; - } - let stream = stream::iter((1..=2).flat_map(|_| vec![Ok(vec![]), Ok(vec![1, 2, 3, 4, 5])])); let mut reader = stream.interleave_pending().into_async_read(); diff --git a/tests/stream_peekable.rs b/tests/stream_peekable.rs index 66a7385..153fcc2 100644 --- a/tests/stream_peekable.rs +++ b/tests/stream_peekable.rs @@ -1,13 +1,58 @@ +use futures::executor::block_on; +use futures::pin_mut; +use futures::stream::{self, Peekable, StreamExt}; + #[test] fn peekable() { - use futures::executor::block_on; - use futures::pin_mut; - use futures::stream::{self, Peekable, StreamExt}; - block_on(async { let peekable: Peekable<_> = stream::iter(vec![1u8, 2, 3]).peekable(); pin_mut!(peekable); assert_eq!(peekable.as_mut().peek().await, Some(&1u8)); assert_eq!(peekable.collect::<Vec<u8>>().await, vec![1, 2, 3]); + + let s = stream::once(async { 1 }).peekable(); + pin_mut!(s); + assert_eq!(s.as_mut().peek().await, Some(&1u8)); + assert_eq!(s.collect::<Vec<u8>>().await, vec![1]); + }); +} + +#[test] +fn peekable_mut() { + block_on(async { + let s = stream::iter(vec![1u8, 2, 3]).peekable(); + pin_mut!(s); + if let Some(p) = s.as_mut().peek_mut().await { + if *p == 1 { + *p = 5; + } + } + assert_eq!(s.collect::<Vec<_>>().await, vec![5, 2, 3]); + }); +} + +#[test] +fn peekable_next_if_eq() { + block_on(async { + // first, try on references + let s = stream::iter(vec!["Heart", "of", "Gold"]).peekable(); + pin_mut!(s); + // try before `peek()` + assert_eq!(s.as_mut().next_if_eq(&"trillian").await, None); + assert_eq!(s.as_mut().next_if_eq(&"Heart").await, Some("Heart")); + // try after peek() + assert_eq!(s.as_mut().peek().await, Some(&"of")); + assert_eq!(s.as_mut().next_if_eq(&"of").await, Some("of")); + assert_eq!(s.as_mut().next_if_eq(&"zaphod").await, None); + // make sure `next()` still behaves + assert_eq!(s.next().await, Some("Gold")); + + // make sure comparison works for owned values + let s = stream::iter(vec![String::from("Ludicrous"), "speed".into()]).peekable(); + pin_mut!(s); + // make sure basic functionality works + assert_eq!(s.as_mut().next_if_eq("Ludicrous").await, Some("Ludicrous".into())); + assert_eq!(s.as_mut().next_if_eq("speed").await, Some("speed".into())); + assert_eq!(s.as_mut().next_if_eq("").await, None); }); } diff --git a/tests/stream_select_all.rs b/tests/stream_select_all.rs index 6178412..4ae0735 100644 --- a/tests/stream_select_all.rs +++ b/tests/stream_select_all.rs @@ -1,10 +1,12 @@ +use futures::channel::mpsc; +use futures::executor::{block_on, block_on_stream}; +use futures::future::{self, FutureExt}; +use futures::stream::{self, select_all, FusedStream, SelectAll, StreamExt}; +use futures::task::Poll; +use futures_test::task::noop_context; + #[test] fn is_terminated() { - use futures::future::{self, FutureExt}; - use futures::stream::{FusedStream, SelectAll, StreamExt}; - use futures::task::Poll; - use futures_test::task::noop_context; - let mut cx = noop_context(); let mut tasks = SelectAll::new(); @@ -30,9 +32,6 @@ fn is_terminated() { #[test] fn issue_1626() { - use futures::executor::block_on_stream; - use futures::stream; - let a = stream::iter(0..=2); let b = stream::iter(10..=14); @@ -51,10 +50,6 @@ fn issue_1626() { #[test] fn works_1() { - use futures::channel::mpsc; - use futures::executor::block_on_stream; - use futures::stream::select_all; - let (a_tx, a_rx) = mpsc::unbounded::<u32>(); let (b_tx, b_rx) = mpsc::unbounded::<u32>(); let (c_tx, c_rx) = mpsc::unbounded::<u32>(); @@ -81,3 +76,122 @@ fn works_1() { drop((a_tx, b_tx, c_tx)); assert_eq!(None, stream.next()); } + +#[test] +fn clear() { + let mut tasks = + select_all(vec![stream::iter(vec![1].into_iter()), stream::iter(vec![2].into_iter())]); + + assert_eq!(block_on(tasks.next()), Some(1)); + assert!(!tasks.is_empty()); + + tasks.clear(); + assert!(tasks.is_empty()); + + tasks.push(stream::iter(vec![3].into_iter())); + assert!(!tasks.is_empty()); + + tasks.clear(); + assert!(tasks.is_empty()); + + assert_eq!(block_on(tasks.next()), None); + assert!(tasks.is_terminated()); + tasks.clear(); + assert!(!tasks.is_terminated()); +} + +#[test] +fn iter_mut() { + let mut stream = + vec![stream::pending::<()>(), stream::pending::<()>(), stream::pending::<()>()] + .into_iter() + .collect::<SelectAll<_>>(); + + let mut iter = stream.iter_mut(); + assert_eq!(iter.len(), 3); + assert!(iter.next().is_some()); + assert_eq!(iter.len(), 2); + assert!(iter.next().is_some()); + assert_eq!(iter.len(), 1); + assert!(iter.next().is_some()); + assert_eq!(iter.len(), 0); + assert!(iter.next().is_none()); + + let mut stream = vec![stream::iter(vec![]), stream::iter(vec![1]), stream::iter(vec![2])] + .into_iter() + .collect::<SelectAll<_>>(); + + assert_eq!(stream.len(), 3); + assert_eq!(block_on(stream.next()), Some(1)); + assert_eq!(stream.len(), 2); + let mut iter = stream.iter_mut(); + assert_eq!(iter.len(), 2); + assert!(iter.next().is_some()); + assert_eq!(iter.len(), 1); + assert!(iter.next().is_some()); + assert_eq!(iter.len(), 0); + assert!(iter.next().is_none()); + + assert_eq!(block_on(stream.next()), Some(2)); + assert_eq!(stream.len(), 2); + assert_eq!(block_on(stream.next()), None); + let mut iter = stream.iter_mut(); + assert_eq!(iter.len(), 0); + assert!(iter.next().is_none()); +} + +#[test] +fn iter() { + let stream = vec![stream::pending::<()>(), stream::pending::<()>(), stream::pending::<()>()] + .into_iter() + .collect::<SelectAll<_>>(); + + let mut iter = stream.iter(); + assert_eq!(iter.len(), 3); + assert!(iter.next().is_some()); + assert_eq!(iter.len(), 2); + assert!(iter.next().is_some()); + assert_eq!(iter.len(), 1); + assert!(iter.next().is_some()); + assert_eq!(iter.len(), 0); + assert!(iter.next().is_none()); + + let mut stream = vec![stream::iter(vec![]), stream::iter(vec![1]), stream::iter(vec![2])] + .into_iter() + .collect::<SelectAll<_>>(); + + assert_eq!(stream.len(), 3); + assert_eq!(block_on(stream.next()), Some(1)); + assert_eq!(stream.len(), 2); + let mut iter = stream.iter(); + assert_eq!(iter.len(), 2); + assert!(iter.next().is_some()); + assert_eq!(iter.len(), 1); + assert!(iter.next().is_some()); + assert_eq!(iter.len(), 0); + assert!(iter.next().is_none()); + + assert_eq!(block_on(stream.next()), Some(2)); + assert_eq!(stream.len(), 2); + assert_eq!(block_on(stream.next()), None); + let mut iter = stream.iter(); + assert_eq!(iter.len(), 0); + assert!(iter.next().is_none()); +} + +#[test] +fn into_iter() { + let stream = vec![stream::pending::<()>(), stream::pending::<()>(), stream::pending::<()>()] + .into_iter() + .collect::<SelectAll<_>>(); + + let mut iter = stream.into_iter(); + assert_eq!(iter.len(), 3); + assert!(iter.next().is_some()); + assert_eq!(iter.len(), 2); + assert!(iter.next().is_some()); + assert_eq!(iter.len(), 1); + assert!(iter.next().is_some()); + assert_eq!(iter.len(), 0); + assert!(iter.next().is_none()); +} diff --git a/tests/stream_select_next_some.rs b/tests/stream_select_next_some.rs index bec5262..8252ad7 100644 --- a/tests/stream_select_next_some.rs +++ b/tests/stream_select_next_some.rs @@ -1,11 +1,13 @@ +use futures::executor::block_on; +use futures::future::{self, FusedFuture, FutureExt}; +use futures::select; +use futures::stream::{FuturesUnordered, StreamExt}; +use futures::task::{Context, Poll}; +use futures_test::future::FutureTestExt; +use futures_test::task::new_count_waker; + #[test] fn is_terminated() { - use futures::future; - use futures::future::{FusedFuture, FutureExt}; - use futures::stream::{FuturesUnordered, StreamExt}; - use futures::task::{Context, Poll}; - use futures_test::task::new_count_waker; - let (waker, counter) = new_count_waker(); let mut cx = Context::from_waker(&waker); @@ -30,15 +32,11 @@ fn is_terminated() { #[test] fn select() { - use futures::{future, select}; - use futures::stream::{FuturesUnordered, StreamExt}; - use futures_test::future::FutureTestExt; - // Checks that even though `async_tasks` will yield a `None` and return // `is_terminated() == true` during the first poll, it manages to toggle // back to having items after a future is pushed into it during the second // poll (after pending_once completes). - futures::executor::block_on(async { + block_on(async { let mut fut = future::ready(1).pending_once(); let mut async_tasks = FuturesUnordered::new(); let mut total = 0; @@ -61,17 +59,13 @@ fn select() { // Check that `select!` macro does not fail when importing from `futures_util`. #[test] fn futures_util_select() { - use futures::future; - use futures::stream::{FuturesUnordered, StreamExt}; - use futures_test::future::FutureTestExt; - use futures_util::select; // Checks that even though `async_tasks` will yield a `None` and return // `is_terminated() == true` during the first poll, it manages to toggle // back to having items after a future is pushed into it during the second // poll (after pending_once completes). - futures::executor::block_on(async { + block_on(async { let mut fut = future::ready(1).pending_once(); let mut async_tasks = FuturesUnordered::new(); let mut total = 0; diff --git a/tests/stream_split.rs b/tests/stream_split.rs new file mode 100644 index 0000000..694c151 --- /dev/null +++ b/tests/stream_split.rs @@ -0,0 +1,57 @@ +use futures::executor::block_on; +use futures::sink::{Sink, SinkExt}; +use futures::stream::{self, Stream, StreamExt}; +use futures::task::{Context, Poll}; +use pin_project::pin_project; +use std::pin::Pin; + +#[test] +fn test_split() { + #[pin_project] + struct Join<T, U> { + #[pin] + stream: T, + #[pin] + sink: U, + } + + impl<T: Stream, U> Stream for Join<T, U> { + type Item = T::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T::Item>> { + self.project().stream.poll_next(cx) + } + } + + impl<T, U: Sink<Item>, Item> Sink<Item> for Join<T, U> { + type Error = U::Error; + + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + self.project().sink.poll_ready(cx) + } + + fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> { + self.project().sink.start_send(item) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + self.project().sink.poll_flush(cx) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + self.project().sink.poll_close(cx) + } + } + + let mut dest: Vec<i32> = Vec::new(); + { + let join = Join { stream: stream::iter(vec![10, 20, 30]), sink: &mut dest }; + + let (sink, stream) = join.split(); + let join = sink.reunite(stream).expect("test_split: reunite error"); + let (mut sink, stream) = join.split(); + let mut stream = stream.map(Ok); + block_on(sink.send_all(&mut stream)).unwrap(); + } + assert_eq!(dest, vec![10, 20, 30]); +} diff --git a/tests/try_stream.rs b/tests/stream_try_stream.rs index 194e74d..d83fc54 100644 --- a/tests/try_stream.rs +++ b/tests/stream_try_stream.rs @@ -1,3 +1,5 @@ +#![cfg(not(miri))] // https://github.com/rust-lang/miri/issues/1038 + use futures::{ stream::{self, StreamExt, TryStreamExt}, task::Poll, diff --git a/tests/unfold.rs b/tests/stream_unfold.rs index 95722cf..16b1081 100644 --- a/tests/unfold.rs +++ b/tests/stream_unfold.rs @@ -1,10 +1,7 @@ use futures::future; use futures::stream; - use futures_test::future::FutureTestExt; -use futures_test::{ - assert_stream_done, assert_stream_next, assert_stream_pending, -}; +use futures_test::{assert_stream_done, assert_stream_next, assert_stream_pending}; #[test] fn unfold1() { diff --git a/tests/task_arc_wake.rs b/tests/task_arc_wake.rs new file mode 100644 index 0000000..aedc15b --- /dev/null +++ b/tests/task_arc_wake.rs @@ -0,0 +1,79 @@ +use futures::task::{self, ArcWake, Waker}; +use std::panic; +use std::sync::{Arc, Mutex}; + +struct CountingWaker { + nr_wake: Mutex<i32>, +} + +impl CountingWaker { + fn new() -> Self { + Self { nr_wake: Mutex::new(0) } + } + + fn wakes(&self) -> i32 { + *self.nr_wake.lock().unwrap() + } +} + +impl ArcWake for CountingWaker { + fn wake_by_ref(arc_self: &Arc<Self>) { + let mut lock = arc_self.nr_wake.lock().unwrap(); + *lock += 1; + } +} + +#[test] +fn create_from_arc() { + let some_w = Arc::new(CountingWaker::new()); + + let w1: Waker = task::waker(some_w.clone()); + assert_eq!(2, Arc::strong_count(&some_w)); + w1.wake_by_ref(); + assert_eq!(1, some_w.wakes()); + + let w2 = w1.clone(); + assert_eq!(3, Arc::strong_count(&some_w)); + + w2.wake_by_ref(); + assert_eq!(2, some_w.wakes()); + + drop(w2); + assert_eq!(2, Arc::strong_count(&some_w)); + drop(w1); + assert_eq!(1, Arc::strong_count(&some_w)); +} + +#[test] +fn ref_wake_same() { + let some_w = Arc::new(CountingWaker::new()); + + let w1: Waker = task::waker(some_w.clone()); + let w2 = task::waker_ref(&some_w); + let w3 = w2.clone(); + + assert!(w1.will_wake(&w2)); + assert!(w2.will_wake(&w3)); +} + +#[test] +fn proper_refcount_on_wake_panic() { + struct PanicWaker; + + impl ArcWake for PanicWaker { + fn wake_by_ref(_arc_self: &Arc<Self>) { + panic!("WAKE UP"); + } + } + + let some_w = Arc::new(PanicWaker); + + let w1: Waker = task::waker(some_w.clone()); + assert_eq!( + "WAKE UP", + *panic::catch_unwind(|| w1.wake_by_ref()).unwrap_err().downcast::<&str>().unwrap() + ); + assert_eq!(2, Arc::strong_count(&some_w)); // some_w + w1 + drop(w1); + assert_eq!(1, Arc::strong_count(&some_w)); // some_w +} diff --git a/tests/atomic_waker.rs b/tests/task_atomic_waker.rs index bf15d0f..2d1612a 100644 --- a/tests/atomic_waker.rs +++ b/tests/task_atomic_waker.rs @@ -1,14 +1,14 @@ +use futures::executor::block_on; +use futures::future::poll_fn; +use futures::task::{AtomicWaker, Poll}; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering; +use std::sync::Arc; +use std::thread; + +#[cfg_attr(miri, ignore)] // Miri is too slow #[test] fn basic() { - use std::sync::atomic::AtomicUsize; - use std::sync::atomic::Ordering; - use std::sync::Arc; - use std::thread; - - use futures::executor::block_on; - use futures::future::poll_fn; - use futures::task::{AtomicWaker, Poll}; - let atomic_waker = Arc::new(AtomicWaker::new()); let atomic_waker_copy = atomic_waker.clone(); diff --git a/tests/test_macro.rs b/tests/test_macro.rs new file mode 100644 index 0000000..6adf51d --- /dev/null +++ b/tests/test_macro.rs @@ -0,0 +1,20 @@ +#[futures_test::test] +async fn it_works() { + let fut = async { true }; + assert!(fut.await); + + let fut = async { false }; + assert!(!fut.await); +} + +#[should_panic] +#[futures_test::test] +async fn it_is_being_run() { + let fut = async { false }; + assert!(fut.await); +} + +#[futures_test::test] +async fn return_ty() -> Result<(), ()> { + Ok(()) +} diff --git a/tests/try_join.rs b/tests/try_join.rs index 6c6d084..0281ab8 100644 --- a/tests/try_join.rs +++ b/tests/try_join.rs @@ -1,9 +1,9 @@ #![deny(unreachable_code)] -use futures::{try_join, executor::block_on}; +use futures::{executor::block_on, try_join}; // TODO: This abuses https://github.com/rust-lang/rust/issues/58733 in order to -// test behaviour of the `try_join!` macro with the never type before it is +// test behavior of the `try_join!` macro with the never type before it is // stabilized. Once `!` is again stabilized this can be removed and replaced // with direct use of `!` below where `Never` is used. trait MyTrait { @@ -14,7 +14,6 @@ impl<T> MyTrait for fn() -> T { } type Never = <fn() -> ! as MyTrait>::Output; - #[test] fn try_join_never_error() { block_on(async { diff --git a/tests/try_join_all.rs b/tests/try_join_all.rs deleted file mode 100644 index 8e579a2..0000000 --- a/tests/try_join_all.rs +++ /dev/null @@ -1,58 +0,0 @@ -mod util { - use std::future::Future; - use futures::executor::block_on; - use std::fmt::Debug; - - pub fn assert_done<T, F>(actual_fut: F, expected: T) - where - T: PartialEq + Debug, - F: FnOnce() -> Box<dyn Future<Output = T> + Unpin>, - { - let output = block_on(actual_fut()); - assert_eq!(output, expected); - } -} - -#[test] -fn collect_collects() { - use futures_util::future::{err, ok, try_join_all}; - - use util::assert_done; - - assert_done(|| Box::new(try_join_all(vec![ok(1), ok(2)])), Ok::<_, usize>(vec![1, 2])); - assert_done(|| Box::new(try_join_all(vec![ok(1), err(2)])), Err(2)); - assert_done(|| Box::new(try_join_all(vec![ok(1)])), Ok::<_, usize>(vec![1])); - // REVIEW: should this be implemented? - // assert_done(|| Box::new(try_join_all(Vec::<i32>::new())), Ok(vec![])); - - // TODO: needs more tests -} - -#[test] -fn try_join_all_iter_lifetime() { - use futures_util::future::{ok, try_join_all}; - use std::future::Future; - - use util::assert_done; - - // In futures-rs version 0.1, this function would fail to typecheck due to an overly - // conservative type parameterization of `TryJoinAll`. - fn sizes(bufs: Vec<&[u8]>) -> Box<dyn Future<Output = Result<Vec<usize>, ()>> + Unpin> { - let iter = bufs.into_iter().map(|b| ok::<usize, ()>(b.len())); - Box::new(try_join_all(iter)) - } - - assert_done(|| sizes(vec![&[1,2,3], &[], &[0]]), Ok(vec![3_usize, 0, 1])); -} - -#[test] -fn try_join_all_from_iter() { - use futures_util::future::{ok, TryJoinAll}; - - use util::assert_done; - - assert_done( - || Box::new(vec![ok(1), ok(2)].into_iter().collect::<TryJoinAll<_>>()), - Ok::<_, usize>(vec![1, 2]), - ) -} diff --git a/tests_disabled/all.rs b/tests_disabled/all.rs index 6c7e11c..a7a5710 100644 --- a/tests_disabled/all.rs +++ b/tests_disabled/all.rs @@ -1,27 +1,27 @@ -use futures::future; -use futures::executor::block_on; use futures::channel::oneshot::{self, Canceled}; +use futures::executor::block_on; +use futures::future; use std::sync::mpsc::{channel, TryRecvError}; -mod support; -use support::*; +// mod support; +// use support::*; fn unselect<T, E, A, B>(r: Result<Either<(T, B), (T, A)>, Either<(E, B), (E, A)>>) -> Result<T, E> { match r { - Ok(Either::Left((t, _))) | - Ok(Either::Right((t, _))) => Ok(t), - Err(Either::Left((e, _))) | - Err(Either::Right((e, _))) => Err(e), + Ok(Either::Left((t, _))) | Ok(Either::Right((t, _))) => Ok(t), + Err(Either::Left((e, _))) | Err(Either::Right((e, _))) => Err(e), } } #[test] fn result_smoke() { fn is_future_v<A, B, C>(_: C) - where A: Send + 'static, - B: Send + 'static, - C: Future<Item=A, Error=B> - {} + where + A: Send + 'static, + B: Send + 'static, + C: Future<Item = A, Error = B>, + { + } is_future_v::<i32, u32, _>(f_ok(1).map(|a| a + 1)); is_future_v::<i32, u32, _>(f_ok(1).map_err(|a| a + 1)); @@ -64,7 +64,9 @@ fn result_smoke() { #[test] fn test_empty() { - fn empty() -> Empty<i32, u32> { future::empty() } + fn empty() -> Empty<i32, u32> { + future::empty() + } assert_empty(|| empty()); assert_empty(|| empty().select(empty())); @@ -105,16 +107,22 @@ fn flatten() { #[test] fn smoke_oneshot() { - assert_done(|| { - let (c, p) = oneshot::channel(); - c.send(1).unwrap(); - p - }, Ok(1)); - assert_done(|| { - let (c, p) = oneshot::channel::<i32>(); - drop(c); - p - }, Err(Canceled)); + assert_done( + || { + let (c, p) = oneshot::channel(); + c.send(1).unwrap(); + p + }, + Ok(1), + ); + assert_done( + || { + let (c, p) = oneshot::channel::<i32>(); + drop(c); + p + }, + Err(Canceled), + ); let mut completes = Vec::new(); assert_empty(|| { let (a, b) = oneshot::channel::<i32>(); @@ -129,9 +137,7 @@ fn smoke_oneshot() { let (c, p) = oneshot::channel::<i32>(); drop(c); let (tx, rx) = channel(); - p.then(move |_| { - tx.send(()) - }).forget(); + p.then(move |_| tx.send(())).forget(); rx.recv().unwrap(); } @@ -139,8 +145,14 @@ fn smoke_oneshot() { fn select_cancels() { let ((a, b), (c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>()); let ((btx, brx), (dtx, drx)) = (channel(), channel()); - let b = b.map(move |b| { btx.send(b).unwrap(); b }); - let d = d.map(move |d| { dtx.send(d).unwrap(); d }); + let b = b.map(move |b| { + btx.send(b).unwrap(); + b + }); + let d = d.map(move |d| { + dtx.send(d).unwrap(); + d + }); let mut f = b.select(d).then(unselect); // assert!(f.poll(&mut Task::new()).is_pending()); @@ -156,8 +168,14 @@ fn select_cancels() { let ((a, b), (c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>()); let ((btx, _brx), (dtx, drx)) = (channel(), channel()); - let b = b.map(move |b| { btx.send(b).unwrap(); b }); - let d = d.map(move |d| { dtx.send(d).unwrap(); d }); + let b = b.map(move |b| { + btx.send(b).unwrap(); + b + }); + let d = d.map(move |d| { + dtx.send(d).unwrap(); + d + }); let mut f = b.select(d).then(unselect); assert!(f.poll(lw).ok().unwrap().is_pending()); @@ -173,8 +191,14 @@ fn select_cancels() { fn join_cancels() { let ((a, b), (c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>()); let ((btx, _brx), (dtx, drx)) = (channel(), channel()); - let b = b.map(move |b| { btx.send(b).unwrap(); b }); - let d = d.map(move |d| { dtx.send(d).unwrap(); d }); + let b = b.map(move |b| { + btx.send(b).unwrap(); + b + }); + let d = d.map(move |d| { + dtx.send(d).unwrap(); + d + }); let mut f = b.join(d); drop(a); @@ -185,8 +209,14 @@ fn join_cancels() { let ((a, b), (c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>()); let ((btx, _brx), (dtx, drx)) = (channel(), channel()); - let b = b.map(move |b| { btx.send(b).unwrap(); b }); - let d = d.map(move |d| { dtx.send(d).unwrap(); d }); + let b = b.map(move |b| { + btx.send(b).unwrap(); + b + }); + let d = d.map(move |d| { + dtx.send(d).unwrap(); + d + }); let (tx, rx) = channel(); let f = b.join(d); @@ -194,7 +224,8 @@ fn join_cancels() { tx.send(()).unwrap(); let res: Result<(), ()> = Ok(()); res - }).forget(); + }) + .forget(); assert!(rx.try_recv().is_err()); drop(a); rx.recv().unwrap(); @@ -243,7 +274,6 @@ fn join_incomplete() { }) } - #[test] fn select2() { assert_done(|| f_ok(2).select(empty()).then(unselect), Ok(2)); @@ -251,14 +281,15 @@ fn select2() { assert_done(|| f_err(2).select(empty()).then(unselect), Err(2)); assert_done(|| empty().select(f_err(2)).then(unselect), Err(2)); - assert_done(|| { - f_ok(1).select(f_ok(2)) - .map_err(|_| 0) - .and_then(|either_tup| { - let (a, b) = either_tup.into_inner(); - b.map(move |b| a + b) - }) - }, Ok(3)); + assert_done( + || { + f_ok(1).select(f_ok(2)).map_err(|_| 0).and_then(|either_tup| { + let (a, b) = either_tup.into_inner(); + b.map(move |b| a + b) + }) + }, + Ok(3), + ); // Finish one half of a select and then fail the second, ensuring that we // get the notification of the second one. @@ -297,8 +328,14 @@ fn select2() { { let ((_a, b), (_c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>()); let ((btx, brx), (dtx, drx)) = (channel(), channel()); - let b = b.map(move |v| { btx.send(v).unwrap(); v }); - let d = d.map(move |v| { dtx.send(v).unwrap(); v }); + let b = b.map(move |v| { + btx.send(v).unwrap(); + v + }); + let d = d.map(move |v| { + dtx.send(v).unwrap(); + v + }); let f = b.select(d); drop(f); assert!(drx.recv().is_err()); @@ -309,8 +346,14 @@ fn select2() { { let ((_a, b), (_c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>()); let ((btx, brx), (dtx, drx)) = (channel(), channel()); - let b = b.map(move |v| { btx.send(v).unwrap(); v }); - let d = d.map(move |v| { dtx.send(v).unwrap(); v }); + let b = b.map(move |v| { + btx.send(v).unwrap(); + v + }); + let d = d.map(move |v| { + dtx.send(v).unwrap(); + v + }); let mut f = b.select(d); let _res = noop_waker_lw(|lw| f.poll(lw)); drop(f); @@ -322,8 +365,14 @@ fn select2() { { let ((a, b), (_c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>()); let ((btx, brx), (dtx, drx)) = (channel(), channel()); - let b = b.map(move |v| { btx.send(v).unwrap(); v }); - let d = d.map(move |v| { dtx.send(v).unwrap(); v }); + let b = b.map(move |v| { + btx.send(v).unwrap(); + v + }); + let d = d.map(move |v| { + dtx.send(v).unwrap(); + v + }); let (tx, rx) = channel(); b.select(d).map(move |_| tx.send(()).unwrap()).forget(); drop(a); diff --git a/tests_disabled/bilock.rs b/tests_disabled/bilock.rs index c1bc33f..0166ca4 100644 --- a/tests_disabled/bilock.rs +++ b/tests_disabled/bilock.rs @@ -1,11 +1,11 @@ -use futures::task; -use futures::stream; use futures::future; +use futures::stream; +use futures::task; use futures_util::lock::BiLock; use std::thread; -mod support; -use support::*; +// mod support; +// use support::*; #[test] fn smoke() { @@ -41,9 +41,9 @@ fn smoke() { }); assert!(task::spawn(future) - .poll_future_notify(¬ify_noop(), 0) - .expect("failure in poll") - .is_ready()); + .poll_future_notify(¬ify_noop(), 0) + .expect("failure in poll") + .is_ready()); } #[test] @@ -51,10 +51,7 @@ fn concurrent() { const N: usize = 10000; let (a, b) = BiLock::new(0); - let a = Increment { - a: Some(a), - remaining: N, - }; + let a = Increment { a: Some(a), remaining: N }; let b = stream::iter_ok(0..N).fold(b, |b, _n| { b.lock().map(|mut b| { *b += 1; @@ -89,7 +86,7 @@ fn concurrent() { fn poll(&mut self) -> Poll<BiLock<usize>, ()> { loop { if self.remaining == 0 { - return Ok(self.a.take().unwrap().into()) + return Ok(self.a.take().unwrap().into()); } let a = self.a.as_ref().unwrap(); diff --git a/tests_disabled/stream.rs b/tests_disabled/stream.rs index ef0676d..854dbad 100644 --- a/tests_disabled/stream.rs +++ b/tests_disabled/stream.rs @@ -1,26 +1,26 @@ +use futures::channel::mpsc; +use futures::channel::oneshot; use futures::executor::{block_on, block_on_stream}; use futures::future::{err, ok}; use futures::stream::{empty, iter_ok, poll_fn, Peekable}; -use futures::channel::oneshot; -use futures::channel::mpsc; -mod support; -use support::*; +// mod support; +// use support::*; pub struct Iter<I> { iter: I, } pub fn iter<J, T, E>(i: J) -> Iter<J::IntoIter> - where J: IntoIterator<Item=Result<T, E>>, +where + J: IntoIterator<Item = Result<T, E>>, { - Iter { - iter: i.into_iter(), - } + Iter { iter: i.into_iter() } } impl<I, T, E> Stream for Iter<I> - where I: Iterator<Item=Result<T, E>>, +where + I: Iterator<Item = Result<T, E>>, { type Item = T; type Error = E; @@ -34,21 +34,15 @@ impl<I, T, E> Stream for Iter<I> } } -fn list() -> Box<Stream<Item=i32, Error=u32> + Send> { +fn list() -> Box<Stream<Item = i32, Error = u32> + Send> { let (tx, rx) = mpsc::channel(1); - tx.send(Ok(1)) - .and_then(|tx| tx.send(Ok(2))) - .and_then(|tx| tx.send(Ok(3))) - .forget(); + tx.send(Ok(1)).and_then(|tx| tx.send(Ok(2))).and_then(|tx| tx.send(Ok(3))).forget(); Box::new(rx.then(|r| r.unwrap())) } -fn err_list() -> Box<Stream<Item=i32, Error=u32> + Send> { +fn err_list() -> Box<Stream<Item = i32, Error = u32> + Send> { let (tx, rx) = mpsc::channel(1); - tx.send(Ok(1)) - .and_then(|tx| tx.send(Ok(2))) - .and_then(|tx| tx.send(Err(3))) - .forget(); + tx.send(Ok(1)).and_then(|tx| tx.send(Ok(2))).and_then(|tx| tx.send(Err(3))).forget(); Box::new(rx.then(|r| r.unwrap())) } @@ -89,40 +83,31 @@ fn filter() { #[test] fn filter_map() { - assert_done(|| list().filter_map(|x| { - ok(if x % 2 == 0 { - Some(x + 10) - } else { - None - }) - }).collect(), Ok(vec![12])); + assert_done( + || list().filter_map(|x| ok(if x % 2 == 0 { Some(x + 10) } else { None })).collect(), + Ok(vec![12]), + ); } #[test] fn and_then() { assert_done(|| list().and_then(|a| Ok(a + 1)).collect(), Ok(vec![2, 3, 4])); - assert_done(|| list().and_then(|a| err::<i32, u32>(a as u32)).collect::<Vec<_>>(), - Err(1)); + assert_done(|| list().and_then(|a| err::<i32, u32>(a as u32)).collect::<Vec<_>>(), Err(1)); } #[test] fn then() { assert_done(|| list().then(|a| a.map(|e| e + 1)).collect(), Ok(vec![2, 3, 4])); - } #[test] fn or_else() { - assert_done(|| err_list().or_else(|a| { - ok::<i32, u32>(a as i32) - }).collect(), Ok(vec![1, 2, 3])); + assert_done(|| err_list().or_else(|a| ok::<i32, u32>(a as i32)).collect(), Ok(vec![1, 2, 3])); } #[test] fn flatten() { - assert_done(|| list().map(|_| list()).flatten().collect(), - Ok(vec![1, 2, 3, 1, 2, 3, 1, 2, 3])); - + assert_done(|| list().map(|_| list()).flatten().collect(), Ok(vec![1, 2, 3, 1, 2, 3, 1, 2, 3])); } #[test] @@ -132,9 +117,7 @@ fn skip() { #[test] fn skip_passes_errors_through() { - let mut s = block_on_stream( - iter(vec![Err(1), Err(2), Ok(3), Ok(4), Ok(5)]).skip(1) - ); + let mut s = block_on_stream(iter(vec![Err(1), Err(2), Ok(3), Ok(4), Ok(5)]).skip(1)); assert_eq!(s.next(), Some(Err(1))); assert_eq!(s.next(), Some(Err(2))); assert_eq!(s.next(), Some(Ok(4))); @@ -144,8 +127,7 @@ fn skip_passes_errors_through() { #[test] fn skip_while() { - assert_done(|| list().skip_while(|e| Ok(*e % 2 == 1)).collect(), - Ok(vec![2, 3])); + assert_done(|| list().skip_while(|e| Ok(*e % 2 == 1)).collect(), Ok(vec![2, 3])); } #[test] fn take() { @@ -154,8 +136,7 @@ fn take() { #[test] fn take_while() { - assert_done(|| list().take_while(|e| Ok(*e < 3)).collect(), - Ok(vec![1, 2])); + assert_done(|| list().take_while(|e| Ok(*e < 3)).collect(), Ok(vec![1, 2])); } #[test] @@ -193,9 +174,9 @@ fn buffered() { let (a, b) = oneshot::channel::<u32>(); let (c, d) = oneshot::channel::<u32>(); - tx.send(Box::new(b.recover(|_| panic!())) as Box<Future<Item=_, Error=_> + Send>) - .and_then(|tx| tx.send(Box::new(d.map_err(|_| panic!())))) - .forget(); + tx.send(Box::new(b.recover(|_| panic!())) as Box<Future<Item = _, Error = _> + Send>) + .and_then(|tx| tx.send(Box::new(d.map_err(|_| panic!())))) + .forget(); let mut rx = rx.buffered(2); sassert_empty(&mut rx); @@ -211,9 +192,9 @@ fn buffered() { let (a, b) = oneshot::channel::<u32>(); let (c, d) = oneshot::channel::<u32>(); - tx.send(Box::new(b.recover(|_| panic!())) as Box<Future<Item=_, Error=_> + Send>) - .and_then(|tx| tx.send(Box::new(d.map_err(|_| panic!())))) - .forget(); + tx.send(Box::new(b.recover(|_| panic!())) as Box<Future<Item = _, Error = _> + Send>) + .and_then(|tx| tx.send(Box::new(d.map_err(|_| panic!())))) + .forget(); let mut rx = rx.buffered(1); sassert_empty(&mut rx); @@ -233,8 +214,8 @@ fn unordered() { let (c, d) = oneshot::channel::<u32>(); tx.send(Box::new(b.recover(|_| panic!())) as Box<Future<Item = _, Error = _> + Send>) - .and_then(|tx| tx.send(Box::new(d.recover(|_| panic!())))) - .forget(); + .and_then(|tx| tx.send(Box::new(d.recover(|_| panic!())))) + .forget(); let mut rx = rx.buffer_unordered(2); sassert_empty(&mut rx); @@ -250,8 +231,8 @@ fn unordered() { let (c, d) = oneshot::channel::<u32>(); tx.send(Box::new(b.recover(|_| panic!())) as Box<Future<Item = _, Error = _> + Send>) - .and_then(|tx| tx.send(Box::new(d.recover(|_| panic!())))) - .forget(); + .and_then(|tx| tx.send(Box::new(d.recover(|_| panic!())))) + .forget(); // We don't even get to see `c` until `a` completes. let mut rx = rx.buffer_unordered(1); @@ -267,21 +248,17 @@ fn unordered() { #[test] fn zip() { - assert_done(|| list().zip(list()).collect(), - Ok(vec![(1, 1), (2, 2), (3, 3)])); - assert_done(|| list().zip(list().take(2)).collect(), - Ok(vec![(1, 1), (2, 2)])); - assert_done(|| list().take(2).zip(list()).collect(), - Ok(vec![(1, 1), (2, 2)])); + assert_done(|| list().zip(list()).collect(), Ok(vec![(1, 1), (2, 2), (3, 3)])); + assert_done(|| list().zip(list().take(2)).collect(), Ok(vec![(1, 1), (2, 2)])); + assert_done(|| list().take(2).zip(list()).collect(), Ok(vec![(1, 1), (2, 2)])); assert_done(|| err_list().zip(list()).collect::<Vec<_>>(), Err(3)); - assert_done(|| list().zip(list().map(|x| x + 1)).collect(), - Ok(vec![(1, 2), (2, 3), (3, 4)])); + assert_done(|| list().zip(list().map(|x| x + 1)).collect(), Ok(vec![(1, 2), (2, 3), (3, 4)])); } #[test] fn peek() { struct Peek { - inner: Peekable<Box<Stream<Item = i32, Error =u32> + Send>> + inner: Peekable<Box<Stream<Item = i32, Error = u32> + Send>>, } impl Future for Peek { @@ -299,15 +276,12 @@ fn peek() { } } - block_on(Peek { - inner: list().peekable(), - }).unwrap() + block_on(Peek { inner: list().peekable() }).unwrap() } #[test] fn wait() { - assert_eq!(block_on_stream(list()).collect::<Result<Vec<_>, _>>(), - Ok(vec![1, 2, 3])); + assert_eq!(block_on_stream(list()).collect::<Result<Vec<_>, _>>(), Ok(vec![1, 2, 3])); } #[test] @@ -337,8 +311,10 @@ fn forward() { let v = block_on(iter_ok::<_, Never>(vec![2, 3]).forward(v)).unwrap().1; assert_eq!(v, vec![0, 1, 2, 3]); - assert_done(move || iter_ok::<_, Never>(vec![4, 5]).forward(v).map(|(_, s)| s), - Ok(vec![0, 1, 2, 3, 4, 5])); + assert_done( + move || iter_ok::<_, Never>(vec![4, 5]).forward(v).map(|(_, s)| s), + Ok(vec![0, 1, 2, 3, 4, 5]), + ); } #[test] |