aboutsummaryrefslogtreecommitdiff
path: root/src/async_stream.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/async_stream.rs')
-rw-r--r--src/async_stream.rs52
1 files changed, 27 insertions, 25 deletions
diff --git a/src/async_stream.rs b/src/async_stream.rs
index f60c87e..ff408ab 100644
--- a/src/async_stream.rs
+++ b/src/async_stream.rs
@@ -1,16 +1,20 @@
use crate::yielder::Receiver;
use futures_core::{FusedStream, Stream};
+use pin_project_lite::pin_project;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
-#[doc(hidden)]
-#[derive(Debug)]
-pub struct AsyncStream<T, U> {
- rx: Receiver<T>,
- done: bool,
- generator: U,
+pin_project! {
+ #[doc(hidden)]
+ #[derive(Debug)]
+ pub struct AsyncStream<T, U> {
+ rx: Receiver<T>,
+ done: bool,
+ #[pin]
+ generator: U,
+ }
}
impl<T, U> AsyncStream<T, U> {
@@ -40,30 +44,28 @@ where
type Item = T;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
- unsafe {
- let me = Pin::get_unchecked_mut(self);
+ let me = self.project();
- if me.done {
- return Poll::Ready(None);
- }
+ if *me.done {
+ return Poll::Ready(None);
+ }
- let mut dst = None;
- let res = {
- let _enter = me.rx.enter(&mut dst);
- Pin::new_unchecked(&mut me.generator).poll(cx)
- };
+ let mut dst = None;
+ let res = {
+ let _enter = me.rx.enter(&mut dst);
+ me.generator.poll(cx)
+ };
- me.done = res.is_ready();
+ *me.done = res.is_ready();
- if dst.is_some() {
- return Poll::Ready(dst.take());
- }
+ if dst.is_some() {
+ return Poll::Ready(dst.take());
+ }
- if me.done {
- Poll::Ready(None)
- } else {
- Poll::Pending
- }
+ if *me.done {
+ Poll::Ready(None)
+ } else {
+ Poll::Pending
}
}