aboutsummaryrefslogtreecommitdiff
path: root/src/stream/unfold.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/stream/unfold.rs')
-rw-r--r--src/stream/unfold.rs66
1 files changed, 37 insertions, 29 deletions
diff --git a/src/stream/unfold.rs b/src/stream/unfold.rs
index b6f8eae..473bb67 100644
--- a/src/stream/unfold.rs
+++ b/src/stream/unfold.rs
@@ -1,9 +1,11 @@
+use crate::unfold_state::UnfoldState;
use core::fmt;
use core::pin::Pin;
use futures_core::future::Future;
+use futures_core::ready;
use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
-use pin_project::pin_project;
+use pin_project_lite::pin_project;
/// Creates a `Stream` from a seed and a closure returning a `Future`.
///
@@ -45,24 +47,24 @@ use pin_project::pin_project;
/// # });
/// ```
pub fn unfold<T, F, Fut, Item>(init: T, f: F) -> Unfold<T, F, Fut>
- where F: FnMut(T) -> Fut,
- Fut: Future<Output = Option<(Item, T)>>,
+where
+ F: FnMut(T) -> Fut,
+ Fut: Future<Output = Option<(Item, T)>>,
{
Unfold {
f,
- state: Some(init),
- fut: None,
+ state: UnfoldState::Value { value: init },
}
}
-/// Stream for the [`unfold`] function.
-#[pin_project]
-#[must_use = "streams do nothing unless polled"]
-pub struct Unfold<T, F, Fut> {
- f: F,
- state: Option<T>,
- #[pin]
- fut: Option<Fut>,
+pin_project! {
+ /// Stream for the [`unfold`] function.
+ #[must_use = "streams do nothing unless polled"]
+ pub struct Unfold<T, F, Fut> {
+ f: F,
+ #[pin]
+ state: UnfoldState<T, Fut>,
+ }
}
impl<T, F, Fut> fmt::Debug for Unfold<T, F, Fut>
@@ -73,44 +75,50 @@ where
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Unfold")
.field("state", &self.state)
- .field("fut", &self.fut)
.finish()
}
}
impl<T, F, Fut, Item> FusedStream for Unfold<T, F, Fut>
- where F: FnMut(T) -> Fut,
- Fut: Future<Output = Option<(Item, T)>>,
+where
+ F: FnMut(T) -> Fut,
+ Fut: Future<Output = Option<(Item, T)>>,
{
fn is_terminated(&self) -> bool {
- self.state.is_none() && self.fut.is_none()
+ if let UnfoldState::Empty = self.state {
+ true
+ } else {
+ false
+ }
}
}
impl<T, F, Fut, Item> Stream for Unfold<T, F, Fut>
- where F: FnMut(T) -> Fut,
- Fut: Future<Output = Option<(Item, T)>>,
+where
+ F: FnMut(T) -> Fut,
+ Fut: Future<Output = Option<(Item, T)>>,
{
type Item = Item;
- fn poll_next(
- self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<Option<Self::Item>> {
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
- if let Some(state) = this.state.take() {
- this.fut.set(Some((this.f)(state)));
+ if let Some(state) = this.state.as_mut().take_value() {
+ this.state.set(UnfoldState::Future {
+ future: (this.f)(state),
+ });
}
- let step = ready!(this.fut.as_mut().as_pin_mut()
- .expect("Unfold must not be polled after it returned `Poll::Ready(None)`").poll(cx));
- this.fut.set(None);
+ let step = match this.state.as_mut().project_future() {
+ Some(fut) => ready!(fut.poll(cx)),
+ None => panic!("Unfold must not be polled after it returned `Poll::Ready(None)`"),
+ };
if let Some((item, next_state)) = step {
- *this.state = Some(next_state);
+ this.state.set(UnfoldState::Value { value: next_state });
Poll::Ready(Some(item))
} else {
+ this.state.set(UnfoldState::Empty);
Poll::Ready(None)
}
}