diff options
Diffstat (limited to 'src/stream/futures_ordered.rs')
-rw-r--r-- | src/stream/futures_ordered.rs | 26 |
1 files changed, 18 insertions, 8 deletions
diff --git a/src/stream/futures_ordered.rs b/src/stream/futures_ordered.rs index a30cbaa..6dc07ad 100644 --- a/src/stream/futures_ordered.rs +++ b/src/stream/futures_ordered.rs @@ -2,16 +2,18 @@ use crate::stream::{FuturesUnordered, StreamExt}; use futures_core::future::Future; use futures_core::stream::Stream; use futures_core::task::{Context, Poll}; -use pin_utils::unsafe_pinned; +use pin_project::pin_project; use core::cmp::Ordering; use core::fmt::{self, Debug}; use core::iter::FromIterator; use core::pin::Pin; use alloc::collections::binary_heap::{BinaryHeap, PeekMut}; +#[pin_project] #[must_use = "futures do nothing unless you `.await` or poll them"] #[derive(Debug)] struct OrderWrapper<T> { + #[pin] data: T, // A future or a future's output index: usize, } @@ -37,21 +39,18 @@ impl<T> Ord for OrderWrapper<T> { } } -impl<T> OrderWrapper<T> { - unsafe_pinned!(data: T); -} - impl<T> Future for OrderWrapper<T> where T: Future { type Output = OrderWrapper<T::Output>; fn poll( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Self::Output> { - self.as_mut().data().as_mut().poll(cx) - .map(|output| OrderWrapper { data: output, index: self.index }) + let index = self.index; + self.project().data.poll(cx) + .map(|output| OrderWrapper { data: output, index }) } } @@ -203,3 +202,14 @@ impl<Fut: Future> FromIterator<Fut> for FuturesOrdered<Fut> { iter.into_iter().fold(acc, |mut acc, item| { acc.push(item); acc }) } } + +impl<Fut: Future> Extend<Fut> for FuturesOrdered<Fut> { + fn extend<I>(&mut self, iter: I) + where + I: IntoIterator<Item = Fut>, + { + for item in iter.into_iter() { + self.push(item); + } + } +} |