diff options
Diffstat (limited to 'src/stream/try_stream/and_then.rs')
-rw-r--r-- | src/stream/try_stream/and_then.rs | 74 |
1 files changed, 21 insertions, 53 deletions
diff --git a/src/stream/try_stream/and_then.rs b/src/stream/try_stream/and_then.rs index 809c32a..563ed34 100644 --- a/src/stream/try_stream/and_then.rs +++ b/src/stream/try_stream/and_then.rs @@ -5,18 +5,19 @@ use futures_core::stream::{Stream, TryStream, FusedStream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, project}; /// Stream for the [`and_then`](super::TryStreamExt::and_then) method. +#[pin_project] #[must_use = "streams do nothing unless polled"] pub struct AndThen<St, Fut, F> { + #[pin] stream: St, + #[pin] future: Option<Fut>, f: F, } -impl<St: Unpin, Fut: Unpin, F> Unpin for AndThen<St, Fut, F> {} - impl<St, Fut, F> fmt::Debug for AndThen<St, Fut, F> where St: fmt::Debug, @@ -30,12 +31,6 @@ where } } -impl<St, Fut, F> AndThen<St, Fut, F> { - unsafe_pinned!(stream: St); - unsafe_pinned!(future: Option<Fut>); - unsafe_unpinned!(f: F); -} - impl<St, Fut, F> AndThen<St, Fut, F> where St: TryStream, F: FnMut(St::Ok) -> Fut, @@ -45,37 +40,7 @@ impl<St, Fut, F> AndThen<St, Fut, F> Self { stream, future: None, f } } - /// Acquires a reference to the underlying stream that this combinator is - /// pulling from. - pub fn get_ref(&self) -> &St { - &self.stream - } - - /// Acquires a mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_mut(&mut self) -> &mut St { - &mut self.stream - } - - /// Acquires a pinned mutable reference to the underlying stream that this - /// combinator is pulling from. - /// - /// Note that care must be taken to avoid tampering with the state of the - /// stream which may otherwise confuse this combinator. - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> { - self.stream() - } - - /// Consumes this combinator, returning the underlying stream. - /// - /// Note that this may discard intermediate state of this combinator, so - /// care should be taken to avoid losing resources when this is called. - pub fn into_inner(self) -> St { - self.stream - } + delegate_access_inner!(stream, St, ()); } impl<St, Fut, F> Stream for AndThen<St, Fut, F> @@ -85,22 +50,25 @@ impl<St, Fut, F> Stream for AndThen<St, Fut, F> { type Item = Result<Fut::Ok, St::Error>; + #[project] fn poll_next( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Self::Item>> { - if self.future.is_none() { - let item = match ready!(self.as_mut().stream().try_poll_next(cx)?) { - None => return Poll::Ready(None), - Some(e) => e, - }; - let fut = (self.as_mut().f())(item); - self.as_mut().future().set(Some(fut)); - } - - let e = ready!(self.as_mut().future().as_pin_mut().unwrap().try_poll(cx)); - self.as_mut().future().set(None); - Poll::Ready(Some(e)) + #[project] + let AndThen { mut stream, mut future, f } = self.project(); + + Poll::Ready(loop { + if let Some(fut) = future.as_mut().as_pin_mut() { + let item = ready!(fut.try_poll(cx)); + future.set(None); + break Some(item); + } else if let Some(item) = ready!(stream.as_mut().try_poll_next(cx)?) { + future.set(Some(f(item))); + } else { + break None; + } + }) } fn size_hint(&self) -> (usize, Option<usize>) { |