diff options
Diffstat (limited to 'src/stream/stream/for_each_concurrent.rs')
-rw-r--r-- | src/stream/stream/for_each_concurrent.rs | 34 |
1 files changed, 13 insertions, 21 deletions
diff --git a/src/stream/stream/for_each_concurrent.rs b/src/stream/stream/for_each_concurrent.rs index 18ca4bd..88ff2d3 100644 --- a/src/stream/stream/for_each_concurrent.rs +++ b/src/stream/stream/for_each_concurrent.rs @@ -5,23 +5,20 @@ use core::num::NonZeroUsize; use futures_core::future::{FusedFuture, Future}; use futures_core::stream::Stream; use futures_core::task::{Context, Poll}; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, project}; /// Future for the [`for_each_concurrent`](super::StreamExt::for_each_concurrent) /// method. +#[pin_project] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct ForEachConcurrent<St, Fut, F> { + #[pin] stream: Option<St>, f: F, futures: FuturesUnordered<Fut>, limit: Option<NonZeroUsize>, } -impl<St, Fut, F> Unpin for ForEachConcurrent<St, Fut, F> -where St: Unpin, - Fut: Unpin, -{} - impl<St, Fut, F> fmt::Debug for ForEachConcurrent<St, Fut, F> where St: fmt::Debug, @@ -41,11 +38,6 @@ where St: Stream, F: FnMut(St::Item) -> Fut, Fut: Future<Output = ()>, { - unsafe_pinned!(stream: Option<St>); - unsafe_unpinned!(f: F); - unsafe_unpinned!(futures: FuturesUnordered<Fut>); - unsafe_unpinned!(limit: Option<NonZeroUsize>); - pub(super) fn new(stream: St, limit: Option<usize>, f: F) -> ForEachConcurrent<St, Fut, F> { ForEachConcurrent { stream: Some(stream), @@ -74,16 +66,17 @@ impl<St, Fut, F> Future for ForEachConcurrent<St, Fut, F> { type Output = (); - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { + #[project] + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { + #[project] + let ForEachConcurrent { mut stream, f, futures, limit } = self.project(); loop { let mut made_progress_this_iter = false; - // Try and pull an item from the stream - let current_len = self.futures.len(); // Check if we've already created a number of futures greater than `limit` - if self.limit.map(|limit| limit.get() > current_len).unwrap_or(true) { + if limit.map(|limit| limit.get() > futures.len()).unwrap_or(true) { let mut stream_completed = false; - let elem = if let Some(stream) = self.as_mut().stream().as_pin_mut() { + let elem = if let Some(stream) = stream.as_mut().as_pin_mut() { match stream.poll_next(cx) { Poll::Ready(Some(elem)) => { made_progress_this_iter = true; @@ -99,18 +92,17 @@ impl<St, Fut, F> Future for ForEachConcurrent<St, Fut, F> None }; if stream_completed { - self.as_mut().stream().set(None); + stream.set(None); } if let Some(elem) = elem { - let next_future = (self.as_mut().f())(elem); - self.as_mut().futures().push(next_future); + futures.push(f(elem)); } } - match self.as_mut().futures().poll_next_unpin(cx) { + match futures.poll_next_unpin(cx) { Poll::Ready(Some(())) => made_progress_this_iter = true, Poll::Ready(None) => { - if self.stream.is_none() { + if stream.is_none() { return Poll::Ready(()) } }, |