aboutsummaryrefslogtreecommitdiff
path: root/src/stream/once.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/stream/once.rs')
-rw-r--r--src/stream/once.rs21
1 files changed, 13 insertions, 8 deletions
diff --git a/src/stream/once.rs b/src/stream/once.rs
index 4f68b0c..21cd14b 100644
--- a/src/stream/once.rs
+++ b/src/stream/once.rs
@@ -2,7 +2,7 @@ use core::pin::Pin;
use futures_core::future::Future;
use futures_core::stream::{Stream, FusedStream};
use futures_core::task::{Context, Poll};
-use pin_utils::unsafe_pinned;
+use pin_project::{pin_project, project};
/// Creates a stream of a single element.
///
@@ -16,34 +16,39 @@ use pin_utils::unsafe_pinned;
/// # });
/// ```
pub fn once<Fut: Future>(future: Fut) -> Once<Fut> {
- Once { future: Some(future) }
+ Once::new(future)
}
/// A stream which emits single element and then EOF.
///
/// This stream will never block and is always ready.
+#[pin_project]
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Once<Fut> {
+ #[pin]
future: Option<Fut>
}
-impl<Fut: Unpin> Unpin for Once<Fut> {}
-
impl<Fut> Once<Fut> {
- unsafe_pinned!(future: Option<Fut>);
+ pub(crate) fn new(future: Fut) -> Self {
+ Self { future: Some(future) }
+ }
}
impl<Fut: Future> Stream for Once<Fut> {
type Item = Fut::Output;
- fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
- let v = match self.as_mut().future().as_pin_mut() {
+ #[project]
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ #[project]
+ let Once { mut future } = self.project();
+ let v = match future.as_mut().as_pin_mut() {
Some(fut) => ready!(fut.poll(cx)),
None => return Poll::Ready(None),
};
- self.as_mut().future().set(None);
+ future.set(None);
Poll::Ready(Some(v))
}