diff options
Diffstat (limited to 'src/stream/try_stream/try_filter_map.rs')
-rw-r--r-- | src/stream/try_stream/try_filter_map.rs | 80 |
1 files changed, 24 insertions, 56 deletions
diff --git a/src/stream/try_stream/try_filter_map.rs b/src/stream/try_stream/try_filter_map.rs index ed7eeb2..ba8e43a 100644 --- a/src/stream/try_stream/try_filter_map.rs +++ b/src/stream/try_stream/try_filter_map.rs @@ -5,21 +5,20 @@ use futures_core::stream::{Stream, TryStream, FusedStream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, project}; /// Stream for the [`try_filter_map`](super::TryStreamExt::try_filter_map) /// method. +#[pin_project] #[must_use = "streams do nothing unless polled"] pub struct TryFilterMap<St, Fut, F> { + #[pin] stream: St, f: F, + #[pin] pending: Option<Fut>, } -impl<St, Fut, F> Unpin for TryFilterMap<St, Fut, F> - where St: Unpin, Fut: Unpin, -{} - impl<St, Fut, F> fmt::Debug for TryFilterMap<St, Fut, F> where St: fmt::Debug, @@ -34,45 +33,11 @@ where } impl<St, Fut, F> TryFilterMap<St, Fut, F> { - unsafe_pinned!(stream: St); - unsafe_unpinned!(f: F); - unsafe_pinned!(pending: Option<Fut>); - pub(super) fn new(stream: St, f: F) -> Self { TryFilterMap { stream, f, pending: None } } - /// Acquires a reference to the underlying stream that this combinator is - /// pulling from. - pub fn get_ref(&self) -> &St { - &self.stream - } - - /// Acquires a mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_mut(&mut self) -> &mut St { - &mut self.stream - } - - /// Acquires a pinned mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> { - self.stream() - } - - /// Consumes this combinator, returning the underlying stream. - /// - /// Note that this may discard intermediate state of this combinator, so - /// care should be taken to avoid losing resources when this is called. - pub fn into_inner(self) -> St { - self.stream - } + delegate_access_inner!(stream, St, ()); } impl<St, Fut, F, T> FusedStream for TryFilterMap<St, Fut, F> @@ -92,26 +57,29 @@ impl<St, Fut, F, T> Stream for TryFilterMap<St, Fut, F> { type Item = Result<T, St::Error>; + #[project] fn poll_next( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Result<T, St::Error>>> { - loop { - if self.pending.is_none() { - let item = match ready!(self.as_mut().stream().try_poll_next(cx)?) { - Some(x) => x, - None => return Poll::Ready(None), - }; - let fut = (self.as_mut().f())(item); - self.as_mut().pending().set(Some(fut)); - } - - let result = ready!(self.as_mut().pending().as_pin_mut().unwrap().try_poll(cx)); - self.as_mut().pending().set(None); - if let Some(x) = result? { - return Poll::Ready(Some(Ok(x))); + #[project] + let TryFilterMap { mut stream, f, mut pending } = self.project(); + Poll::Ready(loop { + if let Some(p) = pending.as_mut().as_pin_mut() { + // We have an item in progress, poll that until it's done + let item = ready!(p.try_poll(cx)?); + pending.set(None); + if item.is_some() { + break item.map(Ok); + } + } else if let Some(item) = ready!(stream.as_mut().try_poll_next(cx)?) { + // No item in progress, but the stream is still going + pending.set(Some(f(item))); + } else { + // The stream is done + break None; } - } + }) } fn size_hint(&self) -> (usize, Option<usize>) { |