diff options
Diffstat (limited to 'src/stream/try_stream/try_concat.rs')
-rw-r--r-- | src/stream/try_stream/try_concat.rs | 36 |
1 files changed, 16 insertions, 20 deletions
diff --git a/src/stream/try_stream/try_concat.rs b/src/stream/try_stream/try_concat.rs index 395f166..8c9710b 100644 --- a/src/stream/try_stream/try_concat.rs +++ b/src/stream/try_stream/try_concat.rs @@ -2,26 +2,23 @@ use core::pin::Pin; use futures_core::future::Future; use futures_core::stream::TryStream; use futures_core::task::{Context, Poll}; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, project}; /// Future for the [`try_concat`](super::TryStreamExt::try_concat) method. +#[pin_project] #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct TryConcat<St: TryStream> { + #[pin] stream: St, accum: Option<St::Ok>, } -impl<St: TryStream + Unpin> Unpin for TryConcat<St> {} - impl<St> TryConcat<St> where St: TryStream, St::Ok: Extend<<St::Ok as IntoIterator>::Item> + IntoIterator + Default, { - unsafe_pinned!(stream: St); - unsafe_unpinned!(accum: Option<St::Ok>); - pub(super) fn new(stream: St) -> TryConcat<St> { TryConcat { stream, @@ -37,21 +34,20 @@ where { type Output = Result<St::Ok, St::Error>; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - loop { - match ready!(self.as_mut().stream().try_poll_next(cx)?) { - Some(x) => { - let accum = self.as_mut().accum(); - if let Some(a) = accum { - a.extend(x) - } else { - *accum = Some(x) - } - }, - None => { - return Poll::Ready(Ok(self.as_mut().accum().take().unwrap_or_default())) + #[project] + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + #[project] + let TryConcat { mut stream, accum } = self.project(); + Poll::Ready(Ok(loop { + if let Some(x) = ready!(stream.as_mut().try_poll_next(cx)?) { + if let Some(a) = accum { + a.extend(x) + } else { + *accum = Some(x) } + } else { + break accum.take().unwrap_or_default(); } - } + })) } } |