diff options
Diffstat (limited to 'src/stream/try_stream/try_filter.rs')
-rw-r--r-- | src/stream/try_stream/try_filter.rs | 83 |
1 files changed, 23 insertions, 60 deletions
diff --git a/src/stream/try_stream/try_filter.rs b/src/stream/try_stream/try_filter.rs index 24a9c32..310f991 100644 --- a/src/stream/try_stream/try_filter.rs +++ b/src/stream/try_stream/try_filter.rs @@ -5,24 +5,23 @@ use futures_core::stream::{Stream, TryStream, FusedStream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, project}; /// Stream for the [`try_filter`](super::TryStreamExt::try_filter) /// method. +#[pin_project] #[must_use = "streams do nothing unless polled"] pub struct TryFilter<St, Fut, F> where St: TryStream { + #[pin] stream: St, f: F, + #[pin] pending_fut: Option<Fut>, pending_item: Option<St::Ok>, } -impl<St, Fut, F> Unpin for TryFilter<St, Fut, F> - where St: TryStream + Unpin, Fut: Unpin, -{} - impl<St, Fut, F> fmt::Debug for TryFilter<St, Fut, F> where St: TryStream + fmt::Debug, @@ -41,11 +40,6 @@ where impl<St, Fut, F> TryFilter<St, Fut, F> where St: TryStream { - unsafe_pinned!(stream: St); - unsafe_unpinned!(f: F); - unsafe_pinned!(pending_fut: Option<Fut>); - unsafe_unpinned!(pending_item: Option<St::Ok>); - pub(super) fn new(stream: St, f: F) -> Self { TryFilter { stream, @@ -55,37 +49,7 @@ impl<St, Fut, F> TryFilter<St, Fut, F> } } - /// Acquires a reference to the underlying stream that this combinator is - /// pulling from. - pub fn get_ref(&self) -> &St { - &self.stream - } - - /// Acquires a mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_mut(&mut self) -> &mut St { - &mut self.stream - } - - /// Acquires a pinned mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> { - self.stream() - } - - /// Consumes this combinator, returning the underlying stream. - /// - /// Note that this may discard intermediate state of this combinator, so - /// care should be taken to avoid losing resources when this is called. - pub fn into_inner(self) -> St { - self.stream - } + delegate_access_inner!(stream, St, ()); } impl<St, Fut, F> FusedStream for TryFilter<St, Fut, F> @@ -105,29 +69,28 @@ impl<St, Fut, F> Stream for TryFilter<St, Fut, F> { type Item = Result<St::Ok, St::Error>; + #[project] fn poll_next( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Result<St::Ok, St::Error>>> { - loop { - if self.pending_fut.is_none() { - let item = match ready!(self.as_mut().stream().try_poll_next(cx)?) { - Some(x) => x, - None => return Poll::Ready(None), - }; - let fut = (self.as_mut().f())(&item); - self.as_mut().pending_fut().set(Some(fut)); - *self.as_mut().pending_item() = Some(item); - } - - let yield_item = ready!(self.as_mut().pending_fut().as_pin_mut().unwrap().poll(cx)); - self.as_mut().pending_fut().set(None); - let item = self.as_mut().pending_item().take().unwrap(); - - if yield_item { - return Poll::Ready(Some(Ok(item))); + #[project] + let TryFilter { mut stream, f, mut pending_fut, pending_item } = self.project(); + Poll::Ready(loop { + if let Some(fut) = pending_fut.as_mut().as_pin_mut() { + let res = ready!(fut.poll(cx)); + pending_fut.set(None); + if res { + break pending_item.take().map(Ok); + } + *pending_item = None; + } else if let Some(item) = ready!(stream.as_mut().try_poll_next(cx)?) { + pending_fut.set(Some(f(&item))); + *pending_item = Some(item); + } else { + break None; } - } + }) } fn size_hint(&self) -> (usize, Option<usize>) { |