use core::fmt; use core::pin::Pin; use futures_core::future::Future; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; use pin_utils::{unsafe_pinned, unsafe_unpinned}; /// Creates a `Stream` from a seed and a closure returning a `Future`. /// /// This function is the dual for the `Stream::fold()` adapter: while /// `Stream::fold()` reduces a `Stream` to one single value, `unfold()` creates a /// `Stream` from a seed value. /// /// `unfold()` will call the provided closure with the provided seed, then wait /// for the returned `Future` to complete with `(a, b)`. It will then yield the /// value `a`, and use `b` as the next internal state. /// /// If the closure returns `None` instead of `Some(Future)`, then the `unfold()` /// will stop producing items and return `Poll::Ready(None)` in future /// calls to `poll()`. /// /// This function can typically be used when wanting to go from the "world of /// futures" to the "world of streams": the provided closure can build a /// `Future` using other library functions working on futures, and `unfold()` /// will turn it into a `Stream` by repeating the operation. /// /// # Example /// /// ``` /// # futures::executor::block_on(async { /// use futures::stream::{self, StreamExt}; /// /// let stream = stream::unfold(0, |state| async move { /// if state <= 2 { /// let next_state = state + 1; /// let yielded = state * 2; /// Some((yielded, next_state)) /// } else { /// None /// } /// }); /// /// let result = stream.collect::>().await; /// assert_eq!(result, vec![0, 2, 4]); /// # }); /// ``` pub fn unfold(init: T, f: F) -> Unfold where F: FnMut(T) -> Fut, Fut: Future>, { Unfold { f, state: Some(init), fut: None, } } /// Stream for the [`unfold`] function. #[must_use = "streams do nothing unless polled"] pub struct Unfold { f: F, state: Option, fut: Option, } impl Unpin for Unfold {} impl fmt::Debug for Unfold where T: fmt::Debug, Fut: fmt::Debug, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Unfold") .field("state", &self.state) .field("fut", &self.fut) .finish() } } impl Unfold { unsafe_unpinned!(f: F); unsafe_unpinned!(state: Option); unsafe_pinned!(fut: Option); } impl FusedStream for Unfold where F: FnMut(T) -> Fut, Fut: Future>, { fn is_terminated(&self) -> bool { self.state.is_none() && self.fut.is_none() } } impl Stream for Unfold where F: FnMut(T) -> Fut, Fut: Future>, { type Item = Item; fn poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { if let Some(state) = self.as_mut().state().take() { let fut = (self.as_mut().f())(state); self.as_mut().fut().set(Some(fut)); } let step = ready!(self.as_mut().fut().as_pin_mut() .expect("Unfold must not be polled after it returned `Poll::Ready(None)`").poll(cx)); self.as_mut().fut().set(None); if let Some((item, next_state)) = step { *self.as_mut().state() = Some(next_state); Poll::Ready(Some(item)) } else { Poll::Ready(None) } } }