diff options
author | Joel Galenson <jgalenson@google.com> | 2021-04-02 16:10:11 +0000 |
---|---|---|
committer | Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com> | 2021-04-02 16:10:11 +0000 |
commit | cf1d92ef037819957a5a01d24e222b592cadb284 (patch) | |
tree | 931c1432113b74ac060d81ede8b4cbf7d3abfd76 | |
parent | 2eb284d7dbdf6e1f022ab60aa0d0ae4d44cb2041 (diff) | |
parent | 1fdff8fd4c4296e2707526c7f2cd2fd78447a7b1 (diff) | |
download | futures-util-cf1d92ef037819957a5a01d24e222b592cadb284.tar.gz |
Upgrade rust/crates/futures-util to 0.3.13 am: 1fdff8fd4c
Original change: https://android-review.googlesource.com/c/platform/external/rust/crates/futures-util/+/1662922
Change-Id: I0247d2fa66aa9ccee39d7219a0ae75468c013ee5
50 files changed, 368 insertions, 185 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json index 4fd4ba3..f3ad3ab 100644 --- a/.cargo_vcs_info.json +++ b/.cargo_vcs_info.json @@ -1,5 +1,5 @@ { "git": { - "sha1": "1d53a29ec16ccd5b094fb205edb73591455eb4b6" + "sha1": "c91f8691672c7401b1923ab00bf138975c99391a" } } @@ -1,4 +1,5 @@ // This file is generated by cargo2android.py --run --device --features channel,default,io,memchr,sink --dependencies --tests --patch=patches/Android.bp.patch. +// Do not modify this file as changes will be overridden on upgrade. package { default_applicable_licenses: ["external_rust_crates_futures-util_license"], @@ -141,7 +142,7 @@ rust_library { // dependent_library ["feature_list"] // autocfg-1.0.1 -// byteorder-1.4.2 "default,std" +// byteorder-1.4.3 "default,std" // bytes-0.4.12 // cfg-if-0.1.10 // cfg-if-1.0.0 @@ -150,16 +151,16 @@ rust_library { // crossbeam-queue-0.2.3 "default,std" // crossbeam-utils-0.7.2 "default,lazy_static,std" // fnv-1.0.7 "default,std" -// futures-0.1.30 "default,use_std,with-deprecated" -// futures-channel-0.3.12 "alloc,std" -// futures-core-0.3.12 "alloc,std" -// futures-io-0.3.12 "std" -// futures-macro-0.3.12 -// futures-sink-0.3.12 -// futures-task-0.3.12 "alloc,once_cell,std" +// futures-0.1.31 "default,use_std,with-deprecated" +// futures-channel-0.3.13 "alloc,std" +// futures-core-0.3.13 "alloc,std" +// futures-io-0.3.13 "std" +// futures-macro-0.3.13 +// futures-sink-0.3.13 +// futures-task-0.3.13 "alloc,std" // iovec-0.1.4 // lazy_static-1.4.0 -// libc-0.2.86 "default,std" +// libc-0.2.92 "default,std" // lock_api-0.3.4 // log-0.4.14 // maybe-uninit-2.0.0 @@ -169,22 +170,21 @@ rust_library { // mio-uds-0.6.8 // net2-0.2.37 "default,duration" // num_cpus-1.13.0 -// once_cell-1.5.2 "alloc,std" // parking_lot-0.9.0 "default" // parking_lot_core-0.6.2 -// pin-project-lite-0.2.4 +// pin-project-lite-0.2.6 // pin-utils-0.1.0 // proc-macro-hack-0.5.19 // proc-macro-nested-0.1.7 -// proc-macro2-1.0.24 "default,proc-macro" -// quote-1.0.8 "default,proc-macro" +// proc-macro2-1.0.26 "default,proc-macro" +// quote-1.0.9 "default,proc-macro" // rustc_version-0.2.3 // scopeguard-1.1.0 // semver-0.9.0 "default" // semver-parser-0.7.0 // slab-0.4.2 // smallvec-0.6.14 "default,std" -// syn-1.0.60 "clone-impls,default,derive,full,parsing,printing,proc-macro,quote" +// syn-1.0.68 "clone-impls,default,derive,full,parsing,printing,proc-macro,quote" // tokio-0.1.22 "bytes,codec,default,fs,io,mio,num_cpus,reactor,rt-full,sync,tcp,timer,tokio-codec,tokio-current-thread,tokio-executor,tokio-fs,tokio-io,tokio-reactor,tokio-sync,tokio-tcp,tokio-threadpool,tokio-timer,tokio-udp,tokio-uds,udp,uds" // tokio-codec-0.1.2 // tokio-current-thread-0.1.7 @@ -13,7 +13,7 @@ [package] edition = "2018" name = "futures-util" -version = "0.3.12" +version = "0.3.13" authors = ["Alex Crichton <alex@alexcrichton.com>"] description = "Common utilities and extension traits for the futures-rs library.\n" homepage = "https://rust-lang.github.io/futures-rs" @@ -24,33 +24,33 @@ repository = "https://github.com/rust-lang/futures-rs" all-features = true rustdoc-args = ["--cfg", "docsrs"] [dependencies.futures-channel] -version = "0.3.12" +version = "0.3.13" features = ["std"] optional = true default-features = false [dependencies.futures-core] -version = "0.3.12" +version = "0.3.13" default-features = false [dependencies.futures-io] -version = "0.3.12" +version = "0.3.13" features = ["std"] optional = true default-features = false [dependencies.futures-macro] -version = "=0.3.12" +version = "=0.3.13" optional = true default-features = false [dependencies.futures-sink] -version = "0.3.12" +version = "0.3.13" optional = true default-features = false [dependencies.futures-task] -version = "0.3.12" +version = "0.3.13" default-features = false [dependencies.futures_01] diff --git a/Cargo.toml.orig b/Cargo.toml.orig index 010902b..b7cc193 100644 --- a/Cargo.toml.orig +++ b/Cargo.toml.orig @@ -1,7 +1,7 @@ [package] name = "futures-util" edition = "2018" -version = "0.3.12" +version = "0.3.13" authors = ["Alex Crichton <alex@alexcrichton.com>"] license = "MIT OR Apache-2.0" repository = "https://github.com/rust-lang/futures-rs" @@ -33,12 +33,12 @@ read-initializer = ["io", "futures-io/read-initializer", "futures-io/unstable"] write-all-vectored = ["io"] [dependencies] -futures-core = { path = "../futures-core", version = "0.3.12", default-features = false } -futures-task = { path = "../futures-task", version = "0.3.12", default-features = false } -futures-channel = { path = "../futures-channel", version = "0.3.12", default-features = false, features = ["std"], optional = true } -futures-io = { path = "../futures-io", version = "0.3.12", default-features = false, features = ["std"], optional = true } -futures-sink = { path = "../futures-sink", version = "0.3.12", default-features = false, optional = true } -futures-macro = { path = "../futures-macro", version = "=0.3.12", default-features = false, optional = true } +futures-core = { path = "../futures-core", version = "0.3.13", default-features = false } +futures-task = { path = "../futures-task", version = "0.3.13", default-features = false } +futures-channel = { path = "../futures-channel", version = "0.3.13", default-features = false, features = ["std"], optional = true } +futures-io = { path = "../futures-io", version = "0.3.13", default-features = false, features = ["std"], optional = true } +futures-sink = { path = "../futures-sink", version = "0.3.13", default-features = false, optional = true } +futures-macro = { path = "../futures-macro", version = "=0.3.13", default-features = false, optional = true } proc-macro-hack = { version = "0.5.19", optional = true } proc-macro-nested = { version = "0.1.2", optional = true } slab = { version = "0.4.2", optional = true } @@ -7,13 +7,13 @@ third_party { } url { type: ARCHIVE - value: "https://static.crates.io/crates/futures-util/futures-util-0.3.12.crate" + value: "https://static.crates.io/crates/futures-util/futures-util-0.3.13.crate" } - version: "0.3.12" + version: "0.3.13" license_type: NOTICE last_upgrade_date { year: 2021 - month: 2 - day: 9 + month: 4 + day: 1 } } diff --git a/TEST_MAPPING b/TEST_MAPPING index 7e10dd0..203632d 100644 --- a/TEST_MAPPING +++ b/TEST_MAPPING @@ -2,7 +2,55 @@ { "presubmit": [ { + "name": "anyhow_device_test_tests_test_boxed" + }, + { + "name": "anyhow_device_test_tests_test_ffi" + }, + { "name": "futures-util_device_test_src_lib" + }, + { + "name": "anyhow_device_test_tests_test_chain" + }, + { + "name": "anyhow_device_test_tests_test_convert" + }, + { + "name": "anyhow_device_test_tests_test_source" + }, + { + "name": "tokio-test_device_test_tests_io" + }, + { + "name": "tokio-test_device_test_tests_macros" + }, + { + "name": "anyhow_device_test_tests_test_context" + }, + { + "name": "anyhow_device_test_tests_test_autotrait" + }, + { + "name": "tokio-test_device_test_src_lib" + }, + { + "name": "anyhow_device_test_tests_test_macros" + }, + { + "name": "anyhow_device_test_src_lib" + }, + { + "name": "tokio-test_device_test_tests_block_on" + }, + { + "name": "anyhow_device_test_tests_test_fmt" + }, + { + "name": "anyhow_device_test_tests_test_downcast" + }, + { + "name": "anyhow_device_test_tests_test_repr" } ] } diff --git a/src/async_await/join_mod.rs b/src/async_await/join_mod.rs index d6ddb6d..965d9fb 100644 --- a/src/async_await/join_mod.rs +++ b/src/async_await/join_mod.rs @@ -8,7 +8,7 @@ macro_rules! document_join_macro { /// of all results once complete. /// /// While `join!(a, b)` is similar to `(a.await, b.await)`, - /// `join!` polls both futures concurrently and therefore is more efficent. + /// `join!` polls both futures concurrently and therefore is more efficient. /// /// This macro is only usable inside of async functions, closures, and blocks. /// It is also gated behind the `async-await` feature of this library, which is diff --git a/src/async_await/poll.rs b/src/async_await/poll.rs index ac70a53..b5782df 100644 --- a/src/async_await/poll.rs +++ b/src/async_await/poll.rs @@ -9,6 +9,10 @@ use futures_core::task::{Context, Poll}; /// This macro is only usable inside of `async` functions, closures, and blocks. /// It is also gated behind the `async-await` feature of this library, which is /// activated by default. +/// +/// If you need the result of polling a [`Stream`](crate::stream::Stream), +/// you can use this macro with the [`next`](crate::stream::StreamExt::next) method: +/// `poll!(stream.next())`. #[macro_export] macro_rules! poll { ($x:expr $(,)?) => { diff --git a/src/future/abortable.rs b/src/future/abortable.rs index 1fc75b0..3f2e5a0 100644 --- a/src/future/abortable.rs +++ b/src/future/abortable.rs @@ -1,3 +1,4 @@ +use super::assert_future; use crate::task::AtomicWaker; use futures_core::future::Future; use futures_core::task::{Context, Poll}; @@ -39,10 +40,10 @@ impl<Fut> Abortable<Fut> where Fut: Future { /// # }); /// ``` pub fn new(future: Fut, reg: AbortRegistration) -> Self { - Self { + assert_future::<Result<Fut::Output, Aborted>, _>(Self { future, inner: reg.inner, - } + }) } } diff --git a/src/future/either.rs b/src/future/either.rs index a1b9f0a..5f5b614 100644 --- a/src/future/either.rs +++ b/src/future/either.rs @@ -101,6 +101,13 @@ where Either::Right(x) => x.poll_next(cx), } } + + fn size_hint(&self) -> (usize, Option<usize>) { + match self { + Either::Left(x) => x.size_hint(), + Either::Right(x) => x.size_hint(), + } + } } impl<A, B> FusedStream for Either<A, B> diff --git a/src/future/future/mod.rs b/src/future/future/mod.rs index f01d346..c11d108 100644 --- a/src/future/future/mod.rs +++ b/src/future/future/mod.rs @@ -7,10 +7,10 @@ use alloc::boxed::Box; use core::pin::Pin; -use crate::future::{assert_future, Either}; -use crate::stream::assert_stream; use crate::fns::{inspect_fn, into_fn, ok_fn, InspectFn, IntoFn, OkFn}; +use crate::future::{assert_future, Either}; use crate::never::Never; +use crate::stream::assert_stream; #[cfg(feature = "alloc")] use futures_core::future::{BoxFuture, LocalBoxFuture}; use futures_core::{ @@ -506,7 +506,8 @@ pub trait FutureExt: Future { where Self: Sized, { - remote_handle::remote_handle(self) + let (wrapped, handle) = remote_handle::remote_handle(self); + (assert_future::<(), _>(wrapped), handle) } /// Wrap the future in a Box, pinning it. diff --git a/src/future/future/shared.rs b/src/future/future/shared.rs index 53635b5..74311a0 100644 --- a/src/future/future/shared.rs +++ b/src/future/future/shared.rs @@ -126,6 +126,32 @@ where } None } + + /// Gets the number of strong pointers to this allocation. + /// + /// Returns [`None`] if it has already been polled to completion. + /// + /// # Safety + /// + /// This method by itself is safe, but using it correctly requires extra care. Another thread + /// can change the strong count at any time, including potentially between calling this method + /// and acting on the result. + pub fn strong_count(&self) -> Option<usize> { + self.inner.as_ref().map(|arc| Arc::strong_count(arc)) + } + + /// Gets the number of weak pointers to this allocation. + /// + /// Returns [`None`] if it has already been polled to completion. + /// + /// # Safety + /// + /// This method by itself is safe, but using it correctly requires extra care. Another thread + /// can change the weak count at any time, including potentially between calling this method + /// and acting on the result. + pub fn weak_count(&self) -> Option<usize> { + self.inner.as_ref().map(|arc| Arc::weak_count(arc)) + } } impl<Fut> Inner<Fut> @@ -287,10 +313,8 @@ where // Wake all tasks and drop the slab let mut wakers_guard = inner.notifier.wakers.lock().unwrap(); let mut wakers = wakers_guard.take().unwrap(); - for opt_waker in wakers.drain() { - if let Some(waker) = opt_waker { - waker.wake(); - } + for waker in wakers.drain().flatten() { + waker.wake(); } drop(_reset); // Make borrow checker happy diff --git a/src/future/join.rs b/src/future/join.rs index cfe53a7..a818343 100644 --- a/src/future/join.rs +++ b/src/future/join.rs @@ -1,14 +1,13 @@ #![allow(non_snake_case)] -use crate::future::{MaybeDone, maybe_done}; +use super::assert_future; +use crate::future::{maybe_done, MaybeDone}; use core::fmt; use core::pin::Pin; -use futures_core::future::{Future, FusedFuture}; +use futures_core::future::{FusedFuture, Future}; use futures_core::task::{Context, Poll}; use pin_project_lite::pin_project; -use super::assert_future; - macro_rules! generate { ($( $(#[$doc:meta])* @@ -144,7 +143,8 @@ where Fut2: Future, Fut3: Future, { - Join3::new(future1, future2, future3) + let f = Join3::new(future1, future2, future3); + assert_future::<(Fut1::Output, Fut2::Output, Fut3::Output), _>(f) } /// Same as [`join`](join()), but with more futures. @@ -176,7 +176,8 @@ where Fut3: Future, Fut4: Future, { - Join4::new(future1, future2, future3, future4) + let f = Join4::new(future1, future2, future3, future4); + assert_future::<(Fut1::Output, Fut2::Output, Fut3::Output, Fut4::Output), _>(f) } /// Same as [`join`](join()), but with more futures. @@ -211,5 +212,15 @@ where Fut4: Future, Fut5: Future, { - Join5::new(future1, future2, future3, future4, future5) + let f = Join5::new(future1, future2, future3, future4, future5); + assert_future::< + ( + Fut1::Output, + Fut2::Output, + Fut3::Output, + Fut4::Output, + Fut5::Output, + ), + _, + >(f) } diff --git a/src/future/join_all.rs b/src/future/join_all.rs index 0c8357c..7ccf869 100644 --- a/src/future/join_all.rs +++ b/src/future/join_all.rs @@ -10,7 +10,7 @@ use core::task::{Context, Poll}; use alloc::boxed::Box; use alloc::vec::Vec; -use super::MaybeDone; +use super::{MaybeDone, assert_future}; fn iter_pin_mut<T>(slice: Pin<&mut [T]>) -> impl Iterator<Item = Pin<&mut T>> { // Safety: `std` _could_ make this unsound if it were to decide Pin's @@ -85,7 +85,7 @@ where I::Item: Future, { let elems: Box<[_]> = i.into_iter().map(MaybeDone::Future).collect(); - JoinAll { elems: elems.into() } + assert_future::<Vec<<I::Item as Future>::Output>, _>(JoinAll { elems: elems.into() }) } impl<F> Future for JoinAll<F> diff --git a/src/future/lazy.rs b/src/future/lazy.rs index 409717a..42812d3 100644 --- a/src/future/lazy.rs +++ b/src/future/lazy.rs @@ -1,3 +1,4 @@ +use super::assert_future; use core::pin::Pin; use futures_core::future::{FusedFuture, Future}; use futures_core::task::{Context, Poll}; @@ -34,7 +35,7 @@ impl<F> Unpin for Lazy<F> {} pub fn lazy<F, R>(f: F) -> Lazy<F> where F: FnOnce(&mut Context<'_>) -> R, { - Lazy { f: Some(f) } + assert_future::<R, _>(Lazy { f: Some(f) }) } impl<F, R> FusedFuture for Lazy<F> diff --git a/src/future/maybe_done.rs b/src/future/maybe_done.rs index bb5579e..26e6c27 100644 --- a/src/future/maybe_done.rs +++ b/src/future/maybe_done.rs @@ -1,5 +1,6 @@ //! Definition of the MaybeDone combinator +use super::assert_future; use core::mem; use core::pin::Pin; use futures_core::future::{FusedFuture, Future}; @@ -40,7 +41,7 @@ impl<Fut: Future + Unpin> Unpin for MaybeDone<Fut> {} /// # }); /// ``` pub fn maybe_done<Fut: Future>(future: Fut) -> MaybeDone<Fut> { - MaybeDone::Future(future) + assert_future::<(), _>(MaybeDone::Future(future)) } impl<Fut: Future> MaybeDone<Fut> { diff --git a/src/future/mod.rs b/src/future/mod.rs index ab29823..84e457c 100644 --- a/src/future/mod.rs +++ b/src/future/mod.rs @@ -9,9 +9,12 @@ //! from a closure that defines its return value, and [`ready`](ready()), //! which constructs a future with an immediate defined value. +#[doc(no_inline)] +pub use core::future::Future; + #[cfg(feature = "alloc")] pub use futures_core::future::{BoxFuture, LocalBoxFuture}; -pub use futures_core::future::{FusedFuture, Future, TryFuture}; +pub use futures_core::future::{FusedFuture, TryFuture}; pub use futures_task::{FutureObj, LocalFutureObj, UnsafeFutureObj}; // Extension traits and combinators diff --git a/src/future/pending.rs b/src/future/pending.rs index 5a7bbb8..4311b9a 100644 --- a/src/future/pending.rs +++ b/src/future/pending.rs @@ -1,3 +1,4 @@ +use super::assert_future; use core::marker; use core::pin::Pin; use futures_core::future::{FusedFuture, Future}; @@ -33,9 +34,9 @@ impl<T> FusedFuture for Pending<T> { /// # }); /// ``` pub fn pending<T>() -> Pending<T> { - Pending { + assert_future::<T, _>(Pending { _data: marker::PhantomData, - } + }) } impl<T> Future for Pending<T> { diff --git a/src/future/poll_fn.rs b/src/future/poll_fn.rs index b7b10be..6ac1ab8 100644 --- a/src/future/poll_fn.rs +++ b/src/future/poll_fn.rs @@ -1,5 +1,6 @@ //! Definition of the `PollFn` adapter combinator +use super::assert_future; use core::fmt; use core::pin::Pin; use futures_core::future::Future; @@ -36,7 +37,7 @@ pub fn poll_fn<T, F>(f: F) -> PollFn<F> where F: FnMut(&mut Context<'_>) -> Poll<T> { - PollFn { f } + assert_future::<T, _>(PollFn { f }) } impl<F> fmt::Debug for PollFn<F> { diff --git a/src/future/ready.rs b/src/future/ready.rs index 35f01c9..e3d791b 100644 --- a/src/future/ready.rs +++ b/src/future/ready.rs @@ -1,3 +1,4 @@ +use super::assert_future; use core::pin::Pin; use futures_core::future::{FusedFuture, Future}; use futures_core::task::{Context, Poll}; @@ -45,7 +46,7 @@ impl<T> Future for Ready<T> { /// # }); /// ``` pub fn ready<T>(t: T) -> Ready<T> { - Ready(Some(t)) + assert_future::<T, _>(Ready(Some(t))) } /// Create a future that is immediately ready with a success value. diff --git a/src/future/select.rs b/src/future/select.rs index bc24779..043ed17 100644 --- a/src/future/select.rs +++ b/src/future/select.rs @@ -1,3 +1,4 @@ +use super::assert_future; use core::pin::Pin; use futures_core::future::{Future, FusedFuture}; use futures_core::task::{Context, Poll}; @@ -31,25 +32,33 @@ impl<A: Unpin, B: Unpin> Unpin for Select<A, B> {} /// /// ``` /// # futures::executor::block_on(async { -/// use futures::future::{self, Either}; -/// use futures::pin_mut; -/// -/// // These two futures have different types even though their outputs have the same type -/// let future1 = async { 1 }; -/// let future2 = async { 2 }; +/// use futures::{ +/// pin_mut, +/// future::Either, +/// future::self, +/// }; +/// +/// // These two futures have different types even though their outputs have the same type. +/// let future1 = async { +/// future::pending::<()>().await; // will never finish +/// 1 +/// }; +/// let future2 = async { +/// future::ready(2).await +/// }; /// /// // 'select' requires Future + Unpin bounds /// pin_mut!(future1); /// pin_mut!(future2); /// /// let value = match future::select(future1, future2).await { -/// Either::Left((value1, _)) => value1, // `value1` is resolved from `future1` -/// // `_` represents `future2` +/// Either::Left((value1, _)) => value1, // `value1` is resolved from `future1` +/// // `_` represents `future2` /// Either::Right((value2, _)) => value2, // `value2` is resolved from `future2` /// // `_` represents `future1` /// }; /// -/// assert!(value == 1 || value == 2); +/// assert!(value == 2); /// # }); /// ``` /// @@ -75,7 +84,7 @@ impl<A: Unpin, B: Unpin> Unpin for Select<A, B> {} pub fn select<A, B>(future1: A, future2: B) -> Select<A, B> where A: Future + Unpin, B: Future + Unpin { - Select { inner: Some((future1, future2)) } + assert_future::<Either<(A::Output, B), (B::Output, A)>, _>(Select { inner: Some((future1, future2)) }) } impl<A, B> Future for Select<A, B> diff --git a/src/future/select_all.rs b/src/future/select_all.rs index 9f7fb24..0db90a7 100644 --- a/src/future/select_all.rs +++ b/src/future/select_all.rs @@ -1,3 +1,4 @@ +use super::assert_future; use crate::future::FutureExt; use core::iter::FromIterator; use core::mem; @@ -38,7 +39,7 @@ pub fn select_all<I>(iter: I) -> SelectAll<I::Item> inner: iter.into_iter().collect() }; assert!(!ret.inner.is_empty()); - ret + assert_future::<(<I::Item as Future>::Output, usize, Vec<I::Item>), _>(ret) } impl<Fut: Future + Unpin> Future for SelectAll<Fut> { diff --git a/src/future/select_ok.rs b/src/future/select_ok.rs index 7f4f4d6..52d393c 100644 --- a/src/future/select_ok.rs +++ b/src/future/select_ok.rs @@ -1,3 +1,4 @@ +use super::assert_future; use crate::future::TryFutureExt; use core::iter::FromIterator; use core::mem; @@ -36,7 +37,7 @@ pub fn select_ok<I>(iter: I) -> SelectOk<I::Item> inner: iter.into_iter().collect() }; assert!(!ret.inner.is_empty(), "iterator provided to select_ok was empty"); - ret + assert_future::<Result<(<I::Item as TryFuture>::Ok, Vec<I::Item>), <I::Item as TryFuture>::Error>, _>(ret) } impl<Fut: TryFuture + Unpin> Future for SelectOk<Fut> { diff --git a/src/future/try_future/mod.rs b/src/future/try_future/mod.rs index 1ce01d2..fb3bdd8 100644 --- a/src/future/try_future/mod.rs +++ b/src/future/try_future/mod.rs @@ -173,7 +173,7 @@ pub trait TryFutureExt: TryFuture { Self::Ok: Sink<Item, Error = Self::Error>, Self: Sized, { - FlattenSink::new(self) + crate::sink::assert_sink::<Item, Self::Error, _>(FlattenSink::new(self)) } /// Maps this future's success value to a different value. @@ -501,7 +501,7 @@ pub trait TryFutureExt: TryFuture { Self::Ok: TryFuture<Error = Self::Error>, Self: Sized, { - TryFlatten::new(self) + assert_future::<Result<<Self::Ok as TryFuture>::Ok, Self::Error>, _>(TryFlatten::new(self)) } /// Flatten the execution of this future when the successful result of this @@ -539,7 +539,7 @@ pub trait TryFutureExt: TryFuture { )) } - /// Unwraps this future's ouput, producing a future with this future's + /// Unwraps this future's output, producing a future with this future's /// [`Ok`](TryFuture::Ok) type as its /// [`Output`](std::future::Future::Output) type. /// @@ -569,8 +569,8 @@ pub trait TryFutureExt: TryFuture { assert_future::<Self::Ok, _>(UnwrapOrElse::new(self, f)) } - /// Wraps a [`TryFuture`] into a future compatable with libraries using - /// futures 0.1 future definitons. Requires the `compat` feature to enable. + /// Wraps a [`TryFuture`] into a future compatible with libraries using + /// futures 0.1 future definitions. Requires the `compat` feature to enable. #[cfg(feature = "compat")] #[cfg_attr(docsrs, doc(cfg(feature = "compat")))] fn compat(self) -> Compat<Self> diff --git a/src/future/try_join.rs b/src/future/try_join.rs index 25ccdde..6af1f0c 100644 --- a/src/future/try_join.rs +++ b/src/future/try_join.rs @@ -1,6 +1,6 @@ #![allow(non_snake_case)] -use crate::future::{TryMaybeDone, try_maybe_done}; +use crate::future::{assert_future, try_maybe_done, TryMaybeDone}; use core::fmt; use core::pin::Pin; use futures_core::future::{Future, TryFuture}; @@ -108,7 +108,7 @@ generate! { /// /// This function will return a new future which awaits both futures to /// complete. If successful, the returned future will finish with a tuple of -/// both results. If unsuccesful, it will complete with the first error +/// both results. If unsuccessful, it will complete with the first error /// encountered. /// /// Note that this function consumes the passed futures and returns a @@ -150,7 +150,7 @@ where Fut1: TryFuture, Fut2: TryFuture<Error = Fut1::Error>, { - TryJoin::new(future1, future2) + assert_future::<Result<(Fut1::Ok, Fut2::Ok), Fut1::Error>, _>(TryJoin::new(future1, future2)) } /// Same as [`try_join`](try_join()), but with more futures. @@ -179,7 +179,9 @@ where Fut2: TryFuture<Error = Fut1::Error>, Fut3: TryFuture<Error = Fut1::Error>, { - TryJoin3::new(future1, future2, future3) + assert_future::<Result<(Fut1::Ok, Fut2::Ok, Fut3::Ok), Fut1::Error>, _>(TryJoin3::new( + future1, future2, future3, + )) } /// Same as [`try_join`](try_join()), but with more futures. @@ -211,7 +213,9 @@ where Fut3: TryFuture<Error = Fut1::Error>, Fut4: TryFuture<Error = Fut1::Error>, { - TryJoin4::new(future1, future2, future3, future4) + assert_future::<Result<(Fut1::Ok, Fut2::Ok, Fut3::Ok, Fut4::Ok), Fut1::Error>, _>( + TryJoin4::new(future1, future2, future3, future4), + ) } /// Same as [`try_join`](try_join()), but with more futures. @@ -246,5 +250,7 @@ where Fut4: TryFuture<Error = Fut1::Error>, Fut5: TryFuture<Error = Fut1::Error>, { - TryJoin5::new(future1, future2, future3, future4, future5) + assert_future::<Result<(Fut1::Ok, Fut2::Ok, Fut3::Ok, Fut4::Ok, Fut5::Ok), Fut1::Error>, _>( + TryJoin5::new(future1, future2, future3, future4, future5), + ) } diff --git a/src/future/try_join_all.rs b/src/future/try_join_all.rs index 4de0a79..371f753 100644 --- a/src/future/try_join_all.rs +++ b/src/future/try_join_all.rs @@ -10,7 +10,7 @@ use core::task::{Context, Poll}; use alloc::boxed::Box; use alloc::vec::Vec; -use super::{TryFuture, TryMaybeDone}; +use super::{assert_future, TryFuture, TryMaybeDone}; fn iter_pin_mut<T>(slice: Pin<&mut [T]>) -> impl Iterator<Item = Pin<&mut T>> { // Safety: `std` _could_ make this unsound if it were to decide Pin's @@ -93,9 +93,9 @@ where I::Item: TryFuture, { let elems: Box<[_]> = i.into_iter().map(TryMaybeDone::Future).collect(); - TryJoinAll { + assert_future::<Result<Vec<<I::Item as TryFuture>::Ok>, <I::Item as TryFuture>::Error>, _>(TryJoinAll { elems: elems.into(), - } + }) } impl<F> Future for TryJoinAll<F> diff --git a/src/future/try_maybe_done.rs b/src/future/try_maybe_done.rs index 90067e9..dfd2900 100644 --- a/src/future/try_maybe_done.rs +++ b/src/future/try_maybe_done.rs @@ -1,5 +1,6 @@ //! Definition of the TryMaybeDone combinator +use super::assert_future; use core::mem; use core::pin::Pin; use futures_core::future::{FusedFuture, Future, TryFuture}; @@ -25,7 +26,7 @@ impl<Fut: TryFuture + Unpin> Unpin for TryMaybeDone<Fut> {} /// Wraps a future into a `TryMaybeDone` pub fn try_maybe_done<Fut: TryFuture>(future: Fut) -> TryMaybeDone<Fut> { - TryMaybeDone::Future(future) + assert_future::<Result<(), Fut::Error>, _>(TryMaybeDone::Future(future)) } impl<Fut: TryFuture> TryMaybeDone<Fut> { diff --git a/src/future/try_select.rs b/src/future/try_select.rs index 56564f5..b26eed3 100644 --- a/src/future/try_select.rs +++ b/src/future/try_select.rs @@ -50,7 +50,10 @@ impl<A: Unpin, B: Unpin> Unpin for TrySelect<A, B> {} pub fn try_select<A, B>(future1: A, future2: B) -> TrySelect<A, B> where A: TryFuture + Unpin, B: TryFuture + Unpin { - TrySelect { inner: Some((future1, future2)) } + super::assert_future::<Result< + Either<(A::Ok, B), (B::Ok, A)>, + Either<(A::Error, B), (B::Error, A)>, + >, _>(TrySelect { inner: Some((future1, future2)) }) } impl<A: Unpin, B: Unpin> Future for TrySelect<A, B> diff --git a/src/io/cursor.rs b/src/io/cursor.rs index b11dbf5..084fb08 100644 --- a/src/io/cursor.rs +++ b/src/io/cursor.rs @@ -13,7 +13,7 @@ use std::pin::Pin; /// allowing these buffers to be used anywhere you might use a reader or writer /// that does actual I/O. /// -/// The standard library implements some I/O traits on various types which +/// This library implements some I/O traits on various types which /// are commonly used as a buffer, like `Cursor<`[`Vec`]`<u8>>` and /// `Cursor<`[`&[u8]`][bytes]`>`. /// diff --git a/src/io/mod.rs b/src/io/mod.rs index a7e2add..1437930 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -19,15 +19,20 @@ #[cfg(feature = "io-compat")] #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))] use crate::compat::Compat; +use crate::future::assert_future; +use crate::stream::assert_stream; use std::{ptr, pin::Pin}; -pub use futures_io::{ - AsyncRead, AsyncWrite, AsyncSeek, AsyncBufRead, Error, ErrorKind, - IoSlice, IoSliceMut, Result, SeekFrom, -}; +// Re-export some types from `std::io` so that users don't have to deal +// with conflicts when `use`ing `futures::io` and `std::io`. +#[doc(no_inline)] +pub use std::io::{Error, ErrorKind, IoSlice, IoSliceMut, Result, SeekFrom}; +#[doc(no_inline)] #[cfg(feature = "read-initializer")] #[cfg_attr(docsrs, doc(cfg(feature = "read-initializer")))] -pub use futures_io::Initializer; +pub use std::io::Initializer; + +pub use futures_io::{AsyncRead, AsyncWrite, AsyncSeek, AsyncBufRead}; // used by `BufReader` and `BufWriter` // https://github.com/rust-lang/rust/blob/master/src/libstd/sys_common/io.rs#L1 @@ -173,7 +178,7 @@ pub trait AsyncReadExt: AsyncRead { Self: Sized, R: AsyncRead, { - Chain::new(self, next) + assert_read(Chain::new(self, next)) } /// Tries to read some bytes directly into the given `buf` in asynchronous @@ -203,7 +208,7 @@ pub trait AsyncReadExt: AsyncRead { fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Read<'a, Self> where Self: Unpin, { - Read::new(self, buf) + assert_future::<Result<usize>, _>(Read::new(self, buf)) } /// Creates a future which will read from the `AsyncRead` into `bufs` using vectored @@ -214,7 +219,7 @@ pub trait AsyncReadExt: AsyncRead { fn read_vectored<'a>(&'a mut self, bufs: &'a mut [IoSliceMut<'a>]) -> ReadVectored<'a, Self> where Self: Unpin, { - ReadVectored::new(self, bufs) + assert_future::<Result<usize>, _>(ReadVectored::new(self, bufs)) } /// Creates a future which will read exactly enough bytes to fill `buf`, @@ -260,7 +265,7 @@ pub trait AsyncReadExt: AsyncRead { ) -> ReadExact<'a, Self> where Self: Unpin, { - ReadExact::new(self, buf) + assert_future::<Result<()>, _>(ReadExact::new(self, buf)) } /// Creates a future which will read all the bytes from this `AsyncRead`. @@ -288,7 +293,7 @@ pub trait AsyncReadExt: AsyncRead { ) -> ReadToEnd<'a, Self> where Self: Unpin, { - ReadToEnd::new(self, buf) + assert_future::<Result<usize>, _>(ReadToEnd::new(self, buf)) } /// Creates a future which will read all the bytes from this `AsyncRead`. @@ -316,7 +321,7 @@ pub trait AsyncReadExt: AsyncRead { ) -> ReadToString<'a, Self> where Self: Unpin, { - ReadToString::new(self, buf) + assert_future::<Result<usize>, _>(ReadToString::new(self, buf)) } /// Helper method for splitting this read/write object into two halves. @@ -351,7 +356,8 @@ pub trait AsyncReadExt: AsyncRead { fn split(self) -> (ReadHalf<Self>, WriteHalf<Self>) where Self: AsyncWrite + Sized, { - split::split(self) + let (r, w) = split::split(self); + (assert_read(r), assert_write(w)) } /// Creates an AsyncRead adapter which will read at most `limit` bytes @@ -376,7 +382,7 @@ pub trait AsyncReadExt: AsyncRead { fn take(self, limit: u64) -> Take<Self> where Self: Sized { - Take::new(self, limit) + assert_read(Take::new(self, limit)) } /// Wraps an [`AsyncRead`] in a compatibility wrapper that allows it to be @@ -423,14 +429,14 @@ pub trait AsyncWriteExt: AsyncWrite { fn flush(&mut self) -> Flush<'_, Self> where Self: Unpin, { - Flush::new(self) + assert_future::<Result<()>, _>(Flush::new(self)) } /// Creates a future which will entirely close this `AsyncWrite`. fn close(&mut self) -> Close<'_, Self> where Self: Unpin, { - Close::new(self) + assert_future::<Result<()>, _>(Close::new(self)) } /// Creates a future which will write bytes from `buf` into the object. @@ -440,7 +446,7 @@ pub trait AsyncWriteExt: AsyncWrite { fn write<'a>(&'a mut self, buf: &'a [u8]) -> Write<'a, Self> where Self: Unpin, { - Write::new(self, buf) + assert_future::<Result<usize>, _>(Write::new(self, buf)) } /// Creates a future which will write bytes from `bufs` into the object using vectored @@ -451,7 +457,7 @@ pub trait AsyncWriteExt: AsyncWrite { fn write_vectored<'a>(&'a mut self, bufs: &'a [IoSlice<'a>]) -> WriteVectored<'a, Self> where Self: Unpin, { - WriteVectored::new(self, bufs) + assert_future::<Result<usize>, _>(WriteVectored::new(self, bufs)) } /// Write data into this object. @@ -477,7 +483,7 @@ pub trait AsyncWriteExt: AsyncWrite { fn write_all<'a>(&'a mut self, buf: &'a [u8]) -> WriteAll<'a, Self> where Self: Unpin, { - WriteAll::new(self, buf) + assert_future::<Result<()>, _>(WriteAll::new(self, buf)) } /// Attempts to write multiple buffers into this writer. @@ -532,7 +538,7 @@ pub trait AsyncWriteExt: AsyncWrite { where Self: Unpin, { - WriteAllVectored::new(self, bufs) + assert_future::<Result<()>, _>(WriteAllVectored::new(self, bufs)) } /// Wraps an [`AsyncWrite`] in a compatibility wrapper that allows it to be @@ -577,7 +583,7 @@ pub trait AsyncWriteExt: AsyncWrite { fn into_sink<Item: AsRef<[u8]>>(self) -> IntoSink<Self, Item> where Self: Sized, { - IntoSink::new(self) + crate::sink::assert_sink::<Item, Error, _>(IntoSink::new(self)) } } @@ -593,7 +599,7 @@ pub trait AsyncSeekExt: AsyncSeek { fn seek(&mut self, pos: SeekFrom) -> Seek<'_, Self> where Self: Unpin, { - Seek::new(self, pos) + assert_future::<Result<u64>, _>(Seek::new(self, pos)) } } @@ -627,7 +633,7 @@ pub trait AsyncBufReadExt: AsyncBufRead { fn fill_buf(&mut self) -> FillBuf<'_, Self> where Self: Unpin, { - FillBuf::new(self) + assert_future::<Result<&[u8]>, _>(FillBuf::new(self)) } /// A convenience for calling [`AsyncBufRead::consume`] on [`Unpin`] IO types. @@ -701,7 +707,7 @@ pub trait AsyncBufReadExt: AsyncBufRead { ) -> ReadUntil<'a, Self> where Self: Unpin, { - ReadUntil::new(self, byte, buf) + assert_future::<Result<usize>, _>(ReadUntil::new(self, byte, buf)) } /// Creates a future which will read all the bytes associated with this I/O @@ -758,7 +764,7 @@ pub trait AsyncBufReadExt: AsyncBufRead { fn read_line<'a>(&'a mut self, buf: &'a mut String) -> ReadLine<'a, Self> where Self: Unpin, { - ReadLine::new(self, buf) + assert_future::<Result<usize>, _>(ReadLine::new(self, buf)) } /// Returns a stream over the lines of this reader. @@ -796,8 +802,25 @@ pub trait AsyncBufReadExt: AsyncBufRead { fn lines(self) -> Lines<Self> where Self: Sized, { - Lines::new(self) + assert_stream::<Result<String>, _>(Lines::new(self)) } } impl<R: AsyncBufRead + ?Sized> AsyncBufReadExt for R {} + +// Just a helper function to ensure the reader we're returning all have the +// right implementations. +pub(crate) fn assert_read<R>(reader: R) -> R +where + R: AsyncRead, +{ + reader +} +// Just a helper function to ensure the writer we're returning all have the +// right implementations. +pub(crate) fn assert_write<W>(writer: W) -> W +where + W: AsyncWrite, +{ + writer +} diff --git a/src/sink/close.rs b/src/sink/close.rs index 4421d10..4fc99f5 100644 --- a/src/sink/close.rs +++ b/src/sink/close.rs @@ -16,7 +16,7 @@ impl<Si: Unpin + ?Sized, Item> Unpin for Close<'_, Si, Item> {} /// A future that completes when the sink has finished closing. /// -/// The sink itself is returned after closeing is complete. +/// The sink itself is returned after closing is complete. impl<'a, Si: Sink<Item> + Unpin + ?Sized, Item> Close<'a, Si, Item> { pub(super) fn new(sink: &'a mut Si) -> Self { Self { diff --git a/src/sink/drain.rs b/src/sink/drain.rs index 46de83b..33c5b31 100644 --- a/src/sink/drain.rs +++ b/src/sink/drain.rs @@ -1,3 +1,4 @@ +use super::assert_sink; use crate::never::Never; use core::marker::PhantomData; use core::pin::Pin; @@ -26,7 +27,7 @@ pub struct Drain<T> { /// # Ok::<(), futures::never::Never>(()) }).unwrap(); /// ``` pub fn drain<T>() -> Drain<T> { - Drain { marker: PhantomData } + assert_sink::<T, Never, _>(Drain { marker: PhantomData }) } impl<T> Unpin for Drain<T> {} diff --git a/src/sink/mod.rs b/src/sink/mod.rs index 1a062d0..e5b515b 100644 --- a/src/sink/mod.rs +++ b/src/sink/mod.rs @@ -6,7 +6,7 @@ //! - The [`SinkExt`] trait, which provides adapters for chaining and composing //! sinks. -use crate::future::Either; +use crate::future::{assert_future, Either}; use core::pin::Pin; use futures_core::future::Future; use futures_core::stream::{Stream, TryStream}; @@ -81,7 +81,7 @@ pub trait SinkExt<Item>: Sink<Item> { E: From<Self::Error>, Self: Sized, { - With::new(self, f) + assert_sink::<U, E, _>(With::new(self, f)) } /// Composes a function *in front of* the sink. @@ -122,7 +122,7 @@ pub trait SinkExt<Item>: Sink<Item> { St: Stream<Item = Result<Item, Self::Error>>, Self: Sized, { - WithFlatMap::new(self, f) + assert_sink::<U, Self::Error, _>(WithFlatMap::new(self, f)) } /* @@ -145,7 +145,7 @@ pub trait SinkExt<Item>: Sink<Item> { F: FnOnce(Self::Error) -> E, Self: Sized, { - SinkMapErr::new(self, f) + assert_sink::<Item, E, _>(SinkMapErr::new(self, f)) } /// Map this sink's error to a different error type using the `Into` trait. @@ -156,7 +156,7 @@ pub trait SinkExt<Item>: Sink<Item> { Self: Sized, Self::Error: Into<E>, { - SinkErrInto::new(self) + assert_sink::<Item, E, _>(SinkErrInto::new(self)) } /// Adds a fixed-size buffer to the current sink. @@ -176,7 +176,7 @@ pub trait SinkExt<Item>: Sink<Item> { where Self: Sized, { - Buffer::new(self, capacity) + assert_sink::<Item, Self::Error, _>(Buffer::new(self, capacity)) } /// Close the sink. @@ -184,7 +184,7 @@ pub trait SinkExt<Item>: Sink<Item> { where Self: Unpin, { - Close::new(self) + assert_future::<Result<(), Self::Error>, _>(Close::new(self)) } /// Fanout items to multiple sinks. @@ -197,7 +197,7 @@ pub trait SinkExt<Item>: Sink<Item> { Item: Clone, Si: Sink<Item, Error = Self::Error>, { - Fanout::new(self, other) + assert_sink::<Item, Self::Error, _>(Fanout::new(self, other)) } /// Flush the sink, processing all pending items. @@ -208,7 +208,7 @@ pub trait SinkExt<Item>: Sink<Item> { where Self: Unpin, { - Flush::new(self) + assert_future::<Result<(), Self::Error>, _>(Flush::new(self)) } /// A future that completes after the given item has been fully processed @@ -221,7 +221,7 @@ pub trait SinkExt<Item>: Sink<Item> { where Self: Unpin, { - Send::new(self, item) + assert_future::<Result<(), Self::Error>, _>(Send::new(self, item)) } /// A future that completes after the given item has been received @@ -231,9 +231,10 @@ pub trait SinkExt<Item>: Sink<Item> { /// It is the caller's responsibility to ensure all pending items /// are processed, which can be done via `flush` or `close`. fn feed(&mut self, item: Item) -> Feed<'_, Self, Item> - where Self: Unpin, + where + Self: Unpin, { - Feed::new(self, item) + assert_future::<Result<(), Self::Error>, _>(Feed::new(self, item)) } /// A future that completes after the given stream has been fully processed @@ -250,8 +251,11 @@ pub trait SinkExt<Item>: Sink<Item> { fn send_all<'a, St>(&'a mut self, stream: &'a mut St) -> SendAll<'a, Self, St> where St: TryStream<Ok = Item, Error = Self::Error> + Stream + Unpin + ?Sized, + // St: Stream<Item = Result<Item, Self::Error>> + Unpin + ?Sized, Self: Unpin, { + // TODO: type mismatch resolving `<St as Stream>::Item == std::result::Result<Item, <Self as futures_sink::Sink<Item>>::Error>` + // assert_future::<Result<(), Self::Error>, _>(SendAll::new(self, stream)) SendAll::new(self, stream) } @@ -265,7 +269,7 @@ pub trait SinkExt<Item>: Sink<Item> { Si2: Sink<Item, Error = Self::Error>, Self: Sized, { - Either::Left(self) + assert_sink::<Item, Self::Error, _>(Either::Left(self)) } /// Wrap this stream in an `Either` stream, making it the right-hand variant @@ -278,7 +282,7 @@ pub trait SinkExt<Item>: Sink<Item> { Si1: Sink<Item, Error = Self::Error>, Self: Sized, { - Either::Right(self) + assert_sink::<Item, Self::Error, _>(Either::Right(self)) } /// Wraps a [`Sink`] into a sink compatible with libraries using @@ -328,3 +332,12 @@ pub trait SinkExt<Item>: Sink<Item> { Pin::new(self).poll_close(cx) } } + +// Just a helper function to ensure the sinks we're returning all have the +// right implementations. +pub(crate) fn assert_sink<T, E, S>(sink: S) -> S +where + S: Sink<T, Error = E>, +{ + sink +} diff --git a/src/sink/unfold.rs b/src/sink/unfold.rs index 1aab200..3903716 100644 --- a/src/sink/unfold.rs +++ b/src/sink/unfold.rs @@ -1,3 +1,4 @@ +use super::assert_sink; use crate::unfold_state::UnfoldState; use core::{future::Future, pin::Pin}; use futures_core::ready; @@ -40,10 +41,10 @@ where F: FnMut(T, Item) -> R, R: Future<Output = Result<T, E>>, { - Unfold { + assert_sink::<Item, E, _>(Unfold { function, state: UnfoldState::Value { value: init }, - } + }) } impl<T, F, R, Item, E> Sink<Item> for Unfold<T, F, R> diff --git a/src/stream/empty.rs b/src/stream/empty.rs index d228b31..c629a4b 100644 --- a/src/stream/empty.rs +++ b/src/stream/empty.rs @@ -1,3 +1,4 @@ +use super::assert_stream; use core::marker::PhantomData; use core::pin::Pin; use futures_core::stream::{FusedStream, Stream}; @@ -14,9 +15,9 @@ pub struct Empty<T> { /// /// The returned stream will always return `Ready(None)` when polled. pub fn empty<T>() -> Empty<T> { - Empty { + assert_stream::<T, _>(Empty { _phantom: PhantomData - } + }) } impl<T> Unpin for Empty<T> {} diff --git a/src/stream/futures_unordered/mod.rs b/src/stream/futures_unordered/mod.rs index 37b7d7e..8dcc551 100644 --- a/src/stream/futures_unordered/mod.rs +++ b/src/stream/futures_unordered/mod.rs @@ -30,22 +30,6 @@ use self::task::Task; mod ready_to_run_queue; use self::ready_to_run_queue::{ReadyToRunQueue, Dequeue}; -/// Constant used for a `FuturesUnordered` to determine how many times it is -/// allowed to poll underlying futures without yielding. -/// -/// A single call to `poll_next` may potentially do a lot of work before -/// yielding. This happens in particular if the underlying futures are awoken -/// frequently but continue to return `Pending`. This is problematic if other -/// tasks are waiting on the executor, since they do not get to run. This value -/// caps the number of calls to `poll` on underlying futures a single call to -/// `poll_next` is allowed to make. -/// -/// The value itself is chosen somewhat arbitrarily. It needs to be high enough -/// that amortize wakeup and scheduling costs, but low enough that we do not -/// starve other tasks for long. -/// -/// See also https://github.com/rust-lang/futures-rs/issues/2047. -const YIELD_EVERY: usize = 32; /// A set of futures which may complete in any order. /// @@ -414,6 +398,22 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + // Variable to determine how many times it is allowed to poll underlying + // futures without yielding. + // + // A single call to `poll_next` may potentially do a lot of work before + // yielding. This happens in particular if the underlying futures are awoken + // frequently but continue to return `Pending`. This is problematic if other + // tasks are waiting on the executor, since they do not get to run. This value + // caps the number of calls to `poll` on underlying futures a single call to + // `poll_next` is allowed to make. + // + // The value is the length of FuturesUnordered. This ensures that each + // future is polled only once at most per iteration. + // + // See also https://github.com/rust-lang/futures-rs/issues/2047. + let yield_every = self.len(); + // Keep track of how many child futures we have polled, // in case we want to forcibly yield. let mut polled = 0; @@ -548,7 +548,7 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> { let task = bomb.task.take().unwrap(); bomb.queue.link(task); - if polled == YIELD_EVERY { + if polled == yield_every { // We have polled a large number of futures in a row without yielding. // To ensure we do not starve other tasks waiting on the executor, // we yield here, but immediately wake ourselves up to continue. diff --git a/src/stream/iter.rs b/src/stream/iter.rs index cab8cd8..033dae1 100644 --- a/src/stream/iter.rs +++ b/src/stream/iter.rs @@ -1,3 +1,4 @@ +use super::assert_stream; use core::pin::Pin; use futures_core::stream::Stream; use futures_core::task::{Context, Poll}; @@ -28,9 +29,9 @@ impl<I> Unpin for Iter<I> {} pub fn iter<I>(i: I) -> Iter<I::IntoIter> where I: IntoIterator, { - Iter { + assert_stream::<I::Item, _>(Iter { iter: i.into_iter(), - } + }) } impl<I> Stream for Iter<I> diff --git a/src/stream/mod.rs b/src/stream/mod.rs index a5624ba..f3b2baa 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -109,11 +109,11 @@ cfg_target_has_atomic! { pub use self::select_all::{select_all, SelectAll}; } -// Just a helper function to ensure the futures we're returning all have the +// Just a helper function to ensure the streams we're returning all have the // right implementations. pub(crate) fn assert_stream<T, S>(stream: S) -> S - where - S: Stream<Item = T>, +where + S: Stream<Item = T>, { stream } diff --git a/src/stream/once.rs b/src/stream/once.rs index 318de07..e16fe00 100644 --- a/src/stream/once.rs +++ b/src/stream/once.rs @@ -1,3 +1,4 @@ +use super::assert_stream; use core::pin::Pin; use futures_core::future::Future; use futures_core::ready; @@ -17,7 +18,7 @@ use pin_project_lite::pin_project; /// # }); /// ``` pub fn once<Fut: Future>(future: Fut) -> Once<Fut> { - Once::new(future) + assert_stream::<Fut::Output, _>(Once::new(future)) } pin_project! { diff --git a/src/stream/pending.rs b/src/stream/pending.rs index ca793c1..d7030ff 100644 --- a/src/stream/pending.rs +++ b/src/stream/pending.rs @@ -1,3 +1,4 @@ +use super::assert_stream; use core::marker; use core::pin::Pin; use futures_core::stream::{FusedStream, Stream}; @@ -14,7 +15,7 @@ pub struct Pending<T> { /// /// The returned stream will always return `Pending` when polled. pub fn pending<T>() -> Pending<T> { - Pending { _data: marker::PhantomData } + assert_stream::<T, _>(Pending { _data: marker::PhantomData }) } impl<T> Unpin for Pending<T> {} diff --git a/src/stream/poll_fn.rs b/src/stream/poll_fn.rs index e33ca57..b9bd7d1 100644 --- a/src/stream/poll_fn.rs +++ b/src/stream/poll_fn.rs @@ -1,5 +1,6 @@ //! Definition of the `PollFn` combinator +use super::assert_stream; use core::fmt; use core::pin::Pin; use futures_core::stream::Stream; @@ -41,7 +42,7 @@ pub fn poll_fn<T, F>(f: F) -> PollFn<F> where F: FnMut(&mut Context<'_>) -> Poll<Option<T>>, { - PollFn { f } + assert_stream::<T, _>(PollFn { f }) } impl<T, F> Stream for PollFn<F> diff --git a/src/stream/repeat.rs b/src/stream/repeat.rs index 6a2637d..cf9f21b 100644 --- a/src/stream/repeat.rs +++ b/src/stream/repeat.rs @@ -1,3 +1,4 @@ +use super::assert_stream; use core::pin::Pin; use futures_core::stream::{Stream, FusedStream}; use futures_core::task::{Context, Poll}; @@ -26,7 +27,7 @@ pub struct Repeat<T> { pub fn repeat<T>(item: T) -> Repeat<T> where T: Clone { - Repeat { item } + assert_stream::<T, _>(Repeat { item }) } impl<T> Unpin for Repeat<T> {} diff --git a/src/stream/repeat_with.rs b/src/stream/repeat_with.rs index eb3313d..0255643 100644 --- a/src/stream/repeat_with.rs +++ b/src/stream/repeat_with.rs @@ -1,3 +1,4 @@ +use super::assert_stream; use core::pin::Pin; use futures_core::stream::{Stream, FusedStream}; use futures_core::task::{Context, Poll}; @@ -89,5 +90,5 @@ impl<A, F: FnMut() -> A> FusedStream for RepeatWith<F> /// # }); /// ``` pub fn repeat_with<A, F: FnMut() -> A>(repeater: F) -> RepeatWith<F> { - RepeatWith { repeater } + assert_stream::<A, _>(RepeatWith { repeater }) } diff --git a/src/stream/select.rs b/src/stream/select.rs index 2b7ebec..2942494 100644 --- a/src/stream/select.rs +++ b/src/stream/select.rs @@ -1,3 +1,4 @@ +use super::assert_stream; use crate::stream::{StreamExt, Fuse}; use core::pin::Pin; use futures_core::stream::{FusedStream, Stream}; @@ -31,11 +32,11 @@ pub fn select<St1, St2>(stream1: St1, stream2: St2) -> Select<St1, St2> where St1: Stream, St2: Stream<Item = St1::Item> { - Select { + assert_stream::<St1::Item, _>(Select { stream1: stream1.fuse(), stream2: stream2.fuse(), flag: false, - } + }) } impl<St1, St2> Select<St1, St2> { diff --git a/src/stream/select_all.rs b/src/stream/select_all.rs index 00368bb..c0b92fa 100644 --- a/src/stream/select_all.rs +++ b/src/stream/select_all.rs @@ -8,6 +8,7 @@ use futures_core::ready; use futures_core::stream::{Stream, FusedStream}; use futures_core::task::{Context, Poll}; +use super::assert_stream; use crate::stream::{StreamExt, StreamFuture, FuturesUnordered}; /// An unbounded set of streams @@ -124,7 +125,7 @@ pub fn select_all<I>(streams: I) -> SelectAll<I::Item> set.push(stream); } - set + assert_stream::<<I::Item as Stream>::Item, _>(set) } impl<St: Stream + Unpin> FromIterator<St> for SelectAll<St> { diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index b1b4384..c3340ec 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -4,8 +4,11 @@ //! including the `StreamExt` trait which adds methods to `Stream` types. use crate::future::{assert_future, Either}; +use crate::stream::assert_stream; #[cfg(feature = "alloc")] use alloc::boxed::Box; +#[cfg(feature = "alloc")] +use alloc::vec::Vec; use core::pin::Pin; #[cfg(feature = "sink")] use futures_core::stream::TryStream; @@ -19,7 +22,7 @@ use futures_core::{ #[cfg(feature = "sink")] use futures_sink::Sink; -use crate::fns::{InspectFn, inspect_fn}; +use crate::fns::{inspect_fn, InspectFn}; mod chain; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 @@ -201,7 +204,6 @@ mod catch_unwind; #[cfg(feature = "std")] #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::catch_unwind::CatchUnwind; -use crate::stream::assert_stream; impl<T: ?Sized> StreamExt for T where T: Stream {} @@ -689,7 +691,7 @@ pub trait StreamExt: Stream { U: Stream, Self: Sized, { - FlatMap::new(self, f) + assert_stream::<U::Item, _>(FlatMap::new(self, f)) } /// Combinator similar to [`StreamExt::fold`] that holds internal state @@ -722,7 +724,7 @@ pub trait StreamExt: Stream { Fut: Future<Output = Option<B>>, Self: Sized, { - Scan::new(self, initial_state, f) + assert_stream::<B, _>(Scan::new(self, initial_state, f)) } /// Skip elements on this stream while the provided asynchronous predicate @@ -793,7 +795,7 @@ pub trait StreamExt: Stream { /// this stream combinator will always return that the stream is done. /// /// The stopping future may return any type. Once the stream is stopped - /// the result of the stopping future may be aceessed with `TakeUntil::take_result()`. + /// the result of the stopping future may be accessed with `TakeUntil::take_result()`. /// The stream may also be resumed with `TakeUntil::take_future()`. /// See the documentation of [`TakeUntil`] for more information. /// @@ -827,7 +829,7 @@ pub trait StreamExt: Stream { Fut: Future, Self: Sized, { - TakeUntil::new(self, fut) + assert_stream::<Self::Item, _>(TakeUntil::new(self, fut)) } /// Runs this stream to completion, executing the provided asynchronous @@ -1289,7 +1291,7 @@ pub trait StreamExt: Stream { where Self: Sized, { - assert_stream::<alloc::vec::Vec<Self::Item>, _>(Chunks::new(self, capacity)) + assert_stream::<Vec<Self::Item>, _>(Chunks::new(self, capacity)) } /// An adaptor for chunking up ready items of the stream inside a vector. @@ -1312,10 +1314,10 @@ pub trait StreamExt: Stream { /// This method will panic if `capacity` is zero. #[cfg(feature = "alloc")] fn ready_chunks(self, capacity: usize) -> ReadyChunks<Self> - where - Self: Sized, + where + Self: Sized, { - ReadyChunks::new(self, capacity) + assert_stream::<Vec<Self::Item>, _>(ReadyChunks::new(self, capacity)) } /// A future that completes after the given stream has been fully processed @@ -1334,7 +1336,10 @@ pub trait StreamExt: Stream { where S: Sink<Self::Ok, Error = Self::Error>, Self: TryStream + Sized, + // Self: TryStream + Sized + Stream<Item = Result<<Self as TryStream>::Ok, <Self as TryStream>::Error>>, { + // TODO: type mismatch resolving `<Self as futures_core::Stream>::Item == std::result::Result<<Self as futures_core::TryStream>::Ok, <Self as futures_core::TryStream>::Error>` + // assert_future::<Result<(), Self::Error>, _>(Forward::new(self, sink)) Forward::new(self, sink) } @@ -1356,7 +1361,10 @@ pub trait StreamExt: Stream { Self: Sink<Item> + Sized, { let (sink, stream) = split::split(self); - (sink, assert_stream::<Self::Item, _>(stream)) + ( + crate::sink::assert_sink::<Item, Self::Error, _>(sink), + assert_stream::<Self::Item, _>(stream), + ) } /// Do something with each item of this stream, afterwards passing it on. @@ -1459,6 +1467,6 @@ pub trait StreamExt: Stream { where Self: Unpin + FusedStream, { - SelectNextSome::new(self) + assert_future::<Self::Item, _>(SelectNextSome::new(self)) } } diff --git a/src/stream/try_stream/mod.rs b/src/stream/try_stream/mod.rs index 6a48a4c..b7353d9 100644 --- a/src/stream/try_stream/mod.rs +++ b/src/stream/try_stream/mod.rs @@ -14,7 +14,9 @@ use futures_core::{ use crate::fns::{ InspectOkFn, inspect_ok_fn, InspectErrFn, inspect_err_fn, MapErrFn, map_err_fn, IntoFn, into_fn, MapOkFn, map_ok_fn, }; +use crate::future::assert_future; use crate::stream::{Map, Inspect}; +use crate::stream::assert_stream; mod and_then; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 @@ -135,8 +137,6 @@ mod into_async_read; #[cfg(feature = "std")] #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::into_async_read::IntoAsyncRead; -use crate::future::assert_future; -use crate::stream::assert_stream; impl<S: ?Sized + TryStream> TryStreamExt for S {} @@ -471,7 +471,7 @@ pub trait TryStreamExt: TryStream { Fut: TryFuture<Ok = bool, Error = Self::Error>, Self: Sized, { - TryTakeWhile::new(self, f) + assert_stream::<Result<Self::Ok, Self::Error>, _>(TryTakeWhile::new(self, f)) } /// Attempts to run this stream to completion, executing the provided asynchronous @@ -919,7 +919,7 @@ pub trait TryStreamExt: TryStream { Self::Ok: TryFuture<Error = Self::Error>, Self: Sized, { - TryBuffered::new(self, n) + assert_stream::<Result<<Self::Ok as TryFuture>::Ok, Self::Error>, _>(TryBuffered::new(self, n)) } // TODO: false positive warning from rustdoc. Verify once #43466 settles @@ -997,6 +997,6 @@ pub trait TryStreamExt: TryStream { Self: Sized + TryStreamExt<Error = std::io::Error> + Unpin, Self::Ok: AsRef<[u8]>, { - IntoAsyncRead::new(self) + crate::io::assert_read(IntoAsyncRead::new(self)) } } diff --git a/src/stream/try_stream/try_unfold.rs b/src/stream/try_stream/try_unfold.rs index c8fc421..258c18e 100644 --- a/src/stream/try_stream/try_unfold.rs +++ b/src/stream/try_stream/try_unfold.rs @@ -1,3 +1,4 @@ +use super::assert_stream; use core::fmt; use core::pin::Pin; use futures_core::future::TryFuture; @@ -60,11 +61,11 @@ where F: FnMut(T) -> Fut, Fut: TryFuture<Ok = Option<(Item, T)>>, { - TryUnfold { + assert_stream::<Result<Item, Fut::Error>, _>(TryUnfold { f, state: Some(init), fut: None, - } + }) } pin_project! { diff --git a/src/stream/unfold.rs b/src/stream/unfold.rs index 473bb67..e17d465 100644 --- a/src/stream/unfold.rs +++ b/src/stream/unfold.rs @@ -1,3 +1,4 @@ +use super::assert_stream; use crate::unfold_state::UnfoldState; use core::fmt; use core::pin::Pin; @@ -51,10 +52,10 @@ where F: FnMut(T) -> Fut, Fut: Future<Output = Option<(Item, T)>>, { - Unfold { + assert_stream::<Item, _>(Unfold { f, state: UnfoldState::Value { value: init }, - } + }) } pin_project! { diff --git a/src/task/mod.rs b/src/task/mod.rs index 77e5a96..dd1515c 100644 --- a/src/task/mod.rs +++ b/src/task/mod.rs @@ -10,7 +10,8 @@ //! The remaining types and traits in the module are used for implementing //! executors or dealing with synchronization issues around task wakeup. -pub use futures_core::task::{Context, Poll, Waker, RawWaker, RawWakerVTable}; +#[doc(no_inline)] +pub use core::task::{Context, Poll, Waker, RawWaker, RawWakerVTable}; pub use futures_task::{ Spawn, LocalSpawn, SpawnError, |