aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorHaibo Huang <hhb@google.com>2021-02-10 11:08:03 +0000
committerAutomerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>2021-02-10 11:08:03 +0000
commit64d98f2516574959a4f0ebd928125433595d8fb0 (patch)
tree6c60bbd745f93079f6c279f3dae07a2d3c372ecc
parentf3b63b98a5ad51cc0e374919cdcb38f978e27464 (diff)
parent089614a74d85a3d29226640874cca2534889f8a5 (diff)
downloadfutures-64d98f2516574959a4f0ebd928125433595d8fb0.tar.gz
Upgrade rust/crates/futures to 0.3.12 am: 51d51a0897 am: 089614a74d
Original change: https://android-review.googlesource.com/c/platform/external/rust/crates/futures/+/1582706 MUST ONLY BE SUBMITTED BY AUTOMERGER Change-Id: I0aefb94d5ee741f499c492de16c0a7e7cef97435
-rw-r--r--.cargo_vcs_info.json5
-rw-r--r--Android.bp25
-rw-r--r--Cargo.toml31
-rw-r--r--Cargo.toml.orig29
-rw-r--r--METADATA10
-rw-r--r--src/lib.rs440
-rw-r--r--tests/arc_wake.rs4
-rw-r--r--tests/async_await_macros.rs34
-rw-r--r--tests/io_read.rs2
-rw-r--r--tests/io_read_to_end.rs64
-rw-r--r--tests/io_write.rs2
-rw-r--r--tests/join_all.rs2
-rw-r--r--tests/recurse.rs9
-rw-r--r--tests/shared.rs30
-rw-r--r--tests/sink.rs73
-rw-r--r--tests/stream_into_async_read.rs4
-rw-r--r--tests/try_join_all.rs2
-rw-r--r--tests/try_stream.rs38
-rw-r--r--tests_disabled/stream.rs4
19 files changed, 338 insertions, 470 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json
new file mode 100644
index 0000000..4fd4ba3
--- /dev/null
+++ b/.cargo_vcs_info.json
@@ -0,0 +1,5 @@
+{
+ "git": {
+ "sha1": "1d53a29ec16ccd5b094fb205edb73591455eb4b6"
+ }
+}
diff --git a/Android.bp b/Android.bp
index f62f6d9..29eb738 100644
--- a/Android.bp
+++ b/Android.bp
@@ -30,23 +30,22 @@ rust_library {
}
// dependent_library ["feature_list"]
-// futures-channel-0.3.8 "alloc,futures-sink,sink,std"
-// futures-core-0.3.8 "alloc,std"
-// futures-executor-0.3.8 "std"
-// futures-io-0.3.8 "std"
-// futures-macro-0.3.8
-// futures-sink-0.3.8 "alloc,std"
-// futures-task-0.3.8 "alloc,once_cell,std"
-// futures-util-0.3.8 "alloc,async-await,async-await-macro,channel,futures-channel,futures-io,futures-macro,futures-sink,io,memchr,proc-macro-hack,proc-macro-nested,sink,slab,std"
+// futures-channel-0.3.12 "alloc,futures-sink,sink,std"
+// futures-core-0.3.12 "alloc,std"
+// futures-executor-0.3.12 "std"
+// futures-io-0.3.12 "std"
+// futures-macro-0.3.12
+// futures-sink-0.3.12 "alloc,std"
+// futures-task-0.3.12 "alloc,once_cell,std"
+// futures-util-0.3.12 "alloc,async-await,async-await-macro,channel,futures-channel,futures-io,futures-macro,futures-sink,io,memchr,proc-macro-hack,proc-macro-nested,sink,slab,std"
// memchr-2.3.4 "default,std"
// once_cell-1.5.2 "alloc,std"
-// pin-project-1.0.2
-// pin-project-internal-1.0.2
+// pin-project-lite-0.2.4
// pin-utils-0.1.0
// proc-macro-hack-0.5.19
-// proc-macro-nested-0.1.6
+// proc-macro-nested-0.1.7
// proc-macro2-1.0.24 "default,proc-macro"
-// quote-1.0.7 "default,proc-macro"
+// quote-1.0.8 "default,proc-macro"
// slab-0.4.2
-// syn-1.0.53 "clone-impls,default,derive,full,parsing,printing,proc-macro,quote,visit-mut"
+// syn-1.0.60 "clone-impls,default,derive,full,parsing,printing,proc-macro,quote,visit-mut"
// unicode-xid-0.2.1 "default"
diff --git a/Cargo.toml b/Cargo.toml
index fb21673..bb846bf 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -13,11 +13,11 @@
[package]
edition = "2018"
name = "futures"
-version = "0.3.7"
+version = "0.3.12"
authors = ["Alex Crichton <alex@alexcrichton.com>"]
description = "An implementation of futures and streams featuring zero allocations,\ncomposability, and iterator-like interfaces.\n"
homepage = "https://rust-lang.github.io/futures-rs"
-documentation = "https://docs.rs/futures/0.3.7"
+documentation = "https://docs.rs/futures/0.3"
readme = "../README.md"
keywords = ["futures", "async", "future"]
categories = ["asynchronous"]
@@ -30,35 +30,46 @@ rustdoc-args = ["--cfg", "docsrs"]
[package.metadata.playground]
features = ["std", "async-await", "compat", "io-compat", "executor", "thread-pool"]
[dependencies.futures-channel]
-version = "0.3.7"
+version = "0.3.12"
features = ["sink"]
default-features = false
[dependencies.futures-core]
-version = "0.3.7"
+version = "0.3.12"
default-features = false
[dependencies.futures-executor]
-version = "0.3.7"
+version = "0.3.12"
optional = true
default-features = false
[dependencies.futures-io]
-version = "0.3.7"
+version = "0.3.12"
default-features = false
[dependencies.futures-sink]
-version = "0.3.7"
+version = "0.3.12"
default-features = false
[dependencies.futures-task]
-version = "0.3.7"
+version = "0.3.12"
default-features = false
[dependencies.futures-util]
-version = "0.3.7"
+version = "0.3.12"
features = ["sink"]
default-features = false
+[dev-dependencies.assert_matches]
+version = "1.3.0"
+
+[dev-dependencies.pin-project]
+version = "1.0.1"
+
+[dev-dependencies.pin-utils]
+version = "0.1.0"
+
+[dev-dependencies.tokio]
+version = "0.1.11"
[features]
alloc = ["futures-core/alloc", "futures-task/alloc", "futures-sink/alloc", "futures-channel/alloc", "futures-util/alloc"]
@@ -74,5 +85,3 @@ std = ["alloc", "futures-core/std", "futures-task/std", "futures-io/std", "futur
thread-pool = ["executor", "futures-executor/thread-pool"]
unstable = ["futures-core/unstable", "futures-task/unstable", "futures-channel/unstable", "futures-io/unstable", "futures-util/unstable"]
write-all-vectored = ["futures-util/write-all-vectored"]
-[badges.travis-ci]
-repository = "rust-lang/futures-rs"
diff --git a/Cargo.toml.orig b/Cargo.toml.orig
index 342441b..34ca82e 100644
--- a/Cargo.toml.orig
+++ b/Cargo.toml.orig
@@ -1,31 +1,36 @@
[package]
name = "futures"
edition = "2018"
-version = "0.3.7"
+version = "0.3.12"
authors = ["Alex Crichton <alex@alexcrichton.com>"]
license = "MIT OR Apache-2.0"
readme = "../README.md"
keywords = ["futures", "async", "future"]
repository = "https://github.com/rust-lang/futures-rs"
homepage = "https://rust-lang.github.io/futures-rs"
-documentation = "https://docs.rs/futures/0.3.7"
+documentation = "https://docs.rs/futures/0.3"
description = """
An implementation of futures and streams featuring zero allocations,
composability, and iterator-like interfaces.
"""
categories = ["asynchronous"]
-[badges]
-travis-ci = { repository = "rust-lang/futures-rs" }
-
[dependencies]
-futures-core = { path = "../futures-core", version = "0.3.7", default-features = false }
-futures-task = { path = "../futures-task", version = "0.3.7", default-features = false }
-futures-channel = { path = "../futures-channel", version = "0.3.7", default-features = false, features = ["sink"] }
-futures-executor = { path = "../futures-executor", version = "0.3.7", default-features = false, optional = true }
-futures-io = { path = "../futures-io", version = "0.3.7", default-features = false }
-futures-sink = { path = "../futures-sink", version = "0.3.7", default-features = false }
-futures-util = { path = "../futures-util", version = "0.3.7", default-features = false, features = ["sink"] }
+futures-core = { path = "../futures-core", version = "0.3.12", default-features = false }
+futures-task = { path = "../futures-task", version = "0.3.12", default-features = false }
+futures-channel = { path = "../futures-channel", version = "0.3.12", default-features = false, features = ["sink"] }
+futures-executor = { path = "../futures-executor", version = "0.3.12", default-features = false, optional = true }
+futures-io = { path = "../futures-io", version = "0.3.12", default-features = false }
+futures-sink = { path = "../futures-sink", version = "0.3.12", default-features = false }
+futures-util = { path = "../futures-util", version = "0.3.12", default-features = false, features = ["sink"] }
+
+[dev-dependencies]
+pin-utils = "0.1.0"
+futures-executor = { path = "../futures-executor", features = ["thread-pool"] }
+futures-test = { path = "../futures-test" }
+tokio = "0.1.11"
+assert_matches = "1.3.0"
+pin-project = "1.0.1"
[features]
default = ["std", "async-await", "executor"]
diff --git a/METADATA b/METADATA
index e7e876a..b64ba70 100644
--- a/METADATA
+++ b/METADATA
@@ -7,13 +7,13 @@ third_party {
}
url {
type: ARCHIVE
- value: "https://static.crates.io/crates/futures/futures-0.3.7.crate"
+ value: "https://static.crates.io/crates/futures/futures-0.3.12.crate"
}
- version: "0.3.7"
+ version: "0.3.12"
license_type: NOTICE
last_upgrade_date {
- year: 2020
- month: 10
- day: 25
+ year: 2021
+ month: 2
+ day: 9
}
}
diff --git a/src/lib.rs b/src/lib.rs
index 072c2da..de29ace 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -87,16 +87,8 @@
// It cannot be included in the published code because this lints have false positives in the minimum required version.
#![cfg_attr(test, warn(single_use_lifetimes))]
#![warn(clippy::all)]
-
-// mem::take requires Rust 1.40, matches! requires Rust 1.42
-// Can be removed if the minimum supported version increased or if https://github.com/rust-lang/rust-clippy/issues/3941
-// get's implemented.
-#![allow(clippy::mem_replace_with_default, clippy::match_like_matches_macro)]
-
#![doc(test(attr(deny(warnings), allow(dead_code, unused_assignments, unused_variables))))]
-#![doc(html_root_url = "https://docs.rs/futures/0.3.7")]
-
#![cfg_attr(docsrs, feature(doc_cfg))]
#[cfg(all(feature = "cfg-target-has-atomic", not(feature = "unstable")))]
@@ -108,263 +100,61 @@ compile_error!("The `bilock` feature requires the `unstable` feature as an expli
#[cfg(all(feature = "read-initializer", not(feature = "unstable")))]
compile_error!("The `read-initializer` feature requires the `unstable` feature as an explicit opt-in to unstable features");
-#[doc(hidden)] pub use futures_core::future::{Future, TryFuture};
-#[doc(hidden)] pub use futures_util::future::{FutureExt, TryFutureExt};
+#[doc(hidden)]
+pub use futures_core::future::{Future, TryFuture};
+#[doc(hidden)]
+pub use futures_util::future::{FutureExt, TryFutureExt};
-#[doc(hidden)] pub use futures_core::stream::{Stream, TryStream};
-#[doc(hidden)] pub use futures_util::stream::{StreamExt, TryStreamExt};
+#[doc(hidden)]
+pub use futures_core::stream::{Stream, TryStream};
+#[doc(hidden)]
+pub use futures_util::stream::{StreamExt, TryStreamExt};
-#[doc(hidden)] pub use futures_sink::Sink;
-#[doc(hidden)] pub use futures_util::sink::SinkExt;
+#[doc(hidden)]
+pub use futures_sink::Sink;
+#[doc(hidden)]
+pub use futures_util::sink::SinkExt;
#[cfg(feature = "std")]
-#[doc(hidden)] pub use futures_io::{AsyncRead, AsyncWrite, AsyncSeek, AsyncBufRead};
+#[doc(hidden)]
+pub use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite};
#[cfg(feature = "std")]
-#[doc(hidden)] pub use futures_util::{AsyncReadExt, AsyncWriteExt, AsyncSeekExt, AsyncBufReadExt};
+#[doc(hidden)]
+pub use futures_util::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
// Macro reexports
pub use futures_core::ready; // Readiness propagation
pub use futures_util::pin_mut;
-#[cfg(feature = "async-await")]
-pub use futures_util::{pending, poll, join, try_join, select_biased}; // Async-await
#[cfg(feature = "std")]
#[cfg(feature = "async-await")]
pub use futures_util::select;
+#[cfg(feature = "async-await")]
+pub use futures_util::{join, pending, poll, select_biased, try_join}; // Async-await
+
+// Module reexports
+#[doc(inline)]
+pub use futures_util::{future, never, sink, stream, task};
-#[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "alloc")]
-pub mod channel {
- //! Cross-task communication.
- //!
- //! Like threads, concurrent tasks sometimes need to communicate with each
- //! other. This module contains two basic abstractions for doing so:
- //!
- //! - [oneshot](crate::channel::oneshot), a way of sending a single value
- //! from one task to another.
- //! - [mpsc](crate::channel::mpsc), a multi-producer, single-consumer
- //! channel for sending values between tasks, analogous to the
- //! similarly-named structure in the standard library.
- //!
- //! This module is only available when the `std` or `alloc` feature of this
- //! library is activated, and it is activated by default.
+#[doc(inline)]
+pub use futures_channel as channel;
+#[cfg(feature = "alloc")]
+#[doc(inline)]
+pub use futures_util::lock;
- pub use futures_channel::oneshot;
+#[cfg(feature = "std")]
+#[doc(inline)]
+pub use futures_util::io;
- #[cfg(feature = "std")]
- pub use futures_channel::mpsc;
-}
+#[cfg(feature = "executor")]
+#[cfg_attr(docsrs, doc(cfg(feature = "executor")))]
+#[doc(inline)]
+pub use futures_executor as executor;
#[cfg(feature = "compat")]
#[cfg_attr(docsrs, doc(cfg(feature = "compat")))]
-pub mod compat {
- //! Interop between `futures` 0.1 and 0.3.
- //!
- //! This module is only available when the `compat` feature of this
- //! library is activated.
-
- pub use futures_util::compat::{
- Compat,
- CompatSink,
- Compat01As03,
- Compat01As03Sink,
- Executor01Future,
- Executor01As03,
- Executor01CompatExt,
- Future01CompatExt,
- Stream01CompatExt,
- Sink01CompatExt,
- };
-
- #[cfg(feature = "io-compat")]
- #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
- pub use futures_util::compat::{
- AsyncRead01CompatExt,
- AsyncWrite01CompatExt,
- };
-}
-
-#[cfg(feature = "executor")]
-pub mod executor {
- //! Task execution.
- //!
- //! All asynchronous computation occurs within an executor, which is
- //! capable of spawning futures as tasks. This module provides several
- //! built-in executors, as well as tools for building your own.
- //!
- //! This module is only available when the `executor` feature of this
- //! library is activated, and it is activated by default.
- //!
- //! # Using a thread pool (M:N task scheduling)
- //!
- //! Most of the time tasks should be executed on a [thread
- //! pool](crate::executor::ThreadPool). A small set of worker threads can
- //! handle a very large set of spawned tasks (which are much lighter weight
- //! than threads). Tasks spawned onto the pool with the
- //! [`spawn_ok()`](crate::executor::ThreadPool::spawn_ok)
- //! function will run ambiently on the created threads.
- //!
- //! # Spawning additional tasks
- //!
- //! Tasks can be spawned onto a spawner by calling its
- //! [`spawn_obj`](crate::task::Spawn::spawn_obj) method directly.
- //! In the case of `!Send` futures,
- //! [`spawn_local_obj`](crate::task::LocalSpawn::spawn_local_obj)
- //! can be used instead.
- //!
- //! # Single-threaded execution
- //!
- //! In addition to thread pools, it's possible to run a task (and the tasks
- //! it spawns) entirely within a single thread via the
- //! [`LocalPool`](crate::executor::LocalPool) executor. Aside from cutting
- //! down on synchronization costs, this executor also makes it possible to
- //! spawn non-`Send` tasks, via
- //! [`spawn_local_obj`](crate::task::LocalSpawn::spawn_local_obj).
- //! The `LocalPool` is best suited for running I/O-bound tasks that do
- //! relatively little work between I/O operations.
- //!
- //! There is also a convenience function
- //! [`block_on`](crate::executor::block_on) for simply running a future to
- //! completion on the current thread.
-
- pub use futures_executor::{
- BlockingStream,
- Enter, EnterError,
- LocalSpawner, LocalPool,
- block_on, block_on_stream, enter,
- };
-
- #[cfg(feature = "thread-pool")]
- #[cfg_attr(docsrs, doc(cfg(feature = "thread-pool")))]
- pub use futures_executor::{ThreadPool, ThreadPoolBuilder};
-}
-
-pub mod future {
- //! Asynchronous values.
- //!
- //! This module contains:
- //!
- //! - The [`Future` trait](crate::future::Future).
- //! - The [`FutureExt`](crate::future::FutureExt) trait, which provides
- //! adapters for chaining and composing futures.
- //! - Top-level future combinators like [`lazy`](crate::future::lazy) which
- //! creates a future from a closure that defines its return value, and
- //! [`ready`](crate::future::ready), which constructs a future with an
- //! immediate defined value.
-
- pub use futures_core::future::{
- Future, TryFuture, FusedFuture,
- };
-
- #[cfg(feature = "alloc")]
- pub use futures_core::future::{BoxFuture, LocalBoxFuture};
-
- pub use futures_task::{FutureObj, LocalFutureObj, UnsafeFutureObj};
-
- pub use futures_util::future::{
- lazy, Lazy,
- maybe_done, MaybeDone,
- pending, Pending,
- poll_fn, PollFn,
- ready, ok, err, Ready,
- join, join3, join4, join5,
- Join, Join3, Join4, Join5,
- select, Select,
- try_join, try_join3, try_join4, try_join5,
- TryJoin, TryJoin3, TryJoin4, TryJoin5,
- try_select, TrySelect,
- Either,
- OptionFuture,
-
- FutureExt,
- FlattenStream, Flatten, Fuse, Inspect, IntoStream, Map, Then, UnitError,
- NeverError,
-
- TryFutureExt,
- AndThen, ErrInto, FlattenSink, IntoFuture, MapErr, MapOk, OrElse,
- InspectOk, InspectErr, TryFlattenStream, UnwrapOrElse,
- };
-
- #[cfg(feature = "alloc")]
- pub use futures_util::future::{
- join_all, JoinAll,
- select_all, SelectAll,
- try_join_all, TryJoinAll,
- select_ok, SelectOk,
- };
-
- #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))]
- #[cfg(feature = "alloc")]
- pub use futures_util::future::{
- abortable, Abortable, AbortHandle, AbortRegistration, Aborted,
- };
-
- #[cfg(feature = "std")]
- pub use futures_util::future::{
- Remote, RemoteHandle,
- CatchUnwind, Shared,
- };
-}
-
-#[cfg(feature = "std")]
-pub mod io {
- //! Asynchronous I/O.
- //!
- //! This module is the asynchronous version of `std::io`. It defines four
- //! traits, [`AsyncRead`](crate::io::AsyncRead),
- //! [`AsyncWrite`](crate::io::AsyncWrite),
- //! [`AsyncSeek`](crate::io::AsyncSeek), and
- //! [`AsyncBufRead`](crate::io::AsyncBufRead), which mirror the `Read`,
- //! `Write`, `Seek`, and `BufRead` traits of the standard library. However,
- //! these traits integrate
- //! with the asynchronous task system, so that if an I/O object isn't ready
- //! for reading (or writing), the thread is not blocked, and instead the
- //! current task is queued to be woken when I/O is ready.
- //!
- //! In addition, the [`AsyncReadExt`](crate::io::AsyncReadExt),
- //! [`AsyncWriteExt`](crate::io::AsyncWriteExt),
- //! [`AsyncSeekExt`](crate::io::AsyncSeekExt), and
- //! [`AsyncBufReadExt`](crate::io::AsyncBufReadExt) extension traits offer a
- //! variety of useful combinators for operating with asynchronous I/O
- //! objects, including ways to work with them using futures, streams and
- //! sinks.
- //!
- //! This module is only available when the `std` feature of this
- //! library is activated, and it is activated by default.
-
- pub use futures_io::{
- AsyncRead, AsyncWrite, AsyncSeek, AsyncBufRead, Error, ErrorKind,
- IoSlice, IoSliceMut, Result, SeekFrom,
- };
-
- #[cfg(feature = "read-initializer")]
- #[cfg_attr(docsrs, doc(cfg(feature = "read-initializer")))]
- pub use futures_io::Initializer;
-
- pub use futures_util::io::{
- AsyncReadExt, AsyncWriteExt, AsyncSeekExt, AsyncBufReadExt, AllowStdIo,
- BufReader, BufWriter, Cursor, Chain, Close, copy, Copy, copy_buf, CopyBuf,
- empty, Empty, FillBuf, Flush, IntoSink, Lines, Read, ReadExact, ReadHalf,
- ReadLine, ReadToEnd, ReadToString, ReadUntil, ReadVectored, repeat,
- Repeat, ReuniteError, Seek, sink, Sink, Take, Window, Write, WriteAll, WriteHalf,
- WriteVectored,
- };
-}
-
-#[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))]
-#[cfg(feature = "alloc")]
-pub mod lock {
- //! Futures-powered synchronization primitives.
- //!
- //! This module is only available when the `std` or `alloc` feature of this
- //! library is activated, and it is activated by default.
-
- #[cfg(feature = "bilock")]
- #[cfg_attr(docsrs, doc(cfg(feature = "bilock")))]
- pub use futures_util::lock::{BiLock, BiLockAcquire, BiLockGuard, ReuniteError};
-
- #[cfg(feature = "std")]
- pub use futures_util::lock::{MappedMutexGuard, Mutex, MutexLockFuture, MutexGuard};
-}
+#[doc(inline)]
+pub use futures_util::compat;
pub mod prelude {
//! A "prelude" for crates using the `futures` crate.
@@ -381,170 +171,22 @@ pub mod prelude {
//! The prelude may grow over time as additional items see ubiquitous use.
pub use crate::future::{self, Future, TryFuture};
- pub use crate::stream::{self, Stream, TryStream};
pub use crate::sink::{self, Sink};
+ pub use crate::stream::{self, Stream, TryStream};
#[doc(no_inline)]
pub use crate::future::{FutureExt as _, TryFutureExt as _};
#[doc(no_inline)]
- pub use crate::stream::{StreamExt as _, TryStreamExt as _};
- #[doc(no_inline)]
pub use crate::sink::SinkExt as _;
+ #[doc(no_inline)]
+ pub use crate::stream::{StreamExt as _, TryStreamExt as _};
#[cfg(feature = "std")]
- pub use crate::io::{
- AsyncRead, AsyncWrite, AsyncSeek, AsyncBufRead,
- };
+ pub use crate::io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite};
#[cfg(feature = "std")]
#[doc(no_inline)]
pub use crate::io::{
- AsyncReadExt as _, AsyncWriteExt as _, AsyncSeekExt as _, AsyncBufReadExt as _,
+ AsyncBufReadExt as _, AsyncReadExt as _, AsyncSeekExt as _, AsyncWriteExt as _,
};
}
-
-pub mod sink {
- //! Asynchronous sinks.
- //!
- //! This module contains:
- //!
- //! - The [`Sink` trait](crate::sink::Sink), which allows you to
- //! asynchronously write data.
- //! - The [`SinkExt`](crate::sink::SinkExt) trait, which provides adapters
- //! for chaining and composing sinks.
-
- pub use futures_sink::Sink;
-
- pub use futures_util::sink::{
- Close, Flush, Send, SendAll, SinkErrInto, SinkMapErr, With,
- SinkExt, Fanout, Drain, drain,
- WithFlatMap,
- };
-
- #[cfg(feature = "alloc")]
- pub use futures_util::sink::Buffer;
-}
-
-pub mod stream {
- //! Asynchronous streams.
- //!
- //! This module contains:
- //!
- //! - The [`Stream` trait](crate::stream::Stream), for objects that can
- //! asynchronously produce a sequence of values.
- //! - The [`StreamExt`](crate::stream::StreamExt) trait, which provides
- //! adapters for chaining and composing streams.
- //! - Top-level stream constructors like [`iter`](crate::stream::iter)
- //! which creates a stream from an iterator.
-
- pub use futures_core::stream::{
- Stream, TryStream, FusedStream,
- };
-
- #[cfg(feature = "alloc")]
- pub use futures_core::stream::{BoxStream, LocalBoxStream};
-
- pub use futures_util::stream::{
- iter, Iter,
- repeat, Repeat,
- empty, Empty,
- pending, Pending,
- once, Once,
- poll_fn, PollFn,
- select, Select,
- unfold, Unfold,
- try_unfold, TryUnfold,
-
- StreamExt,
- Chain, Collect, Concat, Enumerate, Filter, FilterMap, FlatMap, Flatten,
- Fold, Forward, ForEach, Fuse, StreamFuture, Inspect, Map, Next,
- SelectNextSome, Peek, Peekable, Scan, Skip, SkipWhile, Take, TakeUntil,
- TakeWhile, Then, Zip,
-
- TryStreamExt,
- AndThen, ErrInto, MapOk, MapErr, OrElse,
- InspectOk, InspectErr,
- TryNext, TryForEach, TryFilter, TryFilterMap, TryFlatten,
- TryCollect, TryConcat, TryFold, TrySkipWhile, TryTakeWhile,
- IntoStream,
- };
-
- #[cfg(feature = "alloc")]
- pub use futures_util::stream::{
- // For StreamExt:
- Chunks, ReadyChunks,
- };
-
- #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))]
- #[cfg(feature = "alloc")]
- pub use futures_util::stream::{
- FuturesOrdered,
- futures_unordered, FuturesUnordered,
-
- // For StreamExt:
- BufferUnordered, Buffered, ForEachConcurrent, SplitStream, SplitSink,
- ReuniteError,
-
- select_all, SelectAll,
- };
-
- #[cfg(feature = "std")]
- pub use futures_util::stream::{
- // For StreamExt:
- CatchUnwind,
- };
-
- #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))]
- #[cfg(feature = "alloc")]
- pub use futures_util::stream::{
- // For TryStreamExt:
- TryBufferUnordered, TryForEachConcurrent,
- };
-
- #[cfg(feature = "std")]
- pub use futures_util::stream::IntoAsyncRead;
-}
-
-pub mod task {
- //! Tools for working with tasks.
- //!
- //! This module contains:
- //!
- //! - [`Spawn`](crate::task::Spawn), a trait for spawning new tasks.
- //! - [`Context`](crate::task::Context), a context of an asynchronous task,
- //! including a handle for waking up the task.
- //! - [`Waker`](crate::task::Waker), a handle for waking up a task.
- //!
- //! The remaining types and traits in the module are used for implementing
- //! executors or dealing with synchronization issues around task wakeup.
-
- pub use futures_core::task::{Context, Poll, Waker, RawWaker, RawWakerVTable};
-
- pub use futures_task::{
- Spawn, LocalSpawn, SpawnError,
- FutureObj, LocalFutureObj, UnsafeFutureObj,
- };
-
- pub use futures_util::task::noop_waker;
-
- #[cfg(feature = "std")]
- pub use futures_util::task::noop_waker_ref;
-
- #[cfg(feature = "alloc")]
- pub use futures_util::task::{SpawnExt, LocalSpawnExt};
-
- #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))]
- #[cfg(feature = "alloc")]
- pub use futures_util::task::{waker, waker_ref, WakerRef, ArcWake};
-
- #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))]
- pub use futures_util::task::AtomicWaker;
-}
-
-pub mod never {
- //! This module contains the `Never` type.
- //!
- //! Values of this type can never be created and will never exist.
-
- pub use futures_util::never::Never;
-}
diff --git a/tests/arc_wake.rs b/tests/arc_wake.rs
index 2830daf..d19a83d 100644
--- a/tests/arc_wake.rs
+++ b/tests/arc_wake.rs
@@ -7,8 +7,8 @@ mod countingwaker {
}
impl CountingWaker {
- fn new() -> CountingWaker {
- CountingWaker {
+ fn new() -> Self {
+ Self {
nr_wake: Mutex::new(0),
}
}
diff --git a/tests/async_await_macros.rs b/tests/async_await_macros.rs
index b521038..bd586d6 100644
--- a/tests/async_await_macros.rs
+++ b/tests/async_await_macros.rs
@@ -221,8 +221,8 @@ fn select_on_non_unpin_expressions() {
let res = block_on(async {
let select_res;
select! {
- value_1 = make_non_unpin_fut().fuse() => { select_res = value_1 },
- value_2 = make_non_unpin_fut().fuse() => { select_res = value_2 },
+ value_1 = make_non_unpin_fut().fuse() => select_res = value_1,
+ value_2 = make_non_unpin_fut().fuse() => select_res = value_2,
};
select_res
});
@@ -243,9 +243,9 @@ fn select_on_non_unpin_expressions_with_default() {
let res = block_on(async {
let select_res;
select! {
- value_1 = make_non_unpin_fut().fuse() => { select_res = value_1 },
- value_2 = make_non_unpin_fut().fuse() => { select_res = value_2 },
- default => { select_res = 7 },
+ value_1 = make_non_unpin_fut().fuse() => select_res = value_1,
+ value_2 = make_non_unpin_fut().fuse() => select_res = value_2,
+ default => select_res = 7,
};
select_res
});
@@ -265,8 +265,8 @@ fn select_on_non_unpin_size() {
let fut = async {
let select_res;
select! {
- value_1 = make_non_unpin_fut().fuse() => { select_res = value_1 },
- value_2 = make_non_unpin_fut().fuse() => { select_res = value_2 },
+ value_1 = make_non_unpin_fut().fuse() => select_res = value_1,
+ value_2 = make_non_unpin_fut().fuse() => select_res = value_2,
};
select_res
};
@@ -282,8 +282,8 @@ fn select_can_be_used_as_expression() {
block_on(async {
let res = select! {
- x = future::ready(7) => { x },
- y = future::ready(3) => { y + 1 },
+ x = future::ready(7) => x,
+ y = future::ready(3) => y + 1,
};
assert!(res == 7 || res == 4);
});
@@ -303,7 +303,7 @@ fn select_with_default_can_be_used_as_expression() {
block_on(async {
let res = select! {
x = poll_fn(poll_always_pending::<i32>).fuse() => x,
- y = poll_fn(poll_always_pending::<i32>).fuse() => { y + 1 },
+ y = poll_fn(poll_always_pending::<i32>).fuse() => y + 1,
default => 99,
};
assert_eq!(res, 99);
@@ -318,8 +318,8 @@ fn select_with_complete_can_be_used_as_expression() {
block_on(async {
let res = select! {
- x = future::pending::<i32>() => { x },
- y = future::pending::<i32>() => { y + 1 },
+ x = future::pending::<i32>() => x,
+ y = future::pending::<i32>() => y + 1,
default => 99,
complete => 237,
};
@@ -328,6 +328,7 @@ fn select_with_complete_can_be_used_as_expression() {
}
#[test]
+#[allow(unused_assignments)]
fn select_on_mutable_borrowing_future_with_same_borrow_in_block() {
use futures::select;
use futures::executor::block_on;
@@ -339,8 +340,8 @@ fn select_on_mutable_borrowing_future_with_same_borrow_in_block() {
block_on(async {
let mut value = 234;
select! {
- x = require_mutable(&mut value).fuse() => { },
- y = async_noop().fuse() => {
+ _ = require_mutable(&mut value).fuse() => { },
+ _ = async_noop().fuse() => {
value += 5;
},
}
@@ -348,6 +349,7 @@ fn select_on_mutable_borrowing_future_with_same_borrow_in_block() {
}
#[test]
+#[allow(unused_assignments)]
fn select_on_mutable_borrowing_future_with_same_borrow_in_block_and_default() {
use futures::select;
use futures::executor::block_on;
@@ -359,8 +361,8 @@ fn select_on_mutable_borrowing_future_with_same_borrow_in_block_and_default() {
block_on(async {
let mut value = 234;
select! {
- x = require_mutable(&mut value).fuse() => { },
- y = async_noop().fuse() => {
+ _ = require_mutable(&mut value).fuse() => { },
+ _ = async_noop().fuse() => {
value += 5;
},
default => {
diff --git a/tests/io_read.rs b/tests/io_read.rs
index bc2a434..5902ad0 100644
--- a/tests/io_read.rs
+++ b/tests/io_read.rs
@@ -10,7 +10,7 @@ mod mock_reader {
impl MockReader {
pub fn new(fun: impl FnMut(&mut [u8]) -> Poll<io::Result<usize>> + 'static) -> Self {
- MockReader { fun: Box::new(fun) }
+ Self { fun: Box::new(fun) }
}
}
diff --git a/tests/io_read_to_end.rs b/tests/io_read_to_end.rs
new file mode 100644
index 0000000..892d463
--- /dev/null
+++ b/tests/io_read_to_end.rs
@@ -0,0 +1,64 @@
+use futures::{
+ io::{self, AsyncRead, AsyncReadExt},
+ task::{Context, Poll},
+};
+use std::pin::Pin;
+
+#[test]
+#[should_panic(expected = "assertion failed: n <= buf.len()")]
+fn issue2310() {
+ struct MyRead {
+ first: bool,
+ }
+
+ impl MyRead {
+ pub fn new() -> Self {
+ MyRead { first: false }
+ }
+ }
+
+ impl AsyncRead for MyRead {
+ fn poll_read(
+ mut self: Pin<&mut Self>,
+ _cx: &mut Context,
+ _buf: &mut [u8],
+ ) -> Poll<io::Result<usize>> {
+ Poll::Ready(if !self.first {
+ self.first = true;
+ // First iteration: return more than the buffer size
+ Ok(64)
+ } else {
+ // Second iteration: indicate that we are done
+ Ok(0)
+ })
+ }
+ }
+
+ struct VecWrapper {
+ inner: Vec<u8>,
+ }
+
+ impl VecWrapper {
+ pub fn new() -> Self {
+ VecWrapper { inner: Vec::new() }
+ }
+ }
+
+ impl Drop for VecWrapper {
+ fn drop(&mut self) {
+ // Observe uninitialized bytes
+ println!("{:?}", &self.inner);
+ // Overwrite heap contents
+ for b in &mut self.inner {
+ *b = 0x90;
+ }
+ }
+ }
+
+ futures::executor::block_on(async {
+ let mut vec = VecWrapper::new();
+ let mut read = MyRead::new();
+
+ read.read_to_end(&mut vec.inner).await.unwrap();
+ })
+}
diff --git a/tests/io_write.rs b/tests/io_write.rs
index 165b3d3..363f32b 100644
--- a/tests/io_write.rs
+++ b/tests/io_write.rs
@@ -10,7 +10,7 @@ mod mock_writer {
impl MockWriter {
pub fn new(fun: impl FnMut(&[u8]) -> Poll<io::Result<usize>> + 'static) -> Self {
- MockWriter { fun: Box::new(fun) }
+ Self { fun: Box::new(fun) }
}
}
diff --git a/tests/join_all.rs b/tests/join_all.rs
index 1450679..c322e58 100644
--- a/tests/join_all.rs
+++ b/tests/join_all.rs
@@ -37,7 +37,7 @@ fn join_all_iter_lifetime() {
Box::new(join_all(iter))
}
- util::assert_done(|| sizes(vec![&[1,2,3], &[], &[0]]), vec![3 as usize, 0, 1]);
+ util::assert_done(|| sizes(vec![&[1,2,3], &[], &[0]]), vec![3_usize, 0, 1]);
}
#[test]
diff --git a/tests/recurse.rs b/tests/recurse.rs
index 87a4ff2..a151f1b 100644
--- a/tests/recurse.rs
+++ b/tests/recurse.rs
@@ -5,6 +5,11 @@ fn lots() {
use std::sync::mpsc;
use std::thread;
+ #[cfg(not(futures_sanitizer))]
+ const N: i32 = 1_000;
+ #[cfg(futures_sanitizer)] // If N is many, asan reports stack-overflow: https://gist.github.com/taiki-e/099446d21cbec69d4acbacf7a9646136
+ const N: i32 = 100;
+
fn do_it(input: (i32, i32)) -> BoxFuture<'static, i32> {
let (n, x) = input;
if n == 0 {
@@ -16,7 +21,7 @@ fn lots() {
let (tx, rx) = mpsc::channel();
thread::spawn(|| {
- block_on(do_it((1_000, 0)).map(move |x| tx.send(x).unwrap()))
+ block_on(do_it((N, 0)).map(move |x| tx.send(x).unwrap()))
});
- assert_eq!(500_500, rx.recv().unwrap());
+ assert_eq!((0..=N).sum::<i32>(), rx.recv().unwrap());
}
diff --git a/tests/shared.rs b/tests/shared.rs
index b2251a5..cc0c6a2 100644
--- a/tests/shared.rs
+++ b/tests/shared.rs
@@ -7,7 +7,7 @@ mod count_clone {
impl Clone for CountClone {
fn clone(&self) -> Self {
self.0.set(self.0.get() + 1);
- CountClone(self.0.clone())
+ Self(self.0.clone())
}
}
}
@@ -144,6 +144,34 @@ fn peek() {
}
#[test]
+fn downgrade() {
+ use futures::channel::oneshot;
+ use futures::executor::block_on;
+ use futures::future::FutureExt;
+
+ let (tx, rx) = oneshot::channel::<i32>();
+ let shared = rx.shared();
+ // Since there are outstanding `Shared`s, we can get a `WeakShared`.
+ let weak = shared.downgrade().unwrap();
+ // It should upgrade fine right now.
+ let mut shared2 = weak.upgrade().unwrap();
+
+ tx.send(42).unwrap();
+ assert_eq!(block_on(shared).unwrap(), 42);
+
+ // We should still be able to get a new `WeakShared` and upgrade it
+ // because `shared2` is outstanding.
+ assert!(shared2.downgrade().is_some());
+ assert!(weak.upgrade().is_some());
+
+ assert_eq!(block_on(&mut shared2).unwrap(), 42);
+ // Now that all `Shared`s have been exhausted, we should not be able
+ // to get a new `WeakShared` or upgrade an existing one.
+ assert!(weak.upgrade().is_none());
+ assert!(shared2.downgrade().is_none());
+}
+
+#[test]
fn dont_clone_in_single_owner_shared_future() {
use futures::channel::oneshot;
use futures::executor::block_on;
diff --git a/tests/sink.rs b/tests/sink.rs
index 8ed201e..597ed34 100644
--- a/tests/sink.rs
+++ b/tests/sink.rs
@@ -483,6 +483,41 @@ fn with_flush_propagate() {
})
}
+// test that `Clone` is implemented on `with` sinks
+#[test]
+fn with_implements_clone() {
+ use futures::channel::mpsc;
+ use futures::executor::block_on;
+ use futures::future;
+ use futures::{SinkExt, StreamExt};
+
+ let (mut tx, rx) = mpsc::channel(5);
+
+ {
+ let mut is_positive = tx
+ .clone()
+ .with(|item| future::ok::<bool, mpsc::SendError>(item > 0));
+
+ let mut is_long = tx
+ .clone()
+ .with(|item: &str| future::ok::<bool, mpsc::SendError>(item.len() > 5));
+
+ block_on(is_positive.clone().send(-1)).unwrap();
+ block_on(is_long.clone().send("123456")).unwrap();
+ block_on(is_long.send("123")).unwrap();
+ block_on(is_positive.send(1)).unwrap();
+ }
+
+ block_on(tx.send(false)).unwrap();
+
+ block_on(tx.close()).unwrap();
+
+ assert_eq!(
+ block_on(rx.collect::<Vec<_>>()),
+ vec![false, true, false, true, false]
+ );
+}
+
// test that a buffer is a no-nop around a sink that always accepts sends
#[test]
fn buffer_noop() {
@@ -611,6 +646,44 @@ fn sink_map_err() {
}
#[test]
+fn sink_unfold() {
+ use futures::channel::mpsc;
+ use futures::executor::block_on;
+ use futures::future::poll_fn;
+ use futures::sink::{self, Sink, SinkExt};
+ use futures::task::Poll;
+
+ block_on(poll_fn(|cx| {
+ let (tx, mut rx) = mpsc::channel(1);
+ let unfold = sink::unfold((), |(), i: i32| {
+ let mut tx = tx.clone();
+ async move {
+ tx.send(i).await.unwrap();
+ Ok::<_, String>(())
+ }
+ });
+ futures::pin_mut!(unfold);
+ assert_eq!(unfold.as_mut().start_send(1), Ok(()));
+ assert_eq!(unfold.as_mut().poll_flush(cx), Poll::Ready(Ok(())));
+ assert_eq!(rx.try_next().unwrap(), Some(1));
+
+ assert_eq!(unfold.as_mut().poll_ready(cx), Poll::Ready(Ok(())));
+ assert_eq!(unfold.as_mut().start_send(2), Ok(()));
+ assert_eq!(unfold.as_mut().poll_ready(cx), Poll::Ready(Ok(())));
+ assert_eq!(unfold.as_mut().start_send(3), Ok(()));
+ assert_eq!(rx.try_next().unwrap(), Some(2));
+ assert!(rx.try_next().is_err());
+ assert_eq!(unfold.as_mut().poll_ready(cx), Poll::Ready(Ok(())));
+ assert_eq!(unfold.as_mut().start_send(4), Ok(()));
+ assert_eq!(unfold.as_mut().poll_flush(cx), Poll::Pending); // Channel full
+ assert_eq!(rx.try_next().unwrap(), Some(3));
+ assert_eq!(rx.try_next().unwrap(), Some(4));
+
+ Poll::Ready(())
+ }))
+}
+
+#[test]
fn err_into() {
use futures::channel::mpsc;
use futures::sink::{Sink, SinkErrInto, SinkExt};
diff --git a/tests/stream_into_async_read.rs b/tests/stream_into_async_read.rs
index eb78b59..222c985 100644
--- a/tests/stream_into_async_read.rs
+++ b/tests/stream_into_async_read.rs
@@ -52,7 +52,7 @@ fn test_into_async_read() {
}
#[test]
-fn test_into_async_bufread() -> std::io::Result<()> {
+fn test_into_async_bufread() {
use core::pin::Pin;
use futures::io::AsyncBufRead;
use futures::stream::{self, TryStreamExt};
@@ -97,6 +97,4 @@ fn test_into_async_bufread() -> std::io::Result<()> {
reader.as_mut().consume(3);
assert_fill_buf!(reader, &[][..]);
-
- Ok(())
}
diff --git a/tests/try_join_all.rs b/tests/try_join_all.rs
index 3de679b..8e579a2 100644
--- a/tests/try_join_all.rs
+++ b/tests/try_join_all.rs
@@ -42,7 +42,7 @@ fn try_join_all_iter_lifetime() {
Box::new(try_join_all(iter))
}
- assert_done(|| sizes(vec![&[1,2,3], &[], &[0]]), Ok(vec![3 as usize, 0, 1]));
+ assert_done(|| sizes(vec![&[1,2,3], &[], &[0]]), Ok(vec![3_usize, 0, 1]));
}
#[test]
diff --git a/tests/try_stream.rs b/tests/try_stream.rs
new file mode 100644
index 0000000..194e74d
--- /dev/null
+++ b/tests/try_stream.rs
@@ -0,0 +1,38 @@
+use futures::{
+ stream::{self, StreamExt, TryStreamExt},
+ task::Poll,
+};
+use futures_test::task::noop_context;
+
+#[test]
+fn try_filter_map_after_err() {
+ let cx = &mut noop_context();
+ let mut s = stream::iter(1..=3)
+ .map(Ok)
+ .try_filter_map(|v| async move { Err::<Option<()>, _>(v) })
+ .filter_map(|r| async move { r.ok() })
+ .boxed();
+ assert_eq!(Poll::Ready(None), s.poll_next_unpin(cx));
+}
+
+#[test]
+fn try_skip_while_after_err() {
+ let cx = &mut noop_context();
+ let mut s = stream::iter(1..=3)
+ .map(Ok)
+ .try_skip_while(|_| async move { Err::<_, ()>(()) })
+ .filter_map(|r| async move { r.ok() })
+ .boxed();
+ assert_eq!(Poll::Ready(None), s.poll_next_unpin(cx));
+}
+
+#[test]
+fn try_take_while_after_err() {
+ let cx = &mut noop_context();
+ let mut s = stream::iter(1..=3)
+ .map(Ok)
+ .try_take_while(|_| async move { Err::<_, ()>(()) })
+ .filter_map(|r| async move { r.ok() })
+ .boxed();
+ assert_eq!(Poll::Ready(None), s.poll_next_unpin(cx));
+}
diff --git a/tests_disabled/stream.rs b/tests_disabled/stream.rs
index 4eaf12e..ef0676d 100644
--- a/tests_disabled/stream.rs
+++ b/tests_disabled/stream.rs
@@ -66,8 +66,8 @@ fn map_err() {
struct FromErrTest(u32);
impl From<u32> for FromErrTest {
- fn from(i: u32) -> FromErrTest {
- FromErrTest(i)
+ fn from(i: u32) -> Self {
+ Self(i)
}
}