aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid LeGare <legare@google.com>2022-03-04 22:45:58 +0000
committerAutomerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>2022-03-04 22:45:58 +0000
commit3049d36a1d1b1613a52f86adf1ea8408e5e3c837 (patch)
treee2532a639f92776d56f172cf560f5ba2f62642bc
parent58805b026bbdb1da983405653d547ae61856d91d (diff)
parent652b2abde2be451ceff06e7e01f312392b32c7a5 (diff)
downloadfutures-android13-qpr1-s6-release.tar.gz
Update futures to 0.3.21 am: 47ef9cbd72 am: e3f9774990 am: 652b2abde2t_frc_odp_330442040t_frc_odp_330442000t_frc_ase_330444010android-13.0.0_r83android-13.0.0_r82android-13.0.0_r81android-13.0.0_r80android-13.0.0_r79android-13.0.0_r78android-13.0.0_r77android-13.0.0_r76android-13.0.0_r75android-13.0.0_r74android-13.0.0_r73android-13.0.0_r72android-13.0.0_r71android-13.0.0_r70android-13.0.0_r69android-13.0.0_r68android-13.0.0_r67android-13.0.0_r66android-13.0.0_r65android-13.0.0_r64android-13.0.0_r63android-13.0.0_r62android-13.0.0_r61android-13.0.0_r60android-13.0.0_r59android-13.0.0_r58android-13.0.0_r57android-13.0.0_r56android-13.0.0_r55android-13.0.0_r54android-13.0.0_r53android-13.0.0_r52android-13.0.0_r51android-13.0.0_r50android-13.0.0_r49android-13.0.0_r48android-13.0.0_r47android-13.0.0_r46android-13.0.0_r45android-13.0.0_r44android-13.0.0_r43android-13.0.0_r42android-13.0.0_r41android-13.0.0_r40android-13.0.0_r39android-13.0.0_r38android-13.0.0_r37android-13.0.0_r36android-13.0.0_r35android-13.0.0_r34android-13.0.0_r33android-13.0.0_r32android-13.0.0_r30android-13.0.0_r29android-13.0.0_r28android-13.0.0_r27android-13.0.0_r24android-13.0.0_r23android-13.0.0_r22android-13.0.0_r21android-13.0.0_r20android-13.0.0_r19android-13.0.0_r18android-13.0.0_r17android-13.0.0_r16aml_go_odp_330912000aml_go_ads_330915100aml_go_ads_330915000aml_go_ads_330913000android13-qpr3-s9-releaseandroid13-qpr3-s8-releaseandroid13-qpr3-s7-releaseandroid13-qpr3-s6-releaseandroid13-qpr3-s5-releaseandroid13-qpr3-s4-releaseandroid13-qpr3-s3-releaseandroid13-qpr3-s2-releaseandroid13-qpr3-s14-releaseandroid13-qpr3-s13-releaseandroid13-qpr3-s12-releaseandroid13-qpr3-s11-releaseandroid13-qpr3-s10-releaseandroid13-qpr3-s1-releaseandroid13-qpr3-releaseandroid13-qpr3-c-s8-releaseandroid13-qpr3-c-s7-releaseandroid13-qpr3-c-s6-releaseandroid13-qpr3-c-s5-releaseandroid13-qpr3-c-s4-releaseandroid13-qpr3-c-s3-releaseandroid13-qpr3-c-s2-releaseandroid13-qpr3-c-s12-releaseandroid13-qpr3-c-s11-releaseandroid13-qpr3-c-s10-releaseandroid13-qpr3-c-s1-releaseandroid13-qpr2-s9-releaseandroid13-qpr2-s8-releaseandroid13-qpr2-s7-releaseandroid13-qpr2-s6-releaseandroid13-qpr2-s5-releaseandroid13-qpr2-s3-releaseandroid13-qpr2-s2-releaseandroid13-qpr2-s12-releaseandroid13-qpr2-s11-releaseandroid13-qpr2-s10-releaseandroid13-qpr2-s1-releaseandroid13-qpr2-releaseandroid13-qpr2-b-s1-releaseandroid13-qpr1-s8-releaseandroid13-qpr1-s7-releaseandroid13-qpr1-s6-releaseandroid13-qpr1-s5-releaseandroid13-qpr1-s4-releaseandroid13-qpr1-s3-releaseandroid13-qpr1-s2-releaseandroid13-qpr1-s1-releaseandroid13-qpr1-releaseandroid13-mainline-go-adservices-releaseandroid13-frc-odp-releaseandroid13-devandroid13-d4-s2-releaseandroid13-d4-s1-releaseandroid13-d4-releaseandroid13-d3-s1-releaseandroid13-d2-release
Original change: https://android-review.googlesource.com/c/platform/external/rust/crates/futures/+/2004096 Change-Id: I065efefc808c7dc9abb1b917aabffb79b7a9844d
-rw-r--r--.cargo_vcs_info.json7
-rw-r--r--Android.bp2
-rw-r--r--Cargo.toml106
-rw-r--r--Cargo.toml.orig20
-rw-r--r--METADATA10
-rw-r--r--src/lib.rs76
-rw-r--r--tests/compat.rs1
-rw-r--r--tests/eventual.rs2
-rw-r--r--tests/future_join_all.rs25
-rw-r--r--tests/future_shared.rs1
-rw-r--r--tests/future_try_join_all.rs24
-rw-r--r--tests/io_line_writer.rs73
-rw-r--r--tests/lock_mutex.rs1
-rw-r--r--tests/macro_comma_support.rs1
-rw-r--r--tests/ready_queue.rs3
-rw-r--r--tests/recurse.rs1
-rw-r--r--tests/sink.rs1
-rw-r--r--tests/stream.rs272
-rw-r--r--tests/stream_futures_ordered.rs2
-rw-r--r--tests/stream_futures_unordered.rs2
-rw-r--r--tests/stream_try_stream.rs2
-rw-r--r--tests/task_atomic_waker.rs1
22 files changed, 556 insertions, 77 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json
index ffd4f55..e483977 100644
--- a/.cargo_vcs_info.json
+++ b/.cargo_vcs_info.json
@@ -1,5 +1,6 @@
{
"git": {
- "sha1": "7caefa51304e78fd5018cd5d2a03f3b9089cc010"
- }
-}
+ "sha1": "fc1e3250219170e31cddb8857a276cba7dd08d44"
+ },
+ "path_in_vcs": "futures"
+} \ No newline at end of file
diff --git a/Android.bp b/Android.bp
index bc471eb..a594b53 100644
--- a/Android.bp
+++ b/Android.bp
@@ -42,7 +42,7 @@ rust_library {
host_supported: true,
crate_name: "futures",
cargo_env_compat: true,
- cargo_pkg_version: "0.3.17",
+ cargo_pkg_version: "0.3.21",
srcs: ["src/lib.rs"],
edition: "2018",
features: [
diff --git a/Cargo.toml b/Cargo.toml
index 17c4d5f..f740f96 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -11,53 +11,72 @@
[package]
edition = "2018"
+rust-version = "1.45"
name = "futures"
-version = "0.3.17"
-authors = ["Alex Crichton <alex@alexcrichton.com>"]
-description = "An implementation of futures and streams featuring zero allocations,\ncomposability, and iterator-like interfaces.\n"
+version = "0.3.21"
+description = """
+An implementation of futures and streams featuring zero allocations,
+composability, and iterator-like interfaces.
+"""
homepage = "https://rust-lang.github.io/futures-rs"
-documentation = "https://docs.rs/futures/0.3"
readme = "../README.md"
-keywords = ["futures", "async", "future"]
+keywords = [
+ "futures",
+ "async",
+ "future",
+]
categories = ["asynchronous"]
license = "MIT OR Apache-2.0"
repository = "https://github.com/rust-lang/futures-rs"
+
[package.metadata.docs.rs]
all-features = true
-rustdoc-args = ["--cfg", "docsrs"]
+rustdoc-args = [
+ "--cfg",
+ "docsrs",
+]
[package.metadata.playground]
-features = ["std", "async-await", "compat", "io-compat", "executor", "thread-pool"]
+features = [
+ "std",
+ "async-await",
+ "compat",
+ "io-compat",
+ "executor",
+ "thread-pool",
+]
+
[dependencies.futures-channel]
-version = "0.3.17"
+version = "0.3.21"
features = ["sink"]
default-features = false
[dependencies.futures-core]
-version = "0.3.17"
+version = "0.3.21"
default-features = false
[dependencies.futures-executor]
-version = "0.3.17"
+version = "0.3.21"
optional = true
default-features = false
[dependencies.futures-io]
-version = "0.3.17"
+version = "0.3.21"
default-features = false
[dependencies.futures-sink]
-version = "0.3.17"
+version = "0.3.21"
default-features = false
[dependencies.futures-task]
-version = "0.3.17"
+version = "0.3.21"
default-features = false
[dependencies.futures-util]
-version = "0.3.17"
+version = "0.3.21"
features = ["sink"]
default-features = false
+
[dev-dependencies.assert_matches]
version = "1.3.0"
@@ -74,16 +93,55 @@ version = "1"
version = "0.1.11"
[features]
-alloc = ["futures-core/alloc", "futures-task/alloc", "futures-sink/alloc", "futures-channel/alloc", "futures-util/alloc"]
-async-await = ["futures-util/async-await", "futures-util/async-await-macro"]
+alloc = [
+ "futures-core/alloc",
+ "futures-task/alloc",
+ "futures-sink/alloc",
+ "futures-channel/alloc",
+ "futures-util/alloc",
+]
+async-await = [
+ "futures-util/async-await",
+ "futures-util/async-await-macro",
+]
bilock = ["futures-util/bilock"]
cfg-target-has-atomic = []
-compat = ["std", "futures-util/compat"]
-default = ["std", "async-await", "executor"]
-executor = ["std", "futures-executor/std"]
-io-compat = ["compat", "futures-util/io-compat"]
-read-initializer = ["futures-io/read-initializer", "futures-util/read-initializer"]
-std = ["alloc", "futures-core/std", "futures-task/std", "futures-io/std", "futures-sink/std", "futures-util/std", "futures-util/io", "futures-util/channel"]
-thread-pool = ["executor", "futures-executor/thread-pool"]
-unstable = ["futures-core/unstable", "futures-task/unstable", "futures-channel/unstable", "futures-io/unstable", "futures-util/unstable"]
+compat = [
+ "std",
+ "futures-util/compat",
+]
+default = [
+ "std",
+ "async-await",
+ "executor",
+]
+executor = [
+ "std",
+ "futures-executor/std",
+]
+io-compat = [
+ "compat",
+ "futures-util/io-compat",
+]
+std = [
+ "alloc",
+ "futures-core/std",
+ "futures-task/std",
+ "futures-io/std",
+ "futures-sink/std",
+ "futures-util/std",
+ "futures-util/io",
+ "futures-util/channel",
+]
+thread-pool = [
+ "executor",
+ "futures-executor/thread-pool",
+]
+unstable = [
+ "futures-core/unstable",
+ "futures-task/unstable",
+ "futures-channel/unstable",
+ "futures-io/unstable",
+ "futures-util/unstable",
+]
write-all-vectored = ["futures-util/write-all-vectored"]
diff --git a/Cargo.toml.orig b/Cargo.toml.orig
index b01b12e..6871f47 100644
--- a/Cargo.toml.orig
+++ b/Cargo.toml.orig
@@ -1,14 +1,13 @@
[package]
name = "futures"
+version = "0.3.21"
edition = "2018"
-version = "0.3.17"
-authors = ["Alex Crichton <alex@alexcrichton.com>"]
+rust-version = "1.45"
license = "MIT OR Apache-2.0"
readme = "../README.md"
keywords = ["futures", "async", "future"]
repository = "https://github.com/rust-lang/futures-rs"
homepage = "https://rust-lang.github.io/futures-rs"
-documentation = "https://docs.rs/futures/0.3"
description = """
An implementation of futures and streams featuring zero allocations,
composability, and iterator-like interfaces.
@@ -16,13 +15,13 @@ composability, and iterator-like interfaces.
categories = ["asynchronous"]
[dependencies]
-futures-core = { path = "../futures-core", version = "0.3.17", default-features = false }
-futures-task = { path = "../futures-task", version = "0.3.17", default-features = false }
-futures-channel = { path = "../futures-channel", version = "0.3.17", default-features = false, features = ["sink"] }
-futures-executor = { path = "../futures-executor", version = "0.3.17", default-features = false, optional = true }
-futures-io = { path = "../futures-io", version = "0.3.17", default-features = false }
-futures-sink = { path = "../futures-sink", version = "0.3.17", default-features = false }
-futures-util = { path = "../futures-util", version = "0.3.17", default-features = false, features = ["sink"] }
+futures-core = { path = "../futures-core", version = "0.3.21", default-features = false }
+futures-task = { path = "../futures-task", version = "0.3.21", default-features = false }
+futures-channel = { path = "../futures-channel", version = "0.3.21", default-features = false, features = ["sink"] }
+futures-executor = { path = "../futures-executor", version = "0.3.21", default-features = false, optional = true }
+futures-io = { path = "../futures-io", version = "0.3.21", default-features = false }
+futures-sink = { path = "../futures-sink", version = "0.3.21", default-features = false }
+futures-util = { path = "../futures-util", version = "0.3.21", default-features = false, features = ["sink"] }
[dev-dependencies]
futures-executor = { path = "../futures-executor", features = ["thread-pool"] }
@@ -48,7 +47,6 @@ thread-pool = ["executor", "futures-executor/thread-pool"]
# `unstable` feature as an explicit opt-in to unstable API.
unstable = ["futures-core/unstable", "futures-task/unstable", "futures-channel/unstable", "futures-io/unstable", "futures-util/unstable"]
bilock = ["futures-util/bilock"]
-read-initializer = ["futures-io/read-initializer", "futures-util/read-initializer"]
write-all-vectored = ["futures-util/write-all-vectored"]
# These features are no longer used.
diff --git a/METADATA b/METADATA
index ba70d3e..554f4df 100644
--- a/METADATA
+++ b/METADATA
@@ -7,13 +7,13 @@ third_party {
}
url {
type: ARCHIVE
- value: "https://static.crates.io/crates/futures/futures-0.3.17.crate"
+ value: "https://static.crates.io/crates/futures/futures-0.3.21.crate"
}
- version: "0.3.17"
+ version: "0.3.21"
license_type: NOTICE
last_upgrade_date {
- year: 2021
- month: 9
- day: 22
+ year: 2022
+ month: 3
+ day: 1
}
}
diff --git a/src/lib.rs b/src/lib.rs
index 362aa3c..b8ebc61 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -25,6 +25,7 @@
//! within macros and keywords such as async and await!.
//!
//! ```rust
+//! # if cfg!(miri) { return; } // https://github.com/rust-lang/miri/issues/1038
//! # use futures::channel::mpsc;
//! # use futures::executor; ///standard executors to provide a context for futures and streams
//! # use futures::executor::ThreadPool;
@@ -78,7 +79,6 @@
//! The majority of examples and code snippets in this crate assume that they are
//! inside an async block as written above.
-#![cfg_attr(feature = "read-initializer", feature(read_initializer))]
#![cfg_attr(not(feature = "std"), no_std)]
#![warn(
missing_debug_implementations,
@@ -99,9 +99,6 @@
#[cfg(all(feature = "bilock", not(feature = "unstable")))]
compile_error!("The `bilock` feature requires the `unstable` feature as an explicit opt-in to unstable features");
-#[cfg(all(feature = "read-initializer", not(feature = "unstable")))]
-compile_error!("The `read-initializer` feature requires the `unstable` feature as an explicit opt-in to unstable features");
-
#[doc(no_inline)]
pub use futures_core::future::{Future, TryFuture};
#[doc(no_inline)]
@@ -154,13 +151,73 @@ pub use futures_util::io;
#[cfg(feature = "executor")]
#[cfg_attr(docsrs, doc(cfg(feature = "executor")))]
-#[doc(inline)]
-pub use futures_executor as executor;
+pub mod executor {
+ //! Built-in executors and related tools.
+ //!
+ //! All asynchronous computation occurs within an executor, which is
+ //! capable of spawning futures as tasks. This module provides several
+ //! built-in executors, as well as tools for building your own.
+ //!
+ //!
+ //! This module is only available when the `executor` feature of this
+ //! library is activated.
+ //!
+ //! # Using a thread pool (M:N task scheduling)
+ //!
+ //! Most of the time tasks should be executed on a [thread pool](ThreadPool).
+ //! A small set of worker threads can handle a very large set of spawned tasks
+ //! (which are much lighter weight than threads). Tasks spawned onto the pool
+ //! with the [`spawn_ok`](ThreadPool::spawn_ok) function will run ambiently on
+ //! the created threads.
+ //!
+ //! # Spawning additional tasks
+ //!
+ //! Tasks can be spawned onto a spawner by calling its [`spawn_obj`] method
+ //! directly. In the case of `!Send` futures, [`spawn_local_obj`] can be used
+ //! instead.
+ //!
+ //! # Single-threaded execution
+ //!
+ //! In addition to thread pools, it's possible to run a task (and the tasks
+ //! it spawns) entirely within a single thread via the [`LocalPool`] executor.
+ //! Aside from cutting down on synchronization costs, this executor also makes
+ //! it possible to spawn non-`Send` tasks, via [`spawn_local_obj`]. The
+ //! [`LocalPool`] is best suited for running I/O-bound tasks that do relatively
+ //! little work between I/O operations.
+ //!
+ //! There is also a convenience function [`block_on`] for simply running a
+ //! future to completion on the current thread.
+ //!
+ //! [`spawn_obj`]: https://docs.rs/futures/0.3/futures/task/trait.Spawn.html#tymethod.spawn_obj
+ //! [`spawn_local_obj`]: https://docs.rs/futures/0.3/futures/task/trait.LocalSpawn.html#tymethod.spawn_local_obj
+
+ pub use futures_executor::{
+ block_on, block_on_stream, enter, BlockingStream, Enter, EnterError, LocalPool,
+ LocalSpawner,
+ };
+
+ #[cfg(feature = "thread-pool")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "thread-pool")))]
+ pub use futures_executor::{ThreadPool, ThreadPoolBuilder};
+}
#[cfg(feature = "compat")]
#[cfg_attr(docsrs, doc(cfg(feature = "compat")))]
-#[doc(inline)]
-pub use futures_util::compat;
+pub mod compat {
+ //! Interop between `futures` 0.1 and 0.3.
+ //!
+ //! This module is only available when the `compat` feature of this
+ //! library is activated.
+
+ pub use futures_util::compat::{
+ Compat, Compat01As03, Compat01As03Sink, CompatSink, Executor01As03, Executor01CompatExt,
+ Executor01Future, Future01CompatExt, Sink01CompatExt, Stream01CompatExt,
+ };
+
+ #[cfg(feature = "io-compat")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
+ pub use futures_util::compat::{AsyncRead01CompatExt, AsyncWrite01CompatExt};
+}
pub mod prelude {
//! A "prelude" for crates using the `futures` crate.
@@ -181,10 +238,12 @@ pub mod prelude {
pub use crate::stream::{self, Stream, TryStream};
#[doc(no_inline)]
+ #[allow(unreachable_pub)]
pub use crate::future::{FutureExt as _, TryFutureExt as _};
#[doc(no_inline)]
pub use crate::sink::SinkExt as _;
#[doc(no_inline)]
+ #[allow(unreachable_pub)]
pub use crate::stream::{StreamExt as _, TryStreamExt as _};
#[cfg(feature = "std")]
@@ -192,6 +251,7 @@ pub mod prelude {
#[cfg(feature = "std")]
#[doc(no_inline)]
+ #[allow(unreachable_pub)]
pub use crate::io::{
AsyncBufReadExt as _, AsyncReadExt as _, AsyncSeekExt as _, AsyncWriteExt as _,
};
diff --git a/tests/compat.rs b/tests/compat.rs
index c4125d8..ac04a95 100644
--- a/tests/compat.rs
+++ b/tests/compat.rs
@@ -1,4 +1,5 @@
#![cfg(feature = "compat")]
+#![cfg(not(miri))] // Miri does not support epoll
use futures::compat::Future01CompatExt;
use futures::prelude::*;
diff --git a/tests/eventual.rs b/tests/eventual.rs
index bff000d..3461380 100644
--- a/tests/eventual.rs
+++ b/tests/eventual.rs
@@ -1,3 +1,5 @@
+#![cfg(not(miri))] // https://github.com/rust-lang/miri/issues/1038
+
use futures::channel::oneshot;
use futures::executor::ThreadPool;
use futures::future::{self, ok, Future, FutureExt, TryFutureExt};
diff --git a/tests/future_join_all.rs b/tests/future_join_all.rs
index ae05a21..44486e1 100644
--- a/tests/future_join_all.rs
+++ b/tests/future_join_all.rs
@@ -1,22 +1,24 @@
use futures::executor::block_on;
use futures::future::{join_all, ready, Future, JoinAll};
+use futures::pin_mut;
use std::fmt::Debug;
-fn assert_done<T, F>(actual_fut: F, expected: T)
+#[track_caller]
+fn assert_done<T>(actual_fut: impl Future<Output = T>, expected: T)
where
T: PartialEq + Debug,
- F: FnOnce() -> Box<dyn Future<Output = T> + Unpin>,
{
- let output = block_on(actual_fut());
+ pin_mut!(actual_fut);
+ let output = block_on(actual_fut);
assert_eq!(output, expected);
}
#[test]
fn collect_collects() {
- assert_done(|| Box::new(join_all(vec![ready(1), ready(2)])), vec![1, 2]);
- assert_done(|| Box::new(join_all(vec![ready(1)])), vec![1]);
+ assert_done(join_all(vec![ready(1), ready(2)]), vec![1, 2]);
+ assert_done(join_all(vec![ready(1)]), vec![1]);
// REVIEW: should this be implemented?
- // assert_done(|| Box::new(join_all(Vec::<i32>::new())), vec![]);
+ // assert_done(join_all(Vec::<i32>::new()), vec![]);
// TODO: needs more tests
}
@@ -25,18 +27,15 @@ fn collect_collects() {
fn join_all_iter_lifetime() {
// In futures-rs version 0.1, this function would fail to typecheck due to an overly
// conservative type parameterization of `JoinAll`.
- fn sizes(bufs: Vec<&[u8]>) -> Box<dyn Future<Output = Vec<usize>> + Unpin> {
+ fn sizes(bufs: Vec<&[u8]>) -> impl Future<Output = Vec<usize>> {
let iter = bufs.into_iter().map(|b| ready::<usize>(b.len()));
- Box::new(join_all(iter))
+ join_all(iter)
}
- assert_done(|| sizes(vec![&[1, 2, 3], &[], &[0]]), vec![3_usize, 0, 1]);
+ assert_done(sizes(vec![&[1, 2, 3], &[], &[0]]), vec![3_usize, 0, 1]);
}
#[test]
fn join_all_from_iter() {
- assert_done(
- || Box::new(vec![ready(1), ready(2)].into_iter().collect::<JoinAll<_>>()),
- vec![1, 2],
- )
+ assert_done(vec![ready(1), ready(2)].into_iter().collect::<JoinAll<_>>(), vec![1, 2])
}
diff --git a/tests/future_shared.rs b/tests/future_shared.rs
index 718d6c4..3ceaebb 100644
--- a/tests/future_shared.rs
+++ b/tests/future_shared.rs
@@ -96,6 +96,7 @@ fn drop_in_poll() {
assert_eq!(block_on(future1), 1);
}
+#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038
#[test]
fn peek() {
let mut local_pool = LocalPool::new();
diff --git a/tests/future_try_join_all.rs b/tests/future_try_join_all.rs
index a4b3bb7..9a82487 100644
--- a/tests/future_try_join_all.rs
+++ b/tests/future_try_join_all.rs
@@ -1,24 +1,26 @@
use futures::executor::block_on;
+use futures::pin_mut;
use futures_util::future::{err, ok, try_join_all, TryJoinAll};
use std::fmt::Debug;
use std::future::Future;
-fn assert_done<T, F>(actual_fut: F, expected: T)
+#[track_caller]
+fn assert_done<T>(actual_fut: impl Future<Output = T>, expected: T)
where
T: PartialEq + Debug,
- F: FnOnce() -> Box<dyn Future<Output = T> + Unpin>,
{
- let output = block_on(actual_fut());
+ pin_mut!(actual_fut);
+ let output = block_on(actual_fut);
assert_eq!(output, expected);
}
#[test]
fn collect_collects() {
- assert_done(|| Box::new(try_join_all(vec![ok(1), ok(2)])), Ok::<_, usize>(vec![1, 2]));
- assert_done(|| Box::new(try_join_all(vec![ok(1), err(2)])), Err(2));
- assert_done(|| Box::new(try_join_all(vec![ok(1)])), Ok::<_, usize>(vec![1]));
+ assert_done(try_join_all(vec![ok(1), ok(2)]), Ok::<_, usize>(vec![1, 2]));
+ assert_done(try_join_all(vec![ok(1), err(2)]), Err(2));
+ assert_done(try_join_all(vec![ok(1)]), Ok::<_, usize>(vec![1]));
// REVIEW: should this be implemented?
- // assert_done(|| Box::new(try_join_all(Vec::<i32>::new())), Ok(vec![]));
+ // assert_done(try_join_all(Vec::<i32>::new()), Ok(vec![]));
// TODO: needs more tests
}
@@ -27,18 +29,18 @@ fn collect_collects() {
fn try_join_all_iter_lifetime() {
// In futures-rs version 0.1, this function would fail to typecheck due to an overly
// conservative type parameterization of `TryJoinAll`.
- fn sizes(bufs: Vec<&[u8]>) -> Box<dyn Future<Output = Result<Vec<usize>, ()>> + Unpin> {
+ fn sizes(bufs: Vec<&[u8]>) -> impl Future<Output = Result<Vec<usize>, ()>> {
let iter = bufs.into_iter().map(|b| ok::<usize, ()>(b.len()));
- Box::new(try_join_all(iter))
+ try_join_all(iter)
}
- assert_done(|| sizes(vec![&[1, 2, 3], &[], &[0]]), Ok(vec![3_usize, 0, 1]));
+ assert_done(sizes(vec![&[1, 2, 3], &[], &[0]]), Ok(vec![3_usize, 0, 1]));
}
#[test]
fn try_join_all_from_iter() {
assert_done(
- || Box::new(vec![ok(1), ok(2)].into_iter().collect::<TryJoinAll<_>>()),
+ vec![ok(1), ok(2)].into_iter().collect::<TryJoinAll<_>>(),
Ok::<_, usize>(vec![1, 2]),
)
}
diff --git a/tests/io_line_writer.rs b/tests/io_line_writer.rs
new file mode 100644
index 0000000..b483e0f
--- /dev/null
+++ b/tests/io_line_writer.rs
@@ -0,0 +1,73 @@
+use futures::executor::block_on;
+use futures::io::{AsyncWriteExt, LineWriter};
+use std::io;
+
+#[test]
+fn line_writer() {
+ let mut writer = LineWriter::new(Vec::new());
+
+ block_on(writer.write(&[0])).unwrap();
+ assert_eq!(*writer.get_ref(), []);
+
+ block_on(writer.write(&[1])).unwrap();
+ assert_eq!(*writer.get_ref(), []);
+
+ block_on(writer.flush()).unwrap();
+ assert_eq!(*writer.get_ref(), [0, 1]);
+
+ block_on(writer.write(&[0, b'\n', 1, b'\n', 2])).unwrap();
+ assert_eq!(*writer.get_ref(), [0, 1, 0, b'\n', 1, b'\n']);
+
+ block_on(writer.flush()).unwrap();
+ assert_eq!(*writer.get_ref(), [0, 1, 0, b'\n', 1, b'\n', 2]);
+
+ block_on(writer.write(&[3, b'\n'])).unwrap();
+ assert_eq!(*writer.get_ref(), [0, 1, 0, b'\n', 1, b'\n', 2, 3, b'\n']);
+}
+
+#[test]
+fn line_vectored() {
+ let mut line_writer = LineWriter::new(Vec::new());
+ assert_eq!(
+ block_on(line_writer.write_vectored(&[
+ io::IoSlice::new(&[]),
+ io::IoSlice::new(b"\n"),
+ io::IoSlice::new(&[]),
+ io::IoSlice::new(b"a"),
+ ]))
+ .unwrap(),
+ 2
+ );
+ assert_eq!(line_writer.get_ref(), b"\n");
+
+ assert_eq!(
+ block_on(line_writer.write_vectored(&[
+ io::IoSlice::new(&[]),
+ io::IoSlice::new(b"b"),
+ io::IoSlice::new(&[]),
+ io::IoSlice::new(b"a"),
+ io::IoSlice::new(&[]),
+ io::IoSlice::new(b"c"),
+ ]))
+ .unwrap(),
+ 3
+ );
+ assert_eq!(line_writer.get_ref(), b"\n");
+ block_on(line_writer.flush()).unwrap();
+ assert_eq!(line_writer.get_ref(), b"\nabac");
+ assert_eq!(block_on(line_writer.write_vectored(&[])).unwrap(), 0);
+
+ assert_eq!(
+ block_on(line_writer.write_vectored(&[
+ io::IoSlice::new(&[]),
+ io::IoSlice::new(&[]),
+ io::IoSlice::new(&[]),
+ io::IoSlice::new(&[]),
+ ]))
+ .unwrap(),
+ 0
+ );
+
+ assert_eq!(block_on(line_writer.write_vectored(&[io::IoSlice::new(b"a\nb")])).unwrap(), 3);
+ assert_eq!(line_writer.get_ref(), b"\nabaca\nb");
+}
diff --git a/tests/lock_mutex.rs b/tests/lock_mutex.rs
index 7c33864..c92ef50 100644
--- a/tests/lock_mutex.rs
+++ b/tests/lock_mutex.rs
@@ -34,6 +34,7 @@ fn mutex_wakes_waiters() {
assert!(waiter.poll_unpin(&mut panic_context()).is_ready());
}
+#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038
#[test]
fn mutex_contested() {
let (tx, mut rx) = mpsc::unbounded();
diff --git a/tests/macro_comma_support.rs b/tests/macro_comma_support.rs
index 85871e9..3b082d2 100644
--- a/tests/macro_comma_support.rs
+++ b/tests/macro_comma_support.rs
@@ -14,6 +14,7 @@ fn ready() {
}))
}
+#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038
#[test]
fn poll() {
use futures::poll;
diff --git a/tests/ready_queue.rs b/tests/ready_queue.rs
index 8290132..afba8f2 100644
--- a/tests/ready_queue.rs
+++ b/tests/ready_queue.rs
@@ -93,6 +93,9 @@ fn dropping_ready_queue() {
#[test]
fn stress() {
+ #[cfg(miri)]
+ const ITER: usize = 30;
+ #[cfg(not(miri))]
const ITER: usize = 300;
for i in 0..ITER {
diff --git a/tests/recurse.rs b/tests/recurse.rs
index d81753c..f06524f 100644
--- a/tests/recurse.rs
+++ b/tests/recurse.rs
@@ -3,6 +3,7 @@ use futures::future::{self, BoxFuture, FutureExt};
use std::sync::mpsc;
use std::thread;
+#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038
#[test]
fn lots() {
#[cfg(not(futures_sanitizer))]
diff --git a/tests/sink.rs b/tests/sink.rs
index f3cf11b..dc826bd 100644
--- a/tests/sink.rs
+++ b/tests/sink.rs
@@ -288,6 +288,7 @@ fn mpsc_blocking_start_send() {
// test `flush` by using `with` to make the first insertion into a sink block
// until a oneshot is completed
+#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038
#[test]
fn with_flush() {
let (tx, rx) = oneshot::channel();
diff --git a/tests/stream.rs b/tests/stream.rs
index 0d453d1..71ec654 100644
--- a/tests/stream.rs
+++ b/tests/stream.rs
@@ -1,10 +1,14 @@
+use std::iter;
+use std::sync::Arc;
+
use futures::channel::mpsc;
use futures::executor::block_on;
use futures::future::{self, Future};
+use futures::lock::Mutex;
use futures::sink::SinkExt;
use futures::stream::{self, StreamExt};
use futures::task::Poll;
-use futures::FutureExt;
+use futures::{ready, FutureExt};
use futures_test::task::noop_context;
#[test]
@@ -50,6 +54,272 @@ fn scan() {
}
#[test]
+fn flatten_unordered() {
+ use futures::executor::block_on;
+ use futures::stream::*;
+ use futures::task::*;
+ use std::convert::identity;
+ use std::pin::Pin;
+ use std::thread;
+ use std::time::Duration;
+
+ struct DataStream {
+ data: Vec<u8>,
+ polled: bool,
+ wake_immediately: bool,
+ }
+
+ impl Stream for DataStream {
+ type Item = u8;
+
+ fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
+ if !self.polled {
+ if !self.wake_immediately {
+ let waker = ctx.waker().clone();
+ let sleep_time =
+ Duration::from_millis(*self.data.first().unwrap_or(&0) as u64 / 10);
+ thread::spawn(move || {
+ thread::sleep(sleep_time);
+ waker.wake_by_ref();
+ });
+ } else {
+ ctx.waker().wake_by_ref();
+ }
+ self.polled = true;
+ Poll::Pending
+ } else {
+ self.polled = false;
+ Poll::Ready(self.data.pop())
+ }
+ }
+ }
+
+ struct Interchanger {
+ polled: bool,
+ base: u8,
+ wake_immediately: bool,
+ }
+
+ impl Stream for Interchanger {
+ type Item = DataStream;
+
+ fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
+ if !self.polled {
+ self.polled = true;
+ if !self.wake_immediately {
+ let waker = ctx.waker().clone();
+ let sleep_time = Duration::from_millis(self.base as u64);
+ thread::spawn(move || {
+ thread::sleep(sleep_time);
+ waker.wake_by_ref();
+ });
+ } else {
+ ctx.waker().wake_by_ref();
+ }
+ Poll::Pending
+ } else {
+ let data: Vec<_> = (0..6).rev().map(|v| v + self.base * 6).collect();
+ self.base += 1;
+ self.polled = false;
+ Poll::Ready(Some(DataStream {
+ polled: false,
+ data,
+ wake_immediately: self.wake_immediately && self.base % 2 == 0,
+ }))
+ }
+ }
+ }
+
+ // basic behaviour
+ {
+ block_on(async {
+ let st = stream::iter(vec![
+ stream::iter(0..=4u8),
+ stream::iter(6..=10),
+ stream::iter(10..=12),
+ ]);
+
+ let fl_unordered = st.flatten_unordered(3).collect::<Vec<_>>().await;
+
+ assert_eq!(fl_unordered, vec![0, 6, 10, 1, 7, 11, 2, 8, 12, 3, 9, 4, 10]);
+ });
+
+ block_on(async {
+ let st = stream::iter(vec![
+ stream::iter(0..=4u8),
+ stream::iter(6..=10),
+ stream::iter(0..=2),
+ ]);
+
+ let mut fm_unordered = st
+ .flat_map_unordered(1, |s| s.filter(|v| futures::future::ready(v % 2 == 0)))
+ .collect::<Vec<_>>()
+ .await;
+
+ fm_unordered.sort_unstable();
+
+ assert_eq!(fm_unordered, vec![0, 0, 2, 2, 4, 6, 8, 10]);
+ });
+ }
+
+ // wake up immediately
+ {
+ block_on(async {
+ let mut fl_unordered = Interchanger { polled: false, base: 0, wake_immediately: true }
+ .take(10)
+ .map(|s| s.map(identity))
+ .flatten_unordered(10)
+ .collect::<Vec<_>>()
+ .await;
+
+ fl_unordered.sort_unstable();
+
+ assert_eq!(fl_unordered, (0..60).collect::<Vec<u8>>());
+ });
+
+ block_on(async {
+ let mut fm_unordered = Interchanger { polled: false, base: 0, wake_immediately: true }
+ .take(10)
+ .flat_map_unordered(10, |s| s.map(identity))
+ .collect::<Vec<_>>()
+ .await;
+
+ fm_unordered.sort_unstable();
+
+ assert_eq!(fm_unordered, (0..60).collect::<Vec<u8>>());
+ });
+ }
+
+ // wake up after delay
+ {
+ block_on(async {
+ let mut fl_unordered = Interchanger { polled: false, base: 0, wake_immediately: false }
+ .take(10)
+ .map(|s| s.map(identity))
+ .flatten_unordered(10)
+ .collect::<Vec<_>>()
+ .await;
+
+ fl_unordered.sort_unstable();
+
+ assert_eq!(fl_unordered, (0..60).collect::<Vec<u8>>());
+ });
+
+ block_on(async {
+ let mut fm_unordered = Interchanger { polled: false, base: 0, wake_immediately: false }
+ .take(10)
+ .flat_map_unordered(10, |s| s.map(identity))
+ .collect::<Vec<_>>()
+ .await;
+
+ fm_unordered.sort_unstable();
+
+ assert_eq!(fm_unordered, (0..60).collect::<Vec<u8>>());
+ });
+
+ block_on(async {
+ let (mut fm_unordered, mut fl_unordered) = futures_util::join!(
+ Interchanger { polled: false, base: 0, wake_immediately: false }
+ .take(10)
+ .flat_map_unordered(10, |s| s.map(identity))
+ .collect::<Vec<_>>(),
+ Interchanger { polled: false, base: 0, wake_immediately: false }
+ .take(10)
+ .map(|s| s.map(identity))
+ .flatten_unordered(10)
+ .collect::<Vec<_>>()
+ );
+
+ fm_unordered.sort_unstable();
+ fl_unordered.sort_unstable();
+
+ assert_eq!(fm_unordered, fl_unordered);
+ assert_eq!(fm_unordered, (0..60).collect::<Vec<u8>>());
+ });
+ }
+
+ // waker panics
+ {
+ let stream = Arc::new(Mutex::new(
+ Interchanger { polled: false, base: 0, wake_immediately: true }
+ .take(10)
+ .flat_map_unordered(10, |s| s.map(identity)),
+ ));
+
+ struct PanicWaker;
+
+ impl ArcWake for PanicWaker {
+ fn wake_by_ref(_arc_self: &Arc<Self>) {
+ panic!("WAKE UP");
+ }
+ }
+
+ std::thread::spawn({
+ let stream = stream.clone();
+ move || {
+ let mut st = poll_fn(|cx| {
+ let mut lock = ready!(stream.lock().poll_unpin(cx));
+
+ let panic_waker = waker(Arc::new(PanicWaker));
+ let mut panic_cx = Context::from_waker(&panic_waker);
+ let _ = ready!(lock.poll_next_unpin(&mut panic_cx));
+
+ Poll::Ready(Some(()))
+ });
+
+ block_on(st.next())
+ }
+ })
+ .join()
+ .unwrap_err();
+
+ block_on(async move {
+ let mut values: Vec<_> = stream.lock().await.by_ref().collect().await;
+ values.sort_unstable();
+
+ assert_eq!(values, (0..60).collect::<Vec<u8>>());
+ });
+ }
+
+ // stream panics
+ {
+ let st = stream::iter(iter::once(
+ once(Box::pin(async { panic!("Polled") })).left_stream::<DataStream>(),
+ ))
+ .chain(
+ Interchanger { polled: false, base: 0, wake_immediately: true }
+ .map(|stream| stream.right_stream())
+ .take(10),
+ );
+
+ let stream = Arc::new(Mutex::new(st.flatten_unordered(10)));
+
+ std::thread::spawn({
+ let stream = stream.clone();
+ move || {
+ let mut st = poll_fn(|cx| {
+ let mut lock = ready!(stream.lock().poll_unpin(cx));
+ let data = ready!(lock.poll_next_unpin(cx));
+
+ Poll::Ready(data)
+ });
+
+ block_on(st.next())
+ }
+ })
+ .join()
+ .unwrap_err();
+
+ block_on(async move {
+ let mut values: Vec<_> = stream.lock().await.by_ref().collect().await;
+ values.sort_unstable();
+
+ assert_eq!(values, (0..60).collect::<Vec<u8>>());
+ });
+ }
+}
+
+#[test]
fn take_until() {
fn make_stop_fut(stop_on: u32) -> impl Future<Output = ()> {
let mut i = 0;
diff --git a/tests/stream_futures_ordered.rs b/tests/stream_futures_ordered.rs
index 7506c65..84e0bcc 100644
--- a/tests/stream_futures_ordered.rs
+++ b/tests/stream_futures_ordered.rs
@@ -26,6 +26,7 @@ fn works_1() {
assert_eq!(None, iter.next());
}
+#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038
#[test]
fn works_2() {
let (a_tx, a_rx) = oneshot::channel::<i32>();
@@ -54,6 +55,7 @@ fn from_iterator() {
assert_eq!(block_on(stream.collect::<Vec<_>>()), vec![1, 2, 3]);
}
+#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038
#[test]
fn queue_never_unblocked() {
let (_a_tx, a_rx) = oneshot::channel::<Box<dyn Any + Send>>();
diff --git a/tests/stream_futures_unordered.rs b/tests/stream_futures_unordered.rs
index 4b9afcc..f62f733 100644
--- a/tests/stream_futures_unordered.rs
+++ b/tests/stream_futures_unordered.rs
@@ -56,6 +56,7 @@ fn works_1() {
assert_eq!(None, iter.next());
}
+#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038
#[test]
fn works_2() {
let (a_tx, a_rx) = oneshot::channel::<i32>();
@@ -85,6 +86,7 @@ fn from_iterator() {
assert_eq!(block_on(stream.collect::<Vec<_>>()), vec![1, 2, 3]);
}
+#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038
#[test]
fn finished_future() {
let (_a_tx, a_rx) = oneshot::channel::<i32>();
diff --git a/tests/stream_try_stream.rs b/tests/stream_try_stream.rs
index 194e74d..d83fc54 100644
--- a/tests/stream_try_stream.rs
+++ b/tests/stream_try_stream.rs
@@ -1,3 +1,5 @@
+#![cfg(not(miri))] // https://github.com/rust-lang/miri/issues/1038
+
use futures::{
stream::{self, StreamExt, TryStreamExt},
task::Poll,
diff --git a/tests/task_atomic_waker.rs b/tests/task_atomic_waker.rs
index cec3db2..2d1612a 100644
--- a/tests/task_atomic_waker.rs
+++ b/tests/task_atomic_waker.rs
@@ -6,6 +6,7 @@ use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::thread;
+#[cfg_attr(miri, ignore)] // Miri is too slow
#[test]
fn basic() {
let atomic_waker = Arc::new(AtomicWaker::new());