diff options
Diffstat (limited to 'src/stream/try_stream/try_fold.rs')
-rw-r--r-- | src/stream/try_stream/try_fold.rs | 70 |
1 files changed, 27 insertions, 43 deletions
diff --git a/src/stream/try_stream/try_fold.rs b/src/stream/try_stream/try_fold.rs index b8b8dc2..d85c1fe 100644 --- a/src/stream/try_stream/try_fold.rs +++ b/src/stream/try_stream/try_fold.rs @@ -3,19 +3,20 @@ use core::pin::Pin; use futures_core::future::{FusedFuture, Future, TryFuture}; use futures_core::stream::TryStream; use futures_core::task::{Context, Poll}; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, project}; /// Future for the [`try_fold`](super::TryStreamExt::try_fold) method. +#[pin_project] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct TryFold<St, Fut, T, F> { + #[pin] stream: St, f: F, accum: Option<T>, + #[pin] future: Option<Fut>, } -impl<St: Unpin, Fut: Unpin, T, F> Unpin for TryFold<St, Fut, T, F> {} - impl<St, Fut, T, F> fmt::Debug for TryFold<St, Fut, T, F> where St: fmt::Debug, @@ -36,11 +37,6 @@ where St: TryStream, F: FnMut(T, St::Ok) -> Fut, Fut: TryFuture<Ok = T, Error = St::Error>, { - unsafe_pinned!(stream: St); - unsafe_unpinned!(f: F); - unsafe_unpinned!(accum: Option<T>); - unsafe_pinned!(future: Option<Fut>); - pub(super) fn new(stream: St, f: F, t: T) -> TryFold<St, Fut, T, F> { TryFold { stream, @@ -68,43 +64,31 @@ impl<St, Fut, T, F> Future for TryFold<St, Fut, T, F> { type Output = Result<T, St::Error>; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - loop { - // we're currently processing a future to produce a new accum value - if self.accum.is_none() { - let accum = match ready!( - self.as_mut().future().as_pin_mut() - .expect("TryFold polled after completion") - .try_poll(cx) - ) { - Ok(accum) => accum, - Err(e) => { - // Indicate that the future can no longer be polled. - self.as_mut().future().set(None); - return Poll::Ready(Err(e)); - } - }; - *self.as_mut().accum() = Some(accum); - self.as_mut().future().set(None); - } - - let item = match ready!(self.as_mut().stream().try_poll_next(cx)) { - Some(Ok(item)) => Some(item), - Some(Err(e)) => { - // Indicate that the future can no longer be polled. - *self.as_mut().accum() = None; - return Poll::Ready(Err(e)); + #[project] + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + #[project] + let TryFold { mut stream, f, accum, mut future } = self.project(); + Poll::Ready(loop { + if let Some(fut) = future.as_mut().as_pin_mut() { + // we're currently processing a future to produce a new accum value + let res = ready!(fut.try_poll(cx)); + future.set(None); + match res { + Ok(a) => *accum = Some(a), + Err(e) => break Err(e), + } + } else if accum.is_some() { + // we're waiting on a new item from the stream + let res = ready!(stream.as_mut().try_poll_next(cx)); + let a = accum.take().unwrap(); + match res { + Some(Ok(item)) => future.set(Some(f(a, item))), + Some(Err(e)) => break Err(e), + None => break Ok(a), } - None => None, - }; - let accum = self.as_mut().accum().take().unwrap(); - - if let Some(e) = item { - let future = (self.as_mut().f())(accum, e); - self.as_mut().future().set(Some(future)); } else { - return Poll::Ready(Ok(accum)) + panic!("Fold polled after completion") } - } + }) } } |