diff options
Diffstat (limited to 'src/async_stream.rs')
-rw-r--r-- | src/async_stream.rs | 52 |
1 files changed, 27 insertions, 25 deletions
diff --git a/src/async_stream.rs b/src/async_stream.rs index f60c87e..ff408ab 100644 --- a/src/async_stream.rs +++ b/src/async_stream.rs @@ -1,16 +1,20 @@ use crate::yielder::Receiver; use futures_core::{FusedStream, Stream}; +use pin_project_lite::pin_project; use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; -#[doc(hidden)] -#[derive(Debug)] -pub struct AsyncStream<T, U> { - rx: Receiver<T>, - done: bool, - generator: U, +pin_project! { + #[doc(hidden)] + #[derive(Debug)] + pub struct AsyncStream<T, U> { + rx: Receiver<T>, + done: bool, + #[pin] + generator: U, + } } impl<T, U> AsyncStream<T, U> { @@ -40,30 +44,28 @@ where type Item = T; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { - unsafe { - let me = Pin::get_unchecked_mut(self); + let me = self.project(); - if me.done { - return Poll::Ready(None); - } + if *me.done { + return Poll::Ready(None); + } - let mut dst = None; - let res = { - let _enter = me.rx.enter(&mut dst); - Pin::new_unchecked(&mut me.generator).poll(cx) - }; + let mut dst = None; + let res = { + let _enter = me.rx.enter(&mut dst); + me.generator.poll(cx) + }; - me.done = res.is_ready(); + *me.done = res.is_ready(); - if dst.is_some() { - return Poll::Ready(dst.take()); - } + if dst.is_some() { + return Poll::Ready(dst.take()); + } - if me.done { - Poll::Ready(None) - } else { - Poll::Pending - } + if *me.done { + Poll::Ready(None) + } else { + Poll::Pending } } |