diff options
Diffstat (limited to 'src/stream/once.rs')
-rw-r--r-- | src/stream/once.rs | 21 |
1 files changed, 13 insertions, 8 deletions
diff --git a/src/stream/once.rs b/src/stream/once.rs index 4f68b0c..21cd14b 100644 --- a/src/stream/once.rs +++ b/src/stream/once.rs @@ -2,7 +2,7 @@ use core::pin::Pin; use futures_core::future::Future; use futures_core::stream::{Stream, FusedStream}; use futures_core::task::{Context, Poll}; -use pin_utils::unsafe_pinned; +use pin_project::{pin_project, project}; /// Creates a stream of a single element. /// @@ -16,34 +16,39 @@ use pin_utils::unsafe_pinned; /// # }); /// ``` pub fn once<Fut: Future>(future: Fut) -> Once<Fut> { - Once { future: Some(future) } + Once::new(future) } /// A stream which emits single element and then EOF. /// /// This stream will never block and is always ready. +#[pin_project] #[derive(Debug)] #[must_use = "streams do nothing unless polled"] pub struct Once<Fut> { + #[pin] future: Option<Fut> } -impl<Fut: Unpin> Unpin for Once<Fut> {} - impl<Fut> Once<Fut> { - unsafe_pinned!(future: Option<Fut>); + pub(crate) fn new(future: Fut) -> Self { + Self { future: Some(future) } + } } impl<Fut: Future> Stream for Once<Fut> { type Item = Fut::Output; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { - let v = match self.as_mut().future().as_pin_mut() { + #[project] + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + #[project] + let Once { mut future } = self.project(); + let v = match future.as_mut().as_pin_mut() { Some(fut) => ready!(fut.poll(cx)), None => return Poll::Ready(None), }; - self.as_mut().future().set(None); + future.set(None); Poll::Ready(Some(v)) } |