aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorChih-Hung Hsieh <chh@google.com>2020-10-25 23:16:22 -0700
committerChih-Hung Hsieh <chh@google.com>2020-10-25 23:16:22 -0700
commitd45e96efa48a28ad7d831aa7da55f4b6fb9aa22d (patch)
treeaed583542acdae126b0bf494e7a48c8bdbad44cd /src
parent10466ab5740c4fd628db8b98d4150f4867d09d38 (diff)
downloadfutures-util-d45e96efa48a28ad7d831aa7da55f4b6fb9aa22d.tar.gz
Upgrade rust/crates/futures-util to 0.3.7
Test: make all rust crates Change-Id: I9af426348ba630949d7f3facc0717f439cfe5a7e
Diffstat (limited to 'src')
-rw-r--r--src/async_await/join_mod.rs20
-rw-r--r--src/async_await/mod.rs14
-rw-r--r--src/async_await/pending.rs2
-rw-r--r--src/async_await/poll.rs2
-rw-r--r--src/async_await/select_mod.rs10
-rw-r--r--src/compat/compat01as03.rs6
-rw-r--r--src/compat/compat03as01.rs2
-rw-r--r--src/compat/mod.rs3
-rw-r--r--src/fns.rs3
-rw-r--r--src/future/either.rs103
-rw-r--r--src/future/future/flatten.rs48
-rw-r--r--src/future/future/fuse.rs2
-rw-r--r--src/future/future/map.rs53
-rw-r--r--src/future/future/mod.rs46
-rw-r--r--src/future/future/remote_handle.rs16
-rw-r--r--src/future/join_all.rs6
-rw-r--r--src/future/maybe_done.rs42
-rw-r--r--src/future/mod.rs4
-rw-r--r--src/future/select.rs28
-rw-r--r--src/future/try_future/mod.rs25
-rw-r--r--src/future/try_future/try_flatten.rs48
-rw-r--r--src/future/try_future/try_flatten_err.rs12
-rw-r--r--src/future/try_maybe_done.rs40
-rw-r--r--src/io/buf_reader.rs24
-rw-r--r--src/io/buf_writer.rs20
-rw-r--r--src/io/chain.rs54
-rw-r--r--src/io/copy_buf.rs18
-rw-r--r--src/io/fill_buf.rs50
-rw-r--r--src/io/into_sink.rs13
-rw-r--r--src/io/lines.rs20
-rw-r--r--src/io/mod.rs79
-rw-r--r--src/io/take.rs34
-rw-r--r--src/io/write_all_vectored.rs3
-rw-r--r--src/lib.rs57
-rw-r--r--src/lock/bilock.rs5
-rw-r--r--src/lock/mod.rs2
-rw-r--r--src/lock/mutex.rs14
-rw-r--r--src/sink/buffer.rs16
-rw-r--r--src/sink/err_into.rs2
-rw-r--r--src/sink/fanout.rs40
-rw-r--r--src/sink/mod.rs1
-rw-r--r--src/sink/with.rs20
-rw-r--r--src/sink/with_flat_map.rs32
-rw-r--r--src/stream/futures_ordered.rs8
-rw-r--r--src/stream/futures_unordered/task.rs2
-rw-r--r--src/stream/mod.rs14
-rw-r--r--src/stream/once.rs12
-rw-r--r--src/stream/select.rs19
-rw-r--r--src/stream/stream/buffer_unordered.rs16
-rw-r--r--src/stream/stream/buffered.rs16
-rw-r--r--src/stream/stream/catch_unwind.rs12
-rw-r--r--src/stream/stream/chain.rs12
-rw-r--r--src/stream/stream/chunks.rs16
-rw-r--r--src/stream/stream/collect.rs10
-rw-r--r--src/stream/stream/concat.rs14
-rw-r--r--src/stream/stream/enumerate.rs12
-rw-r--r--src/stream/stream/filter.rs22
-rw-r--r--src/stream/stream/filter_map.rs14
-rw-r--r--src/stream/stream/flatten.rs14
-rw-r--r--src/stream/stream/fold.rs20
-rw-r--r--src/stream/stream/for_each.rs14
-rw-r--r--src/stream/stream/for_each_concurrent.rs18
-rw-r--r--src/stream/stream/forward.rs10
-rw-r--r--src/stream/stream/fuse.rs12
-rw-r--r--src/stream/stream/into_future.rs2
-rw-r--r--src/stream/stream/map.rs10
-rw-r--r--src/stream/stream/mod.rs77
-rw-r--r--src/stream/stream/peek.rs22
-rw-r--r--src/stream/stream/ready_chunks.rs22
-rw-r--r--src/stream/stream/scan.rs18
-rw-r--r--src/stream/stream/skip.rs15
-rw-r--r--src/stream/stream/skip_while.rs24
-rw-r--r--src/stream/stream/split.rs3
-rw-r--r--src/stream/stream/take.rs12
-rw-r--r--src/stream/stream/take_until.rs18
-rw-r--r--src/stream/stream/take_while.rs20
-rw-r--r--src/stream/stream/then.rs16
-rw-r--r--src/stream/stream/zip.rs30
-rw-r--r--src/stream/try_stream/and_then.rs14
-rw-r--r--src/stream/try_stream/into_async_read.rs1
-rw-r--r--src/stream/try_stream/mod.rs85
-rw-r--r--src/stream/try_stream/or_else.rs14
-rw-r--r--src/stream/try_stream/try_buffer_unordered.rs16
-rw-r--r--src/stream/try_stream/try_collect.rs12
-rw-r--r--src/stream/try_stream/try_concat.rs15
-rw-r--r--src/stream/try_stream/try_filter.rs23
-rw-r--r--src/stream/try_stream/try_filter_map.rs17
-rw-r--r--src/stream/try_stream/try_flatten.rs15
-rw-r--r--src/stream/try_stream/try_fold.rs21
-rw-r--r--src/stream/try_stream/try_for_each.rs14
-rw-r--r--src/stream/try_stream/try_for_each_concurrent.rs26
-rw-r--r--src/stream/try_stream/try_skip_while.rs24
-rw-r--r--src/stream/try_stream/try_take_while.rs132
-rw-r--r--src/stream/try_stream/try_unfold.rs18
-rw-r--r--src/stream/unfold.rs16
-rw-r--r--src/task/spawn.rs3
96 files changed, 1119 insertions, 962 deletions
diff --git a/src/async_await/join_mod.rs b/src/async_await/join_mod.rs
index 909cd3b..4200c08 100644
--- a/src/async_await/join_mod.rs
+++ b/src/async_await/join_mod.rs
@@ -22,8 +22,13 @@ macro_rules! document_join_macro {
///
/// let a = async { 1 };
/// let b = async { 2 };
- ///
/// assert_eq!(join!(a, b), (1, 2));
+ ///
+ /// // `join!` is variadic, so you can pass any number of futures
+ /// let c = async { 3 };
+ /// let d = async { 4 };
+ /// let e = async { 5 };
+ /// assert_eq!(join!(c, d, e), (3, 4, 5));
/// # });
/// ```
$join
@@ -48,9 +53,14 @@ macro_rules! document_join_macro {
/// use futures::try_join;
///
/// let a = async { Ok::<i32, i32>(1) };
- /// let b = async { Ok::<u64, i32>(2) };
- ///
+ /// let b = async { Ok::<i32, i32>(2) };
/// assert_eq!(try_join!(a, b), Ok((1, 2)));
+ ///
+ /// // `try_join!` is variadic, so you can pass any number of futures
+ /// let c = async { Ok::<i32, i32>(3) };
+ /// let d = async { Ok::<i32, i32>(4) };
+ /// let e = async { Ok::<i32, i32>(5) };
+ /// assert_eq!(try_join!(c, d, e), Ok((3, 4, 5)));
/// # });
/// ```
///
@@ -83,7 +93,7 @@ document_join_macro! {
#[macro_export]
macro_rules! join {
($($tokens:tt)*) => {{
- use $crate::__reexport as __futures_crate;
+ use $crate::__private as __futures_crate;
$crate::join_internal! {
$( $tokens )*
}
@@ -93,7 +103,7 @@ document_join_macro! {
#[macro_export]
macro_rules! try_join {
($($tokens:tt)*) => {{
- use $crate::__reexport as __futures_crate;
+ use $crate::__private as __futures_crate;
$crate::try_join_internal! {
$( $tokens )*
}
diff --git a/src/async_await/mod.rs b/src/async_await/mod.rs
index 69cae13..bdaed95 100644
--- a/src/async_await/mod.rs
+++ b/src/async_await/mod.rs
@@ -3,37 +3,37 @@
//! This module contains a number of functions and combinators for working
//! with `async`/`await` code.
-use futures_core::future::Future;
-use futures_core::stream::Stream;
-
-#[doc(hidden)]
-pub use futures_core::future::FusedFuture;
-#[doc(hidden)]
-pub use futures_core::stream::FusedStream;
+use futures_core::future::{Future, FusedFuture};
+use futures_core::stream::{Stream, FusedStream};
#[macro_use]
mod poll;
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/64762
pub use self::poll::*;
#[macro_use]
mod pending;
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/64762
pub use self::pending::*;
// Primary export is a macro
#[cfg(feature = "async-await-macro")]
mod join_mod;
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/64762
#[cfg(feature = "async-await-macro")]
pub use self::join_mod::*;
// Primary export is a macro
#[cfg(feature = "async-await-macro")]
mod select_mod;
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/64762
#[cfg(feature = "async-await-macro")]
pub use self::select_mod::*;
#[cfg(feature = "std")]
#[cfg(feature = "async-await-macro")]
mod random;
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/64762
#[cfg(feature = "std")]
#[cfg(feature = "async-await-macro")]
pub use self::random::*;
diff --git a/src/async_await/pending.rs b/src/async_await/pending.rs
index b143869..e0cf341 100644
--- a/src/async_await/pending.rs
+++ b/src/async_await/pending.rs
@@ -15,7 +15,7 @@ use futures_core::task::{Context, Poll};
#[macro_export]
macro_rules! pending {
() => {
- $crate::async_await::pending_once().await
+ $crate::__private::async_await::pending_once().await
}
}
diff --git a/src/async_await/poll.rs b/src/async_await/poll.rs
index dffa94b..ac70a53 100644
--- a/src/async_await/poll.rs
+++ b/src/async_await/poll.rs
@@ -12,7 +12,7 @@ use futures_core::task::{Context, Poll};
#[macro_export]
macro_rules! poll {
($x:expr $(,)?) => {
- $crate::async_await::poll($x).await
+ $crate::__private::async_await::poll($x).await
}
}
diff --git a/src/async_await/select_mod.rs b/src/async_await/select_mod.rs
index 0471f09..47eca4d 100644
--- a/src/async_await/select_mod.rs
+++ b/src/async_await/select_mod.rs
@@ -14,7 +14,7 @@ macro_rules! document_select_macro {
/// (e.g. an `async fn` call) instead of a `Future` by name the `Unpin`
/// requirement is relaxed, since the macro will pin the resulting `Future`
/// on the stack. However the `Future` returned by the expression must
- /// still implement `FusedFuture`. This difference is presented
+ /// still implement `FusedFuture`.
///
/// Futures and streams which are not already fused can be fused using the
/// `.fuse()` method. Note, though, that fusing a future or stream directly
@@ -84,7 +84,7 @@ macro_rules! document_select_macro {
/// a_res = async_identity_fn(62).fuse() => a_res + 1,
/// b_res = async_identity_fn(13).fuse() => b_res,
/// };
- /// assert!(res == 63 || res == 12);
+ /// assert!(res == 63 || res == 13);
/// # });
/// ```
///
@@ -167,7 +167,7 @@ macro_rules! document_select_macro {
/// (e.g. an `async fn` call) instead of a `Future` by name the `Unpin`
/// requirement is relaxed, since the macro will pin the resulting `Future`
/// on the stack. However the `Future` returned by the expression must
- /// still implement `FusedFuture`. This difference is presented
+ /// still implement `FusedFuture`.
///
/// Futures and streams which are not already fused can be fused using the
/// `.fuse()` method. Note, though, that fusing a future or stream directly
@@ -322,7 +322,7 @@ document_select_macro! {
#[macro_export]
macro_rules! select {
($($tokens:tt)*) => {{
- use $crate::__reexport as __futures_crate;
+ use $crate::__private as __futures_crate;
$crate::select_internal! {
$( $tokens )*
}
@@ -332,7 +332,7 @@ document_select_macro! {
#[macro_export]
macro_rules! select_biased {
($($tokens:tt)*) => {{
- use $crate::__reexport as __futures_crate;
+ use $crate::__private as __futures_crate;
$crate::select_biased_internal! {
$( $tokens )*
}
diff --git a/src/compat/compat01as03.rs b/src/compat/compat01as03.rs
index 562a7ad..95025d2 100644
--- a/src/compat/compat01as03.rs
+++ b/src/compat/compat01as03.rs
@@ -15,6 +15,7 @@ use std::task::Context;
use futures_sink::Sink as Sink03;
#[cfg(feature = "io-compat")]
+#[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use io::{AsyncRead01CompatExt, AsyncWrite01CompatExt};
@@ -115,6 +116,7 @@ impl<St: Stream01> Stream01CompatExt for St {}
/// Extension trait for futures 0.1 [`Sink`](futures_01::sink::Sink)
#[cfg(feature = "sink")]
+#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
pub trait Sink01CompatExt: Sink01 {
/// Converts a futures 0.1
/// [`Sink<SinkItem = T, SinkError = E>`](futures_01::sink::Sink)
@@ -180,6 +182,7 @@ impl<St: Stream01> Stream03 for Compat01As03<St> {
/// Converts a futures 0.1 Sink object to a futures 0.3-compatible version
#[cfg(feature = "sink")]
+#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
#[derive(Debug)]
#[must_use = "sinks do nothing unless polled"]
pub struct Compat01As03Sink<S, SinkItem> {
@@ -366,6 +369,7 @@ unsafe impl UnsafeNotify01 for NotifyWaker {
}
#[cfg(feature = "io-compat")]
+#[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
mod io {
use super::*;
#[cfg(feature = "read-initializer")]
@@ -375,6 +379,7 @@ mod io {
use tokio_io::{AsyncRead as AsyncRead01, AsyncWrite as AsyncWrite01};
/// Extension trait for tokio-io [`AsyncRead`](tokio_io::AsyncRead)
+ #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
pub trait AsyncRead01CompatExt: AsyncRead01 {
/// Converts a tokio-io [`AsyncRead`](tokio_io::AsyncRead) into a futures-io 0.3
/// [`AsyncRead`](futures_io::AsyncRead).
@@ -403,6 +408,7 @@ mod io {
impl<R: AsyncRead01> AsyncRead01CompatExt for R {}
/// Extension trait for tokio-io [`AsyncWrite`](tokio_io::AsyncWrite)
+ #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
pub trait AsyncWrite01CompatExt: AsyncWrite01 {
/// Converts a tokio-io [`AsyncWrite`](tokio_io::AsyncWrite) into a futures-io 0.3
/// [`AsyncWrite`](futures_io::AsyncWrite).
diff --git a/src/compat/compat03as01.rs b/src/compat/compat03as01.rs
index 3fd2ae0..4841c5e 100644
--- a/src/compat/compat03as01.rs
+++ b/src/compat/compat03as01.rs
@@ -40,6 +40,7 @@ pub struct Compat<T> {
/// Converts a futures 0.3 [`Sink`](futures_sink::Sink) into a futures 0.1
/// [`Sink`](futures_01::sink::Sink).
#[cfg(feature = "sink")]
+#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
#[derive(Debug)]
#[must_use = "sinks do nothing unless polled"]
pub struct CompatSink<T, Item> {
@@ -236,6 +237,7 @@ where
}
#[cfg(feature = "io-compat")]
+#[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
mod io {
use super::*;
use futures_io::{AsyncRead as AsyncRead03, AsyncWrite as AsyncWrite03};
diff --git a/src/compat/mod.rs b/src/compat/mod.rs
index 1826836..897a50a 100644
--- a/src/compat/mod.rs
+++ b/src/compat/mod.rs
@@ -9,11 +9,14 @@ pub use self::executor::{Executor01CompatExt, Executor01Future, Executor01As03};
mod compat01as03;
pub use self::compat01as03::{Compat01As03, Future01CompatExt, Stream01CompatExt};
#[cfg(feature = "sink")]
+#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
pub use self::compat01as03::{Compat01As03Sink, Sink01CompatExt};
#[cfg(feature = "io-compat")]
+#[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
pub use self::compat01as03::{AsyncRead01CompatExt, AsyncWrite01CompatExt};
mod compat03as01;
pub use self::compat03as01::Compat;
#[cfg(feature = "sink")]
+#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
pub use self::compat03as01::CompatSink;
diff --git a/src/fns.rs b/src/fns.rs
index 2b4e3c6..6908bff 100644
--- a/src/fns.rs
+++ b/src/fns.rs
@@ -140,6 +140,7 @@ trivial_fn_impls!(merge_result_fn <> MergeResultFn = "merge_result");
#[derive(Debug, Copy, Clone, Default)]
pub struct InspectFn<F>(F);
+#[allow(single_use_lifetimes)] // https://github.com/rust-lang/rust/issues/55058
impl<F, A> FnOnce1<A> for InspectFn<F>
where
F: for<'a> FnOnce1<&'a A, Output=()>,
@@ -150,6 +151,7 @@ where
arg
}
}
+#[allow(single_use_lifetimes)] // https://github.com/rust-lang/rust/issues/55058
impl<F, A> FnMut1<A> for InspectFn<F>
where
F: for<'a> FnMut1<&'a A, Output=()>,
@@ -159,6 +161,7 @@ where
arg
}
}
+#[allow(single_use_lifetimes)] // https://github.com/rust-lang/rust/issues/55058
impl<F, A> Fn1<A> for InspectFn<F>
where
F: for<'a> Fn1<&'a A, Output=()>,
diff --git a/src/future/either.rs b/src/future/either.rs
index be28829..aa17fa7 100644
--- a/src/future/either.rs
+++ b/src/future/either.rs
@@ -4,11 +4,11 @@ use futures_core::future::{FusedFuture, Future};
use futures_core::stream::{FusedStream, Stream};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
/// Combines two different futures, streams, or sinks having the same associated types into a single
/// type.
-#[pin_project]
+#[pin_project(project = EitherProj)]
#[derive(Debug, Clone)]
pub enum Either<A, B> {
/// First branch of the type
@@ -58,12 +58,10 @@ where
{
type Output = A::Output;
- #[project]
- fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<A::Output> {
- #[project]
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.project() {
- Either::Left(x) => x.poll(cx),
- Either::Right(x) => x.poll(cx),
+ EitherProj::Left(x) => x.poll(cx),
+ EitherProj::Right(x) => x.poll(cx),
}
}
}
@@ -88,12 +86,10 @@ where
{
type Item = A::Item;
- #[project]
- fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<A::Item>> {
- #[project]
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.project() {
- Either::Left(x) => x.poll_next(cx),
- Either::Right(x) => x.poll_next(cx),
+ EitherProj::Left(x) => x.poll_next(cx),
+ EitherProj::Right(x) => x.poll_next(cx),
}
}
}
@@ -119,39 +115,31 @@ where
{
type Error = A::Error;
- #[project]
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- #[project]
match self.project() {
- Either::Left(x) => x.poll_ready(cx),
- Either::Right(x) => x.poll_ready(cx),
+ EitherProj::Left(x) => x.poll_ready(cx),
+ EitherProj::Right(x) => x.poll_ready(cx),
}
}
- #[project]
fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
- #[project]
match self.project() {
- Either::Left(x) => x.start_send(item),
- Either::Right(x) => x.start_send(item),
+ EitherProj::Left(x) => x.start_send(item),
+ EitherProj::Right(x) => x.start_send(item),
}
}
- #[project]
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- #[project]
match self.project() {
- Either::Left(x) => x.poll_flush(cx),
- Either::Right(x) => x.poll_flush(cx),
+ EitherProj::Left(x) => x.poll_flush(cx),
+ EitherProj::Right(x) => x.poll_flush(cx),
}
}
- #[project]
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- #[project]
match self.project() {
- Either::Left(x) => x.poll_close(cx),
- Either::Right(x) => x.poll_close(cx),
+ EitherProj::Left(x) => x.poll_close(cx),
+ EitherProj::Right(x) => x.poll_close(cx),
}
}
}
@@ -182,29 +170,25 @@ mod if_std {
}
}
- #[project]
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<Result<usize>> {
- #[project]
match self.project() {
- Either::Left(x) => x.poll_read(cx, buf),
- Either::Right(x) => x.poll_read(cx, buf),
+ EitherProj::Left(x) => x.poll_read(cx, buf),
+ EitherProj::Right(x) => x.poll_read(cx, buf),
}
}
- #[project]
fn poll_read_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &mut [IoSliceMut<'_>],
) -> Poll<Result<usize>> {
- #[project]
match self.project() {
- Either::Left(x) => x.poll_read_vectored(cx, bufs),
- Either::Right(x) => x.poll_read_vectored(cx, bufs),
+ EitherProj::Left(x) => x.poll_read_vectored(cx, bufs),
+ EitherProj::Right(x) => x.poll_read_vectored(cx, bufs),
}
}
}
@@ -214,47 +198,39 @@ mod if_std {
A: AsyncWrite,
B: AsyncWrite,
{
- #[project]
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize>> {
- #[project]
match self.project() {
- Either::Left(x) => x.poll_write(cx, buf),
- Either::Right(x) => x.poll_write(cx, buf),
+ EitherProj::Left(x) => x.poll_write(cx, buf),
+ EitherProj::Right(x) => x.poll_write(cx, buf),
}
}
- #[project]
fn poll_write_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[IoSlice<'_>],
) -> Poll<Result<usize>> {
- #[project]
match self.project() {
- Either::Left(x) => x.poll_write_vectored(cx, bufs),
- Either::Right(x) => x.poll_write_vectored(cx, bufs),
+ EitherProj::Left(x) => x.poll_write_vectored(cx, bufs),
+ EitherProj::Right(x) => x.poll_write_vectored(cx, bufs),
}
}
- #[project]
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
- #[project]
match self.project() {
- Either::Left(x) => x.poll_flush(cx),
- Either::Right(x) => x.poll_flush(cx),
+ EitherProj::Left(x) => x.poll_flush(cx),
+ EitherProj::Right(x) => x.poll_flush(cx),
}
}
- #[project]
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
- #[project]
match self.project() {
- Either::Left(x) => x.poll_close(cx),
- Either::Right(x) => x.poll_close(cx),
+ EitherProj::Left(x) => x.poll_close(cx),
+ EitherProj::Right(x) => x.poll_close(cx),
}
}
}
@@ -264,16 +240,14 @@ mod if_std {
A: AsyncSeek,
B: AsyncSeek,
{
- #[project]
fn poll_seek(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
pos: SeekFrom,
) -> Poll<Result<u64>> {
- #[project]
match self.project() {
- Either::Left(x) => x.poll_seek(cx, pos),
- Either::Right(x) => x.poll_seek(cx, pos),
+ EitherProj::Left(x) => x.poll_seek(cx, pos),
+ EitherProj::Right(x) => x.poll_seek(cx, pos),
}
}
}
@@ -283,24 +257,17 @@ mod if_std {
A: AsyncBufRead,
B: AsyncBufRead,
{
- #[project]
- fn poll_fill_buf(
- self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<Result<&[u8]>> {
- #[project]
+ fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> {
match self.project() {
- Either::Left(x) => x.poll_fill_buf(cx),
- Either::Right(x) => x.poll_fill_buf(cx),
+ EitherProj::Left(x) => x.poll_fill_buf(cx),
+ EitherProj::Right(x) => x.poll_fill_buf(cx),
}
}
- #[project]
fn consume(self: Pin<&mut Self>, amt: usize) {
- #[project]
match self.project() {
- Either::Left(x) => x.consume(amt),
- Either::Right(x) => x.consume(amt),
+ EitherProj::Left(x) => x.consume(amt),
+ EitherProj::Right(x) => x.consume(amt),
}
}
}
diff --git a/src/future/future/flatten.rs b/src/future/future/flatten.rs
index f59464c..53f75e2 100644
--- a/src/future/future/flatten.rs
+++ b/src/future/future/flatten.rs
@@ -4,9 +4,9 @@ use futures_core::stream::{FusedStream, Stream};
#[cfg(feature = "sink")]
use futures_sink::Sink;
use futures_core::task::{Context, Poll};
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
-#[pin_project]
+#[pin_project(project = FlattenProj)]
#[derive(Debug)]
pub enum Flatten<Fut1, Fut2> {
First(#[pin] Fut1),
@@ -38,21 +38,19 @@ impl<Fut> Future for Flatten<Fut, Fut::Output>
{
type Output = <Fut::Output as Future>::Output;
- #[project]
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Poll::Ready(loop {
- #[project]
match self.as_mut().project() {
- Flatten::First(f) => {
+ FlattenProj::First(f) => {
let f = ready!(f.poll(cx));
self.set(Flatten::Second(f));
},
- Flatten::Second(f) => {
+ FlattenProj::Second(f) => {
let output = ready!(f.poll(cx));
self.set(Flatten::Empty);
break output;
},
- Flatten::Empty => panic!("Flatten polled after completion"),
+ FlattenProj::Empty => panic!("Flatten polled after completion"),
}
})
}
@@ -76,23 +74,21 @@ impl<Fut> Stream for Flatten<Fut, Fut::Output>
{
type Item = <Fut::Output as Stream>::Item;
- #[project]
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Poll::Ready(loop {
- #[project]
match self.as_mut().project() {
- Flatten::First(f) => {
+ FlattenProj::First(f) => {
let f = ready!(f.poll(cx));
self.set(Flatten::Second(f));
},
- Flatten::Second(f) => {
+ FlattenProj::Second(f) => {
let output = ready!(f.poll_next(cx));
if output.is_none() {
self.set(Flatten::Empty);
}
break output;
},
- Flatten::Empty => break None,
+ FlattenProj::Empty => break None,
}
})
}
@@ -107,54 +103,46 @@ where
{
type Error = <Fut::Output as Sink<Item>>::Error;
- #[project]
fn poll_ready(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
Poll::Ready(loop {
- #[project]
match self.as_mut().project() {
- Flatten::First(f) => {
+ FlattenProj::First(f) => {
let f = ready!(f.poll(cx));
self.set(Flatten::Second(f));
},
- Flatten::Second(f) => {
+ FlattenProj::Second(f) => {
break ready!(f.poll_ready(cx));
},
- Flatten::Empty => panic!("poll_ready called after eof"),
+ FlattenProj::Empty => panic!("poll_ready called after eof"),
}
})
}
- #[project]
fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
- #[project]
match self.project() {
- Flatten::First(_) => panic!("poll_ready not called first"),
- Flatten::Second(f) => f.start_send(item),
- Flatten::Empty => panic!("start_send called after eof"),
+ FlattenProj::First(_) => panic!("poll_ready not called first"),
+ FlattenProj::Second(f) => f.start_send(item),
+ FlattenProj::Empty => panic!("start_send called after eof"),
}
}
- #[project]
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- #[project]
match self.project() {
- Flatten::First(_) => Poll::Ready(Ok(())),
- Flatten::Second(f) => f.poll_flush(cx),
- Flatten::Empty => panic!("poll_flush called after eof"),
+ FlattenProj::First(_) => Poll::Ready(Ok(())),
+ FlattenProj::Second(f) => f.poll_flush(cx),
+ FlattenProj::Empty => panic!("poll_flush called after eof"),
}
}
- #[project]
fn poll_close(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
- #[project]
let res = match self.as_mut().project() {
- Flatten::Second(f) => f.poll_close(cx),
+ FlattenProj::Second(f) => f.poll_close(cx),
_ => Poll::Ready(Ok(())),
};
if res.is_ready() {
diff --git a/src/future/future/fuse.rs b/src/future/future/fuse.rs
index 69a8a69..9f2e1ca 100644
--- a/src/future/future/fuse.rs
+++ b/src/future/future/fuse.rs
@@ -38,7 +38,7 @@ impl<Fut: Future> Fuse<Fut> {
/// sender.unbounded_send(()).unwrap();
/// drop(sender);
///
- /// // Use `Fuse::termianted()` to create an already-terminated future
+ /// // Use `Fuse::terminated()` to create an already-terminated future
/// // which may be instantiated later.
/// let foo_printer = Fuse::terminated();
/// pin_mut!(foo_printer);
diff --git a/src/future/future/map.rs b/src/future/future/map.rs
index 080f871..8e7f636 100644
--- a/src/future/future/map.rs
+++ b/src/future/future/map.rs
@@ -1,13 +1,12 @@
use core::pin::Pin;
-use core::ptr;
use futures_core::future::{FusedFuture, Future};
use futures_core::task::{Context, Poll};
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
use crate::fns::FnOnce1;
/// Internal Map future
-#[pin_project]
+#[pin_project(project = MapProj, project_replace = MapProjOwn)]
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub enum Map<Fut, F> {
@@ -19,17 +18,6 @@ pub enum Map<Fut, F> {
Complete,
}
-// Helper type to mark a `Map` as complete without running its destructor.
-struct UnsafeMarkAsComplete<Fut, F>(*mut Map<Fut, F>);
-
-impl<Fut, F> Drop for UnsafeMarkAsComplete<Fut, F> {
- fn drop(&mut self) {
- unsafe {
- ptr::write(self.0, Map::Complete);
- }
- }
-}
-
impl<Fut, F> Map<Fut, F> {
/// Creates a new Map.
pub(crate) fn new(future: Fut, f: F) -> Map<Fut, F> {
@@ -55,35 +43,16 @@ impl<Fut, F, T> Future for Map<Fut, F>
{
type Output = T;
- #[project]
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
- unsafe {
- // Store this pointer for later...
- let self_ptr: *mut Self = self.as_mut().get_unchecked_mut();
-
- match &mut *self_ptr {
- Map::Incomplete { future, f } => {
- let mut future = Pin::new_unchecked(future);
- let output = match future.as_mut().poll(cx) {
- Poll::Ready(x) => x,
- Poll::Pending => return Poll::Pending,
- };
-
- // Here be dragons
- let f = ptr::read(f);
- {
- // The ordering here is important, the call to `drop_in_place` must be
- // last as it may panic. Other lines must not panic.
- let _cleanup = UnsafeMarkAsComplete(self_ptr);
- ptr::drop_in_place(future.get_unchecked_mut());
- };
-
- // Phew, everything is back to normal, and we should be in the
- // `Complete` state!
- Poll::Ready(f.call_once(output))
- },
- Map::Complete => panic!("Map must not be polled after it returned `Poll::Ready`"),
- }
+ match self.as_mut().project() {
+ MapProj::Incomplete { future, .. } => {
+ let output = ready!(future.poll(cx));
+ match self.project_replace(Map::Complete) {
+ MapProjOwn::Incomplete { f, .. } => Poll::Ready(f.call_once(output)),
+ MapProjOwn::Complete => unreachable!(),
+ }
+ },
+ MapProj::Complete => panic!("Map must not be polled after it returned `Poll::Ready`"),
}
}
}
diff --git a/src/future/future/mod.rs b/src/future/future/mod.rs
index c3e035b..f5d5dd2 100644
--- a/src/future/future/mod.rs
+++ b/src/future/future/mod.rs
@@ -3,11 +3,14 @@
//! This module contains a number of functions for working with `Future`s,
//! including the `FutureExt` trait which adds methods to `Future` types.
-use super::{assert_future, Either};
#[cfg(feature = "alloc")]
use alloc::boxed::Box;
use core::pin::Pin;
+use crate::future::{assert_future, Either};
+use crate::stream::assert_stream;
+use crate::fns::{inspect_fn, into_fn, ok_fn, InspectFn, IntoFn, OkFn};
+use crate::never::Never;
#[cfg(feature = "alloc")]
use futures_core::future::{BoxFuture, LocalBoxFuture};
use futures_core::{
@@ -15,8 +18,7 @@ use futures_core::{
stream::Stream,
task::{Context, Poll},
};
-use crate::never::Never;
-use crate::fns::{OkFn, ok_fn, IntoFn, into_fn, InspectFn, inspect_fn};
+use pin_utils::pin_mut;
// Combinators
@@ -44,7 +46,7 @@ delegate_all!(
pub use fuse::Fuse;
delegate_all!(
- /// Future for the [`flatten`](super::FutureExt::flatten) method.
+ /// Future for the [`map`](super::FutureExt::map) method.
Map<Fut, F>(
map::Map<Fut, F>
): Debug + Future + FusedFuture + New[|x: Fut, f: F| map::Map::new(x, f)]
@@ -99,9 +101,11 @@ mod catch_unwind;
pub use self::catch_unwind::CatchUnwind;
#[cfg(feature = "channel")]
+#[cfg_attr(docsrs, doc(cfg(feature = "channel")))]
#[cfg(feature = "std")]
mod remote_handle;
#[cfg(feature = "channel")]
+#[cfg_attr(docsrs, doc(cfg(feature = "channel")))]
#[cfg(feature = "std")]
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::remote_handle::{Remote, RemoteHandle};
@@ -220,7 +224,7 @@ pub trait FutureExt: Future {
B: Future<Output = Self::Output>,
Self: Sized,
{
- Either::Left(self)
+ assert_future::<Self::Output, _>(Either::Left(self))
}
/// Wrap this future in an `Either` future, making it the right-hand variant
@@ -250,7 +254,7 @@ pub trait FutureExt: Future {
A: Future<Output = Self::Output>,
Self: Sized,
{
- Either::Right(self)
+ assert_future::<Self::Output, _>(Either::Right(self))
}
/// Convert this future into a single element stream.
@@ -275,7 +279,7 @@ pub trait FutureExt: Future {
where
Self: Sized,
{
- IntoStream::new(self)
+ assert_stream::<Self::Output, _>(IntoStream::new(self))
}
/// Flatten the execution of this future when the output of this
@@ -339,7 +343,7 @@ pub trait FutureExt: Future {
Self::Output: Stream,
Self: Sized,
{
- FlattenStream::new(self)
+ assert_stream::<<Self::Output as Stream>::Item, _>(FlattenStream::new(self))
}
/// Fuse a future such that `poll` will never again be called once it has
@@ -428,7 +432,9 @@ pub trait FutureExt: Future {
where
Self: Sized + ::std::panic::UnwindSafe,
{
- CatchUnwind::new(self)
+ assert_future::<Result<Self::Output, Box<dyn std::any::Any + Send>>, _>(CatchUnwind::new(
+ self,
+ ))
}
/// Create a cloneable handle to this future where all handles will resolve
@@ -482,7 +488,7 @@ pub trait FutureExt: Future {
Self: Sized,
Self::Output: Clone,
{
- Shared::new(self)
+ assert_future::<Self::Output, _>(Shared::new(self))
}
/// Turn this future into a future that yields `()` on completion and sends
@@ -494,6 +500,7 @@ pub trait FutureExt: Future {
/// This method is only available when the `std` feature of this
/// library is activated, and it is activated by default.
#[cfg(feature = "channel")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "channel")))]
#[cfg(feature = "std")]
fn remote_handle(self) -> (Remote<Self>, RemoteHandle<Self::Output>)
where
@@ -511,7 +518,7 @@ pub trait FutureExt: Future {
where
Self: Sized + Send + 'a,
{
- Box::pin(self)
+ assert_future::<Self::Output, _>(Box::pin(self))
}
/// Wrap the future in a Box, pinning it.
@@ -525,7 +532,7 @@ pub trait FutureExt: Future {
where
Self: Sized + 'a,
{
- Box::pin(self)
+ assert_future::<Self::Output, _>(Box::pin(self))
}
/// Turns a [`Future<Output = T>`](Future) into a
@@ -534,7 +541,7 @@ pub trait FutureExt: Future {
where
Self: Sized,
{
- UnitError::new(self)
+ assert_future::<Result<Self::Output, ()>, _>(UnitError::new(self))
}
/// Turns a [`Future<Output = T>`](Future) into a
@@ -543,7 +550,7 @@ pub trait FutureExt: Future {
where
Self: Sized,
{
- NeverError::new(self)
+ assert_future::<Result<Self::Output, Never>, _>(NeverError::new(self))
}
/// A convenience for calling `Future::poll` on `Unpin` future types.
@@ -585,19 +592,16 @@ pub trait FutureExt: Future {
///
/// assert_eq!(future_ready.now_or_never().expect("Future not ready"), "foobar");
/// ```
- fn now_or_never(mut self) -> Option<Self::Output>
+ fn now_or_never(self) -> Option<Self::Output>
where
Self: Sized,
{
let noop_waker = crate::task::noop_waker();
let mut cx = Context::from_waker(&noop_waker);
- // SAFETY: This is safe because this method consumes the future, so `poll` is
- // only going to be called once. Thus it doesn't matter to us if the
- // future is `Unpin` or not.
- let pinned = unsafe { Pin::new_unchecked(&mut self) };
-
- match pinned.poll(&mut cx) {
+ let this = self;
+ pin_mut!(this);
+ match this.poll(&mut cx) {
Poll::Ready(x) => Some(x),
_ => None,
}
diff --git a/src/future/future/remote_handle.rs b/src/future/future/remote_handle.rs
index 9495bec..598f63c 100644
--- a/src/future/future/remote_handle.rs
+++ b/src/future/future/remote_handle.rs
@@ -16,7 +16,7 @@ use {
},
thread,
},
- pin_project::{pin_project, project},
+ pin_project::pin_project,
};
/// The handle to a remote future returned by
@@ -37,6 +37,7 @@ use {
/// will unwind.
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[derive(Debug)]
+#[cfg_attr(docsrs, doc(cfg(feature = "channel")))]
pub struct RemoteHandle<T> {
rx: Receiver<thread::Result<T>>,
keep_running: Arc<AtomicBool>,
@@ -72,6 +73,7 @@ type SendMsg<Fut> = Result<<Fut as Future>::Output, Box<(dyn Any + Send + 'stati
/// Created by [`remote_handle`](crate::future::FutureExt::remote_handle).
#[pin_project]
#[must_use = "futures do nothing unless you `.await` or poll them"]
+#[cfg_attr(docsrs, doc(cfg(feature = "channel")))]
pub struct Remote<Fut: Future> {
tx: Option<Sender<SendMsg<Fut>>>,
keep_running: Arc<AtomicBool>,
@@ -90,23 +92,21 @@ impl<Fut: Future + fmt::Debug> fmt::Debug for Remote<Fut> {
impl<Fut: Future> Future for Remote<Fut> {
type Output = ();
- #[project]
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
- #[project]
- let Remote { tx, keep_running, future } = self.project();
+ let this = self.project();
- if let Poll::Ready(_) = tx.as_mut().unwrap().poll_canceled(cx) {
- if !keep_running.load(Ordering::SeqCst) {
+ if let Poll::Ready(_) = this.tx.as_mut().unwrap().poll_canceled(cx) {
+ if !this.keep_running.load(Ordering::SeqCst) {
// Cancelled, bail out
return Poll::Ready(())
}
}
- let output = ready!(future.poll(cx));
+ let output = ready!(this.future.poll(cx));
// if the receiving end has gone away then that's ok, we just ignore the
// send error here.
- drop(tx.take().unwrap().send(output));
+ drop(this.tx.take().unwrap().send(output));
Poll::Ready(())
}
}
diff --git a/src/future/join_all.rs b/src/future/join_all.rs
index df62f3a..0c8357c 100644
--- a/src/future/join_all.rs
+++ b/src/future/join_all.rs
@@ -56,8 +56,10 @@ where
///
/// This is purposefully a very simple API for basic use-cases. In a lot of
/// cases you will want to use the more powerful
-/// [`FuturesUnordered`][crate::stream::FuturesUnordered] APIs, some
-/// examples of additional functionality that provides:
+/// [`FuturesOrdered`][crate::stream::FuturesOrdered] APIs, or, if order does
+/// not matter, [`FuturesUnordered`][crate::stream::FuturesUnordered].
+///
+/// Some examples for additional functionality provided by these are:
///
/// * Adding new futures to the set even after it has been started.
///
diff --git a/src/future/maybe_done.rs b/src/future/maybe_done.rs
index 71cb6fa..5120a9b 100644
--- a/src/future/maybe_done.rs
+++ b/src/future/maybe_done.rs
@@ -1,15 +1,14 @@
//! Definition of the MaybeDone combinator
-use core::mem;
use core::pin::Pin;
use futures_core::future::{FusedFuture, Future};
use futures_core::task::{Context, Poll};
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
/// A future that may have completed.
///
/// This is created by the [`maybe_done()`] function.
-#[pin_project]
+#[pin_project(project = MaybeDoneProj, project_replace = MaybeDoneProjOwn)]
#[derive(Debug)]
pub enum MaybeDone<Fut: Future> {
/// A not-yet-completed future
@@ -47,12 +46,10 @@ impl<Fut: Future> MaybeDone<Fut> {
/// The output of this method will be [`Some`] if and only if the inner
/// future has been completed and [`take_output`](MaybeDone::take_output)
/// has not yet been called.
- #[project]
#[inline]
pub fn output_mut(self: Pin<&mut Self>) -> Option<&mut Fut::Output> {
- #[project]
match self.project() {
- MaybeDone::Done(res) => Some(res),
+ MaybeDoneProj::Done(res) => Some(res),
_ => None,
}
}
@@ -61,22 +58,13 @@ impl<Fut: Future> MaybeDone<Fut> {
/// towards completion.
#[inline]
pub fn take_output(self: Pin<&mut Self>) -> Option<Fut::Output> {
- // Safety: we return immediately unless we are in the `Done`
- // state, which does not have any pinning guarantees to uphold.
- //
- // Hopefully `pin_project` will support this safely soon:
- // https://github.com/taiki-e/pin-project/issues/184
- unsafe {
- let this = self.get_unchecked_mut();
- match this {
- MaybeDone::Done(_) => {},
- MaybeDone::Future(_) | MaybeDone::Gone => return None,
- };
- if let MaybeDone::Done(output) = mem::replace(this, MaybeDone::Gone) {
- Some(output)
- } else {
- unreachable!()
- }
+ match &*self {
+ MaybeDone::Done(_) => {}
+ MaybeDone::Future(_) | MaybeDone::Gone => return None,
+ }
+ match self.project_replace(MaybeDone::Gone) {
+ MaybeDoneProjOwn::Done(output) => Some(output),
+ _ => unreachable!(),
}
}
}
@@ -93,16 +81,14 @@ impl<Fut: Future> FusedFuture for MaybeDone<Fut> {
impl<Fut: Future> Future for MaybeDone<Fut> {
type Output = ();
- #[project]
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- #[project]
match self.as_mut().project() {
- MaybeDone::Future(f) => {
+ MaybeDoneProj::Future(f) => {
let res = ready!(f.poll(cx));
self.set(MaybeDone::Done(res));
- },
- MaybeDone::Done(_) => {},
- MaybeDone::Gone => panic!("MaybeDone polled after value taken"),
+ }
+ MaybeDoneProj::Done(_) => {}
+ MaybeDoneProj::Gone => panic!("MaybeDone polled after value taken"),
}
Poll::Ready(())
}
diff --git a/src/future/mod.rs b/src/future/mod.rs
index 7962894..3f19c19 100644
--- a/src/future/mod.rs
+++ b/src/future/mod.rs
@@ -23,6 +23,7 @@ pub use self::future::FlattenStream;
pub use self::future::CatchUnwind;
#[cfg(feature = "channel")]
+#[cfg_attr(docsrs, doc(cfg(feature = "channel")))]
#[cfg(feature = "std")]
pub use self::future::{Remote, RemoteHandle};
@@ -36,6 +37,7 @@ pub use self::try_future::{
};
#[cfg(feature = "sink")]
+#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
pub use self::try_future::FlattenSink;
// Primitive futures
@@ -107,7 +109,7 @@ cfg_target_has_atomic! {
// Just a helper function to ensure the futures we're returning all have the
// right implementations.
-fn assert_future<T, F>(future: F) -> F
+pub(crate) fn assert_future<T, F>(future: F) -> F
where
F: Future<Output = T>,
{
diff --git a/src/future/select.rs b/src/future/select.rs
index 3e0a447..bc24779 100644
--- a/src/future/select.rs
+++ b/src/future/select.rs
@@ -27,6 +27,34 @@ impl<A: Unpin, B: Unpin> Unpin for Select<A, B> {}
///
/// # Examples
///
+/// A simple example
+///
+/// ```
+/// # futures::executor::block_on(async {
+/// use futures::future::{self, Either};
+/// use futures::pin_mut;
+///
+/// // These two futures have different types even though their outputs have the same type
+/// let future1 = async { 1 };
+/// let future2 = async { 2 };
+///
+/// // 'select' requires Future + Unpin bounds
+/// pin_mut!(future1);
+/// pin_mut!(future2);
+///
+/// let value = match future::select(future1, future2).await {
+/// Either::Left((value1, _)) => value1, // `value1` is resolved from `future1`
+/// // `_` represents `future2`
+/// Either::Right((value2, _)) => value2, // `value2` is resolved from `future2`
+/// // `_` represents `future1`
+/// };
+///
+/// assert!(value == 1 || value == 2);
+/// # });
+/// ```
+///
+/// A more complex example
+///
/// ```
/// use futures::future::{self, Either, Future, FutureExt};
///
diff --git a/src/future/try_future/mod.rs b/src/future/try_future/mod.rs
index bd1ab33..1ce01d2 100644
--- a/src/future/try_future/mod.rs
+++ b/src/future/try_future/mod.rs
@@ -14,13 +14,13 @@ use futures_core::{
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use super::assert_future;
-use crate::future::{Map, Inspect};
use crate::fns::{
- MapOkFn, map_ok_fn, MapErrFn, map_err_fn, MapOkOrElseFn,
- map_ok_or_else_fn, IntoFn, UnwrapOrElseFn, unwrap_or_else_fn, InspectOkFn, inspect_ok_fn, InspectErrFn,
- inspect_err_fn, into_fn
+ inspect_err_fn, inspect_ok_fn, into_fn, map_err_fn, map_ok_fn, map_ok_or_else_fn,
+ unwrap_or_else_fn, InspectErrFn, InspectOkFn, IntoFn, MapErrFn, MapOkFn, MapOkOrElseFn,
+ UnwrapOrElseFn,
};
+use crate::future::{assert_future, Inspect, Map};
+use crate::stream::assert_stream;
// Combinators
mod into_future;
@@ -52,6 +52,7 @@ delegate_all!(
#[cfg(feature = "sink")]
delegate_all!(
/// Sink for the [`flatten_sink`](TryFutureExt::flatten_sink) method.
+ #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
FlattenSink<Fut, Si>(
try_flatten::TryFlatten<Fut, Si>
): Debug + Sink + Stream + FusedStream + New[|x: Fut| try_flatten::TryFlatten::new(x)]
@@ -166,6 +167,7 @@ pub trait TryFutureExt: TryFuture {
/// take_sink(fut.flatten_sink())
/// ```
#[cfg(feature = "sink")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
fn flatten_sink<Item>(self) -> FlattenSink<Self, Self::Ok>
where
Self::Ok: Sink<Item, Error = Self::Error>,
@@ -228,7 +230,7 @@ pub trait TryFutureExt: TryFuture {
/// The provided closure `f` will only be called if this future is resolved
/// to an [`Ok`]. If it resolves to an [`Err`], panics, or is dropped, then
/// the provided closure will never be invoked.
- ///
+ ///
/// The provided closure `e` will only be called if this future is resolved
/// to an [`Err`]. If it resolves to an [`Ok`], panics, or is dropped, then
/// the provided closure will never be invoked.
@@ -245,13 +247,13 @@ pub trait TryFutureExt: TryFuture {
/// let future = async { Ok::<i32, i32>(5) };
/// let future = future.map_ok_or_else(|x| x * 2, |x| x + 3);
/// assert_eq!(future.await, 8);
- ///
+ ///
/// let future = async { Err::<i32, i32>(5) };
/// let future = future.map_ok_or_else(|x| x * 2, |x| x + 3);
/// assert_eq!(future.await, 10);
/// # });
/// ```
- ///
+ ///
fn map_ok_or_else<T, E, F>(self, e: E, f: F) -> MapOkOrElse<Self, F, E>
where
F: FnOnce(Self::Ok) -> T,
@@ -532,7 +534,9 @@ pub trait TryFutureExt: TryFuture {
Self::Ok: TryStream<Error = Self::Error>,
Self: Sized,
{
- TryFlattenStream::new(self)
+ assert_stream::<Result<<Self::Ok as TryStream>::Ok, Self::Error>, _>(TryFlattenStream::new(
+ self,
+ ))
}
/// Unwraps this future's ouput, producing a future with this future's
@@ -568,6 +572,7 @@ pub trait TryFutureExt: TryFuture {
/// Wraps a [`TryFuture`] into a future compatable with libraries using
/// futures 0.1 future definitons. Requires the `compat` feature to enable.
#[cfg(feature = "compat")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "compat")))]
fn compat(self) -> Compat<Self>
where
Self: Sized + Unpin,
@@ -600,7 +605,7 @@ pub trait TryFutureExt: TryFuture {
where
Self: Sized,
{
- IntoFuture::new(self)
+ assert_future::<Result<Self::Ok, Self::Error>, _>(IntoFuture::new(self))
}
/// A convenience method for calling [`TryFuture::try_poll`] on [`Unpin`]
diff --git a/src/future/try_future/try_flatten.rs b/src/future/try_future/try_flatten.rs
index 661d3ad..2bcadc5 100644
--- a/src/future/try_future/try_flatten.rs
+++ b/src/future/try_future/try_flatten.rs
@@ -4,9 +4,9 @@ use futures_core::stream::{FusedStream, Stream, TryStream};
#[cfg(feature = "sink")]
use futures_sink::Sink;
use futures_core::task::{Context, Poll};
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
-#[pin_project]
+#[pin_project(project = TryFlattenProj)]
#[derive(Debug)]
pub enum TryFlatten<Fut1, Fut2> {
First(#[pin] Fut1),
@@ -38,12 +38,10 @@ impl<Fut> Future for TryFlatten<Fut, Fut::Ok>
{
type Output = Result<<Fut::Ok as TryFuture>::Ok, Fut::Error>;
- #[project]
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Poll::Ready(loop {
- #[project]
match self.as_mut().project() {
- TryFlatten::First(f) => {
+ TryFlattenProj::First(f) => {
match ready!(f.try_poll(cx)) {
Ok(f) => self.set(TryFlatten::Second(f)),
Err(e) => {
@@ -52,12 +50,12 @@ impl<Fut> Future for TryFlatten<Fut, Fut::Ok>
}
}
},
- TryFlatten::Second(f) => {
+ TryFlattenProj::Second(f) => {
let output = ready!(f.try_poll(cx));
self.set(TryFlatten::Empty);
break output;
},
- TryFlatten::Empty => panic!("TryFlatten polled after completion"),
+ TryFlattenProj::Empty => panic!("TryFlatten polled after completion"),
}
})
}
@@ -81,12 +79,10 @@ impl<Fut> Stream for TryFlatten<Fut, Fut::Ok>
{
type Item = Result<<Fut::Ok as TryStream>::Ok, Fut::Error>;
- #[project]
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Poll::Ready(loop {
- #[project]
match self.as_mut().project() {
- TryFlatten::First(f) => {
+ TryFlattenProj::First(f) => {
match ready!(f.try_poll(cx)) {
Ok(f) => self.set(TryFlatten::Second(f)),
Err(e) => {
@@ -95,14 +91,14 @@ impl<Fut> Stream for TryFlatten<Fut, Fut::Ok>
}
}
},
- TryFlatten::Second(f) => {
+ TryFlattenProj::Second(f) => {
let output = ready!(f.try_poll_next(cx));
if output.is_none() {
self.set(TryFlatten::Empty);
}
break output;
},
- TryFlatten::Empty => break None,
+ TryFlattenProj::Empty => break None,
}
})
}
@@ -117,15 +113,13 @@ where
{
type Error = Fut::Error;
- #[project]
fn poll_ready(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
Poll::Ready(loop {
- #[project]
match self.as_mut().project() {
- TryFlatten::First(f) => {
+ TryFlattenProj::First(f) => {
match ready!(f.try_poll(cx)) {
Ok(f) => self.set(TryFlatten::Second(f)),
Err(e) => {
@@ -134,42 +128,36 @@ where
}
}
},
- TryFlatten::Second(f) => {
+ TryFlattenProj::Second(f) => {
break ready!(f.poll_ready(cx));
},
- TryFlatten::Empty => panic!("poll_ready called after eof"),
+ TryFlattenProj::Empty => panic!("poll_ready called after eof"),
}
})
}
- #[project]
fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
- #[project]
match self.project() {
- TryFlatten::First(_) => panic!("poll_ready not called first"),
- TryFlatten::Second(f) => f.start_send(item),
- TryFlatten::Empty => panic!("start_send called after eof"),
+ TryFlattenProj::First(_) => panic!("poll_ready not called first"),
+ TryFlattenProj::Second(f) => f.start_send(item),
+ TryFlattenProj::Empty => panic!("start_send called after eof"),
}
}
- #[project]
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- #[project]
match self.project() {
- TryFlatten::First(_) => Poll::Ready(Ok(())),
- TryFlatten::Second(f) => f.poll_flush(cx),
- TryFlatten::Empty => panic!("poll_flush called after eof"),
+ TryFlattenProj::First(_) => Poll::Ready(Ok(())),
+ TryFlattenProj::Second(f) => f.poll_flush(cx),
+ TryFlattenProj::Empty => panic!("poll_flush called after eof"),
}
}
- #[project]
fn poll_close(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
- #[project]
let res = match self.as_mut().project() {
- TryFlatten::Second(f) => f.poll_close(cx),
+ TryFlattenProj::Second(f) => f.poll_close(cx),
_ => Poll::Ready(Ok(())),
};
if res.is_ready() {
diff --git a/src/future/try_future/try_flatten_err.rs b/src/future/try_future/try_flatten_err.rs
index fbb413d..480f8c3 100644
--- a/src/future/try_future/try_flatten_err.rs
+++ b/src/future/try_future/try_flatten_err.rs
@@ -1,9 +1,9 @@
use core::pin::Pin;
use futures_core::future::{FusedFuture, Future, TryFuture};
use futures_core::task::{Context, Poll};
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
-#[pin_project]
+#[pin_project(project = TryFlattenErrProj)]
#[derive(Debug)]
pub enum TryFlattenErr<Fut1, Fut2> {
First(#[pin] Fut1),
@@ -35,12 +35,10 @@ impl<Fut> Future for TryFlattenErr<Fut, Fut::Error>
{
type Output = Result<Fut::Ok, <Fut::Error as TryFuture>::Error>;
- #[project]
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Poll::Ready(loop {
- #[project]
match self.as_mut().project() {
- TryFlattenErr::First(f) => {
+ TryFlattenErrProj::First(f) => {
match ready!(f.try_poll(cx)) {
Err(f) => self.set(TryFlattenErr::Second(f)),
Ok(e) => {
@@ -49,12 +47,12 @@ impl<Fut> Future for TryFlattenErr<Fut, Fut::Error>
}
}
},
- TryFlattenErr::Second(f) => {
+ TryFlattenErrProj::Second(f) => {
let output = ready!(f.try_poll(cx));
self.set(TryFlattenErr::Empty);
break output;
},
- TryFlattenErr::Empty => panic!("TryFlattenErr polled after completion"),
+ TryFlattenErrProj::Empty => panic!("TryFlattenErr polled after completion"),
}
})
}
diff --git a/src/future/try_maybe_done.rs b/src/future/try_maybe_done.rs
index a249c5c..b38b038 100644
--- a/src/future/try_maybe_done.rs
+++ b/src/future/try_maybe_done.rs
@@ -1,15 +1,14 @@
//! Definition of the TryMaybeDone combinator
-use core::mem;
use core::pin::Pin;
use futures_core::future::{FusedFuture, Future, TryFuture};
use futures_core::task::{Context, Poll};
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
/// A future that may have completed with an error.
///
/// This is created by the [`try_maybe_done()`] function.
-#[pin_project]
+#[pin_project(project = TryMaybeDoneProj, project_replace = TryMaybeDoneProjOwn)]
#[derive(Debug)]
pub enum TryMaybeDone<Fut: TryFuture> {
/// A not-yet-completed future
@@ -32,12 +31,10 @@ impl<Fut: TryFuture> TryMaybeDone<Fut> {
/// The output of this method will be [`Some`] if and only if the inner
/// future has completed successfully and [`take_output`](TryMaybeDone::take_output)
/// has not yet been called.
- #[project]
#[inline]
pub fn output_mut(self: Pin<&mut Self>) -> Option<&mut Fut::Ok> {
- #[project]
match self.project() {
- TryMaybeDone::Done(res) => Some(res),
+ TryMaybeDoneProj::Done(res) => Some(res),
_ => None,
}
}
@@ -46,22 +43,13 @@ impl<Fut: TryFuture> TryMaybeDone<Fut> {
/// towards completion.
#[inline]
pub fn take_output(self: Pin<&mut Self>) -> Option<Fut::Ok> {
- // Safety: we return immediately unless we are in the `Done`
- // state, which does not have any pinning guarantees to uphold.
- //
- // Hopefully `pin_project` will support this safely soon:
- // https://github.com/taiki-e/pin-project/issues/184
- unsafe {
- let this = self.get_unchecked_mut();
- match this {
- TryMaybeDone::Done(_) => {},
- TryMaybeDone::Future(_) | TryMaybeDone::Gone => return None,
- };
- if let TryMaybeDone::Done(output) = mem::replace(this, TryMaybeDone::Gone) {
- Some(output)
- } else {
- unreachable!()
- }
+ match &*self {
+ TryMaybeDone::Done(_) => {},
+ TryMaybeDone::Future(_) | TryMaybeDone::Gone => return None,
+ }
+ match self.project_replace(TryMaybeDone::Gone) {
+ TryMaybeDoneProjOwn::Done(output) => Some(output),
+ _ => unreachable!()
}
}
}
@@ -78,11 +66,9 @@ impl<Fut: TryFuture> FusedFuture for TryMaybeDone<Fut> {
impl<Fut: TryFuture> Future for TryMaybeDone<Fut> {
type Output = Result<(), Fut::Error>;
- #[project]
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- #[project]
match self.as_mut().project() {
- TryMaybeDone::Future(f) => {
+ TryMaybeDoneProj::Future(f) => {
match ready!(f.try_poll(cx)) {
Ok(res) => self.set(TryMaybeDone::Done(res)),
Err(e) => {
@@ -91,8 +77,8 @@ impl<Fut: TryFuture> Future for TryMaybeDone<Fut> {
}
}
},
- TryMaybeDone::Done(_) => {},
- TryMaybeDone::Gone => panic!("TryMaybeDone polled after value taken"),
+ TryMaybeDoneProj::Done(_) => {},
+ TryMaybeDoneProj::Gone => panic!("TryMaybeDone polled after value taken"),
}
Poll::Ready(Ok(()))
}
diff --git a/src/io/buf_reader.rs b/src/io/buf_reader.rs
index 94b3d2a..2755667 100644
--- a/src/io/buf_reader.rs
+++ b/src/io/buf_reader.rs
@@ -2,7 +2,7 @@ use futures_core::task::{Context, Poll};
#[cfg(feature = "read-initializer")]
use futures_io::Initializer;
use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, IoSliceMut, SeekFrom};
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
use std::io::{self, Read};
use std::pin::Pin;
use std::{cmp, fmt};
@@ -68,13 +68,11 @@ impl<R: AsyncRead> BufReader<R> {
}
/// Invalidates all data in the internal buffer.
- #[project]
#[inline]
fn discard_buffer(self: Pin<&mut Self>) {
- #[project]
- let BufReader { pos, cap, .. } = self.project();
- *pos = 0;
- *cap = 0;
+ let this = self.project();
+ *this.pos = 0;
+ *this.cap = 0;
}
}
@@ -123,24 +121,22 @@ impl<R: AsyncRead> AsyncRead for BufReader<R> {
}
impl<R: AsyncRead> AsyncBufRead for BufReader<R> {
- #[project]
fn poll_fill_buf(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<io::Result<&[u8]>> {
- #[project]
- let BufReader { inner, buffer, cap, pos } = self.project();
+ let this = self.project();
// If we've reached the end of our internal buffer then we need to fetch
// some more data from the underlying reader.
// Branch using `>=` instead of the more correct `==`
// to tell the compiler that the pos..cap slice is always valid.
- if *pos >= *cap {
- debug_assert!(*pos == *cap);
- *cap = ready!(inner.poll_read(cx, buffer))?;
- *pos = 0;
+ if *this.pos >= *this.cap {
+ debug_assert!(*this.pos == *this.cap);
+ *this.cap = ready!(this.inner.poll_read(cx, this.buffer))?;
+ *this.pos = 0;
}
- Poll::Ready(Ok(&buffer[*pos..*cap]))
+ Poll::Ready(Ok(&this.buffer[*this.pos..*this.cap]))
}
fn consume(self: Pin<&mut Self>, amt: usize) {
diff --git a/src/io/buf_writer.rs b/src/io/buf_writer.rs
index 66df81f..ed9196d 100644
--- a/src/io/buf_writer.rs
+++ b/src/io/buf_writer.rs
@@ -1,6 +1,6 @@
use futures_core::task::{Context, Poll};
use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, IoSlice, SeekFrom};
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
use std::fmt;
use std::io::{self, Write};
use std::pin::Pin;
@@ -51,15 +51,13 @@ impl<W: AsyncWrite> BufWriter<W> {
}
}
- #[project]
fn flush_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
- #[project]
- let BufWriter { mut inner, buf, written } = self.project();
+ let mut this = self.project();
- let len = buf.len();
+ let len = this.buf.len();
let mut ret = Ok(());
- while *written < len {
- match ready!(inner.as_mut().poll_write(cx, &buf[*written..])) {
+ while *this.written < len {
+ match ready!(this.inner.as_mut().poll_write(cx, &this.buf[*this.written..])) {
Ok(0) => {
ret = Err(io::Error::new(
io::ErrorKind::WriteZero,
@@ -67,17 +65,17 @@ impl<W: AsyncWrite> BufWriter<W> {
));
break;
}
- Ok(n) => *written += n,
+ Ok(n) => *this.written += n,
Err(e) => {
ret = Err(e);
break;
}
}
}
- if *written > 0 {
- buf.drain(..*written);
+ if *this.written > 0 {
+ this.buf.drain(..*this.written);
}
- *written = 0;
+ *this.written = 0;
Poll::Ready(ret)
}
diff --git a/src/io/chain.rs b/src/io/chain.rs
index 4e85854..336307f 100644
--- a/src/io/chain.rs
+++ b/src/io/chain.rs
@@ -2,7 +2,7 @@ use futures_core::task::{Context, Poll};
#[cfg(feature = "read-initializer")]
use futures_io::Initializer;
use futures_io::{AsyncBufRead, AsyncRead, IoSliceMut};
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
use std::fmt;
use std::io;
use std::pin::Pin;
@@ -51,10 +51,8 @@ where
/// underlying readers as doing so may corrupt the internal state of this
/// `Chain`.
pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut T>, Pin<&mut U>) {
- unsafe {
- let Self { first, second, .. } = self.get_unchecked_mut();
- (Pin::new_unchecked(first), Pin::new_unchecked(second))
- }
+ let this = self.project();
+ (this.first, this.second)
}
/// Consumes the `Chain`, returning the wrapped readers.
@@ -82,42 +80,38 @@ where
T: AsyncRead,
U: AsyncRead,
{
- #[project]
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
- #[project]
- let Chain { first, second, done_first } = self.project();
+ let this = self.project();
- if !*done_first {
- match ready!(first.poll_read(cx, buf)?) {
- 0 if !buf.is_empty() => *done_first = true,
+ if !*this.done_first {
+ match ready!(this.first.poll_read(cx, buf)?) {
+ 0 if !buf.is_empty() => *this.done_first = true,
n => return Poll::Ready(Ok(n)),
}
}
- second.poll_read(cx, buf)
+ this.second.poll_read(cx, buf)
}
- #[project]
fn poll_read_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &mut [IoSliceMut<'_>],
) -> Poll<io::Result<usize>> {
- #[project]
- let Chain { first, second, done_first } = self.project();
+ let this = self.project();
- if !*done_first {
- let n = ready!(first.poll_read_vectored(cx, bufs)?);
+ if !*this.done_first {
+ let n = ready!(this.first.poll_read_vectored(cx, bufs)?);
if n == 0 && bufs.iter().any(|b| !b.is_empty()) {
- *done_first = true
+ *this.done_first = true
} else {
return Poll::Ready(Ok(n));
}
}
- second.poll_read_vectored(cx, bufs)
+ this.second.poll_read_vectored(cx, bufs)
}
#[cfg(feature = "read-initializer")]
@@ -136,31 +130,27 @@ where
T: AsyncBufRead,
U: AsyncBufRead,
{
- #[project]
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
- #[project]
- let Chain { first, second, done_first } = self.project();
+ let this = self.project();
- if !*done_first {
- match ready!(first.poll_fill_buf(cx)?) {
+ if !*this.done_first {
+ match ready!(this.first.poll_fill_buf(cx)?) {
buf if buf.is_empty() => {
- *done_first = true;
+ *this.done_first = true;
}
buf => return Poll::Ready(Ok(buf)),
}
}
- second.poll_fill_buf(cx)
+ this.second.poll_fill_buf(cx)
}
- #[project]
fn consume(self: Pin<&mut Self>, amt: usize) {
- #[project]
- let Chain { first, second, done_first } = self.project();
+ let this = self.project();
- if !*done_first {
- first.consume(amt)
+ if !*this.done_first {
+ this.first.consume(amt)
} else {
- second.consume(amt)
+ this.second.consume(amt)
}
}
}
diff --git a/src/io/copy_buf.rs b/src/io/copy_buf.rs
index f8bb49e..f47144a 100644
--- a/src/io/copy_buf.rs
+++ b/src/io/copy_buf.rs
@@ -3,7 +3,7 @@ use futures_core::task::{Context, Poll};
use futures_io::{AsyncBufRead, AsyncWrite};
use std::io;
use std::pin::Pin;
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
/// Creates a future which copies all the bytes from one object to another.
///
@@ -59,23 +59,21 @@ impl<R, W> Future for CopyBuf<'_, R, W>
{
type Output = io::Result<u64>;
- #[project]
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- #[project]
- let CopyBuf { mut reader, mut writer, amt } = self.project();
+ let mut this = self.project();
loop {
- let buffer = ready!(reader.as_mut().poll_fill_buf(cx))?;
+ let buffer = ready!(this.reader.as_mut().poll_fill_buf(cx))?;
if buffer.is_empty() {
- ready!(Pin::new(&mut writer).poll_flush(cx))?;
- return Poll::Ready(Ok(*amt));
+ ready!(Pin::new(&mut this.writer).poll_flush(cx))?;
+ return Poll::Ready(Ok(*this.amt));
}
- let i = ready!(Pin::new(&mut writer).poll_write(cx, buffer))?;
+ let i = ready!(Pin::new(&mut this.writer).poll_write(cx, buffer))?;
if i == 0 {
return Poll::Ready(Err(io::ErrorKind::WriteZero.into()))
}
- *amt += i as u64;
- reader.as_mut().consume(i);
+ *this.amt += i as u64;
+ this.reader.as_mut().consume(i);
}
}
}
diff --git a/src/io/fill_buf.rs b/src/io/fill_buf.rs
new file mode 100644
index 0000000..015547e
--- /dev/null
+++ b/src/io/fill_buf.rs
@@ -0,0 +1,50 @@
+use futures_core::future::Future;
+use futures_core::task::{Context, Poll};
+use futures_io::AsyncBufRead;
+use std::io;
+use std::pin::Pin;
+
+/// Future for the [`fill_buf`](super::AsyncBufReadExt::fill_buf) method.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct FillBuf<'a, R: ?Sized> {
+ reader: Option<&'a mut R>,
+}
+
+impl<R: ?Sized> Unpin for FillBuf<'_, R> {}
+
+impl<'a, R: AsyncBufRead + ?Sized + Unpin> FillBuf<'a, R> {
+ pub(super) fn new(reader: &'a mut R) -> Self {
+ FillBuf { reader: Some(reader) }
+ }
+}
+
+impl<'a, R> Future for FillBuf<'a, R>
+ where R: AsyncBufRead + ?Sized + Unpin,
+{
+ type Output = io::Result<&'a [u8]>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let this = &mut *self;
+ let reader = this.reader.take().expect("Polled FillBuf after completion");
+
+ match Pin::new(&mut *reader).poll_fill_buf(cx) {
+ // With polinius it is possible to remove this inner match and just have the correct
+ // lifetime of the reference inferred based on which branch is taken
+ Poll::Ready(Ok(_)) => match Pin::new(reader).poll_fill_buf(cx) {
+ Poll::Ready(Ok(slice)) => Poll::Ready(Ok(slice)),
+ Poll::Ready(Err(err)) => {
+ unreachable!("reader indicated readiness but then returned an error: {:?}", err)
+ }
+ Poll::Pending => {
+ unreachable!("reader indicated readiness but then returned pending")
+ }
+ },
+ Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
+ Poll::Pending => {
+ this.reader = Some(reader);
+ Poll::Pending
+ }
+ }
+ }
+}
diff --git a/src/io/into_sink.rs b/src/io/into_sink.rs
index 0589f3a..082c581 100644
--- a/src/io/into_sink.rs
+++ b/src/io/into_sink.rs
@@ -3,7 +3,7 @@ use futures_io::AsyncWrite;
use futures_sink::Sink;
use std::io;
use std::pin::Pin;
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
#[derive(Debug)]
struct Block<Item> {
@@ -15,6 +15,7 @@ struct Block<Item> {
#[pin_project]
#[must_use = "sinks do nothing unless polled"]
#[derive(Debug)]
+#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
pub struct IntoSink<W, Item> {
#[pin]
writer: W,
@@ -30,26 +31,24 @@ impl<W: AsyncWrite, Item: AsRef<[u8]>> IntoSink<W, Item> {
/// If we have an outstanding block in `buffer` attempt to push it into the writer, does _not_
/// flush the writer after it succeeds in pushing the block into it.
- #[project]
fn poll_flush_buffer(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), io::Error>>
{
- #[project]
- let IntoSink { mut writer, buffer } = self.project();
+ let mut this = self.project();
- if let Some(buffer) = buffer {
+ if let Some(buffer) = this.buffer {
loop {
let bytes = buffer.bytes.as_ref();
- let written = ready!(writer.as_mut().poll_write(cx, &bytes[buffer.offset..]))?;
+ let written = ready!(this.writer.as_mut().poll_write(cx, &bytes[buffer.offset..]))?;
buffer.offset += written;
if buffer.offset == bytes.len() {
break;
}
}
}
- *buffer = None;
+ *this.buffer = None;
Poll::Ready(Ok(()))
}
diff --git a/src/io/lines.rs b/src/io/lines.rs
index af0b491..90b993d 100644
--- a/src/io/lines.rs
+++ b/src/io/lines.rs
@@ -5,7 +5,7 @@ use std::io;
use std::mem;
use std::pin::Pin;
use super::read_line::read_line_internal;
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
/// Stream for the [`lines`](super::AsyncBufReadExt::lines) method.
@@ -34,20 +34,18 @@ impl<R: AsyncBufRead> Lines<R> {
impl<R: AsyncBufRead> Stream for Lines<R> {
type Item = io::Result<String>;
- #[project]
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
- #[project]
- let Lines { reader, buf, bytes, read } = self.project();
- let n = ready!(read_line_internal(reader, cx, buf, bytes, read))?;
- if n == 0 && buf.is_empty() {
+ let this = self.project();
+ let n = ready!(read_line_internal(this.reader, cx, this.buf, this.bytes, this.read))?;
+ if n == 0 && this.buf.is_empty() {
return Poll::Ready(None)
}
- if buf.ends_with('\n') {
- buf.pop();
- if buf.ends_with('\r') {
- buf.pop();
+ if this.buf.ends_with('\n') {
+ this.buf.pop();
+ if this.buf.ends_with('\r') {
+ this.buf.pop();
}
}
- Poll::Ready(Some(Ok(mem::replace(buf, String::new()))))
+ Poll::Ready(Some(Ok(mem::replace(this.buf, String::new()))))
}
}
diff --git a/src/io/mod.rs b/src/io/mod.rs
index 29b6418..51ee995 100644
--- a/src/io/mod.rs
+++ b/src/io/mod.rs
@@ -10,14 +10,16 @@
//! library is activated, and it is activated by default.
#[cfg(feature = "io-compat")]
+#[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
use crate::compat::Compat;
-use std::ptr;
+use std::{ptr, pin::Pin};
pub use futures_io::{
AsyncRead, AsyncWrite, AsyncSeek, AsyncBufRead, Error, ErrorKind,
IoSlice, IoSliceMut, Result, SeekFrom,
};
#[cfg(feature = "read-initializer")]
+#[cfg_attr(docsrs, doc(cfg(feature = "read-initializer")))]
pub use futures_io::Initializer;
// used by `BufReader` and `BufWriter`
@@ -65,12 +67,17 @@ pub use self::cursor::Cursor;
mod empty;
pub use self::empty::{empty, Empty};
+mod fill_buf;
+pub use self::fill_buf::FillBuf;
+
mod flush;
pub use self::flush::Flush;
#[cfg(feature = "sink")]
+#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
mod into_sink;
#[cfg(feature = "sink")]
+#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
pub use self::into_sink::IntoSink;
mod lines;
@@ -124,9 +131,9 @@ pub use self::write_vectored::WriteVectored;
mod write_all;
pub use self::write_all::WriteAll;
-#[cfg(feature = "write_all_vectored")]
+#[cfg(feature = "write-all-vectored")]
mod write_all_vectored;
-#[cfg(feature = "write_all_vectored")]
+#[cfg(feature = "write-all-vectored")]
pub use self::write_all_vectored::WriteAllVectored;
/// An extension trait which adds utility methods to `AsyncRead` types.
@@ -372,6 +379,7 @@ pub trait AsyncReadExt: AsyncRead {
///
/// Requires the `io-compat` feature to enable.
#[cfg(feature = "io-compat")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
fn compat(self) -> Compat<Self>
where Self: Sized + Unpin,
{
@@ -493,9 +501,10 @@ pub trait AsyncWriteExt: AsyncWrite {
/// ```
/// # futures::executor::block_on(async {
/// use futures::io::AsyncWriteExt;
- /// use std::io::{Cursor, IoSlice};
+ /// use futures_util::io::Cursor;
+ /// use std::io::IoSlice;
///
- /// let mut writer = Cursor::new([0u8; 7]);
+ /// let mut writer = Cursor::new(Vec::new());
/// let bufs = &mut [
/// IoSlice::new(&[1]),
/// IoSlice::new(&[2, 3]),
@@ -503,12 +512,12 @@ pub trait AsyncWriteExt: AsyncWrite {
/// ];
///
/// writer.write_all_vectored(bufs).await?;
- /// // Note: the contents of `bufs` is now undefined, see the Notes section.
+ /// // Note: the contents of `bufs` is now unspecified, see the Notes section.
///
- /// assert_eq!(writer.into_inner(), [1, 2, 3, 4, 5, 6, 0]);
+ /// assert_eq!(writer.into_inner(), &[1, 2, 3, 4, 5, 6]);
/// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
/// ```
- #[cfg(feature = "write_all_vectored")]
+ #[cfg(feature = "write-all-vectored")]
fn write_all_vectored<'a>(
&'a mut self,
bufs: &'a mut [IoSlice<'a>],
@@ -523,6 +532,7 @@ pub trait AsyncWriteExt: AsyncWrite {
/// used as a futures 0.1 / tokio-io 0.1 `AsyncWrite`.
/// Requires the `io-compat` feature to enable.
#[cfg(feature = "io-compat")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
fn compat_write(self) -> Compat<Self>
where Self: Sized + Unpin,
{
@@ -556,6 +566,7 @@ pub trait AsyncWriteExt: AsyncWrite {
/// # Ok::<(), Box<dyn std::error::Error>>(())
/// ```
#[cfg(feature = "sink")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
fn into_sink<Item: AsRef<[u8]>>(self) -> IntoSink<Self, Item>
where Self: Sized,
{
@@ -583,6 +594,58 @@ impl<S: AsyncSeek + ?Sized> AsyncSeekExt for S {}
/// An extension trait which adds utility methods to `AsyncBufRead` types.
pub trait AsyncBufReadExt: AsyncBufRead {
+ /// Creates a future which will wait for a non-empty buffer to be available from this I/O
+ /// object or EOF to be reached.
+ ///
+ /// This method is the async equivalent to [`BufRead::fill_buf`](std::io::BufRead::fill_buf).
+ ///
+ /// ```rust
+ /// # futures::executor::block_on(async {
+ /// use futures::{io::AsyncBufReadExt as _, stream::{iter, TryStreamExt as _}};
+ ///
+ /// let mut stream = iter(vec![Ok(vec![1, 2, 3]), Ok(vec![4, 5, 6])]).into_async_read();
+ ///
+ /// assert_eq!(stream.fill_buf().await?, vec![1, 2, 3]);
+ /// stream.consume_unpin(2);
+ ///
+ /// assert_eq!(stream.fill_buf().await?, vec![3]);
+ /// stream.consume_unpin(1);
+ ///
+ /// assert_eq!(stream.fill_buf().await?, vec![4, 5, 6]);
+ /// stream.consume_unpin(3);
+ ///
+ /// assert_eq!(stream.fill_buf().await?, vec![]);
+ /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
+ /// ```
+ fn fill_buf(&mut self) -> FillBuf<'_, Self>
+ where Self: Unpin,
+ {
+ FillBuf::new(self)
+ }
+
+ /// A convenience for calling [`AsyncBufRead::consume`] on [`Unpin`] IO types.
+ ///
+ /// ```rust
+ /// # futures::executor::block_on(async {
+ /// use futures::{io::AsyncBufReadExt as _, stream::{iter, TryStreamExt as _}};
+ ///
+ /// let mut stream = iter(vec![Ok(vec![1, 2, 3])]).into_async_read();
+ ///
+ /// assert_eq!(stream.fill_buf().await?, vec![1, 2, 3]);
+ /// stream.consume_unpin(2);
+ ///
+ /// assert_eq!(stream.fill_buf().await?, vec![3]);
+ /// stream.consume_unpin(1);
+ ///
+ /// assert_eq!(stream.fill_buf().await?, vec![]);
+ /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
+ /// ```
+ fn consume_unpin(&mut self, amt: usize)
+ where Self: Unpin,
+ {
+ Pin::new(self).consume(amt)
+ }
+
/// Creates a future which will read all the bytes associated with this I/O
/// object into `buf` until the delimiter `byte` or EOF is reached.
/// This method is the async equivalent to [`BufRead::read_until`](std::io::BufRead::read_until).
diff --git a/src/io/take.rs b/src/io/take.rs
index 39088b7..6179486 100644
--- a/src/io/take.rs
+++ b/src/io/take.rs
@@ -2,7 +2,7 @@ use futures_core::task::{Context, Poll};
#[cfg(feature = "read-initializer")]
use futures_io::Initializer;
use futures_io::{AsyncRead, AsyncBufRead};
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
use std::{cmp, io};
use std::pin::Pin;
@@ -83,22 +83,20 @@ impl<R: AsyncRead> Take<R> {
}
impl<R: AsyncRead> AsyncRead for Take<R> {
- #[project]
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<Result<usize, io::Error>> {
- #[project]
- let Take { inner, limit_ } = self.project();
+ let this = self.project();
- if *limit_ == 0 {
+ if *this.limit_ == 0 {
return Poll::Ready(Ok(0));
}
- let max = std::cmp::min(buf.len() as u64, *limit_) as usize;
- let n = ready!(inner.poll_read(cx, &mut buf[..max]))?;
- *limit_ -= n as u64;
+ let max = cmp::min(buf.len() as u64, *this.limit_) as usize;
+ let n = ready!(this.inner.poll_read(cx, &mut buf[..max]))?;
+ *this.limit_ -= n as u64;
Poll::Ready(Ok(n))
}
@@ -109,29 +107,25 @@ impl<R: AsyncRead> AsyncRead for Take<R> {
}
impl<R: AsyncBufRead> AsyncBufRead for Take<R> {
- #[project]
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
- #[project]
- let Take { inner, limit_ } = self.project();
+ let this = self.project();
// Don't call into inner reader at all at EOF because it may still block
- if *limit_ == 0 {
+ if *this.limit_ == 0 {
return Poll::Ready(Ok(&[]));
}
- let buf = ready!(inner.poll_fill_buf(cx)?);
- let cap = cmp::min(buf.len() as u64, *limit_) as usize;
+ let buf = ready!(this.inner.poll_fill_buf(cx)?);
+ let cap = cmp::min(buf.len() as u64, *this.limit_) as usize;
Poll::Ready(Ok(&buf[..cap]))
}
- #[project]
fn consume(self: Pin<&mut Self>, amt: usize) {
- #[project]
- let Take { inner, limit_ } = self.project();
+ let this = self.project();
// Don't let callers reset the limit by passing an overlarge value
- let amt = cmp::min(amt as u64, *limit_) as usize;
- *limit_ -= amt as u64;
- inner.consume(amt);
+ let amt = cmp::min(amt as u64, *this.limit_) as usize;
+ *this.limit_ -= amt as u64;
+ this.inner.consume(amt);
}
}
diff --git a/src/io/write_all_vectored.rs b/src/io/write_all_vectored.rs
index fbe7e73..ec28798 100644
--- a/src/io/write_all_vectored.rs
+++ b/src/io/write_all_vectored.rs
@@ -19,7 +19,7 @@ impl<W: ?Sized + Unpin> Unpin for WriteAllVectored<'_, W> {}
impl<'a, W: AsyncWrite + ?Sized + Unpin> WriteAllVectored<'a, W> {
pub(super) fn new(writer: &'a mut W, bufs: &'a mut [IoSlice<'a>]) -> Self {
- WriteAllVectored { writer, bufs }
+ WriteAllVectored { writer, bufs: IoSlice::advance(bufs, 0) }
}
}
@@ -171,6 +171,7 @@ mod tests {
#[rustfmt::skip] // Becomes unreadable otherwise.
let tests: Vec<(_, &'static [u8])> = vec![
(vec![], &[]),
+ (vec![IoSlice::new(&[]), IoSlice::new(&[])], &[]),
(vec![IoSlice::new(&[1])], &[1]),
(vec![IoSlice::new(&[1, 2])], &[1, 2]),
(vec![IoSlice::new(&[1, 2, 3])], &[1, 2, 3]),
diff --git a/src/lib.rs b/src/lib.rs
index 68b954a..1cf165c 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -7,16 +7,20 @@
#![cfg_attr(not(feature = "std"), no_std)]
#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms, unreachable_pub)]
+// It cannot be included in the published code because this lints have false positives in the minimum required version.
+#![cfg_attr(test, warn(single_use_lifetimes))]
#![warn(clippy::all)]
-// The solution for this lint is not available on 1.39 which is the current minimum supported version.
-// Can be removed as of minimum supported 1.40 or if https://github.com/rust-lang/rust-clippy/issues/3941
+// mem::take requires Rust 1.40, matches! requires Rust 1.42
+// Can be removed if the minimum supported version increased or if https://github.com/rust-lang/rust-clippy/issues/3941
// get's implemented.
-#![allow(clippy::mem_replace_with_default)]
+#![allow(clippy::mem_replace_with_default, clippy::match_like_matches_macro)]
#![doc(test(attr(deny(warnings), allow(dead_code, unused_assignments, unused_variables))))]
-#![doc(html_root_url = "https://docs.rs/futures-util/0.3.5")]
+#![doc(html_root_url = "https://docs.rs/futures-util/0.3.7")]
+
+#![cfg_attr(docsrs, feature(doc_cfg))]
#[cfg(all(feature = "cfg-target-has-atomic", not(feature = "unstable")))]
compile_error!("The `cfg-target-has-atomic` feature requires the `unstable` feature as an explicit opt-in to unstable features");
@@ -37,25 +41,27 @@ extern crate futures_core;
pub use futures_core::ready;
pub use pin_utils::pin_mut;
-// Not public API.
#[cfg(feature = "async-await")]
#[macro_use]
-#[doc(hidden)]
-pub mod async_await;
+mod async_await;
#[cfg(feature = "async-await")]
#[doc(hidden)]
pub use self::async_await::*;
// Not public API.
-#[doc(hidden)]
-pub use futures_core::core_reexport;
-
-// Not public API.
#[cfg(feature = "async-await")]
#[doc(hidden)]
-pub mod __reexport {
- #[doc(hidden)]
+pub mod __private {
pub use crate::*;
+ pub use core::{
+ option::Option::{self, Some, None},
+ pin::Pin,
+ result::Result::{Err, Ok},
+ };
+
+ pub mod async_await {
+ pub use crate::async_await::*;
+ }
}
macro_rules! cfg_target_has_atomic {
@@ -70,8 +76,8 @@ macro_rules! delegate_sink {
($field:ident, $item:ty) => {
fn poll_ready(
self: core::pin::Pin<&mut Self>,
- cx: &mut $crate::core_reexport::task::Context<'_>,
- ) -> $crate::core_reexport::task::Poll<Result<(), Self::Error>> {
+ cx: &mut core::task::Context<'_>,
+ ) -> core::task::Poll<Result<(), Self::Error>> {
self.project().$field.poll_ready(cx)
}
@@ -84,15 +90,15 @@ macro_rules! delegate_sink {
fn poll_flush(
self: core::pin::Pin<&mut Self>,
- cx: &mut $crate::core_reexport::task::Context<'_>,
- ) -> $crate::core_reexport::task::Poll<Result<(), Self::Error>> {
+ cx: &mut core::task::Context<'_>,
+ ) -> core::task::Poll<Result<(), Self::Error>> {
self.project().$field.poll_flush(cx)
}
fn poll_close(
self: core::pin::Pin<&mut Self>,
- cx: &mut $crate::core_reexport::task::Context<'_>,
- ) -> $crate::core_reexport::task::Poll<Result<(), Self::Error>> {
+ cx: &mut core::task::Context<'_>,
+ ) -> core::task::Poll<Result<(), Self::Error>> {
self.project().$field.poll_close(cx)
}
}
@@ -102,8 +108,8 @@ macro_rules! delegate_future {
($field:ident) => {
fn poll(
self: core::pin::Pin<&mut Self>,
- cx: &mut $crate::core_reexport::task::Context<'_>,
- ) -> $crate::core_reexport::task::Poll<Self::Output> {
+ cx: &mut core::task::Context<'_>,
+ ) -> core::task::Poll<Self::Output> {
self.project().$field.poll(cx)
}
}
@@ -113,8 +119,8 @@ macro_rules! delegate_stream {
($field:ident) => {
fn poll_next(
self: core::pin::Pin<&mut Self>,
- cx: &mut $crate::core_reexport::task::Context<'_>,
- ) -> $crate::core_reexport::task::Poll<Option<Self::Item>> {
+ cx: &mut core::task::Context<'_>,
+ ) -> core::task::Poll<Option<Self::Item>> {
self.project().$field.poll_next(cx)
}
fn size_hint(&self) -> (usize, Option<usize>) {
@@ -183,7 +189,7 @@ macro_rules! delegate_async_buf_read {
) -> core::task::Poll<std::io::Result<&[u8]>> {
self.project().$field.poll_fill_buf(cx)
}
-
+
fn consume(self: core::pin::Pin<&mut Self>, amt: usize) {
self.project().$field.consume(amt)
}
@@ -308,6 +314,7 @@ pub mod stream;
#[doc(hidden)] pub use crate::stream::{StreamExt, TryStreamExt};
#[cfg(feature = "sink")]
+#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
pub mod sink;
#[cfg(feature = "sink")]
#[doc(hidden)] pub use crate::sink::SinkExt;
@@ -317,9 +324,11 @@ pub mod task;
pub mod never;
#[cfg(feature = "compat")]
+#[cfg_attr(docsrs, doc(cfg(feature = "compat")))]
pub mod compat;
#[cfg(feature = "io")]
+#[cfg_attr(docsrs, doc(cfg(feature = "io")))]
#[cfg(feature = "std")]
pub mod io;
#[cfg(feature = "io")]
diff --git a/src/lock/bilock.rs b/src/lock/bilock.rs
index cccd0be..3698406 100644
--- a/src/lock/bilock.rs
+++ b/src/lock/bilock.rs
@@ -34,6 +34,7 @@ use alloc::sync::Arc;
/// This type is only available when the `bilock` feature of this
/// library is activated.
#[derive(Debug)]
+#[cfg_attr(docsrs, doc(cfg(feature = "bilock")))]
pub struct BiLock<T> {
arc: Arc<Inner<T>>,
}
@@ -142,6 +143,7 @@ impl<T> BiLock<T> {
///
/// Note that the returned future will never resolve to an error.
#[cfg(feature = "bilock")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "bilock")))]
pub fn lock(&self) -> BiLockAcquire<'_, T> {
BiLockAcquire {
bilock: self,
@@ -198,6 +200,7 @@ impl<T> Drop for Inner<T> {
/// Error indicating two `BiLock<T>`s were not two halves of a whole, and
/// thus could not be `reunite`d.
+#[cfg_attr(docsrs, doc(cfg(feature = "bilock")))]
pub struct ReuniteError<T>(pub BiLock<T>, pub BiLock<T>);
impl<T> fmt::Debug for ReuniteError<T> {
@@ -223,6 +226,7 @@ impl<T: core::any::Any> std::error::Error for ReuniteError<T> {}
/// implementing `Deref` and `DerefMut` to `T`. When dropped, the lock will be
/// unlocked.
#[derive(Debug)]
+#[cfg_attr(docsrs, doc(cfg(feature = "bilock")))]
pub struct BiLockGuard<'a, T> {
bilock: &'a BiLock<T>,
}
@@ -258,6 +262,7 @@ impl<T> Drop for BiLockGuard<'_, T> {
/// Future returned by `BiLock::lock` which will resolve when the lock is
/// acquired.
#[cfg(feature = "bilock")]
+#[cfg_attr(docsrs, doc(cfg(feature = "bilock")))]
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[derive(Debug)]
pub struct BiLockAcquire<'a, T> {
diff --git a/src/lock/mod.rs b/src/lock/mod.rs
index 3db5e5b..b252613 100644
--- a/src/lock/mod.rs
+++ b/src/lock/mod.rs
@@ -9,9 +9,11 @@ mod mutex;
pub use self::mutex::{MappedMutexGuard, Mutex, MutexLockFuture, MutexGuard};
#[cfg(any(feature = "bilock", feature = "sink", feature = "io"))]
+#[cfg_attr(docsrs, doc(cfg(feature = "bilock")))]
#[cfg_attr(not(feature = "bilock"), allow(unreachable_pub))]
mod bilock;
#[cfg(feature = "bilock")]
+#[cfg_attr(docsrs, doc(cfg(feature = "bilock")))]
pub use self::bilock::{BiLock, BiLockAcquire, BiLockGuard, ReuniteError};
#[cfg(any(feature = "sink", feature = "io"))]
#[cfg(not(feature = "bilock"))]
diff --git a/src/lock/mutex.rs b/src/lock/mutex.rs
index 96549ea..84aeeda 100644
--- a/src/lock/mutex.rs
+++ b/src/lock/mutex.rs
@@ -3,15 +3,16 @@ use futures_core::task::{Context, Poll, Waker};
use slab::Slab;
use std::{fmt, mem};
use std::cell::UnsafeCell;
+use std::marker::PhantomData;
use std::ops::{Deref, DerefMut};
use std::pin::Pin;
use std::sync::Mutex as StdMutex;
use std::sync::atomic::{AtomicUsize, Ordering};
/// A futures-aware mutex.
-///
+///
/// # Fairness
-///
+///
/// This mutex provides no fairness guarantees. Tasks may not acquire the mutex
/// in the order that they requested the lock, and it's possible for a single task
/// which repeatedly takes the lock to starve other tasks, which may be left waiting
@@ -288,7 +289,7 @@ impl<'a, T: ?Sized> MutexGuard<'a, T> {
// Don't run the `drop` method for MutexGuard. The ownership of the underlying
// locked state is being moved to the returned MappedMutexGuard.
mem::forget(this);
- MappedMutexGuard { mutex, value }
+ MappedMutexGuard { mutex, value, _marker: PhantomData }
}
}
@@ -325,6 +326,7 @@ impl<T: ?Sized> DerefMut for MutexGuard<'_, T> {
pub struct MappedMutexGuard<'a, T: ?Sized, U: ?Sized> {
mutex: &'a Mutex<T>,
value: *mut U,
+ _marker: PhantomData<&'a mut U>,
}
impl<'a, T: ?Sized, U: ?Sized> MappedMutexGuard<'a, T, U> {
@@ -354,7 +356,7 @@ impl<'a, T: ?Sized, U: ?Sized> MappedMutexGuard<'a, T, U> {
// Don't run the `drop` method for MappedMutexGuard. The ownership of the underlying
// locked state is being moved to the returned MappedMutexGuard.
mem::forget(this);
- MappedMutexGuard { mutex, value }
+ MappedMutexGuard { mutex, value, _marker: PhantomData }
}
}
@@ -401,8 +403,8 @@ unsafe impl<T: ?Sized> Sync for MutexLockFuture<'_, T> {}
// lock is essentially spinlock-equivalent (attempt to flip an atomic bool)
unsafe impl<T: ?Sized + Send> Send for MutexGuard<'_, T> {}
unsafe impl<T: ?Sized + Sync> Sync for MutexGuard<'_, T> {}
-unsafe impl<T: ?Sized + Send, U: ?Sized> Send for MappedMutexGuard<'_, T, U> {}
-unsafe impl<T: ?Sized + Sync, U: ?Sized> Sync for MappedMutexGuard<'_, T, U> {}
+unsafe impl<T: ?Sized + Send, U: ?Sized + Send> Send for MappedMutexGuard<'_, T, U> {}
+unsafe impl<T: ?Sized + Sync, U: ?Sized + Sync> Sync for MappedMutexGuard<'_, T, U> {}
#[test]
fn test_mutex_guard_debug_not_recurse() {
diff --git a/src/sink/buffer.rs b/src/sink/buffer.rs
index c3df3b9..8176abd 100644
--- a/src/sink/buffer.rs
+++ b/src/sink/buffer.rs
@@ -1,7 +1,7 @@
use futures_core::stream::{Stream, FusedStream};
use futures_core::task::{Context, Poll};
use futures_sink::Sink;
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
use core::pin::Pin;
use alloc::collections::VecDeque;
@@ -29,18 +29,16 @@ impl<Si: Sink<Item>, Item> Buffer<Si, Item> {
delegate_access_inner!(sink, Si, ());
- #[project]
fn try_empty_buffer(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), Si::Error>> {
- #[project]
- let Buffer { mut sink, buf, .. } = self.project();
- ready!(sink.as_mut().poll_ready(cx))?;
- while let Some(item) = buf.pop_front() {
- sink.as_mut().start_send(item)?;
- if !buf.is_empty() {
- ready!(sink.as_mut().poll_ready(cx))?;
+ let mut this = self.project();
+ ready!(this.sink.as_mut().poll_ready(cx))?;
+ while let Some(item) = this.buf.pop_front() {
+ this.sink.as_mut().start_send(item)?;
+ if !this.buf.is_empty() {
+ ready!(this.sink.as_mut().poll_ready(cx))?;
}
}
Poll::Ready(Ok(()))
diff --git a/src/sink/err_into.rs b/src/sink/err_into.rs
index 530e1cc..b23ada7 100644
--- a/src/sink/err_into.rs
+++ b/src/sink/err_into.rs
@@ -1,7 +1,7 @@
use crate::sink::{SinkExt, SinkMapErr};
use futures_core::stream::{Stream, FusedStream};
use futures_sink::{Sink};
-use pin_project::{pin_project};
+use pin_project::pin_project;
/// Sink for the [`sink_err_into`](super::SinkExt::sink_err_into) method.
#[pin_project]
diff --git a/src/sink/fanout.rs b/src/sink/fanout.rs
index 7066e21..d71d793 100644
--- a/src/sink/fanout.rs
+++ b/src/sink/fanout.rs
@@ -2,7 +2,7 @@ use core::fmt::{Debug, Formatter, Result as FmtResult};
use core::pin::Pin;
use futures_core::task::{Context, Poll};
use futures_sink::Sink;
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
/// Sink that clones incoming items and forwards them to two sinks at the same time.
///
@@ -33,11 +33,9 @@ impl<Si1, Si2> Fanout<Si1, Si2> {
}
/// Get a pinned mutable reference to the inner sinks.
- #[project]
pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut Si1>, Pin<&mut Si2>) {
- #[project]
- let Fanout { sink1, sink2 } = self.project();
- (sink1, sink2)
+ let this = self.project();
+ (this.sink1, this.sink2)
}
/// Consumes this combinator, returning the underlying sinks.
@@ -65,57 +63,49 @@ impl<Si1, Si2, Item> Sink<Item> for Fanout<Si1, Si2>
{
type Error = Si1::Error;
- #[project]
fn poll_ready(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
- #[project]
- let Fanout { sink1, sink2 } = self.project();
+ let this = self.project();
- let sink1_ready = sink1.poll_ready(cx)?.is_ready();
- let sink2_ready = sink2.poll_ready(cx)?.is_ready();
+ let sink1_ready = this.sink1.poll_ready(cx)?.is_ready();
+ let sink2_ready = this.sink2.poll_ready(cx)?.is_ready();
let ready = sink1_ready && sink2_ready;
if ready { Poll::Ready(Ok(())) } else { Poll::Pending }
}
- #[project]
fn start_send(
self: Pin<&mut Self>,
item: Item,
) -> Result<(), Self::Error> {
- #[project]
- let Fanout { sink1, sink2 } = self.project();
+ let this = self.project();
- sink1.start_send(item.clone())?;
- sink2.start_send(item)?;
+ this.sink1.start_send(item.clone())?;
+ this.sink2.start_send(item)?;
Ok(())
}
- #[project]
fn poll_flush(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
- #[project]
- let Fanout { sink1, sink2 } = self.project();
+ let this = self.project();
- let sink1_ready = sink1.poll_flush(cx)?.is_ready();
- let sink2_ready = sink2.poll_flush(cx)?.is_ready();
+ let sink1_ready = this.sink1.poll_flush(cx)?.is_ready();
+ let sink2_ready = this.sink2.poll_flush(cx)?.is_ready();
let ready = sink1_ready && sink2_ready;
if ready { Poll::Ready(Ok(())) } else { Poll::Pending }
}
- #[project]
fn poll_close(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
- #[project]
- let Fanout { sink1, sink2 } = self.project();
+ let this = self.project();
- let sink1_ready = sink1.poll_close(cx)?.is_ready();
- let sink2_ready = sink2.poll_close(cx)?.is_ready();
+ let sink1_ready = this.sink1.poll_close(cx)?.is_ready();
+ let sink2_ready = this.sink2.poll_close(cx)?.is_ready();
let ready = sink1_ready && sink2_ready;
if ready { Poll::Ready(Ok(())) } else { Poll::Pending }
}
diff --git a/src/sink/mod.rs b/src/sink/mod.rs
index ebdb999..b0e2c83 100644
--- a/src/sink/mod.rs
+++ b/src/sink/mod.rs
@@ -258,6 +258,7 @@ pub trait SinkExt<Item>: Sink<Item> {
/// Wraps a [`Sink`] into a sink compatible with libraries using
/// futures 0.1 `Sink`. Requires the `compat` feature to be enabled.
#[cfg(feature = "compat")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "compat")))]
fn compat(self) -> CompatSink<Self, Item>
where Self: Sized + Unpin,
{
diff --git a/src/sink/with.rs b/src/sink/with.rs
index 802123f..6329a0c 100644
--- a/src/sink/with.rs
+++ b/src/sink/with.rs
@@ -5,7 +5,7 @@ use futures_core::future::Future;
use futures_core::stream::Stream;
use futures_core::task::{Context, Poll};
use futures_sink::Sink;
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
/// Sink for the [`with`](super::SinkExt::with) method.
#[pin_project]
@@ -71,20 +71,18 @@ impl<Si, Item, U, Fut, F, E> With<Si, Item, U, Fut, F>
delegate_access_inner!(sink, Si, ());
/// Completes the processing of previous item if any.
- #[project]
fn poll(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), E>> {
- #[project]
- let With { mut state, sink, .. } = self.project();
+ let mut this = self.project();
- let item = match state.as_mut().as_pin_mut() {
+ let item = match this.state.as_mut().as_pin_mut() {
None => return Poll::Ready(Ok(())),
Some(fut) => ready!(fut.poll(cx))?,
};
- state.set(None);
- sink.start_send(item)?;
+ this.state.set(None);
+ this.sink.start_send(item)?;
Poll::Ready(Ok(()))
}
}
@@ -106,16 +104,14 @@ impl<Si, Item, U, Fut, F, E> Sink<U> for With<Si, Item, U, Fut, F>
Poll::Ready(Ok(()))
}
- #[project]
fn start_send(
self: Pin<&mut Self>,
item: U,
) -> Result<(), Self::Error> {
- #[project]
- let With { mut state, f, .. } = self.project();
+ let mut this = self.project();
- assert!(state.is_none());
- state.set(Some(f(item)));
+ assert!(this.state.is_none());
+ this.state.set(Some((this.f)(item)));
Ok(())
}
diff --git a/src/sink/with_flat_map.rs b/src/sink/with_flat_map.rs
index f260a0b..cf213e6 100644
--- a/src/sink/with_flat_map.rs
+++ b/src/sink/with_flat_map.rs
@@ -4,7 +4,7 @@ use core::pin::Pin;
use futures_core::stream::{Stream, FusedStream};
use futures_core::task::{Context, Poll};
use futures_sink::Sink;
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
/// Sink for the [`with_flat_map`](super::SinkExt::with_flat_map) method.
#[pin_project]
@@ -52,31 +52,29 @@ where
delegate_access_inner!(sink, Si, ());
- #[project]
fn try_empty_stream(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), Si::Error>> {
- #[project]
- let WithFlatMap { mut sink, mut stream, buffer, .. } = self.project();
+ let mut this = self.project();
- if buffer.is_some() {
- ready!(sink.as_mut().poll_ready(cx))?;
- let item = buffer.take().unwrap();
- sink.as_mut().start_send(item)?;
+ if this.buffer.is_some() {
+ ready!(this.sink.as_mut().poll_ready(cx))?;
+ let item = this.buffer.take().unwrap();
+ this.sink.as_mut().start_send(item)?;
}
- if let Some(mut some_stream) = stream.as_mut().as_pin_mut() {
+ if let Some(mut some_stream) = this.stream.as_mut().as_pin_mut() {
while let Some(item) = ready!(some_stream.as_mut().poll_next(cx)?) {
- match sink.as_mut().poll_ready(cx)? {
- Poll::Ready(()) => sink.as_mut().start_send(item)?,
+ match this.sink.as_mut().poll_ready(cx)? {
+ Poll::Ready(()) => this.sink.as_mut().start_send(item)?,
Poll::Pending => {
- *buffer = Some(item);
+ *this.buffer = Some(item);
return Poll::Pending;
}
};
}
}
- stream.set(None);
+ this.stream.set(None);
Poll::Ready(Ok(()))
}
}
@@ -119,16 +117,14 @@ where
self.try_empty_stream(cx)
}
- #[project]
fn start_send(
self: Pin<&mut Self>,
item: U,
) -> Result<(), Self::Error> {
- #[project]
- let WithFlatMap { mut stream, f, .. } = self.project();
+ let mut this = self.project();
- assert!(stream.is_none());
- stream.set(Some(f(item)));
+ assert!(this.stream.is_none());
+ this.stream.set(Some((this.f)(item)));
Ok(())
}
diff --git a/src/stream/futures_ordered.rs b/src/stream/futures_ordered.rs
index 6dc07ad..5dbd4ae 100644
--- a/src/stream/futures_ordered.rs
+++ b/src/stream/futures_ordered.rs
@@ -1,7 +1,7 @@
use crate::stream::{FuturesUnordered, StreamExt};
use futures_core::future::Future;
use futures_core::stream::Stream;
-use futures_core::task::{Context, Poll};
+use futures_core::{FusedStream, task::{Context, Poll}};
use pin_project::pin_project;
use core::cmp::Ordering;
use core::fmt::{self, Debug};
@@ -203,6 +203,12 @@ impl<Fut: Future> FromIterator<Fut> for FuturesOrdered<Fut> {
}
}
+impl<Fut: Future> FusedStream for FuturesOrdered<Fut> {
+ fn is_terminated(&self) -> bool {
+ self.in_progress_queue.is_terminated() && self.queued_outputs.is_empty()
+ }
+}
+
impl<Fut: Future> Extend<Fut> for FuturesOrdered<Fut> {
fn extend<I>(&mut self, iter: I)
where
diff --git a/src/stream/futures_unordered/task.rs b/src/stream/futures_unordered/task.rs
index 9277229..abb0264 100644
--- a/src/stream/futures_unordered/task.rs
+++ b/src/stream/futures_unordered/task.rs
@@ -70,7 +70,7 @@ impl<Fut> ArcWake for Task<Fut> {
impl<Fut> Task<Fut> {
/// Returns a waker reference for this task without cloning the Arc.
- pub(super) fn waker_ref<'a>(this: &'a Arc<Task<Fut>>) -> WakerRef<'a> {
+ pub(super) fn waker_ref(this: &Arc<Task<Fut>>) -> WakerRef<'_> {
waker_ref(this)
}
diff --git a/src/stream/mod.rs b/src/stream/mod.rs
index 2a5ecf9..ca9bc89 100644
--- a/src/stream/mod.rs
+++ b/src/stream/mod.rs
@@ -28,6 +28,7 @@ pub use self::stream::Chunks;
pub use self::stream::ReadyChunks;
#[cfg(feature = "sink")]
+#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
pub use self::stream::Forward;
#[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))]
@@ -36,6 +37,7 @@ pub use self::stream::{BufferUnordered, Buffered, ForEachConcurrent};
#[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "sink")]
+#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
#[cfg(feature = "alloc")]
pub use self::stream::{ReuniteError, SplitSink, SplitStream};
@@ -43,10 +45,11 @@ mod try_stream;
pub use self::try_stream::{
try_unfold, AndThen, ErrInto, InspectErr, InspectOk, IntoStream, MapErr, MapOk, OrElse,
TryCollect, TryConcat, TryFilter, TryFilterMap, TryFlatten, TryFold, TryForEach, TryNext,
- TrySkipWhile, TryStreamExt, TryUnfold,
+ TrySkipWhile, TryStreamExt, TryTakeWhile, TryUnfold,
};
#[cfg(feature = "io")]
+#[cfg_attr(docsrs, doc(cfg(feature = "io")))]
#[cfg(feature = "std")]
pub use self::try_stream::IntoAsyncRead;
@@ -97,3 +100,12 @@ cfg_target_has_atomic! {
#[cfg(feature = "alloc")]
pub use self::select_all::{select_all, SelectAll};
}
+
+// Just a helper function to ensure the futures we're returning all have the
+// right implementations.
+pub(crate) fn assert_stream<T, S>(stream: S) -> S
+ where
+ S: Stream<Item = T>,
+{
+ stream
+}
diff --git a/src/stream/once.rs b/src/stream/once.rs
index 21cd14b..3a8fef6 100644
--- a/src/stream/once.rs
+++ b/src/stream/once.rs
@@ -2,7 +2,7 @@ use core::pin::Pin;
use futures_core::future::Future;
use futures_core::stream::{Stream, FusedStream};
use futures_core::task::{Context, Poll};
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
/// Creates a stream of a single element.
///
@@ -20,8 +20,6 @@ pub fn once<Fut: Future>(future: Fut) -> Once<Fut> {
}
/// A stream which emits single element and then EOF.
-///
-/// This stream will never block and is always ready.
#[pin_project]
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
@@ -39,16 +37,14 @@ impl<Fut> Once<Fut> {
impl<Fut: Future> Stream for Once<Fut> {
type Item = Fut::Output;
- #[project]
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
- #[project]
- let Once { mut future } = self.project();
- let v = match future.as_mut().as_pin_mut() {
+ let mut this = self.project();
+ let v = match this.future.as_mut().as_pin_mut() {
Some(fut) => ready!(fut.poll(cx)),
None => return Poll::Ready(None),
};
- future.set(None);
+ this.future.set(None);
Poll::Ready(Some(v))
}
diff --git a/src/stream/select.rs b/src/stream/select.rs
index 36503e4..7666386 100644
--- a/src/stream/select.rs
+++ b/src/stream/select.rs
@@ -2,7 +2,7 @@ use crate::stream::{StreamExt, Fuse};
use core::pin::Pin;
use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
/// Stream for the [`select()`] function.
#[pin_project]
@@ -58,11 +58,9 @@ impl<St1, St2> Select<St1, St2> {
///
/// Note that care must be taken to avoid tampering with the state of the
/// stream which may otherwise confuse this combinator.
- #[project]
pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut St1>, Pin<&mut St2>) {
- #[project]
- let Select { stream1, stream2, .. } = self.project();
- (stream1.get_pin_mut(), stream2.get_pin_mut())
+ let this = self.project();
+ (this.stream1.get_pin_mut(), this.stream2.get_pin_mut())
}
/// Consumes this combinator, returning the underlying streams.
@@ -89,18 +87,15 @@ impl<St1, St2> Stream for Select<St1, St2>
{
type Item = St1::Item;
- #[project]
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<St1::Item>> {
- #[project]
- let Select { flag, stream1, stream2 } = self.project();
-
- if !*flag {
- poll_inner(flag, stream1, stream2, cx)
+ let this = self.project();
+ if !*this.flag {
+ poll_inner(this.flag, this.stream1, this.stream2, cx)
} else {
- poll_inner(flag, stream2, stream1, cx)
+ poll_inner(this.flag, this.stream2, this.stream1, cx)
}
}
}
diff --git a/src/stream/stream/buffer_unordered.rs b/src/stream/stream/buffer_unordered.rs
index a822576..24f4853 100644
--- a/src/stream/stream/buffer_unordered.rs
+++ b/src/stream/stream/buffer_unordered.rs
@@ -4,7 +4,7 @@ use futures_core::stream::{Stream, FusedStream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
use core::fmt;
use core::pin::Pin;
@@ -62,31 +62,29 @@ where
{
type Item = <St::Item as Future>::Output;
- #[project]
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
- #[project]
- let BufferUnordered { mut stream, in_progress_queue, max } = self.project();
+ let mut this = self.project();
// First up, try to spawn off as many futures as possible by filling up
// our queue of futures.
- while in_progress_queue.len() < *max {
- match stream.as_mut().poll_next(cx) {
- Poll::Ready(Some(fut)) => in_progress_queue.push(fut),
+ while this.in_progress_queue.len() < *this.max {
+ match this.stream.as_mut().poll_next(cx) {
+ Poll::Ready(Some(fut)) => this.in_progress_queue.push(fut),
Poll::Ready(None) | Poll::Pending => break,
}
}
// Attempt to pull the next value from the in_progress_queue
- match in_progress_queue.poll_next_unpin(cx) {
+ match this.in_progress_queue.poll_next_unpin(cx) {
x @ Poll::Pending | x @ Poll::Ready(Some(_)) => return x,
Poll::Ready(None) => {}
}
// If more values are still coming from the stream, we're not done yet
- if stream.is_done() {
+ if this.stream.is_done() {
Poll::Ready(None)
} else {
Poll::Pending
diff --git a/src/stream/stream/buffered.rs b/src/stream/stream/buffered.rs
index 9dff01f..626ead1 100644
--- a/src/stream/stream/buffered.rs
+++ b/src/stream/stream/buffered.rs
@@ -4,7 +4,7 @@ use futures_core::stream::Stream;
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
use core::fmt;
use core::pin::Pin;
@@ -59,31 +59,29 @@ where
{
type Item = <St::Item as Future>::Output;
- #[project]
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
- #[project]
- let Buffered { mut stream, in_progress_queue, max } = self.project();
+ let mut this = self.project();
// First up, try to spawn off as many futures as possible by filling up
// our queue of futures.
- while in_progress_queue.len() < *max {
- match stream.as_mut().poll_next(cx) {
- Poll::Ready(Some(fut)) => in_progress_queue.push(fut),
+ while this.in_progress_queue.len() < *this.max {
+ match this.stream.as_mut().poll_next(cx) {
+ Poll::Ready(Some(fut)) => this.in_progress_queue.push(fut),
Poll::Ready(None) | Poll::Pending => break,
}
}
// Attempt to pull the next value from the in_progress_queue
- let res = in_progress_queue.poll_next_unpin(cx);
+ let res = this.in_progress_queue.poll_next_unpin(cx);
if let Some(val) = ready!(res) {
return Poll::Ready(Some(val))
}
// If more values are still coming from the stream, we're not done yet
- if stream.is_done() {
+ if this.stream.is_done() {
Poll::Ready(None)
} else {
Poll::Pending
diff --git a/src/stream/stream/catch_unwind.rs b/src/stream/stream/catch_unwind.rs
index 1bb43b2..b9eb4ba 100644
--- a/src/stream/stream/catch_unwind.rs
+++ b/src/stream/stream/catch_unwind.rs
@@ -1,6 +1,6 @@
use futures_core::stream::{Stream, FusedStream};
use futures_core::task::{Context, Poll};
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
use std::any::Any;
use std::pin::Pin;
use std::panic::{catch_unwind, UnwindSafe, AssertUnwindSafe};
@@ -26,25 +26,23 @@ impl<St: Stream + UnwindSafe> CatchUnwind<St> {
impl<St: Stream + UnwindSafe> Stream for CatchUnwind<St> {
type Item = Result<St::Item, Box<dyn Any + Send>>;
- #[project]
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
- #[project]
- let CatchUnwind { stream, caught_unwind } = self.project();
+ let mut this = self.project();
- if *caught_unwind {
+ if *this.caught_unwind {
Poll::Ready(None)
} else {
let res = catch_unwind(AssertUnwindSafe(|| {
- stream.poll_next(cx)
+ this.stream.as_mut().poll_next(cx)
}));
match res {
Ok(poll) => poll.map(|opt| opt.map(Ok)),
Err(e) => {
- *caught_unwind = true;
+ *this.caught_unwind = true;
Poll::Ready(Some(Err(e)))
},
}
diff --git a/src/stream/stream/chain.rs b/src/stream/stream/chain.rs
index 720903c..c7fbd5f 100644
--- a/src/stream/stream/chain.rs
+++ b/src/stream/stream/chain.rs
@@ -1,7 +1,7 @@
use core::pin::Pin;
use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
/// Stream for the [`chain`](super::StreamExt::chain) method.
#[pin_project]
@@ -42,20 +42,18 @@ where St1: Stream,
{
type Item = St1::Item;
- #[project]
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
- #[project]
- let Chain { mut first, second } = self.project();
- if let Some(first) = first.as_mut().as_pin_mut() {
+ let mut this = self.project();
+ if let Some(first) = this.first.as_mut().as_pin_mut() {
if let Some(item) = ready!(first.poll_next(cx)) {
return Poll::Ready(Some(item))
}
}
- first.set(None);
- second.poll_next(cx)
+ this.first.set(None);
+ this.second.poll_next(cx)
}
fn size_hint(&self) -> (usize, Option<usize>) {
diff --git a/src/stream/stream/chunks.rs b/src/stream/stream/chunks.rs
index d24c31c..9b4ed93 100644
--- a/src/stream/stream/chunks.rs
+++ b/src/stream/stream/chunks.rs
@@ -3,7 +3,7 @@ use futures_core::stream::{Stream, FusedStream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
use core::mem;
use core::pin::Pin;
use alloc::vec::Vec;
@@ -41,21 +41,19 @@ impl<St: Stream> Chunks<St> where St: Stream {
impl<St: Stream> Stream for Chunks<St> {
type Item = Vec<St::Item>;
- #[project]
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
- #[project]
- let Chunks { mut stream, items, cap } = self.as_mut().project();
+ let mut this = self.as_mut().project();
loop {
- match ready!(stream.as_mut().poll_next(cx)) {
+ match ready!(this.stream.as_mut().poll_next(cx)) {
// Push the item into the buffer and check whether it is full.
// If so, replace our buffer with a new and empty one and return
// the full one.
Some(item) => {
- items.push(item);
- if items.len() >= *cap {
+ this.items.push(item);
+ if this.items.len() >= *this.cap {
return Poll::Ready(Some(self.take()))
}
}
@@ -63,10 +61,10 @@ impl<St: Stream> Stream for Chunks<St> {
// Since the underlying stream ran out of values, return what we
// have buffered, if we have anything.
None => {
- let last = if items.is_empty() {
+ let last = if this.items.is_empty() {
None
} else {
- let full_buf = mem::replace(items, Vec::new());
+ let full_buf = mem::replace(this.items, Vec::new());
Some(full_buf)
};
diff --git a/src/stream/stream/collect.rs b/src/stream/stream/collect.rs
index 349e42d..6d07660 100644
--- a/src/stream/stream/collect.rs
+++ b/src/stream/stream/collect.rs
@@ -3,7 +3,7 @@ use core::pin::Pin;
use futures_core::future::{FusedFuture, Future};
use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
/// Future for the [`collect`](super::StreamExt::collect) method.
#[pin_project]
@@ -43,13 +43,11 @@ where St: Stream,
{
type Output = C;
- #[project]
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<C> {
- #[project]
- let Collect { mut stream, collection } = self.as_mut().project();
+ let mut this = self.as_mut().project();
loop {
- match ready!(stream.as_mut().poll_next(cx)) {
- Some(e) => collection.extend(Some(e)),
+ match ready!(this.stream.as_mut().poll_next(cx)) {
+ Some(e) => this.collection.extend(Some(e)),
None => return Poll::Ready(self.finish()),
}
}
diff --git a/src/stream/stream/concat.rs b/src/stream/stream/concat.rs
index 647632b..9b37cd2 100644
--- a/src/stream/stream/concat.rs
+++ b/src/stream/stream/concat.rs
@@ -2,7 +2,7 @@ use core::pin::Pin;
use futures_core::future::{Future, FusedFuture};
use futures_core::stream::{Stream, FusedStream};
use futures_core::task::{Context, Poll};
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
/// Future for the [`concat`](super::StreamExt::concat) method.
#[pin_project]
@@ -34,23 +34,21 @@ where St: Stream,
{
type Output = St::Item;
- #[project]
fn poll(
self: Pin<&mut Self>, cx: &mut Context<'_>
) -> Poll<Self::Output> {
- #[project]
- let Concat { mut stream, accum } = self.project();
+ let mut this = self.project();
loop {
- match ready!(stream.as_mut().poll_next(cx)) {
+ match ready!(this.stream.as_mut().poll_next(cx)) {
None => {
- return Poll::Ready(accum.take().unwrap_or_default())
+ return Poll::Ready(this.accum.take().unwrap_or_default())
}
Some(e) => {
- if let Some(a) = accum {
+ if let Some(a) = this.accum {
a.extend(e)
} else {
- *accum = Some(e)
+ *this.accum = Some(e)
}
}
}
diff --git a/src/stream/stream/enumerate.rs b/src/stream/stream/enumerate.rs
index 477a052..4e6bac2 100644
--- a/src/stream/stream/enumerate.rs
+++ b/src/stream/stream/enumerate.rs
@@ -3,7 +3,7 @@ use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
/// Stream for the [`enumerate`](super::StreamExt::enumerate) method.
#[pin_project]
@@ -35,18 +35,16 @@ impl<St: Stream + FusedStream> FusedStream for Enumerate<St> {
impl<St: Stream> Stream for Enumerate<St> {
type Item = (usize, St::Item);
- #[project]
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
- #[project]
- let Enumerate { stream, count } = self.project();
+ let this = self.project();
- match ready!(stream.poll_next(cx)) {
+ match ready!(this.stream.poll_next(cx)) {
Some(item) => {
- let prev_count = *count;
- *count += 1;
+ let prev_count = *this.count;
+ *this.count += 1;
Poll::Ready(Some((prev_count, item)))
}
None => Poll::Ready(None),
diff --git a/src/stream/stream/filter.rs b/src/stream/stream/filter.rs
index 9d848ad..55493fe 100644
--- a/src/stream/stream/filter.rs
+++ b/src/stream/stream/filter.rs
@@ -5,7 +5,7 @@ use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
use crate::fns::FnMut1;
/// Stream for the [`filter`](super::StreamExt::filter) method.
@@ -37,6 +37,7 @@ where
}
}
+#[allow(single_use_lifetimes)] // https://github.com/rust-lang/rust/issues/55058
impl<St, Fut, F> Filter<St, Fut, F>
where St: Stream,
F: for<'a> FnMut1<&'a St::Item, Output=Fut>,
@@ -64,6 +65,7 @@ impl<St, Fut, F> FusedStream for Filter<St, Fut, F>
}
}
+#[allow(single_use_lifetimes)] // https://github.com/rust-lang/rust/issues/55058
impl<St, Fut, F> Stream for Filter<St, Fut, F>
where St: Stream,
F: for<'a> FnMut1<&'a St::Item, Output=Fut>,
@@ -71,24 +73,22 @@ impl<St, Fut, F> Stream for Filter<St, Fut, F>
{
type Item = St::Item;
- #[project]
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<St::Item>> {
- #[project]
- let Filter { mut stream, f, mut pending_fut, pending_item } = self.project();
+ let mut this = self.project();
Poll::Ready(loop {
- if let Some(fut) = pending_fut.as_mut().as_pin_mut() {
+ if let Some(fut) = this.pending_fut.as_mut().as_pin_mut() {
let res = ready!(fut.poll(cx));
- pending_fut.set(None);
+ this.pending_fut.set(None);
if res {
- break pending_item.take();
+ break this.pending_item.take();
}
- *pending_item = None;
- } else if let Some(item) = ready!(stream.as_mut().poll_next(cx)) {
- pending_fut.set(Some(f.call_mut(&item)));
- *pending_item = Some(item);
+ *this.pending_item = None;
+ } else if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) {
+ this.pending_fut.set(Some(this.f.call_mut(&item)));
+ *this.pending_item = Some(item);
} else {
break None;
}
diff --git a/src/stream/stream/filter_map.rs b/src/stream/stream/filter_map.rs
index 2d098ee..50c440f 100644
--- a/src/stream/stream/filter_map.rs
+++ b/src/stream/stream/filter_map.rs
@@ -5,7 +5,7 @@ use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
use crate::fns::FnMut1;
/// Stream for the [`filter_map`](super::StreamExt::filter_map) method.
@@ -61,24 +61,22 @@ impl<St, Fut, F, T> Stream for FilterMap<St, Fut, F>
{
type Item = T;
- #[project]
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<T>> {
- #[project]
- let FilterMap { mut stream, f, mut pending } = self.project();
+ let mut this = self.project();
Poll::Ready(loop {
- if let Some(p) = pending.as_mut().as_pin_mut() {
+ if let Some(p) = this.pending.as_mut().as_pin_mut() {
// We have an item in progress, poll that until it's done
let item = ready!(p.poll(cx));
- pending.set(None);
+ this.pending.set(None);
if item.is_some() {
break item;
}
- } else if let Some(item) = ready!(stream.as_mut().poll_next(cx)) {
+ } else if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) {
// No item in progress, but the stream is still going
- pending.set(Some(f.call_mut(item)));
+ this.pending.set(Some(this.f.call_mut(item)));
} else {
// The stream is done
break None;
diff --git a/src/stream/stream/flatten.rs b/src/stream/stream/flatten.rs
index 4db77e1..75bbc21 100644
--- a/src/stream/stream/flatten.rs
+++ b/src/stream/stream/flatten.rs
@@ -3,7 +3,7 @@ use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
/// Stream for the [`flatten`](super::StreamExt::flatten) method.
#[pin_project]
@@ -41,19 +41,17 @@ where
{
type Item = <St::Item as Stream>::Item;
- #[project]
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
- #[project]
- let Flatten { mut stream, mut next } = self.project();
+ let mut this = self.project();
Poll::Ready(loop {
- if let Some(s) = next.as_mut().as_pin_mut() {
+ if let Some(s) = this.next.as_mut().as_pin_mut() {
if let Some(item) = ready!(s.poll_next(cx)) {
break Some(item);
} else {
- next.set(None);
+ this.next.set(None);
}
- } else if let Some(s) = ready!(stream.as_mut().poll_next(cx)) {
- next.set(Some(s));
+ } else if let Some(s) = ready!(this.stream.as_mut().poll_next(cx)) {
+ this.next.set(Some(s));
} else {
break None;
}
diff --git a/src/stream/stream/fold.rs b/src/stream/stream/fold.rs
index d4bec25..6fce256 100644
--- a/src/stream/stream/fold.rs
+++ b/src/stream/stream/fold.rs
@@ -3,7 +3,7 @@ use core::pin::Pin;
use futures_core::future::{FusedFuture, Future};
use futures_core::stream::Stream;
use futures_core::task::{Context, Poll};
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
/// Future for the [`fold`](super::StreamExt::fold) method.
#[pin_project]
@@ -64,21 +64,19 @@ impl<St, Fut, T, F> Future for Fold<St, Fut, T, F>
{
type Output = T;
- #[project]
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
- #[project]
- let Fold { mut stream, f, accum, mut future } = self.project();
+ let mut this = self.project();
Poll::Ready(loop {
- if let Some(fut) = future.as_mut().as_pin_mut() {
+ if let Some(fut) = this.future.as_mut().as_pin_mut() {
// we're currently processing a future to produce a new accum value
- *accum = Some(ready!(fut.poll(cx)));
- future.set(None);
- } else if accum.is_some() {
+ *this.accum = Some(ready!(fut.poll(cx)));
+ this.future.set(None);
+ } else if this.accum.is_some() {
// we're waiting on a new item from the stream
- let res = ready!(stream.as_mut().poll_next(cx));
- let a = accum.take().unwrap();
+ let res = ready!(this.stream.as_mut().poll_next(cx));
+ let a = this.accum.take().unwrap();
if let Some(item) = res {
- future.set(Some(f(a, item)));
+ this.future.set(Some((this.f)(a, item)));
} else {
break a;
}
diff --git a/src/stream/stream/for_each.rs b/src/stream/stream/for_each.rs
index fb3f40f..c8af21b 100644
--- a/src/stream/stream/for_each.rs
+++ b/src/stream/stream/for_each.rs
@@ -3,7 +3,7 @@ use core::pin::Pin;
use futures_core::future::{FusedFuture, Future};
use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
/// Future for the [`for_each`](super::StreamExt::for_each) method.
#[pin_project]
@@ -60,16 +60,14 @@ impl<St, Fut, F> Future for ForEach<St, Fut, F>
{
type Output = ();
- #[project]
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
- #[project]
- let ForEach { mut stream, f, mut future } = self.project();
+ let mut this = self.project();
loop {
- if let Some(fut) = future.as_mut().as_pin_mut() {
+ if let Some(fut) = this.future.as_mut().as_pin_mut() {
ready!(fut.poll(cx));
- future.set(None);
- } else if let Some(item) = ready!(stream.as_mut().poll_next(cx)) {
- future.set(Some(f(item)));
+ this.future.set(None);
+ } else if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) {
+ this.future.set(Some((this.f)(item)));
} else {
break;
}
diff --git a/src/stream/stream/for_each_concurrent.rs b/src/stream/stream/for_each_concurrent.rs
index 88ff2d3..843ddaa 100644
--- a/src/stream/stream/for_each_concurrent.rs
+++ b/src/stream/stream/for_each_concurrent.rs
@@ -5,7 +5,7 @@ use core::num::NonZeroUsize;
use futures_core::future::{FusedFuture, Future};
use futures_core::stream::Stream;
use futures_core::task::{Context, Poll};
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
/// Future for the [`for_each_concurrent`](super::StreamExt::for_each_concurrent)
/// method.
@@ -66,17 +66,15 @@ impl<St, Fut, F> Future for ForEachConcurrent<St, Fut, F>
{
type Output = ();
- #[project]
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
- #[project]
- let ForEachConcurrent { mut stream, f, futures, limit } = self.project();
+ let mut this = self.project();
loop {
let mut made_progress_this_iter = false;
// Check if we've already created a number of futures greater than `limit`
- if limit.map(|limit| limit.get() > futures.len()).unwrap_or(true) {
+ if this.limit.map(|limit| limit.get() > this.futures.len()).unwrap_or(true) {
let mut stream_completed = false;
- let elem = if let Some(stream) = stream.as_mut().as_pin_mut() {
+ let elem = if let Some(stream) = this.stream.as_mut().as_pin_mut() {
match stream.poll_next(cx) {
Poll::Ready(Some(elem)) => {
made_progress_this_iter = true;
@@ -92,17 +90,17 @@ impl<St, Fut, F> Future for ForEachConcurrent<St, Fut, F>
None
};
if stream_completed {
- stream.set(None);
+ this.stream.set(None);
}
if let Some(elem) = elem {
- futures.push(f(elem));
+ this.futures.push((this.f)(elem));
}
}
- match futures.poll_next_unpin(cx) {
+ match this.futures.poll_next_unpin(cx) {
Poll::Ready(Some(())) => made_progress_this_iter = true,
Poll::Ready(None) => {
- if stream.is_none() {
+ if this.stream.is_none() {
return Poll::Ready(())
}
},
diff --git a/src/stream/stream/forward.rs b/src/stream/stream/forward.rs
index 9776056..feef113 100644
--- a/src/stream/stream/forward.rs
+++ b/src/stream/stream/forward.rs
@@ -4,10 +4,10 @@ use futures_core::future::{FusedFuture, Future};
use futures_core::stream::Stream;
use futures_core::task::{Context, Poll};
use futures_sink::Sink;
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
/// Future for the [`forward`](super::StreamExt::forward) method.
-#[pin_project]
+#[pin_project(project = ForwardProj)]
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Forward<St, Si, Item> {
@@ -45,13 +45,11 @@ where
{
type Output = Result<(), E>;
- #[project]
fn poll(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Self::Output> {
- #[project]
- let Forward { mut sink, mut stream, buffered_item } = self.project();
+ let ForwardProj { mut sink, mut stream, buffered_item } = self.project();
let mut si = sink.as_mut().as_pin_mut().expect("polled `Forward` after completion");
loop {
@@ -61,7 +59,7 @@ where
ready!(si.as_mut().poll_ready(cx))?;
si.as_mut().start_send(buffered_item.take().unwrap())?;
}
-
+
match stream.as_mut().poll_next(cx)? {
Poll::Ready(Some(item)) => {
*buffered_item = Some(item);
diff --git a/src/stream/stream/fuse.rs b/src/stream/stream/fuse.rs
index 971fe60..408ad81 100644
--- a/src/stream/stream/fuse.rs
+++ b/src/stream/stream/fuse.rs
@@ -3,7 +3,7 @@ use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
/// Stream for the [`fuse`](super::StreamExt::fuse) method.
#[pin_project]
@@ -41,21 +41,19 @@ impl<S: Stream> FusedStream for Fuse<S> {
impl<S: Stream> Stream for Fuse<S> {
type Item = S::Item;
- #[project]
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<S::Item>> {
- #[project]
- let Fuse { stream, done } = self.project();
+ let this = self.project();
- if *done {
+ if *this.done {
return Poll::Ready(None);
}
- let item = ready!(stream.poll_next(cx));
+ let item = ready!(this.stream.poll_next(cx));
if item.is_none() {
- *done = true;
+ *this.done = true;
}
Poll::Ready(item)
}
diff --git a/src/stream/stream/into_future.rs b/src/stream/stream/into_future.rs
index 0d49384..8aa2b1e 100644
--- a/src/stream/stream/into_future.rs
+++ b/src/stream/stream/into_future.rs
@@ -52,7 +52,7 @@ impl<St: Stream + Unpin> StreamFuture<St> {
/// in order to return it to the caller of `Future::poll` if the stream yielded
/// an element.
pub fn get_pin_mut(self: Pin<&mut Self>) -> Option<Pin<&mut St>> {
- Pin::get_mut(self).stream.as_mut().map(Pin::new)
+ self.get_mut().stream.as_mut().map(Pin::new)
}
/// Consumes this combinator, returning the underlying stream.
diff --git a/src/stream/stream/map.rs b/src/stream/stream/map.rs
index 755f53a..c877512 100644
--- a/src/stream/stream/map.rs
+++ b/src/stream/stream/map.rs
@@ -4,7 +4,7 @@ use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
use crate::fns::FnMut1;
@@ -51,15 +51,13 @@ impl<St, F> Stream for Map<St, F>
{
type Item = F::Output;
- #[project]
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
- #[project]
- let Map { stream, f } = self.project();
- let res = ready!(stream.poll_next(cx));
- Poll::Ready(res.map(|x| f.call_mut(x)))
+ let mut this = self.project();
+ let res = ready!(this.stream.as_mut().poll_next(cx));
+ Poll::Ready(res.map(|x| this.f.call_mut(x)))
}
fn size_hint(&self) -> (usize, Option<usize>) {
diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs
index 359bb2f..f988468 100644
--- a/src/stream/stream/mod.rs
+++ b/src/stream/stream/mod.rs
@@ -3,7 +3,7 @@
//! This module contains a number of functions for working with `Stream`s,
//! including the `StreamExt` trait which adds methods to `Stream` types.
-use crate::future::Either;
+use crate::future::{assert_future, Either};
#[cfg(feature = "alloc")]
use alloc::boxed::Box;
use core::pin::Pin;
@@ -48,7 +48,7 @@ pub use self::filter_map::FilterMap;
mod flatten;
delegate_all!(
- /// Stream for the [`inspect`](StreamExt::inspect) method.
+ /// Stream for the [`flatten`](StreamExt::flatten) method.
Flatten<St>(
flatten::Flatten<St, St::Item>
): Debug + Sink + Stream + FusedStream + AccessInner[St, (.)] + New[|x: St| flatten::Flatten::new(x)]
@@ -65,6 +65,7 @@ mod forward;
#[cfg(feature = "sink")]
delegate_all!(
/// Future for the [`forward`](super::StreamExt::forward) method.
+ #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
Forward<St, Si>(
forward::Forward<St, Si, St::Ok>
): Debug + Future + FusedFuture + New[|x: St, y: Si| forward::Forward::new(x, y)]
@@ -177,9 +178,11 @@ cfg_target_has_atomic! {
pub use self::for_each_concurrent::ForEachConcurrent;
#[cfg(feature = "sink")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
#[cfg(feature = "alloc")]
mod split;
#[cfg(feature = "sink")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
#[cfg(feature = "alloc")]
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::split::{SplitStream, SplitSink, ReuniteError};
@@ -190,6 +193,7 @@ mod catch_unwind;
#[cfg(feature = "std")]
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::catch_unwind::CatchUnwind;
+use crate::stream::assert_stream;
impl<T: ?Sized> StreamExt for T where T: Stream {}
@@ -223,7 +227,7 @@ pub trait StreamExt: Stream {
where
Self: Unpin,
{
- Next::new(self)
+ assert_future::<Option<Self::Item>, _>(Next::new(self))
}
/// Converts this stream into a future of `(next_item, tail_of_stream)`.
@@ -258,7 +262,7 @@ pub trait StreamExt: Stream {
where
Self: Sized + Unpin,
{
- StreamFuture::new(self)
+ assert_future::<(Option<Self::Item>, Self), _>(StreamFuture::new(self))
}
/// Maps this stream's items to a different type, returning a new stream of
@@ -289,7 +293,7 @@ pub trait StreamExt: Stream {
F: FnMut(Self::Item) -> T,
Self: Sized,
{
- Map::new(self, f)
+ assert_stream::<T, _>(Map::new(self, f))
}
/// Creates a stream which gives the current iteration count as well as
@@ -306,7 +310,7 @@ pub trait StreamExt: Stream {
/// # Overflow Behavior
///
/// The method does no guarding against overflows, so enumerating more than
- /// [`usize::max_value()`] elements either produces the wrong result or panics. If
+ /// [`prim@usize::max_value()`] elements either produces the wrong result or panics. If
/// debug assertions are enabled, a panic is guaranteed.
///
/// # Panics
@@ -334,7 +338,7 @@ pub trait StreamExt: Stream {
where
Self: Sized,
{
- Enumerate::new(self)
+ assert_stream::<(usize, Self::Item), _>(Enumerate::new(self))
}
/// Filters the values produced by this stream according to the provided
@@ -369,7 +373,7 @@ pub trait StreamExt: Stream {
Fut: Future<Output = bool>,
Self: Sized,
{
- Filter::new(self, f)
+ assert_stream::<Self::Item, _>(Filter::new(self, f))
}
/// Filters the values produced by this stream while simultaneously mapping
@@ -403,7 +407,7 @@ pub trait StreamExt: Stream {
Fut: Future<Output = Option<T>>,
Self: Sized,
{
- FilterMap::new(self, f)
+ assert_stream::<T, _>(FilterMap::new(self, f))
}
/// Computes from this stream's items new items of a different type using
@@ -434,7 +438,7 @@ pub trait StreamExt: Stream {
Fut: Future,
Self: Sized,
{
- Then::new(self, f)
+ assert_stream::<Fut::Output, _>(Then::new(self, f))
}
/// Transforms a stream into a collection, returning a
@@ -466,7 +470,7 @@ pub trait StreamExt: Stream {
where
Self: Sized,
{
- Collect::new(self)
+ assert_future::<C, _>(Collect::new(self))
}
/// Concatenate all items of a stream into a single extendable
@@ -506,7 +510,7 @@ pub trait StreamExt: Stream {
Self: Sized,
Self::Item: Extend<<<Self as Stream>::Item as IntoIterator>::Item> + IntoIterator + Default,
{
- Concat::new(self)
+ assert_future::<Self::Item, _>(Concat::new(self))
}
/// Execute an accumulating asynchronous computation over a stream,
@@ -535,7 +539,7 @@ pub trait StreamExt: Stream {
Fut: Future<Output = T>,
Self: Sized,
{
- Fold::new(self, f, init)
+ assert_future::<T, _>(Fold::new(self, f, init))
}
/// Flattens a stream of streams into just one continuous stream.
@@ -574,7 +578,7 @@ pub trait StreamExt: Stream {
Self::Item: Stream,
Self: Sized,
{
- Flatten::new(self)
+ assert_stream::<<Self::Item as Stream>::Item, _>(Flatten::new(self))
}
/// Maps a stream like [`StreamExt::map`] but flattens nested `Stream`s.
@@ -611,7 +615,7 @@ pub trait StreamExt: Stream {
FlatMap::new(self, f)
}
- /// Combinator similar to [`StreamExt::fold`] that holds internal state
+ /// Combinator similar to [`StreamExt::fold`] that holds internal state
/// and produces a new stream.
///
/// Accepts initial state and closure which will be applied to each element
@@ -672,7 +676,7 @@ pub trait StreamExt: Stream {
Fut: Future<Output = bool>,
Self: Sized,
{
- SkipWhile::new(self, f)
+ assert_stream::<Self::Item, _>(SkipWhile::new(self, f))
}
/// Take elements from this stream while the provided asynchronous predicate
@@ -702,7 +706,7 @@ pub trait StreamExt: Stream {
Fut: Future<Output = bool>,
Self: Sized,
{
- TakeWhile::new(self, f)
+ assert_stream::<Self::Item, _>(TakeWhile::new(self, f))
}
/// Take elements from this stream until the provided future resolves.
@@ -788,7 +792,7 @@ pub trait StreamExt: Stream {
Fut: Future<Output = ()>,
Self: Sized,
{
- ForEach::new(self, f)
+ assert_future::<(), _>(ForEach::new(self, f))
}
/// Runs this stream to completion, executing the provided asynchronous
@@ -848,7 +852,7 @@ pub trait StreamExt: Stream {
Fut: Future<Output = ()>,
Self: Sized,
{
- ForEachConcurrent::new(self, limit.into(), f)
+ assert_future::<(), _>(ForEachConcurrent::new(self, limit.into(), f))
}
/// Creates a new stream of at most `n` items of the underlying stream.
@@ -871,7 +875,7 @@ pub trait StreamExt: Stream {
where
Self: Sized,
{
- Take::new(self, n)
+ assert_stream::<Self::Item, _>(Take::new(self, n))
}
/// Creates a new stream which skips `n` items of the underlying stream.
@@ -894,7 +898,7 @@ pub trait StreamExt: Stream {
where
Self: Sized,
{
- Skip::new(self, n)
+ assert_stream::<Self::Item, _>(Skip::new(self, n))
}
/// Fuse a stream such that [`poll_next`](Stream::poll_next) will never
@@ -940,7 +944,7 @@ pub trait StreamExt: Stream {
where
Self: Sized,
{
- Fuse::new(self)
+ assert_stream::<Self::Item, _>(Fuse::new(self))
}
/// Borrows a stream, rather than consuming it.
@@ -1018,7 +1022,7 @@ pub trait StreamExt: Stream {
where
Self: Sized + std::panic::UnwindSafe,
{
- CatchUnwind::new(self)
+ assert_stream(CatchUnwind::new(self))
}
/// Wrap the stream in a Box, pinning it.
@@ -1030,7 +1034,7 @@ pub trait StreamExt: Stream {
where
Self: Sized + Send + 'a,
{
- Box::pin(self)
+ assert_stream::<Self::Item, _>(Box::pin(self))
}
/// Wrap the stream in a Box, pinning it.
@@ -1044,7 +1048,7 @@ pub trait StreamExt: Stream {
where
Self: Sized + 'a,
{
- Box::pin(self)
+ assert_stream::<Self::Item, _>(Box::pin(self))
}
/// An adaptor for creating a buffered list of pending futures.
@@ -1066,7 +1070,7 @@ pub trait StreamExt: Stream {
Self::Item: Future,
Self: Sized,
{
- Buffered::new(self, n)
+ assert_stream::<<Self::Item as Future>::Output, _>(Buffered::new(self, n))
}
/// An adaptor for creating a buffered list of pending futures (unordered).
@@ -1111,7 +1115,7 @@ pub trait StreamExt: Stream {
Self::Item: Future,
Self: Sized,
{
- BufferUnordered::new(self, n)
+ assert_stream::<<Self::Item as Future>::Output, _>(BufferUnordered::new(self, n))
}
/// An adapter for zipping two streams together.
@@ -1141,7 +1145,7 @@ pub trait StreamExt: Stream {
St: Stream,
Self: Sized,
{
- Zip::new(self, other)
+ assert_stream::<(Self::Item, St::Item), _>(Zip::new(self, other))
}
/// Adapter for chaining two streams.
@@ -1172,7 +1176,7 @@ pub trait StreamExt: Stream {
St: Stream<Item = Self::Item>,
Self: Sized,
{
- Chain::new(self, other)
+ assert_stream::<Self::Item, _>(Chain::new(self, other))
}
/// Creates a new stream which exposes a `peek` method.
@@ -1182,7 +1186,7 @@ pub trait StreamExt: Stream {
where
Self: Sized,
{
- Peekable::new(self)
+ assert_stream::<Self::Item, _>(Peekable::new(self))
}
/// An adaptor for chunking up items of the stream inside a vector.
@@ -1208,7 +1212,7 @@ pub trait StreamExt: Stream {
where
Self: Sized,
{
- Chunks::new(self, capacity)
+ assert_stream::<alloc::vec::Vec<Self::Item>, _>(Chunks::new(self, capacity))
}
/// An adaptor for chunking up ready items of the stream inside a vector.
@@ -1248,6 +1252,7 @@ pub trait StreamExt: Stream {
/// (for example, via `forward(&mut sink)` inside an `async` fn/block) in
/// order to preserve access to the `Sink`.
#[cfg(feature = "sink")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
fn forward<S>(self, sink: S) -> Forward<Self, S>
where
S: Sink<Self::Ok, Error = Self::Error>,
@@ -1266,13 +1271,15 @@ pub trait StreamExt: Stream {
/// This method is only available when the `std` or `alloc` feature of this
/// library is activated, and it is activated by default.
#[cfg(feature = "sink")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
#[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "alloc")]
fn split<Item>(self) -> (SplitSink<Self, Item>, SplitStream<Self>)
where
Self: Sink<Item> + Sized,
{
- split::split(self)
+ let (sink, stream) = split::split(self);
+ (sink, assert_stream::<Self::Item, _>(stream))
}
/// Do something with each item of this stream, afterwards passing it on.
@@ -1285,7 +1292,7 @@ pub trait StreamExt: Stream {
F: FnMut(&Self::Item),
Self: Sized,
{
- Inspect::new(self, f)
+ assert_stream::<Self::Item, _>(Inspect::new(self, f))
}
/// Wrap this stream in an `Either` stream, making it the left-hand variant
@@ -1298,7 +1305,7 @@ pub trait StreamExt: Stream {
B: Stream<Item = Self::Item>,
Self: Sized,
{
- Either::Left(self)
+ assert_stream::<Self::Item, _>(Either::Left(self))
}
/// Wrap this stream in an `Either` stream, making it the right-hand variant
@@ -1311,7 +1318,7 @@ pub trait StreamExt: Stream {
B: Stream<Item = Self::Item>,
Self: Sized,
{
- Either::Right(self)
+ assert_stream::<Self::Item, _>(Either::Right(self))
}
/// A convenience method for calling [`Stream::poll_next`] on [`Unpin`]
diff --git a/src/stream/stream/peek.rs b/src/stream/stream/peek.rs
index fb0f874..1d8c342 100644
--- a/src/stream/stream/peek.rs
+++ b/src/stream/stream/peek.rs
@@ -6,7 +6,7 @@ use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
/// A `Stream` that implements a `peek` method.
///
@@ -42,19 +42,17 @@ impl<St: Stream> Peekable<St> {
///
/// This method polls the underlying stream and return either a reference
/// to the next item if the stream is ready or passes through any errors.
- #[project]
pub fn poll_peek(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<&St::Item>> {
- #[project]
- let Peekable { mut stream, peeked } = self.project();
+ let mut this = self.project();
Poll::Ready(loop {
- if peeked.is_some() {
- break peeked.as_ref();
- } else if let Some(item) = ready!(stream.as_mut().poll_next(cx)) {
- *peeked = Some(item);
+ if this.peeked.is_some() {
+ break this.peeked.as_ref();
+ } else if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) {
+ *this.peeked = Some(item);
} else {
break None;
}
@@ -71,14 +69,12 @@ impl<St: Stream> FusedStream for Peekable<St> {
impl<S: Stream> Stream for Peekable<S> {
type Item = S::Item;
- #[project]
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
- #[project]
- let Peekable { stream, peeked } = self.project();
- if let Some(item) = peeked.take() {
+ let this = self.project();
+ if let Some(item) = this.peeked.take() {
return Poll::Ready(Some(item));
}
- stream.poll_next(cx)
+ this.stream.poll_next(cx)
}
fn size_hint(&self) -> (usize, Option<usize>) {
diff --git a/src/stream/stream/ready_chunks.rs b/src/stream/stream/ready_chunks.rs
index 2152cb7..192d3c6 100644
--- a/src/stream/stream/ready_chunks.rs
+++ b/src/stream/stream/ready_chunks.rs
@@ -3,7 +3,7 @@ use futures_core::stream::{Stream, FusedStream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
use core::mem;
use core::pin::Pin;
use alloc::vec::Vec;
@@ -36,23 +36,21 @@ impl<St: Stream> ReadyChunks<St> where St: Stream {
impl<St: Stream> Stream for ReadyChunks<St> {
type Item = Vec<St::Item>;
- #[project]
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
- #[project]
- let ReadyChunks { items, cap, mut stream } = self.project();
+ let mut this = self.project();
loop {
- match stream.as_mut().poll_next(cx) {
+ match this.stream.as_mut().poll_next(cx) {
// Flush all collected data if underlying stream doesn't contain
// more ready values
Poll::Pending => {
- return if items.is_empty() {
+ return if this.items.is_empty() {
Poll::Pending
} else {
- Poll::Ready(Some(mem::replace(items, Vec::with_capacity(*cap))))
+ Poll::Ready(Some(mem::replace(this.items, Vec::with_capacity(*this.cap))))
}
}
@@ -60,19 +58,19 @@ impl<St: Stream> Stream for ReadyChunks<St> {
// If so, replace our buffer with a new and empty one and return
// the full one.
Poll::Ready(Some(item)) => {
- items.push(item);
- if items.len() >= *cap {
- return Poll::Ready(Some(mem::replace(items, Vec::with_capacity(*cap))))
+ this.items.push(item);
+ if this.items.len() >= *this.cap {
+ return Poll::Ready(Some(mem::replace(this.items, Vec::with_capacity(*this.cap))))
}
}
// Since the underlying stream ran out of values, return what we
// have buffered, if we have anything.
Poll::Ready(None) => {
- let last = if items.is_empty() {
+ let last = if this.items.is_empty() {
None
} else {
- let full_buf = mem::replace(items, Vec::new());
+ let full_buf = mem::replace(this.items, Vec::new());
Some(full_buf)
};
diff --git a/src/stream/stream/scan.rs b/src/stream/stream/scan.rs
index 0cdfcbc..dd0316d 100644
--- a/src/stream/stream/scan.rs
+++ b/src/stream/stream/scan.rs
@@ -5,7 +5,7 @@ use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
struct StateFn<S, F> {
state: S,
@@ -75,28 +75,26 @@ where
{
type Item = B;
- #[project]
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<B>> {
if self.is_done_taking() {
return Poll::Ready(None);
}
- #[project]
- let Scan { mut stream, state_f, mut future } = self.project();
+ let mut this = self.project();
Poll::Ready(loop {
- if let Some(fut) = future.as_mut().as_pin_mut() {
+ if let Some(fut) = this.future.as_mut().as_pin_mut() {
let item = ready!(fut.poll(cx));
- future.set(None);
+ this.future.set(None);
if item.is_none() {
- *state_f = None;
+ *this.state_f = None;
}
break item;
- } else if let Some(item) = ready!(stream.as_mut().poll_next(cx)) {
- let state_f = state_f.as_mut().unwrap();
- future.set(Some((state_f.f)(&mut state_f.state, item)))
+ } else if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) {
+ let state_f = this.state_f.as_mut().unwrap();
+ this.future.set(Some((state_f.f)(&mut state_f.state, item)))
} else {
break None;
}
diff --git a/src/stream/stream/skip.rs b/src/stream/stream/skip.rs
index c0f6611..ea31b17 100644
--- a/src/stream/stream/skip.rs
+++ b/src/stream/stream/skip.rs
@@ -3,7 +3,7 @@ use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
/// Stream for the [`skip`](super::StreamExt::skip) method.
#[pin_project]
@@ -35,22 +35,21 @@ impl<St: FusedStream> FusedStream for Skip<St> {
impl<St: Stream> Stream for Skip<St> {
type Item = St::Item;
- #[project]
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<St::Item>> {
- #[project]
- let Skip { mut stream, remaining } = self.project();
- while *remaining > 0 {
- if ready!(stream.as_mut().poll_next(cx)).is_some() {
- *remaining -= 1;
+ let mut this = self.project();
+
+ while *this.remaining > 0 {
+ if ready!(this.stream.as_mut().poll_next(cx)).is_some() {
+ *this.remaining -= 1;
} else {
return Poll::Ready(None);
}
}
- stream.poll_next(cx)
+ this.stream.poll_next(cx)
}
fn size_hint(&self) -> (usize, Option<usize>) {
diff --git a/src/stream/stream/skip_while.rs b/src/stream/stream/skip_while.rs
index 3d664f2..2c58102 100644
--- a/src/stream/stream/skip_while.rs
+++ b/src/stream/stream/skip_while.rs
@@ -5,7 +5,7 @@ use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
/// Stream for the [`skip_while`](super::StreamExt::skip_while) method.
#[pin_project]
@@ -71,30 +71,28 @@ impl<St, Fut, F> Stream for SkipWhile<St, Fut, F>
{
type Item = St::Item;
- #[project]
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<St::Item>> {
- #[project]
- let SkipWhile { mut stream, f, mut pending_fut, pending_item, done_skipping } = self.project();
+ let mut this = self.project();
- if *done_skipping {
- return stream.poll_next(cx);
+ if *this.done_skipping {
+ return this.stream.poll_next(cx);
}
Poll::Ready(loop {
- if let Some(fut) = pending_fut.as_mut().as_pin_mut() {
+ if let Some(fut) = this.pending_fut.as_mut().as_pin_mut() {
let skipped = ready!(fut.poll(cx));
- let item = pending_item.take();
- pending_fut.set(None);
+ let item = this.pending_item.take();
+ this.pending_fut.set(None);
if !skipped {
- *done_skipping = true;
+ *this.done_skipping = true;
break item;
}
- } else if let Some(item) = ready!(stream.as_mut().poll_next(cx)) {
- pending_fut.set(Some(f(&item)));
- *pending_item = Some(item);
+ } else if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) {
+ this.pending_fut.set(Some((this.f)(&item)));
+ *this.pending_item = Some(item);
} else {
break None;
}
diff --git a/src/stream/stream/split.rs b/src/stream/stream/split.rs
index c7237a2..991eb16 100644
--- a/src/stream/stream/split.rs
+++ b/src/stream/stream/split.rs
@@ -9,6 +9,7 @@ use crate::lock::BiLock;
/// A `Stream` part of the split pair
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
+#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
pub struct SplitStream<S>(BiLock<S>);
impl<S> Unpin for SplitStream<S> {}
@@ -43,6 +44,7 @@ fn SplitSink<S: Sink<Item>, Item>(lock: BiLock<S>) -> SplitSink<S, Item> {
/// A `Sink` part of the split pair
#[derive(Debug)]
#[must_use = "sinks do nothing unless polled"]
+#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
pub struct SplitSink<S, Item> {
lock: BiLock<S>,
slot: Option<Item>,
@@ -119,6 +121,7 @@ pub(super) fn split<S: Stream + Sink<Item>, Item>(s: S) -> (SplitSink<S, Item>,
/// Error indicating a `SplitSink<S>` and `SplitStream<S>` were not two halves
/// of a `Stream + Split`, and thus could not be `reunite`d.
+#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
pub struct ReuniteError<T, Item>(pub SplitSink<T, Item>, pub SplitStream<T>);
impl<T, Item> fmt::Debug for ReuniteError<T, Item> {
diff --git a/src/stream/stream/take.rs b/src/stream/stream/take.rs
index 4a68920..1956a82 100644
--- a/src/stream/stream/take.rs
+++ b/src/stream/stream/take.rs
@@ -4,7 +4,7 @@ use futures_core::stream::{Stream, FusedStream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
/// Stream for the [`take`](super::StreamExt::take) method.
#[pin_project]
@@ -32,7 +32,6 @@ impl<St> Stream for Take<St>
{
type Item = St::Item;
- #[project]
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
@@ -40,13 +39,12 @@ impl<St> Stream for Take<St>
if self.remaining == 0 {
Poll::Ready(None)
} else {
- #[project]
- let Take { stream, remaining } = self.project();
- let next = ready!(stream.poll_next(cx));
+ let this = self.project();
+ let next = ready!(this.stream.poll_next(cx));
if next.is_some() {
- *remaining -= 1;
+ *this.remaining -= 1;
} else {
- *remaining = 0;
+ *this.remaining = 0;
}
Poll::Ready(next)
}
diff --git a/src/stream/stream/take_until.rs b/src/stream/stream/take_until.rs
index 3662620..4e32ad8 100644
--- a/src/stream/stream/take_until.rs
+++ b/src/stream/stream/take_until.rs
@@ -5,7 +5,7 @@ use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
// FIXME: docs, tests
@@ -121,26 +121,24 @@ where
{
type Item = St::Item;
- #[project]
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St::Item>> {
- #[project]
- let TakeUntil { stream, mut fut, fut_result, free } = self.project();
+ let mut this = self.project();
- if let Some(f) = fut.as_mut().as_pin_mut() {
+ if let Some(f) = this.fut.as_mut().as_pin_mut() {
if let Poll::Ready(result) = f.poll(cx) {
- fut.set(None);
- *fut_result = Some(result);
+ this.fut.set(None);
+ *this.fut_result = Some(result);
}
}
- if !*free && fut.is_none() {
+ if !*this.free && this.fut.is_none() {
// Future resolved, inner stream stopped
Poll::Ready(None)
} else {
// Future either not resolved yet or taken out by the user
- let item = ready!(stream.poll_next(cx));
+ let item = ready!(this.stream.poll_next(cx));
if item.is_none() {
- fut.set(None);
+ this.fut.set(None);
}
Poll::Ready(item)
}
diff --git a/src/stream/stream/take_while.rs b/src/stream/stream/take_while.rs
index d90061e..e3a6d00 100644
--- a/src/stream/stream/take_while.rs
+++ b/src/stream/stream/take_while.rs
@@ -5,7 +5,7 @@ use futures_core::stream::{Stream, FusedStream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
/// Stream for the [`take_while`](super::StreamExt::take_while) method.
#[pin_project]
@@ -61,7 +61,6 @@ impl<St, Fut, F> Stream for TakeWhile<St, Fut, F>
{
type Item = St::Item;
- #[project]
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
@@ -70,23 +69,22 @@ impl<St, Fut, F> Stream for TakeWhile<St, Fut, F>
return Poll::Ready(None);
}
- #[project]
- let TakeWhile { mut stream, f, mut pending_fut, pending_item, done_taking } = self.project();
+ let mut this = self.project();
Poll::Ready(loop {
- if let Some(fut) = pending_fut.as_mut().as_pin_mut() {
+ if let Some(fut) = this.pending_fut.as_mut().as_pin_mut() {
let take = ready!(fut.poll(cx));
- let item = pending_item.take();
- pending_fut.set(None);
+ let item = this.pending_item.take();
+ this.pending_fut.set(None);
if take {
break item;
} else {
- *done_taking = true;
+ *this.done_taking = true;
break None;
}
- } else if let Some(item) = ready!(stream.as_mut().poll_next(cx)) {
- pending_fut.set(Some(f(&item)));
- *pending_item = Some(item);
+ } else if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) {
+ this.pending_fut.set(Some((this.f)(&item)));
+ *this.pending_item = Some(item);
} else {
break None;
}
diff --git a/src/stream/stream/then.rs b/src/stream/stream/then.rs
index d54512e..1334a6c 100644
--- a/src/stream/stream/then.rs
+++ b/src/stream/stream/then.rs
@@ -5,7 +5,7 @@ use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
/// Stream for the [`then`](super::StreamExt::then) method.
#[pin_project]
@@ -63,21 +63,19 @@ impl<St, Fut, F> Stream for Then<St, Fut, F>
{
type Item = Fut::Output;
- #[project]
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
- ) -> Poll<Option<Fut::Output>> {
- #[project]
- let Then { mut stream, f, mut future } = self.project();
+ ) -> Poll<Option<Self::Item>> {
+ let mut this = self.project();
Poll::Ready(loop {
- if let Some(fut) = future.as_mut().as_pin_mut() {
+ if let Some(fut) = this.future.as_mut().as_pin_mut() {
let item = ready!(fut.poll(cx));
- future.set(None);
+ this.future.set(None);
break Some(item);
- } else if let Some(item) = ready!(stream.as_mut().poll_next(cx)) {
- future.set(Some(f(item)));
+ } else if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) {
+ this.future.set(Some((this.f)(item)));
} else {
break None;
}
diff --git a/src/stream/stream/zip.rs b/src/stream/stream/zip.rs
index 6c148fb..522f7e1 100644
--- a/src/stream/stream/zip.rs
+++ b/src/stream/stream/zip.rs
@@ -3,7 +3,7 @@ use core::cmp;
use core::pin::Pin;
use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
/// Stream for the [`zip`](super::StreamExt::zip) method.
#[pin_project]
@@ -49,10 +49,8 @@ impl<St1: Stream, St2: Stream> Zip<St1, St2> {
/// Note that care must be taken to avoid tampering with the state of the
/// stream which may otherwise confuse this combinator.
pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut St1>, Pin<&mut St2>) {
- unsafe {
- let Self { stream1, stream2, .. } = self.get_unchecked_mut();
- (Pin::new_unchecked(stream1).get_pin_mut(), Pin::new_unchecked(stream2).get_pin_mut())
- }
+ let this = self.project();
+ (this.stream1.get_pin_mut(), this.stream2.get_pin_mut())
}
/// Consumes this combinator, returning the underlying streams.
@@ -77,31 +75,29 @@ impl<St1, St2> Stream for Zip<St1, St2>
{
type Item = (St1::Item, St2::Item);
- #[project]
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
- #[project]
- let Zip { mut stream1, mut stream2, queued1, queued2 } = self.project();
+ let mut this = self.project();
- if queued1.is_none() {
- match stream1.as_mut().poll_next(cx) {
- Poll::Ready(Some(item1)) => *queued1 = Some(item1),
+ if this.queued1.is_none() {
+ match this.stream1.as_mut().poll_next(cx) {
+ Poll::Ready(Some(item1)) => *this.queued1 = Some(item1),
Poll::Ready(None) | Poll::Pending => {}
}
}
- if queued2.is_none() {
- match stream2.as_mut().poll_next(cx) {
- Poll::Ready(Some(item2)) => *queued2 = Some(item2),
+ if this.queued2.is_none() {
+ match this.stream2.as_mut().poll_next(cx) {
+ Poll::Ready(Some(item2)) => *this.queued2 = Some(item2),
Poll::Ready(None) | Poll::Pending => {}
}
}
- if queued1.is_some() && queued2.is_some() {
- let pair = (queued1.take().unwrap(), queued2.take().unwrap());
+ if this.queued1.is_some() && this.queued2.is_some() {
+ let pair = (this.queued1.take().unwrap(), this.queued2.take().unwrap());
Poll::Ready(Some(pair))
- } else if stream1.is_done() || stream2.is_done() {
+ } else if this.stream1.is_done() || this.stream2.is_done() {
Poll::Ready(None)
} else {
Poll::Pending
diff --git a/src/stream/try_stream/and_then.rs b/src/stream/try_stream/and_then.rs
index 563ed34..4d24c4f 100644
--- a/src/stream/try_stream/and_then.rs
+++ b/src/stream/try_stream/and_then.rs
@@ -5,7 +5,7 @@ use futures_core::stream::{Stream, TryStream, FusedStream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
/// Stream for the [`and_then`](super::TryStreamExt::and_then) method.
#[pin_project]
@@ -50,21 +50,19 @@ impl<St, Fut, F> Stream for AndThen<St, Fut, F>
{
type Item = Result<Fut::Ok, St::Error>;
- #[project]
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
- #[project]
- let AndThen { mut stream, mut future, f } = self.project();
+ let mut this = self.project();
Poll::Ready(loop {
- if let Some(fut) = future.as_mut().as_pin_mut() {
+ if let Some(fut) = this.future.as_mut().as_pin_mut() {
let item = ready!(fut.try_poll(cx));
- future.set(None);
+ this.future.set(None);
break Some(item);
- } else if let Some(item) = ready!(stream.as_mut().try_poll_next(cx)?) {
- future.set(Some(f(item)));
+ } else if let Some(item) = ready!(this.stream.as_mut().try_poll_next(cx)?) {
+ this.future.set(Some((this.f)(item)));
} else {
break None;
}
diff --git a/src/stream/try_stream/into_async_read.rs b/src/stream/try_stream/into_async_read.rs
index 71bbfd1..10291b0 100644
--- a/src/stream/try_stream/into_async_read.rs
+++ b/src/stream/try_stream/into_async_read.rs
@@ -9,6 +9,7 @@ use std::io::{Error, Result};
/// Reader for the [`into_async_read`](super::TryStreamExt::into_async_read) method.
#[derive(Debug)]
#[must_use = "readers do nothing unless polled"]
+#[cfg_attr(docsrs, doc(cfg(feature = "io")))]
pub struct IntoAsyncRead<St>
where
St: TryStream<Error = Error> + Unpin,
diff --git a/src/stream/try_stream/mod.rs b/src/stream/try_stream/mod.rs
index 99d5a6d..0c8e108 100644
--- a/src/stream/try_stream/mod.rs
+++ b/src/stream/try_stream/mod.rs
@@ -103,6 +103,10 @@ mod try_skip_while;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::try_skip_while::TrySkipWhile;
+mod try_take_while;
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::try_take_while::TryTakeWhile;
+
cfg_target_has_atomic! {
#[cfg(feature = "alloc")]
mod try_buffer_unordered;
@@ -121,9 +125,12 @@ cfg_target_has_atomic! {
#[cfg(feature = "std")]
mod into_async_read;
#[cfg(feature = "io")]
+#[cfg_attr(docsrs, doc(cfg(feature = "io")))]
#[cfg(feature = "std")]
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::into_async_read::IntoAsyncRead;
+use crate::future::assert_future;
+use crate::stream::assert_stream;
impl<S: ?Sized + TryStream> TryStreamExt for S {}
@@ -151,7 +158,7 @@ pub trait TryStreamExt: TryStream {
Self: Sized,
Self::Error: Into<E>,
{
- ErrInto::new(self)
+ assert_stream::<Result<Self::Ok, E>, _>(ErrInto::new(self))
}
/// Wraps the current stream in a new stream which maps the success value
@@ -176,7 +183,7 @@ pub trait TryStreamExt: TryStream {
Self: Sized,
F: FnMut(Self::Ok) -> T,
{
- MapOk::new(self, f)
+ assert_stream::<Result<T, Self::Error>, _>(MapOk::new(self, f))
}
/// Wraps the current stream in a new stream which maps the error value
@@ -201,7 +208,7 @@ pub trait TryStreamExt: TryStream {
Self: Sized,
F: FnMut(Self::Error) -> E,
{
- MapErr::new(self, f)
+ assert_stream::<Result<Self::Ok, E>, _>(MapErr::new(self, f))
}
/// Chain on a computation for when a value is ready, passing the successful
@@ -248,7 +255,7 @@ pub trait TryStreamExt: TryStream {
Fut: TryFuture<Error = Self::Error>,
Self: Sized,
{
- AndThen::new(self, f)
+ assert_stream::<Result<Fut::Ok, Fut::Error>, _>(AndThen::new(self, f))
}
/// Chain on a computation for when an error happens, passing the
@@ -274,7 +281,7 @@ pub trait TryStreamExt: TryStream {
Fut: TryFuture<Ok = Self::Ok>,
Self: Sized,
{
- OrElse::new(self, f)
+ assert_stream::<Result<Self::Ok, Fut::Error>, _>(OrElse::new(self, f))
}
/// Do something with the success value of this stream, afterwards passing
@@ -288,7 +295,7 @@ pub trait TryStreamExt: TryStream {
F: FnMut(&Self::Ok),
Self: Sized,
{
- InspectOk::new(self, f)
+ assert_stream::<Result<Self::Ok, Self::Error>, _>(InspectOk::new(self, f))
}
/// Do something with the error value of this stream, afterwards passing it on.
@@ -301,7 +308,7 @@ pub trait TryStreamExt: TryStream {
F: FnMut(&Self::Error),
Self: Sized,
{
- InspectErr::new(self, f)
+ assert_stream::<Result<Self::Ok, Self::Error>, _>(InspectErr::new(self, f))
}
/// Wraps a [`TryStream`] into a type that implements
@@ -329,7 +336,7 @@ pub trait TryStreamExt: TryStream {
where
Self: Sized,
{
- IntoStream::new(self)
+ assert_stream::<Result<Self::Ok, Self::Error>, _>(IntoStream::new(self))
}
/// Creates a future that attempts to resolve the next item in the stream.
@@ -356,7 +363,7 @@ pub trait TryStreamExt: TryStream {
where
Self: Unpin,
{
- TryNext::new(self)
+ assert_future::<Result<Option<Self::Ok>, Self::Error>, _>(TryNext::new(self))
}
/// Attempts to run this stream to completion, executing the provided
@@ -398,7 +405,7 @@ pub trait TryStreamExt: TryStream {
Fut: TryFuture<Ok = (), Error = Self::Error>,
Self: Sized,
{
- TryForEach::new(self, f)
+ assert_future::<Result<(), Self::Error>, _>(TryForEach::new(self, f))
}
/// Skip elements on this stream while the provided asynchronous predicate
@@ -428,7 +435,37 @@ pub trait TryStreamExt: TryStream {
Fut: TryFuture<Ok = bool, Error = Self::Error>,
Self: Sized,
{
- TrySkipWhile::new(self, f)
+ assert_stream::<Result<Self::Ok, Self::Error>, _>(TrySkipWhile::new(self, f))
+ }
+
+ /// Take elements on this stream while the provided asynchronous predicate
+ /// resolves to `true`.
+ ///
+ /// This function is similar to
+ /// [`StreamExt::take_while`](crate::stream::StreamExt::take_while) but exits
+ /// early if an error occurs.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::future;
+ /// use futures::stream::{self, TryStreamExt};
+ ///
+ /// let stream = stream::iter(vec![Ok::<i32, i32>(1), Ok(2), Ok(3), Ok(2)]);
+ /// let stream = stream.try_take_while(|x| future::ready(Ok(*x < 3)));
+ ///
+ /// let output: Result<Vec<i32>, i32> = stream.try_collect().await;
+ /// assert_eq!(output, Ok(vec![1, 2]));
+ /// # })
+ /// ```
+ fn try_take_while<Fut, F>(self, f: F) -> TryTakeWhile<Self, Fut, F>
+ where
+ F: FnMut(&Self::Ok) -> Fut,
+ Fut: TryFuture<Ok = bool, Error = Self::Error>,
+ Self: Sized,
+ {
+ TryTakeWhile::new(self, f)
}
/// Attempts to run this stream to completion, executing the provided asynchronous
@@ -484,7 +521,11 @@ pub trait TryStreamExt: TryStream {
Fut: Future<Output = Result<(), Self::Error>>,
Self: Sized,
{
- TryForEachConcurrent::new(self, limit.into(), f)
+ assert_future::<Result<(), Self::Error>, _>(TryForEachConcurrent::new(
+ self,
+ limit.into(),
+ f,
+ ))
}
/// Attempt to transform a stream into a collection,
@@ -521,7 +562,7 @@ pub trait TryStreamExt: TryStream {
where
Self: Sized,
{
- TryCollect::new(self)
+ assert_future::<Result<C, Self::Error>, _>(TryCollect::new(self))
}
/// Attempt to filter the values produced by this stream according to the
@@ -560,7 +601,7 @@ pub trait TryStreamExt: TryStream {
F: FnMut(&Self::Ok) -> Fut,
Self: Sized,
{
- TryFilter::new(self, f)
+ assert_stream::<Result<Self::Ok, Self::Error>, _>(TryFilter::new(self, f))
}
/// Attempt to filter the values produced by this stream while
@@ -601,7 +642,7 @@ pub trait TryStreamExt: TryStream {
F: FnMut(Self::Ok) -> Fut,
Self: Sized,
{
- TryFilterMap::new(self, f)
+ assert_stream::<Result<T, Self::Error>, _>(TryFilterMap::new(self, f))
}
/// Flattens a stream of streams into just one continuous stream.
@@ -648,7 +689,9 @@ pub trait TryStreamExt: TryStream {
<Self::Ok as TryStream>::Error: From<Self::Error>,
Self: Sized,
{
- TryFlatten::new(self)
+ assert_stream::<Result<<Self::Ok as TryStream>::Ok, <Self::Ok as TryStream>::Error>, _>(
+ TryFlatten::new(self),
+ )
}
/// Attempt to execute an accumulating asynchronous computation over a
@@ -685,7 +728,7 @@ pub trait TryStreamExt: TryStream {
Fut: TryFuture<Ok = T, Error = Self::Error>,
Self: Sized,
{
- TryFold::new(self, f, init)
+ assert_future::<Result<T, Self::Error>, _>(TryFold::new(self, f, init))
}
/// Attempt to concatenate all items of a stream into a single
@@ -727,7 +770,7 @@ pub trait TryStreamExt: TryStream {
Self: Sized,
Self::Ok: Extend<<<Self as TryStream>::Ok as IntoIterator>::Item> + IntoIterator + Default,
{
- TryConcat::new(self)
+ assert_future::<Result<Self::Ok, Self::Error>, _>(TryConcat::new(self))
}
/// Attempt to execute several futures from a stream concurrently.
@@ -794,7 +837,9 @@ pub trait TryStreamExt: TryStream {
Self::Ok: TryFuture<Error = Self::Error>,
Self: Sized,
{
- TryBufferUnordered::new(self, n)
+ assert_stream::<Result<<Self::Ok as TryFuture>::Ok, Self::Error>, _>(
+ TryBufferUnordered::new(self, n),
+ )
}
// TODO: false positive warning from rustdoc. Verify once #43466 settles
@@ -831,6 +876,7 @@ pub trait TryStreamExt: TryStream {
/// # assert_eq!(42, futures::executor::block_on(rx).unwrap());
/// ```
#[cfg(feature = "compat")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "compat")))]
fn compat(self) -> Compat<Self>
where
Self: Sized + Unpin,
@@ -864,6 +910,7 @@ pub trait TryStreamExt: TryStream {
/// # })
/// ```
#[cfg(feature = "io")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "io")))]
#[cfg(feature = "std")]
fn into_async_read(self) -> IntoAsyncRead<Self>
where
diff --git a/src/stream/try_stream/or_else.rs b/src/stream/try_stream/or_else.rs
index 0bba0d0..d972c0f 100644
--- a/src/stream/try_stream/or_else.rs
+++ b/src/stream/try_stream/or_else.rs
@@ -5,7 +5,7 @@ use futures_core::stream::{Stream, TryStream, FusedStream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
/// Stream for the [`or_else`](super::TryStreamExt::or_else) method.
#[pin_project]
@@ -50,24 +50,22 @@ impl<St, Fut, F> Stream for OrElse<St, Fut, F>
{
type Item = Result<St::Ok, Fut::Error>;
- #[project]
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
- #[project]
- let OrElse { mut stream, mut future, f } = self.project();
+ let mut this = self.project();
Poll::Ready(loop {
- if let Some(fut) = future.as_mut().as_pin_mut() {
+ if let Some(fut) = this.future.as_mut().as_pin_mut() {
let item = ready!(fut.try_poll(cx));
- future.set(None);
+ this.future.set(None);
break Some(item);
} else {
- match ready!(stream.as_mut().try_poll_next(cx)) {
+ match ready!(this.stream.as_mut().try_poll_next(cx)) {
Some(Ok(item)) => break Some(Ok(item)),
Some(Err(e)) => {
- future.set(Some(f(e)));
+ this.future.set(Some((this.f)(e)));
},
None => break None,
}
diff --git a/src/stream/try_stream/try_buffer_unordered.rs b/src/stream/try_stream/try_buffer_unordered.rs
index 566868b..c1c931a 100644
--- a/src/stream/try_stream/try_buffer_unordered.rs
+++ b/src/stream/try_stream/try_buffer_unordered.rs
@@ -5,7 +5,7 @@ use futures_core::stream::{Stream, TryStream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
use core::pin::Pin;
/// Stream for the
@@ -43,31 +43,29 @@ impl<St> Stream for TryBufferUnordered<St>
{
type Item = Result<<St::Ok as TryFuture>::Ok, St::Error>;
- #[project]
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
- #[project]
- let TryBufferUnordered { mut stream, in_progress_queue, max } = self.project();
+ let mut this = self.project();
// First up, try to spawn off as many futures as possible by filling up
// our queue of futures. Propagate errors from the stream immediately.
- while in_progress_queue.len() < *max {
- match stream.as_mut().poll_next(cx)? {
- Poll::Ready(Some(fut)) => in_progress_queue.push(fut.into_future()),
+ while this.in_progress_queue.len() < *this.max {
+ match this.stream.as_mut().poll_next(cx)? {
+ Poll::Ready(Some(fut)) => this.in_progress_queue.push(fut.into_future()),
Poll::Ready(None) | Poll::Pending => break,
}
}
// Attempt to pull the next value from the in_progress_queue
- match in_progress_queue.poll_next_unpin(cx) {
+ match this.in_progress_queue.poll_next_unpin(cx) {
x @ Poll::Pending | x @ Poll::Ready(Some(_)) => return x,
Poll::Ready(None) => {}
}
// If more values are still coming from the stream, we're not done yet
- if stream.is_done() {
+ if this.stream.is_done() {
Poll::Ready(None)
} else {
Poll::Pending
diff --git a/src/stream/try_stream/try_collect.rs b/src/stream/try_stream/try_collect.rs
index 3c9aee2..76ce619 100644
--- a/src/stream/try_stream/try_collect.rs
+++ b/src/stream/try_stream/try_collect.rs
@@ -3,7 +3,7 @@ use core::pin::Pin;
use futures_core::future::{FusedFuture, Future};
use futures_core::stream::{FusedStream, TryStream};
use futures_core::task::{Context, Poll};
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
/// Future for the [`try_collect`](super::TryStreamExt::try_collect) method.
#[pin_project]
@@ -41,17 +41,15 @@ where
{
type Output = Result<C, St::Error>;
- #[project]
fn poll(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Self::Output> {
- #[project]
- let TryCollect { mut stream, items } = self.project();
+ let mut this = self.project();
Poll::Ready(Ok(loop {
- match ready!(stream.as_mut().try_poll_next(cx)?) {
- Some(x) => items.extend(Some(x)),
- None => break mem::replace(items, Default::default()),
+ match ready!(this.stream.as_mut().try_poll_next(cx)?) {
+ Some(x) => this.items.extend(Some(x)),
+ None => break mem::replace(this.items, Default::default()),
}
}))
}
diff --git a/src/stream/try_stream/try_concat.rs b/src/stream/try_stream/try_concat.rs
index 8c9710b..235a2c5 100644
--- a/src/stream/try_stream/try_concat.rs
+++ b/src/stream/try_stream/try_concat.rs
@@ -2,7 +2,7 @@ use core::pin::Pin;
use futures_core::future::Future;
use futures_core::stream::TryStream;
use futures_core::task::{Context, Poll};
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
/// Future for the [`try_concat`](super::TryStreamExt::try_concat) method.
#[pin_project]
@@ -34,19 +34,18 @@ where
{
type Output = Result<St::Ok, St::Error>;
- #[project]
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- #[project]
- let TryConcat { mut stream, accum } = self.project();
+ let mut this = self.project();
+
Poll::Ready(Ok(loop {
- if let Some(x) = ready!(stream.as_mut().try_poll_next(cx)?) {
- if let Some(a) = accum {
+ if let Some(x) = ready!(this.stream.as_mut().try_poll_next(cx)?) {
+ if let Some(a) = this.accum {
a.extend(x)
} else {
- *accum = Some(x)
+ *this.accum = Some(x)
}
} else {
- break accum.take().unwrap_or_default();
+ break this.accum.take().unwrap_or_default();
}
}))
}
diff --git a/src/stream/try_stream/try_filter.rs b/src/stream/try_stream/try_filter.rs
index 310f991..38d060c 100644
--- a/src/stream/try_stream/try_filter.rs
+++ b/src/stream/try_stream/try_filter.rs
@@ -5,7 +5,7 @@ use futures_core::stream::{Stream, TryStream, FusedStream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
/// Stream for the [`try_filter`](super::TryStreamExt::try_filter)
/// method.
@@ -69,24 +69,23 @@ impl<St, Fut, F> Stream for TryFilter<St, Fut, F>
{
type Item = Result<St::Ok, St::Error>;
- #[project]
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
- ) -> Poll<Option<Result<St::Ok, St::Error>>> {
- #[project]
- let TryFilter { mut stream, f, mut pending_fut, pending_item } = self.project();
+ ) -> Poll<Option<Self::Item>> {
+ let mut this = self.project();
+
Poll::Ready(loop {
- if let Some(fut) = pending_fut.as_mut().as_pin_mut() {
+ if let Some(fut) = this.pending_fut.as_mut().as_pin_mut() {
let res = ready!(fut.poll(cx));
- pending_fut.set(None);
+ this.pending_fut.set(None);
if res {
- break pending_item.take().map(Ok);
+ break this.pending_item.take().map(Ok);
}
- *pending_item = None;
- } else if let Some(item) = ready!(stream.as_mut().try_poll_next(cx)?) {
- pending_fut.set(Some(f(&item)));
- *pending_item = Some(item);
+ *this.pending_item = None;
+ } else if let Some(item) = ready!(this.stream.as_mut().try_poll_next(cx)?) {
+ this.pending_fut.set(Some((this.f)(&item)));
+ *this.pending_item = Some(item);
} else {
break None;
}
diff --git a/src/stream/try_stream/try_filter_map.rs b/src/stream/try_stream/try_filter_map.rs
index ba8e43a..6c1e48f 100644
--- a/src/stream/try_stream/try_filter_map.rs
+++ b/src/stream/try_stream/try_filter_map.rs
@@ -5,7 +5,7 @@ use futures_core::stream::{Stream, TryStream, FusedStream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
/// Stream for the [`try_filter_map`](super::TryStreamExt::try_filter_map)
/// method.
@@ -57,24 +57,23 @@ impl<St, Fut, F, T> Stream for TryFilterMap<St, Fut, F>
{
type Item = Result<T, St::Error>;
- #[project]
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
- ) -> Poll<Option<Result<T, St::Error>>> {
- #[project]
- let TryFilterMap { mut stream, f, mut pending } = self.project();
+ ) -> Poll<Option<Self::Item>> {
+ let mut this = self.project();
+
Poll::Ready(loop {
- if let Some(p) = pending.as_mut().as_pin_mut() {
+ if let Some(p) = this.pending.as_mut().as_pin_mut() {
// We have an item in progress, poll that until it's done
let item = ready!(p.try_poll(cx)?);
- pending.set(None);
+ this.pending.set(None);
if item.is_some() {
break item.map(Ok);
}
- } else if let Some(item) = ready!(stream.as_mut().try_poll_next(cx)?) {
+ } else if let Some(item) = ready!(this.stream.as_mut().try_poll_next(cx)?) {
// No item in progress, but the stream is still going
- pending.set(Some(f(item)));
+ this.pending.set(Some((this.f)(item)));
} else {
// The stream is done
break None;
diff --git a/src/stream/try_stream/try_flatten.rs b/src/stream/try_stream/try_flatten.rs
index a528639..5227903 100644
--- a/src/stream/try_stream/try_flatten.rs
+++ b/src/stream/try_stream/try_flatten.rs
@@ -3,7 +3,7 @@ use futures_core::stream::{FusedStream, Stream, TryStream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
/// Stream for the [`try_flatten`](super::TryStreamExt::try_flatten) method.
#[pin_project]
@@ -51,19 +51,18 @@ where
{
type Item = Result<<St::Ok as TryStream>::Ok, <St::Ok as TryStream>::Error>;
- #[project]
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
- #[project]
- let TryFlatten { mut stream, mut next } = self.project();
+ let mut this = self.project();
+
Poll::Ready(loop {
- if let Some(s) = next.as_mut().as_pin_mut() {
+ if let Some(s) = this.next.as_mut().as_pin_mut() {
if let Some(item) = ready!(s.try_poll_next(cx)?) {
break Some(Ok(item));
} else {
- next.set(None);
+ this.next.set(None);
}
- } else if let Some(s) = ready!(stream.as_mut().try_poll_next(cx)?) {
- next.set(Some(s));
+ } else if let Some(s) = ready!(this.stream.as_mut().try_poll_next(cx)?) {
+ this.next.set(Some(s));
} else {
break None;
}
diff --git a/src/stream/try_stream/try_fold.rs b/src/stream/try_stream/try_fold.rs
index d85c1fe..abeeea4 100644
--- a/src/stream/try_stream/try_fold.rs
+++ b/src/stream/try_stream/try_fold.rs
@@ -3,7 +3,7 @@ use core::pin::Pin;
use futures_core::future::{FusedFuture, Future, TryFuture};
use futures_core::stream::TryStream;
use futures_core::task::{Context, Poll};
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
/// Future for the [`try_fold`](super::TryStreamExt::try_fold) method.
#[pin_project]
@@ -64,25 +64,24 @@ impl<St, Fut, T, F> Future for TryFold<St, Fut, T, F>
{
type Output = Result<T, St::Error>;
- #[project]
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- #[project]
- let TryFold { mut stream, f, accum, mut future } = self.project();
+ let mut this = self.project();
+
Poll::Ready(loop {
- if let Some(fut) = future.as_mut().as_pin_mut() {
+ if let Some(fut) = this.future.as_mut().as_pin_mut() {
// we're currently processing a future to produce a new accum value
let res = ready!(fut.try_poll(cx));
- future.set(None);
+ this.future.set(None);
match res {
- Ok(a) => *accum = Some(a),
+ Ok(a) => *this.accum = Some(a),
Err(e) => break Err(e),
}
- } else if accum.is_some() {
+ } else if this.accum.is_some() {
// we're waiting on a new item from the stream
- let res = ready!(stream.as_mut().try_poll_next(cx));
- let a = accum.take().unwrap();
+ let res = ready!(this.stream.as_mut().try_poll_next(cx));
+ let a = this.accum.take().unwrap();
match res {
- Some(Ok(item)) => future.set(Some(f(a, item))),
+ Some(Ok(item)) => this.future.set(Some((this.f)(a, item))),
Some(Err(e)) => break Err(e),
None => break Ok(a),
}
diff --git a/src/stream/try_stream/try_for_each.rs b/src/stream/try_stream/try_for_each.rs
index 5fc91df..a00e911 100644
--- a/src/stream/try_stream/try_for_each.rs
+++ b/src/stream/try_stream/try_for_each.rs
@@ -3,7 +3,7 @@ use core::pin::Pin;
use futures_core::future::{Future, TryFuture};
use futures_core::stream::TryStream;
use futures_core::task::{Context, Poll};
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
/// Future for the [`try_for_each`](super::TryStreamExt::try_for_each) method.
#[pin_project]
@@ -50,17 +50,15 @@ impl<St, Fut, F> Future for TryForEach<St, Fut, F>
{
type Output = Result<(), St::Error>;
- #[project]
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- #[project]
- let TryForEach { mut stream, f, mut future } = self.project();
+ let mut this = self.project();
loop {
- if let Some(fut) = future.as_mut().as_pin_mut() {
+ if let Some(fut) = this.future.as_mut().as_pin_mut() {
ready!(fut.try_poll(cx))?;
- future.set(None);
+ this.future.set(None);
} else {
- match ready!(stream.as_mut().try_poll_next(cx)?) {
- Some(e) => future.set(Some(f(e))),
+ match ready!(this.stream.as_mut().try_poll_next(cx)?) {
+ Some(e) => this.future.set(Some((this.f)(e))),
None => break,
}
}
diff --git a/src/stream/try_stream/try_for_each_concurrent.rs b/src/stream/try_stream/try_for_each_concurrent.rs
index 87fd465..db8e505 100644
--- a/src/stream/try_stream/try_for_each_concurrent.rs
+++ b/src/stream/try_stream/try_for_each_concurrent.rs
@@ -6,7 +6,7 @@ use core::num::NonZeroUsize;
use futures_core::future::{FusedFuture, Future};
use futures_core::stream::TryStream;
use futures_core::task::{Context, Poll};
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
/// Future for the
/// [`try_for_each_concurrent`](super::TryStreamExt::try_for_each_concurrent)
@@ -68,16 +68,14 @@ impl<St, Fut, F> Future for TryForEachConcurrent<St, Fut, F>
{
type Output = Result<(), St::Error>;
- #[project]
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- #[project]
- let TryForEachConcurrent { mut stream, f, futures, limit } = self.project();
+ let mut this = self.project();
loop {
let mut made_progress_this_iter = false;
// Check if we've already created a number of futures greater than `limit`
- if limit.map(|limit| limit.get() > futures.len()).unwrap_or(true) {
- let poll_res = match stream.as_mut().as_pin_mut() {
+ if this.limit.map(|limit| limit.get() > this.futures.len()).unwrap_or(true) {
+ let poll_res = match this.stream.as_mut().as_pin_mut() {
Some(stream) => stream.try_poll_next(cx),
None => Poll::Ready(None),
};
@@ -88,28 +86,28 @@ impl<St, Fut, F> Future for TryForEachConcurrent<St, Fut, F>
Some(elem)
},
Poll::Ready(None) => {
- stream.set(None);
+ this.stream.set(None);
None
}
Poll::Pending => None,
Poll::Ready(Some(Err(e))) => {
// Empty the stream and futures so that we know
// the future has completed.
- stream.set(None);
- drop(mem::replace(futures, FuturesUnordered::new()));
+ this.stream.set(None);
+ drop(mem::replace(this.futures, FuturesUnordered::new()));
return Poll::Ready(Err(e));
}
};
if let Some(elem) = elem {
- futures.push(f(elem));
+ this.futures.push((this.f)(elem));
}
}
- match futures.poll_next_unpin(cx) {
+ match this.futures.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(()))) => made_progress_this_iter = true,
Poll::Ready(None) => {
- if stream.is_none() {
+ if this.stream.is_none() {
return Poll::Ready(Ok(()))
}
},
@@ -117,8 +115,8 @@ impl<St, Fut, F> Future for TryForEachConcurrent<St, Fut, F>
Poll::Ready(Some(Err(e))) => {
// Empty the stream and futures so that we know
// the future has completed.
- stream.set(None);
- drop(mem::replace(futures, FuturesUnordered::new()));
+ this.stream.set(None);
+ drop(mem::replace(this.futures, FuturesUnordered::new()));
return Poll::Ready(Err(e));
}
}
diff --git a/src/stream/try_stream/try_skip_while.rs b/src/stream/try_stream/try_skip_while.rs
index 624380f..35759d0 100644
--- a/src/stream/try_stream/try_skip_while.rs
+++ b/src/stream/try_stream/try_skip_while.rs
@@ -5,7 +5,7 @@ use futures_core::stream::{Stream, TryStream, FusedStream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
/// Stream for the [`try_skip_while`](super::TryStreamExt::try_skip_while)
/// method.
@@ -62,30 +62,28 @@ impl<St, Fut, F> Stream for TrySkipWhile<St, Fut, F>
{
type Item = Result<St::Ok, St::Error>;
- #[project]
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
- #[project]
- let TrySkipWhile { mut stream, f, mut pending_fut, pending_item, done_skipping } = self.project();
+ let mut this = self.project();
- if *done_skipping {
- return stream.try_poll_next(cx);
+ if *this.done_skipping {
+ return this.stream.try_poll_next(cx);
}
Poll::Ready(loop {
- if let Some(fut) = pending_fut.as_mut().as_pin_mut() {
+ if let Some(fut) = this.pending_fut.as_mut().as_pin_mut() {
let skipped = ready!(fut.try_poll(cx)?);
- let item = pending_item.take();
- pending_fut.set(None);
+ let item = this.pending_item.take();
+ this.pending_fut.set(None);
if !skipped {
- *done_skipping = true;
+ *this.done_skipping = true;
break item.map(Ok);
}
- } else if let Some(item) = ready!(stream.as_mut().try_poll_next(cx)?) {
- pending_fut.set(Some(f(&item)));
- *pending_item = Some(item);
+ } else if let Some(item) = ready!(this.stream.as_mut().try_poll_next(cx)?) {
+ this.pending_fut.set(Some((this.f)(&item)));
+ *this.pending_item = Some(item);
} else {
break None;
}
diff --git a/src/stream/try_stream/try_take_while.rs b/src/stream/try_stream/try_take_while.rs
new file mode 100644
index 0000000..16bfb20
--- /dev/null
+++ b/src/stream/try_stream/try_take_while.rs
@@ -0,0 +1,132 @@
+use core::fmt;
+use core::pin::Pin;
+use futures_core::future::TryFuture;
+use futures_core::stream::{FusedStream, Stream, TryStream};
+use futures_core::task::{Context, Poll};
+#[cfg(feature = "sink")]
+use futures_sink::Sink;
+use pin_project::pin_project;
+
+/// Stream for the [`try_take_while`](super::TryStreamExt::try_take_while)
+/// method.
+#[pin_project]
+#[must_use = "streams do nothing unless polled"]
+pub struct TryTakeWhile<St, Fut, F>
+where
+ St: TryStream,
+{
+ #[pin]
+ stream: St,
+ f: F,
+ #[pin]
+ pending_fut: Option<Fut>,
+ pending_item: Option<St::Ok>,
+ done_taking: bool,
+}
+
+impl<St, Fut, F> fmt::Debug for TryTakeWhile<St, Fut, F>
+where
+ St: TryStream + fmt::Debug,
+ St::Ok: fmt::Debug,
+ Fut: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("TryTakeWhile")
+ .field("stream", &self.stream)
+ .field("pending_fut", &self.pending_fut)
+ .field("pending_item", &self.pending_item)
+ .field("done_taking", &self.done_taking)
+ .finish()
+ }
+}
+
+impl<St, Fut, F> TryTakeWhile<St, Fut, F>
+where
+ St: TryStream,
+ F: FnMut(&St::Ok) -> Fut,
+ Fut: TryFuture<Ok = bool, Error = St::Error>,
+{
+ pub(super) fn new(stream: St, f: F) -> TryTakeWhile<St, Fut, F> {
+ TryTakeWhile {
+ stream,
+ f,
+ pending_fut: None,
+ pending_item: None,
+ done_taking: false,
+ }
+ }
+
+ delegate_access_inner!(stream, St, ());
+}
+
+impl<St, Fut, F> Stream for TryTakeWhile<St, Fut, F>
+where
+ St: TryStream,
+ F: FnMut(&St::Ok) -> Fut,
+ Fut: TryFuture<Ok = bool, Error = St::Error>,
+{
+ type Item = Result<St::Ok, St::Error>;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let mut this = self.project();
+
+ if *this.done_taking {
+ return Poll::Ready(None);
+ }
+
+ Poll::Ready(loop {
+ if let Some(fut) = this.pending_fut.as_mut().as_pin_mut() {
+ let take = ready!(fut.try_poll(cx)?);
+ let item = this.pending_item.take();
+ this.pending_fut.set(None);
+ if take {
+ break item.map(Ok);
+ } else {
+ *this.done_taking = true;
+ break None;
+ }
+ } else if let Some(item) = ready!(this.stream.as_mut().try_poll_next(cx)?) {
+ this.pending_fut.set(Some((this.f)(&item)));
+ *this.pending_item = Some(item);
+ } else {
+ break None;
+ }
+ })
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ if self.done_taking {
+ return (0, Some(0));
+ }
+
+ let pending_len = if self.pending_item.is_some() { 1 } else { 0 };
+ let (_, upper) = self.stream.size_hint();
+ let upper = match upper {
+ Some(x) => x.checked_add(pending_len),
+ None => None,
+ };
+ (0, upper) // can't know a lower bound, due to the predicate
+ }
+}
+
+impl<St, Fut, F> FusedStream for TryTakeWhile<St, Fut, F>
+where
+ St: TryStream + FusedStream,
+ F: FnMut(&St::Ok) -> Fut,
+ Fut: TryFuture<Ok = bool, Error = St::Error>,
+{
+ fn is_terminated(&self) -> bool {
+ self.done_taking || self.pending_item.is_none() && self.stream.is_terminated()
+ }
+}
+
+// Forwarding impl of Sink from the underlying stream
+#[cfg(feature = "sink")]
+impl<S, Fut, F, Item, E> Sink<Item> for TryTakeWhile<S, Fut, F>
+where
+ S: TryStream + Sink<Item, Error = E>,
+{
+ type Error = E;
+
+ delegate_sink!(stream, Item);
+}
diff --git a/src/stream/try_stream/try_unfold.rs b/src/stream/try_stream/try_unfold.rs
index 8da1248..a6b31fe 100644
--- a/src/stream/try_stream/try_unfold.rs
+++ b/src/stream/try_stream/try_unfold.rs
@@ -3,7 +3,7 @@ use core::pin::Pin;
use futures_core::future::TryFuture;
use futures_core::stream::Stream;
use futures_core::task::{Context, Poll};
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
/// Creates a `TryStream` from a seed and a closure returning a `TryFuture`.
///
@@ -96,30 +96,28 @@ where
{
type Item = Result<Item, Fut::Error>;
- #[project]
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
- ) -> Poll<Option<Result<Item, Fut::Error>>> {
- #[project]
- let TryUnfold {f, state, mut fut } = self.project();
+ ) -> Poll<Option<Self::Item>> {
+ let mut this = self.project();
- if let Some(state) = state.take() {
- fut.set(Some(f(state)));
+ if let Some(state) = this.state.take() {
+ this.fut.set(Some((this.f)(state)));
}
- match fut.as_mut().as_pin_mut() {
+ match this.fut.as_mut().as_pin_mut() {
None => {
// The future previously errored
Poll::Ready(None)
}
Some(future) => {
let step = ready!(future.try_poll(cx));
- fut.set(None);
+ this.fut.set(None);
match step {
Ok(Some((item, next_state))) => {
- *state = Some(next_state);
+ *this.state = Some(next_state);
Poll::Ready(Some(Ok(item)))
}
Ok(None) => Poll::Ready(None),
diff --git a/src/stream/unfold.rs b/src/stream/unfold.rs
index 0279571..b6f8eae 100644
--- a/src/stream/unfold.rs
+++ b/src/stream/unfold.rs
@@ -3,7 +3,7 @@ use core::pin::Pin;
use futures_core::future::Future;
use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
/// Creates a `Stream` from a seed and a closure returning a `Future`.
///
@@ -93,24 +93,22 @@ impl<T, F, Fut, Item> Stream for Unfold<T, F, Fut>
{
type Item = Item;
- #[project]
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
- #[project]
- let Unfold { state, f, mut fut } = self.project();
+ let mut this = self.project();
- if let Some(state) = state.take() {
- fut.set(Some(f(state)));
+ if let Some(state) = this.state.take() {
+ this.fut.set(Some((this.f)(state)));
}
- let step = ready!(fut.as_mut().as_pin_mut()
+ let step = ready!(this.fut.as_mut().as_pin_mut()
.expect("Unfold must not be polled after it returned `Poll::Ready(None)`").poll(cx));
- fut.set(None);
+ this.fut.set(None);
if let Some((item, next_state)) = step {
- *state = Some(next_state);
+ *this.state = Some(next_state);
Poll::Ready(Some(item))
} else {
Poll::Ready(None)
diff --git a/src/task/spawn.rs b/src/task/spawn.rs
index f34445a..f877923 100644
--- a/src/task/spawn.rs
+++ b/src/task/spawn.rs
@@ -69,6 +69,7 @@ pub trait SpawnExt: Spawn {
/// assert_eq!(block_on(join_handle_fut), 1);
/// ```
#[cfg(feature = "channel")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "channel")))]
#[cfg(feature = "std")]
fn spawn_with_handle<Fut>(&self, future: Fut) -> Result<RemoteHandle<Fut::Output>, SpawnError>
where
@@ -83,6 +84,7 @@ pub trait SpawnExt: Spawn {
/// Wraps a [`Spawn`] and makes it usable as a futures 0.1 `Executor`.
/// Requires the `compat` feature to enable.
#[cfg(feature = "compat")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "compat")))]
fn compat(self) -> Compat<Self>
where
Self: Sized,
@@ -145,6 +147,7 @@ pub trait LocalSpawnExt: LocalSpawn {
/// assert_eq!(executor.run_until(join_handle_fut), 1);
/// ```
#[cfg(feature = "channel")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "channel")))]
#[cfg(feature = "std")]
fn spawn_local_with_handle<Fut>(
&self,