From 3d406bbb486e6ce5168e5e930f634befd9f56bb6 Mon Sep 17 00:00:00 2001 From: Jason Macnak Date: Thu, 19 Mar 2020 21:05:06 +0000 Subject: Import 'futures' rust crate version 0.3.4 Bug: b/151760391 Test: m crosvm.experimental Change-Id: Idf25787e5b46e6e1e0df0b978b41357cbdacaa25 --- .gitignore | 7 + Android.bp | 44 +++ Cargo.toml | 76 +++++ Cargo.toml.orig | 52 ++++ LICENSE | 1 + LICENSE-APACHE | 202 +++++++++++++ LICENSE-MIT | 26 ++ METADATA | 18 ++ MODULE_LICENSE_APACHE2 | 0 NOTICE | 1 + src/lib.rs | 605 +++++++++++++++++++++++++++++++++++++ tests/abortable.rs | 39 +++ tests/arc_wake.rs | 77 +++++ tests/async_await_macros.rs | 360 ++++++++++++++++++++++ tests/atomic_waker.rs | 49 +++ tests/basic_combinators.rs | 98 ++++++ tests/buffer_unordered.rs | 73 +++++ tests/compat.rs | 17 ++ tests/eager_drop.rs | 117 +++++++ tests/eventual.rs | 159 ++++++++++ tests/fuse.rs | 12 + tests/future_obj.rs | 33 ++ tests/future_try_flatten_stream.rs | 82 +++++ tests/futures_ordered.rs | 83 +++++ tests/futures_unordered.rs | 287 ++++++++++++++++++ tests/inspect.rs | 14 + tests/io_buf_reader.rs | 321 ++++++++++++++++++++ tests/io_buf_writer.rs | 236 +++++++++++++++ tests/io_cursor.rs | 29 ++ tests/io_lines.rs | 62 ++++ tests/io_read.rs | 65 ++++ tests/io_read_exact.rs | 17 ++ tests/io_read_line.rs | 60 ++++ tests/io_read_to_string.rs | 45 +++ tests/io_read_until.rs | 60 ++++ tests/io_window.rs | 26 ++ tests/io_write.rs | 70 +++++ tests/join_all.rs | 43 +++ tests/macro_comma_support.rs | 42 +++ tests/mutex.rs | 69 +++++ tests/object_safety.rs | 49 +++ tests/oneshot.rs | 66 ++++ tests/ready_queue.rs | 151 +++++++++ tests/recurse.rs | 22 ++ tests/select_all.rs | 29 ++ tests/select_ok.rs | 39 +++ tests/shared.rs | 151 +++++++++ tests/sink.rs | 516 +++++++++++++++++++++++++++++++ tests/sink_fanout.rs | 24 ++ tests/split.rs | 77 +++++ tests/stream.rs | 32 ++ tests/stream_catch_unwind.rs | 27 ++ tests/stream_into_async_read.rs | 96 ++++++ tests/stream_peekable.rs | 13 + tests/stream_select_all.rs | 78 +++++ tests/stream_select_next_some.rs | 85 ++++++ tests/try_join.rs | 36 +++ tests/try_join_all.rs | 44 +++ tests/unfold.rs | 35 +++ tests_disabled/all.rs | 351 +++++++++++++++++++++ tests_disabled/bilock.rs | 105 +++++++ tests_disabled/stream.rs | 393 ++++++++++++++++++++++++ 62 files changed, 6096 insertions(+) create mode 100644 .gitignore create mode 100644 Android.bp create mode 100644 Cargo.toml create mode 100644 Cargo.toml.orig create mode 120000 LICENSE create mode 100644 LICENSE-APACHE create mode 100644 LICENSE-MIT create mode 100644 METADATA create mode 100644 MODULE_LICENSE_APACHE2 create mode 120000 NOTICE create mode 100644 src/lib.rs create mode 100644 tests/abortable.rs create mode 100644 tests/arc_wake.rs create mode 100644 tests/async_await_macros.rs create mode 100644 tests/atomic_waker.rs create mode 100644 tests/basic_combinators.rs create mode 100644 tests/buffer_unordered.rs create mode 100644 tests/compat.rs create mode 100644 tests/eager_drop.rs create mode 100644 tests/eventual.rs create mode 100644 tests/fuse.rs create mode 100644 tests/future_obj.rs create mode 100644 tests/future_try_flatten_stream.rs create mode 100644 tests/futures_ordered.rs create mode 100644 tests/futures_unordered.rs create mode 100644 tests/inspect.rs create mode 100644 tests/io_buf_reader.rs create mode 100644 tests/io_buf_writer.rs create mode 100644 tests/io_cursor.rs create mode 100644 tests/io_lines.rs create mode 100644 tests/io_read.rs create mode 100644 tests/io_read_exact.rs create mode 100644 tests/io_read_line.rs create mode 100644 tests/io_read_to_string.rs create mode 100644 tests/io_read_until.rs create mode 100644 tests/io_window.rs create mode 100644 tests/io_write.rs create mode 100644 tests/join_all.rs create mode 100644 tests/macro_comma_support.rs create mode 100644 tests/mutex.rs create mode 100644 tests/object_safety.rs create mode 100644 tests/oneshot.rs create mode 100644 tests/ready_queue.rs create mode 100644 tests/recurse.rs create mode 100644 tests/select_all.rs create mode 100644 tests/select_ok.rs create mode 100644 tests/shared.rs create mode 100644 tests/sink.rs create mode 100644 tests/sink_fanout.rs create mode 100644 tests/split.rs create mode 100644 tests/stream.rs create mode 100644 tests/stream_catch_unwind.rs create mode 100644 tests/stream_into_async_read.rs create mode 100644 tests/stream_peekable.rs create mode 100644 tests/stream_select_all.rs create mode 100644 tests/stream_select_next_some.rs create mode 100644 tests/try_join.rs create mode 100644 tests/try_join_all.rs create mode 100644 tests/unfold.rs create mode 100644 tests_disabled/all.rs create mode 100644 tests_disabled/bilock.rs create mode 100644 tests_disabled/stream.rs diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..de02da3 --- /dev/null +++ b/.gitignore @@ -0,0 +1,7 @@ +target +**/*.rs.bk +Cargo.lock +_site +.sass-cache +.idea +.DS_Store diff --git a/Android.bp b/Android.bp new file mode 100644 index 0000000..c988699 --- /dev/null +++ b/Android.bp @@ -0,0 +1,44 @@ +// This file is generated by cargo2android.py. + +rust_library_host_rlib { + name: "libfutures", + crate_name: "futures", + srcs: ["src/lib.rs"], + edition: "2018", + features: [ + "alloc", + "async-await", + "default", + "executor", + "futures-executor", + "std", + ], + rlibs: [ + "libfutures_channel", + "libfutures_core", + "libfutures_executor", + "libfutures_io", + "libfutures_sink", + "libfutures_task", + "libfutures_util", + ], +} + +// dependent_library ["feature_list"] +// futures-channel-0.3.4 "alloc,futures-sink,sink,std" +// futures-core-0.3.4 "alloc,std" +// futures-executor-0.3.4 "std" +// futures-io-0.3.4 "std" +// futures-macro-0.3.4 +// futures-sink-0.3.4 "alloc,std" +// futures-task-0.3.4 "alloc,std" +// futures-util-0.3.4 "alloc,async-await,async-await-macro,channel,futures-channel,futures-io,futures-macro,futures-sink,io,memchr,proc-macro-hack,proc-macro-nested,sink,slab,std" +// memchr-2.3.3 "default,std" +// pin-utils-0.1.0-alpha.4 +// proc-macro-hack-0.5.11 +// proc-macro-nested-0.1.3 +// proc-macro2-1.0.9 "default,proc-macro" +// quote-1.0.3 "default,proc-macro" +// slab-0.4.2 +// syn-1.0.16 "clone-impls,default,derive,full,parsing,printing,proc-macro,quote" +// unicode-xid-0.2.0 "default" diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..3f93105 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,76 @@ +# THIS FILE IS AUTOMATICALLY GENERATED BY CARGO +# +# When uploading crates to the registry Cargo will automatically +# "normalize" Cargo.toml files for maximal compatibility +# with all versions of Cargo and also rewrite `path` dependencies +# to registry (e.g., crates.io) dependencies +# +# If you believe there's an error in this file please file an +# issue against the rust-lang/cargo repository. If you're +# editing this file be aware that the upstream Cargo.toml +# will likely look very different (and much more reasonable) + +[package] +edition = "2018" +name = "futures" +version = "0.3.4" +authors = ["Alex Crichton "] +description = "An implementation of futures and streams featuring zero allocations,\ncomposability, and iterator-like interfaces.\n" +homepage = "https://rust-lang.github.io/futures-rs" +documentation = "https://docs.rs/futures/0.3.0" +readme = "../README.md" +keywords = ["futures", "async", "future"] +categories = ["asynchronous"] +license = "MIT OR Apache-2.0" +repository = "https://github.com/rust-lang/futures-rs" +[package.metadata.docs.rs] +all-features = true + +[package.metadata.playground] +features = ["std", "async-await", "compat", "io-compat", "executor", "thread-pool"] +[dependencies.futures-channel] +version = "0.3.4" +features = ["sink"] +default-features = false + +[dependencies.futures-core] +version = "0.3.4" +default-features = false + +[dependencies.futures-executor] +version = "0.3.4" +optional = true +default-features = false + +[dependencies.futures-io] +version = "0.3.4" +default-features = false + +[dependencies.futures-sink] +version = "0.3.4" +default-features = false + +[dependencies.futures-task] +version = "0.3.4" +default-features = false + +[dependencies.futures-util] +version = "0.3.4" +features = ["sink"] +default-features = false + +[features] +alloc = ["futures-core/alloc", "futures-task/alloc", "futures-sink/alloc", "futures-channel/alloc", "futures-util/alloc"] +async-await = ["futures-util/async-await", "futures-util/async-await-macro"] +bilock = ["futures-util/bilock"] +cfg-target-has-atomic = ["futures-core/cfg-target-has-atomic", "futures-task/cfg-target-has-atomic", "futures-channel/cfg-target-has-atomic", "futures-util/cfg-target-has-atomic"] +compat = ["std", "futures-util/compat"] +default = ["std", "async-await", "executor"] +executor = ["std", "futures-executor/std"] +io-compat = ["compat", "futures-util/io-compat"] +read-initializer = ["futures-io/read-initializer", "futures-util/read-initializer"] +std = ["alloc", "futures-core/std", "futures-task/std", "futures-io/std", "futures-sink/std", "futures-util/std", "futures-util/io", "futures-util/channel"] +thread-pool = ["executor", "futures-executor/thread-pool"] +unstable = ["futures-core/unstable", "futures-task/unstable", "futures-channel/unstable", "futures-io/unstable", "futures-util/unstable"] +[badges.travis-ci] +repository = "rust-lang/futures-rs" diff --git a/Cargo.toml.orig b/Cargo.toml.orig new file mode 100644 index 0000000..7095c19 --- /dev/null +++ b/Cargo.toml.orig @@ -0,0 +1,52 @@ +[package] +name = "futures" +edition = "2018" +version = "0.3.4" +authors = ["Alex Crichton "] +license = "MIT OR Apache-2.0" +readme = "../README.md" +keywords = ["futures", "async", "future"] +repository = "https://github.com/rust-lang/futures-rs" +homepage = "https://rust-lang.github.io/futures-rs" +documentation = "https://docs.rs/futures/0.3.0" +description = """ +An implementation of futures and streams featuring zero allocations, +composability, and iterator-like interfaces. +""" +categories = ["asynchronous"] + +[badges] +travis-ci = { repository = "rust-lang/futures-rs" } + +[dependencies] +futures-core = { path = "../futures-core", version = "0.3.4", default-features = false } +futures-task = { path = "../futures-task", version = "0.3.4", default-features = false } +futures-channel = { path = "../futures-channel", version = "0.3.4", default-features = false, features = ["sink"] } +futures-executor = { path = "../futures-executor", version = "0.3.4", default-features = false, optional = true } +futures-io = { path = "../futures-io", version = "0.3.4", default-features = false } +futures-sink = { path = "../futures-sink", version = "0.3.4", default-features = false } +futures-util = { path = "../futures-util", version = "0.3.4", default-features = false, features = ["sink"] } + +[features] +default = ["std", "async-await", "executor"] +std = ["alloc", "futures-core/std", "futures-task/std", "futures-io/std", "futures-sink/std", "futures-util/std", "futures-util/io", "futures-util/channel"] +alloc = ["futures-core/alloc", "futures-task/alloc", "futures-sink/alloc", "futures-channel/alloc", "futures-util/alloc"] +async-await = ["futures-util/async-await", "futures-util/async-await-macro"] +compat = ["std", "futures-util/compat"] +io-compat = ["compat", "futures-util/io-compat"] +executor = ["std", "futures-executor/std"] +thread-pool = ["executor", "futures-executor/thread-pool"] + +# Unstable features +# These features are outside of the normal semver guarantees and require the +# `unstable` feature as an explicit opt-in to unstable API. +unstable = ["futures-core/unstable", "futures-task/unstable", "futures-channel/unstable", "futures-io/unstable", "futures-util/unstable"] +cfg-target-has-atomic = ["futures-core/cfg-target-has-atomic", "futures-task/cfg-target-has-atomic", "futures-channel/cfg-target-has-atomic", "futures-util/cfg-target-has-atomic"] +bilock = ["futures-util/bilock"] +read-initializer = ["futures-io/read-initializer", "futures-util/read-initializer"] + +[package.metadata.docs.rs] +all-features = true + +[package.metadata.playground] +features = ["std", "async-await", "compat", "io-compat", "executor", "thread-pool"] diff --git a/LICENSE b/LICENSE new file mode 120000 index 0000000..6b579aa --- /dev/null +++ b/LICENSE @@ -0,0 +1 @@ +LICENSE-APACHE \ No newline at end of file diff --git a/LICENSE-APACHE b/LICENSE-APACHE new file mode 100644 index 0000000..9eb0b09 --- /dev/null +++ b/LICENSE-APACHE @@ -0,0 +1,202 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + +TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + +1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + +2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + +3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + +4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + +5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + +6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + +7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + +8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + +9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + +END OF TERMS AND CONDITIONS + +APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + +Copyright (c) 2016 Alex Crichton +Copyright (c) 2017 The Tokio Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. diff --git a/LICENSE-MIT b/LICENSE-MIT new file mode 100644 index 0000000..8ad082e --- /dev/null +++ b/LICENSE-MIT @@ -0,0 +1,26 @@ +Copyright (c) 2016 Alex Crichton +Copyright (c) 2017 The Tokio Authors + +Permission is hereby granted, free of charge, to any +person obtaining a copy of this software and associated +documentation files (the "Software"), to deal in the +Software without restriction, including without +limitation the rights to use, copy, modify, merge, +publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software +is furnished to do so, subject to the following +conditions: + +The above copyright notice and this permission notice +shall be included in all copies or substantial portions +of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +DEALINGS IN THE SOFTWARE. diff --git a/METADATA b/METADATA new file mode 100644 index 0000000..42d8dba --- /dev/null +++ b/METADATA @@ -0,0 +1,18 @@ +name: "futures" +description: + "An implementation of futures and streams featuring zero allocations, " + "composability, and iterator-like interfaces." + +third_party { + url { + type: HOMEPAGE + value: "https://crates.io/crates/futures" + } + url { + type: GIT + value: "https://github.com/rust-lang/futures-rs" + } + version: "0.3.4" + last_upgrade_date { year: 2020 month: 3 day: 17 } + license_type: NOTICE +} diff --git a/MODULE_LICENSE_APACHE2 b/MODULE_LICENSE_APACHE2 new file mode 100644 index 0000000..e69de29 diff --git a/NOTICE b/NOTICE new file mode 120000 index 0000000..7a694c9 --- /dev/null +++ b/NOTICE @@ -0,0 +1 @@ +LICENSE \ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..3cdb3d3 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,605 @@ +//! Abstractions for asynchronous programming. +//! +//! This crate provides a number of core abstractions for writing asynchronous +//! code: +//! +//! - [Futures](crate::future::Future) are single eventual values produced by +//! asynchronous computations. Some programming languages (e.g. JavaScript) +//! call this concept "promise". +//! - [Streams](crate::stream::Stream) represent a series of values +//! produced asynchronously. +//! - [Sinks](crate::sink::Sink) provide support for asynchronous writing of +//! data. +//! - [Executors](crate::executor) are responsible for running asynchronous +//! tasks. +//! +//! The crate also contains abstractions for [asynchronous I/O](crate::io) and +//! [cross-task communication](crate::channel). +//! +//! Underlying all of this is the *task system*, which is a form of lightweight +//! threading. Large asynchronous computations are built up using futures, +//! streams and sinks, and then spawned as independent tasks that are run to +//! completion, but *do not block* the thread running them. +//! +//! The following example describes how the task system context is built and used +//! within macros and keywords such as async and await!. +//! +//! ```rust +//! # use futures::channel::mpsc; +//! # use futures::executor; ///standard executors to provide a context for futures and streams +//! # use futures::executor::ThreadPool; +//! # use futures::StreamExt; +//! +//! fn main() { +//! let pool = ThreadPool::new().expect("Failed to build pool"); +//! let (tx, rx) = mpsc::unbounded::(); +//! +//! // Create a future by an async block, where async is responsible for an +//! // implementation of Future. At this point no executor has been provided +//! // to this future, so it will not be running. +//! let fut_values = async { +//! // Create another async block, again where the Future implementation +//! // is generated by async. Since this is inside of a parent async block, +//! // it will be provided with the executor of the parent block when the parent +//! // block is executed. +//! // +//! // This executor chaining is done by Future::poll whose second argument +//! // is a std::task::Context. This represents our executor, and the Future +//! // implemented by this async block can be polled using the parent async +//! // block's executor. +//! let fut_tx_result = async move { +//! (0..100).for_each(|v| { +//! tx.unbounded_send(v).expect("Failed to send"); +//! }) +//! }; +//! +//! // Use the provided thread pool to spawn the generated future +//! // responsible for transmission +//! pool.spawn_ok(fut_tx_result); +//! +//! let fut_values = rx +//! .map(|v| v * 2) +//! .collect(); +//! +//! // Use the executor provided to this async block to wait for the +//! // future to complete. +//! fut_values.await +//! }; +//! +//! // Actually execute the above future, which will invoke Future::poll and +//! // subsequenty chain appropriate Future::poll and methods needing executors +//! // to drive all futures. Eventually fut_values will be driven to completion. +//! let values: Vec = executor::block_on(fut_values); +//! +//! println!("Values={:?}", values); +//! } +//! ``` +//! +//! The majority of examples and code snippets in this crate assume that they are +//! inside an async block as written above. + +#![cfg_attr(feature = "cfg-target-has-atomic", feature(cfg_target_has_atomic))] +#![cfg_attr(feature = "read-initializer", feature(read_initializer))] + +#![cfg_attr(not(feature = "std"), no_std)] + +#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms, unreachable_pub)] +// It cannot be included in the published code because this lints have false positives in the minimum required version. +#![cfg_attr(test, warn(single_use_lifetimes))] +#![warn(clippy::all)] + +#![doc(test(attr(deny(warnings), allow(dead_code, unused_assignments, unused_variables))))] + +#![doc(html_root_url = "https://docs.rs/futures/0.3.0")] + +#[cfg(all(feature = "cfg-target-has-atomic", not(feature = "unstable")))] +compile_error!("The `cfg-target-has-atomic` feature requires the `unstable` feature as an explicit opt-in to unstable features"); + +#[cfg(all(feature = "bilock", not(feature = "unstable")))] +compile_error!("The `bilock` feature requires the `unstable` feature as an explicit opt-in to unstable features"); + +#[cfg(all(feature = "read-initializer", not(feature = "unstable")))] +compile_error!("The `read-initializer` feature requires the `unstable` feature as an explicit opt-in to unstable features"); + +#[doc(hidden)] pub use futures_core::future::{Future, TryFuture}; +#[doc(hidden)] pub use futures_util::future::{FutureExt, TryFutureExt}; + +#[doc(hidden)] pub use futures_core::stream::{Stream, TryStream}; +#[doc(hidden)] pub use futures_util::stream::{StreamExt, TryStreamExt}; + +#[doc(hidden)] pub use futures_sink::Sink; +#[doc(hidden)] pub use futures_util::sink::SinkExt; + +#[cfg(feature = "std")] +#[doc(hidden)] pub use futures_io::{AsyncRead, AsyncWrite, AsyncSeek, AsyncBufRead}; +#[cfg(feature = "std")] +#[doc(hidden)] pub use futures_util::{AsyncReadExt, AsyncWriteExt, AsyncSeekExt, AsyncBufReadExt}; + +// Macro reexports +pub use futures_core::ready; // Readiness propagation +pub use futures_util::pin_mut; +#[cfg(feature = "std")] +#[cfg(feature = "async-await")] +pub use futures_util::{pending, poll}; // Async-await + +#[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))] +#[cfg(feature = "alloc")] +pub mod channel { + //! Cross-task communication. + //! + //! Like threads, concurrent tasks sometimes need to communicate with each + //! other. This module contains two basic abstractions for doing so: + //! + //! - [oneshot](crate::channel::oneshot), a way of sending a single value + //! from one task to another. + //! - [mpsc](crate::channel::mpsc), a multi-producer, single-consumer + //! channel for sending values between tasks, analogous to the + //! similarly-named structure in the standard library. + //! + //! This module is only available when the `std` or `alloc` feature of this + //! library is activated, and it is activated by default. + + pub use futures_channel::oneshot; + + #[cfg(feature = "std")] + pub use futures_channel::mpsc; +} + +#[cfg(feature = "compat")] +pub mod compat { + //! Interop between `futures` 0.1 and 0.3. + //! + //! This module is only available when the `compat` feature of this + //! library is activated. + + pub use futures_util::compat::{ + Compat, + CompatSink, + Compat01As03, + Compat01As03Sink, + Executor01Future, + Executor01As03, + Executor01CompatExt, + Future01CompatExt, + Stream01CompatExt, + Sink01CompatExt, + }; + + #[cfg(feature = "io-compat")] + pub use futures_util::compat::{ + AsyncRead01CompatExt, + AsyncWrite01CompatExt, + }; +} + +#[cfg(feature = "executor")] +pub mod executor { + //! Task execution. + //! + //! All asynchronous computation occurs within an executor, which is + //! capable of spawning futures as tasks. This module provides several + //! built-in executors, as well as tools for building your own. + //! + //! This module is only available when the `executor` feature of this + //! library is activated, and it is activated by default. + //! + //! # Using a thread pool (M:N task scheduling) + //! + //! Most of the time tasks should be executed on a [thread + //! pool](crate::executor::ThreadPool). A small set of worker threads can + //! handle a very large set of spawned tasks (which are much lighter weight + //! than threads). Tasks spawned onto the pool with the + //! [`spawn_ok()`](crate::executor::ThreadPool::spawn_ok) + //! function will run ambiently on the created threads. + //! + //! # Spawning additional tasks + //! + //! Tasks can be spawned onto a spawner by calling its + //! [`spawn_obj`](crate::task::Spawn::spawn_obj) method directly. + //! In the case of `!Send` futures, + //! [`spawn_local_obj`](crate::task::LocalSpawn::spawn_local_obj) + //! can be used instead. + //! + //! # Single-threaded execution + //! + //! In addition to thread pools, it's possible to run a task (and the tasks + //! it spawns) entirely within a single thread via the + //! [`LocalPool`](crate::executor::LocalPool) executor. Aside from cutting + //! down on synchronization costs, this executor also makes it possible to + //! spawn non-`Send` tasks, via + //! [`spawn_local_obj`](crate::task::LocalSpawn::spawn_local_obj). + //! The `LocalPool` is best suited for running I/O-bound tasks that do + //! relatively little work between I/O operations. + //! + //! There is also a convenience function + //! [`block_on`](crate::executor::block_on) for simply running a future to + //! completion on the current thread. + + pub use futures_executor::{ + BlockingStream, + Enter, EnterError, + LocalSpawner, LocalPool, + block_on, block_on_stream, enter, + }; + + #[cfg(feature = "thread-pool")] + pub use futures_executor::{ThreadPool, ThreadPoolBuilder}; +} + +pub mod future { + //! Asynchronous values. + //! + //! This module contains: + //! + //! - The [`Future` trait](crate::future::Future). + //! - The [`FutureExt`](crate::future::FutureExt) trait, which provides + //! adapters for chaining and composing futures. + //! - Top-level future combinators like [`lazy`](crate::future::lazy) which + //! creates a future from a closure that defines its return value, and + //! [`ready`](crate::future::ready), which constructs a future with an + //! immediate defined value. + + pub use futures_core::future::{ + Future, TryFuture, FusedFuture, + }; + + #[cfg(feature = "alloc")] + pub use futures_core::future::{BoxFuture, LocalBoxFuture}; + + pub use futures_task::{FutureObj, LocalFutureObj, UnsafeFutureObj}; + + pub use futures_util::future::{ + lazy, Lazy, + maybe_done, MaybeDone, + pending, Pending, + poll_fn, PollFn, + ready, ok, err, Ready, + join, join3, join4, join5, + Join, Join3, Join4, Join5, + select, Select, + try_join, try_join3, try_join4, try_join5, + TryJoin, TryJoin3, TryJoin4, TryJoin5, + try_select, TrySelect, + Either, + OptionFuture, + + FutureExt, + FlattenStream, Flatten, Fuse, Inspect, IntoStream, Map, Then, UnitError, + NeverError, + + TryFutureExt, + AndThen, ErrInto, FlattenSink, IntoFuture, MapErr, MapOk, OrElse, + InspectOk, InspectErr, TryFlattenStream, UnwrapOrElse, + }; + + #[cfg(feature = "alloc")] + pub use futures_util::future::{ + join_all, JoinAll, + select_all, SelectAll, + try_join_all, TryJoinAll, + select_ok, SelectOk, + }; + + #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))] + #[cfg(feature = "alloc")] + pub use futures_util::future::{ + abortable, Abortable, AbortHandle, AbortRegistration, Aborted, + }; + + #[cfg(feature = "std")] + pub use futures_util::future::{ + Remote, RemoteHandle, + CatchUnwind, Shared, + }; +} + +#[cfg(feature = "std")] +pub mod io { + //! Asynchronous I/O. + //! + //! This module is the asynchronous version of `std::io`. It defines four + //! traits, [`AsyncRead`](crate::io::AsyncRead), + //! [`AsyncWrite`](crate::io::AsyncWrite), + //! [`AsyncSeek`](crate::io::AsyncSeek), and + //! [`AsyncBufRead`](crate::io::AsyncBufRead), which mirror the `Read`, + //! `Write`, `Seek`, and `BufRead` traits of the standard library. However, + //! these traits integrate + //! with the asynchronous task system, so that if an I/O object isn't ready + //! for reading (or writing), the thread is not blocked, and instead the + //! current task is queued to be woken when I/O is ready. + //! + //! In addition, the [`AsyncReadExt`](crate::io::AsyncReadExt), + //! [`AsyncWriteExt`](crate::io::AsyncWriteExt), + //! [`AsyncSeekExt`](crate::io::AsyncSeekExt), and + //! [`AsyncBufReadExt`](crate::io::AsyncBufReadExt) extension traits offer a + //! variety of useful combinators for operating with asynchronous I/O + //! objects, including ways to work with them using futures, streams and + //! sinks. + //! + //! This module is only available when the `std` feature of this + //! library is activated, and it is activated by default. + + pub use futures_io::{ + AsyncRead, AsyncWrite, AsyncSeek, AsyncBufRead, Error, ErrorKind, + IoSlice, IoSliceMut, Result, SeekFrom, + }; + + #[cfg(feature = "read-initializer")] + pub use futures_io::Initializer; + + pub use futures_util::io::{ + AsyncReadExt, AsyncWriteExt, AsyncSeekExt, AsyncBufReadExt, AllowStdIo, + BufReader, BufWriter, Cursor, Chain, Close, copy, Copy, copy_buf, CopyBuf, + empty, Empty, Flush, IntoSink, Lines, Read, ReadExact, ReadHalf, + ReadLine, ReadToEnd, ReadToString, ReadUntil, ReadVectored, repeat, + Repeat, Seek, sink, Sink, Take, Window, Write, WriteAll, WriteHalf, + WriteVectored, + }; +} + +#[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))] +#[cfg(feature = "alloc")] +pub mod lock { + //! Futures-powered synchronization primitives. + //! + //! This module is only available when the `std` or `alloc` feature of this + //! library is activated, and it is activated by default. + + #[cfg(feature = "bilock")] + pub use futures_util::lock::{BiLock, BiLockAcquire, BiLockGuard, ReuniteError}; + + #[cfg(feature = "std")] + pub use futures_util::lock::{MappedMutexGuard, Mutex, MutexLockFuture, MutexGuard}; +} + +pub mod prelude { + //! A "prelude" for crates using the `futures` crate. + //! + //! This prelude is similar to the standard library's prelude in that you'll + //! almost always want to import its entire contents, but unlike the + //! standard library's prelude you'll have to do so manually: + //! + //! ``` + //! # #[allow(unused_imports)] + //! use futures::prelude::*; + //! ``` + //! + //! The prelude may grow over time as additional items see ubiquitous use. + + pub use crate::future::{self, Future, TryFuture}; + pub use crate::stream::{self, Stream, TryStream}; + pub use crate::sink::{self, Sink}; + + #[doc(no_inline)] + pub use crate::future::{FutureExt as _, TryFutureExt as _}; + #[doc(no_inline)] + pub use crate::stream::{StreamExt as _, TryStreamExt as _}; + #[doc(no_inline)] + pub use crate::sink::SinkExt as _; + + #[cfg(feature = "std")] + pub use crate::io::{ + AsyncRead, AsyncWrite, AsyncSeek, AsyncBufRead, + }; + + #[cfg(feature = "std")] + #[doc(no_inline)] + pub use crate::io::{ + AsyncReadExt as _, AsyncWriteExt as _, AsyncSeekExt as _, AsyncBufReadExt as _, + }; +} + +pub mod sink { + //! Asynchronous sinks. + //! + //! This module contains: + //! + //! - The [`Sink` trait](crate::sink::Sink), which allows you to + //! asynchronously write data. + //! - The [`SinkExt`](crate::sink::SinkExt) trait, which provides adapters + //! for chaining and composing sinks. + + pub use futures_sink::Sink; + + pub use futures_util::sink::{ + Close, Flush, Send, SendAll, SinkErrInto, SinkMapErr, With, + SinkExt, Fanout, Drain, drain, + WithFlatMap, + }; + + #[cfg(feature = "alloc")] + pub use futures_util::sink::Buffer; +} + +pub mod stream { + //! Asynchronous streams. + //! + //! This module contains: + //! + //! - The [`Stream` trait](crate::stream::Stream), for objects that can + //! asynchronously produce a sequence of values. + //! - The [`StreamExt`](crate::stream::StreamExt) trait, which provides + //! adapters for chaining and composing streams. + //! - Top-level stream contructors like [`iter`](crate::stream::iter) + //! which creates a stream from an iterator. + + pub use futures_core::stream::{ + Stream, TryStream, FusedStream, + }; + + #[cfg(feature = "alloc")] + pub use futures_core::stream::{BoxStream, LocalBoxStream}; + + pub use futures_util::stream::{ + iter, Iter, + repeat, Repeat, + empty, Empty, + pending, Pending, + once, Once, + poll_fn, PollFn, + select, Select, + unfold, Unfold, + try_unfold, TryUnfold, + + StreamExt, + Chain, Collect, Concat, Enumerate, Filter, FilterMap, Flatten, Fold, + Forward, ForEach, Fuse, StreamFuture, Inspect, Map, Next, + SelectNextSome, Peek, Peekable, Scan, Skip, SkipWhile, Take, TakeWhile, + Then, Zip, + + TryStreamExt, + AndThen, ErrInto, MapOk, MapErr, OrElse, + InspectOk, InspectErr, + TryNext, TryForEach, TryFilter, TryFilterMap, TryFlatten, + TryCollect, TryConcat, TryFold, TrySkipWhile, + IntoStream, + }; + + #[cfg(feature = "alloc")] + pub use futures_util::stream::{ + // For StreamExt: + Chunks, + }; + + #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))] + #[cfg(feature = "alloc")] + pub use futures_util::stream::{ + FuturesOrdered, + futures_unordered, FuturesUnordered, + + // For StreamExt: + BufferUnordered, Buffered, ForEachConcurrent, SplitStream, SplitSink, + ReuniteError, + + select_all, SelectAll, + }; + + #[cfg(feature = "std")] + pub use futures_util::stream::{ + // For StreamExt: + CatchUnwind, + }; + + #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))] + #[cfg(feature = "alloc")] + pub use futures_util::stream::{ + // For TryStreamExt: + TryBufferUnordered, TryForEachConcurrent, + }; + + #[cfg(feature = "std")] + pub use futures_util::stream::IntoAsyncRead; +} + +pub mod task { + //! Tools for working with tasks. + //! + //! This module contains: + //! + //! - [`Spawn`](crate::task::Spawn), a trait for spawning new tasks. + //! - [`Context`](crate::task::Context), a context of an asynchronous task, + //! including a handle for waking up the task. + //! - [`Waker`](crate::task::Waker), a handle for waking up a task. + //! + //! The remaining types and traits in the module are used for implementing + //! executors or dealing with synchronization issues around task wakeup. + + pub use futures_core::task::{Context, Poll, Waker, RawWaker, RawWakerVTable}; + + pub use futures_task::{ + Spawn, LocalSpawn, SpawnError, + FutureObj, LocalFutureObj, UnsafeFutureObj, + }; + + pub use futures_util::task::noop_waker; + + #[cfg(feature = "std")] + pub use futures_util::task::noop_waker_ref; + + #[cfg(feature = "alloc")] + pub use futures_util::task::{SpawnExt, LocalSpawnExt}; + + #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))] + #[cfg(feature = "alloc")] + pub use futures_util::task::{waker, waker_ref, WakerRef, ArcWake}; + + #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))] + pub use futures_util::task::AtomicWaker; +} + +pub mod never { + //! This module contains the `Never` type. + //! + //! Values of this type can never be created and will never exist. + + pub use futures_util::never::Never; +} + +// proc-macro re-export -------------------------------------- + +// Not public API. +#[doc(hidden)] +pub use futures_core::core_reexport; + +// Not public API. +#[cfg(feature = "async-await")] +#[doc(hidden)] +pub use futures_util::async_await; + +// Not public API. +#[cfg(feature = "async-await")] +#[doc(hidden)] +pub mod inner_macro { + pub use futures_util::join; + pub use futures_util::try_join; + #[cfg(feature = "std")] + pub use futures_util::select; + pub use futures_util::select_biased; +} + +#[cfg(feature = "async-await")] +futures_util::document_join_macro! { + #[macro_export] + macro_rules! join { // replace `::futures_util` with `::futures` as the crate path + ($($tokens:tt)*) => { + $crate::inner_macro::join! { + futures_crate_path ( ::futures ) + $( $tokens )* + } + } + } + + #[macro_export] + macro_rules! try_join { // replace `::futures_util` with `::futures` as the crate path + ($($tokens:tt)*) => { + $crate::inner_macro::try_join! { + futures_crate_path ( ::futures ) + $( $tokens )* + } + } + } +} + +#[cfg(feature = "async-await")] +futures_util::document_select_macro! { + #[cfg(feature = "std")] + #[macro_export] + macro_rules! select { // replace `::futures_util` with `::futures` as the crate path + ($($tokens:tt)*) => { + $crate::inner_macro::select! { + futures_crate_path ( ::futures ) + $( $tokens )* + } + } + } + + #[macro_export] + macro_rules! select_biased { // replace `::futures_util` with `::futures` as the crate path + ($($tokens:tt)*) => { + $crate::inner_macro::select_biased! { + futures_crate_path ( ::futures ) + $( $tokens )* + } + } + } +} diff --git a/tests/abortable.rs b/tests/abortable.rs new file mode 100644 index 0000000..5925c9a --- /dev/null +++ b/tests/abortable.rs @@ -0,0 +1,39 @@ +use futures::channel::oneshot; +use futures::executor::block_on; +use futures::future::{abortable, Aborted, FutureExt}; +use futures::task::{Context, Poll}; +use futures_test::task::new_count_waker; + +#[test] +fn abortable_works() { + let (_tx, a_rx) = oneshot::channel::<()>(); + let (abortable_rx, abort_handle) = abortable(a_rx); + + abort_handle.abort(); + assert_eq!(Err(Aborted), block_on(abortable_rx)); +} + +#[test] +fn abortable_awakens() { + let (_tx, a_rx) = oneshot::channel::<()>(); + let (mut abortable_rx, abort_handle) = abortable(a_rx); + + let (waker, counter) = new_count_waker(); + let mut cx = Context::from_waker(&waker); + assert_eq!(counter, 0); + assert_eq!(Poll::Pending, abortable_rx.poll_unpin(&mut cx)); + assert_eq!(counter, 0); + abort_handle.abort(); + assert_eq!(counter, 1); + assert_eq!(Poll::Ready(Err(Aborted)), abortable_rx.poll_unpin(&mut cx)); +} + +#[test] +fn abortable_resolves() { + let (tx, a_rx) = oneshot::channel::<()>(); + let (abortable_rx, _abort_handle) = abortable(a_rx); + + tx.send(()).unwrap(); + + assert_eq!(Ok(Ok(())), block_on(abortable_rx)); +} diff --git a/tests/arc_wake.rs b/tests/arc_wake.rs new file mode 100644 index 0000000..1940e4f --- /dev/null +++ b/tests/arc_wake.rs @@ -0,0 +1,77 @@ +use futures::task::{self, ArcWake, Waker}; +use std::sync::{Arc, Mutex}; + +struct CountingWaker { + nr_wake: Mutex, +} + +impl CountingWaker { + fn new() -> CountingWaker { + CountingWaker { + nr_wake: Mutex::new(0), + } + } + + fn wakes(&self) -> i32 { + *self.nr_wake.lock().unwrap() + } +} + +impl ArcWake for CountingWaker { + fn wake_by_ref(arc_self: &Arc) { + let mut lock = arc_self.nr_wake.lock().unwrap(); + *lock += 1; + } +} + +#[test] +fn create_waker_from_arc() { + let some_w = Arc::new(CountingWaker::new()); + + let w1: Waker = task::waker(some_w.clone()); + assert_eq!(2, Arc::strong_count(&some_w)); + w1.wake_by_ref(); + assert_eq!(1, some_w.wakes()); + + let w2 = w1.clone(); + assert_eq!(3, Arc::strong_count(&some_w)); + + w2.wake_by_ref(); + assert_eq!(2, some_w.wakes()); + + drop(w2); + assert_eq!(2, Arc::strong_count(&some_w)); + drop(w1); + assert_eq!(1, Arc::strong_count(&some_w)); +} + +struct PanicWaker; + +impl ArcWake for PanicWaker { + fn wake_by_ref(_arc_self: &Arc) { + panic!("WAKE UP"); + } +} + +#[test] +fn proper_refcount_on_wake_panic() { + let some_w = Arc::new(PanicWaker); + + let w1: Waker = task::waker(some_w.clone()); + assert_eq!("WAKE UP", *std::panic::catch_unwind(|| w1.wake_by_ref()).unwrap_err().downcast::<&str>().unwrap()); + assert_eq!(2, Arc::strong_count(&some_w)); // some_w + w1 + drop(w1); + assert_eq!(1, Arc::strong_count(&some_w)); // some_w +} + +#[test] +fn waker_ref_wake_same() { + let some_w = Arc::new(CountingWaker::new()); + + let w1: Waker = task::waker(some_w.clone()); + let w2 = task::waker_ref(&some_w); + let w3 = w2.clone(); + + assert!(w1.will_wake(&w2)); + assert!(w2.will_wake(&w3)); +} diff --git a/tests/async_await_macros.rs b/tests/async_await_macros.rs new file mode 100644 index 0000000..bc717df --- /dev/null +++ b/tests/async_await_macros.rs @@ -0,0 +1,360 @@ +#![recursion_limit="128"] + +use futures::{pending, pin_mut, poll, join, try_join, select}; +use futures::channel::{mpsc, oneshot}; +use futures::executor::block_on; +use futures::future::{self, FutureExt, poll_fn}; +use futures::sink::SinkExt; +use futures::stream::StreamExt; +use futures::task::{Context, Poll}; + +#[test] +fn poll_and_pending() { + let pending_once = async { pending!() }; + block_on(async { + pin_mut!(pending_once); + assert_eq!(Poll::Pending, poll!(&mut pending_once)); + assert_eq!(Poll::Ready(()), poll!(&mut pending_once)); + }); +} + +#[test] +fn join() { + let (tx1, rx1) = oneshot::channel::(); + let (tx2, rx2) = oneshot::channel::(); + + let fut = async { + let res = join!(rx1, rx2); + assert_eq!((Ok(1), Ok(2)), res); + }; + + block_on(async { + pin_mut!(fut); + assert_eq!(Poll::Pending, poll!(&mut fut)); + tx1.send(1).unwrap(); + assert_eq!(Poll::Pending, poll!(&mut fut)); + tx2.send(2).unwrap(); + assert_eq!(Poll::Ready(()), poll!(&mut fut)); + }); +} + +#[test] +fn select() { + let (tx1, rx1) = oneshot::channel::(); + let (_tx2, rx2) = oneshot::channel::(); + tx1.send(1).unwrap(); + let mut ran = false; + block_on(async { + select! { + res = rx1.fuse() => { + assert_eq!(Ok(1), res); + ran = true; + }, + _ = rx2.fuse() => unreachable!(), + } + }); + assert!(ran); +} + +#[test] +fn select_biased() { + use futures::select_biased; + + let (tx1, rx1) = oneshot::channel::(); + let (_tx2, rx2) = oneshot::channel::(); + tx1.send(1).unwrap(); + let mut ran = false; + block_on(async { + select_biased! { + res = rx1.fuse() => { + assert_eq!(Ok(1), res); + ran = true; + }, + _ = rx2.fuse() => unreachable!(), + } + }); + assert!(ran); +} + +#[test] +fn select_streams() { + let (mut tx1, rx1) = mpsc::channel::(1); + let (mut tx2, rx2) = mpsc::channel::(1); + let mut rx1 = rx1.fuse(); + let mut rx2 = rx2.fuse(); + let mut ran = false; + let mut total = 0; + block_on(async { + let mut tx1_opt; + let mut tx2_opt; + select! { + _ = rx1.next() => panic!(), + _ = rx2.next() => panic!(), + default => { + tx1.send(2).await.unwrap(); + tx2.send(3).await.unwrap(); + tx1_opt = Some(tx1); + tx2_opt = Some(tx2); + } + complete => panic!(), + } + loop { + select! { + // runs first and again after default + x = rx1.next() => if let Some(x) = x { total += x; }, + // runs second and again after default + x = rx2.next() => if let Some(x) = x { total += x; }, + // runs third + default => { + assert_eq!(total, 5); + ran = true; + drop(tx1_opt.take().unwrap()); + drop(tx2_opt.take().unwrap()); + }, + // runs last + complete => break, + }; + } + }); + assert!(ran); +} + +#[test] +fn select_can_move_uncompleted_futures() { + let (tx1, rx1) = oneshot::channel::(); + let (tx2, rx2) = oneshot::channel::(); + tx1.send(1).unwrap(); + tx2.send(2).unwrap(); + let mut ran = false; + let mut rx1 = rx1.fuse(); + let mut rx2 = rx2.fuse(); + block_on(async { + select! { + res = rx1 => { + assert_eq!(Ok(1), res); + assert_eq!(Ok(2), rx2.await); + ran = true; + }, + res = rx2 => { + assert_eq!(Ok(2), res); + assert_eq!(Ok(1), rx1.await); + ran = true; + }, + } + }); + assert!(ran); +} + +#[test] +fn select_nested() { + let mut outer_fut = future::ready(1); + let mut inner_fut = future::ready(2); + let res = block_on(async { + select! { + x = outer_fut => { + select! { + y = inner_fut => x + y, + } + } + } + }); + assert_eq!(res, 3); +} + +#[test] +fn select_size() { + let fut = async { + let mut ready = future::ready(0i32); + select! { + _ = ready => {}, + } + }; + assert_eq!(::std::mem::size_of_val(&fut), 24); + + let fut = async { + let mut ready1 = future::ready(0i32); + let mut ready2 = future::ready(0i32); + select! { + _ = ready1 => {}, + _ = ready2 => {}, + } + }; + assert_eq!(::std::mem::size_of_val(&fut), 40); +} + +#[test] +fn select_on_non_unpin_expressions() { + // The returned Future is !Unpin + let make_non_unpin_fut = || { async { + 5 + }}; + + let res = block_on(async { + let select_res; + select! { + value_1 = make_non_unpin_fut().fuse() => { select_res = value_1 }, + value_2 = make_non_unpin_fut().fuse() => { select_res = value_2 }, + }; + select_res + }); + assert_eq!(res, 5); +} + +#[test] +fn select_on_non_unpin_expressions_with_default() { + // The returned Future is !Unpin + let make_non_unpin_fut = || { async { + 5 + }}; + + let res = block_on(async { + let select_res; + select! { + value_1 = make_non_unpin_fut().fuse() => { select_res = value_1 }, + value_2 = make_non_unpin_fut().fuse() => { select_res = value_2 }, + default => { select_res = 7 }, + }; + select_res + }); + assert_eq!(res, 5); +} + +#[test] +fn select_on_non_unpin_size() { + // The returned Future is !Unpin + let make_non_unpin_fut = || { async { + 5 + }}; + + let fut = async { + let select_res; + select! { + value_1 = make_non_unpin_fut().fuse() => { select_res = value_1 }, + value_2 = make_non_unpin_fut().fuse() => { select_res = value_2 }, + }; + select_res + }; + + assert_eq!(48, std::mem::size_of_val(&fut)); +} + +#[test] +fn select_can_be_used_as_expression() { + block_on(async { + let res = select! { + x = future::ready(7) => { x }, + y = future::ready(3) => { y + 1 }, + }; + assert!(res == 7 || res == 4); + }); +} + +#[test] +fn select_with_default_can_be_used_as_expression() { + fn poll_always_pending(_cx: &mut Context<'_>) -> Poll { + Poll::Pending + } + + block_on(async { + let res = select! { + x = poll_fn(poll_always_pending::).fuse() => x, + y = poll_fn(poll_always_pending::).fuse() => { y + 1 }, + default => 99, + }; + assert_eq!(res, 99); + }); +} + +#[test] +fn select_with_complete_can_be_used_as_expression() { + block_on(async { + let res = select! { + x = future::pending::() => { x }, + y = future::pending::() => { y + 1 }, + default => 99, + complete => 237, + }; + assert_eq!(res, 237); + }); +} + +async fn require_mutable(_: &mut i32) {} +async fn async_noop() {} + +#[test] +fn select_on_mutable_borrowing_future_with_same_borrow_in_block() { + block_on(async { + let mut value = 234; + select! { + x = require_mutable(&mut value).fuse() => { }, + y = async_noop().fuse() => { + value += 5; + }, + } + }); +} + +#[test] +fn select_on_mutable_borrowing_future_with_same_borrow_in_block_and_default() { + block_on(async { + let mut value = 234; + select! { + x = require_mutable(&mut value).fuse() => { }, + y = async_noop().fuse() => { + value += 5; + }, + default => { + value += 27; + }, + } + }); +} + +#[test] +fn join_size() { + let fut = async { + let ready = future::ready(0i32); + join!(ready) + }; + assert_eq!(::std::mem::size_of_val(&fut), 16); + + let fut = async { + let ready1 = future::ready(0i32); + let ready2 = future::ready(0i32); + join!(ready1, ready2) + }; + assert_eq!(::std::mem::size_of_val(&fut), 28); +} + +#[test] +fn try_join_size() { + let fut = async { + let ready = future::ready(Ok::(0)); + try_join!(ready) + }; + assert_eq!(::std::mem::size_of_val(&fut), 16); + + let fut = async { + let ready1 = future::ready(Ok::(0)); + let ready2 = future::ready(Ok::(0)); + try_join!(ready1, ready2) + }; + assert_eq!(::std::mem::size_of_val(&fut), 28); +} + +#[test] +fn join_doesnt_require_unpin() { + let _ = async { + join!(async {}, async {}) + }; +} + +#[test] +fn try_join_doesnt_require_unpin() { + let _ = async { + try_join!( + async { Ok::<(), ()>(()) }, + async { Ok::<(), ()>(()) }, + ) + }; +} diff --git a/tests/atomic_waker.rs b/tests/atomic_waker.rs new file mode 100644 index 0000000..d9ce753 --- /dev/null +++ b/tests/atomic_waker.rs @@ -0,0 +1,49 @@ +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering; +use std::sync::Arc; +use std::thread; + +use futures::executor::block_on; +use futures::future::poll_fn; +use futures::task::{AtomicWaker, Poll}; + +#[test] +fn basic() { + let atomic_waker = Arc::new(AtomicWaker::new()); + let atomic_waker_copy = atomic_waker.clone(); + + let returned_pending = Arc::new(AtomicUsize::new(0)); + let returned_pending_copy = returned_pending.clone(); + + let woken = Arc::new(AtomicUsize::new(0)); + let woken_copy = woken.clone(); + + let t = thread::spawn(move || { + let mut pending_count = 0; + + block_on(poll_fn(move |cx| { + if woken_copy.load(Ordering::Relaxed) == 1 { + Poll::Ready(()) + } else { + // Assert we return pending exactly once + assert_eq!(0, pending_count); + pending_count += 1; + atomic_waker_copy.register(cx.waker()); + + returned_pending_copy.store(1, Ordering::Relaxed); + + Poll::Pending + } + })) + }); + + while returned_pending.load(Ordering::Relaxed) == 0 {} + + // give spawned thread some time to sleep in `block_on` + thread::yield_now(); + + woken.store(1, Ordering::Relaxed); + atomic_waker.wake(); + + t.join().unwrap(); +} diff --git a/tests/basic_combinators.rs b/tests/basic_combinators.rs new file mode 100644 index 0000000..fa65b6f --- /dev/null +++ b/tests/basic_combinators.rs @@ -0,0 +1,98 @@ +use futures::future::{self, FutureExt, TryFutureExt}; +use futures_test::future::FutureTestExt; +use std::sync::mpsc; + +#[test] +fn basic_future_combinators() { + let (tx1, rx) = mpsc::channel(); + let tx2 = tx1.clone(); + let tx3 = tx1.clone(); + + let fut = future::ready(1) + .then(move |x| { + tx1.send(x).unwrap(); // Send 1 + tx1.send(2).unwrap(); // Send 2 + future::ready(3) + }).map(move |x| { + tx2.send(x).unwrap(); // Send 3 + tx2.send(4).unwrap(); // Send 4 + 5 + }).map(move |x| { + tx3.send(x).unwrap(); // Send 5 + }); + + assert!(rx.try_recv().is_err()); // Not started yet + fut.run_in_background(); // Start it + for i in 1..=5 { assert_eq!(rx.recv(), Ok(i)); } // Check it + assert!(rx.recv().is_err()); // Should be done +} + +#[test] +fn basic_try_future_combinators() { + let (tx1, rx) = mpsc::channel(); + let tx2 = tx1.clone(); + let tx3 = tx1.clone(); + let tx4 = tx1.clone(); + let tx5 = tx1.clone(); + let tx6 = tx1.clone(); + let tx7 = tx1.clone(); + let tx8 = tx1.clone(); + let tx9 = tx1.clone(); + let tx10 = tx1.clone(); + + let fut = future::ready(Ok(1)) + .and_then(move |x: i32| { + tx1.send(x).unwrap(); // Send 1 + tx1.send(2).unwrap(); // Send 2 + future::ready(Ok(3)) + }) + .or_else(move |x: i32| { + tx2.send(x).unwrap(); // Should not run + tx2.send(-1).unwrap(); + future::ready(Ok(-1)) + }) + .map_ok(move |x: i32| { + tx3.send(x).unwrap(); // Send 3 + tx3.send(4).unwrap(); // Send 4 + 5 + }) + .map_err(move |x: i32| { + tx4.send(x).unwrap(); // Should not run + tx4.send(-1).unwrap(); + -1 + }) + .map(move |x: Result| { + tx5.send(x.unwrap()).unwrap(); // Send 5 + tx5.send(6).unwrap(); // Send 6 + Err(7) // Now return errors! + }) + .and_then(move |x: i32| { + tx6.send(x).unwrap(); // Should not run + tx6.send(-1).unwrap(); + future::ready(Err(-1)) + }) + .or_else(move |x: i32| { + tx7.send(x).unwrap(); // Send 7 + tx7.send(8).unwrap(); // Send 8 + future::ready(Err(9)) + }) + .map_ok(move |x: i32| { + tx8.send(x).unwrap(); // Should not run + tx8.send(-1).unwrap(); + -1 + }) + .map_err(move |x: i32| { + tx9.send(x).unwrap(); // Send 9 + tx9.send(10).unwrap(); // Send 10 + 11 + }) + .map(move |x: Result| { + tx10.send(x.err().unwrap()).unwrap(); // Send 11 + tx10.send(12).unwrap(); // Send 12 + }); + + assert!(rx.try_recv().is_err()); // Not started yet + fut.run_in_background(); // Start it + for i in 1..=12 { assert_eq!(rx.recv(), Ok(i)); } // Check it + assert!(rx.recv().is_err()); // Should be done +} diff --git a/tests/buffer_unordered.rs b/tests/buffer_unordered.rs new file mode 100644 index 0000000..1c559c8 --- /dev/null +++ b/tests/buffer_unordered.rs @@ -0,0 +1,73 @@ +use futures::channel::{oneshot, mpsc}; +use futures::executor::{block_on, block_on_stream}; +use futures::sink::SinkExt; +use futures::stream::StreamExt; +use std::sync::mpsc as std_mpsc; +use std::thread; + +#[test] +#[ignore] // FIXME: https://github.com/rust-lang/futures-rs/issues/1790 +fn works() { + const N: usize = 4; + + let (mut tx, rx) = mpsc::channel(1); + + let (tx2, rx2) = std_mpsc::channel(); + let (tx3, rx3) = std_mpsc::channel(); + let t1 = thread::spawn(move || { + for _ in 0..=N { + let (mytx, myrx) = oneshot::channel(); + block_on(tx.send(myrx)).unwrap(); + tx3.send(mytx).unwrap(); + } + rx2.recv().unwrap(); + for _ in 0..N { + let (mytx, myrx) = oneshot::channel(); + block_on(tx.send(myrx)).unwrap(); + tx3.send(mytx).unwrap(); + } + }); + + let (tx4, rx4) = std_mpsc::channel(); + let t2 = thread::spawn(move || { + for item in block_on_stream(rx.buffer_unordered(N)) { + tx4.send(item.unwrap()).unwrap(); + } + }); + + let o1 = rx3.recv().unwrap(); + let o2 = rx3.recv().unwrap(); + let o3 = rx3.recv().unwrap(); + let o4 = rx3.recv().unwrap(); + assert!(rx4.try_recv().is_err()); + + o1.send(1).unwrap(); + assert_eq!(rx4.recv(), Ok(1)); + o3.send(3).unwrap(); + assert_eq!(rx4.recv(), Ok(3)); + tx2.send(()).unwrap(); + o2.send(2).unwrap(); + assert_eq!(rx4.recv(), Ok(2)); + o4.send(4).unwrap(); + assert_eq!(rx4.recv(), Ok(4)); + + let o5 = rx3.recv().unwrap(); + let o6 = rx3.recv().unwrap(); + let o7 = rx3.recv().unwrap(); + let o8 = rx3.recv().unwrap(); + let o9 = rx3.recv().unwrap(); + + o5.send(5).unwrap(); + assert_eq!(rx4.recv(), Ok(5)); + o8.send(8).unwrap(); + assert_eq!(rx4.recv(), Ok(8)); + o9.send(9).unwrap(); + assert_eq!(rx4.recv(), Ok(9)); + o7.send(7).unwrap(); + assert_eq!(rx4.recv(), Ok(7)); + o6.send(6).unwrap(); + assert_eq!(rx4.recv(), Ok(6)); + + t1.join().unwrap(); + t2.join().unwrap(); +} diff --git a/tests/compat.rs b/tests/compat.rs new file mode 100644 index 0000000..39adc7c --- /dev/null +++ b/tests/compat.rs @@ -0,0 +1,17 @@ +#![cfg(feature = "compat")] + +use tokio::timer::Delay; +use tokio::runtime::Runtime; +use std::time::Instant; +use futures::prelude::*; +use futures::compat::Future01CompatExt; + +#[test] +fn can_use_01_futures_in_a_03_future_running_on_a_01_executor() { + let f = async { + Delay::new(Instant::now()).compat().await + }; + + let mut runtime = Runtime::new().unwrap(); + runtime.block_on(f.boxed().compat()).unwrap(); +} diff --git a/tests/eager_drop.rs b/tests/eager_drop.rs new file mode 100644 index 0000000..674e401 --- /dev/null +++ b/tests/eager_drop.rs @@ -0,0 +1,117 @@ +use futures::channel::oneshot; +use futures::future::{self, Future, FutureExt, TryFutureExt}; +use futures::task::{Context, Poll}; +use futures_test::future::FutureTestExt; +use pin_utils::unsafe_pinned; +use std::pin::Pin; +use std::sync::mpsc; + +#[test] +fn map_ok() { + // The closure given to `map_ok` should have been dropped by the time `map` + // runs. + let (tx1, rx1) = mpsc::channel::<()>(); + let (tx2, rx2) = mpsc::channel::<()>(); + + future::ready::>(Err(1)) + .map_ok(move |_| { let _tx1 = tx1; panic!("should not run"); }) + .map(move |_| { + assert!(rx1.recv().is_err()); + tx2.send(()).unwrap() + }) + .run_in_background(); + + rx2.recv().unwrap(); +} + +#[test] +fn map_err() { + // The closure given to `map_err` should have been dropped by the time `map` + // runs. + let (tx1, rx1) = mpsc::channel::<()>(); + let (tx2, rx2) = mpsc::channel::<()>(); + + future::ready::>(Ok(1)) + .map_err(move |_| { let _tx1 = tx1; panic!("should not run"); }) + .map(move |_| { + assert!(rx1.recv().is_err()); + tx2.send(()).unwrap() + }) + .run_in_background(); + + rx2.recv().unwrap(); +} + +struct FutureData { + _data: T, + future: F, +} + +impl FutureData { + unsafe_pinned!(future: F); +} + +impl Future for FutureData { + type Output = F::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.future().poll(cx) + } +} + +#[test] +fn then_drops_eagerly() { + let (tx0, rx0) = oneshot::channel::<()>(); + let (tx1, rx1) = mpsc::channel::<()>(); + let (tx2, rx2) = mpsc::channel::<()>(); + + FutureData { _data: tx1, future: rx0.unwrap_or_else(|_| { panic!() }) } + .then(move |_| { + assert!(rx1.recv().is_err()); // tx1 should have been dropped + tx2.send(()).unwrap(); + future::ready(()) + }) + .run_in_background(); + + assert_eq!(Err(mpsc::TryRecvError::Empty), rx2.try_recv()); + tx0.send(()).unwrap(); + rx2.recv().unwrap(); +} + +#[test] +fn and_then_drops_eagerly() { + let (tx0, rx0) = oneshot::channel::>(); + let (tx1, rx1) = mpsc::channel::<()>(); + let (tx2, rx2) = mpsc::channel::<()>(); + + FutureData { _data: tx1, future: rx0.unwrap_or_else(|_| { panic!() }) } + .and_then(move |_| { + assert!(rx1.recv().is_err()); // tx1 should have been dropped + tx2.send(()).unwrap(); + future::ready(Ok(())) + }) + .run_in_background(); + + assert_eq!(Err(mpsc::TryRecvError::Empty), rx2.try_recv()); + tx0.send(Ok(())).unwrap(); + rx2.recv().unwrap(); +} + +#[test] +fn or_else_drops_eagerly() { + let (tx0, rx0) = oneshot::channel::>(); + let (tx1, rx1) = mpsc::channel::<()>(); + let (tx2, rx2) = mpsc::channel::<()>(); + + FutureData { _data: tx1, future: rx0.unwrap_or_else(|_| { panic!() }) } + .or_else(move |_| { + assert!(rx1.recv().is_err()); // tx1 should have been dropped + tx2.send(()).unwrap(); + future::ready::>(Ok(())) + }) + .run_in_background(); + + assert_eq!(Err(mpsc::TryRecvError::Empty), rx2.try_recv()); + tx0.send(Err(())).unwrap(); + rx2.recv().unwrap(); +} diff --git a/tests/eventual.rs b/tests/eventual.rs new file mode 100644 index 0000000..bff000d --- /dev/null +++ b/tests/eventual.rs @@ -0,0 +1,159 @@ +use futures::channel::oneshot; +use futures::executor::ThreadPool; +use futures::future::{self, ok, Future, FutureExt, TryFutureExt}; +use futures::task::SpawnExt; +use std::sync::mpsc; +use std::thread; + +fn run(future: F) { + let tp = ThreadPool::new().unwrap(); + tp.spawn(future.map(drop)).unwrap(); +} + +#[test] +fn join1() { + let (tx, rx) = mpsc::channel(); + run(future::try_join(ok::(1), ok(2)).map_ok(move |v| tx.send(v).unwrap())); + assert_eq!(rx.recv(), Ok((1, 2))); + assert!(rx.recv().is_err()); +} + +#[test] +fn join2() { + let (c1, p1) = oneshot::channel::(); + let (c2, p2) = oneshot::channel::(); + let (tx, rx) = mpsc::channel(); + run(future::try_join(p1, p2).map_ok(move |v| tx.send(v).unwrap())); + assert!(rx.try_recv().is_err()); + c1.send(1).unwrap(); + assert!(rx.try_recv().is_err()); + c2.send(2).unwrap(); + assert_eq!(rx.recv(), Ok((1, 2))); + assert!(rx.recv().is_err()); +} + +#[test] +fn join3() { + let (c1, p1) = oneshot::channel::(); + let (c2, p2) = oneshot::channel::(); + let (tx, rx) = mpsc::channel(); + run(future::try_join(p1, p2).map_err(move |_v| tx.send(1).unwrap())); + assert!(rx.try_recv().is_err()); + drop(c1); + assert_eq!(rx.recv(), Ok(1)); + assert!(rx.recv().is_err()); + drop(c2); +} + +#[test] +fn join4() { + let (c1, p1) = oneshot::channel::(); + let (c2, p2) = oneshot::channel::(); + let (tx, rx) = mpsc::channel(); + run(future::try_join(p1, p2).map_err(move |v| tx.send(v).unwrap())); + assert!(rx.try_recv().is_err()); + drop(c1); + assert!(rx.recv().is_ok()); + drop(c2); + assert!(rx.recv().is_err()); +} + +#[test] +fn join5() { + let (c1, p1) = oneshot::channel::(); + let (c2, p2) = oneshot::channel::(); + let (c3, p3) = oneshot::channel::(); + let (tx, rx) = mpsc::channel(); + run(future::try_join(future::try_join(p1, p2), p3).map_ok(move |v| tx.send(v).unwrap())); + assert!(rx.try_recv().is_err()); + c1.send(1).unwrap(); + assert!(rx.try_recv().is_err()); + c2.send(2).unwrap(); + assert!(rx.try_recv().is_err()); + c3.send(3).unwrap(); + assert_eq!(rx.recv(), Ok(((1, 2), 3))); + assert!(rx.recv().is_err()); +} + +#[test] +fn select1() { + let (c1, p1) = oneshot::channel::(); + let (c2, p2) = oneshot::channel::(); + let (tx, rx) = mpsc::channel(); + run(future::try_select(p1, p2).map_ok(move |v| tx.send(v).unwrap())); + assert!(rx.try_recv().is_err()); + c1.send(1).unwrap(); + let (v, p2) = rx.recv().unwrap().into_inner(); + assert_eq!(v, 1); + assert!(rx.recv().is_err()); + + let (tx, rx) = mpsc::channel(); + run(p2.map_ok(move |v| tx.send(v).unwrap())); + c2.send(2).unwrap(); + assert_eq!(rx.recv(), Ok(2)); + assert!(rx.recv().is_err()); +} + +#[test] +fn select2() { + let (c1, p1) = oneshot::channel::(); + let (c2, p2) = oneshot::channel::(); + let (tx, rx) = mpsc::channel(); + run(future::try_select(p1, p2).map_err(move |v| tx.send((1, v.into_inner().1)).unwrap())); + assert!(rx.try_recv().is_err()); + drop(c1); + let (v, p2) = rx.recv().unwrap(); + assert_eq!(v, 1); + assert!(rx.recv().is_err()); + + let (tx, rx) = mpsc::channel(); + run(p2.map_ok(move |v| tx.send(v).unwrap())); + c2.send(2).unwrap(); + assert_eq!(rx.recv(), Ok(2)); + assert!(rx.recv().is_err()); +} + +#[test] +fn select3() { + let (c1, p1) = oneshot::channel::(); + let (c2, p2) = oneshot::channel::(); + let (tx, rx) = mpsc::channel(); + run(future::try_select(p1, p2).map_err(move |v| tx.send((1, v.into_inner().1)).unwrap())); + assert!(rx.try_recv().is_err()); + drop(c1); + let (v, p2) = rx.recv().unwrap(); + assert_eq!(v, 1); + assert!(rx.recv().is_err()); + + let (tx, rx) = mpsc::channel(); + run(p2.map_err(move |_v| tx.send(2).unwrap())); + drop(c2); + assert_eq!(rx.recv(), Ok(2)); + assert!(rx.recv().is_err()); +} + +#[test] +fn select4() { + let (tx, rx) = mpsc::channel::>(); + + let t = thread::spawn(move || { + for c in rx { + c.send(1).unwrap(); + } + }); + + let (tx2, rx2) = mpsc::channel(); + for _ in 0..10000 { + let (c1, p1) = oneshot::channel::(); + let (c2, p2) = oneshot::channel::(); + + let tx3 = tx2.clone(); + run(future::try_select(p1, p2).map_ok(move |_| tx3.send(()).unwrap())); + tx.send(c1).unwrap(); + rx2.recv().unwrap(); + drop(c2); + } + drop(tx); + + t.join().unwrap(); +} diff --git a/tests/fuse.rs b/tests/fuse.rs new file mode 100644 index 0000000..83f2c1c --- /dev/null +++ b/tests/fuse.rs @@ -0,0 +1,12 @@ +use futures::future::{self, FutureExt}; +use futures::task::Context; +use futures_test::task::panic_waker; + +#[test] +fn fuse() { + let mut future = future::ready::(2).fuse(); + let waker = panic_waker(); + let mut cx = Context::from_waker(&waker); + assert!(future.poll_unpin(&mut cx).is_ready()); + assert!(future.poll_unpin(&mut cx).is_pending()); +} diff --git a/tests/future_obj.rs b/tests/future_obj.rs new file mode 100644 index 0000000..c6b18fc --- /dev/null +++ b/tests/future_obj.rs @@ -0,0 +1,33 @@ +use futures::future::{Future, FutureObj, FutureExt}; +use std::pin::Pin; +use futures::task::{Context, Poll}; + +#[test] +fn dropping_does_not_segfault() { + FutureObj::new(async { String::new() }.boxed()); +} + +#[test] +fn dropping_drops_the_future() { + let mut times_dropped = 0; + + struct Inc<'a>(&'a mut u32); + + impl Future for Inc<'_> { + type Output = (); + + fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<()> { + unimplemented!() + } + } + + impl Drop for Inc<'_> { + fn drop(&mut self) { + *self.0 += 1; + } + } + + FutureObj::new(Inc(&mut times_dropped).boxed()); + + assert_eq!(times_dropped, 1); +} diff --git a/tests/future_try_flatten_stream.rs b/tests/future_try_flatten_stream.rs new file mode 100644 index 0000000..082c5ef --- /dev/null +++ b/tests/future_try_flatten_stream.rs @@ -0,0 +1,82 @@ +use core::marker::PhantomData; +use core::pin::Pin; +use futures::executor::block_on_stream; +use futures::future::{ok, err, TryFutureExt}; +use futures::sink::Sink; +use futures::stream::{self, Stream, StreamExt}; +use futures::task::{Context, Poll}; + +#[test] +fn successful_future() { + let stream_items = vec![17, 19]; + let future_of_a_stream = ok::<_, bool>(stream::iter(stream_items).map(Ok)); + + let stream = future_of_a_stream.try_flatten_stream(); + + let mut iter = block_on_stream(stream); + assert_eq!(Ok(17), iter.next().unwrap()); + assert_eq!(Ok(19), iter.next().unwrap()); + assert_eq!(None, iter.next()); +} + +struct PanickingStream { + _marker: PhantomData<(T, E)> +} + +impl Stream for PanickingStream { + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + panic!() + } +} + +#[test] +fn failed_future() { + let future_of_a_stream = err::, _>(10); + let stream = future_of_a_stream.try_flatten_stream(); + let mut iter = block_on_stream(stream); + assert_eq!(Err(10), iter.next().unwrap()); + assert_eq!(None, iter.next()); +} + +struct StreamSink(PhantomData<(T, E, Item)>); + +impl Stream for StreamSink { + type Item = Result; + fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + panic!() + } +} + +impl Sink for StreamSink { + type Error = E; + fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + panic!() + } + fn start_send(self: Pin<&mut Self>, _: Item) -> Result<(), Self::Error> { + panic!() + } + fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + panic!() + } + fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + panic!() + } +} + +fn assert_stream(_: &S) {} +fn assert_sink, Item>(_: &S) {} +fn assert_stream_sink, Item>(_: &S) {} + +#[test] +fn assert_impls() { + let s = ok(StreamSink::<(), (), ()>(PhantomData)).try_flatten_stream(); + assert_stream(&s); + assert_sink(&s); + assert_stream_sink(&s); + let s = ok(StreamSink::<(), (), ()>(PhantomData)).flatten_sink(); + assert_stream(&s); + assert_sink(&s); + assert_stream_sink(&s); +} diff --git a/tests/futures_ordered.rs b/tests/futures_ordered.rs new file mode 100644 index 0000000..d06b62f --- /dev/null +++ b/tests/futures_ordered.rs @@ -0,0 +1,83 @@ +use futures::channel::oneshot; +use futures::executor::{block_on, block_on_stream}; +use futures::future::{self, join, Future, FutureExt, TryFutureExt}; +use futures::stream::{StreamExt, FuturesOrdered}; +use futures_test::task::noop_context; +use std::any::Any; + +#[test] +fn works_1() { + let (a_tx, a_rx) = oneshot::channel::(); + let (b_tx, b_rx) = oneshot::channel::(); + let (c_tx, c_rx) = oneshot::channel::(); + + let mut stream = vec![a_rx, b_rx, c_rx].into_iter().collect::>(); + + b_tx.send(99).unwrap(); + assert!(stream.poll_next_unpin(&mut noop_context()).is_pending()); + + a_tx.send(33).unwrap(); + c_tx.send(33).unwrap(); + + let mut iter = block_on_stream(stream); + assert_eq!(Some(Ok(33)), iter.next()); + assert_eq!(Some(Ok(99)), iter.next()); + assert_eq!(Some(Ok(33)), iter.next()); + assert_eq!(None, iter.next()); +} + +#[test] +fn works_2() { + let (a_tx, a_rx) = oneshot::channel::(); + let (b_tx, b_rx) = oneshot::channel::(); + let (c_tx, c_rx) = oneshot::channel::(); + + let mut stream = vec![ + a_rx.boxed(), + join(b_rx, c_rx).map(|(a, b)| Ok(a? + b?)).boxed(), + ].into_iter().collect::>(); + + let mut cx = noop_context(); + a_tx.send(33).unwrap(); + b_tx.send(33).unwrap(); + assert!(stream.poll_next_unpin(&mut cx).is_ready()); + assert!(stream.poll_next_unpin(&mut cx).is_pending()); + c_tx.send(33).unwrap(); + assert!(stream.poll_next_unpin(&mut cx).is_ready()); +} + +#[test] +fn from_iterator() { + let stream = vec![ + future::ready::(1), + future::ready::(2), + future::ready::(3) + ].into_iter().collect::>(); + assert_eq!(stream.len(), 3); + assert_eq!(block_on(stream.collect::>()), vec![1,2,3]); +} + +#[test] +fn queue_never_unblocked() { + let (_a_tx, a_rx) = oneshot::channel::>(); + let (b_tx, b_rx) = oneshot::channel::>(); + let (c_tx, c_rx) = oneshot::channel::>(); + + let mut stream = vec![ + Box::new(a_rx) as Box + Unpin>, + Box::new(future::try_select(b_rx, c_rx) + .map_err(|e| e.factor_first().0) + .and_then(|e| future::ok(Box::new(e) as Box))) as _, + ].into_iter().collect::>(); + + let cx = &mut noop_context(); + for _ in 0..10 { + assert!(stream.poll_next_unpin(cx).is_pending()); + } + + b_tx.send(Box::new(())).unwrap(); + assert!(stream.poll_next_unpin(cx).is_pending()); + c_tx.send(Box::new(())).unwrap(); + assert!(stream.poll_next_unpin(cx).is_pending()); + assert!(stream.poll_next_unpin(cx).is_pending()); +} diff --git a/tests/futures_unordered.rs b/tests/futures_unordered.rs new file mode 100644 index 0000000..57eb98f --- /dev/null +++ b/tests/futures_unordered.rs @@ -0,0 +1,287 @@ +use std::marker::Unpin; +use std::pin::Pin; +use std::sync::atomic::{AtomicBool, Ordering}; + +use futures::channel::oneshot; +use futures::executor::{block_on, block_on_stream}; +use futures::future::{self, join, Future, FutureExt}; +use futures::stream::{FusedStream, FuturesUnordered, StreamExt}; +use futures::task::{Context, Poll}; +use futures_test::future::FutureTestExt; +use futures_test::task::noop_context; +use futures_test::{assert_stream_done, assert_stream_next}; + +#[test] +fn is_terminated() { + let mut cx = noop_context(); + let mut tasks = FuturesUnordered::new(); + + assert_eq!(tasks.is_terminated(), false); + assert_eq!(tasks.poll_next_unpin(&mut cx), Poll::Ready(None)); + assert_eq!(tasks.is_terminated(), true); + + // Test that the sentinel value doesn't leak + assert_eq!(tasks.is_empty(), true); + assert_eq!(tasks.len(), 0); + assert_eq!(tasks.iter_mut().len(), 0); + + tasks.push(future::ready(1)); + + assert_eq!(tasks.is_empty(), false); + assert_eq!(tasks.len(), 1); + assert_eq!(tasks.iter_mut().len(), 1); + + assert_eq!(tasks.is_terminated(), false); + assert_eq!(tasks.poll_next_unpin(&mut cx), Poll::Ready(Some(1))); + assert_eq!(tasks.is_terminated(), false); + assert_eq!(tasks.poll_next_unpin(&mut cx), Poll::Ready(None)); + assert_eq!(tasks.is_terminated(), true); +} + +#[test] +fn works_1() { + let (a_tx, a_rx) = oneshot::channel::(); + let (b_tx, b_rx) = oneshot::channel::(); + let (c_tx, c_rx) = oneshot::channel::(); + + let mut iter = block_on_stream( + vec![a_rx, b_rx, c_rx] + .into_iter() + .collect::>(), + ); + + b_tx.send(99).unwrap(); + assert_eq!(Some(Ok(99)), iter.next()); + + a_tx.send(33).unwrap(); + c_tx.send(33).unwrap(); + assert_eq!(Some(Ok(33)), iter.next()); + assert_eq!(Some(Ok(33)), iter.next()); + assert_eq!(None, iter.next()); +} + +#[test] +fn works_2() { + let (a_tx, a_rx) = oneshot::channel::(); + let (b_tx, b_rx) = oneshot::channel::(); + let (c_tx, c_rx) = oneshot::channel::(); + + let mut stream = vec![ + a_rx.boxed(), + join(b_rx, c_rx).map(|(a, b)| Ok(a? + b?)).boxed(), + ] + .into_iter() + .collect::>(); + + a_tx.send(9).unwrap(); + b_tx.send(10).unwrap(); + + let mut cx = noop_context(); + assert_eq!(stream.poll_next_unpin(&mut cx), Poll::Ready(Some(Ok(9)))); + c_tx.send(20).unwrap(); + assert_eq!(stream.poll_next_unpin(&mut cx), Poll::Ready(Some(Ok(30)))); + assert_eq!(stream.poll_next_unpin(&mut cx), Poll::Ready(None)); +} + +#[test] +fn from_iterator() { + let stream = vec![ + future::ready::(1), + future::ready::(2), + future::ready::(3), + ] + .into_iter() + .collect::>(); + assert_eq!(stream.len(), 3); + assert_eq!(block_on(stream.collect::>()), vec![1, 2, 3]); +} + +#[test] +fn finished_future() { + let (_a_tx, a_rx) = oneshot::channel::(); + let (b_tx, b_rx) = oneshot::channel::(); + let (c_tx, c_rx) = oneshot::channel::(); + + let mut stream = vec![ + Box::new(a_rx) as Box> + Unpin>, + Box::new(future::select(b_rx, c_rx).map(|e| e.factor_first().0)) as _, + ] + .into_iter() + .collect::>(); + + let cx = &mut noop_context(); + for _ in 0..10 { + assert!(stream.poll_next_unpin(cx).is_pending()); + } + + b_tx.send(12).unwrap(); + c_tx.send(3).unwrap(); + assert!(stream.poll_next_unpin(cx).is_ready()); + assert!(stream.poll_next_unpin(cx).is_pending()); + assert!(stream.poll_next_unpin(cx).is_pending()); +} + +#[test] +fn iter_mut_cancel() { + let (a_tx, a_rx) = oneshot::channel::(); + let (b_tx, b_rx) = oneshot::channel::(); + let (c_tx, c_rx) = oneshot::channel::(); + + let mut stream = vec![a_rx, b_rx, c_rx] + .into_iter() + .collect::>(); + + for rx in stream.iter_mut() { + rx.close(); + } + + let mut iter = block_on_stream(stream); + + assert!(a_tx.is_canceled()); + assert!(b_tx.is_canceled()); + assert!(c_tx.is_canceled()); + + assert_eq!(iter.next(), Some(Err(futures::channel::oneshot::Canceled))); + assert_eq!(iter.next(), Some(Err(futures::channel::oneshot::Canceled))); + assert_eq!(iter.next(), Some(Err(futures::channel::oneshot::Canceled))); + assert_eq!(iter.next(), None); +} + +#[test] +fn iter_mut_len() { + let mut stream = vec![ + future::pending::<()>(), + future::pending::<()>(), + future::pending::<()>(), + ] + .into_iter() + .collect::>(); + + let mut iter_mut = stream.iter_mut(); + assert_eq!(iter_mut.len(), 3); + assert!(iter_mut.next().is_some()); + assert_eq!(iter_mut.len(), 2); + assert!(iter_mut.next().is_some()); + assert_eq!(iter_mut.len(), 1); + assert!(iter_mut.next().is_some()); + assert_eq!(iter_mut.len(), 0); + assert!(iter_mut.next().is_none()); +} + +#[test] +fn iter_cancel() { + struct AtomicCancel { + future: F, + cancel: AtomicBool, + } + + impl Future for AtomicCancel { + type Output = Option<::Output>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + if self.cancel.load(Ordering::Relaxed) { + Poll::Ready(None) + } else { + self.future.poll_unpin(cx).map(Some) + } + } + } + + impl AtomicCancel { + fn new(future: F) -> Self { + Self { future, cancel: AtomicBool::new(false) } + } + } + + let stream = vec![ + AtomicCancel::new(future::pending::<()>()), + AtomicCancel::new(future::pending::<()>()), + AtomicCancel::new(future::pending::<()>()), + ] + .into_iter() + .collect::>(); + + for f in stream.iter() { + f.cancel.store(true, Ordering::Relaxed); + } + + let mut iter = block_on_stream(stream); + + assert_eq!(iter.next(), Some(None)); + assert_eq!(iter.next(), Some(None)); + assert_eq!(iter.next(), Some(None)); + assert_eq!(iter.next(), None); +} + +#[test] +fn iter_len() { + let stream = vec![ + future::pending::<()>(), + future::pending::<()>(), + future::pending::<()>(), + ] + .into_iter() + .collect::>(); + + let mut iter = stream.iter(); + assert_eq!(iter.len(), 3); + assert!(iter.next().is_some()); + assert_eq!(iter.len(), 2); + assert!(iter.next().is_some()); + assert_eq!(iter.len(), 1); + assert!(iter.next().is_some()); + assert_eq!(iter.len(), 0); + assert!(iter.next().is_none()); +} + +#[test] +fn futures_not_moved_after_poll() { + // Future that will be ready after being polled twice, + // asserting that it does not move. + let fut = future::ready(()).pending_once().assert_unmoved(); + let mut stream = vec![fut; 3].into_iter().collect::>(); + assert_stream_next!(stream, ()); + assert_stream_next!(stream, ()); + assert_stream_next!(stream, ()); + assert_stream_done!(stream); +} + +#[test] +fn len_valid_during_out_of_order_completion() { + // Complete futures out-of-order and add new futures afterwards to ensure + // length values remain correct. + let (a_tx, a_rx) = oneshot::channel::(); + let (b_tx, b_rx) = oneshot::channel::(); + let (c_tx, c_rx) = oneshot::channel::(); + let (d_tx, d_rx) = oneshot::channel::(); + + let mut cx = noop_context(); + let mut stream = FuturesUnordered::new(); + assert_eq!(stream.len(), 0); + + stream.push(a_rx); + assert_eq!(stream.len(), 1); + stream.push(b_rx); + assert_eq!(stream.len(), 2); + stream.push(c_rx); + assert_eq!(stream.len(), 3); + + b_tx.send(4).unwrap(); + assert_eq!(stream.poll_next_unpin(&mut cx), Poll::Ready(Some(Ok(4)))); + assert_eq!(stream.len(), 2); + + stream.push(d_rx); + assert_eq!(stream.len(), 3); + + c_tx.send(5).unwrap(); + assert_eq!(stream.poll_next_unpin(&mut cx), Poll::Ready(Some(Ok(5)))); + assert_eq!(stream.len(), 2); + + d_tx.send(6).unwrap(); + assert_eq!(stream.poll_next_unpin(&mut cx), Poll::Ready(Some(Ok(6)))); + assert_eq!(stream.len(), 1); + + a_tx.send(7).unwrap(); + assert_eq!(stream.poll_next_unpin(&mut cx), Poll::Ready(Some(Ok(7)))); + assert_eq!(stream.len(), 0); +} diff --git a/tests/inspect.rs b/tests/inspect.rs new file mode 100644 index 0000000..42f6f73 --- /dev/null +++ b/tests/inspect.rs @@ -0,0 +1,14 @@ +use futures::executor::block_on; +use futures::future::{self, FutureExt}; + +#[test] +fn smoke() { + let mut counter = 0; + + { + let work = future::ready::(40).inspect(|val| { counter += *val; }); + assert_eq!(block_on(work), 40); + } + + assert_eq!(counter, 40); +} diff --git a/tests/io_buf_reader.rs b/tests/io_buf_reader.rs new file mode 100644 index 0000000..a3d723a --- /dev/null +++ b/tests/io_buf_reader.rs @@ -0,0 +1,321 @@ +use futures::executor::block_on; +use futures::future::{Future, FutureExt}; +use futures::io::{ + AsyncSeek, AsyncSeekExt, AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, + AllowStdIo, BufReader, Cursor, SeekFrom, +}; +use futures::task::{Context, Poll}; +use futures_test::task::noop_context; +use std::cmp; +use std::io; +use std::pin::Pin; + +/// A dummy reader intended at testing short-reads propagation. +struct ShortReader { + lengths: Vec, +} + +impl io::Read for ShortReader { + fn read(&mut self, _: &mut [u8]) -> io::Result { + if self.lengths.is_empty() { + Ok(0) + } else { + Ok(self.lengths.remove(0)) + } + } +} + +macro_rules! run_fill_buf { + ($reader:expr) => {{ + let mut cx = noop_context(); + loop { + if let Poll::Ready(x) = Pin::new(&mut $reader).poll_fill_buf(&mut cx) { + break x; + } + } + }}; +} + +#[test] +fn test_buffered_reader() { + let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4]; + let mut reader = BufReader::with_capacity(2, inner); + + let mut buf = [0, 0, 0]; + let nread = block_on(reader.read(&mut buf)); + assert_eq!(nread.unwrap(), 3); + assert_eq!(buf, [5, 6, 7]); + assert_eq!(reader.buffer(), []); + + let mut buf = [0, 0]; + let nread = block_on(reader.read(&mut buf)); + assert_eq!(nread.unwrap(), 2); + assert_eq!(buf, [0, 1]); + assert_eq!(reader.buffer(), []); + + let mut buf = [0]; + let nread = block_on(reader.read(&mut buf)); + assert_eq!(nread.unwrap(), 1); + assert_eq!(buf, [2]); + assert_eq!(reader.buffer(), [3]); + + let mut buf = [0, 0, 0]; + let nread = block_on(reader.read(&mut buf)); + assert_eq!(nread.unwrap(), 1); + assert_eq!(buf, [3, 0, 0]); + assert_eq!(reader.buffer(), []); + + let nread = block_on(reader.read(&mut buf)); + assert_eq!(nread.unwrap(), 1); + assert_eq!(buf, [4, 0, 0]); + assert_eq!(reader.buffer(), []); + + assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 0); +} + +#[test] +fn test_buffered_reader_seek() { + let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4]; + let mut reader = BufReader::with_capacity(2, Cursor::new(inner)); + + assert_eq!(block_on(reader.seek(SeekFrom::Start(3))).ok(), Some(3)); + assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1][..])); + assert_eq!(run(reader.seek(SeekFrom::Current(i64::min_value()))).ok(), None); + assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1][..])); + assert_eq!(block_on(reader.seek(SeekFrom::Current(1))).ok(), Some(4)); + assert_eq!(run_fill_buf!(reader).ok(), Some(&[1, 2][..])); + Pin::new(&mut reader).consume(1); + assert_eq!(block_on(reader.seek(SeekFrom::Current(-2))).ok(), Some(3)); +} + +#[test] +fn test_buffered_reader_seek_underflow() { + // gimmick reader that yields its position modulo 256 for each byte + struct PositionReader { + pos: u64 + } + impl io::Read for PositionReader { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + let len = buf.len(); + for x in buf { + *x = self.pos as u8; + self.pos = self.pos.wrapping_add(1); + } + Ok(len) + } + } + impl io::Seek for PositionReader { + fn seek(&mut self, pos: SeekFrom) -> io::Result { + match pos { + SeekFrom::Start(n) => { + self.pos = n; + } + SeekFrom::Current(n) => { + self.pos = self.pos.wrapping_add(n as u64); + } + SeekFrom::End(n) => { + self.pos = u64::max_value().wrapping_add(n as u64); + } + } + Ok(self.pos) + } + } + + let mut reader = BufReader::with_capacity(5, AllowStdIo::new(PositionReader { pos: 0 })); + assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1, 2, 3, 4][..])); + assert_eq!(block_on(reader.seek(SeekFrom::End(-5))).ok(), Some(u64::max_value()-5)); + assert_eq!(run_fill_buf!(reader).ok().map(|s| s.len()), Some(5)); + // the following seek will require two underlying seeks + let expected = 9_223_372_036_854_775_802; + assert_eq!(block_on(reader.seek(SeekFrom::Current(i64::min_value()))).ok(), Some(expected)); + assert_eq!(run_fill_buf!(reader).ok().map(|s| s.len()), Some(5)); + // seeking to 0 should empty the buffer. + assert_eq!(block_on(reader.seek(SeekFrom::Current(0))).ok(), Some(expected)); + assert_eq!(reader.get_ref().get_ref().pos, expected); +} + +#[test] +fn test_short_reads() { + let inner = ShortReader { lengths: vec![0, 1, 2, 0, 1, 0] }; + let mut reader = BufReader::new(AllowStdIo::new(inner)); + let mut buf = [0, 0]; + assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 0); + assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 1); + assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 2); + assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 0); + assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 1); + assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 0); + assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 0); +} + +struct MaybePending<'a> { + inner: &'a [u8], + ready_read: bool, + ready_fill_buf: bool, +} + +impl<'a> MaybePending<'a> { + fn new(inner: &'a [u8]) -> Self { + Self { inner, ready_read: false, ready_fill_buf: false } + } +} + +impl AsyncRead for MaybePending<'_> { + fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) + -> Poll> + { + if self.ready_read { + self.ready_read = false; + Pin::new(&mut self.inner).poll_read(cx, buf) + } else { + self.ready_read = true; + Poll::Pending + } + } +} + +impl AsyncBufRead for MaybePending<'_> { + fn poll_fill_buf(mut self: Pin<&mut Self>, _: &mut Context<'_>) + -> Poll> + { + if self.ready_fill_buf { + self.ready_fill_buf = false; + if self.inner.is_empty() { return Poll::Ready(Ok(&[])) } + let len = cmp::min(2, self.inner.len()); + Poll::Ready(Ok(&self.inner[0..len])) + } else { + self.ready_fill_buf = true; + Poll::Pending + } + } + + fn consume(mut self: Pin<&mut Self>, amt: usize) { + self.inner = &self.inner[amt..]; + } +} + +fn run(mut f: F) -> F::Output { + let mut cx = noop_context(); + loop { + if let Poll::Ready(x) = f.poll_unpin(&mut cx) { + return x; + } + } +} + +#[test] +fn maybe_pending() { + let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4]; + let mut reader = BufReader::with_capacity(2, MaybePending::new(inner)); + + let mut buf = [0, 0, 0]; + let nread = run(reader.read(&mut buf)); + assert_eq!(nread.unwrap(), 3); + assert_eq!(buf, [5, 6, 7]); + assert_eq!(reader.buffer(), []); + + let mut buf = [0, 0]; + let nread = run(reader.read(&mut buf)); + assert_eq!(nread.unwrap(), 2); + assert_eq!(buf, [0, 1]); + assert_eq!(reader.buffer(), []); + + let mut buf = [0]; + let nread = run(reader.read(&mut buf)); + assert_eq!(nread.unwrap(), 1); + assert_eq!(buf, [2]); + assert_eq!(reader.buffer(), [3]); + + let mut buf = [0, 0, 0]; + let nread = run(reader.read(&mut buf)); + assert_eq!(nread.unwrap(), 1); + assert_eq!(buf, [3, 0, 0]); + assert_eq!(reader.buffer(), []); + + let nread = run(reader.read(&mut buf)); + assert_eq!(nread.unwrap(), 1); + assert_eq!(buf, [4, 0, 0]); + assert_eq!(reader.buffer(), []); + + assert_eq!(run(reader.read(&mut buf)).unwrap(), 0); +} + +#[test] +fn maybe_pending_buf_read() { + let inner = MaybePending::new(&[0, 1, 2, 3, 1, 0]); + let mut reader = BufReader::with_capacity(2, inner); + let mut v = Vec::new(); + run(reader.read_until(3, &mut v)).unwrap(); + assert_eq!(v, [0, 1, 2, 3]); + v.clear(); + run(reader.read_until(1, &mut v)).unwrap(); + assert_eq!(v, [1]); + v.clear(); + run(reader.read_until(8, &mut v)).unwrap(); + assert_eq!(v, [0]); + v.clear(); + run(reader.read_until(9, &mut v)).unwrap(); + assert_eq!(v, []); +} + +struct MaybePendingSeek<'a> { + inner: Cursor<&'a [u8]>, + ready: bool, +} + +impl<'a> MaybePendingSeek<'a> { + fn new(inner: &'a [u8]) -> Self { + Self { inner: Cursor::new(inner), ready: true } + } +} + +impl AsyncRead for MaybePendingSeek<'_> { + fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) + -> Poll> + { + Pin::new(&mut self.inner).poll_read(cx, buf) + } +} + +impl AsyncBufRead for MaybePendingSeek<'_> { + fn poll_fill_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>) + -> Poll> + { + let this: *mut Self = &mut *self as *mut _; + Pin::new(&mut unsafe { &mut *this }.inner).poll_fill_buf(cx) + } + + fn consume(mut self: Pin<&mut Self>, amt: usize) { + Pin::new(&mut self.inner).consume(amt) + } +} + +impl AsyncSeek for MaybePendingSeek<'_> { + fn poll_seek(mut self: Pin<&mut Self>, cx: &mut Context<'_>, pos: SeekFrom) + -> Poll> + { + if self.ready { + self.ready = false; + Pin::new(&mut self.inner).poll_seek(cx, pos) + } else { + self.ready = true; + Poll::Pending + } + } +} + +// https://github.com/rust-lang/futures-rs/pull/1573#discussion_r281162309 +#[test] +fn maybe_pending_seek() { + let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4]; + let mut reader = BufReader::with_capacity(2, MaybePendingSeek::new(inner)); + + assert_eq!(run(reader.seek(SeekFrom::Current(3))).ok(), Some(3)); + assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1][..])); + assert_eq!(run(reader.seek(SeekFrom::Current(i64::min_value()))).ok(), None); + assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1][..])); + assert_eq!(run(reader.seek(SeekFrom::Current(1))).ok(), Some(4)); + assert_eq!(run_fill_buf!(reader).ok(), Some(&[1, 2][..])); + Pin::new(&mut reader).consume(1); + assert_eq!(run(reader.seek(SeekFrom::Current(-2))).ok(), Some(3)); +} diff --git a/tests/io_buf_writer.rs b/tests/io_buf_writer.rs new file mode 100644 index 0000000..7bdcd16 --- /dev/null +++ b/tests/io_buf_writer.rs @@ -0,0 +1,236 @@ +use futures::executor::block_on; +use futures::future::{Future, FutureExt}; +use futures::io::{AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt, BufWriter, Cursor, SeekFrom}; +use futures::task::{Context, Poll}; +use futures_test::task::noop_context; +use std::io; +use std::pin::Pin; + +#[test] +fn buf_writer() { + let mut writer = BufWriter::with_capacity(2, Vec::new()); + + block_on(writer.write(&[0, 1])).unwrap(); + assert_eq!(writer.buffer(), []); + assert_eq!(*writer.get_ref(), [0, 1]); + + block_on(writer.write(&[2])).unwrap(); + assert_eq!(writer.buffer(), [2]); + assert_eq!(*writer.get_ref(), [0, 1]); + + block_on(writer.write(&[3])).unwrap(); + assert_eq!(writer.buffer(), [2, 3]); + assert_eq!(*writer.get_ref(), [0, 1]); + + block_on(writer.flush()).unwrap(); + assert_eq!(writer.buffer(), []); + assert_eq!(*writer.get_ref(), [0, 1, 2, 3]); + + block_on(writer.write(&[4])).unwrap(); + block_on(writer.write(&[5])).unwrap(); + assert_eq!(writer.buffer(), [4, 5]); + assert_eq!(*writer.get_ref(), [0, 1, 2, 3]); + + block_on(writer.write(&[6])).unwrap(); + assert_eq!(writer.buffer(), [6]); + assert_eq!(*writer.get_ref(), [0, 1, 2, 3, 4, 5]); + + block_on(writer.write(&[7, 8])).unwrap(); + assert_eq!(writer.buffer(), []); + assert_eq!(*writer.get_ref(), [0, 1, 2, 3, 4, 5, 6, 7, 8]); + + block_on(writer.write(&[9, 10, 11])).unwrap(); + assert_eq!(writer.buffer(), []); + assert_eq!(*writer.get_ref(), [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]); + + block_on(writer.flush()).unwrap(); + assert_eq!(writer.buffer(), []); + assert_eq!(*writer.get_ref(), [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]); +} + +#[test] +fn buf_writer_inner_flushes() { + let mut w = BufWriter::with_capacity(3, Vec::new()); + block_on(w.write(&[0, 1])).unwrap(); + assert_eq!(*w.get_ref(), []); + block_on(w.flush()).unwrap(); + let w = w.into_inner(); + assert_eq!(w, [0, 1]); +} + +#[test] +fn buf_writer_seek() { + // FIXME: when https://github.com/rust-lang/futures-rs/issues/1510 fixed, + // use `Vec::new` instead of `vec![0; 8]`. + let mut w = BufWriter::with_capacity(3, Cursor::new(vec![0; 8])); + block_on(w.write_all(&[0, 1, 2, 3, 4, 5])).unwrap(); + block_on(w.write_all(&[6, 7])).unwrap(); + assert_eq!(block_on(w.seek(SeekFrom::Current(0))).ok(), Some(8)); + assert_eq!(&w.get_ref().get_ref()[..], &[0, 1, 2, 3, 4, 5, 6, 7][..]); + assert_eq!(block_on(w.seek(SeekFrom::Start(2))).ok(), Some(2)); + block_on(w.write_all(&[8, 9])).unwrap(); + block_on(w.flush()).unwrap(); + assert_eq!(&w.into_inner().into_inner()[..], &[0, 1, 8, 9, 4, 5, 6, 7]); +} + +struct MaybePending { + inner: Vec, + ready: bool, +} + +impl MaybePending { + fn new(inner: Vec) -> Self { + Self { inner, ready: false } + } +} + +impl AsyncWrite for MaybePending { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + if self.ready { + self.ready = false; + Pin::new(&mut self.inner).poll_write(cx, buf) + } else { + self.ready = true; + Poll::Pending + } + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.inner).poll_flush(cx) + } + + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.inner).poll_close(cx) + } +} + +fn run(mut f: F) -> F::Output { + let mut cx = noop_context(); + loop { + if let Poll::Ready(x) = f.poll_unpin(&mut cx) { + return x; + } + } +} + +#[test] +fn maybe_pending_buf_writer() { + let mut writer = BufWriter::with_capacity(2, MaybePending::new(Vec::new())); + + run(writer.write(&[0, 1])).unwrap(); + assert_eq!(writer.buffer(), []); + assert_eq!(&writer.get_ref().inner, &[0, 1]); + + run(writer.write(&[2])).unwrap(); + assert_eq!(writer.buffer(), [2]); + assert_eq!(&writer.get_ref().inner, &[0, 1]); + + run(writer.write(&[3])).unwrap(); + assert_eq!(writer.buffer(), [2, 3]); + assert_eq!(&writer.get_ref().inner, &[0, 1]); + + run(writer.flush()).unwrap(); + assert_eq!(writer.buffer(), []); + assert_eq!(&writer.get_ref().inner, &[0, 1, 2, 3]); + + run(writer.write(&[4])).unwrap(); + run(writer.write(&[5])).unwrap(); + assert_eq!(writer.buffer(), [4, 5]); + assert_eq!(&writer.get_ref().inner, &[0, 1, 2, 3]); + + run(writer.write(&[6])).unwrap(); + assert_eq!(writer.buffer(), [6]); + assert_eq!(writer.get_ref().inner, &[0, 1, 2, 3, 4, 5]); + + run(writer.write(&[7, 8])).unwrap(); + assert_eq!(writer.buffer(), []); + assert_eq!(writer.get_ref().inner, &[0, 1, 2, 3, 4, 5, 6, 7, 8]); + + run(writer.write(&[9, 10, 11])).unwrap(); + assert_eq!(writer.buffer(), []); + assert_eq!(writer.get_ref().inner, &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]); + + run(writer.flush()).unwrap(); + assert_eq!(writer.buffer(), []); + assert_eq!(&writer.get_ref().inner, &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]); +} + +#[test] +fn maybe_pending_buf_writer_inner_flushes() { + let mut w = BufWriter::with_capacity(3, MaybePending::new(Vec::new())); + run(w.write(&[0, 1])).unwrap(); + assert_eq!(&w.get_ref().inner, &[]); + run(w.flush()).unwrap(); + let w = w.into_inner().inner; + assert_eq!(w, [0, 1]); +} + + +struct MaybePendingSeek { + inner: Cursor>, + ready_write: bool, + ready_seek: bool, +} + +impl MaybePendingSeek { + fn new(inner: Vec) -> Self { + Self { inner: Cursor::new(inner), ready_write: false, ready_seek: false } + } +} + +impl AsyncWrite for MaybePendingSeek { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + if self.ready_write { + self.ready_write = false; + Pin::new(&mut self.inner).poll_write(cx, buf) + } else { + self.ready_write = true; + Poll::Pending + } + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.inner).poll_flush(cx) + } + + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.inner).poll_close(cx) + } +} + +impl AsyncSeek for MaybePendingSeek { + fn poll_seek(mut self: Pin<&mut Self>, cx: &mut Context<'_>, pos: SeekFrom) + -> Poll> + { + if self.ready_seek { + self.ready_seek = false; + Pin::new(&mut self.inner).poll_seek(cx, pos) + } else { + self.ready_seek = true; + Poll::Pending + } + } +} + +#[test] +fn maybe_pending_buf_writer_seek() { + // FIXME: when https://github.com/rust-lang/futures-rs/issues/1510 fixed, + // use `Vec::new` instead of `vec![0; 8]`. + let mut w = BufWriter::with_capacity(3, MaybePendingSeek::new(vec![0; 8])); + run(w.write_all(&[0, 1, 2, 3, 4, 5])).unwrap(); + run(w.write_all(&[6, 7])).unwrap(); + assert_eq!(run(w.seek(SeekFrom::Current(0))).ok(), Some(8)); + assert_eq!(&w.get_ref().inner.get_ref()[..], &[0, 1, 2, 3, 4, 5, 6, 7][..]); + assert_eq!(run(w.seek(SeekFrom::Start(2))).ok(), Some(2)); + run(w.write_all(&[8, 9])).unwrap(); + run(w.flush()).unwrap(); + assert_eq!(&w.into_inner().inner.into_inner()[..], &[0, 1, 8, 9, 4, 5, 6, 7]); +} diff --git a/tests/io_cursor.rs b/tests/io_cursor.rs new file mode 100644 index 0000000..4f80a75 --- /dev/null +++ b/tests/io_cursor.rs @@ -0,0 +1,29 @@ +use assert_matches::assert_matches; +use futures::future::lazy; +use futures::io::{AsyncWrite, Cursor}; +use futures::task::Poll; +use std::pin::Pin; + +#[test] +fn cursor_asyncwrite_vec() { + let mut cursor = Cursor::new(vec![0; 5]); + futures::executor::block_on(lazy(|cx| { + assert_matches!(Pin::new(&mut cursor).poll_write(cx, &[1, 2]), Poll::Ready(Ok(2))); + assert_matches!(Pin::new(&mut cursor).poll_write(cx, &[3, 4]), Poll::Ready(Ok(2))); + assert_matches!(Pin::new(&mut cursor).poll_write(cx, &[5, 6]), Poll::Ready(Ok(2))); + assert_matches!(Pin::new(&mut cursor).poll_write(cx, &[6, 7]), Poll::Ready(Ok(2))); + })); + assert_eq!(cursor.into_inner(), [1, 2, 3, 4, 5, 6, 6, 7]); +} + +#[test] +fn cursor_asyncwrite_box() { + let mut cursor = Cursor::new(vec![0; 5].into_boxed_slice()); + futures::executor::block_on(lazy(|cx| { + assert_matches!(Pin::new(&mut cursor).poll_write(cx, &[1, 2]), Poll::Ready(Ok(2))); + assert_matches!(Pin::new(&mut cursor).poll_write(cx, &[3, 4]), Poll::Ready(Ok(2))); + assert_matches!(Pin::new(&mut cursor).poll_write(cx, &[5, 6]), Poll::Ready(Ok(1))); + assert_matches!(Pin::new(&mut cursor).poll_write(cx, &[6, 7]), Poll::Ready(Ok(0))); + })); + assert_eq!(&*cursor.into_inner(), [1, 2, 3, 4, 5]); +} diff --git a/tests/io_lines.rs b/tests/io_lines.rs new file mode 100644 index 0000000..39eafa9 --- /dev/null +++ b/tests/io_lines.rs @@ -0,0 +1,62 @@ +use futures::executor::block_on; +use futures::future::{Future, FutureExt}; +use futures::stream::{self, StreamExt, TryStreamExt}; +use futures::io::{AsyncBufReadExt, Cursor}; +use futures::task::Poll; +use futures_test::io::AsyncReadTestExt; +use futures_test::task::noop_context; + +macro_rules! block_on_next { + ($expr:expr) => { + block_on($expr.next()).unwrap().unwrap() + }; +} + +#[test] +fn lines() { + let buf = Cursor::new(&b"12\r"[..]); + let mut s = buf.lines(); + assert_eq!(block_on_next!(s), "12\r".to_string()); + assert!(block_on(s.next()).is_none()); + + let buf = Cursor::new(&b"12\r\n\n"[..]); + let mut s = buf.lines(); + assert_eq!(block_on_next!(s), "12".to_string()); + assert_eq!(block_on_next!(s), "".to_string()); + assert!(block_on(s.next()).is_none()); +} + +fn run(mut f: F) -> F::Output { + let mut cx = noop_context(); + loop { + if let Poll::Ready(x) = f.poll_unpin(&mut cx) { + return x; + } + } +} + +macro_rules! run_next { + ($expr:expr) => { + run($expr.next()).unwrap().unwrap() + }; +} + +#[test] +fn maybe_pending() { + let buf = stream::iter(vec![&b"12"[..], &b"\r"[..]]) + .map(Ok) + .into_async_read() + .interleave_pending(); + let mut s = buf.lines(); + assert_eq!(run_next!(s), "12\r".to_string()); + assert!(run(s.next()).is_none()); + + let buf = stream::iter(vec![&b"12"[..], &b"\r\n"[..], &b"\n"[..]]) + .map(Ok) + .into_async_read() + .interleave_pending(); + let mut s = buf.lines(); + assert_eq!(run_next!(s), "12".to_string()); + assert_eq!(run_next!(s), "".to_string()); + assert!(run(s.next()).is_none()); +} diff --git a/tests/io_read.rs b/tests/io_read.rs new file mode 100644 index 0000000..f99c4ed --- /dev/null +++ b/tests/io_read.rs @@ -0,0 +1,65 @@ +use futures::io::AsyncRead; +use futures_test::task::panic_context; +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; + +struct MockReader { + fun: Box Poll>>, +} + +impl MockReader { + pub fn new(fun: impl FnMut(&mut [u8]) -> Poll> + 'static) -> Self { + MockReader { fun: Box::new(fun) } + } +} + +impl AsyncRead for MockReader { + fn poll_read( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + buf: &mut [u8] + ) -> Poll> { + (self.get_mut().fun)(buf) + } +} + +/// Verifies that the default implementation of `poll_read_vectored` +/// calls `poll_read` with an empty slice if no buffers are provided. +#[test] +fn read_vectored_no_buffers() { + let mut reader = MockReader::new(|buf| { + assert_eq!(buf, b""); + Err(io::ErrorKind::BrokenPipe.into()).into() + }); + let cx = &mut panic_context(); + let bufs = &mut []; + + let res = Pin::new(&mut reader).poll_read_vectored(cx, bufs); + let res = res.map_err(|e| e.kind()); + assert_eq!(res, Poll::Ready(Err(io::ErrorKind::BrokenPipe))) +} + +/// Verifies that the default implementation of `poll_read_vectored` +/// calls `poll_read` with the first non-empty buffer. +#[test] +fn read_vectored_first_non_empty() { + let mut reader = MockReader::new(|buf| { + assert_eq!(buf.len(), 4); + buf.copy_from_slice(b"four"); + Poll::Ready(Ok(4)) + }); + let cx = &mut panic_context(); + let mut buf = [0; 4]; + let bufs = &mut [ + io::IoSliceMut::new(&mut []), + io::IoSliceMut::new(&mut []), + io::IoSliceMut::new(&mut buf), + ]; + + let res = Pin::new(&mut reader).poll_read_vectored(cx, bufs); + let res = res.map_err(|e| e.kind()); + assert_eq!(res, Poll::Ready(Ok(4))); + assert_eq!(buf, b"four"[..]); +} + diff --git a/tests/io_read_exact.rs b/tests/io_read_exact.rs new file mode 100644 index 0000000..4941773 --- /dev/null +++ b/tests/io_read_exact.rs @@ -0,0 +1,17 @@ +use futures::executor::block_on; +use futures::io::AsyncReadExt; + +#[test] +fn read_exact() { + let mut reader: &[u8] = &[1, 2, 3, 4, 5]; + let mut out = [0u8; 3]; + + let res = block_on(reader.read_exact(&mut out)); // read 3 bytes out + assert!(res.is_ok()); + assert_eq!(out, [1,2,3]); + assert_eq!(reader.len(), 2); + + let res = block_on(reader.read_exact(&mut out)); // read another 3 bytes, but only 2 bytes left + assert!(res.is_err()); + assert_eq!(reader.len(), 0); +} diff --git a/tests/io_read_line.rs b/tests/io_read_line.rs new file mode 100644 index 0000000..d1dba5e --- /dev/null +++ b/tests/io_read_line.rs @@ -0,0 +1,60 @@ +use futures::executor::block_on; +use futures::future::{Future, FutureExt}; +use futures::stream::{self, StreamExt, TryStreamExt}; +use futures::io::{AsyncBufReadExt, Cursor}; +use futures::task::Poll; +use futures_test::io::AsyncReadTestExt; +use futures_test::task::noop_context; + +#[test] +fn read_line() { + let mut buf = Cursor::new(b"12"); + let mut v = String::new(); + assert_eq!(block_on(buf.read_line(&mut v)).unwrap(), 2); + assert_eq!(v, "12"); + + let mut buf = Cursor::new(b"12\n\n"); + let mut v = String::new(); + assert_eq!(block_on(buf.read_line(&mut v)).unwrap(), 3); + assert_eq!(v, "12\n"); + v.clear(); + assert_eq!(block_on(buf.read_line(&mut v)).unwrap(), 1); + assert_eq!(v, "\n"); + v.clear(); + assert_eq!(block_on(buf.read_line(&mut v)).unwrap(), 0); + assert_eq!(v, ""); +} + +fn run(mut f: F) -> F::Output { + let mut cx = noop_context(); + loop { + if let Poll::Ready(x) = f.poll_unpin(&mut cx) { + return x; + } + } +} + +#[test] +fn maybe_pending() { + let mut buf = b"12".interleave_pending(); + let mut v = String::new(); + assert_eq!(run(buf.read_line(&mut v)).unwrap(), 2); + assert_eq!(v, "12"); + + let mut buf = stream::iter(vec![&b"12"[..], &b"\n\n"[..]]) + .map(Ok) + .into_async_read() + .interleave_pending(); + let mut v = String::new(); + assert_eq!(run(buf.read_line(&mut v)).unwrap(), 3); + assert_eq!(v, "12\n"); + v.clear(); + assert_eq!(run(buf.read_line(&mut v)).unwrap(), 1); + assert_eq!(v, "\n"); + v.clear(); + assert_eq!(run(buf.read_line(&mut v)).unwrap(), 0); + assert_eq!(v, ""); + v.clear(); + assert_eq!(run(buf.read_line(&mut v)).unwrap(), 0); + assert_eq!(v, ""); +} diff --git a/tests/io_read_to_string.rs b/tests/io_read_to_string.rs new file mode 100644 index 0000000..db825af --- /dev/null +++ b/tests/io_read_to_string.rs @@ -0,0 +1,45 @@ +use futures::executor::block_on; +use futures::future::{Future, FutureExt}; +use futures::stream::{self, StreamExt, TryStreamExt}; +use futures::io::{AsyncReadExt, Cursor}; +use futures::task::Poll; +use futures_test::io::AsyncReadTestExt; +use futures_test::task::noop_context; + +#[test] +fn read_to_string() { + let mut c = Cursor::new(&b""[..]); + let mut v = String::new(); + assert_eq!(block_on(c.read_to_string(&mut v)).unwrap(), 0); + assert_eq!(v, ""); + + let mut c = Cursor::new(&b"1"[..]); + let mut v = String::new(); + assert_eq!(block_on(c.read_to_string(&mut v)).unwrap(), 1); + assert_eq!(v, "1"); + + let mut c = Cursor::new(&b"\xff"[..]); + let mut v = String::new(); + assert!(block_on(c.read_to_string(&mut v)).is_err()); +} + +fn run(mut f: F) -> F::Output { + let mut cx = noop_context(); + loop { + if let Poll::Ready(x) = f.poll_unpin(&mut cx) { + return x; + } + } +} + +#[test] +fn interleave_pending() { + let mut buf = stream::iter(vec![&b"12"[..], &b"33"[..], &b"3"[..]]) + .map(Ok) + .into_async_read() + .interleave_pending(); + + let mut v = String::new(); + assert_eq!(run(buf.read_to_string(&mut v)).unwrap(), 5); + assert_eq!(v, "12333"); +} diff --git a/tests/io_read_until.rs b/tests/io_read_until.rs new file mode 100644 index 0000000..5152281 --- /dev/null +++ b/tests/io_read_until.rs @@ -0,0 +1,60 @@ +use futures::executor::block_on; +use futures::future::{Future, FutureExt}; +use futures::stream::{self, StreamExt, TryStreamExt}; +use futures::io::{AsyncBufReadExt, Cursor}; +use futures::task::Poll; +use futures_test::io::AsyncReadTestExt; +use futures_test::task::noop_context; + +#[test] +fn read_until() { + let mut buf = Cursor::new(b"12"); + let mut v = Vec::new(); + assert_eq!(block_on(buf.read_until(b'3', &mut v)).unwrap(), 2); + assert_eq!(v, b"12"); + + let mut buf = Cursor::new(b"1233"); + let mut v = Vec::new(); + assert_eq!(block_on(buf.read_until(b'3', &mut v)).unwrap(), 3); + assert_eq!(v, b"123"); + v.truncate(0); + assert_eq!(block_on(buf.read_until(b'3', &mut v)).unwrap(), 1); + assert_eq!(v, b"3"); + v.truncate(0); + assert_eq!(block_on(buf.read_until(b'3', &mut v)).unwrap(), 0); + assert_eq!(v, []); +} + +fn run(mut f: F) -> F::Output { + let mut cx = noop_context(); + loop { + if let Poll::Ready(x) = f.poll_unpin(&mut cx) { + return x; + } + } +} + +#[test] +fn maybe_pending() { + let mut buf = b"12".interleave_pending(); + let mut v = Vec::new(); + assert_eq!(run(buf.read_until(b'3', &mut v)).unwrap(), 2); + assert_eq!(v, b"12"); + + let mut buf = stream::iter(vec![&b"12"[..], &b"33"[..], &b"3"[..]]) + .map(Ok) + .into_async_read() + .interleave_pending(); + let mut v = Vec::new(); + assert_eq!(run(buf.read_until(b'3', &mut v)).unwrap(), 3); + assert_eq!(v, b"123"); + v.clear(); + assert_eq!(run(buf.read_until(b'3', &mut v)).unwrap(), 1); + assert_eq!(v, b"3"); + v.clear(); + assert_eq!(run(buf.read_until(b'3', &mut v)).unwrap(), 1); + assert_eq!(v, b"3"); + v.clear(); + assert_eq!(run(buf.read_until(b'3', &mut v)).unwrap(), 0); + assert_eq!(v, []); +} diff --git a/tests/io_window.rs b/tests/io_window.rs new file mode 100644 index 0000000..98df69c --- /dev/null +++ b/tests/io_window.rs @@ -0,0 +1,26 @@ +use futures::io::Window; + +#[test] +fn set() { + let mut buffer = Window::new(&[1, 2, 3]); + buffer.set(..3); + buffer.set(3..3); + buffer.set(3..=2); // == 3..3 + buffer.set(0..2); + + assert_eq!(buffer.as_ref(), &[1, 2]); +} + +#[test] +#[should_panic] +fn set_panic_out_of_bounds() { + let mut buffer = Window::new(&[1, 2, 3]); + buffer.set(2..4); +} + +#[test] +#[should_panic] +fn set_panic_start_is_greater_than_end() { + let mut buffer = Window::new(&[1, 2, 3]); + buffer.set(3..2); +} diff --git a/tests/io_write.rs b/tests/io_write.rs new file mode 100644 index 0000000..b963444 --- /dev/null +++ b/tests/io_write.rs @@ -0,0 +1,70 @@ +use futures::io::AsyncWrite; +use futures_test::task::panic_context; +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; + +struct MockWriter { + fun: Box Poll>>, +} + +impl MockWriter { + pub fn new(fun: impl FnMut(&[u8]) -> Poll> + 'static) -> Self { + MockWriter { fun: Box::new(fun) } + } +} + +impl AsyncWrite for MockWriter { + fn poll_write( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + (self.get_mut().fun)(buf) + } + + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + panic!() + } + + fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + panic!() + } +} + +/// Verifies that the default implementation of `poll_write_vectored` +/// calls `poll_write` with an empty slice if no buffers are provided. +#[test] +fn write_vectored_no_buffers() { + let mut writer = MockWriter::new(|buf| { + assert_eq!(buf, b""); + Err(io::ErrorKind::BrokenPipe.into()).into() + }); + let cx = &mut panic_context(); + let bufs = &mut []; + + let res = Pin::new(&mut writer).poll_write_vectored(cx, bufs); + let res = res.map_err(|e| e.kind()); + assert_eq!(res, Poll::Ready(Err(io::ErrorKind::BrokenPipe))) +} + +/// Verifies that the default implementation of `poll_write_vectored` +/// calls `poll_write` with the first non-empty buffer. +#[test] +fn write_vectored_first_non_empty() { + let mut writer = MockWriter::new(|buf| { + assert_eq!(buf, b"four"); + Poll::Ready(Ok(4)) + }); + let cx = &mut panic_context(); + let bufs = &mut [ + io::IoSlice::new(&[]), + io::IoSlice::new(&[]), + io::IoSlice::new(b"four") + ]; + + let res = Pin::new(&mut writer).poll_write_vectored(cx, bufs); + let res = res.map_err(|e| e.kind()); + assert_eq!(res, Poll::Ready(Ok(4))); +} + diff --git a/tests/join_all.rs b/tests/join_all.rs new file mode 100644 index 0000000..63967bf --- /dev/null +++ b/tests/join_all.rs @@ -0,0 +1,43 @@ +use futures_util::future::*; +use std::future::Future; +use futures::executor::block_on; +use std::fmt::Debug; + +fn assert_done(actual_fut: F, expected: T) +where + T: PartialEq + Debug, + F: FnOnce() -> Box + Unpin>, +{ + let output = block_on(actual_fut()); + assert_eq!(output, expected); +} + +#[test] +fn collect_collects() { + assert_done(|| Box::new(join_all(vec![ready(1), ready(2)])), vec![1, 2]); + assert_done(|| Box::new(join_all(vec![ready(1)])), vec![1]); + // REVIEW: should this be implemented? + // assert_done(|| Box::new(join_all(Vec::::new())), vec![]); + + // TODO: needs more tests +} + +#[test] +fn join_all_iter_lifetime() { + // In futures-rs version 0.1, this function would fail to typecheck due to an overly + // conservative type parameterization of `JoinAll`. + fn sizes<'a>(bufs: Vec<&'a [u8]>) -> Box> + Unpin> { + let iter = bufs.into_iter().map(|b| ready::(b.len())); + Box::new(join_all(iter)) + } + + assert_done(|| sizes(vec![&[1,2,3], &[], &[0]]), vec![3 as usize, 0, 1]); +} + +#[test] +fn join_all_from_iter() { + assert_done( + || Box::new(vec![ready(1), ready(2)].into_iter().collect::>()), + vec![1, 2], + ) +} diff --git a/tests/macro_comma_support.rs b/tests/macro_comma_support.rs new file mode 100644 index 0000000..111f65a --- /dev/null +++ b/tests/macro_comma_support.rs @@ -0,0 +1,42 @@ +#[macro_use] +extern crate futures; + +use futures::{ + executor::block_on, + future::{self, FutureExt}, + task::Poll, +}; + +#[test] +fn ready() { + block_on(future::poll_fn(|_| { + ready!(Poll::Ready(()),); + Poll::Ready(()) + })) +} + +#[test] +fn poll() { + block_on(async { + let _ = poll!(async {}.boxed(),); + }) +} + +#[test] +fn join() { + block_on(async { + let future1 = async { 1 }; + let future2 = async { 2 }; + join!(future1, future2,); + }) +} + +#[test] +fn try_join() { + block_on(async { + let future1 = async { 1 }.never_error(); + let future2 = async { 2 }.never_error(); + try_join!(future1, future2,) + }) + .unwrap(); +} diff --git a/tests/mutex.rs b/tests/mutex.rs new file mode 100644 index 0000000..bad53a9 --- /dev/null +++ b/tests/mutex.rs @@ -0,0 +1,69 @@ +use futures::channel::mpsc; +use futures::executor::block_on; +use futures::future::{ready, FutureExt}; +use futures::lock::Mutex; +use futures::stream::StreamExt; +use futures::task::{Context, SpawnExt}; +use futures_test::future::FutureTestExt; +use futures_test::task::{new_count_waker, panic_context}; +use std::sync::Arc; + +#[test] +fn mutex_acquire_uncontested() { + let mutex = Mutex::new(()); + for _ in 0..10 { + assert!(mutex.lock().poll_unpin(&mut panic_context()).is_ready()); + } +} + +#[test] +fn mutex_wakes_waiters() { + let mutex = Mutex::new(()); + let (waker, counter) = new_count_waker(); + let lock = mutex.lock().poll_unpin(&mut panic_context()); + assert!(lock.is_ready()); + + let mut cx = Context::from_waker(&waker); + let mut waiter = mutex.lock(); + assert!(waiter.poll_unpin(&mut cx).is_pending()); + assert_eq!(counter, 0); + + drop(lock); + + assert_eq!(counter, 1); + assert!(waiter.poll_unpin(&mut panic_context()).is_ready()); +} + +#[test] +fn mutex_contested() { + let (tx, mut rx) = mpsc::unbounded(); + let pool = futures::executor::ThreadPool::builder() + .pool_size(16) + .create() + .unwrap(); + + let tx = Arc::new(tx); + let mutex = Arc::new(Mutex::new(0)); + + let num_tasks = 1000; + for _ in 0..num_tasks { + let tx = tx.clone(); + let mutex = mutex.clone(); + pool.spawn(async move { + let mut lock = mutex.lock().await; + ready(()).pending_once().await; + *lock += 1; + tx.unbounded_send(()).unwrap(); + drop(lock); + }) + .unwrap(); + } + + block_on(async { + for _ in 0..num_tasks { + rx.next().await.unwrap(); + } + let lock = mutex.lock().await; + assert_eq!(num_tasks, *lock); + }) +} diff --git a/tests/object_safety.rs b/tests/object_safety.rs new file mode 100644 index 0000000..30c892f --- /dev/null +++ b/tests/object_safety.rs @@ -0,0 +1,49 @@ +fn assert_is_object_safe() {} + +#[test] +fn future() { + // `FutureExt`, `TryFutureExt` and `UnsafeFutureObj` are not object safe. + use futures::future::{FusedFuture, Future, TryFuture}; + + assert_is_object_safe::<&dyn Future>(); + assert_is_object_safe::<&dyn FusedFuture>(); + assert_is_object_safe::<&dyn TryFuture>>(); +} + +#[test] +fn stream() { + // `StreamExt` and `TryStreamExt` are not object safe. + use futures::stream::{FusedStream, Stream, TryStream}; + + assert_is_object_safe::<&dyn Stream>(); + assert_is_object_safe::<&dyn FusedStream>(); + assert_is_object_safe::<&dyn TryStream>>(); +} + +#[test] +fn sink() { + // `SinkExt` is not object safe. + use futures::sink::Sink; + + assert_is_object_safe::<&dyn Sink<(), Error = ()>>(); +} + +#[test] +fn io() { + // `AsyncReadExt`, `AsyncWriteExt`, `AsyncSeekExt` and `AsyncBufReadExt` are not object safe. + use futures::io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite}; + + assert_is_object_safe::<&dyn AsyncRead>(); + assert_is_object_safe::<&dyn AsyncWrite>(); + assert_is_object_safe::<&dyn AsyncSeek>(); + assert_is_object_safe::<&dyn AsyncBufRead>(); +} + +#[test] +fn task() { + // `ArcWake`, `SpawnExt` and `LocalSpawnExt` are not object safe. + use futures::task::{LocalSpawn, Spawn}; + + assert_is_object_safe::<&dyn Spawn>(); + assert_is_object_safe::<&dyn LocalSpawn>(); +} diff --git a/tests/oneshot.rs b/tests/oneshot.rs new file mode 100644 index 0000000..58951ec --- /dev/null +++ b/tests/oneshot.rs @@ -0,0 +1,66 @@ +use futures::channel::oneshot; +use futures::future::{FutureExt, TryFutureExt}; +use futures_test::future::FutureTestExt; +use std::sync::mpsc; +use std::thread; + +#[test] +fn oneshot_send1() { + let (tx1, rx1) = oneshot::channel::(); + let (tx2, rx2) = mpsc::channel(); + + let t = thread::spawn(|| tx1.send(1).unwrap()); + rx1.map_ok(move |x| tx2.send(x)).run_in_background(); + assert_eq!(1, rx2.recv().unwrap()); + t.join().unwrap(); +} + +#[test] +fn oneshot_send2() { + let (tx1, rx1) = oneshot::channel::(); + let (tx2, rx2) = mpsc::channel(); + + thread::spawn(|| tx1.send(1).unwrap()).join().unwrap(); + rx1.map_ok(move |x| tx2.send(x).unwrap()).run_in_background(); + assert_eq!(1, rx2.recv().unwrap()); +} + +#[test] +fn oneshot_send3() { + let (tx1, rx1) = oneshot::channel::(); + let (tx2, rx2) = mpsc::channel(); + + rx1.map_ok(move |x| tx2.send(x).unwrap()).run_in_background(); + thread::spawn(|| tx1.send(1).unwrap()).join().unwrap(); + assert_eq!(1, rx2.recv().unwrap()); +} + +#[test] +fn oneshot_drop_tx1() { + let (tx1, rx1) = oneshot::channel::(); + let (tx2, rx2) = mpsc::channel(); + + drop(tx1); + rx1.map(move |result| tx2.send(result).unwrap()).run_in_background(); + + assert_eq!(Err(oneshot::Canceled), rx2.recv().unwrap()); +} + +#[test] +fn oneshot_drop_tx2() { + let (tx1, rx1) = oneshot::channel::(); + let (tx2, rx2) = mpsc::channel(); + + let t = thread::spawn(|| drop(tx1)); + rx1.map(move |result| tx2.send(result).unwrap()).run_in_background(); + t.join().unwrap(); + + assert_eq!(Err(oneshot::Canceled), rx2.recv().unwrap()); +} + +#[test] +fn oneshot_drop_rx() { + let (tx, rx) = oneshot::channel::(); + drop(rx); + assert_eq!(Err(2), tx.send(2)); +} diff --git a/tests/ready_queue.rs b/tests/ready_queue.rs new file mode 100644 index 0000000..15a0bef --- /dev/null +++ b/tests/ready_queue.rs @@ -0,0 +1,151 @@ +use futures::channel::oneshot; +use futures::executor::{block_on, block_on_stream}; +use futures::future; +use futures::stream::{FuturesUnordered, StreamExt}; +use futures::task::Poll; +use futures_test::task::noop_context; +use std::panic::{self, AssertUnwindSafe}; +use std::sync::{Arc, Barrier}; +use std::thread; + +trait AssertSendSync: Send + Sync {} +impl AssertSendSync for FuturesUnordered<()> {} + +#[test] +fn basic_usage() { + block_on(future::lazy(move |cx| { + let mut queue = FuturesUnordered::new(); + let (tx1, rx1) = oneshot::channel(); + let (tx2, rx2) = oneshot::channel(); + let (tx3, rx3) = oneshot::channel(); + + queue.push(rx1); + queue.push(rx2); + queue.push(rx3); + + assert!(!queue.poll_next_unpin(cx).is_ready()); + + tx2.send("hello").unwrap(); + + assert_eq!(Poll::Ready(Some(Ok("hello"))), queue.poll_next_unpin(cx)); + assert!(!queue.poll_next_unpin(cx).is_ready()); + + tx1.send("world").unwrap(); + tx3.send("world2").unwrap(); + + assert_eq!(Poll::Ready(Some(Ok("world"))), queue.poll_next_unpin(cx)); + assert_eq!(Poll::Ready(Some(Ok("world2"))), queue.poll_next_unpin(cx)); + assert_eq!(Poll::Ready(None), queue.poll_next_unpin(cx)); + })); +} + +#[test] +fn resolving_errors() { + block_on(future::lazy(move |cx| { + let mut queue = FuturesUnordered::new(); + let (tx1, rx1) = oneshot::channel(); + let (tx2, rx2) = oneshot::channel(); + let (tx3, rx3) = oneshot::channel(); + + queue.push(rx1); + queue.push(rx2); + queue.push(rx3); + + assert!(!queue.poll_next_unpin(cx).is_ready()); + + drop(tx2); + + assert_eq!(Poll::Ready(Some(Err(oneshot::Canceled))), queue.poll_next_unpin(cx)); + assert!(!queue.poll_next_unpin(cx).is_ready()); + + drop(tx1); + tx3.send("world2").unwrap(); + + assert_eq!(Poll::Ready(Some(Err(oneshot::Canceled))), queue.poll_next_unpin(cx)); + assert_eq!(Poll::Ready(Some(Ok("world2"))), queue.poll_next_unpin(cx)); + assert_eq!(Poll::Ready(None), queue.poll_next_unpin(cx)); + })); +} + +#[test] +fn dropping_ready_queue() { + block_on(future::lazy(move |_| { + let queue = FuturesUnordered::new(); + let (mut tx1, rx1) = oneshot::channel::<()>(); + let (mut tx2, rx2) = oneshot::channel::<()>(); + let (mut tx3, rx3) = oneshot::channel::<()>(); + + queue.push(rx1); + queue.push(rx2); + queue.push(rx3); + + { + let cx = &mut noop_context(); + assert!(!tx1.poll_canceled(cx).is_ready()); + assert!(!tx2.poll_canceled(cx).is_ready()); + assert!(!tx3.poll_canceled(cx).is_ready()); + + drop(queue); + + assert!(tx1.poll_canceled(cx).is_ready()); + assert!(tx2.poll_canceled(cx).is_ready()); + assert!(tx3.poll_canceled(cx).is_ready()); + } + })); +} + +#[test] +fn stress() { + const ITER: usize = 300; + + for i in 0..ITER { + let n = (i % 10) + 1; + + let mut queue = FuturesUnordered::new(); + + for _ in 0..5 { + let barrier = Arc::new(Barrier::new(n + 1)); + + for num in 0..n { + let barrier = barrier.clone(); + let (tx, rx) = oneshot::channel(); + + queue.push(rx); + + thread::spawn(move || { + barrier.wait(); + tx.send(num).unwrap(); + }); + } + + barrier.wait(); + + let mut sync = block_on_stream(queue); + + let mut rx: Vec<_> = (&mut sync).take(n).map(|res| res.unwrap()).collect(); + + assert_eq!(rx.len(), n); + + rx.sort(); + + for (i, x) in rx.into_iter().enumerate() { + assert_eq!(i, x); + } + + queue = sync.into_inner(); + } + } +} + +#[test] +fn panicking_future_dropped() { + block_on(future::lazy(move |cx| { + let mut queue = FuturesUnordered::new(); + queue.push(future::poll_fn(|_| -> Poll> { panic!() })); + + let r = panic::catch_unwind(AssertUnwindSafe(|| queue.poll_next_unpin(cx))); + assert!(r.is_err()); + assert!(queue.is_empty()); + assert_eq!(Poll::Ready(None), queue.poll_next_unpin(cx)); + })); +} diff --git a/tests/recurse.rs b/tests/recurse.rs new file mode 100644 index 0000000..2920a41 --- /dev/null +++ b/tests/recurse.rs @@ -0,0 +1,22 @@ +use futures::executor::block_on; +use futures::future::{self, FutureExt, BoxFuture}; +use std::sync::mpsc; +use std::thread; + +#[test] +fn lots() { + fn do_it(input: (i32, i32)) -> BoxFuture<'static, i32> { + let (n, x) = input; + if n == 0 { + future::ready(x).boxed() + } else { + future::ready((n - 1, x + n)).then(do_it).boxed() + } + } + + let (tx, rx) = mpsc::channel(); + thread::spawn(|| { + block_on(do_it((1_000, 0)).map(move |x| tx.send(x).unwrap())) + }); + assert_eq!(500_500, rx.recv().unwrap()); +} diff --git a/tests/select_all.rs b/tests/select_all.rs new file mode 100644 index 0000000..aad977d --- /dev/null +++ b/tests/select_all.rs @@ -0,0 +1,29 @@ +use futures::executor::block_on; +use futures::future::{ready, select_all}; +use std::collections::HashSet; + +#[test] +fn smoke() { + let v = vec![ + ready(1), + ready(2), + ready(3), + ]; + + let mut c = vec![1, 2, 3].into_iter().collect::>(); + + let (i, idx, v) = block_on(select_all(v)); + assert!(c.remove(&i)); + assert_eq!(idx, 0); + + let (i, idx, v) = block_on(select_all(v)); + assert!(c.remove(&i)); + assert_eq!(idx, 0); + + let (i, idx, v) = block_on(select_all(v)); + assert!(c.remove(&i)); + assert_eq!(idx, 0); + + assert!(c.is_empty()); + assert!(v.is_empty()); +} diff --git a/tests/select_ok.rs b/tests/select_ok.rs new file mode 100644 index 0000000..db88a95 --- /dev/null +++ b/tests/select_ok.rs @@ -0,0 +1,39 @@ +use futures::executor::block_on; +use futures::future::{err, ok, select_ok}; + +#[test] +fn ignore_err() { + let v = vec![ + err(1), + err(2), + ok(3), + ok(4), + ]; + + let (i, v) = block_on(select_ok(v)).ok().unwrap(); + assert_eq!(i, 3); + + assert_eq!(v.len(), 1); + + let (i, v) = block_on(select_ok(v)).ok().unwrap(); + assert_eq!(i, 4); + + assert!(v.is_empty()); +} + +#[test] +fn last_err() { + let v = vec![ + ok(1), + err(2), + err(3), + ]; + + let (i, v) = block_on(select_ok(v)).ok().unwrap(); + assert_eq!(i, 1); + + assert_eq!(v.len(), 2); + + let i = block_on(select_ok(v)).err().unwrap(); + assert_eq!(i, 3); +} diff --git a/tests/shared.rs b/tests/shared.rs new file mode 100644 index 0000000..8402bfe --- /dev/null +++ b/tests/shared.rs @@ -0,0 +1,151 @@ +use futures::channel::oneshot; +use futures::executor::{block_on, LocalPool}; +use futures::future::{self, FutureExt, TryFutureExt, LocalFutureObj}; +use futures::task::LocalSpawn; +use std::cell::{Cell, RefCell}; +use std::rc::Rc; +use std::thread; + +fn send_shared_oneshot_and_wait_on_multiple_threads(threads_number: u32) { + let (tx, rx) = oneshot::channel::(); + let f = rx.shared(); + let join_handles = (0..threads_number) + .map(|_| { + let cloned_future = f.clone(); + thread::spawn(move || { + assert_eq!(block_on(cloned_future).unwrap(), 6); + }) + }) + .collect::>(); + + tx.send(6).unwrap(); + + assert_eq!(block_on(f).unwrap(), 6); + for join_handle in join_handles { + join_handle.join().unwrap(); + } +} + +#[test] +fn one_thread() { + send_shared_oneshot_and_wait_on_multiple_threads(1); +} + +#[test] +fn two_threads() { + send_shared_oneshot_and_wait_on_multiple_threads(2); +} + +#[test] +fn many_threads() { + send_shared_oneshot_and_wait_on_multiple_threads(1000); +} + +#[test] +fn drop_on_one_task_ok() { + let (tx, rx) = oneshot::channel::(); + let f1 = rx.shared(); + let f2 = f1.clone(); + + let (tx2, rx2) = oneshot::channel::(); + + let t1 = thread::spawn(|| { + let f = future::try_select(f1.map_err(|_| ()), rx2.map_err(|_| ())); + drop(block_on(f)); + }); + + let (tx3, rx3) = oneshot::channel::(); + + let t2 = thread::spawn(|| { + let _ = block_on(f2.map_ok(|x| tx3.send(x).unwrap()).map_err(|_| ())); + }); + + tx2.send(11).unwrap(); // cancel `f1` + t1.join().unwrap(); + + tx.send(42).unwrap(); // Should cause `f2` and then `rx3` to get resolved. + let result = block_on(rx3).unwrap(); + assert_eq!(result, 42); + t2.join().unwrap(); +} + +#[test] +fn drop_in_poll() { + let slot1 = Rc::new(RefCell::new(None)); + let slot2 = slot1.clone(); + + let future1 = future::lazy(move |_| { + slot2.replace(None); // Drop future + 1 + }).shared(); + + let future2 = LocalFutureObj::new(Box::new(future1.clone())); + slot1.replace(Some(future2)); + + assert_eq!(block_on(future1), 1); +} + +#[test] +fn peek() { + let mut local_pool = LocalPool::new(); + let spawn = &mut local_pool.spawner(); + + let (tx0, rx0) = oneshot::channel::(); + let f1 = rx0.shared(); + let f2 = f1.clone(); + + // Repeated calls on the original or clone do not change the outcome. + for _ in 0..2 { + assert!(f1.peek().is_none()); + assert!(f2.peek().is_none()); + } + + // Completing the underlying future has no effect, because the value has not been `poll`ed in. + tx0.send(42).unwrap(); + for _ in 0..2 { + assert!(f1.peek().is_none()); + assert!(f2.peek().is_none()); + } + + // Once the Shared has been polled, the value is peekable on the clone. + spawn.spawn_local_obj(LocalFutureObj::new(Box::new(f1.map(|_| ())))).unwrap(); + local_pool.run(); + for _ in 0..2 { + assert_eq!(*f2.peek().unwrap(), Ok(42)); + } +} + +struct CountClone(Rc>); + +impl Clone for CountClone { + fn clone(&self) -> Self { + self.0.set(self.0.get() + 1); + CountClone(self.0.clone()) + } +} + +#[test] +fn dont_clone_in_single_owner_shared_future() { + let counter = CountClone(Rc::new(Cell::new(0))); + let (tx, rx) = oneshot::channel(); + + let rx = rx.shared(); + + tx.send(counter).ok().unwrap(); + + assert_eq!(block_on(rx).unwrap().0.get(), 0); +} + +#[test] +fn dont_do_unnecessary_clones_on_output() { + let counter = CountClone(Rc::new(Cell::new(0))); + let (tx, rx) = oneshot::channel(); + + let rx = rx.shared(); + + tx.send(counter).ok().unwrap(); + + assert_eq!(block_on(rx.clone()).unwrap().0.get(), 1); + assert_eq!(block_on(rx.clone()).unwrap().0.get(), 2); + assert_eq!(block_on(rx).unwrap().0.get(), 2); +} diff --git a/tests/sink.rs b/tests/sink.rs new file mode 100644 index 0000000..f967e1b --- /dev/null +++ b/tests/sink.rs @@ -0,0 +1,516 @@ +use futures::channel::{mpsc, oneshot}; +use futures::executor::block_on; +use futures::future::{self, Future, FutureExt, TryFutureExt}; +use futures::never::Never; +use futures::ready; +use futures::sink::{Sink, SinkErrInto, SinkExt}; +use futures::stream::{self, Stream, StreamExt}; +use futures::task::{self, ArcWake, Context, Poll, Waker}; +use futures_test::task::panic_context; +use std::cell::{Cell, RefCell}; +use std::collections::VecDeque; +use std::fmt; +use std::mem; +use std::pin::Pin; +use std::rc::Rc; +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering}; + +fn sassert_next(s: &mut S, item: S::Item) +where + S: Stream + Unpin, + S::Item: Eq + fmt::Debug, +{ + match s.poll_next_unpin(&mut panic_context()) { + Poll::Ready(None) => panic!("stream is at its end"), + Poll::Ready(Some(e)) => assert_eq!(e, item), + Poll::Pending => panic!("stream wasn't ready"), + } +} + +fn unwrap(x: Poll>) -> T { + match x { + Poll::Ready(Ok(x)) => x, + Poll::Ready(Err(_)) => panic!("Poll::Ready(Err(_))"), + Poll::Pending => panic!("Poll::Pending"), + } +} + +#[test] +fn either_sink() { + let mut s = if true { + Vec::::new().left_sink() + } else { + VecDeque::::new().right_sink() + }; + + Pin::new(&mut s).start_send(0).unwrap(); +} + +#[test] +fn vec_sink() { + let mut v = Vec::new(); + Pin::new(&mut v).start_send(0).unwrap(); + Pin::new(&mut v).start_send(1).unwrap(); + assert_eq!(v, vec![0, 1]); + block_on(v.flush()).unwrap(); + assert_eq!(v, vec![0, 1]); +} + +#[test] +fn vecdeque_sink() { + let mut deque = VecDeque::new(); + Pin::new(&mut deque).start_send(2).unwrap(); + Pin::new(&mut deque).start_send(3).unwrap(); + + assert_eq!(deque.pop_front(), Some(2)); + assert_eq!(deque.pop_front(), Some(3)); + assert_eq!(deque.pop_front(), None); +} + +#[test] +fn send() { + let mut v = Vec::new(); + + block_on(v.send(0)).unwrap(); + assert_eq!(v, vec![0]); + + block_on(v.send(1)).unwrap(); + assert_eq!(v, vec![0, 1]); + + block_on(v.send(2)).unwrap(); + assert_eq!(v, vec![0, 1, 2]); +} + +#[test] +fn send_all() { + let mut v = Vec::new(); + + block_on(v.send_all(&mut stream::iter(vec![0, 1]).map(Ok))).unwrap(); + assert_eq!(v, vec![0, 1]); + + block_on(v.send_all(&mut stream::iter(vec![2, 3]).map(Ok))).unwrap(); + assert_eq!(v, vec![0, 1, 2, 3]); + + block_on(v.send_all(&mut stream::iter(vec![4, 5]).map(Ok))).unwrap(); + assert_eq!(v, vec![0, 1, 2, 3, 4, 5]); +} + +// An Unpark struct that records unpark events for inspection +struct Flag(AtomicBool); + +impl Flag { + fn new() -> Arc { + Arc::new(Self(AtomicBool::new(false))) + } + + fn take(&self) -> bool { + self.0.swap(false, Ordering::SeqCst) + } + + fn set(&self, v: bool) { + self.0.store(v, Ordering::SeqCst) + } +} + +impl ArcWake for Flag { + fn wake_by_ref(arc_self: &Arc) { + arc_self.set(true) + } +} + +fn flag_cx(f: F) -> R +where + F: FnOnce(Arc, &mut Context<'_>) -> R, +{ + let flag = Flag::new(); + let waker = task::waker_ref(&flag); + let cx = &mut Context::from_waker(&waker); + f(flag.clone(), cx) +} + +// Sends a value on an i32 channel sink +struct StartSendFut + Unpin, Item: Unpin>(Option, Option); + +impl + Unpin, Item: Unpin> StartSendFut { + fn new(sink: S, item: Item) -> Self { + Self(Some(sink), Some(item)) + } +} + +impl + Unpin, Item: Unpin> Future for StartSendFut { + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let Self(inner, item) = self.get_mut(); + { + let mut inner = inner.as_mut().unwrap(); + ready!(Pin::new(&mut inner).poll_ready(cx))?; + Pin::new(&mut inner).start_send(item.take().unwrap())?; + } + Poll::Ready(Ok(inner.take().unwrap())) + } +} + +// Test that `start_send` on an `mpsc` channel does indeed block when the +// channel is full +#[test] +fn mpsc_blocking_start_send() { + let (mut tx, mut rx) = mpsc::channel::(0); + + block_on(future::lazy(|_| { + tx.start_send(0).unwrap(); + + flag_cx(|flag, cx| { + let mut task = StartSendFut::new(tx, 1); + + assert!(task.poll_unpin(cx).is_pending()); + assert!(!flag.take()); + sassert_next(&mut rx, 0); + assert!(flag.take()); + unwrap(task.poll_unpin(cx)); + assert!(!flag.take()); + sassert_next(&mut rx, 1); + }) + })); +} + +// test `flush` by using `with` to make the first insertion into a sink block +// until a oneshot is completed +#[test] +fn with_flush() { + let (tx, rx) = oneshot::channel(); + let mut block = rx.boxed(); + let mut sink = Vec::new().with(|elem| { + mem::replace(&mut block, future::ok(()).boxed()) + .map_ok(move |()| elem + 1) + .map_err(|_| -> Never { panic!() }) + }); + + assert_eq!(Pin::new(&mut sink).start_send(0).ok(), Some(())); + + flag_cx(|flag, cx| { + let mut task = sink.flush(); + assert!(task.poll_unpin(cx).is_pending()); + tx.send(()).unwrap(); + assert!(flag.take()); + + unwrap(task.poll_unpin(cx)); + + block_on(sink.send(1)).unwrap(); + assert_eq!(sink.get_ref(), &[1, 2]); + }) +} + +// test simple use of with to change data +#[test] +fn with_as_map() { + let mut sink = Vec::new().with(|item| future::ok::(item * 2)); + block_on(sink.send(0)).unwrap(); + block_on(sink.send(1)).unwrap(); + block_on(sink.send(2)).unwrap(); + assert_eq!(sink.get_ref(), &[0, 2, 4]); +} + +// test simple use of with_flat_map +#[test] +fn with_flat_map() { + let mut sink = Vec::new().with_flat_map(|item| stream::iter(vec![item; item]).map(Ok)); + block_on(sink.send(0)).unwrap(); + block_on(sink.send(1)).unwrap(); + block_on(sink.send(2)).unwrap(); + block_on(sink.send(3)).unwrap(); + assert_eq!(sink.get_ref(), &[1, 2, 2, 3, 3, 3]); +} + +// Check that `with` propagates `poll_ready` to the inner sink. +// Regression test for the issue #1834. +#[test] +fn with_propagates_poll_ready() { + let (tx, mut rx) = mpsc::channel::(0); + let mut tx = tx.with(|item: i32| future::ok::(item + 10)); + + block_on(future::lazy(|_| { + flag_cx(|flag, cx| { + let mut tx = Pin::new(&mut tx); + + // Should be ready for the first item. + assert_eq!(tx.as_mut().poll_ready(cx), Poll::Ready(Ok(()))); + assert_eq!(tx.as_mut().start_send(0), Ok(())); + + // Should be ready for the second item only after the first one is received. + assert_eq!(tx.as_mut().poll_ready(cx), Poll::Pending); + assert!(!flag.take()); + sassert_next(&mut rx, 10); + assert!(flag.take()); + assert_eq!(tx.as_mut().poll_ready(cx), Poll::Ready(Ok(()))); + assert_eq!(tx.as_mut().start_send(1), Ok(())); + }) + })); +} + +// Immediately accepts all requests to start pushing, but completion is managed +// by manually flushing +struct ManualFlush { + data: Vec, + waiting_tasks: Vec, +} + +impl Sink> for ManualFlush { + type Error = (); + + fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn start_send(mut self: Pin<&mut Self>, item: Option) -> Result<(), Self::Error> { + if let Some(item) = item { + self.data.push(item); + } else { + self.force_flush(); + } + Ok(()) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if self.data.is_empty() { + Poll::Ready(Ok(())) + } else { + self.waiting_tasks.push(cx.waker().clone()); + Poll::Pending + } + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.poll_flush(cx) + } +} + +impl ManualFlush { + fn new() -> Self { + Self { + data: Vec::new(), + waiting_tasks: Vec::new(), + } + } + + fn force_flush(&mut self) -> Vec { + for task in self.waiting_tasks.drain(..) { + task.wake() + } + mem::replace(&mut self.data, Vec::new()) + } +} + +// test that the `with` sink doesn't require the underlying sink to flush, +// but doesn't claim to be flushed until the underlying sink is +#[test] +fn with_flush_propagate() { + let mut sink = ManualFlush::new().with(future::ok::, ()>); + flag_cx(|flag, cx| { + unwrap(Pin::new(&mut sink).poll_ready(cx)); + Pin::new(&mut sink).start_send(Some(0)).unwrap(); + unwrap(Pin::new(&mut sink).poll_ready(cx)); + Pin::new(&mut sink).start_send(Some(1)).unwrap(); + + { + let mut task = sink.flush(); + assert!(task.poll_unpin(cx).is_pending()); + assert!(!flag.take()); + } + assert_eq!(sink.get_mut().force_flush(), vec![0, 1]); + assert!(flag.take()); + unwrap(sink.flush().poll_unpin(cx)); + }) +} + +// test that a buffer is a no-nop around a sink that always accepts sends +#[test] +fn buffer_noop() { + let mut sink = Vec::new().buffer(0); + block_on(sink.send(0)).unwrap(); + block_on(sink.send(1)).unwrap(); + assert_eq!(sink.get_ref(), &[0, 1]); + + let mut sink = Vec::new().buffer(1); + block_on(sink.send(0)).unwrap(); + block_on(sink.send(1)).unwrap(); + assert_eq!(sink.get_ref(), &[0, 1]); +} + +struct ManualAllow { + data: Vec, + allow: Rc, +} + +struct Allow { + flag: Cell, + tasks: RefCell>, +} + +impl Allow { + fn new() -> Self { + Self { + flag: Cell::new(false), + tasks: RefCell::new(Vec::new()), + } + } + + fn check(&self, cx: &mut Context<'_>) -> bool { + if self.flag.get() { + true + } else { + self.tasks.borrow_mut().push(cx.waker().clone()); + false + } + } + + fn start(&self) { + self.flag.set(true); + let mut tasks = self.tasks.borrow_mut(); + for task in tasks.drain(..) { + task.wake(); + } + } +} + +impl Sink for ManualAllow { + type Error = (); + + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if self.allow.check(cx) { + Poll::Ready(Ok(())) + } else { + Poll::Pending + } + } + + fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> { + self.data.push(item); + Ok(()) + } + + fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } +} + +fn manual_allow() -> (ManualAllow, Rc) { + let allow = Rc::new(Allow::new()); + let manual_allow = ManualAllow { + data: Vec::new(), + allow: allow.clone(), + }; + (manual_allow, allow) +} + +// test basic buffer functionality, including both filling up to capacity, +// and writing out when the underlying sink is ready +#[test] +fn buffer() { + let (sink, allow) = manual_allow::(); + let sink = sink.buffer(2); + + let sink = block_on(StartSendFut::new(sink, 0)).unwrap(); + let mut sink = block_on(StartSendFut::new(sink, 1)).unwrap(); + + flag_cx(|flag, cx| { + let mut task = sink.send(2); + assert!(task.poll_unpin(cx).is_pending()); + assert!(!flag.take()); + allow.start(); + assert!(flag.take()); + unwrap(task.poll_unpin(cx)); + assert_eq!(sink.get_ref().data, vec![0, 1, 2]); + }) +} + +#[test] +fn fanout_smoke() { + let sink1 = Vec::new(); + let sink2 = Vec::new(); + let mut sink = sink1.fanout(sink2); + block_on(sink.send_all(&mut stream::iter(vec![1, 2, 3]).map(Ok))).unwrap(); + let (sink1, sink2) = sink.into_inner(); + assert_eq!(sink1, vec![1, 2, 3]); + assert_eq!(sink2, vec![1, 2, 3]); +} + +#[test] +fn fanout_backpressure() { + let (left_send, mut left_recv) = mpsc::channel(0); + let (right_send, mut right_recv) = mpsc::channel(0); + let sink = left_send.fanout(right_send); + + let mut sink = block_on(StartSendFut::new(sink, 0)).unwrap(); + + flag_cx(|flag, cx| { + let mut task = sink.send(2); + assert!(!flag.take()); + assert!(task.poll_unpin(cx).is_pending()); + assert_eq!(block_on(left_recv.next()), Some(0)); + assert!(flag.take()); + assert!(task.poll_unpin(cx).is_pending()); + assert_eq!(block_on(right_recv.next()), Some(0)); + assert!(flag.take()); + + assert!(task.poll_unpin(cx).is_pending()); + assert_eq!(block_on(left_recv.next()), Some(2)); + assert!(flag.take()); + assert!(task.poll_unpin(cx).is_pending()); + assert_eq!(block_on(right_recv.next()), Some(2)); + assert!(flag.take()); + + unwrap(task.poll_unpin(cx)); + // make sure receivers live until end of test to prevent send errors + drop(left_recv); + drop(right_recv); + }) +} + +#[test] +fn sink_map_err() { + { + let cx = &mut panic_context(); + let (tx, _rx) = mpsc::channel(1); + let mut tx = tx.sink_map_err(|_| ()); + assert_eq!(Pin::new(&mut tx).start_send(()), Ok(())); + assert_eq!(Pin::new(&mut tx).poll_flush(cx), Poll::Ready(Ok(()))); + } + + let tx = mpsc::channel(0).0; + assert_eq!( + Pin::new(&mut tx.sink_map_err(|_| ())).start_send(()), + Err(()) + ); +} + +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +struct ErrIntoTest; + +impl From for ErrIntoTest { + fn from(_: mpsc::SendError) -> Self { + Self + } +} + +#[test] +fn err_into() { + { + let cx = &mut panic_context(); + let (tx, _rx) = mpsc::channel(1); + let mut tx: SinkErrInto, _, ErrIntoTest> = tx.sink_err_into(); + assert_eq!(Pin::new(&mut tx).start_send(()), Ok(())); + assert_eq!(Pin::new(&mut tx).poll_flush(cx), Poll::Ready(Ok(()))); + } + + let tx = mpsc::channel(0).0; + assert_eq!( + Pin::new(&mut tx.sink_err_into()).start_send(()), + Err(ErrIntoTest) + ); +} diff --git a/tests/sink_fanout.rs b/tests/sink_fanout.rs new file mode 100644 index 0000000..e57b2d8 --- /dev/null +++ b/tests/sink_fanout.rs @@ -0,0 +1,24 @@ +use futures::channel::mpsc; +use futures::executor::block_on; +use futures::future::join3; +use futures::sink::SinkExt; +use futures::stream::{self, StreamExt}; + +#[test] +fn it_works() { + let (tx1, rx1) = mpsc::channel(1); + let (tx2, rx2) = mpsc::channel(2); + let tx = tx1.fanout(tx2).sink_map_err(|_| ()); + + let src = stream::iter((0..10).map(Ok)); + let fwd = src.forward(tx); + + let collect_fut1 = rx1.collect::>(); + let collect_fut2 = rx2.collect::>(); + let (_, vec1, vec2) = block_on(join3(fwd, collect_fut1, collect_fut2)); + + let expected = (0..10).collect::>(); + + assert_eq!(vec1, expected); + assert_eq!(vec2, expected); +} diff --git a/tests/split.rs b/tests/split.rs new file mode 100644 index 0000000..9f4f1a0 --- /dev/null +++ b/tests/split.rs @@ -0,0 +1,77 @@ +use futures::executor::block_on; +use futures::sink::{Sink, SinkExt}; +use futures::stream::{self, Stream, StreamExt}; +use futures::task::{Context, Poll}; +use pin_utils::unsafe_pinned; +use std::pin::Pin; + +struct Join { + stream: T, + sink: U +} + +impl Join { + unsafe_pinned!(stream: T); + unsafe_pinned!(sink: U); +} + +impl Stream for Join { + type Item = T::Item; + + fn poll_next( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + self.stream().poll_next(cx) + } +} + +impl, Item> Sink for Join { + type Error = U::Error; + + fn poll_ready( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + self.sink().poll_ready(cx) + } + + fn start_send( + self: Pin<&mut Self>, + item: Item, + ) -> Result<(), Self::Error> { + self.sink().start_send(item) + } + + fn poll_flush( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + self.sink().poll_flush(cx) + } + + fn poll_close( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + self.sink().poll_close(cx) + } +} + +#[test] +fn test_split() { + let mut dest: Vec = Vec::new(); + { + let join = Join { + stream: stream::iter(vec![10, 20, 30]), + sink: &mut dest + }; + + let (sink, stream) = join.split(); + let join = sink.reunite(stream).expect("test_split: reunite error"); + let (mut sink, stream) = join.split(); + let mut stream = stream.map(Ok); + block_on(sink.send_all(&mut stream)).unwrap(); + } + assert_eq!(dest, vec![10, 20, 30]); +} diff --git a/tests/stream.rs b/tests/stream.rs new file mode 100644 index 0000000..fd6a8b6 --- /dev/null +++ b/tests/stream.rs @@ -0,0 +1,32 @@ +use futures::executor::block_on; +use futures::stream::{self, StreamExt}; + +#[test] +fn select() { + fn select_and_compare(a: Vec, b: Vec, expected: Vec) { + let a = stream::iter(a); + let b = stream::iter(b); + let vec = block_on(stream::select(a, b).collect::>()); + assert_eq!(vec, expected); + } + + select_and_compare(vec![1, 2, 3], vec![4, 5, 6], vec![1, 4, 2, 5, 3, 6]); + select_and_compare(vec![1, 2, 3], vec![4, 5], vec![1, 4, 2, 5, 3]); + select_and_compare(vec![1, 2], vec![4, 5, 6], vec![1, 4, 2, 5, 6]); +} + +#[test] +fn scan() { + futures::executor::block_on(async { + assert_eq!( + stream::iter(vec![1u8, 2, 3, 4, 6, 8, 2]) + .scan(1, |acc, e| { + *acc += 1; + futures::future::ready(if e < *acc { Some(e) } else { None }) + }) + .collect::>() + .await, + vec![1u8, 2, 3, 4] + ); + }); +} diff --git a/tests/stream_catch_unwind.rs b/tests/stream_catch_unwind.rs new file mode 100644 index 0000000..8b23a0a --- /dev/null +++ b/tests/stream_catch_unwind.rs @@ -0,0 +1,27 @@ +use futures::executor::block_on_stream; +use futures::stream::{self, StreamExt}; + +#[test] +fn panic_in_the_middle_of_the_stream() { + let stream = stream::iter(vec![Some(10), None, Some(11)]); + + // panic on second element + let stream_panicking = stream.map(|o| o.unwrap()); + let mut iter = block_on_stream(stream_panicking.catch_unwind()); + + assert_eq!(10, iter.next().unwrap().ok().unwrap()); + assert!(iter.next().unwrap().is_err()); + assert!(iter.next().is_none()); +} + +#[test] +fn no_panic() { + let stream = stream::iter(vec![10, 11, 12]); + + let mut iter = block_on_stream(stream.catch_unwind()); + + assert_eq!(10, iter.next().unwrap().ok().unwrap()); + assert_eq!(11, iter.next().unwrap().ok().unwrap()); + assert_eq!(12, iter.next().unwrap().ok().unwrap()); + assert!(iter.next().is_none()); +} diff --git a/tests/stream_into_async_read.rs b/tests/stream_into_async_read.rs new file mode 100644 index 0000000..c528af0 --- /dev/null +++ b/tests/stream_into_async_read.rs @@ -0,0 +1,96 @@ +use core::pin::Pin; +use futures::io::{AsyncRead, AsyncBufRead}; +use futures::stream::{self, TryStreamExt}; +use futures::task::Poll; +use futures_test::{task::noop_context, stream::StreamTestExt}; + +macro_rules! assert_read { + ($reader:expr, $buf:expr, $item:expr) => { + let mut cx = noop_context(); + loop { + match Pin::new(&mut $reader).poll_read(&mut cx, $buf) { + Poll::Ready(Ok(x)) => { + assert_eq!(x, $item); + break; + } + Poll::Ready(Err(err)) => { + panic!("assertion failed: expected value but got {}", err); + } + Poll::Pending => { + continue; + } + } + } + }; +} + +macro_rules! assert_fill_buf { + ($reader:expr, $buf:expr) => { + let mut cx = noop_context(); + loop { + match Pin::new(&mut $reader).poll_fill_buf(&mut cx) { + Poll::Ready(Ok(x)) => { + assert_eq!(x, $buf); + break; + } + Poll::Ready(Err(err)) => { + panic!("assertion failed: expected value but got {}", err); + } + Poll::Pending => { + continue; + } + } + } + }; +} + +#[test] +fn test_into_async_read() { + let stream = stream::iter((1..=3).flat_map(|_| vec![Ok(vec![]), Ok(vec![1, 2, 3, 4, 5])])); + let mut reader = stream.interleave_pending().into_async_read(); + let mut buf = vec![0; 3]; + + assert_read!(reader, &mut buf, 3); + assert_eq!(&buf, &[1, 2, 3]); + + assert_read!(reader, &mut buf, 2); + assert_eq!(&buf[..2], &[4, 5]); + + assert_read!(reader, &mut buf, 3); + assert_eq!(&buf, &[1, 2, 3]); + + assert_read!(reader, &mut buf, 2); + assert_eq!(&buf[..2], &[4, 5]); + + assert_read!(reader, &mut buf, 3); + assert_eq!(&buf, &[1, 2, 3]); + + assert_read!(reader, &mut buf, 2); + assert_eq!(&buf[..2], &[4, 5]); + + assert_read!(reader, &mut buf, 0); +} + +#[test] +fn test_into_async_bufread() -> std::io::Result<()> { + let stream = stream::iter((1..=2).flat_map(|_| vec![Ok(vec![]), Ok(vec![1, 2, 3, 4, 5])])); + let mut reader = stream.interleave_pending().into_async_read(); + + let mut reader = Pin::new(&mut reader); + + assert_fill_buf!(reader, &[1, 2, 3, 4, 5][..]); + reader.as_mut().consume(3); + + assert_fill_buf!(reader, &[4, 5][..]); + reader.as_mut().consume(2); + + assert_fill_buf!(reader, &[1, 2, 3, 4, 5][..]); + reader.as_mut().consume(2); + + assert_fill_buf!(reader, &[3, 4, 5][..]); + reader.as_mut().consume(3); + + assert_fill_buf!(reader, &[][..]); + + Ok(()) +} diff --git a/tests/stream_peekable.rs b/tests/stream_peekable.rs new file mode 100644 index 0000000..b65a057 --- /dev/null +++ b/tests/stream_peekable.rs @@ -0,0 +1,13 @@ +use futures::executor::block_on; +use futures::pin_mut; +use futures::stream::{self, Peekable, StreamExt}; + +#[test] +fn peekable() { + block_on(async { + let peekable: Peekable<_> = stream::iter(vec![1u8, 2, 3]).peekable(); + pin_mut!(peekable); + assert_eq!(peekable.as_mut().peek().await, Some(&1u8)); + assert_eq!(peekable.collect::>().await, vec![1, 2, 3]); + }); +} diff --git a/tests/stream_select_all.rs b/tests/stream_select_all.rs new file mode 100644 index 0000000..eb711dd --- /dev/null +++ b/tests/stream_select_all.rs @@ -0,0 +1,78 @@ +use futures::channel::mpsc; +use futures::executor::block_on_stream; +use futures::future::{self, FutureExt}; +use futures::stream::{self, select_all, FusedStream, SelectAll, StreamExt}; +use futures::task::Poll; +use futures_test::task::noop_context; + +#[test] +fn is_terminated() { + let mut cx = noop_context(); + let mut tasks = SelectAll::new(); + + assert_eq!(tasks.is_terminated(), false); + assert_eq!(tasks.poll_next_unpin(&mut cx), Poll::Ready(None)); + assert_eq!(tasks.is_terminated(), true); + + // Test that the sentinel value doesn't leak + assert_eq!(tasks.is_empty(), true); + assert_eq!(tasks.len(), 0); + + tasks.push(future::ready(1).into_stream()); + + assert_eq!(tasks.is_empty(), false); + assert_eq!(tasks.len(), 1); + + assert_eq!(tasks.is_terminated(), false); + assert_eq!(tasks.poll_next_unpin(&mut cx), Poll::Ready(Some(1))); + assert_eq!(tasks.is_terminated(), false); + assert_eq!(tasks.poll_next_unpin(&mut cx), Poll::Ready(None)); + assert_eq!(tasks.is_terminated(), true); +} + +#[test] +fn issue_1626() { + let a = stream::iter(0..=2); + let b = stream::iter(10..=14); + + let mut s = block_on_stream(stream::select_all(vec![a, b])); + + assert_eq!(s.next(), Some(0)); + assert_eq!(s.next(), Some(10)); + assert_eq!(s.next(), Some(1)); + assert_eq!(s.next(), Some(11)); + assert_eq!(s.next(), Some(2)); + assert_eq!(s.next(), Some(12)); + assert_eq!(s.next(), Some(13)); + assert_eq!(s.next(), Some(14)); + assert_eq!(s.next(), None); +} + +#[test] +fn works_1() { + let (a_tx, a_rx) = mpsc::unbounded::(); + let (b_tx, b_rx) = mpsc::unbounded::(); + let (c_tx, c_rx) = mpsc::unbounded::(); + + let streams = vec![a_rx, b_rx, c_rx]; + + let mut stream = block_on_stream(select_all(streams)); + + b_tx.unbounded_send(99).unwrap(); + a_tx.unbounded_send(33).unwrap(); + assert_eq!(Some(33), stream.next()); + assert_eq!(Some(99), stream.next()); + + b_tx.unbounded_send(99).unwrap(); + a_tx.unbounded_send(33).unwrap(); + assert_eq!(Some(33), stream.next()); + assert_eq!(Some(99), stream.next()); + + c_tx.unbounded_send(42).unwrap(); + assert_eq!(Some(42), stream.next()); + a_tx.unbounded_send(43).unwrap(); + assert_eq!(Some(43), stream.next()); + + drop((a_tx, b_tx, c_tx)); + assert_eq!(None, stream.next()); +} diff --git a/tests/stream_select_next_some.rs b/tests/stream_select_next_some.rs new file mode 100644 index 0000000..09d7e89 --- /dev/null +++ b/tests/stream_select_next_some.rs @@ -0,0 +1,85 @@ +use futures::{future, select}; +use futures::future::{FusedFuture, FutureExt}; +use futures::stream::{FuturesUnordered, StreamExt}; +use futures::task::{Context, Poll}; +use futures_test::future::FutureTestExt; +use futures_test::task::new_count_waker; + +#[test] +fn is_terminated() { + let (waker, counter) = new_count_waker(); + let mut cx = Context::from_waker(&waker); + + let mut tasks = FuturesUnordered::new(); + + let mut select_next_some = tasks.select_next_some(); + assert_eq!(select_next_some.is_terminated(), false); + assert_eq!(select_next_some.poll_unpin(&mut cx), Poll::Pending); + assert_eq!(counter, 1); + assert_eq!(select_next_some.is_terminated(), true); + drop(select_next_some); + + tasks.push(future::ready(1)); + + let mut select_next_some = tasks.select_next_some(); + assert_eq!(select_next_some.is_terminated(), false); + assert_eq!(select_next_some.poll_unpin(&mut cx), Poll::Ready(1)); + assert_eq!(select_next_some.is_terminated(), false); + assert_eq!(select_next_some.poll_unpin(&mut cx), Poll::Pending); + assert_eq!(select_next_some.is_terminated(), true); +} + +#[test] +fn select() { + // Checks that even though `async_tasks` will yield a `None` and return + // `is_terminated() == true` during the first poll, it manages to toggle + // back to having items after a future is pushed into it during the second + // poll (after pending_once completes). + futures::executor::block_on(async { + let mut fut = future::ready(1).pending_once(); + let mut async_tasks = FuturesUnordered::new(); + let mut total = 0; + loop { + select! { + num = fut => { + total += num; + async_tasks.push(async { 5 }); + }, + num = async_tasks.select_next_some() => { + total += num; + } + complete => break, + } + } + assert_eq!(total, 6); + }); +} + +// Check that `select!` macro does not fail when importing from `futures_util`. +#[test] +fn futures_util_select() { + use futures_util::select; + + // Checks that even though `async_tasks` will yield a `None` and return + // `is_terminated() == true` during the first poll, it manages to toggle + // back to having items after a future is pushed into it during the second + // poll (after pending_once completes). + futures::executor::block_on(async { + let mut fut = future::ready(1).pending_once(); + let mut async_tasks = FuturesUnordered::new(); + let mut total = 0; + loop { + select! { + num = fut => { + total += num; + async_tasks.push(async { 5 }); + }, + num = async_tasks.select_next_some() => { + total += num; + } + complete => break, + } + } + assert_eq!(total, 6); + }); +} diff --git a/tests/try_join.rs b/tests/try_join.rs new file mode 100644 index 0000000..6c6d084 --- /dev/null +++ b/tests/try_join.rs @@ -0,0 +1,36 @@ +#![deny(unreachable_code)] + +use futures::{try_join, executor::block_on}; + +// TODO: This abuses https://github.com/rust-lang/rust/issues/58733 in order to +// test behaviour of the `try_join!` macro with the never type before it is +// stabilized. Once `!` is again stabilized this can be removed and replaced +// with direct use of `!` below where `Never` is used. +trait MyTrait { + type Output; +} +impl MyTrait for fn() -> T { + type Output = T; +} +type Never = ! as MyTrait>::Output; + + +#[test] +fn try_join_never_error() { + block_on(async { + let future1 = async { Ok::<(), Never>(()) }; + let future2 = async { Ok::<(), Never>(()) }; + try_join!(future1, future2) + }) + .unwrap(); +} + +#[test] +fn try_join_never_ok() { + block_on(async { + let future1 = async { Err::(()) }; + let future2 = async { Err::(()) }; + try_join!(future1, future2) + }) + .unwrap_err(); +} diff --git a/tests/try_join_all.rs b/tests/try_join_all.rs new file mode 100644 index 0000000..662b866 --- /dev/null +++ b/tests/try_join_all.rs @@ -0,0 +1,44 @@ +use futures_util::future::*; +use std::future::Future; +use futures::executor::block_on; +use std::fmt::Debug; + +fn assert_done(actual_fut: F, expected: T) +where + T: PartialEq + Debug, + F: FnOnce() -> Box + Unpin>, +{ + let output = block_on(actual_fut()); + assert_eq!(output, expected); +} + +#[test] +fn collect_collects() { + assert_done(|| Box::new(try_join_all(vec![ok(1), ok(2)])), Ok::<_, usize>(vec![1, 2])); + assert_done(|| Box::new(try_join_all(vec![ok(1), err(2)])), Err(2)); + assert_done(|| Box::new(try_join_all(vec![ok(1)])), Ok::<_, usize>(vec![1])); + // REVIEW: should this be implemented? + // assert_done(|| Box::new(try_join_all(Vec::::new())), Ok(vec![])); + + // TODO: needs more tests +} + +#[test] +fn try_join_all_iter_lifetime() { + // In futures-rs version 0.1, this function would fail to typecheck due to an overly + // conservative type parameterization of `TryJoinAll`. + fn sizes<'a>(bufs: Vec<&'a [u8]>) -> Box, ()>> + Unpin> { + let iter = bufs.into_iter().map(|b| ok::(b.len())); + Box::new(try_join_all(iter)) + } + + assert_done(|| sizes(vec![&[1,2,3], &[], &[0]]), Ok(vec![3 as usize, 0, 1])); +} + +#[test] +fn try_join_all_from_iter() { + assert_done( + || Box::new(vec![ok(1), ok(2)].into_iter().collect::>()), + Ok::<_, usize>(vec![1, 2]), + ) +} diff --git a/tests/unfold.rs b/tests/unfold.rs new file mode 100644 index 0000000..95722cf --- /dev/null +++ b/tests/unfold.rs @@ -0,0 +1,35 @@ +use futures::future; +use futures::stream; + +use futures_test::future::FutureTestExt; +use futures_test::{ + assert_stream_done, assert_stream_next, assert_stream_pending, +}; + +#[test] +fn unfold1() { + let mut stream = stream::unfold(0, |state| { + if state <= 2 { + future::ready(Some((state * 2, state + 1))).pending_once() + } else { + future::ready(None).pending_once() + } + }); + + // Creates the future with the closure + // Not ready (delayed future) + assert_stream_pending!(stream); + // Future is ready, yields the item + assert_stream_next!(stream, 0); + + // Repeat + assert_stream_pending!(stream); + assert_stream_next!(stream, 2); + + assert_stream_pending!(stream); + assert_stream_next!(stream, 4); + + // No more items + assert_stream_pending!(stream); + assert_stream_done!(stream); +} diff --git a/tests_disabled/all.rs b/tests_disabled/all.rs new file mode 100644 index 0000000..6c7e11c --- /dev/null +++ b/tests_disabled/all.rs @@ -0,0 +1,351 @@ +use futures::future; +use futures::executor::block_on; +use futures::channel::oneshot::{self, Canceled}; +use std::sync::mpsc::{channel, TryRecvError}; + +mod support; +use support::*; + +fn unselect(r: Result, Either<(E, B), (E, A)>>) -> Result { + match r { + Ok(Either::Left((t, _))) | + Ok(Either::Right((t, _))) => Ok(t), + Err(Either::Left((e, _))) | + Err(Either::Right((e, _))) => Err(e), + } +} + +#[test] +fn result_smoke() { + fn is_future_v(_: C) + where A: Send + 'static, + B: Send + 'static, + C: Future + {} + + is_future_v::(f_ok(1).map(|a| a + 1)); + is_future_v::(f_ok(1).map_err(|a| a + 1)); + is_future_v::(f_ok(1).and_then(Ok)); + is_future_v::(f_ok(1).or_else(Err)); + is_future_v::<(i32, i32), u32, _>(f_ok(1).join(Err(3))); + is_future_v::(f_ok(1).map(f_ok).flatten()); + + assert_done(|| f_ok(1), r_ok(1)); + assert_done(|| f_err(1), r_err(1)); + assert_done(|| result(Ok(1)), r_ok(1)); + assert_done(|| result(Err(1)), r_err(1)); + assert_done(|| ok(1), r_ok(1)); + assert_done(|| err(1), r_err(1)); + assert_done(|| f_ok(1).map(|a| a + 2), r_ok(3)); + assert_done(|| f_err(1).map(|a| a + 2), r_err(1)); + assert_done(|| f_ok(1).map_err(|a| a + 2), r_ok(1)); + assert_done(|| f_err(1).map_err(|a| a + 2), r_err(3)); + assert_done(|| f_ok(1).and_then(|a| Ok(a + 2)), r_ok(3)); + assert_done(|| f_err(1).and_then(|a| Ok(a + 2)), r_err(1)); + assert_done(|| f_ok(1).and_then(|a| Err(a as u32 + 3)), r_err(4)); + assert_done(|| f_err(1).and_then(|a| Err(a as u32 + 4)), r_err(1)); + assert_done(|| f_ok(1).or_else(|a| Ok(a as i32 + 2)), r_ok(1)); + assert_done(|| f_err(1).or_else(|a| Ok(a as i32 + 2)), r_ok(3)); + assert_done(|| f_ok(1).or_else(|a| Err(a + 3)), r_ok(1)); + assert_done(|| f_err(1).or_else(|a| Err(a + 4)), r_err(5)); + assert_done(|| f_ok(1).select(f_err(2)).then(unselect), r_ok(1)); + assert_done(|| f_ok(1).select(Ok(2)).then(unselect), r_ok(1)); + assert_done(|| f_err(1).select(f_ok(1)).then(unselect), r_err(1)); + assert_done(|| f_ok(1).select(empty()).then(unselect), Ok(1)); + assert_done(|| empty().select(f_ok(1)).then(unselect), Ok(1)); + assert_done(|| f_ok(1).join(f_err(1)), Err(1)); + assert_done(|| f_ok(1).join(Ok(2)), Ok((1, 2))); + assert_done(|| f_err(1).join(f_ok(1)), Err(1)); + assert_done(|| f_ok(1).then(|_| Ok(2)), r_ok(2)); + assert_done(|| f_ok(1).then(|_| Err(2)), r_err(2)); + assert_done(|| f_err(1).then(|_| Ok(2)), r_ok(2)); + assert_done(|| f_err(1).then(|_| Err(2)), r_err(2)); +} + +#[test] +fn test_empty() { + fn empty() -> Empty { future::empty() } + + assert_empty(|| empty()); + assert_empty(|| empty().select(empty())); + assert_empty(|| empty().join(empty())); + assert_empty(|| empty().join(f_ok(1))); + assert_empty(|| f_ok(1).join(empty())); + assert_empty(|| empty().or_else(move |_| empty())); + assert_empty(|| empty().and_then(move |_| empty())); + assert_empty(|| f_err(1).or_else(move |_| empty())); + assert_empty(|| f_ok(1).and_then(move |_| empty())); + assert_empty(|| empty().map(|a| a + 1)); + assert_empty(|| empty().map_err(|a| a + 1)); + assert_empty(|| empty().then(|a| a)); +} + +#[test] +fn test_ok() { + assert_done(|| ok(1), r_ok(1)); + assert_done(|| err(1), r_err(1)); +} + +#[test] +fn flatten() { + fn ok(a: T) -> FutureResult { + future::ok(a) + } + fn err(b: E) -> FutureResult { + future::err(b) + } + + assert_done(|| ok(ok(1)).flatten(), r_ok(1)); + assert_done(|| ok(err(1)).flatten(), r_err(1)); + assert_done(|| err(1u32).map(ok).flatten(), r_err(1)); + assert_done(|| future::ok(future::ok(1)).flatten(), r_ok(1)); + assert_empty(|| ok(empty::()).flatten()); + assert_empty(|| empty::().map(ok).flatten()); +} + +#[test] +fn smoke_oneshot() { + assert_done(|| { + let (c, p) = oneshot::channel(); + c.send(1).unwrap(); + p + }, Ok(1)); + assert_done(|| { + let (c, p) = oneshot::channel::(); + drop(c); + p + }, Err(Canceled)); + let mut completes = Vec::new(); + assert_empty(|| { + let (a, b) = oneshot::channel::(); + completes.push(a); + b + }); + + let (c, mut p) = oneshot::channel::(); + drop(c); + let res = panic_waker_lw(|lw| p.poll(lw)); + assert!(res.is_err()); + let (c, p) = oneshot::channel::(); + drop(c); + let (tx, rx) = channel(); + p.then(move |_| { + tx.send(()) + }).forget(); + rx.recv().unwrap(); +} + +#[test] +fn select_cancels() { + let ((a, b), (c, d)) = (oneshot::channel::(), oneshot::channel::()); + let ((btx, brx), (dtx, drx)) = (channel(), channel()); + let b = b.map(move |b| { btx.send(b).unwrap(); b }); + let d = d.map(move |d| { dtx.send(d).unwrap(); d }); + + let mut f = b.select(d).then(unselect); + // assert!(f.poll(&mut Task::new()).is_pending()); + assert!(brx.try_recv().is_err()); + assert!(drx.try_recv().is_err()); + a.send(1).unwrap(); + noop_waker_lw(|lw| { + let res = f.poll(lw); + assert!(res.ok().unwrap().is_ready()); + assert_eq!(brx.recv().unwrap(), 1); + drop(c); + assert!(drx.recv().is_err()); + + let ((a, b), (c, d)) = (oneshot::channel::(), oneshot::channel::()); + let ((btx, _brx), (dtx, drx)) = (channel(), channel()); + let b = b.map(move |b| { btx.send(b).unwrap(); b }); + let d = d.map(move |d| { dtx.send(d).unwrap(); d }); + + let mut f = b.select(d).then(unselect); + assert!(f.poll(lw).ok().unwrap().is_pending()); + assert!(f.poll(lw).ok().unwrap().is_pending()); + a.send(1).unwrap(); + assert!(f.poll(lw).ok().unwrap().is_ready()); + drop((c, f)); + assert!(drx.recv().is_err()); + }) +} + +#[test] +fn join_cancels() { + let ((a, b), (c, d)) = (oneshot::channel::(), oneshot::channel::()); + let ((btx, _brx), (dtx, drx)) = (channel(), channel()); + let b = b.map(move |b| { btx.send(b).unwrap(); b }); + let d = d.map(move |d| { dtx.send(d).unwrap(); d }); + + let mut f = b.join(d); + drop(a); + let res = panic_waker_lw(|lw| f.poll(lw)); + assert!(res.is_err()); + drop(c); + assert!(drx.recv().is_err()); + + let ((a, b), (c, d)) = (oneshot::channel::(), oneshot::channel::()); + let ((btx, _brx), (dtx, drx)) = (channel(), channel()); + let b = b.map(move |b| { btx.send(b).unwrap(); b }); + let d = d.map(move |d| { dtx.send(d).unwrap(); d }); + + let (tx, rx) = channel(); + let f = b.join(d); + f.then(move |_| { + tx.send(()).unwrap(); + let res: Result<(), ()> = Ok(()); + res + }).forget(); + assert!(rx.try_recv().is_err()); + drop(a); + rx.recv().unwrap(); + drop(c); + assert!(drx.recv().is_err()); +} + +#[test] +fn join_incomplete() { + let (a, b) = oneshot::channel::(); + let (tx, rx) = channel(); + noop_waker_lw(|lw| { + let mut f = ok(1).join(b).map(move |r| tx.send(r).unwrap()); + assert!(f.poll(lw).ok().unwrap().is_pending()); + assert!(rx.try_recv().is_err()); + a.send(2).unwrap(); + assert!(f.poll(lw).ok().unwrap().is_ready()); + assert_eq!(rx.recv().unwrap(), (1, 2)); + + let (a, b) = oneshot::channel::(); + let (tx, rx) = channel(); + let mut f = b.join(Ok(2)).map(move |r| tx.send(r).unwrap()); + assert!(f.poll(lw).ok().unwrap().is_pending()); + assert!(rx.try_recv().is_err()); + a.send(1).unwrap(); + assert!(f.poll(lw).ok().unwrap().is_ready()); + assert_eq!(rx.recv().unwrap(), (1, 2)); + + let (a, b) = oneshot::channel::(); + let (tx, rx) = channel(); + let mut f = ok(1).join(b).map_err(move |_r| tx.send(2).unwrap()); + assert!(f.poll(lw).ok().unwrap().is_pending()); + assert!(rx.try_recv().is_err()); + drop(a); + assert!(f.poll(lw).is_err()); + assert_eq!(rx.recv().unwrap(), 2); + + let (a, b) = oneshot::channel::(); + let (tx, rx) = channel(); + let mut f = b.join(Ok(2)).map_err(move |_r| tx.send(1).unwrap()); + assert!(f.poll(lw).ok().unwrap().is_pending()); + assert!(rx.try_recv().is_err()); + drop(a); + assert!(f.poll(lw).is_err()); + assert_eq!(rx.recv().unwrap(), 1); + }) +} + + +#[test] +fn select2() { + assert_done(|| f_ok(2).select(empty()).then(unselect), Ok(2)); + assert_done(|| empty().select(f_ok(2)).then(unselect), Ok(2)); + assert_done(|| f_err(2).select(empty()).then(unselect), Err(2)); + assert_done(|| empty().select(f_err(2)).then(unselect), Err(2)); + + assert_done(|| { + f_ok(1).select(f_ok(2)) + .map_err(|_| 0) + .and_then(|either_tup| { + let (a, b) = either_tup.into_inner(); + b.map(move |b| a + b) + }) + }, Ok(3)); + + // Finish one half of a select and then fail the second, ensuring that we + // get the notification of the second one. + { + let ((a, b), (c, d)) = (oneshot::channel::(), oneshot::channel::()); + let f = b.select(d); + let (tx, rx) = channel(); + f.map(move |r| tx.send(r).unwrap()).forget(); + a.send(1).unwrap(); + let (val, next) = rx.recv().unwrap().into_inner(); + assert_eq!(val, 1); + let (tx, rx) = channel(); + next.map_err(move |_r| tx.send(2).unwrap()).forget(); + assert_eq!(rx.try_recv().err().unwrap(), TryRecvError::Empty); + drop(c); + assert_eq!(rx.recv().unwrap(), 2); + } + + // Fail the second half and ensure that we see the first one finish + { + let ((a, b), (c, d)) = (oneshot::channel::(), oneshot::channel::()); + let f = b.select(d); + let (tx, rx) = channel(); + f.map_err(move |r| tx.send((1, r.into_inner().1)).unwrap()).forget(); + drop(c); + let (val, next) = rx.recv().unwrap(); + assert_eq!(val, 1); + let (tx, rx) = channel(); + next.map(move |r| tx.send(r).unwrap()).forget(); + assert_eq!(rx.try_recv().err().unwrap(), TryRecvError::Empty); + a.send(2).unwrap(); + assert_eq!(rx.recv().unwrap(), 2); + } + + // Cancelling the first half should cancel the second + { + let ((_a, b), (_c, d)) = (oneshot::channel::(), oneshot::channel::()); + let ((btx, brx), (dtx, drx)) = (channel(), channel()); + let b = b.map(move |v| { btx.send(v).unwrap(); v }); + let d = d.map(move |v| { dtx.send(v).unwrap(); v }); + let f = b.select(d); + drop(f); + assert!(drx.recv().is_err()); + assert!(brx.recv().is_err()); + } + + // Cancel after a schedule + { + let ((_a, b), (_c, d)) = (oneshot::channel::(), oneshot::channel::()); + let ((btx, brx), (dtx, drx)) = (channel(), channel()); + let b = b.map(move |v| { btx.send(v).unwrap(); v }); + let d = d.map(move |v| { dtx.send(v).unwrap(); v }); + let mut f = b.select(d); + let _res = noop_waker_lw(|lw| f.poll(lw)); + drop(f); + assert!(drx.recv().is_err()); + assert!(brx.recv().is_err()); + } + + // Cancel propagates + { + let ((a, b), (_c, d)) = (oneshot::channel::(), oneshot::channel::()); + let ((btx, brx), (dtx, drx)) = (channel(), channel()); + let b = b.map(move |v| { btx.send(v).unwrap(); v }); + let d = d.map(move |v| { dtx.send(v).unwrap(); v }); + let (tx, rx) = channel(); + b.select(d).map(move |_| tx.send(()).unwrap()).forget(); + drop(a); + assert!(drx.recv().is_err()); + assert!(brx.recv().is_err()); + assert!(rx.recv().is_err()); + } + + // Cancel on early drop + { + let (tx, rx) = channel(); + let f = f_ok(1).select(empty::<_, ()>().map(move |()| { + tx.send(()).unwrap(); + 1 + })); + drop(f); + assert!(rx.recv().is_err()); + } +} + +#[test] +fn option() { + assert_eq!(Ok(Some(())), block_on(Some(ok::<(), ()>(())).into_future())); + assert_eq!(Ok::<_, ()>(None::<()>), block_on(None::>.into_future())); +} diff --git a/tests_disabled/bilock.rs b/tests_disabled/bilock.rs new file mode 100644 index 0000000..c1bc33f --- /dev/null +++ b/tests_disabled/bilock.rs @@ -0,0 +1,105 @@ +use futures::task; +use futures::stream; +use futures::future; +use futures_util::lock::BiLock; +use std::thread; + +mod support; +use support::*; + +#[test] +fn smoke() { + let future = future::lazy(|_| { + let (a, b) = BiLock::new(1); + + { + let mut lock = match a.poll_lock() { + Poll::Ready(l) => l, + Poll::Pending => panic!("poll not ready"), + }; + assert_eq!(*lock, 1); + *lock = 2; + + assert!(b.poll_lock().is_pending()); + assert!(a.poll_lock().is_pending()); + } + + assert!(b.poll_lock().is_ready()); + assert!(a.poll_lock().is_ready()); + + { + let lock = match b.poll_lock() { + Poll::Ready(l) => l, + Poll::Pending => panic!("poll not ready"), + }; + assert_eq!(*lock, 2); + } + + assert_eq!(a.reunite(b).expect("bilock/smoke: reunite error"), 2); + + Ok::<(), ()>(()) + }); + + assert!(task::spawn(future) + .poll_future_notify(¬ify_noop(), 0) + .expect("failure in poll") + .is_ready()); +} + +#[test] +fn concurrent() { + const N: usize = 10000; + let (a, b) = BiLock::new(0); + + let a = Increment { + a: Some(a), + remaining: N, + }; + let b = stream::iter_ok(0..N).fold(b, |b, _n| { + b.lock().map(|mut b| { + *b += 1; + b.unlock() + }) + }); + + let t1 = thread::spawn(move || a.wait()); + let b = b.wait().expect("b error"); + let a = t1.join().unwrap().expect("a error"); + + match a.poll_lock() { + Poll::Ready(l) => assert_eq!(*l, 2 * N), + Poll::Pending => panic!("poll not ready"), + } + match b.poll_lock() { + Poll::Ready(l) => assert_eq!(*l, 2 * N), + Poll::Pending => panic!("poll not ready"), + } + + assert_eq!(a.reunite(b).expect("bilock/concurrent: reunite error"), 2 * N); + + struct Increment { + remaining: usize, + a: Option>, + } + + impl Future for Increment { + type Item = BiLock; + type Error = (); + + fn poll(&mut self) -> Poll, ()> { + loop { + if self.remaining == 0 { + return Ok(self.a.take().unwrap().into()) + } + + let a = self.a.as_ref().unwrap(); + let mut a = match a.poll_lock() { + Poll::Ready(l) => l, + Poll::Pending => return Ok(Poll::Pending), + }; + self.remaining -= 1; + *a += 1; + } + } + } +} diff --git a/tests_disabled/stream.rs b/tests_disabled/stream.rs new file mode 100644 index 0000000..4eaf12e --- /dev/null +++ b/tests_disabled/stream.rs @@ -0,0 +1,393 @@ +use futures::executor::{block_on, block_on_stream}; +use futures::future::{err, ok}; +use futures::stream::{empty, iter_ok, poll_fn, Peekable}; +use futures::channel::oneshot; +use futures::channel::mpsc; + +mod support; +use support::*; + +pub struct Iter { + iter: I, +} + +pub fn iter(i: J) -> Iter + where J: IntoIterator>, +{ + Iter { + iter: i.into_iter(), + } +} + +impl Stream for Iter + where I: Iterator>, +{ + type Item = T; + type Error = E; + + fn poll_next(&mut self, _: &mut Context<'_>) -> Poll, E> { + match self.iter.next() { + Some(Ok(e)) => Ok(Poll::Ready(Some(e))), + Some(Err(e)) => Err(e), + None => Ok(Poll::Ready(None)), + } + } +} + +fn list() -> Box + Send> { + let (tx, rx) = mpsc::channel(1); + tx.send(Ok(1)) + .and_then(|tx| tx.send(Ok(2))) + .and_then(|tx| tx.send(Ok(3))) + .forget(); + Box::new(rx.then(|r| r.unwrap())) +} + +fn err_list() -> Box + Send> { + let (tx, rx) = mpsc::channel(1); + tx.send(Ok(1)) + .and_then(|tx| tx.send(Ok(2))) + .and_then(|tx| tx.send(Err(3))) + .forget(); + Box::new(rx.then(|r| r.unwrap())) +} + +#[test] +fn map() { + assert_done(|| list().map(|a| a + 1).collect(), Ok(vec![2, 3, 4])); +} + +#[test] +fn map_err() { + assert_done(|| err_list().map_err(|a| a + 1).collect::>(), Err(4)); +} + +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +struct FromErrTest(u32); + +impl From for FromErrTest { + fn from(i: u32) -> FromErrTest { + FromErrTest(i) + } +} + +#[test] +fn from_err() { + assert_done(|| err_list().err_into().collect::>(), Err(FromErrTest(3))); +} + +#[test] +fn fold() { + assert_done(|| list().fold(0, |a, b| ok::(a + b)), Ok(6)); + assert_done(|| err_list().fold(0, |a, b| ok::(a + b)), Err(3)); +} + +#[test] +fn filter() { + assert_done(|| list().filter(|a| ok(*a % 2 == 0)).collect(), Ok(vec![2])); +} + +#[test] +fn filter_map() { + assert_done(|| list().filter_map(|x| { + ok(if x % 2 == 0 { + Some(x + 10) + } else { + None + }) + }).collect(), Ok(vec![12])); +} + +#[test] +fn and_then() { + assert_done(|| list().and_then(|a| Ok(a + 1)).collect(), Ok(vec![2, 3, 4])); + assert_done(|| list().and_then(|a| err::(a as u32)).collect::>(), + Err(1)); +} + +#[test] +fn then() { + assert_done(|| list().then(|a| a.map(|e| e + 1)).collect(), Ok(vec![2, 3, 4])); + +} + +#[test] +fn or_else() { + assert_done(|| err_list().or_else(|a| { + ok::(a as i32) + }).collect(), Ok(vec![1, 2, 3])); +} + +#[test] +fn flatten() { + assert_done(|| list().map(|_| list()).flatten().collect(), + Ok(vec![1, 2, 3, 1, 2, 3, 1, 2, 3])); + +} + +#[test] +fn skip() { + assert_done(|| list().skip(2).collect(), Ok(vec![3])); +} + +#[test] +fn skip_passes_errors_through() { + let mut s = block_on_stream( + iter(vec![Err(1), Err(2), Ok(3), Ok(4), Ok(5)]).skip(1) + ); + assert_eq!(s.next(), Some(Err(1))); + assert_eq!(s.next(), Some(Err(2))); + assert_eq!(s.next(), Some(Ok(4))); + assert_eq!(s.next(), Some(Ok(5))); + assert_eq!(s.next(), None); +} + +#[test] +fn skip_while() { + assert_done(|| list().skip_while(|e| Ok(*e % 2 == 1)).collect(), + Ok(vec![2, 3])); +} +#[test] +fn take() { + assert_done(|| list().take(2).collect(), Ok(vec![1, 2])); +} + +#[test] +fn take_while() { + assert_done(|| list().take_while(|e| Ok(*e < 3)).collect(), + Ok(vec![1, 2])); +} + +#[test] +fn take_passes_errors_through() { + let mut s = block_on_stream(iter(vec![Err(1), Err(2), Ok(3), Ok(4), Err(4)]).take(1)); + assert_eq!(s.next(), Some(Err(1))); + assert_eq!(s.next(), Some(Err(2))); + assert_eq!(s.next(), Some(Ok(3))); + assert_eq!(s.next(), None); + + let mut s = block_on_stream(iter(vec![Ok(1), Err(2)]).take(1)); + assert_eq!(s.next(), Some(Ok(1))); + assert_eq!(s.next(), None); +} + +#[test] +fn peekable() { + assert_done(|| list().peekable().collect(), Ok(vec![1, 2, 3])); +} + +#[test] +fn fuse() { + let mut stream = block_on_stream(list().fuse()); + assert_eq!(stream.next(), Some(Ok(1))); + assert_eq!(stream.next(), Some(Ok(2))); + assert_eq!(stream.next(), Some(Ok(3))); + assert_eq!(stream.next(), None); + assert_eq!(stream.next(), None); + assert_eq!(stream.next(), None); +} + +#[test] +fn buffered() { + let (tx, rx) = mpsc::channel(1); + let (a, b) = oneshot::channel::(); + let (c, d) = oneshot::channel::(); + + tx.send(Box::new(b.recover(|_| panic!())) as Box + Send>) + .and_then(|tx| tx.send(Box::new(d.map_err(|_| panic!())))) + .forget(); + + let mut rx = rx.buffered(2); + sassert_empty(&mut rx); + c.send(3).unwrap(); + sassert_empty(&mut rx); + a.send(5).unwrap(); + let mut rx = block_on_stream(rx); + assert_eq!(rx.next(), Some(Ok(5))); + assert_eq!(rx.next(), Some(Ok(3))); + assert_eq!(rx.next(), None); + + let (tx, rx) = mpsc::channel(1); + let (a, b) = oneshot::channel::(); + let (c, d) = oneshot::channel::(); + + tx.send(Box::new(b.recover(|_| panic!())) as Box + Send>) + .and_then(|tx| tx.send(Box::new(d.map_err(|_| panic!())))) + .forget(); + + let mut rx = rx.buffered(1); + sassert_empty(&mut rx); + c.send(3).unwrap(); + sassert_empty(&mut rx); + a.send(5).unwrap(); + let mut rx = block_on_stream(rx); + assert_eq!(rx.next(), Some(Ok(5))); + assert_eq!(rx.next(), Some(Ok(3))); + assert_eq!(rx.next(), None); +} + +#[test] +fn unordered() { + let (tx, rx) = mpsc::channel(1); + let (a, b) = oneshot::channel::(); + let (c, d) = oneshot::channel::(); + + tx.send(Box::new(b.recover(|_| panic!())) as Box + Send>) + .and_then(|tx| tx.send(Box::new(d.recover(|_| panic!())))) + .forget(); + + let mut rx = rx.buffer_unordered(2); + sassert_empty(&mut rx); + let mut rx = block_on_stream(rx); + c.send(3).unwrap(); + assert_eq!(rx.next(), Some(Ok(3))); + a.send(5).unwrap(); + assert_eq!(rx.next(), Some(Ok(5))); + assert_eq!(rx.next(), None); + + let (tx, rx) = mpsc::channel(1); + let (a, b) = oneshot::channel::(); + let (c, d) = oneshot::channel::(); + + tx.send(Box::new(b.recover(|_| panic!())) as Box + Send>) + .and_then(|tx| tx.send(Box::new(d.recover(|_| panic!())))) + .forget(); + + // We don't even get to see `c` until `a` completes. + let mut rx = rx.buffer_unordered(1); + sassert_empty(&mut rx); + c.send(3).unwrap(); + sassert_empty(&mut rx); + a.send(5).unwrap(); + let mut rx = block_on_stream(rx); + assert_eq!(rx.next(), Some(Ok(5))); + assert_eq!(rx.next(), Some(Ok(3))); + assert_eq!(rx.next(), None); +} + +#[test] +fn zip() { + assert_done(|| list().zip(list()).collect(), + Ok(vec![(1, 1), (2, 2), (3, 3)])); + assert_done(|| list().zip(list().take(2)).collect(), + Ok(vec![(1, 1), (2, 2)])); + assert_done(|| list().take(2).zip(list()).collect(), + Ok(vec![(1, 1), (2, 2)])); + assert_done(|| err_list().zip(list()).collect::>(), Err(3)); + assert_done(|| list().zip(list().map(|x| x + 1)).collect(), + Ok(vec![(1, 2), (2, 3), (3, 4)])); +} + +#[test] +fn peek() { + struct Peek { + inner: Peekable + Send>> + } + + impl Future for Peek { + type Item = (); + type Error = u32; + + fn poll(&mut self, cx: &mut Context<'_>) -> Poll<(), u32> { + { + let res = ready!(self.inner.peek(cx))?; + assert_eq!(res, Some(&1)); + } + assert_eq!(self.inner.peek(cx).unwrap(), Some(&1).into()); + assert_eq!(self.inner.poll_next(cx).unwrap(), Some(1).into()); + Ok(Poll::Ready(())) + } + } + + block_on(Peek { + inner: list().peekable(), + }).unwrap() +} + +#[test] +fn wait() { + assert_eq!(block_on_stream(list()).collect::, _>>(), + Ok(vec![1, 2, 3])); +} + +#[test] +fn chunks() { + assert_done(|| list().chunks(3).collect(), Ok(vec![vec![1, 2, 3]])); + assert_done(|| list().chunks(1).collect(), Ok(vec![vec![1], vec![2], vec![3]])); + assert_done(|| list().chunks(2).collect(), Ok(vec![vec![1, 2], vec![3]])); + let mut list = block_on_stream(err_list().chunks(3)); + let i = list.next().unwrap().unwrap(); + assert_eq!(i, vec![1, 2]); + let i = list.next().unwrap().unwrap_err(); + assert_eq!(i, 3); +} + +#[test] +#[should_panic] +fn chunks_panic_on_cap_zero() { + let _ = list().chunks(0); +} + +#[test] +fn forward() { + let v = Vec::new(); + let v = block_on(iter_ok::<_, Never>(vec![0, 1]).forward(v)).unwrap().1; + assert_eq!(v, vec![0, 1]); + + let v = block_on(iter_ok::<_, Never>(vec![2, 3]).forward(v)).unwrap().1; + assert_eq!(v, vec![0, 1, 2, 3]); + + assert_done(move || iter_ok::<_, Never>(vec![4, 5]).forward(v).map(|(_, s)| s), + Ok(vec![0, 1, 2, 3, 4, 5])); +} + +#[test] +#[allow(deprecated)] +fn concat() { + let a = iter_ok::<_, ()>(vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8, 9]]); + assert_done(move || a.concat(), Ok(vec![1, 2, 3, 4, 5, 6, 7, 8, 9])); + + let b = iter(vec![Ok::<_, ()>(vec![1, 2, 3]), Err(()), Ok(vec![7, 8, 9])]); + assert_done(move || b.concat(), Err(())); +} + +#[test] +fn concat2() { + let a = iter_ok::<_, ()>(vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8, 9]]); + assert_done(move || a.concat(), Ok(vec![1, 2, 3, 4, 5, 6, 7, 8, 9])); + + let b = iter(vec![Ok::<_, ()>(vec![1, 2, 3]), Err(()), Ok(vec![7, 8, 9])]); + assert_done(move || b.concat(), Err(())); + + let c = empty::, ()>(); + assert_done(move || c.concat(), Ok(vec![])) +} + +#[test] +fn stream_poll_fn() { + let mut counter = 5usize; + + let read_stream = poll_fn(move |_| -> Poll, std::io::Error> { + if counter == 0 { + return Ok(Poll::Ready(None)); + } + counter -= 1; + Ok(Poll::Ready(Some(counter))) + }); + + assert_eq!(block_on_stream(read_stream).count(), 5); +} + +#[test] +fn inspect() { + let mut seen = vec![]; + assert_done(|| list().inspect(|&a| seen.push(a)).collect(), Ok(vec![1, 2, 3])); + assert_eq!(seen, [1, 2, 3]); +} + +#[test] +fn inspect_err() { + let mut seen = vec![]; + assert_done(|| err_list().inspect_err(|&a| seen.push(a)).collect::>(), Err(3)); + assert_eq!(seen, [3]); +} -- cgit v1.2.3