use core::fmt;
use core::pin::Pin;
use futures_core::future::Future;
use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
use pin_project::{pin_project, project};

/// Creates a `Stream` from a seed and a closure returning a `Future`.
///
/// This function is the dual for the `Stream::fold()` adapter: while
/// `Stream::fold()` reduces a `Stream` to one single value, `unfold()` creates a
/// `Stream` from a seed value.
///
/// `unfold()` will call the provided closure with the provided seed, then wait
/// for the returned `Future` to complete with `Some((a, b))`. It will then
/// yield the value `a`, and use `b` as the next internal state.
///
/// If the future resolves to `None` instead of `Some((a, b))`, the stream
/// stops producing items and yields `Poll::Ready(None)` to signal its end.
///
/// This function can typically be used when wanting to go from the "world of
/// futures" to the "world of streams": the provided closure can build a
/// `Future` using other library functions working on futures, and `unfold()`
/// will turn it into a `Stream` by repeating the operation.
///
/// # Panics
///
/// The returned stream panics if it is polled again after it has returned
/// `Poll::Ready(None)`. Its `FusedStream::is_terminated()` implementation
/// reports `true` once that point has been reached.
///
/// # Example
///
/// ```
/// # futures::executor::block_on(async {
/// use futures::stream::{self, StreamExt};
///
/// let stream = stream::unfold(0, |state| async move {
///     if state <= 2 {
///         let next_state = state + 1;
///         let yielded = state * 2;
///         Some((yielded, next_state))
///     } else {
///         None
///     }
/// });
///
/// let result = stream.collect::<Vec<i32>>().await;
/// assert_eq!(result, vec![0, 2, 4]);
/// # });
/// ```
pub fn unfold<T, F, Fut, Item>(init: T, f: F) -> Unfold<T, F, Fut>
where
    F: FnMut(T) -> Fut,
    Fut: Future<Output = Option<(Item, T)>>,
{
    Unfold {
        f,
        // The seed is the state handed to `f` on the first poll.
        state: Some(init),
        // No step is in flight until the stream is polled for the first time.
        fut: None,
    }
}

/// Stream for the [`unfold`] function.
#[pin_project] #[must_use = "streams do nothing unless polled"] pub struct Unfold { f: F, state: Option, #[pin] fut: Option, } impl fmt::Debug for Unfold where T: fmt::Debug, Fut: fmt::Debug, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Unfold") .field("state", &self.state) .field("fut", &self.fut) .finish() } } impl FusedStream for Unfold where F: FnMut(T) -> Fut, Fut: Future>, { fn is_terminated(&self) -> bool { self.state.is_none() && self.fut.is_none() } } impl Stream for Unfold where F: FnMut(T) -> Fut, Fut: Future>, { type Item = Item; #[project] fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { #[project] let Unfold { state, f, mut fut } = self.project(); if let Some(state) = state.take() { fut.set(Some(f(state))); } let step = ready!(fut.as_mut().as_pin_mut() .expect("Unfold must not be polled after it returned `Poll::Ready(None)`").poll(cx)); fut.set(None); if let Some((item, next_state)) = step { *state = Some(next_state); Poll::Ready(Some(item)) } else { Poll::Ready(None) } } }