diff options
author | Chih-Hung Hsieh <chh@google.com> | 2020-10-25 23:16:22 -0700 |
---|---|---|
committer | Chih-Hung Hsieh <chh@google.com> | 2020-10-25 23:16:22 -0700 |
commit | d45e96efa48a28ad7d831aa7da55f4b6fb9aa22d (patch) | |
tree | aed583542acdae126b0bf494e7a48c8bdbad44cd /src | |
parent | 10466ab5740c4fd628db8b98d4150f4867d09d38 (diff) | |
download | futures-util-d45e96efa48a28ad7d831aa7da55f4b6fb9aa22d.tar.gz |
Upgrade rust/crates/futures-util to 0.3.7
Test: make all rust crates
Change-Id: I9af426348ba630949d7f3facc0717f439cfe5a7e
Diffstat (limited to 'src')
96 files changed, 1119 insertions, 962 deletions
diff --git a/src/async_await/join_mod.rs b/src/async_await/join_mod.rs index 909cd3b..4200c08 100644 --- a/src/async_await/join_mod.rs +++ b/src/async_await/join_mod.rs @@ -22,8 +22,13 @@ macro_rules! document_join_macro { /// /// let a = async { 1 }; /// let b = async { 2 }; - /// /// assert_eq!(join!(a, b), (1, 2)); + /// + /// // `join!` is variadic, so you can pass any number of futures + /// let c = async { 3 }; + /// let d = async { 4 }; + /// let e = async { 5 }; + /// assert_eq!(join!(c, d, e), (3, 4, 5)); /// # }); /// ``` $join @@ -48,9 +53,14 @@ macro_rules! document_join_macro { /// use futures::try_join; /// /// let a = async { Ok::<i32, i32>(1) }; - /// let b = async { Ok::<u64, i32>(2) }; - /// + /// let b = async { Ok::<i32, i32>(2) }; /// assert_eq!(try_join!(a, b), Ok((1, 2))); + /// + /// // `try_join!` is variadic, so you can pass any number of futures + /// let c = async { Ok::<i32, i32>(3) }; + /// let d = async { Ok::<i32, i32>(4) }; + /// let e = async { Ok::<i32, i32>(5) }; + /// assert_eq!(try_join!(c, d, e), Ok((3, 4, 5))); /// # }); /// ``` /// @@ -83,7 +93,7 @@ document_join_macro! { #[macro_export] macro_rules! join { ($($tokens:tt)*) => {{ - use $crate::__reexport as __futures_crate; + use $crate::__private as __futures_crate; $crate::join_internal! { $( $tokens )* } @@ -93,7 +103,7 @@ document_join_macro! { #[macro_export] macro_rules! try_join { ($($tokens:tt)*) => {{ - use $crate::__reexport as __futures_crate; + use $crate::__private as __futures_crate; $crate::try_join_internal! { $( $tokens )* } diff --git a/src/async_await/mod.rs b/src/async_await/mod.rs index 69cae13..bdaed95 100644 --- a/src/async_await/mod.rs +++ b/src/async_await/mod.rs @@ -3,37 +3,37 @@ //! This module contains a number of functions and combinators for working //! with `async`/`await` code. -use futures_core::future::Future; -use futures_core::stream::Stream; - -#[doc(hidden)] -pub use futures_core::future::FusedFuture; -#[doc(hidden)] -pub use futures_core::stream::FusedStream; +use futures_core::future::{Future, FusedFuture}; +use futures_core::stream::{Stream, FusedStream}; #[macro_use] mod poll; +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/64762 pub use self::poll::*; #[macro_use] mod pending; +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/64762 pub use self::pending::*; // Primary export is a macro #[cfg(feature = "async-await-macro")] mod join_mod; +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/64762 #[cfg(feature = "async-await-macro")] pub use self::join_mod::*; // Primary export is a macro #[cfg(feature = "async-await-macro")] mod select_mod; +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/64762 #[cfg(feature = "async-await-macro")] pub use self::select_mod::*; #[cfg(feature = "std")] #[cfg(feature = "async-await-macro")] mod random; +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/64762 #[cfg(feature = "std")] #[cfg(feature = "async-await-macro")] pub use self::random::*; diff --git a/src/async_await/pending.rs b/src/async_await/pending.rs index b143869..e0cf341 100644 --- a/src/async_await/pending.rs +++ b/src/async_await/pending.rs @@ -15,7 +15,7 @@ use futures_core::task::{Context, Poll}; #[macro_export] macro_rules! pending { () => { - $crate::async_await::pending_once().await + $crate::__private::async_await::pending_once().await } } diff --git a/src/async_await/poll.rs b/src/async_await/poll.rs index dffa94b..ac70a53 100644 --- a/src/async_await/poll.rs +++ b/src/async_await/poll.rs @@ -12,7 +12,7 @@ use futures_core::task::{Context, Poll}; #[macro_export] macro_rules! poll { ($x:expr $(,)?) => { - $crate::async_await::poll($x).await + $crate::__private::async_await::poll($x).await } } diff --git a/src/async_await/select_mod.rs b/src/async_await/select_mod.rs index 0471f09..47eca4d 100644 --- a/src/async_await/select_mod.rs +++ b/src/async_await/select_mod.rs @@ -14,7 +14,7 @@ macro_rules! document_select_macro { /// (e.g. an `async fn` call) instead of a `Future` by name the `Unpin` /// requirement is relaxed, since the macro will pin the resulting `Future` /// on the stack. However the `Future` returned by the expression must - /// still implement `FusedFuture`. This difference is presented + /// still implement `FusedFuture`. /// /// Futures and streams which are not already fused can be fused using the /// `.fuse()` method. Note, though, that fusing a future or stream directly @@ -84,7 +84,7 @@ macro_rules! document_select_macro { /// a_res = async_identity_fn(62).fuse() => a_res + 1, /// b_res = async_identity_fn(13).fuse() => b_res, /// }; - /// assert!(res == 63 || res == 12); + /// assert!(res == 63 || res == 13); /// # }); /// ``` /// @@ -167,7 +167,7 @@ macro_rules! document_select_macro { /// (e.g. an `async fn` call) instead of a `Future` by name the `Unpin` /// requirement is relaxed, since the macro will pin the resulting `Future` /// on the stack. However the `Future` returned by the expression must - /// still implement `FusedFuture`. This difference is presented + /// still implement `FusedFuture`. /// /// Futures and streams which are not already fused can be fused using the /// `.fuse()` method. Note, though, that fusing a future or stream directly @@ -322,7 +322,7 @@ document_select_macro! { #[macro_export] macro_rules! select { ($($tokens:tt)*) => {{ - use $crate::__reexport as __futures_crate; + use $crate::__private as __futures_crate; $crate::select_internal! { $( $tokens )* } @@ -332,7 +332,7 @@ document_select_macro! { #[macro_export] macro_rules! select_biased { ($($tokens:tt)*) => {{ - use $crate::__reexport as __futures_crate; + use $crate::__private as __futures_crate; $crate::select_biased_internal! { $( $tokens )* } diff --git a/src/compat/compat01as03.rs b/src/compat/compat01as03.rs index 562a7ad..95025d2 100644 --- a/src/compat/compat01as03.rs +++ b/src/compat/compat01as03.rs @@ -15,6 +15,7 @@ use std::task::Context; use futures_sink::Sink as Sink03; #[cfg(feature = "io-compat")] +#[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))] #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use io::{AsyncRead01CompatExt, AsyncWrite01CompatExt}; @@ -115,6 +116,7 @@ impl<St: Stream01> Stream01CompatExt for St {} /// Extension trait for futures 0.1 [`Sink`](futures_01::sink::Sink) #[cfg(feature = "sink")] +#[cfg_attr(docsrs, doc(cfg(feature = "sink")))] pub trait Sink01CompatExt: Sink01 { /// Converts a futures 0.1 /// [`Sink<SinkItem = T, SinkError = E>`](futures_01::sink::Sink) @@ -180,6 +182,7 @@ impl<St: Stream01> Stream03 for Compat01As03<St> { /// Converts a futures 0.1 Sink object to a futures 0.3-compatible version #[cfg(feature = "sink")] +#[cfg_attr(docsrs, doc(cfg(feature = "sink")))] #[derive(Debug)] #[must_use = "sinks do nothing unless polled"] pub struct Compat01As03Sink<S, SinkItem> { @@ -366,6 +369,7 @@ unsafe impl UnsafeNotify01 for NotifyWaker { } #[cfg(feature = "io-compat")] +#[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))] mod io { use super::*; #[cfg(feature = "read-initializer")] @@ -375,6 +379,7 @@ mod io { use tokio_io::{AsyncRead as AsyncRead01, AsyncWrite as AsyncWrite01}; /// Extension trait for tokio-io [`AsyncRead`](tokio_io::AsyncRead) + #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))] pub trait AsyncRead01CompatExt: AsyncRead01 { /// Converts a tokio-io [`AsyncRead`](tokio_io::AsyncRead) into a futures-io 0.3 /// [`AsyncRead`](futures_io::AsyncRead). @@ -403,6 +408,7 @@ mod io { impl<R: AsyncRead01> AsyncRead01CompatExt for R {} /// Extension trait for tokio-io [`AsyncWrite`](tokio_io::AsyncWrite) + #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))] pub trait AsyncWrite01CompatExt: AsyncWrite01 { /// Converts a tokio-io [`AsyncWrite`](tokio_io::AsyncWrite) into a futures-io 0.3 /// [`AsyncWrite`](futures_io::AsyncWrite). diff --git a/src/compat/compat03as01.rs b/src/compat/compat03as01.rs index 3fd2ae0..4841c5e 100644 --- a/src/compat/compat03as01.rs +++ b/src/compat/compat03as01.rs @@ -40,6 +40,7 @@ pub struct Compat<T> { /// Converts a futures 0.3 [`Sink`](futures_sink::Sink) into a futures 0.1 /// [`Sink`](futures_01::sink::Sink). #[cfg(feature = "sink")] +#[cfg_attr(docsrs, doc(cfg(feature = "sink")))] #[derive(Debug)] #[must_use = "sinks do nothing unless polled"] pub struct CompatSink<T, Item> { @@ -236,6 +237,7 @@ where } #[cfg(feature = "io-compat")] +#[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))] mod io { use super::*; use futures_io::{AsyncRead as AsyncRead03, AsyncWrite as AsyncWrite03}; diff --git a/src/compat/mod.rs b/src/compat/mod.rs index 1826836..897a50a 100644 --- a/src/compat/mod.rs +++ b/src/compat/mod.rs @@ -9,11 +9,14 @@ pub use self::executor::{Executor01CompatExt, Executor01Future, Executor01As03}; mod compat01as03; pub use self::compat01as03::{Compat01As03, Future01CompatExt, Stream01CompatExt}; #[cfg(feature = "sink")] +#[cfg_attr(docsrs, doc(cfg(feature = "sink")))] pub use self::compat01as03::{Compat01As03Sink, Sink01CompatExt}; #[cfg(feature = "io-compat")] +#[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))] pub use self::compat01as03::{AsyncRead01CompatExt, AsyncWrite01CompatExt}; mod compat03as01; pub use self::compat03as01::Compat; #[cfg(feature = "sink")] +#[cfg_attr(docsrs, doc(cfg(feature = "sink")))] pub use self::compat03as01::CompatSink; @@ -140,6 +140,7 @@ trivial_fn_impls!(merge_result_fn <> MergeResultFn = "merge_result"); #[derive(Debug, Copy, Clone, Default)] pub struct InspectFn<F>(F); +#[allow(single_use_lifetimes)] // https://github.com/rust-lang/rust/issues/55058 impl<F, A> FnOnce1<A> for InspectFn<F> where F: for<'a> FnOnce1<&'a A, Output=()>, @@ -150,6 +151,7 @@ where arg } } +#[allow(single_use_lifetimes)] // https://github.com/rust-lang/rust/issues/55058 impl<F, A> FnMut1<A> for InspectFn<F> where F: for<'a> FnMut1<&'a A, Output=()>, @@ -159,6 +161,7 @@ where arg } } +#[allow(single_use_lifetimes)] // https://github.com/rust-lang/rust/issues/55058 impl<F, A> Fn1<A> for InspectFn<F> where F: for<'a> Fn1<&'a A, Output=()>, diff --git a/src/future/either.rs b/src/future/either.rs index be28829..aa17fa7 100644 --- a/src/future/either.rs +++ b/src/future/either.rs @@ -4,11 +4,11 @@ use futures_core::future::{FusedFuture, Future}; use futures_core::stream::{FusedStream, Stream}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_project::{pin_project, project}; +use pin_project::pin_project; /// Combines two different futures, streams, or sinks having the same associated types into a single /// type. -#[pin_project] +#[pin_project(project = EitherProj)] #[derive(Debug, Clone)] pub enum Either<A, B> { /// First branch of the type @@ -58,12 +58,10 @@ where { type Output = A::Output; - #[project] - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<A::Output> { - #[project] + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { match self.project() { - Either::Left(x) => x.poll(cx), - Either::Right(x) => x.poll(cx), + EitherProj::Left(x) => x.poll(cx), + EitherProj::Right(x) => x.poll(cx), } } } @@ -88,12 +86,10 @@ where { type Item = A::Item; - #[project] - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<A::Item>> { - #[project] + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { match self.project() { - Either::Left(x) => x.poll_next(cx), - Either::Right(x) => x.poll_next(cx), + EitherProj::Left(x) => x.poll_next(cx), + EitherProj::Right(x) => x.poll_next(cx), } } } @@ -119,39 +115,31 @@ where { type Error = A::Error; - #[project] fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - #[project] match self.project() { - Either::Left(x) => x.poll_ready(cx), - Either::Right(x) => x.poll_ready(cx), + EitherProj::Left(x) => x.poll_ready(cx), + EitherProj::Right(x) => x.poll_ready(cx), } } - #[project] fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> { - #[project] match self.project() { - Either::Left(x) => x.start_send(item), - Either::Right(x) => x.start_send(item), + EitherProj::Left(x) => x.start_send(item), + EitherProj::Right(x) => x.start_send(item), } } - #[project] fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - #[project] match self.project() { - Either::Left(x) => x.poll_flush(cx), - Either::Right(x) => x.poll_flush(cx), + EitherProj::Left(x) => x.poll_flush(cx), + EitherProj::Right(x) => x.poll_flush(cx), } } - #[project] fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - #[project] match self.project() { - Either::Left(x) => x.poll_close(cx), - Either::Right(x) => x.poll_close(cx), + EitherProj::Left(x) => x.poll_close(cx), + EitherProj::Right(x) => x.poll_close(cx), } } } @@ -182,29 +170,25 @@ mod if_std { } } - #[project] fn poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<Result<usize>> { - #[project] match self.project() { - Either::Left(x) => x.poll_read(cx, buf), - Either::Right(x) => x.poll_read(cx, buf), + EitherProj::Left(x) => x.poll_read(cx, buf), + EitherProj::Right(x) => x.poll_read(cx, buf), } } - #[project] fn poll_read_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &mut [IoSliceMut<'_>], ) -> Poll<Result<usize>> { - #[project] match self.project() { - Either::Left(x) => x.poll_read_vectored(cx, bufs), - Either::Right(x) => x.poll_read_vectored(cx, bufs), + EitherProj::Left(x) => x.poll_read_vectored(cx, bufs), + EitherProj::Right(x) => x.poll_read_vectored(cx, bufs), } } } @@ -214,47 +198,39 @@ mod if_std { A: AsyncWrite, B: AsyncWrite, { - #[project] fn poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<Result<usize>> { - #[project] match self.project() { - Either::Left(x) => x.poll_write(cx, buf), - Either::Right(x) => x.poll_write(cx, buf), + EitherProj::Left(x) => x.poll_write(cx, buf), + EitherProj::Right(x) => x.poll_write(cx, buf), } } - #[project] fn poll_write_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[IoSlice<'_>], ) -> Poll<Result<usize>> { - #[project] match self.project() { - Either::Left(x) => x.poll_write_vectored(cx, bufs), - Either::Right(x) => x.poll_write_vectored(cx, bufs), + EitherProj::Left(x) => x.poll_write_vectored(cx, bufs), + EitherProj::Right(x) => x.poll_write_vectored(cx, bufs), } } - #[project] fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { - #[project] match self.project() { - Either::Left(x) => x.poll_flush(cx), - Either::Right(x) => x.poll_flush(cx), + EitherProj::Left(x) => x.poll_flush(cx), + EitherProj::Right(x) => x.poll_flush(cx), } } - #[project] fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { - #[project] match self.project() { - Either::Left(x) => x.poll_close(cx), - Either::Right(x) => x.poll_close(cx), + EitherProj::Left(x) => x.poll_close(cx), + EitherProj::Right(x) => x.poll_close(cx), } } } @@ -264,16 +240,14 @@ mod if_std { A: AsyncSeek, B: AsyncSeek, { - #[project] fn poll_seek( self: Pin<&mut Self>, cx: &mut Context<'_>, pos: SeekFrom, ) -> Poll<Result<u64>> { - #[project] match self.project() { - Either::Left(x) => x.poll_seek(cx, pos), - Either::Right(x) => x.poll_seek(cx, pos), + EitherProj::Left(x) => x.poll_seek(cx, pos), + EitherProj::Right(x) => x.poll_seek(cx, pos), } } } @@ -283,24 +257,17 @@ mod if_std { A: AsyncBufRead, B: AsyncBufRead, { - #[project] - fn poll_fill_buf( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Result<&[u8]>> { - #[project] + fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> { match self.project() { - Either::Left(x) => x.poll_fill_buf(cx), - Either::Right(x) => x.poll_fill_buf(cx), + EitherProj::Left(x) => x.poll_fill_buf(cx), + EitherProj::Right(x) => x.poll_fill_buf(cx), } } - #[project] fn consume(self: Pin<&mut Self>, amt: usize) { - #[project] match self.project() { - Either::Left(x) => x.consume(amt), - Either::Right(x) => x.consume(amt), + EitherProj::Left(x) => x.consume(amt), + EitherProj::Right(x) => x.consume(amt), } } } diff --git a/src/future/future/flatten.rs b/src/future/future/flatten.rs index f59464c..53f75e2 100644 --- a/src/future/future/flatten.rs +++ b/src/future/future/flatten.rs @@ -4,9 +4,9 @@ use futures_core::stream::{FusedStream, Stream}; #[cfg(feature = "sink")] use futures_sink::Sink; use futures_core::task::{Context, Poll}; -use pin_project::{pin_project, project}; +use pin_project::pin_project; -#[pin_project] +#[pin_project(project = FlattenProj)] #[derive(Debug)] pub enum Flatten<Fut1, Fut2> { First(#[pin] Fut1), @@ -38,21 +38,19 @@ impl<Fut> Future for Flatten<Fut, Fut::Output> { type Output = <Fut::Output as Future>::Output; - #[project] fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { Poll::Ready(loop { - #[project] match self.as_mut().project() { - Flatten::First(f) => { + FlattenProj::First(f) => { let f = ready!(f.poll(cx)); self.set(Flatten::Second(f)); }, - Flatten::Second(f) => { + FlattenProj::Second(f) => { let output = ready!(f.poll(cx)); self.set(Flatten::Empty); break output; }, - Flatten::Empty => panic!("Flatten polled after completion"), + FlattenProj::Empty => panic!("Flatten polled after completion"), } }) } @@ -76,23 +74,21 @@ impl<Fut> Stream for Flatten<Fut, Fut::Output> { type Item = <Fut::Output as Stream>::Item; - #[project] fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { Poll::Ready(loop { - #[project] match self.as_mut().project() { - Flatten::First(f) => { + FlattenProj::First(f) => { let f = ready!(f.poll(cx)); self.set(Flatten::Second(f)); }, - Flatten::Second(f) => { + FlattenProj::Second(f) => { let output = ready!(f.poll_next(cx)); if output.is_none() { self.set(Flatten::Empty); } break output; }, - Flatten::Empty => break None, + FlattenProj::Empty => break None, } }) } @@ -107,54 +103,46 @@ where { type Error = <Fut::Output as Sink<Item>>::Error; - #[project] fn poll_ready( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Result<(), Self::Error>> { Poll::Ready(loop { - #[project] match self.as_mut().project() { - Flatten::First(f) => { + FlattenProj::First(f) => { let f = ready!(f.poll(cx)); self.set(Flatten::Second(f)); }, - Flatten::Second(f) => { + FlattenProj::Second(f) => { break ready!(f.poll_ready(cx)); }, - Flatten::Empty => panic!("poll_ready called after eof"), + FlattenProj::Empty => panic!("poll_ready called after eof"), } }) } - #[project] fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> { - #[project] match self.project() { - Flatten::First(_) => panic!("poll_ready not called first"), - Flatten::Second(f) => f.start_send(item), - Flatten::Empty => panic!("start_send called after eof"), + FlattenProj::First(_) => panic!("poll_ready not called first"), + FlattenProj::Second(f) => f.start_send(item), + FlattenProj::Empty => panic!("start_send called after eof"), } } - #[project] fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - #[project] match self.project() { - Flatten::First(_) => Poll::Ready(Ok(())), - Flatten::Second(f) => f.poll_flush(cx), - Flatten::Empty => panic!("poll_flush called after eof"), + FlattenProj::First(_) => Poll::Ready(Ok(())), + FlattenProj::Second(f) => f.poll_flush(cx), + FlattenProj::Empty => panic!("poll_flush called after eof"), } } - #[project] fn poll_close( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Result<(), Self::Error>> { - #[project] let res = match self.as_mut().project() { - Flatten::Second(f) => f.poll_close(cx), + FlattenProj::Second(f) => f.poll_close(cx), _ => Poll::Ready(Ok(())), }; if res.is_ready() { diff --git a/src/future/future/fuse.rs b/src/future/future/fuse.rs index 69a8a69..9f2e1ca 100644 --- a/src/future/future/fuse.rs +++ b/src/future/future/fuse.rs @@ -38,7 +38,7 @@ impl<Fut: Future> Fuse<Fut> { /// sender.unbounded_send(()).unwrap(); /// drop(sender); /// - /// // Use `Fuse::termianted()` to create an already-terminated future + /// // Use `Fuse::terminated()` to create an already-terminated future /// // which may be instantiated later. /// let foo_printer = Fuse::terminated(); /// pin_mut!(foo_printer); diff --git a/src/future/future/map.rs b/src/future/future/map.rs index 080f871..8e7f636 100644 --- a/src/future/future/map.rs +++ b/src/future/future/map.rs @@ -1,13 +1,12 @@ use core::pin::Pin; -use core::ptr; use futures_core::future::{FusedFuture, Future}; use futures_core::task::{Context, Poll}; -use pin_project::{pin_project, project}; +use pin_project::pin_project; use crate::fns::FnOnce1; /// Internal Map future -#[pin_project] +#[pin_project(project = MapProj, project_replace = MapProjOwn)] #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] pub enum Map<Fut, F> { @@ -19,17 +18,6 @@ pub enum Map<Fut, F> { Complete, } -// Helper type to mark a `Map` as complete without running its destructor. -struct UnsafeMarkAsComplete<Fut, F>(*mut Map<Fut, F>); - -impl<Fut, F> Drop for UnsafeMarkAsComplete<Fut, F> { - fn drop(&mut self) { - unsafe { - ptr::write(self.0, Map::Complete); - } - } -} - impl<Fut, F> Map<Fut, F> { /// Creates a new Map. pub(crate) fn new(future: Fut, f: F) -> Map<Fut, F> { @@ -55,35 +43,16 @@ impl<Fut, F, T> Future for Map<Fut, F> { type Output = T; - #[project] fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> { - unsafe { - // Store this pointer for later... - let self_ptr: *mut Self = self.as_mut().get_unchecked_mut(); - - match &mut *self_ptr { - Map::Incomplete { future, f } => { - let mut future = Pin::new_unchecked(future); - let output = match future.as_mut().poll(cx) { - Poll::Ready(x) => x, - Poll::Pending => return Poll::Pending, - }; - - // Here be dragons - let f = ptr::read(f); - { - // The ordering here is important, the call to `drop_in_place` must be - // last as it may panic. Other lines must not panic. - let _cleanup = UnsafeMarkAsComplete(self_ptr); - ptr::drop_in_place(future.get_unchecked_mut()); - }; - - // Phew, everything is back to normal, and we should be in the - // `Complete` state! - Poll::Ready(f.call_once(output)) - }, - Map::Complete => panic!("Map must not be polled after it returned `Poll::Ready`"), - } + match self.as_mut().project() { + MapProj::Incomplete { future, .. } => { + let output = ready!(future.poll(cx)); + match self.project_replace(Map::Complete) { + MapProjOwn::Incomplete { f, .. } => Poll::Ready(f.call_once(output)), + MapProjOwn::Complete => unreachable!(), + } + }, + MapProj::Complete => panic!("Map must not be polled after it returned `Poll::Ready`"), } } } diff --git a/src/future/future/mod.rs b/src/future/future/mod.rs index c3e035b..f5d5dd2 100644 --- a/src/future/future/mod.rs +++ b/src/future/future/mod.rs @@ -3,11 +3,14 @@ //! This module contains a number of functions for working with `Future`s, //! including the `FutureExt` trait which adds methods to `Future` types. -use super::{assert_future, Either}; #[cfg(feature = "alloc")] use alloc::boxed::Box; use core::pin::Pin; +use crate::future::{assert_future, Either}; +use crate::stream::assert_stream; +use crate::fns::{inspect_fn, into_fn, ok_fn, InspectFn, IntoFn, OkFn}; +use crate::never::Never; #[cfg(feature = "alloc")] use futures_core::future::{BoxFuture, LocalBoxFuture}; use futures_core::{ @@ -15,8 +18,7 @@ use futures_core::{ stream::Stream, task::{Context, Poll}, }; -use crate::never::Never; -use crate::fns::{OkFn, ok_fn, IntoFn, into_fn, InspectFn, inspect_fn}; +use pin_utils::pin_mut; // Combinators @@ -44,7 +46,7 @@ delegate_all!( pub use fuse::Fuse; delegate_all!( - /// Future for the [`flatten`](super::FutureExt::flatten) method. + /// Future for the [`map`](super::FutureExt::map) method. Map<Fut, F>( map::Map<Fut, F> ): Debug + Future + FusedFuture + New[|x: Fut, f: F| map::Map::new(x, f)] @@ -99,9 +101,11 @@ mod catch_unwind; pub use self::catch_unwind::CatchUnwind; #[cfg(feature = "channel")] +#[cfg_attr(docsrs, doc(cfg(feature = "channel")))] #[cfg(feature = "std")] mod remote_handle; #[cfg(feature = "channel")] +#[cfg_attr(docsrs, doc(cfg(feature = "channel")))] #[cfg(feature = "std")] #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::remote_handle::{Remote, RemoteHandle}; @@ -220,7 +224,7 @@ pub trait FutureExt: Future { B: Future<Output = Self::Output>, Self: Sized, { - Either::Left(self) + assert_future::<Self::Output, _>(Either::Left(self)) } /// Wrap this future in an `Either` future, making it the right-hand variant @@ -250,7 +254,7 @@ pub trait FutureExt: Future { A: Future<Output = Self::Output>, Self: Sized, { - Either::Right(self) + assert_future::<Self::Output, _>(Either::Right(self)) } /// Convert this future into a single element stream. @@ -275,7 +279,7 @@ pub trait FutureExt: Future { where Self: Sized, { - IntoStream::new(self) + assert_stream::<Self::Output, _>(IntoStream::new(self)) } /// Flatten the execution of this future when the output of this @@ -339,7 +343,7 @@ pub trait FutureExt: Future { Self::Output: Stream, Self: Sized, { - FlattenStream::new(self) + assert_stream::<<Self::Output as Stream>::Item, _>(FlattenStream::new(self)) } /// Fuse a future such that `poll` will never again be called once it has @@ -428,7 +432,9 @@ pub trait FutureExt: Future { where Self: Sized + ::std::panic::UnwindSafe, { - CatchUnwind::new(self) + assert_future::<Result<Self::Output, Box<dyn std::any::Any + Send>>, _>(CatchUnwind::new( + self, + )) } /// Create a cloneable handle to this future where all handles will resolve @@ -482,7 +488,7 @@ pub trait FutureExt: Future { Self: Sized, Self::Output: Clone, { - Shared::new(self) + assert_future::<Self::Output, _>(Shared::new(self)) } /// Turn this future into a future that yields `()` on completion and sends @@ -494,6 +500,7 @@ pub trait FutureExt: Future { /// This method is only available when the `std` feature of this /// library is activated, and it is activated by default. #[cfg(feature = "channel")] + #[cfg_attr(docsrs, doc(cfg(feature = "channel")))] #[cfg(feature = "std")] fn remote_handle(self) -> (Remote<Self>, RemoteHandle<Self::Output>) where @@ -511,7 +518,7 @@ pub trait FutureExt: Future { where Self: Sized + Send + 'a, { - Box::pin(self) + assert_future::<Self::Output, _>(Box::pin(self)) } /// Wrap the future in a Box, pinning it. @@ -525,7 +532,7 @@ pub trait FutureExt: Future { where Self: Sized + 'a, { - Box::pin(self) + assert_future::<Self::Output, _>(Box::pin(self)) } /// Turns a [`Future<Output = T>`](Future) into a @@ -534,7 +541,7 @@ pub trait FutureExt: Future { where Self: Sized, { - UnitError::new(self) + assert_future::<Result<Self::Output, ()>, _>(UnitError::new(self)) } /// Turns a [`Future<Output = T>`](Future) into a @@ -543,7 +550,7 @@ pub trait FutureExt: Future { where Self: Sized, { - NeverError::new(self) + assert_future::<Result<Self::Output, Never>, _>(NeverError::new(self)) } /// A convenience for calling `Future::poll` on `Unpin` future types. @@ -585,19 +592,16 @@ pub trait FutureExt: Future { /// /// assert_eq!(future_ready.now_or_never().expect("Future not ready"), "foobar"); /// ``` - fn now_or_never(mut self) -> Option<Self::Output> + fn now_or_never(self) -> Option<Self::Output> where Self: Sized, { let noop_waker = crate::task::noop_waker(); let mut cx = Context::from_waker(&noop_waker); - // SAFETY: This is safe because this method consumes the future, so `poll` is - // only going to be called once. Thus it doesn't matter to us if the - // future is `Unpin` or not. - let pinned = unsafe { Pin::new_unchecked(&mut self) }; - - match pinned.poll(&mut cx) { + let this = self; + pin_mut!(this); + match this.poll(&mut cx) { Poll::Ready(x) => Some(x), _ => None, } diff --git a/src/future/future/remote_handle.rs b/src/future/future/remote_handle.rs index 9495bec..598f63c 100644 --- a/src/future/future/remote_handle.rs +++ b/src/future/future/remote_handle.rs @@ -16,7 +16,7 @@ use { }, thread, }, - pin_project::{pin_project, project}, + pin_project::pin_project, }; /// The handle to a remote future returned by @@ -37,6 +37,7 @@ use { /// will unwind. #[must_use = "futures do nothing unless you `.await` or poll them"] #[derive(Debug)] +#[cfg_attr(docsrs, doc(cfg(feature = "channel")))] pub struct RemoteHandle<T> { rx: Receiver<thread::Result<T>>, keep_running: Arc<AtomicBool>, @@ -72,6 +73,7 @@ type SendMsg<Fut> = Result<<Fut as Future>::Output, Box<(dyn Any + Send + 'stati /// Created by [`remote_handle`](crate::future::FutureExt::remote_handle). #[pin_project] #[must_use = "futures do nothing unless you `.await` or poll them"] +#[cfg_attr(docsrs, doc(cfg(feature = "channel")))] pub struct Remote<Fut: Future> { tx: Option<Sender<SendMsg<Fut>>>, keep_running: Arc<AtomicBool>, @@ -90,23 +92,21 @@ impl<Fut: Future + fmt::Debug> fmt::Debug for Remote<Fut> { impl<Fut: Future> Future for Remote<Fut> { type Output = (); - #[project] fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { - #[project] - let Remote { tx, keep_running, future } = self.project(); + let this = self.project(); - if let Poll::Ready(_) = tx.as_mut().unwrap().poll_canceled(cx) { - if !keep_running.load(Ordering::SeqCst) { + if let Poll::Ready(_) = this.tx.as_mut().unwrap().poll_canceled(cx) { + if !this.keep_running.load(Ordering::SeqCst) { // Cancelled, bail out return Poll::Ready(()) } } - let output = ready!(future.poll(cx)); + let output = ready!(this.future.poll(cx)); // if the receiving end has gone away then that's ok, we just ignore the // send error here. - drop(tx.take().unwrap().send(output)); + drop(this.tx.take().unwrap().send(output)); Poll::Ready(()) } } diff --git a/src/future/join_all.rs b/src/future/join_all.rs index df62f3a..0c8357c 100644 --- a/src/future/join_all.rs +++ b/src/future/join_all.rs @@ -56,8 +56,10 @@ where /// /// This is purposefully a very simple API for basic use-cases. In a lot of /// cases you will want to use the more powerful -/// [`FuturesUnordered`][crate::stream::FuturesUnordered] APIs, some -/// examples of additional functionality that provides: +/// [`FuturesOrdered`][crate::stream::FuturesOrdered] APIs, or, if order does +/// not matter, [`FuturesUnordered`][crate::stream::FuturesUnordered]. +/// +/// Some examples for additional functionality provided by these are: /// /// * Adding new futures to the set even after it has been started. /// diff --git a/src/future/maybe_done.rs b/src/future/maybe_done.rs index 71cb6fa..5120a9b 100644 --- a/src/future/maybe_done.rs +++ b/src/future/maybe_done.rs @@ -1,15 +1,14 @@ //! Definition of the MaybeDone combinator -use core::mem; use core::pin::Pin; use futures_core::future::{FusedFuture, Future}; use futures_core::task::{Context, Poll}; -use pin_project::{pin_project, project}; +use pin_project::pin_project; /// A future that may have completed. /// /// This is created by the [`maybe_done()`] function. -#[pin_project] +#[pin_project(project = MaybeDoneProj, project_replace = MaybeDoneProjOwn)] #[derive(Debug)] pub enum MaybeDone<Fut: Future> { /// A not-yet-completed future @@ -47,12 +46,10 @@ impl<Fut: Future> MaybeDone<Fut> { /// The output of this method will be [`Some`] if and only if the inner /// future has been completed and [`take_output`](MaybeDone::take_output) /// has not yet been called. - #[project] #[inline] pub fn output_mut(self: Pin<&mut Self>) -> Option<&mut Fut::Output> { - #[project] match self.project() { - MaybeDone::Done(res) => Some(res), + MaybeDoneProj::Done(res) => Some(res), _ => None, } } @@ -61,22 +58,13 @@ impl<Fut: Future> MaybeDone<Fut> { /// towards completion. #[inline] pub fn take_output(self: Pin<&mut Self>) -> Option<Fut::Output> { - // Safety: we return immediately unless we are in the `Done` - // state, which does not have any pinning guarantees to uphold. - // - // Hopefully `pin_project` will support this safely soon: - // https://github.com/taiki-e/pin-project/issues/184 - unsafe { - let this = self.get_unchecked_mut(); - match this { - MaybeDone::Done(_) => {}, - MaybeDone::Future(_) | MaybeDone::Gone => return None, - }; - if let MaybeDone::Done(output) = mem::replace(this, MaybeDone::Gone) { - Some(output) - } else { - unreachable!() - } + match &*self { + MaybeDone::Done(_) => {} + MaybeDone::Future(_) | MaybeDone::Gone => return None, + } + match self.project_replace(MaybeDone::Gone) { + MaybeDoneProjOwn::Done(output) => Some(output), + _ => unreachable!(), } } } @@ -93,16 +81,14 @@ impl<Fut: Future> FusedFuture for MaybeDone<Fut> { impl<Fut: Future> Future for MaybeDone<Fut> { type Output = (); - #[project] fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - #[project] match self.as_mut().project() { - MaybeDone::Future(f) => { + MaybeDoneProj::Future(f) => { let res = ready!(f.poll(cx)); self.set(MaybeDone::Done(res)); - }, - MaybeDone::Done(_) => {}, - MaybeDone::Gone => panic!("MaybeDone polled after value taken"), + } + MaybeDoneProj::Done(_) => {} + MaybeDoneProj::Gone => panic!("MaybeDone polled after value taken"), } Poll::Ready(()) } diff --git a/src/future/mod.rs b/src/future/mod.rs index 7962894..3f19c19 100644 --- a/src/future/mod.rs +++ b/src/future/mod.rs @@ -23,6 +23,7 @@ pub use self::future::FlattenStream; pub use self::future::CatchUnwind; #[cfg(feature = "channel")] +#[cfg_attr(docsrs, doc(cfg(feature = "channel")))] #[cfg(feature = "std")] pub use self::future::{Remote, RemoteHandle}; @@ -36,6 +37,7 @@ pub use self::try_future::{ }; #[cfg(feature = "sink")] +#[cfg_attr(docsrs, doc(cfg(feature = "sink")))] pub use self::try_future::FlattenSink; // Primitive futures @@ -107,7 +109,7 @@ cfg_target_has_atomic! { // Just a helper function to ensure the futures we're returning all have the // right implementations. -fn assert_future<T, F>(future: F) -> F +pub(crate) fn assert_future<T, F>(future: F) -> F where F: Future<Output = T>, { diff --git a/src/future/select.rs b/src/future/select.rs index 3e0a447..bc24779 100644 --- a/src/future/select.rs +++ b/src/future/select.rs @@ -27,6 +27,34 @@ impl<A: Unpin, B: Unpin> Unpin for Select<A, B> {} /// /// # Examples /// +/// A simple example +/// +/// ``` +/// # futures::executor::block_on(async { +/// use futures::future::{self, Either}; +/// use futures::pin_mut; +/// +/// // These two futures have different types even though their outputs have the same type +/// let future1 = async { 1 }; +/// let future2 = async { 2 }; +/// +/// // 'select' requires Future + Unpin bounds +/// pin_mut!(future1); +/// pin_mut!(future2); +/// +/// let value = match future::select(future1, future2).await { +/// Either::Left((value1, _)) => value1, // `value1` is resolved from `future1` +/// // `_` represents `future2` +/// Either::Right((value2, _)) => value2, // `value2` is resolved from `future2` +/// // `_` represents `future1` +/// }; +/// +/// assert!(value == 1 || value == 2); +/// # }); +/// ``` +/// +/// A more complex example +/// /// ``` /// use futures::future::{self, Either, Future, FutureExt}; /// diff --git a/src/future/try_future/mod.rs b/src/future/try_future/mod.rs index bd1ab33..1ce01d2 100644 --- a/src/future/try_future/mod.rs +++ b/src/future/try_future/mod.rs @@ -14,13 +14,13 @@ use futures_core::{ #[cfg(feature = "sink")] use futures_sink::Sink; -use super::assert_future; -use crate::future::{Map, Inspect}; use crate::fns::{ - MapOkFn, map_ok_fn, MapErrFn, map_err_fn, MapOkOrElseFn, - map_ok_or_else_fn, IntoFn, UnwrapOrElseFn, unwrap_or_else_fn, InspectOkFn, inspect_ok_fn, InspectErrFn, - inspect_err_fn, into_fn + inspect_err_fn, inspect_ok_fn, into_fn, map_err_fn, map_ok_fn, map_ok_or_else_fn, + unwrap_or_else_fn, InspectErrFn, InspectOkFn, IntoFn, MapErrFn, MapOkFn, MapOkOrElseFn, + UnwrapOrElseFn, }; +use crate::future::{assert_future, Inspect, Map}; +use crate::stream::assert_stream; // Combinators mod into_future; @@ -52,6 +52,7 @@ delegate_all!( #[cfg(feature = "sink")] delegate_all!( /// Sink for the [`flatten_sink`](TryFutureExt::flatten_sink) method. + #[cfg_attr(docsrs, doc(cfg(feature = "sink")))] FlattenSink<Fut, Si>( try_flatten::TryFlatten<Fut, Si> ): Debug + Sink + Stream + FusedStream + New[|x: Fut| try_flatten::TryFlatten::new(x)] @@ -166,6 +167,7 @@ pub trait TryFutureExt: TryFuture { /// take_sink(fut.flatten_sink()) /// ``` #[cfg(feature = "sink")] + #[cfg_attr(docsrs, doc(cfg(feature = "sink")))] fn flatten_sink<Item>(self) -> FlattenSink<Self, Self::Ok> where Self::Ok: Sink<Item, Error = Self::Error>, @@ -228,7 +230,7 @@ pub trait TryFutureExt: TryFuture { /// The provided closure `f` will only be called if this future is resolved /// to an [`Ok`]. If it resolves to an [`Err`], panics, or is dropped, then /// the provided closure will never be invoked. - /// + /// /// The provided closure `e` will only be called if this future is resolved /// to an [`Err`]. If it resolves to an [`Ok`], panics, or is dropped, then /// the provided closure will never be invoked. @@ -245,13 +247,13 @@ pub trait TryFutureExt: TryFuture { /// let future = async { Ok::<i32, i32>(5) }; /// let future = future.map_ok_or_else(|x| x * 2, |x| x + 3); /// assert_eq!(future.await, 8); - /// + /// /// let future = async { Err::<i32, i32>(5) }; /// let future = future.map_ok_or_else(|x| x * 2, |x| x + 3); /// assert_eq!(future.await, 10); /// # }); /// ``` - /// + /// fn map_ok_or_else<T, E, F>(self, e: E, f: F) -> MapOkOrElse<Self, F, E> where F: FnOnce(Self::Ok) -> T, @@ -532,7 +534,9 @@ pub trait TryFutureExt: TryFuture { Self::Ok: TryStream<Error = Self::Error>, Self: Sized, { - TryFlattenStream::new(self) + assert_stream::<Result<<Self::Ok as TryStream>::Ok, Self::Error>, _>(TryFlattenStream::new( + self, + )) } /// Unwraps this future's ouput, producing a future with this future's @@ -568,6 +572,7 @@ pub trait TryFutureExt: TryFuture { /// Wraps a [`TryFuture`] into a future compatable with libraries using /// futures 0.1 future definitons. Requires the `compat` feature to enable. #[cfg(feature = "compat")] + #[cfg_attr(docsrs, doc(cfg(feature = "compat")))] fn compat(self) -> Compat<Self> where Self: Sized + Unpin, @@ -600,7 +605,7 @@ pub trait TryFutureExt: TryFuture { where Self: Sized, { - IntoFuture::new(self) + assert_future::<Result<Self::Ok, Self::Error>, _>(IntoFuture::new(self)) } /// A convenience method for calling [`TryFuture::try_poll`] on [`Unpin`] diff --git a/src/future/try_future/try_flatten.rs b/src/future/try_future/try_flatten.rs index 661d3ad..2bcadc5 100644 --- a/src/future/try_future/try_flatten.rs +++ b/src/future/try_future/try_flatten.rs @@ -4,9 +4,9 @@ use futures_core::stream::{FusedStream, Stream, TryStream}; #[cfg(feature = "sink")] use futures_sink::Sink; use futures_core::task::{Context, Poll}; -use pin_project::{pin_project, project}; +use pin_project::pin_project; -#[pin_project] +#[pin_project(project = TryFlattenProj)] #[derive(Debug)] pub enum TryFlatten<Fut1, Fut2> { First(#[pin] Fut1), @@ -38,12 +38,10 @@ impl<Fut> Future for TryFlatten<Fut, Fut::Ok> { type Output = Result<<Fut::Ok as TryFuture>::Ok, Fut::Error>; - #[project] fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { Poll::Ready(loop { - #[project] match self.as_mut().project() { - TryFlatten::First(f) => { + TryFlattenProj::First(f) => { match ready!(f.try_poll(cx)) { Ok(f) => self.set(TryFlatten::Second(f)), Err(e) => { @@ -52,12 +50,12 @@ impl<Fut> Future for TryFlatten<Fut, Fut::Ok> } } }, - TryFlatten::Second(f) => { + TryFlattenProj::Second(f) => { let output = ready!(f.try_poll(cx)); self.set(TryFlatten::Empty); break output; }, - TryFlatten::Empty => panic!("TryFlatten polled after completion"), + TryFlattenProj::Empty => panic!("TryFlatten polled after completion"), } }) } @@ -81,12 +79,10 @@ impl<Fut> Stream for TryFlatten<Fut, Fut::Ok> { type Item = Result<<Fut::Ok as TryStream>::Ok, Fut::Error>; - #[project] fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { Poll::Ready(loop { - #[project] match self.as_mut().project() { - TryFlatten::First(f) => { + TryFlattenProj::First(f) => { match ready!(f.try_poll(cx)) { Ok(f) => self.set(TryFlatten::Second(f)), Err(e) => { @@ -95,14 +91,14 @@ impl<Fut> Stream for TryFlatten<Fut, Fut::Ok> } } }, - TryFlatten::Second(f) => { + TryFlattenProj::Second(f) => { let output = ready!(f.try_poll_next(cx)); if output.is_none() { self.set(TryFlatten::Empty); } break output; }, - TryFlatten::Empty => break None, + TryFlattenProj::Empty => break None, } }) } @@ -117,15 +113,13 @@ where { type Error = Fut::Error; - #[project] fn poll_ready( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Result<(), Self::Error>> { Poll::Ready(loop { - #[project] match self.as_mut().project() { - TryFlatten::First(f) => { + TryFlattenProj::First(f) => { match ready!(f.try_poll(cx)) { Ok(f) => self.set(TryFlatten::Second(f)), Err(e) => { @@ -134,42 +128,36 @@ where } } }, - TryFlatten::Second(f) => { + TryFlattenProj::Second(f) => { break ready!(f.poll_ready(cx)); }, - TryFlatten::Empty => panic!("poll_ready called after eof"), + TryFlattenProj::Empty => panic!("poll_ready called after eof"), } }) } - #[project] fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> { - #[project] match self.project() { - TryFlatten::First(_) => panic!("poll_ready not called first"), - TryFlatten::Second(f) => f.start_send(item), - TryFlatten::Empty => panic!("start_send called after eof"), + TryFlattenProj::First(_) => panic!("poll_ready not called first"), + TryFlattenProj::Second(f) => f.start_send(item), + TryFlattenProj::Empty => panic!("start_send called after eof"), } } - #[project] fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - #[project] match self.project() { - TryFlatten::First(_) => Poll::Ready(Ok(())), - TryFlatten::Second(f) => f.poll_flush(cx), - TryFlatten::Empty => panic!("poll_flush called after eof"), + TryFlattenProj::First(_) => Poll::Ready(Ok(())), + TryFlattenProj::Second(f) => f.poll_flush(cx), + TryFlattenProj::Empty => panic!("poll_flush called after eof"), } } - #[project] fn poll_close( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Result<(), Self::Error>> { - #[project] let res = match self.as_mut().project() { - TryFlatten::Second(f) => f.poll_close(cx), + TryFlattenProj::Second(f) => f.poll_close(cx), _ => Poll::Ready(Ok(())), }; if res.is_ready() { diff --git a/src/future/try_future/try_flatten_err.rs b/src/future/try_future/try_flatten_err.rs index fbb413d..480f8c3 100644 --- a/src/future/try_future/try_flatten_err.rs +++ b/src/future/try_future/try_flatten_err.rs @@ -1,9 +1,9 @@ use core::pin::Pin; use futures_core::future::{FusedFuture, Future, TryFuture}; use futures_core::task::{Context, Poll}; -use pin_project::{pin_project, project}; +use pin_project::pin_project; -#[pin_project] +#[pin_project(project = TryFlattenErrProj)] #[derive(Debug)] pub enum TryFlattenErr<Fut1, Fut2> { First(#[pin] Fut1), @@ -35,12 +35,10 @@ impl<Fut> Future for TryFlattenErr<Fut, Fut::Error> { type Output = Result<Fut::Ok, <Fut::Error as TryFuture>::Error>; - #[project] fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { Poll::Ready(loop { - #[project] match self.as_mut().project() { - TryFlattenErr::First(f) => { + TryFlattenErrProj::First(f) => { match ready!(f.try_poll(cx)) { Err(f) => self.set(TryFlattenErr::Second(f)), Ok(e) => { @@ -49,12 +47,12 @@ impl<Fut> Future for TryFlattenErr<Fut, Fut::Error> } } }, - TryFlattenErr::Second(f) => { + TryFlattenErrProj::Second(f) => { let output = ready!(f.try_poll(cx)); self.set(TryFlattenErr::Empty); break output; }, - TryFlattenErr::Empty => panic!("TryFlattenErr polled after completion"), + TryFlattenErrProj::Empty => panic!("TryFlattenErr polled after completion"), } }) } diff --git a/src/future/try_maybe_done.rs b/src/future/try_maybe_done.rs index a249c5c..b38b038 100644 --- a/src/future/try_maybe_done.rs +++ b/src/future/try_maybe_done.rs @@ -1,15 +1,14 @@ //! Definition of the TryMaybeDone combinator -use core::mem; use core::pin::Pin; use futures_core::future::{FusedFuture, Future, TryFuture}; use futures_core::task::{Context, Poll}; -use pin_project::{pin_project, project}; +use pin_project::pin_project; /// A future that may have completed with an error. /// /// This is created by the [`try_maybe_done()`] function. -#[pin_project] +#[pin_project(project = TryMaybeDoneProj, project_replace = TryMaybeDoneProjOwn)] #[derive(Debug)] pub enum TryMaybeDone<Fut: TryFuture> { /// A not-yet-completed future @@ -32,12 +31,10 @@ impl<Fut: TryFuture> TryMaybeDone<Fut> { /// The output of this method will be [`Some`] if and only if the inner /// future has completed successfully and [`take_output`](TryMaybeDone::take_output) /// has not yet been called. - #[project] #[inline] pub fn output_mut(self: Pin<&mut Self>) -> Option<&mut Fut::Ok> { - #[project] match self.project() { - TryMaybeDone::Done(res) => Some(res), + TryMaybeDoneProj::Done(res) => Some(res), _ => None, } } @@ -46,22 +43,13 @@ impl<Fut: TryFuture> TryMaybeDone<Fut> { /// towards completion. #[inline] pub fn take_output(self: Pin<&mut Self>) -> Option<Fut::Ok> { - // Safety: we return immediately unless we are in the `Done` - // state, which does not have any pinning guarantees to uphold. - // - // Hopefully `pin_project` will support this safely soon: - // https://github.com/taiki-e/pin-project/issues/184 - unsafe { - let this = self.get_unchecked_mut(); - match this { - TryMaybeDone::Done(_) => {}, - TryMaybeDone::Future(_) | TryMaybeDone::Gone => return None, - }; - if let TryMaybeDone::Done(output) = mem::replace(this, TryMaybeDone::Gone) { - Some(output) - } else { - unreachable!() - } + match &*self { + TryMaybeDone::Done(_) => {}, + TryMaybeDone::Future(_) | TryMaybeDone::Gone => return None, + } + match self.project_replace(TryMaybeDone::Gone) { + TryMaybeDoneProjOwn::Done(output) => Some(output), + _ => unreachable!() } } } @@ -78,11 +66,9 @@ impl<Fut: TryFuture> FusedFuture for TryMaybeDone<Fut> { impl<Fut: TryFuture> Future for TryMaybeDone<Fut> { type Output = Result<(), Fut::Error>; - #[project] fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - #[project] match self.as_mut().project() { - TryMaybeDone::Future(f) => { + TryMaybeDoneProj::Future(f) => { match ready!(f.try_poll(cx)) { Ok(res) => self.set(TryMaybeDone::Done(res)), Err(e) => { @@ -91,8 +77,8 @@ impl<Fut: TryFuture> Future for TryMaybeDone<Fut> { } } }, - TryMaybeDone::Done(_) => {}, - TryMaybeDone::Gone => panic!("TryMaybeDone polled after value taken"), + TryMaybeDoneProj::Done(_) => {}, + TryMaybeDoneProj::Gone => panic!("TryMaybeDone polled after value taken"), } Poll::Ready(Ok(())) } diff --git a/src/io/buf_reader.rs b/src/io/buf_reader.rs index 94b3d2a..2755667 100644 --- a/src/io/buf_reader.rs +++ b/src/io/buf_reader.rs @@ -2,7 +2,7 @@ use futures_core::task::{Context, Poll}; #[cfg(feature = "read-initializer")] use futures_io::Initializer; use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, IoSliceMut, SeekFrom}; -use pin_project::{pin_project, project}; +use pin_project::pin_project; use std::io::{self, Read}; use std::pin::Pin; use std::{cmp, fmt}; @@ -68,13 +68,11 @@ impl<R: AsyncRead> BufReader<R> { } /// Invalidates all data in the internal buffer. - #[project] #[inline] fn discard_buffer(self: Pin<&mut Self>) { - #[project] - let BufReader { pos, cap, .. } = self.project(); - *pos = 0; - *cap = 0; + let this = self.project(); + *this.pos = 0; + *this.cap = 0; } } @@ -123,24 +121,22 @@ impl<R: AsyncRead> AsyncRead for BufReader<R> { } impl<R: AsyncRead> AsyncBufRead for BufReader<R> { - #[project] fn poll_fill_buf( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<io::Result<&[u8]>> { - #[project] - let BufReader { inner, buffer, cap, pos } = self.project(); + let this = self.project(); // If we've reached the end of our internal buffer then we need to fetch // some more data from the underlying reader. // Branch using `>=` instead of the more correct `==` // to tell the compiler that the pos..cap slice is always valid. - if *pos >= *cap { - debug_assert!(*pos == *cap); - *cap = ready!(inner.poll_read(cx, buffer))?; - *pos = 0; + if *this.pos >= *this.cap { + debug_assert!(*this.pos == *this.cap); + *this.cap = ready!(this.inner.poll_read(cx, this.buffer))?; + *this.pos = 0; } - Poll::Ready(Ok(&buffer[*pos..*cap])) + Poll::Ready(Ok(&this.buffer[*this.pos..*this.cap])) } fn consume(self: Pin<&mut Self>, amt: usize) { diff --git a/src/io/buf_writer.rs b/src/io/buf_writer.rs index 66df81f..ed9196d 100644 --- a/src/io/buf_writer.rs +++ b/src/io/buf_writer.rs @@ -1,6 +1,6 @@ use futures_core::task::{Context, Poll}; use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, IoSlice, SeekFrom}; -use pin_project::{pin_project, project}; +use pin_project::pin_project; use std::fmt; use std::io::{self, Write}; use std::pin::Pin; @@ -51,15 +51,13 @@ impl<W: AsyncWrite> BufWriter<W> { } } - #[project] fn flush_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { - #[project] - let BufWriter { mut inner, buf, written } = self.project(); + let mut this = self.project(); - let len = buf.len(); + let len = this.buf.len(); let mut ret = Ok(()); - while *written < len { - match ready!(inner.as_mut().poll_write(cx, &buf[*written..])) { + while *this.written < len { + match ready!(this.inner.as_mut().poll_write(cx, &this.buf[*this.written..])) { Ok(0) => { ret = Err(io::Error::new( io::ErrorKind::WriteZero, @@ -67,17 +65,17 @@ impl<W: AsyncWrite> BufWriter<W> { )); break; } - Ok(n) => *written += n, + Ok(n) => *this.written += n, Err(e) => { ret = Err(e); break; } } } - if *written > 0 { - buf.drain(..*written); + if *this.written > 0 { + this.buf.drain(..*this.written); } - *written = 0; + *this.written = 0; Poll::Ready(ret) } diff --git a/src/io/chain.rs b/src/io/chain.rs index 4e85854..336307f 100644 --- a/src/io/chain.rs +++ b/src/io/chain.rs @@ -2,7 +2,7 @@ use futures_core::task::{Context, Poll}; #[cfg(feature = "read-initializer")] use futures_io::Initializer; use futures_io::{AsyncBufRead, AsyncRead, IoSliceMut}; -use pin_project::{pin_project, project}; +use pin_project::pin_project; use std::fmt; use std::io; use std::pin::Pin; @@ -51,10 +51,8 @@ where /// underlying readers as doing so may corrupt the internal state of this /// `Chain`. pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut T>, Pin<&mut U>) { - unsafe { - let Self { first, second, .. } = self.get_unchecked_mut(); - (Pin::new_unchecked(first), Pin::new_unchecked(second)) - } + let this = self.project(); + (this.first, this.second) } /// Consumes the `Chain`, returning the wrapped readers. @@ -82,42 +80,38 @@ where T: AsyncRead, U: AsyncRead, { - #[project] fn poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<io::Result<usize>> { - #[project] - let Chain { first, second, done_first } = self.project(); + let this = self.project(); - if !*done_first { - match ready!(first.poll_read(cx, buf)?) { - 0 if !buf.is_empty() => *done_first = true, + if !*this.done_first { + match ready!(this.first.poll_read(cx, buf)?) { + 0 if !buf.is_empty() => *this.done_first = true, n => return Poll::Ready(Ok(n)), } } - second.poll_read(cx, buf) + this.second.poll_read(cx, buf) } - #[project] fn poll_read_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &mut [IoSliceMut<'_>], ) -> Poll<io::Result<usize>> { - #[project] - let Chain { first, second, done_first } = self.project(); + let this = self.project(); - if !*done_first { - let n = ready!(first.poll_read_vectored(cx, bufs)?); + if !*this.done_first { + let n = ready!(this.first.poll_read_vectored(cx, bufs)?); if n == 0 && bufs.iter().any(|b| !b.is_empty()) { - *done_first = true + *this.done_first = true } else { return Poll::Ready(Ok(n)); } } - second.poll_read_vectored(cx, bufs) + this.second.poll_read_vectored(cx, bufs) } #[cfg(feature = "read-initializer")] @@ -136,31 +130,27 @@ where T: AsyncBufRead, U: AsyncBufRead, { - #[project] fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> { - #[project] - let Chain { first, second, done_first } = self.project(); + let this = self.project(); - if !*done_first { - match ready!(first.poll_fill_buf(cx)?) { + if !*this.done_first { + match ready!(this.first.poll_fill_buf(cx)?) { buf if buf.is_empty() => { - *done_first = true; + *this.done_first = true; } buf => return Poll::Ready(Ok(buf)), } } - second.poll_fill_buf(cx) + this.second.poll_fill_buf(cx) } - #[project] fn consume(self: Pin<&mut Self>, amt: usize) { - #[project] - let Chain { first, second, done_first } = self.project(); + let this = self.project(); - if !*done_first { - first.consume(amt) + if !*this.done_first { + this.first.consume(amt) } else { - second.consume(amt) + this.second.consume(amt) } } } diff --git a/src/io/copy_buf.rs b/src/io/copy_buf.rs index f8bb49e..f47144a 100644 --- a/src/io/copy_buf.rs +++ b/src/io/copy_buf.rs @@ -3,7 +3,7 @@ use futures_core::task::{Context, Poll}; use futures_io::{AsyncBufRead, AsyncWrite}; use std::io; use std::pin::Pin; -use pin_project::{pin_project, project}; +use pin_project::pin_project; /// Creates a future which copies all the bytes from one object to another. /// @@ -59,23 +59,21 @@ impl<R, W> Future for CopyBuf<'_, R, W> { type Output = io::Result<u64>; - #[project] fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - #[project] - let CopyBuf { mut reader, mut writer, amt } = self.project(); + let mut this = self.project(); loop { - let buffer = ready!(reader.as_mut().poll_fill_buf(cx))?; + let buffer = ready!(this.reader.as_mut().poll_fill_buf(cx))?; if buffer.is_empty() { - ready!(Pin::new(&mut writer).poll_flush(cx))?; - return Poll::Ready(Ok(*amt)); + ready!(Pin::new(&mut this.writer).poll_flush(cx))?; + return Poll::Ready(Ok(*this.amt)); } - let i = ready!(Pin::new(&mut writer).poll_write(cx, buffer))?; + let i = ready!(Pin::new(&mut this.writer).poll_write(cx, buffer))?; if i == 0 { return Poll::Ready(Err(io::ErrorKind::WriteZero.into())) } - *amt += i as u64; - reader.as_mut().consume(i); + *this.amt += i as u64; + this.reader.as_mut().consume(i); } } } diff --git a/src/io/fill_buf.rs b/src/io/fill_buf.rs new file mode 100644 index 0000000..015547e --- /dev/null +++ b/src/io/fill_buf.rs @@ -0,0 +1,50 @@ +use futures_core::future::Future; +use futures_core::task::{Context, Poll}; +use futures_io::AsyncBufRead; +use std::io; +use std::pin::Pin; + +/// Future for the [`fill_buf`](super::AsyncBufReadExt::fill_buf) method. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct FillBuf<'a, R: ?Sized> { + reader: Option<&'a mut R>, +} + +impl<R: ?Sized> Unpin for FillBuf<'_, R> {} + +impl<'a, R: AsyncBufRead + ?Sized + Unpin> FillBuf<'a, R> { + pub(super) fn new(reader: &'a mut R) -> Self { + FillBuf { reader: Some(reader) } + } +} + +impl<'a, R> Future for FillBuf<'a, R> + where R: AsyncBufRead + ?Sized + Unpin, +{ + type Output = io::Result<&'a [u8]>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let this = &mut *self; + let reader = this.reader.take().expect("Polled FillBuf after completion"); + + match Pin::new(&mut *reader).poll_fill_buf(cx) { + // With polinius it is possible to remove this inner match and just have the correct + // lifetime of the reference inferred based on which branch is taken + Poll::Ready(Ok(_)) => match Pin::new(reader).poll_fill_buf(cx) { + Poll::Ready(Ok(slice)) => Poll::Ready(Ok(slice)), + Poll::Ready(Err(err)) => { + unreachable!("reader indicated readiness but then returned an error: {:?}", err) + } + Poll::Pending => { + unreachable!("reader indicated readiness but then returned pending") + } + }, + Poll::Ready(Err(err)) => Poll::Ready(Err(err)), + Poll::Pending => { + this.reader = Some(reader); + Poll::Pending + } + } + } +} diff --git a/src/io/into_sink.rs b/src/io/into_sink.rs index 0589f3a..082c581 100644 --- a/src/io/into_sink.rs +++ b/src/io/into_sink.rs @@ -3,7 +3,7 @@ use futures_io::AsyncWrite; use futures_sink::Sink; use std::io; use std::pin::Pin; -use pin_project::{pin_project, project}; +use pin_project::pin_project; #[derive(Debug)] struct Block<Item> { @@ -15,6 +15,7 @@ struct Block<Item> { #[pin_project] #[must_use = "sinks do nothing unless polled"] #[derive(Debug)] +#[cfg_attr(docsrs, doc(cfg(feature = "sink")))] pub struct IntoSink<W, Item> { #[pin] writer: W, @@ -30,26 +31,24 @@ impl<W: AsyncWrite, Item: AsRef<[u8]>> IntoSink<W, Item> { /// If we have an outstanding block in `buffer` attempt to push it into the writer, does _not_ /// flush the writer after it succeeds in pushing the block into it. - #[project] fn poll_flush_buffer( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Result<(), io::Error>> { - #[project] - let IntoSink { mut writer, buffer } = self.project(); + let mut this = self.project(); - if let Some(buffer) = buffer { + if let Some(buffer) = this.buffer { loop { let bytes = buffer.bytes.as_ref(); - let written = ready!(writer.as_mut().poll_write(cx, &bytes[buffer.offset..]))?; + let written = ready!(this.writer.as_mut().poll_write(cx, &bytes[buffer.offset..]))?; buffer.offset += written; if buffer.offset == bytes.len() { break; } } } - *buffer = None; + *this.buffer = None; Poll::Ready(Ok(())) } diff --git a/src/io/lines.rs b/src/io/lines.rs index af0b491..90b993d 100644 --- a/src/io/lines.rs +++ b/src/io/lines.rs @@ -5,7 +5,7 @@ use std::io; use std::mem; use std::pin::Pin; use super::read_line::read_line_internal; -use pin_project::{pin_project, project}; +use pin_project::pin_project; /// Stream for the [`lines`](super::AsyncBufReadExt::lines) method. @@ -34,20 +34,18 @@ impl<R: AsyncBufRead> Lines<R> { impl<R: AsyncBufRead> Stream for Lines<R> { type Item = io::Result<String>; - #[project] fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { - #[project] - let Lines { reader, buf, bytes, read } = self.project(); - let n = ready!(read_line_internal(reader, cx, buf, bytes, read))?; - if n == 0 && buf.is_empty() { + let this = self.project(); + let n = ready!(read_line_internal(this.reader, cx, this.buf, this.bytes, this.read))?; + if n == 0 && this.buf.is_empty() { return Poll::Ready(None) } - if buf.ends_with('\n') { - buf.pop(); - if buf.ends_with('\r') { - buf.pop(); + if this.buf.ends_with('\n') { + this.buf.pop(); + if this.buf.ends_with('\r') { + this.buf.pop(); } } - Poll::Ready(Some(Ok(mem::replace(buf, String::new())))) + Poll::Ready(Some(Ok(mem::replace(this.buf, String::new())))) } } diff --git a/src/io/mod.rs b/src/io/mod.rs index 29b6418..51ee995 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -10,14 +10,16 @@ //! library is activated, and it is activated by default. #[cfg(feature = "io-compat")] +#[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))] use crate::compat::Compat; -use std::ptr; +use std::{ptr, pin::Pin}; pub use futures_io::{ AsyncRead, AsyncWrite, AsyncSeek, AsyncBufRead, Error, ErrorKind, IoSlice, IoSliceMut, Result, SeekFrom, }; #[cfg(feature = "read-initializer")] +#[cfg_attr(docsrs, doc(cfg(feature = "read-initializer")))] pub use futures_io::Initializer; // used by `BufReader` and `BufWriter` @@ -65,12 +67,17 @@ pub use self::cursor::Cursor; mod empty; pub use self::empty::{empty, Empty}; +mod fill_buf; +pub use self::fill_buf::FillBuf; + mod flush; pub use self::flush::Flush; #[cfg(feature = "sink")] +#[cfg_attr(docsrs, doc(cfg(feature = "sink")))] mod into_sink; #[cfg(feature = "sink")] +#[cfg_attr(docsrs, doc(cfg(feature = "sink")))] pub use self::into_sink::IntoSink; mod lines; @@ -124,9 +131,9 @@ pub use self::write_vectored::WriteVectored; mod write_all; pub use self::write_all::WriteAll; -#[cfg(feature = "write_all_vectored")] +#[cfg(feature = "write-all-vectored")] mod write_all_vectored; -#[cfg(feature = "write_all_vectored")] +#[cfg(feature = "write-all-vectored")] pub use self::write_all_vectored::WriteAllVectored; /// An extension trait which adds utility methods to `AsyncRead` types. @@ -372,6 +379,7 @@ pub trait AsyncReadExt: AsyncRead { /// /// Requires the `io-compat` feature to enable. #[cfg(feature = "io-compat")] + #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))] fn compat(self) -> Compat<Self> where Self: Sized + Unpin, { @@ -493,9 +501,10 @@ pub trait AsyncWriteExt: AsyncWrite { /// ``` /// # futures::executor::block_on(async { /// use futures::io::AsyncWriteExt; - /// use std::io::{Cursor, IoSlice}; + /// use futures_util::io::Cursor; + /// use std::io::IoSlice; /// - /// let mut writer = Cursor::new([0u8; 7]); + /// let mut writer = Cursor::new(Vec::new()); /// let bufs = &mut [ /// IoSlice::new(&[1]), /// IoSlice::new(&[2, 3]), @@ -503,12 +512,12 @@ pub trait AsyncWriteExt: AsyncWrite { /// ]; /// /// writer.write_all_vectored(bufs).await?; - /// // Note: the contents of `bufs` is now undefined, see the Notes section. + /// // Note: the contents of `bufs` is now unspecified, see the Notes section. /// - /// assert_eq!(writer.into_inner(), [1, 2, 3, 4, 5, 6, 0]); + /// assert_eq!(writer.into_inner(), &[1, 2, 3, 4, 5, 6]); /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap(); /// ``` - #[cfg(feature = "write_all_vectored")] + #[cfg(feature = "write-all-vectored")] fn write_all_vectored<'a>( &'a mut self, bufs: &'a mut [IoSlice<'a>], @@ -523,6 +532,7 @@ pub trait AsyncWriteExt: AsyncWrite { /// used as a futures 0.1 / tokio-io 0.1 `AsyncWrite`. /// Requires the `io-compat` feature to enable. #[cfg(feature = "io-compat")] + #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))] fn compat_write(self) -> Compat<Self> where Self: Sized + Unpin, { @@ -556,6 +566,7 @@ pub trait AsyncWriteExt: AsyncWrite { /// # Ok::<(), Box<dyn std::error::Error>>(()) /// ``` #[cfg(feature = "sink")] + #[cfg_attr(docsrs, doc(cfg(feature = "sink")))] fn into_sink<Item: AsRef<[u8]>>(self) -> IntoSink<Self, Item> where Self: Sized, { @@ -583,6 +594,58 @@ impl<S: AsyncSeek + ?Sized> AsyncSeekExt for S {} /// An extension trait which adds utility methods to `AsyncBufRead` types. pub trait AsyncBufReadExt: AsyncBufRead { + /// Creates a future which will wait for a non-empty buffer to be available from this I/O + /// object or EOF to be reached. + /// + /// This method is the async equivalent to [`BufRead::fill_buf`](std::io::BufRead::fill_buf). + /// + /// ```rust + /// # futures::executor::block_on(async { + /// use futures::{io::AsyncBufReadExt as _, stream::{iter, TryStreamExt as _}}; + /// + /// let mut stream = iter(vec![Ok(vec![1, 2, 3]), Ok(vec![4, 5, 6])]).into_async_read(); + /// + /// assert_eq!(stream.fill_buf().await?, vec![1, 2, 3]); + /// stream.consume_unpin(2); + /// + /// assert_eq!(stream.fill_buf().await?, vec![3]); + /// stream.consume_unpin(1); + /// + /// assert_eq!(stream.fill_buf().await?, vec![4, 5, 6]); + /// stream.consume_unpin(3); + /// + /// assert_eq!(stream.fill_buf().await?, vec![]); + /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap(); + /// ``` + fn fill_buf(&mut self) -> FillBuf<'_, Self> + where Self: Unpin, + { + FillBuf::new(self) + } + + /// A convenience for calling [`AsyncBufRead::consume`] on [`Unpin`] IO types. + /// + /// ```rust + /// # futures::executor::block_on(async { + /// use futures::{io::AsyncBufReadExt as _, stream::{iter, TryStreamExt as _}}; + /// + /// let mut stream = iter(vec![Ok(vec![1, 2, 3])]).into_async_read(); + /// + /// assert_eq!(stream.fill_buf().await?, vec![1, 2, 3]); + /// stream.consume_unpin(2); + /// + /// assert_eq!(stream.fill_buf().await?, vec![3]); + /// stream.consume_unpin(1); + /// + /// assert_eq!(stream.fill_buf().await?, vec![]); + /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap(); + /// ``` + fn consume_unpin(&mut self, amt: usize) + where Self: Unpin, + { + Pin::new(self).consume(amt) + } + /// Creates a future which will read all the bytes associated with this I/O /// object into `buf` until the delimiter `byte` or EOF is reached. /// This method is the async equivalent to [`BufRead::read_until`](std::io::BufRead::read_until). diff --git a/src/io/take.rs b/src/io/take.rs index 39088b7..6179486 100644 --- a/src/io/take.rs +++ b/src/io/take.rs @@ -2,7 +2,7 @@ use futures_core::task::{Context, Poll}; #[cfg(feature = "read-initializer")] use futures_io::Initializer; use futures_io::{AsyncRead, AsyncBufRead}; -use pin_project::{pin_project, project}; +use pin_project::pin_project; use std::{cmp, io}; use std::pin::Pin; @@ -83,22 +83,20 @@ impl<R: AsyncRead> Take<R> { } impl<R: AsyncRead> AsyncRead for Take<R> { - #[project] fn poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<Result<usize, io::Error>> { - #[project] - let Take { inner, limit_ } = self.project(); + let this = self.project(); - if *limit_ == 0 { + if *this.limit_ == 0 { return Poll::Ready(Ok(0)); } - let max = std::cmp::min(buf.len() as u64, *limit_) as usize; - let n = ready!(inner.poll_read(cx, &mut buf[..max]))?; - *limit_ -= n as u64; + let max = cmp::min(buf.len() as u64, *this.limit_) as usize; + let n = ready!(this.inner.poll_read(cx, &mut buf[..max]))?; + *this.limit_ -= n as u64; Poll::Ready(Ok(n)) } @@ -109,29 +107,25 @@ impl<R: AsyncRead> AsyncRead for Take<R> { } impl<R: AsyncBufRead> AsyncBufRead for Take<R> { - #[project] fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> { - #[project] - let Take { inner, limit_ } = self.project(); + let this = self.project(); // Don't call into inner reader at all at EOF because it may still block - if *limit_ == 0 { + if *this.limit_ == 0 { return Poll::Ready(Ok(&[])); } - let buf = ready!(inner.poll_fill_buf(cx)?); - let cap = cmp::min(buf.len() as u64, *limit_) as usize; + let buf = ready!(this.inner.poll_fill_buf(cx)?); + let cap = cmp::min(buf.len() as u64, *this.limit_) as usize; Poll::Ready(Ok(&buf[..cap])) } - #[project] fn consume(self: Pin<&mut Self>, amt: usize) { - #[project] - let Take { inner, limit_ } = self.project(); + let this = self.project(); // Don't let callers reset the limit by passing an overlarge value - let amt = cmp::min(amt as u64, *limit_) as usize; - *limit_ -= amt as u64; - inner.consume(amt); + let amt = cmp::min(amt as u64, *this.limit_) as usize; + *this.limit_ -= amt as u64; + this.inner.consume(amt); } } diff --git a/src/io/write_all_vectored.rs b/src/io/write_all_vectored.rs index fbe7e73..ec28798 100644 --- a/src/io/write_all_vectored.rs +++ b/src/io/write_all_vectored.rs @@ -19,7 +19,7 @@ impl<W: ?Sized + Unpin> Unpin for WriteAllVectored<'_, W> {} impl<'a, W: AsyncWrite + ?Sized + Unpin> WriteAllVectored<'a, W> { pub(super) fn new(writer: &'a mut W, bufs: &'a mut [IoSlice<'a>]) -> Self { - WriteAllVectored { writer, bufs } + WriteAllVectored { writer, bufs: IoSlice::advance(bufs, 0) } } } @@ -171,6 +171,7 @@ mod tests { #[rustfmt::skip] // Becomes unreadable otherwise. let tests: Vec<(_, &'static [u8])> = vec![ (vec![], &[]), + (vec![IoSlice::new(&[]), IoSlice::new(&[])], &[]), (vec![IoSlice::new(&[1])], &[1]), (vec![IoSlice::new(&[1, 2])], &[1, 2]), (vec![IoSlice::new(&[1, 2, 3])], &[1, 2, 3]), @@ -7,16 +7,20 @@ #![cfg_attr(not(feature = "std"), no_std)] #![warn(missing_docs, missing_debug_implementations, rust_2018_idioms, unreachable_pub)] +// It cannot be included in the published code because this lints have false positives in the minimum required version. +#![cfg_attr(test, warn(single_use_lifetimes))] #![warn(clippy::all)] -// The solution for this lint is not available on 1.39 which is the current minimum supported version. -// Can be removed as of minimum supported 1.40 or if https://github.com/rust-lang/rust-clippy/issues/3941 +// mem::take requires Rust 1.40, matches! requires Rust 1.42 +// Can be removed if the minimum supported version increased or if https://github.com/rust-lang/rust-clippy/issues/3941 // get's implemented. -#![allow(clippy::mem_replace_with_default)] +#![allow(clippy::mem_replace_with_default, clippy::match_like_matches_macro)] #![doc(test(attr(deny(warnings), allow(dead_code, unused_assignments, unused_variables))))] -#![doc(html_root_url = "https://docs.rs/futures-util/0.3.5")] +#![doc(html_root_url = "https://docs.rs/futures-util/0.3.7")] + +#![cfg_attr(docsrs, feature(doc_cfg))] #[cfg(all(feature = "cfg-target-has-atomic", not(feature = "unstable")))] compile_error!("The `cfg-target-has-atomic` feature requires the `unstable` feature as an explicit opt-in to unstable features"); @@ -37,25 +41,27 @@ extern crate futures_core; pub use futures_core::ready; pub use pin_utils::pin_mut; -// Not public API. #[cfg(feature = "async-await")] #[macro_use] -#[doc(hidden)] -pub mod async_await; +mod async_await; #[cfg(feature = "async-await")] #[doc(hidden)] pub use self::async_await::*; // Not public API. -#[doc(hidden)] -pub use futures_core::core_reexport; - -// Not public API. #[cfg(feature = "async-await")] #[doc(hidden)] -pub mod __reexport { - #[doc(hidden)] +pub mod __private { pub use crate::*; + pub use core::{ + option::Option::{self, Some, None}, + pin::Pin, + result::Result::{Err, Ok}, + }; + + pub mod async_await { + pub use crate::async_await::*; + } } macro_rules! cfg_target_has_atomic { @@ -70,8 +76,8 @@ macro_rules! delegate_sink { ($field:ident, $item:ty) => { fn poll_ready( self: core::pin::Pin<&mut Self>, - cx: &mut $crate::core_reexport::task::Context<'_>, - ) -> $crate::core_reexport::task::Poll<Result<(), Self::Error>> { + cx: &mut core::task::Context<'_>, + ) -> core::task::Poll<Result<(), Self::Error>> { self.project().$field.poll_ready(cx) } @@ -84,15 +90,15 @@ macro_rules! delegate_sink { fn poll_flush( self: core::pin::Pin<&mut Self>, - cx: &mut $crate::core_reexport::task::Context<'_>, - ) -> $crate::core_reexport::task::Poll<Result<(), Self::Error>> { + cx: &mut core::task::Context<'_>, + ) -> core::task::Poll<Result<(), Self::Error>> { self.project().$field.poll_flush(cx) } fn poll_close( self: core::pin::Pin<&mut Self>, - cx: &mut $crate::core_reexport::task::Context<'_>, - ) -> $crate::core_reexport::task::Poll<Result<(), Self::Error>> { + cx: &mut core::task::Context<'_>, + ) -> core::task::Poll<Result<(), Self::Error>> { self.project().$field.poll_close(cx) } } @@ -102,8 +108,8 @@ macro_rules! delegate_future { ($field:ident) => { fn poll( self: core::pin::Pin<&mut Self>, - cx: &mut $crate::core_reexport::task::Context<'_>, - ) -> $crate::core_reexport::task::Poll<Self::Output> { + cx: &mut core::task::Context<'_>, + ) -> core::task::Poll<Self::Output> { self.project().$field.poll(cx) } } @@ -113,8 +119,8 @@ macro_rules! delegate_stream { ($field:ident) => { fn poll_next( self: core::pin::Pin<&mut Self>, - cx: &mut $crate::core_reexport::task::Context<'_>, - ) -> $crate::core_reexport::task::Poll<Option<Self::Item>> { + cx: &mut core::task::Context<'_>, + ) -> core::task::Poll<Option<Self::Item>> { self.project().$field.poll_next(cx) } fn size_hint(&self) -> (usize, Option<usize>) { @@ -183,7 +189,7 @@ macro_rules! delegate_async_buf_read { ) -> core::task::Poll<std::io::Result<&[u8]>> { self.project().$field.poll_fill_buf(cx) } - + fn consume(self: core::pin::Pin<&mut Self>, amt: usize) { self.project().$field.consume(amt) } @@ -308,6 +314,7 @@ pub mod stream; #[doc(hidden)] pub use crate::stream::{StreamExt, TryStreamExt}; #[cfg(feature = "sink")] +#[cfg_attr(docsrs, doc(cfg(feature = "sink")))] pub mod sink; #[cfg(feature = "sink")] #[doc(hidden)] pub use crate::sink::SinkExt; @@ -317,9 +324,11 @@ pub mod task; pub mod never; #[cfg(feature = "compat")] +#[cfg_attr(docsrs, doc(cfg(feature = "compat")))] pub mod compat; #[cfg(feature = "io")] +#[cfg_attr(docsrs, doc(cfg(feature = "io")))] #[cfg(feature = "std")] pub mod io; #[cfg(feature = "io")] diff --git a/src/lock/bilock.rs b/src/lock/bilock.rs index cccd0be..3698406 100644 --- a/src/lock/bilock.rs +++ b/src/lock/bilock.rs @@ -34,6 +34,7 @@ use alloc::sync::Arc; /// This type is only available when the `bilock` feature of this /// library is activated. #[derive(Debug)] +#[cfg_attr(docsrs, doc(cfg(feature = "bilock")))] pub struct BiLock<T> { arc: Arc<Inner<T>>, } @@ -142,6 +143,7 @@ impl<T> BiLock<T> { /// /// Note that the returned future will never resolve to an error. #[cfg(feature = "bilock")] + #[cfg_attr(docsrs, doc(cfg(feature = "bilock")))] pub fn lock(&self) -> BiLockAcquire<'_, T> { BiLockAcquire { bilock: self, @@ -198,6 +200,7 @@ impl<T> Drop for Inner<T> { /// Error indicating two `BiLock<T>`s were not two halves of a whole, and /// thus could not be `reunite`d. +#[cfg_attr(docsrs, doc(cfg(feature = "bilock")))] pub struct ReuniteError<T>(pub BiLock<T>, pub BiLock<T>); impl<T> fmt::Debug for ReuniteError<T> { @@ -223,6 +226,7 @@ impl<T: core::any::Any> std::error::Error for ReuniteError<T> {} /// implementing `Deref` and `DerefMut` to `T`. When dropped, the lock will be /// unlocked. #[derive(Debug)] +#[cfg_attr(docsrs, doc(cfg(feature = "bilock")))] pub struct BiLockGuard<'a, T> { bilock: &'a BiLock<T>, } @@ -258,6 +262,7 @@ impl<T> Drop for BiLockGuard<'_, T> { /// Future returned by `BiLock::lock` which will resolve when the lock is /// acquired. #[cfg(feature = "bilock")] +#[cfg_attr(docsrs, doc(cfg(feature = "bilock")))] #[must_use = "futures do nothing unless you `.await` or poll them"] #[derive(Debug)] pub struct BiLockAcquire<'a, T> { diff --git a/src/lock/mod.rs b/src/lock/mod.rs index 3db5e5b..b252613 100644 --- a/src/lock/mod.rs +++ b/src/lock/mod.rs @@ -9,9 +9,11 @@ mod mutex; pub use self::mutex::{MappedMutexGuard, Mutex, MutexLockFuture, MutexGuard}; #[cfg(any(feature = "bilock", feature = "sink", feature = "io"))] +#[cfg_attr(docsrs, doc(cfg(feature = "bilock")))] #[cfg_attr(not(feature = "bilock"), allow(unreachable_pub))] mod bilock; #[cfg(feature = "bilock")] +#[cfg_attr(docsrs, doc(cfg(feature = "bilock")))] pub use self::bilock::{BiLock, BiLockAcquire, BiLockGuard, ReuniteError}; #[cfg(any(feature = "sink", feature = "io"))] #[cfg(not(feature = "bilock"))] diff --git a/src/lock/mutex.rs b/src/lock/mutex.rs index 96549ea..84aeeda 100644 --- a/src/lock/mutex.rs +++ b/src/lock/mutex.rs @@ -3,15 +3,16 @@ use futures_core::task::{Context, Poll, Waker}; use slab::Slab; use std::{fmt, mem}; use std::cell::UnsafeCell; +use std::marker::PhantomData; use std::ops::{Deref, DerefMut}; use std::pin::Pin; use std::sync::Mutex as StdMutex; use std::sync::atomic::{AtomicUsize, Ordering}; /// A futures-aware mutex. -/// +/// /// # Fairness -/// +/// /// This mutex provides no fairness guarantees. Tasks may not acquire the mutex /// in the order that they requested the lock, and it's possible for a single task /// which repeatedly takes the lock to starve other tasks, which may be left waiting @@ -288,7 +289,7 @@ impl<'a, T: ?Sized> MutexGuard<'a, T> { // Don't run the `drop` method for MutexGuard. The ownership of the underlying // locked state is being moved to the returned MappedMutexGuard. mem::forget(this); - MappedMutexGuard { mutex, value } + MappedMutexGuard { mutex, value, _marker: PhantomData } } } @@ -325,6 +326,7 @@ impl<T: ?Sized> DerefMut for MutexGuard<'_, T> { pub struct MappedMutexGuard<'a, T: ?Sized, U: ?Sized> { mutex: &'a Mutex<T>, value: *mut U, + _marker: PhantomData<&'a mut U>, } impl<'a, T: ?Sized, U: ?Sized> MappedMutexGuard<'a, T, U> { @@ -354,7 +356,7 @@ impl<'a, T: ?Sized, U: ?Sized> MappedMutexGuard<'a, T, U> { // Don't run the `drop` method for MappedMutexGuard. The ownership of the underlying // locked state is being moved to the returned MappedMutexGuard. mem::forget(this); - MappedMutexGuard { mutex, value } + MappedMutexGuard { mutex, value, _marker: PhantomData } } } @@ -401,8 +403,8 @@ unsafe impl<T: ?Sized> Sync for MutexLockFuture<'_, T> {} // lock is essentially spinlock-equivalent (attempt to flip an atomic bool) unsafe impl<T: ?Sized + Send> Send for MutexGuard<'_, T> {} unsafe impl<T: ?Sized + Sync> Sync for MutexGuard<'_, T> {} -unsafe impl<T: ?Sized + Send, U: ?Sized> Send for MappedMutexGuard<'_, T, U> {} -unsafe impl<T: ?Sized + Sync, U: ?Sized> Sync for MappedMutexGuard<'_, T, U> {} +unsafe impl<T: ?Sized + Send, U: ?Sized + Send> Send for MappedMutexGuard<'_, T, U> {} +unsafe impl<T: ?Sized + Sync, U: ?Sized + Sync> Sync for MappedMutexGuard<'_, T, U> {} #[test] fn test_mutex_guard_debug_not_recurse() { diff --git a/src/sink/buffer.rs b/src/sink/buffer.rs index c3df3b9..8176abd 100644 --- a/src/sink/buffer.rs +++ b/src/sink/buffer.rs @@ -1,7 +1,7 @@ use futures_core::stream::{Stream, FusedStream}; use futures_core::task::{Context, Poll}; use futures_sink::Sink; -use pin_project::{pin_project, project}; +use pin_project::pin_project; use core::pin::Pin; use alloc::collections::VecDeque; @@ -29,18 +29,16 @@ impl<Si: Sink<Item>, Item> Buffer<Si, Item> { delegate_access_inner!(sink, Si, ()); - #[project] fn try_empty_buffer( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Result<(), Si::Error>> { - #[project] - let Buffer { mut sink, buf, .. } = self.project(); - ready!(sink.as_mut().poll_ready(cx))?; - while let Some(item) = buf.pop_front() { - sink.as_mut().start_send(item)?; - if !buf.is_empty() { - ready!(sink.as_mut().poll_ready(cx))?; + let mut this = self.project(); + ready!(this.sink.as_mut().poll_ready(cx))?; + while let Some(item) = this.buf.pop_front() { + this.sink.as_mut().start_send(item)?; + if !this.buf.is_empty() { + ready!(this.sink.as_mut().poll_ready(cx))?; } } Poll::Ready(Ok(())) diff --git a/src/sink/err_into.rs b/src/sink/err_into.rs index 530e1cc..b23ada7 100644 --- a/src/sink/err_into.rs +++ b/src/sink/err_into.rs @@ -1,7 +1,7 @@ use crate::sink::{SinkExt, SinkMapErr}; use futures_core::stream::{Stream, FusedStream}; use futures_sink::{Sink}; -use pin_project::{pin_project}; +use pin_project::pin_project; /// Sink for the [`sink_err_into`](super::SinkExt::sink_err_into) method. #[pin_project] diff --git a/src/sink/fanout.rs b/src/sink/fanout.rs index 7066e21..d71d793 100644 --- a/src/sink/fanout.rs +++ b/src/sink/fanout.rs @@ -2,7 +2,7 @@ use core::fmt::{Debug, Formatter, Result as FmtResult}; use core::pin::Pin; use futures_core::task::{Context, Poll}; use futures_sink::Sink; -use pin_project::{pin_project, project}; +use pin_project::pin_project; /// Sink that clones incoming items and forwards them to two sinks at the same time. /// @@ -33,11 +33,9 @@ impl<Si1, Si2> Fanout<Si1, Si2> { } /// Get a pinned mutable reference to the inner sinks. - #[project] pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut Si1>, Pin<&mut Si2>) { - #[project] - let Fanout { sink1, sink2 } = self.project(); - (sink1, sink2) + let this = self.project(); + (this.sink1, this.sink2) } /// Consumes this combinator, returning the underlying sinks. @@ -65,57 +63,49 @@ impl<Si1, Si2, Item> Sink<Item> for Fanout<Si1, Si2> { type Error = Si1::Error; - #[project] fn poll_ready( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Result<(), Self::Error>> { - #[project] - let Fanout { sink1, sink2 } = self.project(); + let this = self.project(); - let sink1_ready = sink1.poll_ready(cx)?.is_ready(); - let sink2_ready = sink2.poll_ready(cx)?.is_ready(); + let sink1_ready = this.sink1.poll_ready(cx)?.is_ready(); + let sink2_ready = this.sink2.poll_ready(cx)?.is_ready(); let ready = sink1_ready && sink2_ready; if ready { Poll::Ready(Ok(())) } else { Poll::Pending } } - #[project] fn start_send( self: Pin<&mut Self>, item: Item, ) -> Result<(), Self::Error> { - #[project] - let Fanout { sink1, sink2 } = self.project(); + let this = self.project(); - sink1.start_send(item.clone())?; - sink2.start_send(item)?; + this.sink1.start_send(item.clone())?; + this.sink2.start_send(item)?; Ok(()) } - #[project] fn poll_flush( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Result<(), Self::Error>> { - #[project] - let Fanout { sink1, sink2 } = self.project(); + let this = self.project(); - let sink1_ready = sink1.poll_flush(cx)?.is_ready(); - let sink2_ready = sink2.poll_flush(cx)?.is_ready(); + let sink1_ready = this.sink1.poll_flush(cx)?.is_ready(); + let sink2_ready = this.sink2.poll_flush(cx)?.is_ready(); let ready = sink1_ready && sink2_ready; if ready { Poll::Ready(Ok(())) } else { Poll::Pending } } - #[project] fn poll_close( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Result<(), Self::Error>> { - #[project] - let Fanout { sink1, sink2 } = self.project(); + let this = self.project(); - let sink1_ready = sink1.poll_close(cx)?.is_ready(); - let sink2_ready = sink2.poll_close(cx)?.is_ready(); + let sink1_ready = this.sink1.poll_close(cx)?.is_ready(); + let sink2_ready = this.sink2.poll_close(cx)?.is_ready(); let ready = sink1_ready && sink2_ready; if ready { Poll::Ready(Ok(())) } else { Poll::Pending } } diff --git a/src/sink/mod.rs b/src/sink/mod.rs index ebdb999..b0e2c83 100644 --- a/src/sink/mod.rs +++ b/src/sink/mod.rs @@ -258,6 +258,7 @@ pub trait SinkExt<Item>: Sink<Item> { /// Wraps a [`Sink`] into a sink compatible with libraries using /// futures 0.1 `Sink`. Requires the `compat` feature to be enabled. #[cfg(feature = "compat")] + #[cfg_attr(docsrs, doc(cfg(feature = "compat")))] fn compat(self) -> CompatSink<Self, Item> where Self: Sized + Unpin, { diff --git a/src/sink/with.rs b/src/sink/with.rs index 802123f..6329a0c 100644 --- a/src/sink/with.rs +++ b/src/sink/with.rs @@ -5,7 +5,7 @@ use futures_core::future::Future; use futures_core::stream::Stream; use futures_core::task::{Context, Poll}; use futures_sink::Sink; -use pin_project::{pin_project, project}; +use pin_project::pin_project; /// Sink for the [`with`](super::SinkExt::with) method. #[pin_project] @@ -71,20 +71,18 @@ impl<Si, Item, U, Fut, F, E> With<Si, Item, U, Fut, F> delegate_access_inner!(sink, Si, ()); /// Completes the processing of previous item if any. - #[project] fn poll( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Result<(), E>> { - #[project] - let With { mut state, sink, .. } = self.project(); + let mut this = self.project(); - let item = match state.as_mut().as_pin_mut() { + let item = match this.state.as_mut().as_pin_mut() { None => return Poll::Ready(Ok(())), Some(fut) => ready!(fut.poll(cx))?, }; - state.set(None); - sink.start_send(item)?; + this.state.set(None); + this.sink.start_send(item)?; Poll::Ready(Ok(())) } } @@ -106,16 +104,14 @@ impl<Si, Item, U, Fut, F, E> Sink<U> for With<Si, Item, U, Fut, F> Poll::Ready(Ok(())) } - #[project] fn start_send( self: Pin<&mut Self>, item: U, ) -> Result<(), Self::Error> { - #[project] - let With { mut state, f, .. } = self.project(); + let mut this = self.project(); - assert!(state.is_none()); - state.set(Some(f(item))); + assert!(this.state.is_none()); + this.state.set(Some((this.f)(item))); Ok(()) } diff --git a/src/sink/with_flat_map.rs b/src/sink/with_flat_map.rs index f260a0b..cf213e6 100644 --- a/src/sink/with_flat_map.rs +++ b/src/sink/with_flat_map.rs @@ -4,7 +4,7 @@ use core::pin::Pin; use futures_core::stream::{Stream, FusedStream}; use futures_core::task::{Context, Poll}; use futures_sink::Sink; -use pin_project::{pin_project, project}; +use pin_project::pin_project; /// Sink for the [`with_flat_map`](super::SinkExt::with_flat_map) method. #[pin_project] @@ -52,31 +52,29 @@ where delegate_access_inner!(sink, Si, ()); - #[project] fn try_empty_stream( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Result<(), Si::Error>> { - #[project] - let WithFlatMap { mut sink, mut stream, buffer, .. } = self.project(); + let mut this = self.project(); - if buffer.is_some() { - ready!(sink.as_mut().poll_ready(cx))?; - let item = buffer.take().unwrap(); - sink.as_mut().start_send(item)?; + if this.buffer.is_some() { + ready!(this.sink.as_mut().poll_ready(cx))?; + let item = this.buffer.take().unwrap(); + this.sink.as_mut().start_send(item)?; } - if let Some(mut some_stream) = stream.as_mut().as_pin_mut() { + if let Some(mut some_stream) = this.stream.as_mut().as_pin_mut() { while let Some(item) = ready!(some_stream.as_mut().poll_next(cx)?) { - match sink.as_mut().poll_ready(cx)? { - Poll::Ready(()) => sink.as_mut().start_send(item)?, + match this.sink.as_mut().poll_ready(cx)? { + Poll::Ready(()) => this.sink.as_mut().start_send(item)?, Poll::Pending => { - *buffer = Some(item); + *this.buffer = Some(item); return Poll::Pending; } }; } } - stream.set(None); + this.stream.set(None); Poll::Ready(Ok(())) } } @@ -119,16 +117,14 @@ where self.try_empty_stream(cx) } - #[project] fn start_send( self: Pin<&mut Self>, item: U, ) -> Result<(), Self::Error> { - #[project] - let WithFlatMap { mut stream, f, .. } = self.project(); + let mut this = self.project(); - assert!(stream.is_none()); - stream.set(Some(f(item))); + assert!(this.stream.is_none()); + this.stream.set(Some((this.f)(item))); Ok(()) } diff --git a/src/stream/futures_ordered.rs b/src/stream/futures_ordered.rs index 6dc07ad..5dbd4ae 100644 --- a/src/stream/futures_ordered.rs +++ b/src/stream/futures_ordered.rs @@ -1,7 +1,7 @@ use crate::stream::{FuturesUnordered, StreamExt}; use futures_core::future::Future; use futures_core::stream::Stream; -use futures_core::task::{Context, Poll}; +use futures_core::{FusedStream, task::{Context, Poll}}; use pin_project::pin_project; use core::cmp::Ordering; use core::fmt::{self, Debug}; @@ -203,6 +203,12 @@ impl<Fut: Future> FromIterator<Fut> for FuturesOrdered<Fut> { } } +impl<Fut: Future> FusedStream for FuturesOrdered<Fut> { + fn is_terminated(&self) -> bool { + self.in_progress_queue.is_terminated() && self.queued_outputs.is_empty() + } +} + impl<Fut: Future> Extend<Fut> for FuturesOrdered<Fut> { fn extend<I>(&mut self, iter: I) where diff --git a/src/stream/futures_unordered/task.rs b/src/stream/futures_unordered/task.rs index 9277229..abb0264 100644 --- a/src/stream/futures_unordered/task.rs +++ b/src/stream/futures_unordered/task.rs @@ -70,7 +70,7 @@ impl<Fut> ArcWake for Task<Fut> { impl<Fut> Task<Fut> { /// Returns a waker reference for this task without cloning the Arc. - pub(super) fn waker_ref<'a>(this: &'a Arc<Task<Fut>>) -> WakerRef<'a> { + pub(super) fn waker_ref(this: &Arc<Task<Fut>>) -> WakerRef<'_> { waker_ref(this) } diff --git a/src/stream/mod.rs b/src/stream/mod.rs index 2a5ecf9..ca9bc89 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -28,6 +28,7 @@ pub use self::stream::Chunks; pub use self::stream::ReadyChunks; #[cfg(feature = "sink")] +#[cfg_attr(docsrs, doc(cfg(feature = "sink")))] pub use self::stream::Forward; #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))] @@ -36,6 +37,7 @@ pub use self::stream::{BufferUnordered, Buffered, ForEachConcurrent}; #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "sink")] +#[cfg_attr(docsrs, doc(cfg(feature = "sink")))] #[cfg(feature = "alloc")] pub use self::stream::{ReuniteError, SplitSink, SplitStream}; @@ -43,10 +45,11 @@ mod try_stream; pub use self::try_stream::{ try_unfold, AndThen, ErrInto, InspectErr, InspectOk, IntoStream, MapErr, MapOk, OrElse, TryCollect, TryConcat, TryFilter, TryFilterMap, TryFlatten, TryFold, TryForEach, TryNext, - TrySkipWhile, TryStreamExt, TryUnfold, + TrySkipWhile, TryStreamExt, TryTakeWhile, TryUnfold, }; #[cfg(feature = "io")] +#[cfg_attr(docsrs, doc(cfg(feature = "io")))] #[cfg(feature = "std")] pub use self::try_stream::IntoAsyncRead; @@ -97,3 +100,12 @@ cfg_target_has_atomic! { #[cfg(feature = "alloc")] pub use self::select_all::{select_all, SelectAll}; } + +// Just a helper function to ensure the futures we're returning all have the +// right implementations. +pub(crate) fn assert_stream<T, S>(stream: S) -> S + where + S: Stream<Item = T>, +{ + stream +} diff --git a/src/stream/once.rs b/src/stream/once.rs index 21cd14b..3a8fef6 100644 --- a/src/stream/once.rs +++ b/src/stream/once.rs @@ -2,7 +2,7 @@ use core::pin::Pin; use futures_core::future::Future; use futures_core::stream::{Stream, FusedStream}; use futures_core::task::{Context, Poll}; -use pin_project::{pin_project, project}; +use pin_project::pin_project; /// Creates a stream of a single element. /// @@ -20,8 +20,6 @@ pub fn once<Fut: Future>(future: Fut) -> Once<Fut> { } /// A stream which emits single element and then EOF. -/// -/// This stream will never block and is always ready. #[pin_project] #[derive(Debug)] #[must_use = "streams do nothing unless polled"] @@ -39,16 +37,14 @@ impl<Fut> Once<Fut> { impl<Fut: Future> Stream for Once<Fut> { type Item = Fut::Output; - #[project] fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { - #[project] - let Once { mut future } = self.project(); - let v = match future.as_mut().as_pin_mut() { + let mut this = self.project(); + let v = match this.future.as_mut().as_pin_mut() { Some(fut) => ready!(fut.poll(cx)), None => return Poll::Ready(None), }; - future.set(None); + this.future.set(None); Poll::Ready(Some(v)) } diff --git a/src/stream/select.rs b/src/stream/select.rs index 36503e4..7666386 100644 --- a/src/stream/select.rs +++ b/src/stream/select.rs @@ -2,7 +2,7 @@ use crate::stream::{StreamExt, Fuse}; use core::pin::Pin; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; -use pin_project::{pin_project, project}; +use pin_project::pin_project; /// Stream for the [`select()`] function. #[pin_project] @@ -58,11 +58,9 @@ impl<St1, St2> Select<St1, St2> { /// /// Note that care must be taken to avoid tampering with the state of the /// stream which may otherwise confuse this combinator. - #[project] pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut St1>, Pin<&mut St2>) { - #[project] - let Select { stream1, stream2, .. } = self.project(); - (stream1.get_pin_mut(), stream2.get_pin_mut()) + let this = self.project(); + (this.stream1.get_pin_mut(), this.stream2.get_pin_mut()) } /// Consumes this combinator, returning the underlying streams. @@ -89,18 +87,15 @@ impl<St1, St2> Stream for Select<St1, St2> { type Item = St1::Item; - #[project] fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<St1::Item>> { - #[project] - let Select { flag, stream1, stream2 } = self.project(); - - if !*flag { - poll_inner(flag, stream1, stream2, cx) + let this = self.project(); + if !*this.flag { + poll_inner(this.flag, this.stream1, this.stream2, cx) } else { - poll_inner(flag, stream2, stream1, cx) + poll_inner(this.flag, this.stream2, this.stream1, cx) } } } diff --git a/src/stream/stream/buffer_unordered.rs b/src/stream/stream/buffer_unordered.rs index a822576..24f4853 100644 --- a/src/stream/stream/buffer_unordered.rs +++ b/src/stream/stream/buffer_unordered.rs @@ -4,7 +4,7 @@ use futures_core::stream::{Stream, FusedStream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_project::{pin_project, project}; +use pin_project::pin_project; use core::fmt; use core::pin::Pin; @@ -62,31 +62,29 @@ where { type Item = <St::Item as Future>::Output; - #[project] fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Self::Item>> { - #[project] - let BufferUnordered { mut stream, in_progress_queue, max } = self.project(); + let mut this = self.project(); // First up, try to spawn off as many futures as possible by filling up // our queue of futures. - while in_progress_queue.len() < *max { - match stream.as_mut().poll_next(cx) { - Poll::Ready(Some(fut)) => in_progress_queue.push(fut), + while this.in_progress_queue.len() < *this.max { + match this.stream.as_mut().poll_next(cx) { + Poll::Ready(Some(fut)) => this.in_progress_queue.push(fut), Poll::Ready(None) | Poll::Pending => break, } } // Attempt to pull the next value from the in_progress_queue - match in_progress_queue.poll_next_unpin(cx) { + match this.in_progress_queue.poll_next_unpin(cx) { x @ Poll::Pending | x @ Poll::Ready(Some(_)) => return x, Poll::Ready(None) => {} } // If more values are still coming from the stream, we're not done yet - if stream.is_done() { + if this.stream.is_done() { Poll::Ready(None) } else { Poll::Pending diff --git a/src/stream/stream/buffered.rs b/src/stream/stream/buffered.rs index 9dff01f..626ead1 100644 --- a/src/stream/stream/buffered.rs +++ b/src/stream/stream/buffered.rs @@ -4,7 +4,7 @@ use futures_core::stream::Stream; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_project::{pin_project, project}; +use pin_project::pin_project; use core::fmt; use core::pin::Pin; @@ -59,31 +59,29 @@ where { type Item = <St::Item as Future>::Output; - #[project] fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Self::Item>> { - #[project] - let Buffered { mut stream, in_progress_queue, max } = self.project(); + let mut this = self.project(); // First up, try to spawn off as many futures as possible by filling up // our queue of futures. - while in_progress_queue.len() < *max { - match stream.as_mut().poll_next(cx) { - Poll::Ready(Some(fut)) => in_progress_queue.push(fut), + while this.in_progress_queue.len() < *this.max { + match this.stream.as_mut().poll_next(cx) { + Poll::Ready(Some(fut)) => this.in_progress_queue.push(fut), Poll::Ready(None) | Poll::Pending => break, } } // Attempt to pull the next value from the in_progress_queue - let res = in_progress_queue.poll_next_unpin(cx); + let res = this.in_progress_queue.poll_next_unpin(cx); if let Some(val) = ready!(res) { return Poll::Ready(Some(val)) } // If more values are still coming from the stream, we're not done yet - if stream.is_done() { + if this.stream.is_done() { Poll::Ready(None) } else { Poll::Pending diff --git a/src/stream/stream/catch_unwind.rs b/src/stream/stream/catch_unwind.rs index 1bb43b2..b9eb4ba 100644 --- a/src/stream/stream/catch_unwind.rs +++ b/src/stream/stream/catch_unwind.rs @@ -1,6 +1,6 @@ use futures_core::stream::{Stream, FusedStream}; use futures_core::task::{Context, Poll}; -use pin_project::{pin_project, project}; +use pin_project::pin_project; use std::any::Any; use std::pin::Pin; use std::panic::{catch_unwind, UnwindSafe, AssertUnwindSafe}; @@ -26,25 +26,23 @@ impl<St: Stream + UnwindSafe> CatchUnwind<St> { impl<St: Stream + UnwindSafe> Stream for CatchUnwind<St> { type Item = Result<St::Item, Box<dyn Any + Send>>; - #[project] fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Self::Item>> { - #[project] - let CatchUnwind { stream, caught_unwind } = self.project(); + let mut this = self.project(); - if *caught_unwind { + if *this.caught_unwind { Poll::Ready(None) } else { let res = catch_unwind(AssertUnwindSafe(|| { - stream.poll_next(cx) + this.stream.as_mut().poll_next(cx) })); match res { Ok(poll) => poll.map(|opt| opt.map(Ok)), Err(e) => { - *caught_unwind = true; + *this.caught_unwind = true; Poll::Ready(Some(Err(e))) }, } diff --git a/src/stream/stream/chain.rs b/src/stream/stream/chain.rs index 720903c..c7fbd5f 100644 --- a/src/stream/stream/chain.rs +++ b/src/stream/stream/chain.rs @@ -1,7 +1,7 @@ use core::pin::Pin; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; -use pin_project::{pin_project, project}; +use pin_project::pin_project; /// Stream for the [`chain`](super::StreamExt::chain) method. #[pin_project] @@ -42,20 +42,18 @@ where St1: Stream, { type Item = St1::Item; - #[project] fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Self::Item>> { - #[project] - let Chain { mut first, second } = self.project(); - if let Some(first) = first.as_mut().as_pin_mut() { + let mut this = self.project(); + if let Some(first) = this.first.as_mut().as_pin_mut() { if let Some(item) = ready!(first.poll_next(cx)) { return Poll::Ready(Some(item)) } } - first.set(None); - second.poll_next(cx) + this.first.set(None); + this.second.poll_next(cx) } fn size_hint(&self) -> (usize, Option<usize>) { diff --git a/src/stream/stream/chunks.rs b/src/stream/stream/chunks.rs index d24c31c..9b4ed93 100644 --- a/src/stream/stream/chunks.rs +++ b/src/stream/stream/chunks.rs @@ -3,7 +3,7 @@ use futures_core::stream::{Stream, FusedStream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_project::{pin_project, project}; +use pin_project::pin_project; use core::mem; use core::pin::Pin; use alloc::vec::Vec; @@ -41,21 +41,19 @@ impl<St: Stream> Chunks<St> where St: Stream { impl<St: Stream> Stream for Chunks<St> { type Item = Vec<St::Item>; - #[project] fn poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Self::Item>> { - #[project] - let Chunks { mut stream, items, cap } = self.as_mut().project(); + let mut this = self.as_mut().project(); loop { - match ready!(stream.as_mut().poll_next(cx)) { + match ready!(this.stream.as_mut().poll_next(cx)) { // Push the item into the buffer and check whether it is full. // If so, replace our buffer with a new and empty one and return // the full one. Some(item) => { - items.push(item); - if items.len() >= *cap { + this.items.push(item); + if this.items.len() >= *this.cap { return Poll::Ready(Some(self.take())) } } @@ -63,10 +61,10 @@ impl<St: Stream> Stream for Chunks<St> { // Since the underlying stream ran out of values, return what we // have buffered, if we have anything. None => { - let last = if items.is_empty() { + let last = if this.items.is_empty() { None } else { - let full_buf = mem::replace(items, Vec::new()); + let full_buf = mem::replace(this.items, Vec::new()); Some(full_buf) }; diff --git a/src/stream/stream/collect.rs b/src/stream/stream/collect.rs index 349e42d..6d07660 100644 --- a/src/stream/stream/collect.rs +++ b/src/stream/stream/collect.rs @@ -3,7 +3,7 @@ use core::pin::Pin; use futures_core::future::{FusedFuture, Future}; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; -use pin_project::{pin_project, project}; +use pin_project::pin_project; /// Future for the [`collect`](super::StreamExt::collect) method. #[pin_project] @@ -43,13 +43,11 @@ where St: Stream, { type Output = C; - #[project] fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<C> { - #[project] - let Collect { mut stream, collection } = self.as_mut().project(); + let mut this = self.as_mut().project(); loop { - match ready!(stream.as_mut().poll_next(cx)) { - Some(e) => collection.extend(Some(e)), + match ready!(this.stream.as_mut().poll_next(cx)) { + Some(e) => this.collection.extend(Some(e)), None => return Poll::Ready(self.finish()), } } diff --git a/src/stream/stream/concat.rs b/src/stream/stream/concat.rs index 647632b..9b37cd2 100644 --- a/src/stream/stream/concat.rs +++ b/src/stream/stream/concat.rs @@ -2,7 +2,7 @@ use core::pin::Pin; use futures_core::future::{Future, FusedFuture}; use futures_core::stream::{Stream, FusedStream}; use futures_core::task::{Context, Poll}; -use pin_project::{pin_project, project}; +use pin_project::pin_project; /// Future for the [`concat`](super::StreamExt::concat) method. #[pin_project] @@ -34,23 +34,21 @@ where St: Stream, { type Output = St::Item; - #[project] fn poll( self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll<Self::Output> { - #[project] - let Concat { mut stream, accum } = self.project(); + let mut this = self.project(); loop { - match ready!(stream.as_mut().poll_next(cx)) { + match ready!(this.stream.as_mut().poll_next(cx)) { None => { - return Poll::Ready(accum.take().unwrap_or_default()) + return Poll::Ready(this.accum.take().unwrap_or_default()) } Some(e) => { - if let Some(a) = accum { + if let Some(a) = this.accum { a.extend(e) } else { - *accum = Some(e) + *this.accum = Some(e) } } } diff --git a/src/stream/stream/enumerate.rs b/src/stream/stream/enumerate.rs index 477a052..4e6bac2 100644 --- a/src/stream/stream/enumerate.rs +++ b/src/stream/stream/enumerate.rs @@ -3,7 +3,7 @@ use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_project::{pin_project, project}; +use pin_project::pin_project; /// Stream for the [`enumerate`](super::StreamExt::enumerate) method. #[pin_project] @@ -35,18 +35,16 @@ impl<St: Stream + FusedStream> FusedStream for Enumerate<St> { impl<St: Stream> Stream for Enumerate<St> { type Item = (usize, St::Item); - #[project] fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Self::Item>> { - #[project] - let Enumerate { stream, count } = self.project(); + let this = self.project(); - match ready!(stream.poll_next(cx)) { + match ready!(this.stream.poll_next(cx)) { Some(item) => { - let prev_count = *count; - *count += 1; + let prev_count = *this.count; + *this.count += 1; Poll::Ready(Some((prev_count, item))) } None => Poll::Ready(None), diff --git a/src/stream/stream/filter.rs b/src/stream/stream/filter.rs index 9d848ad..55493fe 100644 --- a/src/stream/stream/filter.rs +++ b/src/stream/stream/filter.rs @@ -5,7 +5,7 @@ use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_project::{pin_project, project}; +use pin_project::pin_project; use crate::fns::FnMut1; /// Stream for the [`filter`](super::StreamExt::filter) method. @@ -37,6 +37,7 @@ where } } +#[allow(single_use_lifetimes)] // https://github.com/rust-lang/rust/issues/55058 impl<St, Fut, F> Filter<St, Fut, F> where St: Stream, F: for<'a> FnMut1<&'a St::Item, Output=Fut>, @@ -64,6 +65,7 @@ impl<St, Fut, F> FusedStream for Filter<St, Fut, F> } } +#[allow(single_use_lifetimes)] // https://github.com/rust-lang/rust/issues/55058 impl<St, Fut, F> Stream for Filter<St, Fut, F> where St: Stream, F: for<'a> FnMut1<&'a St::Item, Output=Fut>, @@ -71,24 +73,22 @@ impl<St, Fut, F> Stream for Filter<St, Fut, F> { type Item = St::Item; - #[project] fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<St::Item>> { - #[project] - let Filter { mut stream, f, mut pending_fut, pending_item } = self.project(); + let mut this = self.project(); Poll::Ready(loop { - if let Some(fut) = pending_fut.as_mut().as_pin_mut() { + if let Some(fut) = this.pending_fut.as_mut().as_pin_mut() { let res = ready!(fut.poll(cx)); - pending_fut.set(None); + this.pending_fut.set(None); if res { - break pending_item.take(); + break this.pending_item.take(); } - *pending_item = None; - } else if let Some(item) = ready!(stream.as_mut().poll_next(cx)) { - pending_fut.set(Some(f.call_mut(&item))); - *pending_item = Some(item); + *this.pending_item = None; + } else if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) { + this.pending_fut.set(Some(this.f.call_mut(&item))); + *this.pending_item = Some(item); } else { break None; } diff --git a/src/stream/stream/filter_map.rs b/src/stream/stream/filter_map.rs index 2d098ee..50c440f 100644 --- a/src/stream/stream/filter_map.rs +++ b/src/stream/stream/filter_map.rs @@ -5,7 +5,7 @@ use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_project::{pin_project, project}; +use pin_project::pin_project; use crate::fns::FnMut1; /// Stream for the [`filter_map`](super::StreamExt::filter_map) method. @@ -61,24 +61,22 @@ impl<St, Fut, F, T> Stream for FilterMap<St, Fut, F> { type Item = T; - #[project] fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<T>> { - #[project] - let FilterMap { mut stream, f, mut pending } = self.project(); + let mut this = self.project(); Poll::Ready(loop { - if let Some(p) = pending.as_mut().as_pin_mut() { + if let Some(p) = this.pending.as_mut().as_pin_mut() { // We have an item in progress, poll that until it's done let item = ready!(p.poll(cx)); - pending.set(None); + this.pending.set(None); if item.is_some() { break item; } - } else if let Some(item) = ready!(stream.as_mut().poll_next(cx)) { + } else if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) { // No item in progress, but the stream is still going - pending.set(Some(f.call_mut(item))); + this.pending.set(Some(this.f.call_mut(item))); } else { // The stream is done break None; diff --git a/src/stream/stream/flatten.rs b/src/stream/stream/flatten.rs index 4db77e1..75bbc21 100644 --- a/src/stream/stream/flatten.rs +++ b/src/stream/stream/flatten.rs @@ -3,7 +3,7 @@ use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_project::{pin_project, project}; +use pin_project::pin_project; /// Stream for the [`flatten`](super::StreamExt::flatten) method. #[pin_project] @@ -41,19 +41,17 @@ where { type Item = <St::Item as Stream>::Item; - #[project] fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { - #[project] - let Flatten { mut stream, mut next } = self.project(); + let mut this = self.project(); Poll::Ready(loop { - if let Some(s) = next.as_mut().as_pin_mut() { + if let Some(s) = this.next.as_mut().as_pin_mut() { if let Some(item) = ready!(s.poll_next(cx)) { break Some(item); } else { - next.set(None); + this.next.set(None); } - } else if let Some(s) = ready!(stream.as_mut().poll_next(cx)) { - next.set(Some(s)); + } else if let Some(s) = ready!(this.stream.as_mut().poll_next(cx)) { + this.next.set(Some(s)); } else { break None; } diff --git a/src/stream/stream/fold.rs b/src/stream/stream/fold.rs index d4bec25..6fce256 100644 --- a/src/stream/stream/fold.rs +++ b/src/stream/stream/fold.rs @@ -3,7 +3,7 @@ use core::pin::Pin; use futures_core::future::{FusedFuture, Future}; use futures_core::stream::Stream; use futures_core::task::{Context, Poll}; -use pin_project::{pin_project, project}; +use pin_project::pin_project; /// Future for the [`fold`](super::StreamExt::fold) method. #[pin_project] @@ -64,21 +64,19 @@ impl<St, Fut, T, F> Future for Fold<St, Fut, T, F> { type Output = T; - #[project] fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> { - #[project] - let Fold { mut stream, f, accum, mut future } = self.project(); + let mut this = self.project(); Poll::Ready(loop { - if let Some(fut) = future.as_mut().as_pin_mut() { + if let Some(fut) = this.future.as_mut().as_pin_mut() { // we're currently processing a future to produce a new accum value - *accum = Some(ready!(fut.poll(cx))); - future.set(None); - } else if accum.is_some() { + *this.accum = Some(ready!(fut.poll(cx))); + this.future.set(None); + } else if this.accum.is_some() { // we're waiting on a new item from the stream - let res = ready!(stream.as_mut().poll_next(cx)); - let a = accum.take().unwrap(); + let res = ready!(this.stream.as_mut().poll_next(cx)); + let a = this.accum.take().unwrap(); if let Some(item) = res { - future.set(Some(f(a, item))); + this.future.set(Some((this.f)(a, item))); } else { break a; } diff --git a/src/stream/stream/for_each.rs b/src/stream/stream/for_each.rs index fb3f40f..c8af21b 100644 --- a/src/stream/stream/for_each.rs +++ b/src/stream/stream/for_each.rs @@ -3,7 +3,7 @@ use core::pin::Pin; use futures_core::future::{FusedFuture, Future}; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; -use pin_project::{pin_project, project}; +use pin_project::pin_project; /// Future for the [`for_each`](super::StreamExt::for_each) method. #[pin_project] @@ -60,16 +60,14 @@ impl<St, Fut, F> Future for ForEach<St, Fut, F> { type Output = (); - #[project] fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { - #[project] - let ForEach { mut stream, f, mut future } = self.project(); + let mut this = self.project(); loop { - if let Some(fut) = future.as_mut().as_pin_mut() { + if let Some(fut) = this.future.as_mut().as_pin_mut() { ready!(fut.poll(cx)); - future.set(None); - } else if let Some(item) = ready!(stream.as_mut().poll_next(cx)) { - future.set(Some(f(item))); + this.future.set(None); + } else if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) { + this.future.set(Some((this.f)(item))); } else { break; } diff --git a/src/stream/stream/for_each_concurrent.rs b/src/stream/stream/for_each_concurrent.rs index 88ff2d3..843ddaa 100644 --- a/src/stream/stream/for_each_concurrent.rs +++ b/src/stream/stream/for_each_concurrent.rs @@ -5,7 +5,7 @@ use core::num::NonZeroUsize; use futures_core::future::{FusedFuture, Future}; use futures_core::stream::Stream; use futures_core::task::{Context, Poll}; -use pin_project::{pin_project, project}; +use pin_project::pin_project; /// Future for the [`for_each_concurrent`](super::StreamExt::for_each_concurrent) /// method. @@ -66,17 +66,15 @@ impl<St, Fut, F> Future for ForEachConcurrent<St, Fut, F> { type Output = (); - #[project] fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { - #[project] - let ForEachConcurrent { mut stream, f, futures, limit } = self.project(); + let mut this = self.project(); loop { let mut made_progress_this_iter = false; // Check if we've already created a number of futures greater than `limit` - if limit.map(|limit| limit.get() > futures.len()).unwrap_or(true) { + if this.limit.map(|limit| limit.get() > this.futures.len()).unwrap_or(true) { let mut stream_completed = false; - let elem = if let Some(stream) = stream.as_mut().as_pin_mut() { + let elem = if let Some(stream) = this.stream.as_mut().as_pin_mut() { match stream.poll_next(cx) { Poll::Ready(Some(elem)) => { made_progress_this_iter = true; @@ -92,17 +90,17 @@ impl<St, Fut, F> Future for ForEachConcurrent<St, Fut, F> None }; if stream_completed { - stream.set(None); + this.stream.set(None); } if let Some(elem) = elem { - futures.push(f(elem)); + this.futures.push((this.f)(elem)); } } - match futures.poll_next_unpin(cx) { + match this.futures.poll_next_unpin(cx) { Poll::Ready(Some(())) => made_progress_this_iter = true, Poll::Ready(None) => { - if stream.is_none() { + if this.stream.is_none() { return Poll::Ready(()) } }, diff --git a/src/stream/stream/forward.rs b/src/stream/stream/forward.rs index 9776056..feef113 100644 --- a/src/stream/stream/forward.rs +++ b/src/stream/stream/forward.rs @@ -4,10 +4,10 @@ use futures_core::future::{FusedFuture, Future}; use futures_core::stream::Stream; use futures_core::task::{Context, Poll}; use futures_sink::Sink; -use pin_project::{pin_project, project}; +use pin_project::pin_project; /// Future for the [`forward`](super::StreamExt::forward) method. -#[pin_project] +#[pin_project(project = ForwardProj)] #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct Forward<St, Si, Item> { @@ -45,13 +45,11 @@ where { type Output = Result<(), E>; - #[project] fn poll( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Self::Output> { - #[project] - let Forward { mut sink, mut stream, buffered_item } = self.project(); + let ForwardProj { mut sink, mut stream, buffered_item } = self.project(); let mut si = sink.as_mut().as_pin_mut().expect("polled `Forward` after completion"); loop { @@ -61,7 +59,7 @@ where ready!(si.as_mut().poll_ready(cx))?; si.as_mut().start_send(buffered_item.take().unwrap())?; } - + match stream.as_mut().poll_next(cx)? { Poll::Ready(Some(item)) => { *buffered_item = Some(item); diff --git a/src/stream/stream/fuse.rs b/src/stream/stream/fuse.rs index 971fe60..408ad81 100644 --- a/src/stream/stream/fuse.rs +++ b/src/stream/stream/fuse.rs @@ -3,7 +3,7 @@ use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_project::{pin_project, project}; +use pin_project::pin_project; /// Stream for the [`fuse`](super::StreamExt::fuse) method. #[pin_project] @@ -41,21 +41,19 @@ impl<S: Stream> FusedStream for Fuse<S> { impl<S: Stream> Stream for Fuse<S> { type Item = S::Item; - #[project] fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<S::Item>> { - #[project] - let Fuse { stream, done } = self.project(); + let this = self.project(); - if *done { + if *this.done { return Poll::Ready(None); } - let item = ready!(stream.poll_next(cx)); + let item = ready!(this.stream.poll_next(cx)); if item.is_none() { - *done = true; + *this.done = true; } Poll::Ready(item) } diff --git a/src/stream/stream/into_future.rs b/src/stream/stream/into_future.rs index 0d49384..8aa2b1e 100644 --- a/src/stream/stream/into_future.rs +++ b/src/stream/stream/into_future.rs @@ -52,7 +52,7 @@ impl<St: Stream + Unpin> StreamFuture<St> { /// in order to return it to the caller of `Future::poll` if the stream yielded /// an element. pub fn get_pin_mut(self: Pin<&mut Self>) -> Option<Pin<&mut St>> { - Pin::get_mut(self).stream.as_mut().map(Pin::new) + self.get_mut().stream.as_mut().map(Pin::new) } /// Consumes this combinator, returning the underlying stream. diff --git a/src/stream/stream/map.rs b/src/stream/stream/map.rs index 755f53a..c877512 100644 --- a/src/stream/stream/map.rs +++ b/src/stream/stream/map.rs @@ -4,7 +4,7 @@ use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_project::{pin_project, project}; +use pin_project::pin_project; use crate::fns::FnMut1; @@ -51,15 +51,13 @@ impl<St, F> Stream for Map<St, F> { type Item = F::Output; - #[project] fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Self::Item>> { - #[project] - let Map { stream, f } = self.project(); - let res = ready!(stream.poll_next(cx)); - Poll::Ready(res.map(|x| f.call_mut(x))) + let mut this = self.project(); + let res = ready!(this.stream.as_mut().poll_next(cx)); + Poll::Ready(res.map(|x| this.f.call_mut(x))) } fn size_hint(&self) -> (usize, Option<usize>) { diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 359bb2f..f988468 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -3,7 +3,7 @@ //! This module contains a number of functions for working with `Stream`s, //! including the `StreamExt` trait which adds methods to `Stream` types. -use crate::future::Either; +use crate::future::{assert_future, Either}; #[cfg(feature = "alloc")] use alloc::boxed::Box; use core::pin::Pin; @@ -48,7 +48,7 @@ pub use self::filter_map::FilterMap; mod flatten; delegate_all!( - /// Stream for the [`inspect`](StreamExt::inspect) method. + /// Stream for the [`flatten`](StreamExt::flatten) method. Flatten<St>( flatten::Flatten<St, St::Item> ): Debug + Sink + Stream + FusedStream + AccessInner[St, (.)] + New[|x: St| flatten::Flatten::new(x)] @@ -65,6 +65,7 @@ mod forward; #[cfg(feature = "sink")] delegate_all!( /// Future for the [`forward`](super::StreamExt::forward) method. + #[cfg_attr(docsrs, doc(cfg(feature = "sink")))] Forward<St, Si>( forward::Forward<St, Si, St::Ok> ): Debug + Future + FusedFuture + New[|x: St, y: Si| forward::Forward::new(x, y)] @@ -177,9 +178,11 @@ cfg_target_has_atomic! { pub use self::for_each_concurrent::ForEachConcurrent; #[cfg(feature = "sink")] + #[cfg_attr(docsrs, doc(cfg(feature = "sink")))] #[cfg(feature = "alloc")] mod split; #[cfg(feature = "sink")] + #[cfg_attr(docsrs, doc(cfg(feature = "sink")))] #[cfg(feature = "alloc")] #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::split::{SplitStream, SplitSink, ReuniteError}; @@ -190,6 +193,7 @@ mod catch_unwind; #[cfg(feature = "std")] #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::catch_unwind::CatchUnwind; +use crate::stream::assert_stream; impl<T: ?Sized> StreamExt for T where T: Stream {} @@ -223,7 +227,7 @@ pub trait StreamExt: Stream { where Self: Unpin, { - Next::new(self) + assert_future::<Option<Self::Item>, _>(Next::new(self)) } /// Converts this stream into a future of `(next_item, tail_of_stream)`. @@ -258,7 +262,7 @@ pub trait StreamExt: Stream { where Self: Sized + Unpin, { - StreamFuture::new(self) + assert_future::<(Option<Self::Item>, Self), _>(StreamFuture::new(self)) } /// Maps this stream's items to a different type, returning a new stream of @@ -289,7 +293,7 @@ pub trait StreamExt: Stream { F: FnMut(Self::Item) -> T, Self: Sized, { - Map::new(self, f) + assert_stream::<T, _>(Map::new(self, f)) } /// Creates a stream which gives the current iteration count as well as @@ -306,7 +310,7 @@ pub trait StreamExt: Stream { /// # Overflow Behavior /// /// The method does no guarding against overflows, so enumerating more than - /// [`usize::max_value()`] elements either produces the wrong result or panics. If + /// [`prim@usize::max_value()`] elements either produces the wrong result or panics. If /// debug assertions are enabled, a panic is guaranteed. /// /// # Panics @@ -334,7 +338,7 @@ pub trait StreamExt: Stream { where Self: Sized, { - Enumerate::new(self) + assert_stream::<(usize, Self::Item), _>(Enumerate::new(self)) } /// Filters the values produced by this stream according to the provided @@ -369,7 +373,7 @@ pub trait StreamExt: Stream { Fut: Future<Output = bool>, Self: Sized, { - Filter::new(self, f) + assert_stream::<Self::Item, _>(Filter::new(self, f)) } /// Filters the values produced by this stream while simultaneously mapping @@ -403,7 +407,7 @@ pub trait StreamExt: Stream { Fut: Future<Output = Option<T>>, Self: Sized, { - FilterMap::new(self, f) + assert_stream::<T, _>(FilterMap::new(self, f)) } /// Computes from this stream's items new items of a different type using @@ -434,7 +438,7 @@ pub trait StreamExt: Stream { Fut: Future, Self: Sized, { - Then::new(self, f) + assert_stream::<Fut::Output, _>(Then::new(self, f)) } /// Transforms a stream into a collection, returning a @@ -466,7 +470,7 @@ pub trait StreamExt: Stream { where Self: Sized, { - Collect::new(self) + assert_future::<C, _>(Collect::new(self)) } /// Concatenate all items of a stream into a single extendable @@ -506,7 +510,7 @@ pub trait StreamExt: Stream { Self: Sized, Self::Item: Extend<<<Self as Stream>::Item as IntoIterator>::Item> + IntoIterator + Default, { - Concat::new(self) + assert_future::<Self::Item, _>(Concat::new(self)) } /// Execute an accumulating asynchronous computation over a stream, @@ -535,7 +539,7 @@ pub trait StreamExt: Stream { Fut: Future<Output = T>, Self: Sized, { - Fold::new(self, f, init) + assert_future::<T, _>(Fold::new(self, f, init)) } /// Flattens a stream of streams into just one continuous stream. @@ -574,7 +578,7 @@ pub trait StreamExt: Stream { Self::Item: Stream, Self: Sized, { - Flatten::new(self) + assert_stream::<<Self::Item as Stream>::Item, _>(Flatten::new(self)) } /// Maps a stream like [`StreamExt::map`] but flattens nested `Stream`s. @@ -611,7 +615,7 @@ pub trait StreamExt: Stream { FlatMap::new(self, f) } - /// Combinator similar to [`StreamExt::fold`] that holds internal state + /// Combinator similar to [`StreamExt::fold`] that holds internal state /// and produces a new stream. /// /// Accepts initial state and closure which will be applied to each element @@ -672,7 +676,7 @@ pub trait StreamExt: Stream { Fut: Future<Output = bool>, Self: Sized, { - SkipWhile::new(self, f) + assert_stream::<Self::Item, _>(SkipWhile::new(self, f)) } /// Take elements from this stream while the provided asynchronous predicate @@ -702,7 +706,7 @@ pub trait StreamExt: Stream { Fut: Future<Output = bool>, Self: Sized, { - TakeWhile::new(self, f) + assert_stream::<Self::Item, _>(TakeWhile::new(self, f)) } /// Take elements from this stream until the provided future resolves. @@ -788,7 +792,7 @@ pub trait StreamExt: Stream { Fut: Future<Output = ()>, Self: Sized, { - ForEach::new(self, f) + assert_future::<(), _>(ForEach::new(self, f)) } /// Runs this stream to completion, executing the provided asynchronous @@ -848,7 +852,7 @@ pub trait StreamExt: Stream { Fut: Future<Output = ()>, Self: Sized, { - ForEachConcurrent::new(self, limit.into(), f) + assert_future::<(), _>(ForEachConcurrent::new(self, limit.into(), f)) } /// Creates a new stream of at most `n` items of the underlying stream. @@ -871,7 +875,7 @@ pub trait StreamExt: Stream { where Self: Sized, { - Take::new(self, n) + assert_stream::<Self::Item, _>(Take::new(self, n)) } /// Creates a new stream which skips `n` items of the underlying stream. @@ -894,7 +898,7 @@ pub trait StreamExt: Stream { where Self: Sized, { - Skip::new(self, n) + assert_stream::<Self::Item, _>(Skip::new(self, n)) } /// Fuse a stream such that [`poll_next`](Stream::poll_next) will never @@ -940,7 +944,7 @@ pub trait StreamExt: Stream { where Self: Sized, { - Fuse::new(self) + assert_stream::<Self::Item, _>(Fuse::new(self)) } /// Borrows a stream, rather than consuming it. @@ -1018,7 +1022,7 @@ pub trait StreamExt: Stream { where Self: Sized + std::panic::UnwindSafe, { - CatchUnwind::new(self) + assert_stream(CatchUnwind::new(self)) } /// Wrap the stream in a Box, pinning it. @@ -1030,7 +1034,7 @@ pub trait StreamExt: Stream { where Self: Sized + Send + 'a, { - Box::pin(self) + assert_stream::<Self::Item, _>(Box::pin(self)) } /// Wrap the stream in a Box, pinning it. @@ -1044,7 +1048,7 @@ pub trait StreamExt: Stream { where Self: Sized + 'a, { - Box::pin(self) + assert_stream::<Self::Item, _>(Box::pin(self)) } /// An adaptor for creating a buffered list of pending futures. @@ -1066,7 +1070,7 @@ pub trait StreamExt: Stream { Self::Item: Future, Self: Sized, { - Buffered::new(self, n) + assert_stream::<<Self::Item as Future>::Output, _>(Buffered::new(self, n)) } /// An adaptor for creating a buffered list of pending futures (unordered). @@ -1111,7 +1115,7 @@ pub trait StreamExt: Stream { Self::Item: Future, Self: Sized, { - BufferUnordered::new(self, n) + assert_stream::<<Self::Item as Future>::Output, _>(BufferUnordered::new(self, n)) } /// An adapter for zipping two streams together. @@ -1141,7 +1145,7 @@ pub trait StreamExt: Stream { St: Stream, Self: Sized, { - Zip::new(self, other) + assert_stream::<(Self::Item, St::Item), _>(Zip::new(self, other)) } /// Adapter for chaining two streams. @@ -1172,7 +1176,7 @@ pub trait StreamExt: Stream { St: Stream<Item = Self::Item>, Self: Sized, { - Chain::new(self, other) + assert_stream::<Self::Item, _>(Chain::new(self, other)) } /// Creates a new stream which exposes a `peek` method. @@ -1182,7 +1186,7 @@ pub trait StreamExt: Stream { where Self: Sized, { - Peekable::new(self) + assert_stream::<Self::Item, _>(Peekable::new(self)) } /// An adaptor for chunking up items of the stream inside a vector. @@ -1208,7 +1212,7 @@ pub trait StreamExt: Stream { where Self: Sized, { - Chunks::new(self, capacity) + assert_stream::<alloc::vec::Vec<Self::Item>, _>(Chunks::new(self, capacity)) } /// An adaptor for chunking up ready items of the stream inside a vector. @@ -1248,6 +1252,7 @@ pub trait StreamExt: Stream { /// (for example, via `forward(&mut sink)` inside an `async` fn/block) in /// order to preserve access to the `Sink`. #[cfg(feature = "sink")] + #[cfg_attr(docsrs, doc(cfg(feature = "sink")))] fn forward<S>(self, sink: S) -> Forward<Self, S> where S: Sink<Self::Ok, Error = Self::Error>, @@ -1266,13 +1271,15 @@ pub trait StreamExt: Stream { /// This method is only available when the `std` or `alloc` feature of this /// library is activated, and it is activated by default. #[cfg(feature = "sink")] + #[cfg_attr(docsrs, doc(cfg(feature = "sink")))] #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] fn split<Item>(self) -> (SplitSink<Self, Item>, SplitStream<Self>) where Self: Sink<Item> + Sized, { - split::split(self) + let (sink, stream) = split::split(self); + (sink, assert_stream::<Self::Item, _>(stream)) } /// Do something with each item of this stream, afterwards passing it on. @@ -1285,7 +1292,7 @@ pub trait StreamExt: Stream { F: FnMut(&Self::Item), Self: Sized, { - Inspect::new(self, f) + assert_stream::<Self::Item, _>(Inspect::new(self, f)) } /// Wrap this stream in an `Either` stream, making it the left-hand variant @@ -1298,7 +1305,7 @@ pub trait StreamExt: Stream { B: Stream<Item = Self::Item>, Self: Sized, { - Either::Left(self) + assert_stream::<Self::Item, _>(Either::Left(self)) } /// Wrap this stream in an `Either` stream, making it the right-hand variant @@ -1311,7 +1318,7 @@ pub trait StreamExt: Stream { B: Stream<Item = Self::Item>, Self: Sized, { - Either::Right(self) + assert_stream::<Self::Item, _>(Either::Right(self)) } /// A convenience method for calling [`Stream::poll_next`] on [`Unpin`] diff --git a/src/stream/stream/peek.rs b/src/stream/stream/peek.rs index fb0f874..1d8c342 100644 --- a/src/stream/stream/peek.rs +++ b/src/stream/stream/peek.rs @@ -6,7 +6,7 @@ use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_project::{pin_project, project}; +use pin_project::pin_project; /// A `Stream` that implements a `peek` method. /// @@ -42,19 +42,17 @@ impl<St: Stream> Peekable<St> { /// /// This method polls the underlying stream and return either a reference /// to the next item if the stream is ready or passes through any errors. - #[project] pub fn poll_peek( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<&St::Item>> { - #[project] - let Peekable { mut stream, peeked } = self.project(); + let mut this = self.project(); Poll::Ready(loop { - if peeked.is_some() { - break peeked.as_ref(); - } else if let Some(item) = ready!(stream.as_mut().poll_next(cx)) { - *peeked = Some(item); + if this.peeked.is_some() { + break this.peeked.as_ref(); + } else if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) { + *this.peeked = Some(item); } else { break None; } @@ -71,14 +69,12 @@ impl<St: Stream> FusedStream for Peekable<St> { impl<S: Stream> Stream for Peekable<S> { type Item = S::Item; - #[project] fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { - #[project] - let Peekable { stream, peeked } = self.project(); - if let Some(item) = peeked.take() { + let this = self.project(); + if let Some(item) = this.peeked.take() { return Poll::Ready(Some(item)); } - stream.poll_next(cx) + this.stream.poll_next(cx) } fn size_hint(&self) -> (usize, Option<usize>) { diff --git a/src/stream/stream/ready_chunks.rs b/src/stream/stream/ready_chunks.rs index 2152cb7..192d3c6 100644 --- a/src/stream/stream/ready_chunks.rs +++ b/src/stream/stream/ready_chunks.rs @@ -3,7 +3,7 @@ use futures_core::stream::{Stream, FusedStream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_project::{pin_project, project}; +use pin_project::pin_project; use core::mem; use core::pin::Pin; use alloc::vec::Vec; @@ -36,23 +36,21 @@ impl<St: Stream> ReadyChunks<St> where St: Stream { impl<St: Stream> Stream for ReadyChunks<St> { type Item = Vec<St::Item>; - #[project] fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Self::Item>> { - #[project] - let ReadyChunks { items, cap, mut stream } = self.project(); + let mut this = self.project(); loop { - match stream.as_mut().poll_next(cx) { + match this.stream.as_mut().poll_next(cx) { // Flush all collected data if underlying stream doesn't contain // more ready values Poll::Pending => { - return if items.is_empty() { + return if this.items.is_empty() { Poll::Pending } else { - Poll::Ready(Some(mem::replace(items, Vec::with_capacity(*cap)))) + Poll::Ready(Some(mem::replace(this.items, Vec::with_capacity(*this.cap)))) } } @@ -60,19 +58,19 @@ impl<St: Stream> Stream for ReadyChunks<St> { // If so, replace our buffer with a new and empty one and return // the full one. Poll::Ready(Some(item)) => { - items.push(item); - if items.len() >= *cap { - return Poll::Ready(Some(mem::replace(items, Vec::with_capacity(*cap)))) + this.items.push(item); + if this.items.len() >= *this.cap { + return Poll::Ready(Some(mem::replace(this.items, Vec::with_capacity(*this.cap)))) } } // Since the underlying stream ran out of values, return what we // have buffered, if we have anything. Poll::Ready(None) => { - let last = if items.is_empty() { + let last = if this.items.is_empty() { None } else { - let full_buf = mem::replace(items, Vec::new()); + let full_buf = mem::replace(this.items, Vec::new()); Some(full_buf) }; diff --git a/src/stream/stream/scan.rs b/src/stream/stream/scan.rs index 0cdfcbc..dd0316d 100644 --- a/src/stream/stream/scan.rs +++ b/src/stream/stream/scan.rs @@ -5,7 +5,7 @@ use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_project::{pin_project, project}; +use pin_project::pin_project; struct StateFn<S, F> { state: S, @@ -75,28 +75,26 @@ where { type Item = B; - #[project] fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<B>> { if self.is_done_taking() { return Poll::Ready(None); } - #[project] - let Scan { mut stream, state_f, mut future } = self.project(); + let mut this = self.project(); Poll::Ready(loop { - if let Some(fut) = future.as_mut().as_pin_mut() { + if let Some(fut) = this.future.as_mut().as_pin_mut() { let item = ready!(fut.poll(cx)); - future.set(None); + this.future.set(None); if item.is_none() { - *state_f = None; + *this.state_f = None; } break item; - } else if let Some(item) = ready!(stream.as_mut().poll_next(cx)) { - let state_f = state_f.as_mut().unwrap(); - future.set(Some((state_f.f)(&mut state_f.state, item))) + } else if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) { + let state_f = this.state_f.as_mut().unwrap(); + this.future.set(Some((state_f.f)(&mut state_f.state, item))) } else { break None; } diff --git a/src/stream/stream/skip.rs b/src/stream/stream/skip.rs index c0f6611..ea31b17 100644 --- a/src/stream/stream/skip.rs +++ b/src/stream/stream/skip.rs @@ -3,7 +3,7 @@ use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_project::{pin_project, project}; +use pin_project::pin_project; /// Stream for the [`skip`](super::StreamExt::skip) method. #[pin_project] @@ -35,22 +35,21 @@ impl<St: FusedStream> FusedStream for Skip<St> { impl<St: Stream> Stream for Skip<St> { type Item = St::Item; - #[project] fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<St::Item>> { - #[project] - let Skip { mut stream, remaining } = self.project(); - while *remaining > 0 { - if ready!(stream.as_mut().poll_next(cx)).is_some() { - *remaining -= 1; + let mut this = self.project(); + + while *this.remaining > 0 { + if ready!(this.stream.as_mut().poll_next(cx)).is_some() { + *this.remaining -= 1; } else { return Poll::Ready(None); } } - stream.poll_next(cx) + this.stream.poll_next(cx) } fn size_hint(&self) -> (usize, Option<usize>) { diff --git a/src/stream/stream/skip_while.rs b/src/stream/stream/skip_while.rs index 3d664f2..2c58102 100644 --- a/src/stream/stream/skip_while.rs +++ b/src/stream/stream/skip_while.rs @@ -5,7 +5,7 @@ use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_project::{pin_project, project}; +use pin_project::pin_project; /// Stream for the [`skip_while`](super::StreamExt::skip_while) method. #[pin_project] @@ -71,30 +71,28 @@ impl<St, Fut, F> Stream for SkipWhile<St, Fut, F> { type Item = St::Item; - #[project] fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<St::Item>> { - #[project] - let SkipWhile { mut stream, f, mut pending_fut, pending_item, done_skipping } = self.project(); + let mut this = self.project(); - if *done_skipping { - return stream.poll_next(cx); + if *this.done_skipping { + return this.stream.poll_next(cx); } Poll::Ready(loop { - if let Some(fut) = pending_fut.as_mut().as_pin_mut() { + if let Some(fut) = this.pending_fut.as_mut().as_pin_mut() { let skipped = ready!(fut.poll(cx)); - let item = pending_item.take(); - pending_fut.set(None); + let item = this.pending_item.take(); + this.pending_fut.set(None); if !skipped { - *done_skipping = true; + *this.done_skipping = true; break item; } - } else if let Some(item) = ready!(stream.as_mut().poll_next(cx)) { - pending_fut.set(Some(f(&item))); - *pending_item = Some(item); + } else if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) { + this.pending_fut.set(Some((this.f)(&item))); + *this.pending_item = Some(item); } else { break None; } diff --git a/src/stream/stream/split.rs b/src/stream/stream/split.rs index c7237a2..991eb16 100644 --- a/src/stream/stream/split.rs +++ b/src/stream/stream/split.rs @@ -9,6 +9,7 @@ use crate::lock::BiLock; /// A `Stream` part of the split pair #[derive(Debug)] #[must_use = "streams do nothing unless polled"] +#[cfg_attr(docsrs, doc(cfg(feature = "sink")))] pub struct SplitStream<S>(BiLock<S>); impl<S> Unpin for SplitStream<S> {} @@ -43,6 +44,7 @@ fn SplitSink<S: Sink<Item>, Item>(lock: BiLock<S>) -> SplitSink<S, Item> { /// A `Sink` part of the split pair #[derive(Debug)] #[must_use = "sinks do nothing unless polled"] +#[cfg_attr(docsrs, doc(cfg(feature = "sink")))] pub struct SplitSink<S, Item> { lock: BiLock<S>, slot: Option<Item>, @@ -119,6 +121,7 @@ pub(super) fn split<S: Stream + Sink<Item>, Item>(s: S) -> (SplitSink<S, Item>, /// Error indicating a `SplitSink<S>` and `SplitStream<S>` were not two halves /// of a `Stream + Split`, and thus could not be `reunite`d. +#[cfg_attr(docsrs, doc(cfg(feature = "sink")))] pub struct ReuniteError<T, Item>(pub SplitSink<T, Item>, pub SplitStream<T>); impl<T, Item> fmt::Debug for ReuniteError<T, Item> { diff --git a/src/stream/stream/take.rs b/src/stream/stream/take.rs index 4a68920..1956a82 100644 --- a/src/stream/stream/take.rs +++ b/src/stream/stream/take.rs @@ -4,7 +4,7 @@ use futures_core::stream::{Stream, FusedStream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_project::{pin_project, project}; +use pin_project::pin_project; /// Stream for the [`take`](super::StreamExt::take) method. #[pin_project] @@ -32,7 +32,6 @@ impl<St> Stream for Take<St> { type Item = St::Item; - #[project] fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -40,13 +39,12 @@ impl<St> Stream for Take<St> if self.remaining == 0 { Poll::Ready(None) } else { - #[project] - let Take { stream, remaining } = self.project(); - let next = ready!(stream.poll_next(cx)); + let this = self.project(); + let next = ready!(this.stream.poll_next(cx)); if next.is_some() { - *remaining -= 1; + *this.remaining -= 1; } else { - *remaining = 0; + *this.remaining = 0; } Poll::Ready(next) } diff --git a/src/stream/stream/take_until.rs b/src/stream/stream/take_until.rs index 3662620..4e32ad8 100644 --- a/src/stream/stream/take_until.rs +++ b/src/stream/stream/take_until.rs @@ -5,7 +5,7 @@ use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_project::{pin_project, project}; +use pin_project::pin_project; // FIXME: docs, tests @@ -121,26 +121,24 @@ where { type Item = St::Item; - #[project] fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St::Item>> { - #[project] - let TakeUntil { stream, mut fut, fut_result, free } = self.project(); + let mut this = self.project(); - if let Some(f) = fut.as_mut().as_pin_mut() { + if let Some(f) = this.fut.as_mut().as_pin_mut() { if let Poll::Ready(result) = f.poll(cx) { - fut.set(None); - *fut_result = Some(result); + this.fut.set(None); + *this.fut_result = Some(result); } } - if !*free && fut.is_none() { + if !*this.free && this.fut.is_none() { // Future resolved, inner stream stopped Poll::Ready(None) } else { // Future either not resolved yet or taken out by the user - let item = ready!(stream.poll_next(cx)); + let item = ready!(this.stream.poll_next(cx)); if item.is_none() { - fut.set(None); + this.fut.set(None); } Poll::Ready(item) } diff --git a/src/stream/stream/take_while.rs b/src/stream/stream/take_while.rs index d90061e..e3a6d00 100644 --- a/src/stream/stream/take_while.rs +++ b/src/stream/stream/take_while.rs @@ -5,7 +5,7 @@ use futures_core::stream::{Stream, FusedStream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_project::{pin_project, project}; +use pin_project::pin_project; /// Stream for the [`take_while`](super::StreamExt::take_while) method. #[pin_project] @@ -61,7 +61,6 @@ impl<St, Fut, F> Stream for TakeWhile<St, Fut, F> { type Item = St::Item; - #[project] fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -70,23 +69,22 @@ impl<St, Fut, F> Stream for TakeWhile<St, Fut, F> return Poll::Ready(None); } - #[project] - let TakeWhile { mut stream, f, mut pending_fut, pending_item, done_taking } = self.project(); + let mut this = self.project(); Poll::Ready(loop { - if let Some(fut) = pending_fut.as_mut().as_pin_mut() { + if let Some(fut) = this.pending_fut.as_mut().as_pin_mut() { let take = ready!(fut.poll(cx)); - let item = pending_item.take(); - pending_fut.set(None); + let item = this.pending_item.take(); + this.pending_fut.set(None); if take { break item; } else { - *done_taking = true; + *this.done_taking = true; break None; } - } else if let Some(item) = ready!(stream.as_mut().poll_next(cx)) { - pending_fut.set(Some(f(&item))); - *pending_item = Some(item); + } else if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) { + this.pending_fut.set(Some((this.f)(&item))); + *this.pending_item = Some(item); } else { break None; } diff --git a/src/stream/stream/then.rs b/src/stream/stream/then.rs index d54512e..1334a6c 100644 --- a/src/stream/stream/then.rs +++ b/src/stream/stream/then.rs @@ -5,7 +5,7 @@ use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_project::{pin_project, project}; +use pin_project::pin_project; /// Stream for the [`then`](super::StreamExt::then) method. #[pin_project] @@ -63,21 +63,19 @@ impl<St, Fut, F> Stream for Then<St, Fut, F> { type Item = Fut::Output; - #[project] fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll<Option<Fut::Output>> { - #[project] - let Then { mut stream, f, mut future } = self.project(); + ) -> Poll<Option<Self::Item>> { + let mut this = self.project(); Poll::Ready(loop { - if let Some(fut) = future.as_mut().as_pin_mut() { + if let Some(fut) = this.future.as_mut().as_pin_mut() { let item = ready!(fut.poll(cx)); - future.set(None); + this.future.set(None); break Some(item); - } else if let Some(item) = ready!(stream.as_mut().poll_next(cx)) { - future.set(Some(f(item))); + } else if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) { + this.future.set(Some((this.f)(item))); } else { break None; } diff --git a/src/stream/stream/zip.rs b/src/stream/stream/zip.rs index 6c148fb..522f7e1 100644 --- a/src/stream/stream/zip.rs +++ b/src/stream/stream/zip.rs @@ -3,7 +3,7 @@ use core::cmp; use core::pin::Pin; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; -use pin_project::{pin_project, project}; +use pin_project::pin_project; /// Stream for the [`zip`](super::StreamExt::zip) method. #[pin_project] @@ -49,10 +49,8 @@ impl<St1: Stream, St2: Stream> Zip<St1, St2> { /// Note that care must be taken to avoid tampering with the state of the /// stream which may otherwise confuse this combinator. pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut St1>, Pin<&mut St2>) { - unsafe { - let Self { stream1, stream2, .. } = self.get_unchecked_mut(); - (Pin::new_unchecked(stream1).get_pin_mut(), Pin::new_unchecked(stream2).get_pin_mut()) - } + let this = self.project(); + (this.stream1.get_pin_mut(), this.stream2.get_pin_mut()) } /// Consumes this combinator, returning the underlying streams. @@ -77,31 +75,29 @@ impl<St1, St2> Stream for Zip<St1, St2> { type Item = (St1::Item, St2::Item); - #[project] fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Self::Item>> { - #[project] - let Zip { mut stream1, mut stream2, queued1, queued2 } = self.project(); + let mut this = self.project(); - if queued1.is_none() { - match stream1.as_mut().poll_next(cx) { - Poll::Ready(Some(item1)) => *queued1 = Some(item1), + if this.queued1.is_none() { + match this.stream1.as_mut().poll_next(cx) { + Poll::Ready(Some(item1)) => *this.queued1 = Some(item1), Poll::Ready(None) | Poll::Pending => {} } } - if queued2.is_none() { - match stream2.as_mut().poll_next(cx) { - Poll::Ready(Some(item2)) => *queued2 = Some(item2), + if this.queued2.is_none() { + match this.stream2.as_mut().poll_next(cx) { + Poll::Ready(Some(item2)) => *this.queued2 = Some(item2), Poll::Ready(None) | Poll::Pending => {} } } - if queued1.is_some() && queued2.is_some() { - let pair = (queued1.take().unwrap(), queued2.take().unwrap()); + if this.queued1.is_some() && this.queued2.is_some() { + let pair = (this.queued1.take().unwrap(), this.queued2.take().unwrap()); Poll::Ready(Some(pair)) - } else if stream1.is_done() || stream2.is_done() { + } else if this.stream1.is_done() || this.stream2.is_done() { Poll::Ready(None) } else { Poll::Pending diff --git a/src/stream/try_stream/and_then.rs b/src/stream/try_stream/and_then.rs index 563ed34..4d24c4f 100644 --- a/src/stream/try_stream/and_then.rs +++ b/src/stream/try_stream/and_then.rs @@ -5,7 +5,7 @@ use futures_core::stream::{Stream, TryStream, FusedStream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_project::{pin_project, project}; +use pin_project::pin_project; /// Stream for the [`and_then`](super::TryStreamExt::and_then) method. #[pin_project] @@ -50,21 +50,19 @@ impl<St, Fut, F> Stream for AndThen<St, Fut, F> { type Item = Result<Fut::Ok, St::Error>; - #[project] fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Self::Item>> { - #[project] - let AndThen { mut stream, mut future, f } = self.project(); + let mut this = self.project(); Poll::Ready(loop { - if let Some(fut) = future.as_mut().as_pin_mut() { + if let Some(fut) = this.future.as_mut().as_pin_mut() { let item = ready!(fut.try_poll(cx)); - future.set(None); + this.future.set(None); break Some(item); - } else if let Some(item) = ready!(stream.as_mut().try_poll_next(cx)?) { - future.set(Some(f(item))); + } else if let Some(item) = ready!(this.stream.as_mut().try_poll_next(cx)?) { + this.future.set(Some((this.f)(item))); } else { break None; } diff --git a/src/stream/try_stream/into_async_read.rs b/src/stream/try_stream/into_async_read.rs index 71bbfd1..10291b0 100644 --- a/src/stream/try_stream/into_async_read.rs +++ b/src/stream/try_stream/into_async_read.rs @@ -9,6 +9,7 @@ use std::io::{Error, Result}; /// Reader for the [`into_async_read`](super::TryStreamExt::into_async_read) method. #[derive(Debug)] #[must_use = "readers do nothing unless polled"] +#[cfg_attr(docsrs, doc(cfg(feature = "io")))] pub struct IntoAsyncRead<St> where St: TryStream<Error = Error> + Unpin, diff --git a/src/stream/try_stream/mod.rs b/src/stream/try_stream/mod.rs index 99d5a6d..0c8e108 100644 --- a/src/stream/try_stream/mod.rs +++ b/src/stream/try_stream/mod.rs @@ -103,6 +103,10 @@ mod try_skip_while; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::try_skip_while::TrySkipWhile; +mod try_take_while; +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::try_take_while::TryTakeWhile; + cfg_target_has_atomic! { #[cfg(feature = "alloc")] mod try_buffer_unordered; @@ -121,9 +125,12 @@ cfg_target_has_atomic! { #[cfg(feature = "std")] mod into_async_read; #[cfg(feature = "io")] +#[cfg_attr(docsrs, doc(cfg(feature = "io")))] #[cfg(feature = "std")] #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::into_async_read::IntoAsyncRead; +use crate::future::assert_future; +use crate::stream::assert_stream; impl<S: ?Sized + TryStream> TryStreamExt for S {} @@ -151,7 +158,7 @@ pub trait TryStreamExt: TryStream { Self: Sized, Self::Error: Into<E>, { - ErrInto::new(self) + assert_stream::<Result<Self::Ok, E>, _>(ErrInto::new(self)) } /// Wraps the current stream in a new stream which maps the success value @@ -176,7 +183,7 @@ pub trait TryStreamExt: TryStream { Self: Sized, F: FnMut(Self::Ok) -> T, { - MapOk::new(self, f) + assert_stream::<Result<T, Self::Error>, _>(MapOk::new(self, f)) } /// Wraps the current stream in a new stream which maps the error value @@ -201,7 +208,7 @@ pub trait TryStreamExt: TryStream { Self: Sized, F: FnMut(Self::Error) -> E, { - MapErr::new(self, f) + assert_stream::<Result<Self::Ok, E>, _>(MapErr::new(self, f)) } /// Chain on a computation for when a value is ready, passing the successful @@ -248,7 +255,7 @@ pub trait TryStreamExt: TryStream { Fut: TryFuture<Error = Self::Error>, Self: Sized, { - AndThen::new(self, f) + assert_stream::<Result<Fut::Ok, Fut::Error>, _>(AndThen::new(self, f)) } /// Chain on a computation for when an error happens, passing the @@ -274,7 +281,7 @@ pub trait TryStreamExt: TryStream { Fut: TryFuture<Ok = Self::Ok>, Self: Sized, { - OrElse::new(self, f) + assert_stream::<Result<Self::Ok, Fut::Error>, _>(OrElse::new(self, f)) } /// Do something with the success value of this stream, afterwards passing @@ -288,7 +295,7 @@ pub trait TryStreamExt: TryStream { F: FnMut(&Self::Ok), Self: Sized, { - InspectOk::new(self, f) + assert_stream::<Result<Self::Ok, Self::Error>, _>(InspectOk::new(self, f)) } /// Do something with the error value of this stream, afterwards passing it on. @@ -301,7 +308,7 @@ pub trait TryStreamExt: TryStream { F: FnMut(&Self::Error), Self: Sized, { - InspectErr::new(self, f) + assert_stream::<Result<Self::Ok, Self::Error>, _>(InspectErr::new(self, f)) } /// Wraps a [`TryStream`] into a type that implements @@ -329,7 +336,7 @@ pub trait TryStreamExt: TryStream { where Self: Sized, { - IntoStream::new(self) + assert_stream::<Result<Self::Ok, Self::Error>, _>(IntoStream::new(self)) } /// Creates a future that attempts to resolve the next item in the stream. @@ -356,7 +363,7 @@ pub trait TryStreamExt: TryStream { where Self: Unpin, { - TryNext::new(self) + assert_future::<Result<Option<Self::Ok>, Self::Error>, _>(TryNext::new(self)) } /// Attempts to run this stream to completion, executing the provided @@ -398,7 +405,7 @@ pub trait TryStreamExt: TryStream { Fut: TryFuture<Ok = (), Error = Self::Error>, Self: Sized, { - TryForEach::new(self, f) + assert_future::<Result<(), Self::Error>, _>(TryForEach::new(self, f)) } /// Skip elements on this stream while the provided asynchronous predicate @@ -428,7 +435,37 @@ pub trait TryStreamExt: TryStream { Fut: TryFuture<Ok = bool, Error = Self::Error>, Self: Sized, { - TrySkipWhile::new(self, f) + assert_stream::<Result<Self::Ok, Self::Error>, _>(TrySkipWhile::new(self, f)) + } + + /// Take elements on this stream while the provided asynchronous predicate + /// resolves to `true`. + /// + /// This function is similar to + /// [`StreamExt::take_while`](crate::stream::StreamExt::take_while) but exits + /// early if an error occurs. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::future; + /// use futures::stream::{self, TryStreamExt}; + /// + /// let stream = stream::iter(vec![Ok::<i32, i32>(1), Ok(2), Ok(3), Ok(2)]); + /// let stream = stream.try_take_while(|x| future::ready(Ok(*x < 3))); + /// + /// let output: Result<Vec<i32>, i32> = stream.try_collect().await; + /// assert_eq!(output, Ok(vec![1, 2])); + /// # }) + /// ``` + fn try_take_while<Fut, F>(self, f: F) -> TryTakeWhile<Self, Fut, F> + where + F: FnMut(&Self::Ok) -> Fut, + Fut: TryFuture<Ok = bool, Error = Self::Error>, + Self: Sized, + { + TryTakeWhile::new(self, f) } /// Attempts to run this stream to completion, executing the provided asynchronous @@ -484,7 +521,11 @@ pub trait TryStreamExt: TryStream { Fut: Future<Output = Result<(), Self::Error>>, Self: Sized, { - TryForEachConcurrent::new(self, limit.into(), f) + assert_future::<Result<(), Self::Error>, _>(TryForEachConcurrent::new( + self, + limit.into(), + f, + )) } /// Attempt to transform a stream into a collection, @@ -521,7 +562,7 @@ pub trait TryStreamExt: TryStream { where Self: Sized, { - TryCollect::new(self) + assert_future::<Result<C, Self::Error>, _>(TryCollect::new(self)) } /// Attempt to filter the values produced by this stream according to the @@ -560,7 +601,7 @@ pub trait TryStreamExt: TryStream { F: FnMut(&Self::Ok) -> Fut, Self: Sized, { - TryFilter::new(self, f) + assert_stream::<Result<Self::Ok, Self::Error>, _>(TryFilter::new(self, f)) } /// Attempt to filter the values produced by this stream while @@ -601,7 +642,7 @@ pub trait TryStreamExt: TryStream { F: FnMut(Self::Ok) -> Fut, Self: Sized, { - TryFilterMap::new(self, f) + assert_stream::<Result<T, Self::Error>, _>(TryFilterMap::new(self, f)) } /// Flattens a stream of streams into just one continuous stream. @@ -648,7 +689,9 @@ pub trait TryStreamExt: TryStream { <Self::Ok as TryStream>::Error: From<Self::Error>, Self: Sized, { - TryFlatten::new(self) + assert_stream::<Result<<Self::Ok as TryStream>::Ok, <Self::Ok as TryStream>::Error>, _>( + TryFlatten::new(self), + ) } /// Attempt to execute an accumulating asynchronous computation over a @@ -685,7 +728,7 @@ pub trait TryStreamExt: TryStream { Fut: TryFuture<Ok = T, Error = Self::Error>, Self: Sized, { - TryFold::new(self, f, init) + assert_future::<Result<T, Self::Error>, _>(TryFold::new(self, f, init)) } /// Attempt to concatenate all items of a stream into a single @@ -727,7 +770,7 @@ pub trait TryStreamExt: TryStream { Self: Sized, Self::Ok: Extend<<<Self as TryStream>::Ok as IntoIterator>::Item> + IntoIterator + Default, { - TryConcat::new(self) + assert_future::<Result<Self::Ok, Self::Error>, _>(TryConcat::new(self)) } /// Attempt to execute several futures from a stream concurrently. @@ -794,7 +837,9 @@ pub trait TryStreamExt: TryStream { Self::Ok: TryFuture<Error = Self::Error>, Self: Sized, { - TryBufferUnordered::new(self, n) + assert_stream::<Result<<Self::Ok as TryFuture>::Ok, Self::Error>, _>( + TryBufferUnordered::new(self, n), + ) } // TODO: false positive warning from rustdoc. Verify once #43466 settles @@ -831,6 +876,7 @@ pub trait TryStreamExt: TryStream { /// # assert_eq!(42, futures::executor::block_on(rx).unwrap()); /// ``` #[cfg(feature = "compat")] + #[cfg_attr(docsrs, doc(cfg(feature = "compat")))] fn compat(self) -> Compat<Self> where Self: Sized + Unpin, @@ -864,6 +910,7 @@ pub trait TryStreamExt: TryStream { /// # }) /// ``` #[cfg(feature = "io")] + #[cfg_attr(docsrs, doc(cfg(feature = "io")))] #[cfg(feature = "std")] fn into_async_read(self) -> IntoAsyncRead<Self> where diff --git a/src/stream/try_stream/or_else.rs b/src/stream/try_stream/or_else.rs index 0bba0d0..d972c0f 100644 --- a/src/stream/try_stream/or_else.rs +++ b/src/stream/try_stream/or_else.rs @@ -5,7 +5,7 @@ use futures_core::stream::{Stream, TryStream, FusedStream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_project::{pin_project, project}; +use pin_project::pin_project; /// Stream for the [`or_else`](super::TryStreamExt::or_else) method. #[pin_project] @@ -50,24 +50,22 @@ impl<St, Fut, F> Stream for OrElse<St, Fut, F> { type Item = Result<St::Ok, Fut::Error>; - #[project] fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Self::Item>> { - #[project] - let OrElse { mut stream, mut future, f } = self.project(); + let mut this = self.project(); Poll::Ready(loop { - if let Some(fut) = future.as_mut().as_pin_mut() { + if let Some(fut) = this.future.as_mut().as_pin_mut() { let item = ready!(fut.try_poll(cx)); - future.set(None); + this.future.set(None); break Some(item); } else { - match ready!(stream.as_mut().try_poll_next(cx)) { + match ready!(this.stream.as_mut().try_poll_next(cx)) { Some(Ok(item)) => break Some(Ok(item)), Some(Err(e)) => { - future.set(Some(f(e))); + this.future.set(Some((this.f)(e))); }, None => break None, } diff --git a/src/stream/try_stream/try_buffer_unordered.rs b/src/stream/try_stream/try_buffer_unordered.rs index 566868b..c1c931a 100644 --- a/src/stream/try_stream/try_buffer_unordered.rs +++ b/src/stream/try_stream/try_buffer_unordered.rs @@ -5,7 +5,7 @@ use futures_core::stream::{Stream, TryStream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_project::{pin_project, project}; +use pin_project::pin_project; use core::pin::Pin; /// Stream for the @@ -43,31 +43,29 @@ impl<St> Stream for TryBufferUnordered<St> { type Item = Result<<St::Ok as TryFuture>::Ok, St::Error>; - #[project] fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Self::Item>> { - #[project] - let TryBufferUnordered { mut stream, in_progress_queue, max } = self.project(); + let mut this = self.project(); // First up, try to spawn off as many futures as possible by filling up // our queue of futures. Propagate errors from the stream immediately. - while in_progress_queue.len() < *max { - match stream.as_mut().poll_next(cx)? { - Poll::Ready(Some(fut)) => in_progress_queue.push(fut.into_future()), + while this.in_progress_queue.len() < *this.max { + match this.stream.as_mut().poll_next(cx)? { + Poll::Ready(Some(fut)) => this.in_progress_queue.push(fut.into_future()), Poll::Ready(None) | Poll::Pending => break, } } // Attempt to pull the next value from the in_progress_queue - match in_progress_queue.poll_next_unpin(cx) { + match this.in_progress_queue.poll_next_unpin(cx) { x @ Poll::Pending | x @ Poll::Ready(Some(_)) => return x, Poll::Ready(None) => {} } // If more values are still coming from the stream, we're not done yet - if stream.is_done() { + if this.stream.is_done() { Poll::Ready(None) } else { Poll::Pending diff --git a/src/stream/try_stream/try_collect.rs b/src/stream/try_stream/try_collect.rs index 3c9aee2..76ce619 100644 --- a/src/stream/try_stream/try_collect.rs +++ b/src/stream/try_stream/try_collect.rs @@ -3,7 +3,7 @@ use core::pin::Pin; use futures_core::future::{FusedFuture, Future}; use futures_core::stream::{FusedStream, TryStream}; use futures_core::task::{Context, Poll}; -use pin_project::{pin_project, project}; +use pin_project::pin_project; /// Future for the [`try_collect`](super::TryStreamExt::try_collect) method. #[pin_project] @@ -41,17 +41,15 @@ where { type Output = Result<C, St::Error>; - #[project] fn poll( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Self::Output> { - #[project] - let TryCollect { mut stream, items } = self.project(); + let mut this = self.project(); Poll::Ready(Ok(loop { - match ready!(stream.as_mut().try_poll_next(cx)?) { - Some(x) => items.extend(Some(x)), - None => break mem::replace(items, Default::default()), + match ready!(this.stream.as_mut().try_poll_next(cx)?) { + Some(x) => this.items.extend(Some(x)), + None => break mem::replace(this.items, Default::default()), } })) } diff --git a/src/stream/try_stream/try_concat.rs b/src/stream/try_stream/try_concat.rs index 8c9710b..235a2c5 100644 --- a/src/stream/try_stream/try_concat.rs +++ b/src/stream/try_stream/try_concat.rs @@ -2,7 +2,7 @@ use core::pin::Pin; use futures_core::future::Future; use futures_core::stream::TryStream; use futures_core::task::{Context, Poll}; -use pin_project::{pin_project, project}; +use pin_project::pin_project; /// Future for the [`try_concat`](super::TryStreamExt::try_concat) method. #[pin_project] @@ -34,19 +34,18 @@ where { type Output = Result<St::Ok, St::Error>; - #[project] fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - #[project] - let TryConcat { mut stream, accum } = self.project(); + let mut this = self.project(); + Poll::Ready(Ok(loop { - if let Some(x) = ready!(stream.as_mut().try_poll_next(cx)?) { - if let Some(a) = accum { + if let Some(x) = ready!(this.stream.as_mut().try_poll_next(cx)?) { + if let Some(a) = this.accum { a.extend(x) } else { - *accum = Some(x) + *this.accum = Some(x) } } else { - break accum.take().unwrap_or_default(); + break this.accum.take().unwrap_or_default(); } })) } diff --git a/src/stream/try_stream/try_filter.rs b/src/stream/try_stream/try_filter.rs index 310f991..38d060c 100644 --- a/src/stream/try_stream/try_filter.rs +++ b/src/stream/try_stream/try_filter.rs @@ -5,7 +5,7 @@ use futures_core::stream::{Stream, TryStream, FusedStream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_project::{pin_project, project}; +use pin_project::pin_project; /// Stream for the [`try_filter`](super::TryStreamExt::try_filter) /// method. @@ -69,24 +69,23 @@ impl<St, Fut, F> Stream for TryFilter<St, Fut, F> { type Item = Result<St::Ok, St::Error>; - #[project] fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll<Option<Result<St::Ok, St::Error>>> { - #[project] - let TryFilter { mut stream, f, mut pending_fut, pending_item } = self.project(); + ) -> Poll<Option<Self::Item>> { + let mut this = self.project(); + Poll::Ready(loop { - if let Some(fut) = pending_fut.as_mut().as_pin_mut() { + if let Some(fut) = this.pending_fut.as_mut().as_pin_mut() { let res = ready!(fut.poll(cx)); - pending_fut.set(None); + this.pending_fut.set(None); if res { - break pending_item.take().map(Ok); + break this.pending_item.take().map(Ok); } - *pending_item = None; - } else if let Some(item) = ready!(stream.as_mut().try_poll_next(cx)?) { - pending_fut.set(Some(f(&item))); - *pending_item = Some(item); + *this.pending_item = None; + } else if let Some(item) = ready!(this.stream.as_mut().try_poll_next(cx)?) { + this.pending_fut.set(Some((this.f)(&item))); + *this.pending_item = Some(item); } else { break None; } diff --git a/src/stream/try_stream/try_filter_map.rs b/src/stream/try_stream/try_filter_map.rs index ba8e43a..6c1e48f 100644 --- a/src/stream/try_stream/try_filter_map.rs +++ b/src/stream/try_stream/try_filter_map.rs @@ -5,7 +5,7 @@ use futures_core::stream::{Stream, TryStream, FusedStream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_project::{pin_project, project}; +use pin_project::pin_project; /// Stream for the [`try_filter_map`](super::TryStreamExt::try_filter_map) /// method. @@ -57,24 +57,23 @@ impl<St, Fut, F, T> Stream for TryFilterMap<St, Fut, F> { type Item = Result<T, St::Error>; - #[project] fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll<Option<Result<T, St::Error>>> { - #[project] - let TryFilterMap { mut stream, f, mut pending } = self.project(); + ) -> Poll<Option<Self::Item>> { + let mut this = self.project(); + Poll::Ready(loop { - if let Some(p) = pending.as_mut().as_pin_mut() { + if let Some(p) = this.pending.as_mut().as_pin_mut() { // We have an item in progress, poll that until it's done let item = ready!(p.try_poll(cx)?); - pending.set(None); + this.pending.set(None); if item.is_some() { break item.map(Ok); } - } else if let Some(item) = ready!(stream.as_mut().try_poll_next(cx)?) { + } else if let Some(item) = ready!(this.stream.as_mut().try_poll_next(cx)?) { // No item in progress, but the stream is still going - pending.set(Some(f(item))); + this.pending.set(Some((this.f)(item))); } else { // The stream is done break None; diff --git a/src/stream/try_stream/try_flatten.rs b/src/stream/try_stream/try_flatten.rs index a528639..5227903 100644 --- a/src/stream/try_stream/try_flatten.rs +++ b/src/stream/try_stream/try_flatten.rs @@ -3,7 +3,7 @@ use futures_core::stream::{FusedStream, Stream, TryStream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_project::{pin_project, project}; +use pin_project::pin_project; /// Stream for the [`try_flatten`](super::TryStreamExt::try_flatten) method. #[pin_project] @@ -51,19 +51,18 @@ where { type Item = Result<<St::Ok as TryStream>::Ok, <St::Ok as TryStream>::Error>; - #[project] fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { - #[project] - let TryFlatten { mut stream, mut next } = self.project(); + let mut this = self.project(); + Poll::Ready(loop { - if let Some(s) = next.as_mut().as_pin_mut() { + if let Some(s) = this.next.as_mut().as_pin_mut() { if let Some(item) = ready!(s.try_poll_next(cx)?) { break Some(Ok(item)); } else { - next.set(None); + this.next.set(None); } - } else if let Some(s) = ready!(stream.as_mut().try_poll_next(cx)?) { - next.set(Some(s)); + } else if let Some(s) = ready!(this.stream.as_mut().try_poll_next(cx)?) { + this.next.set(Some(s)); } else { break None; } diff --git a/src/stream/try_stream/try_fold.rs b/src/stream/try_stream/try_fold.rs index d85c1fe..abeeea4 100644 --- a/src/stream/try_stream/try_fold.rs +++ b/src/stream/try_stream/try_fold.rs @@ -3,7 +3,7 @@ use core::pin::Pin; use futures_core::future::{FusedFuture, Future, TryFuture}; use futures_core::stream::TryStream; use futures_core::task::{Context, Poll}; -use pin_project::{pin_project, project}; +use pin_project::pin_project; /// Future for the [`try_fold`](super::TryStreamExt::try_fold) method. #[pin_project] @@ -64,25 +64,24 @@ impl<St, Fut, T, F> Future for TryFold<St, Fut, T, F> { type Output = Result<T, St::Error>; - #[project] fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - #[project] - let TryFold { mut stream, f, accum, mut future } = self.project(); + let mut this = self.project(); + Poll::Ready(loop { - if let Some(fut) = future.as_mut().as_pin_mut() { + if let Some(fut) = this.future.as_mut().as_pin_mut() { // we're currently processing a future to produce a new accum value let res = ready!(fut.try_poll(cx)); - future.set(None); + this.future.set(None); match res { - Ok(a) => *accum = Some(a), + Ok(a) => *this.accum = Some(a), Err(e) => break Err(e), } - } else if accum.is_some() { + } else if this.accum.is_some() { // we're waiting on a new item from the stream - let res = ready!(stream.as_mut().try_poll_next(cx)); - let a = accum.take().unwrap(); + let res = ready!(this.stream.as_mut().try_poll_next(cx)); + let a = this.accum.take().unwrap(); match res { - Some(Ok(item)) => future.set(Some(f(a, item))), + Some(Ok(item)) => this.future.set(Some((this.f)(a, item))), Some(Err(e)) => break Err(e), None => break Ok(a), } diff --git a/src/stream/try_stream/try_for_each.rs b/src/stream/try_stream/try_for_each.rs index 5fc91df..a00e911 100644 --- a/src/stream/try_stream/try_for_each.rs +++ b/src/stream/try_stream/try_for_each.rs @@ -3,7 +3,7 @@ use core::pin::Pin; use futures_core::future::{Future, TryFuture}; use futures_core::stream::TryStream; use futures_core::task::{Context, Poll}; -use pin_project::{pin_project, project}; +use pin_project::pin_project; /// Future for the [`try_for_each`](super::TryStreamExt::try_for_each) method. #[pin_project] @@ -50,17 +50,15 @@ impl<St, Fut, F> Future for TryForEach<St, Fut, F> { type Output = Result<(), St::Error>; - #[project] fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - #[project] - let TryForEach { mut stream, f, mut future } = self.project(); + let mut this = self.project(); loop { - if let Some(fut) = future.as_mut().as_pin_mut() { + if let Some(fut) = this.future.as_mut().as_pin_mut() { ready!(fut.try_poll(cx))?; - future.set(None); + this.future.set(None); } else { - match ready!(stream.as_mut().try_poll_next(cx)?) { - Some(e) => future.set(Some(f(e))), + match ready!(this.stream.as_mut().try_poll_next(cx)?) { + Some(e) => this.future.set(Some((this.f)(e))), None => break, } } diff --git a/src/stream/try_stream/try_for_each_concurrent.rs b/src/stream/try_stream/try_for_each_concurrent.rs index 87fd465..db8e505 100644 --- a/src/stream/try_stream/try_for_each_concurrent.rs +++ b/src/stream/try_stream/try_for_each_concurrent.rs @@ -6,7 +6,7 @@ use core::num::NonZeroUsize; use futures_core::future::{FusedFuture, Future}; use futures_core::stream::TryStream; use futures_core::task::{Context, Poll}; -use pin_project::{pin_project, project}; +use pin_project::pin_project; /// Future for the /// [`try_for_each_concurrent`](super::TryStreamExt::try_for_each_concurrent) @@ -68,16 +68,14 @@ impl<St, Fut, F> Future for TryForEachConcurrent<St, Fut, F> { type Output = Result<(), St::Error>; - #[project] fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - #[project] - let TryForEachConcurrent { mut stream, f, futures, limit } = self.project(); + let mut this = self.project(); loop { let mut made_progress_this_iter = false; // Check if we've already created a number of futures greater than `limit` - if limit.map(|limit| limit.get() > futures.len()).unwrap_or(true) { - let poll_res = match stream.as_mut().as_pin_mut() { + if this.limit.map(|limit| limit.get() > this.futures.len()).unwrap_or(true) { + let poll_res = match this.stream.as_mut().as_pin_mut() { Some(stream) => stream.try_poll_next(cx), None => Poll::Ready(None), }; @@ -88,28 +86,28 @@ impl<St, Fut, F> Future for TryForEachConcurrent<St, Fut, F> Some(elem) }, Poll::Ready(None) => { - stream.set(None); + this.stream.set(None); None } Poll::Pending => None, Poll::Ready(Some(Err(e))) => { // Empty the stream and futures so that we know // the future has completed. - stream.set(None); - drop(mem::replace(futures, FuturesUnordered::new())); + this.stream.set(None); + drop(mem::replace(this.futures, FuturesUnordered::new())); return Poll::Ready(Err(e)); } }; if let Some(elem) = elem { - futures.push(f(elem)); + this.futures.push((this.f)(elem)); } } - match futures.poll_next_unpin(cx) { + match this.futures.poll_next_unpin(cx) { Poll::Ready(Some(Ok(()))) => made_progress_this_iter = true, Poll::Ready(None) => { - if stream.is_none() { + if this.stream.is_none() { return Poll::Ready(Ok(())) } }, @@ -117,8 +115,8 @@ impl<St, Fut, F> Future for TryForEachConcurrent<St, Fut, F> Poll::Ready(Some(Err(e))) => { // Empty the stream and futures so that we know // the future has completed. - stream.set(None); - drop(mem::replace(futures, FuturesUnordered::new())); + this.stream.set(None); + drop(mem::replace(this.futures, FuturesUnordered::new())); return Poll::Ready(Err(e)); } } diff --git a/src/stream/try_stream/try_skip_while.rs b/src/stream/try_stream/try_skip_while.rs index 624380f..35759d0 100644 --- a/src/stream/try_stream/try_skip_while.rs +++ b/src/stream/try_stream/try_skip_while.rs @@ -5,7 +5,7 @@ use futures_core::stream::{Stream, TryStream, FusedStream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_project::{pin_project, project}; +use pin_project::pin_project; /// Stream for the [`try_skip_while`](super::TryStreamExt::try_skip_while) /// method. @@ -62,30 +62,28 @@ impl<St, Fut, F> Stream for TrySkipWhile<St, Fut, F> { type Item = Result<St::Ok, St::Error>; - #[project] fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Self::Item>> { - #[project] - let TrySkipWhile { mut stream, f, mut pending_fut, pending_item, done_skipping } = self.project(); + let mut this = self.project(); - if *done_skipping { - return stream.try_poll_next(cx); + if *this.done_skipping { + return this.stream.try_poll_next(cx); } Poll::Ready(loop { - if let Some(fut) = pending_fut.as_mut().as_pin_mut() { + if let Some(fut) = this.pending_fut.as_mut().as_pin_mut() { let skipped = ready!(fut.try_poll(cx)?); - let item = pending_item.take(); - pending_fut.set(None); + let item = this.pending_item.take(); + this.pending_fut.set(None); if !skipped { - *done_skipping = true; + *this.done_skipping = true; break item.map(Ok); } - } else if let Some(item) = ready!(stream.as_mut().try_poll_next(cx)?) { - pending_fut.set(Some(f(&item))); - *pending_item = Some(item); + } else if let Some(item) = ready!(this.stream.as_mut().try_poll_next(cx)?) { + this.pending_fut.set(Some((this.f)(&item))); + *this.pending_item = Some(item); } else { break None; } diff --git a/src/stream/try_stream/try_take_while.rs b/src/stream/try_stream/try_take_while.rs new file mode 100644 index 0000000..16bfb20 --- /dev/null +++ b/src/stream/try_stream/try_take_while.rs @@ -0,0 +1,132 @@ +use core::fmt; +use core::pin::Pin; +use futures_core::future::TryFuture; +use futures_core::stream::{FusedStream, Stream, TryStream}; +use futures_core::task::{Context, Poll}; +#[cfg(feature = "sink")] +use futures_sink::Sink; +use pin_project::pin_project; + +/// Stream for the [`try_take_while`](super::TryStreamExt::try_take_while) +/// method. +#[pin_project] +#[must_use = "streams do nothing unless polled"] +pub struct TryTakeWhile<St, Fut, F> +where + St: TryStream, +{ + #[pin] + stream: St, + f: F, + #[pin] + pending_fut: Option<Fut>, + pending_item: Option<St::Ok>, + done_taking: bool, +} + +impl<St, Fut, F> fmt::Debug for TryTakeWhile<St, Fut, F> +where + St: TryStream + fmt::Debug, + St::Ok: fmt::Debug, + Fut: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("TryTakeWhile") + .field("stream", &self.stream) + .field("pending_fut", &self.pending_fut) + .field("pending_item", &self.pending_item) + .field("done_taking", &self.done_taking) + .finish() + } +} + +impl<St, Fut, F> TryTakeWhile<St, Fut, F> +where + St: TryStream, + F: FnMut(&St::Ok) -> Fut, + Fut: TryFuture<Ok = bool, Error = St::Error>, +{ + pub(super) fn new(stream: St, f: F) -> TryTakeWhile<St, Fut, F> { + TryTakeWhile { + stream, + f, + pending_fut: None, + pending_item: None, + done_taking: false, + } + } + + delegate_access_inner!(stream, St, ()); +} + +impl<St, Fut, F> Stream for TryTakeWhile<St, Fut, F> +where + St: TryStream, + F: FnMut(&St::Ok) -> Fut, + Fut: TryFuture<Ok = bool, Error = St::Error>, +{ + type Item = Result<St::Ok, St::Error>; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + let mut this = self.project(); + + if *this.done_taking { + return Poll::Ready(None); + } + + Poll::Ready(loop { + if let Some(fut) = this.pending_fut.as_mut().as_pin_mut() { + let take = ready!(fut.try_poll(cx)?); + let item = this.pending_item.take(); + this.pending_fut.set(None); + if take { + break item.map(Ok); + } else { + *this.done_taking = true; + break None; + } + } else if let Some(item) = ready!(this.stream.as_mut().try_poll_next(cx)?) { + this.pending_fut.set(Some((this.f)(&item))); + *this.pending_item = Some(item); + } else { + break None; + } + }) + } + + fn size_hint(&self) -> (usize, Option<usize>) { + if self.done_taking { + return (0, Some(0)); + } + + let pending_len = if self.pending_item.is_some() { 1 } else { 0 }; + let (_, upper) = self.stream.size_hint(); + let upper = match upper { + Some(x) => x.checked_add(pending_len), + None => None, + }; + (0, upper) // can't know a lower bound, due to the predicate + } +} + +impl<St, Fut, F> FusedStream for TryTakeWhile<St, Fut, F> +where + St: TryStream + FusedStream, + F: FnMut(&St::Ok) -> Fut, + Fut: TryFuture<Ok = bool, Error = St::Error>, +{ + fn is_terminated(&self) -> bool { + self.done_taking || self.pending_item.is_none() && self.stream.is_terminated() + } +} + +// Forwarding impl of Sink from the underlying stream +#[cfg(feature = "sink")] +impl<S, Fut, F, Item, E> Sink<Item> for TryTakeWhile<S, Fut, F> +where + S: TryStream + Sink<Item, Error = E>, +{ + type Error = E; + + delegate_sink!(stream, Item); +} diff --git a/src/stream/try_stream/try_unfold.rs b/src/stream/try_stream/try_unfold.rs index 8da1248..a6b31fe 100644 --- a/src/stream/try_stream/try_unfold.rs +++ b/src/stream/try_stream/try_unfold.rs @@ -3,7 +3,7 @@ use core::pin::Pin; use futures_core::future::TryFuture; use futures_core::stream::Stream; use futures_core::task::{Context, Poll}; -use pin_project::{pin_project, project}; +use pin_project::pin_project; /// Creates a `TryStream` from a seed and a closure returning a `TryFuture`. /// @@ -96,30 +96,28 @@ where { type Item = Result<Item, Fut::Error>; - #[project] fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll<Option<Result<Item, Fut::Error>>> { - #[project] - let TryUnfold {f, state, mut fut } = self.project(); + ) -> Poll<Option<Self::Item>> { + let mut this = self.project(); - if let Some(state) = state.take() { - fut.set(Some(f(state))); + if let Some(state) = this.state.take() { + this.fut.set(Some((this.f)(state))); } - match fut.as_mut().as_pin_mut() { + match this.fut.as_mut().as_pin_mut() { None => { // The future previously errored Poll::Ready(None) } Some(future) => { let step = ready!(future.try_poll(cx)); - fut.set(None); + this.fut.set(None); match step { Ok(Some((item, next_state))) => { - *state = Some(next_state); + *this.state = Some(next_state); Poll::Ready(Some(Ok(item))) } Ok(None) => Poll::Ready(None), diff --git a/src/stream/unfold.rs b/src/stream/unfold.rs index 0279571..b6f8eae 100644 --- a/src/stream/unfold.rs +++ b/src/stream/unfold.rs @@ -3,7 +3,7 @@ use core::pin::Pin; use futures_core::future::Future; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; -use pin_project::{pin_project, project}; +use pin_project::pin_project; /// Creates a `Stream` from a seed and a closure returning a `Future`. /// @@ -93,24 +93,22 @@ impl<T, F, Fut, Item> Stream for Unfold<T, F, Fut> { type Item = Item; - #[project] fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Self::Item>> { - #[project] - let Unfold { state, f, mut fut } = self.project(); + let mut this = self.project(); - if let Some(state) = state.take() { - fut.set(Some(f(state))); + if let Some(state) = this.state.take() { + this.fut.set(Some((this.f)(state))); } - let step = ready!(fut.as_mut().as_pin_mut() + let step = ready!(this.fut.as_mut().as_pin_mut() .expect("Unfold must not be polled after it returned `Poll::Ready(None)`").poll(cx)); - fut.set(None); + this.fut.set(None); if let Some((item, next_state)) = step { - *state = Some(next_state); + *this.state = Some(next_state); Poll::Ready(Some(item)) } else { Poll::Ready(None) diff --git a/src/task/spawn.rs b/src/task/spawn.rs index f34445a..f877923 100644 --- a/src/task/spawn.rs +++ b/src/task/spawn.rs @@ -69,6 +69,7 @@ pub trait SpawnExt: Spawn { /// assert_eq!(block_on(join_handle_fut), 1); /// ``` #[cfg(feature = "channel")] + #[cfg_attr(docsrs, doc(cfg(feature = "channel")))] #[cfg(feature = "std")] fn spawn_with_handle<Fut>(&self, future: Fut) -> Result<RemoteHandle<Fut::Output>, SpawnError> where @@ -83,6 +84,7 @@ pub trait SpawnExt: Spawn { /// Wraps a [`Spawn`] and makes it usable as a futures 0.1 `Executor`. /// Requires the `compat` feature to enable. #[cfg(feature = "compat")] + #[cfg_attr(docsrs, doc(cfg(feature = "compat")))] fn compat(self) -> Compat<Self> where Self: Sized, @@ -145,6 +147,7 @@ pub trait LocalSpawnExt: LocalSpawn { /// assert_eq!(executor.run_until(join_handle_fut), 1); /// ``` #[cfg(feature = "channel")] + #[cfg_attr(docsrs, doc(cfg(feature = "channel")))] #[cfg(feature = "std")] fn spawn_local_with_handle<Fut>( &self, |