aboutsummaryrefslogtreecommitdiff
path: root/src/stream/try_stream
diff options
context:
space:
mode:
Diffstat (limited to 'src/stream/try_stream')
-rw-r--r--src/stream/try_stream/and_then.rs74
-rw-r--r--src/stream/try_stream/err_into.rs98
-rw-r--r--src/stream/try_stream/inspect_err.rs118
-rw-r--r--src/stream/try_stream/inspect_ok.rs118
-rw-r--r--src/stream/try_stream/into_stream.rs40
-rw-r--r--src/stream/try_stream/map_err.rs112
-rw-r--r--src/stream/try_stream/map_ok.rs112
-rw-r--r--src/stream/try_stream/mod.rs58
-rw-r--r--src/stream/try_stream/or_else.rs79
-rw-r--r--src/stream/try_stream/try_buffer_unordered.rs59
-rw-r--r--src/stream/try_stream/try_collect.rs28
-rw-r--r--src/stream/try_stream/try_concat.rs36
-rw-r--r--src/stream/try_stream/try_filter.rs83
-rw-r--r--src/stream/try_stream/try_filter_map.rs80
-rw-r--r--src/stream/try_stream/try_flatten.rs84
-rw-r--r--src/stream/try_stream/try_fold.rs70
-rw-r--r--src/stream/try_stream/try_for_each.rs34
-rw-r--r--src/stream/try_stream/try_for_each_concurrent.rs42
-rw-r--r--src/stream/try_stream/try_skip_while.rs94
-rw-r--r--src/stream/try_stream/try_unfold.rs33
20 files changed, 300 insertions, 1152 deletions
diff --git a/src/stream/try_stream/and_then.rs b/src/stream/try_stream/and_then.rs
index 809c32a..563ed34 100644
--- a/src/stream/try_stream/and_then.rs
+++ b/src/stream/try_stream/and_then.rs
@@ -5,18 +5,19 @@ use futures_core::stream::{Stream, TryStream, FusedStream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_utils::{unsafe_pinned, unsafe_unpinned};
+use pin_project::{pin_project, project};
/// Stream for the [`and_then`](super::TryStreamExt::and_then) method.
+#[pin_project]
#[must_use = "streams do nothing unless polled"]
pub struct AndThen<St, Fut, F> {
+ #[pin]
stream: St,
+ #[pin]
future: Option<Fut>,
f: F,
}
-impl<St: Unpin, Fut: Unpin, F> Unpin for AndThen<St, Fut, F> {}
-
impl<St, Fut, F> fmt::Debug for AndThen<St, Fut, F>
where
St: fmt::Debug,
@@ -30,12 +31,6 @@ where
}
}
-impl<St, Fut, F> AndThen<St, Fut, F> {
- unsafe_pinned!(stream: St);
- unsafe_pinned!(future: Option<Fut>);
- unsafe_unpinned!(f: F);
-}
-
impl<St, Fut, F> AndThen<St, Fut, F>
where St: TryStream,
F: FnMut(St::Ok) -> Fut,
@@ -45,37 +40,7 @@ impl<St, Fut, F> AndThen<St, Fut, F>
Self { stream, future: None, f }
}
- /// Acquires a reference to the underlying stream that this combinator is
- /// pulling from.
- pub fn get_ref(&self) -> &St {
- &self.stream
- }
-
- /// Acquires a mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_mut(&mut self) -> &mut St {
- &mut self.stream
- }
-
- /// Acquires a pinned mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> {
- self.stream()
- }
-
- /// Consumes this combinator, returning the underlying stream.
- ///
- /// Note that this may discard intermediate state of this combinator, so
- /// care should be taken to avoid losing resources when this is called.
- pub fn into_inner(self) -> St {
- self.stream
- }
+ delegate_access_inner!(stream, St, ());
}
impl<St, Fut, F> Stream for AndThen<St, Fut, F>
@@ -85,22 +50,25 @@ impl<St, Fut, F> Stream for AndThen<St, Fut, F>
{
type Item = Result<Fut::Ok, St::Error>;
+ #[project]
fn poll_next(
- mut self: Pin<&mut Self>,
+ self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
- if self.future.is_none() {
- let item = match ready!(self.as_mut().stream().try_poll_next(cx)?) {
- None => return Poll::Ready(None),
- Some(e) => e,
- };
- let fut = (self.as_mut().f())(item);
- self.as_mut().future().set(Some(fut));
- }
-
- let e = ready!(self.as_mut().future().as_pin_mut().unwrap().try_poll(cx));
- self.as_mut().future().set(None);
- Poll::Ready(Some(e))
+ #[project]
+ let AndThen { mut stream, mut future, f } = self.project();
+
+ Poll::Ready(loop {
+ if let Some(fut) = future.as_mut().as_pin_mut() {
+ let item = ready!(fut.try_poll(cx));
+ future.set(None);
+ break Some(item);
+ } else if let Some(item) = ready!(stream.as_mut().try_poll_next(cx)?) {
+ future.set(Some(f(item)));
+ } else {
+ break None;
+ }
+ })
}
fn size_hint(&self) -> (usize, Option<usize>) {
diff --git a/src/stream/try_stream/err_into.rs b/src/stream/try_stream/err_into.rs
deleted file mode 100644
index f5d9294..0000000
--- a/src/stream/try_stream/err_into.rs
+++ /dev/null
@@ -1,98 +0,0 @@
-use core::marker::PhantomData;
-use core::pin::Pin;
-use futures_core::stream::{FusedStream, Stream, TryStream};
-use futures_core::task::{Context, Poll};
-#[cfg(feature = "sink")]
-use futures_sink::Sink;
-use pin_utils::unsafe_pinned;
-
-/// Stream for the [`err_into`](super::TryStreamExt::err_into) method.
-#[derive(Debug)]
-#[must_use = "streams do nothing unless polled"]
-pub struct ErrInto<St, E> {
- stream: St,
- _marker: PhantomData<E>,
-}
-
-impl<St: Unpin, E> Unpin for ErrInto<St, E> {}
-
-impl<St, E> ErrInto<St, E> {
- unsafe_pinned!(stream: St);
-
- pub(super) fn new(stream: St) -> Self {
- ErrInto { stream, _marker: PhantomData }
- }
-
- /// Acquires a reference to the underlying stream that this combinator is
- /// pulling from.
- pub fn get_ref(&self) -> &St {
- &self.stream
- }
-
- /// Acquires a mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_mut(&mut self) -> &mut St {
- &mut self.stream
- }
-
- /// Acquires a pinned mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> {
- self.stream()
- }
-
- /// Consumes this combinator, returning the underlying stream.
- ///
- /// Note that this may discard intermediate state of this combinator, so
- /// care should be taken to avoid losing resources when this is called.
- pub fn into_inner(self) -> St {
- self.stream
- }
-}
-
-impl<St, E> FusedStream for ErrInto<St, E>
-where
- St: TryStream + FusedStream,
- St::Error: Into<E>,
-{
- fn is_terminated(&self) -> bool {
- self.stream.is_terminated()
- }
-}
-
-impl<St, E> Stream for ErrInto<St, E>
-where
- St: TryStream,
- St::Error: Into<E>,
-{
- type Item = Result<St::Ok, E>;
-
- fn poll_next(
- self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<Option<Self::Item>> {
- self.stream().try_poll_next(cx)
- .map(|res| res.map(|some| some.map_err(Into::into)))
- }
-
- fn size_hint(&self) -> (usize, Option<usize>) {
- self.stream.size_hint()
- }
-}
-
-// Forwarding impl of Sink from the underlying stream
-#[cfg(feature = "sink")]
-impl<S, E, Item> Sink<Item> for ErrInto<S, E>
-where
- S: Sink<Item>,
-{
- type Error = S::Error;
-
- delegate_sink!(stream, Item);
-}
diff --git a/src/stream/try_stream/inspect_err.rs b/src/stream/try_stream/inspect_err.rs
deleted file mode 100644
index 3c23ae0..0000000
--- a/src/stream/try_stream/inspect_err.rs
+++ /dev/null
@@ -1,118 +0,0 @@
-use crate::stream::stream::inspect;
-use core::fmt;
-use core::pin::Pin;
-use futures_core::stream::{FusedStream, Stream, TryStream};
-use futures_core::task::{Context, Poll};
-#[cfg(feature = "sink")]
-use futures_sink::Sink;
-use pin_utils::{unsafe_pinned, unsafe_unpinned};
-
-/// Stream for the [`inspect_err`](super::TryStreamExt::inspect_err) method.
-#[must_use = "streams do nothing unless polled"]
-pub struct InspectErr<St, F> {
- stream: St,
- f: F,
-}
-
-impl<St: Unpin, F> Unpin for InspectErr<St, F> {}
-
-impl<St, F> fmt::Debug for InspectErr<St, F>
-where
- St: fmt::Debug,
-{
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- f.debug_struct("InspectErr")
- .field("stream", &self.stream)
- .finish()
- }
-}
-
-impl<St, F> InspectErr<St, F> {
- unsafe_pinned!(stream: St);
- unsafe_unpinned!(f: F);
-}
-
-impl<St, F> InspectErr<St, F>
-where
- St: TryStream,
- F: FnMut(&St::Error),
-{
- pub(super) fn new(stream: St, f: F) -> Self {
- Self { stream, f }
- }
-
- /// Acquires a reference to the underlying stream that this combinator is
- /// pulling from.
- pub fn get_ref(&self) -> &St {
- &self.stream
- }
-
- /// Acquires a mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_mut(&mut self) -> &mut St {
- &mut self.stream
- }
-
- /// Acquires a pinned mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> {
- self.stream()
- }
-
- /// Consumes this combinator, returning the underlying stream.
- ///
- /// Note that this may discard intermediate state of this combinator, so
- /// care should be taken to avoid losing resources when this is called.
- pub fn into_inner(self) -> St {
- self.stream
- }
-}
-
-impl<St, F> FusedStream for InspectErr<St, F>
-where
- St: TryStream + FusedStream,
- F: FnMut(&St::Error),
-{
- fn is_terminated(&self) -> bool {
- self.stream.is_terminated()
- }
-}
-
-impl<St, F> Stream for InspectErr<St, F>
-where
- St: TryStream,
- F: FnMut(&St::Error),
-{
- type Item = Result<St::Ok, St::Error>;
-
- fn poll_next(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<Option<Self::Item>> {
- self.as_mut()
- .stream()
- .try_poll_next(cx)
- .map(|opt| opt.map(|res| res.map_err(|e| inspect(e, self.as_mut().f()))))
- }
-
- fn size_hint(&self) -> (usize, Option<usize>) {
- self.stream.size_hint()
- }
-}
-
-// Forwarding impl of Sink from the underlying stream
-#[cfg(feature = "sink")]
-impl<S, F, Item> Sink<Item> for InspectErr<S, F>
-where
- S: Sink<Item>,
-{
- type Error = S::Error;
-
- delegate_sink!(stream, Item);
-}
diff --git a/src/stream/try_stream/inspect_ok.rs b/src/stream/try_stream/inspect_ok.rs
deleted file mode 100644
index 89fb459..0000000
--- a/src/stream/try_stream/inspect_ok.rs
+++ /dev/null
@@ -1,118 +0,0 @@
-use crate::stream::stream::inspect;
-use core::fmt;
-use core::pin::Pin;
-use futures_core::stream::{FusedStream, Stream, TryStream};
-use futures_core::task::{Context, Poll};
-#[cfg(feature = "sink")]
-use futures_sink::Sink;
-use pin_utils::{unsafe_pinned, unsafe_unpinned};
-
-/// Stream for the [`inspect_ok`](super::TryStreamExt::inspect_ok) method.
-#[must_use = "streams do nothing unless polled"]
-pub struct InspectOk<St, F> {
- stream: St,
- f: F,
-}
-
-impl<St: Unpin, F> Unpin for InspectOk<St, F> {}
-
-impl<St, F> fmt::Debug for InspectOk<St, F>
-where
- St: fmt::Debug,
-{
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- f.debug_struct("InspectOk")
- .field("stream", &self.stream)
- .finish()
- }
-}
-
-impl<St, F> InspectOk<St, F> {
- unsafe_pinned!(stream: St);
- unsafe_unpinned!(f: F);
-}
-
-impl<St, F> InspectOk<St, F>
-where
- St: TryStream,
- F: FnMut(&St::Ok),
-{
- pub(super) fn new(stream: St, f: F) -> Self {
- Self { stream, f }
- }
-
- /// Acquires a reference to the underlying stream that this combinator is
- /// pulling from.
- pub fn get_ref(&self) -> &St {
- &self.stream
- }
-
- /// Acquires a mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_mut(&mut self) -> &mut St {
- &mut self.stream
- }
-
- /// Acquires a pinned mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> {
- self.stream()
- }
-
- /// Consumes this combinator, returning the underlying stream.
- ///
- /// Note that this may discard intermediate state of this combinator, so
- /// care should be taken to avoid losing resources when this is called.
- pub fn into_inner(self) -> St {
- self.stream
- }
-}
-
-impl<St, F> FusedStream for InspectOk<St, F>
-where
- St: TryStream + FusedStream,
- F: FnMut(&St::Ok),
-{
- fn is_terminated(&self) -> bool {
- self.stream.is_terminated()
- }
-}
-
-impl<St, F> Stream for InspectOk<St, F>
-where
- St: TryStream,
- F: FnMut(&St::Ok),
-{
- type Item = Result<St::Ok, St::Error>;
-
- fn poll_next(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<Option<Self::Item>> {
- self.as_mut()
- .stream()
- .try_poll_next(cx)
- .map(|opt| opt.map(|res| res.map(|e| inspect(e, self.as_mut().f()))))
- }
-
- fn size_hint(&self) -> (usize, Option<usize>) {
- self.stream.size_hint()
- }
-}
-
-// Forwarding impl of Sink from the underlying stream
-#[cfg(feature = "sink")]
-impl<S, F, Item> Sink<Item> for InspectOk<S, F>
-where
- S: Sink<Item>,
-{
- type Error = S::Error;
-
- delegate_sink!(stream, Item);
-}
diff --git a/src/stream/try_stream/into_stream.rs b/src/stream/try_stream/into_stream.rs
index b0fa07a..370a327 100644
--- a/src/stream/try_stream/into_stream.rs
+++ b/src/stream/try_stream/into_stream.rs
@@ -3,54 +3,24 @@ use futures_core::stream::{FusedStream, Stream, TryStream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_utils::unsafe_pinned;
+use pin_project::pin_project;
/// Stream for the [`into_stream`](super::TryStreamExt::into_stream) method.
+#[pin_project]
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct IntoStream<St> {
+ #[pin]
stream: St,
}
impl<St> IntoStream<St> {
- unsafe_pinned!(stream: St);
-
#[inline]
pub(super) fn new(stream: St) -> Self {
IntoStream { stream }
}
- /// Acquires a reference to the underlying stream that this combinator is
- /// pulling from.
- pub fn get_ref(&self) -> &St {
- &self.stream
- }
-
- /// Acquires a mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_mut(&mut self) -> &mut St {
- &mut self.stream
- }
-
- /// Acquires a pinned mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> {
- self.stream()
- }
-
- /// Consumes this combinator, returning the underlying stream.
- ///
- /// Note that this may discard intermediate state of this combinator, so
- /// care should be taken to avoid losing resources when this is called.
- pub fn into_inner(self) -> St {
- self.stream
- }
+ delegate_access_inner!(stream, St, ());
}
impl<St: TryStream + FusedStream> FusedStream for IntoStream<St> {
@@ -67,7 +37,7 @@ impl<St: TryStream> Stream for IntoStream<St> {
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
- self.stream().try_poll_next(cx)
+ self.project().stream.try_poll_next(cx)
}
fn size_hint(&self) -> (usize, Option<usize>) {
diff --git a/src/stream/try_stream/map_err.rs b/src/stream/try_stream/map_err.rs
deleted file mode 100644
index 1b98d6b..0000000
--- a/src/stream/try_stream/map_err.rs
+++ /dev/null
@@ -1,112 +0,0 @@
-use core::fmt;
-use core::pin::Pin;
-use futures_core::stream::{FusedStream, Stream, TryStream};
-use futures_core::task::{Context, Poll};
-#[cfg(feature = "sink")]
-use futures_sink::Sink;
-use pin_utils::{unsafe_pinned, unsafe_unpinned};
-
-/// Stream for the [`map_err`](super::TryStreamExt::map_err) method.
-#[must_use = "streams do nothing unless polled"]
-pub struct MapErr<St, F> {
- stream: St,
- f: F,
-}
-
-impl<St: Unpin, F> Unpin for MapErr<St, F> {}
-
-impl<St, F> fmt::Debug for MapErr<St, F>
-where
- St: fmt::Debug,
-{
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- f.debug_struct("MapErr")
- .field("stream", &self.stream)
- .finish()
- }
-}
-
-impl<St, F> MapErr<St, F> {
- unsafe_pinned!(stream: St);
- unsafe_unpinned!(f: F);
-
- /// Creates a new MapErr.
- pub(super) fn new(stream: St, f: F) -> Self {
- MapErr { stream, f }
- }
-
- /// Acquires a reference to the underlying stream that this combinator is
- /// pulling from.
- pub fn get_ref(&self) -> &St {
- &self.stream
- }
-
- /// Acquires a mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_mut(&mut self) -> &mut St {
- &mut self.stream
- }
-
- /// Acquires a pinned mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> {
- self.stream()
- }
-
- /// Consumes this combinator, returning the underlying stream.
- ///
- /// Note that this may discard intermediate state of this combinator, so
- /// care should be taken to avoid losing resources when this is called.
- pub fn into_inner(self) -> St {
- self.stream
- }
-}
-
-impl<St, F, E> FusedStream for MapErr<St, F>
-where
- St: TryStream + FusedStream,
- F: FnMut(St::Error) -> E,
-{
- fn is_terminated(&self) -> bool {
- self.stream.is_terminated()
- }
-}
-
-impl<St, F, E> Stream for MapErr<St, F>
-where
- St: TryStream,
- F: FnMut(St::Error) -> E,
-{
- type Item = Result<St::Ok, E>;
-
- fn poll_next(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<Option<Self::Item>> {
- self.as_mut()
- .stream()
- .try_poll_next(cx)
- .map(|opt| opt.map(|res| res.map_err(|e| self.as_mut().f()(e))))
- }
-
- fn size_hint(&self) -> (usize, Option<usize>) {
- self.stream.size_hint()
- }
-}
-
-// Forwarding impl of Sink from the underlying stream
-#[cfg(feature = "sink")]
-impl<S, F, Item> Sink<Item> for MapErr<S, F>
-where
- S: Sink<Item>,
-{
- type Error = S::Error;
-
- delegate_sink!(stream, Item);
-}
diff --git a/src/stream/try_stream/map_ok.rs b/src/stream/try_stream/map_ok.rs
deleted file mode 100644
index 19d01be..0000000
--- a/src/stream/try_stream/map_ok.rs
+++ /dev/null
@@ -1,112 +0,0 @@
-use core::fmt;
-use core::pin::Pin;
-use futures_core::stream::{FusedStream, Stream, TryStream};
-use futures_core::task::{Context, Poll};
-#[cfg(feature = "sink")]
-use futures_sink::Sink;
-use pin_utils::{unsafe_pinned, unsafe_unpinned};
-
-/// Stream for the [`map_ok`](super::TryStreamExt::map_ok) method.
-#[must_use = "streams do nothing unless polled"]
-pub struct MapOk<St, F> {
- stream: St,
- f: F,
-}
-
-impl<St: Unpin, F> Unpin for MapOk<St, F> {}
-
-impl<St, F> fmt::Debug for MapOk<St, F>
-where
- St: fmt::Debug,
-{
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- f.debug_struct("MapOk")
- .field("stream", &self.stream)
- .finish()
- }
-}
-
-impl<St, F> MapOk<St, F> {
- unsafe_pinned!(stream: St);
- unsafe_unpinned!(f: F);
-
- /// Creates a new MapOk.
- pub(super) fn new(stream: St, f: F) -> Self {
- MapOk { stream, f }
- }
-
- /// Acquires a reference to the underlying stream that this combinator is
- /// pulling from.
- pub fn get_ref(&self) -> &St {
- &self.stream
- }
-
- /// Acquires a mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_mut(&mut self) -> &mut St {
- &mut self.stream
- }
-
- /// Acquires a pinned mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> {
- self.stream()
- }
-
- /// Consumes this combinator, returning the underlying stream.
- ///
- /// Note that this may discard intermediate state of this combinator, so
- /// care should be taken to avoid losing resources when this is called.
- pub fn into_inner(self) -> St {
- self.stream
- }
-}
-
-impl<St, F, T> FusedStream for MapOk<St, F>
-where
- St: TryStream + FusedStream,
- F: FnMut(St::Ok) -> T,
-{
- fn is_terminated(&self) -> bool {
- self.stream.is_terminated()
- }
-}
-
-impl<St, F, T> Stream for MapOk<St, F>
-where
- St: TryStream,
- F: FnMut(St::Ok) -> T,
-{
- type Item = Result<T, St::Error>;
-
- fn poll_next(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<Option<Self::Item>> {
- self.as_mut()
- .stream()
- .try_poll_next(cx)
- .map(|opt| opt.map(|res| res.map(|x| self.as_mut().f()(x))))
- }
-
- fn size_hint(&self) -> (usize, Option<usize>) {
- self.stream.size_hint()
- }
-}
-
-// Forwarding impl of Sink from the underlying stream
-#[cfg(feature = "sink")]
-impl<S, F, Item> Sink<Item> for MapOk<S, F>
-where
- S: Sink<Item>,
-{
- type Error = S::Error;
-
- delegate_sink!(stream, Item);
-}
diff --git a/src/stream/try_stream/mod.rs b/src/stream/try_stream/mod.rs
index 6a7ced4..99d5a6d 100644
--- a/src/stream/try_stream/mod.rs
+++ b/src/stream/try_stream/mod.rs
@@ -11,34 +11,53 @@ use futures_core::{
stream::TryStream,
task::{Context, Poll},
};
+use crate::fns::{
+ InspectOkFn, inspect_ok_fn, InspectErrFn, inspect_err_fn, MapErrFn, map_err_fn, IntoFn, into_fn, MapOkFn, map_ok_fn,
+};
+use crate::stream::{Map, Inspect};
mod and_then;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::and_then::AndThen;
-mod err_into;
-#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
-pub use self::err_into::ErrInto;
-
-mod inspect_ok;
-#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
-pub use self::inspect_ok::InspectOk;
-
-mod inspect_err;
-#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
-pub use self::inspect_err::InspectErr;
+delegate_all!(
+ /// Stream for the [`err_into`](super::TryStreamExt::err_into) method.
+ ErrInto<St, E>(
+ MapErr<St, IntoFn<E>>
+ ): Debug + Sink + Stream + FusedStream + AccessInner[St, (.)] + New[|x: St| MapErr::new(x, into_fn())]
+);
+
+delegate_all!(
+ /// Stream for the [`inspect_ok`](super::TryStreamExt::inspect_ok) method.
+ InspectOk<St, F>(
+ Inspect<IntoStream<St>, InspectOkFn<F>>
+ ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Inspect::new(IntoStream::new(x), inspect_ok_fn(f))]
+);
+
+delegate_all!(
+ /// Stream for the [`inspect_err`](super::TryStreamExt::inspect_err) method.
+ InspectErr<St, F>(
+ Inspect<IntoStream<St>, InspectErrFn<F>>
+ ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Inspect::new(IntoStream::new(x), inspect_err_fn(f))]
+);
mod into_stream;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::into_stream::IntoStream;
-mod map_ok;
-#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
-pub use self::map_ok::MapOk;
+delegate_all!(
+ /// Stream for the [`map_ok`](super::TryStreamExt::map_ok) method.
+ MapOk<St, F>(
+ Map<IntoStream<St>, MapOkFn<F>>
+ ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Map::new(IntoStream::new(x), map_ok_fn(f))]
+);
-mod map_err;
-#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
-pub use self::map_err::MapErr;
+delegate_all!(
+ /// Stream for the [`map_err`](super::TryStreamExt::map_err) method.
+ MapErr<St, F>(
+ Map<IntoStream<St>, MapErrFn<F>>
+ ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Map::new(IntoStream::new(x), map_err_fn(f))]
+);
mod or_else;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
@@ -385,8 +404,9 @@ pub trait TryStreamExt: TryStream {
/// Skip elements on this stream while the provided asynchronous predicate
/// resolves to `true`.
///
- /// This function is similar to [`StreamExt::skip_while`](crate::stream::StreamExt::skip_while)
- /// but exits early if an error occurs.
+ /// This function is similar to
+ /// [`StreamExt::skip_while`](crate::stream::StreamExt::skip_while) but exits
+ /// early if an error occurs.
///
/// # Examples
///
diff --git a/src/stream/try_stream/or_else.rs b/src/stream/try_stream/or_else.rs
index 33310d1..0bba0d0 100644
--- a/src/stream/try_stream/or_else.rs
+++ b/src/stream/try_stream/or_else.rs
@@ -5,18 +5,19 @@ use futures_core::stream::{Stream, TryStream, FusedStream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_utils::{unsafe_pinned, unsafe_unpinned};
+use pin_project::{pin_project, project};
/// Stream for the [`or_else`](super::TryStreamExt::or_else) method.
+#[pin_project]
#[must_use = "streams do nothing unless polled"]
pub struct OrElse<St, Fut, F> {
+ #[pin]
stream: St,
+ #[pin]
future: Option<Fut>,
f: F,
}
-impl<St: Unpin, Fut: Unpin, F> Unpin for OrElse<St, Fut, F> {}
-
impl<St, Fut, F> fmt::Debug for OrElse<St, Fut, F>
where
St: fmt::Debug,
@@ -30,12 +31,6 @@ where
}
}
-impl<St, Fut, F> OrElse<St, Fut, F> {
- unsafe_pinned!(stream: St);
- unsafe_pinned!(future: Option<Fut>);
- unsafe_unpinned!(f: F);
-}
-
impl<St, Fut, F> OrElse<St, Fut, F>
where St: TryStream,
F: FnMut(St::Error) -> Fut,
@@ -45,37 +40,7 @@ impl<St, Fut, F> OrElse<St, Fut, F>
Self { stream, future: None, f }
}
- /// Acquires a reference to the underlying stream that this combinator is
- /// pulling from.
- pub fn get_ref(&self) -> &St {
- &self.stream
- }
-
- /// Acquires a mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_mut(&mut self) -> &mut St {
- &mut self.stream
- }
-
- /// Acquires a pinned mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> {
- self.stream()
- }
-
- /// Consumes this combinator, returning the underlying stream.
- ///
- /// Note that this may discard intermediate state of this combinator, so
- /// care should be taken to avoid losing resources when this is called.
- pub fn into_inner(self) -> St {
- self.stream
- }
+ delegate_access_inner!(stream, St, ());
}
impl<St, Fut, F> Stream for OrElse<St, Fut, F>
@@ -85,23 +50,29 @@ impl<St, Fut, F> Stream for OrElse<St, Fut, F>
{
type Item = Result<St::Ok, Fut::Error>;
+ #[project]
fn poll_next(
- mut self: Pin<&mut Self>,
+ self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
- if self.future.is_none() {
- let item = match ready!(self.as_mut().stream().try_poll_next(cx)) {
- None => return Poll::Ready(None),
- Some(Ok(e)) => return Poll::Ready(Some(Ok(e))),
- Some(Err(e)) => e,
- };
- let fut = (self.as_mut().f())(item);
- self.as_mut().future().set(Some(fut));
- }
-
- let e = ready!(self.as_mut().future().as_pin_mut().unwrap().try_poll(cx));
- self.as_mut().future().set(None);
- Poll::Ready(Some(e))
+ #[project]
+ let OrElse { mut stream, mut future, f } = self.project();
+
+ Poll::Ready(loop {
+ if let Some(fut) = future.as_mut().as_pin_mut() {
+ let item = ready!(fut.try_poll(cx));
+ future.set(None);
+ break Some(item);
+ } else {
+ match ready!(stream.as_mut().try_poll_next(cx)) {
+ Some(Ok(item)) => break Some(Ok(item)),
+ Some(Err(e)) => {
+ future.set(Some(f(e)));
+ },
+ None => break None,
+ }
+ }
+ })
}
fn size_hint(&self) -> (usize, Option<usize>) {
diff --git a/src/stream/try_stream/try_buffer_unordered.rs b/src/stream/try_stream/try_buffer_unordered.rs
index d11e1b4..566868b 100644
--- a/src/stream/try_stream/try_buffer_unordered.rs
+++ b/src/stream/try_stream/try_buffer_unordered.rs
@@ -5,32 +5,27 @@ use futures_core::stream::{Stream, TryStream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_utils::{unsafe_pinned, unsafe_unpinned};
+use pin_project::{pin_project, project};
use core::pin::Pin;
/// Stream for the
/// [`try_buffer_unordered`](super::TryStreamExt::try_buffer_unordered) method.
+#[pin_project]
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct TryBufferUnordered<St>
where St: TryStream
{
+ #[pin]
stream: Fuse<IntoStream<St>>,
in_progress_queue: FuturesUnordered<IntoFuture<St::Ok>>,
max: usize,
}
-impl<St> Unpin for TryBufferUnordered<St>
- where St: TryStream + Unpin
-{}
-
impl<St> TryBufferUnordered<St>
where St: TryStream,
St::Ok: TryFuture,
{
- unsafe_pinned!(stream: Fuse<IntoStream<St>>);
- unsafe_unpinned!(in_progress_queue: FuturesUnordered<IntoFuture<St::Ok>>);
-
pub(super) fn new(stream: St, n: usize) -> Self {
TryBufferUnordered {
stream: IntoStream::new(stream).fuse(),
@@ -39,37 +34,7 @@ impl<St> TryBufferUnordered<St>
}
}
- /// Acquires a reference to the underlying stream that this combinator is
- /// pulling from.
- pub fn get_ref(&self) -> &St {
- self.stream.get_ref().get_ref()
- }
-
- /// Acquires a mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_mut(&mut self) -> &mut St {
- self.stream.get_mut().get_mut()
- }
-
- /// Acquires a pinned mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> {
- self.stream().get_pin_mut().get_pin_mut()
- }
-
- /// Consumes this combinator, returning the underlying stream.
- ///
- /// Note that this may discard intermediate state of this combinator, so
- /// care should be taken to avoid losing resources when this is called.
- pub fn into_inner(self) -> St {
- self.stream.into_inner().into_inner()
- }
+ delegate_access_inner!(stream, St, (. .));
}
impl<St> Stream for TryBufferUnordered<St>
@@ -78,27 +43,31 @@ impl<St> Stream for TryBufferUnordered<St>
{
type Item = Result<<St::Ok as TryFuture>::Ok, St::Error>;
+ #[project]
fn poll_next(
- mut self: Pin<&mut Self>,
+ self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
+ #[project]
+ let TryBufferUnordered { mut stream, in_progress_queue, max } = self.project();
+
// First up, try to spawn off as many futures as possible by filling up
// our queue of futures. Propagate errors from the stream immediately.
- while self.in_progress_queue.len() < self.max {
- match self.as_mut().stream().poll_next(cx)? {
- Poll::Ready(Some(fut)) => self.as_mut().in_progress_queue().push(fut.into_future()),
+ while in_progress_queue.len() < *max {
+ match stream.as_mut().poll_next(cx)? {
+ Poll::Ready(Some(fut)) => in_progress_queue.push(fut.into_future()),
Poll::Ready(None) | Poll::Pending => break,
}
}
// Attempt to pull the next value from the in_progress_queue
- match self.as_mut().in_progress_queue().poll_next_unpin(cx) {
+ match in_progress_queue.poll_next_unpin(cx) {
x @ Poll::Pending | x @ Poll::Ready(Some(_)) => return x,
Poll::Ready(None) => {}
}
// If more values are still coming from the stream, we're not done yet
- if self.stream.is_done() {
+ if stream.is_done() {
Poll::Ready(None)
} else {
Poll::Pending
diff --git a/src/stream/try_stream/try_collect.rs b/src/stream/try_stream/try_collect.rs
index d22e8e8..3c9aee2 100644
--- a/src/stream/try_stream/try_collect.rs
+++ b/src/stream/try_stream/try_collect.rs
@@ -3,34 +3,27 @@ use core::pin::Pin;
use futures_core::future::{FusedFuture, Future};
use futures_core::stream::{FusedStream, TryStream};
use futures_core::task::{Context, Poll};
-use pin_utils::{unsafe_pinned, unsafe_unpinned};
+use pin_project::{pin_project, project};
/// Future for the [`try_collect`](super::TryStreamExt::try_collect) method.
+#[pin_project]
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct TryCollect<St, C> {
+ #[pin]
stream: St,
items: C,
}
impl<St: TryStream, C: Default> TryCollect<St, C> {
- unsafe_pinned!(stream: St);
- unsafe_unpinned!(items: C);
-
pub(super) fn new(s: St) -> TryCollect<St, C> {
TryCollect {
stream: s,
items: Default::default(),
}
}
-
- fn finish(self: Pin<&mut Self>) -> C {
- mem::replace(self.items(), Default::default())
- }
}
-impl<St: Unpin + TryStream, C> Unpin for TryCollect<St, C> {}
-
impl<St, C> FusedFuture for TryCollect<St, C>
where
St: TryStream + FusedStream,
@@ -48,15 +41,18 @@ where
{
type Output = Result<C, St::Error>;
+ #[project]
fn poll(
- mut self: Pin<&mut Self>,
+ self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Self::Output> {
- loop {
- match ready!(self.as_mut().stream().try_poll_next(cx)?) {
- Some(x) => self.as_mut().items().extend(Some(x)),
- None => return Poll::Ready(Ok(self.as_mut().finish())),
+ #[project]
+ let TryCollect { mut stream, items } = self.project();
+ Poll::Ready(Ok(loop {
+ match ready!(stream.as_mut().try_poll_next(cx)?) {
+ Some(x) => items.extend(Some(x)),
+ None => break mem::replace(items, Default::default()),
}
- }
+ }))
}
}
diff --git a/src/stream/try_stream/try_concat.rs b/src/stream/try_stream/try_concat.rs
index 395f166..8c9710b 100644
--- a/src/stream/try_stream/try_concat.rs
+++ b/src/stream/try_stream/try_concat.rs
@@ -2,26 +2,23 @@ use core::pin::Pin;
use futures_core::future::Future;
use futures_core::stream::TryStream;
use futures_core::task::{Context, Poll};
-use pin_utils::{unsafe_pinned, unsafe_unpinned};
+use pin_project::{pin_project, project};
/// Future for the [`try_concat`](super::TryStreamExt::try_concat) method.
+#[pin_project]
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct TryConcat<St: TryStream> {
+ #[pin]
stream: St,
accum: Option<St::Ok>,
}
-impl<St: TryStream + Unpin> Unpin for TryConcat<St> {}
-
impl<St> TryConcat<St>
where
St: TryStream,
St::Ok: Extend<<St::Ok as IntoIterator>::Item> + IntoIterator + Default,
{
- unsafe_pinned!(stream: St);
- unsafe_unpinned!(accum: Option<St::Ok>);
-
pub(super) fn new(stream: St) -> TryConcat<St> {
TryConcat {
stream,
@@ -37,21 +34,20 @@ where
{
type Output = Result<St::Ok, St::Error>;
- fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- loop {
- match ready!(self.as_mut().stream().try_poll_next(cx)?) {
- Some(x) => {
- let accum = self.as_mut().accum();
- if let Some(a) = accum {
- a.extend(x)
- } else {
- *accum = Some(x)
- }
- },
- None => {
- return Poll::Ready(Ok(self.as_mut().accum().take().unwrap_or_default()))
+ #[project]
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ #[project]
+ let TryConcat { mut stream, accum } = self.project();
+ Poll::Ready(Ok(loop {
+ if let Some(x) = ready!(stream.as_mut().try_poll_next(cx)?) {
+ if let Some(a) = accum {
+ a.extend(x)
+ } else {
+ *accum = Some(x)
}
+ } else {
+ break accum.take().unwrap_or_default();
}
- }
+ }))
}
}
diff --git a/src/stream/try_stream/try_filter.rs b/src/stream/try_stream/try_filter.rs
index 24a9c32..310f991 100644
--- a/src/stream/try_stream/try_filter.rs
+++ b/src/stream/try_stream/try_filter.rs
@@ -5,24 +5,23 @@ use futures_core::stream::{Stream, TryStream, FusedStream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_utils::{unsafe_pinned, unsafe_unpinned};
+use pin_project::{pin_project, project};
/// Stream for the [`try_filter`](super::TryStreamExt::try_filter)
/// method.
+#[pin_project]
#[must_use = "streams do nothing unless polled"]
pub struct TryFilter<St, Fut, F>
where St: TryStream
{
+ #[pin]
stream: St,
f: F,
+ #[pin]
pending_fut: Option<Fut>,
pending_item: Option<St::Ok>,
}
-impl<St, Fut, F> Unpin for TryFilter<St, Fut, F>
- where St: TryStream + Unpin, Fut: Unpin,
-{}
-
impl<St, Fut, F> fmt::Debug for TryFilter<St, Fut, F>
where
St: TryStream + fmt::Debug,
@@ -41,11 +40,6 @@ where
impl<St, Fut, F> TryFilter<St, Fut, F>
where St: TryStream
{
- unsafe_pinned!(stream: St);
- unsafe_unpinned!(f: F);
- unsafe_pinned!(pending_fut: Option<Fut>);
- unsafe_unpinned!(pending_item: Option<St::Ok>);
-
pub(super) fn new(stream: St, f: F) -> Self {
TryFilter {
stream,
@@ -55,37 +49,7 @@ impl<St, Fut, F> TryFilter<St, Fut, F>
}
}
- /// Acquires a reference to the underlying stream that this combinator is
- /// pulling from.
- pub fn get_ref(&self) -> &St {
- &self.stream
- }
-
- /// Acquires a mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_mut(&mut self) -> &mut St {
- &mut self.stream
- }
-
- /// Acquires a pinned mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> {
- self.stream()
- }
-
- /// Consumes this combinator, returning the underlying stream.
- ///
- /// Note that this may discard intermediate state of this combinator, so
- /// care should be taken to avoid losing resources when this is called.
- pub fn into_inner(self) -> St {
- self.stream
- }
+ delegate_access_inner!(stream, St, ());
}
impl<St, Fut, F> FusedStream for TryFilter<St, Fut, F>
@@ -105,29 +69,28 @@ impl<St, Fut, F> Stream for TryFilter<St, Fut, F>
{
type Item = Result<St::Ok, St::Error>;
+ #[project]
fn poll_next(
- mut self: Pin<&mut Self>,
+ self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<St::Ok, St::Error>>> {
- loop {
- if self.pending_fut.is_none() {
- let item = match ready!(self.as_mut().stream().try_poll_next(cx)?) {
- Some(x) => x,
- None => return Poll::Ready(None),
- };
- let fut = (self.as_mut().f())(&item);
- self.as_mut().pending_fut().set(Some(fut));
- *self.as_mut().pending_item() = Some(item);
- }
-
- let yield_item = ready!(self.as_mut().pending_fut().as_pin_mut().unwrap().poll(cx));
- self.as_mut().pending_fut().set(None);
- let item = self.as_mut().pending_item().take().unwrap();
-
- if yield_item {
- return Poll::Ready(Some(Ok(item)));
+ #[project]
+ let TryFilter { mut stream, f, mut pending_fut, pending_item } = self.project();
+ Poll::Ready(loop {
+ if let Some(fut) = pending_fut.as_mut().as_pin_mut() {
+ let res = ready!(fut.poll(cx));
+ pending_fut.set(None);
+ if res {
+ break pending_item.take().map(Ok);
+ }
+ *pending_item = None;
+ } else if let Some(item) = ready!(stream.as_mut().try_poll_next(cx)?) {
+ pending_fut.set(Some(f(&item)));
+ *pending_item = Some(item);
+ } else {
+ break None;
}
- }
+ })
}
fn size_hint(&self) -> (usize, Option<usize>) {
diff --git a/src/stream/try_stream/try_filter_map.rs b/src/stream/try_stream/try_filter_map.rs
index ed7eeb2..ba8e43a 100644
--- a/src/stream/try_stream/try_filter_map.rs
+++ b/src/stream/try_stream/try_filter_map.rs
@@ -5,21 +5,20 @@ use futures_core::stream::{Stream, TryStream, FusedStream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_utils::{unsafe_pinned, unsafe_unpinned};
+use pin_project::{pin_project, project};
/// Stream for the [`try_filter_map`](super::TryStreamExt::try_filter_map)
/// method.
+#[pin_project]
#[must_use = "streams do nothing unless polled"]
pub struct TryFilterMap<St, Fut, F> {
+ #[pin]
stream: St,
f: F,
+ #[pin]
pending: Option<Fut>,
}
-impl<St, Fut, F> Unpin for TryFilterMap<St, Fut, F>
- where St: Unpin, Fut: Unpin,
-{}
-
impl<St, Fut, F> fmt::Debug for TryFilterMap<St, Fut, F>
where
St: fmt::Debug,
@@ -34,45 +33,11 @@ where
}
impl<St, Fut, F> TryFilterMap<St, Fut, F> {
- unsafe_pinned!(stream: St);
- unsafe_unpinned!(f: F);
- unsafe_pinned!(pending: Option<Fut>);
-
pub(super) fn new(stream: St, f: F) -> Self {
TryFilterMap { stream, f, pending: None }
}
- /// Acquires a reference to the underlying stream that this combinator is
- /// pulling from.
- pub fn get_ref(&self) -> &St {
- &self.stream
- }
-
- /// Acquires a mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_mut(&mut self) -> &mut St {
- &mut self.stream
- }
-
- /// Acquires a pinned mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> {
- self.stream()
- }
-
- /// Consumes this combinator, returning the underlying stream.
- ///
- /// Note that this may discard intermediate state of this combinator, so
- /// care should be taken to avoid losing resources when this is called.
- pub fn into_inner(self) -> St {
- self.stream
- }
+ delegate_access_inner!(stream, St, ());
}
impl<St, Fut, F, T> FusedStream for TryFilterMap<St, Fut, F>
@@ -92,26 +57,29 @@ impl<St, Fut, F, T> Stream for TryFilterMap<St, Fut, F>
{
type Item = Result<T, St::Error>;
+ #[project]
fn poll_next(
- mut self: Pin<&mut Self>,
+ self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<T, St::Error>>> {
- loop {
- if self.pending.is_none() {
- let item = match ready!(self.as_mut().stream().try_poll_next(cx)?) {
- Some(x) => x,
- None => return Poll::Ready(None),
- };
- let fut = (self.as_mut().f())(item);
- self.as_mut().pending().set(Some(fut));
- }
-
- let result = ready!(self.as_mut().pending().as_pin_mut().unwrap().try_poll(cx));
- self.as_mut().pending().set(None);
- if let Some(x) = result? {
- return Poll::Ready(Some(Ok(x)));
+ #[project]
+ let TryFilterMap { mut stream, f, mut pending } = self.project();
+ Poll::Ready(loop {
+ if let Some(p) = pending.as_mut().as_pin_mut() {
+ // We have an item in progress, poll that until it's done
+ let item = ready!(p.try_poll(cx)?);
+ pending.set(None);
+ if item.is_some() {
+ break item.map(Ok);
+ }
+ } else if let Some(item) = ready!(stream.as_mut().try_poll_next(cx)?) {
+ // No item in progress, but the stream is still going
+ pending.set(Some(f(item)));
+ } else {
+ // The stream is done
+ break None;
}
- }
+ })
}
fn size_hint(&self) -> (usize, Option<usize>) {
diff --git a/src/stream/try_stream/try_flatten.rs b/src/stream/try_stream/try_flatten.rs
index 5f81b22..a528639 100644
--- a/src/stream/try_stream/try_flatten.rs
+++ b/src/stream/try_stream/try_flatten.rs
@@ -3,34 +3,22 @@ use futures_core::stream::{FusedStream, Stream, TryStream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_utils::unsafe_pinned;
+use pin_project::{pin_project, project};
/// Stream for the [`try_flatten`](super::TryStreamExt::try_flatten) method.
+#[pin_project]
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct TryFlatten<St>
where
St: TryStream,
{
+ #[pin]
stream: St,
+ #[pin]
next: Option<St::Ok>,
}
-impl<St> Unpin for TryFlatten<St>
-where
- St: TryStream + Unpin,
- St::Ok: Unpin,
-{
-}
-
-impl<St> TryFlatten<St>
-where
- St: TryStream,
-{
- unsafe_pinned!(stream: St);
- unsafe_pinned!(next: Option<St::Ok>);
-}
-
impl<St> TryFlatten<St>
where
St: TryStream,
@@ -41,37 +29,7 @@ where
Self { stream, next: None }
}
- /// Acquires a reference to the underlying stream that this combinator is
- /// pulling from.
- pub fn get_ref(&self) -> &St {
- &self.stream
- }
-
- /// Acquires a mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_mut(&mut self) -> &mut St {
- &mut self.stream
- }
-
- /// Acquires a pinned mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> {
- self.stream()
- }
-
- /// Consumes this combinator, returning the underlying stream.
- ///
- /// Note that this may discard intermediate state of this combinator, so
- /// care should be taken to avoid losing resources when this is called.
- pub fn into_inner(self) -> St {
- self.stream
- }
+ delegate_access_inner!(stream, St, ());
}
impl<St> FusedStream for TryFlatten<St>
@@ -93,27 +51,23 @@ where
{
type Item = Result<<St::Ok as TryStream>::Ok, <St::Ok as TryStream>::Error>;
- fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
- loop {
- if self.next.is_none() {
- match ready!(self.as_mut().stream().try_poll_next(cx)?) {
- Some(e) => self.as_mut().next().set(Some(e)),
- None => return Poll::Ready(None),
+ #[project]
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ #[project]
+ let TryFlatten { mut stream, mut next } = self.project();
+ Poll::Ready(loop {
+ if let Some(s) = next.as_mut().as_pin_mut() {
+ if let Some(item) = ready!(s.try_poll_next(cx)?) {
+ break Some(Ok(item));
+ } else {
+ next.set(None);
}
- }
-
- if let Some(item) = ready!(self
- .as_mut()
- .next()
- .as_pin_mut()
- .unwrap()
- .try_poll_next(cx)?)
- {
- return Poll::Ready(Some(Ok(item)));
+ } else if let Some(s) = ready!(stream.as_mut().try_poll_next(cx)?) {
+ next.set(Some(s));
} else {
- self.as_mut().next().set(None);
+ break None;
}
- }
+ })
}
}
diff --git a/src/stream/try_stream/try_fold.rs b/src/stream/try_stream/try_fold.rs
index b8b8dc2..d85c1fe 100644
--- a/src/stream/try_stream/try_fold.rs
+++ b/src/stream/try_stream/try_fold.rs
@@ -3,19 +3,20 @@ use core::pin::Pin;
use futures_core::future::{FusedFuture, Future, TryFuture};
use futures_core::stream::TryStream;
use futures_core::task::{Context, Poll};
-use pin_utils::{unsafe_pinned, unsafe_unpinned};
+use pin_project::{pin_project, project};
/// Future for the [`try_fold`](super::TryStreamExt::try_fold) method.
+#[pin_project]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct TryFold<St, Fut, T, F> {
+ #[pin]
stream: St,
f: F,
accum: Option<T>,
+ #[pin]
future: Option<Fut>,
}
-impl<St: Unpin, Fut: Unpin, T, F> Unpin for TryFold<St, Fut, T, F> {}
-
impl<St, Fut, T, F> fmt::Debug for TryFold<St, Fut, T, F>
where
St: fmt::Debug,
@@ -36,11 +37,6 @@ where St: TryStream,
F: FnMut(T, St::Ok) -> Fut,
Fut: TryFuture<Ok = T, Error = St::Error>,
{
- unsafe_pinned!(stream: St);
- unsafe_unpinned!(f: F);
- unsafe_unpinned!(accum: Option<T>);
- unsafe_pinned!(future: Option<Fut>);
-
pub(super) fn new(stream: St, f: F, t: T) -> TryFold<St, Fut, T, F> {
TryFold {
stream,
@@ -68,43 +64,31 @@ impl<St, Fut, T, F> Future for TryFold<St, Fut, T, F>
{
type Output = Result<T, St::Error>;
- fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- loop {
- // we're currently processing a future to produce a new accum value
- if self.accum.is_none() {
- let accum = match ready!(
- self.as_mut().future().as_pin_mut()
- .expect("TryFold polled after completion")
- .try_poll(cx)
- ) {
- Ok(accum) => accum,
- Err(e) => {
- // Indicate that the future can no longer be polled.
- self.as_mut().future().set(None);
- return Poll::Ready(Err(e));
- }
- };
- *self.as_mut().accum() = Some(accum);
- self.as_mut().future().set(None);
- }
-
- let item = match ready!(self.as_mut().stream().try_poll_next(cx)) {
- Some(Ok(item)) => Some(item),
- Some(Err(e)) => {
- // Indicate that the future can no longer be polled.
- *self.as_mut().accum() = None;
- return Poll::Ready(Err(e));
+ #[project]
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ #[project]
+ let TryFold { mut stream, f, accum, mut future } = self.project();
+ Poll::Ready(loop {
+ if let Some(fut) = future.as_mut().as_pin_mut() {
+ // we're currently processing a future to produce a new accum value
+ let res = ready!(fut.try_poll(cx));
+ future.set(None);
+ match res {
+ Ok(a) => *accum = Some(a),
+ Err(e) => break Err(e),
+ }
+ } else if accum.is_some() {
+ // we're waiting on a new item from the stream
+ let res = ready!(stream.as_mut().try_poll_next(cx));
+ let a = accum.take().unwrap();
+ match res {
+ Some(Ok(item)) => future.set(Some(f(a, item))),
+ Some(Err(e)) => break Err(e),
+ None => break Ok(a),
}
- None => None,
- };
- let accum = self.as_mut().accum().take().unwrap();
-
- if let Some(e) = item {
- let future = (self.as_mut().f())(accum, e);
- self.as_mut().future().set(Some(future));
} else {
- return Poll::Ready(Ok(accum))
+ panic!("Fold polled after completion")
}
- }
+ })
}
}
diff --git a/src/stream/try_stream/try_for_each.rs b/src/stream/try_stream/try_for_each.rs
index 2c71107..5fc91df 100644
--- a/src/stream/try_stream/try_for_each.rs
+++ b/src/stream/try_stream/try_for_each.rs
@@ -3,18 +3,19 @@ use core::pin::Pin;
use futures_core::future::{Future, TryFuture};
use futures_core::stream::TryStream;
use futures_core::task::{Context, Poll};
-use pin_utils::{unsafe_pinned, unsafe_unpinned};
+use pin_project::{pin_project, project};
/// Future for the [`try_for_each`](super::TryStreamExt::try_for_each) method.
+#[pin_project]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct TryForEach<St, Fut, F> {
+ #[pin]
stream: St,
f: F,
+ #[pin]
future: Option<Fut>,
}
-impl<St: Unpin, Fut: Unpin, F> Unpin for TryForEach<St, Fut, F> {}
-
impl<St, Fut, F> fmt::Debug for TryForEach<St, Fut, F>
where
St: fmt::Debug,
@@ -33,10 +34,6 @@ where St: TryStream,
F: FnMut(St::Ok) -> Fut,
Fut: TryFuture<Ok = (), Error = St::Error>,
{
- unsafe_pinned!(stream: St);
- unsafe_unpinned!(f: F);
- unsafe_pinned!(future: Option<Fut>);
-
pub(super) fn new(stream: St, f: F) -> TryForEach<St, Fut, F> {
TryForEach {
stream,
@@ -53,20 +50,21 @@ impl<St, Fut, F> Future for TryForEach<St, Fut, F>
{
type Output = Result<(), St::Error>;
- fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ #[project]
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ #[project]
+ let TryForEach { mut stream, f, mut future } = self.project();
loop {
- if let Some(future) = self.as_mut().future().as_pin_mut() {
- ready!(future.try_poll(cx))?;
- }
- self.as_mut().future().set(None);
-
- match ready!(self.as_mut().stream().try_poll_next(cx)?) {
- Some(e) => {
- let future = (self.as_mut().f())(e);
- self.as_mut().future().set(Some(future));
+ if let Some(fut) = future.as_mut().as_pin_mut() {
+ ready!(fut.try_poll(cx))?;
+ future.set(None);
+ } else {
+ match ready!(stream.as_mut().try_poll_next(cx)?) {
+ Some(e) => future.set(Some(f(e))),
+ None => break,
}
- None => return Poll::Ready(Ok(())),
}
}
+ Poll::Ready(Ok(()))
}
}
diff --git a/src/stream/try_stream/try_for_each_concurrent.rs b/src/stream/try_stream/try_for_each_concurrent.rs
index 19c3e5b..87fd465 100644
--- a/src/stream/try_stream/try_for_each_concurrent.rs
+++ b/src/stream/try_stream/try_for_each_concurrent.rs
@@ -6,24 +6,21 @@ use core::num::NonZeroUsize;
use futures_core::future::{FusedFuture, Future};
use futures_core::stream::TryStream;
use futures_core::task::{Context, Poll};
-use pin_utils::{unsafe_pinned, unsafe_unpinned};
+use pin_project::{pin_project, project};
/// Future for the
/// [`try_for_each_concurrent`](super::TryStreamExt::try_for_each_concurrent)
/// method.
+#[pin_project]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct TryForEachConcurrent<St, Fut, F> {
+ #[pin]
stream: Option<St>,
f: F,
futures: FuturesUnordered<Fut>,
limit: Option<NonZeroUsize>,
}
-impl<St, Fut, F> Unpin for TryForEachConcurrent<St, Fut, F>
-where St: Unpin,
- Fut: Unpin,
-{}
-
impl<St, Fut, F> fmt::Debug for TryForEachConcurrent<St, Fut, F>
where
St: fmt::Debug,
@@ -53,11 +50,6 @@ where St: TryStream,
F: FnMut(St::Ok) -> Fut,
Fut: Future<Output = Result<(), St::Error>>,
{
- unsafe_pinned!(stream: Option<St>);
- unsafe_unpinned!(f: F);
- unsafe_unpinned!(futures: FuturesUnordered<Fut>);
- unsafe_unpinned!(limit: Option<NonZeroUsize>);
-
pub(super) fn new(stream: St, limit: Option<usize>, f: F) -> TryForEachConcurrent<St, Fut, F> {
TryForEachConcurrent {
stream: Some(stream),
@@ -76,15 +68,16 @@ impl<St, Fut, F> Future for TryForEachConcurrent<St, Fut, F>
{
type Output = Result<(), St::Error>;
- fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ #[project]
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ #[project]
+ let TryForEachConcurrent { mut stream, f, futures, limit } = self.project();
loop {
let mut made_progress_this_iter = false;
- // Try and pull an item from the stream
- let current_len = self.futures.len();
// Check if we've already created a number of futures greater than `limit`
- if self.limit.map(|limit| limit.get() > current_len).unwrap_or(true) {
- let poll_res = match self.as_mut().stream().as_pin_mut() {
+ if limit.map(|limit| limit.get() > futures.len()).unwrap_or(true) {
+ let poll_res = match stream.as_mut().as_pin_mut() {
Some(stream) => stream.try_poll_next(cx),
None => Poll::Ready(None),
};
@@ -95,29 +88,28 @@ impl<St, Fut, F> Future for TryForEachConcurrent<St, Fut, F>
Some(elem)
},
Poll::Ready(None) => {
- self.as_mut().stream().set(None);
+ stream.set(None);
None
}
Poll::Pending => None,
Poll::Ready(Some(Err(e))) => {
// Empty the stream and futures so that we know
// the future has completed.
- self.as_mut().stream().set(None);
- drop(mem::replace(self.as_mut().futures(), FuturesUnordered::new()));
+ stream.set(None);
+ drop(mem::replace(futures, FuturesUnordered::new()));
return Poll::Ready(Err(e));
}
};
if let Some(elem) = elem {
- let next_future = (self.as_mut().f())(elem);
- self.as_mut().futures().push(next_future);
+ futures.push(f(elem));
}
}
- match self.as_mut().futures().poll_next_unpin(cx) {
+ match futures.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(()))) => made_progress_this_iter = true,
Poll::Ready(None) => {
- if self.stream.is_none() {
+ if stream.is_none() {
return Poll::Ready(Ok(()))
}
},
@@ -125,8 +117,8 @@ impl<St, Fut, F> Future for TryForEachConcurrent<St, Fut, F>
Poll::Ready(Some(Err(e))) => {
// Empty the stream and futures so that we know
// the future has completed.
- self.as_mut().stream().set(None);
- drop(mem::replace(self.as_mut().futures(), FuturesUnordered::new()));
+ stream.set(None);
+ drop(mem::replace(futures, FuturesUnordered::new()));
return Poll::Ready(Err(e));
}
}
diff --git a/src/stream/try_stream/try_skip_while.rs b/src/stream/try_stream/try_skip_while.rs
index a3d6803..624380f 100644
--- a/src/stream/try_stream/try_skip_while.rs
+++ b/src/stream/try_stream/try_skip_while.rs
@@ -5,21 +5,22 @@ use futures_core::stream::{Stream, TryStream, FusedStream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_utils::{unsafe_pinned, unsafe_unpinned};
+use pin_project::{pin_project, project};
/// Stream for the [`try_skip_while`](super::TryStreamExt::try_skip_while)
/// method.
+#[pin_project]
#[must_use = "streams do nothing unless polled"]
pub struct TrySkipWhile<St, Fut, F> where St: TryStream {
+ #[pin]
stream: St,
f: F,
+ #[pin]
pending_fut: Option<Fut>,
pending_item: Option<St::Ok>,
done_skipping: bool,
}
-impl<St: Unpin + TryStream, Fut: Unpin, F> Unpin for TrySkipWhile<St, Fut, F> {}
-
impl<St, Fut, F> fmt::Debug for TrySkipWhile<St, Fut, F>
where
St: TryStream + fmt::Debug,
@@ -38,20 +39,9 @@ where
impl<St, Fut, F> TrySkipWhile<St, Fut, F>
where St: TryStream,
-{
- unsafe_pinned!(stream: St);
-}
-
-impl<St, Fut, F> TrySkipWhile<St, Fut, F>
- where St: TryStream,
F: FnMut(&St::Ok) -> Fut,
Fut: TryFuture<Ok = bool, Error = St::Error>,
{
- unsafe_unpinned!(f: F);
- unsafe_pinned!(pending_fut: Option<Fut>);
- unsafe_unpinned!(pending_item: Option<St::Ok>);
- unsafe_unpinned!(done_skipping: bool);
-
pub(super) fn new(stream: St, f: F) -> TrySkipWhile<St, Fut, F> {
TrySkipWhile {
stream,
@@ -62,37 +52,7 @@ impl<St, Fut, F> TrySkipWhile<St, Fut, F>
}
}
- /// Acquires a reference to the underlying stream that this combinator is
- /// pulling from.
- pub fn get_ref(&self) -> &St {
- &self.stream
- }
-
- /// Acquires a mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_mut(&mut self) -> &mut St {
- &mut self.stream
- }
-
- /// Acquires a pinned mutable reference to the underlying stream that this
- /// combinator is pulling from.
- ///
- /// Note that care must be taken to avoid tampering with the state of the
- /// stream which may otherwise confuse this combinator.
- pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> {
- self.stream()
- }
-
- /// Consumes this combinator, returning the underlying stream.
- ///
- /// Note that this may discard intermediate state of this combinator, so
- /// care should be taken to avoid losing resources when this is called.
- pub fn into_inner(self) -> St {
- self.stream
- }
+ delegate_access_inner!(stream, St, ());
}
impl<St, Fut, F> Stream for TrySkipWhile<St, Fut, F>
@@ -102,34 +62,34 @@ impl<St, Fut, F> Stream for TrySkipWhile<St, Fut, F>
{
type Item = Result<St::Ok, St::Error>;
+ #[project]
fn poll_next(
- mut self: Pin<&mut Self>,
+ self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
- if self.done_skipping {
- return self.as_mut().stream().try_poll_next(cx);
- }
+ #[project]
+ let TrySkipWhile { mut stream, f, mut pending_fut, pending_item, done_skipping } = self.project();
- loop {
- if self.pending_item.is_none() {
- let item = match ready!(self.as_mut().stream().try_poll_next(cx)?) {
- Some(e) => e,
- None => return Poll::Ready(None),
- };
- let fut = (self.as_mut().f())(&item);
- self.as_mut().pending_fut().set(Some(fut));
- *self.as_mut().pending_item() = Some(item);
- }
-
- let skipped = ready!(self.as_mut().pending_fut().as_pin_mut().unwrap().try_poll(cx)?);
- let item = self.as_mut().pending_item().take().unwrap();
- self.as_mut().pending_fut().set(None);
+ if *done_skipping {
+ return stream.try_poll_next(cx);
+ }
- if !skipped {
- *self.as_mut().done_skipping() = true;
- return Poll::Ready(Some(Ok(item)))
+ Poll::Ready(loop {
+ if let Some(fut) = pending_fut.as_mut().as_pin_mut() {
+ let skipped = ready!(fut.try_poll(cx)?);
+ let item = pending_item.take();
+ pending_fut.set(None);
+ if !skipped {
+ *done_skipping = true;
+ break item.map(Ok);
+ }
+ } else if let Some(item) = ready!(stream.as_mut().try_poll_next(cx)?) {
+ pending_fut.set(Some(f(&item)));
+ *pending_item = Some(item);
+ } else {
+ break None;
}
- }
+ })
}
fn size_hint(&self) -> (usize, Option<usize>) {
diff --git a/src/stream/try_stream/try_unfold.rs b/src/stream/try_stream/try_unfold.rs
index 6266274..8da1248 100644
--- a/src/stream/try_stream/try_unfold.rs
+++ b/src/stream/try_stream/try_unfold.rs
@@ -3,7 +3,7 @@ use core::pin::Pin;
use futures_core::future::TryFuture;
use futures_core::stream::Stream;
use futures_core::task::{Context, Poll};
-use pin_utils::{unsafe_pinned, unsafe_unpinned};
+use pin_project::{pin_project, project};
/// Creates a `TryStream` from a seed and a closure returning a `TryFuture`.
///
@@ -67,15 +67,15 @@ where
}
/// Stream for the [`try_unfold`] function.
+#[pin_project]
#[must_use = "streams do nothing unless polled"]
pub struct TryUnfold<T, F, Fut> {
f: F,
state: Option<T>,
+ #[pin]
fut: Option<Fut>,
}
-impl<T, F, Fut: Unpin> Unpin for TryUnfold<T, F, Fut> {}
-
impl<T, F, Fut> fmt::Debug for TryUnfold<T, F, Fut>
where
T: fmt::Debug,
@@ -89,12 +89,6 @@ where
}
}
-impl<T, F, Fut> TryUnfold<T, F, Fut> {
- unsafe_unpinned!(f: F);
- unsafe_unpinned!(state: Option<T>);
- unsafe_pinned!(fut: Option<Fut>);
-}
-
impl<T, F, Fut, Item> Stream for TryUnfold<T, F, Fut>
where
F: FnMut(T) -> Fut,
@@ -102,27 +96,30 @@ where
{
type Item = Result<Item, Fut::Error>;
+ #[project]
fn poll_next(
- mut self: Pin<&mut Self>,
+ self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Item, Fut::Error>>> {
- if let Some(state) = self.as_mut().state().take() {
- let fut = (self.as_mut().f())(state);
- self.as_mut().fut().set(Some(fut));
+ #[project]
+ let TryUnfold {f, state, mut fut } = self.project();
+
+ if let Some(state) = state.take() {
+ fut.set(Some(f(state)));
}
- match self.as_mut().fut().as_pin_mut() {
+ match fut.as_mut().as_pin_mut() {
None => {
// The future previously errored
Poll::Ready(None)
}
- Some(fut) => {
- let step = ready!(fut.try_poll(cx));
- self.as_mut().fut().set(None);
+ Some(future) => {
+ let step = ready!(future.try_poll(cx));
+ fut.set(None);
match step {
Ok(Some((item, next_state))) => {
- *self.as_mut().state() = Some(next_state);
+ *state = Some(next_state);
Poll::Ready(Some(Ok(item)))
}
Ok(None) => Poll::Ready(None),