aboutsummaryrefslogtreecommitdiff
path: root/src/async_stream.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/async_stream.rs')
-rw-r--r--src/async_stream.rs69
1 files changed, 69 insertions, 0 deletions
diff --git a/src/async_stream.rs b/src/async_stream.rs
new file mode 100644
index 0000000..30115df
--- /dev/null
+++ b/src/async_stream.rs
@@ -0,0 +1,69 @@
+use crate::yielder::Receiver;
+
+use futures_core::{FusedStream, Stream};
+use std::future::Future;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+#[doc(hidden)]
+#[derive(Debug)]
+pub struct AsyncStream<T, U> {
+ rx: Receiver<T>,
+ done: bool,
+ generator: U,
+}
+
+impl<T, U> AsyncStream<T, U> {
+ #[doc(hidden)]
+ pub fn new(rx: Receiver<T>, generator: U) -> AsyncStream<T, U> {
+ AsyncStream {
+ rx,
+ done: false,
+ generator,
+ }
+ }
+}
+
+impl<T, U> FusedStream for AsyncStream<T, U>
+where
+ U: Future<Output = ()>,
+{
+ fn is_terminated(&self) -> bool {
+ self.done
+ }
+}
+
+impl<T, U> Stream for AsyncStream<T, U>
+where
+ U: Future<Output = ()>,
+{
+ type Item = T;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ unsafe {
+ let me = Pin::get_unchecked_mut(self);
+
+ if me.done {
+ return Poll::Ready(None);
+ }
+
+ let mut dst = None;
+ let res = {
+ let _enter = me.rx.enter(&mut dst);
+ Pin::new_unchecked(&mut me.generator).poll(cx)
+ };
+
+ me.done = res.is_ready();
+
+ if dst.is_some() {
+ return Poll::Ready(dst.take());
+ }
+
+ if me.done {
+ Poll::Ready(None)
+ } else {
+ Poll::Pending
+ }
+ }
+ }
+}