diff options
Diffstat (limited to 'src/stream/unfold.rs')
-rw-r--r-- | src/stream/unfold.rs | 66 |
1 files changed, 37 insertions, 29 deletions
diff --git a/src/stream/unfold.rs b/src/stream/unfold.rs index b6f8eae..473bb67 100644 --- a/src/stream/unfold.rs +++ b/src/stream/unfold.rs @@ -1,9 +1,11 @@ +use crate::unfold_state::UnfoldState; use core::fmt; use core::pin::Pin; use futures_core::future::Future; +use futures_core::ready; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; -use pin_project::pin_project; +use pin_project_lite::pin_project; /// Creates a `Stream` from a seed and a closure returning a `Future`. /// @@ -45,24 +47,24 @@ use pin_project::pin_project; /// # }); /// ``` pub fn unfold<T, F, Fut, Item>(init: T, f: F) -> Unfold<T, F, Fut> - where F: FnMut(T) -> Fut, - Fut: Future<Output = Option<(Item, T)>>, +where + F: FnMut(T) -> Fut, + Fut: Future<Output = Option<(Item, T)>>, { Unfold { f, - state: Some(init), - fut: None, + state: UnfoldState::Value { value: init }, } } -/// Stream for the [`unfold`] function. -#[pin_project] -#[must_use = "streams do nothing unless polled"] -pub struct Unfold<T, F, Fut> { - f: F, - state: Option<T>, - #[pin] - fut: Option<Fut>, +pin_project! { + /// Stream for the [`unfold`] function. + #[must_use = "streams do nothing unless polled"] + pub struct Unfold<T, F, Fut> { + f: F, + #[pin] + state: UnfoldState<T, Fut>, + } } impl<T, F, Fut> fmt::Debug for Unfold<T, F, Fut> @@ -73,44 +75,50 @@ where fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Unfold") .field("state", &self.state) - .field("fut", &self.fut) .finish() } } impl<T, F, Fut, Item> FusedStream for Unfold<T, F, Fut> - where F: FnMut(T) -> Fut, - Fut: Future<Output = Option<(Item, T)>>, +where + F: FnMut(T) -> Fut, + Fut: Future<Output = Option<(Item, T)>>, { fn is_terminated(&self) -> bool { - self.state.is_none() && self.fut.is_none() + if let UnfoldState::Empty = self.state { + true + } else { + false + } } } impl<T, F, Fut, Item> Stream for Unfold<T, F, Fut> - where F: FnMut(T) -> Fut, - Fut: Future<Output = Option<(Item, T)>>, +where + F: FnMut(T) -> Fut, + Fut: Future<Output = Option<(Item, T)>>, { type Item = Item; - fn poll_next( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Option<Self::Item>> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { let mut this = self.project(); - if let Some(state) = this.state.take() { - this.fut.set(Some((this.f)(state))); + if let Some(state) = this.state.as_mut().take_value() { + this.state.set(UnfoldState::Future { + future: (this.f)(state), + }); } - let step = ready!(this.fut.as_mut().as_pin_mut() - .expect("Unfold must not be polled after it returned `Poll::Ready(None)`").poll(cx)); - this.fut.set(None); + let step = match this.state.as_mut().project_future() { + Some(fut) => ready!(fut.poll(cx)), + None => panic!("Unfold must not be polled after it returned `Poll::Ready(None)`"), + }; if let Some((item, next_state)) = step { - *this.state = Some(next_state); + this.state.set(UnfoldState::Value { value: next_state }); Poll::Ready(Some(item)) } else { + this.state.set(UnfoldState::Empty); Poll::Ready(None) } } |