diff options
author | Haibo Huang <hhb@google.com> | 2021-02-09 16:59:45 -0800 |
---|---|---|
committer | Haibo Huang <hhb@google.com> | 2021-02-09 16:59:45 -0800 |
commit | 51d51a0897dd3e6b9745e74fd8dace2886a69270 (patch) | |
tree | 6c60bbd745f93079f6c279f3dae07a2d3c372ecc | |
parent | 46caea809ff8cc690fa2329a852179a78ea9d7db (diff) | |
download | futures-51d51a0897dd3e6b9745e74fd8dace2886a69270.tar.gz |
Upgrade rust/crates/futures to 0.3.12
Test: make
Change-Id: I32391e03ce49616fb6d619a1d0861faabc38685c
-rw-r--r-- | .cargo_vcs_info.json | 5 | ||||
-rw-r--r-- | Android.bp | 25 | ||||
-rw-r--r-- | Cargo.toml | 31 | ||||
-rw-r--r-- | Cargo.toml.orig | 29 | ||||
-rw-r--r-- | METADATA | 10 | ||||
-rw-r--r-- | src/lib.rs | 440 | ||||
-rw-r--r-- | tests/arc_wake.rs | 4 | ||||
-rw-r--r-- | tests/async_await_macros.rs | 34 | ||||
-rw-r--r-- | tests/io_read.rs | 2 | ||||
-rw-r--r-- | tests/io_read_to_end.rs | 64 | ||||
-rw-r--r-- | tests/io_write.rs | 2 | ||||
-rw-r--r-- | tests/join_all.rs | 2 | ||||
-rw-r--r-- | tests/recurse.rs | 9 | ||||
-rw-r--r-- | tests/shared.rs | 30 | ||||
-rw-r--r-- | tests/sink.rs | 73 | ||||
-rw-r--r-- | tests/stream_into_async_read.rs | 4 | ||||
-rw-r--r-- | tests/try_join_all.rs | 2 | ||||
-rw-r--r-- | tests/try_stream.rs | 38 | ||||
-rw-r--r-- | tests_disabled/stream.rs | 4 |
19 files changed, 338 insertions, 470 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json new file mode 100644 index 0000000..4fd4ba3 --- /dev/null +++ b/.cargo_vcs_info.json @@ -0,0 +1,5 @@ +{ + "git": { + "sha1": "1d53a29ec16ccd5b094fb205edb73591455eb4b6" + } +} @@ -30,23 +30,22 @@ rust_library { } // dependent_library ["feature_list"] -// futures-channel-0.3.8 "alloc,futures-sink,sink,std" -// futures-core-0.3.8 "alloc,std" -// futures-executor-0.3.8 "std" -// futures-io-0.3.8 "std" -// futures-macro-0.3.8 -// futures-sink-0.3.8 "alloc,std" -// futures-task-0.3.8 "alloc,once_cell,std" -// futures-util-0.3.8 "alloc,async-await,async-await-macro,channel,futures-channel,futures-io,futures-macro,futures-sink,io,memchr,proc-macro-hack,proc-macro-nested,sink,slab,std" +// futures-channel-0.3.12 "alloc,futures-sink,sink,std" +// futures-core-0.3.12 "alloc,std" +// futures-executor-0.3.12 "std" +// futures-io-0.3.12 "std" +// futures-macro-0.3.12 +// futures-sink-0.3.12 "alloc,std" +// futures-task-0.3.12 "alloc,once_cell,std" +// futures-util-0.3.12 "alloc,async-await,async-await-macro,channel,futures-channel,futures-io,futures-macro,futures-sink,io,memchr,proc-macro-hack,proc-macro-nested,sink,slab,std" // memchr-2.3.4 "default,std" // once_cell-1.5.2 "alloc,std" -// pin-project-1.0.2 -// pin-project-internal-1.0.2 +// pin-project-lite-0.2.4 // pin-utils-0.1.0 // proc-macro-hack-0.5.19 -// proc-macro-nested-0.1.6 +// proc-macro-nested-0.1.7 // proc-macro2-1.0.24 "default,proc-macro" -// quote-1.0.7 "default,proc-macro" +// quote-1.0.8 "default,proc-macro" // slab-0.4.2 -// syn-1.0.53 "clone-impls,default,derive,full,parsing,printing,proc-macro,quote,visit-mut" +// syn-1.0.60 "clone-impls,default,derive,full,parsing,printing,proc-macro,quote,visit-mut" // unicode-xid-0.2.1 "default" @@ -13,11 +13,11 @@ [package] edition = "2018" name = "futures" -version = "0.3.7" +version = "0.3.12" authors = ["Alex Crichton <alex@alexcrichton.com>"] description = "An implementation of futures and streams featuring zero allocations,\ncomposability, and iterator-like interfaces.\n" homepage = "https://rust-lang.github.io/futures-rs" -documentation = "https://docs.rs/futures/0.3.7" +documentation = "https://docs.rs/futures/0.3" readme = "../README.md" keywords = ["futures", "async", "future"] categories = ["asynchronous"] @@ -30,35 +30,46 @@ rustdoc-args = ["--cfg", "docsrs"] [package.metadata.playground] features = ["std", "async-await", "compat", "io-compat", "executor", "thread-pool"] [dependencies.futures-channel] -version = "0.3.7" +version = "0.3.12" features = ["sink"] default-features = false [dependencies.futures-core] -version = "0.3.7" +version = "0.3.12" default-features = false [dependencies.futures-executor] -version = "0.3.7" +version = "0.3.12" optional = true default-features = false [dependencies.futures-io] -version = "0.3.7" +version = "0.3.12" default-features = false [dependencies.futures-sink] -version = "0.3.7" +version = "0.3.12" default-features = false [dependencies.futures-task] -version = "0.3.7" +version = "0.3.12" default-features = false [dependencies.futures-util] -version = "0.3.7" +version = "0.3.12" features = ["sink"] default-features = false +[dev-dependencies.assert_matches] +version = "1.3.0" + +[dev-dependencies.pin-project] +version = "1.0.1" + +[dev-dependencies.pin-utils] +version = "0.1.0" + +[dev-dependencies.tokio] +version = "0.1.11" [features] alloc = ["futures-core/alloc", "futures-task/alloc", "futures-sink/alloc", "futures-channel/alloc", "futures-util/alloc"] @@ -74,5 +85,3 @@ std = ["alloc", "futures-core/std", "futures-task/std", "futures-io/std", "futur thread-pool = ["executor", "futures-executor/thread-pool"] unstable = ["futures-core/unstable", "futures-task/unstable", "futures-channel/unstable", "futures-io/unstable", "futures-util/unstable"] write-all-vectored = ["futures-util/write-all-vectored"] -[badges.travis-ci] -repository = "rust-lang/futures-rs" diff --git a/Cargo.toml.orig b/Cargo.toml.orig index 342441b..34ca82e 100644 --- a/Cargo.toml.orig +++ b/Cargo.toml.orig @@ -1,31 +1,36 @@ [package] name = "futures" edition = "2018" -version = "0.3.7" +version = "0.3.12" authors = ["Alex Crichton <alex@alexcrichton.com>"] license = "MIT OR Apache-2.0" readme = "../README.md" keywords = ["futures", "async", "future"] repository = "https://github.com/rust-lang/futures-rs" homepage = "https://rust-lang.github.io/futures-rs" -documentation = "https://docs.rs/futures/0.3.7" +documentation = "https://docs.rs/futures/0.3" description = """ An implementation of futures and streams featuring zero allocations, composability, and iterator-like interfaces. """ categories = ["asynchronous"] -[badges] -travis-ci = { repository = "rust-lang/futures-rs" } - [dependencies] -futures-core = { path = "../futures-core", version = "0.3.7", default-features = false } -futures-task = { path = "../futures-task", version = "0.3.7", default-features = false } -futures-channel = { path = "../futures-channel", version = "0.3.7", default-features = false, features = ["sink"] } -futures-executor = { path = "../futures-executor", version = "0.3.7", default-features = false, optional = true } -futures-io = { path = "../futures-io", version = "0.3.7", default-features = false } -futures-sink = { path = "../futures-sink", version = "0.3.7", default-features = false } -futures-util = { path = "../futures-util", version = "0.3.7", default-features = false, features = ["sink"] } +futures-core = { path = "../futures-core", version = "0.3.12", default-features = false } +futures-task = { path = "../futures-task", version = "0.3.12", default-features = false } +futures-channel = { path = "../futures-channel", version = "0.3.12", default-features = false, features = ["sink"] } +futures-executor = { path = "../futures-executor", version = "0.3.12", default-features = false, optional = true } +futures-io = { path = "../futures-io", version = "0.3.12", default-features = false } +futures-sink = { path = "../futures-sink", version = "0.3.12", default-features = false } +futures-util = { path = "../futures-util", version = "0.3.12", default-features = false, features = ["sink"] } + +[dev-dependencies] +pin-utils = "0.1.0" +futures-executor = { path = "../futures-executor", features = ["thread-pool"] } +futures-test = { path = "../futures-test" } +tokio = "0.1.11" +assert_matches = "1.3.0" +pin-project = "1.0.1" [features] default = ["std", "async-await", "executor"] @@ -7,13 +7,13 @@ third_party { } url { type: ARCHIVE - value: "https://static.crates.io/crates/futures/futures-0.3.7.crate" + value: "https://static.crates.io/crates/futures/futures-0.3.12.crate" } - version: "0.3.7" + version: "0.3.12" license_type: NOTICE last_upgrade_date { - year: 2020 - month: 10 - day: 25 + year: 2021 + month: 2 + day: 9 } } @@ -87,16 +87,8 @@ // It cannot be included in the published code because this lints have false positives in the minimum required version. #![cfg_attr(test, warn(single_use_lifetimes))] #![warn(clippy::all)] - -// mem::take requires Rust 1.40, matches! requires Rust 1.42 -// Can be removed if the minimum supported version increased or if https://github.com/rust-lang/rust-clippy/issues/3941 -// get's implemented. -#![allow(clippy::mem_replace_with_default, clippy::match_like_matches_macro)] - #![doc(test(attr(deny(warnings), allow(dead_code, unused_assignments, unused_variables))))] -#![doc(html_root_url = "https://docs.rs/futures/0.3.7")] - #![cfg_attr(docsrs, feature(doc_cfg))] #[cfg(all(feature = "cfg-target-has-atomic", not(feature = "unstable")))] @@ -108,263 +100,61 @@ compile_error!("The `bilock` feature requires the `unstable` feature as an expli #[cfg(all(feature = "read-initializer", not(feature = "unstable")))] compile_error!("The `read-initializer` feature requires the `unstable` feature as an explicit opt-in to unstable features"); -#[doc(hidden)] pub use futures_core::future::{Future, TryFuture}; -#[doc(hidden)] pub use futures_util::future::{FutureExt, TryFutureExt}; +#[doc(hidden)] +pub use futures_core::future::{Future, TryFuture}; +#[doc(hidden)] +pub use futures_util::future::{FutureExt, TryFutureExt}; -#[doc(hidden)] pub use futures_core::stream::{Stream, TryStream}; -#[doc(hidden)] pub use futures_util::stream::{StreamExt, TryStreamExt}; +#[doc(hidden)] +pub use futures_core::stream::{Stream, TryStream}; +#[doc(hidden)] +pub use futures_util::stream::{StreamExt, TryStreamExt}; -#[doc(hidden)] pub use futures_sink::Sink; -#[doc(hidden)] pub use futures_util::sink::SinkExt; +#[doc(hidden)] +pub use futures_sink::Sink; +#[doc(hidden)] +pub use futures_util::sink::SinkExt; #[cfg(feature = "std")] -#[doc(hidden)] pub use futures_io::{AsyncRead, AsyncWrite, AsyncSeek, AsyncBufRead}; +#[doc(hidden)] +pub use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite}; #[cfg(feature = "std")] -#[doc(hidden)] pub use futures_util::{AsyncReadExt, AsyncWriteExt, AsyncSeekExt, AsyncBufReadExt}; +#[doc(hidden)] +pub use futures_util::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt}; // Macro reexports pub use futures_core::ready; // Readiness propagation pub use futures_util::pin_mut; -#[cfg(feature = "async-await")] -pub use futures_util::{pending, poll, join, try_join, select_biased}; // Async-await #[cfg(feature = "std")] #[cfg(feature = "async-await")] pub use futures_util::select; +#[cfg(feature = "async-await")] +pub use futures_util::{join, pending, poll, select_biased, try_join}; // Async-await + +// Module reexports +#[doc(inline)] +pub use futures_util::{future, never, sink, stream, task}; -#[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] -pub mod channel { - //! Cross-task communication. - //! - //! Like threads, concurrent tasks sometimes need to communicate with each - //! other. This module contains two basic abstractions for doing so: - //! - //! - [oneshot](crate::channel::oneshot), a way of sending a single value - //! from one task to another. - //! - [mpsc](crate::channel::mpsc), a multi-producer, single-consumer - //! channel for sending values between tasks, analogous to the - //! similarly-named structure in the standard library. - //! - //! This module is only available when the `std` or `alloc` feature of this - //! library is activated, and it is activated by default. +#[doc(inline)] +pub use futures_channel as channel; +#[cfg(feature = "alloc")] +#[doc(inline)] +pub use futures_util::lock; - pub use futures_channel::oneshot; +#[cfg(feature = "std")] +#[doc(inline)] +pub use futures_util::io; - #[cfg(feature = "std")] - pub use futures_channel::mpsc; -} +#[cfg(feature = "executor")] +#[cfg_attr(docsrs, doc(cfg(feature = "executor")))] +#[doc(inline)] +pub use futures_executor as executor; #[cfg(feature = "compat")] #[cfg_attr(docsrs, doc(cfg(feature = "compat")))] -pub mod compat { - //! Interop between `futures` 0.1 and 0.3. - //! - //! This module is only available when the `compat` feature of this - //! library is activated. - - pub use futures_util::compat::{ - Compat, - CompatSink, - Compat01As03, - Compat01As03Sink, - Executor01Future, - Executor01As03, - Executor01CompatExt, - Future01CompatExt, - Stream01CompatExt, - Sink01CompatExt, - }; - - #[cfg(feature = "io-compat")] - #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))] - pub use futures_util::compat::{ - AsyncRead01CompatExt, - AsyncWrite01CompatExt, - }; -} - -#[cfg(feature = "executor")] -pub mod executor { - //! Task execution. - //! - //! All asynchronous computation occurs within an executor, which is - //! capable of spawning futures as tasks. This module provides several - //! built-in executors, as well as tools for building your own. - //! - //! This module is only available when the `executor` feature of this - //! library is activated, and it is activated by default. - //! - //! # Using a thread pool (M:N task scheduling) - //! - //! Most of the time tasks should be executed on a [thread - //! pool](crate::executor::ThreadPool). A small set of worker threads can - //! handle a very large set of spawned tasks (which are much lighter weight - //! than threads). Tasks spawned onto the pool with the - //! [`spawn_ok()`](crate::executor::ThreadPool::spawn_ok) - //! function will run ambiently on the created threads. - //! - //! # Spawning additional tasks - //! - //! Tasks can be spawned onto a spawner by calling its - //! [`spawn_obj`](crate::task::Spawn::spawn_obj) method directly. - //! In the case of `!Send` futures, - //! [`spawn_local_obj`](crate::task::LocalSpawn::spawn_local_obj) - //! can be used instead. - //! - //! # Single-threaded execution - //! - //! In addition to thread pools, it's possible to run a task (and the tasks - //! it spawns) entirely within a single thread via the - //! [`LocalPool`](crate::executor::LocalPool) executor. Aside from cutting - //! down on synchronization costs, this executor also makes it possible to - //! spawn non-`Send` tasks, via - //! [`spawn_local_obj`](crate::task::LocalSpawn::spawn_local_obj). - //! The `LocalPool` is best suited for running I/O-bound tasks that do - //! relatively little work between I/O operations. - //! - //! There is also a convenience function - //! [`block_on`](crate::executor::block_on) for simply running a future to - //! completion on the current thread. - - pub use futures_executor::{ - BlockingStream, - Enter, EnterError, - LocalSpawner, LocalPool, - block_on, block_on_stream, enter, - }; - - #[cfg(feature = "thread-pool")] - #[cfg_attr(docsrs, doc(cfg(feature = "thread-pool")))] - pub use futures_executor::{ThreadPool, ThreadPoolBuilder}; -} - -pub mod future { - //! Asynchronous values. - //! - //! This module contains: - //! - //! - The [`Future` trait](crate::future::Future). - //! - The [`FutureExt`](crate::future::FutureExt) trait, which provides - //! adapters for chaining and composing futures. - //! - Top-level future combinators like [`lazy`](crate::future::lazy) which - //! creates a future from a closure that defines its return value, and - //! [`ready`](crate::future::ready), which constructs a future with an - //! immediate defined value. - - pub use futures_core::future::{ - Future, TryFuture, FusedFuture, - }; - - #[cfg(feature = "alloc")] - pub use futures_core::future::{BoxFuture, LocalBoxFuture}; - - pub use futures_task::{FutureObj, LocalFutureObj, UnsafeFutureObj}; - - pub use futures_util::future::{ - lazy, Lazy, - maybe_done, MaybeDone, - pending, Pending, - poll_fn, PollFn, - ready, ok, err, Ready, - join, join3, join4, join5, - Join, Join3, Join4, Join5, - select, Select, - try_join, try_join3, try_join4, try_join5, - TryJoin, TryJoin3, TryJoin4, TryJoin5, - try_select, TrySelect, - Either, - OptionFuture, - - FutureExt, - FlattenStream, Flatten, Fuse, Inspect, IntoStream, Map, Then, UnitError, - NeverError, - - TryFutureExt, - AndThen, ErrInto, FlattenSink, IntoFuture, MapErr, MapOk, OrElse, - InspectOk, InspectErr, TryFlattenStream, UnwrapOrElse, - }; - - #[cfg(feature = "alloc")] - pub use futures_util::future::{ - join_all, JoinAll, - select_all, SelectAll, - try_join_all, TryJoinAll, - select_ok, SelectOk, - }; - - #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))] - #[cfg(feature = "alloc")] - pub use futures_util::future::{ - abortable, Abortable, AbortHandle, AbortRegistration, Aborted, - }; - - #[cfg(feature = "std")] - pub use futures_util::future::{ - Remote, RemoteHandle, - CatchUnwind, Shared, - }; -} - -#[cfg(feature = "std")] -pub mod io { - //! Asynchronous I/O. - //! - //! This module is the asynchronous version of `std::io`. It defines four - //! traits, [`AsyncRead`](crate::io::AsyncRead), - //! [`AsyncWrite`](crate::io::AsyncWrite), - //! [`AsyncSeek`](crate::io::AsyncSeek), and - //! [`AsyncBufRead`](crate::io::AsyncBufRead), which mirror the `Read`, - //! `Write`, `Seek`, and `BufRead` traits of the standard library. However, - //! these traits integrate - //! with the asynchronous task system, so that if an I/O object isn't ready - //! for reading (or writing), the thread is not blocked, and instead the - //! current task is queued to be woken when I/O is ready. - //! - //! In addition, the [`AsyncReadExt`](crate::io::AsyncReadExt), - //! [`AsyncWriteExt`](crate::io::AsyncWriteExt), - //! [`AsyncSeekExt`](crate::io::AsyncSeekExt), and - //! [`AsyncBufReadExt`](crate::io::AsyncBufReadExt) extension traits offer a - //! variety of useful combinators for operating with asynchronous I/O - //! objects, including ways to work with them using futures, streams and - //! sinks. - //! - //! This module is only available when the `std` feature of this - //! library is activated, and it is activated by default. - - pub use futures_io::{ - AsyncRead, AsyncWrite, AsyncSeek, AsyncBufRead, Error, ErrorKind, - IoSlice, IoSliceMut, Result, SeekFrom, - }; - - #[cfg(feature = "read-initializer")] - #[cfg_attr(docsrs, doc(cfg(feature = "read-initializer")))] - pub use futures_io::Initializer; - - pub use futures_util::io::{ - AsyncReadExt, AsyncWriteExt, AsyncSeekExt, AsyncBufReadExt, AllowStdIo, - BufReader, BufWriter, Cursor, Chain, Close, copy, Copy, copy_buf, CopyBuf, - empty, Empty, FillBuf, Flush, IntoSink, Lines, Read, ReadExact, ReadHalf, - ReadLine, ReadToEnd, ReadToString, ReadUntil, ReadVectored, repeat, - Repeat, ReuniteError, Seek, sink, Sink, Take, Window, Write, WriteAll, WriteHalf, - WriteVectored, - }; -} - -#[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))] -#[cfg(feature = "alloc")] -pub mod lock { - //! Futures-powered synchronization primitives. - //! - //! This module is only available when the `std` or `alloc` feature of this - //! library is activated, and it is activated by default. - - #[cfg(feature = "bilock")] - #[cfg_attr(docsrs, doc(cfg(feature = "bilock")))] - pub use futures_util::lock::{BiLock, BiLockAcquire, BiLockGuard, ReuniteError}; - - #[cfg(feature = "std")] - pub use futures_util::lock::{MappedMutexGuard, Mutex, MutexLockFuture, MutexGuard}; -} +#[doc(inline)] +pub use futures_util::compat; pub mod prelude { //! A "prelude" for crates using the `futures` crate. @@ -381,170 +171,22 @@ pub mod prelude { //! The prelude may grow over time as additional items see ubiquitous use. pub use crate::future::{self, Future, TryFuture}; - pub use crate::stream::{self, Stream, TryStream}; pub use crate::sink::{self, Sink}; + pub use crate::stream::{self, Stream, TryStream}; #[doc(no_inline)] pub use crate::future::{FutureExt as _, TryFutureExt as _}; #[doc(no_inline)] - pub use crate::stream::{StreamExt as _, TryStreamExt as _}; - #[doc(no_inline)] pub use crate::sink::SinkExt as _; + #[doc(no_inline)] + pub use crate::stream::{StreamExt as _, TryStreamExt as _}; #[cfg(feature = "std")] - pub use crate::io::{ - AsyncRead, AsyncWrite, AsyncSeek, AsyncBufRead, - }; + pub use crate::io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite}; #[cfg(feature = "std")] #[doc(no_inline)] pub use crate::io::{ - AsyncReadExt as _, AsyncWriteExt as _, AsyncSeekExt as _, AsyncBufReadExt as _, + AsyncBufReadExt as _, AsyncReadExt as _, AsyncSeekExt as _, AsyncWriteExt as _, }; } - -pub mod sink { - //! Asynchronous sinks. - //! - //! This module contains: - //! - //! - The [`Sink` trait](crate::sink::Sink), which allows you to - //! asynchronously write data. - //! - The [`SinkExt`](crate::sink::SinkExt) trait, which provides adapters - //! for chaining and composing sinks. - - pub use futures_sink::Sink; - - pub use futures_util::sink::{ - Close, Flush, Send, SendAll, SinkErrInto, SinkMapErr, With, - SinkExt, Fanout, Drain, drain, - WithFlatMap, - }; - - #[cfg(feature = "alloc")] - pub use futures_util::sink::Buffer; -} - -pub mod stream { - //! Asynchronous streams. - //! - //! This module contains: - //! - //! - The [`Stream` trait](crate::stream::Stream), for objects that can - //! asynchronously produce a sequence of values. - //! - The [`StreamExt`](crate::stream::StreamExt) trait, which provides - //! adapters for chaining and composing streams. - //! - Top-level stream constructors like [`iter`](crate::stream::iter) - //! which creates a stream from an iterator. - - pub use futures_core::stream::{ - Stream, TryStream, FusedStream, - }; - - #[cfg(feature = "alloc")] - pub use futures_core::stream::{BoxStream, LocalBoxStream}; - - pub use futures_util::stream::{ - iter, Iter, - repeat, Repeat, - empty, Empty, - pending, Pending, - once, Once, - poll_fn, PollFn, - select, Select, - unfold, Unfold, - try_unfold, TryUnfold, - - StreamExt, - Chain, Collect, Concat, Enumerate, Filter, FilterMap, FlatMap, Flatten, - Fold, Forward, ForEach, Fuse, StreamFuture, Inspect, Map, Next, - SelectNextSome, Peek, Peekable, Scan, Skip, SkipWhile, Take, TakeUntil, - TakeWhile, Then, Zip, - - TryStreamExt, - AndThen, ErrInto, MapOk, MapErr, OrElse, - InspectOk, InspectErr, - TryNext, TryForEach, TryFilter, TryFilterMap, TryFlatten, - TryCollect, TryConcat, TryFold, TrySkipWhile, TryTakeWhile, - IntoStream, - }; - - #[cfg(feature = "alloc")] - pub use futures_util::stream::{ - // For StreamExt: - Chunks, ReadyChunks, - }; - - #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))] - #[cfg(feature = "alloc")] - pub use futures_util::stream::{ - FuturesOrdered, - futures_unordered, FuturesUnordered, - - // For StreamExt: - BufferUnordered, Buffered, ForEachConcurrent, SplitStream, SplitSink, - ReuniteError, - - select_all, SelectAll, - }; - - #[cfg(feature = "std")] - pub use futures_util::stream::{ - // For StreamExt: - CatchUnwind, - }; - - #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))] - #[cfg(feature = "alloc")] - pub use futures_util::stream::{ - // For TryStreamExt: - TryBufferUnordered, TryForEachConcurrent, - }; - - #[cfg(feature = "std")] - pub use futures_util::stream::IntoAsyncRead; -} - -pub mod task { - //! Tools for working with tasks. - //! - //! This module contains: - //! - //! - [`Spawn`](crate::task::Spawn), a trait for spawning new tasks. - //! - [`Context`](crate::task::Context), a context of an asynchronous task, - //! including a handle for waking up the task. - //! - [`Waker`](crate::task::Waker), a handle for waking up a task. - //! - //! The remaining types and traits in the module are used for implementing - //! executors or dealing with synchronization issues around task wakeup. - - pub use futures_core::task::{Context, Poll, Waker, RawWaker, RawWakerVTable}; - - pub use futures_task::{ - Spawn, LocalSpawn, SpawnError, - FutureObj, LocalFutureObj, UnsafeFutureObj, - }; - - pub use futures_util::task::noop_waker; - - #[cfg(feature = "std")] - pub use futures_util::task::noop_waker_ref; - - #[cfg(feature = "alloc")] - pub use futures_util::task::{SpawnExt, LocalSpawnExt}; - - #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))] - #[cfg(feature = "alloc")] - pub use futures_util::task::{waker, waker_ref, WakerRef, ArcWake}; - - #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))] - pub use futures_util::task::AtomicWaker; -} - -pub mod never { - //! This module contains the `Never` type. - //! - //! Values of this type can never be created and will never exist. - - pub use futures_util::never::Never; -} diff --git a/tests/arc_wake.rs b/tests/arc_wake.rs index 2830daf..d19a83d 100644 --- a/tests/arc_wake.rs +++ b/tests/arc_wake.rs @@ -7,8 +7,8 @@ mod countingwaker { } impl CountingWaker { - fn new() -> CountingWaker { - CountingWaker { + fn new() -> Self { + Self { nr_wake: Mutex::new(0), } } diff --git a/tests/async_await_macros.rs b/tests/async_await_macros.rs index b521038..bd586d6 100644 --- a/tests/async_await_macros.rs +++ b/tests/async_await_macros.rs @@ -221,8 +221,8 @@ fn select_on_non_unpin_expressions() { let res = block_on(async { let select_res; select! { - value_1 = make_non_unpin_fut().fuse() => { select_res = value_1 }, - value_2 = make_non_unpin_fut().fuse() => { select_res = value_2 }, + value_1 = make_non_unpin_fut().fuse() => select_res = value_1, + value_2 = make_non_unpin_fut().fuse() => select_res = value_2, }; select_res }); @@ -243,9 +243,9 @@ fn select_on_non_unpin_expressions_with_default() { let res = block_on(async { let select_res; select! { - value_1 = make_non_unpin_fut().fuse() => { select_res = value_1 }, - value_2 = make_non_unpin_fut().fuse() => { select_res = value_2 }, - default => { select_res = 7 }, + value_1 = make_non_unpin_fut().fuse() => select_res = value_1, + value_2 = make_non_unpin_fut().fuse() => select_res = value_2, + default => select_res = 7, }; select_res }); @@ -265,8 +265,8 @@ fn select_on_non_unpin_size() { let fut = async { let select_res; select! { - value_1 = make_non_unpin_fut().fuse() => { select_res = value_1 }, - value_2 = make_non_unpin_fut().fuse() => { select_res = value_2 }, + value_1 = make_non_unpin_fut().fuse() => select_res = value_1, + value_2 = make_non_unpin_fut().fuse() => select_res = value_2, }; select_res }; @@ -282,8 +282,8 @@ fn select_can_be_used_as_expression() { block_on(async { let res = select! { - x = future::ready(7) => { x }, - y = future::ready(3) => { y + 1 }, + x = future::ready(7) => x, + y = future::ready(3) => y + 1, }; assert!(res == 7 || res == 4); }); @@ -303,7 +303,7 @@ fn select_with_default_can_be_used_as_expression() { block_on(async { let res = select! { x = poll_fn(poll_always_pending::<i32>).fuse() => x, - y = poll_fn(poll_always_pending::<i32>).fuse() => { y + 1 }, + y = poll_fn(poll_always_pending::<i32>).fuse() => y + 1, default => 99, }; assert_eq!(res, 99); @@ -318,8 +318,8 @@ fn select_with_complete_can_be_used_as_expression() { block_on(async { let res = select! { - x = future::pending::<i32>() => { x }, - y = future::pending::<i32>() => { y + 1 }, + x = future::pending::<i32>() => x, + y = future::pending::<i32>() => y + 1, default => 99, complete => 237, }; @@ -328,6 +328,7 @@ fn select_with_complete_can_be_used_as_expression() { } #[test] +#[allow(unused_assignments)] fn select_on_mutable_borrowing_future_with_same_borrow_in_block() { use futures::select; use futures::executor::block_on; @@ -339,8 +340,8 @@ fn select_on_mutable_borrowing_future_with_same_borrow_in_block() { block_on(async { let mut value = 234; select! { - x = require_mutable(&mut value).fuse() => { }, - y = async_noop().fuse() => { + _ = require_mutable(&mut value).fuse() => { }, + _ = async_noop().fuse() => { value += 5; }, } @@ -348,6 +349,7 @@ fn select_on_mutable_borrowing_future_with_same_borrow_in_block() { } #[test] +#[allow(unused_assignments)] fn select_on_mutable_borrowing_future_with_same_borrow_in_block_and_default() { use futures::select; use futures::executor::block_on; @@ -359,8 +361,8 @@ fn select_on_mutable_borrowing_future_with_same_borrow_in_block_and_default() { block_on(async { let mut value = 234; select! { - x = require_mutable(&mut value).fuse() => { }, - y = async_noop().fuse() => { + _ = require_mutable(&mut value).fuse() => { }, + _ = async_noop().fuse() => { value += 5; }, default => { diff --git a/tests/io_read.rs b/tests/io_read.rs index bc2a434..5902ad0 100644 --- a/tests/io_read.rs +++ b/tests/io_read.rs @@ -10,7 +10,7 @@ mod mock_reader { impl MockReader { pub fn new(fun: impl FnMut(&mut [u8]) -> Poll<io::Result<usize>> + 'static) -> Self { - MockReader { fun: Box::new(fun) } + Self { fun: Box::new(fun) } } } diff --git a/tests/io_read_to_end.rs b/tests/io_read_to_end.rs new file mode 100644 index 0000000..892d463 --- /dev/null +++ b/tests/io_read_to_end.rs @@ -0,0 +1,64 @@ +use futures::{ + io::{self, AsyncRead, AsyncReadExt}, + task::{Context, Poll}, +}; +use std::pin::Pin; + +#[test] +#[should_panic(expected = "assertion failed: n <= buf.len()")] +fn issue2310() { + struct MyRead { + first: bool, + } + + impl MyRead { + pub fn new() -> Self { + MyRead { first: false } + } + } + + impl AsyncRead for MyRead { + fn poll_read( + mut self: Pin<&mut Self>, + _cx: &mut Context, + _buf: &mut [u8], + ) -> Poll<io::Result<usize>> { + Poll::Ready(if !self.first { + self.first = true; + // First iteration: return more than the buffer size + Ok(64) + } else { + // Second iteration: indicate that we are done + Ok(0) + }) + } + } + + struct VecWrapper { + inner: Vec<u8>, + } + + impl VecWrapper { + pub fn new() -> Self { + VecWrapper { inner: Vec::new() } + } + } + + impl Drop for VecWrapper { + fn drop(&mut self) { + // Observe uninitialized bytes + println!("{:?}", &self.inner); + // Overwrite heap contents + for b in &mut self.inner { + *b = 0x90; + } + } + } + + futures::executor::block_on(async { + let mut vec = VecWrapper::new(); + let mut read = MyRead::new(); + + read.read_to_end(&mut vec.inner).await.unwrap(); + }) +} diff --git a/tests/io_write.rs b/tests/io_write.rs index 165b3d3..363f32b 100644 --- a/tests/io_write.rs +++ b/tests/io_write.rs @@ -10,7 +10,7 @@ mod mock_writer { impl MockWriter { pub fn new(fun: impl FnMut(&[u8]) -> Poll<io::Result<usize>> + 'static) -> Self { - MockWriter { fun: Box::new(fun) } + Self { fun: Box::new(fun) } } } diff --git a/tests/join_all.rs b/tests/join_all.rs index 1450679..c322e58 100644 --- a/tests/join_all.rs +++ b/tests/join_all.rs @@ -37,7 +37,7 @@ fn join_all_iter_lifetime() { Box::new(join_all(iter)) } - util::assert_done(|| sizes(vec![&[1,2,3], &[], &[0]]), vec![3 as usize, 0, 1]); + util::assert_done(|| sizes(vec![&[1,2,3], &[], &[0]]), vec![3_usize, 0, 1]); } #[test] diff --git a/tests/recurse.rs b/tests/recurse.rs index 87a4ff2..a151f1b 100644 --- a/tests/recurse.rs +++ b/tests/recurse.rs @@ -5,6 +5,11 @@ fn lots() { use std::sync::mpsc; use std::thread; + #[cfg(not(futures_sanitizer))] + const N: i32 = 1_000; + #[cfg(futures_sanitizer)] // If N is many, asan reports stack-overflow: https://gist.github.com/taiki-e/099446d21cbec69d4acbacf7a9646136 + const N: i32 = 100; + fn do_it(input: (i32, i32)) -> BoxFuture<'static, i32> { let (n, x) = input; if n == 0 { @@ -16,7 +21,7 @@ fn lots() { let (tx, rx) = mpsc::channel(); thread::spawn(|| { - block_on(do_it((1_000, 0)).map(move |x| tx.send(x).unwrap())) + block_on(do_it((N, 0)).map(move |x| tx.send(x).unwrap())) }); - assert_eq!(500_500, rx.recv().unwrap()); + assert_eq!((0..=N).sum::<i32>(), rx.recv().unwrap()); } diff --git a/tests/shared.rs b/tests/shared.rs index b2251a5..cc0c6a2 100644 --- a/tests/shared.rs +++ b/tests/shared.rs @@ -7,7 +7,7 @@ mod count_clone { impl Clone for CountClone { fn clone(&self) -> Self { self.0.set(self.0.get() + 1); - CountClone(self.0.clone()) + Self(self.0.clone()) } } } @@ -144,6 +144,34 @@ fn peek() { } #[test] +fn downgrade() { + use futures::channel::oneshot; + use futures::executor::block_on; + use futures::future::FutureExt; + + let (tx, rx) = oneshot::channel::<i32>(); + let shared = rx.shared(); + // Since there are outstanding `Shared`s, we can get a `WeakShared`. + let weak = shared.downgrade().unwrap(); + // It should upgrade fine right now. + let mut shared2 = weak.upgrade().unwrap(); + + tx.send(42).unwrap(); + assert_eq!(block_on(shared).unwrap(), 42); + + // We should still be able to get a new `WeakShared` and upgrade it + // because `shared2` is outstanding. + assert!(shared2.downgrade().is_some()); + assert!(weak.upgrade().is_some()); + + assert_eq!(block_on(&mut shared2).unwrap(), 42); + // Now that all `Shared`s have been exhausted, we should not be able + // to get a new `WeakShared` or upgrade an existing one. + assert!(weak.upgrade().is_none()); + assert!(shared2.downgrade().is_none()); +} + +#[test] fn dont_clone_in_single_owner_shared_future() { use futures::channel::oneshot; use futures::executor::block_on; diff --git a/tests/sink.rs b/tests/sink.rs index 8ed201e..597ed34 100644 --- a/tests/sink.rs +++ b/tests/sink.rs @@ -483,6 +483,41 @@ fn with_flush_propagate() { }) } +// test that `Clone` is implemented on `with` sinks +#[test] +fn with_implements_clone() { + use futures::channel::mpsc; + use futures::executor::block_on; + use futures::future; + use futures::{SinkExt, StreamExt}; + + let (mut tx, rx) = mpsc::channel(5); + + { + let mut is_positive = tx + .clone() + .with(|item| future::ok::<bool, mpsc::SendError>(item > 0)); + + let mut is_long = tx + .clone() + .with(|item: &str| future::ok::<bool, mpsc::SendError>(item.len() > 5)); + + block_on(is_positive.clone().send(-1)).unwrap(); + block_on(is_long.clone().send("123456")).unwrap(); + block_on(is_long.send("123")).unwrap(); + block_on(is_positive.send(1)).unwrap(); + } + + block_on(tx.send(false)).unwrap(); + + block_on(tx.close()).unwrap(); + + assert_eq!( + block_on(rx.collect::<Vec<_>>()), + vec![false, true, false, true, false] + ); +} + // test that a buffer is a no-nop around a sink that always accepts sends #[test] fn buffer_noop() { @@ -611,6 +646,44 @@ fn sink_map_err() { } #[test] +fn sink_unfold() { + use futures::channel::mpsc; + use futures::executor::block_on; + use futures::future::poll_fn; + use futures::sink::{self, Sink, SinkExt}; + use futures::task::Poll; + + block_on(poll_fn(|cx| { + let (tx, mut rx) = mpsc::channel(1); + let unfold = sink::unfold((), |(), i: i32| { + let mut tx = tx.clone(); + async move { + tx.send(i).await.unwrap(); + Ok::<_, String>(()) + } + }); + futures::pin_mut!(unfold); + assert_eq!(unfold.as_mut().start_send(1), Ok(())); + assert_eq!(unfold.as_mut().poll_flush(cx), Poll::Ready(Ok(()))); + assert_eq!(rx.try_next().unwrap(), Some(1)); + + assert_eq!(unfold.as_mut().poll_ready(cx), Poll::Ready(Ok(()))); + assert_eq!(unfold.as_mut().start_send(2), Ok(())); + assert_eq!(unfold.as_mut().poll_ready(cx), Poll::Ready(Ok(()))); + assert_eq!(unfold.as_mut().start_send(3), Ok(())); + assert_eq!(rx.try_next().unwrap(), Some(2)); + assert!(rx.try_next().is_err()); + assert_eq!(unfold.as_mut().poll_ready(cx), Poll::Ready(Ok(()))); + assert_eq!(unfold.as_mut().start_send(4), Ok(())); + assert_eq!(unfold.as_mut().poll_flush(cx), Poll::Pending); // Channel full + assert_eq!(rx.try_next().unwrap(), Some(3)); + assert_eq!(rx.try_next().unwrap(), Some(4)); + + Poll::Ready(()) + })) +} + +#[test] fn err_into() { use futures::channel::mpsc; use futures::sink::{Sink, SinkErrInto, SinkExt}; diff --git a/tests/stream_into_async_read.rs b/tests/stream_into_async_read.rs index eb78b59..222c985 100644 --- a/tests/stream_into_async_read.rs +++ b/tests/stream_into_async_read.rs @@ -52,7 +52,7 @@ fn test_into_async_read() { } #[test] -fn test_into_async_bufread() -> std::io::Result<()> { +fn test_into_async_bufread() { use core::pin::Pin; use futures::io::AsyncBufRead; use futures::stream::{self, TryStreamExt}; @@ -97,6 +97,4 @@ fn test_into_async_bufread() -> std::io::Result<()> { reader.as_mut().consume(3); assert_fill_buf!(reader, &[][..]); - - Ok(()) } diff --git a/tests/try_join_all.rs b/tests/try_join_all.rs index 3de679b..8e579a2 100644 --- a/tests/try_join_all.rs +++ b/tests/try_join_all.rs @@ -42,7 +42,7 @@ fn try_join_all_iter_lifetime() { Box::new(try_join_all(iter)) } - assert_done(|| sizes(vec![&[1,2,3], &[], &[0]]), Ok(vec![3 as usize, 0, 1])); + assert_done(|| sizes(vec![&[1,2,3], &[], &[0]]), Ok(vec![3_usize, 0, 1])); } #[test] diff --git a/tests/try_stream.rs b/tests/try_stream.rs new file mode 100644 index 0000000..194e74d --- /dev/null +++ b/tests/try_stream.rs @@ -0,0 +1,38 @@ +use futures::{ + stream::{self, StreamExt, TryStreamExt}, + task::Poll, +}; +use futures_test::task::noop_context; + +#[test] +fn try_filter_map_after_err() { + let cx = &mut noop_context(); + let mut s = stream::iter(1..=3) + .map(Ok) + .try_filter_map(|v| async move { Err::<Option<()>, _>(v) }) + .filter_map(|r| async move { r.ok() }) + .boxed(); + assert_eq!(Poll::Ready(None), s.poll_next_unpin(cx)); +} + +#[test] +fn try_skip_while_after_err() { + let cx = &mut noop_context(); + let mut s = stream::iter(1..=3) + .map(Ok) + .try_skip_while(|_| async move { Err::<_, ()>(()) }) + .filter_map(|r| async move { r.ok() }) + .boxed(); + assert_eq!(Poll::Ready(None), s.poll_next_unpin(cx)); +} + +#[test] +fn try_take_while_after_err() { + let cx = &mut noop_context(); + let mut s = stream::iter(1..=3) + .map(Ok) + .try_take_while(|_| async move { Err::<_, ()>(()) }) + .filter_map(|r| async move { r.ok() }) + .boxed(); + assert_eq!(Poll::Ready(None), s.poll_next_unpin(cx)); +} diff --git a/tests_disabled/stream.rs b/tests_disabled/stream.rs index 4eaf12e..ef0676d 100644 --- a/tests_disabled/stream.rs +++ b/tests_disabled/stream.rs @@ -66,8 +66,8 @@ fn map_err() { struct FromErrTest(u32); impl From<u32> for FromErrTest { - fn from(i: u32) -> FromErrTest { - FromErrTest(i) + fn from(i: u32) -> Self { + Self(i) } } |