aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndroid Build Coastguard Worker <android-build-coastguard-worker@google.com>2022-05-10 07:02:15 +0000
committerAndroid Build Coastguard Worker <android-build-coastguard-worker@google.com>2022-05-10 07:02:15 +0000
commit1de9c728aa47509b5675e6a8e79f076bb86236ec (patch)
treee2532a639f92776d56f172cf560f5ba2f62642bc
parent71ac4c321fa28139119502996c08650fb2167eb7 (diff)
parent6661c46def7a4e79589003500e402827370bd285 (diff)
downloadfutures-android13-mainline-extservices-release.tar.gz
Snap for 8564071 from 6661c46def7a4e79589003500e402827370bd285 to mainline-extservices-releaseaml_ext_331814220aml_ext_331412000aml_ext_331312000aml_ext_331112010aml_ext_331012020android13-mainline-extservices-release
Change-Id: I85765edb256f7f15cc233a5107f3099b584d06cf
-rw-r--r--.cargo_vcs_info.json7
-rw-r--r--Android.bp23
-rw-r--r--Cargo.toml117
-rw-r--r--Cargo.toml.orig25
-rw-r--r--METADATA8
-rw-r--r--TEST_MAPPING36
-rw-r--r--cargo2android.json5
-rw-r--r--src/lib.rs132
-rw-r--r--tests/_require_features.rs13
-rw-r--r--tests/arc_wake.rs82
-rw-r--r--tests/async_await_macros.rs165
-rw-r--r--tests/auto_traits.rs83
-rw-r--r--tests/compat.rs13
-rw-r--r--tests/eager_drop.rs176
-rw-r--r--tests/eventual.rs2
-rw-r--r--tests/future_abortable.rs (renamed from tests/abortable.rs)23
-rw-r--r--tests/future_basic_combinators.rs (renamed from tests/basic_combinators.rs)14
-rw-r--r--tests/future_fuse.rs (renamed from tests/fuse.rs)0
-rw-r--r--tests/future_inspect.rs16
-rw-r--r--tests/future_join_all.rs41
-rw-r--r--tests/future_obj.rs4
-rw-r--r--tests/future_select_all.rs (renamed from tests/select_all.rs)14
-rw-r--r--tests/future_select_ok.rs (renamed from tests/select_ok.rs)22
-rw-r--r--tests/future_shared.rs (renamed from tests/shared.rs)80
-rw-r--r--tests/future_try_flatten_stream.rs29
-rw-r--r--tests/future_try_join_all.rs46
-rw-r--r--tests/inspect.rs14
-rw-r--r--tests/io_buf_reader.rs442
-rw-r--r--tests/io_buf_writer.rs121
-rw-r--r--tests/io_cursor.rs23
-rw-r--r--tests/io_line_writer.rs73
-rw-r--r--tests/io_lines.rs66
-rw-r--r--tests/io_read.rs55
-rw-r--r--tests/io_read_exact.rs8
-rw-r--r--tests/io_read_line.rs45
-rw-r--r--tests/io_read_to_end.rs7
-rw-r--r--tests/io_read_to_string.rs20
-rw-r--r--tests/io_read_until.rs39
-rw-r--r--tests/io_write.rs73
-rw-r--r--tests/join_all.rs51
-rw-r--r--tests/lock_mutex.rs (renamed from tests/mutex.rs)34
-rw-r--r--tests/macro_comma_support.rs33
-rw-r--r--tests/oneshot.rs49
-rw-r--r--tests/ready_queue.rs48
-rw-r--r--tests/recurse.rs15
-rw-r--r--tests/sink.rs485
-rw-r--r--tests/sink_fanout.rs12
-rw-r--r--tests/split.rs75
-rw-r--r--tests/stream.rs349
-rw-r--r--tests/stream_abortable.rs46
-rw-r--r--tests/stream_buffer_unordered.rs (renamed from tests/buffer_unordered.rs)14
-rw-r--r--tests/stream_catch_unwind.rs9
-rw-r--r--tests/stream_futures_ordered.rs (renamed from tests/futures_ordered.rs)58
-rw-r--r--tests/stream_futures_unordered.rs (renamed from tests/futures_unordered.rs)183
-rw-r--r--tests/stream_into_async_read.rs96
-rw-r--r--tests/stream_peekable.rs53
-rw-r--r--tests/stream_select_all.rs138
-rw-r--r--tests/stream_select_next_some.rs26
-rw-r--r--tests/stream_split.rs57
-rw-r--r--tests/stream_try_stream.rs (renamed from tests/try_stream.rs)2
-rw-r--r--tests/stream_unfold.rs (renamed from tests/unfold.rs)5
-rw-r--r--tests/task_arc_wake.rs79
-rw-r--r--tests/task_atomic_waker.rs (renamed from tests/atomic_waker.rs)18
-rw-r--r--tests/test_macro.rs20
-rw-r--r--tests/try_join.rs5
-rw-r--r--tests/try_join_all.rs58
-rw-r--r--tests_disabled/all.rs149
-rw-r--r--tests_disabled/bilock.rs21
-rw-r--r--tests_disabled/stream.rs112
69 files changed, 2391 insertions, 2071 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json
index f3ad3ab..e483977 100644
--- a/.cargo_vcs_info.json
+++ b/.cargo_vcs_info.json
@@ -1,5 +1,6 @@
{
"git": {
- "sha1": "c91f8691672c7401b1923ab00bf138975c99391a"
- }
-}
+ "sha1": "fc1e3250219170e31cddb8857a276cba7dd08d44"
+ },
+ "path_in_vcs": "futures"
+} \ No newline at end of file
diff --git a/Android.bp b/Android.bp
index f5fe962..a594b53 100644
--- a/Android.bp
+++ b/Android.bp
@@ -41,6 +41,8 @@ rust_library {
name: "libfutures",
host_supported: true,
crate_name: "futures",
+ cargo_env_compat: true,
+ cargo_pkg_version: "0.3.21",
srcs: ["src/lib.rs"],
edition: "2018",
features: [
@@ -62,28 +64,9 @@ rust_library {
],
apex_available: [
"//apex_available:platform",
+ "com.android.bluetooth",
"com.android.resolv",
"com.android.virt",
],
min_sdk_version: "29",
}
-
-// dependent_library ["feature_list"]
-// futures-channel-0.3.14 "alloc,futures-sink,sink,std"
-// futures-core-0.3.14 "alloc,std"
-// futures-executor-0.3.14 "std"
-// futures-io-0.3.14 "std"
-// futures-macro-0.3.14
-// futures-sink-0.3.14 "alloc,std"
-// futures-task-0.3.14 "alloc,std"
-// futures-util-0.3.14 "alloc,async-await,async-await-macro,channel,futures-channel,futures-io,futures-macro,futures-sink,io,memchr,proc-macro-hack,proc-macro-nested,sink,slab,std"
-// memchr-2.3.4 "default,std"
-// pin-project-lite-0.2.6
-// pin-utils-0.1.0
-// proc-macro-hack-0.5.19
-// proc-macro-nested-0.1.7
-// proc-macro2-1.0.26 "default,proc-macro"
-// quote-1.0.9 "default,proc-macro"
-// slab-0.4.3 "default,std"
-// syn-1.0.70 "clone-impls,default,derive,full,parsing,printing,proc-macro,quote,visit-mut"
-// unicode-xid-0.2.1 "default"
diff --git a/Cargo.toml b/Cargo.toml
index 9905985..f740f96 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -3,62 +3,80 @@
# When uploading crates to the registry Cargo will automatically
# "normalize" Cargo.toml files for maximal compatibility
# with all versions of Cargo and also rewrite `path` dependencies
-# to registry (e.g., crates.io) dependencies
+# to registry (e.g., crates.io) dependencies.
#
-# If you believe there's an error in this file please file an
-# issue against the rust-lang/cargo repository. If you're
-# editing this file be aware that the upstream Cargo.toml
-# will likely look very different (and much more reasonable)
+# If you are reading this file be aware that the original Cargo.toml
+# will likely look very different (and much more reasonable).
+# See Cargo.toml.orig for the original contents.
[package]
edition = "2018"
+rust-version = "1.45"
name = "futures"
-version = "0.3.13"
-authors = ["Alex Crichton <alex@alexcrichton.com>"]
-description = "An implementation of futures and streams featuring zero allocations,\ncomposability, and iterator-like interfaces.\n"
+version = "0.3.21"
+description = """
+An implementation of futures and streams featuring zero allocations,
+composability, and iterator-like interfaces.
+"""
homepage = "https://rust-lang.github.io/futures-rs"
-documentation = "https://docs.rs/futures/0.3"
readme = "../README.md"
-keywords = ["futures", "async", "future"]
+keywords = [
+ "futures",
+ "async",
+ "future",
+]
categories = ["asynchronous"]
license = "MIT OR Apache-2.0"
repository = "https://github.com/rust-lang/futures-rs"
+
[package.metadata.docs.rs]
all-features = true
-rustdoc-args = ["--cfg", "docsrs"]
+rustdoc-args = [
+ "--cfg",
+ "docsrs",
+]
[package.metadata.playground]
-features = ["std", "async-await", "compat", "io-compat", "executor", "thread-pool"]
+features = [
+ "std",
+ "async-await",
+ "compat",
+ "io-compat",
+ "executor",
+ "thread-pool",
+]
+
[dependencies.futures-channel]
-version = "0.3.13"
+version = "0.3.21"
features = ["sink"]
default-features = false
[dependencies.futures-core]
-version = "0.3.13"
+version = "0.3.21"
default-features = false
[dependencies.futures-executor]
-version = "0.3.13"
+version = "0.3.21"
optional = true
default-features = false
[dependencies.futures-io]
-version = "0.3.13"
+version = "0.3.21"
default-features = false
[dependencies.futures-sink]
-version = "0.3.13"
+version = "0.3.21"
default-features = false
[dependencies.futures-task]
-version = "0.3.13"
+version = "0.3.21"
default-features = false
[dependencies.futures-util]
-version = "0.3.13"
+version = "0.3.21"
features = ["sink"]
default-features = false
+
[dev-dependencies.assert_matches]
version = "1.3.0"
@@ -75,16 +93,55 @@ version = "1"
version = "0.1.11"
[features]
-alloc = ["futures-core/alloc", "futures-task/alloc", "futures-sink/alloc", "futures-channel/alloc", "futures-util/alloc"]
-async-await = ["futures-util/async-await", "futures-util/async-await-macro"]
+alloc = [
+ "futures-core/alloc",
+ "futures-task/alloc",
+ "futures-sink/alloc",
+ "futures-channel/alloc",
+ "futures-util/alloc",
+]
+async-await = [
+ "futures-util/async-await",
+ "futures-util/async-await-macro",
+]
bilock = ["futures-util/bilock"]
-cfg-target-has-atomic = ["futures-core/cfg-target-has-atomic", "futures-task/cfg-target-has-atomic", "futures-channel/cfg-target-has-atomic", "futures-util/cfg-target-has-atomic"]
-compat = ["std", "futures-util/compat"]
-default = ["std", "async-await", "executor"]
-executor = ["std", "futures-executor/std"]
-io-compat = ["compat", "futures-util/io-compat"]
-read-initializer = ["futures-io/read-initializer", "futures-util/read-initializer"]
-std = ["alloc", "futures-core/std", "futures-task/std", "futures-io/std", "futures-sink/std", "futures-util/std", "futures-util/io", "futures-util/channel"]
-thread-pool = ["executor", "futures-executor/thread-pool"]
-unstable = ["futures-core/unstable", "futures-task/unstable", "futures-channel/unstable", "futures-io/unstable", "futures-util/unstable"]
+cfg-target-has-atomic = []
+compat = [
+ "std",
+ "futures-util/compat",
+]
+default = [
+ "std",
+ "async-await",
+ "executor",
+]
+executor = [
+ "std",
+ "futures-executor/std",
+]
+io-compat = [
+ "compat",
+ "futures-util/io-compat",
+]
+std = [
+ "alloc",
+ "futures-core/std",
+ "futures-task/std",
+ "futures-io/std",
+ "futures-sink/std",
+ "futures-util/std",
+ "futures-util/io",
+ "futures-util/channel",
+]
+thread-pool = [
+ "executor",
+ "futures-executor/thread-pool",
+]
+unstable = [
+ "futures-core/unstable",
+ "futures-task/unstable",
+ "futures-channel/unstable",
+ "futures-io/unstable",
+ "futures-util/unstable",
+]
write-all-vectored = ["futures-util/write-all-vectored"]
diff --git a/Cargo.toml.orig b/Cargo.toml.orig
index d046c54..6871f47 100644
--- a/Cargo.toml.orig
+++ b/Cargo.toml.orig
@@ -1,14 +1,13 @@
[package]
name = "futures"
+version = "0.3.21"
edition = "2018"
-version = "0.3.13"
-authors = ["Alex Crichton <alex@alexcrichton.com>"]
+rust-version = "1.45"
license = "MIT OR Apache-2.0"
readme = "../README.md"
keywords = ["futures", "async", "future"]
repository = "https://github.com/rust-lang/futures-rs"
homepage = "https://rust-lang.github.io/futures-rs"
-documentation = "https://docs.rs/futures/0.3"
description = """
An implementation of futures and streams featuring zero allocations,
composability, and iterator-like interfaces.
@@ -16,13 +15,13 @@ composability, and iterator-like interfaces.
categories = ["asynchronous"]
[dependencies]
-futures-core = { path = "../futures-core", version = "0.3.13", default-features = false }
-futures-task = { path = "../futures-task", version = "0.3.13", default-features = false }
-futures-channel = { path = "../futures-channel", version = "0.3.13", default-features = false, features = ["sink"] }
-futures-executor = { path = "../futures-executor", version = "0.3.13", default-features = false, optional = true }
-futures-io = { path = "../futures-io", version = "0.3.13", default-features = false }
-futures-sink = { path = "../futures-sink", version = "0.3.13", default-features = false }
-futures-util = { path = "../futures-util", version = "0.3.13", default-features = false, features = ["sink"] }
+futures-core = { path = "../futures-core", version = "0.3.21", default-features = false }
+futures-task = { path = "../futures-task", version = "0.3.21", default-features = false }
+futures-channel = { path = "../futures-channel", version = "0.3.21", default-features = false, features = ["sink"] }
+futures-executor = { path = "../futures-executor", version = "0.3.21", default-features = false, optional = true }
+futures-io = { path = "../futures-io", version = "0.3.21", default-features = false }
+futures-sink = { path = "../futures-sink", version = "0.3.21", default-features = false }
+futures-util = { path = "../futures-util", version = "0.3.21", default-features = false, features = ["sink"] }
[dev-dependencies]
futures-executor = { path = "../futures-executor", features = ["thread-pool"] }
@@ -47,11 +46,13 @@ thread-pool = ["executor", "futures-executor/thread-pool"]
# These features are outside of the normal semver guarantees and require the
# `unstable` feature as an explicit opt-in to unstable API.
unstable = ["futures-core/unstable", "futures-task/unstable", "futures-channel/unstable", "futures-io/unstable", "futures-util/unstable"]
-cfg-target-has-atomic = ["futures-core/cfg-target-has-atomic", "futures-task/cfg-target-has-atomic", "futures-channel/cfg-target-has-atomic", "futures-util/cfg-target-has-atomic"]
bilock = ["futures-util/bilock"]
-read-initializer = ["futures-io/read-initializer", "futures-util/read-initializer"]
write-all-vectored = ["futures-util/write-all-vectored"]
+# These features are no longer used.
+# TODO: remove in the next major version.
+cfg-target-has-atomic = []
+
[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs"]
diff --git a/METADATA b/METADATA
index e99c2fa..554f4df 100644
--- a/METADATA
+++ b/METADATA
@@ -7,13 +7,13 @@ third_party {
}
url {
type: ARCHIVE
- value: "https://static.crates.io/crates/futures/futures-0.3.13.crate"
+ value: "https://static.crates.io/crates/futures/futures-0.3.21.crate"
}
- version: "0.3.13"
+ version: "0.3.21"
license_type: NOTICE
last_upgrade_date {
- year: 2021
- month: 4
+ year: 2022
+ month: 3
day: 1
}
}
diff --git a/TEST_MAPPING b/TEST_MAPPING
index ec169ff..e2df61d 100644
--- a/TEST_MAPPING
+++ b/TEST_MAPPING
@@ -1,41 +1,39 @@
// Generated by update_crate_tests.py for tests that depend on this crate.
{
- "presubmit": [
- {
- "name": "anyhow_device_test_tests_test_ffi"
- },
- {
- "name": "anyhow_device_test_tests_test_context"
- },
+ "imports": [
{
- "name": "anyhow_device_test_tests_test_repr"
+ "path": "external/rust/crates/anyhow"
},
{
- "name": "anyhow_device_test_tests_test_convert"
- },
+ "path": "external/rust/crates/tokio"
+ }
+ ],
+ "presubmit": [
{
- "name": "anyhow_device_test_tests_test_fmt"
+ "name": "ZipFuseTest"
},
{
- "name": "anyhow_device_test_tests_test_boxed"
+ "name": "authfs_device_test_src_lib"
},
{
- "name": "anyhow_device_test_tests_test_downcast"
+ "name": "doh_unit_test"
},
{
- "name": "anyhow_device_test_tests_test_source"
- },
+ "name": "virtualizationservice_device_test"
+ }
+ ],
+ "presubmit-rust": [
{
- "name": "anyhow_device_test_tests_test_macros"
+ "name": "ZipFuseTest"
},
{
- "name": "anyhow_device_test_src_lib"
+ "name": "authfs_device_test_src_lib"
},
{
- "name": "anyhow_device_test_tests_test_autotrait"
+ "name": "doh_unit_test"
},
{
- "name": "anyhow_device_test_tests_test_chain"
+ "name": "virtualizationservice_device_test"
}
]
}
diff --git a/cargo2android.json b/cargo2android.json
index 01465d0..a7e2a4b 100644
--- a/cargo2android.json
+++ b/cargo2android.json
@@ -1,11 +1,12 @@
{
"apex-available": [
"//apex_available:platform",
+ "com.android.bluetooth",
"com.android.resolv",
"com.android.virt"
],
- "min_sdk_version": "29",
"dependencies": true,
"device": true,
+ "min-sdk-version": "29",
"run": true
-} \ No newline at end of file
+}
diff --git a/src/lib.rs b/src/lib.rs
index de29ace..b8ebc61 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -3,12 +3,12 @@
//! This crate provides a number of core abstractions for writing asynchronous
//! code:
//!
-//! - [Futures](crate::future::Future) are single eventual values produced by
+//! - [Futures](crate::future) are single eventual values produced by
//! asynchronous computations. Some programming languages (e.g. JavaScript)
//! call this concept "promise".
-//! - [Streams](crate::stream::Stream) represent a series of values
+//! - [Streams](crate::stream) represent a series of values
//! produced asynchronously.
-//! - [Sinks](crate::sink::Sink) provide support for asynchronous writing of
+//! - [Sinks](crate::sink) provide support for asynchronous writing of
//! data.
//! - [Executors](crate::executor) are responsible for running asynchronous
//! tasks.
@@ -25,11 +25,12 @@
//! within macros and keywords such as async and await!.
//!
//! ```rust
+//! # if cfg!(miri) { return; } // https://github.com/rust-lang/miri/issues/1038
//! # use futures::channel::mpsc;
//! # use futures::executor; ///standard executors to provide a context for futures and streams
//! # use futures::executor::ThreadPool;
//! # use futures::StreamExt;
-//!
+//! #
//! fn main() {
//! let pool = ThreadPool::new().expect("Failed to build pool");
//! let (tx, rx) = mpsc::unbounded::<i32>();
@@ -67,7 +68,7 @@
//! };
//!
//! // Actually execute the above future, which will invoke Future::poll and
-//! // subsequenty chain appropriate Future::poll and methods needing executors
+//! // subsequently chain appropriate Future::poll and methods needing executors
//! // to drive all futures. Eventually fut_values will be driven to completion.
//! let values: Vec<i32> = executor::block_on(fut_values);
//!
@@ -78,48 +79,46 @@
//! The majority of examples and code snippets in this crate assume that they are
//! inside an async block as written above.
-#![cfg_attr(feature = "cfg-target-has-atomic", feature(cfg_target_has_atomic))]
-#![cfg_attr(feature = "read-initializer", feature(read_initializer))]
-
#![cfg_attr(not(feature = "std"), no_std)]
-
-#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms, unreachable_pub)]
-// It cannot be included in the published code because this lints have false positives in the minimum required version.
-#![cfg_attr(test, warn(single_use_lifetimes))]
-#![warn(clippy::all)]
-#![doc(test(attr(deny(warnings), allow(dead_code, unused_assignments, unused_variables))))]
-
+#![warn(
+ missing_debug_implementations,
+ missing_docs,
+ rust_2018_idioms,
+ single_use_lifetimes,
+ unreachable_pub
+)]
+#![doc(test(
+ no_crate_inject,
+ attr(
+ deny(warnings, rust_2018_idioms, single_use_lifetimes),
+ allow(dead_code, unused_assignments, unused_variables)
+ )
+))]
#![cfg_attr(docsrs, feature(doc_cfg))]
-#[cfg(all(feature = "cfg-target-has-atomic", not(feature = "unstable")))]
-compile_error!("The `cfg-target-has-atomic` feature requires the `unstable` feature as an explicit opt-in to unstable features");
-
#[cfg(all(feature = "bilock", not(feature = "unstable")))]
compile_error!("The `bilock` feature requires the `unstable` feature as an explicit opt-in to unstable features");
-#[cfg(all(feature = "read-initializer", not(feature = "unstable")))]
-compile_error!("The `read-initializer` feature requires the `unstable` feature as an explicit opt-in to unstable features");
-
-#[doc(hidden)]
+#[doc(no_inline)]
pub use futures_core::future::{Future, TryFuture};
-#[doc(hidden)]
+#[doc(no_inline)]
pub use futures_util::future::{FutureExt, TryFutureExt};
-#[doc(hidden)]
+#[doc(no_inline)]
pub use futures_core::stream::{Stream, TryStream};
-#[doc(hidden)]
+#[doc(no_inline)]
pub use futures_util::stream::{StreamExt, TryStreamExt};
-#[doc(hidden)]
+#[doc(no_inline)]
pub use futures_sink::Sink;
-#[doc(hidden)]
+#[doc(no_inline)]
pub use futures_util::sink::SinkExt;
#[cfg(feature = "std")]
-#[doc(hidden)]
+#[doc(no_inline)]
pub use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite};
#[cfg(feature = "std")]
-#[doc(hidden)]
+#[doc(no_inline)]
pub use futures_util::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
// Macro reexports
@@ -135,6 +134,10 @@ pub use futures_util::{join, pending, poll, select_biased, try_join}; // Async-a
#[doc(inline)]
pub use futures_util::{future, never, sink, stream, task};
+#[cfg(feature = "std")]
+#[cfg(feature = "async-await")]
+pub use futures_util::stream_select;
+
#[cfg(feature = "alloc")]
#[doc(inline)]
pub use futures_channel as channel;
@@ -148,13 +151,73 @@ pub use futures_util::io;
#[cfg(feature = "executor")]
#[cfg_attr(docsrs, doc(cfg(feature = "executor")))]
-#[doc(inline)]
-pub use futures_executor as executor;
+pub mod executor {
+ //! Built-in executors and related tools.
+ //!
+ //! All asynchronous computation occurs within an executor, which is
+ //! capable of spawning futures as tasks. This module provides several
+ //! built-in executors, as well as tools for building your own.
+ //!
+ //!
+ //! This module is only available when the `executor` feature of this
+ //! library is activated.
+ //!
+ //! # Using a thread pool (M:N task scheduling)
+ //!
+ //! Most of the time tasks should be executed on a [thread pool](ThreadPool).
+ //! A small set of worker threads can handle a very large set of spawned tasks
+ //! (which are much lighter weight than threads). Tasks spawned onto the pool
+ //! with the [`spawn_ok`](ThreadPool::spawn_ok) function will run ambiently on
+ //! the created threads.
+ //!
+ //! # Spawning additional tasks
+ //!
+ //! Tasks can be spawned onto a spawner by calling its [`spawn_obj`] method
+ //! directly. In the case of `!Send` futures, [`spawn_local_obj`] can be used
+ //! instead.
+ //!
+ //! # Single-threaded execution
+ //!
+ //! In addition to thread pools, it's possible to run a task (and the tasks
+ //! it spawns) entirely within a single thread via the [`LocalPool`] executor.
+ //! Aside from cutting down on synchronization costs, this executor also makes
+ //! it possible to spawn non-`Send` tasks, via [`spawn_local_obj`]. The
+ //! [`LocalPool`] is best suited for running I/O-bound tasks that do relatively
+ //! little work between I/O operations.
+ //!
+ //! There is also a convenience function [`block_on`] for simply running a
+ //! future to completion on the current thread.
+ //!
+ //! [`spawn_obj`]: https://docs.rs/futures/0.3/futures/task/trait.Spawn.html#tymethod.spawn_obj
+ //! [`spawn_local_obj`]: https://docs.rs/futures/0.3/futures/task/trait.LocalSpawn.html#tymethod.spawn_local_obj
+
+ pub use futures_executor::{
+ block_on, block_on_stream, enter, BlockingStream, Enter, EnterError, LocalPool,
+ LocalSpawner,
+ };
+
+ #[cfg(feature = "thread-pool")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "thread-pool")))]
+ pub use futures_executor::{ThreadPool, ThreadPoolBuilder};
+}
#[cfg(feature = "compat")]
#[cfg_attr(docsrs, doc(cfg(feature = "compat")))]
-#[doc(inline)]
-pub use futures_util::compat;
+pub mod compat {
+ //! Interop between `futures` 0.1 and 0.3.
+ //!
+ //! This module is only available when the `compat` feature of this
+ //! library is activated.
+
+ pub use futures_util::compat::{
+ Compat, Compat01As03, Compat01As03Sink, CompatSink, Executor01As03, Executor01CompatExt,
+ Executor01Future, Future01CompatExt, Sink01CompatExt, Stream01CompatExt,
+ };
+
+ #[cfg(feature = "io-compat")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
+ pub use futures_util::compat::{AsyncRead01CompatExt, AsyncWrite01CompatExt};
+}
pub mod prelude {
//! A "prelude" for crates using the `futures` crate.
@@ -175,10 +238,12 @@ pub mod prelude {
pub use crate::stream::{self, Stream, TryStream};
#[doc(no_inline)]
+ #[allow(unreachable_pub)]
pub use crate::future::{FutureExt as _, TryFutureExt as _};
#[doc(no_inline)]
pub use crate::sink::SinkExt as _;
#[doc(no_inline)]
+ #[allow(unreachable_pub)]
pub use crate::stream::{StreamExt as _, TryStreamExt as _};
#[cfg(feature = "std")]
@@ -186,6 +251,7 @@ pub mod prelude {
#[cfg(feature = "std")]
#[doc(no_inline)]
+ #[allow(unreachable_pub)]
pub use crate::io::{
AsyncBufReadExt as _, AsyncReadExt as _, AsyncSeekExt as _, AsyncWriteExt as _,
};
diff --git a/tests/_require_features.rs b/tests/_require_features.rs
index da76dcd..8046cc9 100644
--- a/tests/_require_features.rs
+++ b/tests/_require_features.rs
@@ -1,8 +1,13 @@
#[cfg(not(all(
- feature = "std", feature = "alloc", feature = "async-await",
- feature = "compat", feature = "io-compat",
- feature = "executor", feature = "thread-pool",
+ feature = "std",
+ feature = "alloc",
+ feature = "async-await",
+ feature = "compat",
+ feature = "io-compat",
+ feature = "executor",
+ feature = "thread-pool",
)))]
-compile_error!("`futures` tests must have all stable features activated: \
+compile_error!(
+ "`futures` tests must have all stable features activated: \
use `--all-features` or `--features default,thread-pool,io-compat`"
);
diff --git a/tests/arc_wake.rs b/tests/arc_wake.rs
deleted file mode 100644
index d19a83d..0000000
--- a/tests/arc_wake.rs
+++ /dev/null
@@ -1,82 +0,0 @@
-mod countingwaker {
- use futures::task::{self, ArcWake, Waker};
- use std::sync::{Arc, Mutex};
-
- struct CountingWaker {
- nr_wake: Mutex<i32>,
- }
-
- impl CountingWaker {
- fn new() -> Self {
- Self {
- nr_wake: Mutex::new(0),
- }
- }
-
- fn wakes(&self) -> i32 {
- *self.nr_wake.lock().unwrap()
- }
- }
-
- impl ArcWake for CountingWaker {
- fn wake_by_ref(arc_self: &Arc<Self>) {
- let mut lock = arc_self.nr_wake.lock().unwrap();
- *lock += 1;
- }
- }
-
- #[test]
- fn create_from_arc() {
- let some_w = Arc::new(CountingWaker::new());
-
- let w1: Waker = task::waker(some_w.clone());
- assert_eq!(2, Arc::strong_count(&some_w));
- w1.wake_by_ref();
- assert_eq!(1, some_w.wakes());
-
- let w2 = w1.clone();
- assert_eq!(3, Arc::strong_count(&some_w));
-
- w2.wake_by_ref();
- assert_eq!(2, some_w.wakes());
-
- drop(w2);
- assert_eq!(2, Arc::strong_count(&some_w));
- drop(w1);
- assert_eq!(1, Arc::strong_count(&some_w));
- }
-
- #[test]
- fn ref_wake_same() {
- let some_w = Arc::new(CountingWaker::new());
-
- let w1: Waker = task::waker(some_w.clone());
- let w2 = task::waker_ref(&some_w);
- let w3 = w2.clone();
-
- assert!(w1.will_wake(&w2));
- assert!(w2.will_wake(&w3));
- }
-}
-
-#[test]
-fn proper_refcount_on_wake_panic() {
- use futures::task::{self, ArcWake, Waker};
- use std::sync::Arc;
-
- struct PanicWaker;
-
- impl ArcWake for PanicWaker {
- fn wake_by_ref(_arc_self: &Arc<Self>) {
- panic!("WAKE UP");
- }
- }
-
- let some_w = Arc::new(PanicWaker);
-
- let w1: Waker = task::waker(some_w.clone());
- assert_eq!("WAKE UP", *std::panic::catch_unwind(|| w1.wake_by_ref()).unwrap_err().downcast::<&str>().unwrap());
- assert_eq!(2, Arc::strong_count(&some_w)); // some_w + w1
- drop(w1);
- assert_eq!(1, Arc::strong_count(&some_w)); // some_w
-}
diff --git a/tests/async_await_macros.rs b/tests/async_await_macros.rs
index bd586d6..ce1f3a3 100644
--- a/tests/async_await_macros.rs
+++ b/tests/async_await_macros.rs
@@ -1,9 +1,16 @@
+use futures::channel::{mpsc, oneshot};
+use futures::executor::block_on;
+use futures::future::{self, poll_fn, FutureExt};
+use futures::sink::SinkExt;
+use futures::stream::StreamExt;
+use futures::task::{Context, Poll};
+use futures::{
+ join, pending, pin_mut, poll, select, select_biased, stream, stream_select, try_join,
+};
+use std::mem;
+
#[test]
fn poll_and_pending() {
- use futures::{pending, pin_mut, poll};
- use futures::executor::block_on;
- use futures::task::Poll;
-
let pending_once = async { pending!() };
block_on(async {
pin_mut!(pending_once);
@@ -14,11 +21,6 @@ fn poll_and_pending() {
#[test]
fn join() {
- use futures::{pin_mut, poll, join};
- use futures::channel::oneshot;
- use futures::executor::block_on;
- use futures::task::Poll;
-
let (tx1, rx1) = oneshot::channel::<i32>();
let (tx2, rx2) = oneshot::channel::<i32>();
@@ -39,11 +41,6 @@ fn join() {
#[test]
fn select() {
- use futures::select;
- use futures::channel::oneshot;
- use futures::executor::block_on;
- use futures::future::FutureExt;
-
let (tx1, rx1) = oneshot::channel::<i32>();
let (_tx2, rx2) = oneshot::channel::<i32>();
tx1.send(1).unwrap();
@@ -62,11 +59,6 @@ fn select() {
#[test]
fn select_biased() {
- use futures::channel::oneshot;
- use futures::executor::block_on;
- use futures::future::FutureExt;
- use futures::select_biased;
-
let (tx1, rx1) = oneshot::channel::<i32>();
let (_tx2, rx2) = oneshot::channel::<i32>();
tx1.send(1).unwrap();
@@ -85,12 +77,6 @@ fn select_biased() {
#[test]
fn select_streams() {
- use futures::select;
- use futures::channel::mpsc;
- use futures::executor::block_on;
- use futures::sink::SinkExt;
- use futures::stream::StreamExt;
-
let (mut tx1, rx1) = mpsc::channel::<i32>(1);
let (mut tx2, rx2) = mpsc::channel::<i32>(1);
let mut rx1 = rx1.fuse();
@@ -134,11 +120,6 @@ fn select_streams() {
#[test]
fn select_can_move_uncompleted_futures() {
- use futures::select;
- use futures::channel::oneshot;
- use futures::executor::block_on;
- use futures::future::FutureExt;
-
let (tx1, rx1) = oneshot::channel::<i32>();
let (tx2, rx2) = oneshot::channel::<i32>();
tx1.send(1).unwrap();
@@ -165,10 +146,6 @@ fn select_can_move_uncompleted_futures() {
#[test]
fn select_nested() {
- use futures::select;
- use futures::executor::block_on;
- use futures::future;
-
let mut outer_fut = future::ready(1);
let mut inner_fut = future::ready(2);
let res = block_on(async {
@@ -183,18 +160,16 @@ fn select_nested() {
assert_eq!(res, 3);
}
+#[cfg_attr(not(target_pointer_width = "64"), ignore)]
#[test]
fn select_size() {
- use futures::select;
- use futures::future;
-
let fut = async {
let mut ready = future::ready(0i32);
select! {
_ = ready => {},
}
};
- assert_eq!(::std::mem::size_of_val(&fut), 24);
+ assert_eq!(mem::size_of_val(&fut), 24);
let fut = async {
let mut ready1 = future::ready(0i32);
@@ -204,19 +179,13 @@ fn select_size() {
_ = ready2 => {},
}
};
- assert_eq!(::std::mem::size_of_val(&fut), 40);
+ assert_eq!(mem::size_of_val(&fut), 40);
}
#[test]
fn select_on_non_unpin_expressions() {
- use futures::select;
- use futures::executor::block_on;
- use futures::future::FutureExt;
-
// The returned Future is !Unpin
- let make_non_unpin_fut = || { async {
- 5
- }};
+ let make_non_unpin_fut = || async { 5 };
let res = block_on(async {
let select_res;
@@ -231,14 +200,8 @@ fn select_on_non_unpin_expressions() {
#[test]
fn select_on_non_unpin_expressions_with_default() {
- use futures::select;
- use futures::executor::block_on;
- use futures::future::FutureExt;
-
// The returned Future is !Unpin
- let make_non_unpin_fut = || { async {
- 5
- }};
+ let make_non_unpin_fut = || async { 5 };
let res = block_on(async {
let select_res;
@@ -252,15 +215,11 @@ fn select_on_non_unpin_expressions_with_default() {
assert_eq!(res, 5);
}
+#[cfg_attr(not(target_pointer_width = "64"), ignore)]
#[test]
fn select_on_non_unpin_size() {
- use futures::select;
- use futures::future::FutureExt;
-
// The returned Future is !Unpin
- let make_non_unpin_fut = || { async {
- 5
- }};
+ let make_non_unpin_fut = || async { 5 };
let fut = async {
let select_res;
@@ -271,15 +230,11 @@ fn select_on_non_unpin_size() {
select_res
};
- assert_eq!(32, std::mem::size_of_val(&fut));
+ assert_eq!(32, mem::size_of_val(&fut));
}
#[test]
fn select_can_be_used_as_expression() {
- use futures::select;
- use futures::executor::block_on;
- use futures::future;
-
block_on(async {
let res = select! {
x = future::ready(7) => x,
@@ -291,11 +246,6 @@ fn select_can_be_used_as_expression() {
#[test]
fn select_with_default_can_be_used_as_expression() {
- use futures::select;
- use futures::executor::block_on;
- use futures::future::{FutureExt, poll_fn};
- use futures::task::{Context, Poll};
-
fn poll_always_pending<T>(_cx: &mut Context<'_>) -> Poll<T> {
Poll::Pending
}
@@ -312,10 +262,6 @@ fn select_with_default_can_be_used_as_expression() {
#[test]
fn select_with_complete_can_be_used_as_expression() {
- use futures::select;
- use futures::executor::block_on;
- use futures::future;
-
block_on(async {
let res = select! {
x = future::pending::<i32>() => x,
@@ -330,10 +276,6 @@ fn select_with_complete_can_be_used_as_expression() {
#[test]
#[allow(unused_assignments)]
fn select_on_mutable_borrowing_future_with_same_borrow_in_block() {
- use futures::select;
- use futures::executor::block_on;
- use futures::future::FutureExt;
-
async fn require_mutable(_: &mut i32) {}
async fn async_noop() {}
@@ -351,10 +293,6 @@ fn select_on_mutable_borrowing_future_with_same_borrow_in_block() {
#[test]
#[allow(unused_assignments)]
fn select_on_mutable_borrowing_future_with_same_borrow_in_block_and_default() {
- use futures::select;
- use futures::executor::block_on;
- use futures::future::FutureExt;
-
async fn require_mutable(_: &mut i32) {}
async fn async_noop() {}
@@ -373,60 +311,79 @@ fn select_on_mutable_borrowing_future_with_same_borrow_in_block_and_default() {
}
#[test]
-fn join_size() {
- use futures::join;
- use futures::future;
+#[allow(unused_assignments)]
+fn stream_select() {
+ // stream_select! macro
+ block_on(async {
+ let endless_ints = |i| stream::iter(vec![i].into_iter().cycle());
+
+ let mut endless_ones = stream_select!(endless_ints(1i32), stream::pending());
+ assert_eq!(endless_ones.next().await, Some(1));
+ assert_eq!(endless_ones.next().await, Some(1));
+
+ let mut finite_list =
+ stream_select!(stream::iter(vec![1].into_iter()), stream::iter(vec![1].into_iter()));
+ assert_eq!(finite_list.next().await, Some(1));
+ assert_eq!(finite_list.next().await, Some(1));
+ assert_eq!(finite_list.next().await, None);
+
+ let endless_mixed = stream_select!(endless_ints(1i32), endless_ints(2), endless_ints(3));
+ // Take 1000, and assert a somewhat even distribution of values.
+ // The fairness is randomized, but over 1000 samples we should be pretty close to even.
+ // This test may be a bit flaky. Feel free to adjust the margins as you see fit.
+ let mut count = 0;
+ let results = endless_mixed
+ .take_while(move |_| {
+ count += 1;
+ let ret = count < 1000;
+ async move { ret }
+ })
+ .collect::<Vec<_>>()
+ .await;
+ assert!(results.iter().filter(|x| **x == 1).count() >= 299);
+ assert!(results.iter().filter(|x| **x == 2).count() >= 299);
+ assert!(results.iter().filter(|x| **x == 3).count() >= 299);
+ });
+}
+#[test]
+fn join_size() {
let fut = async {
let ready = future::ready(0i32);
join!(ready)
};
- assert_eq!(::std::mem::size_of_val(&fut), 16);
+ assert_eq!(mem::size_of_val(&fut), 16);
let fut = async {
let ready1 = future::ready(0i32);
let ready2 = future::ready(0i32);
join!(ready1, ready2)
};
- assert_eq!(::std::mem::size_of_val(&fut), 28);
+ assert_eq!(mem::size_of_val(&fut), 28);
}
#[test]
fn try_join_size() {
- use futures::try_join;
- use futures::future;
-
let fut = async {
let ready = future::ready(Ok::<i32, i32>(0));
try_join!(ready)
};
- assert_eq!(::std::mem::size_of_val(&fut), 16);
+ assert_eq!(mem::size_of_val(&fut), 16);
let fut = async {
let ready1 = future::ready(Ok::<i32, i32>(0));
let ready2 = future::ready(Ok::<i32, i32>(0));
try_join!(ready1, ready2)
};
- assert_eq!(::std::mem::size_of_val(&fut), 28);
+ assert_eq!(mem::size_of_val(&fut), 28);
}
#[test]
fn join_doesnt_require_unpin() {
- use futures::join;
-
- let _ = async {
- join!(async {}, async {})
- };
+ let _ = async { join!(async {}, async {}) };
}
#[test]
fn try_join_doesnt_require_unpin() {
- use futures::try_join;
-
- let _ = async {
- try_join!(
- async { Ok::<(), ()>(()) },
- async { Ok::<(), ()>(()) },
- )
- };
+ let _ = async { try_join!(async { Ok::<(), ()>(()) }, async { Ok::<(), ()>(()) },) };
}
diff --git a/tests/auto_traits.rs b/tests/auto_traits.rs
index 111fdf6..b3d8b00 100644
--- a/tests/auto_traits.rs
+++ b/tests/auto_traits.rs
@@ -470,6 +470,13 @@ pub mod future {
assert_not_impl!(PollFn<*const ()>: Sync);
assert_impl!(PollFn<PhantomPinned>: Unpin);
+ assert_impl!(PollImmediate<SendStream>: Send);
+ assert_not_impl!(PollImmediate<LocalStream<()>>: Send);
+ assert_impl!(PollImmediate<SyncStream>: Sync);
+ assert_not_impl!(PollImmediate<LocalStream<()>>: Sync);
+ assert_impl!(PollImmediate<UnpinStream>: Unpin);
+ assert_not_impl!(PollImmediate<PinnedStream>: Unpin);
+
assert_impl!(Ready<()>: Send);
assert_not_impl!(Ready<*const ()>: Send);
assert_impl!(Ready<()>: Sync);
@@ -810,6 +817,12 @@ pub mod io {
assert_impl!(Seek<'_, ()>: Unpin);
assert_not_impl!(Seek<'_, PhantomPinned>: Unpin);
+ assert_impl!(SeeKRelative<'_, ()>: Send);
+ assert_not_impl!(SeeKRelative<'_, *const ()>: Send);
+ assert_impl!(SeeKRelative<'_, ()>: Sync);
+ assert_not_impl!(SeeKRelative<'_, *const ()>: Sync);
+ assert_impl!(SeeKRelative<'_, PhantomPinned>: Unpin);
+
assert_impl!(Sink: Send);
assert_impl!(Sink: Sync);
assert_impl!(Sink: Unpin);
@@ -1383,6 +1396,26 @@ pub mod stream {
assert_impl!(Next<'_, ()>: Unpin);
assert_not_impl!(Next<'_, PhantomPinned>: Unpin);
+ assert_impl!(NextIf<'_, SendStream<()>, ()>: Send);
+ assert_not_impl!(NextIf<'_, SendStream<()>, *const ()>: Send);
+ assert_not_impl!(NextIf<'_, SendStream, ()>: Send);
+ assert_not_impl!(NextIf<'_, LocalStream<()>, ()>: Send);
+ assert_impl!(NextIf<'_, SyncStream<()>, ()>: Sync);
+ assert_not_impl!(NextIf<'_, SyncStream<()>, *const ()>: Sync);
+ assert_not_impl!(NextIf<'_, SyncStream, ()>: Sync);
+ assert_not_impl!(NextIf<'_, LocalStream<()>, ()>: Send);
+ assert_impl!(NextIf<'_, PinnedStream, PhantomPinned>: Unpin);
+
+ assert_impl!(NextIfEq<'_, SendStream<()>, ()>: Send);
+ assert_not_impl!(NextIfEq<'_, SendStream<()>, *const ()>: Send);
+ assert_not_impl!(NextIfEq<'_, SendStream, ()>: Send);
+ assert_not_impl!(NextIfEq<'_, LocalStream<()>, ()>: Send);
+ assert_impl!(NextIfEq<'_, SyncStream<()>, ()>: Sync);
+ assert_not_impl!(NextIfEq<'_, SyncStream<()>, *const ()>: Sync);
+ assert_not_impl!(NextIfEq<'_, SyncStream, ()>: Sync);
+ assert_not_impl!(NextIfEq<'_, LocalStream<()>, ()>: Send);
+ assert_impl!(NextIfEq<'_, PinnedStream, PhantomPinned>: Unpin);
+
assert_impl!(Once<()>: Send);
assert_not_impl!(Once<*const ()>: Send);
assert_impl!(Once<()>: Sync);
@@ -1410,6 +1443,14 @@ pub mod stream {
assert_not_impl!(Peek<'_, LocalStream<()>>: Sync);
assert_impl!(Peek<'_, PinnedStream>: Unpin);
+ assert_impl!(PeekMut<'_, SendStream<()>>: Send);
+ assert_not_impl!(PeekMut<'_, SendStream>: Send);
+ assert_not_impl!(PeekMut<'_, LocalStream<()>>: Send);
+ assert_impl!(PeekMut<'_, SyncStream<()>>: Sync);
+ assert_not_impl!(PeekMut<'_, SyncStream>: Sync);
+ assert_not_impl!(PeekMut<'_, LocalStream<()>>: Sync);
+ assert_impl!(PeekMut<'_, PinnedStream>: Unpin);
+
assert_impl!(Peekable<SendStream<()>>: Send);
assert_not_impl!(Peekable<SendStream>: Send);
assert_not_impl!(Peekable<LocalStream>: Send);
@@ -1431,6 +1472,13 @@ pub mod stream {
assert_not_impl!(PollFn<*const ()>: Sync);
assert_impl!(PollFn<PhantomPinned>: Unpin);
+ assert_impl!(PollImmediate<SendStream>: Send);
+ assert_not_impl!(PollImmediate<LocalStream<()>>: Send);
+ assert_impl!(PollImmediate<SyncStream>: Sync);
+ assert_not_impl!(PollImmediate<LocalStream<()>>: Sync);
+ assert_impl!(PollImmediate<UnpinStream>: Unpin);
+ assert_not_impl!(PollImmediate<PinnedStream>: Unpin);
+
assert_impl!(ReadyChunks<SendStream<()>>: Send);
assert_not_impl!(ReadyChunks<SendStream>: Send);
assert_not_impl!(ReadyChunks<LocalStream>: Send);
@@ -1780,25 +1828,40 @@ pub mod stream {
assert_not_impl!(Zip<UnpinStream, PinnedStream>: Unpin);
assert_not_impl!(Zip<PinnedStream, UnpinStream>: Unpin);
- assert_not_impl!(futures_unordered::Iter<()>: Send);
- assert_not_impl!(futures_unordered::Iter<()>: Sync);
+ assert_impl!(futures_unordered::Iter<()>: Send);
+ assert_not_impl!(futures_unordered::Iter<*const ()>: Send);
+ assert_impl!(futures_unordered::Iter<()>: Sync);
+ assert_not_impl!(futures_unordered::Iter<*const ()>: Sync);
assert_impl!(futures_unordered::Iter<()>: Unpin);
- // futures_unordered::Iter requires `Fut: Unpin`
+ // The definition of futures_unordered::Iter has `Fut: Unpin` bounds.
// assert_not_impl!(futures_unordered::Iter<PhantomPinned>: Unpin);
- assert_not_impl!(futures_unordered::IterMut<()>: Send);
- assert_not_impl!(futures_unordered::IterMut<()>: Sync);
+ assert_impl!(futures_unordered::IterMut<()>: Send);
+ assert_not_impl!(futures_unordered::IterMut<*const ()>: Send);
+ assert_impl!(futures_unordered::IterMut<()>: Sync);
+ assert_not_impl!(futures_unordered::IterMut<*const ()>: Sync);
assert_impl!(futures_unordered::IterMut<()>: Unpin);
- // futures_unordered::IterMut requires `Fut: Unpin`
+ // The definition of futures_unordered::IterMut has `Fut: Unpin` bounds.
// assert_not_impl!(futures_unordered::IterMut<PhantomPinned>: Unpin);
- assert_not_impl!(futures_unordered::IterPinMut<()>: Send);
- assert_not_impl!(futures_unordered::IterPinMut<()>: Sync);
+ assert_impl!(futures_unordered::IterPinMut<()>: Send);
+ assert_not_impl!(futures_unordered::IterPinMut<*const ()>: Send);
+ assert_impl!(futures_unordered::IterPinMut<()>: Sync);
+ assert_not_impl!(futures_unordered::IterPinMut<*const ()>: Sync);
assert_impl!(futures_unordered::IterPinMut<PhantomPinned>: Unpin);
- assert_not_impl!(futures_unordered::IterPinRef<()>: Send);
- assert_not_impl!(futures_unordered::IterPinRef<()>: Sync);
+ assert_impl!(futures_unordered::IterPinRef<()>: Send);
+ assert_not_impl!(futures_unordered::IterPinRef<*const ()>: Send);
+ assert_impl!(futures_unordered::IterPinRef<()>: Sync);
+ assert_not_impl!(futures_unordered::IterPinRef<*const ()>: Sync);
assert_impl!(futures_unordered::IterPinRef<PhantomPinned>: Unpin);
+
+ assert_impl!(futures_unordered::IntoIter<()>: Send);
+ assert_not_impl!(futures_unordered::IntoIter<*const ()>: Send);
+ assert_impl!(futures_unordered::IntoIter<()>: Sync);
+ assert_not_impl!(futures_unordered::IntoIter<*const ()>: Sync);
+ // The definition of futures_unordered::IntoIter has `Fut: Unpin` bounds.
+ // assert_not_impl!(futures_unordered::IntoIter<PhantomPinned>: Unpin);
}
/// Assert Send/Sync/Unpin for all public types in `futures::task`.
diff --git a/tests/compat.rs b/tests/compat.rs
index 39adc7c..ac04a95 100644
--- a/tests/compat.rs
+++ b/tests/compat.rs
@@ -1,16 +1,15 @@
#![cfg(feature = "compat")]
+#![cfg(not(miri))] // Miri does not support epoll
-use tokio::timer::Delay;
-use tokio::runtime::Runtime;
-use std::time::Instant;
-use futures::prelude::*;
use futures::compat::Future01CompatExt;
+use futures::prelude::*;
+use std::time::Instant;
+use tokio::runtime::Runtime;
+use tokio::timer::Delay;
#[test]
fn can_use_01_futures_in_a_03_future_running_on_a_01_executor() {
- let f = async {
- Delay::new(Instant::now()).compat().await
- };
+ let f = async { Delay::new(Instant::now()).compat().await };
let mut runtime = Runtime::new().unwrap();
runtime.block_on(f.boxed().compat()).unwrap();
diff --git a/tests/eager_drop.rs b/tests/eager_drop.rs
index 11edb1b..9925077 100644
--- a/tests/eager_drop.rs
+++ b/tests/eager_drop.rs
@@ -1,16 +1,23 @@
+use futures::channel::oneshot;
+use futures::future::{self, Future, FutureExt, TryFutureExt};
+use futures::task::{Context, Poll};
+use futures_test::future::FutureTestExt;
+use pin_project::pin_project;
+use std::pin::Pin;
+use std::sync::mpsc;
+
#[test]
fn map_ok() {
- use futures::future::{self, FutureExt, TryFutureExt};
- use futures_test::future::FutureTestExt;
- use std::sync::mpsc;
-
// The closure given to `map_ok` should have been dropped by the time `map`
// runs.
let (tx1, rx1) = mpsc::channel::<()>();
let (tx2, rx2) = mpsc::channel::<()>();
future::ready::<Result<i32, i32>>(Err(1))
- .map_ok(move |_| { let _tx1 = tx1; panic!("should not run"); })
+ .map_ok(move |_| {
+ let _tx1 = tx1;
+ panic!("should not run");
+ })
.map(move |_| {
assert!(rx1.recv().is_err());
tx2.send(()).unwrap()
@@ -22,17 +29,16 @@ fn map_ok() {
#[test]
fn map_err() {
- use futures::future::{self, FutureExt, TryFutureExt};
- use futures_test::future::FutureTestExt;
- use std::sync::mpsc;
-
// The closure given to `map_err` should have been dropped by the time `map`
// runs.
let (tx1, rx1) = mpsc::channel::<()>();
let (tx2, rx2) = mpsc::channel::<()>();
future::ready::<Result<i32, i32>>(Ok(1))
- .map_err(move |_| { let _tx1 = tx1; panic!("should not run"); })
+ .map_err(move |_| {
+ let _tx1 = tx1;
+ panic!("should not run");
+ })
.map(move |_| {
assert!(rx1.recv().is_err());
tx2.send(()).unwrap()
@@ -42,96 +48,74 @@ fn map_err() {
rx2.recv().unwrap();
}
-mod channelled {
- use futures::future::Future;
- use futures::task::{Context,Poll};
- use pin_project::pin_project;
- use std::pin::Pin;
-
- #[pin_project]
- struct FutureData<F, T> {
- _data: T,
- #[pin]
- future: F,
- }
+#[pin_project]
+struct FutureData<F, T> {
+ _data: T,
+ #[pin]
+ future: F,
+}
- impl<F: Future, T: Send + 'static> Future for FutureData<F, T> {
- type Output = F::Output;
+impl<F: Future, T: Send + 'static> Future for FutureData<F, T> {
+ type Output = F::Output;
- fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F::Output> {
- self.project().future.poll(cx)
- }
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F::Output> {
+ self.project().future.poll(cx)
}
+}
- #[test]
- fn then_drops_eagerly() {
- use futures::channel::oneshot;
- use futures::future::{self, FutureExt, TryFutureExt};
- use futures_test::future::FutureTestExt;
- use std::sync::mpsc;
-
- let (tx0, rx0) = oneshot::channel::<()>();
- let (tx1, rx1) = mpsc::channel::<()>();
- let (tx2, rx2) = mpsc::channel::<()>();
-
- FutureData { _data: tx1, future: rx0.unwrap_or_else(|_| { panic!() }) }
- .then(move |_| {
- assert!(rx1.recv().is_err()); // tx1 should have been dropped
- tx2.send(()).unwrap();
- future::ready(())
- })
- .run_in_background();
-
- assert_eq!(Err(mpsc::TryRecvError::Empty), rx2.try_recv());
- tx0.send(()).unwrap();
- rx2.recv().unwrap();
- }
+#[test]
+fn then_drops_eagerly() {
+ let (tx0, rx0) = oneshot::channel::<()>();
+ let (tx1, rx1) = mpsc::channel::<()>();
+ let (tx2, rx2) = mpsc::channel::<()>();
- #[test]
- fn and_then_drops_eagerly() {
- use futures::channel::oneshot;
- use futures::future::{self, TryFutureExt};
- use futures_test::future::FutureTestExt;
- use std::sync::mpsc;
-
- let (tx0, rx0) = oneshot::channel::<Result<(), ()>>();
- let (tx1, rx1) = mpsc::channel::<()>();
- let (tx2, rx2) = mpsc::channel::<()>();
-
- FutureData { _data: tx1, future: rx0.unwrap_or_else(|_| { panic!() }) }
- .and_then(move |_| {
- assert!(rx1.recv().is_err()); // tx1 should have been dropped
- tx2.send(()).unwrap();
- future::ready(Ok(()))
- })
- .run_in_background();
-
- assert_eq!(Err(mpsc::TryRecvError::Empty), rx2.try_recv());
- tx0.send(Ok(())).unwrap();
- rx2.recv().unwrap();
- }
+ FutureData { _data: tx1, future: rx0.unwrap_or_else(|_| panic!()) }
+ .then(move |_| {
+ assert!(rx1.recv().is_err()); // tx1 should have been dropped
+ tx2.send(()).unwrap();
+ future::ready(())
+ })
+ .run_in_background();
- #[test]
- fn or_else_drops_eagerly() {
- use futures::channel::oneshot;
- use futures::future::{self, TryFutureExt};
- use futures_test::future::FutureTestExt;
- use std::sync::mpsc;
-
- let (tx0, rx0) = oneshot::channel::<Result<(), ()>>();
- let (tx1, rx1) = mpsc::channel::<()>();
- let (tx2, rx2) = mpsc::channel::<()>();
-
- FutureData { _data: tx1, future: rx0.unwrap_or_else(|_| { panic!() }) }
- .or_else(move |_| {
- assert!(rx1.recv().is_err()); // tx1 should have been dropped
- tx2.send(()).unwrap();
- future::ready::<Result<(), ()>>(Ok(()))
- })
- .run_in_background();
-
- assert_eq!(Err(mpsc::TryRecvError::Empty), rx2.try_recv());
- tx0.send(Err(())).unwrap();
- rx2.recv().unwrap();
- }
+ assert_eq!(Err(mpsc::TryRecvError::Empty), rx2.try_recv());
+ tx0.send(()).unwrap();
+ rx2.recv().unwrap();
+}
+
+#[test]
+fn and_then_drops_eagerly() {
+ let (tx0, rx0) = oneshot::channel::<Result<(), ()>>();
+ let (tx1, rx1) = mpsc::channel::<()>();
+ let (tx2, rx2) = mpsc::channel::<()>();
+
+ FutureData { _data: tx1, future: rx0.unwrap_or_else(|_| panic!()) }
+ .and_then(move |_| {
+ assert!(rx1.recv().is_err()); // tx1 should have been dropped
+ tx2.send(()).unwrap();
+ future::ready(Ok(()))
+ })
+ .run_in_background();
+
+ assert_eq!(Err(mpsc::TryRecvError::Empty), rx2.try_recv());
+ tx0.send(Ok(())).unwrap();
+ rx2.recv().unwrap();
+}
+
+#[test]
+fn or_else_drops_eagerly() {
+ let (tx0, rx0) = oneshot::channel::<Result<(), ()>>();
+ let (tx1, rx1) = mpsc::channel::<()>();
+ let (tx2, rx2) = mpsc::channel::<()>();
+
+ FutureData { _data: tx1, future: rx0.unwrap_or_else(|_| panic!()) }
+ .or_else(move |_| {
+ assert!(rx1.recv().is_err()); // tx1 should have been dropped
+ tx2.send(()).unwrap();
+ future::ready::<Result<(), ()>>(Ok(()))
+ })
+ .run_in_background();
+
+ assert_eq!(Err(mpsc::TryRecvError::Empty), rx2.try_recv());
+ tx0.send(Err(())).unwrap();
+ rx2.recv().unwrap();
}
diff --git a/tests/eventual.rs b/tests/eventual.rs
index bff000d..3461380 100644
--- a/tests/eventual.rs
+++ b/tests/eventual.rs
@@ -1,3 +1,5 @@
+#![cfg(not(miri))] // https://github.com/rust-lang/miri/issues/1038
+
use futures::channel::oneshot;
use futures::executor::ThreadPool;
use futures::future::{self, ok, Future, FutureExt, TryFutureExt};
diff --git a/tests/abortable.rs b/tests/future_abortable.rs
index 6b5a25c..e119f0b 100644
--- a/tests/abortable.rs
+++ b/tests/future_abortable.rs
@@ -1,45 +1,44 @@
+use futures::channel::oneshot;
+use futures::executor::block_on;
+use futures::future::{abortable, Aborted, FutureExt};
+use futures::task::{Context, Poll};
+use futures_test::task::new_count_waker;
+
#[test]
fn abortable_works() {
- use futures::channel::oneshot;
- use futures::future::{abortable, Aborted};
- use futures::executor::block_on;
-
let (_tx, a_rx) = oneshot::channel::<()>();
let (abortable_rx, abort_handle) = abortable(a_rx);
abort_handle.abort();
+ assert!(abortable_rx.is_aborted());
assert_eq!(Err(Aborted), block_on(abortable_rx));
}
#[test]
fn abortable_awakens() {
- use futures::channel::oneshot;
- use futures::future::{abortable, Aborted, FutureExt};
- use futures::task::{Context, Poll};
- use futures_test::task::new_count_waker;
-
let (_tx, a_rx) = oneshot::channel::<()>();
let (mut abortable_rx, abort_handle) = abortable(a_rx);
let (waker, counter) = new_count_waker();
let mut cx = Context::from_waker(&waker);
+
assert_eq!(counter, 0);
assert_eq!(Poll::Pending, abortable_rx.poll_unpin(&mut cx));
assert_eq!(counter, 0);
+
abort_handle.abort();
assert_eq!(counter, 1);
+ assert!(abortable_rx.is_aborted());
assert_eq!(Poll::Ready(Err(Aborted)), abortable_rx.poll_unpin(&mut cx));
}
#[test]
fn abortable_resolves() {
- use futures::channel::oneshot;
- use futures::future::abortable;
- use futures::executor::block_on;
let (tx, a_rx) = oneshot::channel::<()>();
let (abortable_rx, _abort_handle) = abortable(a_rx);
tx.send(()).unwrap();
+ assert!(!abortable_rx.is_aborted());
assert_eq!(Ok(Ok(())), block_on(abortable_rx));
}
diff --git a/tests/basic_combinators.rs b/tests/future_basic_combinators.rs
index fa65b6f..372ab48 100644
--- a/tests/basic_combinators.rs
+++ b/tests/future_basic_combinators.rs
@@ -13,17 +13,21 @@ fn basic_future_combinators() {
tx1.send(x).unwrap(); // Send 1
tx1.send(2).unwrap(); // Send 2
future::ready(3)
- }).map(move |x| {
+ })
+ .map(move |x| {
tx2.send(x).unwrap(); // Send 3
tx2.send(4).unwrap(); // Send 4
5
- }).map(move |x| {
+ })
+ .map(move |x| {
tx3.send(x).unwrap(); // Send 5
});
assert!(rx.try_recv().is_err()); // Not started yet
fut.run_in_background(); // Start it
- for i in 1..=5 { assert_eq!(rx.recv(), Ok(i)); } // Check it
+ for i in 1..=5 {
+ assert_eq!(rx.recv(), Ok(i));
+ } // Check it
assert!(rx.recv().is_err()); // Should be done
}
@@ -93,6 +97,8 @@ fn basic_try_future_combinators() {
assert!(rx.try_recv().is_err()); // Not started yet
fut.run_in_background(); // Start it
- for i in 1..=12 { assert_eq!(rx.recv(), Ok(i)); } // Check it
+ for i in 1..=12 {
+ assert_eq!(rx.recv(), Ok(i));
+ } // Check it
assert!(rx.recv().is_err()); // Should be done
}
diff --git a/tests/fuse.rs b/tests/future_fuse.rs
index 83f2c1c..83f2c1c 100644
--- a/tests/fuse.rs
+++ b/tests/future_fuse.rs
diff --git a/tests/future_inspect.rs b/tests/future_inspect.rs
new file mode 100644
index 0000000..eacd1f7
--- /dev/null
+++ b/tests/future_inspect.rs
@@ -0,0 +1,16 @@
+use futures::executor::block_on;
+use futures::future::{self, FutureExt};
+
+#[test]
+fn smoke() {
+ let mut counter = 0;
+
+ {
+ let work = future::ready::<i32>(40).inspect(|val| {
+ counter += *val;
+ });
+ assert_eq!(block_on(work), 40);
+ }
+
+ assert_eq!(counter, 40);
+}
diff --git a/tests/future_join_all.rs b/tests/future_join_all.rs
new file mode 100644
index 0000000..44486e1
--- /dev/null
+++ b/tests/future_join_all.rs
@@ -0,0 +1,41 @@
+use futures::executor::block_on;
+use futures::future::{join_all, ready, Future, JoinAll};
+use futures::pin_mut;
+use std::fmt::Debug;
+
+#[track_caller]
+fn assert_done<T>(actual_fut: impl Future<Output = T>, expected: T)
+where
+ T: PartialEq + Debug,
+{
+ pin_mut!(actual_fut);
+ let output = block_on(actual_fut);
+ assert_eq!(output, expected);
+}
+
+#[test]
+fn collect_collects() {
+ assert_done(join_all(vec![ready(1), ready(2)]), vec![1, 2]);
+ assert_done(join_all(vec![ready(1)]), vec![1]);
+ // REVIEW: should this be implemented?
+ // assert_done(join_all(Vec::<i32>::new()), vec![]);
+
+ // TODO: needs more tests
+}
+
+#[test]
+fn join_all_iter_lifetime() {
+ // In futures-rs version 0.1, this function would fail to typecheck due to an overly
+ // conservative type parameterization of `JoinAll`.
+ fn sizes(bufs: Vec<&[u8]>) -> impl Future<Output = Vec<usize>> {
+ let iter = bufs.into_iter().map(|b| ready::<usize>(b.len()));
+ join_all(iter)
+ }
+
+ assert_done(sizes(vec![&[1, 2, 3], &[], &[0]]), vec![3_usize, 0, 1]);
+}
+
+#[test]
+fn join_all_from_iter() {
+ assert_done(vec![ready(1), ready(2)].into_iter().collect::<JoinAll<_>>(), vec![1, 2])
+}
diff --git a/tests/future_obj.rs b/tests/future_obj.rs
index c6b18fc..0e52534 100644
--- a/tests/future_obj.rs
+++ b/tests/future_obj.rs
@@ -1,6 +1,6 @@
-use futures::future::{Future, FutureObj, FutureExt};
-use std::pin::Pin;
+use futures::future::{Future, FutureExt, FutureObj};
use futures::task::{Context, Poll};
+use std::pin::Pin;
#[test]
fn dropping_does_not_segfault() {
diff --git a/tests/select_all.rs b/tests/future_select_all.rs
index 540db2c..299b479 100644
--- a/tests/select_all.rs
+++ b/tests/future_select_all.rs
@@ -1,14 +1,10 @@
+use futures::executor::block_on;
+use futures::future::{ready, select_all};
+use std::collections::HashSet;
+
#[test]
fn smoke() {
- use futures::executor::block_on;
- use futures::future::{ready, select_all};
- use std::collections::HashSet;
-
- let v = vec![
- ready(1),
- ready(2),
- ready(3),
- ];
+ let v = vec![ready(1), ready(2), ready(3)];
let mut c = vec![1, 2, 3].into_iter().collect::<HashSet<_>>();
diff --git a/tests/select_ok.rs b/tests/future_select_ok.rs
index 81cadb7..8aec003 100644
--- a/tests/select_ok.rs
+++ b/tests/future_select_ok.rs
@@ -1,14 +1,9 @@
+use futures::executor::block_on;
+use futures::future::{err, ok, select_ok};
+
#[test]
fn ignore_err() {
- use futures::executor::block_on;
- use futures::future::{err, ok, select_ok};
-
- let v = vec![
- err(1),
- err(2),
- ok(3),
- ok(4),
- ];
+ let v = vec![err(1), err(2), ok(3), ok(4)];
let (i, v) = block_on(select_ok(v)).ok().unwrap();
assert_eq!(i, 3);
@@ -23,14 +18,7 @@ fn ignore_err() {
#[test]
fn last_err() {
- use futures::executor::block_on;
- use futures::future::{err, ok, select_ok};
-
- let v = vec![
- ok(1),
- err(2),
- err(3),
- ];
+ let v = vec![ok(1), err(2), err(3)];
let (i, v) = block_on(select_ok(v)).ok().unwrap();
assert_eq!(i, 1);
diff --git a/tests/shared.rs b/tests/future_shared.rs
index cc0c6a2..3ceaebb 100644
--- a/tests/shared.rs
+++ b/tests/future_shared.rs
@@ -1,22 +1,22 @@
-mod count_clone {
- use std::cell::Cell;
- use std::rc::Rc;
-
- pub struct CountClone(pub Rc<Cell<i32>>);
-
- impl Clone for CountClone {
- fn clone(&self) -> Self {
- self.0.set(self.0.get() + 1);
- Self(self.0.clone())
- }
+use futures::channel::oneshot;
+use futures::executor::{block_on, LocalPool};
+use futures::future::{self, FutureExt, LocalFutureObj, TryFutureExt};
+use futures::task::LocalSpawn;
+use std::cell::{Cell, RefCell};
+use std::rc::Rc;
+use std::task::Poll;
+use std::thread;
+
+struct CountClone(Rc<Cell<i32>>);
+
+impl Clone for CountClone {
+ fn clone(&self) -> Self {
+ self.0.set(self.0.get() + 1);
+ Self(self.0.clone())
}
}
fn send_shared_oneshot_and_wait_on_multiple_threads(threads_number: u32) {
- use futures::channel::oneshot;
- use futures::executor::block_on;
- use futures::future::FutureExt;
- use std::thread;
let (tx, rx) = oneshot::channel::<i32>();
let f = rx.shared();
let join_handles = (0..threads_number)
@@ -53,11 +53,6 @@ fn many_threads() {
#[test]
fn drop_on_one_task_ok() {
- use futures::channel::oneshot;
- use futures::executor::block_on;
- use futures::future::{self, FutureExt, TryFutureExt};
- use std::thread;
-
let (tx, rx) = oneshot::channel::<u32>();
let f1 = rx.shared();
let f2 = f1.clone();
@@ -86,11 +81,6 @@ fn drop_on_one_task_ok() {
#[test]
fn drop_in_poll() {
- use futures::executor::block_on;
- use futures::future::{self, FutureExt, LocalFutureObj};
- use std::cell::RefCell;
- use std::rc::Rc;
-
let slot1 = Rc::new(RefCell::new(None));
let slot2 = slot1.clone();
@@ -106,13 +96,9 @@ fn drop_in_poll() {
assert_eq!(block_on(future1), 1);
}
+#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038
#[test]
fn peek() {
- use futures::channel::oneshot;
- use futures::executor::LocalPool;
- use futures::future::{FutureExt, LocalFutureObj};
- use futures::task::LocalSpawn;
-
let mut local_pool = LocalPool::new();
let spawn = &mut local_pool.spawner();
@@ -134,9 +120,7 @@ fn peek() {
}
// Once the Shared has been polled, the value is peekable on the clone.
- spawn
- .spawn_local_obj(LocalFutureObj::new(Box::new(f1.map(|_| ()))))
- .unwrap();
+ spawn.spawn_local_obj(LocalFutureObj::new(Box::new(f1.map(|_| ())))).unwrap();
local_pool.run();
for _ in 0..2 {
assert_eq!(*f2.peek().unwrap(), Ok(42));
@@ -145,10 +129,6 @@ fn peek() {
#[test]
fn downgrade() {
- use futures::channel::oneshot;
- use futures::executor::block_on;
- use futures::future::FutureExt;
-
let (tx, rx) = oneshot::channel::<i32>();
let shared = rx.shared();
// Since there are outstanding `Shared`s, we can get a `WeakShared`.
@@ -173,14 +153,6 @@ fn downgrade() {
#[test]
fn dont_clone_in_single_owner_shared_future() {
- use futures::channel::oneshot;
- use futures::executor::block_on;
- use futures::future::FutureExt;
- use std::cell::Cell;
- use std::rc::Rc;
-
- use count_clone::CountClone;
-
let counter = CountClone(Rc::new(Cell::new(0)));
let (tx, rx) = oneshot::channel();
@@ -193,14 +165,6 @@ fn dont_clone_in_single_owner_shared_future() {
#[test]
fn dont_do_unnecessary_clones_on_output() {
- use futures::channel::oneshot;
- use futures::executor::block_on;
- use futures::future::FutureExt;
- use std::cell::Cell;
- use std::rc::Rc;
-
- use count_clone::CountClone;
-
let counter = CountClone(Rc::new(Cell::new(0)));
let (tx, rx) = oneshot::channel();
@@ -215,11 +179,6 @@ fn dont_do_unnecessary_clones_on_output() {
#[test]
fn shared_future_that_wakes_itself_until_pending_is_returned() {
- use futures::executor::block_on;
- use futures::future::FutureExt;
- use std::cell::Cell;
- use std::task::Poll;
-
let proceed = Cell::new(false);
let fut = futures::future::poll_fn(|cx| {
if proceed.get() {
@@ -233,8 +192,5 @@ fn shared_future_that_wakes_itself_until_pending_is_returned() {
// The join future can only complete if the second future gets a chance to run after the first
// has returned pending
- assert_eq!(
- block_on(futures::future::join(fut, async { proceed.set(true) })),
- ((), ())
- );
+ assert_eq!(block_on(futures::future::join(fut, async { proceed.set(true) })), ((), ()));
}
diff --git a/tests/future_try_flatten_stream.rs b/tests/future_try_flatten_stream.rs
index 4a614f9..82ae1ba 100644
--- a/tests/future_try_flatten_stream.rs
+++ b/tests/future_try_flatten_stream.rs
@@ -1,9 +1,14 @@
+use futures::executor::block_on_stream;
+use futures::future::{err, ok, TryFutureExt};
+use futures::sink::Sink;
+use futures::stream::Stream;
+use futures::stream::{self, StreamExt};
+use futures::task::{Context, Poll};
+use std::marker::PhantomData;
+use std::pin::Pin;
+
#[test]
fn successful_future() {
- use futures::executor::block_on_stream;
- use futures::future::{ok, TryFutureExt};
- use futures::stream::{self, StreamExt};
-
let stream_items = vec![17, 19];
let future_of_a_stream = ok::<_, bool>(stream::iter(stream_items).map(Ok));
@@ -17,15 +22,8 @@ fn successful_future() {
#[test]
fn failed_future() {
- use core::marker::PhantomData;
- use core::pin::Pin;
- use futures::executor::block_on_stream;
- use futures::future::{err, TryFutureExt};
- use futures::stream::Stream;
- use futures::task::{Context, Poll};
-
struct PanickingStream<T, E> {
- _marker: PhantomData<(T, E)>
+ _marker: PhantomData<(T, E)>,
}
impl<T, E> Stream for PanickingStream<T, E> {
@@ -45,13 +43,6 @@ fn failed_future() {
#[test]
fn assert_impls() {
- use core::marker::PhantomData;
- use core::pin::Pin;
- use futures::sink::Sink;
- use futures::stream::Stream;
- use futures::task::{Context, Poll};
- use futures::future::{ok, TryFutureExt};
-
struct StreamSink<T, E, Item>(PhantomData<(T, E, Item)>);
impl<T, E, Item> Stream for StreamSink<T, E, Item> {
diff --git a/tests/future_try_join_all.rs b/tests/future_try_join_all.rs
new file mode 100644
index 0000000..9a82487
--- /dev/null
+++ b/tests/future_try_join_all.rs
@@ -0,0 +1,46 @@
+use futures::executor::block_on;
+use futures::pin_mut;
+use futures_util::future::{err, ok, try_join_all, TryJoinAll};
+use std::fmt::Debug;
+use std::future::Future;
+
+#[track_caller]
+fn assert_done<T>(actual_fut: impl Future<Output = T>, expected: T)
+where
+ T: PartialEq + Debug,
+{
+ pin_mut!(actual_fut);
+ let output = block_on(actual_fut);
+ assert_eq!(output, expected);
+}
+
+#[test]
+fn collect_collects() {
+ assert_done(try_join_all(vec![ok(1), ok(2)]), Ok::<_, usize>(vec![1, 2]));
+ assert_done(try_join_all(vec![ok(1), err(2)]), Err(2));
+ assert_done(try_join_all(vec![ok(1)]), Ok::<_, usize>(vec![1]));
+ // REVIEW: should this be implemented?
+ // assert_done(try_join_all(Vec::<i32>::new()), Ok(vec![]));
+
+ // TODO: needs more tests
+}
+
+#[test]
+fn try_join_all_iter_lifetime() {
+ // In futures-rs version 0.1, this function would fail to typecheck due to an overly
+ // conservative type parameterization of `TryJoinAll`.
+ fn sizes(bufs: Vec<&[u8]>) -> impl Future<Output = Result<Vec<usize>, ()>> {
+ let iter = bufs.into_iter().map(|b| ok::<usize, ()>(b.len()));
+ try_join_all(iter)
+ }
+
+ assert_done(sizes(vec![&[1, 2, 3], &[], &[0]]), Ok(vec![3_usize, 0, 1]));
+}
+
+#[test]
+fn try_join_all_from_iter() {
+ assert_done(
+ vec![ok(1), ok(2)].into_iter().collect::<TryJoinAll<_>>(),
+ Ok::<_, usize>(vec![1, 2]),
+ )
+}
diff --git a/tests/inspect.rs b/tests/inspect.rs
deleted file mode 100644
index 375778b..0000000
--- a/tests/inspect.rs
+++ /dev/null
@@ -1,14 +0,0 @@
-#[test]
-fn smoke() {
- use futures::executor::block_on;
- use futures::future::{self, FutureExt};
-
- let mut counter = 0;
-
- {
- let work = future::ready::<i32>(40).inspect(|val| { counter += *val; });
- assert_eq!(block_on(work), 40);
- }
-
- assert_eq!(counter, 40);
-}
diff --git a/tests/io_buf_reader.rs b/tests/io_buf_reader.rs
index f8f9d14..717297c 100644
--- a/tests/io_buf_reader.rs
+++ b/tests/io_buf_reader.rs
@@ -1,156 +1,240 @@
-macro_rules! run_fill_buf {
- ($reader:expr) => {{
- use futures_test::task::noop_context;
- use futures::task::Poll;
- use std::pin::Pin;
-
- let mut cx = noop_context();
- loop {
- if let Poll::Ready(x) = Pin::new(&mut $reader).poll_fill_buf(&mut cx) {
- break x;
- }
+use futures::executor::block_on;
+use futures::future::{Future, FutureExt};
+use futures::io::{
+ AllowStdIo, AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt,
+ BufReader, SeekFrom,
+};
+use futures::pin_mut;
+use futures::task::{Context, Poll};
+use futures_test::task::noop_context;
+use pin_project::pin_project;
+use std::cmp;
+use std::io;
+use std::pin::Pin;
+
+// helper for maybe_pending_* tests
+fn run<F: Future + Unpin>(mut f: F) -> F::Output {
+ let mut cx = noop_context();
+ loop {
+ if let Poll::Ready(x) = f.poll_unpin(&mut cx) {
+ return x;
}
- }};
+ }
}
-mod util {
- use futures::future::Future;
- pub fn run<F: Future + Unpin>(mut f: F) -> F::Output {
- use futures_test::task::noop_context;
- use futures::task::Poll;
- use futures::future::FutureExt;
-
- let mut cx = noop_context();
- loop {
- if let Poll::Ready(x) = f.poll_unpin(&mut cx) {
- return x;
- }
- }
+// https://github.com/rust-lang/futures-rs/pull/2489#discussion_r697865719
+#[pin_project(!Unpin)]
+struct Cursor<T> {
+ #[pin]
+ inner: futures::io::Cursor<T>,
+}
+
+impl<T> Cursor<T> {
+ fn new(inner: T) -> Self {
+ Self { inner: futures::io::Cursor::new(inner) }
}
}
-mod maybe_pending {
- use futures::task::{Context,Poll};
- use std::{cmp,io};
- use std::pin::Pin;
- use futures::io::{AsyncRead,AsyncBufRead};
+impl AsyncRead for Cursor<&[u8]> {
+ fn poll_read(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &mut [u8],
+ ) -> Poll<io::Result<usize>> {
+ self.project().inner.poll_read(cx, buf)
+ }
+}
- pub struct MaybePending<'a> {
- inner: &'a [u8],
- ready_read: bool,
- ready_fill_buf: bool,
+impl AsyncBufRead for Cursor<&[u8]> {
+ fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
+ self.project().inner.poll_fill_buf(cx)
}
- impl<'a> MaybePending<'a> {
- pub fn new(inner: &'a [u8]) -> Self {
- Self { inner, ready_read: false, ready_fill_buf: false }
- }
+ fn consume(self: Pin<&mut Self>, amt: usize) {
+ self.project().inner.consume(amt)
}
+}
- impl AsyncRead for MaybePending<'_> {
- fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8])
- -> Poll<io::Result<usize>>
- {
- if self.ready_read {
- self.ready_read = false;
- Pin::new(&mut self.inner).poll_read(cx, buf)
- } else {
- self.ready_read = true;
- Poll::Pending
- }
+impl AsyncSeek for Cursor<&[u8]> {
+ fn poll_seek(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ pos: SeekFrom,
+ ) -> Poll<io::Result<u64>> {
+ self.project().inner.poll_seek(cx, pos)
+ }
+}
+
+struct MaybePending<'a> {
+ inner: &'a [u8],
+ ready_read: bool,
+ ready_fill_buf: bool,
+}
+
+impl<'a> MaybePending<'a> {
+ fn new(inner: &'a [u8]) -> Self {
+ Self { inner, ready_read: false, ready_fill_buf: false }
+ }
+}
+
+impl AsyncRead for MaybePending<'_> {
+ fn poll_read(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &mut [u8],
+ ) -> Poll<io::Result<usize>> {
+ if self.ready_read {
+ self.ready_read = false;
+ Pin::new(&mut self.inner).poll_read(cx, buf)
+ } else {
+ self.ready_read = true;
+ Poll::Pending
}
}
+}
- impl AsyncBufRead for MaybePending<'_> {
- fn poll_fill_buf(mut self: Pin<&mut Self>, _: &mut Context<'_>)
- -> Poll<io::Result<&[u8]>>
- {
- if self.ready_fill_buf {
- self.ready_fill_buf = false;
- if self.inner.is_empty() { return Poll::Ready(Ok(&[])) }
- let len = cmp::min(2, self.inner.len());
- Poll::Ready(Ok(&self.inner[0..len]))
- } else {
- self.ready_fill_buf = true;
- Poll::Pending
+impl AsyncBufRead for MaybePending<'_> {
+ fn poll_fill_buf(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
+ if self.ready_fill_buf {
+ self.ready_fill_buf = false;
+ if self.inner.is_empty() {
+ return Poll::Ready(Ok(&[]));
}
+ let len = cmp::min(2, self.inner.len());
+ Poll::Ready(Ok(&self.inner[0..len]))
+ } else {
+ self.ready_fill_buf = true;
+ Poll::Pending
}
+ }
- fn consume(mut self: Pin<&mut Self>, amt: usize) {
- self.inner = &self.inner[amt..];
- }
+ fn consume(mut self: Pin<&mut Self>, amt: usize) {
+ self.inner = &self.inner[amt..];
}
}
#[test]
fn test_buffered_reader() {
- use futures::executor::block_on;
- use futures::io::{AsyncReadExt, BufReader};
-
- let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
- let mut reader = BufReader::with_capacity(2, inner);
-
- let mut buf = [0, 0, 0];
- let nread = block_on(reader.read(&mut buf));
- assert_eq!(nread.unwrap(), 3);
- assert_eq!(buf, [5, 6, 7]);
- assert_eq!(reader.buffer(), []);
-
- let mut buf = [0, 0];
- let nread = block_on(reader.read(&mut buf));
- assert_eq!(nread.unwrap(), 2);
- assert_eq!(buf, [0, 1]);
- assert_eq!(reader.buffer(), []);
-
- let mut buf = [0];
- let nread = block_on(reader.read(&mut buf));
- assert_eq!(nread.unwrap(), 1);
- assert_eq!(buf, [2]);
- assert_eq!(reader.buffer(), [3]);
-
- let mut buf = [0, 0, 0];
- let nread = block_on(reader.read(&mut buf));
- assert_eq!(nread.unwrap(), 1);
- assert_eq!(buf, [3, 0, 0]);
- assert_eq!(reader.buffer(), []);
+ block_on(async {
+ let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
+ let mut reader = BufReader::with_capacity(2, inner);
+
+ let mut buf = [0, 0, 0];
+ let nread = reader.read(&mut buf).await.unwrap();
+ assert_eq!(nread, 3);
+ assert_eq!(buf, [5, 6, 7]);
+ assert_eq!(reader.buffer(), []);
+
+ let mut buf = [0, 0];
+ let nread = reader.read(&mut buf).await.unwrap();
+ assert_eq!(nread, 2);
+ assert_eq!(buf, [0, 1]);
+ assert_eq!(reader.buffer(), []);
+
+ let mut buf = [0];
+ let nread = reader.read(&mut buf).await.unwrap();
+ assert_eq!(nread, 1);
+ assert_eq!(buf, [2]);
+ assert_eq!(reader.buffer(), [3]);
+
+ let mut buf = [0, 0, 0];
+ let nread = reader.read(&mut buf).await.unwrap();
+ assert_eq!(nread, 1);
+ assert_eq!(buf, [3, 0, 0]);
+ assert_eq!(reader.buffer(), []);
+
+ let nread = reader.read(&mut buf).await.unwrap();
+ assert_eq!(nread, 1);
+ assert_eq!(buf, [4, 0, 0]);
+ assert_eq!(reader.buffer(), []);
+
+ assert_eq!(reader.read(&mut buf).await.unwrap(), 0);
+ });
+}
- let nread = block_on(reader.read(&mut buf));
- assert_eq!(nread.unwrap(), 1);
- assert_eq!(buf, [4, 0, 0]);
- assert_eq!(reader.buffer(), []);
+#[test]
+fn test_buffered_reader_seek() {
+ block_on(async {
+ let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
+ let reader = BufReader::with_capacity(2, Cursor::new(inner));
+ pin_mut!(reader);
+
+ assert_eq!(reader.seek(SeekFrom::Start(3)).await.unwrap(), 3);
+ assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[0, 1][..]);
+ assert!(reader.seek(SeekFrom::Current(i64::MIN)).await.is_err());
+ assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[0, 1][..]);
+ assert_eq!(reader.seek(SeekFrom::Current(1)).await.unwrap(), 4);
+ assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[1, 2][..]);
+ reader.as_mut().consume(1);
+ assert_eq!(reader.seek(SeekFrom::Current(-2)).await.unwrap(), 3);
+ });
+}
- assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 0);
+#[test]
+fn test_buffered_reader_seek_relative() {
+ block_on(async {
+ let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
+ let reader = BufReader::with_capacity(2, Cursor::new(inner));
+ pin_mut!(reader);
+
+ assert!(reader.as_mut().seek_relative(3).await.is_ok());
+ assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[0, 1][..]);
+ assert!(reader.as_mut().seek_relative(0).await.is_ok());
+ assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[0, 1][..]);
+ assert!(reader.as_mut().seek_relative(1).await.is_ok());
+ assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[1][..]);
+ assert!(reader.as_mut().seek_relative(-1).await.is_ok());
+ assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[0, 1][..]);
+ assert!(reader.as_mut().seek_relative(2).await.is_ok());
+ assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[2, 3][..]);
+ });
}
#[test]
-fn test_buffered_reader_seek() {
- use futures::executor::block_on;
- use futures::io::{AsyncSeekExt, AsyncBufRead, BufReader, Cursor, SeekFrom};
- use std::pin::Pin;
- use util::run;
+fn test_buffered_reader_invalidated_after_read() {
+ block_on(async {
+ let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
+ let reader = BufReader::with_capacity(3, Cursor::new(inner));
+ pin_mut!(reader);
+
+ assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[5, 6, 7][..]);
+ reader.as_mut().consume(3);
+
+ let mut buffer = [0, 0, 0, 0, 0];
+ assert_eq!(reader.read(&mut buffer).await.unwrap(), 5);
+ assert_eq!(buffer, [0, 1, 2, 3, 4]);
+
+ assert!(reader.as_mut().seek_relative(-2).await.is_ok());
+ let mut buffer = [0, 0];
+ assert_eq!(reader.read(&mut buffer).await.unwrap(), 2);
+ assert_eq!(buffer, [3, 4]);
+ });
+}
- let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
- let mut reader = BufReader::with_capacity(2, Cursor::new(inner));
-
- assert_eq!(block_on(reader.seek(SeekFrom::Start(3))).ok(), Some(3));
- assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1][..]));
- assert_eq!(run(reader.seek(SeekFrom::Current(i64::min_value()))).ok(), None);
- assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1][..]));
- assert_eq!(block_on(reader.seek(SeekFrom::Current(1))).ok(), Some(4));
- assert_eq!(run_fill_buf!(reader).ok(), Some(&[1, 2][..]));
- Pin::new(&mut reader).consume(1);
- assert_eq!(block_on(reader.seek(SeekFrom::Current(-2))).ok(), Some(3));
+#[test]
+fn test_buffered_reader_invalidated_after_seek() {
+ block_on(async {
+ let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
+ let reader = BufReader::with_capacity(3, Cursor::new(inner));
+ pin_mut!(reader);
+
+ assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[5, 6, 7][..]);
+ reader.as_mut().consume(3);
+
+ assert!(reader.seek(SeekFrom::Current(5)).await.is_ok());
+
+ assert!(reader.as_mut().seek_relative(-2).await.is_ok());
+ let mut buffer = [0, 0];
+ assert_eq!(reader.read(&mut buffer).await.unwrap(), 2);
+ assert_eq!(buffer, [3, 4]);
+ });
}
#[test]
fn test_buffered_reader_seek_underflow() {
- use futures::executor::block_on;
- use futures::io::{AsyncSeekExt, AsyncBufRead, AllowStdIo, BufReader, SeekFrom};
- use std::io;
-
// gimmick reader that yields its position modulo 256 for each byte
struct PositionReader {
- pos: u64
+ pos: u64,
}
impl io::Read for PositionReader {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
@@ -172,32 +256,31 @@ fn test_buffered_reader_seek_underflow() {
self.pos = self.pos.wrapping_add(n as u64);
}
SeekFrom::End(n) => {
- self.pos = u64::max_value().wrapping_add(n as u64);
+ self.pos = u64::MAX.wrapping_add(n as u64);
}
}
Ok(self.pos)
}
}
- let mut reader = BufReader::with_capacity(5, AllowStdIo::new(PositionReader { pos: 0 }));
- assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1, 2, 3, 4][..]));
- assert_eq!(block_on(reader.seek(SeekFrom::End(-5))).ok(), Some(u64::max_value()-5));
- assert_eq!(run_fill_buf!(reader).ok().map(|s| s.len()), Some(5));
- // the following seek will require two underlying seeks
- let expected = 9_223_372_036_854_775_802;
- assert_eq!(block_on(reader.seek(SeekFrom::Current(i64::min_value()))).ok(), Some(expected));
- assert_eq!(run_fill_buf!(reader).ok().map(|s| s.len()), Some(5));
- // seeking to 0 should empty the buffer.
- assert_eq!(block_on(reader.seek(SeekFrom::Current(0))).ok(), Some(expected));
- assert_eq!(reader.get_ref().get_ref().pos, expected);
+ block_on(async {
+ let reader = BufReader::with_capacity(5, AllowStdIo::new(PositionReader { pos: 0 }));
+ pin_mut!(reader);
+ assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[0, 1, 2, 3, 4][..]);
+ assert_eq!(reader.seek(SeekFrom::End(-5)).await.unwrap(), u64::MAX - 5);
+ assert_eq!(reader.as_mut().fill_buf().await.unwrap().len(), 5);
+ // the following seek will require two underlying seeks
+ let expected = 9_223_372_036_854_775_802;
+ assert_eq!(reader.seek(SeekFrom::Current(i64::MIN)).await.unwrap(), expected);
+ assert_eq!(reader.as_mut().fill_buf().await.unwrap().len(), 5);
+ // seeking to 0 should empty the buffer.
+ assert_eq!(reader.seek(SeekFrom::Current(0)).await.unwrap(), expected);
+ assert_eq!(reader.get_ref().get_ref().pos, expected);
+ });
}
#[test]
fn test_short_reads() {
- use futures::executor::block_on;
- use futures::io::{AsyncReadExt, AllowStdIo, BufReader};
- use std::io;
-
/// A dummy reader intended at testing short-reads propagation.
struct ShortReader {
lengths: Vec<usize>,
@@ -213,24 +296,22 @@ fn test_short_reads() {
}
}
- let inner = ShortReader { lengths: vec![0, 1, 2, 0, 1, 0] };
- let mut reader = BufReader::new(AllowStdIo::new(inner));
- let mut buf = [0, 0];
- assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 0);
- assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 1);
- assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 2);
- assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 0);
- assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 1);
- assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 0);
- assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 0);
+ block_on(async {
+ let inner = ShortReader { lengths: vec![0, 1, 2, 0, 1, 0] };
+ let mut reader = BufReader::new(AllowStdIo::new(inner));
+ let mut buf = [0, 0];
+ assert_eq!(reader.read(&mut buf).await.unwrap(), 0);
+ assert_eq!(reader.read(&mut buf).await.unwrap(), 1);
+ assert_eq!(reader.read(&mut buf).await.unwrap(), 2);
+ assert_eq!(reader.read(&mut buf).await.unwrap(), 0);
+ assert_eq!(reader.read(&mut buf).await.unwrap(), 1);
+ assert_eq!(reader.read(&mut buf).await.unwrap(), 0);
+ assert_eq!(reader.read(&mut buf).await.unwrap(), 0);
+ });
}
#[test]
fn maybe_pending() {
- use futures::io::{AsyncReadExt, BufReader};
- use util::run;
- use maybe_pending::MaybePending;
-
let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
let mut reader = BufReader::with_capacity(2, MaybePending::new(inner));
@@ -268,10 +349,6 @@ fn maybe_pending() {
#[test]
fn maybe_pending_buf_read() {
- use futures::io::{AsyncBufReadExt, BufReader};
- use util::run;
- use maybe_pending::MaybePending;
-
let inner = MaybePending::new(&[0, 1, 2, 3, 1, 0]);
let mut reader = BufReader::with_capacity(2, inner);
let mut v = Vec::new();
@@ -291,68 +368,65 @@ fn maybe_pending_buf_read() {
// https://github.com/rust-lang/futures-rs/pull/1573#discussion_r281162309
#[test]
fn maybe_pending_seek() {
- use futures::io::{AsyncBufRead, AsyncSeek, AsyncSeekExt, AsyncRead, BufReader,
- Cursor, SeekFrom
- };
- use futures::task::{Context,Poll};
- use std::io;
- use std::pin::Pin;
- use util::run;
- pub struct MaybePendingSeek<'a> {
+ #[pin_project]
+ struct MaybePendingSeek<'a> {
+ #[pin]
inner: Cursor<&'a [u8]>,
ready: bool,
}
impl<'a> MaybePendingSeek<'a> {
- pub fn new(inner: &'a [u8]) -> Self {
+ fn new(inner: &'a [u8]) -> Self {
Self { inner: Cursor::new(inner), ready: true }
}
}
impl AsyncRead for MaybePendingSeek<'_> {
- fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8])
- -> Poll<io::Result<usize>>
- {
- Pin::new(&mut self.inner).poll_read(cx, buf)
+ fn poll_read(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &mut [u8],
+ ) -> Poll<io::Result<usize>> {
+ self.project().inner.poll_read(cx, buf)
}
}
impl AsyncBufRead for MaybePendingSeek<'_> {
- fn poll_fill_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
- -> Poll<io::Result<&[u8]>>
- {
- let this: *mut Self = &mut *self as *mut _;
- Pin::new(&mut unsafe { &mut *this }.inner).poll_fill_buf(cx)
+ fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
+ self.project().inner.poll_fill_buf(cx)
}
- fn consume(mut self: Pin<&mut Self>, amt: usize) {
- Pin::new(&mut self.inner).consume(amt)
+ fn consume(self: Pin<&mut Self>, amt: usize) {
+ self.project().inner.consume(amt)
}
}
impl AsyncSeek for MaybePendingSeek<'_> {
- fn poll_seek(mut self: Pin<&mut Self>, cx: &mut Context<'_>, pos: SeekFrom)
- -> Poll<io::Result<u64>>
- {
+ fn poll_seek(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ pos: SeekFrom,
+ ) -> Poll<io::Result<u64>> {
if self.ready {
- self.ready = false;
- Pin::new(&mut self.inner).poll_seek(cx, pos)
+ *self.as_mut().project().ready = false;
+ self.project().inner.poll_seek(cx, pos)
} else {
- self.ready = true;
+ *self.project().ready = true;
Poll::Pending
}
}
}
let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
- let mut reader = BufReader::with_capacity(2, MaybePendingSeek::new(inner));
+ let reader = BufReader::with_capacity(2, MaybePendingSeek::new(inner));
+ pin_mut!(reader);
assert_eq!(run(reader.seek(SeekFrom::Current(3))).ok(), Some(3));
- assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1][..]));
- assert_eq!(run(reader.seek(SeekFrom::Current(i64::min_value()))).ok(), None);
- assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1][..]));
+ assert_eq!(run(reader.as_mut().fill_buf()).ok(), Some(&[0, 1][..]));
+ assert_eq!(run(reader.seek(SeekFrom::Current(i64::MIN))).ok(), None);
+ assert_eq!(run(reader.as_mut().fill_buf()).ok(), Some(&[0, 1][..]));
assert_eq!(run(reader.seek(SeekFrom::Current(1))).ok(), Some(4));
- assert_eq!(run_fill_buf!(reader).ok(), Some(&[1, 2][..]));
+ assert_eq!(run(reader.as_mut().fill_buf()).ok(), Some(&[1, 2][..]));
Pin::new(&mut reader).consume(1);
assert_eq!(run(reader.seek(SeekFrom::Current(-2))).ok(), Some(3));
}
diff --git a/tests/io_buf_writer.rs b/tests/io_buf_writer.rs
index d58a6d8..b264cd5 100644
--- a/tests/io_buf_writer.rs
+++ b/tests/io_buf_writer.rs
@@ -1,67 +1,59 @@
-mod maybe_pending {
- use futures::io::AsyncWrite;
- use futures::task::{Context, Poll};
- use std::io;
- use std::pin::Pin;
-
- pub struct MaybePending {
- pub inner: Vec<u8>,
- ready: bool,
- }
+use futures::executor::block_on;
+use futures::future::{Future, FutureExt};
+use futures::io::{
+ AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt, BufWriter, Cursor, SeekFrom,
+};
+use futures::task::{Context, Poll};
+use futures_test::task::noop_context;
+use std::io;
+use std::pin::Pin;
+
+struct MaybePending {
+ inner: Vec<u8>,
+ ready: bool,
+}
- impl MaybePending {
- pub fn new(inner: Vec<u8>) -> Self {
- Self { inner, ready: false }
- }
+impl MaybePending {
+ fn new(inner: Vec<u8>) -> Self {
+ Self { inner, ready: false }
}
+}
- impl AsyncWrite for MaybePending {
- fn poll_write(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- buf: &[u8],
- ) -> Poll<io::Result<usize>> {
- if self.ready {
- self.ready = false;
- Pin::new(&mut self.inner).poll_write(cx, buf)
- } else {
- self.ready = true;
- Poll::Pending
- }
+impl AsyncWrite for MaybePending {
+ fn poll_write(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ if self.ready {
+ self.ready = false;
+ Pin::new(&mut self.inner).poll_write(cx, buf)
+ } else {
+ self.ready = true;
+ Poll::Pending
}
+ }
- fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
- Pin::new(&mut self.inner).poll_flush(cx)
- }
+ fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ Pin::new(&mut self.inner).poll_flush(cx)
+ }
- fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
- Pin::new(&mut self.inner).poll_close(cx)
- }
+ fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ Pin::new(&mut self.inner).poll_close(cx)
}
}
-mod util {
- use futures::future::Future;
-
- pub fn run<F: Future + Unpin>(mut f: F) -> F::Output {
- use futures::future::FutureExt;
- use futures::task::Poll;
- use futures_test::task::noop_context;
-
- let mut cx = noop_context();
- loop {
- if let Poll::Ready(x) = f.poll_unpin(&mut cx) {
- return x;
- }
+fn run<F: Future + Unpin>(mut f: F) -> F::Output {
+ let mut cx = noop_context();
+ loop {
+ if let Poll::Ready(x) = f.poll_unpin(&mut cx) {
+ return x;
}
}
}
#[test]
fn buf_writer() {
- use futures::executor::block_on;
- use futures::io::{AsyncWriteExt, BufWriter};
-
let mut writer = BufWriter::with_capacity(2, Vec::new());
block_on(writer.write(&[0, 1])).unwrap();
@@ -104,9 +96,6 @@ fn buf_writer() {
#[test]
fn buf_writer_inner_flushes() {
- use futures::executor::block_on;
- use futures::io::{AsyncWriteExt, BufWriter};
-
let mut w = BufWriter::with_capacity(3, Vec::new());
block_on(w.write(&[0, 1])).unwrap();
assert_eq!(*w.get_ref(), []);
@@ -117,9 +106,6 @@ fn buf_writer_inner_flushes() {
#[test]
fn buf_writer_seek() {
- use futures::executor::block_on;
- use futures::io::{AsyncSeekExt, AsyncWriteExt, BufWriter, Cursor, SeekFrom};
-
// FIXME: when https://github.com/rust-lang/futures-rs/issues/1510 fixed,
// use `Vec::new` instead of `vec![0; 8]`.
let mut w = BufWriter::with_capacity(3, Cursor::new(vec![0; 8]));
@@ -135,11 +121,6 @@ fn buf_writer_seek() {
#[test]
fn maybe_pending_buf_writer() {
- use futures::io::{AsyncWriteExt, BufWriter};
-
- use maybe_pending::MaybePending;
- use util::run;
-
let mut writer = BufWriter::with_capacity(2, MaybePending::new(Vec::new()));
run(writer.write(&[0, 1])).unwrap();
@@ -182,11 +163,6 @@ fn maybe_pending_buf_writer() {
#[test]
fn maybe_pending_buf_writer_inner_flushes() {
- use futures::io::{AsyncWriteExt, BufWriter};
-
- use maybe_pending::MaybePending;
- use util::run;
-
let mut w = BufWriter::with_capacity(3, MaybePending::new(Vec::new()));
run(w.write(&[0, 1])).unwrap();
assert_eq!(&w.get_ref().inner, &[]);
@@ -197,13 +173,6 @@ fn maybe_pending_buf_writer_inner_flushes() {
#[test]
fn maybe_pending_buf_writer_seek() {
- use futures::io::{AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt, BufWriter, Cursor, SeekFrom};
- use futures::task::{Context, Poll};
- use std::io;
- use std::pin::Pin;
-
- use util::run;
-
struct MaybePendingSeek {
inner: Cursor<Vec<u8>>,
ready_write: bool,
@@ -241,9 +210,11 @@ fn maybe_pending_buf_writer_seek() {
}
impl AsyncSeek for MaybePendingSeek {
- fn poll_seek(mut self: Pin<&mut Self>, cx: &mut Context<'_>, pos: SeekFrom)
- -> Poll<io::Result<u64>>
- {
+ fn poll_seek(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ pos: SeekFrom,
+ ) -> Poll<io::Result<u64>> {
if self.ready_seek {
self.ready_seek = false;
Pin::new(&mut self.inner).poll_seek(cx, pos)
diff --git a/tests/io_cursor.rs b/tests/io_cursor.rs
index 4ba6342..435ea5a 100644
--- a/tests/io_cursor.rs
+++ b/tests/io_cursor.rs
@@ -1,13 +1,14 @@
+use assert_matches::assert_matches;
+use futures::executor::block_on;
+use futures::future::lazy;
+use futures::io::{AsyncWrite, Cursor};
+use futures::task::Poll;
+use std::pin::Pin;
+
#[test]
fn cursor_asyncwrite_vec() {
- use assert_matches::assert_matches;
- use futures::future::lazy;
- use futures::io::{AsyncWrite, Cursor};
- use futures::task::Poll;
- use std::pin::Pin;
-
let mut cursor = Cursor::new(vec![0; 5]);
- futures::executor::block_on(lazy(|cx| {
+ block_on(lazy(|cx| {
assert_matches!(Pin::new(&mut cursor).poll_write(cx, &[1, 2]), Poll::Ready(Ok(2)));
assert_matches!(Pin::new(&mut cursor).poll_write(cx, &[3, 4]), Poll::Ready(Ok(2)));
assert_matches!(Pin::new(&mut cursor).poll_write(cx, &[5, 6]), Poll::Ready(Ok(2)));
@@ -18,14 +19,8 @@ fn cursor_asyncwrite_vec() {
#[test]
fn cursor_asyncwrite_box() {
- use assert_matches::assert_matches;
- use futures::future::lazy;
- use futures::io::{AsyncWrite, Cursor};
- use futures::task::Poll;
- use std::pin::Pin;
-
let mut cursor = Cursor::new(vec![0; 5].into_boxed_slice());
- futures::executor::block_on(lazy(|cx| {
+ block_on(lazy(|cx| {
assert_matches!(Pin::new(&mut cursor).poll_write(cx, &[1, 2]), Poll::Ready(Ok(2)));
assert_matches!(Pin::new(&mut cursor).poll_write(cx, &[3, 4]), Poll::Ready(Ok(2)));
assert_matches!(Pin::new(&mut cursor).poll_write(cx, &[5, 6]), Poll::Ready(Ok(1)));
diff --git a/tests/io_line_writer.rs b/tests/io_line_writer.rs
new file mode 100644
index 0000000..b483e0f
--- /dev/null
+++ b/tests/io_line_writer.rs
@@ -0,0 +1,73 @@
+use futures::executor::block_on;
+use futures::io::{AsyncWriteExt, LineWriter};
+use std::io;
+
+#[test]
+fn line_writer() {
+ let mut writer = LineWriter::new(Vec::new());
+
+ block_on(writer.write(&[0])).unwrap();
+ assert_eq!(*writer.get_ref(), []);
+
+ block_on(writer.write(&[1])).unwrap();
+ assert_eq!(*writer.get_ref(), []);
+
+ block_on(writer.flush()).unwrap();
+ assert_eq!(*writer.get_ref(), [0, 1]);
+
+ block_on(writer.write(&[0, b'\n', 1, b'\n', 2])).unwrap();
+ assert_eq!(*writer.get_ref(), [0, 1, 0, b'\n', 1, b'\n']);
+
+ block_on(writer.flush()).unwrap();
+ assert_eq!(*writer.get_ref(), [0, 1, 0, b'\n', 1, b'\n', 2]);
+
+ block_on(writer.write(&[3, b'\n'])).unwrap();
+ assert_eq!(*writer.get_ref(), [0, 1, 0, b'\n', 1, b'\n', 2, 3, b'\n']);
+}
+
+#[test]
+fn line_vectored() {
+ let mut line_writer = LineWriter::new(Vec::new());
+ assert_eq!(
+ block_on(line_writer.write_vectored(&[
+ io::IoSlice::new(&[]),
+ io::IoSlice::new(b"\n"),
+ io::IoSlice::new(&[]),
+ io::IoSlice::new(b"a"),
+ ]))
+ .unwrap(),
+ 2
+ );
+ assert_eq!(line_writer.get_ref(), b"\n");
+
+ assert_eq!(
+ block_on(line_writer.write_vectored(&[
+ io::IoSlice::new(&[]),
+ io::IoSlice::new(b"b"),
+ io::IoSlice::new(&[]),
+ io::IoSlice::new(b"a"),
+ io::IoSlice::new(&[]),
+ io::IoSlice::new(b"c"),
+ ]))
+ .unwrap(),
+ 3
+ );
+ assert_eq!(line_writer.get_ref(), b"\n");
+ block_on(line_writer.flush()).unwrap();
+ assert_eq!(line_writer.get_ref(), b"\nabac");
+ assert_eq!(block_on(line_writer.write_vectored(&[])).unwrap(), 0);
+
+ assert_eq!(
+ block_on(line_writer.write_vectored(&[
+ io::IoSlice::new(&[]),
+ io::IoSlice::new(&[]),
+ io::IoSlice::new(&[]),
+ io::IoSlice::new(&[]),
+ ]))
+ .unwrap(),
+ 0
+ );
+
+ assert_eq!(block_on(line_writer.write_vectored(&[io::IoSlice::new(b"a\nb")])).unwrap(), 3);
+ assert_eq!(line_writer.get_ref(), b"\nabaca\nb");
+}
diff --git a/tests/io_lines.rs b/tests/io_lines.rs
index 2552c7c..5ce01a6 100644
--- a/tests/io_lines.rs
+++ b/tests/io_lines.rs
@@ -1,32 +1,34 @@
-mod util {
- use futures::future::Future;
-
- pub fn run<F: Future + Unpin>(mut f: F) -> F::Output {
- use futures_test::task::noop_context;
- use futures::task::Poll;
- use futures::future::FutureExt;
-
- let mut cx = noop_context();
- loop {
- if let Poll::Ready(x) = f.poll_unpin(&mut cx) {
- return x;
- }
+use futures::executor::block_on;
+use futures::future::{Future, FutureExt};
+use futures::io::{AsyncBufReadExt, Cursor};
+use futures::stream::{self, StreamExt, TryStreamExt};
+use futures::task::Poll;
+use futures_test::io::AsyncReadTestExt;
+use futures_test::task::noop_context;
+
+fn run<F: Future + Unpin>(mut f: F) -> F::Output {
+ let mut cx = noop_context();
+ loop {
+ if let Poll::Ready(x) = f.poll_unpin(&mut cx) {
+ return x;
}
}
}
-#[test]
-fn lines() {
- use futures::executor::block_on;
- use futures::stream::StreamExt;
- use futures::io::{AsyncBufReadExt, Cursor};
+macro_rules! block_on_next {
+ ($expr:expr) => {
+ block_on($expr.next()).unwrap().unwrap()
+ };
+}
- macro_rules! block_on_next {
- ($expr:expr) => {
- block_on($expr.next()).unwrap().unwrap()
- };
- }
+macro_rules! run_next {
+ ($expr:expr) => {
+ run($expr.next()).unwrap().unwrap()
+ };
+}
+#[test]
+fn lines() {
let buf = Cursor::new(&b"12\r"[..]);
let mut s = buf.lines();
assert_eq!(block_on_next!(s), "12\r".to_string());
@@ -41,22 +43,8 @@ fn lines() {
#[test]
fn maybe_pending() {
- use futures::stream::{self, StreamExt, TryStreamExt};
- use futures::io::AsyncBufReadExt;
- use futures_test::io::AsyncReadTestExt;
-
- use util::run;
-
- macro_rules! run_next {
- ($expr:expr) => {
- run($expr.next()).unwrap().unwrap()
- };
- }
-
- let buf = stream::iter(vec![&b"12"[..], &b"\r"[..]])
- .map(Ok)
- .into_async_read()
- .interleave_pending();
+ let buf =
+ stream::iter(vec![&b"12"[..], &b"\r"[..]]).map(Ok).into_async_read().interleave_pending();
let mut s = buf.lines();
assert_eq!(run_next!(s), "12\r".to_string());
assert!(run(s.next()).is_none());
diff --git a/tests/io_read.rs b/tests/io_read.rs
index 5902ad0..d39a6ea 100644
--- a/tests/io_read.rs
+++ b/tests/io_read.rs
@@ -1,27 +1,26 @@
-mod mock_reader {
- use futures::io::AsyncRead;
- use std::io;
- use std::pin::Pin;
- use std::task::{Context, Poll};
+use futures::io::AsyncRead;
+use futures_test::task::panic_context;
+use std::io;
+use std::pin::Pin;
+use std::task::{Context, Poll};
- pub struct MockReader {
- fun: Box<dyn FnMut(&mut [u8]) -> Poll<io::Result<usize>>>,
- }
+struct MockReader {
+ fun: Box<dyn FnMut(&mut [u8]) -> Poll<io::Result<usize>>>,
+}
- impl MockReader {
- pub fn new(fun: impl FnMut(&mut [u8]) -> Poll<io::Result<usize>> + 'static) -> Self {
- Self { fun: Box::new(fun) }
- }
+impl MockReader {
+ fn new(fun: impl FnMut(&mut [u8]) -> Poll<io::Result<usize>> + 'static) -> Self {
+ Self { fun: Box::new(fun) }
}
+}
- impl AsyncRead for MockReader {
- fn poll_read(
- self: Pin<&mut Self>,
- _cx: &mut Context<'_>,
- buf: &mut [u8]
- ) -> Poll<io::Result<usize>> {
- (self.get_mut().fun)(buf)
- }
+impl AsyncRead for MockReader {
+ fn poll_read(
+ self: Pin<&mut Self>,
+ _cx: &mut Context<'_>,
+ buf: &mut [u8],
+ ) -> Poll<io::Result<usize>> {
+ (self.get_mut().fun)(buf)
}
}
@@ -29,14 +28,6 @@ mod mock_reader {
/// calls `poll_read` with an empty slice if no buffers are provided.
#[test]
fn read_vectored_no_buffers() {
- use futures::io::AsyncRead;
- use futures_test::task::panic_context;
- use std::io;
- use std::pin::Pin;
- use std::task::Poll;
-
- use mock_reader::MockReader;
-
let mut reader = MockReader::new(|buf| {
assert_eq!(buf, b"");
Err(io::ErrorKind::BrokenPipe.into()).into()
@@ -53,14 +44,6 @@ fn read_vectored_no_buffers() {
/// calls `poll_read` with the first non-empty buffer.
#[test]
fn read_vectored_first_non_empty() {
- use futures::io::AsyncRead;
- use futures_test::task::panic_context;
- use std::io;
- use std::pin::Pin;
- use std::task::Poll;
-
- use mock_reader::MockReader;
-
let mut reader = MockReader::new(|buf| {
assert_eq!(buf.len(), 4);
buf.copy_from_slice(b"four");
diff --git a/tests/io_read_exact.rs b/tests/io_read_exact.rs
index bd4b36d..6582e50 100644
--- a/tests/io_read_exact.rs
+++ b/tests/io_read_exact.rs
@@ -1,14 +1,14 @@
+use futures::executor::block_on;
+use futures::io::AsyncReadExt;
+
#[test]
fn read_exact() {
- use futures::executor::block_on;
- use futures::io::AsyncReadExt;
-
let mut reader: &[u8] = &[1, 2, 3, 4, 5];
let mut out = [0u8; 3];
let res = block_on(reader.read_exact(&mut out)); // read 3 bytes out
assert!(res.is_ok());
- assert_eq!(out, [1,2,3]);
+ assert_eq!(out, [1, 2, 3]);
assert_eq!(reader.len(), 2);
let res = block_on(reader.read_exact(&mut out)); // read another 3 bytes, but only 2 bytes left
diff --git a/tests/io_read_line.rs b/tests/io_read_line.rs
index 51e8126..88a8779 100644
--- a/tests/io_read_line.rs
+++ b/tests/io_read_line.rs
@@ -1,8 +1,22 @@
+use futures::executor::block_on;
+use futures::future::{Future, FutureExt};
+use futures::io::{AsyncBufReadExt, Cursor};
+use futures::stream::{self, StreamExt, TryStreamExt};
+use futures::task::Poll;
+use futures_test::io::AsyncReadTestExt;
+use futures_test::task::noop_context;
+
+fn run<F: Future + Unpin>(mut f: F) -> F::Output {
+ let mut cx = noop_context();
+ loop {
+ if let Poll::Ready(x) = f.poll_unpin(&mut cx) {
+ return x;
+ }
+ }
+}
+
#[test]
fn read_line() {
- use futures::executor::block_on;
- use futures::io::{AsyncBufReadExt, Cursor};
-
let mut buf = Cursor::new(b"12");
let mut v = String::new();
assert_eq!(block_on(buf.read_line(&mut v)).unwrap(), 2);
@@ -22,34 +36,13 @@ fn read_line() {
#[test]
fn maybe_pending() {
- use futures::future::Future;
-
- fn run<F: Future + Unpin>(mut f: F) -> F::Output {
- use futures::future::FutureExt;
- use futures::task::Poll;
- use futures_test::task::noop_context;
-
- let mut cx = noop_context();
- loop {
- if let Poll::Ready(x) = f.poll_unpin(&mut cx) {
- return x;
- }
- }
- }
-
- use futures::stream::{self, StreamExt, TryStreamExt};
- use futures::io::AsyncBufReadExt;
- use futures_test::io::AsyncReadTestExt;
-
let mut buf = b"12".interleave_pending();
let mut v = String::new();
assert_eq!(run(buf.read_line(&mut v)).unwrap(), 2);
assert_eq!(v, "12");
- let mut buf = stream::iter(vec![&b"12"[..], &b"\n\n"[..]])
- .map(Ok)
- .into_async_read()
- .interleave_pending();
+ let mut buf =
+ stream::iter(vec![&b"12"[..], &b"\n\n"[..]]).map(Ok).into_async_read().interleave_pending();
let mut v = String::new();
assert_eq!(run(buf.read_line(&mut v)).unwrap(), 3);
assert_eq!(v, "12\n");
diff --git a/tests/io_read_to_end.rs b/tests/io_read_to_end.rs
index 892d463..7122511 100644
--- a/tests/io_read_to_end.rs
+++ b/tests/io_read_to_end.rs
@@ -1,4 +1,5 @@
use futures::{
+ executor::block_on,
io::{self, AsyncRead, AsyncReadExt},
task::{Context, Poll},
};
@@ -12,7 +13,7 @@ fn issue2310() {
}
impl MyRead {
- pub fn new() -> Self {
+ fn new() -> Self {
MyRead { first: false }
}
}
@@ -39,7 +40,7 @@ fn issue2310() {
}
impl VecWrapper {
- pub fn new() -> Self {
+ fn new() -> Self {
VecWrapper { inner: Vec::new() }
}
}
@@ -55,7 +56,7 @@ fn issue2310() {
}
}
- futures::executor::block_on(async {
+ block_on(async {
let mut vec = VecWrapper::new();
let mut read = MyRead::new();
diff --git a/tests/io_read_to_string.rs b/tests/io_read_to_string.rs
index 2e9c00a..ae6aaa2 100644
--- a/tests/io_read_to_string.rs
+++ b/tests/io_read_to_string.rs
@@ -1,8 +1,13 @@
+use futures::executor::block_on;
+use futures::future::{Future, FutureExt};
+use futures::io::{AsyncReadExt, Cursor};
+use futures::stream::{self, StreamExt, TryStreamExt};
+use futures::task::Poll;
+use futures_test::io::AsyncReadTestExt;
+use futures_test::task::noop_context;
+
#[test]
fn read_to_string() {
- use futures::executor::block_on;
- use futures::io::{AsyncReadExt, Cursor};
-
let mut c = Cursor::new(&b""[..]);
let mut v = String::new();
assert_eq!(block_on(c.read_to_string(&mut v)).unwrap(), 0);
@@ -20,16 +25,7 @@ fn read_to_string() {
#[test]
fn interleave_pending() {
- use futures::future::Future;
- use futures::stream::{self, StreamExt, TryStreamExt};
- use futures::io::AsyncReadExt;
- use futures_test::io::AsyncReadTestExt;
-
fn run<F: Future + Unpin>(mut f: F) -> F::Output {
- use futures::future::FutureExt;
- use futures_test::task::noop_context;
- use futures::task::Poll;
-
let mut cx = noop_context();
loop {
if let Poll::Ready(x) = f.poll_unpin(&mut cx) {
diff --git a/tests/io_read_until.rs b/tests/io_read_until.rs
index 6fa22ee..71f857f 100644
--- a/tests/io_read_until.rs
+++ b/tests/io_read_until.rs
@@ -1,8 +1,22 @@
+use futures::executor::block_on;
+use futures::future::{Future, FutureExt};
+use futures::io::{AsyncBufReadExt, Cursor};
+use futures::stream::{self, StreamExt, TryStreamExt};
+use futures::task::Poll;
+use futures_test::io::AsyncReadTestExt;
+use futures_test::task::noop_context;
+
+fn run<F: Future + Unpin>(mut f: F) -> F::Output {
+ let mut cx = noop_context();
+ loop {
+ if let Poll::Ready(x) = f.poll_unpin(&mut cx) {
+ return x;
+ }
+ }
+}
+
#[test]
fn read_until() {
- use futures::executor::block_on;
- use futures::io::{AsyncBufReadExt, Cursor};
-
let mut buf = Cursor::new(b"12");
let mut v = Vec::new();
assert_eq!(block_on(buf.read_until(b'3', &mut v)).unwrap(), 2);
@@ -22,25 +36,6 @@ fn read_until() {
#[test]
fn maybe_pending() {
- use futures::future::Future;
-
- fn run<F: Future + Unpin>(mut f: F) -> F::Output {
- use futures::future::FutureExt;
- use futures_test::task::noop_context;
- use futures::task::Poll;
-
- let mut cx = noop_context();
- loop {
- if let Poll::Ready(x) = f.poll_unpin(&mut cx) {
- return x;
- }
- }
- }
-
- use futures::stream::{self, StreamExt, TryStreamExt};
- use futures::io::AsyncBufReadExt;
- use futures_test::io::AsyncReadTestExt;
-
let mut buf = b"12".interleave_pending();
let mut v = Vec::new();
assert_eq!(run(buf.read_until(b'3', &mut v)).unwrap(), 2);
diff --git a/tests/io_write.rs b/tests/io_write.rs
index 363f32b..6af2755 100644
--- a/tests/io_write.rs
+++ b/tests/io_write.rs
@@ -1,35 +1,34 @@
-mod mock_writer {
- use futures::io::AsyncWrite;
- use std::io;
- use std::pin::Pin;
- use std::task::{Context, Poll};
+use futures::io::AsyncWrite;
+use futures_test::task::panic_context;
+use std::io;
+use std::pin::Pin;
+use std::task::{Context, Poll};
- pub struct MockWriter {
- fun: Box<dyn FnMut(&[u8]) -> Poll<io::Result<usize>>>,
- }
+struct MockWriter {
+ fun: Box<dyn FnMut(&[u8]) -> Poll<io::Result<usize>>>,
+}
- impl MockWriter {
- pub fn new(fun: impl FnMut(&[u8]) -> Poll<io::Result<usize>> + 'static) -> Self {
- Self { fun: Box::new(fun) }
- }
+impl MockWriter {
+ fn new(fun: impl FnMut(&[u8]) -> Poll<io::Result<usize>> + 'static) -> Self {
+ Self { fun: Box::new(fun) }
}
+}
- impl AsyncWrite for MockWriter {
- fn poll_write(
- self: Pin<&mut Self>,
- _cx: &mut Context<'_>,
- buf: &[u8],
- ) -> Poll<io::Result<usize>> {
- (self.get_mut().fun)(buf)
- }
+impl AsyncWrite for MockWriter {
+ fn poll_write(
+ self: Pin<&mut Self>,
+ _cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ (self.get_mut().fun)(buf)
+ }
- fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
- panic!()
- }
+ fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ panic!()
+ }
- fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
- panic!()
- }
+ fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ panic!()
}
}
@@ -37,14 +36,6 @@ mod mock_writer {
/// calls `poll_write` with an empty slice if no buffers are provided.
#[test]
fn write_vectored_no_buffers() {
- use futures::io::AsyncWrite;
- use futures_test::task::panic_context;
- use std::io;
- use std::pin::Pin;
- use std::task::Poll;
-
- use mock_writer::MockWriter;
-
let mut writer = MockWriter::new(|buf| {
assert_eq!(buf, b"");
Err(io::ErrorKind::BrokenPipe.into()).into()
@@ -61,24 +52,12 @@ fn write_vectored_no_buffers() {
/// calls `poll_write` with the first non-empty buffer.
#[test]
fn write_vectored_first_non_empty() {
- use futures::io::AsyncWrite;
- use futures_test::task::panic_context;
- use std::io;
- use std::pin::Pin;
- use std::task::Poll;
-
- use mock_writer::MockWriter;
-
let mut writer = MockWriter::new(|buf| {
assert_eq!(buf, b"four");
Poll::Ready(Ok(4))
});
let cx = &mut panic_context();
- let bufs = &mut [
- io::IoSlice::new(&[]),
- io::IoSlice::new(&[]),
- io::IoSlice::new(b"four")
- ];
+ let bufs = &mut [io::IoSlice::new(&[]), io::IoSlice::new(&[]), io::IoSlice::new(b"four")];
let res = Pin::new(&mut writer).poll_write_vectored(cx, bufs);
let res = res.map_err(|e| e.kind());
diff --git a/tests/join_all.rs b/tests/join_all.rs
deleted file mode 100644
index c322e58..0000000
--- a/tests/join_all.rs
+++ /dev/null
@@ -1,51 +0,0 @@
-mod util {
- use std::future::Future;
- use std::fmt::Debug;
-
- pub fn assert_done<T, F>(actual_fut: F, expected: T)
- where
- T: PartialEq + Debug,
- F: FnOnce() -> Box<dyn Future<Output = T> + Unpin>,
- {
- use futures::executor::block_on;
-
- let output = block_on(actual_fut());
- assert_eq!(output, expected);
- }
-}
-
-#[test]
-fn collect_collects() {
- use futures_util::future::{join_all,ready};
-
- util::assert_done(|| Box::new(join_all(vec![ready(1), ready(2)])), vec![1, 2]);
- util::assert_done(|| Box::new(join_all(vec![ready(1)])), vec![1]);
- // REVIEW: should this be implemented?
- // assert_done(|| Box::new(join_all(Vec::<i32>::new())), vec![]);
-
- // TODO: needs more tests
-}
-
-#[test]
-fn join_all_iter_lifetime() {
- use futures_util::future::{join_all,ready};
- use std::future::Future;
- // In futures-rs version 0.1, this function would fail to typecheck due to an overly
- // conservative type parameterization of `JoinAll`.
- fn sizes(bufs: Vec<&[u8]>) -> Box<dyn Future<Output = Vec<usize>> + Unpin> {
- let iter = bufs.into_iter().map(|b| ready::<usize>(b.len()));
- Box::new(join_all(iter))
- }
-
- util::assert_done(|| sizes(vec![&[1,2,3], &[], &[0]]), vec![3_usize, 0, 1]);
-}
-
-#[test]
-fn join_all_from_iter() {
- use futures_util::future::{JoinAll,ready};
-
- util::assert_done(
- || Box::new(vec![ready(1), ready(2)].into_iter().collect::<JoinAll<_>>()),
- vec![1, 2],
- )
-}
diff --git a/tests/mutex.rs b/tests/lock_mutex.rs
index 68e0301..c92ef50 100644
--- a/tests/mutex.rs
+++ b/tests/lock_mutex.rs
@@ -1,9 +1,15 @@
+use futures::channel::mpsc;
+use futures::executor::{block_on, ThreadPool};
+use futures::future::{ready, FutureExt};
+use futures::lock::Mutex;
+use futures::stream::StreamExt;
+use futures::task::{Context, SpawnExt};
+use futures_test::future::FutureTestExt;
+use futures_test::task::{new_count_waker, panic_context};
+use std::sync::Arc;
+
#[test]
fn mutex_acquire_uncontested() {
- use futures::future::FutureExt;
- use futures::lock::Mutex;
- use futures_test::task::panic_context;
-
let mutex = Mutex::new(());
for _ in 0..10 {
assert!(mutex.lock().poll_unpin(&mut panic_context()).is_ready());
@@ -12,11 +18,6 @@ fn mutex_acquire_uncontested() {
#[test]
fn mutex_wakes_waiters() {
- use futures::future::FutureExt;
- use futures::lock::Mutex;
- use futures::task::Context;
- use futures_test::task::{new_count_waker, panic_context};
-
let mutex = Mutex::new(());
let (waker, counter) = new_count_waker();
let lock = mutex.lock().poll_unpin(&mut panic_context());
@@ -33,22 +34,11 @@ fn mutex_wakes_waiters() {
assert!(waiter.poll_unpin(&mut panic_context()).is_ready());
}
+#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038
#[test]
fn mutex_contested() {
- use futures::channel::mpsc;
- use futures::executor::block_on;
- use futures::future::ready;
- use futures::lock::Mutex;
- use futures::stream::StreamExt;
- use futures::task::SpawnExt;
- use futures_test::future::FutureTestExt;
- use std::sync::Arc;
-
let (tx, mut rx) = mpsc::unbounded();
- let pool = futures::executor::ThreadPool::builder()
- .pool_size(16)
- .create()
- .unwrap();
+ let pool = ThreadPool::builder().pool_size(16).create().unwrap();
let tx = Arc::new(tx);
let mutex = Arc::new(Mutex::new(0));
diff --git a/tests/macro_comma_support.rs b/tests/macro_comma_support.rs
index ca13163..3b082d2 100644
--- a/tests/macro_comma_support.rs
+++ b/tests/macro_comma_support.rs
@@ -1,25 +1,23 @@
+use futures::{
+ executor::block_on,
+ future::{self, FutureExt},
+ join, ready,
+ task::Poll,
+ try_join,
+};
+
#[test]
fn ready() {
- use futures::{
- executor::block_on,
- future,
- task::Poll,
- ready,
- };
-
block_on(future::poll_fn(|_| {
ready!(Poll::Ready(()),);
Poll::Ready(())
}))
}
+#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038
#[test]
fn poll() {
- use futures::{
- executor::block_on,
- future::FutureExt,
- poll,
- };
+ use futures::poll;
block_on(async {
let _ = poll!(async {}.boxed(),);
@@ -28,11 +26,6 @@ fn poll() {
#[test]
fn join() {
- use futures::{
- executor::block_on,
- join
- };
-
block_on(async {
let future1 = async { 1 };
let future2 = async { 2 };
@@ -42,12 +35,6 @@ fn join() {
#[test]
fn try_join() {
- use futures::{
- executor::block_on,
- future::FutureExt,
- try_join,
- };
-
block_on(async {
let future1 = async { 1 }.never_error();
let future2 = async { 2 }.never_error();
diff --git a/tests/oneshot.rs b/tests/oneshot.rs
index 2494306..34b78a3 100644
--- a/tests/oneshot.rs
+++ b/tests/oneshot.rs
@@ -1,11 +1,11 @@
+use futures::channel::oneshot;
+use futures::future::{FutureExt, TryFutureExt};
+use futures_test::future::FutureTestExt;
+use std::sync::mpsc;
+use std::thread;
+
#[test]
fn oneshot_send1() {
- use futures::channel::oneshot;
- use futures::future::TryFutureExt;
- use futures_test::future::FutureTestExt;
- use std::sync::mpsc;
- use std::thread;
-
let (tx1, rx1) = oneshot::channel::<i32>();
let (tx2, rx2) = mpsc::channel();
@@ -17,12 +17,6 @@ fn oneshot_send1() {
#[test]
fn oneshot_send2() {
- use futures::channel::oneshot;
- use futures::future::TryFutureExt;
- use futures_test::future::FutureTestExt;
- use std::sync::mpsc;
- use std::thread;
-
let (tx1, rx1) = oneshot::channel::<i32>();
let (tx2, rx2) = mpsc::channel();
@@ -33,12 +27,6 @@ fn oneshot_send2() {
#[test]
fn oneshot_send3() {
- use futures::channel::oneshot;
- use futures::future::TryFutureExt;
- use futures_test::future::FutureTestExt;
- use std::sync::mpsc;
- use std::thread;
-
let (tx1, rx1) = oneshot::channel::<i32>();
let (tx2, rx2) = mpsc::channel();
@@ -49,11 +37,6 @@ fn oneshot_send3() {
#[test]
fn oneshot_drop_tx1() {
- use futures::channel::oneshot;
- use futures::future::FutureExt;
- use futures_test::future::FutureTestExt;
- use std::sync::mpsc;
-
let (tx1, rx1) = oneshot::channel::<i32>();
let (tx2, rx2) = mpsc::channel();
@@ -65,12 +48,6 @@ fn oneshot_drop_tx1() {
#[test]
fn oneshot_drop_tx2() {
- use futures::channel::oneshot;
- use futures::future::FutureExt;
- use futures_test::future::FutureTestExt;
- use std::sync::mpsc;
- use std::thread;
-
let (tx1, rx1) = oneshot::channel::<i32>();
let (tx2, rx2) = mpsc::channel();
@@ -83,9 +60,19 @@ fn oneshot_drop_tx2() {
#[test]
fn oneshot_drop_rx() {
- use futures::channel::oneshot;
-
let (tx, rx) = oneshot::channel::<i32>();
drop(rx);
assert_eq!(Err(2), tx.send(2));
}
+
+#[test]
+fn oneshot_debug() {
+ let (tx, rx) = oneshot::channel::<i32>();
+ assert_eq!(format!("{:?}", tx), "Sender { complete: false }");
+ assert_eq!(format!("{:?}", rx), "Receiver { complete: false }");
+ drop(rx);
+ assert_eq!(format!("{:?}", tx), "Sender { complete: true }");
+ let (tx, rx) = oneshot::channel::<i32>();
+ drop(tx);
+ assert_eq!(format!("{:?}", rx), "Receiver { complete: true }");
+}
diff --git a/tests/ready_queue.rs b/tests/ready_queue.rs
index 9aa3636..afba8f2 100644
--- a/tests/ready_queue.rs
+++ b/tests/ready_queue.rs
@@ -1,18 +1,15 @@
-mod assert_send_sync {
- use futures::stream::FuturesUnordered;
-
- pub trait AssertSendSync: Send + Sync {}
- impl AssertSendSync for FuturesUnordered<()> {}
-}
+use futures::channel::oneshot;
+use futures::executor::{block_on, block_on_stream};
+use futures::future;
+use futures::stream::{FuturesUnordered, StreamExt};
+use futures::task::Poll;
+use futures_test::task::noop_context;
+use std::panic::{self, AssertUnwindSafe};
+use std::sync::{Arc, Barrier};
+use std::thread;
#[test]
fn basic_usage() {
- use futures::channel::oneshot;
- use futures::executor::block_on;
- use futures::future;
- use futures::stream::{FuturesUnordered, StreamExt};
- use futures::task::Poll;
-
block_on(future::lazy(move |cx| {
let mut queue = FuturesUnordered::new();
let (tx1, rx1) = oneshot::channel();
@@ -41,12 +38,6 @@ fn basic_usage() {
#[test]
fn resolving_errors() {
- use futures::channel::oneshot;
- use futures::executor::block_on;
- use futures::future;
- use futures::stream::{FuturesUnordered, StreamExt};
- use futures::task::Poll;
-
block_on(future::lazy(move |cx| {
let mut queue = FuturesUnordered::new();
let (tx1, rx1) = oneshot::channel();
@@ -75,12 +66,6 @@ fn resolving_errors() {
#[test]
fn dropping_ready_queue() {
- use futures::channel::oneshot;
- use futures::executor::block_on;
- use futures::future;
- use futures::stream::FuturesUnordered;
- use futures_test::task::noop_context;
-
block_on(future::lazy(move |_| {
let queue = FuturesUnordered::new();
let (mut tx1, rx1) = oneshot::channel::<()>();
@@ -108,12 +93,9 @@ fn dropping_ready_queue() {
#[test]
fn stress() {
- use futures::channel::oneshot;
- use futures::executor::block_on_stream;
- use futures::stream::FuturesUnordered;
- use std::sync::{Arc, Barrier};
- use std::thread;
-
+ #[cfg(miri)]
+ const ITER: usize = 30;
+ #[cfg(not(miri))]
const ITER: usize = 300;
for i in 0..ITER {
@@ -157,12 +139,6 @@ fn stress() {
#[test]
fn panicking_future_dropped() {
- use futures::executor::block_on;
- use futures::future;
- use futures::stream::{FuturesUnordered, StreamExt};
- use futures::task::Poll;
- use std::panic::{self, AssertUnwindSafe};
-
block_on(future::lazy(move |cx| {
let mut queue = FuturesUnordered::new();
queue.push(future::poll_fn(|_| -> Poll<Result<i32, i32>> { panic!() }));
diff --git a/tests/recurse.rs b/tests/recurse.rs
index a151f1b..f06524f 100644
--- a/tests/recurse.rs
+++ b/tests/recurse.rs
@@ -1,10 +1,11 @@
+use futures::executor::block_on;
+use futures::future::{self, BoxFuture, FutureExt};
+use std::sync::mpsc;
+use std::thread;
+
+#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038
#[test]
fn lots() {
- use futures::executor::block_on;
- use futures::future::{self, FutureExt, BoxFuture};
- use std::sync::mpsc;
- use std::thread;
-
#[cfg(not(futures_sanitizer))]
const N: i32 = 1_000;
#[cfg(futures_sanitizer)] // If N is many, asan reports stack-overflow: https://gist.github.com/taiki-e/099446d21cbec69d4acbacf7a9646136
@@ -20,8 +21,6 @@ fn lots() {
}
let (tx, rx) = mpsc::channel();
- thread::spawn(|| {
- block_on(do_it((N, 0)).map(move |x| tx.send(x).unwrap()))
- });
+ thread::spawn(|| block_on(do_it((N, 0)).map(move |x| tx.send(x).unwrap())));
assert_eq!((0..=N).sum::<i32>(), rx.recv().unwrap());
}
diff --git a/tests/sink.rs b/tests/sink.rs
index 597ed34..dc826bd 100644
--- a/tests/sink.rs
+++ b/tests/sink.rs
@@ -1,264 +1,221 @@
-mod sassert_next {
- use futures::stream::{Stream, StreamExt};
- use futures::task::Poll;
- use futures_test::task::panic_context;
- use std::fmt;
-
- pub fn sassert_next<S>(s: &mut S, item: S::Item)
- where
- S: Stream + Unpin,
- S::Item: Eq + fmt::Debug,
- {
- match s.poll_next_unpin(&mut panic_context()) {
- Poll::Ready(None) => panic!("stream is at its end"),
- Poll::Ready(Some(e)) => assert_eq!(e, item),
- Poll::Pending => panic!("stream wasn't ready"),
- }
+use futures::channel::{mpsc, oneshot};
+use futures::executor::block_on;
+use futures::future::{self, poll_fn, Future, FutureExt, TryFutureExt};
+use futures::never::Never;
+use futures::ready;
+use futures::sink::{self, Sink, SinkErrInto, SinkExt};
+use futures::stream::{self, Stream, StreamExt};
+use futures::task::{self, ArcWake, Context, Poll, Waker};
+use futures_test::task::panic_context;
+use std::cell::{Cell, RefCell};
+use std::collections::VecDeque;
+use std::fmt;
+use std::mem;
+use std::pin::Pin;
+use std::rc::Rc;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::Arc;
+
+fn sassert_next<S>(s: &mut S, item: S::Item)
+where
+ S: Stream + Unpin,
+ S::Item: Eq + fmt::Debug,
+{
+ match s.poll_next_unpin(&mut panic_context()) {
+ Poll::Ready(None) => panic!("stream is at its end"),
+ Poll::Ready(Some(e)) => assert_eq!(e, item),
+ Poll::Pending => panic!("stream wasn't ready"),
}
}
-mod unwrap {
- use futures::task::Poll;
- use std::fmt;
-
- pub fn unwrap<T, E: fmt::Debug>(x: Poll<Result<T, E>>) -> T {
- match x {
- Poll::Ready(Ok(x)) => x,
- Poll::Ready(Err(_)) => panic!("Poll::Ready(Err(_))"),
- Poll::Pending => panic!("Poll::Pending"),
- }
+fn unwrap<T, E: fmt::Debug>(x: Poll<Result<T, E>>) -> T {
+ match x {
+ Poll::Ready(Ok(x)) => x,
+ Poll::Ready(Err(_)) => panic!("Poll::Ready(Err(_))"),
+ Poll::Pending => panic!("Poll::Pending"),
}
}
-mod flag_cx {
- use futures::task::{self, ArcWake, Context};
- use std::sync::Arc;
- use std::sync::atomic::{AtomicBool, Ordering};
-
- // An Unpark struct that records unpark events for inspection
- pub struct Flag(AtomicBool);
+// An Unpark struct that records unpark events for inspection
+struct Flag(AtomicBool);
- impl Flag {
- pub fn new() -> Arc<Self> {
- Arc::new(Self(AtomicBool::new(false)))
- }
-
- pub fn take(&self) -> bool {
- self.0.swap(false, Ordering::SeqCst)
- }
+impl Flag {
+ fn new() -> Arc<Self> {
+ Arc::new(Self(AtomicBool::new(false)))
+ }
- pub fn set(&self, v: bool) {
- self.0.store(v, Ordering::SeqCst)
- }
+ fn take(&self) -> bool {
+ self.0.swap(false, Ordering::SeqCst)
}
- impl ArcWake for Flag {
- fn wake_by_ref(arc_self: &Arc<Self>) {
- arc_self.set(true)
- }
+ fn set(&self, v: bool) {
+ self.0.store(v, Ordering::SeqCst)
}
+}
- pub fn flag_cx<F, R>(f: F) -> R
- where
- F: FnOnce(Arc<Flag>, &mut Context<'_>) -> R,
- {
- let flag = Flag::new();
- let waker = task::waker_ref(&flag);
- let cx = &mut Context::from_waker(&waker);
- f(flag.clone(), cx)
+impl ArcWake for Flag {
+ fn wake_by_ref(arc_self: &Arc<Self>) {
+ arc_self.set(true)
}
}
-mod start_send_fut {
- use futures::future::Future;
- use futures::ready;
- use futures::sink::Sink;
- use futures::task::{Context, Poll};
- use std::pin::Pin;
+fn flag_cx<F, R>(f: F) -> R
+where
+ F: FnOnce(Arc<Flag>, &mut Context<'_>) -> R,
+{
+ let flag = Flag::new();
+ let waker = task::waker_ref(&flag);
+ let cx = &mut Context::from_waker(&waker);
+ f(flag.clone(), cx)
+}
- // Sends a value on an i32 channel sink
- pub struct StartSendFut<S: Sink<Item> + Unpin, Item: Unpin>(Option<S>, Option<Item>);
+// Sends a value on an i32 channel sink
+struct StartSendFut<S: Sink<Item> + Unpin, Item: Unpin>(Option<S>, Option<Item>);
- impl<S: Sink<Item> + Unpin, Item: Unpin> StartSendFut<S, Item> {
- pub fn new(sink: S, item: Item) -> Self {
- Self(Some(sink), Some(item))
- }
+impl<S: Sink<Item> + Unpin, Item: Unpin> StartSendFut<S, Item> {
+ fn new(sink: S, item: Item) -> Self {
+ Self(Some(sink), Some(item))
}
+}
- impl<S: Sink<Item> + Unpin, Item: Unpin> Future for StartSendFut<S, Item> {
- type Output = Result<S, S::Error>;
+impl<S: Sink<Item> + Unpin, Item: Unpin> Future for StartSendFut<S, Item> {
+ type Output = Result<S, S::Error>;
- fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- let Self(inner, item) = self.get_mut();
- {
- let mut inner = inner.as_mut().unwrap();
- ready!(Pin::new(&mut inner).poll_ready(cx))?;
- Pin::new(&mut inner).start_send(item.take().unwrap())?;
- }
- Poll::Ready(Ok(inner.take().unwrap()))
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let Self(inner, item) = self.get_mut();
+ {
+ let mut inner = inner.as_mut().unwrap();
+ ready!(Pin::new(&mut inner).poll_ready(cx))?;
+ Pin::new(&mut inner).start_send(item.take().unwrap())?;
}
+ Poll::Ready(Ok(inner.take().unwrap()))
}
}
-mod manual_flush {
- use futures::sink::Sink;
- use futures::task::{Context, Poll, Waker};
- use std::mem;
- use std::pin::Pin;
-
- // Immediately accepts all requests to start pushing, but completion is managed
- // by manually flushing
- pub struct ManualFlush<T: Unpin> {
- data: Vec<T>,
- waiting_tasks: Vec<Waker>,
- }
+// Immediately accepts all requests to start pushing, but completion is managed
+// by manually flushing
+struct ManualFlush<T: Unpin> {
+ data: Vec<T>,
+ waiting_tasks: Vec<Waker>,
+}
- impl<T: Unpin> Sink<Option<T>> for ManualFlush<T> {
- type Error = ();
+impl<T: Unpin> Sink<Option<T>> for ManualFlush<T> {
+ type Error = ();
- fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- Poll::Ready(Ok(()))
- }
+ fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ Poll::Ready(Ok(()))
+ }
- fn start_send(mut self: Pin<&mut Self>, item: Option<T>) -> Result<(), Self::Error> {
- if let Some(item) = item {
- self.data.push(item);
- } else {
- self.force_flush();
- }
- Ok(())
+ fn start_send(mut self: Pin<&mut Self>, item: Option<T>) -> Result<(), Self::Error> {
+ if let Some(item) = item {
+ self.data.push(item);
+ } else {
+ self.force_flush();
}
+ Ok(())
+ }
- fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- if self.data.is_empty() {
- Poll::Ready(Ok(()))
- } else {
- self.waiting_tasks.push(cx.waker().clone());
- Poll::Pending
- }
+ fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ if self.data.is_empty() {
+ Poll::Ready(Ok(()))
+ } else {
+ self.waiting_tasks.push(cx.waker().clone());
+ Poll::Pending
}
+ }
- fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- self.poll_flush(cx)
- }
+ fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ self.poll_flush(cx)
}
+}
- impl<T: Unpin> ManualFlush<T> {
- pub fn new() -> Self {
- Self {
- data: Vec::new(),
- waiting_tasks: Vec::new(),
- }
- }
+impl<T: Unpin> ManualFlush<T> {
+ fn new() -> Self {
+ Self { data: Vec::new(), waiting_tasks: Vec::new() }
+ }
- pub fn force_flush(&mut self) -> Vec<T> {
- for task in self.waiting_tasks.drain(..) {
- task.wake()
- }
- mem::replace(&mut self.data, Vec::new())
+ fn force_flush(&mut self) -> Vec<T> {
+ for task in self.waiting_tasks.drain(..) {
+ task.wake()
}
+ mem::replace(&mut self.data, Vec::new())
}
}
-mod allowance {
- use futures::sink::Sink;
- use futures::task::{Context, Poll, Waker};
- use std::cell::{Cell, RefCell};
- use std::pin::Pin;
- use std::rc::Rc;
+struct ManualAllow<T: Unpin> {
+ data: Vec<T>,
+ allow: Rc<Allow>,
+}
- pub struct ManualAllow<T: Unpin> {
- pub data: Vec<T>,
- allow: Rc<Allow>,
- }
+struct Allow {
+ flag: Cell<bool>,
+ tasks: RefCell<Vec<Waker>>,
+}
- pub struct Allow {
- flag: Cell<bool>,
- tasks: RefCell<Vec<Waker>>,
+impl Allow {
+ fn new() -> Self {
+ Self { flag: Cell::new(false), tasks: RefCell::new(Vec::new()) }
}
- impl Allow {
- pub fn new() -> Self {
- Self {
- flag: Cell::new(false),
- tasks: RefCell::new(Vec::new()),
- }
- }
-
- pub fn check(&self, cx: &mut Context<'_>) -> bool {
- if self.flag.get() {
- true
- } else {
- self.tasks.borrow_mut().push(cx.waker().clone());
- false
- }
- }
-
- pub fn start(&self) {
- self.flag.set(true);
- let mut tasks = self.tasks.borrow_mut();
- for task in tasks.drain(..) {
- task.wake();
- }
+ fn check(&self, cx: &mut Context<'_>) -> bool {
+ if self.flag.get() {
+ true
+ } else {
+ self.tasks.borrow_mut().push(cx.waker().clone());
+ false
}
}
- impl<T: Unpin> Sink<T> for ManualAllow<T> {
- type Error = ();
-
- fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- if self.allow.check(cx) {
- Poll::Ready(Ok(()))
- } else {
- Poll::Pending
- }
+ fn start(&self) {
+ self.flag.set(true);
+ let mut tasks = self.tasks.borrow_mut();
+ for task in tasks.drain(..) {
+ task.wake();
}
+ }
+}
- fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
- self.data.push(item);
- Ok(())
- }
+impl<T: Unpin> Sink<T> for ManualAllow<T> {
+ type Error = ();
- fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ if self.allow.check(cx) {
Poll::Ready(Ok(()))
+ } else {
+ Poll::Pending
}
+ }
- fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- Poll::Ready(Ok(()))
- }
+ fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
+ self.data.push(item);
+ Ok(())
}
- pub fn manual_allow<T: Unpin>() -> (ManualAllow<T>, Rc<Allow>) {
- let allow = Rc::new(Allow::new());
- let manual_allow = ManualAllow {
- data: Vec::new(),
- allow: allow.clone(),
- };
- (manual_allow, allow)
+ fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ Poll::Ready(Ok(()))
}
+
+ fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ Poll::Ready(Ok(()))
+ }
+}
+
+fn manual_allow<T: Unpin>() -> (ManualAllow<T>, Rc<Allow>) {
+ let allow = Rc::new(Allow::new());
+ let manual_allow = ManualAllow { data: Vec::new(), allow: allow.clone() };
+ (manual_allow, allow)
}
#[test]
fn either_sink() {
- use futures::sink::{Sink, SinkExt};
- use std::collections::VecDeque;
- use std::pin::Pin;
-
- let mut s = if true {
- Vec::<i32>::new().left_sink()
- } else {
- VecDeque::<i32>::new().right_sink()
- };
+ let mut s =
+ if true { Vec::<i32>::new().left_sink() } else { VecDeque::<i32>::new().right_sink() };
Pin::new(&mut s).start_send(0).unwrap();
}
#[test]
fn vec_sink() {
- use futures::executor::block_on;
- use futures::sink::{Sink, SinkExt};
- use std::pin::Pin;
-
let mut v = Vec::new();
Pin::new(&mut v).start_send(0).unwrap();
Pin::new(&mut v).start_send(1).unwrap();
@@ -269,10 +226,6 @@ fn vec_sink() {
#[test]
fn vecdeque_sink() {
- use futures::sink::Sink;
- use std::collections::VecDeque;
- use std::pin::Pin;
-
let mut deque = VecDeque::new();
Pin::new(&mut deque).start_send(2).unwrap();
Pin::new(&mut deque).start_send(3).unwrap();
@@ -284,9 +237,6 @@ fn vecdeque_sink() {
#[test]
fn send() {
- use futures::executor::block_on;
- use futures::sink::SinkExt;
-
let mut v = Vec::new();
block_on(v.send(0)).unwrap();
@@ -301,10 +251,6 @@ fn send() {
#[test]
fn send_all() {
- use futures::executor::block_on;
- use futures::sink::SinkExt;
- use futures::stream::{self, StreamExt};
-
let mut v = Vec::new();
block_on(v.send_all(&mut stream::iter(vec![0, 1]).map(Ok))).unwrap();
@@ -321,15 +267,6 @@ fn send_all() {
// channel is full
#[test]
fn mpsc_blocking_start_send() {
- use futures::channel::mpsc;
- use futures::executor::block_on;
- use futures::future::{self, FutureExt};
-
- use start_send_fut::StartSendFut;
- use flag_cx::flag_cx;
- use sassert_next::sassert_next;
- use unwrap::unwrap;
-
let (mut tx, mut rx) = mpsc::channel::<i32>(0);
block_on(future::lazy(|_| {
@@ -351,19 +288,9 @@ fn mpsc_blocking_start_send() {
// test `flush` by using `with` to make the first insertion into a sink block
// until a oneshot is completed
+#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038
#[test]
fn with_flush() {
- use futures::channel::oneshot;
- use futures::executor::block_on;
- use futures::future::{self, FutureExt, TryFutureExt};
- use futures::never::Never;
- use futures::sink::{Sink, SinkExt};
- use std::mem;
- use std::pin::Pin;
-
- use flag_cx::flag_cx;
- use unwrap::unwrap;
-
let (tx, rx) = oneshot::channel();
let mut block = rx.boxed();
let mut sink = Vec::new().with(|elem| {
@@ -390,11 +317,6 @@ fn with_flush() {
// test simple use of with to change data
#[test]
fn with_as_map() {
- use futures::executor::block_on;
- use futures::future;
- use futures::never::Never;
- use futures::sink::SinkExt;
-
let mut sink = Vec::new().with(|item| future::ok::<i32, Never>(item * 2));
block_on(sink.send(0)).unwrap();
block_on(sink.send(1)).unwrap();
@@ -405,10 +327,6 @@ fn with_as_map() {
// test simple use of with_flat_map
#[test]
fn with_flat_map() {
- use futures::executor::block_on;
- use futures::sink::SinkExt;
- use futures::stream::{self, StreamExt};
-
let mut sink = Vec::new().with_flat_map(|item| stream::iter(vec![item; item]).map(Ok));
block_on(sink.send(0)).unwrap();
block_on(sink.send(1)).unwrap();
@@ -421,16 +339,6 @@ fn with_flat_map() {
// Regression test for the issue #1834.
#[test]
fn with_propagates_poll_ready() {
- use futures::channel::mpsc;
- use futures::executor::block_on;
- use futures::future;
- use futures::sink::{Sink, SinkExt};
- use futures::task::Poll;
- use std::pin::Pin;
-
- use flag_cx::flag_cx;
- use sassert_next::sassert_next;
-
let (tx, mut rx) = mpsc::channel::<i32>(0);
let mut tx = tx.with(|item: i32| future::ok::<i32, mpsc::SendError>(item + 10));
@@ -457,14 +365,6 @@ fn with_propagates_poll_ready() {
// but doesn't claim to be flushed until the underlying sink is
#[test]
fn with_flush_propagate() {
- use futures::future::{self, FutureExt};
- use futures::sink::{Sink, SinkExt};
- use std::pin::Pin;
-
- use manual_flush::ManualFlush;
- use flag_cx::flag_cx;
- use unwrap::unwrap;
-
let mut sink = ManualFlush::new().with(future::ok::<Option<i32>, ()>);
flag_cx(|flag, cx| {
unwrap(Pin::new(&mut sink).poll_ready(cx));
@@ -486,21 +386,13 @@ fn with_flush_propagate() {
// test that `Clone` is implemented on `with` sinks
#[test]
fn with_implements_clone() {
- use futures::channel::mpsc;
- use futures::executor::block_on;
- use futures::future;
- use futures::{SinkExt, StreamExt};
-
let (mut tx, rx) = mpsc::channel(5);
{
- let mut is_positive = tx
- .clone()
- .with(|item| future::ok::<bool, mpsc::SendError>(item > 0));
+ let mut is_positive = tx.clone().with(|item| future::ok::<bool, mpsc::SendError>(item > 0));
- let mut is_long = tx
- .clone()
- .with(|item: &str| future::ok::<bool, mpsc::SendError>(item.len() > 5));
+ let mut is_long =
+ tx.clone().with(|item: &str| future::ok::<bool, mpsc::SendError>(item.len() > 5));
block_on(is_positive.clone().send(-1)).unwrap();
block_on(is_long.clone().send("123456")).unwrap();
@@ -512,18 +404,12 @@ fn with_implements_clone() {
block_on(tx.close()).unwrap();
- assert_eq!(
- block_on(rx.collect::<Vec<_>>()),
- vec![false, true, false, true, false]
- );
+ assert_eq!(block_on(rx.collect::<Vec<_>>()), vec![false, true, false, true, false]);
}
// test that a buffer is a no-nop around a sink that always accepts sends
#[test]
fn buffer_noop() {
- use futures::executor::block_on;
- use futures::sink::SinkExt;
-
let mut sink = Vec::new().buffer(0);
block_on(sink.send(0)).unwrap();
block_on(sink.send(1)).unwrap();
@@ -539,15 +425,6 @@ fn buffer_noop() {
// and writing out when the underlying sink is ready
#[test]
fn buffer() {
- use futures::executor::block_on;
- use futures::future::FutureExt;
- use futures::sink::SinkExt;
-
- use start_send_fut::StartSendFut;
- use flag_cx::flag_cx;
- use unwrap::unwrap;
- use allowance::manual_allow;
-
let (sink, allow) = manual_allow::<i32>();
let sink = sink.buffer(2);
@@ -567,10 +444,6 @@ fn buffer() {
#[test]
fn fanout_smoke() {
- use futures::executor::block_on;
- use futures::sink::SinkExt;
- use futures::stream::{self, StreamExt};
-
let sink1 = Vec::new();
let sink2 = Vec::new();
let mut sink = sink1.fanout(sink2);
@@ -582,16 +455,6 @@ fn fanout_smoke() {
#[test]
fn fanout_backpressure() {
- use futures::channel::mpsc;
- use futures::executor::block_on;
- use futures::future::FutureExt;
- use futures::sink::SinkExt;
- use futures::stream::StreamExt;
-
- use start_send_fut::StartSendFut;
- use flag_cx::flag_cx;
- use unwrap::unwrap;
-
let (left_send, mut left_recv) = mpsc::channel(0);
let (right_send, mut right_recv) = mpsc::channel(0);
let sink = left_send.fanout(right_send);
@@ -624,12 +487,6 @@ fn fanout_backpressure() {
#[test]
fn sink_map_err() {
- use futures::channel::mpsc;
- use futures::sink::{Sink, SinkExt};
- use futures::task::Poll;
- use futures_test::task::panic_context;
- use std::pin::Pin;
-
{
let cx = &mut panic_context();
let (tx, _rx) = mpsc::channel(1);
@@ -639,20 +496,11 @@ fn sink_map_err() {
}
let tx = mpsc::channel(0).0;
- assert_eq!(
- Pin::new(&mut tx.sink_map_err(|_| ())).start_send(()),
- Err(())
- );
+ assert_eq!(Pin::new(&mut tx.sink_map_err(|_| ())).start_send(()), Err(()));
}
#[test]
fn sink_unfold() {
- use futures::channel::mpsc;
- use futures::executor::block_on;
- use futures::future::poll_fn;
- use futures::sink::{self, Sink, SinkExt};
- use futures::task::Poll;
-
block_on(poll_fn(|cx| {
let (tx, mut rx) = mpsc::channel(1);
let unfold = sink::unfold((), |(), i: i32| {
@@ -685,14 +533,8 @@ fn sink_unfold() {
#[test]
fn err_into() {
- use futures::channel::mpsc;
- use futures::sink::{Sink, SinkErrInto, SinkExt};
- use futures::task::Poll;
- use futures_test::task::panic_context;
- use std::pin::Pin;
-
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
- pub struct ErrIntoTest;
+ struct ErrIntoTest;
impl From<mpsc::SendError> for ErrIntoTest {
fn from(_: mpsc::SendError) -> Self {
@@ -709,8 +551,5 @@ fn err_into() {
}
let tx = mpsc::channel(0).0;
- assert_eq!(
- Pin::new(&mut tx.sink_err_into()).start_send(()),
- Err(ErrIntoTest)
- );
+ assert_eq!(Pin::new(&mut tx.sink_err_into()).start_send(()), Err(ErrIntoTest));
}
diff --git a/tests/sink_fanout.rs b/tests/sink_fanout.rs
index 7d1fa43..e57b2d8 100644
--- a/tests/sink_fanout.rs
+++ b/tests/sink_fanout.rs
@@ -1,11 +1,11 @@
+use futures::channel::mpsc;
+use futures::executor::block_on;
+use futures::future::join3;
+use futures::sink::SinkExt;
+use futures::stream::{self, StreamExt};
+
#[test]
fn it_works() {
- use futures::channel::mpsc;
- use futures::executor::block_on;
- use futures::future::join3;
- use futures::sink::SinkExt;
- use futures::stream::{self, StreamExt};
-
let (tx1, rx1) = mpsc::channel(1);
let (tx2, rx2) = mpsc::channel(2);
let tx = tx1.fanout(tx2).sink_map_err(|_| ());
diff --git a/tests/split.rs b/tests/split.rs
deleted file mode 100644
index 86c2fc6..0000000
--- a/tests/split.rs
+++ /dev/null
@@ -1,75 +0,0 @@
-#[test]
-fn test_split() {
- use futures::executor::block_on;
- use futures::sink::{Sink, SinkExt};
- use futures::stream::{self, Stream, StreamExt};
- use futures::task::{Context, Poll};
- use pin_project::pin_project;
- use std::pin::Pin;
-
- #[pin_project]
- struct Join<T, U> {
- #[pin]
- stream: T,
- #[pin]
- sink: U,
- }
-
- impl<T: Stream, U> Stream for Join<T, U> {
- type Item = T::Item;
-
- fn poll_next(
- self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<Option<T::Item>> {
- self.project().stream.poll_next(cx)
- }
- }
-
- impl<T, U: Sink<Item>, Item> Sink<Item> for Join<T, U> {
- type Error = U::Error;
-
- fn poll_ready(
- self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<Result<(), Self::Error>> {
- self.project().sink.poll_ready(cx)
- }
-
- fn start_send(
- self: Pin<&mut Self>,
- item: Item,
- ) -> Result<(), Self::Error> {
- self.project().sink.start_send(item)
- }
-
- fn poll_flush(
- self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<Result<(), Self::Error>> {
- self.project().sink.poll_flush(cx)
- }
-
- fn poll_close(
- self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<Result<(), Self::Error>> {
- self.project().sink.poll_close(cx)
- }
- }
-
- let mut dest: Vec<i32> = Vec::new();
- {
- let join = Join {
- stream: stream::iter(vec![10, 20, 30]),
- sink: &mut dest
- };
-
- let (sink, stream) = join.split();
- let join = sink.reunite(stream).expect("test_split: reunite error");
- let (mut sink, stream) = join.split();
- let mut stream = stream.map(Ok);
- block_on(sink.send_all(&mut stream)).unwrap();
- }
- assert_eq!(dest, vec![10, 20, 30]);
-}
diff --git a/tests/stream.rs b/tests/stream.rs
index 14b283d..71ec654 100644
--- a/tests/stream.rs
+++ b/tests/stream.rs
@@ -1,8 +1,18 @@
+use std::iter;
+use std::sync::Arc;
+
+use futures::channel::mpsc;
+use futures::executor::block_on;
+use futures::future::{self, Future};
+use futures::lock::Mutex;
+use futures::sink::SinkExt;
+use futures::stream::{self, StreamExt};
+use futures::task::Poll;
+use futures::{ready, FutureExt};
+use futures_test::task::noop_context;
+
#[test]
fn select() {
- use futures::executor::block_on;
- use futures::stream::{self, StreamExt};
-
fn select_and_compare(a: Vec<u32>, b: Vec<u32>, expected: Vec<u32>) {
let a = stream::iter(a);
let b = stream::iter(b);
@@ -17,19 +27,12 @@ fn select() {
#[test]
fn flat_map() {
- use futures::stream::{self, StreamExt};
-
- futures::executor::block_on(async {
- let st = stream::iter(vec![
- stream::iter(0..=4u8),
- stream::iter(6..=10),
- stream::iter(0..=2),
- ]);
-
- let values: Vec<_> = st
- .flat_map(|s| s.filter(|v| futures::future::ready(v % 2 == 0)))
- .collect()
- .await;
+ block_on(async {
+ let st =
+ stream::iter(vec![stream::iter(0..=4u8), stream::iter(6..=10), stream::iter(0..=2)]);
+
+ let values: Vec<_> =
+ st.flat_map(|s| s.filter(|v| futures::future::ready(v % 2 == 0))).collect().await;
assert_eq!(values, vec![0, 2, 4, 6, 8, 10, 0, 2]);
});
@@ -37,28 +40,287 @@ fn flat_map() {
#[test]
fn scan() {
- use futures::stream::{self, StreamExt};
-
- futures::executor::block_on(async {
- assert_eq!(
- stream::iter(vec![1u8, 2, 3, 4, 6, 8, 2])
- .scan(1, |state, e| {
- *state += 1;
- futures::future::ready(if e < *state { Some(e) } else { None })
- })
+ block_on(async {
+ let values = stream::iter(vec![1u8, 2, 3, 4, 6, 8, 2])
+ .scan(1, |state, e| {
+ *state += 1;
+ futures::future::ready(if e < *state { Some(e) } else { None })
+ })
+ .collect::<Vec<_>>()
+ .await;
+
+ assert_eq!(values, vec![1u8, 2, 3, 4]);
+ });
+}
+
+#[test]
+fn flatten_unordered() {
+ use futures::executor::block_on;
+ use futures::stream::*;
+ use futures::task::*;
+ use std::convert::identity;
+ use std::pin::Pin;
+ use std::thread;
+ use std::time::Duration;
+
+ struct DataStream {
+ data: Vec<u8>,
+ polled: bool,
+ wake_immediately: bool,
+ }
+
+ impl Stream for DataStream {
+ type Item = u8;
+
+ fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
+ if !self.polled {
+ if !self.wake_immediately {
+ let waker = ctx.waker().clone();
+ let sleep_time =
+ Duration::from_millis(*self.data.first().unwrap_or(&0) as u64 / 10);
+ thread::spawn(move || {
+ thread::sleep(sleep_time);
+ waker.wake_by_ref();
+ });
+ } else {
+ ctx.waker().wake_by_ref();
+ }
+ self.polled = true;
+ Poll::Pending
+ } else {
+ self.polled = false;
+ Poll::Ready(self.data.pop())
+ }
+ }
+ }
+
+ struct Interchanger {
+ polled: bool,
+ base: u8,
+ wake_immediately: bool,
+ }
+
+ impl Stream for Interchanger {
+ type Item = DataStream;
+
+ fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
+ if !self.polled {
+ self.polled = true;
+ if !self.wake_immediately {
+ let waker = ctx.waker().clone();
+ let sleep_time = Duration::from_millis(self.base as u64);
+ thread::spawn(move || {
+ thread::sleep(sleep_time);
+ waker.wake_by_ref();
+ });
+ } else {
+ ctx.waker().wake_by_ref();
+ }
+ Poll::Pending
+ } else {
+ let data: Vec<_> = (0..6).rev().map(|v| v + self.base * 6).collect();
+ self.base += 1;
+ self.polled = false;
+ Poll::Ready(Some(DataStream {
+ polled: false,
+ data,
+ wake_immediately: self.wake_immediately && self.base % 2 == 0,
+ }))
+ }
+ }
+ }
+
+ // basic behaviour
+ {
+ block_on(async {
+ let st = stream::iter(vec![
+ stream::iter(0..=4u8),
+ stream::iter(6..=10),
+ stream::iter(10..=12),
+ ]);
+
+ let fl_unordered = st.flatten_unordered(3).collect::<Vec<_>>().await;
+
+ assert_eq!(fl_unordered, vec![0, 6, 10, 1, 7, 11, 2, 8, 12, 3, 9, 4, 10]);
+ });
+
+ block_on(async {
+ let st = stream::iter(vec![
+ stream::iter(0..=4u8),
+ stream::iter(6..=10),
+ stream::iter(0..=2),
+ ]);
+
+ let mut fm_unordered = st
+ .flat_map_unordered(1, |s| s.filter(|v| futures::future::ready(v % 2 == 0)))
+ .collect::<Vec<_>>()
+ .await;
+
+ fm_unordered.sort_unstable();
+
+ assert_eq!(fm_unordered, vec![0, 0, 2, 2, 4, 6, 8, 10]);
+ });
+ }
+
+ // wake up immediately
+ {
+ block_on(async {
+ let mut fl_unordered = Interchanger { polled: false, base: 0, wake_immediately: true }
+ .take(10)
+ .map(|s| s.map(identity))
+ .flatten_unordered(10)
+ .collect::<Vec<_>>()
+ .await;
+
+ fl_unordered.sort_unstable();
+
+ assert_eq!(fl_unordered, (0..60).collect::<Vec<u8>>());
+ });
+
+ block_on(async {
+ let mut fm_unordered = Interchanger { polled: false, base: 0, wake_immediately: true }
+ .take(10)
+ .flat_map_unordered(10, |s| s.map(identity))
+ .collect::<Vec<_>>()
+ .await;
+
+ fm_unordered.sort_unstable();
+
+ assert_eq!(fm_unordered, (0..60).collect::<Vec<u8>>());
+ });
+ }
+
+ // wake up after delay
+ {
+ block_on(async {
+ let mut fl_unordered = Interchanger { polled: false, base: 0, wake_immediately: false }
+ .take(10)
+ .map(|s| s.map(identity))
+ .flatten_unordered(10)
+ .collect::<Vec<_>>()
+ .await;
+
+ fl_unordered.sort_unstable();
+
+ assert_eq!(fl_unordered, (0..60).collect::<Vec<u8>>());
+ });
+
+ block_on(async {
+ let mut fm_unordered = Interchanger { polled: false, base: 0, wake_immediately: false }
+ .take(10)
+ .flat_map_unordered(10, |s| s.map(identity))
.collect::<Vec<_>>()
- .await,
- vec![1u8, 2, 3, 4]
+ .await;
+
+ fm_unordered.sort_unstable();
+
+ assert_eq!(fm_unordered, (0..60).collect::<Vec<u8>>());
+ });
+
+ block_on(async {
+ let (mut fm_unordered, mut fl_unordered) = futures_util::join!(
+ Interchanger { polled: false, base: 0, wake_immediately: false }
+ .take(10)
+ .flat_map_unordered(10, |s| s.map(identity))
+ .collect::<Vec<_>>(),
+ Interchanger { polled: false, base: 0, wake_immediately: false }
+ .take(10)
+ .map(|s| s.map(identity))
+ .flatten_unordered(10)
+ .collect::<Vec<_>>()
+ );
+
+ fm_unordered.sort_unstable();
+ fl_unordered.sort_unstable();
+
+ assert_eq!(fm_unordered, fl_unordered);
+ assert_eq!(fm_unordered, (0..60).collect::<Vec<u8>>());
+ });
+ }
+
+ // waker panics
+ {
+ let stream = Arc::new(Mutex::new(
+ Interchanger { polled: false, base: 0, wake_immediately: true }
+ .take(10)
+ .flat_map_unordered(10, |s| s.map(identity)),
+ ));
+
+ struct PanicWaker;
+
+ impl ArcWake for PanicWaker {
+ fn wake_by_ref(_arc_self: &Arc<Self>) {
+ panic!("WAKE UP");
+ }
+ }
+
+ std::thread::spawn({
+ let stream = stream.clone();
+ move || {
+ let mut st = poll_fn(|cx| {
+ let mut lock = ready!(stream.lock().poll_unpin(cx));
+
+ let panic_waker = waker(Arc::new(PanicWaker));
+ let mut panic_cx = Context::from_waker(&panic_waker);
+ let _ = ready!(lock.poll_next_unpin(&mut panic_cx));
+
+ Poll::Ready(Some(()))
+ });
+
+ block_on(st.next())
+ }
+ })
+ .join()
+ .unwrap_err();
+
+ block_on(async move {
+ let mut values: Vec<_> = stream.lock().await.by_ref().collect().await;
+ values.sort_unstable();
+
+ assert_eq!(values, (0..60).collect::<Vec<u8>>());
+ });
+ }
+
+ // stream panics
+ {
+ let st = stream::iter(iter::once(
+ once(Box::pin(async { panic!("Polled") })).left_stream::<DataStream>(),
+ ))
+ .chain(
+ Interchanger { polled: false, base: 0, wake_immediately: true }
+ .map(|stream| stream.right_stream())
+ .take(10),
);
- });
+
+ let stream = Arc::new(Mutex::new(st.flatten_unordered(10)));
+
+ std::thread::spawn({
+ let stream = stream.clone();
+ move || {
+ let mut st = poll_fn(|cx| {
+ let mut lock = ready!(stream.lock().poll_unpin(cx));
+ let data = ready!(lock.poll_next_unpin(cx));
+
+ Poll::Ready(data)
+ });
+
+ block_on(st.next())
+ }
+ })
+ .join()
+ .unwrap_err();
+
+ block_on(async move {
+ let mut values: Vec<_> = stream.lock().await.by_ref().collect().await;
+ values.sort_unstable();
+
+ assert_eq!(values, (0..60).collect::<Vec<u8>>());
+ });
+ }
}
#[test]
fn take_until() {
- use futures::future::{self, Future};
- use futures::stream::{self, StreamExt};
- use futures::task::Poll;
-
fn make_stop_fut(stop_on: u32) -> impl Future<Output = ()> {
let mut i = 0;
future::poll_fn(move |_cx| {
@@ -71,7 +333,7 @@ fn take_until() {
})
}
- futures::executor::block_on(async {
+ block_on(async {
// Verify stopping works:
let stream = stream::iter(1u32..=10);
let stop_fut = make_stop_fut(5);
@@ -123,10 +385,15 @@ fn take_until() {
#[test]
#[should_panic]
-fn ready_chunks_panic_on_cap_zero() {
- use futures::channel::mpsc;
- use futures::stream::StreamExt;
+fn chunks_panic_on_cap_zero() {
+ let (_, rx1) = mpsc::channel::<()>(1);
+ let _ = rx1.chunks(0);
+}
+
+#[test]
+#[should_panic]
+fn ready_chunks_panic_on_cap_zero() {
let (_, rx1) = mpsc::channel::<()>(1);
let _ = rx1.ready_chunks(0);
@@ -134,12 +401,6 @@ fn ready_chunks_panic_on_cap_zero() {
#[test]
fn ready_chunks() {
- use futures::channel::mpsc;
- use futures::stream::StreamExt;
- use futures::sink::SinkExt;
- use futures::FutureExt;
- use futures_test::task::noop_context;
-
let (mut tx, rx1) = mpsc::channel::<i32>(16);
let mut s = rx1.ready_chunks(2);
@@ -147,14 +408,14 @@ fn ready_chunks() {
let mut cx = noop_context();
assert!(s.next().poll_unpin(&mut cx).is_pending());
- futures::executor::block_on(async {
+ block_on(async {
tx.send(1).await.unwrap();
assert_eq!(s.next().await.unwrap(), vec![1]);
tx.send(2).await.unwrap();
tx.send(3).await.unwrap();
tx.send(4).await.unwrap();
- assert_eq!(s.next().await.unwrap(), vec![2,3]);
+ assert_eq!(s.next().await.unwrap(), vec![2, 3]);
assert_eq!(s.next().await.unwrap(), vec![4]);
});
}
diff --git a/tests/stream_abortable.rs b/tests/stream_abortable.rs
new file mode 100644
index 0000000..2339dd0
--- /dev/null
+++ b/tests/stream_abortable.rs
@@ -0,0 +1,46 @@
+use futures::channel::mpsc;
+use futures::executor::block_on;
+use futures::stream::{abortable, Stream, StreamExt};
+use futures::task::{Context, Poll};
+use futures::SinkExt;
+use futures_test::task::new_count_waker;
+use std::pin::Pin;
+
+#[test]
+fn abortable_works() {
+ let (_tx, a_rx) = mpsc::channel::<()>(1);
+ let (mut abortable_rx, abort_handle) = abortable(a_rx);
+
+ abort_handle.abort();
+ assert!(abortable_rx.is_aborted());
+ assert_eq!(None, block_on(abortable_rx.next()));
+}
+
+#[test]
+fn abortable_awakens() {
+ let (_tx, a_rx) = mpsc::channel::<()>(1);
+ let (mut abortable_rx, abort_handle) = abortable(a_rx);
+
+ let (waker, counter) = new_count_waker();
+ let mut cx = Context::from_waker(&waker);
+
+ assert_eq!(counter, 0);
+ assert_eq!(Poll::Pending, Pin::new(&mut abortable_rx).poll_next(&mut cx));
+ assert_eq!(counter, 0);
+
+ abort_handle.abort();
+ assert_eq!(counter, 1);
+ assert!(abortable_rx.is_aborted());
+ assert_eq!(Poll::Ready(None), Pin::new(&mut abortable_rx).poll_next(&mut cx));
+}
+
+#[test]
+fn abortable_resolves() {
+ let (mut tx, a_rx) = mpsc::channel::<()>(1);
+ let (mut abortable_rx, _abort_handle) = abortable(a_rx);
+
+ block_on(tx.send(())).unwrap();
+
+ assert!(!abortable_rx.is_aborted());
+ assert_eq!(Some(()), block_on(abortable_rx.next()));
+}
diff --git a/tests/buffer_unordered.rs b/tests/stream_buffer_unordered.rs
index 5c8b8bf..9a2ee17 100644
--- a/tests/buffer_unordered.rs
+++ b/tests/stream_buffer_unordered.rs
@@ -1,13 +1,13 @@
+use futures::channel::{mpsc, oneshot};
+use futures::executor::{block_on, block_on_stream};
+use futures::sink::SinkExt;
+use futures::stream::StreamExt;
+use std::sync::mpsc as std_mpsc;
+use std::thread;
+
#[test]
#[ignore] // FIXME: https://github.com/rust-lang/futures-rs/issues/1790
fn works() {
- use futures::channel::{oneshot, mpsc};
- use futures::executor::{block_on, block_on_stream};
- use futures::sink::SinkExt;
- use futures::stream::StreamExt;
- use std::sync::mpsc as std_mpsc;
- use std::thread;
-
const N: usize = 4;
let (mut tx, rx) = mpsc::channel(1);
diff --git a/tests/stream_catch_unwind.rs b/tests/stream_catch_unwind.rs
index 272558c..8b23a0a 100644
--- a/tests/stream_catch_unwind.rs
+++ b/tests/stream_catch_unwind.rs
@@ -1,8 +1,8 @@
+use futures::executor::block_on_stream;
+use futures::stream::{self, StreamExt};
+
#[test]
fn panic_in_the_middle_of_the_stream() {
- use futures::executor::block_on_stream;
- use futures::stream::{self, StreamExt};
-
let stream = stream::iter(vec![Some(10), None, Some(11)]);
// panic on second element
@@ -16,9 +16,6 @@ fn panic_in_the_middle_of_the_stream() {
#[test]
fn no_panic() {
- use futures::executor::block_on_stream;
- use futures::stream::{self, StreamExt};
-
let stream = stream::iter(vec![10, 11, 12]);
let mut iter = block_on_stream(stream.catch_unwind());
diff --git a/tests/futures_ordered.rs b/tests/stream_futures_ordered.rs
index 7f21c82..84e0bcc 100644
--- a/tests/futures_ordered.rs
+++ b/tests/stream_futures_ordered.rs
@@ -1,10 +1,12 @@
+use futures::channel::oneshot;
+use futures::executor::{block_on, block_on_stream};
+use futures::future::{self, join, Future, FutureExt, TryFutureExt};
+use futures::stream::{FuturesOrdered, StreamExt};
+use futures_test::task::noop_context;
+use std::any::Any;
+
#[test]
fn works_1() {
- use futures::channel::oneshot;
- use futures::executor::block_on_stream;
- use futures::stream::{StreamExt, FuturesOrdered};
- use futures_test::task::noop_context;
-
let (a_tx, a_rx) = oneshot::channel::<i32>();
let (b_tx, b_rx) = oneshot::channel::<i32>();
let (c_tx, c_rx) = oneshot::channel::<i32>();
@@ -24,21 +26,16 @@ fn works_1() {
assert_eq!(None, iter.next());
}
+#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038
#[test]
fn works_2() {
- use futures::channel::oneshot;
- use futures::future::{join, FutureExt};
- use futures::stream::{StreamExt, FuturesOrdered};
- use futures_test::task::noop_context;
-
let (a_tx, a_rx) = oneshot::channel::<i32>();
let (b_tx, b_rx) = oneshot::channel::<i32>();
let (c_tx, c_rx) = oneshot::channel::<i32>();
- let mut stream = vec![
- a_rx.boxed(),
- join(b_rx, c_rx).map(|(a, b)| Ok(a? + b?)).boxed(),
- ].into_iter().collect::<FuturesOrdered<_>>();
+ let mut stream = vec![a_rx.boxed(), join(b_rx, c_rx).map(|(a, b)| Ok(a? + b?)).boxed()]
+ .into_iter()
+ .collect::<FuturesOrdered<_>>();
let mut cx = noop_context();
a_tx.send(33).unwrap();
@@ -51,37 +48,30 @@ fn works_2() {
#[test]
fn from_iterator() {
- use futures::executor::block_on;
- use futures::future;
- use futures::stream::{StreamExt, FuturesOrdered};
-
- let stream = vec![
- future::ready::<i32>(1),
- future::ready::<i32>(2),
- future::ready::<i32>(3)
- ].into_iter().collect::<FuturesOrdered<_>>();
+ let stream = vec![future::ready::<i32>(1), future::ready::<i32>(2), future::ready::<i32>(3)]
+ .into_iter()
+ .collect::<FuturesOrdered<_>>();
assert_eq!(stream.len(), 3);
- assert_eq!(block_on(stream.collect::<Vec<_>>()), vec![1,2,3]);
+ assert_eq!(block_on(stream.collect::<Vec<_>>()), vec![1, 2, 3]);
}
+#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038
#[test]
fn queue_never_unblocked() {
- use futures::channel::oneshot;
- use futures::future::{self, Future, TryFutureExt};
- use futures::stream::{StreamExt, FuturesOrdered};
- use futures_test::task::noop_context;
- use std::any::Any;
-
let (_a_tx, a_rx) = oneshot::channel::<Box<dyn Any + Send>>();
let (b_tx, b_rx) = oneshot::channel::<Box<dyn Any + Send>>();
let (c_tx, c_rx) = oneshot::channel::<Box<dyn Any + Send>>();
let mut stream = vec![
Box::new(a_rx) as Box<dyn Future<Output = _> + Unpin>,
- Box::new(future::try_select(b_rx, c_rx)
- .map_err(|e| e.factor_first().0)
- .and_then(|e| future::ok(Box::new(e) as Box<dyn Any + Send>))) as _,
- ].into_iter().collect::<FuturesOrdered<_>>();
+ Box::new(
+ future::try_select(b_rx, c_rx)
+ .map_err(|e| e.factor_first().0)
+ .and_then(|e| future::ok(Box::new(e) as Box<dyn Any + Send>)),
+ ) as _,
+ ]
+ .into_iter()
+ .collect::<FuturesOrdered<_>>();
let cx = &mut noop_context();
for _ in 0..10 {
diff --git a/tests/futures_unordered.rs b/tests/stream_futures_unordered.rs
index ac0817e..f62f733 100644
--- a/tests/futures_unordered.rs
+++ b/tests/stream_futures_unordered.rs
@@ -1,17 +1,17 @@
-use futures::future::Future;
-use futures::stream::{FuturesUnordered, StreamExt};
+use futures::channel::oneshot;
+use futures::executor::{block_on, block_on_stream};
+use futures::future::{self, join, Future, FutureExt};
+use futures::stream::{FusedStream, FuturesUnordered, StreamExt};
use futures::task::{Context, Poll};
+use futures_test::future::FutureTestExt;
use futures_test::task::noop_context;
+use futures_test::{assert_stream_done, assert_stream_next, assert_stream_pending};
use std::iter::FromIterator;
use std::pin::Pin;
+use std::sync::atomic::{AtomicBool, Ordering};
#[test]
fn is_terminated() {
- use futures::future;
- use futures::stream::{FusedStream, FuturesUnordered, StreamExt};
- use futures::task::Poll;
- use futures_test::task::noop_context;
-
let mut cx = noop_context();
let mut tasks = FuturesUnordered::new();
@@ -39,19 +39,12 @@ fn is_terminated() {
#[test]
fn works_1() {
- use futures::channel::oneshot;
- use futures::executor::block_on_stream;
- use futures::stream::FuturesUnordered;
-
let (a_tx, a_rx) = oneshot::channel::<i32>();
let (b_tx, b_rx) = oneshot::channel::<i32>();
let (c_tx, c_rx) = oneshot::channel::<i32>();
- let mut iter = block_on_stream(
- vec![a_rx, b_rx, c_rx]
- .into_iter()
- .collect::<FuturesUnordered<_>>(),
- );
+ let mut iter =
+ block_on_stream(vec![a_rx, b_rx, c_rx].into_iter().collect::<FuturesUnordered<_>>());
b_tx.send(99).unwrap();
assert_eq!(Some(Ok(99)), iter.next());
@@ -63,24 +56,16 @@ fn works_1() {
assert_eq!(None, iter.next());
}
+#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038
#[test]
fn works_2() {
- use futures::channel::oneshot;
- use futures::future::{join, FutureExt};
- use futures::stream::{FuturesUnordered, StreamExt};
- use futures::task::Poll;
- use futures_test::task::noop_context;
-
let (a_tx, a_rx) = oneshot::channel::<i32>();
let (b_tx, b_rx) = oneshot::channel::<i32>();
let (c_tx, c_rx) = oneshot::channel::<i32>();
- let mut stream = vec![
- a_rx.boxed(),
- join(b_rx, c_rx).map(|(a, b)| Ok(a? + b?)).boxed(),
- ]
- .into_iter()
- .collect::<FuturesUnordered<_>>();
+ let mut stream = vec![a_rx.boxed(), join(b_rx, c_rx).map(|(a, b)| Ok(a? + b?)).boxed()]
+ .into_iter()
+ .collect::<FuturesUnordered<_>>();
a_tx.send(9).unwrap();
b_tx.send(10).unwrap();
@@ -94,29 +79,16 @@ fn works_2() {
#[test]
fn from_iterator() {
- use futures::executor::block_on;
- use futures::future;
- use futures::stream::{FuturesUnordered, StreamExt};
-
- let stream = vec![
- future::ready::<i32>(1),
- future::ready::<i32>(2),
- future::ready::<i32>(3),
- ]
- .into_iter()
- .collect::<FuturesUnordered<_>>();
+ let stream = vec![future::ready::<i32>(1), future::ready::<i32>(2), future::ready::<i32>(3)]
+ .into_iter()
+ .collect::<FuturesUnordered<_>>();
assert_eq!(stream.len(), 3);
assert_eq!(block_on(stream.collect::<Vec<_>>()), vec![1, 2, 3]);
}
+#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038
#[test]
fn finished_future() {
- use std::marker::Unpin;
- use futures::channel::oneshot;
- use futures::future::{self, Future, FutureExt};
- use futures::stream::{FuturesUnordered, StreamExt};
- use futures_test::task::noop_context;
-
let (_a_tx, a_rx) = oneshot::channel::<i32>();
let (b_tx, b_rx) = oneshot::channel::<i32>();
let (c_tx, c_rx) = oneshot::channel::<i32>();
@@ -142,17 +114,11 @@ fn finished_future() {
#[test]
fn iter_mut_cancel() {
- use futures::channel::oneshot;
- use futures::executor::block_on_stream;
- use futures::stream::FuturesUnordered;
-
let (a_tx, a_rx) = oneshot::channel::<i32>();
let (b_tx, b_rx) = oneshot::channel::<i32>();
let (c_tx, c_rx) = oneshot::channel::<i32>();
- let mut stream = vec![a_rx, b_rx, c_rx]
- .into_iter()
- .collect::<FuturesUnordered<_>>();
+ let mut stream = vec![a_rx, b_rx, c_rx].into_iter().collect::<FuturesUnordered<_>>();
for rx in stream.iter_mut() {
rx.close();
@@ -172,16 +138,10 @@ fn iter_mut_cancel() {
#[test]
fn iter_mut_len() {
- use futures::future;
- use futures::stream::FuturesUnordered;
-
- let mut stream = vec![
- future::pending::<()>(),
- future::pending::<()>(),
- future::pending::<()>(),
- ]
- .into_iter()
- .collect::<FuturesUnordered<_>>();
+ let mut stream =
+ vec![future::pending::<()>(), future::pending::<()>(), future::pending::<()>()]
+ .into_iter()
+ .collect::<FuturesUnordered<_>>();
let mut iter_mut = stream.iter_mut();
assert_eq!(iter_mut.len(), 3);
@@ -196,15 +156,6 @@ fn iter_mut_len() {
#[test]
fn iter_cancel() {
- use std::marker::Unpin;
- use std::pin::Pin;
- use std::sync::atomic::{AtomicBool, Ordering};
-
- use futures::executor::block_on_stream;
- use futures::future::{self, Future, FutureExt};
- use futures::stream::FuturesUnordered;
- use futures::task::{Context, Poll};
-
struct AtomicCancel<F> {
future: F,
cancel: AtomicBool,
@@ -250,16 +201,9 @@ fn iter_cancel() {
#[test]
fn iter_len() {
- use futures::future;
- use futures::stream::FuturesUnordered;
-
- let stream = vec![
- future::pending::<()>(),
- future::pending::<()>(),
- future::pending::<()>(),
- ]
- .into_iter()
- .collect::<FuturesUnordered<_>>();
+ let stream = vec![future::pending::<()>(), future::pending::<()>(), future::pending::<()>()]
+ .into_iter()
+ .collect::<FuturesUnordered<_>>();
let mut iter = stream.iter();
assert_eq!(iter.len(), 3);
@@ -273,12 +217,52 @@ fn iter_len() {
}
#[test]
-fn futures_not_moved_after_poll() {
- use futures::future;
- use futures::stream::FuturesUnordered;
- use futures_test::future::FutureTestExt;
- use futures_test::{assert_stream_done, assert_stream_next, assert_stream_pending};
+fn into_iter_cancel() {
+ let (a_tx, a_rx) = oneshot::channel::<i32>();
+ let (b_tx, b_rx) = oneshot::channel::<i32>();
+ let (c_tx, c_rx) = oneshot::channel::<i32>();
+
+ let stream = vec![a_rx, b_rx, c_rx].into_iter().collect::<FuturesUnordered<_>>();
+
+ let stream = stream
+ .into_iter()
+ .map(|mut rx| {
+ rx.close();
+ rx
+ })
+ .collect::<FuturesUnordered<_>>();
+
+ let mut iter = block_on_stream(stream);
+
+ assert!(a_tx.is_canceled());
+ assert!(b_tx.is_canceled());
+ assert!(c_tx.is_canceled());
+
+ assert_eq!(iter.next(), Some(Err(futures::channel::oneshot::Canceled)));
+ assert_eq!(iter.next(), Some(Err(futures::channel::oneshot::Canceled)));
+ assert_eq!(iter.next(), Some(Err(futures::channel::oneshot::Canceled)));
+ assert_eq!(iter.next(), None);
+}
+
+#[test]
+fn into_iter_len() {
+ let stream = vec![future::pending::<()>(), future::pending::<()>(), future::pending::<()>()]
+ .into_iter()
+ .collect::<FuturesUnordered<_>>();
+ let mut into_iter = stream.into_iter();
+ assert_eq!(into_iter.len(), 3);
+ assert!(into_iter.next().is_some());
+ assert_eq!(into_iter.len(), 2);
+ assert!(into_iter.next().is_some());
+ assert_eq!(into_iter.len(), 1);
+ assert!(into_iter.next().is_some());
+ assert_eq!(into_iter.len(), 0);
+ assert!(into_iter.next().is_none());
+}
+
+#[test]
+fn futures_not_moved_after_poll() {
// Future that will be ready after being polled twice,
// asserting that it does not move.
let fut = future::ready(()).pending_once().assert_unmoved();
@@ -292,11 +276,6 @@ fn futures_not_moved_after_poll() {
#[test]
fn len_valid_during_out_of_order_completion() {
- use futures::channel::oneshot;
- use futures::stream::{FuturesUnordered, StreamExt};
- use futures::task::Poll;
- use futures_test::task::noop_context;
-
// Complete futures out-of-order and add new futures afterwards to ensure
// length values remain correct.
let (a_tx, a_rx) = oneshot::channel::<i32>();
@@ -368,3 +347,25 @@ fn polled_only_once_at_most_per_iteration() {
let mut tasks = FuturesUnordered::<F>::new();
assert_eq!(Poll::Ready(None), tasks.poll_next_unpin(cx));
}
+
+#[test]
+fn clear() {
+ let mut tasks = FuturesUnordered::from_iter(vec![future::ready(1), future::ready(2)]);
+
+ assert_eq!(block_on(tasks.next()), Some(1));
+ assert!(!tasks.is_empty());
+
+ tasks.clear();
+ assert!(tasks.is_empty());
+
+ tasks.push(future::ready(3));
+ assert!(!tasks.is_empty());
+
+ tasks.clear();
+ assert!(tasks.is_empty());
+
+ assert_eq!(block_on(tasks.next()), None);
+ assert!(tasks.is_terminated());
+ tasks.clear();
+ assert!(!tasks.is_terminated());
+}
diff --git a/tests/stream_into_async_read.rs b/tests/stream_into_async_read.rs
index 222c985..60188d3 100644
--- a/tests/stream_into_async_read.rs
+++ b/tests/stream_into_async_read.rs
@@ -1,31 +1,51 @@
-#[test]
-fn test_into_async_read() {
- use core::pin::Pin;
- use futures::io::AsyncRead;
- use futures::stream::{self, TryStreamExt};
- use futures::task::Poll;
- use futures_test::{task::noop_context, stream::StreamTestExt};
-
- macro_rules! assert_read {
- ($reader:expr, $buf:expr, $item:expr) => {
- let mut cx = noop_context();
- loop {
- match Pin::new(&mut $reader).poll_read(&mut cx, $buf) {
- Poll::Ready(Ok(x)) => {
- assert_eq!(x, $item);
- break;
- }
- Poll::Ready(Err(err)) => {
- panic!("assertion failed: expected value but got {}", err);
- }
- Poll::Pending => {
- continue;
- }
+use core::pin::Pin;
+use futures::io::{AsyncBufRead, AsyncRead};
+use futures::stream::{self, TryStreamExt};
+use futures::task::Poll;
+use futures_test::{stream::StreamTestExt, task::noop_context};
+
+macro_rules! assert_read {
+ ($reader:expr, $buf:expr, $item:expr) => {
+ let mut cx = noop_context();
+ loop {
+ match Pin::new(&mut $reader).poll_read(&mut cx, $buf) {
+ Poll::Ready(Ok(x)) => {
+ assert_eq!(x, $item);
+ break;
+ }
+ Poll::Ready(Err(err)) => {
+ panic!("assertion failed: expected value but got {}", err);
+ }
+ Poll::Pending => {
+ continue;
+ }
+ }
+ }
+ };
+}
+
+macro_rules! assert_fill_buf {
+ ($reader:expr, $buf:expr) => {
+ let mut cx = noop_context();
+ loop {
+ match Pin::new(&mut $reader).poll_fill_buf(&mut cx) {
+ Poll::Ready(Ok(x)) => {
+ assert_eq!(x, $buf);
+ break;
+ }
+ Poll::Ready(Err(err)) => {
+ panic!("assertion failed: expected value but got {}", err);
+ }
+ Poll::Pending => {
+ continue;
}
}
- };
- }
+ }
+ };
+}
+#[test]
+fn test_into_async_read() {
let stream = stream::iter((1..=3).flat_map(|_| vec![Ok(vec![]), Ok(vec![1, 2, 3, 4, 5])]));
let mut reader = stream.interleave_pending().into_async_read();
let mut buf = vec![0; 3];
@@ -53,32 +73,6 @@ fn test_into_async_read() {
#[test]
fn test_into_async_bufread() {
- use core::pin::Pin;
- use futures::io::AsyncBufRead;
- use futures::stream::{self, TryStreamExt};
- use futures::task::Poll;
- use futures_test::{task::noop_context, stream::StreamTestExt};
-
- macro_rules! assert_fill_buf {
- ($reader:expr, $buf:expr) => {
- let mut cx = noop_context();
- loop {
- match Pin::new(&mut $reader).poll_fill_buf(&mut cx) {
- Poll::Ready(Ok(x)) => {
- assert_eq!(x, $buf);
- break;
- }
- Poll::Ready(Err(err)) => {
- panic!("assertion failed: expected value but got {}", err);
- }
- Poll::Pending => {
- continue;
- }
- }
- }
- };
- }
-
let stream = stream::iter((1..=2).flat_map(|_| vec![Ok(vec![]), Ok(vec![1, 2, 3, 4, 5])]));
let mut reader = stream.interleave_pending().into_async_read();
diff --git a/tests/stream_peekable.rs b/tests/stream_peekable.rs
index 66a7385..153fcc2 100644
--- a/tests/stream_peekable.rs
+++ b/tests/stream_peekable.rs
@@ -1,13 +1,58 @@
+use futures::executor::block_on;
+use futures::pin_mut;
+use futures::stream::{self, Peekable, StreamExt};
+
#[test]
fn peekable() {
- use futures::executor::block_on;
- use futures::pin_mut;
- use futures::stream::{self, Peekable, StreamExt};
-
block_on(async {
let peekable: Peekable<_> = stream::iter(vec![1u8, 2, 3]).peekable();
pin_mut!(peekable);
assert_eq!(peekable.as_mut().peek().await, Some(&1u8));
assert_eq!(peekable.collect::<Vec<u8>>().await, vec![1, 2, 3]);
+
+ let s = stream::once(async { 1 }).peekable();
+ pin_mut!(s);
+ assert_eq!(s.as_mut().peek().await, Some(&1u8));
+ assert_eq!(s.collect::<Vec<u8>>().await, vec![1]);
+ });
+}
+
+#[test]
+fn peekable_mut() {
+ block_on(async {
+ let s = stream::iter(vec![1u8, 2, 3]).peekable();
+ pin_mut!(s);
+ if let Some(p) = s.as_mut().peek_mut().await {
+ if *p == 1 {
+ *p = 5;
+ }
+ }
+ assert_eq!(s.collect::<Vec<_>>().await, vec![5, 2, 3]);
+ });
+}
+
+#[test]
+fn peekable_next_if_eq() {
+ block_on(async {
+ // first, try on references
+ let s = stream::iter(vec!["Heart", "of", "Gold"]).peekable();
+ pin_mut!(s);
+ // try before `peek()`
+ assert_eq!(s.as_mut().next_if_eq(&"trillian").await, None);
+ assert_eq!(s.as_mut().next_if_eq(&"Heart").await, Some("Heart"));
+ // try after peek()
+ assert_eq!(s.as_mut().peek().await, Some(&"of"));
+ assert_eq!(s.as_mut().next_if_eq(&"of").await, Some("of"));
+ assert_eq!(s.as_mut().next_if_eq(&"zaphod").await, None);
+ // make sure `next()` still behaves
+ assert_eq!(s.next().await, Some("Gold"));
+
+ // make sure comparison works for owned values
+ let s = stream::iter(vec![String::from("Ludicrous"), "speed".into()]).peekable();
+ pin_mut!(s);
+ // make sure basic functionality works
+ assert_eq!(s.as_mut().next_if_eq("Ludicrous").await, Some("Ludicrous".into()));
+ assert_eq!(s.as_mut().next_if_eq("speed").await, Some("speed".into()));
+ assert_eq!(s.as_mut().next_if_eq("").await, None);
});
}
diff --git a/tests/stream_select_all.rs b/tests/stream_select_all.rs
index 6178412..4ae0735 100644
--- a/tests/stream_select_all.rs
+++ b/tests/stream_select_all.rs
@@ -1,10 +1,12 @@
+use futures::channel::mpsc;
+use futures::executor::{block_on, block_on_stream};
+use futures::future::{self, FutureExt};
+use futures::stream::{self, select_all, FusedStream, SelectAll, StreamExt};
+use futures::task::Poll;
+use futures_test::task::noop_context;
+
#[test]
fn is_terminated() {
- use futures::future::{self, FutureExt};
- use futures::stream::{FusedStream, SelectAll, StreamExt};
- use futures::task::Poll;
- use futures_test::task::noop_context;
-
let mut cx = noop_context();
let mut tasks = SelectAll::new();
@@ -30,9 +32,6 @@ fn is_terminated() {
#[test]
fn issue_1626() {
- use futures::executor::block_on_stream;
- use futures::stream;
-
let a = stream::iter(0..=2);
let b = stream::iter(10..=14);
@@ -51,10 +50,6 @@ fn issue_1626() {
#[test]
fn works_1() {
- use futures::channel::mpsc;
- use futures::executor::block_on_stream;
- use futures::stream::select_all;
-
let (a_tx, a_rx) = mpsc::unbounded::<u32>();
let (b_tx, b_rx) = mpsc::unbounded::<u32>();
let (c_tx, c_rx) = mpsc::unbounded::<u32>();
@@ -81,3 +76,122 @@ fn works_1() {
drop((a_tx, b_tx, c_tx));
assert_eq!(None, stream.next());
}
+
+#[test]
+fn clear() {
+ let mut tasks =
+ select_all(vec![stream::iter(vec![1].into_iter()), stream::iter(vec![2].into_iter())]);
+
+ assert_eq!(block_on(tasks.next()), Some(1));
+ assert!(!tasks.is_empty());
+
+ tasks.clear();
+ assert!(tasks.is_empty());
+
+ tasks.push(stream::iter(vec![3].into_iter()));
+ assert!(!tasks.is_empty());
+
+ tasks.clear();
+ assert!(tasks.is_empty());
+
+ assert_eq!(block_on(tasks.next()), None);
+ assert!(tasks.is_terminated());
+ tasks.clear();
+ assert!(!tasks.is_terminated());
+}
+
+#[test]
+fn iter_mut() {
+ let mut stream =
+ vec![stream::pending::<()>(), stream::pending::<()>(), stream::pending::<()>()]
+ .into_iter()
+ .collect::<SelectAll<_>>();
+
+ let mut iter = stream.iter_mut();
+ assert_eq!(iter.len(), 3);
+ assert!(iter.next().is_some());
+ assert_eq!(iter.len(), 2);
+ assert!(iter.next().is_some());
+ assert_eq!(iter.len(), 1);
+ assert!(iter.next().is_some());
+ assert_eq!(iter.len(), 0);
+ assert!(iter.next().is_none());
+
+ let mut stream = vec![stream::iter(vec![]), stream::iter(vec![1]), stream::iter(vec![2])]
+ .into_iter()
+ .collect::<SelectAll<_>>();
+
+ assert_eq!(stream.len(), 3);
+ assert_eq!(block_on(stream.next()), Some(1));
+ assert_eq!(stream.len(), 2);
+ let mut iter = stream.iter_mut();
+ assert_eq!(iter.len(), 2);
+ assert!(iter.next().is_some());
+ assert_eq!(iter.len(), 1);
+ assert!(iter.next().is_some());
+ assert_eq!(iter.len(), 0);
+ assert!(iter.next().is_none());
+
+ assert_eq!(block_on(stream.next()), Some(2));
+ assert_eq!(stream.len(), 2);
+ assert_eq!(block_on(stream.next()), None);
+ let mut iter = stream.iter_mut();
+ assert_eq!(iter.len(), 0);
+ assert!(iter.next().is_none());
+}
+
+#[test]
+fn iter() {
+ let stream = vec![stream::pending::<()>(), stream::pending::<()>(), stream::pending::<()>()]
+ .into_iter()
+ .collect::<SelectAll<_>>();
+
+ let mut iter = stream.iter();
+ assert_eq!(iter.len(), 3);
+ assert!(iter.next().is_some());
+ assert_eq!(iter.len(), 2);
+ assert!(iter.next().is_some());
+ assert_eq!(iter.len(), 1);
+ assert!(iter.next().is_some());
+ assert_eq!(iter.len(), 0);
+ assert!(iter.next().is_none());
+
+ let mut stream = vec![stream::iter(vec![]), stream::iter(vec![1]), stream::iter(vec![2])]
+ .into_iter()
+ .collect::<SelectAll<_>>();
+
+ assert_eq!(stream.len(), 3);
+ assert_eq!(block_on(stream.next()), Some(1));
+ assert_eq!(stream.len(), 2);
+ let mut iter = stream.iter();
+ assert_eq!(iter.len(), 2);
+ assert!(iter.next().is_some());
+ assert_eq!(iter.len(), 1);
+ assert!(iter.next().is_some());
+ assert_eq!(iter.len(), 0);
+ assert!(iter.next().is_none());
+
+ assert_eq!(block_on(stream.next()), Some(2));
+ assert_eq!(stream.len(), 2);
+ assert_eq!(block_on(stream.next()), None);
+ let mut iter = stream.iter();
+ assert_eq!(iter.len(), 0);
+ assert!(iter.next().is_none());
+}
+
+#[test]
+fn into_iter() {
+ let stream = vec![stream::pending::<()>(), stream::pending::<()>(), stream::pending::<()>()]
+ .into_iter()
+ .collect::<SelectAll<_>>();
+
+ let mut iter = stream.into_iter();
+ assert_eq!(iter.len(), 3);
+ assert!(iter.next().is_some());
+ assert_eq!(iter.len(), 2);
+ assert!(iter.next().is_some());
+ assert_eq!(iter.len(), 1);
+ assert!(iter.next().is_some());
+ assert_eq!(iter.len(), 0);
+ assert!(iter.next().is_none());
+}
diff --git a/tests/stream_select_next_some.rs b/tests/stream_select_next_some.rs
index bec5262..8252ad7 100644
--- a/tests/stream_select_next_some.rs
+++ b/tests/stream_select_next_some.rs
@@ -1,11 +1,13 @@
+use futures::executor::block_on;
+use futures::future::{self, FusedFuture, FutureExt};
+use futures::select;
+use futures::stream::{FuturesUnordered, StreamExt};
+use futures::task::{Context, Poll};
+use futures_test::future::FutureTestExt;
+use futures_test::task::new_count_waker;
+
#[test]
fn is_terminated() {
- use futures::future;
- use futures::future::{FusedFuture, FutureExt};
- use futures::stream::{FuturesUnordered, StreamExt};
- use futures::task::{Context, Poll};
- use futures_test::task::new_count_waker;
-
let (waker, counter) = new_count_waker();
let mut cx = Context::from_waker(&waker);
@@ -30,15 +32,11 @@ fn is_terminated() {
#[test]
fn select() {
- use futures::{future, select};
- use futures::stream::{FuturesUnordered, StreamExt};
- use futures_test::future::FutureTestExt;
-
// Checks that even though `async_tasks` will yield a `None` and return
// `is_terminated() == true` during the first poll, it manages to toggle
// back to having items after a future is pushed into it during the second
// poll (after pending_once completes).
- futures::executor::block_on(async {
+ block_on(async {
let mut fut = future::ready(1).pending_once();
let mut async_tasks = FuturesUnordered::new();
let mut total = 0;
@@ -61,17 +59,13 @@ fn select() {
// Check that `select!` macro does not fail when importing from `futures_util`.
#[test]
fn futures_util_select() {
- use futures::future;
- use futures::stream::{FuturesUnordered, StreamExt};
- use futures_test::future::FutureTestExt;
-
use futures_util::select;
// Checks that even though `async_tasks` will yield a `None` and return
// `is_terminated() == true` during the first poll, it manages to toggle
// back to having items after a future is pushed into it during the second
// poll (after pending_once completes).
- futures::executor::block_on(async {
+ block_on(async {
let mut fut = future::ready(1).pending_once();
let mut async_tasks = FuturesUnordered::new();
let mut total = 0;
diff --git a/tests/stream_split.rs b/tests/stream_split.rs
new file mode 100644
index 0000000..694c151
--- /dev/null
+++ b/tests/stream_split.rs
@@ -0,0 +1,57 @@
+use futures::executor::block_on;
+use futures::sink::{Sink, SinkExt};
+use futures::stream::{self, Stream, StreamExt};
+use futures::task::{Context, Poll};
+use pin_project::pin_project;
+use std::pin::Pin;
+
+#[test]
+fn test_split() {
+ #[pin_project]
+ struct Join<T, U> {
+ #[pin]
+ stream: T,
+ #[pin]
+ sink: U,
+ }
+
+ impl<T: Stream, U> Stream for Join<T, U> {
+ type Item = T::Item;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T::Item>> {
+ self.project().stream.poll_next(cx)
+ }
+ }
+
+ impl<T, U: Sink<Item>, Item> Sink<Item> for Join<T, U> {
+ type Error = U::Error;
+
+ fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ self.project().sink.poll_ready(cx)
+ }
+
+ fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
+ self.project().sink.start_send(item)
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ self.project().sink.poll_flush(cx)
+ }
+
+ fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ self.project().sink.poll_close(cx)
+ }
+ }
+
+ let mut dest: Vec<i32> = Vec::new();
+ {
+ let join = Join { stream: stream::iter(vec![10, 20, 30]), sink: &mut dest };
+
+ let (sink, stream) = join.split();
+ let join = sink.reunite(stream).expect("test_split: reunite error");
+ let (mut sink, stream) = join.split();
+ let mut stream = stream.map(Ok);
+ block_on(sink.send_all(&mut stream)).unwrap();
+ }
+ assert_eq!(dest, vec![10, 20, 30]);
+}
diff --git a/tests/try_stream.rs b/tests/stream_try_stream.rs
index 194e74d..d83fc54 100644
--- a/tests/try_stream.rs
+++ b/tests/stream_try_stream.rs
@@ -1,3 +1,5 @@
+#![cfg(not(miri))] // https://github.com/rust-lang/miri/issues/1038
+
use futures::{
stream::{self, StreamExt, TryStreamExt},
task::Poll,
diff --git a/tests/unfold.rs b/tests/stream_unfold.rs
index 95722cf..16b1081 100644
--- a/tests/unfold.rs
+++ b/tests/stream_unfold.rs
@@ -1,10 +1,7 @@
use futures::future;
use futures::stream;
-
use futures_test::future::FutureTestExt;
-use futures_test::{
- assert_stream_done, assert_stream_next, assert_stream_pending,
-};
+use futures_test::{assert_stream_done, assert_stream_next, assert_stream_pending};
#[test]
fn unfold1() {
diff --git a/tests/task_arc_wake.rs b/tests/task_arc_wake.rs
new file mode 100644
index 0000000..aedc15b
--- /dev/null
+++ b/tests/task_arc_wake.rs
@@ -0,0 +1,79 @@
+use futures::task::{self, ArcWake, Waker};
+use std::panic;
+use std::sync::{Arc, Mutex};
+
+struct CountingWaker {
+ nr_wake: Mutex<i32>,
+}
+
+impl CountingWaker {
+ fn new() -> Self {
+ Self { nr_wake: Mutex::new(0) }
+ }
+
+ fn wakes(&self) -> i32 {
+ *self.nr_wake.lock().unwrap()
+ }
+}
+
+impl ArcWake for CountingWaker {
+ fn wake_by_ref(arc_self: &Arc<Self>) {
+ let mut lock = arc_self.nr_wake.lock().unwrap();
+ *lock += 1;
+ }
+}
+
+#[test]
+fn create_from_arc() {
+ let some_w = Arc::new(CountingWaker::new());
+
+ let w1: Waker = task::waker(some_w.clone());
+ assert_eq!(2, Arc::strong_count(&some_w));
+ w1.wake_by_ref();
+ assert_eq!(1, some_w.wakes());
+
+ let w2 = w1.clone();
+ assert_eq!(3, Arc::strong_count(&some_w));
+
+ w2.wake_by_ref();
+ assert_eq!(2, some_w.wakes());
+
+ drop(w2);
+ assert_eq!(2, Arc::strong_count(&some_w));
+ drop(w1);
+ assert_eq!(1, Arc::strong_count(&some_w));
+}
+
+#[test]
+fn ref_wake_same() {
+ let some_w = Arc::new(CountingWaker::new());
+
+ let w1: Waker = task::waker(some_w.clone());
+ let w2 = task::waker_ref(&some_w);
+ let w3 = w2.clone();
+
+ assert!(w1.will_wake(&w2));
+ assert!(w2.will_wake(&w3));
+}
+
+#[test]
+fn proper_refcount_on_wake_panic() {
+ struct PanicWaker;
+
+ impl ArcWake for PanicWaker {
+ fn wake_by_ref(_arc_self: &Arc<Self>) {
+ panic!("WAKE UP");
+ }
+ }
+
+ let some_w = Arc::new(PanicWaker);
+
+ let w1: Waker = task::waker(some_w.clone());
+ assert_eq!(
+ "WAKE UP",
+ *panic::catch_unwind(|| w1.wake_by_ref()).unwrap_err().downcast::<&str>().unwrap()
+ );
+ assert_eq!(2, Arc::strong_count(&some_w)); // some_w + w1
+ drop(w1);
+ assert_eq!(1, Arc::strong_count(&some_w)); // some_w
+}
diff --git a/tests/atomic_waker.rs b/tests/task_atomic_waker.rs
index bf15d0f..2d1612a 100644
--- a/tests/atomic_waker.rs
+++ b/tests/task_atomic_waker.rs
@@ -1,14 +1,14 @@
+use futures::executor::block_on;
+use futures::future::poll_fn;
+use futures::task::{AtomicWaker, Poll};
+use std::sync::atomic::AtomicUsize;
+use std::sync::atomic::Ordering;
+use std::sync::Arc;
+use std::thread;
+
+#[cfg_attr(miri, ignore)] // Miri is too slow
#[test]
fn basic() {
- use std::sync::atomic::AtomicUsize;
- use std::sync::atomic::Ordering;
- use std::sync::Arc;
- use std::thread;
-
- use futures::executor::block_on;
- use futures::future::poll_fn;
- use futures::task::{AtomicWaker, Poll};
-
let atomic_waker = Arc::new(AtomicWaker::new());
let atomic_waker_copy = atomic_waker.clone();
diff --git a/tests/test_macro.rs b/tests/test_macro.rs
new file mode 100644
index 0000000..6adf51d
--- /dev/null
+++ b/tests/test_macro.rs
@@ -0,0 +1,20 @@
+#[futures_test::test]
+async fn it_works() {
+ let fut = async { true };
+ assert!(fut.await);
+
+ let fut = async { false };
+ assert!(!fut.await);
+}
+
+#[should_panic]
+#[futures_test::test]
+async fn it_is_being_run() {
+ let fut = async { false };
+ assert!(fut.await);
+}
+
+#[futures_test::test]
+async fn return_ty() -> Result<(), ()> {
+ Ok(())
+}
diff --git a/tests/try_join.rs b/tests/try_join.rs
index 6c6d084..0281ab8 100644
--- a/tests/try_join.rs
+++ b/tests/try_join.rs
@@ -1,9 +1,9 @@
#![deny(unreachable_code)]
-use futures::{try_join, executor::block_on};
+use futures::{executor::block_on, try_join};
// TODO: This abuses https://github.com/rust-lang/rust/issues/58733 in order to
-// test behaviour of the `try_join!` macro with the never type before it is
+// test behavior of the `try_join!` macro with the never type before it is
// stabilized. Once `!` is again stabilized this can be removed and replaced
// with direct use of `!` below where `Never` is used.
trait MyTrait {
@@ -14,7 +14,6 @@ impl<T> MyTrait for fn() -> T {
}
type Never = <fn() -> ! as MyTrait>::Output;
-
#[test]
fn try_join_never_error() {
block_on(async {
diff --git a/tests/try_join_all.rs b/tests/try_join_all.rs
deleted file mode 100644
index 8e579a2..0000000
--- a/tests/try_join_all.rs
+++ /dev/null
@@ -1,58 +0,0 @@
-mod util {
- use std::future::Future;
- use futures::executor::block_on;
- use std::fmt::Debug;
-
- pub fn assert_done<T, F>(actual_fut: F, expected: T)
- where
- T: PartialEq + Debug,
- F: FnOnce() -> Box<dyn Future<Output = T> + Unpin>,
- {
- let output = block_on(actual_fut());
- assert_eq!(output, expected);
- }
-}
-
-#[test]
-fn collect_collects() {
- use futures_util::future::{err, ok, try_join_all};
-
- use util::assert_done;
-
- assert_done(|| Box::new(try_join_all(vec![ok(1), ok(2)])), Ok::<_, usize>(vec![1, 2]));
- assert_done(|| Box::new(try_join_all(vec![ok(1), err(2)])), Err(2));
- assert_done(|| Box::new(try_join_all(vec![ok(1)])), Ok::<_, usize>(vec![1]));
- // REVIEW: should this be implemented?
- // assert_done(|| Box::new(try_join_all(Vec::<i32>::new())), Ok(vec![]));
-
- // TODO: needs more tests
-}
-
-#[test]
-fn try_join_all_iter_lifetime() {
- use futures_util::future::{ok, try_join_all};
- use std::future::Future;
-
- use util::assert_done;
-
- // In futures-rs version 0.1, this function would fail to typecheck due to an overly
- // conservative type parameterization of `TryJoinAll`.
- fn sizes(bufs: Vec<&[u8]>) -> Box<dyn Future<Output = Result<Vec<usize>, ()>> + Unpin> {
- let iter = bufs.into_iter().map(|b| ok::<usize, ()>(b.len()));
- Box::new(try_join_all(iter))
- }
-
- assert_done(|| sizes(vec![&[1,2,3], &[], &[0]]), Ok(vec![3_usize, 0, 1]));
-}
-
-#[test]
-fn try_join_all_from_iter() {
- use futures_util::future::{ok, TryJoinAll};
-
- use util::assert_done;
-
- assert_done(
- || Box::new(vec![ok(1), ok(2)].into_iter().collect::<TryJoinAll<_>>()),
- Ok::<_, usize>(vec![1, 2]),
- )
-}
diff --git a/tests_disabled/all.rs b/tests_disabled/all.rs
index 6c7e11c..a7a5710 100644
--- a/tests_disabled/all.rs
+++ b/tests_disabled/all.rs
@@ -1,27 +1,27 @@
-use futures::future;
-use futures::executor::block_on;
use futures::channel::oneshot::{self, Canceled};
+use futures::executor::block_on;
+use futures::future;
use std::sync::mpsc::{channel, TryRecvError};
-mod support;
-use support::*;
+// mod support;
+// use support::*;
fn unselect<T, E, A, B>(r: Result<Either<(T, B), (T, A)>, Either<(E, B), (E, A)>>) -> Result<T, E> {
match r {
- Ok(Either::Left((t, _))) |
- Ok(Either::Right((t, _))) => Ok(t),
- Err(Either::Left((e, _))) |
- Err(Either::Right((e, _))) => Err(e),
+ Ok(Either::Left((t, _))) | Ok(Either::Right((t, _))) => Ok(t),
+ Err(Either::Left((e, _))) | Err(Either::Right((e, _))) => Err(e),
}
}
#[test]
fn result_smoke() {
fn is_future_v<A, B, C>(_: C)
- where A: Send + 'static,
- B: Send + 'static,
- C: Future<Item=A, Error=B>
- {}
+ where
+ A: Send + 'static,
+ B: Send + 'static,
+ C: Future<Item = A, Error = B>,
+ {
+ }
is_future_v::<i32, u32, _>(f_ok(1).map(|a| a + 1));
is_future_v::<i32, u32, _>(f_ok(1).map_err(|a| a + 1));
@@ -64,7 +64,9 @@ fn result_smoke() {
#[test]
fn test_empty() {
- fn empty() -> Empty<i32, u32> { future::empty() }
+ fn empty() -> Empty<i32, u32> {
+ future::empty()
+ }
assert_empty(|| empty());
assert_empty(|| empty().select(empty()));
@@ -105,16 +107,22 @@ fn flatten() {
#[test]
fn smoke_oneshot() {
- assert_done(|| {
- let (c, p) = oneshot::channel();
- c.send(1).unwrap();
- p
- }, Ok(1));
- assert_done(|| {
- let (c, p) = oneshot::channel::<i32>();
- drop(c);
- p
- }, Err(Canceled));
+ assert_done(
+ || {
+ let (c, p) = oneshot::channel();
+ c.send(1).unwrap();
+ p
+ },
+ Ok(1),
+ );
+ assert_done(
+ || {
+ let (c, p) = oneshot::channel::<i32>();
+ drop(c);
+ p
+ },
+ Err(Canceled),
+ );
let mut completes = Vec::new();
assert_empty(|| {
let (a, b) = oneshot::channel::<i32>();
@@ -129,9 +137,7 @@ fn smoke_oneshot() {
let (c, p) = oneshot::channel::<i32>();
drop(c);
let (tx, rx) = channel();
- p.then(move |_| {
- tx.send(())
- }).forget();
+ p.then(move |_| tx.send(())).forget();
rx.recv().unwrap();
}
@@ -139,8 +145,14 @@ fn smoke_oneshot() {
fn select_cancels() {
let ((a, b), (c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>());
let ((btx, brx), (dtx, drx)) = (channel(), channel());
- let b = b.map(move |b| { btx.send(b).unwrap(); b });
- let d = d.map(move |d| { dtx.send(d).unwrap(); d });
+ let b = b.map(move |b| {
+ btx.send(b).unwrap();
+ b
+ });
+ let d = d.map(move |d| {
+ dtx.send(d).unwrap();
+ d
+ });
let mut f = b.select(d).then(unselect);
// assert!(f.poll(&mut Task::new()).is_pending());
@@ -156,8 +168,14 @@ fn select_cancels() {
let ((a, b), (c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>());
let ((btx, _brx), (dtx, drx)) = (channel(), channel());
- let b = b.map(move |b| { btx.send(b).unwrap(); b });
- let d = d.map(move |d| { dtx.send(d).unwrap(); d });
+ let b = b.map(move |b| {
+ btx.send(b).unwrap();
+ b
+ });
+ let d = d.map(move |d| {
+ dtx.send(d).unwrap();
+ d
+ });
let mut f = b.select(d).then(unselect);
assert!(f.poll(lw).ok().unwrap().is_pending());
@@ -173,8 +191,14 @@ fn select_cancels() {
fn join_cancels() {
let ((a, b), (c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>());
let ((btx, _brx), (dtx, drx)) = (channel(), channel());
- let b = b.map(move |b| { btx.send(b).unwrap(); b });
- let d = d.map(move |d| { dtx.send(d).unwrap(); d });
+ let b = b.map(move |b| {
+ btx.send(b).unwrap();
+ b
+ });
+ let d = d.map(move |d| {
+ dtx.send(d).unwrap();
+ d
+ });
let mut f = b.join(d);
drop(a);
@@ -185,8 +209,14 @@ fn join_cancels() {
let ((a, b), (c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>());
let ((btx, _brx), (dtx, drx)) = (channel(), channel());
- let b = b.map(move |b| { btx.send(b).unwrap(); b });
- let d = d.map(move |d| { dtx.send(d).unwrap(); d });
+ let b = b.map(move |b| {
+ btx.send(b).unwrap();
+ b
+ });
+ let d = d.map(move |d| {
+ dtx.send(d).unwrap();
+ d
+ });
let (tx, rx) = channel();
let f = b.join(d);
@@ -194,7 +224,8 @@ fn join_cancels() {
tx.send(()).unwrap();
let res: Result<(), ()> = Ok(());
res
- }).forget();
+ })
+ .forget();
assert!(rx.try_recv().is_err());
drop(a);
rx.recv().unwrap();
@@ -243,7 +274,6 @@ fn join_incomplete() {
})
}
-
#[test]
fn select2() {
assert_done(|| f_ok(2).select(empty()).then(unselect), Ok(2));
@@ -251,14 +281,15 @@ fn select2() {
assert_done(|| f_err(2).select(empty()).then(unselect), Err(2));
assert_done(|| empty().select(f_err(2)).then(unselect), Err(2));
- assert_done(|| {
- f_ok(1).select(f_ok(2))
- .map_err(|_| 0)
- .and_then(|either_tup| {
- let (a, b) = either_tup.into_inner();
- b.map(move |b| a + b)
- })
- }, Ok(3));
+ assert_done(
+ || {
+ f_ok(1).select(f_ok(2)).map_err(|_| 0).and_then(|either_tup| {
+ let (a, b) = either_tup.into_inner();
+ b.map(move |b| a + b)
+ })
+ },
+ Ok(3),
+ );
// Finish one half of a select and then fail the second, ensuring that we
// get the notification of the second one.
@@ -297,8 +328,14 @@ fn select2() {
{
let ((_a, b), (_c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>());
let ((btx, brx), (dtx, drx)) = (channel(), channel());
- let b = b.map(move |v| { btx.send(v).unwrap(); v });
- let d = d.map(move |v| { dtx.send(v).unwrap(); v });
+ let b = b.map(move |v| {
+ btx.send(v).unwrap();
+ v
+ });
+ let d = d.map(move |v| {
+ dtx.send(v).unwrap();
+ v
+ });
let f = b.select(d);
drop(f);
assert!(drx.recv().is_err());
@@ -309,8 +346,14 @@ fn select2() {
{
let ((_a, b), (_c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>());
let ((btx, brx), (dtx, drx)) = (channel(), channel());
- let b = b.map(move |v| { btx.send(v).unwrap(); v });
- let d = d.map(move |v| { dtx.send(v).unwrap(); v });
+ let b = b.map(move |v| {
+ btx.send(v).unwrap();
+ v
+ });
+ let d = d.map(move |v| {
+ dtx.send(v).unwrap();
+ v
+ });
let mut f = b.select(d);
let _res = noop_waker_lw(|lw| f.poll(lw));
drop(f);
@@ -322,8 +365,14 @@ fn select2() {
{
let ((a, b), (_c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>());
let ((btx, brx), (dtx, drx)) = (channel(), channel());
- let b = b.map(move |v| { btx.send(v).unwrap(); v });
- let d = d.map(move |v| { dtx.send(v).unwrap(); v });
+ let b = b.map(move |v| {
+ btx.send(v).unwrap();
+ v
+ });
+ let d = d.map(move |v| {
+ dtx.send(v).unwrap();
+ v
+ });
let (tx, rx) = channel();
b.select(d).map(move |_| tx.send(()).unwrap()).forget();
drop(a);
diff --git a/tests_disabled/bilock.rs b/tests_disabled/bilock.rs
index c1bc33f..0166ca4 100644
--- a/tests_disabled/bilock.rs
+++ b/tests_disabled/bilock.rs
@@ -1,11 +1,11 @@
-use futures::task;
-use futures::stream;
use futures::future;
+use futures::stream;
+use futures::task;
use futures_util::lock::BiLock;
use std::thread;
-mod support;
-use support::*;
+// mod support;
+// use support::*;
#[test]
fn smoke() {
@@ -41,9 +41,9 @@ fn smoke() {
});
assert!(task::spawn(future)
- .poll_future_notify(&notify_noop(), 0)
- .expect("failure in poll")
- .is_ready());
+ .poll_future_notify(&notify_noop(), 0)
+ .expect("failure in poll")
+ .is_ready());
}
#[test]
@@ -51,10 +51,7 @@ fn concurrent() {
const N: usize = 10000;
let (a, b) = BiLock::new(0);
- let a = Increment {
- a: Some(a),
- remaining: N,
- };
+ let a = Increment { a: Some(a), remaining: N };
let b = stream::iter_ok(0..N).fold(b, |b, _n| {
b.lock().map(|mut b| {
*b += 1;
@@ -89,7 +86,7 @@ fn concurrent() {
fn poll(&mut self) -> Poll<BiLock<usize>, ()> {
loop {
if self.remaining == 0 {
- return Ok(self.a.take().unwrap().into())
+ return Ok(self.a.take().unwrap().into());
}
let a = self.a.as_ref().unwrap();
diff --git a/tests_disabled/stream.rs b/tests_disabled/stream.rs
index ef0676d..854dbad 100644
--- a/tests_disabled/stream.rs
+++ b/tests_disabled/stream.rs
@@ -1,26 +1,26 @@
+use futures::channel::mpsc;
+use futures::channel::oneshot;
use futures::executor::{block_on, block_on_stream};
use futures::future::{err, ok};
use futures::stream::{empty, iter_ok, poll_fn, Peekable};
-use futures::channel::oneshot;
-use futures::channel::mpsc;
-mod support;
-use support::*;
+// mod support;
+// use support::*;
pub struct Iter<I> {
iter: I,
}
pub fn iter<J, T, E>(i: J) -> Iter<J::IntoIter>
- where J: IntoIterator<Item=Result<T, E>>,
+where
+ J: IntoIterator<Item = Result<T, E>>,
{
- Iter {
- iter: i.into_iter(),
- }
+ Iter { iter: i.into_iter() }
}
impl<I, T, E> Stream for Iter<I>
- where I: Iterator<Item=Result<T, E>>,
+where
+ I: Iterator<Item = Result<T, E>>,
{
type Item = T;
type Error = E;
@@ -34,21 +34,15 @@ impl<I, T, E> Stream for Iter<I>
}
}
-fn list() -> Box<Stream<Item=i32, Error=u32> + Send> {
+fn list() -> Box<Stream<Item = i32, Error = u32> + Send> {
let (tx, rx) = mpsc::channel(1);
- tx.send(Ok(1))
- .and_then(|tx| tx.send(Ok(2)))
- .and_then(|tx| tx.send(Ok(3)))
- .forget();
+ tx.send(Ok(1)).and_then(|tx| tx.send(Ok(2))).and_then(|tx| tx.send(Ok(3))).forget();
Box::new(rx.then(|r| r.unwrap()))
}
-fn err_list() -> Box<Stream<Item=i32, Error=u32> + Send> {
+fn err_list() -> Box<Stream<Item = i32, Error = u32> + Send> {
let (tx, rx) = mpsc::channel(1);
- tx.send(Ok(1))
- .and_then(|tx| tx.send(Ok(2)))
- .and_then(|tx| tx.send(Err(3)))
- .forget();
+ tx.send(Ok(1)).and_then(|tx| tx.send(Ok(2))).and_then(|tx| tx.send(Err(3))).forget();
Box::new(rx.then(|r| r.unwrap()))
}
@@ -89,40 +83,31 @@ fn filter() {
#[test]
fn filter_map() {
- assert_done(|| list().filter_map(|x| {
- ok(if x % 2 == 0 {
- Some(x + 10)
- } else {
- None
- })
- }).collect(), Ok(vec![12]));
+ assert_done(
+ || list().filter_map(|x| ok(if x % 2 == 0 { Some(x + 10) } else { None })).collect(),
+ Ok(vec![12]),
+ );
}
#[test]
fn and_then() {
assert_done(|| list().and_then(|a| Ok(a + 1)).collect(), Ok(vec![2, 3, 4]));
- assert_done(|| list().and_then(|a| err::<i32, u32>(a as u32)).collect::<Vec<_>>(),
- Err(1));
+ assert_done(|| list().and_then(|a| err::<i32, u32>(a as u32)).collect::<Vec<_>>(), Err(1));
}
#[test]
fn then() {
assert_done(|| list().then(|a| a.map(|e| e + 1)).collect(), Ok(vec![2, 3, 4]));
-
}
#[test]
fn or_else() {
- assert_done(|| err_list().or_else(|a| {
- ok::<i32, u32>(a as i32)
- }).collect(), Ok(vec![1, 2, 3]));
+ assert_done(|| err_list().or_else(|a| ok::<i32, u32>(a as i32)).collect(), Ok(vec![1, 2, 3]));
}
#[test]
fn flatten() {
- assert_done(|| list().map(|_| list()).flatten().collect(),
- Ok(vec![1, 2, 3, 1, 2, 3, 1, 2, 3]));
-
+ assert_done(|| list().map(|_| list()).flatten().collect(), Ok(vec![1, 2, 3, 1, 2, 3, 1, 2, 3]));
}
#[test]
@@ -132,9 +117,7 @@ fn skip() {
#[test]
fn skip_passes_errors_through() {
- let mut s = block_on_stream(
- iter(vec![Err(1), Err(2), Ok(3), Ok(4), Ok(5)]).skip(1)
- );
+ let mut s = block_on_stream(iter(vec![Err(1), Err(2), Ok(3), Ok(4), Ok(5)]).skip(1));
assert_eq!(s.next(), Some(Err(1)));
assert_eq!(s.next(), Some(Err(2)));
assert_eq!(s.next(), Some(Ok(4)));
@@ -144,8 +127,7 @@ fn skip_passes_errors_through() {
#[test]
fn skip_while() {
- assert_done(|| list().skip_while(|e| Ok(*e % 2 == 1)).collect(),
- Ok(vec![2, 3]));
+ assert_done(|| list().skip_while(|e| Ok(*e % 2 == 1)).collect(), Ok(vec![2, 3]));
}
#[test]
fn take() {
@@ -154,8 +136,7 @@ fn take() {
#[test]
fn take_while() {
- assert_done(|| list().take_while(|e| Ok(*e < 3)).collect(),
- Ok(vec![1, 2]));
+ assert_done(|| list().take_while(|e| Ok(*e < 3)).collect(), Ok(vec![1, 2]));
}
#[test]
@@ -193,9 +174,9 @@ fn buffered() {
let (a, b) = oneshot::channel::<u32>();
let (c, d) = oneshot::channel::<u32>();
- tx.send(Box::new(b.recover(|_| panic!())) as Box<Future<Item=_, Error=_> + Send>)
- .and_then(|tx| tx.send(Box::new(d.map_err(|_| panic!()))))
- .forget();
+ tx.send(Box::new(b.recover(|_| panic!())) as Box<Future<Item = _, Error = _> + Send>)
+ .and_then(|tx| tx.send(Box::new(d.map_err(|_| panic!()))))
+ .forget();
let mut rx = rx.buffered(2);
sassert_empty(&mut rx);
@@ -211,9 +192,9 @@ fn buffered() {
let (a, b) = oneshot::channel::<u32>();
let (c, d) = oneshot::channel::<u32>();
- tx.send(Box::new(b.recover(|_| panic!())) as Box<Future<Item=_, Error=_> + Send>)
- .and_then(|tx| tx.send(Box::new(d.map_err(|_| panic!()))))
- .forget();
+ tx.send(Box::new(b.recover(|_| panic!())) as Box<Future<Item = _, Error = _> + Send>)
+ .and_then(|tx| tx.send(Box::new(d.map_err(|_| panic!()))))
+ .forget();
let mut rx = rx.buffered(1);
sassert_empty(&mut rx);
@@ -233,8 +214,8 @@ fn unordered() {
let (c, d) = oneshot::channel::<u32>();
tx.send(Box::new(b.recover(|_| panic!())) as Box<Future<Item = _, Error = _> + Send>)
- .and_then(|tx| tx.send(Box::new(d.recover(|_| panic!()))))
- .forget();
+ .and_then(|tx| tx.send(Box::new(d.recover(|_| panic!()))))
+ .forget();
let mut rx = rx.buffer_unordered(2);
sassert_empty(&mut rx);
@@ -250,8 +231,8 @@ fn unordered() {
let (c, d) = oneshot::channel::<u32>();
tx.send(Box::new(b.recover(|_| panic!())) as Box<Future<Item = _, Error = _> + Send>)
- .and_then(|tx| tx.send(Box::new(d.recover(|_| panic!()))))
- .forget();
+ .and_then(|tx| tx.send(Box::new(d.recover(|_| panic!()))))
+ .forget();
// We don't even get to see `c` until `a` completes.
let mut rx = rx.buffer_unordered(1);
@@ -267,21 +248,17 @@ fn unordered() {
#[test]
fn zip() {
- assert_done(|| list().zip(list()).collect(),
- Ok(vec![(1, 1), (2, 2), (3, 3)]));
- assert_done(|| list().zip(list().take(2)).collect(),
- Ok(vec![(1, 1), (2, 2)]));
- assert_done(|| list().take(2).zip(list()).collect(),
- Ok(vec![(1, 1), (2, 2)]));
+ assert_done(|| list().zip(list()).collect(), Ok(vec![(1, 1), (2, 2), (3, 3)]));
+ assert_done(|| list().zip(list().take(2)).collect(), Ok(vec![(1, 1), (2, 2)]));
+ assert_done(|| list().take(2).zip(list()).collect(), Ok(vec![(1, 1), (2, 2)]));
assert_done(|| err_list().zip(list()).collect::<Vec<_>>(), Err(3));
- assert_done(|| list().zip(list().map(|x| x + 1)).collect(),
- Ok(vec![(1, 2), (2, 3), (3, 4)]));
+ assert_done(|| list().zip(list().map(|x| x + 1)).collect(), Ok(vec![(1, 2), (2, 3), (3, 4)]));
}
#[test]
fn peek() {
struct Peek {
- inner: Peekable<Box<Stream<Item = i32, Error =u32> + Send>>
+ inner: Peekable<Box<Stream<Item = i32, Error = u32> + Send>>,
}
impl Future for Peek {
@@ -299,15 +276,12 @@ fn peek() {
}
}
- block_on(Peek {
- inner: list().peekable(),
- }).unwrap()
+ block_on(Peek { inner: list().peekable() }).unwrap()
}
#[test]
fn wait() {
- assert_eq!(block_on_stream(list()).collect::<Result<Vec<_>, _>>(),
- Ok(vec![1, 2, 3]));
+ assert_eq!(block_on_stream(list()).collect::<Result<Vec<_>, _>>(), Ok(vec![1, 2, 3]));
}
#[test]
@@ -337,8 +311,10 @@ fn forward() {
let v = block_on(iter_ok::<_, Never>(vec![2, 3]).forward(v)).unwrap().1;
assert_eq!(v, vec![0, 1, 2, 3]);
- assert_done(move || iter_ok::<_, Never>(vec![4, 5]).forward(v).map(|(_, s)| s),
- Ok(vec![0, 1, 2, 3, 4, 5]));
+ assert_done(
+ move || iter_ok::<_, Never>(vec![4, 5]).forward(v).map(|(_, s)| s),
+ Ok(vec![0, 1, 2, 3, 4, 5]),
+ );
}
#[test]