aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoel Galenson <jgalenson@google.com>2021-04-02 16:10:11 +0000
committerAutomerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>2021-04-02 16:10:11 +0000
commitcf1d92ef037819957a5a01d24e222b592cadb284 (patch)
tree931c1432113b74ac060d81ede8b4cbf7d3abfd76
parent2eb284d7dbdf6e1f022ab60aa0d0ae4d44cb2041 (diff)
parent1fdff8fd4c4296e2707526c7f2cd2fd78447a7b1 (diff)
downloadfutures-util-cf1d92ef037819957a5a01d24e222b592cadb284.tar.gz
Upgrade rust/crates/futures-util to 0.3.13 am: 1fdff8fd4c
Original change: https://android-review.googlesource.com/c/platform/external/rust/crates/futures-util/+/1662922 Change-Id: I0247d2fa66aa9ccee39d7219a0ae75468c013ee5
-rw-r--r--.cargo_vcs_info.json2
-rw-r--r--Android.bp28
-rw-r--r--Cargo.toml14
-rw-r--r--Cargo.toml.orig14
-rw-r--r--METADATA8
-rw-r--r--TEST_MAPPING48
-rw-r--r--src/async_await/join_mod.rs2
-rw-r--r--src/async_await/poll.rs4
-rw-r--r--src/future/abortable.rs5
-rw-r--r--src/future/either.rs7
-rw-r--r--src/future/future/mod.rs7
-rw-r--r--src/future/future/shared.rs32
-rw-r--r--src/future/join.rs25
-rw-r--r--src/future/join_all.rs4
-rw-r--r--src/future/lazy.rs3
-rw-r--r--src/future/maybe_done.rs3
-rw-r--r--src/future/mod.rs5
-rw-r--r--src/future/pending.rs5
-rw-r--r--src/future/poll_fn.rs3
-rw-r--r--src/future/ready.rs3
-rw-r--r--src/future/select.rs29
-rw-r--r--src/future/select_all.rs3
-rw-r--r--src/future/select_ok.rs3
-rw-r--r--src/future/try_future/mod.rs10
-rw-r--r--src/future/try_join.rs18
-rw-r--r--src/future/try_join_all.rs6
-rw-r--r--src/future/try_maybe_done.rs3
-rw-r--r--src/future/try_select.rs5
-rw-r--r--src/io/cursor.rs2
-rw-r--r--src/io/mod.rs73
-rw-r--r--src/sink/close.rs2
-rw-r--r--src/sink/drain.rs3
-rw-r--r--src/sink/mod.rs41
-rw-r--r--src/sink/unfold.rs5
-rw-r--r--src/stream/empty.rs5
-rw-r--r--src/stream/futures_unordered/mod.rs34
-rw-r--r--src/stream/iter.rs5
-rw-r--r--src/stream/mod.rs6
-rw-r--r--src/stream/once.rs3
-rw-r--r--src/stream/pending.rs3
-rw-r--r--src/stream/poll_fn.rs3
-rw-r--r--src/stream/repeat.rs3
-rw-r--r--src/stream/repeat_with.rs3
-rw-r--r--src/stream/select.rs5
-rw-r--r--src/stream/select_all.rs3
-rw-r--r--src/stream/stream/mod.rs32
-rw-r--r--src/stream/try_stream/mod.rs10
-rw-r--r--src/stream/try_stream/try_unfold.rs5
-rw-r--r--src/stream/unfold.rs5
-rw-r--r--src/task/mod.rs3
50 files changed, 368 insertions, 185 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json
index 4fd4ba3..f3ad3ab 100644
--- a/.cargo_vcs_info.json
+++ b/.cargo_vcs_info.json
@@ -1,5 +1,5 @@
{
"git": {
- "sha1": "1d53a29ec16ccd5b094fb205edb73591455eb4b6"
+ "sha1": "c91f8691672c7401b1923ab00bf138975c99391a"
}
}
diff --git a/Android.bp b/Android.bp
index 7345360..cd38e9a 100644
--- a/Android.bp
+++ b/Android.bp
@@ -1,4 +1,5 @@
// This file is generated by cargo2android.py --run --device --features channel,default,io,memchr,sink --dependencies --tests --patch=patches/Android.bp.patch.
+// Do not modify this file as changes will be overridden on upgrade.
package {
default_applicable_licenses: ["external_rust_crates_futures-util_license"],
@@ -141,7 +142,7 @@ rust_library {
// dependent_library ["feature_list"]
// autocfg-1.0.1
-// byteorder-1.4.2 "default,std"
+// byteorder-1.4.3 "default,std"
// bytes-0.4.12
// cfg-if-0.1.10
// cfg-if-1.0.0
@@ -150,16 +151,16 @@ rust_library {
// crossbeam-queue-0.2.3 "default,std"
// crossbeam-utils-0.7.2 "default,lazy_static,std"
// fnv-1.0.7 "default,std"
-// futures-0.1.30 "default,use_std,with-deprecated"
-// futures-channel-0.3.12 "alloc,std"
-// futures-core-0.3.12 "alloc,std"
-// futures-io-0.3.12 "std"
-// futures-macro-0.3.12
-// futures-sink-0.3.12
-// futures-task-0.3.12 "alloc,once_cell,std"
+// futures-0.1.31 "default,use_std,with-deprecated"
+// futures-channel-0.3.13 "alloc,std"
+// futures-core-0.3.13 "alloc,std"
+// futures-io-0.3.13 "std"
+// futures-macro-0.3.13
+// futures-sink-0.3.13
+// futures-task-0.3.13 "alloc,std"
// iovec-0.1.4
// lazy_static-1.4.0
-// libc-0.2.86 "default,std"
+// libc-0.2.92 "default,std"
// lock_api-0.3.4
// log-0.4.14
// maybe-uninit-2.0.0
@@ -169,22 +170,21 @@ rust_library {
// mio-uds-0.6.8
// net2-0.2.37 "default,duration"
// num_cpus-1.13.0
-// once_cell-1.5.2 "alloc,std"
// parking_lot-0.9.0 "default"
// parking_lot_core-0.6.2
-// pin-project-lite-0.2.4
+// pin-project-lite-0.2.6
// pin-utils-0.1.0
// proc-macro-hack-0.5.19
// proc-macro-nested-0.1.7
-// proc-macro2-1.0.24 "default,proc-macro"
-// quote-1.0.8 "default,proc-macro"
+// proc-macro2-1.0.26 "default,proc-macro"
+// quote-1.0.9 "default,proc-macro"
// rustc_version-0.2.3
// scopeguard-1.1.0
// semver-0.9.0 "default"
// semver-parser-0.7.0
// slab-0.4.2
// smallvec-0.6.14 "default,std"
-// syn-1.0.60 "clone-impls,default,derive,full,parsing,printing,proc-macro,quote"
+// syn-1.0.68 "clone-impls,default,derive,full,parsing,printing,proc-macro,quote"
// tokio-0.1.22 "bytes,codec,default,fs,io,mio,num_cpus,reactor,rt-full,sync,tcp,timer,tokio-codec,tokio-current-thread,tokio-executor,tokio-fs,tokio-io,tokio-reactor,tokio-sync,tokio-tcp,tokio-threadpool,tokio-timer,tokio-udp,tokio-uds,udp,uds"
// tokio-codec-0.1.2
// tokio-current-thread-0.1.7
diff --git a/Cargo.toml b/Cargo.toml
index 1b96b8f..9b87df6 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -13,7 +13,7 @@
[package]
edition = "2018"
name = "futures-util"
-version = "0.3.12"
+version = "0.3.13"
authors = ["Alex Crichton <alex@alexcrichton.com>"]
description = "Common utilities and extension traits for the futures-rs library.\n"
homepage = "https://rust-lang.github.io/futures-rs"
@@ -24,33 +24,33 @@ repository = "https://github.com/rust-lang/futures-rs"
all-features = true
rustdoc-args = ["--cfg", "docsrs"]
[dependencies.futures-channel]
-version = "0.3.12"
+version = "0.3.13"
features = ["std"]
optional = true
default-features = false
[dependencies.futures-core]
-version = "0.3.12"
+version = "0.3.13"
default-features = false
[dependencies.futures-io]
-version = "0.3.12"
+version = "0.3.13"
features = ["std"]
optional = true
default-features = false
[dependencies.futures-macro]
-version = "=0.3.12"
+version = "=0.3.13"
optional = true
default-features = false
[dependencies.futures-sink]
-version = "0.3.12"
+version = "0.3.13"
optional = true
default-features = false
[dependencies.futures-task]
-version = "0.3.12"
+version = "0.3.13"
default-features = false
[dependencies.futures_01]
diff --git a/Cargo.toml.orig b/Cargo.toml.orig
index 010902b..b7cc193 100644
--- a/Cargo.toml.orig
+++ b/Cargo.toml.orig
@@ -1,7 +1,7 @@
[package]
name = "futures-util"
edition = "2018"
-version = "0.3.12"
+version = "0.3.13"
authors = ["Alex Crichton <alex@alexcrichton.com>"]
license = "MIT OR Apache-2.0"
repository = "https://github.com/rust-lang/futures-rs"
@@ -33,12 +33,12 @@ read-initializer = ["io", "futures-io/read-initializer", "futures-io/unstable"]
write-all-vectored = ["io"]
[dependencies]
-futures-core = { path = "../futures-core", version = "0.3.12", default-features = false }
-futures-task = { path = "../futures-task", version = "0.3.12", default-features = false }
-futures-channel = { path = "../futures-channel", version = "0.3.12", default-features = false, features = ["std"], optional = true }
-futures-io = { path = "../futures-io", version = "0.3.12", default-features = false, features = ["std"], optional = true }
-futures-sink = { path = "../futures-sink", version = "0.3.12", default-features = false, optional = true }
-futures-macro = { path = "../futures-macro", version = "=0.3.12", default-features = false, optional = true }
+futures-core = { path = "../futures-core", version = "0.3.13", default-features = false }
+futures-task = { path = "../futures-task", version = "0.3.13", default-features = false }
+futures-channel = { path = "../futures-channel", version = "0.3.13", default-features = false, features = ["std"], optional = true }
+futures-io = { path = "../futures-io", version = "0.3.13", default-features = false, features = ["std"], optional = true }
+futures-sink = { path = "../futures-sink", version = "0.3.13", default-features = false, optional = true }
+futures-macro = { path = "../futures-macro", version = "=0.3.13", default-features = false, optional = true }
proc-macro-hack = { version = "0.5.19", optional = true }
proc-macro-nested = { version = "0.1.2", optional = true }
slab = { version = "0.4.2", optional = true }
diff --git a/METADATA b/METADATA
index f29b4dd..36be8c4 100644
--- a/METADATA
+++ b/METADATA
@@ -7,13 +7,13 @@ third_party {
}
url {
type: ARCHIVE
- value: "https://static.crates.io/crates/futures-util/futures-util-0.3.12.crate"
+ value: "https://static.crates.io/crates/futures-util/futures-util-0.3.13.crate"
}
- version: "0.3.12"
+ version: "0.3.13"
license_type: NOTICE
last_upgrade_date {
year: 2021
- month: 2
- day: 9
+ month: 4
+ day: 1
}
}
diff --git a/TEST_MAPPING b/TEST_MAPPING
index 7e10dd0..203632d 100644
--- a/TEST_MAPPING
+++ b/TEST_MAPPING
@@ -2,7 +2,55 @@
{
"presubmit": [
{
+ "name": "anyhow_device_test_tests_test_boxed"
+ },
+ {
+ "name": "anyhow_device_test_tests_test_ffi"
+ },
+ {
"name": "futures-util_device_test_src_lib"
+ },
+ {
+ "name": "anyhow_device_test_tests_test_chain"
+ },
+ {
+ "name": "anyhow_device_test_tests_test_convert"
+ },
+ {
+ "name": "anyhow_device_test_tests_test_source"
+ },
+ {
+ "name": "tokio-test_device_test_tests_io"
+ },
+ {
+ "name": "tokio-test_device_test_tests_macros"
+ },
+ {
+ "name": "anyhow_device_test_tests_test_context"
+ },
+ {
+ "name": "anyhow_device_test_tests_test_autotrait"
+ },
+ {
+ "name": "tokio-test_device_test_src_lib"
+ },
+ {
+ "name": "anyhow_device_test_tests_test_macros"
+ },
+ {
+ "name": "anyhow_device_test_src_lib"
+ },
+ {
+ "name": "tokio-test_device_test_tests_block_on"
+ },
+ {
+ "name": "anyhow_device_test_tests_test_fmt"
+ },
+ {
+ "name": "anyhow_device_test_tests_test_downcast"
+ },
+ {
+ "name": "anyhow_device_test_tests_test_repr"
}
]
}
diff --git a/src/async_await/join_mod.rs b/src/async_await/join_mod.rs
index d6ddb6d..965d9fb 100644
--- a/src/async_await/join_mod.rs
+++ b/src/async_await/join_mod.rs
@@ -8,7 +8,7 @@ macro_rules! document_join_macro {
/// of all results once complete.
///
/// While `join!(a, b)` is similar to `(a.await, b.await)`,
- /// `join!` polls both futures concurrently and therefore is more efficent.
+ /// `join!` polls both futures concurrently and therefore is more efficient.
///
/// This macro is only usable inside of async functions, closures, and blocks.
/// It is also gated behind the `async-await` feature of this library, which is
diff --git a/src/async_await/poll.rs b/src/async_await/poll.rs
index ac70a53..b5782df 100644
--- a/src/async_await/poll.rs
+++ b/src/async_await/poll.rs
@@ -9,6 +9,10 @@ use futures_core::task::{Context, Poll};
/// This macro is only usable inside of `async` functions, closures, and blocks.
/// It is also gated behind the `async-await` feature of this library, which is
/// activated by default.
+///
+/// If you need the result of polling a [`Stream`](crate::stream::Stream),
+/// you can use this macro with the [`next`](crate::stream::StreamExt::next) method:
+/// `poll!(stream.next())`.
#[macro_export]
macro_rules! poll {
($x:expr $(,)?) => {
diff --git a/src/future/abortable.rs b/src/future/abortable.rs
index 1fc75b0..3f2e5a0 100644
--- a/src/future/abortable.rs
+++ b/src/future/abortable.rs
@@ -1,3 +1,4 @@
+use super::assert_future;
use crate::task::AtomicWaker;
use futures_core::future::Future;
use futures_core::task::{Context, Poll};
@@ -39,10 +40,10 @@ impl<Fut> Abortable<Fut> where Fut: Future {
/// # });
/// ```
pub fn new(future: Fut, reg: AbortRegistration) -> Self {
- Self {
+ assert_future::<Result<Fut::Output, Aborted>, _>(Self {
future,
inner: reg.inner,
- }
+ })
}
}
diff --git a/src/future/either.rs b/src/future/either.rs
index a1b9f0a..5f5b614 100644
--- a/src/future/either.rs
+++ b/src/future/either.rs
@@ -101,6 +101,13 @@ where
Either::Right(x) => x.poll_next(cx),
}
}
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ match self {
+ Either::Left(x) => x.size_hint(),
+ Either::Right(x) => x.size_hint(),
+ }
+ }
}
impl<A, B> FusedStream for Either<A, B>
diff --git a/src/future/future/mod.rs b/src/future/future/mod.rs
index f01d346..c11d108 100644
--- a/src/future/future/mod.rs
+++ b/src/future/future/mod.rs
@@ -7,10 +7,10 @@
use alloc::boxed::Box;
use core::pin::Pin;
-use crate::future::{assert_future, Either};
-use crate::stream::assert_stream;
use crate::fns::{inspect_fn, into_fn, ok_fn, InspectFn, IntoFn, OkFn};
+use crate::future::{assert_future, Either};
use crate::never::Never;
+use crate::stream::assert_stream;
#[cfg(feature = "alloc")]
use futures_core::future::{BoxFuture, LocalBoxFuture};
use futures_core::{
@@ -506,7 +506,8 @@ pub trait FutureExt: Future {
where
Self: Sized,
{
- remote_handle::remote_handle(self)
+ let (wrapped, handle) = remote_handle::remote_handle(self);
+ (assert_future::<(), _>(wrapped), handle)
}
/// Wrap the future in a Box, pinning it.
diff --git a/src/future/future/shared.rs b/src/future/future/shared.rs
index 53635b5..74311a0 100644
--- a/src/future/future/shared.rs
+++ b/src/future/future/shared.rs
@@ -126,6 +126,32 @@ where
}
None
}
+
+ /// Gets the number of strong pointers to this allocation.
+ ///
+ /// Returns [`None`] if it has already been polled to completion.
+ ///
+ /// # Safety
+ ///
+ /// This method by itself is safe, but using it correctly requires extra care. Another thread
+ /// can change the strong count at any time, including potentially between calling this method
+ /// and acting on the result.
+ pub fn strong_count(&self) -> Option<usize> {
+ self.inner.as_ref().map(|arc| Arc::strong_count(arc))
+ }
+
+ /// Gets the number of weak pointers to this allocation.
+ ///
+ /// Returns [`None`] if it has already been polled to completion.
+ ///
+ /// # Safety
+ ///
+ /// This method by itself is safe, but using it correctly requires extra care. Another thread
+ /// can change the weak count at any time, including potentially between calling this method
+ /// and acting on the result.
+ pub fn weak_count(&self) -> Option<usize> {
+ self.inner.as_ref().map(|arc| Arc::weak_count(arc))
+ }
}
impl<Fut> Inner<Fut>
@@ -287,10 +313,8 @@ where
// Wake all tasks and drop the slab
let mut wakers_guard = inner.notifier.wakers.lock().unwrap();
let mut wakers = wakers_guard.take().unwrap();
- for opt_waker in wakers.drain() {
- if let Some(waker) = opt_waker {
- waker.wake();
- }
+ for waker in wakers.drain().flatten() {
+ waker.wake();
}
drop(_reset); // Make borrow checker happy
diff --git a/src/future/join.rs b/src/future/join.rs
index cfe53a7..a818343 100644
--- a/src/future/join.rs
+++ b/src/future/join.rs
@@ -1,14 +1,13 @@
#![allow(non_snake_case)]
-use crate::future::{MaybeDone, maybe_done};
+use super::assert_future;
+use crate::future::{maybe_done, MaybeDone};
use core::fmt;
use core::pin::Pin;
-use futures_core::future::{Future, FusedFuture};
+use futures_core::future::{FusedFuture, Future};
use futures_core::task::{Context, Poll};
use pin_project_lite::pin_project;
-use super::assert_future;
-
macro_rules! generate {
($(
$(#[$doc:meta])*
@@ -144,7 +143,8 @@ where
Fut2: Future,
Fut3: Future,
{
- Join3::new(future1, future2, future3)
+ let f = Join3::new(future1, future2, future3);
+ assert_future::<(Fut1::Output, Fut2::Output, Fut3::Output), _>(f)
}
/// Same as [`join`](join()), but with more futures.
@@ -176,7 +176,8 @@ where
Fut3: Future,
Fut4: Future,
{
- Join4::new(future1, future2, future3, future4)
+ let f = Join4::new(future1, future2, future3, future4);
+ assert_future::<(Fut1::Output, Fut2::Output, Fut3::Output, Fut4::Output), _>(f)
}
/// Same as [`join`](join()), but with more futures.
@@ -211,5 +212,15 @@ where
Fut4: Future,
Fut5: Future,
{
- Join5::new(future1, future2, future3, future4, future5)
+ let f = Join5::new(future1, future2, future3, future4, future5);
+ assert_future::<
+ (
+ Fut1::Output,
+ Fut2::Output,
+ Fut3::Output,
+ Fut4::Output,
+ Fut5::Output,
+ ),
+ _,
+ >(f)
}
diff --git a/src/future/join_all.rs b/src/future/join_all.rs
index 0c8357c..7ccf869 100644
--- a/src/future/join_all.rs
+++ b/src/future/join_all.rs
@@ -10,7 +10,7 @@ use core::task::{Context, Poll};
use alloc::boxed::Box;
use alloc::vec::Vec;
-use super::MaybeDone;
+use super::{MaybeDone, assert_future};
fn iter_pin_mut<T>(slice: Pin<&mut [T]>) -> impl Iterator<Item = Pin<&mut T>> {
// Safety: `std` _could_ make this unsound if it were to decide Pin's
@@ -85,7 +85,7 @@ where
I::Item: Future,
{
let elems: Box<[_]> = i.into_iter().map(MaybeDone::Future).collect();
- JoinAll { elems: elems.into() }
+ assert_future::<Vec<<I::Item as Future>::Output>, _>(JoinAll { elems: elems.into() })
}
impl<F> Future for JoinAll<F>
diff --git a/src/future/lazy.rs b/src/future/lazy.rs
index 409717a..42812d3 100644
--- a/src/future/lazy.rs
+++ b/src/future/lazy.rs
@@ -1,3 +1,4 @@
+use super::assert_future;
use core::pin::Pin;
use futures_core::future::{FusedFuture, Future};
use futures_core::task::{Context, Poll};
@@ -34,7 +35,7 @@ impl<F> Unpin for Lazy<F> {}
pub fn lazy<F, R>(f: F) -> Lazy<F>
where F: FnOnce(&mut Context<'_>) -> R,
{
- Lazy { f: Some(f) }
+ assert_future::<R, _>(Lazy { f: Some(f) })
}
impl<F, R> FusedFuture for Lazy<F>
diff --git a/src/future/maybe_done.rs b/src/future/maybe_done.rs
index bb5579e..26e6c27 100644
--- a/src/future/maybe_done.rs
+++ b/src/future/maybe_done.rs
@@ -1,5 +1,6 @@
//! Definition of the MaybeDone combinator
+use super::assert_future;
use core::mem;
use core::pin::Pin;
use futures_core::future::{FusedFuture, Future};
@@ -40,7 +41,7 @@ impl<Fut: Future + Unpin> Unpin for MaybeDone<Fut> {}
/// # });
/// ```
pub fn maybe_done<Fut: Future>(future: Fut) -> MaybeDone<Fut> {
- MaybeDone::Future(future)
+ assert_future::<(), _>(MaybeDone::Future(future))
}
impl<Fut: Future> MaybeDone<Fut> {
diff --git a/src/future/mod.rs b/src/future/mod.rs
index ab29823..84e457c 100644
--- a/src/future/mod.rs
+++ b/src/future/mod.rs
@@ -9,9 +9,12 @@
//! from a closure that defines its return value, and [`ready`](ready()),
//! which constructs a future with an immediate defined value.
+#[doc(no_inline)]
+pub use core::future::Future;
+
#[cfg(feature = "alloc")]
pub use futures_core::future::{BoxFuture, LocalBoxFuture};
-pub use futures_core::future::{FusedFuture, Future, TryFuture};
+pub use futures_core::future::{FusedFuture, TryFuture};
pub use futures_task::{FutureObj, LocalFutureObj, UnsafeFutureObj};
// Extension traits and combinators
diff --git a/src/future/pending.rs b/src/future/pending.rs
index 5a7bbb8..4311b9a 100644
--- a/src/future/pending.rs
+++ b/src/future/pending.rs
@@ -1,3 +1,4 @@
+use super::assert_future;
use core::marker;
use core::pin::Pin;
use futures_core::future::{FusedFuture, Future};
@@ -33,9 +34,9 @@ impl<T> FusedFuture for Pending<T> {
/// # });
/// ```
pub fn pending<T>() -> Pending<T> {
- Pending {
+ assert_future::<T, _>(Pending {
_data: marker::PhantomData,
- }
+ })
}
impl<T> Future for Pending<T> {
diff --git a/src/future/poll_fn.rs b/src/future/poll_fn.rs
index b7b10be..6ac1ab8 100644
--- a/src/future/poll_fn.rs
+++ b/src/future/poll_fn.rs
@@ -1,5 +1,6 @@
//! Definition of the `PollFn` adapter combinator
+use super::assert_future;
use core::fmt;
use core::pin::Pin;
use futures_core::future::Future;
@@ -36,7 +37,7 @@ pub fn poll_fn<T, F>(f: F) -> PollFn<F>
where
F: FnMut(&mut Context<'_>) -> Poll<T>
{
- PollFn { f }
+ assert_future::<T, _>(PollFn { f })
}
impl<F> fmt::Debug for PollFn<F> {
diff --git a/src/future/ready.rs b/src/future/ready.rs
index 35f01c9..e3d791b 100644
--- a/src/future/ready.rs
+++ b/src/future/ready.rs
@@ -1,3 +1,4 @@
+use super::assert_future;
use core::pin::Pin;
use futures_core::future::{FusedFuture, Future};
use futures_core::task::{Context, Poll};
@@ -45,7 +46,7 @@ impl<T> Future for Ready<T> {
/// # });
/// ```
pub fn ready<T>(t: T) -> Ready<T> {
- Ready(Some(t))
+ assert_future::<T, _>(Ready(Some(t)))
}
/// Create a future that is immediately ready with a success value.
diff --git a/src/future/select.rs b/src/future/select.rs
index bc24779..043ed17 100644
--- a/src/future/select.rs
+++ b/src/future/select.rs
@@ -1,3 +1,4 @@
+use super::assert_future;
use core::pin::Pin;
use futures_core::future::{Future, FusedFuture};
use futures_core::task::{Context, Poll};
@@ -31,25 +32,33 @@ impl<A: Unpin, B: Unpin> Unpin for Select<A, B> {}
///
/// ```
/// # futures::executor::block_on(async {
-/// use futures::future::{self, Either};
-/// use futures::pin_mut;
-///
-/// // These two futures have different types even though their outputs have the same type
-/// let future1 = async { 1 };
-/// let future2 = async { 2 };
+/// use futures::{
+/// pin_mut,
+/// future::Either,
+/// future::self,
+/// };
+///
+/// // These two futures have different types even though their outputs have the same type.
+/// let future1 = async {
+/// future::pending::<()>().await; // will never finish
+/// 1
+/// };
+/// let future2 = async {
+/// future::ready(2).await
+/// };
///
/// // 'select' requires Future + Unpin bounds
/// pin_mut!(future1);
/// pin_mut!(future2);
///
/// let value = match future::select(future1, future2).await {
-/// Either::Left((value1, _)) => value1, // `value1` is resolved from `future1`
-/// // `_` represents `future2`
+/// Either::Left((value1, _)) => value1, // `value1` is resolved from `future1`
+/// // `_` represents `future2`
/// Either::Right((value2, _)) => value2, // `value2` is resolved from `future2`
/// // `_` represents `future1`
/// };
///
-/// assert!(value == 1 || value == 2);
+/// assert!(value == 2);
/// # });
/// ```
///
@@ -75,7 +84,7 @@ impl<A: Unpin, B: Unpin> Unpin for Select<A, B> {}
pub fn select<A, B>(future1: A, future2: B) -> Select<A, B>
where A: Future + Unpin, B: Future + Unpin
{
- Select { inner: Some((future1, future2)) }
+ assert_future::<Either<(A::Output, B), (B::Output, A)>, _>(Select { inner: Some((future1, future2)) })
}
impl<A, B> Future for Select<A, B>
diff --git a/src/future/select_all.rs b/src/future/select_all.rs
index 9f7fb24..0db90a7 100644
--- a/src/future/select_all.rs
+++ b/src/future/select_all.rs
@@ -1,3 +1,4 @@
+use super::assert_future;
use crate::future::FutureExt;
use core::iter::FromIterator;
use core::mem;
@@ -38,7 +39,7 @@ pub fn select_all<I>(iter: I) -> SelectAll<I::Item>
inner: iter.into_iter().collect()
};
assert!(!ret.inner.is_empty());
- ret
+ assert_future::<(<I::Item as Future>::Output, usize, Vec<I::Item>), _>(ret)
}
impl<Fut: Future + Unpin> Future for SelectAll<Fut> {
diff --git a/src/future/select_ok.rs b/src/future/select_ok.rs
index 7f4f4d6..52d393c 100644
--- a/src/future/select_ok.rs
+++ b/src/future/select_ok.rs
@@ -1,3 +1,4 @@
+use super::assert_future;
use crate::future::TryFutureExt;
use core::iter::FromIterator;
use core::mem;
@@ -36,7 +37,7 @@ pub fn select_ok<I>(iter: I) -> SelectOk<I::Item>
inner: iter.into_iter().collect()
};
assert!(!ret.inner.is_empty(), "iterator provided to select_ok was empty");
- ret
+ assert_future::<Result<(<I::Item as TryFuture>::Ok, Vec<I::Item>), <I::Item as TryFuture>::Error>, _>(ret)
}
impl<Fut: TryFuture + Unpin> Future for SelectOk<Fut> {
diff --git a/src/future/try_future/mod.rs b/src/future/try_future/mod.rs
index 1ce01d2..fb3bdd8 100644
--- a/src/future/try_future/mod.rs
+++ b/src/future/try_future/mod.rs
@@ -173,7 +173,7 @@ pub trait TryFutureExt: TryFuture {
Self::Ok: Sink<Item, Error = Self::Error>,
Self: Sized,
{
- FlattenSink::new(self)
+ crate::sink::assert_sink::<Item, Self::Error, _>(FlattenSink::new(self))
}
/// Maps this future's success value to a different value.
@@ -501,7 +501,7 @@ pub trait TryFutureExt: TryFuture {
Self::Ok: TryFuture<Error = Self::Error>,
Self: Sized,
{
- TryFlatten::new(self)
+ assert_future::<Result<<Self::Ok as TryFuture>::Ok, Self::Error>, _>(TryFlatten::new(self))
}
/// Flatten the execution of this future when the successful result of this
@@ -539,7 +539,7 @@ pub trait TryFutureExt: TryFuture {
))
}
- /// Unwraps this future's ouput, producing a future with this future's
+ /// Unwraps this future's output, producing a future with this future's
/// [`Ok`](TryFuture::Ok) type as its
/// [`Output`](std::future::Future::Output) type.
///
@@ -569,8 +569,8 @@ pub trait TryFutureExt: TryFuture {
assert_future::<Self::Ok, _>(UnwrapOrElse::new(self, f))
}
- /// Wraps a [`TryFuture`] into a future compatable with libraries using
- /// futures 0.1 future definitons. Requires the `compat` feature to enable.
+ /// Wraps a [`TryFuture`] into a future compatible with libraries using
+ /// futures 0.1 future definitions. Requires the `compat` feature to enable.
#[cfg(feature = "compat")]
#[cfg_attr(docsrs, doc(cfg(feature = "compat")))]
fn compat(self) -> Compat<Self>
diff --git a/src/future/try_join.rs b/src/future/try_join.rs
index 25ccdde..6af1f0c 100644
--- a/src/future/try_join.rs
+++ b/src/future/try_join.rs
@@ -1,6 +1,6 @@
#![allow(non_snake_case)]
-use crate::future::{TryMaybeDone, try_maybe_done};
+use crate::future::{assert_future, try_maybe_done, TryMaybeDone};
use core::fmt;
use core::pin::Pin;
use futures_core::future::{Future, TryFuture};
@@ -108,7 +108,7 @@ generate! {
///
/// This function will return a new future which awaits both futures to
/// complete. If successful, the returned future will finish with a tuple of
-/// both results. If unsuccesful, it will complete with the first error
+/// both results. If unsuccessful, it will complete with the first error
/// encountered.
///
/// Note that this function consumes the passed futures and returns a
@@ -150,7 +150,7 @@ where
Fut1: TryFuture,
Fut2: TryFuture<Error = Fut1::Error>,
{
- TryJoin::new(future1, future2)
+ assert_future::<Result<(Fut1::Ok, Fut2::Ok), Fut1::Error>, _>(TryJoin::new(future1, future2))
}
/// Same as [`try_join`](try_join()), but with more futures.
@@ -179,7 +179,9 @@ where
Fut2: TryFuture<Error = Fut1::Error>,
Fut3: TryFuture<Error = Fut1::Error>,
{
- TryJoin3::new(future1, future2, future3)
+ assert_future::<Result<(Fut1::Ok, Fut2::Ok, Fut3::Ok), Fut1::Error>, _>(TryJoin3::new(
+ future1, future2, future3,
+ ))
}
/// Same as [`try_join`](try_join()), but with more futures.
@@ -211,7 +213,9 @@ where
Fut3: TryFuture<Error = Fut1::Error>,
Fut4: TryFuture<Error = Fut1::Error>,
{
- TryJoin4::new(future1, future2, future3, future4)
+ assert_future::<Result<(Fut1::Ok, Fut2::Ok, Fut3::Ok, Fut4::Ok), Fut1::Error>, _>(
+ TryJoin4::new(future1, future2, future3, future4),
+ )
}
/// Same as [`try_join`](try_join()), but with more futures.
@@ -246,5 +250,7 @@ where
Fut4: TryFuture<Error = Fut1::Error>,
Fut5: TryFuture<Error = Fut1::Error>,
{
- TryJoin5::new(future1, future2, future3, future4, future5)
+ assert_future::<Result<(Fut1::Ok, Fut2::Ok, Fut3::Ok, Fut4::Ok, Fut5::Ok), Fut1::Error>, _>(
+ TryJoin5::new(future1, future2, future3, future4, future5),
+ )
}
diff --git a/src/future/try_join_all.rs b/src/future/try_join_all.rs
index 4de0a79..371f753 100644
--- a/src/future/try_join_all.rs
+++ b/src/future/try_join_all.rs
@@ -10,7 +10,7 @@ use core::task::{Context, Poll};
use alloc::boxed::Box;
use alloc::vec::Vec;
-use super::{TryFuture, TryMaybeDone};
+use super::{assert_future, TryFuture, TryMaybeDone};
fn iter_pin_mut<T>(slice: Pin<&mut [T]>) -> impl Iterator<Item = Pin<&mut T>> {
// Safety: `std` _could_ make this unsound if it were to decide Pin's
@@ -93,9 +93,9 @@ where
I::Item: TryFuture,
{
let elems: Box<[_]> = i.into_iter().map(TryMaybeDone::Future).collect();
- TryJoinAll {
+ assert_future::<Result<Vec<<I::Item as TryFuture>::Ok>, <I::Item as TryFuture>::Error>, _>(TryJoinAll {
elems: elems.into(),
- }
+ })
}
impl<F> Future for TryJoinAll<F>
diff --git a/src/future/try_maybe_done.rs b/src/future/try_maybe_done.rs
index 90067e9..dfd2900 100644
--- a/src/future/try_maybe_done.rs
+++ b/src/future/try_maybe_done.rs
@@ -1,5 +1,6 @@
//! Definition of the TryMaybeDone combinator
+use super::assert_future;
use core::mem;
use core::pin::Pin;
use futures_core::future::{FusedFuture, Future, TryFuture};
@@ -25,7 +26,7 @@ impl<Fut: TryFuture + Unpin> Unpin for TryMaybeDone<Fut> {}
/// Wraps a future into a `TryMaybeDone`
pub fn try_maybe_done<Fut: TryFuture>(future: Fut) -> TryMaybeDone<Fut> {
- TryMaybeDone::Future(future)
+ assert_future::<Result<(), Fut::Error>, _>(TryMaybeDone::Future(future))
}
impl<Fut: TryFuture> TryMaybeDone<Fut> {
diff --git a/src/future/try_select.rs b/src/future/try_select.rs
index 56564f5..b26eed3 100644
--- a/src/future/try_select.rs
+++ b/src/future/try_select.rs
@@ -50,7 +50,10 @@ impl<A: Unpin, B: Unpin> Unpin for TrySelect<A, B> {}
pub fn try_select<A, B>(future1: A, future2: B) -> TrySelect<A, B>
where A: TryFuture + Unpin, B: TryFuture + Unpin
{
- TrySelect { inner: Some((future1, future2)) }
+ super::assert_future::<Result<
+ Either<(A::Ok, B), (B::Ok, A)>,
+ Either<(A::Error, B), (B::Error, A)>,
+ >, _>(TrySelect { inner: Some((future1, future2)) })
}
impl<A: Unpin, B: Unpin> Future for TrySelect<A, B>
diff --git a/src/io/cursor.rs b/src/io/cursor.rs
index b11dbf5..084fb08 100644
--- a/src/io/cursor.rs
+++ b/src/io/cursor.rs
@@ -13,7 +13,7 @@ use std::pin::Pin;
/// allowing these buffers to be used anywhere you might use a reader or writer
/// that does actual I/O.
///
-/// The standard library implements some I/O traits on various types which
+/// This library implements some I/O traits on various types which
/// are commonly used as a buffer, like `Cursor<`[`Vec`]`<u8>>` and
/// `Cursor<`[`&[u8]`][bytes]`>`.
///
diff --git a/src/io/mod.rs b/src/io/mod.rs
index a7e2add..1437930 100644
--- a/src/io/mod.rs
+++ b/src/io/mod.rs
@@ -19,15 +19,20 @@
#[cfg(feature = "io-compat")]
#[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
use crate::compat::Compat;
+use crate::future::assert_future;
+use crate::stream::assert_stream;
use std::{ptr, pin::Pin};
-pub use futures_io::{
- AsyncRead, AsyncWrite, AsyncSeek, AsyncBufRead, Error, ErrorKind,
- IoSlice, IoSliceMut, Result, SeekFrom,
-};
+// Re-export some types from `std::io` so that users don't have to deal
+// with conflicts when `use`ing `futures::io` and `std::io`.
+#[doc(no_inline)]
+pub use std::io::{Error, ErrorKind, IoSlice, IoSliceMut, Result, SeekFrom};
+#[doc(no_inline)]
#[cfg(feature = "read-initializer")]
#[cfg_attr(docsrs, doc(cfg(feature = "read-initializer")))]
-pub use futures_io::Initializer;
+pub use std::io::Initializer;
+
+pub use futures_io::{AsyncRead, AsyncWrite, AsyncSeek, AsyncBufRead};
// used by `BufReader` and `BufWriter`
// https://github.com/rust-lang/rust/blob/master/src/libstd/sys_common/io.rs#L1
@@ -173,7 +178,7 @@ pub trait AsyncReadExt: AsyncRead {
Self: Sized,
R: AsyncRead,
{
- Chain::new(self, next)
+ assert_read(Chain::new(self, next))
}
/// Tries to read some bytes directly into the given `buf` in asynchronous
@@ -203,7 +208,7 @@ pub trait AsyncReadExt: AsyncRead {
fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Read<'a, Self>
where Self: Unpin,
{
- Read::new(self, buf)
+ assert_future::<Result<usize>, _>(Read::new(self, buf))
}
/// Creates a future which will read from the `AsyncRead` into `bufs` using vectored
@@ -214,7 +219,7 @@ pub trait AsyncReadExt: AsyncRead {
fn read_vectored<'a>(&'a mut self, bufs: &'a mut [IoSliceMut<'a>]) -> ReadVectored<'a, Self>
where Self: Unpin,
{
- ReadVectored::new(self, bufs)
+ assert_future::<Result<usize>, _>(ReadVectored::new(self, bufs))
}
/// Creates a future which will read exactly enough bytes to fill `buf`,
@@ -260,7 +265,7 @@ pub trait AsyncReadExt: AsyncRead {
) -> ReadExact<'a, Self>
where Self: Unpin,
{
- ReadExact::new(self, buf)
+ assert_future::<Result<()>, _>(ReadExact::new(self, buf))
}
/// Creates a future which will read all the bytes from this `AsyncRead`.
@@ -288,7 +293,7 @@ pub trait AsyncReadExt: AsyncRead {
) -> ReadToEnd<'a, Self>
where Self: Unpin,
{
- ReadToEnd::new(self, buf)
+ assert_future::<Result<usize>, _>(ReadToEnd::new(self, buf))
}
/// Creates a future which will read all the bytes from this `AsyncRead`.
@@ -316,7 +321,7 @@ pub trait AsyncReadExt: AsyncRead {
) -> ReadToString<'a, Self>
where Self: Unpin,
{
- ReadToString::new(self, buf)
+ assert_future::<Result<usize>, _>(ReadToString::new(self, buf))
}
/// Helper method for splitting this read/write object into two halves.
@@ -351,7 +356,8 @@ pub trait AsyncReadExt: AsyncRead {
fn split(self) -> (ReadHalf<Self>, WriteHalf<Self>)
where Self: AsyncWrite + Sized,
{
- split::split(self)
+ let (r, w) = split::split(self);
+ (assert_read(r), assert_write(w))
}
/// Creates an AsyncRead adapter which will read at most `limit` bytes
@@ -376,7 +382,7 @@ pub trait AsyncReadExt: AsyncRead {
fn take(self, limit: u64) -> Take<Self>
where Self: Sized
{
- Take::new(self, limit)
+ assert_read(Take::new(self, limit))
}
/// Wraps an [`AsyncRead`] in a compatibility wrapper that allows it to be
@@ -423,14 +429,14 @@ pub trait AsyncWriteExt: AsyncWrite {
fn flush(&mut self) -> Flush<'_, Self>
where Self: Unpin,
{
- Flush::new(self)
+ assert_future::<Result<()>, _>(Flush::new(self))
}
/// Creates a future which will entirely close this `AsyncWrite`.
fn close(&mut self) -> Close<'_, Self>
where Self: Unpin,
{
- Close::new(self)
+ assert_future::<Result<()>, _>(Close::new(self))
}
/// Creates a future which will write bytes from `buf` into the object.
@@ -440,7 +446,7 @@ pub trait AsyncWriteExt: AsyncWrite {
fn write<'a>(&'a mut self, buf: &'a [u8]) -> Write<'a, Self>
where Self: Unpin,
{
- Write::new(self, buf)
+ assert_future::<Result<usize>, _>(Write::new(self, buf))
}
/// Creates a future which will write bytes from `bufs` into the object using vectored
@@ -451,7 +457,7 @@ pub trait AsyncWriteExt: AsyncWrite {
fn write_vectored<'a>(&'a mut self, bufs: &'a [IoSlice<'a>]) -> WriteVectored<'a, Self>
where Self: Unpin,
{
- WriteVectored::new(self, bufs)
+ assert_future::<Result<usize>, _>(WriteVectored::new(self, bufs))
}
/// Write data into this object.
@@ -477,7 +483,7 @@ pub trait AsyncWriteExt: AsyncWrite {
fn write_all<'a>(&'a mut self, buf: &'a [u8]) -> WriteAll<'a, Self>
where Self: Unpin,
{
- WriteAll::new(self, buf)
+ assert_future::<Result<()>, _>(WriteAll::new(self, buf))
}
/// Attempts to write multiple buffers into this writer.
@@ -532,7 +538,7 @@ pub trait AsyncWriteExt: AsyncWrite {
where
Self: Unpin,
{
- WriteAllVectored::new(self, bufs)
+ assert_future::<Result<()>, _>(WriteAllVectored::new(self, bufs))
}
/// Wraps an [`AsyncWrite`] in a compatibility wrapper that allows it to be
@@ -577,7 +583,7 @@ pub trait AsyncWriteExt: AsyncWrite {
fn into_sink<Item: AsRef<[u8]>>(self) -> IntoSink<Self, Item>
where Self: Sized,
{
- IntoSink::new(self)
+ crate::sink::assert_sink::<Item, Error, _>(IntoSink::new(self))
}
}
@@ -593,7 +599,7 @@ pub trait AsyncSeekExt: AsyncSeek {
fn seek(&mut self, pos: SeekFrom) -> Seek<'_, Self>
where Self: Unpin,
{
- Seek::new(self, pos)
+ assert_future::<Result<u64>, _>(Seek::new(self, pos))
}
}
@@ -627,7 +633,7 @@ pub trait AsyncBufReadExt: AsyncBufRead {
fn fill_buf(&mut self) -> FillBuf<'_, Self>
where Self: Unpin,
{
- FillBuf::new(self)
+ assert_future::<Result<&[u8]>, _>(FillBuf::new(self))
}
/// A convenience for calling [`AsyncBufRead::consume`] on [`Unpin`] IO types.
@@ -701,7 +707,7 @@ pub trait AsyncBufReadExt: AsyncBufRead {
) -> ReadUntil<'a, Self>
where Self: Unpin,
{
- ReadUntil::new(self, byte, buf)
+ assert_future::<Result<usize>, _>(ReadUntil::new(self, byte, buf))
}
/// Creates a future which will read all the bytes associated with this I/O
@@ -758,7 +764,7 @@ pub trait AsyncBufReadExt: AsyncBufRead {
fn read_line<'a>(&'a mut self, buf: &'a mut String) -> ReadLine<'a, Self>
where Self: Unpin,
{
- ReadLine::new(self, buf)
+ assert_future::<Result<usize>, _>(ReadLine::new(self, buf))
}
/// Returns a stream over the lines of this reader.
@@ -796,8 +802,25 @@ pub trait AsyncBufReadExt: AsyncBufRead {
fn lines(self) -> Lines<Self>
where Self: Sized,
{
- Lines::new(self)
+ assert_stream::<Result<String>, _>(Lines::new(self))
}
}
impl<R: AsyncBufRead + ?Sized> AsyncBufReadExt for R {}
+
+// Just a helper function to ensure the reader we're returning all have the
+// right implementations.
+pub(crate) fn assert_read<R>(reader: R) -> R
+where
+ R: AsyncRead,
+{
+ reader
+}
+// Just a helper function to ensure the writer we're returning all have the
+// right implementations.
+pub(crate) fn assert_write<W>(writer: W) -> W
+where
+ W: AsyncWrite,
+{
+ writer
+}
diff --git a/src/sink/close.rs b/src/sink/close.rs
index 4421d10..4fc99f5 100644
--- a/src/sink/close.rs
+++ b/src/sink/close.rs
@@ -16,7 +16,7 @@ impl<Si: Unpin + ?Sized, Item> Unpin for Close<'_, Si, Item> {}
/// A future that completes when the sink has finished closing.
///
-/// The sink itself is returned after closeing is complete.
+/// The sink itself is returned after closing is complete.
impl<'a, Si: Sink<Item> + Unpin + ?Sized, Item> Close<'a, Si, Item> {
pub(super) fn new(sink: &'a mut Si) -> Self {
Self {
diff --git a/src/sink/drain.rs b/src/sink/drain.rs
index 46de83b..33c5b31 100644
--- a/src/sink/drain.rs
+++ b/src/sink/drain.rs
@@ -1,3 +1,4 @@
+use super::assert_sink;
use crate::never::Never;
use core::marker::PhantomData;
use core::pin::Pin;
@@ -26,7 +27,7 @@ pub struct Drain<T> {
/// # Ok::<(), futures::never::Never>(()) }).unwrap();
/// ```
pub fn drain<T>() -> Drain<T> {
- Drain { marker: PhantomData }
+ assert_sink::<T, Never, _>(Drain { marker: PhantomData })
}
impl<T> Unpin for Drain<T> {}
diff --git a/src/sink/mod.rs b/src/sink/mod.rs
index 1a062d0..e5b515b 100644
--- a/src/sink/mod.rs
+++ b/src/sink/mod.rs
@@ -6,7 +6,7 @@
//! - The [`SinkExt`] trait, which provides adapters for chaining and composing
//! sinks.
-use crate::future::Either;
+use crate::future::{assert_future, Either};
use core::pin::Pin;
use futures_core::future::Future;
use futures_core::stream::{Stream, TryStream};
@@ -81,7 +81,7 @@ pub trait SinkExt<Item>: Sink<Item> {
E: From<Self::Error>,
Self: Sized,
{
- With::new(self, f)
+ assert_sink::<U, E, _>(With::new(self, f))
}
/// Composes a function *in front of* the sink.
@@ -122,7 +122,7 @@ pub trait SinkExt<Item>: Sink<Item> {
St: Stream<Item = Result<Item, Self::Error>>,
Self: Sized,
{
- WithFlatMap::new(self, f)
+ assert_sink::<U, Self::Error, _>(WithFlatMap::new(self, f))
}
/*
@@ -145,7 +145,7 @@ pub trait SinkExt<Item>: Sink<Item> {
F: FnOnce(Self::Error) -> E,
Self: Sized,
{
- SinkMapErr::new(self, f)
+ assert_sink::<Item, E, _>(SinkMapErr::new(self, f))
}
/// Map this sink's error to a different error type using the `Into` trait.
@@ -156,7 +156,7 @@ pub trait SinkExt<Item>: Sink<Item> {
Self: Sized,
Self::Error: Into<E>,
{
- SinkErrInto::new(self)
+ assert_sink::<Item, E, _>(SinkErrInto::new(self))
}
/// Adds a fixed-size buffer to the current sink.
@@ -176,7 +176,7 @@ pub trait SinkExt<Item>: Sink<Item> {
where
Self: Sized,
{
- Buffer::new(self, capacity)
+ assert_sink::<Item, Self::Error, _>(Buffer::new(self, capacity))
}
/// Close the sink.
@@ -184,7 +184,7 @@ pub trait SinkExt<Item>: Sink<Item> {
where
Self: Unpin,
{
- Close::new(self)
+ assert_future::<Result<(), Self::Error>, _>(Close::new(self))
}
/// Fanout items to multiple sinks.
@@ -197,7 +197,7 @@ pub trait SinkExt<Item>: Sink<Item> {
Item: Clone,
Si: Sink<Item, Error = Self::Error>,
{
- Fanout::new(self, other)
+ assert_sink::<Item, Self::Error, _>(Fanout::new(self, other))
}
/// Flush the sink, processing all pending items.
@@ -208,7 +208,7 @@ pub trait SinkExt<Item>: Sink<Item> {
where
Self: Unpin,
{
- Flush::new(self)
+ assert_future::<Result<(), Self::Error>, _>(Flush::new(self))
}
/// A future that completes after the given item has been fully processed
@@ -221,7 +221,7 @@ pub trait SinkExt<Item>: Sink<Item> {
where
Self: Unpin,
{
- Send::new(self, item)
+ assert_future::<Result<(), Self::Error>, _>(Send::new(self, item))
}
/// A future that completes after the given item has been received
@@ -231,9 +231,10 @@ pub trait SinkExt<Item>: Sink<Item> {
/// It is the caller's responsibility to ensure all pending items
/// are processed, which can be done via `flush` or `close`.
fn feed(&mut self, item: Item) -> Feed<'_, Self, Item>
- where Self: Unpin,
+ where
+ Self: Unpin,
{
- Feed::new(self, item)
+ assert_future::<Result<(), Self::Error>, _>(Feed::new(self, item))
}
/// A future that completes after the given stream has been fully processed
@@ -250,8 +251,11 @@ pub trait SinkExt<Item>: Sink<Item> {
fn send_all<'a, St>(&'a mut self, stream: &'a mut St) -> SendAll<'a, Self, St>
where
St: TryStream<Ok = Item, Error = Self::Error> + Stream + Unpin + ?Sized,
+ // St: Stream<Item = Result<Item, Self::Error>> + Unpin + ?Sized,
Self: Unpin,
{
+ // TODO: type mismatch resolving `<St as Stream>::Item == std::result::Result<Item, <Self as futures_sink::Sink<Item>>::Error>`
+ // assert_future::<Result<(), Self::Error>, _>(SendAll::new(self, stream))
SendAll::new(self, stream)
}
@@ -265,7 +269,7 @@ pub trait SinkExt<Item>: Sink<Item> {
Si2: Sink<Item, Error = Self::Error>,
Self: Sized,
{
- Either::Left(self)
+ assert_sink::<Item, Self::Error, _>(Either::Left(self))
}
/// Wrap this stream in an `Either` stream, making it the right-hand variant
@@ -278,7 +282,7 @@ pub trait SinkExt<Item>: Sink<Item> {
Si1: Sink<Item, Error = Self::Error>,
Self: Sized,
{
- Either::Right(self)
+ assert_sink::<Item, Self::Error, _>(Either::Right(self))
}
/// Wraps a [`Sink`] into a sink compatible with libraries using
@@ -328,3 +332,12 @@ pub trait SinkExt<Item>: Sink<Item> {
Pin::new(self).poll_close(cx)
}
}
+
+// Just a helper function to ensure the sinks we're returning all have the
+// right implementations.
+pub(crate) fn assert_sink<T, E, S>(sink: S) -> S
+where
+ S: Sink<T, Error = E>,
+{
+ sink
+}
diff --git a/src/sink/unfold.rs b/src/sink/unfold.rs
index 1aab200..3903716 100644
--- a/src/sink/unfold.rs
+++ b/src/sink/unfold.rs
@@ -1,3 +1,4 @@
+use super::assert_sink;
use crate::unfold_state::UnfoldState;
use core::{future::Future, pin::Pin};
use futures_core::ready;
@@ -40,10 +41,10 @@ where
F: FnMut(T, Item) -> R,
R: Future<Output = Result<T, E>>,
{
- Unfold {
+ assert_sink::<Item, E, _>(Unfold {
function,
state: UnfoldState::Value { value: init },
- }
+ })
}
impl<T, F, R, Item, E> Sink<Item> for Unfold<T, F, R>
diff --git a/src/stream/empty.rs b/src/stream/empty.rs
index d228b31..c629a4b 100644
--- a/src/stream/empty.rs
+++ b/src/stream/empty.rs
@@ -1,3 +1,4 @@
+use super::assert_stream;
use core::marker::PhantomData;
use core::pin::Pin;
use futures_core::stream::{FusedStream, Stream};
@@ -14,9 +15,9 @@ pub struct Empty<T> {
///
/// The returned stream will always return `Ready(None)` when polled.
pub fn empty<T>() -> Empty<T> {
- Empty {
+ assert_stream::<T, _>(Empty {
_phantom: PhantomData
- }
+ })
}
impl<T> Unpin for Empty<T> {}
diff --git a/src/stream/futures_unordered/mod.rs b/src/stream/futures_unordered/mod.rs
index 37b7d7e..8dcc551 100644
--- a/src/stream/futures_unordered/mod.rs
+++ b/src/stream/futures_unordered/mod.rs
@@ -30,22 +30,6 @@ use self::task::Task;
mod ready_to_run_queue;
use self::ready_to_run_queue::{ReadyToRunQueue, Dequeue};
-/// Constant used for a `FuturesUnordered` to determine how many times it is
-/// allowed to poll underlying futures without yielding.
-///
-/// A single call to `poll_next` may potentially do a lot of work before
-/// yielding. This happens in particular if the underlying futures are awoken
-/// frequently but continue to return `Pending`. This is problematic if other
-/// tasks are waiting on the executor, since they do not get to run. This value
-/// caps the number of calls to `poll` on underlying futures a single call to
-/// `poll_next` is allowed to make.
-///
-/// The value itself is chosen somewhat arbitrarily. It needs to be high enough
-/// that amortize wakeup and scheduling costs, but low enough that we do not
-/// starve other tasks for long.
-///
-/// See also https://github.com/rust-lang/futures-rs/issues/2047.
-const YIELD_EVERY: usize = 32;
/// A set of futures which may complete in any order.
///
@@ -414,6 +398,22 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<Option<Self::Item>>
{
+ // Variable to determine how many times it is allowed to poll underlying
+ // futures without yielding.
+ //
+ // A single call to `poll_next` may potentially do a lot of work before
+ // yielding. This happens in particular if the underlying futures are awoken
+ // frequently but continue to return `Pending`. This is problematic if other
+ // tasks are waiting on the executor, since they do not get to run. This value
+ // caps the number of calls to `poll` on underlying futures a single call to
+ // `poll_next` is allowed to make.
+ //
+ // The value is the length of FuturesUnordered. This ensures that each
+ // future is polled only once at most per iteration.
+ //
+ // See also https://github.com/rust-lang/futures-rs/issues/2047.
+ let yield_every = self.len();
+
// Keep track of how many child futures we have polled,
// in case we want to forcibly yield.
let mut polled = 0;
@@ -548,7 +548,7 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> {
let task = bomb.task.take().unwrap();
bomb.queue.link(task);
- if polled == YIELD_EVERY {
+ if polled == yield_every {
// We have polled a large number of futures in a row without yielding.
// To ensure we do not starve other tasks waiting on the executor,
// we yield here, but immediately wake ourselves up to continue.
diff --git a/src/stream/iter.rs b/src/stream/iter.rs
index cab8cd8..033dae1 100644
--- a/src/stream/iter.rs
+++ b/src/stream/iter.rs
@@ -1,3 +1,4 @@
+use super::assert_stream;
use core::pin::Pin;
use futures_core::stream::Stream;
use futures_core::task::{Context, Poll};
@@ -28,9 +29,9 @@ impl<I> Unpin for Iter<I> {}
pub fn iter<I>(i: I) -> Iter<I::IntoIter>
where I: IntoIterator,
{
- Iter {
+ assert_stream::<I::Item, _>(Iter {
iter: i.into_iter(),
- }
+ })
}
impl<I> Stream for Iter<I>
diff --git a/src/stream/mod.rs b/src/stream/mod.rs
index a5624ba..f3b2baa 100644
--- a/src/stream/mod.rs
+++ b/src/stream/mod.rs
@@ -109,11 +109,11 @@ cfg_target_has_atomic! {
pub use self::select_all::{select_all, SelectAll};
}
-// Just a helper function to ensure the futures we're returning all have the
+// Just a helper function to ensure the streams we're returning all have the
// right implementations.
pub(crate) fn assert_stream<T, S>(stream: S) -> S
- where
- S: Stream<Item = T>,
+where
+ S: Stream<Item = T>,
{
stream
}
diff --git a/src/stream/once.rs b/src/stream/once.rs
index 318de07..e16fe00 100644
--- a/src/stream/once.rs
+++ b/src/stream/once.rs
@@ -1,3 +1,4 @@
+use super::assert_stream;
use core::pin::Pin;
use futures_core::future::Future;
use futures_core::ready;
@@ -17,7 +18,7 @@ use pin_project_lite::pin_project;
/// # });
/// ```
pub fn once<Fut: Future>(future: Fut) -> Once<Fut> {
- Once::new(future)
+ assert_stream::<Fut::Output, _>(Once::new(future))
}
pin_project! {
diff --git a/src/stream/pending.rs b/src/stream/pending.rs
index ca793c1..d7030ff 100644
--- a/src/stream/pending.rs
+++ b/src/stream/pending.rs
@@ -1,3 +1,4 @@
+use super::assert_stream;
use core::marker;
use core::pin::Pin;
use futures_core::stream::{FusedStream, Stream};
@@ -14,7 +15,7 @@ pub struct Pending<T> {
///
/// The returned stream will always return `Pending` when polled.
pub fn pending<T>() -> Pending<T> {
- Pending { _data: marker::PhantomData }
+ assert_stream::<T, _>(Pending { _data: marker::PhantomData })
}
impl<T> Unpin for Pending<T> {}
diff --git a/src/stream/poll_fn.rs b/src/stream/poll_fn.rs
index e33ca57..b9bd7d1 100644
--- a/src/stream/poll_fn.rs
+++ b/src/stream/poll_fn.rs
@@ -1,5 +1,6 @@
//! Definition of the `PollFn` combinator
+use super::assert_stream;
use core::fmt;
use core::pin::Pin;
use futures_core::stream::Stream;
@@ -41,7 +42,7 @@ pub fn poll_fn<T, F>(f: F) -> PollFn<F>
where
F: FnMut(&mut Context<'_>) -> Poll<Option<T>>,
{
- PollFn { f }
+ assert_stream::<T, _>(PollFn { f })
}
impl<T, F> Stream for PollFn<F>
diff --git a/src/stream/repeat.rs b/src/stream/repeat.rs
index 6a2637d..cf9f21b 100644
--- a/src/stream/repeat.rs
+++ b/src/stream/repeat.rs
@@ -1,3 +1,4 @@
+use super::assert_stream;
use core::pin::Pin;
use futures_core::stream::{Stream, FusedStream};
use futures_core::task::{Context, Poll};
@@ -26,7 +27,7 @@ pub struct Repeat<T> {
pub fn repeat<T>(item: T) -> Repeat<T>
where T: Clone
{
- Repeat { item }
+ assert_stream::<T, _>(Repeat { item })
}
impl<T> Unpin for Repeat<T> {}
diff --git a/src/stream/repeat_with.rs b/src/stream/repeat_with.rs
index eb3313d..0255643 100644
--- a/src/stream/repeat_with.rs
+++ b/src/stream/repeat_with.rs
@@ -1,3 +1,4 @@
+use super::assert_stream;
use core::pin::Pin;
use futures_core::stream::{Stream, FusedStream};
use futures_core::task::{Context, Poll};
@@ -89,5 +90,5 @@ impl<A, F: FnMut() -> A> FusedStream for RepeatWith<F>
/// # });
/// ```
pub fn repeat_with<A, F: FnMut() -> A>(repeater: F) -> RepeatWith<F> {
- RepeatWith { repeater }
+ assert_stream::<A, _>(RepeatWith { repeater })
}
diff --git a/src/stream/select.rs b/src/stream/select.rs
index 2b7ebec..2942494 100644
--- a/src/stream/select.rs
+++ b/src/stream/select.rs
@@ -1,3 +1,4 @@
+use super::assert_stream;
use crate::stream::{StreamExt, Fuse};
use core::pin::Pin;
use futures_core::stream::{FusedStream, Stream};
@@ -31,11 +32,11 @@ pub fn select<St1, St2>(stream1: St1, stream2: St2) -> Select<St1, St2>
where St1: Stream,
St2: Stream<Item = St1::Item>
{
- Select {
+ assert_stream::<St1::Item, _>(Select {
stream1: stream1.fuse(),
stream2: stream2.fuse(),
flag: false,
- }
+ })
}
impl<St1, St2> Select<St1, St2> {
diff --git a/src/stream/select_all.rs b/src/stream/select_all.rs
index 00368bb..c0b92fa 100644
--- a/src/stream/select_all.rs
+++ b/src/stream/select_all.rs
@@ -8,6 +8,7 @@ use futures_core::ready;
use futures_core::stream::{Stream, FusedStream};
use futures_core::task::{Context, Poll};
+use super::assert_stream;
use crate::stream::{StreamExt, StreamFuture, FuturesUnordered};
/// An unbounded set of streams
@@ -124,7 +125,7 @@ pub fn select_all<I>(streams: I) -> SelectAll<I::Item>
set.push(stream);
}
- set
+ assert_stream::<<I::Item as Stream>::Item, _>(set)
}
impl<St: Stream + Unpin> FromIterator<St> for SelectAll<St> {
diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs
index b1b4384..c3340ec 100644
--- a/src/stream/stream/mod.rs
+++ b/src/stream/stream/mod.rs
@@ -4,8 +4,11 @@
//! including the `StreamExt` trait which adds methods to `Stream` types.
use crate::future::{assert_future, Either};
+use crate::stream::assert_stream;
#[cfg(feature = "alloc")]
use alloc::boxed::Box;
+#[cfg(feature = "alloc")]
+use alloc::vec::Vec;
use core::pin::Pin;
#[cfg(feature = "sink")]
use futures_core::stream::TryStream;
@@ -19,7 +22,7 @@ use futures_core::{
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use crate::fns::{InspectFn, inspect_fn};
+use crate::fns::{inspect_fn, InspectFn};
mod chain;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
@@ -201,7 +204,6 @@ mod catch_unwind;
#[cfg(feature = "std")]
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::catch_unwind::CatchUnwind;
-use crate::stream::assert_stream;
impl<T: ?Sized> StreamExt for T where T: Stream {}
@@ -689,7 +691,7 @@ pub trait StreamExt: Stream {
U: Stream,
Self: Sized,
{
- FlatMap::new(self, f)
+ assert_stream::<U::Item, _>(FlatMap::new(self, f))
}
/// Combinator similar to [`StreamExt::fold`] that holds internal state
@@ -722,7 +724,7 @@ pub trait StreamExt: Stream {
Fut: Future<Output = Option<B>>,
Self: Sized,
{
- Scan::new(self, initial_state, f)
+ assert_stream::<B, _>(Scan::new(self, initial_state, f))
}
/// Skip elements on this stream while the provided asynchronous predicate
@@ -793,7 +795,7 @@ pub trait StreamExt: Stream {
/// this stream combinator will always return that the stream is done.
///
/// The stopping future may return any type. Once the stream is stopped
- /// the result of the stopping future may be aceessed with `TakeUntil::take_result()`.
+ /// the result of the stopping future may be accessed with `TakeUntil::take_result()`.
/// The stream may also be resumed with `TakeUntil::take_future()`.
/// See the documentation of [`TakeUntil`] for more information.
///
@@ -827,7 +829,7 @@ pub trait StreamExt: Stream {
Fut: Future,
Self: Sized,
{
- TakeUntil::new(self, fut)
+ assert_stream::<Self::Item, _>(TakeUntil::new(self, fut))
}
/// Runs this stream to completion, executing the provided asynchronous
@@ -1289,7 +1291,7 @@ pub trait StreamExt: Stream {
where
Self: Sized,
{
- assert_stream::<alloc::vec::Vec<Self::Item>, _>(Chunks::new(self, capacity))
+ assert_stream::<Vec<Self::Item>, _>(Chunks::new(self, capacity))
}
/// An adaptor for chunking up ready items of the stream inside a vector.
@@ -1312,10 +1314,10 @@ pub trait StreamExt: Stream {
/// This method will panic if `capacity` is zero.
#[cfg(feature = "alloc")]
fn ready_chunks(self, capacity: usize) -> ReadyChunks<Self>
- where
- Self: Sized,
+ where
+ Self: Sized,
{
- ReadyChunks::new(self, capacity)
+ assert_stream::<Vec<Self::Item>, _>(ReadyChunks::new(self, capacity))
}
/// A future that completes after the given stream has been fully processed
@@ -1334,7 +1336,10 @@ pub trait StreamExt: Stream {
where
S: Sink<Self::Ok, Error = Self::Error>,
Self: TryStream + Sized,
+ // Self: TryStream + Sized + Stream<Item = Result<<Self as TryStream>::Ok, <Self as TryStream>::Error>>,
{
+ // TODO: type mismatch resolving `<Self as futures_core::Stream>::Item == std::result::Result<<Self as futures_core::TryStream>::Ok, <Self as futures_core::TryStream>::Error>`
+ // assert_future::<Result<(), Self::Error>, _>(Forward::new(self, sink))
Forward::new(self, sink)
}
@@ -1356,7 +1361,10 @@ pub trait StreamExt: Stream {
Self: Sink<Item> + Sized,
{
let (sink, stream) = split::split(self);
- (sink, assert_stream::<Self::Item, _>(stream))
+ (
+ crate::sink::assert_sink::<Item, Self::Error, _>(sink),
+ assert_stream::<Self::Item, _>(stream),
+ )
}
/// Do something with each item of this stream, afterwards passing it on.
@@ -1459,6 +1467,6 @@ pub trait StreamExt: Stream {
where
Self: Unpin + FusedStream,
{
- SelectNextSome::new(self)
+ assert_future::<Self::Item, _>(SelectNextSome::new(self))
}
}
diff --git a/src/stream/try_stream/mod.rs b/src/stream/try_stream/mod.rs
index 6a48a4c..b7353d9 100644
--- a/src/stream/try_stream/mod.rs
+++ b/src/stream/try_stream/mod.rs
@@ -14,7 +14,9 @@ use futures_core::{
use crate::fns::{
InspectOkFn, inspect_ok_fn, InspectErrFn, inspect_err_fn, MapErrFn, map_err_fn, IntoFn, into_fn, MapOkFn, map_ok_fn,
};
+use crate::future::assert_future;
use crate::stream::{Map, Inspect};
+use crate::stream::assert_stream;
mod and_then;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
@@ -135,8 +137,6 @@ mod into_async_read;
#[cfg(feature = "std")]
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::into_async_read::IntoAsyncRead;
-use crate::future::assert_future;
-use crate::stream::assert_stream;
impl<S: ?Sized + TryStream> TryStreamExt for S {}
@@ -471,7 +471,7 @@ pub trait TryStreamExt: TryStream {
Fut: TryFuture<Ok = bool, Error = Self::Error>,
Self: Sized,
{
- TryTakeWhile::new(self, f)
+ assert_stream::<Result<Self::Ok, Self::Error>, _>(TryTakeWhile::new(self, f))
}
/// Attempts to run this stream to completion, executing the provided asynchronous
@@ -919,7 +919,7 @@ pub trait TryStreamExt: TryStream {
Self::Ok: TryFuture<Error = Self::Error>,
Self: Sized,
{
- TryBuffered::new(self, n)
+ assert_stream::<Result<<Self::Ok as TryFuture>::Ok, Self::Error>, _>(TryBuffered::new(self, n))
}
// TODO: false positive warning from rustdoc. Verify once #43466 settles
@@ -997,6 +997,6 @@ pub trait TryStreamExt: TryStream {
Self: Sized + TryStreamExt<Error = std::io::Error> + Unpin,
Self::Ok: AsRef<[u8]>,
{
- IntoAsyncRead::new(self)
+ crate::io::assert_read(IntoAsyncRead::new(self))
}
}
diff --git a/src/stream/try_stream/try_unfold.rs b/src/stream/try_stream/try_unfold.rs
index c8fc421..258c18e 100644
--- a/src/stream/try_stream/try_unfold.rs
+++ b/src/stream/try_stream/try_unfold.rs
@@ -1,3 +1,4 @@
+use super::assert_stream;
use core::fmt;
use core::pin::Pin;
use futures_core::future::TryFuture;
@@ -60,11 +61,11 @@ where
F: FnMut(T) -> Fut,
Fut: TryFuture<Ok = Option<(Item, T)>>,
{
- TryUnfold {
+ assert_stream::<Result<Item, Fut::Error>, _>(TryUnfold {
f,
state: Some(init),
fut: None,
- }
+ })
}
pin_project! {
diff --git a/src/stream/unfold.rs b/src/stream/unfold.rs
index 473bb67..e17d465 100644
--- a/src/stream/unfold.rs
+++ b/src/stream/unfold.rs
@@ -1,3 +1,4 @@
+use super::assert_stream;
use crate::unfold_state::UnfoldState;
use core::fmt;
use core::pin::Pin;
@@ -51,10 +52,10 @@ where
F: FnMut(T) -> Fut,
Fut: Future<Output = Option<(Item, T)>>,
{
- Unfold {
+ assert_stream::<Item, _>(Unfold {
f,
state: UnfoldState::Value { value: init },
- }
+ })
}
pin_project! {
diff --git a/src/task/mod.rs b/src/task/mod.rs
index 77e5a96..dd1515c 100644
--- a/src/task/mod.rs
+++ b/src/task/mod.rs
@@ -10,7 +10,8 @@
//! The remaining types and traits in the module are used for implementing
//! executors or dealing with synchronization issues around task wakeup.
-pub use futures_core::task::{Context, Poll, Waker, RawWaker, RawWakerVTable};
+#[doc(no_inline)]
+pub use core::task::{Context, Poll, Waker, RawWaker, RawWakerVTable};
pub use futures_task::{
Spawn, LocalSpawn, SpawnError,