diff options
Diffstat (limited to 'src/stream/try_stream')
-rw-r--r-- | src/stream/try_stream/and_then.rs | 74 | ||||
-rw-r--r-- | src/stream/try_stream/err_into.rs | 98 | ||||
-rw-r--r-- | src/stream/try_stream/inspect_err.rs | 118 | ||||
-rw-r--r-- | src/stream/try_stream/inspect_ok.rs | 118 | ||||
-rw-r--r-- | src/stream/try_stream/into_stream.rs | 40 | ||||
-rw-r--r-- | src/stream/try_stream/map_err.rs | 112 | ||||
-rw-r--r-- | src/stream/try_stream/map_ok.rs | 112 | ||||
-rw-r--r-- | src/stream/try_stream/mod.rs | 58 | ||||
-rw-r--r-- | src/stream/try_stream/or_else.rs | 79 | ||||
-rw-r--r-- | src/stream/try_stream/try_buffer_unordered.rs | 59 | ||||
-rw-r--r-- | src/stream/try_stream/try_collect.rs | 28 | ||||
-rw-r--r-- | src/stream/try_stream/try_concat.rs | 36 | ||||
-rw-r--r-- | src/stream/try_stream/try_filter.rs | 83 | ||||
-rw-r--r-- | src/stream/try_stream/try_filter_map.rs | 80 | ||||
-rw-r--r-- | src/stream/try_stream/try_flatten.rs | 84 | ||||
-rw-r--r-- | src/stream/try_stream/try_fold.rs | 70 | ||||
-rw-r--r-- | src/stream/try_stream/try_for_each.rs | 34 | ||||
-rw-r--r-- | src/stream/try_stream/try_for_each_concurrent.rs | 42 | ||||
-rw-r--r-- | src/stream/try_stream/try_skip_while.rs | 94 | ||||
-rw-r--r-- | src/stream/try_stream/try_unfold.rs | 33 |
20 files changed, 300 insertions, 1152 deletions
diff --git a/src/stream/try_stream/and_then.rs b/src/stream/try_stream/and_then.rs index 809c32a..563ed34 100644 --- a/src/stream/try_stream/and_then.rs +++ b/src/stream/try_stream/and_then.rs @@ -5,18 +5,19 @@ use futures_core::stream::{Stream, TryStream, FusedStream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, project}; /// Stream for the [`and_then`](super::TryStreamExt::and_then) method. +#[pin_project] #[must_use = "streams do nothing unless polled"] pub struct AndThen<St, Fut, F> { + #[pin] stream: St, + #[pin] future: Option<Fut>, f: F, } -impl<St: Unpin, Fut: Unpin, F> Unpin for AndThen<St, Fut, F> {} - impl<St, Fut, F> fmt::Debug for AndThen<St, Fut, F> where St: fmt::Debug, @@ -30,12 +31,6 @@ where } } -impl<St, Fut, F> AndThen<St, Fut, F> { - unsafe_pinned!(stream: St); - unsafe_pinned!(future: Option<Fut>); - unsafe_unpinned!(f: F); -} - impl<St, Fut, F> AndThen<St, Fut, F> where St: TryStream, F: FnMut(St::Ok) -> Fut, @@ -45,37 +40,7 @@ impl<St, Fut, F> AndThen<St, Fut, F> Self { stream, future: None, f } } - /// Acquires a reference to the underlying stream that this combinator is - /// pulling from. - pub fn get_ref(&self) -> &St { - &self.stream - } - - /// Acquires a mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_mut(&mut self) -> &mut St { - &mut self.stream - } - - /// Acquires a pinned mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> { - self.stream() - } - - /// Consumes this combinator, returning the underlying stream. - /// - /// Note that this may discard intermediate state of this combinator, so - /// care should be taken to avoid losing resources when this is called. - pub fn into_inner(self) -> St { - self.stream - } + delegate_access_inner!(stream, St, ()); } impl<St, Fut, F> Stream for AndThen<St, Fut, F> @@ -85,22 +50,25 @@ impl<St, Fut, F> Stream for AndThen<St, Fut, F> { type Item = Result<Fut::Ok, St::Error>; + #[project] fn poll_next( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Self::Item>> { - if self.future.is_none() { - let item = match ready!(self.as_mut().stream().try_poll_next(cx)?) { - None => return Poll::Ready(None), - Some(e) => e, - }; - let fut = (self.as_mut().f())(item); - self.as_mut().future().set(Some(fut)); - } - - let e = ready!(self.as_mut().future().as_pin_mut().unwrap().try_poll(cx)); - self.as_mut().future().set(None); - Poll::Ready(Some(e)) + #[project] + let AndThen { mut stream, mut future, f } = self.project(); + + Poll::Ready(loop { + if let Some(fut) = future.as_mut().as_pin_mut() { + let item = ready!(fut.try_poll(cx)); + future.set(None); + break Some(item); + } else if let Some(item) = ready!(stream.as_mut().try_poll_next(cx)?) { + future.set(Some(f(item))); + } else { + break None; + } + }) } fn size_hint(&self) -> (usize, Option<usize>) { diff --git a/src/stream/try_stream/err_into.rs b/src/stream/try_stream/err_into.rs deleted file mode 100644 index f5d9294..0000000 --- a/src/stream/try_stream/err_into.rs +++ /dev/null @@ -1,98 +0,0 @@ -use core::marker::PhantomData; -use core::pin::Pin; -use futures_core::stream::{FusedStream, Stream, TryStream}; -use futures_core::task::{Context, Poll}; -#[cfg(feature = "sink")] -use futures_sink::Sink; -use pin_utils::unsafe_pinned; - -/// Stream for the [`err_into`](super::TryStreamExt::err_into) method. -#[derive(Debug)] -#[must_use = "streams do nothing unless polled"] -pub struct ErrInto<St, E> { - stream: St, - _marker: PhantomData<E>, -} - -impl<St: Unpin, E> Unpin for ErrInto<St, E> {} - -impl<St, E> ErrInto<St, E> { - unsafe_pinned!(stream: St); - - pub(super) fn new(stream: St) -> Self { - ErrInto { stream, _marker: PhantomData } - } - - /// Acquires a reference to the underlying stream that this combinator is - /// pulling from. - pub fn get_ref(&self) -> &St { - &self.stream - } - - /// Acquires a mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_mut(&mut self) -> &mut St { - &mut self.stream - } - - /// Acquires a pinned mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> { - self.stream() - } - - /// Consumes this combinator, returning the underlying stream. - /// - /// Note that this may discard intermediate state of this combinator, so - /// care should be taken to avoid losing resources when this is called. - pub fn into_inner(self) -> St { - self.stream - } -} - -impl<St, E> FusedStream for ErrInto<St, E> -where - St: TryStream + FusedStream, - St::Error: Into<E>, -{ - fn is_terminated(&self) -> bool { - self.stream.is_terminated() - } -} - -impl<St, E> Stream for ErrInto<St, E> -where - St: TryStream, - St::Error: Into<E>, -{ - type Item = Result<St::Ok, E>; - - fn poll_next( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Option<Self::Item>> { - self.stream().try_poll_next(cx) - .map(|res| res.map(|some| some.map_err(Into::into))) - } - - fn size_hint(&self) -> (usize, Option<usize>) { - self.stream.size_hint() - } -} - -// Forwarding impl of Sink from the underlying stream -#[cfg(feature = "sink")] -impl<S, E, Item> Sink<Item> for ErrInto<S, E> -where - S: Sink<Item>, -{ - type Error = S::Error; - - delegate_sink!(stream, Item); -} diff --git a/src/stream/try_stream/inspect_err.rs b/src/stream/try_stream/inspect_err.rs deleted file mode 100644 index 3c23ae0..0000000 --- a/src/stream/try_stream/inspect_err.rs +++ /dev/null @@ -1,118 +0,0 @@ -use crate::stream::stream::inspect; -use core::fmt; -use core::pin::Pin; -use futures_core::stream::{FusedStream, Stream, TryStream}; -use futures_core::task::{Context, Poll}; -#[cfg(feature = "sink")] -use futures_sink::Sink; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; - -/// Stream for the [`inspect_err`](super::TryStreamExt::inspect_err) method. -#[must_use = "streams do nothing unless polled"] -pub struct InspectErr<St, F> { - stream: St, - f: F, -} - -impl<St: Unpin, F> Unpin for InspectErr<St, F> {} - -impl<St, F> fmt::Debug for InspectErr<St, F> -where - St: fmt::Debug, -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("InspectErr") - .field("stream", &self.stream) - .finish() - } -} - -impl<St, F> InspectErr<St, F> { - unsafe_pinned!(stream: St); - unsafe_unpinned!(f: F); -} - -impl<St, F> InspectErr<St, F> -where - St: TryStream, - F: FnMut(&St::Error), -{ - pub(super) fn new(stream: St, f: F) -> Self { - Self { stream, f } - } - - /// Acquires a reference to the underlying stream that this combinator is - /// pulling from. - pub fn get_ref(&self) -> &St { - &self.stream - } - - /// Acquires a mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_mut(&mut self) -> &mut St { - &mut self.stream - } - - /// Acquires a pinned mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> { - self.stream() - } - - /// Consumes this combinator, returning the underlying stream. - /// - /// Note that this may discard intermediate state of this combinator, so - /// care should be taken to avoid losing resources when this is called. - pub fn into_inner(self) -> St { - self.stream - } -} - -impl<St, F> FusedStream for InspectErr<St, F> -where - St: TryStream + FusedStream, - F: FnMut(&St::Error), -{ - fn is_terminated(&self) -> bool { - self.stream.is_terminated() - } -} - -impl<St, F> Stream for InspectErr<St, F> -where - St: TryStream, - F: FnMut(&St::Error), -{ - type Item = Result<St::Ok, St::Error>; - - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Option<Self::Item>> { - self.as_mut() - .stream() - .try_poll_next(cx) - .map(|opt| opt.map(|res| res.map_err(|e| inspect(e, self.as_mut().f())))) - } - - fn size_hint(&self) -> (usize, Option<usize>) { - self.stream.size_hint() - } -} - -// Forwarding impl of Sink from the underlying stream -#[cfg(feature = "sink")] -impl<S, F, Item> Sink<Item> for InspectErr<S, F> -where - S: Sink<Item>, -{ - type Error = S::Error; - - delegate_sink!(stream, Item); -} diff --git a/src/stream/try_stream/inspect_ok.rs b/src/stream/try_stream/inspect_ok.rs deleted file mode 100644 index 89fb459..0000000 --- a/src/stream/try_stream/inspect_ok.rs +++ /dev/null @@ -1,118 +0,0 @@ -use crate::stream::stream::inspect; -use core::fmt; -use core::pin::Pin; -use futures_core::stream::{FusedStream, Stream, TryStream}; -use futures_core::task::{Context, Poll}; -#[cfg(feature = "sink")] -use futures_sink::Sink; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; - -/// Stream for the [`inspect_ok`](super::TryStreamExt::inspect_ok) method. -#[must_use = "streams do nothing unless polled"] -pub struct InspectOk<St, F> { - stream: St, - f: F, -} - -impl<St: Unpin, F> Unpin for InspectOk<St, F> {} - -impl<St, F> fmt::Debug for InspectOk<St, F> -where - St: fmt::Debug, -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("InspectOk") - .field("stream", &self.stream) - .finish() - } -} - -impl<St, F> InspectOk<St, F> { - unsafe_pinned!(stream: St); - unsafe_unpinned!(f: F); -} - -impl<St, F> InspectOk<St, F> -where - St: TryStream, - F: FnMut(&St::Ok), -{ - pub(super) fn new(stream: St, f: F) -> Self { - Self { stream, f } - } - - /// Acquires a reference to the underlying stream that this combinator is - /// pulling from. - pub fn get_ref(&self) -> &St { - &self.stream - } - - /// Acquires a mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_mut(&mut self) -> &mut St { - &mut self.stream - } - - /// Acquires a pinned mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> { - self.stream() - } - - /// Consumes this combinator, returning the underlying stream. - /// - /// Note that this may discard intermediate state of this combinator, so - /// care should be taken to avoid losing resources when this is called. - pub fn into_inner(self) -> St { - self.stream - } -} - -impl<St, F> FusedStream for InspectOk<St, F> -where - St: TryStream + FusedStream, - F: FnMut(&St::Ok), -{ - fn is_terminated(&self) -> bool { - self.stream.is_terminated() - } -} - -impl<St, F> Stream for InspectOk<St, F> -where - St: TryStream, - F: FnMut(&St::Ok), -{ - type Item = Result<St::Ok, St::Error>; - - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Option<Self::Item>> { - self.as_mut() - .stream() - .try_poll_next(cx) - .map(|opt| opt.map(|res| res.map(|e| inspect(e, self.as_mut().f())))) - } - - fn size_hint(&self) -> (usize, Option<usize>) { - self.stream.size_hint() - } -} - -// Forwarding impl of Sink from the underlying stream -#[cfg(feature = "sink")] -impl<S, F, Item> Sink<Item> for InspectOk<S, F> -where - S: Sink<Item>, -{ - type Error = S::Error; - - delegate_sink!(stream, Item); -} diff --git a/src/stream/try_stream/into_stream.rs b/src/stream/try_stream/into_stream.rs index b0fa07a..370a327 100644 --- a/src/stream/try_stream/into_stream.rs +++ b/src/stream/try_stream/into_stream.rs @@ -3,54 +3,24 @@ use futures_core::stream::{FusedStream, Stream, TryStream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_utils::unsafe_pinned; +use pin_project::pin_project; /// Stream for the [`into_stream`](super::TryStreamExt::into_stream) method. +#[pin_project] #[derive(Debug)] #[must_use = "streams do nothing unless polled"] pub struct IntoStream<St> { + #[pin] stream: St, } impl<St> IntoStream<St> { - unsafe_pinned!(stream: St); - #[inline] pub(super) fn new(stream: St) -> Self { IntoStream { stream } } - /// Acquires a reference to the underlying stream that this combinator is - /// pulling from. - pub fn get_ref(&self) -> &St { - &self.stream - } - - /// Acquires a mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_mut(&mut self) -> &mut St { - &mut self.stream - } - - /// Acquires a pinned mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> { - self.stream() - } - - /// Consumes this combinator, returning the underlying stream. - /// - /// Note that this may discard intermediate state of this combinator, so - /// care should be taken to avoid losing resources when this is called. - pub fn into_inner(self) -> St { - self.stream - } + delegate_access_inner!(stream, St, ()); } impl<St: TryStream + FusedStream> FusedStream for IntoStream<St> { @@ -67,7 +37,7 @@ impl<St: TryStream> Stream for IntoStream<St> { self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Self::Item>> { - self.stream().try_poll_next(cx) + self.project().stream.try_poll_next(cx) } fn size_hint(&self) -> (usize, Option<usize>) { diff --git a/src/stream/try_stream/map_err.rs b/src/stream/try_stream/map_err.rs deleted file mode 100644 index 1b98d6b..0000000 --- a/src/stream/try_stream/map_err.rs +++ /dev/null @@ -1,112 +0,0 @@ -use core::fmt; -use core::pin::Pin; -use futures_core::stream::{FusedStream, Stream, TryStream}; -use futures_core::task::{Context, Poll}; -#[cfg(feature = "sink")] -use futures_sink::Sink; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; - -/// Stream for the [`map_err`](super::TryStreamExt::map_err) method. -#[must_use = "streams do nothing unless polled"] -pub struct MapErr<St, F> { - stream: St, - f: F, -} - -impl<St: Unpin, F> Unpin for MapErr<St, F> {} - -impl<St, F> fmt::Debug for MapErr<St, F> -where - St: fmt::Debug, -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("MapErr") - .field("stream", &self.stream) - .finish() - } -} - -impl<St, F> MapErr<St, F> { - unsafe_pinned!(stream: St); - unsafe_unpinned!(f: F); - - /// Creates a new MapErr. - pub(super) fn new(stream: St, f: F) -> Self { - MapErr { stream, f } - } - - /// Acquires a reference to the underlying stream that this combinator is - /// pulling from. - pub fn get_ref(&self) -> &St { - &self.stream - } - - /// Acquires a mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_mut(&mut self) -> &mut St { - &mut self.stream - } - - /// Acquires a pinned mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> { - self.stream() - } - - /// Consumes this combinator, returning the underlying stream. - /// - /// Note that this may discard intermediate state of this combinator, so - /// care should be taken to avoid losing resources when this is called. - pub fn into_inner(self) -> St { - self.stream - } -} - -impl<St, F, E> FusedStream for MapErr<St, F> -where - St: TryStream + FusedStream, - F: FnMut(St::Error) -> E, -{ - fn is_terminated(&self) -> bool { - self.stream.is_terminated() - } -} - -impl<St, F, E> Stream for MapErr<St, F> -where - St: TryStream, - F: FnMut(St::Error) -> E, -{ - type Item = Result<St::Ok, E>; - - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Option<Self::Item>> { - self.as_mut() - .stream() - .try_poll_next(cx) - .map(|opt| opt.map(|res| res.map_err(|e| self.as_mut().f()(e)))) - } - - fn size_hint(&self) -> (usize, Option<usize>) { - self.stream.size_hint() - } -} - -// Forwarding impl of Sink from the underlying stream -#[cfg(feature = "sink")] -impl<S, F, Item> Sink<Item> for MapErr<S, F> -where - S: Sink<Item>, -{ - type Error = S::Error; - - delegate_sink!(stream, Item); -} diff --git a/src/stream/try_stream/map_ok.rs b/src/stream/try_stream/map_ok.rs deleted file mode 100644 index 19d01be..0000000 --- a/src/stream/try_stream/map_ok.rs +++ /dev/null @@ -1,112 +0,0 @@ -use core::fmt; -use core::pin::Pin; -use futures_core::stream::{FusedStream, Stream, TryStream}; -use futures_core::task::{Context, Poll}; -#[cfg(feature = "sink")] -use futures_sink::Sink; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; - -/// Stream for the [`map_ok`](super::TryStreamExt::map_ok) method. -#[must_use = "streams do nothing unless polled"] -pub struct MapOk<St, F> { - stream: St, - f: F, -} - -impl<St: Unpin, F> Unpin for MapOk<St, F> {} - -impl<St, F> fmt::Debug for MapOk<St, F> -where - St: fmt::Debug, -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("MapOk") - .field("stream", &self.stream) - .finish() - } -} - -impl<St, F> MapOk<St, F> { - unsafe_pinned!(stream: St); - unsafe_unpinned!(f: F); - - /// Creates a new MapOk. - pub(super) fn new(stream: St, f: F) -> Self { - MapOk { stream, f } - } - - /// Acquires a reference to the underlying stream that this combinator is - /// pulling from. - pub fn get_ref(&self) -> &St { - &self.stream - } - - /// Acquires a mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_mut(&mut self) -> &mut St { - &mut self.stream - } - - /// Acquires a pinned mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> { - self.stream() - } - - /// Consumes this combinator, returning the underlying stream. - /// - /// Note that this may discard intermediate state of this combinator, so - /// care should be taken to avoid losing resources when this is called. - pub fn into_inner(self) -> St { - self.stream - } -} - -impl<St, F, T> FusedStream for MapOk<St, F> -where - St: TryStream + FusedStream, - F: FnMut(St::Ok) -> T, -{ - fn is_terminated(&self) -> bool { - self.stream.is_terminated() - } -} - -impl<St, F, T> Stream for MapOk<St, F> -where - St: TryStream, - F: FnMut(St::Ok) -> T, -{ - type Item = Result<T, St::Error>; - - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Option<Self::Item>> { - self.as_mut() - .stream() - .try_poll_next(cx) - .map(|opt| opt.map(|res| res.map(|x| self.as_mut().f()(x)))) - } - - fn size_hint(&self) -> (usize, Option<usize>) { - self.stream.size_hint() - } -} - -// Forwarding impl of Sink from the underlying stream -#[cfg(feature = "sink")] -impl<S, F, Item> Sink<Item> for MapOk<S, F> -where - S: Sink<Item>, -{ - type Error = S::Error; - - delegate_sink!(stream, Item); -} diff --git a/src/stream/try_stream/mod.rs b/src/stream/try_stream/mod.rs index 6a7ced4..99d5a6d 100644 --- a/src/stream/try_stream/mod.rs +++ b/src/stream/try_stream/mod.rs @@ -11,34 +11,53 @@ use futures_core::{ stream::TryStream, task::{Context, Poll}, }; +use crate::fns::{ + InspectOkFn, inspect_ok_fn, InspectErrFn, inspect_err_fn, MapErrFn, map_err_fn, IntoFn, into_fn, MapOkFn, map_ok_fn, +}; +use crate::stream::{Map, Inspect}; mod and_then; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::and_then::AndThen; -mod err_into; -#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 -pub use self::err_into::ErrInto; - -mod inspect_ok; -#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 -pub use self::inspect_ok::InspectOk; - -mod inspect_err; -#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 -pub use self::inspect_err::InspectErr; +delegate_all!( + /// Stream for the [`err_into`](super::TryStreamExt::err_into) method. + ErrInto<St, E>( + MapErr<St, IntoFn<E>> + ): Debug + Sink + Stream + FusedStream + AccessInner[St, (.)] + New[|x: St| MapErr::new(x, into_fn())] +); + +delegate_all!( + /// Stream for the [`inspect_ok`](super::TryStreamExt::inspect_ok) method. + InspectOk<St, F>( + Inspect<IntoStream<St>, InspectOkFn<F>> + ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Inspect::new(IntoStream::new(x), inspect_ok_fn(f))] +); + +delegate_all!( + /// Stream for the [`inspect_err`](super::TryStreamExt::inspect_err) method. + InspectErr<St, F>( + Inspect<IntoStream<St>, InspectErrFn<F>> + ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Inspect::new(IntoStream::new(x), inspect_err_fn(f))] +); mod into_stream; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::into_stream::IntoStream; -mod map_ok; -#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 -pub use self::map_ok::MapOk; +delegate_all!( + /// Stream for the [`map_ok`](super::TryStreamExt::map_ok) method. + MapOk<St, F>( + Map<IntoStream<St>, MapOkFn<F>> + ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Map::new(IntoStream::new(x), map_ok_fn(f))] +); -mod map_err; -#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 -pub use self::map_err::MapErr; +delegate_all!( + /// Stream for the [`map_err`](super::TryStreamExt::map_err) method. + MapErr<St, F>( + Map<IntoStream<St>, MapErrFn<F>> + ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Map::new(IntoStream::new(x), map_err_fn(f))] +); mod or_else; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 @@ -385,8 +404,9 @@ pub trait TryStreamExt: TryStream { /// Skip elements on this stream while the provided asynchronous predicate /// resolves to `true`. /// - /// This function is similar to [`StreamExt::skip_while`](crate::stream::StreamExt::skip_while) - /// but exits early if an error occurs. + /// This function is similar to + /// [`StreamExt::skip_while`](crate::stream::StreamExt::skip_while) but exits + /// early if an error occurs. /// /// # Examples /// diff --git a/src/stream/try_stream/or_else.rs b/src/stream/try_stream/or_else.rs index 33310d1..0bba0d0 100644 --- a/src/stream/try_stream/or_else.rs +++ b/src/stream/try_stream/or_else.rs @@ -5,18 +5,19 @@ use futures_core::stream::{Stream, TryStream, FusedStream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, project}; /// Stream for the [`or_else`](super::TryStreamExt::or_else) method. +#[pin_project] #[must_use = "streams do nothing unless polled"] pub struct OrElse<St, Fut, F> { + #[pin] stream: St, + #[pin] future: Option<Fut>, f: F, } -impl<St: Unpin, Fut: Unpin, F> Unpin for OrElse<St, Fut, F> {} - impl<St, Fut, F> fmt::Debug for OrElse<St, Fut, F> where St: fmt::Debug, @@ -30,12 +31,6 @@ where } } -impl<St, Fut, F> OrElse<St, Fut, F> { - unsafe_pinned!(stream: St); - unsafe_pinned!(future: Option<Fut>); - unsafe_unpinned!(f: F); -} - impl<St, Fut, F> OrElse<St, Fut, F> where St: TryStream, F: FnMut(St::Error) -> Fut, @@ -45,37 +40,7 @@ impl<St, Fut, F> OrElse<St, Fut, F> Self { stream, future: None, f } } - /// Acquires a reference to the underlying stream that this combinator is - /// pulling from. - pub fn get_ref(&self) -> &St { - &self.stream - } - - /// Acquires a mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_mut(&mut self) -> &mut St { - &mut self.stream - } - - /// Acquires a pinned mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> { - self.stream() - } - - /// Consumes this combinator, returning the underlying stream. - /// - /// Note that this may discard intermediate state of this combinator, so - /// care should be taken to avoid losing resources when this is called. - pub fn into_inner(self) -> St { - self.stream - } + delegate_access_inner!(stream, St, ()); } impl<St, Fut, F> Stream for OrElse<St, Fut, F> @@ -85,23 +50,29 @@ impl<St, Fut, F> Stream for OrElse<St, Fut, F> { type Item = Result<St::Ok, Fut::Error>; + #[project] fn poll_next( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Self::Item>> { - if self.future.is_none() { - let item = match ready!(self.as_mut().stream().try_poll_next(cx)) { - None => return Poll::Ready(None), - Some(Ok(e)) => return Poll::Ready(Some(Ok(e))), - Some(Err(e)) => e, - }; - let fut = (self.as_mut().f())(item); - self.as_mut().future().set(Some(fut)); - } - - let e = ready!(self.as_mut().future().as_pin_mut().unwrap().try_poll(cx)); - self.as_mut().future().set(None); - Poll::Ready(Some(e)) + #[project] + let OrElse { mut stream, mut future, f } = self.project(); + + Poll::Ready(loop { + if let Some(fut) = future.as_mut().as_pin_mut() { + let item = ready!(fut.try_poll(cx)); + future.set(None); + break Some(item); + } else { + match ready!(stream.as_mut().try_poll_next(cx)) { + Some(Ok(item)) => break Some(Ok(item)), + Some(Err(e)) => { + future.set(Some(f(e))); + }, + None => break None, + } + } + }) } fn size_hint(&self) -> (usize, Option<usize>) { diff --git a/src/stream/try_stream/try_buffer_unordered.rs b/src/stream/try_stream/try_buffer_unordered.rs index d11e1b4..566868b 100644 --- a/src/stream/try_stream/try_buffer_unordered.rs +++ b/src/stream/try_stream/try_buffer_unordered.rs @@ -5,32 +5,27 @@ use futures_core::stream::{Stream, TryStream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, project}; use core::pin::Pin; /// Stream for the /// [`try_buffer_unordered`](super::TryStreamExt::try_buffer_unordered) method. +#[pin_project] #[derive(Debug)] #[must_use = "streams do nothing unless polled"] pub struct TryBufferUnordered<St> where St: TryStream { + #[pin] stream: Fuse<IntoStream<St>>, in_progress_queue: FuturesUnordered<IntoFuture<St::Ok>>, max: usize, } -impl<St> Unpin for TryBufferUnordered<St> - where St: TryStream + Unpin -{} - impl<St> TryBufferUnordered<St> where St: TryStream, St::Ok: TryFuture, { - unsafe_pinned!(stream: Fuse<IntoStream<St>>); - unsafe_unpinned!(in_progress_queue: FuturesUnordered<IntoFuture<St::Ok>>); - pub(super) fn new(stream: St, n: usize) -> Self { TryBufferUnordered { stream: IntoStream::new(stream).fuse(), @@ -39,37 +34,7 @@ impl<St> TryBufferUnordered<St> } } - /// Acquires a reference to the underlying stream that this combinator is - /// pulling from. - pub fn get_ref(&self) -> &St { - self.stream.get_ref().get_ref() - } - - /// Acquires a mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_mut(&mut self) -> &mut St { - self.stream.get_mut().get_mut() - } - - /// Acquires a pinned mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> { - self.stream().get_pin_mut().get_pin_mut() - } - - /// Consumes this combinator, returning the underlying stream. - /// - /// Note that this may discard intermediate state of this combinator, so - /// care should be taken to avoid losing resources when this is called. - pub fn into_inner(self) -> St { - self.stream.into_inner().into_inner() - } + delegate_access_inner!(stream, St, (. .)); } impl<St> Stream for TryBufferUnordered<St> @@ -78,27 +43,31 @@ impl<St> Stream for TryBufferUnordered<St> { type Item = Result<<St::Ok as TryFuture>::Ok, St::Error>; + #[project] fn poll_next( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Self::Item>> { + #[project] + let TryBufferUnordered { mut stream, in_progress_queue, max } = self.project(); + // First up, try to spawn off as many futures as possible by filling up // our queue of futures. Propagate errors from the stream immediately. - while self.in_progress_queue.len() < self.max { - match self.as_mut().stream().poll_next(cx)? { - Poll::Ready(Some(fut)) => self.as_mut().in_progress_queue().push(fut.into_future()), + while in_progress_queue.len() < *max { + match stream.as_mut().poll_next(cx)? { + Poll::Ready(Some(fut)) => in_progress_queue.push(fut.into_future()), Poll::Ready(None) | Poll::Pending => break, } } // Attempt to pull the next value from the in_progress_queue - match self.as_mut().in_progress_queue().poll_next_unpin(cx) { + match in_progress_queue.poll_next_unpin(cx) { x @ Poll::Pending | x @ Poll::Ready(Some(_)) => return x, Poll::Ready(None) => {} } // If more values are still coming from the stream, we're not done yet - if self.stream.is_done() { + if stream.is_done() { Poll::Ready(None) } else { Poll::Pending diff --git a/src/stream/try_stream/try_collect.rs b/src/stream/try_stream/try_collect.rs index d22e8e8..3c9aee2 100644 --- a/src/stream/try_stream/try_collect.rs +++ b/src/stream/try_stream/try_collect.rs @@ -3,34 +3,27 @@ use core::pin::Pin; use futures_core::future::{FusedFuture, Future}; use futures_core::stream::{FusedStream, TryStream}; use futures_core::task::{Context, Poll}; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, project}; /// Future for the [`try_collect`](super::TryStreamExt::try_collect) method. +#[pin_project] #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct TryCollect<St, C> { + #[pin] stream: St, items: C, } impl<St: TryStream, C: Default> TryCollect<St, C> { - unsafe_pinned!(stream: St); - unsafe_unpinned!(items: C); - pub(super) fn new(s: St) -> TryCollect<St, C> { TryCollect { stream: s, items: Default::default(), } } - - fn finish(self: Pin<&mut Self>) -> C { - mem::replace(self.items(), Default::default()) - } } -impl<St: Unpin + TryStream, C> Unpin for TryCollect<St, C> {} - impl<St, C> FusedFuture for TryCollect<St, C> where St: TryStream + FusedStream, @@ -48,15 +41,18 @@ where { type Output = Result<C, St::Error>; + #[project] fn poll( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Self::Output> { - loop { - match ready!(self.as_mut().stream().try_poll_next(cx)?) { - Some(x) => self.as_mut().items().extend(Some(x)), - None => return Poll::Ready(Ok(self.as_mut().finish())), + #[project] + let TryCollect { mut stream, items } = self.project(); + Poll::Ready(Ok(loop { + match ready!(stream.as_mut().try_poll_next(cx)?) { + Some(x) => items.extend(Some(x)), + None => break mem::replace(items, Default::default()), } - } + })) } } diff --git a/src/stream/try_stream/try_concat.rs b/src/stream/try_stream/try_concat.rs index 395f166..8c9710b 100644 --- a/src/stream/try_stream/try_concat.rs +++ b/src/stream/try_stream/try_concat.rs @@ -2,26 +2,23 @@ use core::pin::Pin; use futures_core::future::Future; use futures_core::stream::TryStream; use futures_core::task::{Context, Poll}; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, project}; /// Future for the [`try_concat`](super::TryStreamExt::try_concat) method. +#[pin_project] #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct TryConcat<St: TryStream> { + #[pin] stream: St, accum: Option<St::Ok>, } -impl<St: TryStream + Unpin> Unpin for TryConcat<St> {} - impl<St> TryConcat<St> where St: TryStream, St::Ok: Extend<<St::Ok as IntoIterator>::Item> + IntoIterator + Default, { - unsafe_pinned!(stream: St); - unsafe_unpinned!(accum: Option<St::Ok>); - pub(super) fn new(stream: St) -> TryConcat<St> { TryConcat { stream, @@ -37,21 +34,20 @@ where { type Output = Result<St::Ok, St::Error>; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - loop { - match ready!(self.as_mut().stream().try_poll_next(cx)?) { - Some(x) => { - let accum = self.as_mut().accum(); - if let Some(a) = accum { - a.extend(x) - } else { - *accum = Some(x) - } - }, - None => { - return Poll::Ready(Ok(self.as_mut().accum().take().unwrap_or_default())) + #[project] + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + #[project] + let TryConcat { mut stream, accum } = self.project(); + Poll::Ready(Ok(loop { + if let Some(x) = ready!(stream.as_mut().try_poll_next(cx)?) { + if let Some(a) = accum { + a.extend(x) + } else { + *accum = Some(x) } + } else { + break accum.take().unwrap_or_default(); } - } + })) } } diff --git a/src/stream/try_stream/try_filter.rs b/src/stream/try_stream/try_filter.rs index 24a9c32..310f991 100644 --- a/src/stream/try_stream/try_filter.rs +++ b/src/stream/try_stream/try_filter.rs @@ -5,24 +5,23 @@ use futures_core::stream::{Stream, TryStream, FusedStream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, project}; /// Stream for the [`try_filter`](super::TryStreamExt::try_filter) /// method. +#[pin_project] #[must_use = "streams do nothing unless polled"] pub struct TryFilter<St, Fut, F> where St: TryStream { + #[pin] stream: St, f: F, + #[pin] pending_fut: Option<Fut>, pending_item: Option<St::Ok>, } -impl<St, Fut, F> Unpin for TryFilter<St, Fut, F> - where St: TryStream + Unpin, Fut: Unpin, -{} - impl<St, Fut, F> fmt::Debug for TryFilter<St, Fut, F> where St: TryStream + fmt::Debug, @@ -41,11 +40,6 @@ where impl<St, Fut, F> TryFilter<St, Fut, F> where St: TryStream { - unsafe_pinned!(stream: St); - unsafe_unpinned!(f: F); - unsafe_pinned!(pending_fut: Option<Fut>); - unsafe_unpinned!(pending_item: Option<St::Ok>); - pub(super) fn new(stream: St, f: F) -> Self { TryFilter { stream, @@ -55,37 +49,7 @@ impl<St, Fut, F> TryFilter<St, Fut, F> } } - /// Acquires a reference to the underlying stream that this combinator is - /// pulling from. - pub fn get_ref(&self) -> &St { - &self.stream - } - - /// Acquires a mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_mut(&mut self) -> &mut St { - &mut self.stream - } - - /// Acquires a pinned mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> { - self.stream() - } - - /// Consumes this combinator, returning the underlying stream. - /// - /// Note that this may discard intermediate state of this combinator, so - /// care should be taken to avoid losing resources when this is called. - pub fn into_inner(self) -> St { - self.stream - } + delegate_access_inner!(stream, St, ()); } impl<St, Fut, F> FusedStream for TryFilter<St, Fut, F> @@ -105,29 +69,28 @@ impl<St, Fut, F> Stream for TryFilter<St, Fut, F> { type Item = Result<St::Ok, St::Error>; + #[project] fn poll_next( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Result<St::Ok, St::Error>>> { - loop { - if self.pending_fut.is_none() { - let item = match ready!(self.as_mut().stream().try_poll_next(cx)?) { - Some(x) => x, - None => return Poll::Ready(None), - }; - let fut = (self.as_mut().f())(&item); - self.as_mut().pending_fut().set(Some(fut)); - *self.as_mut().pending_item() = Some(item); - } - - let yield_item = ready!(self.as_mut().pending_fut().as_pin_mut().unwrap().poll(cx)); - self.as_mut().pending_fut().set(None); - let item = self.as_mut().pending_item().take().unwrap(); - - if yield_item { - return Poll::Ready(Some(Ok(item))); + #[project] + let TryFilter { mut stream, f, mut pending_fut, pending_item } = self.project(); + Poll::Ready(loop { + if let Some(fut) = pending_fut.as_mut().as_pin_mut() { + let res = ready!(fut.poll(cx)); + pending_fut.set(None); + if res { + break pending_item.take().map(Ok); + } + *pending_item = None; + } else if let Some(item) = ready!(stream.as_mut().try_poll_next(cx)?) { + pending_fut.set(Some(f(&item))); + *pending_item = Some(item); + } else { + break None; } - } + }) } fn size_hint(&self) -> (usize, Option<usize>) { diff --git a/src/stream/try_stream/try_filter_map.rs b/src/stream/try_stream/try_filter_map.rs index ed7eeb2..ba8e43a 100644 --- a/src/stream/try_stream/try_filter_map.rs +++ b/src/stream/try_stream/try_filter_map.rs @@ -5,21 +5,20 @@ use futures_core::stream::{Stream, TryStream, FusedStream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, project}; /// Stream for the [`try_filter_map`](super::TryStreamExt::try_filter_map) /// method. +#[pin_project] #[must_use = "streams do nothing unless polled"] pub struct TryFilterMap<St, Fut, F> { + #[pin] stream: St, f: F, + #[pin] pending: Option<Fut>, } -impl<St, Fut, F> Unpin for TryFilterMap<St, Fut, F> - where St: Unpin, Fut: Unpin, -{} - impl<St, Fut, F> fmt::Debug for TryFilterMap<St, Fut, F> where St: fmt::Debug, @@ -34,45 +33,11 @@ where } impl<St, Fut, F> TryFilterMap<St, Fut, F> { - unsafe_pinned!(stream: St); - unsafe_unpinned!(f: F); - unsafe_pinned!(pending: Option<Fut>); - pub(super) fn new(stream: St, f: F) -> Self { TryFilterMap { stream, f, pending: None } } - /// Acquires a reference to the underlying stream that this combinator is - /// pulling from. - pub fn get_ref(&self) -> &St { - &self.stream - } - - /// Acquires a mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_mut(&mut self) -> &mut St { - &mut self.stream - } - - /// Acquires a pinned mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> { - self.stream() - } - - /// Consumes this combinator, returning the underlying stream. - /// - /// Note that this may discard intermediate state of this combinator, so - /// care should be taken to avoid losing resources when this is called. - pub fn into_inner(self) -> St { - self.stream - } + delegate_access_inner!(stream, St, ()); } impl<St, Fut, F, T> FusedStream for TryFilterMap<St, Fut, F> @@ -92,26 +57,29 @@ impl<St, Fut, F, T> Stream for TryFilterMap<St, Fut, F> { type Item = Result<T, St::Error>; + #[project] fn poll_next( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Result<T, St::Error>>> { - loop { - if self.pending.is_none() { - let item = match ready!(self.as_mut().stream().try_poll_next(cx)?) { - Some(x) => x, - None => return Poll::Ready(None), - }; - let fut = (self.as_mut().f())(item); - self.as_mut().pending().set(Some(fut)); - } - - let result = ready!(self.as_mut().pending().as_pin_mut().unwrap().try_poll(cx)); - self.as_mut().pending().set(None); - if let Some(x) = result? { - return Poll::Ready(Some(Ok(x))); + #[project] + let TryFilterMap { mut stream, f, mut pending } = self.project(); + Poll::Ready(loop { + if let Some(p) = pending.as_mut().as_pin_mut() { + // We have an item in progress, poll that until it's done + let item = ready!(p.try_poll(cx)?); + pending.set(None); + if item.is_some() { + break item.map(Ok); + } + } else if let Some(item) = ready!(stream.as_mut().try_poll_next(cx)?) { + // No item in progress, but the stream is still going + pending.set(Some(f(item))); + } else { + // The stream is done + break None; } - } + }) } fn size_hint(&self) -> (usize, Option<usize>) { diff --git a/src/stream/try_stream/try_flatten.rs b/src/stream/try_stream/try_flatten.rs index 5f81b22..a528639 100644 --- a/src/stream/try_stream/try_flatten.rs +++ b/src/stream/try_stream/try_flatten.rs @@ -3,34 +3,22 @@ use futures_core::stream::{FusedStream, Stream, TryStream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_utils::unsafe_pinned; +use pin_project::{pin_project, project}; /// Stream for the [`try_flatten`](super::TryStreamExt::try_flatten) method. +#[pin_project] #[derive(Debug)] #[must_use = "streams do nothing unless polled"] pub struct TryFlatten<St> where St: TryStream, { + #[pin] stream: St, + #[pin] next: Option<St::Ok>, } -impl<St> Unpin for TryFlatten<St> -where - St: TryStream + Unpin, - St::Ok: Unpin, -{ -} - -impl<St> TryFlatten<St> -where - St: TryStream, -{ - unsafe_pinned!(stream: St); - unsafe_pinned!(next: Option<St::Ok>); -} - impl<St> TryFlatten<St> where St: TryStream, @@ -41,37 +29,7 @@ where Self { stream, next: None } } - /// Acquires a reference to the underlying stream that this combinator is - /// pulling from. - pub fn get_ref(&self) -> &St { - &self.stream - } - - /// Acquires a mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_mut(&mut self) -> &mut St { - &mut self.stream - } - - /// Acquires a pinned mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> { - self.stream() - } - - /// Consumes this combinator, returning the underlying stream. - /// - /// Note that this may discard intermediate state of this combinator, so - /// care should be taken to avoid losing resources when this is called. - pub fn into_inner(self) -> St { - self.stream - } + delegate_access_inner!(stream, St, ()); } impl<St> FusedStream for TryFlatten<St> @@ -93,27 +51,23 @@ where { type Item = Result<<St::Ok as TryStream>::Ok, <St::Ok as TryStream>::Error>; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { - loop { - if self.next.is_none() { - match ready!(self.as_mut().stream().try_poll_next(cx)?) { - Some(e) => self.as_mut().next().set(Some(e)), - None => return Poll::Ready(None), + #[project] + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + #[project] + let TryFlatten { mut stream, mut next } = self.project(); + Poll::Ready(loop { + if let Some(s) = next.as_mut().as_pin_mut() { + if let Some(item) = ready!(s.try_poll_next(cx)?) { + break Some(Ok(item)); + } else { + next.set(None); } - } - - if let Some(item) = ready!(self - .as_mut() - .next() - .as_pin_mut() - .unwrap() - .try_poll_next(cx)?) - { - return Poll::Ready(Some(Ok(item))); + } else if let Some(s) = ready!(stream.as_mut().try_poll_next(cx)?) { + next.set(Some(s)); } else { - self.as_mut().next().set(None); + break None; } - } + }) } } diff --git a/src/stream/try_stream/try_fold.rs b/src/stream/try_stream/try_fold.rs index b8b8dc2..d85c1fe 100644 --- a/src/stream/try_stream/try_fold.rs +++ b/src/stream/try_stream/try_fold.rs @@ -3,19 +3,20 @@ use core::pin::Pin; use futures_core::future::{FusedFuture, Future, TryFuture}; use futures_core::stream::TryStream; use futures_core::task::{Context, Poll}; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, project}; /// Future for the [`try_fold`](super::TryStreamExt::try_fold) method. +#[pin_project] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct TryFold<St, Fut, T, F> { + #[pin] stream: St, f: F, accum: Option<T>, + #[pin] future: Option<Fut>, } -impl<St: Unpin, Fut: Unpin, T, F> Unpin for TryFold<St, Fut, T, F> {} - impl<St, Fut, T, F> fmt::Debug for TryFold<St, Fut, T, F> where St: fmt::Debug, @@ -36,11 +37,6 @@ where St: TryStream, F: FnMut(T, St::Ok) -> Fut, Fut: TryFuture<Ok = T, Error = St::Error>, { - unsafe_pinned!(stream: St); - unsafe_unpinned!(f: F); - unsafe_unpinned!(accum: Option<T>); - unsafe_pinned!(future: Option<Fut>); - pub(super) fn new(stream: St, f: F, t: T) -> TryFold<St, Fut, T, F> { TryFold { stream, @@ -68,43 +64,31 @@ impl<St, Fut, T, F> Future for TryFold<St, Fut, T, F> { type Output = Result<T, St::Error>; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - loop { - // we're currently processing a future to produce a new accum value - if self.accum.is_none() { - let accum = match ready!( - self.as_mut().future().as_pin_mut() - .expect("TryFold polled after completion") - .try_poll(cx) - ) { - Ok(accum) => accum, - Err(e) => { - // Indicate that the future can no longer be polled. - self.as_mut().future().set(None); - return Poll::Ready(Err(e)); - } - }; - *self.as_mut().accum() = Some(accum); - self.as_mut().future().set(None); - } - - let item = match ready!(self.as_mut().stream().try_poll_next(cx)) { - Some(Ok(item)) => Some(item), - Some(Err(e)) => { - // Indicate that the future can no longer be polled. - *self.as_mut().accum() = None; - return Poll::Ready(Err(e)); + #[project] + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + #[project] + let TryFold { mut stream, f, accum, mut future } = self.project(); + Poll::Ready(loop { + if let Some(fut) = future.as_mut().as_pin_mut() { + // we're currently processing a future to produce a new accum value + let res = ready!(fut.try_poll(cx)); + future.set(None); + match res { + Ok(a) => *accum = Some(a), + Err(e) => break Err(e), + } + } else if accum.is_some() { + // we're waiting on a new item from the stream + let res = ready!(stream.as_mut().try_poll_next(cx)); + let a = accum.take().unwrap(); + match res { + Some(Ok(item)) => future.set(Some(f(a, item))), + Some(Err(e)) => break Err(e), + None => break Ok(a), } - None => None, - }; - let accum = self.as_mut().accum().take().unwrap(); - - if let Some(e) = item { - let future = (self.as_mut().f())(accum, e); - self.as_mut().future().set(Some(future)); } else { - return Poll::Ready(Ok(accum)) + panic!("Fold polled after completion") } - } + }) } } diff --git a/src/stream/try_stream/try_for_each.rs b/src/stream/try_stream/try_for_each.rs index 2c71107..5fc91df 100644 --- a/src/stream/try_stream/try_for_each.rs +++ b/src/stream/try_stream/try_for_each.rs @@ -3,18 +3,19 @@ use core::pin::Pin; use futures_core::future::{Future, TryFuture}; use futures_core::stream::TryStream; use futures_core::task::{Context, Poll}; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, project}; /// Future for the [`try_for_each`](super::TryStreamExt::try_for_each) method. +#[pin_project] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct TryForEach<St, Fut, F> { + #[pin] stream: St, f: F, + #[pin] future: Option<Fut>, } -impl<St: Unpin, Fut: Unpin, F> Unpin for TryForEach<St, Fut, F> {} - impl<St, Fut, F> fmt::Debug for TryForEach<St, Fut, F> where St: fmt::Debug, @@ -33,10 +34,6 @@ where St: TryStream, F: FnMut(St::Ok) -> Fut, Fut: TryFuture<Ok = (), Error = St::Error>, { - unsafe_pinned!(stream: St); - unsafe_unpinned!(f: F); - unsafe_pinned!(future: Option<Fut>); - pub(super) fn new(stream: St, f: F) -> TryForEach<St, Fut, F> { TryForEach { stream, @@ -53,20 +50,21 @@ impl<St, Fut, F> Future for TryForEach<St, Fut, F> { type Output = Result<(), St::Error>; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + #[project] + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + #[project] + let TryForEach { mut stream, f, mut future } = self.project(); loop { - if let Some(future) = self.as_mut().future().as_pin_mut() { - ready!(future.try_poll(cx))?; - } - self.as_mut().future().set(None); - - match ready!(self.as_mut().stream().try_poll_next(cx)?) { - Some(e) => { - let future = (self.as_mut().f())(e); - self.as_mut().future().set(Some(future)); + if let Some(fut) = future.as_mut().as_pin_mut() { + ready!(fut.try_poll(cx))?; + future.set(None); + } else { + match ready!(stream.as_mut().try_poll_next(cx)?) { + Some(e) => future.set(Some(f(e))), + None => break, } - None => return Poll::Ready(Ok(())), } } + Poll::Ready(Ok(())) } } diff --git a/src/stream/try_stream/try_for_each_concurrent.rs b/src/stream/try_stream/try_for_each_concurrent.rs index 19c3e5b..87fd465 100644 --- a/src/stream/try_stream/try_for_each_concurrent.rs +++ b/src/stream/try_stream/try_for_each_concurrent.rs @@ -6,24 +6,21 @@ use core::num::NonZeroUsize; use futures_core::future::{FusedFuture, Future}; use futures_core::stream::TryStream; use futures_core::task::{Context, Poll}; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, project}; /// Future for the /// [`try_for_each_concurrent`](super::TryStreamExt::try_for_each_concurrent) /// method. +#[pin_project] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct TryForEachConcurrent<St, Fut, F> { + #[pin] stream: Option<St>, f: F, futures: FuturesUnordered<Fut>, limit: Option<NonZeroUsize>, } -impl<St, Fut, F> Unpin for TryForEachConcurrent<St, Fut, F> -where St: Unpin, - Fut: Unpin, -{} - impl<St, Fut, F> fmt::Debug for TryForEachConcurrent<St, Fut, F> where St: fmt::Debug, @@ -53,11 +50,6 @@ where St: TryStream, F: FnMut(St::Ok) -> Fut, Fut: Future<Output = Result<(), St::Error>>, { - unsafe_pinned!(stream: Option<St>); - unsafe_unpinned!(f: F); - unsafe_unpinned!(futures: FuturesUnordered<Fut>); - unsafe_unpinned!(limit: Option<NonZeroUsize>); - pub(super) fn new(stream: St, limit: Option<usize>, f: F) -> TryForEachConcurrent<St, Fut, F> { TryForEachConcurrent { stream: Some(stream), @@ -76,15 +68,16 @@ impl<St, Fut, F> Future for TryForEachConcurrent<St, Fut, F> { type Output = Result<(), St::Error>; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + #[project] + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + #[project] + let TryForEachConcurrent { mut stream, f, futures, limit } = self.project(); loop { let mut made_progress_this_iter = false; - // Try and pull an item from the stream - let current_len = self.futures.len(); // Check if we've already created a number of futures greater than `limit` - if self.limit.map(|limit| limit.get() > current_len).unwrap_or(true) { - let poll_res = match self.as_mut().stream().as_pin_mut() { + if limit.map(|limit| limit.get() > futures.len()).unwrap_or(true) { + let poll_res = match stream.as_mut().as_pin_mut() { Some(stream) => stream.try_poll_next(cx), None => Poll::Ready(None), }; @@ -95,29 +88,28 @@ impl<St, Fut, F> Future for TryForEachConcurrent<St, Fut, F> Some(elem) }, Poll::Ready(None) => { - self.as_mut().stream().set(None); + stream.set(None); None } Poll::Pending => None, Poll::Ready(Some(Err(e))) => { // Empty the stream and futures so that we know // the future has completed. - self.as_mut().stream().set(None); - drop(mem::replace(self.as_mut().futures(), FuturesUnordered::new())); + stream.set(None); + drop(mem::replace(futures, FuturesUnordered::new())); return Poll::Ready(Err(e)); } }; if let Some(elem) = elem { - let next_future = (self.as_mut().f())(elem); - self.as_mut().futures().push(next_future); + futures.push(f(elem)); } } - match self.as_mut().futures().poll_next_unpin(cx) { + match futures.poll_next_unpin(cx) { Poll::Ready(Some(Ok(()))) => made_progress_this_iter = true, Poll::Ready(None) => { - if self.stream.is_none() { + if stream.is_none() { return Poll::Ready(Ok(())) } }, @@ -125,8 +117,8 @@ impl<St, Fut, F> Future for TryForEachConcurrent<St, Fut, F> Poll::Ready(Some(Err(e))) => { // Empty the stream and futures so that we know // the future has completed. - self.as_mut().stream().set(None); - drop(mem::replace(self.as_mut().futures(), FuturesUnordered::new())); + stream.set(None); + drop(mem::replace(futures, FuturesUnordered::new())); return Poll::Ready(Err(e)); } } diff --git a/src/stream/try_stream/try_skip_while.rs b/src/stream/try_stream/try_skip_while.rs index a3d6803..624380f 100644 --- a/src/stream/try_stream/try_skip_while.rs +++ b/src/stream/try_stream/try_skip_while.rs @@ -5,21 +5,22 @@ use futures_core::stream::{Stream, TryStream, FusedStream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, project}; /// Stream for the [`try_skip_while`](super::TryStreamExt::try_skip_while) /// method. +#[pin_project] #[must_use = "streams do nothing unless polled"] pub struct TrySkipWhile<St, Fut, F> where St: TryStream { + #[pin] stream: St, f: F, + #[pin] pending_fut: Option<Fut>, pending_item: Option<St::Ok>, done_skipping: bool, } -impl<St: Unpin + TryStream, Fut: Unpin, F> Unpin for TrySkipWhile<St, Fut, F> {} - impl<St, Fut, F> fmt::Debug for TrySkipWhile<St, Fut, F> where St: TryStream + fmt::Debug, @@ -38,20 +39,9 @@ where impl<St, Fut, F> TrySkipWhile<St, Fut, F> where St: TryStream, -{ - unsafe_pinned!(stream: St); -} - -impl<St, Fut, F> TrySkipWhile<St, Fut, F> - where St: TryStream, F: FnMut(&St::Ok) -> Fut, Fut: TryFuture<Ok = bool, Error = St::Error>, { - unsafe_unpinned!(f: F); - unsafe_pinned!(pending_fut: Option<Fut>); - unsafe_unpinned!(pending_item: Option<St::Ok>); - unsafe_unpinned!(done_skipping: bool); - pub(super) fn new(stream: St, f: F) -> TrySkipWhile<St, Fut, F> { TrySkipWhile { stream, @@ -62,37 +52,7 @@ impl<St, Fut, F> TrySkipWhile<St, Fut, F> } } - /// Acquires a reference to the underlying stream that this combinator is - /// pulling from. - pub fn get_ref(&self) -> &St { - &self.stream - } - - /// Acquires a mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_mut(&mut self) -> &mut St { - &mut self.stream - } - - /// Acquires a pinned mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> { - self.stream() - } - - /// Consumes this combinator, returning the underlying stream. - /// - /// Note that this may discard intermediate state of this combinator, so - /// care should be taken to avoid losing resources when this is called. - pub fn into_inner(self) -> St { - self.stream - } + delegate_access_inner!(stream, St, ()); } impl<St, Fut, F> Stream for TrySkipWhile<St, Fut, F> @@ -102,34 +62,34 @@ impl<St, Fut, F> Stream for TrySkipWhile<St, Fut, F> { type Item = Result<St::Ok, St::Error>; + #[project] fn poll_next( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Self::Item>> { - if self.done_skipping { - return self.as_mut().stream().try_poll_next(cx); - } + #[project] + let TrySkipWhile { mut stream, f, mut pending_fut, pending_item, done_skipping } = self.project(); - loop { - if self.pending_item.is_none() { - let item = match ready!(self.as_mut().stream().try_poll_next(cx)?) { - Some(e) => e, - None => return Poll::Ready(None), - }; - let fut = (self.as_mut().f())(&item); - self.as_mut().pending_fut().set(Some(fut)); - *self.as_mut().pending_item() = Some(item); - } - - let skipped = ready!(self.as_mut().pending_fut().as_pin_mut().unwrap().try_poll(cx)?); - let item = self.as_mut().pending_item().take().unwrap(); - self.as_mut().pending_fut().set(None); + if *done_skipping { + return stream.try_poll_next(cx); + } - if !skipped { - *self.as_mut().done_skipping() = true; - return Poll::Ready(Some(Ok(item))) + Poll::Ready(loop { + if let Some(fut) = pending_fut.as_mut().as_pin_mut() { + let skipped = ready!(fut.try_poll(cx)?); + let item = pending_item.take(); + pending_fut.set(None); + if !skipped { + *done_skipping = true; + break item.map(Ok); + } + } else if let Some(item) = ready!(stream.as_mut().try_poll_next(cx)?) { + pending_fut.set(Some(f(&item))); + *pending_item = Some(item); + } else { + break None; } - } + }) } fn size_hint(&self) -> (usize, Option<usize>) { diff --git a/src/stream/try_stream/try_unfold.rs b/src/stream/try_stream/try_unfold.rs index 6266274..8da1248 100644 --- a/src/stream/try_stream/try_unfold.rs +++ b/src/stream/try_stream/try_unfold.rs @@ -3,7 +3,7 @@ use core::pin::Pin; use futures_core::future::TryFuture; use futures_core::stream::Stream; use futures_core::task::{Context, Poll}; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, project}; /// Creates a `TryStream` from a seed and a closure returning a `TryFuture`. /// @@ -67,15 +67,15 @@ where } /// Stream for the [`try_unfold`] function. +#[pin_project] #[must_use = "streams do nothing unless polled"] pub struct TryUnfold<T, F, Fut> { f: F, state: Option<T>, + #[pin] fut: Option<Fut>, } -impl<T, F, Fut: Unpin> Unpin for TryUnfold<T, F, Fut> {} - impl<T, F, Fut> fmt::Debug for TryUnfold<T, F, Fut> where T: fmt::Debug, @@ -89,12 +89,6 @@ where } } -impl<T, F, Fut> TryUnfold<T, F, Fut> { - unsafe_unpinned!(f: F); - unsafe_unpinned!(state: Option<T>); - unsafe_pinned!(fut: Option<Fut>); -} - impl<T, F, Fut, Item> Stream for TryUnfold<T, F, Fut> where F: FnMut(T) -> Fut, @@ -102,27 +96,30 @@ where { type Item = Result<Item, Fut::Error>; + #[project] fn poll_next( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Result<Item, Fut::Error>>> { - if let Some(state) = self.as_mut().state().take() { - let fut = (self.as_mut().f())(state); - self.as_mut().fut().set(Some(fut)); + #[project] + let TryUnfold {f, state, mut fut } = self.project(); + + if let Some(state) = state.take() { + fut.set(Some(f(state))); } - match self.as_mut().fut().as_pin_mut() { + match fut.as_mut().as_pin_mut() { None => { // The future previously errored Poll::Ready(None) } - Some(fut) => { - let step = ready!(fut.try_poll(cx)); - self.as_mut().fut().set(None); + Some(future) => { + let step = ready!(future.try_poll(cx)); + fut.set(None); match step { Ok(Some((item, next_state))) => { - *self.as_mut().state() = Some(next_state); + *state = Some(next_state); Poll::Ready(Some(Ok(item))) } Ok(None) => Poll::Ready(None), |