diff options
Diffstat (limited to 'src/stream/try_stream/try_unfold.rs')
-rw-r--r-- | src/stream/try_stream/try_unfold.rs | 33 |
1 files changed, 15 insertions, 18 deletions
diff --git a/src/stream/try_stream/try_unfold.rs b/src/stream/try_stream/try_unfold.rs index 6266274..8da1248 100644 --- a/src/stream/try_stream/try_unfold.rs +++ b/src/stream/try_stream/try_unfold.rs @@ -3,7 +3,7 @@ use core::pin::Pin; use futures_core::future::TryFuture; use futures_core::stream::Stream; use futures_core::task::{Context, Poll}; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, project}; /// Creates a `TryStream` from a seed and a closure returning a `TryFuture`. /// @@ -67,15 +67,15 @@ where } /// Stream for the [`try_unfold`] function. +#[pin_project] #[must_use = "streams do nothing unless polled"] pub struct TryUnfold<T, F, Fut> { f: F, state: Option<T>, + #[pin] fut: Option<Fut>, } -impl<T, F, Fut: Unpin> Unpin for TryUnfold<T, F, Fut> {} - impl<T, F, Fut> fmt::Debug for TryUnfold<T, F, Fut> where T: fmt::Debug, @@ -89,12 +89,6 @@ where } } -impl<T, F, Fut> TryUnfold<T, F, Fut> { - unsafe_unpinned!(f: F); - unsafe_unpinned!(state: Option<T>); - unsafe_pinned!(fut: Option<Fut>); -} - impl<T, F, Fut, Item> Stream for TryUnfold<T, F, Fut> where F: FnMut(T) -> Fut, @@ -102,27 +96,30 @@ where { type Item = Result<Item, Fut::Error>; + #[project] fn poll_next( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Result<Item, Fut::Error>>> { - if let Some(state) = self.as_mut().state().take() { - let fut = (self.as_mut().f())(state); - self.as_mut().fut().set(Some(fut)); + #[project] + let TryUnfold {f, state, mut fut } = self.project(); + + if let Some(state) = state.take() { + fut.set(Some(f(state))); } - match self.as_mut().fut().as_pin_mut() { + match fut.as_mut().as_pin_mut() { None => { // The future previously errored Poll::Ready(None) } - Some(fut) => { - let step = ready!(fut.try_poll(cx)); - self.as_mut().fut().set(None); + Some(future) => { + let step = ready!(future.try_poll(cx)); + fut.set(None); match step { Ok(Some((item, next_state))) => { - *self.as_mut().state() = Some(next_state); + *state = Some(next_state); Poll::Ready(Some(Ok(item))) } Ok(None) => Poll::Ready(None), |