aboutsummaryrefslogtreecommitdiff
path: root/src/stream/futures_ordered.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/stream/futures_ordered.rs')
-rw-r--r--src/stream/futures_ordered.rs26
1 files changed, 18 insertions, 8 deletions
diff --git a/src/stream/futures_ordered.rs b/src/stream/futures_ordered.rs
index a30cbaa..6dc07ad 100644
--- a/src/stream/futures_ordered.rs
+++ b/src/stream/futures_ordered.rs
@@ -2,16 +2,18 @@ use crate::stream::{FuturesUnordered, StreamExt};
use futures_core::future::Future;
use futures_core::stream::Stream;
use futures_core::task::{Context, Poll};
-use pin_utils::unsafe_pinned;
+use pin_project::pin_project;
use core::cmp::Ordering;
use core::fmt::{self, Debug};
use core::iter::FromIterator;
use core::pin::Pin;
use alloc::collections::binary_heap::{BinaryHeap, PeekMut};
+#[pin_project]
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[derive(Debug)]
struct OrderWrapper<T> {
+ #[pin]
data: T, // A future or a future's output
index: usize,
}
@@ -37,21 +39,18 @@ impl<T> Ord for OrderWrapper<T> {
}
}
-impl<T> OrderWrapper<T> {
- unsafe_pinned!(data: T);
-}
-
impl<T> Future for OrderWrapper<T>
where T: Future
{
type Output = OrderWrapper<T::Output>;
fn poll(
- mut self: Pin<&mut Self>,
+ self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Self::Output> {
- self.as_mut().data().as_mut().poll(cx)
- .map(|output| OrderWrapper { data: output, index: self.index })
+ let index = self.index;
+ self.project().data.poll(cx)
+ .map(|output| OrderWrapper { data: output, index })
}
}
@@ -203,3 +202,14 @@ impl<Fut: Future> FromIterator<Fut> for FuturesOrdered<Fut> {
iter.into_iter().fold(acc, |mut acc, item| { acc.push(item); acc })
}
}
+
+impl<Fut: Future> Extend<Fut> for FuturesOrdered<Fut> {
+ fn extend<I>(&mut self, iter: I)
+ where
+ I: IntoIterator<Item = Fut>,
+ {
+ for item in iter.into_iter() {
+ self.push(item);
+ }
+ }
+}