aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJason Macnak <natsu@google.com>2020-04-17 22:39:55 +0000
committerAutomerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>2020-04-17 22:39:55 +0000
commit7b15212dc34bfbe4f938a14c8873a1ab35771067 (patch)
tree2904c76dbc4b14ccf04bbb6f8f30cb42e1994440
parent4d40bd2be1190c8fc626cde39660ccb58737091d (diff)
parent1c010194db0195f1ebc0c90793e3ce27f498aad1 (diff)
downloadfutures-7b15212dc34bfbe4f938a14c8873a1ab35771067.tar.gz
Import 'futures' rust crate version 0.3.4 am: 3d406bbb48 am: 4394d5d0c0 am: e92cdfa932 am: 1c010194db
Change-Id: I30d90a6f8a1f4adea0a7028647f65b78cfc152f3
-rw-r--r--.gitignore7
-rw-r--r--Android.bp44
-rw-r--r--Cargo.toml76
-rw-r--r--Cargo.toml.orig52
l---------LICENSE1
-rw-r--r--LICENSE-APACHE202
-rw-r--r--LICENSE-MIT26
-rw-r--r--METADATA18
-rw-r--r--MODULE_LICENSE_APACHE20
l---------NOTICE1
-rw-r--r--src/lib.rs605
-rw-r--r--tests/abortable.rs39
-rw-r--r--tests/arc_wake.rs77
-rw-r--r--tests/async_await_macros.rs360
-rw-r--r--tests/atomic_waker.rs49
-rw-r--r--tests/basic_combinators.rs98
-rw-r--r--tests/buffer_unordered.rs73
-rw-r--r--tests/compat.rs17
-rw-r--r--tests/eager_drop.rs117
-rw-r--r--tests/eventual.rs159
-rw-r--r--tests/fuse.rs12
-rw-r--r--tests/future_obj.rs33
-rw-r--r--tests/future_try_flatten_stream.rs82
-rw-r--r--tests/futures_ordered.rs83
-rw-r--r--tests/futures_unordered.rs287
-rw-r--r--tests/inspect.rs14
-rw-r--r--tests/io_buf_reader.rs321
-rw-r--r--tests/io_buf_writer.rs236
-rw-r--r--tests/io_cursor.rs29
-rw-r--r--tests/io_lines.rs62
-rw-r--r--tests/io_read.rs65
-rw-r--r--tests/io_read_exact.rs17
-rw-r--r--tests/io_read_line.rs60
-rw-r--r--tests/io_read_to_string.rs45
-rw-r--r--tests/io_read_until.rs60
-rw-r--r--tests/io_window.rs26
-rw-r--r--tests/io_write.rs70
-rw-r--r--tests/join_all.rs43
-rw-r--r--tests/macro_comma_support.rs42
-rw-r--r--tests/mutex.rs69
-rw-r--r--tests/object_safety.rs49
-rw-r--r--tests/oneshot.rs66
-rw-r--r--tests/ready_queue.rs151
-rw-r--r--tests/recurse.rs22
-rw-r--r--tests/select_all.rs29
-rw-r--r--tests/select_ok.rs39
-rw-r--r--tests/shared.rs151
-rw-r--r--tests/sink.rs516
-rw-r--r--tests/sink_fanout.rs24
-rw-r--r--tests/split.rs77
-rw-r--r--tests/stream.rs32
-rw-r--r--tests/stream_catch_unwind.rs27
-rw-r--r--tests/stream_into_async_read.rs96
-rw-r--r--tests/stream_peekable.rs13
-rw-r--r--tests/stream_select_all.rs78
-rw-r--r--tests/stream_select_next_some.rs85
-rw-r--r--tests/try_join.rs36
-rw-r--r--tests/try_join_all.rs44
-rw-r--r--tests/unfold.rs35
-rw-r--r--tests_disabled/all.rs351
-rw-r--r--tests_disabled/bilock.rs105
-rw-r--r--tests_disabled/stream.rs393
62 files changed, 6096 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..de02da3
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,7 @@
+target
+**/*.rs.bk
+Cargo.lock
+_site
+.sass-cache
+.idea
+.DS_Store
diff --git a/Android.bp b/Android.bp
new file mode 100644
index 0000000..c988699
--- /dev/null
+++ b/Android.bp
@@ -0,0 +1,44 @@
+// This file is generated by cargo2android.py.
+
+rust_library_host_rlib {
+ name: "libfutures",
+ crate_name: "futures",
+ srcs: ["src/lib.rs"],
+ edition: "2018",
+ features: [
+ "alloc",
+ "async-await",
+ "default",
+ "executor",
+ "futures-executor",
+ "std",
+ ],
+ rlibs: [
+ "libfutures_channel",
+ "libfutures_core",
+ "libfutures_executor",
+ "libfutures_io",
+ "libfutures_sink",
+ "libfutures_task",
+ "libfutures_util",
+ ],
+}
+
+// dependent_library ["feature_list"]
+// futures-channel-0.3.4 "alloc,futures-sink,sink,std"
+// futures-core-0.3.4 "alloc,std"
+// futures-executor-0.3.4 "std"
+// futures-io-0.3.4 "std"
+// futures-macro-0.3.4
+// futures-sink-0.3.4 "alloc,std"
+// futures-task-0.3.4 "alloc,std"
+// futures-util-0.3.4 "alloc,async-await,async-await-macro,channel,futures-channel,futures-io,futures-macro,futures-sink,io,memchr,proc-macro-hack,proc-macro-nested,sink,slab,std"
+// memchr-2.3.3 "default,std"
+// pin-utils-0.1.0-alpha.4
+// proc-macro-hack-0.5.11
+// proc-macro-nested-0.1.3
+// proc-macro2-1.0.9 "default,proc-macro"
+// quote-1.0.3 "default,proc-macro"
+// slab-0.4.2
+// syn-1.0.16 "clone-impls,default,derive,full,parsing,printing,proc-macro,quote"
+// unicode-xid-0.2.0 "default"
diff --git a/Cargo.toml b/Cargo.toml
new file mode 100644
index 0000000..3f93105
--- /dev/null
+++ b/Cargo.toml
@@ -0,0 +1,76 @@
+# THIS FILE IS AUTOMATICALLY GENERATED BY CARGO
+#
+# When uploading crates to the registry Cargo will automatically
+# "normalize" Cargo.toml files for maximal compatibility
+# with all versions of Cargo and also rewrite `path` dependencies
+# to registry (e.g., crates.io) dependencies
+#
+# If you believe there's an error in this file please file an
+# issue against the rust-lang/cargo repository. If you're
+# editing this file be aware that the upstream Cargo.toml
+# will likely look very different (and much more reasonable)
+
+[package]
+edition = "2018"
+name = "futures"
+version = "0.3.4"
+authors = ["Alex Crichton <alex@alexcrichton.com>"]
+description = "An implementation of futures and streams featuring zero allocations,\ncomposability, and iterator-like interfaces.\n"
+homepage = "https://rust-lang.github.io/futures-rs"
+documentation = "https://docs.rs/futures/0.3.0"
+readme = "../README.md"
+keywords = ["futures", "async", "future"]
+categories = ["asynchronous"]
+license = "MIT OR Apache-2.0"
+repository = "https://github.com/rust-lang/futures-rs"
+[package.metadata.docs.rs]
+all-features = true
+
+[package.metadata.playground]
+features = ["std", "async-await", "compat", "io-compat", "executor", "thread-pool"]
+[dependencies.futures-channel]
+version = "0.3.4"
+features = ["sink"]
+default-features = false
+
+[dependencies.futures-core]
+version = "0.3.4"
+default-features = false
+
+[dependencies.futures-executor]
+version = "0.3.4"
+optional = true
+default-features = false
+
+[dependencies.futures-io]
+version = "0.3.4"
+default-features = false
+
+[dependencies.futures-sink]
+version = "0.3.4"
+default-features = false
+
+[dependencies.futures-task]
+version = "0.3.4"
+default-features = false
+
+[dependencies.futures-util]
+version = "0.3.4"
+features = ["sink"]
+default-features = false
+
+[features]
+alloc = ["futures-core/alloc", "futures-task/alloc", "futures-sink/alloc", "futures-channel/alloc", "futures-util/alloc"]
+async-await = ["futures-util/async-await", "futures-util/async-await-macro"]
+bilock = ["futures-util/bilock"]
+cfg-target-has-atomic = ["futures-core/cfg-target-has-atomic", "futures-task/cfg-target-has-atomic", "futures-channel/cfg-target-has-atomic", "futures-util/cfg-target-has-atomic"]
+compat = ["std", "futures-util/compat"]
+default = ["std", "async-await", "executor"]
+executor = ["std", "futures-executor/std"]
+io-compat = ["compat", "futures-util/io-compat"]
+read-initializer = ["futures-io/read-initializer", "futures-util/read-initializer"]
+std = ["alloc", "futures-core/std", "futures-task/std", "futures-io/std", "futures-sink/std", "futures-util/std", "futures-util/io", "futures-util/channel"]
+thread-pool = ["executor", "futures-executor/thread-pool"]
+unstable = ["futures-core/unstable", "futures-task/unstable", "futures-channel/unstable", "futures-io/unstable", "futures-util/unstable"]
+[badges.travis-ci]
+repository = "rust-lang/futures-rs"
diff --git a/Cargo.toml.orig b/Cargo.toml.orig
new file mode 100644
index 0000000..7095c19
--- /dev/null
+++ b/Cargo.toml.orig
@@ -0,0 +1,52 @@
+[package]
+name = "futures"
+edition = "2018"
+version = "0.3.4"
+authors = ["Alex Crichton <alex@alexcrichton.com>"]
+license = "MIT OR Apache-2.0"
+readme = "../README.md"
+keywords = ["futures", "async", "future"]
+repository = "https://github.com/rust-lang/futures-rs"
+homepage = "https://rust-lang.github.io/futures-rs"
+documentation = "https://docs.rs/futures/0.3.0"
+description = """
+An implementation of futures and streams featuring zero allocations,
+composability, and iterator-like interfaces.
+"""
+categories = ["asynchronous"]
+
+[badges]
+travis-ci = { repository = "rust-lang/futures-rs" }
+
+[dependencies]
+futures-core = { path = "../futures-core", version = "0.3.4", default-features = false }
+futures-task = { path = "../futures-task", version = "0.3.4", default-features = false }
+futures-channel = { path = "../futures-channel", version = "0.3.4", default-features = false, features = ["sink"] }
+futures-executor = { path = "../futures-executor", version = "0.3.4", default-features = false, optional = true }
+futures-io = { path = "../futures-io", version = "0.3.4", default-features = false }
+futures-sink = { path = "../futures-sink", version = "0.3.4", default-features = false }
+futures-util = { path = "../futures-util", version = "0.3.4", default-features = false, features = ["sink"] }
+
+[features]
+default = ["std", "async-await", "executor"]
+std = ["alloc", "futures-core/std", "futures-task/std", "futures-io/std", "futures-sink/std", "futures-util/std", "futures-util/io", "futures-util/channel"]
+alloc = ["futures-core/alloc", "futures-task/alloc", "futures-sink/alloc", "futures-channel/alloc", "futures-util/alloc"]
+async-await = ["futures-util/async-await", "futures-util/async-await-macro"]
+compat = ["std", "futures-util/compat"]
+io-compat = ["compat", "futures-util/io-compat"]
+executor = ["std", "futures-executor/std"]
+thread-pool = ["executor", "futures-executor/thread-pool"]
+
+# Unstable features
+# These features are outside of the normal semver guarantees and require the
+# `unstable` feature as an explicit opt-in to unstable API.
+unstable = ["futures-core/unstable", "futures-task/unstable", "futures-channel/unstable", "futures-io/unstable", "futures-util/unstable"]
+cfg-target-has-atomic = ["futures-core/cfg-target-has-atomic", "futures-task/cfg-target-has-atomic", "futures-channel/cfg-target-has-atomic", "futures-util/cfg-target-has-atomic"]
+bilock = ["futures-util/bilock"]
+read-initializer = ["futures-io/read-initializer", "futures-util/read-initializer"]
+
+[package.metadata.docs.rs]
+all-features = true
+
+[package.metadata.playground]
+features = ["std", "async-await", "compat", "io-compat", "executor", "thread-pool"]
diff --git a/LICENSE b/LICENSE
new file mode 120000
index 0000000..6b579aa
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1 @@
+LICENSE-APACHE \ No newline at end of file
diff --git a/LICENSE-APACHE b/LICENSE-APACHE
new file mode 100644
index 0000000..9eb0b09
--- /dev/null
+++ b/LICENSE-APACHE
@@ -0,0 +1,202 @@
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+END OF TERMS AND CONDITIONS
+
+APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+Copyright (c) 2016 Alex Crichton
+Copyright (c) 2017 The Tokio Authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
diff --git a/LICENSE-MIT b/LICENSE-MIT
new file mode 100644
index 0000000..8ad082e
--- /dev/null
+++ b/LICENSE-MIT
@@ -0,0 +1,26 @@
+Copyright (c) 2016 Alex Crichton
+Copyright (c) 2017 The Tokio Authors
+
+Permission is hereby granted, free of charge, to any
+person obtaining a copy of this software and associated
+documentation files (the "Software"), to deal in the
+Software without restriction, including without
+limitation the rights to use, copy, modify, merge,
+publish, distribute, sublicense, and/or sell copies of
+the Software, and to permit persons to whom the Software
+is furnished to do so, subject to the following
+conditions:
+
+The above copyright notice and this permission notice
+shall be included in all copies or substantial portions
+of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
+ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
+TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
+PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
+SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
+CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
+IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+DEALINGS IN THE SOFTWARE.
diff --git a/METADATA b/METADATA
new file mode 100644
index 0000000..42d8dba
--- /dev/null
+++ b/METADATA
@@ -0,0 +1,18 @@
+name: "futures"
+description:
+ "An implementation of futures and streams featuring zero allocations, "
+ "composability, and iterator-like interfaces."
+
+third_party {
+ url {
+ type: HOMEPAGE
+ value: "https://crates.io/crates/futures"
+ }
+ url {
+ type: GIT
+ value: "https://github.com/rust-lang/futures-rs"
+ }
+ version: "0.3.4"
+ last_upgrade_date { year: 2020 month: 3 day: 17 }
+ license_type: NOTICE
+}
diff --git a/MODULE_LICENSE_APACHE2 b/MODULE_LICENSE_APACHE2
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/MODULE_LICENSE_APACHE2
diff --git a/NOTICE b/NOTICE
new file mode 120000
index 0000000..7a694c9
--- /dev/null
+++ b/NOTICE
@@ -0,0 +1 @@
+LICENSE \ No newline at end of file
diff --git a/src/lib.rs b/src/lib.rs
new file mode 100644
index 0000000..3cdb3d3
--- /dev/null
+++ b/src/lib.rs
@@ -0,0 +1,605 @@
+//! Abstractions for asynchronous programming.
+//!
+//! This crate provides a number of core abstractions for writing asynchronous
+//! code:
+//!
+//! - [Futures](crate::future::Future) are single eventual values produced by
+//! asynchronous computations. Some programming languages (e.g. JavaScript)
+//! call this concept "promise".
+//! - [Streams](crate::stream::Stream) represent a series of values
+//! produced asynchronously.
+//! - [Sinks](crate::sink::Sink) provide support for asynchronous writing of
+//! data.
+//! - [Executors](crate::executor) are responsible for running asynchronous
+//! tasks.
+//!
+//! The crate also contains abstractions for [asynchronous I/O](crate::io) and
+//! [cross-task communication](crate::channel).
+//!
+//! Underlying all of this is the *task system*, which is a form of lightweight
+//! threading. Large asynchronous computations are built up using futures,
+//! streams and sinks, and then spawned as independent tasks that are run to
+//! completion, but *do not block* the thread running them.
+//!
+//! The following example describes how the task system context is built and used
+//! within macros and keywords such as async and await!.
+//!
+//! ```rust
+//! # use futures::channel::mpsc;
+//! # use futures::executor; ///standard executors to provide a context for futures and streams
+//! # use futures::executor::ThreadPool;
+//! # use futures::StreamExt;
+//!
+//! fn main() {
+//! let pool = ThreadPool::new().expect("Failed to build pool");
+//! let (tx, rx) = mpsc::unbounded::<i32>();
+//!
+//! // Create a future by an async block, where async is responsible for an
+//! // implementation of Future. At this point no executor has been provided
+//! // to this future, so it will not be running.
+//! let fut_values = async {
+//! // Create another async block, again where the Future implementation
+//! // is generated by async. Since this is inside of a parent async block,
+//! // it will be provided with the executor of the parent block when the parent
+//! // block is executed.
+//! //
+//! // This executor chaining is done by Future::poll whose second argument
+//! // is a std::task::Context. This represents our executor, and the Future
+//! // implemented by this async block can be polled using the parent async
+//! // block's executor.
+//! let fut_tx_result = async move {
+//! (0..100).for_each(|v| {
+//! tx.unbounded_send(v).expect("Failed to send");
+//! })
+//! };
+//!
+//! // Use the provided thread pool to spawn the generated future
+//! // responsible for transmission
+//! pool.spawn_ok(fut_tx_result);
+//!
+//! let fut_values = rx
+//! .map(|v| v * 2)
+//! .collect();
+//!
+//! // Use the executor provided to this async block to wait for the
+//! // future to complete.
+//! fut_values.await
+//! };
+//!
+//! // Actually execute the above future, which will invoke Future::poll and
+//! // subsequenty chain appropriate Future::poll and methods needing executors
+//! // to drive all futures. Eventually fut_values will be driven to completion.
+//! let values: Vec<i32> = executor::block_on(fut_values);
+//!
+//! println!("Values={:?}", values);
+//! }
+//! ```
+//!
+//! The majority of examples and code snippets in this crate assume that they are
+//! inside an async block as written above.
+
+#![cfg_attr(feature = "cfg-target-has-atomic", feature(cfg_target_has_atomic))]
+#![cfg_attr(feature = "read-initializer", feature(read_initializer))]
+
+#![cfg_attr(not(feature = "std"), no_std)]
+
+#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms, unreachable_pub)]
+// It cannot be included in the published code because this lints have false positives in the minimum required version.
+#![cfg_attr(test, warn(single_use_lifetimes))]
+#![warn(clippy::all)]
+
+#![doc(test(attr(deny(warnings), allow(dead_code, unused_assignments, unused_variables))))]
+
+#![doc(html_root_url = "https://docs.rs/futures/0.3.0")]
+
+#[cfg(all(feature = "cfg-target-has-atomic", not(feature = "unstable")))]
+compile_error!("The `cfg-target-has-atomic` feature requires the `unstable` feature as an explicit opt-in to unstable features");
+
+#[cfg(all(feature = "bilock", not(feature = "unstable")))]
+compile_error!("The `bilock` feature requires the `unstable` feature as an explicit opt-in to unstable features");
+
+#[cfg(all(feature = "read-initializer", not(feature = "unstable")))]
+compile_error!("The `read-initializer` feature requires the `unstable` feature as an explicit opt-in to unstable features");
+
+#[doc(hidden)] pub use futures_core::future::{Future, TryFuture};
+#[doc(hidden)] pub use futures_util::future::{FutureExt, TryFutureExt};
+
+#[doc(hidden)] pub use futures_core::stream::{Stream, TryStream};
+#[doc(hidden)] pub use futures_util::stream::{StreamExt, TryStreamExt};
+
+#[doc(hidden)] pub use futures_sink::Sink;
+#[doc(hidden)] pub use futures_util::sink::SinkExt;
+
+#[cfg(feature = "std")]
+#[doc(hidden)] pub use futures_io::{AsyncRead, AsyncWrite, AsyncSeek, AsyncBufRead};
+#[cfg(feature = "std")]
+#[doc(hidden)] pub use futures_util::{AsyncReadExt, AsyncWriteExt, AsyncSeekExt, AsyncBufReadExt};
+
+// Macro reexports
+pub use futures_core::ready; // Readiness propagation
+pub use futures_util::pin_mut;
+#[cfg(feature = "std")]
+#[cfg(feature = "async-await")]
+pub use futures_util::{pending, poll}; // Async-await
+
+#[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))]
+#[cfg(feature = "alloc")]
+pub mod channel {
+ //! Cross-task communication.
+ //!
+ //! Like threads, concurrent tasks sometimes need to communicate with each
+ //! other. This module contains two basic abstractions for doing so:
+ //!
+ //! - [oneshot](crate::channel::oneshot), a way of sending a single value
+ //! from one task to another.
+ //! - [mpsc](crate::channel::mpsc), a multi-producer, single-consumer
+ //! channel for sending values between tasks, analogous to the
+ //! similarly-named structure in the standard library.
+ //!
+ //! This module is only available when the `std` or `alloc` feature of this
+ //! library is activated, and it is activated by default.
+
+ pub use futures_channel::oneshot;
+
+ #[cfg(feature = "std")]
+ pub use futures_channel::mpsc;
+}
+
+#[cfg(feature = "compat")]
+pub mod compat {
+ //! Interop between `futures` 0.1 and 0.3.
+ //!
+ //! This module is only available when the `compat` feature of this
+ //! library is activated.
+
+ pub use futures_util::compat::{
+ Compat,
+ CompatSink,
+ Compat01As03,
+ Compat01As03Sink,
+ Executor01Future,
+ Executor01As03,
+ Executor01CompatExt,
+ Future01CompatExt,
+ Stream01CompatExt,
+ Sink01CompatExt,
+ };
+
+ #[cfg(feature = "io-compat")]
+ pub use futures_util::compat::{
+ AsyncRead01CompatExt,
+ AsyncWrite01CompatExt,
+ };
+}
+
+#[cfg(feature = "executor")]
+pub mod executor {
+ //! Task execution.
+ //!
+ //! All asynchronous computation occurs within an executor, which is
+ //! capable of spawning futures as tasks. This module provides several
+ //! built-in executors, as well as tools for building your own.
+ //!
+ //! This module is only available when the `executor` feature of this
+ //! library is activated, and it is activated by default.
+ //!
+ //! # Using a thread pool (M:N task scheduling)
+ //!
+ //! Most of the time tasks should be executed on a [thread
+ //! pool](crate::executor::ThreadPool). A small set of worker threads can
+ //! handle a very large set of spawned tasks (which are much lighter weight
+ //! than threads). Tasks spawned onto the pool with the
+ //! [`spawn_ok()`](crate::executor::ThreadPool::spawn_ok)
+ //! function will run ambiently on the created threads.
+ //!
+ //! # Spawning additional tasks
+ //!
+ //! Tasks can be spawned onto a spawner by calling its
+ //! [`spawn_obj`](crate::task::Spawn::spawn_obj) method directly.
+ //! In the case of `!Send` futures,
+ //! [`spawn_local_obj`](crate::task::LocalSpawn::spawn_local_obj)
+ //! can be used instead.
+ //!
+ //! # Single-threaded execution
+ //!
+ //! In addition to thread pools, it's possible to run a task (and the tasks
+ //! it spawns) entirely within a single thread via the
+ //! [`LocalPool`](crate::executor::LocalPool) executor. Aside from cutting
+ //! down on synchronization costs, this executor also makes it possible to
+ //! spawn non-`Send` tasks, via
+ //! [`spawn_local_obj`](crate::task::LocalSpawn::spawn_local_obj).
+ //! The `LocalPool` is best suited for running I/O-bound tasks that do
+ //! relatively little work between I/O operations.
+ //!
+ //! There is also a convenience function
+ //! [`block_on`](crate::executor::block_on) for simply running a future to
+ //! completion on the current thread.
+
+ pub use futures_executor::{
+ BlockingStream,
+ Enter, EnterError,
+ LocalSpawner, LocalPool,
+ block_on, block_on_stream, enter,
+ };
+
+ #[cfg(feature = "thread-pool")]
+ pub use futures_executor::{ThreadPool, ThreadPoolBuilder};
+}
+
+pub mod future {
+ //! Asynchronous values.
+ //!
+ //! This module contains:
+ //!
+ //! - The [`Future` trait](crate::future::Future).
+ //! - The [`FutureExt`](crate::future::FutureExt) trait, which provides
+ //! adapters for chaining and composing futures.
+ //! - Top-level future combinators like [`lazy`](crate::future::lazy) which
+ //! creates a future from a closure that defines its return value, and
+ //! [`ready`](crate::future::ready), which constructs a future with an
+ //! immediate defined value.
+
+ pub use futures_core::future::{
+ Future, TryFuture, FusedFuture,
+ };
+
+ #[cfg(feature = "alloc")]
+ pub use futures_core::future::{BoxFuture, LocalBoxFuture};
+
+ pub use futures_task::{FutureObj, LocalFutureObj, UnsafeFutureObj};
+
+ pub use futures_util::future::{
+ lazy, Lazy,
+ maybe_done, MaybeDone,
+ pending, Pending,
+ poll_fn, PollFn,
+ ready, ok, err, Ready,
+ join, join3, join4, join5,
+ Join, Join3, Join4, Join5,
+ select, Select,
+ try_join, try_join3, try_join4, try_join5,
+ TryJoin, TryJoin3, TryJoin4, TryJoin5,
+ try_select, TrySelect,
+ Either,
+ OptionFuture,
+
+ FutureExt,
+ FlattenStream, Flatten, Fuse, Inspect, IntoStream, Map, Then, UnitError,
+ NeverError,
+
+ TryFutureExt,
+ AndThen, ErrInto, FlattenSink, IntoFuture, MapErr, MapOk, OrElse,
+ InspectOk, InspectErr, TryFlattenStream, UnwrapOrElse,
+ };
+
+ #[cfg(feature = "alloc")]
+ pub use futures_util::future::{
+ join_all, JoinAll,
+ select_all, SelectAll,
+ try_join_all, TryJoinAll,
+ select_ok, SelectOk,
+ };
+
+ #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))]
+ #[cfg(feature = "alloc")]
+ pub use futures_util::future::{
+ abortable, Abortable, AbortHandle, AbortRegistration, Aborted,
+ };
+
+ #[cfg(feature = "std")]
+ pub use futures_util::future::{
+ Remote, RemoteHandle,
+ CatchUnwind, Shared,
+ };
+}
+
+#[cfg(feature = "std")]
+pub mod io {
+ //! Asynchronous I/O.
+ //!
+ //! This module is the asynchronous version of `std::io`. It defines four
+ //! traits, [`AsyncRead`](crate::io::AsyncRead),
+ //! [`AsyncWrite`](crate::io::AsyncWrite),
+ //! [`AsyncSeek`](crate::io::AsyncSeek), and
+ //! [`AsyncBufRead`](crate::io::AsyncBufRead), which mirror the `Read`,
+ //! `Write`, `Seek`, and `BufRead` traits of the standard library. However,
+ //! these traits integrate
+ //! with the asynchronous task system, so that if an I/O object isn't ready
+ //! for reading (or writing), the thread is not blocked, and instead the
+ //! current task is queued to be woken when I/O is ready.
+ //!
+ //! In addition, the [`AsyncReadExt`](crate::io::AsyncReadExt),
+ //! [`AsyncWriteExt`](crate::io::AsyncWriteExt),
+ //! [`AsyncSeekExt`](crate::io::AsyncSeekExt), and
+ //! [`AsyncBufReadExt`](crate::io::AsyncBufReadExt) extension traits offer a
+ //! variety of useful combinators for operating with asynchronous I/O
+ //! objects, including ways to work with them using futures, streams and
+ //! sinks.
+ //!
+ //! This module is only available when the `std` feature of this
+ //! library is activated, and it is activated by default.
+
+ pub use futures_io::{
+ AsyncRead, AsyncWrite, AsyncSeek, AsyncBufRead, Error, ErrorKind,
+ IoSlice, IoSliceMut, Result, SeekFrom,
+ };
+
+ #[cfg(feature = "read-initializer")]
+ pub use futures_io::Initializer;
+
+ pub use futures_util::io::{
+ AsyncReadExt, AsyncWriteExt, AsyncSeekExt, AsyncBufReadExt, AllowStdIo,
+ BufReader, BufWriter, Cursor, Chain, Close, copy, Copy, copy_buf, CopyBuf,
+ empty, Empty, Flush, IntoSink, Lines, Read, ReadExact, ReadHalf,
+ ReadLine, ReadToEnd, ReadToString, ReadUntil, ReadVectored, repeat,
+ Repeat, Seek, sink, Sink, Take, Window, Write, WriteAll, WriteHalf,
+ WriteVectored,
+ };
+}
+
+#[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))]
+#[cfg(feature = "alloc")]
+pub mod lock {
+ //! Futures-powered synchronization primitives.
+ //!
+ //! This module is only available when the `std` or `alloc` feature of this
+ //! library is activated, and it is activated by default.
+
+ #[cfg(feature = "bilock")]
+ pub use futures_util::lock::{BiLock, BiLockAcquire, BiLockGuard, ReuniteError};
+
+ #[cfg(feature = "std")]
+ pub use futures_util::lock::{MappedMutexGuard, Mutex, MutexLockFuture, MutexGuard};
+}
+
+pub mod prelude {
+ //! A "prelude" for crates using the `futures` crate.
+ //!
+ //! This prelude is similar to the standard library's prelude in that you'll
+ //! almost always want to import its entire contents, but unlike the
+ //! standard library's prelude you'll have to do so manually:
+ //!
+ //! ```
+ //! # #[allow(unused_imports)]
+ //! use futures::prelude::*;
+ //! ```
+ //!
+ //! The prelude may grow over time as additional items see ubiquitous use.
+
+ pub use crate::future::{self, Future, TryFuture};
+ pub use crate::stream::{self, Stream, TryStream};
+ pub use crate::sink::{self, Sink};
+
+ #[doc(no_inline)]
+ pub use crate::future::{FutureExt as _, TryFutureExt as _};
+ #[doc(no_inline)]
+ pub use crate::stream::{StreamExt as _, TryStreamExt as _};
+ #[doc(no_inline)]
+ pub use crate::sink::SinkExt as _;
+
+ #[cfg(feature = "std")]
+ pub use crate::io::{
+ AsyncRead, AsyncWrite, AsyncSeek, AsyncBufRead,
+ };
+
+ #[cfg(feature = "std")]
+ #[doc(no_inline)]
+ pub use crate::io::{
+ AsyncReadExt as _, AsyncWriteExt as _, AsyncSeekExt as _, AsyncBufReadExt as _,
+ };
+}
+
+pub mod sink {
+ //! Asynchronous sinks.
+ //!
+ //! This module contains:
+ //!
+ //! - The [`Sink` trait](crate::sink::Sink), which allows you to
+ //! asynchronously write data.
+ //! - The [`SinkExt`](crate::sink::SinkExt) trait, which provides adapters
+ //! for chaining and composing sinks.
+
+ pub use futures_sink::Sink;
+
+ pub use futures_util::sink::{
+ Close, Flush, Send, SendAll, SinkErrInto, SinkMapErr, With,
+ SinkExt, Fanout, Drain, drain,
+ WithFlatMap,
+ };
+
+ #[cfg(feature = "alloc")]
+ pub use futures_util::sink::Buffer;
+}
+
+pub mod stream {
+ //! Asynchronous streams.
+ //!
+ //! This module contains:
+ //!
+ //! - The [`Stream` trait](crate::stream::Stream), for objects that can
+ //! asynchronously produce a sequence of values.
+ //! - The [`StreamExt`](crate::stream::StreamExt) trait, which provides
+ //! adapters for chaining and composing streams.
+ //! - Top-level stream contructors like [`iter`](crate::stream::iter)
+ //! which creates a stream from an iterator.
+
+ pub use futures_core::stream::{
+ Stream, TryStream, FusedStream,
+ };
+
+ #[cfg(feature = "alloc")]
+ pub use futures_core::stream::{BoxStream, LocalBoxStream};
+
+ pub use futures_util::stream::{
+ iter, Iter,
+ repeat, Repeat,
+ empty, Empty,
+ pending, Pending,
+ once, Once,
+ poll_fn, PollFn,
+ select, Select,
+ unfold, Unfold,
+ try_unfold, TryUnfold,
+
+ StreamExt,
+ Chain, Collect, Concat, Enumerate, Filter, FilterMap, Flatten, Fold,
+ Forward, ForEach, Fuse, StreamFuture, Inspect, Map, Next,
+ SelectNextSome, Peek, Peekable, Scan, Skip, SkipWhile, Take, TakeWhile,
+ Then, Zip,
+
+ TryStreamExt,
+ AndThen, ErrInto, MapOk, MapErr, OrElse,
+ InspectOk, InspectErr,
+ TryNext, TryForEach, TryFilter, TryFilterMap, TryFlatten,
+ TryCollect, TryConcat, TryFold, TrySkipWhile,
+ IntoStream,
+ };
+
+ #[cfg(feature = "alloc")]
+ pub use futures_util::stream::{
+ // For StreamExt:
+ Chunks,
+ };
+
+ #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))]
+ #[cfg(feature = "alloc")]
+ pub use futures_util::stream::{
+ FuturesOrdered,
+ futures_unordered, FuturesUnordered,
+
+ // For StreamExt:
+ BufferUnordered, Buffered, ForEachConcurrent, SplitStream, SplitSink,
+ ReuniteError,
+
+ select_all, SelectAll,
+ };
+
+ #[cfg(feature = "std")]
+ pub use futures_util::stream::{
+ // For StreamExt:
+ CatchUnwind,
+ };
+
+ #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))]
+ #[cfg(feature = "alloc")]
+ pub use futures_util::stream::{
+ // For TryStreamExt:
+ TryBufferUnordered, TryForEachConcurrent,
+ };
+
+ #[cfg(feature = "std")]
+ pub use futures_util::stream::IntoAsyncRead;
+}
+
+pub mod task {
+ //! Tools for working with tasks.
+ //!
+ //! This module contains:
+ //!
+ //! - [`Spawn`](crate::task::Spawn), a trait for spawning new tasks.
+ //! - [`Context`](crate::task::Context), a context of an asynchronous task,
+ //! including a handle for waking up the task.
+ //! - [`Waker`](crate::task::Waker), a handle for waking up a task.
+ //!
+ //! The remaining types and traits in the module are used for implementing
+ //! executors or dealing with synchronization issues around task wakeup.
+
+ pub use futures_core::task::{Context, Poll, Waker, RawWaker, RawWakerVTable};
+
+ pub use futures_task::{
+ Spawn, LocalSpawn, SpawnError,
+ FutureObj, LocalFutureObj, UnsafeFutureObj,
+ };
+
+ pub use futures_util::task::noop_waker;
+
+ #[cfg(feature = "std")]
+ pub use futures_util::task::noop_waker_ref;
+
+ #[cfg(feature = "alloc")]
+ pub use futures_util::task::{SpawnExt, LocalSpawnExt};
+
+ #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))]
+ #[cfg(feature = "alloc")]
+ pub use futures_util::task::{waker, waker_ref, WakerRef, ArcWake};
+
+ #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))]
+ pub use futures_util::task::AtomicWaker;
+}
+
+pub mod never {
+ //! This module contains the `Never` type.
+ //!
+ //! Values of this type can never be created and will never exist.
+
+ pub use futures_util::never::Never;
+}
+
+// proc-macro re-export --------------------------------------
+
+// Not public API.
+#[doc(hidden)]
+pub use futures_core::core_reexport;
+
+// Not public API.
+#[cfg(feature = "async-await")]
+#[doc(hidden)]
+pub use futures_util::async_await;
+
+// Not public API.
+#[cfg(feature = "async-await")]
+#[doc(hidden)]
+pub mod inner_macro {
+ pub use futures_util::join;
+ pub use futures_util::try_join;
+ #[cfg(feature = "std")]
+ pub use futures_util::select;
+ pub use futures_util::select_biased;
+}
+
+#[cfg(feature = "async-await")]
+futures_util::document_join_macro! {
+ #[macro_export]
+ macro_rules! join { // replace `::futures_util` with `::futures` as the crate path
+ ($($tokens:tt)*) => {
+ $crate::inner_macro::join! {
+ futures_crate_path ( ::futures )
+ $( $tokens )*
+ }
+ }
+ }
+
+ #[macro_export]
+ macro_rules! try_join { // replace `::futures_util` with `::futures` as the crate path
+ ($($tokens:tt)*) => {
+ $crate::inner_macro::try_join! {
+ futures_crate_path ( ::futures )
+ $( $tokens )*
+ }
+ }
+ }
+}
+
+#[cfg(feature = "async-await")]
+futures_util::document_select_macro! {
+ #[cfg(feature = "std")]
+ #[macro_export]
+ macro_rules! select { // replace `::futures_util` with `::futures` as the crate path
+ ($($tokens:tt)*) => {
+ $crate::inner_macro::select! {
+ futures_crate_path ( ::futures )
+ $( $tokens )*
+ }
+ }
+ }
+
+ #[macro_export]
+ macro_rules! select_biased { // replace `::futures_util` with `::futures` as the crate path
+ ($($tokens:tt)*) => {
+ $crate::inner_macro::select_biased! {
+ futures_crate_path ( ::futures )
+ $( $tokens )*
+ }
+ }
+ }
+}
diff --git a/tests/abortable.rs b/tests/abortable.rs
new file mode 100644
index 0000000..5925c9a
--- /dev/null
+++ b/tests/abortable.rs
@@ -0,0 +1,39 @@
+use futures::channel::oneshot;
+use futures::executor::block_on;
+use futures::future::{abortable, Aborted, FutureExt};
+use futures::task::{Context, Poll};
+use futures_test::task::new_count_waker;
+
+#[test]
+fn abortable_works() {
+ let (_tx, a_rx) = oneshot::channel::<()>();
+ let (abortable_rx, abort_handle) = abortable(a_rx);
+
+ abort_handle.abort();
+ assert_eq!(Err(Aborted), block_on(abortable_rx));
+}
+
+#[test]
+fn abortable_awakens() {
+ let (_tx, a_rx) = oneshot::channel::<()>();
+ let (mut abortable_rx, abort_handle) = abortable(a_rx);
+
+ let (waker, counter) = new_count_waker();
+ let mut cx = Context::from_waker(&waker);
+ assert_eq!(counter, 0);
+ assert_eq!(Poll::Pending, abortable_rx.poll_unpin(&mut cx));
+ assert_eq!(counter, 0);
+ abort_handle.abort();
+ assert_eq!(counter, 1);
+ assert_eq!(Poll::Ready(Err(Aborted)), abortable_rx.poll_unpin(&mut cx));
+}
+
+#[test]
+fn abortable_resolves() {
+ let (tx, a_rx) = oneshot::channel::<()>();
+ let (abortable_rx, _abort_handle) = abortable(a_rx);
+
+ tx.send(()).unwrap();
+
+ assert_eq!(Ok(Ok(())), block_on(abortable_rx));
+}
diff --git a/tests/arc_wake.rs b/tests/arc_wake.rs
new file mode 100644
index 0000000..1940e4f
--- /dev/null
+++ b/tests/arc_wake.rs
@@ -0,0 +1,77 @@
+use futures::task::{self, ArcWake, Waker};
+use std::sync::{Arc, Mutex};
+
+struct CountingWaker {
+ nr_wake: Mutex<i32>,
+}
+
+impl CountingWaker {
+ fn new() -> CountingWaker {
+ CountingWaker {
+ nr_wake: Mutex::new(0),
+ }
+ }
+
+ fn wakes(&self) -> i32 {
+ *self.nr_wake.lock().unwrap()
+ }
+}
+
+impl ArcWake for CountingWaker {
+ fn wake_by_ref(arc_self: &Arc<Self>) {
+ let mut lock = arc_self.nr_wake.lock().unwrap();
+ *lock += 1;
+ }
+}
+
+#[test]
+fn create_waker_from_arc() {
+ let some_w = Arc::new(CountingWaker::new());
+
+ let w1: Waker = task::waker(some_w.clone());
+ assert_eq!(2, Arc::strong_count(&some_w));
+ w1.wake_by_ref();
+ assert_eq!(1, some_w.wakes());
+
+ let w2 = w1.clone();
+ assert_eq!(3, Arc::strong_count(&some_w));
+
+ w2.wake_by_ref();
+ assert_eq!(2, some_w.wakes());
+
+ drop(w2);
+ assert_eq!(2, Arc::strong_count(&some_w));
+ drop(w1);
+ assert_eq!(1, Arc::strong_count(&some_w));
+}
+
+struct PanicWaker;
+
+impl ArcWake for PanicWaker {
+ fn wake_by_ref(_arc_self: &Arc<Self>) {
+ panic!("WAKE UP");
+ }
+}
+
+#[test]
+fn proper_refcount_on_wake_panic() {
+ let some_w = Arc::new(PanicWaker);
+
+ let w1: Waker = task::waker(some_w.clone());
+ assert_eq!("WAKE UP", *std::panic::catch_unwind(|| w1.wake_by_ref()).unwrap_err().downcast::<&str>().unwrap());
+ assert_eq!(2, Arc::strong_count(&some_w)); // some_w + w1
+ drop(w1);
+ assert_eq!(1, Arc::strong_count(&some_w)); // some_w
+}
+
+#[test]
+fn waker_ref_wake_same() {
+ let some_w = Arc::new(CountingWaker::new());
+
+ let w1: Waker = task::waker(some_w.clone());
+ let w2 = task::waker_ref(&some_w);
+ let w3 = w2.clone();
+
+ assert!(w1.will_wake(&w2));
+ assert!(w2.will_wake(&w3));
+}
diff --git a/tests/async_await_macros.rs b/tests/async_await_macros.rs
new file mode 100644
index 0000000..bc717df
--- /dev/null
+++ b/tests/async_await_macros.rs
@@ -0,0 +1,360 @@
+#![recursion_limit="128"]
+
+use futures::{pending, pin_mut, poll, join, try_join, select};
+use futures::channel::{mpsc, oneshot};
+use futures::executor::block_on;
+use futures::future::{self, FutureExt, poll_fn};
+use futures::sink::SinkExt;
+use futures::stream::StreamExt;
+use futures::task::{Context, Poll};
+
+#[test]
+fn poll_and_pending() {
+ let pending_once = async { pending!() };
+ block_on(async {
+ pin_mut!(pending_once);
+ assert_eq!(Poll::Pending, poll!(&mut pending_once));
+ assert_eq!(Poll::Ready(()), poll!(&mut pending_once));
+ });
+}
+
+#[test]
+fn join() {
+ let (tx1, rx1) = oneshot::channel::<i32>();
+ let (tx2, rx2) = oneshot::channel::<i32>();
+
+ let fut = async {
+ let res = join!(rx1, rx2);
+ assert_eq!((Ok(1), Ok(2)), res);
+ };
+
+ block_on(async {
+ pin_mut!(fut);
+ assert_eq!(Poll::Pending, poll!(&mut fut));
+ tx1.send(1).unwrap();
+ assert_eq!(Poll::Pending, poll!(&mut fut));
+ tx2.send(2).unwrap();
+ assert_eq!(Poll::Ready(()), poll!(&mut fut));
+ });
+}
+
+#[test]
+fn select() {
+ let (tx1, rx1) = oneshot::channel::<i32>();
+ let (_tx2, rx2) = oneshot::channel::<i32>();
+ tx1.send(1).unwrap();
+ let mut ran = false;
+ block_on(async {
+ select! {
+ res = rx1.fuse() => {
+ assert_eq!(Ok(1), res);
+ ran = true;
+ },
+ _ = rx2.fuse() => unreachable!(),
+ }
+ });
+ assert!(ran);
+}
+
+#[test]
+fn select_biased() {
+ use futures::select_biased;
+
+ let (tx1, rx1) = oneshot::channel::<i32>();
+ let (_tx2, rx2) = oneshot::channel::<i32>();
+ tx1.send(1).unwrap();
+ let mut ran = false;
+ block_on(async {
+ select_biased! {
+ res = rx1.fuse() => {
+ assert_eq!(Ok(1), res);
+ ran = true;
+ },
+ _ = rx2.fuse() => unreachable!(),
+ }
+ });
+ assert!(ran);
+}
+
+#[test]
+fn select_streams() {
+ let (mut tx1, rx1) = mpsc::channel::<i32>(1);
+ let (mut tx2, rx2) = mpsc::channel::<i32>(1);
+ let mut rx1 = rx1.fuse();
+ let mut rx2 = rx2.fuse();
+ let mut ran = false;
+ let mut total = 0;
+ block_on(async {
+ let mut tx1_opt;
+ let mut tx2_opt;
+ select! {
+ _ = rx1.next() => panic!(),
+ _ = rx2.next() => panic!(),
+ default => {
+ tx1.send(2).await.unwrap();
+ tx2.send(3).await.unwrap();
+ tx1_opt = Some(tx1);
+ tx2_opt = Some(tx2);
+ }
+ complete => panic!(),
+ }
+ loop {
+ select! {
+ // runs first and again after default
+ x = rx1.next() => if let Some(x) = x { total += x; },
+ // runs second and again after default
+ x = rx2.next() => if let Some(x) = x { total += x; },
+ // runs third
+ default => {
+ assert_eq!(total, 5);
+ ran = true;
+ drop(tx1_opt.take().unwrap());
+ drop(tx2_opt.take().unwrap());
+ },
+ // runs last
+ complete => break,
+ };
+ }
+ });
+ assert!(ran);
+}
+
+#[test]
+fn select_can_move_uncompleted_futures() {
+ let (tx1, rx1) = oneshot::channel::<i32>();
+ let (tx2, rx2) = oneshot::channel::<i32>();
+ tx1.send(1).unwrap();
+ tx2.send(2).unwrap();
+ let mut ran = false;
+ let mut rx1 = rx1.fuse();
+ let mut rx2 = rx2.fuse();
+ block_on(async {
+ select! {
+ res = rx1 => {
+ assert_eq!(Ok(1), res);
+ assert_eq!(Ok(2), rx2.await);
+ ran = true;
+ },
+ res = rx2 => {
+ assert_eq!(Ok(2), res);
+ assert_eq!(Ok(1), rx1.await);
+ ran = true;
+ },
+ }
+ });
+ assert!(ran);
+}
+
+#[test]
+fn select_nested() {
+ let mut outer_fut = future::ready(1);
+ let mut inner_fut = future::ready(2);
+ let res = block_on(async {
+ select! {
+ x = outer_fut => {
+ select! {
+ y = inner_fut => x + y,
+ }
+ }
+ }
+ });
+ assert_eq!(res, 3);
+}
+
+#[test]
+fn select_size() {
+ let fut = async {
+ let mut ready = future::ready(0i32);
+ select! {
+ _ = ready => {},
+ }
+ };
+ assert_eq!(::std::mem::size_of_val(&fut), 24);
+
+ let fut = async {
+ let mut ready1 = future::ready(0i32);
+ let mut ready2 = future::ready(0i32);
+ select! {
+ _ = ready1 => {},
+ _ = ready2 => {},
+ }
+ };
+ assert_eq!(::std::mem::size_of_val(&fut), 40);
+}
+
+#[test]
+fn select_on_non_unpin_expressions() {
+ // The returned Future is !Unpin
+ let make_non_unpin_fut = || { async {
+ 5
+ }};
+
+ let res = block_on(async {
+ let select_res;
+ select! {
+ value_1 = make_non_unpin_fut().fuse() => { select_res = value_1 },
+ value_2 = make_non_unpin_fut().fuse() => { select_res = value_2 },
+ };
+ select_res
+ });
+ assert_eq!(res, 5);
+}
+
+#[test]
+fn select_on_non_unpin_expressions_with_default() {
+ // The returned Future is !Unpin
+ let make_non_unpin_fut = || { async {
+ 5
+ }};
+
+ let res = block_on(async {
+ let select_res;
+ select! {
+ value_1 = make_non_unpin_fut().fuse() => { select_res = value_1 },
+ value_2 = make_non_unpin_fut().fuse() => { select_res = value_2 },
+ default => { select_res = 7 },
+ };
+ select_res
+ });
+ assert_eq!(res, 5);
+}
+
+#[test]
+fn select_on_non_unpin_size() {
+ // The returned Future is !Unpin
+ let make_non_unpin_fut = || { async {
+ 5
+ }};
+
+ let fut = async {
+ let select_res;
+ select! {
+ value_1 = make_non_unpin_fut().fuse() => { select_res = value_1 },
+ value_2 = make_non_unpin_fut().fuse() => { select_res = value_2 },
+ };
+ select_res
+ };
+
+ assert_eq!(48, std::mem::size_of_val(&fut));
+}
+
+#[test]
+fn select_can_be_used_as_expression() {
+ block_on(async {
+ let res = select! {
+ x = future::ready(7) => { x },
+ y = future::ready(3) => { y + 1 },
+ };
+ assert!(res == 7 || res == 4);
+ });
+}
+
+#[test]
+fn select_with_default_can_be_used_as_expression() {
+ fn poll_always_pending<T>(_cx: &mut Context<'_>) -> Poll<T> {
+ Poll::Pending
+ }
+
+ block_on(async {
+ let res = select! {
+ x = poll_fn(poll_always_pending::<i32>).fuse() => x,
+ y = poll_fn(poll_always_pending::<i32>).fuse() => { y + 1 },
+ default => 99,
+ };
+ assert_eq!(res, 99);
+ });
+}
+
+#[test]
+fn select_with_complete_can_be_used_as_expression() {
+ block_on(async {
+ let res = select! {
+ x = future::pending::<i32>() => { x },
+ y = future::pending::<i32>() => { y + 1 },
+ default => 99,
+ complete => 237,
+ };
+ assert_eq!(res, 237);
+ });
+}
+
+async fn require_mutable(_: &mut i32) {}
+async fn async_noop() {}
+
+#[test]
+fn select_on_mutable_borrowing_future_with_same_borrow_in_block() {
+ block_on(async {
+ let mut value = 234;
+ select! {
+ x = require_mutable(&mut value).fuse() => { },
+ y = async_noop().fuse() => {
+ value += 5;
+ },
+ }
+ });
+}
+
+#[test]
+fn select_on_mutable_borrowing_future_with_same_borrow_in_block_and_default() {
+ block_on(async {
+ let mut value = 234;
+ select! {
+ x = require_mutable(&mut value).fuse() => { },
+ y = async_noop().fuse() => {
+ value += 5;
+ },
+ default => {
+ value += 27;
+ },
+ }
+ });
+}
+
+#[test]
+fn join_size() {
+ let fut = async {
+ let ready = future::ready(0i32);
+ join!(ready)
+ };
+ assert_eq!(::std::mem::size_of_val(&fut), 16);
+
+ let fut = async {
+ let ready1 = future::ready(0i32);
+ let ready2 = future::ready(0i32);
+ join!(ready1, ready2)
+ };
+ assert_eq!(::std::mem::size_of_val(&fut), 28);
+}
+
+#[test]
+fn try_join_size() {
+ let fut = async {
+ let ready = future::ready(Ok::<i32, i32>(0));
+ try_join!(ready)
+ };
+ assert_eq!(::std::mem::size_of_val(&fut), 16);
+
+ let fut = async {
+ let ready1 = future::ready(Ok::<i32, i32>(0));
+ let ready2 = future::ready(Ok::<i32, i32>(0));
+ try_join!(ready1, ready2)
+ };
+ assert_eq!(::std::mem::size_of_val(&fut), 28);
+}
+
+#[test]
+fn join_doesnt_require_unpin() {
+ let _ = async {
+ join!(async {}, async {})
+ };
+}
+
+#[test]
+fn try_join_doesnt_require_unpin() {
+ let _ = async {
+ try_join!(
+ async { Ok::<(), ()>(()) },
+ async { Ok::<(), ()>(()) },
+ )
+ };
+}
diff --git a/tests/atomic_waker.rs b/tests/atomic_waker.rs
new file mode 100644
index 0000000..d9ce753
--- /dev/null
+++ b/tests/atomic_waker.rs
@@ -0,0 +1,49 @@
+use std::sync::atomic::AtomicUsize;
+use std::sync::atomic::Ordering;
+use std::sync::Arc;
+use std::thread;
+
+use futures::executor::block_on;
+use futures::future::poll_fn;
+use futures::task::{AtomicWaker, Poll};
+
+#[test]
+fn basic() {
+ let atomic_waker = Arc::new(AtomicWaker::new());
+ let atomic_waker_copy = atomic_waker.clone();
+
+ let returned_pending = Arc::new(AtomicUsize::new(0));
+ let returned_pending_copy = returned_pending.clone();
+
+ let woken = Arc::new(AtomicUsize::new(0));
+ let woken_copy = woken.clone();
+
+ let t = thread::spawn(move || {
+ let mut pending_count = 0;
+
+ block_on(poll_fn(move |cx| {
+ if woken_copy.load(Ordering::Relaxed) == 1 {
+ Poll::Ready(())
+ } else {
+ // Assert we return pending exactly once
+ assert_eq!(0, pending_count);
+ pending_count += 1;
+ atomic_waker_copy.register(cx.waker());
+
+ returned_pending_copy.store(1, Ordering::Relaxed);
+
+ Poll::Pending
+ }
+ }))
+ });
+
+ while returned_pending.load(Ordering::Relaxed) == 0 {}
+
+ // give spawned thread some time to sleep in `block_on`
+ thread::yield_now();
+
+ woken.store(1, Ordering::Relaxed);
+ atomic_waker.wake();
+
+ t.join().unwrap();
+}
diff --git a/tests/basic_combinators.rs b/tests/basic_combinators.rs
new file mode 100644
index 0000000..fa65b6f
--- /dev/null
+++ b/tests/basic_combinators.rs
@@ -0,0 +1,98 @@
+use futures::future::{self, FutureExt, TryFutureExt};
+use futures_test::future::FutureTestExt;
+use std::sync::mpsc;
+
+#[test]
+fn basic_future_combinators() {
+ let (tx1, rx) = mpsc::channel();
+ let tx2 = tx1.clone();
+ let tx3 = tx1.clone();
+
+ let fut = future::ready(1)
+ .then(move |x| {
+ tx1.send(x).unwrap(); // Send 1
+ tx1.send(2).unwrap(); // Send 2
+ future::ready(3)
+ }).map(move |x| {
+ tx2.send(x).unwrap(); // Send 3
+ tx2.send(4).unwrap(); // Send 4
+ 5
+ }).map(move |x| {
+ tx3.send(x).unwrap(); // Send 5
+ });
+
+ assert!(rx.try_recv().is_err()); // Not started yet
+ fut.run_in_background(); // Start it
+ for i in 1..=5 { assert_eq!(rx.recv(), Ok(i)); } // Check it
+ assert!(rx.recv().is_err()); // Should be done
+}
+
+#[test]
+fn basic_try_future_combinators() {
+ let (tx1, rx) = mpsc::channel();
+ let tx2 = tx1.clone();
+ let tx3 = tx1.clone();
+ let tx4 = tx1.clone();
+ let tx5 = tx1.clone();
+ let tx6 = tx1.clone();
+ let tx7 = tx1.clone();
+ let tx8 = tx1.clone();
+ let tx9 = tx1.clone();
+ let tx10 = tx1.clone();
+
+ let fut = future::ready(Ok(1))
+ .and_then(move |x: i32| {
+ tx1.send(x).unwrap(); // Send 1
+ tx1.send(2).unwrap(); // Send 2
+ future::ready(Ok(3))
+ })
+ .or_else(move |x: i32| {
+ tx2.send(x).unwrap(); // Should not run
+ tx2.send(-1).unwrap();
+ future::ready(Ok(-1))
+ })
+ .map_ok(move |x: i32| {
+ tx3.send(x).unwrap(); // Send 3
+ tx3.send(4).unwrap(); // Send 4
+ 5
+ })
+ .map_err(move |x: i32| {
+ tx4.send(x).unwrap(); // Should not run
+ tx4.send(-1).unwrap();
+ -1
+ })
+ .map(move |x: Result<i32, i32>| {
+ tx5.send(x.unwrap()).unwrap(); // Send 5
+ tx5.send(6).unwrap(); // Send 6
+ Err(7) // Now return errors!
+ })
+ .and_then(move |x: i32| {
+ tx6.send(x).unwrap(); // Should not run
+ tx6.send(-1).unwrap();
+ future::ready(Err(-1))
+ })
+ .or_else(move |x: i32| {
+ tx7.send(x).unwrap(); // Send 7
+ tx7.send(8).unwrap(); // Send 8
+ future::ready(Err(9))
+ })
+ .map_ok(move |x: i32| {
+ tx8.send(x).unwrap(); // Should not run
+ tx8.send(-1).unwrap();
+ -1
+ })
+ .map_err(move |x: i32| {
+ tx9.send(x).unwrap(); // Send 9
+ tx9.send(10).unwrap(); // Send 10
+ 11
+ })
+ .map(move |x: Result<i32, i32>| {
+ tx10.send(x.err().unwrap()).unwrap(); // Send 11
+ tx10.send(12).unwrap(); // Send 12
+ });
+
+ assert!(rx.try_recv().is_err()); // Not started yet
+ fut.run_in_background(); // Start it
+ for i in 1..=12 { assert_eq!(rx.recv(), Ok(i)); } // Check it
+ assert!(rx.recv().is_err()); // Should be done
+}
diff --git a/tests/buffer_unordered.rs b/tests/buffer_unordered.rs
new file mode 100644
index 0000000..1c559c8
--- /dev/null
+++ b/tests/buffer_unordered.rs
@@ -0,0 +1,73 @@
+use futures::channel::{oneshot, mpsc};
+use futures::executor::{block_on, block_on_stream};
+use futures::sink::SinkExt;
+use futures::stream::StreamExt;
+use std::sync::mpsc as std_mpsc;
+use std::thread;
+
+#[test]
+#[ignore] // FIXME: https://github.com/rust-lang/futures-rs/issues/1790
+fn works() {
+ const N: usize = 4;
+
+ let (mut tx, rx) = mpsc::channel(1);
+
+ let (tx2, rx2) = std_mpsc::channel();
+ let (tx3, rx3) = std_mpsc::channel();
+ let t1 = thread::spawn(move || {
+ for _ in 0..=N {
+ let (mytx, myrx) = oneshot::channel();
+ block_on(tx.send(myrx)).unwrap();
+ tx3.send(mytx).unwrap();
+ }
+ rx2.recv().unwrap();
+ for _ in 0..N {
+ let (mytx, myrx) = oneshot::channel();
+ block_on(tx.send(myrx)).unwrap();
+ tx3.send(mytx).unwrap();
+ }
+ });
+
+ let (tx4, rx4) = std_mpsc::channel();
+ let t2 = thread::spawn(move || {
+ for item in block_on_stream(rx.buffer_unordered(N)) {
+ tx4.send(item.unwrap()).unwrap();
+ }
+ });
+
+ let o1 = rx3.recv().unwrap();
+ let o2 = rx3.recv().unwrap();
+ let o3 = rx3.recv().unwrap();
+ let o4 = rx3.recv().unwrap();
+ assert!(rx4.try_recv().is_err());
+
+ o1.send(1).unwrap();
+ assert_eq!(rx4.recv(), Ok(1));
+ o3.send(3).unwrap();
+ assert_eq!(rx4.recv(), Ok(3));
+ tx2.send(()).unwrap();
+ o2.send(2).unwrap();
+ assert_eq!(rx4.recv(), Ok(2));
+ o4.send(4).unwrap();
+ assert_eq!(rx4.recv(), Ok(4));
+
+ let o5 = rx3.recv().unwrap();
+ let o6 = rx3.recv().unwrap();
+ let o7 = rx3.recv().unwrap();
+ let o8 = rx3.recv().unwrap();
+ let o9 = rx3.recv().unwrap();
+
+ o5.send(5).unwrap();
+ assert_eq!(rx4.recv(), Ok(5));
+ o8.send(8).unwrap();
+ assert_eq!(rx4.recv(), Ok(8));
+ o9.send(9).unwrap();
+ assert_eq!(rx4.recv(), Ok(9));
+ o7.send(7).unwrap();
+ assert_eq!(rx4.recv(), Ok(7));
+ o6.send(6).unwrap();
+ assert_eq!(rx4.recv(), Ok(6));
+
+ t1.join().unwrap();
+ t2.join().unwrap();
+}
diff --git a/tests/compat.rs b/tests/compat.rs
new file mode 100644
index 0000000..39adc7c
--- /dev/null
+++ b/tests/compat.rs
@@ -0,0 +1,17 @@
+#![cfg(feature = "compat")]
+
+use tokio::timer::Delay;
+use tokio::runtime::Runtime;
+use std::time::Instant;
+use futures::prelude::*;
+use futures::compat::Future01CompatExt;
+
+#[test]
+fn can_use_01_futures_in_a_03_future_running_on_a_01_executor() {
+ let f = async {
+ Delay::new(Instant::now()).compat().await
+ };
+
+ let mut runtime = Runtime::new().unwrap();
+ runtime.block_on(f.boxed().compat()).unwrap();
+}
diff --git a/tests/eager_drop.rs b/tests/eager_drop.rs
new file mode 100644
index 0000000..674e401
--- /dev/null
+++ b/tests/eager_drop.rs
@@ -0,0 +1,117 @@
+use futures::channel::oneshot;
+use futures::future::{self, Future, FutureExt, TryFutureExt};
+use futures::task::{Context, Poll};
+use futures_test::future::FutureTestExt;
+use pin_utils::unsafe_pinned;
+use std::pin::Pin;
+use std::sync::mpsc;
+
+#[test]
+fn map_ok() {
+ // The closure given to `map_ok` should have been dropped by the time `map`
+ // runs.
+ let (tx1, rx1) = mpsc::channel::<()>();
+ let (tx2, rx2) = mpsc::channel::<()>();
+
+ future::ready::<Result<i32, i32>>(Err(1))
+ .map_ok(move |_| { let _tx1 = tx1; panic!("should not run"); })
+ .map(move |_| {
+ assert!(rx1.recv().is_err());
+ tx2.send(()).unwrap()
+ })
+ .run_in_background();
+
+ rx2.recv().unwrap();
+}
+
+#[test]
+fn map_err() {
+ // The closure given to `map_err` should have been dropped by the time `map`
+ // runs.
+ let (tx1, rx1) = mpsc::channel::<()>();
+ let (tx2, rx2) = mpsc::channel::<()>();
+
+ future::ready::<Result<i32, i32>>(Ok(1))
+ .map_err(move |_| { let _tx1 = tx1; panic!("should not run"); })
+ .map(move |_| {
+ assert!(rx1.recv().is_err());
+ tx2.send(()).unwrap()
+ })
+ .run_in_background();
+
+ rx2.recv().unwrap();
+}
+
+struct FutureData<F, T> {
+ _data: T,
+ future: F,
+}
+
+impl<F, T> FutureData<F, T> {
+ unsafe_pinned!(future: F);
+}
+
+impl<F: Future, T: Send + 'static> Future for FutureData<F, T> {
+ type Output = F::Output;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F::Output> {
+ self.future().poll(cx)
+ }
+}
+
+#[test]
+fn then_drops_eagerly() {
+ let (tx0, rx0) = oneshot::channel::<()>();
+ let (tx1, rx1) = mpsc::channel::<()>();
+ let (tx2, rx2) = mpsc::channel::<()>();
+
+ FutureData { _data: tx1, future: rx0.unwrap_or_else(|_| { panic!() }) }
+ .then(move |_| {
+ assert!(rx1.recv().is_err()); // tx1 should have been dropped
+ tx2.send(()).unwrap();
+ future::ready(())
+ })
+ .run_in_background();
+
+ assert_eq!(Err(mpsc::TryRecvError::Empty), rx2.try_recv());
+ tx0.send(()).unwrap();
+ rx2.recv().unwrap();
+}
+
+#[test]
+fn and_then_drops_eagerly() {
+ let (tx0, rx0) = oneshot::channel::<Result<(), ()>>();
+ let (tx1, rx1) = mpsc::channel::<()>();
+ let (tx2, rx2) = mpsc::channel::<()>();
+
+ FutureData { _data: tx1, future: rx0.unwrap_or_else(|_| { panic!() }) }
+ .and_then(move |_| {
+ assert!(rx1.recv().is_err()); // tx1 should have been dropped
+ tx2.send(()).unwrap();
+ future::ready(Ok(()))
+ })
+ .run_in_background();
+
+ assert_eq!(Err(mpsc::TryRecvError::Empty), rx2.try_recv());
+ tx0.send(Ok(())).unwrap();
+ rx2.recv().unwrap();
+}
+
+#[test]
+fn or_else_drops_eagerly() {
+ let (tx0, rx0) = oneshot::channel::<Result<(), ()>>();
+ let (tx1, rx1) = mpsc::channel::<()>();
+ let (tx2, rx2) = mpsc::channel::<()>();
+
+ FutureData { _data: tx1, future: rx0.unwrap_or_else(|_| { panic!() }) }
+ .or_else(move |_| {
+ assert!(rx1.recv().is_err()); // tx1 should have been dropped
+ tx2.send(()).unwrap();
+ future::ready::<Result<(), ()>>(Ok(()))
+ })
+ .run_in_background();
+
+ assert_eq!(Err(mpsc::TryRecvError::Empty), rx2.try_recv());
+ tx0.send(Err(())).unwrap();
+ rx2.recv().unwrap();
+}
diff --git a/tests/eventual.rs b/tests/eventual.rs
new file mode 100644
index 0000000..bff000d
--- /dev/null
+++ b/tests/eventual.rs
@@ -0,0 +1,159 @@
+use futures::channel::oneshot;
+use futures::executor::ThreadPool;
+use futures::future::{self, ok, Future, FutureExt, TryFutureExt};
+use futures::task::SpawnExt;
+use std::sync::mpsc;
+use std::thread;
+
+fn run<F: Future + Send + 'static>(future: F) {
+ let tp = ThreadPool::new().unwrap();
+ tp.spawn(future.map(drop)).unwrap();
+}
+
+#[test]
+fn join1() {
+ let (tx, rx) = mpsc::channel();
+ run(future::try_join(ok::<i32, i32>(1), ok(2)).map_ok(move |v| tx.send(v).unwrap()));
+ assert_eq!(rx.recv(), Ok((1, 2)));
+ assert!(rx.recv().is_err());
+}
+
+#[test]
+fn join2() {
+ let (c1, p1) = oneshot::channel::<i32>();
+ let (c2, p2) = oneshot::channel::<i32>();
+ let (tx, rx) = mpsc::channel();
+ run(future::try_join(p1, p2).map_ok(move |v| tx.send(v).unwrap()));
+ assert!(rx.try_recv().is_err());
+ c1.send(1).unwrap();
+ assert!(rx.try_recv().is_err());
+ c2.send(2).unwrap();
+ assert_eq!(rx.recv(), Ok((1, 2)));
+ assert!(rx.recv().is_err());
+}
+
+#[test]
+fn join3() {
+ let (c1, p1) = oneshot::channel::<i32>();
+ let (c2, p2) = oneshot::channel::<i32>();
+ let (tx, rx) = mpsc::channel();
+ run(future::try_join(p1, p2).map_err(move |_v| tx.send(1).unwrap()));
+ assert!(rx.try_recv().is_err());
+ drop(c1);
+ assert_eq!(rx.recv(), Ok(1));
+ assert!(rx.recv().is_err());
+ drop(c2);
+}
+
+#[test]
+fn join4() {
+ let (c1, p1) = oneshot::channel::<i32>();
+ let (c2, p2) = oneshot::channel::<i32>();
+ let (tx, rx) = mpsc::channel();
+ run(future::try_join(p1, p2).map_err(move |v| tx.send(v).unwrap()));
+ assert!(rx.try_recv().is_err());
+ drop(c1);
+ assert!(rx.recv().is_ok());
+ drop(c2);
+ assert!(rx.recv().is_err());
+}
+
+#[test]
+fn join5() {
+ let (c1, p1) = oneshot::channel::<i32>();
+ let (c2, p2) = oneshot::channel::<i32>();
+ let (c3, p3) = oneshot::channel::<i32>();
+ let (tx, rx) = mpsc::channel();
+ run(future::try_join(future::try_join(p1, p2), p3).map_ok(move |v| tx.send(v).unwrap()));
+ assert!(rx.try_recv().is_err());
+ c1.send(1).unwrap();
+ assert!(rx.try_recv().is_err());
+ c2.send(2).unwrap();
+ assert!(rx.try_recv().is_err());
+ c3.send(3).unwrap();
+ assert_eq!(rx.recv(), Ok(((1, 2), 3)));
+ assert!(rx.recv().is_err());
+}
+
+#[test]
+fn select1() {
+ let (c1, p1) = oneshot::channel::<i32>();
+ let (c2, p2) = oneshot::channel::<i32>();
+ let (tx, rx) = mpsc::channel();
+ run(future::try_select(p1, p2).map_ok(move |v| tx.send(v).unwrap()));
+ assert!(rx.try_recv().is_err());
+ c1.send(1).unwrap();
+ let (v, p2) = rx.recv().unwrap().into_inner();
+ assert_eq!(v, 1);
+ assert!(rx.recv().is_err());
+
+ let (tx, rx) = mpsc::channel();
+ run(p2.map_ok(move |v| tx.send(v).unwrap()));
+ c2.send(2).unwrap();
+ assert_eq!(rx.recv(), Ok(2));
+ assert!(rx.recv().is_err());
+}
+
+#[test]
+fn select2() {
+ let (c1, p1) = oneshot::channel::<i32>();
+ let (c2, p2) = oneshot::channel::<i32>();
+ let (tx, rx) = mpsc::channel();
+ run(future::try_select(p1, p2).map_err(move |v| tx.send((1, v.into_inner().1)).unwrap()));
+ assert!(rx.try_recv().is_err());
+ drop(c1);
+ let (v, p2) = rx.recv().unwrap();
+ assert_eq!(v, 1);
+ assert!(rx.recv().is_err());
+
+ let (tx, rx) = mpsc::channel();
+ run(p2.map_ok(move |v| tx.send(v).unwrap()));
+ c2.send(2).unwrap();
+ assert_eq!(rx.recv(), Ok(2));
+ assert!(rx.recv().is_err());
+}
+
+#[test]
+fn select3() {
+ let (c1, p1) = oneshot::channel::<i32>();
+ let (c2, p2) = oneshot::channel::<i32>();
+ let (tx, rx) = mpsc::channel();
+ run(future::try_select(p1, p2).map_err(move |v| tx.send((1, v.into_inner().1)).unwrap()));
+ assert!(rx.try_recv().is_err());
+ drop(c1);
+ let (v, p2) = rx.recv().unwrap();
+ assert_eq!(v, 1);
+ assert!(rx.recv().is_err());
+
+ let (tx, rx) = mpsc::channel();
+ run(p2.map_err(move |_v| tx.send(2).unwrap()));
+ drop(c2);
+ assert_eq!(rx.recv(), Ok(2));
+ assert!(rx.recv().is_err());
+}
+
+#[test]
+fn select4() {
+ let (tx, rx) = mpsc::channel::<oneshot::Sender<i32>>();
+
+ let t = thread::spawn(move || {
+ for c in rx {
+ c.send(1).unwrap();
+ }
+ });
+
+ let (tx2, rx2) = mpsc::channel();
+ for _ in 0..10000 {
+ let (c1, p1) = oneshot::channel::<i32>();
+ let (c2, p2) = oneshot::channel::<i32>();
+
+ let tx3 = tx2.clone();
+ run(future::try_select(p1, p2).map_ok(move |_| tx3.send(()).unwrap()));
+ tx.send(c1).unwrap();
+ rx2.recv().unwrap();
+ drop(c2);
+ }
+ drop(tx);
+
+ t.join().unwrap();
+}
diff --git a/tests/fuse.rs b/tests/fuse.rs
new file mode 100644
index 0000000..83f2c1c
--- /dev/null
+++ b/tests/fuse.rs
@@ -0,0 +1,12 @@
+use futures::future::{self, FutureExt};
+use futures::task::Context;
+use futures_test::task::panic_waker;
+
+#[test]
+fn fuse() {
+ let mut future = future::ready::<i32>(2).fuse();
+ let waker = panic_waker();
+ let mut cx = Context::from_waker(&waker);
+ assert!(future.poll_unpin(&mut cx).is_ready());
+ assert!(future.poll_unpin(&mut cx).is_pending());
+}
diff --git a/tests/future_obj.rs b/tests/future_obj.rs
new file mode 100644
index 0000000..c6b18fc
--- /dev/null
+++ b/tests/future_obj.rs
@@ -0,0 +1,33 @@
+use futures::future::{Future, FutureObj, FutureExt};
+use std::pin::Pin;
+use futures::task::{Context, Poll};
+
+#[test]
+fn dropping_does_not_segfault() {
+ FutureObj::new(async { String::new() }.boxed());
+}
+
+#[test]
+fn dropping_drops_the_future() {
+ let mut times_dropped = 0;
+
+ struct Inc<'a>(&'a mut u32);
+
+ impl Future for Inc<'_> {
+ type Output = ();
+
+ fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<()> {
+ unimplemented!()
+ }
+ }
+
+ impl Drop for Inc<'_> {
+ fn drop(&mut self) {
+ *self.0 += 1;
+ }
+ }
+
+ FutureObj::new(Inc(&mut times_dropped).boxed());
+
+ assert_eq!(times_dropped, 1);
+}
diff --git a/tests/future_try_flatten_stream.rs b/tests/future_try_flatten_stream.rs
new file mode 100644
index 0000000..082c5ef
--- /dev/null
+++ b/tests/future_try_flatten_stream.rs
@@ -0,0 +1,82 @@
+use core::marker::PhantomData;
+use core::pin::Pin;
+use futures::executor::block_on_stream;
+use futures::future::{ok, err, TryFutureExt};
+use futures::sink::Sink;
+use futures::stream::{self, Stream, StreamExt};
+use futures::task::{Context, Poll};
+
+#[test]
+fn successful_future() {
+ let stream_items = vec![17, 19];
+ let future_of_a_stream = ok::<_, bool>(stream::iter(stream_items).map(Ok));
+
+ let stream = future_of_a_stream.try_flatten_stream();
+
+ let mut iter = block_on_stream(stream);
+ assert_eq!(Ok(17), iter.next().unwrap());
+ assert_eq!(Ok(19), iter.next().unwrap());
+ assert_eq!(None, iter.next());
+}
+
+struct PanickingStream<T, E> {
+ _marker: PhantomData<(T, E)>
+}
+
+impl<T, E> Stream for PanickingStream<T, E> {
+ type Item = Result<T, E>;
+
+ fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ panic!()
+ }
+}
+
+#[test]
+fn failed_future() {
+ let future_of_a_stream = err::<PanickingStream<bool, u32>, _>(10);
+ let stream = future_of_a_stream.try_flatten_stream();
+ let mut iter = block_on_stream(stream);
+ assert_eq!(Err(10), iter.next().unwrap());
+ assert_eq!(None, iter.next());
+}
+
+struct StreamSink<T, E, Item>(PhantomData<(T, E, Item)>);
+
+impl<T, E, Item> Stream for StreamSink<T, E, Item> {
+ type Item = Result<T, E>;
+ fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ panic!()
+ }
+}
+
+impl<T, E, Item> Sink<Item> for StreamSink<T, E, Item> {
+ type Error = E;
+ fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ panic!()
+ }
+ fn start_send(self: Pin<&mut Self>, _: Item) -> Result<(), Self::Error> {
+ panic!()
+ }
+ fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ panic!()
+ }
+ fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ panic!()
+ }
+}
+
+fn assert_stream<S: Stream>(_: &S) {}
+fn assert_sink<S: Sink<Item>, Item>(_: &S) {}
+fn assert_stream_sink<S: Stream + Sink<Item>, Item>(_: &S) {}
+
+#[test]
+fn assert_impls() {
+ let s = ok(StreamSink::<(), (), ()>(PhantomData)).try_flatten_stream();
+ assert_stream(&s);
+ assert_sink(&s);
+ assert_stream_sink(&s);
+ let s = ok(StreamSink::<(), (), ()>(PhantomData)).flatten_sink();
+ assert_stream(&s);
+ assert_sink(&s);
+ assert_stream_sink(&s);
+}
diff --git a/tests/futures_ordered.rs b/tests/futures_ordered.rs
new file mode 100644
index 0000000..d06b62f
--- /dev/null
+++ b/tests/futures_ordered.rs
@@ -0,0 +1,83 @@
+use futures::channel::oneshot;
+use futures::executor::{block_on, block_on_stream};
+use futures::future::{self, join, Future, FutureExt, TryFutureExt};
+use futures::stream::{StreamExt, FuturesOrdered};
+use futures_test::task::noop_context;
+use std::any::Any;
+
+#[test]
+fn works_1() {
+ let (a_tx, a_rx) = oneshot::channel::<i32>();
+ let (b_tx, b_rx) = oneshot::channel::<i32>();
+ let (c_tx, c_rx) = oneshot::channel::<i32>();
+
+ let mut stream = vec![a_rx, b_rx, c_rx].into_iter().collect::<FuturesOrdered<_>>();
+
+ b_tx.send(99).unwrap();
+ assert!(stream.poll_next_unpin(&mut noop_context()).is_pending());
+
+ a_tx.send(33).unwrap();
+ c_tx.send(33).unwrap();
+
+ let mut iter = block_on_stream(stream);
+ assert_eq!(Some(Ok(33)), iter.next());
+ assert_eq!(Some(Ok(99)), iter.next());
+ assert_eq!(Some(Ok(33)), iter.next());
+ assert_eq!(None, iter.next());
+}
+
+#[test]
+fn works_2() {
+ let (a_tx, a_rx) = oneshot::channel::<i32>();
+ let (b_tx, b_rx) = oneshot::channel::<i32>();
+ let (c_tx, c_rx) = oneshot::channel::<i32>();
+
+ let mut stream = vec![
+ a_rx.boxed(),
+ join(b_rx, c_rx).map(|(a, b)| Ok(a? + b?)).boxed(),
+ ].into_iter().collect::<FuturesOrdered<_>>();
+
+ let mut cx = noop_context();
+ a_tx.send(33).unwrap();
+ b_tx.send(33).unwrap();
+ assert!(stream.poll_next_unpin(&mut cx).is_ready());
+ assert!(stream.poll_next_unpin(&mut cx).is_pending());
+ c_tx.send(33).unwrap();
+ assert!(stream.poll_next_unpin(&mut cx).is_ready());
+}
+
+#[test]
+fn from_iterator() {
+ let stream = vec![
+ future::ready::<i32>(1),
+ future::ready::<i32>(2),
+ future::ready::<i32>(3)
+ ].into_iter().collect::<FuturesOrdered<_>>();
+ assert_eq!(stream.len(), 3);
+ assert_eq!(block_on(stream.collect::<Vec<_>>()), vec![1,2,3]);
+}
+
+#[test]
+fn queue_never_unblocked() {
+ let (_a_tx, a_rx) = oneshot::channel::<Box<dyn Any + Send>>();
+ let (b_tx, b_rx) = oneshot::channel::<Box<dyn Any + Send>>();
+ let (c_tx, c_rx) = oneshot::channel::<Box<dyn Any + Send>>();
+
+ let mut stream = vec![
+ Box::new(a_rx) as Box<dyn Future<Output = _> + Unpin>,
+ Box::new(future::try_select(b_rx, c_rx)
+ .map_err(|e| e.factor_first().0)
+ .and_then(|e| future::ok(Box::new(e) as Box<dyn Any + Send>))) as _,
+ ].into_iter().collect::<FuturesOrdered<_>>();
+
+ let cx = &mut noop_context();
+ for _ in 0..10 {
+ assert!(stream.poll_next_unpin(cx).is_pending());
+ }
+
+ b_tx.send(Box::new(())).unwrap();
+ assert!(stream.poll_next_unpin(cx).is_pending());
+ c_tx.send(Box::new(())).unwrap();
+ assert!(stream.poll_next_unpin(cx).is_pending());
+ assert!(stream.poll_next_unpin(cx).is_pending());
+}
diff --git a/tests/futures_unordered.rs b/tests/futures_unordered.rs
new file mode 100644
index 0000000..57eb98f
--- /dev/null
+++ b/tests/futures_unordered.rs
@@ -0,0 +1,287 @@
+use std::marker::Unpin;
+use std::pin::Pin;
+use std::sync::atomic::{AtomicBool, Ordering};
+
+use futures::channel::oneshot;
+use futures::executor::{block_on, block_on_stream};
+use futures::future::{self, join, Future, FutureExt};
+use futures::stream::{FusedStream, FuturesUnordered, StreamExt};
+use futures::task::{Context, Poll};
+use futures_test::future::FutureTestExt;
+use futures_test::task::noop_context;
+use futures_test::{assert_stream_done, assert_stream_next};
+
+#[test]
+fn is_terminated() {
+ let mut cx = noop_context();
+ let mut tasks = FuturesUnordered::new();
+
+ assert_eq!(tasks.is_terminated(), false);
+ assert_eq!(tasks.poll_next_unpin(&mut cx), Poll::Ready(None));
+ assert_eq!(tasks.is_terminated(), true);
+
+ // Test that the sentinel value doesn't leak
+ assert_eq!(tasks.is_empty(), true);
+ assert_eq!(tasks.len(), 0);
+ assert_eq!(tasks.iter_mut().len(), 0);
+
+ tasks.push(future::ready(1));
+
+ assert_eq!(tasks.is_empty(), false);
+ assert_eq!(tasks.len(), 1);
+ assert_eq!(tasks.iter_mut().len(), 1);
+
+ assert_eq!(tasks.is_terminated(), false);
+ assert_eq!(tasks.poll_next_unpin(&mut cx), Poll::Ready(Some(1)));
+ assert_eq!(tasks.is_terminated(), false);
+ assert_eq!(tasks.poll_next_unpin(&mut cx), Poll::Ready(None));
+ assert_eq!(tasks.is_terminated(), true);
+}
+
+#[test]
+fn works_1() {
+ let (a_tx, a_rx) = oneshot::channel::<i32>();
+ let (b_tx, b_rx) = oneshot::channel::<i32>();
+ let (c_tx, c_rx) = oneshot::channel::<i32>();
+
+ let mut iter = block_on_stream(
+ vec![a_rx, b_rx, c_rx]
+ .into_iter()
+ .collect::<FuturesUnordered<_>>(),
+ );
+
+ b_tx.send(99).unwrap();
+ assert_eq!(Some(Ok(99)), iter.next());
+
+ a_tx.send(33).unwrap();
+ c_tx.send(33).unwrap();
+ assert_eq!(Some(Ok(33)), iter.next());
+ assert_eq!(Some(Ok(33)), iter.next());
+ assert_eq!(None, iter.next());
+}
+
+#[test]
+fn works_2() {
+ let (a_tx, a_rx) = oneshot::channel::<i32>();
+ let (b_tx, b_rx) = oneshot::channel::<i32>();
+ let (c_tx, c_rx) = oneshot::channel::<i32>();
+
+ let mut stream = vec![
+ a_rx.boxed(),
+ join(b_rx, c_rx).map(|(a, b)| Ok(a? + b?)).boxed(),
+ ]
+ .into_iter()
+ .collect::<FuturesUnordered<_>>();
+
+ a_tx.send(9).unwrap();
+ b_tx.send(10).unwrap();
+
+ let mut cx = noop_context();
+ assert_eq!(stream.poll_next_unpin(&mut cx), Poll::Ready(Some(Ok(9))));
+ c_tx.send(20).unwrap();
+ assert_eq!(stream.poll_next_unpin(&mut cx), Poll::Ready(Some(Ok(30))));
+ assert_eq!(stream.poll_next_unpin(&mut cx), Poll::Ready(None));
+}
+
+#[test]
+fn from_iterator() {
+ let stream = vec![
+ future::ready::<i32>(1),
+ future::ready::<i32>(2),
+ future::ready::<i32>(3),
+ ]
+ .into_iter()
+ .collect::<FuturesUnordered<_>>();
+ assert_eq!(stream.len(), 3);
+ assert_eq!(block_on(stream.collect::<Vec<_>>()), vec![1, 2, 3]);
+}
+
+#[test]
+fn finished_future() {
+ let (_a_tx, a_rx) = oneshot::channel::<i32>();
+ let (b_tx, b_rx) = oneshot::channel::<i32>();
+ let (c_tx, c_rx) = oneshot::channel::<i32>();
+
+ let mut stream = vec![
+ Box::new(a_rx) as Box<dyn Future<Output = Result<_, _>> + Unpin>,
+ Box::new(future::select(b_rx, c_rx).map(|e| e.factor_first().0)) as _,
+ ]
+ .into_iter()
+ .collect::<FuturesUnordered<_>>();
+
+ let cx = &mut noop_context();
+ for _ in 0..10 {
+ assert!(stream.poll_next_unpin(cx).is_pending());
+ }
+
+ b_tx.send(12).unwrap();
+ c_tx.send(3).unwrap();
+ assert!(stream.poll_next_unpin(cx).is_ready());
+ assert!(stream.poll_next_unpin(cx).is_pending());
+ assert!(stream.poll_next_unpin(cx).is_pending());
+}
+
+#[test]
+fn iter_mut_cancel() {
+ let (a_tx, a_rx) = oneshot::channel::<i32>();
+ let (b_tx, b_rx) = oneshot::channel::<i32>();
+ let (c_tx, c_rx) = oneshot::channel::<i32>();
+
+ let mut stream = vec![a_rx, b_rx, c_rx]
+ .into_iter()
+ .collect::<FuturesUnordered<_>>();
+
+ for rx in stream.iter_mut() {
+ rx.close();
+ }
+
+ let mut iter = block_on_stream(stream);
+
+ assert!(a_tx.is_canceled());
+ assert!(b_tx.is_canceled());
+ assert!(c_tx.is_canceled());
+
+ assert_eq!(iter.next(), Some(Err(futures::channel::oneshot::Canceled)));
+ assert_eq!(iter.next(), Some(Err(futures::channel::oneshot::Canceled)));
+ assert_eq!(iter.next(), Some(Err(futures::channel::oneshot::Canceled)));
+ assert_eq!(iter.next(), None);
+}
+
+#[test]
+fn iter_mut_len() {
+ let mut stream = vec![
+ future::pending::<()>(),
+ future::pending::<()>(),
+ future::pending::<()>(),
+ ]
+ .into_iter()
+ .collect::<FuturesUnordered<_>>();
+
+ let mut iter_mut = stream.iter_mut();
+ assert_eq!(iter_mut.len(), 3);
+ assert!(iter_mut.next().is_some());
+ assert_eq!(iter_mut.len(), 2);
+ assert!(iter_mut.next().is_some());
+ assert_eq!(iter_mut.len(), 1);
+ assert!(iter_mut.next().is_some());
+ assert_eq!(iter_mut.len(), 0);
+ assert!(iter_mut.next().is_none());
+}
+
+#[test]
+fn iter_cancel() {
+ struct AtomicCancel<F> {
+ future: F,
+ cancel: AtomicBool,
+ }
+
+ impl<F: Future + Unpin> Future for AtomicCancel<F> {
+ type Output = Option<<F as Future>::Output>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ if self.cancel.load(Ordering::Relaxed) {
+ Poll::Ready(None)
+ } else {
+ self.future.poll_unpin(cx).map(Some)
+ }
+ }
+ }
+
+ impl<F: Future + Unpin> AtomicCancel<F> {
+ fn new(future: F) -> Self {
+ Self { future, cancel: AtomicBool::new(false) }
+ }
+ }
+
+ let stream = vec![
+ AtomicCancel::new(future::pending::<()>()),
+ AtomicCancel::new(future::pending::<()>()),
+ AtomicCancel::new(future::pending::<()>()),
+ ]
+ .into_iter()
+ .collect::<FuturesUnordered<_>>();
+
+ for f in stream.iter() {
+ f.cancel.store(true, Ordering::Relaxed);
+ }
+
+ let mut iter = block_on_stream(stream);
+
+ assert_eq!(iter.next(), Some(None));
+ assert_eq!(iter.next(), Some(None));
+ assert_eq!(iter.next(), Some(None));
+ assert_eq!(iter.next(), None);
+}
+
+#[test]
+fn iter_len() {
+ let stream = vec![
+ future::pending::<()>(),
+ future::pending::<()>(),
+ future::pending::<()>(),
+ ]
+ .into_iter()
+ .collect::<FuturesUnordered<_>>();
+
+ let mut iter = stream.iter();
+ assert_eq!(iter.len(), 3);
+ assert!(iter.next().is_some());
+ assert_eq!(iter.len(), 2);
+ assert!(iter.next().is_some());
+ assert_eq!(iter.len(), 1);
+ assert!(iter.next().is_some());
+ assert_eq!(iter.len(), 0);
+ assert!(iter.next().is_none());
+}
+
+#[test]
+fn futures_not_moved_after_poll() {
+ // Future that will be ready after being polled twice,
+ // asserting that it does not move.
+ let fut = future::ready(()).pending_once().assert_unmoved();
+ let mut stream = vec![fut; 3].into_iter().collect::<FuturesUnordered<_>>();
+ assert_stream_next!(stream, ());
+ assert_stream_next!(stream, ());
+ assert_stream_next!(stream, ());
+ assert_stream_done!(stream);
+}
+
+#[test]
+fn len_valid_during_out_of_order_completion() {
+ // Complete futures out-of-order and add new futures afterwards to ensure
+ // length values remain correct.
+ let (a_tx, a_rx) = oneshot::channel::<i32>();
+ let (b_tx, b_rx) = oneshot::channel::<i32>();
+ let (c_tx, c_rx) = oneshot::channel::<i32>();
+ let (d_tx, d_rx) = oneshot::channel::<i32>();
+
+ let mut cx = noop_context();
+ let mut stream = FuturesUnordered::new();
+ assert_eq!(stream.len(), 0);
+
+ stream.push(a_rx);
+ assert_eq!(stream.len(), 1);
+ stream.push(b_rx);
+ assert_eq!(stream.len(), 2);
+ stream.push(c_rx);
+ assert_eq!(stream.len(), 3);
+
+ b_tx.send(4).unwrap();
+ assert_eq!(stream.poll_next_unpin(&mut cx), Poll::Ready(Some(Ok(4))));
+ assert_eq!(stream.len(), 2);
+
+ stream.push(d_rx);
+ assert_eq!(stream.len(), 3);
+
+ c_tx.send(5).unwrap();
+ assert_eq!(stream.poll_next_unpin(&mut cx), Poll::Ready(Some(Ok(5))));
+ assert_eq!(stream.len(), 2);
+
+ d_tx.send(6).unwrap();
+ assert_eq!(stream.poll_next_unpin(&mut cx), Poll::Ready(Some(Ok(6))));
+ assert_eq!(stream.len(), 1);
+
+ a_tx.send(7).unwrap();
+ assert_eq!(stream.poll_next_unpin(&mut cx), Poll::Ready(Some(Ok(7))));
+ assert_eq!(stream.len(), 0);
+}
diff --git a/tests/inspect.rs b/tests/inspect.rs
new file mode 100644
index 0000000..42f6f73
--- /dev/null
+++ b/tests/inspect.rs
@@ -0,0 +1,14 @@
+use futures::executor::block_on;
+use futures::future::{self, FutureExt};
+
+#[test]
+fn smoke() {
+ let mut counter = 0;
+
+ {
+ let work = future::ready::<i32>(40).inspect(|val| { counter += *val; });
+ assert_eq!(block_on(work), 40);
+ }
+
+ assert_eq!(counter, 40);
+}
diff --git a/tests/io_buf_reader.rs b/tests/io_buf_reader.rs
new file mode 100644
index 0000000..a3d723a
--- /dev/null
+++ b/tests/io_buf_reader.rs
@@ -0,0 +1,321 @@
+use futures::executor::block_on;
+use futures::future::{Future, FutureExt};
+use futures::io::{
+ AsyncSeek, AsyncSeekExt, AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt,
+ AllowStdIo, BufReader, Cursor, SeekFrom,
+};
+use futures::task::{Context, Poll};
+use futures_test::task::noop_context;
+use std::cmp;
+use std::io;
+use std::pin::Pin;
+
+/// A dummy reader intended at testing short-reads propagation.
+struct ShortReader {
+ lengths: Vec<usize>,
+}
+
+impl io::Read for ShortReader {
+ fn read(&mut self, _: &mut [u8]) -> io::Result<usize> {
+ if self.lengths.is_empty() {
+ Ok(0)
+ } else {
+ Ok(self.lengths.remove(0))
+ }
+ }
+}
+
+macro_rules! run_fill_buf {
+ ($reader:expr) => {{
+ let mut cx = noop_context();
+ loop {
+ if let Poll::Ready(x) = Pin::new(&mut $reader).poll_fill_buf(&mut cx) {
+ break x;
+ }
+ }
+ }};
+}
+
+#[test]
+fn test_buffered_reader() {
+ let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
+ let mut reader = BufReader::with_capacity(2, inner);
+
+ let mut buf = [0, 0, 0];
+ let nread = block_on(reader.read(&mut buf));
+ assert_eq!(nread.unwrap(), 3);
+ assert_eq!(buf, [5, 6, 7]);
+ assert_eq!(reader.buffer(), []);
+
+ let mut buf = [0, 0];
+ let nread = block_on(reader.read(&mut buf));
+ assert_eq!(nread.unwrap(), 2);
+ assert_eq!(buf, [0, 1]);
+ assert_eq!(reader.buffer(), []);
+
+ let mut buf = [0];
+ let nread = block_on(reader.read(&mut buf));
+ assert_eq!(nread.unwrap(), 1);
+ assert_eq!(buf, [2]);
+ assert_eq!(reader.buffer(), [3]);
+
+ let mut buf = [0, 0, 0];
+ let nread = block_on(reader.read(&mut buf));
+ assert_eq!(nread.unwrap(), 1);
+ assert_eq!(buf, [3, 0, 0]);
+ assert_eq!(reader.buffer(), []);
+
+ let nread = block_on(reader.read(&mut buf));
+ assert_eq!(nread.unwrap(), 1);
+ assert_eq!(buf, [4, 0, 0]);
+ assert_eq!(reader.buffer(), []);
+
+ assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 0);
+}
+
+#[test]
+fn test_buffered_reader_seek() {
+ let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
+ let mut reader = BufReader::with_capacity(2, Cursor::new(inner));
+
+ assert_eq!(block_on(reader.seek(SeekFrom::Start(3))).ok(), Some(3));
+ assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1][..]));
+ assert_eq!(run(reader.seek(SeekFrom::Current(i64::min_value()))).ok(), None);
+ assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1][..]));
+ assert_eq!(block_on(reader.seek(SeekFrom::Current(1))).ok(), Some(4));
+ assert_eq!(run_fill_buf!(reader).ok(), Some(&[1, 2][..]));
+ Pin::new(&mut reader).consume(1);
+ assert_eq!(block_on(reader.seek(SeekFrom::Current(-2))).ok(), Some(3));
+}
+
+#[test]
+fn test_buffered_reader_seek_underflow() {
+ // gimmick reader that yields its position modulo 256 for each byte
+ struct PositionReader {
+ pos: u64
+ }
+ impl io::Read for PositionReader {
+ fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+ let len = buf.len();
+ for x in buf {
+ *x = self.pos as u8;
+ self.pos = self.pos.wrapping_add(1);
+ }
+ Ok(len)
+ }
+ }
+ impl io::Seek for PositionReader {
+ fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
+ match pos {
+ SeekFrom::Start(n) => {
+ self.pos = n;
+ }
+ SeekFrom::Current(n) => {
+ self.pos = self.pos.wrapping_add(n as u64);
+ }
+ SeekFrom::End(n) => {
+ self.pos = u64::max_value().wrapping_add(n as u64);
+ }
+ }
+ Ok(self.pos)
+ }
+ }
+
+ let mut reader = BufReader::with_capacity(5, AllowStdIo::new(PositionReader { pos: 0 }));
+ assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1, 2, 3, 4][..]));
+ assert_eq!(block_on(reader.seek(SeekFrom::End(-5))).ok(), Some(u64::max_value()-5));
+ assert_eq!(run_fill_buf!(reader).ok().map(|s| s.len()), Some(5));
+ // the following seek will require two underlying seeks
+ let expected = 9_223_372_036_854_775_802;
+ assert_eq!(block_on(reader.seek(SeekFrom::Current(i64::min_value()))).ok(), Some(expected));
+ assert_eq!(run_fill_buf!(reader).ok().map(|s| s.len()), Some(5));
+ // seeking to 0 should empty the buffer.
+ assert_eq!(block_on(reader.seek(SeekFrom::Current(0))).ok(), Some(expected));
+ assert_eq!(reader.get_ref().get_ref().pos, expected);
+}
+
+#[test]
+fn test_short_reads() {
+ let inner = ShortReader { lengths: vec![0, 1, 2, 0, 1, 0] };
+ let mut reader = BufReader::new(AllowStdIo::new(inner));
+ let mut buf = [0, 0];
+ assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 0);
+ assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 1);
+ assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 2);
+ assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 0);
+ assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 1);
+ assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 0);
+ assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 0);
+}
+
+struct MaybePending<'a> {
+ inner: &'a [u8],
+ ready_read: bool,
+ ready_fill_buf: bool,
+}
+
+impl<'a> MaybePending<'a> {
+ fn new(inner: &'a [u8]) -> Self {
+ Self { inner, ready_read: false, ready_fill_buf: false }
+ }
+}
+
+impl AsyncRead for MaybePending<'_> {
+ fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8])
+ -> Poll<io::Result<usize>>
+ {
+ if self.ready_read {
+ self.ready_read = false;
+ Pin::new(&mut self.inner).poll_read(cx, buf)
+ } else {
+ self.ready_read = true;
+ Poll::Pending
+ }
+ }
+}
+
+impl AsyncBufRead for MaybePending<'_> {
+ fn poll_fill_buf(mut self: Pin<&mut Self>, _: &mut Context<'_>)
+ -> Poll<io::Result<&[u8]>>
+ {
+ if self.ready_fill_buf {
+ self.ready_fill_buf = false;
+ if self.inner.is_empty() { return Poll::Ready(Ok(&[])) }
+ let len = cmp::min(2, self.inner.len());
+ Poll::Ready(Ok(&self.inner[0..len]))
+ } else {
+ self.ready_fill_buf = true;
+ Poll::Pending
+ }
+ }
+
+ fn consume(mut self: Pin<&mut Self>, amt: usize) {
+ self.inner = &self.inner[amt..];
+ }
+}
+
+fn run<F: Future + Unpin>(mut f: F) -> F::Output {
+ let mut cx = noop_context();
+ loop {
+ if let Poll::Ready(x) = f.poll_unpin(&mut cx) {
+ return x;
+ }
+ }
+}
+
+#[test]
+fn maybe_pending() {
+ let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
+ let mut reader = BufReader::with_capacity(2, MaybePending::new(inner));
+
+ let mut buf = [0, 0, 0];
+ let nread = run(reader.read(&mut buf));
+ assert_eq!(nread.unwrap(), 3);
+ assert_eq!(buf, [5, 6, 7]);
+ assert_eq!(reader.buffer(), []);
+
+ let mut buf = [0, 0];
+ let nread = run(reader.read(&mut buf));
+ assert_eq!(nread.unwrap(), 2);
+ assert_eq!(buf, [0, 1]);
+ assert_eq!(reader.buffer(), []);
+
+ let mut buf = [0];
+ let nread = run(reader.read(&mut buf));
+ assert_eq!(nread.unwrap(), 1);
+ assert_eq!(buf, [2]);
+ assert_eq!(reader.buffer(), [3]);
+
+ let mut buf = [0, 0, 0];
+ let nread = run(reader.read(&mut buf));
+ assert_eq!(nread.unwrap(), 1);
+ assert_eq!(buf, [3, 0, 0]);
+ assert_eq!(reader.buffer(), []);
+
+ let nread = run(reader.read(&mut buf));
+ assert_eq!(nread.unwrap(), 1);
+ assert_eq!(buf, [4, 0, 0]);
+ assert_eq!(reader.buffer(), []);
+
+ assert_eq!(run(reader.read(&mut buf)).unwrap(), 0);
+}
+
+#[test]
+fn maybe_pending_buf_read() {
+ let inner = MaybePending::new(&[0, 1, 2, 3, 1, 0]);
+ let mut reader = BufReader::with_capacity(2, inner);
+ let mut v = Vec::new();
+ run(reader.read_until(3, &mut v)).unwrap();
+ assert_eq!(v, [0, 1, 2, 3]);
+ v.clear();
+ run(reader.read_until(1, &mut v)).unwrap();
+ assert_eq!(v, [1]);
+ v.clear();
+ run(reader.read_until(8, &mut v)).unwrap();
+ assert_eq!(v, [0]);
+ v.clear();
+ run(reader.read_until(9, &mut v)).unwrap();
+ assert_eq!(v, []);
+}
+
+struct MaybePendingSeek<'a> {
+ inner: Cursor<&'a [u8]>,
+ ready: bool,
+}
+
+impl<'a> MaybePendingSeek<'a> {
+ fn new(inner: &'a [u8]) -> Self {
+ Self { inner: Cursor::new(inner), ready: true }
+ }
+}
+
+impl AsyncRead for MaybePendingSeek<'_> {
+ fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8])
+ -> Poll<io::Result<usize>>
+ {
+ Pin::new(&mut self.inner).poll_read(cx, buf)
+ }
+}
+
+impl AsyncBufRead for MaybePendingSeek<'_> {
+ fn poll_fill_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
+ -> Poll<io::Result<&[u8]>>
+ {
+ let this: *mut Self = &mut *self as *mut _;
+ Pin::new(&mut unsafe { &mut *this }.inner).poll_fill_buf(cx)
+ }
+
+ fn consume(mut self: Pin<&mut Self>, amt: usize) {
+ Pin::new(&mut self.inner).consume(amt)
+ }
+}
+
+impl AsyncSeek for MaybePendingSeek<'_> {
+ fn poll_seek(mut self: Pin<&mut Self>, cx: &mut Context<'_>, pos: SeekFrom)
+ -> Poll<io::Result<u64>>
+ {
+ if self.ready {
+ self.ready = false;
+ Pin::new(&mut self.inner).poll_seek(cx, pos)
+ } else {
+ self.ready = true;
+ Poll::Pending
+ }
+ }
+}
+
+// https://github.com/rust-lang/futures-rs/pull/1573#discussion_r281162309
+#[test]
+fn maybe_pending_seek() {
+ let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
+ let mut reader = BufReader::with_capacity(2, MaybePendingSeek::new(inner));
+
+ assert_eq!(run(reader.seek(SeekFrom::Current(3))).ok(), Some(3));
+ assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1][..]));
+ assert_eq!(run(reader.seek(SeekFrom::Current(i64::min_value()))).ok(), None);
+ assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1][..]));
+ assert_eq!(run(reader.seek(SeekFrom::Current(1))).ok(), Some(4));
+ assert_eq!(run_fill_buf!(reader).ok(), Some(&[1, 2][..]));
+ Pin::new(&mut reader).consume(1);
+ assert_eq!(run(reader.seek(SeekFrom::Current(-2))).ok(), Some(3));
+}
diff --git a/tests/io_buf_writer.rs b/tests/io_buf_writer.rs
new file mode 100644
index 0000000..7bdcd16
--- /dev/null
+++ b/tests/io_buf_writer.rs
@@ -0,0 +1,236 @@
+use futures::executor::block_on;
+use futures::future::{Future, FutureExt};
+use futures::io::{AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt, BufWriter, Cursor, SeekFrom};
+use futures::task::{Context, Poll};
+use futures_test::task::noop_context;
+use std::io;
+use std::pin::Pin;
+
+#[test]
+fn buf_writer() {
+ let mut writer = BufWriter::with_capacity(2, Vec::new());
+
+ block_on(writer.write(&[0, 1])).unwrap();
+ assert_eq!(writer.buffer(), []);
+ assert_eq!(*writer.get_ref(), [0, 1]);
+
+ block_on(writer.write(&[2])).unwrap();
+ assert_eq!(writer.buffer(), [2]);
+ assert_eq!(*writer.get_ref(), [0, 1]);
+
+ block_on(writer.write(&[3])).unwrap();
+ assert_eq!(writer.buffer(), [2, 3]);
+ assert_eq!(*writer.get_ref(), [0, 1]);
+
+ block_on(writer.flush()).unwrap();
+ assert_eq!(writer.buffer(), []);
+ assert_eq!(*writer.get_ref(), [0, 1, 2, 3]);
+
+ block_on(writer.write(&[4])).unwrap();
+ block_on(writer.write(&[5])).unwrap();
+ assert_eq!(writer.buffer(), [4, 5]);
+ assert_eq!(*writer.get_ref(), [0, 1, 2, 3]);
+
+ block_on(writer.write(&[6])).unwrap();
+ assert_eq!(writer.buffer(), [6]);
+ assert_eq!(*writer.get_ref(), [0, 1, 2, 3, 4, 5]);
+
+ block_on(writer.write(&[7, 8])).unwrap();
+ assert_eq!(writer.buffer(), []);
+ assert_eq!(*writer.get_ref(), [0, 1, 2, 3, 4, 5, 6, 7, 8]);
+
+ block_on(writer.write(&[9, 10, 11])).unwrap();
+ assert_eq!(writer.buffer(), []);
+ assert_eq!(*writer.get_ref(), [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]);
+
+ block_on(writer.flush()).unwrap();
+ assert_eq!(writer.buffer(), []);
+ assert_eq!(*writer.get_ref(), [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]);
+}
+
+#[test]
+fn buf_writer_inner_flushes() {
+ let mut w = BufWriter::with_capacity(3, Vec::new());
+ block_on(w.write(&[0, 1])).unwrap();
+ assert_eq!(*w.get_ref(), []);
+ block_on(w.flush()).unwrap();
+ let w = w.into_inner();
+ assert_eq!(w, [0, 1]);
+}
+
+#[test]
+fn buf_writer_seek() {
+ // FIXME: when https://github.com/rust-lang/futures-rs/issues/1510 fixed,
+ // use `Vec::new` instead of `vec![0; 8]`.
+ let mut w = BufWriter::with_capacity(3, Cursor::new(vec![0; 8]));
+ block_on(w.write_all(&[0, 1, 2, 3, 4, 5])).unwrap();
+ block_on(w.write_all(&[6, 7])).unwrap();
+ assert_eq!(block_on(w.seek(SeekFrom::Current(0))).ok(), Some(8));
+ assert_eq!(&w.get_ref().get_ref()[..], &[0, 1, 2, 3, 4, 5, 6, 7][..]);
+ assert_eq!(block_on(w.seek(SeekFrom::Start(2))).ok(), Some(2));
+ block_on(w.write_all(&[8, 9])).unwrap();
+ block_on(w.flush()).unwrap();
+ assert_eq!(&w.into_inner().into_inner()[..], &[0, 1, 8, 9, 4, 5, 6, 7]);
+}
+
+struct MaybePending {
+ inner: Vec<u8>,
+ ready: bool,
+}
+
+impl MaybePending {
+ fn new(inner: Vec<u8>) -> Self {
+ Self { inner, ready: false }
+ }
+}
+
+impl AsyncWrite for MaybePending {
+ fn poll_write(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ if self.ready {
+ self.ready = false;
+ Pin::new(&mut self.inner).poll_write(cx, buf)
+ } else {
+ self.ready = true;
+ Poll::Pending
+ }
+ }
+
+ fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ Pin::new(&mut self.inner).poll_flush(cx)
+ }
+
+ fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ Pin::new(&mut self.inner).poll_close(cx)
+ }
+}
+
+fn run<F: Future + Unpin>(mut f: F) -> F::Output {
+ let mut cx = noop_context();
+ loop {
+ if let Poll::Ready(x) = f.poll_unpin(&mut cx) {
+ return x;
+ }
+ }
+}
+
+#[test]
+fn maybe_pending_buf_writer() {
+ let mut writer = BufWriter::with_capacity(2, MaybePending::new(Vec::new()));
+
+ run(writer.write(&[0, 1])).unwrap();
+ assert_eq!(writer.buffer(), []);
+ assert_eq!(&writer.get_ref().inner, &[0, 1]);
+
+ run(writer.write(&[2])).unwrap();
+ assert_eq!(writer.buffer(), [2]);
+ assert_eq!(&writer.get_ref().inner, &[0, 1]);
+
+ run(writer.write(&[3])).unwrap();
+ assert_eq!(writer.buffer(), [2, 3]);
+ assert_eq!(&writer.get_ref().inner, &[0, 1]);
+
+ run(writer.flush()).unwrap();
+ assert_eq!(writer.buffer(), []);
+ assert_eq!(&writer.get_ref().inner, &[0, 1, 2, 3]);
+
+ run(writer.write(&[4])).unwrap();
+ run(writer.write(&[5])).unwrap();
+ assert_eq!(writer.buffer(), [4, 5]);
+ assert_eq!(&writer.get_ref().inner, &[0, 1, 2, 3]);
+
+ run(writer.write(&[6])).unwrap();
+ assert_eq!(writer.buffer(), [6]);
+ assert_eq!(writer.get_ref().inner, &[0, 1, 2, 3, 4, 5]);
+
+ run(writer.write(&[7, 8])).unwrap();
+ assert_eq!(writer.buffer(), []);
+ assert_eq!(writer.get_ref().inner, &[0, 1, 2, 3, 4, 5, 6, 7, 8]);
+
+ run(writer.write(&[9, 10, 11])).unwrap();
+ assert_eq!(writer.buffer(), []);
+ assert_eq!(writer.get_ref().inner, &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]);
+
+ run(writer.flush()).unwrap();
+ assert_eq!(writer.buffer(), []);
+ assert_eq!(&writer.get_ref().inner, &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]);
+}
+
+#[test]
+fn maybe_pending_buf_writer_inner_flushes() {
+ let mut w = BufWriter::with_capacity(3, MaybePending::new(Vec::new()));
+ run(w.write(&[0, 1])).unwrap();
+ assert_eq!(&w.get_ref().inner, &[]);
+ run(w.flush()).unwrap();
+ let w = w.into_inner().inner;
+ assert_eq!(w, [0, 1]);
+}
+
+
+struct MaybePendingSeek {
+ inner: Cursor<Vec<u8>>,
+ ready_write: bool,
+ ready_seek: bool,
+}
+
+impl MaybePendingSeek {
+ fn new(inner: Vec<u8>) -> Self {
+ Self { inner: Cursor::new(inner), ready_write: false, ready_seek: false }
+ }
+}
+
+impl AsyncWrite for MaybePendingSeek {
+ fn poll_write(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ if self.ready_write {
+ self.ready_write = false;
+ Pin::new(&mut self.inner).poll_write(cx, buf)
+ } else {
+ self.ready_write = true;
+ Poll::Pending
+ }
+ }
+
+ fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ Pin::new(&mut self.inner).poll_flush(cx)
+ }
+
+ fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ Pin::new(&mut self.inner).poll_close(cx)
+ }
+}
+
+impl AsyncSeek for MaybePendingSeek {
+ fn poll_seek(mut self: Pin<&mut Self>, cx: &mut Context<'_>, pos: SeekFrom)
+ -> Poll<io::Result<u64>>
+ {
+ if self.ready_seek {
+ self.ready_seek = false;
+ Pin::new(&mut self.inner).poll_seek(cx, pos)
+ } else {
+ self.ready_seek = true;
+ Poll::Pending
+ }
+ }
+}
+
+#[test]
+fn maybe_pending_buf_writer_seek() {
+ // FIXME: when https://github.com/rust-lang/futures-rs/issues/1510 fixed,
+ // use `Vec::new` instead of `vec![0; 8]`.
+ let mut w = BufWriter::with_capacity(3, MaybePendingSeek::new(vec![0; 8]));
+ run(w.write_all(&[0, 1, 2, 3, 4, 5])).unwrap();
+ run(w.write_all(&[6, 7])).unwrap();
+ assert_eq!(run(w.seek(SeekFrom::Current(0))).ok(), Some(8));
+ assert_eq!(&w.get_ref().inner.get_ref()[..], &[0, 1, 2, 3, 4, 5, 6, 7][..]);
+ assert_eq!(run(w.seek(SeekFrom::Start(2))).ok(), Some(2));
+ run(w.write_all(&[8, 9])).unwrap();
+ run(w.flush()).unwrap();
+ assert_eq!(&w.into_inner().inner.into_inner()[..], &[0, 1, 8, 9, 4, 5, 6, 7]);
+}
diff --git a/tests/io_cursor.rs b/tests/io_cursor.rs
new file mode 100644
index 0000000..4f80a75
--- /dev/null
+++ b/tests/io_cursor.rs
@@ -0,0 +1,29 @@
+use assert_matches::assert_matches;
+use futures::future::lazy;
+use futures::io::{AsyncWrite, Cursor};
+use futures::task::Poll;
+use std::pin::Pin;
+
+#[test]
+fn cursor_asyncwrite_vec() {
+ let mut cursor = Cursor::new(vec![0; 5]);
+ futures::executor::block_on(lazy(|cx| {
+ assert_matches!(Pin::new(&mut cursor).poll_write(cx, &[1, 2]), Poll::Ready(Ok(2)));
+ assert_matches!(Pin::new(&mut cursor).poll_write(cx, &[3, 4]), Poll::Ready(Ok(2)));
+ assert_matches!(Pin::new(&mut cursor).poll_write(cx, &[5, 6]), Poll::Ready(Ok(2)));
+ assert_matches!(Pin::new(&mut cursor).poll_write(cx, &[6, 7]), Poll::Ready(Ok(2)));
+ }));
+ assert_eq!(cursor.into_inner(), [1, 2, 3, 4, 5, 6, 6, 7]);
+}
+
+#[test]
+fn cursor_asyncwrite_box() {
+ let mut cursor = Cursor::new(vec![0; 5].into_boxed_slice());
+ futures::executor::block_on(lazy(|cx| {
+ assert_matches!(Pin::new(&mut cursor).poll_write(cx, &[1, 2]), Poll::Ready(Ok(2)));
+ assert_matches!(Pin::new(&mut cursor).poll_write(cx, &[3, 4]), Poll::Ready(Ok(2)));
+ assert_matches!(Pin::new(&mut cursor).poll_write(cx, &[5, 6]), Poll::Ready(Ok(1)));
+ assert_matches!(Pin::new(&mut cursor).poll_write(cx, &[6, 7]), Poll::Ready(Ok(0)));
+ }));
+ assert_eq!(&*cursor.into_inner(), [1, 2, 3, 4, 5]);
+}
diff --git a/tests/io_lines.rs b/tests/io_lines.rs
new file mode 100644
index 0000000..39eafa9
--- /dev/null
+++ b/tests/io_lines.rs
@@ -0,0 +1,62 @@
+use futures::executor::block_on;
+use futures::future::{Future, FutureExt};
+use futures::stream::{self, StreamExt, TryStreamExt};
+use futures::io::{AsyncBufReadExt, Cursor};
+use futures::task::Poll;
+use futures_test::io::AsyncReadTestExt;
+use futures_test::task::noop_context;
+
+macro_rules! block_on_next {
+ ($expr:expr) => {
+ block_on($expr.next()).unwrap().unwrap()
+ };
+}
+
+#[test]
+fn lines() {
+ let buf = Cursor::new(&b"12\r"[..]);
+ let mut s = buf.lines();
+ assert_eq!(block_on_next!(s), "12\r".to_string());
+ assert!(block_on(s.next()).is_none());
+
+ let buf = Cursor::new(&b"12\r\n\n"[..]);
+ let mut s = buf.lines();
+ assert_eq!(block_on_next!(s), "12".to_string());
+ assert_eq!(block_on_next!(s), "".to_string());
+ assert!(block_on(s.next()).is_none());
+}
+
+fn run<F: Future + Unpin>(mut f: F) -> F::Output {
+ let mut cx = noop_context();
+ loop {
+ if let Poll::Ready(x) = f.poll_unpin(&mut cx) {
+ return x;
+ }
+ }
+}
+
+macro_rules! run_next {
+ ($expr:expr) => {
+ run($expr.next()).unwrap().unwrap()
+ };
+}
+
+#[test]
+fn maybe_pending() {
+ let buf = stream::iter(vec![&b"12"[..], &b"\r"[..]])
+ .map(Ok)
+ .into_async_read()
+ .interleave_pending();
+ let mut s = buf.lines();
+ assert_eq!(run_next!(s), "12\r".to_string());
+ assert!(run(s.next()).is_none());
+
+ let buf = stream::iter(vec![&b"12"[..], &b"\r\n"[..], &b"\n"[..]])
+ .map(Ok)
+ .into_async_read()
+ .interleave_pending();
+ let mut s = buf.lines();
+ assert_eq!(run_next!(s), "12".to_string());
+ assert_eq!(run_next!(s), "".to_string());
+ assert!(run(s.next()).is_none());
+}
diff --git a/tests/io_read.rs b/tests/io_read.rs
new file mode 100644
index 0000000..f99c4ed
--- /dev/null
+++ b/tests/io_read.rs
@@ -0,0 +1,65 @@
+use futures::io::AsyncRead;
+use futures_test::task::panic_context;
+use std::io;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+struct MockReader {
+ fun: Box<dyn FnMut(&mut [u8]) -> Poll<io::Result<usize>>>,
+}
+
+impl MockReader {
+ pub fn new(fun: impl FnMut(&mut [u8]) -> Poll<io::Result<usize>> + 'static) -> Self {
+ MockReader { fun: Box::new(fun) }
+ }
+}
+
+impl AsyncRead for MockReader {
+ fn poll_read(
+ self: Pin<&mut Self>,
+ _cx: &mut Context<'_>,
+ buf: &mut [u8]
+ ) -> Poll<io::Result<usize>> {
+ (self.get_mut().fun)(buf)
+ }
+}
+
+/// Verifies that the default implementation of `poll_read_vectored`
+/// calls `poll_read` with an empty slice if no buffers are provided.
+#[test]
+fn read_vectored_no_buffers() {
+ let mut reader = MockReader::new(|buf| {
+ assert_eq!(buf, b"");
+ Err(io::ErrorKind::BrokenPipe.into()).into()
+ });
+ let cx = &mut panic_context();
+ let bufs = &mut [];
+
+ let res = Pin::new(&mut reader).poll_read_vectored(cx, bufs);
+ let res = res.map_err(|e| e.kind());
+ assert_eq!(res, Poll::Ready(Err(io::ErrorKind::BrokenPipe)))
+}
+
+/// Verifies that the default implementation of `poll_read_vectored`
+/// calls `poll_read` with the first non-empty buffer.
+#[test]
+fn read_vectored_first_non_empty() {
+ let mut reader = MockReader::new(|buf| {
+ assert_eq!(buf.len(), 4);
+ buf.copy_from_slice(b"four");
+ Poll::Ready(Ok(4))
+ });
+ let cx = &mut panic_context();
+ let mut buf = [0; 4];
+ let bufs = &mut [
+ io::IoSliceMut::new(&mut []),
+ io::IoSliceMut::new(&mut []),
+ io::IoSliceMut::new(&mut buf),
+ ];
+
+ let res = Pin::new(&mut reader).poll_read_vectored(cx, bufs);
+ let res = res.map_err(|e| e.kind());
+ assert_eq!(res, Poll::Ready(Ok(4)));
+ assert_eq!(buf, b"four"[..]);
+}
+
diff --git a/tests/io_read_exact.rs b/tests/io_read_exact.rs
new file mode 100644
index 0000000..4941773
--- /dev/null
+++ b/tests/io_read_exact.rs
@@ -0,0 +1,17 @@
+use futures::executor::block_on;
+use futures::io::AsyncReadExt;
+
+#[test]
+fn read_exact() {
+ let mut reader: &[u8] = &[1, 2, 3, 4, 5];
+ let mut out = [0u8; 3];
+
+ let res = block_on(reader.read_exact(&mut out)); // read 3 bytes out
+ assert!(res.is_ok());
+ assert_eq!(out, [1,2,3]);
+ assert_eq!(reader.len(), 2);
+
+ let res = block_on(reader.read_exact(&mut out)); // read another 3 bytes, but only 2 bytes left
+ assert!(res.is_err());
+ assert_eq!(reader.len(), 0);
+}
diff --git a/tests/io_read_line.rs b/tests/io_read_line.rs
new file mode 100644
index 0000000..d1dba5e
--- /dev/null
+++ b/tests/io_read_line.rs
@@ -0,0 +1,60 @@
+use futures::executor::block_on;
+use futures::future::{Future, FutureExt};
+use futures::stream::{self, StreamExt, TryStreamExt};
+use futures::io::{AsyncBufReadExt, Cursor};
+use futures::task::Poll;
+use futures_test::io::AsyncReadTestExt;
+use futures_test::task::noop_context;
+
+#[test]
+fn read_line() {
+ let mut buf = Cursor::new(b"12");
+ let mut v = String::new();
+ assert_eq!(block_on(buf.read_line(&mut v)).unwrap(), 2);
+ assert_eq!(v, "12");
+
+ let mut buf = Cursor::new(b"12\n\n");
+ let mut v = String::new();
+ assert_eq!(block_on(buf.read_line(&mut v)).unwrap(), 3);
+ assert_eq!(v, "12\n");
+ v.clear();
+ assert_eq!(block_on(buf.read_line(&mut v)).unwrap(), 1);
+ assert_eq!(v, "\n");
+ v.clear();
+ assert_eq!(block_on(buf.read_line(&mut v)).unwrap(), 0);
+ assert_eq!(v, "");
+}
+
+fn run<F: Future + Unpin>(mut f: F) -> F::Output {
+ let mut cx = noop_context();
+ loop {
+ if let Poll::Ready(x) = f.poll_unpin(&mut cx) {
+ return x;
+ }
+ }
+}
+
+#[test]
+fn maybe_pending() {
+ let mut buf = b"12".interleave_pending();
+ let mut v = String::new();
+ assert_eq!(run(buf.read_line(&mut v)).unwrap(), 2);
+ assert_eq!(v, "12");
+
+ let mut buf = stream::iter(vec![&b"12"[..], &b"\n\n"[..]])
+ .map(Ok)
+ .into_async_read()
+ .interleave_pending();
+ let mut v = String::new();
+ assert_eq!(run(buf.read_line(&mut v)).unwrap(), 3);
+ assert_eq!(v, "12\n");
+ v.clear();
+ assert_eq!(run(buf.read_line(&mut v)).unwrap(), 1);
+ assert_eq!(v, "\n");
+ v.clear();
+ assert_eq!(run(buf.read_line(&mut v)).unwrap(), 0);
+ assert_eq!(v, "");
+ v.clear();
+ assert_eq!(run(buf.read_line(&mut v)).unwrap(), 0);
+ assert_eq!(v, "");
+}
diff --git a/tests/io_read_to_string.rs b/tests/io_read_to_string.rs
new file mode 100644
index 0000000..db825af
--- /dev/null
+++ b/tests/io_read_to_string.rs
@@ -0,0 +1,45 @@
+use futures::executor::block_on;
+use futures::future::{Future, FutureExt};
+use futures::stream::{self, StreamExt, TryStreamExt};
+use futures::io::{AsyncReadExt, Cursor};
+use futures::task::Poll;
+use futures_test::io::AsyncReadTestExt;
+use futures_test::task::noop_context;
+
+#[test]
+fn read_to_string() {
+ let mut c = Cursor::new(&b""[..]);
+ let mut v = String::new();
+ assert_eq!(block_on(c.read_to_string(&mut v)).unwrap(), 0);
+ assert_eq!(v, "");
+
+ let mut c = Cursor::new(&b"1"[..]);
+ let mut v = String::new();
+ assert_eq!(block_on(c.read_to_string(&mut v)).unwrap(), 1);
+ assert_eq!(v, "1");
+
+ let mut c = Cursor::new(&b"\xff"[..]);
+ let mut v = String::new();
+ assert!(block_on(c.read_to_string(&mut v)).is_err());
+}
+
+fn run<F: Future + Unpin>(mut f: F) -> F::Output {
+ let mut cx = noop_context();
+ loop {
+ if let Poll::Ready(x) = f.poll_unpin(&mut cx) {
+ return x;
+ }
+ }
+}
+
+#[test]
+fn interleave_pending() {
+ let mut buf = stream::iter(vec![&b"12"[..], &b"33"[..], &b"3"[..]])
+ .map(Ok)
+ .into_async_read()
+ .interleave_pending();
+
+ let mut v = String::new();
+ assert_eq!(run(buf.read_to_string(&mut v)).unwrap(), 5);
+ assert_eq!(v, "12333");
+}
diff --git a/tests/io_read_until.rs b/tests/io_read_until.rs
new file mode 100644
index 0000000..5152281
--- /dev/null
+++ b/tests/io_read_until.rs
@@ -0,0 +1,60 @@
+use futures::executor::block_on;
+use futures::future::{Future, FutureExt};
+use futures::stream::{self, StreamExt, TryStreamExt};
+use futures::io::{AsyncBufReadExt, Cursor};
+use futures::task::Poll;
+use futures_test::io::AsyncReadTestExt;
+use futures_test::task::noop_context;
+
+#[test]
+fn read_until() {
+ let mut buf = Cursor::new(b"12");
+ let mut v = Vec::new();
+ assert_eq!(block_on(buf.read_until(b'3', &mut v)).unwrap(), 2);
+ assert_eq!(v, b"12");
+
+ let mut buf = Cursor::new(b"1233");
+ let mut v = Vec::new();
+ assert_eq!(block_on(buf.read_until(b'3', &mut v)).unwrap(), 3);
+ assert_eq!(v, b"123");
+ v.truncate(0);
+ assert_eq!(block_on(buf.read_until(b'3', &mut v)).unwrap(), 1);
+ assert_eq!(v, b"3");
+ v.truncate(0);
+ assert_eq!(block_on(buf.read_until(b'3', &mut v)).unwrap(), 0);
+ assert_eq!(v, []);
+}
+
+fn run<F: Future + Unpin>(mut f: F) -> F::Output {
+ let mut cx = noop_context();
+ loop {
+ if let Poll::Ready(x) = f.poll_unpin(&mut cx) {
+ return x;
+ }
+ }
+}
+
+#[test]
+fn maybe_pending() {
+ let mut buf = b"12".interleave_pending();
+ let mut v = Vec::new();
+ assert_eq!(run(buf.read_until(b'3', &mut v)).unwrap(), 2);
+ assert_eq!(v, b"12");
+
+ let mut buf = stream::iter(vec![&b"12"[..], &b"33"[..], &b"3"[..]])
+ .map(Ok)
+ .into_async_read()
+ .interleave_pending();
+ let mut v = Vec::new();
+ assert_eq!(run(buf.read_until(b'3', &mut v)).unwrap(), 3);
+ assert_eq!(v, b"123");
+ v.clear();
+ assert_eq!(run(buf.read_until(b'3', &mut v)).unwrap(), 1);
+ assert_eq!(v, b"3");
+ v.clear();
+ assert_eq!(run(buf.read_until(b'3', &mut v)).unwrap(), 1);
+ assert_eq!(v, b"3");
+ v.clear();
+ assert_eq!(run(buf.read_until(b'3', &mut v)).unwrap(), 0);
+ assert_eq!(v, []);
+}
diff --git a/tests/io_window.rs b/tests/io_window.rs
new file mode 100644
index 0000000..98df69c
--- /dev/null
+++ b/tests/io_window.rs
@@ -0,0 +1,26 @@
+use futures::io::Window;
+
+#[test]
+fn set() {
+ let mut buffer = Window::new(&[1, 2, 3]);
+ buffer.set(..3);
+ buffer.set(3..3);
+ buffer.set(3..=2); // == 3..3
+ buffer.set(0..2);
+
+ assert_eq!(buffer.as_ref(), &[1, 2]);
+}
+
+#[test]
+#[should_panic]
+fn set_panic_out_of_bounds() {
+ let mut buffer = Window::new(&[1, 2, 3]);
+ buffer.set(2..4);
+}
+
+#[test]
+#[should_panic]
+fn set_panic_start_is_greater_than_end() {
+ let mut buffer = Window::new(&[1, 2, 3]);
+ buffer.set(3..2);
+}
diff --git a/tests/io_write.rs b/tests/io_write.rs
new file mode 100644
index 0000000..b963444
--- /dev/null
+++ b/tests/io_write.rs
@@ -0,0 +1,70 @@
+use futures::io::AsyncWrite;
+use futures_test::task::panic_context;
+use std::io;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+struct MockWriter {
+ fun: Box<dyn FnMut(&[u8]) -> Poll<io::Result<usize>>>,
+}
+
+impl MockWriter {
+ pub fn new(fun: impl FnMut(&[u8]) -> Poll<io::Result<usize>> + 'static) -> Self {
+ MockWriter { fun: Box::new(fun) }
+ }
+}
+
+impl AsyncWrite for MockWriter {
+ fn poll_write(
+ self: Pin<&mut Self>,
+ _cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ (self.get_mut().fun)(buf)
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ panic!()
+ }
+
+ fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ panic!()
+ }
+}
+
+/// Verifies that the default implementation of `poll_write_vectored`
+/// calls `poll_write` with an empty slice if no buffers are provided.
+#[test]
+fn write_vectored_no_buffers() {
+ let mut writer = MockWriter::new(|buf| {
+ assert_eq!(buf, b"");
+ Err(io::ErrorKind::BrokenPipe.into()).into()
+ });
+ let cx = &mut panic_context();
+ let bufs = &mut [];
+
+ let res = Pin::new(&mut writer).poll_write_vectored(cx, bufs);
+ let res = res.map_err(|e| e.kind());
+ assert_eq!(res, Poll::Ready(Err(io::ErrorKind::BrokenPipe)))
+}
+
+/// Verifies that the default implementation of `poll_write_vectored`
+/// calls `poll_write` with the first non-empty buffer.
+#[test]
+fn write_vectored_first_non_empty() {
+ let mut writer = MockWriter::new(|buf| {
+ assert_eq!(buf, b"four");
+ Poll::Ready(Ok(4))
+ });
+ let cx = &mut panic_context();
+ let bufs = &mut [
+ io::IoSlice::new(&[]),
+ io::IoSlice::new(&[]),
+ io::IoSlice::new(b"four")
+ ];
+
+ let res = Pin::new(&mut writer).poll_write_vectored(cx, bufs);
+ let res = res.map_err(|e| e.kind());
+ assert_eq!(res, Poll::Ready(Ok(4)));
+}
+
diff --git a/tests/join_all.rs b/tests/join_all.rs
new file mode 100644
index 0000000..63967bf
--- /dev/null
+++ b/tests/join_all.rs
@@ -0,0 +1,43 @@
+use futures_util::future::*;
+use std::future::Future;
+use futures::executor::block_on;
+use std::fmt::Debug;
+
+fn assert_done<T, F>(actual_fut: F, expected: T)
+where
+ T: PartialEq + Debug,
+ F: FnOnce() -> Box<dyn Future<Output = T> + Unpin>,
+{
+ let output = block_on(actual_fut());
+ assert_eq!(output, expected);
+}
+
+#[test]
+fn collect_collects() {
+ assert_done(|| Box::new(join_all(vec![ready(1), ready(2)])), vec![1, 2]);
+ assert_done(|| Box::new(join_all(vec![ready(1)])), vec![1]);
+ // REVIEW: should this be implemented?
+ // assert_done(|| Box::new(join_all(Vec::<i32>::new())), vec![]);
+
+ // TODO: needs more tests
+}
+
+#[test]
+fn join_all_iter_lifetime() {
+ // In futures-rs version 0.1, this function would fail to typecheck due to an overly
+ // conservative type parameterization of `JoinAll`.
+ fn sizes<'a>(bufs: Vec<&'a [u8]>) -> Box<dyn Future<Output = Vec<usize>> + Unpin> {
+ let iter = bufs.into_iter().map(|b| ready::<usize>(b.len()));
+ Box::new(join_all(iter))
+ }
+
+ assert_done(|| sizes(vec![&[1,2,3], &[], &[0]]), vec![3 as usize, 0, 1]);
+}
+
+#[test]
+fn join_all_from_iter() {
+ assert_done(
+ || Box::new(vec![ready(1), ready(2)].into_iter().collect::<JoinAll<_>>()),
+ vec![1, 2],
+ )
+}
diff --git a/tests/macro_comma_support.rs b/tests/macro_comma_support.rs
new file mode 100644
index 0000000..111f65a
--- /dev/null
+++ b/tests/macro_comma_support.rs
@@ -0,0 +1,42 @@
+#[macro_use]
+extern crate futures;
+
+use futures::{
+ executor::block_on,
+ future::{self, FutureExt},
+ task::Poll,
+};
+
+#[test]
+fn ready() {
+ block_on(future::poll_fn(|_| {
+ ready!(Poll::Ready(()),);
+ Poll::Ready(())
+ }))
+}
+
+#[test]
+fn poll() {
+ block_on(async {
+ let _ = poll!(async {}.boxed(),);
+ })
+}
+
+#[test]
+fn join() {
+ block_on(async {
+ let future1 = async { 1 };
+ let future2 = async { 2 };
+ join!(future1, future2,);
+ })
+}
+
+#[test]
+fn try_join() {
+ block_on(async {
+ let future1 = async { 1 }.never_error();
+ let future2 = async { 2 }.never_error();
+ try_join!(future1, future2,)
+ })
+ .unwrap();
+}
diff --git a/tests/mutex.rs b/tests/mutex.rs
new file mode 100644
index 0000000..bad53a9
--- /dev/null
+++ b/tests/mutex.rs
@@ -0,0 +1,69 @@
+use futures::channel::mpsc;
+use futures::executor::block_on;
+use futures::future::{ready, FutureExt};
+use futures::lock::Mutex;
+use futures::stream::StreamExt;
+use futures::task::{Context, SpawnExt};
+use futures_test::future::FutureTestExt;
+use futures_test::task::{new_count_waker, panic_context};
+use std::sync::Arc;
+
+#[test]
+fn mutex_acquire_uncontested() {
+ let mutex = Mutex::new(());
+ for _ in 0..10 {
+ assert!(mutex.lock().poll_unpin(&mut panic_context()).is_ready());
+ }
+}
+
+#[test]
+fn mutex_wakes_waiters() {
+ let mutex = Mutex::new(());
+ let (waker, counter) = new_count_waker();
+ let lock = mutex.lock().poll_unpin(&mut panic_context());
+ assert!(lock.is_ready());
+
+ let mut cx = Context::from_waker(&waker);
+ let mut waiter = mutex.lock();
+ assert!(waiter.poll_unpin(&mut cx).is_pending());
+ assert_eq!(counter, 0);
+
+ drop(lock);
+
+ assert_eq!(counter, 1);
+ assert!(waiter.poll_unpin(&mut panic_context()).is_ready());
+}
+
+#[test]
+fn mutex_contested() {
+ let (tx, mut rx) = mpsc::unbounded();
+ let pool = futures::executor::ThreadPool::builder()
+ .pool_size(16)
+ .create()
+ .unwrap();
+
+ let tx = Arc::new(tx);
+ let mutex = Arc::new(Mutex::new(0));
+
+ let num_tasks = 1000;
+ for _ in 0..num_tasks {
+ let tx = tx.clone();
+ let mutex = mutex.clone();
+ pool.spawn(async move {
+ let mut lock = mutex.lock().await;
+ ready(()).pending_once().await;
+ *lock += 1;
+ tx.unbounded_send(()).unwrap();
+ drop(lock);
+ })
+ .unwrap();
+ }
+
+ block_on(async {
+ for _ in 0..num_tasks {
+ rx.next().await.unwrap();
+ }
+ let lock = mutex.lock().await;
+ assert_eq!(num_tasks, *lock);
+ })
+}
diff --git a/tests/object_safety.rs b/tests/object_safety.rs
new file mode 100644
index 0000000..30c892f
--- /dev/null
+++ b/tests/object_safety.rs
@@ -0,0 +1,49 @@
+fn assert_is_object_safe<T>() {}
+
+#[test]
+fn future() {
+ // `FutureExt`, `TryFutureExt` and `UnsafeFutureObj` are not object safe.
+ use futures::future::{FusedFuture, Future, TryFuture};
+
+ assert_is_object_safe::<&dyn Future<Output = ()>>();
+ assert_is_object_safe::<&dyn FusedFuture<Output = ()>>();
+ assert_is_object_safe::<&dyn TryFuture<Ok = (), Error = (), Output = Result<(), ()>>>();
+}
+
+#[test]
+fn stream() {
+ // `StreamExt` and `TryStreamExt` are not object safe.
+ use futures::stream::{FusedStream, Stream, TryStream};
+
+ assert_is_object_safe::<&dyn Stream<Item = ()>>();
+ assert_is_object_safe::<&dyn FusedStream<Item = ()>>();
+ assert_is_object_safe::<&dyn TryStream<Ok = (), Error = (), Item = Result<(), ()>>>();
+}
+
+#[test]
+fn sink() {
+ // `SinkExt` is not object safe.
+ use futures::sink::Sink;
+
+ assert_is_object_safe::<&dyn Sink<(), Error = ()>>();
+}
+
+#[test]
+fn io() {
+ // `AsyncReadExt`, `AsyncWriteExt`, `AsyncSeekExt` and `AsyncBufReadExt` are not object safe.
+ use futures::io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite};
+
+ assert_is_object_safe::<&dyn AsyncRead>();
+ assert_is_object_safe::<&dyn AsyncWrite>();
+ assert_is_object_safe::<&dyn AsyncSeek>();
+ assert_is_object_safe::<&dyn AsyncBufRead>();
+}
+
+#[test]
+fn task() {
+ // `ArcWake`, `SpawnExt` and `LocalSpawnExt` are not object safe.
+ use futures::task::{LocalSpawn, Spawn};
+
+ assert_is_object_safe::<&dyn Spawn>();
+ assert_is_object_safe::<&dyn LocalSpawn>();
+}
diff --git a/tests/oneshot.rs b/tests/oneshot.rs
new file mode 100644
index 0000000..58951ec
--- /dev/null
+++ b/tests/oneshot.rs
@@ -0,0 +1,66 @@
+use futures::channel::oneshot;
+use futures::future::{FutureExt, TryFutureExt};
+use futures_test::future::FutureTestExt;
+use std::sync::mpsc;
+use std::thread;
+
+#[test]
+fn oneshot_send1() {
+ let (tx1, rx1) = oneshot::channel::<i32>();
+ let (tx2, rx2) = mpsc::channel();
+
+ let t = thread::spawn(|| tx1.send(1).unwrap());
+ rx1.map_ok(move |x| tx2.send(x)).run_in_background();
+ assert_eq!(1, rx2.recv().unwrap());
+ t.join().unwrap();
+}
+
+#[test]
+fn oneshot_send2() {
+ let (tx1, rx1) = oneshot::channel::<i32>();
+ let (tx2, rx2) = mpsc::channel();
+
+ thread::spawn(|| tx1.send(1).unwrap()).join().unwrap();
+ rx1.map_ok(move |x| tx2.send(x).unwrap()).run_in_background();
+ assert_eq!(1, rx2.recv().unwrap());
+}
+
+#[test]
+fn oneshot_send3() {
+ let (tx1, rx1) = oneshot::channel::<i32>();
+ let (tx2, rx2) = mpsc::channel();
+
+ rx1.map_ok(move |x| tx2.send(x).unwrap()).run_in_background();
+ thread::spawn(|| tx1.send(1).unwrap()).join().unwrap();
+ assert_eq!(1, rx2.recv().unwrap());
+}
+
+#[test]
+fn oneshot_drop_tx1() {
+ let (tx1, rx1) = oneshot::channel::<i32>();
+ let (tx2, rx2) = mpsc::channel();
+
+ drop(tx1);
+ rx1.map(move |result| tx2.send(result).unwrap()).run_in_background();
+
+ assert_eq!(Err(oneshot::Canceled), rx2.recv().unwrap());
+}
+
+#[test]
+fn oneshot_drop_tx2() {
+ let (tx1, rx1) = oneshot::channel::<i32>();
+ let (tx2, rx2) = mpsc::channel();
+
+ let t = thread::spawn(|| drop(tx1));
+ rx1.map(move |result| tx2.send(result).unwrap()).run_in_background();
+ t.join().unwrap();
+
+ assert_eq!(Err(oneshot::Canceled), rx2.recv().unwrap());
+}
+
+#[test]
+fn oneshot_drop_rx() {
+ let (tx, rx) = oneshot::channel::<i32>();
+ drop(rx);
+ assert_eq!(Err(2), tx.send(2));
+}
diff --git a/tests/ready_queue.rs b/tests/ready_queue.rs
new file mode 100644
index 0000000..15a0bef
--- /dev/null
+++ b/tests/ready_queue.rs
@@ -0,0 +1,151 @@
+use futures::channel::oneshot;
+use futures::executor::{block_on, block_on_stream};
+use futures::future;
+use futures::stream::{FuturesUnordered, StreamExt};
+use futures::task::Poll;
+use futures_test::task::noop_context;
+use std::panic::{self, AssertUnwindSafe};
+use std::sync::{Arc, Barrier};
+use std::thread;
+
+trait AssertSendSync: Send + Sync {}
+impl AssertSendSync for FuturesUnordered<()> {}
+
+#[test]
+fn basic_usage() {
+ block_on(future::lazy(move |cx| {
+ let mut queue = FuturesUnordered::new();
+ let (tx1, rx1) = oneshot::channel();
+ let (tx2, rx2) = oneshot::channel();
+ let (tx3, rx3) = oneshot::channel();
+
+ queue.push(rx1);
+ queue.push(rx2);
+ queue.push(rx3);
+
+ assert!(!queue.poll_next_unpin(cx).is_ready());
+
+ tx2.send("hello").unwrap();
+
+ assert_eq!(Poll::Ready(Some(Ok("hello"))), queue.poll_next_unpin(cx));
+ assert!(!queue.poll_next_unpin(cx).is_ready());
+
+ tx1.send("world").unwrap();
+ tx3.send("world2").unwrap();
+
+ assert_eq!(Poll::Ready(Some(Ok("world"))), queue.poll_next_unpin(cx));
+ assert_eq!(Poll::Ready(Some(Ok("world2"))), queue.poll_next_unpin(cx));
+ assert_eq!(Poll::Ready(None), queue.poll_next_unpin(cx));
+ }));
+}
+
+#[test]
+fn resolving_errors() {
+ block_on(future::lazy(move |cx| {
+ let mut queue = FuturesUnordered::new();
+ let (tx1, rx1) = oneshot::channel();
+ let (tx2, rx2) = oneshot::channel();
+ let (tx3, rx3) = oneshot::channel();
+
+ queue.push(rx1);
+ queue.push(rx2);
+ queue.push(rx3);
+
+ assert!(!queue.poll_next_unpin(cx).is_ready());
+
+ drop(tx2);
+
+ assert_eq!(Poll::Ready(Some(Err(oneshot::Canceled))), queue.poll_next_unpin(cx));
+ assert!(!queue.poll_next_unpin(cx).is_ready());
+
+ drop(tx1);
+ tx3.send("world2").unwrap();
+
+ assert_eq!(Poll::Ready(Some(Err(oneshot::Canceled))), queue.poll_next_unpin(cx));
+ assert_eq!(Poll::Ready(Some(Ok("world2"))), queue.poll_next_unpin(cx));
+ assert_eq!(Poll::Ready(None), queue.poll_next_unpin(cx));
+ }));
+}
+
+#[test]
+fn dropping_ready_queue() {
+ block_on(future::lazy(move |_| {
+ let queue = FuturesUnordered::new();
+ let (mut tx1, rx1) = oneshot::channel::<()>();
+ let (mut tx2, rx2) = oneshot::channel::<()>();
+ let (mut tx3, rx3) = oneshot::channel::<()>();
+
+ queue.push(rx1);
+ queue.push(rx2);
+ queue.push(rx3);
+
+ {
+ let cx = &mut noop_context();
+ assert!(!tx1.poll_canceled(cx).is_ready());
+ assert!(!tx2.poll_canceled(cx).is_ready());
+ assert!(!tx3.poll_canceled(cx).is_ready());
+
+ drop(queue);
+
+ assert!(tx1.poll_canceled(cx).is_ready());
+ assert!(tx2.poll_canceled(cx).is_ready());
+ assert!(tx3.poll_canceled(cx).is_ready());
+ }
+ }));
+}
+
+#[test]
+fn stress() {
+ const ITER: usize = 300;
+
+ for i in 0..ITER {
+ let n = (i % 10) + 1;
+
+ let mut queue = FuturesUnordered::new();
+
+ for _ in 0..5 {
+ let barrier = Arc::new(Barrier::new(n + 1));
+
+ for num in 0..n {
+ let barrier = barrier.clone();
+ let (tx, rx) = oneshot::channel();
+
+ queue.push(rx);
+
+ thread::spawn(move || {
+ barrier.wait();
+ tx.send(num).unwrap();
+ });
+ }
+
+ barrier.wait();
+
+ let mut sync = block_on_stream(queue);
+
+ let mut rx: Vec<_> = (&mut sync).take(n).map(|res| res.unwrap()).collect();
+
+ assert_eq!(rx.len(), n);
+
+ rx.sort();
+
+ for (i, x) in rx.into_iter().enumerate() {
+ assert_eq!(i, x);
+ }
+
+ queue = sync.into_inner();
+ }
+ }
+}
+
+#[test]
+fn panicking_future_dropped() {
+ block_on(future::lazy(move |cx| {
+ let mut queue = FuturesUnordered::new();
+ queue.push(future::poll_fn(|_| -> Poll<Result<i32, i32>> { panic!() }));
+
+ let r = panic::catch_unwind(AssertUnwindSafe(|| queue.poll_next_unpin(cx)));
+ assert!(r.is_err());
+ assert!(queue.is_empty());
+ assert_eq!(Poll::Ready(None), queue.poll_next_unpin(cx));
+ }));
+}
diff --git a/tests/recurse.rs b/tests/recurse.rs
new file mode 100644
index 0000000..2920a41
--- /dev/null
+++ b/tests/recurse.rs
@@ -0,0 +1,22 @@
+use futures::executor::block_on;
+use futures::future::{self, FutureExt, BoxFuture};
+use std::sync::mpsc;
+use std::thread;
+
+#[test]
+fn lots() {
+ fn do_it(input: (i32, i32)) -> BoxFuture<'static, i32> {
+ let (n, x) = input;
+ if n == 0 {
+ future::ready(x).boxed()
+ } else {
+ future::ready((n - 1, x + n)).then(do_it).boxed()
+ }
+ }
+
+ let (tx, rx) = mpsc::channel();
+ thread::spawn(|| {
+ block_on(do_it((1_000, 0)).map(move |x| tx.send(x).unwrap()))
+ });
+ assert_eq!(500_500, rx.recv().unwrap());
+}
diff --git a/tests/select_all.rs b/tests/select_all.rs
new file mode 100644
index 0000000..aad977d
--- /dev/null
+++ b/tests/select_all.rs
@@ -0,0 +1,29 @@
+use futures::executor::block_on;
+use futures::future::{ready, select_all};
+use std::collections::HashSet;
+
+#[test]
+fn smoke() {
+ let v = vec![
+ ready(1),
+ ready(2),
+ ready(3),
+ ];
+
+ let mut c = vec![1, 2, 3].into_iter().collect::<HashSet<_>>();
+
+ let (i, idx, v) = block_on(select_all(v));
+ assert!(c.remove(&i));
+ assert_eq!(idx, 0);
+
+ let (i, idx, v) = block_on(select_all(v));
+ assert!(c.remove(&i));
+ assert_eq!(idx, 0);
+
+ let (i, idx, v) = block_on(select_all(v));
+ assert!(c.remove(&i));
+ assert_eq!(idx, 0);
+
+ assert!(c.is_empty());
+ assert!(v.is_empty());
+}
diff --git a/tests/select_ok.rs b/tests/select_ok.rs
new file mode 100644
index 0000000..db88a95
--- /dev/null
+++ b/tests/select_ok.rs
@@ -0,0 +1,39 @@
+use futures::executor::block_on;
+use futures::future::{err, ok, select_ok};
+
+#[test]
+fn ignore_err() {
+ let v = vec![
+ err(1),
+ err(2),
+ ok(3),
+ ok(4),
+ ];
+
+ let (i, v) = block_on(select_ok(v)).ok().unwrap();
+ assert_eq!(i, 3);
+
+ assert_eq!(v.len(), 1);
+
+ let (i, v) = block_on(select_ok(v)).ok().unwrap();
+ assert_eq!(i, 4);
+
+ assert!(v.is_empty());
+}
+
+#[test]
+fn last_err() {
+ let v = vec![
+ ok(1),
+ err(2),
+ err(3),
+ ];
+
+ let (i, v) = block_on(select_ok(v)).ok().unwrap();
+ assert_eq!(i, 1);
+
+ assert_eq!(v.len(), 2);
+
+ let i = block_on(select_ok(v)).err().unwrap();
+ assert_eq!(i, 3);
+}
diff --git a/tests/shared.rs b/tests/shared.rs
new file mode 100644
index 0000000..8402bfe
--- /dev/null
+++ b/tests/shared.rs
@@ -0,0 +1,151 @@
+use futures::channel::oneshot;
+use futures::executor::{block_on, LocalPool};
+use futures::future::{self, FutureExt, TryFutureExt, LocalFutureObj};
+use futures::task::LocalSpawn;
+use std::cell::{Cell, RefCell};
+use std::rc::Rc;
+use std::thread;
+
+fn send_shared_oneshot_and_wait_on_multiple_threads(threads_number: u32) {
+ let (tx, rx) = oneshot::channel::<i32>();
+ let f = rx.shared();
+ let join_handles = (0..threads_number)
+ .map(|_| {
+ let cloned_future = f.clone();
+ thread::spawn(move || {
+ assert_eq!(block_on(cloned_future).unwrap(), 6);
+ })
+ })
+ .collect::<Vec<_>>();
+
+ tx.send(6).unwrap();
+
+ assert_eq!(block_on(f).unwrap(), 6);
+ for join_handle in join_handles {
+ join_handle.join().unwrap();
+ }
+}
+
+#[test]
+fn one_thread() {
+ send_shared_oneshot_and_wait_on_multiple_threads(1);
+}
+
+#[test]
+fn two_threads() {
+ send_shared_oneshot_and_wait_on_multiple_threads(2);
+}
+
+#[test]
+fn many_threads() {
+ send_shared_oneshot_and_wait_on_multiple_threads(1000);
+}
+
+#[test]
+fn drop_on_one_task_ok() {
+ let (tx, rx) = oneshot::channel::<u32>();
+ let f1 = rx.shared();
+ let f2 = f1.clone();
+
+ let (tx2, rx2) = oneshot::channel::<u32>();
+
+ let t1 = thread::spawn(|| {
+ let f = future::try_select(f1.map_err(|_| ()), rx2.map_err(|_| ()));
+ drop(block_on(f));
+ });
+
+ let (tx3, rx3) = oneshot::channel::<u32>();
+
+ let t2 = thread::spawn(|| {
+ let _ = block_on(f2.map_ok(|x| tx3.send(x).unwrap()).map_err(|_| ()));
+ });
+
+ tx2.send(11).unwrap(); // cancel `f1`
+ t1.join().unwrap();
+
+ tx.send(42).unwrap(); // Should cause `f2` and then `rx3` to get resolved.
+ let result = block_on(rx3).unwrap();
+ assert_eq!(result, 42);
+ t2.join().unwrap();
+}
+
+#[test]
+fn drop_in_poll() {
+ let slot1 = Rc::new(RefCell::new(None));
+ let slot2 = slot1.clone();
+
+ let future1 = future::lazy(move |_| {
+ slot2.replace(None); // Drop future
+ 1
+ }).shared();
+
+ let future2 = LocalFutureObj::new(Box::new(future1.clone()));
+ slot1.replace(Some(future2));
+
+ assert_eq!(block_on(future1), 1);
+}
+
+#[test]
+fn peek() {
+ let mut local_pool = LocalPool::new();
+ let spawn = &mut local_pool.spawner();
+
+ let (tx0, rx0) = oneshot::channel::<i32>();
+ let f1 = rx0.shared();
+ let f2 = f1.clone();
+
+ // Repeated calls on the original or clone do not change the outcome.
+ for _ in 0..2 {
+ assert!(f1.peek().is_none());
+ assert!(f2.peek().is_none());
+ }
+
+ // Completing the underlying future has no effect, because the value has not been `poll`ed in.
+ tx0.send(42).unwrap();
+ for _ in 0..2 {
+ assert!(f1.peek().is_none());
+ assert!(f2.peek().is_none());
+ }
+
+ // Once the Shared has been polled, the value is peekable on the clone.
+ spawn.spawn_local_obj(LocalFutureObj::new(Box::new(f1.map(|_| ())))).unwrap();
+ local_pool.run();
+ for _ in 0..2 {
+ assert_eq!(*f2.peek().unwrap(), Ok(42));
+ }
+}
+
+struct CountClone(Rc<Cell<i32>>);
+
+impl Clone for CountClone {
+ fn clone(&self) -> Self {
+ self.0.set(self.0.get() + 1);
+ CountClone(self.0.clone())
+ }
+}
+
+#[test]
+fn dont_clone_in_single_owner_shared_future() {
+ let counter = CountClone(Rc::new(Cell::new(0)));
+ let (tx, rx) = oneshot::channel();
+
+ let rx = rx.shared();
+
+ tx.send(counter).ok().unwrap();
+
+ assert_eq!(block_on(rx).unwrap().0.get(), 0);
+}
+
+#[test]
+fn dont_do_unnecessary_clones_on_output() {
+ let counter = CountClone(Rc::new(Cell::new(0)));
+ let (tx, rx) = oneshot::channel();
+
+ let rx = rx.shared();
+
+ tx.send(counter).ok().unwrap();
+
+ assert_eq!(block_on(rx.clone()).unwrap().0.get(), 1);
+ assert_eq!(block_on(rx.clone()).unwrap().0.get(), 2);
+ assert_eq!(block_on(rx).unwrap().0.get(), 2);
+}
diff --git a/tests/sink.rs b/tests/sink.rs
new file mode 100644
index 0000000..f967e1b
--- /dev/null
+++ b/tests/sink.rs
@@ -0,0 +1,516 @@
+use futures::channel::{mpsc, oneshot};
+use futures::executor::block_on;
+use futures::future::{self, Future, FutureExt, TryFutureExt};
+use futures::never::Never;
+use futures::ready;
+use futures::sink::{Sink, SinkErrInto, SinkExt};
+use futures::stream::{self, Stream, StreamExt};
+use futures::task::{self, ArcWake, Context, Poll, Waker};
+use futures_test::task::panic_context;
+use std::cell::{Cell, RefCell};
+use std::collections::VecDeque;
+use std::fmt;
+use std::mem;
+use std::pin::Pin;
+use std::rc::Rc;
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, Ordering};
+
+fn sassert_next<S>(s: &mut S, item: S::Item)
+where
+ S: Stream + Unpin,
+ S::Item: Eq + fmt::Debug,
+{
+ match s.poll_next_unpin(&mut panic_context()) {
+ Poll::Ready(None) => panic!("stream is at its end"),
+ Poll::Ready(Some(e)) => assert_eq!(e, item),
+ Poll::Pending => panic!("stream wasn't ready"),
+ }
+}
+
+fn unwrap<T, E: fmt::Debug>(x: Poll<Result<T, E>>) -> T {
+ match x {
+ Poll::Ready(Ok(x)) => x,
+ Poll::Ready(Err(_)) => panic!("Poll::Ready(Err(_))"),
+ Poll::Pending => panic!("Poll::Pending"),
+ }
+}
+
+#[test]
+fn either_sink() {
+ let mut s = if true {
+ Vec::<i32>::new().left_sink()
+ } else {
+ VecDeque::<i32>::new().right_sink()
+ };
+
+ Pin::new(&mut s).start_send(0).unwrap();
+}
+
+#[test]
+fn vec_sink() {
+ let mut v = Vec::new();
+ Pin::new(&mut v).start_send(0).unwrap();
+ Pin::new(&mut v).start_send(1).unwrap();
+ assert_eq!(v, vec![0, 1]);
+ block_on(v.flush()).unwrap();
+ assert_eq!(v, vec![0, 1]);
+}
+
+#[test]
+fn vecdeque_sink() {
+ let mut deque = VecDeque::new();
+ Pin::new(&mut deque).start_send(2).unwrap();
+ Pin::new(&mut deque).start_send(3).unwrap();
+
+ assert_eq!(deque.pop_front(), Some(2));
+ assert_eq!(deque.pop_front(), Some(3));
+ assert_eq!(deque.pop_front(), None);
+}
+
+#[test]
+fn send() {
+ let mut v = Vec::new();
+
+ block_on(v.send(0)).unwrap();
+ assert_eq!(v, vec![0]);
+
+ block_on(v.send(1)).unwrap();
+ assert_eq!(v, vec![0, 1]);
+
+ block_on(v.send(2)).unwrap();
+ assert_eq!(v, vec![0, 1, 2]);
+}
+
+#[test]
+fn send_all() {
+ let mut v = Vec::new();
+
+ block_on(v.send_all(&mut stream::iter(vec![0, 1]).map(Ok))).unwrap();
+ assert_eq!(v, vec![0, 1]);
+
+ block_on(v.send_all(&mut stream::iter(vec![2, 3]).map(Ok))).unwrap();
+ assert_eq!(v, vec![0, 1, 2, 3]);
+
+ block_on(v.send_all(&mut stream::iter(vec![4, 5]).map(Ok))).unwrap();
+ assert_eq!(v, vec![0, 1, 2, 3, 4, 5]);
+}
+
+// An Unpark struct that records unpark events for inspection
+struct Flag(AtomicBool);
+
+impl Flag {
+ fn new() -> Arc<Self> {
+ Arc::new(Self(AtomicBool::new(false)))
+ }
+
+ fn take(&self) -> bool {
+ self.0.swap(false, Ordering::SeqCst)
+ }
+
+ fn set(&self, v: bool) {
+ self.0.store(v, Ordering::SeqCst)
+ }
+}
+
+impl ArcWake for Flag {
+ fn wake_by_ref(arc_self: &Arc<Self>) {
+ arc_self.set(true)
+ }
+}
+
+fn flag_cx<F, R>(f: F) -> R
+where
+ F: FnOnce(Arc<Flag>, &mut Context<'_>) -> R,
+{
+ let flag = Flag::new();
+ let waker = task::waker_ref(&flag);
+ let cx = &mut Context::from_waker(&waker);
+ f(flag.clone(), cx)
+}
+
+// Sends a value on an i32 channel sink
+struct StartSendFut<S: Sink<Item> + Unpin, Item: Unpin>(Option<S>, Option<Item>);
+
+impl<S: Sink<Item> + Unpin, Item: Unpin> StartSendFut<S, Item> {
+ fn new(sink: S, item: Item) -> Self {
+ Self(Some(sink), Some(item))
+ }
+}
+
+impl<S: Sink<Item> + Unpin, Item: Unpin> Future for StartSendFut<S, Item> {
+ type Output = Result<S, S::Error>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let Self(inner, item) = self.get_mut();
+ {
+ let mut inner = inner.as_mut().unwrap();
+ ready!(Pin::new(&mut inner).poll_ready(cx))?;
+ Pin::new(&mut inner).start_send(item.take().unwrap())?;
+ }
+ Poll::Ready(Ok(inner.take().unwrap()))
+ }
+}
+
+// Test that `start_send` on an `mpsc` channel does indeed block when the
+// channel is full
+#[test]
+fn mpsc_blocking_start_send() {
+ let (mut tx, mut rx) = mpsc::channel::<i32>(0);
+
+ block_on(future::lazy(|_| {
+ tx.start_send(0).unwrap();
+
+ flag_cx(|flag, cx| {
+ let mut task = StartSendFut::new(tx, 1);
+
+ assert!(task.poll_unpin(cx).is_pending());
+ assert!(!flag.take());
+ sassert_next(&mut rx, 0);
+ assert!(flag.take());
+ unwrap(task.poll_unpin(cx));
+ assert!(!flag.take());
+ sassert_next(&mut rx, 1);
+ })
+ }));
+}
+
+// test `flush` by using `with` to make the first insertion into a sink block
+// until a oneshot is completed
+#[test]
+fn with_flush() {
+ let (tx, rx) = oneshot::channel();
+ let mut block = rx.boxed();
+ let mut sink = Vec::new().with(|elem| {
+ mem::replace(&mut block, future::ok(()).boxed())
+ .map_ok(move |()| elem + 1)
+ .map_err(|_| -> Never { panic!() })
+ });
+
+ assert_eq!(Pin::new(&mut sink).start_send(0).ok(), Some(()));
+
+ flag_cx(|flag, cx| {
+ let mut task = sink.flush();
+ assert!(task.poll_unpin(cx).is_pending());
+ tx.send(()).unwrap();
+ assert!(flag.take());
+
+ unwrap(task.poll_unpin(cx));
+
+ block_on(sink.send(1)).unwrap();
+ assert_eq!(sink.get_ref(), &[1, 2]);
+ })
+}
+
+// test simple use of with to change data
+#[test]
+fn with_as_map() {
+ let mut sink = Vec::new().with(|item| future::ok::<i32, Never>(item * 2));
+ block_on(sink.send(0)).unwrap();
+ block_on(sink.send(1)).unwrap();
+ block_on(sink.send(2)).unwrap();
+ assert_eq!(sink.get_ref(), &[0, 2, 4]);
+}
+
+// test simple use of with_flat_map
+#[test]
+fn with_flat_map() {
+ let mut sink = Vec::new().with_flat_map(|item| stream::iter(vec![item; item]).map(Ok));
+ block_on(sink.send(0)).unwrap();
+ block_on(sink.send(1)).unwrap();
+ block_on(sink.send(2)).unwrap();
+ block_on(sink.send(3)).unwrap();
+ assert_eq!(sink.get_ref(), &[1, 2, 2, 3, 3, 3]);
+}
+
+// Check that `with` propagates `poll_ready` to the inner sink.
+// Regression test for the issue #1834.
+#[test]
+fn with_propagates_poll_ready() {
+ let (tx, mut rx) = mpsc::channel::<i32>(0);
+ let mut tx = tx.with(|item: i32| future::ok::<i32, mpsc::SendError>(item + 10));
+
+ block_on(future::lazy(|_| {
+ flag_cx(|flag, cx| {
+ let mut tx = Pin::new(&mut tx);
+
+ // Should be ready for the first item.
+ assert_eq!(tx.as_mut().poll_ready(cx), Poll::Ready(Ok(())));
+ assert_eq!(tx.as_mut().start_send(0), Ok(()));
+
+ // Should be ready for the second item only after the first one is received.
+ assert_eq!(tx.as_mut().poll_ready(cx), Poll::Pending);
+ assert!(!flag.take());
+ sassert_next(&mut rx, 10);
+ assert!(flag.take());
+ assert_eq!(tx.as_mut().poll_ready(cx), Poll::Ready(Ok(())));
+ assert_eq!(tx.as_mut().start_send(1), Ok(()));
+ })
+ }));
+}
+
+// Immediately accepts all requests to start pushing, but completion is managed
+// by manually flushing
+struct ManualFlush<T: Unpin> {
+ data: Vec<T>,
+ waiting_tasks: Vec<Waker>,
+}
+
+impl<T: Unpin> Sink<Option<T>> for ManualFlush<T> {
+ type Error = ();
+
+ fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ Poll::Ready(Ok(()))
+ }
+
+ fn start_send(mut self: Pin<&mut Self>, item: Option<T>) -> Result<(), Self::Error> {
+ if let Some(item) = item {
+ self.data.push(item);
+ } else {
+ self.force_flush();
+ }
+ Ok(())
+ }
+
+ fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ if self.data.is_empty() {
+ Poll::Ready(Ok(()))
+ } else {
+ self.waiting_tasks.push(cx.waker().clone());
+ Poll::Pending
+ }
+ }
+
+ fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ self.poll_flush(cx)
+ }
+}
+
+impl<T: Unpin> ManualFlush<T> {
+ fn new() -> Self {
+ Self {
+ data: Vec::new(),
+ waiting_tasks: Vec::new(),
+ }
+ }
+
+ fn force_flush(&mut self) -> Vec<T> {
+ for task in self.waiting_tasks.drain(..) {
+ task.wake()
+ }
+ mem::replace(&mut self.data, Vec::new())
+ }
+}
+
+// test that the `with` sink doesn't require the underlying sink to flush,
+// but doesn't claim to be flushed until the underlying sink is
+#[test]
+fn with_flush_propagate() {
+ let mut sink = ManualFlush::new().with(future::ok::<Option<i32>, ()>);
+ flag_cx(|flag, cx| {
+ unwrap(Pin::new(&mut sink).poll_ready(cx));
+ Pin::new(&mut sink).start_send(Some(0)).unwrap();
+ unwrap(Pin::new(&mut sink).poll_ready(cx));
+ Pin::new(&mut sink).start_send(Some(1)).unwrap();
+
+ {
+ let mut task = sink.flush();
+ assert!(task.poll_unpin(cx).is_pending());
+ assert!(!flag.take());
+ }
+ assert_eq!(sink.get_mut().force_flush(), vec![0, 1]);
+ assert!(flag.take());
+ unwrap(sink.flush().poll_unpin(cx));
+ })
+}
+
+// test that a buffer is a no-nop around a sink that always accepts sends
+#[test]
+fn buffer_noop() {
+ let mut sink = Vec::new().buffer(0);
+ block_on(sink.send(0)).unwrap();
+ block_on(sink.send(1)).unwrap();
+ assert_eq!(sink.get_ref(), &[0, 1]);
+
+ let mut sink = Vec::new().buffer(1);
+ block_on(sink.send(0)).unwrap();
+ block_on(sink.send(1)).unwrap();
+ assert_eq!(sink.get_ref(), &[0, 1]);
+}
+
+struct ManualAllow<T: Unpin> {
+ data: Vec<T>,
+ allow: Rc<Allow>,
+}
+
+struct Allow {
+ flag: Cell<bool>,
+ tasks: RefCell<Vec<Waker>>,
+}
+
+impl Allow {
+ fn new() -> Self {
+ Self {
+ flag: Cell::new(false),
+ tasks: RefCell::new(Vec::new()),
+ }
+ }
+
+ fn check(&self, cx: &mut Context<'_>) -> bool {
+ if self.flag.get() {
+ true
+ } else {
+ self.tasks.borrow_mut().push(cx.waker().clone());
+ false
+ }
+ }
+
+ fn start(&self) {
+ self.flag.set(true);
+ let mut tasks = self.tasks.borrow_mut();
+ for task in tasks.drain(..) {
+ task.wake();
+ }
+ }
+}
+
+impl<T: Unpin> Sink<T> for ManualAllow<T> {
+ type Error = ();
+
+ fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ if self.allow.check(cx) {
+ Poll::Ready(Ok(()))
+ } else {
+ Poll::Pending
+ }
+ }
+
+ fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
+ self.data.push(item);
+ Ok(())
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ Poll::Ready(Ok(()))
+ }
+
+ fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ Poll::Ready(Ok(()))
+ }
+}
+
+fn manual_allow<T: Unpin>() -> (ManualAllow<T>, Rc<Allow>) {
+ let allow = Rc::new(Allow::new());
+ let manual_allow = ManualAllow {
+ data: Vec::new(),
+ allow: allow.clone(),
+ };
+ (manual_allow, allow)
+}
+
+// test basic buffer functionality, including both filling up to capacity,
+// and writing out when the underlying sink is ready
+#[test]
+fn buffer() {
+ let (sink, allow) = manual_allow::<i32>();
+ let sink = sink.buffer(2);
+
+ let sink = block_on(StartSendFut::new(sink, 0)).unwrap();
+ let mut sink = block_on(StartSendFut::new(sink, 1)).unwrap();
+
+ flag_cx(|flag, cx| {
+ let mut task = sink.send(2);
+ assert!(task.poll_unpin(cx).is_pending());
+ assert!(!flag.take());
+ allow.start();
+ assert!(flag.take());
+ unwrap(task.poll_unpin(cx));
+ assert_eq!(sink.get_ref().data, vec![0, 1, 2]);
+ })
+}
+
+#[test]
+fn fanout_smoke() {
+ let sink1 = Vec::new();
+ let sink2 = Vec::new();
+ let mut sink = sink1.fanout(sink2);
+ block_on(sink.send_all(&mut stream::iter(vec![1, 2, 3]).map(Ok))).unwrap();
+ let (sink1, sink2) = sink.into_inner();
+ assert_eq!(sink1, vec![1, 2, 3]);
+ assert_eq!(sink2, vec![1, 2, 3]);
+}
+
+#[test]
+fn fanout_backpressure() {
+ let (left_send, mut left_recv) = mpsc::channel(0);
+ let (right_send, mut right_recv) = mpsc::channel(0);
+ let sink = left_send.fanout(right_send);
+
+ let mut sink = block_on(StartSendFut::new(sink, 0)).unwrap();
+
+ flag_cx(|flag, cx| {
+ let mut task = sink.send(2);
+ assert!(!flag.take());
+ assert!(task.poll_unpin(cx).is_pending());
+ assert_eq!(block_on(left_recv.next()), Some(0));
+ assert!(flag.take());
+ assert!(task.poll_unpin(cx).is_pending());
+ assert_eq!(block_on(right_recv.next()), Some(0));
+ assert!(flag.take());
+
+ assert!(task.poll_unpin(cx).is_pending());
+ assert_eq!(block_on(left_recv.next()), Some(2));
+ assert!(flag.take());
+ assert!(task.poll_unpin(cx).is_pending());
+ assert_eq!(block_on(right_recv.next()), Some(2));
+ assert!(flag.take());
+
+ unwrap(task.poll_unpin(cx));
+ // make sure receivers live until end of test to prevent send errors
+ drop(left_recv);
+ drop(right_recv);
+ })
+}
+
+#[test]
+fn sink_map_err() {
+ {
+ let cx = &mut panic_context();
+ let (tx, _rx) = mpsc::channel(1);
+ let mut tx = tx.sink_map_err(|_| ());
+ assert_eq!(Pin::new(&mut tx).start_send(()), Ok(()));
+ assert_eq!(Pin::new(&mut tx).poll_flush(cx), Poll::Ready(Ok(())));
+ }
+
+ let tx = mpsc::channel(0).0;
+ assert_eq!(
+ Pin::new(&mut tx.sink_map_err(|_| ())).start_send(()),
+ Err(())
+ );
+}
+
+#[derive(Copy, Clone, Debug, PartialEq, Eq)]
+struct ErrIntoTest;
+
+impl From<mpsc::SendError> for ErrIntoTest {
+ fn from(_: mpsc::SendError) -> Self {
+ Self
+ }
+}
+
+#[test]
+fn err_into() {
+ {
+ let cx = &mut panic_context();
+ let (tx, _rx) = mpsc::channel(1);
+ let mut tx: SinkErrInto<mpsc::Sender<()>, _, ErrIntoTest> = tx.sink_err_into();
+ assert_eq!(Pin::new(&mut tx).start_send(()), Ok(()));
+ assert_eq!(Pin::new(&mut tx).poll_flush(cx), Poll::Ready(Ok(())));
+ }
+
+ let tx = mpsc::channel(0).0;
+ assert_eq!(
+ Pin::new(&mut tx.sink_err_into()).start_send(()),
+ Err(ErrIntoTest)
+ );
+}
diff --git a/tests/sink_fanout.rs b/tests/sink_fanout.rs
new file mode 100644
index 0000000..e57b2d8
--- /dev/null
+++ b/tests/sink_fanout.rs
@@ -0,0 +1,24 @@
+use futures::channel::mpsc;
+use futures::executor::block_on;
+use futures::future::join3;
+use futures::sink::SinkExt;
+use futures::stream::{self, StreamExt};
+
+#[test]
+fn it_works() {
+ let (tx1, rx1) = mpsc::channel(1);
+ let (tx2, rx2) = mpsc::channel(2);
+ let tx = tx1.fanout(tx2).sink_map_err(|_| ());
+
+ let src = stream::iter((0..10).map(Ok));
+ let fwd = src.forward(tx);
+
+ let collect_fut1 = rx1.collect::<Vec<_>>();
+ let collect_fut2 = rx2.collect::<Vec<_>>();
+ let (_, vec1, vec2) = block_on(join3(fwd, collect_fut1, collect_fut2));
+
+ let expected = (0..10).collect::<Vec<_>>();
+
+ assert_eq!(vec1, expected);
+ assert_eq!(vec2, expected);
+}
diff --git a/tests/split.rs b/tests/split.rs
new file mode 100644
index 0000000..9f4f1a0
--- /dev/null
+++ b/tests/split.rs
@@ -0,0 +1,77 @@
+use futures::executor::block_on;
+use futures::sink::{Sink, SinkExt};
+use futures::stream::{self, Stream, StreamExt};
+use futures::task::{Context, Poll};
+use pin_utils::unsafe_pinned;
+use std::pin::Pin;
+
+struct Join<T, U> {
+ stream: T,
+ sink: U
+}
+
+impl<T, U> Join<T, U> {
+ unsafe_pinned!(stream: T);
+ unsafe_pinned!(sink: U);
+}
+
+impl<T: Stream, U> Stream for Join<T, U> {
+ type Item = T::Item;
+
+ fn poll_next(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<T::Item>> {
+ self.stream().poll_next(cx)
+ }
+}
+
+impl<T, U: Sink<Item>, Item> Sink<Item> for Join<T, U> {
+ type Error = U::Error;
+
+ fn poll_ready(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Result<(), Self::Error>> {
+ self.sink().poll_ready(cx)
+ }
+
+ fn start_send(
+ self: Pin<&mut Self>,
+ item: Item,
+ ) -> Result<(), Self::Error> {
+ self.sink().start_send(item)
+ }
+
+ fn poll_flush(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Result<(), Self::Error>> {
+ self.sink().poll_flush(cx)
+ }
+
+ fn poll_close(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Result<(), Self::Error>> {
+ self.sink().poll_close(cx)
+ }
+}
+
+#[test]
+fn test_split() {
+ let mut dest: Vec<i32> = Vec::new();
+ {
+ let join = Join {
+ stream: stream::iter(vec![10, 20, 30]),
+ sink: &mut dest
+ };
+
+ let (sink, stream) = join.split();
+ let join = sink.reunite(stream).expect("test_split: reunite error");
+ let (mut sink, stream) = join.split();
+ let mut stream = stream.map(Ok);
+ block_on(sink.send_all(&mut stream)).unwrap();
+ }
+ assert_eq!(dest, vec![10, 20, 30]);
+}
diff --git a/tests/stream.rs b/tests/stream.rs
new file mode 100644
index 0000000..fd6a8b6
--- /dev/null
+++ b/tests/stream.rs
@@ -0,0 +1,32 @@
+use futures::executor::block_on;
+use futures::stream::{self, StreamExt};
+
+#[test]
+fn select() {
+ fn select_and_compare(a: Vec<u32>, b: Vec<u32>, expected: Vec<u32>) {
+ let a = stream::iter(a);
+ let b = stream::iter(b);
+ let vec = block_on(stream::select(a, b).collect::<Vec<_>>());
+ assert_eq!(vec, expected);
+ }
+
+ select_and_compare(vec![1, 2, 3], vec![4, 5, 6], vec![1, 4, 2, 5, 3, 6]);
+ select_and_compare(vec![1, 2, 3], vec![4, 5], vec![1, 4, 2, 5, 3]);
+ select_and_compare(vec![1, 2], vec![4, 5, 6], vec![1, 4, 2, 5, 6]);
+}
+
+#[test]
+fn scan() {
+ futures::executor::block_on(async {
+ assert_eq!(
+ stream::iter(vec![1u8, 2, 3, 4, 6, 8, 2])
+ .scan(1, |acc, e| {
+ *acc += 1;
+ futures::future::ready(if e < *acc { Some(e) } else { None })
+ })
+ .collect::<Vec<_>>()
+ .await,
+ vec![1u8, 2, 3, 4]
+ );
+ });
+}
diff --git a/tests/stream_catch_unwind.rs b/tests/stream_catch_unwind.rs
new file mode 100644
index 0000000..8b23a0a
--- /dev/null
+++ b/tests/stream_catch_unwind.rs
@@ -0,0 +1,27 @@
+use futures::executor::block_on_stream;
+use futures::stream::{self, StreamExt};
+
+#[test]
+fn panic_in_the_middle_of_the_stream() {
+ let stream = stream::iter(vec![Some(10), None, Some(11)]);
+
+ // panic on second element
+ let stream_panicking = stream.map(|o| o.unwrap());
+ let mut iter = block_on_stream(stream_panicking.catch_unwind());
+
+ assert_eq!(10, iter.next().unwrap().ok().unwrap());
+ assert!(iter.next().unwrap().is_err());
+ assert!(iter.next().is_none());
+}
+
+#[test]
+fn no_panic() {
+ let stream = stream::iter(vec![10, 11, 12]);
+
+ let mut iter = block_on_stream(stream.catch_unwind());
+
+ assert_eq!(10, iter.next().unwrap().ok().unwrap());
+ assert_eq!(11, iter.next().unwrap().ok().unwrap());
+ assert_eq!(12, iter.next().unwrap().ok().unwrap());
+ assert!(iter.next().is_none());
+}
diff --git a/tests/stream_into_async_read.rs b/tests/stream_into_async_read.rs
new file mode 100644
index 0000000..c528af0
--- /dev/null
+++ b/tests/stream_into_async_read.rs
@@ -0,0 +1,96 @@
+use core::pin::Pin;
+use futures::io::{AsyncRead, AsyncBufRead};
+use futures::stream::{self, TryStreamExt};
+use futures::task::Poll;
+use futures_test::{task::noop_context, stream::StreamTestExt};
+
+macro_rules! assert_read {
+ ($reader:expr, $buf:expr, $item:expr) => {
+ let mut cx = noop_context();
+ loop {
+ match Pin::new(&mut $reader).poll_read(&mut cx, $buf) {
+ Poll::Ready(Ok(x)) => {
+ assert_eq!(x, $item);
+ break;
+ }
+ Poll::Ready(Err(err)) => {
+ panic!("assertion failed: expected value but got {}", err);
+ }
+ Poll::Pending => {
+ continue;
+ }
+ }
+ }
+ };
+}
+
+macro_rules! assert_fill_buf {
+ ($reader:expr, $buf:expr) => {
+ let mut cx = noop_context();
+ loop {
+ match Pin::new(&mut $reader).poll_fill_buf(&mut cx) {
+ Poll::Ready(Ok(x)) => {
+ assert_eq!(x, $buf);
+ break;
+ }
+ Poll::Ready(Err(err)) => {
+ panic!("assertion failed: expected value but got {}", err);
+ }
+ Poll::Pending => {
+ continue;
+ }
+ }
+ }
+ };
+}
+
+#[test]
+fn test_into_async_read() {
+ let stream = stream::iter((1..=3).flat_map(|_| vec![Ok(vec![]), Ok(vec![1, 2, 3, 4, 5])]));
+ let mut reader = stream.interleave_pending().into_async_read();
+ let mut buf = vec![0; 3];
+
+ assert_read!(reader, &mut buf, 3);
+ assert_eq!(&buf, &[1, 2, 3]);
+
+ assert_read!(reader, &mut buf, 2);
+ assert_eq!(&buf[..2], &[4, 5]);
+
+ assert_read!(reader, &mut buf, 3);
+ assert_eq!(&buf, &[1, 2, 3]);
+
+ assert_read!(reader, &mut buf, 2);
+ assert_eq!(&buf[..2], &[4, 5]);
+
+ assert_read!(reader, &mut buf, 3);
+ assert_eq!(&buf, &[1, 2, 3]);
+
+ assert_read!(reader, &mut buf, 2);
+ assert_eq!(&buf[..2], &[4, 5]);
+
+ assert_read!(reader, &mut buf, 0);
+}
+
+#[test]
+fn test_into_async_bufread() -> std::io::Result<()> {
+ let stream = stream::iter((1..=2).flat_map(|_| vec![Ok(vec![]), Ok(vec![1, 2, 3, 4, 5])]));
+ let mut reader = stream.interleave_pending().into_async_read();
+
+ let mut reader = Pin::new(&mut reader);
+
+ assert_fill_buf!(reader, &[1, 2, 3, 4, 5][..]);
+ reader.as_mut().consume(3);
+
+ assert_fill_buf!(reader, &[4, 5][..]);
+ reader.as_mut().consume(2);
+
+ assert_fill_buf!(reader, &[1, 2, 3, 4, 5][..]);
+ reader.as_mut().consume(2);
+
+ assert_fill_buf!(reader, &[3, 4, 5][..]);
+ reader.as_mut().consume(3);
+
+ assert_fill_buf!(reader, &[][..]);
+
+ Ok(())
+}
diff --git a/tests/stream_peekable.rs b/tests/stream_peekable.rs
new file mode 100644
index 0000000..b65a057
--- /dev/null
+++ b/tests/stream_peekable.rs
@@ -0,0 +1,13 @@
+use futures::executor::block_on;
+use futures::pin_mut;
+use futures::stream::{self, Peekable, StreamExt};
+
+#[test]
+fn peekable() {
+ block_on(async {
+ let peekable: Peekable<_> = stream::iter(vec![1u8, 2, 3]).peekable();
+ pin_mut!(peekable);
+ assert_eq!(peekable.as_mut().peek().await, Some(&1u8));
+ assert_eq!(peekable.collect::<Vec<u8>>().await, vec![1, 2, 3]);
+ });
+}
diff --git a/tests/stream_select_all.rs b/tests/stream_select_all.rs
new file mode 100644
index 0000000..eb711dd
--- /dev/null
+++ b/tests/stream_select_all.rs
@@ -0,0 +1,78 @@
+use futures::channel::mpsc;
+use futures::executor::block_on_stream;
+use futures::future::{self, FutureExt};
+use futures::stream::{self, select_all, FusedStream, SelectAll, StreamExt};
+use futures::task::Poll;
+use futures_test::task::noop_context;
+
+#[test]
+fn is_terminated() {
+ let mut cx = noop_context();
+ let mut tasks = SelectAll::new();
+
+ assert_eq!(tasks.is_terminated(), false);
+ assert_eq!(tasks.poll_next_unpin(&mut cx), Poll::Ready(None));
+ assert_eq!(tasks.is_terminated(), true);
+
+ // Test that the sentinel value doesn't leak
+ assert_eq!(tasks.is_empty(), true);
+ assert_eq!(tasks.len(), 0);
+
+ tasks.push(future::ready(1).into_stream());
+
+ assert_eq!(tasks.is_empty(), false);
+ assert_eq!(tasks.len(), 1);
+
+ assert_eq!(tasks.is_terminated(), false);
+ assert_eq!(tasks.poll_next_unpin(&mut cx), Poll::Ready(Some(1)));
+ assert_eq!(tasks.is_terminated(), false);
+ assert_eq!(tasks.poll_next_unpin(&mut cx), Poll::Ready(None));
+ assert_eq!(tasks.is_terminated(), true);
+}
+
+#[test]
+fn issue_1626() {
+ let a = stream::iter(0..=2);
+ let b = stream::iter(10..=14);
+
+ let mut s = block_on_stream(stream::select_all(vec![a, b]));
+
+ assert_eq!(s.next(), Some(0));
+ assert_eq!(s.next(), Some(10));
+ assert_eq!(s.next(), Some(1));
+ assert_eq!(s.next(), Some(11));
+ assert_eq!(s.next(), Some(2));
+ assert_eq!(s.next(), Some(12));
+ assert_eq!(s.next(), Some(13));
+ assert_eq!(s.next(), Some(14));
+ assert_eq!(s.next(), None);
+}
+
+#[test]
+fn works_1() {
+ let (a_tx, a_rx) = mpsc::unbounded::<u32>();
+ let (b_tx, b_rx) = mpsc::unbounded::<u32>();
+ let (c_tx, c_rx) = mpsc::unbounded::<u32>();
+
+ let streams = vec![a_rx, b_rx, c_rx];
+
+ let mut stream = block_on_stream(select_all(streams));
+
+ b_tx.unbounded_send(99).unwrap();
+ a_tx.unbounded_send(33).unwrap();
+ assert_eq!(Some(33), stream.next());
+ assert_eq!(Some(99), stream.next());
+
+ b_tx.unbounded_send(99).unwrap();
+ a_tx.unbounded_send(33).unwrap();
+ assert_eq!(Some(33), stream.next());
+ assert_eq!(Some(99), stream.next());
+
+ c_tx.unbounded_send(42).unwrap();
+ assert_eq!(Some(42), stream.next());
+ a_tx.unbounded_send(43).unwrap();
+ assert_eq!(Some(43), stream.next());
+
+ drop((a_tx, b_tx, c_tx));
+ assert_eq!(None, stream.next());
+}
diff --git a/tests/stream_select_next_some.rs b/tests/stream_select_next_some.rs
new file mode 100644
index 0000000..09d7e89
--- /dev/null
+++ b/tests/stream_select_next_some.rs
@@ -0,0 +1,85 @@
+use futures::{future, select};
+use futures::future::{FusedFuture, FutureExt};
+use futures::stream::{FuturesUnordered, StreamExt};
+use futures::task::{Context, Poll};
+use futures_test::future::FutureTestExt;
+use futures_test::task::new_count_waker;
+
+#[test]
+fn is_terminated() {
+ let (waker, counter) = new_count_waker();
+ let mut cx = Context::from_waker(&waker);
+
+ let mut tasks = FuturesUnordered::new();
+
+ let mut select_next_some = tasks.select_next_some();
+ assert_eq!(select_next_some.is_terminated(), false);
+ assert_eq!(select_next_some.poll_unpin(&mut cx), Poll::Pending);
+ assert_eq!(counter, 1);
+ assert_eq!(select_next_some.is_terminated(), true);
+ drop(select_next_some);
+
+ tasks.push(future::ready(1));
+
+ let mut select_next_some = tasks.select_next_some();
+ assert_eq!(select_next_some.is_terminated(), false);
+ assert_eq!(select_next_some.poll_unpin(&mut cx), Poll::Ready(1));
+ assert_eq!(select_next_some.is_terminated(), false);
+ assert_eq!(select_next_some.poll_unpin(&mut cx), Poll::Pending);
+ assert_eq!(select_next_some.is_terminated(), true);
+}
+
+#[test]
+fn select() {
+ // Checks that even though `async_tasks` will yield a `None` and return
+ // `is_terminated() == true` during the first poll, it manages to toggle
+ // back to having items after a future is pushed into it during the second
+ // poll (after pending_once completes).
+ futures::executor::block_on(async {
+ let mut fut = future::ready(1).pending_once();
+ let mut async_tasks = FuturesUnordered::new();
+ let mut total = 0;
+ loop {
+ select! {
+ num = fut => {
+ total += num;
+ async_tasks.push(async { 5 });
+ },
+ num = async_tasks.select_next_some() => {
+ total += num;
+ }
+ complete => break,
+ }
+ }
+ assert_eq!(total, 6);
+ });
+}
+
+// Check that `select!` macro does not fail when importing from `futures_util`.
+#[test]
+fn futures_util_select() {
+ use futures_util::select;
+
+ // Checks that even though `async_tasks` will yield a `None` and return
+ // `is_terminated() == true` during the first poll, it manages to toggle
+ // back to having items after a future is pushed into it during the second
+ // poll (after pending_once completes).
+ futures::executor::block_on(async {
+ let mut fut = future::ready(1).pending_once();
+ let mut async_tasks = FuturesUnordered::new();
+ let mut total = 0;
+ loop {
+ select! {
+ num = fut => {
+ total += num;
+ async_tasks.push(async { 5 });
+ },
+ num = async_tasks.select_next_some() => {
+ total += num;
+ }
+ complete => break,
+ }
+ }
+ assert_eq!(total, 6);
+ });
+}
diff --git a/tests/try_join.rs b/tests/try_join.rs
new file mode 100644
index 0000000..6c6d084
--- /dev/null
+++ b/tests/try_join.rs
@@ -0,0 +1,36 @@
+#![deny(unreachable_code)]
+
+use futures::{try_join, executor::block_on};
+
+// TODO: This abuses https://github.com/rust-lang/rust/issues/58733 in order to
+// test behaviour of the `try_join!` macro with the never type before it is
+// stabilized. Once `!` is again stabilized this can be removed and replaced
+// with direct use of `!` below where `Never` is used.
+trait MyTrait {
+ type Output;
+}
+impl<T> MyTrait for fn() -> T {
+ type Output = T;
+}
+type Never = <fn() -> ! as MyTrait>::Output;
+
+
+#[test]
+fn try_join_never_error() {
+ block_on(async {
+ let future1 = async { Ok::<(), Never>(()) };
+ let future2 = async { Ok::<(), Never>(()) };
+ try_join!(future1, future2)
+ })
+ .unwrap();
+}
+
+#[test]
+fn try_join_never_ok() {
+ block_on(async {
+ let future1 = async { Err::<Never, ()>(()) };
+ let future2 = async { Err::<Never, ()>(()) };
+ try_join!(future1, future2)
+ })
+ .unwrap_err();
+}
diff --git a/tests/try_join_all.rs b/tests/try_join_all.rs
new file mode 100644
index 0000000..662b866
--- /dev/null
+++ b/tests/try_join_all.rs
@@ -0,0 +1,44 @@
+use futures_util::future::*;
+use std::future::Future;
+use futures::executor::block_on;
+use std::fmt::Debug;
+
+fn assert_done<T, F>(actual_fut: F, expected: T)
+where
+ T: PartialEq + Debug,
+ F: FnOnce() -> Box<dyn Future<Output = T> + Unpin>,
+{
+ let output = block_on(actual_fut());
+ assert_eq!(output, expected);
+}
+
+#[test]
+fn collect_collects() {
+ assert_done(|| Box::new(try_join_all(vec![ok(1), ok(2)])), Ok::<_, usize>(vec![1, 2]));
+ assert_done(|| Box::new(try_join_all(vec![ok(1), err(2)])), Err(2));
+ assert_done(|| Box::new(try_join_all(vec![ok(1)])), Ok::<_, usize>(vec![1]));
+ // REVIEW: should this be implemented?
+ // assert_done(|| Box::new(try_join_all(Vec::<i32>::new())), Ok(vec![]));
+
+ // TODO: needs more tests
+}
+
+#[test]
+fn try_join_all_iter_lifetime() {
+ // In futures-rs version 0.1, this function would fail to typecheck due to an overly
+ // conservative type parameterization of `TryJoinAll`.
+ fn sizes<'a>(bufs: Vec<&'a [u8]>) -> Box<dyn Future<Output = Result<Vec<usize>, ()>> + Unpin> {
+ let iter = bufs.into_iter().map(|b| ok::<usize, ()>(b.len()));
+ Box::new(try_join_all(iter))
+ }
+
+ assert_done(|| sizes(vec![&[1,2,3], &[], &[0]]), Ok(vec![3 as usize, 0, 1]));
+}
+
+#[test]
+fn try_join_all_from_iter() {
+ assert_done(
+ || Box::new(vec![ok(1), ok(2)].into_iter().collect::<TryJoinAll<_>>()),
+ Ok::<_, usize>(vec![1, 2]),
+ )
+}
diff --git a/tests/unfold.rs b/tests/unfold.rs
new file mode 100644
index 0000000..95722cf
--- /dev/null
+++ b/tests/unfold.rs
@@ -0,0 +1,35 @@
+use futures::future;
+use futures::stream;
+
+use futures_test::future::FutureTestExt;
+use futures_test::{
+ assert_stream_done, assert_stream_next, assert_stream_pending,
+};
+
+#[test]
+fn unfold1() {
+ let mut stream = stream::unfold(0, |state| {
+ if state <= 2 {
+ future::ready(Some((state * 2, state + 1))).pending_once()
+ } else {
+ future::ready(None).pending_once()
+ }
+ });
+
+ // Creates the future with the closure
+ // Not ready (delayed future)
+ assert_stream_pending!(stream);
+ // Future is ready, yields the item
+ assert_stream_next!(stream, 0);
+
+ // Repeat
+ assert_stream_pending!(stream);
+ assert_stream_next!(stream, 2);
+
+ assert_stream_pending!(stream);
+ assert_stream_next!(stream, 4);
+
+ // No more items
+ assert_stream_pending!(stream);
+ assert_stream_done!(stream);
+}
diff --git a/tests_disabled/all.rs b/tests_disabled/all.rs
new file mode 100644
index 0000000..6c7e11c
--- /dev/null
+++ b/tests_disabled/all.rs
@@ -0,0 +1,351 @@
+use futures::future;
+use futures::executor::block_on;
+use futures::channel::oneshot::{self, Canceled};
+use std::sync::mpsc::{channel, TryRecvError};
+
+mod support;
+use support::*;
+
+fn unselect<T, E, A, B>(r: Result<Either<(T, B), (T, A)>, Either<(E, B), (E, A)>>) -> Result<T, E> {
+ match r {
+ Ok(Either::Left((t, _))) |
+ Ok(Either::Right((t, _))) => Ok(t),
+ Err(Either::Left((e, _))) |
+ Err(Either::Right((e, _))) => Err(e),
+ }
+}
+
+#[test]
+fn result_smoke() {
+ fn is_future_v<A, B, C>(_: C)
+ where A: Send + 'static,
+ B: Send + 'static,
+ C: Future<Item=A, Error=B>
+ {}
+
+ is_future_v::<i32, u32, _>(f_ok(1).map(|a| a + 1));
+ is_future_v::<i32, u32, _>(f_ok(1).map_err(|a| a + 1));
+ is_future_v::<i32, u32, _>(f_ok(1).and_then(Ok));
+ is_future_v::<i32, u32, _>(f_ok(1).or_else(Err));
+ is_future_v::<(i32, i32), u32, _>(f_ok(1).join(Err(3)));
+ is_future_v::<i32, u32, _>(f_ok(1).map(f_ok).flatten());
+
+ assert_done(|| f_ok(1), r_ok(1));
+ assert_done(|| f_err(1), r_err(1));
+ assert_done(|| result(Ok(1)), r_ok(1));
+ assert_done(|| result(Err(1)), r_err(1));
+ assert_done(|| ok(1), r_ok(1));
+ assert_done(|| err(1), r_err(1));
+ assert_done(|| f_ok(1).map(|a| a + 2), r_ok(3));
+ assert_done(|| f_err(1).map(|a| a + 2), r_err(1));
+ assert_done(|| f_ok(1).map_err(|a| a + 2), r_ok(1));
+ assert_done(|| f_err(1).map_err(|a| a + 2), r_err(3));
+ assert_done(|| f_ok(1).and_then(|a| Ok(a + 2)), r_ok(3));
+ assert_done(|| f_err(1).and_then(|a| Ok(a + 2)), r_err(1));
+ assert_done(|| f_ok(1).and_then(|a| Err(a as u32 + 3)), r_err(4));
+ assert_done(|| f_err(1).and_then(|a| Err(a as u32 + 4)), r_err(1));
+ assert_done(|| f_ok(1).or_else(|a| Ok(a as i32 + 2)), r_ok(1));
+ assert_done(|| f_err(1).or_else(|a| Ok(a as i32 + 2)), r_ok(3));
+ assert_done(|| f_ok(1).or_else(|a| Err(a + 3)), r_ok(1));
+ assert_done(|| f_err(1).or_else(|a| Err(a + 4)), r_err(5));
+ assert_done(|| f_ok(1).select(f_err(2)).then(unselect), r_ok(1));
+ assert_done(|| f_ok(1).select(Ok(2)).then(unselect), r_ok(1));
+ assert_done(|| f_err(1).select(f_ok(1)).then(unselect), r_err(1));
+ assert_done(|| f_ok(1).select(empty()).then(unselect), Ok(1));
+ assert_done(|| empty().select(f_ok(1)).then(unselect), Ok(1));
+ assert_done(|| f_ok(1).join(f_err(1)), Err(1));
+ assert_done(|| f_ok(1).join(Ok(2)), Ok((1, 2)));
+ assert_done(|| f_err(1).join(f_ok(1)), Err(1));
+ assert_done(|| f_ok(1).then(|_| Ok(2)), r_ok(2));
+ assert_done(|| f_ok(1).then(|_| Err(2)), r_err(2));
+ assert_done(|| f_err(1).then(|_| Ok(2)), r_ok(2));
+ assert_done(|| f_err(1).then(|_| Err(2)), r_err(2));
+}
+
+#[test]
+fn test_empty() {
+ fn empty() -> Empty<i32, u32> { future::empty() }
+
+ assert_empty(|| empty());
+ assert_empty(|| empty().select(empty()));
+ assert_empty(|| empty().join(empty()));
+ assert_empty(|| empty().join(f_ok(1)));
+ assert_empty(|| f_ok(1).join(empty()));
+ assert_empty(|| empty().or_else(move |_| empty()));
+ assert_empty(|| empty().and_then(move |_| empty()));
+ assert_empty(|| f_err(1).or_else(move |_| empty()));
+ assert_empty(|| f_ok(1).and_then(move |_| empty()));
+ assert_empty(|| empty().map(|a| a + 1));
+ assert_empty(|| empty().map_err(|a| a + 1));
+ assert_empty(|| empty().then(|a| a));
+}
+
+#[test]
+fn test_ok() {
+ assert_done(|| ok(1), r_ok(1));
+ assert_done(|| err(1), r_err(1));
+}
+
+#[test]
+fn flatten() {
+ fn ok<T: Send + 'static>(a: T) -> FutureResult<T, u32> {
+ future::ok(a)
+ }
+ fn err<E: Send + 'static>(b: E) -> FutureResult<i32, E> {
+ future::err(b)
+ }
+
+ assert_done(|| ok(ok(1)).flatten(), r_ok(1));
+ assert_done(|| ok(err(1)).flatten(), r_err(1));
+ assert_done(|| err(1u32).map(ok).flatten(), r_err(1));
+ assert_done(|| future::ok(future::ok(1)).flatten(), r_ok(1));
+ assert_empty(|| ok(empty::<i32, u32>()).flatten());
+ assert_empty(|| empty::<i32, u32>().map(ok).flatten());
+}
+
+#[test]
+fn smoke_oneshot() {
+ assert_done(|| {
+ let (c, p) = oneshot::channel();
+ c.send(1).unwrap();
+ p
+ }, Ok(1));
+ assert_done(|| {
+ let (c, p) = oneshot::channel::<i32>();
+ drop(c);
+ p
+ }, Err(Canceled));
+ let mut completes = Vec::new();
+ assert_empty(|| {
+ let (a, b) = oneshot::channel::<i32>();
+ completes.push(a);
+ b
+ });
+
+ let (c, mut p) = oneshot::channel::<i32>();
+ drop(c);
+ let res = panic_waker_lw(|lw| p.poll(lw));
+ assert!(res.is_err());
+ let (c, p) = oneshot::channel::<i32>();
+ drop(c);
+ let (tx, rx) = channel();
+ p.then(move |_| {
+ tx.send(())
+ }).forget();
+ rx.recv().unwrap();
+}
+
+#[test]
+fn select_cancels() {
+ let ((a, b), (c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>());
+ let ((btx, brx), (dtx, drx)) = (channel(), channel());
+ let b = b.map(move |b| { btx.send(b).unwrap(); b });
+ let d = d.map(move |d| { dtx.send(d).unwrap(); d });
+
+ let mut f = b.select(d).then(unselect);
+ // assert!(f.poll(&mut Task::new()).is_pending());
+ assert!(brx.try_recv().is_err());
+ assert!(drx.try_recv().is_err());
+ a.send(1).unwrap();
+ noop_waker_lw(|lw| {
+ let res = f.poll(lw);
+ assert!(res.ok().unwrap().is_ready());
+ assert_eq!(brx.recv().unwrap(), 1);
+ drop(c);
+ assert!(drx.recv().is_err());
+
+ let ((a, b), (c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>());
+ let ((btx, _brx), (dtx, drx)) = (channel(), channel());
+ let b = b.map(move |b| { btx.send(b).unwrap(); b });
+ let d = d.map(move |d| { dtx.send(d).unwrap(); d });
+
+ let mut f = b.select(d).then(unselect);
+ assert!(f.poll(lw).ok().unwrap().is_pending());
+ assert!(f.poll(lw).ok().unwrap().is_pending());
+ a.send(1).unwrap();
+ assert!(f.poll(lw).ok().unwrap().is_ready());
+ drop((c, f));
+ assert!(drx.recv().is_err());
+ })
+}
+
+#[test]
+fn join_cancels() {
+ let ((a, b), (c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>());
+ let ((btx, _brx), (dtx, drx)) = (channel(), channel());
+ let b = b.map(move |b| { btx.send(b).unwrap(); b });
+ let d = d.map(move |d| { dtx.send(d).unwrap(); d });
+
+ let mut f = b.join(d);
+ drop(a);
+ let res = panic_waker_lw(|lw| f.poll(lw));
+ assert!(res.is_err());
+ drop(c);
+ assert!(drx.recv().is_err());
+
+ let ((a, b), (c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>());
+ let ((btx, _brx), (dtx, drx)) = (channel(), channel());
+ let b = b.map(move |b| { btx.send(b).unwrap(); b });
+ let d = d.map(move |d| { dtx.send(d).unwrap(); d });
+
+ let (tx, rx) = channel();
+ let f = b.join(d);
+ f.then(move |_| {
+ tx.send(()).unwrap();
+ let res: Result<(), ()> = Ok(());
+ res
+ }).forget();
+ assert!(rx.try_recv().is_err());
+ drop(a);
+ rx.recv().unwrap();
+ drop(c);
+ assert!(drx.recv().is_err());
+}
+
+#[test]
+fn join_incomplete() {
+ let (a, b) = oneshot::channel::<i32>();
+ let (tx, rx) = channel();
+ noop_waker_lw(|lw| {
+ let mut f = ok(1).join(b).map(move |r| tx.send(r).unwrap());
+ assert!(f.poll(lw).ok().unwrap().is_pending());
+ assert!(rx.try_recv().is_err());
+ a.send(2).unwrap();
+ assert!(f.poll(lw).ok().unwrap().is_ready());
+ assert_eq!(rx.recv().unwrap(), (1, 2));
+
+ let (a, b) = oneshot::channel::<i32>();
+ let (tx, rx) = channel();
+ let mut f = b.join(Ok(2)).map(move |r| tx.send(r).unwrap());
+ assert!(f.poll(lw).ok().unwrap().is_pending());
+ assert!(rx.try_recv().is_err());
+ a.send(1).unwrap();
+ assert!(f.poll(lw).ok().unwrap().is_ready());
+ assert_eq!(rx.recv().unwrap(), (1, 2));
+
+ let (a, b) = oneshot::channel::<i32>();
+ let (tx, rx) = channel();
+ let mut f = ok(1).join(b).map_err(move |_r| tx.send(2).unwrap());
+ assert!(f.poll(lw).ok().unwrap().is_pending());
+ assert!(rx.try_recv().is_err());
+ drop(a);
+ assert!(f.poll(lw).is_err());
+ assert_eq!(rx.recv().unwrap(), 2);
+
+ let (a, b) = oneshot::channel::<i32>();
+ let (tx, rx) = channel();
+ let mut f = b.join(Ok(2)).map_err(move |_r| tx.send(1).unwrap());
+ assert!(f.poll(lw).ok().unwrap().is_pending());
+ assert!(rx.try_recv().is_err());
+ drop(a);
+ assert!(f.poll(lw).is_err());
+ assert_eq!(rx.recv().unwrap(), 1);
+ })
+}
+
+
+#[test]
+fn select2() {
+ assert_done(|| f_ok(2).select(empty()).then(unselect), Ok(2));
+ assert_done(|| empty().select(f_ok(2)).then(unselect), Ok(2));
+ assert_done(|| f_err(2).select(empty()).then(unselect), Err(2));
+ assert_done(|| empty().select(f_err(2)).then(unselect), Err(2));
+
+ assert_done(|| {
+ f_ok(1).select(f_ok(2))
+ .map_err(|_| 0)
+ .and_then(|either_tup| {
+ let (a, b) = either_tup.into_inner();
+ b.map(move |b| a + b)
+ })
+ }, Ok(3));
+
+ // Finish one half of a select and then fail the second, ensuring that we
+ // get the notification of the second one.
+ {
+ let ((a, b), (c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>());
+ let f = b.select(d);
+ let (tx, rx) = channel();
+ f.map(move |r| tx.send(r).unwrap()).forget();
+ a.send(1).unwrap();
+ let (val, next) = rx.recv().unwrap().into_inner();
+ assert_eq!(val, 1);
+ let (tx, rx) = channel();
+ next.map_err(move |_r| tx.send(2).unwrap()).forget();
+ assert_eq!(rx.try_recv().err().unwrap(), TryRecvError::Empty);
+ drop(c);
+ assert_eq!(rx.recv().unwrap(), 2);
+ }
+
+ // Fail the second half and ensure that we see the first one finish
+ {
+ let ((a, b), (c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>());
+ let f = b.select(d);
+ let (tx, rx) = channel();
+ f.map_err(move |r| tx.send((1, r.into_inner().1)).unwrap()).forget();
+ drop(c);
+ let (val, next) = rx.recv().unwrap();
+ assert_eq!(val, 1);
+ let (tx, rx) = channel();
+ next.map(move |r| tx.send(r).unwrap()).forget();
+ assert_eq!(rx.try_recv().err().unwrap(), TryRecvError::Empty);
+ a.send(2).unwrap();
+ assert_eq!(rx.recv().unwrap(), 2);
+ }
+
+ // Cancelling the first half should cancel the second
+ {
+ let ((_a, b), (_c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>());
+ let ((btx, brx), (dtx, drx)) = (channel(), channel());
+ let b = b.map(move |v| { btx.send(v).unwrap(); v });
+ let d = d.map(move |v| { dtx.send(v).unwrap(); v });
+ let f = b.select(d);
+ drop(f);
+ assert!(drx.recv().is_err());
+ assert!(brx.recv().is_err());
+ }
+
+ // Cancel after a schedule
+ {
+ let ((_a, b), (_c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>());
+ let ((btx, brx), (dtx, drx)) = (channel(), channel());
+ let b = b.map(move |v| { btx.send(v).unwrap(); v });
+ let d = d.map(move |v| { dtx.send(v).unwrap(); v });
+ let mut f = b.select(d);
+ let _res = noop_waker_lw(|lw| f.poll(lw));
+ drop(f);
+ assert!(drx.recv().is_err());
+ assert!(brx.recv().is_err());
+ }
+
+ // Cancel propagates
+ {
+ let ((a, b), (_c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>());
+ let ((btx, brx), (dtx, drx)) = (channel(), channel());
+ let b = b.map(move |v| { btx.send(v).unwrap(); v });
+ let d = d.map(move |v| { dtx.send(v).unwrap(); v });
+ let (tx, rx) = channel();
+ b.select(d).map(move |_| tx.send(()).unwrap()).forget();
+ drop(a);
+ assert!(drx.recv().is_err());
+ assert!(brx.recv().is_err());
+ assert!(rx.recv().is_err());
+ }
+
+ // Cancel on early drop
+ {
+ let (tx, rx) = channel();
+ let f = f_ok(1).select(empty::<_, ()>().map(move |()| {
+ tx.send(()).unwrap();
+ 1
+ }));
+ drop(f);
+ assert!(rx.recv().is_err());
+ }
+}
+
+#[test]
+fn option() {
+ assert_eq!(Ok(Some(())), block_on(Some(ok::<(), ()>(())).into_future()));
+ assert_eq!(Ok::<_, ()>(None::<()>), block_on(None::<FutureResult<(), ()>>.into_future()));
+}
diff --git a/tests_disabled/bilock.rs b/tests_disabled/bilock.rs
new file mode 100644
index 0000000..c1bc33f
--- /dev/null
+++ b/tests_disabled/bilock.rs
@@ -0,0 +1,105 @@
+use futures::task;
+use futures::stream;
+use futures::future;
+use futures_util::lock::BiLock;
+use std::thread;
+
+mod support;
+use support::*;
+
+#[test]
+fn smoke() {
+ let future = future::lazy(|_| {
+ let (a, b) = BiLock::new(1);
+
+ {
+ let mut lock = match a.poll_lock() {
+ Poll::Ready(l) => l,
+ Poll::Pending => panic!("poll not ready"),
+ };
+ assert_eq!(*lock, 1);
+ *lock = 2;
+
+ assert!(b.poll_lock().is_pending());
+ assert!(a.poll_lock().is_pending());
+ }
+
+ assert!(b.poll_lock().is_ready());
+ assert!(a.poll_lock().is_ready());
+
+ {
+ let lock = match b.poll_lock() {
+ Poll::Ready(l) => l,
+ Poll::Pending => panic!("poll not ready"),
+ };
+ assert_eq!(*lock, 2);
+ }
+
+ assert_eq!(a.reunite(b).expect("bilock/smoke: reunite error"), 2);
+
+ Ok::<(), ()>(())
+ });
+
+ assert!(task::spawn(future)
+ .poll_future_notify(&notify_noop(), 0)
+ .expect("failure in poll")
+ .is_ready());
+}
+
+#[test]
+fn concurrent() {
+ const N: usize = 10000;
+ let (a, b) = BiLock::new(0);
+
+ let a = Increment {
+ a: Some(a),
+ remaining: N,
+ };
+ let b = stream::iter_ok(0..N).fold(b, |b, _n| {
+ b.lock().map(|mut b| {
+ *b += 1;
+ b.unlock()
+ })
+ });
+
+ let t1 = thread::spawn(move || a.wait());
+ let b = b.wait().expect("b error");
+ let a = t1.join().unwrap().expect("a error");
+
+ match a.poll_lock() {
+ Poll::Ready(l) => assert_eq!(*l, 2 * N),
+ Poll::Pending => panic!("poll not ready"),
+ }
+ match b.poll_lock() {
+ Poll::Ready(l) => assert_eq!(*l, 2 * N),
+ Poll::Pending => panic!("poll not ready"),
+ }
+
+ assert_eq!(a.reunite(b).expect("bilock/concurrent: reunite error"), 2 * N);
+
+ struct Increment {
+ remaining: usize,
+ a: Option<BiLock<usize>>,
+ }
+
+ impl Future for Increment {
+ type Item = BiLock<usize>;
+ type Error = ();
+
+ fn poll(&mut self) -> Poll<BiLock<usize>, ()> {
+ loop {
+ if self.remaining == 0 {
+ return Ok(self.a.take().unwrap().into())
+ }
+
+ let a = self.a.as_ref().unwrap();
+ let mut a = match a.poll_lock() {
+ Poll::Ready(l) => l,
+ Poll::Pending => return Ok(Poll::Pending),
+ };
+ self.remaining -= 1;
+ *a += 1;
+ }
+ }
+ }
+}
diff --git a/tests_disabled/stream.rs b/tests_disabled/stream.rs
new file mode 100644
index 0000000..4eaf12e
--- /dev/null
+++ b/tests_disabled/stream.rs
@@ -0,0 +1,393 @@
+use futures::executor::{block_on, block_on_stream};
+use futures::future::{err, ok};
+use futures::stream::{empty, iter_ok, poll_fn, Peekable};
+use futures::channel::oneshot;
+use futures::channel::mpsc;
+
+mod support;
+use support::*;
+
+pub struct Iter<I> {
+ iter: I,
+}
+
+pub fn iter<J, T, E>(i: J) -> Iter<J::IntoIter>
+ where J: IntoIterator<Item=Result<T, E>>,
+{
+ Iter {
+ iter: i.into_iter(),
+ }
+}
+
+impl<I, T, E> Stream for Iter<I>
+ where I: Iterator<Item=Result<T, E>>,
+{
+ type Item = T;
+ type Error = E;
+
+ fn poll_next(&mut self, _: &mut Context<'_>) -> Poll<Option<T>, E> {
+ match self.iter.next() {
+ Some(Ok(e)) => Ok(Poll::Ready(Some(e))),
+ Some(Err(e)) => Err(e),
+ None => Ok(Poll::Ready(None)),
+ }
+ }
+}
+
+fn list() -> Box<Stream<Item=i32, Error=u32> + Send> {
+ let (tx, rx) = mpsc::channel(1);
+ tx.send(Ok(1))
+ .and_then(|tx| tx.send(Ok(2)))
+ .and_then(|tx| tx.send(Ok(3)))
+ .forget();
+ Box::new(rx.then(|r| r.unwrap()))
+}
+
+fn err_list() -> Box<Stream<Item=i32, Error=u32> + Send> {
+ let (tx, rx) = mpsc::channel(1);
+ tx.send(Ok(1))
+ .and_then(|tx| tx.send(Ok(2)))
+ .and_then(|tx| tx.send(Err(3)))
+ .forget();
+ Box::new(rx.then(|r| r.unwrap()))
+}
+
+#[test]
+fn map() {
+ assert_done(|| list().map(|a| a + 1).collect(), Ok(vec![2, 3, 4]));
+}
+
+#[test]
+fn map_err() {
+ assert_done(|| err_list().map_err(|a| a + 1).collect::<Vec<_>>(), Err(4));
+}
+
+#[derive(Copy, Clone, Debug, PartialEq, Eq)]
+struct FromErrTest(u32);
+
+impl From<u32> for FromErrTest {
+ fn from(i: u32) -> FromErrTest {
+ FromErrTest(i)
+ }
+}
+
+#[test]
+fn from_err() {
+ assert_done(|| err_list().err_into().collect::<Vec<_>>(), Err(FromErrTest(3)));
+}
+
+#[test]
+fn fold() {
+ assert_done(|| list().fold(0, |a, b| ok::<i32, u32>(a + b)), Ok(6));
+ assert_done(|| err_list().fold(0, |a, b| ok::<i32, u32>(a + b)), Err(3));
+}
+
+#[test]
+fn filter() {
+ assert_done(|| list().filter(|a| ok(*a % 2 == 0)).collect(), Ok(vec![2]));
+}
+
+#[test]
+fn filter_map() {
+ assert_done(|| list().filter_map(|x| {
+ ok(if x % 2 == 0 {
+ Some(x + 10)
+ } else {
+ None
+ })
+ }).collect(), Ok(vec![12]));
+}
+
+#[test]
+fn and_then() {
+ assert_done(|| list().and_then(|a| Ok(a + 1)).collect(), Ok(vec![2, 3, 4]));
+ assert_done(|| list().and_then(|a| err::<i32, u32>(a as u32)).collect::<Vec<_>>(),
+ Err(1));
+}
+
+#[test]
+fn then() {
+ assert_done(|| list().then(|a| a.map(|e| e + 1)).collect(), Ok(vec![2, 3, 4]));
+
+}
+
+#[test]
+fn or_else() {
+ assert_done(|| err_list().or_else(|a| {
+ ok::<i32, u32>(a as i32)
+ }).collect(), Ok(vec![1, 2, 3]));
+}
+
+#[test]
+fn flatten() {
+ assert_done(|| list().map(|_| list()).flatten().collect(),
+ Ok(vec![1, 2, 3, 1, 2, 3, 1, 2, 3]));
+
+}
+
+#[test]
+fn skip() {
+ assert_done(|| list().skip(2).collect(), Ok(vec![3]));
+}
+
+#[test]
+fn skip_passes_errors_through() {
+ let mut s = block_on_stream(
+ iter(vec![Err(1), Err(2), Ok(3), Ok(4), Ok(5)]).skip(1)
+ );
+ assert_eq!(s.next(), Some(Err(1)));
+ assert_eq!(s.next(), Some(Err(2)));
+ assert_eq!(s.next(), Some(Ok(4)));
+ assert_eq!(s.next(), Some(Ok(5)));
+ assert_eq!(s.next(), None);
+}
+
+#[test]
+fn skip_while() {
+ assert_done(|| list().skip_while(|e| Ok(*e % 2 == 1)).collect(),
+ Ok(vec![2, 3]));
+}
+#[test]
+fn take() {
+ assert_done(|| list().take(2).collect(), Ok(vec![1, 2]));
+}
+
+#[test]
+fn take_while() {
+ assert_done(|| list().take_while(|e| Ok(*e < 3)).collect(),
+ Ok(vec![1, 2]));
+}
+
+#[test]
+fn take_passes_errors_through() {
+ let mut s = block_on_stream(iter(vec![Err(1), Err(2), Ok(3), Ok(4), Err(4)]).take(1));
+ assert_eq!(s.next(), Some(Err(1)));
+ assert_eq!(s.next(), Some(Err(2)));
+ assert_eq!(s.next(), Some(Ok(3)));
+ assert_eq!(s.next(), None);
+
+ let mut s = block_on_stream(iter(vec![Ok(1), Err(2)]).take(1));
+ assert_eq!(s.next(), Some(Ok(1)));
+ assert_eq!(s.next(), None);
+}
+
+#[test]
+fn peekable() {
+ assert_done(|| list().peekable().collect(), Ok(vec![1, 2, 3]));
+}
+
+#[test]
+fn fuse() {
+ let mut stream = block_on_stream(list().fuse());
+ assert_eq!(stream.next(), Some(Ok(1)));
+ assert_eq!(stream.next(), Some(Ok(2)));
+ assert_eq!(stream.next(), Some(Ok(3)));
+ assert_eq!(stream.next(), None);
+ assert_eq!(stream.next(), None);
+ assert_eq!(stream.next(), None);
+}
+
+#[test]
+fn buffered() {
+ let (tx, rx) = mpsc::channel(1);
+ let (a, b) = oneshot::channel::<u32>();
+ let (c, d) = oneshot::channel::<u32>();
+
+ tx.send(Box::new(b.recover(|_| panic!())) as Box<Future<Item=_, Error=_> + Send>)
+ .and_then(|tx| tx.send(Box::new(d.map_err(|_| panic!()))))
+ .forget();
+
+ let mut rx = rx.buffered(2);
+ sassert_empty(&mut rx);
+ c.send(3).unwrap();
+ sassert_empty(&mut rx);
+ a.send(5).unwrap();
+ let mut rx = block_on_stream(rx);
+ assert_eq!(rx.next(), Some(Ok(5)));
+ assert_eq!(rx.next(), Some(Ok(3)));
+ assert_eq!(rx.next(), None);
+
+ let (tx, rx) = mpsc::channel(1);
+ let (a, b) = oneshot::channel::<u32>();
+ let (c, d) = oneshot::channel::<u32>();
+
+ tx.send(Box::new(b.recover(|_| panic!())) as Box<Future<Item=_, Error=_> + Send>)
+ .and_then(|tx| tx.send(Box::new(d.map_err(|_| panic!()))))
+ .forget();
+
+ let mut rx = rx.buffered(1);
+ sassert_empty(&mut rx);
+ c.send(3).unwrap();
+ sassert_empty(&mut rx);
+ a.send(5).unwrap();
+ let mut rx = block_on_stream(rx);
+ assert_eq!(rx.next(), Some(Ok(5)));
+ assert_eq!(rx.next(), Some(Ok(3)));
+ assert_eq!(rx.next(), None);
+}
+
+#[test]
+fn unordered() {
+ let (tx, rx) = mpsc::channel(1);
+ let (a, b) = oneshot::channel::<u32>();
+ let (c, d) = oneshot::channel::<u32>();
+
+ tx.send(Box::new(b.recover(|_| panic!())) as Box<Future<Item = _, Error = _> + Send>)
+ .and_then(|tx| tx.send(Box::new(d.recover(|_| panic!()))))
+ .forget();
+
+ let mut rx = rx.buffer_unordered(2);
+ sassert_empty(&mut rx);
+ let mut rx = block_on_stream(rx);
+ c.send(3).unwrap();
+ assert_eq!(rx.next(), Some(Ok(3)));
+ a.send(5).unwrap();
+ assert_eq!(rx.next(), Some(Ok(5)));
+ assert_eq!(rx.next(), None);
+
+ let (tx, rx) = mpsc::channel(1);
+ let (a, b) = oneshot::channel::<u32>();
+ let (c, d) = oneshot::channel::<u32>();
+
+ tx.send(Box::new(b.recover(|_| panic!())) as Box<Future<Item = _, Error = _> + Send>)
+ .and_then(|tx| tx.send(Box::new(d.recover(|_| panic!()))))
+ .forget();
+
+ // We don't even get to see `c` until `a` completes.
+ let mut rx = rx.buffer_unordered(1);
+ sassert_empty(&mut rx);
+ c.send(3).unwrap();
+ sassert_empty(&mut rx);
+ a.send(5).unwrap();
+ let mut rx = block_on_stream(rx);
+ assert_eq!(rx.next(), Some(Ok(5)));
+ assert_eq!(rx.next(), Some(Ok(3)));
+ assert_eq!(rx.next(), None);
+}
+
+#[test]
+fn zip() {
+ assert_done(|| list().zip(list()).collect(),
+ Ok(vec![(1, 1), (2, 2), (3, 3)]));
+ assert_done(|| list().zip(list().take(2)).collect(),
+ Ok(vec![(1, 1), (2, 2)]));
+ assert_done(|| list().take(2).zip(list()).collect(),
+ Ok(vec![(1, 1), (2, 2)]));
+ assert_done(|| err_list().zip(list()).collect::<Vec<_>>(), Err(3));
+ assert_done(|| list().zip(list().map(|x| x + 1)).collect(),
+ Ok(vec![(1, 2), (2, 3), (3, 4)]));
+}
+
+#[test]
+fn peek() {
+ struct Peek {
+ inner: Peekable<Box<Stream<Item = i32, Error =u32> + Send>>
+ }
+
+ impl Future for Peek {
+ type Item = ();
+ type Error = u32;
+
+ fn poll(&mut self, cx: &mut Context<'_>) -> Poll<(), u32> {
+ {
+ let res = ready!(self.inner.peek(cx))?;
+ assert_eq!(res, Some(&1));
+ }
+ assert_eq!(self.inner.peek(cx).unwrap(), Some(&1).into());
+ assert_eq!(self.inner.poll_next(cx).unwrap(), Some(1).into());
+ Ok(Poll::Ready(()))
+ }
+ }
+
+ block_on(Peek {
+ inner: list().peekable(),
+ }).unwrap()
+}
+
+#[test]
+fn wait() {
+ assert_eq!(block_on_stream(list()).collect::<Result<Vec<_>, _>>(),
+ Ok(vec![1, 2, 3]));
+}
+
+#[test]
+fn chunks() {
+ assert_done(|| list().chunks(3).collect(), Ok(vec![vec![1, 2, 3]]));
+ assert_done(|| list().chunks(1).collect(), Ok(vec![vec![1], vec![2], vec![3]]));
+ assert_done(|| list().chunks(2).collect(), Ok(vec![vec![1, 2], vec![3]]));
+ let mut list = block_on_stream(err_list().chunks(3));
+ let i = list.next().unwrap().unwrap();
+ assert_eq!(i, vec![1, 2]);
+ let i = list.next().unwrap().unwrap_err();
+ assert_eq!(i, 3);
+}
+
+#[test]
+#[should_panic]
+fn chunks_panic_on_cap_zero() {
+ let _ = list().chunks(0);
+}
+
+#[test]
+fn forward() {
+ let v = Vec::new();
+ let v = block_on(iter_ok::<_, Never>(vec![0, 1]).forward(v)).unwrap().1;
+ assert_eq!(v, vec![0, 1]);
+
+ let v = block_on(iter_ok::<_, Never>(vec![2, 3]).forward(v)).unwrap().1;
+ assert_eq!(v, vec![0, 1, 2, 3]);
+
+ assert_done(move || iter_ok::<_, Never>(vec![4, 5]).forward(v).map(|(_, s)| s),
+ Ok(vec![0, 1, 2, 3, 4, 5]));
+}
+
+#[test]
+#[allow(deprecated)]
+fn concat() {
+ let a = iter_ok::<_, ()>(vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8, 9]]);
+ assert_done(move || a.concat(), Ok(vec![1, 2, 3, 4, 5, 6, 7, 8, 9]));
+
+ let b = iter(vec![Ok::<_, ()>(vec![1, 2, 3]), Err(()), Ok(vec![7, 8, 9])]);
+ assert_done(move || b.concat(), Err(()));
+}
+
+#[test]
+fn concat2() {
+ let a = iter_ok::<_, ()>(vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8, 9]]);
+ assert_done(move || a.concat(), Ok(vec![1, 2, 3, 4, 5, 6, 7, 8, 9]));
+
+ let b = iter(vec![Ok::<_, ()>(vec![1, 2, 3]), Err(()), Ok(vec![7, 8, 9])]);
+ assert_done(move || b.concat(), Err(()));
+
+ let c = empty::<Vec<()>, ()>();
+ assert_done(move || c.concat(), Ok(vec![]))
+}
+
+#[test]
+fn stream_poll_fn() {
+ let mut counter = 5usize;
+
+ let read_stream = poll_fn(move |_| -> Poll<Option<usize>, std::io::Error> {
+ if counter == 0 {
+ return Ok(Poll::Ready(None));
+ }
+ counter -= 1;
+ Ok(Poll::Ready(Some(counter)))
+ });
+
+ assert_eq!(block_on_stream(read_stream).count(), 5);
+}
+
+#[test]
+fn inspect() {
+ let mut seen = vec![];
+ assert_done(|| list().inspect(|&a| seen.push(a)).collect(), Ok(vec![1, 2, 3]));
+ assert_eq!(seen, [1, 2, 3]);
+}
+
+#[test]
+fn inspect_err() {
+ let mut seen = vec![];
+ assert_done(|| err_list().inspect_err(|&a| seen.push(a)).collect::<Vec<_>>(), Err(3));
+ assert_eq!(seen, [3]);
+}